diff --git a/src/jobservice/hook/hook_agent_test.go b/src/jobservice/hook/hook_agent_test.go index 5821956bc..e0ef0a206 100644 --- a/src/jobservice/hook/hook_agent_test.go +++ b/src/jobservice/hook/hook_agent_test.go @@ -166,7 +166,8 @@ func (suite *HookAgentTestSuite) TestRetryAndPopMin() { func (suite *HookAgentTestSuite) checkStatus() { t := job.NewBasicTrackerWithID(context.TODO(), suite.jid, suite.namespace, suite.pool, nil, list.New()) err := t.Load() - suite.NoError(err, "load updated job stats") + require.NoError(suite.T(), err, "load updated job stats") + require.NotNil(suite.T(), t.Job(), "latest job stats") suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status") } diff --git a/src/jobservice/period/basic_scheduler.go b/src/jobservice/period/basic_scheduler.go index 3dd3adefa..ced91361e 100644 --- a/src/jobservice/period/basic_scheduler.go +++ b/src/jobservice/period/basic_scheduler.go @@ -100,16 +100,22 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { return errors.New("bad periodic job ID: nil") } + // Handle the corresponding job stats of the given periodic job first. tracker, err := bs.ctl.Track(policyID) if err != nil { - return err + return errors.Wrap(err, "unschedule periodic job error") } - // If errors occurred when getting the numeric ID of periodic job, - // may be because the specified job is not a valid periodic job. + // Try to get the numeric ID from the stats of the given periodic job. 
numericID, err := tracker.NumericID() if err != nil { - return err + return errors.Wrap(err, "unschedule periodic job error") + } + + // Switch the job stats to stopped + // Should not block the next clear action + if err := tracker.Stop(); err != nil { + logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err) } conn := bs.pool.Get() @@ -117,36 +123,6 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { _ = conn.Close() }() - // Get the un-scheduling policy object - bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID)) - if err != nil { - return err - } - - p := &Policy{} - if len(bytes) > 0 { - if rawPolicy, ok := bytes[0].([]byte); ok { - if err := p.DeSerialize(rawPolicy); err != nil { - return err - } - } - } - - if utils.IsEmptyStr(p.ID) { - // Deserialize failed - return errors.Errorf("no valid periodic job policy found: %s:%d", policyID, numericID) - } - - // REM from redis db - // Accurately remove the item with the specified score - if _, err := conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID); err != nil { - return err - } - - // Switch the job stats to stopped - // Should not block the next clear action - err = tracker.Stop() - // Get downstream executions of the periodic job // And clear these executions // This is a try best action, its failure will not cause the unschedule action failed. @@ -169,7 +145,7 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { // Only need to care the pending and running ones // Do clear if job.ScheduledStatus == job.Status(e.Info.Status) { - // Please pay attention here, the job ID used in the scheduled jon queue is + // Please pay attention here, the job ID used in the scheduled job queue is // the ID of the periodic job (policy). 
if err := bs.client.DeleteScheduledJob(e.Info.RunAt, policyID); err != nil { logger.Errorf("Delete scheduled job %s error: %s", eID, err) @@ -178,16 +154,29 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { // Mark job status to stopped to block execution. // The executions here should not be in the final states, - // double confirmation: only stop the stopped ones. + // double confirmation: only stop the can-stop ones. if job.RunningStatus.Compare(job.Status(e.Info.Status)) >= 0 { if err := eTracker.Stop(); err != nil { logger.Errorf("Stop execution %s error: %s", eID, err) + } else { + logger.Debugf("Stop execution %s of periodic job %s", eID, policyID) } } } } - return err + // REM from redis db + // Accurately remove the item with the specified score + removed, err := redis.Int64(conn.Do("ZREMRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID)) + if err != nil { + return errors.Wrap(err, "unschedule periodic job error") + } + + if removed == 0 { + logger.Warningf("No periodic job with ID=%s and numeric ID=%d removed from the periodic job policy set", policyID, numericID) + } + + return nil } // Clear all the dirty jobs diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index 6bd8f77c1..5d608899e 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -327,7 +327,17 @@ func (w *basicWorker) StopJob(jobID string) error { if job.RunningStatus.Compare(job.Status(t.Job().Info.Status)) < 0 { // Job has been in the final states - return errors.Errorf("mismatch job status for stopping job: %s, job status %s is behind %s", jobID, t.Job().Info.Status, job.RunningStatus) + logger.Warningf("Trying to stop a(n) %s job: ID=%s, Kind=%s", t.Job().Info.Status, jobID, t.Job().Info.JobKind) + // Under this situation, the non-periodic job we're trying to stop has already been in the "non-running(stopped)" status. 
+ // As the goal of stopping the running job has already been achieved, we directly return nil here. + if t.Job().Info.JobKind != job.KindPeriodic { + return nil + } + + // For the periodic job, its status should always be "Scheduled". + // This case should never happen under the current model. But there might be some legacy job stats data + // that cause such an inconsistent situation. + // In this situation, let the periodic scheduler handle and fix the issue. } switch t.Job().Info.JobKind { @@ -338,9 +348,9 @@ func (w *basicWorker) StopJob(jobID string) error { // otherwise, stop it. if err := w.client.DeleteScheduledJob(t.Job().Info.RunAt, jobID); err != nil { // Job is already running? - logger.Errorf("scheduled job %s (run at = %d) is not found in the queue to stop, is it already running?", jobID, t.Job().Info.RunAt) + logger.Errorf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt) } - // Anyway, mark jon stopped + // Anyway, mark job stopped return t.Stop() case job.KindPeriodic: return w.scheduler.UnSchedule(jobID)