diff --git a/make/migrations/postgresql/0052_2.2.2_schema.up.sql b/make/migrations/postgresql/0052_2.2.2_schema.up.sql new file mode 100644 index 000000000..192bd5c3d --- /dev/null +++ b/make/migrations/postgresql/0052_2.2.2_schema.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE schedule ADD COLUMN IF NOT EXISTS revision integer; +UPDATE schedule set revision = 0; \ No newline at end of file diff --git a/src/controller/scan/callback.go b/src/controller/scan/callback.go index 04546d383..de292f547 100644 --- a/src/controller/scan/callback.go +++ b/src/controller/scan/callback.go @@ -116,9 +116,9 @@ func scanTaskStatusChange(ctx context.Context, taskID int64, status string) (err } // scanTaskCheckInProcessor checkin processor handles the webhook of scan job -func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) { +func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) { checkInReport := &scan.CheckInReport{} - if err := checkInReport.FromJSON(data); err != nil { + if err := checkInReport.FromJSON(sc.CheckIn); err != nil { log.G(ctx).WithField("error", err).Errorf("failed to convert data to report") return err } diff --git a/src/controller/scan/callback_test.go b/src/controller/scan/callback_test.go index ef594f5f8..4d2d55fc2 100644 --- a/src/controller/scan/callback_test.go +++ b/src/controller/scan/callback_test.go @@ -135,7 +135,7 @@ func (suite *CallbackTestSuite) TestScanTaskStatusChange() { func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() { { - suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, "report")) + suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: "report"})) } { @@ -156,7 +156,7 @@ func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() { } r, _ := json.Marshal(report) - suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, string(r))) + suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: string(r)})) } } diff --git a/src/pkg/retention/callback.go b/src/pkg/retention/callback.go index 43b2b8d4a..61d48fe9f 100644 --- a/src/pkg/retention/callback.go +++ b/src/pkg/retention/callback.go @@ -19,18 +19,18 @@ func init() { } -func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) { +func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) { taskID := t.ID status := t.Status log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status) // handle checkin - if data != "" { + if sc.CheckIn != "" { var retainObj struct { Total int `json:"total"` Retained int `json:"retained"` Deleted []*selector.Result `json:"deleted"` } - if err := json.Unmarshal([]byte(data), &retainObj); err != nil { + if err := json.Unmarshal([]byte(sc.CheckIn), &retainObj); err != nil { log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err) return err diff --git a/src/pkg/scheduler/callback.go b/src/pkg/scheduler/callback.go index 1bbe9d9de..7b51fddce 100644 --- a/src/pkg/scheduler/callback.go +++ b/src/pkg/scheduler/callback.go @@ -17,6 +17,7 @@ package scheduler import ( "context" "fmt" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" @@ -67,7 +68,7 @@ func callbackFuncExist(name string) bool { return exist } -func triggerCallback(ctx context.Context, task *task.Task, data string) (err error) { +func triggerCallback(ctx context.Context, task *task.Task, sc *job.StatusChange) (err error) { execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID) if err != nil { return err @@ -80,6 +81,16 @@ func triggerCallback(ctx context.Context, task *task.Task, data string) (err err if err != nil { return err } + // Try to update the schedule record with the checkin revision to avoid duplicated trigger + // refer to https://github.com/goharbor/harbor/issues/14683 for more details + n, err := Sched.(*scheduler).dao.UpdateRevision(ctx, schedule.ID, sc.Metadata.CheckInAt) + if err != nil { + return err + } + if n == 0 { + log.Warningf("got no schedule record with ID %d and revision < %d, ignore", schedule.ID, sc.Metadata.CheckInAt) + return nil + } callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName) if err != nil { return err diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index 658a44626..1ad9ed578 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -34,6 +34,7 @@ type schedule struct { VendorID int64 `orm:"column(vendor_id)"` CRONType string `orm:"column(cron_type)"` CRON string `orm:"column(cron)"` + Revision int64 `orm:"column(revision)"` // to identity the duplicated checkin hook from jobservice ExtraAttrs string `orm:"column(extra_attrs)"` CallbackFuncName string `orm:"column(callback_func_name)"` CallbackFuncParam string `orm:"column(callback_func_param)"` @@ -48,6 +49,7 @@ type DAO interface { Get(ctx context.Context, id int64) (s *schedule, err error) Delete(ctx context.Context, id int64) (err error) Update(ctx context.Context, s *schedule, props ...string) (err error) + UpdateRevision(ctx context.Context, id, revision int64) (n int64, err error) } type dao struct{} @@ -132,3 +134,13 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e } return nil } + +func (d *dao) UpdateRevision(ctx context.Context, id, revision int64) (int64, error) { + ormer, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + return ormer.QueryTable(&schedule{}).Filter("ID", id).Filter("Revision__lt", revision).Update(beegoorm.Params{ + "Revision": revision, + }) +} diff --git a/src/pkg/scheduler/mock_dao_test.go b/src/pkg/scheduler/mock_dao_test.go index 70e8504be..d599ade8f 100644 --- a/src/pkg/scheduler/mock_dao_test.go +++ b/src/pkg/scheduler/mock_dao_test.go @@ -115,3 +115,24 @@ func (_m *mockDAO) Update(ctx context.Context, s *schedule, props ...string) err return r0 } + +// UpdateRevision provides a mock function with given fields: ctx, id, revision +func (_m *mockDAO) UpdateRevision(ctx context.Context, id int64, revision int64) (int64, error) { + ret := _m.Called(ctx, id, revision) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, int64, int64) int64); ok { + r0 = rf(ctx, id, revision) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64, int64) error); ok { + r1 = rf(ctx, id, revision) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/src/pkg/task/hook.go b/src/pkg/task/hook.go index 1f4808d2f..c410998b5 100644 --- a/src/pkg/task/hook.go +++ b/src/pkg/task/hook.go @@ -17,7 +17,6 @@ package task import ( "context" "fmt" - "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" @@ -80,7 +79,7 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error { } t := &Task{} t.From(task) - return processor(ctx, t, sc.CheckIn) + return processor(ctx, t, sc) } // update task status diff --git a/src/pkg/task/hook_test.go b/src/pkg/task/hook_test.go index 8a975bae7..a7d115820 100644 --- a/src/pkg/task/hook_test.go +++ b/src/pkg/task/hook_test.go @@ -43,7 +43,7 @@ func (h *hookHandlerTestSuite) SetupTest() { func (h *hookHandlerTestSuite) TestHandle() { // handle check in data - checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, data string) (err error) { return nil } + checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) { return nil } defer delete(checkInProcessorRegistry, "test") h.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ { diff --git a/src/pkg/task/registry.go b/src/pkg/task/registry.go index 378915aaf..61b36d77a 100644 --- a/src/pkg/task/registry.go +++ b/src/pkg/task/registry.go @@ -17,6 +17,8 @@ package task import ( "context" "fmt" + + "github.com/goharbor/harbor/src/jobservice/job" ) var ( @@ -26,7 +28,7 @@ var ( ) // CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook -type CheckInProcessor func(ctx context.Context, task *Task, data string) (err error) +type CheckInProcessor func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) // StatusChangePostFunc is the function called after the task status changed type StatusChangePostFunc func(ctx context.Context, taskID int64, status string) (err error)