fix[jobservice]:job status is hung after restart

- improve the status hook sending/resending approach
- improve the status compare and set approach
- simplify the relevant flow
- add reaper to fix the out of sync jobs
- fix #10244 , fix #9963

Signed-off-by: Steven Zou <>
This commit is contained in:
Steven Zou 2020-01-08 18:09:43 +08:00
parent 6d80803dbb
commit e899d659f3
30 changed files with 1694 additions and 1033 deletions

View File

@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
// SyncList is a sync list based on the container/list
type SyncList struct {
// For synchronization
lock *sync.RWMutex
// Use interface slice as the backend data struct
l *list.List
// New a sync list
func New() *SyncList {
return &SyncList{
lock: &sync.RWMutex{},
l: list.New(),
// Iterate the list
func (l *SyncList) Iterate(f func(ele interface{}) bool) {
defer l.lock.RUnlock()
// Get the front pointer
for e := l.l.Front(); e != nil; {
// Keep the next one
next := e.Next()
if f(e.Value) {
e = next
// Push the element to the back of the list
func (l *SyncList) Push(ele interface{}) {
if ele != nil {
defer l.lock.Unlock()

View File

@ -0,0 +1,57 @@
// Copyright Project Harbor Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
type ListSuite struct {
l *SyncList
func TestListSuite(t *testing.T) {
suite.Run(t, &ListSuite{})
func (suite *ListSuite) SetupSuite() {
suite.l = New()
suite.Equal(4, suite.l.l.Len())
func (suite *ListSuite) TestIterate() {
suite.l.Iterate(func(ele interface{}) bool {
if s, ok := ele.(string); ok {
if strings.HasPrefix(s, "b") {
return true
return false
suite.Equal(3, suite.l.l.Len())

View File

@ -86,3 +86,33 @@ func KeyHookEventRetryQueue(namespace string) string {
func KeyStatusUpdateRetryQueue(namespace string) string { func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events") return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
} }
// KeyJobTrackInProgress returns the key of in progress jobs tracking queue
func KeyJobTrackInProgress(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "job_track:inprogress")
// KeyJobs returns the key of the specified job queue
func KeyJobs(namespace, jobType string) string {
return fmt.Sprintf("%sjobs:%s", KeyNamespacePrefix(namespace), jobType)
// KeyJobLock returns the key of lock for the specified job type.
func KeyJobLock(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock", KeyJobs(namespace, jobType))
// KeyJobLockInfo returns the key of lock_info for the specified job type.
func KeyJobLockInfo(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock_info", KeyJobs(namespace, jobType))
// KeyInProgressQueue returns the key of the in progress queue for the specified job type.
func KeyInProgressQueue(namespace string, jobType string, workerPoolID string) string {
return fmt.Sprintf("%s:%s:inprogress", KeyJobs(namespace, jobType), workerPoolID)
// KeyWorkerPools returns the key of the worker pool
func KeyWorkerPools(namespace string) string {
return KeyNamespacePrefix(namespace) + "worker_pools"

View File

@ -0,0 +1,273 @@
// Copyright Project Harbor Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package rds
import (
const (
requeueKeysPerJob = 4
// luaFuncStCodeText is common lua script function
var luaFuncStCodeText = `
-- for easily compare status
local stMap = { ['Pending'] = 0, ['Scheduled'] = 1, ['Running'] = 2, ['Success'] = 3, ['Stopped'] = 3, ['Error'] = 3 }
local function stCode(status)
return stMap[status]
// luaFuncCompareText is common lua script function
var luaFuncCompareText = `
local function compare(status, revision, checkInT)
local sCode = stCode(status)
local aCode = stCode(ARGV[1])
local aRev = tonumber(ARGV[2])
local aCheckInT = tonumber(ARGV[3])
if revision < aRev or
( revision == aRev and sCode < aCode ) or
( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT))
return 'ok'
return 'no'
// Script used to set the status of the job
// KEY[1]: key of job stats
// KEY[2]: key of inprogress track
// ARGV[1]: status text
// ARGV[2]: stats revision
// ARGV[3]: update timestamp
// ARGV[4]: job ID
var setStatusScriptText = fmt.Sprintf(`
local res, st, rev, aCode, aRev
res ='hmget', KEYS[1], 'status', 'revision')
if res then
st = res[1]
aCode = stCode(ARGV[1])
rev = tonumber(res[2])
aRev = tonumber(ARGV[2])
-- set same status repeatedly is allowed
if rev < aRev or ( rev == aRev and (stCode(st) < aCode or st == ARGV[1])) then'hmset', KEYS[1], 'status', ARGV[1], 'update_time', ARGV[3])
-- update inprogress track if necessary
if aCode == 3 then
-- final status
local c ='hincrby', KEYS[2], ARGV[4], -1)
-- lock count is 0, del it
if c <= 0 then'hdel', KEYS[2], ARGV[4])
if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then
-- expire the job stats with shorter interval'expire', KEYS[1], 86400)
return 'ok'
return st
`, luaFuncStCodeText)
// SetStatusScript is lua script for setting job status atomically
var SetStatusScript = redis.NewScript(2, setStatusScriptText)
// Used to check if the status info provided is still validate
// KEY[1]: key of job stats
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
var isStatusMatchScriptText = fmt.Sprintf(`
local res, st, rev, checkInAt, ack
res ='hmget', KEYS[1], 'status', 'revision', 'check_in_at', 'ack')
if res then
st = res[1]
rev = tonumber(res[2])
checkInAt = tonumber(res[3])
ack = res[4]
local reply = compare(st, rev, checkInAt)
if reply == 'ok' then
if not ack then
return 'ok'
-- ack exists, compare with ack
local a = cjson.decode(ack)
st = a['status']
rev = a['revision']
checkInAt = a['check_in_at']
local reply2 = compare(st, rev, checkInAt)
if reply2 == 'ok' then
return 'ok'
return 'no'
`, luaFuncStCodeText, luaFuncCompareText)
// CheckStatusMatchScript is lua script for checking if the provided status is still matching
// the backend status.
var CheckStatusMatchScript = redis.NewScript(1, isStatusMatchScriptText)
// Used to set the hook ACK
// KEY[1]: key of job stats
// KEY[2]: key of inprogress track
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
// ARGV[4]: job ID
var hookAckScriptText = fmt.Sprintf(`
local function canSetAck(jk, nrev)
local res ='hmget', jk, 'revision', 'ack')
if res then
local rev = tonumber(res[1])
local ackv = res[2]
if ackv then
-- ack existing
local ack = cjson.decode(ackv)
local cmp = compare(ack['status'], ack['revision'], ack['check_in_at'])
if cmp == 'ok' then
return 'ok'
-- no ack yet
if rev <= nrev then
return 'ok'
return nil
if canSetAck(KEYS[1], tonumber(ARGV[2])) ~= 'ok' then
return 'none'
-- can-set-ack check is ok
-- set new ack
local newAck = {['status'] = ARGV[1], ['revision'] = tonumber(ARGV[2]), ['check_in_at'] = tonumber(ARGV[3])}
local ackJson = cjson.encode(newAck)'hset', KEYS[1], 'ack', ackJson)
-- update the inprogress track
if stCode(ARGV[1]) == 3 then
-- final status
local c ='hincrby', KEYS[2], ARGV[4], -1)
-- lock count is 0, del it
if c <= 0 then'hdel', KEYS[2], ARGV[4])
return 'ok'
`, luaFuncStCodeText, luaFuncCompareText)
// HookAckScript is defined to set the hook event ACK in the job stats map
var HookAckScript = redis.NewScript(2, hookAckScriptText)
// Used to reset job status
// KEYS[1]: key of job stats
// KEYS[2]: key of inprogress job track
// ARGV[1]: job ID
// ARGV[2]: start status
// ARGV[3]: revision of job stats
var statusResetScriptText = `
local now = tonumber(ARGV[3])
-- reset status and revision'hmset', KEYS[1], 'status', ARGV[2], 'revision', now, 'update_time', now)'hdel', KEYS[1], 'ack', 'check_in', 'check_in_at')
-- reset inprogress track'hset', KEYS[2], ARGV[1], 2)
// StatusResetScript is lua script to reset the job stats
var StatusResetScript = redis.NewScript(2, statusResetScriptText)
// Copy from upstream worker framework
// Used by the reaper to re-enqueue jobs that were in progress
// KEYS[1] = the 1st job's in progress queue
// KEYS[2] = the 1st job's job queue
// KEYS[3] = the 2nd job's in progress queue
// KEYS[4] = the 2nd job's job queue
// ...
// KEYS[N] = the last job's in progress queue
// KEYS[N+1] = the last job's job queue
// ARGV[1] = workerPoolID for job queue
var redisLuaReenqueueJob = fmt.Sprintf(`
local function releaseLock(lockKey, lockInfoKey, workerPoolID)'decr', lockKey)'hincrby', lockInfoKey, workerPoolID, -1)
local keylen = #KEYS
local res, jobQueue, inProgQueue, workerPoolID, lockKey, lockInfoKey
workerPoolID = ARGV[1]
for i=1,keylen,%d do
inProgQueue = KEYS[i]
jobQueue = KEYS[i+1]
lockKey = KEYS[i+2]
lockInfoKey = KEYS[i+3]
res ='rpoplpush', inProgQueue, jobQueue)
if res then
releaseLock(lockKey, lockInfoKey, workerPoolID)
return {res, inProgQueue, jobQueue}
return nil`, requeueKeysPerJob)
// RedisLuaReenqueueScript returns redis script of redisLuaReenqueueJob
func RedisLuaReenqueueScript(jobTypesCount int) *redis.Script {
return redis.NewScript(jobTypesCount*requeueKeysPerJob, redisLuaReenqueueJob)

View File

@ -2,10 +2,11 @@ package rds
import ( import (
"fmt" "fmt"
"time" "time"
) )
// ErrNoElements is a pre defined error to describe the case that no elements got // ErrNoElements is a pre defined error to describe the case that no elements got

View File

@ -17,31 +17,22 @@ package hook
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"net/url" "net/url"
"sync" "sync"
"" ""
"" ""
"" ""
"" ""
"" ""
) )
const ( const (
// Influenced by the worker number setting
maxEventChanBuffer = 1024
// Max concurrent client handlers
maxHandlers = 5
// The max time for expiring the retrying events // The max time for expiring the retrying events
// 180 days // 1 day
maxEventExpireTime = 3600 * 24 * 180 maxEventExpireTime = 3600 * 24
// Waiting a short while if any errors occurred // Waiting a short while if any errors occurred
shortLoopInterval = 5 * time.Second shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found // Waiting for long while if no retrying elements found
@ -52,10 +43,9 @@ const (
type Agent interface { type Agent interface {
// Trigger hooks // Trigger hooks
Trigger(evt *Event) error Trigger(evt *Event) error
// Serves events now
// Serves retry loop now
Serve() error Serve() error
// Attach a job life cycle controller
Attach(ctl lcm.Controller)
} }
// Event contains the hook URL and the data // Event contains the hook URL and the data
@ -95,47 +85,53 @@ type basicAgent struct {
context context.Context context context.Context
namespace string namespace string
client Client client Client
ctl lcm.Controller
events chan *Event
tokens chan bool
redisPool *redis.Pool redisPool *redis.Pool
wg *sync.WaitGroup wg *sync.WaitGroup
} }
// NewAgent is constructor of basic agent // NewAgent is constructor of basic agent
func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent { func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent {
tks := make(chan bool, maxHandlers)
// Put tokens
for i := 0; i < maxHandlers; i++ {
tks <- true
return &basicAgent{ return &basicAgent{
context: ctx.SystemContext, context: ctx.SystemContext,
namespace: ns, namespace: ns,
client: NewClient(ctx.SystemContext), client: NewClient(ctx.SystemContext),
events: make(chan *Event, maxEventChanBuffer),
tokens: tks,
redisPool: redisPool, redisPool: redisPool,
wg: ctx.WG, wg: ctx.WG,
} }
} }
// Attach a job life cycle controller
func (ba *basicAgent) Attach(ctl lcm.Controller) {
ba.ctl = ctl
// Trigger implements the same method of interface @Agent // Trigger implements the same method of interface @Agent
func (ba *basicAgent) Trigger(evt *Event) error { func (ba *basicAgent) Trigger(evt *Event) error {
if evt == nil { if evt == nil {
return errors.New("nil event") return errors.New("nil web hook event")
} }
if err := evt.Validate(); err != nil { if err := evt.Validate(); err != nil {
return err return errors.Wrap(err, "trigger error")
} } <- evt // Treat hook event is success if it is successfully sent or cached in the retry queue.
if err := ba.client.SendEvent(evt); err != nil {
// Push event to the retry queue
if er := ba.pushForRetry(evt); er != nil {
// Failed to push to the hook event retry queue, return error with all context
return errors.Wrap(er, err.Error())
logger.Warningf("Send hook event '%s' to '%s' failed with error: %s; push hook event to the queue for retrying later", evt.Message, evt.URL, err)
// Treat as successful hook event as the event has been put into the retry queue for future resending.
return nil
// Mark event hook ACK including "revision", "status" and "check_in_at" in the job stats to indicate
// the related hook event has been successfully fired.
// The ACK can be used by the reaper to justify if the hook event should be resent again.
// The failure of persisting this ACK may cause duplicated hook event resending issue, which
// can be ignored.
if err := ba.ack(evt); err != nil {
// Just log error
logger.Error(errors.Wrap(err, "trigger"))
return nil return nil
} }
@ -144,64 +140,14 @@ func (ba *basicAgent) Trigger(evt *Event) error {
// Termination depends on the system context // Termination depends on the system context
// Blocking call // Blocking call
func (ba *basicAgent) Serve() error { func (ba *basicAgent) Serve() error {
if ba.ctl == nil {
return errors.New("nil life cycle controller of hook agent")
ba.wg.Add(1) ba.wg.Add(1)
go ba.loopRetry() go ba.loopRetry()
logger.Info("Hook event retrying loop is started") logger.Info("Hook event retrying loop is started")
go ba.serve()
logger.Info("Basic hook agent is started")
return nil return nil
} }
func (ba *basicAgent) serve() {
defer func() {
logger.Info("Basic hook agent is stopped")
for {
select {
case evt := <
// if exceed, wait here
// avoid too many request connections at the same time
go func(evt *Event) {
defer func() {
ba.tokens <- true // return token
if err := ba.client.SendEvent(evt); err != nil {
logger.Errorf("Send hook event '%s' to '%s' failed with error: %s; push to the queue for retrying later", evt.Message, evt.URL, err)
// Push event to the retry queue
if err := ba.pushForRetry(evt); err != nil {
// Failed to push to the retry queue, let's directly push it
// to the event channel of this node with reasonable backoff time.
logger.Errorf("Failed to push hook event to the retry queue: %s", err)
// Put to the event chan after waiting for a reasonable while,
// waiting is important, it can avoid sending large scale failure expecting
// requests in a short while.
// As 'pushForRetry' has checked the timestamp and expired event
// will be directly discarded and nil error is returned, no need to
// check it again here.
<-time.After(time.Duration(rand.Int31n(55)+5) * time.Second) <- evt
case <-ba.context.Done():
func (ba *basicAgent) pushForRetry(evt *Event) error { func (ba *basicAgent) pushForRetry(evt *Event) error {
if evt == nil { if evt == nil {
// do nothing // do nothing
@ -248,11 +194,7 @@ func (ba *basicAgent) loopRetry() {
ba.wg.Done() ba.wg.Done()
}() }()
token := make(chan bool, 1)
token <- true
for { for {
if err := ba.reSend(); err != nil { if err := ba.reSend(); err != nil {
waitInterval := shortLoopInterval waitInterval := shortLoopInterval
if err == rds.ErrNoElements { if err == rds.ErrNoElements {
@ -270,44 +212,47 @@ func (ba *basicAgent) loopRetry() {
return return
} }
} }
// Put token back
token <- true
} }
} }
func (ba *basicAgent) reSend() error { func (ba *basicAgent) reSend() error {
evt, err := ba.popMinOne()
if err != nil {
return err
jobID, status, err := extractJobID(evt.Data)
if err != nil {
return err
t, err := ba.ctl.Track(jobID)
if err != nil {
return err
diff := status.Compare(job.Status(t.Job().Info.Status))
if diff > 0 ||
(diff == 0 && t.Job().Info.CheckIn != evt.Data.CheckIn) { <- evt
return nil
return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status)
func (ba *basicAgent) popMinOne() (*Event, error) {
conn := ba.redisPool.Get() conn := ba.redisPool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
logger.Error(errors.Wrap(err, "resend"))
}() }()
// Pick up one queued event for resending
evt, err := ba.popMinOne(conn)
if err != nil {
return err
// Args for executing script
args := []interface{}{
rds.KeyJobStats(ba.namespace, evt.Data.JobID),
// If failed to check the status matching, just ignore it, continue the resending
reply, err := redis.String(rds.CheckStatusMatchScript.Do(conn, args...))
if err != nil {
// Log error
logger.Error(errors.Wrap(err, "resend"))
} else {
if reply != "ok" {
return errors.Errorf("outdated hook event: %s", evt.Message)
return ba.Trigger(evt)
// popMinOne picks up one event for retrying
func (ba *basicAgent) popMinOne(conn redis.Conn) (*Event, error) {
key := rds.KeyHookEventRetryQueue(ba.namespace) key := rds.KeyHookEventRetryQueue(ba.namespace)
minOne, err := rds.ZPopMin(conn, key) minOne, err := rds.ZPopMin(conn, key)
if err != nil { if err != nil {
@ -327,17 +272,33 @@ func (ba *basicAgent) popMinOne() (*Event, error) {
return evt, nil return evt, nil
} }
// Extract the job ID and status from the event data field // ack hook event
// First return is job ID func (ba *basicAgent) ack(evt *Event) error {
// Second return is job status conn := ba.redisPool.Get()
// Last one is error defer func() {
func extractJobID(data *job.StatusChange) (string, job.Status, error) { if err := conn.Close(); err != nil {
if data != nil && len(data.JobID) > 0 { logger.Error(errors.Wrap(err, "ack"))
status := job.Status(data.Status)
if status.Validate() == nil {
return data.JobID, status, nil
} }
k := rds.KeyJobStats(ba.namespace, evt.Data.JobID)
k2 := rds.KeyJobTrackInProgress(ba.namespace)
reply, err := redis.String(rds.HookAckScript.Do(
if err != nil {
return errors.Wrap(err, "ack")
} }
return "", "", errors.New("malform job status change data") if reply != "ok" {
return errors.Errorf("no ack done for event: %s", evt.Message)
return nil
} }

View File

@ -16,35 +16,34 @@ package hook
import ( import (
"context" "context"
"testing" "testing"
"time" "time"
"" ""
"" ""
"" ""
"" ""
"" ""
"" ""
"" ""
) )
// HookAgentTestSuite tests functions of hook agent // HookAgentTestSuite tests functions of hook agent
type HookAgentTestSuite struct { type HookAgentTestSuite struct {
suite.Suite suite.Suite
pool *redis.Pool
namespace string namespace string
lcmCtl lcm.Controller pool *redis.Pool
agent *basicAgent
envContext *env.Context event *Event
cancel context.CancelFunc jid string
} }
// TestHookAgentTestSuite is entry of go test // TestHookAgentTestSuite is entry of go test
@ -57,14 +56,11 @@ func (suite *HookAgentTestSuite) SetupSuite() {
suite.pool = tests.GiveMeRedisPool() suite.pool = tests.GiveMeRedisPool()
suite.namespace = tests.GiveMeTestNamespace() suite.namespace = tests.GiveMeTestNamespace()
ctx, cancel := context.WithCancel(context.Background()) suite.agent = &basicAgent{
suite.envContext = &env.Context{ context: context.TODO(),
SystemContext: ctx, namespace: suite.namespace,
WG: new(sync.WaitGroup), redisPool: suite.pool,
} }
suite.cancel = cancel
suite.lcmCtl = lcm.NewController(suite.envContext, suite.namespace, suite.pool, func(hookURL string, change *job.StatusChange) error { return nil })
} }
// TearDownSuite prepares test suites // TearDownSuite prepares test suites
@ -77,126 +73,121 @@ func (suite *HookAgentTestSuite) TearDownSuite() {
_ = tests.ClearAll(suite.namespace, conn) _ = tests.ClearAll(suite.namespace, conn)
} }
func (suite *HookAgentTestSuite) SetupTest() {
suite.jid = utils.MakeIdentifier()
rev := time.Now().Unix()
stats := &job.Stats{
Info: &job.StatsInfo{
JobID: suite.jid,
Status: job.RunningStatus.String(),
Revision: rev,
JobKind: job.KindGeneric,
JobName: job.SampleJob,
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New())
err := t.Save()
suite.NoError(err, "mock job stats")
suite.event = &Event{
URL: "",
Message: "HookAgentTestSuite",
Timestamp: time.Now().Unix(),
Data: &job.StatusChange{
JobID: suite.jid,
Status: job.SuccessStatus.String(),
Metadata: &job.StatsInfo{
JobID: suite.jid,
Status: job.SuccessStatus.String(),
Revision: rev,
JobKind: job.KindGeneric,
JobName: job.SampleJob,
func (suite *HookAgentTestSuite) TearDownTest() {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection")
k := rds.KeyHookEventRetryQueue(suite.namespace)
_, err := conn.Do("DEL", k)
suite.NoError(err, "tear down test cases")
// TestEventSending ... // TestEventSending ...
func (suite *HookAgentTestSuite) TestEventSending() { func (suite *HookAgentTestSuite) TestEventSending() {
done := make(chan bool, 1) mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(nil)
suite.agent.client = mc
expected := uint32(1300) // >1024 max err := suite.agent.Trigger(suite.event)
count := uint32(0)
counter := &count
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
c := atomic.AddUint32(counter, 1)
if c == expected {
done <- true
_, _ = fmt.Fprintln(w, "ok")
defer ts.Close()
// in case test failed and avoid dead lock
go func() {
<-time.After(time.Duration(10) * time.Second)
done <- true // time out
agent := NewAgent(suite.envContext, suite.namespace, suite.pool)
err := agent.Serve()
require.NoError(suite.T(), err, "agent serve: nil error expected but got %s", err)
go func() {
defer func() {
for i := uint32(0); i < expected; i++ {
changeData := &job.StatusChange{
JobID: fmt.Sprintf("job-%d", i),
Status: "running",
evt := &Event{
URL: ts.URL,
Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status),
Data: changeData,
Timestamp: time.Now().Unix(),
err := agent.Trigger(evt)
require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err) require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err)
// Check results // check
<-done suite.checkStatus()
require.Equal(suite.T(), expected, count, "expected %d hook events but only got %d", expected, count) }
// Wait // TestEventSending ...
suite.envContext.WG.Wait() func (suite *HookAgentTestSuite) TestEventSendingError() {
mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(errors.New("internal server error: for testing"))
suite.agent.client = mc
err := suite.agent.Trigger(suite.event)
// Failed to send by client, it should be put into retry queue, check it
// The return should still be nil
suite.NoError(err, "agent trigger: nil error expected but got %s", err)
} }
// TestRetryAndPopMin ... // TestRetryAndPopMin ...
func (suite *HookAgentTestSuite) TestRetryAndPopMin() { func (suite *HookAgentTestSuite) TestRetryAndPopMin() {
ctx := context.Background() mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(nil)
suite.agent.client = mc
tks := make(chan bool, maxHandlers) err := suite.agent.pushForRetry(suite.event)
// Put tokens suite.NoError(err, "push event for retry")
for i := 0; i < maxHandlers; i++ {
tks <- true
agent := &basicAgent{ err = suite.agent.reSend()
context: ctx, require.NoError(suite.T(), err, "resend error: %v", err)
namespace: suite.namespace,
client: NewClient(ctx),
events: make(chan *Event, maxEventChanBuffer),
tokens: tks,
redisPool: suite.pool,
changeData := &job.StatusChange{ // Check
JobID: "fake_job_ID", suite.checkRetryQueue(0)
Status: job.RunningStatus.String(), suite.checkStatus()
} }
evt := &Event{ func (suite *HookAgentTestSuite) checkStatus() {
URL: "", t := job.NewBasicTrackerWithID(context.TODO(), suite.jid, suite.namespace, suite.pool, nil, list.New())
Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status), err := t.Load()
Data: changeData, suite.NoError(err, "load updated job stats")
Timestamp: time.Now().Unix(), suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status")
} }
// Mock job stats func (suite *HookAgentTestSuite) checkRetryQueue(size int) {
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() err := conn.Close()
suite.NoError(err, "close redis connection")
}() }()
key := rds.KeyJobStats(suite.namespace, "fake_job_ID") k := rds.KeyHookEventRetryQueue(suite.namespace)
_, err := conn.Do("HSET", key, "status", job.SuccessStatus.String()) c, err := redis.Int(conn.Do("ZCARD", k))
require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err) suite.NoError(err, "check retry queue")
suite.Equal(size, c, "retry queue count")
err = agent.pushForRetry(evt) }
require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err)
type mockClient struct {
err = agent.reSend() mock.Mock
require.Error(suite.T(), err, "resend: non nil error expected but got nil") }
assert.Equal(suite.T(), 0, len(, "the hook event should be discard but actually not")
func (mc *mockClient) SendEvent(evt *Event) error {
// Change status args := mc.Called(evt)
_, err = conn.Do("HSET", key, "status", job.PendingStatus.String()) return args.Error(0)
require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err)
err = agent.pushForRetry(evt)
require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err)
err = agent.reSend()
require.Nil(suite.T(), err, "resend: nil error should be returned but got %s", err)
<-time.After(time.Duration(1) * time.Second)
assert.Equal(suite.T(), 1, len(, "the hook event should be requeued but actually not: %d", len(
} }

View File

@ -17,14 +17,15 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time" "time"
) )
// HookClientTestSuite tests functions of hook client // HookClientTestSuite tests functions of hook client
@ -57,7 +58,7 @@ func (suite *HookClientTestSuite) SetupSuite() {
return return
} }
fmt.Fprintln(w, "ok") _, _ = fmt.Fprintln(w, "ok")
})) }))
} }

View File

@ -19,6 +19,8 @@ import (
"os" "os"
"testing" "testing"
comcfg "" comcfg ""
"" ""
"" ""
@ -87,6 +89,7 @@ func (suite *ContextImplTestSuite) SetupSuite() {
suite.namespace, suite.namespace,
suite.pool, suite.pool,
nil, nil,
) )
err := suite.tracker.Save() err := suite.tracker.Save()

View File

@ -80,8 +80,8 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
ctx.Checkin("progress data: %60") ctx.Checkin("progress data: %60")
logger.Warning("Holding for 10 seconds") logger.Warning("Holding for 30 seconds")
<-time.After(10 * time.Second) <-time.After(30 * time.Second)
if cmd, ok := ctx.OPCommand(); ok { if cmd, ok := ctx.OPCommand(); ok {
if cmd == job.StopCommand { if cmd == job.StopCommand {

View File

@ -68,6 +68,14 @@ type StatsInfo struct {
NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
Parameters Parameters `json:"parameters,omitempty"` Parameters Parameters `json:"parameters,omitempty"`
Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job
HookAck *ACK `json:"ack,omitempty"`
// ACK is the acknowledge of hook event
type ACK struct {
Status string `json:"status"`
Revision int64 `json:"revision"`
CheckInAt int64 `json:"check_in_at"`
} }
// ActionRequest defines for triggering job action like stop/cancel. // ActionRequest defines for triggering job action like stop/cancel.
@ -87,6 +95,7 @@ type StatusChange struct {
type SimpleStatusChange struct { type SimpleStatusChange struct {
JobID string `json:"job_id"` JobID string `json:"job_id"`
TargetStatus string `json:"target_status"` TargetStatus string `json:"target_status"`
Revision int64 `json:"revision"`
} }
// Validate the job stats // Validate the job stats

View File

@ -21,6 +21,8 @@ import (
"strconv" "strconv"
"time" "time"
"" ""
"" ""
"" ""
@ -32,8 +34,6 @@ import (
const ( const (
// Try best to keep the job stats data but anyway clear it after a reasonable time // Try best to keep the job stats data but anyway clear it after a reasonable time
statDataExpireTime = 7 * 24 * 3600 statDataExpireTime = 7 * 24 * 3600
// 1 hour to discard the job stats of success jobs
statDataExpireTimeForSuccess = 3600
) )
// Tracker is designed to track the life cycle of the job described by the stats // Tracker is designed to track the life cycle of the job described by the stats
@ -102,6 +102,9 @@ type Tracker interface {
// Reset the status to `pending` // Reset the status to `pending`
Reset() error Reset() error
// Fire status hook to report the current status
FireHook() error
} }
// basicTracker implements Tracker interface based on redis // basicTracker implements Tracker interface based on redis
@ -112,6 +115,7 @@ type basicTracker struct {
jobID string jobID string
jobStats *Stats jobStats *Stats
callback HookCallback callback HookCallback
retryList *list.SyncList
} }
// NewBasicTrackerWithID builds a tracker with the provided job ID // NewBasicTrackerWithID builds a tracker with the provided job ID
@ -121,6 +125,7 @@ func NewBasicTrackerWithID(
ns string, ns string,
pool *redis.Pool, pool *redis.Pool,
callback HookCallback, callback HookCallback,
retryList *list.SyncList,
) Tracker { ) Tracker {
return &basicTracker{ return &basicTracker{
namespace: ns, namespace: ns,
@ -128,6 +133,7 @@ func NewBasicTrackerWithID(
pool: pool, pool: pool,
jobID: jobID, jobID: jobID,
callback: callback, callback: callback,
retryList: retryList,
} }
} }
@ -138,6 +144,7 @@ func NewBasicTrackerWithStats(
ns string, ns string,
pool *redis.Pool, pool *redis.Pool,
callback HookCallback, callback HookCallback,
retryList *list.SyncList,
) Tracker { ) Tracker {
return &basicTracker{ return &basicTracker{
namespace: ns, namespace: ns,
@ -146,6 +153,7 @@ func NewBasicTrackerWithStats(
jobStats: stats, jobStats: stats,
jobID: stats.Info.JobID, jobID: stats.Info.JobID,
callback: callback, callback: callback,
retryList: retryList,
} }
} }
@ -245,67 +253,44 @@ func (bt *basicTracker) Expire() error {
// Run job // Run job
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Run() error { func (bt *basicTracker) Run() error {
err := bt.compareAndSet(RunningStatus) if err := bt.setStatus(RunningStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "run")
if er := bt.fireHookEvent(RunningStatus); err == nil && er != nil {
return er
} }
return err return nil
} }
// Stop job // Stop job
// Stop is final status, if failed to do, retry should be enforced. // Stop is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Stop() error { func (bt *basicTracker) Stop() error {
err := bt.UpdateStatusWithRetry(StoppedStatus) if err := bt.setStatus(StoppedStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "stop")
if er := bt.fireHookEvent(StoppedStatus); err == nil && er != nil {
return er
} }
return err return nil
} }
// Fail job // Fail job
// Fail is final status, if failed to do, retry should be enforced. // Fail is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Fail() error { func (bt *basicTracker) Fail() error {
err := bt.UpdateStatusWithRetry(ErrorStatus) if err := bt.setStatus(ErrorStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "fail")
if er := bt.fireHookEvent(ErrorStatus); err == nil && er != nil {
return er
} }
return err return nil
} }
// Succeed job // Succeed job
// Succeed is final status, if failed to do, retry should be enforced. // Succeed is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Succeed() error { func (bt *basicTracker) Succeed() error {
err := bt.UpdateStatusWithRetry(SuccessStatus) if err := bt.setStatus(SuccessStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "succeed")
// Expire the stat data of the successful job
if er := bt.expire(statDataExpireTimeForSuccess); er != nil {
// Only logged
logger.Errorf("Expire stat data for the success job `%s` failed with error: %s", bt.jobID, er)
} }
if er := bt.fireHookEvent(SuccessStatus); err == nil && er != nil { return nil
return er
return err
} }
// Save the stats of job tracked by this tracker // Save the stats of job tracked by this tracker
@ -362,9 +347,13 @@ func (bt *basicTracker) Save() (err error) {
// Set the first revision // Set the first revision
args = append(args, "revision", time.Now().Unix()) args = append(args, "revision", time.Now().Unix())
// ACK data is saved/updated not via tracker, so ignore the ACK saving
// Do it in a transaction // Do it in a transaction
err = conn.Send("MULTI") err = conn.Send("MULTI")
err = conn.Send("HMSET", args...) err = conn.Send("HMSET", args...)
// Set inprogress track lock
err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2)
// If job kind is periodic job, expire time should not be set // If job kind is periodic job, expire time should not be set
// If job kind is scheduled job, expire time should be runAt+ // If job kind is scheduled job, expire time should be runAt+
@ -404,15 +393,14 @@ func (bt *basicTracker) Save() (err error) {
func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error { func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
err := bt.compareAndSet(targetStatus) err := bt.compareAndSet(targetStatus)
if err != nil { if err != nil {
// Status mismatching error will be ignored // Status mismatching error will be directly ignored as the status has already been outdated
if !errs.IsStatusMismatchError(err) { if !errs.IsStatusMismatchError(err) {
// Push to the retrying Q // Push to the retrying daemon
if er := bt.pushToQueueForRetry(targetStatus); er != nil { bt.retryList.Push(SimpleStatusChange{
logger.Errorf("push job status update request to retry queue error: %s", er) JobID: bt.jobID,
// If failed to put it into the retrying Q in case, let's downgrade to retry in current process TargetStatus: targetStatus.String(),
// by recursively call in goroutines. Revision: bt.jobStats.Info.Revision,
bt.retryUpdateStatus(targetStatus) })
} }
} }
@ -428,15 +416,48 @@ func (bt *basicTracker) Reset() error {
}() }()
now := time.Now().Unix() now := time.Now().Unix()
err := bt.Update( if _, err := rds.StatusResetScript.Do(
"status", conn,
rds.KeyJobStats(bt.namespace, bt.jobStats.Info.JobID),
PendingStatus.String(), PendingStatus.String(),
now, now,
) ); err != nil {
if err == nil { return errors.Wrap(err, "reset")
bt.refresh(PendingStatus) }
// Sync current tracker
bt.jobStats.Info.Status = PendingStatus.String()
bt.jobStats.Info.Revision = now bt.jobStats.Info.Revision = now
bt.jobStats.Info.UpdateTime = now
bt.jobStats.Info.CheckIn = ""
bt.jobStats.Info.CheckInAt = 0
return nil
// FireHook fires status hook event to report current status
func (bt *basicTracker) FireHook() error {
return bt.fireHookEvent(
// setStatus sets the job status to the target status and fire status change hook
func (bt *basicTracker) setStatus(status Status) error {
err := bt.UpdateStatusWithRetry(status)
if !errs.IsStatusMismatchError(err) {
if er := bt.fireHookEvent(status); er != nil {
// Add more error context
if err != nil {
return errors.Wrap(er, err.Error())
return er
} }
return err return err
@ -480,72 +501,32 @@ func (bt *basicTracker) fireHookEvent(status Status, checkIn ...string) error {
return nil return nil
} }
func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error {
simpleStatusChange := &SimpleStatusChange{
JobID: bt.jobID,
TargetStatus: targetStatus.String(),
rawJSON, err := json.Marshal(simpleStatusChange)
if err != nil {
return err
conn := bt.pool.Get()
defer func() {
_ = conn.Close()
key := rds.KeyStatusUpdateRetryQueue(bt.namespace)
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
_, err = conn.Do("ZADD", args...)
return err
func (bt *basicTracker) retryUpdateStatus(targetStatus Status) {
go func() {
select {
case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
// Check the update timestamp
if time.Now().Unix()-bt.jobStats.Info.UpdateTime < statDataExpireTime-24*3600 {
if err := bt.compareAndSet(targetStatus); err != nil {
logger.Errorf("Retry to update job status error: %s", err)
// Success
case <-bt.context.Done():
return // terminated
func (bt *basicTracker) compareAndSet(targetStatus Status) error { func (bt *basicTracker) compareAndSet(targetStatus Status) error {
conn := bt.pool.Get() conn := bt.pool.Get()
defer func() { defer func() {
_ = conn.Close() closeConn(conn)
}() }()
rootKey := rds.KeyJobStats(bt.namespace, bt.jobID) rootKey := rds.KeyJobStats(bt.namespace, bt.jobID)
trackKey := rds.KeyJobTrackInProgress(bt.namespace)
st, err := getStatus(conn, rootKey) reply, err := redis.String(rds.SetStatusScript.Do(
if err != nil { if err != nil {
return err return errors.Wrap(err, "compare and set status error")
} }
diff := st.Compare(targetStatus) if reply != "ok" {
if diff > 0 { return errs.StatusMismatchError(reply, targetStatus.String())
return errs.StatusMismatchError(st.String(), targetStatus.String())
} }
if diff == 0 {
// Desired matches actual
return nil return nil
return setStatus(conn, rootKey, targetStatus)
} }
// retrieve the stats of job tracked by this tracker from the backend data // retrieve the stats of job tracked by this tracker from the backend data
@ -626,11 +607,20 @@ func (bt *basicTracker) retrieve() error {
params := make(Parameters) params := make(Parameters)
if err := json.Unmarshal([]byte(value), &params); err == nil { if err := json.Unmarshal([]byte(value), &params); err == nil {
res.Info.Parameters = params res.Info.Parameters = params
} else {
logger.Error(errors.Wrap(err, "retrieve: tracker"))
} }
break break
case "revision": case "revision":
res.Info.Revision = parseInt64(value) res.Info.Revision = parseInt64(value)
break break
case "ack":
ack := &ACK{}
if err := json.Unmarshal([]byte(value), ack); err == nil {
res.Info.HookAck = ack
} else {
logger.Error(errors.Wrap(err, "retrieve: tracker"))
default: default:
break break
} }
@ -663,7 +653,7 @@ func (bt *basicTracker) expire(expireTime int64) error {
func getStatus(conn redis.Conn, key string) (Status, error) { func getStatus(conn redis.Conn, key string) (Status, error) {
values, err := rds.HmGet(conn, key, "status") values, err := rds.HmGet(conn, key, "status")
if err != nil { if err != nil {
return "", err return "", errors.Wrap(err, "get status error")
} }
if len(values) == 1 { if len(values) == 1 {
@ -676,10 +666,6 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
return "", errors.New("malformed status data returned") return "", errors.New("malformed status data returned")
} }
func setStatus(conn redis.Conn, key string, status Status) error {
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
func closeConn(conn redis.Conn) { func closeConn(conn redis.Conn) {
if conn != nil { if conn != nil {
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {

View File

@ -19,6 +19,8 @@ import (
"testing" "testing"
"time" "time"
"" ""
"" ""
"" ""
@ -77,6 +79,7 @@ func (suite *TrackerTestSuite) TestTracker() {
func(hookURL string, change *StatusChange) error { func(hookURL string, change *StatusChange) error {
return nil return nil
}, },
) )
err := tracker.Save() err := tracker.Save()
@ -107,12 +110,19 @@ func (suite *TrackerTestSuite) TestTracker() {
assert.Error(suite.T(), err, "run: non nil error expected but got nil") assert.Error(suite.T(), err, "run: non nil error expected but got nil")
err = tracker.CheckIn("check in") err = tracker.CheckIn("check in")
assert.Nil(suite.T(), err, "check in: nil error expected but got %s", err) assert.Nil(suite.T(), err, "check in: nil error expected but got %s", err)
// check in is allowed to be repeated
err = tracker.CheckIn("check in2")
assert.Nil(suite.T(), err, "check in2: nil error expected but got %s", err)
err = tracker.Succeed() err = tracker.Succeed()
assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err) assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err)
err = tracker.Stop() // same status is allowed to update
assert.Nil(suite.T(), err, "stop: nil error expected but got %s", err) err = tracker.Succeed()
assert.Nil(suite.T(), err, "succeed again: nil error expected but got %s", err)
// final status can be set only once
err = tracker.Fail() err = tracker.Fail()
assert.Nil(suite.T(), err, "fail: nil error expected but got %s", err) assert.Error(suite.T(), err, "fail: error expected but got nil")
t := NewBasicTrackerWithID( t := NewBasicTrackerWithID(
context.TODO(), context.TODO(),
@ -122,10 +132,26 @@ func (suite *TrackerTestSuite) TestTracker() {
func(hookURL string, change *StatusChange) error { func(hookURL string, change *StatusChange) error {
return nil return nil
}, },
) )
err = t.Load() err = t.Load()
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
var st Status
err = t.Reset()
assert.NoError(suite.T(), err)
st, err = t.Status()
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), PendingStatus, st)
err = t.Stop()
assert.NoError(suite.T(), err)
st, err = t.Status()
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), StoppedStatus, st)
err = t.Expire() err = t.Expire()
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
} }
@ -146,7 +172,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
}, },
} }
t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil) t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil, nil)
err := t.Save() err := t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -166,7 +192,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
}, },
} }
t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil) t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil, nil)
err = t2.Save() err = t2.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -177,32 +203,3 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
err = t2.PeriodicExecutionDone() err = t2.PeriodicExecutionDone()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
} }
// TestPushForRetry tests push for retry
func (suite *TrackerTestSuite) TestPushForRetry() {
ID := utils.MakeIdentifier()
runAt := time.Now().Add(1 * time.Hour).Unix()
jobStats := &Stats{
Info: &StatsInfo{
JobID: ID,
Status: ScheduledStatus.String(),
JobKind: KindScheduled,
JobName: SampleJob,
IsUnique: false,
RunAt: runAt,
EnqueueTime: runAt,
t := &basicTracker{
namespace: suite.namespace,
context: context.TODO(),
pool: suite.pool,
jobID: ID,
jobStats: jobStats,
callback: nil,
err := t.pushToQueueForRetry(RunningStatus)
require.NoError(suite.T(), err)

View File

@ -16,15 +16,21 @@ package lcm
import ( import (
"context" "context"
"encoding/json" "math/rand"
"" ""
"" ""
"" ""
"" ""
"" ""
"" ""
) )
const ( const (
@ -32,6 +38,10 @@ const (
shortLoopInterval = 5 * time.Second shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found // Waiting for long while if no retrying elements found
longLoopInterval = 5 * time.Minute longLoopInterval = 5 * time.Minute
// loopInterval is the interval for the loop of restoring dead status
loopInterval = 2 * time.Minute
// shortInterval is initial interval and be as based to give random buffer to loopInterval
shortInterval = 10
) )
// Controller is designed to control the life cycle of the job // Controller is designed to control the life cycle of the job
@ -53,6 +63,7 @@ type basicController struct {
pool *redis.Pool pool *redis.Pool
callback job.HookCallback callback job.HookCallback
wg *sync.WaitGroup wg *sync.WaitGroup
retryList *list.SyncList
} }
// NewController is the constructor of basic controller // NewController is the constructor of basic controller
@ -63,6 +74,7 @@ func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.H
pool: pool, pool: pool,
callback: callback, callback: callback,
wg: ctx.WG, wg: ctx.WG,
retryList: list.New(),
} }
} }
@ -86,7 +98,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
return nil, errors.Errorf("error occurred when creating job tracker: %s", err) return nil, errors.Errorf("error occurred when creating job tracker: %s", err)
} }
bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback) bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback, bc.retryList)
if err := bt.Save(); err != nil { if err := bt.Save(); err != nil {
return nil, err return nil, err
} }
@ -96,7 +108,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
// Track and attache with the job // Track and attache with the job
func (bc *basicController) Track(jobID string) (job.Tracker, error) { func (bc *basicController) Track(jobID string) (job.Tracker, error) {
bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback) bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback, bc.retryList)
if err := bt.Load(); err != nil { if err := bt.Load(); err != nil {
return nil, err return nil, err
} }
@ -105,75 +117,90 @@ func (bc *basicController) Track(jobID string) (job.Tracker, error) {
} }
// loopForRestoreDeadStatus is a loop to restore the dead states of jobs // loopForRestoreDeadStatus is a loop to restore the dead states of jobs
// Obviously,this retry is a try best action.
// The retry items are not persisted and they will be gone if the job service is restart.
func (bc *basicController) loopForRestoreDeadStatus() { func (bc *basicController) loopForRestoreDeadStatus() {
// Generate random timer duration
rd := func() time.Duration {
return longLoopInterval + time.Duration(rand.Int31n(shortInterval))*time.Second
defer func() { defer func() {
logger.Info("Status restoring loop is stopped") logger.Info("Status restoring loop is stopped")
bc.wg.Done() bc.wg.Done()
}() }()
token := make(chan bool, 1) // Initialize the timer
token <- true tm := time.NewTimer(shortInterval * time.Second)
defer tm.Stop()
for { for {
if err := bc.restoreDeadStatus(); err != nil {
waitInterval := shortLoopInterval
if err == rds.ErrNoElements {
// No elements
waitInterval = longLoopInterval
} else {
logger.Errorf("restore dead status error: %s, put it back to the retrying Q later again", err)
// wait for a while or be terminated
select { select {
case <-time.After(waitInterval): case <-tm.C:
// Reset timer
// Retry the items in the list
case <-bc.context.Done(): case <-bc.context.Done():
return return // terminated
} }
} }
// Return token
token <- true
} }
// restoreDeadStatus try to restore the dead status // retryLoop iterates the retry queue and do retrying
func (bc *basicController) restoreDeadStatus() error { func (bc *basicController) retryLoop() {
// Get one // Get connection
deadOne, err := bc.popOneDead()
if err != nil {
return err
// Try to update status
t, err := bc.Track(deadOne.JobID)
if err != nil {
return err
return t.UpdateStatusWithRetry(job.Status(deadOne.TargetStatus))
// popOneDead retrieves one dead status from the backend Q from lowest to highest
func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
conn := bc.pool.Get() conn := bc.pool.Get()
defer func() { defer func() {
_ = conn.Close() // Return redis connection
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %v : %s", err, "retry loop: lcm")
}() }()
key := rds.KeyStatusUpdateRetryQueue(bc.namespace) // Check the list
v, err := rds.ZPopMin(conn, key) bc.retryList.Iterate(func(ele interface{}) bool {
if change, ok := ele.(job.SimpleStatusChange); ok {
err := retry(conn, bc.namespace, change)
if err != nil { if err != nil {
return nil, err // Log the error
logger.Errorf("Failed to retry the status update action: %v : %s", err, "retry loop: lcm")
} }
if bytes, ok := v.([]byte); ok { if err == nil || errs.IsStatusMismatchError(err) {
ssc := &job.SimpleStatusChange{} return true
if err := json.Unmarshal(bytes, ssc); err == nil {
return ssc, nil
} }
} }
return nil, errors.New("pop one dead error: bad result reply") return false
// retry status update action
func retry(conn redis.Conn, ns string, change job.SimpleStatusChange) error {
// Debug
logger.Debugf("Retry the status update action: %v", change)
rootKey := rds.KeyJobStats(ns, change.JobID)
trackKey := rds.KeyJobTrackInProgress(ns)
reply, err := redis.String(rds.SetStatusScript.Do(
if err != nil {
return errors.Wrap(err, "retry")
if reply != "ok" {
return errs.StatusMismatchError(reply, change.TargetStatus)
return nil
} }

View File

@ -16,8 +16,10 @@ package lcm
import ( import (
"context" "context"
"encoding/json" "sync"
"" "testing"
"" ""
"" ""
"" ""
@ -26,9 +28,6 @@ import (
"" ""
"" ""
"" ""
) )
// LcmControllerTestSuite tests functions of life cycle controller // LcmControllerTestSuite tests functions of life cycle controller
@ -65,48 +64,36 @@ func TestLcmControllerTestSuite(t *testing.T) {
suite.Run(t, new(LcmControllerTestSuite)) suite.Run(t, new(LcmControllerTestSuite))
} }
// TestNewAndTrack tests controller.New() and controller.Track() // TestController tests lcm controller
func (suite *LcmControllerTestSuite) TestNewAndTrack() { func (suite *LcmControllerTestSuite) TestController() {
// Prepare mock data
jobID := utils.MakeIdentifier() jobID := utils.MakeIdentifier()
suite.newsStats(jobID) rev := time.Now().Unix()
suite.newsStats(jobID, rev)
simpleChange := job.SimpleStatusChange{
JobID: jobID,
TargetStatus: job.RunningStatus.String(),
Revision: rev,
// Just test if the server can be started
err := suite.ctl.Serve()
require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err)
// Test retry loop
bc := suite.ctl.(*basicController)
t, err := suite.ctl.Track(jobID) t, err := suite.ctl.Track(jobID)
require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err) require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err)
assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName) assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName)
} assert.Equal(suite.T(), job.RunningStatus.String(), t.Job().Info.Status)
// TestNew tests controller.Serve()
func (suite *LcmControllerTestSuite) TestServe() {
// Prepare mock data
jobID := utils.MakeIdentifier()
conn := suite.pool.Get()
defer func() {
_ = conn.Close()
simpleChange := &job.SimpleStatusChange{
JobID: jobID,
TargetStatus: job.RunningStatus.String(),
rawJSON, err := json.Marshal(simpleChange)
require.Nil(suite.T(), err, "json marshal: nil error expected but got %s", err)
key := rds.KeyStatusUpdateRetryQueue(suite.namespace)
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
_, err = conn.Do("ZADD", args...)
require.Nil(suite.T(), err, "prepare mock data: nil error expected but got %s", err)
err = suite.ctl.Serve()
require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err)
<-time.After(1 * time.Second)
count, err := redis.Int(conn.Do("ZCARD", key))
require.Nil(suite.T(), err, "get total dead status: nil error expected but got %s", err)
assert.Equal(suite.T(), 0, count)
} }
// newsStats create job stats // newsStats create job stats
func (suite *LcmControllerTestSuite) newsStats(jobID string) { func (suite *LcmControllerTestSuite) newsStats(jobID string, revision int64) {
stats := &job.Stats{ stats := &job.Stats{
Info: &job.StatsInfo{ Info: &job.StatsInfo{
JobID: jobID, JobID: jobID,
@ -114,6 +101,7 @@ func (suite *LcmControllerTestSuite) newsStats(jobID string) {
JobName: job.SampleJob, JobName: job.SampleJob,
IsUnique: true, IsUnique: true,
Status: job.PendingStatus.String(), Status: job.PendingStatus.String(),
Revision: revision,
}, },
} }

View File

@ -17,6 +17,9 @@ package mgt
import ( import (
"context" "context"
"fmt" "fmt"
"" ""
"" ""
"" ""
@ -27,8 +30,6 @@ import (
"" ""
"" ""
"" ""
) )
// Manager defies the related operations to handle the management of job stats. // Manager defies the related operations to handle the management of job stats.
@ -154,7 +155,7 @@ func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error)
statsKey := string(bytes) statsKey := string(bytes)
if i := strings.LastIndex(statsKey, ":"); i != -1 { if i := strings.LastIndex(statsKey, ":"); i != -1 {
jID := statsKey[i+1:] jID := statsKey[i+1:]
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil)
if err := t.Load(); err != nil { if err := t.Load(); err != nil {
logger.Errorf("retrieve stats data of job %s error: %s", jID, err) logger.Errorf("retrieve stats data of job %s error: %s", jID, err)
continue continue
@ -174,7 +175,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re
return nil, 0, errors.New("nil periodic job ID") return nil, 0, errors.New("nil periodic job ID")
} }
tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil) tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil, nil)
err = tracker.Load() err = tracker.Load()
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
@ -239,7 +240,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re
} }
for _, eID := range executionIDs { for _, eID := range executionIDs {
t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil, nil)
if er := t.Load(); er != nil { if er := t.Load(); er != nil {
logger.Errorf("track job %s error: %s", eID, err) logger.Errorf("track job %s error: %s", eID, err)
continue continue
@ -273,7 +274,7 @@ func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int6
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt) jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
} }
} }
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil)
err = t.Load() err = t.Load()
if err != nil { if err != nil {
// Just log it // Just log it
@ -293,7 +294,7 @@ func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) {
return nil, errs.BadRequestError("empty job ID") return nil, errs.BadRequestError("empty job ID")
} }
t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil, nil)
if err := t.Load(); err != nil { if err := t.Load(); err != nil {
return nil, err return nil, err
} }
@ -307,7 +308,7 @@ func (bm *basicManager) SaveJob(j *job.Stats) error {
return errs.BadRequestError("nil saving job stats") return errs.BadRequestError("nil saving job stats")
} }
t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil, nil)
return t.Save() return t.Save()
} }

View File

@ -16,7 +16,11 @@ package mgt
import ( import (
"context" "context"
"" ""
"" ""
"" ""
"" ""
@ -24,8 +28,6 @@ import (
"" ""
"" ""
"" ""
) )
// BasicManagerTestSuite tests the function of basic manager // BasicManagerTestSuite tests the function of basic manager
@ -56,7 +58,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() {
}, },
} }
t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil, list.New())
err := t.Save() err := t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -71,7 +73,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() {
UpstreamJobID: "1000", UpstreamJobID: "1000",
}, },
} }
t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil) t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil, list.New())
err = t.Save() err = t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
} }
@ -117,7 +119,7 @@ func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() {
assert.Equal(suite.T(), int64(1), total) assert.Equal(suite.T(), int64(1), total)
assert.Equal(suite.T(), int64(1), int64(len(jobs))) assert.Equal(suite.T(), int64(1), int64(len(jobs)))
t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil, list.New())
err = t.Load() err = t.Load()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
err = t.PeriodicExecutionDone() err = t.PeriodicExecutionDone()
@ -147,17 +149,17 @@ func (suite *BasicManagerTestSuite) TestGetScheduledJobs() {
}, },
} }
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New())
err = t.Save() err = t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{ l, total, err := suite.manager.GetScheduledJobs(&query.Parameter{
PageNumber: 1, PageNumber: 1,
PageSize: 10, PageSize: 10,
}) })
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), total) assert.Equal(suite.T(), int64(1), total)
assert.Equal(suite.T(), int64(1), int64(len(list))) assert.Equal(suite.T(), int64(1), int64(len(l)))
} }
// TestGetJob tests get job // TestGetJob tests get job

View File

@ -15,10 +15,9 @@
package period package period
import ( import (
"encoding/json" "context"
"time" "time"
"" ""
"" ""
"" ""
@ -52,27 +51,13 @@ func NewScheduler(ctx context.Context, namespace string, pool *redis.Pool, ctl l
} }
// Start the periodic scheduling process // Start the periodic scheduling process
// Blocking call here func (bs *basicScheduler) Start() {
func (bs *basicScheduler) Start() error { // Run once clean
defer func() {
logger.Info("Basic scheduler is stopped")
// Try best to do // Try best to do
go bs.clearDirtyJobs() go bs.clearDirtyJobs()
logger.Info("Basic scheduler is started")
// start enqueuer // start enqueuer
return bs.enqueuer.start() bs.enqueuer.start()
// Stop the periodic scheduling process
func (bs *basicScheduler) Stop() error {
// stop everything
bs.enqueuer.stopChan <- true
return nil
} }
// Schedule is implementation of the same method in period.Interface // Schedule is implementation of the same method in period.Interface
@ -99,24 +84,10 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) {
return -1, err return -1, err
} }
// Prepare publish message
m := &message{
Event: changeEventSchedule,
Data: p,
msgJSON, err := json.Marshal(m)
if err != nil {
return -1, err
pid := time.Now().Unix() pid := time.Now().Unix()
// Save to redis db and publish notification via redis transaction // Save to redis db
err = conn.Send("MULTI") if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil {
err = conn.Send("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON)
err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
if _, err := conn.Do("EXEC"); err != nil {
return -1, err return -1, err
} }
@ -166,25 +137,9 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID) return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID)
} }
notification := &message{ // REM from redis db
Event: changeEventUnSchedule, // Accurately remove the item with the specified score
Data: p, if _, err := conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID); err != nil {
msgJSON, err := json.Marshal(notification)
if err != nil {
return err
// REM from redis db with transaction way
err = conn.Send("MULTI")
err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
if err != nil {
return err
_, err = conn.Do("EXEC")
if err != nil {
return err return err
} }

View File

@ -16,6 +16,10 @@ package period
import ( import (
"context" "context"
"fmt" "fmt"
"" ""
"" ""
"" ""
@ -25,9 +29,6 @@ import (
"" ""
"" ""
"" ""
) )
// BasicSchedulerTestSuite tests functions of basic scheduler // BasicSchedulerTestSuite tests functions of basic scheduler
@ -63,6 +64,7 @@ func (suite *BasicSchedulerTestSuite) SetupSuite() {
) )
suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl) suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl)
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
@ -84,20 +86,6 @@ func TestSchedulerTestSuite(t *testing.T) {
// TestScheduler tests scheduling and un-scheduling // TestScheduler tests scheduling and un-scheduling
func (suite *BasicSchedulerTestSuite) TestScheduler() { func (suite *BasicSchedulerTestSuite) TestScheduler() {
go func() {
<-time.After(1 * time.Second)
_ = suite.scheduler.Stop()
go func() {
var err error
defer func() {
require.NoError(suite.T(), err, "start scheduler: nil error expected but got %s", err)
err = suite.scheduler.Start()
// Prepare one // Prepare one
now := time.Now() now := time.Now()
minute := now.Minute() minute := now.Minute()

View File

@ -43,14 +43,11 @@ type enqueuer struct {
namespace string namespace string
context context.Context context context.Context
pool *redis.Pool pool *redis.Pool
policyStore *policyStore
ctl lcm.Controller ctl lcm.Controller
// Diff with other nodes // Diff with other nodes
nodeID string nodeID string
// Track the error of enqueuing // Track the error of enqueuing
lastEnqueueErr error lastEnqueueErr error
// For stop
stopChan chan bool
} }
func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer { func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer {
@ -64,29 +61,20 @@ func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lc
context: ctx, context: ctx,
namespace: namespace, namespace: namespace,
pool: pool, pool: pool,
policyStore: newPolicyStore(ctx, namespace, pool),
ctl: ctl, ctl: ctl,
stopChan: make(chan bool, 1),
nodeID: nodeID.(string), nodeID: nodeID.(string),
} }
} }
// Blocking call // Blocking call
func (e *enqueuer) start() error { func (e *enqueuer) start() {
// Load policies first when starting
if err := e.policyStore.load(); err != nil {
return err
go e.loop() go e.loop()
logger.Info("Periodic enqueuer is started") logger.Info("Scheduler: periodic enqueuer is started")
return e.policyStore.serve()
} }
func (e *enqueuer) loop() { func (e *enqueuer) loop() {
defer func() { defer func() {
logger.Info("Periodic enqueuer is stopped") logger.Info("Scheduler: periodic enqueuer is stopped")
}() }()
// Do enqueue immediately when starting // Do enqueue immediately when starting
@ -98,10 +86,8 @@ func (e *enqueuer) loop() {
for { for {
select { select {
case <-e.stopChan: case <-e.context.Done():
// Stop policy store now return // exit
e.policyStore.stopChan <- true
case <-timer.C: case <-timer.C:
// Pause the timer for completing the processing this time // Pause the timer for completing the processing this time
timer.Reset(neverExecuted) timer.Reset(neverExecuted)
@ -157,10 +143,17 @@ func (e *enqueuer) enqueue() {
// Reset error track // Reset error track
e.lastEnqueueErr = nil e.lastEnqueueErr = nil
e.policyStore.Iterate(func(id string, p *Policy) bool { // Load policies and schedule next jobs for them
pls, err := Load(e.namespace, conn)
if err != nil {
// Log error
logger.Errorf("%s:%s", err, "enqueue error: enqueuer")
for _, p := range pls {
e.scheduleNextJobs(p, conn) e.scheduleNextJobs(p, conn)
return true }
} }
// scheduleNextJobs schedules job for next time slots based on the policy // scheduleNextJobs schedules job for next time slots based on the policy

View File

@ -21,6 +21,7 @@ import (
"time" "time"
"" ""
"" ""
"" ""
@ -30,7 +31,6 @@ import (
"" ""
"" ""
"" ""
"" ""
) )
@ -69,8 +69,9 @@ func (suite *EnqueuerTestSuite) SetupSuite() {
func(hookURL string, change *job.StatusChange) error { return nil }, func(hookURL string, change *job.StatusChange) error { return nil },
) )
suite.enqueuer = newEnqueuer(ctx, suite.namespace, suite.pool, lcmCtl) suite.enqueuer = newEnqueuer(ctx, suite.namespace, suite.pool, lcmCtl)
suite.prepare() suite.prepare()
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
@ -87,15 +88,12 @@ func (suite *EnqueuerTestSuite) TearDownSuite() {
// TestEnqueuer tests enqueuer // TestEnqueuer tests enqueuer
func (suite *EnqueuerTestSuite) TestEnqueuer() { func (suite *EnqueuerTestSuite) TestEnqueuer() {
go func() {
defer func() {
suite.enqueuer.stopChan <- true
key := rds.RedisKeyScheduled(suite.namespace) key := rds.RedisKeyScheduled(suite.namespace)
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}() }()
tk := time.NewTicker(500 * time.Millisecond) tk := time.NewTicker(500 * time.Millisecond)
@ -116,10 +114,6 @@ func (suite *EnqueuerTestSuite) TestEnqueuer() {
return return
} }
} }
err := suite.enqueuer.start()
require.Nil(suite.T(), err, "enqueuer start: nil error expected but got %s", err)
} }
func (suite *EnqueuerTestSuite) prepare() { func (suite *EnqueuerTestSuite) prepare() {

View File

@ -15,26 +15,15 @@
package period package period
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"" ""
"" ""
"" ""
"" ""
"" ""
const (
// changeEventSchedule : Schedule periodic job policy event
changeEventSchedule = "Schedule"
// changeEventUnSchedule : UnSchedule periodic job policy event
changeEventUnSchedule = "UnSchedule"
) )
// Policy ... // Policy ...
@ -81,184 +70,14 @@ func (p *Policy) Validate() error {
return nil return nil
} }
// policyStore is in-memory cache for the periodic job policies. // Load all the policies from the backend storage.
type policyStore struct { func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
// k-v pair and key is the policy ID bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1))
hash *sync.Map
namespace string
context context.Context
pool *redis.Pool
// For stop
stopChan chan bool
// message is designed for sub/pub messages
type message struct {
Event string `json:"event"`
Data *Policy `json:"data"`
// newPolicyStore is constructor of policyStore
func newPolicyStore(ctx context.Context, ns string, pool *redis.Pool) *policyStore {
return &policyStore{
hash: new(sync.Map),
context: ctx,
namespace: ns,
pool: pool,
stopChan: make(chan bool, 1),
// Blocking call
func (ps *policyStore) serve() (err error) {
defer func() {
logger.Info("Periodical job policy store is stopped")
conn := ps.pool.Get()
psc := redis.PubSubConn{
Conn: conn,
defer func() {
_ = psc.Close()
// Subscribe channel
err = psc.Subscribe(redis.Args{}.AddFlat(rds.KeyPeriodicNotification(ps.namespace))...)
if err != nil { if err != nil {
return return nil, err
} }
// Channels for sub/pub ctl policies := make([]*Policy, 0)
errChan := make(chan error, 1)
done := make(chan bool, 1)
go func() {
for {
switch res := psc.Receive().(type) {
case error:
errChan <- fmt.Errorf("redis sub/pub chan error: %s", res.(error).Error())
case redis.Message:
m := &message{}
if err := json.Unmarshal(res.Data, m); err != nil {
// logged
logger.Errorf("Read invalid message: %s\n", res.Data)
if err := ps.sync(m); err != nil {
case redis.Subscription:
switch res.Kind {
case "subscribe":
logger.Infof("Subscribe redis channel %s", res.Channel)
case "unsubscribe":
// Unsubscribe all, means main goroutine is exiting
logger.Infof("Unsubscribe redis channel %s", res.Channel)
done <- true
logger.Info("Periodical job policy store is serving with policy auto sync enabled")
defer func() {
var unSubErr error
defer func() {
// Merge errors
finalErrs := make([]string, 0)
if unSubErr != nil {
finalErrs = append(finalErrs, unSubErr.Error())
if err != nil {
finalErrs = append(finalErrs, err.Error())
if len(finalErrs) > 0 {
// Override returned err or do nothing
err = errors.New(strings.Join(finalErrs, ";"))
// Unsubscribe all
if err := psc.Unsubscribe(); err != nil {
logger.Errorf("unsubscribe: %s", err)
// Confirm result
// Add timeout in case unsubscribe failed
select {
case unSubErr = <-errChan:
case <-done:
case <-time.After(30 * time.Second):
unSubErr = errors.New("unsubscribe time out")
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
// blocking here
for {
select {
case <-ticker.C:
err = psc.Ping("ping!")
if err != nil {
case <-ps.stopChan:
return nil
case err = <-errChan:
// sync policy with backend list
func (ps *policyStore) sync(m *message) error {
if m == nil {
return errors.New("nil message")
if m.Data == nil {
return errors.New("missing data in the policy sync message")
switch m.Event {
case changeEventSchedule:
if err := ps.add(m.Data); err != nil {
return fmt.Errorf("failed to sync scheduled policy %s: %s", m.Data.ID, err)
case changeEventUnSchedule:
removed := ps.remove(m.Data.ID)
if removed == nil {
return fmt.Errorf("failed to sync unscheduled policy %s", m.Data.ID)
return fmt.Errorf("message %s is not supported", m.Event)
return nil
// Load all the policies from the backend to store
func (ps *policyStore) load() error {
conn := ps.pool.Get()
defer func() {
_ = conn.Close()
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(ps.namespace), 0, -1))
if err != nil {
return err
count := 0
for i, l := 0, len(bytes); i < l; i++ { for i, l := 0, len(bytes); i < l; i++ {
rawPolicy := bytes[i].([]byte) rawPolicy := bytes[i].([]byte)
p := &Policy{} p := &Policy{}
@ -266,62 +85,22 @@ func (ps *policyStore) load() error {
if err := p.DeSerialize(rawPolicy); err != nil { if err := p.DeSerialize(rawPolicy); err != nil {
// Ignore error which means the policy data is not valid // Ignore error which means the policy data is not valid
// Only logged // Only logged
logger.Errorf("malform policy: %s; error: %s\n", rawPolicy, err) logger.Errorf("Malformed policy: %s; error: %s", rawPolicy, err)
continue continue
} }
// Add to cache store // Validate the policy object
if err := ps.add(p); err != nil { if err := p.Validate(); err != nil {
// Only logged logger.Errorf("Policy validate error: %s", err)
logger.Errorf("cache periodic policies error: %s", err)
continue continue
} }
count++ policies = append(policies, p)
logger.Debugf("Load periodic job policy: %s", string(rawPolicy)) logger.Debugf("Load periodic job policy: %s", string(rawPolicy))
} }
logger.Infof("Load %d periodic job policies", count) logger.Debugf("Load %d periodic job policies", len(policies))
return nil return policies, nil
// Add one or more policy
func (ps *policyStore) add(item *Policy) error {
if item == nil {
return errors.New("nil policy to add")
if utils.IsEmptyStr(item.ID) {
return errors.New("malform policy to add")
v, _ := ps.hash.LoadOrStore(item.ID, item)
if v == nil {
return fmt.Errorf("failed to add policy: %s", item.ID)
return nil
// Iterate all the policies in the store
func (ps *policyStore) Iterate(f func(id string, p *Policy) bool) {
ps.hash.Range(func(k, v interface{}) bool {
return f(k.(string), v.(*Policy))
// Remove the specified policy from the store
func (ps *policyStore) remove(policyID string) *Policy {
if utils.IsEmptyStr(policyID) {
return nil
if v, ok := ps.hash.Load(policyID); ok {
return v.(*Policy)
return nil
} }

View File

@ -14,25 +14,23 @@
package period package period
import ( import (
"context" "testing"
"" ""
"" ""
"" ""
"" ""
"" ""
"" ""
) )
// PolicyStoreTestSuite tests functions of policy store // PolicyStoreTestSuite tests functions of policy store
type PolicyStoreTestSuite struct { type PolicyStoreTestSuite struct {
suite.Suite suite.Suite
store *policyStore
namespace string namespace string
pool *redis.Pool pool *redis.Pool
cancel context.CancelFunc
} }
// TestPolicyStoreTestSuite is entry of go test // TestPolicyStoreTestSuite is entry of go test
@ -44,37 +42,20 @@ func TestPolicyStoreTestSuite(t *testing.T) {
func (suite *PolicyStoreTestSuite) SetupSuite() { func (suite *PolicyStoreTestSuite) SetupSuite() {
suite.namespace = tests.GiveMeTestNamespace() suite.namespace = tests.GiveMeTestNamespace()
suite.pool = tests.GiveMeRedisPool() suite.pool = tests.GiveMeRedisPool()
ctx, cancel := context.WithCancel(context.Background())
suite.cancel = cancel = newPolicyStore(ctx, suite.namespace, suite.pool)
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
func (suite *PolicyStoreTestSuite) TearDownSuite() { func (suite *PolicyStoreTestSuite) TearDownSuite() {
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}() }()
_ = tests.ClearAll(suite.namespace, conn) if err := tests.ClearAll(suite.namespace, conn); err != nil {
} suite.NoError(err, "clear redis namespace")
// TestStore tests policy store serve
func (suite *PolicyStoreTestSuite) TestServe() {
var err error
defer func() { <- true
assert.Nil(suite.T(), err, "serve exit: nil error expected but got %s", err)
go func() {
err =
<-time.After(1 * time.Second)
} }
// TestLoad tests load policy from backend // TestLoad tests load policy from backend
@ -91,47 +72,17 @@ func (suite *PolicyStoreTestSuite) TestLoad() {
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}() }()
_, err = conn.Do("ZADD", key, time.Now().Unix(), rawData) _, err = conn.Do("ZADD", key, time.Now().Unix(), rawData)
assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err) assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err)
err = ps, err := Load(suite.namespace, conn)
assert.Nil(suite.T(), err, "load: nil error expected but got %s", err) suite.NoError(err, "load: nil error expected but got %s", err)
suite.Equal(1, len(ps), "count of loaded policies")
p1 := &Policy{
ID: "fake_policy_1",
JobName: job.SampleJob,
CronSpec: "5 * * * * *",
m := &message{
Event: changeEventSchedule,
Data: p1,
err =
assert.Nil(suite.T(), err, "sync schedule: nil error expected but got %s", err)
count := 0 string, p *Policy) bool {
return true
assert.Equal(suite.T(), 2, count, "expected 2 policies but got %d", count)
m1 := &message{
Event: changeEventUnSchedule,
Data: p1,
err =
assert.Nil(suite.T(), err, "sync unschedule: nil error expected but got %s", err)
count = 0 string, p *Policy) bool {
return true
assert.Equal(suite.T(), 1, count, "expected 1 policies but got %d", count)
} }
// TestPolicy tests policy itself // TestPolicy tests policy itself

View File

@ -18,15 +18,7 @@ package period
type Scheduler interface { type Scheduler interface {
// Start to serve periodic job scheduling process // Start to serve periodic job scheduling process
// //
// Returns: Start()
// error if any problems happened
Start() error
// Stop the working periodic job scheduling process
// Returns;
// error if any problems happened
Stop() error
// Schedule the specified cron job policy. // Schedule the specified cron job policy.
// //

View File

@ -17,7 +17,8 @@ package runner
import ( import (
"fmt" "fmt"
"runtime" "runtime"
"" ""
"" ""
@ -51,21 +52,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
runningJob job.Interface runningJob job.Interface
execContext job.Context execContext job.Context
tracker job.Tracker tracker job.Tracker
markStopped = bp(false)
) )
// Defer to log the exit result
defer func() {
if !*markStopped {
if err == nil {
logger.Infof("|^_^| Job '%s:%s' exit with success", j.Name, j.ID)
} else {
// log error
logger.Errorf("|@_@| Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
// Track the running job now // Track the running job now
jID := j.ID jID := j.ID
@ -75,60 +63,33 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
if tracker, err = rj.ctl.Track(jID); err != nil { if tracker, err = rj.ctl.Track(jID); err != nil {
// log error
logger.Errorf("Job '%s:%s' exit with error: failed to get job tracker: %s", j.Name, j.ID, err)
// Pay attentions here, if the job stats is lost (NOTFOUND error returned),
// directly return without retry again as we have no way to restore the stats again.
if errs.IsObjectNotFoundError(err) {
j.Fails = 10000000000 // never retry
// ELSE:
// As tracker creation failed, there is no way to mark the job status change. // As tracker creation failed, there is no way to mark the job status change.
// Also a non nil error return consumes a fail. If all retries are failed here, // Also a non nil error return consumes a fail. If all retries are failed here,
// it will cause the job to be zombie one (pending forever). // it will cause the job to be zombie one (pending forever).
// Here we will avoid the job to consume a fail and let it retry again and again. // Those zombie ones will be reaped by the reaper later.
// However, to avoid a forever retry, we will check the FailedAt timestamp.
now := time.Now().Unix()
if j.FailedAt == 0 || now-j.FailedAt < 2*24*3600 {
return return
} }
// Do operation based on the job status
jStatus := job.Status(tracker.Job().Info.Status)
switch jStatus {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
case job.StoppedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
markStopped = bp(true)
return nil
case job.ErrorStatus:
if j.FailedAt > 0 && j.Fails > 0 {
// Retry job
// Reset job info
if er := tracker.Reset(); er != nil {
// Log error and return the original error if existing
er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID))
if len(j.LastErr) > 0 {
return errors.New(j.LastErr)
return err
logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String())
// Defer to switch status // Defer to switch status
defer func() { defer func() {
// Switch job status based on the returned error. // Switch job status based on the returned error.
// The err happened here should not override the job run error, just log it. // The err happened here should not override the job run error, just log it.
if err != nil { if err != nil {
// log error
logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
if er := tracker.Fail(); er != nil { if er := tracker.Fail(); er != nil {
logger.Errorf("Mark job status to failure error: %s", err) logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er)
} }
return return
@ -136,19 +97,20 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Nil error might be returned by the stopped job. Check the latest status here. // Nil error might be returned by the stopped job. Check the latest status here.
// If refresh latest status failed, let the process to go on to void missing status updating. // If refresh latest status failed, let the process to go on to void missing status updating.
if latest, er := tracker.Status(); er == nil { if latest, er := tracker.Status(); er != nil {
logger.Errorf("Error occurred when getting the status of job %s:%s: %s", j.Name, j.ID, er)
} else {
if latest == job.StoppedStatus { if latest == job.StoppedStatus {
// Logged // Logged
logger.Infof("Job %s:%s is stopped", tracker.Job().Info.JobName, tracker.Job().Info.JobID) logger.Infof("Job %s:%s is stopped", j.Name, j.ID)
// Stopped job, no exit message printing.
markStopped = bp(true)
return return
} }
} }
// Mark job status to success. // Mark job status to success.
logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
if er := tracker.Succeed(); er != nil { if er := tracker.Succeed(); er != nil {
logger.Errorf("Mark job status to success error: %s", er) logger.Errorf("Error occurred when marking the status of job %s:%s to success: %s", j.Name, j.ID, er)
} }
}() }()
@ -163,6 +125,40 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
}() }()
// Do operation based on the job status
jStatus := job.Status(tracker.Job().Info.Status)
switch jStatus {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
case job.StoppedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
return nil
case job.RunningStatus, job.ErrorStatus:
// The failed jobs can be put into retry queue and the in progress jobs may be
// interrupted by a sudden service crash event, all those jobs can be rescheduled.
// Reset job info.
if err = tracker.Reset(); err != nil {
// Log error and return the original error if existing
err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID))
if len(j.LastErr) > 0 {
err = errors.Wrap(err, j.LastErr)
logger.Infof("Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
case job.SuccessStatus:
// do nothing
return nil
return errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
// Build job context // Build job context
if rj.context.JobContext == nil { if rj.context.JobContext == nil {
rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext) rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext)
@ -189,6 +185,11 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
// Run the job // Run the job
err = runningJob.Run(execContext, j.Args) err = runningJob.Run(execContext, j.Args)
// Add error context
if err != nil {
err = errors.Wrap(err, "run error")
// Handle retry // Handle retry
rj.retry(runningJob, j) rj.retry(runningJob, j)
// Handle periodic job execution // Handle periodic job execution

View File

@ -15,12 +15,13 @@ package runner
import ( import (
"context" "context"
"os" "os"
"sync" "sync"
"testing" "testing"
"time" "time"
"" ""
"" ""
@ -154,7 +155,7 @@ func (suite *RedisRunnerTestSuite) TestJobWrapperInvalidTracker() {
redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl) redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl)
err := redisJob.Run(j) err := redisJob.Run(j)
require.Error(suite.T(), err, "redis job: non nil error expected but got nil") require.Error(suite.T(), err, "redis job: non nil error expected but got nil")
assert.Equal(suite.T(), int64(2), j.Fails) assert.Equal(suite.T(), int64(10000000000), j.Fails)
} }
// TestJobWrapperPanic tests job runner panic // TestJobWrapperPanic tests job runner panic

View File

@ -117,7 +117,12 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
hookCallback := func(URL string, change *job.StatusChange) error { hookCallback := func(URL string, change *job.StatusChange) error {
msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status) msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status)
if !utils.IsEmptyStr(change.CheckIn) { if !utils.IsEmptyStr(change.CheckIn) {
msg = fmt.Sprintf("%s, check_in=%s", msg, change.CheckIn) // Ignore the real check in message to avoid too big message stream
cData := change.CheckIn
if len(cData) > 256 {
cData = fmt.Sprintf("<DATA BLOCK: %d bytes>", len(cData))
msg = fmt.Sprintf("%s, check_in=%s", msg, cData)
} }
evt := &hook.Event{ evt := &hook.Event{
@ -153,7 +158,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Start agent // Start agent
// Non blocking call // Non blocking call
if err = hookAgent.Serve(); err != nil { if err = hookAgent.Serve(); err != nil {
return errors.Errorf("start hook agent error: %s", err) return errors.Errorf("start hook agent error: %s", err)
} }
@ -186,6 +190,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
terminated = true terminated = true
return return
case err = <-errChan: case err = <-errChan:
logger.Errorf("Received error from error chan: %s", err)
return return
} }
}(rootContext.ErrorChan) }(rootContext.ErrorChan)

View File

@ -54,6 +54,7 @@ type basicWorker struct {
context *env.Context context *env.Context
scheduler period.Scheduler scheduler period.Scheduler
ctl lcm.Controller ctl lcm.Controller
reaper *reaper
// key is name of known job // key is name of known job
// value is the type of known job // value is the type of known job
@ -92,6 +93,13 @@ func NewWorker(ctx *env.Context, namespace string, workerCount uint, redisPool *
ctl: ctl, ctl: ctl,
context: ctx, context: ctx,
knownJobs: new(sync.Map), knownJobs: new(sync.Map),
reaper: &reaper{
context: ctx.SystemContext,
namespace: namespace,
pool: redisPool,
lcmCtl: ctl,
jobTypes: make([]string, 0), // Append data later (at the start step)
} }
} }
@ -121,16 +129,7 @@ func (w *basicWorker) Start() error {
} }
// Start the periodic scheduler // Start the periodic scheduler
w.context.WG.Add(1) w.scheduler.Start()
go func() {
defer func() {
// Blocking call
if err := w.scheduler.Start(); err != nil {
w.context.ErrorChan <- err
// Listen to the system signal // Listen to the system signal
w.context.WG.Add(1) w.context.WG.Add(1)
@ -139,10 +138,8 @@ func (w *basicWorker) Start() error {
w.context.WG.Done() w.context.WG.Done()
logger.Infof("Basic worker is stopped") logger.Infof("Basic worker is stopped")
}() }()
<-w.context.SystemContext.Done() <-w.context.SystemContext.Done()
if err := w.scheduler.Stop(); err != nil {
logger.Errorf("stop scheduler error: %s", err)
w.pool.Stop() w.pool.Stop()
}() }()
@ -151,7 +148,15 @@ func (w *basicWorker) Start() error {
w.pool.Middleware((*workerContext).logJob) w.pool.Middleware((*workerContext).logJob)
// Non blocking call // Non blocking call
w.pool.Start() w.pool.Start()
logger.Infof("Redis worker is started") logger.Infof("Basic worker is started")
// Start the reaper
w.knownJobs.Range(func(k interface{}, v interface{}) bool {
w.reaper.jobTypes = append(w.reaper.jobTypes, k.(string))
return true
return nil return nil
} }

View File

@ -0,0 +1,363 @@
// Copyright Project Harbor Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package cworker
import (
const (
maxUpdateDuration = 24 * time.Hour
reapLoopInterval = 1 * time.Hour
initialReapLoopInterval = 5 * time.Minute
// reaper is designed to reap the outdated job stats and web hook
type reaper struct {
context context.Context
namespace string
pool *redis.Pool
lcmCtl lcm.Controller
jobTypes []string
// start the reap process
// Non blocking call
func (r *reaper) start() {
// Start the interval job stats sync loop
go func() {
defer logger.Info("Reaper is stopped")
tm := time.NewTimer(initialReapLoopInterval)
defer tm.Stop()
logger.Info("Reaper is started")
for {
select {
case <-tm.C:
if err := r.syncOutdatedStats(); err != nil {
// Just log
case <-r.context.Done():
return // Terminated
// Start re-enqueue in-progress jobs process.
// Only run once at the start point.
go func() {
// Wait for a short while and then start
<-time.After(5 * time.Second)
if err := r.reEnqueueInProgressJobs(); err != nil {
// reEnqueueInProgressJobs is an enhancement for reaper process of upstream project.
// Mainly fix the issue of failing to re-enqueue the jobs in the dead worker pool.
// This process only needs to be executed once when worker pool is starting.
func (r *reaper) reEnqueueInProgressJobs() error {
// Debug
logger.Debugf("Start: Reap in-progress jobs from the dead pools")
defer logger.Debugf("End: Reap in-progress jobs")
currentPools, err := r.getCurrentWorkerPools()
if err != nil {
return errors.Wrap(err, "re-enqueue in progress jobs")
h := func(k string, v int64) (err error) {
if v <= 0 {
// Do nothing
return nil
// If the worker pool is in the current pool list, ignore it as the default reap
// process will cover the re-enqueuing work.
if currentPools[k] {
// Do nothing
return nil
// Re-enqueue jobs
if err := r.requeueInProgressJobs(k, r.jobTypes); err != nil {
return errors.Wrap(err, "in-progress jobs reap handler")
return nil
for _, jt := range r.jobTypes {
lk := rds.KeyJobLockInfo(r.namespace, jt)
if err := r.scanLocks(lk, h); err != nil {
// Log error and continue
logger.Errorf("Re-enqueue in progress jobs error: %v", err)
return nil
// syncOutdatedStats ensures the job status is correctly updated and
// the related status change hook events are successfully fired.
func (r *reaper) syncOutdatedStats() error {
// Debug
logger.Debugf("Start: reap outdated job stats")
defer logger.Debugf("End: reap outdated job stats")
// Loop all the in progress jobs to check if they're really in progress or
// status is hung.
h := func(k string, v int64) (err error) {
defer func() {
if errs.IsObjectNotFoundError(err) {
// As the job stats is lost and we don't have chance to restore it, then directly discard it.
logger.Errorf("Sync outdated stats error: %s", err.Error())
// Un-track the in-progress record
err = r.unTrackInProgress(k)
if err != nil {
err = errors.Wrap(err, "sync outdated stats handler")
var t job.Tracker
t, err = r.lcmCtl.Track(k)
if err != nil {
// Compare and check if the status and the ACKed status are consistent
diff := compare(t.Job().Info)
if diff == 0 {
// Status and ACKed status are consistent
if job.Status(t.Job().Info.Status).Final() {
// Final status
// The inprogress track record is not valid as everything is done and consistent.
// It should not happen. However, if it is really happened, we can fix it here.
if err = r.unTrackInProgress(t.Job().Info.JobID); err != nil {
} else {
// Ongoing, check the update timestamp to make sure it is not hung
if time.Unix(t.Job().Info.UpdateTime, 0).Add(maxUpdateDuration).Before(time.Now()) {
// Status hung
// Mark job status to error state
if err = t.Fail(); err != nil {
// Exit
// Exit as it is still a valid ongoing job
} else if diff > 0 {
// The hook event of current job status is not ACKed
// Resend hook event by set the status again
if err = t.FireHook(); err != nil {
// Success and exit
} else {
// Current status is outdated, update it with ACKed status.
if err = t.UpdateStatusWithRetry(job.Status(t.Job().Info.HookAck.Status)); err != nil {
// Success and exit
return nil
if err := r.scanLocks(rds.KeyJobTrackInProgress(r.namespace), h); err != nil {
return errors.Wrap(err, "reaper")
return nil
// scanLocks gets the lock info from the specified key by redis scan way
func (r *reaper) scanLocks(key string, handler func(k string, v int64) error) error {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %v", err)
var cursor int64
for {
reply, err := redis.Values(conn.Do("HSCAN", key, cursor, "MATCH", "*", "COUNT", 100))
if err != nil {
return errors.Wrap(err, "scan locks")
if len(reply) != 2 {
return errors.New("expect 2 elements returned")
// Get next cursor
cursor, err = strconv.ParseInt(string(reply[0].([]uint8)), 10, 16)
if err != nil {
return errors.Wrap(err, "scan locks")
if values, ok := reply[1].([]interface{}); ok {
for i := 0; i < len(values); i += 2 {
k := string(values[i].([]uint8))
lc, err := strconv.ParseInt(string(values[i+1].([]uint8)), 10, 16)
if err != nil {
// Ignore and continue
logger.Errorf("Malformed lock object for %s: %v", k, err)
// Call handler to handle the data
if err := handler(k, lc); err != nil {
// Log and ignore the error
logger.Errorf("Failed to call reap handler: %v", err)
// Check if we have reached the end
if cursor == 0 {
return nil
// unTrackInProgress del the key in the progress track queue
func (r *reaper) unTrackInProgress(jobID string) error {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s: %v", "untrack in progress job", err)
_, err := conn.Do("HDEL", rds.KeyJobTrackInProgress(r.namespace), jobID)
if err != nil {
return errors.Wrap(err, "untrack in progress record")
return nil
// getCurrentWorkerPools returns the IDs of the current worker pools
func (r *reaper) getCurrentWorkerPools() (map[string]bool, error) {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s : %v", "get current worker pools", err)
// Get the current worker pools
workerPoolIDs, err := redis.Strings(conn.Do("SMEMBERS", rds.KeyWorkerPools(r.namespace)))
if err != nil {
return nil, errors.Wrap(err, "get current workpools")
m := make(map[string]bool)
for _, id := range workerPoolIDs {
m[id] = true
return m, nil
func (r *reaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
numKeys := len(jobTypes)
redisRequeueScript := rds.RedisLuaReenqueueScript(numKeys)
var scriptArgs = make([]interface{}, 0, numKeys+1)
for _, jobType := range jobTypes {
// pops from in progress, push into job queue and decrement the queue lock
scriptArgs = append(
rds.KeyInProgressQueue(r.namespace, jobType, poolID),
rds.KeyJobs(r.namespace, jobType),
rds.KeyJobLock(r.namespace, jobType),
rds.KeyJobLockInfo(r.namespace, jobType),
) // KEYS[1-4 * N]
scriptArgs = append(scriptArgs, poolID) // ARGV[1]
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s : %s", "re enqueue in-progress jobs", err)
// Keep moving jobs until all queues are empty
for {
values, err := redis.Values(redisRequeueScript.Do(conn, scriptArgs...))
if err == redis.ErrNil {
return nil
} else if err != nil {
return err
if len(values) != 3 {
return fmt.Errorf("need 3 elements back")
// compare the status and the status in the ack
// 0: status == ack.status
// >0: status > ack.status
// <0: status < ack.status
// compare based on:
// revision:status_code:check_in
func compare(j *job.StatsInfo) int {
// No ack existing
if j.HookAck == nil {
return 1
// Compare revision
rev := j.Revision - j.HookAck.Revision
if rev != 0 {
return (int)(rev)
// Revision is same, then compare the status
st := job.Status(j.Status).Compare(job.Status(j.HookAck.Status))
if st != 0 {
return st
// Revision and status are same, then compare the checkin
return (int)(j.CheckInAt - j.HookAck.CheckInAt)

View File

@ -0,0 +1,243 @@
// Copyright Project Harbor Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package cworker
import (
// ReaperTestSuite is used to test reaper
type ReaperTestSuite struct {
namespace string
pool *redis.Pool
r *reaper
ctl *mockLcmCtl
jid string
// TestReaper is the entry func of the ReaperTestSuite
func TestReaper(t *testing.T) {
suite.Run(t, &ReaperTestSuite{})
// SetupSuite prepares the test suite environment
func (suite *ReaperTestSuite) SetupSuite() {
suite.namespace = tests.GiveMeTestNamespace()
suite.pool = tests.GiveMeRedisPool()
ctx := context.TODO()
suite.ctl = &mockLcmCtl{}
suite.r = &reaper{
context: ctx,
namespace: suite.namespace,
pool: suite.pool,
jobTypes: []string{job.SampleJob},
lcmCtl: suite.ctl,
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
// Mock data in the redis DB
cwp := utils.MakeIdentifier()
wpk := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "worker_pools")
_ = conn.Send("SADD", wpk, cwp)
_ = conn.Send("HSET", fmt.Sprintf("%s:%s", wpk, cwp), "heartbeat_at", time.Now().Unix())
err := conn.Flush()
suite.NoError(err, "mock current pool error")
// Mock lock info of job DEMO
lk := rds.KeyJobLock(suite.namespace, job.SampleJob)
_, err = conn.Do("INCR", lk)
suite.NoError(err, "set lock data error")
wp := utils.MakeIdentifier()
lik := rds.KeyJobLockInfo(suite.namespace, job.SampleJob)
_, err = conn.Do("HINCRBY", lik, wp, 1)
suite.NoError(err, "set lock_info data error")
// Mock in-progress job
ipk := rds.KeyInProgressQueue(suite.namespace, job.SampleJob, wp)
j, err := mockJobData()
suite.NoError(err, "mock job")
_, err = conn.Do("LPUSH", ipk, j)
suite.NoError(err, "push mock job to queue")
// Mock job stats
suite.jid = utils.MakeIdentifier()
err = mockJobStats(conn, suite.namespace, suite.jid)
suite.NoError(err, "mock job stats")
// Mock in-progress job track
tk := rds.KeyJobTrackInProgress(suite.namespace)
_, err = conn.Do("HSET", tk, suite.jid, 1)
suite.NoError(err, "mock in-progress track")
// TearDownSuite clears down the test suite environment
func (suite *ReaperTestSuite) TearDownSuite() {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
_ = tests.ClearAll(suite.namespace, conn)
func (suite *ReaperTestSuite) TestRequeueInProgressJobs() {
err := suite.r.reEnqueueInProgressJobs()
suite.NoError(err, "requeue in-progress jobs")
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
v, err := redis.Int(conn.Do("GET", rds.KeyJobLock(suite.namespace, job.SampleJob)))
suite.NoError(err, "get job lock info")
suite.Equal(0, v, "lock should be 0")
func (suite *ReaperTestSuite) TestSyncOutdatedStats() {
// Use real track to test
mt := job.NewBasicTrackerWithID(
func(hookURL string, change *job.StatusChange) error {
return nil
err := mt.Load()
suite.NoError(err, "track job stats")
suite.ctl.On("Track", suite.jid).Return(mt, nil)
err = suite.r.syncOutdatedStats()
suite.NoError(err, "sync outdated stats")
// Check result
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
status, err := redis.String(conn.Do("HGET", rds.KeyJobStats(suite.namespace, suite.jid), "status"))
suite.NoError(err, "get status")
suite.Equal(job.SuccessStatus.String(), status, "check status")
func mockJobData() (string, error) {
j := make(map[string]interface{})
j["name"] = job.SampleJob
j["id"] = utils.MakeIdentifier()
j["t"] = time.Now().Unix()
args := make(map[string]interface{})
j["args"] = args
args["image"] = "test suite"
b, err := json.Marshal(&j)
if err != nil {
return "", nil
return string(b), nil
func mockJobStats(conn redis.Conn, ns string, jid string) error {
rev := time.Now().Unix()
sk := rds.KeyJobStats(ns, jid)
ack := &job.ACK{
Revision: rev,
Status: job.SuccessStatus.String(),
b, err := json.Marshal(ack)
if err != nil {
return err
args := []interface{}{
"id", jid,
"status", job.RunningStatus.String(),
"name", job.SampleJob,
"kind", job.KindGeneric,
"unique", 0,
"ref_link", fmt.Sprintf("/api/v1/jobs/%s", jid),
"enqueue_time", time.Now().Unix(),
"update_time", time.Now().Unix(),
"revision", rev,
"ack", string(b),
_, err = conn.Do("HMSET", args...)
return err
type mockLcmCtl struct {
func (m *mockLcmCtl) Serve() error {
return nil
// New tracker from the new provided stats
func (m *mockLcmCtl) New(stats *job.Stats) (job.Tracker, error) {
args := m.Called(stats)
if args.Get(0) != nil {
return args.Get(0).(job.Tracker), nil
return nil, args.Error(1)
// Track the life cycle of the specified existing job
func (m *mockLcmCtl) Track(jobID string) (job.Tracker, error) {
args := m.Called(jobID)
if args.Get(0) != nil {
return args.Get(0).(job.Tracker), nil
return nil, args.Error(1)