mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 02:05:41 +01:00
feat: implement bandwidth limit for proxy-cache (#20812)
Signed-off-by: Shengwen Yu <yshengwen@vmware.com>
This commit is contained in:
parent
ec5dc094d9
commit
1f75b7aaef
@ -7340,6 +7340,10 @@ definitions:
|
|||||||
type: string
|
type: string
|
||||||
description: 'The ID of the tag retention policy for the project'
|
description: 'The ID of the tag retention policy for the project'
|
||||||
x-nullable: true
|
x-nullable: true
|
||||||
|
proxy_speed_kb:
|
||||||
|
type: string
|
||||||
|
description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.'
|
||||||
|
x-nullable: true
|
||||||
ProjectSummary:
|
ProjectSummary:
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
|
@ -264,7 +264,7 @@ func (c *controller) HeadManifest(_ context.Context, art lib.ArtifactInfo, remot
|
|||||||
func (c *controller) ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
|
func (c *controller) ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
|
||||||
remoteRepo := getRemoteRepo(art)
|
remoteRepo := getRemoteRepo(art)
|
||||||
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
|
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
|
||||||
rHelper, err := NewRemoteHelper(ctx, p.RegistryID)
|
rHelper, err := NewRemoteHelper(ctx, p.RegistryID, WithSpeed(p.ProxyCacheSpeed()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
37
src/controller/proxy/options.go
Normal file
37
src/controller/proxy/options.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
// Speed is the data transfer speed for proxy cache from Harbor to upstream registry, no limit by default.
|
||||||
|
Speed int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOptions(opts ...Option) *Options {
|
||||||
|
o := &Options{}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(o)
|
||||||
|
}
|
||||||
|
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithSpeed(speed int32) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Speed = speed
|
||||||
|
}
|
||||||
|
}
|
33
src/controller/proxy/options_test.go
Normal file
33
src/controller/proxy/options_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewOptions(t *testing.T) {
|
||||||
|
// test default options
|
||||||
|
o := NewOptions()
|
||||||
|
assert.Equal(t, int32(0), o.Speed)
|
||||||
|
|
||||||
|
// test with options
|
||||||
|
// with speed
|
||||||
|
withSpeed := WithSpeed(1024)
|
||||||
|
o = NewOptions(withSpeed)
|
||||||
|
assert.Equal(t, int32(1024), o.Speed)
|
||||||
|
}
|
@ -21,6 +21,7 @@ import (
|
|||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
"github.com/goharbor/harbor/src/pkg/reg"
|
"github.com/goharbor/harbor/src/pkg/reg"
|
||||||
"github.com/goharbor/harbor/src/pkg/reg/adapter"
|
"github.com/goharbor/harbor/src/pkg/reg/adapter"
|
||||||
"github.com/goharbor/harbor/src/pkg/reg/model"
|
"github.com/goharbor/harbor/src/pkg/reg/model"
|
||||||
@ -43,13 +44,16 @@ type remoteHelper struct {
|
|||||||
regID int64
|
regID int64
|
||||||
registry adapter.ArtifactRegistry
|
registry adapter.ArtifactRegistry
|
||||||
registryMgr reg.Manager
|
registryMgr reg.Manager
|
||||||
|
opts *Options
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRemoteHelper create a remote interface
|
// NewRemoteHelper create a remote interface
|
||||||
func NewRemoteHelper(ctx context.Context, regID int64) (RemoteInterface, error) {
|
func NewRemoteHelper(ctx context.Context, regID int64, opts ...Option) (RemoteInterface, error) {
|
||||||
r := &remoteHelper{
|
r := &remoteHelper{
|
||||||
regID: regID,
|
regID: regID,
|
||||||
registryMgr: reg.Mgr}
|
registryMgr: reg.Mgr,
|
||||||
|
opts: NewOptions(opts...),
|
||||||
|
}
|
||||||
if err := r.init(ctx); err != nil {
|
if err := r.init(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -83,7 +87,14 @@ func (r *remoteHelper) init(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
|
func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
|
||||||
return r.registry.PullBlob(repo, dig)
|
sz, bReader, err := r.registry.PullBlob(repo, dig)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
if r.opts != nil && r.opts.Speed > 0 {
|
||||||
|
bReader = lib.NewReader(bReader, r.opts.Speed)
|
||||||
|
}
|
||||||
|
return sz, bReader, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
|
func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
|
|
||||||
common_http "github.com/goharbor/harbor/src/common/http"
|
common_http "github.com/goharbor/harbor/src/common/http"
|
||||||
trans "github.com/goharbor/harbor/src/controller/replication/transfer"
|
trans "github.com/goharbor/harbor/src/controller/replication/transfer"
|
||||||
|
"github.com/goharbor/harbor/src/lib"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/pkg/reg/adapter"
|
"github.com/goharbor/harbor/src/pkg/reg/adapter"
|
||||||
"github.com/goharbor/harbor/src/pkg/reg/model"
|
"github.com/goharbor/harbor/src/pkg/reg/model"
|
||||||
@ -380,7 +381,7 @@ func (t *transfer) copyBlobByMonolithic(srcRepo, dstRepo, digest string, sizeFro
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if speed > 0 {
|
if speed > 0 {
|
||||||
data = trans.NewReader(data, speed)
|
data = lib.NewReader(data, speed)
|
||||||
}
|
}
|
||||||
defer data.Close()
|
defer data.Close()
|
||||||
// get size 0 from PullBlob, use size from distribution.Descriptor instead.
|
// get size 0 from PullBlob, use size from distribution.Descriptor instead.
|
||||||
@ -435,7 +436,7 @@ func (t *transfer) copyBlobByChunk(srcRepo, dstRepo, digest string, sizeFromDesc
|
|||||||
}
|
}
|
||||||
|
|
||||||
if speed > 0 {
|
if speed > 0 {
|
||||||
data = trans.NewReader(data, speed)
|
data = lib.NewReader(data, speed)
|
||||||
}
|
}
|
||||||
// failureEnd will only be used for adjusting content range when issue happened during push the chunk.
|
// failureEnd will only be used for adjusting content range when issue happened during push the chunk.
|
||||||
var failureEnd int64
|
var failureEnd int64
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package transfer
|
package lib
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
@ -24,4 +24,5 @@ const (
|
|||||||
ProMetaAutoScan = "auto_scan"
|
ProMetaAutoScan = "auto_scan"
|
||||||
ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist"
|
ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist"
|
||||||
ProMetaAutoSBOMGen = "auto_sbom_generation"
|
ProMetaAutoSBOMGen = "auto_sbom_generation"
|
||||||
|
ProMetaProxySpeed = "proxy_speed_kb"
|
||||||
)
|
)
|
||||||
|
@ -156,6 +156,19 @@ func (p *Project) AutoSBOMGen() bool {
|
|||||||
return isTrue(auto)
|
return isTrue(auto)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProxyCacheSpeed ...
|
||||||
|
func (p *Project) ProxyCacheSpeed() int32 {
|
||||||
|
speed, exist := p.GetMetadata(ProMetaProxySpeed)
|
||||||
|
if !exist {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
speedInt, err := strconv.ParseInt(speed, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return int32(speedInt)
|
||||||
|
}
|
||||||
|
|
||||||
// FilterByPublic returns orm.QuerySeter with public filter
|
// FilterByPublic returns orm.QuerySeter with public filter
|
||||||
func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value interface{}) orm.QuerySeter {
|
func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value interface{}) orm.QuerySeter {
|
||||||
subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'`
|
subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'`
|
||||||
|
@ -60,7 +60,7 @@ func BlobGetMiddleware() func(http.Handler) http.Handler {
|
|||||||
|
|
||||||
func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
art, p, proxyCtl, err := preCheck(ctx)
|
art, p, proxyCtl, err := preCheck(ctx, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -96,14 +96,14 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) {
|
func preCheck(ctx context.Context, withProjectMetadata bool) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) {
|
||||||
none := lib.ArtifactInfo{}
|
none := lib.ArtifactInfo{}
|
||||||
art = lib.GetArtifactInfo(ctx)
|
art = lib.GetArtifactInfo(ctx)
|
||||||
if art == none {
|
if art == none {
|
||||||
return none, nil, nil, errors.New("artifactinfo is not found").WithCode(errors.NotFoundCode)
|
return none, nil, nil, errors.New("artifactinfo is not found").WithCode(errors.NotFoundCode)
|
||||||
}
|
}
|
||||||
ctl = proxy.ControllerInstance()
|
ctl = proxy.ControllerInstance()
|
||||||
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false))
|
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(withProjectMetadata))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +155,7 @@ func defaultBlobURL(projectName string, name string, digest string) string {
|
|||||||
|
|
||||||
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
art, p, proxyCtl, err := preCheck(ctx)
|
art, p, proxyCtl, err := preCheck(ctx, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -174,7 +174,7 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
|
|||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID)
|
remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler {
|
|||||||
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
|
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
art, p, _, err := preCheck(ctx)
|
art, p, _, err := preCheck(ctx, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
libhttp.SendError(w, err)
|
libhttp.SendError(w, err)
|
||||||
return
|
return
|
||||||
@ -69,7 +69,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler {
|
|||||||
util.SendListTagsResponse(w, r, tags)
|
util.SendListTagsResponse(w, r, tags)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID)
|
remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warningf("failed to get remote interface, error: %v, fallback to local tags", err)
|
logger.Warningf("failed to get remote interface, error: %v, fallback to local tags", err)
|
||||||
return
|
return
|
||||||
|
@ -159,6 +159,11 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ignore metadata.proxy_speed_kb for non-proxy-cache project
|
||||||
|
if req.RegistryID == nil {
|
||||||
|
req.Metadata.ProxySpeedKb = nil
|
||||||
|
}
|
||||||
|
|
||||||
// ignore enable_content_trust metadata for proxy cache project
|
// ignore enable_content_trust metadata for proxy cache project
|
||||||
// see https://github.com/goharbor/harbor/issues/12940 to get more info
|
// see https://github.com/goharbor/harbor/issues/12940 to get more info
|
||||||
if req.RegistryID != nil {
|
if req.RegistryID != nil {
|
||||||
@ -551,6 +556,11 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ignore metadata.proxy_speed_kb for non-proxy-cache project
|
||||||
|
if params.Project.Metadata != nil && !p.IsProxy() {
|
||||||
|
params.Project.Metadata.ProxySpeedKb = nil
|
||||||
|
}
|
||||||
|
|
||||||
// ignore enable_content_trust metadata for proxy cache project
|
// ignore enable_content_trust metadata for proxy cache project
|
||||||
// see https://github.com/goharbor/harbor/issues/12940 to get more info
|
// see https://github.com/goharbor/harbor/issues/12940 to get more info
|
||||||
if params.Project.Metadata != nil && p.IsProxy() {
|
if params.Project.Metadata != nil && p.IsProxy() {
|
||||||
@ -792,6 +802,13 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project
|
|||||||
if !permitted {
|
if !permitted {
|
||||||
return errors.BadRequestError(fmt.Errorf("unsupported registry type %s", string(registry.Type)))
|
return errors.BadRequestError(fmt.Errorf("unsupported registry type %s", string(registry.Type)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate metadata.proxy_speed_kb. It should be an int32
|
||||||
|
if ps := req.Metadata.ProxySpeedKb; ps != nil {
|
||||||
|
if _, err := strconv.ParseInt(*ps, 10, 32); err != nil {
|
||||||
|
return errors.BadRequestError(nil).WithMessage(fmt.Sprintf("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.StorageLimit != nil {
|
if req.StorageLimit != nil {
|
||||||
|
@ -155,6 +155,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin
|
|||||||
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value)
|
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value)
|
||||||
}
|
}
|
||||||
metas[proModels.ProMetaSeverity] = strings.ToLower(severity.String())
|
metas[proModels.ProMetaSeverity] = strings.ToLower(severity.String())
|
||||||
|
case proModels.ProMetaProxySpeed:
|
||||||
|
v, err := strconv.ParseInt(value, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value)
|
||||||
|
}
|
||||||
|
metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10)
|
||||||
default:
|
default:
|
||||||
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid key: %s", key)
|
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid key: %s", key)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user