diff --git a/src/jobservice/config.yml b/src/jobservice/config.yml index 22cd77ae9..fa0c5a7cf 100644 --- a/src/jobservice/config.yml +++ b/src/jobservice/config.yml @@ -29,7 +29,7 @@ job_loggers: - name: "FILE" level: "DEBUG" settings: # Customized settings of logger - base_dir: "tmp/job_logs" + base_dir: "/tmp/job_logs" sweeper: duration: 1 #days settings: # Customized settings of sweeper diff --git a/src/jobservice/pool/de_duplicator.go b/src/jobservice/pool/de_duplicator.go new file mode 100644 index 000000000..3653ae061 --- /dev/null +++ b/src/jobservice/pool/de_duplicator.go @@ -0,0 +1,94 @@ +package pool + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/goharbor/harbor/src/jobservice/models" + "github.com/goharbor/harbor/src/jobservice/utils" + "github.com/gomodule/redigo/redis" +) + +// DeDuplicator is designed to handle the uniqueness of the job. +// Once a job is declared to be unique, the job can be enqueued only if +// no same job (same job name and parameters) in the queue or running in progress. +// Adopt the same unique mechanism with the upstream framework. +type DeDuplicator struct { + // Redis namespace + namespace string + // Redis conn pool + pool *redis.Pool +} + +// NewDeDuplicator is constructor of DeDuplicator +func NewDeDuplicator(ns string, pool *redis.Pool) *DeDuplicator { + return &DeDuplicator{ + namespace: ns, + pool: pool, + } +} + +// Unique checks if the job is unique and set unique flag if it is not set yet. 
+func (dd *DeDuplicator) Unique(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) + if err != nil { + return fmt.Errorf("unique job error: %s", err) + } + + conn := dd.pool.Get() + defer conn.Close() + + args := []interface{}{ + uniqueKey, + 1, + "NX", + "EX", + 86400, + } + + res, err := redis.String(conn.Do("SET", args...)) + if err == nil && strings.ToUpper(res) == "OK" { + return nil + } + + return errors.New("unique job error: duplicated") +} + +// DelUniqueSign deletes the unique sign of the job +func (dd *DeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) + if err != nil { + return fmt.Errorf("delete unique job error: %s", err) + } + + conn := dd.pool.Get() + defer conn.Close() + + if _, err := conn.Do("DEL", uniqueKey); err != nil { + return fmt.Errorf("delete unique job error: %s", err) + } + + return nil +} + +// Same key format as the one used by the upstream framework +func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { + var buf bytes.Buffer + + buf.WriteString(utils.KeyNamespacePrefix(namespace)) + buf.WriteString("unique:running:") + buf.WriteString(jobName) + buf.WriteRune(':') + + if args != nil { + err := json.NewEncoder(&buf).Encode(args) + if err != nil { + return "", err + } + } + + return buf.String(), nil +} diff --git a/src/jobservice/pool/redis_job_wrapper.go b/src/jobservice/pool/redis_job_wrapper.go index 2584f5567..f0b948a7f 100644 --- a/src/jobservice/pool/redis_job_wrapper.go +++ b/src/jobservice/pool/redis_job_wrapper.go @@ -37,14 +37,16 @@ type RedisJob struct { job interface{} // the real job implementation context *env.Context // context statsManager opm.JobStatsManager // job stats manager + deDuplicator *DeDuplicator // handle unique job } // NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context, statsManager
opm.JobStatsManager) *RedisJob { +func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator *DeDuplicator) *RedisJob { return &RedisJob{ job: j, context: ctx, statsManager: statsManager, + deDuplicator: deDuplicator, } } @@ -114,6 +116,14 @@ func (rj *RedisJob) Run(j *work.Job) error { } }() + if j.Unique { + defer func() { + if err := rj.deDuplicator.DelUniqueSign(j.Name, j.Args); err != nil { + logger.Errorf("delete job unique sign error: %s", err) + } + }() + } + // Start to run rj.jobRunning(j.ID) diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index ac4b8e520..d41b74e74 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -59,6 +59,7 @@ type GoCraftWorkPool struct { scheduler period.Interface statsManager opm.JobStatsManager messageServer *MessageServer + deDuplicator *DeDuplicator // no need to sync as write once and then only read // key is name of known job @@ -79,6 +80,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr) sweeper := period.NewSweeper(namespace, redisPool, client) msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) + deDuplicator := NewDeDuplicator(namespace, redisPool) return &GoCraftWorkPool{ namespace: namespace, redisPool: redisPool, @@ -91,6 +93,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re statsManager: statsMgr, knownJobs: make(map[string]interface{}), messageServer: msgServer, + deDuplicator: deDuplicator, } } @@ -236,7 +239,7 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { } } - redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager) + redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager, gcwp.deDuplicator) // Get more info from j theJ := Wrap(j) @@ -276,15 +279,23 @@ func (gcwp *GoCraftWorkPool)
Enqueue(jobName string, params models.Parameters, i err error ) - // Enqueue job + // As the job is declared to be unique, + // check the uniqueness of the job, + // if no duplicated job existing (including the running jobs), + // set the unique flag. if isUnique { - j, err = gcwp.enqueuer.EnqueueUnique(jobName, params) - } else { - j, err = gcwp.enqueuer.Enqueue(jobName, params) - } + if err = gcwp.deDuplicator.Unique(jobName, params); err != nil { + return models.JobStats{}, err + } - if err != nil { - return models.JobStats{}, err + if j, err = gcwp.enqueuer.EnqueueUnique(jobName, params); err != nil { + return models.JobStats{}, err + } + } else { + // Enqueue job + if j, err = gcwp.enqueuer.Enqueue(jobName, params); err != nil { + return models.JobStats{}, err + } } // avoid backend pool bug @@ -307,15 +318,23 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, err error ) - // Enqueue job in + // As the job is declared to be unique, + // check the uniqueness of the job, + // if no duplicated job existing (including the running jobs), + // set the unique flag. if isUnique { - j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params) - } else { - j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params) - } + if err = gcwp.deDuplicator.Unique(jobName, params); err != nil { + return models.JobStats{}, err + } - if err != nil { - return models.JobStats{}, err + if j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params); err != nil { + return models.JobStats{}, err + } + } else { + // Enqueue job in + if j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params); err != nil { + return models.JobStats{}, err + } } // avoid backend pool bug