mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-30 14:13:43 +01:00
Merge remote-tracking branch 'upstream/job-service' into sync_image
This commit is contained in:
commit
e8de73585e
12
dao/job.go
12
dao/job.go
@ -3,15 +3,7 @@ package dao
|
|||||||
import (
|
import (
|
||||||
"github.com/astaxie/beego/orm"
|
"github.com/astaxie/beego/orm"
|
||||||
"github.com/vmware/harbor/models"
|
"github.com/vmware/harbor/models"
|
||||||
"github.com/vmware/harbor/utils/log"
|
// "github.com/vmware/harbor/utils/log"
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
JobPending string = "pending"
|
|
||||||
JobRunning string = "running"
|
|
||||||
JobError string = "error"
|
|
||||||
JobStopped string = "stopped"
|
|
||||||
JobFinished string = "finished"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func AddJob(entry models.JobEntry) (int64, error) {
|
func AddJob(entry models.JobEntry) (int64, error) {
|
||||||
@ -32,7 +24,7 @@ func AddJob(entry models.JobEntry) (int64, error) {
|
|||||||
|
|
||||||
func AddJobLog(id int64, level string, message string) error {
|
func AddJobLog(id int64, level string, message string) error {
|
||||||
sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())`
|
sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())`
|
||||||
log.Debugf("trying to add a log for job:%d", id)
|
//log.Debugf("trying to add a log for job:%d", id)
|
||||||
o := orm.NewOrm()
|
o := orm.NewOrm()
|
||||||
p, err := o.Raw(sql).Prepare()
|
p, err := o.Raw(sql).Prepare()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2,7 +2,7 @@ package imgout
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/vmware/harbor/dao"
|
//"github.com/vmware/harbor/dao"
|
||||||
"github.com/vmware/harbor/job"
|
"github.com/vmware/harbor/job"
|
||||||
"github.com/vmware/harbor/models"
|
"github.com/vmware/harbor/models"
|
||||||
"time"
|
"time"
|
||||||
@ -24,11 +24,11 @@ type ImgPuller struct {
|
|||||||
logger job.Logger
|
logger job.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ip ImgPuller) Enter() error {
|
func (ip ImgPuller) Enter() (string, error) {
|
||||||
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 10s", ip.img)
|
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(30 * time.Second)
|
||||||
ip.logger.Infof("wake up from sleep....")
|
ip.logger.Infof("wake up from sleep....")
|
||||||
return nil
|
return "push-img", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ImgPusher struct {
|
type ImgPusher struct {
|
||||||
@ -37,11 +37,11 @@ type ImgPusher struct {
|
|||||||
logger job.Logger
|
logger job.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ip ImgPusher) Enter() error {
|
func (ip ImgPusher) Enter() (string, error) {
|
||||||
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 10s", ip.targetURL)
|
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(30 * time.Second)
|
||||||
ip.logger.Infof("wake up from sleep....")
|
ip.logger.Infof("wake up from sleep....")
|
||||||
return nil
|
return job.JobContinue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -53,15 +53,7 @@ func (r Runner) Run(je models.JobEntry) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
path := []string{dao.JobRunning, "pull-img", "push-img", dao.JobFinished}
|
r.Start(job.JobRunning)
|
||||||
for _, state := range path {
|
|
||||||
err := r.EnterState(state)
|
|
||||||
if err != nil {
|
|
||||||
r.Logger.Errorf("Error durint transition to state: %s, error: %v", state, err)
|
|
||||||
r.EnterState(dao.JobError)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,10 +65,10 @@ func (r *Runner) init(je models.JobEntry) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.Logger = job.Logger{je.ID}
|
r.Logger = job.Logger{je.ID}
|
||||||
r.AddTransition(dao.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger})
|
r.AddTransition(job.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger})
|
||||||
//only handle on target for now
|
//only handle on target for now
|
||||||
url := r.parm.Targets[0].URL
|
url := r.parm.Targets[0].URL
|
||||||
r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger})
|
r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger})
|
||||||
r.AddTransition("push-img", dao.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, dao.JobFinished})
|
r.AddTransition("push-img", job.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, job.JobFinished})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -3,10 +3,30 @@ package job
|
|||||||
import (
|
import (
|
||||||
"github.com/vmware/harbor/models"
|
"github.com/vmware/harbor/models"
|
||||||
"github.com/vmware/harbor/utils/log"
|
"github.com/vmware/harbor/utils/log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var lock chan bool
|
||||||
|
|
||||||
|
const defaultMaxJobs int64 = 10
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
maxJobsEnv := os.Getenv("MAX_CONCURRENT_JOB")
|
||||||
|
maxJobs, err := strconv.ParseInt(maxJobsEnv, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("Failed to parse max job setting, error: %v, the default value: %d will be used", err, defaultMaxJobs)
|
||||||
|
maxJobs = defaultMaxJobs
|
||||||
|
}
|
||||||
|
lock = make(chan bool, maxJobs)
|
||||||
|
}
|
||||||
func Schedule(job models.JobEntry) {
|
func Schedule(job models.JobEntry) {
|
||||||
log.Infof("job: %d will be scheduled", job.ID)
|
log.Infof("job: %d will be scheduled", job.ID)
|
||||||
//TODO: add support for cron string when needed.
|
//TODO: add support for cron string when needed.
|
||||||
go run(job)
|
go func() {
|
||||||
|
lock <- true
|
||||||
|
defer func() { <-lock }()
|
||||||
|
log.Infof("running job: %d", job.ID)
|
||||||
|
run(job)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
@ -4,10 +4,15 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/vmware/harbor/dao"
|
"github.com/vmware/harbor/dao"
|
||||||
"github.com/vmware/harbor/utils/log"
|
"github.com/vmware/harbor/utils/log"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// StateHandler handles transition, it associates with each state, will be called when
|
||||||
|
// SM enters and exits a state during a transition.
|
||||||
type StateHandler interface {
|
type StateHandler interface {
|
||||||
Enter() error
|
// Enter returns the next state, if it returns empty string the SM will hold the current state or
|
||||||
|
// or decide the next state.
|
||||||
|
Enter() (string, error)
|
||||||
//Exit should be idempotent
|
//Exit should be idempotent
|
||||||
Exit() error
|
Exit() error
|
||||||
}
|
}
|
||||||
@ -16,8 +21,8 @@ type DummyHandler struct {
|
|||||||
JobID int64
|
JobID int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dh DummyHandler) Enter() error {
|
func (dh DummyHandler) Enter() (string, error) {
|
||||||
return nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dh DummyHandler) Exit() error {
|
func (dh DummyHandler) Exit() error {
|
||||||
@ -29,13 +34,27 @@ type StatusUpdater struct {
|
|||||||
State string
|
State string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (su StatusUpdater) Enter() error {
|
func (su StatusUpdater) Enter() (string, error) {
|
||||||
err := dao.UpdateJobStatus(su.JobID, su.State)
|
err := dao.UpdateJobStatus(su.JobID, su.State)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
|
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
|
||||||
}
|
}
|
||||||
return err
|
var next string = JobContinue
|
||||||
|
if su.State == JobStopped || su.State == JobError || su.State == JobFinished {
|
||||||
|
next = ""
|
||||||
}
|
}
|
||||||
|
return next, err
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobPending string = "pending"
|
||||||
|
JobRunning string = "running"
|
||||||
|
JobError string = "error"
|
||||||
|
JobStopped string = "stopped"
|
||||||
|
JobFinished string = "finished"
|
||||||
|
// statemachine will move to next possible state based on trasition table
|
||||||
|
JobContinue string = "_continue"
|
||||||
|
)
|
||||||
|
|
||||||
type JobSM struct {
|
type JobSM struct {
|
||||||
JobID int64
|
JobID int64
|
||||||
@ -45,28 +64,34 @@ type JobSM struct {
|
|||||||
ForcedStates map[string]struct{}
|
ForcedStates map[string]struct{}
|
||||||
Transitions map[string]map[string]struct{}
|
Transitions map[string]map[string]struct{}
|
||||||
Handlers map[string]StateHandler
|
Handlers map[string]StateHandler
|
||||||
|
lock *sync.Mutex
|
||||||
|
desiredState string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *JobSM) EnterState(s string) error {
|
// EnsterState transit the statemachine from the current state to the state in parameter.
|
||||||
|
// It returns the next state the statemachine should tranit to.
|
||||||
|
func (sm *JobSM) EnterState(s string) (string, error) {
|
||||||
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
|
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
|
||||||
targets, ok := sm.Transitions[sm.CurrentState]
|
targets, ok := sm.Transitions[sm.CurrentState]
|
||||||
_, exist := targets[s]
|
_, exist := targets[s]
|
||||||
_, isForced := sm.ForcedStates[s]
|
_, isForced := sm.ForcedStates[s]
|
||||||
if !exist && !isForced {
|
if !exist && !isForced {
|
||||||
return fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
|
return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
|
||||||
}
|
}
|
||||||
exitHandler, ok := sm.Handlers[sm.CurrentState]
|
exitHandler, ok := sm.Handlers[sm.CurrentState]
|
||||||
if ok {
|
if ok {
|
||||||
if err := exitHandler.Exit(); err != nil {
|
if err := exitHandler.Exit(); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
|
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
|
||||||
}
|
}
|
||||||
enterHandler, ok := sm.Handlers[s]
|
enterHandler, ok := sm.Handlers[s]
|
||||||
|
var next string = JobContinue
|
||||||
|
var err error
|
||||||
if ok {
|
if ok {
|
||||||
if err := enterHandler.Enter(); err != nil {
|
if next, err = enterHandler.Enter(); err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("No handler found for state:%s, skip", s)
|
log.Debugf("No handler found for state:%s, skip", s)
|
||||||
@ -74,7 +99,41 @@ func (sm *JobSM) EnterState(s string) error {
|
|||||||
sm.PreviousState = sm.CurrentState
|
sm.PreviousState = sm.CurrentState
|
||||||
sm.CurrentState = s
|
sm.CurrentState = s
|
||||||
log.Debugf("Transition succeeded, current state: %s", s)
|
log.Debugf("Transition succeeded, current state: %s", s)
|
||||||
return nil
|
return next, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start kicks off the statemachine to transit from current state to s, and moves on
|
||||||
|
// It will search the transit map if the next state is "_continue", and
|
||||||
|
// will enter error state if there's more than one possible path when next state is "_continue"
|
||||||
|
func (sm *JobSM) Start(s string) {
|
||||||
|
n, err := sm.EnterState(s)
|
||||||
|
log.Debugf("next state from handler: %s", n)
|
||||||
|
for len(n) > 0 && err == nil {
|
||||||
|
if d := sm.getDesiredState(); len(d) > 0 {
|
||||||
|
log.Debugf("Desired state: %s, will ignore the next state from handler")
|
||||||
|
n = d
|
||||||
|
sm.setDesiredState("")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if n == JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
|
||||||
|
for n = range sm.Transitions[sm.CurrentState] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Debugf("Continue to state: %s", n)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if n == JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
|
||||||
|
log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState]))
|
||||||
|
err = fmt.Errorf("Unable to continue")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
n, err = sm.EnterState(n)
|
||||||
|
log.Debugf("next state from handler: %s", n)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("The statemachin will enter error state due to error: %v", err)
|
||||||
|
sm.EnterState(JobError)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
|
func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
|
||||||
@ -94,11 +153,28 @@ func (sm *JobSM) RemoveTransition(from string, to string) {
|
|||||||
delete(sm.Transitions[from], to)
|
delete(sm.Transitions[from], to)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *JobSM) Stop() {
|
||||||
|
sm.setDesiredState(JobStopped)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *JobSM) getDesiredState() string {
|
||||||
|
sm.lock.Lock()
|
||||||
|
defer sm.lock.Unlock()
|
||||||
|
return sm.desiredState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *JobSM) setDesiredState(s string) {
|
||||||
|
sm.lock.Lock()
|
||||||
|
defer sm.lock.Unlock()
|
||||||
|
sm.desiredState = s
|
||||||
|
}
|
||||||
|
|
||||||
func (sm *JobSM) InitJobSM() {
|
func (sm *JobSM) InitJobSM() {
|
||||||
|
sm.lock = &sync.Mutex{}
|
||||||
sm.Handlers = make(map[string]StateHandler)
|
sm.Handlers = make(map[string]StateHandler)
|
||||||
sm.Transitions = make(map[string]map[string]struct{})
|
sm.Transitions = make(map[string]map[string]struct{})
|
||||||
sm.CurrentState = dao.JobPending
|
sm.CurrentState = JobPending
|
||||||
log.Debugf("sm.Handlers: %v", sm.Handlers)
|
sm.AddTransition(JobPending, JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, JobRunning})
|
||||||
sm.AddTransition(dao.JobPending, dao.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobRunning})
|
sm.Handlers[JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobError}
|
||||||
sm.Handlers[dao.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobError}
|
sm.Handlers[JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobStopped}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user