Merge pull request #8606 from ywk253100/190807_stuck

Fix replication tasks stuck in "InProgress" issue
This commit is contained in:
Steven Zou 2019-08-13 15:59:20 +08:00 committed by GitHub
commit 1adc3a9469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 114 additions and 42 deletions

View File

@ -139,6 +139,8 @@ var once sync.Once
// GetOrmer :set ormer singleton
func GetOrmer() orm.Ormer {
once.Do(func() {
// override the default value(1000) to return all records when setting no limit
orm.DefaultRowsLimit = -1
globalOrm = orm.NewOrm()
})
return globalOrm

View File

@ -44,7 +44,7 @@ func DeleteProjectMetadata(projectID int64, name ...string) error {
params = append(params, projectID)
if len(name) > 0 {
sql += fmt.Sprintf(` and name in ( %s )`, paramPlaceholder(len(name)))
sql += fmt.Sprintf(` and name in ( %s )`, ParamPlaceholderForIn(len(name)))
params = append(params, name)
}
@ -74,7 +74,7 @@ func GetProjectMetadata(projectID int64, name ...string) ([]*models.ProjectMetad
params = append(params, projectID)
if len(name) > 0 {
sql += fmt.Sprintf(` and name in ( %s )`, paramPlaceholder(len(name)))
sql += fmt.Sprintf(` and name in ( %s )`, ParamPlaceholderForIn(len(name)))
params = append(params, name)
}
@ -82,7 +82,9 @@ func GetProjectMetadata(projectID int64, name ...string) ([]*models.ProjectMetad
return proMetas, err
}
func paramPlaceholder(n int) string {
// ParamPlaceholderForIn returns a string that contains placeholders for sql keyword "in"
// e.g. n=3, returns "?,?,?"
func ParamPlaceholderForIn(n int) string {
placeholders := []string{}
for i := 0; i < n; i++ {
placeholders = append(placeholders, "?")

View File

@ -260,7 +260,7 @@ func projectQueryConditions(query *models.ProjectQueryParam) (string, []interfac
}
if len(query.ProjectIDs) > 0 {
sql += fmt.Sprintf(` and p.project_id in ( %s )`,
paramPlaceholder(len(query.ProjectIDs)))
ParamPlaceholderForIn(len(query.ProjectIDs)))
params = append(params, query.ProjectIDs)
}
return sql, params

View File

@ -193,7 +193,7 @@ func quotaQueryConditions(query ...*models.QuotaQuery) (string, []interface{}) {
}
if len(q.ReferenceIDs) != 0 {
sql += fmt.Sprintf(`AND a.reference_id IN (%s) `, paramPlaceholder(len(q.ReferenceIDs)))
sql += fmt.Sprintf(`AND a.reference_id IN (%s) `, ParamPlaceholderForIn(len(q.ReferenceIDs)))
params = append(params, q.ReferenceIDs)
}

View File

@ -111,7 +111,7 @@ func quotaUsageQueryConditions(query ...*models.QuotaUsageQuery) (string, []inte
params = append(params, q.ReferenceID)
}
if len(q.ReferenceIDs) != 0 {
sql += fmt.Sprintf(`and reference_id in (%s) `, paramPlaceholder(len(q.ReferenceIDs)))
sql += fmt.Sprintf(`and reference_id in (%s) `, ParamPlaceholderForIn(len(q.ReferenceIDs)))
params = append(params, q.ReferenceIDs)
}

View File

@ -178,7 +178,7 @@ func repositoryQueryConditions(query ...*models.RepositoryQuery) (string, []inte
if len(q.ProjectIDs) > 0 {
sql += fmt.Sprintf(`and r.project_id in ( %s ) `,
paramPlaceholder(len(q.ProjectIDs)))
ParamPlaceholderForIn(len(q.ProjectIDs)))
params = append(params, q.ProjectIDs)
}

View File

@ -33,12 +33,11 @@ import (
var statusMap = map[string]string{
job.JobServiceStatusPending: models.JobPending,
job.JobServiceStatusScheduled: models.JobScheduled,
job.JobServiceStatusRunning: models.JobRunning,
job.JobServiceStatusStopped: models.JobStopped,
job.JobServiceStatusCancelled: models.JobCanceled,
job.JobServiceStatusError: models.JobError,
job.JobServiceStatusSuccess: models.JobFinished,
job.JobServiceStatusScheduled: models.JobScheduled,
}
// Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice.

View File

@ -322,25 +322,39 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) {
return o.Update(task, props...)
}
// UpdateTaskStatus ...
// UpdateTaskStatus updates the status of task.
// The implementation uses raw sql rather than QuerySetter.Filter... as QuerySetter
// will generate sql like:
// `UPDATE "replication_task" SET "end_time" = $1, "status" = $2
// WHERE "id" IN ( SELECT T0."id" FROM "replication_task" T0 WHERE T0."id" = $3
// AND T0."status" IN ($4, $5, $6))]`
// which is not a "single" sql statement, this will cause issues when running in concurrency
func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) {
qs := dao.GetOrmer().QueryTable(&models.Task{}).
Filter("id", id)
if len(statusCondition) > 0 {
qs = qs.Filter("status", statusCondition[0])
}
params := orm.Params{
"status": status,
}
params := []interface{}{}
sql := `update replication_task set status = ? `
params = append(params, status)
if taskFinished(status) {
// should update endTime
params["end_time"] = time.Now()
sql += `, end_time = ? `
params = append(params, time.Now())
}
n, err := qs.Update(params)
sql += `where id = ? `
params = append(params, id)
if len(statusCondition) > 0 {
sql += fmt.Sprintf(`and status in (%s) `, dao.ParamPlaceholderForIn(len(statusCondition)))
params = append(params, statusCondition)
}
result, err := dao.GetOrmer().Raw(sql, params...).Exec()
if err != nil {
return 0, err
}
log.Debugf("update task status %d: -> %s", id, status)
n, _ := result.RowsAffected()
if n > 0 {
log.Debugf("update task status %d: -> %s", id, status)
}
return n, err
}

View File

@ -16,10 +16,12 @@ package operation
import (
"fmt"
"regexp"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
hjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/replication/dao/models"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/operation/execution"
@ -45,6 +47,11 @@ const (
maxReplicators = 1024
)
var (
statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running"
statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern)
)
// NewController returns a controller implementation
func NewController(js job.Client) Controller {
ctl := &controller{
@ -149,19 +156,36 @@ func (c *controller) StopReplication(executionID int64) error {
}
// got tasks, stopping the tasks one by one
for _, task := range tasks {
if !isTaskRunning(task) {
log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status)
if isTaskInFinalStatus(task) {
log.Debugf("the task %d(job ID: %s) is in final status, its status is %s, skip", task.ID, task.JobID, task.Status)
continue
}
if err = c.scheduler.Stop(task.JobID); err != nil {
return err
status, flag := isStatusBehindError(err)
if flag {
switch hjob.Status(status) {
case hjob.ErrorStatus:
status = models.TaskStatusFailed
case hjob.SuccessStatus:
status = models.TaskStatusSucceed
}
e := c.executionMgr.UpdateTaskStatus(task.ID, status)
if e != nil {
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
} else {
log.Debugf("got status behind error for task %d, update it's status to %s directly", task.ID, status)
}
continue
}
log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err)
continue
}
log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID)
}
return nil
}
func isTaskRunning(task *models.Task) bool {
func isTaskInFinalStatus(task *models.Task) bool {
if task == nil {
return false
}
@ -169,9 +193,20 @@ func isTaskRunning(task *models.Task) bool {
case models.TaskStatusSucceed,
models.TaskStatusStopped,
models.TaskStatusFailed:
return false
return true
}
return true
return false
}
func isStatusBehindError(err error) (string, bool) {
if err == nil {
return "", false
}
strs := statusBehindErrorReg.FindStringSubmatch(err.Error())
if len(strs) != 2 {
return "", false
}
return strs[1], true
}
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {

View File

@ -15,6 +15,7 @@
package operation
import (
"errors"
"io"
"os"
"testing"
@ -344,40 +345,57 @@ func TestGetTaskLog(t *testing.T) {
func TestIsTaskRunning(t *testing.T) {
cases := []struct {
task *models.Task
isRunning bool
task *models.Task
isFinalStatus bool
}{
{
task: nil,
isRunning: false,
task: nil,
isFinalStatus: false,
},
{
task: &models.Task{
Status: models.TaskStatusSucceed,
},
isRunning: false,
isFinalStatus: true,
},
{
task: &models.Task{
Status: models.TaskStatusFailed,
},
isRunning: false,
isFinalStatus: true,
},
{
task: &models.Task{
Status: models.TaskStatusStopped,
},
isRunning: false,
isFinalStatus: true,
},
{
task: &models.Task{
Status: models.TaskStatusInProgress,
},
isRunning: true,
isFinalStatus: false,
},
}
for _, c := range cases {
assert.Equal(t, c.isRunning, isTaskRunning(c.task))
assert.Equal(t, c.isFinalStatus, isTaskInFinalStatus(c.task))
}
}
func TestIsStatusBehindError(t *testing.T) {
// nil error
status, flag := isStatusBehindError(nil)
assert.False(t, flag)
// not status behind error
err := errors.New("not status behind error")
status, flag = isStatusBehindError(err)
assert.False(t, flag)
// status behind error
err = errors.New("mismatch job status for stopping job: 9feedf9933jffs, job status Error is behind Running")
status, flag = isStatusBehindError(err)
assert.True(t, flag)
assert.Equal(t, "Error", status)
}

View File

@ -152,13 +152,9 @@ func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error {
// UpdateTaskStatus ...
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error {
n, err := dao.UpdateTaskStatus(taskID, status, statusCondition...)
if err != nil {
if _, err := dao.UpdateTaskStatus(taskID, status, statusCondition...); err != nil {
return err
}
if n == 0 {
return fmt.Errorf("Update task status failed %d: -> %s ", taskID, status)
}
return nil
}

View File

@ -25,17 +25,23 @@ func UpdateTask(ctl operation.Controller, id int64, status string) error {
jobStatus := job.Status(status)
// convert the job status to task status
s := ""
preStatus := []string{}
switch jobStatus {
case job.PendingStatus:
s = models.TaskStatusPending
preStatus = append(preStatus, models.TaskStatusInitialized)
case job.ScheduledStatus, job.RunningStatus:
s = models.TaskStatusInProgress
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending)
case job.StoppedStatus:
s = models.TaskStatusStopped
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress)
case job.ErrorStatus:
s = models.TaskStatusFailed
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress)
case job.SuccessStatus:
s = models.TaskStatusSucceed
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress)
}
return ctl.UpdateTaskStatus(id, s)
return ctl.UpdateTaskStatus(id, s, preStatus...)
}