Refactor the adapter interface

This commit refactors the Adapter interface and adjust the code in the flow controller and repository handler

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-02-27 17:35:21 +08:00
parent 2cce835784
commit 6742e0ceda
9 changed files with 144 additions and 19 deletions

View File

@ -53,10 +53,6 @@ type Adapter interface {
// Get the namespace specified by the name, the returning value should
// contain the metadata about the namespace if it has
GetNamespace(string) (*model.Namespace, error)
// Fetch the content resource under the namespace by filters
// SUGGESTION: Adapter provider can do their own filter based on the filter pattern
// or call the default `DoFilter` function of the filter to complete resource filtering.
FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error)
}
// RegisterFactory registers one adapter factory to the registry

View File

@ -0,0 +1,15 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adapter

View File

@ -0,0 +1,33 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adapter
import (
"io"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// ImageRegistry defines the capabilities that an image registry should have
type ImageRegistry interface {
FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error)
ManifestExist(repository, reference string) (exist bool, digest string, err error)
PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error)
PushManifest(repository, reference, mediaType string, payload []byte) error
BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error
}

View File

@ -15,8 +15,10 @@
package flow
import (
"io"
"testing"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
@ -158,9 +160,6 @@ func fakedAdapterFactory(*model.Registry) (adapter.Adapter, error) {
type fakedAdapter struct{}
func (f *fakedAdapter) Info() *adapter.Info {
return nil
}
func (f *fakedAdapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
return nil, nil
}
@ -170,7 +169,7 @@ func (f *fakedAdapter) CreateNamespace(*model.Namespace) error {
func (f *fakedAdapter) GetNamespace(string) (*model.Namespace, error) {
return &model.Namespace{}, nil
}
func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
func (f *fakedAdapter) FetchImages(namespace []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeRepository,
@ -184,11 +183,30 @@ func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filte
}, nil
}
func (f *fakedAdapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
}
func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
}
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
}
func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
}
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func TestStartReplication(t *testing.T) {
err := adapter.RegisterFactory(
&adapter.Info{
Type: "faked_registry",
SupportedResourceTypes: []model.ResourceType{"image"},
SupportedResourceTypes: []model.ResourceType{"repository"},
}, fakedAdapterFactory)
require.Nil(t, err)

View File

@ -109,12 +109,40 @@ func (f *flow) createExecution() (int64, error) {
}
func (f *flow) fetchResources() error {
resources, err := f.srcAdapter.FetchResources(f.policy.SrcNamespaces, f.policy.Filters)
f.resources = resources
resTypes := []model.ResourceType{}
filters := []*model.Filter{}
for _, filter := range f.policy.Filters {
if filter.Type != model.FilterTypeResource {
filters = append(filters, filter)
continue
}
resTypes = append(resTypes, filter.Value.(model.ResourceType))
}
if len(resTypes) == 0 {
resTypes = append(resTypes, adapter.GetAdapterInfo(f.srcRegistry.Type).SupportedResourceTypes...)
}
// TODO consider whether the logic can be refactored by using reflect
resources := []*model.Resource{}
for _, typ := range resTypes {
if typ == model.ResourceTypeRepository {
reg, ok := f.srcAdapter.(adapter.ImageRegistry)
if !ok {
err := fmt.Errorf("the adapter doesn't implement the ImageRegistry interface")
f.markExecutionFailure(err)
return err
}
res, err := reg.FetchImages(f.policy.SrcNamespaces, filters)
if err != nil {
f.markExecutionFailure(err)
return err
}
resources = append(resources, res...)
continue
}
// TODO add support for chart
}
f.resources = resources
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
return nil

View File

@ -20,6 +20,14 @@ import (
"github.com/goharbor/harbor/src/common/models"
)
// const definition
const (
FilterTypeResource = "Resource"
FilterTypeName = "Name"
FilterTypeVersion = "Version"
FilterTypeLabel = "Label"
)
// Policy defines the structure of a replication policy
type Policy struct {
ID int64 `json:"id"`

View File

@ -28,6 +28,8 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
// TODO remove the file
// const definition
const (
// TODO: add filter for the agent in registry webhook handler

View File

@ -15,8 +15,11 @@
package repository
import (
"errors"
"strings"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
@ -51,8 +54,8 @@ func factory(logger trans.Logger, stopFunc trans.StopFunc) (trans.Transfer, erro
type transfer struct {
logger trans.Logger
isStopped trans.StopFunc
src Registry
dst Registry
src adapter.ImageRegistry
dst adapter.ImageRegistry
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
@ -85,9 +88,8 @@ func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
if t.shouldStop() {
return jobStoppedErr
}
// create client for source registry
srcReg, err := NewRegistry(src.Registry, src.Metadata.Name)
srcReg, err := createRegistry(src.Registry)
if err != nil {
t.logger.Errorf("failed to create client for source registry: %v", err)
return err
@ -97,7 +99,7 @@ func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
src.Registry.Type, src.Registry.URL, src.Registry.Insecure)
// create client for destination registry
dstReg, err := NewRegistry(dst.Registry, dst.Metadata.Name)
dstReg, err := createRegistry(dst.Registry)
if err != nil {
t.logger.Errorf("failed to create client for destination registry: %v", err)
return err
@ -109,6 +111,23 @@ func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
return nil
}
// TODO handler the tokenServiceURL
func createRegistry(reg *model.Registry, tokenServiceURL ...string) (adapter.ImageRegistry, error) {
factory, err := adapter.GetFactory(reg.Type)
if err != nil {
return nil, err
}
ad, err := factory(reg)
if err != nil {
return nil, err
}
registry, ok := ad.(adapter.ImageRegistry)
if !ok {
return nil, errors.New("the adapter doesn't implement the \"ImageRegistry\" interface")
}
return registry, nil
}
func (t *transfer) shouldStop() bool {
isStopped := t.isStopped()
if isStopped {

View File

@ -20,6 +20,8 @@ import (
"io/ioutil"
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log"
@ -31,6 +33,10 @@ import (
type fakeRregistry struct{}
func (f *fakeRregistry) FetchImages([]string, []*model.Filter) ([]*model.Resource, error) {
return nil, nil
}
func (f *fakeRregistry) ManifestExist(repository, reference string) (bool, string, error) {
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}