diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index 7c6d2d805..b97319cf0 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -334,7 +334,8 @@ func FilterRepJobs(policyID int64, repository string, status []string, startTime // GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy. func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) { var res []*models.RepJob - _, err := repJobPolicyIDQs(policyID).Filter("status__in", models.JobPending, models.JobRunning).All(&res) + _, err := repJobPolicyIDQs(policyID).Filter("status__in", + models.JobPending, models.JobRunning, models.JobRetrying).All(&res) genTagListForJob(res...) return res, err } diff --git a/src/jobservice/api/replication.go b/src/jobservice/api/replication.go index f9d579560..84f96a675 100644 --- a/src/jobservice/api/replication.go +++ b/src/jobservice/api/replication.go @@ -137,8 +137,31 @@ func (rj *ReplicationJob) HandleAction() { rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop") return } + + runningJobs := []*models.RepJob{} + pendingAndRetryingJobs := []*models.RepJob{} + for _, job := range jobs { + if job.Status == models.JobRunning { + runningJobs = append(runningJobs, job) + continue + } + pendingAndRetryingJobs = append(pendingAndRetryingJobs, job) + } + + // stop pending and retrying jobs by updating job status in database + // when the jobs are dispatched, the status will be checked first + for _, job := range pendingAndRetryingJobs { + id := job.ID + if err := dao.UpdateRepJobStatus(id, models.JobStopped); err != nil { + log.Errorf("failed to update the status of job %d: %v", id, err) + continue + } + log.Debugf("the status of job %d is updated to %s", id, models.JobStopped) + } + + // stop running jobs in statemachine var repJobs []job.Job - for _, j := range jobs { + for _, j := range runningJobs { //transform the data record to job struct that can be handled by state machine. repJob := job.NewRepJob(j.ID) repJobs = append(repJobs, repJob) diff --git a/src/jobservice/job/jobs.go b/src/jobservice/job/jobs.go index 1752f9164..67d7523fe 100644 --- a/src/jobservice/job/jobs.go +++ b/src/jobservice/job/jobs.go @@ -50,6 +50,7 @@ type Job interface { Type() Type LogPath() string UpdateStatus(status string) error + GetStatus() (string, error) Init() error //Parm() interface{} } @@ -152,6 +153,18 @@ func (rj *RepJob) Init() error { return nil } +// GetStatus returns the status of the job +func (rj *RepJob) GetStatus() (string, error) { + job, err := dao.GetRepJob(rj.id) + if err != nil { + return "", err + } + if job == nil { + return "", fmt.Errorf("replication job %v not found", rj.id) + } + return job.Status, nil +} + // NewRepJob returns a pointer to RepJob which implements the Job interface. // Given API only gets the id, it will call this func to get a instance that can be manuevered by state machine. func NewRepJob(id int64) *RepJob { @@ -217,6 +230,18 @@ func (sj *ScanJob) Init() error { return nil } +// GetStatus returns the status of the job +func (sj *ScanJob) GetStatus() (string, error) { + job, err := dao.GetScanJob(sj.id) + if err != nil { + return "", err + } + if job == nil { + return "", fmt.Errorf("scan job %d not found", sj.id) + } + return job.Status, nil +} + //NewScanJob creates a instance of ScanJob by id. func NewScanJob(id int64) *ScanJob { return &ScanJob{id: id} diff --git a/src/jobservice/job/scheduler.go b/src/jobservice/job/scheduler.go index 38068f302..4d38435c4 100644 --- a/src/jobservice/job/scheduler.go +++ b/src/jobservice/job/scheduler.go @@ -15,8 +15,9 @@ package job import ( - "github.com/vmware/harbor/src/common/utils/log" "time" + + "github.com/vmware/harbor/src/common/utils/log" ) var jobQueue = make(chan Job) diff --git a/src/jobservice/job/workerpool.go b/src/jobservice/job/workerpool.go index 910c55f4d..e0ac8226b 100644 --- a/src/jobservice/job/workerpool.go +++ b/src/jobservice/job/workerpool.go @@ -16,6 +16,7 @@ package job import ( "fmt" + "strings" "sync" "github.com/vmware/harbor/src/common/models" @@ -155,9 +156,29 @@ func Dispatch() { select { case job := <-jobQueue: go func(job Job) { - log.Debugf("Trying to dispatch job: %v", job) + jobID := job.ID() + jobType := strings.ToLower(job.Type().String()) + log.Debugf("trying to dispatch %s job %d ...", jobType, jobID) worker := <-WorkerPools[job.Type()].workerChan + + status, err := job.GetStatus() + if err != nil { + // put the work back to the worker pool + worker.queue <- worker + log.Errorf("failed to get status of %s job %d: %v", jobType, jobID, err) + return + } + + // check the status of job before dispatching + if status == models.JobStopped { + // put the work back to the worker pool + worker.queue <- worker + log.Debugf("%s job %d is stopped, skip dispatching", jobType, jobID) + return + } + worker.Jobs <- job + log.Debugf("%s job %d dispatched successfully", jobType, jobID) }(job) } }