fix(jobservice):fix issues of jobservice

- never expire the jobs that are not entering the final status (Error,Success or Stopped)
- set different expireation time to the jobs with different status
- never store the `check_in` data in the redis db to save space

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2020-05-29 11:36:08 +08:00
parent 7c2bfb1378
commit 51f1b2e590
4 changed files with 12 additions and 57 deletions

View File

@ -88,8 +88,14 @@ if res then
end
if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then
-- expire the job stats with shorter interval
-- expire the job stats with shorter interval (1 day)
redis.call('expire', KEYS[1], 86400)
elseif ARGV[1] == 'Error' then
-- expire the job stats with normal interval (7 days) incase it may be retried again
redis.call('expire', KEYS[1], 604800)
else
-- remove the expire time if existing
redis.call('persist', KEYS[1])
end
end

View File

@ -17,7 +17,6 @@ package job
import (
"context"
"encoding/json"
"math/rand"
"strconv"
"time"
@ -32,8 +31,8 @@ import (
)
const (
// Try best to keep the job stats data but anyway clear it after a reasonable time
statDataExpireTime = 7 * 24 * 3600
// Check in data placeholder for saving data space
redundantCheckInData = "[REDUNDANT]"
)
// Tracker is designed to track the life cycle of the job described by the stats
@ -85,9 +84,6 @@ type Tracker interface {
// The current status of job
Status() (Status, error)
// Expire the job stats data
Expire() error
// Switch status to running
Run() error
@ -237,7 +233,7 @@ func (bt *basicTracker) CheckIn(message string) error {
bt.refresh(current, message)
err := bt.fireHookEvent(current, message)
err = bt.Update(
"check_in", message,
// skip checkin data here
"check_in_at", now,
"update_time", now,
)
@ -245,11 +241,6 @@ func (bt *basicTracker) CheckIn(message string) error {
return err
}
// Expire job stats
func (bt *basicTracker) Expire() error {
return bt.expire(statDataExpireTime)
}
// Run job
// Either one is failed, the final return will be marked as failed.
func (bt *basicTracker) Run() error {
@ -325,7 +316,7 @@ func (bt *basicTracker) Save() (err error) {
)
if stats.Info.CheckInAt > 0 && !utils.IsEmptyStr(stats.Info.CheckIn) {
args = append(args,
"check_in", stats.Info.CheckIn,
"check_in", redundantCheckInData, // use data placeholder for saving space
"check_in_at", stats.Info.CheckInAt,
)
}
@ -355,21 +346,6 @@ func (bt *basicTracker) Save() (err error) {
// Set inprogress track lock
err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2)
// If job kind is periodic job, expire time should not be set
// If job kind is scheduled job, expire time should be runAt+
if stats.Info.JobKind != KindPeriodic {
var expireTime int64 = statDataExpireTime
if stats.Info.JobKind == KindScheduled {
nowTime := time.Now().Unix()
future := stats.Info.RunAt - nowTime
if future > 0 {
expireTime += future
}
}
expireTime += rand.Int63n(15) // Avoid lots of keys being expired at the same time
err = conn.Send("EXPIRE", key, expireTime)
}
// Link with its upstream job if upstream job ID exists for future querying
if !utils.IsEmptyStr(stats.Info.UpstreamJobID) {
k := rds.KeyUpstreamJobAndExecutions(bt.namespace, stats.Info.UpstreamJobID)
@ -587,7 +563,7 @@ func (bt *basicTracker) retrieve() error {
res.Info.CheckInAt = parseInt64(value)
break
case "check_in":
res.Info.CheckIn = value
res.Info.CheckIn = "" // never read checkin placeholder data
break
case "cron_spec":
res.Info.CronSpec = value
@ -631,25 +607,6 @@ func (bt *basicTracker) retrieve() error {
return nil
}
func (bt *basicTracker) expire(expireTime int64) error {
conn := bt.pool.Get()
defer func() {
_ = conn.Close()
}()
key := rds.KeyJobStats(bt.namespace, bt.jobID)
num, err := conn.Do("EXPIRE", key, expireTime)
if err != nil {
return err
}
if num == 0 {
return errors.Errorf("job stats for expiring %s does not exist", bt.jobID)
}
return nil
}
func getStatus(conn redis.Conn, key string) (Status, error) {
values, err := rds.HmGet(conn, key, "status")
if err != nil {

View File

@ -151,9 +151,6 @@ func (suite *TrackerTestSuite) TestTracker() {
st, err = t.Status()
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), StoppedStatus, st)
err = t.Expire()
assert.NoError(suite.T(), err)
}
// TestPeriodicTracker tests tracker of periodic

View File

@ -143,11 +143,6 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
return err
}
// Expire periodic job stats
if err := tracker.Expire(); err != nil {
logger.Error(err)
}
// Switch the job stats to stopped
// Should not block the next clear action
err = tracker.Stop()