Implement API service of getting job stats

This commit is contained in:
Steven Zou 2018-03-15 14:33:57 +08:00
parent ea912729ec
commit be75145858
6 changed files with 112 additions and 6 deletions

View File

@ -79,8 +79,20 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re
func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
jobID := vars["job_id"]
jobStats, err := dh.controller.GetJob(jobID)
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.GetJobStatsError(err))
return
}
data, ok := dh.handleJSONData(w, jobStats)
if !ok {
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("job is coming " + jobID))
w.Write(data)
}
//HandleJobActionReq is implementation of method defined in interface 'Handler'

View File

@ -68,7 +68,11 @@ func (c *Controller) LaunchJob(req models.JobRequest) (models.JobStats, error) {
//GetJob is implementation of same method in core interface.
func (c *Controller) GetJob(jobID string) (models.JobStats, error) {
return models.JobStats{}, nil
if utils.IsEmptyStr(jobID) {
return models.JobStats{}, errors.New("empty job ID")
}
return c.backendPool.GetJobStats(jobID)
}
//StopJob is implementation of same method in core interface.

View File

@ -20,6 +20,8 @@ const (
LaunchJobErrorCode
//CheckStatsErrorCode is code for the error of checking stats of worker pool
CheckStatsErrorCode
//GetJobStatsErrorCode is code for the error of getting stats of enqueued job
GetJobStatsErrorCode
)
//baseError ...
@ -72,6 +74,11 @@ func CheckStatsError(err error) error {
return New(CheckStatsErrorCode, "Check stats of server failed with error", err.Error())
}
//GetJobStatsError is error wrapper for the error of getting job stats
func GetJobStatsError(err error) error {
return New(GetJobStatsErrorCode, "Get job stats failed with error", err.Error())
}
//jobStoppedError is designed for the case of stopping job.
type jobStoppedError struct {
baseError

View File

@ -94,6 +94,7 @@ func (rps *RedisPeriodicScheduler) Start() error {
switch res.Kind {
case "subscribe":
log.Infof("Subscribe redis channel %s\n", res.Channel)
break
case "unsubscribe":
//Unsubscribe all, means main goroutine is exiting
log.Infof("Unsubscribe redis channel %s\n", res.Channel)

View File

@ -82,4 +82,13 @@ type Interface interface {
// error if parameters are not valid
ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
//Get the stats of the specified job
//
//jobID string : ID of the enqueued job
//
//Returns:
// models.JobStats : job stats data
// error : error returned if meet any problems
GetJobStats(jobID string) (models.JobStats, error)
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"time"
"github.com/garyburd/redigo/redis"
@ -213,7 +214,7 @@ func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, i
}
res := generateResult(j, job.JobKindGeneric, isUnique)
if err := gcwp.saveStats(res); err != nil {
if err := gcwp.saveJobStats(res); err != nil {
//Once running job, let it fly away
//The client method may help if the job is still in progress when get stats of this job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
@ -242,7 +243,7 @@ func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters,
res := generateResult(j.Job, job.JobKindScheduled, isUnique)
res.Stats.RunAt = j.RunAt
if err := gcwp.saveStats(res); err != nil {
if err := gcwp.saveJobStats(res); err != nil {
//As job is already scheduled, we should not block this call
//Use client method to help get the status of this fly-away job
log.Errorf("Failed to save stats of job %s with error: %s\n", res.Stats.JobID, err)
@ -272,6 +273,15 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P
}, nil
}
//GetJobStats return the job stats of the specified enqueued job.
func (gcwp *GoCraftWorkPool) GetJobStats(jobID string) (models.JobStats, error) {
if utils.IsEmptyStr(jobID) {
return models.JobStats{}, errors.New("empty job ID")
}
return gcwp.getJobStats(jobID)
}
//Stats of pool
func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
//Get the status of workerpool via client
@ -320,7 +330,70 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m
return theJ.Validate(params)
}
func (gcwp *GoCraftWorkPool) saveStats(stats models.JobStats) error {
func (gcwp *GoCraftWorkPool) getJobStats(ID string) (models.JobStats, error) {
conn := gcwp.redisPool.Get()
defer conn.Close()
key := utils.KeyJobStats(gcwp.namespace, ID)
vals, err := redis.Strings(conn.Do("HGETALL", key))
if err != nil {
return models.JobStats{}, err
}
res := models.JobStats{
Stats: &models.JobStatData{},
}
for i, l := 0, len(vals); i < l; i = i + 2 {
prop := vals[i]
value := vals[i+1]
switch prop {
case "id":
res.Stats.JobID = value
break
case "name":
res.Stats.JobName = value
break
case "kind":
res.Stats.JobKind = value
case "unique":
v, err := strconv.ParseBool(value)
if err != nil {
v = false
}
res.Stats.IsUnique = v
case "status":
res.Stats.Status = value
break
case "ref_link":
res.Stats.RefLink = value
break
case "enqueue_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.EnqueueTime = v
break
case "update_time":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.UpdateTime = v
break
case "run_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.RunAt = v
break
case "check_in_at":
v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.CheckInAt = v
break
case "check_in":
res.Stats.CheckIn = value
break
default:
}
}
return res, nil
}
func (gcwp *GoCraftWorkPool) saveJobStats(stats models.JobStats) error {
conn := gcwp.redisPool.Get()
defer conn.Close()
@ -346,7 +419,7 @@ func (gcwp *GoCraftWorkPool) saveStats(stats models.JobStats) error {
}
conn.Send("HMSET", args...)
//If job kind is periodic job, expire time should be set
//If job kind is periodic job, expire time should not be set
//If job kind is scheduled job, expire time should be runAt+1day
if stats.Stats.JobKind != job.JobKindPeriodic {
var expireTime int64 = 60 * 60 * 24