Merge pull request #8400 from ywk253100/190725_stop_rerention

Support to stop one execution of retention
This commit is contained in:
Wenkai Yin(尹文开) 2019-07-26 13:33:15 +08:00 committed by GitHub
commit 8bc7f78fe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 90 additions and 31 deletions

View File

@ -40,6 +40,7 @@ create table retention_task
(
id SERIAL NOT NULL,
execution_id integer,
job_id varchar(64),
status varchar(32),
start_time timestamp default CURRENT_TIMESTAMP,
end_time timestamp default CURRENT_TIMESTAMP,

View File

@ -18,14 +18,13 @@ import (
"encoding/json"
"time"
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job"
jobmodels "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api"
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/replication"
"github.com/goharbor/harbor/src/replication/operation/hook"
"github.com/goharbor/harbor/src/replication/policy/scheduler"
@ -115,7 +114,8 @@ func (h *Handler) HandleRetentionTask() {
ID: h.id,
Status: h.status,
}
if h.status == models.JobFinished {
if h.status == models.JobFinished || h.status == models.JobError ||
h.status == models.JobStopped {
task.EndTime = time.Now()
}
props = append(props, "EndTime")

View File

@ -218,21 +218,15 @@ func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) er
if err != nil {
return err
}
if e == nil {
return fmt.Errorf("execution %d not found", eid)
}
switch action {
case "stop":
if e.Status != ExecutionStatusInProgress {
return fmt.Errorf("cannot abort, current status is %s", e.Status)
}
e.Status = ExecutionStatusStopped
e.EndTime = time.Now()
// TODO: STOP THE EXECUTION
return r.launcher.Stop(eid)
default:
return fmt.Errorf("not support action %s", action)
}
return r.manager.UpdateExecution(e)
}
// ListRetentionExecs List Retention Executions

View File

@ -11,7 +11,6 @@ func init() {
new(RetentionPolicy),
new(RetentionExecution),
new(RetentionTask),
new(RetentionScheduleJob),
)
}
@ -49,17 +48,8 @@ type RetentionExecution struct {
type RetentionTask struct {
ID int64 `orm:"pk;auto;column(id)"`
ExecutionID int64 `orm:"column(execution_id)"`
JobID string `orm:"column(job_id)"`
Status string `orm:"column(status)"`
StartTime time.Time `orm:"column(start_time)"`
EndTime time.Time `orm:"column(end_time)"`
}
// RetentionScheduleJob Retention Schedule Job
type RetentionScheduleJob struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
Status string
PolicyID int64 `orm:"column(policy_id)"`
JobID int64 `orm:"column(job_id)"`
CreateTime time.Time
UpdateTime time.Time
}

View File

@ -73,7 +73,7 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
// Log stage: start
repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s", repoPath, liteMeta.Algorithm)
myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s \n Dry Run: %f", repoPath, liteMeta.Algorithm, isDryRun)
// Stop check point 1:
if isStopped(ctx) {

View File

@ -16,9 +16,11 @@ package retention
import (
"fmt"
"time"
cjob "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
cmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
@ -27,6 +29,7 @@ import (
"github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
"github.com/goharbor/harbor/src/pkg/retention/q"
"github.com/goharbor/harbor/src/pkg/retention/res"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors"
"github.com/pkg/errors"
@ -55,6 +58,14 @@ type Launcher interface {
// int64 : the count of tasks
// error : common error if any errors occurred
Launch(policy *policy.Metadata, executionID int64, isDryRun bool) (int64, error)
// Stop the jobs for one execution
//
// Arguments:
// executionID int64 : the execution ID
//
// Returns:
// error : common error if any errors occurred
Stop(executionID int64) error
}
// NewLauncher returns an instance of Launcher
@ -172,9 +183,11 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
// create task records
jobDatas := make([]*jobData, 0)
now := time.Now()
for repo, p := range repositoryRules {
taskID, err := l.retentionMgr.CreateTask(&Task{
ExecutionID: executionID,
StartTime: now,
})
if err != nil {
return 0, launcherError(err)
@ -200,19 +213,52 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
ParamMeta: jobData.policy,
ParamDryRun: isDryRun,
}
_, err := l.jobserviceClient.SubmitJob(j)
task := &Task{
ID: jobData.taskID,
}
props := []string{"Status"}
jobID, err := l.jobserviceClient.SubmitJob(j)
if err != nil {
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.taskID, err)))
continue
task.Status = cmodels.JobError
task.EndTime = time.Now()
props = append(props, "EndTime")
} else {
allFailed = false
task.JobID = jobID
task.Status = cmodels.JobPending
props = append(props, "JobID")
}
if err = l.retentionMgr.UpdateTask(task, props...); err != nil {
log.Errorf("failed to update the status of task %d: %v", task.ID, err)
}
allFailed = false
}
if allFailed {
return 0, launcherError(fmt.Errorf("all tasks failed"))
}
return int64(len(jobDatas)), nil
}
func (l *launcher) Stop(executionID int64) error {
if executionID <= 0 {
return launcherError(fmt.Errorf("invalid execution ID: %d", executionID))
}
tasks, err := l.retentionMgr.ListTasks(&q.TaskQuery{
ExecutionID: executionID,
})
if err != nil {
return err
}
for _, task := range tasks {
if err = l.jobserviceClient.PostAction(task.JobID, cjob.JobActionStop); err != nil {
log.Errorf("failed to stop task %d, job ID: %s : %v", task.ID, task.JobID, err)
continue
}
}
return nil
}
func launcherError(err error) error {
return errors.Wrap(err, "launcher")
}

View File

@ -98,7 +98,13 @@ func (f *fakeRetentionManager) GetExecution(eid int64) (*Execution, error) {
return nil, nil
}
func (f *fakeRetentionManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
return nil, nil
return []*Task{
{
ID: 1,
ExecutionID: 1,
JobID: "1",
},
}, nil
}
func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) {
return 0, nil
@ -149,7 +155,9 @@ func (l *launchTestSuite) SetupTest() {
},
}
l.retentionMgr = &fakeRetentionManager{}
l.jobserviceClient = &hjob.MockJobClient{}
l.jobserviceClient = &hjob.MockJobClient{
JobUUID: []string{"1"},
}
}
func (l *launchTestSuite) TestGetProjects() {
@ -231,6 +239,22 @@ func (l *launchTestSuite) TestLaunch() {
assert.Equal(l.T(), int64(2), n)
}
func (l *launchTestSuite) TestStop() {
t := l.T()
launcher := &launcher{
projectMgr: l.projectMgr,
repositoryMgr: l.repositoryMgr,
retentionMgr: l.retentionMgr,
jobserviceClient: l.jobserviceClient,
}
// invalid execution ID
err := launcher.Stop(0)
require.NotNil(t, err)
err = launcher.Stop(1)
require.Nil(t, err)
}
func TestLaunchTestSuite(t *testing.T) {
suite.Run(t, new(launchTestSuite))
}

View File

@ -18,9 +18,9 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/astaxie/beego/orm"
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/pkg/retention/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy"
@ -173,6 +173,7 @@ func (d *DefaultManager) CreateTask(task *Task) (int64, error) {
}
t := &models.RetentionTask{
ExecutionID: task.ExecutionID,
JobID: task.JobID,
Status: task.Status,
StartTime: task.StartTime,
EndTime: task.EndTime,
@ -194,6 +195,7 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
tasks = append(tasks, &Task{
ID: t.ID,
ExecutionID: t.ExecutionID,
JobID: t.JobID,
Status: t.Status,
StartTime: t.StartTime,
EndTime: t.EndTime,
@ -213,6 +215,7 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
return dao.UpdateTask(&models.RetentionTask{
ID: task.ID,
ExecutionID: task.ExecutionID,
JobID: task.JobID,
Status: task.Status,
StartTime: task.StartTime,
EndTime: task.EndTime,

View File

@ -51,6 +51,7 @@ type Execution struct {
type Task struct {
ID int64 `json:"id"`
ExecutionID int64 `json:"execution_id"`
JobID string `json:"job_id"`
Status string `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`