mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-24 09:38:09 +01:00
parent
f930786050
commit
ace07b0c0a
@ -17,6 +17,7 @@ package sample
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -67,6 +68,13 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
|
||||
fmt.Printf("Get prop form context: sample=%s\n", v)
|
||||
}
|
||||
|
||||
// For failure case
|
||||
if len(os.Getenv("JOB_FAILED")) > 0 {
|
||||
<-time.After(3 * time.Second)
|
||||
logger.Info("Job exit with error because `JOB_FAILED` env is set")
|
||||
return errors.New("`JOB_FAILED` env is set")
|
||||
}
|
||||
|
||||
ctx.Checkin("progress data: %30")
|
||||
<-time.After(1 * time.Second)
|
||||
ctx.Checkin("progress data: %60")
|
||||
|
@ -67,6 +67,7 @@ type StatsInfo struct {
|
||||
UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing
|
||||
NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
|
||||
Parameters Parameters `json:"parameters,omitempty"`
|
||||
Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job
|
||||
}
|
||||
|
||||
// ActionRequest defines for triggering job action like stop/cancel.
|
||||
|
@ -17,15 +17,16 @@ package job
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -96,6 +97,9 @@ type Tracker interface {
|
||||
|
||||
// Switch the status to success
|
||||
Succeed() error
|
||||
|
||||
// Reset the status to `pending`
|
||||
Reset() error
|
||||
}
|
||||
|
||||
// basicTracker implements Tracker interface based on redis
|
||||
@ -361,6 +365,8 @@ func (bt *basicTracker) Save() (err error) {
|
||||
}
|
||||
// Set update timestamp
|
||||
args = append(args, "update_time", time.Now().Unix())
|
||||
// Set the first revision
|
||||
args = append(args, "revision", time.Now().Unix())
|
||||
|
||||
// Do it in a transaction
|
||||
err = conn.Send("MULTI")
|
||||
@ -419,6 +425,29 @@ func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Reset the job status to `pending` and update the revision.
|
||||
// Usually for the retry jobs
|
||||
func (bt *basicTracker) Reset() error {
|
||||
conn := bt.pool.Get()
|
||||
defer func() {
|
||||
closeConn(conn)
|
||||
}()
|
||||
|
||||
now := time.Now().Unix()
|
||||
err := bt.Update(
|
||||
"status",
|
||||
PendingStatus.String(),
|
||||
"revision",
|
||||
now,
|
||||
)
|
||||
if err == nil {
|
||||
bt.refresh(PendingStatus)
|
||||
bt.jobStats.Info.Revision = now
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Refresh the job stats in mem
|
||||
func (bt *basicTracker) refresh(targetStatus Status, checkIn ...string) {
|
||||
now := time.Now().Unix()
|
||||
@ -571,20 +600,16 @@ func (bt *basicTracker) retrieve() error {
|
||||
res.Info.RefLink = value
|
||||
break
|
||||
case "enqueue_time":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.EnqueueTime = v
|
||||
res.Info.EnqueueTime = parseInt64(value)
|
||||
break
|
||||
case "update_time":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.UpdateTime = v
|
||||
res.Info.UpdateTime = parseInt64(value)
|
||||
break
|
||||
case "run_at":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.RunAt = v
|
||||
res.Info.RunAt = parseInt64(value)
|
||||
break
|
||||
case "check_in_at":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.CheckInAt = v
|
||||
res.Info.CheckInAt = parseInt64(value)
|
||||
break
|
||||
case "check_in":
|
||||
res.Info.CheckIn = value
|
||||
@ -596,14 +621,12 @@ func (bt *basicTracker) retrieve() error {
|
||||
res.Info.WebHookURL = value
|
||||
break
|
||||
case "die_at":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.DieAt = v
|
||||
res.Info.DieAt = parseInt64(value)
|
||||
case "upstream_job_id":
|
||||
res.Info.UpstreamJobID = value
|
||||
break
|
||||
case "numeric_policy_id":
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Info.NumericPID = v
|
||||
res.Info.NumericPID = parseInt64(value)
|
||||
break
|
||||
case "parameters":
|
||||
params := make(Parameters)
|
||||
@ -611,6 +634,9 @@ func (bt *basicTracker) retrieve() error {
|
||||
res.Info.Parameters = params
|
||||
}
|
||||
break
|
||||
case "revision":
|
||||
res.Info.Revision = parseInt64(value)
|
||||
break
|
||||
default:
|
||||
break
|
||||
}
|
||||
@ -640,3 +666,21 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
|
||||
func setStatus(conn redis.Conn, key string, status Status) error {
|
||||
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
|
||||
}
|
||||
|
||||
func closeConn(conn redis.Conn) {
|
||||
if conn != nil {
|
||||
if err := conn.Close(); err != nil {
|
||||
logger.Errorf("Close redis connection failed with error: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseInt64(v string) int64 {
|
||||
intV, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil {
|
||||
logger.Errorf("Parse int64 error: %s", err)
|
||||
return 0
|
||||
}
|
||||
|
||||
return intV
|
||||
}
|
||||
|
@ -15,17 +15,19 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||
"runtime"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/lcm"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/pkg/errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis worker.
|
||||
@ -85,11 +87,38 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
if job.RunningStatus.Compare(job.Status(tracker.Job().Info.Status)) <= 0 {
|
||||
// Do operation based on the job status
|
||||
jStatus := job.Status(tracker.Job().Info.Status)
|
||||
switch jStatus {
|
||||
case job.PendingStatus, job.ScheduledStatus:
|
||||
// do nothing now
|
||||
break
|
||||
case job.StoppedStatus:
|
||||
// Probably jobs has been stopped by directly mark status to stopped.
|
||||
// Directly exit and no retry
|
||||
markStopped = bp(true)
|
||||
return nil
|
||||
case job.ErrorStatus:
|
||||
if j.FailedAt > 0 && j.Fails > 0 {
|
||||
// Retry job
|
||||
// Reset job info
|
||||
if er := tracker.Reset(); er != nil {
|
||||
// Log error and return the original error if existing
|
||||
er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID))
|
||||
logger.Error(er)
|
||||
|
||||
if len(j.LastErr) > 0 {
|
||||
return errors.New(j.LastErr)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision)
|
||||
}
|
||||
break
|
||||
default:
|
||||
return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String())
|
||||
}
|
||||
|
||||
// Defer to switch status
|
||||
|
@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
||||
w.pool.JobWithOptions(
|
||||
name,
|
||||
work.JobOptions{
|
||||
MaxFails: theJ.MaxFails(),
|
||||
MaxFails: theJ.MaxFails() + 1,
|
||||
},
|
||||
// Use generic handler to handle as we do not accept context with this way.
|
||||
func(job *work.Job) error {
|
||||
|
Loading…
Reference in New Issue
Block a user