From f21b1481bb5ba3efb9e3c1dd8c4e704d9dcc44a1 Mon Sep 17 00:00:00 2001 From: Chlins Zhang Date: Tue, 21 Mar 2023 10:10:55 +0800 Subject: [PATCH] refactor: refactor the old goroutine execution sweep (#18361) refactor: refactor the old goroutine execution sweep into a global execution sweep job 1. Delete the old goroutine execution sweeper that ran whenever an execution was created (under high concurrency it could cause goroutine backlogs and degrade core performance). 2. Introduce a new way to sweep executions: a global scheduled job now takes over the work. Signed-off-by: chlins --- src/controller/gc/controller.go | 5 - src/controller/p2p/preheat/enforcer.go | 5 - src/controller/replication/execution.go | 5 - src/controller/retention/controller.go | 5 - src/controller/scan/base_controller.go | 9 +- src/controller/scan/callback.go | 2 +- src/controller/scandataexport/execution.go | 4 - src/controller/systemartifact/execution.go | 4 - src/controller/task/sweep.go | 143 +++++++++++++ src/core/main.go | 7 + src/jobservice/job/known_jobs.go | 25 +++ src/jobservice/runner/redis.go | 2 +- src/jobservice/runtime/bootstrap.go | 1 + src/pkg/task/execution.go | 100 +-------- src/pkg/task/execution_test.go | 8 - src/pkg/task/mock.go | 1 + src/pkg/task/mock_sweep_manager_test.go | 69 +++++++ src/pkg/task/sweep_job.go | 187 +++++++++++++++++ src/pkg/task/sweep_job_test.go | 71 +++++++ src/pkg/task/sweep_manager.go | 189 ++++++++++++++++++ src/pkg/task/sweep_manager_test.go | 56 ++++++ src/server/v2.0/handler/scan_all.go | 6 +- .../python/test_job_service_dashboard.py | 2 +- 23 files changed, 758 insertions(+), 148 deletions(-) create mode 100644 src/controller/task/sweep.go create mode 100644 src/pkg/task/mock_sweep_manager_test.go create mode 100644 src/pkg/task/sweep_job.go create mode 100644 src/pkg/task/sweep_job_test.go create mode 100644 src/pkg/task/sweep_manager.go create mode 100644 src/pkg/task/sweep_manager_test.go diff --git a/src/controller/gc/controller.go b/src/controller/gc/controller.go index eb8bd903f..b060e25dc 100644 --- a/src/controller/gc/controller.go +++ b/src/controller/gc/controller.go @@ -10,11 +10,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 gc execution records - task.SetExecutionSweeperCount(job.GarbageCollectionVendorType, 50) -} - var ( // Ctl is a global garbage collection controller instance Ctl = NewController() diff --git a/src/controller/p2p/preheat/enforcer.go b/src/controller/p2p/preheat/enforcer.go index e017b648e..072b56063 100644 --- a/src/controller/p2p/preheat/enforcer.go +++ b/src/controller/p2p/preheat/enforcer.go @@ -46,11 +46,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 p2p preheat execution records - task.SetExecutionSweeperCount(job.P2PPreheatVendorType, 50) -} - const ( defaultSeverityCode = 99 extraAttrTotal = "totalCount" diff --git a/src/controller/replication/execution.go b/src/controller/replication/execution.go index 547d2057d..8eb610365 100644 --- a/src/controller/replication/execution.go +++ b/src/controller/replication/execution.go @@ -35,11 +35,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 replication execution records - task.SetExecutionSweeperCount(job.ReplicationVendorType, 50) -} - // Ctl is a global replication controller instance var Ctl = NewController() diff --git a/src/controller/retention/controller.go b/src/controller/retention/controller.go index 
967ad15a3..642d07403 100644 --- a/src/controller/retention/controller.go +++ b/src/controller/retention/controller.go @@ -31,11 +31,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - // keep only the latest created 50 retention execution records - task.SetExecutionSweeperCount(job.RetentionVendorType, 50) -} - // go:generate mockery -name Controller -case snake // Controller to handle the requests related with retention diff --git a/src/controller/scan/base_controller.go b/src/controller/scan/base_controller.go index 5eda6b723..6ef9b3d00 100644 --- a/src/controller/scan/base_controller.go +++ b/src/controller/scan/base_controller.go @@ -55,8 +55,6 @@ var DefaultController = NewController() // const definitions const ( - VendorTypeScanAll = "SCAN_ALL" - configRegistryEndpoint = "registryEndpoint" configCoreInternalAddr = "coreInternalAddr" @@ -69,11 +67,6 @@ const ( robotIDKey = "robot_id" ) -func init() { - // keep only the latest created 5 scan all execution records - task.SetExecutionSweeperCount(VendorTypeScanAll, 5) -} - // uuidGenerator is a func template which is for generating UUID. type uuidGenerator func() (string, error) @@ -346,7 +339,7 @@ func (bc *basicController) Stop(ctx context.Context, artifact *ar.Artifact) erro } func (bc *basicController) ScanAll(ctx context.Context, trigger string, async bool) (int64, error) { - executionID, err := bc.execMgr.Create(ctx, VendorTypeScanAll, 0, trigger) + executionID, err := bc.execMgr.Create(ctx, job.ScanAllVendorType, 0, trigger) if err != nil { return 0, err } diff --git a/src/controller/scan/callback.go b/src/controller/scan/callback.go index bfca916c5..58bdc1b25 100644 --- a/src/controller/scan/callback.go +++ b/src/controller/scan/callback.go @@ -46,7 +46,7 @@ func init() { } // NOTE: the vendor type of execution for the scan job trigger by the scan all is VendorTypeScanAll - if err := task.RegisterTaskStatusChangePostFunc(VendorTypeScanAll, scanTaskStatusChange); err != nil { + if err := task.RegisterTaskStatusChangePostFunc(job.ScanAllVendorType, scanTaskStatusChange); err != nil { log.Fatalf("failed to register the task status change post for the scan all job, error %v", err) } diff --git a/src/controller/scandataexport/execution.go b/src/controller/scandataexport/execution.go index 95bb71e8e..44b1d34b7 100644 --- a/src/controller/scandataexport/execution.go +++ b/src/controller/scandataexport/execution.go @@ -16,10 +16,6 @@ import ( "github.com/goharbor/harbor/src/pkg/task" ) -func init() { - task.SetExecutionSweeperCount(job.ScanDataExportVendorType, 50) -} - var Ctl = NewController() type Controller interface { diff --git a/src/controller/systemartifact/execution.go b/src/controller/systemartifact/execution.go index a178fda04..5191a3931 100644 --- a/src/controller/systemartifact/execution.go +++ b/src/controller/systemartifact/execution.go @@ -24,10 +24,6 @@ var ( sched = scheduler.Sched ) -func init() { - task.SetExecutionSweeperCount(job.SystemArtifactCleanupVendorType, 50) -} - var Ctl = NewController() type Controller interface { diff --git a/src/controller/task/sweep.go b/src/controller/task/sweep.go new file mode 100644 index 000000000..5ebc848d1 --- /dev/null +++ b/src/controller/task/sweep.go @@ -0,0 +1,143 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" +) + +var ( + // SweepCtl is the global sweep controller + SweepCtl = NewSweepController() +) + +type SweepParams struct { + // ExecRetainCounts records the retained execution counts for different vendor type + ExecRetainCounts map[string]int64 +} + +const ( + // SchedulerCallback ... + SchedulerCallback = "EXECUTION_SWEEP_CALLBACK" + // systemVendorID represents the id for system job. + systemVendorID = -1 + + cronTypeCustom = "Custom" + // run for every hour + cronSpec = "0 0 * * * *" +) + +func init() { + err := scheduler.RegisterCallbackFunc(SchedulerCallback, sweepCallback) + if err != nil { + log.Fatalf("failed to register execution sweep job callback, error: %v", err) + } +} + +func sweepCallback(ctx context.Context, p string) error { + params := &SweepParams{ExecRetainCounts: job.GetExecutionSweeperCount()} + return SweepCtl.Start(ctx, params, task.ExecutionTriggerSchedule) +} + +type SweepController interface { + Start(ctx context.Context, params *SweepParams, trigger string) error +} + +type sweepController struct { + execMgr task.ExecutionManager + taskMgr task.Manager +} + +func (sc *sweepController) Start(ctx context.Context, params *SweepParams, trigger string) error { + jobParams := make(map[string]interface{}) + jobParams[task.ExecRetainCounts] = params.ExecRetainCounts + + execID, err := sc.execMgr.Create(ctx, job.ExecSweepVendorType, systemVendorID, trigger, jobParams) + if err != nil { + log.Errorf("failed to create execution for %s, error: %v", job.ExecSweepVendorType, err) + return err + } + + _, err = sc.taskMgr.Create(ctx, execID, &task.Job{ + Name: job.ExecSweepVendorType, + Metadata: &job.Metadata{ + JobKind: job.KindGeneric, + }, + Parameters: jobParams, + }) + if err != nil { + log.Errorf("failed to create task for %s, error: %v", job.ExecSweepVendorType, err) + return err + } + + return nil +} + +func NewSweepController() SweepController { + return &sweepController{ + execMgr: task.ExecMgr, + taskMgr: task.Mgr, + } +} + +// ScheduleSweepJob schedules the system execution sweep job. 
+func ScheduleSweepJob(ctx context.Context) error { + sched, err := getScheduledSweepJob(ctx) + if err != nil { + return err + } + // unschedule the job if the cron changed + if sched != nil { + if sched.CRON != cronSpec { + log.Debugf("reschedule the system execution job because the cron changed, old: %s, new: %s", sched.CRON, cronSpec) + if err = scheduler.Sched.UnScheduleByID(ctx, sched.ID); err != nil { + return err + } + } else { + log.Debug("skip to schedule the system execution job because the old one existed and cron not changed") + return nil + } + } + + // schedule a job if no schedule found or cron changed + scheduleID, err := scheduler.Sched.Schedule(ctx, job.ExecSweepVendorType, systemVendorID, cronTypeCustom, cronSpec, SchedulerCallback, nil, nil) + if err != nil { + return err + } + + log.Debugf("scheduled the system execution sweep job, id: %d", scheduleID) + return nil +} + +// getScheduledSweepJob gets sweep job which already scheduled. +func getScheduledSweepJob(ctx context.Context) (*scheduler.Schedule, error) { + query := q.New(map[string]interface{}{"vendor_type": job.ExecSweepVendorType}) + schedules, err := scheduler.Sched.ListSchedules(ctx, query) + if err != nil { + return nil, err + } + + if len(schedules) > 0 { + return schedules[0], nil + } + + return nil, nil +} diff --git a/src/core/main.go b/src/core/main.go index 41179eb4e..b7695d3ae 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -35,6 +35,7 @@ import ( "github.com/goharbor/harbor/src/controller/health" "github.com/goharbor/harbor/src/controller/registry" "github.com/goharbor/harbor/src/controller/systemartifact" + "github.com/goharbor/harbor/src/controller/task" "github.com/goharbor/harbor/src/core/api" _ "github.com/goharbor/harbor/src/core/auth/authproxy" _ "github.com/goharbor/harbor/src/core/auth/db" @@ -258,7 +259,13 @@ func main() { log.Errorf("failed to check the jobservice health status: timeout, error: %v", err) return } + + // schedule system artifact cleanup job systemartifact.ScheduleCleanupTask(ctx) + // schedule system execution sweep job + if err := task.ScheduleSweepJob(ctx); err != nil { + log.Errorf("failed to schedule system execution sweep job, error: %v", err) + } }() web.RunWithMiddleWares("", middlewares.MiddleWares()...) 
} diff --git a/src/jobservice/job/known_jobs.go b/src/jobservice/job/known_jobs.go index a89843d9b..7862857db 100644 --- a/src/jobservice/job/known_jobs.go +++ b/src/jobservice/job/known_jobs.go @@ -40,4 +40,29 @@ const ( SystemArtifactCleanupVendorType = "SYSTEM_ARTIFACT_CLEANUP" // ScanDataExportVendorType : the name of the scan data export job ScanDataExportVendorType = "SCAN_DATA_EXPORT" + // ExecSweepVendorType: the name of the execution sweep job + ExecSweepVendorType = "EXECUTION_SWEEP" + // ScanAllVendorType: the name of the scan all job + ScanAllVendorType = "SCAN_ALL" ) + +var ( + // executionSweeperCount stores the count for execution retained + executionSweeperCount = map[string]int64{ + ScanAllVendorType: 5, + ExecSweepVendorType: 10, + GarbageCollectionVendorType: 50, + SlackJobVendorType: 50, + WebhookJobVendorType: 50, + ReplicationVendorType: 50, + ScanDataExportVendorType: 50, + SystemArtifactCleanupVendorType: 50, + P2PPreheatVendorType: 50, + RetentionVendorType: 50, + } +) + +// GetExecutionSweeperCount gets the count of execution records retained by the sweeper +func GetExecutionSweeperCount() map[string]int64 { + return executionSweeperCount +} diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index 1ac7a088d..a878ae81a 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -161,7 +161,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { defer func() { if r := recover(); r != nil { // Log the stack - buf := make([]byte, 1<<10) + buf := make([]byte, 1<<20) size := runtime.Stack(buf, false) err = errors.Errorf("runtime error: %s; stack: %s", r, buf[0:size]) logger.Errorf("Run job %s:%s error: %s", j.Name, j.ID, err) diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index aadcb8b18..aad3837bb 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -331,6 +331,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool( "IMAGE_GC": (*legacy.GarbageCollectionScheduler)(nil), "IMAGE_SCAN_ALL": (*legacy.ScanAllScheduler)(nil), job.SystemArtifactCleanupVendorType: (*systemartifact.Cleanup)(nil), + job.ExecSweepVendorType: (*task.SweepJob)(nil), }); err != nil { // exit return nil, err diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 3d8582698..49fee7f96 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -21,19 +21,16 @@ import ( "time" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" - "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" ) var ( // ExecMgr is a global execution manager instance - ExecMgr = NewExecutionManager() - executionSweeperCount = map[string]uint8{} - ErrTimeOut = errors.New("stopping the execution timeout") + ExecMgr = NewExecutionManager() + ErrTimeOut = errors.New("stopping the execution timeout") ) // ExecutionManager manages executions. 
@@ -83,8 +80,6 @@ func NewExecutionManager() ExecutionManager { executionDAO: dao.NewExecutionDAO(), taskMgr: Mgr, taskDAO: dao.NewTaskDAO(), - ormCreator: orm.Crt, - wp: lib.NewWorkerPool(10), } } @@ -92,8 +87,6 @@ type executionManager struct { executionDAO dao.ExecutionDAO taskMgr Manager taskDAO dao.TaskDAO - ormCreator orm.Creator - wp *lib.WorkerPool } func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) { @@ -126,91 +119,9 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor return 0, err } - // sweep the execution records to avoid the execution/task records explosion - go func() { - e.wp.GetWorker() - defer e.wp.ReleaseWorker() - // as we start a new transaction here to do the sweep work, the current execution record - // may be not visible(when the transaction in which the current execution is created - // in isn't committed), this will cause that there are one more execution records than expected - ctx := orm.NewContext(context.Background(), e.ormCreator.Create()) - if err := e.sweep(ctx, vendorType, vendorID); err != nil { - log.Errorf("failed to sweep the executions of %s: %v", vendorType, err) - return - } - }() - return id, nil } -func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error { - size := int64(executionSweeperCount[vendorType]) - if size == 0 { - log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType) - return nil - } - - // get the #size execution record - query := &q.Query{ - Keywords: map[string]interface{}{ - "VendorType": vendorType, - "VendorID": vendorID, - }, - Sorts: []*q.Sort{ - { - Key: "StartTime", - DESC: true, - }}, - PageSize: 1, - PageNumber: size, - } - executions, err := e.executionDAO.List(ctx, query) - if err != nil { - return err - } - // list is null means that the execution count < size, return directly - if len(executions) == 0 { - return nil - } - - query.Keywords["StartTime"] = &q.Range{ - Max: executions[0].StartTime, - } - totalOfCandidate, err := e.executionDAO.Count(ctx, query) - if err != nil { - return err - } - // n is the page count of all candidates - n := totalOfCandidate / 1000 - if totalOfCandidate%1000 > 0 { - n = n + 1 - } - query.PageSize = 1000 - for i := n; i >= 1; i-- { - query.PageNumber = i - executions, err := e.List(ctx, query) - if err != nil { - return err - } - for _, execution := range executions { - // if the status of the execution isn't final, skip - if !job.Status(execution.Status).Final() { - continue - } - - log.Debugf("delete execution %d by sweeper", execution.ID) - if err = e.Delete(ctx, execution.ID); err != nil { - // the execution may be deleted by the other sweep operation, ignore the not found error - if errors.IsNotFoundErr(err) { - continue - } - log.Errorf("failed to delete the execution %d: %v", execution.ID, err) - } - } - } - return nil -} - func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error { data, err := json.Marshal(extraAttrs) if err != nil { @@ -450,10 +361,3 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao return exec } - -// SetExecutionSweeperCount sets the count of execution records retained by the sweeper -// If no count is set for the specified vendor, the default value will be used -// The sweeper retains the latest created #count execution records for the specified vendor -func SetExecutionSweeperCount(vendorType string, count uint8) { - 
executionSweeperCount[vendorType] = count -} diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index bd673769d..8ceaee901 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task/dao" @@ -47,8 +46,6 @@ func (e *executionManagerTestSuite) SetupTest() { executionDAO: e.execDAO, taskMgr: e.taskMgr, taskDAO: e.taskDAO, - ormCreator: e.ormCreator, - wp: lib.NewWorkerPool(10), } } @@ -61,11 +58,7 @@ func (e *executionManagerTestSuite) TestCount() { } func (e *executionManagerTestSuite) TestCreate() { - SetExecutionSweeperCount("vendor", 50) - e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) - e.ormCreator.On("Create").Return(&orm.FakeOrmer{}) - e.execDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual, map[string]interface{}{"k": "v"}) e.Require().Nil(err) @@ -73,7 +66,6 @@ func (e *executionManagerTestSuite) TestCreate() { // sleep to make sure the function in the goroutine run time.Sleep(1 * time.Second) e.execDAO.AssertExpectations(e.T()) - e.ormCreator.AssertExpectations(e.T()) } func (e *executionManagerTestSuite) TestUpdateExtraAttrs() { diff --git a/src/pkg/task/mock.go b/src/pkg/task/mock.go index 0a113159c..856fd7fab 100644 --- a/src/pkg/task/mock.go +++ b/src/pkg/task/mock.go @@ -18,3 +18,4 @@ package task //go:generate mockery --dir ./dao --name ExecutionDAO --output . --outpkg task --filename mock_execution_dao_test.go --structname mockExecutionDAO //go:generate mockery --name Manager --output . --outpkg task --filename mock_task_manager_test.go --structname mockTaskManager --inpackage //go:generate mockery --dir ../../common/job --name Client --output . --outpkg task --filename mock_jobservice_client_test.go --structname mockJobserviceClient +//go:generate mockery --name SweepManager --output . --outpkg task --filename mock_sweep_manager_test.go --structname mockSweepManager --inpackage diff --git a/src/pkg/task/mock_sweep_manager_test.go b/src/pkg/task/mock_sweep_manager_test.go new file mode 100644 index 000000000..e3c84850d --- /dev/null +++ b/src/pkg/task/mock_sweep_manager_test.go @@ -0,0 +1,69 @@ +// Code generated by mockery v2.22.1. DO NOT EDIT. 
+ +package task + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// mockSweepManager is an autogenerated mock type for the SweepManager type +type mockSweepManager struct { + mock.Mock +} + +// Clean provides a mock function with given fields: ctx, execID +func (_m *mockSweepManager) Clean(ctx context.Context, execID []int64) error { + ret := _m.Called(ctx, execID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, execID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ListCandidates provides a mock function with given fields: ctx, vendorType, retainCnt +func (_m *mockSweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) { + ret := _m.Called(ctx, vendorType, retainCnt) + + var r0 []int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) ([]int64, error)); ok { + return rf(ctx, vendorType, retainCnt) + } + if rf, ok := ret.Get(0).(func(context.Context, string, int64) []int64); ok { + r0 = rf(ctx, vendorType, retainCnt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, int64) error); ok { + r1 = rf(ctx, vendorType, retainCnt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTnewMockSweepManager interface { + mock.TestingT + Cleanup(func()) +} + +// newMockSweepManager creates a new instance of mockSweepManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockSweepManager(t mockConstructorTestingTnewMockSweepManager) *mockSweepManager { + mock := &mockSweepManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/pkg/task/sweep_job.go b/src/pkg/task/sweep_job.go new file mode 100644 index 000000000..9bf3a1b94 --- /dev/null +++ b/src/pkg/task/sweep_job.go @@ -0,0 +1,187 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package task + +import ( + "encoding/json" + "os" + "strconv" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/lib/errors" +) + +func init() { + // the default batch size is 65535 + sweepBatchSize = 65535 + envBatchSize := os.Getenv("EXECUTION_SWEEP_BATCH_SIZE") + if len(envBatchSize) > 0 { + batchSize, err := strconv.Atoi(envBatchSize) + if err != nil { + logger.Errorf("failed to parse the batch size from env, value: %s, error: %v", envBatchSize, err) + } else { + if batchSize <= 0 || batchSize > 65535 { + logger.Errorf("invalid batch size %d for sweep job, should be positive and not over than 65535", batchSize) + } else { + // override the batch size if provided is valid + sweepBatchSize = batchSize + } + } + } +} + +var ( + // notice that the batch size should not over than 65535 as length limitation of postgres parameters + sweepBatchSize int + errStop = errors.New("stopped") +) + +const ( + // ExecRetainCounts is the params key of execution retain count + ExecRetainCounts = "execution_retain_counts" +) + +// SweepJob used to cleanup the executions and tasks for different vendors. +type SweepJob struct { + execRetainCountsMap map[string]int64 + logger logger.Interface + mgr SweepManager +} + +// MaxFails of sweep job. Don't need to retry. +func (sj *SweepJob) MaxFails() uint { + return 1 +} + +// MaxCurrency limit 1 concurrency of sweep job. +func (sj *SweepJob) MaxCurrency() uint { + return 1 +} + +// ShouldRetry indicates no need to retry sweep job. +func (sj *SweepJob) ShouldRetry() bool { + return false +} + +// Validate the parameters of preheat job. +func (sj *SweepJob) Validate(params job.Parameters) error { + return nil +} + +// Run the sweep process. +func (sj *SweepJob) Run(ctx job.Context, params job.Parameters) error { + if err := sj.init(ctx, params); err != nil { + return err + } + + sj.logger.Info("start to run sweep job") + + var errs errors.Errors + for vendor, cnt := range sj.execRetainCountsMap { + if sj.shouldStop(ctx) { + sj.logger.Info("received the stop signal, quit sweep job") + return nil + } + + if err := sj.sweep(ctx, vendor, cnt); err != nil { + if err == errStop { + sj.logger.Info("received the stop signal, quit sweep job") + return nil + } + + sj.logger.Errorf("[%s] failed to run sweep, error: %v", vendor, err) + errs = append(errs, err) + } + } + + sj.logger.Info("end to run sweep job") + + if len(errs) > 0 { + return errs + } + + return nil +} + +func (sj *SweepJob) init(ctx job.Context, params job.Parameters) error { + if sj.mgr == nil { + // use global manager if no sweep manager found + sj.mgr = SweepMgr + } + sj.logger = ctx.GetLogger() + sj.parseParams(params) + return nil +} + +func (sj *SweepJob) parseParams(params job.Parameters) { + sj.execRetainCountsMap = make(map[string]int64) + execRetainCounts, err := json.Marshal(params[ExecRetainCounts]) + if err != nil { + sj.logger.Errorf("failed to marshal params %+v, error: %v", params[ExecRetainCounts], err) + return + } + + if err = json.Unmarshal(execRetainCounts, &sj.execRetainCountsMap); err != nil { + sj.logger.Errorf("failed to unmarshal params %s, error: %v", string(execRetainCounts), err) + return + } +} + +// sweep cleanup the executions/tasks by vendor type and retain count. 
+func (sj *SweepJob) sweep(ctx job.Context, vendorType string, retainCount int64) error { + sj.logger.Infof("[%s] start to sweep, retain latest %d executions", vendorType, retainCount) + + start := time.Now() + candidates, err := sj.mgr.ListCandidates(ctx.SystemContext(), vendorType, retainCount) + if err != nil { + sj.logger.Errorf("[%s] failed to list candidates, error: %v", vendorType, err) + return err + } + + total := len(candidates) + sj.logger.Infof("[%s] listed %d candidate executions for sweep", vendorType, total) + // batch clean the executions and tasks + for i := 0; i < total; i += sweepBatchSize { + // checkpoint + if sj.shouldStop(ctx) { + return errStop + } + // calculate the batch position + j := i + sweepBatchSize + // avoid overflow + if j > total { + j = total + } + + if err = sj.mgr.Clean(ctx.SystemContext(), candidates[i:j]); err != nil { + sj.logger.Errorf("[%s] failed to batch clean candidates, error: %v", vendorType, err) + return err + } + } + + sj.logger.Infof("[%s] end to sweep, %d executions were deleted in total, elapsed time: %v", vendorType, total, time.Since(start)) + + return nil +} + +func (sj *SweepJob) shouldStop(ctx job.Context) bool { + opCmd, exit := ctx.OPCommand() + if exit && opCmd.IsStop() { + return true + } + return false +} diff --git a/src/pkg/task/sweep_job_test.go b/src/pkg/task/sweep_job_test.go new file mode 100644 index 000000000..1f471a321 --- /dev/null +++ b/src/pkg/task/sweep_job_test.go @@ -0,0 +1,71 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package task + +import ( + "context" + "testing" + + "github.com/goharbor/harbor/src/jobservice/job" + mockjobservice "github.com/goharbor/harbor/src/testing/jobservice" + "github.com/stretchr/testify/suite" +) + +type sweepJobTestSuite struct { + suite.Suite + jobCtx *mockjobservice.MockJobContext + sweepMgr *mockSweepManager +} + +func (suite *sweepJobTestSuite) SetupSuite() { + suite.jobCtx = &mockjobservice.MockJobContext{} + suite.sweepMgr = &mockSweepManager{} +} + +func TestSweepJob(t *testing.T) { + suite.Run(t, &sweepJobTestSuite{}) +} + +func (suite *sweepJobTestSuite) TestRun() { + params := map[string]interface{}{ + "execution_retain_counts": map[string]int{ + "WEBHOOK": 10, + "REPLICATION": 20, + }, + } + // test stop case + j := &SweepJob{} + suite.jobCtx.On("OPCommand").Return(job.StopCommand, true).Once() + err := j.Run(suite.jobCtx, params) + suite.NoError(err, "stop job should not return error") + + // test sweep error case + j = &SweepJob{} + suite.jobCtx.On("OPCommand").Return(job.NilCommand, true) + err = j.Run(suite.jobCtx, params) + suite.Error(err, "should got error if sweep failed") + + // test normal case + j = &SweepJob{mgr: suite.sweepMgr} + ctx := context.TODO() + suite.jobCtx.On("OPCommand").Return(job.NilCommand, true) + suite.jobCtx.On("SystemContext").Return(ctx, nil) + suite.sweepMgr.On("ListCandidates", ctx, "WEBHOOK", int64(10)).Return([]int64{1}, nil) + suite.sweepMgr.On("ListCandidates", ctx, "REPLICATION", int64(20)).Return([]int64{2}, nil) + suite.sweepMgr.On("Clean", ctx, []int64{1}).Return(nil) + suite.sweepMgr.On("Clean", ctx, []int64{2}).Return(nil) + err = j.Run(suite.jobCtx, params) + suite.NoError(err) +} diff --git a/src/pkg/task/sweep_manager.go b/src/pkg/task/sweep_manager.go new file mode 100644 index 000000000..f697b6b2d --- /dev/null +++ b/src/pkg/task/sweep_manager.go @@ -0,0 +1,189 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + "time" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task/dao" +) + +var ( + // SweepMgr is a global sweep manager instance. + SweepMgr = NewSweepManager() + + timeFormat = "2006-01-02 15:04:05.999999999" + defaultPageSize = 100000 + finalStatusCode = 3 +) + +type SweepManager interface { + // ListCandidates lists the candidate execution ids which met the sweep criteria. + ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error) + // Clean deletes the tasks belonging to the execution which in final status and deletes executions. + Clean(ctx context.Context, execID []int64) (err error) +} + +// sweepManager implements the interface SweepManager. +type sweepManager struct { + execDAO dao.ExecutionDAO +} + +// listVendorIDs lists distinct vendor ids by vendor type. 
+func (sm *sweepManager) listVendorIDs(ctx context.Context, vendorType string) ([]int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + + var ids []int64 + if _, err = ormer.Raw(`SELECT DISTINCT vendor_id FROM execution WHERE vendor_type = ?`, vendorType).QueryRows(&ids); err != nil { + return nil, err + } + + return ids, nil +} + +// getCandidateMaxStartTime returns the max start time for candidate executions, obtain the start time of the xth recent one. +func (sm *sweepManager) getCandidateMaxStartTime(ctx context.Context, vendorType string, vendorID, retainCnt int64) (*time.Time, error) { + query := &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": vendorType, + "VendorID": vendorID, + }, + Sorts: []*q.Sort{ + { + Key: "StartTime", + DESC: true, + }}, + PageSize: 1, + PageNumber: retainCnt, + } + executions, err := sm.execDAO.List(ctx, query) + if err != nil { + return nil, err + } + // list is null means that the execution count < retainCnt, return nil time + if len(executions) == 0 { + return nil, nil + } + + return &executions[0].StartTime, nil +} + +func (sm *sweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + + vendorIDs, err := sm.listVendorIDs(ctx, vendorType) + if err != nil { + return nil, errors.Wrapf(err, "failed to list vendor ids for vendor type %s", vendorType) + } + + // execIDs stores the result + var execIDs []int64 + for _, vendorID := range vendorIDs { + maxStartTime, err := sm.getCandidateMaxStartTime(ctx, vendorType, vendorID, retainCnt) + if err != nil { + return nil, errors.Wrapf(err, "failed to get candidate max start time, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + // continue if no max start time got that means no candidate executions + if maxStartTime == nil { + continue + } + // candidate criteria + // 1. exact vendor type & vendor id + // 2. start_time is before the max start time + // 3. status is the final state + // count the records for pagination + sql := `SELECT COUNT(1) FROM execution WHERE vendor_type = ? AND vendor_id = ? AND start_time < ? AND status IN (?,?,?)` + totalOfCandidate := 0 + params := []interface{}{ + vendorType, + vendorID, + maxStartTime.Format(timeFormat), + // final status should in Error/Success/Stopped + job.ErrorStatus.String(), + job.SuccessStatus.String(), + job.StoppedStatus.String(), + } + if err = ormer.Raw(sql, params...).QueryRow(&totalOfCandidate); err != nil { + return nil, errors.Wrapf(err, "failed to count candidates, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + // n is the page count of all candidates + n := totalOfCandidate / defaultPageSize + if totalOfCandidate%defaultPageSize > 0 { + n = n + 1 + } + + sql = `SELECT id FROM execution WHERE vendor_type = ? AND vendor_id = ? AND start_time < ? 
AND status IN (?,?,?)` + // default page size is 100000 + q2 := &q.Query{PageSize: int64(defaultPageSize)} + for i := n; i >= 1; i-- { + q2.PageNumber = int64(i) + // should copy params as pagination will append the slice + paginationParams := make([]interface{}, len(params)) + copy(paginationParams, params) + paginationSQL, paginationParams := orm.PaginationOnRawSQL(q2, sql, paginationParams) + ids := make([]int64, 0, defaultPageSize) + if _, err = ormer.Raw(paginationSQL, paginationParams...).QueryRows(&ids); err != nil { + return nil, errors.Wrapf(err, "failed to list candidate execution ids, vendor type: %s, vendor id: %d", vendorType, vendorID) + } + execIDs = append(execIDs, ids...) + } + } + + return execIDs, nil +} + +func (sm *sweepManager) Clean(ctx context.Context, execIDs []int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + // construct sql params + params := make([]interface{}, 0, len(execIDs)) + for _, eid := range execIDs { + params = append(params, eid) + } + // delete tasks + sql := fmt.Sprintf("DELETE FROM task WHERE status_code = %d AND execution_id IN (%s)", finalStatusCode, orm.ParamPlaceholderForIn(len(params))) + _, err = ormer.Raw(sql, params...).Exec() + if err != nil { + return errors.Wrap(err, "failed to delete tasks") + } + // delete executions + sql = fmt.Sprintf("DELETE FROM execution WHERE id IN (%s)", orm.ParamPlaceholderForIn(len(params))) + _, err = ormer.Raw(sql, params...).Exec() + if err != nil { + return errors.Wrap(err, "failed to delete executions") + } + + return nil +} + +func NewSweepManager() SweepManager { + return &sweepManager{ + execDAO: dao.NewExecutionDAO(), + } +} diff --git a/src/pkg/task/sweep_manager_test.go b/src/pkg/task/sweep_manager_test.go new file mode 100644 index 000000000..172404ecd --- /dev/null +++ b/src/pkg/task/sweep_manager_test.go @@ -0,0 +1,56 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package task + +import ( + "context" + "testing" + "time" + + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/pkg/task/dao" + "github.com/goharbor/harbor/src/testing/mock" + "github.com/stretchr/testify/suite" +) + +type sweepManagerTestSuite struct { + suite.Suite + execDao *mockExecutionDAO + mgr *sweepManager +} + +func TestSweepManager(t *testing.T) { + suite.Run(t, &sweepManagerTestSuite{}) +} + +func (suite *sweepManagerTestSuite) SetupSuite() { + suite.execDao = &mockExecutionDAO{} + suite.mgr = &sweepManager{execDAO: suite.execDao} +} + +func (suite *sweepManagerTestSuite) TestGetCandidateMaxStartTime() { + // test error case + suite.execDao.On("List", mock.Anything, mock.Anything).Return(nil, errors.New("failed to list executions")).Once() + startTime, err := suite.mgr.getCandidateMaxStartTime(context.TODO(), "WEBHOOK", 1, 10) + suite.Error(err, "should got error") + suite.Nil(startTime) + // test normal case + now := time.Now() + execs := []*dao.Execution{{ID: 1, StartTime: now}} + suite.execDao.On("List", mock.Anything, mock.Anything).Return(execs, nil) + startTime, err = suite.mgr.getCandidateMaxStartTime(context.TODO(), "WEBHOOK", 1, 10) + suite.NoError(err, "should not got error") + suite.Equal(now.String(), startTime.String()) +} diff --git a/src/server/v2.0/handler/scan_all.go b/src/server/v2.0/handler/scan_all.go index 23bc77ce3..1359fe8de 100644 --- a/src/server/v2.0/handler/scan_all.go +++ b/src/server/v2.0/handler/scan_all.go @@ -206,11 +206,11 @@ func (s *scanAllAPI) createOrUpdateScanAllSchedule(ctx context.Context, cronType } } - return s.scheduler.Schedule(ctx, scan.VendorTypeScanAll, 0, cronType, cron, scan.ScanAllCallback, nil, nil) + return s.scheduler.Schedule(ctx, job.ScanAllVendorType, 0, cronType, cron, scan.ScanAllCallback, nil, nil) } func (s *scanAllAPI) getScanAllSchedule(ctx context.Context) (*scheduler.Schedule, error) { - query := q.New(q.KeyWords{"vendor_type": scan.VendorTypeScanAll}) + query := q.New(q.KeyWords{"vendor_type": job.ScanAllVendorType}) schedules, err := s.scheduler.ListSchedules(ctx, query.First(q.NewSort("creation_time", true))) if err != nil { return nil, err @@ -265,7 +265,7 @@ func (s *scanAllAPI) getMetrics(ctx context.Context, trigger ...string) (*models } func (s *scanAllAPI) getLatestScanAllExecution(ctx context.Context, trigger ...string) (*task.Execution, error) { - query := q.New(q.KeyWords{"vendor_type": scan.VendorTypeScanAll}) + query := q.New(q.KeyWords{"vendor_type": job.ScanAllVendorType}) if len(trigger) > 0 { query.Keywords["trigger"] = trigger[0] } diff --git a/tests/apitests/python/test_job_service_dashboard.py b/tests/apitests/python/test_job_service_dashboard.py index ec18f4002..6fdc8460e 100644 --- a/tests/apitests/python/test_job_service_dashboard.py +++ b/tests/apitests/python/test_job_service_dashboard.py @@ -33,7 +33,7 @@ class TestJobServiceDashboard(unittest.TestCase, object): self.registry = Registry() self.scan_all = ScanAll() self.schedule = Schedule() - self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT_LOG", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"] + self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT_LOG", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK", "EXECUTION_SWEEP"] self.cron_type = "Custom" self.cron = "0 0 0 * * 0"
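Note on configuration: the per-vendor retention counts that each controller previously registered through task.SetExecutionSweeperCount in its init() are now defined centrally in src/jobservice/job/known_jobs.go and handed to the sweep job via the execution_retain_counts parameter, and the deletion batch size used by the job can be overridden with the EXECUTION_SWEEP_BATCH_SIZE environment variable (valid values are 1 to 65535). A minimal sketch of reading the centralized counts through the exported helper introduced above (a standalone illustration, not code from the patch):

package main

import (
	"fmt"

	"github.com/goharbor/harbor/src/jobservice/job"
)

func main() {
	// GetExecutionSweeperCount returns the vendor type -> retained execution count
	// map defined in known_jobs.go.
	counts := job.GetExecutionSweeperCount()
	fmt.Println(counts[job.ScanAllVendorType])           // 5
	fmt.Println(counts[job.GarbageCollectionVendorType]) // 50
}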