Merge pull request #6396 from steven-zou/unique_job

Support unique job constraints (both unique queue and running jobs)
This commit is contained in:
Steven Zou 2018-11-30 17:03:43 +08:00 committed by GitHub
commit 9c76c375ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 203 additions and 18 deletions

View File

@ -29,7 +29,7 @@ job_loggers:
- name: "FILE" - name: "FILE"
level: "DEBUG" level: "DEBUG"
settings: # Customized settings of logger settings: # Customized settings of logger
base_dir: "tmp/job_logs" base_dir: "/tmp/job_logs"
sweeper: sweeper:
duration: 1 #days duration: 1 #days
settings: # Customized settings of sweeper settings: # Customized settings of sweeper

View File

@ -0,0 +1,127 @@
package pool
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/utils"
"github.com/gomodule/redigo/redis"
)
// DeDuplicator is designed to handle the uniqueness of the job.
// Once a job is declared to be unique, the job can be enqueued only if
// no same job (same job name and parameters) is in the queue or running in progress.
// Adopt the same unique mechanism with the upstream framework.
type DeDuplicator interface {
	// Unique checks the uniqueness of the unique job and sets the unique flag if it is not set yet.
	//
	// Parameters:
	//  jobName string           : name of the job
	//  params models.Parameters : parameters of the job
	//
	// Returns:
	//  If no unique flag and successfully set it, a nil error is returned;
	//  otherwise, a non nil error is returned.
	Unique(jobName string, params models.Parameters) error

	// DelUniqueSign removes the unique flag after job exiting.
	//
	// Parameters:
	//  jobName string           : name of the job
	//  params models.Parameters : parameters of the job
	//
	// Returns:
	//  If unique flag is successfully removed, a nil error is returned;
	//  otherwise, a non nil error is returned.
	DelUniqueSign(jobName string, params models.Parameters) error
}
// RedisDeDuplicator implements the DeDuplicator interface based on redis.
// The unique flag of a job is kept as a redis key derived from the job
// name and its parameters (see redisKeyUniqueJob).
type RedisDeDuplicator struct {
	// Redis namespace used to prefix every key created by this de-duplicator
	namespace string
	// Redis conn pool providing the connections for the flag operations
	pool *redis.Pool
}
// NewRedisDeDuplicator is constructor of RedisDeDuplicator.
// ns is the redis key namespace and pool supplies the redis connections.
func NewRedisDeDuplicator(ns string, pool *redis.Pool) *RedisDeDuplicator {
	rdd := &RedisDeDuplicator{
		namespace: ns,
		pool:      pool,
	}

	return rdd
}
// Unique checks if the job is unique and set unique flag if it is not set yet.
//
// The flag is created with "SET key 1 NX EX <ttl>": it is written only when
// the key is absent, and it expires automatically as a safety net in case the
// owning job never removes it (e.g. the worker dies before DelUniqueSign runs).
func (rdd *RedisDeDuplicator) Unique(jobName string, params models.Parameters) error {
	// Safety-net TTL of the unique flag: 1 day, adopting the same unique
	// mechanism with the upstream framework.
	const uniqueFlagExpirySeconds = 86400

	uniqueKey, err := redisKeyUniqueJob(rdd.namespace, jobName, params)
	if err != nil {
		return fmt.Errorf("unique job error: %s", err)
	}

	conn := rdd.pool.Get()
	defer conn.Close()

	// With NX the reply is nil (redis.ErrNil) when the key already exists,
	// which means a duplicated job holds the flag.
	res, err := redis.String(conn.Do("SET", uniqueKey, 1, "NX", "EX", uniqueFlagExpirySeconds))
	switch {
	case err == redis.ErrNil:
		return errs.ConflictError(uniqueKey)
	case err != nil:
		return err
	case strings.ToUpper(res) == "OK":
		// Flag successfully set; the job is unique so far.
		return nil
	default:
		return errors.New("unique job error: missing 'OK' reply")
	}
}
// DelUniqueSign delete the job unique sign.
// It is a best-effort cleanup; the flag would otherwise expire on its own.
func (rdd *RedisDeDuplicator) DelUniqueSign(jobName string, params models.Parameters) error {
	key, err := redisKeyUniqueJob(rdd.namespace, jobName, params)
	if err != nil {
		return fmt.Errorf("delete unique job error: %s", err)
	}

	conn := rdd.pool.Get()
	defer conn.Close()

	// Remove the unique flag so a same job can be enqueued again.
	_, err = conn.Do("DEL", key)
	if err != nil {
		return fmt.Errorf("delete unique job error: %s", err)
	}

	return nil
}
// Same key with upstream framework.
// The key layout is "<ns-prefix>unique:running:<jobName>:<json-encoded-args>";
// the args part is omitted when args is nil.
func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
	var b bytes.Buffer

	fmt.Fprintf(&b, "%sunique:running:%s:", utils.KeyNamespacePrefix(namespace), jobName)

	if args != nil {
		// Encode appends the serialized args (plus a trailing newline,
		// matching the upstream framework's key format).
		if err := json.NewEncoder(&b).Encode(args); err != nil {
			return "", err
		}
	}

	return b.String(), nil
}

View File

@ -0,0 +1,28 @@
package pool
import (
"testing"
"github.com/goharbor/harbor/src/jobservice/tests"
)
// TestDeDuplicator covers the claim/conflict/release cycle of the
// redis-based de-duplicator.
func TestDeDuplicator(t *testing.T) {
	const jobName = "fake_job"
	jobParams := map[string]interface{}{"image": "ubuntu:latest"}

	rdd := NewRedisDeDuplicator(tests.GiveMeTestNamespace(), rPool)

	// First claim of the unique flag must succeed.
	if err := rdd.Unique(jobName, jobParams); err != nil {
		t.Error(err)
	}

	// A second claim with the same name/params must be rejected.
	if err := rdd.Unique(jobName, jobParams); err == nil {
		t.Errorf("expect duplicated error but got nil error")
	}

	// Releasing the sign must succeed.
	if err := rdd.DelUniqueSign(jobName, jobParams); err != nil {
		t.Error(err)
	}
}

View File

@ -37,14 +37,16 @@ type RedisJob struct {
job interface{} // the real job implementation job interface{} // the real job implementation
context *env.Context // context context *env.Context // context
statsManager opm.JobStatsManager // job stats manager statsManager opm.JobStatsManager // job stats manager
deDuplicator DeDuplicator // handle unique job
} }
// NewRedisJob is constructor of RedisJob // NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager) *RedisJob { func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager, deDuplicator DeDuplicator) *RedisJob {
return &RedisJob{ return &RedisJob{
job: j, job: j,
context: ctx, context: ctx,
statsManager: statsManager, statsManager: statsManager,
deDuplicator: deDuplicator,
} }
} }
@ -114,6 +116,14 @@ func (rj *RedisJob) Run(j *work.Job) error {
} }
}() }()
if j.Unique {
defer func() {
if err := rj.deDuplicator.DelUniqueSign(j.Name, j.Args); err != nil {
logger.Errorf("delete job unique sign error: %s", err)
}
}()
}
// Start to run // Start to run
rj.jobRunning(j.ID) rj.jobRunning(j.ID)

View File

@ -50,7 +50,8 @@ func TestJobWrapper(t *testing.T) {
WG: &sync.WaitGroup{}, WG: &sync.WaitGroup{},
ErrorChan: make(chan error, 1), // with 1 buffer ErrorChan: make(chan error, 1), // with 1 buffer
} }
wrapper := NewRedisJob((*fakeParentJob)(nil), envContext, mgr) deDuplicator := NewRedisDeDuplicator(tests.GiveMeTestNamespace(), rPool)
wrapper := NewRedisJob((*fakeParentJob)(nil), envContext, mgr, deDuplicator)
j := &work.Job{ j := &work.Job{
ID: "FAKE", ID: "FAKE",
Name: "DEMO", Name: "DEMO",

View File

@ -59,6 +59,7 @@ type GoCraftWorkPool struct {
scheduler period.Interface scheduler period.Interface
statsManager opm.JobStatsManager statsManager opm.JobStatsManager
messageServer *MessageServer messageServer *MessageServer
deDuplicator DeDuplicator
// no need to sync as write once and then only read // no need to sync as write once and then only read
// key is name of known job // key is name of known job
@ -79,6 +80,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re
scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr) scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr)
sweeper := period.NewSweeper(namespace, redisPool, client) sweeper := period.NewSweeper(namespace, redisPool, client)
msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool)
deDepulicator := NewRedisDeDuplicator(namespace, redisPool)
return &GoCraftWorkPool{ return &GoCraftWorkPool{
namespace: namespace, namespace: namespace,
redisPool: redisPool, redisPool: redisPool,
@ -91,6 +93,7 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re
statsManager: statsMgr, statsManager: statsMgr,
knownJobs: make(map[string]interface{}), knownJobs: make(map[string]interface{}),
messageServer: msgServer, messageServer: msgServer,
deDuplicator: deDepulicator,
} }
} }
@ -236,7 +239,7 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
} }
} }
redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager) redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager, gcwp.deDuplicator)
// Get more info from j // Get more info from j
theJ := Wrap(j) theJ := Wrap(j)
@ -276,16 +279,24 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i
err error err error
) )
// Enqueue job // As the job is declared to be unique,
// check the uniqueness of the job,
// if no duplicated job existing (including the running jobs),
// set the unique flag.
if isUnique { if isUnique {
j, err = gcwp.enqueuer.EnqueueUnique(jobName, params) if err = gcwp.deDuplicator.Unique(jobName, params); err != nil {
} else { return models.JobStats{}, err
j, err = gcwp.enqueuer.Enqueue(jobName, params)
} }
if err != nil { if j, err = gcwp.enqueuer.EnqueueUnique(jobName, params); err != nil {
return models.JobStats{}, err return models.JobStats{}, err
} }
} else {
// Enqueue job
if j, err = gcwp.enqueuer.Enqueue(jobName, params); err != nil {
return models.JobStats{}, err
}
}
// avoid backend pool bug // avoid backend pool bug
if j == nil { if j == nil {
@ -307,16 +318,24 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters,
err error err error
) )
// Enqueue job in // As the job is declared to be unique,
// check the uniqueness of the job,
// if no duplicated job existing (including the running jobs),
// set the unique flag.
if isUnique { if isUnique {
j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params) if err = gcwp.deDuplicator.Unique(jobName, params); err != nil {
} else { return models.JobStats{}, err
j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params)
} }
if err != nil { if j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params); err != nil {
return models.JobStats{}, err return models.JobStats{}, err
} }
} else {
// Enqueue job in
if j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params); err != nil {
return models.JobStats{}, err
}
}
// avoid backend pool bug // avoid backend pool bug
if j == nil { if j == nil {