Fix the issue of intermittent restarting of job service

github issue: #4712
ping redis server before pool starting

Let worker pool to restart the message server if message server exits with error (controlled by max retries)
This commit is contained in:
Steven Zou 2018-04-24 17:17:05 +08:00
parent 5089b26cf8
commit e1b509e3f3
5 changed files with 73 additions and 22 deletions

View File

@ -215,7 +215,9 @@ func createJobReq(kind string, isUnique bool, withHook bool) models.JobRequest {
type fakePool struct{}
func (f *fakePool) Start() {}
func (f *fakePool) Start() error {
return nil
}
func (f *fakePool) RegisterJob(name string, job interface{}) error {
return nil

View File

@ -8,7 +8,10 @@ import "github.com/vmware/harbor/src/jobservice/models"
//More like a driver to transparent the lower queue.
type Interface interface {
//Start to serve
Start()
//
//Return:
// error if failed to start
Start() error
//Register job to the pool.
//

View File

@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"
@ -18,6 +19,10 @@ import (
"github.com/vmware/harbor/src/jobservice/utils"
)
const (
msgServerRetryTimes = 5
)
//MessageServer implements the sub/pub mechanism via redis to do async message exchanging.
type MessageServer struct {
context context.Context
@ -42,14 +47,13 @@ func (ms *MessageServer) Start() error {
logger.Info("Message server is stopped")
}()
//As we get one connection from the pool, don't try to close it.
conn := ms.redisPool.Get()
defer conn.Close()
conn := ms.redisPool.Get() //Get one backend connection!
psc := redis.PubSubConn{
Conn: conn,
}
defer psc.Close()
//Subscribe channel
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(ms.namespace))...)
if err != nil {
return err
@ -60,8 +64,7 @@ func (ms *MessageServer) Start() error {
for {
switch res := psc.Receive().(type) {
case error:
done <- res
return
done <- fmt.Errorf("error occurred when receiving from pub/sub channel of message server: %s", res.(error).Error())
case redis.Message:
m := &models.Message{}
if err := json.Unmarshal(res.Data, m); err != nil {
@ -131,12 +134,12 @@ func (ms *MessageServer) Start() error {
case <-ms.context.Done():
err = errors.New("context exit")
case err = <-done:
return err
}
}
//Unsubscribe all
psc.Unsubscribe()
return <-done
}

View File

@ -5,6 +5,7 @@ package pool
import (
"errors"
"fmt"
"math"
"time"
"github.com/garyburd/redigo/redis"
@ -29,6 +30,8 @@ const (
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
pingRedisMaxTimes = 10
)
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
@ -80,13 +83,17 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re
//Start to serve
//Unblock action
func (gcwp *GoCraftWorkPool) Start() {
func (gcwp *GoCraftWorkPool) Start() error {
if gcwp.redisPool == nil ||
gcwp.pool == nil ||
gcwp.context.SystemContext == nil {
//report and exit
gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured")
return
return errors.New("Redis worker pool can not start as it's not correctly configured")
}
//Test the redis connection
if err := gcwp.ping(); err != nil {
return err
}
done := make(chan interface{}, 1)
@ -130,8 +137,18 @@ func (gcwp *GoCraftWorkPool) Start() {
return
}
startTimes := 0
START_MSG_SERVER:
//Start message server
if err = gcwp.messageServer.Start(); err != nil {
logger.Errorf("Message server exits with error: %s\n", err.Error())
if startTimes < msgServerRetryTimes {
startTimes++
time.Sleep(time.Duration((int)(math.Pow(2, (float64)(startTimes)))+5) * time.Second)
logger.Infof("Restart message server (%d times)\n", startTimes)
goto START_MSG_SERVER
}
return
}
}()
@ -177,6 +194,8 @@ func (gcwp *GoCraftWorkPool) Start() {
gcwp.pool.Stop()
}()
return nil
}
//RegisterJob is used to register the job to the pool.
@ -593,6 +612,23 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc)
return next()
}
//Ping the redis server
func (gcwp *GoCraftWorkPool) ping() error {
conn := gcwp.redisPool.Get()
defer conn.Close()
var err error
for count := 1; count <= pingRedisMaxTimes; count++ {
if _, err = conn.Do("ping"); err == nil {
return nil
}
time.Sleep(time.Duration(count+4) * time.Second)
}
return fmt.Errorf("connect to redis server timeout: %s", err.Error())
}
//generate the job stats data
func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats {
if j == nil {

View File

@ -68,14 +68,21 @@ func (bs *Bootstrap) LoadAndRun() {
}
//Start the pool
var backendPool pool.Interface
var (
backendPool pool.Interface
wpErr error
)
if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis {
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
backendPool, wpErr = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
if wpErr != nil {
logger.Fatalf("Failed to load and run worker pool: %s\n", wpErr.Error())
}
} else {
logger.Fatalf("Worker pool backend '%s' is not supported", config.DefaultConfig.PoolConfig.Backend)
}
//Initialize controller
ctl := core.NewController(backendPool)
//Start the API server
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
@ -144,7 +151,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura
}
//Load and run the worker pool
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface {
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) (pool.Interface, error) {
redisPool := &redis.Pool{
MaxActive: 6,
MaxIdle: 6,
@ -166,8 +173,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
//Register jobs here
if err := redisWorkerPool.RegisterJob(impl.KnownJobDemo, (*impl.DemoJob)(nil)); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}
if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{
@ -177,11 +183,12 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
job.ImageReplicate: (*replication.Replicator)(nil),
}); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}
redisWorkerPool.Start()
if err := redisWorkerPool.Start(); err != nil {
return nil, err
}
return redisWorkerPool
return redisWorkerPool, nil
}