From 300977d5396b7ce255ae3b0941453b477cb40bad Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 31 Oct 2018 15:19:11 +0800 Subject: [PATCH] Use separate redis conn for the lock mechanism in the periodic enqueuer Signed-off-by: Steven Zou --- src/jobservice/period/enqueuer.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index 373f95190..0627d75b8 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -42,6 +42,7 @@ type periodicEnqueuer struct { stopChan chan struct{} doneStoppingChan chan struct{} statsManager opm.JobStatsManager + identity string } func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore, statsManager opm.JobStatsManager) *periodicEnqueuer { @@ -52,6 +53,7 @@ func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *period statsManager: statsManager, stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), + identity: utils.MakeIdentifier(), // Currently, use a generated ID } } @@ -104,7 +106,7 @@ func (pe *periodicEnqueuer) loop() { func (pe *periodicEnqueuer) enqueue() error { now := time.Now().Unix() - logger.Debugf("Periodic enqueuing loop: %d", now) + logger.Debugf("Periodic enqueuing loop by enqueuer %s: %d", pe.identity, now) conn := pe.pool.Get() defer conn.Close() @@ -115,16 +117,23 @@ func (pe *periodicEnqueuer) enqueue() error { } // Avoid schedule in the same time. - lockerKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock") - lockerID := utils.MakeIdentifier() + lockKey := fmt.Sprintf("%s:%s", utils.KeyPeriod(pe.namespace), "lock") + + // Use separate conn for the locker + lockConn := pe.pool.Get() + defer lockConn.Close() // Acquire a locker with 30s expiring time - if err := acquireLock(conn, lockerKey, lockerID, 30); err != nil { + if err := acquireLock(lockConn, lockKey, pe.identity, 30); err != nil { return err } + logger.Debugf("Periodic enqueuer %s acquires lock", pe.identity) + defer func() { - if err := releaseLock(conn, lockerKey, lockerID); err != nil { - logger.Errorf("release enqueue locker failed: %s", err) + if err := releaseLock(lockConn, lockKey, pe.identity); err != nil { + logger.Errorf("Periodic enqueuer %s releases lock failed: %s", pe.identity, err) + } else { + logger.Debugf("Periodic enqueuer %s releases lock", pe.identity) } }() @@ -169,7 +178,7 @@ func (pe *periodicEnqueuer) enqueue() error { slot := fmt.Sprintf("%s:%s@%d", utils.KeyPeriodicJobTimeSlots(pe.namespace), pl.PolicyID, epoch) if err := placeSlot(conn, slot, epoch, expireTime); err != nil { // Logged and continue - logger.Errorf("Failed to place time slot '%s@%d': %s", pl.PolicyID, epoch, err) + logger.Errorf("Failed to place time slot '%s@%d' in enqueuer %s: %s", pl.PolicyID, epoch, pe.identity, err) continue } @@ -178,7 +187,7 @@ func (pe *periodicEnqueuer) enqueue() error { return err } - logger.Infof("Schedule job %s:%s for policy %s at %d\n", job.Name, job.ID, pl.PolicyID, epoch) + logger.Infof("Schedule job %s:%s for policy %s at %d by enqueuer %s", job.Name, job.ID, pl.PolicyID, epoch, pe.identity) // Try to save the stats of new scheduled execution (job). pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch) @@ -189,18 +198,18 @@ func (pe *periodicEnqueuer) enqueue() error { // Register hook for the execution if err := pe.statsManager.RegisterHook(scheduledExecutionID, webHookURL, false); err != nil { // Just logged - logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error: %s", webHookURL, scheduledExecutionID, err) + logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error by enqueuer %s: %s", webHookURL, scheduledExecutionID, pe.identity, err) } } else { // Just a warning - logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s: %s", pl.PolicyID, err) + logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s by enqueuer %s: %s", pl.PolicyID, pe.identity, err) } } // Link the upstream job (policy) with the created executions if len(executions) > 0 { if err := pe.statsManager.AttachExecution(pl.PolicyID, executions...); err != nil { // Just logged it - logger.Errorf("Link upstream job with executions failed: %s", err) + logger.Errorf("Link upstream job with executions failed in enqueuer %s: %s", pe.identity, err) } } // Directly use redis conn to update the periodic job (policy) status