From e1b509e3f330c25c0b346a66b17b8cb7ea79d262 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Tue, 24 Apr 2018 17:17:05 +0800 Subject: [PATCH] Fix the issue of intermittent restarting of job service github issue: #4712 ping redis server before pool starting Let worker pool to restart the message server if message server exits with error (controlled by max retries) --- src/jobservice/core/controller_test.go | 4 ++- src/jobservice/pool/interface.go | 5 ++- src/jobservice/pool/message_server.go | 17 ++++++----- src/jobservice/pool/redis_pool.go | 42 ++++++++++++++++++++++++-- src/jobservice/runtime/bootstrap.go | 27 +++++++++++------ 5 files changed, 73 insertions(+), 22 deletions(-) diff --git a/src/jobservice/core/controller_test.go b/src/jobservice/core/controller_test.go index 9f522e6be8..20157870bd 100644 --- a/src/jobservice/core/controller_test.go +++ b/src/jobservice/core/controller_test.go @@ -215,7 +215,9 @@ func createJobReq(kind string, isUnique bool, withHook bool) models.JobRequest { type fakePool struct{} -func (f *fakePool) Start() {} +func (f *fakePool) Start() error { + return nil +} func (f *fakePool) RegisterJob(name string, job interface{}) error { return nil diff --git a/src/jobservice/pool/interface.go b/src/jobservice/pool/interface.go index 3fb4c1ec97..b1b5061d1f 100644 --- a/src/jobservice/pool/interface.go +++ b/src/jobservice/pool/interface.go @@ -8,7 +8,10 @@ import "github.com/vmware/harbor/src/jobservice/models" //More like a driver to transparent the lower queue. type Interface interface { //Start to serve - Start() + // + //Return: + // error if failed to start + Start() error //Register job to the pool. // diff --git a/src/jobservice/pool/message_server.go b/src/jobservice/pool/message_server.go index c5becb817b..a23ccefe8a 100644 --- a/src/jobservice/pool/message_server.go +++ b/src/jobservice/pool/message_server.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "reflect" "time" @@ -18,6 +19,10 @@ import ( "github.com/vmware/harbor/src/jobservice/utils" ) +const ( + msgServerRetryTimes = 5 +) + //MessageServer implements the sub/pub mechanism via redis to do async message exchanging. type MessageServer struct { context context.Context @@ -42,14 +47,13 @@ func (ms *MessageServer) Start() error { logger.Info("Message server is stopped") }() - //As we get one connection from the pool, don't try to close it. - conn := ms.redisPool.Get() - defer conn.Close() - + conn := ms.redisPool.Get() //Get one backend connection! psc := redis.PubSubConn{ Conn: conn, } + defer psc.Close() + //Subscribe channel err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(ms.namespace))...) if err != nil { return err @@ -60,8 +64,7 @@ func (ms *MessageServer) Start() error { for { switch res := psc.Receive().(type) { case error: - done <- res - return + done <- fmt.Errorf("error occurred when receiving from pub/sub channel of message server: %s", res.(error).Error()) case redis.Message: m := &models.Message{} if err := json.Unmarshal(res.Data, m); err != nil { @@ -131,12 +134,12 @@ func (ms *MessageServer) Start() error { case <-ms.context.Done(): err = errors.New("context exit") case err = <-done: - return err } } //Unsubscribe all psc.Unsubscribe() + return <-done } diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index 384b96d3e5..dcf765b144 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -5,6 +5,7 @@ package pool import ( "errors" "fmt" + "math" "time" "github.com/garyburd/redigo/redis" @@ -29,6 +30,8 @@ const ( //Copy from period.enqueuer periodicEnqueuerHorizon = 4 * time.Minute + + pingRedisMaxTimes = 10 ) //GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis. @@ -80,13 +83,17 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re //Start to serve //Unblock action -func (gcwp *GoCraftWorkPool) Start() { +func (gcwp *GoCraftWorkPool) Start() error { if gcwp.redisPool == nil || gcwp.pool == nil || gcwp.context.SystemContext == nil { //report and exit - gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured") - return + return errors.New("Redis worker pool can not start as it's not correctly configured") + } + + //Test the redis connection + if err := gcwp.ping(); err != nil { + return err } done := make(chan interface{}, 1) @@ -130,8 +137,18 @@ func (gcwp *GoCraftWorkPool) Start() { return } + startTimes := 0 + START_MSG_SERVER: //Start message server if err = gcwp.messageServer.Start(); err != nil { + logger.Errorf("Message server exits with error: %s\n", err.Error()) + if startTimes < msgServerRetryTimes { + startTimes++ + time.Sleep(time.Duration((int)(math.Pow(2, (float64)(startTimes)))+5) * time.Second) + logger.Infof("Restart message server (%d times)\n", startTimes) + goto START_MSG_SERVER + } + return } }() @@ -177,6 +194,8 @@ func (gcwp *GoCraftWorkPool) Start() { gcwp.pool.Stop() }() + + return nil } //RegisterJob is used to register the job to the pool. @@ -593,6 +612,23 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) return next() } +//Ping the redis server +func (gcwp *GoCraftWorkPool) ping() error { + conn := gcwp.redisPool.Get() + defer conn.Close() + + var err error + for count := 1; count <= pingRedisMaxTimes; count++ { + if _, err = conn.Do("ping"); err == nil { + return nil + } + + time.Sleep(time.Duration(count+4) * time.Second) + } + + return fmt.Errorf("connect to redis server timeout: %s", err.Error()) +} + //generate the job stats data func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats { if j == nil { diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index c82bf7810a..32fa2fd71f 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -68,14 +68,21 @@ func (bs *Bootstrap) LoadAndRun() { } //Start the pool - var backendPool pool.Interface + var ( + backendPool pool.Interface + wpErr error + ) if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis { - backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig) + backendPool, wpErr = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig) + if wpErr != nil { + logger.Fatalf("Failed to load and run worker pool: %s\n", wpErr.Error()) + } + } else { + logger.Fatalf("Worker pool backend '%s' is not supported", config.DefaultConfig.PoolConfig.Backend) } //Initialize controller ctl := core.NewController(backendPool) - //Start the API server apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl) logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol) @@ -144,7 +151,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura } //Load and run the worker pool -func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface { +func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) (pool.Interface, error) { redisPool := &redis.Pool{ MaxActive: 6, MaxIdle: 6, @@ -166,8 +173,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con //Register jobs here if err := redisWorkerPool.RegisterJob(impl.KnownJobDemo, (*impl.DemoJob)(nil)); err != nil { //exit - ctx.ErrorChan <- err - return redisWorkerPool //avoid nil pointer issue + return nil, err } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ @@ -177,11 +183,12 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con job.ImageReplicate: (*replication.Replicator)(nil), }); err != nil { //exit - ctx.ErrorChan <- err - return redisWorkerPool //avoid nil pointer issue + return nil, err } - redisWorkerPool.Start() + if err := redisWorkerPool.Start(); err != nil { + return nil, err + } - return redisWorkerPool + return redisWorkerPool, nil }