From 6c515b04d4c5b012af9566510092593b1a0b7893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Fri, 17 Jun 2022 20:03:24 +0800 Subject: [PATCH] Support stop GC execution (#17004) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Support stop GC execution Fixes 16902 Signed-off-by: Wenkai Yin(尹文开) --- api/v2.0/swagger.yaml | 20 +++++++ .../job/impl/gc/garbage_collection.go | 52 ++++++++++++++++--- .../job/impl/gc/garbage_collection_test.go | 52 +++++++++++-------- src/server/v2.0/handler/gc.go | 12 +++++ 4 files changed, 105 insertions(+), 31 deletions(-) diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 54a4dc566..d0087bae8 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -4073,6 +4073,26 @@ paths: $ref: '#/responses/404' '500': $ref: '#/responses/500' + put: + summary: Stop the specific GC execution + description: Stop the GC execution specified by ID + tags: + - gc + operationId: stopGC + parameters: + - $ref: '#/parameters/requestId' + - $ref: '#/parameters/gcId' + responses: + '200': + $ref: '#/responses/200' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' /system/gc/{gc_id}/log: get: summary: Get gc job log. diff --git a/src/jobservice/job/impl/gc/garbage_collection.go b/src/jobservice/job/impl/gc/garbage_collection.go index 79752d18f..5260032e4 100644 --- a/src/jobservice/job/impl/gc/garbage_collection.go +++ b/src/jobservice/job/impl/gc/garbage_collection.go @@ -36,6 +36,7 @@ import ( var ( regCtlInit = registryctl.Init + stopErr = errors.New("stopped") ) const ( @@ -90,11 +91,7 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error { gc.logger = ctx.GetLogger() gc.deleteSet = make([]*blobModels.Blob, 0) gc.trashedArts = make(map[string][]model.ArtifactTrash, 0) - opCmd, flag := ctx.OPCommand() - if flag && opCmd.IsStop() { - gc.logger.Info("received the stop signal, quit GC job.") - return nil - } + // UT will use the mock client, ctl and mgr if os.Getenv("UTTEST") != "true" { gc.registryCtlClient = registryctl.RegistryCtlClient @@ -156,6 +153,10 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error { // mark if err := gc.mark(ctx); err != nil { + if err == stopErr { + gc.logger.Info("received the stop signal, quit GC job.") + return nil + } gc.logger.Errorf("failed to execute GC job at mark phase, error: %v", err) return err } @@ -163,6 +164,11 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error { // sweep if !gc.dryRun { if err := gc.sweep(ctx); err != nil { + if err == stopErr { + // we may already delete several artifacts before receiving the stop signal, so try to clean up the cache + gc.logger.Info("received the stop signal, quit GC job after cleaning up the cache.") + return gc.cleanCache() + } gc.logger.Errorf("failed to execute GC job at sweep phase, error: %v", err) return err } @@ -190,7 +196,11 @@ func (gc *GarbageCollector) mark(ctx job.Context) error { // get gc candidates, and set the repositories. // AS the reference count is calculated by joining table project_blob and blob, here needs to call removeUntaggedBlobs to remove these non-used blobs from table project_blob firstly. - orphanBlobs := gc.markOrSweepUntaggedBlobs(ctx) + orphanBlobs, err := gc.markOrSweepUntaggedBlobs(ctx) + if err != nil { + return err + } + blobs, err := gc.uselessBlobs(ctx) if err != nil { gc.logger.Errorf("failed to get gc candidate: %v", err) @@ -210,6 +220,9 @@ func (gc *GarbageCollector) mark(ctx job.Context) error { makeSize := int64(0) for _, blob := range blobs { if !gc.dryRun { + if gc.shouldStop(ctx) { + return stopErr + } blob.Status = blobModels.StatusDelete count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob) if err != nil { @@ -245,6 +258,9 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { mfCnt := 0 total := len(gc.deleteSet) for i, blob := range gc.deleteSet { + if gc.shouldStop(ctx) { + return stopErr + } idx := i + 1 // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip. blob.Status = blobModels.StatusDeleting @@ -389,6 +405,8 @@ func (gc *GarbageCollector) cleanCache() error { } } + gc.logger.Info("cache clean up completed") + return nil } @@ -427,6 +445,9 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti } allTrashedArts = append(allTrashedArts, simulateDeletion) } else { + if gc.shouldStop(ctx) { + return nil, stopErr + } if err := gc.artCtl.Delete(ctx.SystemContext(), untagged.ID); err != nil { // the failure ones can be GCed by the next execution gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", untagged.ID, err) @@ -471,9 +492,12 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti // mark or sweep the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC // * dry-run, find and return the untagged blobs // * non dry-run, remove the reference of the untagged blobs -func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobModels.Blob { +func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) ([]*blobModels.Blob, error) { var orphanBlobs []*blobModels.Blob for result := range project.ListAll(ctx.SystemContext(), 50, nil, project.Metadata(false)) { + if gc.shouldStop(ctx) { + return nil, stopErr + } if result.Error != nil { gc.logger.Errorf("remove untagged blobs for all projects got error: %v", result.Error) continue @@ -487,6 +511,10 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobMod } for { + if gc.shouldStop(ctx) { + gc.logger.Info("received the stop signal, quit GC job.") + return nil, stopErr + } blobRG := q.Range{ Min: lastBlobID, } @@ -526,7 +554,7 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobMod lastBlobID = blobs[len(blobs)-1].ID } } - return orphanBlobs + return orphanBlobs, nil } func (gc *GarbageCollector) uselessBlobs(ctx job.Context) ([]*blobModels.Blob, error) { @@ -569,3 +597,11 @@ func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blobModels.B } return nil } + +func (gc *GarbageCollector) shouldStop(ctx job.Context) bool { + opCmd, exit := ctx.OPCommand() + if exit && opCmd.IsStop() { + return true + } + return false +} diff --git a/src/jobservice/job/impl/gc/garbage_collection_test.go b/src/jobservice/job/impl/gc/garbage_collection_test.go index 340af8d19..64d6c8f8e 100644 --- a/src/jobservice/job/impl/gc/garbage_collection_test.go +++ b/src/jobservice/job/impl/gc/garbage_collection_test.go @@ -21,9 +21,10 @@ import ( "github.com/docker/distribution/manifest/schema2" commom_regctl "github.com/goharbor/harbor/src/common/registryctl" + "github.com/goharbor/harbor/src/controller/artifact" "github.com/goharbor/harbor/src/controller/project" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/pkg/artifact" + pkgart "github.com/goharbor/harbor/src/pkg/artifact" "github.com/goharbor/harbor/src/pkg/artifactrash/model" pkg_blob "github.com/goharbor/harbor/src/pkg/blob/models" htesting "github.com/goharbor/harbor/src/testing" @@ -89,8 +90,10 @@ func (suite *gcTestSuite) TestDeletedArt() { suite.artifactCtl.On("List").Return([]*artifact.Artifact{ { - ID: 1, - RepositoryID: 1, + Artifact: pkgart.Artifact{ + ID: 1, + RepositoryID: 1, + }, }, }, nil) suite.artifactCtl.On("Delete").Return(nil) @@ -116,6 +119,7 @@ func (suite *gcTestSuite) TestRemoveUntaggedBlobs() { ctx := &mockjobservice.MockJobContext{} logger := &mockjobservice.MockJobLogger{} ctx.On("GetLogger").Return(logger) + ctx.On("OPCommand").Return(job.NilCommand, false) mock.OnAnything(suite.projectCtl, "List").Return([]*proModels.Project{ { @@ -148,7 +152,6 @@ func (suite *gcTestSuite) TestInit() { logger := &mockjobservice.MockJobLogger{} mock.OnAnything(ctx, "Get").Return("core url", true) ctx.On("GetLogger").Return(logger) - ctx.On("OPCommand").Return(job.NilCommand, true) gc := &GarbageCollector{ registryCtlClient: suite.registryCtlClient, @@ -189,25 +192,22 @@ func (suite *gcTestSuite) TestStop() { ctx.On("GetLogger").Return(logger) ctx.On("OPCommand").Return(job.StopCommand, true) + mock.OnAnything(suite.artifactCtl, "List").Return([]*artifact.Artifact{ + { + Artifact: pkgart.Artifact{ + ID: 1, + RepositoryID: 1, + }, + }, + }, nil) + gc := &GarbageCollector{ registryCtlClient: suite.registryCtlClient, + artCtl: suite.artifactCtl, + deleteUntagged: true, } - params := map[string]interface{}{ - "delete_untagged": true, - "redis_url_reg": "redis url", - } - suite.Nil(gc.init(ctx, params)) - - ctx = &mockjobservice.MockJobContext{} - mock.OnAnything(ctx, "Get").Return("core url", true) - ctx.On("OPCommand").Return(job.StopCommand, false) - suite.Nil(gc.init(ctx, params)) - - ctx = &mockjobservice.MockJobContext{} - mock.OnAnything(ctx, "Get").Return("core url", true) - ctx.On("OPCommand").Return(job.NilCommand, true) - suite.Nil(gc.init(ctx, params)) + suite.Equal(stopErr, gc.mark(ctx)) } func (suite *gcTestSuite) TestRun() { @@ -219,8 +219,10 @@ func (suite *gcTestSuite) TestRun() { suite.artifactCtl.On("List").Return([]*artifact.Artifact{ { - ID: 1, - RepositoryID: 1, + Artifact: pkgart.Artifact{ + ID: 1, + RepositoryID: 1, + }, }, }, nil) suite.artifactCtl.On("Delete").Return(nil) @@ -285,11 +287,14 @@ func (suite *gcTestSuite) TestMark() { ctx := &mockjobservice.MockJobContext{} logger := &mockjobservice.MockJobLogger{} ctx.On("GetLogger").Return(logger) + ctx.On("OPCommand").Return(job.NilCommand, false) suite.artifactCtl.On("List").Return([]*artifact.Artifact{ { - ID: 1, - RepositoryID: 1, + Artifact: pkgart.Artifact{ + ID: 1, + RepositoryID: 1, + }, }, }, nil) suite.artifactCtl.On("Delete").Return(nil) @@ -351,6 +356,7 @@ func (suite *gcTestSuite) TestSweep() { ctx := &mockjobservice.MockJobContext{} logger := &mockjobservice.MockJobLogger{} ctx.On("GetLogger").Return(logger) + ctx.On("OPCommand").Return(job.NilCommand, false) mock.OnAnything(suite.blobMgr, "UpdateBlobStatus").Return(int64(1), nil) mock.OnAnything(suite.blobMgr, "Delete").Return(nil) diff --git a/src/server/v2.0/handler/gc.go b/src/server/v2.0/handler/gc.go index a93216cbb..16e200f2f 100644 --- a/src/server/v2.0/handler/gc.go +++ b/src/server/v2.0/handler/gc.go @@ -232,3 +232,15 @@ func (g *gcAPI) GetGCLog(ctx context.Context, params operation.GetGCLogParams) m } return operation.NewGetGCLogOK().WithPayload(string(log)) } + +func (g *gcAPI) StopGC(ctx context.Context, params operation.StopGCParams) middleware.Responder { + if err := g.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceGarbageCollection); err != nil { + return g.SendError(ctx, err) + } + + if err := g.gcCtr.Stop(ctx, params.GCID); err != nil { + return g.SendError(ctx, err) + } + + return operation.NewStopGCOK() +}