mirror of https://github.com/goharbor/harbor.git
(cherry-pick) Remove job status track information from redis after stop the job in the queue (#19305)
Remove job status track information from redis after stop the job in the queue After stop in the queue: Remove key in {harbor_job_service_namespace}:job_track:inprogress Remove {harbor_job_service_namespace}:job_stats:<job_id> fixes #19211 Signed-off-by: stonezdj <daojunz@vmware.com>
This commit is contained in:
parent
206c5b8dc1
commit
dd959fb5b1
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/gomodule/redigo/redis"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
libRedis "github.com/goharbor/harbor/src/lib/redis"
|
||||
|
@ -93,6 +94,10 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
|
|||
if err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
go func() {
|
||||
// the amount of jobIDs maybe large, so use goroutine to remove the job status tracking info
|
||||
r.removeJobStatusInRedis(ctx, jobIDs)
|
||||
}()
|
||||
if ret < 1 {
|
||||
// no job in queue removed
|
||||
return []string{}, fmt.Errorf("no job in the queue removed")
|
||||
|
@ -102,6 +107,27 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
|
|||
return jobIDs, nil
|
||||
}
|
||||
|
||||
// removeJobStatusInRedis remove job status track information from redis, to avoid performance impact when the jobIDs is too large, use batch to remove
|
||||
func (r *redisClientImpl) removeJobStatusInRedis(ctx context.Context, jobIDs []string) {
|
||||
conn := r.redisPool.Get()
|
||||
defer conn.Close()
|
||||
for _, id := range jobIDs {
|
||||
namespace := fmt.Sprintf("{%s}", r.namespace)
|
||||
redisKeyStatus := rds.KeyJobStats(namespace, id)
|
||||
log.Debugf("delete job status info for job id:%v, key:%v", id, redisKeyStatus)
|
||||
_, err := conn.Do("DEL", redisKeyStatus)
|
||||
if err != nil {
|
||||
log.Warningf("failed to delete the job status info for job %v, %v, continue", id, err)
|
||||
}
|
||||
redisKeyInProgress := rds.KeyJobTrackInProgress(namespace)
|
||||
log.Debugf("delete inprogress info for key:%v, job id:%v", id, redisKeyInProgress)
|
||||
_, err = conn.Do("HDEL", redisKeyInProgress, id)
|
||||
if err != nil {
|
||||
log.Warningf("failed to delete the job info in %v for job %v, %v, continue", rds.KeyJobTrackInProgress(namespace), id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) {
|
||||
conn := r.redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package jobmonitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
)
|
||||
|
||||
type RedisClientTestSuite struct {
|
||||
suite.Suite
|
||||
redisClient redisClientImpl
|
||||
redisURL string
|
||||
}
|
||||
|
||||
func (suite *RedisClientTestSuite) SetupSuite() {
|
||||
redisHost := os.Getenv("REDIS_HOST")
|
||||
if redisHost == "" {
|
||||
suite.FailNow("REDIS_HOST is not specified")
|
||||
}
|
||||
suite.redisURL = fmt.Sprintf("redis://%s:6379", redisHost)
|
||||
pool, err := redisPool(&config.RedisPoolConfig{RedisURL: suite.redisURL, Namespace: "{jobservice_namespace}", IdleTimeoutSecond: 30})
|
||||
suite.redisClient = redisClientImpl{
|
||||
redisPool: pool,
|
||||
namespace: "{harbor_job_service_namespace}",
|
||||
}
|
||||
if err != nil {
|
||||
suite.FailNow("failed to create redis client", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *RedisClientTestSuite) TearDownSuite() {
|
||||
}
|
||||
|
||||
func (suite *RedisClientTestSuite) TestUntrackJobStatusInBatch() {
|
||||
// create key and value
|
||||
jobIDs := make([]string, 0)
|
||||
conn := suite.redisClient.redisPool.Get()
|
||||
defer conn.Close()
|
||||
for i := 0; i < 100; i++ {
|
||||
k := utils.GenerateRandomStringWithLen(10)
|
||||
jobIDs = append(jobIDs, k)
|
||||
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), k)
|
||||
v := utils.GenerateRandomStringWithLen(10)
|
||||
_, err := conn.Do("HSET", key, k, v)
|
||||
if err != nil {
|
||||
suite.FailNow("can not insert data to redis", err)
|
||||
}
|
||||
}
|
||||
suite.redisClient.removeJobStatusInRedis(context.Background(), jobIDs)
|
||||
key := rds.KeyJobStats(fmt.Sprintf("{%s}", suite.redisClient.namespace), "*")
|
||||
result, err := conn.Do("KEYS", key)
|
||||
if err != nil {
|
||||
suite.FailNow("can not get data from redis", err)
|
||||
}
|
||||
remains, err := redis.Values(result, err)
|
||||
if err != nil {
|
||||
suite.FailNow("can not get data from redis", err)
|
||||
}
|
||||
suite.Equal(0, len(remains))
|
||||
}
|
||||
|
||||
func TestRedisClientTestSuite(t *testing.T) {
|
||||
suite.Run(t, &RedisClientTestSuite{})
|
||||
}
|
Loading…
Reference in New Issue