package job import ( "github.com/vmware/harbor/dao" "github.com/vmware/harbor/job/config" "github.com/vmware/harbor/models" "github.com/vmware/harbor/utils/log" ) type workerPool struct { workerChan chan *Worker workerList []*Worker } // WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs. // it consists of a channel for free workers and a list to all workers var WorkerPool *workerPool // StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker. func (wp *workerPool) StopJobs(jobs []int64) { log.Debugf("Works working on jobs: %v will be stopped", jobs) for _, id := range jobs { for _, w := range wp.workerList { if w.SM.JobID == id { log.Debugf("found a worker whose job ID is %d, will try to stop it", id) w.SM.Stop(id) } } } } // Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine, // the actual work to handle the job is done via state machine. type Worker struct { ID int RepJobs chan int64 SM *SM quit chan bool } // Start is a loop worker gets id from its channel and handle it. func (w *Worker) Start() { go func() { for { WorkerPool.workerChan <- w select { case jobID := <-w.RepJobs: log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) w.handleRepJob(jobID) case q := <-w.quit: if q { log.Debugf("worker: %d, will stop.", w.ID) return } } } }() } // Stop ... func (w *Worker) Stop() { go func() { w.quit <- true }() } func (w *Worker) handleRepJob(id int64) { err := w.SM.Reset(id) if err != nil { log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err) err2 := dao.UpdateRepJobStatus(id, models.JobError) if err2 != nil { log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2) } return } if w.SM.Parms.Enabled == 0 { log.Debugf("The policy of job:%d is disabled, will cancel the job") _ = dao.UpdateRepJobStatus(id, models.JobCanceled) } else { w.SM.Start(models.JobRunning) } } // NewWorker returns a pointer to new instance of worker func NewWorker(id int) *Worker { w := &Worker{ ID: id, RepJobs: make(chan int64), quit: make(chan bool), SM: &SM{}, } w.SM.Init() return w } // InitWorkerPool create workers according to configuration. func InitWorkerPool() { WorkerPool = &workerPool{ workerChan: make(chan *Worker, config.MaxJobWorkers()), workerList: make([]*Worker, 0, config.MaxJobWorkers()), } for i := 0; i < config.MaxJobWorkers(); i++ { worker := NewWorker(i) WorkerPool.workerList = append(WorkerPool.workerList, worker) worker.Start() log.Debugf("worker %d started", worker.ID) } } // Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it. func Dispatch() { for { select { case job := <-jobQueue: go func(jobID int64) { log.Debugf("Trying to dispatch job: %d", jobID) worker := <-WorkerPool.workerChan worker.RepJobs <- jobID }(job) } } }