Merge pull request #13705 from steven-zou/fix/js_api

fix(api):update stop job api
This commit is contained in:
Steven Zou 2020-12-09 13:41:58 +08:00 committed by GitHub
commit 4acb708938
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 154 additions and 54 deletions

View File

@ -515,6 +515,10 @@ func (bt *basicTracker) retrieve() error {
key := rds.KeyJobStats(bt.namespace, bt.jobID)
vals, err := redis.Strings(conn.Do("HGETALL", key))
if err != nil {
if errors.Is(err, redis.ErrNil) {
return errs.NoObjectFoundError(bt.jobID)
}
return err
}

View File

@ -31,12 +31,8 @@ import (
)
const (
// Waiting a short while if any errors occurred
shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found
longLoopInterval = 5 * time.Minute
// loopInterval is the interval for the loop of restoring dead status
loopInterval = 2 * time.Minute
// shortInterval is initial interval and be as based to give random buffer to loopInterval
shortInterval = 10
)

View File

@ -18,6 +18,8 @@ import (
"context"
"time"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/common/utils"
@ -100,29 +102,16 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return errors.New("bad periodic job ID: nil")
}
// Handle the corresponding job stats of the given periodic job first.
tracker, err := bs.ctl.Track(policyID)
if err != nil {
return errors.Wrap(err, "unschedule periodic job error")
}
// Try to get the numeric ID from the stats of the given periodic job.
numericID, err := tracker.NumericID()
if err != nil {
return errors.Wrap(err, "unschedule periodic job error")
}
// Switch the job stats to stopped
// Should not block the next clear action
if err := tracker.Stop(); err != nil {
logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err)
}
conn := bs.pool.Get()
defer func() {
_ = conn.Close()
}()
numericID, err := bs.locatePolicy(policyID, conn)
if err != nil {
return err
}
// Get downstream executions of the periodic job
// And clear these executions
// This is a try best action, its failure will not cause the unschedule action failed.
@ -134,6 +123,7 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
if len(eIDs) == 0 {
logger.Debugf("no stopped executions: %s", policyID)
}
for _, eID := range eIDs {
eTracker, err := bs.ctl.Track(eID)
if err != nil {
@ -179,6 +169,41 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return nil
}
// Locate the policy and return the numeric ID.
// First locate policy by tracker, then locate by looping the policy list in case the job stats data is lost
func (bs *basicScheduler) locatePolicy(policyID string, conn redis.Conn) (int64, error) {
// Handle the corresponding job stats of the given periodic job first.
tracker, err := bs.ctl.Track(policyID)
if err != nil {
// If error is not found error, then switch to the backup approach
if errs.IsObjectNotFoundError(err) {
// Loop the policy list to get the policy data
pl, err := Load(bs.namespace, conn)
if err != nil {
return -1, err
}
for _, p := range pl {
if p.ID == policyID && p.NumericID > 0 {
// Found the policy in the queue and return the numeric ID
return p.NumericID, nil
}
}
}
// Still not found or other errors
return -1, err
}
// Switch the job stats to stopped if the job stats existing
// Should not block the next clear action
if err := tracker.Stop(); err != nil {
logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err)
}
return tracker.NumericID()
}
// Clear all the dirty jobs
// A scheduled job will be marked as dirty job only if the enqueued timestamp has expired a horizon.
// This is a try best action

View File

@ -20,6 +20,10 @@ import (
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
@ -63,6 +67,8 @@ func (suite *BasicSchedulerTestSuite) SetupSuite() {
func(hookURL string, change *job.StatusChange) error { return nil },
)
suite.setupDirtyJobs()
suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl)
suite.scheduler.Start()
}
@ -121,3 +127,45 @@ func (suite *BasicSchedulerTestSuite) TestScheduler() {
err = suite.scheduler.UnSchedule(p.ID)
require.NoError(suite.T(), err, "unschedule: nil error expected but got %s", err)
}
// TestUnSchedule tests un-scheduling a periodic job without job stats
func (suite *BasicSchedulerTestSuite) TestUnSchedule() {
p := &Policy{
ID: "job_id_without_stats",
JobName: job.SampleJob,
CronSpec: "0 10 10 5 * *",
}
pid, err := suite.scheduler.Schedule(p)
require.NoError(suite.T(), err, "schedule: nil error expected but got %s", err)
assert.Condition(suite.T(), func() bool {
return pid > 0
}, "schedule: returned pid should >0")
// No job stats saved
err = suite.scheduler.UnSchedule(p.ID)
require.NoError(suite.T(), err, "unschedule: nil error expected but got %s", err)
}
// setupDirtyJobs adds dirty jobs for testing dirty jobs clear method in the Start()
func (suite *BasicSchedulerTestSuite) setupDirtyJobs() {
// Add one fake job for next testing
j := &work.Job{
Name: job.SampleJob,
ID: "jid",
// Already expired
EnqueuedAt: time.Now().Unix() - 86400,
Args: map[string]interface{}{"image": "sample:latest"},
}
rawJSON, err := utils.SerializeJob(j)
suite.NoError(err, "serialize job model")
conn := suite.pool.Get()
defer func() {
_ = conn.Close()
}()
_, err = conn.Do("ZADD", rds.RedisKeyScheduled(suite.namespace), j.EnqueuedAt, rawJSON)
suite.NoError(err, "add faked dirty scheduled job")
}

View File

@ -35,6 +35,7 @@ type Policy struct {
CronSpec string `json:"cron_spec"`
JobParameters map[string]interface{} `json:"job_params,omitempty"`
WebHookURL string `json:"web_hook_url,omitempty"`
NumericID int64 `json:"numeric_id,omitempty"`
}
// Serialize the policy to raw data.
@ -70,22 +71,31 @@ func (p *Policy) Validate() error {
return nil
}
type policyWithScore struct {
RawData []byte
Score int64
}
// Load all the policies from the backend storage.
func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1))
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1, "WITHSCORES"))
if err != nil {
return nil, err
}
policyWithScores := make([]policyWithScore, 0)
if err := redis.ScanSlice(bytes, &policyWithScores); err != nil {
return nil, err
}
policies := make([]*Policy, 0)
for i, l := 0, len(bytes); i < l; i++ {
rawPolicy := bytes[i].([]byte)
for _, pw := range policyWithScores {
p := &Policy{}
if err := p.DeSerialize(rawPolicy); err != nil {
if err := p.DeSerialize(pw.RawData); err != nil {
// Ignore error which means the policy data is not valid
// Only logged
logger.Errorf("Malformed policy: %s; error: %s", rawPolicy, err)
logger.Errorf("Malformed policy: %s; error: %s", pw.RawData, err)
continue
}
@ -95,9 +105,10 @@ func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
continue
}
p.NumericID = pw.Score
policies = append(policies, p)
logger.Debugf("Load periodic job policy: %s", string(rawPolicy))
logger.Debugf("Load periodic job policy: %s", string(pw.RawData))
}
logger.Debugf("Load %d periodic job policies", len(policies))

View File

@ -83,6 +83,7 @@ func (suite *PolicyStoreTestSuite) TestLoad() {
ps, err := Load(suite.namespace, conn)
suite.NoError(err, "load: nil error expected but got %s", err)
suite.Equal(1, len(ps), "count of loaded policies")
suite.Greater(ps[0].NumericID, int64(0), "numericID of the policy <> 0")
}
// TestPolicy tests policy itself
@ -102,3 +103,18 @@ func (suite *PolicyStoreTestSuite) TestPolicy() {
err = p2.Validate()
assert.Nil(suite.T(), err, "policy validate: nil error expected but got %s", err)
}
// TestInvalidPolicy tests invalid policy
func (suite *PolicyStoreTestSuite) TestInvalidPolicy() {
p := &Policy{}
suite.Error(p.Validate(), "error should be returned for empty ID")
p.ID = "pid"
suite.Error(p.Validate(), "error should be returned for empty job name")
p.JobName = "GC"
p.WebHookURL = "webhook"
suite.Error(p.Validate(), "error should be returned for invalid webhook")
p.WebHookURL = "https://webhook.com"
suite.Error(p.Validate(), "error should be returned for invalid cron spec")
p.CronSpec = "0 10 10 * * *"
suite.NoError(p.Validate(), "validation passed")
}

View File

@ -63,6 +63,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Check if the job is a periodic one as periodic job has its own ID format
if eID, yes := isPeriodicJobExecution(j); yes {
jID = eID
logger.Infof("Start to run periodical job execution: %s", eID)
}
// As the job stats may not be ready when job executing sometimes (corner case),
@ -234,10 +236,6 @@ func isPeriodicJobExecution(j *work.Job) (string, bool) {
return fmt.Sprintf("%s@%s", j.ID, epoch), ok
}
func bp(b bool) *bool {
return &b
}
func backoff(x int) int {
// y=ax^2+bx+c
var a, b, c = -111, 666, 500

View File

@ -20,6 +20,8 @@ import (
"sync"
"time"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/env"
@ -321,42 +323,42 @@ func (w *basicWorker) StopJob(jobID string) error {
}
t, err := w.ctl.Track(jobID)
if err != nil {
if err != nil && !errs.IsObjectNotFoundError(err) {
// For none not found error, directly return
return err
}
// For periodical job and stats not found cases
if errs.IsObjectNotFoundError(err) || (t != nil && t.Job().Info.JobKind == job.KindPeriodic) {
// If the job kind is periodic or
// if the original job stats tracker is not found (the scheduler will have a try based on other data under this case)
return w.scheduler.UnSchedule(jobID)
}
// General or scheduled job
if job.RunningStatus.Compare(job.Status(t.Job().Info.Status)) < 0 {
// Job has been in the final states
logger.Warningf("Trying to stop a(n) %s job: ID=%s, Kind=%s", t.Job().Info.Status, jobID, t.Job().Info.JobKind)
// Under this situation, the non-periodic job we're trying to stop has already been in the "non-running(stopped)" status.
// As the goal of stopping the job running has achieved, we directly return nil here.
if t.Job().Info.JobKind != job.KindPeriodic {
return nil
}
// For the periodic job, its status should always be "Scheduled".
// This case should never happen under the current model. But there might be some legacy job stats data
// to cause such inconsistent situation.
// Under this situation, let the periodical scheduler to handle and fix the issue.
return nil
}
switch t.Job().Info.JobKind {
case job.KindGeneric:
return t.Stop()
case job.KindScheduled:
// we need to delete the scheduled job in the queue if it is not running yet
// otherwise, stop it.
// Mark status to stopped
if err := t.Stop(); err != nil {
return err
}
// Do more for scheduled job kind
if t.Job().Info.JobKind == job.KindScheduled {
// We need to delete the scheduled job in the queue if it is not running yet
if err := w.client.DeleteScheduledJob(t.Job().Info.RunAt, jobID); err != nil {
// Job is already running?
logger.Errorf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt)
logger.Warningf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt)
}
// Anyway, mark job stopped
return t.Stop()
case job.KindPeriodic:
return w.scheduler.UnSchedule(jobID)
default:
return errors.Errorf("job kind %s is not supported", t.Job().Info.JobKind)
}
return nil
}
// RetryJob retry the job