diff --git a/src/jobservice/config.yml b/src/jobservice/config.yml index 562317698..745e53f8b 100644 --- a/src/jobservice/config.yml +++ b/src/jobservice/config.yml @@ -19,8 +19,8 @@ worker_pool: redis_pool: #redis://[arbitrary_username:password@]ipaddress:port/database_index #or ipaddress:port[,weight,password,database_index] - redis_url: "localhost:6379" - namespace: "harbor_job_service" + redis_url: "redis://localhost:6379/2" + namespace: "harbor_job_service_namespace" #Loggers for the running job job_loggers: @@ -29,11 +29,11 @@ job_loggers: - name: "FILE" level: "DEBUG" settings: # Customized settings of logger - base_dir: "/Users/szou/tmp/job_logs" + base_dir: "/tmp/job_logs" sweeper: duration: 1 #days settings: # Customized settings of sweeper - work_dir: "/Users/szou/tmp/job_logs" + work_dir: "/tmp/job_logs" #Loggers for the job service loggers: diff --git a/src/jobservice/migration/manager.go b/src/jobservice/migration/manager.go new file mode 100644 index 000000000..2c540d579 --- /dev/null +++ b/src/jobservice/migration/manager.go @@ -0,0 +1,148 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "github.com/Masterminds/semver" + "reflect" + + "github.com/gomodule/redigo/redis" + + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/pkg/errors" +) + +// Manager for managing the related migrators +type Manager interface { + // Register the specified migrator to the execution chain + Register(migratorFactory MigratorFactory) + + // Migrate data + Migrate() error +} + +// MigratorChainNode is a wrapper to append the migrator to the chain with a next reference +type MigratorChainNode struct { + // Migrator implementation + migrator RDBMigrator + // Refer the next migration of the chain if existing + next *MigratorChainNode +} + +// BasicManager is the default implementation of manager interface +type BasicManager struct { + // The head of migrator chain + head *MigratorChainNode + // Pool for connecting to redis + pool *redis.Pool + // RDB namespace + namespace string +} + +// New a basic manager +func New(pool *redis.Pool, ns string) Manager { + return &BasicManager{ + pool: pool, + namespace: ns, + } +} + +// Register the migrator to the chain +func (bm *BasicManager) Register(migratorFactory MigratorFactory) { + if migratorFactory == nil { + return // ignore, do nothing + } + + migrator, err := migratorFactory(bm.pool, bm.namespace) + if err != nil { + logger.Errorf("migrator register error: %s", err) + return + } + + newNode := &MigratorChainNode{ + migrator: migrator, + next: nil, + } + + if bm.head == nil { + bm.head = newNode + return + } + + bm.head.next = newNode +} + +// Migrate data +func (bm *BasicManager) Migrate() error { + conn := bm.pool.Get() + defer func() { + _ = conn.Close() + }() + + // Read schema version first + v, err := redis.String(conn.Do("GET", VersionKey(bm.namespace))) + if err != nil && err != redis.ErrNil { + return errors.Wrap(err, "read schema version failed") + } + + if len(v) > 0 { + current, err := semver.NewVersion(v) + if err != nil { + return errors.Wrap(err, "malformed schema version") + } + nowV, _ := semver.NewVersion(SchemaVersion) + + diff := nowV.Compare(current) + if diff < 0 { + return errors.Errorf("the schema version of migrator is smaller that the one in the rdb: %s<%s", nowV.String(), current.String()) + } else if diff == 0 { + logger.Info("No migration needed") + return nil + } + } + + if bm.head == nil { + logger.Warning("No migrator registered, passed migration") + return nil + } + + logger.Info("Process for migrating data is started") + + h := bm.head + for h != nil { + meta := h.migrator.Metadata() + if meta == nil { + // Make metadata required + return errors.Errorf("no metadata provided for the migrator %s", reflect.TypeOf(h.migrator).String()) + } + + logger.Infof("Migrate %s from %s to %s", meta.ObjectRef, meta.FromVersion, meta.ToVersion) + if err := h.migrator.Migrate(); err != nil { + return errors.Wrap(err, "migration chain calling failed") + } + + // Next one if existing + h = h.next + } + + // Set schema version + if _, err = conn.Do("SET", VersionKey(bm.namespace), SchemaVersion); err != nil { + return errors.Wrap(err, "write schema version failed") + } + + logger.Infof("Data schema version upgraded to %s", SchemaVersion) + + return nil +} diff --git a/src/jobservice/migration/manager_test.go b/src/jobservice/migration/manager_test.go new file mode 100644 index 000000000..8573ea562 --- /dev/null +++ b/src/jobservice/migration/manager_test.go @@ -0,0 +1,198 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "encoding/json" + "fmt" + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/tests" + "github.com/gomodule/redigo/redis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "strings" + "testing" + "time" +) + +// ManagerTestSuite tests functions of manager +type ManagerTestSuite struct { + suite.Suite + + pool *redis.Pool + namespace string + + manager Manager + + jobID string + numbericID int64 +} + +// TestManagerTestSuite is entry of executing ManagerTestSuite +func TestManagerTestSuite(t *testing.T) { + suite.Run(t, new(ManagerTestSuite)) +} + +// SetupAllSuite sets up env for test suite +func (suite *ManagerTestSuite) SetupSuite() { + suite.pool = tests.GiveMeRedisPool() + suite.namespace = tests.GiveMeTestNamespace() + + suite.manager = New(suite.pool, suite.namespace) +} + +// SetupTestSuite sets up env for each test case +func (suite *ManagerTestSuite) SetupTest() { + // Mock fake data + conn := suite.pool.Get() + defer func() { + _ = conn.Close() + }() + + id := utils.MakeIdentifier() + suite.jobID = id + // Mock stats of periodic job + args := []interface{}{ + rds.KeyJobStats(suite.namespace, id), + "status_hook", + "http://core:8080/hook", + "id", + id, + "name", + job.ImageGC, + "kind", + job.KindPeriodic, + "unique", + 0, + "status", + job.SuccessStatus.String(), // v1.6 issue + "ref_link", + fmt.Sprintf("/api/v1/jobs/%s", id), + "enqueue_time", + time.Now().Unix(), + "update_time", + time.Now().Unix(), + "run_at", + time.Now().Add(5 * time.Minute).Unix(), + "cron_spec", + "0 0 17 * * *", + "multiple_executions", // V1.7 + 1, + } + reply, err := redis.String(conn.Do("HMSET", args...)) + require.NoError(suite.T(), err, "mock job stats data error") + require.Equal(suite.T(), "ok", strings.ToLower(reply), "ok expected") + + // Mock periodic job policy object + params := make(map[string]interface{}) + params["redis_url_reg"] = "redis://redis:6379/1" + + policy := make(map[string]interface{}) + policy["job_name"] = job.ImageGC + policy["job_params"] = params + policy["cron_spec"] = "0 0 17 * * *" + + rawJSON, err := json.Marshal(&policy) + require.NoError(suite.T(), err, "mock periodic job policy error") + + policy["cron_spec"] = "0 0 8 * * *" + duplicatedRawJSON, err := json.Marshal(&policy) + require.NoError(suite.T(), err, "mock duplicated periodic job policy error") + + score := time.Now().Unix() + suite.numbericID = score + zaddArgs := []interface{}{ + rds.KeyPeriodicPolicy(suite.namespace), + score, + rawJSON, + score - 10, + duplicatedRawJSON, // duplicated one + } + count, err := redis.Int(conn.Do("ZADD", zaddArgs...)) + require.NoError(suite.T(), err, "add raw policy error") + require.Equal(suite.T(), 2, count) + + // Mock key score mapping + keyScoreArgs := []interface{}{ + fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score"), + score, + id, + } + + count, err = redis.Int(conn.Do("ZADD", keyScoreArgs...)) + require.NoError(suite.T(), err, "add key score mapping error") + require.Equal(suite.T(), 1, count) +} + +// SetupTestSuite clears up env for each test case +func (suite *ManagerTestSuite) TearDownTest() { + conn := suite.pool.Get() + defer func() { + _ = conn.Close() + }() + + err := tests.ClearAll(suite.namespace, conn) + assert.NoError(suite.T(), err, "clear all of redis db error") +} + +// TestManager test the basic functions of the manager +func (suite *ManagerTestSuite) TestManager() { + require.NotNil(suite.T(), suite.manager, "nil migration manager") + + suite.manager.Register(PolicyMigratorFactory) + err := suite.manager.Migrate() + require.NoError(suite.T(), err, "migrating rdb error") + + // Check data + conn := suite.pool.Get() + defer func() { + _ = conn.Close() + }() + + count, err := redis.Int(conn.Do("ZCARD", rds.KeyPeriodicPolicy(suite.namespace))) + assert.NoError(suite.T(), err, "get count of policies error") + assert.Equal(suite.T(), 1, count) + + p, err := getPeriodicPolicy(suite.numbericID, conn, suite.namespace) + assert.NoError(suite.T(), err, "get migrated policy error") + assert.NotEmpty(suite.T(), p.ID, "ID of policy") + assert.NotEmpty(suite.T(), p.WebHookURL, "Web hook URL of policy") + + key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score") + count, err = redis.Int(conn.Do("EXISTS", key)) + assert.NoError(suite.T(), err, "check existence of key score mapping error") + assert.Equal(suite.T(), 0, count) + + hmGetArgs := []interface{}{ + rds.KeyJobStats(suite.namespace, suite.jobID), + "id", + "status", + "web_hook_url", + "numeric_policy_id", + "multiple_executions", + "status_hook", + } + fields, err := redis.Values(conn.Do("HMGET", hmGetArgs...)) + assert.NoError(suite.T(), err, "check migrated job stats error") + assert.Equal(suite.T(), suite.jobID, toString(fields[0]), "check job ID") + assert.Equal(suite.T(), job.ScheduledStatus.String(), toString(fields[1]), "check job status") + assert.Equal(suite.T(), "http://core:8080/hook", toString(fields[2]), "check web hook URL") + assert.Equal(suite.T(), suite.numbericID, toInt(fields[3]), "check numberic ID") + assert.Nil(suite.T(), fields[4], "'multiple_executions' removed") + assert.Nil(suite.T(), fields[5], "'status_hook' removed") +} diff --git a/src/jobservice/migration/migrator.go b/src/jobservice/migration/migrator.go new file mode 100644 index 000000000..e7535c692 --- /dev/null +++ b/src/jobservice/migration/migrator.go @@ -0,0 +1,38 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "github.com/gomodule/redigo/redis" +) + +// RDBMigrator defines the action to migrate redis data +type RDBMigrator interface { + // Metadata info of the migrator + Metadata() *MigratorMeta + + // Migrate executes the real migration work + Migrate() error +} + +// MigratorMeta keeps the base info of the migrator +type MigratorMeta struct { + FromVersion string + ToVersion string + ObjectRef string +} + +// MigratorFactory is factory function to create RDBMigrator interface +type MigratorFactory func(pool *redis.Pool, namespace string) (RDBMigrator, error) diff --git a/src/jobservice/migration/migrator_v180.go b/src/jobservice/migration/migrator_v180.go new file mode 100644 index 000000000..8812e442f --- /dev/null +++ b/src/jobservice/migration/migrator_v180.go @@ -0,0 +1,372 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/jobservice/period" + "github.com/gomodule/redigo/redis" + "github.com/pkg/errors" + "strconv" +) + +// PolicyMigrator migrate the cron job policy to new schema +type PolicyMigrator struct { + // namespace of rdb + namespace string + + // Pool for connecting to redis + pool *redis.Pool +} + +// PolicyMigratorFactory is a factory func to create PolicyMigrator +func PolicyMigratorFactory(pool *redis.Pool, namespace string) (RDBMigrator, error) { + if pool == nil { + return nil, errors.New("PolicyMigratorFactory: missing pool") + } + + if utils.IsEmptyStr(namespace) { + return nil, errors.New("PolicyMigratorFactory: missing namespace") + } + + return &PolicyMigrator{ + namespace: namespace, + pool: pool, + }, nil +} + +// Metadata returns the base information of this migrator +func (pm *PolicyMigrator) Metadata() *MigratorMeta { + return &MigratorMeta{ + FromVersion: "<1.8.0", + ToVersion: "1.8.1", + ObjectRef: "{namespace}:period:policies", + } +} + +// Migrate data +func (pm *PolicyMigrator) Migrate() error { + conn := pm.pool.Get() + defer func() { + _ = conn.Close() + }() + + allJobIDs, err := getAllJobStatsIDs(conn, pm.namespace) + if err != nil { + return errors.Wrap(err, "get job stats list error") + } + + args := []interface{}{ + "id_placeholder", + "id", + "kind", + "status", + "status_hook", // valid for 1.6 and 1.7, + "multiple_executions", // valid for 1.7 + "numeric_policy_id", // valid for 1.8 + } + + count := 0 + for _, fullID := range allJobIDs { + args[0] = fullID + values, err := redis.Values(conn.Do("HMGET", args...)) + if err != nil { + logger.Errorf("Get stats fields of job %s failed with error: %s", fullID, err) + continue + } + + pID := toString(values[0]) + kind := toString(values[1]) + + if !utils.IsEmptyStr(pID) && job.KindPeriodic == kind { + logger.Debugf("Periodic job found: %s", pID) + + // Data requires migration + // Missing 'numeric_policy_id' which is introduced in 1.8 + if values[5] == nil { + logger.Infof("Migrate periodic job stats data is started: %s", pID) + + numbericPolicyID, err := getScoreByID(pID, conn, pm.namespace) + if err != nil { + logger.Errorf("Get numberic ID of periodic job policy failed with error: %s", err) + continue + } + + // Transaction + err = conn.Send("MULTI") + setArgs := []interface{}{ + fullID, + "status", + job.ScheduledStatus.String(), // make sure the status of periodic job is "Scheduled" + "numeric_policy_id", + numbericPolicyID, + } + // If status hook existing + hookURL := toString(values[3]) + if !utils.IsEmptyStr(hookURL) { + setArgs = append(setArgs, "web_hook_url", hookURL) + } + // Set fields + err = conn.Send("HMSET", setArgs...) + + // Remove useless fields + rmArgs := []interface{}{ + fullID, + "status_hook", + "multiple_executions", + } + err = conn.Send("HDEL", rmArgs...) + + // Update periodic policy model + // conn is working, we need new conn + innerConn := pm.pool.Get() + defer func() { + _ = innerConn.Close() + }() + policy, er := getPeriodicPolicy(numbericPolicyID, innerConn, pm.namespace) + if er == nil { + policy.ID = pID + if !utils.IsEmptyStr(hookURL) { + // Copy web hook URL + policy.WebHookURL = fmt.Sprintf("%s", hookURL) + } + + if rawJSON, er := policy.Serialize(); er == nil { + // Remove the old one first + err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, numbericPolicyID) + // Save back to the rdb + err = conn.Send("ZADD", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, rawJSON) + } else { + logger.Errorf("Serialize policy %s failed with error: %s", pID, er) + } + } else { + logger.Errorf("Get periodic policy %s failed with error: %s", pID, er) + } + + // Check error before executing + if err != nil { + logger.Errorf("Build redis transaction failed with error: %s", err) + continue + } + + // Exec + if _, err := conn.Do("EXEC"); err != nil { + logger.Errorf("Migrate periodic job %s failed with error: %s", pID, err) + continue + } + + count++ + logger.Infof("Migrate periodic job stats data is completed: %s", pID) + } + } + } + + logger.Infof("Migrate %d periodic policies", count) + + delScoreZset(conn, pm.namespace) + + return clearDuplicatedPolicies(conn, pm.namespace) +} + +// getAllJobStatsIDs get all the IDs of the existing jobs +func getAllJobStatsIDs(conn redis.Conn, ns string) ([]string, error) { + pattern := rds.KeyJobStats(ns, "*") + args := []interface{}{ + 0, + "MATCH", + pattern, + "COUNT", + 100, + } + + allFullIDs := make([]interface{}, 0) + + for { + // Use SCAN to iterate the IDs + values, err := redis.Values(conn.Do("SCAN", args...)) + if err != nil { + return nil, err + } + + // In case something wrong happened + if len(values) != 2 { + return nil, errors.Errorf("Invalid result returned for the SCAN command: %#v", values) + } + + if fullIDs, ok := values[1].([]interface{}); ok { + allFullIDs = append(allFullIDs, fullIDs...) + } + + // Check the next cursor + cur := toInt(values[0]) + if cur == -1 { + // No valid next cursor got + return nil, errors.Errorf("Failed to get the next SCAN cursor: %#v", values[0]) + } + + if cur != 0 { + args[0] = cur + } else { + // end + break + } + } + + IDs := make([]string, 0) + for _, fullIDValue := range allFullIDs { + if fullID, ok := fullIDValue.([]byte); ok { + IDs = append(IDs, string(fullID)) + } else { + logger.Debugf("Invalid job stats key: %#v", fullIDValue) + } + } + + return IDs, nil +} + +// Get the score with the provided ID +func getScoreByID(id string, conn redis.Conn, ns string) (int64, error) { + scoreKey := fmt.Sprintf("%s%s:%s", rds.KeyNamespacePrefix(ns), "period", "key_score") + return redis.Int64(conn.Do("ZSCORE", scoreKey, id)) +} + +// Get periodic policy object by the numeric ID +func getPeriodicPolicy(numericID int64, conn redis.Conn, ns string) (*period.Policy, error) { + bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), numericID, numericID)) + if err != nil { + return nil, err + } + + p := &period.Policy{} + if len(bytes) > 0 { + if rawPolicy, ok := bytes[0].([]byte); ok { + if err = p.DeSerialize(rawPolicy); err == nil { + return p, nil + } + } + } + + if err == nil { + err = errors.Errorf("invalid data for periodic policy %d: %#v", numericID, bytes) + } + + return nil, err +} + +// Clear the duplicated policy entries for the job "IMAGE_GC" and "IMAGE_SCAN_ALL" +func clearDuplicatedPolicies(conn redis.Conn, ns string) error { + hash := make(map[string]interface{}) + + bytes, err := redis.Values(conn.Do("ZREVRANGE", rds.KeyPeriodicPolicy(ns), 0, -1, "WITHSCORES")) + if err != nil { + return err + } + + count := 0 + for i, l := 0, len(bytes); i < l; i = i + 2 { + rawPolicy := bytes[i].([]byte) + p := &period.Policy{} + + if err := p.DeSerialize(rawPolicy); err != nil { + logger.Errorf("DeSerialize policy: %s; error: %s\n", rawPolicy, err) + continue + } + + if p.JobName == job.ImageScanAllJob || + p.JobName == job.ImageGC || + p.JobName == job.ReplicationScheduler { + score, _ := strconv.ParseInt(string(bytes[i+1].([]byte)), 10, 64) + + key := hashKey(p) + if _, exists := hash[key]; exists { + // Already existing, remove the duplicated one + res, err := redis.Int(conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), score, score)) + if err != nil || res == 0 { + logger.Errorf("Failed to clear duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score) + } else { + logger.Infof("Remove duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score) + count++ + } + } else { + hash[key] = score + } + } + } + + logger.Infof("Clear %d duplicated periodic policies", count) + + return nil +} + +// Remove the non-used key +func delScoreZset(conn redis.Conn, ns string) { + key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "period:key_score") + reply, err := redis.Int(conn.Do("EXISTS", key)) + if err == nil && reply == 1 { + reply, err = redis.Int(conn.Do("DEL", key)) + if err == nil && reply > 0 { + logger.Infof("%s removed", key) + return // success + } + } + + if err != nil { + // Just logged + logger.Errorf("Remove %s failed with error: %s", key, err) + } +} + +func toString(v interface{}) string { + if v == nil { + return "" + } + + if bytes, ok := v.([]byte); ok { + return string(bytes) + } + + return "" +} + +func toInt(v interface{}) int64 { + if v == nil { + return -1 + } + + if bytes, ok := v.([]byte); ok { + if intV, err := strconv.ParseInt(string(bytes), 10, 64); err == nil { + return intV + } + } + + return -1 +} + +func hashKey(p *period.Policy) string { + key := p.JobName + if p.JobParameters != nil && len(p.JobParameters) > 0 { + if bytes, err := json.Marshal(p.JobParameters); err == nil { + key = fmt.Sprintf("%s:%s", key, string(bytes)) + } + } + + return base64.StdEncoding.EncodeToString([]byte(key)) +} diff --git a/src/jobservice/migration/version.go b/src/jobservice/migration/version.go new file mode 100644 index 000000000..028b9b6a9 --- /dev/null +++ b/src/jobservice/migration/version.go @@ -0,0 +1,31 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migration + +import ( + "fmt" + + "github.com/goharbor/harbor/src/jobservice/common/rds" +) + +const ( + // SchemaVersion identifies the schema version of RDB + SchemaVersion = "1.8.1" +) + +// VersionKey returns the key of redis schema +func VersionKey(ns string) string { + return fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "_schema_version") +} diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 6e722bcd8..8ecafd56d 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/goharbor/harbor/src/jobservice/mgt" + "github.com/goharbor/harbor/src/jobservice/migration" "os" "os/signal" "sync" @@ -97,6 +98,14 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) // Get redis connection pool redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL) + // Do data migration if necessary + rdbMigrator := migration.New(redisPool, namespace) + rdbMigrator.Register(migration.PolicyMigratorFactory) + if err := rdbMigrator.Migrate(); err != nil { + // Just logged, should not block the starting process + logger.Error(err) + } + // Create stats manager manager = mgt.NewManager(ctx, namespace, redisPool) // Create hook agent, it's a singleton object diff --git a/src/jobservice/runtime/bootstrap_test.go b/src/jobservice/runtime/bootstrap_test.go index 74bfa9a59..257e58479 100644 --- a/src/jobservice/runtime/bootstrap_test.go +++ b/src/jobservice/runtime/bootstrap_test.go @@ -16,6 +16,7 @@ package runtime import ( "context" + "fmt" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/logger" @@ -56,15 +57,16 @@ func (suite *BootStrapTestSuite) SetupSuite() { // TearDownSuite clears the test suite func (suite *BootStrapTestSuite) TearDownSuite() { - suite.cancel() - pool := tests.GiveMeRedisPool() conn := pool.Get() defer func() { _ = conn.Close() }() - _ = tests.ClearAll(tests.GiveMeTestNamespace(), conn) + err := tests.ClearAll(fmt.Sprintf("{%s}", tests.GiveMeTestNamespace()), conn) + require.NoError(suite.T(), err, "clear rdb error") + + suite.cancel() } // TestBootStrapTestSuite is entry of go test