From 661470e7bce9a9311cdd019fbe7fc966e416dd88 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 22 Aug 2019 18:31:03 +0800 Subject: [PATCH] Add status revision to retention task to handle retrying Add status revision to retention task to handle retrying Signed-off-by: Wenkai Yin --- .../postgresql/0010_1.9.0_schema.up.sql | 1 + .../service/notifications/jobs/handler.go | 22 ++--- src/pkg/retention/dao/models/retention.go | 21 +++-- src/pkg/retention/dao/retention.go | 57 ++++++++---- src/pkg/retention/dao/retention_test.go | 3 +- src/pkg/retention/launcher_test.go | 2 +- src/pkg/retention/manager.go | 92 ++++++++++--------- src/pkg/retention/manager_test.go | 29 +++--- src/pkg/retention/models.go | 21 +++-- 9 files changed, 135 insertions(+), 113 deletions(-) diff --git a/make/migrations/postgresql/0010_1.9.0_schema.up.sql b/make/migrations/postgresql/0010_1.9.0_schema.up.sql index 261b6d9a0..80725fbe4 100644 --- a/make/migrations/postgresql/0010_1.9.0_schema.up.sql +++ b/make/migrations/postgresql/0010_1.9.0_schema.up.sql @@ -133,6 +133,7 @@ create table retention_task job_id varchar(64), status varchar(32), status_code integer, + status_revision integer, start_time timestamp default CURRENT_TIMESTAMP, end_time timestamp default CURRENT_TIMESTAMP, total integer, diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index 47377f9cc..bfd79c889 100755 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -20,7 +20,6 @@ import ( "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/job" - jobmodels "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" @@ -48,6 +47,7 @@ type Handler struct { status string rawStatus string checkIn string + revision int64 } // Prepare ... @@ -60,7 +60,7 @@ func (h *Handler) Prepare() { return } h.id = id - var data jobmodels.JobStatusChange + var data jjob.StatusChange err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data) if err != nil { log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err) @@ -76,6 +76,9 @@ func (h *Handler) Prepare() { } h.status = status h.checkIn = data.CheckIn + if data.Metadata != nil { + h.revision = data.Metadata.Revision + } } // HandleScan handles the webhook of scan job @@ -138,24 +141,11 @@ func (h *Handler) HandleRetentionTask() { } // handle status updating - if err := mgr.UpdateTaskStatus(taskID, status); err != nil { + if err := mgr.UpdateTaskStatus(taskID, status, h.revision); err != nil { log.Errorf("failed to update the status of retention task %d: %v", taskID, err) h.SendInternalServerError(err) return } - // if the status is the final status, update the end time - if status == jjob.StoppedStatus.String() || status == jjob.SuccessStatus.String() || - status == jjob.ErrorStatus.String() { - task := &retention.Task{ - ID: taskID, - EndTime: time.Now(), - } - if err := mgr.UpdateTask(task, "EndTime"); err != nil { - log.Errorf("failed to update of retention task %d: %v", taskID, err) - h.SendInternalServerError(err) - return - } - } } // HandleNotificationJob handles the hook of notification job diff --git a/src/pkg/retention/dao/models/retention.go b/src/pkg/retention/dao/models/retention.go index 8e7d94590..b101a87dc 100644 --- a/src/pkg/retention/dao/models/retention.go +++ b/src/pkg/retention/dao/models/retention.go @@ -49,14 +49,15 @@ type RetentionExecution struct { // RetentionTask ... type RetentionTask struct { - ID int64 `orm:"pk;auto;column(id)"` - ExecutionID int64 `orm:"column(execution_id)"` - Repository string `orm:"column(repository)"` - JobID string `orm:"column(job_id)"` - Status string `orm:"column(status)"` - StatusCode int `orm:"column(status_code)"` - StartTime time.Time `orm:"column(start_time)"` - EndTime time.Time `orm:"column(end_time)"` - Total int `orm:"column(total)"` - Retained int `orm:"column(retained)"` + ID int64 `orm:"pk;auto;column(id)"` + ExecutionID int64 `orm:"column(execution_id)"` + Repository string `orm:"column(repository)"` + JobID string `orm:"column(job_id)"` + Status string `orm:"column(status)"` + StatusCode int `orm:"column(status_code)"` // For order the different statuses + StatusRevision int64 `orm:"column(status_revision)"` // For differentiating the each retry of the same job + StartTime time.Time `orm:"column(start_time)"` + EndTime time.Time `orm:"column(end_time)"` + Total int `orm:"column(total)"` + Retained int `orm:"column(retained)"` } diff --git a/src/pkg/retention/dao/retention.go b/src/pkg/retention/dao/retention.go index 9956c0c75..2a4e5970d 100644 --- a/src/pkg/retention/dao/retention.go +++ b/src/pkg/retention/dao/retention.go @@ -4,10 +4,11 @@ import ( "errors" "fmt" "strconv" + "time" "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/common/dao" - jobmodels "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/q" ) @@ -115,21 +116,17 @@ func fillStatus(exec *models.RetentionExecution) error { } total += v switch k { - case jobmodels.JobScheduled: + case job.ScheduledStatus.String(): running += v - case jobmodels.JobPending: + case job.PendingStatus.String(): running += v - case jobmodels.JobRunning: + case job.RunningStatus.String(): running += v - case jobmodels.JobRetrying: - running += v - case jobmodels.JobFinished: + case job.SuccessStatus.String(): succeed += v - case jobmodels.JobCanceled: + case job.StoppedStatus.String(): stopped += v - case jobmodels.JobStopped: - stopped += v - case jobmodels.JobError: + case job.ErrorStatus.String(): failed += v } } @@ -232,15 +229,26 @@ func UpdateTask(task *models.RetentionTask, cols ...string) error { return err } -// UpdateTaskStatus updates the status of task whose status code is less than the statusCode provided -func UpdateTaskStatus(taskID int64, status string, statusCode int) error { - _, err := dao.GetOrmer().QueryTable(&models.RetentionTask{}). - Filter("ID", taskID). - Filter("StatusCode__lt", statusCode). - Update(orm.Params{ - "Status": status, - "StatusCode": statusCode, - }) +// UpdateTaskStatus updates the status of task according to the status code and revision to avoid +// override when running in concurrency +func UpdateTaskStatus(taskID int64, status string, statusCode int, statusRevision int64) error { + params := []interface{}{} + // use raw sql rather than the ORM as the sql generated by ORM isn't a "single" statement + // which means the operation isn't atomic + sql := `update retention_task set status = ?, status_code = ?, status_revision = ?, end_time = ? ` + params = append(params, status, statusCode, statusRevision) + var t time.Time + // when the task is in final status, update the endtime + // when the task re-runs again, the endtime should be cleared + // so set the endtime to null if the task isn't in final status + if IsFinalStatus(status) { + t = time.Now() + } + params = append(params, t) + sql += `where id = ? and + (status_revision = ? and status_code < ? or status_revision < ?) ` + params = append(params, taskID, statusRevision, statusCode, statusRevision) + _, err := dao.GetOrmer().Raw(sql, params).Exec() return err } @@ -292,3 +300,12 @@ func GetTotalOfTasks(executionID int64) (int64, error) { qs = qs.Filter("ExecutionID", executionID) return qs.Count() } + +// IsFinalStatus checks whether the status is a final status +func IsFinalStatus(status string) bool { + if status == job.StoppedStatus.String() || status == job.SuccessStatus.String() || + status == job.ErrorStatus.String() { + return true + } + return false +} diff --git a/src/pkg/retention/dao/retention_test.go b/src/pkg/retention/dao/retention_test.go index 597b86c40..b3fcdc85b 100644 --- a/src/pkg/retention/dao/retention_test.go +++ b/src/pkg/retention/dao/retention_test.go @@ -199,7 +199,7 @@ func TestTask(t *testing.T) { require.Nil(t, err) // update status - err = UpdateTaskStatus(id, "running", 1) + err = UpdateTaskStatus(id, "running", 1, 1) require.Nil(t, err) // list @@ -213,6 +213,7 @@ func TestTask(t *testing.T) { assert.Equal(t, int64(1), tasks[0].ExecutionID) assert.Equal(t, "running", tasks[0].Status) assert.Equal(t, 1, tasks[0].StatusCode) + assert.Equal(t, int64(1), tasks[0].StatusRevision) // delete err = DeleteTask(id) diff --git a/src/pkg/retention/launcher_test.go b/src/pkg/retention/launcher_test.go index 27ce757b6..c63b7bf28 100644 --- a/src/pkg/retention/launcher_test.go +++ b/src/pkg/retention/launcher_test.go @@ -126,7 +126,7 @@ func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) { func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error { return nil } -func (f *fakeRetentionManager) UpdateTaskStatus(int64, string) error { +func (f *fakeRetentionManager) UpdateTaskStatus(int64, string, int64) error { return nil } func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) { diff --git a/src/pkg/retention/manager.go b/src/pkg/retention/manager.go index c93f92256..4d3121f94 100644 --- a/src/pkg/retention/manager.go +++ b/src/pkg/retention/manager.go @@ -60,11 +60,9 @@ type Manager interface { // Update the specified task UpdateTask(task *Task, cols ...string) error // Update the status of the specified task - // The status is updated only when it is behind the one stored - // in the database. - // e.g. if the status is running but the status stored - // in database is failed, the updating doesn't take effect - UpdateTaskStatus(taskID int64, status string) error + // The status is updated only when (the statusRevision > the current revision) + // or (the the statusRevision = the current revision and status > the current status) + UpdateTaskStatus(taskID int64, status string, statusRevision int64) error // Get the task specified by the task ID GetTask(taskID int64) (*Task, error) // Get the log of the specified task @@ -195,14 +193,16 @@ func (d *DefaultManager) CreateTask(task *Task) (int64, error) { return 0, errors.New("nil task") } t := &models.RetentionTask{ - ExecutionID: task.ExecutionID, - Repository: task.Repository, - JobID: task.JobID, - Status: task.Status, - StartTime: task.StartTime, - EndTime: task.EndTime, - Total: task.Total, - Retained: task.Retained, + ExecutionID: task.ExecutionID, + Repository: task.Repository, + JobID: task.JobID, + Status: task.Status, + StatusCode: task.StatusCode, + StatusRevision: task.StatusRevision, + StartTime: task.StartTime, + EndTime: task.EndTime, + Total: task.Total, + Retained: task.Retained, } return dao.CreateTask(t) } @@ -219,16 +219,17 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) { tasks := make([]*Task, 0) for _, t := range ts { tasks = append(tasks, &Task{ - ID: t.ID, - ExecutionID: t.ExecutionID, - Repository: t.Repository, - JobID: t.JobID, - Status: t.Status, - StatusCode: t.StatusCode, - StartTime: t.StartTime, - EndTime: t.EndTime, - Total: t.Total, - Retained: t.Retained, + ID: t.ID, + ExecutionID: t.ExecutionID, + Repository: t.Repository, + JobID: t.JobID, + Status: t.Status, + StatusCode: t.StatusCode, + StatusRevision: t.StatusRevision, + StartTime: t.StartTime, + EndTime: t.EndTime, + Total: t.Total, + Retained: t.Retained, }) } return tasks, nil @@ -248,25 +249,27 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error { return fmt.Errorf("invalid task ID: %d", task.ID) } return dao.UpdateTask(&models.RetentionTask{ - ID: task.ID, - ExecutionID: task.ExecutionID, - Repository: task.Repository, - JobID: task.JobID, - Status: task.Status, - StartTime: task.StartTime, - EndTime: task.EndTime, - Total: task.Total, - Retained: task.Retained, + ID: task.ID, + ExecutionID: task.ExecutionID, + Repository: task.Repository, + JobID: task.JobID, + Status: task.Status, + StatusCode: task.StatusCode, + StatusRevision: task.StatusRevision, + StartTime: task.StartTime, + EndTime: task.EndTime, + Total: task.Total, + Retained: task.Retained, }, cols...) } // UpdateTaskStatus updates the status of the specified task -func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string) error { +func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64) error { if taskID <= 0 { return fmt.Errorf("invalid task ID: %d", taskID) } st := job.Status(status) - return dao.UpdateTaskStatus(taskID, status, st.Code()) + return dao.UpdateTaskStatus(taskID, status, st.Code(), statusRevision) } // GetTask returns the task specified by task ID @@ -279,16 +282,17 @@ func (d *DefaultManager) GetTask(taskID int64) (*Task, error) { return nil, err } return &Task{ - ID: task.ID, - ExecutionID: task.ExecutionID, - Repository: task.Repository, - JobID: task.JobID, - Status: task.Status, - StatusCode: task.StatusCode, - StartTime: task.StartTime, - EndTime: task.EndTime, - Total: task.Total, - Retained: task.Retained, + ID: task.ID, + ExecutionID: task.ExecutionID, + Repository: task.Repository, + JobID: task.JobID, + Status: task.Status, + StatusCode: task.StatusCode, + StatusRevision: task.StatusRevision, + StartTime: task.StartTime, + EndTime: task.EndTime, + Total: task.Total, + Retained: task.Retained, }, nil } diff --git a/src/pkg/retention/manager_test.go b/src/pkg/retention/manager_test.go index fce13f950..83e1ab0ef 100644 --- a/src/pkg/retention/manager_test.go +++ b/src/pkg/retention/manager_test.go @@ -171,12 +171,13 @@ func TestTask(t *testing.T) { err := m.DeleteExecution(1000) require.Nil(t, err) task := &Task{ - ExecutionID: 1000, - JobID: "1", - Status: jjob.PendingStatus.String(), - StatusCode: jjob.PendingStatus.Code(), - Total: 0, - StartTime: time.Now(), + ExecutionID: 1000, + JobID: "1", + Status: jjob.PendingStatus.String(), + StatusCode: jjob.PendingStatus.Code(), + StatusRevision: 1, + Total: 0, + StartTime: time.Now(), } // create id, err := m.CreateTask(task) @@ -194,12 +195,17 @@ func TestTask(t *testing.T) { require.Nil(t, err) // update status to success which is a final status - err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String()) + err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String(), 1) require.Nil(t, err) // try to update status to running, as the status has already - // been updated to a final status, this updating shouldn't take effect - err = m.UpdateTaskStatus(id, jjob.RunningStatus.String()) + // been updated to a final status and the stautus revision doesn't change, + // this updating shouldn't take effect + err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 1) + require.Nil(t, err) + + // update the revision and try to update status to running again + err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 2) require.Nil(t, err) // list @@ -210,8 +216,9 @@ func TestTask(t *testing.T) { require.Equal(t, 1, len(tasks)) assert.Equal(t, int64(1000), tasks[0].ExecutionID) assert.Equal(t, 1, tasks[0].Total) - assert.Equal(t, jjob.SuccessStatus.String(), tasks[0].Status) - assert.Equal(t, jjob.SuccessStatus.Code(), tasks[0].StatusCode) + assert.Equal(t, jjob.RunningStatus.String(), tasks[0].Status) + assert.Equal(t, jjob.RunningStatus.Code(), tasks[0].StatusCode) + assert.Equal(t, int64(2), tasks[0].StatusRevision) // get task log job.GlobalClient = &tjob.MockJobClient{ diff --git a/src/pkg/retention/models.go b/src/pkg/retention/models.go index 1e4219937..920b98e7d 100644 --- a/src/pkg/retention/models.go +++ b/src/pkg/retention/models.go @@ -43,16 +43,17 @@ type Execution struct { // Task of retention type Task struct { - ID int64 `json:"id"` - ExecutionID int64 `json:"execution_id"` - Repository string `json:"repository"` - JobID string `json:"job_id"` - Status string `json:"status"` - StatusCode int `json:"status_code"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - Total int `json:"total"` - Retained int `json:"retained"` + ID int64 `json:"id"` + ExecutionID int64 `json:"execution_id"` + Repository string `json:"repository"` + JobID string `json:"job_id"` + Status string `json:"status"` + StatusCode int `json:"status_code"` + StatusRevision int64 `json:"status_revision"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Total int `json:"total"` + Retained int `json:"retained"` } // History of retention