From f0f2e77fb4a8f01421386608d6517e3cacd17803 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 30 Jan 2019 15:15:38 +0800 Subject: [PATCH 1/3] Implement the repository transfer This commit implements the Transfer interface for resource repository Signed-off-by: Wenkai Yin --- src/common/job/const.go | 2 + .../job/impl/replication/ng/replication.go | 21 +- .../impl/replication/ng/replication_test.go | 2 +- src/jobservice/runtime/bootstrap.go | 14 +- .../ng/transfer/repository/registry.go | 116 +++++++++ .../ng/transfer/repository/transfer.go | 228 +++++++++++++++++- .../ng/transfer/repository/transfer_test.go | 130 ++++++++++ src/replication/ng/transfer/transfer.go | 12 +- src/replication/ng/transfer/transfer_test.go | 2 +- 9 files changed, 504 insertions(+), 23 deletions(-) create mode 100644 src/replication/ng/transfer/repository/registry.go create mode 100644 src/replication/ng/transfer/repository/transfer_test.go diff --git a/src/common/job/const.go b/src/common/job/const.go index d8fe2b59e..08876ec68 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -13,6 +13,8 @@ const ( ImageReplicate = "IMAGE_REPLICATE" // ImageGC the name of image garbage collection job in job service ImageGC = "IMAGE_GC" + // ImageReplication : the name of image replication job in job service + ImageReplication = "IMAGE_REPLICATION" // JobKindGeneric : Kind of generic job JobKindGeneric = "Generic" diff --git a/src/jobservice/job/impl/replication/ng/replication.go b/src/jobservice/job/impl/replication/ng/replication.go index be7cf588c..ec0ec5a08 100644 --- a/src/jobservice/job/impl/replication/ng/replication.go +++ b/src/jobservice/job/impl/replication/ng/replication.go @@ -19,8 +19,13 @@ import ( "fmt" "github.com/goharbor/harbor/src/jobservice/env" + "github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/transfer" + // import chart transfer + _ "github.com/goharbor/harbor/src/replication/ng/transfer/chart" + // import repository transfer + _ "github.com/goharbor/harbor/src/replication/ng/transfer/repository" ) // Replication implements the job interface @@ -44,22 +49,30 @@ func (r *Replication) Validate(params map[string]interface{}) error { // Run gets the corresponding transfer according to the resource type // and calls its function to do the real work func (r *Replication) Run(ctx env.JobContext, params map[string]interface{}) error { + logger := ctx.GetLogger() + src, dst, err := parseParams(params) if err != nil { + logger.Errorf("failed to parse parameters: %v", err) return err } factory, err := transfer.GetFactory(src.Type) if err != nil { + logger.Errorf("failed to get transfer factory: %v", err) return err } - cancelFunc := func() bool { - _, exist := ctx.OPCommand() - return exist + stopFunc := func() bool { + cmd, exist := ctx.OPCommand() + if !exist { + return false + } + return cmd == opm.CtlCommandStop } - transfer, err := factory(ctx.GetLogger(), cancelFunc) + transfer, err := factory(ctx.GetLogger(), stopFunc) if err != nil { + logger.Errorf("failed to create transfer: %v", err) return err } diff --git a/src/jobservice/job/impl/replication/ng/replication_test.go b/src/jobservice/job/impl/replication/ng/replication_test.go index 2dadeab5d..4fe74ab34 100644 --- a/src/jobservice/job/impl/replication/ng/replication_test.go +++ b/src/jobservice/job/impl/replication/ng/replication_test.go @@ -76,7 +76,7 @@ func TestValidate(t *testing.T) { var transferred = false -var fakedTransferFactory = func(transfer.Logger, transfer.CancelFunc) (transfer.Transfer, error) { +var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Transfer, error) { return &fakedTransfer{}, nil } diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index d8937b924..42153e7cb 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -32,6 +32,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/job/impl/gc" "github.com/goharbor/harbor/src/jobservice/job/impl/replication" + "github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng" "github.com/goharbor/harbor/src/jobservice/job/impl/scan" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/models" @@ -205,12 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ - job.ImageScanJob: (*scan.ClairJob)(nil), - job.ImageScanAllJob: (*scan.All)(nil), - job.ImageTransfer: (*replication.Transfer)(nil), - job.ImageDelete: (*replication.Deleter)(nil), - job.ImageReplicate: (*replication.Replicator)(nil), - job.ImageGC: (*gc.GarbageCollector)(nil), + job.ImageScanJob: (*scan.ClairJob)(nil), + job.ImageScanAllJob: (*scan.All)(nil), + job.ImageTransfer: (*replication.Transfer)(nil), + job.ImageDelete: (*replication.Deleter)(nil), + job.ImageReplicate: (*replication.Replicator)(nil), + job.ImageGC: (*gc.GarbageCollector)(nil), + job.ImageReplication: (*ng.Replication)(nil), }); err != nil { // exit return nil, err diff --git a/src/replication/ng/transfer/repository/registry.go b/src/replication/ng/transfer/repository/registry.go new file mode 100644 index 000000000..02d6f1935 --- /dev/null +++ b/src/replication/ng/transfer/repository/registry.go @@ -0,0 +1,116 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package repository + +import ( + "io" + "net/http" + "strings" + + "github.com/goharbor/harbor/src/common/http/modifier" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + pkg_registry "github.com/goharbor/harbor/src/common/utils/registry" + "github.com/goharbor/harbor/src/common/utils/registry/auth" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// const definition +const ( + // TODO: add filter for the agent in registry webhook handler + UserAgentReplicator = "harbor-replicator" +) + +// Registry defines an the interface for registry service +type Registry interface { + ManifestExist(repository, reference string) (exist bool, digest string, err error) + PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) + PushManifest(repository, reference, mediaType string, payload []byte) error + BlobExist(repository, digest string) (exist bool, err error) + PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) + PushBlob(repository, digest string, size int64, blob io.Reader) error +} + +// NewRegistry returns an instance of the default registry implementation +// TODO: passing the tokenServiceURL +func NewRegistry(reg *model.Registry, repository string, + tokenServiceURL ...string) (Registry, error) { + // use the same HTTP connection pool for all clients + transport := pkg_registry.GetHTTPTransport(reg.Insecure) + modifiers := []modifier.Modifier{ + &auth.UserAgentModifier{ + UserAgent: UserAgentReplicator, + }, + } + if reg.Credential != nil { + cred := auth.NewBasicAuthCredential( + reg.Credential.AccessKey, + reg.Credential.AccessSecret) + authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ + Transport: transport, + }, cred, tokenServiceURL...) + + modifiers = append(modifiers, authorizer) + } + + client, err := pkg_registry.NewRepository(repository, reg.URL, + &http.Client{ + Transport: pkg_registry.NewTransport(transport, modifiers...), + }) + if err != nil { + return nil, err + } + + return ®istry{ + client: client, + }, nil +} + +type registry struct { + client *pkg_registry.Repository +} + +func (r *registry) ManifestExist(repository, reference string) (bool, string, error) { + digest, exist, err := r.client.ManifestExist(reference) + return exist, digest, err +} +func (r *registry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { + digest, mediaType, payload, err := r.client.PullManifest(reference, accepttedMediaTypes) + if err != nil { + return nil, "", err + } + if strings.Contains(mediaType, "application/json") { + mediaType = schema1.MediaTypeManifest + } + manifest, _, err := pkg_registry.UnMarshal(mediaType, payload) + if err != nil { + return nil, "", err + } + return manifest, digest, nil +} +func (r *registry) PushManifest(repository, reference, mediaType string, payload []byte) error { + _, err := r.client.PushManifest(reference, mediaType, payload) + return err +} +func (r *registry) BlobExist(repository, digest string) (bool, error) { + return r.client.BlobExist(digest) +} +func (r *registry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { + return r.client.PullBlob(digest) +} +func (r *registry) PushBlob(repository, digest string, size int64, blob io.Reader) error { + return r.client.PushBlob(digest, size, blob) +} diff --git a/src/replication/ng/transfer/repository/transfer.go b/src/replication/ng/transfer/repository/transfer.go index 7c1be0601..941d91af9 100644 --- a/src/replication/ng/transfer/repository/transfer.go +++ b/src/replication/ng/transfer/repository/transfer.go @@ -15,29 +15,247 @@ package repository import ( + "strings" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/replication/ng/model" trans "github.com/goharbor/harbor/src/replication/ng/transfer" ) +var ( + jobStoppedErr = errs.JobStoppedError() +) + func init() { if err := trans.RegisterFactory(model.ResourceTypeRepository, factory); err != nil { log.Errorf("failed to register transfer factory: %v", err) } } -func factory(logger trans.Logger, cancelFunc trans.CancelFunc) (trans.Transfer, error) { +type repository struct { + repository string + tags []string +} + +func factory(logger trans.Logger, stopFunc trans.StopFunc) (trans.Transfer, error) { return &transfer{ - logger: logger, - isCanceled: cancelFunc, + logger: logger, + isStopped: stopFunc, }, nil } type transfer struct { - logger trans.Logger - isCanceled trans.CancelFunc + logger trans.Logger + isStopped trans.StopFunc + src Registry + dst Registry } func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error { + // initialize + if err := t.initialize(src, dst); err != nil { + return err + } + + // delete the repository on destination registry + if dst.Deleted { + return t.delete(&repository{ + repository: dst.Metadata.Name, + tags: dst.Metadata.Vtags, + }) + } + + srcRepo := &repository{ + repository: src.Metadata.Name, + tags: src.Metadata.Vtags, + } + dstRepo := &repository{ + repository: dst.Metadata.Name, + tags: dst.Metadata.Vtags, + } + // copy the repository from source registry to the destination + return t.copy(srcRepo, dstRepo, dst.Override) +} + +func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error { + if t.shouldStop() { + return jobStoppedErr + } + + // create client for source registry + srcReg, err := NewRegistry(src.Registry, src.Metadata.Name) + if err != nil { + t.logger.Errorf("failed to create client for source registry: %v", err) + return err + } + t.src = srcReg + t.logger.Infof("client for source registry [type: %s, URL: %s, insecure: %v] created", + src.Registry.Type, src.Registry.URL, src.Registry.Insecure) + + // create client for destination registry + dstReg, err := NewRegistry(dst.Registry, dst.Metadata.Name) + if err != nil { + t.logger.Errorf("failed to create client for destination registry: %v", err) + return err + } + t.dst = dstReg + t.logger.Infof("client for destination registry [type: %s, URL: %s, insecure: %v] created", + dst.Registry.Type, dst.Registry.URL, dst.Registry.Insecure) + + return nil +} + +func (t *transfer) shouldStop() bool { + isStopped := t.isStopped() + if isStopped { + t.logger.Info("the job is stopped") + } + return isStopped +} + +func (t *transfer) copy(src *repository, dst *repository, override bool) error { + srcRepo := src.repository + dstRepo := dst.repository + t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...", + srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) + for i := range src.tags { + srcTag := src.tags[i] + dstTag := dst.tags[i] + t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...", + srcRepo, srcTag, dstRepo, dstTag) + // pull the manifest from the source registry + manifest, digest, err := t.pullManifest(srcRepo, srcTag) + if err != nil { + return err + } + + // check the existence of the image on the destination registry + exist, digest2, err := t.exist(dstRepo, dstTag) + if err != nil { + return err + } + if exist { + // the same image already exists + if digest == digest2 { + t.logger.Infof("the image %s:%s already exists on the destination registry, skip", + dstRepo, dstTag) + continue + } + // the same name image exists, but not allowed to override + if !override { + t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip", + dstRepo, dstTag) + continue + } + // the same name image exists, but allowed to override + t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...", + dstRepo, dstTag) + } + + // copy blobs between the source and destination registries + if err = t.copyBlobs(manifest.References(), srcRepo, dstRepo); err != nil { + return err + } + + // push the manifest to the destination registry + if err := t.pushManifest(manifest, dstRepo, dstTag); err != nil { + return err + } + + t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed", + srcRepo, srcTag, dstRepo, dstTag) + } + t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed", + srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) + return nil +} + +func (t *transfer) pullManifest(repository, tag string) ( + distribution.Manifest, string, error) { + if t.shouldStop() { + return nil, "", jobStoppedErr + } + t.logger.Infof("pulling the manifest of image %s:%s ...", repository, tag) + manifest, digest, err := t.src.PullManifest(repository, tag, []string{ + schema1.MediaTypeManifest, + schema2.MediaTypeManifest, + }) + if err != nil { + t.logger.Errorf("failed to pull the manifest of image %s:%s: %v", repository, tag, err) + return nil, "", err + } + t.logger.Infof("the manifest of image %s:%s pulled", repository, tag) + return manifest, digest, nil +} + +func (t *transfer) exist(repository, tag string) (bool, string, error) { + exist, digest, err := t.dst.ManifestExist(repository, tag) + if err != nil { + t.logger.Errorf("failed to check the existence of the manifest of iage %s:%s in the destination registry: %v", + repository, tag, err) + return false, "", err + } + return exist, digest, nil +} + +func (t *transfer) copyBlobs(blobs []distribution.Descriptor, srcRepo, dstRepo string) error { + for _, blob := range blobs { + if t.shouldStop() { + return jobStoppedErr + } + digest := blob.Digest.String() + t.logger.Infof("copying the blob %s...", digest) + exist, err := t.dst.BlobExist(dstRepo, digest) + if err != nil { + t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err) + return err + } + if exist { + t.logger.Infof("the blob %s already exists on the destination registry, skip", digest) + continue + } + + size, data, err := t.src.PullBlob(srcRepo, digest) + if err != nil { + t.logger.Errorf("failed to pulling the blob %s: %v", digest, err) + return err + } + defer data.Close() + if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil { + t.logger.Errorf("failed to pushing the blob %s: %v", digest, err) + return err + } + t.logger.Infof("copy the blob %s completed", digest) + } + return nil +} + +func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag string) error { + if t.shouldStop() { + return jobStoppedErr + } + t.logger.Infof("pushing the manifest of image %s:%s ...", repository, tag) + mediaType, payload, err := manifest.Payload() + if err != nil { + t.logger.Errorf("failed to push manifest of image %s:%s: %v", + repository, tag, err) + return err + } + if err := t.dst.PushManifest(repository, tag, mediaType, payload); err != nil { + t.logger.Errorf("failed to push manifest of image %s:%s: %v", + repository, tag, err) + return err + } + t.logger.Infof("the manifest of image %s:%s pushed", + repository, tag) + return nil +} + +func (t *transfer) delete(repo *repository) error { + // TODO return nil } diff --git a/src/replication/ng/transfer/repository/transfer_test.go b/src/replication/ng/transfer/repository/transfer_test.go new file mode 100644 index 000000000..c0cccaef2 --- /dev/null +++ b/src/replication/ng/transfer/repository/transfer_test.go @@ -0,0 +1,130 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package repository + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema2" + "github.com/goharbor/harbor/src/common/utils/log" + pkg_registry "github.com/goharbor/harbor/src/common/utils/registry" + trans "github.com/goharbor/harbor/src/replication/ng/transfer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeRregistry struct{} + +func (f *fakeRregistry) ManifestExist(repository, reference string) (bool, string, error) { + return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil +} +func (f *fakeRregistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { + manifest := `{ + "schemaVersion": 2, + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "config": { + "mediaType": "application/vnd.docker.container.image.v1+json", + "size": 7023, + "digest": "sha256:b5b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7" + }, + "layers": [ + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 32654, + "digest": "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f" + }, + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 16724, + "digest": "sha256:3c3a4604a545cdc127456d94e421cd355bca5b528f4a9c1905b15da2eb4a4c6b" + }, + { + "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip", + "size": 73109, + "digest": "sha256:ec4b8955958665577945c89419d1af06b5f7636b4ac3da7f12184802ad867736" + } + ] + }` + mediaType := schema2.MediaTypeManifest + payload := []byte(manifest) + mani, _, err := pkg_registry.UnMarshal(mediaType, payload) + if err != nil { + return nil, "", err + } + return mani, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil +} +func (f *fakeRregistry) PushManifest(repository, reference, mediaType string, payload []byte) error { + return nil +} +func (f *fakeRregistry) BlobExist(repository, digest string) (bool, error) { + return false, nil +} +func (f *fakeRregistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { + r := ioutil.NopCloser(bytes.NewReader([]byte{'a'})) + return 1, r, nil +} +func (f *fakeRregistry) PushBlob(repository, digest string, size int64, blob io.Reader) error { + return nil +} + +func TestFactory(t *testing.T) { + tr, err := factory(nil, nil) + require.Nil(t, err) + _, ok := tr.(trans.Transfer) + assert.True(t, ok) +} + +func TestShouldStop(t *testing.T) { + // should stop + stopFunc := func() bool { return true } + tr := &transfer{ + logger: log.DefaultLogger(), + isStopped: stopFunc, + } + assert.True(t, tr.shouldStop()) + + // should not stop + stopFunc = func() bool { return false } + tr = &transfer{ + isStopped: stopFunc, + } + assert.False(t, tr.shouldStop()) +} + +func TestCopy(t *testing.T) { + stopFunc := func() bool { return false } + tr := &transfer{ + logger: log.DefaultLogger(), + isStopped: stopFunc, + src: &fakeRregistry{}, + dst: &fakeRregistry{}, + } + + src := &repository{ + repository: "source", + tags: []string{"a1", "a2"}, + } + dst := &repository{ + repository: "destination", + tags: []string{"b1", "b2"}, + } + override := true + err := tr.copy(src, dst, override) + require.Nil(t, err) +} diff --git a/src/replication/ng/transfer/transfer.go b/src/replication/ng/transfer/transfer.go index db5161d48..3434b873c 100644 --- a/src/replication/ng/transfer/transfer.go +++ b/src/replication/ng/transfer/transfer.go @@ -26,10 +26,10 @@ var ( ) // Factory creates a specific Transfer. The "Logger" is used -// to log the processing messages and the "CancelFunc" -// can be used to check whether the task has been cancelled +// to log the processing messages and the "StopFunc" +// can be used to check whether the task has been stopped // during the processing progress -type Factory func(Logger, CancelFunc) (Transfer, error) +type Factory func(Logger, StopFunc) (Transfer, error) // Transfer defines an interface used to transfer the source // resource to the destination @@ -57,9 +57,9 @@ type Logger interface { Errorf(format string, v ...interface{}) } -// CancelFunc is a function used to check whether the transfer -// process is cancelled -type CancelFunc func() bool +// StopFunc is a function used to check whether the transfer +// process is stopped +type StopFunc func() bool // RegisterFactory registers one transfer factory to the registry func RegisterFactory(name model.ResourceType, factory Factory) error { diff --git a/src/replication/ng/transfer/transfer_test.go b/src/replication/ng/transfer/transfer_test.go index efa785ca4..649057e3c 100644 --- a/src/replication/ng/transfer/transfer_test.go +++ b/src/replication/ng/transfer/transfer_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/require" ) -var fakedFactory Factory = func(Logger, CancelFunc) (Transfer, error) { +var fakedFactory Factory = func(Logger, StopFunc) (Transfer, error) { return nil, nil } From 8894a27d2d9402048fa2597f709f1bf355d80b54 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 21 Feb 2019 16:52:53 +0800 Subject: [PATCH 2/3] Implement the operation controller This commit implements the operation controller. The operation controller wraps the flow controller and execution manager to provide capabilities for the upper level Signed-off-by: Wenkai Yin --- src/replication/ng/execution/execution.go | 2 +- src/replication/ng/flow/controller.go | 2 +- src/replication/ng/operation/controller.go | 64 +++++++++ .../ng/operation/controller_test.go | 126 ++++++++++++++++++ src/replication/ng/replication.go | 54 ++++++++ src/replication/ng/replication_test.go | 31 +++++ 6 files changed, 277 insertions(+), 2 deletions(-) create mode 100644 src/replication/ng/operation/controller.go create mode 100644 src/replication/ng/operation/controller_test.go create mode 100644 src/replication/ng/replication.go create mode 100644 src/replication/ng/replication_test.go diff --git a/src/replication/ng/execution/execution.go b/src/replication/ng/execution/execution.go index 9399da8ec..1b1c5a62a 100644 --- a/src/replication/ng/execution/execution.go +++ b/src/replication/ng/execution/execution.go @@ -41,7 +41,7 @@ type Manager interface { GetTask(int64) (*model.Task, error) // Update the task, the "props" are the properties of task // that need to be updated, it cannot include "status". If - // you want to update the status, use "UpdateTask" instead + // you want to update the status, use "UpdateTaskStatus" instead UpdateTask(task *model.Task, props ...string) error // UpdateTaskStatus only updates the task status. If "statusCondition" // presents, only the tasks whose status equal to "statusCondition" diff --git a/src/replication/ng/flow/controller.go b/src/replication/ng/flow/controller.go index e3d60a405..9e133dedd 100644 --- a/src/replication/ng/flow/controller.go +++ b/src/replication/ng/flow/controller.go @@ -55,7 +55,7 @@ type defaultController struct { scheduler scheduler.Scheduler } -// Replicate according the to policy ID +// Start a replication according to the policy func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) { log.Infof("starting the replication based on the policy %d ...", policy.ID) diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go new file mode 100644 index 000000000..d31e70bcd --- /dev/null +++ b/src/replication/ng/operation/controller.go @@ -0,0 +1,64 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package operation + +import ( + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/flow" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Controller handles the replication-related operations: start, +// stop, query, etc. +type Controller interface { + StartReplication(policy *model.Policy) (int64, error) + StopReplication(int64) error + ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) + GetExecution(int64) (*model.Execution, error) + ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) + GetTaskLog(int64) ([]byte, error) +} + +// NewController returns a controller implementation +func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller { + return &defaultController{ + flowCtl: flowCtl, + executionMgr: executionMgr, + } +} + +type defaultController struct { + flowCtl flow.Controller + executionMgr execution.Manager +} + +func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) { + return d.flowCtl.StartReplication(policy) +} +func (d *defaultController) StopReplication(executionID int64) error { + return d.flowCtl.StopReplication(executionID) +} +func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) { + return d.executionMgr.List(query...) +} +func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) { + return d.executionMgr.Get(executionID) +} +func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) { + return d.executionMgr.ListTasks(query...) +} +func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) { + return d.executionMgr.GetTaskLog(taskID) +} diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go new file mode 100644 index 000000000..a68e2bda6 --- /dev/null +++ b/src/replication/ng/operation/controller_test.go @@ -0,0 +1,126 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package operation + +import ( + "testing" + + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakedFlowController struct{} + +func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) { + return 1, nil +} +func (f *fakedFlowController) StopReplication(int64) error { + return nil +} + +type fakedExecutionManager struct{} + +func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { + return 1, nil +} +func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) { + return 1, []*model.Execution{ + { + ID: 1, + }, + }, nil +} +func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { + return &model.Execution{ + ID: 1, + }, nil +} +func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { + return nil +} +func (f *fakedExecutionManager) Remove(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAll(int64) error { + return nil +} +func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { + return 1, nil +} +func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { + return 1, []*model.Task{ + { + ID: 1, + }, + }, nil +} +func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { + return nil, nil +} +func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { + return nil +} +func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { + return nil +} +func (f *fakedExecutionManager) RemoveTask(int64) error { + return nil +} +func (f *fakedExecutionManager) RemoveAllTasks(int64) error { + return nil +} +func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) { + return []byte("message"), nil +} + +var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{}) + +func TestStartReplication(t *testing.T) { + id, err := ctl.StartReplication(nil) + require.Nil(t, err) + assert.Equal(t, int64(1), id) +} + +func TestStopReplication(t *testing.T) { + err := ctl.StopReplication(1) + require.Nil(t, err) +} + +func TestListExecutions(t *testing.T) { + n, executions, err := ctl.ListExecutions() + require.Nil(t, err) + assert.Equal(t, int64(1), n) + assert.Equal(t, int64(1), executions[0].ID) +} + +func TestGetExecution(t *testing.T) { + execution, err := ctl.GetExecution(1) + require.Nil(t, err) + assert.Equal(t, int64(1), execution.ID) +} + +func TestListTasks(t *testing.T) { + n, tasks, err := ctl.ListTasks() + require.Nil(t, err) + assert.Equal(t, int64(1), n) + assert.Equal(t, int64(1), tasks[0].ID) +} + +func TestGetTaskLog(t *testing.T) { + log, err := ctl.GetTaskLog(1) + require.Nil(t, err) + assert.Equal(t, "message", string(log)) +} diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go new file mode 100644 index 000000000..cd169ff98 --- /dev/null +++ b/src/replication/ng/replication.go @@ -0,0 +1,54 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ng ... +// TODO rename the package name after removing ng +package ng + +import ( + "fmt" + + "github.com/goharbor/harbor/src/replication/ng/scheduler" + + "github.com/goharbor/harbor/src/replication/ng/execution" + "github.com/goharbor/harbor/src/replication/ng/flow" + "github.com/goharbor/harbor/src/replication/ng/operation" + "github.com/goharbor/harbor/src/replication/ng/registry" +) + +var ( + // RegistryMgr is a global registry manager + RegistryMgr registry.Manager + // ExecutionMgr is a global execution manager + ExecutionMgr execution.Manager + // OperationCtl is a global operation controller + OperationCtl operation.Controller +) + +// Init the global variables +func Init() error { + // TODO init RegistryMgr + + // TODO init ExecutionMgr + + // TODO init scheduler + var scheduler scheduler.Scheduler + + flowCtl, err := flow.NewController(RegistryMgr, ExecutionMgr, scheduler) + if err != nil { + return fmt.Errorf("failed to create the flow controller: %v", err) + } + OperationCtl = operation.NewController(flowCtl, ExecutionMgr) + return nil +} diff --git a/src/replication/ng/replication_test.go b/src/replication/ng/replication_test.go new file mode 100644 index 000000000..5da81dc8c --- /dev/null +++ b/src/replication/ng/replication_test.go @@ -0,0 +1,31 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ng ... +// TODO rename the package name after removing ng +package ng + +import ( + "testing" + // "github.com/stretchr/testify/assert" + // "github.com/stretchr/testify/require" +) + +func TestInit(t *testing.T) { + // TODO add testing code + // err := Init() + // require.Nil(t, err) + // assert.NotNil(t, OperationCtl) + // TODO add check for RegistryMgr and ExecutionMgr +} From 79d34c6f809991b89a459df8483464ae7a1406dd Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 26 Feb 2019 14:28:43 +0800 Subject: [PATCH 3/3] Create dao folder in replication This commit creates a new folder called dao under replication to hold dao codes Signed-off-by: Wenkai Yin --- src/replication/ng/dao/dao_test.go | 27 +++++++++++++++++++++ src/replication/ng/dao/example.go | 27 +++++++++++++++++++++ src/replication/ng/dao/example_test.go | 33 ++++++++++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 src/replication/ng/dao/dao_test.go create mode 100644 src/replication/ng/dao/example.go create mode 100644 src/replication/ng/dao/example_test.go diff --git a/src/replication/ng/dao/dao_test.go b/src/replication/ng/dao/dao_test.go new file mode 100644 index 000000000..eb5e9bbb9 --- /dev/null +++ b/src/replication/ng/dao/dao_test.go @@ -0,0 +1,27 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "os" + "testing" + + "github.com/goharbor/harbor/src/common/dao" +) + +func TestMain(m *testing.M) { + dao.PrepareTestForPostgresSQL() + os.Exit(m.Run()) +} diff --git a/src/replication/ng/dao/example.go b/src/replication/ng/dao/example.go new file mode 100644 index 000000000..606519e3b --- /dev/null +++ b/src/replication/ng/dao/example.go @@ -0,0 +1,27 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" +) + +// TODO remove the file + +// CreateProject ... +func CreateProject(project *models.Project) (int64, error) { + return dao.GetOrmer().Insert(project) +} diff --git a/src/replication/ng/dao/example_test.go b/src/replication/ng/dao/example_test.go new file mode 100644 index 000000000..f7fae2918 --- /dev/null +++ b/src/replication/ng/dao/example_test.go @@ -0,0 +1,33 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "testing" + + "github.com/goharbor/harbor/src/common/models" + "github.com/stretchr/testify/require" +) + +// TODO remove the file + +func TestCreateProject(t *testing.T) { + project := &models.Project{ + Name: "example-project", + OwnerID: 1, + } + _, err := CreateProject(project) + require.Nil(t, err) +}