diff --git a/job/workerpool.go b/job/workerpool.go index 44d0e2762..e55126078 100644 --- a/job/workerpool.go +++ b/job/workerpool.go @@ -7,7 +7,24 @@ import ( "github.com/vmware/harbor/utils/log" ) -var WorkerPool chan *Worker +type workerPool struct { + workerChan chan *Worker + workerList []*Worker +} + +var WorkerPool *workerPool + +func (wp *workerPool) StopJobs(jobs []int64) { + log.Debugf("Works working on jobs: %v will be stopped", jobs) + for _, id := range jobs { + for _, w := range wp.workerList { + if w.SM.JobID == id { + log.Debugf("found a worker whose job ID is %d, will try to stop it", id) + w.SM.Stop(id) + } + } + } +} type Worker struct { ID int @@ -19,7 +36,7 @@ type Worker struct { func (w *Worker) Start() { go func() { for { - WorkerPool <- w + WorkerPool.workerChan <- w select { case jobID := <-w.RepJobs: log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) @@ -43,10 +60,10 @@ func (w *Worker) Stop() { func (w *Worker) handleRepJob(id int64) { err := w.SM.Reset(id) if err != nil { - log.Errorf("Worker %d, failed to re-initialize statemachine, error: %v", w.ID, err) + log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err) err2 := dao.UpdateRepJobStatus(id, models.JobError) if err2 != nil { - log.Errorf("Failed to update job status to ERROR, error:%v", err2) + log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2) } return } @@ -70,9 +87,13 @@ func NewWorker(id int) *Worker { } func InitWorkerPool() { - WorkerPool = make(chan *Worker, config.MaxJobWorkers()) + WorkerPool = &workerPool{ + workerChan: make(chan *Worker, config.MaxJobWorkers()), + workerList: make([]*Worker, 0, config.MaxJobWorkers()), + } for i := 0; i < config.MaxJobWorkers(); i++ { worker := NewWorker(i) + WorkerPool.workerList = append(WorkerPool.workerList, worker) worker.Start() log.Debugf("worker %d started", worker.ID) } @@ -84,7 +105,7 @@ func Dispatch() { case job := <-JobQueue: go func(jobID int64) { log.Debugf("Trying to dispatch job: %d", jobID) - worker := <-WorkerPool + worker := <-WorkerPool.workerChan worker.RepJobs <- jobID }(job) }