Refactor replication filter chain

This commit is contained in:
Wenkai Yin 2017-12-11 17:11:50 +08:00
parent 76c88d8d49
commit 1d2e206ce4
12 changed files with 387 additions and 79 deletions

View File

@ -22,7 +22,4 @@ const (
TriggerScheduleDaily = "daily"
//TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly"
// PatternMatchAll : the pattern that match all
PatternMatchAll = ".*"
)

View File

@ -83,7 +83,8 @@ func NewDefaultController(cfg ControllerConfig) *DefaultController {
triggerManager: trigger.NewManager(cfg.CacheCapacity),
}
endpoint := "http://jobservice"
// TODO read from configuration
endpoint := "http://jobservice:8080"
client := client.NewAuthorizedClient(auth.NewSecretAuthorizer(config.UISecret()))
ctl.replicator = replicator.NewDefaultReplicator(endpoint, client)
@ -226,19 +227,8 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
return fmt.Errorf("policy %d not found", policyID)
}
candidates := []models.FilterItem{}
if len(metadata) > 0 {
meta := metadata[0]["candidates"]
if meta != nil {
cands, ok := meta.([]models.FilterItem)
if ok {
candidates = append(candidates, cands...)
}
}
}
// prepare candidates for replication
candidates = getCandidates(&policy, ctl.sourcer, candidates...)
candidates := getCandidates(&policy, ctl.sourcer, metadata...)
// TODO
/*
@ -252,13 +242,23 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
}
*/
// TODO merge tags whose repository is same into one struct
// submit the replication
return replicate(ctl.replicator, policyID, candidates)
}
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, candidates ...models.FilterItem) []models.FilterItem {
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer,
metadata ...map[string]interface{}) []models.FilterItem {
candidates := []models.FilterItem{}
if len(metadata) > 0 {
meta := metadata[0]["candidates"]
if meta != nil {
cands, ok := meta.([]models.FilterItem)
if ok {
candidates = append(candidates, cands...)
}
}
}
if len(candidates) == 0 {
for _, namespace := range policy.Namespaces {
candidates = append(candidates, models.FilterItem{
@ -277,53 +277,26 @@ func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, ca
func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) source.FilterChain {
filters := []source.Filter{}
patternMap := map[string]string{}
patterns := map[string]string{}
for _, f := range policy.Filters {
patternMap[f.Kind] = f.Pattern
patterns[f.Kind] = f.Pattern
}
// TODO convert wildcard to regex expression
projectPattern, exist := patternMap[replication.FilterItemKindProject]
if !exist {
projectPattern = replication.PatternMatchAll
}
repositoryPattern, exist := patternMap[replication.FilterItemKindRepository]
if !exist {
repositoryPattern = replication.PatternMatchAll
}
repositoryPattern = fmt.Sprintf("%s/%s", projectPattern, repositoryPattern)
tagPattern, exist := patternMap[replication.FilterItemKindTag]
if !exist {
tagPattern = replication.PatternMatchAll
}
tagPattern = fmt.Sprintf("%s:%s", repositoryPattern, tagPattern)
if policy.Trigger != nil && policy.Trigger.Kind == replication.TriggerKindImmediate {
// build filter chain for immediate trigger policy
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindTag, tagPattern))
} else {
// build filter chain for manual and schedule trigger policy
// append project filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindProject, projectPattern))
// append repository filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindRepository,
repositoryPattern, source.NewRepositoryConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor))))
// append tag filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindTag,
tagPattern, source.NewTagConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor))))
}
registry := sourcer.GetAdaptor(replication.AdaptorKindHarbor)
// only support repository and tag filter for now
filters = append(filters,
source.NewRepositoryFilter(patterns[replication.FilterItemKindRepository], registry))
filters = append(filters,
source.NewTagFilter(patterns[replication.FilterItemKindTag], registry))
return source.NewDefaultFilterChain(filters)
}
func replicate(replicator replicator.Replicator, policyID int64, candidates []models.FilterItem) error {
if len(candidates) == 0 {
log.Debugf("replicaton candidates are null, no further action needed")
}
repositories := map[string][]string{}
// TODO the operation of all candidates are same for now. Update it after supporting
// replicate deletion

View File

@ -79,7 +79,7 @@ func TestGetCandidates(t *testing.T) {
Filters: []models.Filter{
models.Filter{
Kind: replication.FilterItemKindTag,
Pattern: ".*",
Pattern: "*",
},
},
Trigger: &models.Trigger{
@ -99,16 +99,19 @@ func TestGetCandidates(t *testing.T) {
Value: "library/hello-world:latest",
},
}
result := getCandidates(policy, sourcer, candidates...)
metadata := map[string]interface{}{
"candidates": candidates,
}
result := getCandidates(policy, sourcer, metadata)
assert.Equal(t, 2, len(result))
policy.Filters = []models.Filter{
models.Filter{
Kind: replication.FilterItemKindTag,
Pattern: "release-.*",
Pattern: "release-*",
},
}
result = getCandidates(policy, sourcer, candidates...)
result = getCandidates(policy, sourcer, metadata)
assert.Equal(t, 1, len(result))
}
@ -116,10 +119,6 @@ func TestBuildFilterChain(t *testing.T) {
policy := &models.ReplicationPolicy{
ID: 1,
Filters: []models.Filter{
models.Filter{
Kind: replication.FilterItemKindProject,
Pattern: "*",
},
models.Filter{
Kind: replication.FilterItemKindRepository,
Pattern: "*",
@ -134,11 +133,5 @@ func TestBuildFilterChain(t *testing.T) {
sourcer := source.NewSourcer()
chain := buildFilterChain(policy, sourcer)
assert.Equal(t, 3, len(chain.Filters()))
policy.Trigger = &models.Trigger{
Kind: replication.TriggerKindImmediate,
}
chain = buildFilterChain(policy, sourcer)
assert.Equal(t, 1, len(chain.Filters()))
assert.Equal(t, 2, len(chain.Filters()))
}

View File

@ -0,0 +1,23 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"path/filepath"
)
func match(pattern, str string) (bool, error) {
return filepath.Match(pattern, str)
}

View File

@ -15,7 +15,6 @@
package source
import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
@ -39,9 +38,9 @@ func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.Filter
// if support replicate deletion
result := []models.FilterItem{}
for _, item := range items {
// just put it to the result list if the item is not a project
if item.Kind != replication.FilterItemKindProject {
log.Warningf("unexpected filter item kind for repository convertor, expected %s got %s, skip",
replication.FilterItemKindProject, item.Kind)
result = append(result, item)
continue
}

View File

@ -41,6 +41,9 @@ func TestRepositoryConvert(t *testing.T) {
Kind: replication.FilterItemKindRepository,
Value: "library/centos",
},
models.FilterItem{
Kind: replication.FilterItemKindRepository,
},
}
convertor := NewRepositoryConvertor(&fakeRegistryAdaptor{})

View File

@ -0,0 +1,80 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"strings"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
// RepositoryFilter implement Filter interface to filter repository
type RepositoryFilter struct {
pattern string
convertor Convertor
}
// NewRepositoryFilter returns an instance of RepositoryFilter
func NewRepositoryFilter(pattern string, registry registry.Adaptor) *RepositoryFilter {
return &RepositoryFilter{
pattern: pattern,
convertor: NewRepositoryConvertor(registry),
}
}
// Init ...
func (r *RepositoryFilter) Init() error {
return nil
}
// GetConvertor ...
func (r *RepositoryFilter) GetConvertor() Convertor {
return r.convertor
}
// DoFilter filters repository and image(according to the repository part) and drops any other resource types
func (r *RepositoryFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindRepository && item.Kind != replication.FilterItemKindTag {
log.Warningf("unsupported type %s for repository filter, drop", item.Kind)
continue
}
repository := item.Value
if item.Kind == replication.FilterItemKindTag {
repository = strings.SplitN(repository, ":", 2)[0]
}
if len(r.pattern) == 0 {
log.Debugf("pattern is null, add %s to the repository filter result list", item.Value)
result = append(result, item)
} else {
matched, err := match(r.pattern, repository)
if err != nil {
log.Errorf("failed to match pattern %s to value %s: %v", r.pattern, repository, err)
break
}
if matched {
log.Debugf("pattern %s matched, add %s to the repository filter result list", r.pattern, item.Value)
result = append(result, item)
}
}
}
return result
}

View File

@ -0,0 +1,75 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
func TestInitOfRepositoryFilter(t *testing.T) {
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
assert.Nil(t, filter.Init())
}
func TestGetConvertorOfRepositoryFilter(t *testing.T) {
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
assert.NotNil(t, filter.GetConvertor())
}
func TestDoFilterOfRepositoryFilter(t *testing.T) {
// invalid filter item type
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
items := filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: "invalid_type",
},
})
assert.Equal(t, 0, len(items))
// empty pattern
filter = NewRepositoryFilter("", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindRepository,
Value: "library/hello-world",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewRepositoryFilter("library/*", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewRepositoryFilter("library/*", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
}

View File

@ -15,7 +15,6 @@
package source
import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
@ -38,8 +37,8 @@ func (t *TagConvertor) Convert(items []models.FilterItem) []models.FilterItem {
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindRepository {
log.Warningf("unexpected filter item kind for tag convertor, expected %s got %s, skip",
replication.FilterItemKindRepository, item.Kind)
// just put it to the result list if the item is not a repository
result = append(result, item)
continue
}

View File

@ -41,6 +41,9 @@ func TestTagConvert(t *testing.T) {
Kind: replication.FilterItemKindTag,
Value: "library/ubuntu:16.04",
},
models.FilterItem{
Kind: replication.FilterItemKindProject,
},
}
convertor := NewTagConvertor(&fakeRegistryAdaptor{})

View File

@ -0,0 +1,78 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"strings"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
// TagFilter implements Filter interface to filter tag
type TagFilter struct {
pattern string
convertor Convertor
}
// NewTagFilter returns an instance of TagFilter
func NewTagFilter(pattern string, registry registry.Adaptor) *TagFilter {
return &TagFilter{
pattern: pattern,
convertor: NewTagConvertor(registry),
}
}
// Init ...
func (t *TagFilter) Init() error {
return nil
}
// GetConvertor ...
func (t *TagFilter) GetConvertor() Convertor {
return t.convertor
}
// DoFilter filters tag of the image
func (t *TagFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindTag {
log.Warningf("unsupported type %s for tag filter, dropped", item.Kind)
continue
}
if len(t.pattern) == 0 {
log.Debugf("pattern is null, add %s to the tag filter result list", item.Value)
result = append(result, item)
continue
}
tag := strings.SplitN(item.Value, ":", 2)[1]
matched, err := match(t.pattern, tag)
if err != nil {
log.Errorf("failed to match pattern %s to value %s: %v", t.pattern, tag, err)
continue
}
if matched {
log.Debugf("pattern %s matched, add %s to the tag filter result list", t.pattern, item.Value)
result = append(result, item)
}
}
return result
}

View File

@ -0,0 +1,85 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
func TestInitOfTagFilter(t *testing.T) {
filter := NewTagFilter("", &registry.HarborAdaptor{})
assert.Nil(t, filter.Init())
}
func TestGetConvertorOfTagFilter(t *testing.T) {
filter := NewTagFilter("", &registry.HarborAdaptor{})
assert.NotNil(t, filter.GetConvertor())
}
func TestDoFilterOfTagFilter(t *testing.T) {
// invalid filter item type
filter := NewTagFilter("", &registry.HarborAdaptor{})
items := filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: "invalid_type",
},
})
assert.Equal(t, 0, len(items))
// empty pattern
filter = NewTagFilter("", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("l*t", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("lates?", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("latest?", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 0, len(items))
}