Merge pull request #7154 from mmpei/replication_ng_execution_upgrade

Execution updateStatus logic upgrade
This commit is contained in:
Wenkai Yin 2019-03-19 17:14:40 +08:00 committed by GitHub
commit af692dd579
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 215 additions and 182 deletions

View File

@ -51,6 +51,12 @@ func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error)
qs = qs.OrderBy("-StartTime")
_, err := qs.All(&executions)
if err != nil || len(executions) == 0 {
return executions, err
}
for _, e := range executions {
fillExecution(e)
}
return executions, err
}
@ -81,9 +87,113 @@ func GetExecution(id int64) (*models.Execution, error) {
if err == orm.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
fillExecution(&t)
return &t, err
}
// fillExecution will fill the statistics data and status by tasks data
func fillExecution(execution *models.Execution) error {
if executionFinished(execution.Status) {
return nil
}
o := dao.GetOrmer()
sql := `select status, count(*) as c from replication_task where execution_id = ? group by status`
queryParam := make([]interface{}, 1)
queryParam = append(queryParam, execution.ID)
dt := []*models.TaskStat{}
count, err := o.Raw(sql, queryParam).QueryRows(&dt)
if err != nil {
log.Errorf("Query tasks error execution %d: %v", execution.ID, err)
return err
}
if count == 0 {
return nil
}
total := 0
for _, d := range dt {
status, _ := getStatus(d.Status)
updateStatusCount(execution, status, d.C)
total += d.C
}
if execution.Total != total {
log.Errorf("execution task count inconsistent and fixed, executionID=%d, execution.total=%d, tasks.count=%d",
execution.ID, execution.Total, total)
execution.Total = total
}
resetExecutionStatus(execution)
// if execution status changed to a final status, store to DB
if executionFinished(execution.Status) {
UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress,
models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped,
models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total)
}
return nil
}
func getStatus(status string) (string, error) {
switch status {
case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress:
return models.ExecutionStatusInProgress, nil
case models.TaskStatusSucceed:
return models.ExecutionStatusSucceed, nil
case models.TaskStatusStopped:
return models.ExecutionStatusStopped, nil
case models.TaskStatusFailed:
return models.ExecutionStatusFailed, nil
}
return "", fmt.Errorf("Not support task status ")
}
func updateStatusCount(execution *models.Execution, status string, delta int) error {
switch status {
case models.ExecutionStatusInProgress:
execution.InProgress += delta
case models.ExecutionStatusSucceed:
execution.Succeed += delta
case models.ExecutionStatusStopped:
execution.Stopped += delta
case models.ExecutionStatusFailed:
execution.Failed += delta
}
return nil
}
func resetExecutionStatus(execution *models.Execution) error {
execution.Status = generateStatus(execution)
if executionFinished(execution.Status) {
execution.EndTime = time.Now()
}
return nil
}
func generateStatus(execution *models.Execution) string {
if execution.InProgress > 0 {
return models.ExecutionStatusInProgress
} else if execution.Failed > 0 {
return models.ExecutionStatusFailed
} else if execution.Stopped > 0 {
return models.ExecutionStatusStopped
}
return models.ExecutionStatusSucceed
}
func executionFinished(status string) bool {
if status == models.ExecutionStatusInProgress {
return false
}
return true
}
// DeleteExecution ...
func DeleteExecution(id int64) error {
o := dao.GetOrmer()
@ -110,19 +220,10 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error
// AddTask ...
func AddTask(task *models.Task) (int64, error) {
o := dao.GetOrmer()
sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status)
values (?, ?, ?, ?, ?, ?) RETURNING id`
now := time.Now()
task.StartTime = now
args := []interface{}{}
args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status)
var taskID int64
err := o.Raw(sql, args).QueryRow(&taskID)
if err != nil {
return 0, err
}
return taskID, nil
return o.Insert(task)
}
// GetTask ...
@ -210,40 +311,11 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) {
// UpdateTaskStatus ...
func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) {
// can not use the globalOrm
o := orm.NewOrm()
o.Begin()
// query the task status
var task models.Task
sql := `select * from replication_task where id = ?`
if err := o.Raw(sql, id).QueryRow(&task); err != nil {
if err == orm.ErrNoRows {
o.Rollback()
return 0, err
}
}
// check status
satisfy := false
if len(statusCondition) == 0 {
satisfy = true
} else {
for _, stCondition := range statusCondition {
if task.Status == stCondition {
satisfy = true
break
}
}
}
if !satisfy {
o.Rollback()
return 0, fmt.Errorf("Status condition not match ")
}
o := dao.GetOrmer()
// update status
params := []interface{}{}
sql = `update replication_task set status = ?`
sql := `update replication_task set status = ?`
params = append(params, status)
if taskFinished(status) { // should update endTime
sql += ` ,end_time = ?`
@ -251,49 +323,26 @@ func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64
}
sql += ` where id = ?`
params = append(params, id)
_, err := o.Raw(sql, params).Exec()
log.Infof("Update task %d: %s -> %s", id, task.Status, status)
if err != nil {
log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status)
o.Rollback()
return 0, err
}
// query the execution
var execution models.Execution
sql = `select * from replication_execution where id = ?`
if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil {
if err == orm.ErrNoRows {
log.Errorf("Execution not found id: %d", task.ExecutionID)
o.Rollback()
return 0, err
if len(statusCondition) > 0 {
sql += ` and status in (`
for _, stCondition := range statusCondition {
sql += ` ?,`
params = append(params, stCondition)
}
}
// check execution data
execuStatus, _ := getStatus(task.Status)
count := getStatusCount(&execution, execuStatus)
if count <= 0 {
log.Errorf("Task statistics in execution inconsistent")
o.Commit()
return 1, nil
sql = sql[0 : len(sql)-1]
sql += `)`
}
// update execution data
updateStatusCount(&execution, execuStatus, -1)
execuStatusUp, _ := getStatus(status)
updateStatusCount(&execution, execuStatusUp, 1)
resetExecutionStatus(&execution)
_, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress,
models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped,
models.ExecutionPropsName.EndTime)
log.Infof("Update task %d: -> %s", id, status)
res, err := o.Raw(sql, params).Exec()
if err != nil {
log.Errorf("Update execution status failed %d: %v", execution.ID, err)
o.Rollback()
return 0, err
}
o.Commit()
return 1, nil
count, err := res.RowsAffected()
if err != nil {
return 0, err
}
return count, nil
}
func taskFinished(status string) bool {
@ -302,69 +351,3 @@ func taskFinished(status string) bool {
}
return false
}
func getStatus(status string) (string, error) {
switch status {
case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress:
return models.ExecutionStatusInProgress, nil
case models.TaskStatusSucceed:
return models.ExecutionStatusSucceed, nil
case models.TaskStatusStopped:
return models.ExecutionStatusStopped, nil
case models.TaskStatusFailed:
return models.ExecutionStatusFailed, nil
}
return "", fmt.Errorf("Not support task status ")
}
func getStatusCount(execution *models.Execution, status string) int {
switch status {
case models.ExecutionStatusInProgress:
return execution.InProgress
case models.ExecutionStatusSucceed:
return execution.Succeed
case models.ExecutionStatusStopped:
return execution.Stopped
case models.ExecutionStatusFailed:
return execution.Failed
}
return 0
}
func updateStatusCount(execution *models.Execution, status string, delta int) error {
switch status {
case models.ExecutionStatusInProgress:
execution.InProgress += delta
case models.ExecutionStatusSucceed:
execution.Succeed += delta
case models.ExecutionStatusStopped:
execution.Stopped += delta
case models.ExecutionStatusFailed:
execution.Failed += delta
}
return nil
}
func resetExecutionStatus(execution *models.Execution) error {
status := generateStatus(execution)
if status != execution.Status {
execution.Status = status
log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status)
}
if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 {
// execution finished in this time
execution.EndTime = time.Now()
}
return nil
}
func generateStatus(execution *models.Execution) string {
if execution.InProgress > 0 {
return models.ExecutionStatusInProgress
} else if execution.Failed > 0 {
return models.ExecutionStatusFailed
} else if execution.Stopped > 0 {
return models.ExecutionStatusStopped
}
return models.ExecutionStatusSucceed
}

View File

@ -148,6 +148,13 @@ func TestMethodOfTask(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, int64(1), n)
// test update status
n, err = UpdateTaskStatus(id1, "Succeed")
require.Nil(t, err)
assert.Equal(t, int64(1), n)
task, _ = GetTask(id1)
assert.Equal(t, "Succeed", task.Status)
// test delete
require.Nil(t, DeleteTask(id1))
task, err = GetTask(id1)
@ -162,16 +169,12 @@ func TestMethodOfTask(t *testing.T) {
assert.Equal(t, int64(0), n)
}
func TestUpdateJobStatus(t *testing.T) {
func TestExecutionFill(t *testing.T) {
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
StatusText: "None",
Total: 12,
Failed: 0,
Succeed: 10,
InProgress: 1,
Stopped: 1,
Total: 2,
Trigger: "Event",
StartTime: time.Now(),
}
@ -183,7 +186,56 @@ func TestUpdateJobStatus(t *testing.T) {
SrcResource: "srcResource1",
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Pending",
Status: "Succeed",
StartTime: time.Now(),
}
task2 := &models.Task{
ID: 20192,
ExecutionID: executionID,
ResourceType: "resourceType2",
SrcResource: "srcResource2",
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: time.Now(),
}
AddTask(task1)
AddTask(task2)
defer func() {
DeleteAllTasks(executionID)
DeleteAllExecutions(11209)
}()
// query and fill
exe, err := GetExecution(executionID)
require.Nil(t, err)
assert.Equal(t, "Stopped", exe.Status)
assert.Equal(t, 0, exe.InProgress)
assert.Equal(t, 1, exe.Stopped)
assert.Equal(t, 0, exe.Failed)
assert.Equal(t, 1, exe.Succeed)
}
func TestExecutionFill2(t *testing.T) {
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
StatusText: "None",
Total: 2,
Trigger: "Event",
StartTime: time.Now(),
}
executionID, _ := AddExecution(execution)
task1 := &models.Task{
ID: 20191,
ExecutionID: executionID,
ResourceType: "resourceType1",
SrcResource: "srcResource1",
DstResource: "dstResource1",
JobID: "jobID1",
Status: models.TaskStatusInProgress,
StartTime: time.Now(),
}
task2 := &models.Task{
@ -198,40 +250,32 @@ func TestUpdateJobStatus(t *testing.T) {
EndTime: time.Now(),
}
taskID1, _ := AddTask(task1)
taskID2, _ := AddTask(task2)
AddTask(task2)
defer func() {
DeleteAllTasks(executionID)
DeleteAllExecutions(11209)
}()
// update Pending->InProgress
n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending")
// query and fill
exe, err := GetExecution(executionID)
require.Nil(t, err)
assert.Equal(t, int64(1), n)
assert.Equal(t, models.ExecutionStatusInProgress, exe.Status)
assert.Equal(t, 1, exe.InProgress)
assert.Equal(t, 1, exe.Stopped)
assert.Equal(t, 0, exe.Failed)
assert.Equal(t, 0, exe.Succeed)
execu, err := GetExecution(executionID)
// update task status and query and fill
UpdateTaskStatus(taskID1, models.TaskStatusFailed)
exes, err := GetExecutions(&models.ExecutionQuery{
PolicyID: 11209,
})
require.Nil(t, err)
assert.Equal(t, execution.InProgress, execu.InProgress)
assert.Equal(t, execution.Status, execu.Status)
// update InProgress->Failed: Execution.InProgress-1, Failed+1
n, err = UpdateTaskStatus(taskID1, "Failed")
require.Nil(t, err)
assert.Equal(t, int64(1), n)
execu, err = GetExecution(executionID)
require.Nil(t, err)
assert.Equal(t, 1, execu.Failed)
assert.Equal(t, "Failed", execu.Status)
// update Stopped->Pending: Execution.Stopped-1, InProgress+1
n, err = UpdateTaskStatus(taskID2, "Pending")
require.Nil(t, err)
assert.Equal(t, int64(1), n)
execu, err = GetExecution(executionID)
require.Nil(t, err)
assert.Equal(t, 1, execu.InProgress)
assert.Equal(t, "InProgress", execu.Status)
assert.Equal(t, 1, len(exes))
assert.Equal(t, models.ExecutionStatusFailed, exes[0].Status)
assert.Equal(t, 0, exes[0].InProgress)
assert.Equal(t, 1, exes[0].Stopped)
assert.Equal(t, 1, exes[0].Failed)
assert.Equal(t, 0, exes[0].Succeed)
}

View File

@ -144,3 +144,9 @@ type TaskQuery struct {
ResourceType string
Pagination
}
// TaskStat holds statistics of task by status
type TaskStat struct {
Status string `orm:"column(status)"`
C int `orm:"column(c)"`
}

View File

@ -138,9 +138,9 @@ func TestMethodOfTaskManager(t *testing.T) {
// UpdateTaskStatus
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed)
require.NotNil(t, err)
require.Nil(t, err)
taskUpdate, _ = executionManager.GetTask(id)
assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status)
assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status)
// Remove
require.Nil(t, executionManager.RemoveTask(id))