Enable filter chain in replication

This commit is contained in:
Wenkai Yin 2017-11-29 15:14:13 +08:00
parent a384325a1e
commit c5ccb7e53c
13 changed files with 219 additions and 138 deletions

View File

@ -23,8 +23,11 @@ const (
//TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly"
//OperationPush : operation for pushing images
//OperationPush : push operation
OperationPush = "push"
//OperationDelete : operation for deleting images
//OperationDelete : delete operation
OperationDelete = "delete"
// PatternMatchAll : the pattern that match all
PatternMatchAll = ".*"
)

View File

@ -3,6 +3,9 @@ package core
import (
"fmt"
"github.com/vmware/harbor/src/common/dao"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/policy"
@ -182,9 +185,113 @@ func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.Replic
//Replicate starts one replication defined in the specified policy;
//Can be launched by the API layer and related triggers.
func (ctl *Controller) Replicate(policyID int64, metadate ...map[string]interface{}) error {
func (ctl *Controller) Replicate(policyID int64, metadata ...map[string]interface{}) error {
policy, err := ctl.GetPolicy(policyID)
if err != nil {
return err
}
if policy.ID == 0 {
return fmt.Errorf("policy %d not found", policyID)
}
fmt.Printf("replicating %d, metadata: %v ...\n", policyID, metadate)
candidates := []models.FilterItem{}
if len(metadata) > 0 {
meta := metadata[0]["candidates"]
if meta != nil {
cands, ok := meta.([]models.FilterItem)
if ok {
candidates = append(candidates, cands...)
}
}
}
// prepare candidates for replication
candidates = getCandidates(&policy, ctl.sourcer, candidates...)
targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs {
target, err := dao.GetRepTarget(targetID)
if err != nil {
return err
}
targets = append(targets, target)
}
// TODO merge tags whose repository is same into one struct
// call job service to do the replication
return replicate(candidates, targets)
}
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, candidates ...models.FilterItem) []models.FilterItem {
if len(candidates) == 0 {
for _, namespace := range policy.Namespaces {
candidates = append(candidates, models.FilterItem{
Kind: replication.FilterItemKindProject,
Value: namespace,
Operation: replication.OperationPush,
})
}
}
filterChain := buildFilterChain(policy, sourcer)
return filterChain.DoFilter(candidates)
}
func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) source.FilterChain {
filters := []source.Filter{}
patternMap := map[string]string{}
for _, f := range policy.Filters {
patternMap[f.Kind] = f.Pattern
}
// TODO convert wildcard to regex expression
projectPattern, exist := patternMap[replication.FilterItemKindProject]
if !exist {
projectPattern = replication.PatternMatchAll
}
repositoryPattern, exist := patternMap[replication.FilterItemKindRepository]
if !exist {
repositoryPattern = replication.PatternMatchAll
}
repositoryPattern = fmt.Sprintf("%s/%s", projectPattern, repositoryPattern)
tagPattern, exist := patternMap[replication.FilterItemKindProject]
if !exist {
tagPattern = replication.PatternMatchAll
}
tagPattern = fmt.Sprintf("%s:%s", repositoryPattern, tagPattern)
if policy.Trigger.Kind == replication.TriggerKindImmediate {
// build filter chain for immediate trigger policy
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindTag, tagPattern))
} else {
// build filter chain for manual and schedule trigger policy
// append project filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindProject, projectPattern))
// append repository filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindRepository,
repositoryPattern, source.NewRepositoryConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor))))
// append tag filter
filters = append(filters,
source.NewPatternFilter(replication.FilterItemKindTag,
tagPattern, source.NewTagConvertor(sourcer.GetAdaptor(replication.AdaptorKindHarbor))))
}
return source.NewDefaultFilterChain(filters)
}
func replicate(candidates []models.FilterItem, targets []*common_models.RepTarget) error {
// TODO
log.Infof("replicate candidates %v to targets %v", candidates, targets)
return nil
}

View File

@ -0,0 +1,41 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"fmt"
"github.com/astaxie/beego/validation"
"github.com/vmware/harbor/src/replication"
)
// Filter is the data model represents the filter defined by user.
type Filter struct {
Kind string `json:"kind"`
Pattern string `json:"pattern"`
}
// Valid ...
func (f *Filter) Valid(v *validation.Validation) {
if !(f.Kind == replication.FilterItemKindProject ||
f.Kind == replication.FilterItemKindRepository ||
f.Kind == replication.FilterItemKindTag) {
v.SetError("kind", fmt.Sprintf("invalid filter kind: %s", f.Kind))
}
if len(f.Pattern) == 0 {
v.SetError("pattern", "filter pattern can not be empty")
}
}

View File

@ -20,6 +20,8 @@ type FilterItem struct {
//kind == 'tag', value will be tag name.
Value string `json:"value"`
Operation string `json:"operation"`
//Extension placeholder.
//To append more additional information if required by the filter.
Metadata map[string]interface{} `json:"metadata"`

View File

@ -9,7 +9,7 @@ type ReplicationPolicy struct {
ID int64 //UUID of the policy
Name string
Description string
Filters []FilterItem
Filters []Filter
ReplicateDeletion bool
Trigger *Trigger //The trigger of the replication
ProjectIDs []int64 //Projects attached to this policy

View File

@ -7,6 +7,7 @@ import (
"github.com/vmware/harbor/src/common/dao"
persist_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/ui/config"
)
//Manager provides replication policy CURD capabilities.
@ -64,8 +65,14 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (models.Replicati
UpdateTime: policy.UpdateTime,
}
project, err := config.GlobalProjectMgr.Get(policy.ProjectID)
if err != nil {
return models.ReplicationPolicy{}, err
}
ply.Namespaces = []string{project.Name}
if len(policy.Filters) > 0 {
filters := []models.FilterItem{}
filters := []models.Filter{}
if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil {
return models.ReplicationPolicy{}, err
}

View File

@ -1,10 +1,15 @@
package registry
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/ui/utils"
)
// TODO refacotor the methods of HarborAdaptor by caling Harbor's API
//HarborAdaptor is defined to adapt the Harbor registry
type HarborAdaptor struct{}
@ -25,7 +30,19 @@ func (ha *HarborAdaptor) GetNamespace(name string) models.Namespace {
//GetRepositories is used to get all the repositories under the specified namespace
func (ha *HarborAdaptor) GetRepositories(namespace string) []models.Repository {
return nil
repos, err := dao.GetRepositoryByProjectName(namespace)
if err != nil {
log.Errorf("failed to get repositories under namespace %s: %v", namespace, err)
return nil
}
repositories := []models.Repository{}
for _, repo := range repos {
repositories = append(repositories, models.Repository{
Name: repo.Name,
})
}
return repositories
}
//GetRepository is used to get the repository with the specified name under the specified namespace
@ -35,7 +52,26 @@ func (ha *HarborAdaptor) GetRepository(name string, namespace string) models.Rep
//GetTags is used to get all the tags of the specified repository under the namespace
func (ha *HarborAdaptor) GetTags(repositoryName string, namespace string) []models.Tag {
return nil
client, err := utils.NewRepositoryClientForUI("harbor-ui", repositoryName)
if err != nil {
log.Errorf("failed to create registry client: %v", err)
return nil
}
ts, err := client.ListTag()
if err != nil {
log.Errorf("failed to get tags of repository %s: %v", repositoryName, err)
return nil
}
tags := []models.Tag{}
for _, t := range ts {
tags = append(tags, models.Tag{
Name: t,
})
}
return tags
}
//GetTag is used to get the tag with the specified name of the repository under the namespace

View File

@ -35,6 +35,8 @@ func NewRepositoryConvertor(registry registry.Adaptor) *RepositoryConvertor {
// Convert projects to repositories
func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.FilterItem {
// TODO get repositories from database where the push/deletion operations are recorded
// if support replicate deletion
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindProject {
@ -46,12 +48,9 @@ func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.Filter
repositories := r.registry.GetRepositories(item.Value)
for _, repository := range repositories {
result = append(result, models.FilterItem{
Kind: replication.FilterItemKindRepository,
Value: repository.Name,
// public is used to create project if it does not exist when replicating
Metadata: map[string]interface{}{
"public": item.Metadata["public"],
},
Kind: replication.FilterItemKindRepository,
Value: repository.Name,
Operation: item.Operation,
})
}
}

View File

@ -60,6 +60,7 @@ func (t *TagCombinationFilter) DoFilter(filterItems []models.FilterItem) []model
repos[strs[0]] = append(repos[strs[0]], strs[1])
}
// TODO append operation
items := []models.FilterItem{}
for repo, tags := range repos {
items = append(items, models.FilterItem{

View File

@ -46,12 +46,9 @@ func (t *TagConvertor) Convert(items []models.FilterItem) []models.FilterItem {
tags := t.registry.GetTags(item.Value, "")
for _, tag := range tags {
result = append(result, models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: tag.Name,
// public is used to create project if it does not exist when replicating
Metadata: map[string]interface{}{
"public": item.Metadata["public"],
},
Kind: replication.FilterItemKindTag,
Value: item.Value + ":" + tag.Name,
Operation: item.Operation,
})
}
}

View File

@ -27,7 +27,7 @@ type ReplicationPolicy struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Filters []rep_models.FilterItem `json:"filters"`
Filters []rep_models.Filter `json:"filters"`
ReplicateDeletion bool `json:"replicate_deletion"`
Trigger *rep_models.Trigger `json:"trigger"`
Projects []*common_models.Project `json:"projects"`

View File

@ -15,10 +15,7 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strings"
@ -77,120 +74,6 @@ func checkUserExists(name string) int {
return 0
}
<<<<<<< HEAD
=======
// CheckAndTriggerReplication checks whether replication policy is set
// on the resource, if is, trigger the replication
func CheckAndTriggerReplication(image, operation string) {
project, _ := utils.ParseRepository(image)
watchItems, err := trigger.DefaultWatchList.Get(project, operation)
if err != nil {
log.Errorf("failed to get watch list for resource %s, operation %s: %v", image, operation, err)
return
}
if len(watchItems) == 0 {
log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation)
return
}
for _, watchItem := range watchItems {
// TODO define a new type ReplicationItem to wrap FilterItem and operation.
// Maybe change the FilterItem to interface and define a type Resource to
// implement FilterItem is better?
item := &rep_models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: image,
Metadata: map[string]interface{}{
"operation": operation,
},
}
if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
PolicyID: watchItem.PolicyID,
Metadata: map[string]interface{}{
"": []*rep_models.FilterItem{item},
},
}); err != nil {
log.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v",
image, operation, watchItem.PolicyID, err)
return
}
log.Infof("replication topic for resource %s, operation %s, policy %d triggered",
image, operation, watchItem.PolicyID)
}
}
>>>>>>> 3409fa1... Publish replication notification for manual, scheduel and immediate trigger
// TriggerReplication triggers the replication according to the policy
// TODO remove
func TriggerReplication(policyID int64, repository string,
tags []string, operation string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
Operation string `json:"operation"`
TagList []string `json:"tags"`
}{
PolicyID: policyID,
Repo: repository,
TagList: tags,
Operation: operation,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationURL()
return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK))
}
// TODO remove
func postReplicationAction(policyID int64, acton string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}{
PolicyID: policyID,
Action: acton,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationActionURL()
req, err := http.NewRequest("POST", url, bytes.NewBuffer(b))
if err != nil {
return err
}
uiutils.AddUISecret(req)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}
// SyncRegistry syncs the repositories of registry with database.
func SyncRegistry(pm promgr.ProjectManager) error {

View File

@ -29,6 +29,7 @@ import (
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/replication/core"
_ "github.com/vmware/harbor/src/replication/event"
"github.com/vmware/harbor/src/ui/api"
_ "github.com/vmware/harbor/src/ui/auth/db"
@ -131,6 +132,10 @@ func main() {
notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)})
}
if err := core.DefaultController.Init(); err != nil {
log.Errorf("failed to initialize DefaultContorllter: %v", err)
}
filter.Init()
beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter)
beego.InsertFilter("/api/*", beego.BeforeRouter, filter.MediaTypeFilter("application/json"))