From 2ffcf10eaa9ff2d601f923fa2ff96978dcc07b52 Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Wed, 16 Aug 2017 16:02:36 +0800 Subject: [PATCH] restart scan jobs when jobservice is started --- src/common/dao/dao_test.go | 17 +++++++++++++++++ src/common/dao/replication_job.go | 7 ++++++- src/common/dao/scan_job.go | 11 +++++++++++ src/jobservice/main.go | 19 ++++++++++++++----- 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/src/common/dao/dao_test.go b/src/common/dao/dao_test.go index 8d8c8375a..f37c91ea2 100644 --- a/src/common/dao/dao_test.go +++ b/src/common/dao/dao_test.go @@ -1774,3 +1774,20 @@ func TestListScanOverviews(t *testing.T) { err = ClearTable(models.ScanOverviewTable) assert.Nil(err) } + +func TestGetScanJobsByStatus(t *testing.T) { + assert := assert.New(t) + err := ClearTable(models.ScanOverviewTable) + assert.Nil(err) + id, err := AddScanJob(sj1) + assert.Nil(err) + err = UpdateScanJobStatus(id, models.JobRunning) + assert.Nil(err) + r1, err := GetScanJobsByStatus(models.JobPending, models.JobCanceled) + assert.Nil(err) + assert.Equal(0, len(r1)) + r2, err := GetScanJobsByStatus(models.JobPending, models.JobRunning) + assert.Nil(err) + assert.Equal(1, len(r2)) + assert.Equal(sj1.Repository, r2[0].Repository) +} diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index e5b061eff..42133f857 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -404,11 +404,16 @@ func UpdateRepJobStatus(id int64, status string) error { return err } -// ResetRunningJobs update all running jobs status to pending +// ResetRunningJobs update all running jobs status to pending, including replication jobs and scan jobs. func ResetRunningJobs() error { o := GetOrmer() sql := fmt.Sprintf("update replication_job set status = '%s', update_time = ? where status = '%s'", models.JobPending, models.JobRunning) _, err := o.Raw(sql, time.Now()).Exec() + if err != nil { + return err + } + sql = fmt.Sprintf("update %s set status = '%s', update_time = ? where status = '%s'", models.ScanJobTable, models.JobPending, models.JobRunning) + _, err = o.Raw(sql, time.Now()).Exec() return err } diff --git a/src/common/dao/scan_job.go b/src/common/dao/scan_job.go index d3bf1c982..2fd666fe1 100644 --- a/src/common/dao/scan_job.go +++ b/src/common/dao/scan_job.go @@ -57,6 +57,17 @@ func GetScanJobsByDigest(digest string, limit ...int) ([]*models.ScanJob, error) return res, err } +// GetScanJobsByStatus return a list of scan jobs with any of the given statuses in param +func GetScanJobsByStatus(status ...string) ([]*models.ScanJob, error) { + var res []*models.ScanJob + var t []interface{} + for _, s := range status { + t = append(t, interface{}(s)) + } + _, err := scanJobQs().Filter("status__in", t...).All(&res) + return res, err +} + // UpdateScanJobStatus updates the status of a scan job. func UpdateScanJobStatus(id int64, status string) error { o := GetOrmer() diff --git a/src/jobservice/main.go b/src/jobservice/main.go index e6e9c1546..6ddcaaa1c 100644 --- a/src/jobservice/main.go +++ b/src/jobservice/main.go @@ -51,21 +51,30 @@ func main() { } func resumeJobs() { - //TODO: may need to resume scan jobs also? log.Debugf("Trying to resume halted jobs...") err := dao.ResetRunningJobs() if err != nil { log.Warningf("Failed to reset all running jobs to pending, error: %v", err) } - jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying) + rjobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying, models.JobRunning) if err == nil { - for _, j := range jobs { + for _, j := range rjobs { rj := job.NewRepJob(j.ID) - log.Debugf("Resuming job: %v", rj) + log.Debugf("Resuming replication job: %v", rj) job.Schedule(rj) } } else { - log.Warningf("Failed to jobs to resume, error: %v", err) + log.Warningf("Failed to resume replication jobs, error: %v", err) + } + sjobs, err := dao.GetScanJobsByStatus(models.JobPending, models.JobRetrying, models.JobRunning) + if err == nil { + for _, j := range sjobs { + sj := job.NewScanJob(j.ID) + log.Debugf("Resuming scan job: %v", sj) + job.Schedule(sj) + } + } else { + log.Warningf("Failed to resume scan jobs, error: %v", err) } }