Add support for replicating the delation of resource

This commit refines the replication flows and provides the support for replicating resource deletion

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-03-22 10:54:25 +08:00
parent 224f059993
commit 1120368c9c
22 changed files with 1452 additions and 774 deletions

View File

@ -113,7 +113,7 @@ func (r *ReplicationOperationAPI) CreateExecution() {
return
}
executionID, err := ng.OperationCtl.StartReplication(policy)
executionID, err := ng.OperationCtl.StartReplication(policy, nil)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
return

View File

@ -25,7 +25,7 @@ import (
type fakedOperationController struct{}
func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64, error) {
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
return 1, nil
}
func (f *fakedOperationController) StopReplication(int64) error {

View File

@ -0,0 +1,30 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
var (
// Config is the configuration
Config *Configuration
)
// Configuration holds the configuration information for replication
type Configuration struct {
CoreURL string
RegistryURL string
TokenServiceURL string
JobserviceURL string
SecretKey string
Secret string
}

View File

@ -106,6 +106,7 @@ type TaskFieldsName struct {
}
// Task represent the tasks in one execution.
// TODO add operation property
type Task struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`

View File

@ -14,105 +14,23 @@
package flow
import (
"fmt"
// Flow defines replication flow
type Flow interface {
Run(interface{}) error
}
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
)
// Controller controls the replication flow
// Controller is the controller that controls the replication flows
type Controller interface {
// Start a replication according to the policy and returns the
// execution ID and error
StartReplication(policy *model.Policy) (int64, error)
// Stop the replication specified by the execution ID
StopReplication(int64) error
Start(Flow) error
}
// NewController returns an instance of a Controller
func NewController(registryMgr registry.Manager,
executionMgr execution.Manager, scheduler scheduler.Scheduler) (Controller, error) {
if registryMgr == nil || executionMgr == nil || scheduler == nil {
// TODO(ChenDe): Uncomment it when execution manager is ready
// return nil, errors.New("invalid params")
}
return &defaultController{
registryMgr: registryMgr,
executionMgr: executionMgr,
scheduler: scheduler,
}, nil
// NewController returns an instance of the default flow controller
func NewController() Controller {
return &controller{}
}
// defaultController is the default implement for the Controller
type defaultController struct {
registryMgr registry.Manager
executionMgr execution.Manager
scheduler scheduler.Scheduler
}
// Start a replication according to the policy
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
log.Infof("starting the replication based on the policy %d ...", policy.ID)
flow, err := newFlow(policy, d.registryMgr, d.executionMgr, d.scheduler)
if err != nil {
return 0, fmt.Errorf("failed to create the flow object based on policy %d: %v", policy.ID, err)
}
// create the execution record
id, err := flow.createExecution()
if err != nil {
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policy.ID, err)
}
// fetch resources from the source registry
if err := flow.fetchResources(); err != nil {
// just log the error message and return the execution ID
log.Errorf("failed to fetch resources for the execution %d: %v", id, err)
return id, nil
}
// if no resources need to be replicated, mark the execution success and return
if len(flow.srcResources) == 0 {
flow.markExecutionSuccess("no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", id)
return id, nil
}
// create the namespace on the destination registry
if err = flow.createNamespace(); err != nil {
log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err)
return id, nil
}
// preprocess the resources
if err = flow.preprocess(); err != nil {
log.Errorf("failed to preprocess the resources for the execution %d: %v", id, err)
return id, nil
}
// create task records in database
if err = flow.createTasks(); err != nil {
log.Errorf("failed to create task records for the execution %d: %v", id, err)
return id, nil
}
// schedule the tasks
if err = flow.schedule(); err != nil {
log.Errorf("failed to schedule the execution %d: %v", id, err)
return id, nil
}
log.Infof("the execution %d scheduled", id)
return id, nil
}
func (d *defaultController) StopReplication(id int64) error {
// TODO
return nil
type controller struct{}
func (c *controller) Start(flow Flow) error {
return flow.Run(nil)
}

View File

@ -15,248 +15,20 @@
package flow
import (
"io"
"testing"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedPolicyManager struct{}
type fakedFlow struct{}
func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) {
return 0, nil
}
func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
return 0, nil, nil
}
func (f *fakedPolicyManager) Get(int64) (*model.Policy, error) {
return &model.Policy{
ID: 1,
SrcRegistryID: 1,
SrcNamespaces: []string{"library"},
DestRegistryID: 2,
}, nil
}
func (f *fakedPolicyManager) Update(*model.Policy) error {
return nil
}
func (f *fakedPolicyManager) Remove(int64) error {
func (f *fakedFlow) Run(interface{}) error {
return nil
}
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
if id == 1 {
return &model.Registry{
Type: "faked_registry",
}, nil
}
if id == 2 {
return &model.Registry{
Type: "faked_registry",
}, nil
}
return nil, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
type fakedExecutionManager struct {
taskID int64
}
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
return nil, nil
}
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
return nil
}
func (f *fakedExecutionManager) Remove(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
f.taskID++
id := f.taskID
return id, nil
}
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
return nil, nil
}
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
return nil
}
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return nil, nil
}
type fakedScheduler struct{}
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
items := []*scheduler.ScheduleItem{}
for i, res := range src {
items = append(items, &scheduler.ScheduleItem{
SrcResource: res,
DstResource: dst[i],
})
}
return items, nil
}
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
results := []*scheduler.ScheduleResult{}
for _, item := range items {
results = append(results, &scheduler.ScheduleResult{
TaskID: item.TaskID,
Error: nil,
})
}
return results, nil
}
func (f *fakedScheduler) Stop(id string) error {
return nil
}
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil
}
type fakedAdapter struct{}
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
return nil, nil
}
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
return nil
}
func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) {
return &model.Namespace{}, nil
}
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}, nil
}
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
}
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
}
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
}
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
}
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0"},
},
},
}, nil
}
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
return false, nil
}
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
return nil, nil
}
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
return nil
}
func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
func TestStartReplication(t *testing.T) {
config.InitWithSettings(nil)
err := adapter.RegisterFactory(
&adapter.Info{
Type: "faked_registry",
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
model.ResourceTypeChart,
},
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
}, fakedAdapterFactory)
func TestStart(t *testing.T) {
flow := &fakedFlow{}
controller := NewController()
err := controller.Start(flow)
require.Nil(t, err)
controller, _ := NewController(
&fakedRegistryManager{},
&fakedExecutionManager{},
&fakedScheduler{})
policy := &model.Policy{
ID: 1,
SrcRegistryID: 1,
DestRegistryID: 2,
DestNamespace: "library",
}
id, err := controller.StartReplication(policy)
require.Nil(t, err)
assert.Equal(t, id, int64(1))
}

View File

@ -0,0 +1,95 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"time"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
)
type copyFlow struct {
executionID int64
policy *model.Policy
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
}
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry
func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy) Flow {
return &copyFlow{
executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler,
executionID: executionID,
policy: policy,
}
}
func (c *copyFlow) Run(interface{}) error {
srcRegistry, dstRegistry, srcAdapter, dstAdapter, err := initialize(c.registryMgr, c.policy)
if err != nil {
return err
}
// TODO after refactoring the adapter register, the "srcRegistry.Type" is not needed
srcResources, err := fetchResources(srcAdapter, srcRegistry.Type, c.policy)
if err != nil {
return err
}
if len(srcResources) == 0 {
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
return nil
}
dstNamespaces, err := assembleDestinationNamespaces(srcAdapter, srcResources, c.policy.DestNamespace)
if err != nil {
return err
}
if err = createNamespaces(dstAdapter, dstNamespaces); err != nil {
return err
}
dstResources := assembleDestinationResources(srcResources, dstRegistry, c.policy.DestNamespace, c.policy.Override)
items, err := preprocess(c.scheduler, srcResources, dstResources)
if err != nil {
return err
}
if err = createTasks(c.executionMgr, c.executionID, items); err != nil {
return err
}
return schedule(c.scheduler, c.executionMgr, items)
}
// mark the execution as success in database
func markExecutionSuccess(mgr execution.Manager, id int64, message string) {
err := mgr.Update(
&models.Execution{
ID: id,
Status: models.ExecutionStatusSucceed,
StatusText: message,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", id, err)
return
}
}

View File

@ -0,0 +1,28 @@
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/require"
)
func TestRunOfCopyFlow(t *testing.T) {
scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{}
flow := NewCopyFlow(executionMgr, registryMgr, scheduler, 1, policy)
err := flow.Run(nil)
require.Nil(t, err)
}

View File

@ -0,0 +1,76 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
)
type deletionFlow struct {
executionID int64
policy *model.Policy
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
resources []*model.Resource
}
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
// on the destination registry
func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manager,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy,
resources []*model.Resource) Flow {
return &deletionFlow{
executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler,
executionID: executionID,
policy: policy,
resources: resources,
}
}
func (d *deletionFlow) Run(interface{}) error {
srcRegistry, dstRegistry, _, _, err := initialize(d.registryMgr, d.policy)
if err != nil {
return err
}
// filling the registry information
for _, resource := range d.resources {
resource.Registry = srcRegistry
}
srcResources, err := filterResources(d.resources, d.policy.Filters)
if err != nil {
return err
}
if len(srcResources) == 0 {
markExecutionSuccess(d.executionMgr, d.executionID, "no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID)
return nil
}
dstResources := assembleDestinationResources(srcResources, dstRegistry, d.policy.DestNamespace, d.policy.Override)
items, err := preprocess(d.scheduler, srcResources, dstResources)
if err != nil {
return err
}
if err = createTasks(d.executionMgr, d.executionID, items); err != nil {
return err
}
return schedule(d.scheduler, d.executionMgr, items)
}

View File

@ -0,0 +1,33 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/require"
)
func TestRunOfDeletionFlow(t *testing.T) {
scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{}
resources := []*model.Resource{}
flow := NewDeletionFlow(executionMgr, registryMgr, scheduler, 1, policy, resources)
err := flow.Run(nil)
require.Nil(t, err)
}

View File

@ -1,401 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"errors"
"fmt"
"strings"
"time"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
type flow struct {
policy *model.Policy
srcRegistry *model.Registry
dstRegistry *model.Registry
srcAdapter adapter.Adapter
dstAdapter adapter.Adapter
executionID int64
srcResources []*model.Resource
dstResources []*model.Resource
executionMgr execution.Manager
scheduler scheduler.Scheduler
scheduleItems []*scheduler.ScheduleItem
}
func newFlow(policy *model.Policy, registryMgr registry.Manager,
executionMgr execution.Manager, scheduler scheduler.Scheduler) (*flow, error) {
f := &flow{
policy: policy,
executionMgr: executionMgr,
scheduler: scheduler,
}
// TODO consider to put registry model in the policy directly rather than just the registry ID?
url, err := config.RegistryURL()
if err != nil {
return nil, fmt.Errorf("failed to get the registry URL: %v", err)
}
registry := &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: url,
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
// get source registry
if policy.SrcRegistryID != 0 {
srcRegistry, err := registryMgr.Get(policy.SrcRegistryID)
if err != nil {
return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
}
if srcRegistry == nil {
return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
}
f.srcRegistry = srcRegistry
} else {
f.srcRegistry = registry
}
// get destination registry
if policy.DestRegistryID != 0 {
dstRegistry, err := registryMgr.Get(policy.DestRegistryID)
if err != nil {
return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
}
if dstRegistry == nil {
return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
}
f.dstRegistry = dstRegistry
} else {
f.dstRegistry = registry
}
// create the source registry adapter
srcFactory, err := adapter.GetFactory(f.srcRegistry.Type)
if err != nil {
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.srcRegistry.Type, err)
}
srcAdapter, err := srcFactory(f.srcRegistry)
if err != nil {
return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", f.srcRegistry.URL, err)
}
f.srcAdapter = srcAdapter
// create the destination registry adapter
dstFactory, err := adapter.GetFactory(f.dstRegistry.Type)
if err != nil {
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.dstRegistry.Type, err)
}
dstAdapter, err := dstFactory(f.dstRegistry)
if err != nil {
return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", f.dstRegistry.URL, err)
}
f.dstAdapter = dstAdapter
return f, nil
}
func (f *flow) createExecution() (int64, error) {
id, err := f.executionMgr.Create(&models.Execution{
PolicyID: f.policy.ID,
Status: models.ExecutionStatusInProgress,
StartTime: time.Now(),
})
f.executionID = id
log.Debugf("an execution record for replication based on the policy %d created: %d", f.policy.ID, id)
return id, err
}
func (f *flow) fetchResources() error {
resTypes := []model.ResourceType{}
filters := []*model.Filter{}
for _, filter := range f.policy.Filters {
if filter.Type != model.FilterTypeResource {
filters = append(filters, filter)
continue
}
resTypes = append(resTypes, filter.Value.(model.ResourceType))
}
if len(resTypes) == 0 {
resTypes = append(resTypes, adapter.GetAdapterInfo(f.srcRegistry.Type).SupportedResourceTypes...)
}
// TODO consider whether the logic can be refactored by using reflect
srcResources := []*model.Resource{}
for _, typ := range resTypes {
log.Debugf("fetching %s...", typ)
// images
if typ == model.ResourceTypeRepository {
reg, ok := f.srcAdapter.(adapter.ImageRegistry)
if !ok {
err := fmt.Errorf("the adapter doesn't implement the ImageRegistry interface")
f.markExecutionFailure(err)
return err
}
res, err := reg.FetchImages(f.policy.SrcNamespaces, filters)
if err != nil {
f.markExecutionFailure(err)
return err
}
srcResources = append(srcResources, res...)
continue
}
// charts
if typ == model.ResourceTypeChart {
reg, ok := f.srcAdapter.(adapter.ChartRegistry)
if !ok {
err := fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
f.markExecutionFailure(err)
return err
}
res, err := reg.FetchCharts(f.policy.SrcNamespaces, filters)
if err != nil {
f.markExecutionFailure(err)
return err
}
srcResources = append(srcResources, res...)
continue
}
}
dstResources := []*model.Resource{}
for _, srcResource := range srcResources {
dstResource := &model.Resource{
Type: srcResource.Type,
Metadata: &model.ResourceMetadata{
Name: srcResource.Metadata.Name,
Namespace: srcResource.Metadata.Namespace,
Vtags: srcResource.Metadata.Vtags,
},
Registry: f.dstRegistry,
ExtendedInfo: srcResource.ExtendedInfo,
Deleted: srcResource.Deleted,
Override: f.policy.Override,
}
// TODO check whether the logic is applied to chart
// if the destination namespace is specified, use the specified one
if len(f.policy.DestNamespace) > 0 {
dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name,
srcResource.Metadata.Namespace, f.policy.DestNamespace, 1)
dstResource.Metadata.Namespace = f.policy.DestNamespace
}
dstResources = append(dstResources, dstResource)
}
f.srcResources = srcResources
f.dstResources = dstResources
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
return nil
}
func (f *flow) createNamespace() error {
// Merge the metadata of all source namespaces
// eg:
// We have two source namespaces:
// {
// Name: "source01",
// Metadata: {"public": true}
// }
// and
// {
// Name: "source02",
// Metadata: {"public": false}
// }
// The name of the destination namespace is "destination",
// after merging the metadata, the destination namespace
// looks like this:
// {
// Name: "destination",
// Metadata: {
// "public": {
// "source01": true,
// "source02": false,
// },
// },
// }
// TODO merge the metadata of different namespaces
namespaces := []*model.Namespace{}
for i, resource := range f.dstResources {
namespace := &model.Namespace{
Name: resource.Metadata.Namespace,
}
// get the metadata of the namespace from the source registry
ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace)
if err != nil {
f.markExecutionFailure(err)
return err
}
namespace.Metadata = ns.Metadata
namespaces = append(namespaces, namespace)
}
for _, namespace := range namespaces {
if err := f.dstAdapter.CreateNamespace(namespace); err != nil {
f.markExecutionFailure(err)
return err
}
log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID)
}
return nil
}
func (f *flow) preprocess() error {
items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources)
if err != nil {
f.markExecutionFailure(err)
return err
}
f.scheduleItems = items
log.Debugf("the preprocess for resources of the execution %d completed",
f.executionID)
return nil
}
func (f *flow) createTasks() error {
for _, item := range f.scheduleItems {
task := &models.Task{
ExecutionID: f.executionID,
Status: models.TaskStatusInitialized,
ResourceType: string(item.SrcResource.Type),
SrcResource: getResourceName(item.SrcResource),
DstResource: getResourceName(item.DstResource),
}
id, err := f.executionMgr.CreateTask(task)
if err != nil {
// if failed to create the task for one of the items,
// the whole execution is marked as failure and all
// the items will not be submitted
f.markExecutionFailure(err)
return err
}
item.TaskID = id
log.Debugf("task record %d for the execution %d created",
id, f.executionID)
}
return nil
}
func (f *flow) schedule() error {
results, err := f.scheduler.Schedule(f.scheduleItems)
if err != nil {
f.markExecutionFailure(err)
return err
}
allFailed := true
for _, result := range results {
// if the task is failed to be submitted, update the status of the
// task as failure
if result.Error != nil {
log.Errorf("failed to schedule task %d: %v", result.TaskID, result.Error)
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
}
continue
}
allFailed = false
// if the task is submitted successfully, update the status, job ID and start time
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
}
if err = f.executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(),
}, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update task %d: %v", result.TaskID, err)
}
log.Debugf("the task %d scheduled", result.TaskID)
}
// if all the tasks are failed, mark the execution failed
if allFailed {
err = errors.New("all tasks are failed")
f.markExecutionFailure(err)
return err
}
return nil
}
func (f *flow) markExecutionFailure(err error) {
statusText := ""
if err != nil {
statusText = err.Error()
}
log.Errorf("the execution %d is marked as failure because of the error: %s",
f.executionID, statusText)
err = f.executionMgr.Update(
&models.Execution{
ID: f.executionID,
Status: models.ExecutionStatusFailed,
StatusText: statusText,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
}
}
func (f *flow) markExecutionSuccess(msg string) {
log.Debugf("the execution %d is marked as success", f.executionID)
err := f.executionMgr.Update(
&models.Execution{
ID: f.executionID,
Status: models.ExecutionStatusSucceed,
StatusText: msg,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
}
}
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
// if the resource has vtags
func getResourceName(res *model.Resource) string {
if res == nil {
return ""
}
meta := res.Metadata
if meta == nil {
return ""
}
if len(meta.Vtags) == 0 {
return meta.Name
}
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
}

View File

@ -0,0 +1,380 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"errors"
"fmt"
"strings"
"time"
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/goharbor/harbor/src/replication/ng/util"
)
// get/create the source registry, destination registry, source adapter and destination adapter
func initialize(mgr registry.Manager, policy *model.Policy) (*model.Registry, *model.Registry, adp.Adapter, adp.Adapter, error) {
var srcRegistry, dstRegistry *model.Registry
var srcAdapter, dstAdapter adp.Adapter
var err error
registry := getLocalRegistry()
// get source registry
if policy.SrcRegistryID != 0 {
srcRegistry, err = mgr.Get(policy.SrcRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
}
if srcRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
}
} else {
srcRegistry = registry
}
// get destination registry
if policy.DestRegistryID != 0 {
dstRegistry, err = mgr.Get(policy.DestRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
}
if dstRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
}
} else {
dstRegistry = registry
}
// create the source registry adapter
srcFactory, err := adp.GetFactory(srcRegistry.Type)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err)
}
srcAdapter, err = srcFactory(srcRegistry)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err)
}
// create the destination registry adapter
dstFactory, err := adp.GetFactory(dstRegistry.Type)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err)
}
dstAdapter, err = dstFactory(dstRegistry)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err)
}
log.Debug("replication flow initialization completed")
return srcRegistry, dstRegistry, srcAdapter, dstAdapter, nil
}
// fetch resources from the source registry
func fetchResources(adapter adp.Adapter, adapterType model.RegistryType, policy *model.Policy) ([]*model.Resource, error) {
resTypes := []model.ResourceType{}
filters := []*model.Filter{}
for _, filter := range policy.Filters {
if filter.Type != model.FilterTypeResource {
filters = append(filters, filter)
continue
}
resTypes = append(resTypes, filter.Value.(model.ResourceType))
}
if len(resTypes) == 0 {
resTypes = append(resTypes, adp.GetAdapterInfo(adapterType).SupportedResourceTypes...)
}
resources := []*model.Resource{}
// convert the adapter to different interfaces according to its required resource types
for _, typ := range resTypes {
var res []*model.Resource
var err error
if typ == model.ResourceTypeRepository {
// images
reg, ok := adapter.(adp.ImageRegistry)
if !ok {
return nil, fmt.Errorf("the adapter doesn't implement the ImageRegistry interface")
}
res, err = reg.FetchImages(policy.SrcNamespaces, filters)
} else if typ == model.ResourceTypeChart {
// charts
reg, ok := adapter.(adp.ChartRegistry)
if !ok {
return nil, fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
}
res, err = reg.FetchCharts(policy.SrcNamespaces, filters)
} else {
return nil, fmt.Errorf("unsupported resource type %s", typ)
}
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %v", typ, err)
}
resources = append(resources, res...)
log.Debugf("fetch %s completed", typ)
}
log.Debug("fetch resources from the source registry completed")
return resources, nil
}
// apply the filters to the resources and returns the filtered resources
func filterResources(resources []*model.Resource, filters []*model.Filter) ([]*model.Resource, error) {
res := []*model.Resource{}
for _, resource := range resources {
match := true
for _, filter := range filters {
switch filter.Type {
case model.FilterTypeResource:
resourceType, ok := filter.Value.(string)
if !ok {
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
}
if model.ResourceType(resourceType) != resource.Type {
match = false
break
}
case model.FilterTypeName:
pattern, ok := filter.Value.(string)
if !ok {
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
}
if resource.Metadata == nil {
match = false
break
}
m, err := util.Match(pattern, resource.Metadata.Name)
if err != nil {
return nil, err
}
if !m {
match = false
break
}
case model.FilterTypeVersion:
pattern, ok := filter.Value.(string)
if !ok {
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
}
if resource.Metadata == nil {
match = false
break
}
versions := []string{}
for _, version := range resource.Metadata.Vtags {
m, err := util.Match(pattern, version)
if err != nil {
return nil, err
}
if m {
versions = append(versions, version)
}
}
if len(versions) == 0 {
match = false
break
}
// NOTE: the property "Vtags" of the origin resource struct is overrided here
resource.Metadata.Vtags = versions
case model.FilterTypeLabel:
// TODO add support to label
default:
return nil, fmt.Errorf("unsupportted filter type: %v", filter.Type)
}
}
if match {
res = append(res, resource)
}
}
log.Debug("filter resources completed")
return res, nil
}
// Assemble the namespaces that need to be created on the destination registry:
// step 1: get the detail information for each of the source namespaces
// step 2: if the destination namespace isn't specified in the policy, then the
// same namespaces with the source will be returned. If it is specified, then
// returns the specified one with the merged metadatas of all source namespaces
func assembleDestinationNamespaces(srcAdapter adp.Adapter, srcResources []*model.Resource, dstNamespace string) ([]*model.Namespace, error) {
namespaces := []*model.Namespace{}
for _, srcResource := range srcResources {
namespace, err := srcAdapter.GetNamespace(srcResource.Metadata.Namespace)
if err != nil {
return nil, err
}
namespaces = append(namespaces, namespace)
}
if len(dstNamespace) != 0 {
namespaces = []*model.Namespace{
{
Name: dstNamespace,
// TODO merge the metadata
Metadata: map[string]interface{}{},
},
}
}
log.Debug("assemble the destination namespaces completed")
return namespaces, nil
}
// create the namespaces on the destination registry
func createNamespaces(adapter adp.Adapter, namespaces []*model.Namespace) error {
for _, namespace := range namespaces {
if err := adapter.CreateNamespace(namespace); err != nil {
return fmt.Errorf("failed to create the namespace %s on the destination registry: %v", namespace.Name, err)
}
log.Debugf("namespace %s created on the destination registry", namespace.Name)
}
return nil
}
// assemble the destination resources by filling the registry, namespace and override properties
func assembleDestinationResources(resources []*model.Resource,
registry *model.Registry, namespace string, override bool) []*model.Resource {
result := []*model.Resource{}
for _, resource := range resources {
res := &model.Resource{
Type: resource.Type,
Metadata: &model.ResourceMetadata{
Name: resource.Metadata.Name,
Namespace: resource.Metadata.Namespace,
Vtags: resource.Metadata.Vtags,
},
Registry: registry,
ExtendedInfo: resource.ExtendedInfo,
Deleted: resource.Deleted,
Override: override,
}
// if the destination namespace is specified, use the specified one
if len(namespace) > 0 {
res.Metadata.Name = strings.Replace(resource.Metadata.Name,
resource.Metadata.Namespace, namespace, 1)
res.Metadata.Namespace = namespace
}
result = append(result, res)
}
log.Debug("assemble the destination resources completed")
return result
}
// preprocess
func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*model.Resource) ([]*scheduler.ScheduleItem, error) {
items, err := scheduler.Preprocess(srcResources, dstResources)
if err != nil {
return nil, fmt.Errorf("failed to preprocess the resources: %v", err)
}
log.Debug("preprocess the resources completed")
return items, nil
}
// create task records in database
func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error {
for _, item := range items {
task := &models.Task{
ExecutionID: executionID,
Status: models.TaskStatusInitialized,
ResourceType: string(item.SrcResource.Type),
SrcResource: getResourceName(item.SrcResource),
DstResource: getResourceName(item.DstResource),
}
id, err := mgr.CreateTask(task)
if err != nil {
// if failed to create the task for one of the items,
// the whole execution is marked as failure and all
// the items will not be submitted
return fmt.Errorf("failed to create task records for the execution %d: %v", executionID, err)
}
item.TaskID = id
log.Debugf("task record %d for the execution %d created", id, executionID)
}
return nil
}
// schedule the replication tasks and update the task's status
func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) error {
results, err := scheduler.Schedule(items)
if err != nil {
return fmt.Errorf("failed to schedule the tasks: %v", err)
}
allFailed := true
for _, result := range results {
// if the task is failed to be submitted, update the status of the
// task as failure
if result.Error != nil {
log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error)
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
continue
}
allFailed = false
// if the task is submitted successfully, update the status, job ID and start time
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
if err = executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(),
}, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update the task %d: %v", result.TaskID, err)
}
log.Debugf("the task %d scheduled", result.TaskID)
}
// if all the tasks are failed, return err
if allFailed {
return errors.New("all tasks are failed")
}
return nil
}
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
// if the resource has vtags
func getResourceName(res *model.Resource) string {
if res == nil {
return ""
}
meta := res.Metadata
if meta == nil {
return ""
}
if len(meta.Vtags) == 0 {
return meta.Name
}
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
}
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
}

View File

@ -0,0 +1,439 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flow
import (
"io"
"os"
"testing"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil
}
type fakedAdapter struct{}
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
return nil, nil
}
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
return nil
}
func (f *fakedAdapter) GetNamespace(ns string) (*model.Namespace, error) {
var namespace *model.Namespace
if ns == "library" {
namespace = &model.Namespace{
Name: "library",
Metadata: map[string]interface{}{
"public": true,
},
}
}
return namespace, nil
}
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}, nil
}
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
}
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
}
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
}
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
}
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0"},
},
},
}, nil
}
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
return false, nil
}
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
return nil, nil
}
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
return nil
}
func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
type fakedScheduler struct{}
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
items := []*scheduler.ScheduleItem{}
for i, res := range src {
items = append(items, &scheduler.ScheduleItem{
SrcResource: res,
DstResource: dst[i],
})
}
return items, nil
}
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
results := []*scheduler.ScheduleResult{}
for _, item := range items {
results = append(results, &scheduler.ScheduleResult{
TaskID: item.TaskID,
Error: nil,
})
}
return results, nil
}
func (f *fakedScheduler) Stop(id string) error {
return nil
}
type fakedExecutionManager struct {
taskID int64
}
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
return nil, nil
}
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
return nil
}
func (f *fakedExecutionManager) Remove(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
f.taskID++
id := f.taskID
return id, nil
}
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
return nil, nil
}
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
return nil
}
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return nil, nil
}
func TestMain(m *testing.M) {
url := "https://registry.harbor.local"
config.Config = &config.Configuration{
RegistryURL: url,
}
if err := adapter.RegisterFactory(
&adapter.Info{
Type: model.RegistryTypeHarbor,
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
model.ResourceTypeChart,
},
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
}, fakedAdapterFactory); err != nil {
os.Exit(1)
}
os.Exit(m.Run())
}
func TestInitialize(t *testing.T) {
url := "https://registry.harbor.local"
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{
SrcRegistryID: 0,
DestRegistryID: 1,
}
srcRegistry, dstRegistry, _, _, err := initialize(registryMgr, policy)
require.Nil(t, err)
assert.Equal(t, url, srcRegistry.URL)
assert.Equal(t, int64(1), dstRegistry.ID)
}
func TestFetchResources(t *testing.T) {
adapter := &fakedAdapter{}
policy := &model.Policy{}
resources, err := fetchResources(adapter, model.RegistryTypeHarbor, policy)
require.Nil(t, err)
assert.Equal(t, 2, len(resources))
}
func TestFilterResources(t *testing.T) {
resources := []*model.Resource{
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
// TODO test labels
Labels: nil,
},
Deleted: true,
Override: true,
},
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0", "0.3.0"},
// TODO test labels
Labels: nil,
},
Deleted: true,
Override: true,
},
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/mysql",
Namespace: "library",
Vtags: []string{"1.0"},
// TODO test labels
Labels: nil,
},
Deleted: true,
Override: true,
},
}
filters := []*model.Filter{
{
Type: model.FilterTypeResource,
Value: string(model.ResourceTypeChart),
},
{
Type: model.FilterTypeName,
Value: "library/*",
},
{
Type: model.FilterTypeName,
Value: "library/harbor",
},
{
Type: model.FilterTypeVersion,
Value: "0.2.?",
},
}
res, err := filterResources(resources, filters)
require.Nil(t, err)
assert.Equal(t, 1, len(res))
assert.Equal(t, "library/harbor", res[0].Metadata.Name)
assert.Equal(t, 1, len(res[0].Metadata.Vtags))
assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0])
}
func TestAssembleDestinationNamespaces(t *testing.T) {
adapter := &fakedAdapter{}
resources := []*model.Resource{
{
Metadata: &model.ResourceMetadata{
Namespace: "library",
},
},
}
namespace := ""
ns, err := assembleDestinationNamespaces(adapter, resources, namespace)
require.Nil(t, err)
assert.Equal(t, 1, len(ns))
assert.Equal(t, "library", ns[0].Name)
assert.Equal(t, true, ns[0].Metadata["public"].(bool))
namespace = "test"
ns, err = assembleDestinationNamespaces(adapter, resources, namespace)
require.Nil(t, err)
assert.Equal(t, 1, len(ns))
assert.Equal(t, "test", ns[0].Name)
// TODO add test for merged metadata
// assert.Equal(t, true, ns[0].Metadata["public"].(bool))
}
func TestCreateNamespaces(t *testing.T) {
adapter := &fakedAdapter{}
namespaces := []*model.Namespace{
{
Name: "library",
},
}
err := createNamespaces(adapter, namespaces)
require.Nil(t, err)
}
func TestAssembleDestinationResources(t *testing.T) {
resources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}
registry := &model.Registry{}
namespace := "test"
override := true
res := assembleDestinationResources(resources, registry, namespace, override)
assert.Equal(t, 1, len(res))
assert.Equal(t, model.ResourceTypeChart, res[0].Type)
assert.Equal(t, "test/hello-world", res[0].Metadata.Name)
assert.Equal(t, namespace, res[0].Metadata.Namespace)
assert.Equal(t, 1, len(res[0].Metadata.Vtags))
assert.Equal(t, "latest", res[0].Metadata.Vtags[0])
}
func TestPreprocess(t *testing.T) {
scheduler := &fakedScheduler{}
srcResources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}
dstResources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "test/hello-world",
Namespace: "test",
Vtags: []string{"latest"},
},
Override: false,
},
}
items, err := preprocess(scheduler, srcResources, dstResources)
require.Nil(t, err)
assert.Equal(t, 1, len(items))
}
func TestCreateTasks(t *testing.T) {
mgr := &fakedExecutionManager{}
items := []*scheduler.ScheduleItem{
{
SrcResource: &model.Resource{},
DstResource: &model.Resource{},
},
}
err := createTasks(mgr, 1, items)
require.Nil(t, err)
assert.Equal(t, int64(1), items[0].TaskID)
}
func TestSchedule(t *testing.T) {
sched := &fakedScheduler{}
mgr := &fakedExecutionManager{}
items := []*scheduler.ScheduleItem{
{
SrcResource: &model.Resource{},
DstResource: &model.Resource{},
TaskID: 1,
},
}
err := schedule(sched, mgr, items)
require.Nil(t, err)
}

View File

@ -44,11 +44,13 @@ type Policy struct {
SrcRegistryID int64 `json:"src_registry_id"`
SrcNamespaces []string `json:"src_namespaces"`
// destination
// TODO rename to DstRegistryID
DestRegistryID int64 `json:"dest_registry_id"`
// Only support two dest namespace modes:
// Put all the src resources to the one single dest namespace
// or keep namespaces same with the source ones (under this case,
// the DestNamespace should be set to empty)
// TODO rename to DstNamespace
DestNamespace string `json:"dest_namespace"`
// Filters
Filters []*Filter `json:"filters"`

View File

@ -15,16 +15,22 @@
package operation
import (
"fmt"
"time"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
)
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
StartReplication(policy *model.Policy) (int64, error)
StartReplication(policy *model.Policy, resource *model.Resource) (int64, error)
StopReplication(int64) error
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
GetExecution(int64) (*models.Execution, error)
@ -35,23 +41,76 @@ type Controller interface {
}
// NewController returns a controller implementation
func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller {
func NewController(executionMgr execution.Manager, registrgMgr registry.Manager,
scheduler scheduler.Scheduler) Controller {
return &defaultController{
flowCtl: flowCtl,
executionMgr: executionMgr,
registryMgr: registrgMgr,
scheduler: scheduler,
flowCtl: flow.NewController(),
}
}
type defaultController struct {
flowCtl flow.Controller
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
}
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
return d.flowCtl.StartReplication(policy)
func (d *defaultController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
}
id, err := createExecution(d.executionMgr, policy.ID)
if err != nil {
return 0, err
}
flow := d.createFlow(id, policy, resource)
if err = d.flowCtl.Start(flow); err != nil {
// mark the execution as failure and log the error message
// no error will be returned as the execution is created successfully
markExecutionFailure(d.executionMgr, id, err.Error())
}
return id, nil
}
// create different replication flows according to the input parameters
func (d *defaultController) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
// replicate the deletion operation, so create a deletion flow
if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy, []*model.Resource{resource})
}
// copy only one resource, add extra filters to the policy to make sure
// only the resource will be filtered out
if resource != nil {
filters := []*model.Filter{
{
Type: model.FilterTypeResource,
Value: resource.Type,
},
{
Type: model.FilterTypeName,
Value: resource.Metadata.Name,
},
{
Type: model.FilterTypeVersion,
// only support replicate one tag
Value: resource.Metadata.Vtags[0],
},
}
filters = append(filters, policy.Filters...)
}
return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy)
}
func (d *defaultController) StopReplication(executionID int64) error {
return d.flowCtl.StopReplication(executionID)
// TODO implement the function
return nil
}
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return d.executionMgr.List(query...)
@ -71,3 +130,33 @@ func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCond
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID)
}
// create the execution record in database
func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
id, err := mgr.Create(&models.Execution{
PolicyID: policyID,
Status: models.ExecutionStatusInProgress,
StartTime: time.Now(),
})
if err != nil {
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policyID, err)
}
log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
return id, nil
}
// mark the execution as failure in database
func markExecutionFailure(mgr execution.Manager, id int64, message string) {
err := mgr.Update(
&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: message,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", id, err)
return
}
log.Debugf("the execution %d is marked as failure: %s", id, message)
}

View File

@ -17,21 +17,14 @@ package operation
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedFlowController struct{}
func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) {
return 1, nil
}
func (f *fakedFlowController) StopReplication(int64) error {
return nil
}
type fakedExecutionManager struct{}
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
@ -69,7 +62,9 @@ func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*model
}, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
return nil, nil
return &models.Task{
ID: 1,
}, nil
}
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
return nil
@ -87,10 +82,95 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return []byte("message"), nil
}
var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{})
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
type fakedScheduler struct{}
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
items := []*scheduler.ScheduleItem{}
for i, res := range src {
items = append(items, &scheduler.ScheduleItem{
SrcResource: res,
DstResource: dst[i],
})
}
return items, nil
}
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
results := []*scheduler.ScheduleResult{}
for _, item := range items {
results = append(results, &scheduler.ScheduleResult{
TaskID: item.TaskID,
Error: nil,
})
}
return results, nil
}
func (f *fakedScheduler) Stop(id string) error {
return nil
}
var ctl = NewController(&fakedExecutionManager{}, &fakedRegistryManager{}, &fakedScheduler{})
func TestStartReplication(t *testing.T) {
id, err := ctl.StartReplication(nil)
config.Config = &config.Configuration{}
// the resource contains Vtags whose length isn't 1
policy := &model.Policy{}
resource := &model.Resource{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Vtags: []string{"1.0", "2.0"},
},
}
_, err := ctl.StartReplication(policy, resource)
require.NotNil(t, err)
// replicate resource deletion
resource.Metadata.Vtags = []string{"1.0"}
resource.Deleted = true
id, err := ctl.StartReplication(policy, resource)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// replicate resource copy
resource.Deleted = false
id, err = ctl.StartReplication(policy, resource)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
// nil resource
id, err = ctl.StartReplication(policy, nil)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
}
@ -120,6 +200,17 @@ func TestListTasks(t *testing.T) {
assert.Equal(t, int64(1), tasks[0].ID)
}
func TestGetTask(t *testing.T) {
task, err := ctl.GetTask(1)
require.Nil(t, err)
assert.Equal(t, int64(1), task.ID)
}
func TestUpdateTaskStatus(t *testing.T) {
err := ctl.UpdateTaskStatus(1, "running")
require.Nil(t, err)
}
func TestGetTaskLog(t *testing.T) {
log, err := ctl.GetTaskLog(1)
require.Nil(t, err)

View File

@ -22,6 +22,7 @@ import (
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
// TODO use the replication config rather than the core
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/dao"
"github.com/goharbor/harbor/src/replication/ng/dao/models"

View File

@ -17,11 +17,10 @@
package ng
import (
"fmt"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/common/utils/log"
cfg "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/registry"
@ -39,21 +38,32 @@ var (
OperationCtl operation.Controller
)
// Init the global variables
// Init the global variables and configurations
func Init() error {
// init config
registryURL, err := cfg.RegistryURL()
if err != nil {
return err
}
secretKey, err := cfg.SecretKey()
if err != nil {
return err
}
config.Config = &config.Configuration{
CoreURL: cfg.InternalCoreURL(),
RegistryURL: registryURL,
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
JobserviceURL: cfg.InternalJobServiceURL(),
SecretKey: secretKey,
Secret: cfg.CoreSecret(),
}
// Init registry manager
RegistryMgr = registry.NewDefaultManager()
// init policy manager
PolicyMgr = policy.NewDefaultManager()
// init ExecutionMgr
executionMgr := execution.NewDefaultManager()
// init scheduler
scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret())
flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler)
if err != nil {
return fmt.Errorf("failed to create the flow controller: %v", err)
}
OperationCtl = operation.NewController(flowCtl, executionMgr)
// init operatoin controller
OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr,
scheduler.NewScheduler(config.Config.JobserviceURL, config.Config.Secret))
log.Debug("the replication initialization completed")
return nil
}

View File

@ -17,15 +17,28 @@
package ng
import (
"io/ioutil"
"os"
"path"
"testing"
// "github.com/stretchr/testify/assert"
// "github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/core/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestInit(t *testing.T) {
// TODO add testing code
// err := Init()
// require.Nil(t, err)
// assert.NotNil(t, OperationCtl)
// TODO add check for RegistryMgr and ExecutionMgr
key := path.Join(os.TempDir(), "key")
err := ioutil.WriteFile(key, []byte{'k'}, os.ModePerm)
require.Nil(t, err)
defer os.Remove(key)
err = os.Setenv("KEY_PATH", key)
require.Nil(t, err)
config.InitWithSettings(nil)
err = Init()
require.Nil(t, err)
assert.NotNil(t, PolicyMgr)
assert.NotNil(t, RegistryMgr)
assert.NotNil(t, OperationCtl)
}

View File

@ -22,8 +22,8 @@ import (
"github.com/goharbor/harbor/src/common/job"
common_job "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/model"
)
@ -101,7 +101,7 @@ func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, e
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID),
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.Config.CoreURL, item.TaskID),
}
job.Name = common_job.Replication

View File

@ -0,0 +1,24 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"path/filepath"
)
// Match returns whether the str matches the pattern
func Match(pattern, str string) (bool, error) {
return filepath.Match(pattern, str)
}

View File

@ -0,0 +1,77 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMatch(t *testing.T) {
cases := []struct {
pattern string
str string
match bool
}{
{
pattern: "",
str: "",
match: true,
},
{
pattern: "1.*",
str: "1.0",
match: true,
},
{
pattern: "1.*",
str: "1.01",
match: true,
},
{
pattern: "1.?",
str: "1.0",
match: true,
},
{
pattern: "1.?",
str: "1.01",
match: false,
},
{
pattern: "library/*",
str: "library/hello-world",
match: true,
},
{
pattern: "lib*",
str: "library/hello-world",
match: false,
},
{
pattern: "lib*/*",
str: "library/hello-world",
match: true,
},
}
for _, c := range cases {
match, err := Match(c.pattern, c.str)
require.Nil(t, err)
assert.Equal(t, c.match, match)
}
}