mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-25 01:58:35 +01:00
feat(retention) refactor task manager
Signed-off-by: Ziming Zhang <zziming@vmware.com>
This commit is contained in:
parent
d0152cb446
commit
8faa76a1b6
@ -513,3 +513,85 @@ CREATE TABLE IF NOT EXISTS "report_vulnerability_record" (
|
||||
CONSTRAINT fk_vuln_record_id FOREIGN KEY(vuln_record_id) REFERENCES vulnerability_record(id) ON DELETE CASCADE,
|
||||
CONSTRAINT fk_report_uuid FOREIGN KEY(report_uuid) REFERENCES scan_report(uuid) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
/*delete the retention execution records whose policy doesn't exist*/
|
||||
DELETE FROM retention_execution
|
||||
WHERE id IN (SELECT re.id FROM retention_execution re
|
||||
LEFT JOIN retention_policy rp ON re.policy_id=rp.id
|
||||
WHERE rp.id IS NULL);
|
||||
|
||||
/*delete the replication task records whose execution doesn't exist*/
|
||||
DELETE FROM retention_task
|
||||
WHERE id IN (SELECT rt.id FROM retention_task rt
|
||||
LEFT JOIN retention_execution re ON rt.execution_id=re.id
|
||||
WHERE re.id IS NULL);
|
||||
|
||||
/*move the replication execution records into the new execution table*/
|
||||
ALTER TABLE retention_execution ADD COLUMN IF NOT EXISTS new_execution_id int;
|
||||
|
||||
DO $$
|
||||
DECLARE
|
||||
rep_exec RECORD;
|
||||
status_count RECORD;
|
||||
trigger varchar(64);
|
||||
rep_status varchar(32);
|
||||
rep_end_time timestamp;
|
||||
new_exec_id integer;
|
||||
in_progress integer;
|
||||
failed integer;
|
||||
success integer;
|
||||
stopped integer;
|
||||
BEGIN
|
||||
FOR rep_exec IN SELECT * FROM retention_execution where new_execution_id is null
|
||||
LOOP
|
||||
|
||||
FOR status_count IN SELECT status, COUNT(*) as c FROM retention_task WHERE execution_id=rep_exec.id GROUP BY status
|
||||
LOOP
|
||||
IF status_count.status = 'Scheduled' or status_count.status = 'Pending' or status_count.status = 'Running' THEN
|
||||
in_progress = in_progress + status_count.c;
|
||||
ELSIF status_count.status = 'Stopped' THEN
|
||||
stopped = stopped + status_count.c;
|
||||
ELSIF status_count.status = 'Error' THEN
|
||||
failed = failed + status_count.c;
|
||||
ELSE
|
||||
success = success + status_count.c;
|
||||
END IF;
|
||||
END LOOP;
|
||||
|
||||
IF in_progress>0 THEN
|
||||
rep_status = 'InProgress';
|
||||
ELSIF failed>0 THEN
|
||||
rep_status = 'Failed';
|
||||
ELSIF stopped>0 THEN
|
||||
rep_status = 'Stopped';
|
||||
ELSE
|
||||
rep_status = 'Succeed';
|
||||
END IF;
|
||||
select max(end_time) into rep_end_time from retention_task where execution_id = rep_exec.id;
|
||||
|
||||
INSERT INTO execution (vendor_type, vendor_id, status, revision, trigger, start_time, end_time, extra_attrs)
|
||||
VALUES ('RETENTION', rep_exec.policy_id, rep_status, 0, rep_exec.trigger, rep_exec.start_time, rep_end_time,
|
||||
CONCAT('{"dry_run": ', case rep_exec.dry_run when 't' then 'true' else 'false' end, '}')::json) RETURNING id INTO new_exec_id;
|
||||
UPDATE retention_execution SET new_execution_id=new_exec_id WHERE id=rep_exec.id;
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
/*move the replication task records into the new task table*/
|
||||
DO $$
|
||||
DECLARE
|
||||
rep_task RECORD;
|
||||
status_code integer;
|
||||
BEGIN
|
||||
FOR rep_task IN SELECT * FROM retention_task
|
||||
LOOP
|
||||
INSERT INTO task (vendor_type, execution_id, job_id, status, status_code, status_revision,
|
||||
run_count, extra_attrs, creation_time, start_time, update_time, end_time)
|
||||
VALUES ('RETENTION', (SELECT new_execution_id FROM retention_execution WHERE id=rep_task.execution_id),
|
||||
rep_task.job_id, rep_task.status, rep_task.status_code, rep_task.status_revision,
|
||||
1, CONCAT('{"total":"', rep_task.total,'","retained":"', rep_task.retained,'"}')::json,
|
||||
rep_task.start_time, rep_task.start_time, rep_task.end_time, rep_task.end_time);
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
DROP TABLE IF EXISTS replication_task;
|
||||
DROP TABLE IF EXISTS replication_execution;
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"net/http"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
@ -45,9 +46,6 @@ const (
|
||||
// the managers/controllers used globally
|
||||
var (
|
||||
projectMgr project.Manager
|
||||
retentionScheduler scheduler.Scheduler
|
||||
retentionMgr retention.Manager
|
||||
retentionLauncher retention.Launcher
|
||||
retentionController retention.APIController
|
||||
)
|
||||
|
||||
@ -187,11 +185,11 @@ func Init() error {
|
||||
// init project manager
|
||||
initProjectManager()
|
||||
|
||||
retentionMgr = retention.NewManager()
|
||||
retentionMgr := retention.NewManager()
|
||||
|
||||
retentionLauncher = retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr)
|
||||
retentionLauncher := retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr, task.ExecMgr, task.Mgr)
|
||||
|
||||
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
|
||||
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher, task.ExecMgr, task.Mgr)
|
||||
|
||||
retentionCallbackFun := func(ctx context.Context, p string) error {
|
||||
param := &retention.TriggerParam{}
|
||||
|
@ -3,15 +3,16 @@ package api
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/rbac"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/project/metadata"
|
||||
"github.com/goharbor/harbor/src/pkg/retention"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
)
|
||||
|
||||
// RetentionAPI ...
|
||||
@ -266,7 +267,7 @@ func (r *RetentionAPI) TriggerRetentionExec() {
|
||||
if !r.requireAccess(p, rbac.ActionUpdate) {
|
||||
return
|
||||
}
|
||||
eid, err := retentionController.TriggerRetentionExec(id, retention.ExecutionTriggerManual, d.DryRun)
|
||||
eid, err := retentionController.TriggerRetentionExec(id, task.ExecutionTriggerManual, d.DryRun)
|
||||
if err != nil {
|
||||
r.SendInternalServerError(err)
|
||||
return
|
||||
@ -366,6 +367,10 @@ func (r *RetentionAPI) ListRetentionExecTasks() {
|
||||
query := &q.Query{
|
||||
PageNumber: page,
|
||||
PageSize: size,
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorID": id,
|
||||
"VendorType": job.Retention,
|
||||
},
|
||||
}
|
||||
p, err := retentionController.GetRetention(id)
|
||||
if err != nil {
|
||||
|
@ -20,15 +20,10 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/controller/event/metadata"
|
||||
"github.com/goharbor/harbor/src/core/service/notifications"
|
||||
jjob "github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/selector"
|
||||
"github.com/goharbor/harbor/src/pkg/notification"
|
||||
"github.com/goharbor/harbor/src/pkg/notifier/event"
|
||||
"github.com/goharbor/harbor/src/pkg/retention"
|
||||
)
|
||||
|
||||
var statusMap = map[string]string{
|
||||
@ -89,61 +84,6 @@ func (h *Handler) Prepare() {
|
||||
}
|
||||
}
|
||||
|
||||
// HandleRetentionTask handles the webhook of retention task
|
||||
func (h *Handler) HandleRetentionTask() {
|
||||
taskID := h.id
|
||||
status := h.rawStatus
|
||||
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
|
||||
mgr := &retention.DefaultManager{}
|
||||
// handle checkin
|
||||
if h.checkIn != "" {
|
||||
var retainObj struct {
|
||||
Total int `json:"total"`
|
||||
Retained int `json:"retained"`
|
||||
Deleted []*selector.Result `json:"deleted"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil {
|
||||
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
|
||||
return
|
||||
}
|
||||
task := &retention.Task{
|
||||
ID: taskID,
|
||||
Total: retainObj.Total,
|
||||
Retained: retainObj.Retained,
|
||||
}
|
||||
if err := mgr.UpdateTask(task, "Total", "Retained"); err != nil {
|
||||
log.Errorf("failed to update of retention task %d: %v", taskID, err)
|
||||
h.SendInternalServerError(err)
|
||||
return
|
||||
}
|
||||
|
||||
e := &event.Event{}
|
||||
metaData := &metadata.RetentionMetaData{
|
||||
Total: retainObj.Total,
|
||||
Retained: retainObj.Retained,
|
||||
Deleted: retainObj.Deleted,
|
||||
Status: "SUCCESS",
|
||||
TaskID: taskID,
|
||||
}
|
||||
|
||||
if err := e.Build(metaData); err == nil {
|
||||
if err := e.Publish(); err != nil {
|
||||
log.Error(errors.Wrap(err, "tag retention job hook handler: event publish"))
|
||||
}
|
||||
} else {
|
||||
log.Error(errors.Wrap(err, "tag retention job hook handler: event publish"))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// handle status updating
|
||||
if err := mgr.UpdateTaskStatus(taskID, status, h.revision); err != nil {
|
||||
log.Errorf("failed to update the status of retention task %d: %v", taskID, err)
|
||||
h.SendInternalServerError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// HandleNotificationJob handles the hook of notification job
|
||||
func (h *Handler) HandleNotificationJob() {
|
||||
log.Debugf("received notification job status update event: job-%d, status-%s", h.id, h.status)
|
||||
|
71
src/pkg/retention/callback.go
Normal file
71
src/pkg/retention/callback.go
Normal file
@ -0,0 +1,71 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/goharbor/harbor/src/controller/event/metadata"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/selector"
|
||||
"github.com/goharbor/harbor/src/pkg/notifier/event"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
if err := task.RegisterCheckInProcessor(job.Retention, retentionTaskCheckInProcessor); err != nil {
|
||||
log.Fatalf("failed to register the checkin processor for the retention job, error %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
|
||||
taskID := t.ID
|
||||
status := t.Status
|
||||
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
|
||||
// handle checkin
|
||||
if data != "" {
|
||||
var retainObj struct {
|
||||
Total int `json:"total"`
|
||||
Retained int `json:"retained"`
|
||||
Deleted []*selector.Result `json:"deleted"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(data), &retainObj); err != nil {
|
||||
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
t, err := task.Mgr.Get(ctx, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.ExtraAttrs["total"] = retainObj.Total
|
||||
t.ExtraAttrs["retained"] = retainObj.Retained
|
||||
|
||||
err = task.Mgr.UpdateExtraAttrs(ctx, taskID, t.ExtraAttrs)
|
||||
if err != nil {
|
||||
log.G(ctx).WithField("error", err).Errorf("failed to update of retention task %d", taskID)
|
||||
return err
|
||||
}
|
||||
|
||||
e := &event.Event{}
|
||||
metaData := &metadata.RetentionMetaData{
|
||||
Total: retainObj.Total,
|
||||
Retained: retainObj.Retained,
|
||||
Deleted: retainObj.Deleted,
|
||||
Status: "SUCCESS",
|
||||
TaskID: taskID,
|
||||
}
|
||||
|
||||
if err := e.Build(metaData); err == nil {
|
||||
if err := e.Publish(); err != nil {
|
||||
log.G(ctx).WithField("error", err).Errorf("tag retention job hook handler: event publish")
|
||||
}
|
||||
} else {
|
||||
log.G(ctx).WithField("error", err).Errorf("tag retention job hook handler: event publish")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -15,15 +15,18 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/project"
|
||||
"github.com/goharbor/harbor/src/pkg/repository"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"time"
|
||||
)
|
||||
|
||||
// go:generate mockery -name APIController -case snake
|
||||
@ -60,6 +63,8 @@ type APIController interface {
|
||||
// DefaultAPIController ...
|
||||
type DefaultAPIController struct {
|
||||
manager Manager
|
||||
execMgr task.ExecutionManager
|
||||
taskMgr task.Manager
|
||||
launcher Launcher
|
||||
projectManager project.Manager
|
||||
repositoryMgr repository.Manager
|
||||
@ -179,9 +184,36 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error {
|
||||
}
|
||||
}
|
||||
|
||||
ctx := orm.Context()
|
||||
err = r.deleteExecs(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.manager.DeletePolicyAndExec(id)
|
||||
}
|
||||
|
||||
// deleteExecs delete executions
|
||||
func (r *DefaultAPIController) deleteExecs(ctx context.Context, vendorID int64) error {
|
||||
executions, err := r.execMgr.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": job.Retention,
|
||||
"VendorID": vendorID,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, execution := range executions {
|
||||
if err = r.execMgr.Delete(ctx, execution.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TriggerRetentionExec Trigger Retention Execution
|
||||
func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger string, dryRun bool) (int64, error) {
|
||||
p, err := r.manager.GetPolicy(policyID)
|
||||
@ -189,16 +221,19 @@ func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger stri
|
||||
return 0, err
|
||||
}
|
||||
|
||||
exec := &Execution{
|
||||
PolicyID: policyID,
|
||||
StartTime: time.Now().Truncate(time.Second),
|
||||
Trigger: trigger,
|
||||
DryRun: dryRun,
|
||||
}
|
||||
id, err := r.manager.CreateExecution(exec)
|
||||
ctx := orm.Context()
|
||||
id, err := r.execMgr.Create(ctx, job.Retention, policyID, trigger,
|
||||
map[string]interface{}{
|
||||
"dry_run": dryRun,
|
||||
},
|
||||
)
|
||||
if _, err = r.launcher.Launch(p, id, dryRun); err != nil {
|
||||
// clean execution if launch failed
|
||||
_ = r.manager.DeleteExecution(id)
|
||||
if err1 := r.execMgr.StopAndWait(ctx, id, 10*time.Second); err1 != nil {
|
||||
logger.Errorf("failed to stop the retention execution %d: %v", id, err1)
|
||||
}
|
||||
if err1 := r.execMgr.MarkError(ctx, id, err.Error()); err1 != nil {
|
||||
logger.Errorf("failed to mark error for the retention execution %d: %v", id, err1)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
return id, err
|
||||
@ -207,7 +242,8 @@ func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger stri
|
||||
|
||||
// OperateRetentionExec Operate Retention Execution
|
||||
func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) error {
|
||||
e, err := r.manager.GetExecution(eid)
|
||||
ctx := orm.Context()
|
||||
e, err := r.execMgr.Get(ctx, eid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -224,50 +260,135 @@ func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) er
|
||||
|
||||
// GetRetentionExec Get Retention Execution
|
||||
func (r *DefaultAPIController) GetRetentionExec(executionID int64) (*Execution, error) {
|
||||
return r.manager.GetExecution(executionID)
|
||||
ctx := orm.Context()
|
||||
e, err := r.execMgr.Get(ctx, executionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertExecution(e), nil
|
||||
}
|
||||
|
||||
// ListRetentionExecs List Retention Executions
|
||||
func (r *DefaultAPIController) ListRetentionExecs(policyID int64, query *q.Query) ([]*Execution, error) {
|
||||
return r.manager.ListExecutions(policyID, query)
|
||||
ctx := orm.Context()
|
||||
query = q.MustClone(query)
|
||||
query.Keywords["VendorType"] = job.Retention
|
||||
query.Keywords["VendorID"] = policyID
|
||||
execs, err := r.execMgr.List(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var executions []*Execution
|
||||
for _, exec := range execs {
|
||||
executions = append(executions, convertExecution(exec))
|
||||
}
|
||||
return executions, nil
|
||||
}
|
||||
|
||||
func convertExecution(exec *task.Execution) *Execution {
|
||||
return &Execution{
|
||||
ID: exec.ID,
|
||||
PolicyID: exec.VendorID,
|
||||
StartTime: exec.StartTime,
|
||||
EndTime: exec.EndTime,
|
||||
Status: exec.Status,
|
||||
Trigger: exec.Trigger,
|
||||
DryRun: exec.ExtraAttrs["dry_run"].(bool),
|
||||
}
|
||||
}
|
||||
|
||||
// GetTotalOfRetentionExecs Count Retention Executions
|
||||
func (r *DefaultAPIController) GetTotalOfRetentionExecs(policyID int64) (int64, error) {
|
||||
return r.manager.GetTotalOfRetentionExecs(policyID)
|
||||
ctx := orm.Context()
|
||||
return r.execMgr.Count(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": job.Retention,
|
||||
"VendorID": policyID,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// ListRetentionExecTasks List Retention Execution Histories
|
||||
func (r *DefaultAPIController) ListRetentionExecTasks(executionID int64, query *q.Query) ([]*Task, error) {
|
||||
q1 := &q.TaskQuery{
|
||||
ExecutionID: executionID,
|
||||
ctx := orm.Context()
|
||||
query = q.MustClone(query)
|
||||
query.Keywords["VendorType"] = job.Retention
|
||||
query.Keywords["ExecutionID"] = executionID
|
||||
tks, err := r.taskMgr.List(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if query != nil {
|
||||
q1.PageSize = query.PageSize
|
||||
q1.PageNumber = query.PageNumber
|
||||
var tasks []*Task
|
||||
for _, tk := range tks {
|
||||
tasks = append(tasks, convertTask(tk))
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
func convertTask(t *task.Task) *Task {
|
||||
return &Task{
|
||||
ID: t.ID,
|
||||
ExecutionID: t.ExecutionID,
|
||||
Repository: t.GetStringFromExtraAttrs("repository"),
|
||||
JobID: t.JobID,
|
||||
Status: t.Status,
|
||||
StatusCode: job.Status(t.Status).Code(),
|
||||
StartTime: t.StartTime,
|
||||
EndTime: t.EndTime,
|
||||
Total: int(t.GetNumFromExtraAttrs("total")),
|
||||
Retained: int(t.GetNumFromExtraAttrs("retained")),
|
||||
}
|
||||
return r.manager.ListTasks(q1)
|
||||
}
|
||||
|
||||
// GetTotalOfRetentionExecTasks Count Retention Execution Histories
|
||||
func (r *DefaultAPIController) GetTotalOfRetentionExecTasks(executionID int64) (int64, error) {
|
||||
return r.manager.GetTotalOfTasks(executionID)
|
||||
ctx := orm.Context()
|
||||
return r.taskMgr.Count(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": job.Retention,
|
||||
"ExecutionID": executionID,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// GetRetentionExecTaskLog Get Retention Execution Task Log
|
||||
func (r *DefaultAPIController) GetRetentionExecTaskLog(taskID int64) ([]byte, error) {
|
||||
return r.manager.GetTaskLog(taskID)
|
||||
ctx := orm.Context()
|
||||
return r.taskMgr.GetLog(ctx, taskID)
|
||||
}
|
||||
|
||||
// GetRetentionExecTask Get Retention Execution Task
|
||||
func (r *DefaultAPIController) GetRetentionExecTask(taskID int64) (*Task, error) {
|
||||
return r.manager.GetTask(taskID)
|
||||
ctx := orm.Context()
|
||||
t, err := r.taskMgr.Get(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertTask(t), nil
|
||||
}
|
||||
|
||||
// UpdateTaskInfo Update task info
|
||||
func (r *DefaultAPIController) UpdateTaskInfo(taskID int64, total int, retained int) error {
|
||||
ctx := orm.Context()
|
||||
t, err := r.taskMgr.Get(ctx, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.ExtraAttrs["total"] = total
|
||||
t.ExtraAttrs["retained"] = retained
|
||||
|
||||
return r.taskMgr.UpdateExtraAttrs(ctx, taskID, t.ExtraAttrs)
|
||||
}
|
||||
|
||||
// NewAPIController ...
|
||||
func NewAPIController(retentionMgr Manager, projectManager project.Manager, repositoryMgr repository.Manager, scheduler scheduler.Scheduler, retentionLauncher Launcher) APIController {
|
||||
func NewAPIController(retentionMgr Manager, projectManager project.Manager, repositoryMgr repository.Manager, scheduler scheduler.Scheduler, retentionLauncher Launcher, execMgr task.ExecutionManager, taskMgr task.Manager) APIController {
|
||||
return &DefaultAPIController{
|
||||
manager: retentionMgr,
|
||||
execMgr: execMgr,
|
||||
taskMgr: taskMgr,
|
||||
launcher: retentionLauncher,
|
||||
projectManager: projectManager,
|
||||
repositoryMgr: repositoryMgr,
|
||||
|
@ -16,17 +16,20 @@ package retention
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"github.com/goharbor/harbor/src/testing/pkg/project"
|
||||
"github.com/goharbor/harbor/src/testing/pkg/repository"
|
||||
testingTask "github.com/goharbor/harbor/src/testing/pkg/task"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type ControllerTestSuite struct {
|
||||
@ -50,8 +53,37 @@ func (s *ControllerTestSuite) TestPolicy() {
|
||||
repositoryMgr := &repository.FakeManager{}
|
||||
retentionScheduler := &fakeRetentionScheduler{}
|
||||
retentionLauncher := &fakeLauncher{}
|
||||
execMgr := &testingTask.ExecutionManager{}
|
||||
taskMgr := &testingTask.Manager{}
|
||||
retentionMgr := NewManager()
|
||||
c := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
|
||||
execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||
execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"dry_run": true,
|
||||
},
|
||||
}, nil)
|
||||
execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"dry_run": true,
|
||||
},
|
||||
}}, nil)
|
||||
taskMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Task{{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"total": 1,
|
||||
"retained": 1,
|
||||
},
|
||||
}}, nil)
|
||||
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
c := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher, execMgr, taskMgr)
|
||||
|
||||
p1 := &policy.Metadata{
|
||||
Algorithm: "or",
|
||||
@ -148,8 +180,36 @@ func (s *ControllerTestSuite) TestExecution() {
|
||||
repositoryMgr := &repository.FakeManager{}
|
||||
retentionScheduler := &fakeRetentionScheduler{}
|
||||
retentionLauncher := &fakeLauncher{}
|
||||
execMgr := &testingTask.ExecutionManager{}
|
||||
taskMgr := &testingTask.Manager{}
|
||||
retentionMgr := NewManager()
|
||||
m := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
|
||||
execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"dry_run": true,
|
||||
},
|
||||
}, nil)
|
||||
execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"dry_run": true,
|
||||
},
|
||||
}}, nil)
|
||||
taskMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Task{{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
ExtraAttrs: map[string]interface{}{
|
||||
"total": 1,
|
||||
"retained": 1,
|
||||
},
|
||||
}}, nil)
|
||||
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
m := NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher, execMgr, taskMgr)
|
||||
|
||||
p1 := &policy.Metadata{
|
||||
Algorithm: "or",
|
||||
@ -213,7 +273,7 @@ func (s *ControllerTestSuite) TestExecution() {
|
||||
|
||||
ts, err := m.ListRetentionExecTasks(id, nil)
|
||||
s.Require().Nil(err)
|
||||
s.Require().EqualValues(0, len(ts))
|
||||
s.Require().EqualValues(1, len(ts))
|
||||
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
)
|
||||
|
||||
// FakedRetentionController ...
|
||||
|
@ -16,18 +16,15 @@ package retention
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
beegoorm "github.com/astaxie/beego/orm"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/selector"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/selector/selectors/index"
|
||||
|
||||
cjob "github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/common/job/models"
|
||||
cmodels "github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
@ -37,7 +34,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/pkg/repository"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -75,11 +71,13 @@ type Launcher interface {
|
||||
|
||||
// NewLauncher returns an instance of Launcher
|
||||
func NewLauncher(projectMgr project.Manager, repositoryMgr repository.Manager,
|
||||
retentionMgr Manager) Launcher {
|
||||
retentionMgr Manager, execMgr task.ExecutionManager, taskMgr task.Manager) Launcher {
|
||||
return &launcher{
|
||||
projectMgr: projectMgr,
|
||||
repositoryMgr: repositoryMgr,
|
||||
retentionMgr: retentionMgr,
|
||||
execMgr: execMgr,
|
||||
taskMgr: taskMgr,
|
||||
jobserviceClient: cjob.GlobalClient,
|
||||
internalCoreURL: config.InternalCoreURL(),
|
||||
chartServerEnabled: config.WithChartMuseum(),
|
||||
@ -95,6 +93,8 @@ type jobData struct {
|
||||
|
||||
type launcher struct {
|
||||
retentionMgr Manager
|
||||
taskMgr task.Manager
|
||||
execMgr task.ExecutionManager
|
||||
projectMgr project.Manager
|
||||
repositoryMgr repository.Manager
|
||||
jobserviceClient cjob.Client
|
||||
@ -206,13 +206,8 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
if err = l.createTasks(executionID, jobDatas); err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
|
||||
// submit jobs to jobservice
|
||||
if err = l.submitJobs(jobDatas); err != nil {
|
||||
// submit tasks to jobservice
|
||||
if err = l.submitTasks(executionID, jobDatas); err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
|
||||
@ -246,58 +241,23 @@ func createJobs(repositoryRules map[selector.Repository]*lwp.Metadata, isDryRun
|
||||
return jobDatas, nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
func (l *launcher) createTasks(executionID int64, jobDatas []*jobData) error {
|
||||
now := time.Now()
|
||||
func (l *launcher) submitTasks(executionID int64, jobDatas []*jobData) error {
|
||||
ctx := orm.Context()
|
||||
for _, jobData := range jobDatas {
|
||||
taskID, err := l.retentionMgr.CreateTask(&Task{
|
||||
ExecutionID: executionID,
|
||||
Repository: jobData.Repository.Name,
|
||||
StartTime: now.Truncate(time.Second),
|
||||
})
|
||||
_, err := l.taskMgr.Create(ctx, executionID, &task.Job{
|
||||
Name: jobData.JobName,
|
||||
Parameters: jobData.JobParams,
|
||||
Metadata: &job.Metadata{
|
||||
JobKind: job.KindGeneric,
|
||||
},
|
||||
},
|
||||
map[string]interface{}{
|
||||
"repository": jobData.Repository.Name,
|
||||
"dry_run": jobData.JobParams[ParamDryRun],
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jobData.TaskID = taskID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
func (l *launcher) submitJobs(jobDatas []*jobData) error {
|
||||
allFailed := true
|
||||
for _, jobData := range jobDatas {
|
||||
task := &Task{
|
||||
ID: jobData.TaskID,
|
||||
}
|
||||
props := []string{"Status"}
|
||||
j := &models.JobData{
|
||||
Name: jobData.JobName,
|
||||
Metadata: &models.JobMetadata{
|
||||
JobKind: job.KindGeneric,
|
||||
},
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.TaskID),
|
||||
Parameters: jobData.JobParams,
|
||||
}
|
||||
// Submit job
|
||||
jobID, err := l.jobserviceClient.SubmitJob(j)
|
||||
if err != nil {
|
||||
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.TaskID, err)))
|
||||
task.Status = cmodels.JobError
|
||||
task.EndTime = time.Now()
|
||||
props = append(props, "EndTime")
|
||||
} else {
|
||||
allFailed = false
|
||||
task.JobID = jobID
|
||||
task.Status = cmodels.JobPending
|
||||
props = append(props, "JobID")
|
||||
}
|
||||
if err = l.retentionMgr.UpdateTask(task, props...); err != nil {
|
||||
log.Errorf("failed to update the status of task %d: %v", task.ID, err)
|
||||
}
|
||||
}
|
||||
if allFailed {
|
||||
return launcherError(fmt.Errorf("all tasks failed"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -306,19 +266,8 @@ func (l *launcher) Stop(executionID int64) error {
|
||||
if executionID <= 0 {
|
||||
return launcherError(fmt.Errorf("invalid execution ID: %d", executionID))
|
||||
}
|
||||
tasks, err := l.retentionMgr.ListTasks(&q.TaskQuery{
|
||||
ExecutionID: executionID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, task := range tasks {
|
||||
if err = l.jobserviceClient.PostAction(task.JobID, cjob.JobActionStop); err != nil {
|
||||
log.Errorf("failed to stop task %d, job ID: %s : %v", task.ID, task.JobID, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
ctx := orm.Context()
|
||||
return l.execMgr.Stop(ctx, executionID)
|
||||
}
|
||||
|
||||
func launcherError(err error) error {
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/testing/mock"
|
||||
projecttesting "github.com/goharbor/harbor/src/testing/pkg/project"
|
||||
"github.com/goharbor/harbor/src/testing/pkg/repository"
|
||||
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
@ -104,6 +105,8 @@ func (f *fakeRetentionManager) ListHistories(executionID int64, query *q.Query)
|
||||
type launchTestSuite struct {
|
||||
suite.Suite
|
||||
projectMgr project.Manager
|
||||
execMgr *tasktesting.ExecutionManager
|
||||
taskMgr *tasktesting.Manager
|
||||
repositoryMgr *repository.FakeManager
|
||||
retentionMgr Manager
|
||||
jobserviceClient job.Client
|
||||
@ -125,6 +128,8 @@ func (l *launchTestSuite) SetupTest() {
|
||||
l.projectMgr = projectMgr
|
||||
l.repositoryMgr = &repository.FakeManager{}
|
||||
l.retentionMgr = &fakeRetentionManager{}
|
||||
l.execMgr = &tasktesting.ExecutionManager{}
|
||||
l.taskMgr = &tasktesting.Manager{}
|
||||
l.jobserviceClient = &hjob.MockJobClient{
|
||||
JobUUID: []string{"1"},
|
||||
}
|
||||
@ -156,10 +161,16 @@ func (l *launchTestSuite) TestGetRepositories() {
|
||||
}
|
||||
|
||||
func (l *launchTestSuite) TestLaunch() {
|
||||
l.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
l.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
l.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
launcher := &launcher{
|
||||
projectMgr: l.projectMgr,
|
||||
repositoryMgr: l.repositoryMgr,
|
||||
retentionMgr: l.retentionMgr,
|
||||
execMgr: l.execMgr,
|
||||
taskMgr: l.taskMgr,
|
||||
jobserviceClient: l.jobserviceClient,
|
||||
chartServerEnabled: true,
|
||||
}
|
||||
@ -244,10 +255,13 @@ func (l *launchTestSuite) TestLaunch() {
|
||||
|
||||
func (l *launchTestSuite) TestStop() {
|
||||
t := l.T()
|
||||
l.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
launcher := &launcher{
|
||||
projectMgr: l.projectMgr,
|
||||
repositoryMgr: l.repositoryMgr,
|
||||
retentionMgr: l.retentionMgr,
|
||||
execMgr: l.execMgr,
|
||||
taskMgr: l.taskMgr,
|
||||
jobserviceClient: l.jobserviceClient,
|
||||
}
|
||||
// invalid execution ID
|
||||
|
@ -16,17 +16,13 @@ package retention
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"time"
|
||||
|
||||
"github.com/astaxie/beego/orm"
|
||||
cjob "github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/dao"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/dao/models"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
)
|
||||
|
||||
// Manager defines operations of managing policy
|
||||
@ -41,32 +37,6 @@ type Manager interface {
|
||||
DeletePolicyAndExec(ID int64) error
|
||||
// Get the specified policy
|
||||
GetPolicy(ID int64) (*policy.Metadata, error)
|
||||
// Create a new retention execution
|
||||
CreateExecution(execution *Execution) (int64, error)
|
||||
// Delete a new retention execution
|
||||
DeleteExecution(int64) error
|
||||
// Get the specified execution
|
||||
GetExecution(eid int64) (*Execution, error)
|
||||
// List executions
|
||||
ListExecutions(policyID int64, query *q.Query) ([]*Execution, error)
|
||||
// GetTotalOfRetentionExecs Count Retention Executions
|
||||
GetTotalOfRetentionExecs(policyID int64) (int64, error)
|
||||
// List tasks histories
|
||||
ListTasks(query ...*q.TaskQuery) ([]*Task, error)
|
||||
// GetTotalOfTasks Count Tasks
|
||||
GetTotalOfTasks(executionID int64) (int64, error)
|
||||
// Create a new retention task
|
||||
CreateTask(task *Task) (int64, error)
|
||||
// Update the specified task
|
||||
UpdateTask(task *Task, cols ...string) error
|
||||
// Update the status of the specified task
|
||||
// The status is updated only when (the statusRevision > the current revision)
|
||||
// or (the the statusRevision = the current revision and status > the current status)
|
||||
UpdateTaskStatus(taskID int64, status string, statusRevision int64) error
|
||||
// Get the task specified by the task ID
|
||||
GetTask(taskID int64) (*Task, error)
|
||||
// Get the log of the specified task
|
||||
GetTaskLog(taskID int64) ([]byte, error)
|
||||
}
|
||||
|
||||
// DefaultManager ...
|
||||
@ -123,188 +93,6 @@ func (d *DefaultManager) GetPolicy(id int64) (*policy.Metadata, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// CreateExecution Create Execution
|
||||
func (d *DefaultManager) CreateExecution(execution *Execution) (int64, error) {
|
||||
exec := &models.RetentionExecution{}
|
||||
exec.PolicyID = execution.PolicyID
|
||||
exec.StartTime = execution.StartTime
|
||||
exec.DryRun = execution.DryRun
|
||||
exec.Trigger = execution.Trigger
|
||||
return dao.CreateExecution(exec)
|
||||
}
|
||||
|
||||
// DeleteExecution Delete Execution
|
||||
func (d *DefaultManager) DeleteExecution(eid int64) error {
|
||||
return dao.DeleteExecution(eid)
|
||||
}
|
||||
|
||||
// ListExecutions List Executions
|
||||
func (d *DefaultManager) ListExecutions(policyID int64, query *q.Query) ([]*Execution, error) {
|
||||
execs, err := dao.ListExecutions(policyID, query)
|
||||
if err != nil {
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
var execs1 []*Execution
|
||||
for _, e := range execs {
|
||||
e1 := &Execution{}
|
||||
e1.ID = e.ID
|
||||
e1.PolicyID = e.PolicyID
|
||||
e1.Status = e.Status
|
||||
e1.StartTime = e.StartTime
|
||||
e1.EndTime = e.EndTime
|
||||
e1.Trigger = e.Trigger
|
||||
e1.DryRun = e.DryRun
|
||||
execs1 = append(execs1, e1)
|
||||
}
|
||||
return execs1, nil
|
||||
}
|
||||
|
||||
// GetTotalOfRetentionExecs Count Executions
|
||||
func (d *DefaultManager) GetTotalOfRetentionExecs(policyID int64) (int64, error) {
|
||||
return dao.GetTotalOfRetentionExecs(policyID)
|
||||
}
|
||||
|
||||
// GetExecution Get Execution
|
||||
func (d *DefaultManager) GetExecution(eid int64) (*Execution, error) {
|
||||
e, err := dao.GetExecution(eid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e1 := &Execution{}
|
||||
e1.ID = e.ID
|
||||
e1.PolicyID = e.PolicyID
|
||||
e1.Status = e.Status
|
||||
e1.StartTime = e.StartTime
|
||||
e1.EndTime = e.EndTime
|
||||
e1.Trigger = e.Trigger
|
||||
e1.DryRun = e.DryRun
|
||||
return e1, nil
|
||||
}
|
||||
|
||||
// CreateTask creates task record
|
||||
func (d *DefaultManager) CreateTask(task *Task) (int64, error) {
|
||||
if task == nil {
|
||||
return 0, errors.New("nil task")
|
||||
}
|
||||
t := &models.RetentionTask{
|
||||
ExecutionID: task.ExecutionID,
|
||||
Repository: task.Repository,
|
||||
JobID: task.JobID,
|
||||
Status: task.Status,
|
||||
StatusCode: task.StatusCode,
|
||||
StatusRevision: task.StatusRevision,
|
||||
StartTime: task.StartTime,
|
||||
EndTime: task.EndTime,
|
||||
Total: task.Total,
|
||||
Retained: task.Retained,
|
||||
}
|
||||
return dao.CreateTask(t)
|
||||
}
|
||||
|
||||
// ListTasks lists tasks according to the query
|
||||
func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
|
||||
ts, err := dao.ListTask(query...)
|
||||
if err != nil {
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
tasks := make([]*Task, 0)
|
||||
for _, t := range ts {
|
||||
tasks = append(tasks, &Task{
|
||||
ID: t.ID,
|
||||
ExecutionID: t.ExecutionID,
|
||||
Repository: t.Repository,
|
||||
JobID: t.JobID,
|
||||
Status: t.Status,
|
||||
StatusCode: t.StatusCode,
|
||||
StatusRevision: t.StatusRevision,
|
||||
StartTime: t.StartTime,
|
||||
EndTime: t.EndTime,
|
||||
Total: t.Total,
|
||||
Retained: t.Retained,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// GetTotalOfTasks Count tasks
|
||||
func (d *DefaultManager) GetTotalOfTasks(executionID int64) (int64, error) {
|
||||
return dao.GetTotalOfTasks(executionID)
|
||||
}
|
||||
|
||||
// UpdateTask updates the task
|
||||
func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
|
||||
if task == nil {
|
||||
return errors.New("nil task")
|
||||
}
|
||||
if task.ID <= 0 {
|
||||
return fmt.Errorf("invalid task ID: %d", task.ID)
|
||||
}
|
||||
return dao.UpdateTask(&models.RetentionTask{
|
||||
ID: task.ID,
|
||||
ExecutionID: task.ExecutionID,
|
||||
Repository: task.Repository,
|
||||
JobID: task.JobID,
|
||||
Status: task.Status,
|
||||
StatusCode: task.StatusCode,
|
||||
StatusRevision: task.StatusRevision,
|
||||
StartTime: task.StartTime,
|
||||
EndTime: task.EndTime,
|
||||
Total: task.Total,
|
||||
Retained: task.Retained,
|
||||
}, cols...)
|
||||
}
|
||||
|
||||
// UpdateTaskStatus updates the status of the specified task
|
||||
func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusRevision int64) error {
|
||||
if taskID <= 0 {
|
||||
return fmt.Errorf("invalid task ID: %d", taskID)
|
||||
}
|
||||
st := job.Status(status)
|
||||
return dao.UpdateTaskStatus(taskID, status, st.Code(), statusRevision)
|
||||
}
|
||||
|
||||
// GetTask returns the task specified by task ID
|
||||
func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
|
||||
if taskID <= 0 {
|
||||
return nil, fmt.Errorf("invalid task ID: %d", taskID)
|
||||
}
|
||||
task, err := dao.GetTask(taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Task{
|
||||
ID: task.ID,
|
||||
ExecutionID: task.ExecutionID,
|
||||
Repository: task.Repository,
|
||||
JobID: task.JobID,
|
||||
Status: task.Status,
|
||||
StatusCode: task.StatusCode,
|
||||
StatusRevision: task.StatusRevision,
|
||||
StartTime: task.StartTime,
|
||||
EndTime: task.EndTime,
|
||||
Total: task.Total,
|
||||
Retained: task.Retained,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetTaskLog gets the logs of task
|
||||
func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
|
||||
task, err := d.GetTask(taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if task == nil {
|
||||
return nil, fmt.Errorf("task %d not found", taskID)
|
||||
}
|
||||
return cjob.GlobalClient.GetJobLog(task.JobID)
|
||||
}
|
||||
|
||||
// NewManager ...
|
||||
func NewManager() Manager {
|
||||
return &DefaultManager{}
|
||||
|
@ -1,20 +1,13 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
jjob "github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
tjob "github.com/goharbor/harbor/src/testing/job"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
@ -134,89 +127,4 @@ func TestExecution(t *testing.T) {
|
||||
policyID, err := m.CreatePolicy(p1)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, policyID > 0)
|
||||
|
||||
e1 := &Execution{
|
||||
PolicyID: policyID,
|
||||
StartTime: time.Now(),
|
||||
Trigger: ExecutionTriggerManual,
|
||||
DryRun: false,
|
||||
}
|
||||
id, err := m.CreateExecution(e1)
|
||||
assert.Nil(t, err)
|
||||
assert.True(t, id > 0)
|
||||
|
||||
e1, err = m.GetExecution(id)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, e1)
|
||||
assert.EqualValues(t, id, e1.ID)
|
||||
|
||||
es, err := m.ListExecutions(policyID, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, 1, len(es))
|
||||
|
||||
err = m.DeleteExecution(id)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestTask(t *testing.T) {
|
||||
m := NewManager()
|
||||
err := m.DeleteExecution(1000)
|
||||
require.Nil(t, err)
|
||||
task := &Task{
|
||||
ExecutionID: 1000,
|
||||
JobID: "1",
|
||||
Status: jjob.PendingStatus.String(),
|
||||
StatusCode: jjob.PendingStatus.Code(),
|
||||
StatusRevision: 1,
|
||||
Total: 0,
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
// create
|
||||
id, err := m.CreateTask(task)
|
||||
require.Nil(t, err)
|
||||
|
||||
// get
|
||||
tk, err := m.GetTask(id)
|
||||
require.Nil(t, err)
|
||||
assert.EqualValues(t, 1000, tk.ExecutionID)
|
||||
|
||||
// update
|
||||
task.ID = id
|
||||
task.Total = 1
|
||||
err = m.UpdateTask(task, "Total")
|
||||
require.Nil(t, err)
|
||||
|
||||
// update status to success which is a final status
|
||||
err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String(), 1)
|
||||
require.Nil(t, err)
|
||||
|
||||
// try to update status to running, as the status has already
|
||||
// been updated to a final status and the stautus revision doesn't change,
|
||||
// this updating shouldn't take effect
|
||||
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 1)
|
||||
require.Nil(t, err)
|
||||
|
||||
// update the revision and try to update status to running again
|
||||
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String(), 2)
|
||||
require.Nil(t, err)
|
||||
|
||||
// list
|
||||
tasks, err := m.ListTasks(&q.TaskQuery{
|
||||
ExecutionID: 1000,
|
||||
})
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 1, len(tasks))
|
||||
assert.Equal(t, int64(1000), tasks[0].ExecutionID)
|
||||
assert.Equal(t, 1, tasks[0].Total)
|
||||
assert.Equal(t, jjob.RunningStatus.String(), tasks[0].Status)
|
||||
assert.Equal(t, jjob.RunningStatus.Code(), tasks[0].StatusCode)
|
||||
assert.Equal(t, int64(2), tasks[0].StatusRevision)
|
||||
|
||||
// get task log
|
||||
job.GlobalClient = &tjob.MockJobClient{
|
||||
JobUUID: []string{"1"},
|
||||
}
|
||||
data, err := m.GetTaskLog(task.ID)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, "some log", string(data))
|
||||
}
|
||||
|
@ -3,8 +3,8 @@
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
policy "github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
q "github.com/goharbor/harbor/src/pkg/retention/q"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
retention "github.com/goharbor/harbor/src/pkg/retention"
|
||||
|
@ -142,6 +142,22 @@ func (t *Task) GetBoolFromExtraAttrs(key string) bool {
|
||||
return b
|
||||
}
|
||||
|
||||
// GetNumFromExtraAttrs returns the num value specified by key
|
||||
func (t *Task) GetNumFromExtraAttrs(key string) float64 {
|
||||
if len(t.ExtraAttrs) == 0 {
|
||||
return 0
|
||||
}
|
||||
rt, exist := t.ExtraAttrs[key]
|
||||
if !exist {
|
||||
return 0
|
||||
}
|
||||
v, ok := rt.(float64)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// Job is the model represents the requested jobservice job
|
||||
type Job struct {
|
||||
Name string
|
||||
|
@ -51,12 +51,12 @@ func registerRoutes() {
|
||||
beego.Router("/api/internal/syncquota", &api.InternalAPI{}, "post:SyncQuota")
|
||||
|
||||
beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob")
|
||||
beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask")
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/adminjob/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for adminjob
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/scan/:uuid").HandlerFunc(ignoreNotification) // ignore legacy scan job notifaction
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/retention/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler())
|
||||
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler())
|
||||
|
||||
beego.Router("/service/token", &token.Handler{})
|
||||
|
Loading…
Reference in New Issue
Block a user