Update the replication API

1. Add getting execution by ID API
2. Return registry detail info in listing policies API

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-03-30 18:27:47 +08:00
parent 1491cf1846
commit 71b706e60a
22 changed files with 602 additions and 327 deletions

View File

@ -1624,6 +1624,36 @@ paths:
'500':
description: Unexpected internal errors.
/replication/executions/{id}:
get:
summary: Get the execution of the replication.
description: |
This endpoint is for user to get one execution of the replication.
parameters:
- name: id
in: path
type: integer
format: int64
description: The execution ID.
required: true
tags:
- Products
responses:
'200':
description: Success.
schema:
$ref: '#/definitions/ReplicationExecution'
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'415':
$ref: '#/responses/UnsupportedMediaType'
'500':
description: Unexpected internal errors.
put:
summary: Stop the execution of the replication.
description: |
@ -4109,19 +4139,17 @@ definitions:
description:
type: string
description: The description of the policy.
src_registry_id:
type: integer
format: int64
description: The source registry ID.
src_registry:
description: The source registry.
$ref: '#/definitions/Registry'
src_namespaces:
type: array
description: The source namespaces
items:
type: string
dest_registry_id:
type: integer
format: int64
description: The destination registry ID.
dest_registry:
description: The destination registry.
$ref: '#/definitions/Registry'
dest_namespace:
type: string
description: The destination namespace.

View File

@ -155,7 +155,7 @@ func init() {
beego.Router("/api/replication/adapters", &ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/executions", &ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &ReplicationOperationAPI{}, "put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &ReplicationOperationAPI{}, "get:GetExecution;put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &ReplicationOperationAPI{}, "get:GetTaskLog")

View File

@ -19,6 +19,8 @@ import (
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
)
@ -113,6 +115,11 @@ func (r *ReplicationOperationAPI) CreateExecution() {
return
}
if err = event.PopulateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", execution.PolicyID, err))
return
}
executionID, err := ng.OperationCtl.StartReplication(policy, nil)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
@ -121,6 +128,26 @@ func (r *ReplicationOperationAPI) CreateExecution() {
r.Redirect(http.StatusCreated, strconv.FormatInt(executionID, 10))
}
// GetExecution gets one execution of the replication
func (r *ReplicationOperationAPI) GetExecution() {
executionID, err := r.GetInt64FromPath(":id")
if err != nil || executionID <= 0 {
r.HandleBadRequest("invalid execution ID")
return
}
execution, err := ng.OperationCtl.GetExecution(executionID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get execution %d: %v", executionID, err))
return
}
if execution == nil {
r.HandleNotFound(fmt.Sprintf("execution %d not found", executionID))
return
}
r.WriteJSONData(execution)
}
// StopExecution stops one execution of the replication
func (r *ReplicationOperationAPI) StopExecution() {
executionID, err := r.GetInt64FromPath(":id")

View File

@ -84,9 +84,10 @@ func (f *fakedPolicyManager) Get(id int64) (*model.Policy, error) {
if id == 1 {
return &model.Policy{
ID: 1,
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
DestRegistryID: 2,
}, nil
}
return nil, nil
@ -148,12 +149,15 @@ func TestListExecutions(t *testing.T) {
func TestCreateExecution(t *testing.T) {
operationCtl := ng.OperationCtl
policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() {
ng.OperationCtl = operationCtl
ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}()
ng.OperationCtl = &fakedOperationController{}
ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{
// 401
@ -202,6 +206,53 @@ func TestCreateExecution(t *testing.T) {
runCodeCheckingCases(t, cases...)
}
func TestGetExecution(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/2",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 200
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}
func TestStopExecution(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {

View File

@ -19,6 +19,10 @@ import (
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
@ -57,6 +61,12 @@ func (r *ReplicationPolicyAPI) List() {
r.HandleInternalServerError(fmt.Sprintf("failed to list policies: %v", err))
return
}
for _, policy := range policies {
if err = populateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", policy.ID, err))
return
}
}
r.SetPaginationHeader(total, query.Page, query.Size)
r.WriteJSONData(policies)
}
@ -97,9 +107,11 @@ func (r *ReplicationPolicyAPI) validateName(policy *model.Policy) bool {
// make the registry referenced exists
func (r *ReplicationPolicyAPI) validateRegistry(policy *model.Policy) bool {
registryID := policy.SrcRegistryID
if registryID == 0 {
registryID = policy.DestRegistryID
var registryID int64
if policy.SrcRegistry != nil && policy.SrcRegistry.ID > 0 {
registryID = policy.SrcRegistry.ID
} else {
registryID = policy.DestRegistry.ID
}
registry, err := ng.RegistryMgr.Get(registryID)
if err != nil {
@ -130,6 +142,10 @@ func (r *ReplicationPolicyAPI) Get() {
r.HandleNotFound(fmt.Sprintf("policy %d not found", id))
return
}
if err = populateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", policy.ID, err))
return
}
r.WriteJSONData(policy)
}
@ -208,3 +224,17 @@ func (r *ReplicationPolicyAPI) Delete() {
return
}
}
// ignore the credential for the registries
func populateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
if err := event.PopulateRegistries(registryMgr, policy); err != nil {
return err
}
if policy.SrcRegistry != nil {
policy.SrcRegistry.Credential = nil
}
if policy.DestRegistry != nil {
policy.DestRegistry.Credential = nil
}
return nil
}

View File

@ -126,7 +126,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies",
credential: sysAdmin,
bodyJSON: &model.Policy{
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},
@ -153,7 +155,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "policy01",
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
},
},
code: http.StatusBadRequest,
@ -166,7 +170,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "duplicate_name",
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},
@ -180,7 +186,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "policy01",
SrcRegistryID: 2,
SrcRegistry: &model.Registry{
ID: 2,
},
SrcNamespaces: []string{"library"},
},
},
@ -194,7 +202,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "policy01",
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},
@ -207,10 +217,13 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
func TestReplicationPolicyAPIGet(t *testing.T) {
policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() {
ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}()
ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{
// 401
{
@ -296,7 +309,9 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
url: "/api/replication/policies/1",
credential: sysAdmin,
bodyJSON: &model.Policy{
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},
@ -310,7 +325,9 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "duplicate_name",
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},
@ -324,7 +341,9 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "policy01",
SrcRegistryID: 2,
SrcRegistry: &model.Registry{
ID: 2,
},
SrcNamespaces: []string{"library"},
},
},
@ -338,7 +357,9 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
credential: sysAdmin,
bodyJSON: &model.Policy{
Name: "policy01",
SrcRegistryID: 1,
SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
},
},

View File

@ -102,7 +102,7 @@ func initRouters() {
beego.Router("/api/replication/adapters", &api.ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/executions", &api.ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &api.ReplicationOperationAPI{}, "put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &api.ReplicationOperationAPI{}, "get:GetExecution;put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &api.ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &api.ReplicationOperationAPI{}, "get:GetTaskLog")

View File

@ -18,6 +18,9 @@ import (
"errors"
"fmt"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation"
@ -30,15 +33,17 @@ type Handler interface {
}
// NewHandler ...
func NewHandler(policyCtl policy.Controller, opCtl operation.Controller) Handler {
func NewHandler(policyCtl policy.Controller, registryMgr registry.Manager, opCtl operation.Controller) Handler {
return &handler{
policyCtl: policyCtl,
registryMgr: registryMgr,
opCtl: opCtl,
}
}
type handler struct {
policyCtl policy.Controller
registryMgr registry.Manager
opCtl operation.Controller
}
@ -68,6 +73,9 @@ func (h *handler) Handle(event *Event) error {
}
for _, policy := range policies {
if err := PopulateRegistries(h.registryMgr, policy); err != nil {
return err
}
id, err := h.opCtl.StartReplication(policy, event.Resource)
if err != nil {
return err
@ -111,3 +119,51 @@ func (h *handler) getRelatedPolicies(namespace string, replicateDeletion ...bool
}
return result, nil
}
// PopulateRegistries populates the source registry and destination registry properties for policy
func PopulateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
if policy == nil {
return nil
}
registry, err := getRegistry(registryMgr, policy.SrcRegistry)
if err != nil {
return err
}
policy.SrcRegistry = registry
registry, err = getRegistry(registryMgr, policy.DestRegistry)
if err != nil {
return err
}
policy.DestRegistry = registry
return nil
}
func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model.Registry, error) {
if registry == nil || registry.ID == 0 {
return getLocalRegistry(), nil
}
reg, err := registryMgr.Get(registry.ID)
if err != nil {
return nil, err
}
if reg == nil {
return nil, fmt.Errorf("registry %d not found", registry.ID)
}
return reg, nil
}
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
Status: "healthy",
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
)
@ -104,6 +105,33 @@ func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
func (f *fakedPolicyController) Remove(int64) error {
return nil
}
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
return &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
func TestGetRelatedPolicies(t *testing.T) {
handler := &handler{
policyCtl: &fakedPolicyController{},
@ -121,7 +149,10 @@ func TestGetRelatedPolicies(t *testing.T) {
}
func TestHandle(t *testing.T) {
handler := NewHandler(&fakedPolicyController{}, &fakedOperationController{})
config.Config = &config.Configuration{}
handler := NewHandler(&fakedPolicyController{},
&fakedRegistryManager{},
&fakedOperationController{})
// nil event
err := handler.Handle(nil)
require.NotNil(t, err)

View File

@ -42,11 +42,11 @@ type Policy struct {
// TODO consider to remove this property?
Creator string `json:"creator"`
// source
SrcRegistryID int64 `json:"src_registry_id"`
SrcRegistry *Registry `json:"src_registry"`
SrcNamespaces []string `json:"src_namespaces"`
// destination
// TODO rename to DstRegistryID
DestRegistryID int64 `json:"dest_registry_id"`
// TODO rename to DstRegistry
DestRegistry *Registry `json:"dest_registry"`
// Only support two dest namespace modes:
// Put all the src resources to the one single dest namespace
// or keep namespaces same with the source ones (under this case,
@ -73,11 +73,18 @@ func (p *Policy) Valid(v *validation.Validation) {
if len(p.Name) == 0 {
v.SetError("name", "cannot be empty")
}
var srcRegistryID, dstRegistryID int64
if p.SrcRegistry != nil {
srcRegistryID = p.SrcRegistry.ID
}
if p.DestRegistry != nil {
dstRegistryID = p.DestRegistry.ID
}
// one of the source registry and destination registry must be Harbor itself
if p.SrcRegistryID != 0 && p.DestRegistryID != 0 ||
p.SrcRegistryID == 0 && p.DestRegistryID == 0 {
v.SetError("src_registry_id, dest_registry_id", "one of them should be empty and the other one shouldn't be empty")
if srcRegistryID != 0 && dstRegistryID != 0 ||
srcRegistryID == 0 && dstRegistryID == 0 {
v.SetError("src_registry, dest_registry", "one of them should be empty and the other one shouldn't be empty")
}
// source namespaces cannot be empty

View File

@ -43,8 +43,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 1,
DestRegistryID: 2,
SrcRegistry: &Registry{
ID: 1,
},
DestRegistry: &Registry{
ID: 2,
},
},
pass: false,
},
@ -52,8 +56,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{},
},
pass: false,
@ -62,8 +70,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{""},
},
pass: false,
@ -72,8 +84,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
@ -87,8 +103,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
@ -106,8 +126,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
@ -125,8 +149,12 @@ func TestValidOfPolicy(t *testing.T) {
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcRegistry: &Registry{
ID: 0,
},
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{

View File

@ -18,13 +18,14 @@ import (
"fmt"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/flow"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
// Controller handles the replication-related operations: start,
@ -41,49 +42,51 @@ type Controller interface {
}
// NewController returns a controller implementation
func NewController(executionMgr execution.Manager, registrgMgr registry.Manager,
scheduler scheduler.Scheduler) Controller {
return &defaultController{
executionMgr: executionMgr,
registryMgr: registrgMgr,
scheduler: scheduler,
func NewController(js job.Client) Controller {
return &controller{
executionMgr: execution.NewDefaultManager(),
scheduler: scheduler.NewScheduler(js),
flowCtl: flow.NewController(),
}
}
type defaultController struct {
type controller struct {
flowCtl flow.Controller
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
}
func (d *defaultController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
}
id, err := createExecution(d.executionMgr, policy.ID)
id, err := createExecution(c.executionMgr, policy.ID)
if err != nil {
return 0, err
}
flow := d.createFlow(id, policy, resource)
if err = d.flowCtl.Start(flow); err != nil {
// mark the execution as failure and log the error message
// no error will be returned as the execution is created successfully
markExecutionFailure(d.executionMgr, id, err.Error())
flow := c.createFlow(id, policy, resource)
if err = c.flowCtl.Start(flow); err != nil {
// just update the status text, the status will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
StatusText: err.Error(),
}, "StatusText"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)
}
return id, nil
}
// create different replication flows according to the input parameters
func (d *defaultController) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
// replicate the deletion operation, so create a deletion flow
if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy, []*model.Resource{resource})
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{resource})
}
// copy only one resource, add extra filters to the policy to make sure
// only the resource will be filtered out
@ -106,30 +109,30 @@ func (d *defaultController) createFlow(executionID int64, policy *model.Policy,
filters = append(filters, policy.Filters...)
policy.Filters = filters
}
return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy)
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy)
}
func (d *defaultController) StopReplication(executionID int64) error {
func (c *controller) StopReplication(executionID int64) error {
// TODO implement the function
return nil
}
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return d.executionMgr.List(query...)
func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return c.executionMgr.List(query...)
}
func (d *defaultController) GetExecution(executionID int64) (*models.Execution, error) {
return d.executionMgr.Get(executionID)
func (c *controller) GetExecution(executionID int64) (*models.Execution, error) {
return c.executionMgr.Get(executionID)
}
func (d *defaultController) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
return d.executionMgr.ListTasks(query...)
func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
return c.executionMgr.ListTasks(query...)
}
func (d *defaultController) GetTask(id int64) (*models.Task, error) {
return d.executionMgr.GetTask(id)
func (c *controller) GetTask(id int64) (*models.Task, error) {
return c.executionMgr.GetTask(id)
}
func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return d.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
}
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID)
func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
return c.executionMgr.GetTaskLog(taskID)
}
// create the execution record in database
@ -145,19 +148,3 @@ func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
return id, nil
}
// mark the execution as failure in database
func markExecutionFailure(mgr execution.Manager, id int64, message string) {
err := mgr.Update(
&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: message,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", id, err)
return
}
log.Debugf("the execution %d is marked as failure: %s", id, message)
}

View File

@ -15,11 +15,15 @@
package operation
import (
"io"
"testing"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/flow"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -82,38 +86,6 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return []byte("message"), nil
}
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
type fakedScheduler struct{}
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
@ -140,12 +112,119 @@ func (f *fakedScheduler) Stop(id string) error {
return nil
}
var ctl = NewController(&fakedExecutionManager{}, &fakedRegistryManager{}, &fakedScheduler{})
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil
}
type fakedAdapter struct{}
func (f *fakedAdapter) Info() (*model.RegistryInfo, error) {
return &model.RegistryInfo{
Type: model.RegistryTypeHarbor,
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
model.ResourceTypeChart,
},
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
}, nil
}
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
return nil, nil
}
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
return nil
}
func (f *fakedAdapter) GetNamespace(ns string) (*model.Namespace, error) {
var namespace *model.Namespace
if ns == "library" {
namespace = &model.Namespace{
Name: "library",
Metadata: map[string]interface{}{
"public": true,
},
}
}
return namespace, nil
}
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}, nil
}
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
}
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
}
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakedAdapter) DeleteManifest(repository, digest string) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
}
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
}
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0"},
},
},
}, nil
}
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
return false, nil
}
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
return nil, nil
}
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
return nil
}
func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
var ctl = &controller{
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{},
flowCtl: flow.NewController(),
}
func TestStartReplication(t *testing.T) {
err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory)
require.Nil(t, err)
config.Config = &config.Configuration{}
// the resource contains Vtags whose length isn't 1
policy := &model.Policy{}
policy := &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
resource := &model.Resource{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
@ -153,7 +232,7 @@ func TestStartReplication(t *testing.T) {
Vtags: []string{"1.0", "2.0"},
},
}
_, err := ctl.StartReplication(policy, resource)
_, err = ctl.StartReplication(policy, resource)
require.NotNil(t, err)
// replicate resource deletion

View File

@ -22,24 +22,21 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
type copyFlow struct {
executionID int64
policy *model.Policy
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
}
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry
func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy) Flow {
func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
executionID int64, policy *model.Policy) Flow {
return &copyFlow{
executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler,
executionID: executionID,
policy: policy,
@ -47,7 +44,7 @@ func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager,
}
func (c *copyFlow) Run(interface{}) error {
_, dstRegistry, srcAdapter, dstAdapter, err := initialize(c.registryMgr, c.policy)
srcAdapter, dstAdapter, err := initialize(c.policy)
if err != nil {
return err
}
@ -67,7 +64,7 @@ func (c *copyFlow) Run(interface{}) error {
if err = createNamespaces(dstAdapter, dstNamespaces); err != nil {
return err
}
dstResources := assembleDestinationResources(srcResources, dstRegistry, c.policy.DestNamespace, c.policy.Override)
dstResources := assembleDestinationResources(srcResources, c.policy.DestRegistry, c.policy.DestNamespace, c.policy.Override)
items, err := preprocess(c.scheduler, srcResources, dstResources)
if err != nil {
return err

View File

@ -20,9 +20,15 @@ import (
func TestRunOfCopyFlow(t *testing.T) {
scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{}
flow := NewCopyFlow(executionMgr, registryMgr, scheduler, 1, policy)
policy := &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
flow := NewCopyFlow(executionMgr, scheduler, 1, policy)
err := flow.Run(nil)
require.Nil(t, err)
}

View File

@ -19,26 +19,22 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
type deletionFlow struct {
executionID int64
policy *model.Policy
executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler
resources []*model.Resource
}
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
// on the destination registry
func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manager,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy,
resources []*model.Resource) Flow {
func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
executionID int64, policy *model.Policy, resources []*model.Resource) Flow {
return &deletionFlow{
executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler,
executionID: executionID,
policy: policy,
@ -47,13 +43,9 @@ func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manage
}
func (d *deletionFlow) Run(interface{}) error {
srcRegistry, dstRegistry, _, _, err := initialize(d.registryMgr, d.policy)
if err != nil {
return err
}
// filling the registry information
for _, resource := range d.resources {
resource.Registry = srcRegistry
resource.Registry = d.policy.SrcRegistry
}
srcResources, err := filterResources(d.resources, d.policy.Filters)
if err != nil {
@ -64,7 +56,7 @@ func (d *deletionFlow) Run(interface{}) error {
log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID)
return nil
}
dstResources := assembleDestinationResources(srcResources, dstRegistry, d.policy.DestNamespace, d.policy.Override)
dstResources := assembleDestinationResources(srcResources, d.policy.DestRegistry, d.policy.DestNamespace, d.policy.Override)
items, err := preprocess(d.scheduler, srcResources, dstResources)
if err != nil {
return err

View File

@ -24,10 +24,16 @@ import (
func TestRunOfDeletionFlow(t *testing.T) {
scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{}
policy := &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
resources := []*model.Resource{}
flow := NewDeletionFlow(executionMgr, registryMgr, scheduler, 1, policy, resources)
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources)
err := flow.Run(nil)
require.Nil(t, err)
}

View File

@ -22,68 +22,39 @@ import (
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/util"
)
// get/create the source registry, destination registry, source adapter and destination adapter
func initialize(mgr registry.Manager, policy *model.Policy) (*model.Registry, *model.Registry, adp.Adapter, adp.Adapter, error) {
var srcRegistry, dstRegistry *model.Registry
func initialize(policy *model.Policy) (adp.Adapter, adp.Adapter, error) {
var srcAdapter, dstAdapter adp.Adapter
var err error
registry := getLocalRegistry()
// get source registry
if policy.SrcRegistryID != 0 {
srcRegistry, err = mgr.Get(policy.SrcRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
}
if srcRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
}
} else {
srcRegistry = registry
}
// get destination registry
if policy.DestRegistryID != 0 {
dstRegistry, err = mgr.Get(policy.DestRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
}
if dstRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
}
} else {
dstRegistry = registry
}
// create the source registry adapter
srcFactory, err := adp.GetFactory(srcRegistry.Type)
srcFactory, err := adp.GetFactory(policy.SrcRegistry.Type)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err)
return nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", policy.SrcRegistry.Type, err)
}
srcAdapter, err = srcFactory(srcRegistry)
srcAdapter, err = srcFactory(policy.SrcRegistry)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err)
return nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", policy.SrcRegistry.URL, err)
}
// create the destination registry adapter
dstFactory, err := adp.GetFactory(dstRegistry.Type)
dstFactory, err := adp.GetFactory(policy.DestRegistry.Type)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err)
return nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", policy.DestRegistry.Type, err)
}
dstAdapter, err = dstFactory(dstRegistry)
dstAdapter, err = dstFactory(policy.DestRegistry)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err)
return nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", policy.DestRegistry.URL, err)
}
log.Debug("replication flow initialization completed")
return srcRegistry, dstRegistry, srcAdapter, dstAdapter, nil
return srcAdapter, dstAdapter, nil
}
// fetch resources from the source registry
@ -367,18 +338,3 @@ func getResourceName(res *model.Resource) string {
}
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
}
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
}

View File

@ -30,37 +30,6 @@ import (
"github.com/stretchr/testify/require"
)
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil
}
@ -241,19 +210,6 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func TestInitialize(t *testing.T) {
url := "https://registry.harbor.local"
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{
SrcRegistryID: 0,
DestRegistryID: 1,
}
srcRegistry, dstRegistry, _, _, err := initialize(registryMgr, policy)
require.Nil(t, err)
assert.Equal(t, url, srcRegistry.URL)
assert.Equal(t, int64(1), dstRegistry.ID)
}
func TestFetchResources(t *testing.T) {
adapter := &fakedAdapter{}
policy := &model.Policy{}

View File

@ -38,8 +38,6 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (*model.Policy, e
Name: policy.Name,
Description: policy.Description,
Creator: policy.Creator,
SrcRegistryID: policy.SrcRegistryID,
DestRegistryID: policy.DestRegistryID,
DestNamespace: policy.DestNamespace,
Deletion: policy.ReplicateDeletion,
Override: policy.Override,
@ -47,6 +45,16 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (*model.Policy, e
CreationTime: policy.CreationTime,
UpdateTime: policy.UpdateTime,
}
if policy.SrcRegistryID > 0 {
ply.SrcRegistry = &model.Registry{
ID: policy.SrcRegistryID,
}
}
if policy.DestRegistryID > 0 {
ply.DestRegistry = &model.Registry{
ID: policy.DestRegistryID,
}
}
// 1. parse SrcNamespaces to array
if len(policy.SrcNamespaces) > 0 {
@ -84,9 +92,7 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err
Name: policy.Name,
Description: policy.Description,
Creator: policy.Creator,
SrcRegistryID: policy.SrcRegistryID,
SrcNamespaces: strings.Join(policy.SrcNamespaces, ","),
DestRegistryID: policy.DestRegistryID,
DestNamespace: policy.DestNamespace,
Override: policy.Override,
Enabled: policy.Enabled,
@ -94,6 +100,12 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err
CreationTime: policy.CreationTime,
UpdateTime: time.Now(),
}
if policy.SrcRegistry != nil {
ply.SrcRegistryID = policy.SrcRegistry.ID
}
if policy.DestRegistry != nil {
ply.DestRegistryID = policy.DestRegistry.ID
}
if policy.Trigger != nil {
trigger, err := json.Marshal(policy.Trigger)

View File

@ -67,9 +67,13 @@ func Test_convertFromPersistModel(t *testing.T) {
Name: "Policy Test",
Description: "Policy Description",
Creator: "someone",
SrcRegistryID: 123,
SrcRegistry: &model.Registry{
ID: 123,
},
SrcNamespaces: []string{"ns1", "ns2", "ns3"},
DestRegistryID: 456,
DestRegistry: &model.Registry{
ID: 456,
},
DestNamespace: "target_ns",
Deletion: true,
Override: true,
@ -98,9 +102,9 @@ func Test_convertFromPersistModel(t *testing.T) {
assert.Equal(t, tt.want.Name, got.Name)
assert.Equal(t, tt.want.Description, got.Description)
assert.Equal(t, tt.want.Creator, got.Creator)
assert.Equal(t, tt.want.SrcRegistryID, got.SrcRegistryID)
assert.Equal(t, tt.want.SrcRegistry.ID, got.SrcRegistry.ID)
assert.Equal(t, tt.want.SrcNamespaces, got.SrcNamespaces)
assert.Equal(t, tt.want.DestRegistryID, got.DestRegistryID)
assert.Equal(t, tt.want.DestRegistry.ID, got.DestRegistry.ID)
assert.Equal(t, tt.want.DestNamespace, got.DestNamespace)
assert.Equal(t, tt.want.Deletion, got.Deletion)
assert.Equal(t, tt.want.Override, got.Override)
@ -126,9 +130,13 @@ func Test_convertToPersistModel(t *testing.T) {
Name: "Policy Test",
Description: "Policy Description",
Creator: "someone",
SrcRegistryID: 123,
SrcRegistry: &model.Registry{
ID: 123,
},
SrcNamespaces: []string{"ns1", "ns2", "ns3"},
DestRegistryID: 456,
DestRegistry: &model.Registry{
ID: 456,
},
DestNamespace: "target_ns",
Deletion: true,
Override: true,

View File

@ -23,8 +23,6 @@ import (
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
task_scheduler "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/policy/controller"
"github.com/goharbor/harbor/src/replication/ng/registry"
@ -70,10 +68,9 @@ func Init() error {
// init policy controller
PolicyCtl = controller.NewController(js)
// init operation controller
OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr,
task_scheduler.NewScheduler(js))
OperationCtl = operation.NewController(js)
// init event handler
EventHandler = event.NewHandler(PolicyCtl, OperationCtl)
EventHandler = event.NewHandler(PolicyCtl, RegistryMgr, OperationCtl)
log.Debug("the replication initialization completed")
return nil
}