Merge pull request #7298 from ywk253100/190404_filter_exec_task

Stop a replication execution
This commit is contained in:
Wenkai Yin 2019-04-04 15:41:42 +08:00 committed by GitHub
commit 9db85f02d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 1 deletions

View File

@ -16,6 +16,7 @@ package operation
import (
"fmt"
"strings"
"time"
"github.com/goharbor/harbor/src/common/job"
@ -119,9 +120,52 @@ func (c *controller) createFlow(executionID int64, policy *model.Policy, resourc
}
func (c *controller) StopReplication(executionID int64) error {
// TODO implement the function
_, tasks, err := c.ListTasks(&models.TaskQuery{
ExecutionID: executionID,
})
if err != nil {
return err
}
for _, task := range tasks {
if !isTaskRunning(task) {
log.Debugf("the task %d(job ID: %s) isn't running, its status is %s, skip", task.ID, task.JobID, task.Status)
continue
}
if err = c.scheduler.Stop(task.JobID); err != nil {
if isNotRunningJobError(err) {
log.Warningf("got not running job error when trying stop the task %d(job ID: %s): %v, skip", task.ID, task.JobID, err)
continue
}
return err
}
log.Debugf("the stop request for task %d(job ID: %s) sent", task.ID, task.JobID)
}
return nil
}
func isTaskRunning(task *models.Task) bool {
if task == nil {
return false
}
switch task.Status {
case models.TaskStatusSucceed,
models.TaskStatusStopped,
models.TaskStatusFailed:
return false
}
return true
}
// when trying to stop a job which isn't running in jobservice,
// an error whose message contains "xxx is not a running job"
// will be returned
func isNotRunningJobError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "is not a running job")
}
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return c.executionMgr.List(query...)
}

View File

@ -15,6 +15,7 @@
package operation
import (
"errors"
"io"
"testing"
@ -295,3 +296,67 @@ func TestGetTaskLog(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, "message", string(log))
}
func TestIsTaskRunning(t *testing.T) {
cases := []struct {
task *models.Task
isRunning bool
}{
{
task: nil,
isRunning: false,
},
{
task: &models.Task{
Status: models.TaskStatusSucceed,
},
isRunning: false,
},
{
task: &models.Task{
Status: models.TaskStatusFailed,
},
isRunning: false,
},
{
task: &models.Task{
Status: models.TaskStatusStopped,
},
isRunning: false,
},
{
task: &models.Task{
Status: models.TaskStatusInProgress,
},
isRunning: true,
},
}
for _, c := range cases {
assert.Equal(t, c.isRunning, isTaskRunning(c.task))
}
}
func TestIsNotRunningJobError(t *testing.T) {
cases := []struct {
err error
isNotRunningJobError bool
}{
{
err: nil,
isNotRunningJobError: false,
},
{
err: errors.New("not the error"),
isNotRunningJobError: false,
},
{
err: errors.New(`[ERROR] [handler.go:253]: Serve http request 'POST /api/v1/jobs/734a11140d939ef700889725' error: 500 {"code":10008,"message":"Stop job failed with error","details":"job '734a11140d939ef700889725' is not a running job"}`),
isNotRunningJobError: true,
},
}
for _, c := range cases {
assert.Equal(t, c.isNotRunningJobError, isNotRunningJobError(c.err))
}
}

View File

@ -67,6 +67,13 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (*model.Policy, e
if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil {
return nil, err
}
// convert the type of value from string to model.ResourceType if the filter
// is a resource type filter
for _, filter := range filters {
if filter.Type == model.FilterTypeResource {
filter.Value = (model.ResourceType)(filter.Value.(string))
}
}
ply.Filters = filters
}