Support stop GC execution (#17004)

Support stop GC execution

Fixes 16902

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
This commit is contained in:
Wenkai Yin(尹文开) 2022-06-17 20:03:24 +08:00 committed by GitHub
parent e9378ea00e
commit 6c515b04d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 31 deletions

View File

@ -4073,6 +4073,26 @@ paths:
$ref: '#/responses/404' $ref: '#/responses/404'
'500': '500':
$ref: '#/responses/500' $ref: '#/responses/500'
put:
summary: Stop the specific GC execution
description: Stop the GC execution specified by ID
tags:
- gc
operationId: stopGC
parameters:
- $ref: '#/parameters/requestId'
- $ref: '#/parameters/gcId'
responses:
'200':
$ref: '#/responses/200'
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/system/gc/{gc_id}/log: /system/gc/{gc_id}/log:
get: get:
summary: Get gc job log. summary: Get gc job log.

View File

@ -36,6 +36,7 @@ import (
var ( var (
regCtlInit = registryctl.Init regCtlInit = registryctl.Init
stopErr = errors.New("stopped")
) )
const ( const (
@ -90,11 +91,7 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
gc.logger = ctx.GetLogger() gc.logger = ctx.GetLogger()
gc.deleteSet = make([]*blobModels.Blob, 0) gc.deleteSet = make([]*blobModels.Blob, 0)
gc.trashedArts = make(map[string][]model.ArtifactTrash, 0) gc.trashedArts = make(map[string][]model.ArtifactTrash, 0)
opCmd, flag := ctx.OPCommand()
if flag && opCmd.IsStop() {
gc.logger.Info("received the stop signal, quit GC job.")
return nil
}
// UT will use the mock client, ctl and mgr // UT will use the mock client, ctl and mgr
if os.Getenv("UTTEST") != "true" { if os.Getenv("UTTEST") != "true" {
gc.registryCtlClient = registryctl.RegistryCtlClient gc.registryCtlClient = registryctl.RegistryCtlClient
@ -156,6 +153,10 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
// mark // mark
if err := gc.mark(ctx); err != nil { if err := gc.mark(ctx); err != nil {
if err == stopErr {
gc.logger.Info("received the stop signal, quit GC job.")
return nil
}
gc.logger.Errorf("failed to execute GC job at mark phase, error: %v", err) gc.logger.Errorf("failed to execute GC job at mark phase, error: %v", err)
return err return err
} }
@ -163,6 +164,11 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
// sweep // sweep
if !gc.dryRun { if !gc.dryRun {
if err := gc.sweep(ctx); err != nil { if err := gc.sweep(ctx); err != nil {
if err == stopErr {
// we may already delete several artifacts before receiving the stop signal, so try to clean up the cache
gc.logger.Info("received the stop signal, quit GC job after cleaning up the cache.")
return gc.cleanCache()
}
gc.logger.Errorf("failed to execute GC job at sweep phase, error: %v", err) gc.logger.Errorf("failed to execute GC job at sweep phase, error: %v", err)
return err return err
} }
@ -190,7 +196,11 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
// get gc candidates, and set the repositories. // get gc candidates, and set the repositories.
// AS the reference count is calculated by joining table project_blob and blob, here needs to call removeUntaggedBlobs to remove these non-used blobs from table project_blob firstly. // AS the reference count is calculated by joining table project_blob and blob, here needs to call removeUntaggedBlobs to remove these non-used blobs from table project_blob firstly.
orphanBlobs := gc.markOrSweepUntaggedBlobs(ctx) orphanBlobs, err := gc.markOrSweepUntaggedBlobs(ctx)
if err != nil {
return err
}
blobs, err := gc.uselessBlobs(ctx) blobs, err := gc.uselessBlobs(ctx)
if err != nil { if err != nil {
gc.logger.Errorf("failed to get gc candidate: %v", err) gc.logger.Errorf("failed to get gc candidate: %v", err)
@ -210,6 +220,9 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
makeSize := int64(0) makeSize := int64(0)
for _, blob := range blobs { for _, blob := range blobs {
if !gc.dryRun { if !gc.dryRun {
if gc.shouldStop(ctx) {
return stopErr
}
blob.Status = blobModels.StatusDelete blob.Status = blobModels.StatusDelete
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob) count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil { if err != nil {
@ -245,6 +258,9 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
mfCnt := 0 mfCnt := 0
total := len(gc.deleteSet) total := len(gc.deleteSet)
for i, blob := range gc.deleteSet { for i, blob := range gc.deleteSet {
if gc.shouldStop(ctx) {
return stopErr
}
idx := i + 1 idx := i + 1
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip. // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
blob.Status = blobModels.StatusDeleting blob.Status = blobModels.StatusDeleting
@ -389,6 +405,8 @@ func (gc *GarbageCollector) cleanCache() error {
} }
} }
gc.logger.Info("cache clean up completed")
return nil return nil
} }
@ -427,6 +445,9 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti
} }
allTrashedArts = append(allTrashedArts, simulateDeletion) allTrashedArts = append(allTrashedArts, simulateDeletion)
} else { } else {
if gc.shouldStop(ctx) {
return nil, stopErr
}
if err := gc.artCtl.Delete(ctx.SystemContext(), untagged.ID); err != nil { if err := gc.artCtl.Delete(ctx.SystemContext(), untagged.ID); err != nil {
// the failure ones can be GCed by the next execution // the failure ones can be GCed by the next execution
gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", untagged.ID, err) gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", untagged.ID, err)
@ -471,9 +492,12 @@ func (gc *GarbageCollector) deletedArt(ctx job.Context) (map[string][]model.Arti
// mark or sweep the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC // mark or sweep the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC
// * dry-run, find and return the untagged blobs // * dry-run, find and return the untagged blobs
// * non dry-run, remove the reference of the untagged blobs // * non dry-run, remove the reference of the untagged blobs
func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobModels.Blob { func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) ([]*blobModels.Blob, error) {
var orphanBlobs []*blobModels.Blob var orphanBlobs []*blobModels.Blob
for result := range project.ListAll(ctx.SystemContext(), 50, nil, project.Metadata(false)) { for result := range project.ListAll(ctx.SystemContext(), 50, nil, project.Metadata(false)) {
if gc.shouldStop(ctx) {
return nil, stopErr
}
if result.Error != nil { if result.Error != nil {
gc.logger.Errorf("remove untagged blobs for all projects got error: %v", result.Error) gc.logger.Errorf("remove untagged blobs for all projects got error: %v", result.Error)
continue continue
@ -487,6 +511,10 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobMod
} }
for { for {
if gc.shouldStop(ctx) {
gc.logger.Info("received the stop signal, quit GC job.")
return nil, stopErr
}
blobRG := q.Range{ blobRG := q.Range{
Min: lastBlobID, Min: lastBlobID,
} }
@ -526,7 +554,7 @@ func (gc *GarbageCollector) markOrSweepUntaggedBlobs(ctx job.Context) []*blobMod
lastBlobID = blobs[len(blobs)-1].ID lastBlobID = blobs[len(blobs)-1].ID
} }
} }
return orphanBlobs return orphanBlobs, nil
} }
func (gc *GarbageCollector) uselessBlobs(ctx job.Context) ([]*blobModels.Blob, error) { func (gc *GarbageCollector) uselessBlobs(ctx job.Context) ([]*blobModels.Blob, error) {
@ -569,3 +597,11 @@ func (gc *GarbageCollector) markDeleteFailed(ctx job.Context, blob *blobModels.B
} }
return nil return nil
} }
func (gc *GarbageCollector) shouldStop(ctx job.Context) bool {
opCmd, exit := ctx.OPCommand()
if exit && opCmd.IsStop() {
return true
}
return false
}

View File

@ -21,9 +21,10 @@ import (
"github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/manifest/schema2"
commom_regctl "github.com/goharbor/harbor/src/common/registryctl" commom_regctl "github.com/goharbor/harbor/src/common/registryctl"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/project" "github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/artifact" pkgart "github.com/goharbor/harbor/src/pkg/artifact"
"github.com/goharbor/harbor/src/pkg/artifactrash/model" "github.com/goharbor/harbor/src/pkg/artifactrash/model"
pkg_blob "github.com/goharbor/harbor/src/pkg/blob/models" pkg_blob "github.com/goharbor/harbor/src/pkg/blob/models"
htesting "github.com/goharbor/harbor/src/testing" htesting "github.com/goharbor/harbor/src/testing"
@ -89,8 +90,10 @@ func (suite *gcTestSuite) TestDeletedArt() {
suite.artifactCtl.On("List").Return([]*artifact.Artifact{ suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{ {
ID: 1, Artifact: pkgart.Artifact{
RepositoryID: 1, ID: 1,
RepositoryID: 1,
},
}, },
}, nil) }, nil)
suite.artifactCtl.On("Delete").Return(nil) suite.artifactCtl.On("Delete").Return(nil)
@ -116,6 +119,7 @@ func (suite *gcTestSuite) TestRemoveUntaggedBlobs() {
ctx := &mockjobservice.MockJobContext{} ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{} logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger) ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, false)
mock.OnAnything(suite.projectCtl, "List").Return([]*proModels.Project{ mock.OnAnything(suite.projectCtl, "List").Return([]*proModels.Project{
{ {
@ -148,7 +152,6 @@ func (suite *gcTestSuite) TestInit() {
logger := &mockjobservice.MockJobLogger{} logger := &mockjobservice.MockJobLogger{}
mock.OnAnything(ctx, "Get").Return("core url", true) mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("GetLogger").Return(logger) ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, true)
gc := &GarbageCollector{ gc := &GarbageCollector{
registryCtlClient: suite.registryCtlClient, registryCtlClient: suite.registryCtlClient,
@ -189,25 +192,22 @@ func (suite *gcTestSuite) TestStop() {
ctx.On("GetLogger").Return(logger) ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.StopCommand, true) ctx.On("OPCommand").Return(job.StopCommand, true)
mock.OnAnything(suite.artifactCtl, "List").Return([]*artifact.Artifact{
{
Artifact: pkgart.Artifact{
ID: 1,
RepositoryID: 1,
},
},
}, nil)
gc := &GarbageCollector{ gc := &GarbageCollector{
registryCtlClient: suite.registryCtlClient, registryCtlClient: suite.registryCtlClient,
artCtl: suite.artifactCtl,
deleteUntagged: true,
} }
params := map[string]interface{}{
"delete_untagged": true,
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
ctx = &mockjobservice.MockJobContext{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("OPCommand").Return(job.StopCommand, false)
suite.Nil(gc.init(ctx, params))
ctx = &mockjobservice.MockJobContext{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("OPCommand").Return(job.NilCommand, true)
suite.Nil(gc.init(ctx, params))
suite.Equal(stopErr, gc.mark(ctx))
} }
func (suite *gcTestSuite) TestRun() { func (suite *gcTestSuite) TestRun() {
@ -219,8 +219,10 @@ func (suite *gcTestSuite) TestRun() {
suite.artifactCtl.On("List").Return([]*artifact.Artifact{ suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{ {
ID: 1, Artifact: pkgart.Artifact{
RepositoryID: 1, ID: 1,
RepositoryID: 1,
},
}, },
}, nil) }, nil)
suite.artifactCtl.On("Delete").Return(nil) suite.artifactCtl.On("Delete").Return(nil)
@ -285,11 +287,14 @@ func (suite *gcTestSuite) TestMark() {
ctx := &mockjobservice.MockJobContext{} ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{} logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger) ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, false)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{ suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{ {
ID: 1, Artifact: pkgart.Artifact{
RepositoryID: 1, ID: 1,
RepositoryID: 1,
},
}, },
}, nil) }, nil)
suite.artifactCtl.On("Delete").Return(nil) suite.artifactCtl.On("Delete").Return(nil)
@ -351,6 +356,7 @@ func (suite *gcTestSuite) TestSweep() {
ctx := &mockjobservice.MockJobContext{} ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{} logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger) ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, false)
mock.OnAnything(suite.blobMgr, "UpdateBlobStatus").Return(int64(1), nil) mock.OnAnything(suite.blobMgr, "UpdateBlobStatus").Return(int64(1), nil)
mock.OnAnything(suite.blobMgr, "Delete").Return(nil) mock.OnAnything(suite.blobMgr, "Delete").Return(nil)

View File

@ -232,3 +232,15 @@ func (g *gcAPI) GetGCLog(ctx context.Context, params operation.GetGCLogParams) m
} }
return operation.NewGetGCLogOK().WithPayload(string(log)) return operation.NewGetGCLogOK().WithPayload(string(log))
} }
func (g *gcAPI) StopGC(ctx context.Context, params operation.StopGCParams) middleware.Responder {
if err := g.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceGarbageCollection); err != nil {
return g.SendError(ctx, err)
}
if err := g.gcCtr.Stop(ctx, params.GCID); err != nil {
return g.SendError(ctx, err)
}
return operation.NewStopGCOK()
}