Merge pull request #8829 from ywk253100/190822_retry_status

Add status revision to handle retrying in replication task
This commit is contained in:
Wang Yan 2019-08-28 10:55:13 +08:00 committed by GitHub
commit 87893abc5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 121 additions and 119 deletions

View File

@ -182,3 +182,5 @@ create table notification_policy (
update_time timestamp default CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
ALTER TABLE replication_task ADD COLUMN status_revision int DEFAULT 0;

View File

@ -66,7 +66,7 @@ func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
}
return nil, nil
}
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
return nil
}
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {

View File

@ -239,9 +239,6 @@ func (r *ReplicationPolicyAPI) Delete() {
}
}
// the execution's status will not be updated if it is not queried
// so need to check the status of tasks to determine the status of
// the execution
func hasRunningExecutions(policyID int64) (bool, error) {
_, executions, err := replication.OperationCtl.ListExecutions(&models.ExecutionQuery{
PolicyID: policyID,
@ -253,35 +250,11 @@ func hasRunningExecutions(policyID int64) (bool, error) {
if execution.Status != models.ExecutionStatusInProgress {
continue
}
_, tasks, err := replication.OperationCtl.ListTasks(&models.TaskQuery{
ExecutionID: execution.ID,
})
if err != nil {
return false, err
}
for _, task := range tasks {
if isTaskRunning(task) {
return true, nil
}
}
}
return false, nil
}
// return true if the status of the task is running or pending
func isTaskRunning(task *models.Task) bool {
if task == nil {
return false
}
switch task.Status {
case models.TaskStatusSucceed,
models.TaskStatusStopped,
models.TaskStatusFailed:
return false
}
return true
}
// ignore the credential for the registries
func populateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
if err := event.PopulateRegistries(registryMgr, policy); err != nil {

View File

@ -121,7 +121,7 @@ func (h *Handler) HandleReplicationScheduleJob() {
// HandleReplicationTask handles the webhook of replication task
func (h *Handler) HandleReplicationTask() {
log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status)
if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus); err != nil {
if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus, h.revision); err != nil {
log.Errorf("failed to update the status of the replication task %d: %v", h.id, err)
h.SendInternalServerError(err)
return

View File

@ -131,12 +131,6 @@ func fillExecution(execution *models.Execution) error {
}
resetExecutionStatus(execution)
// if execution status changed to a final status, store to DB
if executionFinished(execution.Status) {
UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress,
models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped,
models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total)
}
return nil
}
@ -329,23 +323,21 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) {
// WHERE "id" IN ( SELECT T0."id" FROM "replication_task" T0 WHERE T0."id" = $3
// AND T0."status" IN ($4, $5, $6))]`
// which is not a "single" sql statement, this will cause issues when running in concurrency
func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) {
func UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) (int64, error) {
params := []interface{}{}
sql := `update replication_task set status = ? `
params = append(params, status)
sql := `update replication_task set status = ?, status_revision = ?, end_time = ? `
params = append(params, status, statusRevision)
var t time.Time
// when the task is in final status, update the endtime
// when the task re-runs again, the endtime should be cleared
// so set the endtime to null if the task isn't in final status
if taskFinished(status) {
// should update endTime
sql += `, end_time = ? `
params = append(params, time.Now())
t = time.Now()
}
params = append(params, t)
sql += `where id = ? `
params = append(params, id)
if len(statusCondition) > 0 {
sql += fmt.Sprintf(`and status in (%s) `, dao.ParamPlaceholderForIn(len(statusCondition)))
params = append(params, statusCondition)
}
sql += fmt.Sprintf(`where id = ? and (status_revision < ? or status_revision = ? and status in (%s)) `, dao.ParamPlaceholderForIn(len(statusCondition)))
params = append(params, id, statusRevision, statusRevision, statusCondition)
result, err := dao.GetOrmer().Raw(sql, params...).Exec()
if err != nil {

View File

@ -99,6 +99,7 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StatusRevision: 1,
StartTime: &now,
}
task2 := &models.Task{
@ -108,6 +109,7 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StatusRevision: 1,
StartTime: &now,
EndTime: &now,
}
@ -151,11 +153,12 @@ func TestMethodOfTask(t *testing.T) {
assert.Equal(t, int64(1), n)
// test update status
n, err = UpdateTaskStatus(id1, "Succeed")
n, err = UpdateTaskStatus(id1, "Succeed", 2, "Initialized")
require.Nil(t, err)
assert.Equal(t, int64(1), n)
task, _ = GetTask(id1)
assert.Equal(t, "Succeed", task.Status)
assert.Equal(t, int64(2), task.StatusRevision)
// test delete
require.Nil(t, DeleteTask(id1))
@ -244,6 +247,7 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: models.TaskStatusInProgress,
StatusRevision: 1,
StartTime: &now,
}
task2 := &models.Task{
@ -254,6 +258,7 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StatusRevision: 1,
StartTime: &now,
EndTime: &now,
}
@ -275,7 +280,7 @@ func TestExecutionFill2(t *testing.T) {
assert.Equal(t, 0, exe.Succeed)
// update task status and query and fill
UpdateTaskStatus(taskID1, models.TaskStatusFailed)
UpdateTaskStatus(taskID1, models.TaskStatusFailed, 2, models.TaskStatusInProgress)
exes, err := GetExecutions(&models.ExecutionQuery{
PolicyID: 11209,
})

View File

@ -117,6 +117,7 @@ type Task struct {
Operation string `orm:"column(operation)" json:"operation"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
StatusRevision int64 `orm:"column(status_revision)"`
StartTime *time.Time `orm:"column(start_time)" json:"start_time"`
EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"`
}

View File

@ -44,7 +44,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo
func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
return nil, nil
}
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
return nil
}
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {

View File

@ -17,6 +17,7 @@ package operation
import (
"fmt"
"regexp"
"strings"
"time"
"github.com/goharbor/harbor/src/common/job"
@ -39,7 +40,7 @@ type Controller interface {
GetExecution(int64) (*models.Execution, error)
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
GetTask(int64) (*models.Task, error)
UpdateTaskStatus(id int64, status string, statusCondition ...string) error
UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error
GetTaskLog(int64) ([]byte, error)
}
@ -50,6 +51,7 @@ const (
var (
statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running"
statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern)
jobNotFoundErrorMsg = "object is not found"
)
// NewController returns a controller implementation
@ -169,7 +171,10 @@ func (c *controller) StopReplication(executionID int64) error {
case hjob.SuccessStatus:
status = models.TaskStatusSucceed
}
e := c.executionMgr.UpdateTaskStatus(task.ID, status)
e := c.executionMgr.UpdateTask(&models.Task{
ID: task.ID,
Status: status,
}, "Status")
if e != nil {
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
} else {
@ -177,6 +182,18 @@ func (c *controller) StopReplication(executionID int64) error {
}
continue
}
if isJobNotFoundError(err) {
e := c.executionMgr.UpdateTask(&models.Task{
ID: task.ID,
Status: models.ExecutionStatusStopped,
}, "Status")
if e != nil {
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
} else {
log.Debugf("got job not found error for task %d, update it's status to %s directly", task.ID, models.ExecutionStatusStopped)
}
continue
}
log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err)
continue
}
@ -209,6 +226,13 @@ func isStatusBehindError(err error) (string, bool) {
return strs[1], true
}
func isJobNotFoundError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), jobNotFoundErrorMsg)
}
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return c.executionMgr.List(query...)
}
@ -221,8 +245,8 @@ func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Tas
func (c *controller) GetTask(id int64) (*models.Task, error) {
return c.executionMgr.GetTask(id)
}
func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
func (c *controller) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
return c.executionMgr.UpdateTaskStatus(id, status, statusRevision, statusCondition...)
}
func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
return c.executionMgr.GetTaskLog(taskID)

View File

@ -75,7 +75,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {
@ -333,7 +333,7 @@ func TestGetTask(t *testing.T) {
}
func TestUpdateTaskStatus(t *testing.T) {
err := ctl.UpdateTaskStatus(1, "running")
err := ctl.UpdateTaskStatus(1, "running", 1)
require.Nil(t, err)
}

View File

@ -50,7 +50,7 @@ type Manager interface {
// UpdateTaskStatus only updates the task status. If "statusCondition"
// presents, only the tasks whose status equal to "statusCondition"
// will be updated
UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error
UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error
// Remove one task specified by task ID
RemoveTask(int64) error
// Remove all tasks of one execution specified by the execution ID
@ -151,8 +151,8 @@ func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error {
}
// UpdateTaskStatus ...
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error {
if _, err := dao.UpdateTaskStatus(taskID, status, statusCondition...); err != nil {
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error {
if _, err := dao.UpdateTaskStatus(taskID, status, statusRevision, statusCondition...); err != nil {
return err
}
return nil

View File

@ -82,6 +82,7 @@ func TestMethodOfTaskManager(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StatusRevision: 1,
StartTime: &now,
}
@ -121,7 +122,7 @@ func TestMethodOfTaskManager(t *testing.T) {
assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource)
// UpdateTaskStatus
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed)
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed, 1, models.TaskStatusInitialized)
require.Nil(t, err)
taskUpdate, _ = executionManager.GetTask(id)
assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status)

View File

@ -279,19 +279,23 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite
for _, result := range results {
// if the task is failed to be submitted, update the status of the
// task as failure
now := time.Now()
if result.Error != nil {
log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error)
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
if err = executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
Status: models.TaskStatusFailed,
EndTime: &now,
}, "Status", "EndTime"); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
continue
}
allFailed = false
// if the task is submitted successfully, update the status, job ID and start time
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, 0, models.TaskStatusInitialized); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
now := time.Now()
if err = executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
JobID: result.JobID,

View File

@ -176,7 +176,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {

View File

@ -21,7 +21,7 @@ import (
)
// UpdateTask update the status of the task
func UpdateTask(ctl operation.Controller, id int64, status string) error {
func UpdateTask(ctl operation.Controller, id int64, status string, statusRevision int64) error {
jobStatus := job.Status(status)
// convert the job status to task status
s := ""
@ -43,5 +43,5 @@ func UpdateTask(ctl operation.Controller, id int64, status string) error {
s = models.TaskStatusSucceed
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress)
}
return ctl.UpdateTaskStatus(id, s, preStatus...)
return ctl.UpdateTaskStatus(id, s, statusRevision, preStatus...)
}

View File

@ -46,7 +46,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo
func (f *fakedOperationController) GetTask(int64) (*models.Task, error) {
return nil, nil
}
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
f.status = status
return nil
}
@ -87,7 +87,7 @@ func TestUpdateTask(t *testing.T) {
}
for _, c := range cases {
err := UpdateTask(mgr, 1, c.inputStatus)
err := UpdateTask(mgr, 1, c.inputStatus, 1)
require.Nil(t, err)
assert.Equal(t, c.expectedStatus, mgr.status)
}