restart scan jobs when jobservice is started

This commit is contained in:
Tan Jiang 2017-08-16 16:02:36 +08:00
parent bb00c2c2cf
commit 2ffcf10eaa
4 changed files with 48 additions and 6 deletions

View File

@ -1774,3 +1774,20 @@ func TestListScanOverviews(t *testing.T) {
err = ClearTable(models.ScanOverviewTable) err = ClearTable(models.ScanOverviewTable)
assert.Nil(err) assert.Nil(err)
} }
func TestGetScanJobsByStatus(t *testing.T) {
assert := assert.New(t)
err := ClearTable(models.ScanOverviewTable)
assert.Nil(err)
id, err := AddScanJob(sj1)
assert.Nil(err)
err = UpdateScanJobStatus(id, models.JobRunning)
assert.Nil(err)
r1, err := GetScanJobsByStatus(models.JobPending, models.JobCanceled)
assert.Nil(err)
assert.Equal(0, len(r1))
r2, err := GetScanJobsByStatus(models.JobPending, models.JobRunning)
assert.Nil(err)
assert.Equal(1, len(r2))
assert.Equal(sj1.Repository, r2[0].Repository)
}

View File

@ -404,11 +404,16 @@ func UpdateRepJobStatus(id int64, status string) error {
return err return err
} }
// ResetRunningJobs update all running jobs status to pending // ResetRunningJobs update all running jobs status to pending, including replication jobs and scan jobs.
func ResetRunningJobs() error { func ResetRunningJobs() error {
o := GetOrmer() o := GetOrmer()
sql := fmt.Sprintf("update replication_job set status = '%s', update_time = ? where status = '%s'", models.JobPending, models.JobRunning) sql := fmt.Sprintf("update replication_job set status = '%s', update_time = ? where status = '%s'", models.JobPending, models.JobRunning)
_, err := o.Raw(sql, time.Now()).Exec() _, err := o.Raw(sql, time.Now()).Exec()
if err != nil {
return err
}
sql = fmt.Sprintf("update %s set status = '%s', update_time = ? where status = '%s'", models.ScanJobTable, models.JobPending, models.JobRunning)
_, err = o.Raw(sql, time.Now()).Exec()
return err return err
} }

View File

@ -57,6 +57,17 @@ func GetScanJobsByDigest(digest string, limit ...int) ([]*models.ScanJob, error)
return res, err return res, err
} }
// GetScanJobsByStatus return a list of scan jobs with any of the given statuses in param
func GetScanJobsByStatus(status ...string) ([]*models.ScanJob, error) {
var res []*models.ScanJob
var t []interface{}
for _, s := range status {
t = append(t, interface{}(s))
}
_, err := scanJobQs().Filter("status__in", t...).All(&res)
return res, err
}
// UpdateScanJobStatus updates the status of a scan job. // UpdateScanJobStatus updates the status of a scan job.
func UpdateScanJobStatus(id int64, status string) error { func UpdateScanJobStatus(id int64, status string) error {
o := GetOrmer() o := GetOrmer()

View File

@ -51,21 +51,30 @@ func main() {
} }
func resumeJobs() { func resumeJobs() {
//TODO: may need to resume scan jobs also?
log.Debugf("Trying to resume halted jobs...") log.Debugf("Trying to resume halted jobs...")
err := dao.ResetRunningJobs() err := dao.ResetRunningJobs()
if err != nil { if err != nil {
log.Warningf("Failed to reset all running jobs to pending, error: %v", err) log.Warningf("Failed to reset all running jobs to pending, error: %v", err)
} }
jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying) rjobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying, models.JobRunning)
if err == nil { if err == nil {
for _, j := range jobs { for _, j := range rjobs {
rj := job.NewRepJob(j.ID) rj := job.NewRepJob(j.ID)
log.Debugf("Resuming job: %v", rj) log.Debugf("Resuming replication job: %v", rj)
job.Schedule(rj) job.Schedule(rj)
} }
} else { } else {
log.Warningf("Failed to jobs to resume, error: %v", err) log.Warningf("Failed to resume replication jobs, error: %v", err)
}
sjobs, err := dao.GetScanJobsByStatus(models.JobPending, models.JobRetrying, models.JobRunning)
if err == nil {
for _, j := range sjobs {
sj := job.NewScanJob(j.ID)
log.Debugf("Resuming scan job: %v", sj)
job.Schedule(sj)
}
} else {
log.Warningf("Failed to resume scan jobs, error: %v", err)
} }
} }