mirror of
https://github.com/goharbor/harbor.git
synced 2025-03-01 18:21:20 +01:00
Support status hook mechanism to report job status changes
refactor scheduler to extarct separate sub/pub system fix bug in getJob return all the stats data of running pools in the health check API
This commit is contained in:
parent
ac544b3ead
commit
e5f8beb35f
@ -11,6 +11,11 @@ import (
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
hookActivated = "activated"
|
||||
hookDeactivated = "error"
|
||||
)
|
||||
|
||||
//Controller implement the core interface and provides related job handle methods.
|
||||
//Controller will coordinate the lower components to complete the process as a commander role.
|
||||
type Controller struct {
|
||||
@ -63,6 +68,17 @@ func (c *Controller) LaunchJob(req models.JobRequest) (models.JobStats, error) {
|
||||
res, err = c.backendPool.Enqueue(req.Job.Name, req.Job.Parameters, req.Job.Metadata.IsUnique)
|
||||
}
|
||||
|
||||
//Register status hook?
|
||||
if err == nil {
|
||||
if !utils.IsEmptyStr(req.Job.StatusHook) {
|
||||
if err := c.backendPool.RegisterHook(res.Stats.JobID, req.Job.StatusHook); err != nil {
|
||||
res.Stats.HookStatus = hookDeactivated
|
||||
} else {
|
||||
res.Stats.HookStatus = hookActivated
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ type JobData struct {
|
||||
Name string `json:"name"`
|
||||
Parameters Parameters `json:"parameters"`
|
||||
Metadata *JobMetadata `json:"metadata"`
|
||||
StatusHook string `json:"status_hook"`
|
||||
}
|
||||
|
||||
//JobMetadata stores the metadata of job.
|
||||
@ -45,10 +46,16 @@ type JobStatData struct {
|
||||
CheckIn string `json:"check_in,omitempty"`
|
||||
CheckInAt int64 `json:"check_in_at,omitempty"`
|
||||
DieAt int64 `json:"die_at,omitempty"`
|
||||
HookStatus string `json:"hook_status,omitempty"`
|
||||
}
|
||||
|
||||
//JobPoolStats represent the healthy and status of the job service.
|
||||
//JobPoolStats represents the healthy and status of all the running worker pools.
|
||||
type JobPoolStats struct {
|
||||
Pools []*JobPoolStatsData `json:"worker_pools"`
|
||||
}
|
||||
|
||||
//JobPoolStatsData represent the healthy and status of the worker pool.
|
||||
type JobPoolStatsData struct {
|
||||
WorkerPoolID string `json:"worker_pool_id"`
|
||||
StartedAt int64 `json:"started_at"`
|
||||
HeartbeatAt int64 `json:"heartbeat_at"`
|
||||
@ -61,3 +68,16 @@ type JobPoolStats struct {
|
||||
type JobActionRequest struct {
|
||||
Action string `json:"action"`
|
||||
}
|
||||
|
||||
//JobStatusChange is designed for reporting the status change via hook.
|
||||
type JobStatusChange struct {
|
||||
JobID string `json:"job_id"`
|
||||
Status string `json:"status"`
|
||||
CheckIn string `json:"check_in,omitempty"`
|
||||
}
|
||||
|
||||
//Message is designed for sub/pub messages
|
||||
type Message struct {
|
||||
Event string
|
||||
Data interface{} //generic format
|
||||
}
|
||||
|
99
src/jobservice_v2/opm/hook_client.go
Normal file
99
src/jobservice_v2/opm/hook_client.go
Normal file
@ -0,0 +1,99 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
|
||||
package opm
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
clientTimeout = 10 * time.Second
|
||||
maxIdleConnections = 20
|
||||
idleConnectionTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
//DefaultHookClient is for default use.
|
||||
var DefaultHookClient = NewHookClient()
|
||||
|
||||
//HookClient is used to post the related data to the interested parties.
|
||||
type HookClient struct {
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
//NewHookClient return the ptr of the new HookClient
|
||||
func NewHookClient() *HookClient {
|
||||
client := &http.Client{
|
||||
Timeout: clientTimeout,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: maxIdleConnections,
|
||||
IdleConnTimeout: idleConnectionTimeout,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &HookClient{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
//ReportStatus reports the status change info to the subscribed party.
|
||||
//The status includes 'checkin' info with format 'check_in:<message>'
|
||||
func (hc *HookClient) ReportStatus(hookURL string, status models.JobStatusChange) error {
|
||||
if utils.IsEmptyStr(hookURL) {
|
||||
return errors.New("empty hook url") //do nothing
|
||||
}
|
||||
|
||||
//Parse and validate URL
|
||||
url, err := url.Parse(hookURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//Marshal data
|
||||
data, err := json.Marshal(&status)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//New post request
|
||||
req, err := http.NewRequest(http.MethodPost, url.String(), strings.NewReader(string(data)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := hc.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer res.Body.Close() //close connection for reuse
|
||||
|
||||
//Should be 200
|
||||
if res.StatusCode != http.StatusOK {
|
||||
if res.ContentLength > 0 {
|
||||
//read error content and return
|
||||
dt, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.New(string(dt))
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to report status change via hook, expect '200' but got '%d'", res.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
57
src/jobservice_v2/opm/hook_store.go
Normal file
57
src/jobservice_v2/opm/hook_store.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
|
||||
package opm
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
//HookStore is used to cache the hooks in memory.
|
||||
//Use job ID as key to index
|
||||
type HookStore struct {
|
||||
lock *sync.RWMutex
|
||||
data map[string]string
|
||||
}
|
||||
|
||||
//NewHookStore is to create a ptr of new HookStore.
|
||||
func NewHookStore() *HookStore {
|
||||
return &HookStore{
|
||||
lock: new(sync.RWMutex),
|
||||
data: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
//Add new record
|
||||
func (hs *HookStore) Add(jobID string, hookURL string) {
|
||||
if utils.IsEmptyStr(jobID) {
|
||||
return //do nothing
|
||||
}
|
||||
|
||||
hs.lock.Lock()
|
||||
defer hs.lock.Unlock()
|
||||
|
||||
hs.data[jobID] = hookURL
|
||||
}
|
||||
|
||||
//Get one hook url by job ID
|
||||
func (hs *HookStore) Get(jobID string) (string, bool) {
|
||||
hs.lock.RLock()
|
||||
defer hs.lock.RUnlock()
|
||||
|
||||
hookURL, ok := hs.data[jobID]
|
||||
|
||||
return hookURL, ok
|
||||
}
|
||||
|
||||
//Remove the specified one
|
||||
func (hs *HookStore) Remove(jobID string) (string, bool) {
|
||||
hs.lock.Lock()
|
||||
defer hs.lock.Unlock()
|
||||
|
||||
hookURL, ok := hs.data[jobID]
|
||||
delete(hs.data, jobID)
|
||||
|
||||
return hookURL, ok
|
||||
}
|
@ -76,4 +76,14 @@ type JobStatsManager interface {
|
||||
//message string : The message being checked in
|
||||
//
|
||||
DieAt(jobID string, dieAt int64)
|
||||
|
||||
//RegisterHook is used to save the hook url or cache the url in memory.
|
||||
//
|
||||
//jobID string : ID of job
|
||||
//hookURL string : the hook url being registered
|
||||
//isCached bool : to indicate if only cache the hook url
|
||||
//
|
||||
//Returns:
|
||||
// error if meet any problems
|
||||
RegisterHook(jobID string, hookURL string, isCached bool) error
|
||||
}
|
||||
|
@ -4,8 +4,10 @@ package opm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
@ -29,6 +31,7 @@ const (
|
||||
opUpdateStatus = "update_job_status"
|
||||
opCheckIn = "check_in"
|
||||
opDieAt = "mark_die_at"
|
||||
opReportStatus = "report_status"
|
||||
maxFails = 3
|
||||
|
||||
//CtlCommandStop : command stop
|
||||
@ -40,6 +43,9 @@ const (
|
||||
|
||||
//Copy from period.enqueuer
|
||||
periodicEnqueuerHorizon = 4 * time.Minute
|
||||
|
||||
//EventRegisterStatusHook is event name of registering hook
|
||||
EventRegisterStatusHook = "register_hook"
|
||||
)
|
||||
|
||||
type queueItem struct {
|
||||
@ -59,7 +65,8 @@ type RedisJobStatsManager struct {
|
||||
stopChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
processChan chan *queueItem
|
||||
isRunning bool //no need to sync
|
||||
isRunning bool //no need to sync
|
||||
hookStore *HookStore //cache the hook here to avoid requesting backend
|
||||
}
|
||||
|
||||
//NewRedisJobStatsManager is constructor of RedisJobStatsManager
|
||||
@ -73,6 +80,7 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r
|
||||
stopChan: make(chan struct{}, 1),
|
||||
doneChan: make(chan struct{}, 1),
|
||||
processChan: make(chan *queueItem, processBufferSize),
|
||||
hookStore: NewHookStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +91,8 @@ func (rjs *RedisJobStatsManager) Start() {
|
||||
}
|
||||
go rjs.loop()
|
||||
rjs.isRunning = true
|
||||
|
||||
log.Info("Redis job stats manager is started")
|
||||
}
|
||||
|
||||
//Shutdown is implementation of same method in JobStatsManager interface.
|
||||
@ -128,6 +138,9 @@ func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
|
||||
}
|
||||
|
||||
rjs.processChan <- item
|
||||
|
||||
//Report status at the same time
|
||||
rjs.submitStatusReportingItem(jobID, status, "")
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) loop() {
|
||||
@ -144,12 +157,13 @@ func (rjs *RedisJobStatsManager) loop() {
|
||||
select {
|
||||
case item := <-rjs.processChan:
|
||||
go func(item *queueItem) {
|
||||
clearHookCache := false
|
||||
if err := rjs.process(item); err != nil {
|
||||
item.fails++
|
||||
if item.fails < maxFails {
|
||||
//Retry after a random interval
|
||||
go func() {
|
||||
timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
|
||||
timer := time.NewTimer(time.Duration(backoff(item.fails)) * time.Second)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
@ -161,6 +175,22 @@ func (rjs *RedisJobStatsManager) loop() {
|
||||
}()
|
||||
} else {
|
||||
log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
|
||||
if item.op == opReportStatus {
|
||||
clearHookCache = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if item.op == opReportStatus {
|
||||
clearHookCache = true
|
||||
}
|
||||
}
|
||||
|
||||
if clearHookCache {
|
||||
//Clear cache to save memory if job status is success or stopped.
|
||||
data := item.data.([]string)
|
||||
status := data[2]
|
||||
if status == job.JobStatusSuccess || status == job.JobStatusStopped {
|
||||
rjs.hookStore.Remove(data[0])
|
||||
}
|
||||
}
|
||||
}(item)
|
||||
@ -284,6 +314,9 @@ func (rjs *RedisJobStatsManager) CheckIn(jobID string, message string) {
|
||||
}
|
||||
|
||||
rjs.processChan <- item
|
||||
|
||||
//Report checkin message at the same time
|
||||
rjs.submitStatusReportingItem(jobID, job.JobStatusRunning, message)
|
||||
}
|
||||
|
||||
//CtlCommand checks if control command is fired for the specified job.
|
||||
@ -309,6 +342,64 @@ func (rjs *RedisJobStatsManager) DieAt(jobID string, dieAt int64) {
|
||||
rjs.processChan <- item
|
||||
}
|
||||
|
||||
//RegisterHook is used to save the hook url or cache the url in memory.
|
||||
func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCached bool) error {
|
||||
if utils.IsEmptyStr(jobID) {
|
||||
return errors.New("empty job ID")
|
||||
}
|
||||
|
||||
if utils.IsEmptyStr(hookURL) {
|
||||
return errors.New("invalid hook url")
|
||||
}
|
||||
|
||||
if !isCached {
|
||||
return rjs.saveHook(jobID, hookURL)
|
||||
}
|
||||
|
||||
rjs.hookStore.Add(jobID, hookURL)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, checkIn string) {
|
||||
//Let it run in a separate goroutine to avoid waiting more time
|
||||
go func() {
|
||||
var (
|
||||
hookUrl string
|
||||
ok bool
|
||||
err error
|
||||
)
|
||||
|
||||
hookUrl, ok = rjs.hookStore.Get(jobID)
|
||||
if !ok {
|
||||
//Retrieve from backend
|
||||
hookUrl, err = rjs.getHook(jobID)
|
||||
if err != nil {
|
||||
//logged and exit
|
||||
log.Warningf("no status hook found for job %s\n, abandon status reporting", jobID)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
item := &queueItem{
|
||||
op: opReportStatus,
|
||||
data: []string{jobID, hookUrl, status, checkIn},
|
||||
}
|
||||
|
||||
rjs.processChan <- item
|
||||
}()
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, checkIn string) error {
|
||||
reportingStatus := models.JobStatusChange{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
CheckIn: checkIn,
|
||||
}
|
||||
|
||||
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error {
|
||||
conn := rjs.redisPool.Get()
|
||||
defer conn.Close()
|
||||
@ -339,7 +430,7 @@ func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID st
|
||||
epoch := t.Unix()
|
||||
if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
|
||||
//only logged
|
||||
log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
|
||||
log.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -451,6 +542,10 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
|
||||
return models.JobStats{}, err
|
||||
}
|
||||
|
||||
if vals == nil || len(vals) == 0 {
|
||||
return models.JobStats{}, fmt.Errorf("job '%s' is not found", jobID)
|
||||
}
|
||||
|
||||
res := models.JobStats{
|
||||
Stats: &models.JobStatData{},
|
||||
}
|
||||
@ -504,6 +599,7 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
|
||||
v, _ := strconv.ParseInt(value, 10, 64)
|
||||
res.Stats.DieAt = v
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@ -572,9 +668,85 @@ func (rjs *RedisJobStatsManager) process(item *queueItem) error {
|
||||
case opDieAt:
|
||||
data := item.data.([]interface{})
|
||||
return rjs.dieAt(data[0].(string), data[1].(int64))
|
||||
case opReportStatus:
|
||||
data := item.data.([]string)
|
||||
return rjs.reportStatus(data[0], data[1], data[2], data[3])
|
||||
default:
|
||||
break
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//HookData keeps the hook url info
|
||||
type HookData struct {
|
||||
JobID string `json:"job_id"`
|
||||
HookURL string `json:"hook_url"`
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) saveHook(jobID string, hookURL string) error {
|
||||
conn := rjs.redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
key := utils.KeyJobStats(rjs.namespace, jobID)
|
||||
args := make([]interface{}, 0, 3)
|
||||
args = append(args, key, "status_hook", hookURL)
|
||||
msg := &models.Message{
|
||||
Event: EventRegisterStatusHook,
|
||||
Data: &HookData{
|
||||
JobID: jobID,
|
||||
HookURL: hookURL,
|
||||
},
|
||||
}
|
||||
rawJSON, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//hook is saved into the job stats
|
||||
//We'll not set expire time here, the expire time of the key will be set when saving job stats
|
||||
if err := conn.Send("MULTI"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.Send("HMSET", args...); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.Send("PUBLISH", utils.KeyPeriodicNotification(rjs.namespace), rawJSON); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = conn.Do("EXEC")
|
||||
return err
|
||||
}
|
||||
|
||||
func (rjs *RedisJobStatsManager) getHook(jobID string) (string, error) {
|
||||
conn := rjs.redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
key := utils.KeyJobStats(rjs.namespace, jobID)
|
||||
vals, err := redis.Strings(conn.Do("HGETALL", key))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
for i, l := 0, len(vals); i < l; i = i + 2 {
|
||||
prop := vals[i]
|
||||
value := vals[i+1]
|
||||
switch prop {
|
||||
case "status_hook":
|
||||
return value, nil
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("no hook found for job '%s'", jobID)
|
||||
}
|
||||
|
||||
func backoff(seed uint) int {
|
||||
if seed < 1 {
|
||||
seed = 1
|
||||
}
|
||||
|
||||
return int(math.Pow(float64(seed+1), float64(seed))) + rand.Intn(5)
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ type Interface interface {
|
||||
// error if failed to unschedule
|
||||
UnSchedule(cronJobPolicyID string) error
|
||||
|
||||
//Load data
|
||||
//Load and cache data if needed
|
||||
//
|
||||
//Return:
|
||||
// error if failed to do
|
||||
@ -39,5 +39,21 @@ type Interface interface {
|
||||
Clear() error
|
||||
|
||||
//Start to serve
|
||||
Start() error
|
||||
Start()
|
||||
|
||||
//Accept the pushed policy and cache it
|
||||
//
|
||||
//policy *PeriodicJobPolicy : the periodic policy being accept
|
||||
//
|
||||
//Return:
|
||||
// error if failed to do
|
||||
AcceptPeriodicPolicy(policy *PeriodicJobPolicy) error
|
||||
|
||||
//Remove the specified policy from the cache if it is existing
|
||||
//
|
||||
//policyID string : ID of the policy being removed
|
||||
//
|
||||
//Return:
|
||||
// the ptr of the being deletd policy
|
||||
RemovePeriodicPolicy(policyID string) *PeriodicJobPolicy
|
||||
}
|
||||
|
@ -16,8 +16,8 @@ const (
|
||||
periodicJobPolicyChangeEventUnSchedule = "UnSchedule"
|
||||
)
|
||||
|
||||
//periodicJobPolicy ...
|
||||
type periodicJobPolicy struct {
|
||||
//PeriodicJobPolicy ...
|
||||
type PeriodicJobPolicy struct {
|
||||
//NOTES: The 'PolicyID' should not be set when serialize this policy struct to the zset
|
||||
//because each 'Policy ID' is different and it may cause issue of losing zset unique capability.
|
||||
PolicyID string `json:"policy_id,omitempty"`
|
||||
@ -26,39 +26,23 @@ type periodicJobPolicy struct {
|
||||
CronSpec string `json:"cron_spec"`
|
||||
}
|
||||
|
||||
//periodicJobPolicyEvent is the event content of periodic job policy change.
|
||||
type periodicJobPolicyEvent struct {
|
||||
Event string `json:"event"`
|
||||
PeriodicJobPolicy *periodicJobPolicy `json:"periodic_job_policy"`
|
||||
}
|
||||
|
||||
//serialize the policy to raw data.
|
||||
func (pjp *periodicJobPolicy) serialize() ([]byte, error) {
|
||||
//Serialize the policy to raw data.
|
||||
func (pjp *PeriodicJobPolicy) Serialize() ([]byte, error) {
|
||||
return json.Marshal(pjp)
|
||||
}
|
||||
|
||||
//deSerialize the raw json to policy.
|
||||
func (pjp *periodicJobPolicy) deSerialize(rawJSON []byte) error {
|
||||
//DeSerialize the raw json to policy.
|
||||
func (pjp *PeriodicJobPolicy) DeSerialize(rawJSON []byte) error {
|
||||
return json.Unmarshal(rawJSON, pjp)
|
||||
}
|
||||
|
||||
//serialize the policy to raw data.
|
||||
func (pjpe *periodicJobPolicyEvent) serialize() ([]byte, error) {
|
||||
return json.Marshal(pjpe)
|
||||
}
|
||||
|
||||
//deSerialize the raw json to policy.
|
||||
func (pjpe *periodicJobPolicyEvent) deSerialize(rawJSON []byte) error {
|
||||
return json.Unmarshal(rawJSON, pjpe)
|
||||
}
|
||||
|
||||
//periodicJobPolicyStore is in-memory cache for the periodic job policies.
|
||||
type periodicJobPolicyStore struct {
|
||||
lock *sync.RWMutex
|
||||
policies map[string]*periodicJobPolicy //k-v pair and key is the policy ID
|
||||
policies map[string]*PeriodicJobPolicy //k-v pair and key is the policy ID
|
||||
}
|
||||
|
||||
func (ps *periodicJobPolicyStore) addAll(items []*periodicJobPolicy) {
|
||||
func (ps *periodicJobPolicyStore) addAll(items []*PeriodicJobPolicy) {
|
||||
if items == nil || len(items) == 0 {
|
||||
return
|
||||
}
|
||||
@ -74,8 +58,8 @@ func (ps *periodicJobPolicyStore) addAll(items []*periodicJobPolicy) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *periodicJobPolicyStore) list() []*periodicJobPolicy {
|
||||
allItems := make([]*periodicJobPolicy, 0)
|
||||
func (ps *periodicJobPolicyStore) list() []*PeriodicJobPolicy {
|
||||
allItems := make([]*PeriodicJobPolicy, 0)
|
||||
|
||||
ps.lock.RLock()
|
||||
defer ps.lock.RUnlock()
|
||||
@ -87,7 +71,7 @@ func (ps *periodicJobPolicyStore) list() []*periodicJobPolicy {
|
||||
return allItems
|
||||
}
|
||||
|
||||
func (ps *periodicJobPolicyStore) add(jobPolicy *periodicJobPolicy) {
|
||||
func (ps *periodicJobPolicyStore) add(jobPolicy *PeriodicJobPolicy) {
|
||||
if jobPolicy == nil || utils.IsEmptyStr(jobPolicy.PolicyID) {
|
||||
return
|
||||
}
|
||||
@ -98,7 +82,7 @@ func (ps *periodicJobPolicyStore) add(jobPolicy *periodicJobPolicy) {
|
||||
ps.policies[jobPolicy.PolicyID] = jobPolicy
|
||||
}
|
||||
|
||||
func (ps *periodicJobPolicyStore) remove(policyID string) *periodicJobPolicy {
|
||||
func (ps *periodicJobPolicyStore) remove(policyID string) *PeriodicJobPolicy {
|
||||
if utils.IsEmptyStr(policyID) {
|
||||
return nil
|
||||
}
|
||||
|
@ -3,7 +3,6 @@
|
||||
package period
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strconv"
|
||||
@ -14,13 +13,21 @@ import (
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
//EventSchedulePeriodicPolicy is for scheduling periodic policy event
|
||||
EventSchedulePeriodicPolicy = "schedule"
|
||||
//EventUnSchedulePeriodicPolicy is for unscheduling periodic policy event
|
||||
EventUnSchedulePeriodicPolicy = "unschedule"
|
||||
)
|
||||
|
||||
//RedisPeriodicScheduler manages the periodic scheduling policies.
|
||||
type RedisPeriodicScheduler struct {
|
||||
context context.Context
|
||||
context *env.Context
|
||||
redisPool *redis.Pool
|
||||
namespace string
|
||||
pstore *periodicJobPolicyStore
|
||||
@ -28,10 +35,10 @@ type RedisPeriodicScheduler struct {
|
||||
}
|
||||
|
||||
//NewRedisPeriodicScheduler is constructor of RedisPeriodicScheduler
|
||||
func NewRedisPeriodicScheduler(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisPeriodicScheduler {
|
||||
func NewRedisPeriodicScheduler(ctx *env.Context, namespace string, redisPool *redis.Pool) *RedisPeriodicScheduler {
|
||||
pstore := &periodicJobPolicyStore{
|
||||
lock: new(sync.RWMutex),
|
||||
policies: make(map[string]*periodicJobPolicy),
|
||||
policies: make(map[string]*PeriodicJobPolicy),
|
||||
}
|
||||
enqueuer := newPeriodicEnqueuer(namespace, redisPool, pstore)
|
||||
|
||||
@ -45,90 +52,25 @@ func NewRedisPeriodicScheduler(ctx context.Context, namespace string, redisPool
|
||||
}
|
||||
|
||||
//Start to serve
|
||||
//Enable PUB/SUB
|
||||
func (rps *RedisPeriodicScheduler) Start() error {
|
||||
func (rps *RedisPeriodicScheduler) Start() {
|
||||
defer func() {
|
||||
log.Info("Redis scheduler is stopped")
|
||||
}()
|
||||
|
||||
//Load existing periodic job policies
|
||||
if err := rps.Load(); err != nil {
|
||||
return err
|
||||
//exit now
|
||||
rps.context.ErrorChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
//As we get one connection from the pool, don't try to close it.
|
||||
conn := rps.redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
psc := redis.PubSubConn{
|
||||
Conn: conn,
|
||||
}
|
||||
|
||||
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(rps.namespace))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
switch res := psc.Receive().(type) {
|
||||
case error:
|
||||
done <- res
|
||||
return
|
||||
case redis.Message:
|
||||
if notification := readMessage(res.Data); notification != nil {
|
||||
log.Infof("Got periodic job policy change notification: %s:%s\n", notification.Event, notification.PeriodicJobPolicy.PolicyID)
|
||||
|
||||
switch notification.Event {
|
||||
case periodicJobPolicyChangeEventSchedule:
|
||||
rps.pstore.add(notification.PeriodicJobPolicy)
|
||||
case periodicJobPolicyChangeEventUnSchedule:
|
||||
if notification.PeriodicJobPolicy != nil {
|
||||
rps.pstore.remove(notification.PeriodicJobPolicy.PolicyID)
|
||||
}
|
||||
default:
|
||||
//do nothing
|
||||
}
|
||||
}
|
||||
case redis.Subscription:
|
||||
switch res.Kind {
|
||||
case "subscribe":
|
||||
log.Infof("Subscribe redis channel %s\n", res.Channel)
|
||||
break
|
||||
case "unsubscribe":
|
||||
//Unsubscribe all, means main goroutine is exiting
|
||||
log.Infof("Unsubscribe redis channel %s\n", res.Channel)
|
||||
done <- nil
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
//start enqueuer
|
||||
rps.enqueuer.start()
|
||||
defer rps.enqueuer.stop()
|
||||
log.Info("Redis scheduler is started")
|
||||
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
//blocking here
|
||||
for err == nil {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err = psc.Ping("ping!")
|
||||
case <-rps.context.Done():
|
||||
err = errors.New("context exit")
|
||||
case err = <-done:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//Unsubscribe all
|
||||
psc.Unsubscribe()
|
||||
return <-done
|
||||
<-rps.context.SystemContext.Done()
|
||||
}
|
||||
|
||||
//Schedule is implementation of the same method in period.Interface
|
||||
@ -149,13 +91,13 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame
|
||||
//Although the ZSET can guarantee no duplicated items, we still need to check the existing
|
||||
//of the job policy to avoid publish duplicated ones to other nodes as we
|
||||
//use transaction commands.
|
||||
jobPolicy := &periodicJobPolicy{
|
||||
jobPolicy := &PeriodicJobPolicy{
|
||||
JobName: jobName,
|
||||
JobParameters: params,
|
||||
CronSpec: cronSpec,
|
||||
}
|
||||
//Serialize data
|
||||
rawJSON, err := jobPolicy.serialize()
|
||||
rawJSON, err := jobPolicy.Serialize()
|
||||
if err != nil {
|
||||
return "", 0, nil
|
||||
}
|
||||
@ -170,11 +112,11 @@ func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parame
|
||||
uuid, score := utils.MakePeriodicPolicyUUID()
|
||||
//Set back policy ID
|
||||
jobPolicy.PolicyID = uuid
|
||||
notification := &periodicJobPolicyEvent{
|
||||
Event: periodicJobPolicyChangeEventSchedule,
|
||||
PeriodicJobPolicy: jobPolicy,
|
||||
notification := &models.Message{
|
||||
Event: EventSchedulePeriodicPolicy,
|
||||
Data: jobPolicy,
|
||||
}
|
||||
rawJSON2, err := notification.serialize()
|
||||
rawJSON2, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
@ -218,14 +160,14 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
notification := &periodicJobPolicyEvent{
|
||||
Event: periodicJobPolicyChangeEventUnSchedule,
|
||||
PeriodicJobPolicy: &periodicJobPolicy{
|
||||
notification := &models.Message{
|
||||
Event: EventUnSchedulePeriodicPolicy,
|
||||
Data: &PeriodicJobPolicy{
|
||||
PolicyID: cronJobPolicyID, //Only ID required
|
||||
},
|
||||
}
|
||||
|
||||
rawJSON, err := notification.serialize()
|
||||
rawJSON, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -283,13 +225,13 @@ func (rps *RedisPeriodicScheduler) Load() error {
|
||||
return err
|
||||
}
|
||||
|
||||
allPeriodicPolicies := make([]*periodicJobPolicy, 0, len(bytes)/2)
|
||||
allPeriodicPolicies := make([]*PeriodicJobPolicy, 0, len(bytes)/2)
|
||||
for i, l := 0, len(bytes); i < l; i = i + 2 {
|
||||
rawPolicy := bytes[i].([]byte)
|
||||
rawScore := bytes[i+1].([]byte)
|
||||
policy := &periodicJobPolicy{}
|
||||
policy := &PeriodicJobPolicy{}
|
||||
|
||||
if err := policy.deSerialize(rawPolicy); err != nil {
|
||||
if err := policy.DeSerialize(rawPolicy); err != nil {
|
||||
//Ignore error which means the policy data is not valid
|
||||
//Only logged
|
||||
log.Warningf("failed to deserialize periodic policy with error:%s; raw data: %s\n", err, rawPolicy)
|
||||
@ -333,6 +275,26 @@ func (rps *RedisPeriodicScheduler) Clear() error {
|
||||
return err
|
||||
}
|
||||
|
||||
//AcceptPeriodicPolicy is implementation of the same method in period.Interface
|
||||
func (rps *RedisPeriodicScheduler) AcceptPeriodicPolicy(policy *PeriodicJobPolicy) error {
|
||||
if policy == nil || utils.IsEmptyStr(policy.PolicyID) {
|
||||
return errors.New("nil periodic policy")
|
||||
}
|
||||
|
||||
rps.pstore.add(policy)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//RemovePeriodicPolicy is implementation of the same method in period.Interface
|
||||
func (rps *RedisPeriodicScheduler) RemovePeriodicPolicy(policyID string) *PeriodicJobPolicy {
|
||||
if utils.IsEmptyStr(policyID) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return rps.pstore.remove(policyID)
|
||||
}
|
||||
|
||||
func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) {
|
||||
if utils.IsEmptyStr(rawPolicy) {
|
||||
return 0, false
|
||||
@ -363,17 +325,3 @@ func (rps *RedisPeriodicScheduler) getIDByScore(score int64) (string, error) {
|
||||
|
||||
return ids[0], nil
|
||||
}
|
||||
|
||||
func readMessage(data []byte) *periodicJobPolicyEvent {
|
||||
if data == nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
notification := &periodicJobPolicyEvent{}
|
||||
err := json.Unmarshal(data, notification)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return notification
|
||||
}
|
||||
|
@ -22,6 +22,9 @@ type Interface interface {
|
||||
//Register multiple jobs.
|
||||
//
|
||||
//jobs map[string]interface{}: job map, key is job name and value is job handler.
|
||||
//
|
||||
//Return:
|
||||
// error if failed to register
|
||||
RegisterJobs(jobs map[string]interface{}) error
|
||||
|
||||
//Enqueue job
|
||||
@ -60,8 +63,9 @@ type Interface interface {
|
||||
|
||||
//Return the status info of the pool.
|
||||
//
|
||||
//models.JobPoolStats : the stats info of the pool
|
||||
//error : failed to check
|
||||
//Returns:
|
||||
// models.JobPoolStats : the stats info of all running pools
|
||||
// error : failed to check
|
||||
Stats() (models.JobPoolStats, error)
|
||||
|
||||
//Check if the job has been already registered.
|
||||
@ -78,7 +82,7 @@ type Interface interface {
|
||||
//jobType interface{} : type of known job
|
||||
// params map[string]interface{} : parameters of known job
|
||||
//
|
||||
//Returns:
|
||||
//Return:
|
||||
// error if parameters are not valid
|
||||
|
||||
ValidateJobParameters(jobType interface{}, params map[string]interface{}) error
|
||||
@ -96,7 +100,7 @@ type Interface interface {
|
||||
//
|
||||
//jobID string : ID of the enqueued job
|
||||
//
|
||||
//Returns:
|
||||
//Return:
|
||||
// error : error returned if meet any problems
|
||||
StopJob(jobID string) error
|
||||
|
||||
@ -104,7 +108,7 @@ type Interface interface {
|
||||
//
|
||||
//jobID string : ID of the enqueued job
|
||||
//
|
||||
//Returns:
|
||||
//Return:
|
||||
// error : error returned if meet any problems
|
||||
CancelJob(jobID string) error
|
||||
|
||||
@ -112,7 +116,16 @@ type Interface interface {
|
||||
//
|
||||
//jobID string : ID of the enqueued job
|
||||
//
|
||||
//Returns:
|
||||
//Return:
|
||||
// error : error returned if meet any problems
|
||||
RetryJob(jobID string) error
|
||||
|
||||
//Register hook
|
||||
//
|
||||
//jobID string : ID of job
|
||||
//hookURL string : the hook url
|
||||
//
|
||||
//Return:
|
||||
// error : error returned if meet any problems
|
||||
RegisterHook(jobID string, hookURL string) error
|
||||
}
|
||||
|
183
src/jobservice_v2/pool/message_server.go
Normal file
183
src/jobservice_v2/pool/message_server.go
Normal file
@ -0,0 +1,183 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/opm"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/period"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
)
|
||||
|
||||
//MessageServer implements the sub/pub mechanism via redis to do async message exchanging.
|
||||
type MessageServer struct {
|
||||
context context.Context
|
||||
redisPool *redis.Pool
|
||||
namespace string
|
||||
callbacks map[string]reflect.Value //no need to sync
|
||||
}
|
||||
|
||||
//NewMessageServer creates a new ptr of MessageServer
|
||||
func NewMessageServer(ctx context.Context, namespace string, redisPool *redis.Pool) *MessageServer {
|
||||
return &MessageServer{
|
||||
context: ctx,
|
||||
redisPool: redisPool,
|
||||
namespace: namespace,
|
||||
callbacks: make(map[string]reflect.Value),
|
||||
}
|
||||
}
|
||||
|
||||
//Start to serve
|
||||
func (ms *MessageServer) Start() error {
|
||||
defer func() {
|
||||
log.Info("Message server is stopped")
|
||||
}()
|
||||
|
||||
//As we get one connection from the pool, don't try to close it.
|
||||
conn := ms.redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
psc := redis.PubSubConn{
|
||||
Conn: conn,
|
||||
}
|
||||
|
||||
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(ms.namespace))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
switch res := psc.Receive().(type) {
|
||||
case error:
|
||||
done <- res
|
||||
return
|
||||
case redis.Message:
|
||||
m := &models.Message{}
|
||||
if err := json.Unmarshal(res.Data, m); err != nil {
|
||||
//logged
|
||||
log.Warningf("read invalid message: %s\n", res.Data)
|
||||
}
|
||||
if callback, ok := ms.callbacks[m.Event]; !ok {
|
||||
//logged
|
||||
log.Warningf("no handler to handle event %s\n", m.Event)
|
||||
} else {
|
||||
//Try to recover the concrete type
|
||||
var converted interface{}
|
||||
switch m.Event {
|
||||
case period.EventSchedulePeriodicPolicy,
|
||||
period.EventUnSchedulePeriodicPolicy:
|
||||
//ignore error, actually error should not be happend because we did not change data
|
||||
//after the last unmarshal try.
|
||||
policyObject := &period.PeriodicJobPolicy{}
|
||||
dt, _ := json.Marshal(m.Data)
|
||||
json.Unmarshal(dt, policyObject)
|
||||
converted = policyObject
|
||||
case opm.EventRegisterStatusHook:
|
||||
//ignore error
|
||||
hookObject := &opm.HookData{}
|
||||
dt, _ := json.Marshal(m.Data)
|
||||
json.Unmarshal(dt, hookObject)
|
||||
converted = hookObject
|
||||
}
|
||||
res := callback.Call([]reflect.Value{reflect.ValueOf(converted)})
|
||||
e := res[0].Interface()
|
||||
if e != nil {
|
||||
err := e.(error)
|
||||
//logged
|
||||
log.Errorf("failed to fire callback with error: %s\n", err)
|
||||
}
|
||||
}
|
||||
case redis.Subscription:
|
||||
switch res.Kind {
|
||||
case "subscribe":
|
||||
log.Infof("Subscribe redis channel %s\n", res.Channel)
|
||||
break
|
||||
case "unsubscribe":
|
||||
//Unsubscribe all, means main goroutine is exiting
|
||||
log.Infof("Unsubscribe redis channel %s\n", res.Channel)
|
||||
done <- nil
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Info("Message server is started")
|
||||
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
//blocking here
|
||||
for err == nil {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err = psc.Ping("ping!")
|
||||
case <-ms.context.Done():
|
||||
err = errors.New("context exit")
|
||||
case err = <-done:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//Unsubscribe all
|
||||
psc.Unsubscribe()
|
||||
return <-done
|
||||
}
|
||||
|
||||
//Subscribe event with specified callback
|
||||
func (ms *MessageServer) Subscribe(event string, callback interface{}) error {
|
||||
if utils.IsEmptyStr(event) {
|
||||
return errors.New("empty event is not allowed")
|
||||
}
|
||||
|
||||
handler, err := validateCallbackFunc(callback)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ms.callbacks[event] = handler
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateCallbackFunc(callback interface{}) (reflect.Value, error) {
|
||||
if callback == nil {
|
||||
return reflect.ValueOf(nil), errors.New("nil callback handler")
|
||||
}
|
||||
|
||||
vFn := reflect.ValueOf(callback)
|
||||
vFType := vFn.Type()
|
||||
if vFType.Kind() != reflect.Func {
|
||||
return reflect.ValueOf(nil), errors.New("callback handler must be a generic func")
|
||||
}
|
||||
|
||||
inNum := vFType.NumIn()
|
||||
outNum := vFType.NumOut()
|
||||
if inNum != 1 || outNum != 1 {
|
||||
return reflect.ValueOf(nil), errors.New("callback handler can only be func(interface{})error format")
|
||||
}
|
||||
|
||||
inType := vFType.In(0)
|
||||
var intf *interface{}
|
||||
if inType != reflect.TypeOf(intf).Elem() {
|
||||
return reflect.ValueOf(nil), errors.New("callback handler can only be func(interface{})error format")
|
||||
}
|
||||
|
||||
outType := vFType.Out(0)
|
||||
var e *error
|
||||
if outType != reflect.TypeOf(e).Elem() {
|
||||
return reflect.ValueOf(nil), errors.New("callback handler can only be func(interface{})error format")
|
||||
}
|
||||
|
||||
return vFn, nil
|
||||
}
|
@ -14,7 +14,7 @@ import (
|
||||
|
||||
//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
|
||||
type RedisJob struct {
|
||||
job interface{} // the real job implementation
|
||||
job interface{} //the real job implementation
|
||||
context *env.Context //context
|
||||
statsManager opm.JobStatsManager //job stats manager
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ package pool
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
@ -34,15 +33,16 @@ const (
|
||||
|
||||
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
|
||||
type GoCraftWorkPool struct {
|
||||
namespace string
|
||||
redisPool *redis.Pool
|
||||
pool *work.WorkerPool
|
||||
enqueuer *work.Enqueuer
|
||||
sweeper *period.Sweeper
|
||||
client *work.Client
|
||||
context *env.Context
|
||||
scheduler period.Interface
|
||||
statsManager opm.JobStatsManager
|
||||
namespace string
|
||||
redisPool *redis.Pool
|
||||
pool *work.WorkerPool
|
||||
enqueuer *work.Enqueuer
|
||||
sweeper *period.Sweeper
|
||||
client *work.Client
|
||||
context *env.Context
|
||||
scheduler period.Interface
|
||||
statsManager opm.JobStatsManager
|
||||
messageServer *MessageServer
|
||||
|
||||
//no need to sync as write once and then only read
|
||||
//key is name of known job
|
||||
@ -81,20 +81,22 @@ func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool
|
||||
pool := work.NewWorkerPool(RedisPoolContext{}, cfg.WorkerCount, cfg.Namespace, redisPool)
|
||||
enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool)
|
||||
client := work.NewClient(cfg.Namespace, redisPool)
|
||||
scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
|
||||
scheduler := period.NewRedisPeriodicScheduler(ctx, cfg.Namespace, redisPool)
|
||||
sweeper := period.NewSweeper(cfg.Namespace, redisPool, client)
|
||||
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, cfg.Namespace, redisPool, client, scheduler)
|
||||
msgServer := NewMessageServer(ctx.SystemContext, cfg.Namespace, redisPool)
|
||||
return &GoCraftWorkPool{
|
||||
namespace: cfg.Namespace,
|
||||
redisPool: redisPool,
|
||||
pool: pool,
|
||||
enqueuer: enqueuer,
|
||||
scheduler: scheduler,
|
||||
sweeper: sweeper,
|
||||
client: client,
|
||||
context: ctx,
|
||||
statsManager: statsMgr,
|
||||
knownJobs: make(map[string]interface{}),
|
||||
namespace: cfg.Namespace,
|
||||
redisPool: redisPool,
|
||||
pool: pool,
|
||||
enqueuer: enqueuer,
|
||||
scheduler: scheduler,
|
||||
sweeper: sweeper,
|
||||
client: client,
|
||||
context: ctx,
|
||||
statsManager: statsMgr,
|
||||
knownJobs: make(map[string]interface{}),
|
||||
messageServer: msgServer,
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,20 +115,39 @@ func (gcwp *GoCraftWorkPool) Start() {
|
||||
|
||||
gcwp.context.WG.Add(1)
|
||||
go func() {
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
gcwp.context.WG.Done()
|
||||
gcwp.statsManager.Shutdown()
|
||||
if err != nil {
|
||||
//report error
|
||||
gcwp.context.ErrorChan <- err
|
||||
done <- struct{}{} //exit immediately
|
||||
}
|
||||
}()
|
||||
//Start stats manager
|
||||
//None-blocking
|
||||
gcwp.statsManager.Start()
|
||||
log.Info("Redis job stats manager is started")
|
||||
|
||||
//blocking call
|
||||
if err := gcwp.scheduler.Start(); err != nil {
|
||||
//Scheduler exits with error
|
||||
gcwp.context.ErrorChan <- err
|
||||
done <- struct{}{}
|
||||
//Register callbacks
|
||||
if err = gcwp.messageServer.Subscribe(period.EventSchedulePeriodicPolicy,
|
||||
func(data interface{}) error {
|
||||
return gcwp.handleSchedulePolicy(data)
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if err = gcwp.messageServer.Subscribe(period.EventUnSchedulePeriodicPolicy,
|
||||
func(data interface{}) error {
|
||||
return gcwp.handleUnSchedulePolicy(data)
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
if err = gcwp.messageServer.Subscribe(opm.EventRegisterStatusHook,
|
||||
func(data interface{}) error {
|
||||
return gcwp.handleRegisterStatusHook(data)
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
//Start message server
|
||||
if err = gcwp.messageServer.Start(); err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
@ -135,6 +156,21 @@ func (gcwp *GoCraftWorkPool) Start() {
|
||||
go func() {
|
||||
defer func() {
|
||||
gcwp.context.WG.Done()
|
||||
gcwp.statsManager.Shutdown()
|
||||
}()
|
||||
//Start stats manager
|
||||
//None-blocking
|
||||
gcwp.statsManager.Start()
|
||||
|
||||
//blocking call
|
||||
gcwp.scheduler.Start()
|
||||
}()
|
||||
|
||||
gcwp.context.WG.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
gcwp.context.WG.Done()
|
||||
log.Infof("Redis worker pool is stopped")
|
||||
}()
|
||||
|
||||
//Clear dirty data before pool starting
|
||||
@ -156,7 +192,6 @@ func (gcwp *GoCraftWorkPool) Start() {
|
||||
}
|
||||
|
||||
gcwp.pool.Stop()
|
||||
log.Infof("Redis worker pool is stopped")
|
||||
}()
|
||||
}
|
||||
|
||||
@ -301,27 +336,34 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
|
||||
}
|
||||
|
||||
//Find the heartbeat of this pool via pid
|
||||
pid := os.Getpid()
|
||||
stats := make([]*models.JobPoolStatsData, 0)
|
||||
for _, hb := range hbs {
|
||||
if hb.Pid == pid {
|
||||
wPoolStatus := workerPoolStatusHealthy
|
||||
if time.Unix(hb.HeartbeatAt, 0).Add(workerPoolDeadTime).Before(time.Now()) {
|
||||
wPoolStatus = workerPoolStatusDead
|
||||
}
|
||||
stats := models.JobPoolStats{
|
||||
WorkerPoolID: hb.WorkerPoolID,
|
||||
StartedAt: hb.StartedAt,
|
||||
HeartbeatAt: hb.HeartbeatAt,
|
||||
JobNames: hb.JobNames,
|
||||
Concurrency: hb.Concurrency,
|
||||
Status: wPoolStatus,
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
if hb.HeartbeatAt == 0 {
|
||||
continue //invalid ones
|
||||
}
|
||||
|
||||
wPoolStatus := workerPoolStatusHealthy
|
||||
if time.Unix(hb.HeartbeatAt, 0).Add(workerPoolDeadTime).Before(time.Now()) {
|
||||
wPoolStatus = workerPoolStatusDead
|
||||
}
|
||||
stat := &models.JobPoolStatsData{
|
||||
WorkerPoolID: hb.WorkerPoolID,
|
||||
StartedAt: hb.StartedAt,
|
||||
HeartbeatAt: hb.HeartbeatAt,
|
||||
JobNames: hb.JobNames,
|
||||
Concurrency: hb.Concurrency,
|
||||
Status: wPoolStatus,
|
||||
}
|
||||
stats = append(stats, stat)
|
||||
}
|
||||
|
||||
return models.JobPoolStats{}, errors.New("Failed to get stats of worker pool")
|
||||
if len(stats) == 0 {
|
||||
return models.JobPoolStats{}, errors.New("Failed to get stats of worker pools")
|
||||
}
|
||||
|
||||
return models.JobPoolStats{
|
||||
Pools: stats,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//StopJob will stop the job
|
||||
@ -355,6 +397,64 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m
|
||||
return theJ.Validate(params)
|
||||
}
|
||||
|
||||
//RegisterHook registers status hook url
|
||||
//sync method
|
||||
func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error {
|
||||
if utils.IsEmptyStr(jobID) {
|
||||
return errors.New("empty job ID")
|
||||
}
|
||||
|
||||
if utils.IsEmptyStr(hookURL) {
|
||||
return errors.New("empty hook url")
|
||||
}
|
||||
|
||||
return gcwp.statsManager.RegisterHook(jobID, hookURL, false)
|
||||
}
|
||||
|
||||
func (gcwp *GoCraftWorkPool) handleSchedulePolicy(data interface{}) error {
|
||||
if data == nil {
|
||||
return errors.New("nil data interface")
|
||||
}
|
||||
|
||||
pl, ok := data.(*period.PeriodicJobPolicy)
|
||||
if !ok {
|
||||
return errors.New("malformed policy object")
|
||||
}
|
||||
|
||||
return gcwp.scheduler.AcceptPeriodicPolicy(pl)
|
||||
}
|
||||
|
||||
func (gcwp *GoCraftWorkPool) handleUnSchedulePolicy(data interface{}) error {
|
||||
if data == nil {
|
||||
return errors.New("nil data interface")
|
||||
}
|
||||
|
||||
pl, ok := data.(*period.PeriodicJobPolicy)
|
||||
if !ok {
|
||||
return errors.New("malformed policy object")
|
||||
}
|
||||
|
||||
removed := gcwp.scheduler.RemovePeriodicPolicy(pl.PolicyID)
|
||||
if removed == nil {
|
||||
return errors.New("nothing removed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcwp *GoCraftWorkPool) handleRegisterStatusHook(data interface{}) error {
|
||||
if data == nil {
|
||||
return errors.New("nil data interface")
|
||||
}
|
||||
|
||||
hook, ok := data.(*opm.HookData)
|
||||
if !ok {
|
||||
return errors.New("malformed hook object")
|
||||
}
|
||||
|
||||
return gcwp.statsManager.RegisterHook(hook.JobID, hook.HookURL, true)
|
||||
}
|
||||
|
||||
//log the job
|
||||
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
|
||||
log.Infof("Job incoming: %s:%s", job.Name, job.ID)
|
||||
|
@ -65,7 +65,7 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
|
||||
|
||||
//Start the API server
|
||||
apiServer := bs.loadAndRunAPIServer(rootContext, cfg, ctl)
|
||||
log.Infof("Server is starting at %s:%d with %s", "", cfg.Port, cfg.Protocol)
|
||||
log.Infof("Server is started at %s:%d with %s", "", cfg.Port, cfg.Protocol)
|
||||
|
||||
//Block here
|
||||
sig := make(chan os.Signal, 1)
|
||||
|
Loading…
Reference in New Issue
Block a user