Update README of job service to reflect latest updates

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2018-11-07 16:31:07 +08:00
parent e9194ca107
commit e6d4c024ee
9 changed files with 183 additions and 33 deletions

View File

@ -23,11 +23,9 @@ import (
"time"
)
var logger = New(os.Stdout, NewTextFormatter(), WarningLevel)
var logger = New(os.Stdout, NewTextFormatter(), WarningLevel, 4)
func init() {
logger.callDepth = 4
lvl := os.Getenv("LOG_LEVEL")
if len(lvl) == 0 {
logger.SetLevel(InfoLevel)
@ -41,7 +39,6 @@ func init() {
}
logger.SetLevel(level)
}
// Logger provides a struct with fields that describe the details of logger.
@ -55,12 +52,22 @@ type Logger struct {
}
// New returns a customized Logger
func New(out io.Writer, fmtter Formatter, lvl Level) *Logger {
func New(out io.Writer, fmtter Formatter, lvl Level, options ...interface{}) *Logger {
// Default set to be 3
depth := 3
// If passed in as option, then reset depth
// Use index 0
if len(options) > 0 {
d, ok := options[0].(int)
if ok && d > 0 {
depth = d
}
}
return &Logger{
out: out,
fmtter: fmtter,
lvl: lvl,
callDepth: 6,
callDepth: depth,
}
}

View File

@ -30,8 +30,8 @@ With job service, you can:
* Cancel a specified job.
* Retry a specified job (This should be a failed job and match the retrying criteria).
* Get stats of specified job (no list jobs function).
* Get execution log of specified job.
* Check the health status of job service.
* Get execution log of specified job (It depends on the logger implementation).
* Check the health status of job service.(No authentication required, it can be used as health check endpoint)
## Architecture
@ -47,7 +47,7 @@ Components:
* Controller: The core of job service. Responsible for coordinating the whole flow of job service.
* Job Launcher : Launch the jobs except `Periodic` ones.
* Scheduler: Schedules the `Periodic` jobs.
* Logger: Catches and write the job execution logs to files.
* Logger: A flexible logger framework. It will catch and write the job execution logs to the configured backends.
* Stats Manager: Maintains the status and stats of jobs as well as status hooks.
* Data Backend: Define storage methods to store the additional info.
* Pool Driver: A interface layer to broke the functions of upstream job queue framework to upper layers.
@ -108,7 +108,7 @@ Just pay attention, your main logic should be written in the `Run` method.
A job context will be provided when executed the `Run` logic. With this context, you can
* Get a logger handle if you want to output the execution log to the log file.
* Get a logger handle if you want to output the execution log to the log backends.
* Retrieve the system context reference.
* Get job operation signal if your job supports `stop` and `cancel`.
* Get the `checkin` func to check in message.
@ -261,6 +261,110 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
}
```
## Job Execution
Job execution is used to track the jobs which are related to a specified job, like parent and children jobs. If one job has executions, the following two extra properties will be appended to the job stats.
```json
{
"job": {
"executions": ["uuid-sub-job"],
"multiple_executions": true
}
}
```
For the job execution/sub job, there will be an extra property `upstream_job_id` pointing to id of the upstream (/parent) job.
```json
{
"job": {
"upstream_job_id": "parent-id"
}
}
```
Under that situation, the flag `multiple_executions` will be set to be `true`. The list `executions` will contain all the ids of the executions (/sub jobs).
### General job
Any jobs can launch new jobs through the launch function in the job context. All those jobs will be tracked as sub jobs (executions) of the caller job.
```go
func (j *Job) Run(ctx env.JobContext, params map[string]interface{}) error{
// ...
subJob, err := ctx.LaunchJob(models.JobRequest{})
// ...
return nil
}
```
### Periodic job
The job launched with `Periodic` kind is actually a scheduled job template which will be not run directly. The real running job will be created by cloning the configurations from the job template and run. And then each _periodic job_ will have multiple job executions with independent id and each _job execution_ will link to the `Periodic` job by the `upstream_job_id`.
### Logger
There are two loggers here. One is for job service itself and another one is for the running jobs. Each logger can configure multi logger backends.
Each backend logger is identified by an unique name which will be used in the logger configurations to enable the corresponding loggers. Meanwhile, each backend logger MUST implement the `logger.Interface`. A logger can also support (optional):
* _sweeper_: Sweep the outdated logs. A sweeper MUST implement `sweeper.Interface`
* _getter_: Get the specified log data. A getter MUST implement `getter.Interface`
All the backend loggers SHOULD onboard via the static logger registry.
```go
// knownLoggers is a static logger registry.
// All the implemented loggers (w/ sweeper) should be registered
// with an unique name in this registry. Then they can be used to
// log info.
var knownLoggers = map[string]*Declaration{
// File logger
LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
// STD output(both stdout and stderr) logger
LoggerNameStdOutput: {StdFactory, nil, nil, true},
}
```
So far, only the following two backends are supported:
* **STD_OUTPUT**: Output the log to the std stream (stdout/stderr)
* **FILE**: Output the log to the log files
* sweeper supports
* getter supports
### Configure loggers
Logger configuration options:
| Option | Description |
|--------------|---------------------------|
| loggers[x].name | The unique name of the logger backend |
| loggers[x].level| The logger level of the logger backend|
| loggers[x].settings | A hash map to pass extra settings of the logger backend. Depends on the implementation of the backend.|
| loggers[x].sweeper.duration| The duration of the sweeper looping |
| loggers[x].sweeper.settings| A hash map to pass extra settings of the sweeper. Depends on the implementation of sweeper. |
An example:
```yaml
#Loggers
loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
settings: # Customized settings of logger
base_dir: "/tmp/job_logs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
```
## Configuration
The following configuration options are supported:
@ -275,9 +379,8 @@ The following configuration options are supported:
| worker_pool.backend | The job data persistent backend driver. So far, only redis supported| JOB_SERVICE_POOL_BACKEND |
| worker_pool.redis_pool.redis_url | The redis url if backend is redis| JOB_SERVICE_POOL_REDIS_URL |
| worker_pool.redis_pool.namespace | The namespace used in redis| JOB_SERVICE_POOL_REDIS_NAMESPACE |
| logger.path | The file path to keep the log files| JOB_SERVICE_LOGGER_BASE_PATH |
| logger.level | Log level setting | JOB_SERVICE_LOGGER_LEVEL |
| logger.archive_period | The days to sweep the outdated logs | JOB_SERVICE_LOGGER_ARCHIVE_PERIOD |
| loggers | Loggers for job service itself. Refer to [Configure loggers](#configure-loggers)| |
| job_loggers | Loggers for the running jobs. Refer to [Configure loggers](#configure-loggers) | |
| admin_server | The harbor admin server endpoint which used to retrieve Harbor configures| ADMINSERVER_URL |
### Sample
@ -304,17 +407,29 @@ worker_pool:
redis_pool:
#redis://[arbitrary_username:password@]ipaddress:port/database_index
#or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379"
redis_url: "localhost:6379"
namespace: "harbor_job_service"
#Logger for job
logger:
path: "/Users/szou/tmp/job_logs"
level: "INFO"
archive_period: 1 #days
#Loggers for the running job
job_loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
settings: # Customized settings of logger
base_dir: "/tmp/job_logs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
#Loggers for the job service
loggers:
- name: "STD_OUTPUT" # Same with above
level: "DEBUG"
#Admin server endpoint
admin_server: "http://10.160.178.186:9010/"
admin_server: "http://adminserver:9010/"
```
## API
@ -368,7 +483,8 @@ The expected secret is passed to job service by the ENV variable `CORE_SECRET`.
"unique": false,
"ref_link": "/api/v1/jobs/uuid-job",
"enqueue_time": "2018-10-10 12:00:00",
"update_time": "2018-10-10 13:00:00"
"update_time": "2018-10-10 13:00:00",
"multiple_executions": false // To indicate if the job has sub executions
}
}
```
@ -406,7 +522,9 @@ The expected secret is passed to job service by the ENV variable `CORE_SECRET`.
"check_in": "check in message", // if check in message
"check_in_at": 1539164889, // if check in message
"die_at": 0,
"hook_status": "http://status-check.com"
"hook_status": "http://status-check.com",
"executions": ["uuid-sub-job"], // the ids of sub executions of the job
"multiple_executions": true
}
}
```

View File

@ -236,6 +236,13 @@ func setLoggers(setter func(lg logger.Interface), jobID string) error {
// Init job loggers here
lOptions := []logger.Option{}
for _, lc := range config.DefaultConfig.JobLoggerConfigs {
// For running job, the depth should be 5
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameStdOutput {
if lc.Settings == nil {
lc.Settings = map[string]interface{}{}
}
lc.Settings["depth"] = 5
}
if lc.Name == logger.LoggerNameFile {
// Need extra param
fSettings := map[string]interface{}{}

View File

@ -15,13 +15,13 @@ type FileLogger struct {
// NewFileLogger crates a new file logger
// nil might be returned
func NewFileLogger(level string, logPath string) (*FileLogger, error) {
func NewFileLogger(level string, logPath string, depth int) (*FileLogger, error) {
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
logLevel := parseLevel(level)
backendLogger := log.New(f, log.NewTextFormatter(), logLevel)
backendLogger := log.New(f, log.NewTextFormatter(), logLevel, depth)
return &FileLogger{
backendLogger: backendLogger,

View File

@ -8,14 +8,14 @@ import (
// Test file logger creation with non existing file path
func TestFileLoggerCreation(t *testing.T) {
if _, err := NewFileLogger("DEBUG", "/non-existing/a.log"); err == nil {
if _, err := NewFileLogger("DEBUG", "/non-existing/a.log", 4); err == nil {
t.Fatalf("expect non nil error but got nil when creating file logger with non existing path")
}
}
// Test file logger
func TestFileLogger(t *testing.T) {
l, err := NewFileLogger("DEBUG", path.Join(os.TempDir(), "TestFileLogger.log"))
l, err := NewFileLogger("DEBUG", path.Join(os.TempDir(), "TestFileLogger.log"), 4)
if err != nil {
t.Fatal(err)
}

View File

@ -20,13 +20,13 @@ type StdOutputLogger struct {
}
// NewStdOutputLogger creates a new std output logger
func NewStdOutputLogger(level string, output string) *StdOutputLogger {
func NewStdOutputLogger(level string, output string, depth int) *StdOutputLogger {
logLevel := parseLevel(level)
logStream := os.Stdout
if output == StdErr {
logStream = os.Stderr
}
backendLogger := log.New(logStream, log.NewTextFormatter(), logLevel)
backendLogger := log.New(logStream, log.NewTextFormatter(), logLevel, depth)
return &StdOutputLogger{
backendLogger: backendLogger,

View File

@ -4,7 +4,7 @@ import "testing"
// Test std logger
func TestStdLogger(t *testing.T) {
l := NewStdOutputLogger("DEBUG", StdErr)
l := NewStdOutputLogger("DEBUG", StdErr, 4)
l.Debug("TestStdLogger")
l.Debugf("%s", "TestStdLogger")
l.Info("TestStdLogger")

View File

@ -180,6 +180,14 @@ func Init(ctx context.Context) error {
sOptions := []Option{}
for _, lc := range config.DefaultConfig.LoggerConfigs {
// Inject logger depth here for FILE and STD logger to avoid configuring it in the yaml
// For logger of job service itself, the depth should be 6
if lc.Name == LoggerNameFile || lc.Name == LoggerNameStdOutput {
if lc.Settings == nil {
lc.Settings = map[string]interface{}{}
}
lc.Settings["depth"] = 6
}
options = append(options, BackendOption(lc.Name, lc.Level, lc.Settings))
if lc.Sweeper != nil {
sOptions = append(sOptions, SweeperOption(lc.Name, lc.Sweeper.Duration, lc.Sweeper.Settings))

View File

@ -12,7 +12,10 @@ type Factory func(options ...OptionItem) (Interface, error)
// FileFactory is factory of file logger
func FileFactory(options ...OptionItem) (Interface, error) {
var level, baseDir, fileName string
var (
level, baseDir, fileName string
depth int
)
for _, op := range options {
switch op.Field() {
case "level":
@ -21,6 +24,8 @@ func FileFactory(options ...OptionItem) (Interface, error) {
baseDir = op.String()
case "filename":
fileName = op.String()
case "depth":
depth = op.Int()
default:
}
@ -34,21 +39,26 @@ func FileFactory(options ...OptionItem) (Interface, error) {
return nil, errors.New("missing file name option of the file logger")
}
return backend.NewFileLogger(level, path.Join(baseDir, fileName))
return backend.NewFileLogger(level, path.Join(baseDir, fileName), depth)
}
// StdFactory is factory of std output logger.
func StdFactory(options ...OptionItem) (Interface, error) {
var level, output string
var (
level, output string
depth int
)
for _, op := range options {
switch op.Field() {
case "level":
level = op.String()
case "output":
output = op.String()
case "depth":
depth = op.Int()
default:
}
}
return backend.NewStdOutputLogger(level, output), nil
return backend.NewStdOutputLogger(level, output, depth), nil
}