Use batches to list the job IDs in the job queue to avoid crashing Redis (#19444)

fixes: #19436

Signed-off-by: stonezdj <daojunz@vmware.com>
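
For example (illustrative numbers), with batchSize = 1000 a queue holding 250,000 pending jobs is read in roughly 250 LRANGE calls of about 1,000 entries each, instead of a single LRANGE 0 -1 reply that materializes all 250,000 serialized job payloads at once.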
stonezdj (Daojun Zhang) authored 2023-10-18 17:31:37 +08:00, committed by GitHub
parent 84a85fb299
commit d030ab216b
2 changed files with 89 additions and 24 deletions

File 1 of 2

@@ -32,6 +32,9 @@ import (
 // JobServicePool job service pool name
 const JobServicePool = "JobService"
 
+// batchSize the batch size to list the job in queue
+const batchSize = 1000
+
 // RedisClient defines the job service operations related to redis
 type RedisClient interface {
     // AllJobTypes returns all the job types registered in the job service
@@ -74,21 +77,36 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
     var jobInfo struct {
         ID string `json:"id"`
     }
-    jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
-    if err != nil {
-        return []string{}, err
-    }
-    if len(jobs) == 0 {
-        return []string{}, nil
-    }
-    for _, j := range jobs {
-        if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
-            log.Errorf("failed to parse the job info %v, %v", j, err)
-            continue
-        }
-        jobIDs = append(jobIDs, jobInfo.ID)
-    }
+    size, err := redis.Int64(conn.Do("LLEN", redisKeyJobQueue))
+    if err != nil {
+        log.Infof("fail to get the size of the queue")
+        return []string{}, err
+    }
+    if size == 0 {
+        log.Infof("no pending job for job type %v", jobType)
+        return []string{}, nil
+    }
+    // use batch to list the job in queue, because the too many object load from a list might cause the redis crash
+    for startIndex := int64(0); startIndex < int64(size); startIndex += batchSize {
+        endIndex := startIndex + batchSize
+        if endIndex > int64(size) {
+            endIndex = int64(size)
+        }
+        jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, startIndex, endIndex))
+        if err != nil {
+            return []string{}, err
+        }
+        for _, j := range jobs {
+            if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
+                log.Errorf("failed to parse the job info %v, %v", j, err)
+                continue
+            }
+            if len(jobInfo.ID) > 0 {
+                jobIDs = append(jobIDs, jobInfo.ID)
+            }
+        }
+    }
     log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
     ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
     if err != nil {
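
For orientation, below is a minimal standalone sketch (not taken from this commit) of the same batched-LRANGE idea using redigo; the Redis URL, queue key, and batch size are illustrative assumptions.

package main

import (
    "fmt"
    "log"

    "github.com/gomodule/redigo/redis"
)

func main() {
    conn, err := redis.DialURL("redis://localhost:6379") // assumed local Redis
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    const queueKey = "demo:jobs:REPLICATION" // hypothetical queue key
    const batch = int64(1000)

    size, err := redis.Int64(conn.Do("LLEN", queueKey))
    if err != nil {
        log.Fatal(err)
    }
    // Walk the list in fixed-size windows instead of LRANGE 0 -1,
    // so a very long queue is never returned in a single huge reply.
    for start := int64(0); start < size; start += batch {
        end := start + batch - 1 // LRANGE end index is inclusive
        if end >= size {
            end = size - 1
        }
        items, err := redis.Strings(conn.Do("LRANGE", queueKey, start, end))
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("batch [%d, %d]: %d items\n", start, end, len(items))
    }
}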

File 2 of 2

@@ -16,6 +16,7 @@ package jobmonitor
 import (
     "context"
+    "encoding/json"
     "fmt"
     "os"
     "testing"
@@ -34,51 +35,97 @@ type RedisClientTestSuite struct
     redisURL string
 }
 
-func (suite *RedisClientTestSuite) SetupSuite() {
+func (s *RedisClientTestSuite) SetupSuite() {
     redisHost := os.Getenv("REDIS_HOST")
     if redisHost == "" {
-        suite.FailNow("REDIS_HOST is not specified")
+        s.FailNow("REDIS_HOST is not specified")
     }
-    suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
-    pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
-    suite.redisClient = redisClientImpl{
+    s.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
+    pool, err := redisPool(&config.RedisPoolConfig{RedisURL: s.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
+    s.redisClient = redisClientImpl{
         redisPool: pool,
         namespace: "{harbor_job_service_namespace}",
     }
     if err != nil {
-        suite.FailNow("failed to create redis client", err)
+        s.FailNow("failed to create redis client", err)
     }
 }
 
-func (suite *RedisClientTestSuite) TearDownSuite() {
+func (s *RedisClientTestSuite) TearDownSuite() {
 }
 
-func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
+func (s *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
     // create key and value
     jobIDs := make([]string, 0)
-    conn := suite.redisClient.redisPool.Get()
+    conn := s.redisClient.redisPool.Get()
     defer conn.Close()
     for i := 0; i < 100; i++ {
         k := utils.GenerateRandomStringWithLen(10)
         jobIDs = append(jobIDs, k)
-        key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
+        key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), k)
         v := utils.GenerateRandomStringWithLen(10)
         _, err := conn.Do("HSET", key, k, v)
         if err != nil {
-            suite.FailNow("can not insert data to redis", err)
+            s.FailNow("can not insert data to redis", err)
         }
     }
-    suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
-    key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
+
+    s.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
+    key := rds.KeyJobStats(fmt.Sprintf("{%s}", s.redisClient.namespace), "*")
     result, err := conn.Do("KEYS", key)
     if err != nil {
-        suite.FailNow("can not get data from redis", err)
+        s.FailNow("can not get data from redis", err)
     }
     remains, err := redis.Values(result, err)
     if err != nil {
-        suite.FailNow("can not get data from redis", err)
+        s.FailNow("can not get data from redis", err)
     }
-    suite.Equal(0, len(remains))
+    s.Equal(0, len(remains))
 }
 
+func (s *RedisClientTestSuite) TestStopPendingJobs() {
+    redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", "{harbor_job_service_namespace}", "REPLICATION")
+    // create key and value
+    type jobInfo struct {
+        ID     string `json:"id"`
+        Params string `json:"params"`
+    }
+    conn := s.redisClient.redisPool.Get()
+    defer conn.Close()
+    for i := 0; i < 100; i++ {
+        job := jobInfo{
+            ID:     utils.GenerateRandomStringWithLen(10),
+            Params: utils.GenerateRandomStringWithLen(10),
+        }
+        val, err := json.Marshal(&job)
+        if err != nil {
+            s.Errorf(err, "failed to marshal job info")
+        }
+        _, err = conn.Do("LPUSH", redisKeyJobQueue, val)
+        if err != nil {
+            s.FailNow("can not insert data to redis", err)
+        }
+    }
+    // job without id
+    for i := 0; i < 10; i++ {
+        job := jobInfo{
+            Params: utils.GenerateRandomStringWithLen(10),
+        }
+        val, err := json.Marshal(&job)
+        if err != nil {
+            s.Errorf(err, "failed to marshal job info")
+        }
+        _, err = conn.Do("LPUSH", redisKeyJobQueue, val)
+        if err != nil {
+            s.FailNow("can not insert data to redis", err)
+        }
+    }
+    jobIDs, err := s.redisClient.StopPendingJobs(context.Background(), "REPLICATION")
+    if err != nil {
+        s.FailNow("failed to stop pending jobs", err)
+    }
+    s.Assert().Equal(100, len(jobIDs))
+}
+
 func TestRedisClientTestSuite(t *testing.T) {
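
Note that RedisClientTestSuite needs a reachable Redis instance: SetupSuite reads the REDIS_HOST environment variable and fails the suite immediately if it is unset, so the new TestStopPendingJobs, like the existing tests, only runs when REDIS_HOST points at a live Redis listening on port 6379.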