Refine the replication job

1. Refine the replication job
2. Define the Transfer interface
3. Define the common models

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-01-22 17:02:45 +08:00
parent 1987d7cb8f
commit 759cb5467b
11 changed files with 618 additions and 0 deletions

View File

@ -0,0 +1,93 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ng
import (
"encoding/json"
"fmt"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/transfer"
)
// Replication implements the job interface
type Replication struct{}
// MaxFails returns that how many times this job can fail
func (r *Replication) MaxFails() uint {
return 3
}
// ShouldRetry always returns true which means the job is needed to be restarted when fails
func (r *Replication) ShouldRetry() bool {
return true
}
// Validate does nothing
func (r *Replication) Validate(params map[string]interface{}) error {
return nil
}
// Run gets the corresponding transfer according to the resource type
// and calls its function to do the real work
func (r *Replication) Run(ctx env.JobContext, params map[string]interface{}) error {
src, dst, err := parseParams(params)
if err != nil {
return err
}
factory, err := transfer.GetFactory(src.Type)
if err != nil {
return err
}
cancelFunc := func() bool {
_, exist := ctx.OPCommand()
return exist
}
transfer, err := factory(ctx.GetLogger(), cancelFunc)
if err != nil {
return err
}
return transfer.Transfer(src, dst)
}
func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, error) {
src := &model.Resource{}
if err := parseParam(params, "src_resource", src); err != nil {
return nil, nil, err
}
dst := &model.Resource{}
if err := parseParam(params, "dst_resource", dst); err != nil {
return nil, nil, err
}
return src, dst, nil
}
func parseParam(params map[string]interface{}, name string, v interface{}) error {
value, exist := params[name]
if !exist {
return fmt.Errorf("param %s not found", name)
}
str, ok := value.(string)
if !ok {
return fmt.Errorf("the value of %s isn't string", name)
}
return json.Unmarshal([]byte(str), v)
}

View File

@ -0,0 +1,100 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ng
import (
"testing"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/transfer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseParam(t *testing.T) {
params := map[string]interface{}{}
// not exist param
err := parseParam(params, "not_exist_param", nil)
assert.NotNil(t, err)
// the param is not string
params["num"] = 1
err = parseParam(params, "num", nil)
assert.NotNil(t, err)
// not a valid json struct
type person struct {
Name string
}
params["person"] = `"name": "tom"`
p := &person{}
err = parseParam(params, "person", p)
assert.NotNil(t, err)
// pass
params["person"] = `{"name": "tom"}`
err = parseParam(params, "person", p)
assert.Nil(t, err)
assert.Equal(t, "tom", p.Name)
}
func TestParseParams(t *testing.T) {
params := map[string]interface{}{
"src_resource": `{"type":"chart"}`,
"dst_resource": `{"type":"chart"}`,
}
res, dst, err := parseParams(params)
require.Nil(t, err)
assert.Equal(t, "chart", string(res.Type))
assert.Equal(t, "chart", string(dst.Type))
}
func TestMaxFails(t *testing.T) {
rep := &Replication{}
assert.Equal(t, uint(3), rep.MaxFails())
}
func TestShouldRetry(t *testing.T) {
rep := &Replication{}
assert.True(t, rep.ShouldRetry())
}
func TestValidate(t *testing.T) {
rep := &Replication{}
assert.Nil(t, rep.Validate(nil))
}
var transferred = false
var fakedTransferFactory = func(transfer.Logger, transfer.CancelFunc) (transfer.Transfer, error) {
return &fakedTransfer{}, nil
}
type fakedTransfer struct{}
func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource) error {
transferred = true
return nil
}
func TestRun(t *testing.T) {
err := transfer.RegisterFactory("res", fakedTransferFactory)
require.Nil(t, err)
params := map[string]interface{}{
"src_resource": `{"type":"res"}`,
"dst_resource": `{}`,
}
rep := &Replication{}
require.Nil(t, rep.Run(&impl.Context{}, params))
assert.True(t, transferred)
}

View File

@ -0,0 +1,43 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import "time"
// Execution defines an execution of the replication
type Execution struct {
ID int64 `json:"id"`
PolicyID int64 `json:"policy_id"`
Total int `json:"total"`
Failed int `json:"failed"`
Succeed int `json:"succeed"`
Pending int `json:"pending"`
InProgress int `json:"in_progress"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
}
// Task holds the information of one replication task
type Task struct {
ID int64 `json:"id"`
ExecutionID int64 `json:"execution_id"`
ResourceType ResourceType `json:"resource_type"`
SrcResource string `json:"src_resource"`
DstResource string `json:"dst_resource"`
JobID string `json:"job_id"`
Status string `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
}

View File

@ -0,0 +1,23 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// Namespace represents the full path of resource isolation unit;
// if the namespace has hierarchical structure, e.g organization->team,
// it should be converted to organization.team
type Namespace struct {
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
}

View File

@ -0,0 +1,70 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import "time"
// Policy defines the structure of a replication policy
type Policy struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Creator string `json:"creator"`
// source
SrcRegistryID int64 `json:"src_registry_id"`
SrcNamespaces []string `json:"src_namespaces"`
// destination
DestRegistryID int64 `json:"dest_registry_id"`
// Only support two dest namespace modes:
// Put all the src resources to the one single dest namespace
// or keep namespaces same with the source ones (under this case,
// the DestNamespace should be set to empty)
DestNamespace string `json:"dest_namespace"`
// Filters
Filters []*Filter `json:"filters"`
// Trigger
Trigger *Trigger `json:"trigger"`
// Settings
Deletion bool `json:"deletion"`
// If override the image tag
Override bool `json:"override"`
// Operations
Enabled bool `json:"enabled"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
}
// FilterType represents the type info of the filter.
type FilterType string
// Filter holds the info of the filter
type Filter struct {
Type FilterType `json:"type"`
Value interface{} `json:"value"`
}
// TriggerType represents the type of trigger.
type TriggerType string
// Trigger holds info fot a trigger
type Trigger struct {
Type TriggerType `json:"type"`
Settings *TriggerSettings `json:"trigger_settings"`
}
// TriggerSettings is the setting about the trigger
type TriggerSettings struct {
Cron string `json:"cron"`
}

View File

@ -0,0 +1,46 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// RegistryType indicates the type of registry
type RegistryType string
// CredentialType represents the supported credential types
// e.g: u/p, OAuth token
type CredentialType string
// Credential keeps the access key and/or secret for the related registry
type Credential struct {
// Type of the credential
Type CredentialType `json:"type"`
// The key of the access account, for OAuth token, it can be empty
AccessKey string `json:"access_key"`
// The secret or password for the key
AccessSecret string `json:"access_secret"`
}
// Registry keeps the related info of registry
// Data required for the secure access way is not contained here.
// DAO layer is not considered here
type Registry struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type RegistryType `json:"type"`
URL string `json:"url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`
}

View File

@ -0,0 +1,43 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
// the resource type
const (
ResourceTypeRepository ResourceType = "repository"
ResourceTypeChart ResourceType = "chart"
)
// ResourceType represents the type of the resource
type ResourceType string
// ResourceMetadata of resource
type ResourceMetadata struct {
Namespace *Namespace `json:"namespace"`
Name string `json:"name"`
Vtags []string `json:"v_tags"`
Labels []string `json:"labels"`
}
// Resource represents the general replicating content
type Resource struct {
Type ResourceType `json:"type"`
URI string `json:"uri"`
Metadata *ResourceMetadata `json:"metadata"`
Registry *Registry `json:"registry"`
ExtendedInfo map[string]interface{} `json:"extended_info"`
// Indicate if the resource is a deleted resource
Deleted bool `json:"deleted"`
}

View File

@ -0,0 +1,15 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chart

View File

@ -0,0 +1,43 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
trans "github.com/goharbor/harbor/src/replication/ng/transfer"
)
func init() {
if err := trans.RegisterFactory(model.ResourceTypeRepository, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err)
}
}
func factory(logger trans.Logger, cancelFunc trans.CancelFunc) (trans.Transfer, error) {
return &transfer{
logger: logger,
isCanceled: cancelFunc,
}, nil
}
type transfer struct {
logger trans.Logger
isCanceled trans.CancelFunc
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
return nil
}

View File

@ -0,0 +1,88 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"errors"
"fmt"
"github.com/goharbor/harbor/src/replication/ng/model"
)
var (
registry = map[model.ResourceType]Factory{}
)
// Factory creates a specific Transfer. The "Logger" is used
// to log the processing messages and the "CancelFunc"
// can be used to check whether the task has been cancelled
// during the processing progress
type Factory func(Logger, CancelFunc) (Transfer, error)
// Transfer defines an interface used to transfer the source
// resource to the destination
type Transfer interface {
Transfer(src *model.Resource, dst *model.Resource) error
}
// Logger defines an interface for logging
type Logger interface {
// For debuging
Debug(v ...interface{})
// For debuging with format
Debugf(format string, v ...interface{})
// For logging info
Info(v ...interface{})
// For logging info with format
Infof(format string, v ...interface{})
// For warning
Warning(v ...interface{})
// For warning with format
Warningf(format string, v ...interface{})
// For logging error
Error(v ...interface{})
// For logging error with format
Errorf(format string, v ...interface{})
}
// CancelFunc is a function used to check whether the transfer
// process is cancelled
type CancelFunc func() bool
// RegisterFactory registers one transfer factory to the registry
func RegisterFactory(name model.ResourceType, factory Factory) error {
if len(name) == 0 {
return errors.New("empty transfer name")
}
if factory == nil {
return errors.New("empty transfer factory")
}
if _, exist := registry[name]; exist {
return fmt.Errorf("transfer factory for %s already exists", name)
}
registry[name] = factory
return nil
}
// GetFactory gets the transfer factory by the specified name
func GetFactory(name model.ResourceType) (Factory, error) {
factory, exist := registry[name]
if !exist {
return nil, fmt.Errorf("transfer factory for %s not found", name)
}
return factory, nil
}

View File

@ -0,0 +1,54 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var fakedFactory Factory = func(Logger, CancelFunc) (Transfer, error) {
return nil, nil
}
func TestRegisterFactory(t *testing.T) {
// empty name
err := RegisterFactory("", fakedFactory)
require.NotNil(t, err)
// nil factory
err = RegisterFactory("faked_factory", nil)
require.NotNil(t, err)
// pass
err = RegisterFactory("faked_factory", fakedFactory)
require.Nil(t, err)
// already exist
err = RegisterFactory("faked_factory", fakedFactory)
require.NotNil(t, err)
}
func TestGetFactory(t *testing.T) {
registry = map[model.ResourceType]Factory{}
err := RegisterFactory("faked_factory", fakedFactory)
require.Nil(t, err)
// try to get the factory that doesn't exist
_, err = GetFactory("not_exist_factory")
assert.NotNil(t, err)
// pass
_, err = GetFactory("faked_factory")
require.Nil(t, err)
}