mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-22 23:51:27 +01:00
Provide a mechanism to sweep the execution/task records in task manager
Provide a mechanism to sweep the execution/task records in task manager Fixes #13888 Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
a1a5ef5e20
commit
fb871dbbe8
@ -60,6 +60,11 @@ const (
|
|||||||
robotIDKey = "robot_id"
|
robotIDKey = "robot_id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// keep only the latest created 5 scan all execution records
|
||||||
|
task.SetExecutionSweeperCount(job.ImageScanAllJob, 5)
|
||||||
|
}
|
||||||
|
|
||||||
// uuidGenerator is a func template which is for generating UUID.
|
// uuidGenerator is a func template which is for generating UUID.
|
||||||
type uuidGenerator func() (string, error)
|
type uuidGenerator func() (string, error)
|
||||||
|
|
||||||
|
@ -24,13 +24,16 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
"github.com/goharbor/harbor/src/pkg/task/dao"
|
"github.com/goharbor/harbor/src/pkg/task/dao"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ExecMgr is a global execution manager instance
|
// ExecMgr is a global execution manager instance
|
||||||
ExecMgr = NewExecutionManager()
|
ExecMgr = NewExecutionManager()
|
||||||
|
executionSweeperCount = map[string]uint8{}
|
||||||
|
defaultExecutionSweeperCount uint8 = 50
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecutionManager manages executions.
|
// ExecutionManager manages executions.
|
||||||
@ -75,6 +78,7 @@ func NewExecutionManager() ExecutionManager {
|
|||||||
executionDAO: dao.NewExecutionDAO(),
|
executionDAO: dao.NewExecutionDAO(),
|
||||||
taskMgr: Mgr,
|
taskMgr: Mgr,
|
||||||
taskDAO: dao.NewTaskDAO(),
|
taskDAO: dao.NewTaskDAO(),
|
||||||
|
ormCreator: orm.Crt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,6 +86,7 @@ type executionManager struct {
|
|||||||
executionDAO dao.ExecutionDAO
|
executionDAO dao.ExecutionDAO
|
||||||
taskMgr Manager
|
taskMgr Manager
|
||||||
taskDAO dao.TaskDAO
|
taskDAO dao.TaskDAO
|
||||||
|
ormCreator orm.Creator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) {
|
func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) {
|
||||||
@ -109,7 +114,58 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
|
|||||||
StartTime: now,
|
StartTime: now,
|
||||||
UpdateTime: now,
|
UpdateTime: now,
|
||||||
}
|
}
|
||||||
return e.executionDAO.Create(ctx, execution)
|
id, err := e.executionDAO.Create(ctx, execution)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// sweep the execution records to avoid the execution/task records explosion
|
||||||
|
go func() {
|
||||||
|
ctx := orm.NewContext(context.Background(), e.ormCreator.Create())
|
||||||
|
if err := e.sweep(ctx, vendorType); err != nil {
|
||||||
|
log.Errorf("failed to sweep the executions of %s: %v", vendorType, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executionManager) sweep(ctx context.Context, vendorType string) error {
|
||||||
|
count := executionSweeperCount[vendorType]
|
||||||
|
if count == 0 {
|
||||||
|
count = defaultExecutionSweeperCount
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
// the function "List" of the execution manager returns the execution records
|
||||||
|
// ordered by start time. After the sorting is supported in query, we should
|
||||||
|
// specify the sorting explicitly
|
||||||
|
// the execution records in second page are always the candidates should to be swept
|
||||||
|
executions, err := e.List(ctx, &q.Query{
|
||||||
|
Keywords: map[string]interface{}{
|
||||||
|
"VendorType": vendorType,
|
||||||
|
},
|
||||||
|
PageNumber: 2,
|
||||||
|
PageSize: int64(count),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// no execution records need to be swept, return directly
|
||||||
|
if len(executions) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, execution := range executions {
|
||||||
|
// if the status of the execution isn't final, skip
|
||||||
|
if !job.Status(execution.Status).Final() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = e.Delete(ctx, execution.ID); err != nil {
|
||||||
|
log.Errorf("failed to delete the execution %d: %v", execution.ID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
|
func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
|
||||||
@ -297,3 +353,10 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
|
|||||||
|
|
||||||
return exec
|
return exec
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetExecutionSweeperCount sets the count of execution records retained by the sweeper
|
||||||
|
// If no count is set for the specified vendor, the default value will be used
|
||||||
|
// The sweeper retains the latest created #count execution records for the specified vendor
|
||||||
|
func SetExecutionSweeperCount(vendorType string, count uint8) {
|
||||||
|
executionSweeperCount[vendorType] = count
|
||||||
|
}
|
||||||
|
@ -22,26 +22,30 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
"github.com/goharbor/harbor/src/pkg/task/dao"
|
"github.com/goharbor/harbor/src/pkg/task/dao"
|
||||||
|
"github.com/goharbor/harbor/src/testing/lib/orm"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
)
|
)
|
||||||
|
|
||||||
type executionManagerTestSuite struct {
|
type executionManagerTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
execMgr *executionManager
|
execMgr *executionManager
|
||||||
taskMgr *mockTaskManager
|
taskMgr *mockTaskManager
|
||||||
execDAO *mockExecutionDAO
|
execDAO *mockExecutionDAO
|
||||||
taskDAO *mockTaskDAO
|
taskDAO *mockTaskDAO
|
||||||
|
ormCreator *orm.Creator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManagerTestSuite) SetupTest() {
|
func (e *executionManagerTestSuite) SetupTest() {
|
||||||
e.taskMgr = &mockTaskManager{}
|
e.taskMgr = &mockTaskManager{}
|
||||||
e.execDAO = &mockExecutionDAO{}
|
e.execDAO = &mockExecutionDAO{}
|
||||||
e.taskDAO = &mockTaskDAO{}
|
e.taskDAO = &mockTaskDAO{}
|
||||||
|
e.ormCreator = &orm.Creator{}
|
||||||
e.execMgr = &executionManager{
|
e.execMgr = &executionManager{
|
||||||
executionDAO: e.execDAO,
|
executionDAO: e.execDAO,
|
||||||
taskMgr: e.taskMgr,
|
taskMgr: e.taskMgr,
|
||||||
taskDAO: e.taskDAO,
|
taskDAO: e.taskDAO,
|
||||||
|
ormCreator: e.ormCreator,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,11 +59,16 @@ func (e *executionManagerTestSuite) TestCount() {
|
|||||||
|
|
||||||
func (e *executionManagerTestSuite) TestCreate() {
|
func (e *executionManagerTestSuite) TestCreate() {
|
||||||
e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
|
e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
e.ormCreator.On("Create").Return(&orm.FakeOrmer{})
|
||||||
|
e.execDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
|
||||||
id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual,
|
id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual,
|
||||||
map[string]interface{}{"k": "v"})
|
map[string]interface{}{"k": "v"})
|
||||||
e.Require().Nil(err)
|
e.Require().Nil(err)
|
||||||
e.Equal(int64(1), id)
|
e.Equal(int64(1), id)
|
||||||
|
// sleep to make sure the function in the goroutine run
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
e.execDAO.AssertExpectations(e.T())
|
e.execDAO.AssertExpectations(e.T())
|
||||||
|
e.ormCreator.AssertExpectations(e.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManagerTestSuite) TestMarkDone() {
|
func (e *executionManagerTestSuite) TestMarkDone() {
|
||||||
|
Loading…
Reference in New Issue
Block a user