mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-23 10:45:45 +01:00
Inject logger into the execution context of running job for logging
define logger interface implement a logger base on common log package add getting log API inject logger to the job execution context
This commit is contained in:
parent
b001180d26
commit
579b70c5e4
@ -28,6 +28,9 @@ type Handler interface {
|
||||
|
||||
//HandleCheckStatusReq is used to handle the job service healthy status checking request.
|
||||
HandleCheckStatusReq(w http.ResponseWriter, req *http.Request)
|
||||
|
||||
//HandleJobLogReq is used to handle the request of getting job logs
|
||||
HandleJobLogReq(w http.ResponseWriter, req *http.Request)
|
||||
}
|
||||
|
||||
//DefaultHandler is the default request handler which implements the Handler interface.
|
||||
@ -88,7 +91,13 @@ func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Reque
|
||||
|
||||
jobStats, err := dh.controller.GetJob(jobID)
|
||||
if err != nil {
|
||||
dh.handleError(w, http.StatusInternalServerError, errs.GetJobStatsError(err))
|
||||
code := http.StatusInternalServerError
|
||||
backErr := errs.GetJobStatsError(err)
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
code = http.StatusNotFound
|
||||
backErr = err
|
||||
}
|
||||
dh.handleError(w, code, backErr)
|
||||
return
|
||||
}
|
||||
|
||||
@ -126,17 +135,35 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
|
||||
switch jobActionReq.Action {
|
||||
case opm.CtlCommandStop:
|
||||
if err := dh.controller.StopJob(jobID); err != nil {
|
||||
dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
|
||||
code := http.StatusInternalServerError
|
||||
backErr := errs.StopJobError(err)
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
code = http.StatusNotFound
|
||||
backErr = err
|
||||
}
|
||||
dh.handleError(w, code, backErr)
|
||||
return
|
||||
}
|
||||
case opm.CtlCommandCancel:
|
||||
if err := dh.controller.CancelJob(jobID); err != nil {
|
||||
dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err))
|
||||
code := http.StatusInternalServerError
|
||||
backErr := errs.CancelJobError(err)
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
code = http.StatusNotFound
|
||||
backErr = err
|
||||
}
|
||||
dh.handleError(w, code, backErr)
|
||||
return
|
||||
}
|
||||
case opm.CtlCommandRetry:
|
||||
if err := dh.controller.RetryJob(jobID); err != nil {
|
||||
dh.handleError(w, http.StatusInternalServerError, errs.RetryJobError(err))
|
||||
code := http.StatusInternalServerError
|
||||
backErr := errs.RetryJobError(err)
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
code = http.StatusNotFound
|
||||
backErr = err
|
||||
}
|
||||
dh.handleError(w, code, backErr)
|
||||
return
|
||||
}
|
||||
default:
|
||||
@ -168,6 +195,31 @@ func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.
|
||||
w.Write(data)
|
||||
}
|
||||
|
||||
//HandleJobLogReq is implementation of method defined in interface 'Handler'
|
||||
func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Request) {
|
||||
if !dh.preCheck(w) {
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(req)
|
||||
jobID := vars["job_id"]
|
||||
|
||||
logData, err := dh.controller.GetJobLogData(jobID)
|
||||
if err != nil {
|
||||
code := http.StatusInternalServerError
|
||||
backErr := errs.GetJobLogError(err)
|
||||
if errs.IsObjectNotFoundError(err) {
|
||||
code = http.StatusNotFound
|
||||
backErr = err
|
||||
}
|
||||
dh.handleError(w, code, backErr)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(logData)
|
||||
}
|
||||
|
||||
func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, object interface{}) ([]byte, bool) {
|
||||
data, err := json.Marshal(object)
|
||||
if err != nil {
|
||||
|
@ -54,5 +54,6 @@ func (br *BaseRouter) registerRoutes() {
|
||||
subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost)
|
||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet)
|
||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost)
|
||||
subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet)
|
||||
subRouter.HandleFunc("/stats", br.handler.HandleCheckStatusReq).Methods(http.MethodGet)
|
||||
}
|
||||
|
@ -20,5 +20,9 @@ worker_pool:
|
||||
host: "10.160.178.186"
|
||||
port: 6379
|
||||
namespace: "harbor_job_service"
|
||||
#Logger for job
|
||||
|
||||
#Logger for job
|
||||
logger:
|
||||
path: "/Users/szou/tmp/job_logs"
|
||||
level: "INFO"
|
||||
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
@ -23,6 +24,8 @@ const (
|
||||
jobServiceRedisHost = "JOB_SERVICE_POOL_REDIS_HOST"
|
||||
jobServiceRedisPort = "JOB_SERVICE_POOL_REDIS_PORT"
|
||||
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
|
||||
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
|
||||
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
|
||||
|
||||
//JobServiceProtocolHTTPS points to the 'https' protocol
|
||||
JobServiceProtocolHTTPS = "https"
|
||||
@ -33,6 +36,9 @@ const (
|
||||
JobServicePoolBackendRedis = "redis"
|
||||
)
|
||||
|
||||
//DefaultConfig is the default configuration reference
|
||||
var DefaultConfig = &Configuration{}
|
||||
|
||||
//Configuration loads and keeps the related configuration items of job service.
|
||||
type Configuration struct {
|
||||
//Protocol server listening on: https/http
|
||||
@ -46,22 +52,25 @@ type Configuration struct {
|
||||
|
||||
//Configurations of worker pool
|
||||
PoolConfig *PoolConfig `yaml:"worker_pool,omitempty"`
|
||||
|
||||
//Logger configurations
|
||||
LoggerConfig *LoggerConfig `yaml:"logger,omitempty"`
|
||||
}
|
||||
|
||||
//HTTPSConfig keep additional configurations when using https protocol
|
||||
//HTTPSConfig keeps additional configurations when using https protocol
|
||||
type HTTPSConfig struct {
|
||||
Cert string `yaml:"cert"`
|
||||
Key string `yaml:"key"`
|
||||
}
|
||||
|
||||
//RedisPoolConfig keep redis pool info.
|
||||
//RedisPoolConfig keeps redis pool info.
|
||||
type RedisPoolConfig struct {
|
||||
Host string `yaml:"host"`
|
||||
Port uint `yaml:"port"`
|
||||
Namespace string `yaml:"namespace"`
|
||||
}
|
||||
|
||||
//PoolConfig keep worker pool configurations.
|
||||
//PoolConfig keeps worker pool configurations.
|
||||
type PoolConfig struct {
|
||||
//0 means unlimited
|
||||
WorkerCount uint `yaml:"workers"`
|
||||
@ -69,6 +78,12 @@ type PoolConfig struct {
|
||||
RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"`
|
||||
}
|
||||
|
||||
//LoggerConfig keeps logger configurations.
|
||||
type LoggerConfig struct {
|
||||
BasePath string `yaml:"path"`
|
||||
LogLevel string `yaml:"level"`
|
||||
}
|
||||
|
||||
//Load the configuration options from the specified yaml file.
|
||||
//If the yaml file is specified and existing, load configurations from yaml file first;
|
||||
//If detecting env variables is specified, load configurations from env variables;
|
||||
@ -83,7 +98,6 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = yaml.Unmarshal(data, c); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -98,6 +112,24 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error {
|
||||
return c.validate()
|
||||
}
|
||||
|
||||
//GetLogBasePath returns the log base path config
|
||||
func GetLogBasePath() string {
|
||||
if DefaultConfig.LoggerConfig != nil {
|
||||
return DefaultConfig.LoggerConfig.BasePath
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
//GetLogLevel returns the log level
|
||||
func GetLogLevel() string {
|
||||
if DefaultConfig.LoggerConfig != nil {
|
||||
return DefaultConfig.LoggerConfig.LogLevel
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
//Load env variables
|
||||
func (c *Configuration) loadEnvs() {
|
||||
prot := utils.ReadEnv(jobServiceProtocol)
|
||||
@ -183,6 +215,16 @@ func (c *Configuration) loadEnvs() {
|
||||
}
|
||||
}
|
||||
|
||||
//logger
|
||||
loggerPath := utils.ReadEnv(jobServiceLoggerBasePath)
|
||||
if !utils.IsEmptyStr(loggerPath) {
|
||||
c.LoggerConfig.BasePath = loggerPath
|
||||
}
|
||||
loggerLevel := utils.ReadEnv(jobServiceLoggerLevel)
|
||||
if !utils.IsEmptyStr(loggerLevel) {
|
||||
c.LoggerConfig.LogLevel = loggerLevel
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Check if the configurations are valid settings.
|
||||
@ -236,5 +278,18 @@ func (c *Configuration) validate() error {
|
||||
}
|
||||
}
|
||||
|
||||
if c.LoggerConfig == nil {
|
||||
return errors.New("missing logger config")
|
||||
}
|
||||
|
||||
if !utils.DirExists(c.LoggerConfig.BasePath) {
|
||||
return errors.New("logger path should be an existing dir")
|
||||
}
|
||||
|
||||
validLevels := "DEBUG,INFO,WARNING,ERROR,FATAL"
|
||||
if !strings.Contains(validLevels, c.LoggerConfig.LogLevel) {
|
||||
return fmt.Errorf("logger level can only be one of: %s", validLevels)
|
||||
}
|
||||
|
||||
return nil //valid
|
||||
}
|
||||
|
@ -3,8 +3,11 @@ package core
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/robfig/cron"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/config"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/errs"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/pool"
|
||||
@ -118,6 +121,25 @@ func (c *Controller) RetryJob(jobID string) error {
|
||||
return c.backendPool.RetryJob(jobID)
|
||||
}
|
||||
|
||||
//GetJobLogData is used to return the log text data for the specified job if exists
|
||||
func (c *Controller) GetJobLogData(jobID string) ([]byte, error) {
|
||||
if utils.IsEmptyStr(jobID) {
|
||||
return nil, errors.New("empty job ID")
|
||||
}
|
||||
|
||||
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), jobID)
|
||||
if !utils.FileExists(logPath) {
|
||||
return nil, errs.NoObjectFoundError(fmt.Sprintf("%s.log", jobID))
|
||||
}
|
||||
|
||||
logData, err := ioutil.ReadFile(logPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return logData, nil
|
||||
}
|
||||
|
||||
//CheckStatus is implementation of same method in core interface.
|
||||
func (c *Controller) CheckStatus() (models.JobPoolStats, error) {
|
||||
return c.backendPool.Stats()
|
||||
|
@ -53,4 +53,7 @@ type Interface interface {
|
||||
|
||||
//CheckStatus is used to handle the job service healthy status checking request.
|
||||
CheckStatus() (models.JobPoolStats, error)
|
||||
|
||||
//GetJobLogData is used to return the log text data for the specified job if exists
|
||||
GetJobLogData(jobID string) ([]byte, error)
|
||||
}
|
||||
|
9
src/jobservice_v2/env/job_context.go
vendored
9
src/jobservice_v2/env/job_context.go
vendored
@ -2,7 +2,11 @@
|
||||
|
||||
package env
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/logger"
|
||||
)
|
||||
|
||||
//JobContext is combination of BaseContext and other job specified resources.
|
||||
//JobContext will be the real execution context for one job.
|
||||
@ -45,6 +49,9 @@ type JobContext interface {
|
||||
// op command if have
|
||||
// flag to indicate if have command
|
||||
OPCommand() (string, bool)
|
||||
|
||||
//Return the logger
|
||||
GetLogger() logger.Interface
|
||||
}
|
||||
|
||||
//JobData defines job context dependencies.
|
||||
|
@ -28,10 +28,14 @@ const (
|
||||
StopJobErrorCode
|
||||
//CancelJobErrorCode is code for the error of cancelling job
|
||||
CancelJobErrorCode
|
||||
//RetryJobErrorCode is code for the error of retry job
|
||||
//RetryJobErrorCode is code for the error of retrying job
|
||||
RetryJobErrorCode
|
||||
//UnknownActionNameErrorCode is code for the case of unknown action name
|
||||
UnknownActionNameErrorCode
|
||||
//GetJobLogErrorCode is code for the error of getting job log
|
||||
GetJobLogErrorCode
|
||||
//NoObjectFoundErrorCode is code for the error of no object found
|
||||
NoObjectFoundErrorCode
|
||||
)
|
||||
|
||||
//baseError ...
|
||||
@ -109,6 +113,11 @@ func UnknownActionNameError(err error) error {
|
||||
return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error())
|
||||
}
|
||||
|
||||
//GetJobLogError is error for the case of getting job log failed
|
||||
func GetJobLogError(err error) error {
|
||||
return New(GetJobLogErrorCode, "Failed to get the job log", err.Error())
|
||||
}
|
||||
|
||||
//jobStoppedError is designed for the case of stopping job.
|
||||
type jobStoppedError struct {
|
||||
baseError
|
||||
@ -139,6 +148,22 @@ func JobCancelledError() error {
|
||||
}
|
||||
}
|
||||
|
||||
//objectNotFound is designed for the case of no object found
|
||||
type objectNotFoundError struct {
|
||||
baseError
|
||||
}
|
||||
|
||||
//NoObjectFoundError is error wrapper for the case of no object found
|
||||
func NoObjectFoundError(object string) error {
|
||||
return objectNotFoundError{
|
||||
baseError{
|
||||
Code: NoObjectFoundErrorCode,
|
||||
Err: "object is not found",
|
||||
Description: object,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
//IsJobStoppedError return true if the error is jobStoppedError
|
||||
func IsJobStoppedError(err error) bool {
|
||||
_, ok := err.(jobStoppedError)
|
||||
@ -150,3 +175,9 @@ func IsJobCancelledError(err error) bool {
|
||||
_, ok := err.(jobCancelledError)
|
||||
return ok
|
||||
}
|
||||
|
||||
//IsObjectNotFoundError return true if the error is objectNotFoundError
|
||||
func IsObjectNotFoundError(err error) bool {
|
||||
_, ok := err.(objectNotFoundError)
|
||||
return ok
|
||||
}
|
||||
|
@ -1 +0,0 @@
|
||||
subjectAltName = IP:10.4.142.42
|
@ -5,11 +5,14 @@ package impl
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
hlog "github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/config"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job"
|
||||
jlogger "github.com/vmware/harbor/src/jobservice_v2/job/impl/logger"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/logger"
|
||||
)
|
||||
|
||||
//Context ...
|
||||
@ -18,7 +21,7 @@ type Context struct {
|
||||
sysContext context.Context
|
||||
|
||||
//Logger for job
|
||||
logger *hlog.Logger
|
||||
logger logger.Interface
|
||||
|
||||
//op command func
|
||||
opCommandFunc job.CheckOPCmdFunc
|
||||
@ -46,7 +49,13 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
|
||||
sysContext: c.sysContext,
|
||||
}
|
||||
|
||||
//TODO:Init logger here
|
||||
//Init logger here
|
||||
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), dep.ID)
|
||||
jContext.logger = jlogger.New(logPath, config.GetLogLevel())
|
||||
if jContext.logger == nil {
|
||||
return nil, errors.New("failed to initialize job logger")
|
||||
}
|
||||
|
||||
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
|
||||
if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func {
|
||||
if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok {
|
||||
@ -54,6 +63,9 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if jContext.opCommandFunc == nil {
|
||||
return nil, errors.New("failed to inject opCommandFunc")
|
||||
}
|
||||
|
||||
if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
|
||||
if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
|
||||
@ -63,6 +75,10 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if jContext.checkInFunc == nil {
|
||||
return nil, errors.New("failed to inject checkInFunc")
|
||||
}
|
||||
|
||||
return jContext, nil
|
||||
}
|
||||
|
||||
@ -95,3 +111,8 @@ func (c *Context) OPCommand() (string, bool) {
|
||||
|
||||
return "", false
|
||||
}
|
||||
|
||||
//GetLogger returns the logger
|
||||
func (c *Context) GetLogger() logger.Interface {
|
||||
return c.logger
|
||||
}
|
||||
|
101
src/jobservice_v2/job/impl/logger/job_logger.go
Normal file
101
src/jobservice_v2/job/impl/logger/job_logger.go
Normal file
@ -0,0 +1,101 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/logger"
|
||||
)
|
||||
|
||||
//JobLogger is an implementation of logger.Interface.
|
||||
//It used in the job to output logs to the logfile.
|
||||
type JobLogger struct {
|
||||
backendLogger *log.Logger
|
||||
}
|
||||
|
||||
//New logger
|
||||
//nil might be returned
|
||||
func New(logPath string, level string) logger.Interface {
|
||||
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
logLevel := parseLevel(level)
|
||||
backendLogger := log.New(f, log.NewTextFormatter(), logLevel)
|
||||
|
||||
return &JobLogger{
|
||||
backendLogger: backendLogger,
|
||||
}
|
||||
}
|
||||
|
||||
//Debug ...
|
||||
func (jl *JobLogger) Debug(v ...interface{}) {
|
||||
jl.backendLogger.Debug(v...)
|
||||
}
|
||||
|
||||
//Debugf with format
|
||||
func (jl *JobLogger) Debugf(format string, v ...interface{}) {
|
||||
jl.backendLogger.Debugf(format, v...)
|
||||
}
|
||||
|
||||
//Info ...
|
||||
func (jl *JobLogger) Info(v ...interface{}) {
|
||||
jl.backendLogger.Info(v...)
|
||||
}
|
||||
|
||||
//Infof with format
|
||||
func (jl *JobLogger) Infof(format string, v ...interface{}) {
|
||||
jl.backendLogger.Infof(format, v...)
|
||||
}
|
||||
|
||||
//Warning ...
|
||||
func (jl *JobLogger) Warning(v ...interface{}) {
|
||||
jl.backendLogger.Warning(v...)
|
||||
}
|
||||
|
||||
//Warningf with format
|
||||
func (jl *JobLogger) Warningf(format string, v ...interface{}) {
|
||||
jl.backendLogger.Warningf(format, v...)
|
||||
}
|
||||
|
||||
//Error ...
|
||||
func (jl *JobLogger) Error(v ...interface{}) {
|
||||
jl.backendLogger.Error(v...)
|
||||
}
|
||||
|
||||
//Errorf with format
|
||||
func (jl *JobLogger) Errorf(format string, v ...interface{}) {
|
||||
jl.backendLogger.Errorf(format, v...)
|
||||
}
|
||||
|
||||
//Fatal error
|
||||
func (jl *JobLogger) Fatal(v ...interface{}) {
|
||||
jl.backendLogger.Fatal(v...)
|
||||
}
|
||||
|
||||
//Fatalf error
|
||||
func (jl *JobLogger) Fatalf(format string, v ...interface{}) {
|
||||
jl.backendLogger.Fatalf(format, v...)
|
||||
}
|
||||
|
||||
func parseLevel(lvl string) log.Level {
|
||||
|
||||
var level = log.WarningLevel
|
||||
|
||||
switch strings.ToLower(lvl) {
|
||||
case "debug":
|
||||
level = log.DebugLevel
|
||||
case "info":
|
||||
level = log.InfoLevel
|
||||
case "warning":
|
||||
level = log.WarningLevel
|
||||
case "error":
|
||||
level = log.ErrorLevel
|
||||
case "fatal":
|
||||
level = log.FatalLevel
|
||||
default:
|
||||
}
|
||||
|
||||
return level
|
||||
}
|
@ -47,42 +47,47 @@ func (rj *ReplicationJob) Validate(params map[string]interface{}) error {
|
||||
|
||||
//Run the replication logic here.
|
||||
func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
logger := ctx.GetLogger()
|
||||
|
||||
defer func() {
|
||||
logger.Info("I'm finished, exit!")
|
||||
fmt.Println("I'm finished, exit!")
|
||||
}()
|
||||
fmt.Println("=======Replication job running=======")
|
||||
fmt.Printf("params: %#v\n", params)
|
||||
fmt.Printf("context: %#v\n", ctx)
|
||||
logger.Info("=======Replication job running=======")
|
||||
logger.Infof("params: %#v\n", params)
|
||||
logger.Infof("context: %#v\n", ctx)
|
||||
|
||||
/*if 1 != 0 {
|
||||
return errors.New("I suicide")
|
||||
}*/
|
||||
|
||||
fmt.Println("check in 30%")
|
||||
logger.Info("check in 30%")
|
||||
ctx.Checkin("30%")
|
||||
time.Sleep(5 * time.Second)
|
||||
fmt.Println("check in 60%")
|
||||
time.Sleep(2 * time.Second)
|
||||
logger.Warning("check in 60%")
|
||||
ctx.Checkin("60%")
|
||||
time.Sleep(5 * time.Second)
|
||||
fmt.Println("check in 100%")
|
||||
time.Sleep(2 * time.Second)
|
||||
logger.Debug("check in 100%")
|
||||
ctx.Checkin("100%")
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
//HOLD ON FOR A WHILE
|
||||
fmt.Println("Holding for 20 sec")
|
||||
<-time.After(20 * time.Second)
|
||||
fmt.Println("I'm back, check if I'm stopped/cancelled")
|
||||
logger.Error("Holding for 20 sec")
|
||||
<-time.After(10 * time.Second)
|
||||
//logger.Fatal("I'm back, check if I'm stopped/cancelled")
|
||||
|
||||
if cmd, ok := ctx.OPCommand(); ok {
|
||||
fmt.Printf("cmd=%s\n", cmd)
|
||||
logger.Infof("cmd=%s\n", cmd)
|
||||
if cmd == opm.CtlCommandCancel {
|
||||
fmt.Println("exit for receiving cancel signal")
|
||||
logger.Info("exit for receiving cancel signal")
|
||||
return errs.JobCancelledError()
|
||||
}
|
||||
|
||||
fmt.Println("exit for receiving stop signal")
|
||||
logger.Info("exit for receiving stop signal")
|
||||
return errs.JobStoppedError()
|
||||
}
|
||||
|
||||
fmt.Println("I'm here")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
36
src/jobservice_v2/logger/interface.go
Normal file
36
src/jobservice_v2/logger/interface.go
Normal file
@ -0,0 +1,36 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
|
||||
package logger
|
||||
|
||||
//Interface for logger.
|
||||
type Interface interface {
|
||||
//For debuging
|
||||
Debug(v ...interface{})
|
||||
|
||||
//For debuging with format
|
||||
Debugf(format string, v ...interface{})
|
||||
|
||||
//For logging info
|
||||
Info(v ...interface{})
|
||||
|
||||
//For logging info with format
|
||||
Infof(format string, v ...interface{})
|
||||
|
||||
//For warning
|
||||
Warning(v ...interface{})
|
||||
|
||||
//For warning with format
|
||||
Warningf(format string, v ...interface{})
|
||||
|
||||
//For logging error
|
||||
Error(v ...interface{})
|
||||
|
||||
//For logging error with format
|
||||
Errorf(format string, v ...interface{})
|
||||
|
||||
//For fatal error
|
||||
Fatal(v ...interface{})
|
||||
|
||||
//For fatal error with error
|
||||
Fatalf(format string, v ...interface{})
|
||||
}
|
@ -12,6 +12,8 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/errs"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/period"
|
||||
|
||||
"github.com/robfig/cron"
|
||||
@ -543,7 +545,7 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
|
||||
}
|
||||
|
||||
if vals == nil || len(vals) == 0 {
|
||||
return models.JobStats{}, fmt.Errorf("job '%s' is not found", jobID)
|
||||
return models.JobStats{}, errs.NoObjectFoundError(fmt.Sprintf("job '%s'", jobID))
|
||||
}
|
||||
|
||||
res := models.JobStats{
|
||||
|
@ -30,16 +30,29 @@ func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManag
|
||||
|
||||
//Run the job
|
||||
func (rj *RedisJob) Run(j *work.Job) error {
|
||||
var cancelled = false
|
||||
var (
|
||||
cancelled = false
|
||||
buildContextFailed = false
|
||||
runningJob job.Interface
|
||||
err error
|
||||
execContext env.JobContext
|
||||
)
|
||||
|
||||
execContext, err := rj.buildContext(j)
|
||||
execContext, err = rj.buildContext(j)
|
||||
if err != nil {
|
||||
return err
|
||||
buildContextFailed = true
|
||||
goto FAILED //no need to retry
|
||||
}
|
||||
|
||||
runningJob := Wrap(rj.job)
|
||||
//Wrap job
|
||||
runningJob = Wrap(rj.job)
|
||||
|
||||
defer func() {
|
||||
if rj.shouldDisableRetry(runningJob, j, cancelled) {
|
||||
if err == nil {
|
||||
return //nothing need to do
|
||||
}
|
||||
|
||||
if buildContextFailed || rj.shouldDisableRetry(runningJob, j, cancelled) {
|
||||
j.Fails = 10000000000 //Make it big enough to avoid retrying
|
||||
now := time.Now().Unix()
|
||||
go func() {
|
||||
@ -75,6 +88,7 @@ func (rj *RedisJob) Run(j *work.Job) error {
|
||||
return err //need to resume
|
||||
}
|
||||
|
||||
FAILED:
|
||||
rj.jobFailed(j.ID)
|
||||
return err
|
||||
}
|
||||
|
@ -31,8 +31,7 @@ type Bootstrap struct{}
|
||||
//Return error if meet any problems.
|
||||
func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
|
||||
//Load configurations
|
||||
cfg := config.Configuration{}
|
||||
if err := cfg.Load(configFile, detectEnv); err != nil {
|
||||
if err := config.DefaultConfig.Load(configFile, detectEnv); err != nil {
|
||||
log.Errorf("Failed to load configurations with error: %s\n", err)
|
||||
return
|
||||
}
|
||||
@ -58,16 +57,16 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
|
||||
|
||||
//Start the pool
|
||||
var backendPool pool.Interface
|
||||
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
||||
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, cfg)
|
||||
if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
||||
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
|
||||
}
|
||||
|
||||
//Initialize controller
|
||||
ctl := core.NewController(backendPool)
|
||||
|
||||
//Start the API server
|
||||
apiServer := bs.loadAndRunAPIServer(rootContext, cfg, ctl)
|
||||
log.Infof("Server is started at %s:%d with %s", "", cfg.Port, cfg.Protocol)
|
||||
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
|
||||
log.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
|
||||
|
||||
//Block here
|
||||
sig := make(chan os.Signal, 1)
|
||||
@ -107,7 +106,7 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
|
||||
}
|
||||
|
||||
//Load and run the API server.
|
||||
func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configuration, ctl *core.Controller) *api.Server {
|
||||
func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configuration, ctl *core.Controller) *api.Server {
|
||||
//Initialized API server
|
||||
handler := api.NewDefaultHandler(ctl)
|
||||
router := api.NewBaseRouter(handler)
|
||||
@ -128,7 +127,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg config.Configurat
|
||||
}
|
||||
|
||||
//Load and run the worker pool
|
||||
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg config.Configuration) pool.Interface {
|
||||
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface {
|
||||
redisPoolCfg := pool.RedisPoolConfig{
|
||||
RedisHost: cfg.PoolConfig.RedisPoolCfg.Host,
|
||||
RedisPort: cfg.PoolConfig.RedisPoolCfg.Port,
|
||||
|
@ -24,14 +24,34 @@ func ReadEnv(key string) string {
|
||||
//FileExists check if the specified exists.
|
||||
func FileExists(file string) bool {
|
||||
if !IsEmptyStr(file) {
|
||||
if _, err := os.Stat(file); err == nil {
|
||||
_, err := os.Stat(file)
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
//DirExists check if the specified dir exists
|
||||
func DirExists(path string) bool {
|
||||
if IsEmptyStr(path) {
|
||||
return false
|
||||
}
|
||||
|
||||
f, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return f.IsDir()
|
||||
}
|
||||
|
||||
//IsValidPort check if port is valid.
|
||||
func IsValidPort(port uint) bool {
|
||||
return port != 0 && port < 65536
|
||||
|
Loading…
Reference in New Issue
Block a user