mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-30 06:03:45 +01:00
fix #8525: periodic job retry issue: job stats is not found
Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
parent
8701007714
commit
926ac44a67
@ -15,11 +15,11 @@
|
|||||||
package period
|
package period
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"context"
|
|
||||||
"github.com/gocraft/work"
|
"github.com/gocraft/work"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
@ -175,23 +175,14 @@ func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) {
|
|||||||
e.lastEnqueueErr = err
|
e.lastEnqueueErr = err
|
||||||
logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
|
logger.Errorf("Invalid corn spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
|
||||||
} else {
|
} else {
|
||||||
if p.JobParameters == nil {
|
|
||||||
p.JobParameters = make(job.Parameters)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clone job parameters
|
|
||||||
wJobParams := make(job.Parameters)
|
|
||||||
if p.JobParameters != nil && len(p.JobParameters) > 0 {
|
|
||||||
for k, v := range p.JobParameters {
|
|
||||||
wJobParams[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Add extra argument for job running
|
|
||||||
// Notes: Only for system using
|
|
||||||
wJobParams[PeriodicExecutionMark] = true
|
|
||||||
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
|
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
|
||||||
epoch := t.Unix()
|
epoch := t.Unix()
|
||||||
|
|
||||||
|
// Clone parameters
|
||||||
|
// Add extra argument for job running too.
|
||||||
|
// Notes: Only for system using
|
||||||
|
wJobParams := cloneParameters(p.JobParameters, epoch)
|
||||||
|
|
||||||
// Create an execution (job) based on the periodic job template (policy)
|
// Create an execution (job) based on the periodic job template (policy)
|
||||||
j := &work.Job{
|
j := &work.Job{
|
||||||
Name: p.JobName,
|
Name: p.JobName,
|
||||||
@ -316,3 +307,16 @@ func (e *enqueuer) shouldEnqueue() bool {
|
|||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cloneParameters(params job.Parameters, epoch int64) job.Parameters {
|
||||||
|
p := make(job.Parameters)
|
||||||
|
|
||||||
|
// Clone parameters to a new param map
|
||||||
|
for k, v := range params {
|
||||||
|
p[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
p[PeriodicExecutionMark] = fmt.Sprintf("%d", epoch)
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
@ -15,18 +15,17 @@
|
|||||||
package runner
|
package runner
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
|
||||||
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gocraft/work"
|
"github.com/gocraft/work"
|
||||||
"github.com/goharbor/harbor/src/jobservice/env"
|
"github.com/goharbor/harbor/src/jobservice/env"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||||
"github.com/goharbor/harbor/src/jobservice/lcm"
|
"github.com/goharbor/harbor/src/jobservice/lcm"
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/period"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -69,8 +68,10 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
|
|||||||
|
|
||||||
// Track the running job now
|
// Track the running job now
|
||||||
jID := j.ID
|
jID := j.ID
|
||||||
if isPeriodicJobExecution(j) {
|
|
||||||
jID = fmt.Sprintf("%s@%d", j.ID, j.EnqueuedAt)
|
// Check if the job is a periodic one as periodic job has its own ID format
|
||||||
|
if eID, yes := isPeriodicJobExecution(j); yes {
|
||||||
|
jID = eID
|
||||||
}
|
}
|
||||||
|
|
||||||
if tracker, err = rj.ctl.Track(jID); err != nil {
|
if tracker, err = rj.ctl.Track(jID); err != nil {
|
||||||
@ -191,7 +192,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
|
|||||||
// Handle retry
|
// Handle retry
|
||||||
rj.retry(runningJob, j)
|
rj.retry(runningJob, j)
|
||||||
// Handle periodic job execution
|
// Handle periodic job execution
|
||||||
if isPeriodicJobExecution(j) {
|
if _, yes := isPeriodicJobExecution(j); yes {
|
||||||
if er := tracker.PeriodicExecutionDone(); er != nil {
|
if er := tracker.PeriodicExecutionDone(); er != nil {
|
||||||
// Just log it
|
// Just log it
|
||||||
logger.Error(er)
|
logger.Error(er)
|
||||||
@ -210,14 +211,9 @@ func (rj *RedisJob) retry(j job.Interface, wj *work.Job) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isPeriodicJobExecution(j *work.Job) bool {
|
func isPeriodicJobExecution(j *work.Job) (string, bool) {
|
||||||
if isPeriodic, ok := j.Args["_job_kind_periodic_"]; ok {
|
epoch, ok := j.Args[period.PeriodicExecutionMark]
|
||||||
if isPeriodicV, yes := isPeriodic.(bool); yes && isPeriodicV {
|
return fmt.Sprintf("%s@%s", j.ID, epoch), ok
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func bp(b bool) *bool {
|
func bp(b bool) *bool {
|
||||||
|
@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
|||||||
w.pool.JobWithOptions(
|
w.pool.JobWithOptions(
|
||||||
name,
|
name,
|
||||||
work.JobOptions{
|
work.JobOptions{
|
||||||
MaxFails: theJ.MaxFails() + 1,
|
MaxFails: theJ.MaxFails(),
|
||||||
},
|
},
|
||||||
// Use generic handler to handle as we do not accept context with this way.
|
// Use generic handler to handle as we do not accept context with this way.
|
||||||
func(job *work.Job) error {
|
func(job *work.Job) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user