Fix the consume too much CPU issue

1. Update execution status during the upgrade
2. Refine the execution sweeper

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2021-03-10 16:24:20 +08:00
parent 7694c131a4
commit 8b1817be0f
2 changed files with 47 additions and 20 deletions

View File

@ -0,0 +1,4 @@
/*fixes #14358*/
UPDATE execution SET status='Success' WHERE status='Succeed';
CREATE INDEX IF NOT EXISTS task_execution_id_idx ON task (execution_id);

View File

@ -136,32 +136,54 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
}
func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error {
count := executionSweeperCount[vendorType]
if count == 0 {
log.Debugf("the execution sweeper count doesn't set for %s, skip sweep", vendorType)
size := int64(executionSweeperCount[vendorType])
if size == 0 {
log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType)
return nil
}
for {
// the function "List" of the execution manager returns the execution records
// ordered by start time. After the sorting is supported in query, we should
// specify the sorting explicitly
// the execution records in second page are always the candidates should to be swept
executions, err := e.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
"VendorID": vendorID,
},
PageNumber: 2,
PageSize: int64(count),
})
// get the #size execution record
query := &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
"VendorID": vendorID,
},
Sorts: []*q.Sort{
{
Key: "StartTime",
DESC: true,
}},
PageSize: 1,
PageNumber: size,
}
executions, err := e.executionDAO.List(ctx, query)
if err != nil {
return err
}
// list is null means that the execution count < size, return directly
if len(executions) == 0 {
return nil
}
query.Keywords["StartTime"] = &q.Range{
Max: executions[0].StartTime,
}
totalOfCandidate, err := e.executionDAO.Count(ctx, query)
if err != nil {
return err
}
// n is the page count of all candidates
n := totalOfCandidate / 1000
if totalOfCandidate%1000 > 0 {
n = n + 1
}
query.PageSize = 1000
for i := n; i >= 1; i-- {
query.PageNumber = i
executions, err := e.List(ctx, query)
if err != nil {
return err
}
// no execution records need to be swept, return directly
if len(executions) == 0 {
return nil
}
for _, execution := range executions {
// if the status of the execution isn't final, skip
if !job.Status(execution.Status).Final() {
@ -176,6 +198,7 @@ func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorI
}
}
}
return nil
}
func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {