mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-20 23:57:42 +01:00
Merge pull request #8026 from steven-zou/fix_bug_#7988
add migration process for rdb as well as UT cases
This commit is contained in:
commit
89ad59b0d4
@ -19,8 +19,8 @@ worker_pool:
|
||||
redis_pool:
|
||||
#redis://[arbitrary_username:password@]ipaddress:port/database_index
|
||||
#or ipaddress:port[,weight,password,database_index]
|
||||
redis_url: "localhost:6379"
|
||||
namespace: "harbor_job_service"
|
||||
redis_url: "redis://localhost:6379/2"
|
||||
namespace: "harbor_job_service_namespace"
|
||||
|
||||
#Loggers for the running job
|
||||
job_loggers:
|
||||
@ -29,11 +29,11 @@ job_loggers:
|
||||
- name: "FILE"
|
||||
level: "DEBUG"
|
||||
settings: # Customized settings of logger
|
||||
base_dir: "/Users/szou/tmp/job_logs"
|
||||
base_dir: "/tmp/job_logs"
|
||||
sweeper:
|
||||
duration: 1 #days
|
||||
settings: # Customized settings of sweeper
|
||||
work_dir: "/Users/szou/tmp/job_logs"
|
||||
work_dir: "/tmp/job_logs"
|
||||
|
||||
#Loggers for the job service
|
||||
loggers:
|
||||
|
148
src/jobservice/migration/manager.go
Normal file
148
src/jobservice/migration/manager.go
Normal file
@ -0,0 +1,148 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"github.com/Masterminds/semver"
|
||||
"reflect"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Manager for managing the related migrators
|
||||
type Manager interface {
|
||||
// Register the specified migrator to the execution chain
|
||||
Register(migratorFactory MigratorFactory)
|
||||
|
||||
// Migrate data
|
||||
Migrate() error
|
||||
}
|
||||
|
||||
// MigratorChainNode is a wrapper to append the migrator to the chain with a next reference
|
||||
type MigratorChainNode struct {
|
||||
// Migrator implementation
|
||||
migrator RDBMigrator
|
||||
// Refer the next migration of the chain if existing
|
||||
next *MigratorChainNode
|
||||
}
|
||||
|
||||
// BasicManager is the default implementation of manager interface
|
||||
type BasicManager struct {
|
||||
// The head of migrator chain
|
||||
head *MigratorChainNode
|
||||
// Pool for connecting to redis
|
||||
pool *redis.Pool
|
||||
// RDB namespace
|
||||
namespace string
|
||||
}
|
||||
|
||||
// New a basic manager
|
||||
func New(pool *redis.Pool, ns string) Manager {
|
||||
return &BasicManager{
|
||||
pool: pool,
|
||||
namespace: ns,
|
||||
}
|
||||
}
|
||||
|
||||
// Register the migrator to the chain
|
||||
func (bm *BasicManager) Register(migratorFactory MigratorFactory) {
|
||||
if migratorFactory == nil {
|
||||
return // ignore, do nothing
|
||||
}
|
||||
|
||||
migrator, err := migratorFactory(bm.pool, bm.namespace)
|
||||
if err != nil {
|
||||
logger.Errorf("migrator register error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
newNode := &MigratorChainNode{
|
||||
migrator: migrator,
|
||||
next: nil,
|
||||
}
|
||||
|
||||
if bm.head == nil {
|
||||
bm.head = newNode
|
||||
return
|
||||
}
|
||||
|
||||
bm.head.next = newNode
|
||||
}
|
||||
|
||||
// Migrate data
|
||||
func (bm *BasicManager) Migrate() error {
|
||||
conn := bm.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Read schema version first
|
||||
v, err := redis.String(conn.Do("GET", VersionKey(bm.namespace)))
|
||||
if err != nil && err != redis.ErrNil {
|
||||
return errors.Wrap(err, "read schema version failed")
|
||||
}
|
||||
|
||||
if len(v) > 0 {
|
||||
current, err := semver.NewVersion(v)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "malformed schema version")
|
||||
}
|
||||
nowV, _ := semver.NewVersion(SchemaVersion)
|
||||
|
||||
diff := nowV.Compare(current)
|
||||
if diff < 0 {
|
||||
return errors.Errorf("the schema version of migrator is smaller that the one in the rdb: %s<%s", nowV.String(), current.String())
|
||||
} else if diff == 0 {
|
||||
logger.Info("No migration needed")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if bm.head == nil {
|
||||
logger.Warning("No migrator registered, passed migration")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Info("Process for migrating data is started")
|
||||
|
||||
h := bm.head
|
||||
for h != nil {
|
||||
meta := h.migrator.Metadata()
|
||||
if meta == nil {
|
||||
// Make metadata required
|
||||
return errors.Errorf("no metadata provided for the migrator %s", reflect.TypeOf(h.migrator).String())
|
||||
}
|
||||
|
||||
logger.Infof("Migrate %s from %s to %s", meta.ObjectRef, meta.FromVersion, meta.ToVersion)
|
||||
if err := h.migrator.Migrate(); err != nil {
|
||||
return errors.Wrap(err, "migration chain calling failed")
|
||||
}
|
||||
|
||||
// Next one if existing
|
||||
h = h.next
|
||||
}
|
||||
|
||||
// Set schema version
|
||||
if _, err = conn.Do("SET", VersionKey(bm.namespace), SchemaVersion); err != nil {
|
||||
return errors.Wrap(err, "write schema version failed")
|
||||
}
|
||||
|
||||
logger.Infof("Data schema version upgraded to %s", SchemaVersion)
|
||||
|
||||
return nil
|
||||
}
|
198
src/jobservice/migration/manager_test.go
Normal file
198
src/jobservice/migration/manager_test.go
Normal file
@ -0,0 +1,198 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ManagerTestSuite tests functions of manager
|
||||
type ManagerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
pool *redis.Pool
|
||||
namespace string
|
||||
|
||||
manager Manager
|
||||
|
||||
jobID string
|
||||
numbericID int64
|
||||
}
|
||||
|
||||
// TestManagerTestSuite is entry of executing ManagerTestSuite
|
||||
func TestManagerTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(ManagerTestSuite))
|
||||
}
|
||||
|
||||
// SetupAllSuite sets up env for test suite
|
||||
func (suite *ManagerTestSuite) SetupSuite() {
|
||||
suite.pool = tests.GiveMeRedisPool()
|
||||
suite.namespace = tests.GiveMeTestNamespace()
|
||||
|
||||
suite.manager = New(suite.pool, suite.namespace)
|
||||
}
|
||||
|
||||
// SetupTestSuite sets up env for each test case
|
||||
func (suite *ManagerTestSuite) SetupTest() {
|
||||
// Mock fake data
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
id := utils.MakeIdentifier()
|
||||
suite.jobID = id
|
||||
// Mock stats of periodic job
|
||||
args := []interface{}{
|
||||
rds.KeyJobStats(suite.namespace, id),
|
||||
"status_hook",
|
||||
"http://core:8080/hook",
|
||||
"id",
|
||||
id,
|
||||
"name",
|
||||
job.ImageGC,
|
||||
"kind",
|
||||
job.KindPeriodic,
|
||||
"unique",
|
||||
0,
|
||||
"status",
|
||||
job.SuccessStatus.String(), // v1.6 issue
|
||||
"ref_link",
|
||||
fmt.Sprintf("/api/v1/jobs/%s", id),
|
||||
"enqueue_time",
|
||||
time.Now().Unix(),
|
||||
"update_time",
|
||||
time.Now().Unix(),
|
||||
"run_at",
|
||||
time.Now().Add(5 * time.Minute).Unix(),
|
||||
"cron_spec",
|
||||
"0 0 17 * * *",
|
||||
"multiple_executions", // V1.7
|
||||
1,
|
||||
}
|
||||
reply, err := redis.String(conn.Do("HMSET", args...))
|
||||
require.NoError(suite.T(), err, "mock job stats data error")
|
||||
require.Equal(suite.T(), "ok", strings.ToLower(reply), "ok expected")
|
||||
|
||||
// Mock periodic job policy object
|
||||
params := make(map[string]interface{})
|
||||
params["redis_url_reg"] = "redis://redis:6379/1"
|
||||
|
||||
policy := make(map[string]interface{})
|
||||
policy["job_name"] = job.ImageGC
|
||||
policy["job_params"] = params
|
||||
policy["cron_spec"] = "0 0 17 * * *"
|
||||
|
||||
rawJSON, err := json.Marshal(&policy)
|
||||
require.NoError(suite.T(), err, "mock periodic job policy error")
|
||||
|
||||
policy["cron_spec"] = "0 0 8 * * *"
|
||||
duplicatedRawJSON, err := json.Marshal(&policy)
|
||||
require.NoError(suite.T(), err, "mock duplicated periodic job policy error")
|
||||
|
||||
score := time.Now().Unix()
|
||||
suite.numbericID = score
|
||||
zaddArgs := []interface{}{
|
||||
rds.KeyPeriodicPolicy(suite.namespace),
|
||||
score,
|
||||
rawJSON,
|
||||
score - 10,
|
||||
duplicatedRawJSON, // duplicated one
|
||||
}
|
||||
count, err := redis.Int(conn.Do("ZADD", zaddArgs...))
|
||||
require.NoError(suite.T(), err, "add raw policy error")
|
||||
require.Equal(suite.T(), 2, count)
|
||||
|
||||
// Mock key score mapping
|
||||
keyScoreArgs := []interface{}{
|
||||
fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score"),
|
||||
score,
|
||||
id,
|
||||
}
|
||||
|
||||
count, err = redis.Int(conn.Do("ZADD", keyScoreArgs...))
|
||||
require.NoError(suite.T(), err, "add key score mapping error")
|
||||
require.Equal(suite.T(), 1, count)
|
||||
}
|
||||
|
||||
// SetupTestSuite clears up env for each test case
|
||||
func (suite *ManagerTestSuite) TearDownTest() {
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
err := tests.ClearAll(suite.namespace, conn)
|
||||
assert.NoError(suite.T(), err, "clear all of redis db error")
|
||||
}
|
||||
|
||||
// TestManager test the basic functions of the manager
|
||||
func (suite *ManagerTestSuite) TestManager() {
|
||||
require.NotNil(suite.T(), suite.manager, "nil migration manager")
|
||||
|
||||
suite.manager.Register(PolicyMigratorFactory)
|
||||
err := suite.manager.Migrate()
|
||||
require.NoError(suite.T(), err, "migrating rdb error")
|
||||
|
||||
// Check data
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
count, err := redis.Int(conn.Do("ZCARD", rds.KeyPeriodicPolicy(suite.namespace)))
|
||||
assert.NoError(suite.T(), err, "get count of policies error")
|
||||
assert.Equal(suite.T(), 1, count)
|
||||
|
||||
p, err := getPeriodicPolicy(suite.numbericID, conn, suite.namespace)
|
||||
assert.NoError(suite.T(), err, "get migrated policy error")
|
||||
assert.NotEmpty(suite.T(), p.ID, "ID of policy")
|
||||
assert.NotEmpty(suite.T(), p.WebHookURL, "Web hook URL of policy")
|
||||
|
||||
key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score")
|
||||
count, err = redis.Int(conn.Do("EXISTS", key))
|
||||
assert.NoError(suite.T(), err, "check existence of key score mapping error")
|
||||
assert.Equal(suite.T(), 0, count)
|
||||
|
||||
hmGetArgs := []interface{}{
|
||||
rds.KeyJobStats(suite.namespace, suite.jobID),
|
||||
"id",
|
||||
"status",
|
||||
"web_hook_url",
|
||||
"numeric_policy_id",
|
||||
"multiple_executions",
|
||||
"status_hook",
|
||||
}
|
||||
fields, err := redis.Values(conn.Do("HMGET", hmGetArgs...))
|
||||
assert.NoError(suite.T(), err, "check migrated job stats error")
|
||||
assert.Equal(suite.T(), suite.jobID, toString(fields[0]), "check job ID")
|
||||
assert.Equal(suite.T(), job.ScheduledStatus.String(), toString(fields[1]), "check job status")
|
||||
assert.Equal(suite.T(), "http://core:8080/hook", toString(fields[2]), "check web hook URL")
|
||||
assert.Equal(suite.T(), suite.numbericID, toInt(fields[3]), "check numberic ID")
|
||||
assert.Nil(suite.T(), fields[4], "'multiple_executions' removed")
|
||||
assert.Nil(suite.T(), fields[5], "'status_hook' removed")
|
||||
}
|
38
src/jobservice/migration/migrator.go
Normal file
38
src/jobservice/migration/migrator.go
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"github.com/gomodule/redigo/redis"
|
||||
)
|
||||
|
||||
// RDBMigrator defines the action to migrate redis data
|
||||
type RDBMigrator interface {
|
||||
// Metadata info of the migrator
|
||||
Metadata() *MigratorMeta
|
||||
|
||||
// Migrate executes the real migration work
|
||||
Migrate() error
|
||||
}
|
||||
|
||||
// MigratorMeta keeps the base info of the migrator
|
||||
type MigratorMeta struct {
|
||||
FromVersion string
|
||||
ToVersion string
|
||||
ObjectRef string
|
||||
}
|
||||
|
||||
// MigratorFactory is factory function to create RDBMigrator interface
|
||||
type MigratorFactory func(pool *redis.Pool, namespace string) (RDBMigrator, error)
|
372
src/jobservice/migration/migrator_v180.go
Normal file
372
src/jobservice/migration/migrator_v180.go
Normal file
@ -0,0 +1,372 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/period"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// PolicyMigrator migrate the cron job policy to new schema
|
||||
type PolicyMigrator struct {
|
||||
// namespace of rdb
|
||||
namespace string
|
||||
|
||||
// Pool for connecting to redis
|
||||
pool *redis.Pool
|
||||
}
|
||||
|
||||
// PolicyMigratorFactory is a factory func to create PolicyMigrator
|
||||
func PolicyMigratorFactory(pool *redis.Pool, namespace string) (RDBMigrator, error) {
|
||||
if pool == nil {
|
||||
return nil, errors.New("PolicyMigratorFactory: missing pool")
|
||||
}
|
||||
|
||||
if utils.IsEmptyStr(namespace) {
|
||||
return nil, errors.New("PolicyMigratorFactory: missing namespace")
|
||||
}
|
||||
|
||||
return &PolicyMigrator{
|
||||
namespace: namespace,
|
||||
pool: pool,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Metadata returns the base information of this migrator
|
||||
func (pm *PolicyMigrator) Metadata() *MigratorMeta {
|
||||
return &MigratorMeta{
|
||||
FromVersion: "<1.8.0",
|
||||
ToVersion: "1.8.1",
|
||||
ObjectRef: "{namespace}:period:policies",
|
||||
}
|
||||
}
|
||||
|
||||
// Migrate data
|
||||
func (pm *PolicyMigrator) Migrate() error {
|
||||
conn := pm.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
allJobIDs, err := getAllJobStatsIDs(conn, pm.namespace)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get job stats list error")
|
||||
}
|
||||
|
||||
args := []interface{}{
|
||||
"id_placeholder",
|
||||
"id",
|
||||
"kind",
|
||||
"status",
|
||||
"status_hook", // valid for 1.6 and 1.7,
|
||||
"multiple_executions", // valid for 1.7
|
||||
"numeric_policy_id", // valid for 1.8
|
||||
}
|
||||
|
||||
count := 0
|
||||
for _, fullID := range allJobIDs {
|
||||
args[0] = fullID
|
||||
values, err := redis.Values(conn.Do("HMGET", args...))
|
||||
if err != nil {
|
||||
logger.Errorf("Get stats fields of job %s failed with error: %s", fullID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
pID := toString(values[0])
|
||||
kind := toString(values[1])
|
||||
|
||||
if !utils.IsEmptyStr(pID) && job.KindPeriodic == kind {
|
||||
logger.Debugf("Periodic job found: %s", pID)
|
||||
|
||||
// Data requires migration
|
||||
// Missing 'numeric_policy_id' which is introduced in 1.8
|
||||
if values[5] == nil {
|
||||
logger.Infof("Migrate periodic job stats data is started: %s", pID)
|
||||
|
||||
numbericPolicyID, err := getScoreByID(pID, conn, pm.namespace)
|
||||
if err != nil {
|
||||
logger.Errorf("Get numberic ID of periodic job policy failed with error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Transaction
|
||||
err = conn.Send("MULTI")
|
||||
setArgs := []interface{}{
|
||||
fullID,
|
||||
"status",
|
||||
job.ScheduledStatus.String(), // make sure the status of periodic job is "Scheduled"
|
||||
"numeric_policy_id",
|
||||
numbericPolicyID,
|
||||
}
|
||||
// If status hook existing
|
||||
hookURL := toString(values[3])
|
||||
if !utils.IsEmptyStr(hookURL) {
|
||||
setArgs = append(setArgs, "web_hook_url", hookURL)
|
||||
}
|
||||
// Set fields
|
||||
err = conn.Send("HMSET", setArgs...)
|
||||
|
||||
// Remove useless fields
|
||||
rmArgs := []interface{}{
|
||||
fullID,
|
||||
"status_hook",
|
||||
"multiple_executions",
|
||||
}
|
||||
err = conn.Send("HDEL", rmArgs...)
|
||||
|
||||
// Update periodic policy model
|
||||
// conn is working, we need new conn
|
||||
innerConn := pm.pool.Get()
|
||||
defer func() {
|
||||
_ = innerConn.Close()
|
||||
}()
|
||||
policy, er := getPeriodicPolicy(numbericPolicyID, innerConn, pm.namespace)
|
||||
if er == nil {
|
||||
policy.ID = pID
|
||||
if !utils.IsEmptyStr(hookURL) {
|
||||
// Copy web hook URL
|
||||
policy.WebHookURL = fmt.Sprintf("%s", hookURL)
|
||||
}
|
||||
|
||||
if rawJSON, er := policy.Serialize(); er == nil {
|
||||
// Remove the old one first
|
||||
err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, numbericPolicyID)
|
||||
// Save back to the rdb
|
||||
err = conn.Send("ZADD", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, rawJSON)
|
||||
} else {
|
||||
logger.Errorf("Serialize policy %s failed with error: %s", pID, er)
|
||||
}
|
||||
} else {
|
||||
logger.Errorf("Get periodic policy %s failed with error: %s", pID, er)
|
||||
}
|
||||
|
||||
// Check error before executing
|
||||
if err != nil {
|
||||
logger.Errorf("Build redis transaction failed with error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Exec
|
||||
if _, err := conn.Do("EXEC"); err != nil {
|
||||
logger.Errorf("Migrate periodic job %s failed with error: %s", pID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
logger.Infof("Migrate periodic job stats data is completed: %s", pID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("Migrate %d periodic policies", count)
|
||||
|
||||
delScoreZset(conn, pm.namespace)
|
||||
|
||||
return clearDuplicatedPolicies(conn, pm.namespace)
|
||||
}
|
||||
|
||||
// getAllJobStatsIDs get all the IDs of the existing jobs
|
||||
func getAllJobStatsIDs(conn redis.Conn, ns string) ([]string, error) {
|
||||
pattern := rds.KeyJobStats(ns, "*")
|
||||
args := []interface{}{
|
||||
0,
|
||||
"MATCH",
|
||||
pattern,
|
||||
"COUNT",
|
||||
100,
|
||||
}
|
||||
|
||||
allFullIDs := make([]interface{}, 0)
|
||||
|
||||
for {
|
||||
// Use SCAN to iterate the IDs
|
||||
values, err := redis.Values(conn.Do("SCAN", args...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// In case something wrong happened
|
||||
if len(values) != 2 {
|
||||
return nil, errors.Errorf("Invalid result returned for the SCAN command: %#v", values)
|
||||
}
|
||||
|
||||
if fullIDs, ok := values[1].([]interface{}); ok {
|
||||
allFullIDs = append(allFullIDs, fullIDs...)
|
||||
}
|
||||
|
||||
// Check the next cursor
|
||||
cur := toInt(values[0])
|
||||
if cur == -1 {
|
||||
// No valid next cursor got
|
||||
return nil, errors.Errorf("Failed to get the next SCAN cursor: %#v", values[0])
|
||||
}
|
||||
|
||||
if cur != 0 {
|
||||
args[0] = cur
|
||||
} else {
|
||||
// end
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
IDs := make([]string, 0)
|
||||
for _, fullIDValue := range allFullIDs {
|
||||
if fullID, ok := fullIDValue.([]byte); ok {
|
||||
IDs = append(IDs, string(fullID))
|
||||
} else {
|
||||
logger.Debugf("Invalid job stats key: %#v", fullIDValue)
|
||||
}
|
||||
}
|
||||
|
||||
return IDs, nil
|
||||
}
|
||||
|
||||
// Get the score with the provided ID
|
||||
func getScoreByID(id string, conn redis.Conn, ns string) (int64, error) {
|
||||
scoreKey := fmt.Sprintf("%s%s:%s", rds.KeyNamespacePrefix(ns), "period", "key_score")
|
||||
return redis.Int64(conn.Do("ZSCORE", scoreKey, id))
|
||||
}
|
||||
|
||||
// Get periodic policy object by the numeric ID
|
||||
func getPeriodicPolicy(numericID int64, conn redis.Conn, ns string) (*period.Policy, error) {
|
||||
bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), numericID, numericID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &period.Policy{}
|
||||
if len(bytes) > 0 {
|
||||
if rawPolicy, ok := bytes[0].([]byte); ok {
|
||||
if err = p.DeSerialize(rawPolicy); err == nil {
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
err = errors.Errorf("invalid data for periodic policy %d: %#v", numericID, bytes)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Clear the duplicated policy entries for the job "IMAGE_GC" and "IMAGE_SCAN_ALL"
|
||||
func clearDuplicatedPolicies(conn redis.Conn, ns string) error {
|
||||
hash := make(map[string]interface{})
|
||||
|
||||
bytes, err := redis.Values(conn.Do("ZREVRANGE", rds.KeyPeriodicPolicy(ns), 0, -1, "WITHSCORES"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := 0
|
||||
for i, l := 0, len(bytes); i < l; i = i + 2 {
|
||||
rawPolicy := bytes[i].([]byte)
|
||||
p := &period.Policy{}
|
||||
|
||||
if err := p.DeSerialize(rawPolicy); err != nil {
|
||||
logger.Errorf("DeSerialize policy: %s; error: %s\n", rawPolicy, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.JobName == job.ImageScanAllJob ||
|
||||
p.JobName == job.ImageGC ||
|
||||
p.JobName == job.ReplicationScheduler {
|
||||
score, _ := strconv.ParseInt(string(bytes[i+1].([]byte)), 10, 64)
|
||||
|
||||
key := hashKey(p)
|
||||
if _, exists := hash[key]; exists {
|
||||
// Already existing, remove the duplicated one
|
||||
res, err := redis.Int(conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), score, score))
|
||||
if err != nil || res == 0 {
|
||||
logger.Errorf("Failed to clear duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score)
|
||||
} else {
|
||||
logger.Infof("Remove duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score)
|
||||
count++
|
||||
}
|
||||
} else {
|
||||
hash[key] = score
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("Clear %d duplicated periodic policies", count)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the non-used key
|
||||
func delScoreZset(conn redis.Conn, ns string) {
|
||||
key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "period:key_score")
|
||||
reply, err := redis.Int(conn.Do("EXISTS", key))
|
||||
if err == nil && reply == 1 {
|
||||
reply, err = redis.Int(conn.Do("DEL", key))
|
||||
if err == nil && reply > 0 {
|
||||
logger.Infof("%s removed", key)
|
||||
return // success
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Just logged
|
||||
logger.Errorf("Remove %s failed with error: %s", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
func toString(v interface{}) string {
|
||||
if v == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
if bytes, ok := v.([]byte); ok {
|
||||
return string(bytes)
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func toInt(v interface{}) int64 {
|
||||
if v == nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
if bytes, ok := v.([]byte); ok {
|
||||
if intV, err := strconv.ParseInt(string(bytes), 10, 64); err == nil {
|
||||
return intV
|
||||
}
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
|
||||
func hashKey(p *period.Policy) string {
|
||||
key := p.JobName
|
||||
if p.JobParameters != nil && len(p.JobParameters) > 0 {
|
||||
if bytes, err := json.Marshal(p.JobParameters); err == nil {
|
||||
key = fmt.Sprintf("%s:%s", key, string(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
return base64.StdEncoding.EncodeToString([]byte(key))
|
||||
}
|
31
src/jobservice/migration/version.go
Normal file
31
src/jobservice/migration/version.go
Normal file
@ -0,0 +1,31 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package migration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
)
|
||||
|
||||
const (
|
||||
// SchemaVersion identifies the schema version of RDB
|
||||
SchemaVersion = "1.8.1"
|
||||
)
|
||||
|
||||
// VersionKey returns the key of redis schema
|
||||
func VersionKey(ns string) string {
|
||||
return fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "_schema_version")
|
||||
}
|
@ -18,6 +18,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"github.com/goharbor/harbor/src/jobservice/migration"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
@ -97,6 +98,14 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
// Get redis connection pool
|
||||
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
||||
|
||||
// Do data migration if necessary
|
||||
rdbMigrator := migration.New(redisPool, namespace)
|
||||
rdbMigrator.Register(migration.PolicyMigratorFactory)
|
||||
if err := rdbMigrator.Migrate(); err != nil {
|
||||
// Just logged, should not block the starting process
|
||||
logger.Error(err)
|
||||
}
|
||||
|
||||
// Create stats manager
|
||||
manager = mgt.NewManager(ctx, namespace, redisPool)
|
||||
// Create hook agent, it's a singleton object
|
||||
|
@ -16,6 +16,7 @@ package runtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
@ -56,15 +57,16 @@ func (suite *BootStrapTestSuite) SetupSuite() {
|
||||
|
||||
// TearDownSuite clears the test suite
|
||||
func (suite *BootStrapTestSuite) TearDownSuite() {
|
||||
suite.cancel()
|
||||
|
||||
pool := tests.GiveMeRedisPool()
|
||||
conn := pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_ = tests.ClearAll(tests.GiveMeTestNamespace(), conn)
|
||||
err := tests.ClearAll(fmt.Sprintf("{%s}", tests.GiveMeTestNamespace()), conn)
|
||||
require.NoError(suite.T(), err, "clear rdb error")
|
||||
|
||||
suite.cancel()
|
||||
}
|
||||
|
||||
// TestBootStrapTestSuite is entry of go test
|
||||
|
Loading…
Reference in New Issue
Block a user