mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-20 09:15:19 +01:00
add migration process for rdb as well as UT cases
fix issue #7988 Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
parent
d8b76c482a
commit
2bc1402357
@ -19,8 +19,8 @@ worker_pool:
|
|||||||
redis_pool:
|
redis_pool:
|
||||||
#redis://[arbitrary_username:password@]ipaddress:port/database_index
|
#redis://[arbitrary_username:password@]ipaddress:port/database_index
|
||||||
#or ipaddress:port[,weight,password,database_index]
|
#or ipaddress:port[,weight,password,database_index]
|
||||||
redis_url: "localhost:6379"
|
redis_url: "redis://localhost:6379/2"
|
||||||
namespace: "harbor_job_service"
|
namespace: "harbor_job_service_namespace"
|
||||||
|
|
||||||
#Loggers for the running job
|
#Loggers for the running job
|
||||||
job_loggers:
|
job_loggers:
|
||||||
@ -29,11 +29,11 @@ job_loggers:
|
|||||||
- name: "FILE"
|
- name: "FILE"
|
||||||
level: "DEBUG"
|
level: "DEBUG"
|
||||||
settings: # Customized settings of logger
|
settings: # Customized settings of logger
|
||||||
base_dir: "/Users/szou/tmp/job_logs"
|
base_dir: "/tmp/job_logs"
|
||||||
sweeper:
|
sweeper:
|
||||||
duration: 1 #days
|
duration: 1 #days
|
||||||
settings: # Customized settings of sweeper
|
settings: # Customized settings of sweeper
|
||||||
work_dir: "/Users/szou/tmp/job_logs"
|
work_dir: "/tmp/job_logs"
|
||||||
|
|
||||||
#Loggers for the job service
|
#Loggers for the job service
|
||||||
loggers:
|
loggers:
|
||||||
|
148
src/jobservice/migration/manager.go
Normal file
148
src/jobservice/migration/manager.go
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Masterminds/semver"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager for managing the related migrators
|
||||||
|
type Manager interface {
|
||||||
|
// Register the specified migrator to the execution chain
|
||||||
|
Register(migratorFactory MigratorFactory)
|
||||||
|
|
||||||
|
// Migrate data
|
||||||
|
Migrate() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigratorChainNode is a wrapper to append the migrator to the chain with a next reference
|
||||||
|
type MigratorChainNode struct {
|
||||||
|
// Migrator implementation
|
||||||
|
migrator RDBMigrator
|
||||||
|
// Refer the next migration of the chain if existing
|
||||||
|
next *MigratorChainNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// BasicManager is the default implementation of manager interface
|
||||||
|
type BasicManager struct {
|
||||||
|
// The head of migrator chain
|
||||||
|
head *MigratorChainNode
|
||||||
|
// Pool for connecting to redis
|
||||||
|
pool *redis.Pool
|
||||||
|
// RDB namespace
|
||||||
|
namespace string
|
||||||
|
}
|
||||||
|
|
||||||
|
// New a basic manager
|
||||||
|
func New(pool *redis.Pool, ns string) Manager {
|
||||||
|
return &BasicManager{
|
||||||
|
pool: pool,
|
||||||
|
namespace: ns,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the migrator to the chain
|
||||||
|
func (bm *BasicManager) Register(migratorFactory MigratorFactory) {
|
||||||
|
if migratorFactory == nil {
|
||||||
|
return // ignore, do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
migrator, err := migratorFactory(bm.pool, bm.namespace)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("migrator register error: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
newNode := &MigratorChainNode{
|
||||||
|
migrator: migrator,
|
||||||
|
next: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
if bm.head == nil {
|
||||||
|
bm.head = newNode
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
bm.head.next = newNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate data
|
||||||
|
func (bm *BasicManager) Migrate() error {
|
||||||
|
conn := bm.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Read schema version first
|
||||||
|
v, err := redis.String(conn.Do("GET", VersionKey(bm.namespace)))
|
||||||
|
if err != nil && err != redis.ErrNil {
|
||||||
|
return errors.Wrap(err, "read schema version failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(v) > 0 {
|
||||||
|
current, err := semver.NewVersion(v)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "malformed schema version")
|
||||||
|
}
|
||||||
|
nowV, _ := semver.NewVersion(SchemaVersion)
|
||||||
|
|
||||||
|
diff := nowV.Compare(current)
|
||||||
|
if diff < 0 {
|
||||||
|
return errors.Errorf("the schema version of migrator is smaller that the one in the rdb: %s<%s", nowV.String(), current.String())
|
||||||
|
} else if diff == 0 {
|
||||||
|
logger.Info("No migration needed")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bm.head == nil {
|
||||||
|
logger.Warning("No migrator registered, passed migration")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Process for migrating data is started")
|
||||||
|
|
||||||
|
h := bm.head
|
||||||
|
for h != nil {
|
||||||
|
meta := h.migrator.Metadata()
|
||||||
|
if meta == nil {
|
||||||
|
// Make metadata required
|
||||||
|
return errors.Errorf("no metadata provided for the migrator %s", reflect.TypeOf(h.migrator).String())
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Migrate %s from %s to %s", meta.ObjectRef, meta.FromVersion, meta.ToVersion)
|
||||||
|
if err := h.migrator.Migrate(); err != nil {
|
||||||
|
return errors.Wrap(err, "migration chain calling failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next one if existing
|
||||||
|
h = h.next
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set schema version
|
||||||
|
if _, err = conn.Do("SET", VersionKey(bm.namespace), SchemaVersion); err != nil {
|
||||||
|
return errors.Wrap(err, "write schema version failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Data schema version upgraded to %s", SchemaVersion)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
198
src/jobservice/migration/manager_test.go
Normal file
198
src/jobservice/migration/manager_test.go
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ManagerTestSuite tests functions of manager
|
||||||
|
type ManagerTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
pool *redis.Pool
|
||||||
|
namespace string
|
||||||
|
|
||||||
|
manager Manager
|
||||||
|
|
||||||
|
jobID string
|
||||||
|
numbericID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestManagerTestSuite is entry of executing ManagerTestSuite
|
||||||
|
func TestManagerTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(ManagerTestSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupAllSuite sets up env for test suite
|
||||||
|
func (suite *ManagerTestSuite) SetupSuite() {
|
||||||
|
suite.pool = tests.GiveMeRedisPool()
|
||||||
|
suite.namespace = tests.GiveMeTestNamespace()
|
||||||
|
|
||||||
|
suite.manager = New(suite.pool, suite.namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupTestSuite sets up env for each test case
|
||||||
|
func (suite *ManagerTestSuite) SetupTest() {
|
||||||
|
// Mock fake data
|
||||||
|
conn := suite.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
id := utils.MakeIdentifier()
|
||||||
|
suite.jobID = id
|
||||||
|
// Mock stats of periodic job
|
||||||
|
args := []interface{}{
|
||||||
|
rds.KeyJobStats(suite.namespace, id),
|
||||||
|
"status_hook",
|
||||||
|
"http://core:8080/hook",
|
||||||
|
"id",
|
||||||
|
id,
|
||||||
|
"name",
|
||||||
|
job.ImageGC,
|
||||||
|
"kind",
|
||||||
|
job.KindPeriodic,
|
||||||
|
"unique",
|
||||||
|
0,
|
||||||
|
"status",
|
||||||
|
job.SuccessStatus.String(), // v1.6 issue
|
||||||
|
"ref_link",
|
||||||
|
fmt.Sprintf("/api/v1/jobs/%s", id),
|
||||||
|
"enqueue_time",
|
||||||
|
time.Now().Unix(),
|
||||||
|
"update_time",
|
||||||
|
time.Now().Unix(),
|
||||||
|
"run_at",
|
||||||
|
time.Now().Add(5 * time.Minute).Unix(),
|
||||||
|
"cron_spec",
|
||||||
|
"0 0 17 * * *",
|
||||||
|
"multiple_executions", // V1.7
|
||||||
|
1,
|
||||||
|
}
|
||||||
|
reply, err := redis.String(conn.Do("HMSET", args...))
|
||||||
|
require.NoError(suite.T(), err, "mock job stats data error")
|
||||||
|
require.Equal(suite.T(), "ok", strings.ToLower(reply), "ok expected")
|
||||||
|
|
||||||
|
// Mock periodic job policy object
|
||||||
|
params := make(map[string]interface{})
|
||||||
|
params["redis_url_reg"] = "redis://redis:6379/1"
|
||||||
|
|
||||||
|
policy := make(map[string]interface{})
|
||||||
|
policy["job_name"] = job.ImageGC
|
||||||
|
policy["job_params"] = params
|
||||||
|
policy["cron_spec"] = "0 0 17 * * *"
|
||||||
|
|
||||||
|
rawJSON, err := json.Marshal(&policy)
|
||||||
|
require.NoError(suite.T(), err, "mock periodic job policy error")
|
||||||
|
|
||||||
|
policy["cron_spec"] = "0 0 8 * * *"
|
||||||
|
duplicatedRawJSON, err := json.Marshal(&policy)
|
||||||
|
require.NoError(suite.T(), err, "mock duplicated periodic job policy error")
|
||||||
|
|
||||||
|
score := time.Now().Unix()
|
||||||
|
suite.numbericID = score
|
||||||
|
zaddArgs := []interface{}{
|
||||||
|
rds.KeyPeriodicPolicy(suite.namespace),
|
||||||
|
score,
|
||||||
|
rawJSON,
|
||||||
|
score - 10,
|
||||||
|
duplicatedRawJSON, // duplicated one
|
||||||
|
}
|
||||||
|
count, err := redis.Int(conn.Do("ZADD", zaddArgs...))
|
||||||
|
require.NoError(suite.T(), err, "add raw policy error")
|
||||||
|
require.Equal(suite.T(), 2, count)
|
||||||
|
|
||||||
|
// Mock key score mapping
|
||||||
|
keyScoreArgs := []interface{}{
|
||||||
|
fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score"),
|
||||||
|
score,
|
||||||
|
id,
|
||||||
|
}
|
||||||
|
|
||||||
|
count, err = redis.Int(conn.Do("ZADD", keyScoreArgs...))
|
||||||
|
require.NoError(suite.T(), err, "add key score mapping error")
|
||||||
|
require.Equal(suite.T(), 1, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupTestSuite clears up env for each test case
|
||||||
|
func (suite *ManagerTestSuite) TearDownTest() {
|
||||||
|
conn := suite.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := tests.ClearAll(suite.namespace, conn)
|
||||||
|
assert.NoError(suite.T(), err, "clear all of redis db error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestManager test the basic functions of the manager
|
||||||
|
func (suite *ManagerTestSuite) TestManager() {
|
||||||
|
require.NotNil(suite.T(), suite.manager, "nil migration manager")
|
||||||
|
|
||||||
|
suite.manager.Register(PolicyMigratorFactory)
|
||||||
|
err := suite.manager.Migrate()
|
||||||
|
require.NoError(suite.T(), err, "migrating rdb error")
|
||||||
|
|
||||||
|
// Check data
|
||||||
|
conn := suite.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
count, err := redis.Int(conn.Do("ZCARD", rds.KeyPeriodicPolicy(suite.namespace)))
|
||||||
|
assert.NoError(suite.T(), err, "get count of policies error")
|
||||||
|
assert.Equal(suite.T(), 1, count)
|
||||||
|
|
||||||
|
p, err := getPeriodicPolicy(suite.numbericID, conn, suite.namespace)
|
||||||
|
assert.NoError(suite.T(), err, "get migrated policy error")
|
||||||
|
assert.NotEmpty(suite.T(), p.ID, "ID of policy")
|
||||||
|
assert.NotEmpty(suite.T(), p.WebHookURL, "Web hook URL of policy")
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "period:key_score")
|
||||||
|
count, err = redis.Int(conn.Do("EXISTS", key))
|
||||||
|
assert.NoError(suite.T(), err, "check existence of key score mapping error")
|
||||||
|
assert.Equal(suite.T(), 0, count)
|
||||||
|
|
||||||
|
hmGetArgs := []interface{}{
|
||||||
|
rds.KeyJobStats(suite.namespace, suite.jobID),
|
||||||
|
"id",
|
||||||
|
"status",
|
||||||
|
"web_hook_url",
|
||||||
|
"numeric_policy_id",
|
||||||
|
"multiple_executions",
|
||||||
|
"status_hook",
|
||||||
|
}
|
||||||
|
fields, err := redis.Values(conn.Do("HMGET", hmGetArgs...))
|
||||||
|
assert.NoError(suite.T(), err, "check migrated job stats error")
|
||||||
|
assert.Equal(suite.T(), suite.jobID, toString(fields[0]), "check job ID")
|
||||||
|
assert.Equal(suite.T(), job.ScheduledStatus.String(), toString(fields[1]), "check job status")
|
||||||
|
assert.Equal(suite.T(), "http://core:8080/hook", toString(fields[2]), "check web hook URL")
|
||||||
|
assert.Equal(suite.T(), suite.numbericID, toInt(fields[3]), "check numberic ID")
|
||||||
|
assert.Nil(suite.T(), fields[4], "'multiple_executions' removed")
|
||||||
|
assert.Nil(suite.T(), fields[5], "'status_hook' removed")
|
||||||
|
}
|
38
src/jobservice/migration/migrator.go
Normal file
38
src/jobservice/migration/migrator.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RDBMigrator defines the action to migrate redis data
|
||||||
|
type RDBMigrator interface {
|
||||||
|
// Metadata info of the migrator
|
||||||
|
Metadata() *MigratorMeta
|
||||||
|
|
||||||
|
// Migrate executes the real migration work
|
||||||
|
Migrate() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigratorMeta keeps the base info of the migrator
|
||||||
|
type MigratorMeta struct {
|
||||||
|
FromVersion string
|
||||||
|
ToVersion string
|
||||||
|
ObjectRef string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigratorFactory is factory function to create RDBMigrator interface
|
||||||
|
type MigratorFactory func(pool *redis.Pool, namespace string) (RDBMigrator, error)
|
372
src/jobservice/migration/migrator_v180.go
Normal file
372
src/jobservice/migration/migrator_v180.go
Normal file
@ -0,0 +1,372 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/period"
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PolicyMigrator migrate the cron job policy to new schema
|
||||||
|
type PolicyMigrator struct {
|
||||||
|
// namespace of rdb
|
||||||
|
namespace string
|
||||||
|
|
||||||
|
// Pool for connecting to redis
|
||||||
|
pool *redis.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// PolicyMigratorFactory is a factory func to create PolicyMigrator
|
||||||
|
func PolicyMigratorFactory(pool *redis.Pool, namespace string) (RDBMigrator, error) {
|
||||||
|
if pool == nil {
|
||||||
|
return nil, errors.New("PolicyMigratorFactory: missing pool")
|
||||||
|
}
|
||||||
|
|
||||||
|
if utils.IsEmptyStr(namespace) {
|
||||||
|
return nil, errors.New("PolicyMigratorFactory: missing namespace")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PolicyMigrator{
|
||||||
|
namespace: namespace,
|
||||||
|
pool: pool,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metadata returns the base information of this migrator
|
||||||
|
func (pm *PolicyMigrator) Metadata() *MigratorMeta {
|
||||||
|
return &MigratorMeta{
|
||||||
|
FromVersion: "<1.8.0",
|
||||||
|
ToVersion: "1.8.1",
|
||||||
|
ObjectRef: "{namespace}:period:policies",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrate data
|
||||||
|
func (pm *PolicyMigrator) Migrate() error {
|
||||||
|
conn := pm.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
allJobIDs, err := getAllJobStatsIDs(conn, pm.namespace)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "get job stats list error")
|
||||||
|
}
|
||||||
|
|
||||||
|
args := []interface{}{
|
||||||
|
"id_placeholder",
|
||||||
|
"id",
|
||||||
|
"kind",
|
||||||
|
"status",
|
||||||
|
"status_hook", // valid for 1.6 and 1.7,
|
||||||
|
"multiple_executions", // valid for 1.7
|
||||||
|
"numeric_policy_id", // valid for 1.8
|
||||||
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for _, fullID := range allJobIDs {
|
||||||
|
args[0] = fullID
|
||||||
|
values, err := redis.Values(conn.Do("HMGET", args...))
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("Get stats fields of job %s failed with error: %s", fullID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pID := toString(values[0])
|
||||||
|
kind := toString(values[1])
|
||||||
|
|
||||||
|
if !utils.IsEmptyStr(pID) && job.KindPeriodic == kind {
|
||||||
|
logger.Debugf("Periodic job found: %s", pID)
|
||||||
|
|
||||||
|
// Data requires migration
|
||||||
|
// Missing 'numeric_policy_id' which is introduced in 1.8
|
||||||
|
if values[5] == nil {
|
||||||
|
logger.Infof("Migrate periodic job stats data is started: %s", pID)
|
||||||
|
|
||||||
|
numbericPolicyID, err := getScoreByID(pID, conn, pm.namespace)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("Get numberic ID of periodic job policy failed with error: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transaction
|
||||||
|
err = conn.Send("MULTI")
|
||||||
|
setArgs := []interface{}{
|
||||||
|
fullID,
|
||||||
|
"status",
|
||||||
|
job.ScheduledStatus.String(), // make sure the status of periodic job is "Scheduled"
|
||||||
|
"numeric_policy_id",
|
||||||
|
numbericPolicyID,
|
||||||
|
}
|
||||||
|
// If status hook existing
|
||||||
|
hookURL := toString(values[3])
|
||||||
|
if !utils.IsEmptyStr(hookURL) {
|
||||||
|
setArgs = append(setArgs, "web_hook_url", hookURL)
|
||||||
|
}
|
||||||
|
// Set fields
|
||||||
|
err = conn.Send("HMSET", setArgs...)
|
||||||
|
|
||||||
|
// Remove useless fields
|
||||||
|
rmArgs := []interface{}{
|
||||||
|
fullID,
|
||||||
|
"status_hook",
|
||||||
|
"multiple_executions",
|
||||||
|
}
|
||||||
|
err = conn.Send("HDEL", rmArgs...)
|
||||||
|
|
||||||
|
// Update periodic policy model
|
||||||
|
// conn is working, we need new conn
|
||||||
|
innerConn := pm.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = innerConn.Close()
|
||||||
|
}()
|
||||||
|
policy, er := getPeriodicPolicy(numbericPolicyID, innerConn, pm.namespace)
|
||||||
|
if er == nil {
|
||||||
|
policy.ID = pID
|
||||||
|
if !utils.IsEmptyStr(hookURL) {
|
||||||
|
// Copy web hook URL
|
||||||
|
policy.WebHookURL = fmt.Sprintf("%s", hookURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rawJSON, er := policy.Serialize(); er == nil {
|
||||||
|
// Remove the old one first
|
||||||
|
err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, numbericPolicyID)
|
||||||
|
// Save back to the rdb
|
||||||
|
err = conn.Send("ZADD", rds.KeyPeriodicPolicy(pm.namespace), numbericPolicyID, rawJSON)
|
||||||
|
} else {
|
||||||
|
logger.Errorf("Serialize policy %s failed with error: %s", pID, er)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.Errorf("Get periodic policy %s failed with error: %s", pID, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check error before executing
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("Build redis transaction failed with error: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec
|
||||||
|
if _, err := conn.Do("EXEC"); err != nil {
|
||||||
|
logger.Errorf("Migrate periodic job %s failed with error: %s", pID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
logger.Infof("Migrate periodic job stats data is completed: %s", pID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Migrate %d periodic policies", count)
|
||||||
|
|
||||||
|
delScoreZset(conn, pm.namespace)
|
||||||
|
|
||||||
|
return clearDuplicatedPolicies(conn, pm.namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getAllJobStatsIDs get all the IDs of the existing jobs
|
||||||
|
func getAllJobStatsIDs(conn redis.Conn, ns string) ([]string, error) {
|
||||||
|
pattern := rds.KeyJobStats(ns, "*")
|
||||||
|
args := []interface{}{
|
||||||
|
0,
|
||||||
|
"MATCH",
|
||||||
|
pattern,
|
||||||
|
"COUNT",
|
||||||
|
100,
|
||||||
|
}
|
||||||
|
|
||||||
|
allFullIDs := make([]interface{}, 0)
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Use SCAN to iterate the IDs
|
||||||
|
values, err := redis.Values(conn.Do("SCAN", args...))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// In case something wrong happened
|
||||||
|
if len(values) != 2 {
|
||||||
|
return nil, errors.Errorf("Invalid result returned for the SCAN command: %#v", values)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fullIDs, ok := values[1].([]interface{}); ok {
|
||||||
|
allFullIDs = append(allFullIDs, fullIDs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the next cursor
|
||||||
|
cur := toInt(values[0])
|
||||||
|
if cur == -1 {
|
||||||
|
// No valid next cursor got
|
||||||
|
return nil, errors.Errorf("Failed to get the next SCAN cursor: %#v", values[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
if cur != 0 {
|
||||||
|
args[0] = cur
|
||||||
|
} else {
|
||||||
|
// end
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
IDs := make([]string, 0)
|
||||||
|
for _, fullIDValue := range allFullIDs {
|
||||||
|
if fullID, ok := fullIDValue.([]byte); ok {
|
||||||
|
IDs = append(IDs, string(fullID))
|
||||||
|
} else {
|
||||||
|
logger.Debugf("Invalid job stats key: %#v", fullIDValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return IDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the score with the provided ID
|
||||||
|
func getScoreByID(id string, conn redis.Conn, ns string) (int64, error) {
|
||||||
|
scoreKey := fmt.Sprintf("%s%s:%s", rds.KeyNamespacePrefix(ns), "period", "key_score")
|
||||||
|
return redis.Int64(conn.Do("ZSCORE", scoreKey, id))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get periodic policy object by the numeric ID
|
||||||
|
func getPeriodicPolicy(numericID int64, conn redis.Conn, ns string) (*period.Policy, error) {
|
||||||
|
bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), numericID, numericID))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &period.Policy{}
|
||||||
|
if len(bytes) > 0 {
|
||||||
|
if rawPolicy, ok := bytes[0].([]byte); ok {
|
||||||
|
if err = p.DeSerialize(rawPolicy); err == nil {
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
err = errors.Errorf("invalid data for periodic policy %d: %#v", numericID, bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the duplicated policy entries for the job "IMAGE_GC" and "IMAGE_SCAN_ALL"
|
||||||
|
func clearDuplicatedPolicies(conn redis.Conn, ns string) error {
|
||||||
|
hash := make(map[string]interface{})
|
||||||
|
|
||||||
|
bytes, err := redis.Values(conn.Do("ZREVRANGE", rds.KeyPeriodicPolicy(ns), 0, -1, "WITHSCORES"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for i, l := 0, len(bytes); i < l; i = i + 2 {
|
||||||
|
rawPolicy := bytes[i].([]byte)
|
||||||
|
p := &period.Policy{}
|
||||||
|
|
||||||
|
if err := p.DeSerialize(rawPolicy); err != nil {
|
||||||
|
logger.Errorf("DeSerialize policy: %s; error: %s\n", rawPolicy, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.JobName == job.ImageScanAllJob ||
|
||||||
|
p.JobName == job.ImageGC ||
|
||||||
|
p.JobName == job.ReplicationScheduler {
|
||||||
|
score, _ := strconv.ParseInt(string(bytes[i+1].([]byte)), 10, 64)
|
||||||
|
|
||||||
|
key := hashKey(p)
|
||||||
|
if _, exists := hash[key]; exists {
|
||||||
|
// Already existing, remove the duplicated one
|
||||||
|
res, err := redis.Int(conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), score, score))
|
||||||
|
if err != nil || res == 0 {
|
||||||
|
logger.Errorf("Failed to clear duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score)
|
||||||
|
} else {
|
||||||
|
logger.Infof("Remove duplicated periodic policy: %s-%s:%v", p.JobName, p.ID, score)
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
hash[key] = score
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Infof("Clear %d duplicated periodic policies", count)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the non-used key
|
||||||
|
func delScoreZset(conn redis.Conn, ns string) {
|
||||||
|
key := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "period:key_score")
|
||||||
|
reply, err := redis.Int(conn.Do("EXISTS", key))
|
||||||
|
if err == nil && reply == 1 {
|
||||||
|
reply, err = redis.Int(conn.Do("DEL", key))
|
||||||
|
if err == nil && reply > 0 {
|
||||||
|
logger.Infof("%s removed", key)
|
||||||
|
return // success
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// Just logged
|
||||||
|
logger.Errorf("Remove %s failed with error: %s", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func toString(v interface{}) string {
|
||||||
|
if v == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes, ok := v.([]byte); ok {
|
||||||
|
return string(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func toInt(v interface{}) int64 {
|
||||||
|
if v == nil {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes, ok := v.([]byte); ok {
|
||||||
|
if intV, err := strconv.ParseInt(string(bytes), 10, 64); err == nil {
|
||||||
|
return intV
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashKey(p *period.Policy) string {
|
||||||
|
key := p.JobName
|
||||||
|
if p.JobParameters != nil && len(p.JobParameters) > 0 {
|
||||||
|
if bytes, err := json.Marshal(p.JobParameters); err == nil {
|
||||||
|
key = fmt.Sprintf("%s:%s", key, string(bytes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return base64.StdEncoding.EncodeToString([]byte(key))
|
||||||
|
}
|
31
src/jobservice/migration/version.go
Normal file
31
src/jobservice/migration/version.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// SchemaVersion identifies the schema version of RDB
|
||||||
|
SchemaVersion = "1.8.1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VersionKey returns the key of redis schema
|
||||||
|
func VersionKey(ns string) string {
|
||||||
|
return fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(ns), "_schema_version")
|
||||||
|
}
|
@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/migration"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
@ -97,6 +98,14 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
|||||||
// Get redis connection pool
|
// Get redis connection pool
|
||||||
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
||||||
|
|
||||||
|
// Do data migration if necessary
|
||||||
|
rdbMigrator := migration.New(redisPool, namespace)
|
||||||
|
rdbMigrator.Register(migration.PolicyMigratorFactory)
|
||||||
|
if err := rdbMigrator.Migrate(); err != nil {
|
||||||
|
// Just logged, should not block the starting process
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Create stats manager
|
// Create stats manager
|
||||||
manager = mgt.NewManager(ctx, namespace, redisPool)
|
manager = mgt.NewManager(ctx, namespace, redisPool)
|
||||||
// Create hook agent, it's a singleton object
|
// Create hook agent, it's a singleton object
|
||||||
|
@ -16,6 +16,7 @@ package runtime
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/config"
|
"github.com/goharbor/harbor/src/jobservice/config"
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
@ -56,15 +57,16 @@ func (suite *BootStrapTestSuite) SetupSuite() {
|
|||||||
|
|
||||||
// TearDownSuite clears the test suite
|
// TearDownSuite clears the test suite
|
||||||
func (suite *BootStrapTestSuite) TearDownSuite() {
|
func (suite *BootStrapTestSuite) TearDownSuite() {
|
||||||
suite.cancel()
|
|
||||||
|
|
||||||
pool := tests.GiveMeRedisPool()
|
pool := tests.GiveMeRedisPool()
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_ = tests.ClearAll(tests.GiveMeTestNamespace(), conn)
|
err := tests.ClearAll(fmt.Sprintf("{%s}", tests.GiveMeTestNamespace()), conn)
|
||||||
|
require.NoError(suite.T(), err, "clear rdb error")
|
||||||
|
|
||||||
|
suite.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestBootStrapTestSuite is entry of go test
|
// TestBootStrapTestSuite is entry of go test
|
||||||
|
Loading…
Reference in New Issue
Block a user