diff --git a/src/controller/replication/controller.go b/src/controller/replication/controller.go index 44ecca11e..ea6464efc 100644 --- a/src/controller/replication/controller.go +++ b/src/controller/replication/controller.go @@ -95,6 +95,16 @@ func (c *controller) Start(ctx context.Context, policy *model.Policy, resource * // may be submitted already when the process starts, so create a new context // with orm populated ctxx := orm.NewContext(context.Background(), c.ormCreator.Create()) + + // as we start a new transaction in the goroutine, the execution record may not + // be inserted yet, wait until it is ready before continue + if err := lib.RetryUntil(func() error { + _, err := c.execMgr.Get(ctxx, id) + return err + }); err != nil { + logger.Errorf("failed to wait the execution record to be inserted: %v", err) + } + err := c.flowCtl.Start(ctxx, id, policy, resource) if err == nil { // no err, return directly diff --git a/src/controller/replication/controller_test.go b/src/controller/replication/controller_test.go index 2ddaf207b..1079c2b98 100644 --- a/src/controller/replication/controller_test.go +++ b/src/controller/replication/controller_test.go @@ -61,6 +61,7 @@ func (r *replicationTestSuite) TestStart() { // got error when running the replication flow r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil) r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil) r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil) r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error")) @@ -78,6 +79,7 @@ func (r *replicationTestSuite) TestStart() { // got no error when running the replication flow r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) + r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil) r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) r.ormCreator.On("Create").Return(nil) id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual) diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go index fff8d3777..764a93e53 100644 --- a/src/jobservice/common/rds/scripts.go +++ b/src/jobservice/common/rds/scripts.go @@ -37,19 +37,17 @@ end // luaFuncCompareText is common lua script function var luaFuncCompareText = ` -local function compare(status, revision, checkInT) +local function compare(status, revision) local sCode = stCode(status) local aCode = stCode(ARGV[1]) local aRev = tonumber(ARGV[2]) or 0 local aCheckInT = tonumber(ARGV[3]) or 0 - if revision < aRev or - ( revision == aRev and sCode < aCode ) or - ( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT)) + ( revision == aRev and sCode <= aCode ) or + ( revision == aRev and aCheckInT ~= 0 ) then return 'ok' end - return 'no' end ` @@ -129,7 +127,7 @@ if res then checkInAt = tonumber(res[3]) or 0 ack = res[4] - local reply = compare(st, rev, checkInAt) + local reply = compare(st, rev) if reply == 'ok' then if not ack then @@ -142,7 +140,7 @@ if res then rev = a['revision'] checkInAt = a['check_in_at'] - local reply2 = compare(st, rev, checkInAt) + local reply2 = compare(st, rev) if reply2 == 'ok' then return 'ok' end @@ -178,7 +176,7 @@ local function canSetAck(jk, nrev) if ackv then -- ack existing local ack = cjson.decode(ackv) - local cmp = compare(ack['status'], ack['revision'], ack['check_in_at']) + local cmp = compare(ack['status'], ack['revision']) if cmp == 'ok' then return 'ok' end diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index ada25850d..bca88eb93 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -88,6 +88,10 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA log.Debugf("the database record for task %d created", id) // submit job to jobservice + // As all database operations are in a transaction which is committed until API returns, + // when the job is submitted to the jobservice and running, the task record may not + // insert yet, this will cause the status hook handler returning 404, and the jobservice + // will re-send the status hook again jobID, err := m.submitJob(ctx, id, jb) if err != nil { // failed to submit job to jobservice, delete the task record diff --git a/src/server/handler/job_status_hook.go b/src/server/handler/job_status_hook.go index 1b259b12d..499ff7144 100644 --- a/src/server/handler/job_status_hook.go +++ b/src/server/handler/job_status_hook.go @@ -19,9 +19,7 @@ import ( "net/http" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/lib/errors" libhttp "github.com/goharbor/harbor/src/lib/http" - "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/pkg/task" ) @@ -45,11 +43,17 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } if err := j.handler.Handle(r.Context(), sc); err != nil { - // ignore the not found error to avoid the jobservice re-sending the hook - if errors.IsNotFoundErr(err) { - log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err) - return - } + // When the status hook comes, the execution/task database record may not insert yet + // because of that the transaction isn't committed + // Do not ignore the NotFoundErr here to make jobservice resend the status hook + // again to avoid the status lost + /* + // ignore the not found error to avoid the jobservice re-sending the hook + if errors.IsNotFoundErr(err) { + log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err) + return + } + */ libhttp.SendError(w, err) return }