mirror of https://github.com/goharbor/harbor.git
perf: optimize the trigger retention API (#19533)
Enhance the API for triggering retention by optimizing it from synchronous to asynchronous to solve the problem of slow response in the case of a large number of tasks. Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
parent
b7116fff0f
commit
f6d5bf2e0f
|
@ -23,7 +23,10 @@ import (
|
||||||
"github.com/goharbor/harbor/src/controller/event/operator"
|
"github.com/goharbor/harbor/src/controller/event/operator"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
"github.com/goharbor/harbor/src/lib/retry"
|
||||||
"github.com/goharbor/harbor/src/pkg"
|
"github.com/goharbor/harbor/src/pkg"
|
||||||
"github.com/goharbor/harbor/src/pkg/project"
|
"github.com/goharbor/harbor/src/pkg/project"
|
||||||
"github.com/goharbor/harbor/src/pkg/repository"
|
"github.com/goharbor/harbor/src/pkg/repository"
|
||||||
|
@ -80,6 +83,7 @@ type defaultController struct {
|
||||||
projectManager project.Manager
|
projectManager project.Manager
|
||||||
repositoryMgr repository.Manager
|
repositoryMgr repository.Manager
|
||||||
scheduler scheduler.Scheduler
|
scheduler scheduler.Scheduler
|
||||||
|
wp *lib.WorkerPool
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -248,21 +252,49 @@ func (r *defaultController) TriggerRetentionExec(ctx context.Context, policyID i
|
||||||
"dry_run": dryRun,
|
"dry_run": dryRun,
|
||||||
"operator": operator.FromContext(ctx),
|
"operator": operator.FromContext(ctx),
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := r.execMgr.Create(ctx, job.RetentionVendorType, policyID, trigger, extra)
|
id, err := r.execMgr.Create(ctx, job.RetentionVendorType, policyID, trigger, extra)
|
||||||
if num, err := r.launcher.Launch(ctx, p, id, dryRun); err != nil {
|
if err != nil {
|
||||||
if err1 := r.execMgr.StopAndWait(ctx, id, 10*time.Second); err1 != nil {
|
|
||||||
logger.Errorf("failed to stop the retention execution %d: %v", id, err1)
|
|
||||||
}
|
|
||||||
if err1 := r.execMgr.MarkError(ctx, id, err.Error()); err1 != nil {
|
|
||||||
logger.Errorf("failed to mark error for the retention execution %d: %v", id, err1)
|
|
||||||
}
|
|
||||||
return 0, err
|
return 0, err
|
||||||
} else if num == 0 {
|
|
||||||
// no candidates, mark the execution as done directly
|
|
||||||
if err := r.execMgr.MarkDone(ctx, id, "no resources for retention"); err != nil {
|
|
||||||
logger.Errorf("failed to mark done for the execution %d: %v", id, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
r.wp.GetWorker()
|
||||||
|
defer r.wp.ReleaseWorker()
|
||||||
|
// copy the context to request a new ormer
|
||||||
|
ctx = orm.Copy(ctx)
|
||||||
|
// as we start a new transaction in the goroutine, the execution record may not
|
||||||
|
// be inserted yet, wait until it is ready before continue
|
||||||
|
if err := retry.Retry(func() error {
|
||||||
|
_, err := r.execMgr.Get(ctx, id)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
markErr := r.execMgr.MarkError(ctx, id, fmt.Sprintf(
|
||||||
|
"failed to wait the execution record to be inserted: %v", err))
|
||||||
|
if markErr != nil {
|
||||||
|
logger.Errorf("failed to mark the status of execution %d to error: %v", id, markErr)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if num, err := r.launcher.Launch(ctx, p, id, dryRun); err != nil {
|
||||||
|
logger.Errorf("failed to launch the retention jobs, err: %v", err)
|
||||||
|
|
||||||
|
if err = r.execMgr.StopAndWait(ctx, id, 10*time.Second); err != nil {
|
||||||
|
logger.Errorf("failed to stop the retention execution %d: %v", id, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = r.execMgr.MarkError(ctx, id, err.Error()); err != nil {
|
||||||
|
logger.Errorf("failed to mark error for the retention execution %d: %v", id, err)
|
||||||
|
}
|
||||||
|
} else if num == 0 {
|
||||||
|
// no candidates, mark the execution as done directly
|
||||||
|
if err := r.execMgr.MarkDone(ctx, id, "no resources for retention"); err != nil {
|
||||||
|
logger.Errorf("failed to mark done for the execution %d: %v", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,5 +466,6 @@ func NewController() Controller {
|
||||||
projectManager: pkg.ProjectMgr,
|
projectManager: pkg.ProjectMgr,
|
||||||
repositoryMgr: pkg.RepositoryMgr,
|
repositoryMgr: pkg.RepositoryMgr,
|
||||||
scheduler: scheduler.Sched,
|
scheduler: scheduler.Sched,
|
||||||
|
wp: lib.NewWorkerPool(10),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/dao"
|
"github.com/goharbor/harbor/src/common/dao"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
"github.com/goharbor/harbor/src/lib/orm"
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention"
|
"github.com/goharbor/harbor/src/pkg/retention"
|
||||||
|
@ -238,6 +239,7 @@ func (s *ControllerTestSuite) TestExecution() {
|
||||||
projectManager: projectMgr,
|
projectManager: projectMgr,
|
||||||
repositoryMgr: repositoryMgr,
|
repositoryMgr: repositoryMgr,
|
||||||
scheduler: retentionScheduler,
|
scheduler: retentionScheduler,
|
||||||
|
wp: lib.NewWorkerPool(10),
|
||||||
}
|
}
|
||||||
|
|
||||||
p1 := &policy.Metadata{
|
p1 := &policy.Metadata{
|
||||||
|
|
Loading…
Reference in New Issue