Merge branch 'replication_ng' into replication_ng

This commit is contained in:
Wenkai Yin 2019-03-05 15:20:21 +08:00 committed by GitHub
commit 937ee46ae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 868 additions and 25 deletions

View File

@ -13,6 +13,8 @@ const (
ImageReplicate = "IMAGE_REPLICATE"
// ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC"
// ImageReplication : the name of image replication job in job service
ImageReplication = "IMAGE_REPLICATION"
// JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"

View File

@ -19,8 +19,13 @@ import (
"fmt"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/transfer"
// import chart transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/chart"
// import repository transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/repository"
)
// Replication implements the job interface
@ -44,22 +49,30 @@ func (r *Replication) Validate(params map[string]interface{}) error {
// Run gets the corresponding transfer according to the resource type
// and calls its function to do the real work
func (r *Replication) Run(ctx env.JobContext, params map[string]interface{}) error {
logger := ctx.GetLogger()
src, dst, err := parseParams(params)
if err != nil {
logger.Errorf("failed to parse parameters: %v", err)
return err
}
factory, err := transfer.GetFactory(src.Type)
if err != nil {
logger.Errorf("failed to get transfer factory: %v", err)
return err
}
cancelFunc := func() bool {
_, exist := ctx.OPCommand()
return exist
stopFunc := func() bool {
cmd, exist := ctx.OPCommand()
if !exist {
return false
}
return cmd == opm.CtlCommandStop
}
transfer, err := factory(ctx.GetLogger(), cancelFunc)
transfer, err := factory(ctx.GetLogger(), stopFunc)
if err != nil {
logger.Errorf("failed to create transfer: %v", err)
return err
}

View File

@ -76,7 +76,7 @@ func TestValidate(t *testing.T) {
var transferred = false
var fakedTransferFactory = func(transfer.Logger, transfer.CancelFunc) (transfer.Transfer, error) {
var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Transfer, error) {
return &fakedTransfer{}, nil
}

View File

@ -32,6 +32,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng"
"github.com/goharbor/harbor/src/jobservice/job/impl/scan"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
@ -205,12 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
}
if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{
job.ImageScanJob: (*scan.ClairJob)(nil),
job.ImageScanAllJob: (*scan.All)(nil),
job.ImageTransfer: (*replication.Transfer)(nil),
job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageScanJob: (*scan.ClairJob)(nil),
job.ImageScanAllJob: (*scan.All)(nil),
job.ImageTransfer: (*replication.Transfer)(nil),
job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageReplication: (*ng.Replication)(nil),
}); err != nil {
// exit
return nil, err

View File

@ -0,0 +1,27 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"os"
"testing"
"github.com/goharbor/harbor/src/common/dao"
)
func TestMain(m *testing.M) {
dao.PrepareTestForPostgresSQL()
os.Exit(m.Run())
}

View File

@ -0,0 +1,27 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
)
// TODO remove the file
// CreateProject ...
func CreateProject(project *models.Project) (int64, error) {
return dao.GetOrmer().Insert(project)
}

View File

@ -0,0 +1,33 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"testing"
"github.com/goharbor/harbor/src/common/models"
"github.com/stretchr/testify/require"
)
// TODO remove the file
func TestCreateProject(t *testing.T) {
project := &models.Project{
Name: "example-project",
OwnerID: 1,
}
_, err := CreateProject(project)
require.Nil(t, err)
}

View File

@ -41,7 +41,7 @@ type Manager interface {
GetTask(int64) (*model.Task, error)
// Update the task, the "props" are the properties of task
// that need to be updated, it cannot include "status". If
// you want to update the status, use "UpdateTask" instead
// you want to update the status, use "UpdateTaskStatus" instead
UpdateTask(task *model.Task, props ...string) error
// UpdateTaskStatus only updates the task status. If "statusCondition"
// presents, only the tasks whose status equal to "statusCondition"

View File

@ -55,7 +55,7 @@ type defaultController struct {
scheduler scheduler.Scheduler
}
// Replicate according the to policy ID
// Start a replication according to the policy
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
log.Infof("starting the replication based on the policy %d ...", policy.ID)

View File

@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
StartReplication(policy *model.Policy) (int64, error)
StopReplication(int64) error
ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error)
GetExecution(int64) (*model.Execution, error)
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
GetTaskLog(int64) ([]byte, error)
}
// NewController returns a controller implementation
func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller {
return &defaultController{
flowCtl: flowCtl,
executionMgr: executionMgr,
}
}
type defaultController struct {
flowCtl flow.Controller
executionMgr execution.Manager
}
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
return d.flowCtl.StartReplication(policy)
}
func (d *defaultController) StopReplication(executionID int64) error {
return d.flowCtl.StopReplication(executionID)
}
func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return d.executionMgr.List(query...)
}
func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) {
return d.executionMgr.Get(executionID)
}
func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) {
return d.executionMgr.ListTasks(query...)
}
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID)
}

View File

@ -0,0 +1,126 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedFlowController struct{}
func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) {
return 1, nil
}
func (f *fakedFlowController) StopReplication(int64) error {
return nil
}
type fakedExecutionManager struct{}
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return 1, []*model.Execution{
{
ID: 1,
},
}, nil
}
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
return &model.Execution{
ID: 1,
}, nil
}
func (f *fakedExecutionManager) Update(*model.Execution, ...string) error {
return nil
}
func (f *fakedExecutionManager) Remove(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
return 1, []*model.Task{
{
ID: 1,
},
}, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
return nil, nil
}
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
return nil
}
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return []byte("message"), nil
}
var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{})
func TestStartReplication(t *testing.T) {
id, err := ctl.StartReplication(nil)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
}
func TestStopReplication(t *testing.T) {
err := ctl.StopReplication(1)
require.Nil(t, err)
}
func TestListExecutions(t *testing.T) {
n, executions, err := ctl.ListExecutions()
require.Nil(t, err)
assert.Equal(t, int64(1), n)
assert.Equal(t, int64(1), executions[0].ID)
}
func TestGetExecution(t *testing.T) {
execution, err := ctl.GetExecution(1)
require.Nil(t, err)
assert.Equal(t, int64(1), execution.ID)
}
func TestListTasks(t *testing.T) {
n, tasks, err := ctl.ListTasks()
require.Nil(t, err)
assert.Equal(t, int64(1), n)
assert.Equal(t, int64(1), tasks[0].ID)
}
func TestGetTaskLog(t *testing.T) {
log, err := ctl.GetTaskLog(1)
require.Nil(t, err)
assert.Equal(t, "message", string(log))
}

View File

@ -0,0 +1,54 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package ng ...
// TODO rename the package name after removing ng
package ng
import (
"fmt"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
var (
// RegistryMgr is a global registry manager
RegistryMgr registry.Manager
// ExecutionMgr is a global execution manager
ExecutionMgr execution.Manager
// OperationCtl is a global operation controller
OperationCtl operation.Controller
)
// Init the global variables
func Init() error {
// TODO init RegistryMgr
// TODO init ExecutionMgr
// TODO init scheduler
var scheduler scheduler.Scheduler
flowCtl, err := flow.NewController(RegistryMgr, ExecutionMgr, scheduler)
if err != nil {
return fmt.Errorf("failed to create the flow controller: %v", err)
}
OperationCtl = operation.NewController(flowCtl, ExecutionMgr)
return nil
}

View File

@ -0,0 +1,31 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package ng ...
// TODO rename the package name after removing ng
package ng
import (
"testing"
// "github.com/stretchr/testify/assert"
// "github.com/stretchr/testify/require"
)
func TestInit(t *testing.T) {
// TODO add testing code
// err := Init()
// require.Nil(t, err)
// assert.NotNil(t, OperationCtl)
// TODO add check for RegistryMgr and ExecutionMgr
}

View File

@ -0,0 +1,116 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"io"
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/http/modifier"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
pkg_registry "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// const definition
const (
// TODO: add filter for the agent in registry webhook handler
UserAgentReplicator = "harbor-replicator"
)
// Registry defines an the interface for registry service
type Registry interface {
ManifestExist(repository, reference string) (exist bool, digest string, err error)
PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error)
PushManifest(repository, reference, mediaType string, payload []byte) error
BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error
}
// NewRegistry returns an instance of the default registry implementation
// TODO: passing the tokenServiceURL
func NewRegistry(reg *model.Registry, repository string,
tokenServiceURL ...string) (Registry, error) {
// use the same HTTP connection pool for all clients
transport := pkg_registry.GetHTTPTransport(reg.Insecure)
modifiers := []modifier.Modifier{
&auth.UserAgentModifier{
UserAgent: UserAgentReplicator,
},
}
if reg.Credential != nil {
cred := auth.NewBasicAuthCredential(
reg.Credential.AccessKey,
reg.Credential.AccessSecret)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, cred, tokenServiceURL...)
modifiers = append(modifiers, authorizer)
}
client, err := pkg_registry.NewRepository(repository, reg.URL,
&http.Client{
Transport: pkg_registry.NewTransport(transport, modifiers...),
})
if err != nil {
return nil, err
}
return &registry{
client: client,
}, nil
}
type registry struct {
client *pkg_registry.Repository
}
func (r *registry) ManifestExist(repository, reference string) (bool, string, error) {
digest, exist, err := r.client.ManifestExist(reference)
return exist, digest, err
}
func (r *registry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
digest, mediaType, payload, err := r.client.PullManifest(reference, accepttedMediaTypes)
if err != nil {
return nil, "", err
}
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := pkg_registry.UnMarshal(mediaType, payload)
if err != nil {
return nil, "", err
}
return manifest, digest, nil
}
func (r *registry) PushManifest(repository, reference, mediaType string, payload []byte) error {
_, err := r.client.PushManifest(reference, mediaType, payload)
return err
}
func (r *registry) BlobExist(repository, digest string) (bool, error) {
return r.client.BlobExist(digest)
}
func (r *registry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return r.client.PullBlob(digest)
}
func (r *registry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return r.client.PushBlob(digest, size, blob)
}

View File

@ -15,29 +15,247 @@
package repository
import (
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/replication/ng/model"
trans "github.com/goharbor/harbor/src/replication/ng/transfer"
)
var (
jobStoppedErr = errs.JobStoppedError()
)
func init() {
if err := trans.RegisterFactory(model.ResourceTypeRepository, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err)
}
}
func factory(logger trans.Logger, cancelFunc trans.CancelFunc) (trans.Transfer, error) {
type repository struct {
repository string
tags []string
}
func factory(logger trans.Logger, stopFunc trans.StopFunc) (trans.Transfer, error) {
return &transfer{
logger: logger,
isCanceled: cancelFunc,
logger: logger,
isStopped: stopFunc,
}, nil
}
type transfer struct {
logger trans.Logger
isCanceled trans.CancelFunc
logger trans.Logger
isStopped trans.StopFunc
src Registry
dst Registry
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
}
// delete the repository on destination registry
if dst.Deleted {
return t.delete(&repository{
repository: dst.Metadata.Name,
tags: dst.Metadata.Vtags,
})
}
srcRepo := &repository{
repository: src.Metadata.Name,
tags: src.Metadata.Vtags,
}
dstRepo := &repository{
repository: dst.Metadata.Name,
tags: dst.Metadata.Vtags,
}
// copy the repository from source registry to the destination
return t.copy(srcRepo, dstRepo, dst.Override)
}
func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
if t.shouldStop() {
return jobStoppedErr
}
// create client for source registry
srcReg, err := NewRegistry(src.Registry, src.Metadata.Name)
if err != nil {
t.logger.Errorf("failed to create client for source registry: %v", err)
return err
}
t.src = srcReg
t.logger.Infof("client for source registry [type: %s, URL: %s, insecure: %v] created",
src.Registry.Type, src.Registry.URL, src.Registry.Insecure)
// create client for destination registry
dstReg, err := NewRegistry(dst.Registry, dst.Metadata.Name)
if err != nil {
t.logger.Errorf("failed to create client for destination registry: %v", err)
return err
}
t.dst = dstReg
t.logger.Infof("client for destination registry [type: %s, URL: %s, insecure: %v] created",
dst.Registry.Type, dst.Registry.URL, dst.Registry.Insecure)
return nil
}
func (t *transfer) shouldStop() bool {
isStopped := t.isStopped()
if isStopped {
t.logger.Info("the job is stopped")
}
return isStopped
}
func (t *transfer) copy(src *repository, dst *repository, override bool) error {
srcRepo := src.repository
dstRepo := dst.repository
t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
for i := range src.tags {
srcTag := src.tags[i]
dstTag := dst.tags[i]
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcTag, dstRepo, dstTag)
// pull the manifest from the source registry
manifest, digest, err := t.pullManifest(srcRepo, srcTag)
if err != nil {
return err
}
// check the existence of the image on the destination registry
exist, digest2, err := t.exist(dstRepo, dstTag)
if err != nil {
return err
}
if exist {
// the same image already exists
if digest == digest2 {
t.logger.Infof("the image %s:%s already exists on the destination registry, skip",
dstRepo, dstTag)
continue
}
// the same name image exists, but not allowed to override
if !override {
t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
dstRepo, dstTag)
continue
}
// the same name image exists, but allowed to override
t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
dstRepo, dstTag)
}
// copy blobs between the source and destination registries
if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil {
return err
}
// push the manifest to the destination registry
if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil {
return err
}
t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
srcRepo, srcTag, dstRepo, dstTag)
}
t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
return nil
}
func (t *transfer) pullManifest(repository, tag string) (
distribution.Manifest, string, error) {
if t.shouldStop() {
return nil, "", jobStoppedErr
}
t.logger.Infof("pulling the manifest of image %s:%s ...", repository, tag)
manifest, digest, err := t.src.PullManifest(repository, tag, []string{
schema1.MediaTypeManifest,
schema2.MediaTypeManifest,
})
if err != nil {
t.logger.Errorf("failed to pull the manifest of image %s:%s: %v", repository, tag, err)
return nil, "", err
}
t.logger.Infof("the manifest of image %s:%s pulled", repository, tag)
return manifest, digest, nil
}
func (t *transfer) exist(repository, tag string) (bool, string, error) {
exist, digest, err := t.dst.ManifestExist(repository, tag)
if err != nil {
t.logger.Errorf("failed to check the existence of the manifest of iage %s:%s in the destination registry: %v",
repository, tag, err)
return false, "", err
}
return exist, digest, nil
}
func (t *transfer) copyBlobs(blobs []distribution.Descriptor, srcRepo, dstRepo string) error {
for _, blob := range blobs {
if t.shouldStop() {
return jobStoppedErr
}
digest := blob.Digest.String()
t.logger.Infof("copying the blob %s...", digest)
exist, err := t.dst.BlobExist(dstRepo, digest)
if err != nil {
t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err)
return err
}
if exist {
t.logger.Infof("the blob %s already exists on the destination registry, skip", digest)
continue
}
size, data, err := t.src.PullBlob(srcRepo, digest)
if err != nil {
t.logger.Errorf("failed to pulling the blob %s: %v", digest, err)
return err
}
defer data.Close()
if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil {
t.logger.Errorf("failed to pushing the blob %s: %v", digest, err)
return err
}
t.logger.Infof("copy the blob %s completed", digest)
}
return nil
}
func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag string) error {
if t.shouldStop() {
return jobStoppedErr
}
t.logger.Infof("pushing the manifest of image %s:%s ...", repository, tag)
mediaType, payload, err := manifest.Payload()
if err != nil {
t.logger.Errorf("failed to push manifest of image %s:%s: %v",
repository, tag, err)
return err
}
if err := t.dst.PushManifest(repository, tag, mediaType, payload); err != nil {
t.logger.Errorf("failed to push manifest of image %s:%s: %v",
repository, tag, err)
return err
}
t.logger.Infof("the manifest of image %s:%s pushed",
repository, tag)
return nil
}
func (t *transfer) delete(repo *repository) error {
// TODO
return nil
}

View File

@ -0,0 +1,130 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log"
pkg_registry "github.com/goharbor/harbor/src/common/utils/registry"
trans "github.com/goharbor/harbor/src/replication/ng/transfer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakeRregistry struct{}
func (f *fakeRregistry) ManifestExist(repository, reference string) (bool, string, error) {
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
func (f *fakeRregistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
manifest := `{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 7023,
"digest": "sha256:b5b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 32654,
"digest": "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 16724,
"digest": "sha256:3c3a4604a545cdc127456d94e421cd355bca5b528f4a9c1905b15da2eb4a4c6b"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 73109,
"digest": "sha256:ec4b8955958665577945c89419d1af06b5f7636b4ac3da7f12184802ad867736"
}
]
}`
mediaType := schema2.MediaTypeManifest
payload := []byte(manifest)
mani, _, err := pkg_registry.UnMarshal(mediaType, payload)
if err != nil {
return nil, "", err
}
return mani, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
func (f *fakeRregistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakeRregistry) BlobExist(repository, digest string) (bool, error) {
return false, nil
}
func (f *fakeRregistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
r := ioutil.NopCloser(bytes.NewReader([]byte{'a'}))
return 1, r, nil
}
func (f *fakeRregistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func TestFactory(t *testing.T) {
tr, err := factory(nil, nil)
require.Nil(t, err)
_, ok := tr.(trans.Transfer)
assert.True(t, ok)
}
func TestShouldStop(t *testing.T) {
// should stop
stopFunc := func() bool { return true }
tr := &transfer{
logger: log.DefaultLogger(),
isStopped: stopFunc,
}
assert.True(t, tr.shouldStop())
// should not stop
stopFunc = func() bool { return false }
tr = &transfer{
isStopped: stopFunc,
}
assert.False(t, tr.shouldStop())
}
func TestCopy(t *testing.T) {
stopFunc := func() bool { return false }
tr := &transfer{
logger: log.DefaultLogger(),
isStopped: stopFunc,
src: &fakeRregistry{},
dst: &fakeRregistry{},
}
src := &repository{
repository: "source",
tags: []string{"a1", "a2"},
}
dst := &repository{
repository: "destination",
tags: []string{"b1", "b2"},
}
override := true
err := tr.copy(src, dst, override)
require.Nil(t, err)
}

View File

@ -26,10 +26,10 @@ var (
)
// Factory creates a specific Transfer. The "Logger" is used
// to log the processing messages and the "CancelFunc"
// can be used to check whether the task has been cancelled
// to log the processing messages and the "StopFunc"
// can be used to check whether the task has been stopped
// during the processing progress
type Factory func(Logger, CancelFunc) (Transfer, error)
type Factory func(Logger, StopFunc) (Transfer, error)
// Transfer defines an interface used to transfer the source
// resource to the destination
@ -57,9 +57,9 @@ type Logger interface {
Errorf(format string, v ...interface{})
}
// CancelFunc is a function used to check whether the transfer
// process is cancelled
type CancelFunc func() bool
// StopFunc is a function used to check whether the transfer
// process is stopped
type StopFunc func() bool
// RegisterFactory registers one transfer factory to the registry
func RegisterFactory(name model.ResourceType, factory Factory) error {

View File

@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/require"
)
var fakedFactory Factory = func(Logger, CancelFunc) (Transfer, error) {
var fakedFactory Factory = func(Logger, StopFunc) (Transfer, error) {
return nil, nil
}