Support update job status for both generic and periodic job

refactor scheduler
refactor period.enqueuer
add stats manager component
This commit is contained in:
Steven Zou 2018-03-15 20:23:42 +08:00
parent be75145858
commit 2f97970484
12 changed files with 508 additions and 214 deletions

View File

@ -5,6 +5,8 @@ package impl
import (
"errors"
"fmt"
"strings"
"time"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
@ -28,7 +30,7 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error {
return errors.New("missing parameter 'image'")
}
if name != "demo steven" {
if !strings.HasPrefix(name.(string), "demo") {
return fmt.Errorf("expected '%s' but got '%s'", "demo steven", name)
}
@ -41,5 +43,8 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{},
fmt.Printf("params: %#v\n", params)
fmt.Printf("context: %#v\n", ctx)
//HOLD ON FOR A WHILE
fmt.Println("Holding for 10 sec")
<-time.After(10 * time.Second)
return nil
}

View File

@ -3,14 +3,18 @@
package job
const (
//JobStatusPending : job status pending
//JobStatusPending : job status pending
JobStatusPending = "Pending"
//JobStatusRunning : job status running
//JobStatusRunning : job status running
JobStatusRunning = "Running"
//JobStatusStopped : job status stopped
//JobStatusStopped : job status stopped
JobStatusStopped = "Stopped"
//JobStatusCancelled : job status cancelled
JobStatusCancelled = "Cancelled"
//JobStatusError : job status error
//JobStatusError : job status error
JobStatusError = "Error"
//JobStatusSuccess : job status success
JobStatusSuccess = "Success"
//JobStatusScheduled : job status scheduled
JobStatusScheduled = "Scheduled"
)

View File

@ -7,15 +7,19 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/env"
)
//StatusChangeCallback is the func called when job status changed
type StatusChangeCallback func(jobID string, status string)
//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
job interface{}
context *env.Context
job interface{}
context *env.Context
callback StatusChangeCallback
}
//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context) *RedisJob {
return &RedisJob{j, ctx}
func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback) *RedisJob {
return &RedisJob{j, ctx, statusChangeCallback}
}
//Run the job
@ -33,10 +37,18 @@ func (rj *RedisJob) Run(j *work.Job) error {
//Inject data
runningJob := Wrap(rj.job)
//TODO: Update job status to 'Running'
//Start to run
rj.callback(j.ID, JobStatusRunning)
//TODO: Check function should be defined
err = runningJob.Run(execContext, j.Args, nil)
if err == nil {
rj.callback(j.ID, JobStatusSuccess)
} else {
rj.callback(j.ID, JobStatusError)
}
//TODO:
//If error is stopped error, update status to 'Stopped' and return nil
//If error is cancelled error, update status to 'Cancelled' and return err

View File

@ -0,0 +1,3 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm

View File

@ -0,0 +1,32 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import "github.com/vmware/harbor/src/jobservice_v2/models"
//JobStatsManager defines the methods to handle stats of job.
type JobStatsManager interface {
//Start to serve
Start()
//Stop to serve
Stop()
//Save the job stats
//Async method to retry and improve performance
//
//jobStats models.JobStats : the job stats to be saved
Save(jobStats models.JobStats)
//Get the job stats from backend store
//Sync method as we need the data
//
//Returns:
// models.JobStats : job stats data
// error : error if meet any problems
Retrieve(jobID string) (models.JobStats, error)
//SetJobStatus will mark the status of job to the specified one
//Async method to retry
SetJobStatus(jobID string, status string)
}

View File

@ -0,0 +1,267 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import (
"context"
"math/rand"
"strconv"
"time"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
const (
processBufferSize = 1024
opSaveStats = "save_job_stats"
opUpdateStatus = "update_job_status"
maxFails = 3
)
type queueItem struct {
op string
fails uint
data interface{}
}
//RedisJobStatsManager implements JobStatsManager based on redis.
type RedisJobStatsManager struct {
namespace string
redisPool *redis.Pool
context context.Context
stopChan chan struct{}
doneChan chan struct{}
processChan chan *queueItem
isRunning bool //no need to sync
}
//NewRedisJobStatsManager is constructor of RedisJobStatsManager
func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager {
return &RedisJobStatsManager{
namespace: namespace,
context: ctx,
redisPool: redisPool,
stopChan: make(chan struct{}, 1),
doneChan: make(chan struct{}, 1),
processChan: make(chan *queueItem, processBufferSize),
}
}
//Start is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Start() {
if rjs.isRunning {
return
}
go rjs.loop()
rjs.isRunning = true
}
//Stop is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Stop() {
if !rjs.isRunning {
return
}
rjs.stopChan <- struct{}{}
<-rjs.doneChan
}
//Save is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
item := &queueItem{
op: opSaveStats,
data: jobStats,
}
rjs.processChan <- item
}
//Retrieve is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
vals, err := redis.Strings(conn.Do("HGETALL", key))
if err != nil {
return models.JobStats{}, err
}
res := models.JobStats{
Stats: &models.JobStatData{},
}
for i, l := 0, len(vals); i < l; i = i + 2 {
prop := vals[i]
value := vals[i+1]
switch prop {
case "id":
res.Stats.JobID = value
break
case "name":
res.Stats.JobName = value
break
case "kind":
res.Stats.JobKind = value
case "unique":
v, err := strconv.ParseBool(value)
if err != nil {
v = false
}
res.Stats.IsUnique = v
case "status":
res.Stats.Status = value
break
case "ref_link":
res.Stats.RefLink = value
break
case "enqueue_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.EnqueueTime = v
break
case "update_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.UpdateTime = v
break
case "run_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.RunAt = v
break
case "check_in_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.CheckInAt = v
break
case "check_in":
res.Stats.CheckIn = value
break
default:
}
}
return res, nil
}
//SetJobStatus is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
item := &queueItem{
op: opUpdateStatus,
data: []string{jobID, status},
}
rjs.processChan <- item
}
func (rjs *RedisJobStatsManager) loop() {
controlChan := make(chan struct{})
defer func() {
rjs.isRunning = false
//Notify other sub goroutines
close(controlChan)
log.Info("Redis job stats manager is stopped")
}()
for {
select {
case item := <-rjs.processChan:
if err := rjs.process(item); err != nil {
item.fails++
if item.fails < maxFails {
//Retry after a random interval
go func() {
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer timer.Stop()
select {
case <-timer.C:
rjs.processChan <- item
return
case <-controlChan:
}
}()
} else {
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
}
}
break
case <-rjs.stopChan:
rjs.doneChan <- struct{}{}
return
case <-rjs.context.Done():
return
}
}
}
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 3)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
_, err := conn.Do("HMSET", args...)
return err
}
func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobStats.Stats.JobID)
args := make([]interface{}, 0)
args = append(args, key)
args = append(args,
"id", jobStats.Stats.JobID,
"name", jobStats.Stats.JobName,
"kind", jobStats.Stats.JobKind,
"unique", jobStats.Stats.IsUnique,
"status", jobStats.Stats.Status,
"ref_link", jobStats.Stats.RefLink,
"enqueue_time", jobStats.Stats.EnqueueTime,
"update_time", jobStats.Stats.UpdateTime,
"run_at", jobStats.Stats.RunAt,
)
if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) {
args = append(args,
"check_in", jobStats.Stats.CheckIn,
"check_in_at", jobStats.Stats.CheckInAt,
)
}
conn.Send("HMSET", args...)
//If job kind is periodic job, expire time should not be set
//If job kind is scheduled job, expire time should be runAt+1day
if jobStats.Stats.JobKind != job.JobKindPeriodic {
var expireTime int64 = 60 * 60 * 24
if jobStats.Stats.JobKind == job.JobKindScheduled {
nowTime := time.Now().Unix()
future := jobStats.Stats.RunAt - nowTime
if future > 0 {
expireTime += future
}
}
conn.Send("EXPIRE", key, expireTime)
}
return conn.Flush()
}
func (rjs *RedisJobStatsManager) process(item *queueItem) error {
switch item.op {
case opSaveStats:
jobStats := item.data.(models.JobStats)
return rjs.saveJobStats(jobStats)
case opUpdateStatus:
data := item.data.([]string)
return rjs.updateJobStatus(data[0], data[1])
default:
break
}
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/gocraft/work"
"github.com/robfig/cron"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
@ -113,11 +114,9 @@ func (pe *periodicEnqueuer) enqueue() error {
}
for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
epoch := t.Unix()
id := utils.MakeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related with it
job := &work.Job{
Name: pj.jobName,
ID: id,
ID: pl.PolicyID, //Same with the id of the policy it's being scheduled for
// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
EnqueuedAt: epoch,
@ -134,8 +133,11 @@ func (pe *periodicEnqueuer) enqueue() error {
return err
}
log.Infof("Schedule job %s for policy %s\n", pj.jobName, pl.PolicyID)
log.Infof("Schedule job %s for policy %s at %d\n", pj.jobName, pl.PolicyID, epoch)
}
//Directly use redis conn to update the periodic job (policy) status
//Do not care the result
conn.Do("HMSET", utils.KeyJobStats(pe.namespace, pl.PolicyID), "status", job.JobStatusScheduled, "update_time", time.Now().Unix())
}
_, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now)

View File

@ -14,8 +14,9 @@ type Interface interface {
//
//Returns:
// The uuid of the cron job policy
// The latest next trigger time
// error if failed to schedule
Schedule(jobName string, params models.Parameters, cronSpec string) (string, error)
Schedule(jobName string, params models.Parameters, cronSpec string) (string, int64, error)
//Unschedule the specified cron job policy.
//

View File

@ -6,11 +6,12 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/robfig/cron"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/models"
@ -131,12 +132,18 @@ func (rps *RedisPeriodicScheduler) Start() error {
}
//Schedule is implementation of the same method in period.Interface
func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) {
func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, int64, error) {
if utils.IsEmptyStr(jobName) {
return "", errors.New("empty job name is not allowed")
return "", 0, errors.New("empty job name is not allowed")
}
if utils.IsEmptyStr(cronSpec) {
return "", errors.New("cron spec is not set")
return "", 0, errors.New("cron spec is not set")
}
//Get next run time
schedule, err := cron.Parse(cronSpec)
if err != nil {
return "", 0, err
}
//Although the ZSET can guarantee no duplicated items, we still need to check the existing
@ -150,13 +157,14 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame
//Serialize data
rawJSON, err := jobPolicy.serialize()
if err != nil {
return "", nil
return "", 0, nil
}
//Check existing
//If existing, treat as a succeed submitting and return the exitsing id
if score, ok := rps.exists(string(rawJSON)); ok {
return utils.MakePeriodicPolicyUUIDWithScore(score), nil
id, err := rps.getIDByScore(score)
return id, 0, err
}
uuid, score := utils.MakePeriodicPolicyUUID()
@ -168,21 +176,35 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame
}
rawJSON2, err := notification.serialize()
if err != nil {
return "", err
return "", 0, err
}
//Save to redis db and publish notification via redis transaction
conn := rps.redisPool.Get()
defer conn.Close()
conn.Send("MULTI")
conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON)
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2)
if _, err := conn.Do("EXEC"); err != nil {
return "", err
err = conn.Send("MULTI")
if err != nil {
return "", 0, err
}
err = conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON)
if err != nil {
return "", 0, err
}
err = conn.Send("ZADD", utils.KeyPeriodicPolicyScore(rps.namespace), score, uuid)
if err != nil {
return "", 0, err
}
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2)
if err != nil {
return "", 0, err
}
return uuid, nil
if _, err := conn.Do("EXEC"); err != nil {
return "", 0, err
}
return uuid, schedule.Next(time.Now()).Unix(), nil
}
//UnSchedule is implementation of the same method in period.Interface
@ -191,9 +213,9 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
return errors.New("cron job policy ID is empty")
}
score := utils.ExtractScoreFromUUID(cronJobPolicyID)
if score == 0 {
return fmt.Errorf("The ID '%s' is not valid", cronJobPolicyID)
score, err := rps.getScoreByID(cronJobPolicyID)
if err != nil {
return err
}
notification := &periodicJobPolicyEvent{
@ -212,9 +234,23 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
conn := rps.redisPool.Get()
defer conn.Close()
conn.Send("MULTI")
conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON)
err = conn.Send("MULTI")
if err != nil {
return err
}
err = conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score
if err != nil {
return err
}
err = conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), score, score) //Remove key score mapping
if err != nil {
return err
}
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON)
if err != nil {
return err
}
_, err = conn.Do("EXEC")
return err
@ -225,12 +261,29 @@ func (rps *RedisPeriodicScheduler) Load() error {
conn := rps.redisPool.Get()
defer conn.Close()
bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES"))
//Let's build key score mapping locally first
bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicyScore(rps.namespace), 0, -1, "WITHSCORES"))
if err != nil {
return err
}
keyScoreMap := make(map[int64]string)
for i, l := 0, len(bytes); i < l; i = i + 2 {
pid := string(bytes[i].([]byte))
rawScore := bytes[i+1].([]byte)
score, err := strconv.ParseInt(string(rawScore), 10, 64)
if err != nil {
//Ignore
continue
}
keyScoreMap[score] = pid
}
bytes, err = redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES"))
if err != nil {
return err
}
allPeriodicPolicies := make([]*periodicJobPolicy, 0)
allPeriodicPolicies := make([]*periodicJobPolicy, 0, len(bytes)/2)
for i, l := 0, len(bytes); i < l; i = i + 2 {
rawPolicy := bytes[i].([]byte)
rawScore := bytes[i+1].([]byte)
@ -251,7 +304,13 @@ func (rps *RedisPeriodicScheduler) Load() error {
}
//Set back the policy ID
policy.PolicyID = utils.MakePeriodicPolicyUUIDWithScore(score)
if pid, ok := keyScoreMap[score]; ok {
policy.PolicyID = pid
} else {
//Something wrong, should not be happended
//ignore here
continue
}
allPeriodicPolicies = append(allPeriodicPolicies, policy)
}
@ -286,6 +345,25 @@ func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) {
return count, err == nil
}
func (rps *RedisPeriodicScheduler) getScoreByID(id string) (int64, error) {
conn := rps.redisPool.Get()
defer conn.Close()
return redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), id))
}
func (rps *RedisPeriodicScheduler) getIDByScore(score int64) (string, error) {
conn := rps.redisPool.Get()
defer conn.Close()
ids, err := redis.Strings(conn.Do("ZRANGEBYSCORE", utils.KeyPeriodicPolicyScore(rps.namespace), score, score))
if err != nil {
return "", err
}
return ids[0], nil
}
func readMessage(data []byte) *periodicJobPolicyEvent {
if data == nil || len(data) == 0 {
return nil

View File

@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"os"
"strconv"
"time"
"github.com/garyburd/redigo/redis"
@ -15,6 +14,7 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/opm"
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
@ -34,14 +34,15 @@ const (
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
type GoCraftWorkPool struct {
namespace string
redisPool *redis.Pool
pool *work.WorkerPool
enqueuer *work.Enqueuer
sweeper *period.Sweeper
client *work.Client
context *env.Context
scheduler period.Interface
namespace string
redisPool *redis.Pool
pool *work.WorkerPool
enqueuer *work.Enqueuer
sweeper *period.Sweeper
client *work.Client
context *env.Context
scheduler period.Interface
statsManager opm.JobStatsManager
//no need to sync as write once and then only read
//key is name of known job
@ -82,16 +83,18 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool
client := work.NewClient(cfg.Namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool)
return &GoCraftWorkPool{
namespace: cfg.Namespace,
redisPool: redisPool,
pool: pool,
enqueuer: enqueuer,
scheduler: scheduler,
sweeper: sweeper,
client: client,
context: ctx,
knownJobs: make(map[string]interface{}),
namespace: cfg.Namespace,
redisPool: redisPool,
pool: pool,
enqueuer: enqueuer,
scheduler: scheduler,
sweeper: sweeper,
client: client,
context: ctx,
statsManager: statsMgr,
knownJobs: make(map[string]interface{}),
}
}
@ -112,7 +115,13 @@ func (gcwp *GoCraftWorkPool) Start() {
go func() {
defer func() {
gcwp.context.WG.Done()
gcwp.statsManager.Stop()
}()
//Start stats manager
//None-blocking
gcwp.statsManager.Start()
log.Info("Redis job stats manager is started")
//blocking call
if err := gcwp.scheduler.Start(); err != nil {
//Scheduler exits with error
@ -164,7 +173,10 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
}
//Use redis job wrapper pointer to keep the data required by the job.Interface.
redisJob := job.NewRedisJob(j, gcwp.context)
statusChangeCallback := func(jobID string, status string) {
gcwp.statsManager.SetJobStatus(jobID, status)
}
redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback)
//Get more info from j
theJ := job.Wrap(j)
@ -214,11 +226,10 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i
}
res := generateResult(j, job.JobKindGeneric, isUnique)
if err := gcwp.saveJobStats(res); err != nil {
//Once running job, let it fly away
//The client method may help if the job is still in progress when get stats of this job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
}
//Save data with async way. Once it fails to do, let it escape
//The client method may help if the job is still in progress when get stats of this job
gcwp.statsManager.Save(res)
return res, nil
}
@ -243,34 +254,36 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters,
res := generateResult(j.Job, job.JobKindScheduled, isUnique)
res.Stats.RunAt = j.RunAt
if err := gcwp.saveJobStats(res); err != nil {
//As job is already scheduled, we should not block this call
//Use client method to help get the status of this fly-away job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
}
//As job is already scheduled, we should not block this call
//Once it fails to do, use client method to help get the status of the escape job
gcwp.statsManager.Save(res)
return res, nil
}
//PeriodicallyEnqueue job
func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) {
id, err := gcwp.scheduler.Schedule(jobName, params, cronSetting)
id, nextRun, err := gcwp.scheduler.Schedule(jobName, params, cronSetting)
if err != nil {
return models.JobStats{}, err
}
//TODO: Need more data
//TODO: EnqueueTime should be got from cron spec
return models.JobStats{
res := models.JobStats{
Stats: &models.JobStatData{
JobID: id,
JobName: jobName,
Status: job.JobStatusPending,
JobKind: job.JobKindPeriodic,
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", id),
RunAt: nextRun,
},
}, nil
}
gcwp.statsManager.Save(res)
return res, nil
}
//GetJobStats return the job stats of the specified enqueued job.
@ -279,7 +292,7 @@ func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error)
return models.JobStats{}, errors.New("empty job ID")
}
return gcwp.getJobStats(jobID)
return gcwp.statsManager.Retrieve(jobID)
}
//Stats of pool
@ -330,115 +343,8 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m
return theJ.Validate(params)
}
func (gcwp *GoCraftWorkPool) getJobStats(ID string) (models.JobStats, error) {
conn := gcwp.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(gcwp.namespace, ID)
vals, err := redis.Strings(conn.Do("HGETALL", key))
if err != nil {
return models.JobStats{}, err
}
res := models.JobStats{
Stats: &models.JobStatData{},
}
for i, l := 0, len(vals); i < l; i = i + 2 {
prop := vals[i]
value := vals[i+1]
switch prop {
case "id":
res.Stats.JobID = value
break
case "name":
res.Stats.JobName = value
break
case "kind":
res.Stats.JobKind = value
case "unique":
v, err := strconv.ParseBool(value)
if err != nil {
v = false
}
res.Stats.IsUnique = v
case "status":
res.Stats.Status = value
break
case "ref_link":
res.Stats.RefLink = value
break
case "enqueue_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.EnqueueTime = v
break
case "update_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.UpdateTime = v
break
case "run_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.RunAt = v
break
case "check_in_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.CheckInAt = v
break
case "check_in":
res.Stats.CheckIn = value
break
default:
}
}
return res, nil
}
func (gcwp *GoCraftWorkPool) saveJobStats(stats models.JobStats) error {
conn := gcwp.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(gcwp.namespace, stats.Stats.JobID)
args := make([]interface{}, 0)
args = append(args, key)
args = append(args,
"id", stats.Stats.JobID,
"name", stats.Stats.JobName,
"kind", stats.Stats.JobKind,
"unique", stats.Stats.IsUnique,
"status", stats.Stats.Status,
"ref_link", stats.Stats.RefLink,
"enqueue_time", stats.Stats.EnqueueTime,
"update_time", stats.Stats.UpdateTime,
"run_at", stats.Stats.RunAt,
)
if stats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(stats.Stats.CheckIn) {
args = append(args,
"check_in", stats.Stats.CheckIn,
"check_in_at", stats.Stats.CheckInAt,
)
}
conn.Send("HMSET", args...)
//If job kind is periodic job, expire time should not be set
//If job kind is scheduled job, expire time should be runAt+1day
if stats.Stats.JobKind != job.JobKindPeriodic {
var expireTime int64 = 60 * 60 * 24
if stats.Stats.JobKind == job.JobKindScheduled {
nowTime := time.Now().Unix()
future := stats.Stats.RunAt - nowTime
if future > 0 {
expireTime += future
}
}
conn.Send("EXPIRE", key, expireTime)
}
return conn.Flush()
}
//log the job
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
//TODO: Also update the job status to 'pending'
log.Infof("Job incoming: %s:%s", job.ID, job.Name)
return next()
}

View File

@ -1,8 +1,10 @@
package utils
import (
"crypto/rand"
"encoding/json"
"fmt"
"io"
"time"
"github.com/gocraft/work"
@ -11,6 +13,16 @@ import (
//Functions defined here are mainly from dep lib "github.com/gocraft/work".
//Only for compatible
//MakeIdentifier creates uuid for job.
func MakeIdentifier() string {
b := make([]byte, 12)
_, err := io.ReadFull(rand.Reader, b)
if err != nil {
return ""
}
return fmt.Sprintf("%x", b)
}
//MakeUniquePeriodicID creates id for the periodic job.
func MakeUniquePeriodicID(name, spec string, epoch int64) string {
return fmt.Sprintf("periodic:job:%s:%s:%d", name, spec, epoch)

View File

@ -3,10 +3,8 @@
package utils
import (
"encoding/base64"
"fmt"
"math/rand"
"strconv"
"strings"
"time"
)
@ -20,38 +18,7 @@ func generateScore() int64 {
//MakePeriodicPolicyUUID returns an UUID for the periodic policy.
func MakePeriodicPolicyUUID() (string, int64) {
score := generateScore()
return MakePeriodicPolicyUUIDWithScore(score), score
}
//MakePeriodicPolicyUUIDWithScore returns the UUID based on the specified score for the periodic policy.
func MakePeriodicPolicyUUIDWithScore(score int64) string {
rawUUID := fmt.Sprintf("%s:%s:%d", "periodic", "policy", score)
return base64.StdEncoding.EncodeToString([]byte(rawUUID))
}
//ExtractScoreFromUUID extracts the score from the UUID.
func ExtractScoreFromUUID(UUID string) int64 {
if IsEmptyStr(UUID) {
return 0
}
rawData, err := base64.StdEncoding.DecodeString(UUID)
if err != nil {
return 0
}
data := string(rawData)
fragments := strings.Split(data, ":")
if len(fragments) != 3 {
return 0
}
score, err := strconv.ParseInt(fragments[2], 10, 64)
if err != nil {
return 0
}
return score
return MakeIdentifier(), score
}
//KeyNamespacePrefix returns the based key based on the namespace.
@ -69,11 +36,16 @@ func KeyPeriod(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "period")
}
//KeyPeriodicPolicy return the key of periodic policies.
//KeyPeriodicPolicy returns the key of periodic policies.
func KeyPeriodicPolicy(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies")
}
//KeyPeriodicPolicyScore returns the key of policy key and score mapping.
func KeyPeriodicPolicyScore(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "key_score")
}
//KeyPeriodicNotification returns the key of periodic pub/sub channel.
func KeyPeriodicNotification(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")