From c04f3a2aac845bab6e73faf61608b8c118875a5f Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 11 May 2021 21:19:36 +0800 Subject: [PATCH] Fix duplicate execution record issue When the core service cannot respond to the checkin request in time, duplicated execution records may be created; this commit introduces the revision column to make sure there is only one record for one schedule trigger Signed-off-by: Wenkai Yin --- .../postgresql/0052_2.2.2_schema.up.sql | 2 ++ src/controller/scan/callback.go | 4 ++-- src/controller/scan/callback_test.go | 4 ++-- src/pkg/retention/callback.go | 6 +++--- src/pkg/scheduler/callback.go | 13 +++++++++++- src/pkg/scheduler/dao.go | 12 +++++++++++ src/pkg/scheduler/mock_dao_test.go | 21 +++++++++++++++++++ src/pkg/task/hook.go | 3 +-- src/pkg/task/hook_test.go | 2 +- src/pkg/task/registry.go | 4 +++- 10 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 make/migrations/postgresql/0052_2.2.2_schema.up.sql diff --git a/make/migrations/postgresql/0052_2.2.2_schema.up.sql b/make/migrations/postgresql/0052_2.2.2_schema.up.sql new file mode 100644 index 000000000..192bd5c3d --- /dev/null +++ b/make/migrations/postgresql/0052_2.2.2_schema.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS revision integer; +UPDATE schedule set revision = 0; \ No newline at end of file diff --git a/src/controller/scan/callback.go b/src/controller/scan/callback.go index 04546d383..de292f547 100644 --- a/src/controller/scan/callback.go +++ b/src/controller/scan/callback.go @@ -116,9 +116,9 @@ func scanTaskStatusChange(ctx context.Context, taskID int64, status string) (err } // scanTaskCheckInProcessor checkin processor handles the webhook of scan job -func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) { +func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) { checkInReport := &scan.CheckInReport{} - if err := 
checkInReport.FromJSON(data); err != nil { + if err := checkInReport.FromJSON(sc.CheckIn); err != nil { log.G(ctx).WithField("error", err).Errorf("failed to convert data to report") return err } diff --git a/src/controller/scan/callback_test.go b/src/controller/scan/callback_test.go index ef594f5f8..4d2d55fc2 100644 --- a/src/controller/scan/callback_test.go +++ b/src/controller/scan/callback_test.go @@ -135,7 +135,7 @@ func (suite *CallbackTestSuite) TestScanTaskStatusChange() { func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() { { - suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, "report")) + suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: "report"})) } { @@ -156,7 +156,7 @@ func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() { } r, _ := json.Marshal(report) - suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, string(r))) + suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: string(r)})) } } diff --git a/src/pkg/retention/callback.go b/src/pkg/retention/callback.go index 43b2b8d4a..61d48fe9f 100644 --- a/src/pkg/retention/callback.go +++ b/src/pkg/retention/callback.go @@ -19,18 +19,18 @@ func init() { } -func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) { +func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) { taskID := t.ID status := t.Status log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status) // handle checkin - if data != "" { + if sc.CheckIn != "" { var retainObj struct { Total int `json:"total"` Retained int `json:"retained"` Deleted []*selector.Result `json:"deleted"` } - if err := json.Unmarshal([]byte(data), &retainObj); err != nil { + if err := json.Unmarshal([]byte(sc.CheckIn), &retainObj); err != nil { log.Errorf("failed to resolve checkin of retention task %d: %v", 
taskID, err) return err diff --git a/src/pkg/scheduler/callback.go b/src/pkg/scheduler/callback.go index 1bbe9d9de..7b51fddce 100644 --- a/src/pkg/scheduler/callback.go +++ b/src/pkg/scheduler/callback.go @@ -17,6 +17,7 @@ package scheduler import ( "context" "fmt" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" @@ -67,7 +68,7 @@ func callbackFuncExist(name string) bool { return exist } -func triggerCallback(ctx context.Context, task *task.Task, data string) (err error) { +func triggerCallback(ctx context.Context, task *task.Task, sc *job.StatusChange) (err error) { execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID) if err != nil { return err @@ -80,6 +81,16 @@ func triggerCallback(ctx context.Context, task *task.Task, data string) (err err if err != nil { return err } + // Try to update the schedule record with the checkin revision to avoid duplicated trigger + // refer to https://github.com/goharbor/harbor/issues/14683 for more details + n, err := Sched.(*scheduler).dao.UpdateRevision(ctx, schedule.ID, sc.Metadata.CheckInAt) + if err != nil { + return err + } + if n == 0 { + log.Warningf("got no schedule record with ID %d and revision < %d, ignore", schedule.ID, sc.Metadata.CheckInAt) + return nil + } callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName) if err != nil { return err diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index 658a44626..1ad9ed578 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -34,6 +34,7 @@ type schedule struct { VendorID int64 `orm:"column(vendor_id)"` CRONType string `orm:"column(cron_type)"` CRON string `orm:"column(cron)"` + Revision int64 `orm:"column(revision)"` // to identity the duplicated checkin hook from jobservice ExtraAttrs string `orm:"column(extra_attrs)"` CallbackFuncName string `orm:"column(callback_func_name)"` CallbackFuncParam string 
`orm:"column(callback_func_param)"` @@ -48,6 +49,7 @@ type DAO interface { Get(ctx context.Context, id int64) (s *schedule, err error) Delete(ctx context.Context, id int64) (err error) Update(ctx context.Context, s *schedule, props ...string) (err error) + UpdateRevision(ctx context.Context, id, revision int64) (n int64, err error) } type dao struct{} @@ -132,3 +134,13 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e } return nil } + +func (d *dao) UpdateRevision(ctx context.Context, id, revision int64) (int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + return ormer.QueryTable(&schedule{}).Filter("ID", id).Filter("Revision__lt", revision).Update(beegoorm.Params{ + "Revision": revision, + }) +} diff --git a/src/pkg/scheduler/mock_dao_test.go b/src/pkg/scheduler/mock_dao_test.go index 70e8504be..d599ade8f 100644 --- a/src/pkg/scheduler/mock_dao_test.go +++ b/src/pkg/scheduler/mock_dao_test.go @@ -115,3 +115,24 @@ func (_m *mockDAO) Update(ctx context.Context, s *schedule, props ...string) err return r0 } + +// UpdateRevision provides a mock function with given fields: ctx, id, revision +func (_m *mockDAO) UpdateRevision(ctx context.Context, id int64, revision int64) (int64, error) { + ret := _m.Called(ctx, id, revision) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, int64, int64) int64); ok { + r0 = rf(ctx, id, revision) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64, int64) error); ok { + r1 = rf(ctx, id, revision) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/src/pkg/task/hook.go b/src/pkg/task/hook.go index 1f4808d2f..c410998b5 100644 --- a/src/pkg/task/hook.go +++ b/src/pkg/task/hook.go @@ -17,7 +17,6 @@ package task import ( "context" "fmt" - "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" @@ 
-80,7 +79,7 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error { } t := &Task{} t.From(task) - return processor(ctx, t, sc.CheckIn) + return processor(ctx, t, sc) } // update task status diff --git a/src/pkg/task/hook_test.go b/src/pkg/task/hook_test.go index 8a975bae7..a7d115820 100644 --- a/src/pkg/task/hook_test.go +++ b/src/pkg/task/hook_test.go @@ -43,7 +43,7 @@ func (h *hookHandlerTestSuite) SetupTest() { func (h *hookHandlerTestSuite) TestHandle() { // handle check in data - checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, data string) (err error) { return nil } + checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) { return nil } defer delete(checkInProcessorRegistry, "test") h.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ { diff --git a/src/pkg/task/registry.go b/src/pkg/task/registry.go index 378915aaf..61b36d77a 100644 --- a/src/pkg/task/registry.go +++ b/src/pkg/task/registry.go @@ -17,6 +17,8 @@ package task import ( "context" "fmt" + + "github.com/goharbor/harbor/src/jobservice/job" ) var ( @@ -26,7 +28,7 @@ var ( ) // CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook -type CheckInProcessor func(ctx context.Context, task *Task, data string) (err error) +type CheckInProcessor func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) // StatusChangePostFunc is the function called after the task status changed type StatusChangePostFunc func(ctx context.Context, taskID int64, status string) (err error)