Merge pull request #14012 from ywk253100/210113_replication

Query executions with both vendor type and ID when sweep the execution records
This commit is contained in:
Wenkai Yin(尹文开) 2021-01-18 11:15:31 +08:00 committed by GitHub
commit c3b986cbcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 15 deletions

View File

@ -116,12 +116,22 @@ func (c *controller) Stop(ctx context.Context, id int64) error {
}
func (c *controller) ExecutionCount(ctx context.Context, query *q.Query) (int64, error) {
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Replication
return c.execMgr.Count(ctx, query)
return c.execMgr.Count(ctx, c.buildExecutionQuery(query))
}
func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Execution, error) {
execs, err := c.execMgr.List(ctx, c.buildExecutionQuery(query))
if err != nil {
return nil, err
}
var executions []*Execution
for _, exec := range execs {
executions = append(executions, convertExecution(exec))
}
return executions, nil
}
func (c *controller) buildExecutionQuery(query *q.Query) *q.Query {
// as the following logic may change the content of the query, clone it first
query = q.MustClone(query)
query.Keywords["VendorType"] = job.Replication
@ -134,16 +144,7 @@ func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Exe
query.Keywords["VendorID"] = value
delete(query.Keywords, "policy_id")
}
execs, err := c.execMgr.List(ctx, query)
if err != nil {
return nil, err
}
var executions []*Execution
for _, exec := range execs {
executions = append(executions, convertExecution(exec))
}
return executions, nil
return query
}
func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, error) {

View File

@ -123,8 +123,11 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
// sweep the execution records to avoid the execution/task records explosion
go func() {
// as we start a new transaction here to do the sweep work, the current execution record
// may be not visible(when the transaction in which the current execution is created
// in isn't committed), this will cause that there are one more execution records than expected
ctx := orm.NewContext(context.Background(), e.ormCreator.Create())
if err := e.sweep(ctx, vendorType); err != nil {
if err := e.sweep(ctx, vendorType, vendorID); err != nil {
log.Errorf("failed to sweep the executions of %s: %v", vendorType, err)
return
}
@ -133,7 +136,7 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
return id, nil
}
func (e *executionManager) sweep(ctx context.Context, vendorType string) error {
func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error {
count := executionSweeperCount[vendorType]
if count == 0 {
count = defaultExecutionSweeperCount
@ -146,6 +149,7 @@ func (e *executionManager) sweep(ctx context.Context, vendorType string) error {
executions, err := e.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
"VendorID": vendorID,
},
PageNumber: 2,
PageSize: int64(count),