From 51f1b2e5900bed8a56d4e0d5350de88ab205feef Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 29 May 2020 11:36:08 +0800 Subject: [PATCH] fix(jobservice):fix issues of jobservice - never expire the jobs that are not entering the final status (Error,Success or Stopped) - set different expireation time to the jobs with different status - never store the `check_in` data in the redis db to save space Signed-off-by: Steven Zou --- src/jobservice/common/rds/scripts.go | 8 +++- src/jobservice/job/tracker.go | 53 +++--------------------- src/jobservice/job/tracker_test.go | 3 -- src/jobservice/period/basic_scheduler.go | 5 --- 4 files changed, 12 insertions(+), 57 deletions(-) diff --git a/src/jobservice/common/rds/scripts.go b/src/jobservice/common/rds/scripts.go index 0030ce0e5..fff8d3777 100644 --- a/src/jobservice/common/rds/scripts.go +++ b/src/jobservice/common/rds/scripts.go @@ -88,8 +88,14 @@ if res then end if ARGV[1] == 'Success' or ARGV[1] == 'Stopped' then - -- expire the job stats with shorter interval + -- expire the job stats with shorter interval (1 day) redis.call('expire', KEYS[1], 86400) + elseif ARGV[1] == 'Error' then + -- expire the job stats with normal interval (7 days) incase it may be retried again + redis.call('expire', KEYS[1], 604800) + else + -- remove the expire time if existing + redis.call('persist', KEYS[1]) end end diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go index 47ef4052f..6635ddfcc 100644 --- a/src/jobservice/job/tracker.go +++ b/src/jobservice/job/tracker.go @@ -17,7 +17,6 @@ package job import ( "context" "encoding/json" - "math/rand" "strconv" "time" @@ -32,8 +31,8 @@ import ( ) const ( - // Try best to keep the job stats data but anyway clear it after a reasonable time - statDataExpireTime = 7 * 24 * 3600 + // Check in data placeholder for saving data space + redundantCheckInData = "[REDUNDANT]" ) // Tracker is designed to track the life cycle of the job described by the stats @@ -85,9 +84,6 @@ type Tracker interface { // The current status of job Status() (Status, error) - // Expire the job stats data - Expire() error - // Switch status to running Run() error @@ -237,7 +233,7 @@ func (bt *basicTracker) CheckIn(message string) error { bt.refresh(current, message) err := bt.fireHookEvent(current, message) err = bt.Update( - "check_in", message, + // skip checkin data here "check_in_at", now, "update_time", now, ) @@ -245,11 +241,6 @@ func (bt *basicTracker) CheckIn(message string) error { return err } -// Expire job stats -func (bt *basicTracker) Expire() error { - return bt.expire(statDataExpireTime) -} - // Run job // Either one is failed, the final return will be marked as failed. func (bt *basicTracker) Run() error { @@ -325,7 +316,7 @@ func (bt *basicTracker) Save() (err error) { ) if stats.Info.CheckInAt > 0 && !utils.IsEmptyStr(stats.Info.CheckIn) { args = append(args, - "check_in", stats.Info.CheckIn, + "check_in", redundantCheckInData, // use data placeholder for saving space "check_in_at", stats.Info.CheckInAt, ) } @@ -355,21 +346,6 @@ func (bt *basicTracker) Save() (err error) { // Set inprogress track lock err = conn.Send("HSET", rds.KeyJobTrackInProgress(bt.namespace), stats.Info.JobID, 2) - // If job kind is periodic job, expire time should not be set - // If job kind is scheduled job, expire time should be runAt+ - if stats.Info.JobKind != KindPeriodic { - var expireTime int64 = statDataExpireTime - if stats.Info.JobKind == KindScheduled { - nowTime := time.Now().Unix() - future := stats.Info.RunAt - nowTime - if future > 0 { - expireTime += future - } - } - expireTime += rand.Int63n(15) // Avoid lots of keys being expired at the same time - err = conn.Send("EXPIRE", key, expireTime) - } - // Link with its upstream job if upstream job ID exists for future querying if !utils.IsEmptyStr(stats.Info.UpstreamJobID) { k := rds.KeyUpstreamJobAndExecutions(bt.namespace, stats.Info.UpstreamJobID) @@ -587,7 +563,7 @@ func (bt *basicTracker) retrieve() error { res.Info.CheckInAt = parseInt64(value) break case "check_in": - res.Info.CheckIn = value + res.Info.CheckIn = "" // never read checkin placeholder data break case "cron_spec": res.Info.CronSpec = value @@ -631,25 +607,6 @@ func (bt *basicTracker) retrieve() error { return nil } -func (bt *basicTracker) expire(expireTime int64) error { - conn := bt.pool.Get() - defer func() { - _ = conn.Close() - }() - - key := rds.KeyJobStats(bt.namespace, bt.jobID) - num, err := conn.Do("EXPIRE", key, expireTime) - if err != nil { - return err - } - - if num == 0 { - return errors.Errorf("job stats for expiring %s does not exist", bt.jobID) - } - - return nil -} - func getStatus(conn redis.Conn, key string) (Status, error) { values, err := rds.HmGet(conn, key, "status") if err != nil { diff --git a/src/jobservice/job/tracker_test.go b/src/jobservice/job/tracker_test.go index 58f705596..36b7bc295 100644 --- a/src/jobservice/job/tracker_test.go +++ b/src/jobservice/job/tracker_test.go @@ -151,9 +151,6 @@ func (suite *TrackerTestSuite) TestTracker() { st, err = t.Status() assert.NoError(suite.T(), err) assert.Equal(suite.T(), StoppedStatus, st) - - err = t.Expire() - assert.NoError(suite.T(), err) } // TestPeriodicTracker tests tracker of periodic diff --git a/src/jobservice/period/basic_scheduler.go b/src/jobservice/period/basic_scheduler.go index fd86907be..3dd3adefa 100644 --- a/src/jobservice/period/basic_scheduler.go +++ b/src/jobservice/period/basic_scheduler.go @@ -143,11 +143,6 @@ func (bs *basicScheduler) UnSchedule(policyID string) error { return err } - // Expire periodic job stats - if err := tracker.Expire(); err != nil { - logger.Error(err) - } - // Switch the job stats to stopped // Should not block the next clear action err = tracker.Stop()