From 1cb40368f52fc05661baae1b671690e2fc1d22ba Mon Sep 17 00:00:00 2001
From: Steven Zou
Date: Sun, 8 Apr 2018 10:38:47 +0800
Subject: [PATCH 1/2] Fix issue: failed to update the job status if a runtime error occurred

---
 src/jobservice/config.yml                 | 2 +-
 src/jobservice/job/impl/demo_job.go       | 3 +++
 src/jobservice/opm/redis_job_stats_mgr.go | 4 ++--
 src/jobservice/pool/message_server.go     | 6 ++++--
 src/jobservice/pool/redis_job_wrapper.go  | 6 ++++++
 5 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/jobservice/config.yml b/src/jobservice/config.yml
index be919b473..af0e8b081 100644
--- a/src/jobservice/config.yml
+++ b/src/jobservice/config.yml
@@ -12,7 +12,7 @@ port: 9443
 
 #Worker pool
 worker_pool:
-  #0 means unlimited
+  #Worker concurrency
   workers: 10
   backend: "redis"
   #Additional config if use 'redis' backend
diff --git a/src/jobservice/job/impl/demo_job.go b/src/jobservice/job/impl/demo_job.go
index 2be68d4b4..510919d54 100644
--- a/src/jobservice/job/impl/demo_job.go
+++ b/src/jobservice/job/impl/demo_job.go
@@ -69,6 +69,9 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
 	/*if 1 != 0 {
 		return errors.New("I suicide")
 	}*/
+	//runtime error
+	//var runtime_err error = nil
+	//fmt.Println(runtime_err.Error())
 
 	logger.Info("check in 30%")
 	ctx.Checkin("30%")
diff --git a/src/jobservice/opm/redis_job_stats_mgr.go b/src/jobservice/opm/redis_job_stats_mgr.go
index 675413a7b..4a53c508d 100644
--- a/src/jobservice/opm/redis_job_stats_mgr.go
+++ b/src/jobservice/opm/redis_job_stats_mgr.go
@@ -178,7 +178,7 @@ func (rjs *RedisJobStatsManager) loop() {
 				}
 			}()
 		} else {
-			logger.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
+			logger.Errorf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
 			if item.op == opReportStatus {
 				clearHookCache = true
 			}
@@ -348,7 +348,7 @@ func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) er
 	defer conn.Close()
 
 	key := utils.KeyJobStats(rjs.namespace, jobID)
-	args := make([]interface{}, 0, 5)
+	args := make([]interface{}, 0, 6)
 	args = append(args, key, "status", status, "update_time", time.Now().Unix())
 	if status == job.JobStatusSuccess {
 		//make sure the 'die_at' is reset in case it's a retrying job
diff --git a/src/jobservice/pool/message_server.go b/src/jobservice/pool/message_server.go
index 471cff980..c5becb817 100644
--- a/src/jobservice/pool/message_server.go
+++ b/src/jobservice/pool/message_server.go
@@ -66,12 +66,14 @@ func (ms *MessageServer) Start() error {
 				m := &models.Message{}
 				if err := json.Unmarshal(res.Data, m); err != nil {
 					//logged
-					logger.Warningf("read invalid message: %s\n", res.Data)
+					logger.Warningf("Read invalid message: %s\n", res.Data)
 				}
 				if callback, ok := ms.callbacks[m.Event]; !ok {
 					//logged
 					logger.Warningf("no handler to handle event %s\n", m.Event)
 				} else {
+					//logged incoming events
+					logger.Infof("Receive event '%s' with data(unformatted): %+#v\n", m.Event, m.Data)
 					//Try to recover the concrete type
 					var converted interface{}
 					switch m.Event {
@@ -98,7 +100,7 @@ func (ms *MessageServer) Start() error {
 				if e != nil {
 					err := e.(error)
 					//logged
-					logger.Errorf("failed to fire callback with error: %s\n", err)
+					logger.Errorf("Failed to fire callback with error: %s\n", err)
 				}
 			}
 		case redis.Subscription:
diff --git a/src/jobservice/pool/redis_job_wrapper.go b/src/jobservice/pool/redis_job_wrapper.go
index 1638c2ba6..6beb92465 100644
--- a/src/jobservice/pool/redis_job_wrapper.go
+++ b/src/jobservice/pool/redis_job_wrapper.go
@@ -56,9 +56,13 @@ func (rj *RedisJob) Run(j *work.Job) error {
 		}
 
 		if err == nil {
+			logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
 			return //nothing need to do
 		}
 
+		//log error
+		logger.Errorf("Job '%s:%s' exit with error: %s\n", j.Name, j.ID, err)
+
 		if buildContextFailed || rj.shouldDisableRetry(runningJob, j, cancelled) {
 			j.Fails = 10000000000 //Make it big enough to avoid retrying
 			now := time.Now().Unix()
@@ -76,6 +80,8 @@ func (rj *RedisJob) Run(j *work.Job) error {
 	defer func() {
 		if r := recover(); r != nil {
 			err = fmt.Errorf("Runtime error: %s", r)
+			//record runtime error status
+			rj.jobFailed(j.ID)
 		}
 	}()
 

From 233692c1274aecf839b6fdbc4381630c1245227c Mon Sep 17 00:00:00 2001
From: Steven Zou
Date: Sun, 8 Apr 2018 11:29:43 +0800
Subject: [PATCH 2/2] Fix issue: job context may be a nil pointer when closed in the defer func

---
 src/jobservice/pool/redis_job_wrapper.go | 30 +++++++++++++-----------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/src/jobservice/pool/redis_job_wrapper.go b/src/jobservice/pool/redis_job_wrapper.go
index 6beb92465..ed11a8c9b 100644
--- a/src/jobservice/pool/redis_job_wrapper.go
+++ b/src/jobservice/pool/redis_job_wrapper.go
@@ -40,21 +40,7 @@ func (rj *RedisJob) Run(j *work.Job) error {
 		execContext env.JobContext
 	)
 
-	execContext, err = rj.buildContext(j)
-	if err != nil {
-		buildContextFailed = true
-		goto FAILED //no need to retry
-	}
-
-	//Wrap job
-	runningJob = Wrap(rj.job)
-
 	defer func() {
-		//Close open io stream first
-		if closer, ok := execContext.GetLogger().(logger.Closer); ok {
-			closer.Close()
-		}
-
 		if err == nil {
 			logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
 			return //nothing need to do
@@ -85,6 +71,22 @@ func (rj *RedisJob) Run(j *work.Job) error {
 		}
 	}()
 
+	//Wrap job
+	runningJob = Wrap(rj.job)
+
+	execContext, err = rj.buildContext(j)
+	if err != nil {
+		buildContextFailed = true
+		goto FAILED //no need to retry
+	}
+
+	defer func() {
+		//Close open io stream first
+		if closer, ok := execContext.GetLogger().(logger.Closer); ok {
+			closer.Close()
+		}
+	}()
+
 	//Start to run
 	rj.jobRunning(j.ID)
 	//Inject data
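Note on the series: both patches hinge on Go's last-in-first-out ordering of deferred functions. Registering the status-reporting defer (which also recovers from panics) before the job context is built means a panic can still mark the job failed (patch 1), and a failed buildContext never leaves a nil execContext to be dereferenced during cleanup (patch 2). The following is a minimal standalone sketch of that pattern, not Harbor code; the names run, buildContext, markFailed, and jobContext are invented for illustration.

package main

import "fmt"

//jobContext stands in for env.JobContext (invented for illustration)
type jobContext struct{}

func (c *jobContext) close() { fmt.Println("context closed") }

//markFailed stands in for the rj.jobFailed(j.ID) call added in patch 1
func markFailed(jobID string) { fmt.Printf("job %s marked as failed\n", jobID) }

func buildContext() (*jobContext, error) { return &jobContext{}, nil }

func run(jobID string) (err error) {
	//Registered first, so it runs last (LIFO): it sees the final err,
	//recovers from any panic in the job body, and records the failure
	//without ever touching the (possibly nil) context.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("runtime error: %s", r)
			markFailed(jobID) //record runtime error status
		}
	}()

	//Build the context only after the defer above is in place; if this
	//fails, no later defer dereferences a nil context (patch 2's fix).
	ctx, e := buildContext()
	if e != nil {
		return e
	}
	defer ctx.close() //context cleanup runs before the recover defer

	panic("boom") //simulate the runtime error the series guards against
}

func main() {
	//Prints "context closed", "job demo marked as failed", then the error.
	fmt.Println("run returned:", run("demo"))
}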