Fix the issue of stopping periodic job (#6174)

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2018-10-31 11:43:46 +08:00 committed by Yan
parent 5d585c7c65
commit b48748492e
5 changed files with 111 additions and 46 deletions

View File

@ -16,6 +16,9 @@ package opm
import "github.com/goharbor/harbor/src/jobservice/models"
// Range for list scope defining
type Range int
// JobStatsManager defines the methods to handle stats of job.
type JobStatsManager interface {
// Start to serve
@ -55,10 +58,11 @@ type JobStatsManager interface {
//
// jobID string : ID of the being retried job
// command string : the command applied to the job like stop/cancel
// isCached bool : to indicate if only cache the op command
//
// Returns:
// error if it was not successfully sent
SendCommand(jobID string, command string) error
SendCommand(jobID string, command string, isCached bool) error
// CtlCommand checks if control command is fired for the specified job.
//
@ -122,9 +126,12 @@ type JobStatsManager interface {
// Get all the executions (IDs) for the specified upstream job.
//
// upstreamJobID string: ID of the upstream job
//
// ranges ...Range: Optional start and end bounds for the list (the Redis implementation applies them as sorted-set score bounds), e.g.:
// 0, 10 means [0:10]
// 10 means [10:+inf]
// empty means [-inf:+inf]==all
// Returns:
// the ID list of the executions if no error occurred
// or a non-nil error is returned
GetExecutions(upstreamJobID string) ([]string, error)
GetExecutions(upstreamJobID string, ranges ...Range) ([]string, error)
}

View File

@ -44,7 +44,7 @@ const (
opPersistExecutions = "persist_executions"
opUpdateStats = "update_job_stats"
maxFails = 3
jobStatsDataExpireTime = 60 * 60 * 24 * 7 // one week
jobStatsDataExpireTime = 60 * 60 * 24 * 5 // 5 days
// CtlCommandStop : command stop
CtlCommandStop = "stop"
@ -249,7 +249,7 @@ func (rjs *RedisJobStatsManager) loop() {
}
// SendCommand for the specified job
func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error {
func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string, isCached bool) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
@ -258,8 +258,11 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error
return errors.New("unknown command")
}
if err := rjs.opCommands.Fire(jobID, command); err != nil {
return err
if !isCached {
// Make other interested parties aware of the command
if err := rjs.opCommands.Fire(jobID, command); err != nil {
return err
}
}
// Directly add to op commands maintaining list
@ -341,7 +344,16 @@ func (rjs *RedisJobStatsManager) GetHook(jobID string) (string, error) {
return hookURL, nil
}
return rjs.getHook(jobID)
// Not hit in cache! Get it from the backend.
hookURL, err := rjs.getHook(jobID)
if err != nil {
return "", err
}
// Cache and return
rjs.hookStore.Add(jobID, hookURL)
return hookURL, nil
}
// ExpirePeriodicJobStats marks the periodic job stats expired
@ -379,7 +391,7 @@ func (rjs *RedisJobStatsManager) AttachExecution(upstreamJobID string, execution
}
// GetExecutions returns the existing executions (IDs) for the specified job.
func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string) ([]string, error) {
func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string, ranges ...Range) ([]string, error) {
if len(upstreamJobID) == 0 {
return nil, errors.New("no upstream ID specified")
}
@ -387,8 +399,16 @@ func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string) ([]string,
conn := rjs.redisPool.Get()
defer conn.Close()
var start, end interface{} = "-inf", "+inf"
if len(ranges) >= 1 {
start = int(ranges[0])
}
if len(ranges) > 1 {
end = int(ranges[1])
}
key := utils.KeyUpstreamJobAndExecutions(rjs.namespace, upstreamJobID)
ids, err := redis.Strings(conn.Do("ZRANGE", key, 0, -1))
ids, err := redis.Strings(conn.Do("ZRANGEBYSCORE", key, start, end))
if err != nil {
if err == redis.ErrNil {
return []string{}, nil
@ -786,7 +806,7 @@ func (rjs *RedisJobStatsManager) getHook(jobID string) (string, error) {
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
hookURL, err := redis.String(conn.Do("HMGET", key, "status_hook"))
hookURL, err := redis.String(conn.Do("HGET", key, "status_hook"))
if err != nil {
if err == redis.ErrNil {
return "", fmt.Errorf("no registered web hook found for job '%s'", jobID)

View File

@ -90,7 +90,7 @@ func TestCommand(t *testing.T) {
defer mgr.Shutdown()
<-time.After(200 * time.Millisecond)
if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil {
if err := mgr.SendCommand("fake_job_ID", CtlCommandStop, true); err != nil {
t.Fatal(err)
}

View File

@ -104,6 +104,8 @@ func (pe *periodicEnqueuer) loop() {
func (pe *periodicEnqueuer) enqueue() error {
now := time.Now().Unix()
logger.Debugf("Periodic enqueuing loop: %d", now)
conn := pe.pool.Get()
defer conn.Close()

View File

@ -19,6 +19,7 @@ import (
"fmt"
"math"
"reflect"
"strings"
"time"
"github.com/gocraft/work"
@ -30,7 +31,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/goharbor/harbor/src/jobservice/utils"
"github.com/gomodule/redigo/redis"
"github.com/robfig/cron"
)
var (
@ -419,8 +419,6 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
return err
}
needSetStopStatus := false
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
// Only running job can be stopped
@ -429,12 +427,18 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
}
case job.JobKindScheduled:
// we need to delete the scheduled job in the queue if it is not running yet
// otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
// otherwise, stop it.
if theJob.Stats.Status == job.JobStatusPending {
if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
needSetStopStatus = true
// Update the job status to 'stopped'
gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped)
logger.Debugf("Scheduled job which plan to run at %d '%s' is stopped", theJob.Stats.RunAt, jobID)
return nil
}
case job.JobKindPeriodic:
// firstly delete the periodic job policy
@ -445,31 +449,28 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
logger.Infof("Periodic job policy %s is removed", jobID)
// secondly we need to try to delete the job instances scheduled for this periodic job; this is a best-effort action
gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) // ignore error as we have logged
if err := gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID); err != nil {
// only logged
logger.Errorf("Errors happened when deleting jobs of periodic policy %s: %s", theJob.Stats.JobID, err)
}
// thirdly expire the job stats of this periodic job if exists
if err := gcwp.statsManager.ExpirePeriodicJobStats(theJob.Stats.JobID); err != nil {
// only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
needSetStopStatus = true
return nil
default:
break
return fmt.Errorf("Job kind %s is not supported", theJob.Stats.JobKind)
}
// Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
// Send 'stop' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil {
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop, false); err != nil {
return err
}
// The job running instance will set the status to 'stopped'
needSetStopStatus = false
}
// If needed, update the job status to 'stopped'
if needSetStopStatus {
gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped)
}
return nil
@ -493,7 +494,7 @@ func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {
}
// Send 'cancel' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel); err != nil {
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel, false); err != nil {
return err
}
break
@ -552,30 +553,64 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error {
return gcwp.statsManager.RegisterHook(jobID, hookURL, false)
}
func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
// A best-effort method to delete the scheduled jobs of one periodic job
func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string) error {
// Check the scope of [-periodicEnqueuerHorizon, -1]
// If the job is still not completed after a 'periodicEnqueuerHorizon', just ignore it
now := time.Now().Unix() // Baseline
startTime := now - (int64)(periodicEnqueuerHorizon/time.Minute)*60
// Try to delete more
// Get the range scope
start := (opm.Range)(startTime)
ids, err := gcwp.statsManager.GetExecutions(policyID, start)
if err != nil {
logger.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := time.Now().Unix()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
logger.Debugf("Found scheduled jobs '%v' in scope [%d,+inf] for periodic job policy %s", ids, start, policyID)
// try to delete more
// return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil {
// only logged
logger.Warningf("Delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
if len(ids) == 0 {
// Treat as a normal case; nothing needs to be done
return nil
}
multiErrs := []string{}
for _, id := range ids {
subJob, err := gcwp.statsManager.Retrieve(id)
if err != nil {
multiErrs = append(multiErrs, err.Error())
continue // going on
}
if subJob.Stats.Status == job.JobStatusRunning {
// Send 'stop' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(subJob.Stats.JobID, opm.CtlCommandStop, false); err != nil {
multiErrs = append(multiErrs, err.Error())
continue
}
logger.Debugf("Stop running job %s for periodic job policy %s", subJob.Stats.JobID, policyID)
} else {
logger.Infof("Delete scheduled job for periodic job policy %s: runat = %d", policyID, epoch)
if subJob.Stats.JobKind == job.JobKindScheduled &&
subJob.Stats.Status == job.JobStatusPending {
// The pending scheduled job
if err := gcwp.client.DeleteScheduledJob(subJob.Stats.RunAt, subJob.Stats.JobID); err != nil {
multiErrs = append(multiErrs, err.Error())
continue // going on
}
// Log action
logger.Debugf("Delete scheduled job for periodic job policy %s: runat = %d", policyID, subJob.Stats.RunAt)
}
}
}
return err
if len(multiErrs) > 0 {
return errors.New(strings.Join(multiErrs, "\n"))
}
return nil
}
func (gcwp *GoCraftWorkPool) handleSchedulePolicy(data interface{}) error {
@ -637,7 +672,8 @@ func (gcwp *GoCraftWorkPool) handleOPCommandFiring(data interface{}) error {
return errors.New("malformed op command info")
}
return gcwp.statsManager.SendCommand(jobID, command)
// Put the command into the maintaining list
return gcwp.statsManager.SendCommand(jobID, command, true)
}
// log the job