Merge pull request #8751 from steven-zou/fix/#8745

fix #8745: job retry issue
This commit is contained in:
Wenkai Yin(尹文开) 2019-08-20 19:01:30 +08:00 committed by GitHub
commit f9aa154826
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 19 deletions

View File

@ -17,6 +17,7 @@ package sample
import (
"errors"
"fmt"
"os"
"strings"
"time"
@ -67,6 +68,13 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
fmt.Printf("Get prop form context: sample=%s\n", v)
}
// For failure case
if len(os.Getenv("JOB_FAILED")) > 0 {
<-time.After(3 * time.Second)
logger.Info("Job exit with error because `JOB_FAILED` env is set")
return errors.New("`JOB_FAILED` env is set")
}
ctx.Checkin("progress data: %30")
<-time.After(1 * time.Second)
ctx.Checkin("progress data: %60")

View File

@ -67,6 +67,7 @@ type StatsInfo struct {
UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing
NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
Parameters Parameters `json:"parameters,omitempty"`
Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job
}
// ActionRequest defines for triggering job action like stop/cancel.

View File

@ -17,15 +17,16 @@ package job
import (
"context"
"encoding/json"
"math/rand"
"strconv"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"math/rand"
"strconv"
"time"
)
const (
@ -96,6 +97,9 @@ type Tracker interface {
// Switch the status to success
Succeed() error
// Reset the status to `pending`
Reset() error
}
// basicTracker implements Tracker interface based on redis
@ -361,6 +365,8 @@ func (bt *basicTracker) Save() (err error) {
}
// Set update timestamp
args = append(args, "update_time", time.Now().Unix())
// Set the first revision
args = append(args, "revision", time.Now().Unix())
// Do it in a transaction
err = conn.Send("MULTI")
@ -419,6 +425,29 @@ func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
return err
}
// Reset the job status to `pending` and update the revision.
// Usually for the retry jobs
func (bt *basicTracker) Reset() error {
conn := bt.pool.Get()
defer func() {
closeConn(conn)
}()
now := time.Now().Unix()
err := bt.Update(
"status",
PendingStatus.String(),
"revision",
now,
)
if err == nil {
bt.refresh(PendingStatus)
bt.jobStats.Info.Revision = now
}
return err
}
// Refresh the job stats in mem
func (bt *basicTracker) refresh(targetStatus Status, checkIn ...string) {
now := time.Now().Unix()
@ -571,20 +600,16 @@ func (bt *basicTracker) retrieve() error {
res.Info.RefLink = value
break
case "enqueue_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.EnqueueTime = v
res.Info.EnqueueTime = parseInt64(value)
break
case "update_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.UpdateTime = v
res.Info.UpdateTime = parseInt64(value)
break
case "run_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.RunAt = v
res.Info.RunAt = parseInt64(value)
break
case "check_in_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.CheckInAt = v
res.Info.CheckInAt = parseInt64(value)
break
case "check_in":
res.Info.CheckIn = value
@ -596,14 +621,12 @@ func (bt *basicTracker) retrieve() error {
res.Info.WebHookURL = value
break
case "die_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.DieAt = v
res.Info.DieAt = parseInt64(value)
case "upstream_job_id":
res.Info.UpstreamJobID = value
break
case "numeric_policy_id":
v, _ := strconv.ParseInt(value, 10, 64)
res.Info.NumericPID = v
res.Info.NumericPID = parseInt64(value)
break
case "parameters":
params := make(Parameters)
@ -611,6 +634,9 @@ func (bt *basicTracker) retrieve() error {
res.Info.Parameters = params
}
break
case "revision":
res.Info.Revision = parseInt64(value)
break
default:
break
}
@ -640,3 +666,21 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
func setStatus(conn redis.Conn, key string, status Status) error {
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
}
func closeConn(conn redis.Conn) {
if conn != nil {
if err := conn.Close(); err != nil {
logger.Errorf("Close redis connection failed with error: %s", err)
}
}
}
func parseInt64(v string) int64 {
intV, err := strconv.ParseInt(v, 10, 64)
if err != nil {
logger.Errorf("Parse int64 error: %s", err)
return 0
}
return intV
}

View File

@ -15,17 +15,19 @@
package runner
import (
"github.com/goharbor/harbor/src/jobservice/job/impl"
"runtime"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"fmt"
"time"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/lcm"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/pkg/errors"
"time"
)
// RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis worker.
@ -85,11 +87,38 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
return
}
if job.RunningStatus.Compare(job.Status(tracker.Job().Info.Status)) <= 0 {
// Do operation based on the job status
jStatus := job.Status(tracker.Job().Info.Status)
switch jStatus {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
break
case job.StoppedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
markStopped = bp(true)
return nil
case job.ErrorStatus:
if j.FailedAt > 0 && j.Fails > 0 {
// Retry job
// Reset job info
if er := tracker.Reset(); er != nil {
// Log error and return the original error if existing
er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID))
logger.Error(er)
if len(j.LastErr) > 0 {
return errors.New(j.LastErr)
}
return err
}
logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
}
break
default:
return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String())
}
// Defer to switch status

View File

@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
w.pool.JobWithOptions(
name,
work.JobOptions{
MaxFails: theJ.MaxFails(),
MaxFails: theJ.MaxFails() + 1,
},
// Use generic handler to handle as we do not accept context with this way.
func(job *work.Job) error {