diff --git a/src/controller/gc/controller.go b/src/controller/gc/controller.go index 7734c9d8a..ab34a6041 100644 --- a/src/controller/gc/controller.go +++ b/src/controller/gc/controller.go @@ -78,6 +78,7 @@ func (c *controller) Start(ctx context.Context, policy Policy, trigger string) ( para := make(map[string]interface{}) para["delete_untagged"] = policy.DeleteUntagged para["dry_run"] = policy.DryRun + para["workers"] = policy.Workers para["redis_url_reg"] = policy.ExtraAttrs["redis_url_reg"] para["time_window"] = policy.ExtraAttrs["time_window"] @@ -233,6 +234,7 @@ func convertTask(task *task.Task) *Task { RunCount: task.RunCount, DeleteUntagged: task.GetBoolFromExtraAttrs("delete_untagged"), DryRun: task.GetBoolFromExtraAttrs("dry_run"), + Workers: int(task.GetNumFromExtraAttrs("workers")), JobID: task.JobID, CreationTime: task.CreationTime, StartTime: task.StartTime, diff --git a/src/controller/gc/model.go b/src/controller/gc/model.go index 39ff50e95..f94dcd5aa 100644 --- a/src/controller/gc/model.go +++ b/src/controller/gc/model.go @@ -23,6 +23,7 @@ type Policy struct { Trigger *Trigger `json:"trigger"` DeleteUntagged bool `json:"deleteuntagged"` DryRun bool `json:"dryrun"` + Workers int `json:"workers"` ExtraAttrs map[string]interface{} `json:"extra_attrs"` } @@ -60,6 +61,7 @@ type Task struct { RunCount int32 DeleteUntagged bool DryRun bool + Workers int JobID string CreationTime time.Time StartTime time.Time diff --git a/src/go.mod b/src/go.mod index 04d4529db..a51c7130a 100644 --- a/src/go.mod +++ b/src/go.mod @@ -162,6 +162,7 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.0 // indirect + golang.org/x/sync v0.3.0 golang.org/x/sys v0.7.0 // indirect golang.org/x/term v0.7.0 // indirect google.golang.org/api v0.110.0 // indirect diff --git a/src/go.sum b/src/go.sum index 47331d874..c5fef8f52 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1521,6 +1521,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/src/jobservice/job/impl/gc/garbage_collection.go b/src/jobservice/job/impl/gc/garbage_collection.go index 615f9e6c7..df9f89901 100644 --- a/src/jobservice/job/impl/gc/garbage_collection.go +++ b/src/jobservice/job/impl/gc/garbage_collection.go @@ -17,8 +17,12 @@ package gc import ( "encoding/json" "os" + "sync/atomic" "time" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" + "github.com/goharbor/harbor/src/common/registryctl" "github.com/goharbor/harbor/src/controller/artifact" "github.com/goharbor/harbor/src/controller/project" @@ -66,6 +70,7 @@ type GarbageCollector struct { // hold all of GC candidates(non-referenced blobs), it's captured by mark and consumed by sweep. deleteSet []*blobModels.Blob timeWindowHours int64 + workers int } // MaxFails implements the interface in job/Interface @@ -141,8 +146,19 @@ func (gc *GarbageCollector) parseParams(params job.Parameters) { } } - gc.logger.Infof("Garbage Collection parameters: [delete_untagged: %t, dry_run: %t, time_window: %d]", - gc.deleteUntagged, gc.dryRun, gc.timeWindowHours) + // gc workers: default is 1. The business unit of removing blobs. + gc.workers = 1 + ws, exist := params["workers"] + if exist { + if workers, ok := ws.(float64); ok { + if int(workers) > 0 { + gc.workers = int(workers) + } + } + } + + gc.logger.Infof("Garbage Collection parameters: [delete_untagged: %t, dry_run: %t, time_window: %d, workers: %d]", + gc.deleteUntagged, gc.dryRun, gc.timeWindowHours, gc.workers) } // Run implements the interface in job/Interface @@ -220,6 +236,7 @@ func (gc *GarbageCollector) mark(ctx job.Context) error { blobCt := 0 mfCt := 0 makeSize := int64(0) + for _, blob := range blobs { if !gc.dryRun { if gc.shouldStop(ctx) { @@ -259,159 +276,198 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error { blobCnt := int64(0) mfCnt := int64(0) total := len(gc.deleteSet) - for i, blob := range gc.deleteSet { - if gc.shouldStop(ctx) { - return errGcStop - } - idx := i + 1 - // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip. - blob.Status = blobModels.StatusDeleting - count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob) - if err != nil { - gc.logger.Errorf("[%d/%d] failed to mark gc candidate deleting, skip: %s, %s", idx, total, blob.Digest, blob.Status) - continue - } - if count == 0 { - gc.logger.Warningf("[%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", idx, total, blob.ID, blob.Digest) - continue - } - // remove tags and revisions of a manifest - skippedBlob := false - if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() { - for _, art := range gc.trashedArts[blob.Digest] { - // Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them. - gc.logger.Infof("[%d/%d] delete the manifest with registry v2 API: %s, %s, %s", - idx, total, art.RepositoryName, blob.ContentType, blob.Digest) - if err := retry.Retry(func() error { - return ignoreNotFound(func() error { - err := v2DeleteManifest(art.RepositoryName, blob.Digest) - // if the system is in read-only mode, return an Abort error to skip retrying - if err == readonly.Err { - return retry.Abort(err) - } - return err - }) - }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", idx, total, err, sleep) - })); err != nil { - gc.logger.Errorf("[%d/%d] failed to delete manifest with v2 API, %s, %s, %v", idx, total, art.RepositoryName, blob.Digest, err) - if err := ignoreNotFound(func() error { - return gc.markDeleteFailed(ctx, blob) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", idx, total, blob.Digest, err) - return err - } - // if the system is set to read-only mode, return directly - if err == readonly.Err { - return err - } - skippedBlob = true - continue - } - // for manifest, it has to delete the revisions folder of each repository - gc.logger.Infof("[%d/%d] delete manifest from storage: %s", idx, total, blob.Digest) - if err := retry.Retry(func() error { - return ignoreNotFound(func() error { - err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest) - // if the system is in read-only mode, return an Abort error to skip retrying - if err == readonly.Err { - return retry.Abort(err) - } - return err - }) - }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", idx, total, err, sleep) - })); err != nil { - gc.logger.Errorf("[%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", idx, total, art.RepositoryName, blob.Digest, err) - if err := ignoreNotFound(func() error { - return gc.markDeleteFailed(ctx, blob) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", idx, total, art.RepositoryName, blob.Digest, err) - return err - } - // if the system is set to read-only mode, return directly - if err == readonly.Err { - return err - } - skippedBlob = true - continue - } - - gc.logger.Infof("[%d/%d] delete artifact blob record from database: %d, %s, %s", idx, total, art.ID, art.RepositoryName, art.Digest) - if err := ignoreNotFound(func() error { - return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", idx, total, art.Digest, err) - return err - } - - gc.logger.Infof("[%d/%d] delete artifact trash record from database: %d, %s, %s", idx, total, art.ID, art.RepositoryName, art.Digest) - if err := ignoreNotFound(func() error { - return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", idx, total, art.ID, err) - return err - } - } - } - - // skip deleting the blob if the manifest's tag/revision is not deleted - if skippedBlob { - continue - } - - // delete all of blobs, which include config, layer and manifest - // for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record. - if !blob.IsForeignLayer() { - gc.logger.Infof("[%d/%d] delete blob from storage: %s", idx, total, blob.Digest) - if err := retry.Retry(func() error { - return ignoreNotFound(func() error { - err := gc.registryCtlClient.DeleteBlob(blob.Digest) - // if the system is in read-only mode, return an Abort error to skip retrying - if err == readonly.Err { - return retry.Abort(err) - } - return err - }) - }, retry.Callback(func(err error, sleep time.Duration) { - gc.logger.Infof("[%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", idx, total, err, sleep) - })); err != nil { - gc.logger.Errorf("[%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", idx, total, blob.Digest, blob.Status, err) - if err := ignoreNotFound(func() error { - return gc.markDeleteFailed(ctx, blob) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", idx, total, blob.Digest, err) - return err - } - // if the system is set to read-only mode, return directly - if err == readonly.Err { - return err - } - continue - } - sweepSize = sweepSize + blob.Size - } - - gc.logger.Infof("[%d/%d] delete blob record from database: %d, %s", idx, total, blob.ID, blob.Digest) - if err := ignoreNotFound(func() error { - return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", idx, total, blob.Digest, blob.Status, err) - if err := ignoreNotFound(func() error { - return gc.markDeleteFailed(ctx, blob) - }); err != nil { - gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", idx, total, blob.ID, blob.Digest, err) - return err - } - return err - } - if blob.IsManifest() { - mfCnt++ - } else { - blobCnt++ - } + // split the full set into pieces (count workers) + if total <= 0 || gc.workers <= 0 { + return nil } + blobChunkSize, err := divide(total, gc.workers) + if err != nil { + return err + } + blobChunkCount := (total + blobChunkSize - 1) / blobChunkSize + blobChunks := make([][]*blobModels.Blob, blobChunkCount) + for i, start := 0, 0; i < blobChunkCount; i, start = i+1, start+blobChunkSize { + end := start + blobChunkSize + if end > total { + end = total + } + blobChunks[i] = gc.deleteSet[start:end] + } + + g := new(errgroup.Group) + g.SetLimit(gc.workers) + index := int64(0) + for _, blobChunk := range blobChunks { + blobChunk := blobChunk + g.Go(func() error { + uid := uuid.New().String() + for _, blob := range blobChunk { + if gc.shouldStop(ctx) { + return errGcStop + } + + atomic.AddInt64(&index, 1) + index := atomic.LoadInt64(&index) + + // set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip. + blob.Status = blobModels.StatusDeleting + count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob) + if err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, index, total, blob.Digest, blob.Status) + continue + } + if count == 0 { + gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, index, total, blob.ID, blob.Digest) + continue + } + + // remove tags and revisions of a manifest + skippedBlob := false + if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() { + for _, art := range gc.trashedArts[blob.Digest] { + // Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them. + gc.logger.Infof("[%s][%d/%d] delete the manifest with registry v2 API: %s, %s, %s", + uid, index, total, art.RepositoryName, blob.ContentType, blob.Digest) + if err := retry.Retry(func() error { + return ignoreNotFound(func() error { + err := v2DeleteManifest(art.RepositoryName, blob.Digest) + // if the system is in read-only mode, return an Abort error to skip retrying + if err == readonly.Err { + return retry.Abort(err) + } + return err + }) + }, retry.Callback(func(err error, sleep time.Duration) { + gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep) + })); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err) + if err := ignoreNotFound(func() error { + return gc.markDeleteFailed(ctx, blob) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, index, total, blob.Digest, err) + return err + } + // if the system is set to read-only mode, return directly + if err == readonly.Err { + return err + } + skippedBlob = true + continue + } + // for manifest, it has to delete the revisions folder of each repository + gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, index, total, blob.Digest) + if err := retry.Retry(func() error { + return ignoreNotFound(func() error { + err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest) + // if the system is in read-only mode, return an Abort error to skip retrying + if err == readonly.Err { + return retry.Abort(err) + } + return err + }) + }, retry.Callback(func(err error, sleep time.Duration) { + gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep) + })); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, index, total, art.RepositoryName, blob.Digest, err) + if err := ignoreNotFound(func() error { + return gc.markDeleteFailed(ctx, blob) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err) + return err + } + // if the system is set to read-only mode, return directly + if err == readonly.Err { + return err + } + skippedBlob = true + continue + } + + gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest) + if err := ignoreNotFound(func() error { + return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, index, total, art.Digest, err) + return err + } + + gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest) + if err := ignoreNotFound(func() error { + return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, index, total, art.ID, err) + return err + } + } + } + + // skip deleting the blob if the manifest's tag/revision is not deleted + if skippedBlob { + continue + } + + // delete all the blobs, which include config, layer and manifest + // for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record. + if !blob.IsForeignLayer() { + gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, index, total, blob.Digest) + if err := retry.Retry(func() error { + return ignoreNotFound(func() error { + err := gc.registryCtlClient.DeleteBlob(blob.Digest) + // if the system is in read-only mode, return an Abort error to skip retrying + if err == readonly.Err { + return retry.Abort(err) + } + return err + }) + }, retry.Callback(func(err error, sleep time.Duration) { + gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, index, total, err, sleep) + })); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err) + if err := ignoreNotFound(func() error { + return gc.markDeleteFailed(ctx, blob) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, index, total, blob.Digest, err) + return err + } + // if the system is set to read-only mode, return directly + if err == readonly.Err { + return err + } + continue + } + atomic.AddInt64(&sweepSize, blob.Size) + } + + gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, index, total, blob.ID, blob.Digest) + if err := ignoreNotFound(func() error { + return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err) + if err := ignoreNotFound(func() error { + return gc.markDeleteFailed(ctx, blob) + }); err != nil { + gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, index, total, blob.ID, blob.Digest, err) + return err + } + return err + } + + if blob.IsManifest() { + atomic.AddInt64(&mfCnt, 1) + } else { + atomic.AddInt64(&blobCnt, 1) + } + } + return nil + }) + } + + if err := g.Wait(); err != nil { + gc.logger.Errorf("failed to execute mark(), error out, %v", err) + return err + } + gc.logger.Infof("%d blobs and %d manifests are actually deleted", blobCnt, mfCnt) gc.logger.Infof("The GC job actual frees up %d MB space.", sweepSize/1024/1024) diff --git a/src/jobservice/job/impl/gc/garbage_collection_test.go b/src/jobservice/job/impl/gc/garbage_collection_test.go index 3c78be423..124cff7f9 100644 --- a/src/jobservice/job/impl/gc/garbage_collection_test.go +++ b/src/jobservice/job/impl/gc/garbage_collection_test.go @@ -161,9 +161,11 @@ func (suite *gcTestSuite) TestInit() { "delete_untagged": true, "redis_url_reg": "redis url", "time_window": 1, + "workers": float64(3), } suite.Nil(gc.init(ctx, params)) suite.True(gc.deleteUntagged) + suite.Equal(3, gc.workers) params = map[string]interface{}{ "delete_untagged": "unsupported", @@ -279,6 +281,7 @@ func (suite *gcTestSuite) TestRun() { "delete_untagged": false, "redis_url_reg": tests.GetRedisURL(), "time_window": 1, + "workers": 3, } suite.Nil(gc.Run(ctx, params)) @@ -375,6 +378,7 @@ func (suite *gcTestSuite) TestSweep() { ContentType: schema2.MediaTypeLayer, }, }, + workers: 3, } suite.Nil(gc.sweep(ctx)) diff --git a/src/jobservice/job/impl/gc/util.go b/src/jobservice/job/impl/gc/util.go index 6f92d4274..1c16aa382 100644 --- a/src/jobservice/job/impl/gc/util.go +++ b/src/jobservice/job/impl/gc/util.go @@ -76,3 +76,18 @@ func ignoreNotFound(f func() error) error { } return nil } + +// divide if it is divisible, it gives the quotient. if it's not, it gives the remainder. +func divide(a, b int) (int, error) { + if b == 0 { + return 0, errors.New("the divided cannot be zero") + } + + quotient := a / b + remainder := a % b + + if quotient == 0 { + return remainder, nil + } + return quotient, nil +} diff --git a/src/jobservice/job/impl/gc/util_test.go b/src/jobservice/job/impl/gc/util_test.go new file mode 100644 index 000000000..773a42d6f --- /dev/null +++ b/src/jobservice/job/impl/gc/util_test.go @@ -0,0 +1,50 @@ +package gc + +import ( + "github.com/goharbor/harbor/src/lib/errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIgnoreNotFound(t *testing.T) { + var f = func() error { + return nil + } + assert.Nil(t, ignoreNotFound(f)) + f = func() error { + return errors.New(nil).WithMessage("my error") + } + assert.NotNil(t, ignoreNotFound(f)) + f = func() error { + return errors.New(nil).WithMessage("my error").WithCode(errors.BadRequestCode) + } + assert.NotNil(t, ignoreNotFound(f)) + f = func() error { + return errors.New(nil).WithMessage("my error").WithCode(errors.NotFoundCode) + } + assert.Nil(t, ignoreNotFound(f)) +} + +func TestDivide(t *testing.T) { + var result int + var err error + result, err = divide(1, 10) + assert.Nil(t, err) + assert.Equal(t, 1, result) + + result, err = divide(5, 10) + assert.Nil(t, err) + assert.Equal(t, 5, result) + + result, err = divide(30, 10) + assert.Nil(t, err) + assert.Equal(t, 3, result) + + result, err = divide(33, 10) + assert.Nil(t, err) + assert.Equal(t, 3, result) + + result, err = divide(33, 0) + assert.NotNil(t, err) +} diff --git a/src/server/v2.0/handler/gc.go b/src/server/v2.0/handler/gc.go index b52ba2157..7be005b9d 100644 --- a/src/server/v2.0/handler/gc.go +++ b/src/server/v2.0/handler/gc.go @@ -99,6 +99,17 @@ func (g *gcAPI) kick(ctx context.Context, scheType string, cron string, paramete if deleteUntagged, ok := parameters["delete_untagged"].(bool); ok { policy.DeleteUntagged = deleteUntagged } + if workers, ok := parameters["workers"].(json.Number); ok { + wInt, err := workers.Int64() + if err != nil { + return 0, errors.BadRequestError(fmt.Errorf("workers should be integer format")) + } + if !validateWorkers(int(wInt)) { + return 0, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("Error: Invalid number of workers:%s. Workers must be greater than 0 and less than or equal to 5.", workers) + } + policy.Workers = int(wInt) + } + id, err = g.gcCtr.Start(ctx, policy, task.ExecutionTriggerManual) case ScheduleNone: err = g.gcCtr.DeleteSchedule(ctx) @@ -112,6 +123,16 @@ func (g *gcAPI) kick(ctx context.Context, scheType string, cron string, paramete if deleteUntagged, ok := parameters["delete_untagged"].(bool); ok { policy.DeleteUntagged = deleteUntagged } + if workers, ok := parameters["workers"].(json.Number); ok { + wInt, err := workers.Int64() + if err != nil { + return 0, errors.BadRequestError(fmt.Errorf("workers should be integer format")) + } + if !validateWorkers(int(wInt)) { + return 0, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("Error: Invalid number of workers:%s. Workers must be greater than 0 and less than or equal to 5.", workers) + } + policy.Workers = int(wInt) + } err = g.updateSchedule(ctx, scheType, cron, policy) } return id, err @@ -260,3 +281,10 @@ func (g *gcAPI) StopGC(ctx context.Context, params operation.StopGCParams) middl return operation.NewStopGCOK() } + +func validateWorkers(workers int) bool { + if workers <= 0 || workers > 5 { + return false + } + return true +} diff --git a/src/server/v2.0/handler/gc_test.go b/src/server/v2.0/handler/gc_test.go new file mode 100644 index 000000000..af5a4fc7d --- /dev/null +++ b/src/server/v2.0/handler/gc_test.go @@ -0,0 +1,15 @@ +package handler + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateWorkers(t *testing.T) { + assert.False(t, validateWorkers(0)) + assert.False(t, validateWorkers(10)) + assert.False(t, validateWorkers(-1)) + assert.True(t, validateWorkers(1)) + assert.True(t, validateWorkers(5)) +} diff --git a/src/vendor/golang.org/x/sync/LICENSE b/src/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/src/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/vendor/golang.org/x/sync/PATENTS b/src/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/src/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/src/vendor/golang.org/x/sync/errgroup/errgroup.go b/src/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000..b18efb743 --- /dev/null +++ b/src/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,132 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := withCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/src/vendor/golang.org/x/sync/errgroup/go120.go b/src/vendor/golang.org/x/sync/errgroup/go120.go new file mode 100644 index 000000000..7d419d376 --- /dev/null +++ b/src/vendor/golang.org/x/sync/errgroup/go120.go @@ -0,0 +1,14 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.20 +// +build go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + return context.WithCancelCause(parent) +} diff --git a/src/vendor/golang.org/x/sync/errgroup/pre_go120.go b/src/vendor/golang.org/x/sync/errgroup/pre_go120.go new file mode 100644 index 000000000..1795c18ac --- /dev/null +++ b/src/vendor/golang.org/x/sync/errgroup/pre_go120.go @@ -0,0 +1,15 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !go1.20 +// +build !go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + ctx, cancel := context.WithCancel(parent) + return ctx, func(error) { cancel() } +} diff --git a/src/vendor/modules.txt b/src/vendor/modules.txt index 35b0e2f1e..408846e62 100644 --- a/src/vendor/modules.txt +++ b/src/vendor/modules.txt @@ -696,6 +696,9 @@ golang.org/x/oauth2/google/internal/externalaccount golang.org/x/oauth2/internal golang.org/x/oauth2/jws golang.org/x/oauth2/jwt +# golang.org/x/sync v0.3.0 +## explicit; go 1.17 +golang.org/x/sync/errgroup # golang.org/x/sys v0.7.0 ## explicit; go 1.17 golang.org/x/sys/internal/unsafeheader