refactor: replace the gc redigo client to the standard cache (#18965)

Refactor the clean redis logic in the GC job, replace the redigo client
to the lib cache interface which can simplify operations.

Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
Chlins Zhang 2023-07-24 14:30:25 +08:00 committed by GitHub
parent 94c76002a2
commit c030fd7863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 45 deletions

View File

@ -15,7 +15,9 @@
package gc
import (
"context"
"encoding/json"
"net/url"
"os"
"sync/atomic"
"time"
@ -28,9 +30,9 @@ import (
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/q"
redisLib "github.com/goharbor/harbor/src/lib/redis"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/artifactrash"
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
@ -185,13 +187,13 @@ func (gc *GarbageCollector) Run(ctx job.Context, params job.Parameters) error {
if err == errGcStop {
// we may already delete several artifacts before receiving the stop signal, so try to clean up the cache
gc.logger.Info("received the stop signal, quit GC job after cleaning up the cache.")
return gc.cleanCache()
return gc.cleanCache(ctx.SystemContext())
}
gc.logger.Errorf("failed to execute GC job at sweep phase, error: %v", err)
return err
}
if err := gc.cleanCache(); err != nil {
if err := gc.cleanCache(ctx.SystemContext()); err != nil {
return err
}
}
@ -480,21 +482,18 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
// cleanCache is to clean the registry cache for GC.
// To do this is because the issue https://github.com/docker/distribution/issues/2094
func (gc *GarbageCollector) cleanCache() error {
pool, err := redisLib.GetRedisPool("GarbageCollector", gc.redisURL, &redisLib.PoolParam{
PoolMaxIdle: 0,
PoolMaxActive: 1,
PoolIdleTimeout: 60 * time.Second,
DialConnectionTimeout: dialConnectionTimeout,
DialReadTimeout: dialReadTimeout,
DialWriteTimeout: dialWriteTimeout,
})
func (gc *GarbageCollector) cleanCache(ctx context.Context) error {
u, err := url.Parse(gc.redisURL)
if err != nil {
gc.logger.Errorf("failed to connect to redis %v", err)
gc.logger.Errorf("failed to parse redis url %s, error: %v", gc.redisURL, err)
return err
}
c, err := cache.New(u.Scheme, cache.Address(gc.redisURL))
if err != nil {
gc.logger.Errorf("failed to get redis client: %v", err)
return err
}
con := pool.Get()
defer con.Close()
// clean all keys in registry redis DB.
@ -503,7 +502,7 @@ func (gc *GarbageCollector) cleanCache() error {
// 2) "repository::library/hello-world::blobs::sha256:4ab4c602aa5eed5528a6620ff18a1dc4faef0e1ab3a5eddeddb410714478c67f"
patterns := []string{blobPrefix, repoPrefix}
for _, pattern := range patterns {
if err := delKeys(con, pattern); err != nil {
if err := delKeys(ctx, c, pattern); err != nil {
gc.logger.Errorf("failed to clean registry cache %v, pattern %s", err, pattern)
return err
}

View File

@ -15,43 +15,26 @@
package gc
import (
"fmt"
"github.com/gomodule/redigo/redis"
"context"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/registry"
)
// delKeys ...
func delKeys(con redis.Conn, pattern string) error {
iter := 0
keys := make([]string, 0)
for {
arr, err := redis.Values(con.Do("SCAN", iter, "MATCH", pattern))
if err != nil {
return fmt.Errorf("error retrieving '%s' keys: %s", pattern, err)
}
iter, err = redis.Int(arr[0], nil)
if err != nil {
return fmt.Errorf("unexpected type for Int, got type %T", err)
}
k, err := redis.Strings(arr[1], nil)
if err != nil {
return fmt.Errorf("converts an array command reply to a []string %v", err)
}
keys = append(keys, k...)
func delKeys(ctx context.Context, c cache.Cache, pattern string) error {
iter, err := c.Scan(ctx, pattern)
if err != nil {
return errors.Wrap(err, "failed to scan keys")
}
if iter == 0 {
break
}
}
for _, key := range keys {
_, err := con.Do("DEL", key)
if err != nil {
return fmt.Errorf("failed to clean registry cache %v", err)
for iter.Next(ctx) {
if err := c.Delete(ctx, iter.Val()); err != nil {
return errors.Wrap(err, "failed to clean registry cache")
}
}
return nil
}

View File

@ -1,9 +1,12 @@
package gc
import (
"github.com/goharbor/harbor/src/lib/errors"
"context"
"fmt"
"testing"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/stretchr/testify/assert"
)
@ -48,3 +51,39 @@ func TestDivide(t *testing.T) {
result, err = divide(33, 0)
assert.NotNil(t, err)
}
func TestDelKeys(t *testing.T) {
// get redis client
c, err := cache.New("redis", cache.Address("redis://127.0.0.1:6379"))
assert.NoError(t, err)
// helper function
// mock the data in the redis
mock := func(count int, prefix string) {
for i := 0; i < count; i++ {
err = c.Save(context.TODO(), fmt.Sprintf("%s-%d", prefix, i), "", 0)
assert.NoError(t, err)
}
}
// check after running delKeys, should no keys found
afterCheck := func(prefix string) {
iter, err := c.Scan(context.TODO(), prefix)
assert.NoError(t, err)
assert.False(t, iter.Next(context.TODO()))
}
{
prefix := "mock-group-1"
count := 100
mock(count, prefix)
assert.NoError(t, delKeys(context.TODO(), c, prefix))
afterCheck(prefix)
}
{
prefix := "mock-group-2"
count := 1100
mock(count, prefix)
assert.NoError(t, delKeys(context.TODO(), c, prefix))
afterCheck(prefix)
}
}