From c5ccb7e53c286b7c21f32f2c8e5201c86385aa80 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 29 Nov 2017 15:14:13 +0800 Subject: [PATCH] Enable filter chain in replication --- src/replication/consts.go | 7 +- src/replication/core/controller.go | 111 ++++++++++++++++- src/replication/models/filter.go | 41 ++++++ src/replication/models/filter_item.go | 2 + src/replication/models/policy.go | 2 +- src/replication/policy/manager.go | 9 +- src/replication/registry/harbor_adaptor.go | 40 +++++- .../source/repository_convertor.go | 11 +- .../source/tag_combination_filter.go | 1 + src/replication/source/tag_convertor.go | 9 +- src/ui/api/models/replication_policy.go | 2 +- src/ui/api/utils.go | 117 ------------------ src/ui/main.go | 5 + 13 files changed, 219 insertions(+), 138 deletions(-) create mode 100644 src/replication/models/filter.go diff --git a/src/replication/consts.go b/src/replication/consts.go index 813988d54..a2148823b 100644 --- a/src/replication/consts.go +++ b/src/replication/consts.go @@ -23,8 +23,11 @@ const ( //TriggerScheduleWeekly : type of scheduling is 'weekly' TriggerScheduleWeekly = "weekly" - //OperationPush : operation for pushing images + //OperationPush : push operation OperationPush = "push" - //OperationDelete : operation for deleting images + //OperationDelete : delete operation OperationDelete = "delete" + + // PatternMatchAll : the pattern that match all + PatternMatchAll = ".*" ) diff --git a/src/replication/core/controller.go b/src/replication/core/controller.go index d06727380..1bcc094a8 100644 --- a/src/replication/core/controller.go +++ b/src/replication/core/controller.go @@ -3,6 +3,9 @@ package core import ( "fmt" + "github.com/vmware/harbor/src/common/dao" + common_models "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/policy" @@ -182,9 +185,113 @@ func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.Replic //Replicate starts one replication defined in the specified policy; //Can be launched by the API layer and related triggers. -func (ctl *Controller) Replicate(policyID int64, metadate ...map[string]interface{}) error { +func (ctl *Controller) Replicate(policyID int64, metadata ...map[string]interface{}) error { + policy, err := ctl.GetPolicy(policyID) + if err != nil { + return err + } + if policy.ID == 0 { + return fmt.Errorf("policy %d not found", policyID) + } - fmt.Printf("replicating %d, metadata: %v ...\n", policyID, metadate) + candidates := []models.FilterItem{} + if len(metadata) > 0 { + meta := metadata[0]["candidates"] + if meta != nil { + cands, ok := meta.([]models.FilterItem) + if ok { + candidates = append(candidates, cands...) + } + } + } + + // prepare candidates for replication + candidates = getCandidates(&policy, ctl.sourcer, candidates...) + + targets := []*common_models.RepTarget{} + for _, targetID := range policy.TargetIDs { + target, err := dao.GetRepTarget(targetID) + if err != nil { + return err + } + targets = append(targets, target) + } + + // TODO merge tags whose repository is same into one struct + + // call job service to do the replication + return replicate(candidates, targets) +} + +func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, candidates ...models.FilterItem) []models.FilterItem { + if len(candidates) == 0 { + for _, namespace := range policy.Namespaces { + candidates = append(candidates, models.FilterItem{ + Kind: replication.FilterItemKindProject, + Value: namespace, + Operation: replication.OperationPush, + }) + } + } + + filterChain := buildFilterChain(policy, sourcer) + + return filterChain.DoFilter(candidates) +} + +func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) source.FilterChain { + filters := []source.Filter{} + + patternMap := map[string]string{} + for _, f := range policy.Filters { + patternMap[f.Kind] = f.Pattern + } + + // TODO convert wildcard to regex expression + projectPattern, exist := patternMap[replication.FilterItemKindProject] + if !exist { + projectPattern = replication.PatternMatchAll + } + + repositoryPattern, exist := patternMap[replication.FilterItemKindRepository] + if !exist { + repositoryPattern = replication.PatternMatchAll + } + repositoryPattern = fmt.Sprintf("%s/%s", projectPattern, repositoryPattern) + + tagPattern, exist := patternMap[replication.FilterItemKindProject] + if !exist { + tagPattern = replication.PatternMatchAll + } + tagPattern = fmt.Sprintf("%s:%s", repositoryPattern, tagPattern) + + if policy.Trigger.Kind == replication.TriggerKindImmediate { + // build filter chain for immediate trigger policy + filters = append(filters, + source.NewPatternFilter(replication.FilterItemKindTag, tagPattern)) + } else { + // build filter chain for manual and schedule trigger policy + + // append project filter + filters = append(filters, + source.NewPatternFilter(replication.FilterItemKindProject, projectPattern)) + // append repository filter + filters = append(filters, + source.NewPatternFilter(replication.FilterItemKindRepository, + repositoryPattern, source.NewRepositoryConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor)))) + // append tag filter + filters = append(filters, + source.NewPatternFilter(replication.FilterItemKindTag, + tagPattern, source.NewTagConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor)))) + } + + return source.NewDefaultFilterChain(filters) +} + +func replicate(candidates []models.FilterItem, targets []*common_models.RepTarget) error { + // TODO + + log.Infof("replicate candidates %v to targets %v", candidates, targets) return nil } diff --git a/src/replication/models/filter.go b/src/replication/models/filter.go new file mode 100644 index 000000000..648d6247c --- /dev/null +++ b/src/replication/models/filter.go @@ -0,0 +1,41 @@ +// Copyright (c) 2017 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package models + +import ( + "fmt" + + "github.com/astaxie/beego/validation" + "github.com/vmware/harbor/src/replication" +) + +// Filter is the data model represents the filter defined by user. +type Filter struct { + Kind string `json:"kind"` + Pattern string `json:"pattern"` +} + +// Valid ... +func (f *Filter) Valid(v *validation.Validation) { + if !(f.Kind == replication.FilterItemKindProject || + f.Kind == replication.FilterItemKindRepository || + f.Kind == replication.FilterItemKindTag) { + v.SetError("kind", fmt.Sprintf("invalid filter kind: %s", f.Kind)) + } + + if len(f.Pattern) == 0 { + v.SetError("pattern", "filter pattern can not be empty") + } +} diff --git a/src/replication/models/filter_item.go b/src/replication/models/filter_item.go index d142fb70f..101fd6438 100644 --- a/src/replication/models/filter_item.go +++ b/src/replication/models/filter_item.go @@ -20,6 +20,8 @@ type FilterItem struct { //kind == 'tag', value will be tag name. Value string `json:"value"` + Operation string `json:"operation"` + //Extension placeholder. //To append more additional information if required by the filter. Metadata map[string]interface{} `json:"metadata"` diff --git a/src/replication/models/policy.go b/src/replication/models/policy.go index 27c157a64..5e6777703 100644 --- a/src/replication/models/policy.go +++ b/src/replication/models/policy.go @@ -9,7 +9,7 @@ type ReplicationPolicy struct { ID int64 //UUID of the policy Name string Description string - Filters []FilterItem + Filters []Filter ReplicateDeletion bool Trigger *Trigger //The trigger of the replication ProjectIDs []int64 //Projects attached to this policy diff --git a/src/replication/policy/manager.go b/src/replication/policy/manager.go index dcd8a13df..3f8a35f17 100644 --- a/src/replication/policy/manager.go +++ b/src/replication/policy/manager.go @@ -7,6 +7,7 @@ import ( "github.com/vmware/harbor/src/common/dao" persist_models "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/replication/models" + "github.com/vmware/harbor/src/ui/config" ) //Manager provides replication policy CURD capabilities. @@ -64,8 +65,14 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (models.Replicati UpdateTime: policy.UpdateTime, } + project, err := config.GlobalProjectMgr.Get(policy.ProjectID) + if err != nil { + return models.ReplicationPolicy{}, err + } + ply.Namespaces = []string{project.Name} + if len(policy.Filters) > 0 { - filters := []models.FilterItem{} + filters := []models.Filter{} if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil { return models.ReplicationPolicy{}, err } diff --git a/src/replication/registry/harbor_adaptor.go b/src/replication/registry/harbor_adaptor.go index 6c7ffe09d..e28622f93 100644 --- a/src/replication/registry/harbor_adaptor.go +++ b/src/replication/registry/harbor_adaptor.go @@ -1,10 +1,15 @@ package registry import ( + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication/models" + "github.com/vmware/harbor/src/ui/utils" ) +// TODO refacotor the methods of HarborAdaptor by caling Harbor's API + //HarborAdaptor is defined to adapt the Harbor registry type HarborAdaptor struct{} @@ -25,7 +30,19 @@ func (ha *HarborAdaptor) GetNamespace(name string) models.Namespace { //GetRepositories is used to get all the repositories under the specified namespace func (ha *HarborAdaptor) GetRepositories(namespace string) []models.Repository { - return nil + repos, err := dao.GetRepositoryByProjectName(namespace) + if err != nil { + log.Errorf("failed to get repositories under namespace %s: %v", namespace, err) + return nil + } + + repositories := []models.Repository{} + for _, repo := range repos { + repositories = append(repositories, models.Repository{ + Name: repo.Name, + }) + } + return repositories } //GetRepository is used to get the repository with the specified name under the specified namespace @@ -35,7 +52,26 @@ func (ha *HarborAdaptor) GetRepository(name string, namespace string) models.Rep //GetTags is used to get all the tags of the specified repository under the namespace func (ha *HarborAdaptor) GetTags(repositoryName string, namespace string) []models.Tag { - return nil + client, err := utils.NewRepositoryClientForUI("harbor-ui", repositoryName) + if err != nil { + log.Errorf("failed to create registry client: %v", err) + return nil + } + + ts, err := client.ListTag() + if err != nil { + log.Errorf("failed to get tags of repository %s: %v", repositoryName, err) + return nil + } + + tags := []models.Tag{} + for _, t := range ts { + tags = append(tags, models.Tag{ + Name: t, + }) + } + + return tags } //GetTag is used to get the tag with the specified name of the repository under the namespace diff --git a/src/replication/source/repository_convertor.go b/src/replication/source/repository_convertor.go index 12e5bda28..b2ff6cd58 100644 --- a/src/replication/source/repository_convertor.go +++ b/src/replication/source/repository_convertor.go @@ -35,6 +35,8 @@ func NewRepositoryConvertor(registry registry.Adaptor) *RepositoryConvertor { // Convert projects to repositories func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.FilterItem { + // TODO get repositories from database where the push/deletion operations are recorded + // if support replicate deletion result := []models.FilterItem{} for _, item := range items { if item.Kind != replication.FilterItemKindProject { @@ -46,12 +48,9 @@ func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.Filter repositories := r.registry.GetRepositories(item.Value) for _, repository := range repositories { result = append(result, models.FilterItem{ - Kind: replication.FilterItemKindRepository, - Value: repository.Name, - // public is used to create project if it does not exist when replicating - Metadata: map[string]interface{}{ - "public": item.Metadata["public"], - }, + Kind: replication.FilterItemKindRepository, + Value: repository.Name, + Operation: item.Operation, }) } } diff --git a/src/replication/source/tag_combination_filter.go b/src/replication/source/tag_combination_filter.go index ced506e44..8bdddadd6 100644 --- a/src/replication/source/tag_combination_filter.go +++ b/src/replication/source/tag_combination_filter.go @@ -60,6 +60,7 @@ func (t *TagCombinationFilter) DoFilter(filterItems []models.FilterItem) []model repos[strs[0]] = append(repos[strs[0]], strs[1]) } + // TODO append operation items := []models.FilterItem{} for repo, tags := range repos { items = append(items, models.FilterItem{ diff --git a/src/replication/source/tag_convertor.go b/src/replication/source/tag_convertor.go index 236e04e3c..09adcecf9 100644 --- a/src/replication/source/tag_convertor.go +++ b/src/replication/source/tag_convertor.go @@ -46,12 +46,9 @@ func (t *TagConvertor) Convert(items []models.FilterItem) []models.FilterItem { tags := t.registry.GetTags(item.Value, "") for _, tag := range tags { result = append(result, models.FilterItem{ - Kind: replication.FilterItemKindTag, - Value: tag.Name, - // public is used to create project if it does not exist when replicating - Metadata: map[string]interface{}{ - "public": item.Metadata["public"], - }, + Kind: replication.FilterItemKindTag, + Value: item.Value + ":" + tag.Name, + Operation: item.Operation, }) } } diff --git a/src/ui/api/models/replication_policy.go b/src/ui/api/models/replication_policy.go index 34fa38a7b..c8da8a03a 100644 --- a/src/ui/api/models/replication_policy.go +++ b/src/ui/api/models/replication_policy.go @@ -27,7 +27,7 @@ type ReplicationPolicy struct { ID int64 `json:"id"` Name string `json:"name"` Description string `json:"description"` - Filters []rep_models.FilterItem `json:"filters"` + Filters []rep_models.Filter `json:"filters"` ReplicateDeletion bool `json:"replicate_deletion"` Trigger *rep_models.Trigger `json:"trigger"` Projects []*common_models.Project `json:"projects"` diff --git a/src/ui/api/utils.go b/src/ui/api/utils.go index 76a0cabf5..7f883109b 100644 --- a/src/ui/api/utils.go +++ b/src/ui/api/utils.go @@ -15,10 +15,7 @@ package api import ( - "bytes" - "encoding/json" "fmt" - "io/ioutil" "net/http" "sort" "strings" @@ -77,120 +74,6 @@ func checkUserExists(name string) int { return 0 } -<<<<<<< HEAD -======= -// CheckAndTriggerReplication checks whether replication policy is set -// on the resource, if is, trigger the replication -func CheckAndTriggerReplication(image, operation string) { - project, _ := utils.ParseRepository(image) - watchItems, err := trigger.DefaultWatchList.Get(project, operation) - if err != nil { - log.Errorf("failed to get watch list for resource %s, operation %s: %v", image, operation, err) - return - } - if len(watchItems) == 0 { - log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation) - return - } - - for _, watchItem := range watchItems { - // TODO define a new type ReplicationItem to wrap FilterItem and operation. - // Maybe change the FilterItem to interface and define a type Resource to - // implement FilterItem is better? - item := &rep_models.FilterItem{ - Kind: replication.FilterItemKindTag, - Value: image, - Metadata: map[string]interface{}{ - "operation": operation, - }, - } - - if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{ - PolicyID: watchItem.PolicyID, - Metadata: map[string]interface{}{ - "": []*rep_models.FilterItem{item}, - }, - }); err != nil { - log.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v", - image, operation, watchItem.PolicyID, err) - return - } - log.Infof("replication topic for resource %s, operation %s, policy %d triggered", - image, operation, watchItem.PolicyID) - } -} - ->>>>>>> 3409fa1... Publish replication notification for manual, scheduel and immediate trigger -// TriggerReplication triggers the replication according to the policy -// TODO remove -func TriggerReplication(policyID int64, repository string, - tags []string, operation string) error { - data := struct { - PolicyID int64 `json:"policy_id"` - Repo string `json:"repository"` - Operation string `json:"operation"` - TagList []string `json:"tags"` - }{ - PolicyID: policyID, - Repo: repository, - TagList: tags, - Operation: operation, - } - - b, err := json.Marshal(&data) - if err != nil { - return err - } - url := buildReplicationURL() - - return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK)) -} - -// TODO remove -func postReplicationAction(policyID int64, acton string) error { - data := struct { - PolicyID int64 `json:"policy_id"` - Action string `json:"action"` - }{ - PolicyID: policyID, - Action: acton, - } - - b, err := json.Marshal(&data) - if err != nil { - return err - } - - url := buildReplicationActionURL() - - req, err := http.NewRequest("POST", url, bytes.NewBuffer(b)) - if err != nil { - return err - } - - uiutils.AddUISecret(req) - - client := &http.Client{} - - resp, err := client.Do(req) - if err != nil { - return err - } - - defer resp.Body.Close() - - if resp.StatusCode == http.StatusOK { - return nil - } - - b, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - return fmt.Errorf("%d %s", resp.StatusCode, string(b)) -} - // SyncRegistry syncs the repositories of registry with database. func SyncRegistry(pm promgr.ProjectManager) error { diff --git a/src/ui/main.go b/src/ui/main.go index 8a47ecca5..e25a10e29 100644 --- a/src/ui/main.go +++ b/src/ui/main.go @@ -29,6 +29,7 @@ import ( "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/scheduler" + "github.com/vmware/harbor/src/replication/core" _ "github.com/vmware/harbor/src/replication/event" "github.com/vmware/harbor/src/ui/api" _ "github.com/vmware/harbor/src/ui/auth/db" @@ -131,6 +132,10 @@ func main() { notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)}) } + if err := core.DefaultController.Init(); err != nil { + log.Errorf("failed to initialize DefaultContorllter: %v", err) + } + filter.Init() beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter) beego.InsertFilter("/api/*", beego.BeforeRouter, filter.MediaTypeFilter("application/json"))