mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-19 05:11:30 +01:00
Stop Pending Job cannot update status (#17842)
Convert the redis range result into struct and extract job id from it Add more log when get redis config fails Signed-off-by: stonezdj <daojunz@vmware.com> Signed-off-by: stonezdj <daojunz@vmware.com>
This commit is contained in:
parent
d079034b23
commit
619747fc68
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/common/job/models"
|
"github.com/goharbor/harbor/src/common/job/models"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/config"
|
"github.com/goharbor/harbor/src/lib/config"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -231,6 +232,7 @@ func (d *DefaultClient) GetJobServiceConfig() (*job.Config, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
log.Infof("failed to get job service config from jobservice:8080/api/v1/config, job service container version maybe mismatch")
|
||||||
return nil, &commonhttp.Error{
|
return nil, &commonhttp.Error{
|
||||||
Code: resp.StatusCode,
|
Code: resp.StatusCode,
|
||||||
Message: string(data),
|
Message: string(data),
|
||||||
|
@ -55,7 +55,7 @@ type MonitorController interface {
|
|||||||
// ListWorkers lists the workers in the pool
|
// ListWorkers lists the workers in the pool
|
||||||
ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error)
|
ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error)
|
||||||
|
|
||||||
// StopRunningJob stop the running job
|
// StopRunningJobs stop the running job
|
||||||
StopRunningJobs(ctx context.Context, jobID string) error
|
StopRunningJobs(ctx context.Context, jobID string) error
|
||||||
// StopPendingJobs stop the pending jobs
|
// StopPendingJobs stop the pending jobs
|
||||||
StopPendingJobs(ctx context.Context, jobType string) error
|
StopPendingJobs(ctx context.Context, jobType string) error
|
||||||
|
@ -16,6 +16,7 @@ package jobmonitor
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -69,10 +70,24 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
|
|||||||
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", r.namespace, jobType)
|
redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", r.namespace, jobType)
|
||||||
conn := r.redisPool.Get()
|
conn := r.redisPool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
jobIDs, err = redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
|
var jobInfo struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
}
|
||||||
|
jobs, err := redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []string{}, err
|
return []string{}, err
|
||||||
}
|
}
|
||||||
|
if len(jobs) == 0 {
|
||||||
|
log.Infof("no pending job for job type %v", jobType)
|
||||||
|
return []string{}, nil
|
||||||
|
}
|
||||||
|
for _, j := range jobs {
|
||||||
|
if err := json.Unmarshal([]byte(j), &jobInfo); err != nil {
|
||||||
|
log.Errorf("failed to parse the job info %v, %v", j, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jobIDs = append(jobIDs, jobInfo.ID)
|
||||||
|
}
|
||||||
log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
|
log.Infof("updated %d tasks in pending status to stop", len(jobIDs))
|
||||||
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
|
ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -83,6 +98,7 @@ func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (
|
|||||||
return []string{}, fmt.Errorf("no job in the queue removed")
|
return []string{}, fmt.Errorf("no job in the queue removed")
|
||||||
}
|
}
|
||||||
log.Infof("deleted %d keys in waiting queue for %s", ret, jobType)
|
log.Infof("deleted %d keys in waiting queue for %s", ret, jobType)
|
||||||
|
log.Debugf("job id to be deleted %v", jobIDs)
|
||||||
return jobIDs, nil
|
return jobIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,6 +126,7 @@ func (r *redisClientImpl) UnpauseJob(ctx context.Context, jobName string) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// JobServiceRedisClient function to create redis client for job service
|
||||||
func JobServiceRedisClient() (RedisClient, error) {
|
func JobServiceRedisClient() (RedisClient, error) {
|
||||||
cfg, err := job.GlobalClient.GetJobServiceConfig()
|
cfg, err := job.GlobalClient.GetJobServiceConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user