Merge pull request #12096 from steven-zou/fix/remove_checkin_data

fix(jobservice):fix issues of jobservice
This commit is contained in:
Steven Zou 2020-05-29 16:49:33 +08:00 committed by GitHub
commit a032c546f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 12 additions and 57 deletions

View File

@ -88,8 +88,14 @@ if res then
end end
if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then
-- expire the job stats with shorter interval -- expire the job stats with shorter interval (1 day)
redis.call('expire', KEYS[1], 86400) redis.call('expire', KEYS[1], 86400)
elseif ARGV[1] == 'Error' then
-- expire the job stats with normal interval (7 days) incase it may be retried again
redis.call('expire', KEYS[1], 604800)
else
-- remove the expire time if existing
redis.call('persist', KEYS[1])
end end
end end

View File

@ -17,7 +17,6 @@ package job
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"math/rand"
"strconv" "strconv"
"time" "time"
@ -32,8 +31,8 @@ import (
) )
const ( const (
// Try best to keep the job stats data but anyway clear it after a reasonable time // Check in data placeholder for saving data space
statDataExpireTime = 7 * 24 * 3600 redundantCheckInData = "[REDUNDANT]"
) )
// Tracker is designed to track the life cycle of the job described by the stats // Tracker is designed to track the life cycle of the job described by the stats
@ -85,9 +84,6 @@ type Tracker interface {
// The current status of job // The current status of job
Status() (Status, error) Status() (Status, error)
// Expire the job stats data
Expire() error
// Switch status to running // Switch status to running
Run() error Run() error
@ -237,7 +233,7 @@ func (bt *basicTracker) CheckIn(message string) error {
bt.refresh(current, message) bt.refresh(current, message)
err := bt.fireHookEvent(current, message) err := bt.fireHookEvent(current, message)
err = bt.Update( err = bt.Update(
"check_in", message, // skip checkin data here
"check_in_at", now, "check_in_at", now,
"update_time", now, "update_time", now,
) )
@ -245,11 +241,6 @@ func (bt *basicTracker) CheckIn(message string) error {
return err return err
} }
// Expire job stats
func (bt *basicTracker) Expire() error {
return bt.expire(statDataExpireTime)
}
// Run job // Run job
// Either one is failed, the final return will be marked as failed. // Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Run() error { func (bt *basicTracker) Run() error {
@ -325,7 +316,7 @@ func (bt *basicTracker) Save() (err error) {
) )
if stats.Info.CheckInAt > 0 && !utils.IsEmptyStr(stats.Info.CheckIn) { if stats.Info.CheckInAt > 0 && !utils.IsEmptyStr(stats.Info.CheckIn) {
args = append(args, args = append(args,
"check_in", stats.Info.CheckIn, "check_in", redundantCheckInData, // use data placeholder for saving space
"check_in_at", stats.Info.CheckInAt, "check_in_at", stats.Info.CheckInAt,
) )
} }
@ -355,21 +346,6 @@ func (bt *basicTracker) Save() (err error) {
// Set inprogress track lock // Set inprogress track lock
err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2) err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2)
// If job kind is periodic job, expire time should not be set
// If job kind is scheduled job, expire time should be runAt+
if stats.Info.JobKind != KindPeriodic {
var expireTime int64 = statDataExpireTime
if stats.Info.JobKind == KindScheduled {
nowTime := time.Now().Unix()
future := stats.Info.RunAt - nowTime
if future > 0 {
expireTime += future
}
}
expireTime += rand.Int63n(15) // Avoid lots of keys being expired at the same time
err = conn.Send("EXPIRE", key, expireTime)
}
// Link with its upstream job if upstream job ID exists for future querying // Link with its upstream job if upstream job ID exists for future querying
if !utils.IsEmptyStr(stats.Info.UpstreamJobID) { if !utils.IsEmptyStr(stats.Info.UpstreamJobID) {
k := rds.KeyUpstreamJobAndExecutions(bt.namespace, stats.Info.UpstreamJobID) k := rds.KeyUpstreamJobAndExecutions(bt.namespace, stats.Info.UpstreamJobID)
@ -587,7 +563,7 @@ func (bt *basicTracker) retrieve() error {
res.Info.CheckInAt = parseInt64(value) res.Info.CheckInAt = parseInt64(value)
break break
case "check_in": case "check_in":
res.Info.CheckIn = value res.Info.CheckIn = "" // never read checkin placeholder data
break break
case "cron_spec": case "cron_spec":
res.Info.CronSpec = value res.Info.CronSpec = value
@ -631,25 +607,6 @@ func (bt *basicTracker) retrieve() error {
return nil return nil
} }
func (bt *basicTracker) expire(expireTime int64) error {
conn := bt.pool.Get()
defer func() {
_ = conn.Close()
}()
key := rds.KeyJobStats(bt.namespace, bt.jobID)
num, err := conn.Do("EXPIRE", key, expireTime)
if err != nil {
return err
}
if num == 0 {
return errors.Errorf("job stats for expiring %s does not exist", bt.jobID)
}
return nil
}
func getStatus(conn redis.Conn, key string) (Status, error) { func getStatus(conn redis.Conn, key string) (Status, error) {
values, err := rds.HmGet(conn, key, "status") values, err := rds.HmGet(conn, key, "status")
if err != nil { if err != nil {

View File

@ -151,9 +151,6 @@ func (suite *TrackerTestSuite) TestTracker() {
st, err = t.Status() st, err = t.Status()
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
assert.Equal(suite.T(), StoppedStatus, st) assert.Equal(suite.T(), StoppedStatus, st)
err = t.Expire()
assert.NoError(suite.T(), err)
} }
// TestPeriodicTracker tests tracker of periodic // TestPeriodicTracker tests tracker of periodic

View File

@ -143,11 +143,6 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return err return err
} }
// Expire periodic job stats
if err := tracker.Expire(); err != nil {
logger.Error(err)
}
// Switch the job stats to stopped // Switch the job stats to stopped
// Should not block the next clear action // Should not block the next clear action
err = tracker.Stop() err = tracker.Stop()