fixes gc dry run issue (#15804)

Fixes #15332. In dry-run mode, the GC job should not remove the untagged candidates.
To fix it, the untagged artifact deletion is simulated in dry-run mode instead of being performed.

Signed-off-by: Wang Yan <wangyan@vmware.com>
Wang Yan authored on 2021-10-19 07:42:54 +08:00, committed by GitHub
parent 3f75f0db32
commit 6014646bcb
6 changed files with 181 additions and 36 deletions
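
The heart of the change, condensed from the GC job diff below: when `deleteUntagged` is enabled and the job runs in dry-run mode, untagged artifacts are no longer deleted; instead a simulated `ArtifactTrash` entry is recorded so the mark phase can still compute and report the blob candidates. A minimal sketch of that branch (identifiers taken from the diff; error handling and logging trimmed):

```go
// Sketch of the dry-run handling in deletedArt, condensed from the diff below.
for _, untagged := range untaggedArts {
	if gc.dryRun {
		// Dry run: leave the artifact untouched and only record a simulated
		// trash entry, so the mark phase can still resolve blob candidates.
		allTrashedArts = append(allTrashedArts, model.ArtifactTrash{
			MediaType:         untagged.MediaType,
			ManifestMediaType: untagged.ManifestMediaType,
			RepositoryName:    untagged.RepositoryName,
			Digest:            untagged.Digest,
		})
	} else {
		// Real run: actually delete the untagged artifact; failures are
		// picked up by the next GC execution.
		if err := gc.artCtl.Delete(ctx.SystemContext(), untagged.ID); err != nil {
			continue
		}
	}
}
```

For these simulated deletions, the new uselessBlobs helper then collects the associated blobs via blobMgr.GetByArt, backed by the new GetBlobsByArtDigest DAO query added below.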

View File

@@ -25,12 +25,12 @@ import (
     "github.com/goharbor/harbor/src/jobservice/logger"
     "github.com/goharbor/harbor/src/lib/errors"
     "github.com/goharbor/harbor/src/lib/q"
-    redislib "github.com/goharbor/harbor/src/lib/redis"
+    redisLib "github.com/goharbor/harbor/src/lib/redis"
     "github.com/goharbor/harbor/src/lib/retry"
     "github.com/goharbor/harbor/src/pkg/artifactrash"
     "github.com/goharbor/harbor/src/pkg/artifactrash/model"
     "github.com/goharbor/harbor/src/pkg/blob"
-    blob_models "github.com/goharbor/harbor/src/pkg/blob/models"
+    blobModels "github.com/goharbor/harbor/src/pkg/blob/models"
     "github.com/goharbor/harbor/src/registryctl/client"
 )
@@ -61,7 +61,7 @@ type GarbageCollector struct {
     // As table blob has no repositories data, and the repositories are required when to delete a manifest, so use the table ArtifactTrash to capture them.
     trashedArts map[string][]model.ArtifactTrash
     // hold all of GC candidates(non-referenced blobs), it's captured by mark and consumed by sweep.
-    deleteSet []*blob_models.Blob
+    deleteSet []*blobModels.Blob
     timeWindowHours int64
 }
@@ -88,7 +88,7 @@ func (gc *GarbageCollector) Validate(params job.Parameters) error {
 func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
     regCtlInit()
     gc.logger = ctx.GetLogger()
-    gc.deleteSet = make([]*blob_models.Blob, 0)
+    gc.deleteSet = make([]*blobModels.Blob, 0)
     gc.trashedArts = make(map[string][]model.ArtifactTrash, 0)
     opCmd, flag := ctx.OPCommand()
     if flag && opCmd.IsStop() {
@@ -190,14 +190,14 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
     // get gc candidates, and set the repositories.
     // AS the reference count is calculated by joining table project_blob and blob, here needs to call removeUntaggedBlobs to remove these non-used blobs from table project_blob firstly.
-    untaggedBlobs := gc.markOrSweepUntaggedBlobs(ctx)
-    blobs, err := gc.blobMgr.UselessBlobs(ctx.SystemContext(), gc.timeWindowHours)
+    orphanBlobs := gc.markOrSweepUntaggedBlobs(ctx)
+    blobs, err := gc.uselessBlobs(ctx)
     if err != nil {
         gc.logger.Errorf("failed to get gc candidate: %v", err)
         return err
     }
-    if len(untaggedBlobs) != 0 {
-        blobs = append(blobs, untaggedBlobs...)
+    if len(orphanBlobs) != 0 {
+        blobs = append(blobs, orphanBlobs...)
     }
     if len(blobs) == 0 {
         gc.logger.Info("no need to execute GC as there is no non referenced artifacts.")
@@ -210,7 +210,7 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
     makeSize := int64(0)
     for _, blob := range blobs {
         if !gc.dryRun {
-            blob.Status = blob_models.StatusDelete
+            blob.Status = blobModels.StatusDelete
             count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
             if err != nil {
                 gc.logger.Warningf("failed to mark gc candidate, skip it.: %s, error: %v", blob.Digest, err)
@@ -243,7 +243,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
     sweepSize := int64(0)
     for _, blob := range gc.deleteSet {
         // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
-        blob.Status = blob_models.StatusDeleting
+        blob.Status = blobModels.StatusDeleting
         count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
         if err != nil {
             gc.logger.Errorf("failed to mark gc candidate deleting, skip: %s, %s", blob.Digest, blob.Status)
@@ -335,7 +335,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
 // cleanCache is to clean the registry cache for GC.
 // To do this is because the issue https://github.com/docker/distribution/issues/2094
 func (gc *GarbageCollector) cleanCache() error {
-    pool, err := redislib.GetRedisPool("GarbageCollector", gc.redisURL, &redislib.PoolParam{
+    pool, err := redisLib.GetRedisPool("GarbageCollector", gc.redisURL, &redisLib.PoolParam{
         PoolMaxIdle: 0,
         PoolMaxActive: 1,
         PoolIdleTimeout: 60 * time.Second,
@@ -366,20 +366,24 @@ func (gc *GarbageCollector) cleanCache() error {
     return nil
 }
-// deletedArt contains the two parts of artifact
+// deletedArt contains the two parts of artifact, no actually deletion for dry run mode.
 // 1, required part, the artifacts were removed from Harbor.
 // 2, optional part, the untagged artifacts.
 func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.ArtifactTrash, error) {
     if os.Getenv("UTTEST") == "true" {
         gc.logger = ctx.GetLogger()
     }
-    arts := make([]model.ArtifactTrash, 0)
+    // allTrashedArts contains the artifacts that actual removed and simulate removed(for dry run).
+    allTrashedArts := make([]model.ArtifactTrash, 0)
+    untaggedArts := make([]*artifact.Artifact, 0)
+    var err error
     // artMap : map[digest : []ArtifactTrash list]
     artMap := make(map[string][]model.ArtifactTrash)
     // handle the optional ones, and the artifact controller will move them into trash.
     if gc.deleteUntagged {
-        untagged, err := gc.artCtl.List(ctx.SystemContext(), &q.Query{
+        untaggedArts, err = gc.artCtl.List(ctx.SystemContext(), &q.Query{
             Keywords: map[string]interface{}{
                 "Tags": "nil",
             },
@@ -387,29 +391,44 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti
         if err != nil {
             return artMap, err
         }
-        gc.logger.Info("start to delete untagged artifact.")
-        for _, art := range untagged {
-            if err := gc.artCtl.Delete(ctx.SystemContext(), art.ID); err != nil {
+        gc.logger.Info("start to delete untagged artifact (no actually deletion for dry-run mode)")
+        for _, untagged := range untaggedArts {
+            // for dryRun, just simulate the artifact deletion, move the artifact to artifact trash
+            if gc.dryRun {
+                simulateDeletion := model.ArtifactTrash{
+                    MediaType:         untagged.MediaType,
+                    ManifestMediaType: untagged.ManifestMediaType,
+                    RepositoryName:    untagged.RepositoryName,
+                    Digest:            untagged.Digest,
+                }
+                allTrashedArts = append(allTrashedArts, simulateDeletion)
+            } else {
+                if err := gc.artCtl.Delete(ctx.SystemContext(), untagged.ID); err != nil {
                     // the failure ones can be GCed by the next execution
-                gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", art.ID, err)
+                    gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", untagged.ID, err)
                     continue
                 }
-            gc.logger.Infof("delete the untagged artifact: ProjectID:(%d)-RepositoryName(%s)-MediaType:(%s)-Digest:(%s)",
-                art.ProjectID, art.RepositoryName, art.ManifestMediaType, art.Digest)
             }
-        gc.logger.Info("end to delete untagged artifact.")
+            gc.logger.Infof("delete the untagged artifact: ProjectID:(%d)-RepositoryName(%s)-MediaType:(%s)-Digest:(%s)",
+                untagged.ProjectID, untagged.RepositoryName, untagged.ManifestMediaType, untagged.Digest)
+        }
+        gc.logger.Info("end to delete untagged artifact (no actually deletion for dry-run mode)")
     }
-    // filter gets all of deleted artifact, here do not need time window as the manifest candidate has to remove all of its reference.
-    arts, err := gc.artrashMgr.Filter(ctx.SystemContext(), 0)
+    // filter gets all of actually deleted artifact, here do not need time window as the manifest candidate has to remove all of its reference.
+    // For dryRun, no need to get the actual deletion artifacts since the return map is for the mark phase to call v2 remove manifest.
+    if !gc.dryRun {
+        actualDeletions, err := gc.artrashMgr.Filter(ctx.SystemContext(), 0)
         if err != nil {
             return artMap, err
         }
+        allTrashedArts = append(allTrashedArts, actualDeletions...)
+    }
     // group the deleted artifact by digest. The repositories of blob is needed when to delete as a manifest.
-    if len(arts) > 0 {
+    if len(allTrashedArts) > 0 {
         gc.logger.Info("artifact trash candidates.")
-        for _, art := range arts {
+        for _, art := range allTrashedArts {
             gc.logger.Info(art.String())
             _, exist := artMap[art.Digest]
             if !exist {
@@ -428,8 +447,8 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti
 // mark or sweep the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC
 // * dry-run, find and return the untagged blobs
 // * non dry-run, remove the reference of the untagged blobs
-func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blob_models.Blob {
-    var untaggedBlobs []*blob_models.Blob
+func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobModels.Blob {
+    var orphanBlobs []*blobModels.Blob
     for result := range project.ListAll(ctx.SystemContext(), 50, nil, project.Metadata(false)) {
         if result.Error != nil {
             gc.logger.Errorf("remove untagged blobs for all projects got error: %v", result.Error)
@@ -447,7 +466,7 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blob_mo
             blobRG := q.Range{
                 Min: lastBlobID,
             }
-            q := &q.Query{
+            query := &q.Query{
                 Keywords: map[string]interface{}{
                     "update_time": &timeRG,
                     "projectID": p.ProjectID,
@@ -459,7 +478,7 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blob_mo
                     q.NewSort("id", false),
                 },
             }
-            blobs, err := gc.blobMgr.List(ctx.SystemContext(), q)
+            blobs, err := gc.blobMgr.List(ctx.SystemContext(), query)
             if err != nil {
                 gc.logger.Errorf("failed to get blobs of project: %d, %v", p.ProjectID, err)
                 break
@@ -470,7 +489,7 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blob_mo
                 gc.logger.Errorf("failed to find untagged blobs of project: %d, %v", p.ProjectID, err)
                 break
             }
-            untaggedBlobs = append(untaggedBlobs, unassociated...)
+            orphanBlobs = append(orphanBlobs, unassociated...)
         } else {
             if err := gc.blobMgr.CleanupAssociationsForProject(ctx.SystemContext(), p.ProjectID, blobs); err != nil {
                 gc.logger.Errorf("failed to clean untagged blobs of project: %d, %v", p.ProjectID, err)
@@ -483,12 +502,39 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blob_mo
             lastBlobID = blobs[len(blobs)-1].ID
         }
     }
-    return untaggedBlobs
+    return orphanBlobs
+}
+func (gc *GarbageCollector) uselessBlobs(ctx job.Context) ([]*blobModels.Blob, error) {
+    var blobs []*blobModels.Blob
+    var err error
+    blobs, err = gc.blobMgr.UselessBlobs(ctx.SystemContext(), gc.timeWindowHours)
+    if err != nil {
+        gc.logger.Errorf("failed to get gc useless blobs: %v", err)
+        return blobs, err
+    }
+    // For dryRun, it needs to append the blobs that are associated with untagged artifact.
+    // Do it since the it doesn't remove the untagged artifact in dry run mode. All the blobs of untagged artifact are referenced by project,
+    // so they cannot get by the above UselessBlobs method.
+    // In dryRun mode, trashedArts only contains the mock deletion artifact.
+    if gc.dryRun {
+        for artDigest := range gc.trashedArts {
+            artBlobs, err := gc.blobMgr.GetByArt(ctx.SystemContext(), artDigest)
+            if err != nil {
+                return blobs, err
+            }
+            blobs = append(blobs, artBlobs...)
+        }
+    }
+    return blobs, err
 }
 // markDeleteFailed set the blob status to StatusDeleteFailed
-func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blob_models.Blob) error {
-    blob.Status = blob_models.StatusDeleteFailed
+func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blobModels.Blob) error {
+    blob.Status = blobModels.StatusDeleteFailed
     count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
     if err != nil {
         gc.logger.Errorf("failed to mark gc candidate delete failed: %s, %s", blob.Digest, blob.Status)

View File

@@ -79,6 +79,9 @@ type DAO interface {
     // GetBlobsNotRefedByProjectBlob get the blobs that are not referenced by the table project_blob and also not in the reserve window(in hours)
     GetBlobsNotRefedByProjectBlob(ctx context.Context, timeWindowHours int64) ([]*models.Blob, error)
+    // GetBlobsByArtDigest get the blobs that are referenced by artifact
+    GetBlobsByArtDigest(ctx context.Context, digest string) ([]*models.Blob, error)
 }
 // New returns an instance of the default DAO
@@ -401,3 +404,19 @@ func (d *dao) GetBlobsNotRefedByProjectBlob(ctx context.Context, timeWindowHours
     return noneRefed, nil
 }
+func (d *dao) GetBlobsByArtDigest(ctx context.Context, digest string) ([]*models.Blob, error) {
+    var blobs []*models.Blob
+    ormer, err := orm.FromContext(ctx)
+    if err != nil {
+        return blobs, err
+    }
+    sql := `SELECT b.id, b.digest, b.content_type, b.status, b.version, b.size FROM artifact_blob AS ab LEFT JOIN blob b ON ab.digest_blob = b.digest WHERE ab.digest_af = ?`
+    _, err = ormer.Raw(sql, digest).QueryRows(&blobs)
+    if err != nil {
+        return blobs, err
+    }
+    return blobs, nil
+}

View File

@@ -457,6 +457,31 @@ func (suite *DaoTestSuite) TestGetBlobsNotRefedByProjectBlob() {
     suite.Require().Equal(0, len(blobs))
 }
+func (suite *DaoTestSuite) GetBlobsByArtDigest() {
+    ctx := suite.Context()
+    afDigest := suite.DigestString()
+    blobs, err := suite.dao.GetBlobsByArtDigest(ctx, afDigest)
+    suite.Nil(err)
+    suite.Require().Equal(0, len(blobs))
+    suite.dao.CreateBlob(ctx, &models.Blob{Digest: afDigest})
+    blobDigest1 := suite.DigestString()
+    blobDigest2 := suite.DigestString()
+    suite.dao.CreateBlob(ctx, &models.Blob{Digest: blobDigest1})
+    suite.dao.CreateBlob(ctx, &models.Blob{Digest: blobDigest2})
+    _, err = suite.dao.CreateArtifactAndBlob(ctx, afDigest, afDigest)
+    suite.Nil(err)
+    _, err = suite.dao.CreateArtifactAndBlob(ctx, afDigest, blobDigest1)
+    suite.Nil(err)
+    _, err = suite.dao.CreateArtifactAndBlob(ctx, afDigest, blobDigest2)
+    suite.Nil(err)
+    blobs, err = suite.dao.GetBlobsByArtDigest(ctx, afDigest)
+    suite.Nil(err)
+    suite.Require().Equal(3, len(blobs))
+}
 func TestDaoTestSuite(t *testing.T) {
     suite.Run(t, &DaoTestSuite{})
 }

View File

@@ -59,6 +59,9 @@ type Manager interface {
     // Get get blob by digest
     Get(ctx context.Context, digest string) (*Blob, error)
+    // Get get blob by artifact digest
+    GetByArt(ctx context.Context, digest string) ([]*models.Blob, error)
     // Update the blob
     Update(ctx context.Context, blob *Blob) error
@@ -125,6 +128,10 @@ func (m *manager) Get(ctx context.Context, digest string) (*Blob, error) {
     return m.dao.GetBlobByDigest(ctx, digest)
 }
+func (m *manager) GetByArt(ctx context.Context, digest string) ([]*models.Blob, error) {
+    return m.dao.GetBlobsByArtDigest(ctx, digest)
+}
 func (m *manager) Update(ctx context.Context, blob *Blob) error {
     return m.dao.UpdateBlob(ctx, blob)
 }

View File

@@ -439,6 +439,31 @@ func (suite *ManagerTestSuite) TestUselessBlobs() {
     suite.Require().Equal(0, len(blobs))
 }
+func (suite *ManagerTestSuite) GetBlobsByArtDigest() {
+    ctx := suite.Context()
+    afDigest := suite.DigestString()
+    blobs, err := Mgr.GetByArt(ctx, afDigest)
+    suite.Nil(err)
+    suite.Require().Equal(0, len(blobs))
+    Mgr.Create(ctx, suite.DigestString(), "media type", 100)
+    blobDigest1 := suite.DigestString()
+    blobDigest2 := suite.DigestString()
+    Mgr.Create(ctx, blobDigest1, "media type", 100)
+    Mgr.Create(ctx, blobDigest2, "media type", 100)
+    _, err = Mgr.AssociateWithArtifact(ctx, afDigest, afDigest)
+    suite.Nil(err)
+    _, err = Mgr.AssociateWithArtifact(ctx, afDigest, blobDigest1)
+    suite.Nil(err)
+    _, err = Mgr.AssociateWithArtifact(ctx, afDigest, blobDigest2)
+    suite.Nil(err)
+    blobs, err = Mgr.List(ctx, q.New(q.KeyWords{"artifactDigest": afDigest}))
+    suite.Nil(err)
+    suite.Require().Equal(3, len(blobs))
+}
 func TestManagerTestSuite(t *testing.T) {
     suite.Run(t, &ManagerTestSuite{})
 }

View File

@@ -209,6 +209,29 @@ func (_m *Manager) Get(ctx context.Context, digest string) (*models.Blob, error)
     return r0, r1
 }
+// GetByArt provides a mock function with given fields: ctx, digest
+func (_m *Manager) GetByArt(ctx context.Context, digest string) ([]*models.Blob, error) {
+    ret := _m.Called(ctx, digest)
+    var r0 []*models.Blob
+    if rf, ok := ret.Get(0).(func(context.Context, string) []*models.Blob); ok {
+        r0 = rf(ctx, digest)
+    } else {
+        if ret.Get(0) != nil {
+            r0 = ret.Get(0).([]*models.Blob)
+        }
+    }
+    var r1 error
+    if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+        r1 = rf(ctx, digest)
+    } else {
+        r1 = ret.Error(1)
+    }
+    return r0, r1
+}
 // List provides a mock function with given fields: ctx, query
 func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*models.Blob, error) {
     ret := _m.Called(ctx, query)