From ccd486a0ad1c15a0fcd2f753320f703397d7b8e4 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 30 Nov 2018 14:14:21 +0800 Subject: [PATCH 1/2] Support unique job constraints (both unique queue and running jobs) Signed-off-by: Steven Zou --- src/jobservice/config.yml | 2 +- src/jobservice/pool/de_duplicator.go | 94 ++++++++++++++++++++++++ src/jobservice/pool/redis_job_wrapper.go | 12 ++- src/jobservice/pool/redis_pool.go | 49 ++++++++---- 4 files changed, 140 insertions(+), 17 deletions(-) create mode 100644 src/jobservice/pool/de_duplicator.go diff --git a/src/jobservice/config.yml b/src/jobservice/config.yml index 22cd77ae9..fa0c5a7cf 100644 --- a/src/jobservice/config.yml +++ b/src/jobservice/config.yml @@ -29,7 +29,7 @@ job_loggers: - name: "FILE" level: "DEBUG" settings: # Customized settings of logger - base_dir: "tmp/job_logs" + base_dir: "/tmp/job_logs" sweeper: duration: 1 #days settings: # Customized settings of sweeper diff --git a/src/jobservice/pool/de_duplicator.go b/src/jobservice/pool/de_duplicator.go new file mode 100644 index 000000000..3653ae061 --- /dev/null +++ b/src/jobservice/pool/de_duplicator.go @@ -0,0 +1,94 @@ +package pool + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/goharbor/harbor/src/jobservice/models" + "github.com/goharbor/harbor/src/jobservice/utils" + "github.com/gomodule/redigo/redis" +) + +// DeDuplicator is designed to handle the uniqueness of the job. +// Once a job is declared to be unique, the job can be enqueued only if +// no same job (same job name and parameters) in the queue or running in progress. +// Adopt the same unique mechanism with the upstream framework. +type DeDuplicator struct { + // Redis namespace + namespace string + // Redis conn pool + pool *redis.Pool +} + +// NewDeDuplicator is constructor of DeDuplicator +func NewDeDuplicator(ns string, pool *redis.Pool) *DeDuplicator { + return &DeDuplicator{ + namespace: ns, + pool: pool, + } +} + +// Unique checks if the job is unique and set unique flag if it is not set yet. +func (dd *DeDuplicator) Unique(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) + if err != nil { + return fmt.Errorf("unique job error: %s", err) + } + + conn := dd.pool.Get() + defer conn.Close() + + args := []interface{}{ + uniqueKey, + 1, + "NX", + "EX", + 86400, + } + + res, err := redis.String(conn.Do("SET", args...)) + if err == nil && strings.ToUpper(res) == "OK" { + return nil + } + + return errors.New("unique job error: duplicated") +} + +// DelUniqueSign delete the job unique sign +func (dd *DeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) + if err != nil { + return fmt.Errorf("delete unique job error: %s", err) + } + + conn := dd.pool.Get() + defer conn.Close() + + if _, err := conn.Do("DEL", uniqueKey); err != nil { + return fmt.Errorf("delete unique job error: %s", err) + } + + return nil +} + +// Same key with upstream framework +func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { + var buf bytes.Buffer + + buf.WriteString(utils.KeyNamespacePrefix(namespace)) + buf.WriteString("unique:running:") + buf.WriteString(jobName) + buf.WriteRune(':') + + if args != nil { + err := json.NewEncoder(&buf).Encode(args) + if err != nil { + return "", err + } + } + + return buf.String(), nil +} diff --git a/src/jobservice/pool/redis_job_wrapper.go b/src/jobservice/pool/redis_job_wrapper.go index 2584f5567..f0b948a7f 100644 --- a/src/jobservice/pool/redis_job_wrapper.go +++ b/src/jobservice/pool/redis_job_wrapper.go @@ -37,14 +37,16 @@ type RedisJob struct { job interface{} // the real job implementation context *env.Context // context statsManager opm.JobStatsManager // job stats manager + deDuplicator *DeDuplicator // handle unique job } // NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager) *RedisJob { +func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator *DeDuplicator) *RedisJob { return &RedisJob{ job: j, context: ctx, statsManager: statsManager, + deDuplicator: deDuplicator, } } @@ -114,6 +116,14 @@ func (rj *RedisJob) Run(j *work.Job) error { } }() + if j.Unique { + defer func() { + if err := rj.deDuplicator.DelUniqueSign(j.Name, j.Args); err != nil { + logger.Errorf("delete job unique sign error: %s", err) + } + }() + } + // Start to run rj.jobRunning(j.ID) diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index ac4b8e520..d41b74e74 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -59,6 +59,7 @@ type GoCraftWorkPool struct { scheduler period.Interface statsManager opm.JobStatsManager messageServer *MessageServer + deDuplicator *DeDuplicator // no need to sync as write once and then only read // key is name of known job @@ -79,6 +80,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr) sweeper := period.NewSweeper(namespace, redisPool, client) msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) + deDepulicator := NewDeDuplicator(namespace, redisPool) return &GoCraftWorkPool{ namespace: namespace, redisPool: redisPool, @@ -91,6 +93,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re statsManager: statsMgr, knownJobs: make(map[string]interface{}), messageServer: msgServer, + deDuplicator: deDepulicator, } } @@ -236,7 +239,7 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { } } - redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager) + redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager, gcwp.deDuplicator) // Get more info from j theJ := Wrap(j) @@ -276,15 +279,23 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i err error ) - // Enqueue job + // As the job is declared to be unique, + // check the uniqueness of the job, + // if no duplicated job existing (including the running jobs), + // set the unique flag. if isUnique { - j, err = gcwp.enqueuer.EnqueueUnique(jobName, params) - } else { - j, err = gcwp.enqueuer.Enqueue(jobName, params) - } + if err = gcwp.deDuplicator.Unique(jobName, params); err != nil { + return models.JobStats{}, err + } - if err != nil { - return models.JobStats{}, err + if j, err = gcwp.enqueuer.EnqueueUnique(jobName, params); err != nil { + return models.JobStats{}, err + } + } else { + // Enqueue job + if j, err = gcwp.enqueuer.Enqueue(jobName, params); err != nil { + return models.JobStats{}, err + } } // avoid backend pool bug @@ -307,15 +318,23 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, err error ) - // Enqueue job in + // As the job is declared to be unique, + // check the uniqueness of the job, + // if no duplicated job existing (including the running jobs), + // set the unique flag. if isUnique { - j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params) - } else { - j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params) - } + if err = gcwp.deDuplicator.Unique(jobName, params); err != nil { + return models.JobStats{}, err + } - if err != nil { - return models.JobStats{}, err + if j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params); err != nil { + return models.JobStats{}, err + } + } else { + // Enqueue job in + if j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params); err != nil { + return models.JobStats{}, err + } } // avoid backend pool bug From 84d864607cb6cd671e8f0c1ffe8dc985d4a1c99f Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 30 Nov 2018 14:27:47 +0800 Subject: [PATCH 2/2] Add UT cases for new added code Signed-off-by: Steven Zou --- src/jobservice/pool/de_duplicator.go | 59 +++++++++++++++---- src/jobservice/pool/de_duplicator_test.go | 28 +++++++++ src/jobservice/pool/redis_job_wrapper.go | 4 +- src/jobservice/pool/redis_job_wrapper_test.go | 3 +- src/jobservice/pool/redis_pool.go | 4 +- 5 files changed, 80 insertions(+), 18 deletions(-) create mode 100644 src/jobservice/pool/de_duplicator_test.go diff --git a/src/jobservice/pool/de_duplicator.go b/src/jobservice/pool/de_duplicator.go index 3653ae061..91fe2d98d 100644 --- a/src/jobservice/pool/de_duplicator.go +++ b/src/jobservice/pool/de_duplicator.go @@ -7,6 +7,7 @@ import ( "fmt" "strings" + "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/utils" "github.com/gomodule/redigo/redis" @@ -16,29 +17,53 @@ import ( // Once a job is declared to be unique, the job can be enqueued only if // no same job (same job name and parameters) in the queue or running in progress. // Adopt the same unique mechanism with the upstream framework. -type DeDuplicator struct { +type DeDuplicator interface { + // Check the uniqueness of the unique job and set the unique flag if it is not set yet. + // + // Parameters: + // jobName string : name of the job + // params models.Parameters : parameters of the job + // + // Returns: + // If no unique flag and successfully set it, a nil error is returned; + // otherwise, a non nil error is returned. + Unique(jobName string, params models.Parameters) error + + // Remove the unique flag after job exiting + // Parameters: + // jobName string : name of the job + // params models.Parameters : parameters of the job + // + // Returns: + // If unique flag is successfully removed, a nil error is returned; + // otherwise, a non nil error is returned. + DelUniqueSign(jobName string, params models.Parameters) error +} + +// RedisDeDuplicator implement the DeDuplicator interface based on redis. +type RedisDeDuplicator struct { // Redis namespace namespace string // Redis conn pool pool *redis.Pool } -// NewDeDuplicator is constructor of DeDuplicator -func NewDeDuplicator(ns string, pool *redis.Pool) *DeDuplicator { - return &DeDuplicator{ +// NewRedisDeDuplicator is constructor of RedisDeDuplicator +func NewRedisDeDuplicator(ns string, pool *redis.Pool) *RedisDeDuplicator { + return &RedisDeDuplicator{ namespace: ns, pool: pool, } } // Unique checks if the job is unique and set unique flag if it is not set yet. -func (dd *DeDuplicator) Unique(jobName string, params models.Parameters) error { - uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) +func (rdd *RedisDeDuplicator) Unique(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(rdd.namespace, jobName, params) if err != nil { return fmt.Errorf("unique job error: %s", err) } - conn := dd.pool.Get() + conn := rdd.pool.Get() defer conn.Close() args := []interface{}{ @@ -50,21 +75,29 @@ func (dd *DeDuplicator) Unique(jobName string, params models.Parameters) error { } res, err := redis.String(conn.Do("SET", args...)) - if err == nil && strings.ToUpper(res) == "OK" { - return nil + if err == redis.ErrNil { + return errs.ConflictError(uniqueKey) } - return errors.New("unique job error: duplicated") + if err == nil { + if strings.ToUpper(res) == "OK" { + return nil + } + + return errors.New("unique job error: missing 'OK' reply") + } + + return err } // DelUniqueSign delete the job unique sign -func (dd *DeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error { - uniqueKey, err := redisKeyUniqueJob(dd.namespace, jobName, params) +func (rdd *RedisDeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error { + uniqueKey, err := redisKeyUniqueJob(rdd.namespace, jobName, params) if err != nil { return fmt.Errorf("delete unique job error: %s", err) } - conn := dd.pool.Get() + conn := rdd.pool.Get() defer conn.Close() if _, err := conn.Do("DEL", uniqueKey); err != nil { diff --git a/src/jobservice/pool/de_duplicator_test.go b/src/jobservice/pool/de_duplicator_test.go new file mode 100644 index 000000000..e1607ae0f --- /dev/null +++ b/src/jobservice/pool/de_duplicator_test.go @@ -0,0 +1,28 @@ +package pool + +import ( + "testing" + + "github.com/goharbor/harbor/src/jobservice/tests" +) + +func TestDeDuplicator(t *testing.T) { + jobName := "fake_job" + jobParams := map[string]interface{}{ + "image": "ubuntu:latest", + } + + rdd := NewRedisDeDuplicator(tests.GiveMeTestNamespace(), rPool) + + if err := rdd.Unique(jobName, jobParams); err != nil { + t.Error(err) + } + + if err := rdd.Unique(jobName, jobParams); err == nil { + t.Errorf("expect duplicated error but got nil error") + } + + if err := rdd.DelUniqueSign(jobName, jobParams); err != nil { + t.Error(err) + } +} diff --git a/src/jobservice/pool/redis_job_wrapper.go b/src/jobservice/pool/redis_job_wrapper.go index f0b948a7f..2b196848a 100644 --- a/src/jobservice/pool/redis_job_wrapper.go +++ b/src/jobservice/pool/redis_job_wrapper.go @@ -37,11 +37,11 @@ type RedisJob struct { job interface{} // the real job implementation context *env.Context // context statsManager opm.JobStatsManager // job stats manager - deDuplicator *DeDuplicator // handle unique job + deDuplicator DeDuplicator // handle unique job } // NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator *DeDuplicator) *RedisJob { +func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator DeDuplicator) *RedisJob { return &RedisJob{ job: j, context: ctx, diff --git a/src/jobservice/pool/redis_job_wrapper_test.go b/src/jobservice/pool/redis_job_wrapper_test.go index 3511cdfee..c5d4122e3 100644 --- a/src/jobservice/pool/redis_job_wrapper_test.go +++ b/src/jobservice/pool/redis_job_wrapper_test.go @@ -50,7 +50,8 @@ func TestJobWrapper(t *testing.T) { WG: &sync.WaitGroup{}, ErrorChan: make(chan error, 1), // with 1 buffer } - wrapper := NewRedisJob((*fakeParentJob)(nil), envContext, mgr) + deDuplicator := NewRedisDeDuplicator(tests.GiveMeTestNamespace(), rPool) + wrapper := NewRedisJob((*fakeParentJob)(nil), envContext, mgr, deDuplicator) j := &work.Job{ ID: "FAKE", Name: "DEMO", diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index d41b74e74..362af68b6 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -59,7 +59,7 @@ type GoCraftWorkPool struct { scheduler period.Interface statsManager opm.JobStatsManager messageServer *MessageServer - deDuplicator *DeDuplicator + deDuplicator DeDuplicator // no need to sync as write once and then only read // key is name of known job @@ -80,7 +80,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr) sweeper := period.NewSweeper(namespace, redisPool, client) msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) - deDepulicator := NewDeDuplicator(namespace, redisPool) + deDepulicator := NewRedisDeDuplicator(namespace, redisPool) return &GoCraftWorkPool{ namespace: namespace, redisPool: redisPool,