Merge pull request #6215 from steven-zou/logger_framework_job_service

Build logger framework to support configurable loggers/sweepers/getters
This commit is contained in:
Steven Zou 2018-11-08 18:15:36 +08:00 committed by GitHub
commit b3c87673a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1892 additions and 757 deletions

View File

@ -20,10 +20,22 @@ worker_pool:
#redis://[arbitrary_username:password@]ipaddress:port/database_index
redis_url: $redis_url
namespace: "harbor_job_service_namespace"
#Logger for job
logger:
path: "/var/log/jobs"
level: "INFO"
archive_period: 14 #days
#Loggers for the running job
job_loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "INFO" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "INFO"
settings: # Customized settings of logger
base_dir: "/var/log/jobs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/var/log/jobs"
#Loggers for the job service
loggers:
- name: "STD_OUTPUT" # Same as above
level: "INFO"
#Admin server endpoint
admin_server: "http://adminserver:8080/"

View File

@ -23,11 +23,9 @@ import (
"time"
)
var logger = New(os.Stdout, NewTextFormatter(), WarningLevel)
var logger = New(os.Stdout, NewTextFormatter(), WarningLevel, 4)
func init() {
logger.callDepth = 4
lvl := os.Getenv("LOG_LEVEL")
if len(lvl) == 0 {
logger.SetLevel(InfoLevel)
@ -41,7 +39,6 @@ func init() {
}
logger.SetLevel(level)
}
// Logger provides a struct with fields that describe the details of logger.
@ -55,12 +52,22 @@ type Logger struct {
}
// New returns a customized Logger
func New(out io.Writer, fmtter Formatter, lvl Level) *Logger {
func New(out io.Writer, fmtter Formatter, lvl Level, options ...interface{}) *Logger {
// Default set to be 3
depth := 3
// If passed in as option, then reset depth
// Use index 0
if len(options) > 0 {
d, ok := options[0].(int)
if ok && d > 0 {
depth = d
}
}
return &Logger{
out: out,
fmtter: fmtter,
lvl: lvl,
callDepth: 3,
callDepth: depth,
}
}

View File

@ -30,8 +30,8 @@ With job service, you can:
* Cancel a specified job.
* Retry a specified job (This should be a failed job and match the retrying criteria).
* Get stats of specified job (no list jobs function).
* Get execution log of specified job.
* Check the health status of job service.
* Get execution log of specified job (It depends on the logger implementation).
* Check the health status of job service. (No authentication required; it can be used as a health check endpoint.)
## Architecture
@ -47,7 +47,7 @@ Components:
* Controller: The core of job service. Responsible for coordinating the whole flow of job service.
* Job Launcher : Launch the jobs except `Periodic` ones.
* Scheduler: Schedules the `Periodic` jobs.
* Logger: Catches and write the job execution logs to files.
* Logger: A flexible logger framework. It will catch and write the job execution logs to the configured backends.
* Stats Manager: Maintains the status and stats of jobs as well as status hooks.
* Data Backend: Define storage methods to store the additional info.
* Pool Driver: An interface layer to broker the functions of the upstream job queue framework to upper layers.
@ -108,7 +108,7 @@ Just pay attention, your main logic should be written in the `Run` method.
A job context will be provided when executing the `Run` logic. With this context, you can
* Get a logger handle if you want to output the execution log to the log file.
* Get a logger handle if you want to output the execution log to the log backends.
* Retrieve the system context reference.
* Get job operation signal if your job supports `stop` and `cancel`.
* Get the `checkin` func to check in message.
@ -261,6 +261,110 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
}
```
## Job Execution
Job execution is used to track the jobs which are related to a specified job, like parent and children jobs. If one job has executions, the following two extra properties will be appended to the job stats.
```json
{
"job": {
"executions": ["uuid-sub-job"],
"multiple_executions": true
}
}
```
For the job execution/sub job, there will be an extra property `upstream_job_id` pointing to id of the upstream (/parent) job.
```json
{
"job": {
"upstream_job_id": "parent-id"
}
}
```
Under that situation, the flag `multiple_executions` will be set to be `true`. The list `executions` will contain all the ids of the executions (/sub jobs).
### General job
Any jobs can launch new jobs through the launch function in the job context. All those jobs will be tracked as sub jobs (executions) of the caller job.
```go
func (j *Job) Run(ctx env.JobContext, params map[string]interface{}) error{
// ...
subJob, err := ctx.LaunchJob(models.JobRequest{})
// ...
return nil
}
```
### Periodic job
The job launched with `Periodic` kind is actually a scheduled job template which will not be run directly. The real running job will be created by cloning the configurations from the job template and running it. Each _periodic job_ will then have multiple job executions with independent ids, and each _job execution_ will link to the `Periodic` job by the `upstream_job_id`.
### Logger
There are two loggers here. One is for the job service itself and another one is for the running jobs. Each logger can be configured with multiple logger backends.
Each backend logger is identified by a unique name which will be used in the logger configurations to enable the corresponding loggers. Meanwhile, each backend logger MUST implement the `logger.Interface`. A logger can also optionally support:
* _sweeper_: Sweep the outdated logs. A sweeper MUST implement `sweeper.Interface`
* _getter_: Get the specified log data. A getter MUST implement `getter.Interface`
All the backend loggers SHOULD be onboarded via the static logger registry.
```go
// knownLoggers is a static logger registry.
// All the implemented loggers (w/ sweeper) should be registered
// with an unique name in this registry. Then they can be used to
// log info.
var knownLoggers = map[string]*Declaration{
// File logger
LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
// STD output(both stdout and stderr) logger
LoggerNameStdOutput: {StdFactory, nil, nil, true},
}
```
So far, only the following two backends are supported:
* **STD_OUTPUT**: Output the log to the std stream (stdout/stderr)
* **FILE**: Output the log to the log files
  * sweeper supported
  * getter supported
### Configure loggers
Logger configuration options:
| Option | Description |
|--------------|---------------------------|
| loggers[x].name | The unique name of the logger backend |
| loggers[x].level| The logger level of the logger backend|
| loggers[x].settings | A hash map to pass extra settings of the logger backend. Depends on the implementation of the backend.|
| loggers[x].sweeper.duration| The duration of the sweeper looping |
| loggers[x].sweeper.settings| A hash map to pass extra settings of the sweeper. Depends on the implementation of sweeper. |
An example:
```yaml
#Loggers
loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
settings: # Customized settings of logger
base_dir: "/tmp/job_logs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
```
## Configuration
The following configuration options are supported:
@ -275,9 +379,8 @@ The following configuration options are supported:
| worker_pool.backend | The job data persistent backend driver. So far, only redis supported| JOB_SERVICE_POOL_BACKEND |
| worker_pool.redis_pool.redis_url | The redis url if backend is redis| JOB_SERVICE_POOL_REDIS_URL |
| worker_pool.redis_pool.namespace | The namespace used in redis| JOB_SERVICE_POOL_REDIS_NAMESPACE |
| logger.path | The file path to keep the log files| JOB_SERVICE_LOGGER_BASE_PATH |
| logger.level | Log level setting | JOB_SERVICE_LOGGER_LEVEL |
| logger.archive_period | The days to sweep the outdated logs | JOB_SERVICE_LOGGER_ARCHIVE_PERIOD |
| loggers | Loggers for job service itself. Refer to [Configure loggers](#configure-loggers)| |
| job_loggers | Loggers for the running jobs. Refer to [Configure loggers](#configure-loggers) | |
| admin_server | The harbor admin server endpoint which used to retrieve Harbor configures| ADMINSERVER_URL |
### Sample
@ -304,17 +407,29 @@ worker_pool:
redis_pool:
#redis://[arbitrary_username:password@]ipaddress:port/database_index
#or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379"
redis_url: "localhost:6379"
namespace: "harbor_job_service"
#Logger for job
logger:
path: "/Users/szou/tmp/job_logs"
level: "INFO"
archive_period: 1 #days
#Loggers for the running job
job_loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
settings: # Customized settings of logger
base_dir: "/tmp/job_logs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
#Loggers for the job service
loggers:
- name: "STD_OUTPUT" # Same as above
level: "DEBUG"
#Admin server endpoint
admin_server: "http://10.160.178.186:9010/"
admin_server: "http://adminserver:9010/"
```
## API
@ -368,7 +483,8 @@ The expected secret is passed to job service by the ENV variable `CORE_SECRET`.
"unique": false,
"ref_link": "/api/v1/jobs/uuid-job",
"enqueue_time": "2018-10-10 12:00:00",
"update_time": "2018-10-10 13:00:00"
"update_time": "2018-10-10 13:00:00",
"multiple_executions": false // To indicate if the job has sub executions
}
}
```
@ -406,7 +522,9 @@ The expected secret is passed to job service by the ENV variable `CORE_SECRET`.
"check_in": "check in message", // if check in message
"check_in_at": 1539164889, // if check in message
"die_at": 0,
"hook_status": "http://status-check.com"
"hook_status": "http://status-check.com",
"executions": ["uuid-sub-job"], // the ids of sub executions of the job
"multiple_executions": true
}
}
```

View File

@ -22,11 +22,23 @@ worker_pool:
redis_url: "localhost:6379"
namespace: "harbor_job_service"
#Logger for job
logger:
path: "~/tmp/job_logs"
level: "DEBUG"
archive_period: 1 #days
#Loggers for the running job
job_loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
settings: # Customized settings of logger
base_dir: "tmp/job_logs"
sweeper:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
#Loggers for the job service
loggers:
- name: "STD_OUTPUT" # Same as above
level: "DEBUG"
#Admin server endpoint
admin_server: "http://adminserver:9010/"

View File

@ -28,19 +28,16 @@ import (
)
const (
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
jobServicePort = "JOB_SERVICE_PORT"
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
jobServiceRedisURL = "JOB_SERVICE_POOL_REDIS_URL"
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
jobServiceLoggerArchivePeriod = "JOB_SERVICE_LOGGER_ARCHIVE_PERIOD"
jobServiceCoreServerEndpoint = "CORE_URL"
jobServiceAuthSecret = "JOBSERVICE_SECRET"
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
jobServicePort = "JOB_SERVICE_PORT"
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
jobServiceRedisURL = "JOB_SERVICE_POOL_REDIS_URL"
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
jobServiceCoreServerEndpoint = "CORE_URL"
jobServiceAuthSecret = "JOBSERVICE_SECRET"
// JobServiceProtocolHTTPS points to the 'https' protocol
JobServiceProtocolHTTPS = "https"
@ -76,8 +73,11 @@ type Configuration struct {
// Configurations of worker pool
PoolConfig *PoolConfig `yaml:"worker_pool,omitempty"`
// Job logger configurations
JobLoggerConfigs []*LoggerConfig `yaml:"job_loggers,omitempty"`
// Logger configurations
LoggerConfig *LoggerConfig `yaml:"logger,omitempty"`
LoggerConfigs []*LoggerConfig `yaml:"loggers,omitempty"`
}
// HTTPSConfig keeps additional configurations when using https protocol
@ -100,11 +100,21 @@ type PoolConfig struct {
RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"`
}
// LoggerConfig keeps logger configurations.
// CustomizedSettings keeps the customized settings of logger
type CustomizedSettings map[string]interface{}
// LogSweeperConfig keeps settings of log sweeper
type LogSweeperConfig struct {
Duration int `yaml:"duration"`
Settings CustomizedSettings `yaml:"settings"`
}
// LoggerConfig keeps logger basic configurations.
type LoggerConfig struct {
BasePath string `yaml:"path"`
LogLevel string `yaml:"level"`
ArchivePeriod uint `yaml:"archive_period"`
Name string `yaml:"name"`
Level string `yaml:"level"`
Settings CustomizedSettings `yaml:"settings"`
Sweeper *LogSweeperConfig `yaml:"sweeper"`
}
// Load the configuration options from the specified yaml file.
@ -151,33 +161,6 @@ func (c *Configuration) Load(yamlFilePath string, detectEnv bool) error {
return c.validate()
}
// GetLogBasePath returns the log base path config
func GetLogBasePath() string {
if DefaultConfig.LoggerConfig != nil {
return DefaultConfig.LoggerConfig.BasePath
}
return ""
}
// GetLogLevel returns the log level
func GetLogLevel() string {
if DefaultConfig.LoggerConfig != nil {
return DefaultConfig.LoggerConfig.LogLevel
}
return ""
}
// GetLogArchivePeriod returns the archive period
func GetLogArchivePeriod() uint {
if DefaultConfig.LoggerConfig != nil {
return DefaultConfig.LoggerConfig.ArchivePeriod
}
return 1 // return default
}
// GetAuthSecret get the auth secret from the env
func GetAuthSecret() string {
return utils.ReadEnv(jobServiceAuthSecret)
@ -268,31 +251,6 @@ func (c *Configuration) loadEnvs() {
}
}
// logger
loggerPath := utils.ReadEnv(jobServiceLoggerBasePath)
if !utils.IsEmptyStr(loggerPath) {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.BasePath = loggerPath
}
loggerLevel := utils.ReadEnv(jobServiceLoggerLevel)
if !utils.IsEmptyStr(loggerLevel) {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.LogLevel = loggerLevel
}
archivePeriod := utils.ReadEnv(jobServiceLoggerArchivePeriod)
if !utils.IsEmptyStr(archivePeriod) {
if period, err := strconv.Atoi(archivePeriod); err == nil {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.ArchivePeriod = uint(period)
}
}
// admin server
if coreServer := utils.ReadEnv(jobServiceCoreServerEndpoint); !utils.IsEmptyStr(coreServer) {
c.AdminServer = coreServer
@ -357,21 +315,14 @@ func (c *Configuration) validate() error {
}
}
if c.LoggerConfig == nil {
return errors.New("missing logger config")
// Job service loggers
if len(c.LoggerConfigs) == 0 {
return errors.New("missing logger config of job service")
}
if !utils.DirExists(c.LoggerConfig.BasePath) {
return errors.New("logger path should be an existing dir")
}
validLevels := "DEBUG,INFO,WARNING,ERROR,FATAL"
if !strings.Contains(validLevels, c.LoggerConfig.LogLevel) {
return fmt.Errorf("logger level can only be one of: %s", validLevels)
}
if c.LoggerConfig.ArchivePeriod == 0 {
return fmt.Errorf("logger archive period should be greater than 0")
// Job loggers
if len(c.JobLoggerConfigs) == 0 {
return errors.New("missing logger config of job")
}
if _, err := url.Parse(c.AdminServer); err != nil {

View File

@ -26,24 +26,13 @@ func TestConfigLoadingFailed(t *testing.T) {
}
func TestConfigLoadingSucceed(t *testing.T) {
if err := CreateLogDir(); err != nil {
t.Fatal(err)
}
cfg := &Configuration{}
if err := cfg.Load("../config_test.yml", false); err != nil {
t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
}
if err := RemoveLogDir(); err != nil {
t.Fatal(err)
}
}
func TestConfigLoadingWithEnv(t *testing.T) {
if err := CreateLogDir(); err != nil {
t.Error(err)
}
setENV()
cfg := &Configuration{}
@ -52,68 +41,74 @@ func TestConfigLoadingWithEnv(t *testing.T) {
}
if cfg.Protocol != "https" {
t.Fatalf("expect protocol 'https', but got '%s'\n", cfg.Protocol)
t.Errorf("expect protocol 'https', but got '%s'\n", cfg.Protocol)
}
if cfg.Port != 8989 {
t.Fatalf("expect port 8989 but got '%d'\n", cfg.Port)
t.Errorf("expect port 8989 but got '%d'\n", cfg.Port)
}
if cfg.PoolConfig.WorkerCount != 8 {
t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
t.Errorf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
}
if cfg.PoolConfig.RedisPoolCfg.RedisURL != "redis://arbitrary_username:password@8.8.8.8:6379/0" {
t.Fatalf("expect redis URL 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.RedisURL)
t.Errorf("expect redis URL 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.RedisURL)
}
if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" {
t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
t.Errorf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
}
if cfg.LoggerConfig.BasePath != "/tmp" {
t.Fatalf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath)
if GetAuthSecret() != "js_secret" {
t.Errorf("expect auth secret 'js_secret' but got '%s'", GetAuthSecret())
}
if cfg.LoggerConfig.LogLevel != "DEBUG" {
t.Fatalf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel)
}
if cfg.LoggerConfig.ArchivePeriod != 5 {
t.Fatalf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod)
if GetUIAuthSecret() != "core_secret" {
t.Errorf("expect auth secret 'core_secret' but got '%s'", GetUIAuthSecret())
}
unsetENV()
if err := RemoveLogDir(); err != nil {
t.Fatal(err)
}
}
func TestDefaultConfig(t *testing.T) {
if err := CreateLogDir(); err != nil {
t.Fatal(err)
}
if err := DefaultConfig.Load("../config_test.yml", true); err != nil {
t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
}
if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888" {
t.Fatalf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint)
}
if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" {
t.Fatalf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath)
}
if lvl := GetLogLevel(); lvl != "INFO" {
t.Fatalf("expect default logger level 'INFO' but got '%s'\n", lvl)
}
if period := GetLogArchivePeriod(); period != 1 {
t.Fatalf("expect default log archive period 1 but got '%d'\n", period)
t.Errorf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint)
}
redisURL := DefaultConfig.PoolConfig.RedisPoolCfg.RedisURL
if redisURL != "redis://redis:6379" {
t.Fatalf("expect redisURL '%s' but got '%s'\n", "redis://redis:6379", redisURL)
if redisURL != "redis://localhost:6379" {
t.Errorf("expect redisURL '%s' but got '%s'\n", "redis://localhost:6379", redisURL)
}
if err := RemoveLogDir(); err != nil {
t.Fatal(err)
if len(DefaultConfig.JobLoggerConfigs) == 0 {
t.Errorf("expect 2 job loggers configured but got %d", len(DefaultConfig.JobLoggerConfigs))
}
if len(DefaultConfig.LoggerConfigs) == 0 {
t.Errorf("expect 1 loggers configured but got %d", len(DefaultConfig.LoggerConfigs))
}
// Only verify the complicated one
theLogger := DefaultConfig.JobLoggerConfigs[1]
if theLogger.Name != "FILE" {
t.Fatalf("expect FILE logger but got %s", theLogger.Name)
}
if theLogger.Level != "INFO" {
t.Errorf("expect INFO log level of FILE logger but got %s", theLogger.Level)
}
if len(theLogger.Settings) == 0 {
t.Errorf("expect extra settings but got nothing")
}
if theLogger.Settings["base_dir"] != "/tmp/job_logs" {
t.Errorf("expect extra setting base_dir to be '/tmp/job_logs' but got %s", theLogger.Settings["base_dir"])
}
if theLogger.Sweeper == nil {
t.Fatalf("expect non nil sweeper of FILE logger but got nil")
}
if theLogger.Sweeper.Duration != 5 {
t.Errorf("expect sweep duration to be 5 but got %d", theLogger.Sweeper.Duration)
}
if theLogger.Sweeper.Settings["work_dir"] != "/tmp/job_logs" {
t.Errorf("expect work dir of sweeper of FILE logger to be '/tmp/job_logs' but got %s", theLogger.Sweeper.Settings["work_dir"])
}
}
@ -126,9 +121,8 @@ func setENV() {
os.Setenv("JOB_SERVICE_POOL_WORKERS", "8")
os.Setenv("JOB_SERVICE_POOL_REDIS_URL", "8.8.8.8:6379,100,password,0")
os.Setenv("JOB_SERVICE_POOL_REDIS_NAMESPACE", "ut_namespace")
os.Setenv("JOB_SERVICE_LOGGER_BASE_PATH", "/tmp")
os.Setenv("JOB_SERVICE_LOGGER_LEVEL", "DEBUG")
os.Setenv("JOB_SERVICE_LOGGER_ARCHIVE_PERIOD", "5")
os.Setenv("JOBSERVICE_SECRET", "js_secret")
os.Setenv("CORE_SECRET", "core_secret")
}
func unsetENV() {
@ -140,15 +134,6 @@ func unsetENV() {
os.Unsetenv("JOB_SERVICE_POOL_WORKERS")
os.Unsetenv("JOB_SERVICE_POOL_REDIS_URL")
os.Unsetenv("JOB_SERVICE_POOL_REDIS_NAMESPACE")
os.Unsetenv("JOB_SERVICE_LOGGER_BASE_PATH")
os.Unsetenv("JOB_SERVICE_LOGGER_LEVEL")
os.Unsetenv("JOB_SERVICE_LOGGER_ARCHIVE_PERIOD")
}
func CreateLogDir() error {
return os.MkdirAll("/tmp/job_logs", 0755)
}
func RemoveLogDir() error {
return os.Remove("/tmp/job_logs")
os.Unsetenv("JOBSERVICE_SECRET")
os.Unsetenv("CORE_SECRET")
}

View File

@ -8,7 +8,7 @@ https_config:
key: "../server.key"
#Server listening port
port: 9443
port: 9444
#Worker pool
worker_pool:
@ -19,14 +19,26 @@ worker_pool:
redis_pool:
#redis://[arbitrary_username:password@]ipaddress:port/database_index
#or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379"
redis_url: "localhost:6379"
namespace: "harbor_job_service"
#Logger for job
logger:
path: "/tmp/job_logs"
level: "INFO"
archive_period: 1 #days
#Loggers for the running job
job_loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "INFO"
settings: # Customized settings of logger
base_dir: "/tmp/job_logs"
sweeper:
duration: 5 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
#Loggers for the job service
loggers:
- name: "STD_OUTPUT" # Same as above
level: "DEBUG"
#Admin server endpoint
admin_server: "http://127.0.0.1:8888"

View File

@ -17,10 +17,9 @@ package core
import (
"errors"
"fmt"
"io/ioutil"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/pool"
@ -141,12 +140,7 @@ func (c *Controller) GetJobLogData(jobID string) ([]byte, error) {
return nil, errors.New("empty job ID")
}
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), jobID)
if !utils.FileExists(logPath) {
return nil, errs.NoObjectFoundError(fmt.Sprintf("%s.log", jobID))
}
logData, err := ioutil.ReadFile(logPath)
logData, err := logger.Retrieve(jobID)
if err != nil {
return nil, err
}

View File

@ -17,8 +17,6 @@ import (
"errors"
"testing"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/models"
)
@ -141,11 +139,7 @@ func TestGetJobLogData(t *testing.T) {
pool := &fakePool{}
c := NewController(pool)
if _, err := c.GetJobLogData("fake_ID"); err != nil {
if !errs.IsObjectNotFoundError(err) {
t.Errorf("expect object not found error but got '%s'\n", err)
}
} else {
if _, err := c.GetJobLogData("fake_ID"); err == nil {
t.Fatal("expect error but got nil")
}
}

View File

@ -29,7 +29,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
jlogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger"
"github.com/goharbor/harbor/src/jobservice/logger"
jmodel "github.com/goharbor/harbor/src/jobservice/models"
)
@ -124,11 +123,11 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
jContext.properties[k] = v
}
// Init logger here
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), dep.ID)
jContext.logger = jlogger.New(logPath, config.GetLogLevel())
if jContext.logger == nil {
return nil, errors.New("failed to initialize job logger")
// Set loggers for job
if err := setLoggers(func(lg logger.Interface) {
jContext.logger = lg
}, dep.ID); err != nil {
return nil, err
}
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
@ -227,3 +226,44 @@ func getDBFromConfig(cfg map[string]interface{}) *models.Database {
return database
}
// setLoggers creates the job loggers based on the global job logger
// configurations (config.DefaultConfig.JobLoggerConfigs) and hands the
// resulting composite logger to the job executing context via the
// provided setter callback.
//
// jobID is used to derive the log file name for the FILE backend.
// An error is returned when the setter is missing or the logger
// cannot be initialized.
func setLoggers(setter func(lg logger.Interface), jobID string) error {
	if setter == nil {
		return errors.New("missing setter func")
	}

	// Build one backend option per configured job logger.
	lOptions := []logger.Option{}
	for _, lc := range config.DefaultConfig.JobLoggerConfigs {
		// Work on a per-call copy of the settings so that the shared
		// config.DefaultConfig is never mutated; the original map may be
		// read by other (possibly concurrent) job context builds.
		settings := map[string]interface{}{}
		for k, v := range lc.Settings {
			settings[k] = v
		}

		// For a running job, the logger call depth should be 5.
		if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameStdOutput {
			settings["depth"] = 5
		}

		// The FILE backend needs an extra filename parameter derived from the job ID.
		if lc.Name == logger.LoggerNameFile {
			settings["filename"] = fmt.Sprintf("%s.log", jobID)
		}

		lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, settings))
	}

	// Get logger for the job
	lg, err := logger.GetLogger(lOptions...)
	if err != nil {
		return fmt.Errorf("initialize job logger error: %s", err)
	}

	setter(lg)

	return nil
}

View File

@ -17,13 +17,10 @@ package impl
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
jlogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger"
"github.com/goharbor/harbor/src/jobservice/logger"
jmodel "github.com/goharbor/harbor/src/jobservice/models"
)
@ -72,11 +69,11 @@ func (c *DefaultContext) Build(dep env.JobData) (env.JobContext, error) {
}
}
// Init logger here
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), dep.ID)
jContext.logger = jlogger.New(logPath, config.GetLogLevel())
if jContext.logger == nil {
return nil, errors.New("failed to initialize job logger")
// Set loggers for job
if err := setLoggers(func(lg logger.Interface) {
jContext.logger = lg
}, dep.ID); err != nil {
return nil, err
}
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {

View File

@ -57,15 +57,19 @@ func TestDefaultContext(t *testing.T) {
jobData.ExtraData["checkInFunc"] = checkInFunc
jobData.ExtraData["launchJobFunc"] = launchJobFunc
oldLogConfig := config.DefaultConfig.LoggerConfig
oldLogConfig := config.DefaultConfig.JobLoggerConfigs
defer func() {
config.DefaultConfig.LoggerConfig = oldLogConfig
config.DefaultConfig.JobLoggerConfigs = oldLogConfig
}()
config.DefaultConfig.LoggerConfig = &config.LoggerConfig{
LogLevel: "debug",
ArchivePeriod: 1,
BasePath: os.TempDir(),
logSettings := map[string]interface{}{}
logSettings["base_dir"] = os.TempDir()
config.DefaultConfig.JobLoggerConfigs = []*config.LoggerConfig{
{
Level: "DEBUG",
Name: "FILE",
Settings: logSettings,
},
}
newJobContext, err := defaultContext.Build(jobData)

View File

@ -1,128 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger
import (
"os"
"strings"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/jobservice/logger"
)
// JobLogger is an implementation of logger.Interface.
// It used in the job to output logs to the logfile.
type JobLogger struct {
backendLogger *log.Logger
streamRef *os.File
}
// New logger
// nil might be returned
func New(logPath string, level string) logger.Interface {
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logger.Errorf("Failed to create job logger: %s", err)
return nil
}
logLevel := parseLevel(level)
backendLogger := log.New(f, log.NewTextFormatter(), logLevel)
return &JobLogger{
backendLogger: backendLogger,
streamRef: f,
}
}
// Close the opened io stream
// Implements logger.Closer interface
func (jl *JobLogger) Close() error {
if jl.streamRef != nil {
return jl.streamRef.Close()
}
return nil
}
// Debug ...
func (jl *JobLogger) Debug(v ...interface{}) {
jl.backendLogger.Debug(v...)
}
// Debugf with format
func (jl *JobLogger) Debugf(format string, v ...interface{}) {
jl.backendLogger.Debugf(format, v...)
}
// Info ...
func (jl *JobLogger) Info(v ...interface{}) {
jl.backendLogger.Info(v...)
}
// Infof with format
func (jl *JobLogger) Infof(format string, v ...interface{}) {
jl.backendLogger.Infof(format, v...)
}
// Warning ...
func (jl *JobLogger) Warning(v ...interface{}) {
jl.backendLogger.Warning(v...)
}
// Warningf with format
func (jl *JobLogger) Warningf(format string, v ...interface{}) {
jl.backendLogger.Warningf(format, v...)
}
// Error ...
func (jl *JobLogger) Error(v ...interface{}) {
jl.backendLogger.Error(v...)
}
// Errorf with format
func (jl *JobLogger) Errorf(format string, v ...interface{}) {
jl.backendLogger.Errorf(format, v...)
}
// Fatal error
func (jl *JobLogger) Fatal(v ...interface{}) {
jl.backendLogger.Fatal(v...)
}
// Fatalf error
func (jl *JobLogger) Fatalf(format string, v ...interface{}) {
jl.backendLogger.Fatalf(format, v...)
}
// parseLevel maps a textual log level (case-insensitive) to the
// corresponding log.Level. Unrecognized values fall back to
// log.WarningLevel.
func parseLevel(lvl string) log.Level {
	levels := map[string]log.Level{
		"debug":   log.DebugLevel,
		"info":    log.InfoLevel,
		"warning": log.WarningLevel,
		"error":   log.ErrorLevel,
		"fatal":   log.FatalLevel,
	}

	if level, ok := levels[strings.ToLower(lvl)]; ok {
		return level
	}

	// Default when the input does not match any known level name.
	return log.WarningLevel
}

View File

@ -1,88 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger
import (
"os"
"github.com/goharbor/harbor/src/common/utils/log"
)
// ServiceLogger is an implementation of logger.Interface.
// It used to log info in workerpool components.
type ServiceLogger struct {
backendLogger *log.Logger
}
// NewServiceLogger to create new logger for job service
// nil might be returned
func NewServiceLogger(level string) *ServiceLogger {
logLevel := parseLevel(level)
backendLogger := log.New(os.Stdout, log.NewTextFormatter(), logLevel)
return &ServiceLogger{
backendLogger: backendLogger,
}
}
// Debug ...
func (sl *ServiceLogger) Debug(v ...interface{}) {
sl.backendLogger.Debug(v...)
}
// Debugf with format
func (sl *ServiceLogger) Debugf(format string, v ...interface{}) {
sl.backendLogger.Debugf(format, v...)
}
// Info ...
func (sl *ServiceLogger) Info(v ...interface{}) {
sl.backendLogger.Info(v...)
}
// Infof with format
func (sl *ServiceLogger) Infof(format string, v ...interface{}) {
sl.backendLogger.Infof(format, v...)
}
// Warning ...
func (sl *ServiceLogger) Warning(v ...interface{}) {
sl.backendLogger.Warning(v...)
}
// Warningf with format
func (sl *ServiceLogger) Warningf(format string, v ...interface{}) {
sl.backendLogger.Warningf(format, v...)
}
// Error ...
func (sl *ServiceLogger) Error(v ...interface{}) {
sl.backendLogger.Error(v...)
}
// Errorf with format
func (sl *ServiceLogger) Errorf(format string, v ...interface{}) {
sl.backendLogger.Errorf(format, v...)
}
// Fatal error
func (sl *ServiceLogger) Fatal(v ...interface{}) {
sl.backendLogger.Fatal(v...)
}
// Fatalf error
func (sl *ServiceLogger) Fatalf(format string, v ...interface{}) {
sl.backendLogger.Fatalf(format, v...)
}

View File

@ -0,0 +1,90 @@
package backend
import (
"os"
"github.com/goharbor/harbor/src/common/utils/log"
)
// FileLogger is an implementation of logger.Interface.
// It outputs logs to the specified logfile.
type FileLogger struct {
	backendLogger *log.Logger
	streamRef     *os.File // kept so the underlying stream can be released via Close()
}

// NewFileLogger creates a new file logger writing to logPath at the
// given level; depth is the call depth passed through to the backend
// logger for correct caller reporting.
// nil might be returned together with a non-nil error.
func NewFileLogger(level string, logPath string, depth int) (*FileLogger, error) {
	// O_APPEND is required here: when an existing log file is re-opened
	// (e.g. the same job log path is used again), new records must be
	// appended. With O_CREATE|O_WRONLY alone, writes would start at
	// offset 0 and overwrite the old content in place, leaving a
	// corrupted mix of old and new data.
	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return nil, err
	}

	logLevel := parseLevel(level)
	backendLogger := log.New(f, log.NewTextFormatter(), logLevel, depth)

	return &FileLogger{
		backendLogger: backendLogger,
		streamRef:     f,
	}, nil
}

// Close the opened io stream
// Implements logger.Closer interface
func (fl *FileLogger) Close() error {
	if fl.streamRef != nil {
		return fl.streamRef.Close()
	}

	return nil
}
// Debug ...
func (fl *FileLogger) Debug(v ...interface{}) {
fl.backendLogger.Debug(v...)
}
// Debugf with format
func (fl *FileLogger) Debugf(format string, v ...interface{}) {
fl.backendLogger.Debugf(format, v...)
}
// Info ...
func (fl *FileLogger) Info(v ...interface{}) {
fl.backendLogger.Info(v...)
}
// Infof with format
func (fl *FileLogger) Infof(format string, v ...interface{}) {
fl.backendLogger.Infof(format, v...)
}
// Warning ...
func (fl *FileLogger) Warning(v ...interface{}) {
fl.backendLogger.Warning(v...)
}
// Warningf with format
func (fl *FileLogger) Warningf(format string, v ...interface{}) {
fl.backendLogger.Warningf(format, v...)
}
// Error ...
func (fl *FileLogger) Error(v ...interface{}) {
fl.backendLogger.Error(v...)
}
// Errorf with format
func (fl *FileLogger) Errorf(format string, v ...interface{}) {
fl.backendLogger.Errorf(format, v...)
}
// Fatal error
func (fl *FileLogger) Fatal(v ...interface{}) {
fl.backendLogger.Fatal(v...)
}
// Fatalf error
func (fl *FileLogger) Fatalf(format string, v ...interface{}) {
fl.backendLogger.Fatalf(format, v...)
}

View File

@ -0,0 +1,30 @@
package backend
import (
"os"
"path"
"testing"
)
// Test file logger creation with non existing file path
func TestFileLoggerCreation(t *testing.T) {
if _, err := NewFileLogger("DEBUG", "/non-existing/a.log", 4); err == nil {
t.Fatalf("expect non nil error but got nil when creating file logger with non existing path")
}
}
// Test file logger
func TestFileLogger(t *testing.T) {
l, err := NewFileLogger("DEBUG", path.Join(os.TempDir(), "TestFileLogger.log"), 4)
if err != nil {
t.Fatal(err)
}
l.Debug("TestFileLogger")
l.Info("TestFileLogger")
l.Warning("TestFileLogger")
l.Error("TestFileLogger")
l.Debugf("%s", "TestFileLogger")
l.Warningf("%s", "TestFileLogger")
l.Errorf("%s", "TestFileLogger")
}

View File

@ -0,0 +1,84 @@
package backend
import (
"os"
"github.com/goharbor/harbor/src/common/utils/log"
)
const (
// StdOut represents os.Stdout
StdOut = "std_out"
// StdErr represents os.StdErr
StdErr = "std_err"
)
// StdOutputLogger is an implementation of logger.Interface.
// It outputs the log to the stdout/stderr.
type StdOutputLogger struct {
	backendLogger *log.Logger
}

// NewStdOutputLogger creates a new std output logger.
// The output argument selects the stream: StdErr routes to os.Stderr,
// anything else (including StdOut) routes to os.Stdout.
func NewStdOutputLogger(level string, output string, depth int) *StdOutputLogger {
	stream := os.Stdout
	if output == StdErr {
		stream = os.Stderr
	}

	return &StdOutputLogger{
		backendLogger: log.New(stream, log.NewTextFormatter(), parseLevel(level), depth),
	}
}
// Debug ...
func (sl *StdOutputLogger) Debug(v ...interface{}) {
sl.backendLogger.Debug(v...)
}
// Debugf with format
func (sl *StdOutputLogger) Debugf(format string, v ...interface{}) {
sl.backendLogger.Debugf(format, v...)
}
// Info ...
func (sl *StdOutputLogger) Info(v ...interface{}) {
sl.backendLogger.Info(v...)
}
// Infof with format
func (sl *StdOutputLogger) Infof(format string, v ...interface{}) {
sl.backendLogger.Infof(format, v...)
}
// Warning ...
func (sl *StdOutputLogger) Warning(v ...interface{}) {
sl.backendLogger.Warning(v...)
}
// Warningf with format
func (sl *StdOutputLogger) Warningf(format string, v ...interface{}) {
sl.backendLogger.Warningf(format, v...)
}
// Error ...
func (sl *StdOutputLogger) Error(v ...interface{}) {
sl.backendLogger.Error(v...)
}
// Errorf with format
func (sl *StdOutputLogger) Errorf(format string, v ...interface{}) {
sl.backendLogger.Errorf(format, v...)
}
// Fatal error
func (sl *StdOutputLogger) Fatal(v ...interface{}) {
sl.backendLogger.Fatal(v...)
}
// Fatalf error
func (sl *StdOutputLogger) Fatalf(format string, v ...interface{}) {
sl.backendLogger.Fatalf(format, v...)
}

View File

@ -0,0 +1,16 @@
package backend
import "testing"
// Test std logger
func TestStdLogger(t *testing.T) {
l := NewStdOutputLogger("DEBUG", StdErr, 4)
l.Debug("TestStdLogger")
l.Debugf("%s", "TestStdLogger")
l.Info("TestStdLogger")
l.Infof("%s", "TestStdLogger")
l.Warning("TestStdLogger")
l.Warningf("%s", "TestStdLogger")
l.Error("TestStdLogger")
l.Errorf("%s", "TestStdLogger")
}

View File

@ -0,0 +1,28 @@
package backend
import (
"strings"
"github.com/goharbor/harbor/src/common/utils/log"
)
// parseLevel resolves a level name (any letter case) into the matching
// log.Level; unknown or empty input yields log.WarningLevel.
func parseLevel(lvl string) log.Level {
	known := map[string]log.Level{
		"debug":   log.DebugLevel,
		"info":    log.InfoLevel,
		"warning": log.WarningLevel,
		"error":   log.ErrorLevel,
		"fatal":   log.FatalLevel,
	}

	if level, ok := known[strings.ToLower(lvl)]; ok {
		return level
	}

	// Conservative default for anything unrecognized
	return log.WarningLevel
}

View File

@ -0,0 +1,29 @@
package backend
import (
"testing"
"github.com/goharbor/harbor/src/common/utils/log"
)
// Test parseLevel
func TestParseLevel(t *testing.T) {
if l := parseLevel(""); l != log.WarningLevel {
t.Errorf("expect level %d but got %d", log.WarningLevel, l)
}
if l := parseLevel("DEBUG"); l != log.DebugLevel {
t.Errorf("expect level %d but got %d", log.DebugLevel, l)
}
if l := parseLevel("info"); l != log.InfoLevel {
t.Errorf("expect level %d but got %d", log.InfoLevel, l)
}
if l := parseLevel("warning"); l != log.WarningLevel {
t.Errorf("expect level %d but got %d", log.WarningLevel, l)
}
if l := parseLevel("error"); l != log.ErrorLevel {
t.Errorf("expect level %d but got %d", log.ErrorLevel, l)
}
if l := parseLevel("FATAL"); l != log.FatalLevel {
t.Errorf("expect level %d but got %d", log.FatalLevel, l)
}
}

View File

@ -0,0 +1,239 @@
package logger
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/logger/getter"
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
)
const (
systemKeyServiceLogger = "system.jobServiceLogger"
systemKeyLogDataGetter = "system.logDataGetter"
)
var singletons sync.Map
// GetLogger gets an unified logger entry for logging per the passed settings.
// The logger may be built based on the multiple registered logger backends.
//
//	loggerOptions ...Option : logger options
//
// If failed, a nil logger and a non-nil error will be returned.
// Otherwise, a non nil logger is returned with nil error.
func GetLogger(loggerOptions ...Option) (Interface, error) {
	lOptions := &options{
		values: make(map[string][]OptionItem),
	}

	// Apply the provided options
	for _, op := range loggerOptions {
		op.Apply(lOptions)
	}

	// No options specified, enable std as default
	if len(loggerOptions) == 0 {
		defaultOp := BackendOption(LoggerNameStdOutput, "", nil)
		defaultOp.Apply(lOptions)
	}

	// Create backends
	loggers := []Interface{}
	for name, ops := range lOptions.values {
		if !IsKnownLogger(name) {
			return nil, fmt.Errorf("no logger registered for name '%s'", name)
		}

		d := KnownLoggers(name)
		var (
			l  Interface
			ok bool
		)

		// Reuse the cached instance for singleton loggers
		if d.Singleton {
			var li interface{}
			li, ok = singletons.Load(name)
			if ok {
				l = li.(Interface)
			}
		}

		if !ok {
			var err error
			l, err = d.Logger(ops...)
			if err != nil {
				return nil, err
			}

			// Only cache singleton loggers. Caching a non-singleton one
			// (e.g. the per-job FILE logger) would keep overwriting the
			// cache entry with instances that are never looked up again.
			if d.Singleton {
				singletons.Store(name, l)
			}
		}

		// Append to the logger list as logger entry backends
		loggers = append(loggers, l)
	}

	return NewEntry(loggers), nil
}
// GetSweeper gets an unified sweeper controller for sweeping purpose.
//
//	context context.Context : system context used to control the sweeping loops
//	sweeperOptions ...Option : sweeper options
//
// If failed, a nil sweeper and a non-nil error will be returned.
// Otherwise, a non nil sweeper is returned with nil error.
func GetSweeper(context context.Context, sweeperOptions ...Option) (sweeper.Interface, error) {
	// No default sweeper will be provided.
	// If no one is configured, directly return nil with error
	if len(sweeperOptions) == 0 {
		return nil, errors.New("no options provided for creating sweeper controller")
	}

	sOptions := &options{
		values: make(map[string][]OptionItem),
	}

	// Apply the sweeper options
	for _, op := range sweeperOptions {
		op.Apply(sOptions)
	}

	sweepers := []sweeper.Interface{}
	for name, ops := range sOptions.values {
		// Only loggers whose declaration provides a sweeper factory can be swept
		if !HasSweeper(name) {
			return nil, fmt.Errorf("no sweeper provided for the logger %s", name)
		}

		d := KnownLoggers(name)
		s, err := d.Sweeper(ops...)
		if err != nil {
			return nil, err
		}

		sweepers = append(sweepers, s)
	}

	// Wrap all configured sweepers into one controller
	return NewSweeperController(context, sweepers), nil
}
// GetLogDataGetter return the 1st matched log data getter interface
//
//	loggerOptions ...Option : logger options
//
// If failed,
//
//	configured but initialize failed: a nil log data getter and a non-nil error will be returned.
//	no getter configured: a nil log data getter with a nil error are returned
//
// Otherwise, a non nil log data getter is returned with nil error.
func GetLogDataGetter(loggerOptions ...Option) (getter.Interface, error) {
	if len(loggerOptions) == 0 {
		// If no options, directly return nil interface with error
		return nil, errors.New("no options provided to create log data getter")
	}

	lOptions := &options{
		values: make(map[string][]OptionItem),
	}

	// Apply the provided options
	for _, op := range loggerOptions {
		op.Apply(lOptions)
	}

	// Iterate with specified order so the "1st match" is deterministic
	keys := []string{}
	for k := range lOptions.values {
		keys = append(keys, k)
	}

	// Sort
	sort.Strings(keys)

	for _, k := range keys {
		if HasGetter(k) {
			// 1st match. Use the KnownLoggers accessor for consistency with
			// GetLogger/GetSweeper instead of reading the registry map directly.
			d := KnownLoggers(k)
			theGetter, err := d.Getter(lOptions.values[k]...)
			if err != nil {
				return nil, err
			}

			return theGetter, nil
		}
	}

	// No getter configured for any of the requested loggers
	return nil, nil
}
// Init builds and registers the loggers and sweepers declared in the
// service configuration: the job-service logger entry, the (optional)
// log data getter for job logs, and the (optional) sweeper controller.
// It must be called once at service startup with a context that
// controls the sweeper loops.
func Init(ctx context.Context) error {
	// For loggers
	options := []Option{}
	// For sweepers
	sOptions := []Option{}

	for _, lc := range config.DefaultConfig.LoggerConfigs {
		// Inject logger depth here for FILE and STD logger to avoid configuring it in the yaml
		// For logger of job service itself, the depth should be 6
		// NOTE(review): 6 presumably matches the call-chain length through the
		// package-level wrapper funcs (Debug/Info/... in logger.go) — confirm
		// if that chain ever changes.
		if lc.Name == LoggerNameFile || lc.Name == LoggerNameStdOutput {
			if lc.Settings == nil {
				lc.Settings = map[string]interface{}{}
			}
			lc.Settings["depth"] = 6
		}
		options = append(options, BackendOption(lc.Name, lc.Level, lc.Settings))
		if lc.Sweeper != nil {
			sOptions = append(sOptions, SweeperOption(lc.Name, lc.Sweeper.Duration, lc.Sweeper.Settings))
		}
	}

	// Get loggers for job service
	lg, err := GetLogger(options...)
	if err != nil {
		return err
	}

	// Avoid data race issue
	singletons.Store(systemKeyServiceLogger, lg)

	jOptions := []Option{}
	// Append configured sweepers in job loggers if existing
	for _, lc := range config.DefaultConfig.JobLoggerConfigs {
		jOptions = append(jOptions, BackendOption(lc.Name, lc.Level, lc.Settings))
		if lc.Sweeper != nil {
			sOptions = append(sOptions, SweeperOption(lc.Name, lc.Sweeper.Duration, lc.Sweeper.Settings))
		}
	}

	// Get log data getter with the same options of job loggers
	g, err := GetLogDataGetter(jOptions...)
	if err != nil {
		return err
	}
	if g != nil {
		// Avoid data race issue
		singletons.Store(systemKeyLogDataGetter, g)
	}

	// If sweepers configured
	if len(sOptions) > 0 {
		// Get the sweeper controller
		sweeper, err := GetSweeper(ctx, sOptions...)
		if err != nil {
			return fmt.Errorf("create logger sweeper error: %s", err)
		}
		// Start sweep loop
		_, err = sweeper.Sweep()
		if err != nil {
			return fmt.Errorf("start logger sweeper error: %s", err)
		}
	}

	return nil
}

View File

@ -0,0 +1,216 @@
package logger
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"testing"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/logger/backend"
)
// Test one single std logger
func TestGetLoggerSingleStd(t *testing.T) {
l, err := GetLogger(BackendOption("STD_OUTPUT", "DEBUG", nil))
if err != nil {
t.Fatal(err)
}
l.Debugf("Verify logger testing: %s", "case_1")
lSettings := map[string]interface{}{}
lSettings["output"] = backend.StdErr
l, err = GetLogger(BackendOption("STD_OUTPUT", "ERROR", lSettings))
if err != nil {
t.Fatal(err)
}
l.Errorf("Verify logger testing: %s", "case_2")
// With empty options
l, err = GetLogger()
if err != nil {
t.Fatal(err)
}
l.Warningf("Verify logger testing: %s", "case_3")
}
// Test one single file logger
func TestGetLoggerSingleFile(t *testing.T) {
_, err := GetLogger(BackendOption("FILE", "DEBUG", nil))
if err == nil {
t.Fatalf("expect non nil error when creating file logger with empty settings but got nil error: %s", "case_4")
}
lSettings := map[string]interface{}{}
lSettings["base_dir"] = os.TempDir()
lSettings["filename"] = fmt.Sprintf("%s.log", "fake_job_ID")
defer func() {
if err := os.Remove(path.Join(os.TempDir(), lSettings["filename"].(string))); err != nil {
t.Error(err)
}
}()
l, err := GetLogger(BackendOption("FILE", "DEBUG", lSettings))
if err != nil {
t.Fatal(err)
}
l.Debugf("Verify logger testing: %s", "case_5")
}
// Test getting multi loggers
func TestGetLoggersMulti(t *testing.T) {
lSettings := map[string]interface{}{}
lSettings["base_dir"] = os.TempDir()
lSettings["filename"] = fmt.Sprintf("%s.log", "fake_job_ID2")
defer func() {
if err := os.Remove(path.Join(os.TempDir(), lSettings["filename"].(string))); err != nil {
t.Error(err)
}
}()
ops := []Option{}
ops = append(
ops,
BackendOption("STD_OUTPUT", "DEBUG", nil),
BackendOption("FILE", "DEBUG", lSettings),
)
l, err := GetLogger(ops...)
if err != nil {
t.Fatal(err)
}
l.Infof("Verify logger testing: %s", "case_6")
}
// Test getting sweepers
func TestGetSweeper(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := GetSweeper(ctx)
if err == nil {
t.Fatalf("expect non nil error but got nil error when getting sweeper with empty settings: %s", "case_7")
}
_, err = GetSweeper(ctx, SweeperOption("STD_OUTPUT", 1, nil))
if err == nil {
t.Fatalf("expect non nil error but got nil error when getting sweeper with name 'STD_OUTPUT': %s", "case_8")
}
sSettings := map[string]interface{}{}
sSettings["work_dir"] = os.TempDir()
s, err := GetSweeper(ctx, SweeperOption("FILE", 5, sSettings))
if err != nil {
t.Fatal(err)
}
_, err = s.Sweep()
if err != nil {
t.Fatalf("[%s] start sweeper error: %s", "case_9", err)
}
}
// Test getting getters
func TestGetGetter(t *testing.T) {
_, err := GetLogDataGetter()
if err == nil {
t.Fatalf("error should be returned if no options provided: %s", "case_10")
}
// no configured
g, err := GetLogDataGetter(GetterOption("STD_OUTPUT", nil))
if err != nil || g != nil {
t.Fatalf("nil interface with nil error should be returned if no log data getter configured: %s", "case_11")
}
lSettings := map[string]interface{}{}
_, err = GetLogDataGetter(GetterOption("FILE", lSettings))
if err == nil {
t.Fatalf("expect non nil error but got nil one: %s", "case_12")
}
lSettings["base_dir"] = os.TempDir()
g, err = GetLogDataGetter(GetterOption("FILE", lSettings))
if err != nil {
t.Fatal(err)
}
logFile := path.Join(os.TempDir(), "fake_log_file.log")
if err := ioutil.WriteFile(logFile, []byte("hello log getter"), 0644); err != nil {
t.Fatal(err)
}
defer func() {
if err := os.Remove(logFile); err != nil {
t.Error(err)
}
}()
data, err := g.Retrieve("fake_log_file")
if err != nil {
t.Error(err)
}
if len(data) != 16 {
t.Errorf("expect 16 bytes data but got %d bytes", len(data))
}
}
// Test init
func TestLoggerInit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldJobLoggerCfg := config.DefaultConfig.JobLoggerConfigs
oldLoggerCfg := config.DefaultConfig.LoggerConfigs
defer func() {
config.DefaultConfig.JobLoggerConfigs = oldJobLoggerCfg
config.DefaultConfig.LoggerConfigs = oldLoggerCfg
}()
config.DefaultConfig.JobLoggerConfigs = []*config.LoggerConfig{
{
Name: "STD_OUTPUT",
Level: "DEBUG",
Settings: map[string]interface{}{
"output": backend.StdErr,
},
},
{
Name: "FILE",
Level: "ERROR",
Settings: map[string]interface{}{
"base_dir": os.TempDir(),
},
Sweeper: &config.LogSweeperConfig{
Duration: 5,
Settings: map[string]interface{}{
"work_dir": os.TempDir(),
},
},
},
}
config.DefaultConfig.LoggerConfigs = []*config.LoggerConfig{
{
Name: "STD_OUTPUT",
Level: "DEBUG",
},
}
if err := Init(ctx); err != nil {
t.Fatal(err)
}
Debug("Verify logger init: case_13")
Info("Verify logger init: case_13")
Infof("Verify logger init: %s", "case_13")
Error("Verify logger init: case_13")
Errorf("Verify logger init: %s", "case_13")
}

View File

@ -0,0 +1,84 @@
package logger
// Entry provides unique interfaces on top of multiple logger backends.
// Entry also implements @Interface.
// Every call is fanned out, in declaration order, to each wrapped logger.
type Entry struct {
	loggers []Interface
}

// NewEntry creates a new logger Entry wrapping the given backends.
func NewEntry(loggers []Interface) *Entry {
	return &Entry{
		loggers: loggers,
	}
}

// Debug ...
func (e *Entry) Debug(v ...interface{}) {
	for _, l := range e.loggers {
		l.Debug(v...)
	}
}

// Debugf with format
func (e *Entry) Debugf(format string, v ...interface{}) {
	for _, l := range e.loggers {
		l.Debugf(format, v...)
	}
}

// Info ...
func (e *Entry) Info(v ...interface{}) {
	for _, l := range e.loggers {
		l.Info(v...)
	}
}

// Infof with format
func (e *Entry) Infof(format string, v ...interface{}) {
	for _, l := range e.loggers {
		l.Infof(format, v...)
	}
}

// Warning ...
func (e *Entry) Warning(v ...interface{}) {
	for _, l := range e.loggers {
		l.Warning(v...)
	}
}

// Warningf with format
func (e *Entry) Warningf(format string, v ...interface{}) {
	for _, l := range e.loggers {
		l.Warningf(format, v...)
	}
}

// Error ...
func (e *Entry) Error(v ...interface{}) {
	for _, l := range e.loggers {
		l.Error(v...)
	}
}

// Errorf with format
func (e *Entry) Errorf(format string, v ...interface{}) {
	for _, l := range e.loggers {
		l.Errorf(format, v...)
	}
}

// Fatal error
// NOTE(review): if a backend's Fatal exits the process (as stdlib
// log.Fatal does), backends after the first one will never be invoked —
// confirm the intended fan-out semantics.
func (e *Entry) Fatal(v ...interface{}) {
	for _, l := range e.loggers {
		l.Fatal(v...)
	}
}

// Fatalf error — same caveat as Fatal above.
func (e *Entry) Fatalf(format string, v ...interface{}) {
	for _, l := range e.loggers {
		l.Fatalf(format, v...)
	}
}

View File

@ -0,0 +1,64 @@
package logger
import (
"errors"
"path"
"github.com/goharbor/harbor/src/jobservice/logger/backend"
)
// Factory creates a new logger based on the settings.
type Factory func(options ...OptionItem) (Interface, error)

// FileFactory is factory of file logger.
// It requires the "base_dir" and "filename" options; "level" and
// "depth" are optional and fall back to the backend defaults.
func FileFactory(options ...OptionItem) (Interface, error) {
	var (
		level    string
		baseDir  string
		fileName string
		depth    int
	)

	// Pick the supported options out of the provided list;
	// unknown fields are silently ignored.
	for _, item := range options {
		switch item.Field() {
		case "level":
			level = item.String()
		case "base_dir":
			baseDir = item.String()
		case "filename":
			fileName = item.String()
		case "depth":
			depth = item.Int()
		default:
		}
	}

	if baseDir == "" {
		return nil, errors.New("missing base dir option of the file logger")
	}
	if fileName == "" {
		return nil, errors.New("missing file name option of the file logger")
	}

	return backend.NewFileLogger(level, path.Join(baseDir, fileName), depth)
}
// StdFactory is factory of std output logger.
// All options are optional; missing ones fall back to the backend defaults.
func StdFactory(options ...OptionItem) (Interface, error) {
	var (
		level  string
		output string
		depth  int
	)

	// Extract the supported options; unknown fields are silently ignored.
	for _, item := range options {
		switch item.Field() {
		case "level":
			level = item.String()
		case "output":
			output = item.String()
		case "depth":
			depth = item.Int()
		default:
		}
	}

	return backend.NewStdOutputLogger(level, output, depth), nil
}

View File

@ -0,0 +1,12 @@
package getter
// Interface defines operations of a log data getter.
type Interface interface {
	// Retrieve the log data of the specified log entry.
	//
	//	logID string : the id of the log entry. e.g: file name a.log for file log
	//
	// If succeed, log data bytes will be returned;
	// otherwise, a non nil error is returned.
	Retrieve(logID string) ([]byte, error)
}

View File

@ -0,0 +1,37 @@
package getter
import (
"errors"
"fmt"
"io/ioutil"
"path"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/utils"
)
// FileGetter is responsible for retrieving file log data
type FileGetter struct {
	baseDir string
}

// NewFileGetter is constructor of FileGetter
func NewFileGetter(baseDir string) *FileGetter {
	return &FileGetter{baseDir: baseDir}
}

// Retrieve implements @Interface.Retrieve
// The log id is mapped to "<baseDir>/<logID>.log".
func (fg *FileGetter) Retrieve(logID string) ([]byte, error) {
	if logID == "" {
		return nil, errors.New("empty log identify")
	}

	logFile := path.Join(fg.baseDir, fmt.Sprintf("%s.log", logID))
	if !utils.FileExists(logFile) {
		return nil, errs.NoObjectFoundError(logID)
	}

	return ioutil.ReadFile(logFile)
}

View File

@ -0,0 +1,41 @@
package getter
import (
"io/ioutil"
"os"
"path"
"testing"
"github.com/goharbor/harbor/src/jobservice/errs"
)
// Test the log data getter
func TestLogDataGetter(t *testing.T) {
fakeLog := path.Join(os.TempDir(), "TestLogDataGetter.log")
if err := ioutil.WriteFile(fakeLog, []byte("hello"), 0644); err != nil {
t.Fatal(err)
}
defer func() {
if err := os.Remove(fakeLog); err != nil {
t.Error(err)
}
}()
fg := NewFileGetter(os.TempDir())
if _, err := fg.Retrieve("not-existing"); err != nil {
if !errs.IsObjectNotFoundError(err) {
t.Error("expect object not found error but got other error")
}
} else {
t.Error("expect non nil error but got nil")
}
data, err := fg.Retrieve("TestLogDataGetter")
if err != nil {
t.Error(err)
}
if len(data) != 5 {
t.Errorf("expect reading 5 bytes but got %d bytes", len(data))
}
}

View File

@ -0,0 +1,27 @@
package logger
import (
"errors"
"github.com/goharbor/harbor/src/jobservice/logger/getter"
)
// GetterFactory is responsible for creating a log data getter based on the options
type GetterFactory func(options ...OptionItem) (getter.Interface, error)

// FileGetterFactory creates a getter for the "FILE" logger.
// The "base_dir" option is mandatory.
func FileGetterFactory(options ...OptionItem) (getter.Interface, error) {
	baseDir := ""
	for _, item := range options {
		if item.Field() == "base_dir" {
			baseDir = item.String()
			break
		}
	}

	if baseDir == "" {
		return nil, errors.New("missing required option 'base_dir'")
	}

	return getter.NewFileGetter(baseDir), nil
}

View File

@ -0,0 +1,81 @@
package logger
import "strings"
const (
	// LoggerNameFile is the unique name of the file logger.
	LoggerNameFile = "FILE"
	// LoggerNameStdOutput is the unique name of the std logger.
	LoggerNameStdOutput = "STD_OUTPUT"
)

// Declaration is used to declare a supported logger.
// Use this declaration to indicate what logger and sweeper will be provided.
type Declaration struct {
	Logger  Factory        // creates the logger backend
	Sweeper SweeperFactory // optional; nil when the logger has no sweeper
	Getter  GetterFactory  // optional; nil when the logger has no data getter
	// Indicate if the logger is a singleton logger
	Singleton bool
}

// knownLoggers is a static logger registry.
// All the implemented loggers (w/ sweeper) should be registered
// with an unique name in this registry. Then they can be used to
// log info.
var knownLoggers = map[string]*Declaration{
	// File logger: one instance per log file, so not a singleton
	LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
	// STD output(both stdout and stderr) logger: shared process-wide, so a singleton
	LoggerNameStdOutput: {StdFactory, nil, nil, true},
}

// IsKnownLogger checks if the logger is supported with name.
func IsKnownLogger(name string) bool {
	_, ok := knownLoggers[name]
	return ok
}

// HasSweeper checks if the logger with the name provides a sweeper.
func HasSweeper(name string) bool {
	d, ok := knownLoggers[name]
	return ok && d.Sweeper != nil
}

// HasGetter checks if the logger with the name provides a log data getter.
func HasGetter(name string) bool {
	d, ok := knownLoggers[name]
	return ok && d.Getter != nil
}

// KnownLoggers return the declaration by the name.
// nil is returned for unknown names.
func KnownLoggers(name string) *Declaration {
	return knownLoggers[name]
}
// All known levels which are supported.
var debugLevels = []string{
	"DEBUG",
	"INFO",
	"WARNING",
	"ERROR",
	"FATAL",
}

// IsKnownLevel is used to check if the logger level is supported.
// The check is case-insensitive; an empty string is never a valid level.
func IsKnownLevel(level string) bool {
	if level == "" {
		return false
	}

	normalized := strings.ToUpper(level)
	for _, known := range debugLevels {
		if known == normalized {
			return true
		}
	}

	return false
}

View File

@ -0,0 +1,23 @@
package logger
import (
"errors"
"github.com/goharbor/harbor/src/jobservice/logger/getter"
)
// Retrieve is wrapper func for getter.Retrieve.
// It uses the log data getter stored in the singletons map during Init;
// an error is returned when no getter has been configured.
func Retrieve(logID string) ([]byte, error) {
	val, ok := singletons.Load(systemKeyLogDataGetter)
	if !ok {
		return nil, errors.New("no log data getter is configured")
	}

	return val.(getter.Interface).Retrieve(logID)
}

// HasLogGetterConfigured checks if a log data getter is there for using.
func HasLogGetterConfigured() bool {
	_, ok := singletons.Load(systemKeyLogDataGetter)
	return ok
}

View File

@ -0,0 +1,99 @@
package logger
// options keep settings for loggers/sweepers.
// Indexed by the logger unique name.
type options struct {
	values map[string][]OptionItem
}

// Option represents settings of the logger.
type Option struct {
	// Apply logger option
	Apply func(op *options)
}

// BackendOption creates option for the specified backend.
// The level plus any extra settings are flattened into OptionItems
// keyed by the backend name.
func BackendOption(name string, level string, settings map[string]interface{}) Option {
	return Option{func(op *options) {
		vals := make([]OptionItem, 0)
		vals = append(vals, OptionItem{"level", level})
		// Append extra settings if existing
		if len(settings) > 0 {
			for k, v := range settings {
				vals = append(vals, OptionItem{k, v})
			}
		}
		// Overrides any previous values registered under the same name
		op.values[name] = vals
	}}
}

// SweeperOption creates option for the sweeper.
func SweeperOption(name string, duration int, settings map[string]interface{}) Option {
	return Option{func(op *options) {
		vals := make([]OptionItem, 0)
		vals = append(vals, OptionItem{"duration", duration})
		// Append settings if existing
		if len(settings) > 0 {
			for k, v := range settings {
				vals = append(vals, OptionItem{k, v})
			}
		}
		// Overrides any previous values registered under the same name
		op.values[name] = vals
	}}
}

// GetterOption creates option for the getter.
func GetterOption(name string, settings map[string]interface{}) Option {
	return Option{func(op *options) {
		vals := make([]OptionItem, 0)
		// Append settings if existing
		if len(settings) > 0 {
			for k, v := range settings {
				vals = append(vals, OptionItem{k, v})
			}
		}
		// Overrides any previous values registered under the same name
		op.values[name] = vals
	}}
}
// OptionItem is a simple wrapper of property and value.
type OptionItem struct {
	field string
	val   interface{}
}

// Field returns name of the option.
func (o *OptionItem) Field() string {
	return o.field
}

// Int returns the integer value of option.
// Values carried as other numeric types (e.g. float64, which yaml/json
// decoders commonly produce for numbers) are converted; a missing or
// non-numeric value yields 0 instead of panicking on a failed type
// assertion.
func (o *OptionItem) Int() int {
	if o.val == nil {
		return 0
	}

	switch v := o.val.(type) {
	case int:
		return v
	case int32:
		return int(v)
	case int64:
		return int(v)
	case float64:
		return int(v)
	default:
		return 0
	}
}

// String returns the string value of option.
// A missing or non-string value yields "" instead of panicking.
func (o *OptionItem) String() string {
	if s, ok := o.val.(string); ok {
		return s
	}

	return ""
}

// Raw returns the raw value.
func (o *OptionItem) Raw() interface{} {
	return o.val
}

View File

@ -18,110 +18,102 @@ import (
"log"
)
// sLogger is used to log for workerpool itself
var sLogger Interface
// jobServiceLogger is used to log for job service itself
func jobServiceLogger() (Interface, bool) {
val, ok := singletons.Load(systemKeyServiceLogger)
if ok {
return val.(Interface), ok
}
// SetLogger sets the logger implementation
func SetLogger(logger Interface) {
sLogger = logger
return nil, false
}
// Debug ...
func Debug(v ...interface{}) {
if sLogger != nil {
sLogger.Debug(v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Debug(v...)
} else {
log.Println(v...)
}
log.Println(v...)
}
// Debugf for debuging with format
func Debugf(format string, v ...interface{}) {
if sLogger != nil {
sLogger.Debugf(format, v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Debugf(format, v...)
} else {
log.Printf(format, v...)
}
log.Printf(format, v...)
}
// Info ...
func Info(v ...interface{}) {
if sLogger != nil {
sLogger.Info(v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Info(v...)
} else {
log.Println(v...)
}
log.Println(v...)
}
// Infof for logging info with format
func Infof(format string, v ...interface{}) {
if sLogger != nil {
sLogger.Infof(format, v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Infof(format, v...)
} else {
log.Printf(format, v...)
}
log.Printf(format, v...)
}
// Warning ...
func Warning(v ...interface{}) {
if sLogger != nil {
sLogger.Warning(v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Warning(v...)
} else {
log.Println(v...)
}
log.Println(v...)
}
// Warningf for warning with format
func Warningf(format string, v ...interface{}) {
if sLogger != nil {
sLogger.Warningf(format, v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Warningf(format, v...)
} else {
log.Printf(format, v...)
}
log.Printf(format, v...)
}
// Error for logging error
func Error(v ...interface{}) {
if sLogger != nil {
sLogger.Error(v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Error(v...)
} else {
log.Println(v...)
}
log.Println(v...)
}
// Errorf for logging error with format
func Errorf(format string, v ...interface{}) {
if sLogger != nil {
sLogger.Errorf(format, v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Errorf(format, v...)
} else {
log.Printf(format, v...)
}
log.Printf(format, v...)
}
// Fatal ...
func Fatal(v ...interface{}) {
if sLogger != nil {
sLogger.Fatal(v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Fatal(v...)
} else {
log.Fatalln(v...)
}
log.Fatal(v...)
}
// Fatalf for fatal error with error
func Fatalf(format string, v ...interface{}) {
if sLogger != nil {
sLogger.Fatalf(format, v...)
return
if jLogger, ok := jobServiceLogger(); ok {
jLogger.Fatalf(format, v...)
} else {
log.Fatalf(format, v...)
}
log.Fatalf(format, v...)
}

View File

@ -1,87 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger
import (
"fmt"
"testing"
)
func TestServiceLogger(t *testing.T) {
testingLogger := &fakeLogger{}
SetLogger(testingLogger)
Debug("DEBUG")
Debugf("%s\n", "DEBUGF")
Info("INFO")
Infof("%s\n", "INFOF")
Warning("WARNING")
Warningf("%s\n", "WARNINGF")
Error("ERROR")
Errorf("%s\n", "ERRORF")
Fatal("FATAL")
Fatalf("%s\n", "FATALF")
}
// fakeLogger is a stateless test double for the logger interface; every
// method simply echoes its arguments to stdout via the fmt package.
type fakeLogger struct{}

// Debug prints the debugging message.
func (fl *fakeLogger) Debug(v ...interface{}) {
	fmt.Println(v...)
}

// Debugf prints the debugging message with the given format.
func (fl *fakeLogger) Debugf(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}

// Info prints the info message.
func (fl *fakeLogger) Info(v ...interface{}) {
	fmt.Println(v...)
}

// Infof prints the info message with the given format.
func (fl *fakeLogger) Infof(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}

// Warning prints the warning message.
func (fl *fakeLogger) Warning(v ...interface{}) {
	fmt.Println(v...)
}

// Warningf prints the warning message with the given format.
func (fl *fakeLogger) Warningf(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}

// Error prints the error message.
func (fl *fakeLogger) Error(v ...interface{}) {
	fmt.Println(v...)
}

// Errorf prints the error message with the given format.
func (fl *fakeLogger) Errorf(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}

// Fatal prints the fatal message; unlike the standard library it does NOT
// exit the process, which keeps tests alive.
func (fl *fakeLogger) Fatal(v ...interface{}) {
	fmt.Println(v...)
}

// Fatalf prints the fatal message with the given format; does not exit.
func (fl *fakeLogger) Fatalf(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}

View File

@ -1,103 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger
import (
"context"
"fmt"
"io/ioutil"
"os"
"time"
)
const (
oneDay = 3600 * 24
)
// Sweeper takes charge of archiving the outdated log files of jobs: it
// periodically removes files under workDir whose modification time is
// older than the configured period (in days).
type Sweeper struct {
	context context.Context // cancelling this context stops the sweeping loop
	workDir string          // directory holding the job log files
	period  uint            // retention period in days; 0 is normalized to 1
}

// NewSweeper creates a new pointer of Sweeper.
func NewSweeper(ctx context.Context, workDir string, period uint) *Sweeper {
	return &Sweeper{ctx, workDir, period}
}

// Start launches the background sweeping loop and returns immediately.
func (s *Sweeper) Start() {
	go s.loop()
	Info("Logger sweeper is started")
}

// loop performs one immediate sweep and then one sweep per period until
// the context is cancelled.
func (s *Sweeper) loop() {
	// Apply default if needed before starting
	if s.period == 0 {
		s.period = 1
	}

	defer func() {
		Info("Logger sweeper is stopped")
	}()

	// First run
	go s.clear()

	// NOTE(review): the tick interval is `period` days plus 5 seconds of
	// slack — presumably so files aged exactly `period` days are already
	// past the cutoff when the tick fires; confirm the intent of the "+5".
	ticker := time.NewTicker(time.Duration(s.period*oneDay+5) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-s.context.Done():
			return
		case <-ticker.C:
			go s.clear()
		}
	}
}

// clear removes the log files under workDir that are older than the
// retention period and logs how many were removed. Individual removal
// failures are logged as warnings and do not abort the scan.
func (s *Sweeper) clear() {
	var (
		cleared uint
		count   = &cleared // captured by the deferred summary log below
	)

	Info("Start to clear the job outdated log files")
	defer func() {
		Infof("%d job outdated log files cleared", *count)
	}()

	logFiles, err := ioutil.ReadDir(s.workDir)
	if err != nil {
		Errorf("Failed to get the outdated log files under '%s' with error: %s\n", s.workDir, err)
		return
	}

	if len(logFiles) == 0 {
		return
	}

	for _, logFile := range logFiles {
		// Outdated when modTime + period is already in the past.
		if logFile.ModTime().Add(time.Duration(s.period*oneDay) * time.Second).Before(time.Now()) {
			logFilePath := fmt.Sprintf("%s%s%s", s.workDir, string(os.PathSeparator), logFile.Name())
			if err := os.Remove(logFilePath); err == nil {
				cleared++
			} else {
				Warningf("Failed to remove log file '%s'\n", logFilePath)
			}
		}
	}
}

View File

@ -0,0 +1,70 @@
package sweeper
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"time"
)
const (
oneDay = 24 * time.Hour
)
// FileSweeper is used to sweep the file logs kept in a single directory.
type FileSweeper struct {
	duration int    // retention period in days before a log file is removed
	workDir  string // directory in which the job log files are stored
}
// NewFileSweeper is the constructor of FileSweeper.
// workDir is the directory holding the log files; duration is the
// retention period in days.
func NewFileSweeper(workDir string, duration int) *FileSweeper {
	fs := &FileSweeper{}
	fs.workDir = workDir
	fs.duration = duration

	return fs
}
// Sweep removes the outdated log files under the work directory.
//
// A file is outdated when its modification time is more than `duration`
// days in the past. Removal failures are collected and returned as one
// joined error; the count of successfully removed files is returned in
// both the success and the failure case.
func (fs *FileSweeper) Sweep() (int, error) {
	cleared := 0

	logFiles, err := ioutil.ReadDir(fs.workDir)
	if err != nil {
		return 0, fmt.Errorf("getting outdated log files under '%s' failed with error: %s", fs.workDir, err)
	}

	// Nothing to sweep
	if len(logFiles) == 0 {
		return 0, nil
	}

	// Evaluate every file against the same cutoff instant; re-reading
	// time.Now() inside the loop would judge files in one sweep against a
	// shifting notion of "now".
	cutoff := time.Now().Add(-time.Duration(fs.duration) * oneDay)

	// Start to sweep log files
	// Record all errors
	errs := make([]string, 0)
	for _, logFile := range logFiles {
		if logFile.ModTime().Before(cutoff) {
			logFilePath := path.Join(fs.workDir, logFile.Name())
			if err := os.Remove(logFilePath); err != nil {
				errs = append(errs, fmt.Sprintf("remove log file '%s' error: %s", logFilePath, err))
				continue // go on for next one
			}
			cleared++
		}
	}

	if len(errs) > 0 {
		err = fmt.Errorf("%s", strings.Join(errs, "\n"))
	}

	// Return error with high priority
	return cleared, err
}
// Duration exposes the retention period (in days) this sweeper was
// configured with.
func (fs *FileSweeper) Duration() int {
	days := fs.duration

	return days
}

View File

@ -0,0 +1,45 @@
package sweeper
import (
"io/ioutil"
"os"
"path"
"testing"
"time"
)
// TestFileSweeper checks that a log file aged 6 days is removed by a
// sweeper configured with a 5-day retention, and that Duration reports
// the configured value.
func TestFileSweeper(t *testing.T) {
	// Use a unique temp dir: os.Mkdir on a fixed path fails when a
	// previous (aborted) run left the directory behind.
	workDir, err := ioutil.TempDir(os.TempDir(), "job_logs")
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := os.RemoveAll(workDir); err != nil {
			t.Error(err)
		}
	}()

	logFile := path.Join(workDir, "TestFileSweeper.log")
	if err := ioutil.WriteFile(logFile, []byte("hello"), os.ModePerm); err != nil {
		t.Fatal(err)
	}

	// Age the file by 6 days. If this fails the sweep assertions below
	// are meaningless, so abort instead of continuing (was t.Error).
	oldModTime := time.Unix(time.Now().Unix()-6*24*3600, 0)
	if err := os.Chtimes(logFile, oldModTime, oldModTime); err != nil {
		t.Fatal(err)
	}

	fs := NewFileSweeper(workDir, 5)
	if fs.Duration() != 5 {
		t.Errorf("expect duration 5 but got %d", fs.Duration())
	}

	count, err := fs.Sweep()
	if err != nil {
		t.Error(err)
	}
	if count != 1 {
		t.Errorf("expect count 1 but got %d", count)
	}
}

View File

@ -0,0 +1,13 @@
package sweeper
// Interface defines the operations a sweeper should have
type Interface interface {
	// Sweep the outdated log entries if necessary
	//
	// If failed, a non-nil error will be returned
	// If succeeded, the count of swept log entries is returned
	Sweep() (int, error)

	// Duration returns the sweeping interval with day unit.
	// A non-positive value means "no fixed interval of its own";
	// callers are expected to substitute a default.
	Duration() int
}

View File

@ -0,0 +1,98 @@
package logger
import (
"context"
"fmt"
"reflect"
"time"
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
)
const (
oneDay = 24 * time.Hour
)
// SweeperController is a unified sweeper entry built on top of multiple
// sweepers. It's responsible for starting the configured sweepers and
// logging the errors they report.
type SweeperController struct {
	context  context.Context     // cancelling this stops all sweep loops and the error drainer
	sweepers []sweeper.Interface // the underlying sweepers driven by this controller
	errChan  chan error          // collects errors reported by the sweeping goroutines
}
// NewSweeperController is the constructor of the controller.
func NewSweeperController(ctx context.Context, sweepers []sweeper.Interface) *SweeperController {
	controller := &SweeperController{}
	controller.context = ctx
	controller.sweepers = sweepers
	controller.errChan = make(chan error, 1)

	return controller
}
// Sweep kicks off every configured sweeper in its own goroutine and
// returns immediately; errors they report are logged asynchronously by
// a draining goroutine. The returned count is always 0 here — the real
// counts are logged by the individual sweepers.
func (c *SweeperController) Sweep() (int, error) {
	// Drain and log reported errors until the controller context ends.
	go func() {
		for {
			select {
			case <-c.context.Done():
				return
			case err := <-c.errChan:
				Error(err)
			}
		}
	}()

	// One background loop per configured sweeper; the argument is
	// evaluated at the go statement, so each goroutine gets its own value.
	for _, s := range c.sweepers {
		go c.startSweeper(s)
	}

	return 0, nil
}
// Duration returns -1 for the controller: it has no single sweeping
// interval of its own, since each underlying sweeper supplies one.
func (c *SweeperController) Duration() int {
	return -1
}
// startSweeper drives one sweeper: a first immediate run, then one run
// per interval, until the controller context is cancelled.
func (c *SweeperController) startSweeper(s sweeper.Interface) {
	// Fall back to a one-day interval when the sweeper reports a
	// non-positive duration.
	days := s.Duration()
	if days <= 0 {
		days = 1
	}

	// Use the type name as a simple ID
	sid := reflect.TypeOf(s).String()
	defer Infof("sweeper %s exit", sid)

	// First run
	go c.doSweeping(sid, s)

	// Loop
	ticker := time.NewTicker(time.Duration(days) * oneDay)
	defer ticker.Stop()

	for {
		select {
		case <-c.context.Done():
			return
		case <-ticker.C:
			go c.doSweeping(sid, s)
		}
	}
}
// doSweeping runs one sweep of the given sweeper. Failures are pushed to
// the controller's error channel (logged by the draining goroutine);
// successes log the number of removed entries.
func (c *SweeperController) doSweeping(sid string, s sweeper.Interface) {
	// Fixed log-message grammar/spelling ("is under working", "sweepped").
	Debugf("Sweeper %s is working", sid)

	count, err := s.Sweep()
	if err != nil {
		c.errChan <- fmt.Errorf("sweep logs error in %s at %d: %s", sid, time.Now().Unix(), err)
		return
	}

	Infof("%d outdated log entries are swept by sweeper %s", count, sid)
}

View File

@ -0,0 +1,32 @@
package logger
import (
"errors"
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
)
// SweeperFactory is responsible for creating a sweeper.Interface based on
// the settings. Implementations read the relevant OptionItems and return
// a configured sweeper, or an error when a required option is missing.
type SweeperFactory func(options ...OptionItem) (sweeper.Interface, error)
// FileSweeperFactory creates a file sweeper from the given options:
// "work_dir" (required) is the directory to sweep; "duration" (optional,
// in days) defaults to 1 and only positive values take effect.
func FileSweeperFactory(options ...OptionItem) (sweeper.Interface, error) {
	workDir := ""
	duration := 1

	for _, op := range options {
		switch op.Field() {
		case "work_dir":
			workDir = op.String()
		case "duration":
			if d := op.Int(); d > 0 {
				duration = d
			}
		default:
			// Unknown options are silently ignored.
		}
	}

	if len(workDir) == 0 {
		return nil, errors.New("missing required option 'work_dir'")
	}

	return sweeper.NewFileSweeper(workDir, duration), nil
}

View File

@ -1,48 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger
import (
"context"
"fmt"
"os"
"testing"
"time"
)
func TestSweeper(t *testing.T) {
workDir := "/tmp/sweeper_logs"
if err := os.MkdirAll(workDir, 0755); err != nil {
t.Fatal(err)
}
_, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir))
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sweeper := NewSweeper(ctx, workDir, 1)
sweeper.Start()
<-time.After(100 * time.Millisecond)
if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil {
t.Fatal(err)
}
if err := os.Remove(workDir); err != nil {
t.Fatal(err)
}
}

View File

@ -15,14 +15,11 @@
package main
import (
"errors"
"context"
"flag"
"fmt"
"github.com/goharbor/harbor/src/adminserver/client"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job/impl"
ilogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/runtime"
"github.com/goharbor/harbor/src/jobservice/utils"
@ -36,16 +33,25 @@ func main() {
// Missing config file
if configPath == nil || utils.IsEmptyStr(*configPath) {
flag.Usage()
logger.Fatal("Config file should be specified")
panic("no config file is specified")
}
// Load configurations
if err := config.DefaultConfig.Load(*configPath, true); err != nil {
logger.Fatalf("Failed to load configurations with error: %s\n", err)
panic(fmt.Sprintf("load configurations error: %s\n", err))
}
// Create the root context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize logger
if err := logger.Init(ctx); err != nil {
panic(err)
}
// Set job context initializer
runtime.JobService.SetJobContextInitializer(func(ctx *env.Context) (env.JobContext, error) {
/*runtime.JobService.SetJobContextInitializer(func(ctx *env.Context) (env.JobContext, error) {
secret := config.GetAuthSecret()
if utils.IsEmptyStr(secret) {
return nil, errors.New("empty auth secret")
@ -59,12 +65,8 @@ func main() {
}
return jobCtx, nil
})
// New logger for job service
sLogger := ilogger.NewServiceLogger(config.GetLogLevel())
logger.SetLogger(sLogger)
})*/
// Start
runtime.JobService.LoadAndRun()
runtime.JobService.LoadAndRun(ctx, cancel)
}

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger/backend"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/utils"
@ -56,16 +57,34 @@ func TestJobWrapper(t *testing.T) {
EnqueuedAt: time.Now().Add(5 * time.Minute).Unix(),
}
oldLogConfig := config.DefaultConfig.LoggerConfig
oldJobLoggerCfg := config.DefaultConfig.JobLoggerConfigs
defer func() {
config.DefaultConfig.LoggerConfig = oldLogConfig
config.DefaultConfig.JobLoggerConfigs = oldJobLoggerCfg
}()
config.DefaultConfig.LoggerConfig = &config.LoggerConfig{
LogLevel: "debug",
ArchivePeriod: 1,
BasePath: os.TempDir(),
config.DefaultConfig.JobLoggerConfigs = []*config.LoggerConfig{
{
Name: "STD_OUTPUT",
Level: "DEBUG",
Settings: map[string]interface{}{
"output": backend.StdErr,
},
},
{
Name: "FILE",
Level: "ERROR",
Settings: map[string]interface{}{
"base_dir": os.TempDir(),
},
Sweeper: &config.LogSweeperConfig{
Duration: 5,
Settings: map[string]interface{}{
"work_dir": os.TempDir(),
},
},
},
}
if err := wrapper.Run(j); err != nil {
t.Fatal(err)
}

View File

@ -64,11 +64,7 @@ func (bs *Bootstrap) SetJobContextInitializer(initializer env.JobContextInitiali
// LoadAndRun will load configurations, initialize components and then start the related process to serve requests.
// Return error if meet any problems.
func (bs *Bootstrap) LoadAndRun() {
// Create the root context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) {
rootContext := &env.Context{
SystemContext: ctx,
WG: &sync.WaitGroup{},
@ -110,10 +106,6 @@ func (bs *Bootstrap) LoadAndRun() {
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
// Start outdated log files sweeper
logSweeper := logger.NewSweeper(ctx, config.GetLogBasePath(), config.GetLogArchivePeriod())
logSweeper.Start()
// To indicate if any errors occurred
var err error
// Block here