From 4dc4b6728cebf30e3032a7a37f48817bf8c37d59 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 9 Jul 2020 10:53:32 +0800 Subject: [PATCH] Refactor the scheduler with the task manager mechanism Refactor the scheduler with the task manager mechanism, this will reduce the duplicate code Signed-off-by: Wenkai Yin --- .../postgresql/0040_2.1.0_schema.up.sql | 47 +++ src/core/api/base.go | 10 +- src/core/main.go | 4 - .../notifications/scheduler/handler.go | 46 +-- src/lib/orm/error.go | 2 +- src/pkg/retention/controller.go | 11 +- src/pkg/retention/controller_test.go | 10 +- src/pkg/scheduler/callback.go | 89 ++++++ src/pkg/scheduler/callback_test.go | 73 +++++ src/pkg/scheduler/dao.go | 128 ++++++++ src/pkg/scheduler/dao/schedule.go | 99 ------ src/pkg/scheduler/dao/schedule_test.go | 122 -------- src/pkg/scheduler/dao_test.go | 128 ++++++++ src/pkg/scheduler/hook/handler.go | 59 ---- src/pkg/scheduler/hook/handler_test.go | 55 ---- src/pkg/scheduler/manager.go | 66 ---- src/pkg/scheduler/manager_test.go | 110 ------- src/pkg/scheduler/mock.go | 17 ++ src/pkg/scheduler/mock_dao_test.go | 117 +++++++ src/pkg/scheduler/model/schedule.go | 40 --- src/pkg/scheduler/periodic_job.go | 8 +- src/pkg/scheduler/scheduler.go | 287 +++++++++--------- src/pkg/scheduler/scheduler_test.go | 188 +++++++----- src/pkg/task/checkin.go | 4 +- src/pkg/task/checkin_test.go | 6 +- src/pkg/task/dao/execution.go | 2 +- src/pkg/task/task.go | 12 +- src/pkg/task/task_test.go | 20 +- src/server/handler/job_status_hook.go | 7 + src/testing/pkg/scheduler/mock.go | 17 ++ src/testing/pkg/scheduler/scheduler.go | 110 ++++--- 31 files changed, 984 insertions(+), 910 deletions(-) create mode 100644 src/pkg/scheduler/callback.go create mode 100644 src/pkg/scheduler/callback_test.go create mode 100644 src/pkg/scheduler/dao.go delete mode 100644 src/pkg/scheduler/dao/schedule.go delete mode 100644 src/pkg/scheduler/dao/schedule_test.go create mode 100644 src/pkg/scheduler/dao_test.go delete mode 100644 src/pkg/scheduler/hook/handler.go delete mode 100644 src/pkg/scheduler/hook/handler_test.go delete mode 100644 src/pkg/scheduler/manager.go delete mode 100644 src/pkg/scheduler/manager_test.go create mode 100644 src/pkg/scheduler/mock.go create mode 100644 src/pkg/scheduler/mock_dao_test.go delete mode 100644 src/pkg/scheduler/model/schedule.go create mode 100644 src/testing/pkg/scheduler/mock.go diff --git a/make/migrations/postgresql/0040_2.1.0_schema.up.sql b/make/migrations/postgresql/0040_2.1.0_schema.up.sql index 3f2ecc380..eb0cc75a6 100644 --- a/make/migrations/postgresql/0040_2.1.0_schema.up.sql +++ b/make/migrations/postgresql/0040_2.1.0_schema.up.sql @@ -63,3 +63,50 @@ CREATE TABLE IF NOT EXISTS p2p_preheat_policy ( creation_time timestamp, update_time timestamp ); + +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS cron varchar(64); +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS execution_id int; +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_name varchar(128); +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_param text; + +/*abstract the cron, callback function parameters from table retention_policy*/ +UPDATE schedule +SET cron = retention.cron, callback_func_name = 'RetentionCallback', + callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}') +FROM ( + SELECT id, data::json->'trigger'->'references'->>'job_id' AS schedule_id, + data::json->'trigger'->'settings'->>'cron' AS cron + FROM retention_policy + ) AS retention +WHERE schedule.id=retention.schedule_id::int; + +/*create new execution and task record for each schedule*/ +DO $$ +DECLARE + sched RECORD; + exec_id integer; + status_code integer; +BEGIN + FOR sched IN SELECT * FROM schedule + LOOP + INSERT INTO execution (vendor_type, trigger) VALUES ('SCHEDULER', 'MANUAL') RETURNING id INTO exec_id; + IF sched.status = 'Pending' THEN + status_code = 0; + ELSIF sched.status = 'Scheduled' THEN + status_code = 1; + ELSIF sched.status = 'Running' THEN + status_code = 2; + ELSIF sched.status = 'Stopped' OR sched.status = 'Error' OR sched.status = 'Success' THEN + status_code = 3; + ELSE + status_code = 0; + END IF; + INSERT INTO task (execution_id, job_id, status, status_code, status_revision, run_count) VALUES (exec_id, sched.job_id, sched.status, status_code, 0, 0); + UPDATE schedule SET execution_id=exec_id WHERE id = sched.id; + END LOOP; +END $$; + +ALTER TABLE schedule DROP COLUMN IF EXISTS job_id; +ALTER TABLE schedule DROP COLUMN IF EXISTS status; + +ALTER TABLE schedule ADD CONSTRAINT schedule_execution FOREIGN KEY (execution_id) REFERENCES execution(id); diff --git a/src/core/api/base.go b/src/core/api/base.go index 29874c7df..94173aa61 100644 --- a/src/core/api/base.go +++ b/src/core/api/base.go @@ -184,13 +184,11 @@ func Init() error { // init project manager initProjectManager() - initRetentionScheduler() - retentionMgr = retention.NewManager() retentionLauncher = retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr) - retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, retentionScheduler, retentionLauncher) + retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher) callbackFun := func(p interface{}) error { str, ok := p.(string) @@ -204,7 +202,7 @@ func Init() error { _, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false) return err } - err := scheduler.Register(retention.SchedulerCallback, callbackFun) + err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, callbackFun) return err } @@ -227,7 +225,3 @@ func initChartController() error { func initProjectManager() { projectMgr = project.Mgr } - -func initRetentionScheduler() { - retentionScheduler = scheduler.GlobalScheduler -} diff --git a/src/core/main.go b/src/core/main.go index 07ff85ed0..eed6300d0 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -45,7 +45,6 @@ import ( _ "github.com/goharbor/harbor/src/pkg/notifier/topic" "github.com/goharbor/harbor/src/pkg/scan" "github.com/goharbor/harbor/src/pkg/scan/dao/scanner" - "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/version" "github.com/goharbor/harbor/src/replication" "github.com/goharbor/harbor/src/server" @@ -125,9 +124,6 @@ func main() { log.Fatalf("failed to load config: %v", err) } - // init the scheduler - scheduler.Init() - password, err := config.InitialAdminPassword() if err != nil { log.Fatalf("failed to get admin's initial password: %v", err) diff --git a/src/core/service/notifications/scheduler/handler.go b/src/core/service/notifications/scheduler/handler.go index a153135f1..f31800889 100644 --- a/src/core/service/notifications/scheduler/handler.go +++ b/src/core/service/notifications/scheduler/handler.go @@ -16,13 +16,11 @@ package scheduler import ( "encoding/json" - "fmt" - "github.com/goharbor/harbor/src/core/service/notifications" - "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/core/service/notifications" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/scheduler" - "github.com/goharbor/harbor/src/pkg/scheduler/hook" ) // Handler handles the scheduler requests @@ -34,46 +32,20 @@ type Handler struct { func (h *Handler) Handle() { log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id")) - var data models.JobStatusChange + var data job.StatusChange if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil { log.Errorf("failed to decode hook event: %v", err) return } - // status update - if len(data.CheckIn) == 0 { - schedulerID, err := h.GetInt64FromPath(":id") - if err != nil { - log.Errorf("failed to get the schedule ID: %v", err) - return - } - if err := hook.GlobalController.UpdateStatus(schedulerID, data.Status); err != nil { - h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err)) - return - } - log.Debugf("handle status update hook event for schedule %s completed", h.GetStringFromPath(":id")) + + schedulerID, err := h.GetInt64FromPath(":id") + if err != nil { + log.Errorf("failed to get the schedule ID: %v", err) return } - // run callback function - // just log the error message when handling check in request if got any error - params := map[string]interface{}{} - if err := json.Unmarshal([]byte(data.CheckIn), ¶ms); err != nil { - log.Errorf("failed to unmarshal parameters from check in message: %v", err) + if err = scheduler.HandleLegacyHook(h.Ctx.Request.Context(), schedulerID, &data); err != nil { + log.Errorf("failed to handle the legacy hook: %v", err) return } - callbackFuncNameParam, exist := params[scheduler.JobParamCallbackFunc] - if !exist { - log.Error("cannot get the parameter \"callback_func_name\" from the check in message") - return - } - callbackFuncName, ok := callbackFuncNameParam.(string) - if !ok || len(callbackFuncName) == 0 { - log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName) - return - } - if err := hook.GlobalController.Run(callbackFuncName, params[scheduler.JobParamCallbackFuncParams]); err != nil { - log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err) - return - } - log.Debugf("callback function %s called for schedule %s", callbackFuncName, h.GetStringFromPath(":id")) } diff --git a/src/lib/orm/error.go b/src/lib/orm/error.go index 47448e02c..c4afc288e 100644 --- a/src/lib/orm/error.go +++ b/src/lib/orm/error.go @@ -42,7 +42,7 @@ func WrapConflictError(err error, format string, args ...interface{}) error { // as a src/internal/error.Error with not found error code, else return nil func AsNotFoundError(err error, messageFormat string, args ...interface{}) *errors.Error { if errors.Is(err, orm.ErrNoRows) { - e := errors.NotFoundError(err) + e := errors.NotFoundError(nil) if len(messageFormat) > 0 { e.WithMessage(messageFormat, args...) } diff --git a/src/pkg/retention/controller.go b/src/pkg/retention/controller.go index 57d1411c6..ba6859b17 100644 --- a/src/pkg/retention/controller.go +++ b/src/pkg/retention/controller.go @@ -16,6 +16,7 @@ package retention import ( "fmt" + "github.com/goharbor/harbor/src/lib/orm" "time" "github.com/goharbor/harbor/src/pkg/project" @@ -65,7 +66,7 @@ type DefaultAPIController struct { const ( // SchedulerCallback ... - SchedulerCallback = "SchedulerCallback" + SchedulerCallback = "RetentionCallback" ) // TriggerParam ... @@ -84,7 +85,7 @@ func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) (int64, error if p.Trigger.Kind == policy.TriggerKindSchedule { cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron] if ok && len(cron.(string)) > 0 { - jobid, err := r.scheduler.Schedule(cron.(string), SchedulerCallback, TriggerParam{ + jobid, err := r.scheduler.Schedule(orm.Context(), cron.(string), SchedulerCallback, TriggerParam{ PolicyID: p.ID, Trigger: ExecutionTriggerSchedule, }) @@ -142,13 +143,13 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error { } } if needUn { - err = r.scheduler.UnSchedule(p0.Trigger.References[policy.TriggerReferencesJobid].(int64)) + err = r.scheduler.UnSchedule(orm.Context(), p0.Trigger.References[policy.TriggerReferencesJobid].(int64)) if err != nil { return err } } if needSch { - jobid, err := r.scheduler.Schedule(p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{ + jobid, err := r.scheduler.Schedule(orm.Context(), p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{ PolicyID: p.ID, Trigger: ExecutionTriggerSchedule, }) @@ -168,7 +169,7 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error { return err } if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 { - err = r.scheduler.UnSchedule(p.Trigger.References[policy.TriggerReferencesJobid].(int64)) + err = r.scheduler.UnSchedule(orm.Context(), p.Trigger.References[policy.TriggerReferencesJobid].(int64)) if err != nil { return err } diff --git a/src/pkg/retention/controller_test.go b/src/pkg/retention/controller_test.go index 761958b68..358cf56a7 100644 --- a/src/pkg/retention/controller_test.go +++ b/src/pkg/retention/controller_test.go @@ -1,9 +1,11 @@ package retention import ( + "context" "github.com/goharbor/harbor/src/pkg/retention/dep" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/rule" + "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/testing/pkg/repository" "github.com/stretchr/testify/suite" "strings" @@ -201,14 +203,18 @@ func (s *ControllerTestSuite) TestExecution() { type fakeRetentionScheduler struct { } -func (f *fakeRetentionScheduler) Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) { +func (f *fakeRetentionScheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { return 111, nil } -func (f *fakeRetentionScheduler) UnSchedule(id int64) error { +func (f *fakeRetentionScheduler) UnSchedule(ctx context.Context, id int64) error { return nil } +func (f *fakeRetentionScheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) { + return nil, nil +} + type fakeLauncher struct { } diff --git a/src/pkg/scheduler/callback.go b/src/pkg/scheduler/callback.go new file mode 100644 index 000000000..1eedaa233 --- /dev/null +++ b/src/pkg/scheduler/callback.go @@ -0,0 +1,89 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "fmt" + + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task" +) + +var ( + registry = make(map[string]CallbackFunc) +) + +// CallbackFunc defines the function that the scheduler calls when triggered +type CallbackFunc func(interface{}) error + +func init() { + if err := task.RegisterCheckInProcessor(JobNameScheduler, triggerCallback); err != nil { + log.Errorf("failed to register check in processor for scheduler: %v", err) + } +} + +// RegisterCallbackFunc registers the callback function which will be called when the scheduler is triggered +func RegisterCallbackFunc(name string, callbackFunc CallbackFunc) error { + if len(name) == 0 { + return errors.New("empty name") + } + if callbackFunc == nil { + return errors.New("callback function is nil") + } + + _, exist := registry[name] + if exist { + return fmt.Errorf("callback function %s already exists", name) + } + registry[name] = callbackFunc + + return nil +} + +func getCallbackFunc(name string) (CallbackFunc, error) { + f, exist := registry[name] + if !exist { + return nil, fmt.Errorf("callback function %s not found", name) + } + return f, nil +} + +func callbackFuncExist(name string) bool { + _, exist := registry[name] + return exist +} + +func triggerCallback(ctx context.Context, task *task.Task, change *job.StatusChange) (err error) { + schedules, err := Sched.(*scheduler).dao.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": task.ExecutionID, + }, + }) + if err != nil { + return err + } + if len(schedules) == 0 { + return fmt.Errorf("the schedule whose execution ID is %d not found", task.ExecutionID) + } + callbackFunc, err := getCallbackFunc(schedules[0].CallbackFuncName) + if err != nil { + return err + } + return callbackFunc(schedules[0].CallbackFuncParam) +} diff --git a/src/pkg/scheduler/callback_test.go b/src/pkg/scheduler/callback_test.go new file mode 100644 index 000000000..62525fb00 --- /dev/null +++ b/src/pkg/scheduler/callback_test.go @@ -0,0 +1,73 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type callbackTestSuite struct { + suite.Suite +} + +func (c *callbackTestSuite) SetupTest() { + registry = map[string]CallbackFunc{} + err := RegisterCallbackFunc("callback", func(interface{}) error { return nil }) + c.Require().Nil(err) +} + +func (c *callbackTestSuite) TestRegisterCallbackFunc() { + // empty name + err := RegisterCallbackFunc("", nil) + c.NotNil(err) + + // nil callback function + err = RegisterCallbackFunc("test", nil) + c.NotNil(err) + + // pass + err = RegisterCallbackFunc("test", func(interface{}) error { return nil }) + c.Nil(err) + + // duplicate name + err = RegisterCallbackFunc("test", func(interface{}) error { return nil }) + c.NotNil(err) +} + +func (c *callbackTestSuite) TestGetCallbackFunc() { + // not exist + _, err := getCallbackFunc("not-exist") + c.NotNil(err) + + // pass + f, err := getCallbackFunc("callback") + c.Require().Nil(err) + c.NotNil(f) +} + +func (c *callbackTestSuite) TestCallbackFuncExist() { + // not exist + c.False(callbackFuncExist("not-exist")) + + // exist + c.True(callbackFuncExist("callback")) +} + +func TestCallbackTestSuite(t *testing.T) { + s := &callbackTestSuite{} + suite.Run(t, s) +} diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go new file mode 100644 index 000000000..f854a6388 --- /dev/null +++ b/src/pkg/scheduler/dao.go @@ -0,0 +1,128 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "time" + + beegoorm "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" +) + +func init() { + beegoorm.RegisterModel(&schedule{}) +} + +type schedule struct { + ID int64 `orm:"pk;auto;column(id)"` + CRON string `orm:"column(cron)"` + ExecutionID int64 `orm:"column(execution_id)"` + CallbackFuncName string `orm:"column(callback_func_name)"` + CallbackFuncParam string `orm:"column(callback_func_param)"` + CreationTime time.Time `orm:"column(creation_time)"` + UpdateTime time.Time `orm:"column(update_time)"` +} + +// DAO is the data access object interface for schedule +type DAO interface { + Create(ctx context.Context, schedule *schedule) (id int64, err error) + List(ctx context.Context, query *q.Query) (schedules []*schedule, err error) + Get(ctx context.Context, id int64) (schedule *schedule, err error) + Delete(ctx context.Context, id int64) (err error) + Update(ctx context.Context, schedule *schedule, props ...string) (err error) +} + +type dao struct{} + +func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + id, err := ormer.Insert(schedule) + if err != nil { + if e := orm.AsForeignKeyError(err, + "the schedule tries to reference a non existing execution %d", schedule.ExecutionID); e != nil { + err = e + } + return 0, err + } + return id, nil +} + +func (d *dao) List(ctx context.Context, query *q.Query) ([]*schedule, error) { + qs, err := orm.QuerySetter(ctx, &schedule{}, query) + if err != nil { + return nil, err + } + + schedules := []*schedule{} + if _, err = qs.All(&schedules); err != nil { + return nil, err + } + return schedules, nil +} + +func (d *dao) Get(ctx context.Context, id int64) (*schedule, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return nil, err + } + schedule := &schedule{ + ID: id, + } + if err = ormer.Read(schedule); err != nil { + if e := orm.AsNotFoundError(err, "schedule %d not found", id); e != nil { + err = e + } + return nil, err + } + return schedule, nil +} + +func (d *dao) Delete(ctx context.Context, id int64) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Delete(&schedule{ + ID: id, + }) + if err != nil { + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("schedule %d not found", id) + } + + return nil +} +func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + n, err := ormer.Update(schedule, props...) + if err != nil { + return err + } + if n == 0 { + return errors.NotFoundError(nil).WithMessage("schedule %d not found", schedule.ID) + } + return nil +} diff --git a/src/pkg/scheduler/dao/schedule.go b/src/pkg/scheduler/dao/schedule.go deleted file mode 100644 index 1728556e4..000000000 --- a/src/pkg/scheduler/dao/schedule.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dao - -import ( - "errors" - "fmt" - "time" - - "github.com/astaxie/beego/orm" - "github.com/goharbor/harbor/src/common/dao" - "github.com/goharbor/harbor/src/pkg/scheduler/model" -) - -// ScheduleDao defines the method that a schedule data access model should implement -type ScheduleDao interface { - Create(*model.Schedule) (int64, error) - Update(*model.Schedule, ...string) error - Delete(int64) error - Get(int64) (*model.Schedule, error) - List(...*model.ScheduleQuery) ([]*model.Schedule, error) -} - -// New returns an instance of the default schedule data access model implementation -func New() ScheduleDao { - return &scheduleDao{} -} - -type scheduleDao struct{} - -func (s *scheduleDao) Create(schedule *model.Schedule) (int64, error) { - if schedule == nil { - return 0, errors.New("nil schedule") - } - now := time.Now() - schedule.CreationTime = &now - schedule.UpdateTime = &now - return dao.GetOrmer().Insert(schedule) -} - -func (s *scheduleDao) Update(schedule *model.Schedule, cols ...string) error { - if schedule == nil { - return errors.New("nil schedule") - } - if schedule.ID <= 0 { - return fmt.Errorf("invalid ID: %d", schedule.ID) - } - now := time.Now() - schedule.UpdateTime = &now - _, err := dao.GetOrmer().Update(schedule, cols...) - return err -} - -func (s *scheduleDao) Delete(id int64) error { - _, err := dao.GetOrmer().Delete(&model.Schedule{ - ID: id, - }) - return err -} - -func (s *scheduleDao) Get(id int64) (*model.Schedule, error) { - schedule := &model.Schedule{ - ID: id, - } - if err := dao.GetOrmer().Read(schedule); err != nil { - if err == orm.ErrNoRows { - return nil, nil - } - return nil, err - } - return schedule, nil -} - -func (s *scheduleDao) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) { - qs := dao.GetOrmer().QueryTable(&model.Schedule{}) - if len(query) > 0 && query[0] != nil { - if len(query[0].JobID) > 0 { - qs = qs.Filter("JobID", query[0].JobID) - } - } - schedules := []*model.Schedule{} - _, err := qs.All(&schedules) - if err != nil { - return nil, err - } - return schedules, nil -} diff --git a/src/pkg/scheduler/dao/schedule_test.go b/src/pkg/scheduler/dao/schedule_test.go deleted file mode 100644 index 60acf4f23..000000000 --- a/src/pkg/scheduler/dao/schedule_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package dao - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/goharbor/harbor/src/common/dao" - "github.com/goharbor/harbor/src/pkg/scheduler/model" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -var schDao = &scheduleDao{} - -type scheduleTestSuite struct { - suite.Suite - scheduleID int64 -} - -func (s *scheduleTestSuite) SetupSuite() { - dao.PrepareTestForPostgresSQL() -} - -func (s *scheduleTestSuite) SetupTest() { - t := s.T() - id, err := schDao.Create(&model.Schedule{ - JobID: "1", - Status: "pending", - }) - require.Nil(t, err) - s.scheduleID = id -} -func (s *scheduleTestSuite) TearDownTest() { - // clear - dao.GetOrmer().Raw("delete from schedule").Exec() -} - -func (s *scheduleTestSuite) TestCreate() { - t := s.T() - // nil schedule - _, err := schDao.Create(nil) - require.NotNil(t, err) - - // pass - _, err = schDao.Create(&model.Schedule{ - JobID: "1", - }) - require.Nil(t, err) -} - -func (s *scheduleTestSuite) TestUpdate() { - t := s.T() - // nil schedule - err := schDao.Update(nil) - require.NotNil(t, err) - - // invalid ID - err = schDao.Update(&model.Schedule{}) - require.NotNil(t, err) - - // pass - err = schDao.Update(&model.Schedule{ - ID: s.scheduleID, - Status: "running", - }) - require.Nil(t, err) - schedule, err := schDao.Get(s.scheduleID) - require.Nil(t, err) - assert.Equal(t, "running", schedule.Status) -} - -func (s *scheduleTestSuite) TestDelete() { - t := s.T() - err := schDao.Delete(s.scheduleID) - require.Nil(t, err) - schedule, err := schDao.Get(s.scheduleID) - require.Nil(t, err) - assert.Nil(t, schedule) -} - -func (s *scheduleTestSuite) TestGet() { - t := s.T() - schedule, err := schDao.Get(s.scheduleID) - require.Nil(t, err) - assert.Equal(t, "pending", schedule.Status) -} - -func (s *scheduleTestSuite) TestList() { - t := s.T() - // nil query - schedules, err := schDao.List() - require.Nil(t, err) - require.Equal(t, 1, len(schedules)) - assert.Equal(t, s.scheduleID, schedules[0].ID) - - // query by job ID - schedules, err = schDao.List(&model.ScheduleQuery{ - JobID: "1", - }) - require.Nil(t, err) - require.Equal(t, 1, len(schedules)) - assert.Equal(t, s.scheduleID, schedules[0].ID) -} - -func TestScheduleDao(t *testing.T) { - suite.Run(t, &scheduleTestSuite{}) -} diff --git a/src/pkg/scheduler/dao_test.go b/src/pkg/scheduler/dao_test.go new file mode 100644 index 000000000..c7d411db6 --- /dev/null +++ b/src/pkg/scheduler/dao_test.go @@ -0,0 +1,128 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "testing" + + common_dao "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/stretchr/testify/suite" +) + +type daoTestSuite struct { + suite.Suite + dao DAO + execMgr task.ExecutionManager + ctx context.Context + id int64 + execID int64 +} + +func (d *daoTestSuite) SetupSuite() { + d.dao = &dao{} + d.execMgr = task.NewExecutionManager() + common_dao.PrepareTestForPostgresSQL() + d.ctx = orm.Context() +} + +func (d *daoTestSuite) SetupTest() { + execID, err := d.execMgr.Create(d.ctx, "vendor", 0, "trigger") + d.Require().Nil(err) + d.execID = execID + schedule := &schedule{ + CRON: "0 * * * * *", + ExecutionID: execID, + CallbackFuncName: "callback_func_01", + CallbackFuncParam: "callback_func_params", + } + id, err := d.dao.Create(d.ctx, schedule) + d.Require().Nil(err) + d.id = id +} + +func (d *daoTestSuite) TearDownTest() { + d.Require().Nil(d.dao.Delete(d.ctx, d.id)) + d.Require().Nil(d.execMgr.Delete(d.ctx, d.execID)) +} + +func (d *daoTestSuite) TestCreate() { + // the happy pass is covered in SetupTest + + // foreign key error + _, err := d.dao.Create(d.ctx, &schedule{ + CRON: "0 * * * * *", + ExecutionID: 10000, + CallbackFuncName: "callback_func", + }) + d.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode)) +} + +func (d *daoTestSuite) TestList() { + schedules, err := d.dao.List(d.ctx, &q.Query{ + Keywords: map[string]interface{}{ + "CallbackFuncName": "callback_func_01", + }, + }) + d.Require().Nil(err) + d.Require().Len(schedules, 1) + d.Equal(d.id, schedules[0].ID) +} + +func (d *daoTestSuite) TestGet() { + // not found + schedule, err := d.dao.Get(d.ctx, 10000) + d.True(errors.IsNotFoundErr(err)) + + // pass + schedule, err = d.dao.Get(d.ctx, d.id) + d.Require().Nil(err) + d.Equal(d.id, schedule.ID) +} + +func (d *daoTestSuite) TestDelete() { + // the happy pass is covered in TearDownTest + + // not found + err := d.dao.Delete(d.ctx, 10000) + d.True(errors.IsNotFoundErr(err)) +} + +func (d *daoTestSuite) TestUpdate() { + // not found + err := d.dao.Update(d.ctx, &schedule{ + ID: 10000, + }) + d.True(errors.IsNotFoundErr(err)) + + // pass + err = d.dao.Update(d.ctx, &schedule{ + ID: d.id, + CRON: "* */2 * * * *", + }, "CRON") + d.Require().Nil(err) + + schedule, err := d.dao.Get(d.ctx, d.id) + d.Require().Nil(err) + d.Equal("* */2 * * * *", schedule.CRON) +} + +func TestDaoTestSuite(t *testing.T) { + suite.Run(t, &daoTestSuite{}) +} diff --git a/src/pkg/scheduler/hook/handler.go b/src/pkg/scheduler/hook/handler.go deleted file mode 100644 index f176850fa..000000000 --- a/src/pkg/scheduler/hook/handler.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package hook - -import ( - "time" - - "github.com/goharbor/harbor/src/pkg/scheduler" - "github.com/goharbor/harbor/src/pkg/scheduler/model" -) - -// GlobalController is an instance of the default controller that can be used globally -var GlobalController = NewController() - -// Controller updates the scheduler job status or runs the callback function -type Controller interface { - UpdateStatus(scheduleID int64, status string) error - Run(callbackFuncName string, params interface{}) error -} - -// NewController returns an instance of the default controller -func NewController() Controller { - return &controller{ - manager: scheduler.GlobalManager, - } -} - -type controller struct { - manager scheduler.Manager -} - -func (c *controller) UpdateStatus(scheduleID int64, status string) error { - now := time.Now() - return c.manager.Update(&model.Schedule{ - ID: scheduleID, - Status: status, - UpdateTime: &now, - }, "Status", "UpdateTime") -} - -func (c *controller) Run(callbackFuncName string, params interface{}) error { - f, err := scheduler.GetCallbackFunc(callbackFuncName) - if err != nil { - return err - } - return f(params) -} diff --git a/src/pkg/scheduler/hook/handler_test.go b/src/pkg/scheduler/hook/handler_test.go deleted file mode 100644 index f245aabe6..000000000 --- a/src/pkg/scheduler/hook/handler_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package hook - -import ( - "github.com/goharbor/harbor/src/pkg/scheduler" - "github.com/goharbor/harbor/src/pkg/scheduler/model" - schedulertesting "github.com/goharbor/harbor/src/testing/pkg/scheduler" - "github.com/stretchr/testify/require" - "testing" -) - -var h = &controller{ - manager: &schedulertesting.FakeManager{}, -} - -func TestUpdateStatus(t *testing.T) { - // task not exist - err := h.UpdateStatus(1, "running") - require.NotNil(t, err) - - // pass - h.manager.(*schedulertesting.FakeManager).Schedules = []*model.Schedule{ - { - ID: 1, - Status: "", - }, - } - err = h.UpdateStatus(1, "running") - require.Nil(t, err) -} - -func TestRun(t *testing.T) { - // callback function not exist - err := h.Run("not-exist", nil) - require.NotNil(t, err) - - // pass - err = scheduler.Register("callback", func(interface{}) error { return nil }) - require.Nil(t, err) - err = h.Run("callback", nil) - require.Nil(t, err) -} diff --git a/src/pkg/scheduler/manager.go b/src/pkg/scheduler/manager.go deleted file mode 100644 index 735e89873..000000000 --- a/src/pkg/scheduler/manager.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package scheduler - -import ( - "github.com/goharbor/harbor/src/pkg/scheduler/dao" - "github.com/goharbor/harbor/src/pkg/scheduler/model" -) - -var ( - // GlobalManager is an instance of the default manager that - // can be used globally - GlobalManager = NewManager() -) - -// Manager manages the schedule of the scheduler -type Manager interface { - Create(*model.Schedule) (int64, error) - Update(*model.Schedule, ...string) error - Delete(int64) error - Get(int64) (*model.Schedule, error) - List(...*model.ScheduleQuery) ([]*model.Schedule, error) -} - -// NewManager returns an instance of the default manager -func NewManager() Manager { - return &manager{ - scheduleDao: dao.New(), - } -} - -type manager struct { - scheduleDao dao.ScheduleDao -} - -func (m *manager) Create(schedule *model.Schedule) (int64, error) { - return m.scheduleDao.Create(schedule) -} - -func (m *manager) Update(schedule *model.Schedule, props ...string) error { - return m.scheduleDao.Update(schedule, props...) -} - -func (m *manager) Delete(id int64) error { - return m.scheduleDao.Delete(id) -} - -func (m *manager) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) { - return m.scheduleDao.List(query...) -} - -func (m *manager) Get(id int64) (*model.Schedule, error) { - return m.scheduleDao.Get(id) -} diff --git a/src/pkg/scheduler/manager_test.go b/src/pkg/scheduler/manager_test.go deleted file mode 100644 index b9f59b358..000000000 --- a/src/pkg/scheduler/manager_test.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package scheduler - -import ( - "testing" - - "github.com/goharbor/harbor/src/pkg/scheduler/model" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" -) - -var mgr *manager - -type fakeScheduleDao struct { - schedules []*model.Schedule - mock.Mock -} - -func (f *fakeScheduleDao) Create(*model.Schedule) (int64, error) { - f.Called() - return 1, nil -} -func (f *fakeScheduleDao) Update(*model.Schedule, ...string) error { - f.Called() - return nil -} -func (f *fakeScheduleDao) Delete(int64) error { - f.Called() - return nil -} -func (f *fakeScheduleDao) Get(int64) (*model.Schedule, error) { - f.Called() - return nil, nil -} -func (f *fakeScheduleDao) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) { - f.Called() - if len(query) == 0 || query[0] == nil { - return f.schedules, nil - } - result := []*model.Schedule{} - for _, sch := range f.schedules { - if sch.JobID == query[0].JobID { - result = append(result, sch) - } - } - return result, nil -} - -type managerTestSuite struct { - suite.Suite -} - -func (m *managerTestSuite) SetupTest() { - // recreate schedule manager - mgr = &manager{ - scheduleDao: &fakeScheduleDao{}, - } -} - -func (m *managerTestSuite) TestCreate() { - t := m.T() - mgr.scheduleDao.(*fakeScheduleDao).On("Create", mock.Anything) - mgr.Create(nil) - mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Create") -} - -func (m *managerTestSuite) TestUpdate() { - t := m.T() - mgr.scheduleDao.(*fakeScheduleDao).On("Update", mock.Anything) - mgr.Update(nil) - mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Update") -} - -func (m *managerTestSuite) TestDelete() { - t := m.T() - mgr.scheduleDao.(*fakeScheduleDao).On("Delete", mock.Anything) - mgr.Delete(1) - mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Delete") -} - -func (m *managerTestSuite) TestGet() { - t := m.T() - mgr.scheduleDao.(*fakeScheduleDao).On("Get", mock.Anything) - mgr.Get(1) - mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Get") -} - -func (m *managerTestSuite) TestList() { - t := m.T() - mgr.scheduleDao.(*fakeScheduleDao).On("List", mock.Anything) - mgr.List(nil) - mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "List") -} - -func TestManager(t *testing.T) { - suite.Run(t, &managerTestSuite{}) -} diff --git a/src/pkg/scheduler/mock.go b/src/pkg/scheduler/mock.go new file mode 100644 index 000000000..7472da311 --- /dev/null +++ b/src/pkg/scheduler/mock.go @@ -0,0 +1,17 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +//go:generate mockery -name DAO -output . -outpkg scheduler -filename mock_dao_test.go -structname mockDAO -inpkg diff --git a/src/pkg/scheduler/mock_dao_test.go b/src/pkg/scheduler/mock_dao_test.go new file mode 100644 index 000000000..56cd768bc --- /dev/null +++ b/src/pkg/scheduler/mock_dao_test.go @@ -0,0 +1,117 @@ +// Code generated by mockery v1.1.2. DO NOT EDIT. + +package scheduler + +import ( + context "context" + + q "github.com/goharbor/harbor/src/lib/q" + mock "github.com/stretchr/testify/mock" +) + +// mockDAO is an autogenerated mock type for the DAO type +type mockDAO struct { + mock.Mock +} + +// Create provides a mock function with given fields: ctx, schedule +func (_m *mockDAO) Create(ctx context.Context, schd *schedule) (int64, error) { + ret := _m.Called(ctx, schd) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *schedule) int64); ok { + r0 = rf(ctx, schd) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *schedule) error); ok { + r1 = rf(ctx, schd) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Delete provides a mock function with given fields: ctx, id +func (_m *mockDAO) Delete(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Get provides a mock function with given fields: ctx, id +func (_m *mockDAO) Get(ctx context.Context, id int64) (*schedule, error) { + ret := _m.Called(ctx, id) + + var r0 *schedule + if rf, ok := ret.Get(0).(func(context.Context, int64) *schedule); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*schedule) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx, query +func (_m *mockDAO) List(ctx context.Context, query *q.Query) ([]*schedule, error) { + ret := _m.Called(ctx, query) + + var r0 []*schedule + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*schedule); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*schedule) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: ctx, schedule, props +func (_m *mockDAO) Update(ctx context.Context, schd *schedule, props ...string) error { + _va := make([]interface{}, len(props)) + for _i := range props { + _va[_i] = props[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, schd) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *schedule, ...string) error); ok { + r0 = rf(ctx, schd, props...) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/pkg/scheduler/model/schedule.go b/src/pkg/scheduler/model/schedule.go deleted file mode 100644 index 3cdbe0a68..000000000 --- a/src/pkg/scheduler/model/schedule.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "time" - - "github.com/astaxie/beego/orm" -) - -func init() { - orm.RegisterModel( - new(Schedule)) -} - -// Schedule is a record for a scheduler job -type Schedule struct { - ID int64 `orm:"pk;auto;column(id)" json:"id"` - JobID string `orm:"column(job_id)" json:"job_id"` - Status string `orm:"column(status)" json:"status"` - CreationTime *time.Time `orm:"column(creation_time)" json:"creation_time"` - UpdateTime *time.Time `orm:"column(update_time)" json:"update_time"` -} - -// ScheduleQuery is query for schedule -type ScheduleQuery struct { - JobID string -} diff --git a/src/pkg/scheduler/periodic_job.go b/src/pkg/scheduler/periodic_job.go index eefafed3b..eb067f440 100644 --- a/src/pkg/scheduler/periodic_job.go +++ b/src/pkg/scheduler/periodic_job.go @@ -15,8 +15,6 @@ package scheduler import ( - "encoding/json" - "github.com/goharbor/harbor/src/jobservice/job" ) @@ -51,9 +49,5 @@ func (pj *PeriodicJob) Validate(params job.Parameters) error { // Run the job func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error { - data, err := json.Marshal(params) - if err != nil { - return err - } - return ctx.Checkin(string(data)) + return ctx.Checkin("checkin") } diff --git a/src/pkg/scheduler/scheduler.go b/src/pkg/scheduler/scheduler.go index e356b3b55..67d6791a1 100644 --- a/src/pkg/scheduler/scheduler.go +++ b/src/pkg/scheduler/scheduler.go @@ -15,194 +15,201 @@ package scheduler import ( + "context" "encoding/json" "fmt" - "net/http" - "sync" "time" - chttp "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/job" - "github.com/goharbor/harbor/src/common/job/models" - "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" - "github.com/goharbor/harbor/src/pkg/scheduler/model" -) - -// const definitions -const ( - JobParamCallbackFunc = "callback_func" - JobParamCallbackFuncParams = "params" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/task" + cronlib "github.com/robfig/cron" ) var ( - // GlobalScheduler is an instance of the default scheduler that - // can be used globally. Call Init() to initialize it first - GlobalScheduler Scheduler - registry = make(map[string]CallbackFunc) + // Sched is an instance of the default scheduler that can be used globally + Sched = New() ) -// CallbackFunc defines the function that the scheduler calls when triggered -type CallbackFunc func(interface{}) error +// Schedule describes the detail information about the created schedule +type Schedule struct { + ID int64 `json:"id"` + CRON string `json:"cron"` + Status string `json:"status"` // status of the underlying task(jobservice job) + CreationTime time.Time `json:"creation_time"` + UpdateTime time.Time `json:"update_time"` + // we can extend this model to include more information(e.g. how many times the schedule already + // runs; when will the schedule runs next time) +} // Scheduler provides the capability to run a periodic task, a callback function // needs to be registered before using the scheduler -// The "params" is passed to the callback function specified by "callbackFuncName" -// as encoded json string, so the callback function must decode it before using type Scheduler interface { - Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) - UnSchedule(id int64) error -} - -// Register the callback function with name, and the function will be called -// by the scheduler when the scheduler is triggered -func Register(name string, callbackFunc CallbackFunc) error { - if len(name) == 0 { - return errors.New("empty name") - } - if callbackFunc == nil { - return errors.New("callback function is nil") - } - - _, exist := registry[name] - if exist { - return fmt.Errorf("callback function %s already exists", name) - } - registry[name] = callbackFunc - - return nil -} - -// GetCallbackFunc returns the registered callback function specified by the name -func GetCallbackFunc(name string) (CallbackFunc, error) { - f, exist := registry[name] - if !exist { - return nil, fmt.Errorf("callback function %s not found", name) - } - return f, nil -} - -func callbackFuncExist(name string) bool { - _, exist := registry[name] - return exist -} - -// Init the GlobalScheduler -func Init() { - GlobalScheduler = New(config.InternalCoreURL()) + // Schedule creates a task which calls the specified callback function periodically + // The callback function needs to be registered first + // The "params" is passed to the callback function as encoded json string, so the callback + // function must decode it before using + Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) + // UnSchedule the created schedule instance + UnSchedule(ctx context.Context, id int64) error + // GetSchedule gets the schedule specified by ID + GetSchedule(ctx context.Context, id int64) (*Schedule, error) } // New returns an instance of the default scheduler -func New(internalCoreURL string) Scheduler { +func New() Scheduler { return &scheduler{ - internalCoreURL: internalCoreURL, - jobserviceClient: job.GlobalClient, - manager: GlobalManager, + dao: &dao{}, + execMgr: task.ExecMgr, + taskMgr: task.Mgr, } } type scheduler struct { - sync.RWMutex - internalCoreURL string - manager Manager - jobserviceClient job.Client + dao DAO + execMgr task.ExecutionManager + taskMgr task.Manager } -func (s *scheduler) Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) { +func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { + if _, err := cronlib.Parse(cron); err != nil { + return 0, errors.New(nil).WithCode(errors.BadRequestCode). + WithMessage("invalid cron %s: %v", cron, err) + } if !callbackFuncExist(callbackFuncName) { return 0, fmt.Errorf("callback function %s not found", callbackFuncName) } - // create schedule record + execID, err := s.execMgr.Create(ctx, JobNameScheduler, 0, task.ExecutionTriggerManual) + if err != nil { + return 0, err + } + now := time.Now() - scheduleID, err := s.manager.Create(&model.Schedule{ - CreationTime: &now, - UpdateTime: &now, + sched := &schedule{ + CRON: cron, + ExecutionID: execID, + CallbackFuncName: callbackFuncName, + CreationTime: now, + UpdateTime: now, + } + if params != nil { + paramsData, err := json.Marshal(params) + if err != nil { + return 0, err + } + sched.CallbackFuncParam = string(paramsData) + } + + // create schedule record + // when status/checkin hook comes, the database record must exist, + // so the database record must be created first before submitting job + id, err := s.dao.Create(ctx, sched) + if err != nil { + return 0, err + } + + taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{ + Name: JobNameScheduler, + Metadata: &job.Metadata{ + JobKind: job.KindPeriodic, + Cron: cron, + }, }) if err != nil { return 0, err } - // if got error in the following steps, delete the schedule record in database - defer func() { - if err != nil { - e := s.manager.Delete(scheduleID) - if e != nil { - log.Errorf("failed to delete the schedule %d: %v", scheduleID, e) - } - } - }() - log.Debugf("the schedule record %d created", scheduleID) - - // submit scheduler job to Jobservice - statusHookURL := fmt.Sprintf("%s/service/notifications/schedules/%d", s.internalCoreURL, scheduleID) - jd := &models.JobData{ - Name: JobNameScheduler, - Parameters: map[string]interface{}{ - JobParamCallbackFunc: callbackFuncName, - }, - Metadata: &models.JobMetadata{ - JobKind: job.JobKindPeriodic, - Cron: cron, - }, - StatusHook: statusHookURL, - } - if params != nil { - var paramsData []byte - paramsData, err = json.Marshal(params) - if err != nil { - return 0, err - } - jd.Parameters[JobParamCallbackFuncParams] = string(paramsData) - } - jobID, err := s.jobserviceClient.SubmitJob(jd) + // when task manager creating a task, it creates the task database record first and + // then submits the job to jobservice. If the submitting failed, it doesn't return + // any error. So we check the task status to make sure the job is submitted to jobservice + // successfully here + task, err := s.taskMgr.Get(ctx, taskID) if err != nil { return 0, err } - // if got error in the following steps, stop the scheduler job - defer func() { - if err != nil { - if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil { - log.Errorf("failed to stop the scheduler job %s: %v", jobID, e) - } - } - }() - log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID) - - // populate the job ID for the schedule - err = s.manager.Update(&model.Schedule{ - ID: scheduleID, - JobID: jobID, - }, "JobID") - if err != nil { - return 0, err + if task.Status == job.ErrorStatus.String() { + return 0, fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String()) } - return scheduleID, nil + return id, nil } -func (s *scheduler) UnSchedule(id int64) error { - schedule, err := s.manager.Get(id) +func (s *scheduler) UnSchedule(ctx context.Context, id int64) error { + schedule, err := s.dao.Get(ctx, id) if err != nil { + if errors.IsNotFoundErr(err) { + log.Warningf("trying to unschedule a non existing schedule %d, skip directly", id) + return nil + } return err } - if schedule == nil { - log.Warningf("the schedule record %d not found", id) - return nil + if err = s.execMgr.Stop(ctx, schedule.ExecutionID); err != nil { + return err } - if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil { - herr, ok := err.(*chttp.Error) - // if the job specified by jobID is not found in Jobservice, just delete - // the schedule record - if !ok || herr.Code != http.StatusNotFound { + + // after the stop called, the execution cannot be stopped immediately, + // use the for loop to make sure the execution be in final status before deleting it + for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 { + exec, err := s.execMgr.Get(ctx, schedule.ExecutionID) + if err != nil { return err } + if job.Status(exec.Status).Final() { + // delete schedule record + if err = s.dao.Delete(ctx, id); err != nil { + return err + } + // delete execution + return s.execMgr.Delete(ctx, schedule.ExecutionID) + } + time.Sleep(t) } - log.Debugf("the stop action for job %s submitted to the Jobservice", schedule.JobID) - if err = s.manager.Delete(schedule.ID); err != nil { + + return fmt.Errorf("failed to unschedule the schedule %d: the execution isn't in final status", id) +} + +func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) { + schedule, err := s.dao.Get(ctx, id) + if err != nil { + return nil, err + } + schd := &Schedule{ + ID: schedule.ID, + CRON: schedule.CRON, + CreationTime: schedule.CreationTime, + UpdateTime: schedule.UpdateTime, + } + exec, err := s.execMgr.Get(ctx, schedule.ExecutionID) + if err != nil { + return nil, err + } + schd.Status = exec.Status + return schd, nil +} + +// HandleLegacyHook handles the legacy web hook for scheduler +// We rewrite the implementation of scheduler with task manager mechanism in v2.1, +// this method is used to handle the job status hook for the legacy implementation +// We can remove the method and the hook endpoint after several releases +func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChange) error { + scheduler := Sched.(*scheduler) + schedule, err := scheduler.dao.Get(ctx, scheduleID) + if err != nil { return err } - log.Debugf("the schedule record %d deleted", schedule.ID) - - return nil + tasks, err := scheduler.taskMgr.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "ExecutionID": schedule.ExecutionID, + }, + }) + if err != nil { + return err + } + if len(tasks) == 0 { + return errors.New(nil).WithCode(errors.NotFoundCode). + WithMessage("no task references the execution %d", schedule.ExecutionID) + } + return task.NewHookHandler().Handle(ctx, tasks[0].ID, sc) } diff --git a/src/pkg/scheduler/scheduler_test.go b/src/pkg/scheduler/scheduler_test.go index 79e6dbbeb..f2035b18a 100644 --- a/src/pkg/scheduler/scheduler_test.go +++ b/src/pkg/scheduler/scheduler_test.go @@ -15,97 +15,149 @@ package scheduler import ( - "github.com/goharbor/harbor/src/testing/job" - schedulertesting "github.com/goharbor/harbor/src/testing/pkg/scheduler" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/goharbor/harbor/src/testing/mock" + tasktesting "github.com/goharbor/harbor/src/testing/pkg/task" "github.com/stretchr/testify/suite" "testing" ) -var sch *scheduler - type schedulerTestSuite struct { suite.Suite + scheduler *scheduler + dao *mockDAO + execMgr *tasktesting.FakeExecutionManager + taskMgr *tasktesting.FakeManager } func (s *schedulerTestSuite) SetupTest() { - t := s.T() - // empty callback function registry before running every test case - // and register a new callback function named "callback" - registry = make(map[string]CallbackFunc) - err := Register("callback", func(interface{}) error { return nil }) - require.Nil(t, err) + registry = map[string]CallbackFunc{} + err := RegisterCallbackFunc("callback", func(interface{}) error { return nil }) + s.Require().Nil(err) - // recreate the scheduler object - sch = &scheduler{ - jobserviceClient: &job.MockJobClient{}, - manager: &schedulertesting.FakeManager{}, + s.dao = &mockDAO{} + s.execMgr = &tasktesting.FakeExecutionManager{} + s.taskMgr = &tasktesting.FakeManager{} + + s.scheduler = &scheduler{ + dao: s.dao, + execMgr: s.execMgr, + taskMgr: s.taskMgr, } } -func (s *schedulerTestSuite) TestRegister() { - t := s.T() - var name string - var callbackFun CallbackFunc - - // empty name - err := Register(name, callbackFun) - require.NotNil(t, err) - - // nil callback function - name = "test" - err = Register(name, callbackFun) - require.NotNil(t, err) - - // pass - callbackFun = func(interface{}) error { return nil } - err = Register(name, callbackFun) - require.Nil(t, err) - - // duplicate name - err = Register(name, callbackFun) - require.NotNil(t, err) -} - -func (s *schedulerTestSuite) TestGetCallbackFunc() { - t := s.T() - // not exist - _, err := GetCallbackFunc("not-exist") - require.NotNil(t, err) - - // pass - f, err := GetCallbackFunc("callback") - require.Nil(t, err) - assert.NotNil(t, f) -} - func (s *schedulerTestSuite) TestSchedule() { - t := s.T() + // invalid cron + id, err := s.scheduler.Schedule(nil, "", "callback", nil) + s.NotNil(err) // callback function not exist - _, err := sch.Schedule("0 * * * * *", "not-exist", nil) - require.NotNil(t, err) + id, err = s.scheduler.Schedule(nil, "0 * * * * *", "not-exist", nil) + s.NotNil(err) + + // failed to submit to jobservice + s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{ + ID: 1, + ExecutionID: 1, + Status: job.ErrorStatus.String(), + }, nil) + _, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param") + s.Require().NotNil(err) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) + s.taskMgr.AssertExpectations(s.T()) + + // reset mocks + s.SetupTest() // pass - id, err := sch.Schedule("0 * * * * *", "callback", nil) - require.Nil(t, err) - assert.Equal(t, int64(1), id) + s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) + s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{ + ID: 1, + ExecutionID: 1, + Status: job.SuccessStatus.String(), + }, nil) + id, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param") + s.Require().Nil(err) + s.Equal(int64(1), id) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) + s.taskMgr.AssertExpectations(s.T()) } func (s *schedulerTestSuite) TestUnSchedule() { - t := s.T() - // schedule not exist - err := sch.UnSchedule(1) - require.NotNil(t, err) + // not existing schedule + s.dao.On("Get", mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil)) + err := s.scheduler.UnSchedule(nil, 10000) + s.Nil(err) + s.dao.AssertExpectations(s.T()) - // schedule exist - id, err := sch.Schedule("0 * * * * *", "callback", nil) - require.Nil(t, err) - assert.Equal(t, int64(1), id) + // reset mocks + s.SetupTest() - err = sch.UnSchedule(id) - require.Nil(t, err) + // the underlying task isn't stopped + s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ + ID: 1, + CRON: "0 * * * * *", + ExecutionID: 1, + CallbackFuncName: "callback", + }, nil) + s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) + s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ + ID: 1, + Status: job.RunningStatus.String(), + }, nil) + err = s.scheduler.UnSchedule(nil, 1) + s.NotNil(err) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) + + // reset mocks + s.SetupTest() + + // pass + s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ + ID: 1, + CRON: "0 * * * * *", + ExecutionID: 1, + CallbackFuncName: "callback", + }, nil) + s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil) + s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ + ID: 1, + Status: job.StoppedStatus.String(), + }, nil) + s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil) + s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil) + err = s.scheduler.UnSchedule(nil, 1) + s.Nil(err) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) +} + +func (s *schedulerTestSuite) TestGetSchedule() { + s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{ + ID: 1, + CRON: "0 * * * * *", + ExecutionID: 1, + }, nil) + s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{ + ID: 1, + Status: job.SuccessStatus.String(), + }, nil) + schedule, err := s.scheduler.GetSchedule(nil, 1) + s.Require().Nil(err) + s.Equal("0 * * * * *", schedule.CRON) + s.Equal(job.SuccessStatus.String(), schedule.Status) + s.dao.AssertExpectations(s.T()) + s.execMgr.AssertExpectations(s.T()) } func TestScheduler(t *testing.T) { diff --git a/src/pkg/task/checkin.go b/src/pkg/task/checkin.go index 7b6a90fb7..2bd503f38 100644 --- a/src/pkg/task/checkin.go +++ b/src/pkg/task/checkin.go @@ -28,8 +28,8 @@ var ( // CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook type CheckInProcessor func(ctx context.Context, task *Task, change *job.StatusChange) (err error) -// Register check in processor for the specific vendor type -func Register(vendorType string, processor CheckInProcessor) error { +// RegisterCheckInProcessor registers check in processor for the specific vendor type +func RegisterCheckInProcessor(vendorType string, processor CheckInProcessor) error { if _, exist := registry[vendorType]; exist { return fmt.Errorf("check in processor for %s already exists", vendorType) } diff --git a/src/pkg/task/checkin_test.go b/src/pkg/task/checkin_test.go index 279d08557..3c12f20fe 100644 --- a/src/pkg/task/checkin_test.go +++ b/src/pkg/task/checkin_test.go @@ -20,11 +20,11 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRegister(t *testing.T) { - err := Register("test", nil) +func TestRegisterCheckInProcessor(t *testing.T) { + err := RegisterCheckInProcessor("test", nil) assert.Nil(t, err) // already exist - err = Register("test", nil) + err = RegisterCheckInProcessor("test", nil) assert.NotNil(t, err) } diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index 1aa538291..20f54f6a4 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -122,7 +122,7 @@ func (e *executionDAO) Delete(ctx context.Context, id int64) error { }) if err != nil { if e := orm.AsForeignKeyError(err, - "the execution %d is referenced by other tasks", id); e != nil { + "the execution %d is referenced by other resources", id); e != nil { err = e } return err diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index a02571dae..c015d5c9e 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -79,7 +79,8 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA jobID, err := m.submitJob(ctx, id, jb) if err != nil { // failed to submit job to jobservice, update the status of task to error - log.Errorf("failed to submit job to jobservice: %v", err) + err = fmt.Errorf("failed to submit job to jobservice: %v", err) + log.Error(err) now := time.Now() err = m.dao.Update(ctx, &dao.Task{ ID: id, @@ -155,12 +156,9 @@ func (m *manager) Stop(ctx context.Context, id int64) error { return err } - // if the task is already in final status, return directly - if job.Status(task.Status).Final() { - log.Debugf("the task %d is in final status %s, skip", task.ID, task.Status) - return nil - } - + // when a task is in final status, if it's a periodic or retrying job it will + // run again in the near future, so we must operate the stop action to these final + // status jobs as well if err = m.jsClient.PostAction(task.JobID, string(job.StopCommand)); err != nil { // job not found, update it's status to stop directly if err == cjob.ErrJobNotFound { diff --git a/src/pkg/task/task_test.go b/src/pkg/task/task_test.go index 246b62155..2c4c00359 100644 --- a/src/pkg/task/task_test.go +++ b/src/pkg/task/task_test.go @@ -70,21 +70,7 @@ func (t *taskManagerTestSuite) TestCreate() { } func (t *taskManagerTestSuite) TestStop() { - // the task is in final status - t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ - ID: 1, - ExecutionID: 1, - Status: job.SuccessStatus.String(), - }, nil) - - err := t.mgr.Stop(nil, 1) - t.Require().Nil(err) - t.dao.AssertExpectations(t.T()) - - // reset mock - t.SetupTest() - - // the task isn't in final status, job not found + // job not found t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ ID: 1, ExecutionID: 1, @@ -93,7 +79,7 @@ func (t *taskManagerTestSuite) TestStop() { t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound) t.dao.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - err = t.mgr.Stop(nil, 1) + err := t.mgr.Stop(nil, 1) t.Require().Nil(err) t.dao.AssertExpectations(t.T()) t.jsClient.AssertExpectations(t.T()) @@ -101,7 +87,7 @@ func (t *taskManagerTestSuite) TestStop() { // reset mock t.SetupTest() - // the task isn't in final status + // pass t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ ID: 1, ExecutionID: 1, diff --git a/src/server/handler/job_status_hook.go b/src/server/handler/job_status_hook.go index 0161446a3..efd66b48c 100644 --- a/src/server/handler/job_status_hook.go +++ b/src/server/handler/job_status_hook.go @@ -20,7 +20,9 @@ import ( "strconv" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/errors" libhttp "github.com/goharbor/harbor/src/lib/http" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/task" "github.com/goharbor/harbor/src/server/router" ) @@ -52,6 +54,11 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err = j.handler.Handle(r.Context(), taskID, sc); err != nil { + // ignore the not found error to avoid the jobservice re-sending the hook + if errors.IsNotFoundErr(err) { + log.Warningf("got the status change hook for a non existing task %d", taskID) + return + } libhttp.SendError(w, err) return } diff --git a/src/testing/pkg/scheduler/mock.go b/src/testing/pkg/scheduler/mock.go new file mode 100644 index 000000000..b1f049bb0 --- /dev/null +++ b/src/testing/pkg/scheduler/mock.go @@ -0,0 +1,17 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +//go:generate mockery -dir ../../../pkg/scheduler -name Scheduler -output . -outpkg scheduler diff --git a/src/testing/pkg/scheduler/scheduler.go b/src/testing/pkg/scheduler/scheduler.go index 5675d8baf..be7396b0e 100644 --- a/src/testing/pkg/scheduler/scheduler.go +++ b/src/testing/pkg/scheduler/scheduler.go @@ -1,77 +1,73 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Code generated by mockery v1.1.2. DO NOT EDIT. package scheduler import ( - "fmt" + context "context" - "github.com/goharbor/harbor/src/pkg/scheduler/model" + scheduler "github.com/goharbor/harbor/src/pkg/scheduler" + mock "github.com/stretchr/testify/mock" ) -// FakeManager ... -type FakeManager struct { - idCounter int64 - Schedules []*model.Schedule +// Scheduler is an autogenerated mock type for the Scheduler type +type Scheduler struct { + mock.Mock } -// Create ... -func (f *FakeManager) Create(schedule *model.Schedule) (int64, error) { - f.idCounter++ - id := f.idCounter - schedule.ID = id - f.Schedules = append(f.Schedules, schedule) - return id, nil -} +// GetSchedule provides a mock function with given fields: ctx, id +func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) { + ret := _m.Called(ctx, id) -// Update ... -func (f *FakeManager) Update(schedule *model.Schedule, props ...string) error { - for i, sch := range f.Schedules { - if sch.ID == schedule.ID { - f.Schedules[i] = schedule - return nil + var r0 *scheduler.Schedule + if rf, ok := ret.Get(0).(func(context.Context, int64) *scheduler.Schedule); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*scheduler.Schedule) } } - return fmt.Errorf("the execution %d not found", schedule.ID) -} -// Delete ... -func (f *FakeManager) Delete(id int64) error { - length := len(f.Schedules) - for i, sch := range f.Schedules { - if sch.ID == id { - f.Schedules = f.Schedules[:i] - if i != length-1 { - f.Schedules = append(f.Schedules, f.Schedules[i+1:]...) - } - return nil - } + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) } - return fmt.Errorf("the execution %d not found", id) + + return r0, r1 } -// Get ... -func (f *FakeManager) Get(id int64) (*model.Schedule, error) { - for _, sch := range f.Schedules { - if sch.ID == id { - return sch, nil - } +// Schedule provides a mock function with given fields: ctx, cron, callbackFuncName, params +func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) { + ret := _m.Called(ctx, cron, callbackFuncName, params) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}) int64); ok { + r0 = rf(ctx, cron, callbackFuncName, params) + } else { + r0 = ret.Get(0).(int64) } - return nil, fmt.Errorf("the execution %d not found", id) + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}) error); ok { + r1 = rf(ctx, cron, callbackFuncName, params) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// List ... -func (f *FakeManager) List(...*model.ScheduleQuery) ([]*model.Schedule, error) { - return f.Schedules, nil +// UnSchedule provides a mock function with given fields: ctx, id +func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 }