mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-16 15:25:18 +01:00
Merge pull request #13711 from steven-zou/fix/js_api_2.1.0
fix(api):update stop job api
This commit is contained in:
commit
d5c336dc45
@ -515,6 +515,10 @@ func (bt *basicTracker) retrieve() error {
|
||||
key := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
vals, err := redis.Strings(conn.Do("HGETALL", key))
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.ErrNil) {
|
||||
return errs.NoObjectFoundError(bt.jobID)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -31,12 +31,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Waiting a short while if any errors occurred
|
||||
shortLoopInterval = 5 * time.Second
|
||||
// Waiting for long while if no retrying elements found
|
||||
longLoopInterval = 5 * time.Minute
|
||||
// loopInterval is the interval for the loop of restoring dead status
|
||||
loopInterval = 2 * time.Minute
|
||||
// shortInterval is initial interval and be as based to give random buffer to loopInterval
|
||||
shortInterval = 10
|
||||
)
|
||||
|
@ -18,6 +18,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
@ -100,29 +102,16 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
|
||||
return errors.New("bad periodic job ID: nil")
|
||||
}
|
||||
|
||||
// Handle the corresponding job stats of the given periodic job first.
|
||||
tracker, err := bs.ctl.Track(policyID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unschedule periodic job error")
|
||||
}
|
||||
|
||||
// Try to get the numeric ID from the stats of the given periodic job.
|
||||
numericID, err := tracker.NumericID()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unschedule periodic job error")
|
||||
}
|
||||
|
||||
// Switch the job stats to stopped
|
||||
// Should not block the next clear action
|
||||
if err := tracker.Stop(); err != nil {
|
||||
logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err)
|
||||
}
|
||||
|
||||
conn := bs.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
numericID, err := bs.locatePolicy(policyID, conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get downstream executions of the periodic job
|
||||
// And clear these executions
|
||||
// This is a try best action, its failure will not cause the unschedule action failed.
|
||||
@ -134,6 +123,7 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
|
||||
if len(eIDs) == 0 {
|
||||
logger.Debugf("no stopped executions: %s", policyID)
|
||||
}
|
||||
|
||||
for _, eID := range eIDs {
|
||||
eTracker, err := bs.ctl.Track(eID)
|
||||
if err != nil {
|
||||
@ -179,6 +169,41 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Locate the policy and return the numeric ID.
|
||||
// First locate policy by tracker, then locate by looping the policy list in case the job stats data is lost
|
||||
func (bs *basicScheduler) locatePolicy(policyID string, conn redis.Conn) (int64, error) {
|
||||
// Handle the corresponding job stats of the given periodic job first.
|
||||
tracker, err := bs.ctl.Track(policyID)
|
||||
if err != nil {
|
||||
// If error is not found error, then switch to the backup approach
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
// Loop the policy list to get the policy data
|
||||
pl, err := Load(bs.namespace, conn)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
for _, p := range pl {
|
||||
if p.ID == policyID && p.NumericID > 0 {
|
||||
// Found the policy in the queue and return the numeric ID
|
||||
return p.NumericID, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Still not found or other errors
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Switch the job stats to stopped if the job stats existing
|
||||
// Should not block the next clear action
|
||||
if err := tracker.Stop(); err != nil {
|
||||
logger.Errorf("Stop periodic job %s failed with error: %s", policyID, err)
|
||||
}
|
||||
|
||||
return tracker.NumericID()
|
||||
}
|
||||
|
||||
// Clear all the dirty jobs
|
||||
// A scheduled job will be marked as dirty job only if the enqueued timestamp has expired a horizon.
|
||||
// This is a try best action
|
||||
|
@ -20,6 +20,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
@ -63,6 +67,8 @@ func (suite *BasicSchedulerTestSuite) SetupSuite() {
|
||||
func(hookURL string, change *job.StatusChange) error { return nil },
|
||||
)
|
||||
|
||||
suite.setupDirtyJobs()
|
||||
|
||||
suite.scheduler = NewScheduler(ctx, suite.namespace, suite.pool, suite.lcmCtl)
|
||||
suite.scheduler.Start()
|
||||
}
|
||||
@ -121,3 +127,45 @@ func (suite *BasicSchedulerTestSuite) TestScheduler() {
|
||||
err = suite.scheduler.UnSchedule(p.ID)
|
||||
require.NoError(suite.T(), err, "unschedule: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// TestUnSchedule tests un-scheduling a periodic job without job stats
|
||||
func (suite *BasicSchedulerTestSuite) TestUnSchedule() {
|
||||
p := &Policy{
|
||||
ID: "job_id_without_stats",
|
||||
JobName: job.SampleJob,
|
||||
CronSpec: "0 10 10 5 * *",
|
||||
}
|
||||
|
||||
pid, err := suite.scheduler.Schedule(p)
|
||||
require.NoError(suite.T(), err, "schedule: nil error expected but got %s", err)
|
||||
assert.Condition(suite.T(), func() bool {
|
||||
return pid > 0
|
||||
}, "schedule: returned pid should >0")
|
||||
|
||||
// No job stats saved
|
||||
err = suite.scheduler.UnSchedule(p.ID)
|
||||
require.NoError(suite.T(), err, "unschedule: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// setupDirtyJobs adds dirty jobs for testing dirty jobs clear method in the Start()
|
||||
func (suite *BasicSchedulerTestSuite) setupDirtyJobs() {
|
||||
// Add one fake job for next testing
|
||||
j := &work.Job{
|
||||
Name: job.SampleJob,
|
||||
ID: "jid",
|
||||
// Already expired
|
||||
EnqueuedAt: time.Now().Unix() - 86400,
|
||||
Args: map[string]interface{}{"image": "sample:latest"},
|
||||
}
|
||||
|
||||
rawJSON, err := utils.SerializeJob(j)
|
||||
suite.NoError(err, "serialize job model")
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_, err = conn.Do("ZADD", rds.RedisKeyScheduled(suite.namespace), j.EnqueuedAt, rawJSON)
|
||||
suite.NoError(err, "add faked dirty scheduled job")
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ type Policy struct {
|
||||
CronSpec string `json:"cron_spec"`
|
||||
JobParameters map[string]interface{} `json:"job_params,omitempty"`
|
||||
WebHookURL string `json:"web_hook_url,omitempty"`
|
||||
NumericID int64 `json:"numeric_id,omitempty"`
|
||||
}
|
||||
|
||||
// Serialize the policy to raw data.
|
||||
@ -70,22 +71,31 @@ func (p *Policy) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type policyWithScore struct {
|
||||
RawData []byte
|
||||
Score int64
|
||||
}
|
||||
|
||||
// Load all the policies from the backend storage.
|
||||
func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
|
||||
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1))
|
||||
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(namespace), 0, -1, "WITHSCORES"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policyWithScores := make([]policyWithScore, 0)
|
||||
if err := redis.ScanSlice(bytes, &policyWithScores); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policies := make([]*Policy, 0)
|
||||
for i, l := 0, len(bytes); i < l; i++ {
|
||||
rawPolicy := bytes[i].([]byte)
|
||||
for _, pw := range policyWithScores {
|
||||
p := &Policy{}
|
||||
|
||||
if err := p.DeSerialize(rawPolicy); err != nil {
|
||||
if err := p.DeSerialize(pw.RawData); err != nil {
|
||||
// Ignore error which means the policy data is not valid
|
||||
// Only logged
|
||||
logger.Errorf("Malformed policy: %s; error: %s", rawPolicy, err)
|
||||
logger.Errorf("Malformed policy: %s; error: %s", pw.RawData, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -95,9 +105,10 @@ func Load(namespace string, conn redis.Conn) ([]*Policy, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
p.NumericID = pw.Score
|
||||
policies = append(policies, p)
|
||||
|
||||
logger.Debugf("Load periodic job policy: %s", string(rawPolicy))
|
||||
logger.Debugf("Load periodic job policy: %s", string(pw.RawData))
|
||||
}
|
||||
|
||||
logger.Debugf("Load %d periodic job policies", len(policies))
|
||||
|
@ -83,6 +83,7 @@ func (suite *PolicyStoreTestSuite) TestLoad() {
|
||||
ps, err := Load(suite.namespace, conn)
|
||||
suite.NoError(err, "load: nil error expected but got %s", err)
|
||||
suite.Equal(1, len(ps), "count of loaded policies")
|
||||
suite.Greater(ps[0].NumericID, int64(0), "numericID of the policy <> 0")
|
||||
}
|
||||
|
||||
// TestPolicy tests policy itself
|
||||
@ -102,3 +103,18 @@ func (suite *PolicyStoreTestSuite) TestPolicy() {
|
||||
err = p2.Validate()
|
||||
assert.Nil(suite.T(), err, "policy validate: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// TestInvalidPolicy tests invalid policy
|
||||
func (suite *PolicyStoreTestSuite) TestInvalidPolicy() {
|
||||
p := &Policy{}
|
||||
suite.Error(p.Validate(), "error should be returned for empty ID")
|
||||
p.ID = "pid"
|
||||
suite.Error(p.Validate(), "error should be returned for empty job name")
|
||||
p.JobName = "GC"
|
||||
p.WebHookURL = "webhook"
|
||||
suite.Error(p.Validate(), "error should be returned for invalid webhook")
|
||||
p.WebHookURL = "https://webhook.com"
|
||||
suite.Error(p.Validate(), "error should be returned for invalid cron spec")
|
||||
p.CronSpec = "0 10 10 * * *"
|
||||
suite.NoError(p.Validate(), "validation passed")
|
||||
}
|
||||
|
@ -63,6 +63,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
|
||||
// Check if the job is a periodic one as periodic job has its own ID format
|
||||
if eID, yes := isPeriodicJobExecution(j); yes {
|
||||
jID = eID
|
||||
|
||||
logger.Infof("Start to run periodical job execution: %s", eID)
|
||||
}
|
||||
|
||||
// As the job stats may not be ready when job executing sometimes (corner case),
|
||||
@ -234,10 +236,6 @@ func isPeriodicJobExecution(j *work.Job) (string, bool) {
|
||||
return fmt.Sprintf("%s@%s", j.ID, epoch), ok
|
||||
}
|
||||
|
||||
func bp(b bool) *bool {
|
||||
return &b
|
||||
}
|
||||
|
||||
func backoff(x int) int {
|
||||
// y=ax^2+bx+c
|
||||
var a, b, c = -111, 666, 500
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
@ -321,42 +323,42 @@ func (w *basicWorker) StopJob(jobID string) error {
|
||||
}
|
||||
|
||||
t, err := w.ctl.Track(jobID)
|
||||
if err != nil {
|
||||
if err != nil && !errs.IsObjectNotFoundError(err) {
|
||||
// For none not found error, directly return
|
||||
return err
|
||||
}
|
||||
|
||||
// For periodical job and stats not found cases
|
||||
if errs.IsObjectNotFoundError(err) || (t != nil && t.Job().Info.JobKind == job.KindPeriodic) {
|
||||
// If the job kind is periodic or
|
||||
// if the original job stats tracker is not found (the scheduler will have a try based on other data under this case)
|
||||
return w.scheduler.UnSchedule(jobID)
|
||||
}
|
||||
|
||||
// General or scheduled job
|
||||
if job.RunningStatus.Compare(job.Status(t.Job().Info.Status)) < 0 {
|
||||
// Job has been in the final states
|
||||
logger.Warningf("Trying to stop a(n) %s job: ID=%s, Kind=%s", t.Job().Info.Status, jobID, t.Job().Info.JobKind)
|
||||
// Under this situation, the non-periodic job we're trying to stop has already been in the "non-running(stopped)" status.
|
||||
// As the goal of stopping the job running has achieved, we directly return nil here.
|
||||
if t.Job().Info.JobKind != job.KindPeriodic {
|
||||
return nil
|
||||
}
|
||||
|
||||
// For the periodic job, its status should always be "Scheduled".
|
||||
// This case should never happen under the current model. But there might be some legacy job stats data
|
||||
// to cause such inconsistent situation.
|
||||
// Under this situation, let the periodical scheduler to handle and fix the issue.
|
||||
return nil
|
||||
}
|
||||
|
||||
switch t.Job().Info.JobKind {
|
||||
case job.KindGeneric:
|
||||
return t.Stop()
|
||||
case job.KindScheduled:
|
||||
// we need to delete the scheduled job in the queue if it is not running yet
|
||||
// otherwise, stop it.
|
||||
// Mark status to stopped
|
||||
if err := t.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Do more for scheduled job kind
|
||||
if t.Job().Info.JobKind == job.KindScheduled {
|
||||
// We need to delete the scheduled job in the queue if it is not running yet
|
||||
if err := w.client.DeleteScheduledJob(t.Job().Info.RunAt, jobID); err != nil {
|
||||
// Job is already running?
|
||||
logger.Errorf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt)
|
||||
logger.Warningf("scheduled job %s (run at = %d) is not found in the queue, is it running?", jobID, t.Job().Info.RunAt)
|
||||
}
|
||||
// Anyway, mark job stopped
|
||||
return t.Stop()
|
||||
case job.KindPeriodic:
|
||||
return w.scheduler.UnSchedule(jobID)
|
||||
default:
|
||||
return errors.Errorf("job kind %s is not supported", t.Job().Info.JobKind)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RetryJob retry the job
|
||||
|
Loading…
Reference in New Issue
Block a user