Implement periodic job enqueueing feature

Add a Redis-based periodic scheduler
Add a job wrapper for the Redis driver
Add other necessary supporting modules
Steven Zou 2018-03-13 23:58:07 +08:00
parent 7a3c35d178
commit 7b8971e930
24 changed files with 1378 additions and 89 deletions

View File

@ -30,14 +30,12 @@ type Handler interface {
//DefaultHandler is the default request handler which implements the Handler interface.
type DefaultHandler struct {
context core.BaseContext
controller *core.Controller
}
//NewDefaultHandler is constructor of DefaultHandler.
func NewDefaultHandler(ctx core.BaseContext, ctl *core.Controller) *DefaultHandler {
func NewDefaultHandler(ctl *core.Controller) *DefaultHandler {
return &DefaultHandler{
context: ctx,
controller: ctl,
}
}
@ -62,7 +60,7 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re
}
//Pass request to the controller for the follow-up.
jobStats, err := dh.controller.LaunchJob(dh.context, jobReq)
jobStats, err := dh.controller.LaunchJob(jobReq)
if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.LaunchJobError(err))
return

View File

@ -11,7 +11,7 @@ import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/config"
"github.com/vmware/harbor/src/jobservice_v2/core"
"github.com/vmware/harbor/src/jobservice_v2/env"
)
//Server serves the http requests.
@ -26,7 +26,7 @@ type Server struct {
config ServerConfig
//The context
context core.BaseContext
context *env.Context
}
//ServerConfig contains the configurations of Server.
@ -45,7 +45,7 @@ type ServerConfig struct {
}
//NewServer is constructor of Server.
func NewServer(ctx core.BaseContext, router Router, cfg ServerConfig) *Server {
func NewServer(ctx *env.Context, router Router, cfg ServerConfig) *Server {
srv := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: http.HandlerFunc(router.ServeHTTP),
@ -82,6 +82,8 @@ func NewServer(ctx core.BaseContext, router Router, cfg ServerConfig) *Server {
//Start the server to serve requests.
func (s *Server) Start() {
s.context.WG.Add(1)
go func() {
var err error
@ -97,7 +99,7 @@ func (s *Server) Start() {
}
if err != nil {
log.Errorf("API server error: %s\n", err)
s.context.ErrorChan <- err
}
}()
}

View File

@ -8,7 +8,7 @@ https_config:
key: "server.key"
#Server listening port
port: 9443
port: 8443
#Worker pool
worker_pool:

View File

@ -1,16 +0,0 @@
package core
import (
"context"
"sync"
)
//BaseContext keep some sharable materials.
//The system context.Context interface is also included.
type BaseContext struct {
//The system context with cancel capability.
SystemContext context.Context
//Coordination signal
WG *sync.WaitGroup
}

View File

@ -1,23 +1,68 @@
package core
import (
"errors"
"fmt"
"github.com/robfig/cron"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/pool"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
//Controller implements the core interface and provides the related job handling methods.
//Controller coordinates the lower-level components to complete the process, acting as a commander.
type Controller struct{}
type Controller struct {
//Refer the backend pool
backendPool pool.Interface
}
//NewController is constructor of Controller.
func NewController() *Controller {
return &Controller{}
func NewController(backendPool pool.Interface) *Controller {
return &Controller{
backendPool: backendPool,
}
}
//LaunchJob is implementation of same method in core interface.
func (c *Controller) LaunchJob(ctx BaseContext, req models.JobRequest) (models.JobStats, error) {
return models.JobStats{
JobID: "111112222xxx",
}, nil
func (c *Controller) LaunchJob(req models.JobRequest) (models.JobStats, error) {
if err := validJobReq(req); err != nil {
return models.JobStats{}, err
}
paramsRequired, isKnownJob := c.backendPool.IsKnownJob(req.Job.Name)
if !isKnownJob {
return models.JobStats{}, fmt.Errorf("job with name '%s' is unknown", req.Job.Name)
}
if paramsRequired {
if req.Job.Parameters == nil || len(req.Job.Parameters) == 0 {
return models.JobStats{}, fmt.Errorf("'parameters' is required by job '%s'", req.Job.Name)
}
}
//Enqueue the job according to its kind
var (
res models.JobStats
err error
)
switch req.Job.Metadata.JobKind {
case job.JobKindScheduled:
res, err = c.backendPool.Schedule(
req.Job.Name,
req.Job.Parameters,
req.Job.Metadata.ScheduleDelay,
req.Job.Metadata.IsUnique)
case job.JobKindPeriodic:
res, err = c.backendPool.PeriodicallyEnqueue(
req.Job.Name,
req.Job.Parameters,
req.Job.Metadata.Cron)
default:
res, err = c.backendPool.Enqueue(req.Job.Name, req.Job.Parameters, req.Job.Metadata.IsUnique)
}
return res, err
}
//GetJob is implementation of same method in core interface.
@ -31,11 +76,53 @@ func (c *Controller) StopJob(jobID string) error {
}
//RetryJob is implementation of same method in core interface.
func (c *Controller) RetryJob(ctx BaseContext, jonID string) error {
func (c *Controller) RetryJob(jobID string) error {
return nil
}
//CheckStatus is implementation of same method in core interface.
func (c *Controller) CheckStatus() (models.JobServiceStats, error) {
return models.JobServiceStats{}, nil
func (c *Controller) CheckStatus() (models.JobPoolStats, error) {
return models.JobPoolStats{}, nil
}
func validJobReq(req models.JobRequest) error {
if req.Job == nil {
return errors.New("empty job request is not allowed")
}
if utils.IsEmptyStr(req.Job.Name) {
return errors.New("name of job must be specified")
}
if req.Job.Metadata == nil {
return errors.New("metadata of job is missing")
}
if req.Job.Metadata.JobKind != job.JobKindGeneric &&
req.Job.Metadata.JobKind != job.JobKindPeriodic &&
req.Job.Metadata.JobKind != job.JobKindScheduled {
return fmt.Errorf(
"job kind '%s' is not supported, only support '%s','%s','%s'",
req.Job.Metadata.JobKind,
job.JobKindGeneric,
job.JobKindScheduled,
job.JobKindPeriodic)
}
if req.Job.Metadata.JobKind == job.JobKindScheduled &&
req.Job.Metadata.ScheduleDelay == 0 {
return fmt.Errorf("'schedule_delay' must be specified if the job kind is '%s'", job.JobKindScheduled)
}
if req.Job.Metadata.JobKind == job.JobKindPeriodic {
if utils.IsEmptyStr(req.Job.Metadata.Cron) {
return fmt.Errorf("'cron_spec' must be specified if the job kind is '%s'", job.JobKindPeriodic)
}
if _, err := cron.Parse(req.Job.Metadata.Cron); err != nil {
return fmt.Errorf("'cron_spec' is not correctly set: %s", err)
}
}
return nil
}
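Taken together, validJobReq and the kind switch above form the whole launch path: validate the request, check it against the pool's known jobs, then route it to Enqueue, Schedule or PeriodicallyEnqueue. A minimal, hypothetical usage sketch follows; it assumes the jobservice_v2 core, job, models and pool packages are imported and that a ready pool.Interface implementation is at hand, and the 'image' parameter is purely illustrative. The commented JSON shows the equivalent HTTP request body derived from the json tags in the models package.

//Hypothetical helper, shown for illustration only.
//Equivalent request body:
//  {"job":{"name":"REPLICATION","parameters":{"image":"library/hello-world"},
//          "metadata":{"kind":"Periodic","cron_spec":"0 0 * * * *","unique":false}}}
func launchPeriodicReplication(backendPool pool.Interface) (models.JobStats, error) {
    ctl := core.NewController(backendPool)
    req := models.JobRequest{
        Job: &models.JobData{
            Name:       job.KnownJobReplication, //"REPLICATION"
            Parameters: models.Parameters{"image": "library/hello-world"},
            Metadata: &models.JobMetadata{
                JobKind: job.JobKindPeriodic,
                Cron:    "0 0 * * * *", //hourly; must be parsable by robfig/cron
            },
        },
    }
    //Passes validJobReq and is routed to backendPool.PeriodicallyEnqueue.
    return ctl.LaunchJob(req)
}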

View File

@ -11,13 +11,12 @@ import (
type Interface interface {
//LaunchJob is used to handle the job submission request.
//
//ctx BaseContext: The context info for job execution
//req JobRequest : Job request contains related required information of queuing job.
//
//Returns:
// JobStats: Job status info with ID and self link returned if job is successfully launched.
// error : Error returned if failed to launch the specified job.
LaunchJob(ctx BaseContext, req models.JobRequest) (models.JobStats, error)
LaunchJob(req models.JobRequest) (models.JobStats, error)
//GetJob is used to handle the job stats query request.
//
@ -38,13 +37,12 @@ type Interface interface {
//RetryJob is used to handle the job retrying request.
//
//ctx BaseContext: The context info for job execution
//jobID string : ID of job.
//
//Return:
// error : Error returned if failed to retry the specified job.
RetryJob(ctx BaseContext, jonID string) error
RetryJob(jobID string) error
//CheckStatus is used to handle the job service healthy status checking request.
CheckStatus() (models.JobServiceStats, error)
CheckStatus() (models.JobPoolStats, error)
}

src/jobservice_v2/env/context.go (new file, 20 additions)
View File

@ -0,0 +1,20 @@
package env
import (
"context"
"sync"
)
//Context keeps some shareable materials and system controlling channels.
//The system context.Context interface is also included.
type Context struct {
//The system context with cancel capability.
SystemContext context.Context
//Coordination signal
WG *sync.WaitGroup
//Report errors to the bootstrap component
//Once an error is reported by lower-level components, the whole system should exit
ErrorChan chan error
}

View File

@ -69,3 +69,8 @@ func LaunchJobError(err error) error {
type jobStoppedError struct {
baseError
}
//jobCancelledError is designed for the case of cancelling a job.
type jobCancelledError struct {
baseError
}

View File

@ -1,16 +1,19 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
import (
"context"
hlog "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/core"
)
//Context is a combination of the system context and other job-specific resources.
//Context will be the real execution context for one job.
type Context struct {
//Base context
*core.BaseContext
//System context
SystemContext context.Context
//Logger for job
Logger *hlog.Logger

View File

@ -14,16 +14,29 @@ type Interface interface {
//ctx Context: Job execution context.
SetContext(ctx Context)
//Pass arguments via this method if have.
//Pass parameters via this method if any.
//
//args map[string]interface{}: arguments with key-pair style for the job execution.
SetArgs(args map[string]interface{})
//params map[string]interface{}: parameters in key-value style for the job execution.
SetParams(params map[string]interface{})
//Inject the func into the job for OP command check.
//
//f CheckOPCmdFunc: check function reference.
SetCheckOPCmdFunc(f CheckOPCmdFunc)
//Declare how many times the job can be retried if failed.
//
//Return:
// uint: the failure count allowed
MaxFails() uint
//Indicate whether the job needs parameters or not
//
//Return:
// true if required (parameter will be pre-validated and 'SetParams' will be called)
// false if no parameters needed (no check and 'SetParams' will not be called)
ParamsRequired() bool
//Run the business logic here.
Run() error
}

View File

@ -0,0 +1,12 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
const (
//JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"
//JobKindScheduled : Kind of scheduled job
JobKindScheduled = "Scheduled"
//JobKindPeriodic : Kind of periodic job
JobKindPeriodic = "Periodic"
)

View File

@ -0,0 +1,16 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
const (
//JobStatusPending : job status pending
JobStatusPending = "Pending"
//JobStatusRunning : job status running
JobStatusRunning = "Running"
//JobStatusStopped : job status stopped
JobStatusStopped = "Stopped"
//JobStatusCancelled : job status cancelled
JobStatusCancelled = "Cancelled"
//JobStatusError : job status error
JobStatusError = "Error"
)

View File

@ -0,0 +1,10 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
//Define the registration name constants of known jobs
const (
//KnownJobReplication is name of replication job
KnownJobReplication = "REPLICATION"
)

View File

@ -0,0 +1,57 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
import (
"reflect"
"github.com/gocraft/work"
"github.com/vmware/harbor/src/jobservice_v2/env"
)
//RedisJob is a job wrapper that adapts the job.Interface to the handler style recognized by the redis worker pool.
type RedisJob struct {
job interface{}
context *env.Context
}
//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context) *RedisJob {
return &RedisJob{j, ctx}
}
//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
//Inject data
jobContext := Context{
SystemContext: rj.context.SystemContext,
}
runningJob := rj.Wrap()
runningJob.SetContext(jobContext)
if runningJob.ParamsRequired() {
runningJob.SetParams(j.Args)
}
//TODO: Update job status to 'Running'
err := runningJob.Run()
//TODO:
//If error is stopped error, update status to 'Stopped' and return nil
//If error is cancelled error, update status to 'Cancelled' and return err
return err
}
//Wrap returns a new (job.)Interface based on the wrapped job handler reference.
func (rj *RedisJob) Wrap() Interface {
theType := reflect.TypeOf(rj.job)
if theType.Kind() == reflect.Ptr {
theType = theType.Elem()
}
//Create a new instance of the underlying job type
v := reflect.New(theType).Elem()
return v.Addr().Interface().(Interface)
}
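Wrap relies on reflection so that every run operates on a brand-new instance of the registered job type rather than on the prototype handed to RegisterJob, which is also why the bootstrap below can register a typed nil pointer such as (*job.ReplicationJob)(nil). A standalone sketch of the same pattern with a hypothetical MyJob type (assumes the reflect package is imported):

//Hypothetical illustration of the reflection trick used by Wrap.
type MyJob struct{ count int }

func freshCopy(prototype interface{}) interface{} {
    t := reflect.TypeOf(prototype)
    if t.Kind() == reflect.Ptr {
        t = t.Elem()
    }
    //reflect.New(t) allocates a new MyJob and yields a *MyJob value,
    //so state never leaks between runs of the same job name.
    return reflect.New(t).Interface()
}

Both freshCopy((*MyJob)(nil)) and freshCopy(&MyJob{}) return a fresh *MyJob.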

View File

@ -0,0 +1,45 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package job
import (
"fmt"
)
//ReplicationJob is the job for replicating repositories.
type ReplicationJob struct {
ctx Context
params map[string]interface{}
opCmdFunc CheckOPCmdFunc
}
//SetContext is implementation of same method in Interface.
func (rj *ReplicationJob) SetContext(ctx Context) {
rj.ctx = ctx
fmt.Printf("ReplicationJob context=%#v\n", rj.ctx)
}
//SetParams is implementation of same method in Interface.
func (rj *ReplicationJob) SetParams(params map[string]interface{}) {
rj.params = params
fmt.Printf("ReplicationJob args: %v\n", rj.params)
}
//SetCheckOPCmdFunc is implementation of same method in Interface.
func (rj *ReplicationJob) SetCheckOPCmdFunc(f CheckOPCmdFunc) {}
//MaxFails is implementation of same method in Interface.
func (rj *ReplicationJob) MaxFails() uint {
return 2
}
//ParamsRequired is implementation of same method in Interface.
func (rj *ReplicationJob) ParamsRequired() bool {
return true
}
//Run the replication logic here.
func (rj *ReplicationJob) Run() error {
fmt.Println("=======Replication job running=======")
return nil
}

View File

@ -2,13 +2,48 @@
package models
import (
"time"
)
//Parameters for job execution.
type Parameters map[string]interface{}
//JobRequest is the request of launching a job.
type JobRequest struct{}
type JobRequest struct {
Job *JobData `json:"job"`
}
//JobData keeps the basic info.
type JobData struct {
Name string `json:"name"`
Parameters Parameters `json:"parameters"`
Metadata *JobMetadata `json:"metadata"`
}
//JobMetadata stores the metadata of job.
type JobMetadata struct {
JobKind string `json:"kind"`
ScheduleDelay uint64 `json:"schedule_delay,omitempty"`
Cron string `json:"cron_spec,omitempty"`
IsUnique bool `json:"unique"`
}
//JobStats keeps the result of job launching.
type JobStats struct {
JobID string `json:"job_id"`
Stats *JobStatData `json:"job"`
}
//JobServiceStats represent the healthy and status of the job service.
type JobServiceStats struct{}
//JobStatData keeps the stats of the job
type JobStatData struct {
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
RefLink string `json:"ref_link,omitempty"`
EnqueueTime time.Time `json:"enqueue_time"`
UpdateTime time.Time `json:"update_time"`
RunAt time.Time `json:"run_at,omitempty"`
}
//JobPoolStats represents the health and status of the job service.
type JobPoolStats struct{}

View File

@ -0,0 +1,193 @@
//Refer to github.com/gocraft/work
package period
import (
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/garyburd/redigo/redis"
"github.com/gocraft/work"
"github.com/robfig/cron"
"github.com/vmware/harbor/src/common/utils/log"
)
const (
periodicEnqueuerSleep = 2 * time.Minute
periodicEnqueuerHorizon = 4 * time.Minute
)
type periodicEnqueuer struct {
namespace string
pool *redis.Pool
policyStore *periodicJobPolicyStore
scheduledPeriodicJobs []*scheduledPeriodicJob
stopChan chan struct{}
doneStoppingChan chan struct{}
}
type periodicJob struct {
jobName string
spec string
schedule cron.Schedule
}
type scheduledPeriodicJob struct {
scheduledAt time.Time
scheduledAtEpoch int64
*periodicJob
}
func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore) *periodicEnqueuer {
return &periodicEnqueuer{
namespace: namespace,
pool: pool,
policyStore: policyStore,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
}
}
func (pe *periodicEnqueuer) start() {
go pe.loop()
log.Info("Periodic enqueuer is started")
}
func (pe *periodicEnqueuer) stop() {
pe.stopChan <- struct{}{}
<-pe.doneStoppingChan
}
func (pe *periodicEnqueuer) loop() {
defer func() {
log.Info("Periodic enqueuer is stopped")
}()
// Begin enqueueing periodically
timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second)
defer timer.Stop()
if pe.shouldEnqueue() {
err := pe.enqueue()
if err != nil {
log.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err)
}
}
for {
select {
case <-pe.stopChan:
pe.doneStoppingChan <- struct{}{}
return
case <-timer.C:
timer.Reset(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second)
if pe.shouldEnqueue() {
err := pe.enqueue()
if err != nil {
log.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err)
}
}
}
}
}
func (pe *periodicEnqueuer) enqueue() error {
now := nowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
conn := pe.pool.Get()
defer conn.Close()
for _, pl := range pe.policyStore.list() {
schedule, err := cron.Parse(pl.CronSpec)
if err != nil {
//The cron spec should already have been checked by upper-level components.
//Just in case, ignore it if an error occurs
continue
}
pj := &periodicJob{
jobName: pl.JobName,
spec: pl.CronSpec,
schedule: schedule,
}
for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
epoch := t.Unix()
id := makeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related to it
job := &work.Job{
Name: pj.jobName,
ID: id,
// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
EnqueuedAt: epoch,
Args: pl.JobParameters, //Pass parameters to scheduled job here
}
rawJSON, err := serializeJob(job)
if err != nil {
return err
}
_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
if err != nil {
return err
}
log.Infof("Schedule job %s for policy %s\n", pj.jobName, pl.PolicyID)
}
}
_, err := conn.Do("SET", redisKeyLastPeriodicEnqueue(pe.namespace), now)
return err
}
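//For illustration of the horizon arithmetic above: with the 4-minute periodicEnqueuerHorizon
//and a policy whose cron spec fires every minute ("0 * * * * *"), a single enqueue() pass
//ZADDs roughly four upcoming run instances into the scheduled zset. Because the member ID
//comes from makeUniquePeriodicID (policy ID plus run epoch) and EnqueuedAt is pinned to the
//same epoch, a later pass over the same window serializes byte-identical members, so the
//ZADD simply overwrites them instead of creating duplicates.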
func (pe *periodicEnqueuer) shouldEnqueue() bool {
conn := pe.pool.Get()
defer conn.Close()
lastEnqueue, err := redis.Int64(conn.Do("GET", redisKeyLastPeriodicEnqueue(pe.namespace)))
if err == redis.ErrNil {
return true
} else if err != nil {
log.Errorf("periodic_enqueuer.should_enqueue:%s\n", err)
return true
}
return lastEnqueue < (nowEpochSeconds() - int64(periodicEnqueuerSleep/time.Minute))
}
var nowMock int64
func nowEpochSeconds() int64 {
if nowMock != 0 {
return nowMock
}
return time.Now().Unix()
}
func makeUniquePeriodicID(name, policyID string, epoch int64) string {
return fmt.Sprintf("periodic:job:%s:%s:%d", name, policyID, epoch)
}
func serializeJob(job *work.Job) ([]byte, error) {
return json.Marshal(job)
}
func redisNamespacePrefix(namespace string) string {
l := len(namespace)
if (l > 0) && (namespace[l-1] != ':') {
namespace = namespace + ":"
}
return namespace
}
func redisKeyScheduled(namespace string) string {
return redisNamespacePrefix(namespace) + "scheduled"
}
func redisKeyLastPeriodicEnqueue(namespace string) string {
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
}
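For a concrete key layout (assuming a namespace such as 'harbor_job_service'), these helpers yield 'harbor_job_service:scheduled', the zset the enqueuer above writes into and the gocraft/work pool drains when a run time arrives, and 'harbor_job_service:last_periodic_enqueue', the timestamp consulted by shouldEnqueue.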

View File

@ -0,0 +1,42 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package period
import "github.com/vmware/harbor/src/jobservice_v2/models"
//Interface defines operations the periodic scheduler should have.
type Interface interface {
//Schedule the specified cron job policy.
//
//jobName string : The name of the periodic job
//params models.Parameters : The parameters required by the periodic job
//cronSpec string : The periodic settings in cron format
//
//Returns:
// The uuid of the cron job policy
// error if failed to schedule
Schedule(jobName string, params models.Parameters, cronSpec string) (string, error)
//Unschedule the specified cron job policy.
//
//cronJobPolicyID string: The ID of cron job policy.
//
//Return:
// error if failed to unschedule
UnSchedule(cronJobPolicyID string) error
//Load data
//
//Return:
// error if failed to do
Load() error
//Clear all the cron job policies.
//
//Return:
// error if failed to do
Clear() error
//Start to serve
Start() error
}

View File

@ -0,0 +1,122 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package period
import (
"encoding/json"
"sync"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
const (
//periodicJobPolicyChangeEventSchedule : Schedule periodic job policy event
periodicJobPolicyChangeEventSchedule = "Schedule"
//periodicJobPolicyChangeEventUnSchedule : UnSchedule periodic job policy event
periodicJobPolicyChangeEventUnSchedule = "UnSchedule"
)
//periodicJobPolicy ...
type periodicJobPolicy struct {
//NOTE: 'PolicyID' should not be set when serializing this policy struct to the zset,
//because each policy ID is different and would break the zset's deduplication capability.
PolicyID string `json:"policy_id,omitempty"`
JobName string `json:"job_name"`
JobParameters map[string]interface{} `json:"job_params"`
CronSpec string `json:"cron_spec"`
}
//periodicJobPolicyEvent is the event content of periodic job policy change.
type periodicJobPolicyEvent struct {
Event string `json:"event"`
PeriodicJobPolicy *periodicJobPolicy `json:"periodic_job_policy"`
}
//serialize the policy to raw data.
func (pjp *periodicJobPolicy) serialize() ([]byte, error) {
return json.Marshal(pjp)
}
//deSerialize the raw json to policy.
func (pjp *periodicJobPolicy) deSerialize(rawJSON []byte) error {
return json.Unmarshal(rawJSON, pjp)
}
//serialize the event to raw data.
func (pjpe *periodicJobPolicyEvent) serialize() ([]byte, error) {
return json.Marshal(pjpe)
}
//deSerialize the raw json to the event.
func (pjpe *periodicJobPolicyEvent) deSerialize(rawJSON []byte) error {
return json.Unmarshal(rawJSON, pjpe)
}
//periodicJobPolicyStore is an in-memory cache for the periodic job policies.
type periodicJobPolicyStore struct {
lock *sync.RWMutex
policies map[string]*periodicJobPolicy //k-v pair and key is the policy ID
}
func (ps *periodicJobPolicyStore) addAll(items []*periodicJobPolicy) {
if items == nil || len(items) == 0 {
return
}
ps.lock.Lock()
defer ps.lock.Unlock()
for _, item := range items {
//Ignore the item with empty uuid
if !utils.IsEmptyStr(item.PolicyID) {
ps.policies[item.PolicyID] = item
}
}
}
func (ps *periodicJobPolicyStore) list() []*periodicJobPolicy {
allItems := make([]*periodicJobPolicy, 0)
ps.lock.RLock()
defer ps.lock.RUnlock()
for _, v := range ps.policies {
allItems = append(allItems, v)
}
return allItems
}
func (ps *periodicJobPolicyStore) add(jobPolicy *periodicJobPolicy) {
if jobPolicy == nil || utils.IsEmptyStr(jobPolicy.PolicyID) {
return
}
ps.lock.Lock()
defer ps.lock.Unlock()
ps.policies[jobPolicy.PolicyID] = jobPolicy
}
func (ps *periodicJobPolicyStore) remove(policyID string) *periodicJobPolicy {
if utils.IsEmptyStr(policyID) {
return nil
}
ps.lock.Lock()
defer ps.lock.Unlock()
if item, ok := ps.policies[policyID]; ok {
delete(ps.policies, policyID)
return item
}
return nil
}
func (ps *periodicJobPolicyStore) size() int {
ps.lock.RLock()
defer ps.lock.RUnlock()
return len(ps.policies)
}

View File

@ -0,0 +1,288 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package period
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
//RedisPeriodicScheduler manages the periodic scheduling policies.
type RedisPeriodicScheduler struct {
context context.Context
redisPool *redis.Pool
namespace string
pstore *periodicJobPolicyStore
enqueuer *periodicEnqueuer
}
//NewRedisPeriodicScheduler is constructor of RedisPeriodicScheduler
func NewRedisPeriodicScheduler(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisPeriodicScheduler {
pstore := &periodicJobPolicyStore{
lock: new(sync.RWMutex),
policies: make(map[string]*periodicJobPolicy),
}
enqueuer := newPeriodicEnqueuer(namespace, redisPool, pstore)
return &RedisPeriodicScheduler{
context: ctx,
redisPool: redisPool,
namespace: namespace,
pstore: pstore,
enqueuer: enqueuer,
}
}
//Start to serve
//Enable PUB/SUB
func (rps *RedisPeriodicScheduler) Start() error {
defer func() {
log.Info("Redis scheduler is stopped")
}()
//Load existing periodic job policies
if err := rps.Load(); err != nil {
return err
}
//As we get one connection from the pool, don't try to close it.
conn := rps.redisPool.Get()
psc := redis.PubSubConn{
Conn: conn,
}
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(rps.namespace))...)
if err != nil {
return err
}
done := make(chan error, 1)
go func() {
for {
switch res := psc.Receive().(type) {
case error:
done <- res
return
case redis.Message:
if notification := readMessage(res.Data); notification != nil {
log.Infof("Got periodic job policy change notification: %s:%s\n", notification.Event, notification.PeriodicJobPolicy.PolicyID)
switch notification.Event {
case periodicJobPolicyChangeEventSchedule:
rps.pstore.add(notification.PeriodicJobPolicy)
case periodicJobPolicyChangeEventUnSchedule:
if notification.PeriodicJobPolicy != nil {
rps.pstore.remove(notification.PeriodicJobPolicy.PolicyID)
}
default:
//do nothing
}
}
case redis.Subscription:
switch res.Kind {
case "subscribe":
log.Infof("Subscribe redis channel %s\n", res.Channel)
case "unsubscribe":
//Unsubscribed from all channels, which means the main goroutine is exiting
log.Infof("Unsubscribe redis channel %s\n", res.Channel)
done <- nil
return
}
}
}
}()
//start enqueuer
rps.enqueuer.start()
defer rps.enqueuer.stop()
log.Info("Redis scheduler is started")
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
//blocking here
for err == nil {
select {
case <-ticker.C:
err = psc.Ping("ping!")
case <-rps.context.Done():
err = errors.New("context exit")
case err = <-done:
return err
}
}
//Unsubscribe all
psc.Unsubscribe()
return <-done
}
//Schedule is implementation of the same method in period.Interface
func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) {
if utils.IsEmptyStr(jobName) {
return "", errors.New("empty job name is not allowed")
}
if utils.IsEmptyStr(cronSpec) {
return "", errors.New("cron spec is not set")
}
//Although the ZSET guarantees no duplicated items, we still need to check the existence
//of the job policy to avoid publishing duplicated ones to other nodes, as we
//use transaction commands.
jobPolicy := &periodicJobPolicy{
JobName: jobName,
JobParameters: params,
CronSpec: cronSpec,
}
//Serialize data
rawJSON, err := jobPolicy.serialize()
if err != nil {
return "", nil
}
//Check existing
//If it already exists, treat it as a successful submission and return the existing ID
if score, ok := rps.exists(string(rawJSON)); ok {
return utils.MakePeriodicPolicyUUIDWithScore(score), nil
}
uuid, score := utils.MakePeriodicPolicyUUID()
//Set back policy ID
jobPolicy.PolicyID = uuid
notification := &periodicJobPolicyEvent{
Event: periodicJobPolicyChangeEventSchedule,
PeriodicJobPolicy: jobPolicy,
}
rawJSON2, err := notification.serialize()
if err != nil {
return "", err
}
//Save to redis db and publish notification via redis transaction
conn := rps.redisPool.Get()
conn.Send("MULTI")
conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON)
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2)
if _, err := conn.Do("EXEC"); err != nil {
return "", err
}
return uuid, nil
}
//UnSchedule is implementation of the same method in period.Interface
func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error {
if utils.IsEmptyStr(cronJobPolicyID) {
return errors.New("cron job policy ID is empty")
}
score := utils.ExtractScoreFromUUID(cronJobPolicyID)
if score == 0 {
return fmt.Errorf("The ID '%s' is not valid", cronJobPolicyID)
}
notification := &periodicJobPolicyEvent{
Event: periodicJobPolicyChangeEventUnSchedule,
PeriodicJobPolicy: &periodicJobPolicy{
PolicyID: cronJobPolicyID, //Only ID required
},
}
rawJSON, err := notification.serialize()
if err != nil {
return err
}
//Remove from the redis db and publish the notification via a redis transaction
conn := rps.redisPool.Get()
conn.Send("MULTI")
conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score
conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON)
_, err = conn.Do("EXEC")
return err
}
//Load data from zset
func (rps *RedisPeriodicScheduler) Load() error {
conn := rps.redisPool.Get()
bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES"))
if err != nil {
return err
}
allPeriodicPolicies := make([]*periodicJobPolicy, 0)
for i, l := 0, len(bytes); i < l; i = i + 2 {
rawPolicy := bytes[i].([]byte)
rawScore := bytes[i+1].([]byte)
policy := &periodicJobPolicy{}
if err := policy.deSerialize(rawPolicy); err != nil {
//Ignore error which means the policy data is not valid
//Only logged
log.Warningf("failed to deserialize periodic policy with error:%s; raw data: %s\n", err, rawPolicy)
continue
}
score, err := strconv.ParseInt(string(rawScore), 10, 64)
if err != nil {
//Ignore error which means the policy data is not valid
//Only logged
log.Warningf("failed to parse the score of the periodic policy with error:%s\n", err)
continue
}
//Set back the policy ID
policy.PolicyID = utils.MakePeriodicPolicyUUIDWithScore(score)
allPeriodicPolicies = append(allPeriodicPolicies, policy)
}
if len(allPeriodicPolicies) > 0 {
rps.pstore.addAll(allPeriodicPolicies)
}
log.Infof("Load %d periodic job policies", len(allPeriodicPolicies))
return nil
}
//Clear is implementation of the same method in period.Interface
func (rps *RedisPeriodicScheduler) Clear() error {
conn := rps.redisPool.Get()
_, err := conn.Do("ZREMRANGEBYRANK", utils.KeyPeriodicPolicy(rps.namespace), 0, -1)
return err
}
func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) {
if utils.IsEmptyStr(rawPolicy) {
return 0, false
}
conn := rps.redisPool.Get()
score, err := redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicy(rps.namespace), rawPolicy))
return score, err == nil
}
func readMessage(data []byte) *periodicJobPolicyEvent {
if data == nil || len(data) == 0 {
return nil
}
notification := &periodicJobPolicyEvent{}
err := json.Unmarshal(data, notification)
if err != nil {
return nil
}
return notification
}

View File

@ -2,8 +2,74 @@
package pool
//Interface for worker pool
import "github.com/vmware/harbor/src/jobservice_v2/models"
//Interface for worker pool.
//More like a driver to abstract the underlying queue.
type Interface interface {
//Start to server
Start() error
//Start to serve
Start()
//Register job to the pool.
//
//name string : job name for referring
//job interface{}: job handler which must implement the job.Interface.
//
//Return:
// error if failed to register
RegisterJob(name string, job interface{}) error
//Register multiple jobs.
//
//jobs map[string]interface{}: job map, key is job name and value is job handler.
RegisterJobs(jobs map[string]interface{}) error
//Enqueue job
//
//jobName string : the name of enqueuing job
//params models.Parameters : parameters of enqueuing job
//isUnique bool : specify whether a duplicated job will be discarded
//
//Returns:
// models.JobStats: the stats of enqueuing job if succeed
// error : if failed to enqueue
Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error)
//Schedule job to run after the specified interval (seconds).
//
//jobName string : the name of enqueuing job
//runAfterSeconds uint64 : the waiting interval with seconds
//params models.Parameters : parameters of enqueuing job
//isUnique bool : specify whether a duplicated job will be discarded
//
//Returns:
// models.JobStats: the stats of enqueuing job if succeed
// error : if failed to enqueue
Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error)
//Schedule the job periodically running.
//
//jobName string : the name of enqueuing job
//params models.Parameters : parameters of enqueuing job
//cronSetting string : the periodic schedule in cron style, like '0 * * * * *'
//
//Returns:
// models.JobStats: the stats of enqueuing job if succeed
// error : if failed to enqueue
PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error)
//Return the status info of the pool.
//
//models.JobPoolStats : the stats info of the pool
//error : failed to check
Stats() (models.JobPoolStats, error)
//Check if the job has already been registered.
//
//name string : name of job
//
//Returns:
// bool : true if the job is registered, otherwise false
// bool : true if the known job requires parameters
IsKnownJob(name string) (bool, bool)
}
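The three enqueueing entry points above map directly onto the request kinds handled by the controller. A hedged sketch of calling them on some implementation p; the job name and parameter type reuse constants from this commit, while the parameter values, delay and cron spec are illustrative:

//Hypothetical direct use of a pool.Interface value p:
params := models.Parameters{"image": "library/hello-world"}
stats, err := p.Enqueue(job.KnownJobReplication, params, true)                     //run as soon as possible, deduplicated
stats, err = p.Schedule(job.KnownJobReplication, params, 600, true)                //run roughly 10 minutes from now
stats, err = p.PeriodicallyEnqueue(job.KnownJobReplication, params, "0 0 0 * * *") //run every day at midnight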

View File

@ -5,18 +5,38 @@ package pool
import (
"errors"
"fmt"
"time"
"github.com/garyburd/redigo/redis"
"github.com/gocraft/work"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice_v2/core"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
var (
dialConnectionTimeout = 30 * time.Second
healthCheckPeriod = time.Minute
dialReadTimeout = healthCheckPeriod + 10*time.Second
dialWriteTimeout = 10 * time.Second
)
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
type GoCraftWorkPool struct {
redisPool *redis.Pool
pool *work.WorkerPool
context core.BaseContext
enqueuer *work.Enqueuer
client *work.Client
context *env.Context
scheduler period.Interface
//No need to sync as it is written once and then only read
//key is the name of a known job
//value is the flag indicating whether the job requires parameters
knownJobs map[string]bool
}
//RedisPoolConfig defines configurations for GoCraftWorkPool.
@ -27,47 +47,232 @@ type RedisPoolConfig struct {
WorkerCount uint
}
//RedisPoolContext ...
//We do not use this context to pass context info so far; it is just a placeholder.
type RedisPoolContext struct{}
//NewGoCraftWorkPool is constructor of goCraftWorkPool.
func NewGoCraftWorkPool(ctx core.BaseContext, cfg RedisPoolConfig) *GoCraftWorkPool {
func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool {
redisPool := &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
MaxActive: 6,
MaxIdle: 6,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort))
return redis.Dial(
"tcp",
fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
redis.DialConnectTimeout(dialConnectionTimeout),
redis.DialReadTimeout(dialReadTimeout),
redis.DialWriteTimeout(dialWriteTimeout),
)
},
}
pool := work.NewWorkerPool(ctx, cfg.WorkerCount, cfg.Namespace, redisPool)
pool := work.NewWorkerPool(RedisPoolContext{}, cfg.WorkerCount, cfg.Namespace, redisPool)
enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool)
client := work.NewClient(cfg.Namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool)
return &GoCraftWorkPool{
redisPool: redisPool,
pool: pool,
enqueuer: enqueuer,
scheduler: scheduler,
client: client,
context: ctx,
knownJobs: make(map[string]bool),
}
}
//Start to serve
//Non-blocking call
func (gcwp *GoCraftWorkPool) Start() error {
func (gcwp *GoCraftWorkPool) Start() {
if gcwp.redisPool == nil ||
gcwp.pool == nil ||
gcwp.context.SystemContext == nil {
return errors.New("Redis worker pool can not start as it's not correctly configured")
//report and exit
gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured")
return
}
done := make(chan interface{}, 1)
gcwp.context.WG.Add(1)
go func() {
defer func() {
if gcwp.context.WG != nil {
gcwp.context.WG.Done()
}
gcwp.context.WG.Done()
}()
//blocking call
if err := gcwp.scheduler.Start(); err != nil {
//Scheduler exits with error
gcwp.context.ErrorChan <- err
done <- struct{}{}
return
}
}()
gcwp.context.WG.Add(1)
go func() {
defer func() {
gcwp.context.WG.Done()
}()
//Append middlewares
gcwp.pool.Middleware((*RedisPoolContext).logJob)
gcwp.pool.Start()
log.Infof("Redis worker pool is started")
//Block on listening context signal
<-gcwp.context.SystemContext.Done()
//Block on listening context and done signal
select {
case <-gcwp.context.SystemContext.Done():
case <-done:
}
gcwp.pool.Stop()
log.Infof("Redis worker pool is stopped")
}()
}
//RegisterJob is used to register the job to the pool.
//j is a sample instance whose type identifies the job
func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
if utils.IsEmptyStr(name) || j == nil {
return errors.New("job can not be registered with empty name or nil interface")
}
//j must be job.Interface
if _, ok := j.(job.Interface); !ok {
return errors.New("job must implement the job.Interface")
}
//Use redis job wrapper pointer to keep the data required by the job.Interface.
redisJob := job.NewRedisJob(j, gcwp.context)
//Get more info from j
theJ := redisJob.Wrap()
gcwp.pool.JobWithOptions(name,
work.JobOptions{MaxFails: theJ.MaxFails()},
func(job *work.Job) error {
return redisJob.Run(job)
}, //Use a generic handler as we do not accept a context this way.
)
gcwp.knownJobs[name] = theJ.ParamsRequired() //keep the name of registered jobs as known jobs for future validation
return nil
}
//RegisterJobs is used to register multiple jobs to pool.
func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error {
if jobs == nil || len(jobs) == 0 {
return nil
}
for name, j := range jobs {
if err := gcwp.RegisterJob(name, j); err != nil {
return err
}
}
return nil
}
//Enqueue job
func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error) {
var (
j *work.Job
err error
)
//Enqueue job
if isUnique {
j, err = gcwp.enqueuer.EnqueueUnique(jobName, params)
} else {
j, err = gcwp.enqueuer.Enqueue(jobName, params)
}
if err != nil {
return models.JobStats{}, err
}
return generateResult(j), nil
}
//Schedule job
func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error) {
var (
j *work.ScheduledJob
err error
)
//Enqueue job to run after the specified delay
if isUnique {
j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params)
} else {
j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params)
}
if err != nil {
return models.JobStats{}, err
}
res := generateResult(j.Job)
res.Stats.RunAt = time.Unix(j.RunAt, 0)
return res, nil
}
//PeriodicallyEnqueue job
func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) {
id, err := gcwp.scheduler.Schedule(jobName, params, cronSetting)
if err != nil {
return models.JobStats{}, err
}
//TODO: Need more data
return models.JobStats{
Stats: &models.JobStatData{
JobID: id,
JobName: jobName,
Status: job.JobStatusPending,
EnqueueTime: time.Unix(time.Now().Unix(), 0),
UpdateTime: time.Unix(time.Now().Unix(), 0),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", id),
},
}, nil
}
//Stats of pool
func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
return models.JobPoolStats{}, nil
}
//IsKnownJob ...
func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (bool, bool) {
v, ok := gcwp.knownJobs[name]
return ok, v
}
//log the job
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
//TODO: Also update the job status to 'pending'
log.Infof("Job incoming: %s:%s", job.ID, job.Name)
return next()
}
//generate the job stats data
func generateResult(j *work.Job) models.JobStats {
if j == nil {
return models.JobStats{}
}
return models.JobStats{
Stats: &models.JobStatData{
JobID: j.ID,
JobName: j.Name,
Status: job.JobStatusPending,
EnqueueTime: time.Unix(j.EnqueuedAt, 0),
UpdateTime: time.Unix(time.Now().Unix(), 0),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID),
},
}
}

View File

@ -13,6 +13,8 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/api"
"github.com/vmware/harbor/src/jobservice_v2/config"
"github.com/vmware/harbor/src/jobservice_v2/core"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/pool"
)
@ -20,10 +22,7 @@ import (
var JobService = &Bootstrap{}
//Bootstrap is the coordinating process that helps load and start the other components to serve.
type Bootstrap struct {
apiServer *api.Server
workerPool pool.Interface
}
type Bootstrap struct{}
//LoadAndRun will load configurations, initialize components and then start the related process to serve requests.
//Return error if meet any problems.
@ -38,47 +37,49 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
//Create the root context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rootContext := core.BaseContext{
rootContext := &env.Context{
SystemContext: ctx,
WG: &sync.WaitGroup{},
ErrorChan: make(chan error, 1), //with 1 buffer
}
//Start the pool
var backendPool pool.Interface
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
if err := bs.loadAndRunRedisWorkerPool(rootContext, cfg); err != nil {
log.Errorf("Failed to start the redis worker pool with error: %s\n", err)
return
}
rootContext.WG.Add(1)
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, cfg)
}
//Initialize controller
ctl := core.NewController()
ctl := core.NewController(backendPool)
//Start the API server
bs.loadAndRunAPIServer(rootContext, cfg, ctl)
rootContext.WG.Add(1)
apiServer := bs.loadAndRunAPIServer(rootContext, cfg, ctl)
log.Infof("Server is starting at %s:%d with %s", "", cfg.Port, cfg.Protocol)
//Block here
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
<-sig
select {
case <-sig:
case err := <-rootContext.ErrorChan:
log.Errorf("Server error:%s\n", err)
}
//Call cancel to send termination signal to other interested parts.
cancel()
//Gracefully shutdown
bs.apiServer.Stop()
apiServer.Stop()
rootContext.WG.Wait()
log.Infof("Server gracefully exit")
}
//Load and run the API server.
func (bs *Bootstrap) loadAndRunAPIServer(ctx core.BaseContext, cfg config.Configuration, ctl *core.Controller) {
func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configuration, ctl *core.Controller) *api.Server {
//Initialized API server
handler := api.NewDefaultHandler(ctx, ctl)
handler := api.NewDefaultHandler(ctl)
router := api.NewBaseRouter(handler)
serverConfig := api.ServerConfig{
Protocol: cfg.Protocol,
@ -88,15 +89,16 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx core.BaseContext, cfg config.Config
serverConfig.Cert = cfg.HTTPSConfig.Cert
serverConfig.Key = cfg.HTTPSConfig.Key
}
server := api.NewServer(ctx, router, serverConfig)
bs.apiServer = server
server := api.NewServer(ctx, router, serverConfig)
//Start processes
server.Start()
return server
}
//Load and run the worker pool
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx core.BaseContext, cfg config.Configuration) error {
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg config.Configuration) pool.Interface {
redisPoolCfg := pool.RedisPoolConfig{
RedisHost: cfg.PoolConfig.RedisPoolCfg.Host,
RedisPort: cfg.PoolConfig.RedisPoolCfg.Port,
@ -105,6 +107,14 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx core.BaseContext, cfg config.
}
redisWorkerPool := pool.NewGoCraftWorkPool(ctx, redisPoolCfg)
bs.workerPool = redisWorkerPool
return redisWorkerPool.Start()
//Register jobs here
if err := redisWorkerPool.RegisterJob(job.KnownJobReplication, (*job.ReplicationJob)(nil)); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
}
redisWorkerPool.Start()
return redisWorkerPool
}

View File

@ -0,0 +1,78 @@
package utils
import (
"encoding/base64"
"fmt"
"math/rand"
"strconv"
"strings"
"time"
)
func generateScore() int64 {
ticks := time.Now().Unix()
rand := rand.New(rand.NewSource(ticks))
return ticks + rand.Int63n(1000) //Add randomness as a second safeguard against potential duplications
}
//MakePeriodicPolicyUUID returns a UUID for the periodic policy.
func MakePeriodicPolicyUUID() (string, int64) {
score := generateScore()
return MakePeriodicPolicyUUIDWithScore(score), score
}
//MakePeriodicPolicyUUIDWithScore returns the UUID based on the specified score for the periodic policy.
func MakePeriodicPolicyUUIDWithScore(score int64) string {
rawUUID := fmt.Sprintf("%s:%s:%d", "periodic", "policy", score)
return base64.StdEncoding.EncodeToString([]byte(rawUUID))
}
//ExtractScoreFromUUID extracts the score from the UUID.
func ExtractScoreFromUUID(UUID string) int64 {
if IsEmptyStr(UUID) {
return 0
}
rawData, err := base64.StdEncoding.DecodeString(UUID)
if err != nil {
return 0
}
data := string(rawData)
fragments := strings.Split(data, ":")
if len(fragments) != 3 {
return 0
}
score, err := strconv.ParseInt(fragments[2], 10, 64)
if err != nil {
return 0
}
return score
}
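//Hypothetical round-trip of the UUID scheme above, shown for illustration: it is what lets
//the scheduler's UnSchedule recover the zset score from the policy ID alone.
func exampleUUIDRoundTrip() bool {
    uuid, score := MakePeriodicPolicyUUID()
    recovered := ExtractScoreFromUUID(uuid)
    //recovered always equals score, so ZREMRANGEBYSCORE can target exactly the stored policy member.
    return recovered == score
}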
//KeyNamespacePrefix returns the base key prefix based on the namespace.
func KeyNamespacePrefix(namespace string) string {
ns := strings.TrimSpace(namespace)
if !strings.HasSuffix(ns, ":") {
return fmt.Sprintf("%s:", ns)
}
return ns
}
//KeyPeriod returns the key of period
func KeyPeriod(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "period")
}
//KeyPeriodicPolicy returns the key of periodic policies.
func KeyPeriodicPolicy(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies")
}
//KeyPeriodicNotification returns the key of periodic pub/sub channel.
func KeyPeriodicNotification(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")
}
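As a worked example of the key helpers (assuming a namespace of 'harbor_job_service'): KeyPeriodicPolicy yields 'harbor_job_service:period:policies', the zset the RedisPeriodicScheduler loads from and writes to, and KeyPeriodicNotification yields 'harbor_job_service:period:policies:notifications', the pub/sub channel on which Schedule and UnSchedule events are broadcast to peer nodes.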