diff --git a/src/controller/retention/controller.go b/src/controller/retention/controller.go index 986268e58..4ec607a40 100644 --- a/src/controller/retention/controller.go +++ b/src/controller/retention/controller.go @@ -23,7 +23,10 @@ import ( "github.com/goharbor/harbor/src/controller/event/operator" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/lib" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/lib/retry" "github.com/goharbor/harbor/src/pkg" "github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/repository" @@ -80,6 +83,7 @@ type defaultController struct { projectManager project.Manager repositoryMgr repository.Manager scheduler scheduler.Scheduler + wp *lib.WorkerPool } const ( @@ -248,21 +252,49 @@ func (r *defaultController) TriggerRetentionExec(ctx context.Context, policyID i "dry_run": dryRun, "operator": operator.FromContext(ctx), } + id, err := r.execMgr.Create(ctx, job.RetentionVendorType, policyID, trigger, extra) - if num, err := r.launcher.Launch(ctx, p, id, dryRun); err != nil { - if err1 := r.execMgr.StopAndWait(ctx, id, 10*time.Second); err1 != nil { - logger.Errorf("failed to stop the retention execution %d: %v", id, err1) - } - if err1 := r.execMgr.MarkError(ctx, id, err.Error()); err1 != nil { - logger.Errorf("failed to mark error for the retention execution %d: %v", id, err1) - } + if err != nil { return 0, err - } else if num == 0 { - // no candidates, mark the execution as done directly - if err := r.execMgr.MarkDone(ctx, id, "no resources for retention"); err != nil { - logger.Errorf("failed to mark done for the execution %d: %v", id, err) - } } + + go func() { + r.wp.GetWorker() + defer r.wp.ReleaseWorker() + // copy the context to request a new ormer + ctx = orm.Copy(ctx) + // as we start a new transaction in the goroutine, the execution record may not + // be inserted yet, wait until it is ready before continue + if err := retry.Retry(func() error { + _, err := r.execMgr.Get(ctx, id) + return err + }); err != nil { + markErr := r.execMgr.MarkError(ctx, id, fmt.Sprintf( + "failed to wait the execution record to be inserted: %v", err)) + if markErr != nil { + logger.Errorf("failed to mark the status of execution %d to error: %v", id, markErr) + } + return + } + + if num, err := r.launcher.Launch(ctx, p, id, dryRun); err != nil { + logger.Errorf("failed to launch the retention jobs, err: %v", err) + + if err = r.execMgr.StopAndWait(ctx, id, 10*time.Second); err != nil { + logger.Errorf("failed to stop the retention execution %d: %v", id, err) + } + + if err = r.execMgr.MarkError(ctx, id, err.Error()); err != nil { + logger.Errorf("failed to mark error for the retention execution %d: %v", id, err) + } + } else if num == 0 { + // no candidates, mark the execution as done directly + if err := r.execMgr.MarkDone(ctx, id, "no resources for retention"); err != nil { + logger.Errorf("failed to mark done for the execution %d: %v", id, err) + } + } + }() + return id, err } @@ -434,5 +466,6 @@ func NewController() Controller { projectManager: pkg.ProjectMgr, repositoryMgr: pkg.RepositoryMgr, scheduler: scheduler.Sched, + wp: lib.NewWorkerPool(10), } } diff --git a/src/controller/retention/controller_test.go b/src/controller/retention/controller_test.go index 2088feb07..6261d4534 100644 --- a/src/controller/retention/controller_test.go +++ b/src/controller/retention/controller_test.go @@ -25,6 +25,7 @@ import ( "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/retention" @@ -238,6 +239,7 @@ func (s *ControllerTestSuite) TestExecution() { projectManager: projectMgr, repositoryMgr: repositoryMgr, scheduler: retentionScheduler, + wp: lib.NewWorkerPool(10), } p1 := &policy.Metadata{