mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-29 12:07:56 +01:00
Refactor Manifest cache process
Separate manifest, manifest list and image index. Signed-off-by: stonezdj <stonezdj@gmail.com>
This commit is contained in:
parent
aa3002e7a5
commit
1d50be31aa
@ -35,7 +35,6 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/lib/orm"
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -71,10 +70,11 @@ type Controller interface {
|
|||||||
EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error
|
EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error
|
||||||
}
|
}
|
||||||
type controller struct {
|
type controller struct {
|
||||||
blobCtl blob.Controller
|
blobCtl blob.Controller
|
||||||
artifactCtl artifact.Controller
|
artifactCtl artifact.Controller
|
||||||
local localInterface
|
local localInterface
|
||||||
cache cache.Cache
|
cache cache.Cache
|
||||||
|
handlerRegistry map[string]ManifestCacheHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
// ControllerInstance -- Get the proxy controller instance
|
// ControllerInstance -- Get the proxy controller instance
|
||||||
@ -82,11 +82,13 @@ func ControllerInstance() Controller {
|
|||||||
// Lazy load the controller
|
// Lazy load the controller
|
||||||
// Because LocalHelper is not ready unless core startup completely
|
// Because LocalHelper is not ready unless core startup completely
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
|
l := newLocalHelper()
|
||||||
ctl = &controller{
|
ctl = &controller{
|
||||||
blobCtl: blob.Ctl,
|
blobCtl: blob.Ctl,
|
||||||
artifactCtl: artifact.Ctl,
|
artifactCtl: artifact.Ctl,
|
||||||
local: newLocalHelper(),
|
local: newLocalHelper(),
|
||||||
cache: cache.Default(),
|
cache: cache.Default(),
|
||||||
|
handlerRegistry: NewCacheHandlerRegistry(l),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -97,9 +99,15 @@ func (c *controller) EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagNam
|
|||||||
// search the digest in cache and query with trimmed digest
|
// search the digest in cache and query with trimmed digest
|
||||||
var trimmedDigest string
|
var trimmedDigest string
|
||||||
err := c.cache.Fetch(TrimmedManifestlist+art.Digest, &trimmedDigest)
|
err := c.cache.Fetch(TrimmedManifestlist+art.Digest, &trimmedDigest)
|
||||||
if err != cache.ErrNotFound {
|
if err == cache.ErrNotFound {
|
||||||
log.Debugf("Found trimed digest: %v", trimmedDigest)
|
// skip to update digest, continue
|
||||||
|
} else if err != nil {
|
||||||
|
// for other error, return
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
// found in redis, update the digest
|
||||||
art.Digest = trimmedDigest
|
art.Digest = trimmedDigest
|
||||||
|
log.Debugf("Found trimmed digest: %v", trimmedDigest)
|
||||||
}
|
}
|
||||||
a, err := c.local.GetManifest(ctx, art)
|
a, err := c.local.GetManifest(ctx, art)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -260,53 +268,14 @@ func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc di
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r RemoteInterface) {
|
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r RemoteInterface) {
|
||||||
if contType == manifestlist.MediaTypeManifestList {
|
h, ok := c.handlerRegistry[contType]
|
||||||
_, payload, err := man.Payload()
|
if !ok {
|
||||||
if err != nil {
|
h, ok = c.handlerRegistry[defaultHandler]
|
||||||
log.Errorf("failed to get payload, error %v", err)
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
key := getManifestListKey(art.Repository, art.Digest)
|
|
||||||
log.Debugf("Cache manifest list with key=cache:%v", key)
|
|
||||||
err = c.cache.Save(key, payload, manifestListCacheInterval)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to cache payload, error %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex {
|
|
||||||
err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error when push manifest list to local :%v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var waitBlobs []distribution.Descriptor
|
|
||||||
for n := 0; n < maxManifestWait; n++ {
|
|
||||||
time.Sleep(sleepIntervalSec * time.Second)
|
|
||||||
waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man)
|
|
||||||
if len(waitBlobs) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
|
|
||||||
}
|
|
||||||
if len(waitBlobs) > 0 {
|
|
||||||
// docker client will skip to pull layers exist in local
|
|
||||||
// these blobs are not exist in the proxy server
|
|
||||||
// it will cause the manifest dependency check always fail
|
|
||||||
// need to push these blobs before push manifest to avoid failure
|
|
||||||
log.Debug("Waiting blobs not empty, push it to local repo directly")
|
|
||||||
for _, desc := range waitBlobs {
|
|
||||||
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Failed to push blob to local repo, error: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err := c.local.PushManifest(art.Repository, getReference(art), man)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
|
|
||||||
}
|
}
|
||||||
|
h.CacheContent(ctx, remoteRepo, man, art, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRemoteRepo get the remote repository name, used in proxy cache
|
// getRemoteRepo get the remote repository name, used in proxy cache
|
||||||
|
@ -63,7 +63,8 @@ func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descri
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
|
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
|
||||||
panic("implement me")
|
args := l.Called(repo, tag, manifest)
|
||||||
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
||||||
|
@ -16,24 +16,18 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"github.com/goharbor/harbor/src/lib/cache"
|
|
||||||
"io"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/manifest/manifestlist"
|
|
||||||
"github.com/goharbor/harbor/src/controller/artifact"
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
"github.com/goharbor/harbor/src/controller/event/metadata"
|
"github.com/goharbor/harbor/src/controller/event/metadata"
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/lib"
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/lib/cache"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/pkg/notifier/event"
|
"github.com/goharbor/harbor/src/pkg/notifier/event"
|
||||||
"github.com/goharbor/harbor/src/pkg/proxy/secret"
|
"github.com/goharbor/harbor/src/pkg/proxy/secret"
|
||||||
"github.com/goharbor/harbor/src/pkg/registry"
|
"github.com/goharbor/harbor/src/pkg/registry"
|
||||||
"github.com/opencontainers/go-digest"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TrimmedManifestlist - key prefix for trimmed manifest
|
// TrimmedManifestlist - key prefix for trimmed manifest
|
||||||
@ -49,8 +43,6 @@ type localInterface interface {
|
|||||||
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
|
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
|
||||||
// PushManifest push manifest to local repo, ref can be digest or tag
|
// PushManifest push manifest to local repo, ref can be digest or tag
|
||||||
PushManifest(repo string, ref string, manifest distribution.Manifest) error
|
PushManifest(repo string, ref string, manifest distribution.Manifest) error
|
||||||
// PushManifestList push manifest list to local repo
|
|
||||||
PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error
|
|
||||||
// CheckDependencies check if the manifest's dependency is ready
|
// CheckDependencies check if the manifest's dependency is ready
|
||||||
CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
|
CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
|
||||||
// DeleteManifest cleanup delete tag from local cache
|
// DeleteManifest cleanup delete tag from local cache
|
||||||
@ -141,80 +133,6 @@ func (l *localHelper) DeleteManifest(repo, ref string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateManifestList -- Trim the manifest list, make sure at least one depend manifests is ready
|
|
||||||
func (l *localHelper) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
|
|
||||||
switch v := manifest.(type) {
|
|
||||||
case *manifestlist.DeserializedManifestList:
|
|
||||||
existMans := make([]manifestlist.ManifestDescriptor, 0)
|
|
||||||
for _, m := range v.Manifests {
|
|
||||||
art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
|
|
||||||
a, err := l.GetManifest(ctx, art)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if a != nil {
|
|
||||||
existMans = append(existMans, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return manifestlist.FromDescriptors(existMans)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *localHelper) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
|
||||||
// For manifest list, it might include some different manifest
|
|
||||||
// it will wait and check for 30 mins, if all depend manifests exist then push it
|
|
||||||
// if time exceed, only push the new updated manifest list which contains existing manifest
|
|
||||||
var newMan distribution.Manifest
|
|
||||||
var err error
|
|
||||||
for n := 0; n < maxManifestListWait; n++ {
|
|
||||||
log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, tag)
|
|
||||||
time.Sleep(sleepIntervalSec * time.Second)
|
|
||||||
newMan, err = l.updateManifestList(ctx, repo, man)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if len(newMan.References()) == len(man.References()) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(newMan.References()) == 0 {
|
|
||||||
return errors.New("manifest list doesn't contain any pushed manifest")
|
|
||||||
}
|
|
||||||
_, pl, err := newMan.Payload()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to get payload, error %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Debugf("The manifest list payload: %v", string(pl))
|
|
||||||
newDig := digest.FromBytes(pl)
|
|
||||||
l.cacheTrimmedDigest(ctx, string(newDig))
|
|
||||||
// Because the manifest list maybe updated, need to recheck if it is exist in local
|
|
||||||
art := lib.ArtifactInfo{Repository: repo, Tag: tag}
|
|
||||||
a, err := l.GetManifest(ctx, art)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if a != nil && a.Digest == string(newDig) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// because current digest is changed, need to push to the new digest
|
|
||||||
if strings.HasPrefix(tag, "sha256:") {
|
|
||||||
tag = string(newDig)
|
|
||||||
}
|
|
||||||
return l.PushManifest(repo, tag, newMan)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *localHelper) cacheTrimmedDigest(ctx context.Context, newDig string) {
|
|
||||||
if l.cache == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
art := lib.GetArtifactInfo(ctx)
|
|
||||||
key := TrimmedManifestlist + string(art.Digest)
|
|
||||||
log.Debugf("Saved key:%v, value:%v", key, newDig)
|
|
||||||
l.cache.Save(key, newDig)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
|
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
|
||||||
descriptors := man.References()
|
descriptors := man.References()
|
||||||
waitDesc := make([]distribution.Descriptor, 0)
|
waitDesc := make([]distribution.Descriptor, 0)
|
||||||
|
@ -17,13 +17,9 @@ package proxy
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
distribution2 "github.com/docker/distribution"
|
distribution2 "github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/manifest"
|
|
||||||
"github.com/docker/distribution/manifest/manifestlist"
|
|
||||||
"github.com/docker/distribution/manifest/schema2"
|
"github.com/docker/distribution/manifest/schema2"
|
||||||
"github.com/goharbor/harbor/src/controller/artifact"
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
"github.com/goharbor/harbor/src/lib"
|
"github.com/goharbor/harbor/src/lib"
|
||||||
"github.com/goharbor/harbor/src/pkg/distribution"
|
|
||||||
"github.com/opencontainers/go-digest"
|
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -112,52 +108,6 @@ func (lh *localHelperTestSuite) TestPushManifest() {
|
|||||||
lh.Require().Nil(err)
|
lh.Require().Nil(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lh *localHelperTestSuite) TestUpdateManifestList() {
|
|
||||||
ctx := context.Background()
|
|
||||||
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
|
||||||
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
|
||||||
manifestList := manifestlist.ManifestList{
|
|
||||||
Versioned: manifest.Versioned{
|
|
||||||
SchemaVersion: 2,
|
|
||||||
MediaType: manifestlist.MediaTypeManifestList,
|
|
||||||
},
|
|
||||||
Manifests: []manifestlist.ManifestDescriptor{
|
|
||||||
{
|
|
||||||
Descriptor: distribution.Descriptor{
|
|
||||||
Digest: digest.Digest(amdDig),
|
|
||||||
Size: 3253,
|
|
||||||
MediaType: schema2.MediaTypeManifest,
|
|
||||||
},
|
|
||||||
Platform: manifestlist.PlatformSpec{
|
|
||||||
Architecture: "amd64",
|
|
||||||
OS: "linux",
|
|
||||||
},
|
|
||||||
}, {
|
|
||||||
Descriptor: distribution.Descriptor{
|
|
||||||
Digest: digest.Digest(armDig),
|
|
||||||
Size: 3253,
|
|
||||||
MediaType: schema2.MediaTypeManifest,
|
|
||||||
},
|
|
||||||
Platform: manifestlist.PlatformSpec{
|
|
||||||
Architecture: "arm",
|
|
||||||
OS: "linux",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
manList := &manifestlist.DeserializedManifestList{
|
|
||||||
ManifestList: manifestList,
|
|
||||||
}
|
|
||||||
ar := &artifact.Artifact{}
|
|
||||||
var emptyArtifact *artifact.Artifact
|
|
||||||
var opt *artifact.Option
|
|
||||||
lh.artCtl.On("GetByReference", ctx, "library/hello-world", amdDig, opt).Return(ar, nil)
|
|
||||||
lh.artCtl.On("GetByReference", ctx, "library/hello-world", armDig, opt).Return(emptyArtifact, nil)
|
|
||||||
newMan, err := lh.local.updateManifestList(ctx, "library/hello-world", manList)
|
|
||||||
lh.Require().Nil(err)
|
|
||||||
lh.Assert().Equal(len(newMan.References()), 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
|
func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
manifest := &mockManifest{}
|
manifest := &mockManifest{}
|
||||||
|
207
src/controller/proxy/manifestcache.go
Normal file
207
src/controller/proxy/manifestcache.go
Normal file
@ -0,0 +1,207 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/docker/distribution/manifest/manifestlist"
|
||||||
|
"github.com/docker/distribution/manifest/schema2"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
libCache "github.com/goharbor/harbor/src/lib/cache"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const defaultHandler = "default"
|
||||||
|
|
||||||
|
// NewCacheHandlerRegistry ...
|
||||||
|
func NewCacheHandlerRegistry(local localInterface) map[string]ManifestCacheHandler {
|
||||||
|
manListHandler := &ManifestListCache{
|
||||||
|
local: local,
|
||||||
|
cache: libCache.Default(),
|
||||||
|
}
|
||||||
|
manHandler := &ManifestCache{local}
|
||||||
|
registry := map[string]ManifestCacheHandler{
|
||||||
|
manifestlist.MediaTypeManifestList: manListHandler,
|
||||||
|
v1.MediaTypeImageIndex: manListHandler,
|
||||||
|
schema2.MediaTypeManifest: manHandler,
|
||||||
|
defaultHandler: manHandler,
|
||||||
|
}
|
||||||
|
return registry
|
||||||
|
}
|
||||||
|
|
||||||
|
// ManifestCacheHandler define how to cache manifest content
|
||||||
|
type ManifestCacheHandler interface {
|
||||||
|
// CacheContent - cache the content of the manifest
|
||||||
|
CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ManifestListCache handle Manifest list type and index type
|
||||||
|
type ManifestListCache struct {
|
||||||
|
cache libCache.Cache
|
||||||
|
local localInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
// CacheContent ...
|
||||||
|
func (m *ManifestListCache) CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface) {
|
||||||
|
_, payload, err := man.Payload()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get payload, error %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := getManifestListKey(art.Repository, art.Digest)
|
||||||
|
log.Debugf("cache manifest list with key=cache:%v", key)
|
||||||
|
err = m.cache.Save(key, payload, manifestListCacheInterval)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to cache payload, error %v", err)
|
||||||
|
}
|
||||||
|
err = m.push(ctx, art.Repository, getReference(art), man)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error when push manifest list to local :%v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheTrimmedDigest - cache the change Trimmed Digest for controller.EnsureTag when digest is changed
|
||||||
|
func (m *ManifestListCache) cacheTrimmedDigest(ctx context.Context, newDig string) {
|
||||||
|
if m.cache == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
art := lib.GetArtifactInfo(ctx)
|
||||||
|
key := TrimmedManifestlist + string(art.Digest)
|
||||||
|
err := m.cache.Save(key, newDig)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("failed to cache the trimmed manifest, err %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Debugf("Saved key:%v, value:%v", key, newDig)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ManifestListCache) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
|
||||||
|
switch v := manifest.(type) {
|
||||||
|
case *manifestlist.DeserializedManifestList:
|
||||||
|
existMans := make([]manifestlist.ManifestDescriptor, 0)
|
||||||
|
for _, ma := range v.Manifests {
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Digest: string(ma.Digest)}
|
||||||
|
a, err := m.local.GetManifest(ctx, art)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if a != nil {
|
||||||
|
existMans = append(existMans, ma)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return manifestlist.FromDescriptors(existMans)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ManifestListCache) push(ctx context.Context, repo, reference string, man distribution.Manifest) error {
|
||||||
|
// For manifest list, it might include some different manifest
|
||||||
|
// it will wait and check for 30 mins, if all depend manifests are ready then push it
|
||||||
|
// if time exceed, then push a updated manifest list which contains existing manifest
|
||||||
|
var newMan distribution.Manifest
|
||||||
|
var err error
|
||||||
|
for n := 0; n < maxManifestListWait; n++ {
|
||||||
|
log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, reference)
|
||||||
|
time.Sleep(sleepIntervalSec * time.Second)
|
||||||
|
newMan, err = m.updateManifestList(ctx, repo, man)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(newMan.References()) == len(man.References()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(newMan.References()) == 0 {
|
||||||
|
return errors.New("manifest list doesn't contain any pushed manifest")
|
||||||
|
}
|
||||||
|
_, pl, err := newMan.Payload()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get payload, error %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debugf("The manifest list payload: %v", string(pl))
|
||||||
|
newDig := digest.FromBytes(pl)
|
||||||
|
m.cacheTrimmedDigest(ctx, string(newDig))
|
||||||
|
// Because the manifest list maybe updated, need to recheck if it is exist in local
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Tag: reference}
|
||||||
|
a, err := m.local.GetManifest(ctx, art)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if a != nil && a.Digest == string(newDig) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// when pushing with digest, should push to its actual digest
|
||||||
|
if strings.HasPrefix(reference, "sha256:") {
|
||||||
|
reference = string(newDig)
|
||||||
|
}
|
||||||
|
return m.local.PushManifest(repo, reference, newMan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ManifestCache default Manifest handler
|
||||||
|
type ManifestCache struct {
|
||||||
|
local localInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
// CacheContent ...
|
||||||
|
func (m *ManifestCache) CacheContent(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, r RemoteInterface) {
|
||||||
|
var waitBlobs []distribution.Descriptor
|
||||||
|
for n := 0; n < maxManifestWait; n++ {
|
||||||
|
time.Sleep(sleepIntervalSec * time.Second)
|
||||||
|
waitBlobs = m.local.CheckDependencies(ctx, art.Repository, man)
|
||||||
|
if len(waitBlobs) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
|
||||||
|
}
|
||||||
|
if len(waitBlobs) > 0 {
|
||||||
|
// docker client will skip to pull layers exist in local
|
||||||
|
// these blobs are not exist in the proxy server
|
||||||
|
// it will cause the manifest dependency check always fail
|
||||||
|
// need to push these blobs before push manifest to avoid failure
|
||||||
|
log.Debug("Waiting blobs not empty, push it to local repo directly")
|
||||||
|
for _, desc := range waitBlobs {
|
||||||
|
err := m.putBlobToLocal(remoteRepo, art.Repository, desc, r)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to push blob to local repo, error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := m.local.PushManifest(art.Repository, getReference(art), man)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ManifestCache) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r RemoteInterface) error {
|
||||||
|
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
|
||||||
|
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to create blob reader, error %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer bReader.Close()
|
||||||
|
err = m.local.PushBlob(localRepo, desc, bReader)
|
||||||
|
return err
|
||||||
|
}
|
154
src/controller/proxy/manifestcache_test.go
Normal file
154
src/controller/proxy/manifestcache_test.go
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/docker/distribution/manifest"
|
||||||
|
"github.com/docker/distribution/manifest/manifestlist"
|
||||||
|
"github.com/docker/distribution/manifest/schema2"
|
||||||
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/testing/mock"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CacheTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
mHandler *ManifestListCache
|
||||||
|
local localInterfaceMock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *CacheTestSuite) SetupSuite() {
|
||||||
|
suite.local = localInterfaceMock{}
|
||||||
|
suite.mHandler = &ManifestListCache{local: &suite.local}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *CacheTestSuite) TearDownSuite() {
|
||||||
|
}
|
||||||
|
func (suite *CacheTestSuite) TestUpdateManifestList() {
|
||||||
|
ctx := context.Background()
|
||||||
|
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||||
|
manifestList := manifestlist.ManifestList{
|
||||||
|
Versioned: manifest.Versioned{
|
||||||
|
SchemaVersion: 2,
|
||||||
|
MediaType: manifestlist.MediaTypeManifestList,
|
||||||
|
},
|
||||||
|
Manifests: []manifestlist.ManifestDescriptor{
|
||||||
|
{
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(amdDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "amd64",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(armDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "arm",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manList := &manifestlist.DeserializedManifestList{
|
||||||
|
ManifestList: manifestList,
|
||||||
|
}
|
||||||
|
artInfo1 := lib.ArtifactInfo{
|
||||||
|
Repository: "library/hello-world",
|
||||||
|
Digest: amdDig,
|
||||||
|
Tag: "",
|
||||||
|
}
|
||||||
|
ar := &artifact.Artifact{}
|
||||||
|
suite.local.On("GetManifest", ctx, artInfo1).Return(ar, nil)
|
||||||
|
suite.local.On("GetManifest", ctx, mock.Anything).Return(nil, nil)
|
||||||
|
|
||||||
|
newMan, err := suite.mHandler.updateManifestList(ctx, "library/hello-world", manList)
|
||||||
|
suite.Require().Nil(err)
|
||||||
|
suite.Assert().Equal(len(newMan.References()), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *CacheTestSuite) TestPushManifestList() {
|
||||||
|
ctx := context.Background()
|
||||||
|
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||||
|
manifestList := manifestlist.ManifestList{
|
||||||
|
Versioned: manifest.Versioned{
|
||||||
|
SchemaVersion: 2,
|
||||||
|
MediaType: manifestlist.MediaTypeManifestList,
|
||||||
|
},
|
||||||
|
Manifests: []manifestlist.ManifestDescriptor{
|
||||||
|
{
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(amdDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "amd64",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(armDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "arm",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manList := &manifestlist.DeserializedManifestList{
|
||||||
|
ManifestList: manifestList,
|
||||||
|
}
|
||||||
|
repo := "library/hello-world"
|
||||||
|
artInfo1 := lib.ArtifactInfo{
|
||||||
|
Repository: repo,
|
||||||
|
Digest: amdDig,
|
||||||
|
Tag: "",
|
||||||
|
}
|
||||||
|
ar := &artifact.Artifact{}
|
||||||
|
_, payload, err := manList.Payload()
|
||||||
|
suite.Nil(err)
|
||||||
|
originDigest := digest.FromBytes(payload)
|
||||||
|
|
||||||
|
suite.local.On("GetManifest", ctx, artInfo1).Return(ar, nil)
|
||||||
|
suite.local.On("GetManifest", ctx, mock.Anything).Return(nil, nil)
|
||||||
|
|
||||||
|
suite.local.On("PushManifest", repo, originDigest, mock.Anything).Return(fmt.Errorf("wrong digest"))
|
||||||
|
suite.local.On("PushManifest", repo, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
err = suite.mHandler.push(ctx, "library/hello-world", string(originDigest), manList)
|
||||||
|
suite.Require().Nil(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &CacheTestSuite{})
|
||||||
|
}
|
@ -237,7 +237,7 @@ func proxyManifestHead(ctx context.Context, w http.ResponseWriter, ctl proxy.Con
|
|||||||
if !exist || desc == nil {
|
if !exist || desc == nil {
|
||||||
return errors.NotFoundError(fmt.Errorf("The tag %v:%v is not found", art.Repository, art.Tag))
|
return errors.NotFoundError(fmt.Errorf("The tag %v:%v is not found", art.Repository, art.Tag))
|
||||||
}
|
}
|
||||||
go func() {
|
go func(art lib.ArtifactInfo) {
|
||||||
// After docker 20.10 or containerd, the client heads the tag first,
|
// After docker 20.10 or containerd, the client heads the tag first,
|
||||||
// Then GET the image by digest, in order to associate the tag with the digest
|
// Then GET the image by digest, in order to associate the tag with the digest
|
||||||
// Ensure tag after head request, make sure tags in proxy cache keep update
|
// Ensure tag after head request, make sure tags in proxy cache keep update
|
||||||
@ -251,7 +251,8 @@ func proxyManifestHead(ctx context.Context, w http.ResponseWriter, ctl proxy.Con
|
|||||||
}
|
}
|
||||||
log.Debugf("Failed to ensure tag %+v , error %v", art, err)
|
log.Debugf("Failed to ensure tag %+v , error %v", art, err)
|
||||||
}
|
}
|
||||||
}()
|
}(art)
|
||||||
|
|
||||||
w.Header().Set(contentType, desc.MediaType)
|
w.Header().Set(contentType, desc.MediaType)
|
||||||
w.Header().Set(contentLength, fmt.Sprintf("%v", desc.Size))
|
w.Header().Set(contentLength, fmt.Sprintf("%v", desc.Size))
|
||||||
w.Header().Set(dockerContentDigest, string(desc.Digest))
|
w.Header().Set(dockerContentDigest, string(desc.Digest))
|
||||||
|
Loading…
Reference in New Issue
Block a user