mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-20 15:48:26 +01:00
Merge pull request #12274 from stonezdj/20200617_proxy_demo
Add proxy cache feature
This commit is contained in:
commit
ae2a2683c9
215
src/controller/proxy/controller.go
Normal file
215
src/controller/proxy/controller.go
Normal file
@ -0,0 +1,215 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/docker/distribution/manifest/manifestlist"
|
||||||
|
"github.com/goharbor/harbor/src/common/models"
|
||||||
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
|
"github.com/goharbor/harbor/src/controller/blob"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/replication/registry"
|
||||||
|
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready
|
||||||
|
maxManifestListWait = 20
|
||||||
|
maxManifestWait = 10
|
||||||
|
sleepIntervalSec = 20
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// Ctl is a global proxy controller instance
|
||||||
|
ctl Controller
|
||||||
|
once sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
// Controller defines the operations related with pull through proxy
|
||||||
|
type Controller interface {
|
||||||
|
// UseLocalBlob check if the blob should use local copy
|
||||||
|
UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
|
||||||
|
// UseLocalManifest check manifest should use local copy
|
||||||
|
UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool
|
||||||
|
// ProxyBlob proxy the blob request to the remote server, p is the proxy project
|
||||||
|
// art is the ArtifactInfo which includes the digest of the blob
|
||||||
|
ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
|
||||||
|
// ProxyManifest proxy the manifest request to the remote server, p is the proxy project,
|
||||||
|
// art is the ArtifactInfo which includes the tag or digest of the manifest
|
||||||
|
ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error)
|
||||||
|
}
|
||||||
|
type controller struct {
|
||||||
|
blobCtl blob.Controller
|
||||||
|
registryMgr registry.Manager
|
||||||
|
artifactCtl artifact.Controller
|
||||||
|
local localInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
// ControllerInstance -- Get the proxy controller instance
|
||||||
|
func ControllerInstance() Controller {
|
||||||
|
// Lazy load the controller
|
||||||
|
// Because LocalHelper is not ready unless core startup completely
|
||||||
|
once.Do(func() {
|
||||||
|
ctl = &controller{
|
||||||
|
blobCtl: blob.Ctl,
|
||||||
|
registryMgr: registry.NewDefaultManager(),
|
||||||
|
artifactCtl: artifact.Ctl,
|
||||||
|
local: newLocalHelper(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return ctl
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
|
||||||
|
if len(art.Digest) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
exist, err := c.local.BlobExist(ctx, art)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return exist
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo) bool {
|
||||||
|
if len(art.Digest) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return c.local.ManifestExist(ctx, art)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) ProxyManifest(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (distribution.Manifest, error) {
|
||||||
|
var man distribution.Manifest
|
||||||
|
remoteRepo := getRemoteRepo(art)
|
||||||
|
r, err := newRemoteHelper(p.RegistryID)
|
||||||
|
if err != nil {
|
||||||
|
return man, err
|
||||||
|
}
|
||||||
|
ref := getReference(art)
|
||||||
|
man, err = r.Manifest(remoteRepo, ref)
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsNotFoundErr(err) {
|
||||||
|
go func() {
|
||||||
|
c.local.DeleteManifest(remoteRepo, art.Tag)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return man, err
|
||||||
|
}
|
||||||
|
ct, _, err := man.Payload()
|
||||||
|
if err != nil {
|
||||||
|
return man, err
|
||||||
|
}
|
||||||
|
// Push manifest in background
|
||||||
|
go func() {
|
||||||
|
c.waitAndPushManifest(ctx, remoteRepo, man, art, ct, r)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return man, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) ProxyBlob(ctx context.Context, p *models.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
|
||||||
|
remoteRepo := getRemoteRepo(art)
|
||||||
|
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
|
||||||
|
rHelper, err := newRemoteHelper(p.RegistryID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
size, bReader, err := rHelper.BlobReader(remoteRepo, art.Digest)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to pull blob, error %v", err)
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
desc := distribution.Descriptor{Size: size, Digest: digest.Digest(art.Digest)}
|
||||||
|
go func() {
|
||||||
|
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, rHelper)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while putting blob to local repo, %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return size, bReader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r remoteInterface) error {
|
||||||
|
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
|
||||||
|
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to create blob reader, error %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer bReader.Close()
|
||||||
|
err = c.local.PushBlob(localRepo, desc, bReader)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r remoteInterface) {
|
||||||
|
if contType == manifestlist.MediaTypeManifestList || contType == v1.MediaTypeImageIndex {
|
||||||
|
err := c.local.PushManifestList(ctx, art.Repository, getReference(art), man)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error when push manifest list to local :%v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var waitBlobs []distribution.Descriptor
|
||||||
|
for n := 0; n < maxManifestWait; n++ {
|
||||||
|
time.Sleep(sleepIntervalSec * time.Second)
|
||||||
|
waitBlobs = c.local.CheckDependencies(ctx, art.Repository, man)
|
||||||
|
if len(waitBlobs) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Debugf("Current n=%v artifact: %v:%v", n, art.Repository, art.Tag)
|
||||||
|
}
|
||||||
|
if len(waitBlobs) > 0 {
|
||||||
|
// docker client will skip to pull layers exist in local
|
||||||
|
// these blobs is not exist in the proxy server
|
||||||
|
// it will cause the manifest dependency check always fail
|
||||||
|
// need to push these blobs before push manifest to avoid failure
|
||||||
|
log.Debug("Waiting blobs not empty, push it to local repo directly")
|
||||||
|
for _, desc := range waitBlobs {
|
||||||
|
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, r)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Failed to push blob to local repo, error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := c.local.PushManifest(art.Repository, getReference(art), man)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to push manifest, tag: %v, error %v", art.Tag, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getRemoteRepo get the remote repository name, used in proxy cache
|
||||||
|
func getRemoteRepo(art lib.ArtifactInfo) string {
|
||||||
|
return strings.TrimPrefix(art.Repository, art.ProjectName+"/")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReference(art lib.ArtifactInfo) string {
|
||||||
|
if len(art.Tag) > 0 {
|
||||||
|
return art.Tag
|
||||||
|
}
|
||||||
|
return art.Digest
|
||||||
|
}
|
174
src/controller/proxy/controller_test.go
Normal file
174
src/controller/proxy/controller_test.go
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
|
"github.com/goharbor/harbor/src/controller/blob"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/replication/registry"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type localInterfaceMock struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
|
||||||
|
args := l.Called(ctx, art)
|
||||||
|
return args.Bool(0), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
|
||||||
|
args := l.Called(ctx, art)
|
||||||
|
return args.Bool(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) PushManifest(repo string, tag string, manifest distribution.Manifest) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) PushManifestList(ctx context.Context, repo string, tag string, man distribution.Manifest) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localInterfaceMock) DeleteManifest(repo, ref string) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
type proxyControllerTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
local *localInterfaceMock
|
||||||
|
ctr Controller
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) SetupTest() {
|
||||||
|
p.local = &localInterfaceMock{}
|
||||||
|
p.ctr = &controller{
|
||||||
|
blobCtl: blob.Ctl,
|
||||||
|
registryMgr: registry.NewDefaultManager(),
|
||||||
|
artifactCtl: artifact.Ctl,
|
||||||
|
local: p.local,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) TestUseLocalManifest_True() {
|
||||||
|
ctx := context.Background()
|
||||||
|
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||||
|
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
|
||||||
|
result := p.ctr.UseLocalManifest(ctx, art)
|
||||||
|
p.Assert().True(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) TestUseLocalManifest_False() {
|
||||||
|
ctx := context.Background()
|
||||||
|
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||||
|
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(false, nil)
|
||||||
|
result := p.ctr.UseLocalManifest(ctx, art)
|
||||||
|
p.Assert().False(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) TestUseLocalManifestWithTag_False() {
|
||||||
|
ctx := context.Background()
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Tag: "latest"}
|
||||||
|
p.local.On("ManifestExist", mock.Anything, mock.Anything).Return(true, nil)
|
||||||
|
result := p.ctr.UseLocalManifest(ctx, art)
|
||||||
|
p.Assert().False(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) TestUseLocalBlob_True() {
|
||||||
|
ctx := context.Background()
|
||||||
|
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||||
|
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil)
|
||||||
|
result := p.ctr.UseLocalBlob(ctx, art)
|
||||||
|
p.Assert().True(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *proxyControllerTestSuite) TestUseLocalBlob_False() {
|
||||||
|
ctx := context.Background()
|
||||||
|
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||||
|
p.local.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil)
|
||||||
|
result := p.ctr.UseLocalBlob(ctx, art)
|
||||||
|
p.Assert().False(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProxyControllerTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &proxyControllerTestSuite{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProxyCacheRemoteRepo(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
in lib.ArtifactInfo
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: `normal test`,
|
||||||
|
in: lib.ArtifactInfo{ProjectName: "dockerhub_proxy", Repository: "dockerhub_proxy/firstfloor/hello-world"},
|
||||||
|
want: "firstfloor/hello-world",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range cases {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := getRemoteRepo(tt.in)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func TestGetRef(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
in lib.ArtifactInfo
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: `normal`,
|
||||||
|
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "latest", Digest: "sha256:aabbcc"},
|
||||||
|
want: "latest",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: `digest_only`,
|
||||||
|
in: lib.ArtifactInfo{Repository: "hello-world", Tag: "", Digest: "sha256:aabbcc"},
|
||||||
|
want: "sha256:aabbcc",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range cases {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := getReference(tt.in)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf(`(%v) = %v; want "%v"`, tt.in, got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
46
src/controller/proxy/inflight.go
Normal file
46
src/controller/proxy/inflight.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type inflightRequest struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
reqMap map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var inflightChecker = &inflightRequest{
|
||||||
|
reqMap: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// addRequest if the artifact already exist in the inflightRequest, return false
|
||||||
|
// else return true
|
||||||
|
func (in *inflightRequest) addRequest(artifact string) (suc bool) {
|
||||||
|
in.mu.Lock()
|
||||||
|
defer in.mu.Unlock()
|
||||||
|
_, ok := in.reqMap[artifact]
|
||||||
|
if ok {
|
||||||
|
// Skip some following operation if it is in reqMap
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
in.reqMap[artifact] = 1
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (in *inflightRequest) removeRequest(artifact string) {
|
||||||
|
in.mu.Lock()
|
||||||
|
defer in.mu.Unlock()
|
||||||
|
delete(in.reqMap, artifact)
|
||||||
|
}
|
30
src/controller/proxy/inflight_test.go
Normal file
30
src/controller/proxy/inflight_test.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInflightRequest(t *testing.T) {
|
||||||
|
artName := "hello-world:latest"
|
||||||
|
inflightChecker.addRequest(artName)
|
||||||
|
_, ok := inflightChecker.reqMap[artName]
|
||||||
|
assert.True(t, ok)
|
||||||
|
inflightChecker.removeRequest(artName)
|
||||||
|
_, exist := inflightChecker.reqMap[artName]
|
||||||
|
assert.False(t, exist)
|
||||||
|
}
|
191
src/controller/proxy/local.go
Normal file
191
src/controller/proxy/local.go
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/docker/distribution/manifest/manifestlist"
|
||||||
|
comHttpAuth "github.com/goharbor/harbor/src/common/http/modifier/auth"
|
||||||
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/registry"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// localInterface defines operations related to local repo under proxy mode
|
||||||
|
type localInterface interface {
|
||||||
|
// BlobExist check if the blob exist in local repo
|
||||||
|
BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error)
|
||||||
|
// Manifest check if the manifest exist in local repo
|
||||||
|
ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool
|
||||||
|
// PushBlob push blob to local repo
|
||||||
|
PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error
|
||||||
|
// PushManifest push manifest to local repo, ref can be digest or tag
|
||||||
|
PushManifest(repo string, ref string, manifest distribution.Manifest) error
|
||||||
|
// PushManifestList push manifest list to local repo
|
||||||
|
PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error
|
||||||
|
// CheckDependencies check if the manifest's dependency is ready
|
||||||
|
CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor
|
||||||
|
// DeleteManifest cleanup delete tag from local cache
|
||||||
|
DeleteManifest(repo, ref string)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) ManifestExist(ctx context.Context, art lib.ArtifactInfo) bool {
|
||||||
|
a, err := l.artifactCtl.GetByReference(ctx, art.Repository, art.Digest, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("check manifest exist failed, error %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return a != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// localHelper defines operations related to local repo under proxy mode
|
||||||
|
type localHelper struct {
|
||||||
|
registry registry.Client
|
||||||
|
artifactCtl artifactController
|
||||||
|
}
|
||||||
|
|
||||||
|
type artifactController interface {
|
||||||
|
GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (artifact *artifact.Artifact, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newLocalHelper create the localInterface
|
||||||
|
func newLocalHelper() localInterface {
|
||||||
|
l := &localHelper{artifactCtl: artifact.Ctl}
|
||||||
|
l.init()
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) BlobExist(ctx context.Context, art lib.ArtifactInfo) (bool, error) {
|
||||||
|
return l.registry.BlobExist(art.Repository, art.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) init() {
|
||||||
|
if l.registry != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Debugf("core url:%s, local core url: %v", config.GetCoreURL(), config.LocalCoreURL())
|
||||||
|
// the traffic is internal only
|
||||||
|
registryURL := config.LocalCoreURL()
|
||||||
|
authorizer := comHttpAuth.NewSecretAuthorizer(config.ProxyServiceSecret)
|
||||||
|
l.registry = registry.NewClientWithAuthorizer(registryURL, authorizer, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) PushBlob(localRepo string, desc distribution.Descriptor, bReader io.ReadCloser) error {
|
||||||
|
log.Debugf("Put blob to local registry, localRepo:%v, digest: %v", localRepo, desc.Digest)
|
||||||
|
ref := string(desc.Digest)
|
||||||
|
artName := localRepo + ":" + ref
|
||||||
|
// use inflight checker to avoid multiple requests to push blob to local in same time
|
||||||
|
if !inflightChecker.addRequest(artName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer inflightChecker.removeRequest(artName)
|
||||||
|
err := l.registry.PushBlob(localRepo, ref, desc.Size, bReader)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) PushManifest(repo string, ref string, manifest distribution.Manifest) error {
|
||||||
|
// Make sure there is only one go routing to push current artName to local repo
|
||||||
|
artName := repo + ":" + ref
|
||||||
|
// use inflight checker to avoid multiple requests to push manifest to local in same time
|
||||||
|
if !inflightChecker.addRequest(artName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer inflightChecker.removeRequest(artName)
|
||||||
|
|
||||||
|
mediaType, payload, err := manifest.Payload()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = l.registry.PushManifest(repo, ref, mediaType, payload)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteManifest cleanup delete tag from local repo
|
||||||
|
func (l *localHelper) DeleteManifest(repo, ref string) {
|
||||||
|
log.Debug("Remove tag from repo if it is exist")
|
||||||
|
if err := l.registry.DeleteManifest(repo, ref); err != nil {
|
||||||
|
// sometimes user pull a non-exist image
|
||||||
|
log.Warningf("failed to remove artifact, error %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateManifestList -- Trim the manifest list, make sure at least one depend manifests is ready
|
||||||
|
func (l *localHelper) updateManifestList(ctx context.Context, repo string, manifest distribution.Manifest) (distribution.Manifest, error) {
|
||||||
|
switch v := manifest.(type) {
|
||||||
|
case *manifestlist.DeserializedManifestList:
|
||||||
|
existMans := make([]manifestlist.ManifestDescriptor, 0)
|
||||||
|
for _, m := range v.Manifests {
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Digest: string(m.Digest)}
|
||||||
|
if l.ManifestExist(ctx, art) {
|
||||||
|
existMans = append(existMans, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return manifestlist.FromDescriptors(existMans)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("current manifest list type is unknown, manifest type[%T], content [%+v]", manifest, manifest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) PushManifestList(ctx context.Context, repo string, ref string, man distribution.Manifest) error {
|
||||||
|
// For manifest list, it might include some different manifest
|
||||||
|
// it will wait and check for 30 mins, if all depend manifests exist then push it
|
||||||
|
// if time exceed, only push the new updated manifest list which contains existing manifest
|
||||||
|
var newMan distribution.Manifest
|
||||||
|
var err error
|
||||||
|
for n := 0; n < maxManifestListWait; n++ {
|
||||||
|
log.Debugf("waiting for the manifest ready, repo %v, ref:%v", repo, ref)
|
||||||
|
time.Sleep(sleepIntervalSec * time.Second)
|
||||||
|
newMan, err = l.updateManifestList(ctx, repo, man)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(newMan.References()) == len(man.References()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(newMan.References()) == 0 {
|
||||||
|
return errors.New("manifest list doesn't contain any pushed manifest")
|
||||||
|
}
|
||||||
|
_, pl, err := newMan.Payload()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get payload, error %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debugf("The manifest list payload: %v", string(pl))
|
||||||
|
return l.PushManifest(repo, ref, newMan)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *localHelper) CheckDependencies(ctx context.Context, repo string, man distribution.Manifest) []distribution.Descriptor {
|
||||||
|
descriptors := man.References()
|
||||||
|
waitDesc := make([]distribution.Descriptor, 0)
|
||||||
|
for _, desc := range descriptors {
|
||||||
|
log.Debugf("checking the blob dependency: %v", desc.Digest)
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Digest: string(desc.Digest)}
|
||||||
|
exist, err := l.BlobExist(ctx, art)
|
||||||
|
if err != nil || !exist {
|
||||||
|
log.Debugf("Check dependency failed!")
|
||||||
|
waitDesc = append(waitDesc, desc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debugf("Check dependency result %v", waitDesc)
|
||||||
|
return waitDesc
|
||||||
|
}
|
200
src/controller/proxy/local_test.go
Normal file
200
src/controller/proxy/local_test.go
Normal file
@ -0,0 +1,200 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
distribution2 "github.com/docker/distribution"
|
||||||
|
"github.com/docker/distribution/manifest"
|
||||||
|
"github.com/docker/distribution/manifest/manifestlist"
|
||||||
|
"github.com/docker/distribution/manifest/schema2"
|
||||||
|
"github.com/goharbor/harbor/src/controller/artifact"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/distribution"
|
||||||
|
"github.com/opencontainers/go-digest"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
testregistry "github.com/goharbor/harbor/src/testing/pkg/registry"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockManifest struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockManifest) References() []distribution2.Descriptor {
|
||||||
|
args := m.Called()
|
||||||
|
desc := make([]distribution2.Descriptor, 0)
|
||||||
|
if args[0] != nil {
|
||||||
|
desc = args[0].([]distribution2.Descriptor)
|
||||||
|
}
|
||||||
|
return desc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockManifest) Payload() (mediaType string, payload []byte, err error) {
|
||||||
|
args := m.Called()
|
||||||
|
p := make([]byte, 0)
|
||||||
|
if args[1] != nil {
|
||||||
|
p = args[1].([]byte)
|
||||||
|
}
|
||||||
|
return args.String(0), p, args.Error(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
type artifactControllerMock struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *artifactControllerMock) GetByReference(ctx context.Context, repository, reference string, option *artifact.Option) (arti *artifact.Artifact, err error) {
|
||||||
|
args := a.Called(ctx, repository, reference, option)
|
||||||
|
art := &artifact.Artifact{}
|
||||||
|
if args[0] != nil {
|
||||||
|
art = args[0].(*artifact.Artifact)
|
||||||
|
}
|
||||||
|
return art, args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
type localHelperTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
registryClient *testregistry.FakeClient
|
||||||
|
local *localHelper
|
||||||
|
artCtl *artifactControllerMock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) SetupTest() {
|
||||||
|
lh.registryClient = &testregistry.FakeClient{}
|
||||||
|
lh.artCtl = &artifactControllerMock{}
|
||||||
|
lh.local = &localHelper{registry: lh.registryClient, artifactCtl: lh.artCtl}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestBlobExist_False() {
|
||||||
|
repo := "library/hello-world"
|
||||||
|
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
|
||||||
|
ctx := context.Background()
|
||||||
|
lh.registryClient.On("BlobExist").Return(false, nil)
|
||||||
|
exist, err := lh.local.BlobExist(ctx, art)
|
||||||
|
lh.Require().Nil(err)
|
||||||
|
lh.Assert().Equal(false, exist)
|
||||||
|
}
|
||||||
|
func (lh *localHelperTestSuite) TestBlobExist_True() {
|
||||||
|
repo := "library/hello-world"
|
||||||
|
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
|
||||||
|
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
|
||||||
|
ctx := context.Background()
|
||||||
|
lh.registryClient.On("BlobExist").Return(true, nil)
|
||||||
|
exist, err := lh.local.BlobExist(ctx, art)
|
||||||
|
lh.Require().Nil(err)
|
||||||
|
lh.Assert().Equal(true, exist)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestPushManifest() {
|
||||||
|
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
|
||||||
|
lh.registryClient.On("PushManifest").Return(dig, nil)
|
||||||
|
manifest := &mockManifest{}
|
||||||
|
var ct string
|
||||||
|
manifest.Mock.On("Payload").Return(ct, []byte("example"), nil)
|
||||||
|
ct = schema2.MediaTypeManifest
|
||||||
|
err := lh.local.PushManifest("library/hello-world", "", manifest)
|
||||||
|
lh.Require().Nil(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestUpdateManifestList() {
|
||||||
|
ctx := context.Background()
|
||||||
|
amdDig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
armDig := "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"
|
||||||
|
manifestList := manifestlist.ManifestList{
|
||||||
|
Versioned: manifest.Versioned{
|
||||||
|
SchemaVersion: 2,
|
||||||
|
MediaType: manifestlist.MediaTypeManifestList,
|
||||||
|
},
|
||||||
|
Manifests: []manifestlist.ManifestDescriptor{
|
||||||
|
{
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(amdDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "amd64",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
Descriptor: distribution.Descriptor{
|
||||||
|
Digest: digest.Digest(armDig),
|
||||||
|
Size: 3253,
|
||||||
|
MediaType: schema2.MediaTypeManifest,
|
||||||
|
},
|
||||||
|
Platform: manifestlist.PlatformSpec{
|
||||||
|
Architecture: "arm",
|
||||||
|
OS: "linux",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manList := &manifestlist.DeserializedManifestList{
|
||||||
|
ManifestList: manifestList,
|
||||||
|
}
|
||||||
|
ar := &artifact.Artifact{}
|
||||||
|
var emptyArtifact *artifact.Artifact
|
||||||
|
var opt *artifact.Option
|
||||||
|
lh.artCtl.On("GetByReference", ctx, "library/hello-world", amdDig, opt).Return(ar, nil)
|
||||||
|
lh.artCtl.On("GetByReference", ctx, "library/hello-world", armDig, opt).Return(emptyArtifact, nil)
|
||||||
|
newMan, err := lh.local.updateManifestList(ctx, "library/hello-world", manList)
|
||||||
|
lh.Require().Nil(err)
|
||||||
|
lh.Assert().Equal(len(newMan.References()), 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
|
||||||
|
ctx := context.Background()
|
||||||
|
manifest := &mockManifest{}
|
||||||
|
refs := []distribution2.Descriptor{
|
||||||
|
{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
|
||||||
|
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
|
||||||
|
}
|
||||||
|
manifest.On("References").Return(refs)
|
||||||
|
lh.registryClient.On("BlobExist").Return(false, nil)
|
||||||
|
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
|
||||||
|
lh.Assert().Equal(len(ret), 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestCheckDependencies_Suc() {
|
||||||
|
ctx := context.Background()
|
||||||
|
manifest := &mockManifest{}
|
||||||
|
refs := []distribution2.Descriptor{
|
||||||
|
{Digest: "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"},
|
||||||
|
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
|
||||||
|
}
|
||||||
|
manifest.On("References").Return(refs)
|
||||||
|
lh.registryClient.On("BlobExist").Return(true, nil)
|
||||||
|
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
|
||||||
|
lh.Assert().Equal(len(ret), 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lh *localHelperTestSuite) TestManifestExist() {
|
||||||
|
ctx := context.Background()
|
||||||
|
dig := "sha256:1a9ec845ee94c202b2d5da74a24f0ed2058318bfa9879fa541efaecba272e86b"
|
||||||
|
ar := &artifact.Artifact{}
|
||||||
|
var opt *artifact.Option
|
||||||
|
lh.artCtl.On("GetByReference", ctx, "library/hello-world", dig, opt).Return(ar, nil)
|
||||||
|
art := lib.ArtifactInfo{Repository: "library/hello-world", Digest: dig}
|
||||||
|
exist := lh.local.ManifestExist(ctx, art)
|
||||||
|
lh.Assert().True(exist)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLocalHelperTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &localHelperTestSuite{})
|
||||||
|
}
|
81
src/controller/proxy/remote.go
Normal file
81
src/controller/proxy/remote.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/docker/distribution"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/replication/adapter"
|
||||||
|
"github.com/goharbor/harbor/src/replication/registry"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// remoteInterface defines operations related to remote repository under proxy
|
||||||
|
type remoteInterface interface {
|
||||||
|
// BlobReader create a reader for remote blob
|
||||||
|
BlobReader(repo, dig string) (int64, io.ReadCloser, error)
|
||||||
|
// Manifest get manifest by reference
|
||||||
|
Manifest(repo string, ref string) (distribution.Manifest, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// remoteHelper defines operations related to remote repository under proxy
|
||||||
|
type remoteHelper struct {
|
||||||
|
regID int64
|
||||||
|
registry adapter.ArtifactRegistry
|
||||||
|
}
|
||||||
|
|
||||||
|
// newRemoteHelper create a remoteHelper interface
|
||||||
|
func newRemoteHelper(regID int64) (remoteInterface, error) {
|
||||||
|
r := &remoteHelper{regID: regID}
|
||||||
|
if err := r.init(); err != nil {
|
||||||
|
log.Errorf("failed to create remoteHelper error %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *remoteHelper) init() error {
|
||||||
|
|
||||||
|
if r.registry != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
reg, err := registry.NewDefaultManager().Get(r.regID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if reg == nil {
|
||||||
|
return fmt.Errorf("failed to get registry, registryID: %v", r.regID)
|
||||||
|
}
|
||||||
|
factory, err := adapter.GetFactory(reg.Type)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
adp, err := factory.Create(reg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.registry = adp.(adapter.ArtifactRegistry)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
|
||||||
|
return r.registry.PullBlob(repo, dig)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, error) {
|
||||||
|
man, _, err := r.registry.PullManifest(repo, ref)
|
||||||
|
return man, err
|
||||||
|
}
|
138
src/server/middleware/repoproxy/proxy.go
Normal file
138
src/server/middleware/repoproxy/proxy.go
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package repoproxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
httpLib "github.com/goharbor/harbor/src/lib/http"
|
||||||
|
"github.com/goharbor/harbor/src/replication/model"
|
||||||
|
"github.com/goharbor/harbor/src/replication/registry"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common/models"
|
||||||
|
"github.com/goharbor/harbor/src/controller/project"
|
||||||
|
"github.com/goharbor/harbor/src/controller/proxy"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/server/middleware"
|
||||||
|
)
|
||||||
|
|
||||||
|
var registryMgr = registry.NewDefaultManager()
|
||||||
|
|
||||||
|
// BlobGetMiddleware handle get blob request
|
||||||
|
func BlobGetMiddleware() func(http.Handler) http.Handler {
|
||||||
|
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
|
||||||
|
if err := handleBlob(w, r, next); err != nil {
|
||||||
|
httpLib.SendError(w, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
||||||
|
ctx := r.Context()
|
||||||
|
art, p, proxyCtl, err := preCheck(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !canProxy(p) || proxyCtl.UseLocalBlob(ctx, art) {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
size, reader, err := proxyCtl.ProxyBlob(ctx, p, art)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer reader.Close()
|
||||||
|
// Use io.CopyN to avoid out of memory when pulling big blob
|
||||||
|
written, err := io.CopyN(w, reader, size)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if written != size {
|
||||||
|
return errors.Errorf("The size mismatch, actual:%d, expected: %d", written, size)
|
||||||
|
}
|
||||||
|
setHeaders(w, size, "", art.Digest)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *models.Project, ctl proxy.Controller, err error) {
|
||||||
|
art = lib.GetArtifactInfo(ctx)
|
||||||
|
ctl = proxy.ControllerInstance()
|
||||||
|
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// ManifestGetMiddleware middleware handle request for get manifest
|
||||||
|
func ManifestGetMiddleware() func(http.Handler) http.Handler {
|
||||||
|
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
|
||||||
|
if err := handleManifest(w, r, next); err != nil {
|
||||||
|
httpLib.SendError(w, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
||||||
|
ctx := r.Context()
|
||||||
|
art, p, proxyCtl, err := preCheck(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !canProxy(p) || proxyCtl.UseLocalManifest(ctx, art) {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Debugf("the tag is %v, digest is %v", art.Tag, art.Digest)
|
||||||
|
man, err := proxyCtl.ProxyManifest(ctx, p, art)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ct, payload, err := man.Payload()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
setHeaders(w, int64(len(payload)), ct, art.Digest)
|
||||||
|
if _, err = w.Write(payload); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func canProxy(p *models.Project) bool {
|
||||||
|
if p.RegistryID < 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
reg, err := registryMgr.Get(p.RegistryID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get registry, error:%v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if reg.Status != model.Healthy {
|
||||||
|
log.Errorf("current registry is unhealthy, regID:%v, Name:%v, Status: %v", reg.ID, reg.Name, reg.Status)
|
||||||
|
}
|
||||||
|
return reg.Status == model.Healthy
|
||||||
|
}
|
||||||
|
|
||||||
|
func setHeaders(w http.ResponseWriter, size int64, mediaType string, dig string) {
|
||||||
|
h := w.Header()
|
||||||
|
h.Set("Content-Length", fmt.Sprintf("%v", size))
|
||||||
|
if len(mediaType) > 0 {
|
||||||
|
h.Set("Content-Type", mediaType)
|
||||||
|
}
|
||||||
|
h.Set("Docker-Content-Digest", dig)
|
||||||
|
h.Set("Etag", dig)
|
||||||
|
}
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/server/middleware/contenttrust"
|
"github.com/goharbor/harbor/src/server/middleware/contenttrust"
|
||||||
"github.com/goharbor/harbor/src/server/middleware/immutable"
|
"github.com/goharbor/harbor/src/server/middleware/immutable"
|
||||||
"github.com/goharbor/harbor/src/server/middleware/quota"
|
"github.com/goharbor/harbor/src/server/middleware/quota"
|
||||||
|
"github.com/goharbor/harbor/src/server/middleware/repoproxy"
|
||||||
"github.com/goharbor/harbor/src/server/middleware/v2auth"
|
"github.com/goharbor/harbor/src/server/middleware/v2auth"
|
||||||
"github.com/goharbor/harbor/src/server/middleware/vulnerable"
|
"github.com/goharbor/harbor/src/server/middleware/vulnerable"
|
||||||
"github.com/goharbor/harbor/src/server/router"
|
"github.com/goharbor/harbor/src/server/router"
|
||||||
@ -47,6 +48,7 @@ func RegisterRoutes() {
|
|||||||
root.NewRoute().
|
root.NewRoute().
|
||||||
Method(http.MethodGet).
|
Method(http.MethodGet).
|
||||||
Path("/*/manifests/:reference").
|
Path("/*/manifests/:reference").
|
||||||
|
Middleware(repoproxy.ManifestGetMiddleware()).
|
||||||
Middleware(contenttrust.Middleware()).
|
Middleware(contenttrust.Middleware()).
|
||||||
Middleware(vulnerable.Middleware()).
|
Middleware(vulnerable.Middleware()).
|
||||||
HandlerFunc(getManifest)
|
HandlerFunc(getManifest)
|
||||||
@ -66,6 +68,12 @@ func RegisterRoutes() {
|
|||||||
Middleware(quota.PutManifestMiddleware()).
|
Middleware(quota.PutManifestMiddleware()).
|
||||||
Middleware(blob.PutManifestMiddleware()).
|
Middleware(blob.PutManifestMiddleware()).
|
||||||
HandlerFunc(putManifest)
|
HandlerFunc(putManifest)
|
||||||
|
// blob get
|
||||||
|
root.NewRoute().
|
||||||
|
Method(http.MethodGet).
|
||||||
|
Path("/*/blobs/:digest").
|
||||||
|
Middleware(repoproxy.BlobGetMiddleware()).
|
||||||
|
Handler(proxy)
|
||||||
// initiate blob upload
|
// initiate blob upload
|
||||||
root.NewRoute().
|
root.NewRoute().
|
||||||
Method(http.MethodPost).
|
Method(http.MethodPost).
|
||||||
|
Loading…
Reference in New Issue
Block a user