refine the hook event and status update retry approach

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2019-04-20 15:00:32 +08:00
parent c6a509b3dd
commit 6f8a80d21c
7 changed files with 144 additions and 119 deletions

View File

@ -81,10 +81,10 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
// KeyHookEventRetryQueue returns the key of hook event retrying queue // KeyHookEventRetryQueue returns the key of hook event retrying queue
func KeyHookEventRetryQueue(namespace string) string { func KeyHookEventRetryQueue(namespace string) string {
return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "hook_events") return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "hook_events")
} }
// KeyStatusUpdateRetryQueue returns the key of status change retrying queue // KeyStatusUpdateRetryQueue returns the key of status change retrying queue
func KeyStatusUpdateRetryQueue(namespace string) string { func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "status_change_events") return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
} }

View File

@ -8,6 +8,10 @@ import (
"time" "time"
) )
// NoElementsError is a predefined sentinel error describing the case where no
// elements were returned from the backend database (for example, ZPopMin
// returns it when the reply contains no elements). Callers compare against it
// directly to tell "nothing to do" apart from real failures.
var NoElementsError = errors.New("no elements got from the backend")
// HmSet sets the properties of hash map // HmSet sets the properties of hash map
func HmSet(conn redis.Conn, key string, fieldAndValues ...interface{}) error { func HmSet(conn redis.Conn, key string, fieldAndValues ...interface{}) error {
if conn == nil { if conn == nil {
@ -141,7 +145,7 @@ func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
if zrangeReply != nil { if zrangeReply != nil {
if elements, ok := zrangeReply.([]interface{}); ok { if elements, ok := zrangeReply.([]interface{}); ok {
if len(elements) == 0 { if len(elements) == 0 {
return nil, redis.ErrNil return nil, NoElementsError
} else { } else {
return elements[0], nil return elements[0], nil
} }

View File

@ -17,7 +17,7 @@ package hook
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "github.com/pkg/errors"
"math/rand" "math/rand"
"net/url" "net/url"
"time" "time"
@ -26,9 +26,9 @@ import (
"github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/lcm"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"math"
"sync" "sync"
) )
@ -40,10 +40,10 @@ const (
// The max time for expiring the retrying events // The max time for expiring the retrying events
// 180 days // 180 days
maxEventExpireTime = 3600 * 24 * 180 maxEventExpireTime = 3600 * 24 * 180
// Interval for retrying loop // Waiting a short while if any errors occurred
retryInterval = 2 * time.Minute shortLoopInterval = 5 * time.Second
// Number for splitting the event list to sub set for popping out // Waiting for long while if no retrying elements found
defaultShardNum = 3 longLoopInterval = 5 * time.Minute
) )
// Agent is designed to handle the hook events with reasonable numbers of concurrent threads // Agent is designed to handle the hook events with reasonable numbers of concurrent threads
@ -51,7 +51,9 @@ type Agent interface {
// Trigger hooks // Trigger hooks
Trigger(evt *Event) error Trigger(evt *Event) error
// Serves events now // Serves events now
Serve() Serve() error
// Attach a job life cycle controller
Attach(ctl lcm.Controller)
} }
// Event contains the hook URL and the data // Event contains the hook URL and the data
@ -91,6 +93,7 @@ type basicAgent struct {
context context.Context context context.Context
namespace string namespace string
client Client client Client
ctl lcm.Controller
events chan *Event events chan *Event
tokens chan bool tokens chan bool
redisPool *redis.Pool redisPool *redis.Pool
@ -115,6 +118,11 @@ func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent {
} }
} }
// Attach implements the same method of interface @Agent.
// It stores the given job life cycle controller on the agent. It must be
// called before Serve, which returns an error when no controller is attached.
func (ba *basicAgent) Attach(ctl lcm.Controller) {
ba.ctl = ctl
}
// Trigger implements the same method of interface @Agent // Trigger implements the same method of interface @Agent
func (ba *basicAgent) Trigger(evt *Event) error { func (ba *basicAgent) Trigger(evt *Event) error {
if evt == nil { if evt == nil {
@ -133,12 +141,17 @@ func (ba *basicAgent) Trigger(evt *Event) error {
// Start the basic agent // Start the basic agent
// Termination depends on the system context // Termination depends on the system context
// Blocking call // Blocking call
func (ba *basicAgent) Serve() { func (ba *basicAgent) Serve() error {
go ba.looplyRetry() if ba.ctl == nil {
return errors.New("nil life cycle controller of hook agent")
}
go ba.loopRetry()
logger.Info("Hook event retrying loop is started") logger.Info("Hook event retrying loop is started")
go ba.serve() go ba.serve()
logger.Info("Basic hook agent is started") logger.Info("Basic hook agent is started")
return nil
} }
func (ba *basicAgent) serve() { func (ba *basicAgent) serve() {
@ -167,15 +180,14 @@ func (ba *basicAgent) serve() {
// to the event channel of this node with reasonable backoff time. // to the event channel of this node with reasonable backoff time.
logger.Errorf("Failed to push hook event to the retry queue: %s", err) logger.Errorf("Failed to push hook event to the retry queue: %s", err)
// Put to the event chan now // Put to the event chan after waiting for a reasonable while,
// In a separate goroutine to avoid occupying the token long time // waiting is important, it can avoid sending large scale failure expecting
go func() { // requests in a short while.
// As 'pushForRetry' has checked the timestamp and expired event // As 'pushForRetry' has checked the timestamp and expired event
// will be directly discarded and nil error is returned, no need to // will be directly discarded and nil error is returned, no need to
// check it again here. // check it again here.
<-time.After(time.Duration((rand.Int31n(60) + 5)) * time.Second) <-time.After(time.Duration(rand.Int31n(55)+5) * time.Second)
ba.events <- evt ba.events <- evt
}()
} }
} }
}(evt) }(evt)
@ -201,7 +213,7 @@ func (ba *basicAgent) pushForRetry(evt *Event) error {
now := time.Now().Unix() now := time.Now().Unix()
if evt.Timestamp > 0 && now-evt.Timestamp >= maxEventExpireTime { if evt.Timestamp > 0 && now-evt.Timestamp >= maxEventExpireTime {
// Expired, do not need to push back to the retry queue // Expired, do not need to push back to the retry queue
logger.Warningf("Event is expired: %s\n", rawJSON) logger.Warningf("Event is expired: %s", rawJSON)
return nil return nil
} }
@ -224,7 +236,7 @@ func (ba *basicAgent) pushForRetry(evt *Event) error {
return nil return nil
} }
func (ba *basicAgent) looplyRetry() { func (ba *basicAgent) loopRetry() {
defer func() { defer func() {
logger.Info("Hook event retrying loop exit") logger.Info("Hook event retrying loop exit")
ba.wg.Done() ba.wg.Done()
@ -232,93 +244,81 @@ func (ba *basicAgent) looplyRetry() {
ba.wg.Add(1) ba.wg.Add(1)
// Append random seconds to avoid working in the same time slot token := make(chan bool, 1)
tk := time.NewTicker(retryInterval + time.Duration(rand.Int31n(13)+3)*time.Second) token <- true
defer tk.Stop()
for { for {
select { <-token
case <-tk.C: if err := ba.reSend(); err != nil {
if err := ba.popMinOnes(); err != nil { waitInterval := shortLoopInterval
logger.Errorf("Retrying to send hook events failed with error: %s", err.Error()) if err == rds.NoElementsError {
// No elements
waitInterval = longLoopInterval
} else {
logger.Errorf("Resend hook event error: %s", err.Error())
} }
select {
case <-time.After(waitInterval):
// Just wait, do nothing
case <-ba.context.Done(): case <-ba.context.Done():
// Terminated
return return
} }
} }
// Put token back
token <- true
}
} }
func (ba *basicAgent) popMinOnes() error { func (ba *basicAgent) reSend() error {
evt, err := ba.popMinOne()
if err != nil {
return err
}
jobID, status, err := extractJobID(evt.Data)
if err != nil {
return err
}
t, err := ba.ctl.Track(jobID)
if err != nil {
return err
}
diff := status.Compare(job.Status(t.Job().Info.Status))
if diff > 0 ||
(diff == 0 && t.Job().Info.CheckIn != evt.Data.CheckIn) {
ba.events <- evt
return nil
}
return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status)
}
func (ba *basicAgent) popMinOne() (*Event, error) {
conn := ba.redisPool.Get() conn := ba.redisPool.Get()
defer conn.Close() defer conn.Close()
key := rds.KeyHookEventRetryQueue(ba.namespace) key := rds.KeyHookEventRetryQueue(ba.namespace)
// Get total events minOne, err := rds.ZPopMin(conn, key)
total, err := redis.Int(conn.Do("ZCARD", key))
if err != nil { if err != nil {
return err return nil, err
} }
// Get sharding ones rawEvent, ok := minOne.([]byte)
poppedNum := math.Ceil(float64(total) / float64(defaultShardNum)) if !ok {
rawContent, err := redis.Values(conn.Do("ZPOPMIN", key, poppedNum)) return nil, errors.New("bad request: non bytes slice for raw event")
if err != nil {
return err
} }
for i, l := 0, len(rawContent); i < l; i = i + 2 {
rawEvent := rawContent[i].([]byte)
evt := &Event{} evt := &Event{}
if err := evt.Deserialize(rawEvent); err != nil { if err := evt.Deserialize(rawEvent); err != nil {
// Partially failed return nil, err
logger.Warningf("Invalid event data when retrying to send hook event: %s", err.Error())
continue
} }
// Compare with current job status if it is still valid hook events return evt, nil
// If it is already out of date, then directly discard it
// If it is still valid, then retry to send it
// Get the current status of job
jobID, status, err := extractJobID(evt.Data)
if err != nil {
logger.Warning(err.Error())
continue
}
latestStatus, err := ba.getJobStatus(jobID)
if err != nil {
logger.Warning(err.Error())
continue
}
if status.Compare(latestStatus) < 0 {
// Already out of date
logger.Debugf("Abandon out dated status update retrying action: %s", evt.Message)
continue
}
// Put to the event chan for sending with a separate goroutine to avoid long time
// waiting
go func(evt *Event) {
ba.events <- evt
}(evt)
}
return nil
}
func (ba *basicAgent) getJobStatus(jobID string) (job.Status, error) {
conn := ba.redisPool.Get()
defer conn.Close()
key := rds.KeyJobStats(ba.namespace, jobID)
status, err := redis.String(conn.Do("HGET", key, "status"))
if err != nil {
return job.PendingStatus, err
}
return job.Status(status), nil
} }
// Extract the job ID and status from the event data field // Extract the job ID and status from the event data field
@ -333,5 +333,5 @@ func extractJobID(data *job.StatusChange) (string, job.Status, error) {
} }
} }
return "", "", errors.New("invalid job status change data to extract job ID") return "", "", errors.New("malform job status change data")
} }

View File

@ -68,10 +68,12 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
} }
ctx.Checkin("progress data: %30") ctx.Checkin("progress data: %30")
<-time.After(1 * time.Second)
ctx.Checkin("progress data: %60")
// HOLD ON FOR A WHILE // HOLD ON FOR A WHILE
logger.Warning("Holding for 30 seconds") logger.Warning("Holding for 10 seconds")
<-time.After(30 * time.Second) <-time.After(10 * time.Second)
if cmd, ok := ctx.OPCommand(); ok { if cmd, ok := ctx.OPCommand(); ok {
if cmd == job.StopCommand { if cmd == job.StopCommand {

View File

@ -28,7 +28,9 @@ import (
) )
const ( const (
// Waiting a short while if any errors occurred
shortLoopInterval = 5 * time.Second shortLoopInterval = 5 * time.Second
// Waiting for a long while if no retrying elements are found
longLoopInterval = 5 * time.Minute longLoopInterval = 5 * time.Minute
) )
@ -115,14 +117,17 @@ func (bc *basicController) loopForRestoreDeadStatus() {
<-token <-token
if err := bc.restoreDeadStatus(); err != nil { if err := bc.restoreDeadStatus(); err != nil {
wait := shortLoopInterval waitInterval := shortLoopInterval
if err == redis.ErrNil { if err == rds.NoElementsError {
// No elements // No elements
wait = longLoopInterval waitInterval = longLoopInterval
} else {
logger.Errorf("restore dead status error: %s, put it back to the retrying Q later again", err)
} }
// wait for a while or be terminated // wait for a while or be terminated
select { select {
case <-time.After(wait): case <-time.After(waitInterval):
case <-bc.context.Done(): case <-bc.context.Done():
return return
} }

View File

@ -71,5 +71,7 @@ func main() {
})*/ })*/
// Start // Start
runtime.JobService.LoadAndRun(ctx, cancel) if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
logger.Fatal(err)
}
} }

View File

@ -39,6 +39,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/worker" "github.com/goharbor/harbor/src/jobservice/worker"
"github.com/goharbor/harbor/src/jobservice/worker/cworker" "github.com/goharbor/harbor/src/jobservice/worker/cworker"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
) )
const ( const (
@ -65,7 +66,7 @@ func (bs *Bootstrap) SetJobContextInitializer(initializer job.ContextInitializer
// LoadAndRun will load configurations, initialize components and then start the related process to serve requests. // LoadAndRun will load configurations, initialize components and then start the related process to serve requests.
// Return error if meet any problems. // Return error if meet any problems.
func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) { func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) (err error) {
rootContext := &env.Context{ rootContext := &env.Context{
SystemContext: ctx, SystemContext: ctx,
WG: &sync.WaitGroup{}, WG: &sync.WaitGroup{},
@ -74,10 +75,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Build specified job context // Build specified job context
if bs.jobConextInitializer != nil { if bs.jobConextInitializer != nil {
if jobCtx, err := bs.jobConextInitializer(ctx); err == nil { rootContext.JobContext, err = bs.jobConextInitializer(ctx)
rootContext.JobContext = jobCtx if err != nil {
} else { return errors.Errorf("initialize job context error: %s", err)
logger.Fatalf("Failed to initialize job context: %s\n", err)
} }
} }
@ -87,7 +87,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
var ( var (
backendWorker worker.Interface backendWorker worker.Interface
lcmCtl lcm.Controller lcmCtl lcm.Controller
wErr error
) )
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis { if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
// Number of workers // Number of workers
@ -118,20 +117,31 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback) lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
// Start the backend worker // Start the backend worker
backendWorker, wErr = bs.loadAndRunRedisWorkerPool(rootContext, namespace, workerNum, redisPool, lcmCtl) backendWorker, err = bs.loadAndRunRedisWorkerPool(
if wErr != nil { rootContext,
logger.Fatalf("Failed to load and run worker: %s\n", wErr.Error()) namespace,
workerNum,
redisPool,
lcmCtl,
)
if err != nil {
return errors.Errorf("load and run worker error: %s", err)
} }
// Run daemon process of life cycle controller // Run daemon process of life cycle controller
// Ignore returned error // Ignore returned error
lcmCtl.Serve() if err = lcmCtl.Serve(); err != nil {
return errors.Errorf("start life cycle controller error: %s", err)
}
// Start agent // Start agent
// Non blocking call // Non blocking call
hookAgent.Serve() hookAgent.Attach(lcmCtl)
if err = hookAgent.Serve(); err != nil {
return errors.Errorf("start hook agent error: %s", err)
}
} else { } else {
logger.Fatalf("Worker worker backend '%s' is not supported", cfg.PoolConfig.Backend) return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend)
} }
// Initialize controller // Initialize controller
@ -146,8 +156,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
go func(errChan chan error) { go func(errChan chan error) {
defer func() { defer func() {
// Gracefully shutdown // Gracefully shutdown
if err := apiServer.Stop(); err != nil { // Error happened here should not override the outside error
logger.Error(err) if er := apiServer.Stop(); er != nil {
logger.Error(er)
} }
// Notify others who're listening to the system context // Notify others who're listening to the system context
cancel() cancel()
@ -157,8 +168,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
case <-sig: case <-sig:
terminated = true terminated = true
return return
case err := <-errChan: case err = <-errChan:
logger.Errorf("error received from error chan: %s", err)
return return
} }
}(rootContext.ErrorChan) }(rootContext.ErrorChan)
@ -166,10 +176,10 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
node := ctx.Value(utils.NodeID) node := ctx.Value(utils.NodeID)
// Blocking here // Blocking here
logger.Infof("API server is serving at %d with [%s] mode at node [%s]", cfg.Port, cfg.Protocol, node) logger.Infof("API server is serving at %d with [%s] mode at node [%s]", cfg.Port, cfg.Protocol, node)
if err := apiServer.Start(); err != nil { if er := apiServer.Start(); er != nil {
if !terminated { if !terminated {
// Tell the listening goroutine // Tell the listening goroutine
rootContext.ErrorChan <- err rootContext.ErrorChan <- er
} }
} else { } else {
// In case // In case
@ -178,6 +188,8 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Wait everyone exit // Wait everyone exit
rootContext.WG.Wait() rootContext.WG.Wait()
return
} }
// Load and run the API server. // Load and run the API server.