From ace07b0c0a78d0098d3a09dbcafade15721abcce Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Tue, 20 Aug 2019 16:39:44 +0800 Subject: [PATCH] fix #8745: job retry issue Signed-off-by: Steven Zou --- src/jobservice/job/impl/sample/job.go | 8 +++ src/jobservice/job/models.go | 1 + src/jobservice/job/tracker.go | 74 ++++++++++++++++++----- src/jobservice/runner/redis.go | 35 ++++++++++- src/jobservice/worker/cworker/c_worker.go | 2 +- 5 files changed, 101 insertions(+), 19 deletions(-) diff --git a/src/jobservice/job/impl/sample/job.go b/src/jobservice/job/impl/sample/job.go index 81101ecce..be860ec0e 100644 --- a/src/jobservice/job/impl/sample/job.go +++ b/src/jobservice/job/impl/sample/job.go @@ -17,6 +17,7 @@ package sample import ( "errors" "fmt" + "os" "strings" "time" @@ -67,6 +68,13 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { fmt.Printf("Get prop form context: sample=%s\n", v) } + // For failure case + if len(os.Getenv("JOB_FAILED")) > 0 { + <-time.After(3 * time.Second) + logger.Info("Job exit with error because `JOB_FAILED` env is set") + return errors.New("`JOB_FAILED` env is set") + } + ctx.Checkin("progress data: %30") <-time.After(1 * time.Second) ctx.Checkin("progress data: %60") diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go index 16c4ad708..1ae70679e 100644 --- a/src/jobservice/job/models.go +++ b/src/jobservice/job/models.go @@ -67,6 +67,7 @@ type StatsInfo struct { UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job Parameters Parameters `json:"parameters,omitempty"` + Revision int64 `json:"revision,omitempty"` // For differentiating the each retry of the same job } // ActionRequest defines for triggering job action like stop/cancel. diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go index 0e70d338a..9ed0e113d 100644 --- a/src/jobservice/job/tracker.go +++ b/src/jobservice/job/tracker.go @@ -17,15 +17,16 @@ package job import ( "context" "encoding/json" + "math/rand" + "strconv" + "time" + "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/gomodule/redigo/redis" "github.com/pkg/errors" - "math/rand" - "strconv" - "time" ) const ( @@ -96,6 +97,9 @@ type Tracker interface { // Switch the status to success Succeed() error + + // Reset the status to `pending` + Reset() error } // basicTracker implements Tracker interface based on redis @@ -361,6 +365,8 @@ func (bt *basicTracker) Save() (err error) { } // Set update timestamp args = append(args, "update_time", time.Now().Unix()) + // Set the first revision + args = append(args, "revision", time.Now().Unix()) // Do it in a transaction err = conn.Send("MULTI") @@ -419,6 +425,29 @@ func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error { return err } +// Reset the job status to `pending` and update the revision. +// Usually for the retry jobs +func (bt *basicTracker) Reset() error { + conn := bt.pool.Get() + defer func() { + closeConn(conn) + }() + + now := time.Now().Unix() + err := bt.Update( + "status", + PendingStatus.String(), + "revision", + now, + ) + if err == nil { + bt.refresh(PendingStatus) + bt.jobStats.Info.Revision = now + } + + return err +} + // Refresh the job stats in mem func (bt *basicTracker) refresh(targetStatus Status, checkIn ...string) { now := time.Now().Unix() @@ -571,20 +600,16 @@ func (bt *basicTracker) retrieve() error { res.Info.RefLink = value break case "enqueue_time": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.EnqueueTime = v + res.Info.EnqueueTime = parseInt64(value) break case "update_time": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.UpdateTime = v + res.Info.UpdateTime = parseInt64(value) break case "run_at": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.RunAt = v + res.Info.RunAt = parseInt64(value) break case "check_in_at": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.CheckInAt = v + res.Info.CheckInAt = parseInt64(value) break case "check_in": res.Info.CheckIn = value @@ -596,14 +621,12 @@ func (bt *basicTracker) retrieve() error { res.Info.WebHookURL = value break case "die_at": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.DieAt = v + res.Info.DieAt = parseInt64(value) case "upstream_job_id": res.Info.UpstreamJobID = value break case "numeric_policy_id": - v, _ := strconv.ParseInt(value, 10, 64) - res.Info.NumericPID = v + res.Info.NumericPID = parseInt64(value) break case "parameters": params := make(Parameters) @@ -611,6 +634,9 @@ func (bt *basicTracker) retrieve() error { res.Info.Parameters = params } break + case "revision": + res.Info.Revision = parseInt64(value) + break default: break } @@ -640,3 +666,21 @@ func getStatus(conn redis.Conn, key string) (Status, error) { func setStatus(conn redis.Conn, key string, status Status) error { return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix()) } + +func closeConn(conn redis.Conn) { + if conn != nil { + if err := conn.Close(); err != nil { + logger.Errorf("Close redis connection failed with error: %s", err) + } + } +} + +func parseInt64(v string) int64 { + intV, err := strconv.ParseInt(v, 10, 64) + if err != nil { + logger.Errorf("Parse int64 error: %s", err) + return 0 + } + + return intV +} diff --git a/src/jobservice/runner/redis.go b/src/jobservice/runner/redis.go index f8409b2d7..1fa6dde56 100644 --- a/src/jobservice/runner/redis.go +++ b/src/jobservice/runner/redis.go @@ -15,17 +15,19 @@ package runner import ( - "github.com/goharbor/harbor/src/jobservice/job/impl" "runtime" + "github.com/goharbor/harbor/src/jobservice/job/impl" + "fmt" + "time" + "github.com/gocraft/work" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/pkg/errors" - "time" ) // RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis worker. @@ -85,11 +87,38 @@ func (rj *RedisJob) Run(j *work.Job) (err error) { return } - if job.RunningStatus.Compare(job.Status(tracker.Job().Info.Status)) <= 0 { + // Do operation based on the job status + jStatus := job.Status(tracker.Job().Info.Status) + switch jStatus { + case job.PendingStatus, job.ScheduledStatus: + // do nothing now + break + case job.StoppedStatus: // Probably jobs has been stopped by directly mark status to stopped. // Directly exit and no retry markStopped = bp(true) return nil + case job.ErrorStatus: + if j.FailedAt > 0 && j.Fails > 0 { + // Retry job + // Reset job info + if er := tracker.Reset(); er != nil { + // Log error and return the original error if existing + er = errors.Wrap(er, fmt.Sprintf("retrying job %s:%s failed", j.Name, j.ID)) + logger.Error(er) + + if len(j.LastErr) > 0 { + return errors.New(j.LastErr) + } + + return err + } + + logger.Infof("|*_*| Retrying job %s:%s, revision: %d", j.Name, j.ID, tracker.Job().Info.Revision) + } + break + default: + return errors.Errorf("mismatch status for running job: expected <%s <> got %s", job.RunningStatus.String(), jStatus.String()) } // Defer to switch status diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index 6de36856f..b4c3b799f 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -405,7 +405,7 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) { w.pool.JobWithOptions( name, work.JobOptions{ - MaxFails: theJ.MaxFails(), + MaxFails: theJ.MaxFails() + 1, }, // Use generic handler to handle as we do not accept context with this way. func(job *work.Job) error {