From 0fd230c2d65c47c798cab240a1bb775a56f5d55f Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 29 Jul 2020 18:17:11 +0800 Subject: [PATCH] Refresh the status of execution for every status changing of task Refresh the status of execution for every status changing of task to support filtering executions by status directly Signed-off-by: Wenkai Yin --- .../postgresql/0040_2.1.0_schema.up.sql | 1 + src/pkg/task/dao/execution.go | 164 ++++++++++++++++- src/pkg/task/dao/execution_test.go | 165 +++++++++++++++++- src/pkg/task/dao/model.go | 12 ++ src/pkg/task/execution.go | 87 +-------- src/pkg/task/execution_test.go | 135 ++------------ src/pkg/task/hook.go | 16 +- src/pkg/task/hook_test.go | 5 + src/pkg/task/mock_execution_dao_test.go | 37 ++++ src/pkg/task/model.go | 15 +- src/server/handler/job_status_hook.go | 2 +- 11 files changed, 412 insertions(+), 227 deletions(-) diff --git a/make/migrations/postgresql/0040_2.1.0_schema.up.sql b/make/migrations/postgresql/0040_2.1.0_schema.up.sql index 068b52d45..f1526dd55 100644 --- a/make/migrations/postgresql/0040_2.1.0_schema.up.sql +++ b/make/migrations/postgresql/0040_2.1.0_schema.up.sql @@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS execution ( extra_attrs JSON, start_time timestamp DEFAULT CURRENT_TIMESTAMP, end_time timestamp, + revision int, PRIMARY KEY (id) ); diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index 20f54f6a4..7aecc9e8a 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -16,7 +16,10 @@ package dao import ( "context" + "fmt" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" @@ -36,14 +39,23 @@ type ExecutionDAO interface { Update(ctx context.Context, execution *Execution, props ...string) (err error) // Delete the specified execution Delete(ctx context.Context, id int64) (err error) 
+ // GetMetrics returns the task metrics for the specified execution + GetMetrics(ctx context.Context, id int64) (metrics *Metrics, err error) + // RefreshStatus refreshes the status of the specified execution according to its tasks. If its status + // is final, update the end time as well + RefreshStatus(ctx context.Context, id int64) (err error) } // NewExecutionDAO returns an instance of ExecutionDAO func NewExecutionDAO() ExecutionDAO { - return &executionDAO{} + return &executionDAO{ + taskDAO: NewTaskDAO(), + } } -type executionDAO struct{} +type executionDAO struct { + taskDAO TaskDAO +} func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) { if query != nil { @@ -132,3 +144,151 @@ func (e *executionDAO) Delete(ctx context.Context, id int64) error { } return nil } + +func (e *executionDAO) GetMetrics(ctx context.Context, id int64) (*Metrics, error) { + scs, err := e.taskDAO.ListStatusCount(ctx, id) + if err != nil { + return nil, err + } + metrics := &Metrics{} + if len(scs) == 0 { + return metrics, nil + } + + for _, sc := range scs { + switch sc.Status { + case job.SuccessStatus.String(): + metrics.SuccessTaskCount = sc.Count + case job.ErrorStatus.String(): + metrics.ErrorTaskCount = sc.Count + case job.PendingStatus.String(): + metrics.PendingTaskCount = sc.Count + case job.RunningStatus.String(): + metrics.RunningTaskCount = sc.Count + case job.ScheduledStatus.String(): + metrics.ScheduledTaskCount = sc.Count + case job.StoppedStatus.String(): + metrics.StoppedTaskCount = sc.Count + default: + log.Errorf("unknown task status: %s", sc.Status) + } + } + metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount + + metrics.PendingTaskCount + metrics.RunningTaskCount + + metrics.ScheduledTaskCount + metrics.StoppedTaskCount + return metrics, nil +} +func (e *executionDAO) RefreshStatus(ctx context.Context, id int64) error { + // as the status of the execution can be refreshed by multiple operators concurrently + // 
we use the optimistic locking to avoid the conflict and retry 5 times at most + for i := 0; i < 5; i++ { + retry, err := e.refreshStatus(ctx, id) + if err != nil { + return err + } + if !retry { + return nil + } + } + return fmt.Errorf("failed to refresh the status of the execution %d after %d retries", id, 5) +} + +func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, error) { + execution, err := e.Get(ctx, id) + if err != nil { + return false, err + } + metrics, err := e.GetMetrics(ctx, id) + if err != nil { + return false, err + } + // no task, return directly + if metrics.TaskCount == 0 { + return false, nil + } + + var status string + if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 { + status = job.RunningStatus.String() + } else if metrics.ErrorTaskCount > 0 { + status = job.ErrorStatus.String() + } else if metrics.StoppedTaskCount > 0 { + status = job.StoppedStatus.String() + } else if metrics.SuccessTaskCount > 0 { + status = job.SuccessStatus.String() + } + + ormer, err := orm.FromContext(ctx) + if err != nil { + return false, err + } + sql := `update execution set status = ?, revision = revision+1 where id = ? 
+ and revision = ?` + result, err := ormer.Raw(sql, status, id, execution.Revision).Exec() + if err != nil { + return false, err + } + n, err := result.RowsAffected() + if err != nil { + return false, err + } + // if the count of affected rows is 0, that means the execution is being updated by others, retry + if n == 0 { + return true, nil + } + + /* this is another solution to solve the concurrency issue for refreshing the execution status + // set a score for each status: + // pending, running, scheduled - 4 + // error - 3 + // stopped - 2 + // success - 1 + // and set the status of record with highest score as the status of execution + sql := `with status_score as ( + select status, + case + when status='%s' or status='%s' or status='%s' then 4 + when status='%s' then 3 + when status='%s' then 2 + when status='%s' then 1 + else 0 + end as score + from task + where execution_id=? + group by status + ) + update execution + set status=( + select + case + when max(score)=4 then '%s' + when max(score)=3 then '%s' + when max(score)=2 then '%s' + when max(score)=1 then '%s' + when max(score)=0 then '' + end as status + from status_score) + where id = ?` + sql = fmt.Sprintf(sql, job.PendingStatus.String(), job.RunningStatus.String(), job.ScheduledStatus.String(), + job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String(), + job.RunningStatus.String(), job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String()) + if _, err = ormer.Raw(sql, id, id).Exec(); err != nil { + return err + } + */ + + // update the end time if the status is final, otherwise set the end time as NULL, this is useful + // for retrying jobs + sql = `update execution + set end_time = ( + case + when status='%s' or status='%s' or status='%s' then ( + select max(end_time) + from task + where execution_id=?) 
+ else NULL + end) + where id=?` + sql = fmt.Sprintf(sql, job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String()) + _, err = ormer.Raw(sql, id, id).Exec() + return false, err +} diff --git a/src/pkg/task/dao/execution_test.go b/src/pkg/task/dao/execution_test.go index 4c3ce727b..cf1cdbe81 100644 --- a/src/pkg/task/dao/execution_test.go +++ b/src/pkg/task/dao/execution_test.go @@ -17,8 +17,10 @@ package dao import ( "context" "testing" + "time" "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" @@ -29,13 +31,17 @@ type executionDAOTestSuite struct { suite.Suite ctx context.Context executionDAO *executionDAO + taskDao *taskDAO executionID int64 } func (e *executionDAOTestSuite) SetupSuite() { dao.PrepareTestForPostgresSQL() e.ctx = orm.Context() - e.executionDAO = &executionDAO{} + e.taskDao = &taskDAO{} + e.executionDAO = &executionDAO{ + taskDAO: e.taskDao, + } } func (e *executionDAOTestSuite) SetupTest() { @@ -116,6 +122,163 @@ func (e *executionDAOTestSuite) TestDelete() { // happy pass is covered by TearDownTest } +func (e *executionDAOTestSuite) TestGetMetrics() { + taskID01, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.SuccessStatus.String(), + StatusCode: job.SuccessStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID01) + + taskID02, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.StoppedStatus.String(), + StatusCode: job.StoppedStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID02) + + taskID03, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.ErrorStatus.String(), + StatusCode: job.ErrorStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer 
e.taskDao.Delete(e.ctx, taskID03) + + taskID04, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.PendingStatus.String(), + StatusCode: job.PendingStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID04) + + taskID05, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.RunningStatus.String(), + StatusCode: job.RunningStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID05) + + taskID06, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.ScheduledStatus.String(), + StatusCode: job.ScheduledStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID06) + + metrics, err := e.executionDAO.GetMetrics(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal(int64(6), metrics.TaskCount) + e.Equal(int64(1), metrics.SuccessTaskCount) + e.Equal(int64(1), metrics.StoppedTaskCount) + e.Equal(int64(1), metrics.ErrorTaskCount) + e.Equal(int64(1), metrics.PendingTaskCount) + e.Equal(int64(1), metrics.RunningTaskCount) + e.Equal(int64(1), metrics.ScheduledTaskCount) +} + +func (e *executionDAOTestSuite) TestRefreshStatus() { + // contains tasks with status: success + taskID01, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.SuccessStatus.String(), + StatusCode: job.SuccessStatus.Code(), + ExtraAttrs: "{}", + EndTime: time.Now(), + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID01) + + err = e.executionDAO.RefreshStatus(e.ctx, e.executionID) + e.Require().Nil(err) + execution, err := e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal(job.SuccessStatus.String(), execution.Status) + e.NotEmpty(execution.EndTime) + + // contains tasks with status: stopped + taskID02, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.StoppedStatus.String(), + StatusCode: 
job.StoppedStatus.Code(), + ExtraAttrs: "{}", + EndTime: time.Now(), + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID02) + + err = e.executionDAO.RefreshStatus(e.ctx, e.executionID) + e.Require().Nil(err) + execution, err = e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal(job.StoppedStatus.String(), execution.Status) + e.NotEmpty(execution.EndTime) + + // contains tasks with status: error + taskID03, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.ErrorStatus.String(), + StatusCode: job.ErrorStatus.Code(), + ExtraAttrs: "{}", + EndTime: time.Now(), + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID03) + + err = e.executionDAO.RefreshStatus(e.ctx, e.executionID) + e.Require().Nil(err) + execution, err = e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal(job.ErrorStatus.String(), execution.Status) + e.NotEmpty(execution.EndTime) + + // contains tasks with status: pending, running, scheduled + taskID04, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.PendingStatus.String(), + StatusCode: job.PendingStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID04) + + taskID05, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.RunningStatus.String(), + StatusCode: job.RunningStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID05) + + taskID06, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: e.executionID, + Status: job.ScheduledStatus.String(), + StatusCode: job.ScheduledStatus.Code(), + ExtraAttrs: "{}", + }) + e.Require().Nil(err) + defer e.taskDao.Delete(e.ctx, taskID06) + + err = e.executionDAO.RefreshStatus(e.ctx, e.executionID) + e.Require().Nil(err) + execution, err = e.executionDAO.Get(e.ctx, e.executionID) + e.Require().Nil(err) + e.Equal(job.RunningStatus.String(), execution.Status) + 
e.Empty(execution.EndTime) +} + func TestExecutionDAOSuite(t *testing.T) { suite.Run(t, &executionDAOTestSuite{}) } diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go index 3ca06b5ee..4862e8a35 100644 --- a/src/pkg/task/dao/model.go +++ b/src/pkg/task/dao/model.go @@ -39,6 +39,18 @@ type Execution struct { ExtraAttrs string `orm:"column(extra_attrs)"` // json string StartTime time.Time `orm:"column(start_time)"` EndTime time.Time `orm:"column(end_time)"` + Revision int64 `orm:"column(revision)"` +} + +// Metrics is the task metrics for one execution +type Metrics struct { + TaskCount int64 `json:"task_count"` + SuccessTaskCount int64 `json:"success_task_count"` + ErrorTaskCount int64 `json:"error_task_count"` + PendingTaskCount int64 `json:"pending_task_count"` + RunningTaskCount int64 `json:"running_task_count"` + ScheduledTaskCount int64 `json:"scheduled_task_count"` + StoppedTaskCount int64 `json:"stopped_task_count"` } // Task database model diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 7a182d82b..ff66a50c1 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -203,88 +203,13 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao } } - // if the status isn't null which means the status is set manually, return directly - if len(exec.Status) > 0 { - return exec - } - // populate task metrics - e.populateExecutionMetrics(ctx, exec) - // populate status - e.populateExecutionStatus(exec) - // populate the end time - e.populateExecutionEndTime(ctx, exec) + metrics, err := e.executionDAO.GetMetrics(ctx, execution.ID) + if err != nil { + log.Errorf("failed to get metrics of the execution %d: %v", execution.ID, err) + } else { + exec.Metrics = metrics + } return exec } - -func (e *executionManager) populateExecutionMetrics(ctx context.Context, execution *Execution) { - scs, err := e.taskDAO.ListStatusCount(ctx, execution.ID) - if err != nil { - log.Errorf("failed to list status 
count of execution %d: %v", execution.ID, err) - return - } - if len(scs) == 0 { - return - } - - metrics := &Metrics{} - for _, sc := range scs { - switch sc.Status { - case job.SuccessStatus.String(): - metrics.SuccessTaskCount = sc.Count - case job.ErrorStatus.String(): - metrics.ErrorTaskCount = sc.Count - case job.PendingStatus.String(): - metrics.PendingTaskCount = sc.Count - case job.RunningStatus.String(): - metrics.RunningTaskCount = sc.Count - case job.ScheduledStatus.String(): - metrics.ScheduledTaskCount = sc.Count - case job.StoppedStatus.String(): - metrics.StoppedTaskCount = sc.Count - default: - log.Errorf("unknown task status: %s", sc.Status) - } - } - metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount + - metrics.PendingTaskCount + metrics.RunningTaskCount + - metrics.ScheduledTaskCount + metrics.StoppedTaskCount - execution.Metrics = metrics -} - -func (e *executionManager) populateExecutionStatus(execution *Execution) { - metrics := execution.Metrics - if metrics == nil { - execution.Status = job.RunningStatus.String() - return - } - if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 { - execution.Status = job.RunningStatus.String() - return - } - if metrics.ErrorTaskCount > 0 { - execution.Status = job.ErrorStatus.String() - return - } - if metrics.StoppedTaskCount > 0 { - execution.Status = job.StoppedStatus.String() - return - } - if metrics.SuccessTaskCount > 0 { - execution.Status = job.SuccessStatus.String() - return - } -} - -func (e *executionManager) populateExecutionEndTime(ctx context.Context, execution *Execution) { - if !job.Status(execution.Status).Final() { - return - } - endTime, err := e.taskDAO.GetMaxEndTime(ctx, execution.ID) - if err != nil { - log.Errorf("failed to get the max end time of the execution %d: %v", execution.ID, err) - return - } - execution.EndTime = endTime -} diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index 
38be0abed..78e50383b 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -16,7 +16,6 @@ package task import ( "testing" - "time" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" @@ -127,10 +126,16 @@ func (e *executionManagerTestSuite) TestGet() { ID: 1, Status: job.SuccessStatus.String(), }, nil) + e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{ + TaskCount: 1, + SuccessTaskCount: 1, + }, nil) exec, err := e.execMgr.Get(nil, 1) e.Require().Nil(err) e.Equal(int64(1), exec.ID) e.Equal(job.SuccessStatus.String(), exec.Status) + e.Equal(int64(1), exec.Metrics.TaskCount) + e.Equal(int64(1), exec.Metrics.SuccessTaskCount) e.execDAO.AssertExpectations(e.T()) } @@ -141,136 +146,20 @@ func (e *executionManagerTestSuite) TestList() { Status: job.SuccessStatus.String(), }, }, nil) + e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{ + TaskCount: 1, + SuccessTaskCount: 1, + }, nil) execs, err := e.execMgr.List(nil, nil) e.Require().Nil(err) e.Require().Len(execs, 1) e.Equal(int64(1), execs[0].ID) e.Equal(job.SuccessStatus.String(), execs[0].Status) + e.Equal(int64(1), execs[0].Metrics.TaskCount) + e.Equal(int64(1), execs[0].Metrics.SuccessTaskCount) e.execDAO.AssertExpectations(e.T()) } -func (e *executionManagerTestSuite) TestPopulateExecutionMetrics() { - e.taskDAO.On("ListStatusCount", mock.Anything, mock.Anything).Return([]*dao.StatusCount{ - { - Status: job.SuccessStatus.String(), - Count: 1, - }, - { - Status: job.ErrorStatus.String(), - Count: 1, - }, - { - Status: job.StoppedStatus.String(), - Count: 1, - }, - { - Status: job.RunningStatus.String(), - Count: 1, - }, - { - Status: job.PendingStatus.String(), - Count: 1, - }, - { - Status: job.ScheduledStatus.String(), - Count: 1, - }, - }, nil) - exec := &Execution{} - e.execMgr.populateExecutionMetrics(nil, exec) - e.Require().NotNil(exec.Metrics) - e.Equal(int64(6), 
exec.Metrics.TaskCount) - e.Equal(int64(1), exec.Metrics.SuccessTaskCount) - e.Equal(int64(1), exec.Metrics.ErrorTaskCount) - e.Equal(int64(1), exec.Metrics.StoppedTaskCount) - e.Equal(int64(1), exec.Metrics.PendingTaskCount) - e.Equal(int64(1), exec.Metrics.RunningTaskCount) - e.Equal(int64(1), exec.Metrics.ScheduledTaskCount) - e.taskDAO.AssertExpectations(e.T()) -} - -func (e *executionManagerTestSuite) TestPopulateExecutionStatus() { - // running - exec := &Execution{} - e.execMgr.populateExecutionStatus(exec) - e.Equal(job.RunningStatus.String(), exec.Status) - - // running - exec = &Execution{ - Metrics: &Metrics{ - SuccessTaskCount: 1, - ErrorTaskCount: 1, - PendingTaskCount: 1, - RunningTaskCount: 1, - ScheduledTaskCount: 1, - StoppedTaskCount: 1, - }, - } - e.execMgr.populateExecutionStatus(exec) - e.Equal(job.RunningStatus.String(), exec.Status) - - // error - exec = &Execution{ - Metrics: &Metrics{ - SuccessTaskCount: 1, - ErrorTaskCount: 1, - PendingTaskCount: 0, - RunningTaskCount: 0, - ScheduledTaskCount: 0, - StoppedTaskCount: 1, - }, - } - e.execMgr.populateExecutionStatus(exec) - e.Equal(job.ErrorStatus.String(), exec.Status) - - // stopped - exec = &Execution{ - Metrics: &Metrics{ - SuccessTaskCount: 1, - ErrorTaskCount: 0, - PendingTaskCount: 0, - RunningTaskCount: 0, - ScheduledTaskCount: 0, - StoppedTaskCount: 1, - }, - } - e.execMgr.populateExecutionStatus(exec) - e.Equal(job.StoppedStatus.String(), exec.Status) - - // success - exec = &Execution{ - Metrics: &Metrics{ - SuccessTaskCount: 1, - ErrorTaskCount: 0, - PendingTaskCount: 0, - RunningTaskCount: 0, - ScheduledTaskCount: 0, - StoppedTaskCount: 0, - }, - } - e.execMgr.populateExecutionStatus(exec) - e.Equal(job.SuccessStatus.String(), exec.Status) -} - -func (e *executionManagerTestSuite) TestPopulateExecutionEndTime() { - // isn't final status - exec := &Execution{ - Status: job.RunningStatus.String(), - } - e.execMgr.populateExecutionEndTime(nil, exec) - e.Equal(time.Time{}, 
exec.EndTime) - - // final status - now := time.Now() - exec = &Execution{ - Status: job.SuccessStatus.String(), - } - e.taskDAO.On("GetMaxEndTime", mock.Anything, mock.Anything).Return(now, nil) - e.execMgr.populateExecutionEndTime(nil, exec) - e.Equal(now, exec.EndTime) - e.taskDAO.AssertExpectations(e.T()) -} - func TestExecutionManagerSuite(t *testing.T) { suite.Run(t, &executionManagerTestSuite{}) } diff --git a/src/pkg/task/hook.go b/src/pkg/task/hook.go index c5aed5bbd..320616a69 100644 --- a/src/pkg/task/hook.go +++ b/src/pkg/task/hook.go @@ -38,12 +38,12 @@ type HookHandler struct { // Handle the job status changing webhook func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusChange) error { + task, err := h.taskDAO.Get(ctx, taskID) + if err != nil { + return err + } // process check in data if len(sc.CheckIn) > 0 { - task, err := h.taskDAO.Get(ctx, taskID) - if err != nil { - return err - } execution, err := h.executionDAO.Get(ctx, task.ExecutionID) if err != nil { return err @@ -58,6 +58,10 @@ func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusCh return processor(ctx, t, sc) } - // update status - return h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision) + // update task status + if err = h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision); err != nil { + return err + } + // update execution status + return h.executionDAO.RefreshStatus(ctx, task.ExecutionID) } diff --git a/src/pkg/task/hook_test.go b/src/pkg/task/hook_test.go index 5a3dd97ca..87d732d9b 100644 --- a/src/pkg/task/hook_test.go +++ b/src/pkg/task/hook_test.go @@ -64,7 +64,12 @@ func (h *hookHandlerTestSuite) TestHandle() { h.SetupTest() // handle status changing + h.taskDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{ + ID: 1, + ExecutionID: 1, + }, nil) h.taskDAO.On("UpdateStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + h.execDAO.On("RefreshStatus", 
mock.Anything, mock.Anything).Return(nil) sc = &job.StatusChange{ Status: job.SuccessStatus.String(), Metadata: &job.StatsInfo{ diff --git a/src/pkg/task/mock_execution_dao_test.go b/src/pkg/task/mock_execution_dao_test.go index c63da61be..7480412cc 100644 --- a/src/pkg/task/mock_execution_dao_test.go +++ b/src/pkg/task/mock_execution_dao_test.go @@ -95,6 +95,29 @@ func (_m *mockExecutionDAO) Get(ctx context.Context, id int64) (*dao.Execution, return r0, r1 } +// GetMetrics provides a mock function with given fields: ctx, id +func (_m *mockExecutionDAO) GetMetrics(ctx context.Context, id int64) (*dao.Metrics, error) { + ret := _m.Called(ctx, id) + + var r0 *dao.Metrics + if rf, ok := ret.Get(0).(func(context.Context, int64) *dao.Metrics); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*dao.Metrics) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // List provides a mock function with given fields: ctx, query func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Execution, error) { ret := _m.Called(ctx, query) @@ -118,6 +141,20 @@ func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Ex return r0, r1 } +// RefreshStatus provides a mock function with given fields: ctx, id +func (_m *mockExecutionDAO) RefreshStatus(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Update provides a mock function with given fields: ctx, execution, props func (_m *mockExecutionDAO) Update(ctx context.Context, execution *dao.Execution, props ...string) error { _va := make([]interface{}, len(props)) diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go index 08f884796..ddc0b8018 100644 --- 
a/src/pkg/task/model.go +++ b/src/pkg/task/model.go @@ -49,8 +49,8 @@ type Execution struct { // 1. After creating the execution, there may be some errors before creating tasks, the // "StatusMessage" can contain the error message // 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case - StatusMessage string `json:"status_message"` - Metrics *Metrics `json:"metrics"` + StatusMessage string `json:"status_message"` + Metrics *dao.Metrics `json:"metrics"` // trigger type: manual/schedule/event Trigger string `json:"trigger"` // the customized attributes for different kinds of consumers @@ -59,17 +59,6 @@ type Execution struct { EndTime time.Time `json:"end_time"` } -// Metrics for tasks -type Metrics struct { - TaskCount int64 `json:"task_count"` - SuccessTaskCount int64 `json:"success_task_count"` - ErrorTaskCount int64 `json:"error_task_count"` - PendingTaskCount int64 `json:"pending_task_count"` - RunningTaskCount int64 `json:"running_task_count"` - ScheduledTaskCount int64 `json:"scheduled_task_count"` - StoppedTaskCount int64 `json:"stopped_task_count"` -} - // Task is the unit for running. It stores the jobservice job records and related information type Task struct { ID int64 `json:"id"` diff --git a/src/server/handler/job_status_hook.go b/src/server/handler/job_status_hook.go index efd66b48c..11b92f115 100644 --- a/src/server/handler/job_status_hook.go +++ b/src/server/handler/job_status_hook.go @@ -56,7 +56,7 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err = j.handler.Handle(r.Context(), taskID, sc); err != nil { // ignore the not found error to avoid the jobservice re-sending the hook if errors.IsNotFoundErr(err) { - log.Warningf("got the status change hook for a non existing task %d", taskID) + log.Warningf("task %d does not exist, ignore the not found error to avoid subsequent retrying webhooks from jobservice", taskID) return } libhttp.SendError(w, err)