add gc read only job (#12591)

Add the read only job as a back up plan, user still can use it but just with API, and specify the parameter read_only:true

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
Wang Yan 2020-07-30 15:30:52 +08:00 committed by GitHub
parent 518a1721a7
commit e14e6938da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 644 additions and 2 deletions

View File

@ -7,7 +7,8 @@ const (
ImageScanAllJob = "IMAGE_SCAN_ALL"
// ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC"
// ImageGCReadOnly the name of image garbage collection read only job in job service
ImageGCReadOnly = "IMAGE_GC_READ_ONLY"
// JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"
// JobKindScheduled : Kind of scheduled job

View File

@ -61,6 +61,7 @@ func (gc *GCAPI) Prepare() {
// },
// "parameters": {
// "delete_untagged": true
// "read_only": true
// }
// }
func (gc *GCAPI) Post() {
@ -73,9 +74,17 @@ func (gc *GCAPI) Post() {
gc.SendBadRequestError(err)
return
}
ajr.Name = common_job.ImageGC
ajr.Parameters["redis_url_reg"] = os.Getenv("_REDIS_URL_REG")
// default is the non-blocking GC job.
ajr.Name = common_job.ImageGC
ajr.Parameters["time_window"] = config.GetGCTimeWindow()
// if specify read_only:true, API will submit the readonly GC job, otherwise default is non-blocking GC job.
readOnlyParam, exist := ajr.Parameters["read_only"]
if exist {
if readOnly, ok := readOnlyParam.(bool); ok && readOnly {
ajr.Name = common_job.ImageGCReadOnly
}
}
gc.submit(&ajr)
gc.Redirect(http.StatusCreated, strconv.FormatInt(ajr.ID, 10))
}

View File

@ -0,0 +1,335 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcreadonly
import (
"fmt"
redislib "github.com/goharbor/harbor/src/lib/redis"
"os"
"time"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash"
"github.com/goharbor/harbor/src/pkg/blob"
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/common/config"
"github.com/goharbor/harbor/src/common/registryctl"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/registryctl/client"
)
var (
regCtlInit = registryctl.Init
getReadOnly = func(cfgMgr *config.CfgManager) (bool, error) {
if err := cfgMgr.Load(); err != nil {
return false, err
}
return cfgMgr.Get(common.ReadOnly).GetBool(), nil
}
setReadOnly = func(cfgMgr *config.CfgManager, switcher bool) error {
cfg := map[string]interface{}{
common.ReadOnly: switcher,
}
cfgMgr.UpdateConfig(cfg)
return cfgMgr.Save()
}
)
const (
dialConnectionTimeout = 30 * time.Second
dialReadTimeout = time.Minute + 10*time.Second
dialWriteTimeout = 10 * time.Second
blobPrefix = "blobs::*"
repoPrefix = "repository::*"
uploadSizePattern = "upload:*:size"
)
// GarbageCollector is the struct to run registry's garbage collection
type GarbageCollector struct {
artCtl artifact.Controller
artrashMgr artifactrash.Manager
blobMgr blob.Manager
projectCtl project.Controller
registryCtlClient client.Client
logger logger.Interface
cfgMgr *config.CfgManager
CoreURL string
redisURL string
deleteUntagged bool
}
// MaxFails implements the interface in job/Interface
func (gc *GarbageCollector) MaxFails() uint {
return 1
}
// MaxCurrency is implementation of same method in Interface.
func (gc *GarbageCollector) MaxCurrency() uint {
return 1
}
// ShouldRetry implements the interface in job/Interface
func (gc *GarbageCollector) ShouldRetry() bool {
return false
}
// Validate implements the interface in job/Interface
func (gc *GarbageCollector) Validate(params job.Parameters) error {
return nil
}
// Run implements the interface in job/Interface
// The workflow of GC is:
// 1, set harbor to readonly
// 2, select the candidate artifacts from Harbor DB.
// 3, call registry API(--delete-untagged=false) to delete manifest bases on the results of #2
// 4, clean keys of redis DB of registry, clean artifact trash and untagged from DB.
// 5, roll back readonly.
// More details:
// 1, why disable delete untagged when to call registry API
// Generally because that we introduce Harbor tag in v2.0, it's in database but no corresponding data in registry.
// Also one failure case example:
// there are two parts for putting an manifest in Harbor: write database and write storage, but they're not in a transaction,
// which leads to the data mismatching in parallel pushing images with same tag but different digest. The valid artifact in
// harbor DB could be a untagged one in the storage. If we enable the delete untagged, the valid data could be removed from the storage.
// 2, what to be cleaned
// > the deleted artifact, bases on table of artifact_trash and artifact
// > the untagged artifact(optional), bases on table of artifact.
func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
if err := gc.init(ctx, params); err != nil {
return err
}
readOnlyCur, err := getReadOnly(gc.cfgMgr)
if err != nil {
return err
}
if readOnlyCur != true {
if err := setReadOnly(gc.cfgMgr, true); err != nil {
return err
}
defer setReadOnly(gc.cfgMgr, readOnlyCur)
}
gc.logger.Infof("start to run gc in job.")
if err := gc.deleteCandidates(ctx); err != nil {
gc.logger.Errorf("failed to delete GC candidates in gc job, with error: %v", err)
}
gcr, err := gc.registryCtlClient.StartGC()
if err != nil {
gc.logger.Errorf("failed to get gc result: %v", err)
return err
}
gc.removeUntaggedBlobs(ctx)
if err := gc.cleanCache(); err != nil {
return err
}
gc.logger.Infof("GC results: status: %t, message: %s, start: %s, end: %s.", gcr.Status, gcr.Msg, gcr.StartTime, gcr.EndTime)
gc.logger.Infof("success to run gc in job.")
return nil
}
func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
regCtlInit()
gc.logger = ctx.GetLogger()
opCmd, flag := ctx.OPCommand()
if flag && opCmd.IsStop() {
gc.logger.Info("received the stop signal, quit GC job.")
return nil
}
// UT will use the mock client, ctl and mgr
if os.Getenv("UTTEST") != "true" {
gc.registryCtlClient = registryctl.RegistryCtlClient
gc.artCtl = artifact.Ctl
gc.artrashMgr = artifactrash.NewManager()
gc.blobMgr = blob.NewManager()
gc.projectCtl = project.Ctl
}
if err := gc.registryCtlClient.Health(); err != nil {
gc.logger.Errorf("failed to start gc as registry controller is unreachable: %v", err)
return err
}
errTpl := "failed to get required property: %s"
if v, ok := ctx.Get(common.CoreURL); ok && len(v.(string)) > 0 {
gc.CoreURL = v.(string)
} else {
return fmt.Errorf(errTpl, common.CoreURL)
}
secret := os.Getenv("JOBSERVICE_SECRET")
configURL := gc.CoreURL + common.CoreConfigPath
gc.cfgMgr = config.NewRESTCfgManager(configURL, secret)
gc.redisURL = params["redis_url_reg"].(string)
// default is to delete the untagged artifact
gc.deleteUntagged = true
deleteUntagged, exist := params["delete_untagged"]
if exist {
if untagged, ok := deleteUntagged.(bool); ok && !untagged {
gc.deleteUntagged = untagged
}
}
return nil
}
// cleanCache is to clean the registry cache for GC.
// To do this is because the issue https://github.com/docker/distribution/issues/2094
func (gc *GarbageCollector) cleanCache() error {
pool, err := redislib.GetRedisPool("GarbageCollector", gc.redisURL, &redislib.PoolParam{
PoolMaxIdle: 0,
PoolMaxActive: 1,
PoolIdleTimeout: 60 * time.Second,
DialConnectionTimeout: dialConnectionTimeout,
DialReadTimeout: dialReadTimeout,
DialWriteTimeout: dialWriteTimeout,
})
if err != nil {
gc.logger.Errorf("failed to connect to redis %v", err)
return err
}
con := pool.Get()
defer con.Close()
// clean all keys in registry redis DB.
// sample of keys in registry redis:
// 1) "blobs::sha256:1a6fd470b9ce10849be79e99529a88371dff60c60aab424c077007f6979b4812"
// 2) "repository::library/hello-world::blobs::sha256:4ab4c602aa5eed5528a6620ff18a1dc4faef0e1ab3a5eddeddb410714478c67f"
// 3) "upload:fbd2e0a3-262d-40bb-abe4-2f43aa6f9cda:size"
patterns := []string{blobPrefix, repoPrefix, uploadSizePattern}
for _, pattern := range patterns {
if err := delKeys(con, pattern); err != nil {
gc.logger.Errorf("failed to clean registry cache %v, pattern %s", err, pattern)
return err
}
}
return nil
}
// deleteCandidates deletes the two parts of artifact from harbor DB
// 1, required part, the artifacts were removed from Harbor.
// 2, optional part, the untagged artifacts.
func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
if os.Getenv("UTTEST") == "true" {
gc.logger = ctx.GetLogger()
}
// default is not to clean trash
flushTrash := false
defer func() {
if flushTrash {
gc.logger.Info("flush artifact trash")
if err := gc.artrashMgr.Flush(ctx.SystemContext(), 0); err != nil {
gc.logger.Errorf("failed to flush artifact trash: %v", err)
}
}
}()
// handle the optional ones, and the artifact controller will move them into trash.
if gc.deleteUntagged {
untagged, err := gc.artCtl.List(ctx.SystemContext(), &q.Query{
Keywords: map[string]interface{}{
"Tags": "nil",
},
}, nil)
if err != nil {
return err
}
gc.logger.Info("start to delete untagged artifact.")
for _, art := range untagged {
if err := gc.artCtl.Delete(ctx.SystemContext(), art.ID); err != nil {
// the failure ones can be GCed by the next execution
gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", art.ID, err)
continue
}
gc.logger.Infof("delete the untagged artifact: ProjectID:(%d)-RepositoryName(%s)-MediaType:(%s)-Digest:(%s)",
art.ProjectID, art.RepositoryName, art.ManifestMediaType, art.Digest)
}
gc.logger.Info("end to delete untagged artifact.")
}
// handle the trash
required, err := gc.artrashMgr.Filter(ctx.SystemContext(), 0)
if err != nil {
return err
}
gc.logger.Info("required candidate: %+v", required)
for _, art := range required {
if err := deleteManifest(art.RepositoryName, art.Digest); err != nil {
return fmt.Errorf("failed to delete manifest, %s:%s with error: %v", art.RepositoryName, art.Digest, err)
}
gc.logger.Infof("delete the manifest with registry v2 API: RepositoryName(%s)-MediaType:(%s)-Digest:(%s)",
art.RepositoryName, art.ManifestMediaType, art.Digest)
}
gc.logger.Info("end to delete required artifact.")
flushTrash = true
return nil
}
// clean the untagged blobs in each project, these blobs are not referenced by any manifest and will be cleaned by GC
func (gc *GarbageCollector) removeUntaggedBlobs(ctx job.Context) {
// get all projects
projects := func(chunkSize int) <-chan *models.Project {
ch := make(chan *models.Project, chunkSize)
go func() {
defer close(ch)
params := &models.ProjectQueryParam{
Pagination: &models.Pagination{Page: 1, Size: int64(chunkSize)},
}
for {
results, err := gc.projectCtl.List(ctx.SystemContext(), params, project.Metadata(false))
if err != nil {
gc.logger.Errorf("list projects failed, error: %v", err)
return
}
for _, p := range results {
ch <- p
}
if len(results) < chunkSize {
break
}
params.Pagination.Page++
}
}()
return ch
}(50)
for project := range projects {
all, err := gc.blobMgr.List(ctx.SystemContext(), blob.ListParams{
ProjectID: project.ProjectID,
})
if err != nil {
gc.logger.Errorf("failed to get blobs of project, %v", err)
continue
}
if err := gc.blobMgr.CleanupAssociationsForProject(ctx.SystemContext(), project.ProjectID, all); err != nil {
gc.logger.Errorf("failed to clean untagged blobs of project, %v", err)
continue
}
}
}

View File

@ -0,0 +1,237 @@
package gcreadonly
import (
"github.com/goharbor/harbor/src/common/config"
"github.com/goharbor/harbor/src/common/models"
commom_regctl "github.com/goharbor/harbor/src/common/registryctl"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/artifact"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
pkg_blob "github.com/goharbor/harbor/src/pkg/blob/models"
artifacttesting "github.com/goharbor/harbor/src/testing/controller/artifact"
projecttesting "github.com/goharbor/harbor/src/testing/controller/project"
mockjobservice "github.com/goharbor/harbor/src/testing/jobservice"
"github.com/goharbor/harbor/src/testing/mock"
trashtesting "github.com/goharbor/harbor/src/testing/pkg/artifactrash"
"github.com/goharbor/harbor/src/testing/pkg/blob"
"github.com/goharbor/harbor/src/testing/registryctl"
"github.com/stretchr/testify/suite"
"os"
"testing"
)
type gcTestSuite struct {
suite.Suite
artifactCtl *artifacttesting.Controller
artrashMgr *trashtesting.FakeManager
registryCtlClient *registryctl.Mockclient
projectCtl *projecttesting.Controller
blobMgr *blob.Manager
regCtlInit func()
setReadOnly func(cfgMgr *config.CfgManager, switcher bool) error
getReadOnly func(cfgMgr *config.CfgManager) (bool, error)
}
func (suite *gcTestSuite) SetupTest() {
suite.artifactCtl = &artifacttesting.Controller{}
suite.artrashMgr = &trashtesting.FakeManager{}
suite.registryCtlClient = &registryctl.Mockclient{}
suite.blobMgr = &blob.Manager{}
suite.projectCtl = &projecttesting.Controller{}
regCtlInit = func() { commom_regctl.RegistryCtlClient = suite.registryCtlClient }
setReadOnly = func(cfgMgr *config.CfgManager, switcher bool) error { return nil }
getReadOnly = func(cfgMgr *config.CfgManager) (bool, error) { return true, nil }
}
func (suite *gcTestSuite) TestMaxFails() {
gc := &GarbageCollector{}
suite.Equal(uint(1), gc.MaxFails())
}
func (suite *gcTestSuite) TestShouldRetry() {
gc := &GarbageCollector{}
suite.False(gc.ShouldRetry())
}
func (suite *gcTestSuite) TestValidate() {
gc := &GarbageCollector{}
suite.Nil(gc.Validate(nil))
}
func (suite *gcTestSuite) TestDeleteCandidates() {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
suite.artrashMgr.On("Flush").Return(nil)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{
ID: 1,
RepositoryID: 1,
},
}, nil)
suite.artifactCtl.On("Delete").Return(nil)
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{}, nil)
gc := &GarbageCollector{
artCtl: suite.artifactCtl,
artrashMgr: suite.artrashMgr,
}
suite.Nil(gc.deleteCandidates(ctx))
}
func (suite *gcTestSuite) TestRemoveUntaggedBlobs() {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
mock.OnAnything(suite.projectCtl, "List").Return([]*models.Project{
{
ProjectID: 1234,
Name: "test GC",
},
}, nil)
mock.OnAnything(suite.blobMgr, "List").Return([]*pkg_blob.Blob{
{
ID: 1234,
Digest: "sha256:1234",
Size: 1234,
},
}, nil)
mock.OnAnything(suite.blobMgr, "CleanupAssociationsForProject").Return(nil)
gc := &GarbageCollector{
projectCtl: suite.projectCtl,
blobMgr: suite.blobMgr,
}
suite.NotPanics(func() {
gc.removeUntaggedBlobs(ctx)
})
}
func (suite *gcTestSuite) TestInit() {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, true)
gc := &GarbageCollector{
registryCtlClient: suite.registryCtlClient,
}
params := map[string]interface{}{
"delete_untagged": true,
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
suite.True(gc.deleteUntagged)
params = map[string]interface{}{
"delete_untagged": "unsupported",
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
suite.True(gc.deleteUntagged)
params = map[string]interface{}{
"delete_untagged": false,
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
suite.False(gc.deleteUntagged)
params = map[string]interface{}{
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
suite.True(gc.deleteUntagged)
}
func (suite *gcTestSuite) TestStop() {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.StopCommand, true)
gc := &GarbageCollector{
registryCtlClient: suite.registryCtlClient,
}
params := map[string]interface{}{
"delete_untagged": true,
"redis_url_reg": "redis url",
}
suite.Nil(gc.init(ctx, params))
ctx = &mockjobservice.MockJobContext{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("OPCommand").Return(job.StopCommand, false)
suite.Nil(gc.init(ctx, params))
ctx = &mockjobservice.MockJobContext{}
mock.OnAnything(ctx, "Get").Return("core url", true)
ctx.On("OPCommand").Return(job.NilCommand, true)
suite.Nil(gc.init(ctx, params))
}
func (suite *gcTestSuite) TestRun() {
ctx := &mockjobservice.MockJobContext{}
logger := &mockjobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.NilCommand, true)
mock.OnAnything(ctx, "Get").Return("core url", true)
suite.artrashMgr.On("Flush").Return(nil)
suite.artifactCtl.On("List").Return([]*artifact.Artifact{
{
ID: 1,
RepositoryID: 1,
},
}, nil)
suite.artifactCtl.On("Delete").Return(nil)
suite.artrashMgr.On("Filter").Return([]model.ArtifactTrash{}, nil)
mock.OnAnything(suite.projectCtl, "List").Return([]*models.Project{
{
ProjectID: 12345,
Name: "test GC",
},
}, nil)
mock.OnAnything(suite.blobMgr, "List").Return([]*pkg_blob.Blob{
{
ID: 12345,
Digest: "sha256:12345",
Size: 12345,
},
}, nil)
mock.OnAnything(suite.blobMgr, "CleanupAssociationsForProject").Return(nil)
gc := &GarbageCollector{
artCtl: suite.artifactCtl,
artrashMgr: suite.artrashMgr,
cfgMgr: config.NewInMemoryManager(),
projectCtl: suite.projectCtl,
blobMgr: suite.blobMgr,
registryCtlClient: suite.registryCtlClient,
}
params := map[string]interface{}{
"delete_untagged": false,
// ToDo add a redis testing pkg, we do have a 'localhost' redis server in UT
"redis_url_reg": "redis://localhost:6379",
}
suite.Nil(gc.Run(ctx, params))
}
func TestGCTestSuite(t *testing.T) {
os.Setenv("UTTEST", "true")
suite.Run(t, &gcTestSuite{})
}

View File

@ -0,0 +1,56 @@
package gcreadonly
import (
"fmt"
"github.com/goharbor/harbor/src/pkg/registry"
"github.com/gomodule/redigo/redis"
)
// delKeys ...
func delKeys(con redis.Conn, pattern string) error {
iter := 0
keys := make([]string, 0)
for {
arr, err := redis.Values(con.Do("SCAN", iter, "MATCH", pattern))
if err != nil {
return fmt.Errorf("error retrieving '%s' keys", pattern)
}
iter, err = redis.Int(arr[0], nil)
if err != nil {
return fmt.Errorf("unexpected type for Int, got type %T", err)
}
k, err := redis.Strings(arr[1], nil)
if err != nil {
return fmt.Errorf("converts an array command reply to a []string %v", err)
}
keys = append(keys, k...)
if iter == 0 {
break
}
}
for _, key := range keys {
_, err := con.Do("DEL", key)
if err != nil {
return fmt.Errorf("failed to clean registry cache %v", err)
}
}
return nil
}
// deleteManifest calls the registry API to remove manifest
func deleteManifest(repository, digest string) error {
exist, _, err := registry.Cli.ManifestExist(repository, digest)
if err != nil {
return err
}
// it could be happened at remove manifest success but fail to delete harbor DB.
// when the GC job executes again, the manifest should not exist.
if !exist {
return nil
}
if err := registry.Cli.DeleteManifest(repository, digest); err != nil {
return err
}
return nil
}

View File

@ -26,6 +26,8 @@ const (
ImageScanAllJob = "IMAGE_SCAN_ALL"
// ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC"
// ImageGCReadOnly the name of image garbage collection read only job in job service
ImageGCReadOnly = "IMAGE_GC_READ_ONLY"
// Replication : the name of the replication job in job service
Replication = "REPLICATION"
// ReplicationScheduler : the name of the replication scheduler job in job service

View File

@ -33,6 +33,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/gcreadonly"
"github.com/goharbor/harbor/src/jobservice/job/impl/notification"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
@ -257,6 +258,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
job.ImageScanJob: (*sc.Job)(nil),
job.ImageScanAllJob: (*all.Job)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageGCReadOnly: (*gcreadonly.GarbageCollector)(nil),
job.Replication: (*replication.Replication)(nil),
job.ReplicationScheduler: (*replication.Scheduler)(nil),
job.Retention: (*retention.Job)(nil),