From 2e9521ad458e0d3ea39d582fe0c35e6979a0a2e2 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 25 Jul 2019 15:25:23 +0800 Subject: [PATCH] Support to stop one execution of retention Support to stop one execution of retention Signed-off-by: Wenkai Yin --- .../postgresql/0010_1.9.0_schema.up.sql | 1 + .../service/notifications/jobs/handler.go | 6 +-- src/pkg/retention/controller.go | 14 ++--- src/pkg/retention/dao/models/retention.go | 12 +---- src/pkg/retention/job.go | 2 +- src/pkg/retention/launcher.go | 52 +++++++++++++++++-- src/pkg/retention/launcher_test.go | 28 +++++++++- src/pkg/retention/manager.go | 5 +- src/pkg/retention/models.go | 1 + 9 files changed, 90 insertions(+), 31 deletions(-) diff --git a/make/migrations/postgresql/0010_1.9.0_schema.up.sql b/make/migrations/postgresql/0010_1.9.0_schema.up.sql index 3f60dbf07..1a10420d6 100644 --- a/make/migrations/postgresql/0010_1.9.0_schema.up.sql +++ b/make/migrations/postgresql/0010_1.9.0_schema.up.sql @@ -40,6 +40,7 @@ create table retention_task ( id SERIAL NOT NULL, execution_id integer, + job_id varchar(64), status varchar(32), start_time timestamp default CURRENT_TIMESTAMP, end_time timestamp default CURRENT_TIMESTAMP, diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index 4509bfdd2..ac153ecb9 100644 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -18,14 +18,13 @@ import ( "encoding/json" "time" - "github.com/goharbor/harbor/src/pkg/retention" - "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/job" jobmodels "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" + "github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/replication" "github.com/goharbor/harbor/src/replication/operation/hook" "github.com/goharbor/harbor/src/replication/policy/scheduler" @@ -115,7 +114,8 @@ func (h *Handler) HandleRetentionTask() { ID: h.id, Status: h.status, } - if h.status == models.JobFinished { + if h.status == models.JobFinished || h.status == models.JobError || + h.status == models.JobStopped { task.EndTime = time.Now() } props = append(props, "EndTime") diff --git a/src/pkg/retention/controller.go b/src/pkg/retention/controller.go index 53f0088b8..fc425fcad 100644 --- a/src/pkg/retention/controller.go +++ b/src/pkg/retention/controller.go @@ -218,21 +218,15 @@ func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) er if err != nil { return err } - + if e == nil { + return fmt.Errorf("execution %d not found", eid) + } switch action { case "stop": - if e.Status != ExecutionStatusInProgress { - return fmt.Errorf("cannot abort, current status is %s", e.Status) - } - - e.Status = ExecutionStatusStopped - e.EndTime = time.Now() - // TODO: STOP THE EXECUTION + return r.launcher.Stop(eid) default: return fmt.Errorf("not support action %s", action) } - - return r.manager.UpdateExecution(e) } // ListRetentionExecs List Retention Executions diff --git a/src/pkg/retention/dao/models/retention.go b/src/pkg/retention/dao/models/retention.go index 3ff35147c..a3d24de70 100644 --- a/src/pkg/retention/dao/models/retention.go +++ b/src/pkg/retention/dao/models/retention.go @@ -11,7 +11,6 @@ func init() { new(RetentionPolicy), new(RetentionExecution), new(RetentionTask), - new(RetentionScheduleJob), ) } @@ -49,17 +48,8 @@ type RetentionExecution struct { type RetentionTask struct { ID int64 `orm:"pk;auto;column(id)"` ExecutionID int64 `orm:"column(execution_id)"` + JobID string `orm:"column(job_id)"` Status string `orm:"column(status)"` StartTime time.Time `orm:"column(start_time)"` EndTime time.Time `orm:"column(end_time)"` } - -// RetentionScheduleJob Retention Schedule Job -type RetentionScheduleJob struct { - ID int64 `orm:"pk;auto;column(id)" json:"id"` - Status string - PolicyID int64 `orm:"column(policy_id)"` - JobID int64 `orm:"column(job_id)"` - CreateTime time.Time - UpdateTime time.Time -} diff --git a/src/pkg/retention/job.go b/src/pkg/retention/job.go index 3fb782749..bce40e427 100644 --- a/src/pkg/retention/job.go +++ b/src/pkg/retention/job.go @@ -73,7 +73,7 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error { // Log stage: start repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) - myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s", repoPath, liteMeta.Algorithm) + myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s \n Dry Run: %f", repoPath, liteMeta.Algorithm, isDryRun) // Stop check point 1: if isStopped(ctx) { diff --git a/src/pkg/retention/launcher.go b/src/pkg/retention/launcher.go index 2f931f4af..92006ce63 100644 --- a/src/pkg/retention/launcher.go +++ b/src/pkg/retention/launcher.go @@ -16,9 +16,11 @@ package retention import ( "fmt" + "time" cjob "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job/models" + cmodels "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/config" @@ -27,6 +29,7 @@ import ( "github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/lwp" + "github.com/goharbor/harbor/src/pkg/retention/q" "github.com/goharbor/harbor/src/pkg/retention/res" "github.com/goharbor/harbor/src/pkg/retention/res/selectors" "github.com/pkg/errors" @@ -55,6 +58,14 @@ type Launcher interface { // int64 : the count of tasks // error : common error if any errors occurred Launch(policy *policy.Metadata, executionID int64, isDryRun bool) (int64, error) + // Stop the jobs for one execution + // + // Arguments: + // executionID int64 : the execution ID + // + // Returns: + // error : common error if any errors occurred + Stop(executionID int64) error } // NewLauncher returns an instance of Launcher @@ -172,9 +183,11 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool // create task records jobDatas := make([]*jobData, 0) + now := time.Now() for repo, p := range repositoryRules { taskID, err := l.retentionMgr.CreateTask(&Task{ ExecutionID: executionID, + StartTime: now, }) if err != nil { return 0, launcherError(err) @@ -200,19 +213,52 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool ParamMeta: jobData.policy, ParamDryRun: isDryRun, } - _, err := l.jobserviceClient.SubmitJob(j) + task := &Task{ + ID: jobData.taskID, + } + props := []string{"Status"} + jobID, err := l.jobserviceClient.SubmitJob(j) if err != nil { log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.taskID, err))) - continue + task.Status = cmodels.JobError + task.EndTime = time.Now() + props = append(props, "EndTime") + } else { + allFailed = false + task.JobID = jobID + task.Status = cmodels.JobPending + props = append(props, "JobID") + } + if err = l.retentionMgr.UpdateTask(task, props...); err != nil { + log.Errorf("failed to update the status of task %d: %v", task.ID, err) } - allFailed = false } + if allFailed { return 0, launcherError(fmt.Errorf("all tasks failed")) } return int64(len(jobDatas)), nil } +func (l *launcher) Stop(executionID int64) error { + if executionID <= 0 { + return launcherError(fmt.Errorf("invalid execution ID: %d", executionID)) + } + tasks, err := l.retentionMgr.ListTasks(&q.TaskQuery{ + ExecutionID: executionID, + }) + if err != nil { + return err + } + for _, task := range tasks { + if err = l.jobserviceClient.PostAction(task.JobID, cjob.JobActionStop); err != nil { + log.Errorf("failed to stop task %d, job ID: %s : %v", task.ID, task.JobID, err) + continue + } + } + return nil +} + func launcherError(err error) error { return errors.Wrap(err, "launcher") } diff --git a/src/pkg/retention/launcher_test.go b/src/pkg/retention/launcher_test.go index b8f741dec..8f80380a9 100644 --- a/src/pkg/retention/launcher_test.go +++ b/src/pkg/retention/launcher_test.go @@ -98,7 +98,13 @@ func (f *fakeRetentionManager) GetExecution(eid int64) (*Execution, error) { return nil, nil } func (f *fakeRetentionManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) { - return nil, nil + return []*Task{ + { + ID: 1, + ExecutionID: 1, + JobID: "1", + }, + }, nil } func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) { return 0, nil @@ -149,7 +155,9 @@ func (l *launchTestSuite) SetupTest() { }, } l.retentionMgr = &fakeRetentionManager{} - l.jobserviceClient = &hjob.MockJobClient{} + l.jobserviceClient = &hjob.MockJobClient{ + JobUUID: []string{"1"}, + } } func (l *launchTestSuite) TestGetProjects() { @@ -231,6 +239,22 @@ func (l *launchTestSuite) TestLaunch() { assert.Equal(l.T(), int64(2), n) } +func (l *launchTestSuite) TestStop() { + t := l.T() + launcher := &launcher{ + projectMgr: l.projectMgr, + repositoryMgr: l.repositoryMgr, + retentionMgr: l.retentionMgr, + jobserviceClient: l.jobserviceClient, + } + // invalid execution ID + err := launcher.Stop(0) + require.NotNil(t, err) + + err = launcher.Stop(1) + require.Nil(t, err) +} + func TestLaunchTestSuite(t *testing.T) { suite.Run(t, new(launchTestSuite)) } diff --git a/src/pkg/retention/manager.go b/src/pkg/retention/manager.go index 8156928b7..c9c1d8cf0 100644 --- a/src/pkg/retention/manager.go +++ b/src/pkg/retention/manager.go @@ -18,9 +18,9 @@ import ( "encoding/json" "errors" "fmt" - "github.com/astaxie/beego/orm" "time" + "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/pkg/retention/dao" "github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/policy" @@ -173,6 +173,7 @@ func (d *DefaultManager) CreateTask(task *Task) (int64, error) { } t := &models.RetentionTask{ ExecutionID: task.ExecutionID, + JobID: task.JobID, Status: task.Status, StartTime: task.StartTime, EndTime: task.EndTime, @@ -194,6 +195,7 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) { tasks = append(tasks, &Task{ ID: t.ID, ExecutionID: t.ExecutionID, + JobID: t.JobID, Status: t.Status, StartTime: t.StartTime, EndTime: t.EndTime, @@ -213,6 +215,7 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error { return dao.UpdateTask(&models.RetentionTask{ ID: task.ID, ExecutionID: task.ExecutionID, + JobID: task.JobID, Status: task.Status, StartTime: task.StartTime, EndTime: task.EndTime, diff --git a/src/pkg/retention/models.go b/src/pkg/retention/models.go index 9c963b144..ebbc524ad 100644 --- a/src/pkg/retention/models.go +++ b/src/pkg/retention/models.go @@ -51,6 +51,7 @@ type Execution struct { type Task struct { ID int64 `json:"id"` ExecutionID int64 `json:"execution_id"` + JobID string `json:"job_id"` Status string `json:"status"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"`