Mirror of https://github.com/goharbor/harbor.git (synced 2024-11-10 12:40:19 +01:00)
Revise the GC job flow:

1. Set Harbor to read-only.
2. Select the candidate artifacts from the Harbor DB.
3. Call the registry API (--delete-untagged=false) to delete manifests based on the results of step 2.
4. Clean the registry's keys in its Redis DB, and clean the artifact trash and untagged artifacts from the DB.
5. Roll back the read-only setting.

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
parent
7a2090b6ca
commit
948d45604c
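To orient the review, here is a minimal, self-contained Go sketch of the flow described in the commit message. The type and method names below are illustrative stand-ins only, not the Harbor implementation; the real code is in the garbage_collection.go hunks further down.

package main

import "fmt"

// gcJob is an illustrative stand-in for the real GarbageCollector job.
type gcJob struct {
	wasReadOnly bool // the read-only state to restore afterwards
}

func (gc *gcJob) setReadOnly(v bool) error { fmt.Println("read-only =", v); return nil }
func (gc *gcJob) deleteCandidates() error  { fmt.Println("select candidates from the DB and delete their manifests via the registry API"); return nil }
func (gc *gcJob) registryGC() error        { fmt.Println("run the registry's own GC with --delete-untagged=false"); return nil }
func (gc *gcJob) cleanCache() error        { fmt.Println("clean the registry's keys in Redis"); return nil }

// Run sketches the order of operations listed in the commit message.
func (gc *gcJob) Run() error {
	if err := gc.setReadOnly(true); err != nil { // set Harbor to read-only
		return err
	}
	defer gc.setReadOnly(gc.wasReadOnly) // roll back the read-only setting when done

	if err := gc.deleteCandidates(); err != nil {
		return err
	}
	if err := gc.registryGC(); err != nil {
		return err
	}
	return gc.cleanCache()
}

func main() {
	if err := (&gcJob{}).Run(); err != nil {
		fmt.Println("gc failed:", err)
	}
}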
@@ -5676,6 +5676,9 @@ definitions:
      job_kind:
        type: string
        description: the job kind of gc job.
      job_parameters:
        type: string
        description: the job parameters of gc job.
      schedule:
        $ref: '#/definitions/AdminJobScheduleObj'
      job_status:
@@ -5695,6 +5698,9 @@ definitions:
    properties:
      schedule:
        $ref: '#/definitions/AdminJobScheduleObj'
      parameters:
        type: string
        description: The parameters of admin job
  AdminJobScheduleObj:
    type: object
    properties:
@@ -1,3 +1,4 @@
ALTER TABLE admin_job ADD COLUMN job_parameters varchar(255) Default '';
ALTER TABLE artifact ADD COLUMN repository_id int;
ALTER TABLE artifact ADD COLUMN media_type varchar(255);
ALTER TABLE artifact ADD COLUMN manifest_media_type varchar(255);
@@ -28,10 +28,10 @@ func AddAdminJob(job *models.AdminJob) (int64, error) {
	if len(job.Status) == 0 {
		job.Status = models.JobPending
	}
	sql := "insert into admin_job (job_name, job_kind, status, job_uuid, cron_str, creation_time, update_time) values (?, ?, ?, ?, ?, ?, ?) RETURNING id"
	sql := "insert into admin_job (job_name, job_parameters, job_kind, status, job_uuid, cron_str, creation_time, update_time) values (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id"
	var id int64
	now := time.Now()
	err := o.Raw(sql, job.Name, job.Kind, job.Status, job.UUID, job.Cron, now, now).QueryRow(&id)
	err := o.Raw(sql, job.Name, job.Parameters, job.Kind, job.Status, job.UUID, job.Cron, now, now).QueryRow(&id)
	if err != nil {
		return 0, err
	}
@@ -44,8 +44,9 @@ func (suite *AdminJobSuite) SetupSuite() {
	}

	job0 := &models.AdminJob{
		Name: "GC",
		Kind: "testKind",
		Name:       "GC",
		Kind:       "testKind",
		Parameters: "{test:test}",
	}

	suite.ids = make([]int64, 0)
@@ -77,6 +78,7 @@ func (suite *AdminJobSuite) TestAdminJobBase() {
	require.Nil(suite.T(), err)
	suite.Equal(job1.ID, suite.job0.ID)
	suite.Equal(job1.Name, suite.job0.Name)
	suite.Equal(job1.Parameters, suite.job0.Parameters)

	// set uuid
	err = SetAdminJobUUID(suite.job0.ID, "f5ef34f4cb3588d663176132")
@@ -29,6 +29,7 @@ type AdminJob struct {
	ID         int64  `orm:"pk;auto;column(id)" json:"id"`
	Name       string `orm:"column(job_name)" json:"job_name"`
	Kind       string `orm:"column(job_kind)" json:"job_kind"`
	Parameters string `orm:"column(job_parameters)" json:"job_parameters"`
	Cron       string `orm:"column(cron_str)" json:"cron_str"`
	Status     string `orm:"column(status)" json:"job_status"`
	UUID       string `orm:"column(job_uuid)" json:"-"`
@@ -133,7 +133,7 @@ func (aj *AJAPI) list(name string) {

// getSchedule gets admin job schedule ...
func (aj *AJAPI) getSchedule(name string) {
	adminJobSchedule := models.AdminJobSchedule{}
	result := models.AdminJobRep{}

	jobs, err := dao.GetAdminJobs(&common_models.AdminJobQuery{
		Name: name,
@@ -154,10 +154,11 @@ func (aj *AJAPI) getSchedule(name string) {
			aj.SendInternalServerError(fmt.Errorf("failed to convert admin job response: %v", err))
			return
		}
		adminJobSchedule.Schedule = adminJobRep.Schedule
		result.Schedule = adminJobRep.Schedule
		result.Parameters = adminJobRep.Parameters
	}

	aj.Data["json"] = adminJobSchedule
	aj.Data["json"] = result
	aj.ServeJSON()
}

@@ -285,9 +286,10 @@ func (aj *AJAPI) submit(ajr *models.AdminJobReq) {
	}

	id, err := dao.AddAdminJob(&common_models.AdminJob{
		Name: ajr.Name,
		Kind: ajr.JobKind(),
		Cron: ajr.CronString(),
		Name:       ajr.Name,
		Kind:       ajr.JobKind(),
		Cron:       ajr.CronString(),
		Parameters: ajr.ParamString(),
	})
	if err != nil {
		aj.SendInternalServerError(err)
@@ -345,6 +347,7 @@ func convertToAdminJobRep(job *common_models.AdminJob) (models.AdminJobRep, erro
		Name:         job.Name,
		Kind:         job.Kind,
		Status:       job.Status,
		Parameters:   job.Parameters,
		CreationTime: job.CreationTime,
		UpdateTime:   job.UpdateTime,
	}
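To make the effect of the handler change concrete: the schedule response now carries the job parameters as well as the schedule. A self-contained sketch of roughly the shape being served follows; the field values and the schedule JSON tag are assumptions for illustration, only the job_parameters tag is taken from the model in the next hunk.

package main

import (
	"encoding/json"
	"fmt"
)

// gcScheduleRep roughly mirrors the fields getSchedule now fills in on
// models.AdminJobRep: the schedule plus the raw parameters string.
type gcScheduleRep struct {
	Schedule   map[string]string `json:"schedule"`       // tag assumed for illustration
	Parameters string            `json:"job_parameters"` // tag as in AdminJobRep below
}

func main() {
	rep := gcScheduleRep{
		Schedule:   map[string]string{"type": "Daily", "cron": "0 0 0 * * *"},
		Parameters: `{"delete_untagged":true}`,
	}
	out, _ := json.MarshalIndent(rep, "", "  ")
	fmt.Println(string(out)) // illustrative JSON output
}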
@@ -73,6 +73,7 @@ type AdminJobRep struct {
	ID         int64  `json:"id"`
	Name       string `json:"job_name"`
	Kind       string `json:"job_kind"`
	Parameters string `json:"job_parameters"`
	Status     string `json:"job_status"`
	UUID       string `json:"-"`
	Deleted    bool   `json:"deleted"`
@@ -151,6 +152,16 @@ func (ar *AdminJobReq) CronString() string {
	return string(str)
}

// ParamString ...
func (ar *AdminJobReq) ParamString() string {
	str, err := json.Marshal(ar.Parameters)
	if err != nil {
		log.Debugf("failed to marshal json error, %v", err)
		return ""
	}
	return string(str)
}

// ConvertSchedule converts different kinds of cron string into one standard for UI to show.
// in the latest design, it uses {"type":"Daily","cron":"0 0 0 * * *"} as the cron item.
// As for supporting migration from older version, it needs to convert {"parameter":{"daily_time":0},"type":"daily"}
@@ -138,6 +138,19 @@ func TestCronString(t *testing.T) {
	assert.True(t, strings.EqualFold(cronStr, "{\"type\":\"Daily\",\"Cron\":\"20 3 0 * * *\"}"))
}

func TestParamString(t *testing.T) {
	adminJobPara := make(map[string]interface{})
	adminJobPara["key1"] = "value1"
	adminJobPara["key2"] = true
	adminJobPara["key3"] = 88

	adminjob := &AdminJobReq{
		Parameters: adminJobPara,
	}
	paramStr := adminjob.ParamString()
	assert.True(t, strings.EqualFold(paramStr, "{\"key1\":\"value1\",\"key2\":true,\"key3\":88}"))
}

func TestConvertSchedule(t *testing.T) {
	schedule1 := "{\"type\":\"Daily\",\"cron\":\"20 3 0 * * *\"}"
	converted1, err1 := ConvertSchedule(schedule1)
@@ -48,12 +48,18 @@ func (gc *GCAPI) Prepare() {
// "schedule": {
// "type": "Daily",
// "cron": "0 0 0 * * *"
// },
// "parameters": {
// "delete_untagged": true
// }
// }
// create a manual trigger for GC
// {
// "schedule": {
// "type": "Manual"
// },
// "parameters": {
// "delete_untagged": true
// }
// }
func (gc *GCAPI) Post() {
@@ -64,9 +70,7 @@ func (gc *GCAPI) Post() {
		return
	}
	ajr.Name = common_job.ImageGC
	ajr.Parameters = map[string]interface{}{
		"redis_url_reg": os.Getenv("_REDIS_URL_REG"),
	}
	ajr.Parameters["redis_url_reg"] = os.Getenv("_REDIS_URL_REG")
	gc.submit(&ajr)
	gc.Redirect(http.StatusCreated, strconv.FormatInt(ajr.ID, 10))
}
@@ -77,6 +81,9 @@ func (gc *GCAPI) Post() {
// "schedule": {
// "type": "None",
// "cron": ""
// },
// "parameters": {
// "delete_untagged": true
// }
// }
func (gc *GCAPI) Put() {
@@ -87,9 +94,7 @@ func (gc *GCAPI) Put() {
		return
	}
	ajr.Name = common_job.ImageGC
	ajr.Parameters = map[string]interface{}{
		"redis_url_reg": os.Getenv("_REDIS_URL_REG"),
	}
	ajr.Parameters["redis_url_reg"] = os.Getenv("_REDIS_URL_REG")
	gc.updateSchedule(ajr)
}
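For completeness, a hedged client-side sketch of posting the request body documented in the handler comments above. The endpoint URL is an assumption for illustration and is not part of this diff.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// gcRequest mirrors the JSON body shown in the GCAPI comments above.
type gcRequest struct {
	Schedule struct {
		Type string `json:"type"`
		Cron string `json:"cron,omitempty"`
	} `json:"schedule"`
	Parameters map[string]interface{} `json:"parameters"`
}

func main() {
	var req gcRequest
	req.Schedule.Type = "Manual"
	req.Parameters = map[string]interface{}{"delete_untagged": true}

	body, _ := json.Marshal(req)
	// The URL below is hypothetical; substitute the real Harbor GC endpoint.
	resp, err := http.Post("http://harbor.example.com/api/system/gc/schedule",
		"application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}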
@@ -7,10 +7,11 @@ import (
	"github.com/stretchr/testify/assert"
)

var adminJob001 apilib.AdminJobReq

func TestGCPost(t *testing.T) {

	adminJob001 := apilib.AdminJobReq{
		Parameters: map[string]interface{}{"delete_untagged": false},
	}
	assert := assert.New(t)
	apiTest := newHarborAPI()
@@ -17,6 +17,8 @@ package impl
import (
	"context"
	"errors"
	o "github.com/astaxie/beego/orm"
	"github.com/goharbor/harbor/src/internal/orm"
	"github.com/goharbor/harbor/src/jobservice/job"
	"github.com/goharbor/harbor/src/jobservice/logger"
)
@@ -49,7 +51,8 @@ func (dc *DefaultContext) Build(t job.Tracker) (job.Context, error) {
	}

	jContext := &DefaultContext{
		sysContext: dc.sysContext,
		// TODO support DB transaction
		sysContext: orm.NewContext(dc.sysContext, o.NewOrm()),
		tracker:    t,
		properties: make(map[string]interface{}),
	}
@@ -16,19 +16,18 @@ package gc

import (
	"fmt"
	"github.com/goharbor/harbor/src/api/artifact"
	"github.com/goharbor/harbor/src/pkg/artifactrash"
	"github.com/goharbor/harbor/src/pkg/q"
	"os"
	"strconv"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/goharbor/harbor/src/common"
	"github.com/goharbor/harbor/src/common/config"
	"github.com/goharbor/harbor/src/common/dao"
	common_quota "github.com/goharbor/harbor/src/common/quota"
	"github.com/goharbor/harbor/src/common/registryctl"
	"github.com/goharbor/harbor/src/jobservice/job"
	"github.com/goharbor/harbor/src/jobservice/logger"
	"github.com/goharbor/harbor/src/pkg/types"
	"github.com/goharbor/harbor/src/registryctl/client"
)
@@ -43,11 +42,14 @@ const (

// GarbageCollector is the struct to run registry's garbage collection
type GarbageCollector struct {
	artCtl            artifact.Controller
	artrashMgr        artifactrash.Manager
	registryCtlClient client.Client
	logger            logger.Interface
	cfgMgr            *config.CfgManager
	CoreURL           string
	redisURL          string
	deleteUntagged    bool
}

// MaxFails implements the interface in job/Interface
@@ -66,6 +68,22 @@ func (gc *GarbageCollector) Validate(params job.Parameters) error {
}

// Run implements the interface in job/Interface
// The workflow of GC is:
// 1, set Harbor to read-only
// 2, select the candidate artifacts from the Harbor DB.
// 3, call the registry API (--delete-untagged=false) to delete manifests based on the results of step 2
// 4, clean the registry's keys in its redis DB, and clean the artifact trash and untagged artifacts from the DB.
// 5, roll back the read-only setting.
// More details:
// 1, why delete-untagged is disabled when calling the registry API:
// Generally, because Harbor introduced its own tags in v2.0, a tag lives in the database with no corresponding data in the registry.
// One failure case as an example:
// putting a manifest in Harbor has two parts, writing the database and writing the storage, and they are not in one transaction,
// which leads to data mismatches when images with the same tag but different digests are pushed in parallel. A valid artifact in the
// Harbor DB could be an untagged one in the storage. If delete-untagged were enabled, that valid data could be removed from the storage.
// 2, what gets cleaned:
// > the deleted artifacts, based on the artifact_trash and artifact tables
// > the untagged artifacts (optional), based on the artifact table.
func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
	if err := gc.init(ctx, params); err != nil {
		return err
@@ -80,11 +98,10 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
		}
		defer gc.setReadOnly(readOnlyCur)
	}
	if err := gc.registryCtlClient.Health(); err != nil {
		gc.logger.Errorf("failed to start gc as registry controller is unreachable: %v", err)
		return err
	}
	gc.logger.Infof("start to run gc in job.")
	if err := gc.deleteCandidates(ctx); err != nil {
		gc.logger.Errorf("failed to delete GC candidates in gc job, with error: %v", err)
	}
	gcr, err := gc.registryCtlClient.StartGC()
	if err != nil {
		gc.logger.Errorf("failed to get gc result: %v", err)
@@ -93,9 +110,6 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
	if err := gc.cleanCache(); err != nil {
		return err
	}
	if err := gc.ensureQuota(); err != nil {
		gc.logger.Warningf("failed to align quota data in gc job, with error: %v", err)
	}
	gc.logger.Infof("GC results: status: %t, message: %s, start: %s, end: %s.", gcr.Status, gcr.Msg, gcr.StartTime, gcr.EndTime)
	gc.logger.Infof("success to run gc in job.")
	return nil
@@ -105,6 +119,13 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
	registryctl.Init()
	gc.registryCtlClient = registryctl.RegistryCtlClient
	gc.logger = ctx.GetLogger()
	gc.artCtl = artifact.Ctl
	gc.artrashMgr = artifactrash.NewManager()

	if err := gc.registryCtlClient.Health(); err != nil {
		gc.logger.Errorf("failed to start gc as registry controller is unreachable: %v", err)
		return err
	}

	errTpl := "failed to get required property: %s"
	if v, ok := ctx.Get(common.CoreURL); ok && len(v.(string)) > 0 {
@@ -116,11 +137,17 @@ func (gc *GarbageCollector) init(ctx job.Context, params job.Parameters) error {
	configURL := gc.CoreURL + common.CoreConfigPath
	gc.cfgMgr = config.NewRESTCfgManager(configURL, secret)
	gc.redisURL = params["redis_url_reg"].(string)

	// default is to delete the untagged artifact
	if params["delete_untagged"] == "" {
		gc.deleteUntagged = true
	} else {
		gc.deleteUntagged = params["delete_untagged"].(bool)
	}
	return nil
}

func (gc *GarbageCollector) getReadOnly() (bool, error) {

	if err := gc.cfgMgr.Load(); err != nil {
		return false, err
	}
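The defaulting above assumes delete_untagged arrives either as an empty value or as a bool; if it ever came through as a string or was missing entirely, the type assertion would panic. A defensive variant, purely as a sketch and not what this commit does:

package main

import "fmt"

// parseDeleteUntagged is a sketch of a more defensive version of the
// defaulting logic above: missing, nil or unrecognized values fall back to
// true; a bool or the strings "true"/"false" are honored.
func parseDeleteUntagged(params map[string]interface{}) bool {
	v, ok := params["delete_untagged"]
	if !ok || v == nil || v == "" {
		return true // default: delete the untagged artifacts
	}
	switch t := v.(type) {
	case bool:
		return t
	case string:
		return t != "false"
	default:
		return true
	}
}

func main() {
	fmt.Println(parseDeleteUntagged(map[string]interface{}{}))                           // true
	fmt.Println(parseDeleteUntagged(map[string]interface{}{"delete_untagged": false}))   // false
	fmt.Println(parseDeleteUntagged(map[string]interface{}{"delete_untagged": "false"})) // false
}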
@@ -138,7 +165,6 @@ func (gc *GarbageCollector) setReadOnly(switcher bool) error {
// cleanCache cleans the registry cache for GC.
// This is needed because of the issue https://github.com/docker/distribution/issues/2094
func (gc *GarbageCollector) cleanCache() error {

	con, err := redis.DialURL(
		gc.redisURL,
		redis.DialConnectTimeout(dialConnectionTimeout),
@@ -169,60 +195,47 @@ func (gc *GarbageCollector) cleanCache() error {
	return nil
}

func delKeys(con redis.Conn, pattern string) error {
	iter := 0
	keys := make([]string, 0)
	for {
		arr, err := redis.Values(con.Do("SCAN", iter, "MATCH", pattern))
		if err != nil {
			return fmt.Errorf("error retrieving '%s' keys", pattern)
// deleteCandidates deletes two kinds of artifacts from the Harbor DB
// 1, the required part: the artifacts that were removed from Harbor.
// 2, the optional part: the untagged artifacts.
func (gc *GarbageCollector) deleteCandidates(ctx job.Context) error {
	// default is to clean the trash
	flushTrash := true
	defer func() {
		if flushTrash {
			if err := gc.artrashMgr.Flush(ctx.SystemContext()); err != nil {
				gc.logger.Errorf("failed to flush artifact trash")
			}
		}
		iter, err = redis.Int(arr[0], nil)
		if err != nil {
			return fmt.Errorf("unexpected type for Int, got type %T", err)
		}
		k, err := redis.Strings(arr[1], nil)
		if err != nil {
			return fmt.Errorf("converts an array command reply to a []string %v", err)
		}
		keys = append(keys, k...)
	}()

		if iter == 0 {
			break
	// handle the optional ones, and the artifact controller will move them into the trash.
	if gc.deleteUntagged {
		untagged, err := gc.artCtl.List(ctx.SystemContext(), &q.Query{
			Keywords: map[string]interface{}{
				"Tags": "nil",
			},
		}, nil)
		if err != nil {
			return err
		}
		for _, art := range untagged {
			if err := gc.artCtl.Delete(ctx.SystemContext(), art.ID); err != nil {
				// the failed ones can be GCed by the next execution
				gc.logger.Errorf("failed to delete untagged:%d artifact in DB, error, %v", art.ID, err)
			}
		}
	}
	for _, key := range keys {
		_, err := con.Do("DEL", key)
		if err != nil {
			return fmt.Errorf("failed to clean registry cache %v", err)
		}
	}
	return nil
}

func (gc *GarbageCollector) ensureQuota() error {
	projects, err := dao.GetProjects(nil)
	// handle the trash
	required, err := gc.artrashMgr.Filter(ctx.SystemContext())
	if err != nil {
		return err
		return nil
	}
	for _, project := range projects {
		pSize, err := dao.CountSizeOfProject(project.ProjectID)
		if err != nil {
			gc.logger.Warningf("error happen on counting size of project:%d by artifact, error:%v, just skip it.", project.ProjectID, err)
			continue
		}
		quotaMgr, err := common_quota.NewManager("project", strconv.FormatInt(project.ProjectID, 10))
		if err != nil {
			gc.logger.Errorf("Error occurred when to new quota manager %v, just skip it.", err)
			continue
		}
		if err := quotaMgr.SetResourceUsage(types.ResourceStorage, pSize); err != nil {
			gc.logger.Errorf("cannot ensure quota for the project: %d, err: %v, just skip it.", project.ProjectID, err)
			continue
		}
		if err := dao.RemoveUntaggedBlobs(project.ProjectID); err != nil {
			gc.logger.Errorf("cannot delete untagged blobs of project: %d, err: %v, just skip it.", project.ProjectID, err)
			continue
	for _, art := range required {
		if err := deleteManifest(art.RepositoryName, art.Digest); err != nil {
			flushTrash = false
			return fmt.Errorf("failed to delete manifest, %s:%s with error: %v", art.RepositoryName, art.Digest, err)
		}
	}
	return nil
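To restate the two-part cleanup in deleteCandidates as a plain, self-contained sketch (the types and callbacks here are illustrative, not Harbor's APIs): the untagged artifacts are optionally moved to the trash first, then every trashed manifest is deleted from the registry, and the trash table is flushed only when that succeeded.

package main

import "fmt"

// trashedArtifact is an illustrative stand-in for a row of artifact_trash.
type trashedArtifact struct {
	RepositoryName string
	Digest         string
}

// deleteCandidatesSketch mirrors the structure of deleteCandidates above:
// delete the required (trashed) manifests from the registry and flush the
// trash only if every deletion succeeded, so failures can be retried later.
func deleteCandidatesSketch(trash []trashedArtifact,
	deleteManifest func(repo, digest string) error, flushTrash func() error) error {

	for _, art := range trash {
		if err := deleteManifest(art.RepositoryName, art.Digest); err != nil {
			// keep the trash so the next GC run can pick this artifact up again
			return fmt.Errorf("failed to delete manifest %s:%s: %v", art.RepositoryName, art.Digest, err)
		}
	}
	return flushTrash()
}

func main() {
	trash := []trashedArtifact{{RepositoryName: "library/hello-world", Digest: "sha256:abcd"}}
	err := deleteCandidatesSketch(trash,
		func(repo, digest string) error { fmt.Println("registry delete:", repo, digest); return nil },
		func() error { fmt.Println("flush artifact_trash"); return nil })
	fmt.Println("err:", err)
}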
src/jobservice/job/impl/gc/garbage_collection_test.go (new file, 21 lines)
@@ -0,0 +1,21 @@
package gc

import (
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestMaxFails(t *testing.T) {
	rep := &GarbageCollector{}
	assert.Equal(t, uint(1), rep.MaxFails())
}

func TestShouldRetry(t *testing.T) {
	rep := &GarbageCollector{}
	assert.False(t, rep.ShouldRetry())
}

func TestValidate(t *testing.T) {
	rep := &GarbageCollector{}
	assert.Nil(t, rep.Validate(nil))
}
src/jobservice/job/impl/gc/util.go (new file, 56 lines)
@@ -0,0 +1,56 @@
package gc

import (
	"fmt"
	"github.com/garyburd/redigo/redis"
	"github.com/goharbor/harbor/src/pkg/registry"
)

// delKeys ...
func delKeys(con redis.Conn, pattern string) error {
	iter := 0
	keys := make([]string, 0)
	for {
		arr, err := redis.Values(con.Do("SCAN", iter, "MATCH", pattern))
		if err != nil {
			return fmt.Errorf("error retrieving '%s' keys", pattern)
		}
		iter, err = redis.Int(arr[0], nil)
		if err != nil {
			return fmt.Errorf("unexpected type for Int, got type %T", err)
		}
		k, err := redis.Strings(arr[1], nil)
		if err != nil {
			return fmt.Errorf("converts an array command reply to a []string %v", err)
		}
		keys = append(keys, k...)

		if iter == 0 {
			break
		}
	}
	for _, key := range keys {
		_, err := con.Do("DEL", key)
		if err != nil {
			return fmt.Errorf("failed to clean registry cache %v", err)
		}
	}
	return nil
}

// deleteManifest calls the registry API to remove a manifest
func deleteManifest(repository, digest string) error {
	exist, _, err := registry.Cli.ManifestExist(repository, digest)
	if err != nil {
		return err
	}
	// it can happen that the manifest was removed from the registry but the delete in the Harbor DB failed;
	// when the GC job executes again, the manifest no longer exists in the registry.
	if !exist {
		return nil
	}
	if err := registry.Cli.DeleteManifest(repository, digest); err != nil {
		return err
	}
	return nil
}
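A hedged usage sketch for delKeys above, wired the way cleanCache drives it; the Redis URL and key patterns here are assumptions for illustration, not values taken from this commit.

package gc

import (
	"github.com/garyburd/redigo/redis"
)

// cleanCacheSketch shows how delKeys above can be driven: dial the registry's
// Redis DB, then delete every key matching the given patterns.
func cleanCacheSketch(redisURL string, patterns []string) error {
	con, err := redis.DialURL(redisURL) // e.g. "redis://127.0.0.1:6379/1" (assumed)
	if err != nil {
		return err
	}
	defer con.Close()

	for _, pattern := range patterns {
		if err := delKeys(con, pattern); err != nil {
			return err
		}
	}
	return nil
}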
@@ -15,6 +15,7 @@ package runner

import (
	"context"
	common_dao "github.com/goharbor/harbor/src/common/dao"
	"os"
	"sync"
	"testing"
@@ -52,6 +53,7 @@ type RedisRunnerTestSuite struct {

// TestRedisRunnerTestSuite is entry of go test
func TestRedisRunnerTestSuite(t *testing.T) {
	common_dao.PrepareTestForPostgresSQL()
	suite.Run(t, new(RedisRunnerTestSuite))
}
@@ -17,6 +17,7 @@ import (
	"context"
	"errors"
	"fmt"
	common_dao "github.com/goharbor/harbor/src/common/dao"
	"github.com/goharbor/harbor/src/jobservice/common/utils"
	"github.com/goharbor/harbor/src/jobservice/env"
	"github.com/goharbor/harbor/src/jobservice/job"
@@ -50,6 +51,7 @@ type CWorkerTestSuite struct {
func (suite *CWorkerTestSuite) SetupSuite() {
	suite.namespace = tests.GiveMeTestNamespace()
	suite.pool = tests.GiveMeRedisPool()
	common_dao.PrepareTestForPostgresSQL()

	// Append node ID
	vCtx := context.WithValue(context.Background(), utils.NodeID, utils.GenerateNodeID())
@@ -17,6 +17,8 @@ type DAO interface {
	Delete(ctx context.Context, id int64) (err error)
	// Filter lists the artifacts that need to be cleaned
	Filter(ctx context.Context) (arts []model.ArtifactTrash, err error)
	// Flush cleans the trash table
	Flush(ctx context.Context) (err error)
}

// New returns an instance of the default DAO
@@ -76,3 +78,20 @@ func (d *dao) Filter(ctx context.Context) (arts []model.ArtifactTrash, err error
	}
	return deletedAfs, nil
}

// Flush ...
func (d *dao) Flush(ctx context.Context) (err error) {
	ormer, err := orm.FromContext(ctx)
	if err != nil {
		return err
	}
	sql := `DELETE FROM artifact_trash`
	if err != nil {
		return err
	}
	_, err = ormer.Raw(sql).Exec()
	if err != nil {
		return err
	}
	return nil
}
@@ -89,3 +89,27 @@ func (d *daoTestSuite) TestFilter() {
	d.Require().NotNil(err)
	d.Require().Equal(afs[0].Digest, "1234")
}

func (d *daoTestSuite) TestFlush() {
	_, err := d.dao.Create(d.ctx, &model.ArtifactTrash{
		ManifestMediaType: v1.MediaTypeImageManifest,
		RepositoryName:    "hello-world",
		Digest:            "abcd",
	})
	d.Require().Nil(err)
	_, err = d.dao.Create(d.ctx, &model.ArtifactTrash{
		ManifestMediaType: v1.MediaTypeImageManifest,
		RepositoryName:    "hello-world2",
		Digest:            "efgh",
	})
	d.Require().Nil(err)
	_, err = d.dao.Create(d.ctx, &model.ArtifactTrash{
		ManifestMediaType: v1.MediaTypeImageManifest,
		RepositoryName:    "hello-world3",
		Digest:            "ijkl",
	})
	d.Require().Nil(err)

	err = d.dao.Flush(d.ctx)
	d.Require().Nil(err)
}
@@ -33,6 +33,8 @@ type Manager interface {
	Delete(ctx context.Context, id int64) (err error)
	// Filter ...
	Filter(ctx context.Context) (arts []model.ArtifactTrash, err error)
	// Flush cleans the trash table
	Flush(ctx context.Context) (err error)
}

// NewManager returns an instance of the default manager
@@ -57,3 +59,7 @@ func (m *manager) Delete(ctx context.Context, id int64) error {
func (m *manager) Filter(ctx context.Context) (arts []model.ArtifactTrash, err error) {
	return m.dao.Filter(ctx)
}

func (m *manager) Flush(ctx context.Context) (err error) {
	return m.dao.Flush(ctx)
}
@@ -24,6 +24,10 @@ func (f *fakeDao) Filter(ctx context.Context) (arts []model.ArtifactTrash, err e
	args := f.Called()
	return args.Get(0).([]model.ArtifactTrash), args.Error(1)
}
func (f *fakeDao) Flush(ctx context.Context) (err error) {
	args := f.Called()
	return args.Error(0)
}

type managerTestSuite struct {
	suite.Suite
@@ -69,3 +73,10 @@ func (m *managerTestSuite) TestFilter() {
	m.Require().Nil(err)
	m.Equal(len(arts), 1)
}

func (m *managerTestSuite) TestFlush() {
	m.dao.On("Flush", mock.Anything).Return(nil)
	err := m.mgr.Flush(nil)
	m.Require().Nil(err)
	m.dao.AssertExpectations(m.T())
}
@@ -24,9 +24,11 @@ package apilib

// AdminJobReq holds request information for admin job
type AdminJobReq struct {
	Schedule *ScheduleParam `json:"schedule,omitempty"`
	Status   string         `json:"status,omitempty"`
	ID       int64          `json:"id,omitempty"`
	Schedule   *ScheduleParam         `json:"schedule,omitempty"`
	Name       string                 `json:"name"`
	Status     string                 `json:"status,omitempty"`
	ID         int64                  `json:"id,omitempty"`
	Parameters map[string]interface{} `json:"parameters"`
}

// ScheduleParam ...
@@ -45,3 +45,9 @@ func (f *FakeManager) Filter(ctx context.Context) (arts []model.ArtifactTrash, e
	args := f.Called()
	return args.Get(0).([]model.ArtifactTrash), args.Error(1)
}

// Flush ...
func (f *FakeManager) Flush(ctx context.Context) (err error) {
	args := f.Called()
	return args.Error(0)
}