Merge pull request #6151 from steven-zou/fix_missing_webhook_issue

Fix the web hook mising issue for the periodic job executions
This commit is contained in:
Steven Zou 2018-10-26 14:51:27 +08:00 committed by GitHub
commit 08d363fc8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 13 deletions

View File

@ -93,6 +93,15 @@ type JobStatsManager interface {
// error if meet any problems // error if meet any problems
RegisterHook(jobID string, hookURL string, isCached bool) error RegisterHook(jobID string, hookURL string, isCached bool) error
// Get hook returns the web hook url for the specified job if it is registered
//
// jobID string : ID of job
//
// Returns:
// the web hook url if existing
// non-nil error if meet any problems
GetHook(jobID string) (string, error)
// Mark the periodic job stats expired // Mark the periodic job stats expired
// //
// jobID string : ID of job // jobID string : ID of job

View File

@ -330,6 +330,20 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return nil return nil
} }
// GetHook returns the status web hook url for the specified job if existing
func (rjs *RedisJobStatsManager) GetHook(jobID string) (string, error) {
if utils.IsEmptyStr(jobID) {
return "", errors.New("empty job ID")
}
// First retrieve from the cache
if hookURL, ok := rjs.hookStore.Get(jobID); ok {
return hookURL, nil
}
return rjs.getHook(jobID)
}
// ExpirePeriodicJobStats marks the periodic job stats expired // ExpirePeriodicJobStats marks the periodic job stats expired
func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error { func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get() conn := rjs.redisPool.Get()
@ -772,23 +786,15 @@ func (rjs *RedisJobStatsManager) getHook(jobID string) (string, error) {
defer conn.Close() defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID) key := utils.KeyJobStats(rjs.namespace, jobID)
vals, err := redis.Strings(conn.Do("HGETALL", key)) hookURL, err := redis.String(conn.Do("HMGET", key, "status_hook"))
if err != nil { if err != nil {
if err == redis.ErrNil {
return "", fmt.Errorf("no registered web hook found for job '%s'", jobID)
}
return "", err return "", err
} }
for i, l := 0, len(vals); i < l; i = i + 2 { return hookURL, nil
prop := vals[i]
value := vals[i+1]
switch prop {
case "status_hook":
return value, nil
default:
break
}
}
return "", fmt.Errorf("no hook found for job '%s'", jobID)
} }
func backoff(seed uint) int { func backoff(seed uint) int {

View File

@ -145,6 +145,19 @@ func (pe *periodicEnqueuer) enqueue() error {
// Try to save the stats of new scheduled execution (job). // Try to save the stats of new scheduled execution (job).
pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch) pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch)
// Get web hook from the periodic job (policy)
webHookURL, err := pe.statsManager.GetHook(pl.PolicyID)
if err == nil {
// Register hook for the execution
if err := pe.statsManager.RegisterHook(scheduledExecutionID, webHookURL, false); err != nil {
// Just logged
logger.Errorf("Failed to register web hook '%s' for periodic job (execution) '%s' with error: %s", webHookURL, scheduledExecutionID, err)
}
} else {
// Just a warning
logger.Warningf("Failed to retrieve web hook for periodic job (policy) %s: %s", pl.PolicyID, err)
}
} }
// Link the upstream job (policy) with the created executions // Link the upstream job (policy) with the created executions
if len(executions) > 0 { if len(executions) > 0 {