Support API service of checking stats of the worker pool

This commit is contained in:
Steven Zou 2018-03-14 16:59:00 +08:00
parent 6b46844565
commit 414c36205c
5 changed files with 77 additions and 6 deletions

View File

@ -65,11 +65,12 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re
dh.handleError(w, http.StatusInternalServerError, errs.LaunchJobError(err))
return
}
data, err = json.Marshal(jobStats)
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err))
data, ok := dh.handleJSONData(w, jobStats)
if !ok {
return
}
w.WriteHeader(http.StatusAccepted)
w.Write(data)
}
@ -89,7 +90,29 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
//HandleCheckStatusReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) {
stats, err := dh.controller.CheckStatus()
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.CheckStatsError(err))
return
}
data, ok := dh.handleJSONData(w, stats)
if !ok {
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
}
func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, object interface{}) ([]byte, bool) {
data, err := json.Marshal(object)
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err))
return nil, false
}
return data, true
}
func (dh *DefaultHandler) handleError(w http.ResponseWriter, code int, err error) {

View File

@ -82,7 +82,7 @@ func (c *Controller) RetryJob(jonID string) error {
//CheckStatus is implementation of same method in core interface.
func (c *Controller) CheckStatus() (models.JobPoolStats, error) {
return models.JobPoolStats{}, nil
return c.backendPool.Stats()
}
func validJobReq(req models.JobRequest) error {

View File

@ -18,6 +18,8 @@ const (
MissingBackendHandlerErrorCode
//LaunchJobErrorCode is code for the error of launching job
LaunchJobErrorCode
//CheckStatsErrorCode is code for the error of checking stats of worker pool
CheckStatsErrorCode
)
//baseError ...
@ -65,6 +67,11 @@ func LaunchJobError(err error) error {
return New(LaunchJobErrorCode, "Launch job failed with error", err.Error())
}
//CheckStatsError is error wrapper for the error of checking stats failed
func CheckStatsError(err error) error {
return New(CheckStatsErrorCode, "Check stats of server failed with error", err.Error())
}
//jobStoppedError is designed for the case of stopping job.
type jobStoppedError struct {
baseError

View File

@ -46,4 +46,11 @@ type JobStatData struct {
}
//JobPoolStats represent the healthy and status of the job service.
type JobPoolStats struct{}
type JobPoolStats struct {
WorkerPoolID string `json:"worker_pool_id"`
StartedAt int64 `json:"started_at"`
HeartbeatAt int64 `json:"heartbeat_at"`
JobNames []string `json:"job_names"`
Concurrency uint `json:"concurrency"`
Status string `json:"status"`
}

View File

@ -5,6 +5,7 @@ package pool
import (
"errors"
"fmt"
"os"
"time"
"github.com/garyburd/redigo/redis"
@ -22,6 +23,12 @@ var (
healthCheckPeriod = time.Minute
dialReadTimeout = healthCheckPeriod + 10*time.Second
dialWriteTimeout = 10 * time.Second
workerPoolDeadTime = 10 * time.Second
)
const (
workerPoolStatusHealthy = "Healthy"
workerPoolStatusDead = "Dead"
)
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
@ -252,7 +259,34 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P
//Stats of pool
func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
return models.JobPoolStats{}, nil
//Get the status of workerpool via client
hbs, err := gcwp.client.WorkerPoolHeartbeats()
if err != nil {
return models.JobPoolStats{}, err
}
//Find the heartbeat of this pool via pid
pid := os.Getpid()
for _, hb := range hbs {
if hb.Pid == pid {
wPoolStatus := workerPoolStatusHealthy
if time.Unix(hb.HeartbeatAt, 0).Add(workerPoolDeadTime).Before(time.Now()) {
wPoolStatus = workerPoolStatusDead
}
stats := models.JobPoolStats{
WorkerPoolID: hb.WorkerPoolID,
StartedAt: hb.StartedAt,
HeartbeatAt: hb.HeartbeatAt,
JobNames: hb.JobNames,
Concurrency: hb.Concurrency,
Status: wPoolStatus,
}
return stats, nil
}
}
return models.JobPoolStats{}, errors.New("Failed to get stats of worker pool")
}
//IsKnownJob ...