mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-26 18:48:02 +01:00
Merge pull request #6371 from mmpei/6144-JSLoggerInDB
6144 js logger in db
This commit is contained in:
commit
d82482499b
@ -1,2 +1,12 @@
|
||||
ALTER TABLE properties ALTER COLUMN v TYPE varchar(1024);
|
||||
DELETE FROM properties where k='scan_all_policy';
|
||||
|
||||
create table job_log (
|
||||
log_id SERIAL NOT NULL,
|
||||
job_uuid varchar (64) NOT NULL,
|
||||
creation_time timestamp default CURRENT_TIMESTAMP,
|
||||
content text,
|
||||
primary key (log_id)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX job_log_uuid ON job_log (job_uuid);
|
||||
|
40
src/common/dao/joblog.go
Normal file
40
src/common/dao/joblog.go
Normal file
@ -0,0 +1,40 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CreateOrUpdateJobLog ...
|
||||
func CreateOrUpdateJobLog(log *models.JobLog) (int64, error) {
|
||||
o := GetOrmer()
|
||||
count, err := o.InsertOrUpdate(log, "job_uuid")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// GetJobLog ...
|
||||
func GetJobLog(uuid string) (*models.JobLog, error) {
|
||||
o := GetOrmer()
|
||||
jl := models.JobLog{UUID: uuid}
|
||||
err := o.Read(&jl, "UUID")
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, err
|
||||
}
|
||||
return &jl, nil
|
||||
}
|
||||
|
||||
// DeleteJobLogsBefore ...
|
||||
func DeleteJobLogsBefore(t time.Time) (int64, error) {
|
||||
o := GetOrmer()
|
||||
sql := `delete from job_log where creation_time < ?`
|
||||
res, err := o.Raw(sql, t).Exec()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
44
src/common/dao/joblog_test.go
Normal file
44
src/common/dao/joblog_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMethodsOfJobLog(t *testing.T) {
|
||||
uuid := "uuid_for_unit_test"
|
||||
now := time.Now()
|
||||
content := "content for unit text"
|
||||
jobLog := &models.JobLog{
|
||||
UUID: uuid,
|
||||
CreationTime: now,
|
||||
Content: content,
|
||||
}
|
||||
|
||||
// create
|
||||
_, err := CreateOrUpdateJobLog(jobLog)
|
||||
require.Nil(t, err)
|
||||
|
||||
// update
|
||||
updateContent := "content for unit text update"
|
||||
jobLog.Content = updateContent
|
||||
_, err = CreateOrUpdateJobLog(jobLog)
|
||||
require.Nil(t, err)
|
||||
|
||||
// get
|
||||
log, err := GetJobLog(uuid)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, now.Second(), log.CreationTime.Second())
|
||||
assert.Equal(t, updateContent, log.Content)
|
||||
assert.Equal(t, jobLog.LogID, log.LogID)
|
||||
|
||||
// delete
|
||||
count, err := DeleteJobLogsBefore(time.Now().Add(time.Duration(time.Minute)))
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, int64(1), count)
|
||||
}
|
@ -36,5 +36,6 @@ func init() {
|
||||
new(Label),
|
||||
new(ResourceLabel),
|
||||
new(UserGroup),
|
||||
new(AdminJob))
|
||||
new(AdminJob),
|
||||
new(JobLog))
|
||||
}
|
||||
|
21
src/common/models/joblog.go
Normal file
21
src/common/models/joblog.go
Normal file
@ -0,0 +1,21 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// JobLogTable is the name of the table that record the job execution result.
|
||||
const JobLogTable = "job_log"
|
||||
|
||||
// JobLog holds information about logs which are used to record the result of execution of a job.
|
||||
type JobLog struct {
|
||||
LogID int `orm:"pk;auto;column(log_id)" json:"log_id"`
|
||||
UUID string `orm:"column(job_uuid)" json:"uuid"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
Content string `orm:"column(content)" json:"content"`
|
||||
}
|
||||
|
||||
// TableName is required by by beego orm to map JobLog to table job_log
|
||||
func (a *JobLog) TableName() string {
|
||||
return JobLogTable
|
||||
}
|
@ -30,6 +30,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
|
||||
jmodel "github.com/goharbor/harbor/src/jobservice/models"
|
||||
)
|
||||
|
||||
@ -95,7 +96,14 @@ func (c *Context) Init() error {
|
||||
|
||||
db := getDBFromConfig(configs)
|
||||
|
||||
return dao.InitDatabase(db)
|
||||
err = dao.InitDatabase(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize DB finished
|
||||
initDBCompleted()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build implements the same method in env.JobContext interface
|
||||
@ -237,22 +245,28 @@ func setLoggers(setter func(lg logger.Interface), jobID string) error {
|
||||
lOptions := []logger.Option{}
|
||||
for _, lc := range config.DefaultConfig.JobLoggerConfigs {
|
||||
// For running job, the depth should be 5
|
||||
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameStdOutput {
|
||||
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameStdOutput || lc.Name == logger.LoggerNameDB {
|
||||
if lc.Settings == nil {
|
||||
lc.Settings = map[string]interface{}{}
|
||||
}
|
||||
lc.Settings["depth"] = 5
|
||||
}
|
||||
if lc.Name == logger.LoggerNameFile {
|
||||
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameDB {
|
||||
// Need extra param
|
||||
fSettings := map[string]interface{}{}
|
||||
for k, v := range lc.Settings {
|
||||
// Copy settings
|
||||
fSettings[k] = v
|
||||
}
|
||||
// Append file name param
|
||||
fSettings["filename"] = fmt.Sprintf("%s.log", jobID)
|
||||
lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, fSettings))
|
||||
if lc.Name == logger.LoggerNameFile {
|
||||
// Append file name param
|
||||
fSettings["filename"] = fmt.Sprintf("%s.log", jobID)
|
||||
lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, fSettings))
|
||||
} else { // DB Logger
|
||||
// Append DB key
|
||||
fSettings["key"] = jobID
|
||||
lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, fSettings))
|
||||
}
|
||||
} else {
|
||||
lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, lc.Settings))
|
||||
}
|
||||
@ -267,3 +281,8 @@ func setLoggers(setter func(lg logger.Interface), jobID string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func initDBCompleted() error {
|
||||
sweeper.PrepareDBSweep()
|
||||
return nil
|
||||
}
|
||||
|
105
src/jobservice/logger/backend/db_logger.go
Normal file
105
src/jobservice/logger/backend/db_logger.go
Normal file
@ -0,0 +1,105 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/models"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
)
|
||||
|
||||
// DBLogger is an implementation of logger.Interface.
|
||||
// It outputs logs to PGSql.
|
||||
type DBLogger struct {
|
||||
backendLogger *log.Logger
|
||||
bw *bufio.Writer
|
||||
buffer *bytes.Buffer
|
||||
key string
|
||||
}
|
||||
|
||||
// NewDBLogger crates a new DB logger
|
||||
// nil might be returned
|
||||
func NewDBLogger(key string, level string, depth int) (*DBLogger, error) {
|
||||
buffer := bytes.NewBuffer(make([]byte, 0))
|
||||
bw := bufio.NewWriter(buffer)
|
||||
logLevel := parseLevel(level)
|
||||
|
||||
backendLogger := log.New(bw, log.NewTextFormatter(), logLevel, depth)
|
||||
|
||||
return &DBLogger{
|
||||
backendLogger: backendLogger,
|
||||
bw: bw,
|
||||
buffer: buffer,
|
||||
key: key,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close the opened io stream and flush data into DB
|
||||
// Implements logger.Closer interface
|
||||
func (dbl *DBLogger) Close() error {
|
||||
err := dbl.bw.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobLog := models.JobLog{
|
||||
UUID: dbl.key,
|
||||
Content: dbl.buffer.String(),
|
||||
}
|
||||
|
||||
_, err = dao.CreateOrUpdateJobLog(&jobLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Debug ...
|
||||
func (dbl *DBLogger) Debug(v ...interface{}) {
|
||||
dbl.backendLogger.Debug(v...)
|
||||
}
|
||||
|
||||
// Debugf with format
|
||||
func (dbl *DBLogger) Debugf(format string, v ...interface{}) {
|
||||
dbl.backendLogger.Debugf(format, v...)
|
||||
}
|
||||
|
||||
// Info ...
|
||||
func (dbl *DBLogger) Info(v ...interface{}) {
|
||||
dbl.backendLogger.Info(v...)
|
||||
}
|
||||
|
||||
// Infof with format
|
||||
func (dbl *DBLogger) Infof(format string, v ...interface{}) {
|
||||
dbl.backendLogger.Infof(format, v...)
|
||||
}
|
||||
|
||||
// Warning ...
|
||||
func (dbl *DBLogger) Warning(v ...interface{}) {
|
||||
dbl.backendLogger.Warning(v...)
|
||||
}
|
||||
|
||||
// Warningf with format
|
||||
func (dbl *DBLogger) Warningf(format string, v ...interface{}) {
|
||||
dbl.backendLogger.Warningf(format, v...)
|
||||
}
|
||||
|
||||
// Error ...
|
||||
func (dbl *DBLogger) Error(v ...interface{}) {
|
||||
dbl.backendLogger.Error(v...)
|
||||
}
|
||||
|
||||
// Errorf with format
|
||||
func (dbl *DBLogger) Errorf(format string, v ...interface{}) {
|
||||
dbl.backendLogger.Errorf(format, v...)
|
||||
}
|
||||
|
||||
// Fatal error
|
||||
func (dbl *DBLogger) Fatal(v ...interface{}) {
|
||||
dbl.backendLogger.Fatal(v...)
|
||||
}
|
||||
|
||||
// Fatalf error
|
||||
func (dbl *DBLogger) Fatalf(format string, v ...interface{}) {
|
||||
dbl.backendLogger.Fatalf(format, v...)
|
||||
}
|
64
src/jobservice/logger/backend/db_logger_test.go
Normal file
64
src/jobservice/logger/backend/db_logger_test.go
Normal file
@ -0,0 +1,64 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger/getter"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
|
||||
"github.com/stretchr/testify/require"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
||||
// databases := []string{"mysql", "sqlite"}
|
||||
databases := []string{"postgresql"}
|
||||
for _, database := range databases {
|
||||
log.Infof("run test cases for database: %s", database)
|
||||
|
||||
result := 1
|
||||
switch database {
|
||||
case "postgresql":
|
||||
dao.PrepareTestForPostgresSQL()
|
||||
default:
|
||||
log.Fatalf("invalid database: %s", database)
|
||||
}
|
||||
|
||||
result = m.Run()
|
||||
|
||||
if result != 0 {
|
||||
os.Exit(result)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Test DB logger
|
||||
func TestDBLogger(t *testing.T) {
|
||||
uuid := "uuid_for_unit_test"
|
||||
l, err := NewDBLogger(uuid, "DEBUG", 4)
|
||||
require.Nil(t, err)
|
||||
|
||||
l.Debug("JobLog Debug: TestDBLogger")
|
||||
l.Info("JobLog Info: TestDBLogger")
|
||||
l.Warning("JobLog Warning: TestDBLogger")
|
||||
l.Error("JobLog Error: TestDBLogger")
|
||||
l.Debugf("JobLog Debugf: %s", "TestDBLogger")
|
||||
l.Infof("JobLog Infof: %s", "TestDBLogger")
|
||||
l.Warningf("JobLog Warningf: %s", "TestDBLogger")
|
||||
l.Errorf("JobLog Errorf: %s", "TestDBLogger")
|
||||
|
||||
l.Close()
|
||||
|
||||
dbGetter := getter.NewDBGetter()
|
||||
ll, err := dbGetter.Retrieve(uuid)
|
||||
require.Nil(t, err)
|
||||
log.Infof("get logger %s", ll)
|
||||
|
||||
sweeper.PrepareDBSweep()
|
||||
dbSweeper := sweeper.NewDBSweeper(-1)
|
||||
count, err := dbSweeper.Sweep()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 1, count)
|
||||
}
|
@ -1,5 +1,7 @@
|
||||
package logger
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Entry provides unique interfaces on top of multiple logger backends.
|
||||
// Entry also implements @Interface.
|
||||
type Entry struct {
|
||||
@ -82,3 +84,24 @@ func (e *Entry) Fatalf(format string, v ...interface{}) {
|
||||
l.Fatalf(format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Close logger
|
||||
func (e *Entry) Close() error {
|
||||
var errMsg string
|
||||
for _, l := range e.loggers {
|
||||
if closer, ok := l.(Closer); ok {
|
||||
err := closer.Close()
|
||||
if err != nil {
|
||||
if errMsg == "" {
|
||||
errMsg = fmt.Sprintf("logger: %s, err: %s", GetLoggerName(l), err)
|
||||
} else {
|
||||
errMsg = fmt.Sprintf("%s; logger: %s, err: %s", errMsg, GetLoggerName(l), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if errMsg != "" {
|
||||
return fmt.Errorf(errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -62,3 +62,24 @@ func StdFactory(options ...OptionItem) (Interface, error) {
|
||||
|
||||
return backend.NewStdOutputLogger(level, output, depth), nil
|
||||
}
|
||||
|
||||
// DBFactory is factory of file logger
|
||||
func DBFactory(options ...OptionItem) (Interface, error) {
|
||||
var (
|
||||
level, key string
|
||||
depth int
|
||||
)
|
||||
for _, op := range options {
|
||||
switch op.Field() {
|
||||
case "level":
|
||||
level = op.String()
|
||||
case "key":
|
||||
key = op.String()
|
||||
case "depth":
|
||||
depth = op.Int()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return backend.NewDBLogger(key, level, depth)
|
||||
}
|
||||
|
29
src/jobservice/logger/getter/db_getter.go
Normal file
29
src/jobservice/logger/getter/db_getter.go
Normal file
@ -0,0 +1,29 @@
|
||||
package getter
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
)
|
||||
|
||||
// DBGetter is responsible for retrieving DB log data
|
||||
type DBGetter struct {
|
||||
}
|
||||
|
||||
// NewDBGetter is constructor of DBGetter
|
||||
func NewDBGetter() *DBGetter {
|
||||
return &DBGetter{}
|
||||
}
|
||||
|
||||
// Retrieve implements @Interface.Retrieve
|
||||
func (dbg *DBGetter) Retrieve(logID string) ([]byte, error) {
|
||||
if len(logID) == 0 {
|
||||
return nil, errors.New("empty log identify")
|
||||
}
|
||||
|
||||
jobLog, err := dao.GetJobLog(logID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []byte(jobLog.Content), nil
|
||||
}
|
@ -25,3 +25,8 @@ func FileGetterFactory(options ...OptionItem) (getter.Interface, error) {
|
||||
|
||||
return getter.NewFileGetter(baseDir), nil
|
||||
}
|
||||
|
||||
// DBGetterFactory creates a getter for the DB logger
|
||||
func DBGetterFactory(options ...OptionItem) (getter.Interface, error) {
|
||||
return getter.NewDBGetter(), nil
|
||||
}
|
||||
|
@ -1,12 +1,18 @@
|
||||
package logger
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/logger/backend"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
// LoggerNameFile is unique name of the file logger.
|
||||
LoggerNameFile = "FILE"
|
||||
// LoggerNameStdOutput is the unique name of the std logger.
|
||||
LoggerNameStdOutput = "STD_OUTPUT"
|
||||
// LoggerNameDB is the unique name of the DB logger.
|
||||
LoggerNameDB = "DB"
|
||||
)
|
||||
|
||||
// Declaration is used to declare a supported logger.
|
||||
@ -28,6 +34,8 @@ var knownLoggers = map[string]*Declaration{
|
||||
LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
|
||||
// STD output(both stdout and stderr) logger
|
||||
LoggerNameStdOutput: {StdFactory, nil, nil, true},
|
||||
// DB logger
|
||||
LoggerNameDB: {DBFactory, DBSweeperFactory, DBGetterFactory, false},
|
||||
}
|
||||
|
||||
// IsKnownLogger checks if the logger is supported with name.
|
||||
@ -79,3 +87,24 @@ func IsKnownLevel(level string) bool {
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// GetLoggerName return a logger name by Interface
|
||||
func GetLoggerName(l Interface) string {
|
||||
var name string
|
||||
if l == nil {
|
||||
return name
|
||||
}
|
||||
|
||||
switch l.(type) {
|
||||
case *backend.DBLogger:
|
||||
name = LoggerNameDB
|
||||
case *backend.StdOutputLogger:
|
||||
name = LoggerNameStdOutput
|
||||
case *backend.FileLogger:
|
||||
name = LoggerNameFile
|
||||
default:
|
||||
name = reflect.TypeOf(l).String()
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
24
src/jobservice/logger/known_loggers_test.go
Normal file
24
src/jobservice/logger/known_loggers_test.go
Normal file
@ -0,0 +1,24 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/logger/backend"
|
||||
"github.com/stretchr/testify/require"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test GetLoggerName
|
||||
func TestGetLoggerName(t *testing.T) {
|
||||
uuid := "uuid_for_unit_test"
|
||||
l, err := backend.NewDBLogger(uuid, "DEBUG", 4)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, LoggerNameDB, GetLoggerName(l))
|
||||
|
||||
stdLog := backend.NewStdOutputLogger("DEBUG", backend.StdErr, 4)
|
||||
require.Equal(t, LoggerNameStdOutput, GetLoggerName(stdLog))
|
||||
|
||||
fileLog, err := backend.NewFileLogger("DEBUG", path.Join(os.TempDir(), "TestFileLogger.log"), 4)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, LoggerNameFile, GetLoggerName(fileLog))
|
||||
}
|
57
src/jobservice/logger/sweeper/db_sweeper.go
Normal file
57
src/jobservice/logger/sweeper/db_sweeper.go
Normal file
@ -0,0 +1,57 @@
|
||||
package sweeper
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"time"
|
||||
)
|
||||
|
||||
var dbInit = make(chan int, 1)
|
||||
var isDBInit = false
|
||||
|
||||
// DBSweeper is used to sweep the DB logs
|
||||
type DBSweeper struct {
|
||||
duration int
|
||||
}
|
||||
|
||||
// NewDBSweeper is constructor of DBSweeper
|
||||
func NewDBSweeper(duration int) *DBSweeper {
|
||||
return &DBSweeper{
|
||||
duration: duration,
|
||||
}
|
||||
}
|
||||
|
||||
// Sweep logs
|
||||
func (dbs *DBSweeper) Sweep() (int, error) {
|
||||
// DB initialization not completed, waiting
|
||||
WaitingDBInit()
|
||||
|
||||
// Start to sweep logs
|
||||
before := time.Now().Add(time.Duration(dbs.duration) * oneDay * -1)
|
||||
count, err := dao.DeleteJobLogsBefore(before)
|
||||
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("sweep logs in DB failed before %s with error: %s", before, err)
|
||||
}
|
||||
|
||||
return int(count), nil
|
||||
}
|
||||
|
||||
// Duration for sweeping
|
||||
func (dbs *DBSweeper) Duration() int {
|
||||
return dbs.duration
|
||||
}
|
||||
|
||||
// WaitingDBInit waiting DB init
|
||||
func WaitingDBInit() {
|
||||
if !isDBInit {
|
||||
<-dbInit
|
||||
}
|
||||
}
|
||||
|
||||
// PrepareDBSweep invoked after DB init
|
||||
func PrepareDBSweep() error {
|
||||
isDBInit = true
|
||||
dbInit <- 1
|
||||
return nil
|
||||
}
|
@ -30,3 +30,19 @@ func FileSweeperFactory(options ...OptionItem) (sweeper.Interface, error) {
|
||||
|
||||
return sweeper.NewFileSweeper(workDir, duration), nil
|
||||
}
|
||||
|
||||
// DBSweeperFactory creates DB sweeper.
|
||||
func DBSweeperFactory(options ...OptionItem) (sweeper.Interface, error) {
|
||||
var duration = 1
|
||||
for _, op := range options {
|
||||
switch op.Field() {
|
||||
case "duration":
|
||||
if op.Int() > 0 {
|
||||
duration = op.Int()
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return sweeper.NewDBSweeper(duration), nil
|
||||
}
|
||||
|
@ -107,7 +107,10 @@ func (rj *RedisJob) Run(j *work.Job) error {
|
||||
defer func() {
|
||||
// Close open io stream first
|
||||
if closer, ok := execContext.GetLogger().(logger.Closer); ok {
|
||||
closer.Close()
|
||||
err := closer.Close()
|
||||
if err != nil {
|
||||
logger.Errorf("Close job logger failed: %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user