diff --git a/src/controller/replication/controller.go b/src/controller/replication/controller.go index 44ecca11e..30be1d17e 100644 --- a/src/controller/replication/controller.go +++ b/src/controller/replication/controller.go @@ -116,12 +116,22 @@ func (c *controller) Stop(ctx context.Context, id int64) error { } func (c *controller) ExecutionCount(ctx context.Context, query *q.Query) (int64, error) { - query = q.MustClone(query) - query.Keywords["VendorType"] = job.Replication - return c.execMgr.Count(ctx, query) + return c.execMgr.Count(ctx, c.buildExecutionQuery(query)) } func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Execution, error) { + execs, err := c.execMgr.List(ctx, c.buildExecutionQuery(query)) + if err != nil { + return nil, err + } + var executions []*Execution + for _, exec := range execs { + executions = append(executions, convertExecution(exec)) + } + return executions, nil +} + +func (c *controller) buildExecutionQuery(query *q.Query) *q.Query { // as the following logic may change the content of the query, clone it first query = q.MustClone(query) query.Keywords["VendorType"] = job.Replication @@ -134,16 +144,7 @@ func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Exe query.Keywords["VendorID"] = value delete(query.Keywords, "policy_id") } - - execs, err := c.execMgr.List(ctx, query) - if err != nil { - return nil, err - } - var executions []*Execution - for _, exec := range execs { - executions = append(executions, convertExecution(exec)) - } - return executions, nil + return query } func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, error) { diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index ef37586e7..39b9ee4e5 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -123,8 +123,11 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor // sweep the execution records to avoid the execution/task records explosion go func() { + // as we start a new transaction here to do the sweep work, the current execution record + // may be not visible(when the transaction in which the current execution is created + // in isn't committed), this will cause that there are one more execution records than expected ctx := orm.NewContext(context.Background(), e.ormCreator.Create()) - if err := e.sweep(ctx, vendorType); err != nil { + if err := e.sweep(ctx, vendorType, vendorID); err != nil { log.Errorf("failed to sweep the executions of %s: %v", vendorType, err) return } @@ -133,7 +136,7 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor return id, nil } -func (e *executionManager) sweep(ctx context.Context, vendorType string) error { +func (e *executionManager) sweep(ctx context.Context, vendorType string, vendorID int64) error { count := executionSweeperCount[vendorType] if count == 0 { count = defaultExecutionSweeperCount @@ -146,6 +149,7 @@ func (e *executionManager) sweep(ctx context.Context, vendorType string) error { executions, err := e.List(ctx, &q.Query{ Keywords: map[string]interface{}{ "VendorType": vendorType, + "VendorID": vendorID, }, PageNumber: 2, PageSize: int64(count),