mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-27 01:51:25 +01:00
Merge pull request #6858 from ywk253100/190123_flow_controller
Implement the replication flow controller
This commit is contained in:
commit
62bc7cdd58
80
src/replication/ng/adapter/adapter.go
Normal file
80
src/replication/ng/adapter/adapter.go
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package adapter
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
var registry = map[model.RegistryType]Factory{}
|
||||
|
||||
// Factory creates a specific Adapter according to the params
|
||||
type Factory func(*model.Registry) (Adapter, error)
|
||||
|
||||
// Info provides base info and capability declarations of the adapter
|
||||
type Info struct {
|
||||
Name model.RegistryType `json:"name"`
|
||||
Description string `json:"description"`
|
||||
SupportedResourceTypes []model.ResourceType `json:"supported_resource_types"`
|
||||
SupportedResourceFilters []model.FilterType `json:"supported_resource_filters"`
|
||||
}
|
||||
|
||||
// Adapter interface defines the capabilities of registry
|
||||
type Adapter interface {
|
||||
// Info return the information of this adapter
|
||||
Info() *Info
|
||||
// Lists the available namespaces under the specified registry with the
|
||||
// provided credential/token
|
||||
ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error)
|
||||
// Create a new namespace
|
||||
// This method should guarantee it's idempotent
|
||||
// And returns nil if a namespace with the same name already exists
|
||||
CreateNamespace(*model.Namespace) error
|
||||
// Get the namespace specified by the name, the returning value should
|
||||
// contain the metadata about the namespace if it has
|
||||
GetNamespace(string) (*model.Namespace, error)
|
||||
// Fetch the content resource under the namespace by filters
|
||||
// SUGGESTION: Adapter provider can do their own filter based on the filter pattern
|
||||
// or call the default `DoFilter` function of the filter to complete resource filtering.
|
||||
FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error)
|
||||
}
|
||||
|
||||
// RegisterFactory registers one adapter factory to the registry
|
||||
func RegisterFactory(name model.RegistryType, factory Factory) error {
|
||||
if !name.Valid() {
|
||||
return errors.New("invalid adapter factory name")
|
||||
}
|
||||
if factory == nil {
|
||||
return errors.New("empty adapter factory")
|
||||
}
|
||||
if _, exist := registry[name]; exist {
|
||||
return fmt.Errorf("adapter factory for %s already exists", name)
|
||||
}
|
||||
registry[name] = factory
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetFactory gets the adapter factory by the specified name
|
||||
func GetFactory(name model.RegistryType) (Factory, error) {
|
||||
factory, exist := registry[name]
|
||||
if !exist {
|
||||
return nil, fmt.Errorf("adapter factory for %s not found", name)
|
||||
}
|
||||
|
||||
return factory, nil
|
||||
}
|
49
src/replication/ng/adapter/adapter_test.go
Normal file
49
src/replication/ng/adapter/adapter_test.go
Normal file
@ -0,0 +1,49 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package adapter
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func fakedFactory(*model.Registry) (Adapter, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestRegisterFactory(t *testing.T) {
|
||||
// empty name
|
||||
assert.NotNil(t, RegisterFactory("", nil))
|
||||
// empty factory
|
||||
assert.NotNil(t, RegisterFactory("factory", nil))
|
||||
// pass
|
||||
assert.Nil(t, RegisterFactory("factory", fakedFactory))
|
||||
// already exists
|
||||
assert.NotNil(t, RegisterFactory("factory", fakedFactory))
|
||||
}
|
||||
|
||||
func TestGetFactory(t *testing.T) {
|
||||
registry = map[model.RegistryType]Factory{}
|
||||
require.Nil(t, RegisterFactory("factory", fakedFactory))
|
||||
// doesn't exist
|
||||
_, err := GetFactory("another_factory")
|
||||
assert.NotNil(t, err)
|
||||
// pass
|
||||
_, err = GetFactory("factory")
|
||||
assert.Nil(t, err)
|
||||
}
|
51
src/replication/ng/execution/execution.go
Normal file
51
src/replication/ng/execution/execution.go
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package execution
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
// Manager manages the executions
|
||||
type Manager interface {
|
||||
// Create a new execution
|
||||
Create(*model.Execution) (int64, error)
|
||||
// List the summaries of executions
|
||||
List(*model.ExecutionQuery) (int64, []*model.Execution, error)
|
||||
// Get the specified execution
|
||||
Get(int64) (*model.Execution, error)
|
||||
// Update the data of the specified execution, the "props" are the
|
||||
// properties of execution that need to be updated
|
||||
Update(execution *model.Execution, props ...string) error
|
||||
// Remove the execution specified by the ID
|
||||
Remove(int64) error
|
||||
// Remove all executions of one policy specified by the policy ID
|
||||
RemoveAll(int64) error
|
||||
// Create a task
|
||||
CreateTask(*model.Task) (int64, error)
|
||||
// List the tasks according to the query
|
||||
ListTasks(*model.TaskQuery) (int64, []*model.Task, error)
|
||||
// Get one specified task
|
||||
GetTask(int64) (*model.Task, error)
|
||||
// Update the task, the "props" are the properties of task
|
||||
// that need to be updated
|
||||
UpdateTask(task *model.Task, props ...string) error
|
||||
// Remove one task specified by task ID
|
||||
RemoveTask(int64) error
|
||||
// Remove all tasks of one execution specified by the execution ID
|
||||
RemoveAllTasks(int64) error
|
||||
// Get the log of one specific task
|
||||
GetTaskLog(int64) ([]byte, error)
|
||||
}
|
99
src/replication/ng/flow/controller.go
Normal file
99
src/replication/ng/flow/controller.go
Normal file
@ -0,0 +1,99 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||
)
|
||||
|
||||
// Controller controls the replication flow
|
||||
type Controller interface {
|
||||
// Start a replication according to the policy and returns the
|
||||
// execution ID and error
|
||||
StartReplication(policy *model.Policy) (int64, error)
|
||||
// Stop the replication specified by the execution ID
|
||||
StopReplication(int64) error
|
||||
}
|
||||
|
||||
// NewController returns an instance of a Controller
|
||||
func NewController(registryMgr registry.Manager,
|
||||
executionMgr execution.Manager, scheduler scheduler.Scheduler) (Controller, error) {
|
||||
if registryMgr == nil || executionMgr == nil || scheduler == nil {
|
||||
return nil, errors.New("invalid params")
|
||||
}
|
||||
return &defaultController{
|
||||
registryMgr: registryMgr,
|
||||
executionMgr: executionMgr,
|
||||
scheduler: scheduler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// defaultController is the default implement for the Controller
|
||||
type defaultController struct {
|
||||
registryMgr registry.Manager
|
||||
executionMgr execution.Manager
|
||||
scheduler scheduler.Scheduler
|
||||
}
|
||||
|
||||
// Replicate according the to policy ID
|
||||
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
|
||||
log.Infof("starting the replication based on the policy %d ...", policy.ID)
|
||||
|
||||
flow, err := newFlow(policy, d.registryMgr, d.executionMgr, d.scheduler)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to create the flow object based on policy %d: %v", policy.ID, err)
|
||||
}
|
||||
|
||||
// create the execution record
|
||||
id, err := flow.createExecution()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to create the execution record for replication based on policy %d: %v", policy.ID, err)
|
||||
}
|
||||
|
||||
// fetch resources from the source registry
|
||||
if err := flow.fetchResources(); err != nil {
|
||||
// just log the error message and return the execution ID
|
||||
log.Errorf("failed to fetch resources for the execution %d: %v", id, err)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// create the namespace on the destination registry
|
||||
if err = flow.createNamespace(); err != nil {
|
||||
log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// schedule the replication
|
||||
if err = flow.schedule(); err != nil {
|
||||
log.Errorf("failed to schedule the execution %d: %v", id, err)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
log.Infof("the execution %d scheduled", id)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (d *defaultController) StopReplication(id int64) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
183
src/replication/ng/flow/controller_test.go
Normal file
183
src/replication/ng/flow/controller_test.go
Normal file
@ -0,0 +1,183 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type fakedPolicyManager struct{}
|
||||
|
||||
func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedPolicyManager) Get(int64) (*model.Policy, error) {
|
||||
return &model.Policy{
|
||||
ID: 1,
|
||||
SrcRegistryID: 1,
|
||||
SrcNamespaces: []string{"library"},
|
||||
DestRegistryID: 2,
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedPolicyManager) Update(*model.Policy) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedPolicyManager) Remove(int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakedRegistryManager struct{}
|
||||
|
||||
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
|
||||
if id == 1 {
|
||||
return &model.Registry{
|
||||
Type: "faked_registry",
|
||||
}, nil
|
||||
}
|
||||
if id == 2 {
|
||||
return &model.Registry{
|
||||
Type: "faked_registry",
|
||||
}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedRegistryManager) Remove(int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakedExecutionManager struct{}
|
||||
|
||||
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
|
||||
return 1, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) List(*model.ExecutionQuery) (int64, []*model.Execution, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Update(*model.Execution, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) Remove(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveAll(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) ListTasks(*model.TaskQuery) (int64, []*model.Task, error) {
|
||||
return 0, nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveTask(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type fakedScheduler struct{}
|
||||
|
||||
func (f *fakedScheduler) Schedule(src []*model.Resource, dst []*model.Resource) ([]*model.Task, error) {
|
||||
return []*model.Task{
|
||||
{
|
||||
Status: model.TaskStatusPending,
|
||||
JobID: "uuid",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
func (f *fakedScheduler) Stop(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
|
||||
return &fakedAdapter{}, nil
|
||||
}
|
||||
|
||||
type fakedAdapter struct{}
|
||||
|
||||
func (f *fakedAdapter) Info() *adapter.Info {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) {
|
||||
return &model.Namespace{}, nil
|
||||
}
|
||||
func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
|
||||
return []*model.Resource{
|
||||
{
|
||||
Type: model.ResourceTypeRepository,
|
||||
Metadata: &model.ResourceMetadata{
|
||||
Name: "hello-world",
|
||||
Namespace: "library",
|
||||
Vtags: []string{"latest"},
|
||||
},
|
||||
Override: false,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestStartReplication(t *testing.T) {
|
||||
err := adapter.RegisterFactory("faked_registry", fakedAdapterFactory)
|
||||
require.Nil(t, err)
|
||||
|
||||
controller, _ := NewController(
|
||||
&fakedRegistryManager{},
|
||||
&fakedExecutionManager{},
|
||||
&fakedScheduler{})
|
||||
|
||||
policy := &model.Policy{
|
||||
ID: 1,
|
||||
SrcRegistryID: 1,
|
||||
DestRegistryID: 2,
|
||||
DestNamespace: "library",
|
||||
}
|
||||
id, err := controller.StartReplication(policy)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, id, int64(1))
|
||||
}
|
206
src/replication/ng/flow/flow.go
Normal file
206
src/replication/ng/flow/flow.go
Normal file
@ -0,0 +1,206 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package flow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/scheduler"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
"github.com/goharbor/harbor/src/replication/ng/registry"
|
||||
)
|
||||
|
||||
type flow struct {
|
||||
policy *model.Policy
|
||||
srcRegistry *model.Registry
|
||||
dstRegistry *model.Registry
|
||||
srcAdapter adapter.Adapter
|
||||
dstAdapter adapter.Adapter
|
||||
executionID int64
|
||||
resources []*model.Resource
|
||||
executionMgr execution.Manager
|
||||
scheduler scheduler.Scheduler
|
||||
}
|
||||
|
||||
func newFlow(policy *model.Policy, registryMgr registry.Manager,
|
||||
executionMgr execution.Manager, scheduler scheduler.Scheduler) (*flow, error) {
|
||||
|
||||
f := &flow{
|
||||
policy: policy,
|
||||
executionMgr: executionMgr,
|
||||
scheduler: scheduler,
|
||||
}
|
||||
|
||||
// get source registry
|
||||
srcRegistry, err := registryMgr.Get(policy.SrcRegistryID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
|
||||
}
|
||||
if srcRegistry == nil {
|
||||
return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
|
||||
}
|
||||
f.srcRegistry = srcRegistry
|
||||
|
||||
// get destination registry
|
||||
dstRegistry, err := registryMgr.Get(policy.DestRegistryID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
|
||||
}
|
||||
if dstRegistry == nil {
|
||||
return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
|
||||
}
|
||||
f.dstRegistry = dstRegistry
|
||||
|
||||
// create the source registry adapter
|
||||
srcFactory, err := adapter.GetFactory(srcRegistry.Type)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err)
|
||||
}
|
||||
srcAdapter, err := srcFactory(srcRegistry)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err)
|
||||
}
|
||||
f.srcAdapter = srcAdapter
|
||||
|
||||
// create the destination registry adapter
|
||||
dstFactory, err := adapter.GetFactory(dstRegistry.Type)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err)
|
||||
}
|
||||
dstAdapter, err := dstFactory(dstRegistry)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err)
|
||||
}
|
||||
f.dstAdapter = dstAdapter
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (f *flow) createExecution() (int64, error) {
|
||||
id, err := f.executionMgr.Create(&model.Execution{
|
||||
PolicyID: f.policy.ID,
|
||||
Status: model.ExecutionStatusInProgress,
|
||||
StartTime: time.Now(),
|
||||
})
|
||||
f.executionID = id
|
||||
log.Debugf("an execution record for replication based on the policy %d created: %d", f.policy.ID, id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (f *flow) fetchResources() error {
|
||||
resources, err := f.srcAdapter.FetchResources(f.policy.SrcNamespaces, f.policy.Filters)
|
||||
f.resources = resources
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) createNamespace() error {
|
||||
// merge the metadata of all source namespaces
|
||||
metadata := map[string]interface{}{}
|
||||
for _, srcNamespace := range f.policy.SrcNamespaces {
|
||||
namespace, err := f.srcAdapter.GetNamespace(srcNamespace)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
for key, value := range namespace.Metadata {
|
||||
metadata[namespace.Name+":"+key] = value
|
||||
}
|
||||
}
|
||||
|
||||
if err := f.dstAdapter.CreateNamespace(&model.Namespace{
|
||||
Name: f.policy.DestNamespace,
|
||||
Metadata: metadata,
|
||||
}); err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) schedule() error {
|
||||
dstResources := []*model.Resource{}
|
||||
for _, srcResource := range f.resources {
|
||||
dstResource := &model.Resource{
|
||||
Type: srcResource.Type,
|
||||
Metadata: &model.ResourceMetadata{
|
||||
Name: srcResource.Metadata.Name,
|
||||
Namespace: f.policy.DestNamespace,
|
||||
Vtags: srcResource.Metadata.Vtags,
|
||||
},
|
||||
Registry: f.dstRegistry,
|
||||
ExtendedInfo: srcResource.ExtendedInfo,
|
||||
Deleted: srcResource.Deleted,
|
||||
Override: f.policy.Override,
|
||||
}
|
||||
dstResources = append(dstResources, dstResource)
|
||||
}
|
||||
|
||||
tasks, err := f.scheduler.Schedule(f.resources, dstResources)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
|
||||
allFailed := true
|
||||
for _, task := range tasks {
|
||||
if task.Status != model.TaskStatusFailed {
|
||||
allFailed = false
|
||||
}
|
||||
task.ExecutionID = f.executionID
|
||||
taskID, err := f.executionMgr.CreateTask(task)
|
||||
if err != nil {
|
||||
f.markExecutionFailure(err)
|
||||
return err
|
||||
}
|
||||
log.Debugf("task record %d for execution %d created", taskID, f.executionID)
|
||||
}
|
||||
// if all the tasks are failed, mark the execution failed
|
||||
if allFailed {
|
||||
f.markExecutionFailure(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *flow) markExecutionFailure(err error) {
|
||||
statusText := ""
|
||||
if err != nil {
|
||||
statusText = err.Error()
|
||||
}
|
||||
err = f.executionMgr.Update(
|
||||
&model.Execution{
|
||||
ID: f.executionID,
|
||||
Status: model.ExecutionStatusFailed,
|
||||
StatusText: statusText,
|
||||
EndTime: time.Now(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
||||
}
|
||||
}
|
@ -14,17 +14,43 @@
|
||||
|
||||
package model
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
)
|
||||
|
||||
// execution/task status/trigger const
|
||||
const (
|
||||
ExecutionStatusFailed string = "Failed"
|
||||
ExecutionStatusSucceed string = "Succeed"
|
||||
ExecutionStatusStopped string = "Stopped"
|
||||
ExecutionStatusInProgress string = "InProgress"
|
||||
|
||||
ExecutionTriggerManual string = "Manual"
|
||||
ExecutionTriggerEvent string = "Event"
|
||||
ExecutionTriggerSchedule string = "Schedule"
|
||||
|
||||
TaskStatusFailed string = "Failed"
|
||||
TaskStatusSucceed string = "Succeed"
|
||||
TaskStatusStopped string = "Stopped"
|
||||
TaskStatusInProgress string = "InProgress"
|
||||
TaskStatusPending string = "Pending"
|
||||
)
|
||||
|
||||
// Execution defines an execution of the replication
|
||||
type Execution struct {
|
||||
ID int64 `json:"id"`
|
||||
PolicyID int64 `json:"policy_id"`
|
||||
Status string `json:"status"`
|
||||
StatusText string `json:"status_text"`
|
||||
Trigger string `json:"trigger"`
|
||||
Total int `json:"total"`
|
||||
Failed int `json:"failed"`
|
||||
Succeed int `json:"succeed"`
|
||||
Pending int `json:"pending"`
|
||||
InProgress int `json:"in_progress"`
|
||||
Stopped int `json:"stopped"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
}
|
||||
@ -41,3 +67,19 @@ type Task struct {
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
}
|
||||
|
||||
// ExecutionQuery defines the query conditions for listing executions
|
||||
type ExecutionQuery struct {
|
||||
PolicyID int64
|
||||
Status string
|
||||
Trigger string
|
||||
models.Pagination
|
||||
}
|
||||
|
||||
// TaskQuery defines the query conditions for listing tasks
|
||||
type TaskQuery struct {
|
||||
ExecutionID int64
|
||||
ResourceType ResourceType
|
||||
Status string
|
||||
models.Pagination
|
||||
}
|
||||
|
@ -21,3 +21,8 @@ type Namespace struct {
|
||||
Name string `json:"name"`
|
||||
Metadata map[string]interface{} `json:"metadata"`
|
||||
}
|
||||
|
||||
// NamespaceQuery defines the query condition for listing namespaces
|
||||
type NamespaceQuery struct {
|
||||
Name string
|
||||
}
|
||||
|
@ -14,7 +14,11 @@
|
||||
|
||||
package model
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
)
|
||||
|
||||
// Policy defines the structure of a replication policy
|
||||
type Policy struct {
|
||||
@ -68,3 +72,12 @@ type Trigger struct {
|
||||
type TriggerSettings struct {
|
||||
Cron string `json:"cron"`
|
||||
}
|
||||
|
||||
// PolicyQuery defines the query conditions for listing policies
|
||||
type PolicyQuery struct {
|
||||
Name string
|
||||
// TODO: need to consider how to support listing the policies
|
||||
// of one namespace in both pull and push modes
|
||||
Namespace string
|
||||
models.Pagination
|
||||
}
|
||||
|
@ -14,9 +14,18 @@
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
)
|
||||
|
||||
// RegistryType indicates the type of registry
|
||||
type RegistryType string
|
||||
|
||||
// Valid indicates whether the RegistryType is a valid value
|
||||
func (r RegistryType) Valid() bool {
|
||||
return len(r) > 0
|
||||
}
|
||||
|
||||
// CredentialType represents the supported credential types
|
||||
// e.g: u/p, OAuth token
|
||||
type CredentialType string
|
||||
@ -44,3 +53,9 @@ type Registry struct {
|
||||
Insecure bool `json:"insecure"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// RegistryQuery defines the query conditions for listing registries
|
||||
type RegistryQuery struct {
|
||||
Name string
|
||||
models.Pagination
|
||||
}
|
||||
|
@ -23,12 +23,17 @@ const (
|
||||
// ResourceType represents the type of the resource
|
||||
type ResourceType string
|
||||
|
||||
// Valid indicates whether the ResourceType is a valid value
|
||||
func (r ResourceType) Valid() bool {
|
||||
return len(r) > 0
|
||||
}
|
||||
|
||||
// ResourceMetadata of resource
|
||||
type ResourceMetadata struct {
|
||||
Namespace *Namespace `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Vtags []string `json:"v_tags"`
|
||||
Labels []string `json:"labels"`
|
||||
Namespace string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Vtags []string `json:"v_tags"`
|
||||
Labels []string `json:"labels"`
|
||||
}
|
||||
|
||||
// Resource represents the general replicating content
|
||||
@ -40,4 +45,6 @@ type Resource struct {
|
||||
ExtendedInfo map[string]interface{} `json:"extended_info"`
|
||||
// Indicate if the resource is a deleted resource
|
||||
Deleted bool `json:"deleted"`
|
||||
// indicate whether the resource can be overridden
|
||||
Override bool `json:"override"`
|
||||
}
|
||||
|
34
src/replication/ng/policy/manager.go
Normal file
34
src/replication/ng/policy/manager.go
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package policy
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
// Manager manages replication policies
|
||||
type Manager interface {
|
||||
// Create new policy
|
||||
Create(*model.Policy) (int64, error)
|
||||
// List the policies, returns the total count, policy list and error
|
||||
List(...*model.PolicyQuery) (int64, []*model.Policy, error)
|
||||
// Get policy with specified ID
|
||||
Get(int64) (*model.Policy, error)
|
||||
// Update the specified policy, the "props" are the properties of policy
|
||||
// that need to be updated
|
||||
Update(policy *model.Policy, props ...string) error
|
||||
// Remove the specified policy
|
||||
Remove(int64) error
|
||||
}
|
34
src/replication/ng/registry/manager.go
Normal file
34
src/replication/ng/registry/manager.go
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package registry
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
// Manager manages registries
|
||||
type Manager interface {
|
||||
// Add new registry
|
||||
Add(*model.Registry) (int64, error)
|
||||
// List registries, returns total count, registry list and error
|
||||
List(...*model.RegistryQuery) (int64, []*model.Registry, error)
|
||||
// Get the specified registry
|
||||
Get(int64) (*model.Registry, error)
|
||||
// Update the registry, the "props" are the properties of registry
|
||||
// that need to be updated
|
||||
Update(registry *model.Registry, props ...string) error
|
||||
// Remove the registry with the specified ID
|
||||
Remove(int64) error
|
||||
}
|
27
src/replication/ng/scheduler/scheduler.go
Normal file
27
src/replication/ng/scheduler/scheduler.go
Normal file
@ -0,0 +1,27 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||
)
|
||||
|
||||
// Scheduler schedules tasks to transfer resource data
|
||||
type Scheduler interface {
|
||||
// Schedule tasks for one execution
|
||||
Schedule([]*model.Resource, []*model.Resource) ([]*model.Task, error)
|
||||
// Stop the task specified by ID
|
||||
Stop(id string) error
|
||||
}
|
@ -63,14 +63,14 @@ type CancelFunc func() bool
|
||||
|
||||
// RegisterFactory registers one transfer factory to the registry
|
||||
func RegisterFactory(name model.ResourceType, factory Factory) error {
|
||||
if len(name) == 0 {
|
||||
return errors.New("empty transfer name")
|
||||
if !name.Valid() {
|
||||
return errors.New("invalid resource transfer factory name")
|
||||
}
|
||||
if factory == nil {
|
||||
return errors.New("empty transfer factory")
|
||||
return errors.New("empty resource transfer factory")
|
||||
}
|
||||
if _, exist := registry[name]; exist {
|
||||
return fmt.Errorf("transfer factory for %s already exists", name)
|
||||
return fmt.Errorf("resource transfer factory for %s already exists", name)
|
||||
}
|
||||
|
||||
registry[name] = factory
|
||||
|
Loading…
Reference in New Issue
Block a user