From 2f979704849e5164f2b8ffe8aa5cfdba9a614445 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Thu, 15 Mar 2018 20:23:42 +0800 Subject: [PATCH] Support update job status for both generic and periodic job refactor scheduler refactor period.enqueuer add stats manager component --- src/jobservice_v2/job/impl/replication_job.go | 7 +- src/jobservice_v2/job/job_status.go | 12 +- src/jobservice_v2/job/redis_job_wrapper.go | 22 +- src/jobservice_v2/opm/job_operator.go | 3 + src/jobservice_v2/opm/job_stats_mgr.go | 32 +++ src/jobservice_v2/opm/redis_job_stats_mgr.go | 267 ++++++++++++++++++ src/jobservice_v2/period/enqueuer.go | 10 +- src/jobservice_v2/period/interface.go | 3 +- src/jobservice_v2/period/redis_scheduler.go | 122 ++++++-- src/jobservice_v2/pool/redis_pool.go | 190 ++++--------- src/jobservice_v2/utils/gocarft_work.go | 12 + src/jobservice_v2/utils/keys.go | 42 +-- 12 files changed, 508 insertions(+), 214 deletions(-) create mode 100644 src/jobservice_v2/opm/job_operator.go create mode 100644 src/jobservice_v2/opm/job_stats_mgr.go create mode 100644 src/jobservice_v2/opm/redis_job_stats_mgr.go diff --git a/src/jobservice_v2/job/impl/replication_job.go b/src/jobservice_v2/job/impl/replication_job.go index 927b977e5..f5af4f902 100644 --- a/src/jobservice_v2/job/impl/replication_job.go +++ b/src/jobservice_v2/job/impl/replication_job.go @@ -5,6 +5,8 @@ package impl import ( "errors" "fmt" + "strings" + "time" "github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/job" @@ -28,7 +30,7 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error { return errors.New("missing parameter 'image'") } - if name != "demo steven" { + if !strings.HasPrefix(name.(string), "demo") { return fmt.Errorf("expected '%s' but got '%s'", "demo steven", name) } @@ -41,5 +43,8 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}, fmt.Printf("params: %#v\n", params) fmt.Printf("context: %#v\n", ctx) + //HOLD ON FOR A WHILE + fmt.Println("Holding for 10 sec") + <-time.After(10 * time.Second) return nil } diff --git a/src/jobservice_v2/job/job_status.go b/src/jobservice_v2/job/job_status.go index 0db8d4126..500b08c66 100644 --- a/src/jobservice_v2/job/job_status.go +++ b/src/jobservice_v2/job/job_status.go @@ -3,14 +3,18 @@ package job const ( - //JobStatusPending : job status pending + //JobStatusPending : job status pending JobStatusPending = "Pending" - //JobStatusRunning : job status running + //JobStatusRunning : job status running JobStatusRunning = "Running" - //JobStatusStopped : job status stopped + //JobStatusStopped : job status stopped JobStatusStopped = "Stopped" //JobStatusCancelled : job status cancelled JobStatusCancelled = "Cancelled" - //JobStatusError : job status error + //JobStatusError : job status error JobStatusError = "Error" + //JobStatusSuccess : job status success + JobStatusSuccess = "Success" + //JobStatusScheduled : job status scheduled + JobStatusScheduled = "Scheduled" ) diff --git a/src/jobservice_v2/job/redis_job_wrapper.go b/src/jobservice_v2/job/redis_job_wrapper.go index 1666c97ed..c18d7883e 100644 --- a/src/jobservice_v2/job/redis_job_wrapper.go +++ b/src/jobservice_v2/job/redis_job_wrapper.go @@ -7,15 +7,19 @@ import ( "github.com/vmware/harbor/src/jobservice_v2/env" ) +//StatusChangeCallback is the func called when job status changed +type StatusChangeCallback func(jobID string, status string) + //RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. type RedisJob struct { - job interface{} - context *env.Context + job interface{} + context *env.Context + callback StatusChangeCallback } //NewRedisJob is constructor of RedisJob -func NewRedisJob(j interface{}, ctx *env.Context) *RedisJob { - return &RedisJob{j, ctx} +func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback) *RedisJob { + return &RedisJob{j, ctx, statusChangeCallback} } //Run the job @@ -33,10 +37,18 @@ func (rj *RedisJob) Run(j *work.Job) error { //Inject data runningJob := Wrap(rj.job) - //TODO: Update job status to 'Running' + //Start to run + rj.callback(j.ID, JobStatusRunning) + //TODO: Check function should be defined err = runningJob.Run(execContext, j.Args, nil) + if err == nil { + rj.callback(j.ID, JobStatusSuccess) + } else { + rj.callback(j.ID, JobStatusError) + } + //TODO: //If error is stopped error, update status to 'Stopped' and return nil //If error is cancelled error, update status to 'Cancelled' and return err diff --git a/src/jobservice_v2/opm/job_operator.go b/src/jobservice_v2/opm/job_operator.go new file mode 100644 index 000000000..cb4e8f22c --- /dev/null +++ b/src/jobservice_v2/opm/job_operator.go @@ -0,0 +1,3 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package opm diff --git a/src/jobservice_v2/opm/job_stats_mgr.go b/src/jobservice_v2/opm/job_stats_mgr.go new file mode 100644 index 000000000..4d3e308ee --- /dev/null +++ b/src/jobservice_v2/opm/job_stats_mgr.go @@ -0,0 +1,32 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package opm + +import "github.com/vmware/harbor/src/jobservice_v2/models" + +//JobStatsManager defines the methods to handle stats of job. +type JobStatsManager interface { + //Start to serve + Start() + + //Stop to serve + Stop() + + //Save the job stats + //Async method to retry and improve performance + // + //jobStats models.JobStats : the job stats to be saved + Save(jobStats models.JobStats) + + //Get the job stats from backend store + //Sync method as we need the data + // + //Returns: + // models.JobStats : job stats data + // error : error if meet any problems + Retrieve(jobID string) (models.JobStats, error) + + //SetJobStatus will mark the status of job to the specified one + //Async method to retry + SetJobStatus(jobID string, status string) +} diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go new file mode 100644 index 000000000..b00b608a2 --- /dev/null +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -0,0 +1,267 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package opm + +import ( + "context" + "math/rand" + "strconv" + "time" + + "github.com/garyburd/redigo/redis" + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +const ( + processBufferSize = 1024 + opSaveStats = "save_job_stats" + opUpdateStatus = "update_job_status" + maxFails = 3 +) + +type queueItem struct { + op string + fails uint + data interface{} +} + +//RedisJobStatsManager implements JobStatsManager based on redis. +type RedisJobStatsManager struct { + namespace string + redisPool *redis.Pool + context context.Context + + stopChan chan struct{} + doneChan chan struct{} + processChan chan *queueItem + isRunning bool //no need to sync +} + +//NewRedisJobStatsManager is constructor of RedisJobStatsManager +func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager { + return &RedisJobStatsManager{ + namespace: namespace, + context: ctx, + redisPool: redisPool, + stopChan: make(chan struct{}, 1), + doneChan: make(chan struct{}, 1), + processChan: make(chan *queueItem, processBufferSize), + } +} + +//Start is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) Start() { + if rjs.isRunning { + return + } + go rjs.loop() + rjs.isRunning = true +} + +//Stop is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) Stop() { + if !rjs.isRunning { + return + } + rjs.stopChan <- struct{}{} + <-rjs.doneChan +} + +//Save is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) { + item := &queueItem{ + op: opSaveStats, + data: jobStats, + } + + rjs.processChan <- item +} + +//Retrieve is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobStats(rjs.namespace, jobID) + vals, err := redis.Strings(conn.Do("HGETALL", key)) + if err != nil { + return models.JobStats{}, err + } + + res := models.JobStats{ + Stats: &models.JobStatData{}, + } + for i, l := 0, len(vals); i < l; i = i + 2 { + prop := vals[i] + value := vals[i+1] + switch prop { + case "id": + res.Stats.JobID = value + break + case "name": + res.Stats.JobName = value + break + case "kind": + res.Stats.JobKind = value + case "unique": + v, err := strconv.ParseBool(value) + if err != nil { + v = false + } + res.Stats.IsUnique = v + case "status": + res.Stats.Status = value + break + case "ref_link": + res.Stats.RefLink = value + break + case "enqueue_time": + v, _ := strconv.ParseInt(value, 10, 64) + res.Stats.EnqueueTime = v + break + case "update_time": + v, _ := strconv.ParseInt(value, 10, 64) + res.Stats.UpdateTime = v + break + case "run_at": + v, _ := strconv.ParseInt(value, 10, 64) + res.Stats.RunAt = v + break + case "check_in_at": + v, _ := strconv.ParseInt(value, 10, 64) + res.Stats.CheckInAt = v + break + case "check_in": + res.Stats.CheckIn = value + break + default: + } + } + + return res, nil +} + +//SetJobStatus is implementation of same method in JobStatsManager interface. +func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) { + item := &queueItem{ + op: opUpdateStatus, + data: []string{jobID, status}, + } + + rjs.processChan <- item +} + +func (rjs *RedisJobStatsManager) loop() { + controlChan := make(chan struct{}) + + defer func() { + rjs.isRunning = false + //Notify other sub goroutines + close(controlChan) + log.Info("Redis job stats manager is stopped") + }() + + for { + select { + case item := <-rjs.processChan: + if err := rjs.process(item); err != nil { + item.fails++ + if item.fails < maxFails { + //Retry after a random interval + go func() { + timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) + defer timer.Stop() + + select { + case <-timer.C: + rjs.processChan <- item + return + case <-controlChan: + } + }() + } else { + log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) + } + } + break + case <-rjs.stopChan: + rjs.doneChan <- struct{}{} + return + case <-rjs.context.Done(): + return + } + } +} + +func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobStats(rjs.namespace, jobID) + args := make([]interface{}, 0, 3) + args = append(args, key, "status", status, "update_time", time.Now().Unix()) + _, err := conn.Do("HMSET", args...) + + return err +} + +func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error { + conn := rjs.redisPool.Get() + defer conn.Close() + + key := utils.KeyJobStats(rjs.namespace, jobStats.Stats.JobID) + args := make([]interface{}, 0) + args = append(args, key) + args = append(args, + "id", jobStats.Stats.JobID, + "name", jobStats.Stats.JobName, + "kind", jobStats.Stats.JobKind, + "unique", jobStats.Stats.IsUnique, + "status", jobStats.Stats.Status, + "ref_link", jobStats.Stats.RefLink, + "enqueue_time", jobStats.Stats.EnqueueTime, + "update_time", jobStats.Stats.UpdateTime, + "run_at", jobStats.Stats.RunAt, + ) + if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) { + args = append(args, + "check_in", jobStats.Stats.CheckIn, + "check_in_at", jobStats.Stats.CheckInAt, + ) + } + + conn.Send("HMSET", args...) + //If job kind is periodic job, expire time should not be set + //If job kind is scheduled job, expire time should be runAt+1day + if jobStats.Stats.JobKind != job.JobKindPeriodic { + var expireTime int64 = 60 * 60 * 24 + if jobStats.Stats.JobKind == job.JobKindScheduled { + nowTime := time.Now().Unix() + future := jobStats.Stats.RunAt - nowTime + if future > 0 { + expireTime += future + } + } + conn.Send("EXPIRE", key, expireTime) + } + + return conn.Flush() +} + +func (rjs *RedisJobStatsManager) process(item *queueItem) error { + switch item.op { + case opSaveStats: + jobStats := item.data.(models.JobStats) + return rjs.saveJobStats(jobStats) + case opUpdateStatus: + data := item.data.([]string) + return rjs.updateJobStatus(data[0], data[1]) + default: + break + } + + return nil +} diff --git a/src/jobservice_v2/period/enqueuer.go b/src/jobservice_v2/period/enqueuer.go index 26681347c..f64c0084d 100644 --- a/src/jobservice_v2/period/enqueuer.go +++ b/src/jobservice_v2/period/enqueuer.go @@ -10,6 +10,7 @@ import ( "github.com/gocraft/work" "github.com/robfig/cron" "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/jobservice_v2/job" "github.com/vmware/harbor/src/jobservice_v2/utils" ) @@ -113,11 +114,9 @@ func (pe *periodicEnqueuer) enqueue() error { } for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) { epoch := t.Unix() - id := utils.MakeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related with it - job := &work.Job{ Name: pj.jobName, - ID: id, + ID: pl.PolicyID, //Same with the id of the policy it's being scheduled for // This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history. EnqueuedAt: epoch, @@ -134,8 +133,11 @@ func (pe *periodicEnqueuer) enqueue() error { return err } - log.Infof("Schedule job %s for policy %s\n", pj.jobName, pl.PolicyID) + log.Infof("Schedule job %s for policy %s at %d\n", pj.jobName, pl.PolicyID, epoch) } + //Directly use redis conn to update the periodic job (policy) status + //Do not care the result + conn.Do("HMSET", utils.KeyJobStats(pe.namespace, pl.PolicyID), "status", job.JobStatusScheduled, "update_time", time.Now().Unix()) } _, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now) diff --git a/src/jobservice_v2/period/interface.go b/src/jobservice_v2/period/interface.go index 605d41d8a..2b1cded82 100644 --- a/src/jobservice_v2/period/interface.go +++ b/src/jobservice_v2/period/interface.go @@ -14,8 +14,9 @@ type Interface interface { // //Returns: // The uuid of the cron job policy + // The latest next trigger time // error if failed to schedule - Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) + Schedule(jobName string, params models.Parameters, cronSpec string) (string, int64, error) //Unschedule the specified cron job policy. // diff --git a/src/jobservice_v2/period/redis_scheduler.go b/src/jobservice_v2/period/redis_scheduler.go index 032b3a6d3..734d3df61 100644 --- a/src/jobservice_v2/period/redis_scheduler.go +++ b/src/jobservice_v2/period/redis_scheduler.go @@ -6,11 +6,12 @@ import ( "context" "encoding/json" "errors" - "fmt" "strconv" "sync" "time" + "github.com/robfig/cron" + "github.com/garyburd/redigo/redis" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice_v2/models" @@ -131,12 +132,18 @@ func (rps *RedisPeriodicScheduler) Start() error { } //Schedule is implementation of the same method in period.Interface -func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) { +func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, int64, error) { if utils.IsEmptyStr(jobName) { - return "", errors.New("empty job name is not allowed") + return "", 0, errors.New("empty job name is not allowed") } if utils.IsEmptyStr(cronSpec) { - return "", errors.New("cron spec is not set") + return "", 0, errors.New("cron spec is not set") + } + + //Get next run time + schedule, err := cron.Parse(cronSpec) + if err != nil { + return "", 0, err } //Although the ZSET can guarantee no duplicated items, we still need to check the existing @@ -150,13 +157,14 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame //Serialize data rawJSON, err := jobPolicy.serialize() if err != nil { - return "", nil + return "", 0, nil } //Check existing //If existing, treat as a succeed submitting and return the exitsing id if score, ok := rps.exists(string(rawJSON)); ok { - return utils.MakePeriodicPolicyUUIDWithScore(score), nil + id, err := rps.getIDByScore(score) + return id, 0, err } uuid, score := utils.MakePeriodicPolicyUUID() @@ -168,21 +176,35 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame } rawJSON2, err := notification.serialize() if err != nil { - return "", err + return "", 0, err } //Save to redis db and publish notification via redis transaction conn := rps.redisPool.Get() defer conn.Close() - conn.Send("MULTI") - conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON) - conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2) - if _, err := conn.Do("EXEC"); err != nil { - return "", err + err = conn.Send("MULTI") + if err != nil { + return "", 0, err + } + err = conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON) + if err != nil { + return "", 0, err + } + err = conn.Send("ZADD", utils.KeyPeriodicPolicyScore(rps.namespace), score, uuid) + if err != nil { + return "", 0, err + } + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2) + if err != nil { + return "", 0, err } - return uuid, nil + if _, err := conn.Do("EXEC"); err != nil { + return "", 0, err + } + + return uuid, schedule.Next(time.Now()).Unix(), nil } //UnSchedule is implementation of the same method in period.Interface @@ -191,9 +213,9 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { return errors.New("cron job policy ID is empty") } - score := utils.ExtractScoreFromUUID(cronJobPolicyID) - if score == 0 { - return fmt.Errorf("The ID '%s' is not valid", cronJobPolicyID) + score, err := rps.getScoreByID(cronJobPolicyID) + if err != nil { + return err } notification := &periodicJobPolicyEvent{ @@ -212,9 +234,23 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { conn := rps.redisPool.Get() defer conn.Close() - conn.Send("MULTI") - conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score - conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON) + err = conn.Send("MULTI") + if err != nil { + return err + } + err = conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score + if err != nil { + return err + } + err = conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), score, score) //Remove key score mapping + if err != nil { + return err + } + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON) + if err != nil { + return err + } + _, err = conn.Do("EXEC") return err @@ -225,12 +261,29 @@ func (rps *RedisPeriodicScheduler) Load() error { conn := rps.redisPool.Get() defer conn.Close() - bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES")) + //Let's build key score mapping locally first + bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicyScore(rps.namespace), 0, -1, "WITHSCORES")) + if err != nil { + return err + } + keyScoreMap := make(map[int64]string) + for i, l := 0, len(bytes); i < l; i = i + 2 { + pid := string(bytes[i].([]byte)) + rawScore := bytes[i+1].([]byte) + score, err := strconv.ParseInt(string(rawScore), 10, 64) + if err != nil { + //Ignore + continue + } + keyScoreMap[score] = pid + } + + bytes, err = redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES")) if err != nil { return err } - allPeriodicPolicies := make([]*periodicJobPolicy, 0) + allPeriodicPolicies := make([]*periodicJobPolicy, 0, len(bytes)/2) for i, l := 0, len(bytes); i < l; i = i + 2 { rawPolicy := bytes[i].([]byte) rawScore := bytes[i+1].([]byte) @@ -251,7 +304,13 @@ func (rps *RedisPeriodicScheduler) Load() error { } //Set back the policy ID - policy.PolicyID = utils.MakePeriodicPolicyUUIDWithScore(score) + if pid, ok := keyScoreMap[score]; ok { + policy.PolicyID = pid + } else { + //Something wrong, should not be happended + //ignore here + continue + } allPeriodicPolicies = append(allPeriodicPolicies, policy) } @@ -286,6 +345,25 @@ func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) { return count, err == nil } +func (rps *RedisPeriodicScheduler) getScoreByID(id string) (int64, error) { + conn := rps.redisPool.Get() + defer conn.Close() + + return redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), id)) +} + +func (rps *RedisPeriodicScheduler) getIDByScore(score int64) (string, error) { + conn := rps.redisPool.Get() + defer conn.Close() + + ids, err := redis.Strings(conn.Do("ZRANGEBYSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), score, score)) + if err != nil { + return "", err + } + + return ids[0], nil +} + func readMessage(data []byte) *periodicJobPolicyEvent { if data == nil || len(data) == 0 { return nil diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index 659b5e46a..93c31abce 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "os" - "strconv" "time" "github.com/garyburd/redigo/redis" @@ -15,6 +14,7 @@ import ( "github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/job" "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/opm" "github.com/vmware/harbor/src/jobservice_v2/period" "github.com/vmware/harbor/src/jobservice_v2/utils" ) @@ -34,14 +34,15 @@ const ( //GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis. type GoCraftWorkPool struct { - namespace string - redisPool *redis.Pool - pool *work.WorkerPool - enqueuer *work.Enqueuer - sweeper *period.Sweeper - client *work.Client - context *env.Context - scheduler period.Interface + namespace string + redisPool *redis.Pool + pool *work.WorkerPool + enqueuer *work.Enqueuer + sweeper *period.Sweeper + client *work.Client + context *env.Context + scheduler period.Interface + statsManager opm.JobStatsManager //no need to sync as write once and then only read //key is name of known job @@ -82,16 +83,18 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool client := work.NewClient(cfg.Namespace, redisPool) scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool) sweeper := period.NewSweeper(cfg.Namespace, redisPool, client) + statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool) return &GoCraftWorkPool{ - namespace: cfg.Namespace, - redisPool: redisPool, - pool: pool, - enqueuer: enqueuer, - scheduler: scheduler, - sweeper: sweeper, - client: client, - context: ctx, - knownJobs: make(map[string]interface{}), + namespace: cfg.Namespace, + redisPool: redisPool, + pool: pool, + enqueuer: enqueuer, + scheduler: scheduler, + sweeper: sweeper, + client: client, + context: ctx, + statsManager: statsMgr, + knownJobs: make(map[string]interface{}), } } @@ -112,7 +115,13 @@ func (gcwp *GoCraftWorkPool) Start() { go func() { defer func() { gcwp.context.WG.Done() + gcwp.statsManager.Stop() }() + //Start stats manager + //None-blocking + gcwp.statsManager.Start() + log.Info("Redis job stats manager is started") + //blocking call if err := gcwp.scheduler.Start(); err != nil { //Scheduler exits with error @@ -164,7 +173,10 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { } //Use redis job wrapper pointer to keep the data required by the job.Interface. - redisJob := job.NewRedisJob(j, gcwp.context) + statusChangeCallback := func(jobID string, status string) { + gcwp.statsManager.SetJobStatus(jobID, status) + } + redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback) //Get more info from j theJ := job.Wrap(j) @@ -214,11 +226,10 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i } res := generateResult(j, job.JobKindGeneric, isUnique) - if err := gcwp.saveJobStats(res); err != nil { - //Once running job, let it fly away - //The client method may help if the job is still in progress when get stats of this job - log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err) - } + //Save data with async way. Once it fails to do, let it escape + //The client method may help if the job is still in progress when get stats of this job + gcwp.statsManager.Save(res) + return res, nil } @@ -243,34 +254,36 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, res := generateResult(j.Job, job.JobKindScheduled, isUnique) res.Stats.RunAt = j.RunAt - if err := gcwp.saveJobStats(res); err != nil { - //As job is already scheduled, we should not block this call - //Use client method to help get the status of this fly-away job - log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err) - } + //As job is already scheduled, we should not block this call + //Once it fails to do, use client method to help get the status of the escape job + gcwp.statsManager.Save(res) return res, nil } //PeriodicallyEnqueue job func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) { - id, err := gcwp.scheduler.Schedule(jobName, params, cronSetting) + id, nextRun, err := gcwp.scheduler.Schedule(jobName, params, cronSetting) if err != nil { return models.JobStats{}, err } - //TODO: Need more data - //TODO: EnqueueTime should be got from cron spec - return models.JobStats{ + res := models.JobStats{ Stats: &models.JobStatData{ JobID: id, JobName: jobName, Status: job.JobStatusPending, + JobKind: job.JobKindPeriodic, EnqueueTime: time.Now().Unix(), UpdateTime: time.Now().Unix(), RefLink: fmt.Sprintf("/api/v1/jobs/%s", id), + RunAt: nextRun, }, - }, nil + } + + gcwp.statsManager.Save(res) + + return res, nil } //GetJobStats return the job stats of the specified enqueued job. @@ -279,7 +292,7 @@ func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error) return models.JobStats{}, errors.New("empty job ID") } - return gcwp.getJobStats(jobID) + return gcwp.statsManager.Retrieve(jobID) } //Stats of pool @@ -330,115 +343,8 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m return theJ.Validate(params) } -func (gcwp *GoCraftWorkPool) getJobStats(ID string) (models.JobStats, error) { - conn := gcwp.redisPool.Get() - defer conn.Close() - - key := utils.KeyJobStats(gcwp.namespace, ID) - vals, err := redis.Strings(conn.Do("HGETALL", key)) - if err != nil { - return models.JobStats{}, err - } - - res := models.JobStats{ - Stats: &models.JobStatData{}, - } - for i, l := 0, len(vals); i < l; i = i + 2 { - prop := vals[i] - value := vals[i+1] - switch prop { - case "id": - res.Stats.JobID = value - break - case "name": - res.Stats.JobName = value - break - case "kind": - res.Stats.JobKind = value - case "unique": - v, err := strconv.ParseBool(value) - if err != nil { - v = false - } - res.Stats.IsUnique = v - case "status": - res.Stats.Status = value - break - case "ref_link": - res.Stats.RefLink = value - break - case "enqueue_time": - v, _ := strconv.ParseInt(value, 10, 64) - res.Stats.EnqueueTime = v - break - case "update_time": - v, _ := strconv.ParseInt(value, 10, 64) - res.Stats.UpdateTime = v - break - case "run_at": - v, _ := strconv.ParseInt(value, 10, 64) - res.Stats.RunAt = v - break - case "check_in_at": - v, _ := strconv.ParseInt(value, 10, 64) - res.Stats.CheckInAt = v - break - case "check_in": - res.Stats.CheckIn = value - break - default: - } - } - - return res, nil -} - -func (gcwp *GoCraftWorkPool) saveJobStats(stats models.JobStats) error { - conn := gcwp.redisPool.Get() - defer conn.Close() - - key := utils.KeyJobStats(gcwp.namespace, stats.Stats.JobID) - args := make([]interface{}, 0) - args = append(args, key) - args = append(args, - "id", stats.Stats.JobID, - "name", stats.Stats.JobName, - "kind", stats.Stats.JobKind, - "unique", stats.Stats.IsUnique, - "status", stats.Stats.Status, - "ref_link", stats.Stats.RefLink, - "enqueue_time", stats.Stats.EnqueueTime, - "update_time", stats.Stats.UpdateTime, - "run_at", stats.Stats.RunAt, - ) - if stats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(stats.Stats.CheckIn) { - args = append(args, - "check_in", stats.Stats.CheckIn, - "check_in_at", stats.Stats.CheckInAt, - ) - } - - conn.Send("HMSET", args...) - //If job kind is periodic job, expire time should not be set - //If job kind is scheduled job, expire time should be runAt+1day - if stats.Stats.JobKind != job.JobKindPeriodic { - var expireTime int64 = 60 * 60 * 24 - if stats.Stats.JobKind == job.JobKindScheduled { - nowTime := time.Now().Unix() - future := stats.Stats.RunAt - nowTime - if future > 0 { - expireTime += future - } - } - conn.Send("EXPIRE", key, expireTime) - } - - return conn.Flush() -} - //log the job func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error { - //TODO: Also update the job status to 'pending' log.Infof("Job incoming: %s:%s", job.ID, job.Name) return next() } diff --git a/src/jobservice_v2/utils/gocarft_work.go b/src/jobservice_v2/utils/gocarft_work.go index 5615ac73a..4254f5ad8 100644 --- a/src/jobservice_v2/utils/gocarft_work.go +++ b/src/jobservice_v2/utils/gocarft_work.go @@ -1,8 +1,10 @@ package utils import ( + "crypto/rand" "encoding/json" "fmt" + "io" "time" "github.com/gocraft/work" @@ -11,6 +13,16 @@ import ( //Functions defined here are mainly from dep lib "github.com/gocraft/work". //Only for compatible +//MakeIdentifier creates uuid for job. +func MakeIdentifier() string { + b := make([]byte, 12) + _, err := io.ReadFull(rand.Reader, b) + if err != nil { + return "" + } + return fmt.Sprintf("%x", b) +} + //MakeUniquePeriodicID creates id for the periodic job. func MakeUniquePeriodicID(name, spec string, epoch int64) string { return fmt.Sprintf("periodic:job:%s:%s:%d", name, spec, epoch) diff --git a/src/jobservice_v2/utils/keys.go b/src/jobservice_v2/utils/keys.go index 6652902a2..45ac9a8aa 100644 --- a/src/jobservice_v2/utils/keys.go +++ b/src/jobservice_v2/utils/keys.go @@ -3,10 +3,8 @@ package utils import ( - "encoding/base64" "fmt" "math/rand" - "strconv" "strings" "time" ) @@ -20,38 +18,7 @@ func generateScore() int64 { //MakePeriodicPolicyUUID returns an UUID for the periodic policy. func MakePeriodicPolicyUUID() (string, int64) { score := generateScore() - return MakePeriodicPolicyUUIDWithScore(score), score -} - -//MakePeriodicPolicyUUIDWithScore returns the UUID based on the specified score for the periodic policy. -func MakePeriodicPolicyUUIDWithScore(score int64) string { - rawUUID := fmt.Sprintf("%s:%s:%d", "periodic", "policy", score) - return base64.StdEncoding.EncodeToString([]byte(rawUUID)) -} - -//ExtractScoreFromUUID extracts the score from the UUID. -func ExtractScoreFromUUID(UUID string) int64 { - if IsEmptyStr(UUID) { - return 0 - } - - rawData, err := base64.StdEncoding.DecodeString(UUID) - if err != nil { - return 0 - } - - data := string(rawData) - fragments := strings.Split(data, ":") - if len(fragments) != 3 { - return 0 - } - - score, err := strconv.ParseInt(fragments[2], 10, 64) - if err != nil { - return 0 - } - - return score + return MakeIdentifier(), score } //KeyNamespacePrefix returns the based key based on the namespace. @@ -69,11 +36,16 @@ func KeyPeriod(namespace string) string { return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "period") } -//KeyPeriodicPolicy return the key of periodic policies. +//KeyPeriodicPolicy returns the key of periodic policies. func KeyPeriodicPolicy(namespace string) string { return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies") } +//KeyPeriodicPolicyScore returns the key of policy key and score mapping. +func KeyPeriodicPolicyScore(namespace string) string { + return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "key_score") +} + //KeyPeriodicNotification returns the key of periodic pub/sub channel. func KeyPeriodicNotification(namespace string) string { return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")