diff --git a/dao/job.go b/dao/job.go index 82e0a1e47..c5e6f45b3 100644 --- a/dao/job.go +++ b/dao/job.go @@ -3,15 +3,7 @@ package dao import ( "github.com/astaxie/beego/orm" "github.com/vmware/harbor/models" - "github.com/vmware/harbor/utils/log" -) - -const ( - JobPending string = "pending" - JobRunning string = "running" - JobError string = "error" - JobStopped string = "stopped" - JobFinished string = "finished" + // "github.com/vmware/harbor/utils/log" ) func AddJob(entry models.JobEntry) (int64, error) { @@ -32,7 +24,7 @@ func AddJob(entry models.JobEntry) (int64, error) { func AddJobLog(id int64, level string, message string) error { sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())` - log.Debugf("trying to add a log for job:%d", id) + //log.Debugf("trying to add a log for job:%d", id) o := orm.NewOrm() p, err := o.Raw(sql).Prepare() if err != nil { diff --git a/job/imgout/runner.go b/job/imgout/runner.go index 7bd012c8b..54097ee33 100644 --- a/job/imgout/runner.go +++ b/job/imgout/runner.go @@ -2,7 +2,7 @@ package imgout import ( "encoding/json" - "github.com/vmware/harbor/dao" + //"github.com/vmware/harbor/dao" "github.com/vmware/harbor/job" "github.com/vmware/harbor/models" "time" @@ -24,11 +24,11 @@ type ImgPuller struct { logger job.Logger } -func (ip ImgPuller) Enter() error { - ip.logger.Infof("I'm pretending to pull img:%s, then sleep 10s", ip.img) - time.Sleep(10 * time.Second) +func (ip ImgPuller) Enter() (string, error) { + ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img) + time.Sleep(30 * time.Second) ip.logger.Infof("wake up from sleep....") - return nil + return "push-img", nil } type ImgPusher struct { @@ -37,11 +37,11 @@ type ImgPusher struct { logger job.Logger } -func (ip ImgPusher) Enter() error { - ip.logger.Infof("I'm pretending to push img to:%s, then sleep 10s", ip.targetURL) - time.Sleep(10 * time.Second) +func (ip ImgPusher) Enter() (string, error) { + ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL) + time.Sleep(30 * time.Second) ip.logger.Infof("wake up from sleep....") - return nil + return job.JobContinue, nil } func init() { @@ -53,15 +53,7 @@ func (r Runner) Run(je models.JobEntry) error { if err != nil { return err } - path := []string{dao.JobRunning, "pull-img", "push-img", dao.JobFinished} - for _, state := range path { - err := r.EnterState(state) - if err != nil { - r.Logger.Errorf("Error durint transition to state: %s, error: %v", state, err) - r.EnterState(dao.JobError) - break - } - } + r.Start(job.JobRunning) return nil } @@ -73,10 +65,10 @@ func (r *Runner) init(je models.JobEntry) error { return err } r.Logger = job.Logger{je.ID} - r.AddTransition(dao.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger}) + r.AddTransition(job.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger}) //only handle on target for now url := r.parm.Targets[0].URL r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger}) - r.AddTransition("push-img", dao.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, dao.JobFinished}) + r.AddTransition("push-img", job.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, job.JobFinished}) return nil } diff --git a/job/scheduler.go b/job/scheduler.go index 41d7c7ba1..bb265e80a 100644 --- a/job/scheduler.go +++ b/job/scheduler.go @@ -3,10 +3,30 @@ package job import ( "github.com/vmware/harbor/models" "github.com/vmware/harbor/utils/log" + "os" + "strconv" ) +var lock chan bool + +const defaultMaxJobs int64 = 10 + +func init() { + maxJobsEnv := os.Getenv("MAX_CONCURRENT_JOB") + maxJobs, err := strconv.ParseInt(maxJobsEnv, 10, 32) + if err != nil { + log.Warningf("Failed to parse max job setting, error: %v, the default value: %d will be used", err, defaultMaxJobs) + maxJobs = defaultMaxJobs + } + lock = make(chan bool, maxJobs) +} func Schedule(job models.JobEntry) { log.Infof("job: %d will be scheduled", job.ID) //TODO: add support for cron string when needed. - go run(job) + go func() { + lock <- true + defer func() { <-lock }() + log.Infof("running job: %d", job.ID) + run(job) + }() } diff --git a/job/statemachine.go b/job/statemachine.go index f7ec6a01c..f6bc88b92 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -4,10 +4,15 @@ import ( "fmt" "github.com/vmware/harbor/dao" "github.com/vmware/harbor/utils/log" + "sync" ) +// StateHandler handles transition, it associates with each state, will be called when +// SM enters and exits a state during a transition. type StateHandler interface { - Enter() error + // Enter returns the next state, if it returns empty string the SM will hold the current state or + // or decide the next state. + Enter() (string, error) //Exit should be idempotent Exit() error } @@ -16,8 +21,8 @@ type DummyHandler struct { JobID int64 } -func (dh DummyHandler) Enter() error { - return nil +func (dh DummyHandler) Enter() (string, error) { + return "", nil } func (dh DummyHandler) Exit() error { @@ -29,14 +34,28 @@ type StatusUpdater struct { State string } -func (su StatusUpdater) Enter() error { +func (su StatusUpdater) Enter() (string, error) { err := dao.UpdateJobStatus(su.JobID, su.State) if err != nil { log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err) } - return err + var next string = JobContinue + if su.State == JobStopped || su.State == JobError || su.State == JobFinished { + next = "" + } + return next, err } +const ( + JobPending string = "pending" + JobRunning string = "running" + JobError string = "error" + JobStopped string = "stopped" + JobFinished string = "finished" + // statemachine will move to next possible state based on trasition table + JobContinue string = "_continue" +) + type JobSM struct { JobID int64 CurrentState string @@ -45,28 +64,34 @@ type JobSM struct { ForcedStates map[string]struct{} Transitions map[string]map[string]struct{} Handlers map[string]StateHandler + lock *sync.Mutex + desiredState string } -func (sm *JobSM) EnterState(s string) error { +// EnsterState transit the statemachine from the current state to the state in parameter. +// It returns the next state the statemachine should tranit to. +func (sm *JobSM) EnterState(s string) (string, error) { log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s) targets, ok := sm.Transitions[sm.CurrentState] _, exist := targets[s] _, isForced := sm.ForcedStates[s] if !exist && !isForced { - return fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s) + return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s) } exitHandler, ok := sm.Handlers[sm.CurrentState] if ok { if err := exitHandler.Exit(); err != nil { - return err + return "", err } } else { log.Debugf("No handler found for state:%s, skip", sm.CurrentState) } enterHandler, ok := sm.Handlers[s] + var next string = JobContinue + var err error if ok { - if err := enterHandler.Enter(); err != nil { - return err + if next, err = enterHandler.Enter(); err != nil { + return "", err } } else { log.Debugf("No handler found for state:%s, skip", s) @@ -74,7 +99,41 @@ func (sm *JobSM) EnterState(s string) error { sm.PreviousState = sm.CurrentState sm.CurrentState = s log.Debugf("Transition succeeded, current state: %s", s) - return nil + return next, nil +} + +// Start kicks off the statemachine to transit from current state to s, and moves on +// It will search the transit map if the next state is "_continue", and +// will enter error state if there's more than one possible path when next state is "_continue" +func (sm *JobSM) Start(s string) { + n, err := sm.EnterState(s) + log.Debugf("next state from handler: %s", n) + for len(n) > 0 && err == nil { + if d := sm.getDesiredState(); len(d) > 0 { + log.Debugf("Desired state: %s, will ignore the next state from handler") + n = d + sm.setDesiredState("") + continue + } + if n == JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 { + for n = range sm.Transitions[sm.CurrentState] { + break + } + log.Debugf("Continue to state: %s", n) + continue + } + if n == JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 { + log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState])) + err = fmt.Errorf("Unable to continue") + break + } + n, err = sm.EnterState(n) + log.Debugf("next state from handler: %s", n) + } + if err != nil { + log.Warningf("The statemachin will enter error state due to error: %v", err) + sm.EnterState(JobError) + } } func (sm *JobSM) AddTransition(from string, to string, h StateHandler) { @@ -94,11 +153,28 @@ func (sm *JobSM) RemoveTransition(from string, to string) { delete(sm.Transitions[from], to) } +func (sm *JobSM) Stop() { + sm.setDesiredState(JobStopped) +} + +func (sm *JobSM) getDesiredState() string { + sm.lock.Lock() + defer sm.lock.Unlock() + return sm.desiredState +} + +func (sm *JobSM) setDesiredState(s string) { + sm.lock.Lock() + defer sm.lock.Unlock() + sm.desiredState = s +} + func (sm *JobSM) InitJobSM() { + sm.lock = &sync.Mutex{} sm.Handlers = make(map[string]StateHandler) sm.Transitions = make(map[string]map[string]struct{}) - sm.CurrentState = dao.JobPending - log.Debugf("sm.Handlers: %v", sm.Handlers) - sm.AddTransition(dao.JobPending, dao.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobRunning}) - sm.Handlers[dao.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobError} + sm.CurrentState = JobPending + sm.AddTransition(JobPending, JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, JobRunning}) + sm.Handlers[JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobError} + sm.Handlers[JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobStopped} }