diff --git a/src/jobservice_v2/api/handler.go b/src/jobservice_v2/api/handler.go index 4c7c2d8ed..280d7207a 100644 --- a/src/jobservice_v2/api/handler.go +++ b/src/jobservice_v2/api/handler.go @@ -28,6 +28,9 @@ type Handler interface { //HandleCheckStatusReq is used to handle the job service healthy status checking request. HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) + + //HandleJobLogReq is used to handle the request of getting job logs + HandleJobLogReq(w http.ResponseWriter, req *http.Request) } //DefaultHandler is the default request handler which implements the Handler interface. @@ -88,7 +91,13 @@ func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Reque jobStats, err := dh.controller.GetJob(jobID) if err != nil { - dh.handleError(w, http.StatusInternalServerError, errs.GetJobStatsError(err)) + code := http.StatusInternalServerError + backErr := errs.GetJobStatsError(err) + if errs.IsObjectNotFoundError(err) { + code = http.StatusNotFound + backErr = err + } + dh.handleError(w, code, backErr) return } @@ -126,17 +135,35 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re switch jobActionReq.Action { case opm.CtlCommandStop: if err := dh.controller.StopJob(jobID); err != nil { - dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err)) + code := http.StatusInternalServerError + backErr := errs.StopJobError(err) + if errs.IsObjectNotFoundError(err) { + code = http.StatusNotFound + backErr = err + } + dh.handleError(w, code, backErr) return } case opm.CtlCommandCancel: if err := dh.controller.CancelJob(jobID); err != nil { - dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err)) + code := http.StatusInternalServerError + backErr := errs.CancelJobError(err) + if errs.IsObjectNotFoundError(err) { + code = http.StatusNotFound + backErr = err + } + dh.handleError(w, code, backErr) return } case opm.CtlCommandRetry: if err := dh.controller.RetryJob(jobID); err != nil { - dh.handleError(w, http.StatusInternalServerError, errs.RetryJobError(err)) + code := http.StatusInternalServerError + backErr := errs.RetryJobError(err) + if errs.IsObjectNotFoundError(err) { + code = http.StatusNotFound + backErr = err + } + dh.handleError(w, code, backErr) return } default: @@ -168,6 +195,31 @@ func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http. w.Write(data) } +//HandleJobLogReq is implementation of method defined in interface 'Handler' +func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Request) { + if !dh.preCheck(w) { + return + } + + vars := mux.Vars(req) + jobID := vars["job_id"] + + logData, err := dh.controller.GetJobLogData(jobID) + if err != nil { + code := http.StatusInternalServerError + backErr := errs.GetJobLogError(err) + if errs.IsObjectNotFoundError(err) { + code = http.StatusNotFound + backErr = err + } + dh.handleError(w, code, backErr) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(logData) +} + func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, object interface{}) ([]byte, bool) { data, err := json.Marshal(object) if err != nil { diff --git a/src/jobservice_v2/api/router.go b/src/jobservice_v2/api/router.go index 3ed964c72..e238ee871 100644 --- a/src/jobservice_v2/api/router.go +++ b/src/jobservice_v2/api/router.go @@ -54,5 +54,6 @@ func (br *BaseRouter) registerRoutes() { subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost) subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet) subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost) + subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet) subRouter.HandleFunc("/stats", br.handler.HandleCheckStatusReq).Methods(http.MethodGet) } diff --git a/src/jobservice_v2/config.yml b/src/jobservice_v2/config.yml index a11d95d3f..bf992d490 100644 --- a/src/jobservice_v2/config.yml +++ b/src/jobservice_v2/config.yml @@ -20,5 +20,9 @@ worker_pool: host: "10.160.178.186" port: 6379 namespace: "harbor_job_service" -#Logger for job + +#Logger for job +logger: + path: "/Users/szou/tmp/job_logs" + level: "INFO" diff --git a/src/jobservice_v2/config/config.go b/src/jobservice_v2/config/config.go index 1bbd8836d..c070dbd0a 100644 --- a/src/jobservice_v2/config/config.go +++ b/src/jobservice_v2/config/config.go @@ -8,6 +8,7 @@ import ( "fmt" "io/ioutil" "strconv" + "strings" "github.com/vmware/harbor/src/jobservice_v2/utils" yaml "gopkg.in/yaml.v2" @@ -23,6 +24,8 @@ const ( jobServiceRedisHost = "JOB_SERVICE_POOL_REDIS_HOST" jobServiceRedisPort = "JOB_SERVICE_POOL_REDIS_PORT" jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE" + jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH" + jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL" //JobServiceProtocolHTTPS points to the 'https' protocol JobServiceProtocolHTTPS = "https" @@ -33,6 +36,9 @@ const ( JobServicePoolBackendRedis = "redis" ) +//DefaultConfig is the default configuration reference +var DefaultConfig = &Configuration{} + //Configuration loads and keeps the related configuration items of job service. type Configuration struct { //Protocol server listening on: https/http @@ -46,22 +52,25 @@ type Configuration struct { //Configurations of worker pool PoolConfig *PoolConfig `yaml:"worker_pool,omitempty"` + + //Logger configurations + LoggerConfig *LoggerConfig `yaml:"logger,omitempty"` } -//HTTPSConfig keep additional configurations when using https protocol +//HTTPSConfig keeps additional configurations when using https protocol type HTTPSConfig struct { Cert string `yaml:"cert"` Key string `yaml:"key"` } -//RedisPoolConfig keep redis pool info. +//RedisPoolConfig keeps redis pool info. type RedisPoolConfig struct { Host string `yaml:"host"` Port uint `yaml:"port"` Namespace string `yaml:"namespace"` } -//PoolConfig keep worker pool configurations. +//PoolConfig keeps worker pool configurations. type PoolConfig struct { //0 means unlimited WorkerCount uint `yaml:"workers"` @@ -69,6 +78,12 @@ type PoolConfig struct { RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"` } +//LoggerConfig keeps logger configurations. +type LoggerConfig struct { + BasePath string `yaml:"path"` + LogLevel string `yaml:"level"` +} + //Load the configuration options from the specified yaml file. //If the yaml file is specified and existing, load configurations from yaml file first; //If detecting env variables is specified, load configurations from env variables; @@ -83,7 +98,6 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error { if err != nil { return err } - if err = yaml.Unmarshal(data, c); err != nil { return err } @@ -98,6 +112,24 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error { return c.validate() } +//GetLogBasePath returns the log base path config +func GetLogBasePath() string { + if DefaultConfig.LoggerConfig != nil { + return DefaultConfig.LoggerConfig.BasePath + } + + return "" +} + +//GetLogLevel returns the log level +func GetLogLevel() string { + if DefaultConfig.LoggerConfig != nil { + return DefaultConfig.LoggerConfig.LogLevel + } + + return "" +} + //Load env variables func (c *Configuration) loadEnvs() { prot := utils.ReadEnv(jobServiceProtocol) @@ -183,6 +215,16 @@ func (c *Configuration) loadEnvs() { } } + //logger + loggerPath := utils.ReadEnv(jobServiceLoggerBasePath) + if !utils.IsEmptyStr(loggerPath) { + c.LoggerConfig.BasePath = loggerPath + } + loggerLevel := utils.ReadEnv(jobServiceLoggerLevel) + if !utils.IsEmptyStr(loggerLevel) { + c.LoggerConfig.LogLevel = loggerLevel + } + } //Check if the configurations are valid settings. @@ -236,5 +278,18 @@ func (c *Configuration) validate() error { } } + if c.LoggerConfig == nil { + return errors.New("missing logger config") + } + + if !utils.DirExists(c.LoggerConfig.BasePath) { + return errors.New("logger path should be an existing dir") + } + + validLevels := "DEBUG,INFO,WARNING,ERROR,FATAL" + if !strings.Contains(validLevels, c.LoggerConfig.LogLevel) { + return fmt.Errorf("logger level can only be one of: %s", validLevels) + } + return nil //valid } diff --git a/src/jobservice_v2/core/controller.go b/src/jobservice_v2/core/controller.go index ca3d94e65..7a15795dd 100644 --- a/src/jobservice_v2/core/controller.go +++ b/src/jobservice_v2/core/controller.go @@ -3,8 +3,11 @@ package core import ( "errors" "fmt" + "io/ioutil" "github.com/robfig/cron" + "github.com/vmware/harbor/src/jobservice_v2/config" + "github.com/vmware/harbor/src/jobservice_v2/errs" "github.com/vmware/harbor/src/jobservice_v2/job" "github.com/vmware/harbor/src/jobservice_v2/models" "github.com/vmware/harbor/src/jobservice_v2/pool" @@ -118,6 +121,25 @@ func (c *Controller) RetryJob(jobID string) error { return c.backendPool.RetryJob(jobID) } +//GetJobLogData is used to return the log text data for the specified job if exists +func (c *Controller) GetJobLogData(jobID string) ([]byte, error) { + if utils.IsEmptyStr(jobID) { + return nil, errors.New("empty job ID") + } + + logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), jobID) + if !utils.FileExists(logPath) { + return nil, errs.NoObjectFoundError(fmt.Sprintf("%s.log", jobID)) + } + + logData, err := ioutil.ReadFile(logPath) + if err != nil { + return nil, err + } + + return logData, nil +} + //CheckStatus is implementation of same method in core interface. func (c *Controller) CheckStatus() (models.JobPoolStats, error) { return c.backendPool.Stats() diff --git a/src/jobservice_v2/core/interface.go b/src/jobservice_v2/core/interface.go index f123365c2..96865a1df 100644 --- a/src/jobservice_v2/core/interface.go +++ b/src/jobservice_v2/core/interface.go @@ -53,4 +53,7 @@ type Interface interface { //CheckStatus is used to handle the job service healthy status checking request. CheckStatus() (models.JobPoolStats, error) + + //GetJobLogData is used to return the log text data for the specified job if exists + GetJobLogData(jobID string) ([]byte, error) } diff --git a/src/jobservice_v2/env/job_context.go b/src/jobservice_v2/env/job_context.go index cc498c66a..93d4e8e36 100644 --- a/src/jobservice_v2/env/job_context.go +++ b/src/jobservice_v2/env/job_context.go @@ -2,7 +2,11 @@ package env -import "context" +import ( + "context" + + "github.com/vmware/harbor/src/jobservice_v2/logger" +) //JobContext is combination of BaseContext and other job specified resources. //JobContext will be the real execution context for one job. @@ -45,6 +49,9 @@ type JobContext interface { // op command if have // flag to indicate if have command OPCommand() (string, bool) + + //Return the logger + GetLogger() logger.Interface } //JobData defines job context dependencies. diff --git a/src/jobservice_v2/errs/errors.go b/src/jobservice_v2/errs/errors.go index 5a472d526..b7e9715c5 100644 --- a/src/jobservice_v2/errs/errors.go +++ b/src/jobservice_v2/errs/errors.go @@ -28,10 +28,14 @@ const ( StopJobErrorCode //CancelJobErrorCode is code for the error of cancelling job CancelJobErrorCode - //RetryJobErrorCode is code for the error of retry job + //RetryJobErrorCode is code for the error of retrying job RetryJobErrorCode //UnknownActionNameErrorCode is code for the case of unknown action name UnknownActionNameErrorCode + //GetJobLogErrorCode is code for the error of getting job log + GetJobLogErrorCode + //NoObjectFoundErrorCode is code for the error of no object found + NoObjectFoundErrorCode ) //baseError ... @@ -109,6 +113,11 @@ func UnknownActionNameError(err error) error { return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error()) } +//GetJobLogError is error for the case of getting job log failed +func GetJobLogError(err error) error { + return New(GetJobLogErrorCode, "Failed to get the job log", err.Error()) +} + //jobStoppedError is designed for the case of stopping job. type jobStoppedError struct { baseError @@ -139,6 +148,22 @@ func JobCancelledError() error { } } +//objectNotFound is designed for the case of no object found +type objectNotFoundError struct { + baseError +} + +//NoObjectFoundError is error wrapper for the case of no object found +func NoObjectFoundError(object string) error { + return objectNotFoundError{ + baseError{ + Code: NoObjectFoundErrorCode, + Err: "object is not found", + Description: object, + }, + } +} + //IsJobStoppedError return true if the error is jobStoppedError func IsJobStoppedError(err error) bool { _, ok := err.(jobStoppedError) @@ -150,3 +175,9 @@ func IsJobCancelledError(err error) bool { _, ok := err.(jobCancelledError) return ok } + +//IsObjectNotFoundError return true if the error is objectNotFoundError +func IsObjectNotFoundError(err error) bool { + _, ok := err.(objectNotFoundError) + return ok +} diff --git a/src/jobservice_v2/extfile.cnf b/src/jobservice_v2/extfile.cnf deleted file mode 100644 index cce0cd532..000000000 --- a/src/jobservice_v2/extfile.cnf +++ /dev/null @@ -1 +0,0 @@ -subjectAltName = IP:10.4.142.42 diff --git a/src/jobservice_v2/job/impl/context.go b/src/jobservice_v2/job/impl/context.go index 7cf9cc0db..e46793afa 100644 --- a/src/jobservice_v2/job/impl/context.go +++ b/src/jobservice_v2/job/impl/context.go @@ -5,11 +5,14 @@ package impl import ( "context" "errors" + "fmt" "reflect" - hlog "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/jobservice_v2/config" "github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/job" + jlogger "github.com/vmware/harbor/src/jobservice_v2/job/impl/logger" + "github.com/vmware/harbor/src/jobservice_v2/logger" ) //Context ... @@ -18,7 +21,7 @@ type Context struct { sysContext context.Context //Logger for job - logger *hlog.Logger + logger logger.Interface //op command func opCommandFunc job.CheckOPCmdFunc @@ -46,7 +49,13 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) { sysContext: c.sysContext, } - //TODO:Init logger here + //Init logger here + logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), dep.ID) + jContext.logger = jlogger.New(logPath, config.GetLogLevel()) + if jContext.logger == nil { + return nil, errors.New("failed to initialize job logger") + } + if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok { if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func { if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok { @@ -54,6 +63,9 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) { } } } + if jContext.opCommandFunc == nil { + return nil, errors.New("failed to inject opCommandFunc") + } if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok { if reflect.TypeOf(checkInFunc).Kind() == reflect.Func { @@ -63,6 +75,10 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) { } } + if jContext.checkInFunc == nil { + return nil, errors.New("failed to inject checkInFunc") + } + return jContext, nil } @@ -95,3 +111,8 @@ func (c *Context) OPCommand() (string, bool) { return "", false } + +//GetLogger returns the logger +func (c *Context) GetLogger() logger.Interface { + return c.logger +} diff --git a/src/jobservice_v2/job/impl/logger/job_logger.go b/src/jobservice_v2/job/impl/logger/job_logger.go new file mode 100644 index 000000000..d4b87f7e8 --- /dev/null +++ b/src/jobservice_v2/job/impl/logger/job_logger.go @@ -0,0 +1,101 @@ +package logger + +import ( + "os" + "strings" + + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/jobservice_v2/logger" +) + +//JobLogger is an implementation of logger.Interface. +//It used in the job to output logs to the logfile. +type JobLogger struct { + backendLogger *log.Logger +} + +//New logger +//nil might be returned +func New(logPath string, level string) logger.Interface { + f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil + } + logLevel := parseLevel(level) + backendLogger := log.New(f, log.NewTextFormatter(), logLevel) + + return &JobLogger{ + backendLogger: backendLogger, + } +} + +//Debug ... +func (jl *JobLogger) Debug(v ...interface{}) { + jl.backendLogger.Debug(v...) +} + +//Debugf with format +func (jl *JobLogger) Debugf(format string, v ...interface{}) { + jl.backendLogger.Debugf(format, v...) +} + +//Info ... +func (jl *JobLogger) Info(v ...interface{}) { + jl.backendLogger.Info(v...) +} + +//Infof with format +func (jl *JobLogger) Infof(format string, v ...interface{}) { + jl.backendLogger.Infof(format, v...) +} + +//Warning ... +func (jl *JobLogger) Warning(v ...interface{}) { + jl.backendLogger.Warning(v...) +} + +//Warningf with format +func (jl *JobLogger) Warningf(format string, v ...interface{}) { + jl.backendLogger.Warningf(format, v...) +} + +//Error ... +func (jl *JobLogger) Error(v ...interface{}) { + jl.backendLogger.Error(v...) +} + +//Errorf with format +func (jl *JobLogger) Errorf(format string, v ...interface{}) { + jl.backendLogger.Errorf(format, v...) +} + +//Fatal error +func (jl *JobLogger) Fatal(v ...interface{}) { + jl.backendLogger.Fatal(v...) +} + +//Fatalf error +func (jl *JobLogger) Fatalf(format string, v ...interface{}) { + jl.backendLogger.Fatalf(format, v...) +} + +func parseLevel(lvl string) log.Level { + + var level = log.WarningLevel + + switch strings.ToLower(lvl) { + case "debug": + level = log.DebugLevel + case "info": + level = log.InfoLevel + case "warning": + level = log.WarningLevel + case "error": + level = log.ErrorLevel + case "fatal": + level = log.FatalLevel + default: + } + + return level +} diff --git a/src/jobservice_v2/job/impl/replication_job.go b/src/jobservice_v2/job/impl/replication_job.go index 5feb03cfa..f82d30428 100644 --- a/src/jobservice_v2/job/impl/replication_job.go +++ b/src/jobservice_v2/job/impl/replication_job.go @@ -47,42 +47,47 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error { //Run the replication logic here. func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) error { + logger := ctx.GetLogger() + defer func() { + logger.Info("I'm finished, exit!") fmt.Println("I'm finished, exit!") }() - fmt.Println("=======Replication job running=======") - fmt.Printf("params: %#v\n", params) - fmt.Printf("context: %#v\n", ctx) + logger.Info("=======Replication job running=======") + logger.Infof("params: %#v\n", params) + logger.Infof("context: %#v\n", ctx) /*if 1 != 0 { return errors.New("I suicide") }*/ - fmt.Println("check in 30%") + logger.Info("check in 30%") ctx.Checkin("30%") - time.Sleep(5 * time.Second) - fmt.Println("check in 60%") + time.Sleep(2 * time.Second) + logger.Warning("check in 60%") ctx.Checkin("60%") - time.Sleep(5 * time.Second) - fmt.Println("check in 100%") + time.Sleep(2 * time.Second) + logger.Debug("check in 100%") ctx.Checkin("100%") time.Sleep(1 * time.Second) //HOLD ON FOR A WHILE - fmt.Println("Holding for 20 sec") - <-time.After(20 * time.Second) - fmt.Println("I'm back, check if I'm stopped/cancelled") + logger.Error("Holding for 20 sec") + <-time.After(10 * time.Second) + //logger.Fatal("I'm back, check if I'm stopped/cancelled") if cmd, ok := ctx.OPCommand(); ok { - fmt.Printf("cmd=%s\n", cmd) + logger.Infof("cmd=%s\n", cmd) if cmd == opm.CtlCommandCancel { - fmt.Println("exit for receiving cancel signal") + logger.Info("exit for receiving cancel signal") return errs.JobCancelledError() } - fmt.Println("exit for receiving stop signal") + logger.Info("exit for receiving stop signal") return errs.JobStoppedError() } + fmt.Println("I'm here") + return nil } diff --git a/src/jobservice_v2/logger/interface.go b/src/jobservice_v2/logger/interface.go new file mode 100644 index 000000000..b9b9d9d57 --- /dev/null +++ b/src/jobservice_v2/logger/interface.go @@ -0,0 +1,36 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package logger + +//Interface for logger. +type Interface interface { + //For debuging + Debug(v ...interface{}) + + //For debuging with format + Debugf(format string, v ...interface{}) + + //For logging info + Info(v ...interface{}) + + //For logging info with format + Infof(format string, v ...interface{}) + + //For warning + Warning(v ...interface{}) + + //For warning with format + Warningf(format string, v ...interface{}) + + //For logging error + Error(v ...interface{}) + + //For logging error with format + Errorf(format string, v ...interface{}) + + //For fatal error + Fatal(v ...interface{}) + + //For fatal error with error + Fatalf(format string, v ...interface{}) +} diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go index 22e9f937b..c7c889158 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -12,6 +12,8 @@ import ( "strconv" "time" + "github.com/vmware/harbor/src/jobservice_v2/errs" + "github.com/vmware/harbor/src/jobservice_v2/period" "github.com/robfig/cron" @@ -543,7 +545,7 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err } if vals == nil || len(vals) == 0 { - return models.JobStats{}, fmt.Errorf("job '%s' is not found", jobID) + return models.JobStats{}, errs.NoObjectFoundError(fmt.Sprintf("job '%s'", jobID)) } res := models.JobStats{ diff --git a/src/jobservice_v2/pool/redis_job_wrapper.go b/src/jobservice_v2/pool/redis_job_wrapper.go index c0ecfc655..28895da1e 100644 --- a/src/jobservice_v2/pool/redis_job_wrapper.go +++ b/src/jobservice_v2/pool/redis_job_wrapper.go @@ -30,16 +30,29 @@ func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManag //Run the job func (rj *RedisJob) Run(j *work.Job) error { - var cancelled = false + var ( + cancelled = false + buildContextFailed = false + runningJob job.Interface + err error + execContext env.JobContext + ) - execContext, err := rj.buildContext(j) + execContext, err = rj.buildContext(j) if err != nil { - return err + buildContextFailed = true + goto FAILED //no need to retry } - runningJob := Wrap(rj.job) + //Wrap job + runningJob = Wrap(rj.job) + defer func() { - if rj.shouldDisableRetry(runningJob, j, cancelled) { + if err == nil { + return //nothing need to do + } + + if buildContextFailed || rj.shouldDisableRetry(runningJob, j, cancelled) { j.Fails = 10000000000 //Make it big enough to avoid retrying now := time.Now().Unix() go func() { @@ -75,6 +88,7 @@ func (rj *RedisJob) Run(j *work.Job) error { return err //need to resume } +FAILED: rj.jobFailed(j.ID) return err } diff --git a/src/jobservice_v2/runtime/bootstrap.go b/src/jobservice_v2/runtime/bootstrap.go index be97878f9..7d0680ff0 100644 --- a/src/jobservice_v2/runtime/bootstrap.go +++ b/src/jobservice_v2/runtime/bootstrap.go @@ -31,8 +31,7 @@ type Bootstrap struct{} //Return error if meet any problems. func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) { //Load configurations - cfg := config.Configuration{} - if err := cfg.Load(configFile, detectEnv); err != nil { + if err := config.DefaultConfig.Load(configFile, detectEnv); err != nil { log.Errorf("Failed to load configurations with error: %s\n", err) return } @@ -58,16 +57,16 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) { //Start the pool var backendPool pool.Interface - if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis { - backendPool = bs.loadAndRunRedisWorkerPool(rootContext, cfg) + if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis { + backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig) } //Initialize controller ctl := core.NewController(backendPool) //Start the API server - apiServer := bs.loadAndRunAPIServer(rootContext, cfg, ctl) - log.Infof("Server is started at %s:%d with %s", "", cfg.Port, cfg.Protocol) + apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl) + log.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol) //Block here sig := make(chan os.Signal, 1) @@ -107,7 +106,7 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) { } //Load and run the API server. -func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configuration, ctl *core.Controller) *api.Server { +func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configuration, ctl *core.Controller) *api.Server { //Initialized API server handler := api.NewDefaultHandler(ctl) router := api.NewBaseRouter(handler) @@ -128,7 +127,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configurat } //Load and run the worker pool -func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg config.Configuration) pool.Interface { +func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface { redisPoolCfg := pool.RedisPoolConfig{ RedisHost: cfg.PoolConfig.RedisPoolCfg.Host, RedisPort: cfg.PoolConfig.RedisPoolCfg.Port, diff --git a/src/jobservice_v2/utils/utils.go b/src/jobservice_v2/utils/utils.go index b7f7fe0bc..7fcfaa185 100644 --- a/src/jobservice_v2/utils/utils.go +++ b/src/jobservice_v2/utils/utils.go @@ -24,14 +24,34 @@ func ReadEnv(key string) string { //FileExists check if the specified exists. func FileExists(file string) bool { if !IsEmptyStr(file) { - if _, err := os.Stat(file); err == nil { + _, err := os.Stat(file) + if err == nil { return true } + if os.IsNotExist(err) { + return false + } + + return true } return false } +//DirExists check if the specified dir exists +func DirExists(path string) bool { + if IsEmptyStr(path) { + return false + } + + f, err := os.Stat(path) + if err != nil { + return false + } + + return f.IsDir() +} + //IsValidPort check if port is valid. func IsValidPort(port uint) bool { return port != 0 && port < 65536