limit replication bandwidth

Signed-off-by: Ziming Zhang <zziming@vmware.com>
This commit is contained in:
Ziming Zhang 2021-09-03 21:56:27 +08:00 committed by Ziming
parent 9fdf8e286d
commit 98cef43ead
14 changed files with 136 additions and 30 deletions

View File

@ -6543,6 +6543,11 @@ definitions:
type: string
format: date-time
description: The update time of the policy.
speed:
type: integer
format: int32
description: speed limit for each task
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:

View File

@ -1,2 +1,4 @@
/* cleanup deleted user project members */
DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true )
DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true );
ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS speed_kb int;

View File

@ -91,7 +91,7 @@ func (c *copyFlow) Run(ctx context.Context) error {
return err
}
return c.createTasks(ctx, srcResources, dstResources)
return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed)
}
func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
@ -102,7 +102,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
return execution.Status == job.StoppedStatus.String(), nil
}
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource) error {
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error {
for i, resource := range srcResources {
src, err := json.Marshal(resource)
if err != nil {
@ -121,6 +121,7 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
Parameters: map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
},
}

View File

@ -45,6 +45,7 @@ type Policy struct {
Enabled bool `json:"enabled"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
}
// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
@ -130,6 +131,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.Enabled = policy.Enabled
p.CreationTime = policy.CreationTime
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
@ -173,6 +175,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
ReplicateDeletion: p.ReplicateDeletion,
CreationTime: p.CreationTime,
UpdateTime: p.UpdateTime,
Speed: p.Speed,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID

View File

@ -49,7 +49,7 @@ type transfer struct {
dst adapter.ChartRegistry
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
@ -78,7 +78,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
version: dst.Metadata.Artifacts[0].Tags[0],
}
// copy the chart from source registry to the destination
return t.copy(srcChart, dstChart, dst.Override)
return t.copy(srcChart, dstChart, dst.Override, speed)
}
func (t *transfer) initialize(src, dst *model.Resource) error {
@ -129,7 +129,7 @@ func (t *transfer) shouldStop() bool {
return isStopped
}
func (t *transfer) copy(src, dst *chart, override bool) error {
func (t *transfer) copy(src, dst *chart, override bool, speed int32) error {
if t.shouldStop() {
return nil
}
@ -160,6 +160,10 @@ func (t *transfer) copy(src, dst *chart, override bool) error {
t.logger.Errorf("failed to download the chart %s:%s: %v", src.name, src.version, err)
return err
}
if speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", speed)
chart = trans.NewReader(chart, speed)
}
defer chart.Close()
if err = t.dst.UploadChart(dst.name, dst.version, chart); err != nil {

View File

@ -96,7 +96,7 @@ func TestCopy(t *testing.T) {
name: "dest/harbor",
version: "0.2.0",
}
err := transfer.copy(src, dst, true)
err := transfer.copy(src, dst, true, 0)
assert.Nil(t, err)
}

View File

@ -69,9 +69,10 @@ type transfer struct {
isStopped trans.StopFunc
src adapter.ArtifactRegistry
dst adapter.ArtifactRegistry
speed int32
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
@ -88,7 +89,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
}
// copy the repository from source registry to the destination
return t.copy(t.convert(src), t.convert(dst), dst.Override)
return t.copy(t.convert(src), t.convert(dst), dst.Override, speed)
}
func (t *transfer) convert(resource *model.Resource) *repository {
@ -161,14 +162,18 @@ func (t *transfer) shouldStop() bool {
return isStopped
}
func (t *transfer) copy(src *repository, dst *repository, override bool) error {
func (t *transfer) copy(src *repository, dst *repository, override bool, speed int32) error {
srcRepo := src.repository
dstRepo := dst.repository
t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
if speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", speed)
}
var err error
for i := range src.tags {
if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override); e != nil {
if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, speed); e != nil {
if e == errStopped {
return nil
}
@ -187,7 +192,7 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error {
return nil
}
func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool) error {
func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, speed int32) error {
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcRef, dstRepo, dstRef)
// pull the manifest from the source registry
@ -221,7 +226,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid
// copy contents between the source and destination registries
for _, content := range manifest.References() {
if err = t.copyContent(content, srcRepo, dstRepo); err != nil {
if err = t.copyContent(content, srcRepo, dstRepo, speed); err != nil {
return err
}
}
@ -237,7 +242,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid
}
// copy the content from source registry to destination according to its media type
func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string) error {
func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, speed int32) error {
digest := content.Digest.String()
switch content.MediaType {
// when the media type of pulled manifest is index,
@ -246,7 +251,7 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo
v1.MediaTypeImageManifest, schema2.MediaTypeManifest,
schema1.MediaTypeSignedManifest, schema1.MediaTypeManifest:
// as using digest as the reference, so set the override to true directly
return t.copyArtifact(srcRepo, digest, dstRepo, digest, true)
return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, speed)
// handle foreign layer
case schema2.MediaTypeForeignLayer:
t.logger.Infof("the layer %s is a foreign layer, skip", digest)
@ -255,15 +260,15 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo
// the media type of the layer or config can be "application/octet-stream",
// schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig
default:
return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size)
return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, speed)
}
}
func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error {
func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
var err error
for i, backoff := 1, 2*time.Second; i <= retry; i, backoff = i+1, backoff*2 {
t.logger.Infof("copying the blob %s(the %dth running)...", digest, i)
if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor); err == nil {
if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor, speed); err == nil {
t.logger.Infof("copy the blob %s completed", digest)
return nil
}
@ -278,7 +283,7 @@ func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDe
// copy the layer or artifact config from the source registry to destination
// the size parameter is taken from manifests.
func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error {
func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
if t.shouldStop() {
return errStopped
}
@ -311,6 +316,9 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor
t.logger.Errorf("failed to pulling the blob %s: %v", digest, err)
return err
}
if speed > 0 {
data = trans.NewReader(data, speed)
}
defer data.Close()
// get size 0 from PullBlob, use size from distribution.Descriptor instead.
if size == 0 {
@ -318,6 +326,8 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor
t.logger.Debugf("the blob size from remote registry is 0, use size %d from manifests instead", size)
}
t.logger.Debugf("the blob size is %d bytes", size)
if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil {
t.logger.Errorf("failed to pushing the blob %s, size %d: %v", digest, size, err)
return err

View File

@ -144,8 +144,7 @@ func TestCopy(t *testing.T) {
repository: "destination",
tags: []string{"b1", "b2"},
}
override := true
err := tr.copy(src, dst, override)
err := tr.copy(src, dst, true, 0)
require.Nil(t, err)
}

View File

@ -0,0 +1,48 @@
package transfer
import (
"fmt"
"io"
"time"
"golang.org/x/time/rate"
)
type reader struct {
reader io.ReadCloser
limiter *rate.Limiter
}
type RateOpts struct {
Rate float64
}
const KBRATE = 1024 / 8
// NewReader returns a Reader that is rate limited
func NewReader(r io.ReadCloser, kb int32) io.ReadCloser {
l := rate.NewLimiter(rate.Limit(kb*KBRATE), 1000*1024)
return &reader{
reader: r,
limiter: l,
}
}
func (r *reader) Read(buf []byte) (int, error) {
n, err := r.reader.Read(buf)
if n <= 0 {
return n, err
}
now := time.Now()
rv := r.limiter.ReserveN(now, n)
if !rv.OK() {
return 0, fmt.Errorf("exceeds limiter's burst")
}
delay := rv.DelayFrom(now)
time.Sleep(delay)
return n, err
}
func (r *reader) Close() error {
return r.reader.Close()
}

View File

@ -34,7 +34,7 @@ type Factory func(Logger, StopFunc) (Transfer, error)
// Transfer defines an interface used to transfer the source
// resource to the destination
type Transfer interface {
Transfer(src *model.Resource, dst *model.Resource) error
Transfer(src *model.Resource, dst *model.Resource, speed int32) error
}
// Logger defines an interface for logging

View File

@ -56,7 +56,7 @@ func (r *Replication) Validate(params job.Parameters) error {
func (r *Replication) Run(ctx job.Context, params job.Parameters) error {
logger := ctx.GetLogger()
src, dst, err := parseParams(params)
src, dst, speed, err := parseParams(params)
if err != nil {
logger.Errorf("failed to parse parameters: %v", err)
return err
@ -81,19 +81,38 @@ func (r *Replication) Run(ctx job.Context, params job.Parameters) error {
return err
}
return trans.Transfer(src, dst)
return trans.Transfer(src, dst, speed)
}
func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, error) {
func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, int32, error) {
src := &model.Resource{}
if err := parseParam(params, "src_resource", src); err != nil {
return nil, nil, err
return nil, nil, 0, err
}
dst := &model.Resource{}
if err := parseParam(params, "dst_resource", dst); err != nil {
return nil, nil, err
return nil, nil, 0, err
}
return src, dst, nil
var speed int32 = 0
value, exist := params["speed"]
if !exist {
speed = 0
} else {
if s, ok := value.(int32); ok {
speed = s
} else {
if s, ok := value.(int); ok {
speed = int32(s)
} else {
if s, ok := value.(float64); ok {
speed = int32(s)
} else {
return nil, nil, 0, fmt.Errorf("the value of speed isn't integer (%T)", value)
}
}
}
}
return src, dst, speed, nil
}
func parseParam(params map[string]interface{}, name string, v interface{}) error {

View File

@ -53,10 +53,11 @@ func TestParseParams(t *testing.T) {
"src_resource": `{"type":"chart"}`,
"dst_resource": `{"type":"chart"}`,
}
res, dst, err := parseParams(params)
res, dst, speed, err := parseParams(params)
require.Nil(t, err)
assert.Equal(t, "chart", string(res.Type))
assert.Equal(t, "chart", string(dst.Type))
assert.Equal(t, int32(0), speed)
}
func TestMaxFails(t *testing.T) {
@ -82,7 +83,7 @@ var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Tr
type fakedTransfer struct{}
func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource) error {
func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
transferred = true
return nil
}

View File

@ -41,6 +41,7 @@ type Policy struct {
ReplicateDeletion bool `orm:"column(replicate_deletion)"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"`
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
}
// TableName set table name for ORM

View File

@ -101,6 +101,12 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope
}
}
}
if params.Policy.Speed != nil {
if *params.Policy.Speed < 0 {
*params.Policy.Speed = 0
}
policy.Speed = *params.Policy.Speed
}
id, err := r.ctl.CreatePolicy(ctx, policy)
if err != nil {
return r.SendError(ctx, err)
@ -158,6 +164,12 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope
}
}
}
if params.Policy.Speed != nil {
if *params.Policy.Speed < 0 {
*params.Policy.Speed = 0
}
policy.Speed = *params.Policy.Speed
}
if err := r.ctl.UpdatePolicy(ctx, policy); err != nil {
return r.SendError(ctx, err)
}
@ -414,6 +426,7 @@ func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPol
Name: policy.Name,
Override: policy.Override,
ReplicateDeletion: policy.ReplicateDeletion,
Speed: &policy.Speed,
UpdateTime: strfmt.DateTime(policy.UpdateTime),
}
if policy.SrcRegistry != nil {