From 619747fc68d1f1091945139de725077284964604 Mon Sep 17 00:00:00 2001 From: "stonezdj(Daojun Zhang)" Date: Thu, 24 Nov 2022 20:29:50 +0800 Subject: [PATCH] Stop Pending Job cannot update status (#17842) Convert the redis range result into struct and extract job id from it Add more log when get redis config fails Signed-off-by: stonezdj Signed-off-by: stonezdj --- src/common/job/client.go | 2 ++ src/controller/jobmonitor/monitor.go | 2 +- src/pkg/jobmonitor/redis.go | 19 ++++++++++++++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/common/job/client.go b/src/common/job/client.go index d90feddf1..8f985e1fd 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -15,6 +15,7 @@ import ( "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/config" + "github.com/goharbor/harbor/src/lib/log" ) var ( @@ -231,6 +232,7 @@ func (d *DefaultClient) GetJobServiceConfig() (*job.Config, error) { return nil, err } if resp.StatusCode != http.StatusOK { + log.Infof("failed to get job service config from jobservice:8080/api/v1/config, job service container version maybe mismatch") return nil, &commonhttp.Error{ Code: resp.StatusCode, Message: string(data), diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go index 1c48de400..72e3b3b07 100644 --- a/src/controller/jobmonitor/monitor.go +++ b/src/controller/jobmonitor/monitor.go @@ -55,7 +55,7 @@ type MonitorController interface { // ListWorkers lists the workers in the pool ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) - // StopRunningJob stop the running job + // StopRunningJobs stop the running job StopRunningJobs(ctx context.Context, jobID string) error // StopPendingJobs stop the pending jobs StopPendingJobs(ctx context.Context, jobType string) error diff --git a/src/pkg/jobmonitor/redis.go b/src/pkg/jobmonitor/redis.go index 79589fd52..06ec3b84a 100644 --- a/src/pkg/jobmonitor/redis.go +++ b/src/pkg/jobmonitor/redis.go @@ -16,6 +16,7 @@ package jobmonitor import ( "context" + "encoding/json" "fmt" "time" @@ -69,10 +70,24 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) ( redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", r.namespace, jobType) conn := r.redisPool.Get() defer conn.Close() - jobIDs, err = redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1)) + var jobInfo struct { + ID string `json:"id"` + } + jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1)) if err != nil { return []string{}, err } + if len(jobs) == 0 { + log.Infof("no pending job for job type %v", jobType) + return []string{}, nil + } + for _, j := range jobs { + if err := json.Unmarshal([]byte(j), &jobInfo); err != nil { + log.Errorf("failed to parse the job info %v, %v", j, err) + continue + } + jobIDs = append(jobIDs, jobInfo.ID) + } log.Infof("updated %d tasks in pending status to stop", len(jobIDs)) ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue)) if err != nil { @@ -83,6 +98,7 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) ( return []string{}, fmt.Errorf("no job in the queue removed") } log.Infof("deleted %d keys in waiting queue for %s", ret, jobType) + log.Debugf("job id to be deleted %v", jobIDs) return jobIDs, nil } @@ -110,6 +126,7 @@ func (r *redisClientImpl) UnpauseJob(ctx context.Context, jobName string) error return err } +// JobServiceRedisClient function to create redis client for job service func JobServiceRedisClient() (RedisClient, error) { cfg, err := job.GlobalClient.GetJobServiceConfig() if err != nil {