From 3abe77d6cb609064f28616f316540ca356169b53 Mon Sep 17 00:00:00 2001 From: stonezdj Date: Sat, 4 Jul 2020 07:44:07 +0800 Subject: [PATCH] Add proxy cache feature Update route to add proxy related middleware Add proxy controller Signed-off-by: stonezdj --- src/controller/proxy/controller.go | 215 +++++++++++++++++++++++ src/controller/proxy/controller_test.go | 174 ++++++++++++++++++ src/controller/proxy/inflight.go | 46 +++++ src/controller/proxy/inflight_test.go | 30 ++++ src/controller/proxy/local.go | 191 ++++++++++++++++++++ src/controller/proxy/local_test.go | 200 +++++++++++++++++++++ src/controller/proxy/remote.go | 81 +++++++++ src/server/middleware/repoproxy/proxy.go | 138 +++++++++++++++ src/server/registry/route.go | 8 + 9 files changed, 1083 insertions(+) create mode 100644 src/controller/proxy/controller.go create mode 100644 src/controller/proxy/controller_test.go create mode 100644 src/controller/proxy/inflight.go create mode 100644 src/controller/proxy/inflight_test.go create mode 100644 src/controller/proxy/local.go create mode 100644 src/controller/proxy/local_test.go create mode 100644 src/controller/proxy/remote.go create mode 100644 src/server/middleware/repoproxy/proxy.go diff --git a/src/controller/proxy/controller.go b/src/controller/proxy/controller.go new file mode 100644 index 000000000..a2a3a5752 --- /dev/null +++ b/src/controller/proxy/controller.go @@ -0,0 +1,215 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"context"
+	"github.com/opencontainers/go-digest"
+	"io"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/manifest/manifestlist"
+	"github.com/goharbor/harbor/src/common/models"
+	"github.com/goharbor/harbor/src/controller/artifact"
+	"github.com/goharbor/harbor/src/controller/blob"
+	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/lib/errors"
+	"github.com/goharbor/harbor/src/lib/log"
+	"github.com/goharbor/harbor/src/replication/registry"
+	v1 "github.com/opencontainers/image-spec/specs-go/v1"
+)
+
+const (
+	// maxManifestListWait is the number of polling rounds spent waiting for the
+	// manifests referenced by a manifest list. It is larger than maxManifestWait
+	// because a manifest list depends on its manifests being ready.
+	maxManifestListWait = 20
+	// maxManifestWait is the number of polling rounds spent waiting for the blobs
+	// referenced by a manifest.
+	maxManifestWait = 10
+	// sleepIntervalSec is the pause, in seconds, taken before each polling round.
+	sleepIntervalSec = 20
+)
+
+var (
+	// ctl is the package-wide proxy controller instance, created on first use
+	ctl Controller
+	// once guards the one-time creation of ctl
+	once sync.Once
+)
+
+// Controller defines the operations related with pull through proxy
+type Controller interface {
+	// UseLocalBlob check if the blob should use local copy
+	UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
+	// UseLocalManifest check manifest should use local copy
+	UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool
+	// ProxyBlob proxy the blob request to the remote server, p is the proxy project
+	// art is the ArtifactInfo which includes the digest of the blob
+	ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
+	// ProxyManifest proxy the manifest request to the remote server, p is the proxy project,
+	// art is the ArtifactInfo which includes the tag or digest of the manifest
+	ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error)
+}
+
+// controller is the default Controller implementation. All access to the
+// local cache goes through the local field.
+type controller struct {
+	blobCtl     blob.Controller
+	registryMgr registry.Manager
+	artifactCtl artifact.Controller
+	local       localInterface
+}
+
+// ControllerInstance -- Get the proxy controller instance
+func ControllerInstance() Controller {
+	// Lazy load the controller
+	// Because LocalHelper is not ready unless core startup completely
+	once.Do(func() {
+		ctl = &controller{
+			blobCtl:     blob.Ctl,
+			registryMgr: registry.NewDefaultManager(),
+			artifactCtl: artifact.Ctl,
+			local:       newLocalHelper(),
+		}
+	})
+
+	return ctl
+}
+
+// UseLocalBlob reports whether the blob identified by art.Digest can be served
+// from the local repository. It returns false when no digest is given or when
+// the existence check fails, so that the caller falls back to the remote server.
+func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
+	if len(art.Digest) == 0 {
+		return false
+	}
+	exist, err := c.local.BlobExist(ctx, art)
+	if err != nil {
+		// Keep the fallback to the remote server, but leave a trace of the
+		// reason instead of dropping the error silently.
+		log.Errorf("failed to check blob in local repo, repository: %v, digest: %v, error %v", art.Repository, art.Digest, err)
+		return false
+	}
+	return exist
+}
+
+// UseLocalManifest reports whether the manifest identified by art.Digest can be
+// served from the local repository. A request without a digest (for example a
+// pull by tag) always returns false.
+func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool {
+	if len(art.Digest) == 0 {
+		return false
+	}
+	return c.local.ManifestExist(ctx, art)
+}
+
+// ProxyManifest fetches the manifest referenced by art (tag first, digest
+// otherwise) from the registry attached to project p and returns it. On success
+// the manifest is also pushed to the local repository in the background. When
+// the remote registry reports "not found" for a tag, the tag is removed from
+// the local repository in the background and the error is returned.
+func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) {
+	var man distribution.Manifest
+	remoteRepo := getRemoteRepo(art)
+	r, err := newRemoteHelper(p.RegistryID)
+	if err != nil {
+		return man, err
+	}
+	ref := getReference(art)
+	man, err = r.Manifest(remoteRepo, ref)
+	if err != nil {
+		// A request by digest carries no tag, so there is no cached tag to
+		// clean up; skip the call instead of deleting an empty reference.
+		if errors.IsNotFoundErr(err) && len(art.Tag) > 0 {
+			// NOTE(review): every other local operation in this file addresses
+			// the cache by art.Repository; confirm that remoteRepo is the
+			// intended repository name for this cleanup.
+			go c.local.DeleteManifest(remoteRepo, art.Tag)
+		}
+		return man, err
+	}
+	ct, _, err := man.Payload()
+	if err != nil {
+		return man, err
+	}
+	// Push manifest in background
+	// NOTE(review): ctx is handed to work that keeps polling after this method
+	// returns; if ctx is tied to the incoming request, confirm it is still
+	// usable once the response has been sent.
+	go c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r)
+
+	return man, nil
+}
+
+// ProxyBlob opens a reader for the blob art.Digest on the registry attached to
+// project p and returns its size and the reader; the caller owns the reader.
+// A second, independent download of the same blob is started in the background
+// to populate the local repository.
+func (c *controller) ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
+	remoteRepo := getRemoteRepo(art)
+	log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
+	rHelper, err := newRemoteHelper(p.RegistryID)
+	if err != nil {
+		return 0, nil, err
+	}
+
+	size, bReader, err := rHelper.BlobReader(remoteRepo, art.Digest)
+	if err != nil {
+		log.Errorf("failed to pull blob, error %v", err)
+		return 0, nil, err
+	}
+	desc := distribution.Descriptor{Size: size, Digest: digest.Digest(art.Digest)}
+	go func() {
+		if err := c.putBlobToLocal(remoteRepo, art.Repository, desc, rHelper); err != nil {
+			log.Errorf("error while putting blob to local repo, %v", err)
+		}
+	}()
+	return size, bReader, nil
+}
+
+// putBlobToLocal reads the blob desc.Digest from remoteRepo through r and pushes
+// it to localRepo. The remote reader is closed before returning.
+func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r remoteInterface) error {
+	log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
+	_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
+	if err != nil {
+		log.Errorf("failed to create blob reader, error %v", err)
+		return err
+	}
+	defer bReader.Close()
+	return c.local.PushBlob(localRepo, desc, bReader)
+}
+
+// waitAndPushManifest pushes man to the local repository. A manifest list or
+// OCI image index is delegated to PushManifestList. For any other manifest it
+// polls up to maxManifestWait times, sleeping sleepIntervalSec seconds before
+// each check, until all referenced blobs exist locally; blobs still missing
+// afterwards are copied from the remote repository before the manifest is
+// pushed. Failures are logged, nothing is returned.
+func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r remoteInterface) {
+	if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex {
+		err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man)
+		if err != nil {
+			log.Errorf("error when push manifest list to local :%v", err)
+		}
+		return
+	}
+	var waitBlobs []distribution.Descriptor
+	for n := 0; n < maxManifestWait; n++ {
+		time.Sleep(sleepIntervalSec * time.Second)
+		waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man)
+		if len(waitBlobs) == 0 {
+			break
+		}
+		log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
+	}
+	if len(waitBlobs) > 0 {
+		// The docker client skips pulling layers it already has locally, so
+		// those blobs never pass through the proxy and the dependency check
+		// would keep failing. Push them here before pushing the manifest.
+		log.Debug("Waiting blobs not empty, push it to local repo directly")
+		for _, desc := range waitBlobs {
+			err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r)
+			if err != nil {
+				log.Errorf("Failed to push blob to local repo, error: %v", err)
+				return
+			}
+		}
+	}
+	err := c.local.PushManifest(art.Repository, getReference(art), man)
+	if err != nil {
+		log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
+	}
+}
+
+// getRemoteRepo get the remote repository name, used in proxy cache
+// It strips the leading "<project name>/" from art.Repository.
+func getRemoteRepo(art lib.ArtifactInfo) string {
+	return strings.TrimPrefix(art.Repository, art.ProjectName+"/")
+}
+
+// getReference returns the tag of art when it has one, its digest otherwise.
+func getReference(art lib.ArtifactInfo) string {
+	if len(art.Tag) > 0 {
+		return art.Tag
+	}
+	return art.Digest
+}
diff --git a/src/controller/proxy/controller_test.go b/src/controller/proxy/controller_test.go
new file mode 100644
index 000000000..8f66d9c0c
--- /dev/null
+++ b/src/controller/proxy/controller_test.go
@@ -0,0 +1,174 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"context"
+	"github.com/docker/distribution"
+	"github.com/goharbor/harbor/src/controller/artifact"
+	"github.com/goharbor/harbor/src/controller/blob"
+	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/replication/registry"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	"io"
+	"testing"
+)
+
+// localInterfaceMock is a testify mock of localInterface. Only BlobExist and
+// ManifestExist are backed by expectations; the remaining methods panic when
+// called.
+type localInterfaceMock struct {
+	mock.Mock
+}
+
+// BlobExist returns the (bool, error) pair configured on the mock.
+func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
+	args := l.Called(ctx, art)
+	return args.Bool(0), args.Error(1)
+}
+
+// ManifestExist returns the first configured return value; any further
+// configured values are not read.
+func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
+	args := l.Called(ctx, art)
+	return args.Bool(0)
+}
+
+// PushBlob is not implemented by the mock and panics when called.
+func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
+	panic("implement me")
+}
+
+// PushManifest is not implemented by the mock and panics when called.
+func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
+	panic("implement me")
+}
+
+// PushManifestList is not implemented by the mock and panics when called.
+func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
+	panic("implement me")
+}
+
+// CheckDependencies is not implemented by the mock and panics when called.
+func (l *localInterfaceMock) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
+	panic("implement me")
+}
+
+// DeleteManifest is not implemented by the mock and panics when called.
+func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
+	panic("implement me")
+}
+
+// proxyControllerTestSuite exercises controller with its local helper replaced
+// by localInterfaceMock.
+type proxyControllerTestSuite struct {
+	suite.Suite
+	local *localInterfaceMock
+	ctr   Controller
+}
+
+// SetupTest builds a fresh mock and a controller wired to it.
+func (p *proxyControllerTestSuite) SetupTest() {
+	p.local = &localInterfaceMock{}
+	p.ctr = &controller{
+		blobCtl:     blob.Ctl,
+		registryMgr: registry.NewDefaultManager(),
+		artifactCtl: artifact.Ctl,
+		local:       p.local,
+	}
+}
+
+// A digest request whose manifest exists locally is served from the local copy.
+func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
+	ctx := context.Background()
+	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
+	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
+	result := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().True(result)
+}
+
+// A digest request whose manifest is missing locally is not served locally.
+func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
+	ctx := context.Background()
+	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
+	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil)
+	result := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().False(result)
+}
+
+// A request without a digest returns false even though the mock would report
+// the manifest as existing.
+func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
+	ctx := context.Background()
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
+	p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
+	result := p.ctr.UseLocalManifest(ctx, art)
+	p.Assert().False(result)
+}
+
+// A blob that exists locally is served from the local copy.
+func (p *proxyControllerTestSuite) TestUseLocalBlob_True() {
+	ctx := context.Background()
+	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
+	p.local.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil)
+	result := p.ctr.UseLocalBlob(ctx, art)
+	p.Assert().True(result)
+}
+
+// A blob that is missing locally is not served locally.
+func (p *proxyControllerTestSuite) TestUseLocalBlob_False() {
+	ctx := context.Background()
+	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
+	p.local.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil)
+	result := p.ctr.UseLocalBlob(ctx, art)
+	p.Assert().False(result)
+}
+
+// TestProxyControllerTestSuite runs the suite under "go test".
+func TestProxyControllerTestSuite(t *testing.T) {
+	suite.Run(t, &proxyControllerTestSuite{})
+}
+
+// TestProxyCacheRemoteRepo checks that getRemoteRepo strips the project name
+// prefix from the repository.
+func TestProxyCacheRemoteRepo(t *testing.T) {
+	cases := []struct {
+		name string
+		in   lib.ArtifactInfo
+		want string
+	}{
+		{
+			name: `normal test`,
+			in:   lib.ArtifactInfo{ProjectName: "dockerhub_proxy", Repository: "dockerhub_proxy/firstfloor/hello-world"},
+			want: "firstfloor/hello-world",
+		},
+	}
+	for _, tt := range cases {
+		t.Run(tt.name, func(t *testing.T) {
+			got := getRemoteRepo(tt.in)
+			if got != tt.want {
+				t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
+			}
+		})
+	}
+}
+// TestGetRef checks that getReference prefers the tag and falls back to the
+// digest when the tag is empty.
+func TestGetRef(t *testing.T) {
+	cases := []struct {
+		name string
+		in   lib.ArtifactInfo
+		want string
+	}{
+		{
+			name: `normal`,
+			in:   lib.ArtifactInfo{Repository: "hello-world", Tag: "latest", Digest: "sha256:aabbcc"},
+			want: "latest",
+		},
+		{
+			name: `digest_only`,
+			in:   lib.ArtifactInfo{Repository: "hello-world", Tag: "", Digest: "sha256:aabbcc"},
+			want: "sha256:aabbcc",
+		},
+	}
+	for _, tt := range cases {
+		t.Run(tt.name, func(t *testing.T) {
+			got := getReference(tt.in)
+			if got != tt.want {
+				t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
+			}
+		})
+	}
+}
diff --git a/src/controller/proxy/inflight.go b/src/controller/proxy/inflight.go
new file mode 100644
index 000000000..f2a2b97d5
--- /dev/null
+++ b/src/controller/proxy/inflight.go
@@ -0,0 +1,46 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import "sync"
+
+// inflightRequest is a mutex-guarded set of artifact keys that are currently
+// being processed. The zero value is ready to use.
+type inflightRequest struct {
+	mu sync.Mutex
+	// reqMap holds one entry per in-flight key; only the presence of a key matters
+	reqMap map[string]struct{}
+}
+
+// inflightChecker is the package-wide set of in-flight artifact keys
+var inflightChecker = &inflightRequest{
+	reqMap: make(map[string]struct{}),
+}
+
+// addRequest registers artifact as in flight. It returns false, without
+// changing anything, when the artifact is already registered, and true when
+// the registration succeeded.
+func (in *inflightRequest) addRequest(artifact string) bool {
+	in.mu.Lock()
+	defer in.mu.Unlock()
+	if _, ok := in.reqMap[artifact]; ok {
+		// Skip some following operation if it is in reqMap
+		return false
+	}
+	if in.reqMap == nil {
+		// writing to a nil map panics, so create it on first use
+		in.reqMap = make(map[string]struct{})
+	}
+	in.reqMap[artifact] = struct{}{}
+	return true
+}
+
+// removeRequest unregisters artifact; removing an unknown key is a no-op.
+func (in *inflightRequest) removeRequest(artifact string) {
+	in.mu.Lock()
+	defer in.mu.Unlock()
+	delete(in.reqMap, artifact)
+}
diff --git a/src/controller/proxy/inflight_test.go b/src/controller/proxy/inflight_test.go
new file mode 100644
index 000000000..44efa1a4b
--- /dev/null
+++ b/src/controller/proxy/inflight_test.go
@@ -0,0 +1,30 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+// TestInflightRequest walks one key through the in-flight set: the first
+// registration succeeds, a duplicate is rejected, and after removal the key
+// can be registered again.
+func TestInflightRequest(t *testing.T) {
+	artName := "hello-world:latest"
+	// release the key even when an assertion below fails, the set is package-wide
+	defer inflightChecker.removeRequest(artName)
+
+	assert.True(t, inflightChecker.addRequest(artName))
+	_, ok := inflightChecker.reqMap[artName]
+	assert.True(t, ok)
+
+	// a second registration of the same key must be refused
+	assert.False(t, inflightChecker.addRequest(artName))
+
+	inflightChecker.removeRequest(artName)
+	_, exist := inflightChecker.reqMap[artName]
+	assert.False(t, exist)
+
+	// once removed, the key is available again
+	assert.True(t, inflightChecker.addRequest(artName))
+}
diff --git a/src/controller/proxy/local.go b/src/controller/proxy/local.go
new file mode 100644
index 000000000..4ca570860
--- /dev/null
+++ b/src/controller/proxy/local.go
@@ -0,0 +1,191 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/manifest/manifestlist"
+	comHttpAuth "github.com/goharbor/harbor/src/common/http/modifier/auth"
+	"github.com/goharbor/harbor/src/controller/artifact"
+	"github.com/goharbor/harbor/src/core/config"
+	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/lib/log"
+	"github.com/goharbor/harbor/src/pkg/registry"
+	"io"
+	"time"
+)
+
+// localInterface defines operations related to local repo under proxy mode
+type localInterface interface {
+	// BlobExist check if the blob exist in local repo
+	BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error)
+	// ManifestExist check if the manifest exist in local repo
+	ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool
+	// PushBlob push blob to local repo
+	PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
+	// PushManifest push manifest to local repo, ref can be digest or tag
+	PushManifest(repo string, ref string, manifest distribution.Manifest) error
+	// PushManifestList push manifest list to local repo
+	PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error
+	// CheckDependencies check if the manifest's dependency is ready,
+	// it returns the descriptors which are not available in local repo
+	CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
+	// DeleteManifest cleanup delete tag from local cache
+	DeleteManifest(repo, ref string)
+}
+
+// ManifestExist looks the artifact up by art.Repository and art.Digest through
+// the artifact controller. A lookup error is logged and reported as "does not
+// exist".
+func (l *localHelper) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
+	a, err := l.artifactCtl.GetByReference(ctx, art.Repository, art.Digest, nil)
+	if err != nil {
+		log.Errorf("check manifest exist failed, error %v", err)
+		return false
+	}
+	return a != nil
+}
+
+// localHelper defines operations related to local repo under proxy mode
+type localHelper struct {
+	registry    registry.Client
+	artifactCtl artifactController
+}
+
+// artifactController is the part of the artifact controller used by
+// localHelper, declared here so that it can be replaced in tests.
+type artifactController interface {
+	GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (artifact *artifact.Artifact, err error)
+}
+
+// newLocalHelper create the localInterface
+func newLocalHelper() localInterface {
+	l := &localHelper{artifactCtl: artifact.Ctl}
+	l.init()
+	return l
+}
+
+// BlobExist asks the local registry client whether the blob art.Digest exists
+// in art.Repository; ctx is not used.
+func (l *localHelper) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
+	return l.registry.BlobExist(art.Repository, art.Digest)
+}
+
+// init creates the registry client unless one has already been set.
+func (l *localHelper) init() {
+	if l.registry != nil {
+		return
+	}
+	log.Debugf("core url:%s, local core url: %v", config.GetCoreURL(), config.LocalCoreURL())
+	// the traffic is internal only
+	registryURL := config.LocalCoreURL()
+	authorizer := comHttpAuth.NewSecretAuthorizer(config.ProxyServiceSecret)
+	l.registry = registry.NewClientWithAuthorizer(registryURL, authorizer, true)
+}
+
+// PushBlob pushes the content of bReader to localRepo under desc.Digest.
+// When the same repo:digest is already being pushed, it returns nil without
+// reading bReader.
+func (l *localHelper) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
+	log.Debugf("Put blob to local registry, localRepo:%v, digest: %v", localRepo, desc.Digest)
+	ref := string(desc.Digest)
+	artName := localRepo + ":" + ref
+	// use inflight checker to avoid multiple requests to push blob to local in same time
+	if !inflightChecker.addRequest(artName) {
+		return nil
+	}
+	defer inflightChecker.removeRequest(artName)
+	err := l.registry.PushBlob(localRepo, ref, desc.Size, bReader)
+	return err
+}
+
+// PushManifest pushes manifest to repo under ref. When the same repo:ref is
+// already being pushed, it returns nil without pushing.
+func (l *localHelper) PushManifest(repo string, ref string, manifest distribution.Manifest) error {
+	// Make sure there is only one goroutine to push current artName to local repo
+	artName := repo + ":" + ref
+	// use inflight checker to avoid multiple requests to push manifest to local in same time
+	if !inflightChecker.addRequest(artName) {
+		return nil
+	}
+	defer inflightChecker.removeRequest(artName)
+
+	mediaType, payload, err := manifest.Payload()
+	if err != nil {
+		return err
+	}
+	_, err = l.registry.PushManifest(repo, ref, mediaType, payload)
+	return err
+}
+
+// DeleteManifest cleanup delete tag from local repo
+// A failure is only logged as a warning.
+func (l *localHelper) DeleteManifest(repo, ref string) {
+	log.Debug("Remove tag from repo if it is exist")
+	if err := l.registry.DeleteManifest(repo, ref); err != nil {
+		// sometimes user pull a non-exist image
+		log.Warningf("failed to remove artifact, error %v", err)
+	}
+}
+
+// updateManifestList -- Trim the manifest list so that it only references the
+// manifests which already exist in the local repo; the result may be empty.
+// Any manifest that is not a *manifestlist.DeserializedManifestList is rejected
+// with an error.
+func (l *localHelper) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
+	switch v := manifest.(type) {
+	case *manifestlist.DeserializedManifestList:
+		existMans := make([]manifestlist.ManifestDescriptor, 0)
+		for _, m := range v.Manifests {
+			art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
+			if l.ManifestExist(ctx, art) {
+				existMans = append(existMans, m)
+			}
+		}
+		return manifestlist.FromDescriptors(existMans)
+	}
+	return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
+}
+
+// PushManifestList waits for the manifests referenced by man to show up in the
+// local repo and then pushes the (possibly trimmed) manifest list under ref.
+func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error {
+	// For manifest list, it might include some different manifest
+	// it polls up to maxManifestListWait times, sleeping sleepIntervalSec seconds
+	// before each check; if all depend manifests exist then push it
+	// if the rounds are used up, only push the new updated manifest list which contains existing manifest
+	var newMan distribution.Manifest
+	var err error
+	for n := 0; n < maxManifestListWait; n++ {
+		log.Debugf("waiting for the manifest ready, repo %v, ref:%v", repo, ref)
+		time.Sleep(sleepIntervalSec * time.Second)
+		newMan, err = l.updateManifestList(ctx, repo, man)
+		if err != nil {
+			return err
+		}
+		// every referenced manifest is available, no need to wait any longer
+		if len(newMan.References()) == len(man.References()) {
+			break
+		}
+	}
+
+	if len(newMan.References()) == 0 {
+		return errors.New("manifest list doesn't contain any pushed manifest")
+	}
+	_, pl, err := newMan.Payload()
+	if err != nil {
+		log.Errorf("failed to get payload, error %v", err)
+		return err
+	}
+	log.Debugf("The manifest list payload: %v", string(pl))
+	return l.PushManifest(repo, ref, newMan)
+}
+
+// CheckDependencies returns the descriptors referenced by man whose blobs are
+// missing in repo, or whose existence check failed. An empty result means all
+// dependencies are ready.
+func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
+	descriptors := man.References()
+	waitDesc := make([]distribution.Descriptor, 0)
+	for _, desc := range descriptors {
+		log.Debugf("checking the blob dependency: %v", desc.Digest)
+		art := lib.ArtifactInfo{Repository: repo, Digest: string(desc.Digest)}
+		exist, err := l.BlobExist(ctx, art)
+		if err != nil || !exist {
+			log.Debugf("Check dependency failed!")
+			waitDesc = append(waitDesc, desc)
+		}
+	}
+	log.Debugf("Check dependency result %v", waitDesc)
+	return waitDesc
+}
diff --git a/src/controller/proxy/local_test.go b/src/controller/proxy/local_test.go
new file mode 100644
index 000000000..e3a1ceeba
--- /dev/null
+++ b/src/controller/proxy/local_test.go
@@ -0,0 +1,200 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"context"
+	distribution2 "github.com/docker/distribution"
+	"github.com/docker/distribution/manifest"
+	"github.com/docker/distribution/manifest/manifestlist"
+	"github.com/docker/distribution/manifest/schema2"
+	"github.com/goharbor/harbor/src/controller/artifact"
+	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/pkg/distribution"
+	"github.com/opencontainers/go-digest"
+	"github.com/stretchr/testify/mock"
+	"testing"
+
+	testregistry "github.com/goharbor/harbor/src/testing/pkg/registry"
+	"github.com/stretchr/testify/suite"
+)
+
+// mockManifest is a testify mock providing the References and Payload methods
+// of a manifest.
+type mockManifest struct {
+	mock.Mock
+}
+
+// References returns the configured descriptors, or an empty slice when the
+// configured value is nil.
+func (m *mockManifest) References() []distribution2.Descriptor {
+	args := m.Called()
+	desc := make([]distribution2.Descriptor, 0)
+	if args[0] != nil {
+		desc = args[0].([]distribution2.Descriptor)
+	}
+	return desc
+}
+
+// Payload returns the configured media type, payload and error; a nil payload
+// is replaced by an empty byte slice.
+func (m *mockManifest) Payload() (mediaType string, payload []byte, err error) {
+	args := m.Called()
+	p := make([]byte, 0)
+	if args[1] != nil {
+		p = args[1].([]byte)
+	}
+	return args.String(0), p, args.Error(2)
+}
+
+// artifactControllerMock is a testify mock of artifactController.
+type artifactControllerMock struct {
+	mock.Mock
+}
+
+// GetByReference returns the configured artifact and error; when the configured
+// artifact is an untyped nil, an empty artifact is returned instead.
+func (a *artifactControllerMock) GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (arti *artifact.Artifact, err error) {
+	args := a.Called(ctx, repository, reference, option)
+	art := &artifact.Artifact{}
+	if args[0] != nil {
+		art = args[0].(*artifact.Artifact)
+	}
+	return art, args.Error(1)
+}
+
+// localHelperTestSuite exercises localHelper against a fake registry client and
+// a mocked artifact controller.
+type localHelperTestSuite struct {
+	suite.Suite
+	registryClient *testregistry.FakeClient
+	local          *localHelper
+	artCtl         *artifactControllerMock
+}
+
+// SetupTest creates fresh fakes and a localHelper wired to them.
+func (lh *localHelperTestSuite) SetupTest() {
+	lh.registryClient = &testregistry.FakeClient{}
+	lh.artCtl = &artifactControllerMock{}
+	lh.local = &localHelper{registry: lh.registryClient, artifactCtl: lh.artCtl}
+
+}
+
+// BlobExist passes a negative answer of the registry client through.
+func (lh *localHelperTestSuite) TestBlobExist_False() {
+	repo := "library/hello-world"
+	dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
+	art := lib.ArtifactInfo{Repository: repo, Digest: dig}
+	ctx := context.Background()
+	lh.registryClient.On("BlobExist").Return(false, nil)
+	exist, err := lh.local.BlobExist(ctx, art)
+	lh.Require().Nil(err)
+	lh.Assert().Equal(false, exist)
+}
+// BlobExist passes a positive answer of the registry client through.
+func (lh *localHelperTestSuite) TestBlobExist_True() {
+	repo := "library/hello-world"
+	dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
+	art := lib.ArtifactInfo{Repository: repo, Digest: dig}
+	ctx := context.Background()
+	lh.registryClient.On("BlobExist").Return(true, nil)
+	exist, err := lh.local.BlobExist(ctx, art)
+	lh.Require().Nil(err)
+	lh.Assert().Equal(true, exist)
+}
+
+// PushManifest succeeds when both the payload and the registry push succeed.
+func (lh *localHelperTestSuite) TestPushManifest() {
+	dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
+	lh.registryClient.On("PushManifest").Return(dig, nil)
+	manifest := &mockManifest{}
+	var ct string
+	manifest.Mock.On("Payload").Return(ct, []byte("example"), nil)
+	// ct is assigned after it was handed to Return above, so the mock keeps the empty string
+	ct = schema2.MediaTypeManifest
+	err := lh.local.PushManifest("library/hello-world", "", manifest)
+	lh.Require().Nil(err)
+}
+
+// updateManifestList keeps only the manifests which exist locally: of the two
+// platforms in the list only amd64 is reported as existing.
+func (lh *localHelperTestSuite) TestUpdateManifestList() {
+	ctx := context.Background()
+	amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
+	manifestList := manifestlist.ManifestList{
+		Versioned: manifest.Versioned{
+			SchemaVersion: 2,
+			MediaType:     manifestlist.MediaTypeManifestList,
+		},
+		Manifests: []manifestlist.ManifestDescriptor{
+			{
+				Descriptor: distribution.Descriptor{
+					Digest:    digest.Digest(amdDig),
+					Size:      3253,
+					MediaType: schema2.MediaTypeManifest,
+				},
+				Platform: manifestlist.PlatformSpec{
+					Architecture: "amd64",
+					OS:           "linux",
+				},
+			}, {
+				Descriptor: distribution.Descriptor{
+					Digest:    digest.Digest(armDig),
+					Size:      3253,
+					MediaType: schema2.MediaTypeManifest,
+				},
+				Platform: manifestlist.PlatformSpec{
+					Architecture: "arm",
+					OS:           "linux",
+				},
+			},
+		},
+	}
+	manList := &manifestlist.DeserializedManifestList{
+		ManifestList: manifestList,
+	}
+	ar := &artifact.Artifact{}
+	// a typed nil pointer makes the mock return a nil artifact, i.e. "not found"
+	var emptyArtifact *artifact.Artifact
+	var opt *artifact.Option
+	lh.artCtl.On("GetByReference", ctx, "library/hello-world", amdDig, opt).Return(ar, nil)
+	lh.artCtl.On("GetByReference", ctx, "library/hello-world", armDig, opt).Return(emptyArtifact, nil)
+	newMan, err := lh.local.updateManifestList(ctx, "library/hello-world", manList)
+	lh.Require().Nil(err)
+	lh.Assert().Equal(len(newMan.References()), 1)
+}
+
+// CheckDependencies reports every referenced blob when none exists locally.
+func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
+	ctx := context.Background()
+	manifest := &mockManifest{}
+	refs := []distribution2.Descriptor{
+		{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
+		{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
+	}
+	manifest.On("References").Return(refs)
+	lh.registryClient.On("BlobExist").Return(false, nil)
+	ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
+	lh.Assert().Equal(len(ret), 2)
+}
+
+// CheckDependencies reports nothing when all referenced blobs exist locally.
+func (lh *localHelperTestSuite) TestCheckDependencies_Suc() {
+	ctx := context.Background()
+	manifest := &mockManifest{}
+	refs := []distribution2.Descriptor{
+		{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
+		{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
+	}
+	manifest.On("References").Return(refs)
+	lh.registryClient.On("BlobExist").Return(true, nil)
+	ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
+	lh.Assert().Equal(len(ret), 0)
+}
+
+// ManifestExist is true when the artifact controller returns an artifact.
+func (lh *localHelperTestSuite) TestManifestExist() {
+	ctx := context.Background()
+	dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
+	ar := &artifact.Artifact{}
+	var opt *artifact.Option
+	lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil)
+	art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
+	exist := lh.local.ManifestExist(ctx, art)
+	lh.Assert().True(exist)
+}
+
+// TestLocalHelperTestSuite runs the suite under "go test".
+func TestLocalHelperTestSuite(t *testing.T) {
+	suite.Run(t, &localHelperTestSuite{})
+}
diff --git a/src/controller/proxy/remote.go b/src/controller/proxy/remote.go
new file mode 100644
index 000000000..30056658e
--- /dev/null
+++ b/src/controller/proxy/remote.go
@@ -0,0 +1,81 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package proxy + +import ( + "fmt" + "github.com/docker/distribution" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/replication/adapter" + "github.com/goharbor/harbor/src/replication/registry" + "io" +) + +// remoteInterface defines operations related to remote repository under proxy +type remoteInterface interface { + // BlobReader create a reader for remote blob + BlobReader(repo, dig string) (int64, io.ReadCloser, error) + // Manifest get manifest by reference + Manifest(repo string, ref string) (distribution.Manifest, error) +} + +// remoteHelper defines operations related to remote repository under proxy +type remoteHelper struct { + regID int64 + registry adapter.ArtifactRegistry +} + +// newRemoteHelper create a remoteHelper interface +func newRemoteHelper(regID int64) (remoteInterface, error) { + r := &remoteHelper{regID: regID} + if err := r.init(); err != nil { + log.Errorf("failed to create remoteHelper error %v", err) + return nil, err + } + return r, nil +} + +func (r *remoteHelper) init() error { + + if r.registry != nil { + return nil + } + reg, err := registry.NewDefaultManager().Get(r.regID) + if err != nil { + return err + } + if reg == nil { + return fmt.Errorf("failed to get registry, registryID: %v", r.regID) + } + factory, err := adapter.GetFactory(reg.Type) + if err != nil { + return err + } + adp, err := factory.Create(reg) + if err != nil { + return err + } + r.registry = adp.(adapter.ArtifactRegistry) + return nil +} + +func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) { + return r.registry.PullBlob(repo, dig) +} + +func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, error) { + man, _, err := r.registry.PullManifest(repo, ref) + return man, err +} diff --git a/src/server/middleware/repoproxy/proxy.go b/src/server/middleware/repoproxy/proxy.go new file mode 100644 index 000000000..8d23e9563 --- /dev/null +++ 
b/src/server/middleware/repoproxy/proxy.go
@@ -0,0 +1,160 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package repoproxy
+
+import (
+	"context"
+	"fmt"
+	"github.com/goharbor/harbor/src/lib/errors"
+	httpLib "github.com/goharbor/harbor/src/lib/http"
+	"github.com/goharbor/harbor/src/replication/model"
+	"github.com/goharbor/harbor/src/replication/registry"
+	"io"
+	"net/http"
+
+	"github.com/goharbor/harbor/src/common/models"
+	"github.com/goharbor/harbor/src/controller/project"
+	"github.com/goharbor/harbor/src/controller/proxy"
+	"github.com/goharbor/harbor/src/lib"
+	"github.com/goharbor/harbor/src/lib/log"
+	"github.com/goharbor/harbor/src/server/middleware"
+)
+
+// registryMgr is used to look up the registry a proxy project refers to
+var registryMgr = registry.NewDefaultManager()
+
+// BlobGetMiddleware handle get blob request
+func BlobGetMiddleware() func(http.Handler) http.Handler {
+	return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
+		if err := handleBlob(w, r, next); err != nil {
+			httpLib.SendError(w, err)
+		}
+	})
+}
+
+// handleBlob passes the request to next when the project can't be proxied or the
+// local blob should be used, otherwise it streams the blob from the remote registry
+func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
+	ctx := r.Context()
+	art, p, proxyCtl, err := preCheck(ctx)
+	if err != nil {
+		return err
+	}
+	if !canProxy(p) || proxyCtl.UseLocalBlob(ctx, art) {
+		next.ServeHTTP(w, r)
+		return nil
+	}
+	size, reader, err := proxyCtl.ProxyBlob(ctx, p, art)
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+	// Set the headers before the body is written, changes made to the header map
+	// after the first write are not sent to the client
+	setHeaders(w, size, "", art.Digest)
+	// Use io.CopyN to avoid out of memory when pulling big blob
+	written, err := io.CopyN(w, reader, size)
+	if err != nil {
+		return err
+	}
+	if written != size {
+		return errors.Errorf("The size mismatch, actual:%d, expected: %d", written, size)
+	}
+	return nil
+}
+
+// preCheck returns the artifact info kept in ctx, the proxy controller and the
+// project named by the artifact info, err is the result of the project lookup
+func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *models.Project, ctl proxy.Controller, err error) {
+	art = lib.GetArtifactInfo(ctx)
+	ctl = proxy.ControllerInstance()
+	p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false))
+	return
+}
+
+// ManifestGetMiddleware middleware handle request for get manifest
+func ManifestGetMiddleware() func(http.Handler) http.Handler {
+	return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
+		if err := handleManifest(w, r, next); err != nil {
+			httpLib.SendError(w, err)
+		}
+	})
+}
+
+// handleManifest passes the request to next when the project can't be proxied or the
+// local manifest should be used, otherwise it replies with the manifest from the remote registry
+func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
+	ctx := r.Context()
+	art, p, proxyCtl, err := preCheck(ctx)
+	if err != nil {
+		return err
+	}
+	if !canProxy(p) || proxyCtl.UseLocalManifest(ctx, art) {
+		next.ServeHTTP(w, r)
+		return nil
+	}
+	log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest)
+	man, err := proxyCtl.ProxyManifest(ctx, p, art)
+	if err != nil {
+		return err
+	}
+	ct, payload, err := man.Payload()
+	if err != nil {
+		return err
+	}
+	// NOTE(review): art.Digest is presumably empty when the manifest is requested by tag - confirm
+	setHeaders(w, int64(len(payload)), ct, art.Digest)
+	if _, err = w.Write(payload); err != nil {
+		return err
+	}
+	return nil
+}
+
+// canProxy tells whether the requests of project p can be proxied: the project has to
+// refer to a registry (RegistryID >= 1) and that registry has to be found and healthy
+func canProxy(p *models.Project) bool {
+	if p == nil || p.RegistryID < 1 {
+		return false
+	}
+	reg, err := registryMgr.Get(p.RegistryID)
+	if err != nil {
+		log.Errorf("failed to get registry, error:%v", err)
+		return false
+	}
+	// registryMgr.Get presumably can return a nil registry without an error, don't dereference it
+	if reg == nil {
+		log.Errorf("failed to get registry, registryID: %v", p.RegistryID)
+		return false
+	}
+	if reg.Status != model.Healthy {
+		log.Errorf("current registry is unhealthy, regID:%v, Name:%v, Status: %v", reg.ID, reg.Name, reg.Status)
+		return false
+	}
+	return true
+}
+
+// setHeaders sets the Content-Length of the response, the Content-Type when mediaType
+// is not empty and the Docker-Content-Digest and Etag when dig is not empty
+func setHeaders(w http.ResponseWriter, size int64, mediaType string, dig string) {
+	h := w.Header()
+	h.Set("Content-Length", fmt.Sprintf("%v", size))
+	if len(mediaType) > 0 {
+		h.Set("Content-Type", mediaType)
+	}
+	if len(dig) > 0 {
+		h.Set("Docker-Content-Digest", dig)
+		h.Set("Etag", dig)
+	}
+}
diff --git a/src/server/registry/route.go b/src/server/registry/route.go
index 3b40a4048..bc3b48eae 100644
--- a/src/server/registry/route.go
+++ b/src/server/registry/route.go
@@ -22,6 +22,7 @@ import (
 	"github.com/goharbor/harbor/src/server/middleware/contenttrust"
 	"github.com/goharbor/harbor/src/server/middleware/immutable"
 	"github.com/goharbor/harbor/src/server/middleware/quota"
+	"github.com/goharbor/harbor/src/server/middleware/repoproxy"
 	"github.com/goharbor/harbor/src/server/middleware/v2auth"
 	"github.com/goharbor/harbor/src/server/middleware/vulnerable"
 	"github.com/goharbor/harbor/src/server/router"
@@ -47,6 +48,7 @@ func RegisterRoutes() {
 	root.NewRoute().
 		Method(http.MethodGet).
 		Path("/*/manifests/:reference").
+		Middleware(repoproxy.ManifestGetMiddleware()).
 		Middleware(contenttrust.Middleware()).
 		Middleware(vulnerable.Middleware()).
 		HandlerFunc(getManifest)
@@ -66,6 +68,12 @@ func RegisterRoutes() {
 		Middleware(quota.PutManifestMiddleware()).
 		Middleware(blob.PutManifestMiddleware()).
 		HandlerFunc(putManifest)
+	// blob get
+	root.NewRoute().
+		Method(http.MethodGet).
+		Path("/*/blobs/:digest").
+		Middleware(repoproxy.BlobGetMiddleware()).
+		Handler(proxy)
 	// initiate blob upload
 	root.NewRoute().
 		Method(http.MethodPost).