diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 72ed7f4f9..a655653d2 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -6959,6 +6959,10 @@ definitions: format: int32 description: speed limit for each task x-isnullable: true # make this field optional to keep backward compatibility + copy_by_chunk: + type: boolean + description: Whether to enable copy by chunk. + x-isnullable: true ReplicationTrigger: type: object properties: @@ -7116,6 +7120,11 @@ definitions: description: The triggers that the registry supports items: type: string + supported_copy_by_chunk: + type: boolean + description: The registry whether support copy by chunk. + x-omitempty: true + x-isnullable: true RegistryProviderInfo: type: object description: The registry provider info contains the base info and capability declarations of the registry provider diff --git a/make/migrations/postgresql/0100_2.7.0_schema.up.sql b/make/migrations/postgresql/0100_2.7.0_schema.up.sql new file mode 100644 index 000000000..f70551519 --- /dev/null +++ b/make/migrations/postgresql/0100_2.7.0_schema.up.sql @@ -0,0 +1 @@ +ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS copy_by_chunk boolean; \ No newline at end of file diff --git a/src/controller/artifact/abstractor_test.go b/src/controller/artifact/abstractor_test.go index c0908f7d7..47b340f04 100644 --- a/src/controller/artifact/abstractor_test.go +++ b/src/controller/artifact/abstractor_test.go @@ -208,13 +208,13 @@ type abstractorTestSuite struct { suite.Suite argMgr *tart.Manager blobMgr *tblob.Manager - regCli *registry.FakeClient + regCli *registry.Client abstractor *abstractor processor *tpro.Processor } func (a *abstractorTestSuite) SetupTest() { - a.regCli = ®istry.FakeClient{} + a.regCli = ®istry.Client{} a.argMgr = &tart.Manager{} a.blobMgr = &tblob.Manager{} a.abstractor = &abstractor{ @@ -236,7 +236,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV1Manifest() { {Size: 10}, {Size: 20}, }, nil) - a.regCli.On("PullManifest").Return(manifest, "", nil) + a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil) artifact := &artifact.Artifact{ ID: 1, } @@ -252,7 +252,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV1Manifest() { func (a *abstractorTestSuite) TestAbstractMetadataOfV2Manifest() { manifest, _, err := distribution.UnmarshalManifest(schema2.MediaTypeManifest, []byte(v2Manifest)) a.Require().Nil(err) - a.regCli.On("PullManifest").Return(manifest, "", nil) + a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil) artifact := &artifact.Artifact{ ID: 1, } @@ -271,7 +271,7 @@ func (a *abstractorTestSuite) TestAbstractMetadataOfV2Manifest() { func (a *abstractorTestSuite) TestAbstractMetadataOfIndex() { manifest, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageIndex, []byte(index)) a.Require().Nil(err) - a.regCli.On("PullManifest").Return(manifest, "", nil) + a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil) a.argMgr.On("GetByDigest", mock.Anything, mock.Anything, mock.Anything).Return(&artifact.Artifact{ ID: 2, Size: 10, @@ -301,7 +301,7 @@ func (u *unknownManifest) Payload() (mediaType string, payload []byte, err error // unknown func (a *abstractorTestSuite) TestAbstractMetadataOfUnsupported() { - a.regCli.On("PullManifest").Return(&unknownManifest{}, "", nil) + a.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(&unknownManifest{}, "", nil) artifact := &artifact.Artifact{ ID: 1, } diff --git a/src/controller/artifact/annotation/v1alpha1_test.go b/src/controller/artifact/annotation/v1alpha1_test.go index a7b26859b..dc7c7a41f 100644 --- a/src/controller/artifact/annotation/v1alpha1_test.go +++ b/src/controller/artifact/annotation/v1alpha1_test.go @@ -26,6 +26,7 @@ import ( "github.com/goharbor/harbor/src/pkg/artifact" "github.com/goharbor/harbor/src/pkg/distribution" + "github.com/goharbor/harbor/src/testing/mock" reg "github.com/goharbor/harbor/src/testing/pkg/registry" ) @@ -172,12 +173,12 @@ var ( // v1alpha1TestSuite is a test suite of testing v1alpha1 parser type v1alpha1TestSuite struct { suite.Suite - regCli *reg.FakeClient + regCli *reg.Client v1alpha1Parser *v1alpha1Parser } func (p *v1alpha1TestSuite) SetupTest() { - p.regCli = ®.FakeClient{} + p.regCli = ®.Client{} p.v1alpha1Parser = &v1alpha1Parser{ regCli: p.regCli, } @@ -196,7 +197,7 @@ func (p *v1alpha1TestSuite) TestParse() { art := &artifact.Artifact{ManifestMediaType: manifestMediaType, ExtraAttrs: metadata} blob := ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(ormbIcon))) - p.regCli.On("PullBlob").Return(0, blob, nil) + p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil) err = p.v1alpha1Parser.Parse(nil, art, content) p.Require().Nil(err) p.Len(art.ExtraAttrs, 12) @@ -219,7 +220,7 @@ func (p *v1alpha1TestSuite) TestParse() { art = &artifact.Artifact{ManifestMediaType: manifestMediaType, ExtraAttrs: metadata} blob = ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(ormbIcon))) - p.regCli.On("PullBlob").Return(0, blob, nil) + p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil) err = p.v1alpha1Parser.Parse(nil, art, content) p.Require().Nil(err) p.Len(art.ExtraAttrs, 13) diff --git a/src/controller/artifact/controller_test.go b/src/controller/artifact/controller_test.go index 57e8d89a4..3ad16dea6 100644 --- a/src/controller/artifact/controller_test.go +++ b/src/controller/artifact/controller_test.go @@ -73,7 +73,7 @@ type controllerTestSuite struct { labelMgr *label.Manager abstractor *fakeAbstractor immutableMtr *immutable.FakeMatcher - regCli *registry.FakeClient + regCli *registry.Client accMgr *accessory.Manager } @@ -87,7 +87,7 @@ func (c *controllerTestSuite) SetupTest() { c.abstractor = &fakeAbstractor{} c.immutableMtr = &immutable.FakeMatcher{} c.accMgr = &accessorytesting.Manager{} - c.regCli = ®istry.FakeClient{} + c.regCli = ®istry.Client{} c.ctl = &controller{ repoMgr: c.repoMgr, artMgr: c.artMgr, @@ -562,7 +562,7 @@ func (c *controllerTestSuite) TestCopy() { }, nil) c.abstractor.On("AbstractMetadata").Return(nil) c.artMgr.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil) - c.regCli.On("Copy").Return(nil) + c.regCli.On("Copy", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) c.tagCtl.On("Ensure").Return(nil) c.accMgr.On("Ensure", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) _, err := c.ctl.Copy(orm.NewContext(nil, &ormtesting.FakeOrmer{}), "library/hello-world", "latest", "library/hello-world2") diff --git a/src/controller/artifact/processor/base/manifest_test.go b/src/controller/artifact/processor/base/manifest_test.go index a07ac52f5..b98660fd6 100644 --- a/src/controller/artifact/processor/base/manifest_test.go +++ b/src/controller/artifact/processor/base/manifest_test.go @@ -19,10 +19,11 @@ import ( "strings" "testing" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/stretchr/testify/suite" "github.com/goharbor/harbor/src/pkg/artifact" + "github.com/goharbor/harbor/src/testing/mock" "github.com/goharbor/harbor/src/testing/pkg/registry" ) @@ -125,11 +126,11 @@ const ( type manifestTestSuite struct { suite.Suite processor *ManifestProcessor - regCli *registry.FakeClient + regCli *registry.Client } func (m *manifestTestSuite) SetupTest() { - m.regCli = ®istry.FakeClient{} + m.regCli = ®istry.Client{} m.processor = &ManifestProcessor{ RegCli: m.regCli, } @@ -139,7 +140,7 @@ func (m *manifestTestSuite) TestAbstractMetadata() { // abstract all properties art := &artifact.Artifact{} - m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil) m.processor.AbstractMetadata(nil, art, []byte(manifest)) m.Len(art.ExtraAttrs, 9) @@ -149,14 +150,14 @@ func (m *manifestTestSuite) TestAbstractMetadata() { // abstract the specified properties m.processor.properties = []string{"os"} art = &artifact.Artifact{} - m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil) m.processor.AbstractMetadata(nil, art, []byte(manifest)) m.Require().Len(art.ExtraAttrs, 1) m.Equal("linux", art.ExtraAttrs["os"]) } func (m *manifestTestSuite) TestUnmarshalConfig() { - m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil) config := &v1.Image{} err := m.processor.UnmarshalConfig(nil, "library/hello-world", []byte(manifest), config) m.Require().Nil(err) diff --git a/src/controller/artifact/processor/chart/chart_test.go b/src/controller/artifact/processor/chart/chart_test.go index ee3331993..63111d685 100644 --- a/src/controller/artifact/processor/chart/chart_test.go +++ b/src/controller/artifact/processor/chart/chart_test.go @@ -28,6 +28,7 @@ import ( "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/pkg/artifact" chartserver "github.com/goharbor/harbor/src/pkg/chart" + "github.com/goharbor/harbor/src/testing/mock" "github.com/goharbor/harbor/src/testing/pkg/chart" "github.com/goharbor/harbor/src/testing/pkg/registry" ) @@ -62,12 +63,12 @@ var ( type processorTestSuite struct { suite.Suite processor *processor - regCli *registry.FakeClient + regCli *registry.Client chartOptr *chart.FakeOpertaor } func (p *processorTestSuite) SetupTest() { - p.regCli = ®istry.FakeClient{} + p.regCli = ®istry.Client{} p.chartOptr = &chart.FakeOpertaor{} p.processor = &processor{ chartOperator: p.chartOptr, @@ -103,8 +104,8 @@ func (p *processorTestSuite) TestAbstractAddition() { artifact := &artifact.Artifact{} manifest, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageManifest, []byte(chartManifest)) p.Require().Nil(err) - p.regCli.On("PullManifest").Return(manifest, "", nil) - p.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(chartYaml)), nil) + p.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil) + p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(chartYaml)), nil) p.chartOptr.On("GetDetails").Return(chartDetails, nil) // values.yaml diff --git a/src/controller/artifact/processor/cnab/cnab_test.go b/src/controller/artifact/processor/cnab/cnab_test.go index 26070d798..20b7b2ee2 100644 --- a/src/controller/artifact/processor/cnab/cnab_test.go +++ b/src/controller/artifact/processor/cnab/cnab_test.go @@ -25,17 +25,18 @@ import ( "github.com/goharbor/harbor/src/controller/artifact/processor/base" "github.com/goharbor/harbor/src/pkg/artifact" + "github.com/goharbor/harbor/src/testing/mock" "github.com/goharbor/harbor/src/testing/pkg/registry" ) type processorTestSuite struct { suite.Suite processor *processor - regCli *registry.FakeClient + regCli *registry.Client } func (p *processorTestSuite) SetupTest() { - p.regCli = ®istry.FakeClient{} + p.regCli = ®istry.Client{} p.processor = &processor{ manifestProcessor: &base.ManifestProcessor{ RegCli: p.regCli, @@ -93,8 +94,8 @@ func (p *processorTestSuite) TestAbstractMetadata() { } mani, _, err := distribution.UnmarshalManifest(v1.MediaTypeImageManifest, []byte(manifest)) p.Require().Nil(err) - p.regCli.On("PullManifest").Return(mani, "", nil) - p.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil) + p.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(mani, "", nil) + p.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil) err = p.processor.AbstractMetadata(nil, art, nil) p.Require().Nil(err) p.Len(art.ExtraAttrs, 7) diff --git a/src/controller/artifact/processor/default_test.go b/src/controller/artifact/processor/default_test.go index ceea13b21..64f1668c2 100644 --- a/src/controller/artifact/processor/default_test.go +++ b/src/controller/artifact/processor/default_test.go @@ -122,11 +122,11 @@ type defaultProcessorTestSuite struct { suite.Suite processor *defaultProcessor parser *parser.Parser - regCli *registry.FakeClient + regCli *registry.Client } func (d *defaultProcessorTestSuite) SetupTest() { - d.regCli = ®istry.FakeClient{} + d.regCli = ®istry.Client{} d.processor = &defaultProcessor{ regCli: d.regCli, } @@ -179,7 +179,7 @@ func (d *defaultProcessorTestSuite) TestAbstractMetadata() { configBlob := ioutil.NopCloser(strings.NewReader(ormbConfig)) art := &artifact.Artifact{ManifestMediaType: manifestMediaType} - d.regCli.On("PullBlob").Return(0, configBlob, nil) + d.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), configBlob, nil) d.parser.On("Parse", context.TODO(), mock.AnythingOfType("*artifact.Artifact"), mock.AnythingOfType("[]byte")).Return(nil) err = d.processor.AbstractMetadata(nil, art, content) d.Require().Nil(err) diff --git a/src/controller/artifact/processor/image/manifest_v2_test.go b/src/controller/artifact/processor/image/manifest_v2_test.go index 1054b0499..354dfcc93 100644 --- a/src/controller/artifact/processor/image/manifest_v2_test.go +++ b/src/controller/artifact/processor/image/manifest_v2_test.go @@ -131,18 +131,18 @@ var ( type manifestV2ProcessorTestSuite struct { suite.Suite processor *manifestV2Processor - regCli *registry.FakeClient + regCli *registry.Client } func (m *manifestV2ProcessorTestSuite) SetupTest() { - m.regCli = ®istry.FakeClient{} + m.regCli = ®istry.Client{} m.processor = &manifestV2Processor{} m.processor.ManifestProcessor = &base.ManifestProcessor{RegCli: m.regCli} } func (m *manifestV2ProcessorTestSuite) TestAbstractMetadata() { artifact := &artifact.Artifact{} - m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, ioutil.NopCloser(bytes.NewReader([]byte(config))), nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(bytes.NewReader([]byte(config))), nil) err := m.processor.AbstractMetadata(nil, artifact, []byte(manifest)) m.Require().Nil(err) m.NotNil(artifact.ExtraAttrs["created"]) @@ -162,8 +162,8 @@ func (m *manifestV2ProcessorTestSuite) TestAbstractAddition() { artifact := &artifact.Artifact{} manifest, _, err := distribution.UnmarshalManifest(schema2.MediaTypeManifest, []byte(manifest)) m.Require().Nil(err) - m.regCli.On("PullManifest").Return(manifest, "", nil) - m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(config)), nil) + m.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(manifest, "", nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(config)), nil) addition, err := m.processor.AbstractAddition(nil, artifact, AdditionTypeBuildHistory) m.Require().Nil(err) m.Equal("application/json; charset=utf-8", addition.ContentType) diff --git a/src/controller/artifact/processor/wasm/wasm_test.go b/src/controller/artifact/processor/wasm/wasm_test.go index afee20f6e..89cbfe4f0 100644 --- a/src/controller/artifact/processor/wasm/wasm_test.go +++ b/src/controller/artifact/processor/wasm/wasm_test.go @@ -113,18 +113,18 @@ var ( type WASMProcessorTestSuite struct { suite.Suite processor *Processor - regCli *registry.FakeClient + regCli *registry.Client } func (m *WASMProcessorTestSuite) SetupTest() { - m.regCli = ®istry.FakeClient{} + m.regCli = ®istry.Client{} m.processor = &Processor{} m.processor.ManifestProcessor = &base.ManifestProcessor{RegCli: m.regCli} } func (m *WASMProcessorTestSuite) TestAbstractMetadataForAnnotationFashion() { artifact := &artifact.Artifact{} - m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, ioutil.NopCloser(bytes.NewReader([]byte(annnotated_config))), nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(bytes.NewReader([]byte(annnotated_config))), nil) err := m.processor.AbstractMetadata(nil, artifact, []byte(annnotated_manifest)) m.Require().Nil(err) m.NotNil(artifact.ExtraAttrs["created"]) @@ -157,8 +157,8 @@ func (m *WASMProcessorTestSuite) TestAbstractAdditionForAnnotationFashion() { err = json.Unmarshal([]byte(annnotated_manifest), &manifest) deserializedManifest, err := schema2.FromStruct(manifest) m.Require().Nil(err) - m.regCli.On("PullManifest").Return(deserializedManifest, "", nil) - m.regCli.On("PullBlob").Return(0, ioutil.NopCloser(strings.NewReader(annnotated_config)), nil) + m.regCli.On("PullManifest", mock.Anything, mock.Anything).Return(deserializedManifest, "", nil) + m.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), ioutil.NopCloser(strings.NewReader(annnotated_config)), nil) addition, err := m.processor.AbstractAddition(nil, artifact, AdditionTypeBuildHistory) m.Require().Nil(err) m.Equal("application/json; charset=utf-8", addition.ContentType) diff --git a/src/controller/icon/controller_test.go b/src/controller/icon/controller_test.go index 20693db6e..a0f252aa6 100644 --- a/src/controller/icon/controller_test.go +++ b/src/controller/icon/controller_test.go @@ -38,12 +38,12 @@ type controllerTestSuite struct { suite.Suite controller Controller argMgr *artifact_testing.Manager - regCli *registry.FakeClient + regCli *registry.Client } func (c *controllerTestSuite) SetupTest() { c.argMgr = &artifact_testing.Manager{} - c.regCli = ®istry.FakeClient{} + c.regCli = ®istry.Client{} c.controller = &controller{ artMgr: c.argMgr, regCli: c.regCli, @@ -68,7 +68,7 @@ func (c *controllerTestSuite) TestGet() { }, }, nil) blob := ioutil.NopCloser(base64.NewDecoder(base64.StdEncoding, strings.NewReader(iconStr))) - c.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(0, blob, nil) + c.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(0), blob, nil) icon, err := c.controller.Get(nil, "sha256:364feec11702f7ee079ba81da723438373afb0921f3646e9e5015406ee150986") c.Require().Nil(err) c.Require().NotNil(icon) diff --git a/src/controller/proxy/local_test.go b/src/controller/proxy/local_test.go index 12003a383..0bffe69b0 100644 --- a/src/controller/proxy/local_test.go +++ b/src/controller/proxy/local_test.go @@ -65,13 +65,13 @@ func (a *artifactControllerMock) GetByReference(ctx context.Context, repository, type localHelperTestSuite struct { suite.Suite - registryClient *testregistry.FakeClient + registryClient *testregistry.Client local *localHelper artCtl *artifactControllerMock } func (lh *localHelperTestSuite) SetupTest() { - lh.registryClient = &testregistry.FakeClient{} + lh.registryClient = &testregistry.Client{} lh.artCtl = &artifactControllerMock{} lh.local = &localHelper{registry: lh.registryClient, artifactCtl: lh.artCtl} @@ -82,7 +82,7 @@ func (lh *localHelperTestSuite) TestBlobExist_False() { dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f" art := lib.ArtifactInfo{Repository: repo, Digest: dig} ctx := context.Background() - lh.registryClient.On("BlobExist").Return(false, nil) + lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil) exist, err := lh.local.BlobExist(ctx, art) lh.Require().Nil(err) lh.Assert().Equal(false, exist) @@ -92,7 +92,7 @@ func (lh *localHelperTestSuite) TestBlobExist_True() { dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f" art := lib.ArtifactInfo{Repository: repo, Digest: dig} ctx := context.Background() - lh.registryClient.On("BlobExist").Return(true, nil) + lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil) exist, err := lh.local.BlobExist(ctx, art) lh.Require().Nil(err) lh.Assert().Equal(true, exist) @@ -100,7 +100,7 @@ func (lh *localHelperTestSuite) TestBlobExist_True() { func (lh *localHelperTestSuite) TestPushManifest() { dig := "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f" - lh.registryClient.On("PushManifest").Return(dig, nil) + lh.registryClient.On("PushManifest", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(dig, nil) manifest := &mockManifest{} var ct string manifest.Mock.On("Payload").Return(ct, []byte("example"), nil) @@ -117,7 +117,7 @@ func (lh *localHelperTestSuite) TestCheckDependencies_Fail() { {Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"}, } manifest.On("References").Return(refs) - lh.registryClient.On("BlobExist").Return(false, nil) + lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(false, nil) ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest) lh.Assert().Equal(len(ret), 2) } @@ -130,7 +130,7 @@ func (lh *localHelperTestSuite) TestCheckDependencies_Suc() { {Digest: "sha256:92c7f9c92844bbbb5d0a101b22f7c2a7949e40f8ea90c8b3bc396879d95e899a"}, } manifest.On("References").Return(refs) - lh.registryClient.On("BlobExist").Return(true, nil) + lh.registryClient.On("BlobExist", mock.Anything, mock.Anything).Return(true, nil) ret := lh.local.CheckDependencies(ctx, "library/hello-world", manifest) lh.Assert().Equal(len(ret), 0) } diff --git a/src/controller/registry/controller.go b/src/controller/registry/controller.go index 5cda26933..b6da77209 100644 --- a/src/controller/registry/controller.go +++ b/src/controller/registry/controller.go @@ -283,6 +283,7 @@ func process(info *model.RegistryInfo) *model.RegistryInfo { Values: values, }) in.SupportedResourceFilters = filters + in.SupportedCopyByChunk = info.SupportedCopyByChunk return in } diff --git a/src/controller/replication/flow/copy.go b/src/controller/replication/flow/copy.go index f36f4eeb0..7f23d8fae 100644 --- a/src/controller/replication/flow/copy.go +++ b/src/controller/replication/flow/copy.go @@ -92,7 +92,7 @@ func (c *copyFlow) Run(ctx context.Context) error { return err } - return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed) + return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed, c.policy.CopyByChunk) } func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) { @@ -103,7 +103,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) { return execution.Status == job.StoppedStatus.String(), nil } -func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error { +func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32, copyByChunk bool) error { var taskCnt int defer func() { // if no task be created, mark execution done. @@ -137,9 +137,10 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [ JobKind: job.KindGeneric, }, Parameters: map[string]interface{}{ - "src_resource": string(src), - "dst_resource": string(dest), - "speed": speed, + "src_resource": string(src), + "dst_resource": string(dest), + "speed": speed, + "copy_by_chunk": copyByChunk, }, } diff --git a/src/controller/replication/flow/mock_adapter_test.go b/src/controller/replication/flow/mock_adapter_test.go index 2336a8b09..c522c6eb0 100644 --- a/src/controller/replication/flow/mock_adapter_test.go +++ b/src/controller/replication/flow/mock_adapter_test.go @@ -272,6 +272,36 @@ func (_m *mockAdapter) PullBlob(repository string, digest string) (int64, io.Rea return r0, r1, r2 } +// PullBlobChunk provides a mock function with given fields: repository, digest, blobSize, start, end +func (_m *mockAdapter) PullBlobChunk(repository string, digest string, blobSize int64, start int64, end int64) (int64, io.ReadCloser, error) { + ret := _m.Called(repository, digest, blobSize, start, end) + + var r0 int64 + if rf, ok := ret.Get(0).(func(string, string, int64, int64, int64) int64); ok { + r0 = rf(repository, digest, blobSize, start, end) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 io.ReadCloser + if rf, ok := ret.Get(1).(func(string, string, int64, int64, int64) io.ReadCloser); ok { + r1 = rf(repository, digest, blobSize, start, end) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(io.ReadCloser) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string, int64, int64, int64) error); ok { + r2 = rf(repository, digest, blobSize, start, end) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // PullManifest provides a mock function with given fields: repository, reference, accepttedMediaTypes func (_m *mockAdapter) PullManifest(repository string, reference string, accepttedMediaTypes ...string) (distribution.Manifest, string, error) { _va := make([]interface{}, len(accepttedMediaTypes)) @@ -323,6 +353,34 @@ func (_m *mockAdapter) PushBlob(repository string, digest string, size int64, bl return r0 } +// PushBlobChunk provides a mock function with given fields: repository, digest, size, chunk, start, end, location +func (_m *mockAdapter) PushBlobChunk(repository string, digest string, size int64, chunk io.Reader, start int64, end int64, location string) (string, int64, error) { + ret := _m.Called(repository, digest, size, chunk, start, end, location) + + var r0 string + if rf, ok := ret.Get(0).(func(string, string, int64, io.Reader, int64, int64, string) string); ok { + r0 = rf(repository, digest, size, chunk, start, end, location) + } else { + r0 = ret.Get(0).(string) + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(string, string, int64, io.Reader, int64, int64, string) int64); ok { + r1 = rf(repository, digest, size, chunk, start, end, location) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string, int64, io.Reader, int64, int64, string) error); ok { + r2 = rf(repository, digest, size, chunk, start, end, location) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // PushManifest provides a mock function with given fields: repository, reference, mediaType, payload func (_m *mockAdapter) PushManifest(repository string, reference string, mediaType string, payload []byte) (string, error) { ret := _m.Called(repository, reference, mediaType, payload) diff --git a/src/controller/replication/model/model.go b/src/controller/replication/model/model.go index 6c28b588c..6821640a4 100644 --- a/src/controller/replication/model/model.go +++ b/src/controller/replication/model/model.go @@ -46,6 +46,7 @@ type Policy struct { CreationTime time.Time `json:"creation_time"` UpdateTime time.Time `json:"update_time"` Speed int32 `json:"speed"` + CopyByChunk bool `json:"copy_by_chunk"` } // IsScheduledTrigger returns true when the policy is scheduled trigger and enabled @@ -132,6 +133,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error { p.CreationTime = policy.CreationTime p.UpdateTime = policy.UpdateTime p.Speed = policy.Speed + p.CopyByChunk = policy.CopyByChunk if policy.SrcRegistryID > 0 { p.SrcRegistry = &model.Registry{ @@ -176,6 +178,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) { CreationTime: p.CreationTime, UpdateTime: p.UpdateTime, Speed: p.Speed, + CopyByChunk: p.CopyByChunk, } if p.SrcRegistry != nil { policy.SrcRegistryID = p.SrcRegistry.ID diff --git a/src/controller/replication/transfer/chart/transfer.go b/src/controller/replication/transfer/chart/transfer.go index 46d8e742b..8013b026b 100644 --- a/src/controller/replication/transfer/chart/transfer.go +++ b/src/controller/replication/transfer/chart/transfer.go @@ -49,7 +49,7 @@ type transfer struct { dst adapter.ChartRegistry } -func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { +func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, opts *trans.Options) error { // initialize if err := t.initialize(src, dst); err != nil { return err @@ -78,7 +78,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int3 version: dst.Metadata.Artifacts[0].Tags[0], } // copy the chart from source registry to the destination - return t.copy(srcChart, dstChart, dst.Override, speed) + return t.copy(srcChart, dstChart, dst.Override, opts) } func (t *transfer) initialize(src, dst *model.Resource) error { @@ -129,7 +129,7 @@ func (t *transfer) shouldStop() bool { return isStopped } -func (t *transfer) copy(src, dst *chart, override bool, speed int32) error { +func (t *transfer) copy(src, dst *chart, override bool, opts *trans.Options) error { if t.shouldStop() { return nil } @@ -160,9 +160,9 @@ func (t *transfer) copy(src, dst *chart, override bool, speed int32) error { t.logger.Errorf("failed to download the chart %s:%s: %v", src.name, src.version, err) return err } - if speed > 0 { - t.logger.Infof("limit network speed at %d kb/s", speed) - chart = trans.NewReader(chart, speed) + if opts.Speed > 0 { + t.logger.Infof("limit network speed at %d kb/s", opts.Speed) + chart = trans.NewReader(chart, opts.Speed) } defer chart.Close() diff --git a/src/controller/replication/transfer/chart/transfer_test.go b/src/controller/replication/transfer/chart/transfer_test.go index 814ac97c2..7d4849f12 100644 --- a/src/controller/replication/transfer/chart/transfer_test.go +++ b/src/controller/replication/transfer/chart/transfer_test.go @@ -97,7 +97,7 @@ func TestCopy(t *testing.T) { name: "dest/harbor", version: "0.2.0", } - err := transfer.copy(src, dst, true, 0) + err := transfer.copy(src, dst, true, trans.NewOptions()) assert.Nil(t, err) } diff --git a/src/controller/replication/transfer/image/transfer.go b/src/controller/replication/transfer/image/transfer.go index 120043705..79f30e9a6 100644 --- a/src/controller/replication/transfer/image/transfer.go +++ b/src/controller/replication/transfer/image/transfer.go @@ -36,15 +36,29 @@ import ( ) var ( - retry int - errStopped = errors.New("stopped") + blobRetryCnt, chunkRetryCnt int + replicationChunkSize int64 + errStopped = errors.New("stopped") + // default chunk size is 10MB + defaultChunkSize = 10 * 1024 * 1024 ) func init() { - retry, _ = strconv.Atoi(os.Getenv("COPY_BLOB_RETRY_COUNT")) - if retry <= 0 { - retry = 5 + blobRetryCnt, _ = strconv.Atoi(os.Getenv("COPY_BLOB_RETRY_COUNT")) + if blobRetryCnt <= 0 { + blobRetryCnt = 5 } + + chunkRetryCnt, _ = strconv.Atoi(os.Getenv("COPY_CHUNK_RETRY_COUNT")) + if chunkRetryCnt <= 0 { + chunkRetryCnt = 5 + } + + replicationChunkSize, _ = strconv.ParseInt(os.Getenv("REPLICATION_CHUNK_SIZE"), 10, 64) + if replicationChunkSize <= 0 { + replicationChunkSize = int64(defaultChunkSize) + } + if err := trans.RegisterFactory(model.ResourceTypeImage, factory); err != nil { log.Errorf("failed to register transfer factory: %v", err) } @@ -70,10 +84,9 @@ type transfer struct { isStopped trans.StopFunc src adapter.ArtifactRegistry dst adapter.ArtifactRegistry - speed int32 } -func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { +func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, opts *trans.Options) error { // initialize if err := t.initialize(src, dst); err != nil { return err @@ -90,7 +103,7 @@ func (t *transfer) Transfer(src *model.Resource, dst *model.Resource, speed int3 } // copy the repository from source registry to the destination - return t.copy(t.convert(src), t.convert(dst), dst.Override, speed) + return t.copy(t.convert(src), t.convert(dst), dst.Override, opts) } func (t *transfer) convert(resource *model.Resource) *repository { @@ -163,18 +176,18 @@ func (t *transfer) shouldStop() bool { return isStopped } -func (t *transfer) copy(src *repository, dst *repository, override bool, speed int32) error { +func (t *transfer) copy(src *repository, dst *repository, override bool, opts *trans.Options) error { srcRepo := src.repository dstRepo := dst.repository t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...", srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ",")) - if speed > 0 { - t.logger.Infof("limit network speed at %d kb/s", speed) + if opts.Speed > 0 { + t.logger.Infof("limit network speed at %d kb/s", opts.Speed) } var err error for i := range src.tags { - if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, speed); e != nil { + if e := t.copyArtifact(srcRepo, src.tags[i], dstRepo, dst.tags[i], override, opts); e != nil { if e == errStopped { return nil } @@ -193,7 +206,7 @@ func (t *transfer) copy(src *repository, dst *repository, override bool, speed i return nil } -func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, speed int32) error { +func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, override bool, opts *trans.Options) error { t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...", srcRepo, srcRef, dstRepo, dstRef) // pull the manifest from the source registry @@ -227,7 +240,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid // copy contents between the source and destination registries for _, content := range manifest.References() { - if err = t.copyContent(content, srcRepo, dstRepo, speed); err != nil { + if err = t.copyContent(content, srcRepo, dstRepo, opts); err != nil { return err } } @@ -243,7 +256,7 @@ func (t *transfer) copyArtifact(srcRepo, srcRef, dstRepo, dstRef string, overrid } // copy the content from source registry to destination according to its media type -func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, speed int32) error { +func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo string, opts *trans.Options) error { digest := content.Digest.String() switch content.MediaType { // when the media type of pulled manifest is index, @@ -252,7 +265,7 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo v1.MediaTypeImageManifest, schema2.MediaTypeManifest, schema1.MediaTypeSignedManifest, schema1.MediaTypeManifest: // as using digest as the reference, so set the override to true directly - return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, speed) + return t.copyArtifact(srcRepo, digest, dstRepo, digest, true, opts) // handle foreign layer case schema2.MediaTypeForeignLayer: t.logger.Infof("the layer %s is a foreign layer, skip", digest) @@ -261,19 +274,24 @@ func (t *transfer) copyContent(content distribution.Descriptor, srcRepo, dstRepo // the media type of the layer or config can be "application/octet-stream", // schema1.MediaTypeManifestLayer, schema2.MediaTypeLayer, schema2.MediaTypeImageConfig default: - return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, speed) + if opts.CopyByChunk { + // copy by chunk + return t.copyChunkWithRetry(srcRepo, dstRepo, digest, content.Size, opts.Speed) + } + // copy by blob + return t.copyBlobWithRetry(srcRepo, dstRepo, digest, content.Size, opts.Speed) } } func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { var err error - for i, backoff := 1, 2*time.Second; i <= retry; i, backoff = i+1, backoff*2 { + for i, backoff := 1, 2*time.Second; i <= blobRetryCnt; i, backoff = i+1, backoff*2 { t.logger.Infof("copying the blob %s(the %dth running)...", digest, i) if err = t.copyBlob(srcRepo, dstRepo, digest, sizeFromDescriptor, speed); err == nil { t.logger.Infof("copy the blob %s completed", digest) return nil } - if i == retry || err == errStopped { + if i == blobRetryCnt || err == errStopped { break } t.logger.Infof("will retry %v later", backoff) @@ -282,36 +300,80 @@ func (t *transfer) copyBlobWithRetry(srcRepo, dstRepo, digest string, sizeFromDe return err } -// copy the layer or artifact config from the source registry to destination -// the size parameter is taken from manifests. -func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { +func (t *transfer) copyChunkWithRetry(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { + var ( + err error + location string + + start int64 = -1 + end int64 = -1 + ) + + for i, backoff := 1, 2*time.Second; i <= chunkRetryCnt; i, backoff = i+1, backoff*2 { + t.logger.Infof("copying the blob %s by chunk(chunkSize: %d)(the %dth running)...", digest, replicationChunkSize, i) + if err = t.copyBlobByChunk(srcRepo, dstRepo, digest, sizeFromDescriptor, &start, &end, &location, speed); err == nil { + t.logger.Infof("copy the blob %s by chunk completed", digest) + return nil + } + if i == chunkRetryCnt || err == errStopped { + break + } + t.logger.Infof("will retry %v later", backoff) + time.Sleep(backoff) + } + + return err +} + +// tryMountBlob try to check existence and mount, return true if mounted. +func (t *transfer) tryMountBlob(srcRepo, dstRepo, digest string) (bool, error) { if t.shouldStop() { - return errStopped + return false, errStopped } exist, err := t.dst.BlobExist(dstRepo, digest) if err != nil { t.logger.Errorf("failed to check the existence of blob %s on the destination registry: %v", digest, err) - return err + return false, err } if exist { t.logger.Infof("the blob %s already exists on the destination registry, skip", digest) - return nil + // we think the blob is mounted if it is existed. + return true, nil } mount, repository, err := t.dst.CanBeMount(digest) if err != nil { t.logger.Errorf("failed to check whether the blob %s can be mounted on the destination registry: %v", digest, err) - return err + return false, err } if mount { if err = t.dst.MountBlob(repository, digest, dstRepo); err != nil { t.logger.Errorf("failed to mount the blob %s on the destination registry: %v", digest, err) - return err + return false, err } t.logger.Infof("the blob %s mounted from the repository %s on the destination registry directly", digest, repository) + return true, nil + } + + return false, nil +} + +// copy the layer or artifact config from the source registry to destination +// the size parameter is taken from manifests. +func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { + mounted, err := t.tryMountBlob(srcRepo, dstRepo, digest) + if err != nil { + return err + } + // return earlier if it is mounted + if mounted { return nil } + return t.copyBlobByMonolithic(srcRepo, dstRepo, digest, sizeFromDescriptor, speed) +} + +func (t *transfer) copyBlobByMonolithic(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, speed int32) error { size, data, err := t.src.PullBlob(srcRepo, digest) if err != nil { t.logger.Errorf("failed to pulling the blob %s: %v", digest, err) @@ -333,6 +395,67 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor t.logger.Errorf("failed to pushing the blob %s, size %d: %v", digest, size, err) return err } + + return nil +} + +// copyBlobByChunk copy blob by chunk with specified start and end range. +// The refers to the byte range of the chunk, and MUST be inclusive on both ends. The first chunk's range MUST begin with 0. +func (t *transfer) copyBlobByChunk(srcRepo, dstRepo, digest string, sizeFromDescriptor int64, start, end *int64, location *string, speed int32) error { + // fallback to copy by monolithic if the blob size is equal or less than chunk size. + if sizeFromDescriptor <= replicationChunkSize { + return t.copyBlobByMonolithic(srcRepo, dstRepo, digest, sizeFromDescriptor, speed) + } + + mounted, err := t.tryMountBlob(srcRepo, dstRepo, digest) + if err != nil { + return err + } + // return earlier if it is mounted. + if mounted { + return nil + } + + // end range should equal (blobSize - 1) + endRange := sizeFromDescriptor - 1 + for { + // update the start and end for upload + *start = *end + 1 + // since both ends are closed intervals, it is necessary to subtract one byte + *end = *start + replicationChunkSize - 1 + if *end >= endRange { + *end = endRange + } + + t.logger.Infof("copying the blob chunk: %d-%d/%d", *start, *end, sizeFromDescriptor) + _, data, err := t.src.PullBlobChunk(srcRepo, digest, sizeFromDescriptor, *start, *end) + if err != nil { + t.logger.Errorf("failed to pulling the blob chunk: %d-%d/%d, error: %v", *start, *end, sizeFromDescriptor, err) + return err + } + + if speed > 0 { + data = trans.NewReader(data, speed) + } + // failureEnd will only be used for adjusting content range when issue happened during push the chunk. + var failureEnd int64 + *location, failureEnd, err = t.dst.PushBlobChunk(dstRepo, digest, sizeFromDescriptor, data, *start, *end, *location) + if err != nil { + t.logger.Errorf("failed to pushing the blob chunk: %d-%d/%d, error: %v", *start, *end, sizeFromDescriptor, err) + data.Close() + *end = failureEnd + return err + } + + data.Close() + + t.logger.Infof("copy the blob chunk: %d-%d/%d completed", *start, *end, sizeFromDescriptor) + // if the end equals (blobSize-1), that means it is last chunk, return if this is the last chunk + if *end == endRange { + break + } + } + return nil } diff --git a/src/controller/replication/transfer/image/transfer_test.go b/src/controller/replication/transfer/image/transfer_test.go index 70214bab4..5f876f3cf 100644 --- a/src/controller/replication/transfer/image/transfer_test.go +++ b/src/controller/replication/transfer/image/transfer_test.go @@ -91,9 +91,16 @@ func (f *fakeRegistry) PullBlob(repository, digest string) (size int64, blob io. r := ioutil.NopCloser(bytes.NewReader([]byte{'a'})) return 1, r, nil } +func (f *fakeRegistry) PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error) { + r := ioutil.NopCloser(bytes.NewReader([]byte{'a'})) + return 1, r, nil +} func (f *fakeRegistry) PushBlob(repository, digest string, size int64, blob io.Reader) error { return nil } +func (f *fakeRegistry) PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error) { + return "", -1, nil +} func (f *fakeRegistry) DeleteTag(repository, tag string) error { return nil } @@ -103,7 +110,6 @@ func (f *fakeRegistry) CanBeMount(digest string) (bool, string, error) { func (f *fakeRegistry) MountBlob(srcRepository, digest, dstRepository string) error { return nil } - func (f *fakeRegistry) ListTags(repository string) (tags []string, err error) { return nil, nil } @@ -149,7 +155,28 @@ func TestCopy(t *testing.T) { repository: "destination", tags: []string{"b1", "b2"}, } - err := tr.copy(src, dst, true, 0) + err := tr.copy(src, dst, true, trans.NewOptions()) + require.Nil(t, err) +} + +func TestCopyByChunk(t *testing.T) { + stopFunc := func() bool { return false } + tr := &transfer{ + logger: log.DefaultLogger(), + isStopped: stopFunc, + src: &fakeRegistry{}, + dst: &fakeRegistry{}, + } + + src := &repository{ + repository: "source", + tags: []string{"a1", "a2"}, + } + dst := &repository{ + repository: "destination", + tags: []string{"b1", "b2"}, + } + err := tr.copy(src, dst, true, trans.NewOptions(trans.WithCopyByChunk(true))) require.Nil(t, err) } diff --git a/src/controller/replication/transfer/options.go b/src/controller/replication/transfer/options.go new file mode 100644 index 000000000..c7827aef7 --- /dev/null +++ b/src/controller/replication/transfer/options.go @@ -0,0 +1,45 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +type Option func(*Options) + +type Options struct { + // Speed is the data transfer speed for replication, no limit by default. + Speed int32 + // CopyByChunk defines whether need to copy the artifact blob by chunk, copy by whole blob by default. + CopyByChunk bool +} + +func NewOptions(opts ...Option) *Options { + o := &Options{} + for _, opt := range opts { + opt(o) + } + + return o +} + +func WithSpeed(speed int32) Option { + return func(o *Options) { + o.Speed = speed + } +} + +func WithCopyByChunk(copyByChunk bool) Option { + return func(o *Options) { + o.CopyByChunk = copyByChunk + } +} diff --git a/src/controller/replication/transfer/options_test.go b/src/controller/replication/transfer/options_test.go new file mode 100644 index 000000000..0d1b7fb07 --- /dev/null +++ b/src/controller/replication/transfer/options_test.go @@ -0,0 +1,37 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewOptions(t *testing.T) { + // test default options + o := NewOptions() + assert.Equal(t, int32(0), o.Speed) + assert.Equal(t, false, o.CopyByChunk) + + // test with options + // with speed + withSpeed := WithSpeed(1024) + // with copy by chunk + withCopyByChunk := WithCopyByChunk(true) + o = NewOptions(withSpeed, withCopyByChunk) + assert.Equal(t, int32(1024), o.Speed) + assert.Equal(t, true, o.CopyByChunk) +} diff --git a/src/controller/replication/transfer/transfer.go b/src/controller/replication/transfer/transfer.go index a446d5e28..f27982162 100644 --- a/src/controller/replication/transfer/transfer.go +++ b/src/controller/replication/transfer/transfer.go @@ -34,7 +34,7 @@ type Factory func(Logger, StopFunc) (Transfer, error) // Transfer defines an interface used to transfer the source // resource to the destination type Transfer interface { - Transfer(src *model.Resource, dst *model.Resource, speed int32) error + Transfer(src *model.Resource, dst *model.Resource, opts *Options) error } // Logger defines an interface for logging diff --git a/src/jobservice/job/impl/replication/replication.go b/src/jobservice/job/impl/replication/replication.go index f155cd203..4a016617e 100644 --- a/src/jobservice/job/impl/replication/replication.go +++ b/src/jobservice/job/impl/replication/replication.go @@ -55,7 +55,7 @@ func (r *Replication) Validate(params job.Parameters) error { func (r *Replication) Run(ctx job.Context, params job.Parameters) error { logger := ctx.GetLogger() - src, dst, speed, err := parseParams(params) + src, dst, opts, err := parseParams(params) if err != nil { logger.Errorf("failed to parse parameters: %v", err) return err @@ -80,18 +80,20 @@ func (r *Replication) Run(ctx job.Context, params job.Parameters) error { return err } - return trans.Transfer(src, dst, speed) + return trans.Transfer(src, dst, opts) } -func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, int32, error) { +func parseParams(params map[string]interface{}) (*model.Resource, *model.Resource, *transfer.Options, error) { src := &model.Resource{} if err := parseParam(params, "src_resource", src); err != nil { - return nil, nil, 0, err + return nil, nil, nil, err } + dst := &model.Resource{} if err := parseParam(params, "dst_resource", dst); err != nil { - return nil, nil, 0, err + return nil, nil, nil, err } + var speed int32 value, exist := params["speed"] if !exist { @@ -106,12 +108,25 @@ func parseParams(params map[string]interface{}) (*model.Resource, *model.Resourc if s, ok := value.(float64); ok { speed = int32(s) } else { - return nil, nil, 0, fmt.Errorf("the value of speed isn't integer (%T)", value) + return nil, nil, nil, fmt.Errorf("the value of speed isn't integer (%T)", value) } } } } - return src, dst, speed, nil + + var copyByChunk bool + value, exist = params["copy_by_chunk"] + if exist { + if boolVal, ok := value.(bool); ok { + copyByChunk = boolVal + } + } + + opts := transfer.NewOptions( + transfer.WithSpeed(speed), + transfer.WithCopyByChunk(copyByChunk), + ) + return src, dst, opts, nil } func parseParam(params map[string]interface{}, name string, v interface{}) error { diff --git a/src/jobservice/job/impl/replication/replication_test.go b/src/jobservice/job/impl/replication/replication_test.go index 8428fa36d..be80389c0 100644 --- a/src/jobservice/job/impl/replication/replication_test.go +++ b/src/jobservice/job/impl/replication/replication_test.go @@ -51,14 +51,17 @@ func TestParseParam(t *testing.T) { func TestParseParams(t *testing.T) { params := map[string]interface{}{ - "src_resource": `{"type":"chart"}`, - "dst_resource": `{"type":"chart"}`, + "src_resource": `{"type":"chart"}`, + "dst_resource": `{"type":"chart"}`, + "speed": 1024, + "copy_by_chunk": true, } - res, dst, speed, err := parseParams(params) + res, dst, opts, err := parseParams(params) require.Nil(t, err) assert.Equal(t, "chart", string(res.Type)) assert.Equal(t, "chart", string(dst.Type)) - assert.Equal(t, int32(0), speed) + assert.Equal(t, int32(1024), opts.Speed) + assert.True(t, opts.CopyByChunk) } func TestMaxFails(t *testing.T) { @@ -84,7 +87,7 @@ var fakedTransferFactory = func(transfer.Logger, transfer.StopFunc) (transfer.Tr type fakedTransfer struct{} -func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, speed int32) error { +func (f *fakedTransfer) Transfer(src *model.Resource, dst *model.Resource, opts *transfer.Options) error { transferred = true return nil } diff --git a/src/pkg/reg/adapter/adapter.go b/src/pkg/reg/adapter/adapter.go index 4e55bb6fe..0798b52e3 100644 --- a/src/pkg/reg/adapter/adapter.go +++ b/src/pkg/reg/adapter/adapter.go @@ -60,6 +60,8 @@ type ArtifactRegistry interface { DeleteManifest(repository, reference string) error // the "reference" can be "tag" or "digest", the function needs to handle both BlobExist(repository, digest string) (exist bool, err error) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) + PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error) + PushBlobChunk(repository, digest string, size int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error) PushBlob(repository, digest string, size int64, blob io.Reader) error MountBlob(srcRepository, digest, dstRepository string) (err error) CanBeMount(digest string) (mount bool, repository string, err error) // check whether the blob can be mounted from the remote registry diff --git a/src/pkg/reg/adapter/harbor/base/adapter.go b/src/pkg/reg/adapter/harbor/base/adapter.go index 5f616f599..485cd1d73 100644 --- a/src/pkg/reg/adapter/harbor/base/adapter.go +++ b/src/pkg/reg/adapter/harbor/base/adapter.go @@ -116,6 +116,7 @@ func (a *Adapter) Info() (*model.RegistryInfo, error) { model.TriggerTypeScheduled, }, SupportedRepositoryPathComponentType: model.RepositoryPathComponentTypeAtLeastTwo, + SupportedCopyByChunk: true, } enabled, err := a.Client.ChartRegistryEnabled() diff --git a/src/pkg/reg/adapter/harbor/base/adapter_test.go b/src/pkg/reg/adapter/harbor/base/adapter_test.go index 465ff1b42..2767b158f 100644 --- a/src/pkg/reg/adapter/harbor/base/adapter_test.go +++ b/src/pkg/reg/adapter/harbor/base/adapter_test.go @@ -80,6 +80,7 @@ func TestInfo(t *testing.T) { assert.Equal(t, 2, len(info.SupportedTriggers)) assert.Equal(t, 1, len(info.SupportedResourceTypes)) assert.Equal(t, model.ResourceTypeImage, info.SupportedResourceTypes[0]) + assert.Equal(t, true, info.SupportedCopyByChunk) server.Close() } diff --git a/src/pkg/reg/model/registry.go b/src/pkg/reg/model/registry.go index 64c0758ca..6c8a09cc4 100644 --- a/src/pkg/reg/model/registry.go +++ b/src/pkg/reg/model/registry.go @@ -139,6 +139,7 @@ type RegistryInfo struct { SupportedResourceFilters []*FilterStyle `json:"supported_resource_filters"` SupportedTriggers []string `json:"supported_triggers"` SupportedRepositoryPathComponentType string `json:"supported_repository_path_component_type"` // how many path components are allowed in the repository name + SupportedCopyByChunk bool `json:"supported_copy_by_chunk,omitempty"` } // AdapterPattern provides base info and capability declarations of the registry diff --git a/src/pkg/registry/client.go b/src/pkg/registry/client.go index b2b825c87..f41d275b3 100644 --- a/src/pkg/registry/client.go +++ b/src/pkg/registry/client.go @@ -86,8 +86,12 @@ type Client interface { BlobExist(repository, digest string) (exist bool, err error) // PullBlob pulls the specified blob. The caller must close the returned "blob" PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) + // PullBlobChunk pulls the specified blob, but by chunked + PullBlobChunk(repository, digest string, blobSize, start, end int64) (size int64, blob io.ReadCloser, err error) // PushBlob pushes the specified blob PushBlob(repository, digest string, size int64, blob io.Reader) error + // PushBlobChunk pushes the specified blob, but by chunked + PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (nextUploadLocation string, endRange int64, err error) // MountBlob mounts the blob from the source repository MountBlob(srcRepository, digest, dstRepository string) (err error) // DeleteBlob deletes the specified blob @@ -371,6 +375,34 @@ func (c *client) PullBlob(repository, digest string) (int64, io.ReadCloser, erro return size, resp.Body, nil } +// PullBlobChunk pulls the specified blob, but by chunked, refer to https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pull for more details. +func (c *client) PullBlobChunk(repository, digest string, blobSize int64, start, end int64) (int64, io.ReadCloser, error) { + req, err := http.NewRequest(http.MethodGet, buildBlobURL(c.url, repository, digest), nil) + if err != nil { + return 0, nil, err + } + + req.Header.Add("Accept-Encoding", "identity") + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end)) + resp, err := c.do(req) + if err != nil { + return 0, nil, err + } + + var size int64 + n := resp.Header.Get("Content-Length") + // no content-length is acceptable, which can taken from manifests + if len(n) > 0 { + size, err = strconv.ParseInt(n, 10, 64) + if err != nil { + defer resp.Body.Close() + return 0, nil, err + } + } + + return size, resp.Body, nil +} + func (c *client) PushBlob(repository, digest string, size int64, blob io.Reader) error { location, _, err := c.initiateBlobUpload(repository) if err != nil { @@ -379,6 +411,90 @@ func (c *client) PushBlob(repository, digest string, size int64, blob io.Reader) return c.monolithicBlobUpload(location, digest, size, blob) } +// PushBlobChunk pushes the specified blob, but by chunked, refer to https://github.com/opencontainers/distribution-spec/blob/main/spec.md#push for more details. +func (c *client) PushBlobChunk(repository, digest string, blobSize int64, chunk io.Reader, start, end int64, location string) (string, int64, error) { + var err error + // first chunk need to initialize blob upload location + if start == 0 { + location, _, err = c.initiateBlobUpload(repository) + if err != nil { + return location, end, err + } + } + + // the range is from 0 to (blobSize-1), so (end == blobSize-1) means it is last chunk + lastChunk := end == blobSize-1 + url, err := buildChunkBlobUploadURL(c.url, location, digest, lastChunk) + if err != nil { + return location, end, err + } + + // use PUT instead of PATCH for last chunk which can reduce a final request + method := http.MethodPatch + if lastChunk { + method = http.MethodPut + } + req, err := http.NewRequest(method, url, chunk) + if err != nil { + return location, end, err + } + + req.Header.Set("Content-Length", fmt.Sprintf("%d", end-start+1)) + req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", start, end)) + resp, err := c.do(req) + if err != nil { + // if push chunk error, we should query the upload progress for new location and end range. + newLocation, newEnd, err1 := c.getUploadStatus(location) + if err1 == nil { + return newLocation, newEnd, err + } + // end should return start-1 to re-push this chunk + return location, start - 1, fmt.Errorf("failed to get upload status: %w", err1) + } + + defer resp.Body.Close() + // return the location for next chunk upload + return resp.Header.Get("Location"), end, nil +} + +func (c *client) getUploadStatus(location string) (string, int64, error) { + req, err := http.NewRequest(http.MethodGet, location, nil) + if err != nil { + return location, -1, err + } + + resp, err := c.do(req) + if err != nil { + return location, -1, err + } + + defer resp.Body.Close() + + _, end, err := parseContentRange(resp.Header.Get("Range")) + if err != nil { + return location, -1, err + } + + return resp.Header.Get("Location"), end, nil +} + +func parseContentRange(cr string) (int64, int64, error) { + ranges := strings.Split(cr, "-") + if len(ranges) != 2 { + return -1, -1, fmt.Errorf("invalid content range format, %s", cr) + } + start, err := strconv.ParseInt(ranges[0], 10, 64) + if err != nil { + return -1, -1, err + } + end, err := strconv.ParseInt(ranges[1], 10, 64) + if err != nil { + return -1, -1, err + } + + return start, end, nil +} + func (c *client) initiateBlobUpload(repository string) (string, string, error) { req, err := http.NewRequest(http.MethodPost, buildInitiateBlobUploadURL(c.url, repository), nil) if err != nil { @@ -585,6 +701,23 @@ func buildInitiateBlobUploadURL(endpoint, repository string) string { return fmt.Sprintf("%s/v2/%s/blobs/uploads/", endpoint, repository) } +func buildChunkBlobUploadURL(endpoint, location, digest string, lastChunk bool) (string, error) { + url, err := url.Parse(location) + if err != nil { + return "", err + } + q := url.Query() + if lastChunk { + q.Set("digest", digest) + } + url.RawQuery = q.Encode() + if url.IsAbs() { + return url.String(), nil + } + // the "relativeurls" is enabled in registry + return endpoint + url.String(), nil +} + func buildMonolithicBlobUploadURL(endpoint, location, digest string) (string, error) { url, err := url.Parse(location) if err != nil { diff --git a/src/pkg/replication/model/model.go b/src/pkg/replication/model/model.go index 942861e1d..492371edd 100644 --- a/src/pkg/replication/model/model.go +++ b/src/pkg/replication/model/model.go @@ -42,6 +42,7 @@ type Policy struct { CreationTime time.Time `orm:"column(creation_time);auto_now_add" sort:"default:desc"` UpdateTime time.Time `orm:"column(update_time);auto_now"` Speed int32 `orm:"column(speed_kb)"` + CopyByChunk bool `orm:"column(copy_by_chunk)"` } // TableName set table name for ORM diff --git a/src/pkg/systemartifact/manager_test.go b/src/pkg/systemartifact/manager_test.go index 1356bad59..d81708787 100644 --- a/src/pkg/systemartifact/manager_test.go +++ b/src/pkg/systemartifact/manager_test.go @@ -24,7 +24,7 @@ import ( type ManagerTestSuite struct { suite.Suite - regCli *registrytesting.FakeClient + regCli *registrytesting.Client dao *sysartifactdaotesting.DAO mgr *systemArtifactManager cleanupCriteria *cleanup.Selector @@ -35,7 +35,7 @@ func (suite *ManagerTestSuite) SetupSuite() { } func (suite *ManagerTestSuite) SetupTest() { - suite.regCli = ®istrytesting.FakeClient{} + suite.regCli = ®istrytesting.Client{} suite.dao = &sysartifactdaotesting.DAO{} suite.cleanupCriteria = &cleanup.Selector{} suite.mgr = &systemArtifactManager{ @@ -62,7 +62,7 @@ func (suite *ManagerTestSuite) TestCreate() { id, err := suite.mgr.Create(orm.NewContext(nil, &ormtesting.FakeOrmer{}), &sa, reader) suite.Equalf(int64(1), id, "Expected row to correctly inserted") suite.NoErrorf(err, "Unexpected error when creating artifact: %v", err) - suite.regCli.AssertCalled(suite.T(), "PushBlob") + suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } func (suite *ManagerTestSuite) TestCreateTimeNotSet() { @@ -79,7 +79,7 @@ func (suite *ManagerTestSuite) TestCreateTimeNotSet() { id, err := suite.mgr.Create(orm.NewContext(nil, &ormtesting.FakeOrmer{}), &sa, reader) suite.Equalf(int64(1), id, "Expected row to correctly inserted") suite.NoErrorf(err, "Unexpected error when creating artifact: %v", err) - suite.regCli.AssertCalled(suite.T(), "PushBlob") + suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything) suite.False(sa.CreateTime.IsZero(), "Create time expected to be set") } @@ -101,7 +101,7 @@ func (suite *ManagerTestSuite) TestCreatePushBlobFails() { suite.Equalf(int64(0), id, "Expected no row to be inserted") suite.Errorf(err, "Expected error when creating artifact: %v", err) suite.dao.AssertCalled(suite.T(), "Create", mock.Anything, &sa, mock.Anything) - suite.regCli.AssertCalled(suite.T(), "PushBlob") + suite.regCli.AssertCalled(suite.T(), "PushBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } func (suite *ManagerTestSuite) TestCreateArtifactRecordFailure() { @@ -148,13 +148,13 @@ func (suite *ManagerTestSuite) TestRead() { defer repoHandle.Close() suite.dao.On("Get", mock.Anything, "test_vendor", "test_repo", "test_digest").Return(&sa, nil).Once() - suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(len(data), repoHandle, nil).Once() + suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(len(data)), repoHandle, nil).Once() readCloser, err := suite.mgr.Read(context.TODO(), "test_vendor", "test_repo", "test_digest") suite.NoErrorf(err, "Unexpected error when reading artifact: %v", err) suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest") - suite.regCli.AssertCalled(suite.T(), "PullBlob") + suite.regCli.AssertCalled(suite.T(), "PullBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything) suite.NotNilf(readCloser, "Expected valid read closer instance but was nil") } @@ -172,7 +172,7 @@ func (suite *ManagerTestSuite) TestReadSystemArtifactRecordNotFound() { errToRet := orm.ErrNoRows suite.dao.On("Get", mock.Anything, "test_vendor", "test_repo", "test_digest").Return(nil, errToRet).Once() - suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(len(data), repoHandle, nil).Once() + suite.regCli.On("PullBlob", mock.Anything, mock.Anything).Return(int64(len(data)), repoHandle, nil).Once() readCloser, err := suite.mgr.Read(context.TODO(), "test_vendor", "test_repo", "test_digest") @@ -238,7 +238,7 @@ func (suite *ManagerTestSuite) TestExist() { suite.NoErrorf(err, "Unexpected error when checking if artifact exists: %v", err) suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest") - suite.regCli.AssertCalled(suite.T(), "BlobExist") + suite.regCli.AssertCalled(suite.T(), "BlobExist", mock.Anything, mock.Anything) suite.True(exists, "Expected exists to be true but was false") } @@ -276,7 +276,7 @@ func (suite *ManagerTestSuite) TestExistSystemArtifactBlobReadError() { suite.Error(err, "Expected error when checking if artifact exists") suite.dao.AssertCalled(suite.T(), "Get", mock.Anything, "test_vendor", "test_repo", "test_digest") - suite.regCli.AssertCalled(suite.T(), "BlobExist") + suite.regCli.AssertCalled(suite.T(), "BlobExist", mock.Anything, mock.Anything) suite.False(exists, "Expected exists to be false but was true") } diff --git a/src/server/v2.0/handler/registry.go b/src/server/v2.0/handler/registry.go index e6ed83a2a..59e6d293a 100644 --- a/src/server/v2.0/handler/registry.go +++ b/src/server/v2.0/handler/registry.go @@ -186,6 +186,12 @@ func (r *registryAPI) GetRegistryInfo(ctx context.Context, params operation.GetR for _, trigger := range info.SupportedTriggers { in.SupportedTriggers = append(in.SupportedTriggers, string(trigger)) } + + // whether support copy by chunk + if info.SupportedCopyByChunk { + in.SupportedCopyByChunk = &info.SupportedCopyByChunk + } + return operation.NewGetRegistryInfoOK().WithPayload(in) } diff --git a/src/server/v2.0/handler/replication.go b/src/server/v2.0/handler/replication.go index f33a757fc..26c793255 100644 --- a/src/server/v2.0/handler/replication.go +++ b/src/server/v2.0/handler/replication.go @@ -108,6 +108,11 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope } policy.Speed = *params.Policy.Speed } + + if params.Policy.CopyByChunk != nil { + policy.CopyByChunk = *params.Policy.CopyByChunk + } + id, err := r.ctl.CreatePolicy(ctx, policy) if err != nil { return r.SendError(ctx, err) @@ -171,6 +176,11 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope } policy.Speed = *params.Policy.Speed } + + if params.Policy.CopyByChunk != nil { + policy.CopyByChunk = *params.Policy.CopyByChunk + } + if err := r.ctl.UpdatePolicy(ctx, policy); err != nil { return r.SendError(ctx, err) } @@ -429,6 +439,7 @@ func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPol ReplicateDeletion: policy.ReplicateDeletion, Speed: &policy.Speed, UpdateTime: strfmt.DateTime(policy.UpdateTime), + CopyByChunk: &policy.CopyByChunk, } if policy.SrcRegistry != nil { p.SrcRegistry = convertRegistry(policy.SrcRegistry) diff --git a/src/testing/pkg/registry/client.go b/src/testing/pkg/registry/client.go deleted file mode 100644 index ca6632ce9..000000000 --- a/src/testing/pkg/registry/client.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package registry - -import ( - "io" - "net/http" - - "github.com/docker/distribution" - "github.com/stretchr/testify/mock" -) - -// FakeClient is a fake registry client that implement src/pkg/registry.Client interface -type FakeClient struct { - mock.Mock -} - -// Ping ... -func (f *FakeClient) Ping() (err error) { - args := f.Called() - return args.Error(0) -} - -// Catalog ... -func (f *FakeClient) Catalog() ([]string, error) { - args := f.Called() - var repositories []string - if args[0] != nil { - repositories = args[0].([]string) - } - return repositories, args.Error(1) -} - -// ListTags ... -func (f *FakeClient) ListTags(repository string) ([]string, error) { - args := f.Called() - var tags []string - if args[0] != nil { - tags = args[0].([]string) - } - return tags, args.Error(1) -} - -// ManifestExist ... -func (f *FakeClient) ManifestExist(repository, reference string) (bool, *distribution.Descriptor, error) { - args := f.Called() - var desc *distribution.Descriptor - if args[0] != nil { - desc = args[0].(*distribution.Descriptor) - } - return args.Bool(0), desc, args.Error(2) -} - -// PullManifest ... -func (f *FakeClient) PullManifest(repository, reference string, acceptedMediaTypes ...string) (distribution.Manifest, string, error) { - args := f.Called() - var manifest distribution.Manifest - if args[0] != nil { - manifest = args[0].(distribution.Manifest) - } - return manifest, args.String(1), args.Error(2) -} - -// PushManifest ... -func (f *FakeClient) PushManifest(repository, reference, mediaType string, payload []byte) (string, error) { - args := f.Called() - return args.String(0), args.Error(1) -} - -// DeleteManifest ... -func (f *FakeClient) DeleteManifest(repository, reference string) error { - args := f.Called() - return args.Error(0) -} - -// BlobExist ... -func (f *FakeClient) BlobExist(repository, digest string) (bool, error) { - args := f.Called() - return args.Bool(0), args.Error(1) -} - -// PullBlob ... -func (f *FakeClient) PullBlob(repository, digest string) (int64, io.ReadCloser, error) { - args := f.Called() - var blob io.ReadCloser - if args[1] != nil { - blob = args[1].(io.ReadCloser) - } - return int64(args.Int(0)), blob, args.Error(2) -} - -// PushBlob ... -func (f *FakeClient) PushBlob(repository, digest string, size int64, blob io.Reader) error { - args := f.Called() - return args.Error(0) -} - -// MountBlob ... -func (f *FakeClient) MountBlob(srcRepository, digest, dstRepository string) (err error) { - args := f.Called() - return args.Error(0) -} - -// DeleteBlob ... -func (f *FakeClient) DeleteBlob(repository, digest string) (err error) { - args := f.Called(repository, digest) - return args.Error(0) -} - -// Copy ... -func (f *FakeClient) Copy(srcRepo, srcRef, dstRepo, dstRef string, override bool) error { - args := f.Called() - return args.Error(0) -} - -func (f *FakeClient) Do(req *http.Request) (*http.Response, error) { - args := f.Called() - var resp *http.Response - if args[0] != nil { - resp = args[0].(*http.Response) - } - return resp, args.Error(1) -} diff --git a/src/testing/pkg/registry/fake_registry_client.go b/src/testing/pkg/registry/fake_registry_client.go index 8200a4e15..f9d0a68a9 100644 --- a/src/testing/pkg/registry/fake_registry_client.go +++ b/src/testing/pkg/registry/fake_registry_client.go @@ -237,6 +237,36 @@ func (_m *Client) PullBlob(repository string, digest string) (int64, io.ReadClos return r0, r1, r2 } +// PullBlobChunk provides a mock function with given fields: repository, digest, blobSize, start, end +func (_m *Client) PullBlobChunk(repository string, digest string, blobSize int64, start int64, end int64) (int64, io.ReadCloser, error) { + ret := _m.Called(repository, digest, blobSize, start, end) + + var r0 int64 + if rf, ok := ret.Get(0).(func(string, string, int64, int64, int64) int64); ok { + r0 = rf(repository, digest, blobSize, start, end) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 io.ReadCloser + if rf, ok := ret.Get(1).(func(string, string, int64, int64, int64) io.ReadCloser); ok { + r1 = rf(repository, digest, blobSize, start, end) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(io.ReadCloser) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string, int64, int64, int64) error); ok { + r2 = rf(repository, digest, blobSize, start, end) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // PullManifest provides a mock function with given fields: repository, reference, acceptedMediaTypes func (_m *Client) PullManifest(repository string, reference string, acceptedMediaTypes ...string) (distribution.Manifest, string, error) { _va := make([]interface{}, len(acceptedMediaTypes)) @@ -288,6 +318,34 @@ func (_m *Client) PushBlob(repository string, digest string, size int64, blob io return r0 } +// PushBlobChunk provides a mock function with given fields: repository, digest, blobSize, chunk, start, end, location +func (_m *Client) PushBlobChunk(repository string, digest string, blobSize int64, chunk io.Reader, start int64, end int64, location string) (string, int64, error) { + ret := _m.Called(repository, digest, blobSize, chunk, start, end, location) + + var r0 string + if rf, ok := ret.Get(0).(func(string, string, int64, io.Reader, int64, int64, string) string); ok { + r0 = rf(repository, digest, blobSize, chunk, start, end, location) + } else { + r0 = ret.Get(0).(string) + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(string, string, int64, io.Reader, int64, int64, string) int64); ok { + r1 = rf(repository, digest, blobSize, chunk, start, end, location) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string, int64, io.Reader, int64, int64, string) error); ok { + r2 = rf(repository, digest, blobSize, chunk, start, end, location) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // PushManifest provides a mock function with given fields: repository, reference, mediaType, payload func (_m *Client) PushManifest(repository string, reference string, mediaType string, payload []byte) (string, error) { ret := _m.Called(repository, reference, mediaType, payload)