mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-09 04:01:14 +01:00
Merge pull request #7132 from mmpei/replication_ng_execution_hook
Add execution and hooks
This commit is contained in:
commit
689412b4d0
@ -61,3 +61,34 @@ CREATE TABLE "replication_policy_ng" (
|
||||
"update_time" timestamp(6) DEFAULT now(),
|
||||
CONSTRAINT unique_policy_ng_name UNIQUE ("name")
|
||||
);
|
||||
|
||||
create table replication_execution (
|
||||
id SERIAL NOT NULL,
|
||||
policy_id int NOT NULL,
|
||||
status varchar(32),
|
||||
status_text varchar(256),
|
||||
total int NOT NULL DEFAULT 0,
|
||||
failed int NOT NULL DEFAULT 0,
|
||||
succeed int NOT NULL DEFAULT 0,
|
||||
in_progress int NOT NULL DEFAULT 0,
|
||||
stopped int NOT NULL DEFAULT 0,
|
||||
trigger varchar(64),
|
||||
start_time timestamp default CURRENT_TIMESTAMP,
|
||||
end_time timestamp NULL,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
CREATE INDEX execution_policy ON replication_execution (policy_id);
|
||||
|
||||
create table replication_task (
|
||||
id SERIAL NOT NULL,
|
||||
execution_id int NOT NULL,
|
||||
resource_type varchar(64),
|
||||
src_resource varchar(256),
|
||||
dst_resource varchar(256),
|
||||
job_id varchar(64),
|
||||
status varchar(32),
|
||||
start_time timestamp default CURRENT_TIMESTAMP,
|
||||
end_time timestamp NULL,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
CREATE INDEX task_execution ON replication_task (execution_id);
|
@ -16,14 +16,10 @@ package models
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
)
|
||||
|
||||
func init() {
|
||||
orm.RegisterModel(
|
||||
new(models.Registry),
|
||||
new(models.RepPolicy),
|
||||
new(RepPolicy),
|
||||
new(RepJob),
|
||||
new(User),
|
||||
|
15
src/core/api/models/execution.go
Normal file
15
src/core/api/models/execution.go
Normal file
@ -0,0 +1,15 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Execution defines the data model used in API level
|
||||
type Execution struct {
|
||||
ID int64 `json:"id"`
|
||||
Status string `json:"status"`
|
||||
TriggerMode string `json:"trigger_mode"`
|
||||
Duration int `json:"duration"`
|
||||
SuccessRate string `json:"success_rate"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
}
|
@ -20,7 +20,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
)
|
||||
|
||||
// ReplicationOperationAPI handles the replication operation requests
|
||||
@ -73,8 +73,8 @@ func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac
|
||||
|
||||
// ListExecutions ...
|
||||
func (r *ReplicationOperationAPI) ListExecutions() {
|
||||
query := &model.ExecutionQuery{
|
||||
Status: r.GetString("status"),
|
||||
query := &models.ExecutionQuery{
|
||||
Statuses: []string{r.GetString("status")},
|
||||
Trigger: r.GetString("trigger"),
|
||||
}
|
||||
if len(r.GetString("policy_id")) > 0 {
|
||||
@ -97,7 +97,7 @@ func (r *ReplicationOperationAPI) ListExecutions() {
|
||||
|
||||
// CreateExecution starts a replication
|
||||
func (r *ReplicationOperationAPI) CreateExecution() {
|
||||
execution := &model.Execution{}
|
||||
execution := &models.Execution{}
|
||||
r.DecodeJSONReq(execution)
|
||||
policy, err := ng.PolicyMgr.Get(execution.PolicyID)
|
||||
if err != nil {
|
||||
@ -160,10 +160,10 @@ func (r *ReplicationOperationAPI) ListTasks() {
|
||||
return
|
||||
}
|
||||
|
||||
query := &model.TaskQuery{
|
||||
query := &models.TaskQuery{
|
||||
ExecutionID: executionID,
|
||||
ResourceType: (model.ResourceType)(r.GetString("resource_type")),
|
||||
Status: r.GetString("status"),
|
||||
ResourceType: r.GetString("resource_type"),
|
||||
Statuses: []string{r.GetString("status")},
|
||||
}
|
||||
query.Page, query.Size = r.GetPaginationParams()
|
||||
total, tasks, err := ng.OperationCtl.ListTasks(query)
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
@ -30,40 +31,43 @@ func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64
|
||||
func (f *fakedOperationController) StopReplication(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedOperationController) ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
return 1, []*model.Execution{
|
||||
func (f *fakedOperationController) ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
return 1, []*models.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
PolicyID: 1,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedOperationController) GetExecution(id int64) (*model.Execution, error) {
|
||||
func (f *fakedOperationController) GetExecution(id int64) (*models.Execution, error) {
|
||||
if id == 1 {
|
||||
return &model.Execution{
|
||||
return &models.Execution{
|
||||
ID: 1,
|
||||
PolicyID: 1,
|
||||
}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedOperationController) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
return 1, []*model.Task{
|
||||
func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||
return 1, []*models.Task{
|
||||
{
|
||||
ID: 1,
|
||||
ExecutionID: 1,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedOperationController) GetTask(id int64) (*model.Task, error) {
|
||||
func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
|
||||
if id == 1 {
|
||||
return &model.Task{
|
||||
return &models.Task{
|
||||
ID: 1,
|
||||
ExecutionID: 1,
|
||||
}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {
|
||||
return []byte("success"), nil
|
||||
}
|
||||
@ -174,7 +178,7 @@ func TestCreateExecution(t *testing.T) {
|
||||
request: &testingRequest{
|
||||
method: http.MethodPost,
|
||||
url: "/api/replication/executions",
|
||||
bodyJSON: &model.Execution{
|
||||
bodyJSON: &models.Execution{
|
||||
PolicyID: 2,
|
||||
},
|
||||
credential: sysAdmin,
|
||||
@ -186,7 +190,7 @@ func TestCreateExecution(t *testing.T) {
|
||||
request: &testingRequest{
|
||||
method: http.MethodPost,
|
||||
url: "/api/replication/executions",
|
||||
bodyJSON: &model.Execution{
|
||||
bodyJSON: &models.Execution{
|
||||
PolicyID: 1,
|
||||
},
|
||||
credential: sysAdmin,
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
@ -192,7 +193,7 @@ func (r *ReplicationPolicyAPI) Delete() {
|
||||
return
|
||||
}
|
||||
|
||||
_, executions, err := ng.OperationCtl.ListExecutions(&model.ExecutionQuery{
|
||||
_, executions, err := ng.OperationCtl.ListExecutions(&models.ExecutionQuery{
|
||||
PolicyID: id,
|
||||
})
|
||||
if err != nil {
|
||||
@ -201,7 +202,7 @@ func (r *ReplicationPolicyAPI) Delete() {
|
||||
}
|
||||
|
||||
for _, execution := range executions {
|
||||
if execution.Status == model.ExecutionStatusInProgress {
|
||||
if execution.Status == models.ExecutionStatusInProgress {
|
||||
r.HandleStatusPreconditionFailed(fmt.Sprintf("the policy %d has running executions, can not be deleted", id))
|
||||
return
|
||||
}
|
||||
|
@ -131,6 +131,7 @@ func initRouters() {
|
||||
beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan")
|
||||
beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication")
|
||||
beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob")
|
||||
beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask")
|
||||
beego.Router("/service/token", &token.Handler{})
|
||||
|
||||
beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post")
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/api"
|
||||
"github.com/goharbor/harbor/src/replication/ng"
|
||||
)
|
||||
|
||||
var statusMap = map[string]string{
|
||||
@ -87,3 +88,13 @@ func (h *Handler) HandleReplication() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// HandleReplicationTask handles the webhook of replication task
|
||||
func (h *Handler) HandleReplicationTask() {
|
||||
log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status)
|
||||
if err := ng.OperationCtl.UpdateTaskStatus(h.id, h.status); err != nil {
|
||||
log.Errorf("Failed to update replication task status, id: %d, status: %s", h.id, h.status)
|
||||
h.HandleInternalServerError(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
13
src/replication/ng/dao/base.go
Normal file
13
src/replication/ng/dao/base.go
Normal file
@ -0,0 +1,13 @@
|
||||
package dao
|
||||
|
||||
import "github.com/astaxie/beego/orm"
|
||||
|
||||
func paginateForQuerySetter(qs orm.QuerySeter, page, size int64) orm.QuerySeter {
|
||||
if size > 0 {
|
||||
qs = qs.Limit(size)
|
||||
if page > 0 {
|
||||
qs = qs.Offset((page - 1) * size)
|
||||
}
|
||||
}
|
||||
return qs
|
||||
}
|
@ -32,7 +32,8 @@ func TestMain(m *testing.M) {
|
||||
"harbor_label", "harbor_resource_label", "harbor_user", "img_scan_job", "img_scan_overview",
|
||||
"job_log", "project", "project_member", "project_metadata", "properties", "registry",
|
||||
"replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng",
|
||||
"replication_target", "repository", "robot", "role", "schema_migrations", "user_group";`,
|
||||
"replication_target", "repository", "robot", "role", "schema_migrations", "user_group",
|
||||
"replication_execution", "replication_task";`,
|
||||
`DROP FUNCTION "update_update_time_at_column"();`,
|
||||
}
|
||||
dao.PrepareTestData(clearSqls, nil)
|
||||
|
370
src/replication/ng/dao/execution.go
Normal file
370
src/replication/ng/dao/execution.go
Normal file
@ -0,0 +1,370 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package dao
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
)
|
||||
|
||||
// AddExecution ...
|
||||
func AddExecution(execution *models.Execution) (int64, error) {
|
||||
o := dao.GetOrmer()
|
||||
now := time.Now()
|
||||
execution.StartTime = now
|
||||
|
||||
return o.Insert(execution)
|
||||
}
|
||||
|
||||
// GetTotalOfExecutions returns the total count of replication execution
|
||||
func GetTotalOfExecutions(query ...*models.ExecutionQuery) (int64, error) {
|
||||
qs := executionQueryConditions(query...)
|
||||
return qs.Count()
|
||||
}
|
||||
|
||||
// GetExecutions ...
|
||||
func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error) {
|
||||
executions := []*models.Execution{}
|
||||
|
||||
qs := executionQueryConditions(query...)
|
||||
if len(query) > 0 && query[0] != nil {
|
||||
qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size)
|
||||
}
|
||||
|
||||
qs = qs.OrderBy("-StartTime")
|
||||
|
||||
_, err := qs.All(&executions)
|
||||
return executions, err
|
||||
}
|
||||
|
||||
func executionQueryConditions(query ...*models.ExecutionQuery) orm.QuerySeter {
|
||||
qs := dao.GetOrmer().QueryTable(new(models.Execution))
|
||||
if len(query) == 0 || query[0] == nil {
|
||||
return qs
|
||||
}
|
||||
|
||||
q := query[0]
|
||||
if q.PolicyID != 0 {
|
||||
qs = qs.Filter("PolicyID", q.PolicyID)
|
||||
}
|
||||
if len(q.Trigger) > 0 {
|
||||
qs = qs.Filter("Trigger", q.Trigger)
|
||||
}
|
||||
if len(q.Statuses) > 0 {
|
||||
qs = qs.Filter("Status__in", q.Statuses)
|
||||
}
|
||||
return qs
|
||||
}
|
||||
|
||||
// GetExecution ...
|
||||
func GetExecution(id int64) (*models.Execution, error) {
|
||||
o := dao.GetOrmer()
|
||||
t := models.Execution{ID: id}
|
||||
err := o.Read(&t)
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return &t, err
|
||||
}
|
||||
|
||||
// DeleteExecution ...
|
||||
func DeleteExecution(id int64) error {
|
||||
o := dao.GetOrmer()
|
||||
_, err := o.Delete(&models.Execution{ID: id})
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteAllExecutions ...
|
||||
func DeleteAllExecutions(policyID int64) error {
|
||||
o := dao.GetOrmer()
|
||||
_, err := o.Delete(&models.Execution{PolicyID: policyID}, "PolicyID")
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateExecution ...
|
||||
func UpdateExecution(execution *models.Execution, props ...string) (int64, error) {
|
||||
if execution.ID == 0 {
|
||||
return 0, fmt.Errorf("execution ID is empty")
|
||||
}
|
||||
o := dao.GetOrmer()
|
||||
return o.Update(execution, props...)
|
||||
}
|
||||
|
||||
// AddTask ...
|
||||
func AddTask(task *models.Task) (int64, error) {
|
||||
o := dao.GetOrmer()
|
||||
sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status)
|
||||
values (?, ?, ?, ?, ?, ?) RETURNING id`
|
||||
|
||||
args := []interface{}{}
|
||||
args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status)
|
||||
|
||||
var taskID int64
|
||||
err := o.Raw(sql, args).QueryRow(&taskID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// GetTask ...
|
||||
func GetTask(id int64) (*models.Task, error) {
|
||||
o := dao.GetOrmer()
|
||||
sql := `select * from replication_task where id = ?`
|
||||
|
||||
var task models.Task
|
||||
|
||||
if err := o.Raw(sql, id).QueryRow(&task); err != nil {
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &task, nil
|
||||
}
|
||||
|
||||
// GetTotalOfTasks ...
|
||||
func GetTotalOfTasks(query ...*models.TaskQuery) (int64, error) {
|
||||
qs := taskQueryConditions(query...)
|
||||
return qs.Count()
|
||||
}
|
||||
|
||||
// GetTasks ...
|
||||
func GetTasks(query ...*models.TaskQuery) ([]*models.Task, error) {
|
||||
tasks := []*models.Task{}
|
||||
|
||||
qs := taskQueryConditions(query...)
|
||||
if len(query) > 0 && query[0] != nil {
|
||||
qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size)
|
||||
}
|
||||
|
||||
qs = qs.OrderBy("-StartTime")
|
||||
|
||||
_, err := qs.All(&tasks)
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
func taskQueryConditions(query ...*models.TaskQuery) orm.QuerySeter {
|
||||
qs := dao.GetOrmer().QueryTable(new(models.Task))
|
||||
if len(query) == 0 || query[0] == nil {
|
||||
return qs
|
||||
}
|
||||
|
||||
q := query[0]
|
||||
if q.ExecutionID != 0 {
|
||||
qs = qs.Filter("ExecutionID", q.ExecutionID)
|
||||
}
|
||||
if len(q.JobID) > 0 {
|
||||
qs = qs.Filter("JobID", q.JobID)
|
||||
}
|
||||
if len(q.ResourceType) > 0 {
|
||||
qs = qs.Filter("ResourceType", q.ResourceType)
|
||||
}
|
||||
if len(q.Statuses) > 0 {
|
||||
qs = qs.Filter("Status__in", q.Statuses)
|
||||
}
|
||||
return qs
|
||||
}
|
||||
|
||||
// DeleteTask ...
|
||||
func DeleteTask(id int64) error {
|
||||
o := dao.GetOrmer()
|
||||
_, err := o.Delete(&models.Task{ID: id})
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteAllTasks ...
|
||||
func DeleteAllTasks(executionID int64) error {
|
||||
o := dao.GetOrmer()
|
||||
_, err := o.Delete(&models.Task{ExecutionID: executionID}, "ExecutionID")
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateTask ...
|
||||
func UpdateTask(task *models.Task, props ...string) (int64, error) {
|
||||
if task.ID == 0 {
|
||||
return 0, fmt.Errorf("task ID is empty")
|
||||
}
|
||||
o := dao.GetOrmer()
|
||||
return o.Update(task, props...)
|
||||
}
|
||||
|
||||
// UpdateTaskStatus ...
|
||||
func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) {
|
||||
// can not use the globalOrm
|
||||
o := orm.NewOrm()
|
||||
o.Begin()
|
||||
|
||||
// query the task status
|
||||
var task models.Task
|
||||
sql := `select * from replication_task where id = ?`
|
||||
if err := o.Raw(sql, id).QueryRow(&task); err != nil {
|
||||
if err == orm.ErrNoRows {
|
||||
o.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
// check status
|
||||
satisfy := false
|
||||
if len(statusCondition) == 0 {
|
||||
satisfy = true
|
||||
} else {
|
||||
for _, stCondition := range statusCondition {
|
||||
if task.Status == stCondition {
|
||||
satisfy = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !satisfy {
|
||||
o.Rollback()
|
||||
return 0, fmt.Errorf("Status condition not match ")
|
||||
}
|
||||
|
||||
// update status
|
||||
params := []interface{}{}
|
||||
sql = `update replication_task set status = ?`
|
||||
params = append(params, status)
|
||||
if taskFinished(status) { // should update endTime
|
||||
sql += ` ,end_time = ?`
|
||||
params = append(params, time.Now())
|
||||
}
|
||||
sql += ` where id = ?`
|
||||
params = append(params, id)
|
||||
_, err := o.Raw(sql, params).Exec()
|
||||
log.Infof("Update task %d: %s -> %s", id, task.Status, status)
|
||||
if err != nil {
|
||||
log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status)
|
||||
o.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// query the execution
|
||||
var execution models.Execution
|
||||
sql = `select * from replication_execution where id = ?`
|
||||
if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil {
|
||||
if err == orm.ErrNoRows {
|
||||
log.Errorf("Execution not found id: %d", task.ExecutionID)
|
||||
o.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
// check execution data
|
||||
execuStatus, _ := getStatus(task.Status)
|
||||
count := getStatusCount(&execution, execuStatus)
|
||||
if count <= 0 {
|
||||
log.Errorf("Task statistics in execution inconsistent")
|
||||
o.Commit()
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
// update execution data
|
||||
updateStatusCount(&execution, execuStatus, -1)
|
||||
execuStatusUp, _ := getStatus(status)
|
||||
updateStatusCount(&execution, execuStatusUp, 1)
|
||||
|
||||
resetExecutionStatus(&execution)
|
||||
_, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress,
|
||||
models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped,
|
||||
models.ExecutionPropsName.EndTime)
|
||||
if err != nil {
|
||||
log.Errorf("Update execution status failed %d: %v", execution.ID, err)
|
||||
o.Rollback()
|
||||
return 0, err
|
||||
}
|
||||
o.Commit()
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func taskFinished(status string) bool {
|
||||
if status == models.TaskStatusFailed || status == models.TaskStatusStopped || status == models.TaskStatusSucceed {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func getStatus(status string) (string, error) {
|
||||
switch status {
|
||||
case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress:
|
||||
return models.ExecutionStatusInProgress, nil
|
||||
case models.TaskStatusSucceed:
|
||||
return models.ExecutionStatusSucceed, nil
|
||||
case models.TaskStatusStopped:
|
||||
return models.ExecutionStatusStopped, nil
|
||||
case models.TaskStatusFailed:
|
||||
return models.ExecutionStatusFailed, nil
|
||||
}
|
||||
return "", fmt.Errorf("Not support task status ")
|
||||
}
|
||||
|
||||
func getStatusCount(execution *models.Execution, status string) int {
|
||||
switch status {
|
||||
case models.ExecutionStatusInProgress:
|
||||
return execution.InProgress
|
||||
case models.ExecutionStatusSucceed:
|
||||
return execution.Succeed
|
||||
case models.ExecutionStatusStopped:
|
||||
return execution.Stopped
|
||||
case models.ExecutionStatusFailed:
|
||||
return execution.Failed
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func updateStatusCount(execution *models.Execution, status string, delta int) error {
|
||||
switch status {
|
||||
case models.ExecutionStatusInProgress:
|
||||
execution.InProgress += delta
|
||||
case models.ExecutionStatusSucceed:
|
||||
execution.Succeed += delta
|
||||
case models.ExecutionStatusStopped:
|
||||
execution.Stopped += delta
|
||||
case models.ExecutionStatusFailed:
|
||||
execution.Failed += delta
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resetExecutionStatus(execution *models.Execution) error {
|
||||
status := generateStatus(execution)
|
||||
if status != execution.Status {
|
||||
execution.Status = status
|
||||
log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status)
|
||||
}
|
||||
if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 {
|
||||
// execution finished in this time
|
||||
execution.EndTime = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func generateStatus(execution *models.Execution) string {
|
||||
if execution.InProgress > 0 {
|
||||
return models.ExecutionStatusInProgress
|
||||
} else if execution.Failed > 0 {
|
||||
return models.ExecutionStatusFailed
|
||||
} else if execution.Stopped > 0 {
|
||||
return models.ExecutionStatusStopped
|
||||
}
|
||||
return models.ExecutionStatusSucceed
|
||||
}
|
237
src/replication/ng/dao/execution_test.go
Normal file
237
src/replication/ng/dao/execution_test.go
Normal file
@ -0,0 +1,237 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMethodOfExecution(t *testing.T) {
|
||||
execution1 := &models.Execution{
|
||||
PolicyID: 11209,
|
||||
Status: "InProgress",
|
||||
StatusText: "None",
|
||||
Total: 12,
|
||||
Failed: 0,
|
||||
Succeed: 7,
|
||||
InProgress: 5,
|
||||
Stopped: 0,
|
||||
Trigger: "Event",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
execution2 := &models.Execution{
|
||||
PolicyID: 11209,
|
||||
Status: "Failed",
|
||||
StatusText: "Network error",
|
||||
Total: 9,
|
||||
Failed: 1,
|
||||
Succeed: 8,
|
||||
InProgress: 0,
|
||||
Stopped: 0,
|
||||
Trigger: "Manual",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
|
||||
// test add
|
||||
id1, err := AddExecution(execution1)
|
||||
require.Nil(t, err)
|
||||
|
||||
_, err = AddExecution(execution2)
|
||||
require.Nil(t, err)
|
||||
|
||||
// test list
|
||||
query := &models.ExecutionQuery{
|
||||
Statuses: []string{"InProgress", "Failed"},
|
||||
Pagination: models.Pagination{
|
||||
Page: 1,
|
||||
Size: 10,
|
||||
},
|
||||
}
|
||||
executions, err := GetExecutions(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, 2, len(executions))
|
||||
|
||||
total, err := GetTotalOfExecutions(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(2), total)
|
||||
|
||||
// test get
|
||||
execution, err := GetExecution(id1)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, execution1.Status, execution.Status)
|
||||
|
||||
// test update
|
||||
executionNew := &models.Execution{
|
||||
ID: id1,
|
||||
Status: "Succeed",
|
||||
Succeed: 12,
|
||||
InProgress: 0,
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
n, err := UpdateExecution(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress,
|
||||
models.ExecutionPropsName.EndTime)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
// test delete
|
||||
require.Nil(t, DeleteExecution(execution1.ID))
|
||||
execution, err = GetExecution(execution1.ID)
|
||||
require.Nil(t, err)
|
||||
require.Nil(t, execution)
|
||||
|
||||
// test delete all
|
||||
require.Nil(t, DeleteAllExecutions(execution1.PolicyID))
|
||||
query = &models.ExecutionQuery{}
|
||||
n, err = GetTotalOfExecutions(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(0), n)
|
||||
}
|
||||
|
||||
func TestMethodOfTask(t *testing.T) {
|
||||
task1 := &models.Task{
|
||||
ExecutionID: 112200,
|
||||
ResourceType: "resourceType1",
|
||||
SrcResource: "srcResource1",
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: "Initialized",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
task2 := &models.Task{
|
||||
ExecutionID: 112200,
|
||||
ResourceType: "resourceType2",
|
||||
SrcResource: "srcResource2",
|
||||
DstResource: "dstResource2",
|
||||
JobID: "jobID2",
|
||||
Status: "Stopped",
|
||||
StartTime: time.Now(),
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
|
||||
// test add
|
||||
id1, err := AddTask(task1)
|
||||
require.Nil(t, err)
|
||||
|
||||
_, err = AddTask(task2)
|
||||
require.Nil(t, err)
|
||||
|
||||
// test list
|
||||
query := &models.TaskQuery{
|
||||
ResourceType: "resourceType1",
|
||||
Pagination: models.Pagination{
|
||||
Page: 1,
|
||||
Size: 10,
|
||||
},
|
||||
}
|
||||
tasks, err := GetTasks(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, 1, len(tasks))
|
||||
|
||||
total, err := GetTotalOfTasks(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), total)
|
||||
|
||||
// test get
|
||||
task, err := GetTask(id1)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, task1.Status, task.Status)
|
||||
|
||||
// test update
|
||||
taskNew := &models.Task{
|
||||
ID: id1,
|
||||
Status: "Failed",
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
// test delete
|
||||
require.Nil(t, DeleteTask(id1))
|
||||
task, err = GetTask(id1)
|
||||
require.Nil(t, err)
|
||||
require.Nil(t, task)
|
||||
|
||||
// test delete all
|
||||
require.Nil(t, DeleteAllTasks(task1.ExecutionID))
|
||||
query = &models.TaskQuery{}
|
||||
n, err = GetTotalOfTasks(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(0), n)
|
||||
}
|
||||
|
||||
func TestUpdateJobStatus(t *testing.T) {
|
||||
execution := &models.Execution{
|
||||
PolicyID: 11209,
|
||||
Status: "InProgress",
|
||||
StatusText: "None",
|
||||
Total: 12,
|
||||
Failed: 0,
|
||||
Succeed: 10,
|
||||
InProgress: 1,
|
||||
Stopped: 1,
|
||||
Trigger: "Event",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
executionID, _ := AddExecution(execution)
|
||||
task1 := &models.Task{
|
||||
ID: 20191,
|
||||
ExecutionID: executionID,
|
||||
ResourceType: "resourceType1",
|
||||
SrcResource: "srcResource1",
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: "Pending",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
task2 := &models.Task{
|
||||
ID: 20192,
|
||||
ExecutionID: executionID,
|
||||
ResourceType: "resourceType2",
|
||||
SrcResource: "srcResource2",
|
||||
DstResource: "dstResource2",
|
||||
JobID: "jobID2",
|
||||
Status: "Stopped",
|
||||
StartTime: time.Now(),
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
taskID1, _ := AddTask(task1)
|
||||
taskID2, _ := AddTask(task2)
|
||||
|
||||
defer func() {
|
||||
DeleteAllTasks(executionID)
|
||||
DeleteAllExecutions(11209)
|
||||
}()
|
||||
|
||||
// update Pending->InProgress
|
||||
n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
execu, err := GetExecution(executionID)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, execution.InProgress, execu.InProgress)
|
||||
assert.Equal(t, execution.Status, execu.Status)
|
||||
|
||||
// update InProgress->Failed: Execution.InProgress-1, Failed+1
|
||||
n, err = UpdateTaskStatus(taskID1, "Failed")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
execu, err = GetExecution(executionID)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, 1, execu.Failed)
|
||||
assert.Equal(t, "Failed", execu.Status)
|
||||
|
||||
// update Stopped->Pending: Execution.Stopped-1, InProgress+1
|
||||
n, err = UpdateTaskStatus(taskID2, "Pending")
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), n)
|
||||
|
||||
execu, err = GetExecution(executionID)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, 1, execu.InProgress)
|
||||
assert.Equal(t, "InProgress", execu.Status)
|
||||
}
|
19
src/replication/ng/dao/models/base.go
Normal file
19
src/replication/ng/dao/models/base.go
Normal file
@ -0,0 +1,19 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
)
|
||||
|
||||
func init() {
|
||||
orm.RegisterModel(
|
||||
new(Registry),
|
||||
new(RepPolicy),
|
||||
new(Execution),
|
||||
new(Task))
|
||||
}
|
||||
|
||||
// Pagination ...
|
||||
type Pagination struct {
|
||||
Page int64
|
||||
Size int64
|
||||
}
|
146
src/replication/ng/dao/models/execution.go
Normal file
146
src/replication/ng/dao/models/execution.go
Normal file
@ -0,0 +1,146 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// ExecutionTable is the table name for replication executions
|
||||
ExecutionTable = "replication_execution"
|
||||
// TaskTable is table name for replication tasks
|
||||
TaskTable = "replication_task"
|
||||
)
|
||||
|
||||
// execution/task status/trigger const
|
||||
const (
|
||||
ExecutionStatusFailed string = "Failed"
|
||||
ExecutionStatusSucceed string = "Succeed"
|
||||
ExecutionStatusStopped string = "Stopped"
|
||||
ExecutionStatusInProgress string = "InProgress"
|
||||
|
||||
ExecutionTriggerManual string = "Manual"
|
||||
ExecutionTriggerEvent string = "Event"
|
||||
ExecutionTriggerSchedule string = "Schedule"
|
||||
|
||||
// The task has been persisted in db but not submitted to Jobservice
|
||||
TaskStatusInitialized string = "Initialized"
|
||||
TaskStatusPending string = "Pending"
|
||||
TaskStatusInProgress string = "InProgress"
|
||||
TaskStatusSucceed string = "Succeed"
|
||||
TaskStatusFailed string = "Failed"
|
||||
TaskStatusStopped string = "Stopped"
|
||||
)
|
||||
|
||||
// ExecutionPropsName defines the names of fields of Execution
|
||||
var ExecutionPropsName = ExecutionFieldsName{
|
||||
ID: "ID",
|
||||
PolicyID: "PolicyID",
|
||||
Status: "Status",
|
||||
StatusText: "StatusText",
|
||||
Total: "Total",
|
||||
Failed: "Failed",
|
||||
Succeed: "Succeed",
|
||||
InProgress: "InProgress",
|
||||
Stopped: "Stopped",
|
||||
Trigger: "Trigger",
|
||||
StartTime: "StartTime",
|
||||
EndTime: "EndTime",
|
||||
}
|
||||
|
||||
// ExecutionFieldsName defines the props of Execution
|
||||
type ExecutionFieldsName struct {
|
||||
ID string
|
||||
PolicyID string
|
||||
Status string
|
||||
StatusText string
|
||||
Total string
|
||||
Failed string
|
||||
Succeed string
|
||||
InProgress string
|
||||
Stopped string
|
||||
Trigger string
|
||||
StartTime string
|
||||
EndTime string
|
||||
}
|
||||
|
||||
// Execution holds information about once replication execution.
|
||||
type Execution struct {
|
||||
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
||||
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
StatusText string `orm:"column(status_text)" json:"status_text"`
|
||||
Total int `orm:"column(total)" json:"total"`
|
||||
Failed int `orm:"column(failed)" json:"failed"`
|
||||
Succeed int `orm:"column(succeed)" json:"succeed"`
|
||||
InProgress int `orm:"column(in_progress)" json:"in_progress"`
|
||||
Stopped int `orm:"column(stopped)" json:"stopped"`
|
||||
Trigger string `orm:"column(trigger)" json:"trigger"`
|
||||
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
|
||||
EndTime time.Time `orm:"column(end_time)" json:"end_time"`
|
||||
}
|
||||
|
||||
// TaskPropsName defines the names of fields of Task
|
||||
var TaskPropsName = TaskFieldsName{
|
||||
ID: "ID",
|
||||
ExecutionID: "ExecutionID",
|
||||
ResourceType: "ResourceType",
|
||||
SrcResource: "SrcResource",
|
||||
DstResource: "DstResource",
|
||||
JobID: "JobID",
|
||||
Status: "Status",
|
||||
StartTime: "StartTime",
|
||||
EndTime: "EndTime",
|
||||
}
|
||||
|
||||
// TaskFieldsName defines the props of Task
|
||||
type TaskFieldsName struct {
|
||||
ID string
|
||||
ExecutionID string
|
||||
ResourceType string
|
||||
SrcResource string
|
||||
DstResource string
|
||||
JobID string
|
||||
Status string
|
||||
StartTime string
|
||||
EndTime string
|
||||
}
|
||||
|
||||
// Task represent the tasks in one execution.
|
||||
type Task struct {
|
||||
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
||||
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
|
||||
ResourceType string `orm:"column(resource_type)" json:"resource_type"`
|
||||
SrcResource string `orm:"column(src_resource)" json:"src_resource"`
|
||||
DstResource string `orm:"column(dst_resource)" json:"dst_resource"`
|
||||
JobID string `orm:"column(job_id)" json:"job_id"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
|
||||
EndTime time.Time `orm:"column(end_time)" json:"end_time"`
|
||||
}
|
||||
|
||||
// TableName is required by by beego orm to map Execution to table replication_execution
|
||||
func (r *Execution) TableName() string {
|
||||
return ExecutionTable
|
||||
}
|
||||
|
||||
// TableName is required by by beego orm to map Task to table replication_task
|
||||
func (r *Task) TableName() string {
|
||||
return TaskTable
|
||||
}
|
||||
|
||||
// ExecutionQuery holds the query conditions for replication executions
|
||||
type ExecutionQuery struct {
|
||||
PolicyID int64
|
||||
Statuses []string
|
||||
Trigger string
|
||||
Pagination
|
||||
}
|
||||
|
||||
// TaskQuery holds the query conditions for replication task
|
||||
type TaskQuery struct {
|
||||
ExecutionID int64
|
||||
JobID string
|
||||
Statuses []string
|
||||
ResourceType string
|
||||
Pagination
|
||||
}
|
@ -15,34 +15,37 @@
|
||||
package execution
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/core/utils"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
)
|
||||
|
||||
// Manager manages the executions
|
||||
type Manager interface {
|
||||
// Create a new execution
|
||||
Create(*model.Execution) (int64, error)
|
||||
Create(*models.Execution) (int64, error)
|
||||
// List the summaries of executions
|
||||
List(...*model.ExecutionQuery) (int64, []*model.Execution, error)
|
||||
List(...*models.ExecutionQuery) (int64, []*models.Execution, error)
|
||||
// Get the specified execution
|
||||
Get(int64) (*model.Execution, error)
|
||||
Get(int64) (*models.Execution, error)
|
||||
// Update the data of the specified execution, the "props" are the
|
||||
// properties of execution that need to be updated
|
||||
Update(execution *model.Execution, props ...string) error
|
||||
Update(execution *models.Execution, props ...string) error
|
||||
// Remove the execution specified by the ID
|
||||
Remove(int64) error
|
||||
// Remove all executions of one policy specified by the policy ID
|
||||
RemoveAll(int64) error
|
||||
// Create a task
|
||||
CreateTask(*model.Task) (int64, error)
|
||||
CreateTask(*models.Task) (int64, error)
|
||||
// List the tasks according to the query
|
||||
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
|
||||
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
|
||||
// Get one specified task
|
||||
GetTask(int64) (*model.Task, error)
|
||||
GetTask(int64) (*models.Task, error)
|
||||
// Update the task, the "props" are the properties of task
|
||||
// that need to be updated, it cannot include "status". If
|
||||
// you want to update the status, use "UpdateTaskStatus" instead
|
||||
UpdateTask(task *model.Task, props ...string) error
|
||||
UpdateTask(task *models.Task, props ...string) error
|
||||
// UpdateTaskStatus only updates the task status. If "statusCondition"
|
||||
// presents, only the tasks whose status equal to "statusCondition"
|
||||
// will be updated
|
||||
@ -54,3 +57,129 @@ type Manager interface {
|
||||
// Get the log of one specific task
|
||||
GetTaskLog(int64) ([]byte, error)
|
||||
}
|
||||
|
||||
// DefaultManager ..
|
||||
type DefaultManager struct {
|
||||
}
|
||||
|
||||
// NewDefaultManager ...
|
||||
func NewDefaultManager() (Manager, error) {
|
||||
return &DefaultManager{}, nil
|
||||
}
|
||||
|
||||
// Create a new execution
|
||||
func (dm *DefaultManager) Create(execution *models.Execution) (int64, error) {
|
||||
return dao.AddExecution(execution)
|
||||
}
|
||||
|
||||
// List the summaries of executions
|
||||
func (dm *DefaultManager) List(queries ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
total, err := dao.GetTotalOfExecutions(queries...)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
executions, err := dao.GetExecutions(queries...)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return total, executions, nil
|
||||
}
|
||||
|
||||
// Get the specified execution
|
||||
func (dm *DefaultManager) Get(id int64) (*models.Execution, error) {
|
||||
return dao.GetExecution(id)
|
||||
}
|
||||
|
||||
// Update ...
|
||||
func (dm *DefaultManager) Update(execution *models.Execution, props ...string) error {
|
||||
n, err := dao.UpdateExecution(execution, props...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("Execution not found error: %d ", execution.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the execution specified by the ID
|
||||
func (dm *DefaultManager) Remove(id int64) error {
|
||||
return dao.DeleteExecution(id)
|
||||
}
|
||||
|
||||
// RemoveAll executions of one policy specified by the policy ID
|
||||
func (dm *DefaultManager) RemoveAll(policyID int64) error {
|
||||
return dao.DeleteAllExecutions(policyID)
|
||||
}
|
||||
|
||||
// CreateTask used to create a task
|
||||
func (dm *DefaultManager) CreateTask(task *models.Task) (int64, error) {
|
||||
return dao.AddTask(task)
|
||||
}
|
||||
|
||||
// ListTasks list the tasks according to the query
|
||||
func (dm *DefaultManager) ListTasks(queries ...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||
total, err := dao.GetTotalOfTasks(queries...)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
tasks, err := dao.GetTasks(queries...)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return total, tasks, nil
|
||||
}
|
||||
|
||||
// GetTask get one specified task
|
||||
func (dm *DefaultManager) GetTask(id int64) (*models.Task, error) {
|
||||
return dao.GetTask(id)
|
||||
}
|
||||
|
||||
// UpdateTask ...
|
||||
func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error {
|
||||
n, err := dao.UpdateTask(task, props...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("Task not found error: %d ", task.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTaskStatus ...
|
||||
func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error {
|
||||
n, err := dao.UpdateTaskStatus(taskID, status, statusCondition...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n == 0 {
|
||||
return fmt.Errorf("Update task status failed %d: -> %s ", taskID, status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveTask remove one task specified by task ID
|
||||
func (dm *DefaultManager) RemoveTask(id int64) error {
|
||||
return dao.DeleteTask(id)
|
||||
}
|
||||
|
||||
// RemoveAllTasks of one execution specified by the execution ID
|
||||
func (dm *DefaultManager) RemoveAllTasks(executionID int64) error {
|
||||
return dao.DeleteAllTasks(executionID)
|
||||
}
|
||||
|
||||
// GetTaskLog get the log of one specific task
|
||||
func (dm *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
|
||||
task, err := dao.GetTask(taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if task == nil {
|
||||
return nil, fmt.Errorf("Task not found %d ", taskID)
|
||||
}
|
||||
|
||||
return utils.GetJobServiceClient().GetJobLog(task.JobID)
|
||||
}
|
||||
|
149
src/replication/ng/execution/execution_test.go
Normal file
149
src/replication/ng/execution/execution_test.go
Normal file
@ -0,0 +1,149 @@
|
||||
package execution
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var executionManager, _ = NewDefaultManager()
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
databases := []string{"postgresql"}
|
||||
for _, database := range databases {
|
||||
log.Infof("run test cases for database: %s", database)
|
||||
result := 1
|
||||
switch database {
|
||||
case "postgresql":
|
||||
dao.PrepareTestForPostgresSQL()
|
||||
default:
|
||||
log.Fatalf("invalid database: %s", database)
|
||||
}
|
||||
|
||||
result = m.Run()
|
||||
|
||||
if result != 0 {
|
||||
os.Exit(result)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMethodOfExecutionManager(t *testing.T) {
|
||||
execution := &models.Execution{
|
||||
PolicyID: 11209,
|
||||
Status: "InProgress",
|
||||
StatusText: "None",
|
||||
Total: 12,
|
||||
Failed: 0,
|
||||
Succeed: 7,
|
||||
InProgress: 5,
|
||||
Stopped: 0,
|
||||
Trigger: "Event",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
executionManager.RemoveAll(execution.PolicyID)
|
||||
}()
|
||||
|
||||
// Create
|
||||
id, err := executionManager.Create(execution)
|
||||
require.Nil(t, err)
|
||||
|
||||
// List
|
||||
query := &models.ExecutionQuery{
|
||||
Statuses: []string{"InProgress", "Failed"},
|
||||
Pagination: models.Pagination{
|
||||
Page: 1,
|
||||
Size: 10,
|
||||
},
|
||||
}
|
||||
count, executions, err := executionManager.List(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), count)
|
||||
assert.Equal(t, 1, len(executions))
|
||||
|
||||
// Get
|
||||
_, err = executionManager.Get(id)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Update
|
||||
executionNew := &models.Execution{
|
||||
ID: id,
|
||||
Status: "Failed",
|
||||
Succeed: 12,
|
||||
InProgress: 0,
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
err = executionManager.Update(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress,
|
||||
models.ExecutionPropsName.EndTime)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Remove
|
||||
require.Nil(t, executionManager.Remove(id))
|
||||
}
|
||||
|
||||
func TestMethodOfTaskManager(t *testing.T) {
|
||||
task := &models.Task{
|
||||
ExecutionID: 112200,
|
||||
ResourceType: "resourceType1",
|
||||
SrcResource: "srcResource1",
|
||||
DstResource: "dstResource1",
|
||||
JobID: "jobID1",
|
||||
Status: "Initialized",
|
||||
StartTime: time.Now(),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
executionManager.RemoveAllTasks(task.ExecutionID)
|
||||
}()
|
||||
|
||||
// CreateTask
|
||||
id, err := executionManager.CreateTask(task)
|
||||
require.Nil(t, err)
|
||||
|
||||
// ListTasks
|
||||
query := &models.TaskQuery{
|
||||
ResourceType: "resourceType1",
|
||||
Pagination: models.Pagination{
|
||||
Page: 1,
|
||||
Size: 10,
|
||||
},
|
||||
}
|
||||
count, tasks, err := executionManager.ListTasks(query)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, 1, len(tasks))
|
||||
assert.Equal(t, int64(1), count)
|
||||
|
||||
// GetTask
|
||||
_, err = executionManager.GetTask(id)
|
||||
require.Nil(t, err)
|
||||
|
||||
// UpdateTask
|
||||
taskNew := &models.Task{
|
||||
ID: id,
|
||||
SrcResource: "srcResourceChanged",
|
||||
}
|
||||
err = executionManager.UpdateTask(taskNew, models.TaskPropsName.SrcResource)
|
||||
require.Nil(t, err)
|
||||
taskUpdate, _ := executionManager.GetTask(id)
|
||||
assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource)
|
||||
|
||||
// UpdateTaskStatus
|
||||
err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed)
|
||||
require.NotNil(t, err)
|
||||
taskUpdate, _ = executionManager.GetTask(id)
|
||||
assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status)
|
||||
|
||||
// Remove
|
||||
require.Nil(t, executionManager.RemoveTask(id))
|
||||
|
||||
// RemoveAll
|
||||
require.Nil(t, executionManager.RemoveAll(id))
|
||||
}
|
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -85,16 +86,16 @@ func (f *fakedRegistryManager) HealthCheck() error {
|
||||
|
||||
type fakedExecutionManager struct{}
|
||||
|
||||
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
|
||||
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
|
||||
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Update(*model.Execution, ...string) error {
|
||||
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Remove(int64) error {
|
||||
@ -103,16 +104,16 @@ func (f *fakedExecutionManager) Remove(int64) error {
|
||||
func (f *fakedExecutionManager) RemoveAll(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
|
||||
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
|
||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||
)
|
||||
@ -98,9 +99,9 @@ func newFlow(policy *model.Policy, registryMgr registry.Manager,
|
||||
}
|
||||
|
||||
func (f *flow) createExecution() (int64, error) {
|
||||
id, err := f.executionMgr.Create(&model.Execution{
|
||||
id, err := f.executionMgr.Create(&models.Execution{
|
||||
PolicyID: f.policy.ID,
|
||||
Status: model.ExecutionStatusInProgress,
|
||||
Status: models.ExecutionStatusInProgress,
|
||||
StartTime: time.Now(),
|
||||
})
|
||||
f.executionID = id
|
||||
@ -205,10 +206,10 @@ func (f *flow) preprocess() error {
|
||||
|
||||
func (f *flow) createTasks() error {
|
||||
for _, item := range f.scheduleItems {
|
||||
task := &model.Task{
|
||||
task := &models.Task{
|
||||
ExecutionID: f.executionID,
|
||||
Status: model.TaskStatusInitialized,
|
||||
ResourceType: item.SrcResource.Type,
|
||||
Status: models.TaskStatusInitialized,
|
||||
ResourceType: string(item.SrcResource.Type),
|
||||
SrcResource: getResourceName(item.SrcResource),
|
||||
DstResource: getResourceName(item.DstResource),
|
||||
}
|
||||
@ -240,17 +241,17 @@ func (f *flow) schedule() error {
|
||||
// task as failure
|
||||
if result.Error != nil {
|
||||
log.Errorf("failed to schedule task %d: %v", result.TaskID, err)
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusFailed); err != nil {
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
|
||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
allFailed = false
|
||||
// if the task is submitted successfully, update the status, job ID and start time
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil {
|
||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil {
|
||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
||||
}
|
||||
if err = f.executionMgr.UpdateTask(&model.Task{
|
||||
if err = f.executionMgr.UpdateTask(&models.Task{
|
||||
ID: result.TaskID,
|
||||
JobID: result.JobID,
|
||||
StartTime: time.Now(),
|
||||
@ -276,9 +277,9 @@ func (f *flow) markExecutionFailure(err error) {
|
||||
log.Errorf("the execution %d is marked as failure because of the error: %s",
|
||||
f.executionID, statusText)
|
||||
err = f.executionMgr.Update(
|
||||
&model.Execution{
|
||||
&models.Execution{
|
||||
ID: f.executionID,
|
||||
Status: model.ExecutionStatusFailed,
|
||||
Status: models.ExecutionStatusFailed,
|
||||
StatusText: statusText,
|
||||
EndTime: time.Now(),
|
||||
})
|
||||
|
@ -1,88 +0,0 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
)
|
||||
|
||||
// execution/task status/trigger const
|
||||
const (
|
||||
ExecutionStatusFailed string = "Failed"
|
||||
ExecutionStatusSucceed string = "Succeed"
|
||||
ExecutionStatusStopped string = "Stopped"
|
||||
ExecutionStatusInProgress string = "InProgress"
|
||||
|
||||
ExecutionTriggerManual string = "Manual"
|
||||
ExecutionTriggerEvent string = "Event"
|
||||
ExecutionTriggerSchedule string = "Schedule"
|
||||
|
||||
// The task has been persisted in db but not submitted to Jobservice
|
||||
TaskStatusInitialized string = "Initialized"
|
||||
TaskStatusPending string = "Pending"
|
||||
TaskStatusInProgress string = "InProgress"
|
||||
TaskStatusSucceed string = "Succeed"
|
||||
TaskStatusFailed string = "Failed"
|
||||
TaskStatusStopped string = "Stopped"
|
||||
)
|
||||
|
||||
// Execution defines an execution of the replication
|
||||
type Execution struct {
|
||||
ID int64 `json:"id"`
|
||||
PolicyID int64 `json:"policy_id"`
|
||||
Status string `json:"status"`
|
||||
StatusText string `json:"status_text"`
|
||||
Trigger string `json:"trigger"`
|
||||
Total int `json:"total"`
|
||||
Failed int `json:"failed"`
|
||||
Succeed int `json:"succeed"`
|
||||
Pending int `json:"pending"`
|
||||
InProgress int `json:"in_progress"`
|
||||
Stopped int `json:"stopped"`
|
||||
Initialized int `json:"initialized"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
}
|
||||
|
||||
// Task holds the information of one replication task
|
||||
type Task struct {
|
||||
ID int64 `json:"id"`
|
||||
ExecutionID int64 `json:"execution_id"`
|
||||
ResourceType ResourceType `json:"resource_type"`
|
||||
SrcResource string `json:"src_resource"`
|
||||
DstResource string `json:"dst_resource"`
|
||||
JobID string `json:"job_id"`
|
||||
Status string `json:"status"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
}
|
||||
|
||||
// ExecutionQuery defines the query conditions for listing executions
|
||||
type ExecutionQuery struct {
|
||||
PolicyID int64
|
||||
Status string
|
||||
Trigger string
|
||||
models.Pagination
|
||||
}
|
||||
|
||||
// TaskQuery defines the query conditions for listing tasks
|
||||
type TaskQuery struct {
|
||||
ExecutionID int64
|
||||
ResourceType ResourceType
|
||||
Status string
|
||||
models.Pagination
|
||||
}
|
@ -15,6 +15,7 @@
|
||||
package operation
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||
"github.com/goharbor/harbor/src/replication/ng/flow"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
@ -25,10 +26,11 @@ import (
|
||||
type Controller interface {
|
||||
StartReplication(policy *model.Policy) (int64, error)
|
||||
StopReplication(int64) error
|
||||
ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error)
|
||||
GetExecution(int64) (*model.Execution, error)
|
||||
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
|
||||
GetTask(int64) (*model.Task, error)
|
||||
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
|
||||
GetExecution(int64) (*models.Execution, error)
|
||||
ListTasks(...*models.TaskQuery) (int64, []*models.Task, error)
|
||||
GetTask(int64) (*models.Task, error)
|
||||
UpdateTaskStatus(id int64, status string, statusCondition ...string) error
|
||||
GetTaskLog(int64) ([]byte, error)
|
||||
}
|
||||
|
||||
@ -51,18 +53,21 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error
|
||||
func (d *defaultController) StopReplication(executionID int64) error {
|
||||
return d.flowCtl.StopReplication(executionID)
|
||||
}
|
||||
func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
return d.executionMgr.List(query...)
|
||||
}
|
||||
func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) {
|
||||
func (d *defaultController) GetExecution(executionID int64) (*models.Execution, error) {
|
||||
return d.executionMgr.Get(executionID)
|
||||
}
|
||||
func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
func (d *defaultController) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||
return d.executionMgr.ListTasks(query...)
|
||||
}
|
||||
func (d *defaultController) GetTask(id int64) (*model.Task, error) {
|
||||
func (d *defaultController) GetTask(id int64) (*models.Task, error) {
|
||||
return d.executionMgr.GetTask(id)
|
||||
}
|
||||
func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
|
||||
return d.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
|
||||
}
|
||||
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
|
||||
return d.executionMgr.GetTaskLog(taskID)
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package operation
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -33,22 +34,22 @@ func (f *fakedFlowController) StopReplication(int64) error {
|
||||
|
||||
type fakedExecutionManager struct{}
|
||||
|
||||
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
|
||||
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
return 1, []*model.Execution{
|
||||
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||
return 1, []*models.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
|
||||
return &model.Execution{
|
||||
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
|
||||
return &models.Execution{
|
||||
ID: 1,
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Update(*model.Execution, ...string) error {
|
||||
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Remove(int64) error {
|
||||
@ -57,20 +58,20 @@ func (f *fakedExecutionManager) Remove(int64) error {
|
||||
func (f *fakedExecutionManager) RemoveAll(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
|
||||
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
return 1, []*model.Task{
|
||||
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||
return 1, []*models.Task{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
|
||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||
|
Loading…
Reference in New Issue
Block a user