Merge pull request #12121 from steven-zou/fix/job_stop_status_issue

fix(jobservice):mismatch status issue when stopping job
This commit is contained in:
Steven Zou 2020-06-03 18:13:23 +08:00 committed by GitHub
commit 6db856c3e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 41 deletions

View File

@ -166,7 +166,8 @@ func (suite *HookAgentTestSuite) TestRetryAndPopMin() {
func (suite *HookAgentTestSuite) checkStatus() {
t := job.NewBasicTrackerWithID(context.TODO(), suite.jid, suite.namespace, suite.pool, nil, list.New())
err := t.Load()
suite.NoError(err, "load updated job stats")
require.NoError(suite.T(), err, "load updated job stats")
require.NotNil(suite.T(), t.Job(), "latest job stats")
suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status")
}

View File

@ -100,16 +100,22 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return errors.New("bad periodic job ID: nil")
}
// Handle the corresponding job stats of the given periodic job first.
tracker, err := bs.ctl.Track(policyID)
if err != nil {
return err
return errors.Wrap(err, "unschedule periodic job error")
}
// If errors occurred when getting the numeric ID of periodic job,
// may be because the specified job is not a valid periodic job.
// Try to get the numeric ID from the stats of the given periodic job.
numericID, err := tracker.NumericID()
if err != nil {
return err
return errors.Wrap(err, "unschedule periodic job error")
}
// Switch the job stats to stopped
// Should not block the next clear action
if err := tracker.Stop(); err != nil {
logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err)
}
conn := bs.pool.Get()
@ -117,36 +123,6 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
_ = conn.Close()
}()
// Get the un-scheduling policy object
bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID))
if err != nil {
return err
}
p := &Policy{}
if len(bytes) > 0 {
if rawPolicy, ok := bytes[0].([]byte); ok {
if err := p.DeSerialize(rawPolicy); err != nil {
return err
}
}
}
if utils.IsEmptyStr(p.ID) {
// Deserialize failed
return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID)
}
// REM from redis db
// Accurately remove the item with the specified score
if _, err := conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID); err != nil {
return err
}
// Switch the job stats to stopped
// Should not block the next clear action
err = tracker.Stop()
// Get downstream executions of the periodic job
// And clear these executions
// This is a try best action, its failure will not cause the unschedule action failed.
@ -169,7 +145,7 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
// Only need to care the pending and running ones
// Do clear
if job.ScheduledStatus == job.Status(e.Info.Status) {
// Please pay attention here, the job ID used in the scheduled jon queue is
// Please pay attention here, the job ID used in the scheduled job queue is
// the ID of the periodic job (policy).
if err := bs.client.DeleteScheduledJob(e.Info.RunAt, policyID); err != nil {
logger.Errorf("Delete scheduled job %s error: %s", eID, err)
@ -178,16 +154,29 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
// Mark job status to stopped to block execution.
// The executions here should not be in the final states,
// double confirmation: only stop the stopped ones.
// double confirmation: only stop the can-stop ones.
if job.RunningStatus.Compare(job.Status(e.Info.Status)) >= 0 {
if err := eTracker.Stop(); err != nil {
logger.Errorf("Stop execution %s error: %s", eID, err)
} else {
logger.Debugf("Stop execution %s of periodic job %s", eID, policyID)
}
}
}
}
return err
// REM from redis db
// Accurately remove the item with the specified score
removed, err := redis.Int64(conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID))
if err != nil {
return errors.Wrap(err, "unschedule periodic job error")
}
if removed == 0 {
logger.Warningf("No periodic job with ID=%s and numeric ID=%d removed from the periodic job policy set", policyID, numericID)
}
return nil
}
// Clear all the dirty jobs

View File

@ -327,7 +327,17 @@ func (w *basicWorker) StopJob(jobID string) error {
if job.RunningStatus.Compare(job.Status(t.Job().Info.Status)) < 0 {
// Job has been in the final states
return errors.Errorf("mismatch job status for stopping job: %s, job status %s is behind %s", jobID, t.Job().Info.Status, job.RunningStatus)
logger.Warningf("Trying to stop a(n) %s job: ID=%s, Kind=%s", t.Job().Info.Status, jobID, t.Job().Info.JobKind)
// Under this situation, the non-periodic job we're trying to stop has already been in the "non-running(stopped)" status.
// As the goal of stopping the job running has achieved, we directly return nil here.
if t.Job().Info.JobKind != job.KindPeriodic {
return nil
}
// For the periodic job, its status should always be "Scheduled".
// This case should never happen under the current model. But there might be some legacy job stats data
// to cause such inconsistent situation.
// Under this situation, let the periodical scheduler to handle and fix the issue.
}
switch t.Job().Info.JobKind {
@ -338,9 +348,9 @@ func (w *basicWorker) StopJob(jobID string) error {
// otherwise, stop it.
if err := w.client.DeleteScheduledJob(t.Job().Info.RunAt, jobID); err != nil {
// Job is already running?
logger.Errorf("scheduled job %s (run at = %d) is not found in the queue to stop, is it already running?", jobID, t.Job().Info.RunAt)
logger.Errorf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt)
}
// Anyway, mark jon stopped
// Anyway, mark job stopped
return t.Stop()
case job.KindPeriodic:
return w.scheduler.UnSchedule(jobID)