Merge pull request #14447 from ywk253100/210315_retention_exec

Add upgrade sql file introduced in 2.1.4
This commit is contained in:
Daniel Jiang 2021-03-16 11:14:45 +08:00 committed by GitHub
commit 5ab879a670
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 42 additions and 14 deletions

View File

@ -0,0 +1,9 @@
/*
When upgrading from 2.0 to 2.1, the status and revision of retention schedule execution isn't migrated, correct them here.
As v2.2.0 isn't usable because of several serious bugs, we won't support upgrade from 2.1.4 to 2.2.0 anymore. After we add the
sql file here, users will get error when upgrading from 2.1.4 to 2.2.0 because of this sql file doesn't exist on 2.2.0
*/
UPDATE execution
SET revision=0, status=task.status
FROM task
WHERE execution.id=task.execution_id AND execution.vendor_type='SCHEDULER' AND execution.revision IS NULL;

View File

@ -222,7 +222,7 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
}
// when an execution is in final status, if it contains task that is a periodic or retrying job it will
// run again in the near future, so we must operate the stop action
// run again in the near future, so we must operate the stop action no matter the status is final or not
tasks, err := e.taskDAO.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": id,
@ -231,15 +231,23 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
if err != nil {
return err
}
// contains no task and the status isn't final, update the status to stop directly
if len(tasks) == 0 && !job.Status(execution.Status).Final() {
if len(tasks) == 0 {
// in final status, return directly
if job.Status(execution.Status).Final() {
return nil
}
// isn't in final status, update directly.
// as this is used for the corner case(the case that the execution exists but all tasks are disappeared. In normal
// cases, if the execution contains no tasks, it is already set as "success" by the upper level caller directly),
// no need to handle concurrency
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.StoppedStatus.String(),
Revision: execution.Revision + 1,
UpdateTime: now,
EndTime: now,
}, "Status", "UpdateTime", "EndTime")
}, "Status", "Revision", "UpdateTime", "EndTime")
}
for _, task := range tasks {
@ -248,10 +256,6 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
continue
}
}
// refresh the status explicitly in case that the execution status
// isn't refreshed by task status change hook
_, _, err = e.executionDAO.RefreshStatus(ctx, id)
return err
}

View File

@ -95,14 +95,28 @@ func (e *executionManagerTestSuite) TestMarkError() {
}
func (e *executionManagerTestSuite) TestStop() {
// the execution contains no tasks and the status is final
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.SuccessStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
err := e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
// reset the mocks
e.SetupTest()
// the execution contains no tasks and the status isn't final
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err = e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
@ -122,7 +136,6 @@ func (e *executionManagerTestSuite) TestStop() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err = e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
@ -143,7 +156,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err := e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().NotNil(err)
e.taskDAO.AssertExpectations(e.T())
@ -165,7 +177,6 @@ func (e *executionManagerTestSuite) TestStopAndWait() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(false, "", nil)
err = e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())

View File

@ -184,7 +184,9 @@ func (m *manager) Stop(ctx context.Context, id int64) error {
return err
}
log.Debugf("got job not found error for task %d, update it's status to stop directly", task.ID)
return nil
// as in this case no status hook will be sent, here refresh the execution status directly
_, _, err = m.execDAO.RefreshStatus(ctx, task.ExecutionID)
return err
}
return err
}

View File

@ -94,10 +94,12 @@ func (t *taskManagerTestSuite) TestStop() {
t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound)
t.dao.On("Update", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
t.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(true, "", nil)
err := t.mgr.Stop(nil, 1)
t.Require().Nil(err)
t.dao.AssertExpectations(t.T())
t.jsClient.AssertExpectations(t.T())
t.execDAO.AssertExpectations(t.T())
// reset mock
t.SetupTest()