mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-14 22:35:36 +01:00
Merge pull request #13805 from stonezdj/201218_add_content_type_length
Add content type and length in header
This commit is contained in:
commit
b748852ee8
@ -17,6 +17,7 @@ package proxy
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/controller/tag"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -34,7 +35,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/opencontainers/go-digest"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -43,7 +43,7 @@ const (
|
||||
maxManifestWait = 10
|
||||
sleepIntervalSec = 20
|
||||
// keep manifest list in cache for one week
|
||||
manifestListCacheIntervalSec = 7 * 24 * 60 * 60
|
||||
manifestListCacheInterval = 7 * 24 * 60 * 60 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@ -65,13 +65,16 @@ type Controller interface {
|
||||
// art is the ArtifactInfo which includes the tag or digest of the manifest
|
||||
ProxyManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (distribution.Manifest, error)
|
||||
// HeadManifest send manifest head request to the remote server
|
||||
HeadManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, string, error)
|
||||
HeadManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *distribution.Descriptor, error)
|
||||
// EnsureTag ensure tag for digest
|
||||
EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error
|
||||
}
|
||||
type controller struct {
|
||||
blobCtl blob.Controller
|
||||
artifactCtl artifact.Controller
|
||||
local localInterface
|
||||
cache cache.Cache
|
||||
blobCtl blob.Controller
|
||||
artifactCtl artifact.Controller
|
||||
local localInterface
|
||||
cache cache.Cache
|
||||
handlerRegistry map[string]ManifestCacheHandler
|
||||
}
|
||||
|
||||
// ControllerInstance -- Get the proxy controller instance
|
||||
@ -79,17 +82,43 @@ func ControllerInstance() Controller {
|
||||
// Lazy load the controller
|
||||
// Because LocalHelper is not ready unless core startup completely
|
||||
once.Do(func() {
|
||||
l := newLocalHelper()
|
||||
ctl = &controller{
|
||||
blobCtl: blob.Ctl,
|
||||
artifactCtl: artifact.Ctl,
|
||||
local: newLocalHelper(),
|
||||
cache: cache.Default(),
|
||||
blobCtl: blob.Ctl,
|
||||
artifactCtl: artifact.Ctl,
|
||||
local: newLocalHelper(),
|
||||
cache: cache.Default(),
|
||||
handlerRegistry: NewCacheHandlerRegistry(l),
|
||||
}
|
||||
})
|
||||
|
||||
return ctl
|
||||
}
|
||||
|
||||
func (c *controller) EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error {
|
||||
// search the digest in cache and query with trimmed digest
|
||||
var trimmedDigest string
|
||||
err := c.cache.Fetch(TrimmedManifestlist+art.Digest, &trimmedDigest)
|
||||
if err == cache.ErrNotFound {
|
||||
// skip to update digest, continue
|
||||
} else if err != nil {
|
||||
// for other error, return
|
||||
return err
|
||||
} else {
|
||||
// found in redis, update the digest
|
||||
art.Digest = trimmedDigest
|
||||
log.Debugf("Found trimmed digest: %v", trimmedDigest)
|
||||
}
|
||||
a, err := c.local.GetManifest(ctx, art)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if a == nil {
|
||||
return fmt.Errorf("the artifact is not ready yet, failed to tag it to %v", tagName)
|
||||
}
|
||||
return tag.Ctl.Ensure(ctx, a.RepositoryID, a.Artifact.ID, tagName)
|
||||
}
|
||||
|
||||
func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
|
||||
if len(art.Digest) == 0 {
|
||||
return false
|
||||
@ -119,11 +148,11 @@ func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo,
|
||||
}
|
||||
|
||||
remoteRepo := getRemoteRepo(art)
|
||||
exist, dig, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD
|
||||
exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
if !exist {
|
||||
if !exist || desc == nil {
|
||||
go func() {
|
||||
c.local.DeleteManifest(remoteRepo, art.Tag)
|
||||
}()
|
||||
@ -132,18 +161,18 @@ func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo,
|
||||
|
||||
var content []byte
|
||||
if c.cache != nil {
|
||||
err = c.cache.Fetch(getManifestListKey(art.Repository, dig), &content)
|
||||
err = c.cache.Fetch(getManifestListKey(art.Repository, string(desc.Digest)), &content)
|
||||
if err == nil {
|
||||
log.Debugf("Get the manifest list with key=cache:%v", getManifestListKey(art.Repository, dig))
|
||||
return true, &ManifestList{content, dig, manifestlist.MediaTypeManifestList}, nil
|
||||
log.Debugf("Get the manifest list with key=cache:%v", getManifestListKey(art.Repository, string(desc.Digest)))
|
||||
return true, &ManifestList{content, string(desc.Digest), manifestlist.MediaTypeManifestList}, nil
|
||||
}
|
||||
if err == cache.ErrNotFound {
|
||||
log.Debugf("Digest is not found in manifest list cache, key=cache:%v", getManifestListKey(art.Repository, dig))
|
||||
log.Debugf("Digest is not found in manifest list cache, key=cache:%v", getManifestListKey(art.Repository, string(desc.Digest)))
|
||||
} else {
|
||||
log.Errorf("Failed to get manifest list from cache, error: %v", err)
|
||||
}
|
||||
}
|
||||
return a != nil && dig == a.Digest, nil, nil // digest matches
|
||||
return a != nil && string(desc.Digest) == a.Digest, nil, nil // digest matches
|
||||
}
|
||||
|
||||
func getManifestListKey(repo, dig string) string {
|
||||
@ -198,7 +227,7 @@ func (c *controller) ProxyManifest(ctx context.Context, art lib.ArtifactInfo, re
|
||||
|
||||
return man, nil
|
||||
}
|
||||
func (c *controller) HeadManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, string, error) {
|
||||
func (c *controller) HeadManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *distribution.Descriptor, error) {
|
||||
remoteRepo := getRemoteRepo(art)
|
||||
ref := getReference(art)
|
||||
return remote.ManifestExist(remoteRepo, ref)
|
||||
@ -239,53 +268,14 @@ func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc di
|
||||
}
|
||||
|
||||
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r RemoteInterface) {
|
||||
if contType == manifestlist.MediaTypeManifestList {
|
||||
_, payload, err := man.Payload()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get payload, error %v", err)
|
||||
h, ok := c.handlerRegistry[contType]
|
||||
if !ok {
|
||||
h, ok = c.handlerRegistry[defaultHandler]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
key := getManifestListKey(art.Repository, art.Digest)
|
||||
log.Debugf("Cache manifest list with key=cache:%v", key)
|
||||
err = c.cache.Save(key, payload, manifestListCacheIntervalSec)
|
||||
if err != nil {
|
||||
log.Errorf("failed to cache payload, error %v", err)
|
||||
}
|
||||
}
|
||||
if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex {
|
||||
err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man)
|
||||
if err != nil {
|
||||
log.Errorf("error when push manifest list to local :%v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
var waitBlobs []distribution.Descriptor
|
||||
for n := 0; n < maxManifestWait; n++ {
|
||||
time.Sleep(sleepIntervalSec * time.Second)
|
||||
waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man)
|
||||
if len(waitBlobs) == 0 {
|
||||
break
|
||||
}
|
||||
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
|
||||
}
|
||||
if len(waitBlobs) > 0 {
|
||||
// docker client will skip to pull layers exist in local
|
||||
// these blobs are not exist in the proxy server
|
||||
// it will cause the manifest dependency check always fail
|
||||
// need to push these blobs before push manifest to avoid failure
|
||||
log.Debug("Waiting blobs not empty, push it to local repo directly")
|
||||
for _, desc := range waitBlobs {
|
||||
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to push blob to local repo, error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
err := c.local.PushManifest(art.Repository, getReference(art), man)
|
||||
if err != nil {
|
||||
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
|
||||
}
|
||||
h.CacheContent(ctx, remoteRepo, man, art, r)
|
||||
}
|
||||
|
||||
// getRemoteRepo get the remote repository name, used in proxy cache
|
||||
|
@ -23,29 +23,14 @@ import (
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
_ "github.com/goharbor/harbor/src/lib/cache"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
testproxy "github.com/goharbor/harbor/src/testing/controller/proxy"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"io"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type remoteInterfaceMock struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (r *remoteInterfaceMock) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (r *remoteInterfaceMock) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (r *remoteInterfaceMock) ManifestExist(repo, ref string) (bool, string, error) {
|
||||
args := r.Called(repo, ref)
|
||||
return args.Bool(0), args.String(1), args.Error(2)
|
||||
}
|
||||
|
||||
type localInterfaceMock struct {
|
||||
mock.Mock
|
||||
}
|
||||
@ -78,7 +63,8 @@ func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descri
|
||||
}
|
||||
|
||||
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
|
||||
panic("implement me")
|
||||
args := l.Called(repo, tag, manifest)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
||||
@ -95,14 +81,14 @@ func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
|
||||
type proxyControllerTestSuite struct {
|
||||
suite.Suite
|
||||
local *localInterfaceMock
|
||||
remote *remoteInterfaceMock
|
||||
remote *testproxy.RemoteInterface
|
||||
ctr Controller
|
||||
proj *models.Project
|
||||
}
|
||||
|
||||
func (p *proxyControllerTestSuite) SetupTest() {
|
||||
p.local = &localInterfaceMock{}
|
||||
p.remote = &remoteInterfaceMock{}
|
||||
p.remote = &testproxy.RemoteInterface{}
|
||||
p.proj = &models.Project{RegistryID: 1}
|
||||
p.ctr = &controller{
|
||||
blobCtl: blob.Ctl,
|
||||
@ -125,8 +111,9 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
|
||||
func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
|
||||
ctx := context.Background()
|
||||
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||
desc := &distribution.Descriptor{Digest: digest.Digest(dig)}
|
||||
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(true, dig, nil)
|
||||
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(true, desc, nil)
|
||||
p.local.On("GetManifest", mock.Anything, mock.Anything).Return(nil, nil)
|
||||
result, _, err := p.ctr.UseLocalManifest(ctx, art, p.remote)
|
||||
p.Assert().Nil(err)
|
||||
@ -136,8 +123,9 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
|
||||
func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
|
||||
ctx := context.Background()
|
||||
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
|
||||
desc := &distribution.Descriptor{}
|
||||
p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
|
||||
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(false, "", nil)
|
||||
p.remote.On("ManifestExist", mock.Anything, mock.Anything).Return(false, desc, nil)
|
||||
result, _, err := p.ctr.UseLocalManifest(ctx, art, p.remote)
|
||||
p.Assert().True(errors.IsNotFoundErr(err))
|
||||
p.Assert().False(result)
|
||||
|
@ -16,24 +16,23 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest/manifestlist"
|
||||
"github.com/goharbor/harbor/src/controller/artifact"
|
||||
"github.com/goharbor/harbor/src/controller/event/metadata"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
"github.com/goharbor/harbor/src/lib/cache"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/pkg/notifier/event"
|
||||
"github.com/goharbor/harbor/src/pkg/proxy/secret"
|
||||
"github.com/goharbor/harbor/src/pkg/registry"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"io"
|
||||
)
|
||||
|
||||
// TrimmedManifestlist - key prefix for trimmed manifest
|
||||
const TrimmedManifestlist = "trimmedmanifestlist:"
|
||||
|
||||
// localInterface defines operations related to local repo under proxy mode
|
||||
type localInterface interface {
|
||||
// BlobExist check if the blob exist in local repo
|
||||
@ -44,8 +43,6 @@ type localInterface interface {
|
||||
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
|
||||
// PushManifest push manifest to local repo, ref can be digest or tag
|
||||
PushManifest(repo string, ref string, manifest distribution.Manifest) error
|
||||
// PushManifestList push manifest list to local repo
|
||||
PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error
|
||||
// CheckDependencies check if the manifest's dependency is ready
|
||||
CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
|
||||
// DeleteManifest cleanup delete tag from local cache
|
||||
@ -68,6 +65,7 @@ func (l *localHelper) GetManifest(ctx context.Context, art lib.ArtifactInfo) (*a
|
||||
type localHelper struct {
|
||||
registry registry.Client
|
||||
artifactCtl artifactController
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
type artifactController interface {
|
||||
@ -93,6 +91,7 @@ func (l *localHelper) init() {
|
||||
// the traffic is internal only
|
||||
registryURL := config.LocalCoreURL()
|
||||
l.registry = registry.NewClientWithAuthorizer(registryURL, secret.NewAuthorizer(), true)
|
||||
l.cache = cache.Default()
|
||||
}
|
||||
|
||||
func (l *localHelper) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
|
||||
@ -134,65 +133,6 @@ func (l *localHelper) DeleteManifest(repo, ref string) {
|
||||
}
|
||||
}
|
||||
|
||||
// updateManifestList -- Trim the manifest list, make sure at least one depend manifests is ready
|
||||
func (l *localHelper) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
|
||||
switch v := manifest.(type) {
|
||||
case *manifestlist.DeserializedManifestList:
|
||||
existMans := make([]manifestlist.ManifestDescriptor, 0)
|
||||
for _, m := range v.Manifests {
|
||||
art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
|
||||
a, err := l.GetManifest(ctx, art)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if a != nil {
|
||||
existMans = append(existMans, m)
|
||||
}
|
||||
}
|
||||
return manifestlist.FromDescriptors(existMans)
|
||||
}
|
||||
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
|
||||
}
|
||||
|
||||
func (l *localHelper) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
||||
// For manifest list, it might include some different manifest
|
||||
// it will wait and check for 30 mins, if all depend manifests exist then push it
|
||||
// if time exceed, only push the new updated manifest list which contains existing manifest
|
||||
var newMan distribution.Manifest
|
||||
var err error
|
||||
for n := 0; n < maxManifestListWait; n++ {
|
||||
log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, tag)
|
||||
time.Sleep(sleepIntervalSec * time.Second)
|
||||
newMan, err = l.updateManifestList(ctx, repo, man)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(newMan.References()) == len(man.References()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(newMan.References()) == 0 {
|
||||
return errors.New("manifest list doesn't contain any pushed manifest")
|
||||
}
|
||||
_, pl, err := newMan.Payload()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get payload, error %v", err)
|
||||
return err
|
||||
}
|
||||
log.Debugf("The manifest list payload: %v", string(pl))
|
||||
dig := digest.FromBytes(pl)
|
||||
// Because the manifest list maybe updated, need to recheck if it is exist in local
|
||||
art := lib.ArtifactInfo{Repository: repo, Tag: tag}
|
||||
a, err := l.GetManifest(ctx, art)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if a != nil && a.Digest == string(dig) {
|
||||
return nil
|
||||
}
|
||||
return l.PushManifest(repo, tag, newMan)
|
||||
}
|
||||
|
||||
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
|
||||
descriptors := man.References()
|
||||
waitDesc := make([]distribution.Descriptor, 0)
|
||||
|
@ -17,13 +17,9 @@ package proxy
|
||||
import (
|
||||
"context"
|
||||
distribution2 "github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest"
|
||||
"github.com/docker/distribution/manifest/manifestlist"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
"github.com/goharbor/harbor/src/controller/artifact"
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
"github.com/goharbor/harbor/src/pkg/distribution"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"testing"
|
||||
|
||||
@ -112,52 +108,6 @@ func (lh *localHelperTestSuite) TestPushManifest() {
|
||||
lh.Require().Nil(err)
|
||||
}
|
||||
|
||||
func (lh *localHelperTestSuite) TestUpdateManifestList() {
|
||||
ctx := context.Background()
|
||||
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||
manifestList := manifestlist.ManifestList{
|
||||
Versioned: manifest.Versioned{
|
||||
SchemaVersion: 2,
|
||||
MediaType: manifestlist.MediaTypeManifestList,
|
||||
},
|
||||
Manifests: []manifestlist.ManifestDescriptor{
|
||||
{
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(amdDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "amd64",
|
||||
OS: "linux",
|
||||
},
|
||||
}, {
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(armDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "arm",
|
||||
OS: "linux",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
manList := &manifestlist.DeserializedManifestList{
|
||||
ManifestList: manifestList,
|
||||
}
|
||||
ar := &artifact.Artifact{}
|
||||
var emptyArtifact *artifact.Artifact
|
||||
var opt *artifact.Option
|
||||
lh.artCtl.On("GetByReference", ctx, "library/hello-world", amdDig, opt).Return(ar, nil)
|
||||
lh.artCtl.On("GetByReference", ctx, "library/hello-world", armDig, opt).Return(emptyArtifact, nil)
|
||||
newMan, err := lh.local.updateManifestList(ctx, "library/hello-world", manList)
|
||||
lh.Require().Nil(err)
|
||||
lh.Assert().Equal(len(newMan.References()), 1)
|
||||
}
|
||||
|
||||
func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
|
||||
ctx := context.Background()
|
||||
manifest := &mockManifest{}
|
||||
|
207
src/controller/proxy/manifestcache.go
Normal file
207
src/controller/proxy/manifestcache.go
Normal file
@ -0,0 +1,207 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest/manifestlist"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
libCache "github.com/goharbor/harbor/src/lib/cache"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/opencontainers/go-digest"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const defaultHandler = "default"
|
||||
|
||||
// NewCacheHandlerRegistry ...
|
||||
func NewCacheHandlerRegistry(local localInterface) map[string]ManifestCacheHandler {
|
||||
manListHandler := &ManifestListCache{
|
||||
local: local,
|
||||
cache: libCache.Default(),
|
||||
}
|
||||
manHandler := &ManifestCache{local}
|
||||
registry := map[string]ManifestCacheHandler{
|
||||
manifestlist.MediaTypeManifestList: manListHandler,
|
||||
v1.MediaTypeImageIndex: manListHandler,
|
||||
schema2.MediaTypeManifest: manHandler,
|
||||
defaultHandler: manHandler,
|
||||
}
|
||||
return registry
|
||||
}
|
||||
|
||||
// ManifestCacheHandler define how to cache manifest content
|
||||
type ManifestCacheHandler interface {
|
||||
// CacheContent - cache the content of the manifest
|
||||
CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface)
|
||||
}
|
||||
|
||||
// ManifestListCache handle Manifest list type and index type
|
||||
type ManifestListCache struct {
|
||||
cache libCache.Cache
|
||||
local localInterface
|
||||
}
|
||||
|
||||
// CacheContent ...
|
||||
func (m *ManifestListCache) CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface) {
|
||||
_, payload, err := man.Payload()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get payload, error %v", err)
|
||||
return
|
||||
}
|
||||
key := getManifestListKey(art.Repository, art.Digest)
|
||||
log.Debugf("cache manifest list with key=cache:%v", key)
|
||||
err = m.cache.Save(key, payload, manifestListCacheInterval)
|
||||
if err != nil {
|
||||
log.Errorf("failed to cache payload, error %v", err)
|
||||
}
|
||||
err = m.push(ctx, art.Repository, getReference(art), man)
|
||||
if err != nil {
|
||||
log.Errorf("error when push manifest list to local :%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// cacheTrimmedDigest - cache the change Trimmed Digest for controller.EnsureTag when digest is changed
|
||||
func (m *ManifestListCache) cacheTrimmedDigest(ctx context.Context, newDig string) {
|
||||
if m.cache == nil {
|
||||
return
|
||||
}
|
||||
art := lib.GetArtifactInfo(ctx)
|
||||
key := TrimmedManifestlist + string(art.Digest)
|
||||
err := m.cache.Save(key, newDig)
|
||||
if err != nil {
|
||||
log.Warningf("failed to cache the trimmed manifest, err %v", err)
|
||||
return
|
||||
}
|
||||
log.Debugf("Saved key:%v, value:%v", key, newDig)
|
||||
|
||||
}
|
||||
|
||||
func (m *ManifestListCache) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
|
||||
switch v := manifest.(type) {
|
||||
case *manifestlist.DeserializedManifestList:
|
||||
existMans := make([]manifestlist.ManifestDescriptor, 0)
|
||||
for _, ma := range v.Manifests {
|
||||
art := lib.ArtifactInfo{Repository: repo, Digest: string(ma.Digest)}
|
||||
a, err := m.local.GetManifest(ctx, art)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if a != nil {
|
||||
existMans = append(existMans, ma)
|
||||
}
|
||||
}
|
||||
return manifestlist.FromDescriptors(existMans)
|
||||
}
|
||||
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
|
||||
}
|
||||
|
||||
func (m *ManifestListCache) push(ctx context.Context, repo, reference string, man distribution.Manifest) error {
|
||||
// For manifest list, it might include some different manifest
|
||||
// it will wait and check for 30 mins, if all depend manifests are ready then push it
|
||||
// if time exceed, then push a updated manifest list which contains existing manifest
|
||||
var newMan distribution.Manifest
|
||||
var err error
|
||||
for n := 0; n < maxManifestListWait; n++ {
|
||||
log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, reference)
|
||||
time.Sleep(sleepIntervalSec * time.Second)
|
||||
newMan, err = m.updateManifestList(ctx, repo, man)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(newMan.References()) == len(man.References()) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(newMan.References()) == 0 {
|
||||
return errors.New("manifest list doesn't contain any pushed manifest")
|
||||
}
|
||||
_, pl, err := newMan.Payload()
|
||||
if err != nil {
|
||||
log.Errorf("failed to get payload, error %v", err)
|
||||
return err
|
||||
}
|
||||
log.Debugf("The manifest list payload: %v", string(pl))
|
||||
newDig := digest.FromBytes(pl)
|
||||
m.cacheTrimmedDigest(ctx, string(newDig))
|
||||
// Because the manifest list maybe updated, need to recheck if it is exist in local
|
||||
art := lib.ArtifactInfo{Repository: repo, Tag: reference}
|
||||
a, err := m.local.GetManifest(ctx, art)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if a != nil && a.Digest == string(newDig) {
|
||||
return nil
|
||||
}
|
||||
// when pushing with digest, should push to its actual digest
|
||||
if strings.HasPrefix(reference, "sha256:") {
|
||||
reference = string(newDig)
|
||||
}
|
||||
return m.local.PushManifest(repo, reference, newMan)
|
||||
}
|
||||
|
||||
// ManifestCache default Manifest handler
|
||||
type ManifestCache struct {
|
||||
local localInterface
|
||||
}
|
||||
|
||||
// CacheContent ...
|
||||
func (m *ManifestCache) CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface) {
|
||||
var waitBlobs []distribution.Descriptor
|
||||
for n := 0; n < maxManifestWait; n++ {
|
||||
time.Sleep(sleepIntervalSec * time.Second)
|
||||
waitBlobs = m.local.CheckDependencies(ctx, art.Repository, man)
|
||||
if len(waitBlobs) == 0 {
|
||||
break
|
||||
}
|
||||
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
|
||||
}
|
||||
if len(waitBlobs) > 0 {
|
||||
// docker client will skip to pull layers exist in local
|
||||
// these blobs are not exist in the proxy server
|
||||
// it will cause the manifest dependency check always fail
|
||||
// need to push these blobs before push manifest to avoid failure
|
||||
log.Debug("Waiting blobs not empty, push it to local repo directly")
|
||||
for _, desc := range waitBlobs {
|
||||
err := m.putBlobToLocal(remoteRepo, art.Repository, desc, r)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to push blob to local repo, error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
err := m.local.PushManifest(art.Repository, getReference(art), man)
|
||||
if err != nil {
|
||||
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ManifestCache) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r RemoteInterface) error {
|
||||
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
|
||||
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
|
||||
if err != nil {
|
||||
log.Errorf("failed to create blob reader, error %v", err)
|
||||
return err
|
||||
}
|
||||
defer bReader.Close()
|
||||
err = m.local.PushBlob(localRepo, desc, bReader)
|
||||
return err
|
||||
}
|
154
src/controller/proxy/manifestcache_test.go
Normal file
154
src/controller/proxy/manifestcache_test.go
Normal file
@ -0,0 +1,154 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest"
|
||||
"github.com/docker/distribution/manifest/manifestlist"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
"github.com/goharbor/harbor/src/controller/artifact"
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
"github.com/goharbor/harbor/src/testing/mock"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type CacheTestSuite struct {
|
||||
suite.Suite
|
||||
mHandler *ManifestListCache
|
||||
local localInterfaceMock
|
||||
}
|
||||
|
||||
func (suite *CacheTestSuite) SetupSuite() {
|
||||
suite.local = localInterfaceMock{}
|
||||
suite.mHandler = &ManifestListCache{local: &suite.local}
|
||||
}
|
||||
|
||||
func (suite *CacheTestSuite) TearDownSuite() {
|
||||
}
|
||||
func (suite *CacheTestSuite) TestUpdateManifestList() {
|
||||
ctx := context.Background()
|
||||
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||
manifestList := manifestlist.ManifestList{
|
||||
Versioned: manifest.Versioned{
|
||||
SchemaVersion: 2,
|
||||
MediaType: manifestlist.MediaTypeManifestList,
|
||||
},
|
||||
Manifests: []manifestlist.ManifestDescriptor{
|
||||
{
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(amdDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "amd64",
|
||||
OS: "linux",
|
||||
},
|
||||
}, {
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(armDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "arm",
|
||||
OS: "linux",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
manList := &manifestlist.DeserializedManifestList{
|
||||
ManifestList: manifestList,
|
||||
}
|
||||
artInfo1 := lib.ArtifactInfo{
|
||||
Repository: "library/hello-world",
|
||||
Digest: amdDig,
|
||||
Tag: "",
|
||||
}
|
||||
ar := &artifact.Artifact{}
|
||||
suite.local.On("GetManifest", ctx, artInfo1).Return(ar, nil)
|
||||
suite.local.On("GetManifest", ctx, mock.Anything).Return(nil, nil)
|
||||
|
||||
newMan, err := suite.mHandler.updateManifestList(ctx, "library/hello-world", manList)
|
||||
suite.Require().Nil(err)
|
||||
suite.Assert().Equal(len(newMan.References()), 1)
|
||||
}
|
||||
|
||||
func (suite *CacheTestSuite) TestPushManifestList() {
|
||||
ctx := context.Background()
|
||||
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||
manifestList := manifestlist.ManifestList{
|
||||
Versioned: manifest.Versioned{
|
||||
SchemaVersion: 2,
|
||||
MediaType: manifestlist.MediaTypeManifestList,
|
||||
},
|
||||
Manifests: []manifestlist.ManifestDescriptor{
|
||||
{
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(amdDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "amd64",
|
||||
OS: "linux",
|
||||
},
|
||||
}, {
|
||||
Descriptor: distribution.Descriptor{
|
||||
Digest: digest.Digest(armDig),
|
||||
Size: 3253,
|
||||
MediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
Platform: manifestlist.PlatformSpec{
|
||||
Architecture: "arm",
|
||||
OS: "linux",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
manList := &manifestlist.DeserializedManifestList{
|
||||
ManifestList: manifestList,
|
||||
}
|
||||
repo := "library/hello-world"
|
||||
artInfo1 := lib.ArtifactInfo{
|
||||
Repository: repo,
|
||||
Digest: amdDig,
|
||||
Tag: "",
|
||||
}
|
||||
ar := &artifact.Artifact{}
|
||||
_, payload, err := manList.Payload()
|
||||
suite.Nil(err)
|
||||
originDigest := digest.FromBytes(payload)
|
||||
|
||||
suite.local.On("GetManifest", ctx, artInfo1).Return(ar, nil)
|
||||
suite.local.On("GetManifest", ctx, mock.Anything).Return(nil, nil)
|
||||
|
||||
suite.local.On("PushManifest", repo, originDigest, mock.Anything).Return(fmt.Errorf("wrong digest"))
|
||||
suite.local.On("PushManifest", repo, mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
err = suite.mHandler.push(ctx, "library/hello-world", string(originDigest), manList)
|
||||
suite.Require().Nil(err)
|
||||
}
|
||||
|
||||
func TestCacheTestSuite(t *testing.T) {
|
||||
suite.Run(t, &CacheTestSuite{})
|
||||
}
|
@ -30,7 +30,7 @@ type RemoteInterface interface {
|
||||
// Manifest get manifest by reference
|
||||
Manifest(repo string, ref string) (distribution.Manifest, string, error)
|
||||
// ManifestExist checks manifest exist, if exist, return digest
|
||||
ManifestExist(repo string, ref string) (bool, string, error)
|
||||
ManifestExist(repo string, ref string) (bool, *distribution.Descriptor, error)
|
||||
}
|
||||
|
||||
// remoteHelper defines operations related to remote repository under proxy
|
||||
@ -86,6 +86,6 @@ func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest,
|
||||
return r.registry.PullManifest(repo, ref)
|
||||
}
|
||||
|
||||
func (r *remoteHelper) ManifestExist(repo string, ref string) (bool, string, error) {
|
||||
func (r *remoteHelper) ManifestExist(repo string, ref string) (bool, *distribution.Descriptor, error) {
|
||||
return r.registry.ManifestExist(repo, ref)
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ func (_m *mockAdapter) Info() (*model.RegistryInfo, error) {
|
||||
}
|
||||
|
||||
// ManifestExist provides a mock function with given fields: repository, reference
|
||||
func (_m *mockAdapter) ManifestExist(repository string, reference string) (bool, string, error) {
|
||||
func (_m *mockAdapter) ManifestExist(repository string, reference string) (bool, *distribution.Descriptor, error) {
|
||||
ret := _m.Called(repository, reference)
|
||||
|
||||
var r0 bool
|
||||
@ -144,11 +144,13 @@ func (_m *mockAdapter) ManifestExist(repository string, reference string) (bool,
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
var r1 string
|
||||
if rf, ok := ret.Get(1).(func(string, string) string); ok {
|
||||
var r1 *distribution.Descriptor
|
||||
if rf, ok := ret.Get(1).(func(string, string) *distribution.Descriptor); ok {
|
||||
r1 = rf(repository, reference)
|
||||
} else {
|
||||
r1 = ret.Get(1).(string)
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(*distribution.Descriptor)
|
||||
}
|
||||
}
|
||||
|
||||
var r2 error
|
||||
|
@ -72,7 +72,7 @@ type Client interface {
|
||||
// ListTags lists the tags under the specified repository
|
||||
ListTags(repository string) (tags []string, err error)
|
||||
// ManifestExist checks the existence of the manifest
|
||||
ManifestExist(repository, reference string) (exist bool, digest string, err error)
|
||||
ManifestExist(repository, reference string) (exist bool, desc *distribution.Descriptor, err error)
|
||||
// PullManifest pulls the specified manifest
|
||||
PullManifest(repository, reference string, acceptedMediaTypes ...string) (manifest distribution.Manifest, digest string, err error)
|
||||
// PushManifest pushes the specified manifest
|
||||
@ -242,10 +242,10 @@ func (c *client) listTags(url string) ([]string, string, error) {
|
||||
return tgs.Tags, next(resp.Header.Get("Link")), nil
|
||||
}
|
||||
|
||||
func (c *client) ManifestExist(repository, reference string) (bool, string, error) {
|
||||
func (c *client) ManifestExist(repository, reference string) (bool, *distribution.Descriptor, error) {
|
||||
req, err := http.NewRequest(http.MethodHead, buildManifestURL(c.url, repository, reference), nil)
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
return false, nil, err
|
||||
}
|
||||
for _, mediaType := range accepts {
|
||||
req.Header.Add(http.CanonicalHeaderKey("Accept"), mediaType)
|
||||
@ -253,12 +253,16 @@ func (c *client) ManifestExist(repository, reference string) (bool, string, erro
|
||||
resp, err := c.do(req)
|
||||
if err != nil {
|
||||
if errors.IsErr(err, errors.NotFoundCode) {
|
||||
return false, "", nil
|
||||
return false, nil, nil
|
||||
}
|
||||
return false, "", err
|
||||
return false, nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
return true, resp.Header.Get(http.CanonicalHeaderKey("Docker-Content-Digest")), nil
|
||||
dig := resp.Header.Get(http.CanonicalHeaderKey("Docker-Content-Digest"))
|
||||
contentType := resp.Header.Get(http.CanonicalHeaderKey("Content-Type"))
|
||||
contentLen := resp.Header.Get(http.CanonicalHeaderKey("Content-Length"))
|
||||
len, _ := strconv.Atoi(contentLen)
|
||||
return true, &distribution.Descriptor{Digest: digest.Digest(dig), MediaType: contentType, Size: int64(len)}, nil
|
||||
}
|
||||
|
||||
func (c *client) PullManifest(repository, reference string, acceptedMediaTypes ...string) (
|
||||
@ -310,7 +314,7 @@ func (c *client) DeleteManifest(repository, reference string) error {
|
||||
_, err := digest.Parse(reference)
|
||||
if err != nil {
|
||||
// the reference is tag, get the digest first
|
||||
exist, digest, err := c.ManifestExist(repository, reference)
|
||||
exist, desc, err := c.ManifestExist(repository, reference)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -318,7 +322,7 @@ func (c *client) DeleteManifest(repository, reference string) error {
|
||||
return errors.New(nil).WithCode(errors.NotFoundCode).
|
||||
WithMessage("%s:%s not found", repository, reference)
|
||||
}
|
||||
reference = digest
|
||||
reference = string(desc.Digest)
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodDelete, buildManifestURL(c.url, repository, reference), nil)
|
||||
if err != nil {
|
||||
@ -450,13 +454,13 @@ func (c *client) Copy(srcRepo, srcRef, dstRepo, dstRef string, override bool) er
|
||||
}
|
||||
|
||||
// check the existence of the artifact on the destination repository
|
||||
exist, dstDgt, err := c.ManifestExist(dstRepo, dstRef)
|
||||
exist, desc, err := c.ManifestExist(dstRepo, dstRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exist {
|
||||
// the same artifact already exists
|
||||
if srcDgt == dstDgt {
|
||||
if desc != nil && srcDgt == string(desc.Digest) {
|
||||
return nil
|
||||
}
|
||||
// the same name artifact exists, but not allowed to override
|
||||
|
@ -171,15 +171,15 @@ func (c *clientTestSuite) TestManifestExist() {
|
||||
|
||||
client := NewClient(server.URL, "", "", true)
|
||||
// doesn't exist
|
||||
exist, digest, err := client.ManifestExist("library/alpine", "latest")
|
||||
exist, desc, err := client.ManifestExist("library/alpine", "latest")
|
||||
c.Require().Nil(err)
|
||||
c.False(exist)
|
||||
|
||||
// exist
|
||||
exist, digest, err = client.ManifestExist("library/hello-world", "latest")
|
||||
exist, desc, err = client.ManifestExist("library/hello-world", "latest")
|
||||
c.Require().Nil(err)
|
||||
c.True(exist)
|
||||
c.Equal("digest", digest)
|
||||
c.Equal("digest", string(desc.Digest))
|
||||
}
|
||||
|
||||
func (c *clientTestSuite) TestPullManifest() {
|
||||
|
@ -54,7 +54,7 @@ type Adapter interface {
|
||||
// ArtifactRegistry defines the capabilities that an artifact registry should have
|
||||
type ArtifactRegistry interface {
|
||||
FetchArtifacts(filters []*model.Filter) ([]*model.Resource, error)
|
||||
ManifestExist(repository, reference string) (exist bool, digest string, err error)
|
||||
ManifestExist(repository, reference string) (exist bool, desc *distribution.Descriptor, err error)
|
||||
PullManifest(repository, reference string, accepttedMediaTypes ...string) (manifest distribution.Manifest, digest string, err error)
|
||||
PushManifest(repository, reference, mediaType string, payload []byte) (string, error)
|
||||
DeleteManifest(repository, reference string) error // the "reference" can be "tag" or "digest", the function needs to handle both
|
||||
|
@ -3,11 +3,12 @@ package huawei
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/docker/distribution"
|
||||
"github.com/goharbor/harbor/src/replication/model"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/replication/model"
|
||||
)
|
||||
|
||||
// FetchArtifacts gets resources from Huawei SWR
|
||||
@ -54,17 +55,17 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) ([]*model.Resource, er
|
||||
}
|
||||
|
||||
// ManifestExist check the manifest of Huawei SWR
|
||||
func (a *adapter) ManifestExist(repository, reference string) (exist bool, digest string, err error) {
|
||||
func (a *adapter) ManifestExist(repository, reference string) (exist bool, desc *distribution.Descriptor, err error) {
|
||||
token, err := getJwtToken(a, repository)
|
||||
if err != nil {
|
||||
return exist, digest, err
|
||||
return exist, nil, err
|
||||
}
|
||||
|
||||
urls := fmt.Sprintf("%s/v2/%s/manifests/%s", a.registry.URL, repository, reference)
|
||||
|
||||
r, err := http.NewRequest("GET", urls, nil)
|
||||
if err != nil {
|
||||
return exist, digest, err
|
||||
return exist, nil, err
|
||||
}
|
||||
|
||||
r.Header.Add("content-type", "application/json; charset=utf-8")
|
||||
@ -72,29 +73,33 @@ func (a *adapter) ManifestExist(repository, reference string) (exist bool, diges
|
||||
|
||||
resp, err := a.oriClient.Do(r)
|
||||
if err != nil {
|
||||
return exist, digest, err
|
||||
return exist, nil, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
code := resp.StatusCode
|
||||
if code >= 300 || code < 200 {
|
||||
if code == 404 {
|
||||
return false, digest, nil
|
||||
return false, nil, nil
|
||||
}
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
return exist, digest, fmt.Errorf("[%d][%s]", code, string(body))
|
||||
return exist, nil, fmt.Errorf("[%d][%s]", code, string(body))
|
||||
}
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return exist, digest, err
|
||||
return exist, nil, err
|
||||
}
|
||||
exist = true
|
||||
manifest := hwManifest{}
|
||||
err = json.Unmarshal(body, &manifest)
|
||||
if err != nil {
|
||||
return exist, digest, err
|
||||
return exist, nil, err
|
||||
}
|
||||
return exist, manifest.Config.Digest, nil
|
||||
contentType := resp.Header.Get(http.CanonicalHeaderKey("Content-Type"))
|
||||
contentLen := resp.Header.Get(http.CanonicalHeaderKey("Content-Length"))
|
||||
len, _ := strconv.Atoi(contentLen)
|
||||
|
||||
return exist, &distribution.Descriptor{MediaType: contentType, Size: int64(len)}, nil
|
||||
}
|
||||
|
||||
// DeleteManifest delete the manifest of Huawei SWR
|
||||
|
@ -44,7 +44,7 @@ func TestAdapter_FetchArtifacts(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAdapter_ManifestExist(t *testing.T) {
|
||||
exist, digest, err := HWAdapter.ManifestExist("", "")
|
||||
exist, desc, err := HWAdapter.ManifestExist("", "")
|
||||
if err != nil {
|
||||
if strings.HasPrefix(err.Error(), "[401]") {
|
||||
t.Log("huawei ak/sk is not available", err.Error())
|
||||
@ -53,7 +53,7 @@ func TestAdapter_ManifestExist(t *testing.T) {
|
||||
}
|
||||
} else {
|
||||
if exist {
|
||||
t.Log(digest)
|
||||
t.Log(desc.Digest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -328,13 +328,17 @@ func (t *transfer) pullManifest(repository, reference string) (
|
||||
}
|
||||
|
||||
func (t *transfer) exist(repository, tag string) (bool, string, error) {
|
||||
exist, digest, err := t.dst.ManifestExist(repository, tag)
|
||||
exist, desc, err := t.dst.ManifestExist(repository, tag)
|
||||
if err != nil {
|
||||
t.logger.Errorf("failed to check the existence of the manifest of artifact %s:%s on the destination registry: %v",
|
||||
repository, tag, err)
|
||||
return false, "", err
|
||||
}
|
||||
return exist, digest, nil
|
||||
var dig string
|
||||
if desc != nil {
|
||||
dig = string(desc.Digest)
|
||||
}
|
||||
return exist, dig, nil
|
||||
}
|
||||
|
||||
func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag string) error {
|
||||
|
@ -16,6 +16,7 @@ package image
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
@ -35,11 +36,11 @@ func (f *fakeRegistry) FetchArtifacts([]*model.Filter) ([]*model.Resource, error
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, string, error) {
|
||||
func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, *distribution.Descriptor, error) {
|
||||
if repository == "destination" && reference == "b1" {
|
||||
return true, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
|
||||
return true, &distribution.Descriptor{Digest: digest.Digest("sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7")}, nil
|
||||
}
|
||||
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
|
||||
return false, &distribution.Descriptor{Digest: digest.Digest("sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7")}, nil
|
||||
}
|
||||
func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMediaTypes ...string) (distribution.Manifest, string, error) {
|
||||
manifest := `{
|
||||
|
@ -17,9 +17,6 @@ package repoproxy
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/security"
|
||||
"github.com/goharbor/harbor/src/common/security/proxycachesecret"
|
||||
@ -29,9 +26,13 @@ import (
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
httpLib "github.com/goharbor/harbor/src/lib/http"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/replication/model"
|
||||
"github.com/goharbor/harbor/src/replication/registry"
|
||||
"github.com/goharbor/harbor/src/server/middleware"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
var registryMgr = registry.NewDefaultManager()
|
||||
@ -41,6 +42,8 @@ const (
|
||||
contentType = "Content-Type"
|
||||
dockerContentDigest = "Docker-Content-Digest"
|
||||
etag = "Etag"
|
||||
ensureTagInterval = 10 * time.Second
|
||||
ensureTagMaxRetry = 60
|
||||
)
|
||||
|
||||
// BlobGetMiddleware handle get blob request
|
||||
@ -227,14 +230,32 @@ func DisableBlobAndManifestUploadMiddleware() func(http.Handler) http.Handler {
|
||||
}
|
||||
|
||||
func proxyManifestHead(ctx context.Context, w http.ResponseWriter, ctl proxy.Controller, p *models.Project, art lib.ArtifactInfo, remote proxy.RemoteInterface) error {
|
||||
exist, dig, err := ctl.HeadManifest(ctx, art, remote)
|
||||
exist, desc, err := ctl.HeadManifest(ctx, art, remote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exist {
|
||||
if !exist || desc == nil {
|
||||
return errors.NotFoundError(fmt.Errorf("The tag %v:%v is not found", art.Repository, art.Tag))
|
||||
}
|
||||
w.Header().Set(dockerContentDigest, dig)
|
||||
w.Header().Set(etag, dig)
|
||||
go func(art lib.ArtifactInfo) {
|
||||
// After docker 20.10 or containerd, the client heads the tag first,
|
||||
// Then GET the image by digest, in order to associate the tag with the digest
|
||||
// Ensure tag after head request, make sure tags in proxy cache keep update
|
||||
bCtx := orm.Context()
|
||||
for i := 0; i < ensureTagMaxRetry; i++ {
|
||||
time.Sleep(ensureTagInterval)
|
||||
bArt := lib.ArtifactInfo{ProjectName: art.ProjectName, Repository: art.Repository, Digest: string(desc.Digest)}
|
||||
err := ctl.EnsureTag(bCtx, bArt, art.Tag)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
log.Debugf("Failed to ensure tag %+v , error %v", art, err)
|
||||
}
|
||||
}(art)
|
||||
|
||||
w.Header().Set(contentType, desc.MediaType)
|
||||
w.Header().Set(contentLength, fmt.Sprintf("%v", desc.Size))
|
||||
w.Header().Set(dockerContentDigest, string(desc.Digest))
|
||||
w.Header().Set(etag, string(desc.Digest))
|
||||
return nil
|
||||
}
|
||||
|
@ -23,3 +23,4 @@ package controller
|
||||
//go:generate mockery --case snake --dir ../../controller/scanner --name Controller --output ./scanner --outpkg scanner
|
||||
//go:generate mockery --case snake --dir ../../controller/replication --name Controller --output ./replication --outpkg replication
|
||||
//go:generate mockery --case snake --dir ../../controller/robot --name Controller --output ./robot --outpkg robot
|
||||
//go:generate mockery --case snake --dir ../../controller/proxy --name RemoteInterface --output ./proxy --outpkg proxy
|
||||
|
106
src/testing/controller/proxy/remote_interface.go
Normal file
106
src/testing/controller/proxy/remote_interface.go
Normal file
@ -0,0 +1,106 @@
|
||||
// Code generated by mockery v2.1.0. DO NOT EDIT.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
io "io"
|
||||
|
||||
distribution "github.com/docker/distribution"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// RemoteInterface is an autogenerated mock type for the RemoteInterface type
|
||||
type RemoteInterface struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// BlobReader provides a mock function with given fields: repo, dig
|
||||
func (_m *RemoteInterface) BlobReader(repo string, dig string) (int64, io.ReadCloser, error) {
|
||||
ret := _m.Called(repo, dig)
|
||||
|
||||
var r0 int64
|
||||
if rf, ok := ret.Get(0).(func(string, string) int64); ok {
|
||||
r0 = rf(repo, dig)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
var r1 io.ReadCloser
|
||||
if rf, ok := ret.Get(1).(func(string, string) io.ReadCloser); ok {
|
||||
r1 = rf(repo, dig)
|
||||
} else {
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(io.ReadCloser)
|
||||
}
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(string, string) error); ok {
|
||||
r2 = rf(repo, dig)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// Manifest provides a mock function with given fields: repo, ref
|
||||
func (_m *RemoteInterface) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
|
||||
ret := _m.Called(repo, ref)
|
||||
|
||||
var r0 distribution.Manifest
|
||||
if rf, ok := ret.Get(0).(func(string, string) distribution.Manifest); ok {
|
||||
r0 = rf(repo, ref)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(distribution.Manifest)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 string
|
||||
if rf, ok := ret.Get(1).(func(string, string) string); ok {
|
||||
r1 = rf(repo, ref)
|
||||
} else {
|
||||
r1 = ret.Get(1).(string)
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(string, string) error); ok {
|
||||
r2 = rf(repo, ref)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// ManifestExist provides a mock function with given fields: repo, ref
|
||||
func (_m *RemoteInterface) ManifestExist(repo string, ref string) (bool, *distribution.Descriptor, error) {
|
||||
ret := _m.Called(repo, ref)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(string, string) bool); ok {
|
||||
r0 = rf(repo, ref)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
var r1 *distribution.Descriptor
|
||||
if rf, ok := ret.Get(1).(func(string, string) *distribution.Descriptor); ok {
|
||||
r1 = rf(repo, ref)
|
||||
} else {
|
||||
if ret.Get(1) != nil {
|
||||
r1 = ret.Get(1).(*distribution.Descriptor)
|
||||
}
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(string, string) error); ok {
|
||||
r2 = rf(repo, ref)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
@ -53,9 +53,13 @@ func (f *FakeClient) ListTags(repository string) ([]string, error) {
|
||||
}
|
||||
|
||||
// ManifestExist ...
|
||||
func (f *FakeClient) ManifestExist(repository, reference string) (bool, string, error) {
|
||||
func (f *FakeClient) ManifestExist(repository, reference string) (bool, *distribution.Descriptor, error) {
|
||||
args := f.Called()
|
||||
return args.Bool(0), args.String(1), args.Error(2)
|
||||
var desc *distribution.Descriptor
|
||||
if args[0] != nil {
|
||||
desc = args[0].(*distribution.Descriptor)
|
||||
}
|
||||
return args.Bool(0), desc, args.Error(2)
|
||||
}
|
||||
|
||||
// PullManifest ...
|
||||
|
Loading…
Reference in New Issue
Block a user