Support saving job stats to the baclend for the redis workerpool

This commit is contained in:
Steven Zou 2018-03-15 09:56:07 +08:00
parent b61dc39278
commit 28d02931d2
5 changed files with 94 additions and 21 deletions

View File

@ -2,10 +2,6 @@
package models
import (
"time"
)
//Parameters for job execution.
type Parameters map[string]interface{}
@ -36,13 +32,17 @@ type JobStats struct {
//JobStatData keeps the stats of job
type JobStatData struct {
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
RefLink string `json:"ref_link,omitempty"`
EnqueueTime time.Time `json:"enqueue_time"`
UpdateTime time.Time `json:"update_time"`
RunAt time.Time `json:"run_at,omitempty"`
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
JobKind string `json:"kind"`
IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"`
EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"`
}
//JobPoolStats represent the healthy and status of the job service.

View File

@ -57,6 +57,8 @@ func (rps *RedisPeriodicScheduler) Start() error {
//As we get one connection from the pool, don't try to close it.
conn := rps.redisPool.Get()
defer conn.Close()
psc := redis.PubSubConn{
Conn: conn,
}
@ -170,6 +172,8 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame
//Save to redis db and publish notification via redis transaction
conn := rps.redisPool.Get()
defer conn.Close()
conn.Send("MULTI")
conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON)
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2)
@ -205,6 +209,8 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
//REM from redis db
conn := rps.redisPool.Get()
defer conn.Close()
conn.Send("MULTI")
conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON)
@ -216,6 +222,8 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
//Load data from zset
func (rps *RedisPeriodicScheduler) Load() error {
conn := rps.redisPool.Get()
defer conn.Close()
bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES"))
if err != nil {
return err
@ -258,6 +266,8 @@ func (rps *RedisPeriodicScheduler) Load() error {
//Clear is implementation of the same method in period.Interface
func (rps *RedisPeriodicScheduler) Clear() error {
conn := rps.redisPool.Get()
defer conn.Close()
_, err := conn.Do("ZREMRANGEBYRANK", utils.KeyPeriodicPolicy(rps.namespace), 0, -1)
return err
@ -269,6 +279,8 @@ func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) {
}
conn := rps.redisPool.Get()
defer conn.Close()
count, err := redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicy(rps.namespace), rawPolicy))
return count, err == nil
}

View File

@ -34,6 +34,7 @@ func NewSweeper(namespace string, pool *redis.Pool, client *work.Client) *Sweepe
func (s *Sweeper) ClearOutdatedScheduledJobs() error {
//Check if other workpool has done the action
conn := s.redisPool.Get()
defer conn.Close()
//Lock
r, err := conn.Do("SET", utils.KeyPeriodicLock(s.namespace), time.Now().Unix(), "EX", 30, "NX")

View File

@ -33,6 +33,7 @@ const (
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
type GoCraftWorkPool struct {
namespace string
redisPool *redis.Pool
pool *work.WorkerPool
enqueuer *work.Enqueuer
@ -81,6 +82,7 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool
scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
return &GoCraftWorkPool{
namespace: cfg.Namespace,
redisPool: redisPool,
pool: pool,
enqueuer: enqueuer,
@ -210,7 +212,13 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i
return models.JobStats{}, err
}
return generateResult(j), nil
res := generateResult(j, job.JobKindGeneric, isUnique)
if err := gcwp.saveStats(res); err != nil {
//Once running job, let it fly away
//The client method may help if the job is still in progress when get stats of this job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
}
return res, nil
}
//Schedule job
@ -231,8 +239,14 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters,
return models.JobStats{}, err
}
res := generateResult(j.Job)
res.Stats.RunAt = time.Unix(j.RunAt, 0)
res := generateResult(j.Job, job.JobKindScheduled, isUnique)
res.Stats.RunAt = j.RunAt
if err := gcwp.saveStats(res); err != nil {
//As job is already scheduled, we should not block this call
//Use client method to help get the status of this fly-away job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
}
return res, nil
}
@ -245,13 +259,14 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P
}
//TODO: Need more data
//TODO: EnqueueTime should be got from cron spec
return models.JobStats{
Stats: &models.JobStatData{
JobID: id,
JobName: jobName,
Status: job.JobStatusPending,
EnqueueTime: time.Unix(time.Now().Unix(), 0),
UpdateTime: time.Unix(time.Now().Unix(), 0),
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", id),
},
}, nil
@ -295,9 +310,47 @@ func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (bool, bool) {
return ok, v
}
//Clear the invalid data on redis db, such as outdated scheduled jobs etc.
func (gcwp *GoCraftWorkPool) clearDirtyData() {
func (gcwp *GoCraftWorkPool) saveStats(stats models.JobStats) error {
conn := gcwp.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(gcwp.namespace, stats.Stats.JobID)
args := make([]interface{}, 0)
args = append(args, key)
args = append(args,
"id", stats.Stats.JobID,
"name", stats.Stats.JobName,
"kind", stats.Stats.JobKind,
"unique", stats.Stats.IsUnique,
"status", stats.Stats.Status,
"ref_link", stats.Stats.RefLink,
"enqueue_time", stats.Stats.EnqueueTime,
"update_time", stats.Stats.UpdateTime,
"run_at", stats.Stats.RunAt,
)
if stats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(stats.Stats.CheckIn) {
args = append(args,
"check_in", stats.Stats.CheckIn,
"check_in_at", stats.Stats.CheckInAt,
)
}
conn.Send("HMSET", args...)
//If job kind is periodic job, expire time should be set
//If job kind is scheduled job, expire time should be runAt+1day
if stats.Stats.JobKind != job.JobKindPeriodic {
var expireTime int64 = 60 * 60 * 24
if stats.Stats.JobKind == job.JobKindScheduled {
nowTime := time.Now().Unix()
future := stats.Stats.RunAt - nowTime
if future > 0 {
expireTime += future
}
}
conn.Send("EXPIRE", key, expireTime)
}
return conn.Flush()
}
//log the job
@ -308,7 +361,7 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc)
}
//generate the job stats data
func generateResult(j *work.Job) models.JobStats {
func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats {
if j == nil {
return models.JobStats{}
}
@ -317,9 +370,11 @@ func generateResult(j *work.Job) models.JobStats {
Stats: &models.JobStatData{
JobID: j.ID,
JobName: j.Name,
JobKind: jobKind,
IsUnique: isUnique,
Status: job.JobStatusPending,
EnqueueTime: time.Unix(j.EnqueuedAt, 0),
UpdateTime: time.Unix(time.Now().Unix(), 0),
EnqueueTime: j.EnqueuedAt,
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID),
},
}

View File

@ -81,3 +81,8 @@ func KeyPeriodicNotification(namespace string) string {
func KeyPeriodicLock(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "lock")
}
//KeyJobStats returns the key of job stats
func KeyJobStats(namespace string, jobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID)
}