From 7b8971e93042fa334051352c236146bd7ff0a951 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Tue, 13 Mar 2018 23:58:07 +0800 Subject: [PATCH] Implement periodically enqueue job feature add scheduler based on redis add job wrapper for redis driver other necessary modules --- src/jobservice_v2/api/handler.go | 6 +- src/jobservice_v2/api/server.go | 10 +- src/jobservice_v2/config.yml | 2 +- src/jobservice_v2/core/context.go | 16 -- src/jobservice_v2/core/controller.go | 107 +++++++- src/jobservice_v2/core/interface.go | 8 +- src/jobservice_v2/env/context.go | 20 ++ src/jobservice_v2/errs/errors.go | 5 + src/jobservice_v2/job/context.go | 9 +- src/jobservice_v2/job/interface.go | 19 +- src/jobservice_v2/job/job_kinds.go | 12 + src/jobservice_v2/job/job_status.go | 16 ++ src/jobservice_v2/job/known_jobs.go | 10 + src/jobservice_v2/job/redis_job_wrapper.go | 57 ++++ src/jobservice_v2/job/replication_job.go | 45 +++ src/jobservice_v2/models/models.go | 43 ++- src/jobservice_v2/period/enqueuer.go | 193 +++++++++++++ src/jobservice_v2/period/interface.go | 42 +++ src/jobservice_v2/period/job_policy.go | 122 +++++++++ src/jobservice_v2/period/redis_scheduler.go | 288 ++++++++++++++++++++ src/jobservice_v2/pool/interface.go | 72 ++++- src/jobservice_v2/pool/redis_pool.go | 233 +++++++++++++++- src/jobservice_v2/runtime/bootstrap.go | 54 ++-- src/jobservice_v2/utils/keys.go | 78 ++++++ 24 files changed, 1378 insertions(+), 89 deletions(-) delete mode 100644 src/jobservice_v2/core/context.go create mode 100644 src/jobservice_v2/env/context.go create mode 100644 src/jobservice_v2/job/job_kinds.go create mode 100644 src/jobservice_v2/job/job_status.go create mode 100644 src/jobservice_v2/job/known_jobs.go create mode 100644 src/jobservice_v2/job/redis_job_wrapper.go create mode 100644 src/jobservice_v2/job/replication_job.go create mode 100644 src/jobservice_v2/period/enqueuer.go create mode 100644 src/jobservice_v2/period/interface.go create mode 100644 src/jobservice_v2/period/job_policy.go create mode 100644 src/jobservice_v2/period/redis_scheduler.go create mode 100644 src/jobservice_v2/utils/keys.go diff --git a/src/jobservice_v2/api/handler.go b/src/jobservice_v2/api/handler.go index c5f302501..c0fcb4842 100644 --- a/src/jobservice_v2/api/handler.go +++ b/src/jobservice_v2/api/handler.go @@ -30,14 +30,12 @@ type Handler interface { //DefaultHandler is the default request handler which implements the Handler interface. type DefaultHandler struct { - context core.BaseContext controller *core.Controller } //NewDefaultHandler is constructor of DefaultHandler. -func NewDefaultHandler(ctx core.BaseContext, ctl *core.Controller) *DefaultHandler { +func NewDefaultHandler(ctl *core.Controller) *DefaultHandler { return &DefaultHandler{ - context: ctx, controller: ctl, } } @@ -62,7 +60,7 @@ func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Re } //Pass request to the controller for the follow-up. - jobStats, err := dh.controller.LaunchJob(dh.context, jobReq) + jobStats, err := dh.controller.LaunchJob(jobReq) if err != nil { dh.handleError(w, http.StatusInternalServerError, errs.LaunchJobError(err)) return diff --git a/src/jobservice_v2/api/server.go b/src/jobservice_v2/api/server.go index 058687742..a3ee4c467 100644 --- a/src/jobservice_v2/api/server.go +++ b/src/jobservice_v2/api/server.go @@ -11,7 +11,7 @@ import ( "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice_v2/config" - "github.com/vmware/harbor/src/jobservice_v2/core" + "github.com/vmware/harbor/src/jobservice_v2/env" ) //Server serves the http requests. @@ -26,7 +26,7 @@ type Server struct { config ServerConfig //The context - context core.BaseContext + context *env.Context } //ServerConfig contains the configurations of Server. @@ -45,7 +45,7 @@ type ServerConfig struct { } //NewServer is constructor of Server. -func NewServer(ctx core.BaseContext, router Router, cfg ServerConfig) *Server { +func NewServer(ctx *env.Context, router Router, cfg ServerConfig) *Server { srv := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Port), Handler: http.HandlerFunc(router.ServeHTTP), @@ -82,6 +82,8 @@ func NewServer(ctx core.BaseContext, router Router, cfg ServerConfig) *Server { //Start the server to serve requests. func (s *Server) Start() { + s.context.WG.Add(1) + go func() { var err error @@ -97,7 +99,7 @@ func (s *Server) Start() { } if err != nil { - log.Errorf("API server error: %s\n", err) + s.context.ErrorChan <- err } }() } diff --git a/src/jobservice_v2/config.yml b/src/jobservice_v2/config.yml index a11d95d3f..d53955239 100644 --- a/src/jobservice_v2/config.yml +++ b/src/jobservice_v2/config.yml @@ -8,7 +8,7 @@ https_config: key: "server.key" #Server listening port -port: 9443 +port: 8443 #Worker pool worker_pool: diff --git a/src/jobservice_v2/core/context.go b/src/jobservice_v2/core/context.go deleted file mode 100644 index 4b9353689..000000000 --- a/src/jobservice_v2/core/context.go +++ /dev/null @@ -1,16 +0,0 @@ -package core - -import ( - "context" - "sync" -) - -//BaseContext keep some sharable materials. -//The system context.Context interface is also included. -type BaseContext struct { - //The system context with cancel capability. - SystemContext context.Context - - //Coordination signal - WG *sync.WaitGroup -} diff --git a/src/jobservice_v2/core/controller.go b/src/jobservice_v2/core/controller.go index 0749b21f6..ad34fe5f0 100644 --- a/src/jobservice_v2/core/controller.go +++ b/src/jobservice_v2/core/controller.go @@ -1,23 +1,68 @@ package core import ( + "errors" + "fmt" + + "github.com/robfig/cron" + "github.com/vmware/harbor/src/jobservice_v2/job" "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/pool" + "github.com/vmware/harbor/src/jobservice_v2/utils" ) //Controller implement the core interface and provides related job handle methods. //Controller will coordinate the lower components to complete the process as a commander role. -type Controller struct{} +type Controller struct { + //Refer the backend pool + backendPool pool.Interface +} //NewController is constructor of Controller. -func NewController() *Controller { - return &Controller{} +func NewController(backendPool pool.Interface) *Controller { + return &Controller{ + backendPool: backendPool, + } } //LaunchJob is implementation of same method in core interface. -func (c *Controller) LaunchJob(ctx BaseContext, req models.JobRequest) (models.JobStats, error) { - return models.JobStats{ - JobID: "111112222xxx", - }, nil +func (c *Controller) LaunchJob(req models.JobRequest) (models.JobStats, error) { + if err := validJobReq(req); err != nil { + return models.JobStats{}, err + } + + paramsRequired, isKnownJob := c.backendPool.IsKnownJob(req.Job.Name) + if !isKnownJob { + return models.JobStats{}, fmt.Errorf("job with name '%s' is unknown", req.Job.Name) + } + if paramsRequired { + if req.Job.Parameters == nil || len(req.Job.Parameters) == 0 { + return models.JobStats{}, fmt.Errorf("'parameters' is required by job '%s'", req.Job.Name) + } + } + + //Enqueue job regarding of the kind + var ( + res models.JobStats + err error + ) + switch req.Job.Metadata.JobKind { + case job.JobKindScheduled: + res, err = c.backendPool.Schedule( + req.Job.Name, + req.Job.Parameters, + req.Job.Metadata.ScheduleDelay, + req.Job.Metadata.IsUnique) + case job.JobKindPeriodic: + res, err = c.backendPool.PeriodicallyEnqueue( + req.Job.Name, + req.Job.Parameters, + req.Job.Metadata.Cron) + default: + res, err = c.backendPool.Enqueue(req.Job.Name, req.Job.Parameters, req.Job.Metadata.IsUnique) + } + + return res, err } //GetJob is implementation of same method in core interface. @@ -31,11 +76,53 @@ func (c *Controller) StopJob(jobID string) error { } //RetryJob is implementation of same method in core interface. -func (c *Controller) RetryJob(ctx BaseContext, jonID string) error { +func (c *Controller) RetryJob(jonID string) error { return nil } //CheckStatus is implementation of same method in core interface. -func (c *Controller) CheckStatus() (models.JobServiceStats, error) { - return models.JobServiceStats{}, nil +func (c *Controller) CheckStatus() (models.JobPoolStats, error) { + return models.JobPoolStats{}, nil +} + +func validJobReq(req models.JobRequest) error { + if req.Job == nil { + return errors.New("empty job request is not allowed") + } + + if utils.IsEmptyStr(req.Job.Name) { + return errors.New("name of job must be specified") + } + + if req.Job.Metadata == nil { + return errors.New("metadata of job is missing") + } + + if req.Job.Metadata.JobKind != job.JobKindGeneric && + req.Job.Metadata.JobKind != job.JobKindPeriodic && + req.Job.Metadata.JobKind != job.JobKindScheduled { + return fmt.Errorf( + "job kind '%s' is not supported, only support '%s','%s','%s'", + req.Job.Metadata.JobKind, + job.JobKindGeneric, + job.JobKindScheduled, + job.JobKindPeriodic) + } + + if req.Job.Metadata.JobKind == job.JobKindScheduled && + req.Job.Metadata.ScheduleDelay == 0 { + return fmt.Errorf("'schedule_delay' must be specified if the job kind is '%s'", job.JobKindScheduled) + } + + if req.Job.Metadata.JobKind == job.JobKindPeriodic { + if utils.IsEmptyStr(req.Job.Metadata.Cron) { + return fmt.Errorf("'cron_spec' must be specified if the job kind is '%s'", job.JobKindPeriodic) + } + + if _, err := cron.Parse(req.Job.Metadata.Cron); err != nil { + return fmt.Errorf("'cron_spec' is not correctly set: %s", err) + } + } + + return nil } diff --git a/src/jobservice_v2/core/interface.go b/src/jobservice_v2/core/interface.go index dbe476b93..71e93d334 100644 --- a/src/jobservice_v2/core/interface.go +++ b/src/jobservice_v2/core/interface.go @@ -11,13 +11,12 @@ import ( type Interface interface { //LaunchJob is used to handle the job submission request. // - //ctx BaseContext: The context info for job execution //req JobRequest : Job request contains related required information of queuing job. // //Returns: // JobStats: Job status info with ID and self link returned if job is successfully launched. // error : Error returned if failed to launch the specified job. - LaunchJob(ctx BaseContext, req models.JobRequest) (models.JobStats, error) + LaunchJob(req models.JobRequest) (models.JobStats, error) //GetJob is used to handle the job stats query request. // @@ -38,13 +37,12 @@ type Interface interface { //RetryJob is used to handle the job retrying request. // - //ctx BaseContext: The context info for job execution //jobID string : ID of job. // //Return: // error : Error returned if failed to retry the specified job. - RetryJob(ctx BaseContext, jonID string) error + RetryJob(jonID string) error //CheckStatus is used to handle the job service healthy status checking request. - CheckStatus() (models.JobServiceStats, error) + CheckStatus() (models.JobPoolStats, error) } diff --git a/src/jobservice_v2/env/context.go b/src/jobservice_v2/env/context.go new file mode 100644 index 000000000..88ade8510 --- /dev/null +++ b/src/jobservice_v2/env/context.go @@ -0,0 +1,20 @@ +package env + +import ( + "context" + "sync" +) + +//Context keep some sharable materials and system controling channels. +//The system context.Context interface is also included. +type Context struct { + //The system context with cancel capability. + SystemContext context.Context + + //Coordination signal + WG *sync.WaitGroup + + //Report errors to bootstrap component + //Once error is reported by lower components, the whole system should exit + ErrorChan chan error +} diff --git a/src/jobservice_v2/errs/errors.go b/src/jobservice_v2/errs/errors.go index fb1a4af76..faa48892b 100644 --- a/src/jobservice_v2/errs/errors.go +++ b/src/jobservice_v2/errs/errors.go @@ -69,3 +69,8 @@ func LaunchJobError(err error) error { type jobStoppedError struct { baseError } + +//jobCancelledError is designed for the case of cancelling job. +type jobCancelledError struct { + baseError +} diff --git a/src/jobservice_v2/job/context.go b/src/jobservice_v2/job/context.go index a26474b15..35b9d4c49 100644 --- a/src/jobservice_v2/job/context.go +++ b/src/jobservice_v2/job/context.go @@ -1,16 +1,19 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + package job import ( + "context" + hlog "github.com/vmware/harbor/src/common/utils/log" - "github.com/vmware/harbor/src/jobservice_v2/core" ) //Context is combination of BaseContext and other job specified resources. //Context will be the real execution context for one job. //Use pointer to point to the singleton BaseContext copy. type Context struct { - //Base context - *core.BaseContext + //System context + SystemContext context.Context //Logger for job Logger *hlog.Logger diff --git a/src/jobservice_v2/job/interface.go b/src/jobservice_v2/job/interface.go index b4620efa7..dd9190045 100644 --- a/src/jobservice_v2/job/interface.go +++ b/src/jobservice_v2/job/interface.go @@ -14,16 +14,29 @@ type Interface interface { //ctx Context: Job execution context. SetContext(ctx Context) - //Pass arguments via this method if have. + //Pass parameters via this method if have. // - //args map[string]interface{}: arguments with key-pair style for the job execution. - SetArgs(args map[string]interface{}) + //params map[string]interface{}: parameters with key-pair style for the job execution. + SetParams(params map[string]interface{}) //Inject the func into the job for OP command check. // //f CheckOPCmdFunc: check function reference. SetCheckOPCmdFunc(f CheckOPCmdFunc) + //Declare how many times the job can be retried if failed. + // + //Return: + // uint: the failure count allowed + MaxFails() uint + + //Indicate whether the job needs parameters or not + // + //Return: + // true if required (parameter will be pre-validated and 'SetParams' will be called) + // false if no parameters needed (no check and 'SetParams' will not be called) + ParamsRequired() bool + //Run the business logic here. Run() error } diff --git a/src/jobservice_v2/job/job_kinds.go b/src/jobservice_v2/job/job_kinds.go new file mode 100644 index 000000000..742f06375 --- /dev/null +++ b/src/jobservice_v2/job/job_kinds.go @@ -0,0 +1,12 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package job + +const ( + //JobKindGeneric : Kind of generic job + JobKindGeneric = "Generic" + //JobKindScheduled : Kind of scheduled job + JobKindScheduled = "Scheduled" + //JobKindPeriodic : Kind of periodic job + JobKindPeriodic = "Periodic" +) diff --git a/src/jobservice_v2/job/job_status.go b/src/jobservice_v2/job/job_status.go new file mode 100644 index 000000000..0db8d4126 --- /dev/null +++ b/src/jobservice_v2/job/job_status.go @@ -0,0 +1,16 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package job + +const ( + //JobStatusPending : job status pending + JobStatusPending = "Pending" + //JobStatusRunning : job status running + JobStatusRunning = "Running" + //JobStatusStopped : job status stopped + JobStatusStopped = "Stopped" + //JobStatusCancelled : job status cancelled + JobStatusCancelled = "Cancelled" + //JobStatusError : job status error + JobStatusError = "Error" +) diff --git a/src/jobservice_v2/job/known_jobs.go b/src/jobservice_v2/job/known_jobs.go new file mode 100644 index 000000000..31a2768a2 --- /dev/null +++ b/src/jobservice_v2/job/known_jobs.go @@ -0,0 +1,10 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package job + +//Define the register name constants of known jobs + +const ( + //KnownJobReplication is name of replication job + KnownJobReplication = "REPLICATION" +) diff --git a/src/jobservice_v2/job/redis_job_wrapper.go b/src/jobservice_v2/job/redis_job_wrapper.go new file mode 100644 index 000000000..38df055c4 --- /dev/null +++ b/src/jobservice_v2/job/redis_job_wrapper.go @@ -0,0 +1,57 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package job + +import ( + "reflect" + + "github.com/gocraft/work" + "github.com/vmware/harbor/src/jobservice_v2/env" +) + +//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. +type RedisJob struct { + job interface{} + context *env.Context +} + +//NewRedisJob is constructor of RedisJob +func NewRedisJob(j interface{}, ctx *env.Context) *RedisJob { + return &RedisJob{j, ctx} +} + +//Run the job +func (rj *RedisJob) Run(j *work.Job) error { + //Inject data + jobContext := Context{ + SystemContext: rj.context.SystemContext, + } + + runningJob := rj.Wrap() + runningJob.SetContext(jobContext) + if runningJob.ParamsRequired() { + runningJob.SetParams(j.Args) + } + + //TODO: Update job status to 'Running' + err := runningJob.Run() + + //TODO: + //If error is stopped error, update status to 'Stopped' and return nil + //If error is cancelled error, update status to 'Cancelled' and return err + + return err +} + +//Wrap returns a new (job.)Interface based on the wrapped job handler reference. +func (rj *RedisJob) Wrap() Interface { + theType := reflect.TypeOf(rj.job) + + if theType.Kind() == reflect.Ptr { + theType = theType.Elem() + } + + //Crate new + v := reflect.New(theType).Elem() + return v.Addr().Interface().(Interface) +} diff --git a/src/jobservice_v2/job/replication_job.go b/src/jobservice_v2/job/replication_job.go new file mode 100644 index 000000000..c37a98816 --- /dev/null +++ b/src/jobservice_v2/job/replication_job.go @@ -0,0 +1,45 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package job + +import ( + "fmt" +) + +//ReplicationJob is the job for replicating repositories. +type ReplicationJob struct { + ctx Context + params map[string]interface{} + opCmdFunc CheckOPCmdFunc +} + +//SetContext is implementation of same method in Interface. +func (rj *ReplicationJob) SetContext(ctx Context) { + rj.ctx = ctx + fmt.Printf("ReplicationJob context=%#v\n", rj.ctx) +} + +//SetParams is implementation of same method in Interface. +func (rj *ReplicationJob) SetParams(params map[string]interface{}) { + rj.params = params + fmt.Printf("ReplicationJob args: %v\n", rj.params) +} + +//SetCheckOPCmdFunc is implementation of same method in Interface. +func (rj *ReplicationJob) SetCheckOPCmdFunc(f CheckOPCmdFunc) {} + +//MaxFails is implementation of same method in Interface. +func (rj *ReplicationJob) MaxFails() uint { + return 2 +} + +//ParamsRequired is implementation of same method in Interface. +func (rj *ReplicationJob) ParamsRequired() bool { + return true +} + +//Run the replication logic here. +func (rj *ReplicationJob) Run() error { + fmt.Println("=======Replication job running=======") + return nil +} diff --git a/src/jobservice_v2/models/models.go b/src/jobservice_v2/models/models.go index 5a43ca986..aebc81961 100644 --- a/src/jobservice_v2/models/models.go +++ b/src/jobservice_v2/models/models.go @@ -2,13 +2,48 @@ package models +import ( + "time" +) + +//Parameters for job execution. +type Parameters map[string]interface{} + //JobRequest is the request of launching a job. -type JobRequest struct{} +type JobRequest struct { + Job *JobData `json:"job"` +} + +//JobData keeps the basic info. +type JobData struct { + Name string `json:"name"` + Parameters Parameters `json:"parameters"` + Metadata *JobMetadata `json:"metadata"` +} + +//JobMetadata stores the metadata of job. +type JobMetadata struct { + JobKind string `json:"kind"` + ScheduleDelay uint64 `json:"schedule_delay,omitempty"` + Cron string `json:"cron_spec,omitempty"` + IsUnique bool `json:"unique"` +} //JobStats keeps the result of job launching. type JobStats struct { - JobID string `json:"job_id"` + Stats *JobStatData `json:"job"` } -//JobServiceStats represent the healthy and status of the job service. -type JobServiceStats struct{} +//JobStatData keeps the stats of job +type JobStatData struct { + JobID string `json:"id"` + Status string `json:"status"` + JobName string `json:"name"` + RefLink string `json:"ref_link,omitempty"` + EnqueueTime time.Time `json:"enqueue_time"` + UpdateTime time.Time `json:"update_time"` + RunAt time.Time `json:"run_at,omitempty"` +} + +//JobPoolStats represent the healthy and status of the job service. +type JobPoolStats struct{} diff --git a/src/jobservice_v2/period/enqueuer.go b/src/jobservice_v2/period/enqueuer.go new file mode 100644 index 000000000..d73555c8c --- /dev/null +++ b/src/jobservice_v2/period/enqueuer.go @@ -0,0 +1,193 @@ +//Refer github.com/gocraft/work + +package period + +import ( + "encoding/json" + "fmt" + "math/rand" + "time" + + "github.com/garyburd/redigo/redis" + "github.com/gocraft/work" + "github.com/robfig/cron" + "github.com/vmware/harbor/src/common/utils/log" +) + +const ( + periodicEnqueuerSleep = 2 * time.Minute + periodicEnqueuerHorizon = 4 * time.Minute +) + +type periodicEnqueuer struct { + namespace string + pool *redis.Pool + policyStore *periodicJobPolicyStore + scheduledPeriodicJobs []*scheduledPeriodicJob + stopChan chan struct{} + doneStoppingChan chan struct{} +} + +type periodicJob struct { + jobName string + spec string + schedule cron.Schedule +} + +type scheduledPeriodicJob struct { + scheduledAt time.Time + scheduledAtEpoch int64 + *periodicJob +} + +func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore) *periodicEnqueuer { + return &periodicEnqueuer{ + namespace: namespace, + pool: pool, + policyStore: policyStore, + stopChan: make(chan struct{}), + doneStoppingChan: make(chan struct{}), + } +} + +func (pe *periodicEnqueuer) start() { + go pe.loop() + log.Info("Periodic enqueuer is started") +} + +func (pe *periodicEnqueuer) stop() { + pe.stopChan <- struct{}{} + <-pe.doneStoppingChan +} + +func (pe *periodicEnqueuer) loop() { + defer func() { + log.Info("Periodic enqueuer is stopped") + }() + // Begin reaping periodically + timer := time.NewTimer(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + defer timer.Stop() + + if pe.shouldEnqueue() { + err := pe.enqueue() + if err != nil { + log.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err) + } + } + + for { + select { + case <-pe.stopChan: + pe.doneStoppingChan <- struct{}{} + return + case <-timer.C: + timer.Reset(periodicEnqueuerSleep + time.Duration(rand.Intn(30))*time.Second) + if pe.shouldEnqueue() { + err := pe.enqueue() + if err != nil { + log.Errorf("periodic_enqueuer.loop.enqueue:%s\n", err) + } + } + } + } +} + +func (pe *periodicEnqueuer) enqueue() error { + now := nowEpochSeconds() + nowTime := time.Unix(now, 0) + horizon := nowTime.Add(periodicEnqueuerHorizon) + + conn := pe.pool.Get() + defer conn.Close() + + for _, pl := range pe.policyStore.list() { + schedule, err := cron.Parse(pl.CronSpec) + if err != nil { + //The cron spec should be already checked at top components. + //Just in cases, if error occurred, ignore it + continue + } + pj := &periodicJob{ + jobName: pl.JobName, + spec: pl.CronSpec, + schedule: schedule, + } + for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) { + epoch := t.Unix() + id := makeUniquePeriodicID(pj.jobName, pl.PolicyID, epoch) //Use policy ID to track the jobs related with it + + job := &work.Job{ + Name: pj.jobName, + ID: id, + + // This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history. + EnqueuedAt: epoch, + Args: pl.JobParameters, //Pass parameters to scheduled job here + } + + rawJSON, err := serializeJob(job) + if err != nil { + return err + } + + _, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON) + if err != nil { + return err + } + + log.Infof("Schedule job %s for policy %s\n", pj.jobName, pl.PolicyID) + } + } + + _, err := conn.Do("SET", redisKeyLastPeriodicEnqueue(pe.namespace), now) + + return err +} + +func (pe *periodicEnqueuer) shouldEnqueue() bool { + conn := pe.pool.Get() + defer conn.Close() + + lastEnqueue, err := redis.Int64(conn.Do("GET", redisKeyLastPeriodicEnqueue(pe.namespace))) + if err == redis.ErrNil { + return true + } else if err != nil { + log.Errorf("periodic_enqueuer.should_enqueue:%s\n", err) + return true + } + + return lastEnqueue < (nowEpochSeconds() - int64(periodicEnqueuerSleep/time.Minute)) +} + +var nowMock int64 + +func nowEpochSeconds() int64 { + if nowMock != 0 { + return nowMock + } + return time.Now().Unix() +} + +func makeUniquePeriodicID(name, spec string, epoch int64) string { + return fmt.Sprintf("periodic:job:%s:%s:%d", name, spec, epoch) +} + +func serializeJob(job *work.Job) ([]byte, error) { + return json.Marshal(job) +} + +func redisNamespacePrefix(namespace string) string { + l := len(namespace) + if (l > 0) && (namespace[l-1] != ':') { + namespace = namespace + ":" + } + return namespace +} + +func redisKeyScheduled(namespace string) string { + return redisNamespacePrefix(namespace) + "scheduled" +} + +func redisKeyLastPeriodicEnqueue(namespace string) string { + return redisNamespacePrefix(namespace) + "last_periodic_enqueue" +} diff --git a/src/jobservice_v2/period/interface.go b/src/jobservice_v2/period/interface.go new file mode 100644 index 000000000..605d41d8a --- /dev/null +++ b/src/jobservice_v2/period/interface.go @@ -0,0 +1,42 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package period + +import "github.com/vmware/harbor/src/jobservice_v2/models" + +//Interface defines operations the periodic scheduler should have. +type Interface interface { + //Schedule the specified cron job policy. + // + //jobName string : The name of periodical job + //params models.Parameters : The parameters required by the periodical job + //cronSpec string : The periodical settings with cron format + // + //Returns: + // The uuid of the cron job policy + // error if failed to schedule + Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) + + //Unschedule the specified cron job policy. + // + //cronJobPolicyID string: The ID of cron job policy. + // + //Return: + // error if failed to unschedule + UnSchedule(cronJobPolicyID string) error + + //Load data + // + //Return: + // error if failed to do + Load() error + + //Clear all the cron job policies. + // + //Return: + // error if failed to do + Clear() error + + //Start to serve + Start() error +} diff --git a/src/jobservice_v2/period/job_policy.go b/src/jobservice_v2/period/job_policy.go new file mode 100644 index 000000000..6cc63fd6a --- /dev/null +++ b/src/jobservice_v2/period/job_policy.go @@ -0,0 +1,122 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package period + +import ( + "encoding/json" + "sync" + + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +const ( + //periodicJobPolicyChangeEventSchedule : Schedule periodic job policy event + periodicJobPolicyChangeEventSchedule = "Schedule" + //periodicJobPolicyChangeEventUnSchedule : UnSchedule periodic job policy event + periodicJobPolicyChangeEventUnSchedule = "UnSchedule" +) + +//periodicJobPolicy ... +type periodicJobPolicy struct { + //NOTES: The 'PolicyID' should not be set when serialize this policy struct to the zset + //because each 'Policy ID' is different and it may cause issue of losing zset unique capability. + PolicyID string `json:"policy_id,omitempty"` + JobName string `json:"job_name"` + JobParameters map[string]interface{} `json:"job_params"` + CronSpec string `json:"cron_spec"` +} + +//periodicJobPolicyEvent is the event content of periodic job policy change. +type periodicJobPolicyEvent struct { + Event string `json:"event"` + PeriodicJobPolicy *periodicJobPolicy `json:"periodic_job_policy"` +} + +//serialize the policy to raw data. +func (pjp *periodicJobPolicy) serialize() ([]byte, error) { + return json.Marshal(pjp) +} + +//deSerialize the raw json to policy. +func (pjp *periodicJobPolicy) deSerialize(rawJSON []byte) error { + return json.Unmarshal(rawJSON, pjp) +} + +//serialize the policy to raw data. +func (pjpe *periodicJobPolicyEvent) serialize() ([]byte, error) { + return json.Marshal(pjpe) +} + +//deSerialize the raw json to policy. +func (pjpe *periodicJobPolicyEvent) deSerialize(rawJSON []byte) error { + return json.Unmarshal(rawJSON, pjpe) +} + +//periodicJobPolicyStore is in-memory cache for the periodic job policies. +type periodicJobPolicyStore struct { + lock *sync.RWMutex + policies map[string]*periodicJobPolicy //k-v pair and key is the policy ID +} + +func (ps *periodicJobPolicyStore) addAll(items []*periodicJobPolicy) { + if items == nil || len(items) == 0 { + return + } + + ps.lock.Lock() + defer ps.lock.Unlock() + + for _, item := range items { + //Ignore the item with empty uuid + if !utils.IsEmptyStr(item.PolicyID) { + ps.policies[item.PolicyID] = item + } + } +} + +func (ps *periodicJobPolicyStore) list() []*periodicJobPolicy { + allItems := make([]*periodicJobPolicy, 0) + + ps.lock.RLock() + defer ps.lock.RUnlock() + + for _, v := range ps.policies { + allItems = append(allItems, v) + } + + return allItems +} + +func (ps *periodicJobPolicyStore) add(jobPolicy *periodicJobPolicy) { + if jobPolicy == nil || utils.IsEmptyStr(jobPolicy.PolicyID) { + return + } + + ps.lock.Lock() + defer ps.lock.Unlock() + + ps.policies[jobPolicy.PolicyID] = jobPolicy +} + +func (ps *periodicJobPolicyStore) remove(policyID string) *periodicJobPolicy { + if utils.IsEmptyStr(policyID) { + return nil + } + + ps.lock.Lock() + defer ps.lock.Unlock() + + if item, ok := ps.policies[policyID]; ok { + delete(ps.policies, policyID) + return item + } + + return nil +} + +func (ps *periodicJobPolicyStore) size() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.policies) +} diff --git a/src/jobservice_v2/period/redis_scheduler.go b/src/jobservice_v2/period/redis_scheduler.go new file mode 100644 index 000000000..04c25e124 --- /dev/null +++ b/src/jobservice_v2/period/redis_scheduler.go @@ -0,0 +1,288 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package period + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/garyburd/redigo/redis" + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +//RedisPeriodicScheduler manages the periodic scheduling policies. +type RedisPeriodicScheduler struct { + context context.Context + redisPool *redis.Pool + namespace string + pstore *periodicJobPolicyStore + enqueuer *periodicEnqueuer +} + +//NewRedisPeriodicScheduler is constructor of RedisPeriodicScheduler +func NewRedisPeriodicScheduler(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisPeriodicScheduler { + pstore := &periodicJobPolicyStore{ + lock: new(sync.RWMutex), + policies: make(map[string]*periodicJobPolicy), + } + enqueuer := newPeriodicEnqueuer(namespace, redisPool, pstore) + + return &RedisPeriodicScheduler{ + context: ctx, + redisPool: redisPool, + namespace: namespace, + pstore: pstore, + enqueuer: enqueuer, + } +} + +//Start to serve +//Enable PUB/SUB +func (rps *RedisPeriodicScheduler) Start() error { + defer func() { + log.Info("Redis scheduler is stopped") + }() + + //Load existing periodic job policies + if err := rps.Load(); err != nil { + return err + } + + //As we get one connection from the pool, don't try to close it. + conn := rps.redisPool.Get() + psc := redis.PubSubConn{ + Conn: conn, + } + + err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(rps.namespace))...) + if err != nil { + return err + } + + done := make(chan error, 1) + go func() { + for { + switch res := psc.Receive().(type) { + case error: + done <- res + return + case redis.Message: + if notification := readMessage(res.Data); notification != nil { + log.Infof("Got periodic job policy change notification: %s:%s\n", notification.Event, notification.PeriodicJobPolicy.PolicyID) + + switch notification.Event { + case periodicJobPolicyChangeEventSchedule: + rps.pstore.add(notification.PeriodicJobPolicy) + case periodicJobPolicyChangeEventUnSchedule: + if notification.PeriodicJobPolicy != nil { + rps.pstore.remove(notification.PeriodicJobPolicy.PolicyID) + } + default: + //do nothing + } + } + case redis.Subscription: + switch res.Kind { + case "subscribe": + log.Infof("Subscribe redis channel %s\n", res.Channel) + case "unsubscribe": + //Unsubscribe all, means main goroutine is exiting + log.Infof("Unsubscribe redis channel %s\n", res.Channel) + done <- nil + return + } + } + } + }() + + //start enqueuer + rps.enqueuer.start() + defer rps.enqueuer.stop() + log.Info("Redis scheduler is started") + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + //blocking here + for err == nil { + select { + case <-ticker.C: + err = psc.Ping("ping!") + case <-rps.context.Done(): + err = errors.New("context exit") + case err = <-done: + return err + } + } + + //Unsubscribe all + psc.Unsubscribe() + return <-done +} + +//Schedule is implementation of the same method in period.Interface +func (rps *RedisPeriodicScheduler) Schedule(jobName string, params models.Parameters, cronSpec string) (string, error) { + if utils.IsEmptyStr(jobName) { + return "", errors.New("empty job name is not allowed") + } + if utils.IsEmptyStr(cronSpec) { + return "", errors.New("cron spec is not set") + } + + //Although the ZSET can guarantee no duplicated items, we still need to check the existing + //of the job policy to avoid publish duplicated ones to other nodes as we + //use transaction commands. + jobPolicy := &periodicJobPolicy{ + JobName: jobName, + JobParameters: params, + CronSpec: cronSpec, + } + //Serialize data + rawJSON, err := jobPolicy.serialize() + if err != nil { + return "", nil + } + + //Check existing + //If existing, treat as a succeed submitting and return the exitsing id + if score, ok := rps.exists(string(rawJSON)); ok { + return utils.MakePeriodicPolicyUUIDWithScore(score), nil + } + + uuid, score := utils.MakePeriodicPolicyUUID() + //Set back policy ID + jobPolicy.PolicyID = uuid + notification := &periodicJobPolicyEvent{ + Event: periodicJobPolicyChangeEventSchedule, + PeriodicJobPolicy: jobPolicy, + } + rawJSON2, err := notification.serialize() + if err != nil { + return "", err + } + + //Save to redis db and publish notification via redis transaction + conn := rps.redisPool.Get() + conn.Send("MULTI") + conn.Send("ZADD", utils.KeyPeriodicPolicy(rps.namespace), score, rawJSON) + conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON2) + if _, err := conn.Do("EXEC"); err != nil { + return "", err + } + + return uuid, nil +} + +//UnSchedule is implementation of the same method in period.Interface +func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { + if utils.IsEmptyStr(cronJobPolicyID) { + return errors.New("cron job policy ID is empty") + } + + score := utils.ExtractScoreFromUUID(cronJobPolicyID) + if score == 0 { + return fmt.Errorf("The ID '%s' is not valid", cronJobPolicyID) + } + + notification := &periodicJobPolicyEvent{ + Event: periodicJobPolicyChangeEventUnSchedule, + PeriodicJobPolicy: &periodicJobPolicy{ + PolicyID: cronJobPolicyID, //Only ID required + }, + } + + rawJSON, err := notification.serialize() + if err != nil { + return err + } + + //REM from redis db + conn := rps.redisPool.Get() + conn.Send("MULTI") + conn.Send("ZREMRANGEBYSCORE", utils.KeyPeriodicPolicy(rps.namespace), score, score) //Accurately remove the item with the specified score + conn.Send("PUBLISH", utils.KeyPeriodicNotification(rps.namespace), rawJSON) + _, err = conn.Do("EXEC") + + return err +} + +//Load data from zset +func (rps *RedisPeriodicScheduler) Load() error { + conn := rps.redisPool.Get() + bytes, err := redis.MultiBulk(conn.Do("ZRANGE", utils.KeyPeriodicPolicy(rps.namespace), 0, -1, "WITHSCORES")) + if err != nil { + return err + } + + allPeriodicPolicies := make([]*periodicJobPolicy, 0) + for i, l := 0, len(bytes); i < l; i = i + 2 { + rawPolicy := bytes[i].([]byte) + rawScore := bytes[i+1].([]byte) + policy := &periodicJobPolicy{} + + if err := policy.deSerialize(rawPolicy); err != nil { + //Ignore error which means the policy data is not valid + //Only logged + log.Warningf("failed to deserialize periodic policy with error:%s; raw data: %s\n", err, rawPolicy) + continue + } + score, err := strconv.ParseInt(string(rawScore), 10, 64) + if err != nil { + //Ignore error which means the policy data is not valid + //Only logged + log.Warningf("failed to parse the score of the periodic policy with error:%s\n", err) + continue + } + + //Set back the policy ID + policy.PolicyID = utils.MakePeriodicPolicyUUIDWithScore(score) + + allPeriodicPolicies = append(allPeriodicPolicies, policy) + } + + if len(allPeriodicPolicies) > 0 { + rps.pstore.addAll(allPeriodicPolicies) + } + + log.Infof("Load %d periodic job policies", len(allPeriodicPolicies)) + return nil +} + +//Clear is implementation of the same method in period.Interface +func (rps *RedisPeriodicScheduler) Clear() error { + conn := rps.redisPool.Get() + _, err := conn.Do("ZREMRANGEBYRANK", utils.KeyPeriodicPolicy(rps.namespace), 0, -1) + + return err +} + +func (rps *RedisPeriodicScheduler) exists(rawPolicy string) (int64, bool) { + if utils.IsEmptyStr(rawPolicy) { + return 0, false + } + + conn := rps.redisPool.Get() + count, err := redis.Int64(conn.Do("ZSCORE", utils.KeyPeriodicPolicy(rps.namespace), rawPolicy)) + return count, err == nil +} + +func readMessage(data []byte) *periodicJobPolicyEvent { + if data == nil || len(data) == 0 { + return nil + } + + notification := &periodicJobPolicyEvent{} + err := json.Unmarshal(data, notification) + if err != nil { + return nil + } + + return notification +} diff --git a/src/jobservice_v2/pool/interface.go b/src/jobservice_v2/pool/interface.go index 88109f226..dd3c4f3e4 100644 --- a/src/jobservice_v2/pool/interface.go +++ b/src/jobservice_v2/pool/interface.go @@ -2,8 +2,74 @@ package pool -//Interface for worker pool +import "github.com/vmware/harbor/src/jobservice_v2/models" + +//Interface for worker pool. +//More like a driver to transparent the lower queue. type Interface interface { - //Start to server - Start() error + //Start to serve + Start() + + //Register job to the pool. + // + //name string : job name for referring + //job interface{}: job handler which must implement the job.Interface. + // + //Return: + // error if failed to register + RegisterJob(name string, job interface{}) error + + //Register multiple jobs. + // + //jobs map[string]interface{}: job map, key is job name and value is job handler. + RegisterJobs(jobs map[string]interface{}) error + + //Enqueue job + // + //jobName string : the name of enqueuing job + //params models.Parameters : parameters of enqueuing job + //isUnique bool : specify if duplicated job will be discarded + // + //Returns: + // models.JobStats: the stats of enqueuing job if succeed + // error : if failed to enqueue + Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error) + + //Schedule job to run after the specified interval (seconds). + // + //jobName string : the name of enqueuing job + //runAfterSeconds uint64 : the waiting interval with seconds + //params models.Parameters : parameters of enqueuing job + //isUnique bool : specify if duplicated job will be discarded + // + //Returns: + // models.JobStats: the stats of enqueuing job if succeed + // error : if failed to enqueue + Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error) + + //Schedule the job periodically running. + // + //jobName string : the name of enqueuing job + //params models.Parameters : parameters of enqueuing job + //cronSetting string : the periodic duration with cron style like '0 * * * * *' + // + //Returns: + // models.JobStats: the stats of enqueuing job if succeed + // error : if failed to enqueue + PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) + + //Return the status info of the pool. + // + //models.JobPoolStats : the stats info of the pool + //error : failed to check + Stats() (models.JobPoolStats, error) + + //Check if the job has been already registered. + // + //name string : name of job + // + //Returns: + // bool : true if it is otherwise return false + // bool : if the known job requires parameters + IsKnownJob(name string) (bool, bool) } diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index d80d2671e..a269b8d50 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -5,18 +5,38 @@ package pool import ( "errors" "fmt" + "time" "github.com/garyburd/redigo/redis" "github.com/gocraft/work" "github.com/vmware/harbor/src/common/utils/log" - "github.com/vmware/harbor/src/jobservice_v2/core" + "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/period" + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +var ( + dialConnectionTimeout = 30 * time.Second + healthCheckPeriod = time.Minute + dialReadTimeout = healthCheckPeriod + 10*time.Second + dialWriteTimeout = 10 * time.Second ) //GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis. type GoCraftWorkPool struct { redisPool *redis.Pool pool *work.WorkerPool - context core.BaseContext + enqueuer *work.Enqueuer + client *work.Client + context *env.Context + scheduler period.Interface + + //no need to sync as write once and then only read + //key is name of known job + //value is the flag indicating if the job requires parameters + knownJobs map[string]bool } //RedisPoolConfig defines configurations for GoCraftWorkPool. @@ -27,47 +47,232 @@ type RedisPoolConfig struct { WorkerCount uint } +//RedisPoolContext ... +//We did not use this context to pass context info so far, just a placeholder. +type RedisPoolContext struct{} + //NewGoCraftWorkPool is constructor of goCraftWorkPool. -func NewGoCraftWorkPool(ctx core.BaseContext, cfg RedisPoolConfig) *GoCraftWorkPool { +func NewGoCraftWorkPool(ctx *env.Context, cfg RedisPoolConfig) *GoCraftWorkPool { redisPool := &redis.Pool{ - MaxActive: 5, - MaxIdle: 5, + MaxActive: 6, + MaxIdle: 6, Wait: true, Dial: func() (redis.Conn, error) { - return redis.Dial("tcp", fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort)) + return redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort), + redis.DialConnectTimeout(dialConnectionTimeout), + redis.DialReadTimeout(dialReadTimeout), + redis.DialWriteTimeout(dialWriteTimeout), + ) }, } - pool := work.NewWorkerPool(ctx, cfg.WorkerCount, cfg.Namespace, redisPool) + pool := work.NewWorkerPool(RedisPoolContext{}, cfg.WorkerCount, cfg.Namespace, redisPool) + enqueuer := work.NewEnqueuer(cfg.Namespace, redisPool) + client := work.NewClient(cfg.Namespace, redisPool) + scheduler := period.NewRedisPeriodicScheduler(ctx.SystemContext, cfg.Namespace, redisPool) return &GoCraftWorkPool{ redisPool: redisPool, pool: pool, + enqueuer: enqueuer, + scheduler: scheduler, + client: client, context: ctx, + knownJobs: make(map[string]bool), } } //Start to serve //Unblock action -func (gcwp *GoCraftWorkPool) Start() error { +func (gcwp *GoCraftWorkPool) Start() { if gcwp.redisPool == nil || gcwp.pool == nil || gcwp.context.SystemContext == nil { - return errors.New("Redis worker pool can not start as it's not correctly configured") + //report and exit + gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured") + return } + done := make(chan interface{}, 1) + + gcwp.context.WG.Add(1) go func() { defer func() { - if gcwp.context.WG != nil { - gcwp.context.WG.Done() - } + gcwp.context.WG.Done() }() + //blocking call + if err := gcwp.scheduler.Start(); err != nil { + //Scheduler exits with error + gcwp.context.ErrorChan <- err + done <- struct{}{} + return + } + }() + + gcwp.context.WG.Add(1) + go func() { + defer func() { + gcwp.context.WG.Done() + }() + + //Append middlewares + gcwp.pool.Middleware((*RedisPoolContext).logJob) + gcwp.pool.Start() log.Infof("Redis worker pool is started") - //Block on listening context signal - <-gcwp.context.SystemContext.Done() + //Block on listening context and done signal + select { + case <-gcwp.context.SystemContext.Done(): + case <-done: + } + gcwp.pool.Stop() log.Infof("Redis worker pool is stopped") }() +} + +//RegisterJob is used to register the job to the pool. +//j is the type of job +func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error { + if utils.IsEmptyStr(name) || j == nil { + return errors.New("job can not be registered with empty name or nil interface") + } + + //j must be job.Interface + if _, ok := j.(job.Interface); !ok { + return errors.New("job must implement the job.Interface") + } + + //Use redis job wrapper pointer to keep the data required by the job.Interface. + redisJob := job.NewRedisJob(j, gcwp.context) + + //Get more info from j + theJ := redisJob.Wrap() + + gcwp.pool.JobWithOptions(name, + work.JobOptions{MaxFails: theJ.MaxFails()}, + func(job *work.Job) error { + return redisJob.Run(job) + }, //Use generic handler to handle as we do not accept context with this way. + ) + gcwp.knownJobs[name] = theJ.ParamsRequired() //keep the name of registered jobs as known jobs for future validation return nil } + +//RegisterJobs is used to register multiple jobs to pool. +func (gcwp *GoCraftWorkPool) RegisterJobs(jobs map[string]interface{}) error { + if jobs == nil || len(jobs) == 0 { + return nil + } + + for name, j := range jobs { + if err := gcwp.RegisterJob(name, j); err != nil { + return err + } + } + + return nil +} + +//Enqueue job +func (gcwp *GoCraftWorkPool) Enqueue(jobName string, params models.Parameters, isUnique bool) (models.JobStats, error) { + var ( + j *work.Job + err error + ) + + //Enqueue job + if isUnique { + j, err = gcwp.enqueuer.EnqueueUnique(jobName, params) + } else { + j, err = gcwp.enqueuer.Enqueue(jobName, params) + } + + if err != nil { + return models.JobStats{}, err + } + + return generateResult(j), nil +} + +//Schedule job +func (gcwp *GoCraftWorkPool) Schedule(jobName string, params models.Parameters, runAfterSeconds uint64, isUnique bool) (models.JobStats, error) { + var ( + j *work.ScheduledJob + err error + ) + + //Enqueue job in + if isUnique { + j, err = gcwp.enqueuer.EnqueueUniqueIn(jobName, int64(runAfterSeconds), params) + } else { + j, err = gcwp.enqueuer.EnqueueIn(jobName, int64(runAfterSeconds), params) + } + + if err != nil { + return models.JobStats{}, err + } + + res := generateResult(j.Job) + res.Stats.RunAt = time.Unix(j.RunAt, 0) + + return res, nil +} + +//PeriodicallyEnqueue job +func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.Parameters, cronSetting string) (models.JobStats, error) { + id, err := gcwp.scheduler.Schedule(jobName, params, cronSetting) + if err != nil { + return models.JobStats{}, err + } + + //TODO: Need more data + return models.JobStats{ + Stats: &models.JobStatData{ + JobID: id, + JobName: jobName, + Status: job.JobStatusPending, + EnqueueTime: time.Unix(time.Now().Unix(), 0), + UpdateTime: time.Unix(time.Now().Unix(), 0), + RefLink: fmt.Sprintf("/api/v1/jobs/%s", id), + }, + }, nil +} + +//Stats of pool +func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) { + return models.JobPoolStats{}, nil +} + +//IsKnownJob ... +func (gcwp *GoCraftWorkPool) IsKnownJob(name string) (bool, bool) { + v, ok := gcwp.knownJobs[name] + return ok, v +} + +//log the job +func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error { + //TODO: Also update the job status to 'pending' + log.Infof("Job incoming: %s:%s", job.ID, job.Name) + return next() +} + +//generate the job stats data +func generateResult(j *work.Job) models.JobStats { + if j == nil { + return models.JobStats{} + } + + return models.JobStats{ + Stats: &models.JobStatData{ + JobID: j.ID, + JobName: j.Name, + Status: job.JobStatusPending, + EnqueueTime: time.Unix(j.EnqueuedAt, 0), + UpdateTime: time.Unix(time.Now().Unix(), 0), + RefLink: fmt.Sprintf("/api/v1/jobs/%s", j.ID), + }, + } +} diff --git a/src/jobservice_v2/runtime/bootstrap.go b/src/jobservice_v2/runtime/bootstrap.go index c8fc2ff7a..2457086a0 100644 --- a/src/jobservice_v2/runtime/bootstrap.go +++ b/src/jobservice_v2/runtime/bootstrap.go @@ -13,6 +13,8 @@ import ( "github.com/vmware/harbor/src/jobservice_v2/api" "github.com/vmware/harbor/src/jobservice_v2/config" "github.com/vmware/harbor/src/jobservice_v2/core" + "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/job" "github.com/vmware/harbor/src/jobservice_v2/pool" ) @@ -20,10 +22,7 @@ import ( var JobService = &Bootstrap{} //Bootstrap is coordinating process to help load and start the other components to serve. -type Bootstrap struct { - apiServer *api.Server - workerPool pool.Interface -} +type Bootstrap struct{} //LoadAndRun will load configurations, initialize components and then start the related process to serve requests. //Return error if meet any problems. @@ -38,47 +37,49 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) { //Create the root context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rootContext := core.BaseContext{ + + rootContext := &env.Context{ SystemContext: ctx, WG: &sync.WaitGroup{}, + ErrorChan: make(chan error, 1), //with 1 buffer } //Start the pool + var backendPool pool.Interface if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis { - if err := bs.loadAndRunRedisWorkerPool(rootContext, cfg); err != nil { - log.Errorf("Failed to start the redis worker pool with error: %s\n", err) - return - } - rootContext.WG.Add(1) + backendPool = bs.loadAndRunRedisWorkerPool(rootContext, cfg) } //Initialize controller - ctl := core.NewController() + ctl := core.NewController(backendPool) //Start the API server - bs.loadAndRunAPIServer(rootContext, cfg, ctl) - rootContext.WG.Add(1) + apiServer := bs.loadAndRunAPIServer(rootContext, cfg, ctl) log.Infof("Server is starting at %s:%d with %s", "", cfg.Port, cfg.Protocol) //Block here sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill) - <-sig + select { + case <-sig: + case err := <-rootContext.ErrorChan: + log.Errorf("Server error:%s\n", err) + } //Call cancel to send termination signal to other interested parts. cancel() //Gracefully shutdown - bs.apiServer.Stop() + apiServer.Stop() rootContext.WG.Wait() log.Infof("Server gracefully exit") } //Load and run the API server. -func (bs *Bootstrap) loadAndRunAPIServer(ctx core.BaseContext, cfg config.Configuration, ctl *core.Controller) { +func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configuration, ctl *core.Controller) *api.Server { //Initialized API server - handler := api.NewDefaultHandler(ctx, ctl) + handler := api.NewDefaultHandler(ctl) router := api.NewBaseRouter(handler) serverConfig := api.ServerConfig{ Protocol: cfg.Protocol, @@ -88,15 +89,16 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx core.BaseContext, cfg config.Config serverConfig.Cert = cfg.HTTPSConfig.Cert serverConfig.Key = cfg.HTTPSConfig.Key } - server := api.NewServer(ctx, router, serverConfig) - bs.apiServer = server + server := api.NewServer(ctx, router, serverConfig) //Start processes server.Start() + + return server } //Load and run the worker pool -func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx core.BaseContext, cfg config.Configuration) error { +func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg config.Configuration) pool.Interface { redisPoolCfg := pool.RedisPoolConfig{ RedisHost: cfg.PoolConfig.RedisPoolCfg.Host, RedisPort: cfg.PoolConfig.RedisPoolCfg.Port, @@ -105,6 +107,14 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx core.BaseContext, cfg config. } redisWorkerPool := pool.NewGoCraftWorkPool(ctx, redisPoolCfg) - bs.workerPool = redisWorkerPool - return redisWorkerPool.Start() + //Register jobs here + if err := redisWorkerPool.RegisterJob(job.KnownJobReplication, (*job.ReplicationJob)(nil)); err != nil { + //exit + ctx.ErrorChan <- err + return redisWorkerPool //avoid nil pointer issue + } + + redisWorkerPool.Start() + + return redisWorkerPool } diff --git a/src/jobservice_v2/utils/keys.go b/src/jobservice_v2/utils/keys.go new file mode 100644 index 000000000..bd7bc336c --- /dev/null +++ b/src/jobservice_v2/utils/keys.go @@ -0,0 +1,78 @@ +package utils + +import ( + "encoding/base64" + "fmt" + "math/rand" + "strconv" + "strings" + "time" +) + +func generateScore() int64 { + ticks := time.Now().Unix() + rand := rand.New(rand.NewSource(ticks)) + return ticks + rand.Int63n(1000) //Double confirm to avoid potential duplications +} + +//MakePeriodicPolicyUUID returns an UUID for the periodic policy. +func MakePeriodicPolicyUUID() (string, int64) { + score := generateScore() + return MakePeriodicPolicyUUIDWithScore(score), score +} + +//MakePeriodicPolicyUUIDWithScore returns the UUID based on the specified score for the periodic policy. +func MakePeriodicPolicyUUIDWithScore(score int64) string { + rawUUID := fmt.Sprintf("%s:%s:%d", "periodic", "policy", score) + return base64.StdEncoding.EncodeToString([]byte(rawUUID)) +} + +//ExtractScoreFromUUID extracts the score from the UUID. +func ExtractScoreFromUUID(UUID string) int64 { + if IsEmptyStr(UUID) { + return 0 + } + + rawData, err := base64.StdEncoding.DecodeString(UUID) + if err != nil { + return 0 + } + + data := string(rawData) + fragments := strings.Split(data, ":") + if len(fragments) != 3 { + return 0 + } + + score, err := strconv.ParseInt(fragments[2], 10, 64) + if err != nil { + return 0 + } + + return score +} + +//KeyNamespacePrefix returns the based key based on the namespace. +func KeyNamespacePrefix(namespace string) string { + ns := strings.TrimSpace(namespace) + if !strings.HasSuffix(ns, ":") { + return fmt.Sprintf("%s:", ns) + } + + return ns +} + +//KeyPeriod returns the key of period +func KeyPeriod(namespace string) string { + return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "period") +} + +//KeyPeriodicPolicy return the key of periodic policies. +func KeyPeriodicPolicy(namespace string) string { + return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies") +} + +//KeyPeriodicNotification returns the key of periodic pub/sub channel. +func KeyPeriodicNotification(namespace string) string { + return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications") +}