Implement a default image registry client

Provide a default implemmentation for image registry interface, other adapters can use it directly

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-03-12 19:06:39 +08:00
parent 5f8c19e5ed
commit 185525e9c8
2 changed files with 135 additions and 132 deletions

View File

@ -15,12 +15,26 @@
package adapter
import (
"errors"
"io"
"net/http"
"strings"
"sync"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/goharbor/harbor/src/common/http/modifier"
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// const definition
const (
// TODO: add filter for the agent in registry webhook handler
UserAgentReplicator = "harbor-replicator"
)
// ImageRegistry defines the capabilities that an image registry should have
type ImageRegistry interface {
FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error)
@ -32,42 +46,149 @@ type ImageRegistry interface {
PushBlob(repository, digest string, size int64, blob io.Reader) error
}
// TODO implement the functions
// DefaultImageRegistry provides a default implementation for interface ImageRegistry
type DefaultImageRegistry struct{}
type DefaultImageRegistry struct {
sync.RWMutex
client *http.Client
url string
clients map[string]*registry_pkg.Repository
}
// TODO: passing the tokenServiceURL
// NewDefaultImageRegistry returns an instance of DefaultImageRegistry
func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string) *DefaultImageRegistry {
// use the same HTTP connection pool for all clients
transport := registry_pkg.GetHTTPTransport(registry.Insecure)
modifiers := []modifier.Modifier{
&auth.UserAgentModifier{
UserAgent: UserAgentReplicator,
},
}
if registry.Credential != nil {
cred := auth.NewBasicAuthCredential(
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, cred, tokenServiceURL...)
modifiers = append(modifiers, authorizer)
}
client := &http.Client{
Transport: registry_pkg.NewTransport(transport, modifiers...),
}
return &DefaultImageRegistry{
client: client,
clients: map[string]*registry_pkg.Repository{},
url: registry.URL,
}
}
func (d *DefaultImageRegistry) getClient(repository string) (*registry_pkg.Repository, error) {
client := d.get(repository)
if client != nil {
return client, nil
}
return d.create(repository)
}
func (d *DefaultImageRegistry) get(repository string) *registry_pkg.Repository {
d.RLock()
defer d.RUnlock()
client, exist := d.clients[repository]
if exist {
return client
}
return nil
}
func (d *DefaultImageRegistry) create(repository string) (*registry_pkg.Repository, error) {
d.Lock()
defer d.Unlock()
// double check
client, exist := d.clients[repository]
if exist {
return client, nil
}
client, err := registry_pkg.NewRepository(repository, d.url, d.client)
if err != nil {
return nil, err
}
d.clients[repository] = client
return client, nil
}
// FetchImages ...
func (d *DefaultImageRegistry) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return nil, nil
return nil, errors.New("not implemented")
}
// ManifestExist ...
func (d *DefaultImageRegistry) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
return false, "", nil
func (d *DefaultImageRegistry) ManifestExist(repository, reference string) (bool, string, error) {
client, err := d.getClient(repository)
if err != nil {
return false, "", err
}
digest, exist, err := client.ManifestExist(reference)
return exist, digest, err
}
// PullManifest ...
func (d *DefaultImageRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) {
return nil, "", nil
func (d *DefaultImageRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
client, err := d.getClient(repository)
if err != nil {
return nil, "", err
}
digest, mediaType, payload, err := client.PullManifest(reference, accepttedMediaTypes)
if err != nil {
return nil, "", err
}
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := registry_pkg.UnMarshal(mediaType, payload)
if err != nil {
return nil, "", err
}
return manifest, digest, nil
}
// PushManifest ...
func (d *DefaultImageRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
client, err := d.getClient(repository)
if err != nil {
return err
}
_, err = client.PushManifest(reference, mediaType, payload)
return err
}
// BlobExist ...
func (d *DefaultImageRegistry) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil
func (d *DefaultImageRegistry) BlobExist(repository, digest string) (bool, error) {
client, err := d.getClient(repository)
if err != nil {
return false, err
}
return client.BlobExist(digest)
}
// PullBlob ...
func (d *DefaultImageRegistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return 0, nil, nil
func (d *DefaultImageRegistry) PullBlob(repository, digest string) (int64, io.ReadCloser, error) {
client, err := d.getClient(repository)
if err != nil {
return 0, nil, err
}
return client.PullBlob(digest)
}
// PushBlob ...
func (d *DefaultImageRegistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
client, err := d.getClient(repository)
if err != nil {
return err
}
return client.PushBlob(digest, size, blob)
}

View File

@ -1,118 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repository
import (
"io"
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/http/modifier"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
pkg_registry "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// TODO remove the file
// const definition
const (
// TODO: add filter for the agent in registry webhook handler
UserAgentReplicator = "harbor-replicator"
)
// Registry defines an the interface for registry service
type Registry interface {
ManifestExist(repository, reference string) (exist bool, digest string, err error)
PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error)
PushManifest(repository, reference, mediaType string, payload []byte) error
BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error
}
// NewRegistry returns an instance of the default registry implementation
// TODO: passing the tokenServiceURL
func NewRegistry(reg *model.Registry, repository string,
tokenServiceURL ...string) (Registry, error) {
// use the same HTTP connection pool for all clients
transport := pkg_registry.GetHTTPTransport(reg.Insecure)
modifiers := []modifier.Modifier{
&auth.UserAgentModifier{
UserAgent: UserAgentReplicator,
},
}
if reg.Credential != nil {
cred := auth.NewBasicAuthCredential(
reg.Credential.AccessKey,
reg.Credential.AccessSecret)
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, cred, tokenServiceURL...)
modifiers = append(modifiers, authorizer)
}
client, err := pkg_registry.NewRepository(repository, reg.URL,
&http.Client{
Transport: pkg_registry.NewTransport(transport, modifiers...),
})
if err != nil {
return nil, err
}
return &registry{
client: client,
}, nil
}
type registry struct {
client *pkg_registry.Repository
}
func (r *registry) ManifestExist(repository, reference string) (bool, string, error) {
digest, exist, err := r.client.ManifestExist(reference)
return exist, digest, err
}
func (r *registry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
digest, mediaType, payload, err := r.client.PullManifest(reference, accepttedMediaTypes)
if err != nil {
return nil, "", err
}
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := pkg_registry.UnMarshal(mediaType, payload)
if err != nil {
return nil, "", err
}
return manifest, digest, nil
}
func (r *registry) PushManifest(repository, reference, mediaType string, payload []byte) error {
_, err := r.client.PushManifest(reference, mediaType, payload)
return err
}
func (r *registry) BlobExist(repository, digest string) (bool, error) {
return r.client.BlobExist(digest)
}
func (r *registry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
return r.client.PullBlob(digest)
}
func (r *registry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return r.client.PushBlob(digest, size, blob)
}