Mirror of https://github.com/goharbor/harbor.git

Merge pull request #12618 from ywk253100/200729_tk_mgr (commit d4f18139ef)

Refresh the status of the execution whenever the status of one of its tasks changes.
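
Before the hunks below, here is a minimal, self-contained sketch of the flow this change introduces, using hypothetical stand-in interfaces rather than Harbor's real types: when the jobservice webhook reports a task status change, the handler updates the task row first and then asks the execution DAO to recalculate the status of the owning execution.

package sketch

import "context"

// TaskStore and ExecutionStore are hypothetical stand-ins for the taskDAO and
// executionDAO interfaces touched by the diff below.
type TaskStore interface {
    // ExecutionIDOf returns the ID of the execution that owns the task.
    ExecutionIDOf(ctx context.Context, taskID int64) (int64, error)
    // UpdateStatus persists the status reported by the jobservice.
    UpdateStatus(ctx context.Context, taskID int64, status string, revision int64) error
}

type ExecutionStore interface {
    // RefreshStatus recalculates the execution status from its tasks.
    RefreshStatus(ctx context.Context, executionID int64) error
}

// onTaskStatusChange mirrors the shape of HookHandler.Handle after this change:
// look up the owning execution, update the task status, then refresh the execution.
func onTaskStatusChange(ctx context.Context, tasks TaskStore, execs ExecutionStore,
    taskID int64, status string, revision int64) error {
    execID, err := tasks.ExecutionIDOf(ctx, taskID)
    if err != nil {
        return err
    }
    if err := tasks.UpdateStatus(ctx, taskID, status, revision); err != nil {
        return err
    }
    return execs.RefreshStatus(ctx, execID)
}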

@@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS execution (
    extra_attrs JSON,
    start_time timestamp DEFAULT CURRENT_TIMESTAMP,
    end_time timestamp,
    revision int,
    PRIMARY KEY (id)
);

@@ -16,7 +16,10 @@ package dao

import (
    "context"
    "fmt"
    "github.com/goharbor/harbor/src/lib/log"

    "github.com/goharbor/harbor/src/jobservice/job"
    "github.com/goharbor/harbor/src/lib/errors"
    "github.com/goharbor/harbor/src/lib/orm"
    "github.com/goharbor/harbor/src/lib/q"

@@ -36,14 +39,23 @@ type ExecutionDAO interface {
    Update(ctx context.Context, execution *Execution, props ...string) (err error)
    // Delete the specified execution
    Delete(ctx context.Context, id int64) (err error)
    // GetMetrics returns the task metrics for the specified execution
    GetMetrics(ctx context.Context, id int64) (metrics *Metrics, err error)
    // RefreshStatus refreshes the status of the specified execution according to its tasks. If its status
    // is final, update the end time as well
    RefreshStatus(ctx context.Context, id int64) (err error)
}

// NewExecutionDAO returns an instance of ExecutionDAO
func NewExecutionDAO() ExecutionDAO {
    return &executionDAO{}
    return &executionDAO{
        taskDAO: NewTaskDAO(),
    }
}

type executionDAO struct{}
type executionDAO struct {
    taskDAO TaskDAO
}

func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
    if query != nil {
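
With the constructor now wiring a TaskDAO into the DAO, a caller only needs an ORM-backed context to recalculate an execution. A hedged usage sketch follows; the wrapper function and the source of executionID are hypothetical and not part of the diff.

// syncExecutionStatus shows the intended call pattern for the new method.
func syncExecutionStatus(ctx context.Context, executionID int64) error {
    d := NewExecutionDAO() // internally holds a TaskDAO after this change
    // RefreshStatus derives the execution status from its tasks and, when that
    // status is final, also updates the end time.
    return d.RefreshStatus(ctx, executionID)
}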

@@ -132,3 +144,151 @@ func (e *executionDAO) Delete(ctx context.Context, id int64) error {
    }
    return nil
}

func (e *executionDAO) GetMetrics(ctx context.Context, id int64) (*Metrics, error) {
    scs, err := e.taskDAO.ListStatusCount(ctx, id)
    if err != nil {
        return nil, err
    }
    metrics := &Metrics{}
    if len(scs) == 0 {
        return metrics, nil
    }

    for _, sc := range scs {
        switch sc.Status {
        case job.SuccessStatus.String():
            metrics.SuccessTaskCount = sc.Count
        case job.ErrorStatus.String():
            metrics.ErrorTaskCount = sc.Count
        case job.PendingStatus.String():
            metrics.PendingTaskCount = sc.Count
        case job.RunningStatus.String():
            metrics.RunningTaskCount = sc.Count
        case job.ScheduledStatus.String():
            metrics.ScheduledTaskCount = sc.Count
        case job.StoppedStatus.String():
            metrics.StoppedTaskCount = sc.Count
        default:
            log.Errorf("unknown task status: %s", sc.Status)
        }
    }
    metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount +
        metrics.PendingTaskCount + metrics.RunningTaskCount +
        metrics.ScheduledTaskCount + metrics.StoppedTaskCount
    return metrics, nil
}

func (e *executionDAO) RefreshStatus(ctx context.Context, id int64) error {
    // as the status of the execution can be refreshed by multiple operators concurrently,
    // use optimistic locking to avoid conflicts and retry at most 5 times
    for i := 0; i < 5; i++ {
        retry, err := e.refreshStatus(ctx, id)
        if err != nil {
            return err
        }
        if !retry {
            return nil
        }
    }
    return fmt.Errorf("failed to refresh the status of the execution %d after %d retries", id, 5)
}

func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, error) {
    execution, err := e.Get(ctx, id)
    if err != nil {
        return false, err
    }
    metrics, err := e.GetMetrics(ctx, id)
    if err != nil {
        return false, err
    }
    // no task, return directly
    if metrics.TaskCount == 0 {
        return false, nil
    }

    var status string
    if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 {
        status = job.RunningStatus.String()
    } else if metrics.ErrorTaskCount > 0 {
        status = job.ErrorStatus.String()
    } else if metrics.StoppedTaskCount > 0 {
        status = job.StoppedStatus.String()
    } else if metrics.SuccessTaskCount > 0 {
        status = job.SuccessStatus.String()
    }

    ormer, err := orm.FromContext(ctx)
    if err != nil {
        return false, err
    }
    sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?`
    result, err := ormer.Raw(sql, status, id, execution.Revision).Exec()
    if err != nil {
        return false, err
    }
    n, err := result.RowsAffected()
    if err != nil {
        return false, err
    }
    // if the count of affected rows is 0, the execution is being updated by others; retry
    if n == 0 {
        return true, nil
    }

    /* this is another possible solution to the concurrency issue of refreshing the execution status:
    // set a score for each status:
    // pending, running, scheduled - 4
    // error - 3
    // stopped - 2
    // success - 1
    // and set the status of the record with the highest score as the status of the execution
    sql := `with status_score as (
                select status,
                    case
                        when status='%s' or status='%s' or status='%s' then 4
                        when status='%s' then 3
                        when status='%s' then 2
                        when status='%s' then 1
                        else 0
                    end as score
                from task
                where execution_id=?
                group by status
            )
            update execution
            set status=(
                select
                    case
                        when max(score)=4 then '%s'
                        when max(score)=3 then '%s'
                        when max(score)=2 then '%s'
                        when max(score)=1 then '%s'
                        when max(score)=0 then ''
                    end as status
                from status_score)
            where id = ?`
    sql = fmt.Sprintf(sql, job.PendingStatus.String(), job.RunningStatus.String(), job.ScheduledStatus.String(),
        job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String(),
        job.RunningStatus.String(), job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String())
    if _, err = ormer.Raw(sql, id, id).Exec(); err != nil {
        return err
    }
    */

    // update the end time if the status is final; otherwise set the end time to NULL, which is useful
    // for retrying jobs
    sql = `update execution
            set end_time = (
                case
                    when status='%s' or status='%s' or status='%s' then (
                        select max(end_time)
                        from task
                        where execution_id=?)
                    else NULL
                end)
            where id=?`
    sql = fmt.Sprintf(sql, job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String())
    _, err = ormer.Raw(sql, id, id).Exec()
    return false, err
}
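
The status derivation in refreshStatus above follows a fixed precedence: any pending, running, or scheduled task keeps the execution running; otherwise error beats stopped, which beats success. A standalone restatement of that rule, where the plain string literals stand in for the job.XxxStatus.String() values and the helper itself is not part of the diff:

// deriveStatus is a hypothetical helper restating the precedence used by refreshStatus.
func deriveStatus(m *Metrics) string {
    switch {
    case m.PendingTaskCount > 0 || m.RunningTaskCount > 0 || m.ScheduledTaskCount > 0:
        return "Running"
    case m.ErrorTaskCount > 0:
        return "Error"
    case m.StoppedTaskCount > 0:
        return "Stopped"
    case m.SuccessTaskCount > 0:
        return "Success"
    default:
        return "" // no tasks: refreshStatus returns early before reaching this point
    }
}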

@@ -17,8 +17,10 @@ package dao
import (
    "context"
    "testing"
    "time"

    "github.com/goharbor/harbor/src/common/dao"
    "github.com/goharbor/harbor/src/jobservice/job"
    "github.com/goharbor/harbor/src/lib/errors"
    "github.com/goharbor/harbor/src/lib/orm"
    "github.com/goharbor/harbor/src/lib/q"

@@ -29,13 +31,17 @@ type executionDAOTestSuite struct {
    suite.Suite
    ctx          context.Context
    executionDAO *executionDAO
    taskDao      *taskDAO
    executionID  int64
}

func (e *executionDAOTestSuite) SetupSuite() {
    dao.PrepareTestForPostgresSQL()
    e.ctx = orm.Context()
    e.executionDAO = &executionDAO{}
    e.taskDao = &taskDAO{}
    e.executionDAO = &executionDAO{
        taskDAO: e.taskDao,
    }
}

func (e *executionDAOTestSuite) SetupTest() {

@@ -116,6 +122,163 @@ func (e *executionDAOTestSuite) TestDelete() {
    // happy path is covered by TearDownTest
}

func (e *executionDAOTestSuite) TestGetMetrics() {
    taskID01, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.SuccessStatus.String(),
        StatusCode:  job.SuccessStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID01)

    taskID02, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.StoppedStatus.String(),
        StatusCode:  job.StoppedStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID02)

    taskID03, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.ErrorStatus.String(),
        StatusCode:  job.ErrorStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID03)

    taskID04, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.PendingStatus.String(),
        StatusCode:  job.PendingStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID04)

    taskID05, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.RunningStatus.String(),
        StatusCode:  job.RunningStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID05)

    taskID06, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.ScheduledStatus.String(),
        StatusCode:  job.ScheduledStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID06)

    metrics, err := e.executionDAO.GetMetrics(e.ctx, e.executionID)
    e.Require().Nil(err)
    e.Equal(int64(6), metrics.TaskCount)
    e.Equal(int64(1), metrics.SuccessTaskCount)
    e.Equal(int64(1), metrics.StoppedTaskCount)
    e.Equal(int64(1), metrics.ErrorTaskCount)
    e.Equal(int64(1), metrics.PendingTaskCount)
    e.Equal(int64(1), metrics.RunningTaskCount)
    e.Equal(int64(1), metrics.ScheduledTaskCount)
}

func (e *executionDAOTestSuite) TestRefreshStatus() {
    // contains tasks with status: success
    taskID01, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.SuccessStatus.String(),
        StatusCode:  job.SuccessStatus.Code(),
        ExtraAttrs:  "{}",
        EndTime:     time.Now(),
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID01)

    err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
    e.Require().Nil(err)
    execution, err := e.executionDAO.Get(e.ctx, e.executionID)
    e.Require().Nil(err)
    e.Equal(job.SuccessStatus.String(), execution.Status)
    e.NotEmpty(execution.EndTime)

    // contains tasks with status: stopped
    taskID02, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.StoppedStatus.String(),
        StatusCode:  job.StoppedStatus.Code(),
        ExtraAttrs:  "{}",
        EndTime:     time.Now(),
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID02)

    err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
    e.Require().Nil(err)
    execution, err = e.executionDAO.Get(e.ctx, e.executionID)
    e.Require().Nil(err)
    e.Equal(job.StoppedStatus.String(), execution.Status)
    e.NotEmpty(execution.EndTime)

    // contains tasks with status: error
    taskID03, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.ErrorStatus.String(),
        StatusCode:  job.ErrorStatus.Code(),
        ExtraAttrs:  "{}",
        EndTime:     time.Now(),
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID03)

    err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
    e.Require().Nil(err)
    execution, err = e.executionDAO.Get(e.ctx, e.executionID)
    e.Require().Nil(err)
    e.Equal(job.ErrorStatus.String(), execution.Status)
    e.NotEmpty(execution.EndTime)

    // contains tasks with status: pending, running, scheduled
    taskID04, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.PendingStatus.String(),
        StatusCode:  job.PendingStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID04)

    taskID05, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.RunningStatus.String(),
        StatusCode:  job.RunningStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID05)

    taskID06, err := e.taskDao.Create(e.ctx, &Task{
        ExecutionID: e.executionID,
        Status:      job.ScheduledStatus.String(),
        StatusCode:  job.ScheduledStatus.Code(),
        ExtraAttrs:  "{}",
    })
    e.Require().Nil(err)
    defer e.taskDao.Delete(e.ctx, taskID06)

    err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
    e.Require().Nil(err)
    execution, err = e.executionDAO.Get(e.ctx, e.executionID)
    e.Require().Nil(err)
    e.Equal(job.RunningStatus.String(), execution.Status)
    e.Empty(execution.EndTime)
}

func TestExecutionDAOSuite(t *testing.T) {
    suite.Run(t, &executionDAOTestSuite{})
}

@@ -39,6 +39,18 @@ type Execution struct {
    ExtraAttrs string    `orm:"column(extra_attrs)"` // json string
    StartTime  time.Time `orm:"column(start_time)"`
    EndTime    time.Time `orm:"column(end_time)"`
    Revision   int64     `orm:"column(revision)"`
}

// Metrics is the task metrics for one execution
type Metrics struct {
    TaskCount          int64 `json:"task_count"`
    SuccessTaskCount   int64 `json:"success_task_count"`
    ErrorTaskCount     int64 `json:"error_task_count"`
    PendingTaskCount   int64 `json:"pending_task_count"`
    RunningTaskCount   int64 `json:"running_task_count"`
    ScheduledTaskCount int64 `json:"scheduled_task_count"`
    StoppedTaskCount   int64 `json:"stopped_task_count"`
}

// Task database model

@@ -203,88 +203,13 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
        }
    }

    // if the status isn't null which means the status is set manually, return directly
    if len(exec.Status) > 0 {
        return exec
    }

    // populate task metrics
    e.populateExecutionMetrics(ctx, exec)
    // populate status
    e.populateExecutionStatus(exec)
    // populate the end time
    e.populateExecutionEndTime(ctx, exec)
    metrics, err := e.executionDAO.GetMetrics(ctx, execution.ID)
    if err != nil {
        log.Errorf("failed to get metrics of the execution %d: %v", execution.ID, err)
    } else {
        exec.Metrics = metrics
    }

    return exec
}

func (e *executionManager) populateExecutionMetrics(ctx context.Context, execution *Execution) {
    scs, err := e.taskDAO.ListStatusCount(ctx, execution.ID)
    if err != nil {
        log.Errorf("failed to list status count of execution %d: %v", execution.ID, err)
        return
    }
    if len(scs) == 0 {
        return
    }

    metrics := &Metrics{}
    for _, sc := range scs {
        switch sc.Status {
        case job.SuccessStatus.String():
            metrics.SuccessTaskCount = sc.Count
        case job.ErrorStatus.String():
            metrics.ErrorTaskCount = sc.Count
        case job.PendingStatus.String():
            metrics.PendingTaskCount = sc.Count
        case job.RunningStatus.String():
            metrics.RunningTaskCount = sc.Count
        case job.ScheduledStatus.String():
            metrics.ScheduledTaskCount = sc.Count
        case job.StoppedStatus.String():
            metrics.StoppedTaskCount = sc.Count
        default:
            log.Errorf("unknown task status: %s", sc.Status)
        }
    }
    metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount +
        metrics.PendingTaskCount + metrics.RunningTaskCount +
        metrics.ScheduledTaskCount + metrics.StoppedTaskCount
    execution.Metrics = metrics
}

func (e *executionManager) populateExecutionStatus(execution *Execution) {
    metrics := execution.Metrics
    if metrics == nil {
        execution.Status = job.RunningStatus.String()
        return
    }
    if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 {
        execution.Status = job.RunningStatus.String()
        return
    }
    if metrics.ErrorTaskCount > 0 {
        execution.Status = job.ErrorStatus.String()
        return
    }
    if metrics.StoppedTaskCount > 0 {
        execution.Status = job.StoppedStatus.String()
        return
    }
    if metrics.SuccessTaskCount > 0 {
        execution.Status = job.SuccessStatus.String()
        return
    }
}

func (e *executionManager) populateExecutionEndTime(ctx context.Context, execution *Execution) {
    if !job.Status(execution.Status).Final() {
        return
    }
    endTime, err := e.taskDAO.GetMaxEndTime(ctx, execution.ID)
    if err != nil {
        log.Errorf("failed to get the max end time of the execution %d: %v", execution.ID, err)
        return
    }
    execution.EndTime = endTime
}
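
With the populate helpers above removed, the manager no longer derives metrics, status, or end time itself; the execution row already carries a fresh status and end time (maintained by RefreshStatus), and the metrics come from one GetMetrics call. A hypothetical condensation of the resulting logic, not the verbatim source:

// populateFromRow sketches what populateExecution reduces to after this change;
// the real method also copies the remaining fields from the dao.Execution row.
func populateFromRow(ctx context.Context, execDAO dao.ExecutionDAO, row *dao.Execution) *Execution {
    exec := &Execution{ID: row.ID, Status: row.Status, EndTime: row.EndTime}
    if metrics, err := execDAO.GetMetrics(ctx, row.ID); err != nil {
        log.Errorf("failed to get metrics of the execution %d: %v", row.ID, err)
    } else {
        exec.Metrics = metrics
    }
    return exec
}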

@@ -16,7 +16,6 @@ package task

import (
    "testing"
    "time"

    "github.com/goharbor/harbor/src/jobservice/job"
    "github.com/goharbor/harbor/src/lib/errors"

@@ -127,10 +126,16 @@ func (e *executionManagerTestSuite) TestGet() {
        ID:     1,
        Status: job.SuccessStatus.String(),
    }, nil)
    e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{
        TaskCount:        1,
        SuccessTaskCount: 1,
    }, nil)
    exec, err := e.execMgr.Get(nil, 1)
    e.Require().Nil(err)
    e.Equal(int64(1), exec.ID)
    e.Equal(job.SuccessStatus.String(), exec.Status)
    e.Equal(int64(1), exec.Metrics.TaskCount)
    e.Equal(int64(1), exec.Metrics.SuccessTaskCount)
    e.execDAO.AssertExpectations(e.T())
}

@@ -141,136 +146,20 @@ func (e *executionManagerTestSuite) TestList() {
            Status: job.SuccessStatus.String(),
        },
    }, nil)
    e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{
        TaskCount:        1,
        SuccessTaskCount: 1,
    }, nil)
    execs, err := e.execMgr.List(nil, nil)
    e.Require().Nil(err)
    e.Require().Len(execs, 1)
    e.Equal(int64(1), execs[0].ID)
    e.Equal(job.SuccessStatus.String(), execs[0].Status)
    e.Equal(int64(1), execs[0].Metrics.TaskCount)
    e.Equal(int64(1), execs[0].Metrics.SuccessTaskCount)
    e.execDAO.AssertExpectations(e.T())
}

func (e *executionManagerTestSuite) TestPopulateExecutionMetrics() {
    e.taskDAO.On("ListStatusCount", mock.Anything, mock.Anything).Return([]*dao.StatusCount{
        {
            Status: job.SuccessStatus.String(),
            Count:  1,
        },
        {
            Status: job.ErrorStatus.String(),
            Count:  1,
        },
        {
            Status: job.StoppedStatus.String(),
            Count:  1,
        },
        {
            Status: job.RunningStatus.String(),
            Count:  1,
        },
        {
            Status: job.PendingStatus.String(),
            Count:  1,
        },
        {
            Status: job.ScheduledStatus.String(),
            Count:  1,
        },
    }, nil)
    exec := &Execution{}
    e.execMgr.populateExecutionMetrics(nil, exec)
    e.Require().NotNil(exec.Metrics)
    e.Equal(int64(6), exec.Metrics.TaskCount)
    e.Equal(int64(1), exec.Metrics.SuccessTaskCount)
    e.Equal(int64(1), exec.Metrics.ErrorTaskCount)
    e.Equal(int64(1), exec.Metrics.StoppedTaskCount)
    e.Equal(int64(1), exec.Metrics.PendingTaskCount)
    e.Equal(int64(1), exec.Metrics.RunningTaskCount)
    e.Equal(int64(1), exec.Metrics.ScheduledTaskCount)
    e.taskDAO.AssertExpectations(e.T())
}

func (e *executionManagerTestSuite) TestPopulateExecutionStatus() {
    // running
    exec := &Execution{}
    e.execMgr.populateExecutionStatus(exec)
    e.Equal(job.RunningStatus.String(), exec.Status)

    // running
    exec = &Execution{
        Metrics: &Metrics{
            SuccessTaskCount:   1,
            ErrorTaskCount:     1,
            PendingTaskCount:   1,
            RunningTaskCount:   1,
            ScheduledTaskCount: 1,
            StoppedTaskCount:   1,
        },
    }
    e.execMgr.populateExecutionStatus(exec)
    e.Equal(job.RunningStatus.String(), exec.Status)

    // error
    exec = &Execution{
        Metrics: &Metrics{
            SuccessTaskCount:   1,
            ErrorTaskCount:     1,
            PendingTaskCount:   0,
            RunningTaskCount:   0,
            ScheduledTaskCount: 0,
            StoppedTaskCount:   1,
        },
    }
    e.execMgr.populateExecutionStatus(exec)
    e.Equal(job.ErrorStatus.String(), exec.Status)

    // stopped
    exec = &Execution{
        Metrics: &Metrics{
            SuccessTaskCount:   1,
            ErrorTaskCount:     0,
            PendingTaskCount:   0,
            RunningTaskCount:   0,
            ScheduledTaskCount: 0,
            StoppedTaskCount:   1,
        },
    }
    e.execMgr.populateExecutionStatus(exec)
    e.Equal(job.StoppedStatus.String(), exec.Status)

    // success
    exec = &Execution{
        Metrics: &Metrics{
            SuccessTaskCount:   1,
            ErrorTaskCount:     0,
            PendingTaskCount:   0,
            RunningTaskCount:   0,
            ScheduledTaskCount: 0,
            StoppedTaskCount:   0,
        },
    }
    e.execMgr.populateExecutionStatus(exec)
    e.Equal(job.SuccessStatus.String(), exec.Status)
}

func (e *executionManagerTestSuite) TestPopulateExecutionEndTime() {
    // isn't final status
    exec := &Execution{
        Status: job.RunningStatus.String(),
    }
    e.execMgr.populateExecutionEndTime(nil, exec)
    e.Equal(time.Time{}, exec.EndTime)

    // final status
    now := time.Now()
    exec = &Execution{
        Status: job.SuccessStatus.String(),
    }
    e.taskDAO.On("GetMaxEndTime", mock.Anything, mock.Anything).Return(now, nil)
    e.execMgr.populateExecutionEndTime(nil, exec)
    e.Equal(now, exec.EndTime)
    e.taskDAO.AssertExpectations(e.T())
}

func TestExecutionManagerSuite(t *testing.T) {
    suite.Run(t, &executionManagerTestSuite{})
}

@@ -38,12 +38,12 @@ type HookHandler struct {

// Handle the job status changing webhook
func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusChange) error {
    task, err := h.taskDAO.Get(ctx, taskID)
    if err != nil {
        return err
    }
    // process check in data
    if len(sc.CheckIn) > 0 {
        task, err := h.taskDAO.Get(ctx, taskID)
        if err != nil {
            return err
        }
        execution, err := h.executionDAO.Get(ctx, task.ExecutionID)
        if err != nil {
            return err

@@ -58,6 +58,10 @@ func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusCh
        return processor(ctx, t, sc)
    }

    // update status
    return h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision)
    // update task status
    if err = h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision); err != nil {
        return err
    }
    // update execution status
    return h.executionDAO.RefreshStatus(ctx, task.ExecutionID)
}

@@ -64,7 +64,12 @@ func (h *hookHandlerTestSuite) TestHandle() {
    h.SetupTest()

    // handle status changing
    h.taskDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
        ID:          1,
        ExecutionID: 1,
    }, nil)
    h.taskDAO.On("UpdateStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
    h.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(nil)
    sc = &job.StatusChange{
        Status: job.SuccessStatus.String(),
        Metadata: &job.StatsInfo{

@@ -95,6 +95,29 @@ func (_m *mockExecutionDAO) Get(ctx context.Context, id int64) (*dao.Execution,
    return r0, r1
}

// GetMetrics provides a mock function with given fields: ctx, id
func (_m *mockExecutionDAO) GetMetrics(ctx context.Context, id int64) (*dao.Metrics, error) {
    ret := _m.Called(ctx, id)

    var r0 *dao.Metrics
    if rf, ok := ret.Get(0).(func(context.Context, int64) *dao.Metrics); ok {
        r0 = rf(ctx, id)
    } else {
        if ret.Get(0) != nil {
            r0 = ret.Get(0).(*dao.Metrics)
        }
    }

    var r1 error
    if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
        r1 = rf(ctx, id)
    } else {
        r1 = ret.Error(1)
    }

    return r0, r1
}

// List provides a mock function with given fields: ctx, query
func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Execution, error) {
    ret := _m.Called(ctx, query)

@@ -118,6 +141,20 @@ func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Ex
    return r0, r1
}

// RefreshStatus provides a mock function with given fields: ctx, id
func (_m *mockExecutionDAO) RefreshStatus(ctx context.Context, id int64) error {
    ret := _m.Called(ctx, id)

    var r0 error
    if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
        r0 = rf(ctx, id)
    } else {
        r0 = ret.Error(0)
    }

    return r0
}

// Update provides a mock function with given fields: ctx, execution, props
func (_m *mockExecutionDAO) Update(ctx context.Context, execution *dao.Execution, props ...string) error {
    _va := make([]interface{}, len(props))

@@ -49,8 +49,8 @@ type Execution struct {
    // 1. After creating the execution, there may be some errors before creating tasks, the
    // "StatusMessage" can contain the error message
    // 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case
    StatusMessage string   `json:"status_message"`
    Metrics       *Metrics `json:"metrics"`
    StatusMessage string       `json:"status_message"`
    Metrics       *dao.Metrics `json:"metrics"`
    // trigger type: manual/schedule/event
    Trigger string `json:"trigger"`
    // the customized attributes for different kinds of consumers

@@ -59,17 +59,6 @@ type Execution struct {
    EndTime time.Time `json:"end_time"`
}

// Metrics for tasks
type Metrics struct {
    TaskCount          int64 `json:"task_count"`
    SuccessTaskCount   int64 `json:"success_task_count"`
    ErrorTaskCount     int64 `json:"error_task_count"`
    PendingTaskCount   int64 `json:"pending_task_count"`
    RunningTaskCount   int64 `json:"running_task_count"`
    ScheduledTaskCount int64 `json:"scheduled_task_count"`
    StoppedTaskCount   int64 `json:"stopped_task_count"`
}

// Task is the unit for running. It stores the jobservice job records and related information
type Task struct {
    ID int64 `json:"id"`

@@ -56,7 +56,7 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if err = j.handler.Handle(r.Context(), taskID, sc); err != nil {
        // ignore the not found error to avoid the jobservice re-sending the hook
        if errors.IsNotFoundErr(err) {
            log.Warningf("got the status change hook for a non existing task %d", taskID)
            log.Warningf("task %d does not exist, ignore the not found error to avoid subsequent retrying webhooks from jobservice", taskID)
            return
        }
        libhttp.SendError(w, err)