diff --git a/src/common/const.go b/src/common/const.go index 25ad6efad..0cf2894b9 100755 --- a/src/common/const.go +++ b/src/common/const.go @@ -224,4 +224,6 @@ const ( // UIMaxLengthLimitedOfNumber is the max length that UI limited for type number UIMaxLengthLimitedOfNumber = 10 + // ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status + ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds" ) diff --git a/src/core/main.go b/src/core/main.go index 10ef68483..b121e7b69 100755 --- a/src/core/main.go +++ b/src/core/main.go @@ -49,6 +49,7 @@ import ( _ "github.com/goharbor/harbor/src/lib/cache/memory" // memory cache _ "github.com/goharbor/harbor/src/lib/cache/redis" // redis cache "github.com/goharbor/harbor/src/lib/config" + "github.com/goharbor/harbor/src/lib/gtask" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/metric" "github.com/goharbor/harbor/src/lib/orm" @@ -207,6 +208,9 @@ func main() { health.RegisterHealthCheckers() registerScanners(orm.Context()) + // start global task pool, do not stop in the gracefulShutdown because it may take long time to finish. + gtask.DefaultPool().Start(ctx) + closing := make(chan struct{}) done := make(chan struct{}) go gracefulShutdown(closing, done, shutdownTracerProvider) diff --git a/src/lib/config/metadata/metadatalist.go b/src/lib/config/metadata/metadatalist.go index 612c181de..b0717f8b4 100644 --- a/src/lib/config/metadata/metadatalist.go +++ b/src/lib/config/metadata/metadatalist.go @@ -189,5 +189,7 @@ var ( {Name: common.ScannerSkipUpdatePullTime, Scope: UserScope, Group: BasicGroup, EnvKey: "SCANNER_SKIP_UPDATE_PULL_TIME", DefaultValue: "false", ItemType: &BoolType{}, Editable: false, Description: `The option to skip update pull time for scanner`}, {Name: common.SessionTimeout, Scope: UserScope, Group: BasicGroup, EnvKey: "SESSION_TIMEOUT", DefaultValue: "60", ItemType: &Int64Type{}, Editable: true, Description: `The session timeout in minutes`}, + + {Name: common.ExecutionStatusRefreshIntervalSeconds, Scope: SystemScope, Group: BasicGroup, EnvKey: "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS", DefaultValue: "30", ItemType: &Int64Type{}, Editable: false, Description: `The interval seconds to refresh the execution status`}, } ) diff --git a/src/lib/config/systemconfig.go b/src/lib/config/systemconfig.go index 727280523..b088388ea 100644 --- a/src/lib/config/systemconfig.go +++ b/src/lib/config/systemconfig.go @@ -141,6 +141,11 @@ func GetGCTimeWindow() int64 { return common.DefaultGCTimeWindowHours } +// GetExecutionStatusRefreshIntervalSeconds returns the interval seconds for the refresh of execution status. +func GetExecutionStatusRefreshIntervalSeconds() int64 { + return DefaultMgr().Get(backgroundCtx, common.ExecutionStatusRefreshIntervalSeconds).GetInt64() +} + // WithNotary returns a bool value to indicate if Harbor's deployed with Notary func WithNotary() bool { return DefaultMgr().Get(backgroundCtx, common.WithNotary).GetBool() diff --git a/src/lib/gtask/pool.go b/src/lib/gtask/pool.go new file mode 100644 index 000000000..588cfeb88 --- /dev/null +++ b/src/lib/gtask/pool.go @@ -0,0 +1,97 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gtask + +import ( + "context" + "sync" + "time" +) + +func DefaultPool() *Pool { + return pool +} + +var ( + // pool is the global task pool. + pool = NewPool() +) + +type taskFunc func(ctx context.Context) + +// Pool is the task pool for managing some async jobs. +type Pool struct { + stopCh chan struct{} + wg sync.WaitGroup + lock sync.Mutex + tasks []*task +} + +func NewPool() *Pool { + return &Pool{ + stopCh: make(chan struct{}), + } +} + +type task struct { + fn taskFunc + interval time.Duration +} + +func (p *Pool) AddTask(fn taskFunc, interval time.Duration) { + t := &task{ + fn: fn, + interval: interval, + } + + p.lock.Lock() + defer p.lock.Unlock() + p.tasks = append(p.tasks, t) +} + +func (p *Pool) Start(ctx context.Context) { + p.lock.Lock() + defer p.lock.Unlock() + + for _, task := range p.tasks { + p.wg.Add(1) + go p.doTask(ctx, task) + } +} + +func (p *Pool) doTask(ctx context.Context, task *task) { + defer p.wg.Done() + for { + select { + // wait for stop signal + case <-ctx.Done(): + return + case <-p.stopCh: + return + default: + task.fn(ctx) + // interval is 0 means it's a one time job, return directly + if task.interval == 0 { + return + } + time.Sleep(task.interval) + } + } +} + +func (p *Pool) Stop() { + close(p.stopCh) + p.wg.Wait() +} diff --git a/src/lib/gtask/pool_test.go b/src/lib/gtask/pool_test.go new file mode 100644 index 000000000..be9b857c2 --- /dev/null +++ b/src/lib/gtask/pool_test.go @@ -0,0 +1,103 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gtask + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestAddTask(t *testing.T) { + pool := NewPool() + + taskNum := 3 + taskInterval := time.Duration(0) + for i := 0; i < taskNum; i++ { + fn := func(ctx context.Context) { + t.Logf("Task %d is running...", i) + } + + pool.AddTask(fn, taskInterval) + } + + if len(pool.tasks) != taskNum { + t.Errorf("Expected %d tasks but found %d", taskNum, len(pool.tasks)) + } +} + +func TestStartAndStop(t *testing.T) { + // test normal case + { + pool := NewPool() + // create channel with buffer + ch1 := make(chan struct{}, 5) + ch2 := make(chan struct{}, 5) + // test one-time job + t1 := &task{ + interval: 0, + fn: func(ctx context.Context) { + ch1 <- struct{}{} + }, + } + // test interval job + t2 := &task{ + interval: 100 * time.Millisecond, + fn: func(ctx context.Context) { + ch2 <- struct{}{} + }, + } + + pool.tasks = []*task{t1, t2} + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + pool.Start(ctx1) + + // Let it run for a bit + time.Sleep(300 * time.Millisecond) + // ch1 should only have one element as it's a one time job + assert.Equal(t, 1, len(ch1)) + // ch2 should have elements over 2 as sleep 300ms and interval is 100ms + assert.Greater(t, len(ch2), 2) + pool.Stop() + close(ch1) + close(ch2) + } + + // test context timeout case + { + pool := NewPool() + ch1 := make(chan struct{}, 2) + t1 := &task{ + interval: 100 * time.Millisecond, + fn: func(ctx context.Context) { + ch1 <- struct{}{} + }, + } + + pool.tasks = []*task{t1} + ctx1, cancel1 := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel1() + pool.Start(ctx1) + // Let it run for a bit + time.Sleep(200 * time.Millisecond) + assert.Equal(t, 1, len(ch1)) + pool.Stop() + close(ch1) + } +} diff --git a/src/lib/shuffle.go b/src/lib/shuffle.go new file mode 100644 index 000000000..625133372 --- /dev/null +++ b/src/lib/shuffle.go @@ -0,0 +1,28 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lib + +import ( + "math/rand" + "time" +) + +// ShuffleStringSlice shuffles the string slice in place. +func ShuffleStringSlice(slice []string) { + rd := rand.New(rand.NewSource(time.Now().UnixNano())) + rd.Shuffle(len(slice), func(i, j int) { + slice[i], slice[j] = slice[j], slice[i] + }) +} diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index b1c6dd768..caadac476 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -17,16 +17,50 @@ package dao import ( "context" "fmt" + "regexp" + "strconv" "strings" "time" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/lib" + "github.com/goharbor/harbor/src/lib/cache" + "github.com/goharbor/harbor/src/lib/config" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/gtask" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" + + // init the db config + _ "github.com/goharbor/harbor/src/pkg/config/db" ) +func init() { + // register the execution status refresh task if enable the async update + if interval := config.GetExecutionStatusRefreshIntervalSeconds(); interval > 0 { + gtask.DefaultPool().AddTask(scanAndRefreshOutdateStatus, time.Duration(interval)*time.Second) + } +} + +func RegisterExecutionStatusChangePostFunc(vendor string, fc ExecutionStatusChangePostFunc) { + executionStatusChangePostFuncRegistry[vendor] = fc +} + +var ( + // ExecDAO is the global execution dao + ExecDAO = NewExecutionDAO() + executionStatusChangePostFuncRegistry = map[string]ExecutionStatusChangePostFunc{} + // execStatusOutdateKeyRegex is the regex for the execution status outdate key, + // the regex used to parse exec id and vendor type from the key. + // e.g. execution:id:100:vendor:REPLICATION:status_outdate + execStatusOutdateKeyRegex = regexp.MustCompile(`execution:id:(\d+):vendor:([A-Z_]+):status_outdate`) +) + +// ExecutionStatusChangePostFunc is the function called after the execution status changed +type ExecutionStatusChangePostFunc func(ctx context.Context, executionID int64, status string) (err error) + // ExecutionDAO is the data access object interface for execution type ExecutionDAO interface { // Count returns the total count of executions according to the query @@ -50,6 +84,9 @@ type ExecutionDAO interface { // If the status is changed, the returning "statusChanged" is set as "true" and the current status indicates // the changed status RefreshStatus(ctx context.Context, id int64) (statusChanged bool, currentStatus string, err error) + // AsyncRefreshStatus refreshes the status of the specified execution in the async mode, which will register + // a update flag in the redis and then wait for global periodic job to scan and update the status to db finally. + AsyncRefreshStatus(ctx context.Context, id int64, vendor string) (err error) } // NewExecutionDAO returns an instance of ExecutionDAO @@ -376,3 +413,88 @@ func buildInClauseSQLForExtraAttrs(keys []string) string { return fmt.Sprintf("select id from execution where extra_attrs->%s->>?=?", s) } } + +func buildExecStatusOutdateKey(id int64, vendor string) string { + return fmt.Sprintf("execution:id:%d:vendor:%s:status_outdate", id, vendor) +} + +func extractExecIDVendorFromKey(key string) (int64, string, error) { + // input: execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate + // output: [execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate 100 GARBAGE_COLLECTION] + matches := execStatusOutdateKeyRegex.FindStringSubmatch(key) + if len(matches) < 3 { + return 0, "", errors.Errorf("invalid format: %s", key) + } + + id, err := strconv.ParseInt(matches[1], 10, 64) + if err != nil { + return 0, matches[2], err + } + + return id, matches[2], nil +} + +func (e *executionDAO) AsyncRefreshStatus(ctx context.Context, id int64, vendor string) (err error) { + key := buildExecStatusOutdateKey(id, vendor) + if cache.Default().Contains(ctx, key) { + // return earlier if already have the key + return nil + } + // save the key to redis, the value is useless so set it to empty + return cache.Default().Save(ctx, key, "") +} + +// scanAndRefreshOutdateStatus scans the outdate execution status from redis and then refresh the status to db, +// do not want to expose to external use so keep it as private. +func scanAndRefreshOutdateStatus(ctx context.Context) { + keys, err := cache.Default().Keys(ctx, "execution:id:") + if err != nil { + log.Errorf("failed to scan the outdate executions, error: %v", err) + return + } + // return earlier if no keys found which represents no outdate execution + if len(keys) == 0 { + log.Debug("skip to refresh, no outdate execution status found") + return + } + // TODO: refactor + // shuffle the keys to avoid the conflict and improve efficiency when multiple core instance existed, + // but currently if multiple instances get the same set of keys at the same time, then eventually everyone + // will still need to repeat the same work(refresh same execution), which needs to be optimized later. + lib.ShuffleStringSlice(keys) + + log.Infof("scanned out %d executions with outdate status, refresh status to db", len(keys)) + var succeed, failed int64 + // update the execution status execution to db + for _, key := range keys { + execID, vendor, err := extractExecIDVendorFromKey(key) + if err != nil { + log.Errorf("failed to extract execution id from key %s, error: %v", key, err) + failed++ + continue + } + + statusChanged, currentStatus, err := ExecDAO.RefreshStatus(ctx, execID) + if err != nil { + log.Errorf("failed to refresh the status of execution %d, error: %v", execID, err) + failed++ + continue + } + + succeed++ + log.Debugf("refresh the status of execution %d successfully, new status: %s", execID, currentStatus) + // run the status change post function + // just print error log, not return error for post action + if fc, exist := executionStatusChangePostFuncRegistry[vendor]; exist && statusChanged { + if err = fc(ctx, execID, currentStatus); err != nil { + logger.Errorf("failed to run the execution status change post function for execution %d, error: %v", execID, err) + } + } + // delete the key from redis, it does not matter if the deletion fails, wait for the next round. + if err = cache.Default().Delete(ctx, key); err != nil { + log.Errorf("failed to delete the key %s in cache, error: %v", key, err) + } + } + + log.Infof("refresh outdate execution status done, %d succeed, %d failed", succeed, failed) +} diff --git a/src/pkg/task/dao/execution_test.go b/src/pkg/task/dao/execution_test.go index 5e3fd4282..5490615f6 100644 --- a/src/pkg/task/dao/execution_test.go +++ b/src/pkg/task/dao/execution_test.go @@ -19,13 +19,14 @@ import ( "testing" "time" - "github.com/stretchr/testify/suite" - "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/cache" + _ "github.com/goharbor/harbor/src/lib/cache/memory" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" + "github.com/stretchr/testify/suite" ) type executionDAOTestSuite struct { @@ -43,6 +44,9 @@ func (e *executionDAOTestSuite) SetupSuite() { e.executionDAO = &executionDAO{ taskDAO: e.taskDao, } + // initializes cache for testing + err := cache.Initialize(cache.Memory, "") + e.NoError(err) } func (e *executionDAOTestSuite) SetupTest() { @@ -327,6 +331,65 @@ func (e *executionDAOTestSuite) TestRefreshStatus() { e.Empty(execution.EndTime) } +func (e *executionDAOTestSuite) TestAsyncRefreshStatus() { + err := e.executionDAO.AsyncRefreshStatus(e.ctx, e.executionID, "GC") + e.NoError(err) + defer cache.Default().Delete(e.ctx, buildExecStatusOutdateKey(e.executionID, "GC")) + e.True(cache.Default().Contains(e.ctx, buildExecStatusOutdateKey(e.executionID, "GC"))) +} + +func (e *executionDAOTestSuite) TestScanAndRefreshOutdateStatus() { + // create execution1 with 1 running task + id1, err := e.executionDAO.Create(e.ctx, &Execution{ + VendorType: "test1", + Trigger: "test", + ExtraAttrs: `{"key":"value"}`, + }) + e.NoError(err) + defer e.executionDAO.Delete(e.ctx, id1) + + tid1, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: id1, + Status: job.RunningStatus.String(), + StatusCode: job.RunningStatus.Code(), + ExtraAttrs: `{}`, + }) + e.NoError(err) + defer e.taskDao.Delete(e.ctx, tid1) + + // create execution1 with 1 error task + id2, err := e.executionDAO.Create(e.ctx, &Execution{ + VendorType: "test2", + Trigger: "test", + ExtraAttrs: `{"key":"value"}`, + }) + e.NoError(err) + defer e.executionDAO.Delete(e.ctx, id2) + + tid2, err := e.taskDao.Create(e.ctx, &Task{ + ExecutionID: id2, + Status: job.ErrorStatus.String(), + StatusCode: job.ErrorStatus.Code(), + ExtraAttrs: `{}`, + }) + e.NoError(err) + defer e.taskDao.Delete(e.ctx, tid2) + + // async refresh the status + err = e.executionDAO.AsyncRefreshStatus(e.ctx, id1, "GC") + e.NoError(err) + err = e.executionDAO.AsyncRefreshStatus(e.ctx, id2, "GC") + e.NoError(err) + // test scan out and refresh + scanAndRefreshOutdateStatus(e.ctx) + exec1, err := e.executionDAO.Get(e.ctx, id1) + e.NoError(err) + e.Equal(job.RunningStatus.String(), exec1.Status) + exec2, err := e.executionDAO.Get(e.ctx, id2) + e.NoError(err) + e.Equal(job.ErrorStatus.String(), exec2.Status) +} + func TestExecutionDAOSuite(t *testing.T) { suite.Run(t, &executionDAOTestSuite{}) } @@ -353,3 +416,36 @@ func Test_buildInClauseSQLForExtraAttrs(t *testing.T) { }) } } + +func Test_extractExecIDVendorFromKey(t *testing.T) { + type args struct { + key string + } + tests := []struct { + name string + args args + wantID int64 + wantVendor string + wantErr bool + }{ + {"invalid format", args{"invalid:foo:bar"}, 0, "", true}, + {"invalid execution id", args{"execution:id:12abc:vendor:GC:status_outdate"}, 0, "", true}, + {"invalid vendor type", args{"execution:id:100:vendor:foo:status_outdate"}, 0, "", true}, + {"valid", args{"execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate"}, 100, "GARBAGE_COLLECTION", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, err := extractExecIDVendorFromKey(tt.args.key) + if (err != nil) != tt.wantErr { + t.Errorf("extractExecIDVendorFromKey() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.wantID { + t.Errorf("extractExecIDVendorFromKey() got = %v, want %v", got, tt.wantID) + } + if got1 != tt.wantVendor { + t.Errorf("extractExecIDVendorFromKey() got1 = %v, want %v", got1, tt.wantVendor) + } + }) + } +} diff --git a/src/pkg/task/hook.go b/src/pkg/task/hook.go index 4f752e47a..c49de73da 100644 --- a/src/pkg/task/hook.go +++ b/src/pkg/task/hook.go @@ -17,8 +17,10 @@ package task import ( "context" "fmt" + "sync" "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/lib/config" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/q" @@ -28,6 +30,8 @@ import ( var ( // HkHandler is a global instance of the HookHandler HkHandler = NewHookHandler() + // once do the one time work + once sync.Once ) // NewHookHandler creates a hook handler instance @@ -44,10 +48,20 @@ type HookHandler struct { executionDAO dao.ExecutionDAO } +func (h *HookHandler) init() { + // register the post actions to execution dao + once.Do(func() { + for vendor, fc := range executionStatusChangePostFuncRegistry { + dao.RegisterExecutionStatusChangePostFunc(vendor, dao.ExecutionStatusChangePostFunc(fc)) + } + }) +} + // Handle the job status changing webhook func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error { logger := log.GetLogger(ctx) + h.init() jobID := sc.JobID // the "JobID" field of some kinds of jobs are set as "87bbdee19bed5ce09c48a149@1605104520" which contains "@". // In this case, read the parent periodical job ID from "sc.Metadata.UpstreamJobID" @@ -93,17 +107,22 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error { logger.Errorf("failed to run the task status change post function for task %d: %v", task.ID, err) } } - - // update execution status - statusChanged, currentStatus, err := h.executionDAO.RefreshStatus(ctx, task.ExecutionID) - if err != nil { - return err - } - // run the status change post function - if fc, exist := executionStatusChangePostFuncRegistry[execution.VendorType]; exist && statusChanged { - if err = fc(ctx, task.ExecutionID, currentStatus); err != nil { - logger.Errorf("failed to run the execution status change post function for execution %d: %v", task.ExecutionID, err) + // execution status refresh interval <= 0 means update the status immediately + if config.GetExecutionStatusRefreshIntervalSeconds() <= 0 { + // update execution status immediately which may have optimistic lock + statusChanged, currentStatus, err := h.executionDAO.RefreshStatus(ctx, task.ExecutionID) + if err != nil { + return err } + // run the status change post function + if fc, exist := executionStatusChangePostFuncRegistry[execution.VendorType]; exist && statusChanged { + if err = fc(ctx, task.ExecutionID, currentStatus); err != nil { + logger.Errorf("failed to run the execution status change post function for execution %d: %v", task.ExecutionID, err) + } + } + + return nil } - return nil + // by default, the execution status is updated in asynchronous mode + return h.executionDAO.AsyncRefreshStatus(ctx, task.ExecutionID, task.VendorType) } diff --git a/src/pkg/task/hook_test.go b/src/pkg/task/hook_test.go index a8550cc35..fc4f5f794 100644 --- a/src/pkg/task/hook_test.go +++ b/src/pkg/task/hook_test.go @@ -80,17 +80,21 @@ func (h *hookHandlerTestSuite) TestHandle() { ID: 1, VendorType: "test", }, nil) - h.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(true, job.RunningStatus.String(), nil) - sc = &job.StatusChange{ - Status: job.SuccessStatus.String(), - Metadata: &job.StatsInfo{ - Revision: time.Now().Unix(), - }, + + // test update status non-immediately when receive the hook + { + h.execDAO.On("AsyncRefreshStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil) + sc = &job.StatusChange{ + Status: job.SuccessStatus.String(), + Metadata: &job.StatsInfo{ + Revision: time.Now().Unix(), + }, + } + err = h.handler.Handle(nil, sc) + h.Require().Nil(err) + h.taskDAO.AssertExpectations(h.T()) + h.execDAO.AssertExpectations(h.T()) } - err = h.handler.Handle(nil, sc) - h.Require().Nil(err) - h.taskDAO.AssertExpectations(h.T()) - h.execDAO.AssertExpectations(h.T()) } func TestHookHandlerTestSuite(t *testing.T) { diff --git a/src/pkg/task/mock_execution_dao_test.go b/src/pkg/task/mock_execution_dao_test.go index 6f2421455..a8f64085f 100644 --- a/src/pkg/task/mock_execution_dao_test.go +++ b/src/pkg/task/mock_execution_dao_test.go @@ -16,6 +16,20 @@ type mockExecutionDAO struct { mock.Mock } +// AsyncRefreshStatus provides a mock function with given fields: ctx, id, vendor +func (_m *mockExecutionDAO) AsyncRefreshStatus(ctx context.Context, id int64, vendor string) error { + ret := _m.Called(ctx, id, vendor) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string) error); ok { + r0 = rf(ctx, id, vendor) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Count provides a mock function with given fields: ctx, query func (_m *mockExecutionDAO) Count(ctx context.Context, query *q.Query) (int64, error) { ret := _m.Called(ctx, query) diff --git a/tests/apitests/python/test_log_rotation.py b/tests/apitests/python/test_log_rotation.py index 83dd7bec0..6a4fefb82 100644 --- a/tests/apitests/python/test_log_rotation.py +++ b/tests/apitests/python/test_log_rotation.py @@ -44,6 +44,8 @@ class TestLogRotation(unittest.TestCase, object): latest_job = self.purge.get_latest_purge_job() self.purge.stop_purge_execution(latest_job.id) # 3. Verify purge audit log job status is Stopped + # wait more 5s for status update after stop + time.sleep(5) job_status = self.purge.get_purge_job(latest_job.id).job_status self.assertEqual(self.purge.get_purge_job(latest_job.id).job_status, "Stopped") # 4. Create a purge audit log job diff --git a/tests/ci/api_common_install.sh b/tests/ci/api_common_install.sh index 8684ed4f9..5cfacc15d 100755 --- a/tests/ci/api_common_install.sh +++ b/tests/ci/api_common_install.sh @@ -59,6 +59,7 @@ sudo make compile build prepare COMPILETAG=compile_golangimage GOBUILDTAGS="incl # set the debugging env echo "GC_TIME_WINDOW_HOURS=0" | sudo tee -a ./make/common/config/core/env +echo "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS=5" | sudo tee -a ./make/common/config/core/env sudo make start # waiting 5 minutes to start