Add proxy cache feature

Update route to add proxy related middleware
Add proxy controller

Signed-off-by: stonezdj <stonezdj@gmail.com>
This commit is contained in:
stonezdj 2020-07-04 07:44:07 +08:00
parent 3b43162b6d
commit 3abe77d6cb
9 changed files with 1083 additions and 0 deletions

View File

@ -0,0 +1,215 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"github.com/opencontainers/go-digest"
"io"
"strings"
"sync"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/replication/registry"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
const (
// wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready
maxManifestListWait = 20
maxManifestWait = 10
sleepIntervalSec = 20
)
var (
// Ctl is a global proxy controller instance
ctl Controller
once sync.Once
)
// Controller defines the operations related with pull through proxy
type Controller interface {
// UseLocalBlob check if the blob should use local copy
UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
// UseLocalManifest check manifest should use local copy
UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool
// ProxyBlob proxy the blob request to the remote server, p is the proxy project
// art is the ArtifactInfo which includes the digest of the blob
ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
// ProxyManifest proxy the manifest request to the remote server, p is the proxy project,
// art is the ArtifactInfo which includes the tag or digest of the manifest
ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error)
}
type controller struct {
blobCtl blob.Controller
registryMgr registry.Manager
artifactCtl artifact.Controller
local localInterface
}
// ControllerInstance -- Get the proxy controller instance
func ControllerInstance() Controller {
// Lazy load the controller
// Because LocalHelper is not ready unless core startup completely
once.Do(func() {
ctl = &controller{
blobCtl: blob.Ctl,
registryMgr: registry.NewDefaultManager(),
artifactCtl: artifact.Ctl,
local: newLocalHelper(),
}
})
return ctl
}
func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
if len(art.Digest) == 0 {
return false
}
exist, err := c.local.BlobExist(ctx, art)
if err != nil {
return false
}
return exist
}
func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool {
if len(art.Digest) == 0 {
return false
}
return c.local.ManifestExist(ctx, art)
}
func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) {
var man distribution.Manifest
remoteRepo := getRemoteRepo(art)
r, err := newRemoteHelper(p.RegistryID)
if err != nil {
return man, err
}
ref := getReference(art)
man, err = r.Manifest(remoteRepo, ref)
if err != nil {
if errors.IsNotFoundErr(err) {
go func() {
c.local.DeleteManifest(remoteRepo, art.Tag)
}()
}
return man, err
}
ct, _, err := man.Payload()
if err != nil {
return man, err
}
// Push manifest in background
go func() {
c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r)
}()
return man, nil
}
func (c *controller) ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
remoteRepo := getRemoteRepo(art)
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
rHelper, err := newRemoteHelper(p.RegistryID)
if err != nil {
return 0, nil, err
}
size, bReader, err := rHelper.BlobReader(remoteRepo, art.Digest)
if err != nil {
log.Errorf("failed to pull blob, error %v", err)
return 0, nil, err
}
desc := distribution.Descriptor{Size: size, Digest: digest.Digest(art.Digest)}
go func() {
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, rHelper)
if err != nil {
log.Errorf("error while putting blob to local repo, %v", err)
}
}()
return size, bReader, nil
}
func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r remoteInterface) error {
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
if err != nil {
log.Errorf("failed to create blob reader, error %v", err)
return err
}
defer bReader.Close()
err = c.local.PushBlob(localRepo, desc, bReader)
return err
}
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r remoteInterface) {
if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex {
err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man)
if err != nil {
log.Errorf("error when push manifest list to local :%v", err)
}
return
}
var waitBlobs []distribution.Descriptor
for n := 0; n < maxManifestWait; n++ {
time.Sleep(sleepIntervalSec * time.Second)
waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man)
if len(waitBlobs) == 0 {
break
}
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
}
if len(waitBlobs) > 0 {
// docker client will skip to pull layers exist in local
// these blobs is not exist in the proxy server
// it will cause the manifest dependency check always fail
// need to push these blobs before push manifest to avoid failure
log.Debug("Waiting blobs not empty, push it to local repo directly")
for _, desc := range waitBlobs {
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r)
if err != nil {
log.Errorf("Failed to push blob to local repo, error: %v", err)
return
}
}
}
err := c.local.PushManifest(art.Repository, getReference(art), man)
if err != nil {
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
}
}
// getRemoteRepo get the remote repository name, used in proxy cache
func getRemoteRepo(art lib.ArtifactInfo) string {
return strings.TrimPrefix(art.Repository, art.ProjectName+"/")
}
func getReference(art lib.ArtifactInfo) string {
if len(art.Tag) > 0 {
return art.Tag
}
return art.Digest
}

View File

@ -0,0 +1,174 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/replication/registry"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"io"
"testing"
)
type localInterfaceMock struct {
mock.Mock
}
func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
args := l.Called(ctx, art)
return args.Bool(0), args.Error(1)
}
func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
args := l.Called(ctx, art)
return args.Bool(0)
}
func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
panic("implement me")
}
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
panic("implement me")
}
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
panic("implement me")
}
func (l *localInterfaceMock) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
panic("implement me")
}
func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
panic("implement me")
}
type proxyControllerTestSuite struct {
suite.Suite
local *localInterfaceMock
ctr Controller
}
func (p *proxyControllerTestSuite) SetupTest() {
p.local = &localInterfaceMock{}
p.ctr = &controller{
blobCtl: blob.Ctl,
registryMgr: registry.NewDefaultManager(),
artifactCtl: artifact.Ctl,
local: p.local,
}
}
func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
result := p.ctr.UseLocalManifest(ctx, art)
p.Assert().True(result)
}
func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil)
result := p.ctr.UseLocalManifest(ctx, art)
p.Assert().False(result)
}
func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
ctx := context.Background()
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
result := p.ctr.UseLocalManifest(ctx, art)
p.Assert().False(result)
}
func (p *proxyControllerTestSuite) TestUseLocalBlob_True() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil)
result := p.ctr.UseLocalBlob(ctx, art)
p.Assert().True(result)
}
func (p *proxyControllerTestSuite) TestUseLocalBlob_False() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil)
result := p.ctr.UseLocalBlob(ctx, art)
p.Assert().False(result)
}
func TestProxyControllerTestSuite(t *testing.T) {
suite.Run(t, &proxyControllerTestSuite{})
}
func TestProxyCacheRemoteRepo(t *testing.T) {
cases := []struct {
name string
in lib.ArtifactInfo
want string
}{
{
name: `normal test`,
in: lib.ArtifactInfo{ProjectName: "dockerhub_proxy", Repository: "dockerhub_proxy/firstfloor/hello-world"},
want: "firstfloor/hello-world",
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
got := getRemoteRepo(tt.in)
if got != tt.want {
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
}
})
}
}
func TestGetRef(t *testing.T) {
cases := []struct {
name string
in lib.ArtifactInfo
want string
}{
{
name: `normal`,
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "latest", Digest: "sha256:aabbcc"},
want: "latest",
},
{
name: `digest_only`,
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "", Digest: "sha256:aabbcc"},
want: "sha256:aabbcc",
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
got := getReference(tt.in)
if got != tt.want {
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
}
})
}
}

View File

@ -0,0 +1,46 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import "sync"
type inflightRequest struct {
mu sync.Mutex
reqMap map[string]interface{}
}
var inflightChecker = &inflightRequest{
reqMap: make(map[string]interface{}),
}
// addRequest if the artifact already exist in the inflightRequest, return false
// else return true
func (in *inflightRequest) addRequest(artifact string) (suc bool) {
in.mu.Lock()
defer in.mu.Unlock()
_, ok := in.reqMap[artifact]
if ok {
// Skip some following operation if it is in reqMap
return false
}
in.reqMap[artifact] = 1
return true
}
func (in *inflightRequest) removeRequest(artifact string) {
in.mu.Lock()
defer in.mu.Unlock()
delete(in.reqMap, artifact)
}

View File

@ -0,0 +1,30 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestInflightRequest(t *testing.T) {
artName := "hello-world:latest"
inflightChecker.addRequest(artName)
_, ok := inflightChecker.reqMap[artName]
assert.True(t, ok)
inflightChecker.removeRequest(artName)
_, exist := inflightChecker.reqMap[artName]
assert.False(t, exist)
}

View File

@ -0,0 +1,191 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"errors"
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
comHttpAuth "github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/registry"
"io"
"time"
)
// localInterface defines operations related to local repo under proxy mode
type localInterface interface {
// BlobExist check if the blob exist in local repo
BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error)
// Manifest check if the manifest exist in local repo
ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool
// PushBlob push blob to local repo
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
// PushManifest push manifest to local repo, ref can be digest or tag
PushManifest(repo string, ref string, manifest distribution.Manifest) error
// PushManifestList push manifest list to local repo
PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error
// CheckDependencies check if the manifest's dependency is ready
CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
// DeleteManifest cleanup delete tag from local cache
DeleteManifest(repo, ref string)
}
func (l *localHelper) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
a, err := l.artifactCtl.GetByReference(ctx, art.Repository, art.Digest, nil)
if err != nil {
log.Errorf("check manifest exist failed, error %v", err)
return false
}
return a != nil
}
// localHelper defines operations related to local repo under proxy mode
type localHelper struct {
registry registry.Client
artifactCtl artifactController
}
type artifactController interface {
GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (artifact *artifact.Artifact, err error)
}
// newLocalHelper create the localInterface
func newLocalHelper() localInterface {
l := &localHelper{artifactCtl: artifact.Ctl}
l.init()
return l
}
func (l *localHelper) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
return l.registry.BlobExist(art.Repository, art.Digest)
}
func (l *localHelper) init() {
if l.registry != nil {
return
}
log.Debugf("core url:%s, local core url: %v", config.GetCoreURL(), config.LocalCoreURL())
// the traffic is internal only
registryURL := config.LocalCoreURL()
authorizer := comHttpAuth.NewSecretAuthorizer(config.ProxyServiceSecret)
l.registry = registry.NewClientWithAuthorizer(registryURL, authorizer, true)
}
func (l *localHelper) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
log.Debugf("Put blob to local registry, localRepo:%v, digest: %v", localRepo, desc.Digest)
ref := string(desc.Digest)
artName := localRepo + ":" + ref
// use inflight checker to avoid multiple requests to push blob to local in same time
if !inflightChecker.addRequest(artName) {
return nil
}
defer inflightChecker.removeRequest(artName)
err := l.registry.PushBlob(localRepo, ref, desc.Size, bReader)
return err
}
func (l *localHelper) PushManifest(repo string, ref string, manifest distribution.Manifest) error {
// Make sure there is only one go routing to push current artName to local repo
artName := repo + ":" + ref
// use inflight checker to avoid multiple requests to push manifest to local in same time
if !inflightChecker.addRequest(artName) {
return nil
}
defer inflightChecker.removeRequest(artName)
mediaType, payload, err := manifest.Payload()
if err != nil {
return err
}
_, err = l.registry.PushManifest(repo, ref, mediaType, payload)
return err
}
// DeleteManifest cleanup delete tag from local repo
func (l *localHelper) DeleteManifest(repo, ref string) {
log.Debug("Remove tag from repo if it is exist")
if err := l.registry.DeleteManifest(repo, ref); err != nil {
// sometimes user pull a non-exist image
log.Warningf("failed to remove artifact, error %v", err)
}
}
// updateManifestList -- Trim the manifest list, make sure at least one depend manifests is ready
func (l *localHelper) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
switch v := manifest.(type) {
case *manifestlist.DeserializedManifestList:
existMans := make([]manifestlist.ManifestDescriptor, 0)
for _, m := range v.Manifests {
art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
if l.ManifestExist(ctx, art) {
existMans = append(existMans, m)
}
}
return manifestlist.FromDescriptors(existMans)
}
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
}
func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error {
// For manifest list, it might include some different manifest
// it will wait and check for 30 mins, if all depend manifests exist then push it
// if time exceed, only push the new updated manifest list which contains existing manifest
var newMan distribution.Manifest
var err error
for n := 0; n < maxManifestListWait; n++ {
log.Debugf("waiting for the manifest ready, repo %v, ref:%v", repo, ref)
time.Sleep(sleepIntervalSec * time.Second)
newMan, err = l.updateManifestList(ctx, repo, man)
if err != nil {
return err
}
if len(newMan.References()) == len(man.References()) {
break
}
}
if len(newMan.References()) == 0 {
return errors.New("manifest list doesn't contain any pushed manifest")
}
_, pl, err := newMan.Payload()
if err != nil {
log.Errorf("failed to get payload, error %v", err)
return err
}
log.Debugf("The manifest list payload: %v", string(pl))
return l.PushManifest(repo, ref, newMan)
}
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
descriptors := man.References()
waitDesc := make([]distribution.Descriptor, 0)
for _, desc := range descriptors {
log.Debugf("checking the blob dependency: %v", desc.Digest)
art := lib.ArtifactInfo{Repository: repo, Digest: string(desc.Digest)}
exist, err := l.BlobExist(ctx, art)
if err != nil || !exist {
log.Debugf("Check dependency failed!")
waitDesc = append(waitDesc, desc)
}
}
log.Debugf("Check dependency result %v", waitDesc)
return waitDesc
}

View File

@ -0,0 +1,200 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
distribution2 "github.com/docker/distribution"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/pkg/distribution"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/mock"
"testing"
testregistry "github.com/goharbor/harbor/src/testing/pkg/registry"
"github.com/stretchr/testify/suite"
)
type mockManifest struct {
mock.Mock
}
func (m *mockManifest) References() []distribution2.Descriptor {
args := m.Called()
desc := make([]distribution2.Descriptor, 0)
if args[0] != nil {
desc = args[0].([]distribution2.Descriptor)
}
return desc
}
func (m *mockManifest) Payload() (mediaType string, payload []byte, err error) {
args := m.Called()
p := make([]byte, 0)
if args[1] != nil {
p = args[1].([]byte)
}
return args.String(0), p, args.Error(2)
}
type artifactControllerMock struct {
mock.Mock
}
func (a *artifactControllerMock) GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (arti *artifact.Artifact, err error) {
args := a.Called(ctx, repository, reference, option)
art := &artifact.Artifact{}
if args[0] != nil {
art = args[0].(*artifact.Artifact)
}
return art, args.Error(1)
}
type localHelperTestSuite struct {
suite.Suite
registryClient *testregistry.FakeClient
local *localHelper
artCtl *artifactControllerMock
}
func (lh *localHelperTestSuite) SetupTest() {
lh.registryClient = &testregistry.FakeClient{}
lh.artCtl = &artifactControllerMock{}
lh.local = &localHelper{registry: lh.registryClient, artifactCtl: lh.artCtl}
}
func (lh *localHelperTestSuite) TestBlobExist_False() {
repo := "library/hello-world"
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
ctx := context.Background()
lh.registryClient.On("BlobExist").Return(false, nil)
exist, err := lh.local.BlobExist(ctx, art)
lh.Require().Nil(err)
lh.Assert().Equal(false, exist)
}
func (lh *localHelperTestSuite) TestBlobExist_True() {
repo := "library/hello-world"
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
ctx := context.Background()
lh.registryClient.On("BlobExist").Return(true, nil)
exist, err := lh.local.BlobExist(ctx, art)
lh.Require().Nil(err)
lh.Assert().Equal(true, exist)
}
func (lh *localHelperTestSuite) TestPushManifest() {
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
lh.registryClient.On("PushManifest").Return(dig, nil)
manifest := &mockManifest{}
var ct string
manifest.Mock.On("Payload").Return(ct, []byte("example"), nil)
ct = schema2.MediaTypeManifest
err := lh.local.PushManifest("library/hello-world", "", manifest)
lh.Require().Nil(err)
}
func (lh *localHelperTestSuite) TestUpdateManifestList() {
ctx := context.Background()
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
manifestList := manifestlist.ManifestList{
Versioned: manifest.Versioned{
SchemaVersion: 2,
MediaType: manifestlist.MediaTypeManifestList,
},
Manifests: []manifestlist.ManifestDescriptor{
{
Descriptor: distribution.Descriptor{
Digest: digest.Digest(amdDig),
Size: 3253,
MediaType: schema2.MediaTypeManifest,
},
Platform: manifestlist.PlatformSpec{
Architecture: "amd64",
OS: "linux",
},
}, {
Descriptor: distribution.Descriptor{
Digest: digest.Digest(armDig),
Size: 3253,
MediaType: schema2.MediaTypeManifest,
},
Platform: manifestlist.PlatformSpec{
Architecture: "arm",
OS: "linux",
},
},
},
}
manList := &manifestlist.DeserializedManifestList{
ManifestList: manifestList,
}
ar := &artifact.Artifact{}
var emptyArtifact *artifact.Artifact
var opt *artifact.Option
lh.artCtl.On("GetByReference", ctx, "library/hello-world", amdDig, opt).Return(ar, nil)
lh.artCtl.On("GetByReference", ctx, "library/hello-world", armDig, opt).Return(emptyArtifact, nil)
newMan, err := lh.local.updateManifestList(ctx, "library/hello-world", manList)
lh.Require().Nil(err)
lh.Assert().Equal(len(newMan.References()), 1)
}
func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
ctx := context.Background()
manifest := &mockManifest{}
refs := []distribution2.Descriptor{
{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
}
manifest.On("References").Return(refs)
lh.registryClient.On("BlobExist").Return(false, nil)
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
lh.Assert().Equal(len(ret), 2)
}
func (lh *localHelperTestSuite) TestCheckDependencies_Suc() {
ctx := context.Background()
manifest := &mockManifest{}
refs := []distribution2.Descriptor{
{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
}
manifest.On("References").Return(refs)
lh.registryClient.On("BlobExist").Return(true, nil)
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
lh.Assert().Equal(len(ret), 0)
}
func (lh *localHelperTestSuite) TestManifestExist() {
ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
ar := &artifact.Artifact{}
var opt *artifact.Option
lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil)
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
exist := lh.local.ManifestExist(ctx, art)
lh.Assert().True(exist)
}
func TestLocalHelperTestSuite(t *testing.T) {
suite.Run(t, &localHelperTestSuite{})
}

View File

@ -0,0 +1,81 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"fmt"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/registry"
"io"
)
// remoteInterface defines operations related to remote repository under proxy
type remoteInterface interface {
// BlobReader create a reader for remote blob
BlobReader(repo, dig string) (int64, io.ReadCloser, error)
// Manifest get manifest by reference
Manifest(repo string, ref string) (distribution.Manifest, error)
}
// remoteHelper defines operations related to remote repository under proxy
type remoteHelper struct {
regID int64
registry adapter.ArtifactRegistry
}
// newRemoteHelper create a remoteHelper interface
func newRemoteHelper(regID int64) (remoteInterface, error) {
r := &remoteHelper{regID: regID}
if err := r.init(); err != nil {
log.Errorf("failed to create remoteHelper error %v", err)
return nil, err
}
return r, nil
}
func (r *remoteHelper) init() error {
if r.registry != nil {
return nil
}
reg, err := registry.NewDefaultManager().Get(r.regID)
if err != nil {
return err
}
if reg == nil {
return fmt.Errorf("failed to get registry, registryID: %v", r.regID)
}
factory, err := adapter.GetFactory(reg.Type)
if err != nil {
return err
}
adp, err := factory.Create(reg)
if err != nil {
return err
}
r.registry = adp.(adapter.ArtifactRegistry)
return nil
}
func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
return r.registry.PullBlob(repo, dig)
}
func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, error) {
man, _, err := r.registry.PullManifest(repo, ref)
return man, err
}

View File

@ -0,0 +1,138 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package repoproxy
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/lib/errors"
httpLib "github.com/goharbor/harbor/src/lib/http"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/registry"
"io"
"net/http"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/controller/proxy"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/server/middleware"
)
var registryMgr = registry.NewDefaultManager()
// BlobGetMiddleware handle get blob request
func BlobGetMiddleware() func(http.Handler) http.Handler {
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
if err := handleBlob(w, r, next); err != nil {
httpLib.SendError(w, err)
}
})
}
func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
ctx := r.Context()
art, p, proxyCtl, err := preCheck(ctx)
if err != nil {
return err
}
if !canProxy(p) || proxyCtl.UseLocalBlob(ctx, art) {
next.ServeHTTP(w, r)
return nil
}
size, reader, err := proxyCtl.ProxyBlob(ctx, p, art)
if err != nil {
return err
}
defer reader.Close()
// Use io.CopyN to avoid out of memory when pulling big blob
written, err := io.CopyN(w, reader, size)
if err != nil {
return err
}
if written != size {
return errors.Errorf("The size mismatch, actual:%d, expected: %d", written, size)
}
setHeaders(w, size, "", art.Digest)
return nil
}
func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *models.Project, ctl proxy.Controller, err error) {
art = lib.GetArtifactInfo(ctx)
ctl = proxy.ControllerInstance()
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false))
return
}
// ManifestGetMiddleware middleware handle request for get manifest
func ManifestGetMiddleware() func(http.Handler) http.Handler {
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
if err := handleManifest(w, r, next); err != nil {
httpLib.SendError(w, err)
}
})
}
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
ctx := r.Context()
art, p, proxyCtl, err := preCheck(ctx)
if err != nil {
return err
}
if !canProxy(p) || proxyCtl.UseLocalManifest(ctx, art) {
next.ServeHTTP(w, r)
return nil
}
log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest)
man, err := proxyCtl.ProxyManifest(ctx, p, art)
if err != nil {
return err
}
ct, payload, err := man.Payload()
if err != nil {
return err
}
setHeaders(w, int64(len(payload)), ct, art.Digest)
if _, err = w.Write(payload); err != nil {
return err
}
return nil
}
func canProxy(p *models.Project) bool {
if p.RegistryID < 1 {
return false
}
reg, err := registryMgr.Get(p.RegistryID)
if err != nil {
log.Errorf("failed to get registry, error:%v", err)
return false
}
if reg.Status != model.Healthy {
log.Errorf("current registry is unhealthy, regID:%v, Name:%v, Status: %v", reg.ID, reg.Name, reg.Status)
}
return reg.Status == model.Healthy
}
func setHeaders(w http.ResponseWriter, size int64, mediaType string, dig string) {
h := w.Header()
h.Set("Content-Length", fmt.Sprintf("%v", size))
if len(mediaType) > 0 {
h.Set("Content-Type", mediaType)
}
h.Set("Docker-Content-Digest", dig)
h.Set("Etag", dig)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/goharbor/harbor/src/server/middleware/contenttrust"
"github.com/goharbor/harbor/src/server/middleware/immutable"
"github.com/goharbor/harbor/src/server/middleware/quota"
"github.com/goharbor/harbor/src/server/middleware/repoproxy"
"github.com/goharbor/harbor/src/server/middleware/v2auth"
"github.com/goharbor/harbor/src/server/middleware/vulnerable"
"github.com/goharbor/harbor/src/server/router"
@ -47,6 +48,7 @@ func RegisterRoutes() {
root.NewRoute().
Method(http.MethodGet).
Path("/*/manifests/:reference").
Middleware(repoproxy.ManifestGetMiddleware()).
Middleware(contenttrust.Middleware()).
Middleware(vulnerable.Middleware()).
HandlerFunc(getManifest)
@ -66,6 +68,12 @@ func RegisterRoutes() {
Middleware(quota.PutManifestMiddleware()).
Middleware(blob.PutManifestMiddleware()).
HandlerFunc(putManifest)
// blob get
root.NewRoute().
Method(http.MethodGet).
Path("/*/blobs/:digest").
Middleware(repoproxy.BlobGetMiddleware()).
Handler(proxy)
// initiate blob upload
root.NewRoute().
Method(http.MethodPost).