diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 45af40ef6..0ed3c5a4b 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -113,7 +113,7 @@ func (r *ReplicationOperationAPI) CreateExecution() { return } - executionID, err := ng.OperationCtl.StartReplication(policy) + executionID, err := ng.OperationCtl.StartReplication(policy, nil) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err)) return diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index e9ad45e55..3afe60fba 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -25,7 +25,7 @@ import ( type fakedOperationController struct{} -func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64, error) { +func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { return 1, nil } func (f *fakedOperationController) StopReplication(int64) error { diff --git a/src/replication/ng/config/config.go b/src/replication/ng/config/config.go new file mode 100644 index 000000000..00a9dad9f --- /dev/null +++ b/src/replication/ng/config/config.go @@ -0,0 +1,30 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +var ( + // Config is the configuration + Config *Configuration +) + +// Configuration holds the configuration information for replication +type Configuration struct { + CoreURL string + RegistryURL string + TokenServiceURL string + JobserviceURL string + SecretKey string + Secret string +} diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go index dfd6fe9e9..6304268c5 100644 --- a/src/replication/ng/dao/models/execution.go +++ b/src/replication/ng/dao/models/execution.go @@ -106,6 +106,7 @@ type TaskFieldsName struct { } // Task represent the tasks in one execution. +// TODO add operation property type Task struct { ID int64 `orm:"pk;auto;column(id)" json:"id"` ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` diff --git a/src/replication/ng/flow/controller.go b/src/replication/ng/flow/controller.go index 49240249b..a96856e41 100644 --- a/src/replication/ng/flow/controller.go +++ b/src/replication/ng/flow/controller.go @@ -14,105 +14,23 @@ package flow -import ( - "fmt" +// Flow defines replication flow +type Flow interface { + Run(interface{}) error +} - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/replication/ng/model" - - "github.com/goharbor/harbor/src/replication/ng/execution" - "github.com/goharbor/harbor/src/replication/ng/registry" - "github.com/goharbor/harbor/src/replication/ng/scheduler" -) - -// Controller controls the replication flow +// Controller is the controller that controls the replication flows type Controller interface { - // Start a replication according to the policy and returns the - // execution ID and error - StartReplication(policy *model.Policy) (int64, error) - // Stop the replication specified by the execution ID - StopReplication(int64) error + Start(Flow) error } -// NewController returns an instance of a Controller -func NewController(registryMgr registry.Manager, - executionMgr execution.Manager, scheduler scheduler.Scheduler) (Controller, error) { - if registryMgr == nil || executionMgr == nil || scheduler == nil { - // TODO(ChenDe): Uncomment it when execution manager is ready - // return nil, errors.New("invalid params") - } - return &defaultController{ - registryMgr: registryMgr, - executionMgr: executionMgr, - scheduler: scheduler, - }, nil +// NewController returns an instance of the default flow controller +func NewController() Controller { + return &controller{} } -// defaultController is the default implement for the Controller -type defaultController struct { - registryMgr registry.Manager - executionMgr execution.Manager - scheduler scheduler.Scheduler -} - -// Start a replication according to the policy -func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) { - log.Infof("starting the replication based on the policy %d ...", policy.ID) - - flow, err := newFlow(policy, d.registryMgr, d.executionMgr, d.scheduler) - if err != nil { - return 0, fmt.Errorf("failed to create the flow object based on policy %d: %v", policy.ID, err) - } - - // create the execution record - id, err := flow.createExecution() - if err != nil { - return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policy.ID, err) - } - - // fetch resources from the source registry - if err := flow.fetchResources(); err != nil { - // just log the error message and return the execution ID - log.Errorf("failed to fetch resources for the execution %d: %v", id, err) - return id, nil - } - - // if no resources need to be replicated, mark the execution success and return - if len(flow.srcResources) == 0 { - flow.markExecutionSuccess("no resources need to be replicated") - log.Infof("no resources need to be replicated for the execution %d, skip", id) - return id, nil - } - - // create the namespace on the destination registry - if err = flow.createNamespace(); err != nil { - log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err) - return id, nil - } - - // preprocess the resources - if err = flow.preprocess(); err != nil { - log.Errorf("failed to preprocess the resources for the execution %d: %v", id, err) - return id, nil - } - - // create task records in database - if err = flow.createTasks(); err != nil { - log.Errorf("failed to create task records for the execution %d: %v", id, err) - return id, nil - } - - // schedule the tasks - if err = flow.schedule(); err != nil { - log.Errorf("failed to schedule the execution %d: %v", id, err) - return id, nil - } - - log.Infof("the execution %d scheduled", id) - return id, nil -} - -func (d *defaultController) StopReplication(id int64) error { - // TODO - return nil +type controller struct{} + +func (c *controller) Start(flow Flow) error { + return flow.Run(nil) } diff --git a/src/replication/ng/flow/controller_test.go b/src/replication/ng/flow/controller_test.go index 6f20391a7..f08f90af7 100644 --- a/src/replication/ng/flow/controller_test.go +++ b/src/replication/ng/flow/controller_test.go @@ -15,248 +15,20 @@ package flow import ( - "io" "testing" - "github.com/docker/distribution" - "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/replication/ng/adapter" - "github.com/goharbor/harbor/src/replication/ng/dao/models" - "github.com/goharbor/harbor/src/replication/ng/model" - "github.com/goharbor/harbor/src/replication/ng/scheduler" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type fakedPolicyManager struct{} +type fakedFlow struct{} -func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) { - return 0, nil -} -func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) { - return 0, nil, nil -} -func (f *fakedPolicyManager) Get(int64) (*model.Policy, error) { - return &model.Policy{ - ID: 1, - SrcRegistryID: 1, - SrcNamespaces: []string{"library"}, - DestRegistryID: 2, - }, nil -} -func (f *fakedPolicyManager) Update(*model.Policy) error { - return nil -} -func (f *fakedPolicyManager) Remove(int64) error { +func (f *fakedFlow) Run(interface{}) error { return nil } -type fakedRegistryManager struct{} - -func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) { - return 0, nil -} -func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) { - return 0, nil, nil -} -func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) { - if id == 1 { - return &model.Registry{ - Type: "faked_registry", - }, nil - } - if id == 2 { - return &model.Registry{ - Type: "faked_registry", - }, nil - } - return nil, nil -} -func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) { - return nil, nil -} -func (f *fakedRegistryManager) Update(*model.Registry, ...string) error { - return nil -} -func (f *fakedRegistryManager) Remove(int64) error { - return nil -} -func (f *fakedRegistryManager) HealthCheck() error { - return nil -} - -type fakedExecutionManager struct { - taskID int64 -} - -func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { - return 1, nil -} -func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { - return 0, nil, nil -} -func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { - return nil, nil -} -func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { - return nil -} -func (f *fakedExecutionManager) Remove(int64) error { - return nil -} -func (f *fakedExecutionManager) RemoveAll(int64) error { - return nil -} -func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { - f.taskID++ - id := f.taskID - return id, nil -} -func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { - return 0, nil, nil -} -func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { - return nil, nil -} -func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { - return nil -} -func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { - return nil -} -func (f *fakedExecutionManager) RemoveTask(int64) error { - return nil -} -func (f *fakedExecutionManager) RemoveAllTasks(int64) error { - return nil -} -func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { - return nil, nil -} - -type fakedScheduler struct{} - -func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) { - items := []*scheduler.ScheduleItem{} - for i, res := range src { - items = append(items, &scheduler.ScheduleItem{ - SrcResource: res, - DstResource: dst[i], - }) - } - return items, nil -} -func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) { - results := []*scheduler.ScheduleResult{} - for _, item := range items { - results = append(results, &scheduler.ScheduleResult{ - TaskID: item.TaskID, - Error: nil, - }) - } - return results, nil -} -func (f *fakedScheduler) Stop(id string) error { - return nil -} - -func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) { - return &fakedAdapter{}, nil -} - -type fakedAdapter struct{} - -func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) { - return nil, nil -} -func (f *fakedAdapter) CreateNamespace(*model.Namespace) error { - return nil -} -func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) { - return &model.Namespace{}, nil -} -func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) { - return []*model.Resource{ - { - Type: model.ResourceTypeRepository, - Metadata: &model.ResourceMetadata{ - Name: "library/hello-world", - Namespace: "library", - Vtags: []string{"latest"}, - }, - Override: false, - }, - }, nil -} - -func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) { - return false, "", nil -} -func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) { - return nil, "", nil -} -func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error { - return nil -} -func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) { - return false, nil -} -func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { - return 0, nil, nil -} -func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error { - return nil -} -func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) { - return []*model.Resource{ - { - Type: model.ResourceTypeChart, - Metadata: &model.ResourceMetadata{ - Name: "library/harbor", - Namespace: "library", - Vtags: []string{"0.2.0"}, - }, - }, - }, nil -} -func (f *fakedAdapter) ChartExist(name, version string) (bool, error) { - return false, nil -} -func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) { - return nil, nil -} -func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error { - return nil -} -func (f *fakedAdapter) DeleteChart(name, version string) error { - return nil -} - -func TestStartReplication(t *testing.T) { - config.InitWithSettings(nil) - err := adapter.RegisterFactory( - &adapter.Info{ - Type: "faked_registry", - SupportedResourceTypes: []model.ResourceType{ - model.ResourceTypeRepository, - model.ResourceTypeChart, - }, - SupportedTriggers: []model.TriggerType{model.TriggerTypeManual}, - }, fakedAdapterFactory) +func TestStart(t *testing.T) { + flow := &fakedFlow{} + controller := NewController() + err := controller.Start(flow) require.Nil(t, err) - - controller, _ := NewController( - &fakedRegistryManager{}, - &fakedExecutionManager{}, - &fakedScheduler{}) - - policy := &model.Policy{ - ID: 1, - SrcRegistryID: 1, - DestRegistryID: 2, - DestNamespace: "library", - } - id, err := controller.StartReplication(policy) - require.Nil(t, err) - assert.Equal(t, id, int64(1)) } diff --git a/src/replication/ng/flow/copy.go b/src/replication/ng/flow/copy.go new file mode 100644 index 000000000..979a99668 --- /dev/null +++ b/src/replication/ng/flow/copy.go @@ -0,0 +1,95 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "time" + + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" +) + +type copyFlow struct { + executionID int64 + policy *model.Policy + executionMgr execution.Manager + registryMgr registry.Manager + scheduler scheduler.Scheduler +} + +// NewCopyFlow returns an instance of the copy flow which replicates the resources from +// the source registry to the destination registry +func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager, + scheduler scheduler.Scheduler, executionID int64, policy *model.Policy) Flow { + return ©Flow{ + executionMgr: executionMgr, + registryMgr: registryMgr, + scheduler: scheduler, + executionID: executionID, + policy: policy, + } +} + +func (c *copyFlow) Run(interface{}) error { + srcRegistry, dstRegistry, srcAdapter, dstAdapter, err := initialize(c.registryMgr, c.policy) + if err != nil { + return err + } + // TODO after refactoring the adapter register, the "srcRegistry.Type" is not needed + srcResources, err := fetchResources(srcAdapter, srcRegistry.Type, c.policy) + if err != nil { + return err + } + if len(srcResources) == 0 { + markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated") + log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID) + return nil + } + dstNamespaces, err := assembleDestinationNamespaces(srcAdapter, srcResources, c.policy.DestNamespace) + if err != nil { + return err + } + if err = createNamespaces(dstAdapter, dstNamespaces); err != nil { + return err + } + dstResources := assembleDestinationResources(srcResources, dstRegistry, c.policy.DestNamespace, c.policy.Override) + items, err := preprocess(c.scheduler, srcResources, dstResources) + if err != nil { + return err + } + if err = createTasks(c.executionMgr, c.executionID, items); err != nil { + return err + } + return schedule(c.scheduler, c.executionMgr, items) +} + +// mark the execution as success in database +func markExecutionSuccess(mgr execution.Manager, id int64, message string) { + err := mgr.Update( + &models.Execution{ + ID: id, + Status: models.ExecutionStatusSucceed, + StatusText: message, + EndTime: time.Now(), + }, "Status", "StatusText", "EndTime") + if err != nil { + log.Errorf("failed to update the execution %d: %v", id, err) + return + } +} diff --git a/src/replication/ng/flow/copy_test.go b/src/replication/ng/flow/copy_test.go new file mode 100644 index 000000000..c1584dccc --- /dev/null +++ b/src/replication/ng/flow/copy_test.go @@ -0,0 +1,28 @@ +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "testing" + + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/stretchr/testify/require" +) + +func TestRunOfCopyFlow(t *testing.T) { + scheduler := &fakedScheduler{} + executionMgr := &fakedExecutionManager{} + registryMgr := &fakedRegistryManager{} + policy := &model.Policy{} + flow := NewCopyFlow(executionMgr, registryMgr, scheduler, 1, policy) + err := flow.Run(nil) + require.Nil(t, err) +} diff --git a/src/replication/ng/flow/deletion.go b/src/replication/ng/flow/deletion.go new file mode 100644 index 000000000..04751b10f --- /dev/null +++ b/src/replication/ng/flow/deletion.go @@ -0,0 +1,76 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" +) + +type deletionFlow struct { + executionID int64 + policy *model.Policy + executionMgr execution.Manager + registryMgr registry.Manager + scheduler scheduler.Scheduler + resources []*model.Resource +} + +// NewDeletionFlow returns an instance of the delete flow which deletes the resources +// on the destination registry +func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manager, + scheduler scheduler.Scheduler, executionID int64, policy *model.Policy, + resources []*model.Resource) Flow { + return &deletionFlow{ + executionMgr: executionMgr, + registryMgr: registryMgr, + scheduler: scheduler, + executionID: executionID, + policy: policy, + resources: resources, + } +} + +func (d *deletionFlow) Run(interface{}) error { + srcRegistry, dstRegistry, _, _, err := initialize(d.registryMgr, d.policy) + if err != nil { + return err + } + // filling the registry information + for _, resource := range d.resources { + resource.Registry = srcRegistry + } + srcResources, err := filterResources(d.resources, d.policy.Filters) + if err != nil { + return err + } + if len(srcResources) == 0 { + markExecutionSuccess(d.executionMgr, d.executionID, "no resources need to be replicated") + log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID) + return nil + } + dstResources := assembleDestinationResources(srcResources, dstRegistry, d.policy.DestNamespace, d.policy.Override) + items, err := preprocess(d.scheduler, srcResources, dstResources) + if err != nil { + return err + } + if err = createTasks(d.executionMgr, d.executionID, items); err != nil { + return err + } + return schedule(d.scheduler, d.executionMgr, items) +} diff --git a/src/replication/ng/flow/deletion_test.go b/src/replication/ng/flow/deletion_test.go new file mode 100644 index 000000000..8c8d9a13d --- /dev/null +++ b/src/replication/ng/flow/deletion_test.go @@ -0,0 +1,33 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "testing" + + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/stretchr/testify/require" +) + +func TestRunOfDeletionFlow(t *testing.T) { + scheduler := &fakedScheduler{} + executionMgr := &fakedExecutionManager{} + registryMgr := &fakedRegistryManager{} + policy := &model.Policy{} + resources := []*model.Resource{} + flow := NewDeletionFlow(executionMgr, registryMgr, scheduler, 1, policy, resources) + err := flow.Run(nil) + require.Nil(t, err) +} diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go deleted file mode 100644 index 7503e4f8a..000000000 --- a/src/replication/ng/flow/flow.go +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package flow - -import ( - "errors" - "fmt" - "strings" - "time" - - "github.com/goharbor/harbor/src/replication/ng/scheduler" - - "github.com/goharbor/harbor/src/replication/ng/execution" - - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/replication/ng/adapter" - "github.com/goharbor/harbor/src/replication/ng/dao/models" - "github.com/goharbor/harbor/src/replication/ng/model" - "github.com/goharbor/harbor/src/replication/ng/registry" -) - -type flow struct { - policy *model.Policy - srcRegistry *model.Registry - dstRegistry *model.Registry - srcAdapter adapter.Adapter - dstAdapter adapter.Adapter - executionID int64 - srcResources []*model.Resource - dstResources []*model.Resource - executionMgr execution.Manager - scheduler scheduler.Scheduler - scheduleItems []*scheduler.ScheduleItem -} - -func newFlow(policy *model.Policy, registryMgr registry.Manager, - executionMgr execution.Manager, scheduler scheduler.Scheduler) (*flow, error) { - - f := &flow{ - policy: policy, - executionMgr: executionMgr, - scheduler: scheduler, - } - - // TODO consider to put registry model in the policy directly rather than just the registry ID? - url, err := config.RegistryURL() - if err != nil { - return nil, fmt.Errorf("failed to get the registry URL: %v", err) - } - registry := &model.Registry{ - Type: model.RegistryTypeHarbor, - Name: "Local", - URL: url, - // TODO use the service account - Credential: &model.Credential{ - Type: model.CredentialTypeBasic, - AccessKey: "admin", - AccessSecret: "Harbor12345", - }, - Insecure: true, - } - - // get source registry - if policy.SrcRegistryID != 0 { - srcRegistry, err := registryMgr.Get(policy.SrcRegistryID) - if err != nil { - return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err) - } - if srcRegistry == nil { - return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID) - } - f.srcRegistry = srcRegistry - } else { - f.srcRegistry = registry - } - - // get destination registry - if policy.DestRegistryID != 0 { - dstRegistry, err := registryMgr.Get(policy.DestRegistryID) - if err != nil { - return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err) - } - if dstRegistry == nil { - return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID) - } - f.dstRegistry = dstRegistry - } else { - f.dstRegistry = registry - } - - // create the source registry adapter - srcFactory, err := adapter.GetFactory(f.srcRegistry.Type) - if err != nil { - return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.srcRegistry.Type, err) - } - srcAdapter, err := srcFactory(f.srcRegistry) - if err != nil { - return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", f.srcRegistry.URL, err) - } - f.srcAdapter = srcAdapter - - // create the destination registry adapter - dstFactory, err := adapter.GetFactory(f.dstRegistry.Type) - if err != nil { - return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.dstRegistry.Type, err) - } - dstAdapter, err := dstFactory(f.dstRegistry) - if err != nil { - return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", f.dstRegistry.URL, err) - } - f.dstAdapter = dstAdapter - - return f, nil -} - -func (f *flow) createExecution() (int64, error) { - id, err := f.executionMgr.Create(&models.Execution{ - PolicyID: f.policy.ID, - Status: models.ExecutionStatusInProgress, - StartTime: time.Now(), - }) - f.executionID = id - log.Debugf("an execution record for replication based on the policy %d created: %d", f.policy.ID, id) - return id, err -} - -func (f *flow) fetchResources() error { - resTypes := []model.ResourceType{} - filters := []*model.Filter{} - for _, filter := range f.policy.Filters { - if filter.Type != model.FilterTypeResource { - filters = append(filters, filter) - continue - } - resTypes = append(resTypes, filter.Value.(model.ResourceType)) - } - if len(resTypes) == 0 { - resTypes = append(resTypes, adapter.GetAdapterInfo(f.srcRegistry.Type).SupportedResourceTypes...) - } - - // TODO consider whether the logic can be refactored by using reflect - srcResources := []*model.Resource{} - for _, typ := range resTypes { - log.Debugf("fetching %s...", typ) - // images - if typ == model.ResourceTypeRepository { - reg, ok := f.srcAdapter.(adapter.ImageRegistry) - if !ok { - err := fmt.Errorf("the adapter doesn't implement the ImageRegistry interface") - f.markExecutionFailure(err) - return err - } - res, err := reg.FetchImages(f.policy.SrcNamespaces, filters) - if err != nil { - f.markExecutionFailure(err) - return err - } - srcResources = append(srcResources, res...) - continue - } - // charts - if typ == model.ResourceTypeChart { - reg, ok := f.srcAdapter.(adapter.ChartRegistry) - if !ok { - err := fmt.Errorf("the adapter doesn't implement the ChartRegistry interface") - f.markExecutionFailure(err) - return err - } - res, err := reg.FetchCharts(f.policy.SrcNamespaces, filters) - if err != nil { - f.markExecutionFailure(err) - return err - } - srcResources = append(srcResources, res...) - continue - } - } - - dstResources := []*model.Resource{} - for _, srcResource := range srcResources { - dstResource := &model.Resource{ - Type: srcResource.Type, - Metadata: &model.ResourceMetadata{ - Name: srcResource.Metadata.Name, - Namespace: srcResource.Metadata.Namespace, - Vtags: srcResource.Metadata.Vtags, - }, - Registry: f.dstRegistry, - ExtendedInfo: srcResource.ExtendedInfo, - Deleted: srcResource.Deleted, - Override: f.policy.Override, - } - // TODO check whether the logic is applied to chart - // if the destination namespace is specified, use the specified one - if len(f.policy.DestNamespace) > 0 { - dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name, - srcResource.Metadata.Namespace, f.policy.DestNamespace, 1) - dstResource.Metadata.Namespace = f.policy.DestNamespace - } - dstResources = append(dstResources, dstResource) - } - - f.srcResources = srcResources - f.dstResources = dstResources - - log.Debugf("resources for the execution %d fetched from the source registry", f.executionID) - return nil -} - -func (f *flow) createNamespace() error { - // Merge the metadata of all source namespaces - // eg: - // We have two source namespaces: - // { - // Name: "source01", - // Metadata: {"public": true} - // } - // and - // { - // Name: "source02", - // Metadata: {"public": false} - // } - // The name of the destination namespace is "destination", - // after merging the metadata, the destination namespace - // looks like this: - // { - // Name: "destination", - // Metadata: { - // "public": { - // "source01": true, - // "source02": false, - // }, - // }, - // } - // TODO merge the metadata of different namespaces - namespaces := []*model.Namespace{} - for i, resource := range f.dstResources { - namespace := &model.Namespace{ - Name: resource.Metadata.Namespace, - } - // get the metadata of the namespace from the source registry - ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace) - if err != nil { - f.markExecutionFailure(err) - return err - } - namespace.Metadata = ns.Metadata - namespaces = append(namespaces, namespace) - } - - for _, namespace := range namespaces { - if err := f.dstAdapter.CreateNamespace(namespace); err != nil { - f.markExecutionFailure(err) - return err - } - - log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID) - } - - return nil -} - -func (f *flow) preprocess() error { - items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources) - if err != nil { - f.markExecutionFailure(err) - return err - } - f.scheduleItems = items - log.Debugf("the preprocess for resources of the execution %d completed", - f.executionID) - return nil -} - -func (f *flow) createTasks() error { - for _, item := range f.scheduleItems { - task := &models.Task{ - ExecutionID: f.executionID, - Status: models.TaskStatusInitialized, - ResourceType: string(item.SrcResource.Type), - SrcResource: getResourceName(item.SrcResource), - DstResource: getResourceName(item.DstResource), - } - id, err := f.executionMgr.CreateTask(task) - if err != nil { - // if failed to create the task for one of the items, - // the whole execution is marked as failure and all - // the items will not be submitted - f.markExecutionFailure(err) - return err - } - - item.TaskID = id - log.Debugf("task record %d for the execution %d created", - id, f.executionID) - } - return nil -} - -func (f *flow) schedule() error { - results, err := f.scheduler.Schedule(f.scheduleItems) - if err != nil { - f.markExecutionFailure(err) - return err - } - - allFailed := true - for _, result := range results { - // if the task is failed to be submitted, update the status of the - // task as failure - if result.Error != nil { - log.Errorf("failed to schedule task %d: %v", result.TaskID, result.Error) - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil { - log.Errorf("failed to update task status %d: %v", result.TaskID, err) - } - continue - } - allFailed = false - // if the task is submitted successfully, update the status, job ID and start time - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil { - log.Errorf("failed to update task status %d: %v", result.TaskID, err) - } - if err = f.executionMgr.UpdateTask(&models.Task{ - ID: result.TaskID, - JobID: result.JobID, - StartTime: time.Now(), - }, "JobID", "StartTime"); err != nil { - log.Errorf("failed to update task %d: %v", result.TaskID, err) - } - log.Debugf("the task %d scheduled", result.TaskID) - } - // if all the tasks are failed, mark the execution failed - if allFailed { - err = errors.New("all tasks are failed") - f.markExecutionFailure(err) - return err - } - return nil -} - -func (f *flow) markExecutionFailure(err error) { - statusText := "" - if err != nil { - statusText = err.Error() - } - log.Errorf("the execution %d is marked as failure because of the error: %s", - f.executionID, statusText) - err = f.executionMgr.Update( - &models.Execution{ - ID: f.executionID, - Status: models.ExecutionStatusFailed, - StatusText: statusText, - EndTime: time.Now(), - }, "Status", "StatusText", "EndTime") - if err != nil { - log.Errorf("failed to update the execution %d: %v", f.executionID, err) - } -} - -func (f *flow) markExecutionSuccess(msg string) { - log.Debugf("the execution %d is marked as success", f.executionID) - err := f.executionMgr.Update( - &models.Execution{ - ID: f.executionID, - Status: models.ExecutionStatusSucceed, - StatusText: msg, - EndTime: time.Now(), - }, "Status", "StatusText", "EndTime") - if err != nil { - log.Errorf("failed to update the execution %d: %v", f.executionID, err) - } -} - -// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" -// if the resource has vtags -func getResourceName(res *model.Resource) string { - if res == nil { - return "" - } - meta := res.Metadata - if meta == nil { - return "" - } - if len(meta.Vtags) == 0 { - return meta.Name - } - return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]" -} diff --git a/src/replication/ng/flow/stage.go b/src/replication/ng/flow/stage.go new file mode 100644 index 000000000..bef8cee56 --- /dev/null +++ b/src/replication/ng/flow/stage.go @@ -0,0 +1,380 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/goharbor/harbor/src/common/utils/log" + adp "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/config" + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" + "github.com/goharbor/harbor/src/replication/ng/util" +) + +// get/create the source registry, destination registry, source adapter and destination adapter +func initialize(mgr registry.Manager, policy *model.Policy) (*model.Registry, *model.Registry, adp.Adapter, adp.Adapter, error) { + var srcRegistry, dstRegistry *model.Registry + var srcAdapter, dstAdapter adp.Adapter + var err error + registry := getLocalRegistry() + // get source registry + if policy.SrcRegistryID != 0 { + srcRegistry, err = mgr.Get(policy.SrcRegistryID) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err) + } + if srcRegistry == nil { + return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID) + } + } else { + srcRegistry = registry + } + + // get destination registry + if policy.DestRegistryID != 0 { + dstRegistry, err = mgr.Get(policy.DestRegistryID) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err) + } + if dstRegistry == nil { + return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.DestRegistryID) + } + } else { + dstRegistry = registry + } + + // create the source registry adapter + srcFactory, err := adp.GetFactory(srcRegistry.Type) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err) + } + srcAdapter, err = srcFactory(srcRegistry) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err) + } + + // create the destination registry adapter + dstFactory, err := adp.GetFactory(dstRegistry.Type) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err) + } + dstAdapter, err = dstFactory(dstRegistry) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err) + } + log.Debug("replication flow initialization completed") + return srcRegistry, dstRegistry, srcAdapter, dstAdapter, nil +} + +// fetch resources from the source registry +func fetchResources(adapter adp.Adapter, adapterType model.RegistryType, policy *model.Policy) ([]*model.Resource, error) { + resTypes := []model.ResourceType{} + filters := []*model.Filter{} + for _, filter := range policy.Filters { + if filter.Type != model.FilterTypeResource { + filters = append(filters, filter) + continue + } + resTypes = append(resTypes, filter.Value.(model.ResourceType)) + } + if len(resTypes) == 0 { + resTypes = append(resTypes, adp.GetAdapterInfo(adapterType).SupportedResourceTypes...) + } + + resources := []*model.Resource{} + // convert the adapter to different interfaces according to its required resource types + for _, typ := range resTypes { + var res []*model.Resource + var err error + if typ == model.ResourceTypeRepository { + // images + reg, ok := adapter.(adp.ImageRegistry) + if !ok { + return nil, fmt.Errorf("the adapter doesn't implement the ImageRegistry interface") + } + res, err = reg.FetchImages(policy.SrcNamespaces, filters) + } else if typ == model.ResourceTypeChart { + // charts + reg, ok := adapter.(adp.ChartRegistry) + if !ok { + return nil, fmt.Errorf("the adapter doesn't implement the ChartRegistry interface") + } + res, err = reg.FetchCharts(policy.SrcNamespaces, filters) + } else { + return nil, fmt.Errorf("unsupported resource type %s", typ) + } + if err != nil { + return nil, fmt.Errorf("failed to fetch %s: %v", typ, err) + } + resources = append(resources, res...) + log.Debugf("fetch %s completed", typ) + } + + log.Debug("fetch resources from the source registry completed") + return resources, nil +} + +// apply the filters to the resources and returns the filtered resources +func filterResources(resources []*model.Resource, filters []*model.Filter) ([]*model.Resource, error) { + res := []*model.Resource{} + for _, resource := range resources { + match := true + for _, filter := range filters { + switch filter.Type { + case model.FilterTypeResource: + resourceType, ok := filter.Value.(string) + if !ok { + return nil, fmt.Errorf("%v is not a valid string", filter.Value) + } + if model.ResourceType(resourceType) != resource.Type { + match = false + break + } + case model.FilterTypeName: + pattern, ok := filter.Value.(string) + if !ok { + return nil, fmt.Errorf("%v is not a valid string", filter.Value) + } + if resource.Metadata == nil { + match = false + break + } + m, err := util.Match(pattern, resource.Metadata.Name) + if err != nil { + return nil, err + } + if !m { + match = false + break + } + case model.FilterTypeVersion: + pattern, ok := filter.Value.(string) + if !ok { + return nil, fmt.Errorf("%v is not a valid string", filter.Value) + } + if resource.Metadata == nil { + match = false + break + } + versions := []string{} + for _, version := range resource.Metadata.Vtags { + m, err := util.Match(pattern, version) + if err != nil { + return nil, err + } + if m { + versions = append(versions, version) + } + } + if len(versions) == 0 { + match = false + break + } + // NOTE: the property "Vtags" of the origin resource struct is overrided here + resource.Metadata.Vtags = versions + case model.FilterTypeLabel: + // TODO add support to label + default: + return nil, fmt.Errorf("unsupportted filter type: %v", filter.Type) + } + } + if match { + res = append(res, resource) + } + } + log.Debug("filter resources completed") + return res, nil +} + +// Assemble the namespaces that need to be created on the destination registry: +// step 1: get the detail information for each of the source namespaces +// step 2: if the destination namespace isn't specified in the policy, then the +// same namespaces with the source will be returned. If it is specified, then +// returns the specified one with the merged metadatas of all source namespaces +func assembleDestinationNamespaces(srcAdapter adp.Adapter, srcResources []*model.Resource, dstNamespace string) ([]*model.Namespace, error) { + namespaces := []*model.Namespace{} + for _, srcResource := range srcResources { + namespace, err := srcAdapter.GetNamespace(srcResource.Metadata.Namespace) + if err != nil { + return nil, err + } + namespaces = append(namespaces, namespace) + } + + if len(dstNamespace) != 0 { + namespaces = []*model.Namespace{ + { + Name: dstNamespace, + // TODO merge the metadata + Metadata: map[string]interface{}{}, + }, + } + } + + log.Debug("assemble the destination namespaces completed") + return namespaces, nil +} + +// create the namespaces on the destination registry +func createNamespaces(adapter adp.Adapter, namespaces []*model.Namespace) error { + for _, namespace := range namespaces { + if err := adapter.CreateNamespace(namespace); err != nil { + return fmt.Errorf("failed to create the namespace %s on the destination registry: %v", namespace.Name, err) + } + log.Debugf("namespace %s created on the destination registry", namespace.Name) + } + return nil +} + +// assemble the destination resources by filling the registry, namespace and override properties +func assembleDestinationResources(resources []*model.Resource, + registry *model.Registry, namespace string, override bool) []*model.Resource { + result := []*model.Resource{} + for _, resource := range resources { + res := &model.Resource{ + Type: resource.Type, + Metadata: &model.ResourceMetadata{ + Name: resource.Metadata.Name, + Namespace: resource.Metadata.Namespace, + Vtags: resource.Metadata.Vtags, + }, + Registry: registry, + ExtendedInfo: resource.ExtendedInfo, + Deleted: resource.Deleted, + Override: override, + } + // if the destination namespace is specified, use the specified one + if len(namespace) > 0 { + res.Metadata.Name = strings.Replace(resource.Metadata.Name, + resource.Metadata.Namespace, namespace, 1) + res.Metadata.Namespace = namespace + } + result = append(result, res) + } + log.Debug("assemble the destination resources completed") + return result +} + +// preprocess +func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*model.Resource) ([]*scheduler.ScheduleItem, error) { + items, err := scheduler.Preprocess(srcResources, dstResources) + if err != nil { + return nil, fmt.Errorf("failed to preprocess the resources: %v", err) + } + log.Debug("preprocess the resources completed") + return items, nil +} + +// create task records in database +func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error { + for _, item := range items { + task := &models.Task{ + ExecutionID: executionID, + Status: models.TaskStatusInitialized, + ResourceType: string(item.SrcResource.Type), + SrcResource: getResourceName(item.SrcResource), + DstResource: getResourceName(item.DstResource), + } + id, err := mgr.CreateTask(task) + if err != nil { + // if failed to create the task for one of the items, + // the whole execution is marked as failure and all + // the items will not be submitted + return fmt.Errorf("failed to create task records for the execution %d: %v", executionID, err) + } + + item.TaskID = id + log.Debugf("task record %d for the execution %d created", id, executionID) + } + return nil +} + +// schedule the replication tasks and update the task's status +func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) error { + results, err := scheduler.Schedule(items) + if err != nil { + return fmt.Errorf("failed to schedule the tasks: %v", err) + } + + allFailed := true + for _, result := range results { + // if the task is failed to be submitted, update the status of the + // task as failure + if result.Error != nil { + log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error) + if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil { + log.Errorf("failed to update the task status %d: %v", result.TaskID, err) + } + continue + } + allFailed = false + // if the task is submitted successfully, update the status, job ID and start time + if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil { + log.Errorf("failed to update the task status %d: %v", result.TaskID, err) + } + if err = executionMgr.UpdateTask(&models.Task{ + ID: result.TaskID, + JobID: result.JobID, + StartTime: time.Now(), + }, "JobID", "StartTime"); err != nil { + log.Errorf("failed to update the task %d: %v", result.TaskID, err) + } + log.Debugf("the task %d scheduled", result.TaskID) + } + // if all the tasks are failed, return err + if allFailed { + return errors.New("all tasks are failed") + } + return nil +} + +// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" +// if the resource has vtags +func getResourceName(res *model.Resource) string { + if res == nil { + return "" + } + meta := res.Metadata + if meta == nil { + return "" + } + if len(meta.Vtags) == 0 { + return meta.Name + } + return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]" +} + +func getLocalRegistry() *model.Registry { + return &model.Registry{ + Type: model.RegistryTypeHarbor, + Name: "Local", + URL: config.Config.RegistryURL, + // TODO use the service account + Credential: &model.Credential{ + Type: model.CredentialTypeBasic, + AccessKey: "admin", + AccessSecret: "Harbor12345", + }, + Insecure: true, + } +} diff --git a/src/replication/ng/flow/stage_test.go b/src/replication/ng/flow/stage_test.go new file mode 100644 index 000000000..f673f2f17 --- /dev/null +++ b/src/replication/ng/flow/stage_test.go @@ -0,0 +1,439 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flow + +import ( + "io" + "os" + "testing" + + "github.com/docker/distribution" + + "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/config" + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/scheduler" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakedRegistryManager struct{} + +func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) { + return 0, nil +} +func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) { + return 0, nil, nil +} +func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) { + var registry *model.Registry + switch id { + case 1: + registry = &model.Registry{ + ID: 1, + Type: model.RegistryTypeHarbor, + } + } + return registry, nil +} +func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) { + return nil, nil +} +func (f *fakedRegistryManager) Update(*model.Registry, ...string) error { + return nil +} +func (f *fakedRegistryManager) Remove(int64) error { + return nil +} +func (f *fakedRegistryManager) HealthCheck() error { + return nil +} +func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) { + return &fakedAdapter{}, nil +} + +type fakedAdapter struct{} + +func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) { + return nil, nil +} +func (f *fakedAdapter) CreateNamespace(*model.Namespace) error { + return nil +} +func (f *fakedAdapter) GetNamespace(ns string) (*model.Namespace, error) { + var namespace *model.Namespace + if ns == "library" { + namespace = &model.Namespace{ + Name: "library", + Metadata: map[string]interface{}{ + "public": true, + }, + } + } + return namespace, nil +} +func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) { + return []*model.Resource{ + { + Type: model.ResourceTypeRepository, + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + Override: false, + }, + }, nil +} + +func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) { + return false, "", nil +} +func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) { + return nil, "", nil +} +func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error { + return nil +} +func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) { + return false, nil +} +func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { + return 0, nil, nil +} +func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error { + return nil +} +func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) { + return []*model.Resource{ + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "library/harbor", + Namespace: "library", + Vtags: []string{"0.2.0"}, + }, + }, + }, nil +} +func (f *fakedAdapter) ChartExist(name, version string) (bool, error) { + return false, nil +} +func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) { + return nil, nil +} +func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error { + return nil +} +func (f *fakedAdapter) DeleteChart(name, version string) error { + return nil +} + +type fakedScheduler struct{} + +func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) { + items := []*scheduler.ScheduleItem{} + for i, res := range src { + items = append(items, &scheduler.ScheduleItem{ + SrcResource: res, + DstResource: dst[i], + }) + } + return items, nil +} +func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) { + results := []*scheduler.ScheduleResult{} + for _, item := range items { + results = append(results, &scheduler.ScheduleResult{ + TaskID: item.TaskID, + Error: nil, + }) + } + return results, nil +} +func (f *fakedScheduler) Stop(id string) error { + return nil +} + +type fakedExecutionManager struct { + taskID int64 +} + +func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { + return 1, nil +} +func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 0, nil, nil +} +func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { + return nil, nil +} +func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { + return nil +} +func (f *fakedExecutionManager) Remove(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAll(int64) error { + return nil +} +func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { + f.taskID++ + id := f.taskID + return id, nil +} +func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 0, nil, nil +} +func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { + return nil, nil +} +func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { + return nil +} +func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { + return nil +} +func (f *fakedExecutionManager) RemoveTask(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAllTasks(int64) error { + return nil +} +func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { + return nil, nil +} + +func TestMain(m *testing.M) { + url := "https://registry.harbor.local" + config.Config = &config.Configuration{ + RegistryURL: url, + } + if err := adapter.RegisterFactory( + &adapter.Info{ + Type: model.RegistryTypeHarbor, + SupportedResourceTypes: []model.ResourceType{ + model.ResourceTypeRepository, + model.ResourceTypeChart, + }, + SupportedTriggers: []model.TriggerType{model.TriggerTypeManual}, + }, fakedAdapterFactory); err != nil { + os.Exit(1) + } + os.Exit(m.Run()) +} + +func TestInitialize(t *testing.T) { + url := "https://registry.harbor.local" + registryMgr := &fakedRegistryManager{} + policy := &model.Policy{ + SrcRegistryID: 0, + DestRegistryID: 1, + } + srcRegistry, dstRegistry, _, _, err := initialize(registryMgr, policy) + require.Nil(t, err) + assert.Equal(t, url, srcRegistry.URL) + assert.Equal(t, int64(1), dstRegistry.ID) +} + +func TestFetchResources(t *testing.T) { + adapter := &fakedAdapter{} + policy := &model.Policy{} + resources, err := fetchResources(adapter, model.RegistryTypeHarbor, policy) + require.Nil(t, err) + assert.Equal(t, 2, len(resources)) +} + +func TestFilterResources(t *testing.T) { + resources := []*model.Resource{ + { + Type: model.ResourceTypeRepository, + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + // TODO test labels + Labels: nil, + }, + Deleted: true, + Override: true, + }, + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "library/harbor", + Namespace: "library", + Vtags: []string{"0.2.0", "0.3.0"}, + // TODO test labels + Labels: nil, + }, + Deleted: true, + Override: true, + }, + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "library/mysql", + Namespace: "library", + Vtags: []string{"1.0"}, + // TODO test labels + Labels: nil, + }, + Deleted: true, + Override: true, + }, + } + filters := []*model.Filter{ + { + Type: model.FilterTypeResource, + Value: string(model.ResourceTypeChart), + }, + { + Type: model.FilterTypeName, + Value: "library/*", + }, + { + Type: model.FilterTypeName, + Value: "library/harbor", + }, + { + Type: model.FilterTypeVersion, + Value: "0.2.?", + }, + } + res, err := filterResources(resources, filters) + require.Nil(t, err) + assert.Equal(t, 1, len(res)) + assert.Equal(t, "library/harbor", res[0].Metadata.Name) + assert.Equal(t, 1, len(res[0].Metadata.Vtags)) + assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0]) +} + +func TestAssembleDestinationNamespaces(t *testing.T) { + adapter := &fakedAdapter{} + resources := []*model.Resource{ + { + Metadata: &model.ResourceMetadata{ + Namespace: "library", + }, + }, + } + namespace := "" + ns, err := assembleDestinationNamespaces(adapter, resources, namespace) + require.Nil(t, err) + assert.Equal(t, 1, len(ns)) + assert.Equal(t, "library", ns[0].Name) + assert.Equal(t, true, ns[0].Metadata["public"].(bool)) + + namespace = "test" + ns, err = assembleDestinationNamespaces(adapter, resources, namespace) + require.Nil(t, err) + assert.Equal(t, 1, len(ns)) + assert.Equal(t, "test", ns[0].Name) + // TODO add test for merged metadata + // assert.Equal(t, true, ns[0].Metadata["public"].(bool)) +} + +func TestCreateNamespaces(t *testing.T) { + adapter := &fakedAdapter{} + namespaces := []*model.Namespace{ + { + Name: "library", + }, + } + err := createNamespaces(adapter, namespaces) + require.Nil(t, err) +} + +func TestAssembleDestinationResources(t *testing.T) { + resources := []*model.Resource{ + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + Override: false, + }, + } + registry := &model.Registry{} + namespace := "test" + override := true + res := assembleDestinationResources(resources, registry, namespace, override) + assert.Equal(t, 1, len(res)) + assert.Equal(t, model.ResourceTypeChart, res[0].Type) + assert.Equal(t, "test/hello-world", res[0].Metadata.Name) + assert.Equal(t, namespace, res[0].Metadata.Namespace) + assert.Equal(t, 1, len(res[0].Metadata.Vtags)) + assert.Equal(t, "latest", res[0].Metadata.Vtags[0]) +} + +func TestPreprocess(t *testing.T) { + scheduler := &fakedScheduler{} + srcResources := []*model.Resource{ + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + Override: false, + }, + } + dstResources := []*model.Resource{ + { + Type: model.ResourceTypeChart, + Metadata: &model.ResourceMetadata{ + Name: "test/hello-world", + Namespace: "test", + Vtags: []string{"latest"}, + }, + Override: false, + }, + } + items, err := preprocess(scheduler, srcResources, dstResources) + require.Nil(t, err) + assert.Equal(t, 1, len(items)) +} + +func TestCreateTasks(t *testing.T) { + mgr := &fakedExecutionManager{} + items := []*scheduler.ScheduleItem{ + { + SrcResource: &model.Resource{}, + DstResource: &model.Resource{}, + }, + } + err := createTasks(mgr, 1, items) + require.Nil(t, err) + assert.Equal(t, int64(1), items[0].TaskID) +} + +func TestSchedule(t *testing.T) { + sched := &fakedScheduler{} + mgr := &fakedExecutionManager{} + items := []*scheduler.ScheduleItem{ + { + SrcResource: &model.Resource{}, + DstResource: &model.Resource{}, + TaskID: 1, + }, + } + err := schedule(sched, mgr, items) + require.Nil(t, err) +} diff --git a/src/replication/ng/model/policy.go b/src/replication/ng/model/policy.go index d2ea36887..461bdf826 100644 --- a/src/replication/ng/model/policy.go +++ b/src/replication/ng/model/policy.go @@ -44,11 +44,13 @@ type Policy struct { SrcRegistryID int64 `json:"src_registry_id"` SrcNamespaces []string `json:"src_namespaces"` // destination + // TODO rename to DstRegistryID DestRegistryID int64 `json:"dest_registry_id"` // Only support two dest namespace modes: // Put all the src resources to the one single dest namespace // or keep namespaces same with the source ones (under this case, // the DestNamespace should be set to empty) + // TODO rename to DstNamespace DestNamespace string `json:"dest_namespace"` // Filters Filters []*Filter `json:"filters"` diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 3ad2e12b2..e5b5f4088 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -15,16 +15,22 @@ package operation import ( + "fmt" + "time" + + "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/execution" "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" ) // Controller handles the replication-related operations: start, // stop, query, etc. type Controller interface { - StartReplication(policy *model.Policy) (int64, error) + StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) StopReplication(int64) error ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) GetExecution(int64) (*models.Execution, error) @@ -35,23 +41,76 @@ type Controller interface { } // NewController returns a controller implementation -func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller { +func NewController(executionMgr execution.Manager, registrgMgr registry.Manager, + scheduler scheduler.Scheduler) Controller { return &defaultController{ - flowCtl: flowCtl, executionMgr: executionMgr, + registryMgr: registrgMgr, + scheduler: scheduler, + flowCtl: flow.NewController(), } } type defaultController struct { flowCtl flow.Controller executionMgr execution.Manager + registryMgr registry.Manager + scheduler scheduler.Scheduler } -func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) { - return d.flowCtl.StartReplication(policy) +func (d *defaultController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { + if resource != nil && len(resource.Metadata.Vtags) != 1 { + return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags) + } + + id, err := createExecution(d.executionMgr, policy.ID) + if err != nil { + return 0, err + } + + flow := d.createFlow(id, policy, resource) + if err = d.flowCtl.Start(flow); err != nil { + // mark the execution as failure and log the error message + // no error will be returned as the execution is created successfully + markExecutionFailure(d.executionMgr, id, err.Error()) + } + + return id, nil } + +// create different replication flows according to the input parameters +func (d *defaultController) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow { + // replicate the deletion operation, so create a deletion flow + if resource != nil && resource.Deleted { + return flow.NewDeletionFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy, []*model.Resource{resource}) + + } + // copy only one resource, add extra filters to the policy to make sure + // only the resource will be filtered out + if resource != nil { + filters := []*model.Filter{ + { + Type: model.FilterTypeResource, + Value: resource.Type, + }, + { + Type: model.FilterTypeName, + Value: resource.Metadata.Name, + }, + { + Type: model.FilterTypeVersion, + // only support replicate one tag + Value: resource.Metadata.Vtags[0], + }, + } + filters = append(filters, policy.Filters...) + } + return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy) +} + func (d *defaultController) StopReplication(executionID int64) error { - return d.flowCtl.StopReplication(executionID) + // TODO implement the function + return nil } func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return d.executionMgr.List(query...) @@ -71,3 +130,33 @@ func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCond func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) { return d.executionMgr.GetTaskLog(taskID) } + +// create the execution record in database +func createExecution(mgr execution.Manager, policyID int64) (int64, error) { + id, err := mgr.Create(&models.Execution{ + PolicyID: policyID, + Status: models.ExecutionStatusInProgress, + StartTime: time.Now(), + }) + if err != nil { + return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policyID, err) + } + log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id) + return id, nil +} + +// mark the execution as failure in database +func markExecutionFailure(mgr execution.Manager, id int64, message string) { + err := mgr.Update( + &models.Execution{ + ID: id, + Status: models.ExecutionStatusFailed, + StatusText: message, + EndTime: time.Now(), + }, "Status", "StatusText", "EndTime") + if err != nil { + log.Errorf("failed to update the execution %d: %v", id, err) + return + } + log.Debugf("the execution %d is marked as failure: %s", id, message) +} diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go index 73f48e517..5ada7fd66 100644 --- a/src/replication/ng/operation/controller_test.go +++ b/src/replication/ng/operation/controller_test.go @@ -17,21 +17,14 @@ package operation import ( "testing" + "github.com/goharbor/harbor/src/replication/ng/config" "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/scheduler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type fakedFlowController struct{} - -func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) { - return 1, nil -} -func (f *fakedFlowController) StopReplication(int64) error { - return nil -} - type fakedExecutionManager struct{} func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { @@ -69,7 +62,9 @@ func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*model }, nil } func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { - return nil, nil + return &models.Task{ + ID: 1, + }, nil } func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil @@ -87,10 +82,95 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { return []byte("message"), nil } -var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{}) +type fakedRegistryManager struct{} + +func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) { + return 0, nil +} +func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) { + return 0, nil, nil +} +func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) { + var registry *model.Registry + switch id { + case 1: + registry = &model.Registry{ + ID: 1, + Type: model.RegistryTypeHarbor, + } + } + return registry, nil +} +func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) { + return nil, nil +} +func (f *fakedRegistryManager) Update(*model.Registry, ...string) error { + return nil +} +func (f *fakedRegistryManager) Remove(int64) error { + return nil +} +func (f *fakedRegistryManager) HealthCheck() error { + return nil +} + +type fakedScheduler struct{} + +func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) { + items := []*scheduler.ScheduleItem{} + for i, res := range src { + items = append(items, &scheduler.ScheduleItem{ + SrcResource: res, + DstResource: dst[i], + }) + } + return items, nil +} +func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) { + results := []*scheduler.ScheduleResult{} + for _, item := range items { + results = append(results, &scheduler.ScheduleResult{ + TaskID: item.TaskID, + Error: nil, + }) + } + return results, nil +} +func (f *fakedScheduler) Stop(id string) error { + return nil +} + +var ctl = NewController(&fakedExecutionManager{}, &fakedRegistryManager{}, &fakedScheduler{}) func TestStartReplication(t *testing.T) { - id, err := ctl.StartReplication(nil) + config.Config = &config.Configuration{} + // the resource contains Vtags whose length isn't 1 + policy := &model.Policy{} + resource := &model.Resource{ + Type: model.ResourceTypeRepository, + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Vtags: []string{"1.0", "2.0"}, + }, + } + _, err := ctl.StartReplication(policy, resource) + require.NotNil(t, err) + + // replicate resource deletion + resource.Metadata.Vtags = []string{"1.0"} + resource.Deleted = true + id, err := ctl.StartReplication(policy, resource) + require.Nil(t, err) + assert.Equal(t, int64(1), id) + + // replicate resource copy + resource.Deleted = false + id, err = ctl.StartReplication(policy, resource) + require.Nil(t, err) + assert.Equal(t, int64(1), id) + + // nil resource + id, err = ctl.StartReplication(policy, nil) require.Nil(t, err) assert.Equal(t, int64(1), id) } @@ -120,6 +200,17 @@ func TestListTasks(t *testing.T) { assert.Equal(t, int64(1), tasks[0].ID) } +func TestGetTask(t *testing.T) { + task, err := ctl.GetTask(1) + require.Nil(t, err) + assert.Equal(t, int64(1), task.ID) +} + +func TestUpdateTaskStatus(t *testing.T) { + err := ctl.UpdateTaskStatus(1, "running") + require.Nil(t, err) +} + func TestGetTaskLog(t *testing.T) { log, err := ctl.GetTaskLog(1) require.Nil(t, err) diff --git a/src/replication/ng/registry/manager.go b/src/replication/ng/registry/manager.go index 07f0bf462..a80e5eb17 100644 --- a/src/replication/ng/registry/manager.go +++ b/src/replication/ng/registry/manager.go @@ -22,6 +22,7 @@ import ( "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" + // TODO use the replication config rather than the core "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/dao" "github.com/goharbor/harbor/src/replication/ng/dao/models" diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index 8f12a7577..a1055a5f7 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -17,11 +17,10 @@ package ng import ( - "fmt" - - "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/common/utils/log" + cfg "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/replication/ng/config" "github.com/goharbor/harbor/src/replication/ng/execution" - "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/operation" "github.com/goharbor/harbor/src/replication/ng/policy" "github.com/goharbor/harbor/src/replication/ng/registry" @@ -39,21 +38,32 @@ var ( OperationCtl operation.Controller ) -// Init the global variables +// Init the global variables and configurations func Init() error { + // init config + registryURL, err := cfg.RegistryURL() + if err != nil { + return err + } + secretKey, err := cfg.SecretKey() + if err != nil { + return err + } + config.Config = &config.Configuration{ + CoreURL: cfg.InternalCoreURL(), + RegistryURL: registryURL, + TokenServiceURL: cfg.InternalTokenServiceEndpoint(), + JobserviceURL: cfg.InternalJobServiceURL(), + SecretKey: secretKey, + Secret: cfg.CoreSecret(), + } // Init registry manager RegistryMgr = registry.NewDefaultManager() // init policy manager PolicyMgr = policy.NewDefaultManager() - // init ExecutionMgr - executionMgr := execution.NewDefaultManager() - // init scheduler - scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret()) - - flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler) - if err != nil { - return fmt.Errorf("failed to create the flow controller: %v", err) - } - OperationCtl = operation.NewController(flowCtl, executionMgr) + // init operatoin controller + OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr, + scheduler.NewScheduler(config.Config.JobserviceURL, config.Config.Secret)) + log.Debug("the replication initialization completed") return nil } diff --git a/src/replication/ng/replication_test.go b/src/replication/ng/replication_test.go index 5da81dc8c..93fcbbfea 100644 --- a/src/replication/ng/replication_test.go +++ b/src/replication/ng/replication_test.go @@ -17,15 +17,28 @@ package ng import ( + "io/ioutil" + "os" + "path" "testing" - // "github.com/stretchr/testify/assert" - // "github.com/stretchr/testify/require" + + "github.com/goharbor/harbor/src/core/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInit(t *testing.T) { - // TODO add testing code - // err := Init() - // require.Nil(t, err) - // assert.NotNil(t, OperationCtl) - // TODO add check for RegistryMgr and ExecutionMgr + key := path.Join(os.TempDir(), "key") + err := ioutil.WriteFile(key, []byte{'k'}, os.ModePerm) + require.Nil(t, err) + defer os.Remove(key) + err = os.Setenv("KEY_PATH", key) + require.Nil(t, err) + + config.InitWithSettings(nil) + err = Init() + require.Nil(t, err) + assert.NotNil(t, PolicyMgr) + assert.NotNil(t, RegistryMgr) + assert.NotNil(t, OperationCtl) } diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index 47a93d83f..f5686af83 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -22,8 +22,8 @@ import ( "github.com/goharbor/harbor/src/common/job" common_job "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job/models" - "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/jobservice/opm" + "github.com/goharbor/harbor/src/replication/ng/config" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -101,7 +101,7 @@ func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, e Metadata: &models.JobMetadata{ JobKind: job.JobKindGeneric, }, - StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID), + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.Config.CoreURL, item.TaskID), } job.Name = common_job.Replication diff --git a/src/replication/ng/util/util.go b/src/replication/ng/util/util.go new file mode 100644 index 000000000..b8e7413cd --- /dev/null +++ b/src/replication/ng/util/util.go @@ -0,0 +1,24 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "path/filepath" +) + +// Match returns whether the str matches the pattern +func Match(pattern, str string) (bool, error) { + return filepath.Match(pattern, str) +} diff --git a/src/replication/ng/util/util_test.go b/src/replication/ng/util/util_test.go new file mode 100644 index 000000000..9828b3981 --- /dev/null +++ b/src/replication/ng/util/util_test.go @@ -0,0 +1,77 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMatch(t *testing.T) { + cases := []struct { + pattern string + str string + match bool + }{ + { + pattern: "", + str: "", + match: true, + }, + { + pattern: "1.*", + str: "1.0", + match: true, + }, + { + pattern: "1.*", + str: "1.01", + match: true, + }, + { + pattern: "1.?", + str: "1.0", + match: true, + }, + { + pattern: "1.?", + str: "1.01", + match: false, + }, + + { + pattern: "library/*", + str: "library/hello-world", + match: true, + }, + { + pattern: "lib*", + str: "library/hello-world", + match: false, + }, + { + pattern: "lib*/*", + str: "library/hello-world", + match: true, + }, + } + for _, c := range cases { + match, err := Match(c.pattern, c.str) + require.Nil(t, err) + assert.Equal(t, c.match, match) + } +}