From 4484bca7564b449970b8faf6da828a72ddf455eb Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 2 Apr 2019 13:29:17 +0800 Subject: [PATCH] Fix replication related issues 1. Add operation property for tasks 2. Add trigger property for executions 3. Update the getting registry info API to allow passing 0 as ID to get the info of local Harbor registry Signed-off-by: Wenkai Yin --- .../postgresql/0004_1.8.0_schema.up.sql | 1 + src/core/api/registry.go | 27 ++++++++++++++----- src/core/api/replication_execution.go | 5 +++- src/core/api/replication_execution_test.go | 2 +- src/core/router.go | 3 ++- src/replication/ng/dao/models/execution.go | 2 +- src/replication/ng/event/handler.go | 7 ++--- src/replication/ng/event/handler_test.go | 2 +- src/replication/ng/operation/controller.go | 14 ++++++---- .../ng/operation/controller_test.go | 8 +++--- src/replication/ng/operation/flow/stage.go | 5 ++++ .../ng/operation/hook/task_test.go | 2 +- .../ng/policy/scheduler/scheduler.go | 4 ++- 13 files changed, 56 insertions(+), 26 deletions(-) diff --git a/make/migrations/postgresql/0004_1.8.0_schema.up.sql b/make/migrations/postgresql/0004_1.8.0_schema.up.sql index 7ab140ed2..83cbf3a96 100644 --- a/make/migrations/postgresql/0004_1.8.0_schema.up.sql +++ b/make/migrations/postgresql/0004_1.8.0_schema.up.sql @@ -85,6 +85,7 @@ create table replication_task ( resource_type varchar(64), src_resource varchar(256), dst_resource varchar(256), + operation varchar(32), job_id varchar(64), status varchar(32), start_time timestamp default CURRENT_TIMESTAMP, diff --git a/src/core/api/registry.go b/src/core/api/registry.go index 03a28fd37..5def7fa92 100644 --- a/src/core/api/registry.go +++ b/src/core/api/registry.go @@ -5,6 +5,8 @@ import ( "net/http" "strconv" + "github.com/goharbor/harbor/src/replication/ng/event" + "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api/models" @@ -215,16 +217,27 @@ func (t *RegistryAPI) Delete() { // GetInfo returns the base info and capability declarations of the registry func (t *RegistryAPI) GetInfo() { - id := t.GetIDFromURL() - registry, err := t.manager.Get(id) - if err != nil { - t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err)) + id, err := t.GetInt64FromPath(":id") + // "0" is used for the ID of the local Harbor registry + if err != nil || id < 0 { + t.HandleBadRequest(fmt.Sprintf("invalid registry ID %s", t.GetString(":id"))) return } - if registry == nil { - t.HandleNotFound(fmt.Sprintf("registry %d not found", id)) - return + var registry *model.Registry + if id == 0 { + registry = event.GetLocalRegistry() + } else { + registry, err = t.manager.Get(id) + if err != nil { + t.HandleInternalServerError(fmt.Sprintf("failed to get registry %d: %v", id, err)) + return + } + if registry == nil { + t.HandleNotFound(fmt.Sprintf("registry %d not found", id)) + return + } } + factory, err := adapter.GetFactory(registry.Type) if err != nil { t.HandleInternalServerError(fmt.Sprintf("failed to get the adapter factory for registry type %s: %v", registry.Type, err)) diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 8db30e0bd..aab260e4d 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -19,6 +19,8 @@ import ( "net/http" "strconv" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/event" "github.com/goharbor/harbor/src/replication/ng" @@ -120,7 +122,8 @@ func (r *ReplicationOperationAPI) CreateExecution() { return } - executionID, err := ng.OperationCtl.StartReplication(policy, nil) + trigger := r.GetString("trigger", model.TriggerTypeManual) + executionID, err := ng.OperationCtl.StartReplication(policy, nil, trigger) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err)) return diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index 3fe1e211a..2524c1405 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -25,7 +25,7 @@ import ( type fakedOperationController struct{} -func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { +func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) { return 1, nil } func (f *fakedOperationController) StopReplication(int64) error { diff --git a/src/core/router.go b/src/core/router.go index e1adffaa3..5fbd01bce 100644 --- a/src/core/router.go +++ b/src/core/router.go @@ -135,7 +135,8 @@ func initRouters() { beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post") beego.Router("/api/registries/:id([0-9]+)", &api.RegistryAPI{}, "get:Get;put:Put;delete:Delete") - beego.Router("/api/registries/:id([0-9]+)/info", &api.RegistryAPI{}, "get:GetInfo") + // we use "0" as the ID of the local Harbor registry, so don't add "([0-9]+)" in the path + beego.Router("/api/registries/:id/info", &api.RegistryAPI{}, "get:GetInfo") beego.Router("/v2/*", &controllers.RegistryProxy{}, "*:Handle") diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go index 6304268c5..15119be4f 100644 --- a/src/replication/ng/dao/models/execution.go +++ b/src/replication/ng/dao/models/execution.go @@ -106,13 +106,13 @@ type TaskFieldsName struct { } // Task represent the tasks in one execution. -// TODO add operation property type Task struct { ID int64 `orm:"pk;auto;column(id)" json:"id"` ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` ResourceType string `orm:"column(resource_type)" json:"resource_type"` SrcResource string `orm:"column(src_resource)" json:"src_resource"` DstResource string `orm:"column(dst_resource)" json:"dst_resource"` + Operation string `orm:"column(operation)" json:"operation"` JobID string `orm:"column(job_id)" json:"job_id"` Status string `orm:"column(status)" json:"status"` StartTime time.Time `orm:"column(start_time)" json:"start_time"` diff --git a/src/replication/ng/event/handler.go b/src/replication/ng/event/handler.go index 7141c0699..d9d078cb0 100644 --- a/src/replication/ng/event/handler.go +++ b/src/replication/ng/event/handler.go @@ -76,7 +76,7 @@ func (h *handler) Handle(event *Event) error { if err := PopulateRegistries(h.registryMgr, policy); err != nil { return err } - id, err := h.opCtl.StartReplication(policy, event.Resource) + id, err := h.opCtl.StartReplication(policy, event.Resource, model.TriggerTypeEventBased) if err != nil { return err } @@ -140,7 +140,7 @@ func PopulateRegistries(registryMgr registry.Manager, policy *model.Policy) erro func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model.Registry, error) { if registry == nil || registry.ID == 0 { - return getLocalRegistry(), nil + return GetLocalRegistry(), nil } reg, err := registryMgr.Get(registry.ID) if err != nil { @@ -152,7 +152,8 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model return reg, nil } -func getLocalRegistry() *model.Registry { +// GetLocalRegistry returns the info of the local Harbor registry +func GetLocalRegistry() *model.Registry { return &model.Registry{ Type: model.RegistryTypeHarbor, Name: "Local", diff --git a/src/replication/ng/event/handler_test.go b/src/replication/ng/event/handler_test.go index 4cdb58cc3..3c189e80b 100644 --- a/src/replication/ng/event/handler_test.go +++ b/src/replication/ng/event/handler_test.go @@ -28,7 +28,7 @@ import ( type fakedOperationController struct{} -func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { +func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) { return 1, nil } func (f *fakedOperationController) StopReplication(int64) error { diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 924bd8977..2c7f702be 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -31,7 +31,8 @@ import ( // Controller handles the replication-related operations: start, // stop, query, etc. type Controller interface { - StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) + // trigger is used to specified that what this replication is triggered by + StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) StopReplication(int64) error ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) GetExecution(int64) (*models.Execution, error) @@ -56,12 +57,14 @@ type controller struct { scheduler scheduler.Scheduler } -func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { +func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger string) (int64, error) { if resource != nil && len(resource.Metadata.Vtags) != 1 { return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags) } - - id, err := createExecution(c.executionMgr, policy.ID) + if len(trigger) == 0 { + trigger = model.TriggerTypeManual + } + id, err := createExecution(c.executionMgr, policy.ID, trigger) if err != nil { return 0, err } @@ -139,9 +142,10 @@ func (c *controller) GetTaskLog(taskID int64) ([]byte, error) { } // create the execution record in database -func createExecution(mgr execution.Manager, policyID int64) (int64, error) { +func createExecution(mgr execution.Manager, policyID int64, trigger string) (int64, error) { id, err := mgr.Create(&models.Execution{ PolicyID: policyID, + Trigger: trigger, Status: models.ExecutionStatusInProgress, StartTime: time.Now(), }) diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go index bfe21ec5d..ff28fb7f7 100644 --- a/src/replication/ng/operation/controller_test.go +++ b/src/replication/ng/operation/controller_test.go @@ -232,24 +232,24 @@ func TestStartReplication(t *testing.T) { Vtags: []string{"1.0", "2.0"}, }, } - _, err = ctl.StartReplication(policy, resource) + _, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.NotNil(t, err) // replicate resource deletion resource.Metadata.Vtags = []string{"1.0"} resource.Deleted = true - id, err := ctl.StartReplication(policy, resource) + id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // replicate resource copy resource.Deleted = false - id, err = ctl.StartReplication(policy, resource) + id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // nil resource - id, err = ctl.StartReplication(policy, nil) + id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) } diff --git a/src/replication/ng/operation/flow/stage.go b/src/replication/ng/operation/flow/stage.go index f47b80b4a..ae34b94de 100644 --- a/src/replication/ng/operation/flow/stage.go +++ b/src/replication/ng/operation/flow/stage.go @@ -263,12 +263,17 @@ func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*mod // create task records in database func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error { for _, item := range items { + operation := "copy" + if item.DstResource.Deleted { + operation = "deletion" + } task := &models.Task{ ExecutionID: executionID, Status: models.TaskStatusInitialized, ResourceType: string(item.SrcResource.Type), SrcResource: getResourceName(item.SrcResource), DstResource: getResourceName(item.DstResource), + Operation: operation, } id, err := mgr.CreateTask(task) if err != nil { diff --git a/src/replication/ng/operation/hook/task_test.go b/src/replication/ng/operation/hook/task_test.go index 547743d45..6a88a390b 100644 --- a/src/replication/ng/operation/hook/task_test.go +++ b/src/replication/ng/operation/hook/task_test.go @@ -28,7 +28,7 @@ type fakedOperationController struct { status string } -func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource) (int64, error) { +func (f *fakedOperationController) StartReplication(*model.Policy, *model.Resource, string) (int64, error) { return 0, nil } func (f *fakedOperationController) StopReplication(int64) error { diff --git a/src/replication/ng/policy/scheduler/scheduler.go b/src/replication/ng/policy/scheduler/scheduler.go index c29c8470e..791fb65e9 100644 --- a/src/replication/ng/policy/scheduler/scheduler.go +++ b/src/replication/ng/policy/scheduler/scheduler.go @@ -19,6 +19,8 @@ import ( "net/http" "time" + "github.com/goharbor/harbor/src/replication/ng/model" + common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/job" job_models "github.com/goharbor/harbor/src/common/job/models" @@ -60,7 +62,7 @@ func (s *scheduler) Schedule(policyID int64, cron string) error { } log.Debugf("the schedule job record %d added", id) - replicateURL := fmt.Sprintf("%s/api/replication/executions", config.Config.CoreURL) + replicateURL := fmt.Sprintf("%s/api/replication/executions?trigger=%s", config.Config.CoreURL, model.TriggerTypeScheduled) statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id) jobID, err := s.jobservice.SubmitJob(&job_models.JobData{ Name: job.Scheduler,