Refactor the replication policy destination namespace logic

Support specifying what part of the repository will be replaced by the provided namespace

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2021-04-14 11:21:44 +08:00
parent 28d4e285f9
commit 710c80078b
11 changed files with 193 additions and 74 deletions

View File

@ -5954,6 +5954,13 @@ definitions:
dest_namespace:
type: string
description: The destination namespace.
dest_namespace_replace_count:
type: integer
format: int8
description: |-
Specify how many path components will be replaced by the provided destination namespace.
The default value is -1 in which case the legacy mode will be applied.
x-isnullable: true # make this field optional to keep backward compatibility
trigger:
$ref: '#/definitions/ReplicationTrigger'
filters:

View File

@ -0,0 +1,2 @@
ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS dest_namespace_replace_count int;
UPDATE replication_policy SET dest_namespace_replace_count=-1 WHERE dest_namespace IS NULL;

View File

@ -78,7 +78,10 @@ func (c *copyFlow) Run(ctx context.Context) error {
}
srcResources = assembleSourceResources(srcResources, c.policy)
dstResources := assembleDestinationResources(srcResources, c.policy)
dstResources, err := assembleDestinationResources(srcResources, c.policy)
if err != nil {
return err
}
if err = prepareForPush(dstAdapter, dstResources); err != nil {
return err

View File

@ -46,7 +46,10 @@ func NewDeletionFlow(executionID int64, policy *repctlmodel.Policy, resources ..
func (d *deletionFlow) Run(ctx context.Context) error {
srcResources := assembleSourceResources(d.resources, d.policy)
dstResources := assembleDestinationResources(srcResources, d.policy)
dstResources, err := assembleDestinationResources(srcResources, d.policy)
if err != nil {
return err
}
return d.createTasks(ctx, srcResources, dstResources)
}

View File

@ -16,6 +16,8 @@ package flow
import (
"fmt"
"github.com/goharbor/harbor/src/lib/errors"
"strings"
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/lib/log"
@ -122,9 +124,13 @@ func assembleSourceResources(resources []*model.Resource,
// assemble the destination resources by filling the metadata, registry and override properties
func assembleDestinationResources(resources []*model.Resource,
policy *repctlmodel.Policy) []*model.Resource {
policy *repctlmodel.Policy) ([]*model.Resource, error) {
var result []*model.Resource
for _, resource := range resources {
name, err := replaceNamespace(resource.Metadata.Repository.Name, policy.DestNamespace, policy.DestNamespaceReplaceCount)
if err != nil {
return nil, err
}
res := &model.Resource{
Type: resource.Type,
Registry: policy.DestRegistry,
@ -135,7 +141,7 @@ func assembleDestinationResources(resources []*model.Resource,
}
res.Metadata = &model.ResourceMetadata{
Repository: &model.Repository{
Name: replaceNamespace(resource.Metadata.Repository.Name, policy.DestNamespace),
Name: name,
Metadata: resource.Metadata.Repository.Metadata,
},
Vtags: resource.Metadata.Vtags,
@ -144,7 +150,7 @@ func assembleDestinationResources(resources []*model.Resource,
result = append(result, res)
}
log.Debug("assemble the destination resources completed")
return result
return result, nil
}
// do the prepare work for pushing/uploading the resources: create the namespace or repository
@ -186,13 +192,34 @@ func getResourceName(res *model.Resource) string {
return fmt.Sprintf("%s [%d item(s) in total]", meta.Repository.Name, n)
}
// repository:c namespace:n -> n/c
// repository:b/c namespace:n -> n/c
// repository:a/b/c namespace:n -> n/c
func replaceNamespace(repository string, namespace string) string {
// repository:a/b/c namespace:n replaceCount: -1 -> n/c
// repository:a/b/c namespace:n replaceCount: 0 -> n/a/b/c
// repository:a/b/c namespace:n replaceCount: 1 -> n/b/c
// repository:a/b/c namespace:n replaceCount: 2 -> n/c
// repository:a/b/c namespace:n replaceCount: 3 -> n
func replaceNamespace(repository string, namespace string, replaceCount int8) (string, error) {
if len(namespace) == 0 {
return repository
return repository, nil
}
// legacy logic to keep backward compatibility
if replaceCount < 0 {
_, rest := util.ParseRepository(repository)
return fmt.Sprintf("%s/%s", namespace, rest), nil
}
subs := strings.Split(repository, "/")
len := len(subs)
switch {
case replaceCount == 0:
return fmt.Sprintf("%s/%s", namespace, repository), nil
case int(replaceCount) == len:
return namespace, nil
case int(replaceCount) > len:
return "", errors.New(nil).WithCode(errors.BadRequestCode).
WithMessage("the repository %s contains only %d substrings, but the destination namespace replace count is %d",
repository, len, replaceCount)
default:
return fmt.Sprintf("%s/%s", namespace, strings.Join(subs[replaceCount:], "/")), nil
}
_, rest := util.ParseRepository(repository)
return fmt.Sprintf("%s/%s", namespace, rest)
}

View File

@ -105,11 +105,13 @@ func (s *stageTestSuite) TestAssembleDestinationResources() {
},
}
policy := &repctlmodel.Policy{
DestRegistry: &model.Registry{},
DestNamespace: "test",
Override: true,
DestRegistry: &model.Registry{},
DestNamespace: "test",
DestNamespaceReplaceCount: -1,
Override: true,
}
res := assembleDestinationResources(resources, policy)
res, err := assembleDestinationResources(resources, policy)
s.Require().Nil(err)
s.Len(res, 1)
s.Equal(model.ResourceTypeChart, res[0].Type)
s.Equal("test/hello-world", res[0].Metadata.Repository.Name)
@ -119,25 +121,78 @@ func (s *stageTestSuite) TestAssembleDestinationResources() {
func (s *stageTestSuite) TestReplaceNamespace() {
// empty namespace
repository := "c"
namespace := ""
result := replaceNamespace(repository, namespace)
var (
repository string = "c"
namespace string = ""
replaceCount int8 = 0
)
result, err := replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("c", result)
// repository contains no "/"
// replace count <0, repository contains no "/"
repository = "c"
namespace = "n"
result = replaceNamespace(repository, namespace)
replaceCount = -1
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/c", result)
// repository contains only one "/"
// replace count <0, repository contains only one "/"
repository = "b/c"
namespace = "n"
result = replaceNamespace(repository, namespace)
replaceCount = -1
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/c", result)
// repository contains more than one "/"
// replace count <0, repository contains more than one "/"
repository = "a/b/c"
namespace = "n"
result = replaceNamespace(repository, namespace)
replaceCount = -1
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/c", result)
// replace count > actual sub strings
repository = "a/b"
namespace = "n"
replaceCount = 3
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().NotNil(err)
// replace count = 0
repository = "a/b/c"
namespace = "n"
replaceCount = 0
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/a/b/c", result)
// replace count = 1
repository = "a/b/c"
namespace = "n"
replaceCount = 1
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/b/c", result)
// replace count = 2
repository = "a/b/c"
namespace = "n"
replaceCount = 2
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n/c", result)
// replace count = 3
repository = "a/b/c"
namespace = "n"
replaceCount = 3
result, err = replaceNamespace(repository, namespace, replaceCount)
s.Require().Nil(err)
s.Equal("n", result)
}
func TestStage(t *testing.T) {

View File

@ -29,20 +29,21 @@ import (
// Policy defines the structure of a replication policy
type Policy struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Creator string `json:"creator"`
SrcRegistry *model.Registry `json:"src_registry"`
DestRegistry *model.Registry `json:"dest_registry"`
DestNamespace string `json:"dest_namespace"`
Filters []*model.Filter `json:"filters"`
Trigger *model.Trigger `json:"trigger"`
ReplicateDeletion bool `json:"deletion"`
Override bool `json:"override"`
Enabled bool `json:"enabled"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Creator string `json:"creator"`
SrcRegistry *model.Registry `json:"src_registry"`
DestRegistry *model.Registry `json:"dest_registry"`
DestNamespace string `json:"dest_namespace"`
DestNamespaceReplaceCount int8 `json:"dest_namespace_replace_count"`
Filters []*model.Filter `json:"filters"`
Trigger *model.Trigger `json:"trigger"`
ReplicateDeletion bool `json:"deletion"`
Override bool `json:"override"`
Enabled bool `json:"enabled"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
}
// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
@ -142,6 +143,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.Description = policy.Description
p.Creator = policy.Creator
p.DestNamespace = policy.DestNamespace
p.DestNamespaceReplaceCount = policy.DestNamespaceReplaceCount
p.ReplicateDeletion = policy.ReplicateDeletion
p.Override = policy.Override
p.Enabled = policy.Enabled
@ -179,16 +181,17 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
// To converts to pkg model
func (p *Policy) To() (*replicationmodel.Policy, error) {
policy := &replicationmodel.Policy{
ID: p.ID,
Name: p.Name,
Description: p.Description,
Creator: p.Creator,
DestNamespace: p.DestNamespace,
Override: p.Override,
Enabled: p.Enabled,
ReplicateDeletion: p.ReplicateDeletion,
CreationTime: p.CreationTime,
UpdateTime: p.UpdateTime,
ID: p.ID,
Name: p.Name,
Description: p.Description,
Creator: p.Creator,
DestNamespace: p.DestNamespace,
DestNamespaceReplaceCount: p.DestNamespaceReplaceCount,
Override: p.Override,
Enabled: p.Enabled,
ReplicateDeletion: p.ReplicateDeletion,
CreationTime: p.CreationTime,
UpdateTime: p.UpdateTime,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID

View File

@ -140,7 +140,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
return err
}
// update the policy
if err := c.repMgr.Update(ctx, p); err != nil {
if err := c.repMgr.Update(ctx, p, props...); err != nil {
return err
}
// create schedule if needed

View File

@ -26,20 +26,21 @@ func init() {
// Policy is the model for replication policy
type Policy struct {
ID int64 `orm:"pk;auto;column(id)"`
Name string `orm:"column(name)"`
Description string `orm:"column(description)"`
Creator string `orm:"column(creator)"`
SrcRegistryID int64 `orm:"column(src_registry_id)"`
DestRegistryID int64 `orm:"column(dest_registry_id)"`
DestNamespace string `orm:"column(dest_namespace)"`
Override bool `orm:"column(override)"`
Enabled bool `orm:"column(enabled)"`
Trigger string `orm:"column(trigger)"`
Filters string `orm:"column(filters)"`
ReplicateDeletion bool `orm:"column(replicate_deletion)"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"`
UpdateTime time.Time `orm:"column(update_time);auto_now"`
ID int64 `orm:"pk;auto;column(id)"`
Name string `orm:"column(name)"`
Description string `orm:"column(description)"`
Creator string `orm:"column(creator)"`
SrcRegistryID int64 `orm:"column(src_registry_id)"`
DestRegistryID int64 `orm:"column(dest_registry_id)"`
DestNamespace string `orm:"column(dest_namespace)"`
DestNamespaceReplaceCount int8 `orm:"column(dest_namespace_replace_count)"`
Override bool `orm:"column(override)"`
Enabled bool `orm:"column(enabled)"`
Trigger string `orm:"column(trigger)"`
Filters string `orm:"column(filters)"`
ReplicateDeletion bool `orm:"column(replicate_deletion)"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"`
UpdateTime time.Time `orm:"column(update_time);auto_now"`
}
// TableName set table name for ORM

View File

@ -225,6 +225,9 @@ func (r *registryAPI) PingRegistry(ctx context.Context, params operation.PingReg
}
registry.URL = url
}
if params.Registry.Insecure != nil {
registry.Insecure = *params.Registry.Insecure
}
if params.Registry.CredentialType != nil {
if registry.Credential == nil {
registry.Credential = &model.Credential{}

View File

@ -66,6 +66,12 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope
Override: params.Policy.Override,
Enabled: params.Policy.Enabled,
}
// Make this field be optional to keep backward compatibility
if params.Policy.DestNamespaceReplaceCount != nil {
policy.DestNamespaceReplaceCount = *params.Policy.DestNamespaceReplaceCount
} else {
policy.DestNamespaceReplaceCount = -1 // -1 mean the legacy mode
}
if params.Policy.SrcRegistry != nil {
policy.SrcRegistry = &model.Registry{
ID: params.Policy.SrcRegistry.ID,
@ -115,6 +121,13 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope
Override: params.Policy.Override,
Enabled: params.Policy.Enabled,
}
// Make this field be optional to keep backward compatibility
if params.Policy.DestNamespaceReplaceCount != nil {
policy.DestNamespaceReplaceCount = *params.Policy.DestNamespaceReplaceCount
} else {
policy.DestNamespaceReplaceCount = -1 // -1 mean the legacy mode
}
if params.Policy.SrcRegistry != nil {
policy.SrcRegistry = &model.Registry{
ID: params.Policy.SrcRegistry.ID,
@ -387,17 +400,19 @@ func (r *replicationAPI) GetReplicationLog(ctx context.Context, params operation
}
func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPolicy {
replaceCount := policy.DestNamespaceReplaceCount
p := &models.ReplicationPolicy{
CreationTime: strfmt.DateTime(policy.CreationTime),
Deletion: policy.ReplicateDeletion,
Description: policy.Description,
DestNamespace: policy.DestNamespace,
Enabled: policy.Enabled,
ID: policy.ID,
Name: policy.Name,
Override: policy.Override,
ReplicateDeletion: policy.ReplicateDeletion,
UpdateTime: strfmt.DateTime(policy.UpdateTime),
CreationTime: strfmt.DateTime(policy.CreationTime),
Deletion: policy.ReplicateDeletion,
Description: policy.Description,
DestNamespace: policy.DestNamespace,
DestNamespaceReplaceCount: &replaceCount,
Enabled: policy.Enabled,
ID: policy.ID,
Name: policy.Name,
Override: policy.Override,
ReplicateDeletion: policy.ReplicateDeletion,
UpdateTime: strfmt.DateTime(policy.UpdateTime),
}
if policy.SrcRegistry != nil {
p.SrcRegistry = convertRegistry(policy.SrcRegistry)