Merge pull request #3783 from ywk253100/171206_replicator

Replication enhancement
This commit is contained in:
Wenkai Yin 2017-12-25 10:37:19 +08:00 committed by GitHub
commit fa472823e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 1802 additions and 617 deletions

View File

@ -89,8 +89,8 @@ script:
- sudo mkdir -p ./make/common/config/registry/ - sudo mkdir -p ./make/common/config/registry/
- sudo mv ./tests/reg_config.yml ./make/common/config/registry/config.yml - sudo mv ./tests/reg_config.yml ./make/common/config/registry/config.yml
- sudo docker-compose -f ./make/docker-compose.test.yml up -d - sudo docker-compose -f ./make/docker-compose.test.yml up -d
- go list ./... | grep -v -E 'vendor|tests' | xargs -L1 fgt golint - go list ./... | grep -v -E 'vendor|tests|test' | xargs -L1 fgt golint
- go list ./... | grep -v -E 'vendor|tests' | xargs -L1 go vet - go list ./... | grep -v -E 'vendor|tests|test' | xargs -L1 go vet
- export MYSQL_HOST=$IP - export MYSQL_HOST=$IP
- export REGISTRY_URL=$IP:5000 - export REGISTRY_URL=$IP:5000
- echo $REGISTRY_URL - echo $REGISTRY_URL

View File

@ -15,16 +15,11 @@
package client package client
import ( import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings" "strings"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/adminserver/systeminfo/imagestorage" "github.com/vmware/harbor/src/adminserver/systeminfo/imagestorage"
"github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier/auth"
"github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils"
) )
@ -43,38 +38,29 @@ type Client interface {
} }
// NewClient return an instance of Adminserver client // NewClient return an instance of Adminserver client
func NewClient(baseURL string, authorizer auth.Authorizer) Client { func NewClient(baseURL string, cfg *Config) Client {
baseURL = strings.TrimRight(baseURL, "/") baseURL = strings.TrimRight(baseURL, "/")
if !strings.Contains(baseURL, "://") { if !strings.Contains(baseURL, "://") {
baseURL = "http://" + baseURL baseURL = "http://" + baseURL
} }
return &client{ client := &client{
baseURL: baseURL, baseURL: baseURL,
client: &http.Client{},
authorizer: authorizer,
} }
if cfg != nil {
authorizer := auth.NewSecretAuthorizer(cfg.Secret)
client.client = http.NewClient(nil, authorizer)
}
return client
} }
type client struct { type client struct {
baseURL string baseURL string
client *http.Client client *http.Client
authorizer auth.Authorizer
} }
// do creates request and authorizes it if authorizer is not nil // Config contains configurations needed for client
func (c *client) do(method, relativePath string, body io.Reader) (*http.Response, error) { type Config struct {
url := c.baseURL + relativePath Secret string
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
if c.authorizer != nil {
if err := c.authorizer.Authorize(req); err != nil {
return nil, err
}
}
return c.client.Do(req)
} }
func (c *client) Ping() error { func (c *client) Ping() error {
@ -88,96 +74,32 @@ func (c *client) Ping() error {
// GetCfgs ... // GetCfgs ...
func (c *client) GetCfgs() (map[string]interface{}, error) { func (c *client) GetCfgs() (map[string]interface{}, error) {
resp, err := c.do(http.MethodGet, "/api/configurations", nil) url := c.baseURL + "/api/configurations"
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get configurations: %d %s",
resp.StatusCode, b)
}
cfgs := map[string]interface{}{} cfgs := map[string]interface{}{}
if err = json.Unmarshal(b, &cfgs); err != nil { if err := c.client.Get(url, &cfgs); err != nil {
return nil, err return nil, err
} }
return cfgs, nil return cfgs, nil
} }
// UpdateCfgs ... // UpdateCfgs ...
func (c *client) UpdateCfgs(cfgs map[string]interface{}) error { func (c *client) UpdateCfgs(cfgs map[string]interface{}) error {
data, err := json.Marshal(cfgs) url := c.baseURL + "/api/configurations"
if err != nil { return c.client.Put(url, cfgs)
return err
}
resp, err := c.do(http.MethodPut, "/api/configurations", bytes.NewReader(data))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("failed to update configurations: %d %s",
resp.StatusCode, b)
}
return nil
} }
// ResetCfgs ... // ResetCfgs ...
func (c *client) ResetCfgs() error { func (c *client) ResetCfgs() error {
resp, err := c.do(http.MethodPost, "/api/configurations/reset", nil) url := c.baseURL + "/api/configurations/reset"
if err != nil { return c.client.Post(url)
return err
}
if resp.StatusCode != http.StatusOK {
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("failed to reset configurations: %d %s",
resp.StatusCode, b)
}
return nil
} }
// Capacity ... // Capacity ...
func (c *client) Capacity() (*imagestorage.Capacity, error) { func (c *client) Capacity() (*imagestorage.Capacity, error) {
resp, err := c.do(http.MethodGet, "/api/systeminfo/capacity", nil) url := c.baseURL + "/api/systeminfo/capacity"
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get capacity: %d %s",
resp.StatusCode, b)
}
capacity := &imagestorage.Capacity{} capacity := &imagestorage.Capacity{}
if err = json.Unmarshal(b, capacity); err != nil { if err := c.client.Get(url, capacity); err != nil {
return nil, err return nil, err
} }
return capacity, nil return capacity, nil
} }

View File

@ -34,7 +34,7 @@ func TestMain(m *testing.M) {
os.Exit(1) os.Exit(1)
} }
c = NewClient(server.URL, nil) c = NewClient(server.URL, &Config{})
os.Exit(m.Run()) os.Exit(m.Run())
} }

View File

@ -941,7 +941,6 @@ func TestFilterRepTargets(t *testing.T) {
func TestAddRepPolicy(t *testing.T) { func TestAddRepPolicy(t *testing.T) {
policy := models.RepPolicy{ policy := models.RepPolicy{
ProjectID: 1, ProjectID: 1,
Enabled: 1,
TargetID: targetID, TargetID: targetID,
Description: "whatever", Description: "whatever",
Name: "mypolicy", Name: "mypolicy",
@ -961,15 +960,10 @@ func TestAddRepPolicy(t *testing.T) {
t.Errorf("Unable to find a policy with id: %d", id) t.Errorf("Unable to find a policy with id: %d", id)
} }
if p.Name != "mypolicy" || p.TargetID != targetID || p.Enabled != 1 || p.Description != "whatever" { if p.Name != "mypolicy" || p.TargetID != targetID || p.Description != "whatever" {
t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Enabled: 1, Description: whatever;\n result: Name: %s, TargetID: %d, Enabled: %d, Description: %s", t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Description: whatever;\n result: Name: %s, TargetID: %d, Description: %s",
targetID, p.Name, p.TargetID, p.Enabled, p.Description) targetID, p.Name, p.TargetID, p.Description)
} }
var tm = time.Now().AddDate(0, 0, -1)
if !p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
} }
func TestGetRepPolicyByTarget(t *testing.T) { func TestGetRepPolicyByTarget(t *testing.T) {
@ -1019,44 +1013,9 @@ func TestGetRepPolicyByName(t *testing.T) {
} }
func TestDisableRepPolicy(t *testing.T) {
err := DisableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 1 {
t.Errorf("The Enabled value of replication policy is still 1 after disabled, id: %d", policyID)
}
}
func TestEnableRepPolicy(t *testing.T) {
err := EnableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 0 {
t.Errorf("The Enabled value of replication policy is still 0 after disabled, id: %d", policyID)
}
}
func TestAddRepPolicy2(t *testing.T) { func TestAddRepPolicy2(t *testing.T) {
policy2 := models.RepPolicy{ policy2 := models.RepPolicy{
ProjectID: 3, ProjectID: 3,
Enabled: 0,
TargetID: 3, TargetID: 3,
Description: "whatever", Description: "whatever",
Name: "mypolicy", Name: "mypolicy",
@ -1073,10 +1032,6 @@ func TestAddRepPolicy2(t *testing.T) {
if p == nil { if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID2) t.Errorf("Unable to find a policy with id: %d", policyID2)
} }
var tm time.Time
if p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
} }
func TestAddRepJob(t *testing.T) { func TestAddRepJob(t *testing.T) {

View File

@ -106,17 +106,13 @@ func FilterRepTargets(name string) ([]*models.RepTarget, error) {
// AddRepPolicy ... // AddRepPolicy ...
func AddRepPolicy(policy models.RepPolicy) (int64, error) { func AddRepPolicy(policy models.RepPolicy) (int64, error) {
o := GetOrmer() o := GetOrmer()
sql := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, start_time, creation_time, update_time, filters, replicate_deletion) sql := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, creation_time, update_time, filters, replicate_deletion)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
params := []interface{}{} params := []interface{}{}
params = append(params, policy.Name, policy.ProjectID, policy.TargetID, policy.Enabled, policy.Description, policy.Trigger)
now := time.Now() now := time.Now()
if policy.Enabled == 1 { params = append(params, policy.Name, policy.ProjectID, policy.TargetID, 1,
params = append(params, now) policy.Description, policy.Trigger, now, now, policy.Filters,
} else { policy.ReplicateDeletion)
params = append(params, nil)
}
params = append(params, now, now, policy.Filters, policy.ReplicateDeletion)
result, err := o.Raw(sql, params...).Exec() result, err := o.Raw(sql, params...).Exec()
if err != nil { if err != nil {
@ -150,8 +146,8 @@ func FilterRepPolicies(name string, projectID int64) ([]*models.RepPolicy, error
var args []interface{} var args []interface{}
sql := `select rp.id, rp.project_id, rp.target_id, sql := `select rp.id, rp.project_id, rp.target_id,
rt.name as target_name, rp.name, rp.enabled, rp.description, rt.name as target_name, rp.name, rp.description,
rp.cron_str, rp.filters, rp.replicate_deletion,rp.start_time, rp.cron_str, rp.filters, rp.replicate_deletion,
rp.creation_time, rp.update_time, rp.creation_time, rp.update_time,
count(rj.status) as error_job_count count(rj.status) as error_job_count
from replication_policy rp from replication_policy rp
@ -245,7 +241,7 @@ func GetRepPolicyByProjectAndTarget(projectID, targetID int64) ([]*models.RepPol
func UpdateRepPolicy(policy *models.RepPolicy) error { func UpdateRepPolicy(policy *models.RepPolicy) error {
o := GetOrmer() o := GetOrmer()
policy.UpdateTime = time.Now() policy.UpdateTime = time.Now()
_, err := o.Update(policy, "TargetID", "Name", "Enabled", "Description", _, err := o.Update(policy, "TargetID", "Name", "Description",
"Trigger", "Filters", "ReplicateDeletion", "UpdateTime") "Trigger", "Filters", "ReplicateDeletion", "UpdateTime")
return err return err
} }
@ -262,36 +258,6 @@ func DeleteRepPolicy(id int64) error {
return err return err
} }
// UpdateRepPolicyEnablement ...
func UpdateRepPolicyEnablement(id int64, enabled int) error {
o := GetOrmer()
p := models.RepPolicy{
ID: id,
Enabled: enabled,
UpdateTime: time.Now(),
}
var err error
if enabled == 1 {
p.StartTime = time.Now()
_, err = o.Update(&p, "Enabled", "StartTime")
} else {
_, err = o.Update(&p, "Enabled")
}
return err
}
// EnableRepPolicy ...
func EnableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 1)
}
// DisableRepPolicy ...
func DisableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 0)
}
// AddRepJob ... // AddRepJob ...
func AddRepJob(job models.RepJob) (int64, error) { func AddRepJob(job models.RepJob) (int64, error) {
o := GetOrmer() o := GetOrmer()

162
src/common/http/client.go Normal file
View File

@ -0,0 +1,162 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"github.com/vmware/harbor/src/common/http/modifier"
)
// Client is a util for common HTTP operations, such Get, Head, Post, Put and Delete.
// Use Do instead if those methods can not meet your requirement
type Client struct {
modifiers []modifier.Modifier
client *http.Client
}
// NewClient creates an instance of Client.
// Use net/http.Client as the default value if c is nil.
// Modifiers modify the request before sending it.
func NewClient(c *http.Client, modifiers ...modifier.Modifier) *Client {
client := &Client{
client: c,
}
if client.client == nil {
client.client = &http.Client{}
}
if len(modifiers) > 0 {
client.modifiers = modifiers
}
return client
}
// Do ...
func (c *Client) Do(req *http.Request) (*http.Response, error) {
for _, modifier := range c.modifiers {
if err := modifier.Modify(req); err != nil {
return nil, err
}
}
return c.client.Do(req)
}
// Get ...
func (c *Client) Get(url string, v ...interface{}) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
data, err := c.do(req)
if err != nil {
return err
}
if len(v) == 0 {
return nil
}
return json.Unmarshal(data, v[0])
}
// Head ...
func (c *Client) Head(url string) error {
req, err := http.NewRequest(http.MethodHead, url, nil)
if err != nil {
return err
}
_, err = c.do(req)
return err
}
// Post ...
func (c *Client) Post(url string, v ...interface{}) error {
var reader io.Reader
if len(v) > 0 {
data, err := json.Marshal(v[0])
if err != nil {
return err
}
reader = bytes.NewReader(data)
}
req, err := http.NewRequest(http.MethodPost, url, reader)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
_, err = c.do(req)
return err
}
// Put ...
func (c *Client) Put(url string, v ...interface{}) error {
var reader io.Reader
if len(v) > 0 {
data := []byte{}
data, err := json.Marshal(v[0])
if err != nil {
return err
}
reader = bytes.NewReader(data)
}
req, err := http.NewRequest(http.MethodPut, url, reader)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
_, err = c.do(req)
return err
}
// Delete ...
func (c *Client) Delete(url string) error {
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
return err
}
_, err = c.do(req)
return err
}
func (c *Client) do(req *http.Request) ([]byte, error) {
resp, err := c.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, &Error{
Code: resp.StatusCode,
Message: string(data),
}
}
return data, nil
}

30
src/common/http/error.go Normal file
View File

@ -0,0 +1,30 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"fmt"
)
// Error wrap HTTP status code and message as an error
type Error struct {
Code int
Message string
}
// Error ...
func (e *Error) Error() string {
return fmt.Sprintf("http error: code %d, message %s", e.Code, e.Message)
}

View File

@ -0,0 +1,54 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"errors"
"net/http"
"github.com/vmware/harbor/src/common/http/modifier"
)
const (
secretCookieName = "secret"
)
// Authorizer is a kind of Modifier used to authorize the requests
type Authorizer modifier.Modifier
// SecretAuthorizer authorizes the requests with the specified secret
type SecretAuthorizer struct {
secret string
}
// NewSecretAuthorizer returns an instance of SecretAuthorizer
func NewSecretAuthorizer(secret string) *SecretAuthorizer {
return &SecretAuthorizer{
secret: secret,
}
}
// Modify the request by adding secret authentication information
func (s *SecretAuthorizer) Modify(req *http.Request) error {
if req == nil {
return errors.New("the request is null")
}
req.AddCookie(&http.Cookie{
Name: secretCookieName,
Value: s.secret,
})
return nil
}

View File

@ -1,4 +1,3 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -19,25 +18,22 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAuthorize(t *testing.T) { func TestAuthorizeOfSecretAuthorizer(t *testing.T) {
cookieName := "secret"
secret := "secret" secret := "secret"
authorizer := NewSecretAuthorizer(cookieName, secret) authorizer := NewSecretAuthorizer(secret)
// nil request
require.NotNil(t, authorizer.Modify(nil))
// valid request
req, err := http.NewRequest("", "", nil) req, err := http.NewRequest("", "", nil)
if !assert.Nil(t, err, "unexpected error") { require.Nil(t, err)
return require.Nil(t, authorizer.Modify(req))
} require.Equal(t, 1, len(req.Cookies()))
v, err := req.Cookie(secretCookieName)
err = authorizer.Authorize(req) require.Nil(t, err)
if !assert.Nil(t, err, "unexpected error") { assert.Equal(t, secret, v.Value)
return
}
cookie, err := req.Cookie(cookieName)
if !assert.Nil(t, err, "unexpected error") {
return
}
assert.Equal(t, secret, cookie.Value, "unexpected cookie")
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package registry package modifier
import ( import (
"net/http" "net/http"

View File

@ -42,12 +42,10 @@ type RepPolicy struct {
ProjectID int64 `orm:"column(project_id)" ` ProjectID int64 `orm:"column(project_id)" `
TargetID int64 `orm:"column(target_id)"` TargetID int64 `orm:"column(target_id)"`
Name string `orm:"column(name)"` Name string `orm:"column(name)"`
Enabled int `orm:"column(enabled)"`
Description string `orm:"column(description)"` Description string `orm:"column(description)"`
Trigger string `orm:"column(cron_str)"` Trigger string `orm:"column(cron_str)"`
Filters string `orm:"column(filters)"` Filters string `orm:"column(filters)"`
ReplicateDeletion bool `orm:"column(replicate_deletion)"` ReplicateDeletion bool `orm:"column(replicate_deletion)"`
StartTime time.Time `orm:"column(start_time)"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add"` CreationTime time.Time `orm:"column(creation_time);auto_now_add"`
UpdateTime time.Time `orm:"column(update_time);auto_now"` UpdateTime time.Time `orm:"column(update_time);auto_now"`
Deleted int `orm:"column(deleted)"` Deleted int `orm:"column(deleted)"`

View File

@ -23,9 +23,9 @@ import (
"time" "time"
"github.com/docker/distribution/registry/auth/token" "github.com/docker/distribution/registry/auth/token"
"github.com/vmware/harbor/src/common/http/modifier"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry"
token_util "github.com/vmware/harbor/src/ui/service/token" token_util "github.com/vmware/harbor/src/ui/service/token"
) )
@ -254,7 +254,7 @@ func ping(client *http.Client, endpoint string) (string, string, error) {
// from token server and add it to the origin request // from token server and add it to the origin request
// If customizedTokenService is set, the token request will be sent to it instead of the server get from authorizer // If customizedTokenService is set, the token request will be sent to it instead of the server get from authorizer
func NewStandardTokenAuthorizer(client *http.Client, credential Credential, func NewStandardTokenAuthorizer(client *http.Client, credential Credential,
customizedTokenService ...string) registry.Modifier { customizedTokenService ...string) modifier.Modifier {
generator := &standardTokenGenerator{ generator := &standardTokenGenerator{
credential: credential, credential: credential,
client: client, client: client,
@ -309,7 +309,7 @@ func (s *standardTokenGenerator) generate(scopes []*token.ResourceActions, endpo
// NewRawTokenAuthorizer returns a token authorizer which calls method to create // NewRawTokenAuthorizer returns a token authorizer which calls method to create
// token directly // token directly
func NewRawTokenAuthorizer(username, service string) registry.Modifier { func NewRawTokenAuthorizer(username, service string) modifier.Modifier {
generator := &rawTokenGenerator{ generator := &rawTokenGenerator{
service: service, service: service,
username: username, username: username,

View File

@ -17,17 +17,18 @@ package registry
import ( import (
"net/http" "net/http"
"github.com/vmware/harbor/src/common/http/modifier"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
) )
// Transport holds information about base transport and modifiers // Transport holds information about base transport and modifiers
type Transport struct { type Transport struct {
transport http.RoundTripper transport http.RoundTripper
modifiers []Modifier modifiers []modifier.Modifier
} }
// NewTransport ... // NewTransport ...
func NewTransport(transport http.RoundTripper, modifiers ...Modifier) *Transport { func NewTransport(transport http.RoundTripper, modifiers ...modifier.Modifier) *Transport {
return &Transport{ return &Transport{
transport: transport, transport: transport,
modifiers: modifiers, modifiers: modifiers,

View File

@ -0,0 +1,45 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
)
type FakePolicyManager struct {
}
func (f *FakePolicyManager) GetPolicies(query models.QueryParameter) ([]models.ReplicationPolicy, error) {
return []models.ReplicationPolicy{}, nil
}
func (f *FakePolicyManager) GetPolicy(id int64) (models.ReplicationPolicy, error) {
return models.ReplicationPolicy{
ID: 1,
Trigger: &models.Trigger{
Kind: replication.TriggerKindManual,
},
}, nil
}
func (f *FakePolicyManager) CreatePolicy(policy models.ReplicationPolicy) (int64, error) {
return 1, nil
}
func (f *FakePolicyManager) UpdatePolicy(models.ReplicationPolicy) error {
return nil
}
func (f *FakePolicyManager) RemovePolicy(int64) error {
return nil
}

View File

@ -0,0 +1,26 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
type FakeReplicatoinController struct {
FakePolicyManager
}
func (f *FakeReplicatoinController) Init() error {
return nil
}
func (f *FakeReplicatoinController) Replicate(policyID int64, metadata ...map[string]interface{}) error {
return nil
}

View File

@ -0,0 +1,56 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"github.com/vmware/harbor/src/common/http"
"github.com/vmware/harbor/src/common/http/modifier/auth"
"github.com/vmware/harbor/src/jobservice/api"
)
// Client defines the methods that a jobservice client should implement
type Client interface {
SubmitReplicationJob(*api.ReplicationReq) error
}
// DefaultClient provides a default implement for the interface Client
type DefaultClient struct {
endpoint string
client *http.Client
}
// Config contains configuration items needed for DefaultClient
type Config struct {
Secret string
}
// NewDefaultClient returns an instance of DefaultClient
func NewDefaultClient(endpoint string, cfg *Config) *DefaultClient {
c := &DefaultClient{
endpoint: endpoint,
}
if cfg != nil {
c.client = http.NewClient(nil, auth.NewSecretAuthorizer(cfg.Secret))
}
return c
}
// SubmitReplicationJob submits a replication job to the jobservice
func (d *DefaultClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
url := d.endpoint + "/api/jobs/replication"
return d.client.Post(url, replication)
}

View File

@ -0,0 +1,55 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"encoding/json"
"net/http"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/jobservice/api"
)
var url string
func TestMain(m *testing.M) {
requestMapping := []*test.RequestHandlerMapping{
&test.RequestHandlerMapping{
Method: http.MethodPost,
Pattern: "/api/jobs/replication",
Handler: func(w http.ResponseWriter, r *http.Request) {
replication := &api.ReplicationReq{}
if err := json.NewDecoder(r.Body).Decode(replication); err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
},
},
}
server := test.NewServer(requestMapping...)
defer server.Close()
url = server.URL
os.Exit(m.Run())
}
func TestSubmitReplicationJob(t *testing.T) {
client := NewDefaultClient(url, &Config{})
err := client.SubmitReplicationJob(&api.ReplicationReq{})
assert.Nil(t, err)
}

View File

@ -20,7 +20,6 @@ import (
"strings" "strings"
"github.com/vmware/harbor/src/adminserver/client" "github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/common" "github.com/vmware/harbor/src/common"
comcfg "github.com/vmware/harbor/src/common/config" comcfg "github.com/vmware/harbor/src/common/config"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
@ -50,8 +49,10 @@ func Init() error {
adminServerURL = "http://adminserver" adminServerURL = "http://adminserver"
} }
log.Infof("initializing client for adminserver %s ...", adminServerURL) log.Infof("initializing client for adminserver %s ...", adminServerURL)
authorizer := auth.NewSecretAuthorizer(secretCookieName, UISecret()) cfg := &client.Config{
AdminserverClient = client.NewClient(adminServerURL, authorizer) Secret: UISecret(),
}
AdminserverClient = client.NewClient(adminServerURL, cfg)
if err := AdminserverClient.Ping(); err != nil { if err := AdminserverClient.Ping(); err != nil {
return fmt.Errorf("failed to ping adminserver: %v", err) return fmt.Errorf("failed to ping adminserver: %v", err)
} }

View File

@ -105,7 +105,6 @@ func TestRepJob(t *testing.T) {
assert.Nil(err) assert.Nil(err)
j, err := dao.GetRepJob(repJobID) j, err := dao.GetRepJob(repJobID)
assert.Equal(models.JobRetrying, j.Status) assert.Equal(models.JobRetrying, j.Status)
assert.Equal(1, rj.parm.Enabled)
assert.False(rj.parm.Insecure) assert.False(rj.parm.Insecure)
rj2 := NewRepJob(99999) rj2 := NewRepJob(99999)
err = rj2.Init() err = rj2.Init()
@ -163,7 +162,6 @@ func prepareRepJobData() error {
} }
policy := models.RepPolicy{ policy := models.RepPolicy{
ProjectID: 1, ProjectID: 1,
Enabled: 1,
TargetID: targetID, TargetID: targetID,
Description: "whatever", Description: "whatever",
Name: "mypolicy", Name: "mypolicy",

View File

@ -62,7 +62,6 @@ type RepJobParm struct {
TargetPassword string TargetPassword string
Repository string Repository string
Tags []string Tags []string
Enabled int
Operation string Operation string
Insecure bool Insecure bool
} }
@ -124,13 +123,8 @@ func (rj *RepJob) Init() error {
LocalRegURL: regURL, LocalRegURL: regURL,
Repository: job.Repository, Repository: job.Repository,
Tags: job.TagList, Tags: job.TagList,
Enabled: policy.Enabled,
Operation: job.Operation, Operation: job.Operation,
} }
if policy.Enabled == 0 {
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID) target, err := dao.GetRepTarget(policy.TargetID)
if err != nil { if err != nil {
return fmt.Errorf("Failed to get target, error: %v", err) return fmt.Errorf("Failed to get target, error: %v", err)

View File

@ -208,16 +208,6 @@ func (sm *SM) Reset(j Job) error {
} }
func (sm *SM) kickOff() error { func (sm *SM) kickOff() error {
if repJob, ok := sm.CurrentJob.(*RepJob); ok {
if repJob.parm.Enabled == 0 {
log.Debugf("The policy of job:%v is disabled, will cancel the job", repJob)
_, err := sm.EnterState(models.JobCanceled)
if err != nil {
log.Warningf("For job: %v, failed to update state to 'canceled', error: %v", repJob, err)
}
return err
}
}
log.Debugf("In kickOff: will start job: %v", sm.CurrentJob) log.Debugf("In kickOff: will start job: %v", sm.CurrentJob)
sm.Start(models.JobRunning) sm.Start(models.JobRunning)
return nil return nil

View File

@ -22,9 +22,4 @@ const (
TriggerScheduleDaily = "daily" TriggerScheduleDaily = "daily"
//TriggerScheduleWeekly : type of scheduling is 'weekly' //TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly" TriggerScheduleWeekly = "weekly"
//OperationPush : operation for pushing images
OperationPush = "push"
//OperationDelete : operation for deleting images
OperationDelete = "delete"
) )

View File

@ -1,34 +1,69 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core package core
import ( import (
"fmt" "fmt"
"strings"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/policy" "github.com/vmware/harbor/src/replication/policy"
"github.com/vmware/harbor/src/replication/replicator"
"github.com/vmware/harbor/src/replication/source" "github.com/vmware/harbor/src/replication/source"
"github.com/vmware/harbor/src/replication/target"
"github.com/vmware/harbor/src/replication/trigger" "github.com/vmware/harbor/src/replication/trigger"
"github.com/vmware/harbor/src/ui/config"
) )
//Controller is core module to cordinate and control the overall workflow of the // Controller defines the methods that a replicatoin controllter should implement
type Controller interface {
policy.Manager
Init() error
Replicate(policyID int64, metadata ...map[string]interface{}) error
}
//DefaultController is core module to cordinate and control the overall workflow of the
//replication modules. //replication modules.
type Controller struct { type DefaultController struct {
//Indicate whether the controller has been initialized or not //Indicate whether the controller has been initialized or not
initialized bool initialized bool
//Manage the policies //Manage the policies
policyManager *policy.Manager policyManager policy.Manager
//Manage the targets
targetManager target.Manager
//Handle the things related with source //Handle the things related with source
sourcer *source.Sourcer sourcer *source.Sourcer
//Manage the triggers of policies //Manage the triggers of policies
triggerManager *trigger.Manager triggerManager *trigger.Manager
//Handle the replication work
replicator replicator.Replicator
} }
//Keep controller as singleton instance //Keep controller as singleton instance
var ( var (
DefaultController = NewController(ControllerConfig{}) //Use default data GlobalController Controller = NewDefaultController(ControllerConfig{}) //Use default data
) )
//ControllerConfig includes related configurations required by the controller //ControllerConfig includes related configurations required by the controller
@ -37,33 +72,35 @@ type ControllerConfig struct {
CacheCapacity int CacheCapacity int
} }
//NewController is the constructor of Controller. //NewDefaultController is the constructor of DefaultController.
func NewController(config ControllerConfig) *Controller { func NewDefaultController(cfg ControllerConfig) *DefaultController {
//Controller refer the default instances //Controller refer the default instances
return &Controller{ ctl := &DefaultController{
policyManager: policy.NewManager(), policyManager: policy.NewDefaultManager(),
targetManager: target.NewDefaultManager(),
sourcer: source.NewSourcer(), sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(config.CacheCapacity), triggerManager: trigger.NewManager(cfg.CacheCapacity),
} }
// TODO read from configuration
endpoint := "http://jobservice:8080"
ctl.replicator = replicator.NewDefaultReplicator(endpoint,
&client.Config{
Secret: config.UISecret(),
})
return ctl
} }
//Init will initialize the controller and the sub components //Init will initialize the controller and the sub components
func (ctl *Controller) Init() error { func (ctl *DefaultController) Init() error {
if ctl.initialized { if ctl.initialized {
return nil return nil
} }
//Build query parameters //Build query parameters
triggerNames := []string{
replication.TriggerKindSchedule,
}
queryName := ""
for _, name := range triggerNames {
queryName = fmt.Sprintf("%s,%s", queryName, name)
}
//Enable the triggers
query := models.QueryParameter{ query := models.QueryParameter{
TriggerName: queryName, TriggerType: replication.TriggerKindSchedule,
} }
policies, err := ctl.policyManager.GetPolicies(query) policies, err := ctl.policyManager.GetPolicies(query)
@ -89,7 +126,7 @@ func (ctl *Controller) Init() error {
} }
//CreatePolicy is used to create a new policy and enable it if necessary //CreatePolicy is used to create a new policy and enable it if necessary
func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64, error) { func (ctl *DefaultController) CreatePolicy(newPolicy models.ReplicationPolicy) (int64, error) {
id, err := ctl.policyManager.CreatePolicy(newPolicy) id, err := ctl.policyManager.CreatePolicy(newPolicy)
if err != nil { if err != nil {
return 0, err return 0, err
@ -105,7 +142,7 @@ func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64,
//UpdatePolicy will update the policy with new content. //UpdatePolicy will update the policy with new content.
//Parameter updatedPolicy must have the ID of the updated policy. //Parameter updatedPolicy must have the ID of the updated policy.
func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error { func (ctl *DefaultController) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error {
// TODO check pre-conditions // TODO check pre-conditions
id := updatedPolicy.ID id := updatedPolicy.ID
@ -124,7 +161,7 @@ func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) erro
} else { } else {
switch updatedPolicy.Trigger.Kind { switch updatedPolicy.Trigger.Kind {
case replication.TriggerKindSchedule: case replication.TriggerKindSchedule:
if updatedPolicy.Trigger.Param != originPolicy.Trigger.Param { if !originPolicy.Trigger.ScheduleParam.Equal(updatedPolicy.Trigger.ScheduleParam) {
reset = true reset = true
} }
case replication.TriggerKindImmediate: case replication.TriggerKindImmediate:
@ -140,7 +177,7 @@ func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) erro
} }
if reset { if reset {
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil { if err = ctl.triggerManager.UnsetTrigger(&originPolicy); err != nil {
return err return err
} }
@ -151,7 +188,7 @@ func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) erro
} }
//RemovePolicy will remove the specified policy and clean the related settings //RemovePolicy will remove the specified policy and clean the related settings
func (ctl *Controller) RemovePolicy(policyID int64) error { func (ctl *DefaultController) RemovePolicy(policyID int64) error {
// TODO check pre-conditions // TODO check pre-conditions
policy, err := ctl.policyManager.GetPolicy(policyID) policy, err := ctl.policyManager.GetPolicy(policyID)
@ -163,7 +200,7 @@ func (ctl *Controller) RemovePolicy(policyID int64) error {
return fmt.Errorf("policy %d not found", policyID) return fmt.Errorf("policy %d not found", policyID)
} }
if err = ctl.triggerManager.UnsetTrigger(policyID, *policy.Trigger); err != nil { if err = ctl.triggerManager.UnsetTrigger(&policy); err != nil {
return err return err
} }
@ -171,20 +208,116 @@ func (ctl *Controller) RemovePolicy(policyID int64) error {
} }
//GetPolicy is delegation of GetPolicy of Policy.Manager //GetPolicy is delegation of GetPolicy of Policy.Manager
func (ctl *Controller) GetPolicy(policyID int64) (models.ReplicationPolicy, error) { func (ctl *DefaultController) GetPolicy(policyID int64) (models.ReplicationPolicy, error) {
return ctl.policyManager.GetPolicy(policyID) return ctl.policyManager.GetPolicy(policyID)
} }
//GetPolicies is delegation of GetPoliciemodels.ReplicationPolicy{}s of Policy.Manager //GetPolicies is delegation of GetPoliciemodels.ReplicationPolicy{}s of Policy.Manager
func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.ReplicationPolicy, error) { func (ctl *DefaultController) GetPolicies(query models.QueryParameter) ([]models.ReplicationPolicy, error) {
return ctl.policyManager.GetPolicies(query) return ctl.policyManager.GetPolicies(query)
} }
//Replicate starts one replication defined in the specified policy; //Replicate starts one replication defined in the specified policy;
//Can be launched by the API layer and related triggers. //Can be launched by the API layer and related triggers.
func (ctl *Controller) Replicate(policyID int64, metadate ...map[string]interface{}) error { func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]interface{}) error {
policy, err := ctl.GetPolicy(policyID)
if err != nil {
return err
}
if policy.ID == 0 {
return fmt.Errorf("policy %d not found", policyID)
}
fmt.Printf("replicating %d, metadata: %v ...\n", policyID, metadate) // prepare candidates for replication
candidates := getCandidates(&policy, ctl.sourcer, metadata...)
/*
targets := []*common_models.RepTarget{}
for _, targetID := range policy.TargetIDs {
target, err := ctl.targetManager.GetTarget(targetID)
if err != nil {
return err
}
targets = append(targets, target)
}
*/
// submit the replication
return replicate(ctl.replicator, policyID, candidates)
}
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer,
metadata ...map[string]interface{}) []models.FilterItem {
candidates := []models.FilterItem{}
if len(metadata) > 0 {
meta := metadata[0]["candidates"]
if meta != nil {
cands, ok := meta.([]models.FilterItem)
if ok {
candidates = append(candidates, cands...)
}
}
}
if len(candidates) == 0 {
for _, namespace := range policy.Namespaces {
candidates = append(candidates, models.FilterItem{
Kind: replication.FilterItemKindProject,
Value: namespace,
Operation: common_models.RepOpTransfer,
})
}
}
filterChain := buildFilterChain(policy, sourcer)
return filterChain.DoFilter(candidates)
}
func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) source.FilterChain {
filters := []source.Filter{}
patterns := map[string]string{}
for _, f := range policy.Filters {
patterns[f.Kind] = f.Pattern
}
registry := sourcer.GetAdaptor(replication.AdaptorKindHarbor)
// only support repository and tag filter for now
filters = append(filters,
source.NewRepositoryFilter(patterns[replication.FilterItemKindRepository], registry))
filters = append(filters,
source.NewTagFilter(patterns[replication.FilterItemKindTag], registry))
return source.NewDefaultFilterChain(filters)
}
func replicate(replicator replicator.Replicator, policyID int64, candidates []models.FilterItem) error {
if len(candidates) == 0 {
log.Debugf("replicaton candidates are null, no further action needed")
}
repositories := map[string][]string{}
// TODO the operation of all candidates are same for now. Update it after supporting
// replicate deletion
operation := ""
for _, candidate := range candidates {
strs := strings.SplitN(candidate.Value, ":", 2)
repositories[strs[0]] = append(repositories[strs[0]], strs[1])
operation = candidate.Operation
}
for repository, tags := range repositories {
replication := &api.ReplicationReq{
PolicyID: policyID,
Repo: repository,
Operation: operation,
TagList: tags,
}
log.Debugf("submiting replication job to jobservice: %v", replication)
if err := replicator.Replicate(replication); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -0,0 +1,141 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/source"
)
func TestMain(m *testing.M) {
// set the policy manager used by GlobalController with a fake policy manager
controller := GlobalController.(*DefaultController)
controller.policyManager = &test.FakePolicyManager{}
os.Exit(m.Run())
}
func TestNewDefaultController(t *testing.T) {
controller := NewDefaultController(ControllerConfig{})
assert.NotNil(t, controller)
}
func TestInit(t *testing.T) {
assert.Nil(t, GlobalController.Init())
}
func TestCreatePolicy(t *testing.T) {
_, err := GlobalController.CreatePolicy(models.ReplicationPolicy{
Trigger: &models.Trigger{
Kind: replication.TriggerKindManual,
},
})
assert.Nil(t, err)
}
func TestUpdatePolicy(t *testing.T) {
assert.Nil(t, GlobalController.UpdatePolicy(models.ReplicationPolicy{
ID: 2,
Trigger: &models.Trigger{
Kind: replication.TriggerKindManual,
},
}))
}
func TestRemovePolicy(t *testing.T) {
assert.Nil(t, GlobalController.RemovePolicy(1))
}
func TestGetPolicy(t *testing.T) {
_, err := GlobalController.GetPolicy(1)
assert.Nil(t, err)
}
func TestGetPolicies(t *testing.T) {
_, err := GlobalController.GetPolicies(models.QueryParameter{})
assert.Nil(t, err)
}
func TestReplicate(t *testing.T) {
// TODO
}
func TestGetCandidates(t *testing.T) {
policy := &models.ReplicationPolicy{
ID: 1,
Filters: []models.Filter{
models.Filter{
Kind: replication.FilterItemKindTag,
Pattern: "*",
},
},
Trigger: &models.Trigger{
Kind: replication.TriggerKindImmediate,
},
}
sourcer := source.NewSourcer()
candidates := []models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:release-1.0",
},
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
}
metadata := map[string]interface{}{
"candidates": candidates,
}
result := getCandidates(policy, sourcer, metadata)
assert.Equal(t, 2, len(result))
policy.Filters = []models.Filter{
models.Filter{
Kind: replication.FilterItemKindTag,
Pattern: "release-*",
},
}
result = getCandidates(policy, sourcer, metadata)
assert.Equal(t, 1, len(result))
}
func TestBuildFilterChain(t *testing.T) {
policy := &models.ReplicationPolicy{
ID: 1,
Filters: []models.Filter{
models.Filter{
Kind: replication.FilterItemKindRepository,
Pattern: "*",
},
models.Filter{
Kind: replication.FilterItemKindTag,
Pattern: "*",
},
},
}
sourcer := source.NewSourcer()
chain := buildFilterChain(policy, sourcer)
assert.Equal(t, 2, len(chain.Filters()))
}

View File

@ -19,7 +19,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/replication/event/notification" "github.com/vmware/harbor/src/replication/event/notification"
) )
@ -38,7 +38,7 @@ func (oph *OnDeletionHandler) Handle(value interface{}) error {
} }
notification := value.(notification.OnDeletionNotification) notification := value.(notification.OnDeletionNotification)
return checkAndTriggerReplication(notification.Image, replication.OperationDelete) return checkAndTriggerReplication(notification.Image, models.RepOpDelete)
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface

View File

@ -19,6 +19,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
@ -45,7 +46,7 @@ func (oph *OnPushHandler) Handle(value interface{}) error {
notification := value.(notification.OnPushNotification) notification := value.(notification.OnPushNotification)
return checkAndTriggerReplication(notification.Image, replication.OperationPush) return checkAndTriggerReplication(notification.Image, common_models.RepOpTransfer)
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface
@ -68,18 +69,16 @@ func checkAndTriggerReplication(image, operation string) error {
} }
for _, watchItem := range watchItems { for _, watchItem := range watchItems {
item := &models.FilterItem{ item := models.FilterItem{
Kind: replication.FilterItemKindTag, Kind: replication.FilterItemKindTag,
Value: image, Value: image,
Metadata: map[string]interface{}{ Operation: operation,
"operation": operation,
},
} }
if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{ if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
PolicyID: watchItem.PolicyID, PolicyID: watchItem.PolicyID,
Metadata: map[string]interface{}{ Metadata: map[string]interface{}{
"": []*models.FilterItem{item}, "candidates": []models.FilterItem{item},
}, },
}); err != nil { }); err != nil {
return fmt.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v", return fmt.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v",

View File

@ -43,8 +43,7 @@ func (srh *StartReplicationHandler) Handle(value interface{}) error {
} }
//Start replication //Start replication
return core.GlobalController.Replicate(notification.PolicyID, notification.Metadata)
return core.DefaultController.Replicate(notification.PolicyID, notification.Metadata)
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface

View File

@ -18,10 +18,14 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/replication/core"
"github.com/vmware/harbor/src/replication/event/notification" "github.com/vmware/harbor/src/replication/event/notification"
) )
func TestHandle(t *testing.T) { func TestHandle(t *testing.T) {
core.GlobalController = &test.FakeReplicatoinController{}
handler := &StartReplicationHandler{} handler := &StartReplicationHandler{}
assert.NotNil(t, handler.Handle(nil)) assert.NotNil(t, handler.Handle(nil))

View File

@ -0,0 +1,41 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"fmt"
"github.com/astaxie/beego/validation"
"github.com/vmware/harbor/src/replication"
)
// Filter is the data model represents the filter defined by user.
type Filter struct {
Kind string `json:"kind"`
Pattern string `json:"pattern"`
}
// Valid ...
func (f *Filter) Valid(v *validation.Validation) {
if !(f.Kind == replication.FilterItemKindProject ||
f.Kind == replication.FilterItemKindRepository ||
f.Kind == replication.FilterItemKindTag) {
v.SetError("kind", fmt.Sprintf("invalid filter kind: %s", f.Kind))
}
if len(f.Pattern) == 0 {
v.SetError("pattern", "filter pattern can not be empty")
}
}

View File

@ -1,12 +1,19 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models package models
import (
"fmt"
"github.com/astaxie/beego/validation"
"github.com/vmware/harbor/src/replication"
)
//FilterItem is the general data model represents the filtering resources which are used as input and output for the filters. //FilterItem is the general data model represents the filtering resources which are used as input and output for the filters.
type FilterItem struct { type FilterItem struct {
@ -20,20 +27,9 @@ type FilterItem struct {
//kind == 'tag', value will be tag name. //kind == 'tag', value will be tag name.
Value string `json:"value"` Value string `json:"value"`
Operation string `json:"operation"`
//Extension placeholder. //Extension placeholder.
//To append more additional information if required by the filter. //To append more additional information if required by the filter.
Metadata map[string]interface{} `json:"metadata"` Metadata map[string]interface{} `json:"metadata"`
} }
// Valid ...
func (f *FilterItem) Valid(v *validation.Validation) {
if !(f.Kind == replication.FilterItemKindProject ||
f.Kind == replication.FilterItemKindRepository ||
f.Kind == replication.FilterItemKindTag) {
v.SetError("kind", fmt.Sprintf("invalid filter kind: %s", f.Kind))
}
if len(f.Value) == 0 {
v.SetError("value", "filter value can not be empty")
}
}

View File

@ -0,0 +1,45 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"testing"
"github.com/astaxie/beego/validation"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
)
func TestValid(t *testing.T) {
cases := map[*Filter]bool{
&Filter{}: true,
&Filter{
Kind: "invalid_kind",
}: true,
&Filter{
Kind: replication.FilterItemKindRepository,
}: true,
&Filter{
Kind: replication.FilterItemKindRepository,
Pattern: "*",
}: false,
}
for filter, hasError := range cases {
v := &validation.Validation{}
filter.Valid(v)
assert.Equal(t, hasError, v.HasErrors())
}
}

View File

@ -9,7 +9,7 @@ type ReplicationPolicy struct {
ID int64 //UUID of the policy ID int64 //UUID of the policy
Name string Name string
Description string Description string
Filters []FilterItem Filters []Filter
ReplicateDeletion bool ReplicateDeletion bool
Trigger *Trigger //The trigger of the replication Trigger *Trigger //The trigger of the replication
ProjectIDs []int64 //Projects attached to this policy ProjectIDs []int64 //Projects attached to this policy
@ -27,8 +27,8 @@ type QueryParameter struct {
//Size of each page, couple with page //Size of each page, couple with page
PageSize int64 PageSize int64
//Query by the name of trigger //Query by the type of trigger
TriggerName string TriggerType string
//Query by project ID //Query by project ID
ProjectID int64 ProjectID int64

View File

@ -1,3 +1,17 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models package models
import ( import (
@ -9,11 +23,8 @@ import (
//Trigger is replication launching approach definition //Trigger is replication launching approach definition
type Trigger struct { type Trigger struct {
//The name of the trigger Kind string `json:"kind"` // the type of the trigger
Kind string `json:"kind"` ScheduleParam *ScheduleParam `json:"schedule_param"` // optional, only used when kind is 'schedule'
//The parameters with json text format required by the trigger
Param string `json:"param"`
} }
// Valid ... // Valid ...
@ -23,4 +34,46 @@ func (t *Trigger) Valid(v *validation.Validation) {
t.Kind == replication.TriggerKindSchedule) { t.Kind == replication.TriggerKindSchedule) {
v.SetError("kind", fmt.Sprintf("invalid trigger kind: %s", t.Kind)) v.SetError("kind", fmt.Sprintf("invalid trigger kind: %s", t.Kind))
} }
if t.Kind == replication.TriggerKindSchedule {
if t.ScheduleParam == nil {
v.SetError("schedule_param", "empty schedule_param")
} else {
t.ScheduleParam.Valid(v)
}
}
}
// ScheduleParam defines the parameters used by schedule trigger
type ScheduleParam struct {
Type string `json:"type"` //daily or weekly
Weekday int8 `json:"weekday"` //Optional, only used when type is 'weekly'
Offtime int64 `json:"offtime"` //The time offset with the UTC 00:00 in seconds
}
// Valid ...
func (s *ScheduleParam) Valid(v *validation.Validation) {
if !(s.Type == replication.TriggerScheduleDaily ||
s.Type == replication.TriggerScheduleWeekly) {
v.SetError("type", fmt.Sprintf("invalid schedule trigger parameter type: %s", s.Type))
}
if s.Type == replication.TriggerScheduleWeekly {
if s.Weekday < 1 || s.Weekday > 7 {
v.SetError("weekday", fmt.Sprintf("invalid schedule trigger parameter weekday: %d", s.Weekday))
}
}
if s.Offtime < 0 || s.Offtime > 3600*24 {
v.SetError("offtime", fmt.Sprintf("invalid schedule trigger parameter offtime: %d", s.Offtime))
}
}
// Equal ...
func (s *ScheduleParam) Equal(param *ScheduleParam) bool {
if param == nil {
return false
}
return s.Type == param.Type && s.Weekday == param.Weekday && s.Offtime == param.Offtime
} }

View File

@ -0,0 +1,77 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"testing"
"github.com/astaxie/beego/validation"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
)
func TestValidOfTrigger(t *testing.T) {
cases := map[*Trigger]bool{
&Trigger{}: true,
&Trigger{
Kind: "invalid_kind",
}: true,
&Trigger{
Kind: replication.TriggerKindImmediate,
}: false,
&Trigger{
Kind: replication.TriggerKindSchedule,
}: true,
}
for filter, hasError := range cases {
v := &validation.Validation{}
filter.Valid(v)
assert.Equal(t, hasError, v.HasErrors())
}
}
func TestValidOfScheduleParam(t *testing.T) {
cases := map[*ScheduleParam]bool{
&ScheduleParam{}: true,
&ScheduleParam{
Type: "invalid_type",
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleDaily,
Offtime: 3600*24 + 1,
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleDaily,
Offtime: 3600 * 2,
}: false,
&ScheduleParam{
Type: replication.TriggerScheduleWeekly,
Weekday: 0,
Offtime: 3600 * 2,
}: true,
&ScheduleParam{
Type: replication.TriggerScheduleWeekly,
Weekday: 7,
Offtime: 3600 * 2,
}: false,
}
for param, hasError := range cases {
v := &validation.Validation{}
param.Valid(v)
assert.Equal(t, hasError, v.HasErrors())
}
}

View File

@ -1,3 +1,17 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package policy package policy
import ( import (
@ -7,18 +21,28 @@ import (
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
persist_models "github.com/vmware/harbor/src/common/models" persist_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/ui/config"
) )
//Manager provides replication policy CURD capabilities. // Manager defines the method a policy manger should implement
type Manager struct{} type Manager interface {
GetPolicies(models.QueryParameter) ([]models.ReplicationPolicy, error)
GetPolicy(int64) (models.ReplicationPolicy, error)
CreatePolicy(models.ReplicationPolicy) (int64, error)
UpdatePolicy(models.ReplicationPolicy) error
RemovePolicy(int64) error
}
//NewManager is the constructor of Manager. //DefaultManager provides replication policy CURD capabilities.
func NewManager() *Manager { type DefaultManager struct{}
return &Manager{}
//NewDefaultManager is the constructor of DefaultManager.
func NewDefaultManager() *DefaultManager {
return &DefaultManager{}
} }
//GetPolicies returns all the policies //GetPolicies returns all the policies
func (m *Manager) GetPolicies(query models.QueryParameter) ([]models.ReplicationPolicy, error) { func (m *DefaultManager) GetPolicies(query models.QueryParameter) ([]models.ReplicationPolicy, error) {
result := []models.ReplicationPolicy{} result := []models.ReplicationPolicy{}
//TODO support more query conditions other than name and project ID //TODO support more query conditions other than name and project ID
policies, err := dao.FilterRepPolicies(query.Name, query.ProjectID) policies, err := dao.FilterRepPolicies(query.Name, query.ProjectID)
@ -31,6 +55,13 @@ func (m *Manager) GetPolicies(query models.QueryParameter) ([]models.Replication
if err != nil { if err != nil {
return []models.ReplicationPolicy{}, err return []models.ReplicationPolicy{}, err
} }
if len(query.TriggerType) > 0 {
if ply.Trigger.Kind != query.TriggerType {
continue
}
}
result = append(result, ply) result = append(result, ply)
} }
@ -38,7 +69,7 @@ func (m *Manager) GetPolicies(query models.QueryParameter) ([]models.Replication
} }
//GetPolicy returns the policy with the specified ID //GetPolicy returns the policy with the specified ID
func (m *Manager) GetPolicy(policyID int64) (models.ReplicationPolicy, error) { func (m *DefaultManager) GetPolicy(policyID int64) (models.ReplicationPolicy, error) {
policy, err := dao.GetRepPolicy(policyID) policy, err := dao.GetRepPolicy(policyID)
if err != nil { if err != nil {
return models.ReplicationPolicy{}, err return models.ReplicationPolicy{}, err
@ -47,7 +78,6 @@ func (m *Manager) GetPolicy(policyID int64) (models.ReplicationPolicy, error) {
return convertFromPersistModel(policy) return convertFromPersistModel(policy)
} }
// TODO add UT
func convertFromPersistModel(policy *persist_models.RepPolicy) (models.ReplicationPolicy, error) { func convertFromPersistModel(policy *persist_models.RepPolicy) (models.ReplicationPolicy, error) {
if policy == nil { if policy == nil {
return models.ReplicationPolicy{}, nil return models.ReplicationPolicy{}, nil
@ -64,8 +94,14 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (models.Replicati
UpdateTime: policy.UpdateTime, UpdateTime: policy.UpdateTime,
} }
project, err := config.GlobalProjectMgr.Get(policy.ProjectID)
if err != nil {
return models.ReplicationPolicy{}, err
}
ply.Namespaces = []string{project.Name}
if len(policy.Filters) > 0 { if len(policy.Filters) > 0 {
filters := []models.FilterItem{} filters := []models.Filter{}
if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil { if err := json.Unmarshal([]byte(policy.Filters), &filters); err != nil {
return models.ReplicationPolicy{}, err return models.ReplicationPolicy{}, err
} }
@ -83,7 +119,6 @@ func convertFromPersistModel(policy *persist_models.RepPolicy) (models.Replicati
return ply, nil return ply, nil
} }
// TODO add ut
func convertToPersistModel(policy models.ReplicationPolicy) (*persist_models.RepPolicy, error) { func convertToPersistModel(policy models.ReplicationPolicy) (*persist_models.RepPolicy, error) {
ply := &persist_models.RepPolicy{ ply := &persist_models.RepPolicy{
ID: policy.ID, ID: policy.ID,
@ -124,7 +159,7 @@ func convertToPersistModel(policy models.ReplicationPolicy) (*persist_models.Rep
//CreatePolicy creates a new policy with the provided data; //CreatePolicy creates a new policy with the provided data;
//If creating failed, error will be returned; //If creating failed, error will be returned;
//If creating succeed, ID of the new created policy will be returned. //If creating succeed, ID of the new created policy will be returned.
func (m *Manager) CreatePolicy(policy models.ReplicationPolicy) (int64, error) { func (m *DefaultManager) CreatePolicy(policy models.ReplicationPolicy) (int64, error) {
now := time.Now() now := time.Now()
policy.CreationTime = now policy.CreationTime = now
policy.UpdateTime = now policy.UpdateTime = now
@ -137,7 +172,7 @@ func (m *Manager) CreatePolicy(policy models.ReplicationPolicy) (int64, error) {
//UpdatePolicy updates the policy; //UpdatePolicy updates the policy;
//If updating failed, error will be returned. //If updating failed, error will be returned.
func (m *Manager) UpdatePolicy(policy models.ReplicationPolicy) error { func (m *DefaultManager) UpdatePolicy(policy models.ReplicationPolicy) error {
policy.UpdateTime = time.Now() policy.UpdateTime = time.Now()
ply, err := convertToPersistModel(policy) ply, err := convertToPersistModel(policy)
if err != nil { if err != nil {
@ -148,6 +183,6 @@ func (m *Manager) UpdatePolicy(policy models.ReplicationPolicy) error {
//RemovePolicy removes the specified policy; //RemovePolicy removes the specified policy;
//If removing failed, error will be returned. //If removing failed, error will be returned.
func (m *Manager) RemovePolicy(policyID int64) error { func (m *DefaultManager) RemovePolicy(policyID int64) error {
return dao.DeleteRepPolicy(policyID) return dao.DeleteRepPolicy(policyID)
} }

View File

@ -0,0 +1,60 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package policy
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware/harbor/src/replication/models"
)
func TestConvertToPersistModel(t *testing.T) {
var id, projectID, targetID int64 = 1, 1, 1
name := "policy01"
replicateDeletion := true
trigger := &models.Trigger{
Kind: "trigger_kind",
}
filters := []models.Filter{
models.Filter{
Kind: "filter_kind",
Pattern: "filter_pattern",
},
}
policy := models.ReplicationPolicy{
ID: id,
Name: name,
ReplicateDeletion: replicateDeletion,
ProjectIDs: []int64{projectID},
TargetIDs: []int64{targetID},
Trigger: trigger,
Filters: filters,
}
ply, err := convertToPersistModel(policy)
require.Nil(t, err)
assert.Equal(t, id, ply.ID)
assert.Equal(t, name, ply.Name)
assert.Equal(t, replicateDeletion, ply.ReplicateDeletion)
assert.Equal(t, projectID, ply.ProjectID)
assert.Equal(t, targetID, ply.TargetID)
tg, _ := json.Marshal(trigger)
assert.Equal(t, string(tg), ply.Trigger)
ft, _ := json.Marshal(filters)
assert.Equal(t, string(ft), ply.Filters)
}

View File

@ -1,10 +1,15 @@
package registry package registry
import ( import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/ui/utils"
) )
// TODO refacotor the methods of HarborAdaptor by caling Harbor's API
//HarborAdaptor is defined to adapt the Harbor registry //HarborAdaptor is defined to adapt the Harbor registry
type HarborAdaptor struct{} type HarborAdaptor struct{}
@ -25,7 +30,19 @@ func (ha *HarborAdaptor) GetNamespace(name string) models.Namespace {
//GetRepositories is used to get all the repositories under the specified namespace //GetRepositories is used to get all the repositories under the specified namespace
func (ha *HarborAdaptor) GetRepositories(namespace string) []models.Repository { func (ha *HarborAdaptor) GetRepositories(namespace string) []models.Repository {
return nil repos, err := dao.GetRepositoryByProjectName(namespace)
if err != nil {
log.Errorf("failed to get repositories under namespace %s: %v", namespace, err)
return nil
}
repositories := []models.Repository{}
for _, repo := range repos {
repositories = append(repositories, models.Repository{
Name: repo.Name,
})
}
return repositories
} }
//GetRepository is used to get the repository with the specified name under the specified namespace //GetRepository is used to get the repository with the specified name under the specified namespace
@ -35,7 +52,26 @@ func (ha *HarborAdaptor) GetRepository(name string, namespace string) models.Rep
//GetTags is used to get all the tags of the specified repository under the namespace //GetTags is used to get all the tags of the specified repository under the namespace
func (ha *HarborAdaptor) GetTags(repositoryName string, namespace string) []models.Tag { func (ha *HarborAdaptor) GetTags(repositoryName string, namespace string) []models.Tag {
return nil client, err := utils.NewRepositoryClientForUI("harbor-ui", repositoryName)
if err != nil {
log.Errorf("failed to create registry client: %v", err)
return nil
}
ts, err := client.ListTag()
if err != nil {
log.Errorf("failed to get tags of repository %s: %v", repositoryName, err)
return nil
}
tags := []models.Tag{}
for _, t := range ts {
tags = append(tags, models.Tag{
Name: t,
})
}
return tags
} }
//GetTag is used to get the tag with the specified name of the repository under the namespace //GetTag is used to get the tag with the specified name of the repository under the namespace

View File

@ -0,0 +1,42 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replicator
import (
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
)
// Replicator submits the replication work to the jobservice
type Replicator interface {
Replicate(*api.ReplicationReq) error
}
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client client.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(endpoint string, cfg *client.Config) *DefaultReplicator {
return &DefaultReplicator{
client: client.NewDefaultClient(endpoint, cfg),
}
}
// Replicate ...
func (d *DefaultReplicator) Replicate(replication *api.ReplicationReq) error {
return d.client.SubmitReplicationJob(replication)
}

View File

@ -12,39 +12,24 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package auth package replicator
import ( import (
"net/http" "testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/jobservice/api"
"github.com/vmware/harbor/src/jobservice/client"
) )
// Authorizer authorizes request type fakeJobserviceClient struct{}
type Authorizer interface {
Authorize(*http.Request) error
}
// NewSecretAuthorizer returns an instance of secretAuthorizer
func NewSecretAuthorizer(cookieName, secret string) Authorizer {
return &secretAuthorizer{
cookieName: cookieName,
secret: secret,
}
}
type secretAuthorizer struct {
cookieName string
secret string
}
func (s *secretAuthorizer) Authorize(req *http.Request) error {
if req == nil {
return nil
}
req.AddCookie(&http.Cookie{
Name: s.cookieName,
Value: s.secret,
})
func (f *fakeJobserviceClient) SubmitReplicationJob(replication *api.ReplicationReq) error {
return nil return nil
} }
func TestReplicate(t *testing.T) {
replicator := NewDefaultReplicator("http://jobservice", &client.Config{})
replicator.client = &fakeJobserviceClient{}
assert.Nil(t, replicator.Replicate(&api.ReplicationReq{}))
}

View File

@ -0,0 +1,23 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"path/filepath"
)
func match(pattern, str string) (bool, error) {
return filepath.Match(pattern, str)
}

View File

@ -15,7 +15,6 @@
package source package source
import ( import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry" "github.com/vmware/harbor/src/replication/registry"
@ -35,23 +34,22 @@ func NewRepositoryConvertor(registry registry.Adaptor) *RepositoryConvertor {
// Convert projects to repositories // Convert projects to repositories
func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.FilterItem { func (r *RepositoryConvertor) Convert(items []models.FilterItem) []models.FilterItem {
// TODO get repositories from database where the push/deletion operations are recorded
// if support replicate deletion
result := []models.FilterItem{} result := []models.FilterItem{}
for _, item := range items { for _, item := range items {
// just put it to the result list if the item is not a project
if item.Kind != replication.FilterItemKindProject { if item.Kind != replication.FilterItemKindProject {
log.Warningf("unexpected filter item kind for repository convertor, expected %s got %s, skip", result = append(result, item)
replication.FilterItemKindProject, item.Kind)
continue continue
} }
repositories := r.registry.GetRepositories(item.Value) repositories := r.registry.GetRepositories(item.Value)
for _, repository := range repositories { for _, repository := range repositories {
result = append(result, models.FilterItem{ result = append(result, models.FilterItem{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: repository.Name, Value: repository.Name,
// public is used to create project if it does not exist when replicating Operation: item.Operation,
Metadata: map[string]interface{}{
"public": item.Metadata["public"],
},
}) })
} }
} }

View File

@ -27,9 +27,6 @@ func TestRepositoryConvert(t *testing.T) {
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindProject, Kind: replication.FilterItemKindProject,
Value: "library", Value: "library",
Metadata: map[string]interface{}{
"public": true,
},
}, },
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
@ -39,16 +36,13 @@ func TestRepositoryConvert(t *testing.T) {
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "library/ubuntu", Value: "library/ubuntu",
Metadata: map[string]interface{}{
"public": true,
},
}, },
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "library/centos", Value: "library/centos",
Metadata: map[string]interface{}{ },
"public": true, models.FilterItem{
}, Kind: replication.FilterItemKindRepository,
}, },
} }
@ -88,10 +82,10 @@ func (f *fakeRegistryAdaptor) GetRepository(name string, namespace string) model
func (f *fakeRegistryAdaptor) GetTags(repositoryName string, namespace string) []models.Tag { func (f *fakeRegistryAdaptor) GetTags(repositoryName string, namespace string) []models.Tag {
return []models.Tag{ return []models.Tag{
models.Tag{ models.Tag{
Name: "library/ubuntu:14.04", Name: "14.04",
}, },
models.Tag{ models.Tag{
Name: "library/ubuntu:16.04", Name: "16.04",
}, },
} }
} }

View File

@ -0,0 +1,86 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"strings"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
// RepositoryFilter implement Filter interface to filter repository
type RepositoryFilter struct {
pattern string
convertor Convertor
}
// NewRepositoryFilter returns an instance of RepositoryFilter
func NewRepositoryFilter(pattern string, registry registry.Adaptor) *RepositoryFilter {
return &RepositoryFilter{
pattern: pattern,
convertor: NewRepositoryConvertor(registry),
}
}
// Init ...
func (r *RepositoryFilter) Init() error {
return nil
}
// GetConvertor ...
func (r *RepositoryFilter) GetConvertor() Convertor {
return r.convertor
}
// DoFilter filters repository and image(according to the repository part) and drops any other resource types
func (r *RepositoryFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
candidates := []string{}
for _, item := range items {
candidates = append(candidates, item.Value)
}
log.Debugf("repository filter candidates: %v", candidates)
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindRepository && item.Kind != replication.FilterItemKindTag {
log.Warningf("unsupported type %s for repository filter, drop", item.Kind)
continue
}
repository := item.Value
if item.Kind == replication.FilterItemKindTag {
repository = strings.SplitN(repository, ":", 2)[0]
}
if len(r.pattern) == 0 {
log.Debugf("pattern is null, add %s to the repository filter result list", item.Value)
result = append(result, item)
} else {
matched, err := match(r.pattern, repository)
if err != nil {
log.Errorf("failed to match pattern %s to value %s: %v", r.pattern, repository, err)
break
}
if matched {
log.Debugf("pattern %s matched, add %s to the repository filter result list", r.pattern, item.Value)
result = append(result, item)
}
}
}
return result
}

View File

@ -0,0 +1,75 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
func TestInitOfRepositoryFilter(t *testing.T) {
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
assert.Nil(t, filter.Init())
}
func TestGetConvertorOfRepositoryFilter(t *testing.T) {
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
assert.NotNil(t, filter.GetConvertor())
}
func TestDoFilterOfRepositoryFilter(t *testing.T) {
// invalid filter item type
filter := NewRepositoryFilter("", &registry.HarborAdaptor{})
items := filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: "invalid_type",
},
})
assert.Equal(t, 0, len(items))
// empty pattern
filter = NewRepositoryFilter("", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindRepository,
Value: "library/hello-world",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewRepositoryFilter("library/*", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewRepositoryFilter("library/*", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
}

View File

@ -60,6 +60,7 @@ func (t *TagCombinationFilter) DoFilter(filterItems []models.FilterItem) []model
repos[strs[0]] = append(repos[strs[0]], strs[1]) repos[strs[0]] = append(repos[strs[0]], strs[1])
} }
// TODO append operation
items := []models.FilterItem{} items := []models.FilterItem{}
for repo, tags := range repos { for repo, tags := range repos {
items = append(items, models.FilterItem{ items = append(items, models.FilterItem{

View File

@ -15,7 +15,6 @@
package source package source
import ( import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry" "github.com/vmware/harbor/src/replication/registry"
@ -38,20 +37,17 @@ func (t *TagConvertor) Convert(items []models.FilterItem) []models.FilterItem {
result := []models.FilterItem{} result := []models.FilterItem{}
for _, item := range items { for _, item := range items {
if item.Kind != replication.FilterItemKindRepository { if item.Kind != replication.FilterItemKindRepository {
log.Warningf("unexpected filter item kind for tag convertor, expected %s got %s, skip", // just put it to the result list if the item is not a repository
replication.FilterItemKindRepository, item.Kind) result = append(result, item)
continue continue
} }
tags := t.registry.GetTags(item.Value, "") tags := t.registry.GetTags(item.Value, "")
for _, tag := range tags { for _, tag := range tags {
result = append(result, models.FilterItem{ result = append(result, models.FilterItem{
Kind: replication.FilterItemKindTag, Kind: replication.FilterItemKindTag,
Value: tag.Name, Value: item.Value + ":" + tag.Name,
// public is used to create project if it does not exist when replicating Operation: item.Operation,
Metadata: map[string]interface{}{
"public": item.Metadata["public"],
},
}) })
} }
} }

View File

@ -27,9 +27,6 @@ func TestTagConvert(t *testing.T) {
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "library/ubuntu", Value: "library/ubuntu",
Metadata: map[string]interface{}{
"public": true,
},
}, },
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindProject, Kind: replication.FilterItemKindProject,
@ -39,16 +36,13 @@ func TestTagConvert(t *testing.T) {
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindTag, Kind: replication.FilterItemKindTag,
Value: "library/ubuntu:14.04", Value: "library/ubuntu:14.04",
Metadata: map[string]interface{}{
"public": true,
},
}, },
models.FilterItem{ models.FilterItem{
Kind: replication.FilterItemKindTag, Kind: replication.FilterItemKindTag,
Value: "library/ubuntu:16.04", Value: "library/ubuntu:16.04",
Metadata: map[string]interface{}{ },
"public": true, models.FilterItem{
}, Kind: replication.FilterItemKindProject,
}, },
} }

View File

@ -0,0 +1,84 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"strings"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
// TagFilter implements Filter interface to filter tag
type TagFilter struct {
pattern string
convertor Convertor
}
// NewTagFilter returns an instance of TagFilter
func NewTagFilter(pattern string, registry registry.Adaptor) *TagFilter {
return &TagFilter{
pattern: pattern,
convertor: NewTagConvertor(registry),
}
}
// Init ...
func (t *TagFilter) Init() error {
return nil
}
// GetConvertor ...
func (t *TagFilter) GetConvertor() Convertor {
return t.convertor
}
// DoFilter filters tag of the image
func (t *TagFilter) DoFilter(items []models.FilterItem) []models.FilterItem {
candidates := []string{}
for _, item := range items {
candidates = append(candidates, item.Value)
}
log.Debugf("tag filter candidates: %v", candidates)
result := []models.FilterItem{}
for _, item := range items {
if item.Kind != replication.FilterItemKindTag {
log.Warningf("unsupported type %s for tag filter, dropped", item.Kind)
continue
}
if len(t.pattern) == 0 {
log.Debugf("pattern is null, add %s to the tag filter result list", item.Value)
result = append(result, item)
continue
}
tag := strings.SplitN(item.Value, ":", 2)[1]
matched, err := match(t.pattern, tag)
if err != nil {
log.Errorf("failed to match pattern %s to value %s: %v", t.pattern, tag, err)
continue
}
if matched {
log.Debugf("pattern %s matched, add %s to the tag filter result list", t.pattern, item.Value)
result = append(result, item)
}
}
return result
}

View File

@ -0,0 +1,85 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package source
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/registry"
)
func TestInitOfTagFilter(t *testing.T) {
filter := NewTagFilter("", &registry.HarborAdaptor{})
assert.Nil(t, filter.Init())
}
func TestGetConvertorOfTagFilter(t *testing.T) {
filter := NewTagFilter("", &registry.HarborAdaptor{})
assert.NotNil(t, filter.GetConvertor())
}
func TestDoFilterOfTagFilter(t *testing.T) {
// invalid filter item type
filter := NewTagFilter("", &registry.HarborAdaptor{})
items := filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: "invalid_type",
},
})
assert.Equal(t, 0, len(items))
// empty pattern
filter = NewTagFilter("", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("l*t", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("lates?", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 1, len(items))
// non-empty pattern
filter = NewTagFilter("latest?", &registry.HarborAdaptor{})
items = filter.DoFilter([]models.FilterItem{
models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: "library/hello-world:latest",
},
})
assert.Equal(t, 0, len(items))
}

View File

@ -0,0 +1,38 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package target
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
)
// Manager defines the methods that a target manager should implement
type Manager interface {
GetTarget(int64) (*models.RepTarget, error)
}
// DefaultManager implement the Manager interface
type DefaultManager struct{}
// NewDefaultManager returns an instance of DefaultManger
func NewDefaultManager() *DefaultManager {
return &DefaultManager{}
}
// GetTarget ...
func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) {
return dao.GetRepTarget(id)
}

View File

@ -1,7 +1,6 @@
package trigger package trigger
import ( import (
"errors"
"fmt" "fmt"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
@ -55,88 +54,71 @@ func (m *Manager) RemoveTrigger(policyID int64) error {
//SetupTrigger will create the new trigger based on the provided policy. //SetupTrigger will create the new trigger based on the provided policy.
//If failed, an error will be returned. //If failed, an error will be returned.
func (m *Manager) SetupTrigger(policy *models.ReplicationPolicy) error { func (m *Manager) SetupTrigger(policy *models.ReplicationPolicy) error {
if policy == nil || policy.Trigger == nil { trigger, err := createTrigger(policy)
log.Debug("empty policy or trigger, skip trigger setup") if err != nil {
return err
}
// manual trigger, do nothing
if trigger == nil {
return nil return nil
} }
tg := trigger.(Interface)
if err = tg.Setup(); err != nil {
return err
}
log.Debugf("%s trigger for policy %d is set", tg.Kind(), policy.ID)
return nil
}
//UnsetTrigger will disable the trigger which is not cached in the trigger cache.
func (m *Manager) UnsetTrigger(policy *models.ReplicationPolicy) error {
trigger, err := createTrigger(policy)
if err != nil {
return err
}
// manual trigger, do nothing
if trigger == nil {
return nil
}
tg := trigger.(Interface)
if err = tg.Unset(); err != nil {
return err
}
log.Debugf("%s trigger for policy %d is unset", tg.Kind(), policy.ID)
return nil
}
func createTrigger(policy *models.ReplicationPolicy) (interface{}, error) {
if policy == nil || policy.Trigger == nil {
return nil, fmt.Errorf("empty policy or trigger")
}
trigger := policy.Trigger trigger := policy.Trigger
switch trigger.Kind { switch trigger.Kind {
case replication.TriggerKindSchedule: case replication.TriggerKindSchedule:
param := ScheduleParam{} param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion param.Type = trigger.ScheduleParam.Type
param.Weekday = trigger.ScheduleParam.Weekday
param.Offtime = trigger.ScheduleParam.Offtime
newTrigger := NewScheduleTrigger(param) return NewScheduleTrigger(param), nil
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindImmediate: case replication.TriggerKindImmediate:
param := ImmediateParam{} param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion param.OnDeletion = policy.ReplicateDeletion
param.Namespaces = policy.Namespaces param.Namespaces = policy.Namespaces
newTrigger := NewImmediateTrigger(param) return NewImmediateTrigger(param), nil
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindManual: case replication.TriggerKindManual:
// do nothing return nil, nil
default: default:
return fmt.Errorf("invalid trigger type: %s", policy.Trigger.Kind) return nil, fmt.Errorf("invalid trigger type: %s", trigger.Kind)
} }
return nil
}
//UnsetTrigger will disable the trigger which is not cached in the trigger cache.
func (m *Manager) UnsetTrigger(policyID int64, trigger models.Trigger) error {
if policyID <= 0 {
return errors.New("Invalid policy ID")
}
if len(trigger.Kind) == 0 {
return errors.New("Invalid replication trigger definition")
}
switch trigger.Kind {
case replication.TriggerKindSchedule:
param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewScheduleTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
case replication.TriggerKindImmediate:
param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewImmediateTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
default:
//Treat as manual trigger
break
}
return nil
} }

View File

@ -2,6 +2,7 @@ package trigger
import ( import (
"fmt" "fmt"
"time"
"github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/common/scheduler/policy" "github.com/vmware/harbor/src/common/scheduler/policy"
@ -31,10 +32,10 @@ func (st *ScheduleTrigger) Setup() error {
config := &policy.AlternatePolicyConfiguration{} config := &policy.AlternatePolicyConfiguration{}
switch st.params.Type { switch st.params.Type {
case replication.TriggerScheduleDaily: case replication.TriggerScheduleDaily:
config.Duration = 24 * 3600 config.Duration = 24 * 3600 * time.Second
config.OffsetTime = st.params.Offtime config.OffsetTime = st.params.Offtime
case replication.TriggerScheduleWeekly: case replication.TriggerScheduleWeekly:
config.Duration = 7 * 24 * 3600 config.Duration = 7 * 24 * 3600 * time.Second
config.OffsetTime = st.params.Offtime config.OffsetTime = st.params.Offtime
config.Weekday = st.params.Weekday config.Weekday = st.params.Weekday
default: default:

View File

@ -118,10 +118,6 @@ func CommonDelTarget() {
_ = dao.DeleteRepTarget(target.ID) _ = dao.DeleteRepTarget(target.ID)
} }
func CommonPolicyEabled(policyID int, enabled int) {
_ = dao.UpdateRepPolicyEnablement(int64(policyID), enabled)
}
func CommonAddRepository() { func CommonAddRepository() {
commonRepository := &models.RepoRecord{ commonRepository := &models.RepoRecord{
RepositoryID: 1, RepositoryID: 1,

View File

@ -40,6 +40,7 @@ import (
"github.com/dghubble/sling" "github.com/dghubble/sling"
//for test env prepare //for test env prepare
"github.com/vmware/harbor/src/replication/core"
_ "github.com/vmware/harbor/src/replication/event" _ "github.com/vmware/harbor/src/replication/event"
_ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/db"
_ "github.com/vmware/harbor/src/ui/auth/ldap" _ "github.com/vmware/harbor/src/ui/auth/ldap"
@ -133,6 +134,10 @@ func init() {
_ = updateInitPassword(1, "Harbor12345") _ = updateInitPassword(1, "Harbor12345")
if err := core.GlobalController.Init(); err != nil {
log.Fatalf("failed to initialize GlobalController: %v", err)
}
//syncRegistry //syncRegistry
if err := SyncRegistry(config.GlobalProjectMgr); err != nil { if err := SyncRegistry(config.GlobalProjectMgr); err != nil {
log.Fatalf("failed to sync repositories from registry: %v", err) log.Fatalf("failed to sync repositories from registry: %v", err)

View File

@ -27,7 +27,7 @@ type ReplicationPolicy struct {
ID int64 `json:"id"` ID int64 `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Description string `json:"description"` Description string `json:"description"`
Filters []rep_models.FilterItem `json:"filters"` Filters []rep_models.Filter `json:"filters"`
ReplicateDeletion bool `json:"replicate_deletion"` ReplicateDeletion bool `json:"replicate_deletion"`
Trigger *rep_models.Trigger `json:"trigger"` Trigger *rep_models.Trigger `json:"trigger"`
Projects []*common_models.Project `json:"projects"` Projects []*common_models.Project `json:"projects"`

View File

@ -49,7 +49,7 @@ func (r *ReplicationAPI) Post() {
replication := &models.Replication{} replication := &models.Replication{}
r.DecodeJSONReqAndValidate(replication) r.DecodeJSONReqAndValidate(replication)
policy, err := core.DefaultController.GetPolicy(replication.PolicyID) policy, err := core.GlobalController.GetPolicy(replication.PolicyID)
if err != nil { if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get replication policy %d: %v", replication.PolicyID, err)) r.HandleInternalServerError(fmt.Sprintf("failed to get replication policy %d: %v", replication.PolicyID, err))
return return
@ -60,11 +60,16 @@ func (r *ReplicationAPI) Post() {
return return
} }
if err = notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{ if err = startReplication(replication.PolicyID); err != nil {
PolicyID: replication.PolicyID,
}); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err)) r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err))
return return
} }
log.Infof("replication topic for policy %d triggered", replication.PolicyID) log.Infof("replication signal for policy %d sent", replication.PolicyID)
}
func startReplication(policyID int64) error {
return notifier.Publish(topic.StartReplicationTopic,
notification.StartReplicationNotification{
PolicyID: policyID,
})
} }

View File

@ -51,7 +51,7 @@ func (pa *RepPolicyAPI) Prepare() {
// Get ... // Get ...
func (pa *RepPolicyAPI) Get() { func (pa *RepPolicyAPI) Get() {
id := pa.GetIDFromURL() id := pa.GetIDFromURL()
policy, err := core.DefaultController.GetPolicy(id) policy, err := core.GlobalController.GetPolicy(id)
if err != nil { if err != nil {
log.Errorf("failed to get policy %d: %v", id, err) log.Errorf("failed to get policy %d: %v", id, err)
pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
@ -87,7 +87,7 @@ func (pa *RepPolicyAPI) List() {
result := []*api_models.ReplicationPolicy{} result := []*api_models.ReplicationPolicy{}
policies, err := core.DefaultController.GetPolicies(queryParam) policies, err := core.GlobalController.GetPolicies(queryParam)
if err != nil { if err != nil {
log.Errorf("failed to get policies: %v, query parameters: %v", err, queryParam) log.Errorf("failed to get policies: %v, query parameters: %v", err, queryParam)
pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
@ -139,13 +139,21 @@ func (pa *RepPolicyAPI) Post() {
} }
} }
id, err := core.DefaultController.CreatePolicy(convertToRepPolicy(policy)) id, err := core.GlobalController.CreatePolicy(convertToRepPolicy(policy))
if err != nil { if err != nil {
pa.HandleInternalServerError(fmt.Sprintf("failed to create policy: %v", err)) pa.HandleInternalServerError(fmt.Sprintf("failed to create policy: %v", err))
return return
} }
// TODO trigger a replication if ReplicateExistingImageNow is true if policy.ReplicateExistingImageNow {
go func() {
if err = startReplication(id); err != nil {
log.Errorf("failed to send replication signal for policy %d: %v", id, err)
return
}
log.Infof("replication signal for policy %d sent", id)
}()
}
pa.Redirect(http.StatusCreated, strconv.FormatInt(id, 10)) pa.Redirect(http.StatusCreated, strconv.FormatInt(id, 10))
} }
@ -154,7 +162,7 @@ func (pa *RepPolicyAPI) Post() {
func (pa *RepPolicyAPI) Put() { func (pa *RepPolicyAPI) Put() {
id := pa.GetIDFromURL() id := pa.GetIDFromURL()
originalPolicy, err := core.DefaultController.GetPolicy(id) originalPolicy, err := core.GlobalController.GetPolicy(id)
if err != nil { if err != nil {
log.Errorf("failed to get policy %d: %v", id, err) log.Errorf("failed to get policy %d: %v", id, err)
pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
@ -197,17 +205,27 @@ func (pa *RepPolicyAPI) Put() {
} }
} }
if err = core.DefaultController.UpdatePolicy(convertToRepPolicy(policy)); err != nil { if err = core.GlobalController.UpdatePolicy(convertToRepPolicy(policy)); err != nil {
pa.HandleInternalServerError(fmt.Sprintf("failed to update policy %d: %v", id, err)) pa.HandleInternalServerError(fmt.Sprintf("failed to update policy %d: %v", id, err))
return return
} }
if policy.ReplicateExistingImageNow {
go func() {
if err = startReplication(id); err != nil {
log.Errorf("failed to send replication signal for policy %d: %v", id, err)
return
}
log.Infof("replication signal for policy %d sent", id)
}()
}
} }
// Delete the replication policy // Delete the replication policy
func (pa *RepPolicyAPI) Delete() { func (pa *RepPolicyAPI) Delete() {
id := pa.GetIDFromURL() id := pa.GetIDFromURL()
policy, err := core.DefaultController.GetPolicy(id) policy, err := core.GlobalController.GetPolicy(id)
if err != nil { if err != nil {
log.Errorf("failed to get policy %d: %v", id, err) log.Errorf("failed to get policy %d: %v", id, err)
pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) pa.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
@ -232,7 +250,7 @@ func (pa *RepPolicyAPI) Delete() {
} }
} }
if err = core.DefaultController.RemovePolicy(id); err != nil { if err = core.GlobalController.RemovePolicy(id); err != nil {
log.Errorf("failed to delete policy %d: %v", id, err) log.Errorf("failed to delete policy %d: %v", id, err)
pa.CustomAbort(http.StatusInternalServerError, "") pa.CustomAbort(http.StatusInternalServerError, "")
} }

View File

@ -123,10 +123,10 @@ func TestRepPolicyAPIPost(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: "invalid_filter_kind", Kind: "invalid_filter_kind",
Value: "", Pattern: "",
}, },
}, },
}, },
@ -151,10 +151,10 @@ func TestRepPolicyAPIPost(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -182,10 +182,10 @@ func TestRepPolicyAPIPost(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -213,10 +213,10 @@ func TestRepPolicyAPIPost(t *testing.T) {
ID: 10000, ID: 10000,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -244,10 +244,10 @@ func TestRepPolicyAPIPost(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -374,10 +374,10 @@ func TestRepPolicyAPIPut(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -405,10 +405,10 @@ func TestRepPolicyAPIPut(t *testing.T) {
ID: targetID, ID: targetID,
}, },
}, },
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: replication.FilterItemKindRepository, Kind: replication.FilterItemKindRepository,
Value: "*", Pattern: "*",
}, },
}, },
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
@ -463,16 +463,15 @@ func TestConvertToRepPolicy(t *testing.T) {
ID: 1, ID: 1,
Name: "policy", Name: "policy",
Description: "description", Description: "description",
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: "filter_kind_01", Kind: "filter_kind_01",
Value: "*", Pattern: "*",
}, },
}, },
ReplicateDeletion: true, ReplicateDeletion: true,
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
Kind: "trigger_kind_01", Kind: "trigger_kind_01",
Param: "{param}",
}, },
Projects: []*models.Project{ Projects: []*models.Project{
&models.Project{ &models.Project{
@ -490,16 +489,15 @@ func TestConvertToRepPolicy(t *testing.T) {
ID: 1, ID: 1,
Name: "policy", Name: "policy",
Description: "description", Description: "description",
Filters: []rep_models.FilterItem{ Filters: []rep_models.Filter{
rep_models.FilterItem{ rep_models.Filter{
Kind: "filter_kind_01", Kind: "filter_kind_01",
Value: "*", Pattern: "*",
}, },
}, },
ReplicateDeletion: true, ReplicateDeletion: true,
Trigger: &rep_models.Trigger{ Trigger: &rep_models.Trigger{
Kind: "trigger_kind_01", Kind: "trigger_kind_01",
Param: "{param}",
}, },
ProjectIDs: []int64{1}, ProjectIDs: []int64{1},
Namespaces: []string{"library"}, Namespaces: []string{"library"},

View File

@ -231,23 +231,6 @@ func (t *TargetAPI) Put() {
t.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound)) t.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
} }
policies, err := dao.GetRepPolicyByTarget(id)
if err != nil {
log.Errorf("failed to get policies according target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
hasEnabledPolicy := false
for _, policy := range policies {
if policy.Enabled == 1 {
hasEnabledPolicy = true
break
}
}
if hasEnabledPolicy {
t.CustomAbort(http.StatusBadRequest, "the target is associated with policy which is enabled")
}
if len(target.Password) != 0 { if len(target.Password) != 0 {
target.Password, err = utils.ReversibleDecrypt(target.Password, t.secretKey) target.Password, err = utils.ReversibleDecrypt(target.Password, t.secretKey)
if err != nil { if err != nil {

View File

@ -15,10 +15,7 @@
package api package api
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"sort" "sort"
"strings" "strings"
@ -77,76 +74,6 @@ func checkUserExists(name string) int {
return 0 return 0
} }
// TriggerReplication triggers the replication according to the policy
// TODO remove
func TriggerReplication(policyID int64, repository string,
tags []string, operation string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
Operation string `json:"operation"`
TagList []string `json:"tags"`
}{
PolicyID: policyID,
Repo: repository,
TagList: tags,
Operation: operation,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationURL()
return uiutils.RequestAsUI("POST", url, bytes.NewBuffer(b), uiutils.NewStatusRespHandler(http.StatusOK))
}
// TODO remove
func postReplicationAction(policyID int64, acton string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}{
PolicyID: policyID,
Action: acton,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationActionURL()
req, err := http.NewRequest("POST", url, bytes.NewBuffer(b))
if err != nil {
return err
}
uiutils.AddUISecret(req)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}
// SyncRegistry syncs the repositories of registry with database. // SyncRegistry syncs the repositories of registry with database.
func SyncRegistry(pm promgr.ProjectManager) error { func SyncRegistry(pm promgr.ProjectManager) error {

View File

@ -23,7 +23,6 @@ import (
"strings" "strings"
"github.com/vmware/harbor/src/adminserver/client" "github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/common" "github.com/vmware/harbor/src/common"
comcfg "github.com/vmware/harbor/src/common/config" comcfg "github.com/vmware/harbor/src/common/config"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
@ -73,8 +72,10 @@ func Init() error {
// InitByURL Init configurations with given url // InitByURL Init configurations with given url
func InitByURL(adminServerURL string) error { func InitByURL(adminServerURL string) error {
log.Infof("initializing client for adminserver %s ...", adminServerURL) log.Infof("initializing client for adminserver %s ...", adminServerURL)
authorizer := auth.NewSecretAuthorizer(secretCookieName, UISecret()) cfg := &client.Config{
AdminserverClient = client.NewClient(adminServerURL, authorizer) Secret: UISecret(),
}
AdminserverClient = client.NewClient(adminServerURL, cfg)
if err := AdminserverClient.Ping(); err != nil { if err := AdminserverClient.Ping(); err != nil {
return fmt.Errorf("failed to ping adminserver: %v", err) return fmt.Errorf("failed to ping adminserver: %v", err)
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/replication/core"
_ "github.com/vmware/harbor/src/replication/event" _ "github.com/vmware/harbor/src/replication/event"
"github.com/vmware/harbor/src/ui/api" "github.com/vmware/harbor/src/ui/api"
_ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/db"
@ -131,6 +132,10 @@ func main() {
notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)}) notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)})
} }
if err := core.GlobalController.Init(); err != nil {
log.Errorf("failed to initialize the replication controller: %v", err)
}
filter.Init() filter.Init()
beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter) beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter)
beego.InsertFilter("/api/*", beego.BeforeRouter, filter.MediaTypeFilter("application/json")) beego.InsertFilter("/api/*", beego.BeforeRouter, filter.MediaTypeFilter("application/json"))