mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 18:25:56 +01:00
add multiple deletion of GC (#18855)
User can specify the workers when issuing a GC execution; the maximum count of workers is 5. Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
parent
02a1c417d4
commit
7435c8c5ab
@ -78,6 +78,7 @@ func (c *controller) Start(ctx context.Context, policy Policy, trigger string) (
|
||||
para := make(map[string]interface{})
|
||||
para["delete_untagged"] = policy.DeleteUntagged
|
||||
para["dry_run"] = policy.DryRun
|
||||
para["workers"] = policy.Workers
|
||||
para["redis_url_reg"] = policy.ExtraAttrs["redis_url_reg"]
|
||||
para["time_window"] = policy.ExtraAttrs["time_window"]
|
||||
|
||||
@ -233,6 +234,7 @@ func convertTask(task *task.Task) *Task {
|
||||
RunCount: task.RunCount,
|
||||
DeleteUntagged: task.GetBoolFromExtraAttrs("delete_untagged"),
|
||||
DryRun: task.GetBoolFromExtraAttrs("dry_run"),
|
||||
Workers: int(task.GetNumFromExtraAttrs("workers")),
|
||||
JobID: task.JobID,
|
||||
CreationTime: task.CreationTime,
|
||||
StartTime: task.StartTime,
|
||||
|
@ -23,6 +23,7 @@ type Policy struct {
|
||||
Trigger *Trigger `json:"trigger"`
|
||||
DeleteUntagged bool `json:"deleteuntagged"`
|
||||
DryRun bool `json:"dryrun"`
|
||||
Workers int `json:"workers"`
|
||||
ExtraAttrs map[string]interface{} `json:"extra_attrs"`
|
||||
}
|
||||
|
||||
@ -60,6 +61,7 @@ type Task struct {
|
||||
RunCount int32
|
||||
DeleteUntagged bool
|
||||
DryRun bool
|
||||
Workers int
|
||||
JobID string
|
||||
CreationTime time.Time
|
||||
StartTime time.Time
|
||||
|
@ -162,6 +162,7 @@ require (
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.6.0 // indirect
|
||||
go.uber.org/zap v1.19.0 // indirect
|
||||
golang.org/x/sync v0.3.0
|
||||
golang.org/x/sys v0.7.0 // indirect
|
||||
golang.org/x/term v0.7.0 // indirect
|
||||
google.golang.org/api v0.110.0 // indirect
|
||||
|
@ -1521,6 +1521,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
@ -17,8 +17,12 @@ package gc
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/registryctl"
|
||||
"github.com/goharbor/harbor/src/controller/artifact"
|
||||
"github.com/goharbor/harbor/src/controller/project"
|
||||
@ -66,6 +70,7 @@ type GarbageCollector struct {
|
||||
// hold all of GC candidates(non-referenced blobs), it's captured by mark and consumed by sweep.
|
||||
deleteSet []*blobModels.Blob
|
||||
timeWindowHours int64
|
||||
workers int
|
||||
}
|
||||
|
||||
// MaxFails implements the interface in job/Interface
|
||||
@ -141,8 +146,19 @@ func (gc *GarbageCollector) parseParams(params job.Parameters) {
|
||||
}
|
||||
}
|
||||
|
||||
gc.logger.Infof("Garbage Collection parameters: [delete_untagged: %t, dry_run: %t, time_window: %d]",
|
||||
gc.deleteUntagged, gc.dryRun, gc.timeWindowHours)
|
||||
// gc workers: default is 1. The business unit of removing blobs.
|
||||
gc.workers = 1
|
||||
ws, exist := params["workers"]
|
||||
if exist {
|
||||
if workers, ok := ws.(float64); ok {
|
||||
if int(workers) > 0 {
|
||||
gc.workers = int(workers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gc.logger.Infof("Garbage Collection parameters: [delete_untagged: %t, dry_run: %t, time_window: %d, workers: %d]",
|
||||
gc.deleteUntagged, gc.dryRun, gc.timeWindowHours, gc.workers)
|
||||
}
|
||||
|
||||
// Run implements the interface in job/Interface
|
||||
@ -220,6 +236,7 @@ func (gc *GarbageCollector) mark(ctx job.Context) error {
|
||||
blobCt := 0
|
||||
mfCt := 0
|
||||
makeSize := int64(0)
|
||||
|
||||
for _, blob := range blobs {
|
||||
if !gc.dryRun {
|
||||
if gc.shouldStop(ctx) {
|
||||
@ -259,159 +276,198 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
|
||||
blobCnt := int64(0)
|
||||
mfCnt := int64(0)
|
||||
total := len(gc.deleteSet)
|
||||
for i, blob := range gc.deleteSet {
|
||||
if gc.shouldStop(ctx) {
|
||||
return errGcStop
|
||||
}
|
||||
idx := i + 1
|
||||
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
|
||||
blob.Status = blobModels.StatusDeleting
|
||||
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to mark gc candidate deleting, skip: %s, %s", idx, total, blob.Digest, blob.Status)
|
||||
continue
|
||||
}
|
||||
if count == 0 {
|
||||
gc.logger.Warningf("[%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", idx, total, blob.ID, blob.Digest)
|
||||
continue
|
||||
}
|
||||
|
||||
// remove tags and revisions of a manifest
|
||||
skippedBlob := false
|
||||
if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() {
|
||||
for _, art := range gc.trashedArts[blob.Digest] {
|
||||
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
|
||||
gc.logger.Infof("[%d/%d] delete the manifest with registry v2 API: %s, %s, %s",
|
||||
idx, total, art.RepositoryName, blob.ContentType, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := v2DeleteManifest(art.RepositoryName, blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", idx, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to delete manifest with v2 API, %s, %s, %v", idx, total, art.RepositoryName, blob.Digest, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", idx, total, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
skippedBlob = true
|
||||
continue
|
||||
}
|
||||
// for manifest, it has to delete the revisions folder of each repository
|
||||
gc.logger.Infof("[%d/%d] delete manifest from storage: %s", idx, total, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", idx, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", idx, total, art.RepositoryName, blob.Digest, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", idx, total, art.RepositoryName, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
skippedBlob = true
|
||||
continue
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%d/%d] delete artifact blob record from database: %d, %s, %s", idx, total, art.ID, art.RepositoryName, art.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", idx, total, art.Digest, err)
|
||||
return err
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%d/%d] delete artifact trash record from database: %d, %s, %s", idx, total, art.ID, art.RepositoryName, art.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", idx, total, art.ID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// skip deleting the blob if the manifest's tag/revision is not deleted
|
||||
if skippedBlob {
|
||||
continue
|
||||
}
|
||||
|
||||
// delete all of blobs, which include config, layer and manifest
|
||||
// for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record.
|
||||
if !blob.IsForeignLayer() {
|
||||
gc.logger.Infof("[%d/%d] delete blob from storage: %s", idx, total, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := gc.registryCtlClient.DeleteBlob(blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", idx, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", idx, total, blob.Digest, blob.Status, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", idx, total, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
sweepSize = sweepSize + blob.Size
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%d/%d] delete blob record from database: %d, %s", idx, total, blob.ID, blob.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", idx, total, blob.Digest, blob.Status, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", idx, total, blob.ID, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
if blob.IsManifest() {
|
||||
mfCnt++
|
||||
} else {
|
||||
blobCnt++
|
||||
}
|
||||
// split the full set into pieces (count workers)
|
||||
if total <= 0 || gc.workers <= 0 {
|
||||
return nil
|
||||
}
|
||||
blobChunkSize, err := divide(total, gc.workers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blobChunkCount := (total + blobChunkSize - 1) / blobChunkSize
|
||||
blobChunks := make([][]*blobModels.Blob, blobChunkCount)
|
||||
for i, start := 0, 0; i < blobChunkCount; i, start = i+1, start+blobChunkSize {
|
||||
end := start + blobChunkSize
|
||||
if end > total {
|
||||
end = total
|
||||
}
|
||||
blobChunks[i] = gc.deleteSet[start:end]
|
||||
}
|
||||
|
||||
g := new(errgroup.Group)
|
||||
g.SetLimit(gc.workers)
|
||||
index := int64(0)
|
||||
for _, blobChunk := range blobChunks {
|
||||
blobChunk := blobChunk
|
||||
g.Go(func() error {
|
||||
uid := uuid.New().String()
|
||||
for _, blob := range blobChunk {
|
||||
if gc.shouldStop(ctx) {
|
||||
return errGcStop
|
||||
}
|
||||
|
||||
atomic.AddInt64(&index, 1)
|
||||
index := atomic.LoadInt64(&index)
|
||||
|
||||
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
|
||||
blob.Status = blobModels.StatusDeleting
|
||||
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
|
||||
if err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, index, total, blob.Digest, blob.Status)
|
||||
continue
|
||||
}
|
||||
if count == 0 {
|
||||
gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, index, total, blob.ID, blob.Digest)
|
||||
continue
|
||||
}
|
||||
|
||||
// remove tags and revisions of a manifest
|
||||
skippedBlob := false
|
||||
if _, exist := gc.trashedArts[blob.Digest]; exist && blob.IsManifest() {
|
||||
for _, art := range gc.trashedArts[blob.Digest] {
|
||||
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
|
||||
gc.logger.Infof("[%s][%d/%d] delete the manifest with registry v2 API: %s, %s, %s",
|
||||
uid, index, total, art.RepositoryName, blob.ContentType, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := v2DeleteManifest(art.RepositoryName, blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, index, total, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
skippedBlob = true
|
||||
continue
|
||||
}
|
||||
// for manifest, it has to delete the revisions folder of each repository
|
||||
gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, index, total, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, index, total, art.RepositoryName, blob.Digest, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
skippedBlob = true
|
||||
continue
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, index, total, art.Digest, err)
|
||||
return err
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, index, total, art.ID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// skip deleting the blob if the manifest's tag/revision is not deleted
|
||||
if skippedBlob {
|
||||
continue
|
||||
}
|
||||
|
||||
// delete all the blobs, which include config, layer and manifest
|
||||
// for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record.
|
||||
if !blob.IsForeignLayer() {
|
||||
gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, index, total, blob.Digest)
|
||||
if err := retry.Retry(func() error {
|
||||
return ignoreNotFound(func() error {
|
||||
err := gc.registryCtlClient.DeleteBlob(blob.Digest)
|
||||
// if the system is in read-only mode, return an Abort error to skip retrying
|
||||
if err == readonly.Err {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}, retry.Callback(func(err error, sleep time.Duration) {
|
||||
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, index, total, err, sleep)
|
||||
})); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, index, total, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
// if the system is set to read-only mode, return directly
|
||||
if err == readonly.Err {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
atomic.AddInt64(&sweepSize, blob.Size)
|
||||
}
|
||||
|
||||
gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, index, total, blob.ID, blob.Digest)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
|
||||
if err := ignoreNotFound(func() error {
|
||||
return gc.markDeleteFailed(ctx, blob)
|
||||
}); err != nil {
|
||||
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, index, total, blob.ID, blob.Digest, err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if blob.IsManifest() {
|
||||
atomic.AddInt64(&mfCnt, 1)
|
||||
} else {
|
||||
atomic.AddInt64(&blobCnt, 1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
gc.logger.Errorf("failed to execute mark(), error out, %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
gc.logger.Infof("%d blobs and %d manifests are actually deleted", blobCnt, mfCnt)
|
||||
gc.logger.Infof("The GC job actual frees up %d MB space.", sweepSize/1024/1024)
|
||||
|
||||
|
@ -161,9 +161,11 @@ func (suite *gcTestSuite) TestInit() {
|
||||
"delete_untagged": true,
|
||||
"redis_url_reg": "redis url",
|
||||
"time_window": 1,
|
||||
"workers": float64(3),
|
||||
}
|
||||
suite.Nil(gc.init(ctx, params))
|
||||
suite.True(gc.deleteUntagged)
|
||||
suite.Equal(3, gc.workers)
|
||||
|
||||
params = map[string]interface{}{
|
||||
"delete_untagged": "unsupported",
|
||||
@ -279,6 +281,7 @@ func (suite *gcTestSuite) TestRun() {
|
||||
"delete_untagged": false,
|
||||
"redis_url_reg": tests.GetRedisURL(),
|
||||
"time_window": 1,
|
||||
"workers": 3,
|
||||
}
|
||||
|
||||
suite.Nil(gc.Run(ctx, params))
|
||||
@ -375,6 +378,7 @@ func (suite *gcTestSuite) TestSweep() {
|
||||
ContentType: schema2.MediaTypeLayer,
|
||||
},
|
||||
},
|
||||
workers: 3,
|
||||
}
|
||||
|
||||
suite.Nil(gc.sweep(ctx))
|
||||
|
@ -76,3 +76,18 @@ func ignoreNotFound(f func() error) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// divide computes the per-worker chunk size for splitting a workload of
// size a across b workers. It returns the quotient a/b when that quotient
// is non-zero; when a < b (quotient zero) it returns the remainder — which
// equals a itself — so the caller still gets a non-zero chunk size for a
// non-empty workload. It returns an error when b is zero.
func divide(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("the divisor cannot be zero")
	}

	quotient := a / b
	remainder := a % b

	// a < b: quotient is 0, remainder is a — use it as the chunk size.
	if quotient == 0 {
		return remainder, nil
	}
	return quotient, nil
}
|
||||
|
50
src/jobservice/job/impl/gc/util_test.go
Normal file
50
src/jobservice/job/impl/gc/util_test.go
Normal file
@ -0,0 +1,50 @@
|
||||
package gc
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIgnoreNotFound(t *testing.T) {
|
||||
var f = func() error {
|
||||
return nil
|
||||
}
|
||||
assert.Nil(t, ignoreNotFound(f))
|
||||
f = func() error {
|
||||
return errors.New(nil).WithMessage("my error")
|
||||
}
|
||||
assert.NotNil(t, ignoreNotFound(f))
|
||||
f = func() error {
|
||||
return errors.New(nil).WithMessage("my error").WithCode(errors.BadRequestCode)
|
||||
}
|
||||
assert.NotNil(t, ignoreNotFound(f))
|
||||
f = func() error {
|
||||
return errors.New(nil).WithMessage("my error").WithCode(errors.NotFoundCode)
|
||||
}
|
||||
assert.Nil(t, ignoreNotFound(f))
|
||||
}
|
||||
|
||||
func TestDivide(t *testing.T) {
|
||||
var result int
|
||||
var err error
|
||||
result, err = divide(1, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 1, result)
|
||||
|
||||
result, err = divide(5, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 5, result)
|
||||
|
||||
result, err = divide(30, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 3, result)
|
||||
|
||||
result, err = divide(33, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 3, result)
|
||||
|
||||
result, err = divide(33, 0)
|
||||
assert.NotNil(t, err)
|
||||
}
|
@ -99,6 +99,17 @@ func (g *gcAPI) kick(ctx context.Context, scheType string, cron string, paramete
|
||||
if deleteUntagged, ok := parameters["delete_untagged"].(bool); ok {
|
||||
policy.DeleteUntagged = deleteUntagged
|
||||
}
|
||||
if workers, ok := parameters["workers"].(json.Number); ok {
|
||||
wInt, err := workers.Int64()
|
||||
if err != nil {
|
||||
return 0, errors.BadRequestError(fmt.Errorf("workers should be integer format"))
|
||||
}
|
||||
if !validateWorkers(int(wInt)) {
|
||||
return 0, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("Error: Invalid number of workers:%s. Workers must be greater than 0 and less than or equal to 5.", workers)
|
||||
}
|
||||
policy.Workers = int(wInt)
|
||||
}
|
||||
|
||||
id, err = g.gcCtr.Start(ctx, policy, task.ExecutionTriggerManual)
|
||||
case ScheduleNone:
|
||||
err = g.gcCtr.DeleteSchedule(ctx)
|
||||
@ -112,6 +123,16 @@ func (g *gcAPI) kick(ctx context.Context, scheType string, cron string, paramete
|
||||
if deleteUntagged, ok := parameters["delete_untagged"].(bool); ok {
|
||||
policy.DeleteUntagged = deleteUntagged
|
||||
}
|
||||
if workers, ok := parameters["workers"].(json.Number); ok {
|
||||
wInt, err := workers.Int64()
|
||||
if err != nil {
|
||||
return 0, errors.BadRequestError(fmt.Errorf("workers should be integer format"))
|
||||
}
|
||||
if !validateWorkers(int(wInt)) {
|
||||
return 0, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("Error: Invalid number of workers:%s. Workers must be greater than 0 and less than or equal to 5.", workers)
|
||||
}
|
||||
policy.Workers = int(wInt)
|
||||
}
|
||||
err = g.updateSchedule(ctx, scheType, cron, policy)
|
||||
}
|
||||
return id, err
|
||||
@ -260,3 +281,10 @@ func (g *gcAPI) StopGC(ctx context.Context, params operation.StopGCParams) middl
|
||||
|
||||
return operation.NewStopGCOK()
|
||||
}
|
||||
|
||||
// validateWorkers reports whether the given worker count is acceptable for
// a GC execution: it must be at least 1 and at most 5.
func validateWorkers(workers int) bool {
	return workers > 0 && workers <= 5
}
|
||||
|
15
src/server/v2.0/handler/gc_test.go
Normal file
15
src/server/v2.0/handler/gc_test.go
Normal file
@ -0,0 +1,15 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidateWorkers(t *testing.T) {
|
||||
assert.False(t, validateWorkers(0))
|
||||
assert.False(t, validateWorkers(10))
|
||||
assert.False(t, validateWorkers(-1))
|
||||
assert.True(t, validateWorkers(1))
|
||||
assert.True(t, validateWorkers(5))
|
||||
}
|
27
src/vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
27
src/vendor/golang.org/x/sync/LICENSE
generated
vendored
Normal file
@ -0,0 +1,27 @@
|
||||
Copyright (c) 2009 The Go Authors. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google Inc. nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
22
src/vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
22
src/vendor/golang.org/x/sync/PATENTS
generated
vendored
Normal file
@ -0,0 +1,22 @@
|
||||
Additional IP Rights Grant (Patents)
|
||||
|
||||
"This implementation" means the copyrightable works distributed by
|
||||
Google as part of the Go project.
|
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive,
|
||||
no-charge, royalty-free, irrevocable (except as stated in this section)
|
||||
patent license to make, have made, use, offer to sell, sell, import,
|
||||
transfer and otherwise run, modify and propagate the contents of this
|
||||
implementation of Go, where such license applies only to those patent
|
||||
claims, both currently owned or controlled by Google and acquired in
|
||||
the future, licensable by Google that are necessarily infringed by this
|
||||
implementation of Go. This grant does not include claims that would be
|
||||
infringed only as a consequence of further modification of this
|
||||
implementation. If you or your agent or exclusive licensee institute or
|
||||
order or agree to the institution of patent litigation against any
|
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging
|
||||
that this implementation of Go or any code incorporated within this
|
||||
implementation of Go constitutes direct or contributory patent
|
||||
infringement, or inducement of patent infringement, then any patent
|
||||
rights granted to you under this License for this implementation of Go
|
||||
shall terminate as of the date such litigation is filed.
|
132
src/vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
132
src/vendor/golang.org/x/sync/errgroup/errgroup.go
generated
vendored
Normal file
@ -0,0 +1,132 @@
|
||||
// Copyright 2016 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package errgroup provides synchronization, error propagation, and Context
|
||||
// cancelation for groups of goroutines working on subtasks of a common task.
|
||||
package errgroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type token struct{}
|
||||
|
||||
// A Group is a collection of goroutines working on subtasks that are part of
|
||||
// the same overall task.
|
||||
//
|
||||
// A zero Group is valid, has no limit on the number of active goroutines,
|
||||
// and does not cancel on error.
|
||||
type Group struct {
|
||||
cancel func(error)
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
sem chan token
|
||||
|
||||
errOnce sync.Once
|
||||
err error
|
||||
}
|
||||
|
||||
func (g *Group) done() {
|
||||
if g.sem != nil {
|
||||
<-g.sem
|
||||
}
|
||||
g.wg.Done()
|
||||
}
|
||||
|
||||
// WithContext returns a new Group and an associated Context derived from ctx.
|
||||
//
|
||||
// The derived Context is canceled the first time a function passed to Go
|
||||
// returns a non-nil error or the first time Wait returns, whichever occurs
|
||||
// first.
|
||||
func WithContext(ctx context.Context) (*Group, context.Context) {
|
||||
ctx, cancel := withCancelCause(ctx)
|
||||
return &Group{cancel: cancel}, ctx
|
||||
}
|
||||
|
||||
// Wait blocks until all function calls from the Go method have returned, then
|
||||
// returns the first non-nil error (if any) from them.
|
||||
func (g *Group) Wait() error {
|
||||
g.wg.Wait()
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
return g.err
|
||||
}
|
||||
|
||||
// Go calls the given function in a new goroutine.
|
||||
// It blocks until the new goroutine can be added without the number of
|
||||
// active goroutines in the group exceeding the configured limit.
|
||||
//
|
||||
// The first call to return a non-nil error cancels the group's context, if the
|
||||
// group was created by calling WithContext. The error will be returned by Wait.
|
||||
func (g *Group) Go(f func() error) {
|
||||
if g.sem != nil {
|
||||
g.sem <- token{}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// TryGo calls the given function in a new goroutine only if the number of
|
||||
// active goroutines in the group is currently below the configured limit.
|
||||
//
|
||||
// The return value reports whether the goroutine was started.
|
||||
func (g *Group) TryGo(f func() error) bool {
|
||||
if g.sem != nil {
|
||||
select {
|
||||
case g.sem <- token{}:
|
||||
// Note: this allows barging iff channels in general allow barging.
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer g.done()
|
||||
|
||||
if err := f(); err != nil {
|
||||
g.errOnce.Do(func() {
|
||||
g.err = err
|
||||
if g.cancel != nil {
|
||||
g.cancel(g.err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
return true
|
||||
}
|
||||
|
||||
// SetLimit limits the number of active goroutines in this group to at most n.
|
||||
// A negative value indicates no limit.
|
||||
//
|
||||
// Any subsequent call to the Go method will block until it can add an active
|
||||
// goroutine without exceeding the configured limit.
|
||||
//
|
||||
// The limit must not be modified while any goroutines in the group are active.
|
||||
func (g *Group) SetLimit(n int) {
|
||||
if n < 0 {
|
||||
g.sem = nil
|
||||
return
|
||||
}
|
||||
if len(g.sem) != 0 {
|
||||
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
|
||||
}
|
||||
g.sem = make(chan token, n)
|
||||
}
|
14
src/vendor/golang.org/x/sync/errgroup/go120.go
generated
vendored
Normal file
14
src/vendor/golang.org/x/sync/errgroup/go120.go
generated
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build go1.20
|
||||
// +build go1.20
|
||||
|
||||
package errgroup
|
||||
|
||||
import "context"
|
||||
|
||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
||||
return context.WithCancelCause(parent)
|
||||
}
|
15
src/vendor/golang.org/x/sync/errgroup/pre_go120.go
generated
vendored
Normal file
15
src/vendor/golang.org/x/sync/errgroup/pre_go120.go
generated
vendored
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright 2023 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build !go1.20
|
||||
// +build !go1.20
|
||||
|
||||
package errgroup
|
||||
|
||||
import "context"
|
||||
|
||||
func withCancelCause(parent context.Context) (context.Context, func(error)) {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
return ctx, func(error) { cancel() }
|
||||
}
|
3
src/vendor/modules.txt
vendored
3
src/vendor/modules.txt
vendored
@ -696,6 +696,9 @@ golang.org/x/oauth2/google/internal/externalaccount
|
||||
golang.org/x/oauth2/internal
|
||||
golang.org/x/oauth2/jws
|
||||
golang.org/x/oauth2/jwt
|
||||
# golang.org/x/sync v0.3.0
|
||||
## explicit; go 1.17
|
||||
golang.org/x/sync/errgroup
|
||||
# golang.org/x/sys v0.7.0
|
||||
## explicit; go 1.17
|
||||
golang.org/x/sys/internal/unsafeheader
|
||||
|
Loading…
Reference in New Issue
Block a user