Implement deletion for image transter

This commit implements the delete function for image transfer

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-03-22 14:58:12 +08:00
parent 791aecddfa
commit c3b02dd104
6 changed files with 83 additions and 3 deletions

View File

@ -41,6 +41,7 @@ type ImageRegistry interface {
ManifestExist(repository, reference string) (exist bool, digest string, err error) ManifestExist(repository, reference string) (exist bool, digest string, err error)
PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error)
PushManifest(repository, reference, mediaType string, payload []byte) error PushManifest(repository, reference, mediaType string, payload []byte) error
DeleteManifest(repository, digest string) error
BlobExist(repository, digest string) (exist bool, err error) BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error PushBlob(repository, digest string, size int64, blob io.Reader) error
@ -166,6 +167,18 @@ func (d *DefaultImageRegistry) PushManifest(repository, reference, mediaType str
return err return err
} }
// TODO monitor the registry API request in core directly rather than using
// the web hook
// DeleteManifest ...
func (d *DefaultImageRegistry) DeleteManifest(repository, digest string) error {
client, err := d.getClient(repository)
if err != nil {
return err
}
return client.DeleteManifest(digest)
}
// BlobExist ... // BlobExist ...
func (d *DefaultImageRegistry) BlobExist(repository, digest string) (bool, error) { func (d *DefaultImageRegistry) BlobExist(repository, digest string) (bool, error) {
client, err := d.getClient(repository) client, err := d.getClient(repository)

View File

@ -108,6 +108,9 @@ func (f *fakedAdapter) PullManifest(repository, reference string, accepttedMedia
func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error { func (f *fakedAdapter) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil return nil
} }
func (f *fakedAdapter) DeleteManifest(repository, digest string) error {
return nil
}
func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) { func (f *fakedAdapter) BlobExist(repository, digest string) (exist bool, err error) {
return false, nil return false, nil
} }

View File

@ -104,6 +104,7 @@ func (d *defaultController) createFlow(executionID int64, policy *model.Policy,
}, },
} }
filters = append(filters, policy.Filters...) filters = append(filters, policy.Filters...)
policy.Filters = filters
} }
return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy) return flow.NewCopyFlow(d.executionMgr, d.registryMgr, d.scheduler, executionID, policy)
} }

View File

@ -176,9 +176,20 @@ func (t *transfer) copy(src, dst *chart, override bool) error {
} }
func (t *transfer) delete(chart *chart) error { func (t *transfer) delete(chart *chart) error {
exist, err := t.dst.ChartExist(chart.name, chart.version)
if err != nil {
t.logger.Errorf("failed to check the existence of chart %s:%s on the destination registry: %v", chart.name, chart.version, err)
return err
}
if !exist {
t.logger.Infof("the chart %s:%s doesn't exist on the destination registry, skip",
chart.name, chart.version)
return nil
}
t.logger.Infof("deleting the chart %s:%s on the destination registry...", chart.name, chart.version) t.logger.Infof("deleting the chart %s:%s on the destination registry...", chart.name, chart.version)
if err := t.dst.DeleteChart(chart.name, chart.version); err != nil { if err := t.dst.DeleteChart(chart.name, chart.version); err != nil {
t.logger.Errorf("failed to delete the chart %s:%s on the destination registry", chart.name, chart.version) t.logger.Errorf("failed to delete the chart %s:%s on the destination registry: %v", chart.name, chart.version, err)
return err return err
} }
t.logger.Infof("delete the chart %s:%s on the destination registry completed", chart.name, chart.version) t.logger.Infof("delete the chart %s:%s on the destination registry completed", chart.name, chart.version)

View File

@ -214,7 +214,7 @@ func (t *transfer) pullManifest(repository, tag string) (
func (t *transfer) exist(repository, tag string) (bool, string, error) { func (t *transfer) exist(repository, tag string) (bool, string, error) {
exist, digest, err := t.dst.ManifestExist(repository, tag) exist, digest, err := t.dst.ManifestExist(repository, tag)
if err != nil { if err != nil {
t.logger.Errorf("failed to check the existence of the manifest of iage %s:%s in the destination registry: %v", t.logger.Errorf("failed to check the existence of the manifest of image %s:%s on the destination registry: %v",
repository, tag, err) repository, tag, err)
return false, "", err return false, "", err
} }
@ -275,6 +275,36 @@ func (t *transfer) pushManifest(manifest distribution.Manifest, repository, tag
} }
func (t *transfer) delete(repo *repository) error { func (t *transfer) delete(repo *repository) error {
// TODO if t.shouldStop() {
return jobStoppedErr
}
digests := map[string]struct{}{}
repository := repo.repository
for _, tag := range repo.tags {
exist, digest, err := t.dst.ManifestExist(repository, tag)
if err != nil {
t.logger.Errorf("failed to check the existence of the manifest of image %s:%s on the destination registry: %v",
repository, tag, err)
return err
}
if !exist {
t.logger.Infof("the image %s:%s doesn't exist on the destination registry, skip",
repository, tag)
continue
}
t.logger.Infof("the digest of image %s:%s is %s", repository, tag, digest)
if _, exist := digests[digest]; !exist {
digests[digest] = struct{}{}
}
}
for digest := range digests {
if err := t.dst.DeleteManifest(repository, digest); err != nil {
t.logger.Errorf("failed to delete the manifest of image %s:%s on the destination registry: %v",
repository, digest, err)
return err
}
t.logger.Infof("the manifest of image %s:%s is deleted", repository, digest)
}
return nil return nil
} }

View File

@ -38,6 +38,9 @@ func (f *fakeRegistry) FetchImages([]string, []*model.Filter) ([]*model.Resource
} }
func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, string, error) { func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, string, error) {
if repository == "destination" && reference == "b1" {
return true, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
} }
func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
@ -78,6 +81,9 @@ func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMedia
func (f *fakeRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error { func (f *fakeRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil return nil
} }
func (f *fakeRegistry) DeleteManifest(repository, reference string) error {
return nil
}
func (f *fakeRegistry) BlobExist(repository, digest string) (bool, error) { func (f *fakeRegistry) BlobExist(repository, digest string) (bool, error) {
return false, nil return false, nil
} }
@ -134,3 +140,19 @@ func TestCopy(t *testing.T) {
err := tr.copy(src, dst, override) err := tr.copy(src, dst, override)
require.Nil(t, err) require.Nil(t, err)
} }
func TestDelete(t *testing.T) {
stopFunc := func() bool { return false }
tr := &transfer{
logger: log.DefaultLogger(),
isStopped: stopFunc,
dst: &fakeRegistry{},
}
repo := &repository{
repository: "destination",
tags: []string{"b1", "b2"},
}
err := tr.delete(repo)
require.Nil(t, err)
}