mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-03 15:43:39 +01:00
Merge pull request #7210 from ywk253100/190321_delete
Add support for replicating the delation of resource
This commit is contained in:
commit
791aecddfa
@ -113,7 +113,7 @@ func (r *ReplicationOperationAPI) CreateExecution() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
executionID, err := ng.OperationCtl.StartReplication(policy)
|
executionID, err := ng.OperationCtl.StartReplication(policy, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
|
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
|
||||||
return
|
return
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
|
|
||||||
type fakedOperationController struct{}
|
type fakedOperationController struct{}
|
||||||
|
|
||||||
func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64, error) {
|
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
|
||||||
return 1, nil
|
return 1, nil
|
||||||
}
|
}
|
||||||
func (f *fakedOperationController) StopReplication(int64) error {
|
func (f *fakedOperationController) StopReplication(int64) error {
|
||||||
|
30
src/replication/ng/config/config.go
Normal file
30
src/replication/ng/config/config.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package config
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Config is the configuration
|
||||||
|
Config *Configuration
|
||||||
|
)
|
||||||
|
|
||||||
|
// Configuration holds the configuration information for replication
|
||||||
|
type Configuration struct {
|
||||||
|
CoreURL string
|
||||||
|
RegistryURL string
|
||||||
|
TokenServiceURL string
|
||||||
|
JobserviceURL string
|
||||||
|
SecretKey string
|
||||||
|
Secret string
|
||||||
|
}
|
@ -106,6 +106,7 @@ type TaskFieldsName struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Task represent the tasks in one execution.
|
// Task represent the tasks in one execution.
|
||||||
|
// TODO add operation property
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
||||||
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
|
ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"`
|
||||||
|
@ -14,105 +14,23 @@
|
|||||||
|
|
||||||
package flow
|
package flow
|
||||||
|
|
||||||
import (
|
// Flow defines replication flow
|
||||||
"fmt"
|
type Flow interface {
|
||||||
|
Run(interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
// Controller is the controller that controls the replication flows
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Controller controls the replication flow
|
|
||||||
type Controller interface {
|
type Controller interface {
|
||||||
// Start a replication according to the policy and returns the
|
Start(Flow) error
|
||||||
// execution ID and error
|
|
||||||
StartReplication(policy *model.Policy) (int64, error)
|
|
||||||
// Stop the replication specified by the execution ID
|
|
||||||
StopReplication(int64) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController returns an instance of a Controller
|
// NewController returns an instance of the default flow controller
|
||||||
func NewController(registryMgr registry.Manager,
|
func NewController() Controller {
|
||||||
executionMgr execution.Manager, scheduler scheduler.Scheduler) (Controller, error) {
|
return &controller{}
|
||||||
if registryMgr == nil || executionMgr == nil || scheduler == nil {
|
|
||||||
// TODO(ChenDe): Uncomment it when execution manager is ready
|
|
||||||
// return nil, errors.New("invalid params")
|
|
||||||
}
|
|
||||||
return &defaultController{
|
|
||||||
registryMgr: registryMgr,
|
|
||||||
executionMgr: executionMgr,
|
|
||||||
scheduler: scheduler,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaultController is the default implement for the Controller
|
type controller struct{}
|
||||||
type defaultController struct {
|
|
||||||
registryMgr registry.Manager
|
func (c *controller) Start(flow Flow) error {
|
||||||
executionMgr execution.Manager
|
return flow.Run(nil)
|
||||||
scheduler scheduler.Scheduler
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start a replication according to the policy
|
|
||||||
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
|
|
||||||
log.Infof("starting the replication based on the policy %d ...", policy.ID)
|
|
||||||
|
|
||||||
flow, err := newFlow(policy, d.registryMgr, d.executionMgr, d.scheduler)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to create the flow object based on policy %d: %v", policy.ID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the execution record
|
|
||||||
id, err := flow.createExecution()
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policy.ID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch resources from the source registry
|
|
||||||
if err := flow.fetchResources(); err != nil {
|
|
||||||
// just log the error message and return the execution ID
|
|
||||||
log.Errorf("failed to fetch resources for the execution %d: %v", id, err)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// if no resources need to be replicated, mark the execution success and return
|
|
||||||
if len(flow.srcResources) == 0 {
|
|
||||||
flow.markExecutionSuccess("no resources need to be replicated")
|
|
||||||
log.Infof("no resources need to be replicated for the execution %d, skip", id)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the namespace on the destination registry
|
|
||||||
if err = flow.createNamespace(); err != nil {
|
|
||||||
log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// preprocess the resources
|
|
||||||
if err = flow.preprocess(); err != nil {
|
|
||||||
log.Errorf("failed to preprocess the resources for the execution %d: %v", id, err)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// create task records in database
|
|
||||||
if err = flow.createTasks(); err != nil {
|
|
||||||
log.Errorf("failed to create task records for the execution %d: %v", id, err)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// schedule the tasks
|
|
||||||
if err = flow.schedule(); err != nil {
|
|
||||||
log.Errorf("failed to schedule the execution %d: %v", id, err)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("the execution %d scheduled", id)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *defaultController) StopReplication(id int64) error {
|
|
||||||
// TODO
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -15,248 +15,20 @@
|
|||||||
package flow
|
package flow
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakedPolicyManager struct{}
|
type fakedFlow struct{}
|
||||||
|
|
||||||
func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) {
|
func (f *fakedFlow) Run(interface{}) error {
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
|
|
||||||
return 0, nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedPolicyManager) Get(int64) (*model.Policy, error) {
|
|
||||||
return &model.Policy{
|
|
||||||
ID: 1,
|
|
||||||
SrcRegistryID: 1,
|
|
||||||
SrcNamespaces: []string{"library"},
|
|
||||||
DestRegistryID: 2,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
func (f *fakedPolicyManager) Update(*model.Policy) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedPolicyManager) Remove(int64) error {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakedRegistryManager struct{}
|
func TestStart(t *testing.T) {
|
||||||
|
flow := &fakedFlow{}
|
||||||
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
|
controller := NewController()
|
||||||
return 0, nil
|
err := controller.Start(flow)
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
|
|
||||||
return 0, nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
|
|
||||||
if id == 1 {
|
|
||||||
return &model.Registry{
|
|
||||||
Type: "faked_registry",
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
if id == 2 {
|
|
||||||
return &model.Registry{
|
|
||||||
Type: "faked_registry",
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) Remove(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedRegistryManager) HealthCheck() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakedExecutionManager struct {
|
|
||||||
taskID int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
|
||||||
return 0, nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) Remove(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) RemoveAll(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
|
|
||||||
f.taskID++
|
|
||||||
id := f.taskID
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
|
|
||||||
return 0, nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakedScheduler struct{}
|
|
||||||
|
|
||||||
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
|
|
||||||
items := []*scheduler.ScheduleItem{}
|
|
||||||
for i, res := range src {
|
|
||||||
items = append(items, &scheduler.ScheduleItem{
|
|
||||||
SrcResource: res,
|
|
||||||
DstResource: dst[i],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return items, nil
|
|
||||||
}
|
|
||||||
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
|
|
||||||
results := []*scheduler.ScheduleResult{}
|
|
||||||
for _, item := range items {
|
|
||||||
results = append(results, &scheduler.ScheduleResult{
|
|
||||||
TaskID: item.TaskID,
|
|
||||||
Error: nil,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return results, nil
|
|
||||||
}
|
|
||||||
func (f *fakedScheduler) Stop(id string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
|
|
||||||
return &fakedAdapter{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakedAdapter struct{}
|
|
||||||
|
|
||||||
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) {
|
|
||||||
return &model.Namespace{}, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
|
|
||||||
return []*model.Resource{
|
|
||||||
{
|
|
||||||
Type: model.ResourceTypeRepository,
|
|
||||||
Metadata: &model.ResourceMetadata{
|
|
||||||
Name: "library/hello-world",
|
|
||||||
Namespace: "library",
|
|
||||||
Vtags: []string{"latest"},
|
|
||||||
},
|
|
||||||
Override: false,
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
|
|
||||||
return false, "", nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
|
|
||||||
return nil, "", nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
|
|
||||||
return 0, nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
|
|
||||||
return []*model.Resource{
|
|
||||||
{
|
|
||||||
Type: model.ResourceTypeChart,
|
|
||||||
Metadata: &model.ResourceMetadata{
|
|
||||||
Name: "library/harbor",
|
|
||||||
Namespace: "library",
|
|
||||||
Vtags: []string{"0.2.0"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakedAdapter) DeleteChart(name, version string) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStartReplication(t *testing.T) {
|
|
||||||
config.InitWithSettings(nil)
|
|
||||||
err := adapter.RegisterFactory(
|
|
||||||
&adapter.Info{
|
|
||||||
Type: "faked_registry",
|
|
||||||
SupportedResourceTypes: []model.ResourceType{
|
|
||||||
model.ResourceTypeRepository,
|
|
||||||
model.ResourceTypeChart,
|
|
||||||
},
|
|
||||||
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
|
|
||||||
}, fakedAdapterFactory)
|
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
controller, _ := NewController(
|
|
||||||
&fakedRegistryManager{},
|
|
||||||
&fakedExecutionManager{},
|
|
||||||
&fakedScheduler{})
|
|
||||||
|
|
||||||
policy := &model.Policy{
|
|
||||||
ID: 1,
|
|
||||||
SrcRegistryID: 1,
|
|
||||||
DestRegistryID: 2,
|
|
||||||
DestNamespace: "library",
|
|
||||||
}
|
|
||||||
id, err := controller.StartReplication(policy)
|
|
||||||
require.Nil(t, err)
|
|
||||||
assert.Equal(t, id, int64(1))
|
|
||||||
}
|
}
|
||||||
|
95
src/replication/ng/flow/copy.go
Normal file
95
src/replication/ng/flow/copy.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
|
)
|
||||||
|
|
||||||
|
type copyFlow struct {
|
||||||
|
executionID int64
|
||||||
|
policy *model.Policy
|
||||||
|
executionMgr execution.Manager
|
||||||
|
registryMgr registry.Manager
|
||||||
|
scheduler scheduler.Scheduler
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
|
||||||
|
// the source registry to the destination registry
|
||||||
|
func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager,
|
||||||
|
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy) Flow {
|
||||||
|
return ©Flow{
|
||||||
|
executionMgr: executionMgr,
|
||||||
|
registryMgr: registryMgr,
|
||||||
|
scheduler: scheduler,
|
||||||
|
executionID: executionID,
|
||||||
|
policy: policy,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *copyFlow) Run(interface{}) error {
|
||||||
|
srcRegistry, dstRegistry, srcAdapter, dstAdapter, err := initialize(c.registryMgr, c.policy)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// TODO after refactoring the adapter register, the "srcRegistry.Type" is not needed
|
||||||
|
srcResources, err := fetchResources(srcAdapter, srcRegistry.Type, c.policy)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(srcResources) == 0 {
|
||||||
|
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
|
||||||
|
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
dstNamespaces, err := assembleDestinationNamespaces(srcAdapter, srcResources, c.policy.DestNamespace)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = createNamespaces(dstAdapter, dstNamespaces); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
dstResources := assembleDestinationResources(srcResources, dstRegistry, c.policy.DestNamespace, c.policy.Override)
|
||||||
|
items, err := preprocess(c.scheduler, srcResources, dstResources)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = createTasks(c.executionMgr, c.executionID, items); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return schedule(c.scheduler, c.executionMgr, items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark the execution as success in database
|
||||||
|
func markExecutionSuccess(mgr execution.Manager, id int64, message string) {
|
||||||
|
err := mgr.Update(
|
||||||
|
&models.Execution{
|
||||||
|
ID: id,
|
||||||
|
Status: models.ExecutionStatusSucceed,
|
||||||
|
StatusText: message,
|
||||||
|
EndTime: time.Now(),
|
||||||
|
}, "Status", "StatusText", "EndTime")
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to update the execution %d: %v", id, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
28
src/replication/ng/flow/copy_test.go
Normal file
28
src/replication/ng/flow/copy_test.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRunOfCopyFlow(t *testing.T) {
|
||||||
|
scheduler := &fakedScheduler{}
|
||||||
|
executionMgr := &fakedExecutionManager{}
|
||||||
|
registryMgr := &fakedRegistryManager{}
|
||||||
|
policy := &model.Policy{}
|
||||||
|
flow := NewCopyFlow(executionMgr, registryMgr, scheduler, 1, policy)
|
||||||
|
err := flow.Run(nil)
|
||||||
|
require.Nil(t, err)
|
||||||
|
}
|
76
src/replication/ng/flow/deletion.go
Normal file
76
src/replication/ng/flow/deletion.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
|
)
|
||||||
|
|
||||||
|
type deletionFlow struct {
|
||||||
|
executionID int64
|
||||||
|
policy *model.Policy
|
||||||
|
executionMgr execution.Manager
|
||||||
|
registryMgr registry.Manager
|
||||||
|
scheduler scheduler.Scheduler
|
||||||
|
resources []*model.Resource
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
|
||||||
|
// on the destination registry
|
||||||
|
func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manager,
|
||||||
|
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy,
|
||||||
|
resources []*model.Resource) Flow {
|
||||||
|
return &deletionFlow{
|
||||||
|
executionMgr: executionMgr,
|
||||||
|
registryMgr: registryMgr,
|
||||||
|
scheduler: scheduler,
|
||||||
|
executionID: executionID,
|
||||||
|
policy: policy,
|
||||||
|
resources: resources,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *deletionFlow) Run(interface{}) error {
|
||||||
|
srcRegistry, dstRegistry, _, _, err := initialize(d.registryMgr, d.policy)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// filling the registry information
|
||||||
|
for _, resource := range d.resources {
|
||||||
|
resource.Registry = srcRegistry
|
||||||
|
}
|
||||||
|
srcResources, err := filterResources(d.resources, d.policy.Filters)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(srcResources) == 0 {
|
||||||
|
markExecutionSuccess(d.executionMgr, d.executionID, "no resources need to be replicated")
|
||||||
|
log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
dstResources := assembleDestinationResources(srcResources, dstRegistry, d.policy.DestNamespace, d.policy.Override)
|
||||||
|
items, err := preprocess(d.scheduler, srcResources, dstResources)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = createTasks(d.executionMgr, d.executionID, items); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return schedule(d.scheduler, d.executionMgr, items)
|
||||||
|
}
|
33
src/replication/ng/flow/deletion_test.go
Normal file
33
src/replication/ng/flow/deletion_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRunOfDeletionFlow(t *testing.T) {
|
||||||
|
scheduler := &fakedScheduler{}
|
||||||
|
executionMgr := &fakedExecutionManager{}
|
||||||
|
registryMgr := &fakedRegistryManager{}
|
||||||
|
policy := &model.Policy{}
|
||||||
|
resources := []*model.Resource{}
|
||||||
|
flow := NewDeletionFlow(executionMgr, registryMgr, scheduler, 1, policy, resources)
|
||||||
|
err := flow.Run(nil)
|
||||||
|
require.Nil(t, err)
|
||||||
|
}
|
@ -1,401 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package flow
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
type flow struct {
|
|
||||||
policy *model.Policy
|
|
||||||
srcRegistry *model.Registry
|
|
||||||
dstRegistry *model.Registry
|
|
||||||
srcAdapter adapter.Adapter
|
|
||||||
dstAdapter adapter.Adapter
|
|
||||||
executionID int64
|
|
||||||
srcResources []*model.Resource
|
|
||||||
dstResources []*model.Resource
|
|
||||||
executionMgr execution.Manager
|
|
||||||
scheduler scheduler.Scheduler
|
|
||||||
scheduleItems []*scheduler.ScheduleItem
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFlow(policy *model.Policy, registryMgr registry.Manager,
|
|
||||||
executionMgr execution.Manager, scheduler scheduler.Scheduler) (*flow, error) {
|
|
||||||
|
|
||||||
f := &flow{
|
|
||||||
policy: policy,
|
|
||||||
executionMgr: executionMgr,
|
|
||||||
scheduler: scheduler,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO consider to put registry model in the policy directly rather than just the registry ID?
|
|
||||||
url, err := config.RegistryURL()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get the registry URL: %v", err)
|
|
||||||
}
|
|
||||||
registry := &model.Registry{
|
|
||||||
Type: model.RegistryTypeHarbor,
|
|
||||||
Name: "Local",
|
|
||||||
URL: url,
|
|
||||||
// TODO use the service account
|
|
||||||
Credential: &model.Credential{
|
|
||||||
Type: model.CredentialTypeBasic,
|
|
||||||
AccessKey: "admin",
|
|
||||||
AccessSecret: "Harbor12345",
|
|
||||||
},
|
|
||||||
Insecure: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
// get source registry
|
|
||||||
if policy.SrcRegistryID != 0 {
|
|
||||||
srcRegistry, err := registryMgr.Get(policy.SrcRegistryID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
|
|
||||||
}
|
|
||||||
if srcRegistry == nil {
|
|
||||||
return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
|
|
||||||
}
|
|
||||||
f.srcRegistry = srcRegistry
|
|
||||||
} else {
|
|
||||||
f.srcRegistry = registry
|
|
||||||
}
|
|
||||||
|
|
||||||
// get destination registry
|
|
||||||
if policy.DestRegistryID != 0 {
|
|
||||||
dstRegistry, err := registryMgr.Get(policy.DestRegistryID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
|
|
||||||
}
|
|
||||||
if dstRegistry == nil {
|
|
||||||
return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
|
|
||||||
}
|
|
||||||
f.dstRegistry = dstRegistry
|
|
||||||
} else {
|
|
||||||
f.dstRegistry = registry
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the source registry adapter
|
|
||||||
srcFactory, err := adapter.GetFactory(f.srcRegistry.Type)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.srcRegistry.Type, err)
|
|
||||||
}
|
|
||||||
srcAdapter, err := srcFactory(f.srcRegistry)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", f.srcRegistry.URL, err)
|
|
||||||
}
|
|
||||||
f.srcAdapter = srcAdapter
|
|
||||||
|
|
||||||
// create the destination registry adapter
|
|
||||||
dstFactory, err := adapter.GetFactory(f.dstRegistry.Type)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.dstRegistry.Type, err)
|
|
||||||
}
|
|
||||||
dstAdapter, err := dstFactory(f.dstRegistry)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", f.dstRegistry.URL, err)
|
|
||||||
}
|
|
||||||
f.dstAdapter = dstAdapter
|
|
||||||
|
|
||||||
return f, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) createExecution() (int64, error) {
|
|
||||||
id, err := f.executionMgr.Create(&models.Execution{
|
|
||||||
PolicyID: f.policy.ID,
|
|
||||||
Status: models.ExecutionStatusInProgress,
|
|
||||||
StartTime: time.Now(),
|
|
||||||
})
|
|
||||||
f.executionID = id
|
|
||||||
log.Debugf("an execution record for replication based on the policy %d created: %d", f.policy.ID, id)
|
|
||||||
return id, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) fetchResources() error {
|
|
||||||
resTypes := []model.ResourceType{}
|
|
||||||
filters := []*model.Filter{}
|
|
||||||
for _, filter := range f.policy.Filters {
|
|
||||||
if filter.Type != model.FilterTypeResource {
|
|
||||||
filters = append(filters, filter)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
resTypes = append(resTypes, filter.Value.(model.ResourceType))
|
|
||||||
}
|
|
||||||
if len(resTypes) == 0 {
|
|
||||||
resTypes = append(resTypes, adapter.GetAdapterInfo(f.srcRegistry.Type).SupportedResourceTypes...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO consider whether the logic can be refactored by using reflect
|
|
||||||
srcResources := []*model.Resource{}
|
|
||||||
for _, typ := range resTypes {
|
|
||||||
log.Debugf("fetching %s...", typ)
|
|
||||||
// images
|
|
||||||
if typ == model.ResourceTypeRepository {
|
|
||||||
reg, ok := f.srcAdapter.(adapter.ImageRegistry)
|
|
||||||
if !ok {
|
|
||||||
err := fmt.Errorf("the adapter doesn't implement the ImageRegistry interface")
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
res, err := reg.FetchImages(f.policy.SrcNamespaces, filters)
|
|
||||||
if err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
srcResources = append(srcResources, res...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// charts
|
|
||||||
if typ == model.ResourceTypeChart {
|
|
||||||
reg, ok := f.srcAdapter.(adapter.ChartRegistry)
|
|
||||||
if !ok {
|
|
||||||
err := fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
res, err := reg.FetchCharts(f.policy.SrcNamespaces, filters)
|
|
||||||
if err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
srcResources = append(srcResources, res...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
dstResources := []*model.Resource{}
|
|
||||||
for _, srcResource := range srcResources {
|
|
||||||
dstResource := &model.Resource{
|
|
||||||
Type: srcResource.Type,
|
|
||||||
Metadata: &model.ResourceMetadata{
|
|
||||||
Name: srcResource.Metadata.Name,
|
|
||||||
Namespace: srcResource.Metadata.Namespace,
|
|
||||||
Vtags: srcResource.Metadata.Vtags,
|
|
||||||
},
|
|
||||||
Registry: f.dstRegistry,
|
|
||||||
ExtendedInfo: srcResource.ExtendedInfo,
|
|
||||||
Deleted: srcResource.Deleted,
|
|
||||||
Override: f.policy.Override,
|
|
||||||
}
|
|
||||||
// TODO check whether the logic is applied to chart
|
|
||||||
// if the destination namespace is specified, use the specified one
|
|
||||||
if len(f.policy.DestNamespace) > 0 {
|
|
||||||
dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name,
|
|
||||||
srcResource.Metadata.Namespace, f.policy.DestNamespace, 1)
|
|
||||||
dstResource.Metadata.Namespace = f.policy.DestNamespace
|
|
||||||
}
|
|
||||||
dstResources = append(dstResources, dstResource)
|
|
||||||
}
|
|
||||||
|
|
||||||
f.srcResources = srcResources
|
|
||||||
f.dstResources = dstResources
|
|
||||||
|
|
||||||
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) createNamespace() error {
|
|
||||||
// Merge the metadata of all source namespaces
|
|
||||||
// eg:
|
|
||||||
// We have two source namespaces:
|
|
||||||
// {
|
|
||||||
// Name: "source01",
|
|
||||||
// Metadata: {"public": true}
|
|
||||||
// }
|
|
||||||
// and
|
|
||||||
// {
|
|
||||||
// Name: "source02",
|
|
||||||
// Metadata: {"public": false}
|
|
||||||
// }
|
|
||||||
// The name of the destination namespace is "destination",
|
|
||||||
// after merging the metadata, the destination namespace
|
|
||||||
// looks like this:
|
|
||||||
// {
|
|
||||||
// Name: "destination",
|
|
||||||
// Metadata: {
|
|
||||||
// "public": {
|
|
||||||
// "source01": true,
|
|
||||||
// "source02": false,
|
|
||||||
// },
|
|
||||||
// },
|
|
||||||
// }
|
|
||||||
// TODO merge the metadata of different namespaces
|
|
||||||
namespaces := []*model.Namespace{}
|
|
||||||
for i, resource := range f.dstResources {
|
|
||||||
namespace := &model.Namespace{
|
|
||||||
Name: resource.Metadata.Namespace,
|
|
||||||
}
|
|
||||||
// get the metadata of the namespace from the source registry
|
|
||||||
ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace)
|
|
||||||
if err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
namespace.Metadata = ns.Metadata
|
|
||||||
namespaces = append(namespaces, namespace)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, namespace := range namespaces {
|
|
||||||
if err := f.dstAdapter.CreateNamespace(namespace); err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) preprocess() error {
|
|
||||||
items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources)
|
|
||||||
if err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
f.scheduleItems = items
|
|
||||||
log.Debugf("the preprocess for resources of the execution %d completed",
|
|
||||||
f.executionID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) createTasks() error {
|
|
||||||
for _, item := range f.scheduleItems {
|
|
||||||
task := &models.Task{
|
|
||||||
ExecutionID: f.executionID,
|
|
||||||
Status: models.TaskStatusInitialized,
|
|
||||||
ResourceType: string(item.SrcResource.Type),
|
|
||||||
SrcResource: getResourceName(item.SrcResource),
|
|
||||||
DstResource: getResourceName(item.DstResource),
|
|
||||||
}
|
|
||||||
id, err := f.executionMgr.CreateTask(task)
|
|
||||||
if err != nil {
|
|
||||||
// if failed to create the task for one of the items,
|
|
||||||
// the whole execution is marked as failure and all
|
|
||||||
// the items will not be submitted
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
item.TaskID = id
|
|
||||||
log.Debugf("task record %d for the execution %d created",
|
|
||||||
id, f.executionID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) schedule() error {
|
|
||||||
results, err := f.scheduler.Schedule(f.scheduleItems)
|
|
||||||
if err != nil {
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
allFailed := true
|
|
||||||
for _, result := range results {
|
|
||||||
// if the task is failed to be submitted, update the status of the
|
|
||||||
// task as failure
|
|
||||||
if result.Error != nil {
|
|
||||||
log.Errorf("failed to schedule task %d: %v", result.TaskID, result.Error)
|
|
||||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
|
|
||||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
allFailed = false
|
|
||||||
// if the task is submitted successfully, update the status, job ID and start time
|
|
||||||
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil {
|
|
||||||
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
|
|
||||||
}
|
|
||||||
if err = f.executionMgr.UpdateTask(&models.Task{
|
|
||||||
ID: result.TaskID,
|
|
||||||
JobID: result.JobID,
|
|
||||||
StartTime: time.Now(),
|
|
||||||
}, "JobID", "StartTime"); err != nil {
|
|
||||||
log.Errorf("failed to update task %d: %v", result.TaskID, err)
|
|
||||||
}
|
|
||||||
log.Debugf("the task %d scheduled", result.TaskID)
|
|
||||||
}
|
|
||||||
// if all the tasks are failed, mark the execution failed
|
|
||||||
if allFailed {
|
|
||||||
err = errors.New("all tasks are failed")
|
|
||||||
f.markExecutionFailure(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) markExecutionFailure(err error) {
|
|
||||||
statusText := ""
|
|
||||||
if err != nil {
|
|
||||||
statusText = err.Error()
|
|
||||||
}
|
|
||||||
log.Errorf("the execution %d is marked as failure because of the error: %s",
|
|
||||||
f.executionID, statusText)
|
|
||||||
err = f.executionMgr.Update(
|
|
||||||
&models.Execution{
|
|
||||||
ID: f.executionID,
|
|
||||||
Status: models.ExecutionStatusFailed,
|
|
||||||
StatusText: statusText,
|
|
||||||
EndTime: time.Now(),
|
|
||||||
}, "Status", "StatusText", "EndTime")
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *flow) markExecutionSuccess(msg string) {
|
|
||||||
log.Debugf("the execution %d is marked as success", f.executionID)
|
|
||||||
err := f.executionMgr.Update(
|
|
||||||
&models.Execution{
|
|
||||||
ID: f.executionID,
|
|
||||||
Status: models.ExecutionStatusSucceed,
|
|
||||||
StatusText: msg,
|
|
||||||
EndTime: time.Now(),
|
|
||||||
}, "Status", "StatusText", "EndTime")
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
|
|
||||||
// if the resource has vtags
|
|
||||||
func getResourceName(res *model.Resource) string {
|
|
||||||
if res == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
meta := res.Metadata
|
|
||||||
if meta == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
if len(meta.Vtags) == 0 {
|
|
||||||
return meta.Name
|
|
||||||
}
|
|
||||||
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
|
|
||||||
}
|
|
380
src/replication/ng/flow/stage.go
Normal file
380
src/replication/ng/flow/stage.go
Normal file
@ -0,0 +1,380 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/config"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// get/create the source registry, destination registry, source adapter and destination adapter
|
||||||
|
func initialize(mgr registry.Manager, policy *model.Policy) (*model.Registry, *model.Registry, adp.Adapter, adp.Adapter, error) {
|
||||||
|
var srcRegistry, dstRegistry *model.Registry
|
||||||
|
var srcAdapter, dstAdapter adp.Adapter
|
||||||
|
var err error
|
||||||
|
registry := getLocalRegistry()
|
||||||
|
// get source registry
|
||||||
|
if policy.SrcRegistryID != 0 {
|
||||||
|
srcRegistry, err = mgr.Get(policy.SrcRegistryID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
|
||||||
|
}
|
||||||
|
if srcRegistry == nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
srcRegistry = registry
|
||||||
|
}
|
||||||
|
|
||||||
|
// get destination registry
|
||||||
|
if policy.DestRegistryID != 0 {
|
||||||
|
dstRegistry, err = mgr.Get(policy.DestRegistryID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
|
||||||
|
}
|
||||||
|
if dstRegistry == nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dstRegistry = registry
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the source registry adapter
|
||||||
|
srcFactory, err := adp.GetFactory(srcRegistry.Type)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err)
|
||||||
|
}
|
||||||
|
srcAdapter, err = srcFactory(srcRegistry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the destination registry adapter
|
||||||
|
dstFactory, err := adp.GetFactory(dstRegistry.Type)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err)
|
||||||
|
}
|
||||||
|
dstAdapter, err = dstFactory(dstRegistry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err)
|
||||||
|
}
|
||||||
|
log.Debug("replication flow initialization completed")
|
||||||
|
return srcRegistry, dstRegistry, srcAdapter, dstAdapter, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch resources from the source registry
|
||||||
|
func fetchResources(adapter adp.Adapter, adapterType model.RegistryType, policy *model.Policy) ([]*model.Resource, error) {
|
||||||
|
resTypes := []model.ResourceType{}
|
||||||
|
filters := []*model.Filter{}
|
||||||
|
for _, filter := range policy.Filters {
|
||||||
|
if filter.Type != model.FilterTypeResource {
|
||||||
|
filters = append(filters, filter)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resTypes = append(resTypes, filter.Value.(model.ResourceType))
|
||||||
|
}
|
||||||
|
if len(resTypes) == 0 {
|
||||||
|
resTypes = append(resTypes, adp.GetAdapterInfo(adapterType).SupportedResourceTypes...)
|
||||||
|
}
|
||||||
|
|
||||||
|
resources := []*model.Resource{}
|
||||||
|
// convert the adapter to different interfaces according to its required resource types
|
||||||
|
for _, typ := range resTypes {
|
||||||
|
var res []*model.Resource
|
||||||
|
var err error
|
||||||
|
if typ == model.ResourceTypeRepository {
|
||||||
|
// images
|
||||||
|
reg, ok := adapter.(adp.ImageRegistry)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("the adapter doesn't implement the ImageRegistry interface")
|
||||||
|
}
|
||||||
|
res, err = reg.FetchImages(policy.SrcNamespaces, filters)
|
||||||
|
} else if typ == model.ResourceTypeChart {
|
||||||
|
// charts
|
||||||
|
reg, ok := adapter.(adp.ChartRegistry)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
|
||||||
|
}
|
||||||
|
res, err = reg.FetchCharts(policy.SrcNamespaces, filters)
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("unsupported resource type %s", typ)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to fetch %s: %v", typ, err)
|
||||||
|
}
|
||||||
|
resources = append(resources, res...)
|
||||||
|
log.Debugf("fetch %s completed", typ)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("fetch resources from the source registry completed")
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// apply the filters to the resources and returns the filtered resources
|
||||||
|
func filterResources(resources []*model.Resource, filters []*model.Filter) ([]*model.Resource, error) {
|
||||||
|
res := []*model.Resource{}
|
||||||
|
for _, resource := range resources {
|
||||||
|
match := true
|
||||||
|
for _, filter := range filters {
|
||||||
|
switch filter.Type {
|
||||||
|
case model.FilterTypeResource:
|
||||||
|
resourceType, ok := filter.Value.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
|
||||||
|
}
|
||||||
|
if model.ResourceType(resourceType) != resource.Type {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case model.FilterTypeName:
|
||||||
|
pattern, ok := filter.Value.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
|
||||||
|
}
|
||||||
|
if resource.Metadata == nil {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
m, err := util.Match(pattern, resource.Metadata.Name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !m {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case model.FilterTypeVersion:
|
||||||
|
pattern, ok := filter.Value.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("%v is not a valid string", filter.Value)
|
||||||
|
}
|
||||||
|
if resource.Metadata == nil {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
versions := []string{}
|
||||||
|
for _, version := range resource.Metadata.Vtags {
|
||||||
|
m, err := util.Match(pattern, version)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if m {
|
||||||
|
versions = append(versions, version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(versions) == 0 {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// NOTE: the property "Vtags" of the origin resource struct is overrided here
|
||||||
|
resource.Metadata.Vtags = versions
|
||||||
|
case model.FilterTypeLabel:
|
||||||
|
// TODO add support to label
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupportted filter type: %v", filter.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if match {
|
||||||
|
res = append(res, resource)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debug("filter resources completed")
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assemble the namespaces that need to be created on the destination registry:
|
||||||
|
// step 1: get the detail information for each of the source namespaces
|
||||||
|
// step 2: if the destination namespace isn't specified in the policy, then the
|
||||||
|
// same namespaces with the source will be returned. If it is specified, then
|
||||||
|
// returns the specified one with the merged metadatas of all source namespaces
|
||||||
|
func assembleDestinationNamespaces(srcAdapter adp.Adapter, srcResources []*model.Resource, dstNamespace string) ([]*model.Namespace, error) {
|
||||||
|
namespaces := []*model.Namespace{}
|
||||||
|
for _, srcResource := range srcResources {
|
||||||
|
namespace, err := srcAdapter.GetNamespace(srcResource.Metadata.Namespace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespaces = append(namespaces, namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(dstNamespace) != 0 {
|
||||||
|
namespaces = []*model.Namespace{
|
||||||
|
{
|
||||||
|
Name: dstNamespace,
|
||||||
|
// TODO merge the metadata
|
||||||
|
Metadata: map[string]interface{}{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("assemble the destination namespaces completed")
|
||||||
|
return namespaces, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the namespaces on the destination registry
|
||||||
|
func createNamespaces(adapter adp.Adapter, namespaces []*model.Namespace) error {
|
||||||
|
for _, namespace := range namespaces {
|
||||||
|
if err := adapter.CreateNamespace(namespace); err != nil {
|
||||||
|
return fmt.Errorf("failed to create the namespace %s on the destination registry: %v", namespace.Name, err)
|
||||||
|
}
|
||||||
|
log.Debugf("namespace %s created on the destination registry", namespace.Name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// assemble the destination resources by filling the registry, namespace and override properties
|
||||||
|
func assembleDestinationResources(resources []*model.Resource,
|
||||||
|
registry *model.Registry, namespace string, override bool) []*model.Resource {
|
||||||
|
result := []*model.Resource{}
|
||||||
|
for _, resource := range resources {
|
||||||
|
res := &model.Resource{
|
||||||
|
Type: resource.Type,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: resource.Metadata.Name,
|
||||||
|
Namespace: resource.Metadata.Namespace,
|
||||||
|
Vtags: resource.Metadata.Vtags,
|
||||||
|
},
|
||||||
|
Registry: registry,
|
||||||
|
ExtendedInfo: resource.ExtendedInfo,
|
||||||
|
Deleted: resource.Deleted,
|
||||||
|
Override: override,
|
||||||
|
}
|
||||||
|
// if the destination namespace is specified, use the specified one
|
||||||
|
if len(namespace) > 0 {
|
||||||
|
res.Metadata.Name = strings.Replace(resource.Metadata.Name,
|
||||||
|
resource.Metadata.Namespace, namespace, 1)
|
||||||
|
res.Metadata.Namespace = namespace
|
||||||
|
}
|
||||||
|
result = append(result, res)
|
||||||
|
}
|
||||||
|
log.Debug("assemble the destination resources completed")
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// preprocess
|
||||||
|
func preprocess(scheduler scheduler.Scheduler, srcResources, dstResources []*model.Resource) ([]*scheduler.ScheduleItem, error) {
|
||||||
|
items, err := scheduler.Preprocess(srcResources, dstResources)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to preprocess the resources: %v", err)
|
||||||
|
}
|
||||||
|
log.Debug("preprocess the resources completed")
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// create task records in database
|
||||||
|
func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.ScheduleItem) error {
|
||||||
|
for _, item := range items {
|
||||||
|
task := &models.Task{
|
||||||
|
ExecutionID: executionID,
|
||||||
|
Status: models.TaskStatusInitialized,
|
||||||
|
ResourceType: string(item.SrcResource.Type),
|
||||||
|
SrcResource: getResourceName(item.SrcResource),
|
||||||
|
DstResource: getResourceName(item.DstResource),
|
||||||
|
}
|
||||||
|
id, err := mgr.CreateTask(task)
|
||||||
|
if err != nil {
|
||||||
|
// if failed to create the task for one of the items,
|
||||||
|
// the whole execution is marked as failure and all
|
||||||
|
// the items will not be submitted
|
||||||
|
return fmt.Errorf("failed to create task records for the execution %d: %v", executionID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
item.TaskID = id
|
||||||
|
log.Debugf("task record %d for the execution %d created", id, executionID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// schedule the replication tasks and update the task's status
|
||||||
|
func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) error {
|
||||||
|
results, err := scheduler.Schedule(items)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to schedule the tasks: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
allFailed := true
|
||||||
|
for _, result := range results {
|
||||||
|
// if the task is failed to be submitted, update the status of the
|
||||||
|
// task as failure
|
||||||
|
if result.Error != nil {
|
||||||
|
log.Errorf("failed to schedule the task %d: %v", result.TaskID, result.Error)
|
||||||
|
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
|
||||||
|
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allFailed = false
|
||||||
|
// if the task is submitted successfully, update the status, job ID and start time
|
||||||
|
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
|
||||||
|
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
|
||||||
|
}
|
||||||
|
if err = executionMgr.UpdateTask(&models.Task{
|
||||||
|
ID: result.TaskID,
|
||||||
|
JobID: result.JobID,
|
||||||
|
StartTime: time.Now(),
|
||||||
|
}, "JobID", "StartTime"); err != nil {
|
||||||
|
log.Errorf("failed to update the task %d: %v", result.TaskID, err)
|
||||||
|
}
|
||||||
|
log.Debugf("the task %d scheduled", result.TaskID)
|
||||||
|
}
|
||||||
|
// if all the tasks are failed, return err
|
||||||
|
if allFailed {
|
||||||
|
return errors.New("all tasks are failed")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
|
||||||
|
// if the resource has vtags
|
||||||
|
func getResourceName(res *model.Resource) string {
|
||||||
|
if res == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
meta := res.Metadata
|
||||||
|
if meta == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if len(meta.Vtags) == 0 {
|
||||||
|
return meta.Name
|
||||||
|
}
|
||||||
|
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLocalRegistry() *model.Registry {
|
||||||
|
return &model.Registry{
|
||||||
|
Type: model.RegistryTypeHarbor,
|
||||||
|
Name: "Local",
|
||||||
|
URL: config.Config.RegistryURL,
|
||||||
|
// TODO use the service account
|
||||||
|
Credential: &model.Credential{
|
||||||
|
Type: model.CredentialTypeBasic,
|
||||||
|
AccessKey: "admin",
|
||||||
|
AccessSecret: "Harbor12345",
|
||||||
|
},
|
||||||
|
Insecure: true,
|
||||||
|
}
|
||||||
|
}
|
439
src/replication/ng/flow/stage_test.go
Normal file
439
src/replication/ng/flow/stage_test.go
Normal file
@ -0,0 +1,439 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package flow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/config"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakedRegistryManager struct{}
|
||||||
|
|
||||||
|
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
|
||||||
|
return 0, nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
|
||||||
|
var registry *model.Registry
|
||||||
|
switch id {
|
||||||
|
case 1:
|
||||||
|
registry = &model.Registry{
|
||||||
|
ID: 1,
|
||||||
|
Type: model.RegistryTypeHarbor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return registry, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Remove(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) HealthCheck() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
|
||||||
|
return &fakedAdapter{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakedAdapter struct{}
|
||||||
|
|
||||||
|
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) GetNamespace(ns string) (*model.Namespace, error) {
|
||||||
|
var namespace *model.Namespace
|
||||||
|
if ns == "library" {
|
||||||
|
namespace = &model.Namespace{
|
||||||
|
Name: "library",
|
||||||
|
Metadata: map[string]interface{}{
|
||||||
|
"public": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return namespace, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
|
||||||
|
return []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeRepository,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
},
|
||||||
|
Override: false,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
|
||||||
|
return false, "", nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
|
||||||
|
return nil, "", nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
|
||||||
|
return 0, nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
|
||||||
|
return []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/harbor",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"0.2.0"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedAdapter) DeleteChart(name, version string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakedScheduler struct{}
|
||||||
|
|
||||||
|
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
|
||||||
|
items := []*scheduler.ScheduleItem{}
|
||||||
|
for i, res := range src {
|
||||||
|
items = append(items, &scheduler.ScheduleItem{
|
||||||
|
SrcResource: res,
|
||||||
|
DstResource: dst[i],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
|
||||||
|
results := []*scheduler.ScheduleResult{}
|
||||||
|
for _, item := range items {
|
||||||
|
results = append(results, &scheduler.ScheduleResult{
|
||||||
|
TaskID: item.TaskID,
|
||||||
|
Error: nil,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
func (f *fakedScheduler) Stop(id string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakedExecutionManager struct {
|
||||||
|
taskID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||||
|
return 0, nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) Update(*models.Execution, ...string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) Remove(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) RemoveAll(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
|
||||||
|
f.taskID++
|
||||||
|
id := f.taskID
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
|
||||||
|
return 0, nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
url := "https://registry.harbor.local"
|
||||||
|
config.Config = &config.Configuration{
|
||||||
|
RegistryURL: url,
|
||||||
|
}
|
||||||
|
if err := adapter.RegisterFactory(
|
||||||
|
&adapter.Info{
|
||||||
|
Type: model.RegistryTypeHarbor,
|
||||||
|
SupportedResourceTypes: []model.ResourceType{
|
||||||
|
model.ResourceTypeRepository,
|
||||||
|
model.ResourceTypeChart,
|
||||||
|
},
|
||||||
|
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
|
||||||
|
}, fakedAdapterFactory); err != nil {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitialize(t *testing.T) {
|
||||||
|
url := "https://registry.harbor.local"
|
||||||
|
registryMgr := &fakedRegistryManager{}
|
||||||
|
policy := &model.Policy{
|
||||||
|
SrcRegistryID: 0,
|
||||||
|
DestRegistryID: 1,
|
||||||
|
}
|
||||||
|
srcRegistry, dstRegistry, _, _, err := initialize(registryMgr, policy)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, url, srcRegistry.URL)
|
||||||
|
assert.Equal(t, int64(1), dstRegistry.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFetchResources(t *testing.T) {
|
||||||
|
adapter := &fakedAdapter{}
|
||||||
|
policy := &model.Policy{}
|
||||||
|
resources, err := fetchResources(adapter, model.RegistryTypeHarbor, policy)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, 2, len(resources))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilterResources(t *testing.T) {
|
||||||
|
resources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeRepository,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
// TODO test labels
|
||||||
|
Labels: nil,
|
||||||
|
},
|
||||||
|
Deleted: true,
|
||||||
|
Override: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/harbor",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"0.2.0", "0.3.0"},
|
||||||
|
// TODO test labels
|
||||||
|
Labels: nil,
|
||||||
|
},
|
||||||
|
Deleted: true,
|
||||||
|
Override: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/mysql",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"1.0"},
|
||||||
|
// TODO test labels
|
||||||
|
Labels: nil,
|
||||||
|
},
|
||||||
|
Deleted: true,
|
||||||
|
Override: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
filters := []*model.Filter{
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeResource,
|
||||||
|
Value: string(model.ResourceTypeChart),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeName,
|
||||||
|
Value: "library/*",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeName,
|
||||||
|
Value: "library/harbor",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeVersion,
|
||||||
|
Value: "0.2.?",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
res, err := filterResources(resources, filters)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, 1, len(res))
|
||||||
|
assert.Equal(t, "library/harbor", res[0].Metadata.Name)
|
||||||
|
assert.Equal(t, 1, len(res[0].Metadata.Vtags))
|
||||||
|
assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAssembleDestinationNamespaces(t *testing.T) {
|
||||||
|
adapter := &fakedAdapter{}
|
||||||
|
resources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Namespace: "library",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
namespace := ""
|
||||||
|
ns, err := assembleDestinationNamespaces(adapter, resources, namespace)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, 1, len(ns))
|
||||||
|
assert.Equal(t, "library", ns[0].Name)
|
||||||
|
assert.Equal(t, true, ns[0].Metadata["public"].(bool))
|
||||||
|
|
||||||
|
namespace = "test"
|
||||||
|
ns, err = assembleDestinationNamespaces(adapter, resources, namespace)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, 1, len(ns))
|
||||||
|
assert.Equal(t, "test", ns[0].Name)
|
||||||
|
// TODO add test for merged metadata
|
||||||
|
// assert.Equal(t, true, ns[0].Metadata["public"].(bool))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateNamespaces(t *testing.T) {
|
||||||
|
adapter := &fakedAdapter{}
|
||||||
|
namespaces := []*model.Namespace{
|
||||||
|
{
|
||||||
|
Name: "library",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := createNamespaces(adapter, namespaces)
|
||||||
|
require.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAssembleDestinationResources(t *testing.T) {
|
||||||
|
resources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
},
|
||||||
|
Override: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
registry := &model.Registry{}
|
||||||
|
namespace := "test"
|
||||||
|
override := true
|
||||||
|
res := assembleDestinationResources(resources, registry, namespace, override)
|
||||||
|
assert.Equal(t, 1, len(res))
|
||||||
|
assert.Equal(t, model.ResourceTypeChart, res[0].Type)
|
||||||
|
assert.Equal(t, "test/hello-world", res[0].Metadata.Name)
|
||||||
|
assert.Equal(t, namespace, res[0].Metadata.Namespace)
|
||||||
|
assert.Equal(t, 1, len(res[0].Metadata.Vtags))
|
||||||
|
assert.Equal(t, "latest", res[0].Metadata.Vtags[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPreprocess(t *testing.T) {
|
||||||
|
scheduler := &fakedScheduler{}
|
||||||
|
srcResources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
Namespace: "library",
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
},
|
||||||
|
Override: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dstResources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "test/hello-world",
|
||||||
|
Namespace: "test",
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
},
|
||||||
|
Override: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
items, err := preprocess(scheduler, srcResources, dstResources)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, 1, len(items))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateTasks(t *testing.T) {
|
||||||
|
mgr := &fakedExecutionManager{}
|
||||||
|
items := []*scheduler.ScheduleItem{
|
||||||
|
{
|
||||||
|
SrcResource: &model.Resource{},
|
||||||
|
DstResource: &model.Resource{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := createTasks(mgr, 1, items)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, int64(1), items[0].TaskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedule(t *testing.T) {
|
||||||
|
sched := &fakedScheduler{}
|
||||||
|
mgr := &fakedExecutionManager{}
|
||||||
|
items := []*scheduler.ScheduleItem{
|
||||||
|
{
|
||||||
|
SrcResource: &model.Resource{},
|
||||||
|
DstResource: &model.Resource{},
|
||||||
|
TaskID: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := schedule(sched, mgr, items)
|
||||||
|
require.Nil(t, err)
|
||||||
|
}
|
@ -44,11 +44,13 @@ type Policy struct {
|
|||||||
SrcRegistryID int64 `json:"src_registry_id"`
|
SrcRegistryID int64 `json:"src_registry_id"`
|
||||||
SrcNamespaces []string `json:"src_namespaces"`
|
SrcNamespaces []string `json:"src_namespaces"`
|
||||||
// destination
|
// destination
|
||||||
|
// TODO rename to DstRegistryID
|
||||||
DestRegistryID int64 `json:"dest_registry_id"`
|
DestRegistryID int64 `json:"dest_registry_id"`
|
||||||
// Only support two dest namespace modes:
|
// Only support two dest namespace modes:
|
||||||
// Put all the src resources to the one single dest namespace
|
// Put all the src resources to the one single dest namespace
|
||||||
// or keep namespaces same with the source ones (under this case,
|
// or keep namespaces same with the source ones (under this case,
|
||||||
// the DestNamespace should be set to empty)
|
// the DestNamespace should be set to empty)
|
||||||
|
// TODO rename to DstNamespace
|
||||||
DestNamespace string `json:"dest_namespace"`
|
DestNamespace string `json:"dest_namespace"`
|
||||||
// Filters
|
// Filters
|
||||||
Filters []*Filter `json:"filters"`
|
Filters []*Filter `json:"filters"`
|
||||||
|
@ -15,16 +15,22 @@
|
|||||||
package operation
|
package operation
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/flow"
|
"github.com/goharbor/harbor/src/replication/ng/flow"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Controller handles the replication-related operations: start,
|
// Controller handles the replication-related operations: start,
|
||||||
// stop, query, etc.
|
// stop, query, etc.
|
||||||
type Controller interface {
|
type Controller interface {
|
||||||
StartReplication(policy *model.Policy) (int64, error)
|
StartReplication(policy *model.Policy, resource *model.Resource) (int64, error)
|
||||||
StopReplication(int64) error
|
StopReplication(int64) error
|
||||||
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
|
ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error)
|
||||||
GetExecution(int64) (*models.Execution, error)
|
GetExecution(int64) (*models.Execution, error)
|
||||||
@ -35,23 +41,76 @@ type Controller interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewController returns a controller implementation
|
// NewController returns a controller implementation
|
||||||
func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller {
|
func NewController(executionMgr execution.Manager, registrgMgr registry.Manager,
|
||||||
|
scheduler scheduler.Scheduler) Controller {
|
||||||
return &defaultController{
|
return &defaultController{
|
||||||
flowCtl: flowCtl,
|
|
||||||
executionMgr: executionMgr,
|
executionMgr: executionMgr,
|
||||||
|
registryMgr: registrgMgr,
|
||||||
|
scheduler: scheduler,
|
||||||
|
flowCtl: flow.NewController(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type defaultController struct {
|
type defaultController struct {
|
||||||
flowCtl flow.Controller
|
flowCtl flow.Controller
|
||||||
executionMgr execution.Manager
|
executionMgr execution.Manager
|
||||||
|
registryMgr registry.Manager
|
||||||
|
scheduler scheduler.Scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
|
func (d *defaultController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
|
||||||
return d.flowCtl.StartReplication(policy)
|
if resource != nil && len(resource.Metadata.Vtags) != 1 {
|
||||||
|
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := createExecution(d.executionMgr, policy.ID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
flow := d.createFlow(id, policy, resource)
|
||||||
|
if err = d.flowCtl.Start(flow); err != nil {
|
||||||
|
// mark the execution as failure and log the error message
|
||||||
|
// no error will be returned as the execution is created successfully
|
||||||
|
markExecutionFailure(d.executionMgr, id, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create different replication flows according to the input parameters
|
||||||
|
func (d *defaultController) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
|
||||||
|
// replicate the deletion operation, so create a deletion flow
|
||||||
|
if resource != nil && resource.Deleted {
|
||||||
|
return flow.NewDeletionFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy, []*model.Resource{resource})
|
||||||
|
|
||||||
|
}
|
||||||
|
// copy only one resource, add extra filters to the policy to make sure
|
||||||
|
// only the resource will be filtered out
|
||||||
|
if resource != nil {
|
||||||
|
filters := []*model.Filter{
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeResource,
|
||||||
|
Value: resource.Type,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeName,
|
||||||
|
Value: resource.Metadata.Name,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: model.FilterTypeVersion,
|
||||||
|
// only support replicate one tag
|
||||||
|
Value: resource.Metadata.Vtags[0],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
filters = append(filters, policy.Filters...)
|
||||||
|
}
|
||||||
|
return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *defaultController) StopReplication(executionID int64) error {
|
func (d *defaultController) StopReplication(executionID int64) error {
|
||||||
return d.flowCtl.StopReplication(executionID)
|
// TODO implement the function
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
|
||||||
return d.executionMgr.List(query...)
|
return d.executionMgr.List(query...)
|
||||||
@ -71,3 +130,33 @@ func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCond
|
|||||||
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
|
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
|
||||||
return d.executionMgr.GetTaskLog(taskID)
|
return d.executionMgr.GetTaskLog(taskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the execution record in database
|
||||||
|
func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
|
||||||
|
id, err := mgr.Create(&models.Execution{
|
||||||
|
PolicyID: policyID,
|
||||||
|
Status: models.ExecutionStatusInProgress,
|
||||||
|
StartTime: time.Now(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policyID, err)
|
||||||
|
}
|
||||||
|
log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark the execution as failure in database
|
||||||
|
func markExecutionFailure(mgr execution.Manager, id int64, message string) {
|
||||||
|
err := mgr.Update(
|
||||||
|
&models.Execution{
|
||||||
|
ID: id,
|
||||||
|
Status: models.ExecutionStatusFailed,
|
||||||
|
StatusText: message,
|
||||||
|
EndTime: time.Now(),
|
||||||
|
}, "Status", "StatusText", "EndTime")
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to update the execution %d: %v", id, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Debugf("the execution %d is marked as failure: %s", id, message)
|
||||||
|
}
|
||||||
|
@ -17,21 +17,14 @@ package operation
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakedFlowController struct{}
|
|
||||||
|
|
||||||
func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) {
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
func (f *fakedFlowController) StopReplication(int64) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakedExecutionManager struct{}
|
type fakedExecutionManager struct{}
|
||||||
|
|
||||||
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
|
||||||
@ -69,7 +62,9 @@ func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*model
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) {
|
||||||
return nil, nil
|
return &models.Task{
|
||||||
|
ID: 1,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error {
|
||||||
return nil
|
return nil
|
||||||
@ -87,10 +82,95 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
|
|||||||
return []byte("message"), nil
|
return []byte("message"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{})
|
type fakedRegistryManager struct{}
|
||||||
|
|
||||||
|
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
|
||||||
|
return 0, nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
|
||||||
|
var registry *model.Registry
|
||||||
|
switch id {
|
||||||
|
case 1:
|
||||||
|
registry = &model.Registry{
|
||||||
|
ID: 1,
|
||||||
|
Type: model.RegistryTypeHarbor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return registry, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) Remove(int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (f *fakedRegistryManager) HealthCheck() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakedScheduler struct{}
|
||||||
|
|
||||||
|
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
|
||||||
|
items := []*scheduler.ScheduleItem{}
|
||||||
|
for i, res := range src {
|
||||||
|
items = append(items, &scheduler.ScheduleItem{
|
||||||
|
SrcResource: res,
|
||||||
|
DstResource: dst[i],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
|
||||||
|
results := []*scheduler.ScheduleResult{}
|
||||||
|
for _, item := range items {
|
||||||
|
results = append(results, &scheduler.ScheduleResult{
|
||||||
|
TaskID: item.TaskID,
|
||||||
|
Error: nil,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
func (f *fakedScheduler) Stop(id string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var ctl = NewController(&fakedExecutionManager{}, &fakedRegistryManager{}, &fakedScheduler{})
|
||||||
|
|
||||||
func TestStartReplication(t *testing.T) {
|
func TestStartReplication(t *testing.T) {
|
||||||
id, err := ctl.StartReplication(nil)
|
config.Config = &config.Configuration{}
|
||||||
|
// the resource contains Vtags whose length isn't 1
|
||||||
|
policy := &model.Policy{}
|
||||||
|
resource := &model.Resource{
|
||||||
|
Type: model.ResourceTypeRepository,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
Vtags: []string{"1.0", "2.0"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err := ctl.StartReplication(policy, resource)
|
||||||
|
require.NotNil(t, err)
|
||||||
|
|
||||||
|
// replicate resource deletion
|
||||||
|
resource.Metadata.Vtags = []string{"1.0"}
|
||||||
|
resource.Deleted = true
|
||||||
|
id, err := ctl.StartReplication(policy, resource)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, int64(1), id)
|
||||||
|
|
||||||
|
// replicate resource copy
|
||||||
|
resource.Deleted = false
|
||||||
|
id, err = ctl.StartReplication(policy, resource)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, int64(1), id)
|
||||||
|
|
||||||
|
// nil resource
|
||||||
|
id, err = ctl.StartReplication(policy, nil)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
assert.Equal(t, int64(1), id)
|
assert.Equal(t, int64(1), id)
|
||||||
}
|
}
|
||||||
@ -120,6 +200,17 @@ func TestListTasks(t *testing.T) {
|
|||||||
assert.Equal(t, int64(1), tasks[0].ID)
|
assert.Equal(t, int64(1), tasks[0].ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetTask(t *testing.T) {
|
||||||
|
task, err := ctl.GetTask(1)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, int64(1), task.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateTaskStatus(t *testing.T) {
|
||||||
|
err := ctl.UpdateTaskStatus(1, "running")
|
||||||
|
require.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetTaskLog(t *testing.T) {
|
func TestGetTaskLog(t *testing.T) {
|
||||||
log, err := ctl.GetTaskLog(1)
|
log, err := ctl.GetTaskLog(1)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
"github.com/goharbor/harbor/src/common/utils/registry"
|
"github.com/goharbor/harbor/src/common/utils/registry"
|
||||||
"github.com/goharbor/harbor/src/common/utils/registry/auth"
|
"github.com/goharbor/harbor/src/common/utils/registry/auth"
|
||||||
|
// TODO use the replication config rather than the core
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao"
|
"github.com/goharbor/harbor/src/replication/ng/dao"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
|
@ -17,11 +17,10 @@
|
|||||||
package ng
|
package ng
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
cfg "github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
"github.com/goharbor/harbor/src/replication/ng/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/flow"
|
|
||||||
"github.com/goharbor/harbor/src/replication/ng/operation"
|
"github.com/goharbor/harbor/src/replication/ng/operation"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/policy"
|
"github.com/goharbor/harbor/src/replication/ng/policy"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||||
@ -39,21 +38,32 @@ var (
|
|||||||
OperationCtl operation.Controller
|
OperationCtl operation.Controller
|
||||||
)
|
)
|
||||||
|
|
||||||
// Init the global variables
|
// Init the global variables and configurations
|
||||||
func Init() error {
|
func Init() error {
|
||||||
|
// init config
|
||||||
|
registryURL, err := cfg.RegistryURL()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
secretKey, err := cfg.SecretKey()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
config.Config = &config.Configuration{
|
||||||
|
CoreURL: cfg.InternalCoreURL(),
|
||||||
|
RegistryURL: registryURL,
|
||||||
|
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
|
||||||
|
JobserviceURL: cfg.InternalJobServiceURL(),
|
||||||
|
SecretKey: secretKey,
|
||||||
|
Secret: cfg.CoreSecret(),
|
||||||
|
}
|
||||||
// Init registry manager
|
// Init registry manager
|
||||||
RegistryMgr = registry.NewDefaultManager()
|
RegistryMgr = registry.NewDefaultManager()
|
||||||
// init policy manager
|
// init policy manager
|
||||||
PolicyMgr = policy.NewDefaultManager()
|
PolicyMgr = policy.NewDefaultManager()
|
||||||
// init ExecutionMgr
|
// init operatoin controller
|
||||||
executionMgr := execution.NewDefaultManager()
|
OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr,
|
||||||
// init scheduler
|
scheduler.NewScheduler(config.Config.JobserviceURL, config.Config.Secret))
|
||||||
scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret())
|
log.Debug("the replication initialization completed")
|
||||||
|
|
||||||
flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create the flow controller: %v", err)
|
|
||||||
}
|
|
||||||
OperationCtl = operation.NewController(flowCtl, executionMgr)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -17,15 +17,28 @@
|
|||||||
package ng
|
package ng
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
// "github.com/stretchr/testify/assert"
|
|
||||||
// "github.com/stretchr/testify/require"
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInit(t *testing.T) {
|
func TestInit(t *testing.T) {
|
||||||
// TODO add testing code
|
key := path.Join(os.TempDir(), "key")
|
||||||
// err := Init()
|
err := ioutil.WriteFile(key, []byte{'k'}, os.ModePerm)
|
||||||
// require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
// assert.NotNil(t, OperationCtl)
|
defer os.Remove(key)
|
||||||
// TODO add check for RegistryMgr and ExecutionMgr
|
err = os.Setenv("KEY_PATH", key)
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
config.InitWithSettings(nil)
|
||||||
|
err = Init()
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.NotNil(t, PolicyMgr)
|
||||||
|
assert.NotNil(t, RegistryMgr)
|
||||||
|
assert.NotNil(t, OperationCtl)
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,8 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/common/job"
|
"github.com/goharbor/harbor/src/common/job"
|
||||||
common_job "github.com/goharbor/harbor/src/common/job"
|
common_job "github.com/goharbor/harbor/src/common/job"
|
||||||
"github.com/goharbor/harbor/src/common/job/models"
|
"github.com/goharbor/harbor/src/common/job/models"
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/opm"
|
"github.com/goharbor/harbor/src/jobservice/opm"
|
||||||
|
"github.com/goharbor/harbor/src/replication/ng/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, e
|
|||||||
Metadata: &models.JobMetadata{
|
Metadata: &models.JobMetadata{
|
||||||
JobKind: job.JobKindGeneric,
|
JobKind: job.JobKindGeneric,
|
||||||
},
|
},
|
||||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID),
|
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.Config.CoreURL, item.TaskID),
|
||||||
}
|
}
|
||||||
|
|
||||||
job.Name = common_job.Replication
|
job.Name = common_job.Replication
|
||||||
|
24
src/replication/ng/util/util.go
Normal file
24
src/replication/ng/util/util.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Match returns whether the str matches the pattern
|
||||||
|
func Match(pattern, str string) (bool, error) {
|
||||||
|
return filepath.Match(pattern, str)
|
||||||
|
}
|
77
src/replication/ng/util/util_test.go
Normal file
77
src/replication/ng/util/util_test.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMatch(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
pattern string
|
||||||
|
str string
|
||||||
|
match bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
pattern: "",
|
||||||
|
str: "",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "1.*",
|
||||||
|
str: "1.0",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "1.*",
|
||||||
|
str: "1.01",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "1.?",
|
||||||
|
str: "1.0",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "1.?",
|
||||||
|
str: "1.01",
|
||||||
|
match: false,
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
pattern: "library/*",
|
||||||
|
str: "library/hello-world",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "lib*",
|
||||||
|
str: "library/hello-world",
|
||||||
|
match: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pattern: "lib*/*",
|
||||||
|
str: "library/hello-world",
|
||||||
|
match: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
match, err := Match(c.pattern, c.str)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, c.match, match)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user