mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-22 16:48:30 +01:00
Add status revision to handle retrying in replication task
Add status revision to handle retrying in replication task Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
661470e7bc
commit
7924f37d86
@ -182,3 +182,5 @@ create table notification_policy (
|
||||
update_time timestamp default CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
ALTER TABLE replication_task ADD COLUMN status_revision int DEFAULT 0;
|
@ -66,7 +66,7 @@ func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {
|
||||
|
@ -239,9 +239,6 @@ func (r *ReplicationPolicyAPI) Delete() {
|
||||
}
|
||||
}
|
||||
|
||||
// the execution's status will not be updated if it is not queried
|
||||
// so need to check the status of tasks to determine the status of
|
||||
// the execution
|
||||
func hasRunningExecutions(policyID int64) (bool, error) {
|
||||
_, executions, err := replication.OperationCtl.ListExecutions(&models.ExecutionQuery{
|
||||
PolicyID: policyID,
|
||||
@ -253,35 +250,11 @@ func hasRunningExecutions(policyID int64) (bool, error) {
|
||||
if execution.Status != models.ExecutionStatusInProgress {
|
||||
continue
|
||||
}
|
||||
_, tasks, err := replication.OperationCtl.ListTasks(&models.TaskQuery{
|
||||
ExecutionID: execution.ID,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, task := range tasks {
|
||||
if isTaskRunning(task) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// return true if the status of the task is running or pending
|
||||
func isTaskRunning(task *models.Task) bool {
|
||||
if task == nil {
|
||||
return false
|
||||
}
|
||||
switch task.Status {
|
||||
case models.TaskStatusSucceed,
|
||||
models.TaskStatusStopped,
|
||||
models.TaskStatusFailed:
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// ignore the credential for the registries
|
||||
func populateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
|
||||
if err := event.PopulateRegistries(registryMgr, policy); err != nil {
|
||||
|
@ -104,7 +104,7 @@ func (h *Handler) HandleReplicationScheduleJob() {
|
||||
// HandleReplicationTask handles the webhook of replication task
|
||||
func (h *Handler) HandleReplicationTask() {
|
||||
log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status)
|
||||
if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus); err != nil {
|
||||
if err := hook.UpdateTask(replication.OperationCtl, h.id, h.rawStatus, h.revision); err != nil {
|
||||
log.Errorf("failed to update the status of the replication task %d: %v", h.id, err)
|
||||
h.SendInternalServerError(err)
|
||||
return
|
||||
|
@ -131,12 +131,6 @@ func fillExecution(execution *models.Execution) error {
|
||||
}
|
||||
resetExecutionStatus(execution)
|
||||
|
||||
// if execution status changed to a final status, store to DB
|
||||
if executionFinished(execution.Status) {
|
||||
UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress,
|
||||
models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped,
|
||||
models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -329,23 +323,21 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) {
|
||||
// WHERE "id" IN ( SELECT T0."id" FROM "replication_task" T0 WHERE T0."id" = $3
|
||||
// AND T0."status" IN ($4, $5, $6))]`
|
||||
// which is not a "single" sql statement, this will cause issues when running in concurrency
|
||||
func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) {
|
||||
func UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) (int64, error) {
|
||||
params := []interface{}{}
|
||||
sql := `update replication_task set status = ? `
|
||||
params = append(params, status)
|
||||
|
||||
sql := `update replication_task set status = ?, status_revision = ?, end_time = ? `
|
||||
params = append(params, status, statusRevision)
|
||||
var t time.Time
|
||||
// when the task is in final status, update the endtime
|
||||
// when the task re-runs again, the endtime should be cleared
|
||||
// so set the endtime to null if the task isn't in final status
|
||||
if taskFinished(status) {
|
||||
// should update endTime
|
||||
sql += `, end_time = ? `
|
||||
params = append(params, time.Now())
|
||||
t = time.Now()
|
||||
}
|
||||
params = append(params, t)
|
||||
|
||||
sql += `where id = ? `
|
||||
params = append(params, id)
|
||||
if len(statusCondition) > 0 {
|
||||
sql += fmt.Sprintf(`and status in (%s) `, dao.ParamPlaceholderForIn(len(statusCondition)))
|
||||
params = append(params, statusCondition)
|
||||
}
|
||||
sql += fmt.Sprintf(`where id = ? and (status_revision < ? or status_revision = ? and status in (%s)) `, dao.ParamPlaceholderForIn(len(statusCondition)))
|
||||
params = append(params, id, statusRevision, statusRevision, statusCondition)
|
||||
|
||||
result, err := dao.GetOrmer().Raw(sql, params...).Exec()
|
||||
if err != nil {
|
||||
|
@ -99,6 +99,7 @@ func TestMethodOfTask(t *testing.T) {
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: "Initialized",
|
||||
StatusRevision: 1,
|
||||
StartTime: &now,
|
||||
}
|
||||
task2 := &models.Task{
|
||||
@ -108,6 +109,7 @@ func TestMethodOfTask(t *testing.T) {
|
||||
DstResource: "dstResource2",
|
||||
JobID: "jobID2",
|
||||
Status: "Stopped",
|
||||
StatusRevision: 1,
|
||||
StartTime: &now,
|
||||
EndTime: &now,
|
||||
}
|
||||
@ -151,11 +153,12 @@ func TestMethodOfTask(t *testing.T) {
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
// test update status
|
||||
n, err = UpdateTaskStatus(id1, "Succeed")
|
||||
n, err = UpdateTaskStatus(id1, "Succeed", 2, "Initialized")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
task, _ = GetTask(id1)
|
||||
assert.Equal(t, "Succeed", task.Status)
|
||||
assert.Equal(t, int64(2), task.StatusRevision)
|
||||
|
||||
// test delete
|
||||
require.Nil(t, DeleteTask(id1))
|
||||
@ -244,6 +247,7 @@ func TestExecutionFill2(t *testing.T) {
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: models.TaskStatusInProgress,
|
||||
StatusRevision: 1,
|
||||
StartTime: &now,
|
||||
}
|
||||
task2 := &models.Task{
|
||||
@ -254,6 +258,7 @@ func TestExecutionFill2(t *testing.T) {
|
||||
DstResource: "dstResource2",
|
||||
JobID: "jobID2",
|
||||
Status: "Stopped",
|
||||
StatusRevision: 1,
|
||||
StartTime: &now,
|
||||
EndTime: &now,
|
||||
}
|
||||
@ -275,7 +280,7 @@ func TestExecutionFill2(t *testing.T) {
|
||||
assert.Equal(t, 0, exe.Succeed)
|
||||
|
||||
// update task status and query and fill
|
||||
UpdateTaskStatus(taskID1, models.TaskStatusFailed)
|
||||
UpdateTaskStatus(taskID1, models.TaskStatusFailed, 2, models.TaskStatusInProgress)
|
||||
exes, err := GetExecutions(&models.ExecutionQuery{
|
||||
PolicyID: 11209,
|
||||
})
|
||||
|
@ -117,6 +117,7 @@ type Task struct {
|
||||
Operation string `orm:"column(operation)" json:"operation"`
|
||||
JobID string `orm:"column(job_id)" json:"job_id"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
StatusRevision int64 `orm:"column(status_revision)"`
|
||||
StartTime *time.Time `orm:"column(start_time)" json:"start_time"`
|
||||
EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"`
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo
|
||||
func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {
|
||||
|
@ -17,6 +17,7 @@ package operation
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
@ -39,7 +40,7 @@ type Controller interface {
|
||||
GetExecution(int64) (*models.Execution, error)
|
||||
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
|
||||
GetTask(int64) (*models.Task, error)
|
||||
UpdateTaskStatus(id int64, status string, statusCondition ...string) error
|
||||
UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error
|
||||
GetTaskLog(int64) ([]byte, error)
|
||||
}
|
||||
|
||||
@ -50,6 +51,7 @@ const (
|
||||
var (
|
||||
statusBehindErrorPattern = "mismatch job status for stopping job: .*, job status (.*) is behind Running"
|
||||
statusBehindErrorReg = regexp.MustCompile(statusBehindErrorPattern)
|
||||
jobNotFoundErrorMsg = "object is not found"
|
||||
)
|
||||
|
||||
// NewController returns a controller implementation
|
||||
@ -169,7 +171,10 @@ func (c *controller) StopReplication(executionID int64) error {
|
||||
case hjob.SuccessStatus:
|
||||
status = models.TaskStatusSucceed
|
||||
}
|
||||
e := c.executionMgr.UpdateTaskStatus(task.ID, status)
|
||||
e := c.executionMgr.UpdateTask(&models.Task{
|
||||
ID: task.ID,
|
||||
Status: status,
|
||||
}, "Status")
|
||||
if e != nil {
|
||||
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
|
||||
} else {
|
||||
@ -177,6 +182,18 @@ func (c *controller) StopReplication(executionID int64) error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
if isJobNotFoundError(err) {
|
||||
e := c.executionMgr.UpdateTask(&models.Task{
|
||||
ID: task.ID,
|
||||
Status: models.ExecutionStatusStopped,
|
||||
}, "Status")
|
||||
if e != nil {
|
||||
log.Errorf("failed to update the status the task %d(job ID: %s): %v", task.ID, task.JobID, e)
|
||||
} else {
|
||||
log.Debugf("got job not found error for task %d, update it's status to %s directly", task.ID, models.ExecutionStatusStopped)
|
||||
}
|
||||
continue
|
||||
}
|
||||
log.Errorf("failed to stop the task %d(job ID: %s): %v", task.ID, task.JobID, err)
|
||||
continue
|
||||
}
|
||||
@ -209,6 +226,13 @@ func isStatusBehindError(err error) (string, bool) {
|
||||
return strs[1], true
|
||||
}
|
||||
|
||||
func isJobNotFoundError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(err.Error(), jobNotFoundErrorMsg)
|
||||
}
|
||||
|
||||
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
return c.executionMgr.List(query...)
|
||||
}
|
||||
@ -221,8 +245,8 @@ func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Tas
|
||||
func (c *controller) GetTask(id int64) (*models.Task, error) {
|
||||
return c.executionMgr.GetTask(id)
|
||||
}
|
||||
func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
|
||||
func (c *controller) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
|
||||
return c.executionMgr.UpdateTaskStatus(id, status, statusRevision, statusCondition...)
|
||||
}
|
||||
func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
|
||||
return c.executionMgr.GetTaskLog(taskID)
|
||||
|
@ -75,7 +75,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
||||
@ -333,7 +333,7 @@ func TestGetTask(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateTaskStatus(t *testing.T) {
|
||||
err := ctl.UpdateTaskStatus(1, "running")
|
||||
err := ctl.UpdateTaskStatus(1, "running", 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ type Manager interface {
|
||||
// UpdateTaskStatus only updates the task status. If "statusCondition"
|
||||
// presents, only the tasks whose status equal to "statusCondition"
|
||||
// will be updated
|
||||
UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error
|
||||
UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error
|
||||
// Remove one task specified by task ID
|
||||
RemoveTask(int64) error
|
||||
// Remove all tasks of one execution specified by the execution ID
|
||||
@ -151,8 +151,8 @@ func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error {
|
||||
}
|
||||
|
||||
// UpdateTaskStatus ...
|
||||
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error {
|
||||
if _, err := dao.UpdateTaskStatus(taskID, status, statusCondition...); err != nil {
|
||||
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64, statusCondition ...string) error {
|
||||
if _, err := dao.UpdateTaskStatus(taskID, status, statusRevision, statusCondition...); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -82,6 +82,7 @@ func TestMethodOfTaskManager(t *testing.T) {
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: "Initialized",
|
||||
StatusRevision: 1,
|
||||
StartTime: &now,
|
||||
}
|
||||
|
||||
@ -121,7 +122,7 @@ func TestMethodOfTaskManager(t *testing.T) {
|
||||
assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource)
|
||||
|
||||
// UpdateTaskStatus
|
||||
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed)
|
||||
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed, 1, models.TaskStatusInitialized)
|
||||
require.Nil(t, err)
|
||||
taskUpdate, _ = executionManager.GetTask(id)
|
||||
assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status)
|
||||
|
@ -279,19 +279,23 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite
|
||||
for _, result := range results {
|
||||
// if the task is failed to be submitted, update the status of the
|
||||
// task as failure
|
||||
now := time.Now()
|
||||
if result.Error != nil {
|
||||
log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error)
|
||||
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
|
||||
if err = executionMgr.UpdateTask(&models.Task{
|
||||
ID: result.TaskID,
|
||||
Status: models.TaskStatusFailed,
|
||||
EndTime: &now,
|
||||
}, "Status", "EndTime"); err != nil {
|
||||
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
allFailed = false
|
||||
// if the task is submitted successfully, update the status, job ID and start time
|
||||
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
|
||||
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, 0, models.TaskStatusInitialized); err != nil {
|
||||
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
now := time.Now()
|
||||
if err = executionMgr.UpdateTask(&models.Task{
|
||||
ID: result.TaskID,
|
||||
JobID: result.JobID,
|
||||
|
@ -176,7 +176,7 @@ func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, int64, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
)
|
||||
|
||||
// UpdateTask update the status of the task
|
||||
func UpdateTask(ctl operation.Controller, id int64, status string) error {
|
||||
func UpdateTask(ctl operation.Controller, id int64, status string, statusRevision int64) error {
|
||||
jobStatus := job.Status(status)
|
||||
// convert the job status to task status
|
||||
s := ""
|
||||
@ -43,5 +43,5 @@ func UpdateTask(ctl operation.Controller, id int64, status string) error {
|
||||
s = models.TaskStatusSucceed
|
||||
preStatus = append(preStatus, models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress)
|
||||
}
|
||||
return ctl.UpdateTaskStatus(id, s, preStatus...)
|
||||
return ctl.UpdateTaskStatus(id, s, statusRevision, preStatus...)
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*mo
|
||||
func (f *fakedOperationController) GetTask(int64) (*models.Task, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusRevision int64, statusCondition ...string) error {
|
||||
f.status = status
|
||||
return nil
|
||||
}
|
||||
@ -87,7 +87,7 @@ func TestUpdateTask(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
err := UpdateTask(mgr, 1, c.inputStatus)
|
||||
err := UpdateTask(mgr, 1, c.inputStatus, 1)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, c.expectedStatus, mgr.status)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user