feat: introduce the copy by chunk for replication (#17602)

1. Add sql migration to alter replication policy table
2. Implement the PullBlobChunk and PushBlobChunk for the underlying v2 registry client
3. Update image transfer logic to support copy by chunk
4. Update the replication policy API handler

Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
Chlins Zhang 2022-11-01 11:19:17 +08:00 committed by GitHub
parent b91a97dd62
commit c330b8c63a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 655 additions and 249 deletions

View File

@ -6959,6 +6959,10 @@ definitions:
format: int32
description: speed limit for each task
x-isnullable: true # make this field optional to keep backward compatibility
copy_by_chunk:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
ReplicationTrigger:
type: object
properties:
@ -7116,6 +7120,11 @@ definitions:
description: The triggers that the registry supports
items:
type: string
supported_copy_by_chunk:
type: boolean
description: Whether the registry supports copy by chunk.
x-omitempty: true
x-isnullable: true
RegistryProviderInfo:
type: object
description: The registry provider info contains the base info and capability declarations of the registry provider

View File

@ -0,0 +1 @@
-- Add the copy_by_chunk flag to replication_policy so a policy can opt in to
-- chunked blob replication; IF NOT EXISTS keeps this migration idempotent.
ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS copy_by_chunk boolean;

View File

@ -208,13 +208,13 @@ type abstractorTestSuite struct {
suite.Suite
argMgr *tart.Manager
blobMgr *tblob.Manager
regCli *registry.FakeClient
regCli *registry.Client
abstractor *abstractor
processor *tpro.Processor
}
func (a *abstractorTestSuite) SetupTest() {
a.regCli = &registry.FakeClient{}
a.regCli = &registry.Client{}
a.argMgr = &tart.Manager{}
a.blobMgr = &tblob.Manager{}
a.abstractor = &abstractor{
@ -236,7 +236,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV1Manifest() {
{Size: 10},
{Size: 20},
}, nil)
a.regCli.On("PullManifest").Return(manifest, "", nil)
a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil)
artifact := &artifact.Artifact{
ID: 1,
}
@ -252,7 +252,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV1Manifest() {
func (a *abstractorTestSuite) TestAbstractMetadataOfV2Manifest() {
manifest, _, err := distribution.UnmarshalManifest(schema2.MediaTypeManifest, []byte(v2Manifest))
a.Require().Nil(err)
a.regCli.On("PullManifest").Return(manifest, "", nil)
a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil)
artifact := &artifact.Artifact{
ID: 1,
}
@ -271,7 +271,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV2Manifest() {
func (a *abstractorTestSuite) TestAbstractMetadataOfIndex() {
manifest, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageIndex, []byte(index))
a.Require().Nil(err)
a.regCli.On("PullManifest").Return(manifest, "", nil)
a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil)
a.argMgr.On("GetByDigest", mock.Anything, mock.Anything, mock.Anything).Return(&artifact.Artifact{
ID: 2,
Size: 10,
@ -301,7 +301,7 @@ func (u *unknownManifest) Payload() (mediaType string, payload []byte, err error
// unknown
func (a *abstractorTestSuite) TestAbstractMetadataOfUnsupported() {
a.regCli.On("PullManifest").Return(&unknownManifest{}, "", nil)
a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(&unknownManifest{}, "", nil)
artifact := &artifact.Artifact{
ID: 1,
}

View File

@ -26,6 +26,7 @@ import (
"github.com/goharbor/harbor/src/pkg/artifact"
"github.com/goharbor/harbor/src/pkg/distribution"
"github.com/goharbor/harbor/src/testing/mock"
reg "github.com/goharbor/harbor/src/testing/pkg/registry"
)
@ -172,12 +173,12 @@ var (
// v1alpha1TestSuite is a test suite of testing v1alpha1 parser
type v1alpha1TestSuite struct {
suite.Suite
regCli *reg.FakeClient
regCli *reg.Client
v1alpha1Parser *v1alpha1Parser
}
func (p *v1alpha1TestSuite) SetupTest() {
p.regCli = &reg.FakeClient{}
p.regCli = &reg.Client{}
p.v1alpha1Parser = &v1alpha1Parser{
regCli: p.regCli,
}
@ -196,7 +197,7 @@ func (p *v1alpha1TestSuite) TestParse() {
art := &artifact.Artifact{ManifestMediaType: manifestMediaType, ExtraAttrs: metadata}
blob := ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(ormbIcon)))
p.regCli.On("PullBlob").Return(0, blob, nil)
p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil)
err = p.v1alpha1Parser.Parse(nil, art, content)
p.Require().Nil(err)
p.Len(art.ExtraAttrs, 12)
@ -219,7 +220,7 @@ func (p *v1alpha1TestSuite) TestParse() {
art = &artifact.Artifact{ManifestMediaType: manifestMediaType, ExtraAttrs: metadata}
blob = ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(ormbIcon)))
p.regCli.On("PullBlob").Return(0, blob, nil)
p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil)
err = p.v1alpha1Parser.Parse(nil, art, content)
p.Require().Nil(err)
p.Len(art.ExtraAttrs, 13)

View File

@ -73,7 +73,7 @@ type controllerTestSuite struct {
labelMgr *label.Manager
abstractor *fakeAbstractor
immutableMtr *immutable.FakeMatcher
regCli *registry.FakeClient
regCli *registry.Client
accMgr *accessory.Manager
}
@ -87,7 +87,7 @@ func (c *controllerTestSuite) SetupTest() {
c.abstractor = &fakeAbstractor{}
c.immutableMtr = &immutable.FakeMatcher{}
c.accMgr = &accessorytesting.Manager{}
c.regCli = &registry.FakeClient{}
c.regCli = &registry.Client{}
c.ctl = &controller{
repoMgr: c.repoMgr,
artMgr: c.artMgr,
@ -562,7 +562,7 @@ func (c *controllerTestSuite) TestCopy() {
}, nil)
c.abstractor.On("AbstractMetadata").Return(nil)
c.artMgr.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
c.regCli.On("Copy").Return(nil)
c.regCli.On("Copy", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
c.tagCtl.On("Ensure").Return(nil)
c.accMgr.On("Ensure", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
_, err := c.ctl.Copy(orm.NewContext(nil, &ormtesting.FakeOrmer{}), "library/hello-world", "latest", "library/hello-world2")

View File

@ -19,10 +19,11 @@ import (
"strings"
"testing"
"github.com/opencontainers/image-spec/specs-go/v1"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/suite"
"github.com/goharbor/harbor/src/pkg/artifact"
"github.com/goharbor/harbor/src/testing/mock"
"github.com/goharbor/harbor/src/testing/pkg/registry"
)
@ -125,11 +126,11 @@ const (
type manifestTestSuite struct {
suite.Suite
processor *ManifestProcessor
regCli *registry.FakeClient
regCli *registry.Client
}
func (m *manifestTestSuite) SetupTest() {
m.regCli = &registry.FakeClient{}
m.regCli = &registry.Client{}
m.processor = &ManifestProcessor{
RegCli: m.regCli,
}
@ -139,7 +140,7 @@ func (m *manifestTestSuite) TestAbstractMetadata() {
// abstract all properties
art := &artifact.Artifact{}
m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil)
m.processor.AbstractMetadata(nil, art, []byte(manifest))
m.Len(art.ExtraAttrs, 9)
@ -149,14 +150,14 @@ func (m *manifestTestSuite) TestAbstractMetadata() {
// abstract the specified properties
m.processor.properties = []string{"os"}
art = &artifact.Artifact{}
m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil)
m.processor.AbstractMetadata(nil, art, []byte(manifest))
m.Require().Len(art.ExtraAttrs, 1)
m.Equal("linux", art.ExtraAttrs["os"])
}
func (m *manifestTestSuite) TestUnmarshalConfig() {
m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil)
config := &v1.Image{}
err := m.processor.UnmarshalConfig(nil, "library/hello-world", []byte(manifest), config)
m.Require().Nil(err)

View File

@ -28,6 +28,7 @@ import (
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/artifact"
chartserver "github.com/goharbor/harbor/src/pkg/chart"
"github.com/goharbor/harbor/src/testing/mock"
"github.com/goharbor/harbor/src/testing/pkg/chart"
"github.com/goharbor/harbor/src/testing/pkg/registry"
)
@ -62,12 +63,12 @@ var (
type processorTestSuite struct {
suite.Suite
processor *processor
regCli *registry.FakeClient
regCli *registry.Client
chartOptr *chart.FakeOpertaor
}
func (p *processorTestSuite) SetupTest() {
p.regCli = &registry.FakeClient{}
p.regCli = &registry.Client{}
p.chartOptr = &chart.FakeOpertaor{}
p.processor = &processor{
chartOperator: p.chartOptr,
@ -103,8 +104,8 @@ func (p *processorTestSuite) TestAbstractAddition() {
artifact := &artifact.Artifact{}
manifest, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageManifest, []byte(chartManifest))
p.Require().Nil(err)
p.regCli.On("PullManifest").Return(manifest, "", nil)
p.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(chartYaml)), nil)
p.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil)
p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(chartYaml)), nil)
p.chartOptr.On("GetDetails").Return(chartDetails, nil)
// values.yaml

View File

@ -25,17 +25,18 @@ import (
"github.com/goharbor/harbor/src/controller/artifact/processor/base"
"github.com/goharbor/harbor/src/pkg/artifact"
"github.com/goharbor/harbor/src/testing/mock"
"github.com/goharbor/harbor/src/testing/pkg/registry"
)
type processorTestSuite struct {
suite.Suite
processor *processor
regCli *registry.FakeClient
regCli *registry.Client
}
func (p *processorTestSuite) SetupTest() {
p.regCli = &registry.FakeClient{}
p.regCli = &registry.Client{}
p.processor = &processor{
manifestProcessor: &base.ManifestProcessor{
RegCli: p.regCli,
@ -93,8 +94,8 @@ func (p *processorTestSuite) TestAbstractMetadata() {
}
mani, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageManifest, []byte(manifest))
p.Require().Nil(err)
p.regCli.On("PullManifest").Return(mani, "", nil)
p.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil)
p.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(mani, "", nil)
p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil)
err = p.processor.AbstractMetadata(nil, art, nil)
p.Require().Nil(err)
p.Len(art.ExtraAttrs, 7)

View File

@ -122,11 +122,11 @@ type defaultProcessorTestSuite struct {
suite.Suite
processor *defaultProcessor
parser *parser.Parser
regCli *registry.FakeClient
regCli *registry.Client
}
func (d *defaultProcessorTestSuite) SetupTest() {
d.regCli = &registry.FakeClient{}
d.regCli = &registry.Client{}
d.processor = &defaultProcessor{
regCli: d.regCli,
}
@ -179,7 +179,7 @@ func (d *defaultProcessorTestSuite) TestAbstractMetadata() {
configBlob := ioutil.NopCloser(strings.NewReader(ormbConfig))
art := &artifact.Artifact{ManifestMediaType: manifestMediaType}
d.regCli.On("PullBlob").Return(0, configBlob, nil)
d.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), configBlob, nil)
d.parser.On("Parse", context.TODO(), mock.AnythingOfType("*artifact.Artifact"), mock.AnythingOfType("[]byte")).Return(nil)
err = d.processor.AbstractMetadata(nil, art, content)
d.Require().Nil(err)

View File

@ -131,18 +131,18 @@ var (
type manifestV2ProcessorTestSuite struct {
suite.Suite
processor *manifestV2Processor
regCli *registry.FakeClient
regCli *registry.Client
}
func (m *manifestV2ProcessorTestSuite) SetupTest() {
m.regCli = &registry.FakeClient{}
m.regCli = &registry.Client{}
m.processor = &manifestV2Processor{}
m.processor.ManifestProcessor = &base.ManifestProcessor{RegCli: m.regCli}
}
func (m *manifestV2ProcessorTestSuite) TestAbstractMetadata() {
artifact := &artifact.Artifact{}
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, ioutil.NopCloser(bytes.NewReader([]byte(config))), nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(bytes.NewReader([]byte(config))), nil)
err := m.processor.AbstractMetadata(nil, artifact, []byte(manifest))
m.Require().Nil(err)
m.NotNil(artifact.ExtraAttrs["created"])
@ -162,8 +162,8 @@ func (m *manifestV2ProcessorTestSuite) TestAbstractAddition() {
artifact := &artifact.Artifact{}
manifest, _, err := distribution.UnmarshalManifest(schema2.MediaTypeManifest, []byte(manifest))
m.Require().Nil(err)
m.regCli.On("PullManifest").Return(manifest, "", nil)
m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil)
m.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil)
addition, err := m.processor.AbstractAddition(nil, artifact, AdditionTypeBuildHistory)
m.Require().Nil(err)
m.Equal("application/json; charset=utf-8", addition.ContentType)

View File

@ -113,18 +113,18 @@ var (
type WASMProcessorTestSuite struct {
suite.Suite
processor *Processor
regCli *registry.FakeClient
regCli *registry.Client
}
func (m *WASMProcessorTestSuite) SetupTest() {
m.regCli = &registry.FakeClient{}
m.regCli = &registry.Client{}
m.processor = &Processor{}
m.processor.ManifestProcessor = &base.ManifestProcessor{RegCli: m.regCli}
}
func (m *WASMProcessorTestSuite) TestAbstractMetadataForAnnotationFashion() {
artifact := &artifact.Artifact{}
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, ioutil.NopCloser(bytes.NewReader([]byte(annnotated_config))), nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(bytes.NewReader([]byte(annnotated_config))), nil)
err := m.processor.AbstractMetadata(nil, artifact, []byte(annnotated_manifest))
m.Require().Nil(err)
m.NotNil(artifact.ExtraAttrs["created"])
@ -157,8 +157,8 @@ func (m *WASMProcessorTestSuite) TestAbstractAdditionForAnnotationFashion() {
err = json.Unmarshal([]byte(annnotated_manifest), &manifest)
deserializedManifest, err := schema2.FromStruct(manifest)
m.Require().Nil(err)
m.regCli.On("PullManifest").Return(deserializedManifest, "", nil)
m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(annnotated_config)), nil)
m.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(deserializedManifest, "", nil)
m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(annnotated_config)), nil)
addition, err := m.processor.AbstractAddition(nil, artifact, AdditionTypeBuildHistory)
m.Require().Nil(err)
m.Equal("application/json; charset=utf-8", addition.ContentType)

View File

@ -38,12 +38,12 @@ type controllerTestSuite struct {
suite.Suite
controller Controller
argMgr *artifact_testing.Manager
regCli *registry.FakeClient
regCli *registry.Client
}
func (c *controllerTestSuite) SetupTest() {
c.argMgr = &artifact_testing.Manager{}
c.regCli = &registry.FakeClient{}
c.regCli = &registry.Client{}
c.controller = &controller{
artMgr: c.argMgr,
regCli: c.regCli,
@ -68,7 +68,7 @@ func (c *controllerTestSuite) TestGet() {
},
}, nil)
blob := ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(iconStr)))
c.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, blob, nil)
c.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil)
icon, err := c.controller.Get(nil, "sha256:364feec11702f7ee079ba81da723438373afb0921f3646e9e5015406ee150986")
c.Require().Nil(err)
c.Require().NotNil(icon)

View File

@ -65,13 +65,13 @@ func (a *artifactControllerMock) GetByReference(ctx context.Context, repository,
type localHelperTestSuite struct {
suite.Suite
registryClient *testregistry.FakeClient
registryClient *testregistry.Client
local *localHelper
artCtl *artifactControllerMock
}
func (lh *localHelperTestSuite) SetupTest() {
lh.registryClient = &testregistry.FakeClient{}
lh.registryClient = &testregistry.Client{}
lh.artCtl = &artifactControllerMock{}
lh.local = &localHelper{registry: lh.registryClient, artifactCtl: lh.artCtl}
@ -82,7 +82,7 @@ func (lh *localHelperTestSuite) TestBlobExist_False() {
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
ctx := context.Background()
lh.registryClient.On("BlobExist").Return(false, nil)
lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil)
exist, err := lh.local.BlobExist(ctx, art)
lh.Require().Nil(err)
lh.Assert().Equal(false, exist)
@ -92,7 +92,7 @@ func (lh *localHelperTestSuite) TestBlobExist_True() {
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
art := lib.ArtifactInfo{Repository: repo, Digest: dig}
ctx := context.Background()
lh.registryClient.On("BlobExist").Return(true, nil)
lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil)
exist, err := lh.local.BlobExist(ctx, art)
lh.Require().Nil(err)
lh.Assert().Equal(true, exist)
@ -100,7 +100,7 @@ func (lh *localHelperTestSuite) TestBlobExist_True() {
func (lh *localHelperTestSuite) TestPushManifest() {
dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
lh.registryClient.On("PushManifest").Return(dig, nil)
lh.registryClient.On("PushManifest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(dig, nil)
manifest := &mockManifest{}
var ct string
manifest.Mock.On("Payload").Return(ct, []byte("example"), nil)
@ -117,7 +117,7 @@ func (lh *localHelperTestSuite) TestCheckDependencies_Fail() {
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
}
manifest.On("References").Return(refs)
lh.registryClient.On("BlobExist").Return(false, nil)
lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil)
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
lh.Assert().Equal(len(ret), 2)
}
@ -130,7 +130,7 @@ func (lh *localHelperTestSuite) TestCheckDependencies_Suc() {
{Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"},
}
manifest.On("References").Return(refs)
lh.registryClient.On("BlobExist").Return(true, nil)
lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil)
ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest)
lh.Assert().Equal(len(ret), 0)
}

View File

@ -283,6 +283,7 @@ func process(info *model.RegistryInfo) *model.RegistryInfo {
Values: values,
})
in.SupportedResourceFilters = filters
in.SupportedCopyByChunk = info.SupportedCopyByChunk
return in
}

View File

@ -92,7 +92,7 @@ func (c *copyFlow) Run(ctx context.Context) error {
return err
}
return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed)
return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed, c.policy.CopyByChunk)
}
func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
@ -103,7 +103,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
return execution.Status == job.StoppedStatus.String(), nil
}
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error {
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32, copyByChunk bool) error {
var taskCnt int
defer func() {
// if no task be created, mark execution done.
@ -137,9 +137,10 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
JobKind: job.KindGeneric,
},
Parameters: map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
"copy_by_chunk": copyByChunk,
},
}

View File

@ -272,6 +272,36 @@ func (_m *mockAdapter) PullBlob(repository string, digest string) (int64, io.Rea
return r0, r1, r2
}
// PullBlobChunk provides a mock function with given fields: repository, digest, blobSize, start, end
func (_m *mockAdapter) PullBlobChunk(repository string, digest string, blobSize int64, start int64, end int64) (int64, io.ReadCloser, error) {
	ret := _m.Called(repository, digest, blobSize, start, end)

	// first return value: resolve a registered callback, otherwise use the stored literal
	var size int64
	if fn, ok := ret.Get(0).(func(string, string, int64, int64, int64) int64); ok {
		size = fn(repository, digest, blobSize, start, end)
	} else {
		size = ret.Get(0).(int64)
	}

	// second return value: a nil stored value stays nil rather than panicking on assertion
	var body io.ReadCloser
	if fn, ok := ret.Get(1).(func(string, string, int64, int64, int64) io.ReadCloser); ok {
		body = fn(repository, digest, blobSize, start, end)
	} else if ret.Get(1) != nil {
		body = ret.Get(1).(io.ReadCloser)
	}

	// third return value: callback form takes precedence over the stored error
	var err error
	if fn, ok := ret.Get(2).(func(string, string, int64, int64, int64) error); ok {
		err = fn(repository, digest, blobSize, start, end)
	} else {
		err = ret.Error(2)
	}

	return size, body, err
}
// PullManifest provides a mock function with given fields: repository, reference, accepttedMediaTypes
func (_m *mockAdapter) PullManifest(repository string, reference string, accepttedMediaTypes ...string) (distribution.Manifest, string, error) {
_va := make([]interface{}, len(accepttedMediaTypes))
@ -323,6 +353,34 @@ func (_m *mockAdapter) PushBlob(repository string, digest string, size int64, bl
return r0
}
// PushBlobChunk provides a mock function with given fields: repository, digest, size, chunk, start, end, location
func (_m *mockAdapter) PushBlobChunk(repository string, digest string, size int64, chunk io.Reader, start int64, end int64, location string) (string, int64, error) {
	ret := _m.Called(repository, digest, size, chunk, start, end, location)

	// first return value: resolve a registered callback, otherwise use the stored literal
	var loc string
	if fn, ok := ret.Get(0).(func(string, string, int64, io.Reader, int64, int64, string) string); ok {
		loc = fn(repository, digest, size, chunk, start, end, location)
	} else {
		loc = ret.Get(0).(string)
	}

	// second return value: same callback-or-literal resolution for the written offset
	var written int64
	if fn, ok := ret.Get(1).(func(string, string, int64, io.Reader, int64, int64, string) int64); ok {
		written = fn(repository, digest, size, chunk, start, end, location)
	} else {
		written = ret.Get(1).(int64)
	}

	// third return value: callback form takes precedence over the stored error
	var err error
	if fn, ok := ret.Get(2).(func(string, string, int64, io.Reader, int64, int64, string) error); ok {
		err = fn(repository, digest, size, chunk, start, end, location)
	} else {
		err = ret.Error(2)
	}

	return loc, written, err
}
// PushManifest provides a mock function with given fields: repository, reference, mediaType, payload
func (_m *mockAdapter) PushManifest(repository string, reference string, mediaType string, payload []byte) (string, error) {
ret := _m.Called(repository, reference, mediaType, payload)

View File

@ -46,6 +46,7 @@ type Policy struct {
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
}
// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
@ -132,6 +133,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.CreationTime = policy.CreationTime
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
@ -176,6 +178,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
CreationTime: p.CreationTime,
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID

View File

@ -49,7 +49,7 @@ type transfer struct {
dst adapter.ChartRegistry
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, opts *trans.Options) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
@ -78,7 +78,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int3
version: dst.Metadata.Artifacts[0].Tags[0],
}
// copy the chart from source registry to the destination
return t.copy(srcChart, dstChart, dst.Override, speed)
return t.copy(srcChart, dstChart, dst.Override, opts)
}
func (t *transfer) initialize(src, dst *model.Resource) error {
@ -129,7 +129,7 @@ func (t *transfer) shouldStop() bool {
return isStopped
}
func (t *transfer) copy(src, dst *chart, override bool, speed int32) error {
func (t *transfer) copy(src, dst *chart, override bool, opts *trans.Options) error {
if t.shouldStop() {
return nil
}
@ -160,9 +160,9 @@ func (t *transfer) copy(src, dst *chart, override bool, speed int32) error {
t.logger.Errorf("failed to download the chart %s:%s: %v", src.name, src.version, err)
return err
}
if speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", speed)
chart = trans.NewReader(chart, speed)
if opts.Speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", opts.Speed)
chart = trans.NewReader(chart, opts.Speed)
}
defer chart.Close()

View File

@ -97,7 +97,7 @@ func TestCopy(t *testing.T) {
name: "dest/harbor",
version: "0.2.0",
}
err := transfer.copy(src, dst, true, 0)
err := transfer.copy(src, dst, true, trans.NewOptions())
assert.Nil(t, err)
}

View File

@ -36,15 +36,29 @@ import (
)
var (
retry int
errStopped = errors.New("stopped")
blobRetryCnt, chunkRetryCnt int
replicationChunkSize int64
errStopped = errors.New("stopped")
// default chunk size is 10MB
defaultChunkSize = 10 * 1024 * 1024
)
func init() {
retry, _ = strconv.Atoi(os.Getenv("COPY_BLOB_RETRY_COUNT"))
if retry <= 0 {
retry = 5
blobRetryCnt, _ = strconv.Atoi(os.Getenv("COPY_BLOB_RETRY_COUNT"))
if blobRetryCnt <= 0 {
blobRetryCnt = 5
}
chunkRetryCnt, _ = strconv.Atoi(os.Getenv("COPY_CHUNK_RETRY_COUNT"))
if chunkRetryCnt <= 0 {
chunkRetryCnt = 5
}
replicationChunkSize, _ = strconv.ParseInt(os.Getenv("REPLICATION_CHUNK_SIZE"), 10, 64)
if replicationChunkSize <= 0 {
replicationChunkSize = int64(defaultChunkSize)
}
if err := trans.RegisterFactory(model.ResourceTypeImage, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err)
}
@ -70,10 +84,9 @@ type transfer struct {
isStopped trans.StopFunc
src adapter.ArtifactRegistry
dst adapter.ArtifactRegistry
speed int32
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, opts *trans.Options) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
@ -90,7 +103,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int3
}
// copy the repository from source registry to the destination
return t.copy(t.convert(src), t.convert(dst), dst.Override, speed)
return t.copy(t.convert(src), t.convert(dst), dst.Override, opts)
}
func (t *transfer) convert(resource *model.Resource) *repository {
@ -163,18 +176,18 @@ func (t *transfer) shouldStop() bool {
return isStopped
}
func (t *transfer) copy(src *repository, dst *repository, override bool, speed int32) error {
func (t *transfer) copy(src *repository, dst *repository, override bool, opts *trans.Options) error {
srcRepo := src.repository
dstRepo := dst.repository
t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
if speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", speed)
if opts.Speed > 0 {
t.logger.Infof("limit network speed at %d kb/s", opts.Speed)
}
var err error
for i := range src.tags {
if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, speed); e != nil {
if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, opts); e != nil {
if e == errStopped {
return nil
}
@ -193,7 +206,7 @@ func (t *transfer) copy(src *repository, dst *repository, override bool, speed i
return nil
}
func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, speed int32) error {
func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, opts *trans.Options) error {
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcRef, dstRepo, dstRef)
// pull the manifest from the source registry
@ -227,7 +240,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid
// copy contents between the source and destination registries
for _, content := range manifest.References() {
if err = t.copyContent(content, srcRepo, dstRepo, speed); err != nil {
if err = t.copyContent(content, srcRepo, dstRepo, opts); err != nil {
return err
}
}
@ -243,7 +256,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid
}
// copy the content from source registry to destination according to its media type
func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, speed int32) error {
func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, opts *trans.Options) error {
digest := content.Digest.String()
switch content.MediaType {
// when the media type of pulled manifest is index,
@ -252,7 +265,7 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo
v1.MediaTypeImageManifest, schema2.MediaTypeManifest,
schema1.MediaTypeSignedManifest, schema1.MediaTypeManifest:
// as using digest as the reference, so set the override to true directly
return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, speed)
return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, opts)
// handle foreign layer
case schema2.MediaTypeForeignLayer:
t.logger.Infof("the layer %s is a foreign layer, skip", digest)
@ -261,19 +274,24 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo
// the media type of the layer or config can be "application/octet-stream",
// schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig
default:
return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, speed)
if opts.CopyByChunk {
// copy by chunk
return t.copyChunkWithRetry(srcRepo, dstRepo, digest, content.Size, opts.Speed)
}
// copy by blob
return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, opts.Speed)
}
}
func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
var err error
for i, backoff := 1, 2*time.Second; i <= retry; i, backoff = i+1, backoff*2 {
for i, backoff := 1, 2*time.Second; i <= blobRetryCnt; i, backoff = i+1, backoff*2 {
t.logger.Infof("copying the blob %s(the %dth running)...", digest, i)
if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor, speed); err == nil {
t.logger.Infof("copy the blob %s completed", digest)
return nil
}
if i == retry || err == errStopped {
if i == blobRetryCnt || err == errStopped {
break
}
t.logger.Infof("will retry %v later", backoff)
@ -282,36 +300,80 @@ func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDe
return err
}
// copy the layer or artifact config from the source registry to destination
// the size parameter is taken from manifests.
func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
// copyChunkWithRetry copies one blob by chunk, retrying up to chunkRetryCnt
// times with exponential backoff. The start/end offsets and upload location
// persist across attempts so a retry resumes the chunked upload where the
// previous attempt left off rather than starting over.
func (t *transfer) copyChunkWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
	var (
		err      error
		location string
	)
	start, end := int64(-1), int64(-1)
	backoff := 2 * time.Second
	for attempt := 1; attempt <= chunkRetryCnt; attempt++ {
		t.logger.Infof("copying the blob %s by chunk(chunkSize: %d)(the %dth running)...", digest, replicationChunkSize, attempt)
		err = t.copyBlobByChunk(srcRepo, dstRepo, digest, sizeFromDescriptor, &start, &end, &location, speed)
		if err == nil {
			t.logger.Infof("copy the blob %s by chunk completed", digest)
			return nil
		}
		// give up on the last attempt, or immediately when the job was stopped
		if attempt == chunkRetryCnt || err == errStopped {
			break
		}
		t.logger.Infof("will retry %v later", backoff)
		time.Sleep(backoff)
		backoff *= 2
	}
	return err
}
// tryMountBlob try to check existence and mount, return true if mounted.
func (t *transfer) tryMountBlob(srcRepo, dstRepo, digest string) (bool, error) {
if t.shouldStop() {
return errStopped
return false, errStopped
}
exist, err := t.dst.BlobExist(dstRepo, digest)
if err != nil {
t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err)
return err
return false, err
}
if exist {
t.logger.Infof("the blob %s already exists on the destination registry, skip", digest)
return nil
// we think the blob is mounted if it is existed.
return true, nil
}
mount, repository, err := t.dst.CanBeMount(digest)
if err != nil {
t.logger.Errorf("failed to check whether the blob %s can be mounted on the destination registry: %v", digest, err)
return err
return false, err
}
if mount {
if err = t.dst.MountBlob(repository, digest, dstRepo); err != nil {
t.logger.Errorf("failed to mount the blob %s on the destination registry: %v", digest, err)
return err
return false, err
}
t.logger.Infof("the blob %s mounted from the repository %s on the destination registry directly", digest, repository)
return true, nil
}
return false, nil
}
// copy the layer or artifact config from the source registry to destination
// the size parameter is taken from manifests.
// It first tries to avoid the transfer entirely via tryMountBlob (existence
// check / cross-repository mount on the destination) and only falls back to a
// monolithic (single-request) copy when the blob could not be mounted.
func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
	mounted, err := t.tryMountBlob(srcRepo, dstRepo, digest)
	if err != nil {
		return err
	}
	// return earlier if it is mounted
	if mounted {
		return nil
	}
	return t.copyBlobByMonolithic(srcRepo, dstRepo, digest, sizeFromDescriptor, speed)
}
func (t *transfer) copyBlobByMonolithic(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error {
size, data, err := t.src.PullBlob(srcRepo, digest)
if err != nil {
t.logger.Errorf("failed to pulling the blob %s: %v", digest, err)
@ -333,6 +395,67 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor
t.logger.Errorf("failed to pushing the blob %s, size %d: %v", digest, size, err)
return err
}
return nil
}
// copyBlobByChunk copy blob by chunk with specified start and end range.
// The <range> refers to the byte range of the chunk, and MUST be inclusive on both ends. The first chunk's range MUST begin with 0.
// The start/end/location pointers are owned by the caller (copyChunkWithRetry)
// so that a failed run records how far it got and a retry can resume from the
// next byte instead of from offset 0.
func (t *transfer) copyBlobByChunk(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, start, end *int64, location *string, speed int32) error {
	// fallback to copy by monolithic if the blob size is equal or less than chunk size.
	if sizeFromDescriptor <= replicationChunkSize {
		return t.copyBlobByMonolithic(srcRepo, dstRepo, digest, sizeFromDescriptor, speed)
	}

	mounted, err := t.tryMountBlob(srcRepo, dstRepo, digest)
	if err != nil {
		return err
	}
	// return earlier if it is mounted.
	if mounted {
		return nil
	}

	// end range should equal (blobSize - 1)
	endRange := sizeFromDescriptor - 1
	for {
		// update the start and end for upload
		*start = *end + 1
		// since both ends are closed intervals, it is necessary to subtract one byte
		*end = *start + replicationChunkSize - 1
		if *end >= endRange {
			// last chunk may be shorter than replicationChunkSize
			*end = endRange
		}

		t.logger.Infof("copying the blob chunk: %d-%d/%d", *start, *end, sizeFromDescriptor)
		_, data, err := t.src.PullBlobChunk(srcRepo, digest, sizeFromDescriptor, *start, *end)
		if err != nil {
			t.logger.Errorf("failed to pulling the blob chunk: %d-%d/%d, error: %v", *start, *end, sizeFromDescriptor, err)
			return err
		}

		if speed > 0 {
			// NOTE(review): trans.NewReader appears to throttle the read rate
			// to `speed` — confirm against the trans package.
			data = trans.NewReader(data, speed)
		}

		// failureEnd will only be used for adjusting content range when issue happened during push the chunk.
		var failureEnd int64
		*location, failureEnd, err = t.dst.PushBlobChunk(dstRepo, digest, sizeFromDescriptor, data, *start, *end, *location)
		if err != nil {
			t.logger.Errorf("failed to pushing the blob chunk: %d-%d/%d, error: %v", *start, *end, sizeFromDescriptor, err)
			data.Close()
			// rewind *end to the offset the destination actually persisted so
			// the next retry resumes from the correct byte.
			*end = failureEnd
			return err
		}
		data.Close()

		t.logger.Infof("copy the blob chunk: %d-%d/%d completed", *start, *end, sizeFromDescriptor)
		// if the end equals (blobSize-1), that means it is last chunk, return if this is the last chunk
		if *end == endRange {
			break
		}
	}

	return nil
}

View File

@ -91,9 +91,16 @@ func (f *fakeRegistry) PullBlob(repository, digest string) (size int64, blob io.
r := ioutil.NopCloser(bytes.NewReader([]byte{'a'}))
return 1, r, nil
}
// PullBlobChunk always serves a single-byte blob regardless of the requested range.
func (f *fakeRegistry) PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error) {
	return 1, ioutil.NopCloser(bytes.NewReader([]byte{'a'})), nil
}
// PushBlob is a no-op fake that always reports success.
func (f *fakeRegistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
	return nil
}
// PushBlobChunk is a no-op fake: it accepts any chunk and returns an empty
// next-upload location with an end range of -1.
func (f *fakeRegistry) PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error) {
	return "", -1, nil
}
// DeleteTag is a no-op fake that always reports success.
func (f *fakeRegistry) DeleteTag(repository, tag string) error {
	return nil
}
@ -103,7 +110,6 @@ func (f *fakeRegistry) CanBeMount(digest string) (bool, string, error) {
// MountBlob is a no-op fake that always reports success.
func (f *fakeRegistry) MountBlob(srcRepository, digest, dstRepository string) error {
	return nil
}
// ListTags is a fake that reports no tags and no error.
func (f *fakeRegistry) ListTags(repository string) (tags []string, err error) {
	return nil, nil
}
@ -149,7 +155,28 @@ func TestCopy(t *testing.T) {
repository: "destination",
tags: []string{"b1", "b2"},
}
err := tr.copy(src, dst, true, 0)
err := tr.copy(src, dst, true, trans.NewOptions())
require.Nil(t, err)
}
// TestCopyByChunk verifies that copy succeeds when the copy-by-chunk
// option is enabled on the transfer.
func TestCopyByChunk(t *testing.T) {
	tr := &transfer{
		logger:    log.DefaultLogger(),
		isStopped: func() bool { return false },
		src:       &fakeRegistry{},
		dst:       &fakeRegistry{},
	}
	source := &repository{
		repository: "source",
		tags:       []string{"a1", "a2"},
	}
	destination := &repository{
		repository: "destination",
		tags:       []string{"b1", "b2"},
	}
	require.Nil(t, tr.copy(source, destination, true, trans.NewOptions(trans.WithCopyByChunk(true))))
}

View File

@ -0,0 +1,45 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
// Option mutates an Options value; used with NewOptions.
type Option func(*Options)

// Options holds the tunable settings for a transfer.
type Options struct {
	// Speed is the data transfer speed for replication, no limit by default.
	Speed int32
	// CopyByChunk defines whether need to copy the artifact blob by chunk, copy by whole blob by default.
	CopyByChunk bool
}

// NewOptions builds an Options value and applies every supplied Option to it.
func NewOptions(opts ...Option) *Options {
	options := new(Options)
	for _, apply := range opts {
		apply(options)
	}
	return options
}

// WithSpeed returns an Option that sets the transfer speed.
func WithSpeed(speed int32) Option {
	return func(options *Options) {
		options.Speed = speed
	}
}

// WithCopyByChunk returns an Option that toggles chunked blob copy.
func WithCopyByChunk(copyByChunk bool) Option {
	return func(options *Options) {
		options.CopyByChunk = copyByChunk
	}
}

View File

@ -0,0 +1,37 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestNewOptions covers the zero-value defaults and option application.
func TestNewOptions(t *testing.T) {
	// no options -> zero values
	defaults := NewOptions()
	assert.Equal(t, int32(0), defaults.Speed)
	assert.Equal(t, false, defaults.CopyByChunk)

	// both options applied
	configured := NewOptions(WithSpeed(1024), WithCopyByChunk(true))
	assert.Equal(t, int32(1024), configured.Speed)
	assert.Equal(t, true, configured.CopyByChunk)
}

View File

@ -34,7 +34,7 @@ type Factory func(Logger, StopFunc) (Transfer, error)
// Transfer defines an interface used to transfer the source
// resource to the destination
type Transfer interface {
Transfer(src *model.Resource, dst *model.Resource, speed int32) error
Transfer(src *model.Resource, dst *model.Resource, opts *Options) error
}
// Logger defines an interface for logging

View File

@ -55,7 +55,7 @@ func (r *Replication) Validate(params job.Parameters) error {
func (r *Replication) Run(ctx job.Context, params job.Parameters) error {
logger := ctx.GetLogger()
src, dst, speed, err := parseParams(params)
src, dst, opts, err := parseParams(params)
if err != nil {
logger.Errorf("failed to parse parameters: %v", err)
return err
@ -80,18 +80,20 @@ func (r *Replication) Run(ctx job.Context, params job.Parameters) error {
return err
}
return trans.Transfer(src, dst, speed)
return trans.Transfer(src, dst, opts)
}
func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, int32, error) {
func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, *transfer.Options, error) {
src := &model.Resource{}
if err := parseParam(params, "src_resource", src); err != nil {
return nil, nil, 0, err
return nil, nil, nil, err
}
dst := &model.Resource{}
if err := parseParam(params, "dst_resource", dst); err != nil {
return nil, nil, 0, err
return nil, nil, nil, err
}
var speed int32
value, exist := params["speed"]
if !exist {
@ -106,12 +108,25 @@ func parseParams(params map[string]interface{}) (*model.Resource, *model.Resourc
if s, ok := value.(float64); ok {
speed = int32(s)
} else {
return nil, nil, 0, fmt.Errorf("the value of speed isn't integer (%T)", value)
return nil, nil, nil, fmt.Errorf("the value of speed isn't integer (%T)", value)
}
}
}
}
return src, dst, speed, nil
var copyByChunk bool
value, exist = params["copy_by_chunk"]
if exist {
if boolVal, ok := value.(bool); ok {
copyByChunk = boolVal
}
}
opts := transfer.NewOptions(
transfer.WithSpeed(speed),
transfer.WithCopyByChunk(copyByChunk),
)
return src, dst, opts, nil
}
func parseParam(params map[string]interface{}, name string, v interface{}) error {

View File

@ -51,14 +51,17 @@ func TestParseParam(t *testing.T) {
func TestParseParams(t *testing.T) {
params := map[string]interface{}{
"src_resource": `{"type":"chart"}`,
"dst_resource": `{"type":"chart"}`,
"src_resource": `{"type":"chart"}`,
"dst_resource": `{"type":"chart"}`,
"speed": 1024,
"copy_by_chunk": true,
}
res, dst, speed, err := parseParams(params)
res, dst, opts, err := parseParams(params)
require.Nil(t, err)
assert.Equal(t, "chart", string(res.Type))
assert.Equal(t, "chart", string(dst.Type))
assert.Equal(t, int32(0), speed)
assert.Equal(t, int32(1024), opts.Speed)
assert.True(t, opts.CopyByChunk)
}
func TestMaxFails(t *testing.T) {
@ -84,7 +87,7 @@ var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Tr
type fakedTransfer struct{}
func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error {
func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, opts *transfer.Options) error {
transferred = true
return nil
}

View File

@ -60,6 +60,8 @@ type ArtifactRegistry interface {
DeleteManifest(repository, reference string) error // the "reference" can be "tag" or "digest", the function needs to handle both
BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error)
PushBlobChunk(repository, digest string, size int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error
MountBlob(srcRepository, digest, dstRepository string) (err error)
CanBeMount(digest string) (mount bool, repository string, err error) // check whether the blob can be mounted from the remote registry

View File

@ -116,6 +116,7 @@ func (a *Adapter) Info() (*model.RegistryInfo, error) {
model.TriggerTypeScheduled,
},
SupportedRepositoryPathComponentType: model.RepositoryPathComponentTypeAtLeastTwo,
SupportedCopyByChunk: true,
}
enabled, err := a.Client.ChartRegistryEnabled()

View File

@ -80,6 +80,7 @@ func TestInfo(t *testing.T) {
assert.Equal(t, 2, len(info.SupportedTriggers))
assert.Equal(t, 1, len(info.SupportedResourceTypes))
assert.Equal(t, model.ResourceTypeImage, info.SupportedResourceTypes[0])
assert.Equal(t, true, info.SupportedCopyByChunk)
server.Close()
}

View File

@ -139,6 +139,7 @@ type RegistryInfo struct {
SupportedResourceFilters []*FilterStyle `json:"supported_resource_filters"`
SupportedTriggers []string `json:"supported_triggers"`
SupportedRepositoryPathComponentType string `json:"supported_repository_path_component_type"` // how many path components are allowed in the repository name
SupportedCopyByChunk bool `json:"supported_copy_by_chunk,omitempty"`
}
// AdapterPattern provides base info and capability declarations of the registry

View File

@ -86,8 +86,12 @@ type Client interface {
BlobExist(repository, digest string) (exist bool, err error)
// PullBlob pulls the specified blob. The caller must close the returned "blob"
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
// PullBlobChunk pulls the specified blob, but by chunked
PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error)
// PushBlob pushes the specified blob
PushBlob(repository, digest string, size int64, blob io.Reader) error
// PushBlobChunk pushes the specified blob, but by chunked
PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error)
// MountBlob mounts the blob from the source repository
MountBlob(srcRepository, digest, dstRepository string) (err error)
// DeleteBlob deletes the specified blob
@ -371,6 +375,34 @@ func (c *client) PullBlob(repository, digest string) (int64, io.ReadCloser, erro
return size, resp.Body, nil
}
// PullBlobChunk pulls the specified blob, but by chunked, refer to https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pull for more details.
// The [start, end] byte range is inclusive on both ends. The caller must close
// the returned body.
func (c *client) PullBlobChunk(repository, digest string, blobSize int64, start, end int64) (int64, io.ReadCloser, error) {
	req, err := http.NewRequest(http.MethodGet, buildBlobURL(c.url, repository, digest), nil)
	if err != nil {
		return 0, nil, err
	}
	// request identity encoding so the returned bytes correspond to the raw
	// blob range — NOTE(review): presumably to avoid transport compression
	// altering the byte offsets; confirm.
	req.Header.Add("Accept-Encoding", "identity")
	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end))
	resp, err := c.do(req)
	if err != nil {
		return 0, nil, err
	}

	var size int64
	n := resp.Header.Get("Content-Length")
	// a missing Content-Length is acceptable; the size can be taken from the manifest instead
	if len(n) > 0 {
		size, err = strconv.ParseInt(n, 10, 64)
		if err != nil {
			defer resp.Body.Close()
			return 0, nil, err
		}
	}

	return size, resp.Body, nil
}
func (c *client) PushBlob(repository, digest string, size int64, blob io.Reader) error {
location, _, err := c.initiateBlobUpload(repository)
if err != nil {
@ -379,6 +411,90 @@ func (c *client) PushBlob(repository, digest string, size int64, blob io.Reader)
return c.monolithicBlobUpload(location, digest, size, blob)
}
// PushBlobChunk pushes the specified blob, but by chunked, refer to https://github.com/opencontainers/distribution-spec/blob/main/spec.md#push for more details.
// The [start, end] byte range is inclusive on both ends; start == 0 initiates a
// new upload session, and the last chunk (end == blobSize-1) is sent with PUT so
// the upload is completed without an extra finalizing request.
// It returns the location to use for the next chunk and the end offset that was
// actually accepted; on failure the returned end tells the caller from which
// byte (end+1) to resume.
func (c *client) PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (string, int64, error) {
	var err error
	// first chunk need to initialize blob upload location
	if start == 0 {
		location, _, err = c.initiateBlobUpload(repository)
		if err != nil {
			return location, end, err
		}
	}
	// the range is from 0 to (blobSize-1), so (end == blobSize-1) means it is last chunk
	lastChunk := end == blobSize-1
	// reqURL: do not shadow the imported net/url package
	reqURL, err := buildChunkBlobUploadURL(c.url, location, digest, lastChunk)
	if err != nil {
		return location, end, err
	}
	// use PUT instead of PATCH for last chunk which can reduce a final request
	method := http.MethodPatch
	if lastChunk {
		method = http.MethodPut
	}
	req, err := http.NewRequest(method, reqURL, chunk)
	if err != nil {
		return location, end, err
	}
	req.Header.Set("Content-Length", fmt.Sprintf("%d", end-start+1))
	req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", start, end))
	resp, err := c.do(req)
	if err != nil {
		// if push chunk error, we should query the upload progress for new location and end range.
		newLocation, newEnd, err1 := c.getUploadStatus(location)
		if err1 == nil {
			return newLocation, newEnd, err
		}
		// keep the original push error instead of discarding it: both failures
		// matter when diagnosing why the chunk could not be uploaded.
		// end should return start-1 to re-push this chunk
		return location, start - 1, fmt.Errorf("failed to push chunk: %v, failed to get upload status: %v", err, err1)
	}
	defer resp.Body.Close()

	// return the location for next chunk upload
	return resp.Header.Get("Location"), end, nil
}
// getUploadStatus queries the progress of an in-flight blob upload session by
// issuing a GET against the session location. It returns the (possibly new)
// session location and the end offset of the bytes the registry has received
// so far, parsed from the Range response header; -1 is returned for the end
// offset on any failure.
func (c *client) getUploadStatus(location string) (string, int64, error) {
	req, err := http.NewRequest(http.MethodGet, location, nil)
	if err != nil {
		return location, -1, err
	}
	resp, err := c.do(req)
	if err != nil {
		return location, -1, err
	}
	defer resp.Body.Close()
	_, end, err := parseContentRange(resp.Header.Get("Range"))
	if err != nil {
		return location, -1, err
	}
	return resp.Header.Get("Location"), end, nil
}
// parseContentRange splits a "start-end" range string into its two integer
// bounds. It returns (-1, -1, err) when the string is not exactly two
// dash-separated decimal numbers.
func parseContentRange(cr string) (int64, int64, error) {
	const want = 2
	parts := strings.Split(cr, "-")
	if len(parts) != want {
		return -1, -1, fmt.Errorf("invalid content range format, %s", cr)
	}
	bounds := make([]int64, 0, want)
	for _, p := range parts {
		v, err := strconv.ParseInt(p, 10, 64)
		if err != nil {
			return -1, -1, err
		}
		bounds = append(bounds, v)
	}
	return bounds[0], bounds[1], nil
}
func (c *client) initiateBlobUpload(repository string) (string, string, error) {
req, err := http.NewRequest(http.MethodPost, buildInitiateBlobUploadURL(c.url, repository), nil)
if err != nil {
@ -585,6 +701,23 @@ func buildInitiateBlobUploadURL(endpoint, repository string) string {
return fmt.Sprintf("%s/v2/%s/blobs/uploads/", endpoint, repository)
}
// buildChunkBlobUploadURL builds the URL for uploading one blob chunk.
// The location may be absolute (as returned by the registry) or relative (when
// "relativeurls" is enabled in the registry); relative locations are resolved
// against the given endpoint. For the last chunk the digest query parameter is
// appended so the final PUT also completes the upload session.
func buildChunkBlobUploadURL(endpoint, location, digest string, lastChunk bool) (string, error) {
	// renamed from "url" to avoid shadowing the imported net/url package
	u, err := url.Parse(location)
	if err != nil {
		return "", err
	}
	q := u.Query()
	if lastChunk {
		q.Set("digest", digest)
	}
	u.RawQuery = q.Encode()
	if u.IsAbs() {
		return u.String(), nil
	}
	// the "relativeurls" is enabled in registry
	return endpoint + u.String(), nil
}
func buildMonolithicBlobUploadURL(endpoint, location, digest string) (string, error) {
url, err := url.Parse(location)
if err != nil {

View File

@ -42,6 +42,7 @@ type Policy struct {
CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"`
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
CopyByChunk bool `orm:"column(copy_by_chunk)"`
}
// TableName set table name for ORM

View File

@ -24,7 +24,7 @@ import (
type ManagerTestSuite struct {
suite.Suite
regCli *registrytesting.FakeClient
regCli *registrytesting.Client
dao *sysartifactdaotesting.DAO
mgr *systemArtifactManager
cleanupCriteria *cleanup.Selector
@ -35,7 +35,7 @@ func (suite *ManagerTestSuite) SetupSuite() {
}
func (suite *ManagerTestSuite) SetupTest() {
suite.regCli = &registrytesting.FakeClient{}
suite.regCli = &registrytesting.Client{}
suite.dao = &sysartifactdaotesting.DAO{}
suite.cleanupCriteria = &cleanup.Selector{}
suite.mgr = &systemArtifactManager{
@ -62,7 +62,7 @@ func (suite *ManagerTestSuite) TestCreate() {
id, err := suite.mgr.Create(orm.NewContext(nil, &ormtesting.FakeOrmer{}), &sa, reader)
suite.Equalf(int64(1), id, "Expected row to correctly inserted")
suite.NoErrorf(err, "Unexpected error when creating artifact: %v", err)
suite.regCli.AssertCalled(suite.T(), "PushBlob")
suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func (suite *ManagerTestSuite) TestCreateTimeNotSet() {
@ -79,7 +79,7 @@ func (suite *ManagerTestSuite) TestCreateTimeNotSet() {
id, err := suite.mgr.Create(orm.NewContext(nil, &ormtesting.FakeOrmer{}), &sa, reader)
suite.Equalf(int64(1), id, "Expected row to correctly inserted")
suite.NoErrorf(err, "Unexpected error when creating artifact: %v", err)
suite.regCli.AssertCalled(suite.T(), "PushBlob")
suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
suite.False(sa.CreateTime.IsZero(), "Create time expected to be set")
}
@ -101,7 +101,7 @@ func (suite *ManagerTestSuite) TestCreatePushBlobFails() {
suite.Equalf(int64(0), id, "Expected no row to be inserted")
suite.Errorf(err, "Expected error when creating artifact: %v", err)
suite.dao.AssertCalled(suite.T(), "Create", mock.Anything, &sa, mock.Anything)
suite.regCli.AssertCalled(suite.T(), "PushBlob")
suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func (suite *ManagerTestSuite) TestCreateArtifactRecordFailure() {
@ -148,13 +148,13 @@ func (suite *ManagerTestSuite) TestRead() {
defer repoHandle.Close()
suite.dao.On("Get", mock.Anything, "test_vendor", "test_repo", "test_digest").Return(&sa, nil).Once()
suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(len(data), repoHandle, nil).Once()
suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(len(data)), repoHandle, nil).Once()
readCloser, err := suite.mgr.Read(context.TODO(), "test_vendor", "test_repo", "test_digest")
suite.NoErrorf(err, "Unexpected error when reading artifact: %v", err)
suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest")
suite.regCli.AssertCalled(suite.T(), "PullBlob")
suite.regCli.AssertCalled(suite.T(), "PullBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
suite.NotNilf(readCloser, "Expected valid read closer instance but was nil")
}
@ -172,7 +172,7 @@ func (suite *ManagerTestSuite) TestReadSystemArtifactRecordNotFound() {
errToRet := orm.ErrNoRows
suite.dao.On("Get", mock.Anything, "test_vendor", "test_repo", "test_digest").Return(nil, errToRet).Once()
suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(len(data), repoHandle, nil).Once()
suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(len(data)), repoHandle, nil).Once()
readCloser, err := suite.mgr.Read(context.TODO(), "test_vendor", "test_repo", "test_digest")
@ -238,7 +238,7 @@ func (suite *ManagerTestSuite) TestExist() {
suite.NoErrorf(err, "Unexpected error when checking if artifact exists: %v", err)
suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest")
suite.regCli.AssertCalled(suite.T(), "BlobExist")
suite.regCli.AssertCalled(suite.T(), "BlobExist", mock.Anything, mock.Anything)
suite.True(exists, "Expected exists to be true but was false")
}
@ -276,7 +276,7 @@ func (suite *ManagerTestSuite) TestExistSystemArtifactBlobReadError() {
suite.Error(err, "Expected error when checking if artifact exists")
suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest")
suite.regCli.AssertCalled(suite.T(), "BlobExist")
suite.regCli.AssertCalled(suite.T(), "BlobExist", mock.Anything, mock.Anything)
suite.False(exists, "Expected exists to be false but was true")
}

View File

@ -186,6 +186,12 @@ func (r *registryAPI) GetRegistryInfo(ctx context.Context, params operation.GetR
for _, trigger := range info.SupportedTriggers {
in.SupportedTriggers = append(in.SupportedTriggers, string(trigger))
}
// whether support copy by chunk
if info.SupportedCopyByChunk {
in.SupportedCopyByChunk = &info.SupportedCopyByChunk
}
return operation.NewGetRegistryInfoOK().WithPayload(in)
}

View File

@ -108,6 +108,11 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope
}
policy.Speed = *params.Policy.Speed
}
if params.Policy.CopyByChunk != nil {
policy.CopyByChunk = *params.Policy.CopyByChunk
}
id, err := r.ctl.CreatePolicy(ctx, policy)
if err != nil {
return r.SendError(ctx, err)
@ -171,6 +176,11 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope
}
policy.Speed = *params.Policy.Speed
}
if params.Policy.CopyByChunk != nil {
policy.CopyByChunk = *params.Policy.CopyByChunk
}
if err := r.ctl.UpdatePolicy(ctx, policy); err != nil {
return r.SendError(ctx, err)
}
@ -429,6 +439,7 @@ func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPol
ReplicateDeletion: policy.ReplicateDeletion,
Speed: &policy.Speed,
UpdateTime: strfmt.DateTime(policy.UpdateTime),
CopyByChunk: &policy.CopyByChunk,
}
if policy.SrcRegistry != nil {
p.SrcRegistry = convertRegistry(policy.SrcRegistry)

View File

@ -1,135 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"io"
"net/http"
"github.com/docker/distribution"
"github.com/stretchr/testify/mock"
)
// FakeClient is a fake registry client that implement src/pkg/registry.Client interface
type FakeClient struct {
mock.Mock
}
// Ping ...
func (f *FakeClient) Ping() (err error) {
args := f.Called()
return args.Error(0)
}
// Catalog ...
func (f *FakeClient) Catalog() ([]string, error) {
args := f.Called()
var repositories []string
if args[0] != nil {
repositories = args[0].([]string)
}
return repositories, args.Error(1)
}
// ListTags ...
func (f *FakeClient) ListTags(repository string) ([]string, error) {
args := f.Called()
var tags []string
if args[0] != nil {
tags = args[0].([]string)
}
return tags, args.Error(1)
}
// ManifestExist ...
func (f *FakeClient) ManifestExist(repository, reference string) (bool, *distribution.Descriptor, error) {
args := f.Called()
var desc *distribution.Descriptor
if args[0] != nil {
desc = args[0].(*distribution.Descriptor)
}
return args.Bool(0), desc, args.Error(2)
}
// PullManifest ...
func (f *FakeClient) PullManifest(repository, reference string, acceptedMediaTypes ...string) (distribution.Manifest, string, error) {
args := f.Called()
var manifest distribution.Manifest
if args[0] != nil {
manifest = args[0].(distribution.Manifest)
}
return manifest, args.String(1), args.Error(2)
}
// PushManifest ...
func (f *FakeClient) PushManifest(repository, reference, mediaType string, payload []byte) (string, error) {
args := f.Called()
return args.String(0), args.Error(1)
}
// DeleteManifest ...
func (f *FakeClient) DeleteManifest(repository, reference string) error {
args := f.Called()
return args.Error(0)
}
// BlobExist ...
func (f *FakeClient) BlobExist(repository, digest string) (bool, error) {
args := f.Called()
return args.Bool(0), args.Error(1)
}
// PullBlob ...
func (f *FakeClient) PullBlob(repository, digest string) (int64, io.ReadCloser, error) {
args := f.Called()
var blob io.ReadCloser
if args[1] != nil {
blob = args[1].(io.ReadCloser)
}
return int64(args.Int(0)), blob, args.Error(2)
}
// PushBlob ...
func (f *FakeClient) PushBlob(repository, digest string, size int64, blob io.Reader) error {
args := f.Called()
return args.Error(0)
}
// MountBlob ...
func (f *FakeClient) MountBlob(srcRepository, digest, dstRepository string) (err error) {
args := f.Called()
return args.Error(0)
}
// DeleteBlob ...
func (f *FakeClient) DeleteBlob(repository, digest string) (err error) {
args := f.Called(repository, digest)
return args.Error(0)
}
// Copy ...
func (f *FakeClient) Copy(srcRepo, srcRef, dstRepo, dstRef string, override bool) error {
args := f.Called()
return args.Error(0)
}
func (f *FakeClient) Do(req *http.Request) (*http.Response, error) {
args := f.Called()
var resp *http.Response
if args[0] != nil {
resp = args[0].(*http.Response)
}
return resp, args.Error(1)
}

View File

@ -237,6 +237,36 @@ func (_m *Client) PullBlob(repository string, digest string) (int64, io.ReadClos
return r0, r1, r2
}
// PullBlobChunk provides a mock function with given fields: repository, digest, blobSize, start, end
func (_m *Client) PullBlobChunk(repository string, digest string, blobSize int64, start int64, end int64) (int64, io.ReadCloser, error) {
ret := _m.Called(repository, digest, blobSize, start, end)
var r0 int64
if rf, ok := ret.Get(0).(func(string, string, int64, int64, int64) int64); ok {
r0 = rf(repository, digest, blobSize, start, end)
} else {
r0 = ret.Get(0).(int64)
}
var r1 io.ReadCloser
if rf, ok := ret.Get(1).(func(string, string, int64, int64, int64) io.ReadCloser); ok {
r1 = rf(repository, digest, blobSize, start, end)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(io.ReadCloser)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(string, string, int64, int64, int64) error); ok {
r2 = rf(repository, digest, blobSize, start, end)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// PullManifest provides a mock function with given fields: repository, reference, acceptedMediaTypes
func (_m *Client) PullManifest(repository string, reference string, acceptedMediaTypes ...string) (distribution.Manifest, string, error) {
_va := make([]interface{}, len(acceptedMediaTypes))
@ -288,6 +318,34 @@ func (_m *Client) PushBlob(repository string, digest string, size int64, blob io
return r0
}
// PushBlobChunk provides a mock function with given fields: repository, digest, blobSize, chunk, start, end, location
func (_m *Client) PushBlobChunk(repository string, digest string, blobSize int64, chunk io.Reader, start int64, end int64, location string) (string, int64, error) {
ret := _m.Called(repository, digest, blobSize, chunk, start, end, location)
var r0 string
if rf, ok := ret.Get(0).(func(string, string, int64, io.Reader, int64, int64, string) string); ok {
r0 = rf(repository, digest, blobSize, chunk, start, end, location)
} else {
r0 = ret.Get(0).(string)
}
var r1 int64
if rf, ok := ret.Get(1).(func(string, string, int64, io.Reader, int64, int64, string) int64); ok {
r1 = rf(repository, digest, blobSize, chunk, start, end, location)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(string, string, int64, io.Reader, int64, int64, string) error); ok {
r2 = rf(repository, digest, blobSize, chunk, start, end, location)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// PushManifest provides a mock function with given fields: repository, reference, mediaType, payload
func (_m *Client) PushManifest(repository string, reference string, mediaType string, payload []byte) (string, error) {
ret := _m.Called(repository, reference, mediaType, payload)