Use separate redis conn for the lock mechanism in the periodic enqueuer

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2018-10-31 15:19:11 +08:00
parent 646cf282ba
commit 300977d539

View File

@ -42,6 +42,7 @@ type periodicEnqueuer struct {
stopChan chan struct{} stopChan chan struct{}
doneStoppingChan chan struct{} doneStoppingChan chan struct{}
statsManager opm.JobStatsManager statsManager opm.JobStatsManager
identity string
} }
func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore, statsManager opm.JobStatsManager) *periodicEnqueuer { func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore, statsManager opm.JobStatsManager) *periodicEnqueuer {
@ -52,6 +53,7 @@ func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *period
statsManager: statsManager, statsManager: statsManager,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}), doneStoppingChan: make(chan struct{}),
identity: utils.MakeIdentifier(), // Currently, use a generated ID
} }
} }
@ -104,7 +106,7 @@ func (pe *periodicEnqueuer) loop() {
func (pe *periodicEnqueuer) enqueue() error { func (pe *periodicEnqueuer) enqueue() error {
now := time.Now().Unix() now := time.Now().Unix()
logger.Debugf("Periodic enqueuing loop: %d", now) logger.Debugf("Periodic enqueuing loop by enqueuer %s: %d", pe.identity, now)
conn := pe.pool.Get() conn := pe.pool.Get()
defer conn.Close() defer conn.Close()
@ -115,16 +117,23 @@ func (pe *periodicEnqueuer) enqueue() error {
} }
// Avoid schedule in the same time. // Avoid schedule in the same time.
lockerKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock") lockKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock")
lockerID := utils.MakeIdentifier()
// Use separate conn for the locker
lockConn := pe.pool.Get()
defer lockConn.Close()
// Acquire a locker with 30s expiring time // Acquire a locker with 30s expiring time
if err := acquireLock(conn, lockerKey, lockerID, 30); err != nil { if err := acquireLock(lockConn, lockKey, pe.identity, 30); err != nil {
return err return err
} }
logger.Debugf("Periodic enqueuer %s acquires lock", pe.identity)
defer func() { defer func() {
if err := releaseLock(conn, lockerKey, lockerID); err != nil { if err := releaseLock(lockConn, lockKey, pe.identity); err != nil {
logger.Errorf("release enqueue locker failed: %s", err) logger.Errorf("Periodic enqueuer %s releases lock failed: %s", pe.identity, err)
} else {
logger.Debugf("Periodic enqueuer %s releases lock", pe.identity)
} }
}() }()
@ -169,7 +178,7 @@ func (pe *periodicEnqueuer) enqueue() error {
slot := fmt.Sprintf("%s:%s@%d", utils.KeyPeriodicJobTimeSlots(pe.namespace), pl.PolicyID, epoch) slot := fmt.Sprintf("%s:%s@%d", utils.KeyPeriodicJobTimeSlots(pe.namespace), pl.PolicyID, epoch)
if err := placeSlot(conn, slot, epoch, expireTime); err != nil { if err := placeSlot(conn, slot, epoch, expireTime); err != nil {
// Logged and continue // Logged and continue
logger.Errorf("Failed to place time slot '%s@%d': %s", pl.PolicyID, epoch, err) logger.Errorf("Failed to place time slot '%s@%d' in enqueuer %s: %s", pl.PolicyID, epoch, pe.identity, err)
continue continue
} }
@ -178,7 +187,7 @@ func (pe *periodicEnqueuer) enqueue() error {
return err return err
} }
logger.Infof("Schedule job %s:%s for policy %s at %d\n", job.Name, job.ID, pl.PolicyID, epoch) logger.Infof("Schedule job %s:%s for policy %s at %d by enqueuer %s", job.Name, job.ID, pl.PolicyID, epoch, pe.identity)
// Try to save the stats of new scheduled execution (job). // Try to save the stats of new scheduled execution (job).
pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch) pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch)
@ -189,18 +198,18 @@ func (pe *periodicEnqueuer) enqueue() error {
// Register hook for the execution // Register hook for the execution
if err := pe.statsManager.RegisterHook(scheduledExecutionID, webHookURL, false); err != nil { if err := pe.statsManager.RegisterHook(scheduledExecutionID, webHookURL, false); err != nil {
// Just logged // Just logged
logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error: %s", webHookURL, scheduledExecutionID, err) logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error by enqueuer %s: %s", webHookURL, scheduledExecutionID, pe.identity, err)
} }
} else { } else {
// Just a warning // Just a warning
logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s: %s", pl.PolicyID, err) logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s by enqueuer %s: %s", pl.PolicyID, pe.identity, err)
} }
} }
// Link the upstream job (policy) with the created executions // Link the upstream job (policy) with the created executions
if len(executions) > 0 { if len(executions) > 0 {
if err := pe.statsManager.AttachExecution(pl.PolicyID, executions...); err != nil { if err := pe.statsManager.AttachExecution(pl.PolicyID, executions...); err != nil {
// Just logged it // Just logged it
logger.Errorf("Link upstream job with executions failed: %s", err) logger.Errorf("Link upstream job with executions failed in enqueuer %s: %s", pe.identity, err)
} }
} }
// Directly use redis conn to update the periodic job (policy) status // Directly use redis conn to update the periodic job (policy) status