add retry for status switching

Signed-off-by: Steven Zou <szou@vmware.com>
Steven Zou 2019-04-19 21:14:09 +08:00
parent 38e6df4edb
commit ea34bcc950
9 changed files with 330 additions and 49 deletions

View File

@@ -83,3 +83,8 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
func KeyHookEventRetryQueue(namespace string) string {
return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "hook_events")
}
// KeyStatusUpdateRetryQueue returns the key of the status change retry queue
func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "status_change_events")
}

View File

@@ -118,3 +118,35 @@ func ReleaseLock(conn redis.Conn, lockerKey string, lockerID string) error {
return errors.New("locker ID mismatch")
}
// ZPopMin pops the element with the lowest score from the zset
func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
err := conn.Send("MULTI")
err = conn.Send("ZRANGE", key, 0, 0) // lowest one
err = conn.Send("ZREMRANGEBYRANK", key, 0, 0)
if err != nil {
return nil, err
}
replies, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return nil, err
}
if len(replies) < 2 {
return nil, errors.Errorf("zpopmin error: not enough results returned, expected %d but got %d", 2, len(replies))
}
zrangeReply := replies[0]
if zrangeReply != nil {
if elements, ok := zrangeReply.([]interface{}); ok {
if len(elements) == 0 {
return nil, redis.ErrNil
} else {
return elements[0], nil
}
}
}
return nil, errors.New("zpopmin error: bad result reply")
}
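ZPopMin above emulates the pop with a MULTI/ZRANGE/ZREMRANGEBYRANK transaction, which works on any Redis version supported by redigo. For comparison only, here is a minimal sketch of the same operation using the native ZPOPMIN command available since Redis 5.0; the connection URL and key names below are assumptions, not part of this change:

package main

import (
	"fmt"

	"github.com/gomodule/redigo/redis"
)

// nativeZPopMin pops the member with the lowest score using Redis 5.0's ZPOPMIN.
func nativeZPopMin(conn redis.Conn, key string) ([]byte, error) {
	// ZPOPMIN replies with [member, score]; an empty array means the zset is empty.
	values, err := redis.Values(conn.Do("ZPOPMIN", key))
	if err != nil {
		return nil, err
	}
	if len(values) == 0 {
		return nil, redis.ErrNil
	}
	member, ok := values[0].([]byte)
	if !ok {
		return nil, fmt.Errorf("zpopmin: unexpected reply type %T", values[0])
	}
	return member, nil
}

func main() {
	conn, err := redis.DialURL("redis://localhost:6379") // assumed local Redis >= 5.0
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	if _, err := conn.Do("ZADD", "demo:queue", 1, "job-a"); err != nil {
		panic(err)
	}
	member, err := nativeZPopMin(conn, "demo:queue")
	fmt.Println(string(member), err) // "job-a" <nil>
}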

View File

@@ -0,0 +1,72 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rds
import (
"encoding/json"
"github.com/goharbor/harbor/src/jobservice/tests"
"testing"
"time"
)
var (
pool = tests.GiveMeRedisPool()
namespace = tests.GiveMeTestNamespace()
)
// For testing
type simpleStatusChange struct {
JobID string
}
func TestZPopMin(t *testing.T) {
conn := pool.Get()
defer conn.Close()
s1 := &simpleStatusChange{"a"}
s2 := &simpleStatusChange{"b"}
raw1, _ := json.Marshal(s1)
raw2, _ := json.Marshal(s2)
key := KeyStatusUpdateRetryQueue(namespace)
_, err := conn.Do("ZADD", key, time.Now().Unix(), raw1)
_, err = conn.Do("ZADD", key, time.Now().Unix()+5, raw2)
if err != nil {
t.Fatal(err)
}
v, err := ZPopMin(conn, key)
if err != nil {
t.Fatal(err)
}
change1 := &simpleStatusChange{}
json.Unmarshal(v.([]byte), change1)
if change1.JobID != "a" {
t.Errorf("expect min element 'a' but got '%s'", change1.JobID)
}
v, err = ZPopMin(conn, key)
if err != nil {
t.Fatal(err)
}
change2 := &simpleStatusChange{}
json.Unmarshal(v.([]byte), change2)
if change2.JobID != "b" {
t.Errorf("expect min element 'b' but got '%s'", change2.JobID)
}
}

View File

@@ -82,6 +82,12 @@ type StatusChange struct {
Metadata *StatsInfo `json:"metadata,omitempty"`
}
// SimpleStatusChange only keeps job ID and the target status
type SimpleStatusChange struct {
JobID string `json:"job_id"`
TargetStatus string `json:"target_status"`
}
// Validate the job stats
func (st *Stats) Validate() error {
if st.Info == nil {

View File

@@ -87,6 +87,9 @@ type Tracker interface {
// Check in message
CheckIn(message string) error
// Update status with retry enabled
UpdateStatusWithRetry(targetStatus Status) error
// The current status of job
Status() (Status, error)
@@ -96,9 +99,6 @@ type Tracker interface {
// Switch status to running
Run() error
// Switch status to scheduled
Schedule() error
// Switch status to stopped
Stop() error
@@ -222,11 +222,15 @@ func (bt *basicTracker) CheckIn(message string) error {
return errors.New("check in error: empty message")
}
err := bt.fireHook(Status(bt.jobStats.Info.Status), message)
now := time.Now().Unix()
current := Status(bt.jobStats.Info.Status)
bt.refresh(current, message)
err := bt.fireHookEvent(current, message)
err = bt.Update(
"check_in", message,
"check_in_at", time.Now().Unix(),
"update_time", time.Now().Unix(),
"check_in_at", now,
"update_time", now,
)
return err
@@ -308,21 +312,22 @@ func (bt *basicTracker) Expire() error {
}
// Run job
// If either step fails, the final return is marked as failed.
func (bt *basicTracker) Run() error {
return bt.compareAndSet(RunningStatus)
}
bt.refresh(RunningStatus)
err := bt.fireHookEvent(RunningStatus)
err = bt.compareAndSet(RunningStatus)
// Schedule job
func (bt *basicTracker) Schedule() error {
return bt.compareAndSet(ScheduledStatus)
return err
}
// Stop job
// Stop is a final status; if the update fails, a retry is enforced.
// If either step fails, the final return is marked as failed.
func (bt *basicTracker) Stop() error {
err := bt.fireHook(StoppedStatus)
err = bt.updateStatusWithRetry(StoppedStatus)
bt.refresh(StoppedStatus)
err := bt.fireHookEvent(StoppedStatus)
err = bt.UpdateStatusWithRetry(StoppedStatus)
return err
}
@@ -331,8 +336,9 @@ func (bt *basicTracker) Stop() error {
// Fail is a final status; if the update fails, a retry is enforced.
// If either step fails, the final return is marked as failed.
func (bt *basicTracker) Fail() error {
err := bt.fireHook(ErrorStatus)
err = bt.updateStatusWithRetry(ErrorStatus)
bt.refresh(ErrorStatus)
err := bt.fireHookEvent(ErrorStatus)
err = bt.UpdateStatusWithRetry(ErrorStatus)
return err
}
@@ -341,8 +347,9 @@ func (bt *basicTracker) Fail() error {
// Succeed is a final status; if the update fails, a retry is enforced.
// If either step fails, the final return is marked as failed.
func (bt *basicTracker) Succeed() error {
err := bt.fireHook(SuccessStatus)
err = bt.updateStatusWithRetry(SuccessStatus)
bt.refresh(SuccessStatus)
err := bt.fireHookEvent(SuccessStatus)
err = bt.UpdateStatusWithRetry(SuccessStatus)
return err
}
@@ -433,8 +440,38 @@ func (bt *basicTracker) Save() (err error) {
return
}
// Fire the hook event
func (bt *basicTracker) fireHook(status Status, checkIn ...string) error {
// UpdateStatusWithRetry updates the status with retry enabled.
// If the status update fails, it is retried when permitted.
// This is a best-effort operation.
func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
err := bt.compareAndSet(targetStatus)
if err != nil {
// Push to the retry queue
if er := bt.pushToQueueForRetry(targetStatus); er != nil {
logger.Errorf("push job status update request to retry queue error: %s", er)
// If pushing to the retry queue fails as well, downgrade to retrying in the current process
// via recursive calls in goroutines.
bt.retryUpdateStatus(targetStatus)
}
}
return err
}
// Refresh the job stats kept in memory
func (bt *basicTracker) refresh(targetStatus Status, checkIn ...string) {
now := time.Now().Unix()
bt.jobStats.Info.Status = targetStatus.String()
if len(checkIn) > 0 {
bt.jobStats.Info.CheckIn = checkIn[0]
bt.jobStats.Info.CheckInAt = now
}
bt.jobStats.Info.UpdateTime = now
}
// fireHookEvent fires the hook event
func (bt *basicTracker) fireHookEvent(status Status, checkIn ...string) error {
// Check if hook URL is registered
if utils.IsEmptyStr(bt.jobStats.Info.WebHookURL) {
// Do nothing
@@ -459,31 +496,47 @@ func (bt *basicTracker) fireHook(status Status, checkIn ...string) error {
return nil
}
// If update status failed, then retry if permitted.
// Try best to do
func (bt *basicTracker) updateStatusWithRetry(targetStatus Status) error {
err := bt.compareAndSet(targetStatus)
if err != nil {
// If still need to retry
// Check the update timestamp
if time.Now().Unix()-bt.jobStats.Info.UpdateTime < 2*24*3600 {
// Keep on retrying
go func() {
select {
case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
if err := bt.updateStatusWithRetry(targetStatus); err != nil {
logger.Errorf("Retry of updating status of job %s error: %s", bt.jobID, err)
}
case <-bt.context.Done():
return // terminated
}
}()
}
func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error {
simpleStatusChange := &SimpleStatusChange{
JobID: bt.jobID,
TargetStatus: targetStatus.String(),
}
rawJSON, err := json.Marshal(simpleStatusChange)
if err != nil {
return err
}
conn := bt.pool.Get()
defer conn.Close()
key := rds.KeyStatusUpdateRetryQueue(bt.namespace)
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
_, err = conn.Do("ZADD", args...)
return err
}
func (bt *basicTracker) retryUpdateStatus(targetStatus Status) {
go func() {
select {
case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
// Check the update timestamp
if time.Now().Unix()-bt.jobStats.Info.UpdateTime < statDataExpireTime-24*3600 {
if err := bt.compareAndSet(targetStatus); err != nil {
logger.Errorf("Retry to update job status error: %s", err)
bt.retryUpdateStatus(targetStatus)
}
// Success
}
return
case <-bt.context.Done():
return // terminated
}
}()
}
func (bt *basicTracker) compareAndSet(targetStatus Status) error {
conn := bt.pool.Get()
defer conn.Close()
@@ -495,9 +548,14 @@ func (bt *basicTracker) compareAndSet(targetStatus Status) error {
return err
}
if st.Compare(targetStatus) >= 0 {
diff := st.Compare(targetStatus)
if diff > 0 {
return fmt.Errorf("mismatch job status: current %s, setting to %s", st, targetStatus)
}
if diff == 0 {
// Desired matches actual
return nil
}
return setStatus(conn, rootKey, targetStatus)
}
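A side note on pushToQueueForRetry above: because ZADD is called with the NX flag, re-pushing the same serialized status change does not overwrite its score, so the entry keeps its original enqueue timestamp and the restore loop still pops it in first-enqueued order. A minimal sketch of that behavior, assuming a local Redis and a hypothetical key/member:

package main

import (
	"fmt"

	"github.com/gomodule/redigo/redis"
)

func main() {
	conn, err := redis.DialURL("redis://localhost:6379") // assumed local Redis
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	key := "demo:status_change_events" // hypothetical key

	// The first ZADD NX inserts the member with score 100.
	added, _ := redis.Int(conn.Do("ZADD", key, "NX", 100, "job-1"))
	// A second ZADD NX with a later score is ignored because the member already exists.
	ignored, _ := redis.Int(conn.Do("ZADD", key, "NX", 200, "job-1"))

	score, _ := redis.Int(conn.Do("ZSCORE", key, "job-1"))
	fmt.Println(added, ignored, score) // 1 0 100: the original score is kept
}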

View File

@@ -16,13 +16,27 @@ package lcm
import (
"context"
"encoding/json"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"sync"
"time"
)
const (
shortLoopInterval = 5 * time.Second
longLoopInterval = 5 * time.Minute
)
// Controller is designed to control the life cycle of the job
type Controller interface {
// Run daemon process if needed
Serve() error
// New tracker from the new provided stats
New(stats *job.Stats) (job.Tracker, error)
@@ -36,18 +50,28 @@ type basicController struct {
namespace string
pool *redis.Pool
callback job.HookCallback
wg *sync.WaitGroup
}
// NewController is the constructor of basic controller
func NewController(ctx context.Context, ns string, pool *redis.Pool, callback job.HookCallback) Controller {
func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.HookCallback) Controller {
return &basicController{
context: ctx,
context: ctx.SystemContext,
namespace: ns,
pool: pool,
callback: callback,
wg: ctx.WG,
}
}
// Serve ...
func (bc *basicController) Serve() error {
go bc.loopForRestoreDeadStatus()
logger.Info("Status restoring loop is started")
return nil
}
// New tracker
func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
if stats == nil {
@@ -75,3 +99,73 @@ func (bc *basicController) Track(jobID string) (job.Tracker, error) {
return bt, nil
}
// loopForRestoreDeadStatus is a loop that restores the dead status changes of jobs
func (bc *basicController) loopForRestoreDeadStatus() {
defer func() {
logger.Info("Status restoring loop is stopped")
bc.wg.Done()
}()
token := make(chan bool, 1)
token <- true
bc.wg.Add(1)
for {
<-token
if err := bc.restoreDeadStatus(); err != nil {
wait := shortLoopInterval
if err == redis.ErrNil {
// No elements
wait = longLoopInterval
}
// wait for a while or be terminated
select {
case <-time.After(wait):
case <-bc.context.Done():
return
}
}
// Return token
token <- true
}
}
// restoreDeadStatus tries to restore the dead status
func (bc *basicController) restoreDeadStatus() error {
// Get one
deadOne, err := bc.popOneDead()
if err != nil {
return err
}
// Try to update status
t, err := bc.Track(deadOne.JobID)
if err != nil {
return err
}
return t.UpdateStatusWithRetry(job.Status(deadOne.TargetStatus))
}
// popOneDead retrieves one dead status change from the backend queue, lowest score first
func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
conn := bc.pool.Get()
defer conn.Close()
key := rds.KeyStatusUpdateRetryQueue(bc.namespace)
v, err := rds.ZPopMin(conn, key)
if err != nil {
return nil, err
}
if bytes, ok := v.([]byte); ok {
ssc := &job.SimpleStatusChange{}
if err := json.Unmarshal(bytes, ssc); err == nil {
return ssc, nil
}
}
return nil, errors.New("pop one dead error: bad result reply")
}
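loopForRestoreDeadStatus serializes its restore attempts with a one-slot token channel and backs off longer when the queue is empty (redis.ErrNil). A stripped-down sketch of the same loop pattern, independent of the Harbor types and with made-up wait values and a fake queue:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNoElements = errors.New("no elements") // stands in for redis.ErrNil

// popOne drains a fake queue; it returns errNoElements once the queue is empty.
func popOne(remaining *int) error {
	if *remaining == 0 {
		return errNoElements
	}
	*remaining--
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// One-slot token channel: only one restore attempt runs at a time.
	token := make(chan bool, 1)
	token <- true

	remaining := 3
	for {
		<-token
		if err := popOne(&remaining); err != nil {
			wait := 100 * time.Millisecond // short wait after a real error
			if errors.Is(err, errNoElements) {
				wait = 500 * time.Millisecond // longer wait when the queue is empty
			}
			select {
			case <-time.After(wait):
			case <-ctx.Done():
				fmt.Println("loop terminated")
				return
			}
		}
		// Hand the token back for the next iteration.
		token <- true
	}
}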

View File

@@ -173,9 +173,12 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
}
// Remove from the redis DB in a transactional way
conn.Send("MULTI")
conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
err = conn.Send("MULTI")
err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
if err != nil {
return err
}
_, err = conn.Do("EXEC")
if err != nil {
return err

View File

@@ -115,7 +115,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
}
// Create job life cycle management controller
lcmCtl = lcm.NewController(ctx, namespace, redisPool, hookCallback)
lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
// Start the backend worker
backendWorker, wErr = bs.loadAndRunRedisWorkerPool(rootContext, namespace, workerNum, redisPool, lcmCtl)
@@ -123,6 +123,10 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
logger.Fatalf("Failed to load and run worker: %s\n", wErr.Error())
}
// Run daemon process of life cycle controller
// Ignore returned error
lcmCtl.Serve()
// Start agent
// Non-blocking call
hookAgent.Serve()

View File

@@ -195,7 +195,7 @@ func (w *basicWorker) Enqueue(jobName string, params job.Parameters, isUnique bo
return nil, fmt.Errorf("job '%s' can not be enqueued, please check the job metadata", jobName)
}
return generateResult(j, job.KindGeneric, isUnique, params), nil
return generateResult(j, job.KindGeneric, isUnique, params, webHook), nil
}
// Schedule job
@@ -225,7 +225,7 @@ func (w *basicWorker) Schedule(jobName string, params job.Parameters, runAfterSe
return nil, fmt.Errorf("job '%s' can not be enqueued, please check the job metadata", jobName)
}
res := generateResult(j.Job, job.KindScheduled, isUnique, params)
res := generateResult(j.Job, job.KindScheduled, isUnique, params, webHook)
res.Info.RunAt = j.RunAt
res.Info.Status = job.ScheduledStatus.String()
@@ -468,7 +468,13 @@ func (w *basicWorker) ping() error {
}
// generate the job stats data
func generateResult(j *work.Job, jobKind string, isUnique bool, jobParameters job.Parameters) *job.Stats {
func generateResult(
j *work.Job,
jobKind string,
isUnique bool,
jobParameters job.Parameters,
webHook string,
) *job.Stats {
return &job.Stats{
Info: &job.StatsInfo{
JobID: j.ID,
@@ -480,6 +486,7 @@ func generateResult(j *work.Job, jobKind string, isUnique bool, jobParameters jo
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID),
Parameters: jobParameters,
WebHookURL: webHook,
},
}
}