diff --git a/src/jobservice/common/rds/keys.go b/src/jobservice/common/rds/keys.go
index fde3e745a..dbb73c547 100644
--- a/src/jobservice/common/rds/keys.go
+++ b/src/jobservice/common/rds/keys.go
@@ -83,3 +83,8 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
 func KeyHookEventRetryQueue(namespace string) string {
 	return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "hook_events")
 }
+
+// KeyStatusUpdateRetryQueue returns the key of status change retrying queue
+func KeyStatusUpdateRetryQueue(namespace string) string {
+	return fmt.Sprintf("%s:%s", KeyNamespacePrefix(namespace), "status_change_events")
+}
diff --git a/src/jobservice/common/rds/utils.go b/src/jobservice/common/rds/utils.go
index d73388a33..7157986e8 100644
--- a/src/jobservice/common/rds/utils.go
+++ b/src/jobservice/common/rds/utils.go
@@ -118,3 +118,40 @@ func ReleaseLock(conn redis.Conn, lockerKey string, lockerID string) error {
 
 	return errors.New("locker ID mismatch")
 }
+
+// ZPopMin pops the element with lowest score in the zset.
+// The read (ZRANGE) and the removal (ZREMRANGEBYRANK) are queued in one
+// MULTI/EXEC transaction. redis.ErrNil is returned if the zset is empty.
+func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
+	// Check the result of every queued command, not only of the last one
+	if err := conn.Send("MULTI"); err != nil {
+		return nil, err
+	}
+	if err := conn.Send("ZRANGE", key, 0, 0); err != nil { // lowest one
+		return nil, err
+	}
+	if err := conn.Send("ZREMRANGEBYRANK", key, 0, 0); err != nil {
+		return nil, err
+	}
+
+	replies, err := redis.Values(conn.Do("EXEC"))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(replies) < 2 {
+		return nil, errors.Errorf("zpopmin error: not enough results returned, expected %d but got %d", 2, len(replies))
+	}
+
+	// The first reply is the one of ZRANGE
+	elements, ok := replies[0].([]interface{})
+	if !ok {
+		return nil, errors.New("zpopmin error: bad result reply")
+	}
+	if len(elements) == 0 {
+		// Nothing to pop
+		return nil, redis.ErrNil
+	}
+
+	return elements[0], nil
+}
diff --git a/src/jobservice/common/rds/utils_test.go b/src/jobservice/common/rds/utils_test.go
new file mode 100644
index 000000000..8fbd9164b
--- /dev/null
+++ 
b/src/jobservice/common/rds/utils_test.go
@@ -0,0 +1,75 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rds
+
+import (
+	"encoding/json"
+	"github.com/goharbor/harbor/src/jobservice/tests"
+	"testing"
+	"time"
+)
+
+var (
+	pool      = tests.GiveMeRedisPool()
+	namespace = tests.GiveMeTestNamespace()
+)
+
+// simpleStatusChange is a minimal payload used for testing
+type simpleStatusChange struct {
+	JobID string
+}
+
+// TestZPopMin pushes two elements with ascending scores to the zset and
+// expects ZPopMin to return them lowest score first.
+func TestZPopMin(t *testing.T) {
+	conn := pool.Get()
+	defer conn.Close()
+
+	key := KeyStatusUpdateRetryQueue(namespace)
+	ids := []string{"a", "b"}
+
+	// Enqueue the elements with ascending scores
+	now := time.Now().Unix()
+	for i, id := range ids {
+		raw, err := json.Marshal(&simpleStatusChange{id})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if _, err := conn.Do("ZADD", key, now+int64(i)*5, raw); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// They must be popped in the same order, lowest score first
+	for _, id := range ids {
+		v, err := ZPopMin(conn, key)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		raw, ok := v.([]byte)
+		if !ok {
+			t.Fatalf("expect reply of type []byte but got %T", v)
+		}
+
+		change := &simpleStatusChange{}
+		if err := json.Unmarshal(raw, change); err != nil {
+			t.Fatal(err)
+		}
+		if change.JobID != id {
+			t.Errorf("expect min element '%s' but got '%s'", id, change.JobID)
+		}
+	}
+}
diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go
index c06ebad6a..16c4ad708 100644
--- a/src/jobservice/job/models.go
+++ 
b/src/jobservice/job/models.go
@@ -82,6 +82,12 @@ type StatusChange struct {
 	Metadata *StatsInfo `json:"metadata,omitempty"`
 }
 
+// SimpleStatusChange only keeps job ID and the target status
+type SimpleStatusChange struct {
+	JobID        string `json:"job_id"`
+	TargetStatus string `json:"target_status"`
+}
+
 // Validate the job stats
 func (st *Stats) Validate() error {
 	if st.Info == nil {
diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go
index 807801565..5e6aa1fe7 100644
--- a/src/jobservice/job/tracker.go
+++ b/src/jobservice/job/tracker.go
@@ -87,6 +87,9 @@ type Tracker interface {
 	// Check in message
 	CheckIn(message string) error
 
+	// Update status with retry enabled
+	UpdateStatusWithRetry(targetStatus Status) error
+
 	// The current status of job
 	Status() (Status, error)
 
@@ -96,9 +99,6 @@ type Tracker interface {
 	// Switch status to running
 	Run() error
 
-	// Switch status to scheduled
-	Schedule() error
-
 	// Switch status to stopped
 	Stop() error
 
@@ -222,11 +222,15 @@ func (bt *basicTracker) CheckIn(message string) error {
 		return errors.New("check in error: empty message")
 	}
 
-	err := bt.fireHook(Status(bt.jobStats.Info.Status), message)
+	now := time.Now().Unix()
+	current := Status(bt.jobStats.Info.Status)
+
+	bt.refresh(current, message)
+	err := bt.fireHookEvent(current, message)
 	err = bt.Update(
 		"check_in", message,
-		"check_in_at", time.Now().Unix(),
-		"update_time", time.Now().Unix(),
+		"check_in_at", now,
+		"update_time", now,
 	)
 
 	return err
@@ -308,21 +312,28 @@ func (bt *basicTracker) Expire() error {
 }
 
 // Run job
+// If either firing the hook event or changing the status fails, an error is returned.
 func (bt *basicTracker) Run() error {
-	return bt.compareAndSet(RunningStatus)
-}
+	bt.refresh(RunningStatus)
+	hookErr := bt.fireHookEvent(RunningStatus)
+	if err := bt.compareAndSet(RunningStatus); err != nil {
+		// The status change error takes precedence over the hook one
+		return err
+	}
 
-// Schedule job
-func (bt *basicTracker) Schedule() error {
-	return bt.compareAndSet(ScheduledStatus)
+	return hookErr
 }
 
 // Stop job
 // Stop is final status, if failed to do, retry should be enforced.
 // Either one is failed, the final return will be marked as failed.
 func (bt *basicTracker) Stop() error {
-	err := bt.fireHook(StoppedStatus)
-	err = bt.updateStatusWithRetry(StoppedStatus)
+	bt.refresh(StoppedStatus)
+	hookErr := bt.fireHookEvent(StoppedStatus)
+	if err := bt.UpdateStatusWithRetry(StoppedStatus); err != nil {
+		// The status change error takes precedence over the hook one
+		return err
+	}
 
-	return err
+	return hookErr
 }
 
@@ -331,8 +342,12 @@ func (bt *basicTracker) Stop() error {
 // Fail is final status, if failed to do, retry should be enforced.
 // Either one is failed, the final return will be marked as failed.
 func (bt *basicTracker) Fail() error {
-	err := bt.fireHook(ErrorStatus)
-	err = bt.updateStatusWithRetry(ErrorStatus)
+	bt.refresh(ErrorStatus)
+	hookErr := bt.fireHookEvent(ErrorStatus)
+	if err := bt.UpdateStatusWithRetry(ErrorStatus); err != nil {
+		// The status change error takes precedence over the hook one
+		return err
+	}
 
-	return err
+	return hookErr
 }
 
@@ -341,8 +356,12 @@ func (bt *basicTracker) Fail() error {
 // Succeed is final status, if failed to do, retry should be enforced.
 // Either one is failed, the final return will be marked as failed.
 func (bt *basicTracker) Succeed() error {
-	err := bt.fireHook(SuccessStatus)
-	err = bt.updateStatusWithRetry(SuccessStatus)
+	bt.refresh(SuccessStatus)
+	hookErr := bt.fireHookEvent(SuccessStatus)
+	if err := bt.UpdateStatusWithRetry(SuccessStatus); err != nil {
+		// The status change error takes precedence over the hook one
+		return err
+	}
 
-	return err
+	return hookErr
 }
 
@@ -433,8 +452,38 @@ func (bt *basicTracker) Save() (err error) {
 	return
 }
 
-// Fire the hook event
-func (bt *basicTracker) fireHook(status Status, checkIn ...string) error {
+// UpdateStatusWithRetry updates the status with retry enabled.
+// If update status failed, then retry if permitted.
+// It is a best-effort operation: the error of the first attempt is returned as is.
+func (bt *basicTracker) UpdateStatusWithRetry(targetStatus Status) error {
+	err := bt.compareAndSet(targetStatus)
+	if err != nil {
+		// Push the request to the retrying queue so that it can be handled later
+		if er := bt.pushToQueueForRetry(targetStatus); er != nil {
+			logger.Errorf("push job status update request to retry queue error: %s", er)
+			// If the request cannot be put into the retrying queue, downgrade to retrying in the
+			// current process by recursive calls in goroutines.
+			bt.retryUpdateStatus(targetStatus)
+		}
+	}
+
+	return err
+}
+
+// refresh updates the in-memory job stats: status, update time and, if provided, the check-in data
+func (bt *basicTracker) refresh(targetStatus Status, checkIn ...string) {
+	now := time.Now().Unix()
+
+	bt.jobStats.Info.Status = targetStatus.String()
+	if len(checkIn) > 0 {
+		bt.jobStats.Info.CheckIn = checkIn[0]
+		bt.jobStats.Info.CheckInAt = now
+	}
+	bt.jobStats.Info.UpdateTime = now
+}
+
+// fireHookEvent fires the hook event
+func (bt *basicTracker) fireHookEvent(status Status, checkIn ...string) error {
 	// Check if hook URL is registered
 	if utils.IsEmptyStr(bt.jobStats.Info.WebHookURL) {
 		// Do nothing
@@ -459,31 +496,47 @@ func (bt *basicTracker) fireHook(status Status, checkIn ...string) error {
 	return nil
 }
 
-// If update status failed, then retry if permitted.
-// Try best to do
-func (bt *basicTracker) updateStatusWithRetry(targetStatus Status) error {
-	err := bt.compareAndSet(targetStatus)
-	if err != nil {
-		// If still need to retry
-		// Check the update timestamp
-		if time.Now().Unix()-bt.jobStats.Info.UpdateTime < 2*24*3600 {
-			// Keep on retrying
-			go func() {
-				select {
-				case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
-					if err := bt.updateStatusWithRetry(targetStatus); err != nil {
-						logger.Errorf("Retry of updating status of job %s error: %s", bt.jobID, err)
-					}
-				case <-bt.context.Done():
-					return // terminated
-				}
-			}()
-		}
+func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error {
+	simpleStatusChange := &SimpleStatusChange{
+		JobID:        bt.jobID,
+		TargetStatus: targetStatus.String(),
 	}
 
+	rawJSON, err := json.Marshal(simpleStatusChange)
+	if err != nil {
+		return err
+	}
+
+	conn := bt.pool.Get()
+	defer conn.Close()
+
+	key := rds.KeyStatusUpdateRetryQueue(bt.namespace)
+	args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
+
+	_, err = conn.Do("ZADD", args...)
+
 	return err
 }
 
+func (bt *basicTracker) retryUpdateStatus(targetStatus Status) {
+	go func() {
+		select {
+		case <-time.After(time.Duration(5)*time.Minute + time.Duration(rand.Int31n(13))*time.Second):
+			// Check the update timestamp
+			if time.Now().Unix()-bt.jobStats.Info.UpdateTime < statDataExpireTime-24*3600 {
+				if err := bt.compareAndSet(targetStatus); err != nil {
+					logger.Errorf("Retry to update job status error: %s", err)
+					bt.retryUpdateStatus(targetStatus)
+				}
+				// Success
+			}
+			return
+		case <-bt.context.Done():
+			return // terminated
+		}
+	}()
+}
+
 func (bt *basicTracker) compareAndSet(targetStatus Status) error {
 	conn := bt.pool.Get()
 	defer conn.Close()
@@ -495,9 +548,14 @@ func (bt *basicTracker) compareAndSet(targetStatus Status) error {
 		return err
 	}
 
-	if st.Compare(targetStatus) >= 0 {
+	diff := st.Compare(targetStatus)
+	if diff > 0 {
 		return fmt.Errorf("mismatch job status: current %s, setting to %s", st, targetStatus)
 	}
+	if diff == 0 {
+		// Desired matches actual
+		return nil
+	}
 
 	return setStatus(conn, rootKey, targetStatus)
 }
diff --git a/src/jobservice/lcm/controller.go b/src/jobservice/lcm/controller.go
index 2b9c9466d..f5db8f845 100644
--- a/src/jobservice/lcm/controller.go
+++ b/src/jobservice/lcm/controller.go
@@ -16,13 +16,27 @@ package lcm
 
 import (
 	"context"
+	"encoding/json"
+	"github.com/goharbor/harbor/src/jobservice/common/rds"
+	"github.com/goharbor/harbor/src/jobservice/env"
 	"github.com/goharbor/harbor/src/jobservice/job"
+	"github.com/goharbor/harbor/src/jobservice/logger"
 	"github.com/gomodule/redigo/redis"
 	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+const (
+	shortLoopInterval = 5 * time.Second
+	longLoopInterval  = 5 * time.Minute
 )
 
 // Controller is designed to control the life cycle of the job
 type Controller interface {
+	// Run daemon process if needed
+	Serve() error
+
 	// New tracker from the new provided stats
 	New(stats *job.Stats) (job.Tracker, error)
 
@@ -36,18 +50,32 @@ type basicController struct {
 	namespace string
 	pool      *redis.Pool
 	callback  job.HookCallback
+	wg        *sync.WaitGroup
 }
 
 // NewController is the constructor of basic controller
-func NewController(ctx context.Context, ns string, pool *redis.Pool, callback job.HookCallback) Controller {
+func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.HookCallback) Controller {
 	return &basicController{
-		context:   ctx,
+		context:   ctx.SystemContext,
 		namespace: ns,
 		pool:      pool,
 		callback:  callback,
+		wg:        ctx.WG,
 	}
 }
 
+// Serve starts the background loop which restores the dead job statuses.
+// It does not block and always returns nil.
+func (bc *basicController) Serve() error {
+	// Add must happen before the goroutine starts, otherwise it may race
+	// with a concurrent Wait on the same wait group.
+	bc.wg.Add(1)
+	go bc.loopForRestoreDeadStatus()
+	logger.Info("Status restoring loop is started")
+
+	return nil
+}
+
 // New tracker
 func (bc *basicController) New(stats *job.Stats) (job.Tracker, error) {
 	if stats == nil {
@@ -75,3 +103,74 @@ func (bc *basicController) Track(jobID string) (job.Tracker, error) {
 
 	return bt, nil
 }
+
+// loopForRestoreDeadStatus is a loop to restore the dead states of jobs.
+// It runs until the system context is done and then marks itself as done
+// in the wait group, so the caller has to call wg.Add(1) before starting it.
+func (bc *basicController) loopForRestoreDeadStatus() {
+	defer func() {
+		logger.Info("Status restoring loop is stopped")
+		bc.wg.Done()
+	}()
+
+	for {
+		if err := bc.restoreDeadStatus(); err != nil {
+			wait := shortLoopInterval
+			if err == redis.ErrNil {
+				// No elements
+				wait = longLoopInterval
+			}
+			// wait for a while or be terminated
+			select {
+			case <-time.After(wait):
+			case <-bc.context.Done():
+				return
+			}
+		}
+
+		// Also check the termination signal when no waiting happened,
+		// otherwise a long backlog would delay the exit of the loop.
+		select {
+		case <-bc.context.Done():
+			return
+		default:
+		}
+	}
+}
+
+// restoreDeadStatus pops one dead status from the retrying queue and tries to apply it again
+func (bc *basicController) restoreDeadStatus() error {
+	// Get one
+	deadOne, err := bc.popOneDead()
+	if err != nil {
+		return err
+	}
+	// Try to update status
+	t, err := bc.Track(deadOne.JobID)
+	if err != nil {
+		return err
+	}
+
+	return t.UpdateStatusWithRetry(job.Status(deadOne.TargetStatus))
+}
+
+// popOneDead retrieves the dead status with the lowest score from the backend queue
+func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
+	conn := bc.pool.Get()
+	defer conn.Close()
+
+	key := rds.KeyStatusUpdateRetryQueue(bc.namespace)
+	v, err := rds.ZPopMin(conn, key)
+	if err != nil {
+		return nil, err
+	}
+
+	if bytes, ok := v.([]byte); ok {
+		ssc := &job.SimpleStatusChange{}
+		if err := json.Unmarshal(bytes, ssc); err == nil {
+			return ssc, nil
+		}
+	}
+
+	return nil, errors.New("pop one dead error: bad result reply")
+}
diff --git a/src/jobservice/period/basic_scheduler.go b/src/jobservice/period/basic_scheduler.go
index a1312910b..5503a7130 100644
--- a/src/jobservice/period/basic_scheduler.go
+++ b/src/jobservice/period/basic_scheduler.go
@@ -173,9 +173,12 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
 	}
 
 	// REM from redis db with transaction way
-	conn.Send("MULTI")
-	conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
-	conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
+	err = conn.Send("MULTI")
+	err = conn.Send("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID) // Accurately remove the item with the specified score
+	err = conn.Send("PUBLISH", rds.KeyPeriodicNotification(bs.namespace), msgJSON)
+	if err != nil {
+		return err
+	}
 	_, err = conn.Do("EXEC")
 	if err != nil {
 		return err
diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go
index d66752882..856143e0c 100644
--- a/src/jobservice/runtime/bootstrap.go
+++ b/src/jobservice/runtime/bootstrap.go
@@ -115,7 +115,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
 		}
 
 		// Create job life cycle management controller
-		lcmCtl = lcm.NewController(ctx, namespace, redisPool, hookCallback)
+		lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
 
 		// Start the backend worker
 		backendWorker, wErr = bs.loadAndRunRedisWorkerPool(rootContext, namespace, workerNum, redisPool, lcmCtl)
@@ -123,6 +123,10 @@ func (bs 
*Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) logger.Fatalf("Failed to load and run worker: %s\n", wErr.Error()) } + // Run daemon process of life cycle controller + // Ignore returned error + lcmCtl.Serve() + // Start agent // Non blocking call hookAgent.Serve() diff --git a/src/jobservice/worker/cworker/redis_pool.go b/src/jobservice/worker/cworker/redis_pool.go index badb2e640..591db568d 100644 --- a/src/jobservice/worker/cworker/redis_pool.go +++ b/src/jobservice/worker/cworker/redis_pool.go @@ -195,7 +195,7 @@ func (w *basicWorker) Enqueue(jobName string, params job.Parameters, isUnique bo return nil, fmt.Errorf("job '%s' can not be enqueued, please check the job metatdata", jobName) } - return generateResult(j, job.KindGeneric, isUnique, params), nil + return generateResult(j, job.KindGeneric, isUnique, params, webHook), nil } // Schedule job @@ -225,7 +225,7 @@ func (w *basicWorker) Schedule(jobName string, params job.Parameters, runAfterSe return nil, fmt.Errorf("job '%s' can not be enqueued, please check the job metatdata", jobName) } - res := generateResult(j.Job, job.KindScheduled, isUnique, params) + res := generateResult(j.Job, job.KindScheduled, isUnique, params, webHook) res.Info.RunAt = j.RunAt res.Info.Status = job.ScheduledStatus.String() @@ -468,7 +468,13 @@ func (w *basicWorker) ping() error { } // generate the job stats data -func generateResult(j *work.Job, jobKind string, isUnique bool, jobParameters job.Parameters) *job.Stats { +func generateResult( + j *work.Job, + jobKind string, + isUnique bool, + jobParameters job.Parameters, + webHook string, +) *job.Stats { return &job.Stats{ Info: &job.StatsInfo{ JobID: j.ID, @@ -480,6 +486,7 @@ func generateResult(j *work.Job, jobKind string, isUnique bool, jobParameters jo UpdateTime: time.Now().Unix(), RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID), Parameters: jobParameters, + WebHookURL: webHook, }, } }