Merge pull request #8500 from steven-zou/fix/pool_run_out

fix bug #8499: redis connection pool is run out
This commit is contained in:
Wenkai Yin(尹文开) 2019-08-02 17:47:26 +08:00 committed by GitHub
commit 27245021d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 12 deletions

View File

@ -17,6 +17,10 @@ package migration
import (
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/job"
@ -25,9 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"strings"
"testing"
"time"
)
// ManagerTestSuite tests functions of manager
@ -168,7 +169,8 @@ func (suite *ManagerTestSuite) TestManager() {
assert.NoError(suite.T(), err, "get count of policies error")
assert.Equal(suite.T(), 1, count)
p, err := getPeriodicPolicy(suite.numbericID, conn, suite.namespace)
innerConn := suite.pool.Get()
p, err := getPeriodicPolicy(suite.numbericID, innerConn, suite.namespace)
assert.NoError(suite.T(), err, "get migrated policy error")
assert.NotEmpty(suite.T(), p.ID, "ID of policy")
assert.NotEmpty(suite.T(), p.WebHookURL, "Web hook URL of policy")

View File

@ -18,6 +18,8 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/job"
@ -25,7 +27,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"strconv"
)
// PolicyMigrator migrate the cron job policy to new schema
@ -66,7 +67,9 @@ func (pm *PolicyMigrator) Metadata() *MigratorMeta {
func (pm *PolicyMigrator) Migrate() error {
conn := pm.pool.Get()
defer func() {
_ = conn.Close()
if err := conn.Close(); err != nil {
logger.Errorf("close redis connection error: %s", err)
}
}()
allJobIDs, err := getAllJobStatsIDs(conn, pm.namespace)
@ -137,10 +140,9 @@ func (pm *PolicyMigrator) Migrate() error {
// Update periodic policy model
// conn is working, we need new conn
// this inner connection will be closed by the calling method
innerConn := pm.pool.Get()
defer func() {
_ = innerConn.Close()
}()
policy, er := getPeriodicPolicy(numbericPolicyID, innerConn, pm.namespace)
if er == nil {
policy.ID = pID
@ -250,6 +252,13 @@ func getScoreByID(id string, conn redis.Conn, ns string) (int64, error) {
// Get periodic policy object by the numeric ID
func getPeriodicPolicy(numericID int64, conn redis.Conn, ns string) (*period.Policy, error) {
// close this inner connection here
defer func() {
if err := conn.Close(); err != nil {
logger.Errorf("close redis connection error: %s", err)
}
}()
bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(ns), numericID, numericID))
if err != nil {
return nil, err

View File

@ -263,9 +263,8 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
// Get a redis connection pool
func (bs *Bootstrap) getRedisPool(redisURL string) *redis.Pool {
return &redis.Pool{
MaxActive: 6,
MaxIdle: 6,
Wait: true,
MaxIdle: 6,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.DialURL(
redisURL,