Merge pull request #4593 from vmware/add_more_log_2_js

Fix issue: failed to update the status of job if runtime error occurred
This commit is contained in:
Steven Zou 2018-04-08 14:03:34 +08:00 committed by GitHub
commit 529ad3e079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 32 additions and 19 deletions

View File

@ -12,7 +12,7 @@ port: 9443
#Worker pool
worker_pool:
#0 means unlimited
#Worker concurrency
workers: 10
backend: "redis"
#Additional config if use 'redis' backend

View File

@ -69,6 +69,9 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
/*if 1 != 0 {
return errors.New("I suicide")
}*/
//runtime error
//var runtime_err error = nil
//fmt.Println(runtime_err.Error())
logger.Info("check in 30%")
ctx.Checkin("30%")

View File

@ -178,7 +178,7 @@ func (rjs *RedisJobStatsManager) loop() {
}
}()
} else {
logger.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
logger.Errorf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
if item.op == opReportStatus {
clearHookCache = true
}
@ -348,7 +348,7 @@ func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) er
defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 5)
args := make([]interface{}, 0, 6)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
if status == job.JobStatusSuccess {
//make sure the 'die_at' is reset in case it's a retrying job

View File

@ -66,12 +66,14 @@ func (ms *MessageServer) Start() error {
m := &models.Message{}
if err := json.Unmarshal(res.Data, m); err != nil {
//logged
logger.Warningf("read invalid message: %s\n", res.Data)
logger.Warningf("Read invalid message: %s\n", res.Data)
}
if callback, ok := ms.callbacks[m.Event]; !ok {
//logged
logger.Warningf("no handler to handle event %s\n", m.Event)
} else {
//logged incoming events
logger.Infof("Receive event '%s' with data(unformatted): %+#v\n", m.Event, m.Data)
//Try to recover the concrete type
var converted interface{}
switch m.Event {
@ -98,7 +100,7 @@ func (ms *MessageServer) Start() error {
if e != nil {
err := e.(error)
//logged
logger.Errorf("failed to fire callback with error: %s\n", err)
logger.Errorf("Failed to fire callback with error: %s\n", err)
}
}
case redis.Subscription:

View File

@ -40,25 +40,15 @@ func (rj *RedisJob) Run(j *work.Job) error {
execContext env.JobContext
)
execContext, err = rj.buildContext(j)
if err != nil {
buildContextFailed = true
goto FAILED //no need to retry
}
//Wrap job
runningJob = Wrap(rj.job)
defer func() {
//Close open io stream first
if closer, ok := execContext.GetLogger().(logger.Closer); ok {
closer.Close()
}
if err == nil {
logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
return //nothing need to do
}
//log error
logger.Errorf("Job '%s:%s' exit with error: %s\n", j.Name, j.ID, err)
if buildContextFailed || rj.shouldDisableRetry(runningJob, j, cancelled) {
j.Fails = 10000000000 //Make it big enough to avoid retrying
now := time.Now().Unix()
@ -76,6 +66,24 @@ func (rj *RedisJob) Run(j *work.Job) error {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Runtime error: %s", r)
//record runtime error status
rj.jobFailed(j.ID)
}
}()
//Wrap job
runningJob = Wrap(rj.job)
execContext, err = rj.buildContext(j)
if err != nil {
buildContextFailed = true
goto FAILED //no need to retry
}
defer func() {
//Close open io stream first
if closer, ok := execContext.GetLogger().(logger.Closer); ok {
closer.Close()
}
}()