diff --git a/src/replication/ng/adapter/image_registry.go b/src/replication/ng/adapter/image_registry.go index 1b0ebe00e..8bf769756 100644 --- a/src/replication/ng/adapter/image_registry.go +++ b/src/replication/ng/adapter/image_registry.go @@ -41,6 +41,7 @@ type ImageRegistry interface { ManifestExist(repository, reference string) (exist bool, digest string, err error) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) PushManifest(repository, reference, mediaType string, payload []byte) error + DeleteManifest(repository, digest string) error BlobExist(repository, digest string) (exist bool, err error) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) PushBlob(repository, digest string, size int64, blob io.Reader) error @@ -166,6 +167,18 @@ func (d *DefaultImageRegistry) PushManifest(repository, reference, mediaType str return err } +// TODO monitor the registry API request in core directly rather than using +// the web hook + +// DeleteManifest ... +func (d *DefaultImageRegistry) DeleteManifest(repository, digest string) error { + client, err := d.getClient(repository) + if err != nil { + return err + } + return client.DeleteManifest(digest) +} + // BlobExist ... func (d *DefaultImageRegistry) BlobExist(repository, digest string) (bool, error) { client, err := d.getClient(repository) diff --git a/src/replication/ng/flow/stage_test.go b/src/replication/ng/flow/stage_test.go index f673f2f17..7d4b12377 100644 --- a/src/replication/ng/flow/stage_test.go +++ b/src/replication/ng/flow/stage_test.go @@ -108,6 +108,9 @@ func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMedia func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error { return nil } +func (f *fakedAdapter) DeleteManifest(repository, digest string) error { + return nil +} func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) { return false, nil } diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index e5b5f4088..e38ebd27c 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -104,6 +104,7 @@ func (d *defaultController) createFlow(executionID int64, policy *model.Policy, }, } filters = append(filters, policy.Filters...) + policy.Filters = filters } return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy) } diff --git a/src/replication/ng/transfer/chart/transfer.go b/src/replication/ng/transfer/chart/transfer.go index d11a041c4..90558b7b9 100644 --- a/src/replication/ng/transfer/chart/transfer.go +++ b/src/replication/ng/transfer/chart/transfer.go @@ -176,9 +176,20 @@ func (t *transfer) copy(src, dst *chart, override bool) error { } func (t *transfer) delete(chart *chart) error { + exist, err := t.dst.ChartExist(chart.name, chart.version) + if err != nil { + t.logger.Errorf("failed to check the existence of chart %s:%s on the destination registry: %v", chart.name, chart.version, err) + return err + } + if !exist { + t.logger.Infof("the chart %s:%s doesn't exist on the destination registry, skip", + chart.name, chart.version) + return nil + } + t.logger.Infof("deleting the chart %s:%s on the destination registry...", chart.name, chart.version) if err := t.dst.DeleteChart(chart.name, chart.version); err != nil { - t.logger.Errorf("failed to delete the chart %s:%s on the destination registry", chart.name, chart.version) + t.logger.Errorf("failed to delete the chart %s:%s on the destination registry: %v", chart.name, chart.version, err) return err } t.logger.Infof("delete the chart %s:%s on the destination registry completed", chart.name, chart.version) diff --git a/src/replication/ng/transfer/repository/transfer.go b/src/replication/ng/transfer/repository/transfer.go index 2acd1755f..ab95dd776 100644 --- a/src/replication/ng/transfer/repository/transfer.go +++ b/src/replication/ng/transfer/repository/transfer.go @@ -214,7 +214,7 @@ func (t *transfer) pullManifest(repository, tag string) ( func (t *transfer) exist(repository, tag string) (bool, string, error) { exist, digest, err := t.dst.ManifestExist(repository, tag) if err != nil { - t.logger.Errorf("failed to check the existence of the manifest of iage %s:%s in the destination registry: %v", + t.logger.Errorf("failed to check the existence of the manifest of image %s:%s on the destination registry: %v", repository, tag, err) return false, "", err } @@ -275,6 +275,36 @@ func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag } func (t *transfer) delete(repo *repository) error { - // TODO + if t.shouldStop() { + return jobStoppedErr + } + + digests := map[string]struct{}{} + repository := repo.repository + for _, tag := range repo.tags { + exist, digest, err := t.dst.ManifestExist(repository, tag) + if err != nil { + t.logger.Errorf("failed to check the existence of the manifest of image %s:%s on the destination registry: %v", + repository, tag, err) + return err + } + if !exist { + t.logger.Infof("the image %s:%s doesn't exist on the destination registry, skip", + repository, tag) + continue + } + t.logger.Infof("the digest of image %s:%s is %s", repository, tag, digest) + if _, exist := digests[digest]; !exist { + digests[digest] = struct{}{} + } + } + for digest := range digests { + if err := t.dst.DeleteManifest(repository, digest); err != nil { + t.logger.Errorf("failed to delete the manifest of image %s:%s on the destination registry: %v", + repository, digest, err) + return err + } + t.logger.Infof("the manifest of image %s:%s is deleted", repository, digest) + } return nil } diff --git a/src/replication/ng/transfer/repository/transfer_test.go b/src/replication/ng/transfer/repository/transfer_test.go index f73798c5e..71ecf150e 100644 --- a/src/replication/ng/transfer/repository/transfer_test.go +++ b/src/replication/ng/transfer/repository/transfer_test.go @@ -38,6 +38,9 @@ func (f *fakeRegistry) FetchImages([]string, []*model.Filter) ([]*model.Resource } func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, string, error) { + if repository == "destination" && reference == "b1" { + return true, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil + } return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil } func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { @@ -78,6 +81,9 @@ func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMedia func (f *fakeRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error { return nil } +func (f *fakeRegistry) DeleteManifest(repository, reference string) error { + return nil +} func (f *fakeRegistry) BlobExist(repository, digest string) (bool, error) { return false, nil } @@ -134,3 +140,19 @@ func TestCopy(t *testing.T) { err := tr.copy(src, dst, override) require.Nil(t, err) } + +func TestDelete(t *testing.T) { + stopFunc := func() bool { return false } + tr := &transfer{ + logger: log.DefaultLogger(), + isStopped: stopFunc, + dst: &fakeRegistry{}, + } + + repo := &repository{ + repository: "destination", + tags: []string{"b1", "b2"}, + } + err := tr.delete(repo) + require.Nil(t, err) +}