Merge pull request #7254 from ywk253100/190329_api

Update the replication API
This commit is contained in:
Wenkai Yin 2019-03-30 21:28:38 +08:00 committed by GitHub
commit 82e02fc734
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 602 additions and 327 deletions

View File

@ -1624,6 +1624,36 @@ paths:
'500': '500':
description: Unexpected internal errors. description: Unexpected internal errors.
/replication/executions/{id}: /replication/executions/{id}:
get:
summary: Get the execution of the replication.
description: |
This endpoint is for user to get one execution of the replication.
parameters:
- name: id
in: path
type: integer
format: int64
description: The execution ID.
required: true
tags:
- Products
responses:
'200':
description: Success.
schema:
$ref: '#/definitions/ReplicationExecution'
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'415':
$ref: '#/responses/UnsupportedMediaType'
'500':
description: Unexpected internal errors.
put: put:
summary: Stop the execution of the replication. summary: Stop the execution of the replication.
description: | description: |
@ -4109,19 +4139,17 @@ definitions:
description: description:
type: string type: string
description: The description of the policy. description: The description of the policy.
src_registry_id: src_registry:
type: integer description: The source registry.
format: int64 $ref: '#/definitions/Registry'
description: The source registry ID.
src_namespaces: src_namespaces:
type: array type: array
description: The source namespaces description: The source namespaces
items: items:
type: string type: string
dest_registry_id: dest_registry:
type: integer description: The destination registry.
format: int64 $ref: '#/definitions/Registry'
description: The destination registry ID.
dest_namespace: dest_namespace:
type: string type: string
description: The destination namespace. description: The destination namespace.

View File

@ -155,7 +155,7 @@ func init() {
beego.Router("/api/replication/adapters", &ReplicationAdapterAPI{}, "get:List") beego.Router("/api/replication/adapters", &ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/executions", &ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution") beego.Router("/api/replication/executions", &ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &ReplicationOperationAPI{}, "put:StopExecution") beego.Router("/api/replication/executions/:id([0-9]+)", &ReplicationOperationAPI{}, "get:GetExecution;put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &ReplicationOperationAPI{}, "get:ListTasks") beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &ReplicationOperationAPI{}, "get:GetTaskLog") beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &ReplicationOperationAPI{}, "get:GetTaskLog")

View File

@ -19,6 +19,8 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng" "github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
) )
@ -113,6 +115,11 @@ func (r *ReplicationOperationAPI) CreateExecution() {
return return
} }
if err = event.PopulateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", execution.PolicyID, err))
return
}
executionID, err := ng.OperationCtl.StartReplication(policy, nil) executionID, err := ng.OperationCtl.StartReplication(policy, nil)
if err != nil { if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err)) r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
@ -121,6 +128,26 @@ func (r *ReplicationOperationAPI) CreateExecution() {
r.Redirect(http.StatusCreated, strconv.FormatInt(executionID, 10)) r.Redirect(http.StatusCreated, strconv.FormatInt(executionID, 10))
} }
// GetExecution gets one execution of the replication
func (r *ReplicationOperationAPI) GetExecution() {
executionID, err := r.GetInt64FromPath(":id")
if err != nil || executionID <= 0 {
r.HandleBadRequest("invalid execution ID")
return
}
execution, err := ng.OperationCtl.GetExecution(executionID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get execution %d: %v", executionID, err))
return
}
if execution == nil {
r.HandleNotFound(fmt.Sprintf("execution %d not found", executionID))
return
}
r.WriteJSONData(execution)
}
// StopExecution stops one execution of the replication // StopExecution stops one execution of the replication
func (r *ReplicationOperationAPI) StopExecution() { func (r *ReplicationOperationAPI) StopExecution() {
executionID, err := r.GetInt64FromPath(":id") executionID, err := r.GetInt64FromPath(":id")

View File

@ -83,10 +83,11 @@ func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy
func (f *fakedPolicyManager) Get(id int64) (*model.Policy, error) { func (f *fakedPolicyManager) Get(id int64) (*model.Policy, error) {
if id == 1 { if id == 1 {
return &model.Policy{ return &model.Policy{
ID: 1, ID: 1,
SrcRegistryID: 1, SrcRegistry: &model.Registry{
SrcNamespaces: []string{"library"}, ID: 1,
DestRegistryID: 2, },
SrcNamespaces: []string{"library"},
}, nil }, nil
} }
return nil, nil return nil, nil
@ -148,12 +149,15 @@ func TestListExecutions(t *testing.T) {
func TestCreateExecution(t *testing.T) { func TestCreateExecution(t *testing.T) {
operationCtl := ng.OperationCtl operationCtl := ng.OperationCtl
policyMgr := ng.PolicyCtl policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() { defer func() {
ng.OperationCtl = operationCtl ng.OperationCtl = operationCtl
ng.PolicyCtl = policyMgr ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}() }()
ng.OperationCtl = &fakedOperationController{} ng.OperationCtl = &fakedOperationController{}
ng.PolicyCtl = &fakedPolicyManager{} ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{ cases := []*codeCheckingCase{
// 401 // 401
@ -202,6 +206,53 @@ func TestCreateExecution(t *testing.T) {
runCodeCheckingCases(t, cases...) runCodeCheckingCases(t, cases...)
} }
func TestGetExecution(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/2",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 200
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}
func TestStopExecution(t *testing.T) { func TestStopExecution(t *testing.T) {
operationCtl := ng.OperationCtl operationCtl := ng.OperationCtl
defer func() { defer func() {

View File

@ -19,6 +19,10 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng" "github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
@ -57,6 +61,12 @@ func (r *ReplicationPolicyAPI) List() {
r.HandleInternalServerError(fmt.Sprintf("failed to list policies: %v", err)) r.HandleInternalServerError(fmt.Sprintf("failed to list policies: %v", err))
return return
} }
for _, policy := range policies {
if err = populateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", policy.ID, err))
return
}
}
r.SetPaginationHeader(total, query.Page, query.Size) r.SetPaginationHeader(total, query.Page, query.Size)
r.WriteJSONData(policies) r.WriteJSONData(policies)
} }
@ -97,9 +107,11 @@ func (r *ReplicationPolicyAPI) validateName(policy *model.Policy) bool {
// make the registry referenced exists // make the registry referenced exists
func (r *ReplicationPolicyAPI) validateRegistry(policy *model.Policy) bool { func (r *ReplicationPolicyAPI) validateRegistry(policy *model.Policy) bool {
registryID := policy.SrcRegistryID var registryID int64
if registryID == 0 { if policy.SrcRegistry != nil && policy.SrcRegistry.ID > 0 {
registryID = policy.DestRegistryID registryID = policy.SrcRegistry.ID
} else {
registryID = policy.DestRegistry.ID
} }
registry, err := ng.RegistryMgr.Get(registryID) registry, err := ng.RegistryMgr.Get(registryID)
if err != nil { if err != nil {
@ -130,6 +142,10 @@ func (r *ReplicationPolicyAPI) Get() {
r.HandleNotFound(fmt.Sprintf("policy %d not found", id)) r.HandleNotFound(fmt.Sprintf("policy %d not found", id))
return return
} }
if err = populateRegistries(ng.RegistryMgr, policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to populate registries for policy %d: %v", policy.ID, err))
return
}
r.WriteJSONData(policy) r.WriteJSONData(policy)
} }
@ -208,3 +224,17 @@ func (r *ReplicationPolicyAPI) Delete() {
return return
} }
} }
// ignore the credential for the registries
func populateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
if err := event.PopulateRegistries(registryMgr, policy); err != nil {
return err
}
if policy.SrcRegistry != nil {
policy.SrcRegistry.Credential = nil
}
if policy.DestRegistry != nil {
policy.DestRegistry.Credential = nil
}
return nil
}

View File

@ -126,7 +126,9 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies", url: "/api/replication/policies",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -152,8 +154,10 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies", url: "/api/replication/policies",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
}, },
}, },
code: http.StatusBadRequest, code: http.StatusBadRequest,
@ -165,8 +169,10 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies", url: "/api/replication/policies",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "duplicate_name", Name: "duplicate_name",
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -179,8 +185,10 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies", url: "/api/replication/policies",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 2, SrcRegistry: &model.Registry{
ID: 2,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -193,8 +201,10 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
url: "/api/replication/policies", url: "/api/replication/policies",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -207,10 +217,13 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
func TestReplicationPolicyAPIGet(t *testing.T) { func TestReplicationPolicyAPIGet(t *testing.T) {
policyMgr := ng.PolicyCtl policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() { defer func() {
ng.PolicyCtl = policyMgr ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}() }()
ng.PolicyCtl = &fakedPolicyManager{} ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{ cases := []*codeCheckingCase{
// 401 // 401
{ {
@ -296,7 +309,9 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
url: "/api/replication/policies/1", url: "/api/replication/policies/1",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -309,8 +324,10 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
url: "/api/replication/policies/1", url: "/api/replication/policies/1",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "duplicate_name", Name: "duplicate_name",
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -323,8 +340,10 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
url: "/api/replication/policies/1", url: "/api/replication/policies/1",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 2, SrcRegistry: &model.Registry{
ID: 2,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },
@ -337,8 +356,10 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
url: "/api/replication/policies/1", url: "/api/replication/policies/1",
credential: sysAdmin, credential: sysAdmin,
bodyJSON: &model.Policy{ bodyJSON: &model.Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 1, SrcRegistry: &model.Registry{
ID: 1,
},
SrcNamespaces: []string{"library"}, SrcNamespaces: []string{"library"},
}, },
}, },

View File

@ -102,7 +102,7 @@ func initRouters() {
beego.Router("/api/replication/adapters", &api.ReplicationAdapterAPI{}, "get:List") beego.Router("/api/replication/adapters", &api.ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/executions", &api.ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution") beego.Router("/api/replication/executions", &api.ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &api.ReplicationOperationAPI{}, "put:StopExecution") beego.Router("/api/replication/executions/:id([0-9]+)", &api.ReplicationOperationAPI{}, "get:GetExecution;put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &api.ReplicationOperationAPI{}, "get:ListTasks") beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &api.ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &api.ReplicationOperationAPI{}, "get:GetTaskLog") beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &api.ReplicationOperationAPI{}, "get:GetTaskLog")

View File

@ -18,6 +18,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation" "github.com/goharbor/harbor/src/replication/ng/operation"
@ -30,16 +33,18 @@ type Handler interface {
} }
// NewHandler ... // NewHandler ...
func NewHandler(policyCtl policy.Controller, opCtl operation.Controller) Handler { func NewHandler(policyCtl policy.Controller, registryMgr registry.Manager, opCtl operation.Controller) Handler {
return &handler{ return &handler{
policyCtl: policyCtl, policyCtl: policyCtl,
opCtl: opCtl, registryMgr: registryMgr,
opCtl: opCtl,
} }
} }
type handler struct { type handler struct {
policyCtl policy.Controller policyCtl policy.Controller
opCtl operation.Controller registryMgr registry.Manager
opCtl operation.Controller
} }
func (h *handler) Handle(event *Event) error { func (h *handler) Handle(event *Event) error {
@ -68,6 +73,9 @@ func (h *handler) Handle(event *Event) error {
} }
for _, policy := range policies { for _, policy := range policies {
if err := PopulateRegistries(h.registryMgr, policy); err != nil {
return err
}
id, err := h.opCtl.StartReplication(policy, event.Resource) id, err := h.opCtl.StartReplication(policy, event.Resource)
if err != nil { if err != nil {
return err return err
@ -111,3 +119,51 @@ func (h *handler) getRelatedPolicies(namespace string, replicateDeletion ...bool
} }
return result, nil return result, nil
} }
// PopulateRegistries populates the source registry and destination registry properties for policy
func PopulateRegistries(registryMgr registry.Manager, policy *model.Policy) error {
if policy == nil {
return nil
}
registry, err := getRegistry(registryMgr, policy.SrcRegistry)
if err != nil {
return err
}
policy.SrcRegistry = registry
registry, err = getRegistry(registryMgr, policy.DestRegistry)
if err != nil {
return err
}
policy.DestRegistry = registry
return nil
}
func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model.Registry, error) {
if registry == nil || registry.ID == 0 {
return getLocalRegistry(), nil
}
reg, err := registryMgr.Get(registry.ID)
if err != nil {
return nil, err
}
if reg == nil {
return nil, fmt.Errorf("registry %d not found", registry.ID)
}
return reg, nil
}
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
Status: "healthy",
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
) )
@ -104,6 +105,33 @@ func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
func (f *fakedPolicyController) Remove(int64) error { func (f *fakedPolicyController) Remove(int64) error {
return nil return nil
} }
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
return &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
func TestGetRelatedPolicies(t *testing.T) { func TestGetRelatedPolicies(t *testing.T) {
handler := &handler{ handler := &handler{
policyCtl: &fakedPolicyController{}, policyCtl: &fakedPolicyController{},
@ -121,7 +149,10 @@ func TestGetRelatedPolicies(t *testing.T) {
} }
func TestHandle(t *testing.T) { func TestHandle(t *testing.T) {
handler := NewHandler(&fakedPolicyController{}, &fakedOperationController{}) config.Config = &config.Configuration{}
handler := NewHandler(&fakedPolicyController{},
&fakedRegistryManager{},
&fakedOperationController{})
// nil event // nil event
err := handler.Handle(nil) err := handler.Handle(nil)
require.NotNil(t, err) require.NotNil(t, err)

View File

@ -42,11 +42,11 @@ type Policy struct {
// TODO consider to remove this property? // TODO consider to remove this property?
Creator string `json:"creator"` Creator string `json:"creator"`
// source // source
SrcRegistryID int64 `json:"src_registry_id"` SrcRegistry *Registry `json:"src_registry"`
SrcNamespaces []string `json:"src_namespaces"` SrcNamespaces []string `json:"src_namespaces"`
// destination // destination
// TODO rename to DstRegistryID // TODO rename to DstRegistry
DestRegistryID int64 `json:"dest_registry_id"` DestRegistry *Registry `json:"dest_registry"`
// Only support two dest namespace modes: // Only support two dest namespace modes:
// Put all the src resources to the one single dest namespace // Put all the src resources to the one single dest namespace
// or keep namespaces same with the source ones (under this case, // or keep namespaces same with the source ones (under this case,
@ -73,11 +73,18 @@ func (p *Policy) Valid(v *validation.Validation) {
if len(p.Name) == 0 { if len(p.Name) == 0 {
v.SetError("name", "cannot be empty") v.SetError("name", "cannot be empty")
} }
var srcRegistryID, dstRegistryID int64
if p.SrcRegistry != nil {
srcRegistryID = p.SrcRegistry.ID
}
if p.DestRegistry != nil {
dstRegistryID = p.DestRegistry.ID
}
// one of the source registry and destination registry must be Harbor itself // one of the source registry and destination registry must be Harbor itself
if p.SrcRegistryID != 0 && p.DestRegistryID != 0 || if srcRegistryID != 0 && dstRegistryID != 0 ||
p.SrcRegistryID == 0 && p.DestRegistryID == 0 { srcRegistryID == 0 && dstRegistryID == 0 {
v.SetError("src_registry_id, dest_registry_id", "one of them should be empty and the other one shouldn't be empty") v.SetError("src_registry, dest_registry", "one of them should be empty and the other one shouldn't be empty")
} }
// source namespaces cannot be empty // source namespaces cannot be empty

View File

@ -42,39 +42,55 @@ func TestValidOfPolicy(t *testing.T) {
// source registry and destination registry both not empty // source registry and destination registry both not empty
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 1, SrcRegistry: &Registry{
DestRegistryID: 2, ID: 1,
},
DestRegistry: &Registry{
ID: 2,
},
}, },
pass: false, pass: false,
}, },
// empty source namespaces // empty source namespaces
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{},
}, },
pass: false, pass: false,
}, },
// empty source namespaces // empty source namespaces
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{""}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{""},
}, },
pass: false, pass: false,
}, },
// invalid filter // invalid filter
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{"library"}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{ Filters: []*Filter{
{ {
Type: "invalid_type", Type: "invalid_type",
@ -86,10 +102,14 @@ func TestValidOfPolicy(t *testing.T) {
// invalid trigger // invalid trigger
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{"library"}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{ Filters: []*Filter{
{ {
Type: FilterTypeName, Type: FilterTypeName,
@ -105,10 +125,14 @@ func TestValidOfPolicy(t *testing.T) {
// invalid trigger // invalid trigger
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{"library"}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{ Filters: []*Filter{
{ {
Type: FilterTypeName, Type: FilterTypeName,
@ -124,10 +148,14 @@ func TestValidOfPolicy(t *testing.T) {
// pass // pass
{ {
policy: &Policy{ policy: &Policy{
Name: "policy01", Name: "policy01",
SrcRegistryID: 0, SrcRegistry: &Registry{
DestRegistryID: 1, ID: 0,
SrcNamespaces: []string{"library"}, },
DestRegistry: &Registry{
ID: 1,
},
SrcNamespaces: []string{"library"},
Filters: []*Filter{ Filters: []*Filter{
{ {
Type: FilterTypeName, Type: FilterTypeName,

View File

@ -18,13 +18,14 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution" "github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/flow" "github.com/goharbor/harbor/src/replication/ng/operation/flow"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
) )
// Controller handles the replication-related operations: start, // Controller handles the replication-related operations: start,
@ -41,49 +42,51 @@ type Controller interface {
} }
// NewController returns a controller implementation // NewController returns a controller implementation
func NewController(executionMgr execution.Manager, registrgMgr registry.Manager, func NewController(js job.Client) Controller {
scheduler scheduler.Scheduler) Controller { return &controller{
return &defaultController{ executionMgr: execution.NewDefaultManager(),
executionMgr: executionMgr, scheduler: scheduler.NewScheduler(js),
registryMgr: registrgMgr,
scheduler: scheduler,
flowCtl: flow.NewController(), flowCtl: flow.NewController(),
} }
} }
type defaultController struct { type controller struct {
flowCtl flow.Controller flowCtl flow.Controller
executionMgr execution.Manager executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
} }
func (d *defaultController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
if resource != nil && len(resource.Metadata.Vtags) != 1 { if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags) return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
} }
id, err := createExecution(d.executionMgr, policy.ID) id, err := createExecution(c.executionMgr, policy.ID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
flow := d.createFlow(id, policy, resource) flow := c.createFlow(id, policy, resource)
if err = d.flowCtl.Start(flow); err != nil { if err = c.flowCtl.Start(flow); err != nil {
// mark the execution as failure and log the error message // just update the status text, the status will be updated automatically
// no error will be returned as the execution is created successfully // when listing the execution records
markExecutionFailure(d.executionMgr, id, err.Error()) if e := c.executionMgr.Update(&models.Execution{
ID: id,
StatusText: err.Error(),
}, "StatusText"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)
} }
return id, nil return id, nil
} }
// create different replication flows according to the input parameters // create different replication flows according to the input parameters
func (d *defaultController) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow { func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
// replicate the deletion operation, so create a deletion flow // replicate the deletion operation, so create a deletion flow
if resource != nil && resource.Deleted { if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy, []*model.Resource{resource}) return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{resource})
} }
// copy only one resource, add extra filters to the policy to make sure // copy only one resource, add extra filters to the policy to make sure
// only the resource will be filtered out // only the resource will be filtered out
@ -106,30 +109,30 @@ func (d *defaultController) createFlow(executionID int64, policy *model.Policy,
filters = append(filters, policy.Filters...) filters = append(filters, policy.Filters...)
policy.Filters = filters policy.Filters = filters
} }
return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy) return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy)
} }
func (d *defaultController) StopReplication(executionID int64) error { func (c *controller) StopReplication(executionID int64) error {
// TODO implement the function // TODO implement the function
return nil return nil
} }
func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { func (c *controller) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return d.executionMgr.List(query...) return c.executionMgr.List(query...)
} }
func (d *defaultController) GetExecution(executionID int64) (*models.Execution, error) { func (c *controller) GetExecution(executionID int64) (*models.Execution, error) {
return d.executionMgr.Get(executionID) return c.executionMgr.Get(executionID)
} }
func (d *defaultController) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) { func (c *controller) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) {
return d.executionMgr.ListTasks(query...) return c.executionMgr.ListTasks(query...)
} }
func (d *defaultController) GetTask(id int64) (*models.Task, error) { func (c *controller) GetTask(id int64) (*models.Task, error) {
return d.executionMgr.GetTask(id) return c.executionMgr.GetTask(id)
} }
func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { func (c *controller) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return d.executionMgr.UpdateTaskStatus(id, status, statusCondition...) return c.executionMgr.UpdateTaskStatus(id, status, statusCondition...)
} }
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) { func (c *controller) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID) return c.executionMgr.GetTaskLog(taskID)
} }
// create the execution record in database // create the execution record in database
@ -145,19 +148,3 @@ func createExecution(mgr execution.Manager, policyID int64) (int64, error) {
log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id) log.Debugf("an execution record for replication based on the policy %d created: %d", policyID, id)
return id, nil return id, nil
} }
// mark the execution as failure in database
func markExecutionFailure(mgr execution.Manager, id int64, message string) {
err := mgr.Update(
&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: message,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", id, err)
return
}
log.Debugf("the execution %d is marked as failure: %s", id, message)
}

View File

@ -15,11 +15,15 @@
package operation package operation
import ( import (
"io"
"testing" "testing"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config" "github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/flow"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -82,38 +86,6 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return []byte("message"), nil return []byte("message"), nil
} }
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
type fakedScheduler struct{} type fakedScheduler struct{}
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) { func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
@ -140,12 +112,119 @@ func (f *fakedScheduler) Stop(id string) error {
return nil return nil
} }
var ctl = NewController(&fakedExecutionManager{}, &fakedRegistryManager{}, &fakedScheduler{}) func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil
}
type fakedAdapter struct{}
func (f *fakedAdapter) Info() (*model.RegistryInfo, error) {
return &model.RegistryInfo{
Type: model.RegistryTypeHarbor,
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
model.ResourceTypeChart,
},
SupportedTriggers: []model.TriggerType{model.TriggerTypeManual},
}, nil
}
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
return nil, nil
}
func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
return nil
}
func (f *fakedAdapter) GetNamespace(ns string) (*model.Namespace, error) {
var namespace *model.Namespace
if ns == "library" {
namespace = &model.Namespace{
Name: "library",
Metadata: map[string]interface{}{
"public": true,
},
}
}
return namespace, nil
}
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
Override: false,
},
}, nil
}
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
}
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
}
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakedAdapter) DeleteManifest(repository, digest string) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
}
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
}
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0"},
},
},
}, nil
}
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
return false, nil
}
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
return nil, nil
}
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
return nil
}
func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
var ctl = &controller{
executionMgr: &fakedExecutionManager{},
scheduler: &fakedScheduler{},
flowCtl: flow.NewController(),
}
func TestStartReplication(t *testing.T) { func TestStartReplication(t *testing.T) {
err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory)
require.Nil(t, err)
config.Config = &config.Configuration{} config.Config = &config.Configuration{}
// the resource contains Vtags whose length isn't 1 // the resource contains Vtags whose length isn't 1
policy := &model.Policy{} policy := &model.Policy{
SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
resource := &model.Resource{ resource := &model.Resource{
Type: model.ResourceTypeRepository, Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{ Metadata: &model.ResourceMetadata{
@ -153,7 +232,7 @@ func TestStartReplication(t *testing.T) {
Vtags: []string{"1.0", "2.0"}, Vtags: []string{"1.0", "2.0"},
}, },
} }
_, err := ctl.StartReplication(policy, resource) _, err = ctl.StartReplication(policy, resource)
require.NotNil(t, err) require.NotNil(t, err)
// replicate resource deletion // replicate resource deletion

View File

@ -22,24 +22,21 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution" "github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
) )
type copyFlow struct { type copyFlow struct {
executionID int64 executionID int64
policy *model.Policy policy *model.Policy
executionMgr execution.Manager executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
} }
// NewCopyFlow returns an instance of the copy flow which replicates the resources from // NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry // the source registry to the destination registry
func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager, func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy) Flow { executionID int64, policy *model.Policy) Flow {
return &copyFlow{ return &copyFlow{
executionMgr: executionMgr, executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler, scheduler: scheduler,
executionID: executionID, executionID: executionID,
policy: policy, policy: policy,
@ -47,7 +44,7 @@ func NewCopyFlow(executionMgr execution.Manager, registryMgr registry.Manager,
} }
func (c *copyFlow) Run(interface{}) error { func (c *copyFlow) Run(interface{}) error {
_, dstRegistry, srcAdapter, dstAdapter, err := initialize(c.registryMgr, c.policy) srcAdapter, dstAdapter, err := initialize(c.policy)
if err != nil { if err != nil {
return err return err
} }
@ -67,7 +64,7 @@ func (c *copyFlow) Run(interface{}) error {
if err = createNamespaces(dstAdapter, dstNamespaces); err != nil { if err = createNamespaces(dstAdapter, dstNamespaces); err != nil {
return err return err
} }
dstResources := assembleDestinationResources(srcResources, dstRegistry, c.policy.DestNamespace, c.policy.Override) dstResources := assembleDestinationResources(srcResources, c.policy.DestRegistry, c.policy.DestNamespace, c.policy.Override)
items, err := preprocess(c.scheduler, srcResources, dstResources) items, err := preprocess(c.scheduler, srcResources, dstResources)
if err != nil { if err != nil {
return err return err

View File

@ -20,9 +20,15 @@ import (
func TestRunOfCopyFlow(t *testing.T) { func TestRunOfCopyFlow(t *testing.T) {
scheduler := &fakedScheduler{} scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{} executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{} policy := &model.Policy{
policy := &model.Policy{} SrcRegistry: &model.Registry{
flow := NewCopyFlow(executionMgr, registryMgr, scheduler, 1, policy) Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
flow := NewCopyFlow(executionMgr, scheduler, 1, policy)
err := flow.Run(nil) err := flow.Run(nil)
require.Nil(t, err) require.Nil(t, err)
} }

View File

@ -19,26 +19,22 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution" "github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
) )
type deletionFlow struct { type deletionFlow struct {
executionID int64 executionID int64
policy *model.Policy policy *model.Policy
executionMgr execution.Manager executionMgr execution.Manager
registryMgr registry.Manager
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
resources []*model.Resource resources []*model.Resource
} }
// NewDeletionFlow returns an instance of the delete flow which deletes the resources // NewDeletionFlow returns an instance of the delete flow which deletes the resources
// on the destination registry // on the destination registry
func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manager, func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
scheduler scheduler.Scheduler, executionID int64, policy *model.Policy, executionID int64, policy *model.Policy, resources []*model.Resource) Flow {
resources []*model.Resource) Flow {
return &deletionFlow{ return &deletionFlow{
executionMgr: executionMgr, executionMgr: executionMgr,
registryMgr: registryMgr,
scheduler: scheduler, scheduler: scheduler,
executionID: executionID, executionID: executionID,
policy: policy, policy: policy,
@ -47,13 +43,9 @@ func NewDeletionFlow(executionMgr execution.Manager, registryMgr registry.Manage
} }
func (d *deletionFlow) Run(interface{}) error { func (d *deletionFlow) Run(interface{}) error {
srcRegistry, dstRegistry, _, _, err := initialize(d.registryMgr, d.policy)
if err != nil {
return err
}
// filling the registry information // filling the registry information
for _, resource := range d.resources { for _, resource := range d.resources {
resource.Registry = srcRegistry resource.Registry = d.policy.SrcRegistry
} }
srcResources, err := filterResources(d.resources, d.policy.Filters) srcResources, err := filterResources(d.resources, d.policy.Filters)
if err != nil { if err != nil {
@ -64,7 +56,7 @@ func (d *deletionFlow) Run(interface{}) error {
log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID) log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID)
return nil return nil
} }
dstResources := assembleDestinationResources(srcResources, dstRegistry, d.policy.DestNamespace, d.policy.Override) dstResources := assembleDestinationResources(srcResources, d.policy.DestRegistry, d.policy.DestNamespace, d.policy.Override)
items, err := preprocess(d.scheduler, srcResources, dstResources) items, err := preprocess(d.scheduler, srcResources, dstResources)
if err != nil { if err != nil {
return err return err

View File

@ -24,10 +24,16 @@ import (
func TestRunOfDeletionFlow(t *testing.T) { func TestRunOfDeletionFlow(t *testing.T) {
scheduler := &fakedScheduler{} scheduler := &fakedScheduler{}
executionMgr := &fakedExecutionManager{} executionMgr := &fakedExecutionManager{}
registryMgr := &fakedRegistryManager{} policy := &model.Policy{
policy := &model.Policy{} SrcRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
DestRegistry: &model.Registry{
Type: model.RegistryTypeHarbor,
},
}
resources := []*model.Resource{} resources := []*model.Resource{}
flow := NewDeletionFlow(executionMgr, registryMgr, scheduler, 1, policy, resources) flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources)
err := flow.Run(nil) err := flow.Run(nil)
require.Nil(t, err) require.Nil(t, err)
} }

View File

@ -22,68 +22,39 @@ import (
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/ng/adapter" adp "github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation/execution" "github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/util" "github.com/goharbor/harbor/src/replication/ng/util"
) )
// get/create the source registry, destination registry, source adapter and destination adapter // get/create the source registry, destination registry, source adapter and destination adapter
func initialize(mgr registry.Manager, policy *model.Policy) (*model.Registry, *model.Registry, adp.Adapter, adp.Adapter, error) { func initialize(policy *model.Policy) (adp.Adapter, adp.Adapter, error) {
var srcRegistry, dstRegistry *model.Registry
var srcAdapter, dstAdapter adp.Adapter var srcAdapter, dstAdapter adp.Adapter
var err error var err error
registry := getLocalRegistry()
// get source registry
if policy.SrcRegistryID != 0 {
srcRegistry, err = mgr.Get(policy.SrcRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
}
if srcRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
}
} else {
srcRegistry = registry
}
// get destination registry
if policy.DestRegistryID != 0 {
dstRegistry, err = mgr.Get(policy.DestRegistryID)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
}
if dstRegistry == nil {
return nil, nil, nil, nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
}
} else {
dstRegistry = registry
}
// create the source registry adapter // create the source registry adapter
srcFactory, err := adp.GetFactory(srcRegistry.Type) srcFactory, err := adp.GetFactory(policy.SrcRegistry.Type)
if err != nil { if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err) return nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", policy.SrcRegistry.Type, err)
} }
srcAdapter, err = srcFactory(srcRegistry) srcAdapter, err = srcFactory(policy.SrcRegistry)
if err != nil { if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err) return nil, nil, fmt.Errorf("failed to create adapter for source registry %s: %v", policy.SrcRegistry.URL, err)
} }
// create the destination registry adapter // create the destination registry adapter
dstFactory, err := adp.GetFactory(dstRegistry.Type) dstFactory, err := adp.GetFactory(policy.DestRegistry.Type)
if err != nil { if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err) return nil, nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", policy.DestRegistry.Type, err)
} }
dstAdapter, err = dstFactory(dstRegistry) dstAdapter, err = dstFactory(policy.DestRegistry)
if err != nil { if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err) return nil, nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", policy.DestRegistry.URL, err)
} }
log.Debug("replication flow initialization completed") log.Debug("replication flow initialization completed")
return srcRegistry, dstRegistry, srcAdapter, dstAdapter, nil return srcAdapter, dstAdapter, nil
} }
// fetch resources from the source registry // fetch resources from the source registry
@ -367,18 +338,3 @@ func getResourceName(res *model.Resource) string {
} }
return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]" return meta.Name + ":[" + strings.Join(meta.Vtags, ",") + "]"
} }
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
// TODO use the service account
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
},
Insecure: true,
}
}

View File

@ -30,37 +30,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type fakedRegistryManager struct{}
func (f *fakedRegistryManager) Add(*model.Registry) (int64, error) {
return 0, nil
}
func (f *fakedRegistryManager) List(...*model.RegistryQuery) (int64, []*model.Registry, error) {
return 0, nil, nil
}
func (f *fakedRegistryManager) Get(id int64) (*model.Registry, error) {
var registry *model.Registry
switch id {
case 1:
registry = &model.Registry{
ID: 1,
Type: model.RegistryTypeHarbor,
}
}
return registry, nil
}
func (f *fakedRegistryManager) GetByName(name string) (*model.Registry, error) {
return nil, nil
}
func (f *fakedRegistryManager) Update(*model.Registry, ...string) error {
return nil
}
func (f *fakedRegistryManager) Remove(int64) error {
return nil
}
func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) { func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
return &fakedAdapter{}, nil return &fakedAdapter{}, nil
} }
@ -241,19 +210,6 @@ func TestMain(m *testing.M) {
os.Exit(m.Run()) os.Exit(m.Run())
} }
func TestInitialize(t *testing.T) {
url := "https://registry.harbor.local"
registryMgr := &fakedRegistryManager{}
policy := &model.Policy{
SrcRegistryID: 0,
DestRegistryID: 1,
}
srcRegistry, dstRegistry, _, _, err := initialize(registryMgr, policy)
require.Nil(t, err)
assert.Equal(t, url, srcRegistry.URL)
assert.Equal(t, int64(1), dstRegistry.ID)
}
func TestFetchResources(t *testing.T) { func TestFetchResources(t *testing.T) {
adapter := &fakedAdapter{} adapter := &fakedAdapter{}
policy := &model.Policy{} policy := &model.Policy{}

View File

@ -34,18 +34,26 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (*model.Policy, e
} }
ply := model.Policy{ ply := model.Policy{
ID: policy.ID, ID: policy.ID,
Name: policy.Name, Name: policy.Name,
Description: policy.Description, Description: policy.Description,
Creator: policy.Creator, Creator: policy.Creator,
SrcRegistryID: policy.SrcRegistryID, DestNamespace: policy.DestNamespace,
DestRegistryID: policy.DestRegistryID, Deletion: policy.ReplicateDeletion,
DestNamespace: policy.DestNamespace, Override: policy.Override,
Deletion: policy.ReplicateDeletion, Enabled: policy.Enabled,
Override: policy.Override, CreationTime: policy.CreationTime,
Enabled: policy.Enabled, UpdateTime: policy.UpdateTime,
CreationTime: policy.CreationTime, }
UpdateTime: policy.UpdateTime, if policy.SrcRegistryID > 0 {
ply.SrcRegistry = &model.Registry{
ID: policy.SrcRegistryID,
}
}
if policy.DestRegistryID > 0 {
ply.DestRegistry = &model.Registry{
ID: policy.DestRegistryID,
}
} }
// 1. parse SrcNamespaces to array // 1. parse SrcNamespaces to array
@ -84,9 +92,7 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err
Name: policy.Name, Name: policy.Name,
Description: policy.Description, Description: policy.Description,
Creator: policy.Creator, Creator: policy.Creator,
SrcRegistryID: policy.SrcRegistryID,
SrcNamespaces: strings.Join(policy.SrcNamespaces, ","), SrcNamespaces: strings.Join(policy.SrcNamespaces, ","),
DestRegistryID: policy.DestRegistryID,
DestNamespace: policy.DestNamespace, DestNamespace: policy.DestNamespace,
Override: policy.Override, Override: policy.Override,
Enabled: policy.Enabled, Enabled: policy.Enabled,
@ -94,6 +100,12 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err
CreationTime: policy.CreationTime, CreationTime: policy.CreationTime,
UpdateTime: time.Now(), UpdateTime: time.Now(),
} }
if policy.SrcRegistry != nil {
ply.SrcRegistryID = policy.SrcRegistry.ID
}
if policy.DestRegistry != nil {
ply.DestRegistryID = policy.DestRegistry.ID
}
if policy.Trigger != nil { if policy.Trigger != nil {
trigger, err := json.Marshal(policy.Trigger) trigger, err := json.Marshal(policy.Trigger)

View File

@ -63,19 +63,23 @@ func Test_convertFromPersistModel(t *testing.T) {
Trigger: "", Trigger: "",
Filters: "[]", Filters: "[]",
}, want: &model.Policy{ }, want: &model.Policy{
ID: 999, ID: 999,
Name: "Policy Test", Name: "Policy Test",
Description: "Policy Description", Description: "Policy Description",
Creator: "someone", Creator: "someone",
SrcRegistryID: 123, SrcRegistry: &model.Registry{
SrcNamespaces: []string{"ns1", "ns2", "ns3"}, ID: 123,
DestRegistryID: 456, },
DestNamespace: "target_ns", SrcNamespaces: []string{"ns1", "ns2", "ns3"},
Deletion: true, DestRegistry: &model.Registry{
Override: true, ID: 456,
Enabled: true, },
Trigger: nil, DestNamespace: "target_ns",
Filters: []*model.Filter{}, Deletion: true,
Override: true,
Enabled: true,
Trigger: nil,
Filters: []*model.Filter{},
}, },
}, },
} }
@ -98,9 +102,9 @@ func Test_convertFromPersistModel(t *testing.T) {
assert.Equal(t, tt.want.Name, got.Name) assert.Equal(t, tt.want.Name, got.Name)
assert.Equal(t, tt.want.Description, got.Description) assert.Equal(t, tt.want.Description, got.Description)
assert.Equal(t, tt.want.Creator, got.Creator) assert.Equal(t, tt.want.Creator, got.Creator)
assert.Equal(t, tt.want.SrcRegistryID, got.SrcRegistryID) assert.Equal(t, tt.want.SrcRegistry.ID, got.SrcRegistry.ID)
assert.Equal(t, tt.want.SrcNamespaces, got.SrcNamespaces) assert.Equal(t, tt.want.SrcNamespaces, got.SrcNamespaces)
assert.Equal(t, tt.want.DestRegistryID, got.DestRegistryID) assert.Equal(t, tt.want.DestRegistry.ID, got.DestRegistry.ID)
assert.Equal(t, tt.want.DestNamespace, got.DestNamespace) assert.Equal(t, tt.want.DestNamespace, got.DestNamespace)
assert.Equal(t, tt.want.Deletion, got.Deletion) assert.Equal(t, tt.want.Deletion, got.Deletion)
assert.Equal(t, tt.want.Override, got.Override) assert.Equal(t, tt.want.Override, got.Override)
@ -122,19 +126,23 @@ func Test_convertToPersistModel(t *testing.T) {
{name: "Nil Model", from: nil, want: nil, wantErr: true}, {name: "Nil Model", from: nil, want: nil, wantErr: true},
{ {
name: "Persist Model", from: &model.Policy{ name: "Persist Model", from: &model.Policy{
ID: 999, ID: 999,
Name: "Policy Test", Name: "Policy Test",
Description: "Policy Description", Description: "Policy Description",
Creator: "someone", Creator: "someone",
SrcRegistryID: 123, SrcRegistry: &model.Registry{
SrcNamespaces: []string{"ns1", "ns2", "ns3"}, ID: 123,
DestRegistryID: 456, },
DestNamespace: "target_ns", SrcNamespaces: []string{"ns1", "ns2", "ns3"},
Deletion: true, DestRegistry: &model.Registry{
Override: true, ID: 456,
Enabled: true, },
Trigger: &model.Trigger{}, DestNamespace: "target_ns",
Filters: []*model.Filter{{Type: "registry", Value: "abc"}}, Deletion: true,
Override: true,
Enabled: true,
Trigger: &model.Trigger{},
Filters: []*model.Filter{{Type: "registry", Value: "abc"}},
}, want: &persist_models.RepPolicy{ }, want: &persist_models.RepPolicy{
ID: 999, ID: 999,
Name: "Policy Test", Name: "Policy Test",

View File

@ -23,8 +23,6 @@ import (
"github.com/goharbor/harbor/src/replication/ng/config" "github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/event" "github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng/operation" "github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
task_scheduler "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/policy" "github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/policy/controller" "github.com/goharbor/harbor/src/replication/ng/policy/controller"
"github.com/goharbor/harbor/src/replication/ng/registry" "github.com/goharbor/harbor/src/replication/ng/registry"
@ -70,10 +68,9 @@ func Init() error {
// init policy controller // init policy controller
PolicyCtl = controller.NewController(js) PolicyCtl = controller.NewController(js)
// init operation controller // init operation controller
OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr, OperationCtl = operation.NewController(js)
task_scheduler.NewScheduler(js))
// init event handler // init event handler
EventHandler = event.NewHandler(PolicyCtl, OperationCtl) EventHandler = event.NewHandler(PolicyCtl, RegistryMgr, OperationCtl)
log.Debug("the replication initialization completed") log.Debug("the replication initialization completed")
return nil return nil
} }