Merge pull request #6167 from steven-zou/fix_issue_of_duplicated_schedule

Fix the issue of duplicated periodic job scheduling
This commit is contained in:
Daniel Jiang 2018-10-29 15:56:03 +08:00 committed by GitHub
commit 4dc89f57f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 18 deletions

View File

@ -20,11 +20,8 @@ import (
"strings"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm"
)
@ -102,7 +99,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
return errs.JobStoppedError()
}
fmt.Println("Launch sub job")
/*fmt.Println("Launch sub job")
jobParams := make(map[string]interface{})
jobParams["image"] = "demo:1.7"
subDemoJob := models.JobRequest{
@ -115,16 +112,15 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
},
}
for i := 0; i < 5; i++ {
subJob, err := ctx.LaunchJob(subDemoJob)
if err != nil {
fmt.Printf("Create sub job failed with error: %s\n", err)
logger.Error(err)
}
fmt.Printf("Sub job: %v", subJob)
subJob, err := ctx.LaunchJob(subDemoJob)
if err != nil {
fmt.Printf("Create sub job failed with error: %s\n", err)
logger.Error(err)
return
}
fmt.Printf("Sub job: %v", subJob)*/
fmt.Println("I'm close to end")
return nil

View File

@ -15,6 +15,7 @@
package period
import (
"errors"
"fmt"
"math/rand"
"time"
@ -77,6 +78,8 @@ func (pe *periodicEnqueuer) loop() {
if err != nil {
logger.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err)
}
} else {
logger.Debug("Enqueue condition not matched, do nothing.")
}
for {
@ -91,6 +94,8 @@ func (pe *periodicEnqueuer) loop() {
if err != nil {
logger.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err)
}
} else {
logger.Debug("Enqueue condition not matched, do nothing.")
}
}
}
@ -98,12 +103,32 @@ func (pe *periodicEnqueuer) loop() {
func (pe *periodicEnqueuer) enqueue() error {
now := time.Now().Unix()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
conn := pe.pool.Get()
defer conn.Close()
// Set last periodic enqueue timestamp in advance to avoid duplicated enqueue actions
if _, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now); err != nil {
return err
}
// Avoid schedule in the same time.
lockerKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock")
lockerID := utils.MakeIdentifier()
// Acquire a locker with 30s expiring time
if err := acquireLock(conn, lockerKey, lockerID, 30); err != nil {
return err
}
defer func() {
if err := releaseLock(conn, lockerKey, lockerID); err != nil {
logger.Errorf("release enqueue locker failed: %s", err)
}
}()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
for _, pl := range pe.policyStore.list() {
schedule, err := cron.Parse(pl.CronSpec)
if err != nil {
@ -136,6 +161,16 @@ func (pe *periodicEnqueuer) enqueue() error {
return err
}
// Place the time slots for the job (policy)
// If the slot is already there, error will be returned.
expireTime := (epoch - nowTime.Unix()) + 5
slot := fmt.Sprintf("%s:%s@%d", utils.KeyPeriodicJobTimeSlots(pe.namespace), pl.PolicyID, epoch)
if err := placeSlot(conn, slot, epoch, expireTime); err != nil {
// Logged and continue
logger.Errorf("Failed to place time slot '%s@%d': %s", pl.PolicyID, epoch, err)
continue
}
_, err = conn.Do("ZADD", utils.RedisKeyScheduled(pe.namespace), epoch, rawJSON)
if err != nil {
return err
@ -171,9 +206,7 @@ func (pe *periodicEnqueuer) enqueue() error {
conn.Do("HMSET", utils.KeyJobStats(pe.namespace, pl.PolicyID), "status", job.JobStatusScheduled, "update_time", time.Now().Unix())
}
_, err := conn.Do("SET", utils.RedisKeyLastPeriodicEnqueue(pe.namespace), now)
return err
return nil
}
func (pe *periodicEnqueuer) createExecution(upstreamJobID, upstreamJobName, executionID string, runAt int64) {
@ -208,3 +241,35 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool {
return lastEnqueue < (time.Now().Unix() - int64(periodicEnqueuerSleep/time.Minute))
}
func placeSlot(conn redis.Conn, key string, value interface{}, expireTime int64) error {
args := []interface{}{key, value, "NX", "EX", expireTime}
res, err := conn.Do("SET", args...)
if err != nil {
return err
}
// Existing, the value can not be overrid
if res == nil {
return fmt.Errorf("key %s is already set with value %v", key, value)
}
return nil
}
func acquireLock(conn redis.Conn, lockerKey string, lockerID string, expireTime int64) error {
return placeSlot(conn, lockerKey, lockerID, expireTime)
}
func releaseLock(conn redis.Conn, lockerKey string, lockerID string) error {
theID, err := redis.String(conn.Do("GET", lockerKey))
if err != nil {
return err
}
if theID == lockerID {
_, err := conn.Do("DEL", lockerKey)
return err
}
return errors.New("locker ID mismatch")
}

View File

@ -60,10 +60,18 @@ func TestEnqueue(t *testing.T) {
t.Error(err)
}
if err := clear(ns); err != nil {
t.Error(err)
}
}
func clear(ns string) error {
err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get())
err = tests.Clear(utils.KeyJobStats(ns, "fake_ID"), redisPool.Get())
err = tests.Clear(utils.RedisKeyLastPeriodicEnqueue(ns), redisPool.Get())
if err != nil {
t.Error(err)
return err
}
return nil
}

View File

@ -58,6 +58,11 @@ func KeyPeriodicPolicyScore(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "key_score")
}
// KeyPeriodicJobTimeSlots returns the key of the time slots of scheduled jobs.
func KeyPeriodicJobTimeSlots(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "scheduled_slots")
}
// KeyPeriodicNotification returns the key of periodic pub/sub channel.
func KeyPeriodicNotification(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")