Fix bug found in replication

Call filterResources direclty if resource isn't null in copy flow

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-04-19 17:31:07 +08:00
parent e92164c886
commit 5c944d98d5
8 changed files with 63 additions and 47 deletions

View File

@ -233,9 +233,11 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
if err != nil {
return nil, err
}
log.Debugf("got %d namespaces", len(namespaces))
for _, ns := range namespaces {
page := 1
pageSize := 100
n := 0
for {
pageRepos, err := a.getRepos(ns.Name, "", page, pageSize)
if err != nil {
@ -243,24 +245,26 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
}
repos = append(repos, pageRepos.Repos...)
n += len(pageRepos.Repos)
if len(pageRepos.Next) == 0 {
break
}
page++
}
log.Debugf("got %d repositories for namespace %s", n, ns.Name)
}
log.Infof("%d repos found for namespaces: %v", len(repos), namespaces)
var resources []*model.Resource
// TODO(ChenDe): Get tags for repos in parallel
for _, repo := range repos {
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, repo.Name)
m, err := util.Match(nameFilter, name)
if err != nil {
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", repo.Name, nameFilter, err)
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
}
if !m {
continue
}
@ -305,7 +309,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: fmt.Sprintf("%s/%s", repo.Namespace, repo.Name),
Name: name,
},
Vtags: tags,
},

View File

@ -59,12 +59,7 @@ type controller struct {
func (c *controller) StartReplication(policy *model.Policy, resource *model.Resource, trigger model.TriggerType) (int64, error) {
if !policy.Enabled {
return 0, fmt.Errorf("the policy %d is diabled", policy.ID)
}
// only support one tag if the resource is specified as we append the tag name as a filter
// when creating the flow in function "createFlow"
if resource != nil && len(resource.Metadata.Vtags) != 1 {
return 0, fmt.Errorf("the length of Vtags must be 1: %v", resource.Metadata.Vtags)
return 0, fmt.Errorf("the policy %d is disabled", policy.ID)
}
if len(trigger) == 0 {
trigger = model.TriggerTypeManual
@ -98,31 +93,13 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
func (c *controller) createFlow(executionID int64, policy *model.Policy, resource *model.Resource) flow.Flow {
// replicate the deletion operation, so create a deletion flow
if resource != nil && resource.Deleted {
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, []*model.Resource{resource})
return flow.NewDeletionFlow(c.executionMgr, c.scheduler, executionID, policy, resource)
}
// copy only one resource, add extra filters to the policy to make sure
// only the specified resource will be filtered out
resources := []*model.Resource{}
if resource != nil {
filters := []*model.Filter{
{
Type: model.FilterTypeResource,
Value: resource.Type,
},
{
Type: model.FilterTypeName,
// TODO only filter the repo part?
Value: resource.Metadata.Repository.Name,
},
{
Type: model.FilterTypeTag,
// only support replicate one tag
Value: resource.Metadata.Vtags[0],
},
}
filters = append(filters, policy.Filters...)
policy.Filters = filters
resources = append(resources, resource)
}
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy)
return flow.NewCopyFlow(c.executionMgr, c.scheduler, executionID, policy, resources...)
}
func (c *controller) StopReplication(executionID int64) error {

View File

@ -230,12 +230,7 @@ func TestStartReplication(t *testing.T) {
require.NotNil(t, err)
policy.Enabled = true
// the resource contains Vtags whose length isn't 1
_, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.NotNil(t, err)
// replicate resource deletion
resource.Metadata.Vtags = []string{"1.0"}
resource.Deleted = true
id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased)
require.Nil(t, err)

View File

@ -26,20 +26,23 @@ import (
type copyFlow struct {
executionID int64
resources []*model.Resource
policy *model.Policy
executionMgr execution.Manager
scheduler scheduler.Scheduler
}
// NewCopyFlow returns an instance of the copy flow which replicates the resources from
// the source registry to the destination registry
// the source registry to the destination registry. If the parameter "resources" isn't provided,
// will fetch the resources first
func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
executionID int64, policy *model.Policy) Flow {
executionID int64, policy *model.Policy, resources ...*model.Resource) Flow {
return &copyFlow{
executionMgr: executionMgr,
scheduler: scheduler,
executionID: executionID,
policy: policy,
resources: resources,
}
}
@ -48,16 +51,23 @@ func (c *copyFlow) Run(interface{}) (int, error) {
if err != nil {
return 0, err
}
srcResources, err := fetchResources(srcAdapter, c.policy)
var srcResources []*model.Resource
if len(c.resources) > 0 {
srcResources, err = filterResources(c.resources, c.policy.Filters)
} else {
srcResources, err = fetchResources(srcAdapter, c.policy)
}
if err != nil {
return 0, err
}
if len(srcResources) == 0 {
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
return 0, nil
}
srcResources = assembleSourceResources(srcResources, c.policy)
dstResources := assembleDestinationResources(srcResources, c.policy)
if err = prepareForPush(dstAdapter, dstResources); err != nil {

View File

@ -32,7 +32,7 @@ type deletionFlow struct {
// NewDeletionFlow returns an instance of the delete flow which deletes the resources
// on the destination registry
func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
executionID int64, policy *model.Policy, resources []*model.Resource) Flow {
executionID int64, policy *model.Policy, resources ...*model.Resource) Flow {
return &deletionFlow{
executionMgr: executionMgr,
scheduler: scheduler,
@ -43,10 +43,6 @@ func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Schedul
}
func (d *deletionFlow) Run(interface{}) (int, error) {
// filling the registry information
for _, resource := range d.resources {
resource.Registry = d.policy.SrcRegistry
}
srcResources, err := filterResources(d.resources, d.policy.Filters)
if err != nil {
return 0, err
@ -57,6 +53,7 @@ func (d *deletionFlow) Run(interface{}) (int, error) {
return 0, nil
}
srcResources = assembleSourceResources(srcResources, d.policy)
dstResources := assembleDestinationResources(srcResources, d.policy)
items, err := preprocess(d.scheduler, srcResources, dstResources)

View File

@ -43,7 +43,7 @@ func TestRunOfDeletionFlow(t *testing.T) {
},
},
}
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources)
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources...)
n, err := flow.Run(nil)
require.Nil(t, err)
assert.Equal(t, 1, n)

View File

@ -182,6 +182,16 @@ func filterResources(resources []*model.Resource, filters []*model.Filter) ([]*m
return res, nil
}
// assemble the source resources by filling the registry information
func assembleSourceResources(resources []*model.Resource,
policy *model.Policy) []*model.Resource {
for _, resource := range resources {
resource.Registry = policy.SrcRegistry
}
log.Debug("assemble the source resources completed")
return resources
}
// assemble the destination resources by filling the metadata, registry and override properties
func assembleDestinationResources(resources []*model.Resource,
policy *model.Policy) []*model.Resource {

View File

@ -276,6 +276,29 @@ func TestFilterResources(t *testing.T) {
assert.Equal(t, "0.2.0", res[0].Metadata.Vtags[0])
}
func TestAssembleSourceResources(t *testing.T) {
resources := []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: "library/hello-world",
},
Vtags: []string{"latest"},
},
Override: false,
},
}
policy := &model.Policy{
SrcRegistry: &model.Registry{
ID: 1,
},
}
res := assembleSourceResources(resources, policy)
assert.Equal(t, 1, len(res))
assert.Equal(t, int64(1), res[0].Registry.ID)
}
func TestAssembleDestinationResources(t *testing.T) {
resources := []*model.Resource{
{