Merge pull request #13909 from ywk253100/210106_task

Provide a mechanism to sweep the execution/task records in task manager
This commit is contained in:
Wenkai Yin(尹文开) 2021-01-07 13:44:21 +08:00 committed by GitHub
commit 3408f0b577
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 6 deletions

View File

@ -60,6 +60,11 @@ const (
robotIDKey = "robot_id"
)
func init() {
// keep only the latest created 5 scan all execution records
task.SetExecutionSweeperCount(job.ImageScanAllJob, 5)
}
// uuidGenerator is a func template which is for generating UUID.
type uuidGenerator func() (string, error)

View File

@ -24,13 +24,16 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task/dao"
)
var (
// ExecMgr is a global execution manager instance
ExecMgr = NewExecutionManager()
ExecMgr = NewExecutionManager()
executionSweeperCount = map[string]uint8{}
defaultExecutionSweeperCount uint8 = 50
)
// ExecutionManager manages executions.
@ -75,6 +78,7 @@ func NewExecutionManager() ExecutionManager {
executionDAO: dao.NewExecutionDAO(),
taskMgr: Mgr,
taskDAO: dao.NewTaskDAO(),
ormCreator: orm.Crt,
}
}
@ -82,6 +86,7 @@ type executionManager struct {
executionDAO dao.ExecutionDAO
taskMgr Manager
taskDAO dao.TaskDAO
ormCreator orm.Creator
}
func (e *executionManager) Count(ctx context.Context, query *q.Query) (int64, error) {
@ -109,7 +114,58 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
StartTime: now,
UpdateTime: now,
}
return e.executionDAO.Create(ctx, execution)
id, err := e.executionDAO.Create(ctx, execution)
if err != nil {
return 0, err
}
// sweep the execution records to avoid the execution/task records explosion
go func() {
ctx := orm.NewContext(context.Background(), e.ormCreator.Create())
if err := e.sweep(ctx, vendorType); err != nil {
log.Errorf("failed to sweep the executions of %s: %v", vendorType, err)
return
}
}()
return id, nil
}
func (e *executionManager) sweep(ctx context.Context, vendorType string) error {
count := executionSweeperCount[vendorType]
if count == 0 {
count = defaultExecutionSweeperCount
}
for {
// the function "List" of the execution manager returns the execution records
// ordered by start time. After the sorting is supported in query, we should
// specify the sorting explicitly
// the execution records in second page are always the candidates should to be swept
executions, err := e.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
},
PageNumber: 2,
PageSize: int64(count),
})
if err != nil {
return err
}
// no execution records need to be swept, return directly
if len(executions) == 0 {
return nil
}
for _, execution := range executions {
// if the status of the execution isn't final, skip
if !job.Status(execution.Status).Final() {
continue
}
if err = e.Delete(ctx, execution.ID); err != nil {
log.Errorf("failed to delete the execution %d: %v", execution.ID, err)
continue
}
}
}
}
func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
@ -297,3 +353,10 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
return exec
}
// SetExecutionSweeperCount sets the count of execution records retained by the sweeper
// If no count is set for the specified vendor, the default value will be used
// The sweeper retains the latest created #count execution records for the specified vendor
func SetExecutionSweeperCount(vendorType string, count uint8) {
executionSweeperCount[vendorType] = count
}

View File

@ -22,26 +22,30 @@ import (
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task/dao"
"github.com/goharbor/harbor/src/testing/lib/orm"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
type executionManagerTestSuite struct {
suite.Suite
execMgr *executionManager
taskMgr *mockTaskManager
execDAO *mockExecutionDAO
taskDAO *mockTaskDAO
execMgr *executionManager
taskMgr *mockTaskManager
execDAO *mockExecutionDAO
taskDAO *mockTaskDAO
ormCreator *orm.Creator
}
func (e *executionManagerTestSuite) SetupTest() {
e.taskMgr = &mockTaskManager{}
e.execDAO = &mockExecutionDAO{}
e.taskDAO = &mockTaskDAO{}
e.ormCreator = &orm.Creator{}
e.execMgr = &executionManager{
executionDAO: e.execDAO,
taskMgr: e.taskMgr,
taskDAO: e.taskDAO,
ormCreator: e.ormCreator,
}
}
@ -55,11 +59,16 @@ func (e *executionManagerTestSuite) TestCount() {
func (e *executionManagerTestSuite) TestCreate() {
e.execDAO.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
e.ormCreator.On("Create").Return(&orm.FakeOrmer{})
e.execDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
id, err := e.execMgr.Create(nil, "vendor", 0, ExecutionTriggerManual,
map[string]interface{}{"k": "v"})
e.Require().Nil(err)
e.Equal(int64(1), id)
// sleep to make sure the function in the goroutine run
time.Sleep(1 * time.Second)
e.execDAO.AssertExpectations(e.T())
e.ormCreator.AssertExpectations(e.T())
}
func (e *executionManagerTestSuite) TestMarkDone() {