diff --git a/src/replication/ng/dao/execution.go b/src/replication/ng/dao/execution.go index 8a0e7ef2e..b6d641a57 100644 --- a/src/replication/ng/dao/execution.go +++ b/src/replication/ng/dao/execution.go @@ -51,6 +51,12 @@ func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error) qs = qs.OrderBy("-StartTime") _, err := qs.All(&executions) + if err != nil || len(executions) == 0 { + return executions, err + } + for _, e := range executions { + fillExecution(e) + } return executions, err } @@ -81,9 +87,113 @@ func GetExecution(id int64) (*models.Execution, error) { if err == orm.ErrNoRows { return nil, nil } + if err != nil { + return nil, err + } + fillExecution(&t) return &t, err } +// fillExecution will fill the statistics data and status by tasks data +func fillExecution(execution *models.Execution) error { + if executionFinished(execution.Status) { + return nil + } + + o := dao.GetOrmer() + sql := `select status, count(*) as c from replication_task where execution_id = ? group by status` + queryParam := make([]interface{}, 1) + queryParam = append(queryParam, execution.ID) + + dt := []*models.TaskStat{} + count, err := o.Raw(sql, queryParam).QueryRows(&dt) + + if err != nil { + log.Errorf("Query tasks error execution %d: %v", execution.ID, err) + return err + } + + if count == 0 { + return nil + } + + total := 0 + for _, d := range dt { + status, _ := getStatus(d.Status) + updateStatusCount(execution, status, d.C) + total += d.C + } + + if execution.Total != total { + log.Errorf("execution task count inconsistent and fixed, executionID=%d, execution.total=%d, tasks.count=%d", + execution.ID, execution.Total, total) + execution.Total = total + } + resetExecutionStatus(execution) + + // if execution status changed to a final status, store to DB + if executionFinished(execution.Status) { + UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped, + models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total) + } + return nil +} + +func getStatus(status string) (string, error) { + switch status { + case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: + return models.ExecutionStatusInProgress, nil + case models.TaskStatusSucceed: + return models.ExecutionStatusSucceed, nil + case models.TaskStatusStopped: + return models.ExecutionStatusStopped, nil + case models.TaskStatusFailed: + return models.ExecutionStatusFailed, nil + } + return "", fmt.Errorf("Not support task status ") +} + +func updateStatusCount(execution *models.Execution, status string, delta int) error { + switch status { + case models.ExecutionStatusInProgress: + execution.InProgress += delta + case models.ExecutionStatusSucceed: + execution.Succeed += delta + case models.ExecutionStatusStopped: + execution.Stopped += delta + case models.ExecutionStatusFailed: + execution.Failed += delta + } + return nil +} + +func resetExecutionStatus(execution *models.Execution) error { + execution.Status = generateStatus(execution) + if executionFinished(execution.Status) { + execution.EndTime = time.Now() + } + return nil +} + +func generateStatus(execution *models.Execution) string { + if execution.InProgress > 0 { + return models.ExecutionStatusInProgress + } else if execution.Failed > 0 { + return models.ExecutionStatusFailed + } else if execution.Stopped > 0 { + return models.ExecutionStatusStopped + } + return models.ExecutionStatusSucceed +} + +func executionFinished(status string) bool { + if status == models.ExecutionStatusInProgress { + return false + } + return true +} + // DeleteExecution ... func DeleteExecution(id int64) error { o := dao.GetOrmer() @@ -110,19 +220,10 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error // AddTask ... func AddTask(task *models.Task) (int64, error) { o := dao.GetOrmer() - sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status) - values (?, ?, ?, ?, ?, ?) RETURNING id` + now := time.Now() + task.StartTime = now - args := []interface{}{} - args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status) - - var taskID int64 - err := o.Raw(sql, args).QueryRow(&taskID) - if err != nil { - return 0, err - } - - return taskID, nil + return o.Insert(task) } // GetTask ... @@ -210,40 +311,11 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) { // UpdateTaskStatus ... func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { - // can not use the globalOrm - o := orm.NewOrm() - o.Begin() - - // query the task status - var task models.Task - sql := `select * from replication_task where id = ?` - if err := o.Raw(sql, id).QueryRow(&task); err != nil { - if err == orm.ErrNoRows { - o.Rollback() - return 0, err - } - } - - // check status - satisfy := false - if len(statusCondition) == 0 { - satisfy = true - } else { - for _, stCondition := range statusCondition { - if task.Status == stCondition { - satisfy = true - break - } - } - } - if !satisfy { - o.Rollback() - return 0, fmt.Errorf("Status condition not match ") - } + o := dao.GetOrmer() // update status params := []interface{}{} - sql = `update replication_task set status = ?` + sql := `update replication_task set status = ?` params = append(params, status) if taskFinished(status) { // should update endTime sql += ` ,end_time = ?` @@ -251,49 +323,26 @@ func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64 } sql += ` where id = ?` params = append(params, id) - _, err := o.Raw(sql, params).Exec() - log.Infof("Update task %d: %s -> %s", id, task.Status, status) - if err != nil { - log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status) - o.Rollback() - return 0, err - } - - // query the execution - var execution models.Execution - sql = `select * from replication_execution where id = ?` - if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil { - if err == orm.ErrNoRows { - log.Errorf("Execution not found id: %d", task.ExecutionID) - o.Rollback() - return 0, err + if len(statusCondition) > 0 { + sql += ` and status in (` + for _, stCondition := range statusCondition { + sql += ` ?,` + params = append(params, stCondition) } - } - // check execution data - execuStatus, _ := getStatus(task.Status) - count := getStatusCount(&execution, execuStatus) - if count <= 0 { - log.Errorf("Task statistics in execution inconsistent") - o.Commit() - return 1, nil + sql = sql[0 : len(sql)-1] + sql += `)` } - // update execution data - updateStatusCount(&execution, execuStatus, -1) - execuStatusUp, _ := getStatus(status) - updateStatusCount(&execution, execuStatusUp, 1) - - resetExecutionStatus(&execution) - _, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress, - models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped, - models.ExecutionPropsName.EndTime) + log.Infof("Update task %d: -> %s", id, status) + res, err := o.Raw(sql, params).Exec() if err != nil { - log.Errorf("Update execution status failed %d: %v", execution.ID, err) - o.Rollback() return 0, err } - o.Commit() - return 1, nil + count, err := res.RowsAffected() + if err != nil { + return 0, err + } + return count, nil } func taskFinished(status string) bool { @@ -302,69 +351,3 @@ func taskFinished(status string) bool { } return false } - -func getStatus(status string) (string, error) { - switch status { - case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: - return models.ExecutionStatusInProgress, nil - case models.TaskStatusSucceed: - return models.ExecutionStatusSucceed, nil - case models.TaskStatusStopped: - return models.ExecutionStatusStopped, nil - case models.TaskStatusFailed: - return models.ExecutionStatusFailed, nil - } - return "", fmt.Errorf("Not support task status ") -} - -func getStatusCount(execution *models.Execution, status string) int { - switch status { - case models.ExecutionStatusInProgress: - return execution.InProgress - case models.ExecutionStatusSucceed: - return execution.Succeed - case models.ExecutionStatusStopped: - return execution.Stopped - case models.ExecutionStatusFailed: - return execution.Failed - } - return 0 -} - -func updateStatusCount(execution *models.Execution, status string, delta int) error { - switch status { - case models.ExecutionStatusInProgress: - execution.InProgress += delta - case models.ExecutionStatusSucceed: - execution.Succeed += delta - case models.ExecutionStatusStopped: - execution.Stopped += delta - case models.ExecutionStatusFailed: - execution.Failed += delta - } - return nil -} - -func resetExecutionStatus(execution *models.Execution) error { - status := generateStatus(execution) - if status != execution.Status { - execution.Status = status - log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status) - } - if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 { - // execution finished in this time - execution.EndTime = time.Now() - } - return nil -} - -func generateStatus(execution *models.Execution) string { - if execution.InProgress > 0 { - return models.ExecutionStatusInProgress - } else if execution.Failed > 0 { - return models.ExecutionStatusFailed - } else if execution.Stopped > 0 { - return models.ExecutionStatusStopped - } - return models.ExecutionStatusSucceed -} diff --git a/src/replication/ng/dao/execution_test.go b/src/replication/ng/dao/execution_test.go index a9adec642..a3f609445 100644 --- a/src/replication/ng/dao/execution_test.go +++ b/src/replication/ng/dao/execution_test.go @@ -148,6 +148,13 @@ func TestMethodOfTask(t *testing.T) { require.Nil(t, err) assert.Equal(t, int64(1), n) + // test update status + n, err = UpdateTaskStatus(id1, "Succeed") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + task, _ = GetTask(id1) + assert.Equal(t, "Succeed", task.Status) + // test delete require.Nil(t, DeleteTask(id1)) task, err = GetTask(id1) @@ -162,16 +169,12 @@ func TestMethodOfTask(t *testing.T) { assert.Equal(t, int64(0), n) } -func TestUpdateJobStatus(t *testing.T) { +func TestExecutionFill(t *testing.T) { execution := &models.Execution{ PolicyID: 11209, Status: "InProgress", StatusText: "None", - Total: 12, - Failed: 0, - Succeed: 10, - InProgress: 1, - Stopped: 1, + Total: 2, Trigger: "Event", StartTime: time.Now(), } @@ -183,7 +186,56 @@ func TestUpdateJobStatus(t *testing.T) { SrcResource: "srcResource1", DstResource: "dstResource1", JobID: "jobID1", - Status: "Pending", + Status: "Succeed", + StartTime: time.Now(), + } + task2 := &models.Task{ + ID: 20192, + ExecutionID: executionID, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + AddTask(task1) + AddTask(task2) + + defer func() { + DeleteAllTasks(executionID) + DeleteAllExecutions(11209) + }() + + // query and fill + exe, err := GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, "Stopped", exe.Status) + assert.Equal(t, 0, exe.InProgress) + assert.Equal(t, 1, exe.Stopped) + assert.Equal(t, 0, exe.Failed) + assert.Equal(t, 1, exe.Succeed) +} + +func TestExecutionFill2(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 2, + Trigger: "Event", + StartTime: time.Now(), + } + executionID, _ := AddExecution(execution) + task1 := &models.Task{ + ID: 20191, + ExecutionID: executionID, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: models.TaskStatusInProgress, StartTime: time.Now(), } task2 := &models.Task{ @@ -198,40 +250,32 @@ func TestUpdateJobStatus(t *testing.T) { EndTime: time.Now(), } taskID1, _ := AddTask(task1) - taskID2, _ := AddTask(task2) + AddTask(task2) defer func() { DeleteAllTasks(executionID) DeleteAllExecutions(11209) }() - // update Pending->InProgress - n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending") + // query and fill + exe, err := GetExecution(executionID) require.Nil(t, err) - assert.Equal(t, int64(1), n) + assert.Equal(t, models.ExecutionStatusInProgress, exe.Status) + assert.Equal(t, 1, exe.InProgress) + assert.Equal(t, 1, exe.Stopped) + assert.Equal(t, 0, exe.Failed) + assert.Equal(t, 0, exe.Succeed) - execu, err := GetExecution(executionID) + // update task status and query and fill + UpdateTaskStatus(taskID1, models.TaskStatusFailed) + exes, err := GetExecutions(&models.ExecutionQuery{ + PolicyID: 11209, + }) require.Nil(t, err) - assert.Equal(t, execution.InProgress, execu.InProgress) - assert.Equal(t, execution.Status, execu.Status) - - // update InProgress->Failed: Execution.InProgress-1, Failed+1 - n, err = UpdateTaskStatus(taskID1, "Failed") - require.Nil(t, err) - assert.Equal(t, int64(1), n) - - execu, err = GetExecution(executionID) - require.Nil(t, err) - assert.Equal(t, 1, execu.Failed) - assert.Equal(t, "Failed", execu.Status) - - // update Stopped->Pending: Execution.Stopped-1, InProgress+1 - n, err = UpdateTaskStatus(taskID2, "Pending") - require.Nil(t, err) - assert.Equal(t, int64(1), n) - - execu, err = GetExecution(executionID) - require.Nil(t, err) - assert.Equal(t, 1, execu.InProgress) - assert.Equal(t, "InProgress", execu.Status) + assert.Equal(t, 1, len(exes)) + assert.Equal(t, models.ExecutionStatusFailed, exes[0].Status) + assert.Equal(t, 0, exes[0].InProgress) + assert.Equal(t, 1, exes[0].Stopped) + assert.Equal(t, 1, exes[0].Failed) + assert.Equal(t, 0, exes[0].Succeed) } diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go index d73c3e21e..dfd6fe9e9 100644 --- a/src/replication/ng/dao/models/execution.go +++ b/src/replication/ng/dao/models/execution.go @@ -144,3 +144,9 @@ type TaskQuery struct { ResourceType string Pagination } + +// TaskStat holds statistics of task by status +type TaskStat struct { + Status string `orm:"column(status)"` + C int `orm:"column(c)"` +} diff --git a/src/replication/ng/execution/execution_test.go b/src/replication/ng/execution/execution_test.go index 4a8fa0900..3ccb23d54 100644 --- a/src/replication/ng/execution/execution_test.go +++ b/src/replication/ng/execution/execution_test.go @@ -138,9 +138,9 @@ func TestMethodOfTaskManager(t *testing.T) { // UpdateTaskStatus err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed) - require.NotNil(t, err) + require.Nil(t, err) taskUpdate, _ = executionManager.GetTask(id) - assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status) + assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status) // Remove require.Nil(t, executionManager.RemoveTask(id))