Merge pull request #12518 from stonezdj/202020717_duplicate_op

Fix #12487: Proxy cache create duplicated operation log
This commit is contained in:
stonezdj(Daojun Zhang) 2020-08-10 17:01:07 +08:00 committed by GitHub
commit 49f4559608
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 154 additions and 57 deletions

View File

@ -30,8 +30,7 @@ import (
"github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/replication/registry" "github.com/opencontainers/image-spec/specs-go/v1"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
) )
const ( const (
@ -52,7 +51,7 @@ type Controller interface {
// UseLocalBlob check if the blob should use local copy // UseLocalBlob check if the blob should use local copy
UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
// UseLocalManifest check manifest should use local copy // UseLocalManifest check manifest should use local copy
UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) (bool, error)
// ProxyBlob proxy the blob request to the remote server, p is the proxy project // ProxyBlob proxy the blob request to the remote server, p is the proxy project
// art is the ArtifactInfo which includes the digest of the blob // art is the ArtifactInfo which includes the digest of the blob
ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
@ -62,7 +61,6 @@ type Controller interface {
} }
type controller struct { type controller struct {
blobCtl blob.Controller blobCtl blob.Controller
registryMgr registry.Manager
artifactCtl artifact.Controller artifactCtl artifact.Controller
local localInterface local localInterface
} }
@ -74,7 +72,6 @@ func ControllerInstance() Controller {
once.Do(func() { once.Do(func() {
ctl = &controller{ ctl = &controller{
blobCtl: blob.Ctl, blobCtl: blob.Ctl,
registryMgr: registry.NewDefaultManager(),
artifactCtl: artifact.Ctl, artifactCtl: artifact.Ctl,
local: newLocalHelper(), local: newLocalHelper(),
} }
@ -94,22 +91,23 @@ func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) boo
return exist return exist
} }
func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool { func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
if len(art.Digest) == 0 { if len(art.Digest) == 0 {
return false return false, nil
} }
return c.local.ManifestExist(ctx, art) a, err := c.local.GetManifest(ctx, art)
return a != nil, err
} }
func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) { func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) {
var man distribution.Manifest var man distribution.Manifest
remoteRepo := getRemoteRepo(art) remoteRepo := getRemoteRepo(art)
r, err := newRemoteHelper(p.RegistryID) ref := getReference(art)
remote, err := newRemoteHelper(p.RegistryID)
if err != nil { if err != nil {
return man, err return man, err
} }
ref := getReference(art) man, dig, err := remote.Manifest(remoteRepo, ref)
man, err = r.Manifest(remoteRepo, ref)
if err != nil { if err != nil {
if errors.IsNotFoundErr(err) { if errors.IsNotFoundErr(err) {
go func() { go func() {
@ -124,7 +122,26 @@ func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art l
} }
// Push manifest in background // Push manifest in background
go func() { go func() {
c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r) a, err := c.local.GetManifest(ctx, art)
if err != nil {
log.Errorf("failed to get manifest, error %v", err)
}
// Push manifest to local when pull with digest, or artifact not found, or digest mismatch
if len(art.Tag) == 0 || a == nil || a.Digest != dig {
// pull with digest
c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, remote)
}
// Query artifact after push
if a == nil {
a, err = c.local.GetManifest(ctx, art)
if err != nil {
log.Errorf("failed to get manifest, error %v", err)
}
}
if a != nil {
SendPullEvent(ctx, a, art.Tag)
}
}() }()
return man, nil return man, nil
@ -184,7 +201,7 @@ func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string,
} }
if len(waitBlobs) > 0 { if len(waitBlobs) > 0 {
// docker client will skip to pull layers exist in local // docker client will skip to pull layers exist in local
// these blobs is not exist in the proxy server // these blobs are not exist in the proxy server
// it will cause the manifest dependency check always fail // it will cause the manifest dependency check always fail
// need to push these blobs before push manifest to avoid failure // need to push these blobs before push manifest to avoid failure
log.Debug("Waiting blobs not empty, push it to local repo directly") log.Debug("Waiting blobs not empty, push it to local repo directly")

View File

@ -17,30 +17,60 @@ package proxy
import ( import (
"context" "context"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/artifact" "github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/blob" "github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/replication/registry"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"io" "io"
"testing" "testing"
) )
type remoteInterfaceMock struct {
mock.Mock
}
func (r *remoteInterfaceMock) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
panic("implement me")
}
func (r *remoteInterfaceMock) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
panic("implement me")
}
func (r *remoteInterfaceMock) ManifestExist(repo, ref string) (bool, string, error) {
args := r.Called(repo, ref)
return args.Bool(0), args.String(1), args.Error(2)
}
type localInterfaceMock struct { type localInterfaceMock struct {
mock.Mock mock.Mock
} }
func (l *localInterfaceMock) SendPullEvent(ctx context.Context, repo, tag string) {
panic("implement me")
}
func (l *localInterfaceMock) GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error) {
args := l.Called(ctx, art)
var a *artifact.Artifact
if args.Get(0) != nil {
a = args.Get(0).(*artifact.Artifact)
}
return a, args.Error(1)
}
func (l *localInterfaceMock) SameArtifact(ctx context.Context, repo, tag, dig string) (bool, error) {
panic("implement me")
}
func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) { func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
args := l.Called(ctx, art) args := l.Called(ctx, art)
return args.Bool(0), args.Error(1) return args.Bool(0), args.Error(1)
} }
func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
args := l.Called(ctx, art)
return args.Bool(0)
}
func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error { func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
panic("implement me") panic("implement me")
} }
@ -64,14 +94,17 @@ func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
type proxyControllerTestSuite struct { type proxyControllerTestSuite struct {
suite.Suite suite.Suite
local *localInterfaceMock local *localInterfaceMock
remote *remoteInterfaceMock
ctr Controller ctr Controller
proj *models.Project
} }
func (p *proxyControllerTestSuite) SetupTest() { func (p *proxyControllerTestSuite) SetupTest() {
p.local = &localInterfaceMock{} p.local = &localInterfaceMock{}
p.remote = &remoteInterfaceMock{}
p.proj = &models.Project{RegistryID: 1}
p.ctr = &controller{ p.ctr = &controller{
blobCtl: blob.Ctl, blobCtl: blob.Ctl,
registryMgr: registry.NewDefaultManager(),
artifactCtl: artifact.Ctl, artifactCtl: artifact.Ctl,
local: p.local, local: p.local,
} }
@ -81,8 +114,10 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
ctx := context.Background() ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil) p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
result := p.ctr.UseLocalManifest(ctx, art)
result, err := p.ctr.UseLocalManifest(ctx, art)
p.Assert().Nil(err)
p.Assert().True(result) p.Assert().True(result)
} }
@ -90,16 +125,18 @@ func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
ctx := context.Background() ctx := context.Background()
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b" dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil) p.local.On("GetManifest", mock.Anything, mock.Anything).Return(nil, nil)
result := p.ctr.UseLocalManifest(ctx, art) result, err := p.ctr.UseLocalManifest(ctx, art)
p.Assert().Nil(err)
p.Assert().False(result) p.Assert().False(result)
} }
func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() { func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
ctx := context.Background() ctx := context.Background()
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"} art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil) p.local.On("GetManifest", mock.Anything, mock.Anything).Return(&artifact.Artifact{}, nil)
result := p.ctr.UseLocalManifest(ctx, art) result, err := p.ctr.UseLocalManifest(ctx, art)
p.Assert().Nil(err)
p.Assert().False(result) p.Assert().False(result)
} }

View File

@ -16,26 +16,30 @@ package proxy
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io"
"time"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist" "github.com/docker/distribution/manifest/manifestlist"
"github.com/goharbor/harbor/src/controller/artifact" "github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/event/metadata"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/notifier/event"
"github.com/goharbor/harbor/src/pkg/proxy/secret" "github.com/goharbor/harbor/src/pkg/proxy/secret"
"github.com/goharbor/harbor/src/pkg/registry" "github.com/goharbor/harbor/src/pkg/registry"
"io" "github.com/opencontainers/go-digest"
"time"
) )
// localInterface defines operations related to local repo under proxy mode // localInterface defines operations related to local repo under proxy mode
type localInterface interface { type localInterface interface {
// BlobExist check if the blob exist in local repo // BlobExist check if the blob exist in local repo
BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error)
// Manifest check if the manifest exist in local repo // GetManifest get the manifest info
ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error)
// PushBlob push blob to local repo // PushBlob push blob to local repo
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
// PushManifest push manifest to local repo, ref can be digest or tag // PushManifest push manifest to local repo, ref can be digest or tag
@ -48,13 +52,16 @@ type localInterface interface {
DeleteManifest(repo, ref string) DeleteManifest(repo, ref string)
} }
func (l *localHelper) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool { func (l *localHelper) GetManifest(ctx context.Context, art lib.ArtifactInfo) (*artifact.Artifact, error) {
a, err := l.artifactCtl.GetByReference(ctx, art.Repository, art.Digest, nil) ref := getReference(art)
a, err := l.artifactCtl.GetByReference(ctx, art.Repository, ref, nil)
if err != nil { if err != nil {
log.Errorf("check manifest exist failed, error %v", err) if errors.IsNotFoundErr(err) {
return false return nil, nil
} }
return a != nil return nil, err
}
return a, nil
} }
// localHelper defines operations related to local repo under proxy mode // localHelper defines operations related to local repo under proxy mode
@ -134,7 +141,11 @@ func (l *localHelper) updateManifestList(ctx context.Context, repo string, manif
existMans := make([]manifestlist.ManifestDescriptor, 0) existMans := make([]manifestlist.ManifestDescriptor, 0)
for _, m := range v.Manifests { for _, m := range v.Manifests {
art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)} art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
if l.ManifestExist(ctx, art) { a, err := l.GetManifest(ctx, art)
if err != nil {
return nil, err
}
if a != nil {
existMans = append(existMans, m) existMans = append(existMans, m)
} }
} }
@ -143,14 +154,14 @@ func (l *localHelper) updateManifestList(ctx context.Context, repo string, manif
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest) return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
} }
func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error { func (l *localHelper) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
// For manifest list, it might include some different manifest // For manifest list, it might include some different manifest
// it will wait and check for 30 mins, if all depend manifests exist then push it // it will wait and check for 30 mins, if all depend manifests exist then push it
// if time exceed, only push the new updated manifest list which contains existing manifest // if time exceed, only push the new updated manifest list which contains existing manifest
var newMan distribution.Manifest var newMan distribution.Manifest
var err error var err error
for n := 0; n < maxManifestListWait; n++ { for n := 0; n < maxManifestListWait; n++ {
log.Debugf("waiting for the manifest ready, repo %v, ref:%v", repo, ref) log.Debugf("waiting for the manifest ready, repo %v, tag:%v", repo, tag)
time.Sleep(sleepIntervalSec * time.Second) time.Sleep(sleepIntervalSec * time.Second)
newMan, err = l.updateManifestList(ctx, repo, man) newMan, err = l.updateManifestList(ctx, repo, man)
if err != nil { if err != nil {
@ -160,7 +171,6 @@ func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref str
break break
} }
} }
if len(newMan.References()) == 0 { if len(newMan.References()) == 0 {
return errors.New("manifest list doesn't contain any pushed manifest") return errors.New("manifest list doesn't contain any pushed manifest")
} }
@ -170,7 +180,17 @@ func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref str
return err return err
} }
log.Debugf("The manifest list payload: %v", string(pl)) log.Debugf("The manifest list payload: %v", string(pl))
return l.PushManifest(repo, ref, newMan) dig := digest.FromBytes(pl)
// Because the manifest list maybe updated, need to recheck if it is exist in local
art := lib.ArtifactInfo{Repository: repo, Tag: tag}
a, err := l.GetManifest(ctx, art)
if err != nil {
return err
}
if a != nil && a.Digest == string(dig) {
return nil
}
return l.PushManifest(repo, tag, newMan)
} }
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor { func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
@ -188,3 +208,13 @@ func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man di
log.Debugf("Check dependency result %v", waitDesc) log.Debugf("Check dependency result %v", waitDesc)
return waitDesc return waitDesc
} }
// SendPullEvent send a pull image event
func SendPullEvent(ctx context.Context, a *artifact.Artifact, tag string) {
e := &metadata.PullArtifactEventMetadata{
Ctx: ctx,
Artifact: &a.Artifact,
Tag: tag,
}
event.BuildAndPublish(e)
}

View File

@ -191,8 +191,9 @@ func (lh *localHelperTestSuite) TestManifestExist() {
var opt *artifact.Option var opt *artifact.Option
lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil) lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil)
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig} art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
exist := lh.local.ManifestExist(ctx, art) a, err := lh.local.GetManifest(ctx, art)
lh.Assert().True(exist) lh.Assert().Nil(err)
lh.Assert().NotNil(a)
} }
func TestLocalHelperTestSuite(t *testing.T) { func TestLocalHelperTestSuite(t *testing.T) {

View File

@ -17,8 +17,8 @@ package proxy
import ( import (
"fmt" "fmt"
"github.com/docker/distribution" "github.com/docker/distribution"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/registry" "github.com/goharbor/harbor/src/replication/registry"
"io" "io"
) )
@ -28,20 +28,22 @@ type remoteInterface interface {
// BlobReader create a reader for remote blob // BlobReader create a reader for remote blob
BlobReader(repo, dig string) (int64, io.ReadCloser, error) BlobReader(repo, dig string) (int64, io.ReadCloser, error)
// Manifest get manifest by reference // Manifest get manifest by reference
Manifest(repo string, ref string) (distribution.Manifest, error) Manifest(repo string, ref string) (distribution.Manifest, string, error)
} }
// remoteHelper defines operations related to remote repository under proxy // remoteHelper defines operations related to remote repository under proxy
type remoteHelper struct { type remoteHelper struct {
regID int64 regID int64
registry adapter.ArtifactRegistry registry adapter.ArtifactRegistry
registryMgr registry.Manager
} }
// newRemoteHelper create a remoteHelper interface // newRemoteHelper create a remoteHelper interface
func newRemoteHelper(regID int64) (remoteInterface, error) { func newRemoteHelper(regID int64) (*remoteHelper, error) {
r := &remoteHelper{regID: regID} r := &remoteHelper{
regID: regID,
registryMgr: registry.NewDefaultManager()}
if err := r.init(); err != nil { if err := r.init(); err != nil {
log.Errorf("failed to create remoteHelper error %v", err)
return nil, err return nil, err
} }
return r, nil return r, nil
@ -52,13 +54,16 @@ func (r *remoteHelper) init() error {
if r.registry != nil { if r.registry != nil {
return nil return nil
} }
reg, err := registry.NewDefaultManager().Get(r.regID) reg, err := r.registryMgr.Get(r.regID)
if err != nil { if err != nil {
return err return err
} }
if reg == nil { if reg == nil {
return fmt.Errorf("failed to get registry, registryID: %v", r.regID) return fmt.Errorf("failed to get registry, registryID: %v", r.regID)
} }
if reg.Status != model.Healthy {
return fmt.Errorf("current registry is unhealthy, regID:%v, Name:%v, Status: %v", reg.ID, reg.Name, reg.Status)
}
factory, err := adapter.GetFactory(reg.Type) factory, err := adapter.GetFactory(reg.Type)
if err != nil { if err != nil {
return err return err
@ -75,7 +80,6 @@ func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error
return r.registry.PullBlob(repo, dig) return r.registry.PullBlob(repo, dig)
} }
func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, error) { func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
man, _, err := r.registry.PullManifest(repo, ref) return r.registry.PullManifest(repo, ref)
return man, err
} }

View File

@ -98,7 +98,15 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
if err != nil { if err != nil {
return err return err
} }
if !canProxy(p) || proxyCtl.UseLocalManifest(ctx, art) { if !canProxy(p) {
next.ServeHTTP(w, r)
return nil
}
useLocal, err := proxyCtl.UseLocalManifest(ctx, art)
if err != nil {
return err
}
if useLocal {
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
return nil return nil
} }