From 78fc2846bd856181f5fc069fab60af4fa435e02a Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 4 Apr 2019 14:51:37 +0800 Subject: [PATCH] Stop a replication execution Implement the function to support to stop the replication execution Signed-off-by: Wenkai Yin --- src/replication/ng/operation/controller.go | 46 ++++++++++++- .../ng/operation/controller_test.go | 65 +++++++++++++++++++ src/replication/ng/policy/manager/manager.go | 7 ++ 3 files changed, 117 insertions(+), 1 deletion(-) diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 2c7f702be..a1c7151f9 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -16,6 +16,7 @@ package operation import ( "fmt" + "strings" "time" "github.com/goharbor/harbor/src/common/job" @@ -119,9 +120,52 @@ func (c *controller) createFlow(executionID int64, policy *model.Policy, resourc } func (c *controller) StopReplication(executionID int64) error { - // TODO implement the function + _, tasks, err := c.ListTasks(&models.TaskQuery{ + ExecutionID: executionID, + }) + if err != nil { + return err + } + for _, task := range tasks { + if !isTaskRunning(task) { + log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status) + continue + } + if err = c.scheduler.Stop(task.JobID); err != nil { + if isNotRunningJobError(err) { + log.Warningf("got not running job error when trying stop the task %d(job ID: %s): %v, skip", task.ID, task.JobID, err) + continue + } + return err + } + log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID) + } return nil } + +func isTaskRunning(task *models.Task) bool { + if task == nil { + return false + } + switch task.Status { + case models.TaskStatusSucceed, + models.TaskStatusStopped, + models.TaskStatusFailed: + return false + } + return true +} + +// when trying to stop a job which isn't running in jobservice, +// an error whose message contains "xxx is not a running job" +// will be returned +func isNotRunningJobError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "is not a running job") +} + func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return c.executionMgr.List(query...) } diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go index ff28fb7f7..35efb240f 100644 --- a/src/replication/ng/operation/controller_test.go +++ b/src/replication/ng/operation/controller_test.go @@ -15,6 +15,7 @@ package operation import ( + "errors" "io" "testing" @@ -295,3 +296,67 @@ func TestGetTaskLog(t *testing.T) { require.Nil(t, err) assert.Equal(t, "message", string(log)) } + +func TestIsTaskRunning(t *testing.T) { + cases := []struct { + task *models.Task + isRunning bool + }{ + { + task: nil, + isRunning: false, + }, + { + task: &models.Task{ + Status: models.TaskStatusSucceed, + }, + isRunning: false, + }, + { + task: &models.Task{ + Status: models.TaskStatusFailed, + }, + isRunning: false, + }, + { + task: &models.Task{ + Status: models.TaskStatusStopped, + }, + isRunning: false, + }, + { + task: &models.Task{ + Status: models.TaskStatusInProgress, + }, + isRunning: true, + }, + } + + for _, c := range cases { + assert.Equal(t, c.isRunning, isTaskRunning(c.task)) + } +} + +func TestIsNotRunningJobError(t *testing.T) { + cases := []struct { + err error + isNotRunningJobError bool + }{ + { + err: nil, + isNotRunningJobError: false, + }, + { + err: errors.New("not the error"), + isNotRunningJobError: false, + }, + { + err: errors.New(`[ERROR] [handler.go:253]: Serve http request 'POST /api/v1/jobs/734a11140d939ef700889725' error: 500 {"code":10008,"message":"Stop job failed with error","details":"job '734a11140d939ef700889725' is not a running job"}`), + isNotRunningJobError: true, + }, + } + + for _, c := range cases { + assert.Equal(t, c.isNotRunningJobError, isNotRunningJobError(c.err)) + } +} diff --git a/src/replication/ng/policy/manager/manager.go b/src/replication/ng/policy/manager/manager.go index 282065b1b..db6d4d9b3 100644 --- a/src/replication/ng/policy/manager/manager.go +++ b/src/replication/ng/policy/manager/manager.go @@ -67,6 +67,13 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (*model.Policy, e if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil { return nil, err } + // convert the type of value from string to model.ResourceType if the filter + // is a resource type filter + for _, filter := range filters { + if filter.Type == model.FilterTypeResource { + filter.Value = (model.ResourceType)(filter.Value.(string)) + } + } ply.Filters = filters }