mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-22 00:27:44 +01:00
revise gc job to align non blocking gc (#12439)
two phases: 1, mark, select the gc candidates bases on the DB and mark them as status delete. 2, sweep, select the candidate and mark it as status deleting and remove it from backend and database. Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
parent
4d4a04fad4
commit
d73265d10d
@ -15,8 +15,11 @@
|
||||
package gc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
|
||||
blob_models "github.com/goharbor/harbor/src/pkg/blob/models"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
@ -27,8 +30,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/pkg/blob"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/goharbor/harbor/src/common"
|
||||
"github.com/goharbor/harbor/src/common/config"
|
||||
"github.com/goharbor/harbor/src/common/registryctl"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
@ -37,21 +38,6 @@ import (
|
||||
|
||||
var (
|
||||
regCtlInit = registryctl.Init
|
||||
|
||||
getReadOnly = func(cfgMgr *config.CfgManager) (bool, error) {
|
||||
if err := cfgMgr.Load(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return cfgMgr.Get(common.ReadOnly).GetBool(), nil
|
||||
}
|
||||
|
||||
setReadOnly = func(cfgMgr *config.CfgManager, switcher bool) error {
|
||||
cfg := map[string]interface{}{
|
||||
common.ReadOnly: switcher,
|
||||
}
|
||||
cfgMgr.UpdateConfig(cfg)
|
||||
return cfgMgr.Save()
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
@ -71,10 +57,16 @@ type GarbageCollector struct {
|
||||
projectCtl project.Controller
|
||||
registryCtlClient client.Client
|
||||
logger logger.Interface
|
||||
cfgMgr *config.CfgManager
|
||||
CoreURL string
|
||||
redisURL string
|
||||
deleteUntagged bool
|
||||
dryRun bool
|
||||
// holds all of trashed artifacts' digest and repositories.
|
||||
// The source data of trashedArts is the table ArtifactTrash and it's only used as a dictionary by sweep when to delete a manifest.
|
||||
// As table blob has no repositories data, and the repositories are required when to delete a manifest, so use the table ArtifactTrash to capture them.
|
||||
trashedArts map[string][]string
|
||||
// hold all of GC candidates(non-referenced blobs), it's captured by mark and consumed by sweep.
|
||||
deleteSet []*blob_models.Blob
|
||||
timeWindowHours int64
|
||||
}
|
||||
|
||||
// MaxFails implements the interface in job/Interface
|
||||
@ -97,58 +89,11 @@ func (gc *GarbageCollector) Validate(params job.Parameters) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run implements the interface in job/Interface
|
||||
// The workflow of GC is:
|
||||
// 1, set harbor to readonly
|
||||
// 2, select the candidate artifacts from Harbor DB.
|
||||
// 3, call registry API(--delete-untagged=false) to delete manifest bases on the results of #2
|
||||
// 4, clean keys of redis DB of registry, clean artifact trash and untagged from DB.
|
||||
// 5, roll back readonly.
|
||||
// More details:
|
||||
// 1, why disable delete untagged when to call registry API
|
||||
// Generally because that we introduce Harbor tag in v2.0, it's in database but no corresponding data in registry.
|
||||
// Also one failure case example:
|
||||
// there are two parts for putting an manifest in Harbor: write database and write storage, but they're not in a transaction,
|
||||
// which leads to the data mismatching in parallel pushing images with same tag but different digest. The valid artifact in
|
||||
// harbor DB could be a untagged one in the storage. If we enable the delete untagged, the valid data could be removed from the storage.
|
||||
// 2, what to be cleaned
|
||||
// > the deleted artifact, bases on table of artifact_trash and artifact
|
||||
// > the untagged artifact(optional), bases on table of artifact.
|
||||
func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
|
||||
if err := gc.init(ctx, params); err != nil {
|
||||
return err
|
||||
}
|
||||
readOnlyCur, err := getReadOnly(gc.cfgMgr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if readOnlyCur != true {
|
||||
if err := setReadOnly(gc.cfgMgr, true); err != nil {
|
||||
return err
|
||||
}
|
||||
defer setReadOnly(gc.cfgMgr, readOnlyCur)
|
||||
}
|
||||
gc.logger.Infof("start to run gc in job.")
|
||||
if err := gc.deleteCandidates(ctx); err != nil {
|
||||
gc.logger.Errorf("failed to delete GC candidates in gc job, with error: %v", err)
|
||||
}
|
||||
gcr, err := gc.registryCtlClient.StartGC()
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to get gc result: %v", err)
|
||||
return err
|
||||
}
|
||||
gc.removeUntaggedBlobs(ctx)
|
||||
if err := gc.cleanCache(); err != nil {
|
||||
return err
|
||||
}
|
||||
gc.logger.Infof("GC results: status: %t, message: %s, start: %s, end: %s.", gcr.Status, gcr.Msg, gcr.StartTime, gcr.EndTime)
|
||||
gc.logger.Infof("success to run gc in job.")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
|
||||
regCtlInit()
|
||||
gc.logger = ctx.GetLogger()
|
||||
gc.deleteSet = make([]*blob_models.Blob, 0)
|
||||
gc.trashedArts = make(map[string][]string, 0)
|
||||
opCmd, flag := ctx.OPCommand()
|
||||
if flag && opCmd.IsStop() {
|
||||
gc.logger.Info("received the stop signal, quit GC job.")
|
||||
@ -166,19 +111,16 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
|
||||
gc.logger.Errorf("failed to start gc as registry controller is unreachable: %v", err)
|
||||
return err
|
||||
}
|
||||
gc.parseParams(params)
|
||||
return nil
|
||||
}
|
||||
|
||||
errTpl := "failed to get required property: %s"
|
||||
if v, ok := ctx.Get(common.CoreURL); ok && len(v.(string)) > 0 {
|
||||
gc.CoreURL = v.(string)
|
||||
} else {
|
||||
return fmt.Errorf(errTpl, common.CoreURL)
|
||||
}
|
||||
secret := os.Getenv("JOBSERVICE_SECRET")
|
||||
configURL := gc.CoreURL + common.CoreConfigPath
|
||||
gc.cfgMgr = config.NewRESTCfgManager(configURL, secret)
|
||||
// parseParams set the parameters according to the GC API call.
|
||||
func (gc *GarbageCollector) parseParams(params job.Parameters) {
|
||||
// redis url
|
||||
gc.redisURL = params["redis_url_reg"].(string)
|
||||
|
||||
// default is to delete the untagged artifact
|
||||
// delete untagged: default is to delete the untagged artifact
|
||||
gc.deleteUntagged = true
|
||||
deleteUntagged, exist := params["delete_untagged"]
|
||||
if exist {
|
||||
@ -186,6 +128,174 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
|
||||
gc.deleteUntagged = untagged
|
||||
}
|
||||
}
|
||||
|
||||
// time window: default is 2 hours, and for testing/debugging, it can be set to 0.
|
||||
gc.timeWindowHours = 2
|
||||
timeWindow, exist := params["time_window"]
|
||||
if exist {
|
||||
if timeWindowHours, ok := timeWindow.(string); ok {
|
||||
str, err := strconv.ParseInt(timeWindowHours, 10, 64)
|
||||
if err != nil {
|
||||
gc.logger.Warningf("wrong type of time windows, set the default value. %v", err)
|
||||
} else {
|
||||
gc.timeWindowHours = str
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dry run: default is false. And for dry run we can have button in the UI.
|
||||
gc.dryRun = false
|
||||
dryRun, exist := params["dry_run"]
|
||||
if exist {
|
||||
if dryRun, ok := dryRun.(bool); ok && dryRun {
|
||||
gc.dryRun = dryRun
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run implements the interface in job/Interface
|
||||
func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
|
||||
if err := gc.init(ctx, params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gc.logger.Infof("start to run gc in job.")
|
||||
|
||||
// mark
|
||||
if err := gc.mark(ctx); err != nil {
|
||||
gc.logger.Errorf("failed to execute GC job at mark phase, error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// sweep
|
||||
if !gc.dryRun {
|
||||
if err := gc.sweep(ctx); err != nil {
|
||||
gc.logger.Errorf("failed to execute GC job at sweep phase, error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := gc.cleanCache(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
gc.logger.Infof("success to run gc in job.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// mark
|
||||
func (gc *GarbageCollector) mark(ctx job.Context) error {
|
||||
arts, err := gc.deletedArt(ctx)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to get deleted Artifacts in gc job, with error: %v", err)
|
||||
return err
|
||||
}
|
||||
// no need to execute GC as there is no removed artifacts.
|
||||
// Do this is to handle if user trigger GC job several times, only one job should do the following logic as artifact trash table is flushed.
|
||||
if len(arts) == 0 {
|
||||
gc.logger.Info("no need to execute GC as there is no removed artifacts.")
|
||||
return nil
|
||||
}
|
||||
gc.trashedArts = arts
|
||||
|
||||
// get gc candidates, and set the repositories.
|
||||
// AS the reference count is calculated by joining table project_blob and blob, here needs to call removeUntaggedBlobs to remove these non-used blobs from table project_blob firstly.
|
||||
if !gc.dryRun {
|
||||
gc.removeUntaggedBlobs(ctx)
|
||||
}
|
||||
blobs, err := gc.blobMgr.UselessBlobs(ctx.SystemContext(), gc.timeWindowHours)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to get gc candidate: %v", err)
|
||||
return err
|
||||
}
|
||||
if len(blobs) == 0 {
|
||||
gc.logger.Info("no need to execute GC as there is no non referenced artifacts.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// update delete status for the candidates.
|
||||
blobCt := 0
|
||||
mfCt := 0
|
||||
for _, blob := range blobs {
|
||||
if !gc.dryRun {
|
||||
blob.Status = blob_models.StatusDelete
|
||||
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
|
||||
if err != nil {
|
||||
gc.logger.Warningf("failed to mark gc candidate, skip it.: %s, error: %v", blob.Digest, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
gc.logger.Infof("blob eligible for deletion: %s", blob.Digest)
|
||||
gc.deleteSet = append(gc.deleteSet, blob)
|
||||
// as table blob has no repository name, here needs to use the ArtifactTrash to fill it in.
|
||||
if blob.IsManifest() {
|
||||
mfCt++
|
||||
} else {
|
||||
blobCt++
|
||||
}
|
||||
}
|
||||
gc.logger.Infof("%d blobs and %d manifests eligible for deletion", blobCt, mfCt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) sweep(ctx job.Context) error {
|
||||
gc.logger = ctx.GetLogger()
|
||||
for _, blob := range gc.deleteSet {
|
||||
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
|
||||
blob.Status = blob_models.StatusDeleting
|
||||
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to mark gc candidate deleting, skip: %s, %s", blob.Digest, blob.Status)
|
||||
continue
|
||||
}
|
||||
|
||||
// remove tags and revisions of a manifest
|
||||
if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() {
|
||||
for _, repo := range gc.trashedArts[blob.Digest] {
|
||||
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
|
||||
gc.logger.Infof("delete the manifest with registry v2 API: %s, %s, %s",
|
||||
repo, blob.ContentType, blob.Digest)
|
||||
if err := v2DeleteManifest(repo, blob.Digest); err != nil {
|
||||
gc.logger.Errorf("failed to delete manifest with v2 API, %s, %s, %v", repo, blob.Digest, err)
|
||||
if err := gc.markDeleteFailed(ctx, blob); err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.Wrapf(err, "failed to delete manifest with v2 API: %s, %s", repo, blob.Digest)
|
||||
}
|
||||
// for manifest, it has to delete the revisions folder of each repository
|
||||
gc.logger.Infof("delete manifest from storage: %s", blob.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.registryCtlClient.DeleteManifest(repo, blob.Digest)
|
||||
}); err != nil {
|
||||
if err := gc.markDeleteFailed(ctx, blob); err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.Wrapf(err, "failed to remove manifest from storage: %s, %s", repo, blob.Digest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete all of blobs, which include config, layer and manifest
|
||||
gc.logger.Infof("delete blob from storage: %s", blob.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.registryCtlClient.DeleteBlob(blob.Digest)
|
||||
}); err != nil {
|
||||
if err := gc.markDeleteFailed(ctx, blob); err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.Wrapf(err, "failed to delete blob from storage: %s, %s", blob.Digest, blob.Status)
|
||||
|
||||
}
|
||||
|
||||
// remove the blob record
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
|
||||
}); err != nil {
|
||||
if err := gc.markDeleteFailed(ctx, blob); err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.Wrapf(err, "failed to delete blob from database: %s, %s", blob.Digest, blob.Status)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -222,10 +332,10 @@ func (gc *GarbageCollector) cleanCache() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteCandidates deletes the two parts of artifact from harbor DB
|
||||
// deletedArt contains the two parts of artifact
|
||||
// 1, required part, the artifacts were removed from Harbor.
|
||||
// 2, optional part, the untagged artifacts.
|
||||
func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
|
||||
func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]string, error) {
|
||||
if os.Getenv("UTTEST") == "true" {
|
||||
gc.logger = ctx.GetLogger()
|
||||
}
|
||||
@ -234,12 +344,15 @@ func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
|
||||
defer func() {
|
||||
if flushTrash {
|
||||
gc.logger.Info("flush artifact trash")
|
||||
if err := gc.artrashMgr.Flush(ctx.SystemContext(), 0); err != nil {
|
||||
if err := gc.artrashMgr.Flush(ctx.SystemContext(), gc.timeWindowHours); err != nil {
|
||||
gc.logger.Errorf("failed to flush artifact trash: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
arts := make([]model.ArtifactTrash, 0)
|
||||
|
||||
// artMap : map[digest : []repo list]
|
||||
artMap := make(map[string][]string)
|
||||
// handle the optional ones, and the artifact controller will move them into trash.
|
||||
if gc.deleteUntagged {
|
||||
untagged, err := gc.artCtl.List(ctx.SystemContext(), &q.Query{
|
||||
@ -248,7 +361,7 @@ func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return artMap, err
|
||||
}
|
||||
gc.logger.Info("start to delete untagged artifact.")
|
||||
for _, art := range untagged {
|
||||
@ -263,22 +376,29 @@ func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
|
||||
gc.logger.Info("end to delete untagged artifact.")
|
||||
}
|
||||
|
||||
// handle the trash
|
||||
required, err := gc.artrashMgr.Filter(ctx.SystemContext(), 0)
|
||||
// filter gets all of deleted artifact, here do not need time window as the manifest candidate has to remove all of its reference.
|
||||
arts, err := gc.artrashMgr.Filter(ctx.SystemContext(), 0)
|
||||
if err != nil {
|
||||
return err
|
||||
return artMap, err
|
||||
}
|
||||
gc.logger.Info("required candidate: %+v", required)
|
||||
for _, art := range required {
|
||||
if err := deleteManifest(art.RepositoryName, art.Digest); err != nil {
|
||||
return fmt.Errorf("failed to delete manifest, %s:%s with error: %v", art.RepositoryName, art.Digest, err)
|
||||
|
||||
// the repositories of blob is needed when to delete as a manifest.
|
||||
for _, art := range arts {
|
||||
_, exist := artMap[art.Digest]
|
||||
if !exist {
|
||||
artMap[art.Digest] = []string{art.RepositoryName}
|
||||
} else {
|
||||
repos := artMap[art.Digest]
|
||||
repos = append(repos, art.RepositoryName)
|
||||
artMap[art.Digest] = repos
|
||||
}
|
||||
gc.logger.Infof("delete the manifest with registry v2 API: RepositoryName(%s)-MediaType:(%s)-Digest:(%s)",
|
||||
art.RepositoryName, art.ManifestMediaType, art.Digest)
|
||||
}
|
||||
gc.logger.Info("end to delete required artifact.")
|
||||
flushTrash = true
|
||||
return nil
|
||||
|
||||
gc.logger.Info("required candidate: %+v", arts)
|
||||
if !gc.dryRun {
|
||||
flushTrash = true
|
||||
}
|
||||
return artMap, nil
|
||||
}
|
||||
|
||||
// clean the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC
|
||||
@ -319,7 +439,8 @@ func (gc *GarbageCollector) removeUntaggedBlobs(ctx job.Context) {
|
||||
|
||||
for project := range projects {
|
||||
all, err := gc.blobMgr.List(ctx.SystemContext(), blob.ListParams{
|
||||
ProjectID: project.ProjectID,
|
||||
ProjectID: project.ProjectID,
|
||||
UpdateTime: time.Now().Add(-time.Duration(gc.timeWindowHours) * time.Hour),
|
||||
})
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to get blobs of project, %v", err)
|
||||
@ -331,3 +452,14 @@ func (gc *GarbageCollector) removeUntaggedBlobs(ctx job.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// markDeleteFailed set the blob status to StatusDeleteFailed
|
||||
func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blob_models.Blob) error {
|
||||
blob.Status = blob_models.StatusDeleteFailed
|
||||
_, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("failed to mark gc candidate delete failed: %s, %s", blob.Digest, blob.Status)
|
||||
return errors.Wrapf(err, "failed to mark gc candidate delete failed: %s, %s", blob.Digest, blob.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1,13 +1,14 @@
|
||||
package gc
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/config"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
commom_regctl "github.com/goharbor/harbor/src/common/registryctl"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/artifact"
|
||||
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
|
||||
pkg_blob "github.com/goharbor/harbor/src/pkg/blob/models"
|
||||
htesting "github.com/goharbor/harbor/src/testing"
|
||||
artifacttesting "github.com/goharbor/harbor/src/testing/controller/artifact"
|
||||
projecttesting "github.com/goharbor/harbor/src/testing/controller/project"
|
||||
mockjobservice "github.com/goharbor/harbor/src/testing/jobservice"
|
||||
@ -21,16 +22,14 @@ import (
|
||||
)
|
||||
|
||||
type gcTestSuite struct {
|
||||
suite.Suite
|
||||
htesting.Suite
|
||||
artifactCtl *artifacttesting.Controller
|
||||
artrashMgr *trashtesting.FakeManager
|
||||
registryCtlClient *registryctl.Mockclient
|
||||
projectCtl *projecttesting.Controller
|
||||
blobMgr *blob.Manager
|
||||
|
||||
regCtlInit func()
|
||||
setReadOnly func(cfgMgr *config.CfgManager, switcher bool) error
|
||||
getReadOnly func(cfgMgr *config.CfgManager) (bool, error)
|
||||
regCtlInit func()
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) SetupTest() {
|
||||
@ -41,8 +40,6 @@ func (suite *gcTestSuite) SetupTest() {
|
||||
suite.projectCtl = &projecttesting.Controller{}
|
||||
|
||||
regCtlInit = func() { commom_regctl.RegistryCtlClient = suite.registryCtlClient }
|
||||
setReadOnly = func(cfgMgr *config.CfgManager, switcher bool) error { return nil }
|
||||
getReadOnly = func(cfgMgr *config.CfgManager) (bool, error) { return true, nil }
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) TestMaxFails() {
|
||||
@ -60,7 +57,7 @@ func (suite *gcTestSuite) TestValidate() {
|
||||
suite.Nil(gc.Validate(nil))
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) TestDeleteCandidates() {
|
||||
func (suite *gcTestSuite) TestDeletedArt() {
|
||||
ctx := &mockjobservice.MockJobContext{}
|
||||
logger := &mockjobservice.MockJobLogger{}
|
||||
ctx.On("GetLogger").Return(logger)
|
||||
@ -73,13 +70,22 @@ func (suite *gcTestSuite) TestDeleteCandidates() {
|
||||
},
|
||||
}, nil)
|
||||
suite.artifactCtl.On("Delete").Return(nil)
|
||||
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{}, nil)
|
||||
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{
|
||||
{
|
||||
ID: 1,
|
||||
Digest: suite.DigestString(),
|
||||
ManifestMediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
gc := &GarbageCollector{
|
||||
artCtl: suite.artifactCtl,
|
||||
artrashMgr: suite.artrashMgr,
|
||||
}
|
||||
suite.Nil(gc.deleteCandidates(ctx))
|
||||
|
||||
arts, err := gc.deletedArt(ctx)
|
||||
suite.Nil(err)
|
||||
suite.Equal(1, len(arts))
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) TestRemoveUntaggedBlobs() {
|
||||
@ -127,6 +133,7 @@ func (suite *gcTestSuite) TestInit() {
|
||||
params := map[string]interface{}{
|
||||
"delete_untagged": true,
|
||||
"redis_url_reg": "redis url",
|
||||
"time_window": 1,
|
||||
}
|
||||
suite.Nil(gc.init(ctx, params))
|
||||
suite.True(gc.deleteUntagged)
|
||||
@ -217,7 +224,6 @@ func (suite *gcTestSuite) TestRun() {
|
||||
gc := &GarbageCollector{
|
||||
artCtl: suite.artifactCtl,
|
||||
artrashMgr: suite.artrashMgr,
|
||||
cfgMgr: config.NewInMemoryManager(),
|
||||
projectCtl: suite.projectCtl,
|
||||
blobMgr: suite.blobMgr,
|
||||
registryCtlClient: suite.registryCtlClient,
|
||||
@ -226,11 +232,106 @@ func (suite *gcTestSuite) TestRun() {
|
||||
"delete_untagged": false,
|
||||
// ToDo add a redis testing pkg, we do have a 'localhost' redis server in UT
|
||||
"redis_url_reg": "redis://localhost:6379",
|
||||
"time_window": 1,
|
||||
}
|
||||
|
||||
suite.Nil(gc.Run(ctx, params))
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) TestMark() {
|
||||
ctx := &mockjobservice.MockJobContext{}
|
||||
logger := &mockjobservice.MockJobLogger{}
|
||||
ctx.On("GetLogger").Return(logger)
|
||||
|
||||
suite.artrashMgr.On("Flush").Return(nil)
|
||||
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
|
||||
{
|
||||
ID: 1,
|
||||
RepositoryID: 1,
|
||||
},
|
||||
}, nil)
|
||||
suite.artifactCtl.On("Delete").Return(nil)
|
||||
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{
|
||||
{
|
||||
ID: 1,
|
||||
Digest: suite.DigestString(),
|
||||
ManifestMediaType: schema2.MediaTypeManifest,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
mock.OnAnything(suite.projectCtl, "List").Return([]*models.Project{
|
||||
{
|
||||
ProjectID: 1234,
|
||||
Name: "test GC",
|
||||
},
|
||||
}, nil)
|
||||
|
||||
mock.OnAnything(suite.blobMgr, "List").Return([]*pkg_blob.Blob{
|
||||
{
|
||||
ID: 1234,
|
||||
Digest: suite.DigestString(),
|
||||
Size: 1234,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
mock.OnAnything(suite.blobMgr, "CleanupAssociationsForProject").Return(nil)
|
||||
|
||||
mock.OnAnything(suite.blobMgr, "UselessBlobs").Return([]*pkg_blob.Blob{
|
||||
{
|
||||
ID: 1,
|
||||
Digest: suite.DigestString(),
|
||||
ContentType: schema2.MediaTypeManifest,
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
Digest: suite.DigestString(),
|
||||
ContentType: schema2.MediaTypeLayer,
|
||||
},
|
||||
{
|
||||
ID: 3,
|
||||
Digest: suite.DigestString(),
|
||||
ContentType: schema2.MediaTypeManifest,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
mock.OnAnything(suite.blobMgr, "UpdateBlobStatus").Return(int64(1), nil)
|
||||
|
||||
gc := &GarbageCollector{
|
||||
artCtl: suite.artifactCtl,
|
||||
artrashMgr: suite.artrashMgr,
|
||||
projectCtl: suite.projectCtl,
|
||||
blobMgr: suite.blobMgr,
|
||||
}
|
||||
|
||||
suite.Nil(gc.mark(ctx))
|
||||
}
|
||||
|
||||
func (suite *gcTestSuite) TestSweep() {
|
||||
ctx := &mockjobservice.MockJobContext{}
|
||||
logger := &mockjobservice.MockJobLogger{}
|
||||
ctx.On("GetLogger").Return(logger)
|
||||
|
||||
mock.OnAnything(suite.blobMgr, "UpdateBlobStatus").Return(int64(1), nil)
|
||||
mock.OnAnything(suite.blobMgr, "Delete").Return(nil)
|
||||
|
||||
gc := &GarbageCollector{
|
||||
artCtl: suite.artifactCtl,
|
||||
artrashMgr: suite.artrashMgr,
|
||||
projectCtl: suite.projectCtl,
|
||||
blobMgr: suite.blobMgr,
|
||||
registryCtlClient: suite.registryCtlClient,
|
||||
deleteSet: []*pkg_blob.Blob{
|
||||
{
|
||||
ID: 1,
|
||||
Digest: suite.DigestString(),
|
||||
ContentType: schema2.MediaTypeLayer,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
suite.Nil(gc.sweep(ctx))
|
||||
}
|
||||
|
||||
func TestGCTestSuite(t *testing.T) {
|
||||
os.Setenv("UTTEST", "true")
|
||||
suite.Run(t, &gcTestSuite{})
|
||||
|
@ -3,6 +3,7 @@ package gc
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/pkg/registry"
|
||||
)
|
||||
|
||||
@ -38,8 +39,8 @@ func delKeys(con redis.Conn, pattern string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteManifest calls the registry API to remove manifest
|
||||
func deleteManifest(repository, digest string) error {
|
||||
// v2DeleteManifest calls the registry API to remove manifest
|
||||
func v2DeleteManifest(repository, digest string) error {
|
||||
exist, _, err := registry.Cli.ManifestExist(repository, digest)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -54,3 +55,11 @@ func deleteManifest(repository, digest string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ignoreNotFound ignores the NotFoundErr error
|
||||
func ignoreNotFound(f func() error) error {
|
||||
if err := f(); err != nil && !errors.IsNotFoundErr(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
@ -71,9 +72,9 @@ func (d *dao) Filter(ctx context.Context, cutOff time.Time) (arts []model.Artifa
|
||||
return deletedAfs, err
|
||||
}
|
||||
|
||||
sql := `SELECT aft.* FROM artifact_trash AS aft LEFT JOIN artifact af ON (aft.repository_name=af.repository_name AND aft.digest=af.digest) WHERE (af.digest IS NULL AND af.repository_name IS NULL) AND aft.creation_time <= ?`
|
||||
sql := fmt.Sprintf(`SELECT aft.* FROM artifact_trash AS aft LEFT JOIN artifact af ON (aft.repository_name=af.repository_name AND aft.digest=af.digest) WHERE (af.digest IS NULL AND af.repository_name IS NULL) AND aft.creation_time <= TO_TIMESTAMP('%f')`, float64(cutOff.UnixNano())/float64((time.Second)))
|
||||
|
||||
_, err = ormer.Raw(sql, cutOff).QueryRows(&deletedAfs)
|
||||
_, err = ormer.Raw(sql).QueryRows(&deletedAfs)
|
||||
if err != nil {
|
||||
return deletedAfs, err
|
||||
}
|
||||
@ -87,11 +88,11 @@ func (d *dao) Flush(ctx context.Context, cutOff time.Time) (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sql := `DELETE FROM artifact_trash where creation_time <= ?`
|
||||
sql := fmt.Sprintf(`DELETE FROM artifact_trash where creation_time <= TO_TIMESTAMP('%f')`, float64(cutOff.UnixNano())/float64((time.Second)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = ormer.Raw(sql, cutOff).Exec()
|
||||
_, err = ormer.Raw(sql).Exec()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -16,8 +16,11 @@ package models
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/docker/distribution/manifest/manifestlist"
|
||||
"github.com/docker/distribution/manifest/schema1"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
v1 "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -83,6 +86,12 @@ func (b *Blob) IsForeignLayer() bool {
|
||||
return b.ContentType == schema2.MediaTypeForeignLayer
|
||||
}
|
||||
|
||||
// IsManifest returns true if the blob is manifest layer
|
||||
func (b *Blob) IsManifest() bool {
|
||||
return b.ContentType == schema2.MediaTypeManifest || b.ContentType == schema1.MediaTypeManifest ||
|
||||
b.ContentType == v1.MediaTypeImageManifest || b.ContentType == v1.MediaTypeImageIndex || b.ContentType == manifestlist.MediaTypeManifestList
|
||||
}
|
||||
|
||||
// ProjectBlob alias ProjectBlob model
|
||||
type ProjectBlob = models.ProjectBlob
|
||||
|
||||
|
@ -93,7 +93,7 @@ class System(base.Base):
|
||||
def create_gc_schedule(self, schedule_type, cron = None, expect_status_code = 201, expect_response_body = None, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
|
||||
gc_parameters = {'delete_untagged':True}
|
||||
gc_parameters = {'delete_untagged':True, 'time_window':'0'}
|
||||
|
||||
gc_schedule = swagger_client.AdminJobScheduleObj()
|
||||
gc_schedule.type = schedule_type
|
||||
|
Loading…
Reference in New Issue
Block a user