Merge pull request #10429 from steven-zou/fix/job_hung_issue

fix[jobservice]:job status is hung after restart
This commit is contained in:
Steven Zou 2020-01-20 16:46:22 +08:00 committed by GitHub
commit e8a617e0bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1694 additions and 1033 deletions

View File

@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
"container/list"
"sync"
)
// SyncList is a sync list based on the container/list
type SyncList struct {
// For synchronization
lock *sync.RWMutex
// Use interface slice as the backend data struct
l *list.List
}
// New a sync list
func New() *SyncList {
return &SyncList{
lock: &sync.RWMutex{},
l: list.New(),
}
}
// Iterate the list
func (l *SyncList) Iterate(f func(ele interface{}) bool) {
l.lock.RLock()
defer l.lock.RUnlock()
// Get the front pointer
for e := l.l.Front(); e != nil; {
// Keep the next one
next := e.Next()
if f(e.Value) {
l.l.Remove(e)
}
e = next
}
}
// Push the element to the back of the list
func (l *SyncList) Push(ele interface{}) {
if ele != nil {
l.lock.Lock()
defer l.lock.Unlock()
l.l.PushBack(ele)
}
}

View File

@ -0,0 +1,57 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package list
import (
"strings"
"testing"
"github.com/stretchr/testify/suite"
)
type ListSuite struct {
suite.Suite
l *SyncList
}
func TestListSuite(t *testing.T) {
suite.Run(t, &ListSuite{})
}
func (suite *ListSuite) SetupSuite() {
suite.l = New()
suite.l.Push("a0")
suite.l.Push("a1")
suite.l.Push("b0")
suite.l.Push("a2")
suite.Equal(4, suite.l.l.Len())
}
func (suite *ListSuite) TestIterate() {
suite.l.Iterate(func(ele interface{}) bool {
if s, ok := ele.(string); ok {
if strings.HasPrefix(s, "b") {
return true
}
}
return false
})
suite.Equal(3, suite.l.l.Len())
}

View File

@ -86,3 +86,33 @@ func KeyHookEventRetryQueue(namespace string) string {
func KeyStatusUpdateRetryQueue(namespace string) string { func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events") return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
} }
// KeyJobTrackInProgress returns the key of in progress jobs tracking queue
func KeyJobTrackInProgress(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "job_track:inprogress")
}
// KeyJobs returns the key of the specified job queue
func KeyJobs(namespace, jobType string) string {
return fmt.Sprintf("%sjobs:%s", KeyNamespacePrefix(namespace), jobType)
}
// KeyJobLock returns the key of lock for the specified job type.
func KeyJobLock(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock", KeyJobs(namespace, jobType))
}
// KeyJobLockInfo returns the key of lock_info for the specified job type.
func KeyJobLockInfo(namespace string, jobType string) string {
return fmt.Sprintf("%s:lock_info", KeyJobs(namespace, jobType))
}
// KeyInProgressQueue returns the key of the in progress queue for the specified job type.
func KeyInProgressQueue(namespace string, jobType string, workerPoolID string) string {
return fmt.Sprintf("%s:%s:inprogress", KeyJobs(namespace, jobType), workerPoolID)
}
// KeyWorkerPools returns the key of the worker pool
func KeyWorkerPools(namespace string) string {
return KeyNamespacePrefix(namespace) + "worker_pools"
}

View File

@ -0,0 +1,273 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rds
import (
"fmt"
"github.com/gomodule/redigo/redis"
)
const (
requeueKeysPerJob = 4
)
// luaFuncStCodeText is common lua script function
var luaFuncStCodeText = `
-- for easily compare status
local stMap = { ['Pending'] = 0, ['Scheduled'] = 1, ['Running'] = 2, ['Success'] = 3, ['Stopped'] = 3, ['Error'] = 3 }
local function stCode(status)
return stMap[status]
end
`
// luaFuncCompareText is common lua script function
var luaFuncCompareText = `
local function compare(status, revision, checkInT)
local sCode = stCode(status)
local aCode = stCode(ARGV[1])
local aRev = tonumber(ARGV[2])
local aCheckInT = tonumber(ARGV[3])
if revision < aRev or
( revision == aRev and sCode < aCode ) or
( revision == aRev and sCode == aCode and (not checkInT or checkInT < aCheckInT))
then
return 'ok'
end
return 'no'
end
`
// Script used to set the status of the job
//
// KEY[1]: key of job stats
// KEY[2]: key of inprogress track
// ARGV[1]: status text
// ARGV[2]: stats revision
// ARGV[3]: update timestamp
// ARGV[4]: job ID
var setStatusScriptText = fmt.Sprintf(`
%s
local res, st, rev, aCode, aRev
res = redis.call('hmget', KEYS[1], 'status', 'revision')
if res then
st = res[1]
aCode = stCode(ARGV[1])
rev = tonumber(res[2])
aRev = tonumber(ARGV[2])
-- set same status repeatedly is allowed
if rev < aRev or ( rev == aRev and (stCode(st) < aCode or st == ARGV[1])) then
redis.call('hmset', KEYS[1], 'status', ARGV[1], 'update_time', ARGV[3])
-- update inprogress track if necessary
if aCode == 3 then
-- final status
local c = redis.call('hincrby', KEYS[2], ARGV[4], -1)
-- lock count is 0, del it
if c <= 0 then
redis.call('hdel', KEYS[2], ARGV[4])
end
if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then
-- expire the job stats with shorter interval
redis.call('expire', KEYS[1], 86400)
end
end
return 'ok'
end
end
return st
`, luaFuncStCodeText)
// SetStatusScript is lua script for setting job status atomically
var SetStatusScript = redis.NewScript(2, setStatusScriptText)
// Used to check if the status info provided is still validate
//
// KEY[1]: key of job stats
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
var isStatusMatchScriptText = fmt.Sprintf(`
%s
%s
local res, st, rev, checkInAt, ack
res = redis.call('hmget', KEYS[1], 'status', 'revision', 'check_in_at', 'ack')
if res then
st = res[1]
rev = tonumber(res[2])
checkInAt = tonumber(res[3])
ack = res[4]
local reply = compare(st, rev, checkInAt)
if reply == 'ok' then
if not ack then
return 'ok'
end
-- ack exists, compare with ack
local a = cjson.decode(ack)
st = a['status']
rev = a['revision']
checkInAt = a['check_in_at']
local reply2 = compare(st, rev, checkInAt)
if reply2 == 'ok' then
return 'ok'
end
end
end
return 'no'
`, luaFuncStCodeText, luaFuncCompareText)
// CheckStatusMatchScript is lua script for checking if the provided status is still matching
// the backend status.
var CheckStatusMatchScript = redis.NewScript(1, isStatusMatchScriptText)
// Used to set the hook ACK
//
// KEY[1]: key of job stats
// KEY[2]: key of inprogress track
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
// ARGV[4]: job ID
var hookAckScriptText = fmt.Sprintf(`
%s
%s
local function canSetAck(jk, nrev)
local res = redis.call('hmget', jk, 'revision', 'ack')
if res then
local rev = tonumber(res[1])
local ackv = res[2]
if ackv then
-- ack existing
local ack = cjson.decode(ackv)
local cmp = compare(ack['status'], ack['revision'], ack['check_in_at'])
if cmp == 'ok' then
return 'ok'
end
else
-- no ack yet
if rev <= nrev then
return 'ok'
end
end
end
return nil
end
if canSetAck(KEYS[1], tonumber(ARGV[2])) ~= 'ok' then
return 'none'
end
-- can-set-ack check is ok
-- set new ack
local newAck = {['status'] = ARGV[1], ['revision'] = tonumber(ARGV[2]), ['check_in_at'] = tonumber(ARGV[3])}
local ackJson = cjson.encode(newAck)
redis.call('hset', KEYS[1], 'ack', ackJson)
-- update the inprogress track
if stCode(ARGV[1]) == 3 then
-- final status
local c = redis.call('hincrby', KEYS[2], ARGV[4], -1)
-- lock count is 0, del it
if c <= 0 then
redis.call('hdel', KEYS[2], ARGV[4])
end
end
return 'ok'
`, luaFuncStCodeText, luaFuncCompareText)
// HookAckScript is defined to set the hook event ACK in the job stats map
var HookAckScript = redis.NewScript(2, hookAckScriptText)
// Used to reset job status
//
// KEYS[1]: key of job stats
// KEYS[2]: key of inprogress job track
// ARGV[1]: job ID
// ARGV[2]: start status
// ARGV[3]: revision of job stats
var statusResetScriptText = `
local now = tonumber(ARGV[3])
-- reset status and revision
redis.call('hmset', KEYS[1], 'status', ARGV[2], 'revision', now, 'update_time', now)
redis.call('hdel', KEYS[1], 'ack', 'check_in', 'check_in_at')
-- reset inprogress track
redis.call('hset', KEYS[2], ARGV[1], 2)
`
// StatusResetScript is lua script to reset the job stats
var StatusResetScript = redis.NewScript(2, statusResetScriptText)
// Copy from upstream worker framework
// Used by the reaper to re-enqueue jobs that were in progress
//
// KEYS[1] = the 1st job's in progress queue
// KEYS[2] = the 1st job's job queue
// KEYS[3] = the 2nd job's in progress queue
// KEYS[4] = the 2nd job's job queue
// ...
// KEYS[N] = the last job's in progress queue
// KEYS[N+1] = the last job's job queue
// ARGV[1] = workerPoolID for job queue
var redisLuaReenqueueJob = fmt.Sprintf(`
local function releaseLock(lockKey, lockInfoKey, workerPoolID)
redis.call('decr', lockKey)
redis.call('hincrby', lockInfoKey, workerPoolID, -1)
end
local keylen = #KEYS
local res, jobQueue, inProgQueue, workerPoolID, lockKey, lockInfoKey
workerPoolID = ARGV[1]
for i=1,keylen,%d do
inProgQueue = KEYS[i]
jobQueue = KEYS[i+1]
lockKey = KEYS[i+2]
lockInfoKey = KEYS[i+3]
res = redis.call('rpoplpush', inProgQueue, jobQueue)
if res then
releaseLock(lockKey, lockInfoKey, workerPoolID)
return {res, inProgQueue, jobQueue}
end
end
return nil`, requeueKeysPerJob)
// RedisLuaReenqueueScript returns redis script of redisLuaReenqueueJob
func RedisLuaReenqueueScript(jobTypesCount int) *redis.Script {
return redis.NewScript(jobTypesCount*requeueKeysPerJob, redisLuaReenqueueJob)
}

View File

@ -2,10 +2,11 @@ package rds
import ( import (
"fmt" "fmt"
"github.com/garyburd/redigo/redis"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/pkg/errors"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
) )
// ErrNoElements is a pre defined error to describe the case that no elements got // ErrNoElements is a pre defined error to describe the case that no elements got

View File

@ -17,31 +17,22 @@ package hook
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"math/rand"
"net/url" "net/url"
"time"
"github.com/pkg/errors"
"github.com/goharbor/harbor/src/jobservice/job"
"sync" "sync"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
) )
const ( const (
// Influenced by the worker number setting
maxEventChanBuffer = 1024
// Max concurrent client handlers
maxHandlers = 5
// The max time for expiring the retrying events // The max time for expiring the retrying events
// 180 days // 1 day
maxEventExpireTime = 3600 * 24 * 180 maxEventExpireTime = 3600 * 24
// Waiting a short while if any errors occurred // Waiting a short while if any errors occurred
shortLoopInterval = 5 * time.Second shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found // Waiting for long while if no retrying elements found
@ -52,10 +43,9 @@ const (
type Agent interface { type Agent interface {
// Trigger hooks // Trigger hooks
Trigger(evt *Event) error Trigger(evt *Event) error
// Serves events now
// Serves retry loop now
Serve() error Serve() error
// Attach a job life cycle controller
Attach(ctl lcm.Controller)
} }
// Event contains the hook URL and the data // Event contains the hook URL and the data
@ -95,47 +85,53 @@ type basicAgent struct {
context context.Context context context.Context
namespace string namespace string
client Client client Client
ctl lcm.Controller
events chan *Event
tokens chan bool
redisPool *redis.Pool redisPool *redis.Pool
wg *sync.WaitGroup wg *sync.WaitGroup
} }
// NewAgent is constructor of basic agent // NewAgent is constructor of basic agent
func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent { func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent {
tks := make(chan bool, maxHandlers)
// Put tokens
for i := 0; i < maxHandlers; i++ {
tks <- true
}
return &basicAgent{ return &basicAgent{
context: ctx.SystemContext, context: ctx.SystemContext,
namespace: ns, namespace: ns,
client: NewClient(ctx.SystemContext), client: NewClient(ctx.SystemContext),
events: make(chan *Event, maxEventChanBuffer),
tokens: tks,
redisPool: redisPool, redisPool: redisPool,
wg: ctx.WG, wg: ctx.WG,
} }
} }
// Attach a job life cycle controller
func (ba *basicAgent) Attach(ctl lcm.Controller) {
ba.ctl = ctl
}
// Trigger implements the same method of interface @Agent // Trigger implements the same method of interface @Agent
func (ba *basicAgent) Trigger(evt *Event) error { func (ba *basicAgent) Trigger(evt *Event) error {
if evt == nil { if evt == nil {
return errors.New("nil event") return errors.New("nil web hook event")
} }
if err := evt.Validate(); err != nil { if err := evt.Validate(); err != nil {
return err return errors.Wrap(err, "trigger error")
} }
ba.events <- evt // Treat hook event is success if it is successfully sent or cached in the retry queue.
if err := ba.client.SendEvent(evt); err != nil {
// Push event to the retry queue
if er := ba.pushForRetry(evt); er != nil {
// Failed to push to the hook event retry queue, return error with all context
return errors.Wrap(er, err.Error())
}
logger.Warningf("Send hook event '%s' to '%s' failed with error: %s; push hook event to the queue for retrying later", evt.Message, evt.URL, err)
// Treat as successful hook event as the event has been put into the retry queue for future resending.
return nil
}
// Mark event hook ACK including "revision", "status" and "check_in_at" in the job stats to indicate
// the related hook event has been successfully fired.
// The ACK can be used by the reaper to justify if the hook event should be resent again.
// The failure of persisting this ACK may cause duplicated hook event resending issue, which
// can be ignored.
if err := ba.ack(evt); err != nil {
// Just log error
logger.Error(errors.Wrap(err, "trigger"))
}
return nil return nil
} }
@ -144,64 +140,14 @@ func (ba *basicAgent) Trigger(evt *Event) error {
// Termination depends on the system context // Termination depends on the system context
// Blocking call // Blocking call
func (ba *basicAgent) Serve() error { func (ba *basicAgent) Serve() error {
if ba.ctl == nil {
return errors.New("nil life cycle controller of hook agent")
}
ba.wg.Add(1) ba.wg.Add(1)
go ba.loopRetry() go ba.loopRetry()
logger.Info("Hook event retrying loop is started") logger.Info("Hook event retrying loop is started")
ba.wg.Add(1)
go ba.serve()
logger.Info("Basic hook agent is started")
return nil return nil
} }
func (ba *basicAgent) serve() {
defer func() {
logger.Info("Basic hook agent is stopped")
ba.wg.Done()
}()
for {
select {
case evt := <-ba.events:
// if exceed, wait here
// avoid too many request connections at the same time
<-ba.tokens
go func(evt *Event) {
defer func() {
ba.tokens <- true // return token
}()
if err := ba.client.SendEvent(evt); err != nil {
logger.Errorf("Send hook event '%s' to '%s' failed with error: %s; push to the queue for retrying later", evt.Message, evt.URL, err)
// Push event to the retry queue
if err := ba.pushForRetry(evt); err != nil {
// Failed to push to the retry queue, let's directly push it
// to the event channel of this node with reasonable backoff time.
logger.Errorf("Failed to push hook event to the retry queue: %s", err)
// Put to the event chan after waiting for a reasonable while,
// waiting is important, it can avoid sending large scale failure expecting
// requests in a short while.
// As 'pushForRetry' has checked the timestamp and expired event
// will be directly discarded and nil error is returned, no need to
// check it again here.
<-time.After(time.Duration(rand.Int31n(55)+5) * time.Second)
ba.events <- evt
}
}
}(evt)
case <-ba.context.Done():
return
}
}
}
func (ba *basicAgent) pushForRetry(evt *Event) error { func (ba *basicAgent) pushForRetry(evt *Event) error {
if evt == nil { if evt == nil {
// do nothing // do nothing
@ -248,11 +194,7 @@ func (ba *basicAgent) loopRetry() {
ba.wg.Done() ba.wg.Done()
}() }()
token := make(chan bool, 1)
token <- true
for { for {
<-token
if err := ba.reSend(); err != nil { if err := ba.reSend(); err != nil {
waitInterval := shortLoopInterval waitInterval := shortLoopInterval
if err == rds.ErrNoElements { if err == rds.ErrNoElements {
@ -270,44 +212,47 @@ func (ba *basicAgent) loopRetry() {
return return
} }
} }
// Put token back
token <- true
} }
} }
func (ba *basicAgent) reSend() error { func (ba *basicAgent) reSend() error {
evt, err := ba.popMinOne()
if err != nil {
return err
}
jobID, status, err := extractJobID(evt.Data)
if err != nil {
return err
}
t, err := ba.ctl.Track(jobID)
if err != nil {
return err
}
diff := status.Compare(job.Status(t.Job().Info.Status))
if diff > 0 ||
(diff == 0 && t.Job().Info.CheckIn != evt.Data.CheckIn) {
ba.events <- evt
return nil
}
return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status)
}
func (ba *basicAgent) popMinOne() (*Event, error) {
conn := ba.redisPool.Get() conn := ba.redisPool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
logger.Error(errors.Wrap(err, "resend"))
}
}() }()
// Pick up one queued event for resending
evt, err := ba.popMinOne(conn)
if err != nil {
return err
}
// Args for executing script
args := []interface{}{
rds.KeyJobStats(ba.namespace, evt.Data.JobID),
evt.Data.Status,
evt.Data.Metadata.Revision,
evt.Data.Metadata.CheckInAt,
}
// If failed to check the status matching, just ignore it, continue the resending
reply, err := redis.String(rds.CheckStatusMatchScript.Do(conn, args...))
if err != nil {
// Log error
logger.Error(errors.Wrap(err, "resend"))
} else {
if reply != "ok" {
return errors.Errorf("outdated hook event: %s", evt.Message)
}
}
return ba.Trigger(evt)
}
// popMinOne picks up one event for retrying
func (ba *basicAgent) popMinOne(conn redis.Conn) (*Event, error) {
key := rds.KeyHookEventRetryQueue(ba.namespace) key := rds.KeyHookEventRetryQueue(ba.namespace)
minOne, err := rds.ZPopMin(conn, key) minOne, err := rds.ZPopMin(conn, key)
if err != nil { if err != nil {
@ -327,17 +272,33 @@ func (ba *basicAgent) popMinOne() (*Event, error) {
return evt, nil return evt, nil
} }
// Extract the job ID and status from the event data field // ack hook event
// First return is job ID func (ba *basicAgent) ack(evt *Event) error {
// Second return is job status conn := ba.redisPool.Get()
// Last one is error defer func() {
func extractJobID(data *job.StatusChange) (string, job.Status, error) { if err := conn.Close(); err != nil {
if data != nil && len(data.JobID) > 0 { logger.Error(errors.Wrap(err, "ack"))
status := job.Status(data.Status)
if status.Validate() == nil {
return data.JobID, status, nil
} }
}()
k := rds.KeyJobStats(ba.namespace, evt.Data.JobID)
k2 := rds.KeyJobTrackInProgress(ba.namespace)
reply, err := redis.String(rds.HookAckScript.Do(
conn,
k,
k2,
evt.Data.Status,
evt.Data.Metadata.Revision,
evt.Data.Metadata.CheckInAt,
evt.Data.JobID,
))
if err != nil {
return errors.Wrap(err, "ack")
} }
return "", "", errors.New("malform job status change data") if reply != "ok" {
return errors.Errorf("no ack done for event: %s", evt.Message)
}
return nil
} }

View File

@ -16,35 +16,34 @@ package hook
import ( import (
"context" "context"
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing" "testing"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/pkg/errors"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/lcm"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"sync"
) )
// HookAgentTestSuite tests functions of hook agent // HookAgentTestSuite tests functions of hook agent
type HookAgentTestSuite struct { type HookAgentTestSuite struct {
suite.Suite suite.Suite
pool *redis.Pool
namespace string namespace string
lcmCtl lcm.Controller pool *redis.Pool
agent *basicAgent
envContext *env.Context event *Event
cancel context.CancelFunc jid string
} }
// TestHookAgentTestSuite is entry of go test // TestHookAgentTestSuite is entry of go test
@ -57,14 +56,11 @@ func (suite *HookAgentTestSuite) SetupSuite() {
suite.pool = tests.GiveMeRedisPool() suite.pool = tests.GiveMeRedisPool()
suite.namespace = tests.GiveMeTestNamespace() suite.namespace = tests.GiveMeTestNamespace()
ctx, cancel := context.WithCancel(context.Background()) suite.agent = &basicAgent{
suite.envContext = &env.Context{ context: context.TODO(),
SystemContext: ctx, namespace: suite.namespace,
WG: new(sync.WaitGroup), redisPool: suite.pool,
} }
suite.cancel = cancel
suite.lcmCtl = lcm.NewController(suite.envContext, suite.namespace, suite.pool, func(hookURL string, change *job.StatusChange) error { return nil })
} }
// TearDownSuite prepares test suites // TearDownSuite prepares test suites
@ -77,126 +73,121 @@ func (suite *HookAgentTestSuite) TearDownSuite() {
_ = tests.ClearAll(suite.namespace, conn) _ = tests.ClearAll(suite.namespace, conn)
} }
func (suite *HookAgentTestSuite) SetupTest() {
suite.jid = utils.MakeIdentifier()
rev := time.Now().Unix()
stats := &job.Stats{
Info: &job.StatsInfo{
JobID: suite.jid,
Status: job.RunningStatus.String(),
Revision: rev,
JobKind: job.KindGeneric,
JobName: job.SampleJob,
},
}
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New())
err := t.Save()
suite.NoError(err, "mock job stats")
suite.event = &Event{
URL: "http://domian.com",
Message: "HookAgentTestSuite",
Timestamp: time.Now().Unix(),
Data: &job.StatusChange{
JobID: suite.jid,
Status: job.SuccessStatus.String(),
Metadata: &job.StatsInfo{
JobID: suite.jid,
Status: job.SuccessStatus.String(),
Revision: rev,
JobKind: job.KindGeneric,
JobName: job.SampleJob,
},
},
}
}
func (suite *HookAgentTestSuite) TearDownTest() {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection")
}()
k := rds.KeyHookEventRetryQueue(suite.namespace)
_, err := conn.Do("DEL", k)
suite.NoError(err, "tear down test cases")
}
// TestEventSending ... // TestEventSending ...
func (suite *HookAgentTestSuite) TestEventSending() { func (suite *HookAgentTestSuite) TestEventSending() {
done := make(chan bool, 1) mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(nil)
suite.agent.client = mc
expected := uint32(1300) // >1024 max err := suite.agent.Trigger(suite.event)
count := uint32(0)
counter := &count
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
c := atomic.AddUint32(counter, 1)
if c == expected {
done <- true
}
}()
_, _ = fmt.Fprintln(w, "ok")
}))
defer ts.Close()
// in case test failed and avoid dead lock
go func() {
<-time.After(time.Duration(10) * time.Second)
done <- true // time out
}()
agent := NewAgent(suite.envContext, suite.namespace, suite.pool)
agent.Attach(suite.lcmCtl)
err := agent.Serve()
require.NoError(suite.T(), err, "agent serve: nil error expected but got %s", err)
go func() {
defer func() {
suite.cancel()
}()
for i := uint32(0); i < expected; i++ {
changeData := &job.StatusChange{
JobID: fmt.Sprintf("job-%d", i),
Status: "running",
}
evt := &Event{
URL: ts.URL,
Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status),
Data: changeData,
Timestamp: time.Now().Unix(),
}
err := agent.Trigger(evt)
require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err) require.Nil(suite.T(), err, "agent trigger: nil error expected but got %s", err)
}
// Check results // check
<-done suite.checkStatus()
require.Equal(suite.T(), expected, count, "expected %d hook events but only got %d", expected, count) }
}()
// Wait // TestEventSending ...
suite.envContext.WG.Wait() func (suite *HookAgentTestSuite) TestEventSendingError() {
mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(errors.New("internal server error: for testing"))
suite.agent.client = mc
err := suite.agent.Trigger(suite.event)
// Failed to send by client, it should be put into retry queue, check it
// The return should still be nil
suite.NoError(err, "agent trigger: nil error expected but got %s", err)
suite.checkRetryQueue(1)
} }
// TestRetryAndPopMin ... // TestRetryAndPopMin ...
func (suite *HookAgentTestSuite) TestRetryAndPopMin() { func (suite *HookAgentTestSuite) TestRetryAndPopMin() {
ctx := context.Background() mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(nil)
suite.agent.client = mc
tks := make(chan bool, maxHandlers) err := suite.agent.pushForRetry(suite.event)
// Put tokens suite.NoError(err, "push event for retry")
for i := 0; i < maxHandlers; i++ {
tks <- true
}
agent := &basicAgent{ err = suite.agent.reSend()
context: ctx, require.NoError(suite.T(), err, "resend error: %v", err)
namespace: suite.namespace,
client: NewClient(ctx),
events: make(chan *Event, maxEventChanBuffer),
tokens: tks,
redisPool: suite.pool,
}
agent.Attach(suite.lcmCtl)
changeData := &job.StatusChange{ // Check
JobID: "fake_job_ID", suite.checkRetryQueue(0)
Status: job.RunningStatus.String(), suite.checkStatus()
} }
evt := &Event{ func (suite *HookAgentTestSuite) checkStatus() {
URL: "https://fake.js.com", t := job.NewBasicTrackerWithID(context.TODO(), suite.jid, suite.namespace, suite.pool, nil, list.New())
Message: fmt.Sprintf("status of job %s change to %s", changeData.JobID, changeData.Status), err := t.Load()
Data: changeData, suite.NoError(err, "load updated job stats")
Timestamp: time.Now().Unix(), suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status")
} }
// Mock job stats func (suite *HookAgentTestSuite) checkRetryQueue(size int) {
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() err := conn.Close()
suite.NoError(err, "close redis connection")
}() }()
key := rds.KeyJobStats(suite.namespace, "fake_job_ID") k := rds.KeyHookEventRetryQueue(suite.namespace)
_, err := conn.Do("HSET", key, "status", job.SuccessStatus.String()) c, err := redis.Int(conn.Do("ZCARD", k))
require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err) suite.NoError(err, "check retry queue")
suite.Equal(size, c, "retry queue count")
err = agent.pushForRetry(evt) }
require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err)
type mockClient struct {
err = agent.reSend() mock.Mock
require.Error(suite.T(), err, "resend: non nil error expected but got nil") }
assert.Equal(suite.T(), 0, len(agent.events), "the hook event should be discard but actually not")
func (mc *mockClient) SendEvent(evt *Event) error {
// Change status args := mc.Called(evt)
_, err = conn.Do("HSET", key, "status", job.PendingStatus.String()) return args.Error(0)
require.Nil(suite.T(), err, "prepare job stats: nil error returned but got %s", err)
err = agent.pushForRetry(evt)
require.Nil(suite.T(), err, "push for retry: nil error expected but got %s", err)
err = agent.reSend()
require.Nil(suite.T(), err, "resend: nil error should be returned but got %s", err)
<-time.After(time.Duration(1) * time.Second)
assert.Equal(suite.T(), 1, len(agent.events), "the hook event should be requeued but actually not: %d", len(agent.events))
} }

View File

@ -17,14 +17,15 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
) )
// HookClientTestSuite tests functions of hook client // HookClientTestSuite tests functions of hook client
@ -57,7 +58,7 @@ func (suite *HookClientTestSuite) SetupSuite() {
return return
} }
fmt.Fprintln(w, "ok") _, _ = fmt.Fprintln(w, "ok")
})) }))
} }

View File

@ -19,6 +19,8 @@ import (
"os" "os"
"testing" "testing"
"github.com/goharbor/harbor/src/jobservice/common/list"
comcfg "github.com/goharbor/harbor/src/common/config" comcfg "github.com/goharbor/harbor/src/common/config"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/config"
@ -87,6 +89,7 @@ func (suite *ContextImplTestSuite) SetupSuite() {
suite.namespace, suite.namespace,
suite.pool, suite.pool,
nil, nil,
list.New(),
) )
err := suite.tracker.Save() err := suite.tracker.Save()

View File

@ -80,8 +80,8 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
ctx.Checkin("progress data: %60") ctx.Checkin("progress data: %60")
// HOLD ON FOR A WHILE // HOLD ON FOR A WHILE
logger.Warning("Holding for 10 seconds") logger.Warning("Holding for 30 seconds")
<-time.After(10 * time.Second) <-time.After(30 * time.Second)
if cmd, ok := ctx.OPCommand(); ok { if cmd, ok := ctx.OPCommand(); ok {
if cmd == job.StopCommand { if cmd == job.StopCommand {

View File

@ -68,6 +68,14 @@ type StatsInfo struct {
NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
Parameters Parameters `json:"parameters,omitempty"` Parameters Parameters `json:"parameters,omitempty"`
Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job
HookAck *ACK `json:"ack,omitempty"`
}
// ACK is the acknowledge of hook event
type ACK struct {
Status string `json:"status"`
Revision int64 `json:"revision"`
CheckInAt int64 `json:"check_in_at"`
} }
// ActionRequest defines for triggering job action like stop/cancel. // ActionRequest defines for triggering job action like stop/cancel.
@ -87,6 +95,7 @@ type StatusChange struct {
type SimpleStatusChange struct { type SimpleStatusChange struct {
JobID string `json:"job_id"` JobID string `json:"job_id"`
TargetStatus string `json:"target_status"` TargetStatus string `json:"target_status"`
Revision int64 `json:"revision"`
} }
// Validate the job stats // Validate the job stats

View File

@ -21,6 +21,8 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
@ -32,8 +34,6 @@ import (
const ( const (
// Try best to keep the job stats data but anyway clear it after a reasonable time // Try best to keep the job stats data but anyway clear it after a reasonable time
statDataExpireTime = 7 * 24 * 3600 statDataExpireTime = 7 * 24 * 3600
// 1 hour to discard the job stats of success jobs
statDataExpireTimeForSuccess = 3600
) )
// Tracker is designed to track the life cycle of the job described by the stats // Tracker is designed to track the life cycle of the job described by the stats
@ -102,6 +102,9 @@ type Tracker interface {
// Reset the status to `pending` // Reset the status to `pending`
Reset() error Reset() error
// Fire status hook to report the current status
FireHook() error
} }
// basicTracker implements Tracker interface based on redis // basicTracker implements Tracker interface based on redis
@ -112,6 +115,7 @@ type basicTracker struct {
jobID string jobID string
jobStats *Stats jobStats *Stats
callback HookCallback callback HookCallback
retryList *list.SyncList
} }
// NewBasicTrackerWithID builds a tracker with the provided job ID // NewBasicTrackerWithID builds a tracker with the provided job ID
@ -121,6 +125,7 @@ func NewBasicTrackerWithID(
ns string, ns string,
pool *redis.Pool, pool *redis.Pool,
callback HookCallback, callback HookCallback,
retryList *list.SyncList,
) Tracker { ) Tracker {
return &basicTracker{ return &basicTracker{
namespace: ns, namespace: ns,
@ -128,6 +133,7 @@ func NewBasicTrackerWithID(
pool: pool, pool: pool,
jobID: jobID, jobID: jobID,
callback: callback, callback: callback,
retryList: retryList,
} }
} }
@ -138,6 +144,7 @@ func NewBasicTrackerWithStats(
ns string, ns string,
pool *redis.Pool, pool *redis.Pool,
callback HookCallback, callback HookCallback,
retryList *list.SyncList,
) Tracker { ) Tracker {
return &basicTracker{ return &basicTracker{
namespace: ns, namespace: ns,
@ -146,6 +153,7 @@ func NewBasicTrackerWithStats(
jobStats: stats, jobStats: stats,
jobID: stats.Info.JobID, jobID: stats.Info.JobID,
callback: callback, callback: callback,
retryList: retryList,
} }
} }
@ -245,67 +253,44 @@ func (bt *basicTracker) Expire() error {
// Run job // Run job
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Run() error { func (bt *basicTracker) Run() error {
err := bt.compareAndSet(RunningStatus) if err := bt.setStatus(RunningStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "run")
bt.refresh(RunningStatus)
if er := bt.fireHookEvent(RunningStatus); err == nil && er != nil {
return er
}
} }
return err return nil
} }
// Stop job // Stop job
// Stop is final status, if failed to do, retry should be enforced. // Stop is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Stop() error { func (bt *basicTracker) Stop() error {
err := bt.UpdateStatusWithRetry(StoppedStatus) if err := bt.setStatus(StoppedStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "stop")
bt.refresh(StoppedStatus)
if er := bt.fireHookEvent(StoppedStatus); err == nil && er != nil {
return er
}
} }
return err return nil
} }
// Fail job // Fail job
// Fail is final status, if failed to do, retry should be enforced. // Fail is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Fail() error { func (bt *basicTracker) Fail() error {
err := bt.UpdateStatusWithRetry(ErrorStatus) if err := bt.setStatus(ErrorStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "fail")
bt.refresh(ErrorStatus)
if er := bt.fireHookEvent(ErrorStatus); err == nil && er != nil {
return er
}
} }
return err return nil
} }
// Succeed job // Succeed job
// Succeed is final status, if failed to do, retry should be enforced. // Succeed is final status, if failed to do, retry should be enforced.
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Succeed() error { func (bt *basicTracker) Succeed() error {
err := bt.UpdateStatusWithRetry(SuccessStatus) if err := bt.setStatus(SuccessStatus); err != nil {
if !errs.IsStatusMismatchError(err) { return errors.Wrap(err, "succeed")
bt.refresh(SuccessStatus)
// Expire the stat data of the successful job
if er := bt.expire(statDataExpireTimeForSuccess); er != nil {
// Only logged
logger.Errorf("Expire stat data for the success job `%s` failed with error: %s", bt.jobID, er)
} }
if er := bt.fireHookEvent(SuccessStatus); err == nil && er != nil { return nil
return er
}
}
return err
} }
// Save the stats of job tracked by this tracker // Save the stats of job tracked by this tracker
@ -362,9 +347,13 @@ func (bt *basicTracker) Save() (err error) {
// Set the first revision // Set the first revision
args = append(args, "revision", time.Now().Unix()) args = append(args, "revision", time.Now().Unix())
// ACK data is saved/updated not via tracker, so ignore the ACK saving
// Do it in a transaction // Do it in a transaction
err = conn.Send("MULTI") err = conn.Send("MULTI")
err = conn.Send("HMSET", args...) err = conn.Send("HMSET", args...)
// Set inprogress track lock
err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2)
// If job kind is periodic job, expire time should not be set // If job kind is periodic job, expire time should not be set
// If job kind is scheduled job, expire time should be runAt+ // If job kind is scheduled job, expire time should be runAt+
@ -404,15 +393,14 @@ func (bt *basicTracker) Save() (err error) {
func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error { func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
err := bt.compareAndSet(targetStatus) err := bt.compareAndSet(targetStatus)
if err != nil { if err != nil {
// Status mismatching error will be ignored // Status mismatching error will be directly ignored as the status has already been outdated
if !errs.IsStatusMismatchError(err) { if !errs.IsStatusMismatchError(err) {
// Push to the retrying Q // Push to the retrying daemon
if er := bt.pushToQueueForRetry(targetStatus); er != nil { bt.retryList.Push(SimpleStatusChange{
logger.Errorf("push job status update request to retry queue error: %s", er) JobID: bt.jobID,
// If failed to put it into the retrying Q in case, let's downgrade to retry in current process TargetStatus: targetStatus.String(),
// by recursively call in goroutines. Revision: bt.jobStats.Info.Revision,
bt.retryUpdateStatus(targetStatus) })
}
} }
} }
@ -428,15 +416,48 @@ func (bt *basicTracker) Reset() error {
}() }()
now := time.Now().Unix() now := time.Now().Unix()
err := bt.Update( if _, err := rds.StatusResetScript.Do(
"status", conn,
rds.KeyJobStats(bt.namespace, bt.jobStats.Info.JobID),
rds.KeyJobTrackInProgress(bt.namespace),
bt.jobStats.Info.JobID,
PendingStatus.String(), PendingStatus.String(),
"revision",
now, now,
) ); err != nil {
if err == nil { return errors.Wrap(err, "reset")
bt.refresh(PendingStatus) }
// Sync current tracker
bt.jobStats.Info.Status = PendingStatus.String()
bt.jobStats.Info.Revision = now bt.jobStats.Info.Revision = now
bt.jobStats.Info.UpdateTime = now
bt.jobStats.Info.CheckIn = ""
bt.jobStats.Info.CheckInAt = 0
return nil
}
// FireHook fires status hook event to report current status
func (bt *basicTracker) FireHook() error {
return bt.fireHookEvent(
Status(bt.jobStats.Info.Status),
bt.jobStats.Info.CheckIn,
)
}
// setStatus sets the job status to the target status and fire status change hook
func (bt *basicTracker) setStatus(status Status) error {
err := bt.UpdateStatusWithRetry(status)
if !errs.IsStatusMismatchError(err) {
bt.refresh(status)
if er := bt.fireHookEvent(status); er != nil {
// Add more error context
if err != nil {
return errors.Wrap(er, err.Error())
}
return er
}
} }
return err return err
@ -480,72 +501,32 @@ func (bt *basicTracker) fireHookEvent(status Status, checkIn ...string) error {
return nil return nil
} }
func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error {
simpleStatusChange := &SimpleStatusChange{
JobID: bt.jobID,
TargetStatus: targetStatus.String(),
}
rawJSON, err := json.Marshal(simpleStatusChange)
if err != nil {
return err
}
conn := bt.pool.Get()
defer func() {
_ = conn.Close()
}()
key := rds.KeyStatusUpdateRetryQueue(bt.namespace)
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
_, err = conn.Do("ZADD", args...)
return err
}
func (bt *basicTracker) retryUpdateStatus(targetStatus Status) {
go func() {
select {
case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
// Check the update timestamp
if time.Now().Unix()-bt.jobStats.Info.UpdateTime < statDataExpireTime-24*3600 {
if err := bt.compareAndSet(targetStatus); err != nil {
logger.Errorf("Retry to update job status error: %s", err)
bt.retryUpdateStatus(targetStatus)
}
// Success
}
return
case <-bt.context.Done():
return // terminated
}
}()
}
func (bt *basicTracker) compareAndSet(targetStatus Status) error { func (bt *basicTracker) compareAndSet(targetStatus Status) error {
conn := bt.pool.Get() conn := bt.pool.Get()
defer func() { defer func() {
_ = conn.Close() closeConn(conn)
}() }()
rootKey := rds.KeyJobStats(bt.namespace, bt.jobID) rootKey := rds.KeyJobStats(bt.namespace, bt.jobID)
trackKey := rds.KeyJobTrackInProgress(bt.namespace)
st, err := getStatus(conn, rootKey) reply, err := redis.String(rds.SetStatusScript.Do(
conn,
rootKey,
trackKey,
targetStatus.String(),
bt.jobStats.Info.Revision,
time.Now().Unix(),
bt.jobID,
))
if err != nil { if err != nil {
return err return errors.Wrap(err, "compare and set status error")
} }
diff := st.Compare(targetStatus) if reply != "ok" {
if diff > 0 { return errs.StatusMismatchError(reply, targetStatus.String())
return errs.StatusMismatchError(st.String(), targetStatus.String())
} }
if diff == 0 {
// Desired matches actual
return nil return nil
}
return setStatus(conn, rootKey, targetStatus)
} }
// retrieve the stats of job tracked by this tracker from the backend data // retrieve the stats of job tracked by this tracker from the backend data
@ -626,11 +607,20 @@ func (bt *basicTracker) retrieve() error {
params := make(Parameters) params := make(Parameters)
if err := json.Unmarshal([]byte(value), &params); err == nil { if err := json.Unmarshal([]byte(value), &params); err == nil {
res.Info.Parameters = params res.Info.Parameters = params
} else {
logger.Error(errors.Wrap(err, "retrieve: tracker"))
} }
break break
case "revision": case "revision":
res.Info.Revision = parseInt64(value) res.Info.Revision = parseInt64(value)
break break
case "ack":
ack := &ACK{}
if err := json.Unmarshal([]byte(value), ack); err == nil {
res.Info.HookAck = ack
} else {
logger.Error(errors.Wrap(err, "retrieve: tracker"))
}
default: default:
break break
} }
@ -663,7 +653,7 @@ func (bt *basicTracker) expire(expireTime int64) error {
func getStatus(conn redis.Conn, key string) (Status, error) { func getStatus(conn redis.Conn, key string) (Status, error) {
values, err := rds.HmGet(conn, key, "status") values, err := rds.HmGet(conn, key, "status")
if err != nil { if err != nil {
return "", err return "", errors.Wrap(err, "get status error")
} }
if len(values) == 1 { if len(values) == 1 {
@ -676,10 +666,6 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
return "", errors.New("malformed status data returned") return "", errors.New("malformed status data returned")
} }
func setStatus(conn redis.Conn, key string, status Status) error {
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
}
func closeConn(conn redis.Conn) { func closeConn(conn redis.Conn) {
if conn != nil { if conn != nil {
if err := conn.Close(); err != nil { if err := conn.Close(); err != nil {

View File

@ -19,6 +19,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
@ -77,6 +79,7 @@ func (suite *TrackerTestSuite) TestTracker() {
func(hookURL string, change *StatusChange) error { func(hookURL string, change *StatusChange) error {
return nil return nil
}, },
list.New(),
) )
err := tracker.Save() err := tracker.Save()
@ -107,12 +110,19 @@ func (suite *TrackerTestSuite) TestTracker() {
assert.Error(suite.T(), err, "run: non nil error expected but got nil") assert.Error(suite.T(), err, "run: non nil error expected but got nil")
err = tracker.CheckIn("check in") err = tracker.CheckIn("check in")
assert.Nil(suite.T(), err, "check in: nil error expected but got %s", err) assert.Nil(suite.T(), err, "check in: nil error expected but got %s", err)
// check in is allowed to be repeated
err = tracker.CheckIn("check in2")
assert.Nil(suite.T(), err, "check in2: nil error expected but got %s", err)
err = tracker.Succeed() err = tracker.Succeed()
assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err) assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err)
err = tracker.Stop() // same status is allowed to update
assert.Nil(suite.T(), err, "stop: nil error expected but got %s", err) err = tracker.Succeed()
assert.Nil(suite.T(), err, "succeed again: nil error expected but got %s", err)
// final status can be set only once
err = tracker.Fail() err = tracker.Fail()
assert.Nil(suite.T(), err, "fail: nil error expected but got %s", err) assert.Error(suite.T(), err, "fail: error expected but got nil")
t := NewBasicTrackerWithID( t := NewBasicTrackerWithID(
context.TODO(), context.TODO(),
@ -122,10 +132,26 @@ func (suite *TrackerTestSuite) TestTracker() {
func(hookURL string, change *StatusChange) error { func(hookURL string, change *StatusChange) error {
return nil return nil
}, },
list.New(),
) )
err = t.Load() err = t.Load()
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
var st Status
err = t.Reset()
assert.NoError(suite.T(), err)
st, err = t.Status()
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), PendingStatus, st)
err = t.Stop()
assert.NoError(suite.T(), err)
st, err = t.Status()
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), StoppedStatus, st)
err = t.Expire() err = t.Expire()
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
} }
@ -146,7 +172,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
}, },
} }
t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil) t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil, nil)
err := t.Save() err := t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -166,7 +192,7 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
}, },
} }
t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil) t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil, nil)
err = t2.Save() err = t2.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -177,32 +203,3 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
err = t2.PeriodicExecutionDone() err = t2.PeriodicExecutionDone()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
} }
// TestPushForRetry tests push for retry
func (suite *TrackerTestSuite) TestPushForRetry() {
ID := utils.MakeIdentifier()
runAt := time.Now().Add(1 * time.Hour).Unix()
jobStats := &Stats{
Info: &StatsInfo{
JobID: ID,
Status: ScheduledStatus.String(),
JobKind: KindScheduled,
JobName: SampleJob,
IsUnique: false,
RunAt: runAt,
EnqueueTime: runAt,
},
}
t := &basicTracker{
namespace: suite.namespace,
context: context.TODO(),
pool: suite.pool,
jobID: ID,
jobStats: jobStats,
callback: nil,
}
err := t.pushToQueueForRetry(RunningStatus)
require.NoError(suite.T(), err)
}

View File

@ -16,15 +16,21 @@ package lcm
import ( import (
"context" "context"
"encoding/json" "math/rand"
"sync"
"time"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/pkg/errors" "github.com/pkg/errors"
"sync"
"time"
) )
const ( const (
@ -32,6 +38,10 @@ const (
shortLoopInterval = 5 * time.Second shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found // Waiting for long while if no retrying elements found
longLoopInterval = 5 * time.Minute longLoopInterval = 5 * time.Minute
// loopInterval is the interval for the loop of restoring dead status
loopInterval = 2 * time.Minute
// shortInterval is initial interval and be as based to give random buffer to loopInterval
shortInterval = 10
) )
// Controller is designed to control the life cycle of the job // Controller is designed to control the life cycle of the job
@ -53,6 +63,7 @@ type basicController struct {
pool *redis.Pool pool *redis.Pool
callback job.HookCallback callback job.HookCallback
wg *sync.WaitGroup wg *sync.WaitGroup
retryList *list.SyncList
} }
// NewController is the constructor of basic controller // NewController is the constructor of basic controller
@ -63,6 +74,7 @@ func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.H
pool: pool, pool: pool,
callback: callback, callback: callback,
wg: ctx.WG, wg: ctx.WG,
retryList: list.New(),
} }
} }
@ -86,7 +98,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
return nil, errors.Errorf("error occurred when creating job tracker: %s", err) return nil, errors.Errorf("error occurred when creating job tracker: %s", err)
} }
bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback) bt := job.NewBasicTrackerWithStats(bc.context, stats, bc.namespace, bc.pool, bc.callback, bc.retryList)
if err := bt.Save(); err != nil { if err := bt.Save(); err != nil {
return nil, err return nil, err
} }
@ -96,7 +108,7 @@ func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
// Track and attache with the job // Track and attache with the job
func (bc *basicController) Track(jobID string) (job.Tracker, error) { func (bc *basicController) Track(jobID string) (job.Tracker, error) {
bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback) bt := job.NewBasicTrackerWithID(bc.context, jobID, bc.namespace, bc.pool, bc.callback, bc.retryList)
if err := bt.Load(); err != nil { if err := bt.Load(); err != nil {
return nil, err return nil, err
} }
@ -105,75 +117,90 @@ func (bc *basicController) Track(jobID string) (job.Tracker, error) {
} }
// loopForRestoreDeadStatus is a loop to restore the dead states of jobs // loopForRestoreDeadStatus is a loop to restore the dead states of jobs
// Obviously,this retry is a try best action.
// The retry items are not persisted and they will be gone if the job service is restart.
func (bc *basicController) loopForRestoreDeadStatus() { func (bc *basicController) loopForRestoreDeadStatus() {
// Generate random timer duration
rd := func() time.Duration {
return longLoopInterval + time.Duration(rand.Int31n(shortInterval))*time.Second
}
defer func() { defer func() {
logger.Info("Status restoring loop is stopped") logger.Info("Status restoring loop is stopped")
bc.wg.Done() bc.wg.Done()
}() }()
token := make(chan bool, 1) // Initialize the timer
token <- true tm := time.NewTimer(shortInterval * time.Second)
defer tm.Stop()
for { for {
<-token
if err := bc.restoreDeadStatus(); err != nil {
waitInterval := shortLoopInterval
if err == rds.ErrNoElements {
// No elements
waitInterval = longLoopInterval
} else {
logger.Errorf("restore dead status error: %s, put it back to the retrying Q later again", err)
}
// wait for a while or be terminated
select { select {
case <-time.After(waitInterval): case <-tm.C:
// Reset timer
tm.Reset(rd())
// Retry the items in the list
bc.retryLoop()
case <-bc.context.Done(): case <-bc.context.Done():
return return // terminated
} }
} }
// Return token
token <- true
}
} }
// restoreDeadStatus try to restore the dead status // retryLoop iterates the retry queue and do retrying
func (bc *basicController) restoreDeadStatus() error { func (bc *basicController) retryLoop() {
// Get one // Get connection
deadOne, err := bc.popOneDead()
if err != nil {
return err
}
// Try to update status
t, err := bc.Track(deadOne.JobID)
if err != nil {
return err
}
return t.UpdateStatusWithRetry(job.Status(deadOne.TargetStatus))
}
// popOneDead retrieves one dead status from the backend Q from lowest to highest
func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
conn := bc.pool.Get() conn := bc.pool.Get()
defer func() { defer func() {
_ = conn.Close() // Return redis connection
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %v : %s", err, "retry loop: lcm")
}
}() }()
key := rds.KeyStatusUpdateRetryQueue(bc.namespace) // Check the list
v, err := rds.ZPopMin(conn, key) bc.retryList.Iterate(func(ele interface{}) bool {
if change, ok := ele.(job.SimpleStatusChange); ok {
err := retry(conn, bc.namespace, change)
if err != nil { if err != nil {
return nil, err // Log the error
logger.Errorf("Failed to retry the status update action: %v : %s", err, "retry loop: lcm")
} }
if bytes, ok := v.([]byte); ok { if err == nil || errs.IsStatusMismatchError(err) {
ssc := &job.SimpleStatusChange{} return true
if err := json.Unmarshal(bytes, ssc); err == nil {
return ssc, nil
} }
} }
return nil, errors.New("pop one dead error: bad result reply") return false
})
}
// retry status update action
func retry(conn redis.Conn, ns string, change job.SimpleStatusChange) error {
// Debug
logger.Debugf("Retry the status update action: %v", change)
rootKey := rds.KeyJobStats(ns, change.JobID)
trackKey := rds.KeyJobTrackInProgress(ns)
reply, err := redis.String(rds.SetStatusScript.Do(
conn,
rootKey,
trackKey,
change.TargetStatus,
change.Revision,
time.Now().Unix(),
change.JobID,
))
if err != nil {
return errors.Wrap(err, "retry")
}
if reply != "ok" {
return errs.StatusMismatchError(reply, change.TargetStatus)
}
return nil
} }

View File

@ -16,8 +16,10 @@ package lcm
import ( import (
"context" "context"
"encoding/json" "sync"
"github.com/goharbor/harbor/src/jobservice/common/rds" "testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
@ -26,9 +28,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"sync"
"testing"
"time"
) )
// LcmControllerTestSuite tests functions of life cycle controller // LcmControllerTestSuite tests functions of life cycle controller
@ -65,48 +64,36 @@ func TestLcmControllerTestSuite(t *testing.T) {
suite.Run(t, new(LcmControllerTestSuite)) suite.Run(t, new(LcmControllerTestSuite))
} }
// TestNewAndTrack tests controller.New() and controller.Track() // TestController tests lcm controller
func (suite *LcmControllerTestSuite) TestNewAndTrack() { func (suite *LcmControllerTestSuite) TestController() {
// Prepare mock data
jobID := utils.MakeIdentifier() jobID := utils.MakeIdentifier()
suite.newsStats(jobID) rev := time.Now().Unix()
suite.newsStats(jobID, rev)
simpleChange := job.SimpleStatusChange{
JobID: jobID,
TargetStatus: job.RunningStatus.String(),
Revision: rev,
}
// Just test if the server can be started
err := suite.ctl.Serve()
require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err)
// Test retry loop
bc := suite.ctl.(*basicController)
bc.retryList.Push(simpleChange)
bc.retryLoop()
t, err := suite.ctl.Track(jobID) t, err := suite.ctl.Track(jobID)
require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err) require.Nil(suite.T(), err, "lcm track: nil error expected but got %s", err)
assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName) assert.Equal(suite.T(), job.SampleJob, t.Job().Info.JobName, "lcm new: expect job name %s but got %s", job.SampleJob, t.Job().Info.JobName)
} assert.Equal(suite.T(), job.RunningStatus.String(), t.Job().Info.Status)
// TestNew tests controller.Serve()
func (suite *LcmControllerTestSuite) TestServe() {
// Prepare mock data
jobID := utils.MakeIdentifier()
suite.newsStats(jobID)
conn := suite.pool.Get()
defer func() {
_ = conn.Close()
}()
simpleChange := &job.SimpleStatusChange{
JobID: jobID,
TargetStatus: job.RunningStatus.String(),
}
rawJSON, err := json.Marshal(simpleChange)
require.Nil(suite.T(), err, "json marshal: nil error expected but got %s", err)
key := rds.KeyStatusUpdateRetryQueue(suite.namespace)
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
_, err = conn.Do("ZADD", args...)
require.Nil(suite.T(), err, "prepare mock data: nil error expected but got %s", err)
err = suite.ctl.Serve()
require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err)
<-time.After(1 * time.Second)
count, err := redis.Int(conn.Do("ZCARD", key))
require.Nil(suite.T(), err, "get total dead status: nil error expected but got %s", err)
assert.Equal(suite.T(), 0, count)
} }
// newsStats create job stats // newsStats create job stats
func (suite *LcmControllerTestSuite) newsStats(jobID string) { func (suite *LcmControllerTestSuite) newsStats(jobID string, revision int64) {
stats := &job.Stats{ stats := &job.Stats{
Info: &job.StatsInfo{ Info: &job.StatsInfo{
JobID: jobID, JobID: jobID,
@ -114,6 +101,7 @@ func (suite *LcmControllerTestSuite) newsStats(jobID string) {
JobName: job.SampleJob, JobName: job.SampleJob,
IsUnique: true, IsUnique: true,
Status: job.PendingStatus.String(), Status: job.PendingStatus.String(),
Revision: revision,
}, },
} }

View File

@ -17,6 +17,9 @@ package mgt
import ( import (
"context" "context"
"fmt" "fmt"
"strconv"
"strings"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/query"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
@ -27,8 +30,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/period" "github.com/goharbor/harbor/src/jobservice/period"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/pkg/errors" "github.com/pkg/errors"
"strconv"
"strings"
) )
// Manager defies the related operations to handle the management of job stats. // Manager defies the related operations to handle the management of job stats.
@ -154,7 +155,7 @@ func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error)
statsKey := string(bytes) statsKey := string(bytes)
if i := strings.LastIndex(statsKey, ":"); i != -1 { if i := strings.LastIndex(statsKey, ":"); i != -1 {
jID := statsKey[i+1:] jID := statsKey[i+1:]
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil)
if err := t.Load(); err != nil { if err := t.Load(); err != nil {
logger.Errorf("retrieve stats data of job %s error: %s", jID, err) logger.Errorf("retrieve stats data of job %s error: %s", jID, err)
continue continue
@ -174,7 +175,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re
return nil, 0, errors.New("nil periodic job ID") return nil, 0, errors.New("nil periodic job ID")
} }
tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil) tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil, nil)
err = tracker.Load() err = tracker.Load()
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
@ -239,7 +240,7 @@ func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (re
} }
for _, eID := range executionIDs { for _, eID := range executionIDs {
t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil, nil)
if er := t.Load(); er != nil { if er := t.Load(); er != nil {
logger.Errorf("track job %s error: %s", eID, err) logger.Errorf("track job %s error: %s", eID, err)
continue continue
@ -273,7 +274,7 @@ func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int6
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt) jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
} }
} }
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil, nil)
err = t.Load() err = t.Load()
if err != nil { if err != nil {
// Just log it // Just log it
@ -293,7 +294,7 @@ func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) {
return nil, errs.BadRequestError("empty job ID") return nil, errs.BadRequestError("empty job ID")
} }
t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil, nil)
if err := t.Load(); err != nil { if err := t.Load(); err != nil {
return nil, err return nil, err
} }
@ -307,7 +308,7 @@ func (bm *basicManager) SaveJob(j *job.Stats) error {
return errs.BadRequestError("nil saving job stats") return errs.BadRequestError("nil saving job stats")
} }
t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil) t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil, nil)
return t.Save() return t.Save()
} }

View File

@ -16,7 +16,11 @@ package mgt
import ( import (
"context" "context"
"testing"
"time"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/query"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
@ -24,8 +28,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"testing"
"time"
) )
// BasicManagerTestSuite tests the function of basic manager // BasicManagerTestSuite tests the function of basic manager
@ -56,7 +58,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() {
}, },
} }
t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil, list.New())
err := t.Save() err := t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -71,7 +73,7 @@ func (suite *BasicManagerTestSuite) SetupSuite() {
UpstreamJobID: "1000", UpstreamJobID: "1000",
}, },
} }
t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil) t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil, list.New())
err = t.Save() err = t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
} }
@ -117,7 +119,7 @@ func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() {
assert.Equal(suite.T(), int64(1), total) assert.Equal(suite.T(), int64(1), total)
assert.Equal(suite.T(), int64(1), int64(len(jobs))) assert.Equal(suite.T(), int64(1), int64(len(jobs)))
t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil, list.New())
err = t.Load() err = t.Load()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
err = t.PeriodicExecutionDone() err = t.PeriodicExecutionDone()
@ -147,17 +149,17 @@ func (suite *BasicManagerTestSuite) TestGetScheduledJobs() {
}, },
} }
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil) t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil, list.New())
err = t.Save() err = t.Save()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{ l, total, err := suite.manager.GetScheduledJobs(&query.Parameter{
PageNumber: 1, PageNumber: 1,
PageSize: 10, PageSize: 10,
}) })
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), total) assert.Equal(suite.T(), int64(1), total)
assert.Equal(suite.T(), int64(1), int64(len(list))) assert.Equal(suite.T(), int64(1), int64(len(l)))
} }
// TestGetJob tests get job // TestGetJob tests get job

View File

@ -15,10 +15,9 @@
package period package period
import ( import (
"encoding/json" "context"
"time" "time"
"context"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
@ -52,27 +51,13 @@ func NewScheduler(ctx context.Context, namespace string, pool *redis.Pool, ctl l
} }
// Start the periodic scheduling process // Start the periodic scheduling process
// Blocking call here func (bs *basicScheduler) Start() {
func (bs *basicScheduler) Start() error { // Run once clean
defer func() {
logger.Info("Basic scheduler is stopped")
}()
// Try best to do // Try best to do
go bs.clearDirtyJobs() go bs.clearDirtyJobs()
logger.Info("Basic scheduler is started")
// start enqueuer // start enqueuer
return bs.enqueuer.start() bs.enqueuer.start()
}
// Stop the periodic scheduling process
func (bs *basicScheduler) Stop() error {
// stop everything
bs.enqueuer.stopChan <- true
return nil
} }
// Schedule is implementation of the same method in period.Interface // Schedule is implementation of the same method in period.Interface
@ -99,24 +84,10 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) {
return -1, err return -1, err
} }
// Prepare publish message
m := &message{
Event: changeEventSchedule,
Data: p,
}
msgJSON, err := json.Marshal(m)
if err != nil {
return -1, err
}
pid := time.Now().Unix() pid := time.Now().Unix()
// Save to redis db and publish notification via redis transaction // Save to redis db
err = conn.Send("MULTI") if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil {
err = conn.Send("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON)
err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
if _, err := conn.Do("EXEC"); err != nil {
return -1, err return -1, err
} }
@ -166,25 +137,9 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID) return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID)
} }
notification := &message{ // REM from redis db
Event: changeEventUnSchedule, // Accurately remove the item with the specified score
Data: p, if _, err := conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID); err != nil {
}
msgJSON, err := json.Marshal(notification)
if err != nil {
return err
}
// REM from redis db with transaction way
err = conn.Send("MULTI")
err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
if err != nil {
return err
}
_, err = conn.Do("EXEC")
if err != nil {
return err return err
} }

View File

@ -16,6 +16,10 @@ package period
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
@ -25,9 +29,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"sync"
"testing"
"time"
) )
// BasicSchedulerTestSuite tests functions of basic scheduler // BasicSchedulerTestSuite tests functions of basic scheduler
@ -63,6 +64,7 @@ func (suite *BasicSchedulerTestSuite) SetupSuite() {
) )
suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl) suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl)
suite.scheduler.Start()
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
@ -84,20 +86,6 @@ func TestSchedulerTestSuite(t *testing.T) {
// TestScheduler tests scheduling and un-scheduling // TestScheduler tests scheduling and un-scheduling
func (suite *BasicSchedulerTestSuite) TestScheduler() { func (suite *BasicSchedulerTestSuite) TestScheduler() {
go func() {
<-time.After(1 * time.Second)
_ = suite.scheduler.Stop()
}()
go func() {
var err error
defer func() {
require.NoError(suite.T(), err, "start scheduler: nil error expected but got %s", err)
}()
err = suite.scheduler.Start()
}()
// Prepare one // Prepare one
now := time.Now() now := time.Now()
minute := now.Minute() minute := now.Minute()

View File

@ -43,14 +43,11 @@ type enqueuer struct {
namespace string namespace string
context context.Context context context.Context
pool *redis.Pool pool *redis.Pool
policyStore *policyStore
ctl lcm.Controller ctl lcm.Controller
// Diff with other nodes // Diff with other nodes
nodeID string nodeID string
// Track the error of enqueuing // Track the error of enqueuing
lastEnqueueErr error lastEnqueueErr error
// For stop
stopChan chan bool
} }
func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer { func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer {
@ -64,29 +61,20 @@ func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lc
context: ctx, context: ctx,
namespace: namespace, namespace: namespace,
pool: pool, pool: pool,
policyStore: newPolicyStore(ctx, namespace, pool),
ctl: ctl, ctl: ctl,
stopChan: make(chan bool, 1),
nodeID: nodeID.(string), nodeID: nodeID.(string),
} }
} }
// Blocking call // Blocking call
func (e *enqueuer) start() error { func (e *enqueuer) start() {
// Load policies first when starting
if err := e.policyStore.load(); err != nil {
return err
}
go e.loop() go e.loop()
logger.Info("Periodic enqueuer is started") logger.Info("Scheduler: periodic enqueuer is started")
return e.policyStore.serve()
} }
func (e *enqueuer) loop() { func (e *enqueuer) loop() {
defer func() { defer func() {
logger.Info("Periodic enqueuer is stopped") logger.Info("Scheduler: periodic enqueuer is stopped")
}() }()
// Do enqueue immediately when starting // Do enqueue immediately when starting
@ -98,10 +86,8 @@ func (e *enqueuer) loop() {
for { for {
select { select {
case <-e.stopChan: case <-e.context.Done():
// Stop policy store now return // exit
e.policyStore.stopChan <- true
return
case <-timer.C: case <-timer.C:
// Pause the timer for completing the processing this time // Pause the timer for completing the processing this time
timer.Reset(neverExecuted) timer.Reset(neverExecuted)
@ -157,10 +143,17 @@ func (e *enqueuer) enqueue() {
// Reset error track // Reset error track
e.lastEnqueueErr = nil e.lastEnqueueErr = nil
e.policyStore.Iterate(func(id string, p *Policy) bool { // Load policies and schedule next jobs for them
pls, err := Load(e.namespace, conn)
if err != nil {
// Log error
logger.Errorf("%s:%s", err, "enqueue error: enqueuer")
return
}
for _, p := range pls {
e.scheduleNextJobs(p, conn) e.scheduleNextJobs(p, conn)
return true }
})
} }
// scheduleNextJobs schedules job for next time slots based on the policy // scheduleNextJobs schedules job for next time slots based on the policy

View File

@ -21,6 +21,7 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
@ -30,7 +31,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
@ -69,8 +69,9 @@ func (suite *EnqueuerTestSuite) SetupSuite() {
func(hookURL string, change *job.StatusChange) error { return nil }, func(hookURL string, change *job.StatusChange) error { return nil },
) )
suite.enqueuer = newEnqueuer(ctx, suite.namespace, suite.pool, lcmCtl) suite.enqueuer = newEnqueuer(ctx, suite.namespace, suite.pool, lcmCtl)
suite.prepare() suite.prepare()
suite.enqueuer.start()
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
@ -87,15 +88,12 @@ func (suite *EnqueuerTestSuite) TearDownSuite() {
// TestEnqueuer tests enqueuer // TestEnqueuer tests enqueuer
func (suite *EnqueuerTestSuite) TestEnqueuer() { func (suite *EnqueuerTestSuite) TestEnqueuer() {
go func() {
defer func() {
suite.enqueuer.stopChan <- true
}()
key := rds.RedisKeyScheduled(suite.namespace) key := rds.RedisKeyScheduled(suite.namespace)
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}
}() }()
tk := time.NewTicker(500 * time.Millisecond) tk := time.NewTicker(500 * time.Millisecond)
@ -116,10 +114,6 @@ func (suite *EnqueuerTestSuite) TestEnqueuer() {
return return
} }
} }
}()
err := suite.enqueuer.start()
require.Nil(suite.T(), err, "enqueuer start: nil error expected but got %s", err)
} }
func (suite *EnqueuerTestSuite) prepare() { func (suite *EnqueuerTestSuite) prepare() {

View File

@ -15,26 +15,15 @@
package period package period
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"sync"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/robfig/cron" "github.com/robfig/cron"
"strings"
)
const (
// changeEventSchedule : Schedule periodic job policy event
changeEventSchedule = "Schedule"
// changeEventUnSchedule : UnSchedule periodic job policy event
changeEventUnSchedule = "UnSchedule"
) )
// Policy ... // Policy ...
@ -81,184 +70,14 @@ func (p *Policy) Validate() error {
return nil return nil
} }
// policyStore is in-memory cache for the periodic job policies. // Load all the policies from the backend storage.
type policyStore struct { func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
// k-v pair and key is the policy ID bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1))
hash *sync.Map
namespace string
context context.Context
pool *redis.Pool
// For stop
stopChan chan bool
}
// message is designed for sub/pub messages
type message struct {
Event string `json:"event"`
Data *Policy `json:"data"`
}
// newPolicyStore is constructor of policyStore
func newPolicyStore(ctx context.Context, ns string, pool *redis.Pool) *policyStore {
return &policyStore{
hash: new(sync.Map),
context: ctx,
namespace: ns,
pool: pool,
stopChan: make(chan bool, 1),
}
}
// Blocking call
func (ps *policyStore) serve() (err error) {
defer func() {
logger.Info("Periodical job policy store is stopped")
}()
conn := ps.pool.Get()
psc := redis.PubSubConn{
Conn: conn,
}
defer func() {
_ = psc.Close()
}()
// Subscribe channel
err = psc.Subscribe(redis.Args{}.AddFlat(rds.KeyPeriodicNotification(ps.namespace))...)
if err != nil { if err != nil {
return return nil, err
} }
// Channels for sub/pub ctl policies := make([]*Policy, 0)
errChan := make(chan error, 1)
done := make(chan bool, 1)
go func() {
for {
switch res := psc.Receive().(type) {
case error:
errChan <- fmt.Errorf("redis sub/pub chan error: %s", res.(error).Error())
break
case redis.Message:
m := &message{}
if err := json.Unmarshal(res.Data, m); err != nil {
// logged
logger.Errorf("Read invalid message: %s\n", res.Data)
break
}
if err := ps.sync(m); err != nil {
logger.Error(err)
}
break
case redis.Subscription:
switch res.Kind {
case "subscribe":
logger.Infof("Subscribe redis channel %s", res.Channel)
break
case "unsubscribe":
// Unsubscribe all, means main goroutine is exiting
logger.Infof("Unsubscribe redis channel %s", res.Channel)
done <- true
return
}
}
}
}()
logger.Info("Periodical job policy store is serving with policy auto sync enabled")
defer func() {
var unSubErr error
defer func() {
// Merge errors
finalErrs := make([]string, 0)
if unSubErr != nil {
finalErrs = append(finalErrs, unSubErr.Error())
}
if err != nil {
finalErrs = append(finalErrs, err.Error())
}
if len(finalErrs) > 0 {
// Override returned err or do nothing
err = errors.New(strings.Join(finalErrs, ";"))
}
}()
// Unsubscribe all
if err := psc.Unsubscribe(); err != nil {
logger.Errorf("unsubscribe: %s", err)
}
// Confirm result
// Add timeout in case unsubscribe failed
select {
case unSubErr = <-errChan:
return
case <-done:
return
case <-time.After(30 * time.Second):
unSubErr = errors.New("unsubscribe time out")
return
}
}()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
// blocking here
for {
select {
case <-ticker.C:
err = psc.Ping("ping!")
if err != nil {
return
}
case <-ps.stopChan:
return nil
case err = <-errChan:
return
}
}
}
// sync policy with backend list
func (ps *policyStore) sync(m *message) error {
if m == nil {
return errors.New("nil message")
}
if m.Data == nil {
return errors.New("missing data in the policy sync message")
}
switch m.Event {
case changeEventSchedule:
if err := ps.add(m.Data); err != nil {
return fmt.Errorf("failed to sync scheduled policy %s: %s", m.Data.ID, err)
}
case changeEventUnSchedule:
removed := ps.remove(m.Data.ID)
if removed == nil {
return fmt.Errorf("failed to sync unscheduled policy %s", m.Data.ID)
}
default:
return fmt.Errorf("message %s is not supported", m.Event)
}
return nil
}
// Load all the policies from the backend to store
func (ps *policyStore) load() error {
conn := ps.pool.Get()
defer func() {
_ = conn.Close()
}()
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(ps.namespace), 0, -1))
if err != nil {
return err
}
count := 0
for i, l := 0, len(bytes); i < l; i++ { for i, l := 0, len(bytes); i < l; i++ {
rawPolicy := bytes[i].([]byte) rawPolicy := bytes[i].([]byte)
p := &Policy{} p := &Policy{}
@ -266,62 +85,22 @@ func (ps *policyStore) load() error {
if err := p.DeSerialize(rawPolicy); err != nil { if err := p.DeSerialize(rawPolicy); err != nil {
// Ignore error which means the policy data is not valid // Ignore error which means the policy data is not valid
// Only logged // Only logged
logger.Errorf("malform policy: %s; error: %s\n", rawPolicy, err) logger.Errorf("Malformed policy: %s; error: %s", rawPolicy, err)
continue continue
} }
// Add to cache store // Validate the policy object
if err := ps.add(p); err != nil { if err := p.Validate(); err != nil {
// Only logged logger.Errorf("Policy validate error: %s", err)
logger.Errorf("cache periodic policies error: %s", err)
continue continue
} }
count++ policies = append(policies, p)
logger.Debugf("Load periodic job policy: %s", string(rawPolicy)) logger.Debugf("Load periodic job policy: %s", string(rawPolicy))
} }
logger.Infof("Load %d periodic job policies", count) logger.Debugf("Load %d periodic job policies", len(policies))
return nil return policies, nil
}
// Add one or more policy
func (ps *policyStore) add(item *Policy) error {
if item == nil {
return errors.New("nil policy to add")
}
if utils.IsEmptyStr(item.ID) {
return errors.New("malform policy to add")
}
v, _ := ps.hash.LoadOrStore(item.ID, item)
if v == nil {
return fmt.Errorf("failed to add policy: %s", item.ID)
}
return nil
}
// Iterate all the policies in the store
func (ps *policyStore) Iterate(f func(id string, p *Policy) bool) {
ps.hash.Range(func(k, v interface{}) bool {
return f(k.(string), v.(*Policy))
})
}
// Remove the specified policy from the store
func (ps *policyStore) remove(policyID string) *Policy {
if utils.IsEmptyStr(policyID) {
return nil
}
if v, ok := ps.hash.Load(policyID); ok {
ps.hash.Delete(policyID)
return v.(*Policy)
}
return nil
} }

View File

@ -14,25 +14,23 @@
package period package period
import ( import (
"context" "testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"testing"
"time"
) )
// PolicyStoreTestSuite tests functions of policy store // PolicyStoreTestSuite tests functions of policy store
type PolicyStoreTestSuite struct { type PolicyStoreTestSuite struct {
suite.Suite suite.Suite
store *policyStore
namespace string namespace string
pool *redis.Pool pool *redis.Pool
cancel context.CancelFunc
} }
// TestPolicyStoreTestSuite is entry of go test // TestPolicyStoreTestSuite is entry of go test
@ -44,37 +42,20 @@ func TestPolicyStoreTestSuite(t *testing.T) {
func (suite *PolicyStoreTestSuite) SetupSuite() { func (suite *PolicyStoreTestSuite) SetupSuite() {
suite.namespace = tests.GiveMeTestNamespace() suite.namespace = tests.GiveMeTestNamespace()
suite.pool = tests.GiveMeRedisPool() suite.pool = tests.GiveMeRedisPool()
ctx, cancel := context.WithCancel(context.Background())
suite.cancel = cancel
suite.store = newPolicyStore(ctx, suite.namespace, suite.pool)
} }
// TearDownSuite clears the test suite // TearDownSuite clears the test suite
func (suite *PolicyStoreTestSuite) TearDownSuite() { func (suite *PolicyStoreTestSuite) TearDownSuite() {
suite.cancel()
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}
}() }()
_ = tests.ClearAll(suite.namespace, conn) if err := tests.ClearAll(suite.namespace, conn); err != nil {
} suite.NoError(err, "clear redis namespace")
}
// TestStore tests policy store serve
func (suite *PolicyStoreTestSuite) TestServe() {
var err error
defer func() {
suite.store.stopChan <- true
assert.Nil(suite.T(), err, "serve exit: nil error expected but got %s", err)
}()
go func() {
err = suite.store.serve()
}()
<-time.After(1 * time.Second)
} }
// TestLoad tests load policy from backend // TestLoad tests load policy from backend
@ -91,47 +72,17 @@ func (suite *PolicyStoreTestSuite) TestLoad() {
conn := suite.pool.Get() conn := suite.pool.Get()
defer func() { defer func() {
_ = conn.Close() if err := conn.Close(); err != nil {
suite.NoError(err, "close redis connection")
}
}() }()
_, err = conn.Do("ZADD", key, time.Now().Unix(), rawData) _, err = conn.Do("ZADD", key, time.Now().Unix(), rawData)
assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err) assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err)
err = suite.store.load() ps, err := Load(suite.namespace, conn)
assert.Nil(suite.T(), err, "load: nil error expected but got %s", err) suite.NoError(err, "load: nil error expected but got %s", err)
suite.Equal(1, len(ps), "count of loaded policies")
p1 := &Policy{
ID: "fake_policy_1",
JobName: job.SampleJob,
CronSpec: "5 * * * * *",
}
m := &message{
Event: changeEventSchedule,
Data: p1,
}
err = suite.store.sync(m)
assert.Nil(suite.T(), err, "sync schedule: nil error expected but got %s", err)
count := 0
suite.store.Iterate(func(id string, p *Policy) bool {
count++
return true
})
assert.Equal(suite.T(), 2, count, "expected 2 policies but got %d", count)
m1 := &message{
Event: changeEventUnSchedule,
Data: p1,
}
err = suite.store.sync(m1)
assert.Nil(suite.T(), err, "sync unschedule: nil error expected but got %s", err)
count = 0
suite.store.Iterate(func(id string, p *Policy) bool {
count++
return true
})
assert.Equal(suite.T(), 1, count, "expected 1 policies but got %d", count)
} }
// TestPolicy tests policy itself // TestPolicy tests policy itself

View File

@ -18,15 +18,7 @@ package period
type Scheduler interface { type Scheduler interface {
// Start to serve periodic job scheduling process // Start to serve periodic job scheduling process
// //
// Returns: Start()
// error if any problems happened
Start() error
// Stop the working periodic job scheduling process
//
// Returns;
// error if any problems happened
Stop() error
// Schedule the specified cron job policy. // Schedule the specified cron job policy.
// //

View File

@ -17,7 +17,8 @@ package runner
import ( import (
"fmt" "fmt"
"runtime" "runtime"
"time"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
@ -51,21 +52,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
runningJob job.Interface runningJob job.Interface
execContext job.Context execContext job.Context
tracker job.Tracker tracker job.Tracker
markStopped = bp(false)
) )
// Defer to log the exit result
defer func() {
if !*markStopped {
if err == nil {
logger.Infof("|^_^| Job '%s:%s' exit with success", j.Name, j.ID)
} else {
// log error
logger.Errorf("|@_@| Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
}
}
}()
// Track the running job now // Track the running job now
jID := j.ID jID := j.ID
@ -75,60 +63,33 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
if tracker, err = rj.ctl.Track(jID); err != nil { if tracker, err = rj.ctl.Track(jID); err != nil {
// log error
logger.Errorf("Job '%s:%s' exit with error: failed to get job tracker: %s", j.Name, j.ID, err)
// Pay attentions here, if the job stats is lost (NOTFOUND error returned),
// directly return without retry again as we have no way to restore the stats again.
if errs.IsObjectNotFoundError(err) {
j.Fails = 10000000000 // never retry
}
// ELSE:
// As tracker creation failed, there is no way to mark the job status change. // As tracker creation failed, there is no way to mark the job status change.
// Also a non nil error return consumes a fail. If all retries are failed here, // Also a non nil error return consumes a fail. If all retries are failed here,
// it will cause the job to be zombie one (pending forever). // it will cause the job to be zombie one (pending forever).
// Here we will avoid the job to consume a fail and let it retry again and again. // Those zombie ones will be reaped by the reaper later.
// However, to avoid a forever retry, we will check the FailedAt timestamp.
now := time.Now().Unix()
if j.FailedAt == 0 || now-j.FailedAt < 2*24*3600 {
j.Fails--
}
return return
} }
// Do operation based on the job status
jStatus := job.Status(tracker.Job().Info.Status)
switch jStatus {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
break
case job.StoppedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
markStopped = bp(true)
return nil
case job.ErrorStatus:
if j.FailedAt > 0 && j.Fails > 0 {
// Retry job
// Reset job info
if er := tracker.Reset(); er != nil {
// Log error and return the original error if existing
er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID))
logger.Error(er)
if len(j.LastErr) > 0 {
return errors.New(j.LastErr)
}
return err
}
logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
}
break
default:
return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String())
}
// Defer to switch status // Defer to switch status
defer func() { defer func() {
// Switch job status based on the returned error. // Switch job status based on the returned error.
// The err happened here should not override the job run error, just log it. // The err happened here should not override the job run error, just log it.
if err != nil { if err != nil {
// log error
logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
if er := tracker.Fail(); er != nil { if er := tracker.Fail(); er != nil {
logger.Errorf("Mark job status to failure error: %s", err) logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er)
} }
return return
@ -136,19 +97,20 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Nil error might be returned by the stopped job. Check the latest status here. // Nil error might be returned by the stopped job. Check the latest status here.
// If refresh latest status failed, let the process to go on to void missing status updating. // If refresh latest status failed, let the process to go on to void missing status updating.
if latest, er := tracker.Status(); er == nil { if latest, er := tracker.Status(); er != nil {
logger.Errorf("Error occurred when getting the status of job %s:%s: %s", j.Name, j.ID, er)
} else {
if latest == job.StoppedStatus { if latest == job.StoppedStatus {
// Logged // Logged
logger.Infof("Job %s:%s is stopped", tracker.Job().Info.JobName, tracker.Job().Info.JobID) logger.Infof("Job %s:%s is stopped", j.Name, j.ID)
// Stopped job, no exit message printing.
markStopped = bp(true)
return return
} }
} }
// Mark job status to success. // Mark job status to success.
logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
if er := tracker.Succeed(); er != nil { if er := tracker.Succeed(); er != nil {
logger.Errorf("Mark job status to success error: %s", er) logger.Errorf("Error occurred when marking the status of job %s:%s to success: %s", j.Name, j.ID, er)
} }
}() }()
@ -163,6 +125,40 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
}() }()
// Do operation based on the job status
jStatus := job.Status(tracker.Job().Info.Status)
switch jStatus {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
break
case job.StoppedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
return nil
case job.RunningStatus, job.ErrorStatus:
// The failed jobs can be put into retry queue and the in progress jobs may be
// interrupted by a sudden service crash event, all those jobs can be rescheduled.
// Reset job info.
if err = tracker.Reset(); err != nil {
// Log error and return the original error if existing
err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID))
if len(j.LastErr) > 0 {
err = errors.Wrap(err, j.LastErr)
}
return
}
logger.Infof("Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
break
case job.SuccessStatus:
// do nothing
return nil
default:
return errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
}
// Build job context // Build job context
if rj.context.JobContext == nil { if rj.context.JobContext == nil {
rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext) rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext)
@ -189,6 +185,11 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} }
// Run the job // Run the job
err = runningJob.Run(execContext, j.Args) err = runningJob.Run(execContext, j.Args)
// Add error context
if err != nil {
err = errors.Wrap(err, "run error")
}
// Handle retry // Handle retry
rj.retry(runningJob, j) rj.retry(runningJob, j)
// Handle periodic job execution // Handle periodic job execution

View File

@ -15,12 +15,13 @@ package runner
import ( import (
"context" "context"
"github.com/stretchr/testify/assert"
"os" "os"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger/backend" "github.com/goharbor/harbor/src/jobservice/logger/backend"
@ -154,7 +155,7 @@ func (suite *RedisRunnerTestSuite) TestJobWrapperInvalidTracker() {
redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl) redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl)
err := redisJob.Run(j) err := redisJob.Run(j)
require.Error(suite.T(), err, "redis job: non nil error expected but got nil") require.Error(suite.T(), err, "redis job: non nil error expected but got nil")
assert.Equal(suite.T(), int64(2), j.Fails) assert.Equal(suite.T(), int64(10000000000), j.Fails)
} }
// TestJobWrapperPanic tests job runner panic // TestJobWrapperPanic tests job runner panic

View File

@ -117,7 +117,12 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
hookCallback := func(URL string, change *job.StatusChange) error { hookCallback := func(URL string, change *job.StatusChange) error {
msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status) msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status)
if !utils.IsEmptyStr(change.CheckIn) { if !utils.IsEmptyStr(change.CheckIn) {
msg = fmt.Sprintf("%s, check_in=%s", msg, change.CheckIn) // Ignore the real check in message to avoid too big message stream
cData := change.CheckIn
if len(cData) > 256 {
cData = fmt.Sprintf("<DATA BLOCK: %d bytes>", len(cData))
}
msg = fmt.Sprintf("%s, check_in=%s", msg, cData)
} }
evt := &hook.Event{ evt := &hook.Event{
@ -153,7 +158,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Start agent // Start agent
// Non blocking call // Non blocking call
hookAgent.Attach(lcmCtl)
if err = hookAgent.Serve(); err != nil { if err = hookAgent.Serve(); err != nil {
return errors.Errorf("start hook agent error: %s", err) return errors.Errorf("start hook agent error: %s", err)
} }
@ -186,6 +190,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
terminated = true terminated = true
return return
case err = <-errChan: case err = <-errChan:
logger.Errorf("Received error from error chan: %s", err)
return return
} }
}(rootContext.ErrorChan) }(rootContext.ErrorChan)

View File

@ -54,6 +54,7 @@ type basicWorker struct {
context *env.Context context *env.Context
scheduler period.Scheduler scheduler period.Scheduler
ctl lcm.Controller ctl lcm.Controller
reaper *reaper
// key is name of known job // key is name of known job
// value is the type of known job // value is the type of known job
@ -92,6 +93,13 @@ func NewWorker(ctx *env.Context, namespace string, workerCount uint, redisPool *
ctl: ctl, ctl: ctl,
context: ctx, context: ctx,
knownJobs: new(sync.Map), knownJobs: new(sync.Map),
reaper: &reaper{
context: ctx.SystemContext,
namespace: namespace,
pool: redisPool,
lcmCtl: ctl,
jobTypes: make([]string, 0), // Append data later (at the start step)
},
} }
} }
@ -121,16 +129,7 @@ func (w *basicWorker) Start() error {
} }
// Start the periodic scheduler // Start the periodic scheduler
w.context.WG.Add(1) w.scheduler.Start()
go func() {
defer func() {
w.context.WG.Done()
}()
// Blocking call
if err := w.scheduler.Start(); err != nil {
w.context.ErrorChan <- err
}
}()
// Listen to the system signal // Listen to the system signal
w.context.WG.Add(1) w.context.WG.Add(1)
@ -139,10 +138,8 @@ func (w *basicWorker) Start() error {
w.context.WG.Done() w.context.WG.Done()
logger.Infof("Basic worker is stopped") logger.Infof("Basic worker is stopped")
}() }()
<-w.context.SystemContext.Done() <-w.context.SystemContext.Done()
if err := w.scheduler.Stop(); err != nil {
logger.Errorf("stop scheduler error: %s", err)
}
w.pool.Stop() w.pool.Stop()
}() }()
@ -151,7 +148,15 @@ func (w *basicWorker) Start() error {
w.pool.Middleware((*workerContext).logJob) w.pool.Middleware((*workerContext).logJob)
// Non blocking call // Non blocking call
w.pool.Start() w.pool.Start()
logger.Infof("Redis worker is started") logger.Infof("Basic worker is started")
// Start the reaper
w.knownJobs.Range(func(k interface{}, v interface{}) bool {
w.reaper.jobTypes = append(w.reaper.jobTypes, k.(string))
return true
})
w.reaper.start()
return nil return nil
} }

View File

@ -0,0 +1,363 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cworker
import (
"context"
"fmt"
"strconv"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/lcm"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
)
const (
maxUpdateDuration = 24 * time.Hour
reapLoopInterval = 1 * time.Hour
initialReapLoopInterval = 5 * time.Minute
)
// reaper is designed to reap the outdated job stats and web hook
type reaper struct {
context context.Context
namespace string
pool *redis.Pool
lcmCtl lcm.Controller
jobTypes []string
}
// start the reap process
// Non blocking call
func (r *reaper) start() {
// Start the interval job stats sync loop
go func() {
defer logger.Info("Reaper is stopped")
tm := time.NewTimer(initialReapLoopInterval)
defer tm.Stop()
logger.Info("Reaper is started")
for {
select {
case <-tm.C:
tm.Reset(reapLoopInterval)
if err := r.syncOutdatedStats(); err != nil {
// Just log
logger.Error(err)
}
case <-r.context.Done():
return // Terminated
}
}
}()
// Start re-enqueue in-progress jobs process.
// Only run once at the start point.
go func() {
// Wait for a short while and then start
<-time.After(5 * time.Second)
if err := r.reEnqueueInProgressJobs(); err != nil {
logger.Error(err)
}
}()
}
// reEnqueueInProgressJobs is an enhancement for reaper process of upstream project.
// Mainly fix the issue of failing to re-enqueue the jobs in the dead worker pool.
// This process only needs to be executed once when worker pool is starting.
func (r *reaper) reEnqueueInProgressJobs() error {
// Debug
logger.Debugf("Start: Reap in-progress jobs from the dead pools")
defer logger.Debugf("End: Reap in-progress jobs")
currentPools, err := r.getCurrentWorkerPools()
if err != nil {
return errors.Wrap(err, "re-enqueue in progress jobs")
}
h := func(k string, v int64) (err error) {
if v <= 0 {
// Do nothing
return nil
}
// If the worker pool is in the current pool list, ignore it as the default reap
// process will cover the re-enqueuing work.
if currentPools[k] {
// Do nothing
return nil
}
// Re-enqueue jobs
if err := r.requeueInProgressJobs(k, r.jobTypes); err != nil {
return errors.Wrap(err, "in-progress jobs reap handler")
}
return nil
}
for _, jt := range r.jobTypes {
lk := rds.KeyJobLockInfo(r.namespace, jt)
if err := r.scanLocks(lk, h); err != nil {
// Log error and continue
logger.Errorf("Re-enqueue in progress jobs error: %v", err)
continue
}
}
return nil
}
// syncOutdatedStats ensures the job status is correctly updated and
// the related status change hook events are successfully fired.
func (r *reaper) syncOutdatedStats() error {
// Debug
logger.Debugf("Start: reap outdated job stats")
defer logger.Debugf("End: reap outdated job stats")
// Loop all the in progress jobs to check if they're really in progress or
// status is hung.
h := func(k string, v int64) (err error) {
defer func() {
if errs.IsObjectNotFoundError(err) {
// As the job stats is lost and we don't have chance to restore it, then directly discard it.
logger.Errorf("Sync outdated stats error: %s", err.Error())
// Un-track the in-progress record
err = r.unTrackInProgress(k)
}
if err != nil {
err = errors.Wrap(err, "sync outdated stats handler")
}
}()
var t job.Tracker
t, err = r.lcmCtl.Track(k)
if err != nil {
return
}
// Compare and check if the status and the ACKed status are consistent
diff := compare(t.Job().Info)
if diff == 0 {
// Status and ACKed status are consistent
if job.Status(t.Job().Info.Status).Final() {
// Final status
// The inprogress track record is not valid as everything is done and consistent.
// It should not happen. However, if it is really happened, we can fix it here.
if err = r.unTrackInProgress(t.Job().Info.JobID); err != nil {
return
}
} else {
// Ongoing, check the update timestamp to make sure it is not hung
if time.Unix(t.Job().Info.UpdateTime, 0).Add(maxUpdateDuration).Before(time.Now()) {
// Status hung
// Mark job status to error state
if err = t.Fail(); err != nil {
return
}
// Exit
}
// Exit as it is still a valid ongoing job
}
} else if diff > 0 {
// The hook event of current job status is not ACKed
// Resend hook event by set the status again
if err = t.FireHook(); err != nil {
return
}
// Success and exit
} else {
// Current status is outdated, update it with ACKed status.
if err = t.UpdateStatusWithRetry(job.Status(t.Job().Info.HookAck.Status)); err != nil {
return
}
// Success and exit
}
return nil
}
if err := r.scanLocks(rds.KeyJobTrackInProgress(r.namespace), h); err != nil {
return errors.Wrap(err, "reaper")
}
return nil
}
// scanLocks gets the lock info from the specified key by redis scan way
func (r *reaper) scanLocks(key string, handler func(k string, v int64) error) error {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %v", err)
}
}()
var cursor int64
for {
reply, err := redis.Values(conn.Do("HSCAN", key, cursor, "MATCH", "*", "COUNT", 100))
if err != nil {
return errors.Wrap(err, "scan locks")
}
if len(reply) != 2 {
return errors.New("expect 2 elements returned")
}
// Get next cursor
cursor, err = strconv.ParseInt(string(reply[0].([]uint8)), 10, 16)
if err != nil {
return errors.Wrap(err, "scan locks")
}
if values, ok := reply[1].([]interface{}); ok {
for i := 0; i < len(values); i += 2 {
k := string(values[i].([]uint8))
lc, err := strconv.ParseInt(string(values[i+1].([]uint8)), 10, 16)
if err != nil {
// Ignore and continue
logger.Errorf("Malformed lock object for %s: %v", k, err)
continue
}
// Call handler to handle the data
if err := handler(k, lc); err != nil {
// Log and ignore the error
logger.Errorf("Failed to call reap handler: %v", err)
}
}
}
// Check if we have reached the end
if cursor == 0 {
return nil
}
}
}
// unTrackInProgress del the key in the progress track queue
func (r *reaper) unTrackInProgress(jobID string) error {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s: %v", "untrack in progress job", err)
}
}()
_, err := conn.Do("HDEL", rds.KeyJobTrackInProgress(r.namespace), jobID)
if err != nil {
return errors.Wrap(err, "untrack in progress record")
}
return nil
}
// getCurrentWorkerPools returns the IDs of the current worker pools
func (r *reaper) getCurrentWorkerPools() (map[string]bool, error) {
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s : %v", "get current worker pools", err)
}
}()
// Get the current worker pools
workerPoolIDs, err := redis.Strings(conn.Do("SMEMBERS", rds.KeyWorkerPools(r.namespace)))
if err != nil {
return nil, errors.Wrap(err, "get current workpools")
}
m := make(map[string]bool)
for _, id := range workerPoolIDs {
m[id] = true
}
return m, nil
}
func (r *reaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
numKeys := len(jobTypes)
redisRequeueScript := rds.RedisLuaReenqueueScript(numKeys)
var scriptArgs = make([]interface{}, 0, numKeys+1)
for _, jobType := range jobTypes {
// pops from in progress, push into job queue and decrement the queue lock
scriptArgs = append(
scriptArgs,
rds.KeyInProgressQueue(r.namespace, jobType, poolID),
rds.KeyJobs(r.namespace, jobType),
rds.KeyJobLock(r.namespace, jobType),
rds.KeyJobLockInfo(r.namespace, jobType),
) // KEYS[1-4 * N]
}
scriptArgs = append(scriptArgs, poolID) // ARGV[1]
conn := r.pool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("Failed to close redis connection: %s : %s", "re enqueue in-progress jobs", err)
}
}()
// Keep moving jobs until all queues are empty
for {
values, err := redis.Values(redisRequeueScript.Do(conn, scriptArgs...))
if err == redis.ErrNil {
return nil
} else if err != nil {
return err
}
if len(values) != 3 {
return fmt.Errorf("need 3 elements back")
}
}
}
// compare the status and the status in the ack
// 0: status == ack.status
// >0: status > ack.status
// <0: status < ack.status
//
// compare based on:
// revision:status_code:check_in
func compare(j *job.StatsInfo) int {
// No ack existing
if j.HookAck == nil {
return 1
}
// Compare revision
rev := j.Revision - j.HookAck.Revision
if rev != 0 {
return (int)(rev)
}
// Revision is same, then compare the status
st := job.Status(j.Status).Compare(job.Status(j.HookAck.Status))
if st != 0 {
return st
}
// Revision and status are same, then compare the checkin
return (int)(j.CheckInAt - j.HookAck.CheckInAt)
}

View File

@ -0,0 +1,243 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cworker
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/stretchr/testify/mock"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/suite"
)
// ReaperTestSuite is used to test reaper
type ReaperTestSuite struct {
suite.Suite
namespace string
pool *redis.Pool
r *reaper
ctl *mockLcmCtl
jid string
}
// TestReaper is the entry func of the ReaperTestSuite
func TestReaper(t *testing.T) {
suite.Run(t, &ReaperTestSuite{})
}
// SetupSuite prepares the test suite environment
func (suite *ReaperTestSuite) SetupSuite() {
suite.namespace = tests.GiveMeTestNamespace()
suite.pool = tests.GiveMeRedisPool()
ctx := context.TODO()
suite.ctl = &mockLcmCtl{}
suite.r = &reaper{
context: ctx,
namespace: suite.namespace,
pool: suite.pool,
jobTypes: []string{job.SampleJob},
lcmCtl: suite.ctl,
}
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
}()
// Mock data in the redis DB
cwp := utils.MakeIdentifier()
wpk := fmt.Sprintf("%s%s", rds.KeyNamespacePrefix(suite.namespace), "worker_pools")
_ = conn.Send("SADD", wpk, cwp)
_ = conn.Send("HSET", fmt.Sprintf("%s:%s", wpk, cwp), "heartbeat_at", time.Now().Unix())
err := conn.Flush()
suite.NoError(err, "mock current pool error")
// Mock lock info of job DEMO
lk := rds.KeyJobLock(suite.namespace, job.SampleJob)
_, err = conn.Do("INCR", lk)
suite.NoError(err, "set lock data error")
wp := utils.MakeIdentifier()
lik := rds.KeyJobLockInfo(suite.namespace, job.SampleJob)
_, err = conn.Do("HINCRBY", lik, wp, 1)
suite.NoError(err, "set lock_info data error")
// Mock in-progress job
ipk := rds.KeyInProgressQueue(suite.namespace, job.SampleJob, wp)
j, err := mockJobData()
suite.NoError(err, "mock job")
_, err = conn.Do("LPUSH", ipk, j)
suite.NoError(err, "push mock job to queue")
// Mock job stats
suite.jid = utils.MakeIdentifier()
err = mockJobStats(conn, suite.namespace, suite.jid)
suite.NoError(err, "mock job stats")
// Mock in-progress job track
tk := rds.KeyJobTrackInProgress(suite.namespace)
_, err = conn.Do("HSET", tk, suite.jid, 1)
suite.NoError(err, "mock in-progress track")
}
// TearDownSuite clears down the test suite environment
func (suite *ReaperTestSuite) TearDownSuite() {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
}()
_ = tests.ClearAll(suite.namespace, conn)
}
func (suite *ReaperTestSuite) TestRequeueInProgressJobs() {
err := suite.r.reEnqueueInProgressJobs()
suite.NoError(err, "requeue in-progress jobs")
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
}()
v, err := redis.Int(conn.Do("GET", rds.KeyJobLock(suite.namespace, job.SampleJob)))
suite.NoError(err, "get job lock info")
suite.Equal(0, v, "lock should be 0")
}
func (suite *ReaperTestSuite) TestSyncOutdatedStats() {
// Use real track to test
mt := job.NewBasicTrackerWithID(
context.TODO(),
suite.jid,
suite.namespace,
suite.pool,
func(hookURL string, change *job.StatusChange) error {
return nil
},
list.New())
err := mt.Load()
suite.NoError(err, "track job stats")
suite.ctl.On("Track", suite.jid).Return(mt, nil)
err = suite.r.syncOutdatedStats()
suite.NoError(err, "sync outdated stats")
// Check result
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection error")
}()
status, err := redis.String(conn.Do("HGET", rds.KeyJobStats(suite.namespace, suite.jid), "status"))
suite.NoError(err, "get status")
suite.Equal(job.SuccessStatus.String(), status, "check status")
}
func mockJobData() (string, error) {
j := make(map[string]interface{})
j["name"] = job.SampleJob
j["id"] = utils.MakeIdentifier()
j["t"] = time.Now().Unix()
args := make(map[string]interface{})
j["args"] = args
args["image"] = "test suite"
b, err := json.Marshal(&j)
if err != nil {
return "", nil
}
return string(b), nil
}
func mockJobStats(conn redis.Conn, ns string, jid string) error {
rev := time.Now().Unix()
sk := rds.KeyJobStats(ns, jid)
ack := &job.ACK{
Revision: rev,
Status: job.SuccessStatus.String(),
}
b, err := json.Marshal(ack)
if err != nil {
return err
}
args := []interface{}{
sk,
"id", jid,
"status", job.RunningStatus.String(),
"name", job.SampleJob,
"kind", job.KindGeneric,
"unique", 0,
"ref_link", fmt.Sprintf("/api/v1/jobs/%s", jid),
"enqueue_time", time.Now().Unix(),
"update_time", time.Now().Unix(),
"revision", rev,
"ack", string(b),
}
_, err = conn.Do("HMSET", args...)
return err
}
type mockLcmCtl struct {
mock.Mock
}
func (m *mockLcmCtl) Serve() error {
return nil
}
// New tracker from the new provided stats
func (m *mockLcmCtl) New(stats *job.Stats) (job.Tracker, error) {
args := m.Called(stats)
if args.Get(0) != nil {
return args.Get(0).(job.Tracker), nil
}
return nil, args.Error(1)
}
// Track the life cycle of the specified existing job
func (m *mockLcmCtl) Track(jobID string) (job.Tracker, error) {
args := m.Called(jobID)
if args.Get(0) != nil {
return args.Get(0).(job.Tracker), nil
}
return nil, args.Error(1)
}