Add logger sweeper to periodically clear outdated log files created by the jobs

This commit is contained in:
Steven Zou 2018-03-22 23:39:24 +08:00
parent f7fe8380bd
commit 49bb9bfceb
5 changed files with 150 additions and 17 deletions

View File

@ -25,4 +25,5 @@ worker_pool:
logger:
path: "/Users/szou/tmp/job_logs"
level: "INFO"
archive_period: 1 #days

View File

@ -15,17 +15,18 @@ import (
)
const (
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
jobServicePort = "JOB_SERVICE_PORT"
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
jobServiceRedisHost = "JOB_SERVICE_POOL_REDIS_HOST"
jobServiceRedisPort = "JOB_SERVICE_POOL_REDIS_PORT"
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
jobServicePort = "JOB_SERVICE_PORT"
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
jobServiceRedisHost = "JOB_SERVICE_POOL_REDIS_HOST"
jobServiceRedisPort = "JOB_SERVICE_POOL_REDIS_PORT"
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
jobServiceLoggerArchivePeriod = "JOB_SERVICE_LOGGER_ARCHIVE_PERIOD"
//JobServiceProtocolHTTPS points to the 'https' protocol
JobServiceProtocolHTTPS = "https"
@ -80,8 +81,9 @@ type PoolConfig struct {
//LoggerConfig keeps logger configurations.
type LoggerConfig struct {
BasePath string `yaml:"path"`
LogLevel string `yaml:"level"`
BasePath string `yaml:"path"`
LogLevel string `yaml:"level"`
ArchivePeriod uint `yaml:"archive_period"`
}
//Load the configuration options from the specified yaml file.
@ -130,6 +132,15 @@ func GetLogLevel() string {
return ""
}
//GetLogArchivePeriod returns the archive period
func GetLogArchivePeriod() uint {
if DefaultConfig.LoggerConfig != nil {
return DefaultConfig.LoggerConfig.ArchivePeriod
}
return 1 //return default
}
//Load env variables
func (c *Configuration) loadEnvs() {
prot := utils.ReadEnv(jobServiceProtocol)
@ -218,12 +229,27 @@ func (c *Configuration) loadEnvs() {
//logger
loggerPath := utils.ReadEnv(jobServiceLoggerBasePath)
if !utils.IsEmptyStr(loggerPath) {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.BasePath = loggerPath
}
loggerLevel := utils.ReadEnv(jobServiceLoggerLevel)
if !utils.IsEmptyStr(loggerLevel) {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.LogLevel = loggerLevel
}
archivePeriod := utils.ReadEnv(jobServiceLoggerArchivePeriod)
if !utils.IsEmptyStr(archivePeriod) {
if period, err := strconv.Atoi(archivePeriod); err == nil {
if c.LoggerConfig == nil {
c.LoggerConfig = &LoggerConfig{}
}
c.LoggerConfig.ArchivePeriod = uint(period)
}
}
}
@ -291,5 +317,9 @@ func (c *Configuration) validate() error {
return fmt.Errorf("logger level can only be one of: %s", validLevels)
}
if c.LoggerConfig.ArchivePeriod == 0 {
return fmt.Errorf("logger archive period should be greater than 0")
}
return nil //valid
}

View File

@ -0,0 +1,93 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package logger
import (
"context"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/vmware/harbor/src/common/utils/log"
)
const (
oneDay = 3600 * 24
)
//Sweeper takes charge of archive the outdated log files of jobs.
type Sweeper struct {
context context.Context
workDir string
period uint
}
//NewSweeper creates new prt of Sweeper
func NewSweeper(ctx context.Context, workDir string, period uint) *Sweeper {
return &Sweeper{ctx, workDir, period}
}
//Start to work
func (s *Sweeper) Start() {
go s.loop()
log.Info("Logger sweeper is started")
}
func (s *Sweeper) loop() {
//Apply default if needed before starting
if s.period == 0 {
s.period = 1
}
defer func() {
log.Info("Logger sweeper is stopped")
}()
//First run
go s.clear()
ticker := time.NewTicker(time.Duration(s.period*oneDay+5) * time.Second)
defer ticker.Stop()
for {
select {
case <-s.context.Done():
return
case <-ticker.C:
go s.clear()
}
}
}
func (s *Sweeper) clear() {
var (
cleared uint
count = &cleared
)
log.Info("Start to clear the job outdated log files")
defer func() {
log.Infof("%d job outdated log files cleared", *count)
}()
logFiles, err := ioutil.ReadDir(s.workDir)
if err != nil {
log.Errorf("Failed to get the outdated log files under '%s' with error: %s\n", s.workDir, err)
return
}
if len(logFiles) == 0 {
return
}
for _, logFile := range logFiles {
if logFile.ModTime().Add(oneDay * time.Second).Before(time.Now()) {
logFilePath := fmt.Sprintf("%s%s%s", s.workDir, string(os.PathSeparator), logFile.Name())
if err := os.Remove(logFilePath); err == nil {
cleared++
} else {
log.Warningf("Failed to remove log file '%s'\n", logFilePath)
}
}
}
}

View File

@ -367,15 +367,15 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
//Let it run in a separate goroutine to avoid waiting more time
go func() {
var (
hookUrl string
hookURL string
ok bool
err error
)
hookUrl, ok = rjs.hookStore.Get(jobID)
hookURL, ok = rjs.hookStore.Get(jobID)
if !ok {
//Retrieve from backend
hookUrl, err = rjs.getHook(jobID)
hookURL, err = rjs.getHook(jobID)
if err != nil {
//logged and exit
log.Warningf("no status hook found for job %s\n, abandon status reporting", jobID)
@ -385,7 +385,7 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
item := &queueItem{
op: opReportStatus,
data: []string{jobID, hookUrl, status, checkIn},
data: []string{jobID, hookURL, status, checkIn},
}
rjs.processChan <- item
@ -609,6 +609,10 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
}
func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
if jobStats.Stats == nil {
return errors.New("malformed job stats object")
}
conn := rjs.redisPool.Get()
defer conn.Close()

View File

@ -18,6 +18,7 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job/impl"
"github.com/vmware/harbor/src/jobservice_v2/job/impl/scan"
"github.com/vmware/harbor/src/jobservice_v2/logger"
"github.com/vmware/harbor/src/jobservice_v2/pool"
)
@ -68,6 +69,10 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
log.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
//Start outdated log files sweeper
logSweeper := logger.NewSweeper(ctx, config.GetLogBasePath(), config.GetLogArchivePeriod())
logSweeper.Start()
//Block here
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)