From 4efad287ce26e3c7d3e00c9b71ab4d8c5ed5024d Mon Sep 17 00:00:00 2001 From: peimingming Date: Wed, 13 Mar 2019 09:35:01 +0800 Subject: [PATCH] Add execution and hooks Signed-off-by: peimingming --- .../postgresql/0004_1.8.0_schema.up.sql | 33 +- src/common/models/base.go | 4 - src/core/api/models/execution.go | 15 + src/core/api/replication_execution.go | 16 +- src/core/api/replication_execution_test.go | 24 +- src/core/api/replication_policy_ng.go | 5 +- src/core/router.go | 1 + .../service/notifications/jobs/handler.go | 11 + src/replication/ng/dao/base.go | 13 + src/replication/ng/dao/dao_test.go | 3 +- src/replication/ng/dao/execution.go | 370 ++++++++++++++++++ src/replication/ng/dao/execution_test.go | 237 +++++++++++ src/replication/ng/dao/models/base.go | 19 + src/replication/ng/dao/models/execution.go | 146 +++++++ src/replication/ng/execution/execution.go | 147 ++++++- .../ng/execution/execution_test.go | 149 +++++++ src/replication/ng/flow/controller_test.go | 17 +- src/replication/ng/flow/flow.go | 21 +- src/replication/ng/model/execution.go | 88 ----- src/replication/ng/operation/controller.go | 21 +- .../ng/operation/controller_test.go | 23 +- 21 files changed, 1203 insertions(+), 160 deletions(-) create mode 100644 src/core/api/models/execution.go create mode 100644 src/replication/ng/dao/base.go create mode 100644 src/replication/ng/dao/execution.go create mode 100644 src/replication/ng/dao/execution_test.go create mode 100644 src/replication/ng/dao/models/base.go create mode 100644 src/replication/ng/dao/models/execution.go create mode 100644 src/replication/ng/execution/execution_test.go delete mode 100644 src/replication/ng/model/execution.go diff --git a/make/migrations/postgresql/0004_1.8.0_schema.up.sql b/make/migrations/postgresql/0004_1.8.0_schema.up.sql index bdebac2c5..7aa4407e3 100644 --- a/make/migrations/postgresql/0004_1.8.0_schema.up.sql +++ b/make/migrations/postgresql/0004_1.8.0_schema.up.sql @@ -60,4 +60,35 @@ CREATE TABLE "replication_policy_ng" ( "creation_time" timestamp(6) DEFAULT now(), "update_time" timestamp(6) DEFAULT now(), CONSTRAINT unique_policy_ng_name UNIQUE ("name") -); \ No newline at end of file +); + +create table replication_execution ( + id SERIAL NOT NULL, + policy_id int NOT NULL, + status varchar(32), + status_text varchar(256), + total int NOT NULL DEFAULT 0, + failed int NOT NULL DEFAULT 0, + succeed int NOT NULL DEFAULT 0, + in_progress int NOT NULL DEFAULT 0, + stopped int NOT NULL DEFAULT 0, + trigger varchar(64), + start_time timestamp default CURRENT_TIMESTAMP, + end_time timestamp NULL, + PRIMARY KEY (id) + ); +CREATE INDEX execution_policy ON replication_execution (policy_id); + +create table replication_task ( + id SERIAL NOT NULL, + execution_id int NOT NULL, + resource_type varchar(64), + src_resource varchar(256), + dst_resource varchar(256), + job_id varchar(64), + status varchar(32), + start_time timestamp default CURRENT_TIMESTAMP, + end_time timestamp NULL, + PRIMARY KEY (id) +); +CREATE INDEX task_execution ON replication_task (execution_id); \ No newline at end of file diff --git a/src/common/models/base.go b/src/common/models/base.go index 50b2cc0f2..6bf3e525f 100644 --- a/src/common/models/base.go +++ b/src/common/models/base.go @@ -16,14 +16,10 @@ package models import ( "github.com/astaxie/beego/orm" - - "github.com/goharbor/harbor/src/replication/ng/dao/models" ) func init() { orm.RegisterModel( - new(models.Registry), - new(models.RepPolicy), new(RepPolicy), new(RepJob), new(User), diff --git a/src/core/api/models/execution.go b/src/core/api/models/execution.go new file mode 100644 index 000000000..e5445c27f --- /dev/null +++ b/src/core/api/models/execution.go @@ -0,0 +1,15 @@ +package models + +import ( + "time" +) + +// Execution defines the data model used in API level +type Execution struct { + ID int64 `json:"id"` + Status string `json:"status"` + TriggerMode string `json:"trigger_mode"` + Duration int `json:"duration"` + SuccessRate string `json:"success_rate"` + StartTime time.Time `json:"start_time"` +} diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 1049bf12d..0652fa541 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -20,7 +20,7 @@ import ( "strconv" "github.com/goharbor/harbor/src/replication/ng" - "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/dao/models" ) // ReplicationOperationAPI handles the replication operation requests @@ -73,9 +73,9 @@ func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac // ListExecutions ... func (r *ReplicationOperationAPI) ListExecutions() { - query := &model.ExecutionQuery{ - Status: r.GetString("status"), - Trigger: r.GetString("trigger"), + query := &models.ExecutionQuery{ + Statuses: []string{r.GetString("status")}, + Trigger: r.GetString("trigger"), } if len(r.GetString("policy_id")) > 0 { policyID, err := r.GetInt64("policy_id") @@ -97,7 +97,7 @@ func (r *ReplicationOperationAPI) ListExecutions() { // CreateExecution starts a replication func (r *ReplicationOperationAPI) CreateExecution() { - execution := &model.Execution{} + execution := &models.Execution{} r.DecodeJSONReq(execution) policy, err := ng.PolicyMgr.Get(execution.PolicyID) if err != nil { @@ -160,10 +160,10 @@ func (r *ReplicationOperationAPI) ListTasks() { return } - query := &model.TaskQuery{ + query := &models.TaskQuery{ ExecutionID: executionID, - ResourceType: (model.ResourceType)(r.GetString("resource_type")), - Status: r.GetString("status"), + ResourceType: r.GetString("resource_type"), + Statuses: []string{r.GetString("status")}, } query.Page, query.Size = r.GetPaginationParams() total, tasks, err := ng.OperationCtl.ListTasks(query) diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index ec9bafdbe..e9ad45e55 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/goharbor/harbor/src/replication/ng" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -30,40 +31,43 @@ func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64 func (f *fakedOperationController) StopReplication(int64) error { return nil } -func (f *fakedOperationController) ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) { - return 1, []*model.Execution{ +func (f *fakedOperationController) ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 1, []*models.Execution{ { ID: 1, PolicyID: 1, }, }, nil } -func (f *fakedOperationController) GetExecution(id int64) (*model.Execution, error) { +func (f *fakedOperationController) GetExecution(id int64) (*models.Execution, error) { if id == 1 { - return &model.Execution{ + return &models.Execution{ ID: 1, PolicyID: 1, }, nil } return nil, nil } -func (f *fakedOperationController) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { - return 1, []*model.Task{ +func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 1, []*models.Task{ { ID: 1, ExecutionID: 1, }, }, nil } -func (f *fakedOperationController) GetTask(id int64) (*model.Task, error) { +func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) { if id == 1 { - return &model.Task{ + return &models.Task{ ID: 1, ExecutionID: 1, }, nil } return nil, nil } +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { + return nil +} func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) { return []byte("success"), nil } @@ -174,7 +178,7 @@ func TestCreateExecution(t *testing.T) { request: &testingRequest{ method: http.MethodPost, url: "/api/replication/executions", - bodyJSON: &model.Execution{ + bodyJSON: &models.Execution{ PolicyID: 2, }, credential: sysAdmin, @@ -186,7 +190,7 @@ func TestCreateExecution(t *testing.T) { request: &testingRequest{ method: http.MethodPost, url: "/api/replication/executions", - bodyJSON: &model.Execution{ + bodyJSON: &models.Execution{ PolicyID: 1, }, credential: sysAdmin, diff --git a/src/core/api/replication_policy_ng.go b/src/core/api/replication_policy_ng.go index f7410ada8..8ac6feca1 100644 --- a/src/core/api/replication_policy_ng.go +++ b/src/core/api/replication_policy_ng.go @@ -20,6 +20,7 @@ import ( "strconv" "github.com/goharbor/harbor/src/replication/ng" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -192,7 +193,7 @@ func (r *ReplicationPolicyAPI) Delete() { return } - _, executions, err := ng.OperationCtl.ListExecutions(&model.ExecutionQuery{ + _, executions, err := ng.OperationCtl.ListExecutions(&models.ExecutionQuery{ PolicyID: id, }) if err != nil { @@ -201,7 +202,7 @@ func (r *ReplicationPolicyAPI) Delete() { } for _, execution := range executions { - if execution.Status == model.ExecutionStatusInProgress { + if execution.Status == models.ExecutionStatusInProgress { r.HandleStatusPreconditionFailed(fmt.Sprintf("the policy %d has running executions, can not be deleted", id)) return } diff --git a/src/core/router.go b/src/core/router.go index a954ff950..b491845af 100644 --- a/src/core/router.go +++ b/src/core/router.go @@ -131,6 +131,7 @@ func initRouters() { beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan") beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication") beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob") + beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask") beego.Router("/service/token", &token.Handler{}) beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post") diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index af621bb10..ceb5f5a27 100644 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -23,6 +23,7 @@ import ( "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" + "github.com/goharbor/harbor/src/replication/ng" ) var statusMap = map[string]string{ @@ -87,3 +88,13 @@ func (h *Handler) HandleReplication() { return } } + +// HandleReplicationTask handles the webhook of replication task +func (h *Handler) HandleReplicationTask() { + log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status) + if err := ng.OperationCtl.UpdateTaskStatus(h.id, h.status); err != nil { + log.Errorf("Failed to update replication task status, id: %d, status: %s", h.id, h.status) + h.HandleInternalServerError(err.Error()) + return + } +} diff --git a/src/replication/ng/dao/base.go b/src/replication/ng/dao/base.go new file mode 100644 index 000000000..29a567d8b --- /dev/null +++ b/src/replication/ng/dao/base.go @@ -0,0 +1,13 @@ +package dao + +import "github.com/astaxie/beego/orm" + +func paginateForQuerySetter(qs orm.QuerySeter, page, size int64) orm.QuerySeter { + if size > 0 { + qs = qs.Limit(size) + if page > 0 { + qs = qs.Offset((page - 1) * size) + } + } + return qs +} diff --git a/src/replication/ng/dao/dao_test.go b/src/replication/ng/dao/dao_test.go index d27c52f37..2f36ac1f6 100644 --- a/src/replication/ng/dao/dao_test.go +++ b/src/replication/ng/dao/dao_test.go @@ -32,7 +32,8 @@ func TestMain(m *testing.M) { "harbor_label", "harbor_resource_label", "harbor_user", "img_scan_job", "img_scan_overview", "job_log", "project", "project_member", "project_metadata", "properties", "registry", "replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng", - "replication_target", "repository", "robot", "role", "schema_migrations", "user_group";`, + "replication_target", "repository", "robot", "role", "schema_migrations", "user_group", + "replication_execution", "replication_task";`, `DROP FUNCTION "update_update_time_at_column"();`, } dao.PrepareTestData(clearSqls, nil) diff --git a/src/replication/ng/dao/execution.go b/src/replication/ng/dao/execution.go new file mode 100644 index 000000000..8a0e7ef2e --- /dev/null +++ b/src/replication/ng/dao/execution.go @@ -0,0 +1,370 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "time" + + "fmt" + "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +// AddExecution ... +func AddExecution(execution *models.Execution) (int64, error) { + o := dao.GetOrmer() + now := time.Now() + execution.StartTime = now + + return o.Insert(execution) +} + +// GetTotalOfExecutions returns the total count of replication execution +func GetTotalOfExecutions(query ...*models.ExecutionQuery) (int64, error) { + qs := executionQueryConditions(query...) + return qs.Count() +} + +// GetExecutions ... +func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error) { + executions := []*models.Execution{} + + qs := executionQueryConditions(query...) + if len(query) > 0 && query[0] != nil { + qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size) + } + + qs = qs.OrderBy("-StartTime") + + _, err := qs.All(&executions) + return executions, err +} + +func executionQueryConditions(query ...*models.ExecutionQuery) orm.QuerySeter { + qs := dao.GetOrmer().QueryTable(new(models.Execution)) + if len(query) == 0 || query[0] == nil { + return qs + } + + q := query[0] + if q.PolicyID != 0 { + qs = qs.Filter("PolicyID", q.PolicyID) + } + if len(q.Trigger) > 0 { + qs = qs.Filter("Trigger", q.Trigger) + } + if len(q.Statuses) > 0 { + qs = qs.Filter("Status__in", q.Statuses) + } + return qs +} + +// GetExecution ... +func GetExecution(id int64) (*models.Execution, error) { + o := dao.GetOrmer() + t := models.Execution{ID: id} + err := o.Read(&t) + if err == orm.ErrNoRows { + return nil, nil + } + return &t, err +} + +// DeleteExecution ... +func DeleteExecution(id int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Execution{ID: id}) + return err +} + +// DeleteAllExecutions ... +func DeleteAllExecutions(policyID int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Execution{PolicyID: policyID}, "PolicyID") + return err +} + +// UpdateExecution ... +func UpdateExecution(execution *models.Execution, props ...string) (int64, error) { + if execution.ID == 0 { + return 0, fmt.Errorf("execution ID is empty") + } + o := dao.GetOrmer() + return o.Update(execution, props...) +} + +// AddTask ... +func AddTask(task *models.Task) (int64, error) { + o := dao.GetOrmer() + sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status) + values (?, ?, ?, ?, ?, ?) RETURNING id` + + args := []interface{}{} + args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status) + + var taskID int64 + err := o.Raw(sql, args).QueryRow(&taskID) + if err != nil { + return 0, err + } + + return taskID, nil +} + +// GetTask ... +func GetTask(id int64) (*models.Task, error) { + o := dao.GetOrmer() + sql := `select * from replication_task where id = ?` + + var task models.Task + + if err := o.Raw(sql, id).QueryRow(&task); err != nil { + if err == orm.ErrNoRows { + return nil, nil + } + return nil, err + } + + return &task, nil +} + +// GetTotalOfTasks ... +func GetTotalOfTasks(query ...*models.TaskQuery) (int64, error) { + qs := taskQueryConditions(query...) + return qs.Count() +} + +// GetTasks ... +func GetTasks(query ...*models.TaskQuery) ([]*models.Task, error) { + tasks := []*models.Task{} + + qs := taskQueryConditions(query...) + if len(query) > 0 && query[0] != nil { + qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size) + } + + qs = qs.OrderBy("-StartTime") + + _, err := qs.All(&tasks) + return tasks, err +} + +func taskQueryConditions(query ...*models.TaskQuery) orm.QuerySeter { + qs := dao.GetOrmer().QueryTable(new(models.Task)) + if len(query) == 0 || query[0] == nil { + return qs + } + + q := query[0] + if q.ExecutionID != 0 { + qs = qs.Filter("ExecutionID", q.ExecutionID) + } + if len(q.JobID) > 0 { + qs = qs.Filter("JobID", q.JobID) + } + if len(q.ResourceType) > 0 { + qs = qs.Filter("ResourceType", q.ResourceType) + } + if len(q.Statuses) > 0 { + qs = qs.Filter("Status__in", q.Statuses) + } + return qs +} + +// DeleteTask ... +func DeleteTask(id int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Task{ID: id}) + return err +} + +// DeleteAllTasks ... +func DeleteAllTasks(executionID int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Task{ExecutionID: executionID}, "ExecutionID") + return err +} + +// UpdateTask ... +func UpdateTask(task *models.Task, props ...string) (int64, error) { + if task.ID == 0 { + return 0, fmt.Errorf("task ID is empty") + } + o := dao.GetOrmer() + return o.Update(task, props...) +} + +// UpdateTaskStatus ... +func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { + // can not use the globalOrm + o := orm.NewOrm() + o.Begin() + + // query the task status + var task models.Task + sql := `select * from replication_task where id = ?` + if err := o.Raw(sql, id).QueryRow(&task); err != nil { + if err == orm.ErrNoRows { + o.Rollback() + return 0, err + } + } + + // check status + satisfy := false + if len(statusCondition) == 0 { + satisfy = true + } else { + for _, stCondition := range statusCondition { + if task.Status == stCondition { + satisfy = true + break + } + } + } + if !satisfy { + o.Rollback() + return 0, fmt.Errorf("Status condition not match ") + } + + // update status + params := []interface{}{} + sql = `update replication_task set status = ?` + params = append(params, status) + if taskFinished(status) { // should update endTime + sql += ` ,end_time = ?` + params = append(params, time.Now()) + } + sql += ` where id = ?` + params = append(params, id) + _, err := o.Raw(sql, params).Exec() + log.Infof("Update task %d: %s -> %s", id, task.Status, status) + if err != nil { + log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status) + o.Rollback() + return 0, err + } + + // query the execution + var execution models.Execution + sql = `select * from replication_execution where id = ?` + if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil { + if err == orm.ErrNoRows { + log.Errorf("Execution not found id: %d", task.ExecutionID) + o.Rollback() + return 0, err + } + } + // check execution data + execuStatus, _ := getStatus(task.Status) + count := getStatusCount(&execution, execuStatus) + if count <= 0 { + log.Errorf("Task statistics in execution inconsistent") + o.Commit() + return 1, nil + } + + // update execution data + updateStatusCount(&execution, execuStatus, -1) + execuStatusUp, _ := getStatus(status) + updateStatusCount(&execution, execuStatusUp, 1) + + resetExecutionStatus(&execution) + _, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped, + models.ExecutionPropsName.EndTime) + if err != nil { + log.Errorf("Update execution status failed %d: %v", execution.ID, err) + o.Rollback() + return 0, err + } + o.Commit() + return 1, nil +} + +func taskFinished(status string) bool { + if status == models.TaskStatusFailed || status == models.TaskStatusStopped || status == models.TaskStatusSucceed { + return true + } + return false +} + +func getStatus(status string) (string, error) { + switch status { + case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: + return models.ExecutionStatusInProgress, nil + case models.TaskStatusSucceed: + return models.ExecutionStatusSucceed, nil + case models.TaskStatusStopped: + return models.ExecutionStatusStopped, nil + case models.TaskStatusFailed: + return models.ExecutionStatusFailed, nil + } + return "", fmt.Errorf("Not support task status ") +} + +func getStatusCount(execution *models.Execution, status string) int { + switch status { + case models.ExecutionStatusInProgress: + return execution.InProgress + case models.ExecutionStatusSucceed: + return execution.Succeed + case models.ExecutionStatusStopped: + return execution.Stopped + case models.ExecutionStatusFailed: + return execution.Failed + } + return 0 +} + +func updateStatusCount(execution *models.Execution, status string, delta int) error { + switch status { + case models.ExecutionStatusInProgress: + execution.InProgress += delta + case models.ExecutionStatusSucceed: + execution.Succeed += delta + case models.ExecutionStatusStopped: + execution.Stopped += delta + case models.ExecutionStatusFailed: + execution.Failed += delta + } + return nil +} + +func resetExecutionStatus(execution *models.Execution) error { + status := generateStatus(execution) + if status != execution.Status { + execution.Status = status + log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status) + } + if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 { + // execution finished in this time + execution.EndTime = time.Now() + } + return nil +} + +func generateStatus(execution *models.Execution) string { + if execution.InProgress > 0 { + return models.ExecutionStatusInProgress + } else if execution.Failed > 0 { + return models.ExecutionStatusFailed + } else if execution.Stopped > 0 { + return models.ExecutionStatusStopped + } + return models.ExecutionStatusSucceed +} diff --git a/src/replication/ng/dao/execution_test.go b/src/replication/ng/dao/execution_test.go new file mode 100644 index 000000000..a9adec642 --- /dev/null +++ b/src/replication/ng/dao/execution_test.go @@ -0,0 +1,237 @@ +package dao + +import ( + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestMethodOfExecution(t *testing.T) { + execution1 := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 7, + InProgress: 5, + Stopped: 0, + Trigger: "Event", + StartTime: time.Now(), + } + execution2 := &models.Execution{ + PolicyID: 11209, + Status: "Failed", + StatusText: "Network error", + Total: 9, + Failed: 1, + Succeed: 8, + InProgress: 0, + Stopped: 0, + Trigger: "Manual", + StartTime: time.Now(), + } + + // test add + id1, err := AddExecution(execution1) + require.Nil(t, err) + + _, err = AddExecution(execution2) + require.Nil(t, err) + + // test list + query := &models.ExecutionQuery{ + Statuses: []string{"InProgress", "Failed"}, + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + executions, err := GetExecutions(query) + require.Nil(t, err) + assert.Equal(t, 2, len(executions)) + + total, err := GetTotalOfExecutions(query) + require.Nil(t, err) + assert.Equal(t, int64(2), total) + + // test get + execution, err := GetExecution(id1) + require.Nil(t, err) + assert.Equal(t, execution1.Status, execution.Status) + + // test update + executionNew := &models.Execution{ + ID: id1, + Status: "Succeed", + Succeed: 12, + InProgress: 0, + EndTime: time.Now(), + } + n, err := UpdateExecution(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.EndTime) + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + // test delete + require.Nil(t, DeleteExecution(execution1.ID)) + execution, err = GetExecution(execution1.ID) + require.Nil(t, err) + require.Nil(t, execution) + + // test delete all + require.Nil(t, DeleteAllExecutions(execution1.PolicyID)) + query = &models.ExecutionQuery{} + n, err = GetTotalOfExecutions(query) + require.Nil(t, err) + assert.Equal(t, int64(0), n) +} + +func TestMethodOfTask(t *testing.T) { + task1 := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StartTime: time.Now(), + } + task2 := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + + // test add + id1, err := AddTask(task1) + require.Nil(t, err) + + _, err = AddTask(task2) + require.Nil(t, err) + + // test list + query := &models.TaskQuery{ + ResourceType: "resourceType1", + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + tasks, err := GetTasks(query) + require.Nil(t, err) + assert.Equal(t, 1, len(tasks)) + + total, err := GetTotalOfTasks(query) + require.Nil(t, err) + assert.Equal(t, int64(1), total) + + // test get + task, err := GetTask(id1) + require.Nil(t, err) + assert.Equal(t, task1.Status, task.Status) + + // test update + taskNew := &models.Task{ + ID: id1, + Status: "Failed", + EndTime: time.Now(), + } + n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime) + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + // test delete + require.Nil(t, DeleteTask(id1)) + task, err = GetTask(id1) + require.Nil(t, err) + require.Nil(t, task) + + // test delete all + require.Nil(t, DeleteAllTasks(task1.ExecutionID)) + query = &models.TaskQuery{} + n, err = GetTotalOfTasks(query) + require.Nil(t, err) + assert.Equal(t, int64(0), n) +} + +func TestUpdateJobStatus(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 10, + InProgress: 1, + Stopped: 1, + Trigger: "Event", + StartTime: time.Now(), + } + executionID, _ := AddExecution(execution) + task1 := &models.Task{ + ID: 20191, + ExecutionID: executionID, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Pending", + StartTime: time.Now(), + } + task2 := &models.Task{ + ID: 20192, + ExecutionID: executionID, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + taskID1, _ := AddTask(task1) + taskID2, _ := AddTask(task2) + + defer func() { + DeleteAllTasks(executionID) + DeleteAllExecutions(11209) + }() + + // update Pending->InProgress + n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err := GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, execution.InProgress, execu.InProgress) + assert.Equal(t, execution.Status, execu.Status) + + // update InProgress->Failed: Execution.InProgress-1, Failed+1 + n, err = UpdateTaskStatus(taskID1, "Failed") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err = GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, 1, execu.Failed) + assert.Equal(t, "Failed", execu.Status) + + // update Stopped->Pending: Execution.Stopped-1, InProgress+1 + n, err = UpdateTaskStatus(taskID2, "Pending") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err = GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, 1, execu.InProgress) + assert.Equal(t, "InProgress", execu.Status) +} diff --git a/src/replication/ng/dao/models/base.go b/src/replication/ng/dao/models/base.go new file mode 100644 index 000000000..da1221e2f --- /dev/null +++ b/src/replication/ng/dao/models/base.go @@ -0,0 +1,19 @@ +package models + +import ( + "github.com/astaxie/beego/orm" +) + +func init() { + orm.RegisterModel( + new(Registry), + new(RepPolicy), + new(Execution), + new(Task)) +} + +// Pagination ... +type Pagination struct { + Page int64 + Size int64 +} diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go new file mode 100644 index 000000000..d73c3e21e --- /dev/null +++ b/src/replication/ng/dao/models/execution.go @@ -0,0 +1,146 @@ +package models + +import ( + "time" +) + +const ( + // ExecutionTable is the table name for replication executions + ExecutionTable = "replication_execution" + // TaskTable is table name for replication tasks + TaskTable = "replication_task" +) + +// execution/task status/trigger const +const ( + ExecutionStatusFailed string = "Failed" + ExecutionStatusSucceed string = "Succeed" + ExecutionStatusStopped string = "Stopped" + ExecutionStatusInProgress string = "InProgress" + + ExecutionTriggerManual string = "Manual" + ExecutionTriggerEvent string = "Event" + ExecutionTriggerSchedule string = "Schedule" + + // The task has been persisted in db but not submitted to Jobservice + TaskStatusInitialized string = "Initialized" + TaskStatusPending string = "Pending" + TaskStatusInProgress string = "InProgress" + TaskStatusSucceed string = "Succeed" + TaskStatusFailed string = "Failed" + TaskStatusStopped string = "Stopped" +) + +// ExecutionPropsName defines the names of fields of Execution +var ExecutionPropsName = ExecutionFieldsName{ + ID: "ID", + PolicyID: "PolicyID", + Status: "Status", + StatusText: "StatusText", + Total: "Total", + Failed: "Failed", + Succeed: "Succeed", + InProgress: "InProgress", + Stopped: "Stopped", + Trigger: "Trigger", + StartTime: "StartTime", + EndTime: "EndTime", +} + +// ExecutionFieldsName defines the props of Execution +type ExecutionFieldsName struct { + ID string + PolicyID string + Status string + StatusText string + Total string + Failed string + Succeed string + InProgress string + Stopped string + Trigger string + StartTime string + EndTime string +} + +// Execution holds information about once replication execution. +type Execution struct { + ID int64 `orm:"pk;auto;column(id)" json:"id"` + PolicyID int64 `orm:"column(policy_id)" json:"policy_id"` + Status string `orm:"column(status)" json:"status"` + StatusText string `orm:"column(status_text)" json:"status_text"` + Total int `orm:"column(total)" json:"total"` + Failed int `orm:"column(failed)" json:"failed"` + Succeed int `orm:"column(succeed)" json:"succeed"` + InProgress int `orm:"column(in_progress)" json:"in_progress"` + Stopped int `orm:"column(stopped)" json:"stopped"` + Trigger string `orm:"column(trigger)" json:"trigger"` + StartTime time.Time `orm:"column(start_time)" json:"start_time"` + EndTime time.Time `orm:"column(end_time)" json:"end_time"` +} + +// TaskPropsName defines the names of fields of Task +var TaskPropsName = TaskFieldsName{ + ID: "ID", + ExecutionID: "ExecutionID", + ResourceType: "ResourceType", + SrcResource: "SrcResource", + DstResource: "DstResource", + JobID: "JobID", + Status: "Status", + StartTime: "StartTime", + EndTime: "EndTime", +} + +// TaskFieldsName defines the props of Task +type TaskFieldsName struct { + ID string + ExecutionID string + ResourceType string + SrcResource string + DstResource string + JobID string + Status string + StartTime string + EndTime string +} + +// Task represent the tasks in one execution. +type Task struct { + ID int64 `orm:"pk;auto;column(id)" json:"id"` + ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` + ResourceType string `orm:"column(resource_type)" json:"resource_type"` + SrcResource string `orm:"column(src_resource)" json:"src_resource"` + DstResource string `orm:"column(dst_resource)" json:"dst_resource"` + JobID string `orm:"column(job_id)" json:"job_id"` + Status string `orm:"column(status)" json:"status"` + StartTime time.Time `orm:"column(start_time)" json:"start_time"` + EndTime time.Time `orm:"column(end_time)" json:"end_time"` +} + +// TableName is required by by beego orm to map Execution to table replication_execution +func (r *Execution) TableName() string { + return ExecutionTable +} + +// TableName is required by by beego orm to map Task to table replication_task +func (r *Task) TableName() string { + return TaskTable +} + +// ExecutionQuery holds the query conditions for replication executions +type ExecutionQuery struct { + PolicyID int64 + Statuses []string + Trigger string + Pagination +} + +// TaskQuery holds the query conditions for replication task +type TaskQuery struct { + ExecutionID int64 + JobID string + Statuses []string + ResourceType string + Pagination +} diff --git a/src/replication/ng/execution/execution.go b/src/replication/ng/execution/execution.go index 1b1c5a62a..e47a97251 100644 --- a/src/replication/ng/execution/execution.go +++ b/src/replication/ng/execution/execution.go @@ -15,34 +15,37 @@ package execution import ( - "github.com/goharbor/harbor/src/replication/ng/model" + "fmt" + "github.com/goharbor/harbor/src/core/utils" + "github.com/goharbor/harbor/src/replication/ng/dao" + "github.com/goharbor/harbor/src/replication/ng/dao/models" ) // Manager manages the executions type Manager interface { // Create a new execution - Create(*model.Execution) (int64, error) + Create(*models.Execution) (int64, error) // List the summaries of executions - List(...*model.ExecutionQuery) (int64, []*model.Execution, error) + List(...*models.ExecutionQuery) (int64, []*models.Execution, error) // Get the specified execution - Get(int64) (*model.Execution, error) + Get(int64) (*models.Execution, error) // Update the data of the specified execution, the "props" are the // properties of execution that need to be updated - Update(execution *model.Execution, props ...string) error + Update(execution *models.Execution, props ...string) error // Remove the execution specified by the ID Remove(int64) error // Remove all executions of one policy specified by the policy ID RemoveAll(int64) error // Create a task - CreateTask(*model.Task) (int64, error) + CreateTask(*models.Task) (int64, error) // List the tasks according to the query - ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) + ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) // Get one specified task - GetTask(int64) (*model.Task, error) + GetTask(int64) (*models.Task, error) // Update the task, the "props" are the properties of task // that need to be updated, it cannot include "status". If // you want to update the status, use "UpdateTaskStatus" instead - UpdateTask(task *model.Task, props ...string) error + UpdateTask(task *models.Task, props ...string) error // UpdateTaskStatus only updates the task status. If "statusCondition" // presents, only the tasks whose status equal to "statusCondition" // will be updated @@ -54,3 +57,129 @@ type Manager interface { // Get the log of one specific task GetTaskLog(int64) ([]byte, error) } + +// DefaultManager .. +type DefaultManager struct { +} + +// NewDefaultManager ... +func NewDefaultManager() (Manager, error) { + return &DefaultManager{}, nil +} + +// Create a new execution +func (dm *DefaultManager) Create(execution *models.Execution) (int64, error) { + return dao.AddExecution(execution) +} + +// List the summaries of executions +func (dm *DefaultManager) List(queries ...*models.ExecutionQuery) (int64, []*models.Execution, error) { + total, err := dao.GetTotalOfExecutions(queries...) + if err != nil { + return 0, nil, err + } + + executions, err := dao.GetExecutions(queries...) + if err != nil { + return 0, nil, err + } + return total, executions, nil +} + +// Get the specified execution +func (dm *DefaultManager) Get(id int64) (*models.Execution, error) { + return dao.GetExecution(id) +} + +// Update ... +func (dm *DefaultManager) Update(execution *models.Execution, props ...string) error { + n, err := dao.UpdateExecution(execution, props...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Execution not found error: %d ", execution.ID) + } + return nil +} + +// Remove the execution specified by the ID +func (dm *DefaultManager) Remove(id int64) error { + return dao.DeleteExecution(id) +} + +// RemoveAll executions of one policy specified by the policy ID +func (dm *DefaultManager) RemoveAll(policyID int64) error { + return dao.DeleteAllExecutions(policyID) +} + +// CreateTask used to create a task +func (dm *DefaultManager) CreateTask(task *models.Task) (int64, error) { + return dao.AddTask(task) +} + +// ListTasks list the tasks according to the query +func (dm *DefaultManager) ListTasks(queries ...*models.TaskQuery) (int64, []*models.Task, error) { + total, err := dao.GetTotalOfTasks(queries...) + if err != nil { + return 0, nil, err + } + + tasks, err := dao.GetTasks(queries...) + if err != nil { + return 0, nil, err + } + return total, tasks, nil +} + +// GetTask get one specified task +func (dm *DefaultManager) GetTask(id int64) (*models.Task, error) { + return dao.GetTask(id) +} + +// UpdateTask ... +func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error { + n, err := dao.UpdateTask(task, props...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Task not found error: %d ", task.ID) + } + return nil +} + +// UpdateTaskStatus ... +func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error { + n, err := dao.UpdateTaskStatus(taskID, status, statusCondition...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Update task status failed %d: -> %s ", taskID, status) + } + return nil +} + +// RemoveTask remove one task specified by task ID +func (dm *DefaultManager) RemoveTask(id int64) error { + return dao.DeleteTask(id) +} + +// RemoveAllTasks of one execution specified by the execution ID +func (dm *DefaultManager) RemoveAllTasks(executionID int64) error { + return dao.DeleteAllTasks(executionID) +} + +// GetTaskLog get the log of one specific task +func (dm *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) { + task, err := dao.GetTask(taskID) + if err != nil { + return nil, err + } + if task == nil { + return nil, fmt.Errorf("Task not found %d ", taskID) + } + + return utils.GetJobServiceClient().GetJobLog(task.JobID) +} diff --git a/src/replication/ng/execution/execution_test.go b/src/replication/ng/execution/execution_test.go new file mode 100644 index 000000000..6a6370438 --- /dev/null +++ b/src/replication/ng/execution/execution_test.go @@ -0,0 +1,149 @@ +package execution + +import ( + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +var executionManager, _ = NewDefaultManager() + +func TestMain(m *testing.M) { + databases := []string{"postgresql"} + for _, database := range databases { + log.Infof("run test cases for database: %s", database) + result := 1 + switch database { + case "postgresql": + dao.PrepareTestForPostgresSQL() + default: + log.Fatalf("invalid database: %s", database) + } + + result = m.Run() + + if result != 0 { + os.Exit(result) + } + } + +} + +func TestMethodOfExecutionManager(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 7, + InProgress: 5, + Stopped: 0, + Trigger: "Event", + StartTime: time.Now(), + } + + defer func() { + executionManager.RemoveAll(execution.PolicyID) + }() + + // Create + id, err := executionManager.Create(execution) + require.Nil(t, err) + + // List + query := &models.ExecutionQuery{ + Statuses: []string{"InProgress", "Failed"}, + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + count, executions, err := executionManager.List(query) + require.Nil(t, err) + assert.Equal(t, int64(1), count) + assert.Equal(t, 1, len(executions)) + + // Get + _, err = executionManager.Get(id) + require.Nil(t, err) + + // Update + executionNew := &models.Execution{ + ID: id, + Status: "Failed", + Succeed: 12, + InProgress: 0, + EndTime: time.Now(), + } + err = executionManager.Update(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.EndTime) + require.Nil(t, err) + + // Remove + require.Nil(t, executionManager.Remove(id)) +} + +func TestMethodOfTaskManager(t *testing.T) { + task := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StartTime: time.Now(), + } + + defer func() { + executionManager.RemoveAllTasks(task.ExecutionID) + }() + + // CreateTask + id, err := executionManager.CreateTask(task) + require.Nil(t, err) + + // ListTasks + query := &models.TaskQuery{ + ResourceType: "resourceType1", + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + count, tasks, err := executionManager.ListTasks(query) + require.Nil(t, err) + assert.Equal(t, 1, len(tasks)) + assert.Equal(t, int64(1), count) + + // GetTask + _, err = executionManager.GetTask(id) + require.Nil(t, err) + + // UpdateTask + taskNew := &models.Task{ + ID: id, + SrcResource: "srcResourceChanged", + } + err = executionManager.UpdateTask(taskNew, models.TaskPropsName.SrcResource) + require.Nil(t, err) + taskUpdate, _ := executionManager.GetTask(id) + assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource) + + // UpdateTaskStatus + err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed) + require.NotNil(t, err) + taskUpdate, _ = executionManager.GetTask(id) + assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status) + + // Remove + require.Nil(t, executionManager.RemoveTask(id)) + + // RemoveAll + require.Nil(t, executionManager.RemoveAll(id)) +} diff --git a/src/replication/ng/flow/controller_test.go b/src/replication/ng/flow/controller_test.go index 89c4ea779..278c7953a 100644 --- a/src/replication/ng/flow/controller_test.go +++ b/src/replication/ng/flow/controller_test.go @@ -20,6 +20,7 @@ import ( "github.com/docker/distribution" "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/scheduler" "github.com/stretchr/testify/assert" @@ -85,16 +86,16 @@ func (f *fakedRegistryManager) HealthCheck() error { type fakedExecutionManager struct{} -func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { +func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) { +func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { return 0, nil, nil } -func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { +func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { return nil, nil } -func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { +func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { return nil } func (f *fakedExecutionManager) Remove(int64) error { @@ -103,16 +104,16 @@ func (f *fakedExecutionManager) Remove(int64) error { func (f *fakedExecutionManager) RemoveAll(int64) error { return nil } -func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { +func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { +func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { return 0, nil, nil } -func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { +func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { return nil, nil } -func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { +func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index 29a5cd121..8e46c7a02 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -26,6 +26,7 @@ import ( "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/registry" ) @@ -98,9 +99,9 @@ func newFlow(policy *model.Policy, registryMgr registry.Manager, } func (f *flow) createExecution() (int64, error) { - id, err := f.executionMgr.Create(&model.Execution{ + id, err := f.executionMgr.Create(&models.Execution{ PolicyID: f.policy.ID, - Status: model.ExecutionStatusInProgress, + Status: models.ExecutionStatusInProgress, StartTime: time.Now(), }) f.executionID = id @@ -205,10 +206,10 @@ func (f *flow) preprocess() error { func (f *flow) createTasks() error { for _, item := range f.scheduleItems { - task := &model.Task{ + task := &models.Task{ ExecutionID: f.executionID, - Status: model.TaskStatusInitialized, - ResourceType: item.SrcResource.Type, + Status: models.TaskStatusInitialized, + ResourceType: string(item.SrcResource.Type), SrcResource: getResourceName(item.SrcResource), DstResource: getResourceName(item.DstResource), } @@ -240,17 +241,17 @@ func (f *flow) schedule() error { // task as failure if result.Error != nil { log.Errorf("failed to schedule task %d: %v", result.TaskID, err) - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusFailed); err != nil { + if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } continue } allFailed = false // if the task is submitted successfully, update the status, job ID and start time - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil { + if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } - if err = f.executionMgr.UpdateTask(&model.Task{ + if err = f.executionMgr.UpdateTask(&models.Task{ ID: result.TaskID, JobID: result.JobID, StartTime: time.Now(), @@ -276,9 +277,9 @@ func (f *flow) markExecutionFailure(err error) { log.Errorf("the execution %d is marked as failure because of the error: %s", f.executionID, statusText) err = f.executionMgr.Update( - &model.Execution{ + &models.Execution{ ID: f.executionID, - Status: model.ExecutionStatusFailed, + Status: models.ExecutionStatusFailed, StatusText: statusText, EndTime: time.Now(), }) diff --git a/src/replication/ng/model/execution.go b/src/replication/ng/model/execution.go deleted file mode 100644 index 105671aff..000000000 --- a/src/replication/ng/model/execution.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "time" - - "github.com/goharbor/harbor/src/common/models" -) - -// execution/task status/trigger const -const ( - ExecutionStatusFailed string = "Failed" - ExecutionStatusSucceed string = "Succeed" - ExecutionStatusStopped string = "Stopped" - ExecutionStatusInProgress string = "InProgress" - - ExecutionTriggerManual string = "Manual" - ExecutionTriggerEvent string = "Event" - ExecutionTriggerSchedule string = "Schedule" - - // The task has been persisted in db but not submitted to Jobservice - TaskStatusInitialized string = "Initialized" - TaskStatusPending string = "Pending" - TaskStatusInProgress string = "InProgress" - TaskStatusSucceed string = "Succeed" - TaskStatusFailed string = "Failed" - TaskStatusStopped string = "Stopped" -) - -// Execution defines an execution of the replication -type Execution struct { - ID int64 `json:"id"` - PolicyID int64 `json:"policy_id"` - Status string `json:"status"` - StatusText string `json:"status_text"` - Trigger string `json:"trigger"` - Total int `json:"total"` - Failed int `json:"failed"` - Succeed int `json:"succeed"` - Pending int `json:"pending"` - InProgress int `json:"in_progress"` - Stopped int `json:"stopped"` - Initialized int `json:"initialized"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` -} - -// Task holds the information of one replication task -type Task struct { - ID int64 `json:"id"` - ExecutionID int64 `json:"execution_id"` - ResourceType ResourceType `json:"resource_type"` - SrcResource string `json:"src_resource"` - DstResource string `json:"dst_resource"` - JobID string `json:"job_id"` - Status string `json:"status"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` -} - -// ExecutionQuery defines the query conditions for listing executions -type ExecutionQuery struct { - PolicyID int64 - Status string - Trigger string - models.Pagination -} - -// TaskQuery defines the query conditions for listing tasks -type TaskQuery struct { - ExecutionID int64 - ResourceType ResourceType - Status string - models.Pagination -} diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 0a000f0fc..3ad2e12b2 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -15,6 +15,7 @@ package operation import ( + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/execution" "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/model" @@ -25,10 +26,11 @@ import ( type Controller interface { StartReplication(policy *model.Policy) (int64, error) StopReplication(int64) error - ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) - GetExecution(int64) (*model.Execution, error) - ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) - GetTask(int64) (*model.Task, error) + ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) + GetExecution(int64) (*models.Execution, error) + ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) + GetTask(int64) (*models.Task, error) + UpdateTaskStatus(id int64, status string, statusCondition ...string) error GetTaskLog(int64) ([]byte, error) } @@ -51,18 +53,21 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error func (d *defaultController) StopReplication(executionID int64) error { return d.flowCtl.StopReplication(executionID) } -func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) { +func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return d.executionMgr.List(query...) } -func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) { +func (d *defaultController) GetExecution(executionID int64) (*models.Execution, error) { return d.executionMgr.Get(executionID) } -func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) { +func (d *defaultController) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) { return d.executionMgr.ListTasks(query...) } -func (d *defaultController) GetTask(id int64) (*model.Task, error) { +func (d *defaultController) GetTask(id int64) (*models.Task, error) { return d.executionMgr.GetTask(id) } +func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { + return d.executionMgr.UpdateTaskStatus(id, status, statusCondition...) +} func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) { return d.executionMgr.GetTaskLog(taskID) } diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go index a68e2bda6..73f48e517 100644 --- a/src/replication/ng/operation/controller_test.go +++ b/src/replication/ng/operation/controller_test.go @@ -17,6 +17,7 @@ package operation import ( "testing" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,22 +34,22 @@ func (f *fakedFlowController) StopReplication(int64) error { type fakedExecutionManager struct{} -func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { +func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) { - return 1, []*model.Execution{ +func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 1, []*models.Execution{ { ID: 1, }, }, nil } -func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { - return &model.Execution{ +func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { + return &models.Execution{ ID: 1, }, nil } -func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { +func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { return nil } func (f *fakedExecutionManager) Remove(int64) error { @@ -57,20 +58,20 @@ func (f *fakedExecutionManager) Remove(int64) error { func (f *fakedExecutionManager) RemoveAll(int64) error { return nil } -func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { +func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { - return 1, []*model.Task{ +func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 1, []*models.Task{ { ID: 1, }, }, nil } -func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { +func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { return nil, nil } -func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { +func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {