mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-27 12:46:03 +01:00
Merge pull request #7460 from ywk253100/190419_replication_bug_fix
Fix bug found in replication
This commit is contained in:
commit
d8da6cb802
@ -233,9 +233,11 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debugf("got %d namespaces", len(namespaces))
|
||||||
for _, ns := range namespaces {
|
for _, ns := range namespaces {
|
||||||
page := 1
|
page := 1
|
||||||
pageSize := 100
|
pageSize := 100
|
||||||
|
n := 0
|
||||||
for {
|
for {
|
||||||
pageRepos, err := a.getRepos(ns.Name, "", page, pageSize)
|
pageRepos, err := a.getRepos(ns.Name, "", page, pageSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -243,24 +245,26 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
|
|||||||
}
|
}
|
||||||
repos = append(repos, pageRepos.Repos...)
|
repos = append(repos, pageRepos.Repos...)
|
||||||
|
|
||||||
|
n += len(pageRepos.Repos)
|
||||||
if len(pageRepos.Next) == 0 {
|
if len(pageRepos.Next) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
page++
|
page++
|
||||||
}
|
}
|
||||||
|
log.Debugf("got %d repositories for namespace %s", n, ns.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("%d repos found for namespaces: %v", len(repos), namespaces)
|
|
||||||
var resources []*model.Resource
|
var resources []*model.Resource
|
||||||
// TODO(ChenDe): Get tags for repos in parallel
|
// TODO(ChenDe): Get tags for repos in parallel
|
||||||
for _, repo := range repos {
|
for _, repo := range repos {
|
||||||
|
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
|
||||||
// If name filter set, skip repos that don't match the filter pattern.
|
// If name filter set, skip repos that don't match the filter pattern.
|
||||||
if len(nameFilter) != 0 {
|
if len(nameFilter) != 0 {
|
||||||
m, err := util.Match(nameFilter, repo.Name)
|
m, err := util.Match(nameFilter, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", repo.Name, nameFilter, err)
|
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !m {
|
if !m {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -305,7 +309,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
|
|||||||
Registry: a.registry,
|
Registry: a.registry,
|
||||||
Metadata: &model.ResourceMetadata{
|
Metadata: &model.ResourceMetadata{
|
||||||
Repository: &model.Repository{
|
Repository: &model.Repository{
|
||||||
Name: fmt.Sprintf("%s/%s", repo.Namespace, repo.Name),
|
Name: name,
|
||||||
},
|
},
|
||||||
Vtags: tags,
|
Vtags: tags,
|
||||||
},
|
},
|
||||||
|
@ -59,12 +59,7 @@ type controller struct {
|
|||||||
|
|
||||||
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
|
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
|
||||||
if !policy.Enabled {
|
if !policy.Enabled {
|
||||||
return 0, fmt.Errorf("the policy %d is diabled", policy.ID)
|
return 0, fmt.Errorf("the policy %d is disabled", policy.ID)
|
||||||
}
|
|
||||||
// only support one tag if the resource is specified as we append the tag name as a filter
|
|
||||||
// when creating the flow in function "createFlow"
|
|
||||||
if resource != nil && len(resource.Metadata.Vtags) != 1 {
|
|
||||||
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
|
|
||||||
}
|
}
|
||||||
if len(trigger) == 0 {
|
if len(trigger) == 0 {
|
||||||
trigger = model.TriggerTypeManual
|
trigger = model.TriggerTypeManual
|
||||||
@ -98,31 +93,13 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
|
|||||||
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
|
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
|
||||||
// replicate the deletion operation, so create a deletion flow
|
// replicate the deletion operation, so create a deletion flow
|
||||||
if resource != nil && resource.Deleted {
|
if resource != nil && resource.Deleted {
|
||||||
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{resource})
|
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, resource)
|
||||||
}
|
}
|
||||||
// copy only one resource, add extra filters to the policy to make sure
|
resources := []*model.Resource{}
|
||||||
// only the specified resource will be filtered out
|
|
||||||
if resource != nil {
|
if resource != nil {
|
||||||
filters := []*model.Filter{
|
resources = append(resources, resource)
|
||||||
{
|
|
||||||
Type: model.FilterTypeResource,
|
|
||||||
Value: resource.Type,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: model.FilterTypeName,
|
|
||||||
// TODO only filter the repo part?
|
|
||||||
Value: resource.Metadata.Repository.Name,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Type: model.FilterTypeTag,
|
|
||||||
// only support replicate one tag
|
|
||||||
Value: resource.Metadata.Vtags[0],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
filters = append(filters, policy.Filters...)
|
|
||||||
policy.Filters = filters
|
|
||||||
}
|
}
|
||||||
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy)
|
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy, resources...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) StopReplication(executionID int64) error {
|
func (c *controller) StopReplication(executionID int64) error {
|
||||||
|
@ -230,12 +230,7 @@ func TestStartReplication(t *testing.T) {
|
|||||||
require.NotNil(t, err)
|
require.NotNil(t, err)
|
||||||
|
|
||||||
policy.Enabled = true
|
policy.Enabled = true
|
||||||
// the resource contains Vtags whose length isn't 1
|
|
||||||
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// replicate resource deletion
|
// replicate resource deletion
|
||||||
resource.Metadata.Vtags = []string{"1.0"}
|
|
||||||
resource.Deleted = true
|
resource.Deleted = true
|
||||||
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
@ -26,20 +26,23 @@ import (
|
|||||||
|
|
||||||
type copyFlow struct {
|
type copyFlow struct {
|
||||||
executionID int64
|
executionID int64
|
||||||
|
resources []*model.Resource
|
||||||
policy *model.Policy
|
policy *model.Policy
|
||||||
executionMgr execution.Manager
|
executionMgr execution.Manager
|
||||||
scheduler scheduler.Scheduler
|
scheduler scheduler.Scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
|
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
|
||||||
// the source registry to the destination registry
|
// the source registry to the destination registry. If the parameter "resources" isn't provided,
|
||||||
|
// will fetch the resources first
|
||||||
func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
|
func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
|
||||||
executionID int64, policy *model.Policy) Flow {
|
executionID int64, policy *model.Policy, resources ...*model.Resource) Flow {
|
||||||
return ©Flow{
|
return ©Flow{
|
||||||
executionMgr: executionMgr,
|
executionMgr: executionMgr,
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
executionID: executionID,
|
executionID: executionID,
|
||||||
policy: policy,
|
policy: policy,
|
||||||
|
resources: resources,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,16 +51,23 @@ func (c *copyFlow) Run(interface{}) (int, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
srcResources, err := fetchResources(srcAdapter, c.policy)
|
var srcResources []*model.Resource
|
||||||
|
if len(c.resources) > 0 {
|
||||||
|
srcResources, err = filterResources(c.resources, c.policy.Filters)
|
||||||
|
} else {
|
||||||
|
srcResources, err = fetchResources(srcAdapter, c.policy)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(srcResources) == 0 {
|
if len(srcResources) == 0 {
|
||||||
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
|
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
|
||||||
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
|
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
srcResources = assembleSourceResources(srcResources, c.policy)
|
||||||
dstResources := assembleDestinationResources(srcResources, c.policy)
|
dstResources := assembleDestinationResources(srcResources, c.policy)
|
||||||
|
|
||||||
if err = prepareForPush(dstAdapter, dstResources); err != nil {
|
if err = prepareForPush(dstAdapter, dstResources); err != nil {
|
||||||
|
@ -32,7 +32,7 @@ type deletionFlow struct {
|
|||||||
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
|
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
|
||||||
// on the destination registry
|
// on the destination registry
|
||||||
func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
|
func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
|
||||||
executionID int64, policy *model.Policy, resources []*model.Resource) Flow {
|
executionID int64, policy *model.Policy, resources ...*model.Resource) Flow {
|
||||||
return &deletionFlow{
|
return &deletionFlow{
|
||||||
executionMgr: executionMgr,
|
executionMgr: executionMgr,
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
@ -43,10 +43,6 @@ func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Schedul
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *deletionFlow) Run(interface{}) (int, error) {
|
func (d *deletionFlow) Run(interface{}) (int, error) {
|
||||||
// filling the registry information
|
|
||||||
for _, resource := range d.resources {
|
|
||||||
resource.Registry = d.policy.SrcRegistry
|
|
||||||
}
|
|
||||||
srcResources, err := filterResources(d.resources, d.policy.Filters)
|
srcResources, err := filterResources(d.resources, d.policy.Filters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -57,6 +53,7 @@ func (d *deletionFlow) Run(interface{}) (int, error) {
|
|||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
srcResources = assembleSourceResources(srcResources, d.policy)
|
||||||
dstResources := assembleDestinationResources(srcResources, d.policy)
|
dstResources := assembleDestinationResources(srcResources, d.policy)
|
||||||
|
|
||||||
items, err := preprocess(d.scheduler, srcResources, dstResources)
|
items, err := preprocess(d.scheduler, srcResources, dstResources)
|
||||||
|
@ -43,7 +43,7 @@ func TestRunOfDeletionFlow(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources)
|
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources...)
|
||||||
n, err := flow.Run(nil)
|
n, err := flow.Run(nil)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
assert.Equal(t, 1, n)
|
assert.Equal(t, 1, n)
|
||||||
|
@ -182,6 +182,16 @@ func filterResources(resources []*model.Resource, filters []*model.Filter) ([]*m
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assemble the source resources by filling the registry information
|
||||||
|
func assembleSourceResources(resources []*model.Resource,
|
||||||
|
policy *model.Policy) []*model.Resource {
|
||||||
|
for _, resource := range resources {
|
||||||
|
resource.Registry = policy.SrcRegistry
|
||||||
|
}
|
||||||
|
log.Debug("assemble the source resources completed")
|
||||||
|
return resources
|
||||||
|
}
|
||||||
|
|
||||||
// assemble the destination resources by filling the metadata, registry and override properties
|
// assemble the destination resources by filling the metadata, registry and override properties
|
||||||
func assembleDestinationResources(resources []*model.Resource,
|
func assembleDestinationResources(resources []*model.Resource,
|
||||||
policy *model.Policy) []*model.Resource {
|
policy *model.Policy) []*model.Resource {
|
||||||
|
@ -276,6 +276,29 @@ func TestFilterResources(t *testing.T) {
|
|||||||
assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0])
|
assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAssembleSourceResources(t *testing.T) {
|
||||||
|
resources := []*model.Resource{
|
||||||
|
{
|
||||||
|
Type: model.ResourceTypeChart,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Repository: &model.Repository{
|
||||||
|
Name: "library/hello-world",
|
||||||
|
},
|
||||||
|
Vtags: []string{"latest"},
|
||||||
|
},
|
||||||
|
Override: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
policy := &model.Policy{
|
||||||
|
SrcRegistry: &model.Registry{
|
||||||
|
ID: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
res := assembleSourceResources(resources, policy)
|
||||||
|
assert.Equal(t, 1, len(res))
|
||||||
|
assert.Equal(t, int64(1), res[0].Registry.ID)
|
||||||
|
}
|
||||||
|
|
||||||
func TestAssembleDestinationResources(t *testing.T) {
|
func TestAssembleDestinationResources(t *testing.T) {
|
||||||
resources := []*model.Resource{
|
resources := []*model.Resource{
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user