Fix duplicate execution record issue

When the core service cannot respond to the check-in request in time, duplicated execution records may be created. This commit introduces the revision column to make sure there is only one record for each schedule trigger.

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2021-05-11 21:19:36 +08:00
parent d1b553fd3a
commit c04f3a2aac
10 changed files with 59 additions and 12 deletions

View File

@ -0,0 +1,2 @@
-- Add the revision column to the schedule table. IF NOT EXISTS keeps the
-- statement safe to run again when the column is already present.
-- NOTE(review): the Go model declares Revision as int64 while "integer" is a
-- 32-bit type (assuming PostgreSQL) — confirm the stored values always fit.
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS revision integer;
-- Initialise only the rows that have no revision yet. On the first run every
-- row is NULL, so all rows are set to 0; on a re-run, revisions that were
-- already recorded are left untouched instead of being reset.
UPDATE schedule SET revision = 0 WHERE revision IS NULL;

View File

@ -116,9 +116,9 @@ func scanTaskStatusChange(ctx context.Context, taskID int64, status string) (err
}
// scanTaskCheckInProcessor checkin processor handles the webhook of scan job
func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) {
checkInReport := &scan.CheckInReport{}
if err := checkInReport.FromJSON(data); err != nil {
if err := checkInReport.FromJSON(sc.CheckIn); err != nil {
log.G(ctx).WithField("error", err).Errorf("failed to convert data to report")
return err
}

View File

@ -135,7 +135,7 @@ func (suite *CallbackTestSuite) TestScanTaskStatusChange() {
func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() {
{
suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, "report"))
suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: "report"}))
}
{
@ -156,7 +156,7 @@ func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() {
}
r, _ := json.Marshal(report)
suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, string(r)))
suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: string(r)}))
}
}

View File

@ -19,18 +19,18 @@ func init() {
}
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) {
taskID := t.ID
status := t.Status
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
// handle checkin
if data != "" {
if sc.CheckIn != "" {
var retainObj struct {
Total int `json:"total"`
Retained int `json:"retained"`
Deleted []*selector.Result `json:"deleted"`
}
if err := json.Unmarshal([]byte(data), &retainObj); err != nil {
if err := json.Unmarshal([]byte(sc.CheckIn), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
return err

View File

@ -17,6 +17,7 @@ package scheduler
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
@ -67,7 +68,7 @@ func callbackFuncExist(name string) bool {
return exist
}
func triggerCallback(ctx context.Context, task *task.Task, data string) (err error) {
func triggerCallback(ctx context.Context, task *task.Task, sc *job.StatusChange) (err error) {
execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID)
if err != nil {
return err
@ -80,6 +81,16 @@ func triggerCallback(ctx context.Context, task *task.Task, data string) (err err
if err != nil {
return err
}
// Try to update the schedule record with the checkin revision to avoid duplicated trigger
// refer to https://github.com/goharbor/harbor/issues/14683 for more details
n, err := Sched.(*scheduler).dao.UpdateRevision(ctx, schedule.ID, sc.Metadata.CheckInAt)
if err != nil {
return err
}
if n == 0 {
log.Warningf("got no schedule record with ID %d and revision < %d, ignore", schedule.ID, sc.Metadata.CheckInAt)
return nil
}
callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName)
if err != nil {
return err

View File

@ -34,6 +34,7 @@ type schedule struct {
VendorID int64 `orm:"column(vendor_id)"`
CRONType string `orm:"column(cron_type)"`
CRON string `orm:"column(cron)"`
Revision int64 `orm:"column(revision)"` // to identify the duplicated checkin hook from jobservice
ExtraAttrs string `orm:"column(extra_attrs)"`
CallbackFuncName string `orm:"column(callback_func_name)"`
CallbackFuncParam string `orm:"column(callback_func_param)"`
@ -48,6 +49,7 @@ type DAO interface {
Get(ctx context.Context, id int64) (s *schedule, err error)
Delete(ctx context.Context, id int64) (err error)
Update(ctx context.Context, s *schedule, props ...string) (err error)
UpdateRevision(ctx context.Context, id, revision int64) (n int64, err error)
}
type dao struct{}
@ -132,3 +134,13 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e
}
return nil
}
// UpdateRevision sets the revision of the schedule identified by id to the
// given value, but only when the revision currently stored is lower than it.
// It returns the number of rows reported by the update, so a result of 0 means
// that no schedule with that id has a smaller revision. An error is returned
// when no ORM can be obtained from ctx or when the update itself fails.
func (d *dao) UpdateRevision(ctx context.Context, id, revision int64) (int64, error) {
	o, err := orm.FromContext(ctx)
	if err != nil {
		return 0, err
	}

	// The "stored revision < new revision" comparison is expressed as a filter
	// on the update query rather than as a separate read beforehand.
	outdated := o.QueryTable(&schedule{}).
		Filter("ID", id).
		Filter("Revision__lt", revision)
	return outdated.Update(beegoorm.Params{"Revision": revision})
}

View File

@ -115,3 +115,24 @@ func (_m *mockDAO) Update(ctx context.Context, s *schedule, props ...string) err
return r0
}
// UpdateRevision provides a mock function with given fields: ctx, id, revision.
// Each of the two return values configured on the mock may be either a plain
// value or a function taking the same arguments that computes the value.
func (_m *mockDAO) UpdateRevision(ctx context.Context, id int64, revision int64) (int64, error) {
	args := _m.Called(ctx, id, revision)

	// First return value: the int64 count.
	var affected int64
	switch first := args.Get(0).(type) {
	case func(context.Context, int64, int64) int64:
		affected = first(ctx, id, revision)
	default:
		// Anything other than the function form must be a plain int64.
		affected = first.(int64)
	}

	// Second return value: the error.
	var callErr error
	switch second := args.Get(1).(type) {
	case func(context.Context, int64, int64) error:
		callErr = second(ctx, id, revision)
	default:
		callErr = args.Error(1)
	}

	return affected, callErr
}

View File

@ -17,7 +17,6 @@ package task
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
@ -80,7 +79,7 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error {
}
t := &Task{}
t.From(task)
return processor(ctx, t, sc.CheckIn)
return processor(ctx, t, sc)
}
// update task status

View File

@ -43,7 +43,7 @@ func (h *hookHandlerTestSuite) SetupTest() {
func (h *hookHandlerTestSuite) TestHandle() {
// handle check in data
checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, data string) (err error) { return nil }
checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) { return nil }
defer delete(checkInProcessorRegistry, "test")
h.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{

View File

@ -17,6 +17,8 @@ package task
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
)
var (
@ -26,7 +28,7 @@ var (
)
// CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook
type CheckInProcessor func(ctx context.Context, task *Task, data string) (err error)
type CheckInProcessor func(ctx context.Context, task *Task, sc *job.StatusChange) (err error)
// StatusChangePostFunc is the function called after the task status changed.
// It is given the ID of the task together with a status string and reports
// any failure through the returned error.
type StatusChangePostFunc func(ctx context.Context, taskID int64, status string) (err error)