diff --git a/src/jobservice/job/impl/demo_job.go b/src/jobservice/job/impl/demo_job.go index 15a6a0bf1..603c937de 100644 --- a/src/jobservice/job/impl/demo_job.go +++ b/src/jobservice/job/impl/demo_job.go @@ -20,11 +20,8 @@ import ( "strings" "time" - "github.com/goharbor/harbor/src/common/job" - "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/errs" - "github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/opm" ) @@ -102,7 +99,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error return errs.JobStoppedError() } - fmt.Println("Launch sub job") + /*fmt.Println("Launch sub job") jobParams := make(map[string]interface{}) jobParams["image"] = "demo:1.7" subDemoJob := models.JobRequest{ @@ -115,16 +112,15 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error }, } - for i := 0; i < 5; i++ { - subJob, err := ctx.LaunchJob(subDemoJob) - if err != nil { - fmt.Printf("Create sub job failed with error: %s\n", err) - logger.Error(err) - } - - fmt.Printf("Sub job: %v", subJob) + subJob, err := ctx.LaunchJob(subDemoJob) + if err != nil { + fmt.Printf("Create sub job failed with error: %s\n", err) + logger.Error(err) + return } + fmt.Printf("Sub job: %v", subJob)*/ + fmt.Println("I'm close to end") return nil diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index aa9f6ddc6..801840d6b 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -15,6 +15,7 @@ package period import ( + "errors" "fmt" "math/rand" "time" @@ -77,6 +78,8 @@ func (pe *periodicEnqueuer) loop() { if err != nil { logger.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err) } + } else { + logger.Debug("Enqueue condition not matched, do nothing.") } for { @@ -91,6 +94,8 @@ func (pe *periodicEnqueuer) loop() { if err != nil { logger.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err) } + } else { + logger.Debug("Enqueue condition not matched, do nothing.") } } } @@ -98,12 +103,32 @@ func (pe *periodicEnqueuer) loop() { func (pe *periodicEnqueuer) enqueue() error { now := time.Now().Unix() - nowTime := time.Unix(now, 0) - horizon := nowTime.Add(periodicEnqueuerHorizon) conn := pe.pool.Get() defer conn.Close() + // Set last periodic enqueue timestamp in advance to avoid duplicated enqueue actions + if _, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now); err != nil { + return err + } + + // Avoid schedule in the same time. + lockerKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock") + lockerID := utils.MakeIdentifier() + + // Acquire a locker with 30s expiring time + if err := acquireLock(conn, lockerKey, lockerID, 30); err != nil { + return err + } + defer func() { + if err := releaseLock(conn, lockerKey, lockerID); err != nil { + logger.Errorf("release enqueue locker failed: %s", err) + } + }() + + nowTime := time.Unix(now, 0) + horizon := nowTime.Add(periodicEnqueuerHorizon) + for _, pl := range pe.policyStore.list() { schedule, err := cron.Parse(pl.CronSpec) if err != nil { @@ -136,6 +161,16 @@ func (pe *periodicEnqueuer) enqueue() error { return err } + // Place the time slots for the job (policy) + // If the slot is already there, error will be returned. + expireTime := (epoch - nowTime.Unix()) + 5 + slot := fmt.Sprintf("%s:%s@%d", utils.KeyPeriodicJobTimeSlots(pe.namespace), pl.PolicyID, epoch) + if err := placeSlot(conn, slot, epoch, expireTime); err != nil { + // Logged and continue + logger.Errorf("Failed to place time slot '%s@%d': %s", pl.PolicyID, epoch, err) + continue + } + _, err = conn.Do("ZADD", utils.RedisKeyScheduled(pe.namespace), epoch, rawJSON) if err != nil { return err @@ -171,9 +206,7 @@ func (pe *periodicEnqueuer) enqueue() error { conn.Do("HMSET", utils.KeyJobStats(pe.namespace, pl.PolicyID), "status", job.JobStatusScheduled, "update_time", time.Now().Unix()) } - _, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now) - - return err + return nil } func (pe *periodicEnqueuer) createExecution(upstreamJobID, upstreamJobName, executionID string, runAt int64) { @@ -208,3 +241,35 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool { return lastEnqueue < (time.Now().Unix() - int64(periodicEnqueuerSleep/time.Minute)) } + +func placeSlot(conn redis.Conn, key string, value interface{}, expireTime int64) error { + args := []interface{}{key, value, "NX", "EX", expireTime} + res, err := conn.Do("SET", args...) + if err != nil { + return err + } + // Existing, the value can not be overrid + if res == nil { + return fmt.Errorf("key %s is already set with value %v", key, value) + } + + return nil +} + +func acquireLock(conn redis.Conn, lockerKey string, lockerID string, expireTime int64) error { + return placeSlot(conn, lockerKey, lockerID, expireTime) +} + +func releaseLock(conn redis.Conn, lockerKey string, lockerID string) error { + theID, err := redis.String(conn.Do("GET", lockerKey)) + if err != nil { + return err + } + + if theID == lockerID { + _, err := conn.Do("DEL", lockerKey) + return err + } + + return errors.New("locker ID mismatch") +} diff --git a/src/jobservice/period/enqueuer_test.go b/src/jobservice/period/enqueuer_test.go index 9fd097de4..29e1d0689 100644 --- a/src/jobservice/period/enqueuer_test.go +++ b/src/jobservice/period/enqueuer_test.go @@ -60,10 +60,18 @@ func TestEnqueue(t *testing.T) { t.Error(err) } + if err := clear(ns); err != nil { + t.Error(err) + } +} + +func clear(ns string) error { err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get()) err = tests.Clear(utils.KeyJobStats(ns, "fake_ID"), redisPool.Get()) err = tests.Clear(utils.RedisKeyLastPeriodicEnqueue(ns), redisPool.Get()) if err != nil { - t.Error(err) + return err } + + return nil } diff --git a/src/jobservice/utils/keys.go b/src/jobservice/utils/keys.go index 43b2d0a38..b216c75a8 100644 --- a/src/jobservice/utils/keys.go +++ b/src/jobservice/utils/keys.go @@ -58,6 +58,11 @@ func KeyPeriodicPolicyScore(namespace string) string { return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "key_score") } +// KeyPeriodicJobTimeSlots returns the key of the time slots of scheduled jobs. +func KeyPeriodicJobTimeSlots(namespace string) string { + return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "scheduled_slots") +} + // KeyPeriodicNotification returns the key of periodic pub/sub channel. func KeyPeriodicNotification(namespace string) string { return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")