Mirror of https://github.com/goharbor/harbor.git

Commit 6b46844565 (parent 7b8971e930)
Add sweeper to clear outdated data before the worker pool starts
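At a high level, this change wires a new period.Sweeper into the Redis-backed worker pool: before the pool starts consuming jobs, it removes scheduled-job entries whose run time has already passed. The condensed sketch below is assembled from the hunks that follow; it is an illustration, not verbatim repository code, and it omits the configuration and context wiring.

    // Condensed illustration of the startup order this commit introduces
    // (simplified from NewGoCraftWorkPool and Start below; not verbatim repo code).
    sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)

    // Best effort: a failed sweep is only logged so the pool still starts.
    if err := sweeper.ClearOutdatedScheduledJobs(); err != nil {
        log.Errorf("Clear outdated data before pool starting failed with error:%s\n", err)
    }

    pool.Start() // the gocraft/work worker pool begins processing jobs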
@@ -8,7 +8,7 @@ https_config:
   key: "server.key"
 
 #Server listening port
-port: 8443
+port: 9443
 
 #Worker pool
 worker_pool:
@@ -3,8 +3,6 @@
 package period
 
 import (
-	"encoding/json"
-	"fmt"
 	"math/rand"
 	"time"
 
@@ -12,6 +10,7 @@ import (
 	"github.com/gocraft/work"
 	"github.com/robfig/cron"
 	"github.com/vmware/harbor/src/common/utils/log"
+	"github.com/vmware/harbor/src/jobservice_v2/utils"
 )
 
 const (
@@ -93,7 +92,7 @@ func (pe *periodicEnqueuer) loop() {
 }
 
 func (pe *periodicEnqueuer) enqueue() error {
-	now := nowEpochSeconds()
+	now := utils.NowEpochSeconds()
 	nowTime := time.Unix(now, 0)
 	horizon := nowTime.Add(periodicEnqueuerHorizon)
 
@@ -114,7 +113,7 @@ func (pe *periodicEnqueuer) enqueue() error {
 		}
 		for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
 			epoch := t.Unix()
-			id := makeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related with it
+			id := utils.MakeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related with it
 
 			job := &work.Job{
 				Name: pj.jobName,
@@ -125,12 +124,12 @@ func (pe *periodicEnqueuer) enqueue() error {
 				Args: pl.JobParameters, //Pass parameters to scheduled job here
 			}
 
-			rawJSON, err := serializeJob(job)
+			rawJSON, err := utils.SerializeJob(job)
 			if err != nil {
 				return err
 			}
 
-			_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
+			_, err = conn.Do("ZADD", utils.RedisKeyScheduled(pe.namespace), epoch, rawJSON)
 			if err != nil {
 				return err
 			}
@@ -139,7 +138,7 @@ func (pe *periodicEnqueuer) enqueue() error {
 		}
 	}
 
-	_, err := conn.Do("SET", redisKeyLastPeriodicEnqueue(pe.namespace), now)
+	_, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now)
 
 	return err
 }
@@ -148,7 +147,7 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool {
 	conn := pe.pool.Get()
 	defer conn.Close()
 
-	lastEnqueue, err := redis.Int64(conn.Do("GET", redisKeyLastPeriodicEnqueue(pe.namespace)))
+	lastEnqueue, err := redis.Int64(conn.Do("GET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace)))
 	if err == redis.ErrNil {
 		return true
 	} else if err != nil {
@@ -156,38 +155,5 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool {
 		return true
 	}
 
-	return lastEnqueue < (nowEpochSeconds() - int64(periodicEnqueuerSleep/time.Minute))
+	return lastEnqueue < (utils.NowEpochSeconds() - int64(periodicEnqueuerSleep/time.Minute))
 }
-
-var nowMock int64
-
-func nowEpochSeconds() int64 {
-	if nowMock != 0 {
-		return nowMock
-	}
-	return time.Now().Unix()
-}
-
-func makeUniquePeriodicID(name, spec string, epoch int64) string {
-	return fmt.Sprintf("periodic:job:%s:%s:%d", name, spec, epoch)
-}
-
-func serializeJob(job *work.Job) ([]byte, error) {
-	return json.Marshal(job)
-}
-
-func redisNamespacePrefix(namespace string) string {
-	l := len(namespace)
-	if (l > 0) && (namespace[l-1] != ':') {
-		namespace = namespace + ":"
-	}
-	return namespace
-}
-
-func redisKeyScheduled(namespace string) string {
-	return redisNamespacePrefix(namespace) + "scheduled"
-}
-
-func redisKeyLastPeriodicEnqueue(namespace string) string {
-	return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
-}
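The hunks above only relocate the enqueuer's Redis helpers into the shared utils package so the new sweeper can reuse them; the behaviour is unchanged. The relationship the sweeper relies on is the scheduled-jobs sorted set: enqueue() ZADDs each serialized job with its run epoch as the score, so anything scored below the current time is stale when a pool boots. Below is a minimal sketch of that query, assuming a redigo *redis.Pool named redisPool and a namespace string (both hypothetical names), mirroring what GetZsetByScore in the new sweeper.go does.

    // Sketch only: list scheduled jobs whose run time has already passed.
    conn := redisPool.Get()
    defer conn.Close()

    key := utils.RedisKeyScheduled(namespace) // "<namespace>:scheduled" per the helpers in this commit
    values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, 0, time.Now().Unix(), "WITHSCORES"))
    if err != nil {
        // handle the Redis error
    }
    // Each entry alternates: serialized job JSON, then its epoch score.
    _ = values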
src/jobservice_v2/period/sweeper.go (new file, 122 lines)

package period

import (
	"errors"
	"fmt"
	"time"

	"github.com/gocraft/work"

	"github.com/garyburd/redigo/redis"
	"github.com/vmware/harbor/src/common/utils/log"
	"github.com/vmware/harbor/src/jobservice_v2/utils"
)

//Sweeper take charge of clearing the outdated data such as scheduled jobs etc..
//Currently, only used in redis worker pool.
type Sweeper struct {
	redisPool *redis.Pool
	client    *work.Client
	namespace string
}

//NewSweeper is constructor of Sweeper.
func NewSweeper(namespace string, pool *redis.Pool, client *work.Client) *Sweeper {
	return &Sweeper{
		namespace: namespace,
		redisPool: pool,
		client:    client,
	}
}

//ClearOutdatedScheduledJobs clears the outdated scheduled jobs.
//Try best to do
func (s *Sweeper) ClearOutdatedScheduledJobs() error {
	//Check if other workpool has done the action
	conn := s.redisPool.Get()

	//Lock
	r, err := conn.Do("SET", utils.KeyPeriodicLock(s.namespace), time.Now().Unix(), "EX", 30, "NX")
	defer func() {
		//Make sure it can be unlocked if it is not expired yet
		conn.Do("DEL", utils.KeyPeriodicLock(s.namespace))
	}()
	if err != nil {
		return err
	}

	if r == nil {
		//Action is already locked by other workerpool
		log.Info("Ignore clear outdated scheduled jobs")
		return nil
	}

	nowEpoch := time.Now().Unix()
	jobScores, err := GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
	if err != nil {
		return err
	}

	allErrors := make([]error, 0)
	for _, jobScore := range jobScores {
		j, err := utils.DeSerializeJob(jobScore.JobBytes)
		if err != nil {
			allErrors = append(allErrors, err)
			continue
		}

		if err = s.client.DeleteScheduledJob(jobScore.Score, j.ID); err != nil {
			allErrors = append(allErrors, err)
		}

		log.Infof("Clear outdated scheduled job: %s run at %#v\n", j.ID, time.Unix(jobScore.Score, 0).String())
	}

	//Unlock
	if len(allErrors) == 0 {
		return nil
	}

	if len(allErrors) == 1 {
		return allErrors[0]
	}

	errorSummary := allErrors[0].Error()
	for index, e := range allErrors {
		if index == 0 {
			continue
		}

		errorSummary = fmt.Sprintf("%s, %s", errorSummary, e)
	}
	return fmt.Errorf("%s", errorSummary)
}

//JobScore represents the data item with score in the redis db.
type JobScore struct {
	JobBytes []byte
	Score    int64
}

//GetZsetByScore get the items from the zset filtered by the specified score scope.
func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) {
	if pool == nil || utils.IsEmptyStr(key) || len(scores) < 2 {
		return nil, errors.New("bad arguments")
	}

	conn := pool.Get()
	defer conn.Close()

	values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES"))
	if err != nil {
		return nil, err
	}

	var jobsWithScores []JobScore

	if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
		return nil, err
	}

	return jobsWithScores, nil
}
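ClearOutdatedScheduledJobs guards the cleanup with a simple Redis try-lock so that only one worker pool in a deployment performs the sweep: SET with EX 30 and NX either acquires a 30-second lock or returns nil when another pool already holds it, and the deferred DEL releases it early. Here is a stripped-down sketch of that pattern, assuming a redigo *redis.Pool and the KeyPeriodicLock helper added later in this commit; trySweep itself is a hypothetical name, not the repository implementation.

    // Minimal try-lock sketch; not the repository implementation.
    func trySweep(pool *redis.Pool, namespace string) error {
        conn := pool.Get()
        defer conn.Close()

        lockKey := utils.KeyPeriodicLock(namespace)
        // NX: set only if absent; EX 30: auto-expire so a crashed pool cannot hold the lock forever.
        reply, err := conn.Do("SET", lockKey, time.Now().Unix(), "EX", 30, "NX")
        if err != nil {
            return err
        }
        if reply == nil {
            // Another worker pool already holds the lock; skip the sweep.
            return nil
        }
        defer conn.Do("DEL", lockKey) // release early; the 30s expiry is only a fallback

        // ... clear the outdated scheduled jobs here ...
        return nil
    }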
@@ -29,6 +29,7 @@ type GoCraftWorkPool struct {
 	redisPool *redis.Pool
 	pool      *work.WorkerPool
 	enqueuer  *work.Enqueuer
+	sweeper   *period.Sweeper
 	client    *work.Client
 	context   *env.Context
 	scheduler period.Interface
@@ -71,11 +72,13 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool
 	enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool)
 	client := work.NewClient(cfg.Namespace, redisPool)
 	scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
+	sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
 	return &GoCraftWorkPool{
 		redisPool: redisPool,
 		pool:      pool,
 		enqueuer:  enqueuer,
 		scheduler: scheduler,
+		sweeper:   sweeper,
 		client:    client,
 		context:   ctx,
 		knownJobs: make(map[string]bool),
@@ -115,6 +118,12 @@ func (gcwp *GoCraftWorkPool) Start() {
 		gcwp.context.WG.Done()
 	}()
 
+	//Clear dirty data before pool starting
+	if err := gcwp.sweeper.ClearOutdatedScheduledJobs(); err != nil {
+		//Only logged
+		log.Errorf("Clear outdated data before pool starting failed with error:%s\n", err)
+	}
+
 	//Append middlewares
 	gcwp.pool.Middleware((*RedisPoolContext).logJob)
 
@@ -252,6 +261,11 @@ func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (bool, bool) {
 	return ok, v
 }
 
+//Clear the invalid data on redis db, such as outdated scheduled jobs etc.
+func (gcwp *GoCraftWorkPool) clearDirtyData() {
+
+}
+
 //log the job
 func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
 	//TODO: Also update the job status to 'pending'
src/jobservice_v2/utils/gocarft_work.go (new file, 60 lines)

package utils

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/gocraft/work"
)

//Functions defined here are mainly from dep lib "github.com/gocraft/work".
//Only for compatible

//MakeUniquePeriodicID creates id for the periodic job.
func MakeUniquePeriodicID(name, spec string, epoch int64) string {
	return fmt.Sprintf("periodic:job:%s:%s:%d", name, spec, epoch)
}

//RedisNamespacePrefix ... Same with 'KeyNamespacePrefix', only for compatiblity.
func RedisNamespacePrefix(namespace string) string {
	return KeyNamespacePrefix(namespace)
}

//RedisKeyScheduled returns key of scheduled job.
func RedisKeyScheduled(namespace string) string {
	return RedisNamespacePrefix(namespace) + "scheduled"
}

//RedisKeyLastPeriodicEnqueue returns key of timestamp if last periodic enqueue.
func RedisKeyLastPeriodicEnqueue(namespace string) string {
	return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
}

var nowMock int64

//NowEpochSeconds ...
func NowEpochSeconds() int64 {
	if nowMock != 0 {
		return nowMock
	}
	return time.Now().Unix()
}

//SetNowEpochSecondsMock ...
func SetNowEpochSecondsMock(t int64) {
	nowMock = t
}

//SerializeJob encodes work.Job to json data.
func SerializeJob(job *work.Job) ([]byte, error) {
	return json.Marshal(job)
}

//DeSerializeJob decodes bytes to ptr of work.Job.
func DeSerializeJob(jobBytes []byte) (*work.Job, error) {
	var j work.Job
	err := json.Unmarshal(jobBytes, &j)

	return &j, err
}
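These helpers are thin, exported copies of unexported functions inside gocraft/work, pulled out so the enqueuer and the sweeper can share them. A small, hypothetical usage sketch (assuming the utils and work packages are imported; the values are made up) showing the serialize/deserialize round trip and the time mock intended for tests:

    // Hypothetical usage of the helpers above; values are made up.
    job := &work.Job{Name: "demo_job", ID: "demo-id"}

    raw, err := utils.SerializeJob(job) // JSON bytes stored in the scheduled zset
    if err != nil {
        // handle error
    }

    restored, err := utils.DeSerializeJob(raw) // decoded again when sweeping
    if err != nil {
        // handle error
    }
    _ = restored

    utils.SetNowEpochSecondsMock(1500000000) // pin NowEpochSeconds() in tests
    _ = utils.NowEpochSeconds()              // returns 1500000000 until the mock is cleared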
@@ -76,3 +76,8 @@ func KeyPeriodicPolicy(namespace string) string {
 func KeyPeriodicNotification(namespace string) string {
 	return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")
 }
+
+//KeyPeriodicLock returns the key of locker under period
+func KeyPeriodicLock(namespace string) string {
+	return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "lock")
+}