From d6288a43e8b1efd1835285abe1e62665c62d602c Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 28 Jul 2020 13:31:51 +0800 Subject: [PATCH] Do some refine for the scheduler 1. Accept vendorType and vendorID when creating the schedule 2. Provide more methods in the scheduler interface to reduce the duplicated works of callers 3. Use a new ormer and transaction when creating the schedule Signed-off-by: Wenkai Yin --- .../postgresql/0040_2.1.0_schema.up.sql | 14 +- src/controller/p2p/preheat/controller.go | 24 +- src/controller/p2p/preheat/controllor_test.go | 6 +- src/pkg/p2p/preheat/models/policy/policy.go | 3 +- src/pkg/retention/controller.go | 44 ++-- src/pkg/retention/controller_test.go | 11 +- src/pkg/retention/manager.go | 5 - src/pkg/retention/policy/models.go | 6 - src/pkg/scheduler/callback.go | 20 +- src/pkg/scheduler/dao.go | 7 +- src/pkg/scheduler/dao_test.go | 25 +-- src/pkg/scheduler/scheduler.go | 205 ++++++++++++++---- src/pkg/scheduler/scheduler_test.go | 139 ++++++++---- src/testing/pkg/scheduler/scheduler.go | 63 +++++- 14 files changed, 386 insertions(+), 186 deletions(-) diff --git a/make/migrations/postgresql/0040_2.1.0_schema.up.sql b/make/migrations/postgresql/0040_2.1.0_schema.up.sql index 6e9a93481..698cc3f7c 100644 --- a/make/migrations/postgresql/0040_2.1.0_schema.up.sql +++ b/make/migrations/postgresql/0040_2.1.0_schema.up.sql @@ -68,15 +68,16 @@ CREATE TABLE IF NOT EXISTS p2p_preheat_policy ( UNIQUE (name, project_id) ); +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_type varchar(16); +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_id int; ALTER TABLE schedule ADD COLUMN IF NOT EXISTS cron varchar(64); -ALTER TABLE schedule ADD COLUMN IF NOT EXISTS execution_id int; ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_name varchar(128); ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_param text; /*abstract the cron, callback function parameters from table retention_policy*/ UPDATE schedule -SET cron = retention.cron, callback_func_name = 'RetentionCallback', - callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}') +SET vendor_type= 'RETENTION', vendor_id=retention.id, cron = retention.cron, + callback_func_name = 'RETENTION', callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}') FROM ( SELECT id, data::json->'trigger'->'references'->>'job_id' AS schedule_id, data::json->'trigger'->'settings'->>'cron' AS cron @@ -93,7 +94,7 @@ DECLARE BEGIN FOR sched IN SELECT * FROM schedule LOOP - INSERT INTO execution (vendor_type, trigger) VALUES ('SCHEDULER', 'MANUAL') RETURNING id INTO exec_id; + INSERT INTO execution (vendor_type, vendor_id, trigger) VALUES ('SCHEDULER', sched.id, 'MANUAL') RETURNING id INTO exec_id; IF sched.status = 'Pending' THEN status_code = 0; ELSIF sched.status = 'Scheduled' THEN @@ -106,11 +107,8 @@ BEGIN status_code = 0; END IF; INSERT INTO task (execution_id, job_id, status, status_code, status_revision, run_count) VALUES (exec_id, sched.job_id, sched.status, status_code, 0, 0); - UPDATE schedule SET execution_id=exec_id WHERE id = sched.id; END LOOP; END $$; ALTER TABLE schedule DROP COLUMN IF EXISTS job_id; -ALTER TABLE schedule DROP COLUMN IF EXISTS status; - -ALTER TABLE schedule ADD CONSTRAINT schedule_execution FOREIGN KEY (execution_id) REFERENCES execution(id); +ALTER TABLE schedule DROP COLUMN IF EXISTS status; \ No newline at end of file diff --git a/src/controller/p2p/preheat/controller.go b/src/controller/p2p/preheat/controller.go index 84cef2ac5..bac6ce927 100644 --- a/src/controller/p2p/preheat/controller.go +++ b/src/controller/p2p/preheat/controller.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/p2p/preheat/instance" @@ -238,8 +239,8 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(schema.Trigger.Settings.Cron) > 0 { // schedule and update policy - schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id}) - if err != nil { + if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, schema.Trigger.Settings.Cron, + SchedulerCallback, TriggerParam{PolicyID: id}); err != nil { return 0, err } @@ -248,7 +249,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche } if err != nil { - if e := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID); e != nil { + if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil { return 0, errors.Wrap(e, err.Error()) } @@ -293,12 +294,12 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche } var cron = schema.Trigger.Settings.Cron - var oldJobID = s0.Trigger.Settings.JobID + var oldCron = s0.Trigger.Settings.Cron var needUn bool var needSch bool if s0.Trigger.Type != schema.Trigger.Type { - if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 { + if s0.Trigger.Type == policyModels.TriggerTypeScheduled && len(oldCron) > 0 { needUn = true } if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 { @@ -309,7 +310,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche if schema.Trigger.Type == policyModels.TriggerTypeScheduled && s0.Trigger.Settings.Cron != cron { // unschedule old - if oldJobID > 0 { + if len(oldCron) > 0 { needUn = true } // schedule new @@ -323,7 +324,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche // unschedule old if needUn { - err = c.scheduler.UnSchedule(ctx, oldJobID) + err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID) if err != nil { return err } @@ -331,11 +332,10 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche // schedule new if needSch { - jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID}) - if err != nil { + if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, cron, SchedulerCallback, + TriggerParam{PolicyID: schema.ID}); err != nil { return err } - schema.Trigger.Settings.JobID = jobid if err := schema.Encode(); err != nil { // Possible // TODO: Refactor @@ -355,8 +355,8 @@ func (c *controller) DeletePolicy(ctx context.Context, id int64) error { if err != nil { return err } - if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 { - err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID) + if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 { + err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id) if err != nil { return err } diff --git a/src/controller/p2p/preheat/controllor_test.go b/src/controller/p2p/preheat/controllor_test.go index 8751842eb..466c9fdf2 100644 --- a/src/controller/p2p/preheat/controllor_test.go +++ b/src/controller/p2p/preheat/controllor_test.go @@ -204,10 +204,10 @@ func (s *preheatSuite) TestCreatePolicy() { FiltersStr: `[{"type":"repository","value":"harbor*"},{"type":"tag","value":"2*"}]`, TriggerStr: fmt.Sprintf(`{"type":"%s", "trigger_setting":{"cron":"* * * * */1"}}`, policy.TriggerTypeScheduled), } - s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil) s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil) - s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil) + s.fakeScheduler.On("UnScheduleByVendor", s.ctx, mock.Anything, mock.Anything).Return(nil) id, err := s.controller.CreatePolicy(s.ctx, policy) s.NoError(err) s.Equal(int64(1), id) @@ -231,7 +231,6 @@ func (s *preheatSuite) TestGetPolicyByName() { func (s *preheatSuite) TestUpdatePolicy() { var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} - p0.Trigger.Settings.JobID = 1 p0.Trigger.Settings.Cron = "* * * * */1" p0.Filters = []*policy.Filter{ { @@ -272,7 +271,6 @@ func (s *preheatSuite) TestUpdatePolicy() { func (s *preheatSuite) TestDeletePolicy() { var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} - p0.Trigger.Settings.JobID = 1 s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil) s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil) err := s.controller.DeletePolicy(s.ctx, 1) diff --git a/src/pkg/p2p/preheat/models/policy/policy.go b/src/pkg/p2p/preheat/models/policy/policy.go index 72d0bcfe7..19964f82e 100644 --- a/src/pkg/p2p/preheat/models/policy/policy.go +++ b/src/pkg/p2p/preheat/models/policy/policy.go @@ -99,8 +99,7 @@ type Trigger struct { Type TriggerType `json:"type"` Settings struct { // The cron string for scheduled trigger. - Cron string `json:"cron,omitempty"` - JobID int64 `json:"job_id,omitempty"` + Cron string `json:"cron,omitempty"` } `json:"trigger_setting,omitempty"` } diff --git a/src/pkg/retention/controller.go b/src/pkg/retention/controller.go index c725e18f7..16adf2248 100644 --- a/src/pkg/retention/controller.go +++ b/src/pkg/retention/controller.go @@ -66,7 +66,8 @@ type DefaultAPIController struct { const ( // SchedulerCallback ... - SchedulerCallback = "RetentionCallback" + SchedulerCallback = "RETENTION" + schedulerVendorType = "RETENTION" ) // TriggerParam ... @@ -82,26 +83,23 @@ func (r *DefaultAPIController) GetRetention(id int64) (*policy.Metadata, error) // CreateRetention Create Retention func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) (int64, error) { - if p.Trigger.Kind == policy.TriggerKindSchedule { - cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron] - if ok && len(cron.(string)) > 0 { - jobid, err := r.scheduler.Schedule(orm.Context(), cron.(string), SchedulerCallback, TriggerParam{ - PolicyID: p.ID, - Trigger: ExecutionTriggerSchedule, - }) - if err != nil { - return 0, err - } - if p.Trigger.References == nil { - p.Trigger.References = map[string]interface{}{} - } - p.Trigger.References[policy.TriggerReferencesJobid] = jobid - } - } id, err := r.manager.CreatePolicy(p) if err != nil { return 0, err } + + if p.Trigger.Kind == policy.TriggerKindSchedule { + cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron] + if ok && len(cron.(string)) > 0 { + if _, err = r.scheduler.Schedule(orm.Context(), schedulerVendorType, id, cron.(string), SchedulerCallback, TriggerParam{ + PolicyID: id, + Trigger: ExecutionTriggerSchedule, + }); err != nil { + return 0, err + } + } + } + return id, nil } @@ -142,24 +140,26 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error { return fmt.Errorf("not support Trigger %s", p.Trigger.Kind) } } + if err = r.manager.UpdatePolicy(p); err != nil { + return err + } if needUn { - err = r.scheduler.UnSchedule(orm.Context(), p0.Trigger.References[policy.TriggerReferencesJobid].(int64)) + err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, p.ID) if err != nil { return err } } if needSch { - jobid, err := r.scheduler.Schedule(orm.Context(), p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{ + _, err := r.scheduler.Schedule(orm.Context(), schedulerVendorType, p.ID, p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{ PolicyID: p.ID, Trigger: ExecutionTriggerSchedule, }) if err != nil { return err } - p.Trigger.References[policy.TriggerReferencesJobid] = jobid } - return r.manager.UpdatePolicy(p) + return nil } // DeleteRetention Delete Retention @@ -169,7 +169,7 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error { return err } if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 { - err = r.scheduler.UnSchedule(orm.Context(), p.Trigger.References[policy.TriggerReferencesJobid].(int64)) + err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, id) if err != nil { return err } diff --git a/src/pkg/retention/controller_test.go b/src/pkg/retention/controller_test.go index beefe1a98..d849976ad 100644 --- a/src/pkg/retention/controller_test.go +++ b/src/pkg/retention/controller_test.go @@ -19,6 +19,7 @@ import ( "strings" "testing" + "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/retention/dep" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/rule" @@ -219,17 +220,23 @@ func (s *ControllerTestSuite) TestExecution() { type fakeRetentionScheduler struct { } -func (f *fakeRetentionScheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { +func (f *fakeRetentionScheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) { return 111, nil } -func (f *fakeRetentionScheduler) UnSchedule(ctx context.Context, id int64) error { +func (f *fakeRetentionScheduler) UnScheduleByID(ctx context.Context, id int64) error { + return nil +} +func (f *fakeRetentionScheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error { return nil } func (f *fakeRetentionScheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) { return nil, nil } +func (f *fakeRetentionScheduler) ListSchedules(ctx context.Context, q *q.Query) ([]*scheduler.Schedule, error) { + return nil, nil +} type fakeLauncher struct { } diff --git a/src/pkg/retention/manager.go b/src/pkg/retention/manager.go index c123491cc..c20d9d99c 100644 --- a/src/pkg/retention/manager.go +++ b/src/pkg/retention/manager.go @@ -120,11 +120,6 @@ func (d *DefaultManager) GetPolicy(id int64) (*policy.Metadata, error) { return nil, err } p.ID = id - if p.Trigger.Settings != nil { - if _, ok := p.Trigger.References[policy.TriggerReferencesJobid]; ok { - p.Trigger.References[policy.TriggerReferencesJobid] = int64(p.Trigger.References[policy.TriggerReferencesJobid].(float64)) - } - } return p, nil } diff --git a/src/pkg/retention/policy/models.go b/src/pkg/retention/policy/models.go index 3b083b769..da319bf0c 100644 --- a/src/pkg/retention/policy/models.go +++ b/src/pkg/retention/policy/models.go @@ -28,8 +28,6 @@ const ( // TriggerKindSchedule Schedule TriggerKindSchedule = "Schedule" - // TriggerReferencesJobid job_id - TriggerReferencesJobid = "job_id" // TriggerSettingsCron cron TriggerSettingsCron = "cron" @@ -97,10 +95,6 @@ type Trigger struct { // Settings for the specified trigger // '[cron]="* 22 11 * * *"' for the 'Schedule' Settings map[string]interface{} `json:"settings" valid:"Required"` - - // References of the trigger - // e.g: schedule job ID - References map[string]interface{} `json:"references"` } // Scope definition diff --git a/src/pkg/scheduler/callback.go b/src/pkg/scheduler/callback.go index 1eedaa233..af2d19ae0 100644 --- a/src/pkg/scheduler/callback.go +++ b/src/pkg/scheduler/callback.go @@ -21,7 +21,6 @@ import ( "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" - "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task" ) @@ -70,20 +69,21 @@ func callbackFuncExist(name string) bool { } func triggerCallback(ctx context.Context, task *task.Task, change *job.StatusChange) (err error) { - schedules, err := Sched.(*scheduler).dao.List(ctx, &q.Query{ - Keywords: map[string]interface{}{ - "ExecutionID": task.ExecutionID, - }, - }) + execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID) if err != nil { return err } - if len(schedules) == 0 { - return fmt.Errorf("the schedule whose execution ID is %d not found", task.ExecutionID) + if execution.VendorType != JobNameScheduler { + return fmt.Errorf("the vendor type of execution %d isn't %s: %s", + task.ExecutionID, JobNameScheduler, execution.VendorType) } - callbackFunc, err := getCallbackFunc(schedules[0].CallbackFuncName) + schedule, err := Sched.(*scheduler).dao.Get(ctx, execution.VendorID) if err != nil { return err } - return callbackFunc(schedules[0].CallbackFuncParam) + callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName) + if err != nil { + return err + } + return callbackFunc(schedule.CallbackFuncParam) } diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index 87256ca34..4227171c5 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -30,8 +30,9 @@ func init() { type schedule struct { ID int64 `orm:"pk;auto;column(id)"` + VendorType string `orm:"column(vendor_type)"` + VendorID int64 `orm:"column(vendor_id)"` CRON string `orm:"column(cron)"` - ExecutionID int64 `orm:"column(execution_id)"` CallbackFuncName string `orm:"column(callback_func_name)"` CallbackFuncParam string `orm:"column(callback_func_param)"` CreationTime time.Time `orm:"column(creation_time)"` @@ -56,10 +57,6 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) { } id, err := ormer.Insert(schedule) if err != nil { - if e := orm.AsForeignKeyError(err, - "the schedule tries to reference a non existing execution %d", schedule.ExecutionID); e != nil { - err = e - } return 0, err } return id, nil diff --git a/src/pkg/scheduler/dao_test.go b/src/pkg/scheduler/dao_test.go index c7d411db6..192b8917e 100644 --- a/src/pkg/scheduler/dao_test.go +++ b/src/pkg/scheduler/dao_test.go @@ -22,33 +22,27 @@ import ( "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" - "github.com/goharbor/harbor/src/pkg/task" "github.com/stretchr/testify/suite" ) type daoTestSuite struct { suite.Suite - dao DAO - execMgr task.ExecutionManager - ctx context.Context - id int64 - execID int64 + dao DAO + ctx context.Context + id int64 } func (d *daoTestSuite) SetupSuite() { d.dao = &dao{} - d.execMgr = task.NewExecutionManager() common_dao.PrepareTestForPostgresSQL() d.ctx = orm.Context() } func (d *daoTestSuite) SetupTest() { - execID, err := d.execMgr.Create(d.ctx, "vendor", 0, "trigger") - d.Require().Nil(err) - d.execID = execID schedule := &schedule{ + VendorType: "Vendor", + VendorID: 1, CRON: "0 * * * * *", - ExecutionID: execID, CallbackFuncName: "callback_func_01", CallbackFuncParam: "callback_func_params", } @@ -59,19 +53,10 @@ func (d *daoTestSuite) SetupTest() { func (d *daoTestSuite) TearDownTest() { d.Require().Nil(d.dao.Delete(d.ctx, d.id)) - d.Require().Nil(d.execMgr.Delete(d.ctx, d.execID)) } func (d *daoTestSuite) TestCreate() { // the happy pass is covered in SetupTest - - // foreign key error - _, err := d.dao.Create(d.ctx, &schedule{ - CRON: "0 * * * * *", - ExecutionID: 10000, - CallbackFuncName: "callback_func", - }) - d.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode)) } func (d *daoTestSuite) TestList() { diff --git a/src/pkg/scheduler/scheduler.go b/src/pkg/scheduler/scheduler.go index 67d6791a1..8e9fd082c 100644 --- a/src/pkg/scheduler/scheduler.go +++ b/src/pkg/scheduler/scheduler.go @@ -20,9 +20,11 @@ import ( "fmt" "time" + beegoorm "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/task" cronlib "github.com/robfig/cron" @@ -36,6 +38,8 @@ var ( // Schedule describes the detail information about the created schedule type Schedule struct { ID int64 `json:"id"` + VendorType string `json:"vendor_type"` + VendorID int64 `json:"vendor_id"` CRON string `json:"cron"` Status string `json:"status"` // status of the underlying task(jobservice job) CreationTime time.Time `json:"creation_time"` @@ -49,13 +53,20 @@ type Schedule struct { type Scheduler interface { // Schedule creates a task which calls the specified callback function periodically // The callback function needs to be registered first + // The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.), + // and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention). // The "params" is passed to the callback function as encoded json string, so the callback // function must decode it before using - Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) - // UnSchedule the created schedule instance - UnSchedule(ctx context.Context, id int64) error + Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, + callbackFuncName string, params interface{}) (int64, error) + // UnScheduleByID the schedule specified by ID + UnScheduleByID(ctx context.Context, id int64) error + // UnScheduleByVendor the schedule specified by vendor + UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error // GetSchedule gets the schedule specified by ID GetSchedule(ctx context.Context, id int64) (*Schedule, error) + // ListSchedules according to the query + ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) } // New returns an instance of the default scheduler @@ -73,7 +84,38 @@ type scheduler struct { taskMgr task.Manager } -func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { +// Currently all database operations inside one request handling are covered by +// one transaction, which means if any one of the operations fails, all of them +// will be roll back. As the scheduler creates jobservice jobs that cannot be +// roll back by the transaction, this will cause some unexpected data inconsistence +// in some cases. +// The implementation of "Schedule" replaces the ormer with a new one in the context +// to out of control from the global transaction, and uses a new transaction that only +// covers the logic inside the function +func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, + callbackFuncName string, params interface{}) (int64, error) { + var scheduleID int64 + f := func(ctx context.Context) error { + id, err := s.schedule(ctx, vendorType, vendorID, cron, callbackFuncName, params) + if err != nil { + return err + } + scheduleID = id + return nil + } + + ctx = orm.NewContext(ctx, beegoorm.NewOrm()) + if err := orm.WithTransaction(f)(ctx); err != nil { + return 0, err + } + return scheduleID, nil +} + +func (s *scheduler) schedule(ctx context.Context, vendorType string, vendorID int64, cron string, + callbackFuncName string, params interface{}) (int64, error) { + if len(vendorType) == 0 { + return 0, fmt.Errorf("empty vendor type") + } if _, err := cronlib.Parse(cron); err != nil { return 0, errors.New(nil).WithCode(errors.BadRequestCode). WithMessage("invalid cron %s: %v", cron, err) @@ -82,15 +124,11 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName return 0, fmt.Errorf("callback function %s not found", callbackFuncName) } - execID, err := s.execMgr.Create(ctx, JobNameScheduler, 0, task.ExecutionTriggerManual) - if err != nil { - return 0, err - } - now := time.Now() sched := &schedule{ + VendorType: vendorType, + VendorID: vendorID, CRON: cron, - ExecutionID: execID, CallbackFuncName: callbackFuncName, CreationTime: now, UpdateTime: now, @@ -102,15 +140,19 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName } sched.CallbackFuncParam = string(paramsData) } - // create schedule record - // when status/checkin hook comes, the database record must exist, + // when checkin hook comes, the database record must exist, // so the database record must be created first before submitting job id, err := s.dao.Create(ctx, sched) if err != nil { return 0, err } + execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual) + if err != nil { + return 0, err + } + taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{ Name: JobNameScheduler, Metadata: &job.Metadata{ @@ -121,6 +163,15 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName if err != nil { return 0, err } + // make sure the created task is stopped if got any error in the following steps + defer func() { + if err == nil { + return + } + if err := s.taskMgr.Stop(ctx, taskID); err != nil { + log.Errorf("failed to stop the task %d: %v", taskID, err) + } + }() // when task manager creating a task, it creates the task database record first and // then submits the job to jobservice. If the submitting failed, it doesn't return // any error. So we check the task status to make sure the job is submitted to jobservice @@ -130,44 +181,76 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName return 0, err } if task.Status == job.ErrorStatus.String() { - return 0, fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String()) + // assign the error to "err" to trigger the defer function to clean up the created task + err = fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String()) + return 0, err } return id, nil } -func (s *scheduler) UnSchedule(ctx context.Context, id int64) error { - schedule, err := s.dao.Get(ctx, id) +func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error { + executions, err := s.execMgr.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": JobNameScheduler, + "VendorID": id, + }, + }) if err != nil { - if errors.IsNotFoundErr(err) { - log.Warningf("trying to unschedule a non existing schedule %d, skip directly", id) - return nil - } return err } - if err = s.execMgr.Stop(ctx, schedule.ExecutionID); err != nil { - return err - } - - // after the stop called, the execution cannot be stopped immediately, - // use the for loop to make sure the execution be in final status before deleting it - for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 { - exec, err := s.execMgr.Get(ctx, schedule.ExecutionID) - if err != nil { + if len(executions) > 0 { + executionID := executions[0].ID + if err = s.execMgr.Stop(ctx, executionID); err != nil { return err } - if job.Status(exec.Status).Final() { - // delete schedule record - if err = s.dao.Delete(ctx, id); err != nil { + final := false + // after the stop called, the execution cannot be stopped immediately, and the execution + // cannot be deleted if it's status isn't in final status, so use the for loop to make + // sure the execution be in final status before deleting it + for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 { + exec, err := s.execMgr.Get(ctx, executionID) + if err != nil { return err } - // delete execution - return s.execMgr.Delete(ctx, schedule.ExecutionID) + if job.Status(exec.Status).Final() { + final = true + break + } + time.Sleep(t) + } + if !final { + return fmt.Errorf("failed to unschedule the schedule %d: the execution %d isn't in final status", id, executionID) + } + // delete execution + if err = s.execMgr.Delete(ctx, executionID); err != nil { + return err } - time.Sleep(t) } - return fmt.Errorf("failed to unschedule the schedule %d: the execution isn't in final status", id) + // delete schedule record + return s.dao.Delete(ctx, id) +} + +func (s *scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error { + q := &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": vendorType, + }, + } + if vendorID > 0 { + q.Keywords["VendorID"] = vendorID + } + schedules, err := s.dao.List(ctx, q) + if err != nil { + return err + } + for _, schedule := range schedules { + if err = s.UnScheduleByID(ctx, schedule.ID); err != nil { + return err + } + } + return nil } func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) { @@ -175,17 +258,49 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error if err != nil { return nil, err } + return s.convertSchedule(ctx, schedule) +} + +func (s *scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) { + schedules, err := s.dao.List(ctx, query) + if err != nil { + return nil, err + } + var scheds []*Schedule + for _, schedule := range schedules { + sched, err := s.convertSchedule(ctx, schedule) + if err != nil { + return nil, err + } + scheds = append(scheds, sched) + } + return scheds, nil +} + +func (s *scheduler) convertSchedule(ctx context.Context, schedule *schedule) (*Schedule, error) { schd := &Schedule{ ID: schedule.ID, + VendorType: schedule.VendorType, + VendorID: schedule.VendorID, CRON: schedule.CRON, CreationTime: schedule.CreationTime, UpdateTime: schedule.UpdateTime, } - exec, err := s.execMgr.Get(ctx, schedule.ExecutionID) + executions, err := s.execMgr.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": JobNameScheduler, + "VendorID": schedule.ID, + }, + }) if err != nil { return nil, err } - schd.Status = exec.Status + if len(executions) == 0 { + // if no execution found for the schedule, mark it's status as error + schd.Status = job.ErrorStatus.String() + } else { + schd.Status = executions[0].Status + } return schd, nil } @@ -195,13 +310,23 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error // We can remove the method and the hook endpoint after several releases func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChange) error { scheduler := Sched.(*scheduler) - schedule, err := scheduler.dao.Get(ctx, scheduleID) + executions, err := scheduler.execMgr.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": JobNameScheduler, + "VendorID": scheduleID, + }, + }) if err != nil { return err } + if len(executions) == 0 { + return errors.New(nil).WithCode(errors.NotFoundCode). + WithMessage("no execution found for the schedule %d", scheduleID) + } + tasks, err := scheduler.taskMgr.List(ctx, &q.Query{ Keywords: map[string]interface{}{ - "ExecutionID": schedule.ExecutionID, + "ExecutionID": executions[0].ID, }, }) if err != nil { @@ -209,7 +334,7 @@ func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChang } if len(tasks) == 0 { return errors.New(nil).WithCode(errors.NotFoundCode). - WithMessage("no task references the execution %d", schedule.ExecutionID) + WithMessage("no task found for the execution %d", executions[0].ID) } return task.NewHookHandler().Handle(ctx, tasks[0].ID, sc) } diff --git a/src/pkg/scheduler/scheduler_test.go b/src/pkg/scheduler/scheduler_test.go index f2035b18a..a5d5913ae 100644 --- a/src/pkg/scheduler/scheduler_test.go +++ b/src/pkg/scheduler/scheduler_test.go @@ -16,7 +16,6 @@ package scheduler import ( "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/pkg/task" "github.com/goharbor/harbor/src/testing/mock" tasktesting "github.com/goharbor/harbor/src/testing/pkg/task" @@ -49,24 +48,29 @@ func (s *schedulerTestSuite) SetupTest() { } func (s *schedulerTestSuite) TestSchedule() { + // empty vendor type + id, err := s.scheduler.Schedule(nil, "", 0, "0 * * * * *", "callback", nil) + s.NotNil(err) + // invalid cron - id, err := s.scheduler.Schedule(nil, "", "callback", nil) + id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "callback", nil) s.NotNil(err) // callback function not exist - id, err = s.scheduler.Schedule(nil, "0 * * * * *", "not-exist", nil) + id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "not-exist", nil) s.NotNil(err) // failed to submit to jobservice - s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{ ID: 1, ExecutionID: 1, Status: job.ErrorStatus.String(), }, nil) - _, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param") + s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) + _, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param") s.Require().NotNil(err) s.dao.AssertExpectations(s.T()) s.execMgr.AssertExpectations(s.T()) @@ -84,7 +88,7 @@ func (s *schedulerTestSuite) TestSchedule() { ExecutionID: 1, Status: job.SuccessStatus.String(), }, nil) - id, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param") + id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param") s.Require().Nil(err) s.Equal(int64(1), id) s.dao.AssertExpectations(s.T()) @@ -92,29 +96,19 @@ func (s *schedulerTestSuite) TestSchedule() { s.taskMgr.AssertExpectations(s.T()) } -func (s *schedulerTestSuite) TestUnSchedule() { - // not existing schedule - s.dao.On("Get", mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil)) - err := s.scheduler.UnSchedule(nil, 10000) - s.Nil(err) - s.dao.AssertExpectations(s.T()) - - // reset mocks - s.SetupTest() - +func (s *schedulerTestSuite) TestUnScheduleByID() { // the underlying task isn't stopped - s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ - ID: 1, - CRON: "0 * * * * *", - ExecutionID: 1, - CallbackFuncName: "callback", + s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{ + { + ID: 1, + }, }, nil) s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ ID: 1, Status: job.RunningStatus.String(), }, nil) - err = s.scheduler.UnSchedule(nil, 1) + err := s.scheduler.UnScheduleByID(nil, 1) s.NotNil(err) s.dao.AssertExpectations(s.T()) s.execMgr.AssertExpectations(s.T()) @@ -123,11 +117,10 @@ func (s *schedulerTestSuite) TestUnSchedule() { s.SetupTest() // pass - s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ - ID: 1, - CRON: "0 * * * * *", - ExecutionID: 1, - CallbackFuncName: "callback", + s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{ + { + ID: 1, + }, }, nil) s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ @@ -136,26 +129,96 @@ func (s *schedulerTestSuite) TestUnSchedule() { }, nil) s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil) s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil) - err = s.scheduler.UnSchedule(nil, 1) + err = s.scheduler.UnScheduleByID(nil, 1) + s.Nil(err) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) +} + +func (s *schedulerTestSuite) TestUnScheduleByVendor() { + s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{ + { + ID: 1, + }, + }, nil) + s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{ + { + ID: 1, + }, + }, nil) + s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) + s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ + ID: 1, + Status: job.StoppedStatus.String(), + }, nil) + s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil) + s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil) + err := s.scheduler.UnScheduleByVendor(nil, "vendor", 1) s.Nil(err) s.dao.AssertExpectations(s.T()) s.execMgr.AssertExpectations(s.T()) } func (s *schedulerTestSuite) TestGetSchedule() { + // no execution for the schedule s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ - ID: 1, - CRON: "0 * * * * *", - ExecutionID: 1, + ID: 1, + VendorType: "vendor", + VendorID: 1, + CRON: "0 * * * * *", }, nil) - s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ - ID: 1, - Status: job.SuccessStatus.String(), - }, nil) - schedule, err := s.scheduler.GetSchedule(nil, 1) + s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil) + schd, err := s.scheduler.GetSchedule(nil, 1) s.Require().Nil(err) - s.Equal("0 * * * * *", schedule.CRON) - s.Equal(job.SuccessStatus.String(), schedule.Status) + s.Equal("0 * * * * *", schd.CRON) + s.Equal(job.ErrorStatus.String(), schd.Status) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) + + // reset mocks + s.SetupTest() + + // pass + s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ + ID: 1, + VendorType: "vendor", + VendorID: 1, + CRON: "0 * * * * *", + }, nil) + s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{ + { + ID: 1, + Status: job.SuccessStatus.String(), + }, + }, nil) + schd, err = s.scheduler.GetSchedule(nil, 1) + s.Require().Nil(err) + s.Equal("0 * * * * *", schd.CRON) + s.Equal(job.SuccessStatus.String(), schd.Status) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) +} + +func (s *schedulerTestSuite) TestListSchedules() { + s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{ + { + ID: 1, + VendorType: "vendor", + VendorID: 1, + CRON: "0 * * * * *", + }, + }, nil) + s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{ + { + ID: 1, + Status: job.SuccessStatus.String(), + }, + }, nil) + schds, err := s.scheduler.ListSchedules(nil, nil) + s.Require().Nil(err) + s.Require().Len(schds, 1) + s.Equal("0 * * * * *", schds[0].CRON) + s.Equal(job.SuccessStatus.String(), schds[0].Status) s.dao.AssertExpectations(s.T()) s.execMgr.AssertExpectations(s.T()) } diff --git a/src/testing/pkg/scheduler/scheduler.go b/src/testing/pkg/scheduler/scheduler.go index 3b4ae1523..7c836aa2b 100644 --- a/src/testing/pkg/scheduler/scheduler.go +++ b/src/testing/pkg/scheduler/scheduler.go @@ -5,8 +5,10 @@ package scheduler import ( context "context" - scheduler "github.com/goharbor/harbor/src/pkg/scheduler" + q "github.com/goharbor/harbor/src/lib/q" mock "github.com/stretchr/testify/mock" + + scheduler "github.com/goharbor/harbor/src/pkg/scheduler" ) // Scheduler is an autogenerated mock type for the Scheduler type @@ -37,20 +39,22 @@ func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Sche return r0, r1 } -// Schedule provides a mock function with given fields: ctx, cron, callbackFuncName, params -func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { - ret := _m.Called(ctx, cron, callbackFuncName, params) +// ListSchedules provides a mock function with given fields: ctx, query +func (_m *Scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) { + ret := _m.Called(ctx, query) - var r0 int64 - if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}) int64); ok { - r0 = rf(ctx, cron, callbackFuncName, params) + var r0 []*scheduler.Schedule + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*scheduler.Schedule); ok { + r0 = rf(ctx, query) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*scheduler.Schedule) + } } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}) error); ok { - r1 = rf(ctx, cron, callbackFuncName, params) + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) } else { r1 = ret.Error(1) } @@ -58,8 +62,29 @@ func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName return r0, r1 } -// UnSchedule provides a mock function with given fields: ctx, id -func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error { +// Schedule provides a mock function with given fields: ctx, vendorType, vendorID, cron, callbackFuncName, params +func (_m *Scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) { + ret := _m.Called(ctx, vendorType, vendorID, cron, callbackFuncName, params) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, string, int64, string, string, interface{}) int64); ok { + r0 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, int64, string, string, interface{}) error); ok { + r1 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UnScheduleByID provides a mock function with given fields: ctx, id +func (_m *Scheduler) UnScheduleByID(ctx context.Context, id int64) error { ret := _m.Called(ctx, id) var r0 error @@ -71,3 +96,17 @@ func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error { return r0 } + +// UnScheduleByVendor provides a mock function with given fields: ctx, vendorType, vendorID +func (_m *Scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error { + ret := _m.Called(ctx, vendorType, vendorID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok { + r0 = rf(ctx, vendorType, vendorID) + } else { + r0 = ret.Error(0) + } + + return r0 +}