Merge pull request #13853 from bitsf/retention_task_manager

feat(retention) refactor task manager
This commit is contained in:
Wenkai Yin(尹文开) 2021-01-05 18:53:09 +08:00 committed by GitHub
commit 17400acea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 439 additions and 487 deletions

View File

@ -513,3 +513,85 @@ CREATE TABLE IF NOT EXISTS "report_vulnerability_record" (
CONSTRAINT fk_vuln_record_id FOREIGN KEY(vuln_record_id) REFERENCES vulnerability_record(id) ON DELETE CASCADE,
CONSTRAINT fk_report_uuid FOREIGN KEY(report_uuid) REFERENCES scan_report(uuid) ON DELETE CASCADE
);
/*delete the retention execution records whose policy doesn't exist*/
DELETE FROM retention_execution
WHERE id IN (SELECT re.id FROM retention_execution re
LEFT JOIN retention_policy rp ON re.policy_id=rp.id
WHERE rp.id IS NULL);
/*delete the replication task records whose execution doesn't exist*/
DELETE FROM retention_task
WHERE id IN (SELECT rt.id FROM retention_task rt
LEFT JOIN retention_execution re ON rt.execution_id=re.id
WHERE re.id IS NULL);
/*move the replication execution records into the new execution table*/
ALTER TABLE retention_execution ADD COLUMN IF NOT EXISTS new_execution_id int;
DO $$
DECLARE
rep_exec RECORD;
status_count RECORD;
trigger varchar(64);
rep_status varchar(32);
rep_end_time timestamp;
new_exec_id integer;
in_progress integer;
failed integer;
success integer;
stopped integer;
BEGIN
FOR rep_exec IN SELECT * FROM retention_execution where new_execution_id is null
LOOP
FOR status_count IN SELECT status, COUNT(*) as c FROM retention_task WHERE execution_id=rep_exec.id GROUP BY status
LOOP
IF status_count.status = 'Scheduled' or status_count.status = 'Pending' or status_count.status = 'Running' THEN
in_progress = in_progress + status_count.c;
ELSIF status_count.status = 'Stopped' THEN
stopped = stopped + status_count.c;
ELSIF status_count.status = 'Error' THEN
failed = failed + status_count.c;
ELSE
success = success + status_count.c;
END IF;
END LOOP;
IF in_progress>0 THEN
rep_status = 'InProgress';
ELSIF failed>0 THEN
rep_status = 'Failed';
ELSIF stopped>0 THEN
rep_status = 'Stopped';
ELSE
rep_status = 'Succeed';
END IF;
select max(end_time) into rep_end_time from retention_task where execution_id = rep_exec.id;
INSERT INTO execution (vendor_type, vendor_id, status, revision, trigger, start_time, end_time, extra_attrs)
VALUES ('RETENTION', rep_exec.policy_id, rep_status, 0, rep_exec.trigger, rep_exec.start_time, rep_end_time,
CONCAT('{"dry_run": ', case rep_exec.dry_run when 't' then 'true' else 'false' end, '}')::json) RETURNING id INTO new_exec_id;
UPDATE retention_execution SET new_execution_id=new_exec_id WHERE id=rep_exec.id;
END LOOP;
END $$;
/*move the replication task records into the new task table*/
DO $$
DECLARE
rep_task RECORD;
status_code integer;
BEGIN
FOR rep_task IN SELECT * FROM retention_task
LOOP
INSERT INTO task (vendor_type, execution_id, job_id, status, status_code, status_revision,
run_count, extra_attrs, creation_time, start_time, update_time, end_time)
VALUES ('RETENTION', (SELECT new_execution_id FROM retention_execution WHERE id=rep_task.execution_id),
rep_task.job_id, rep_task.status, rep_task.status_code, rep_task.status_revision,
1, CONCAT('{"total":"', rep_task.total,'","retained":"', rep_task.retained,'"}')::json,
rep_task.start_time, rep_task.start_time, rep_task.end_time, rep_task.end_time);
END LOOP;
END $$;
DROP TABLE IF EXISTS replication_task;
DROP TABLE IF EXISTS replication_execution;

View File

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/goharbor/harbor/src/pkg/task"
"net/http"
"github.com/ghodss/yaml"
@ -45,9 +46,6 @@ const (
// the managers/controllers used globally
var (
projectMgr project.Manager
retentionScheduler scheduler.Scheduler
retentionMgr retention.Manager
retentionLauncher retention.Launcher
retentionController retention.APIController
)
@ -187,11 +185,11 @@ func Init() error {
// init project manager
initProjectManager()
retentionMgr = retention.NewManager()
retentionMgr := retention.NewManager()
retentionLauncher = retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr)
retentionLauncher := retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr, task.ExecMgr, task.Mgr)
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher, task.ExecMgr, task.Mgr)
retentionCallbackFun := func(ctx context.Context, p string) error {
param := &retention.TriggerParam{}

View File

@ -3,15 +3,16 @@ package api
import (
"encoding/json"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/task"
"net/http"
"strconv"
"github.com/goharbor/harbor/src/common/rbac"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/project/metadata"
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/q"
)
// RetentionAPI ...
@ -266,7 +267,7 @@ func (r *RetentionAPI) TriggerRetentionExec() {
if !r.requireAccess(p, rbac.ActionUpdate) {
return
}
eid, err := retentionController.TriggerRetentionExec(id, retention.ExecutionTriggerManual, d.DryRun)
eid, err := retentionController.TriggerRetentionExec(id, task.ExecutionTriggerManual, d.DryRun)
if err != nil {
r.SendInternalServerError(err)
return
@ -366,6 +367,10 @@ func (r *RetentionAPI) ListRetentionExecTasks() {
query := &q.Query{
PageNumber: page,
PageSize: size,
Keywords: map[string]interface{}{
"VendorID": id,
"VendorType": job.Retention,
},
}
p, err := retentionController.GetRetention(id)
if err != nil {

View File

@ -20,15 +20,10 @@ import (
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/event/metadata"
"github.com/goharbor/harbor/src/core/service/notifications"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/notifier/event"
"github.com/goharbor/harbor/src/pkg/retention"
)
var statusMap = map[string]string{
@ -89,61 +84,6 @@ func (h *Handler) Prepare() {
}
}
// HandleRetentionTask handles the webhook of retention task
func (h *Handler) HandleRetentionTask() {
taskID := h.id
status := h.rawStatus
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
mgr := &retention.DefaultManager{}
// handle checkin
if h.checkIn != "" {
var retainObj struct {
Total int `json:"total"`
Retained int `json:"retained"`
Deleted []*selector.Result `json:"deleted"`
}
if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
return
}
task := &retention.Task{
ID: taskID,
Total: retainObj.Total,
Retained: retainObj.Retained,
}
if err := mgr.UpdateTask(task, "Total", "Retained"); err != nil {
log.Errorf("failed to update of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
e := &event.Event{}
metaData := &metadata.RetentionMetaData{
Total: retainObj.Total,
Retained: retainObj.Retained,
Deleted: retainObj.Deleted,
Status: "SUCCESS",
TaskID: taskID,
}
if err := e.Build(metaData); err == nil {
if err := e.Publish(); err != nil {
log.Error(errors.Wrap(err, "tag retention job hook handler: event publish"))
}
} else {
log.Error(errors.Wrap(err, "tag retention job hook handler: event publish"))
}
return
}
// handle status updating
if err := mgr.UpdateTaskStatus(taskID, status, h.revision); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
}
// HandleNotificationJob handles the hook of notification job
func (h *Handler) HandleNotificationJob() {
log.Debugf("received notification job status update event: job-%d, status-%s", h.id, h.status)

View File

@ -0,0 +1,71 @@
package retention
import (
"context"
"encoding/json"
"github.com/goharbor/harbor/src/controller/event/metadata"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/notifier/event"
"github.com/goharbor/harbor/src/pkg/task"
)
func init() {
if err := task.RegisterCheckInProcessor(job.Retention, retentionTaskCheckInProcessor); err != nil {
log.Fatalf("failed to register the checkin processor for the retention job, error %v", err)
}
}
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
taskID := t.ID
status := t.Status
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
// handle checkin
if data != "" {
var retainObj struct {
Total int `json:"total"`
Retained int `json:"retained"`
Deleted []*selector.Result `json:"deleted"`
}
if err := json.Unmarshal([]byte(data), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
return err
}
t, err := task.Mgr.Get(ctx, taskID)
if err != nil {
return err
}
t.ExtraAttrs["total"] = retainObj.Total
t.ExtraAttrs["retained"] = retainObj.Retained
err = task.Mgr.UpdateExtraAttrs(ctx, taskID, t.ExtraAttrs)
if err != nil {
log.G(ctx).WithField("error", err).Errorf("failed to update of retention task %d", taskID)
return err
}
e := &event.Event{}
metaData := &metadata.RetentionMetaData{
Total: retainObj.Total,
Retained: retainObj.Retained,
Deleted: retainObj.Deleted,
Status: "SUCCESS",
TaskID: taskID,
}
if err := e.Build(metaData); err == nil {
if err := e.Publish(); err != nil {
log.G(ctx).WithField("error", err).Errorf("tag retention job hook handler: event publish")
}
} else {
log.G(ctx).WithField("error", err).Errorf("tag retention job hook handler: event publish")
}
}
return nil
}

View File

@ -15,15 +15,18 @@
package retention
import (
"context"
"fmt"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/q"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
"time"
)
// go:generate mockery -name APIController -case snake
@ -60,6 +63,8 @@ type APIController interface {
// DefaultAPIController ...
type DefaultAPIController struct {
manager Manager
execMgr task.ExecutionManager
taskMgr task.Manager
launcher Launcher
projectManager project.Manager
repositoryMgr repository.Manager
@ -179,9 +184,36 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error {
}
}
ctx := orm.Context()
err = r.deleteExecs(ctx, id)
if err != nil {
return err
}
return r.manager.DeletePolicyAndExec(id)
}
// deleteExecs delete executions
func (r *DefaultAPIController) deleteExecs(ctx context.Context, vendorID int64) error {
executions, err := r.execMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.Retention,
"VendorID": vendorID,
},
})
if err != nil {
return err
}
for _, execution := range executions {
if err = r.execMgr.Delete(ctx, execution.ID); err != nil {
return err
}
}
return nil
}
// TriggerRetentionExec Trigger Retention Execution
func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger string, dryRun bool) (int64, error) {
p, err := r.manager.GetPolicy(policyID)
@ -189,16 +221,19 @@ func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger stri
return 0, err
}
exec := &Execution{
PolicyID: policyID,
StartTime: time.Now().Truncate(time.Second),
Trigger: trigger,
DryRun: dryRun,
}
id, err := r.manager.CreateExecution(exec)
ctx := orm.Context()
id, err := r.execMgr.Create(ctx, job.Retention, policyID, trigger,
map[string]interface{}{
"dry_run": dryRun,
},
)
if _, err = r.launcher.Launch(p, id, dryRun); err != nil {
// clean execution if launch failed
_ = r.manager.DeleteExecution(id)
if err1 := r.execMgr.StopAndWait(ctx, id, 10*time.Second); err1 != nil {
logger.Errorf("failed to stop the retention execution %d: %v", id, err1)
}
if err1 := r.execMgr.MarkError(ctx, id, err.Error()); err1 != nil {
logger.Errorf("failed to mark error for the retention execution %d: %v", id, err1)
}
return 0, err
}
return id, err
@ -207,7 +242,8 @@ func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger stri
// OperateRetentionExec Operate Retention Execution
func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) error {
e, err := r.manager.GetExecution(eid)
ctx := orm.Context()
e, err := r.execMgr.Get(ctx, eid)
if err != nil {
return err
}
@ -224,50 +260,135 @@ func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) er
// GetRetentionExec Get Retention Execution
func (r *DefaultAPIController) GetRetentionExec(executionID int64) (*Execution, error) {
return r.manager.GetExecution(executionID)
ctx := orm.Context()
e, err := r.execMgr.Get(ctx, executionID)
if err != nil {
return nil, err
}
return convertExecution(e), nil
}
// ListRetentionExecs List Retention Executions
func (r *DefaultAPIController) ListRetentionExecs(policyID int64, query *q.Query) ([]*Execution, error) {
return r.manager.ListExecutions(policyID, query)
ctx := orm.Context()
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Retention
query.Keywords["VendorID"] = policyID
execs, err := r.execMgr.List(ctx, query)
if err != nil {
return nil, err
}
var executions []*Execution
for _, exec := range execs {
executions = append(executions, convertExecution(exec))
}
return executions, nil
}
func convertExecution(exec *task.Execution) *Execution {
return &Execution{
ID: exec.ID,
PolicyID: exec.VendorID,
StartTime: exec.StartTime,
EndTime: exec.EndTime,
Status: exec.Status,
Trigger: exec.Trigger,
DryRun: exec.ExtraAttrs["dry_run"].(bool),
}
}
// GetTotalOfRetentionExecs Count Retention Executions
func (r *DefaultAPIController) GetTotalOfRetentionExecs(policyID int64) (int64, error) {
return r.manager.GetTotalOfRetentionExecs(policyID)
ctx := orm.Context()
return r.execMgr.Count(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.Retention,
"VendorID": policyID,
},
})
}
// ListRetentionExecTasks List Retention Execution Histories
func (r *DefaultAPIController) ListRetentionExecTasks(executionID int64, query *q.Query) ([]*Task, error) {
q1 := &q.TaskQuery{
ExecutionID: executionID,
ctx := orm.Context()
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Retention
query.Keywords["ExecutionID"] = executionID
tks, err := r.taskMgr.List(ctx, query)
if err != nil {
return nil, err
}
if query != nil {
q1.PageSize = query.PageSize
q1.PageNumber = query.PageNumber
var tasks []*Task
for _, tk := range tks {
tasks = append(tasks, convertTask(tk))
}
return tasks, nil
}
func convertTask(t *task.Task) *Task {
return &Task{
ID: t.ID,
ExecutionID: t.ExecutionID,
Repository: t.GetStringFromExtraAttrs("repository"),
JobID: t.JobID,
Status: t.Status,
StatusCode: job.Status(t.Status).Code(),
StartTime: t.StartTime,
EndTime: t.EndTime,
Total: int(t.GetNumFromExtraAttrs("total")),
Retained: int(t.GetNumFromExtraAttrs("retained")),
}
return r.manager.ListTasks(q1)
}
// GetTotalOfRetentionExecTasks Count Retention Execution Histories
func (r *DefaultAPIController) GetTotalOfRetentionExecTasks(executionID int64) (int64, error) {
return r.manager.GetTotalOfTasks(executionID)
ctx := orm.Context()
return r.taskMgr.Count(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.Retention,
"ExecutionID": executionID,
},
})
}
// GetRetentionExecTaskLog Get Retention Execution Task Log
func (r *DefaultAPIController) GetRetentionExecTaskLog(taskID int64) ([]byte, error) {
return r.manager.GetTaskLog(taskID)
ctx := orm.Context()
return r.taskMgr.GetLog(ctx, taskID)
}
// GetRetentionExecTask Get Retention Execution Task
func (r *DefaultAPIController) GetRetentionExecTask(taskID int64) (*Task, error) {
return r.manager.GetTask(taskID)
ctx := orm.Context()
t, err := r.taskMgr.Get(ctx, taskID)
if err != nil {
return nil, err
}
return convertTask(t), nil
}
// UpdateTaskInfo Update task info
func (r *DefaultAPIController) UpdateTaskInfo(taskID int64, total int, retained int) error {
ctx := orm.Context()
t, err := r.taskMgr.Get(ctx, taskID)
if err != nil {
return err
}
t.ExtraAttrs["total"] = total
t.ExtraAttrs["retained"] = retained
return r.taskMgr.UpdateExtraAttrs(ctx, taskID, t.ExtraAttrs)
}
// NewAPIController ...
func NewAPIController(retentionMgr Manager, projectManager project.Manager, repositoryMgr repository.Manager, scheduler scheduler.Scheduler, retentionLauncher Launcher) APIController {
func NewAPIController(retentionMgr Manager, projectManager project.Manager, repositoryMgr repository.Manager, scheduler scheduler.Scheduler, retentionLauncher Launcher, execMgr task.ExecutionManager, taskMgr task.Manager) APIController {
return &DefaultAPIController{
manager: retentionMgr,
execMgr: execMgr,
taskMgr: taskMgr,
launcher: retentionLauncher,
projectManager: projectManager,
repositoryMgr: repositoryMgr,

View File

@ -16,17 +16,20 @@ package retention
import (
"context"
"strings"
"testing"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/goharbor/harbor/src/testing/pkg/project"
"github.com/goharbor/harbor/src/testing/pkg/repository"
testingTask "github.com/goharbor/harbor/src/testing/pkg/task"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"strings"
"testing"
)
type ControllerTestSuite struct {
@ -50,8 +53,37 @@ func (s *ControllerTestSuite) TestPolicy() {
repositoryMgr := &repository.FakeManager{}
retentionScheduler := &fakeRetentionScheduler{}
retentionLauncher := &fakeLauncher{}
execMgr := &testingTask.ExecutionManager{}
taskMgr := &testingTask.Manager{}
retentionMgr := NewManager()
c := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"dry_run": true,
},
}, nil)
execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"dry_run": true,
},
}}, nil)
taskMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Task{{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"total": 1,
"retained": 1,
},
}}, nil)
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
c := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher, execMgr, taskMgr)
p1 := &policy.Metadata{
Algorithm: "or",
@ -148,8 +180,36 @@ func (s *ControllerTestSuite) TestExecution() {
repositoryMgr := &repository.FakeManager{}
retentionScheduler := &fakeRetentionScheduler{}
retentionLauncher := &fakeLauncher{}
execMgr := &testingTask.ExecutionManager{}
taskMgr := &testingTask.Manager{}
retentionMgr := NewManager()
m := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"dry_run": true,
},
}, nil)
execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"dry_run": true,
},
}}, nil)
taskMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Task{{
ID: 1,
Status: job.RunningStatus.String(),
ExtraAttrs: map[string]interface{}{
"total": 1,
"retained": 1,
},
}}, nil)
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
m := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher, execMgr, taskMgr)
p1 := &policy.Metadata{
Algorithm: "or",
@ -213,7 +273,7 @@ func (s *ControllerTestSuite) TestExecution() {
ts, err := m.ListRetentionExecTasks(id, nil)
s.Require().Nil(err)
s.Require().EqualValues(0, len(ts))
s.Require().EqualValues(1, len(ts))
}

View File

@ -1,8 +1,8 @@
package retention
import (
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/q"
)
// FakedRetentionController ...

View File

@ -16,18 +16,15 @@ package retention
import (
"fmt"
"time"
beegoorm "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/selector/selectors/index"
cjob "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
cmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib/errors"
@ -37,7 +34,6 @@ import (
"github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
"github.com/goharbor/harbor/src/pkg/retention/q"
)
const (
@ -75,11 +71,13 @@ type Launcher interface {
// NewLauncher returns an instance of Launcher
func NewLauncher(projectMgr project.Manager, repositoryMgr repository.Manager,
retentionMgr Manager) Launcher {
retentionMgr Manager, execMgr task.ExecutionManager, taskMgr task.Manager) Launcher {
return &launcher{
projectMgr: projectMgr,
repositoryMgr: repositoryMgr,
retentionMgr: retentionMgr,
execMgr: execMgr,
taskMgr: taskMgr,
jobserviceClient: cjob.GlobalClient,
internalCoreURL: config.InternalCoreURL(),
chartServerEnabled: config.WithChartMuseum(),
@ -95,6 +93,8 @@ type jobData struct {
type launcher struct {
retentionMgr Manager
taskMgr task.Manager
execMgr task.ExecutionManager
projectMgr project.Manager
repositoryMgr repository.Manager
jobserviceClient cjob.Client
@ -206,13 +206,8 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
return 0, nil
}
// create task records in database
if err = l.createTasks(executionID, jobDatas); err != nil {
return 0, launcherError(err)
}
// submit jobs to jobservice
if err = l.submitJobs(jobDatas); err != nil {
// submit tasks to jobservice
if err = l.submitTasks(executionID, jobDatas); err != nil {
return 0, launcherError(err)
}
@ -246,58 +241,23 @@ func createJobs(repositoryRules map[selector.Repository]*lwp.Metadata, isDryRun
return jobDatas, nil
}
// create task records in database
func (l *launcher) createTasks(executionID int64, jobDatas []*jobData) error {
now := time.Now()
func (l *launcher) submitTasks(executionID int64, jobDatas []*jobData) error {
ctx := orm.Context()
for _, jobData := range jobDatas {
taskID, err := l.retentionMgr.CreateTask(&Task{
ExecutionID: executionID,
Repository: jobData.Repository.Name,
StartTime: now.Truncate(time.Second),
})
_, err := l.taskMgr.Create(ctx, executionID, &task.Job{
Name: jobData.JobName,
Parameters: jobData.JobParams,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},
},
map[string]interface{}{
"repository": jobData.Repository.Name,
"dry_run": jobData.JobParams[ParamDryRun],
})
if err != nil {
return err
}
jobData.TaskID = taskID
}
return nil
}
// create task records in database
func (l *launcher) submitJobs(jobDatas []*jobData) error {
allFailed := true
for _, jobData := range jobDatas {
task := &Task{
ID: jobData.TaskID,
}
props := []string{"Status"}
j := &models.JobData{
Name: jobData.JobName,
Metadata: &models.JobMetadata{
JobKind: job.KindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.TaskID),
Parameters: jobData.JobParams,
}
// Submit job
jobID, err := l.jobserviceClient.SubmitJob(j)
if err != nil {
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.TaskID, err)))
task.Status = cmodels.JobError
task.EndTime = time.Now()
props = append(props, "EndTime")
} else {
allFailed = false
task.JobID = jobID
task.Status = cmodels.JobPending
props = append(props, "JobID")
}
if err = l.retentionMgr.UpdateTask(task, props...); err != nil {
log.Errorf("failed to update the status of task %d: %v", task.ID, err)
}
}
if allFailed {
return launcherError(fmt.Errorf("all tasks failed"))
}
return nil
}
@ -306,19 +266,8 @@ func (l *launcher) Stop(executionID int64) error {
if executionID <= 0 {
return launcherError(fmt.Errorf("invalid execution ID: %d", executionID))
}
tasks, err := l.retentionMgr.ListTasks(&q.TaskQuery{
ExecutionID: executionID,
})
if err != nil {
return err
}
for _, task := range tasks {
if err = l.jobserviceClient.PostAction(task.JobID, cjob.JobActionStop); err != nil {
log.Errorf("failed to stop task %d, job ID: %s : %v", task.ID, task.JobID, err)
continue
}
}
return nil
ctx := orm.Context()
return l.execMgr.Stop(ctx, executionID)
}
func launcherError(err error) error {

View File

@ -28,6 +28,7 @@ import (
"github.com/goharbor/harbor/src/testing/mock"
projecttesting "github.com/goharbor/harbor/src/testing/pkg/project"
"github.com/goharbor/harbor/src/testing/pkg/repository"
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@ -104,6 +105,8 @@ func (f *fakeRetentionManager) ListHistories(executionID int64, query *q.Query)
type launchTestSuite struct {
suite.Suite
projectMgr project.Manager
execMgr *tasktesting.ExecutionManager
taskMgr *tasktesting.Manager
repositoryMgr *repository.FakeManager
retentionMgr Manager
jobserviceClient job.Client
@ -125,6 +128,8 @@ func (l *launchTestSuite) SetupTest() {
l.projectMgr = projectMgr
l.repositoryMgr = &repository.FakeManager{}
l.retentionMgr = &fakeRetentionManager{}
l.execMgr = &tasktesting.ExecutionManager{}
l.taskMgr = &tasktesting.Manager{}
l.jobserviceClient = &hjob.MockJobClient{
JobUUID: []string{"1"},
}
@ -156,10 +161,16 @@ func (l *launchTestSuite) TestGetRepositories() {
}
func (l *launchTestSuite) TestLaunch() {
l.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
l.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
l.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
launcher := &launcher{
projectMgr: l.projectMgr,
repositoryMgr: l.repositoryMgr,
retentionMgr: l.retentionMgr,
execMgr: l.execMgr,
taskMgr: l.taskMgr,
jobserviceClient: l.jobserviceClient,
chartServerEnabled: true,
}
@ -244,10 +255,13 @@ func (l *launchTestSuite) TestLaunch() {
func (l *launchTestSuite) TestStop() {
t := l.T()
l.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
launcher := &launcher{
projectMgr: l.projectMgr,
repositoryMgr: l.repositoryMgr,
retentionMgr: l.retentionMgr,
execMgr: l.execMgr,
taskMgr: l.taskMgr,
jobserviceClient: l.jobserviceClient,
}
// invalid execution ID

View File

@ -16,17 +16,13 @@ package retention
import (
"encoding/json"
"errors"
"fmt"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"time"
"github.com/astaxie/beego/orm"
cjob "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/q"
)
// Manager defines operations of managing policy
@ -41,32 +37,6 @@ type Manager interface {
DeletePolicyAndExec(ID int64) error
// Get the specified policy
GetPolicy(ID int64) (*policy.Metadata, error)
// Create a new retention execution
CreateExecution(execution *Execution) (int64, error)
// Delete a new retention execution
DeleteExecution(int64) error
// Get the specified execution
GetExecution(eid int64) (*Execution, error)
// List executions
ListExecutions(policyID int64, query *q.Query) ([]*Execution, error)
// GetTotalOfRetentionExecs Count Retention Executions
GetTotalOfRetentionExecs(policyID int64) (int64, error)
// List tasks histories
ListTasks(query ...*q.TaskQuery) ([]*Task, error)
// GetTotalOfTasks Count Tasks
GetTotalOfTasks(executionID int64) (int64, error)
// Create a new retention task
CreateTask(task *Task) (int64, error)
// Update the specified task
UpdateTask(task *Task, cols ...string) error
// Update the status of the specified task
// The status is updated only when (the statusRevision > the current revision)
// or (the the statusRevision = the current revision and status > the current status)
UpdateTaskStatus(taskID int64, status string, statusRevision int64) error
// Get the task specified by the task ID
GetTask(taskID int64) (*Task, error)
// Get the log of the specified task
GetTaskLog(taskID int64) ([]byte, error)
}
// DefaultManager ...
@ -123,188 +93,6 @@ func (d *DefaultManager) GetPolicy(id int64) (*policy.Metadata, error) {
return p, nil
}
// CreateExecution Create Execution
func (d *DefaultManager) CreateExecution(execution *Execution) (int64, error) {
exec := &models.RetentionExecution{}
exec.PolicyID = execution.PolicyID
exec.StartTime = execution.StartTime
exec.DryRun = execution.DryRun
exec.Trigger = execution.Trigger
return dao.CreateExecution(exec)
}
// DeleteExecution Delete Execution
func (d *DefaultManager) DeleteExecution(eid int64) error {
return dao.DeleteExecution(eid)
}
// ListExecutions List Executions
func (d *DefaultManager) ListExecutions(policyID int64, query *q.Query) ([]*Execution, error) {
execs, err := dao.ListExecutions(policyID, query)
if err != nil {
if err == orm.ErrNoRows {
return nil, nil
}
return nil, err
}
var execs1 []*Execution
for _, e := range execs {
e1 := &Execution{}
e1.ID = e.ID
e1.PolicyID = e.PolicyID
e1.Status = e.Status
e1.StartTime = e.StartTime
e1.EndTime = e.EndTime
e1.Trigger = e.Trigger
e1.DryRun = e.DryRun
execs1 = append(execs1, e1)
}
return execs1, nil
}
// GetTotalOfRetentionExecs Count Executions
func (d *DefaultManager) GetTotalOfRetentionExecs(policyID int64) (int64, error) {
return dao.GetTotalOfRetentionExecs(policyID)
}
// GetExecution Get Execution
func (d *DefaultManager) GetExecution(eid int64) (*Execution, error) {
e, err := dao.GetExecution(eid)
if err != nil {
return nil, err
}
e1 := &Execution{}
e1.ID = e.ID
e1.PolicyID = e.PolicyID
e1.Status = e.Status
e1.StartTime = e.StartTime
e1.EndTime = e.EndTime
e1.Trigger = e.Trigger
e1.DryRun = e.DryRun
return e1, nil
}
// CreateTask creates task record
func (d *DefaultManager) CreateTask(task *Task) (int64, error) {
if task == nil {
return 0, errors.New("nil task")
}
t := &models.RetentionTask{
ExecutionID: task.ExecutionID,
Repository: task.Repository,
JobID: task.JobID,
Status: task.Status,
StatusCode: task.StatusCode,
StatusRevision: task.StatusRevision,
StartTime: task.StartTime,
EndTime: task.EndTime,
Total: task.Total,
Retained: task.Retained,
}
return dao.CreateTask(t)
}
// ListTasks lists tasks according to the query
func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
ts, err := dao.ListTask(query...)
if err != nil {
if err == orm.ErrNoRows {
return nil, nil
}
return nil, err
}
tasks := make([]*Task, 0)
for _, t := range ts {
tasks = append(tasks, &Task{
ID: t.ID,
ExecutionID: t.ExecutionID,
Repository: t.Repository,
JobID: t.JobID,
Status: t.Status,
StatusCode: t.StatusCode,
StatusRevision: t.StatusRevision,
StartTime: t.StartTime,
EndTime: t.EndTime,
Total: t.Total,
Retained: t.Retained,
})
}
return tasks, nil
}
// GetTotalOfTasks Count tasks
func (d *DefaultManager) GetTotalOfTasks(executionID int64) (int64, error) {
return dao.GetTotalOfTasks(executionID)
}
// UpdateTask updates the task
func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
if task == nil {
return errors.New("nil task")
}
if task.ID <= 0 {
return fmt.Errorf("invalid task ID: %d", task.ID)
}
return dao.UpdateTask(&models.RetentionTask{
ID: task.ID,
ExecutionID: task.ExecutionID,
Repository: task.Repository,
JobID: task.JobID,
Status: task.Status,
StatusCode: task.StatusCode,
StatusRevision: task.StatusRevision,
StartTime: task.StartTime,
EndTime: task.EndTime,
Total: task.Total,
Retained: task.Retained,
}, cols...)
}
// UpdateTaskStatus updates the status of the specified task
func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64) error {
if taskID <= 0 {
return fmt.Errorf("invalid task ID: %d", taskID)
}
st := job.Status(status)
return dao.UpdateTaskStatus(taskID, status, st.Code(), statusRevision)
}
// GetTask returns the task specified by task ID
func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
if taskID <= 0 {
return nil, fmt.Errorf("invalid task ID: %d", taskID)
}
task, err := dao.GetTask(taskID)
if err != nil {
return nil, err
}
return &Task{
ID: task.ID,
ExecutionID: task.ExecutionID,
Repository: task.Repository,
JobID: task.JobID,
Status: task.Status,
StatusCode: task.StatusCode,
StatusRevision: task.StatusRevision,
StartTime: task.StartTime,
EndTime: task.EndTime,
Total: task.Total,
Retained: task.Retained,
}, nil
}
// GetTaskLog gets the logs of task
func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
task, err := d.GetTask(taskID)
if err != nil {
return nil, err
}
if task == nil {
return nil, fmt.Errorf("task %d not found", taskID)
}
return cjob.GlobalClient.GetJobLog(task.JobID)
}
// NewManager ...
func NewManager() Manager {
return &DefaultManager{}

View File

@ -1,20 +1,13 @@
package retention
import (
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/stretchr/testify/assert"
"os"
"strings"
"testing"
"time"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q"
tjob "github.com/goharbor/harbor/src/testing/job"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
@ -134,89 +127,4 @@ func TestExecution(t *testing.T) {
policyID, err := m.CreatePolicy(p1)
assert.Nil(t, err)
assert.True(t, policyID > 0)
e1 := &Execution{
PolicyID: policyID,
StartTime: time.Now(),
Trigger: ExecutionTriggerManual,
DryRun: false,
}
id, err := m.CreateExecution(e1)
assert.Nil(t, err)
assert.True(t, id > 0)
e1, err = m.GetExecution(id)
assert.Nil(t, err)
assert.NotNil(t, e1)
assert.EqualValues(t, id, e1.ID)
es, err := m.ListExecutions(policyID, nil)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(es))
err = m.DeleteExecution(id)
assert.Nil(t, err)
}
func TestTask(t *testing.T) {
m := NewManager()
err := m.DeleteExecution(1000)
require.Nil(t, err)
task := &Task{
ExecutionID: 1000,
JobID: "1",
Status: jjob.PendingStatus.String(),
StatusCode: jjob.PendingStatus.Code(),
StatusRevision: 1,
Total: 0,
StartTime: time.Now(),
}
// create
id, err := m.CreateTask(task)
require.Nil(t, err)
// get
tk, err := m.GetTask(id)
require.Nil(t, err)
assert.EqualValues(t, 1000, tk.ExecutionID)
// update
task.ID = id
task.Total = 1
err = m.UpdateTask(task, "Total")
require.Nil(t, err)
// update status to success which is a final status
err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String(), 1)
require.Nil(t, err)
// try to update status to running, as the status has already
// been updated to a final status and the stautus revision doesn't change,
// this updating shouldn't take effect
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 1)
require.Nil(t, err)
// update the revision and try to update status to running again
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 2)
require.Nil(t, err)
// list
tasks, err := m.ListTasks(&q.TaskQuery{
ExecutionID: 1000,
})
require.Nil(t, err)
require.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1000), tasks[0].ExecutionID)
assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, jjob.RunningStatus.String(), tasks[0].Status)
assert.Equal(t, jjob.RunningStatus.Code(), tasks[0].StatusCode)
assert.Equal(t, int64(2), tasks[0].StatusRevision)
// get task log
job.GlobalClient = &tjob.MockJobClient{
JobUUID: []string{"1"},
}
data, err := m.GetTaskLog(task.ID)
require.Nil(t, err)
assert.Equal(t, "some log", string(data))
}

View File

@ -3,8 +3,8 @@
package mocks
import (
"github.com/goharbor/harbor/src/lib/q"
policy "github.com/goharbor/harbor/src/pkg/retention/policy"
q "github.com/goharbor/harbor/src/pkg/retention/q"
mock "github.com/stretchr/testify/mock"
retention "github.com/goharbor/harbor/src/pkg/retention"

View File

@ -142,6 +142,22 @@ func (t *Task) GetBoolFromExtraAttrs(key string) bool {
return b
}
// GetNumFromExtraAttrs returns the num value specified by key
func (t *Task) GetNumFromExtraAttrs(key string) float64 {
if len(t.ExtraAttrs) == 0 {
return 0
}
rt, exist := t.ExtraAttrs[key]
if !exist {
return 0
}
v, ok := rt.(float64)
if !ok {
return 0
}
return v
}
// Job is the model represents the requested jobservice job
type Job struct {
Name string

View File

@ -51,12 +51,12 @@ func registerRoutes() {
beego.Router("/api/internal/syncquota", &api.InternalAPI{}, "post:SyncQuota")
beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob")
beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask")
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/adminjob/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for adminjob
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/scan/:uuid").HandlerFunc(ignoreNotification) // ignore legacy scan job notifaction
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/retention/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler())
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler())
beego.Router("/service/token", &token.Handler{})