From 8b1817be0f189db4b1e1cf3d31a1e2a1724ed1c4 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 10 Mar 2021 16:24:20 +0800 Subject: [PATCH] Fix the consume too much CPU issue 1. Update execution status during the upgrade 2. Refine the execution sweeper Signed-off-by: Wenkai Yin --- .../postgresql/0051_2.2.1_schema.up.sql | 4 ++ src/pkg/task/execution.go | 63 +++++++++++++------ 2 files changed, 47 insertions(+), 20 deletions(-) create mode 100644 make/migrations/postgresql/0051_2.2.1_schema.up.sql diff --git a/make/migrations/postgresql/0051_2.2.1_schema.up.sql b/make/migrations/postgresql/0051_2.2.1_schema.up.sql new file mode 100644 index 000000000..425da7fd9 --- /dev/null +++ b/make/migrations/postgresql/0051_2.2.1_schema.up.sql @@ -0,0 +1,4 @@ +/*fixes #14358*/ +UPDATE execution SET status='Success' WHERE status='Succeed'; + +CREATE INDEX IF NOT EXISTS task_execution_id_idx ON task (execution_id); \ No newline at end of file diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index 24a6a7df7..f83c7762e 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -136,32 +136,54 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor } func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error { - count := executionSweeperCount[vendorType] - if count == 0 { - log.Debugf("the execution sweeper count doesn't set for %s, skip sweep", vendorType) + size := int64(executionSweeperCount[vendorType]) + if size == 0 { + log.Debugf("the execution sweeper size doesn't set for %s, skip sweep", vendorType) return nil } - for { - // the function "List" of the execution manager returns the execution records - // ordered by start time. After the sorting is supported in query, we should - // specify the sorting explicitly - // the execution records in second page are always the candidates should to be swept - executions, err := e.List(ctx, &q.Query{ - Keywords: map[string]interface{}{ - "VendorType": vendorType, - "VendorID": vendorID, - }, - PageNumber: 2, - PageSize: int64(count), - }) + // get the #size execution record + query := &q.Query{ + Keywords: map[string]interface{}{ + "VendorType": vendorType, + "VendorID": vendorID, + }, + Sorts: []*q.Sort{ + { + Key: "StartTime", + DESC: true, + }}, + PageSize: 1, + PageNumber: size, + } + executions, err := e.executionDAO.List(ctx, query) + if err != nil { + return err + } + // list is null means that the execution count < size, return directly + if len(executions) == 0 { + return nil + } + + query.Keywords["StartTime"] = &q.Range{ + Max: executions[0].StartTime, + } + totalOfCandidate, err := e.executionDAO.Count(ctx, query) + if err != nil { + return err + } + // n is the page count of all candidates + n := totalOfCandidate / 1000 + if totalOfCandidate%1000 > 0 { + n = n + 1 + } + query.PageSize = 1000 + for i := n; i >= 1; i-- { + query.PageNumber = i + executions, err := e.List(ctx, query) if err != nil { return err } - // no execution records need to be swept, return directly - if len(executions) == 0 { - return nil - } for _, execution := range executions { // if the status of the execution isn't final, skip if !job.Status(execution.Status).Final() { @@ -176,6 +198,7 @@ func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorI } } } + return nil } func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {