From 48b067f59681d849c5a67129a29836f60132a5f6 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 15 Aug 2019 09:50:53 +0800 Subject: [PATCH] Handle the retention task status updating in concurrency Compare the status code when updating retention task status to avoid the concurrent issue Signed-off-by: Wenkai Yin --- .../postgresql/0010_1.9.0_schema.up.sql | 1 + .../service/notifications/jobs/handler.go | 71 +++++++++++-------- src/pkg/retention/dao/models/retention.go | 1 + src/pkg/retention/dao/retention.go | 15 +++- src/pkg/retention/dao/retention_test.go | 10 ++- src/pkg/retention/launcher_test.go | 3 + src/pkg/retention/manager.go | 22 +++++- src/pkg/retention/manager_test.go | 28 +++++--- src/pkg/retention/models.go | 7 +- 9 files changed, 107 insertions(+), 51 deletions(-) diff --git a/make/migrations/postgresql/0010_1.9.0_schema.up.sql b/make/migrations/postgresql/0010_1.9.0_schema.up.sql index 7fd3a4241..3a2cc8e7e 100644 --- a/make/migrations/postgresql/0010_1.9.0_schema.up.sql +++ b/make/migrations/postgresql/0010_1.9.0_schema.up.sql @@ -131,6 +131,7 @@ create table retention_task repository varchar(255), job_id varchar(64), status varchar(32), + status_code integer, start_time timestamp default CURRENT_TIMESTAMP, end_time timestamp default CURRENT_TIMESTAMP, total integer, diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index cf6706b7c..47377f9cc 100755 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -24,6 +24,7 @@ import ( "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" + jjob "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/notification" "github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/replication" @@ -109,42 +110,52 @@ func (h *Handler) HandleReplicationTask() { // HandleRetentionTask handles the webhook of retention task func (h *Handler) HandleRetentionTask() { - log.Debugf("received retention task status update event: task-%d, status-%s", h.id, h.status) + taskID := h.id + status := h.rawStatus + log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status) mgr := &retention.DefaultManager{} - props := []string{"Status"} - task := &retention.Task{ - ID: h.id, - Status: h.status, - } - if h.status == models.JobFinished || h.status == models.JobError || - h.status == models.JobStopped { - task.EndTime = time.Now() - props = append(props, "EndTime") - } else if h.status == models.JobRunning { - if h.checkIn != "" { - var retainObj struct { - Total int `json:"total"` - Retained int `json:"retained"` - } - if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil { - log.Errorf("failed to resolve checkin of retention task %d: %v", h.id, err) - } else { - if retainObj.Total > 0 { - task.Total = retainObj.Total - props = append(props, "Total") - } - if retainObj.Retained > 0 { - task.Retained = retainObj.Retained - props = append(props, "Retained") - } - } + // handle checkin + if h.checkIn != "" { + var retainObj struct { + Total int `json:"total"` + Retained int `json:"retained"` } + if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil { + log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err) + return + } + task := &retention.Task{ + ID: taskID, + Total: retainObj.Total, + Retained: retainObj.Retained, + } + if err := mgr.UpdateTask(task, "Total", "Retained"); err != nil { + log.Errorf("failed to update of retention task %d: %v", taskID, err) + h.SendInternalServerError(err) + return + } + return } - if err := mgr.UpdateTask(task, props...); err != nil { - log.Errorf("failed to update the status of retention task %d: %v", h.id, err) + + // handle status updating + if err := mgr.UpdateTaskStatus(taskID, status); err != nil { + log.Errorf("failed to update the status of retention task %d: %v", taskID, err) h.SendInternalServerError(err) return } + // if the status is the final status, update the end time + if status == jjob.StoppedStatus.String() || status == jjob.SuccessStatus.String() || + status == jjob.ErrorStatus.String() { + task := &retention.Task{ + ID: taskID, + EndTime: time.Now(), + } + if err := mgr.UpdateTask(task, "EndTime"); err != nil { + log.Errorf("failed to update of retention task %d: %v", taskID, err) + h.SendInternalServerError(err) + return + } + } } // HandleNotificationJob handles the hook of notification job diff --git a/src/pkg/retention/dao/models/retention.go b/src/pkg/retention/dao/models/retention.go index f1c5cce7b..8e7d94590 100644 --- a/src/pkg/retention/dao/models/retention.go +++ b/src/pkg/retention/dao/models/retention.go @@ -54,6 +54,7 @@ type RetentionTask struct { Repository string `orm:"column(repository)"` JobID string `orm:"column(job_id)"` Status string `orm:"column(status)"` + StatusCode int `orm:"column(status_code)"` StartTime time.Time `orm:"column(start_time)"` EndTime time.Time `orm:"column(end_time)"` Total int `orm:"column(total)"` diff --git a/src/pkg/retention/dao/retention.go b/src/pkg/retention/dao/retention.go index e10cf12e4..2c810923d 100644 --- a/src/pkg/retention/dao/retention.go +++ b/src/pkg/retention/dao/retention.go @@ -3,12 +3,13 @@ package dao import ( "errors" "fmt" + "strconv" + "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/common/dao" jobmodels "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/q" - "strconv" ) // CreatePolicy Create Policy @@ -228,6 +229,18 @@ func UpdateTask(task *models.RetentionTask, cols ...string) error { return err } +// UpdateTaskStatus updates the status of task whose status code is less than the statusCode provided +func UpdateTaskStatus(taskID int64, status string, statusCode int) error { + _, err := dao.GetOrmer().QueryTable(&models.RetentionTask{}). + Filter("ID", taskID). + Filter("StatusCode__lt", statusCode). + Update(orm.Params{ + "Status": status, + "StatusCode": statusCode, + }) + return err +} + // DeleteTask deletes the task record specified by ID in database func DeleteTask(id int64) error { _, err := dao.GetOrmer().Delete(&models.RetentionTask{ diff --git a/src/pkg/retention/dao/retention_test.go b/src/pkg/retention/dao/retention_test.go index df7c757c7..597b86c40 100644 --- a/src/pkg/retention/dao/retention_test.go +++ b/src/pkg/retention/dao/retention_test.go @@ -194,8 +194,12 @@ func TestTask(t *testing.T) { // update task.ID = id - task.Status = "running" - err = UpdateTask(task, "Status") + task.Total = 1 + err = UpdateTask(task, "Total") + require.Nil(t, err) + + // update status + err = UpdateTaskStatus(id, "running", 1) require.Nil(t, err) // list @@ -205,8 +209,10 @@ func TestTask(t *testing.T) { }) require.Nil(t, err) require.Equal(t, 1, len(tasks)) + assert.Equal(t, 1, tasks[0].Total) assert.Equal(t, int64(1), tasks[0].ExecutionID) assert.Equal(t, "running", tasks[0].Status) + assert.Equal(t, 1, tasks[0].StatusCode) // delete err = DeleteTask(id) diff --git a/src/pkg/retention/launcher_test.go b/src/pkg/retention/launcher_test.go index fa94087ca..27ce757b6 100644 --- a/src/pkg/retention/launcher_test.go +++ b/src/pkg/retention/launcher_test.go @@ -126,6 +126,9 @@ func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) { func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error { return nil } +func (f *fakeRetentionManager) UpdateTaskStatus(int64, string) error { + return nil +} func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) { return nil, nil } diff --git a/src/pkg/retention/manager.go b/src/pkg/retention/manager.go index ccb2f6339..c93f92256 100644 --- a/src/pkg/retention/manager.go +++ b/src/pkg/retention/manager.go @@ -21,7 +21,8 @@ import ( "time" "github.com/astaxie/beego/orm" - "github.com/goharbor/harbor/src/common/job" + cjob "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/retention/dao" "github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/policy" @@ -58,6 +59,12 @@ type Manager interface { CreateTask(task *Task) (int64, error) // Update the specified task UpdateTask(task *Task, cols ...string) error + // Update the status of the specified task + // The status is updated only when it is behind the one stored + // in the database. + // e.g. if the status is running but the status stored + // in database is failed, the updating doesn't take effect + UpdateTaskStatus(taskID int64, status string) error // Get the task specified by the task ID GetTask(taskID int64) (*Task, error) // Get the log of the specified task @@ -217,6 +224,7 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) { Repository: t.Repository, JobID: t.JobID, Status: t.Status, + StatusCode: t.StatusCode, StartTime: t.StartTime, EndTime: t.EndTime, Total: t.Total, @@ -252,6 +260,15 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error { }, cols...) } +// UpdateTaskStatus updates the status of the specified task +func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string) error { + if taskID <= 0 { + return fmt.Errorf("invalid task ID: %d", taskID) + } + st := job.Status(status) + return dao.UpdateTaskStatus(taskID, status, st.Code()) +} + // GetTask returns the task specified by task ID func (d *DefaultManager) GetTask(taskID int64) (*Task, error) { if taskID <= 0 { @@ -267,6 +284,7 @@ func (d *DefaultManager) GetTask(taskID int64) (*Task, error) { Repository: task.Repository, JobID: task.JobID, Status: task.Status, + StatusCode: task.StatusCode, StartTime: task.StartTime, EndTime: task.EndTime, Total: task.Total, @@ -283,7 +301,7 @@ func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) { if task == nil { return nil, fmt.Errorf("task %d not found", taskID) } - return job.GlobalClient.GetJobLog(task.JobID) + return cjob.GlobalClient.GetJobLog(task.JobID) } // NewManager ... diff --git a/src/pkg/retention/manager_test.go b/src/pkg/retention/manager_test.go index b8310af8c..cbcbf5f10 100644 --- a/src/pkg/retention/manager_test.go +++ b/src/pkg/retention/manager_test.go @@ -6,8 +6,8 @@ import ( "time" "github.com/goharbor/harbor/src/common/dao" - "github.com/goharbor/harbor/src/common/job" + jjob "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/rule" "github.com/goharbor/harbor/src/pkg/retention/q" @@ -171,7 +171,9 @@ func TestTask(t *testing.T) { task := &Task{ ExecutionID: 1, JobID: "1", - Status: TaskStatusPending, + Status: jjob.PendingStatus.String(), + StatusCode: jjob.PendingStatus.Code(), + Total: 0, StartTime: time.Now(), } // create @@ -185,23 +187,29 @@ func TestTask(t *testing.T) { // update task.ID = id - task.Status = TaskStatusInProgress - err = m.UpdateTask(task, "Status") + task.Total = 1 + err = m.UpdateTask(task, "Total") + require.Nil(t, err) + + // update status to success which is a final status + err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String()) + require.Nil(t, err) + + // try to update status to running, as the status has already + // been updated to a final status, this updating shouldn't take effect + err = m.UpdateTaskStatus(id, jjob.RunningStatus.String()) require.Nil(t, err) // list tasks, err := m.ListTasks(&q.TaskQuery{ ExecutionID: 1, - Status: TaskStatusInProgress, }) require.Nil(t, err) require.Equal(t, 1, len(tasks)) assert.Equal(t, int64(1), tasks[0].ExecutionID) - assert.Equal(t, TaskStatusInProgress, tasks[0].Status) - - task.Status = TaskStatusFailed - err = m.UpdateTask(task, "Status") - require.Nil(t, err) + assert.Equal(t, 1, tasks[0].Total) + assert.Equal(t, jjob.SuccessStatus.String(), tasks[0].Status) + assert.Equal(t, jjob.SuccessStatus.Code(), tasks[0].StatusCode) // get task log job.GlobalClient = &tjob.MockJobClient{ diff --git a/src/pkg/retention/models.go b/src/pkg/retention/models.go index 6b2daab9a..1e4219937 100644 --- a/src/pkg/retention/models.go +++ b/src/pkg/retention/models.go @@ -23,12 +23,6 @@ const ( ExecutionStatusFailed string = "Failed" ExecutionStatusStopped string = "Stopped" - TaskStatusPending string = "Pending" - TaskStatusInProgress string = "InProgress" - TaskStatusSucceed string = "Succeed" - TaskStatusFailed string = "Failed" - TaskStatusStopped string = "Stopped" - CandidateKindImage string = "image" CandidateKindChart string = "chart" @@ -54,6 +48,7 @@ type Task struct { Repository string `json:"repository"` JobID string `json:"job_id"` Status string `json:"status"` + StatusCode int `json:"status_code"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` Total int `json:"total"`