From e12da7264ad595099d8eb584c5d8e17f440df3a1 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 25 Jan 2019 13:22:37 +0800 Subject: [PATCH] Implement the replication flow controller 1. Implement the replication flow controller 2. Implement the registry adapter management 3. Define the interface for PolicyManager, RegistryManager, ExecutionManager, TaskManager and Scheduler Signed-off-by: Wenkai Yin --- src/replication/ng/adapter/adapter.go | 80 ++++++++ src/replication/ng/adapter/adapter_test.go | 49 +++++ src/replication/ng/execution/execution.go | 51 +++++ src/replication/ng/flow/controller.go | 99 ++++++++++ src/replication/ng/flow/controller_test.go | 183 ++++++++++++++++++ src/replication/ng/flow/flow.go | 206 +++++++++++++++++++++ src/replication/ng/model/execution.go | 44 ++++- src/replication/ng/model/namespace.go | 5 + src/replication/ng/model/policy.go | 15 +- src/replication/ng/model/registry.go | 15 ++ src/replication/ng/model/resource.go | 15 +- src/replication/ng/policy/manager.go | 34 ++++ src/replication/ng/registry/manager.go | 34 ++++ src/replication/ng/scheduler/scheduler.go | 27 +++ src/replication/ng/transfer/transfer.go | 8 +- 15 files changed, 855 insertions(+), 10 deletions(-) create mode 100644 src/replication/ng/adapter/adapter.go create mode 100644 src/replication/ng/adapter/adapter_test.go create mode 100644 src/replication/ng/execution/execution.go create mode 100644 src/replication/ng/flow/controller.go create mode 100644 src/replication/ng/flow/controller_test.go create mode 100644 src/replication/ng/flow/flow.go create mode 100644 src/replication/ng/policy/manager.go create mode 100644 src/replication/ng/registry/manager.go create mode 100644 src/replication/ng/scheduler/scheduler.go diff --git a/src/replication/ng/adapter/adapter.go b/src/replication/ng/adapter/adapter.go new file mode 100644 index 000000000..c60ebf2a2 --- /dev/null +++ b/src/replication/ng/adapter/adapter.go @@ -0,0 +1,80 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +import ( + "errors" + "fmt" + + "github.com/goharbor/harbor/src/replication/ng/model" +) + +var registry = map[model.RegistryType]Factory{} + +// Factory creates a specific Adapter according to the params +type Factory func(*model.Registry) (Adapter, error) + +// Info provides base info and capability declarations of the adapter +type Info struct { + Name model.RegistryType `json:"name"` + Description string `json:"description"` + SupportedResourceTypes []model.ResourceType `json:"supported_resource_types"` + SupportedResourceFilters []model.FilterType `json:"supported_resource_filters"` +} + +// Adapter interface defines the capabilities of registry +type Adapter interface { + // Info return the information of this adapter + Info() *Info + // Lists the available namespaces under the specified registry with the + // provided credential/token + ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) + // Create a new namespace + // This method should guarantee it's idempotent + // And returns nil if a namespace with the same name already exists + CreateNamespace(*model.Namespace) error + // Get the namespace specified by the name, the returning value should + // contain the metadata about the namespace if it has + GetNamespace(string) (*model.Namespace, error) + // Fetch the content resource under the namespace by filters + // SUGGESTION: Adapter provider can do their own filter based on the filter pattern + // or call the default `DoFilter` function of the filter to complete resource filtering. + FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error) +} + +// RegisterFactory registers one adapter factory to the registry +func RegisterFactory(name model.RegistryType, factory Factory) error { + if !name.Valid() { + return errors.New("invalid adapter factory name") + } + if factory == nil { + return errors.New("empty adapter factory") + } + if _, exist := registry[name]; exist { + return fmt.Errorf("adapter factory for %s already exists", name) + } + registry[name] = factory + return nil +} + +// GetFactory gets the adapter factory by the specified name +func GetFactory(name model.RegistryType) (Factory, error) { + factory, exist := registry[name] + if !exist { + return nil, fmt.Errorf("adapter factory for %s not found", name) + } + + return factory, nil +} diff --git a/src/replication/ng/adapter/adapter_test.go b/src/replication/ng/adapter/adapter_test.go new file mode 100644 index 000000000..73a20014b --- /dev/null +++ b/src/replication/ng/adapter/adapter_test.go @@ -0,0 +1,49 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +import ( + "testing" + + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func fakedFactory(*model.Registry) (Adapter, error) { + return nil, nil +} + +func TestRegisterFactory(t *testing.T) { + // empty name + assert.NotNil(t, RegisterFactory("", nil)) + // empty factory + assert.NotNil(t, RegisterFactory("factory", nil)) + // pass + assert.Nil(t, RegisterFactory("factory", fakedFactory)) + // already exists + assert.NotNil(t, RegisterFactory("factory", fakedFactory)) +} + +func TestGetFactory(t *testing.T) { + registry = map[model.RegistryType]Factory{} + require.Nil(t, RegisterFactory("factory", fakedFactory)) + // doesn't exist + _, err := GetFactory("another_factory") + assert.NotNil(t, err) + // pass + _, err = GetFactory("factory") + assert.Nil(t, err) +} diff --git a/src/replication/ng/execution/execution.go b/src/replication/ng/execution/execution.go new file mode 100644 index 000000000..a0a0649fc --- /dev/null +++ b/src/replication/ng/execution/execution.go @@ -0,0 +1,51 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package execution + +import ( + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Manager manages the executions +type Manager interface { + // Create a new execution + Create(*model.Execution) (int64, error) + // List the summaries of executions + List(*model.ExecutionQuery) (int64, []*model.Execution, error) + // Get the specified execution + Get(int64) (*model.Execution, error) + // Update the data of the specified execution, the "props" are the + // properties of execution that need to be updated + Update(execution *model.Execution, props ...string) error + // Remove the execution specified by the ID + Remove(int64) error + // Remove all executions of one policy specified by the policy ID + RemoveAll(int64) error + // Create a task + CreateTask(*model.Task) (int64, error) + // List the tasks according to the query + ListTasks(*model.TaskQuery) (int64, []*model.Task, error) + // Get one specified task + GetTask(int64) (*model.Task, error) + // Update the task, the "props" are the properties of task + // that need to be updated + UpdateTask(task *model.Task, props ...string) error + // Remove one task specified by task ID + RemoveTask(int64) error + // Remove all tasks of one execution specified by the execution ID + RemoveAllTasks(int64) error + // Get the log of one specific task + GetTaskLog(int64) ([]byte, error) +} diff --git a/src/replication/ng/flow/controller.go b/src/replication/ng/flow/controller.go new file mode 100644 index 000000000..51fcb30f8 --- /dev/null +++ b/src/replication/ng/flow/controller.go @@ -0,0 +1,99 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "errors" + "fmt" + + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/model" + + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" +) + +// Controller controls the replication flow +type Controller interface { + // Start a replication according to the policy and returns the + // execution ID and error + StartReplication(policy *model.Policy) (int64, error) + // Stop the replication specified by the execution ID + StopReplication(int64) error +} + +// NewController returns an instance of a Controller +func NewController(registryMgr registry.Manager, + executionMgr execution.Manager, scheduler scheduler.Scheduler) (Controller, error) { + if registryMgr == nil || executionMgr == nil || scheduler == nil { + return nil, errors.New("invalid params") + } + return &defaultController{ + registryMgr: registryMgr, + executionMgr: executionMgr, + scheduler: scheduler, + }, nil +} + +// defaultController is the default implement for the Controller +type defaultController struct { + registryMgr registry.Manager + executionMgr execution.Manager + scheduler scheduler.Scheduler +} + +// Replicate according the to policy ID +func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) { + log.Infof("starting the replication based on the policy %d ...", policy.ID) + + flow, err := newFlow(policy, d.registryMgr, d.executionMgr, d.scheduler) + if err != nil { + return 0, fmt.Errorf("failed to create the flow object based on policy %d: %v", policy.ID, err) + } + + // create the execution record + id, err := flow.createExecution() + if err != nil { + return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policy.ID, err) + } + + // fetch resources from the source registry + if err := flow.fetchResources(); err != nil { + // just log the error message and return the execution ID + log.Errorf("failed to fetch resources for the execution %d: %v", id, err) + return id, nil + } + + // create the namespace on the destination registry + if err = flow.createNamespace(); err != nil { + log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err) + return id, nil + } + + // schedule the replication + if err = flow.schedule(); err != nil { + log.Errorf("failed to schedule the execution %d: %v", id, err) + return id, nil + } + + log.Infof("the execution %d scheduled", id) + return id, nil +} + +func (d *defaultController) StopReplication(id int64) error { + // TODO + return nil +} diff --git a/src/replication/ng/flow/controller_test.go b/src/replication/ng/flow/controller_test.go new file mode 100644 index 000000000..607a0bf8f --- /dev/null +++ b/src/replication/ng/flow/controller_test.go @@ -0,0 +1,183 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "testing" + + "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakedPolicyManager struct{} + +func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) { + return 0, nil +} +func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) { + return 0, nil, nil +} +func (f *fakedPolicyManager) Get(int64) (*model.Policy, error) { + return &model.Policy{ + ID: 1, + SrcRegistryID: 1, + SrcNamespaces: []string{"library"}, + DestRegistryID: 2, + }, nil +} +func (f *fakedPolicyManager) Update(*model.Policy) error { + return nil +} +func (f *fakedPolicyManager) Remove(int64) error { + return nil +} + +type fakedRegistryManager struct{} + +func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) { + return 0, nil +} +func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) { + return 0, nil, nil +} +func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) { + if id == 1 { + return &model.Registry{ + Type: "faked_registry", + }, nil + } + if id == 2 { + return &model.Registry{ + Type: "faked_registry", + }, nil + } + return nil, nil +} +func (f *fakedRegistryManager) Update(*model.Registry, ...string) error { + return nil +} +func (f *fakedRegistryManager) Remove(int64) error { + return nil +} + +type fakedExecutionManager struct{} + +func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { + return 1, nil +} +func (f *fakedExecutionManager) List(*model.ExecutionQuery) (int64, []*model.Execution, error) { + return 0, nil, nil +} +func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { + return nil, nil +} +func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { + return nil +} +func (f *fakedExecutionManager) Remove(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAll(int64) error { + return nil +} +func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { + return 0, nil +} +func (f *fakedExecutionManager) ListTasks(*model.TaskQuery) (int64, []*model.Task, error) { + return 0, nil, nil +} +func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { + return nil, nil +} +func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { + return nil +} +func (f *fakedExecutionManager) RemoveTask(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAllTasks(int64) error { + return nil +} +func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { + return nil, nil +} + +type fakedScheduler struct{} + +func (f *fakedScheduler) Schedule(src []*model.Resource, dst []*model.Resource) ([]*model.Task, error) { + return []*model.Task{ + { + Status: model.TaskStatusPending, + JobID: "uuid", + }, + }, nil +} +func (f *fakedScheduler) Stop(id string) error { + return nil +} + +func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) { + return &fakedAdapter{}, nil +} + +type fakedAdapter struct{} + +func (f *fakedAdapter) Info() *adapter.Info { + return nil +} +func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) { + return nil, nil +} +func (f *fakedAdapter) CreateNamespace(*model.Namespace) error { + return nil +} +func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) { + return &model.Namespace{}, nil +} +func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error) { + return []*model.Resource{ + { + Type: model.ResourceTypeRepository, + Metadata: &model.ResourceMetadata{ + Name: "hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + Override: false, + }, + }, nil +} + +func TestStartReplication(t *testing.T) { + err := adapter.RegisterFactory("faked_registry", fakedAdapterFactory) + require.Nil(t, err) + + controller, _ := NewController( + &fakedRegistryManager{}, + &fakedExecutionManager{}, + &fakedScheduler{}) + + policy := &model.Policy{ + ID: 1, + SrcRegistryID: 1, + DestRegistryID: 2, + DestNamespace: "library", + } + id, err := controller.StartReplication(policy) + require.Nil(t, err) + assert.Equal(t, id, int64(1)) +} diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go new file mode 100644 index 000000000..1896e63b8 --- /dev/null +++ b/src/replication/ng/flow/flow.go @@ -0,0 +1,206 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "fmt" + "time" + + "github.com/goharbor/harbor/src/replication/ng/scheduler" + + "github.com/goharbor/harbor/src/replication/ng/execution" + + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/registry" +) + +type flow struct { + policy *model.Policy + srcRegistry *model.Registry + dstRegistry *model.Registry + srcAdapter adapter.Adapter + dstAdapter adapter.Adapter + executionID int64 + resources []*model.Resource + executionMgr execution.Manager + scheduler scheduler.Scheduler +} + +func newFlow(policy *model.Policy, registryMgr registry.Manager, + executionMgr execution.Manager, scheduler scheduler.Scheduler) (*flow, error) { + + f := &flow{ + policy: policy, + executionMgr: executionMgr, + scheduler: scheduler, + } + + // get source registry + srcRegistry, err := registryMgr.Get(policy.SrcRegistryID) + if err != nil { + return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err) + } + if srcRegistry == nil { + return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID) + } + f.srcRegistry = srcRegistry + + // get destination registry + dstRegistry, err := registryMgr.Get(policy.DestRegistryID) + if err != nil { + return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err) + } + if dstRegistry == nil { + return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID) + } + f.dstRegistry = dstRegistry + + // create the source registry adapter + srcFactory, err := adapter.GetFactory(srcRegistry.Type) + if err != nil { + return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err) + } + srcAdapter, err := srcFactory(srcRegistry) + if err != nil { + return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err) + } + f.srcAdapter = srcAdapter + + // create the destination registry adapter + dstFactory, err := adapter.GetFactory(dstRegistry.Type) + if err != nil { + return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err) + } + dstAdapter, err := dstFactory(dstRegistry) + if err != nil { + return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err) + } + f.dstAdapter = dstAdapter + + return f, nil +} + +func (f *flow) createExecution() (int64, error) { + id, err := f.executionMgr.Create(&model.Execution{ + PolicyID: f.policy.ID, + Status: model.ExecutionStatusInProgress, + StartTime: time.Now(), + }) + f.executionID = id + log.Debugf("an execution record for replication based on the policy %d created: %d", f.policy.ID, id) + return id, err +} + +func (f *flow) fetchResources() error { + resources, err := f.srcAdapter.FetchResources(f.policy.SrcNamespaces, f.policy.Filters) + f.resources = resources + if err != nil { + f.markExecutionFailure(err) + return err + } + + log.Debugf("resources for the execution %d fetched from the source registry", f.executionID) + return nil +} + +func (f *flow) createNamespace() error { + // merge the metadata of all source namespaces + metadata := map[string]interface{}{} + for _, srcNamespace := range f.policy.SrcNamespaces { + namespace, err := f.srcAdapter.GetNamespace(srcNamespace) + if err != nil { + f.markExecutionFailure(err) + return err + } + for key, value := range namespace.Metadata { + metadata[namespace.Name+":"+key] = value + } + } + + if err := f.dstAdapter.CreateNamespace(&model.Namespace{ + Name: f.policy.DestNamespace, + Metadata: metadata, + }); err != nil { + f.markExecutionFailure(err) + return err + } + + log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID) + return nil +} + +func (f *flow) schedule() error { + dstResources := []*model.Resource{} + for _, srcResource := range f.resources { + dstResource := &model.Resource{ + Type: srcResource.Type, + Metadata: &model.ResourceMetadata{ + Name: srcResource.Metadata.Name, + Namespace: f.policy.DestNamespace, + Vtags: srcResource.Metadata.Vtags, + }, + Registry: f.dstRegistry, + ExtendedInfo: srcResource.ExtendedInfo, + Deleted: srcResource.Deleted, + Override: f.policy.Override, + } + dstResources = append(dstResources, dstResource) + } + + tasks, err := f.scheduler.Schedule(f.resources, dstResources) + if err != nil { + f.markExecutionFailure(err) + return err + } + + allFailed := true + for _, task := range tasks { + if task.Status != model.TaskStatusFailed { + allFailed = false + } + task.ExecutionID = f.executionID + taskID, err := f.executionMgr.CreateTask(task) + if err != nil { + f.markExecutionFailure(err) + return err + } + log.Debugf("task record %d for execution %d created", taskID, f.executionID) + } + // if all the tasks are failed, mark the execution failed + if allFailed { + f.markExecutionFailure(err) + } + + return nil +} + +func (f *flow) markExecutionFailure(err error) { + statusText := "" + if err != nil { + statusText = err.Error() + } + err = f.executionMgr.Update( + &model.Execution{ + ID: f.executionID, + Status: model.ExecutionStatusFailed, + StatusText: statusText, + EndTime: time.Now(), + }) + if err != nil { + log.Errorf("failed to update the execution %d: %v", f.executionID, err) + } +} diff --git a/src/replication/ng/model/execution.go b/src/replication/ng/model/execution.go index 2756dd05b..a9d6f612f 100644 --- a/src/replication/ng/model/execution.go +++ b/src/replication/ng/model/execution.go @@ -14,17 +14,43 @@ package model -import "time" +import ( + "time" + + "github.com/goharbor/harbor/src/common/models" +) + +// execution/task status/trigger const +const ( + ExecutionStatusFailed string = "Failed" + ExecutionStatusSucceed string = "Succeed" + ExecutionStatusStopped string = "Stopped" + ExecutionStatusInProgress string = "InProgress" + + ExecutionTriggerManual string = "Manual" + ExecutionTriggerEvent string = "Event" + ExecutionTriggerSchedule string = "Schedule" + + TaskStatusFailed string = "Failed" + TaskStatusSucceed string = "Succeed" + TaskStatusStopped string = "Stopped" + TaskStatusInProgress string = "InProgress" + TaskStatusPending string = "Pending" +) // Execution defines an execution of the replication type Execution struct { ID int64 `json:"id"` PolicyID int64 `json:"policy_id"` + Status string `json:"status"` + StatusText string `json:"status_text"` + Trigger string `json:"trigger"` Total int `json:"total"` Failed int `json:"failed"` Succeed int `json:"succeed"` Pending int `json:"pending"` InProgress int `json:"in_progress"` + Stopped int `json:"stopped"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` } @@ -41,3 +67,19 @@ type Task struct { StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` } + +// ExecutionQuery defines the query conditions for listing executions +type ExecutionQuery struct { + PolicyID int64 + Status string + Trigger string + models.Pagination +} + +// TaskQuery defines the query conditions for listing tasks +type TaskQuery struct { + ExecutionID int64 + ResourceType ResourceType + Status string + models.Pagination +} diff --git a/src/replication/ng/model/namespace.go b/src/replication/ng/model/namespace.go index 6473d8e3a..9d55996b2 100644 --- a/src/replication/ng/model/namespace.go +++ b/src/replication/ng/model/namespace.go @@ -21,3 +21,8 @@ type Namespace struct { Name string `json:"name"` Metadata map[string]interface{} `json:"metadata"` } + +// NamespaceQuery defines the query condition for listing namespaces +type NamespaceQuery struct { + Name string +} diff --git a/src/replication/ng/model/policy.go b/src/replication/ng/model/policy.go index 622206394..1cd699beb 100644 --- a/src/replication/ng/model/policy.go +++ b/src/replication/ng/model/policy.go @@ -14,7 +14,11 @@ package model -import "time" +import ( + "time" + + "github.com/goharbor/harbor/src/common/models" +) // Policy defines the structure of a replication policy type Policy struct { @@ -68,3 +72,12 @@ type Trigger struct { type TriggerSettings struct { Cron string `json:"cron"` } + +// PolicyQuery defines the query conditions for listing policies +type PolicyQuery struct { + Name string + // TODO: need to consider how to support listing the policies + // of one namespace in both pull and push modes + Namespace string + models.Pagination +} diff --git a/src/replication/ng/model/registry.go b/src/replication/ng/model/registry.go index 383bcebf3..7ce36b9b5 100644 --- a/src/replication/ng/model/registry.go +++ b/src/replication/ng/model/registry.go @@ -14,9 +14,18 @@ package model +import ( + "github.com/goharbor/harbor/src/common/models" +) + // RegistryType indicates the type of registry type RegistryType string +// Valid indicates whether the RegistryType is a valid value +func (r RegistryType) Valid() bool { + return len(r) > 0 +} + // CredentialType represents the supported credential types // e.g: u/p, OAuth token type CredentialType string @@ -44,3 +53,9 @@ type Registry struct { Insecure bool `json:"insecure"` Status string `json:"status"` } + +// RegistryQuery defines the query conditions for listing registries +type RegistryQuery struct { + Name string + models.Pagination +} diff --git a/src/replication/ng/model/resource.go b/src/replication/ng/model/resource.go index b0c9879a3..b75d1a964 100644 --- a/src/replication/ng/model/resource.go +++ b/src/replication/ng/model/resource.go @@ -23,12 +23,17 @@ const ( // ResourceType represents the type of the resource type ResourceType string +// Valid indicates whether the ResourceType is a valid value +func (r ResourceType) Valid() bool { + return len(r) > 0 +} + // ResourceMetadata of resource type ResourceMetadata struct { - Namespace *Namespace `json:"namespace"` - Name string `json:"name"` - Vtags []string `json:"v_tags"` - Labels []string `json:"labels"` + Namespace string `json:"namespace"` + Name string `json:"name"` + Vtags []string `json:"v_tags"` + Labels []string `json:"labels"` } // Resource represents the general replicating content @@ -40,4 +45,6 @@ type Resource struct { ExtendedInfo map[string]interface{} `json:"extended_info"` // Indicate if the resource is a deleted resource Deleted bool `json:"deleted"` + // indicate whether the resource can be overridden + Override bool `json:"override"` } diff --git a/src/replication/ng/policy/manager.go b/src/replication/ng/policy/manager.go new file mode 100644 index 000000000..1608861d0 --- /dev/null +++ b/src/replication/ng/policy/manager.go @@ -0,0 +1,34 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policy + +import ( + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Manager manages replication policies +type Manager interface { + // Create new policy + Create(*model.Policy) (int64, error) + // List the policies, returns the total count, policy list and error + List(...*model.PolicyQuery) (int64, []*model.Policy, error) + // Get policy with specified ID + Get(int64) (*model.Policy, error) + // Update the specified policy, the "props" are the properties of policy + // that need to be updated + Update(policy *model.Policy, props ...string) error + // Remove the specified policy + Remove(int64) error +} diff --git a/src/replication/ng/registry/manager.go b/src/replication/ng/registry/manager.go new file mode 100644 index 000000000..dd53aaf09 --- /dev/null +++ b/src/replication/ng/registry/manager.go @@ -0,0 +1,34 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Manager manages registries +type Manager interface { + // Add new registry + Add(*model.Registry) (int64, error) + // List registries, returns total count, registry list and error + List(...*model.RegistryQuery) (int64, []*model.Registry, error) + // Get the specified registry + Get(int64) (*model.Registry, error) + // Update the registry, the "props" are the properties of registry + // that need to be updated + Update(registry *model.Registry, props ...string) error + // Remove the registry with the specified ID + Remove(int64) error +} diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go new file mode 100644 index 000000000..15bbee95a --- /dev/null +++ b/src/replication/ng/scheduler/scheduler.go @@ -0,0 +1,27 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Scheduler schedules tasks to transfer resource data +type Scheduler interface { + // Schedule tasks for one execution + Schedule([]*model.Resource, []*model.Resource) ([]*model.Task, error) + // Stop the task specified by ID + Stop(id string) error +} diff --git a/src/replication/ng/transfer/transfer.go b/src/replication/ng/transfer/transfer.go index ebf940b21..db5161d48 100644 --- a/src/replication/ng/transfer/transfer.go +++ b/src/replication/ng/transfer/transfer.go @@ -63,14 +63,14 @@ type CancelFunc func() bool // RegisterFactory registers one transfer factory to the registry func RegisterFactory(name model.ResourceType, factory Factory) error { - if len(name) == 0 { - return errors.New("empty transfer name") + if !name.Valid() { + return errors.New("invalid resource transfer factory name") } if factory == nil { - return errors.New("empty transfer factory") + return errors.New("empty resource transfer factory") } if _, exist := registry[name]; exist { - return fmt.Errorf("transfer factory for %s already exists", name) + return fmt.Errorf("resource transfer factory for %s already exists", name) } registry[name] = factory