mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-26 09:31:24 +01:00
Don't ignore the NotFoundErr when handling the status hook of tasks to avoid the status out of sync
Don't ignore the NotFoundErr when handling the status hook of tasks to avoid the status out of sync Fixes #14016 Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
b7c5fc0562
commit
e55c7d05ff
@ -95,6 +95,16 @@ func (c *controller) Start(ctx context.Context, policy *model.Policy, resource *
|
|||||||
// may be submitted already when the process starts, so create a new context
|
// may be submitted already when the process starts, so create a new context
|
||||||
// with orm populated
|
// with orm populated
|
||||||
ctxx := orm.NewContext(context.Background(), c.ormCreator.Create())
|
ctxx := orm.NewContext(context.Background(), c.ormCreator.Create())
|
||||||
|
|
||||||
|
// as we start a new transaction in the goroutine, the execution record may not
|
||||||
|
// be inserted yet, wait until it is ready before continue
|
||||||
|
if err := lib.RetryUntil(func() error {
|
||||||
|
_, err := c.execMgr.Get(ctxx, id)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
logger.Errorf("failed to wait the execution record to be inserted: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
err := c.flowCtl.Start(ctxx, id, policy, resource)
|
err := c.flowCtl.Start(ctxx, id, policy, resource)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// no err, return directly
|
// no err, return directly
|
||||||
|
@ -61,6 +61,7 @@ func (r *replicationTestSuite) TestStart() {
|
|||||||
|
|
||||||
// got error when running the replication flow
|
// got error when running the replication flow
|
||||||
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
|
||||||
r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
|
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
|
||||||
@ -78,6 +79,7 @@ func (r *replicationTestSuite) TestStart() {
|
|||||||
|
|
||||||
// got no error when running the replication flow
|
// got no error when running the replication flow
|
||||||
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
|
||||||
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
r.ormCreator.On("Create").Return(nil)
|
r.ormCreator.On("Create").Return(nil)
|
||||||
id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual)
|
id, err = r.ctl.Start(context.Background(), &model.Policy{Enabled: true}, nil, task.ExecutionTriggerManual)
|
||||||
|
@ -37,19 +37,17 @@ end
|
|||||||
|
|
||||||
// luaFuncCompareText is common lua script function
|
// luaFuncCompareText is common lua script function
|
||||||
var luaFuncCompareText = `
|
var luaFuncCompareText = `
|
||||||
local function compare(status, revision, checkInT)
|
local function compare(status, revision)
|
||||||
local sCode = stCode(status)
|
local sCode = stCode(status)
|
||||||
local aCode = stCode(ARGV[1])
|
local aCode = stCode(ARGV[1])
|
||||||
local aRev = tonumber(ARGV[2]) or 0
|
local aRev = tonumber(ARGV[2]) or 0
|
||||||
local aCheckInT = tonumber(ARGV[3]) or 0
|
local aCheckInT = tonumber(ARGV[3]) or 0
|
||||||
|
|
||||||
if revision < aRev or
|
if revision < aRev or
|
||||||
( revision == aRev and sCode < aCode ) or
|
( revision == aRev and sCode <= aCode ) or
|
||||||
( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT))
|
( revision == aRev and aCheckInT ~= 0 )
|
||||||
then
|
then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
|
|
||||||
return 'no'
|
return 'no'
|
||||||
end
|
end
|
||||||
`
|
`
|
||||||
@ -129,7 +127,7 @@ if res then
|
|||||||
checkInAt = tonumber(res[3]) or 0
|
checkInAt = tonumber(res[3]) or 0
|
||||||
ack = res[4]
|
ack = res[4]
|
||||||
|
|
||||||
local reply = compare(st, rev, checkInAt)
|
local reply = compare(st, rev)
|
||||||
|
|
||||||
if reply == 'ok' then
|
if reply == 'ok' then
|
||||||
if not ack then
|
if not ack then
|
||||||
@ -142,7 +140,7 @@ if res then
|
|||||||
rev = a['revision']
|
rev = a['revision']
|
||||||
checkInAt = a['check_in_at']
|
checkInAt = a['check_in_at']
|
||||||
|
|
||||||
local reply2 = compare(st, rev, checkInAt)
|
local reply2 = compare(st, rev)
|
||||||
if reply2 == 'ok' then
|
if reply2 == 'ok' then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
@ -178,7 +176,7 @@ local function canSetAck(jk, nrev)
|
|||||||
if ackv then
|
if ackv then
|
||||||
-- ack existing
|
-- ack existing
|
||||||
local ack = cjson.decode(ackv)
|
local ack = cjson.decode(ackv)
|
||||||
local cmp = compare(ack['status'], ack['revision'], ack['check_in_at'])
|
local cmp = compare(ack['status'], ack['revision'])
|
||||||
if cmp == 'ok' then
|
if cmp == 'ok' then
|
||||||
return 'ok'
|
return 'ok'
|
||||||
end
|
end
|
||||||
|
@ -88,6 +88,10 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA
|
|||||||
log.Debugf("the database record for task %d created", id)
|
log.Debugf("the database record for task %d created", id)
|
||||||
|
|
||||||
// submit job to jobservice
|
// submit job to jobservice
|
||||||
|
// As all database operations are in a transaction which is committed until API returns,
|
||||||
|
// when the job is submitted to the jobservice and running, the task record may not
|
||||||
|
// insert yet, this will cause the status hook handler returning 404, and the jobservice
|
||||||
|
// will re-send the status hook again
|
||||||
jobID, err := m.submitJob(ctx, id, jb)
|
jobID, err := m.submitJob(ctx, id, jb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// failed to submit job to jobservice, delete the task record
|
// failed to submit job to jobservice, delete the task record
|
||||||
|
@ -19,9 +19,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
|
||||||
libhttp "github.com/goharbor/harbor/src/lib/http"
|
libhttp "github.com/goharbor/harbor/src/lib/http"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/task"
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -45,11 +43,17 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := j.handler.Handle(r.Context(), sc); err != nil {
|
if err := j.handler.Handle(r.Context(), sc); err != nil {
|
||||||
// ignore the not found error to avoid the jobservice re-sending the hook
|
// When the status hook comes, the execution/task database record may not insert yet
|
||||||
if errors.IsNotFoundErr(err) {
|
// because of that the transaction isn't committed
|
||||||
log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err)
|
// Do not ignore the NotFoundErr here to make jobservice resend the status hook
|
||||||
return
|
// again to avoid the status lost
|
||||||
}
|
/*
|
||||||
|
// ignore the not found error to avoid the jobservice re-sending the hook
|
||||||
|
if errors.IsNotFoundErr(err) {
|
||||||
|
log.Warningf("got not found error: %v, ignore it to avoid subsequent retrying webhooks from jobservice", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*/
|
||||||
libhttp.SendError(w, err)
|
libhttp.SendError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user