diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index cc37b22c1..efacfcb98 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -19,11 +19,7 @@ import ( ) var ( - dialConnectionTimeout = 30 * time.Second - healthCheckPeriod = time.Minute - dialReadTimeout = healthCheckPeriod + 10*time.Second - dialWriteTimeout = 10 * time.Second - workerPoolDeadTime = 10 * time.Second + workerPoolDeadTime = 10 * time.Second ) const ( @@ -50,43 +46,21 @@ type GoCraftWorkPool struct { knownJobs map[string]interface{} } -//RedisPoolConfig defines configurations for GoCraftWorkPool. -type RedisPoolConfig struct { - RedisHost string - RedisPort uint - Namespace string - WorkerCount uint -} - //RedisPoolContext ... //We did not use this context to pass context info so far, just a placeholder. type RedisPoolContext struct{} //NewGoCraftWorkPool is constructor of goCraftWorkPool. -func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool { - redisPool := &redis.Pool{ - MaxActive: 6, - MaxIdle: 6, - Wait: true, - Dial: func() (redis.Conn, error) { - return redis.Dial( - "tcp", - fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort), - redis.DialConnectTimeout(dialConnectionTimeout), - redis.DialReadTimeout(dialReadTimeout), - redis.DialWriteTimeout(dialWriteTimeout), - ) - }, - } - pool := work.NewWorkerPool(RedisPoolContext{}, cfg.WorkerCount, cfg.Namespace, redisPool) - enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool) - client := work.NewClient(cfg.Namespace, redisPool) - scheduler := period.NewRedisPeriodicScheduler(ctx, cfg.Namespace, redisPool) - sweeper := period.NewSweeper(cfg.Namespace, redisPool, client) - statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool, client, scheduler) - msgServer := NewMessageServer(ctx.SystemContext, cfg.Namespace, redisPool) +func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, redisPool *redis.Pool) *GoCraftWorkPool { + pool := work.NewWorkerPool(RedisPoolContext{}, workerCount, namespace, redisPool) + enqueuer := work.NewEnqueuer(namespace, redisPool) + client := work.NewClient(namespace, redisPool) + scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool) + sweeper := period.NewSweeper(namespace, redisPool, client) + statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, namespace, redisPool, client, scheduler) + msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) return &GoCraftWorkPool{ - namespace: cfg.Namespace, + namespace: namespace, redisPool: redisPool, pool: pool, enqueuer: enqueuer, diff --git a/src/jobservice_v2/runtime/bootstrap.go b/src/jobservice_v2/runtime/bootstrap.go index fd985eca0..bafead14a 100644 --- a/src/jobservice_v2/runtime/bootstrap.go +++ b/src/jobservice_v2/runtime/bootstrap.go @@ -4,12 +4,14 @@ package runtime import ( "context" + "fmt" "os" "os/signal" "sync" "syscall" "time" + "github.com/garyburd/redigo/redis" "github.com/vmware/harbor/src/common/job" "github.com/vmware/harbor/src/jobservice_v2/api" "github.com/vmware/harbor/src/jobservice_v2/config" @@ -22,6 +24,13 @@ import ( "github.com/vmware/harbor/src/jobservice_v2/pool" ) +const ( + dialConnectionTimeout = 30 * time.Second + healthCheckPeriod = time.Minute + dialReadTimeout = healthCheckPeriod + 10*time.Second + dialWriteTimeout = 10 * time.Second +) + //JobService ... var JobService = &Bootstrap{} @@ -136,14 +145,25 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura //Load and run the worker pool func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface { - redisPoolCfg := pool.RedisPoolConfig{ - RedisHost: cfg.PoolConfig.RedisPoolCfg.Host, - RedisPort: cfg.PoolConfig.RedisPoolCfg.Port, - Namespace: cfg.PoolConfig.RedisPoolCfg.Namespace, - WorkerCount: cfg.PoolConfig.WorkerCount, + redisPool := &redis.Pool{ + MaxActive: 6, + MaxIdle: 6, + Wait: true, + Dial: func() (redis.Conn, error) { + return redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", cfg.PoolConfig.RedisPoolCfg.Host, cfg.PoolConfig.RedisPoolCfg.Port), + redis.DialConnectTimeout(dialConnectionTimeout), + redis.DialReadTimeout(dialReadTimeout), + redis.DialWriteTimeout(dialWriteTimeout), + ) + }, } - redisWorkerPool := pool.NewGoCraftWorkPool(ctx, redisPoolCfg) + redisWorkerPool := pool.NewGoCraftWorkPool(ctx, + cfg.PoolConfig.RedisPoolCfg.Namespace, + cfg.PoolConfig.WorkerCount, + redisPool) //Register jobs here if err := redisWorkerPool.RegisterJob(impl.KnownJobReplication, (*impl.ReplicationJob)(nil)); err != nil { //exit