Merge pull request #6956 from ywk253100/190130_transfer_repo

Implement the repository transfer
This commit is contained in:
Wenkai Yin 2019-02-27 12:32:45 +08:00 committed by GitHub
commit 5b2846ef05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 504 additions and 23 deletions

View File

@ -13,6 +13,8 @@ const (
ImageReplicate = "IMAGE_REPLICATE" ImageReplicate = "IMAGE_REPLICATE"
// ImageGC the name of image garbage collection job in job service // ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC" ImageGC = "IMAGE_GC"
// ImageReplication : the name of image replication job in job service
ImageReplication = "IMAGE_REPLICATION"
// JobKindGeneric : Kind of generic job // JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic" JobKindGeneric = "Generic"

View File

@ -19,8 +19,13 @@ import (
"fmt" "fmt"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/transfer" "github.com/goharbor/harbor/src/replication/ng/transfer"
// import chart transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/chart"
// import repository transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/repository"
) )
// Replication implements the job interface // Replication implements the job interface
@ -44,22 +49,30 @@ func (r *Replication) Validate(params map[string]interface{}) error {
// Run gets the corresponding transfer according to the resource type // Run gets the corresponding transfer according to the resource type
// and calls its function to do the real work // and calls its function to do the real work
func (r *Replication) Run(ctx env.JobContext, params map[string]interface{}) error { func (r *Replication) Run(ctx env.JobContext, params map[string]interface{}) error {
logger := ctx.GetLogger()
src, dst, err := parseParams(params) src, dst, err := parseParams(params)
if err != nil { if err != nil {
logger.Errorf("failed to parse parameters: %v", err)
return err return err
} }
factory, err := transfer.GetFactory(src.Type) factory, err := transfer.GetFactory(src.Type)
if err != nil { if err != nil {
logger.Errorf("failed to get transfer factory: %v", err)
return err return err
} }
cancelFunc := func() bool { stopFunc := func() bool {
_, exist := ctx.OPCommand() cmd, exist := ctx.OPCommand()
return exist if !exist {
return false
}
return cmd == opm.CtlCommandStop
} }
transfer, err := factory(ctx.GetLogger(), cancelFunc) transfer, err := factory(ctx.GetLogger(), stopFunc)
if err != nil { if err != nil {
logger.Errorf("failed to create transfer: %v", err)
return err return err
} }

View File

@ -76,7 +76,7 @@ func TestValidate(t *testing.T) {
var transferred = false var transferred = false
var fakedTransferFactory = func(transfer.Logger, transfer.CancelFunc) (transfer.Transfer, error) { var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Transfer, error) {
return &fakedTransfer{}, nil return &fakedTransfer{}, nil
} }

View File

@ -32,6 +32,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc" "github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng"
"github.com/goharbor/harbor/src/jobservice/job/impl/scan" "github.com/goharbor/harbor/src/jobservice/job/impl/scan"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/models"
@ -205,12 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
} }
if err := redisWorkerPool.RegisterJobs( if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{ map[string]interface{}{
job.ImageScanJob: (*scan.ClairJob)(nil), job.ImageScanJob: (*scan.ClairJob)(nil),
job.ImageScanAllJob: (*scan.All)(nil), job.ImageScanAllJob: (*scan.All)(nil),
job.ImageTransfer: (*replication.Transfer)(nil), job.ImageTransfer: (*replication.Transfer)(nil),
job.ImageDelete: (*replication.Deleter)(nil), job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil), job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil), job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageReplication: (*ng.Replication)(nil),
}); err != nil { }); err != nil {
// exit // exit
return nil, err return nil, err

View File

@ -0,0 +1,116 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"io"
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/http/modifier"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
pkg_registry "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// const definition
const (
// UserAgentReplicator is the User-Agent value that NewRegistry attaches
// (through auth.UserAgentModifier) to the requests of the registry client.
// TODO: add filter for the agent in registry webhook handler
UserAgentReplicator = "harbor-replicator"
)
// Registry defines the interface for the registry service used by the
// repository transfer to read from the source and write to the destination.
// Every method takes the repository name explicitly; note that the default
// implementation in this file is bound to one repository at construction
// time and does not use that argument.
type Registry interface {
// ManifestExist reports whether the manifest identified by reference
// exists and, together with that, the digest returned by the registry.
ManifestExist(repository, reference string) (exist bool, digest string, err error)
// PullManifest fetches the manifest identified by reference, offering
// the given media types, and returns it together with its digest.
PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error)
// PushManifest uploads the manifest payload of the given media type
// under reference.
PushManifest(repository, reference, mediaType string, payload []byte) error
// BlobExist reports whether the blob with the given digest exists.
BlobExist(repository, digest string) (exist bool, err error)
// PullBlob returns the size of the blob and a reader for its content.
// The reader is an io.ReadCloser, so the caller is expected to close it.
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
// PushBlob uploads size bytes read from blob as the blob with the
// given digest.
PushBlob(repository, digest string, size int64, blob io.Reader) error
}
// NewRegistry creates the default Registry implementation: a client for the
// given repository on the registry described by reg. Every request carries
// the UserAgentReplicator user agent; when reg holds a credential, a standard
// token authorizer built from it (and the optional tokenServiceURL) is added.
// TODO: passing the tokenServiceURL
func NewRegistry(reg *model.Registry, repository string,
	tokenServiceURL ...string) (Registry, error) {
	// one base transport is shared by the token client and the registry
	// client so that both use the same HTTP connection pool
	base := pkg_registry.GetHTTPTransport(reg.Insecure)

	mods := []modifier.Modifier{
		&auth.UserAgentModifier{
			UserAgent: UserAgentReplicator,
		},
	}
	if credential := reg.Credential; credential != nil {
		basic := auth.NewBasicAuthCredential(
			credential.AccessKey,
			credential.AccessSecret)
		tokenClient := &http.Client{
			Transport: base,
		}
		mods = append(mods,
			auth.NewStandardTokenAuthorizer(tokenClient, basic, tokenServiceURL...))
	}

	httpClient := &http.Client{
		Transport: pkg_registry.NewTransport(base, mods...),
	}
	repoClient, err := pkg_registry.NewRepository(repository, reg.URL, httpClient)
	if err != nil {
		return nil, err
	}
	return &registry{
		client: repoClient,
	}, nil
}
// registry is the default Registry implementation returned by NewRegistry.
// It delegates every call to a repository client that was created for a
// single repository.
type registry struct {
// client is the repository-scoped client all methods forward to
client *pkg_registry.Repository
}
// ManifestExist asks the underlying repository client whether the manifest
// identified by reference exists. The repository argument is not used; the
// client is already bound to a repository. The client returns the digest
// first, so the values are reordered to match the Registry interface.
func (r *registry) ManifestExist(repository, reference string) (bool, string, error) {
	dgst, found, err := r.client.ManifestExist(reference)
	return found, dgst, err
}
// PullManifest pulls the manifest identified by reference through the
// underlying repository client and unmarshals it into a
// distribution.Manifest. The repository argument is not used; the client is
// already bound to a repository. It returns the manifest and its digest.
func (r *registry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
	dgst, contentType, raw, err := r.client.PullManifest(reference, accepttedMediaTypes)
	if err != nil {
		return nil, "", err
	}

	// a media type containing "application/json" is unmarshalled as a
	// schema1 manifest
	if strings.Contains(contentType, "application/json") {
		contentType = schema1.MediaTypeManifest
	}

	mani, _, err := pkg_registry.UnMarshal(contentType, raw)
	if err != nil {
		return nil, "", err
	}
	return mani, dgst, nil
}
// PushManifest pushes the manifest payload under reference through the
// underlying repository client. The first value returned by the client is
// discarded; only the error is reported. The repository argument is not used.
func (r *registry) PushManifest(repository, reference, mediaType string, payload []byte) error {
	if _, err := r.client.PushManifest(reference, mediaType, payload); err != nil {
		return err
	}
	return nil
}
// BlobExist reports whether the blob with the given digest exists, as
// answered by the underlying repository client. The repository argument is
// not used.
func (r *registry) BlobExist(repository, digest string) (bool, error) {
	found, err := r.client.BlobExist(digest)
	return found, err
}
// PullBlob pulls the blob with the given digest through the underlying
// repository client and returns its size and a reader for its content. The
// repository argument is not used.
func (r *registry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
	size, blob, err = r.client.PullBlob(digest)
	return size, blob, err
}
// PushBlob pushes size bytes read from blob as the blob with the given
// digest through the underlying repository client. The repository argument
// is not used.
func (r *registry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
	err := r.client.PushBlob(digest, size, blob)
	return err
}

View File

@ -15,29 +15,247 @@
package repository package repository
import ( import (
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
trans "github.com/goharbor/harbor/src/replication/ng/transfer" trans "github.com/goharbor/harbor/src/replication/ng/transfer"
) )
var (
// jobStoppedErr is the error value obtained from errs.JobStoppedError();
// the transfer steps return it when shouldStop reports that the job
// has been stopped.
jobStoppedErr = errs.JobStoppedError()
)
func init() { func init() {
if err := trans.RegisterFactory(model.ResourceTypeRepository, factory); err != nil { if err := trans.RegisterFactory(model.ResourceTypeRepository, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err) log.Errorf("failed to register transfer factory: %v", err)
} }
} }
func factory(logger trans.Logger, cancelFunc trans.CancelFunc) (trans.Transfer, error) { type repository struct {
repository string
tags []string
}
func factory(logger trans.Logger, stopFunc trans.StopFunc) (trans.Transfer, error) {
return &transfer{ return &transfer{
logger: logger, logger: logger,
isCanceled: cancelFunc, isStopped: stopFunc,
}, nil }, nil
} }
type transfer struct { type transfer struct {
logger trans.Logger logger trans.Logger
isCanceled trans.CancelFunc isStopped trans.StopFunc
src Registry
dst Registry
} }
// Transfer creates the clients for the source and destination registries and
// then either deletes the repository on the destination (when dst.Deleted is
// set) or copies the tags listed in src.Metadata.Vtags to the tags listed in
// dst.Metadata.Vtags, honoring dst.Override for images that already exist.
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error { func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
// initialize: create the source and destination registry clients
if err := t.initialize(src, dst); err != nil {
return err
}
// delete the repository on destination registry
if dst.Deleted {
return t.delete(&repository{
repository: dst.Metadata.Name,
tags: dst.Metadata.Vtags,
})
}
srcRepo := &repository{
repository: src.Metadata.Name,
tags: src.Metadata.Vtags,
}
dstRepo := &repository{
repository: dst.Metadata.Name,
tags: dst.Metadata.Vtags,
}
// copy the repository from source registry to the destination
return t.copy(srcRepo, dstRepo, dst.Override)
}
// initialize creates the registry clients for both ends of the transfer and
// stores them in t.src and t.dst. It returns jobStoppedErr without creating
// anything when the job has already been stopped, and the creation error
// when a client cannot be created.
func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
	if t.shouldStop() {
		return jobStoppedErr
	}

	// source side
	srcClient, err := NewRegistry(src.Registry, src.Metadata.Name)
	if err != nil {
		t.logger.Errorf("failed to create client for source registry: %v", err)
		return err
	}
	t.src = srcClient
	srcInfo := src.Registry
	t.logger.Infof("client for source registry [type: %s, URL: %s, insecure: %v] created",
		srcInfo.Type, srcInfo.URL, srcInfo.Insecure)

	// destination side
	dstClient, err := NewRegistry(dst.Registry, dst.Metadata.Name)
	if err != nil {
		t.logger.Errorf("failed to create client for destination registry: %v", err)
		return err
	}
	t.dst = dstClient
	dstInfo := dst.Registry
	t.logger.Infof("client for destination registry [type: %s, URL: %s, insecure: %v] created",
		dstInfo.Type, dstInfo.URL, dstInfo.Insecure)

	return nil
}
// shouldStop asks the stop function whether the job has been stopped and
// logs a message when it has. The logger is only touched in the stopped
// case.
func (t *transfer) shouldStop() bool {
	if !t.isStopped() {
		return false
	}
	t.logger.Info("the job is stopped")
	return true
}
// copyError is a minimal string-backed error used by copy to report invalid
// input without requiring additional imports.
type copyError string

// Error implements the error interface.
func (e copyError) Error() string { return string(e) }

// copy replicates the tags of the source repository to the destination
// repository. The tag at index i of src.tags is copied to the tag at index i
// of dst.tags. For every pair it pulls the source manifest, skips the pair
// when the destination already holds the same digest (or holds a different
// image and override is false), and otherwise copies the referenced blobs and
// pushes the manifest.
//
// It returns an error when the destination lists fewer tags than the source
// (the pairing would otherwise index past the end of dst.tags), and the first
// error hit while copying, including jobStoppedErr when the job is stopped.
func (t *transfer) copy(src *repository, dst *repository, override bool) error {
	srcRepo := src.repository
	dstRepo := dst.repository

	// every source tag needs a destination tag at the same index
	if len(dst.tags) < len(src.tags) {
		t.logger.Errorf("the destination repository %s has fewer tags (%d) than the source repository %s (%d)",
			dstRepo, len(dst.tags), srcRepo, len(src.tags))
		return copyError("the destination has fewer tags than the source")
	}

	t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
		srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
	for i := range src.tags {
		srcTag := src.tags[i]
		dstTag := dst.tags[i]
		t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
			srcRepo, srcTag, dstRepo, dstTag)
		// pull the manifest from the source registry
		manifest, digest, err := t.pullManifest(srcRepo, srcTag)
		if err != nil {
			return err
		}

		// check the existence of the image on the destination registry
		exist, digest2, err := t.exist(dstRepo, dstTag)
		if err != nil {
			return err
		}
		if exist {
			// the same image already exists
			if digest == digest2 {
				t.logger.Infof("the image %s:%s already exists on the destination registry, skip",
					dstRepo, dstTag)
				continue
			}
			// the same name image exists, but not allowed to override
			if !override {
				t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
					dstRepo, dstTag)
				continue
			}
			// the same name image exists, but allowed to override
			t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
				dstRepo, dstTag)
		}

		// copy blobs between the source and destination registries
		if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil {
			return err
		}

		// push the manifest to the destination registry
		if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil {
			return err
		}

		t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
			srcRepo, srcTag, dstRepo, dstTag)
	}
	t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed",
		srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
	return nil
}
// pullManifest pulls the manifest of repository:tag from the source
// registry, accepting the schema1 and schema2 manifest media types. It
// returns the manifest and its digest, or jobStoppedErr when the job has
// been stopped before the pull starts.
func (t *transfer) pullManifest(repository, tag string) (
	distribution.Manifest, string, error) {
	if t.shouldStop() {
		return nil, "", jobStoppedErr
	}

	t.logger.Infof("pulling the manifest of image %s:%s ...", repository, tag)
	accepted := []string{
		schema1.MediaTypeManifest,
		schema2.MediaTypeManifest,
	}
	mani, dgst, err := t.src.PullManifest(repository, tag, accepted)
	if err == nil {
		t.logger.Infof("the manifest of image %s:%s pulled", repository, tag)
		return mani, dgst, nil
	}

	t.logger.Errorf("failed to pull the manifest of image %s:%s: %v", repository, tag, err)
	return nil, "", err
}
// exist checks whether the manifest of repository:tag exists on the
// destination registry. It returns the existence flag and the digest reported
// by the destination; on failure the error is logged and returned with a
// false flag and an empty digest.
func (t *transfer) exist(repository, tag string) (bool, string, error) {
	exist, digest, err := t.dst.ManifestExist(repository, tag)
	if err != nil {
		t.logger.Errorf("failed to check the existence of the manifest of image %s:%s in the destination registry: %v",
			repository, tag, err)
		return false, "", err
	}
	return exist, digest, nil
}
// copyBlobs copies the blobs described by the given descriptors from the
// source repository to the destination repository, one after another. Before
// each blob it checks whether the job has been stopped and returns
// jobStoppedErr in that case; otherwise it returns the first error hit while
// copying a blob.
func (t *transfer) copyBlobs(blobs []distribution.Descriptor, srcRepo, dstRepo string) error {
	for _, blob := range blobs {
		if t.shouldStop() {
			return jobStoppedErr
		}
		if err := t.copyBlob(blob.Digest.String(), srcRepo, dstRepo); err != nil {
			return err
		}
	}
	return nil
}

// copyBlob copies the single blob identified by digest from srcRepo on the
// source registry to dstRepo on the destination registry, skipping the copy
// when the destination already has the blob.
func (t *transfer) copyBlob(digest, srcRepo, dstRepo string) error {
	t.logger.Infof("copying the blob %s...", digest)
	exist, err := t.dst.BlobExist(dstRepo, digest)
	if err != nil {
		t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err)
		return err
	}
	if exist {
		t.logger.Infof("the blob %s already exists on the destination registry, skip", digest)
		return nil
	}

	size, data, err := t.src.PullBlob(srcRepo, digest)
	if err != nil {
		t.logger.Errorf("failed to pulling the blob %s: %v", digest, err)
		return err
	}
	// The defer lives in this per-blob helper so the reader is closed as soon
	// as this blob is done, rather than piling up until every blob of the
	// manifest has been copied.
	defer data.Close()

	if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil {
		t.logger.Errorf("failed to pushing the blob %s: %v", digest, err)
		return err
	}
	t.logger.Infof("copy the blob %s completed", digest)
	return nil
}
// pushManifest pushes the given manifest to repository:tag on the
// destination registry. It returns jobStoppedErr when the job has been
// stopped before the push starts; a failure to obtain the manifest payload
// and a failure of the push itself are logged with the same message and
// returned.
func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag string) error {
	if t.shouldStop() {
		return jobStoppedErr
	}

	t.logger.Infof("pushing the manifest of image %s:%s ...", repository, tag)
	mediaType, payload, err := manifest.Payload()
	if err == nil {
		// only push when the payload could be obtained
		err = t.dst.PushManifest(repository, tag, mediaType, payload)
	}
	if err != nil {
		t.logger.Errorf("failed to push manifest of image %s:%s: %v",
			repository, tag, err)
		return err
	}

	t.logger.Infof("the manifest of image %s:%s pushed",
		repository, tag)
	return nil
}
// delete is meant to remove the given repository from the destination
// registry. It is not implemented yet: it does nothing and reports success.
func (t *transfer) delete(repo *repository) error {
// TODO: implement the deletion; until then this is a no-op
return nil return nil
} }

View File

@ -0,0 +1,130 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"bytes"
"io"
"io/ioutil"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log"
pkg_registry "github.com/goharbor/harbor/src/common/utils/registry"
trans "github.com/goharbor/harbor/src/replication/ng/transfer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// fakeRregistry is a stub with the same method set as Registry that answers
// every call with canned data; TestCopy uses it as both the source and the
// destination registry.
type fakeRregistry struct{}
// ManifestExist always reports that the manifest does not exist (while still
// returning a fixed digest), so a copy driven by this fake never takes the
// "already exists" branches.
func (f *fakeRregistry) ManifestExist(repository, reference string) (bool, string, error) {
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
// PullManifest returns a fixed schema2 manifest with one config and three
// layers, unmarshalled with pkg_registry.UnMarshal, together with a fixed
// digest. The arguments are ignored.
func (f *fakeRregistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
manifest := `{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 7023,
"digest": "sha256:b5b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 32654,
"digest": "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 16724,
"digest": "sha256:3c3a4604a545cdc127456d94e421cd355bca5b528f4a9c1905b15da2eb4a4c6b"
},
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 73109,
"digest": "sha256:ec4b8955958665577945c89419d1af06b5f7636b4ac3da7f12184802ad867736"
}
]
}`
mediaType := schema2.MediaTypeManifest
payload := []byte(manifest)
mani, _, err := pkg_registry.UnMarshal(mediaType, payload)
if err != nil {
return nil, "", err
}
return mani, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
// PushManifest accepts any manifest and reports success.
func (f *fakeRregistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
// BlobExist always reports that the blob does not exist, so a copy driven by
// this fake pulls and pushes every blob.
func (f *fakeRregistry) BlobExist(repository, digest string) (bool, error) {
return false, nil
}
// PullBlob returns a one-byte blob ('a') with size 1 for any digest.
func (f *fakeRregistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
r := ioutil.NopCloser(bytes.NewReader([]byte{'a'}))
return 1, r, nil
}
// PushBlob accepts any blob without reading it and reports success.
func (f *fakeRregistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
// TestFactory verifies that factory succeeds with a nil logger and a nil
// stop function and that the value it returns is a trans.Transfer.
func TestFactory(t *testing.T) {
	created, err := factory(nil, nil)
	require.Nil(t, err)

	_, isTransfer := created.(trans.Transfer)
	assert.True(t, isTransfer)
}
// TestShouldStop verifies that shouldStop mirrors the result of the stop
// function in both the stopped and the running case.
func TestShouldStop(t *testing.T) {
	// should stop: the logger is required because the stop is logged
	stopped := &transfer{
		logger:    log.DefaultLogger(),
		isStopped: func() bool { return true },
	}
	assert.True(t, stopped.shouldStop())

	// should not stop: no logger is set, nothing is logged on this path
	running := &transfer{
		isStopped: func() bool { return false },
	}
	assert.False(t, running.shouldStop())
}
// TestCopy runs copy for two tag pairs against fake source and destination
// registries, with override enabled and a job that is never stopped, and
// expects it to finish without an error.
func TestCopy(t *testing.T) {
	tr := &transfer{
		logger:    log.DefaultLogger(),
		isStopped: func() bool { return false },
		src:       &fakeRregistry{},
		dst:       &fakeRregistry{},
	}

	source := &repository{
		repository: "source",
		tags:       []string{"a1", "a2"},
	}
	destination := &repository{
		repository: "destination",
		tags:       []string{"b1", "b2"},
	}

	require.Nil(t, tr.copy(source, destination, true))
}

View File

@ -26,10 +26,10 @@ var (
) )
// Factory creates a specific Transfer. The "Logger" is used // Factory creates a specific Transfer. The "Logger" is used
// to log the processing messages and the "CancelFunc" // to log the processing messages and the "StopFunc"
// can be used to check whether the task has been cancelled // can be used to check whether the task has been stopped
// during the processing progress // during the processing progress
type Factory func(Logger, CancelFunc) (Transfer, error) type Factory func(Logger, StopFunc) (Transfer, error)
// Transfer defines an interface used to transfer the source // Transfer defines an interface used to transfer the source
// resource to the destination // resource to the destination
@ -57,9 +57,9 @@ type Logger interface {
Errorf(format string, v ...interface{}) Errorf(format string, v ...interface{})
} }
// CancelFunc is a function used to check whether the transfer // StopFunc is a function used to check whether the transfer
// process is cancelled // process is stopped
type CancelFunc func() bool type StopFunc func() bool
// RegisterFactory registers one transfer factory to the registry // RegisterFactory registers one transfer factory to the registry
func RegisterFactory(name model.ResourceType, factory Factory) error { func RegisterFactory(name model.ResourceType, factory Factory) error {

View File

@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var fakedFactory Factory = func(Logger, CancelFunc) (Transfer, error) { var fakedFactory Factory = func(Logger, StopFunc) (Transfer, error) {
return nil, nil return nil, nil
} }