From 8777c07d475d1b8393c601b23348e932d407a2d0 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 6 Aug 2019 06:42:58 +0800 Subject: [PATCH] Fix replication tasks stuck in "InProgress" issue Fix replication tasks stuck in "InProgress" issue Signed-off-by: Wenkai Yin --- src/common/dao/base.go | 2 + src/common/dao/pro_meta.go | 8 ++-- src/common/dao/project.go | 2 +- src/common/dao/quota.go | 2 +- src/common/dao/quota_usage.go | 2 +- src/common/dao/repository.go | 2 +- .../service/notifications/jobs/handler.go | 3 +- src/replication/dao/execution.go | 38 ++++++++++----- src/replication/operation/controller.go | 47 ++++++++++++++++--- src/replication/operation/controller_test.go | 36 ++++++++++---- .../operation/execution/execution.go | 6 +-- src/replication/operation/hook/task.go | 8 +++- 12 files changed, 114 insertions(+), 42 deletions(-) diff --git a/src/common/dao/base.go b/src/common/dao/base.go index 804a73208..6a31a11c4 100644 --- a/src/common/dao/base.go +++ b/src/common/dao/base.go @@ -139,6 +139,8 @@ var once sync.Once // GetOrmer :set ormer singleton func GetOrmer() orm.Ormer { once.Do(func() { + // override the default value(1000) to return all records when setting no limit + orm.DefaultRowsLimit = -1 globalOrm = orm.NewOrm() }) return globalOrm diff --git a/src/common/dao/pro_meta.go b/src/common/dao/pro_meta.go index d4a9c4e6f..a6593e2ef 100644 --- a/src/common/dao/pro_meta.go +++ b/src/common/dao/pro_meta.go @@ -44,7 +44,7 @@ func DeleteProjectMetadata(projectID int64, name ...string) error { params = append(params, projectID) if len(name) > 0 { - sql += fmt.Sprintf(` and name in ( %s )`, paramPlaceholder(len(name))) + sql += fmt.Sprintf(` and name in ( %s )`, ParamPlaceholderForIn(len(name))) params = append(params, name) } @@ -74,7 +74,7 @@ func GetProjectMetadata(projectID int64, name ...string) ([]*models.ProjectMetad params = append(params, projectID) if len(name) > 0 { - sql += fmt.Sprintf(` and name in ( %s )`, paramPlaceholder(len(name))) + 
sql += fmt.Sprintf(` and name in ( %s )`, ParamPlaceholderForIn(len(name))) params = append(params, name) } @@ -82,7 +82,9 @@ func GetProjectMetadata(projectID int64, name ...string) ([]*models.ProjectMetad return proMetas, err } -func paramPlaceholder(n int) string { +// ParamPlaceholderForIn returns a string that contains placeholders for sql keyword "in" +// e.g. n=3, returns "?,?,?" +func ParamPlaceholderForIn(n int) string { placeholders := []string{} for i := 0; i < n; i++ { placeholders = append(placeholders, "?") diff --git a/src/common/dao/project.go b/src/common/dao/project.go index b3066bcf1..483da9ba5 100644 --- a/src/common/dao/project.go +++ b/src/common/dao/project.go @@ -259,7 +259,7 @@ func projectQueryConditions(query *models.ProjectQueryParam) (string, []interfac } if len(query.ProjectIDs) > 0 { sql += fmt.Sprintf(` and p.project_id in ( %s )`, - paramPlaceholder(len(query.ProjectIDs))) + ParamPlaceholderForIn(len(query.ProjectIDs))) params = append(params, query.ProjectIDs) } return sql, params diff --git a/src/common/dao/quota.go b/src/common/dao/quota.go index 6cf130d3d..c86c53797 100644 --- a/src/common/dao/quota.go +++ b/src/common/dao/quota.go @@ -193,7 +193,7 @@ func quotaQueryConditions(query ...*models.QuotaQuery) (string, []interface{}) { } if len(q.ReferenceIDs) != 0 { - sql += fmt.Sprintf(`AND a.reference_id IN (%s) `, paramPlaceholder(len(q.ReferenceIDs))) + sql += fmt.Sprintf(`AND a.reference_id IN (%s) `, ParamPlaceholderForIn(len(q.ReferenceIDs))) params = append(params, q.ReferenceIDs) } diff --git a/src/common/dao/quota_usage.go b/src/common/dao/quota_usage.go index 8e2f7ca48..d8b55db9b 100644 --- a/src/common/dao/quota_usage.go +++ b/src/common/dao/quota_usage.go @@ -111,7 +111,7 @@ func quotaUsageQueryConditions(query ...*models.QuotaUsageQuery) (string, []inte params = append(params, q.ReferenceID) } if len(q.ReferenceIDs) != 0 { - sql += fmt.Sprintf(`and reference_id in (%s) `, paramPlaceholder(len(q.ReferenceIDs))) + sql += 
fmt.Sprintf(`and reference_id in (%s) `, ParamPlaceholderForIn(len(q.ReferenceIDs))) params = append(params, q.ReferenceIDs) } diff --git a/src/common/dao/repository.go b/src/common/dao/repository.go index c05a46899..abb859525 100644 --- a/src/common/dao/repository.go +++ b/src/common/dao/repository.go @@ -178,7 +178,7 @@ func repositoryQueryConditions(query ...*models.RepositoryQuery) (string, []inte if len(q.ProjectIDs) > 0 { sql += fmt.Sprintf(`and r.project_id in ( %s ) `, - paramPlaceholder(len(q.ProjectIDs))) + ParamPlaceholderForIn(len(q.ProjectIDs))) params = append(params, q.ProjectIDs) } diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index d35147f9d..00107b1d7 100644 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -32,12 +32,11 @@ import ( var statusMap = map[string]string{ job.JobServiceStatusPending: models.JobPending, + job.JobServiceStatusScheduled: models.JobScheduled, job.JobServiceStatusRunning: models.JobRunning, job.JobServiceStatusStopped: models.JobStopped, - job.JobServiceStatusCancelled: models.JobCanceled, job.JobServiceStatusError: models.JobError, job.JobServiceStatusSuccess: models.JobFinished, - job.JobServiceStatusScheduled: models.JobScheduled, } // Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice. diff --git a/src/replication/dao/execution.go b/src/replication/dao/execution.go index 030b57ef0..1fbd54ba2 100644 --- a/src/replication/dao/execution.go +++ b/src/replication/dao/execution.go @@ -322,25 +322,39 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) { return o.Update(task, props...) } -// UpdateTaskStatus ... +// UpdateTaskStatus updates the status of task. +// The implementation uses raw sql rather than QuerySetter.Filter... 
as QuerySetter +// will generate sql like: +// `UPDATE "replication_task" SET "end_time" = $1, "status" = $2 +// WHERE "id" IN ( SELECT T0."id" FROM "replication_task" T0 WHERE T0."id" = $3 +// AND T0."status" IN ($4, $5, $6))` +// which is not a "single" sql statement and will cause issues when running concurrently func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { - qs := dao.GetOrmer().QueryTable(&models.Task{}). - Filter("id", id) - if len(statusCondition) > 0 { - qs = qs.Filter("status", statusCondition[0]) - } - params := orm.Params{ - "status": status, - } + params := []interface{}{} + sql := `update replication_task set status = ? ` + params = append(params, status) + if taskFinished(status) { // should update endTime - params["end_time"] = time.Now() + sql += `, end_time = ? ` + params = append(params, time.Now()) } - n, err := qs.Update(params) + + sql += `where id = ? ` + params = append(params, id) + if len(statusCondition) > 0 { + sql += fmt.Sprintf(`and status in (%s) `, dao.ParamPlaceholderForIn(len(statusCondition))) + params = append(params, statusCondition) + } + + result, err := dao.GetOrmer().Raw(sql, params...).Exec() if err != nil { return 0, err } - log.Debugf("update task status %d: -> %s", id, status) + n, _ := result.RowsAffected() + if n > 0 { + log.Debugf("update task status %d: -> %s", id, status) + } return n, err } diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index 878325e7e..35d6e84fe 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -16,10 +16,12 @@ package operation import ( "fmt" + "regexp" "time" "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/utils/log" + hjob "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/replication/dao/models" "github.com/goharbor/harbor/src/replication/model" 
"github.com/goharbor/harbor/src/replication/operation/execution" @@ -45,6 +47,11 @@ const ( maxReplicators = 1024 ) +var ( + statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running" + statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern) +) + // NewController returns a controller implementation func NewController(js job.Client) Controller { ctl := &controller{ @@ -149,19 +156,36 @@ func (c *controller) StopReplication(executionID int64) error { } // got tasks, stopping the tasks one by one for _, task := range tasks { - if !isTaskRunning(task) { - log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status) + if isTaskInFinalStatus(task) { + log.Debugf("the task %d(job ID: %s) is in final status, its status is %s, skip", task.ID, task.JobID, task.Status) continue } if err = c.scheduler.Stop(task.JobID); err != nil { - return err + status, flag := isStatusBehindError(err) + if flag { + switch hjob.Status(status) { + case hjob.ErrorStatus: + status = models.TaskStatusFailed + case hjob.SuccessStatus: + status = models.TaskStatusSucceed + } + e := c.executionMgr.UpdateTaskStatus(task.ID, status) + if e != nil { + log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e) + } else { + log.Debugf("got status behind error for task %d, update it's status to %s directly", task.ID, status) + } + continue + } + log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err) + continue } log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID) } return nil } -func isTaskRunning(task *models.Task) bool { +func isTaskInFinalStatus(task *models.Task) bool { if task == nil { return false } @@ -169,9 +193,20 @@ func isTaskRunning(task *models.Task) bool { case models.TaskStatusSucceed, models.TaskStatusStopped, models.TaskStatusFailed: - return false + return true } - return true + return false 
+} + +func isStatusBehindError(err error) (string, bool) { + if err == nil { + return "", false + } + strs := statusBehindErrorReg.FindStringSubmatch(err.Error()) + if len(strs) != 2 { + return "", false + } + return strs[1], true } func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { diff --git a/src/replication/operation/controller_test.go b/src/replication/operation/controller_test.go index c19daa8c9..b6f4f5d77 100644 --- a/src/replication/operation/controller_test.go +++ b/src/replication/operation/controller_test.go @@ -15,6 +15,7 @@ package operation import ( + "errors" "io" "os" "testing" @@ -344,40 +345,57 @@ func TestGetTaskLog(t *testing.T) { func TestIsTaskRunning(t *testing.T) { cases := []struct { - task *models.Task - isRunning bool + task *models.Task + isFinalStatus bool }{ { - task: nil, - isRunning: false, + task: nil, + isFinalStatus: false, }, { task: &models.Task{ Status: models.TaskStatusSucceed, }, - isRunning: false, + isFinalStatus: true, }, { task: &models.Task{ Status: models.TaskStatusFailed, }, - isRunning: false, + isFinalStatus: true, }, { task: &models.Task{ Status: models.TaskStatusStopped, }, - isRunning: false, + isFinalStatus: true, }, { task: &models.Task{ Status: models.TaskStatusInProgress, }, - isRunning: true, + isFinalStatus: false, }, } for _, c := range cases { - assert.Equal(t, c.isRunning, isTaskRunning(c.task)) + assert.Equal(t, c.isFinalStatus, isTaskInFinalStatus(c.task)) } } + +func TestIsStatusBehindError(t *testing.T) { + // nil error + status, flag := isStatusBehindError(nil) + assert.False(t, flag) + + // not status behind error + err := errors.New("not status behind error") + status, flag = isStatusBehindError(err) + assert.False(t, flag) + + // status behind error + err = errors.New("mismatch job status for stopping job: 9feedf9933jffs, job status Error is behind Running") + status, flag = isStatusBehindError(err) + assert.True(t, flag) + assert.Equal(t, 
"Error", status) +} diff --git a/src/replication/operation/execution/execution.go b/src/replication/operation/execution/execution.go index ba9189826..0d4db946c 100644 --- a/src/replication/operation/execution/execution.go +++ b/src/replication/operation/execution/execution.go @@ -152,13 +152,9 @@ func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error { // UpdateTaskStatus ... func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error { - n, err := dao.UpdateTaskStatus(taskID, status, statusCondition...) - if err != nil { + if _, err := dao.UpdateTaskStatus(taskID, status, statusCondition...); err != nil { return err } - if n == 0 { - return fmt.Errorf("Update task status failed %d: -> %s ", taskID, status) - } return nil } diff --git a/src/replication/operation/hook/task.go b/src/replication/operation/hook/task.go index 576d0deab..220763dbd 100644 --- a/src/replication/operation/hook/task.go +++ b/src/replication/operation/hook/task.go @@ -25,17 +25,23 @@ func UpdateTask(ctl operation.Controller, id int64, status string) error { jobStatus := job.Status(status) // convert the job status to task status s := "" + preStatus := []string{} switch jobStatus { case job.PendingStatus: s = models.TaskStatusPending + preStatus = append(preStatus, models.TaskStatusInitialized) case job.ScheduledStatus, job.RunningStatus: s = models.TaskStatusInProgress + preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending) case job.StoppedStatus: s = models.TaskStatusStopped + preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress) case job.ErrorStatus: s = models.TaskStatusFailed + preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress) case job.SuccessStatus: s = models.TaskStatusSucceed + preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, 
models.TaskStatusInProgress) } - return ctl.UpdateTaskStatus(id, s) + return ctl.UpdateTaskStatus(id, s, preStatus...) }