diff --git a/src/jobservice/common/rds/keys.go b/src/jobservice/common/rds/keys.go
index dbb73c547..530a6a34b 100644
--- a/src/jobservice/common/rds/keys.go
+++ b/src/jobservice/common/rds/keys.go
@@ -81,10 +81,10 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
 
 // KeyHookEventRetryQueue returns the key of hook event retrying queue
 func KeyHookEventRetryQueue(namespace string) string {
-	return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "hook_events")
+	return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "hook_events")
 }
 
 // KeyStatusUpdateRetryQueue returns the key of status change retrying queue
 func KeyStatusUpdateRetryQueue(namespace string) string {
-	return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "status_change_events")
+	return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")
 }
diff --git a/src/jobservice/common/rds/utils.go b/src/jobservice/common/rds/utils.go
index 7157986e8..5e93eed44 100644
--- a/src/jobservice/common/rds/utils.go
+++ b/src/jobservice/common/rds/utils.go
@@ -8,6 +8,10 @@ import (
 	"time"
 )
 
+// NoElementsError is a predefined error describing the case where no elements are
+// retrieved from the backend database.
+var NoElementsError = errors.New("no elements got from the backend")
+
 // HmSet sets the properties of hash map
 func HmSet(conn redis.Conn, key string, fieldAndValues ...interface{}) error {
 	if conn == nil {
@@ -141,7 +145,7 @@ func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
 	if zrangeReply != nil {
 		if elements, ok := zrangeReply.([]interface{}); ok {
 			if len(elements) == 0 {
-				return nil, redis.ErrNil
+				return nil, NoElementsError
 			} else {
 				return elements[0], nil
 			}
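Note (not part of the patch): the key changes above drop the extra colon, presumably because KeyNamespacePrefix (not shown here) already terminates the prefix with ":", so the old format produced a doubled separator. With ZPopMin now returning the package-level NoElementsError instead of redis.ErrNil, callers can tell an empty retry queue apart from a real Redis failure. A minimal sketch of a caller under that contract; the helper name and the pool/namespace variables are illustrative only:

package main

import (
	"fmt"

	"github.com/goharbor/harbor/src/jobservice/common/rds"
	"github.com/gomodule/redigo/redis"
)

// drainRetryQueue is a hypothetical helper (not in this patch) that pops queued
// hook events until the backing sorted set is empty.
func drainRetryQueue(pool *redis.Pool, namespace string) error {
	conn := pool.Get()
	defer conn.Close()

	key := rds.KeyHookEventRetryQueue(namespace)
	for {
		raw, err := rds.ZPopMin(conn, key)
		if err != nil {
			if err == rds.NoElementsError {
				// The queue is drained; this is not a failure.
				return nil
			}
			return err
		}
		// Each popped element is expected to be the serialized event payload.
		fmt.Printf("popped raw event: %v\n", raw)
	}
}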
diff --git a/src/jobservice/hook/hook_agent.go b/src/jobservice/hook/hook_agent.go
index 5fb591773..11a4c6cac 100644
--- a/src/jobservice/hook/hook_agent.go
+++ b/src/jobservice/hook/hook_agent.go
@@ -17,7 +17,7 @@ package hook
 import (
 	"context"
 	"encoding/json"
-	"errors"
+	"github.com/pkg/errors"
 	"math/rand"
 	"net/url"
 	"time"
@@ -26,9 +26,9 @@ import (
 
 	"github.com/goharbor/harbor/src/jobservice/common/rds"
 	"github.com/goharbor/harbor/src/jobservice/env"
+	"github.com/goharbor/harbor/src/jobservice/lcm"
 	"github.com/goharbor/harbor/src/jobservice/logger"
 	"github.com/gomodule/redigo/redis"
-	"math"
 	"sync"
 )
 
@@ -40,10 +40,10 @@ const (
 	// The max time for expiring the retrying events
 	// 180 days
 	maxEventExpireTime = 3600 * 24 * 180
-	// Interval for retrying loop
-	retryInterval = 2 * time.Minute
-	// Number for splitting the event list to sub set for popping out
-	defaultShardNum = 3
+	// Wait a short while if any errors occurred
+	shortLoopInterval = 5 * time.Second
+	// Wait for a long while if no retrying elements are found
+	longLoopInterval = 5 * time.Minute
 )
 
 // Agent is designed to handle the hook events with reasonable numbers of concurrent threads
@@ -51,7 +51,9 @@ type Agent interface {
 	// Trigger hooks
 	Trigger(evt *Event) error
 	// Serves events now
-	Serve()
+	Serve() error
+	// Attach a job life cycle controller
+	Attach(ctl lcm.Controller)
 }
 
 // Event contains the hook URL and the data
@@ -91,6 +93,7 @@ type basicAgent struct {
 	context   context.Context
 	namespace string
 	client    Client
+	ctl       lcm.Controller
 	events    chan *Event
 	tokens    chan bool
 	redisPool *redis.Pool
@@ -115,6 +118,11 @@ func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent {
 	}
 }
 
+// Attach a job life cycle controller
+func (ba *basicAgent) Attach(ctl lcm.Controller) {
+	ba.ctl = ctl
+}
+
 // Trigger implements the same method of interface @Agent
 func (ba *basicAgent) Trigger(evt *Event) error {
 	if evt == nil {
@@ -133,12 +141,17 @@ func (ba *basicAgent) Trigger(evt *Event) error {
 // Start the basic agent
 // Termination depends on the system context
 // Blocking call
-func (ba *basicAgent) Serve() {
-	go ba.looplyRetry()
+func (ba *basicAgent) Serve() error {
+	if ba.ctl == nil {
+		return errors.New("nil life cycle controller of hook agent")
+	}
+
+	go ba.loopRetry()
 	logger.Info("Hook event retrying loop is started")
 
 	go ba.serve()
 	logger.Info("Basic hook agent is started")
+	return nil
 }
 
 func (ba *basicAgent) serve() {
@@ -167,15 +180,14 @@ func (ba *basicAgent) serve() {
 				// to the event channel of this node with reasonable backoff time.
 				logger.Errorf("Failed to push hook event to the retry queue: %s", err)
 
-				// Put to the event chan now
-				// In a separate goroutine to avoid occupying the token long time
-				go func() {
-					// As 'pushForRetry' has checked the timestamp and expired event
-					// will be directly discarded and nil error is returned, no need to
-					// check it again here.
-					<-time.After(time.Duration((rand.Int31n(60) + 5)) * time.Second)
-					ba.events <- evt
-				}()
+				// Put to the event chan after waiting for a reasonable while.
+				// The waiting is important: it avoids flooding the system with
+				// requests that are likely to fail again within a short window.
+				// As 'pushForRetry' has checked the timestamp and expired event
+				// will be directly discarded and nil error is returned, no need to
+				// check it again here.
+				<-time.After(time.Duration(rand.Int31n(55)+5) * time.Second)
+				ba.events <- evt
 			}
 		}
 	}(evt)
@@ -201,7 +213,7 @@ func (ba *basicAgent) pushForRetry(evt *Event) error {
 	now := time.Now().Unix()
 	if evt.Timestamp > 0 && now-evt.Timestamp >= maxEventExpireTime {
 		// Expired, do not need to push back to the retry queue
-		logger.Warningf("Event is expired: %s\n", rawJSON)
+		logger.Warningf("Event is expired: %s", rawJSON)
 
 		return nil
 	}
@@ -224,7 +236,7 @@ func (ba *basicAgent) pushForRetry(evt *Event) error {
 	return nil
 }
 
-func (ba *basicAgent) looplyRetry() {
+func (ba *basicAgent) loopRetry() {
 	defer func() {
 		logger.Info("Hook event retrying loop exit")
 		ba.wg.Done()
@@ -232,93 +244,81 @@ func (ba *basicAgent) looplyRetry() {
 
 	ba.wg.Add(1)
 
-	// Append random seconds to avoid working in the same time slot
-	tk := time.NewTicker(retryInterval + time.Duration(rand.Int31n(13)+3)*time.Second)
-	defer tk.Stop()
+	token := make(chan bool, 1)
+	token <- true
 
 	for {
-		select {
-		case <-tk.C:
-			if err := ba.popMinOnes(); err != nil {
-				logger.Errorf("Retrying to send hook events failed with error: %s", err.Error())
+		<-token
+		if err := ba.reSend(); err != nil {
+			waitInterval := shortLoopInterval
+			if err == rds.NoElementsError {
+				// No elements
+				waitInterval = longLoopInterval
+			} else {
+				logger.Errorf("Resend hook event error: %s", err.Error())
+			}
+
+			select {
+			case <-time.After(waitInterval):
+				// Just wait, do nothing
+			case <-ba.context.Done():
+				// Terminated
+				return
 			}
-		case <-ba.context.Done():
-			return
 		}
+
+		// Put token back
+		token <- true
 	}
 }
 
-func (ba *basicAgent) popMinOnes() error {
+func (ba *basicAgent) reSend() error {
+	evt, err := ba.popMinOne()
+	if err != nil {
+		return err
+	}
+
+	jobID, status, err := extractJobID(evt.Data)
+	if err != nil {
+		return err
+	}
+
+	t, err := ba.ctl.Track(jobID)
+	if err != nil {
+		return err
+	}
+
+	diff := status.Compare(job.Status(t.Job().Info.Status))
+	if diff > 0 ||
+		(diff == 0 && t.Job().Info.CheckIn != evt.Data.CheckIn) {
+		ba.events <- evt
+		return nil
+	}
+
+	return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status)
+}
+
+func (ba *basicAgent) popMinOne() (*Event, error) {
 	conn := ba.redisPool.Get()
 	defer conn.Close()
 
 	key := rds.KeyHookEventRetryQueue(ba.namespace)
-	// Get total events
-	total, err := redis.Int(conn.Do("ZCARD", key))
+	minOne, err := rds.ZPopMin(conn, key)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	// Get sharding ones
-	poppedNum := math.Ceil(float64(total) / float64(defaultShardNum))
-	rawContent, err := redis.Values(conn.Do("ZPOPMIN", key, poppedNum))
-	if err != nil {
-		return err
+	rawEvent, ok := minOne.([]byte)
+	if !ok {
+		return nil, errors.New("invalid raw event: expected a bytes slice")
 	}
 
-	for i, l := 0, len(rawContent); i < l; i = i + 2 {
-		rawEvent := rawContent[i].([]byte)
-		evt := &Event{}
-
-		if err := evt.Deserialize(rawEvent); err != nil {
-			// Partially failed
-			logger.Warningf("Invalid event data when retrying to send hook event: %s", err.Error())
-			continue
-		}
-
-		// Compare with current job status if it is still valid hook events
-		// If it is already out of date, then directly discard it
-		// If it is still valid, then retry to send it
-		// Get the current status of job
-		jobID, status, err := extractJobID(evt.Data)
-		if err != nil {
-			logger.Warning(err.Error())
-			continue
-		}
-
-		latestStatus, err := ba.getJobStatus(jobID)
-		if err != nil {
-			logger.Warning(err.Error())
-			continue
-		}
-
-		if status.Compare(latestStatus) < 0 {
-			// Already out of date
-			logger.Debugf("Abandon out dated status update retrying action: %s", evt.Message)
-			continue
-		}
-
-		// Put to the event chan for sending with a separate goroutine to avoid long time
-		// waiting
-		go func(evt *Event) {
-			ba.events <- evt
-		}(evt)
+	evt := &Event{}
+	if err := evt.Deserialize(rawEvent); err != nil {
+		return nil, err
 	}
 
-	return nil
-}
-
-func (ba *basicAgent) getJobStatus(jobID string) (job.Status, error) {
-	conn := ba.redisPool.Get()
-	defer conn.Close()
-
-	key := rds.KeyJobStats(ba.namespace, jobID)
-	status, err := redis.String(conn.Do("HGET", key, "status"))
-	if err != nil {
-		return job.PendingStatus, err
-	}
-
-	return job.Status(status), nil
+	return evt, nil
 }
 
 // Extract the job ID and status from the event data field
@@ -333,5 +333,5 @@ func extractJobID(data *job.StatusChange) (string, job.Status, error) {
 		}
 	}
 
-	return "", "", errors.New("invalid job status change data to extract job ID")
+	return "", "", errors.New("malformed job status change data")
 }
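Note (not part of the patch): the rewrite above replaces the ticker-driven, batched ZPOPMIN retry with a single-token loop that pops and revalidates one event at a time through the attached lcm.Controller; a transient error backs off for shortLoopInterval, and an empty queue backs off for longLoopInterval. A condensed, self-contained sketch of that loop shape, where the resend function and intervals are stand-ins rather than the patch code:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoElements stands in for rds.NoElementsError in this sketch.
var errNoElements = errors.New("no elements")

// retryLoop mirrors the control flow of loopRetry: a one-slot token channel
// guarantees a single in-flight resend, and the wait between attempts depends
// on why the previous attempt did not succeed.
func retryLoop(ctx context.Context, resend func() error) {
	token := make(chan bool, 1)
	token <- true

	for {
		<-token

		if err := resend(); err != nil {
			wait := 5 * time.Second // transient error: retry soon
			if err == errNoElements {
				wait = 5 * time.Minute // nothing queued: back off for longer
			} else {
				fmt.Printf("resend error: %s\n", err)
			}

			select {
			case <-time.After(wait):
				// Just wait and fall through to the next attempt.
			case <-ctx.Done():
				return
			}
		}

		// Put the token back for the next iteration.
		token <- true
	}
}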
+ return errors.Errorf("outdated hook event: %s, latest job status: %s", evt.Message, t.Job().Info.Status) +} + +func (ba *basicAgent) popMinOne() (*Event, error) { conn := ba.redisPool.Get() defer conn.Close() key := rds.KeyHookEventRetryQueue(ba.namespace) - // Get total events - total, err := redis.Int(conn.Do("ZCARD", key)) + minOne, err := rds.ZPopMin(conn, key) if err != nil { - return err + return nil, err } - // Get sharding ones - poppedNum := math.Ceil(float64(total) / float64(defaultShardNum)) - rawContent, err := redis.Values(conn.Do("ZPOPMIN", key, poppedNum)) - if err != nil { - return err + rawEvent, ok := minOne.([]byte) + if !ok { + return nil, errors.New("bad request: non bytes slice for raw event") } - for i, l := 0, len(rawContent); i < l; i = i + 2 { - rawEvent := rawContent[i].([]byte) - evt := &Event{} - - if err := evt.Deserialize(rawEvent); err != nil { - // Partially failed - logger.Warningf("Invalid event data when retrying to send hook event: %s", err.Error()) - continue - } - - // Compare with current job status if it is still valid hook events - // If it is already out of date, then directly discard it - // If it is still valid, then retry to send it - // Get the current status of job - jobID, status, err := extractJobID(evt.Data) - if err != nil { - logger.Warning(err.Error()) - continue - } - - latestStatus, err := ba.getJobStatus(jobID) - if err != nil { - logger.Warning(err.Error()) - continue - } - - if status.Compare(latestStatus) < 0 { - // Already out of date - logger.Debugf("Abandon out dated status update retrying action: %s", evt.Message) - continue - } - - // Put to the event chan for sending with a separate goroutine to avoid long time - // waiting - go func(evt *Event) { - ba.events <- evt - }(evt) + evt := &Event{} + if err := evt.Deserialize(rawEvent); err != nil { + return nil, err } - return nil -} - -func (ba *basicAgent) getJobStatus(jobID string) (job.Status, error) { - conn := ba.redisPool.Get() - defer conn.Close() - - key := rds.KeyJobStats(ba.namespace, jobID) - status, err := redis.String(conn.Do("HGET", key, "status")) - if err != nil { - return job.PendingStatus, err - } - - return job.Status(status), nil + return evt, nil } // Extract the job ID and status from the event data field @@ -333,5 +333,5 @@ func extractJobID(data *job.StatusChange) (string, job.Status, error) { } } - return "", "", errors.New("invalid job status change data to extract job ID") + return "", "", errors.New("malform job status change data") } diff --git a/src/jobservice/job/impl/sample/job.go b/src/jobservice/job/impl/sample/job.go index 1eecdc10d..81101ecce 100644 --- a/src/jobservice/job/impl/sample/job.go +++ b/src/jobservice/job/impl/sample/job.go @@ -68,10 +68,12 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { } ctx.Checkin("progress data: %30") + <-time.After(1 * time.Second) + ctx.Checkin("progress data: %60") // HOLD ON FOR A WHILE - logger.Warning("Holding for 30 seconds") - <-time.After(30 * time.Second) + logger.Warning("Holding for 10 seconds") + <-time.After(10 * time.Second) if cmd, ok := ctx.OPCommand(); ok { if cmd == job.StopCommand { diff --git a/src/jobservice/lcm/controller.go b/src/jobservice/lcm/controller.go index f5db8f845..95ede22b3 100644 --- a/src/jobservice/lcm/controller.go +++ b/src/jobservice/lcm/controller.go @@ -28,8 +28,10 @@ import ( ) const ( + // Waiting a short while if any errors occurred shortLoopInterval = 5 * time.Second - longLoopInterval = 5 * time.Minute + // Waiting for long while if no 
diff --git a/src/jobservice/lcm/controller.go b/src/jobservice/lcm/controller.go
index f5db8f845..95ede22b3 100644
--- a/src/jobservice/lcm/controller.go
+++ b/src/jobservice/lcm/controller.go
@@ -28,8 +28,10 @@ import (
 )
 
 const (
+	// Wait a short while if any errors occurred
 	shortLoopInterval = 5 * time.Second
-	longLoopInterval  = 5 * time.Minute
+	// Wait for a long while if no retrying elements are found
+	longLoopInterval = 5 * time.Minute
 )
 
 // Controller is designed to control the life cycle of the job
@@ -115,14 +117,17 @@ func (bc *basicController) loopForRestoreDeadStatus() {
 		<-token
 
 		if err := bc.restoreDeadStatus(); err != nil {
-			wait := shortLoopInterval
-			if err == redis.ErrNil {
+			waitInterval := shortLoopInterval
+			if err == rds.NoElementsError {
 				// No elements
-				wait = longLoopInterval
+				waitInterval = longLoopInterval
+			} else {
+				logger.Errorf("restore dead status error: %s, it will be put back to the retrying queue and tried again later", err)
 			}
 
+			// Wait for a while or exit if terminated
 			select {
-			case <-time.After(wait):
+			case <-time.After(waitInterval):
 			case <-bc.context.Done():
 				return
 			}
diff --git a/src/jobservice/main.go b/src/jobservice/main.go
index d8a462ffc..30cf82dc0 100644
--- a/src/jobservice/main.go
+++ b/src/jobservice/main.go
@@ -71,5 +71,7 @@ func main() {
 	})*/
 
 	// Start
-	runtime.JobService.LoadAndRun(ctx, cancel)
+	if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
+		logger.Fatal(err)
+	}
 }
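Note (not part of the patch): main.go can now react to a failed or aborted bootstrap because LoadAndRun reports its outcome instead of terminating the process from inside the bootstrap. Since the fatal exit now happens only at the top level, an embedding caller could handle the error itself. A small hypothetical wrapper (the function and its name are illustrative only) showing that usage:

package main

import (
	"context"

	"github.com/goharbor/harbor/src/jobservice/logger"
	"github.com/goharbor/harbor/src/jobservice/runtime"
)

// startJobService is a hypothetical embedding of the job service, not part of
// this patch: the caller owns the context and decides how to handle failures
// returned by LoadAndRun instead of relying on a fatal log.
func startJobService(parent context.Context) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
		logger.Errorf("job service exited with error: %s", err)
		return err
	}

	return nil
}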
diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go
index 856143e0c..a12d5d1a9 100644
--- a/src/jobservice/runtime/bootstrap.go
+++ b/src/jobservice/runtime/bootstrap.go
@@ -39,6 +39,7 @@ import (
 	"github.com/goharbor/harbor/src/jobservice/worker"
 	"github.com/goharbor/harbor/src/jobservice/worker/cworker"
 	"github.com/gomodule/redigo/redis"
+	"github.com/pkg/errors"
 )
 
 const (
@@ -65,7 +66,7 @@ func (bs *Bootstrap) SetJobContextInitializer(initializer job.ContextInitializer
 
 // LoadAndRun will load configurations, initialize components and then start the related process to serve requests.
 // Return error if meet any problems.
-func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) {
+func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) (err error) {
 	rootContext := &env.Context{
 		SystemContext: ctx,
 		WG:            &sync.WaitGroup{},
@@ -74,10 +75,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 
 	// Build specified job context
 	if bs.jobConextInitializer != nil {
-		if jobCtx, err := bs.jobConextInitializer(ctx); err == nil {
-			rootContext.JobContext = jobCtx
-		} else {
-			logger.Fatalf("Failed to initialize job context: %s\n", err)
+		rootContext.JobContext, err = bs.jobConextInitializer(ctx)
+		if err != nil {
+			return errors.Errorf("initialize job context error: %s", err)
 		}
 	}
 
@@ -87,7 +87,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 	var (
 		backendWorker worker.Interface
 		lcmCtl        lcm.Controller
-		wErr          error
 	)
 	if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
 		// Number of workers
@@ -118,20 +117,31 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 		lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
 
 		// Start the backend worker
-		backendWorker, wErr = bs.loadAndRunRedisWorkerPool(rootContext, namespace, workerNum, redisPool, lcmCtl)
-		if wErr != nil {
-			logger.Fatalf("Failed to load and run worker: %s\n", wErr.Error())
+		backendWorker, err = bs.loadAndRunRedisWorkerPool(
+			rootContext,
+			namespace,
+			workerNum,
+			redisPool,
+			lcmCtl,
+		)
+		if err != nil {
+			return errors.Errorf("load and run worker error: %s", err)
 		}
 
 		// Run daemon process of life cycle controller
 		// Ignore returned error
-		lcmCtl.Serve()
+		if err = lcmCtl.Serve(); err != nil {
+			return errors.Errorf("start life cycle controller error: %s", err)
+		}
 
 		// Start agent
 		// Non blocking call
-		hookAgent.Serve()
+		hookAgent.Attach(lcmCtl)
+		if err = hookAgent.Serve(); err != nil {
+			return errors.Errorf("start hook agent error: %s", err)
+		}
 	} else {
-		logger.Fatalf("Worker worker backend '%s' is not supported", cfg.PoolConfig.Backend)
+		return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend)
 	}
 
 	// Initialize controller
@@ -146,8 +156,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 	go func(errChan chan error) {
 		defer func() {
 			// Gracefully shutdown
-			if err := apiServer.Stop(); err != nil {
-				logger.Error(err)
+			// An error occurring here should not override the error returned outside
+			if er := apiServer.Stop(); er != nil {
+				logger.Error(er)
 			}
 			// Notify others who're listening to the system context
 			cancel()
@@ -157,8 +168,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 		case <-sig:
 			terminated = true
 			return
-		case err := <-errChan:
-			logger.Errorf("error received from error chan: %s", err)
+		case err = <-errChan:
 			return
 		}
 	}(rootContext.ErrorChan)
@@ -166,10 +176,10 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 	node := ctx.Value(utils.NodeID)
 	// Blocking here
 	logger.Infof("API server is serving at %d with [%s] mode at node [%s]", cfg.Port, cfg.Protocol, node)
-	if err := apiServer.Start(); err != nil {
+	if er := apiServer.Start(); er != nil {
 		if !terminated {
 			// Tell the listening goroutine
-			rootContext.ErrorChan <- err
+			rootContext.ErrorChan <- er
 		}
 	} else {
 		// In case
@@ -178,6 +188,8 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 
 	// Wait everyone exit
 	rootContext.WG.Wait()
+
+	return
 }
 
 // Load and run the API server.
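Note (not part of the patch): bootstrap failures are now propagated as wrapped errors (github.com/pkg/errors) instead of fatal logs, and the hook agent must have the life cycle controller attached before it is served. A condensed sketch of just that wiring order, using only the interfaces shown in this patch; the function itself is illustrative:

package main

import (
	"github.com/goharbor/harbor/src/jobservice/hook"
	"github.com/goharbor/harbor/src/jobservice/lcm"
	"github.com/pkg/errors"
)

// startDaemons is a hypothetical condensation of the startup order enforced in
// LoadAndRun above; it is not part of the patch.
func startDaemons(ctl lcm.Controller, agent hook.Agent) error {
	// The life cycle controller has to be running first so that the hook
	// agent can track job status when it revalidates retried events.
	if err := ctl.Serve(); err != nil {
		return errors.Errorf("start life cycle controller error: %s", err)
	}

	// Attach must happen before Serve; Serve returns an error when no
	// controller is attached.
	agent.Attach(ctl)
	if err := agent.Serve(); err != nil {
		return errors.Errorf("start hook agent error: %s", err)
	}

	return nil
}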