Merge pull request #14860 from ywk253100/210512_dup_task

[cherry-pick]Fix duplicate execution record issue
This commit is contained in:
Wenkai Yin(尹文开) 2021-05-12 16:50:37 +08:00 committed by GitHub
commit c1f9b14a22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 12 deletions

View File

@ -0,0 +1,2 @@
-- Add the revision column that the scheduler DAO (UpdateRevision) compares against
-- to ignore duplicated check-in hooks.
-- NOTE(review): the Go model declares Revision as int64 while this column is `integer`;
-- confirm the stored check-in values always fit, or widen the column to bigint.
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS revision integer;
-- Backfill only the rows that have no revision yet. The DAO updates with a
-- "revision < new value" filter, which never matches NULL, so every row needs a
-- starting value; limiting the backfill to NULL rows keeps a re-run of this script
-- from resetting revisions that were already recorded.
UPDATE schedule SET revision = 0 WHERE revision IS NULL;

View File

@ -116,9 +116,9 @@ func scanTaskStatusChange(ctx context.Context, taskID int64, status string) (err
}
// scanTaskCheckInProcessor checkin processor handles the webhook of scan job
func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
func scanTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) {
checkInReport := &scan.CheckInReport{}
if err := checkInReport.FromJSON(data); err != nil {
if err := checkInReport.FromJSON(sc.CheckIn); err != nil {
log.G(ctx).WithField("error", err).Errorf("failed to convert data to report")
return err
}

View File

@ -135,7 +135,7 @@ func (suite *CallbackTestSuite) TestScanTaskStatusChange() {
func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() {
{
suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, "report"))
suite.Error(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: "report"}))
}
{
@ -156,7 +156,7 @@ func (suite *CallbackTestSuite) TestScanTaskCheckInProcessor() {
}
r, _ := json.Marshal(report)
suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, string(r)))
suite.NoError(scanTaskCheckInProcessor(context.TODO(), &task.Task{}, &job.StatusChange{CheckIn: string(r)}))
}
}

View File

@ -19,18 +19,18 @@ func init() {
}
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, data string) (err error) {
func retentionTaskCheckInProcessor(ctx context.Context, t *task.Task, sc *job.StatusChange) (err error) {
taskID := t.ID
status := t.Status
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
// handle checkin
if data != "" {
if sc.CheckIn != "" {
var retainObj struct {
Total int `json:"total"`
Retained int `json:"retained"`
Deleted []*selector.Result `json:"deleted"`
}
if err := json.Unmarshal([]byte(data), &retainObj); err != nil {
if err := json.Unmarshal([]byte(sc.CheckIn), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
return err

View File

@ -17,6 +17,7 @@ package scheduler
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
@ -67,7 +68,7 @@ func callbackFuncExist(name string) bool {
return exist
}
func triggerCallback(ctx context.Context, task *task.Task, data string) (err error) {
func triggerCallback(ctx context.Context, task *task.Task, sc *job.StatusChange) (err error) {
execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID)
if err != nil {
return err
@ -80,6 +81,16 @@ func triggerCallback(ctx context.Context, task *task.Task, data string) (err err
if err != nil {
return err
}
// Try to update the schedule record with the checkin revision to avoid duplicated trigger
// refer to https://github.com/goharbor/harbor/issues/14683 for more details
n, err := Sched.(*scheduler).dao.UpdateRevision(ctx, schedule.ID, sc.Metadata.CheckInAt)
if err != nil {
return err
}
if n == 0 {
log.Warningf("got no schedule record with ID %d and revision < %d, ignore", schedule.ID, sc.Metadata.CheckInAt)
return nil
}
callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName)
if err != nil {
return err

View File

@ -34,6 +34,7 @@ type schedule struct {
VendorID int64 `orm:"column(vendor_id)"`
CRONType string `orm:"column(cron_type)"`
CRON string `orm:"column(cron)"`
Revision int64 `orm:"column(revision)"` // to identify the duplicated checkin hook from jobservice
ExtraAttrs string `orm:"column(extra_attrs)"`
CallbackFuncName string `orm:"column(callback_func_name)"`
CallbackFuncParam string `orm:"column(callback_func_param)"`
@ -48,6 +49,7 @@ type DAO interface {
Get(ctx context.Context, id int64) (s *schedule, err error)
Delete(ctx context.Context, id int64) (err error)
Update(ctx context.Context, s *schedule, props ...string) (err error)
UpdateRevision(ctx context.Context, id, revision int64) (n int64, err error)
}
type dao struct{}
@ -132,3 +134,13 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e
}
return nil
}
// UpdateRevision sets the revision of the schedule identified by id to the given
// value, but only when the stored revision is lower than that value. It returns
// the count reported by the ORM update together with any error; a count of 0
// means no record matched both the ID and the "revision lower than" condition.
func (d *dao) UpdateRevision(ctx context.Context, id, revision int64) (int64, error) {
	o, err := orm.FromContext(ctx)
	if err != nil {
		return 0, err
	}

	// Narrow the query to the target schedule whose revision is still behind.
	stale := o.QueryTable(&schedule{}).
		Filter("ID", id).
		Filter("Revision__lt", revision)

	return stale.Update(beegoorm.Params{"Revision": revision})
}

View File

@ -115,3 +115,24 @@ func (_m *mockDAO) Update(ctx context.Context, s *schedule, props ...string) err
return r0
}
// UpdateRevision provides a mock function with given fields: ctx, id, revision.
// Each configured return value may be either a plain value or a function with the
// same parameters, in which case the function is invoked to produce the value.
func (_m *mockDAO) UpdateRevision(ctx context.Context, id int64, revision int64) (int64, error) {
	args := _m.Called(ctx, id, revision)

	var n int64
	switch v := args.Get(0).(type) {
	case func(context.Context, int64, int64) int64:
		n = v(ctx, id, revision)
	default:
		// Same unchecked assertion as a plain value lookup: panics on a wrong type.
		n = v.(int64)
	}

	var err error
	switch v := args.Get(1).(type) {
	case func(context.Context, int64, int64) error:
		err = v(ctx, id, revision)
	default:
		err = args.Error(1)
	}

	return n, err
}

View File

@ -17,7 +17,6 @@ package task
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
@ -80,7 +79,7 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error {
}
t := &Task{}
t.From(task)
return processor(ctx, t, sc.CheckIn)
return processor(ctx, t, sc)
}
// update task status

View File

@ -43,7 +43,7 @@ func (h *hookHandlerTestSuite) SetupTest() {
func (h *hookHandlerTestSuite) TestHandle() {
// handle check in data
checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, data string) (err error) { return nil }
checkInProcessorRegistry["test"] = func(ctx context.Context, task *Task, sc *job.StatusChange) (err error) { return nil }
defer delete(checkInProcessorRegistry, "test")
h.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{

View File

@ -17,6 +17,8 @@ package task
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
)
var (
@ -26,7 +28,7 @@ var (
)
// CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook
type CheckInProcessor func(ctx context.Context, task *Task, data string) (err error)
type CheckInProcessor func(ctx context.Context, task *Task, sc *job.StatusChange) (err error)
// StatusChangePostFunc is the function called after the task status changed
type StatusChangePostFunc func(ctx context.Context, taskID int64, status string) (err error)