From 98cef43ead677da277c2bd4a6b1a25461ee4078c Mon Sep 17 00:00:00 2001 From: Ziming Zhang Date: Fri, 3 Sep 2021 21:56:27 +0800 Subject: [PATCH] limit replication bandwidth Signed-off-by: Ziming Zhang --- api/v2.0/swagger.yaml | 5 ++ .../postgresql/0070_2.4.0_schema.up.sql | 4 +- src/controller/replication/flow/copy.go | 5 +- src/controller/replication/model/model.go | 3 ++ .../replication/transfer/chart/transfer.go | 10 ++-- .../transfer/chart/transfer_test.go | 2 +- .../replication/transfer/image/transfer.go | 34 ++++++++----- .../transfer/image/transfer_test.go | 3 +- .../replication/transfer/iothrottler.go | 48 +++++++++++++++++++ .../replication/transfer/transfer.go | 2 +- .../job/impl/replication/replication.go | 31 +++++++++--- .../job/impl/replication/replication_test.go | 5 +- src/pkg/replication/model/model.go | 1 + src/server/v2.0/handler/replication.go | 13 +++++ 14 files changed, 136 insertions(+), 30 deletions(-) create mode 100644 src/controller/replication/transfer/iothrottler.go diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index f70aec640..b8537db2e 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -6543,6 +6543,11 @@ definitions: type: string format: date-time description: The update time of the policy. 
+ speed: + type: integer + format: int32 + description: speed limit for each task in kb/s (kilobits per second), 0 or unset means unlimited + x-isnullable: true # make this field optional to keep backward compatibility ReplicationTrigger: type: object properties: diff --git a/make/migrations/postgresql/0070_2.4.0_schema.up.sql b/make/migrations/postgresql/0070_2.4.0_schema.up.sql index 8532ab881..7eb11a1a2 100644 --- a/make/migrations/postgresql/0070_2.4.0_schema.up.sql +++ b/make/migrations/postgresql/0070_2.4.0_schema.up.sql @@ -1,2 +1,4 @@ /* cleanup deleted user project members */ -DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true ) \ No newline at end of file +DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true ); + +ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS speed_kb int; diff --git a/src/controller/replication/flow/copy.go b/src/controller/replication/flow/copy.go index 1b473128f..243152d5f 100644 --- a/src/controller/replication/flow/copy.go +++ b/src/controller/replication/flow/copy.go @@ -91,7 +91,7 @@ func (c *copyFlow) Run(ctx context.Context) error { return err } - return c.createTasks(ctx, srcResources, dstResources) + return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed) } func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) { @@ -102,7 +102,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) { return execution.Status == job.StoppedStatus.String(), nil } -func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource) error { +func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error { for i, resource := range srcResources { src, err := json.Marshal(resource) if err != nil { @@ -121,6 +121,7 @@ func (c *copyFlow) createTasks(ctx 
context.Context, srcResources, dstResources [ Parameters: map[string]interface{}{ "src_resource": string(src), "dst_resource": string(dest), + "speed": speed, }, } diff --git a/src/controller/replication/model/model.go b/src/controller/replication/model/model.go index 80d204261..3cd30435f 100644 --- a/src/controller/replication/model/model.go +++ b/src/controller/replication/model/model.go @@ -45,6 +45,7 @@ type Policy struct { Enabled bool `json:"enabled"` CreationTime time.Time `json:"creation_time"` UpdateTime time.Time `json:"update_time"` + Speed int32 `json:"speed"` } // IsScheduledTrigger returns true when the policy is scheduled trigger and enabled @@ -130,6 +131,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error { p.Enabled = policy.Enabled p.CreationTime = policy.CreationTime p.UpdateTime = policy.UpdateTime + p.Speed = policy.Speed if policy.SrcRegistryID > 0 { p.SrcRegistry = &model.Registry{ @@ -173,6 +175,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) { ReplicateDeletion: p.ReplicateDeletion, CreationTime: p.CreationTime, UpdateTime: p.UpdateTime, + Speed: p.Speed, } if p.SrcRegistry != nil { policy.SrcRegistryID = p.SrcRegistry.ID diff --git a/src/controller/replication/transfer/chart/transfer.go b/src/controller/replication/transfer/chart/transfer.go index 608119bb5..46d8e742b 100644 --- a/src/controller/replication/transfer/chart/transfer.go +++ b/src/controller/replication/transfer/chart/transfer.go @@ -49,7 +49,7 @@ type transfer struct { dst adapter.ChartRegistry } -func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error { +func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { // initialize if err := t.initialize(src, dst); err != nil { return err @@ -78,7 +78,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error { version: dst.Metadata.Artifacts[0].Tags[0], } // copy the chart from source registry to the destination - return 
t.copy(srcChart, dstChart, dst.Override) + return t.copy(srcChart, dstChart, dst.Override, speed) } func (t *transfer) initialize(src, dst *model.Resource) error { @@ -129,7 +129,7 @@ func (t *transfer) shouldStop() bool { return isStopped } -func (t *transfer) copy(src, dst *chart, override bool) error { +func (t *transfer) copy(src, dst *chart, override bool, speed int32) error { if t.shouldStop() { return nil } @@ -160,6 +160,10 @@ func (t *transfer) copy(src, dst *chart, override bool) error { t.logger.Errorf("failed to download the chart %s:%s: %v", src.name, src.version, err) return err } + if speed > 0 { + t.logger.Infof("limit network speed at %d kb/s", speed) + chart = trans.NewReader(chart, speed) + } defer chart.Close() if err = t.dst.UploadChart(dst.name, dst.version, chart); err != nil { diff --git a/src/controller/replication/transfer/chart/transfer_test.go b/src/controller/replication/transfer/chart/transfer_test.go index d616eb8a2..74986998a 100644 --- a/src/controller/replication/transfer/chart/transfer_test.go +++ b/src/controller/replication/transfer/chart/transfer_test.go @@ -96,7 +96,7 @@ func TestCopy(t *testing.T) { name: "dest/harbor", version: "0.2.0", } - err := transfer.copy(src, dst, true) + err := transfer.copy(src, dst, true, 0) assert.Nil(t, err) } diff --git a/src/controller/replication/transfer/image/transfer.go b/src/controller/replication/transfer/image/transfer.go index fd0bee376..8441a7156 100644 --- a/src/controller/replication/transfer/image/transfer.go +++ b/src/controller/replication/transfer/image/transfer.go @@ -69,9 +69,10 @@ type transfer struct { isStopped trans.StopFunc src adapter.ArtifactRegistry dst adapter.ArtifactRegistry + speed int32 } -func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error { +func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { // initialize if err := t.initialize(src, dst); err != nil { return err @@ -88,7 +89,7 @@ func (t *transfer) 
Transfer(src *model.Resource, dst *model.Resource) error { } // copy the repository from source registry to the destination - return t.copy(t.convert(src), t.convert(dst), dst.Override) + return t.copy(t.convert(src), t.convert(dst), dst.Override, speed) } func (t *transfer) convert(resource *model.Resource) *repository { @@ -161,14 +162,18 @@ func (t *transfer) shouldStop() bool { return isStopped } -func (t *transfer) copy(src *repository, dst *repository, override bool) error { +func (t *transfer) copy(src *repository, dst *repository, override bool, speed int32) error { srcRepo := src.repository dstRepo := dst.repository t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...", srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) + if speed > 0 { + t.logger.Infof("limit network speed at %d kb/s", speed) + } + var err error for i := range src.tags { - if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override); e != nil { + if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, speed); e != nil { if e == errStopped { return nil } @@ -187,7 +192,7 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error { return nil } -func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool) error { +func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, speed int32) error { t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...", srcRepo, srcRef, dstRepo, dstRef) // pull the manifest from the source registry @@ -221,7 +226,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid // copy contents between the source and destination registries for _, content := range manifest.References() { - if err = t.copyContent(content, srcRepo, dstRepo); err != nil { + if err = t.copyContent(content, srcRepo, dstRepo, speed); err != nil { return err } } @@ -237,7 
+242,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid } // copy the content from source registry to destination according to its media type -func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string) error { +func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, speed int32) error { digest := content.Digest.String() switch content.MediaType { // when the media type of pulled manifest is index, @@ -246,7 +251,7 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo v1.MediaTypeImageManifest, schema2.MediaTypeManifest, schema1.MediaTypeSignedManifest, schema1.MediaTypeManifest: // as using digest as the reference, so set the override to true directly - return t.copyArtifact(srcRepo, digest, dstRepo, digest, true) + return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, speed) // handle foreign layer case schema2.MediaTypeForeignLayer: t.logger.Infof("the layer %s is a foreign layer, skip", digest) @@ -255,15 +260,15 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo // the media type of the layer or config can be "application/octet-stream", // schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig default: - return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size) + return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, speed) } } -func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error { +func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { var err error for i, backoff := 1, 2*time.Second; i <= retry; i, backoff = i+1, backoff*2 { t.logger.Infof("copying the blob %s(the %dth running)...", digest, i) - if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor); err == nil { + if err = t.copyBlob(srcRepo, dstRepo, digest, 
sizeFromDescriptor, speed); err == nil { t.logger.Infof("copy the blob %s completed", digest) return nil } @@ -278,7 +283,7 @@ func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDe // copy the layer or artifact config from the source registry to destination // the size parameter is taken from manifests. -func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64) error { +func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { if t.shouldStop() { return errStopped } @@ -311,6 +316,9 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor t.logger.Errorf("failed to pulling the blob %s: %v", digest, err) return err } + if speed > 0 { + data = trans.NewReader(data, speed) + } defer data.Close() // get size 0 from PullBlob, use size from distribution.Descriptor instead. if size == 0 { @@ -318,6 +326,8 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor t.logger.Debugf("the blob size from remote registry is 0, use size %d from manifests instead", size) } + t.logger.Debugf("the blob size is %d bytes", size) + if err = t.dst.PushBlob(dstRepo, digest, size, data); err != nil { t.logger.Errorf("failed to pushing the blob %s, size %d: %v", digest, size, err) return err diff --git a/src/controller/replication/transfer/image/transfer_test.go b/src/controller/replication/transfer/image/transfer_test.go index edc44d1f6..0859b83ef 100644 --- a/src/controller/replication/transfer/image/transfer_test.go +++ b/src/controller/replication/transfer/image/transfer_test.go @@ -144,8 +144,7 @@ func TestCopy(t *testing.T) { repository: "destination", tags: []string{"b1", "b2"}, } - override := true - err := tr.copy(src, dst, override) + err := tr.copy(src, dst, true, 0) require.Nil(t, err) } diff --git a/src/controller/replication/transfer/iothrottler.go b/src/controller/replication/transfer/iothrottler.go new file mode 100644 
index 000000000..354c9972a --- /dev/null +++ b/src/controller/replication/transfer/iothrottler.go @@ -0,0 +1,48 @@ +package transfer + +import ( + "fmt" + "io" + "time" + + "golang.org/x/time/rate" +) + +type reader struct { + reader io.ReadCloser + limiter *rate.Limiter +} + +type RateOpts struct { + Rate float64 +} + +const KBRATE = 1024 / 8 + +// NewReader returns an io.ReadCloser wrapping r whose reads are limited to kb kilobits per second +func NewReader(r io.ReadCloser, kb int32) io.ReadCloser { + l := rate.NewLimiter(rate.Limit(kb*KBRATE), 1000*1024) + return &reader{ + reader: r, + limiter: l, + } +} + +func (r *reader) Read(buf []byte) (int, error) { + n, err := r.reader.Read(buf) + if n <= 0 { + return n, err + } + now := time.Now() + rv := r.limiter.ReserveN(now, n) + if !rv.OK() { + return 0, fmt.Errorf("exceeds limiter's burst") + } + delay := rv.DelayFrom(now) + time.Sleep(delay) + return n, err +} + +func (r *reader) Close() error { + return r.reader.Close() +} diff --git a/src/controller/replication/transfer/transfer.go b/src/controller/replication/transfer/transfer.go index ab7788834..a446d5e28 100644 --- a/src/controller/replication/transfer/transfer.go +++ b/src/controller/replication/transfer/transfer.go @@ -34,7 +34,7 @@ type Factory func(Logger, StopFunc) (Transfer, error) // Transfer defines an interface used to transfer the source // resource to the destination type Transfer interface { - Transfer(src *model.Resource, dst *model.Resource) error + Transfer(src *model.Resource, dst *model.Resource, speed int32) error } // Logger defines an interface for logging diff --git a/src/jobservice/job/impl/replication/replication.go b/src/jobservice/job/impl/replication/replication.go index bf6704b49..24c260197 100644 --- a/src/jobservice/job/impl/replication/replication.go +++ b/src/jobservice/job/impl/replication/replication.go @@ -56,7 +56,7 @@ func (r *Replication) Validate(params job.Parameters) error { func (r *Replication) Run(ctx job.Context, params job.Parameters) error { logger := 
ctx.GetLogger() - src, dst, err := parseParams(params) + src, dst, speed, err := parseParams(params) if err != nil { logger.Errorf("failed to parse parameters: %v", err) return err @@ -81,19 +81,38 @@ func (r *Replication) Run(ctx job.Context, params job.Parameters) error { return err } - return trans.Transfer(src, dst) + return trans.Transfer(src, dst, speed) } -func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, error) { +func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, int32, error) { src := &model.Resource{} if err := parseParam(params, "src_resource", src); err != nil { - return nil, nil, err + return nil, nil, 0, err } dst := &model.Resource{} if err := parseParam(params, "dst_resource", dst); err != nil { - return nil, nil, err + return nil, nil, 0, err } - return src, dst, nil + var speed int32 = 0 + value, exist := params["speed"] + if !exist { + speed = 0 + } else { + if s, ok := value.(int32); ok { + speed = s + } else { + if s, ok := value.(int); ok { + speed = int32(s) + } else { + if s, ok := value.(float64); ok { + speed = int32(s) + } else { + return nil, nil, 0, fmt.Errorf("the value of speed isn't integer (%T)", value) + } + } + } + } + return src, dst, speed, nil } func parseParam(params map[string]interface{}, name string, v interface{}) error { diff --git a/src/jobservice/job/impl/replication/replication_test.go b/src/jobservice/job/impl/replication/replication_test.go index 6ee096f73..8326ef482 100644 --- a/src/jobservice/job/impl/replication/replication_test.go +++ b/src/jobservice/job/impl/replication/replication_test.go @@ -53,10 +53,11 @@ func TestParseParams(t *testing.T) { "src_resource": `{"type":"chart"}`, "dst_resource": `{"type":"chart"}`, } - res, dst, err := parseParams(params) + res, dst, speed, err := parseParams(params) require.Nil(t, err) assert.Equal(t, "chart", string(res.Type)) assert.Equal(t, "chart", string(dst.Type)) + assert.Equal(t, int32(0), speed) } func 
TestMaxFails(t *testing.T) { @@ -82,7 +83,7 @@ var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Tr type fakedTransfer struct{} -func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource) error { +func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { transferred = true return nil } diff --git a/src/pkg/replication/model/model.go b/src/pkg/replication/model/model.go index 688ee0a22..6db252d76 100644 --- a/src/pkg/replication/model/model.go +++ b/src/pkg/replication/model/model.go @@ -41,6 +41,7 @@ type Policy struct { ReplicateDeletion bool `orm:"column(replicate_deletion)"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"` UpdateTime time.Time `orm:"column(update_time);auto_now"` + Speed int32 `orm:"column(speed_kb)"` } // TableName set table name for ORM diff --git a/src/server/v2.0/handler/replication.go b/src/server/v2.0/handler/replication.go index d5a385915..173caa0d0 100644 --- a/src/server/v2.0/handler/replication.go +++ b/src/server/v2.0/handler/replication.go @@ -101,6 +101,12 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope } } } + if params.Policy.Speed != nil { + if *params.Policy.Speed < 0 { + *params.Policy.Speed = 0 + } + policy.Speed = *params.Policy.Speed + } id, err := r.ctl.CreatePolicy(ctx, policy) if err != nil { return r.SendError(ctx, err) @@ -158,6 +164,12 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope } } } + if params.Policy.Speed != nil { + if *params.Policy.Speed < 0 { + *params.Policy.Speed = 0 + } + policy.Speed = *params.Policy.Speed + } if err := r.ctl.UpdatePolicy(ctx, policy); err != nil { return r.SendError(ctx, err) } @@ -414,6 +426,7 @@ func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPol Name: policy.Name, Override: policy.Override, ReplicateDeletion: policy.ReplicateDeletion, + Speed: &policy.Speed, 
UpdateTime: strfmt.DateTime(policy.UpdateTime), } if policy.SrcRegistry != nil {