mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-23 09:08:26 +01:00
refactor: refresh the execution status in async mode (#18434)
Refactor the logic for updating the status of execution when receiving the hook from jobservice, avoid the optimistic lock due to the multiple tasks update one execution by refreshing the status asynchronously. But still retain the old way by specifying the flag from ENV. Fixes: #17584 Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
parent
1e884c58c2
commit
62bb56daf6
@ -224,4 +224,6 @@ const (
|
||||
|
||||
// UIMaxLengthLimitedOfNumber is the max length that UI limited for type number
|
||||
UIMaxLengthLimitedOfNumber = 10
|
||||
// ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status
|
||||
ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds"
|
||||
)
|
||||
|
@ -49,6 +49,7 @@ import (
|
||||
_ "github.com/goharbor/harbor/src/lib/cache/memory" // memory cache
|
||||
_ "github.com/goharbor/harbor/src/lib/cache/redis" // redis cache
|
||||
"github.com/goharbor/harbor/src/lib/config"
|
||||
"github.com/goharbor/harbor/src/lib/gtask"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/metric"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
@ -207,6 +208,9 @@ func main() {
|
||||
health.RegisterHealthCheckers()
|
||||
registerScanners(orm.Context())
|
||||
|
||||
// start global task pool, do not stop in the gracefulShutdown because it may take long time to finish.
|
||||
gtask.DefaultPool().Start(ctx)
|
||||
|
||||
closing := make(chan struct{})
|
||||
done := make(chan struct{})
|
||||
go gracefulShutdown(closing, done, shutdownTracerProvider)
|
||||
|
@ -189,5 +189,7 @@ var (
|
||||
{Name: common.ScannerSkipUpdatePullTime, Scope: UserScope, Group: BasicGroup, EnvKey: "SCANNER_SKIP_UPDATE_PULL_TIME", DefaultValue: "false", ItemType: &BoolType{}, Editable: false, Description: `The option to skip update pull time for scanner`},
|
||||
|
||||
{Name: common.SessionTimeout, Scope: UserScope, Group: BasicGroup, EnvKey: "SESSION_TIMEOUT", DefaultValue: "60", ItemType: &Int64Type{}, Editable: true, Description: `The session timeout in minutes`},
|
||||
|
||||
{Name: common.ExecutionStatusRefreshIntervalSeconds, Scope: SystemScope, Group: BasicGroup, EnvKey: "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS", DefaultValue: "30", ItemType: &Int64Type{}, Editable: false, Description: `The interval seconds to refresh the execution status`},
|
||||
}
|
||||
)
|
||||
|
@ -141,6 +141,11 @@ func GetGCTimeWindow() int64 {
|
||||
return common.DefaultGCTimeWindowHours
|
||||
}
|
||||
|
||||
// GetExecutionStatusRefreshIntervalSeconds returns the interval seconds for the refresh of execution status.
|
||||
func GetExecutionStatusRefreshIntervalSeconds() int64 {
|
||||
return DefaultMgr().Get(backgroundCtx, common.ExecutionStatusRefreshIntervalSeconds).GetInt64()
|
||||
}
|
||||
|
||||
// WithNotary returns a bool value to indicate if Harbor's deployed with Notary
|
||||
func WithNotary() bool {
|
||||
return DefaultMgr().Get(backgroundCtx, common.WithNotary).GetBool()
|
||||
|
97
src/lib/gtask/pool.go
Normal file
97
src/lib/gtask/pool.go
Normal file
@ -0,0 +1,97 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gtask
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func DefaultPool() *Pool {
|
||||
return pool
|
||||
}
|
||||
|
||||
var (
|
||||
// pool is the global task pool.
|
||||
pool = NewPool()
|
||||
)
|
||||
|
||||
type taskFunc func(ctx context.Context)
|
||||
|
||||
// Pool is the task pool for managing some async jobs.
|
||||
type Pool struct {
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
lock sync.Mutex
|
||||
tasks []*task
|
||||
}
|
||||
|
||||
func NewPool() *Pool {
|
||||
return &Pool{
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type task struct {
|
||||
fn taskFunc
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func (p *Pool) AddTask(fn taskFunc, interval time.Duration) {
|
||||
t := &task{
|
||||
fn: fn,
|
||||
interval: interval,
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.tasks = append(p.tasks, t)
|
||||
}
|
||||
|
||||
func (p *Pool) Start(ctx context.Context) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
for _, task := range p.tasks {
|
||||
p.wg.Add(1)
|
||||
go p.doTask(ctx, task)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) doTask(ctx context.Context, task *task) {
|
||||
defer p.wg.Done()
|
||||
for {
|
||||
select {
|
||||
// wait for stop signal
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-p.stopCh:
|
||||
return
|
||||
default:
|
||||
task.fn(ctx)
|
||||
// interval is 0 means it's a one time job, return directly
|
||||
if task.interval == 0 {
|
||||
return
|
||||
}
|
||||
time.Sleep(task.interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Stop() {
|
||||
close(p.stopCh)
|
||||
p.wg.Wait()
|
||||
}
|
103
src/lib/gtask/pool_test.go
Normal file
103
src/lib/gtask/pool_test.go
Normal file
@ -0,0 +1,103 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gtask
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAddTask(t *testing.T) {
|
||||
pool := NewPool()
|
||||
|
||||
taskNum := 3
|
||||
taskInterval := time.Duration(0)
|
||||
for i := 0; i < taskNum; i++ {
|
||||
fn := func(ctx context.Context) {
|
||||
t.Logf("Task %d is running...", i)
|
||||
}
|
||||
|
||||
pool.AddTask(fn, taskInterval)
|
||||
}
|
||||
|
||||
if len(pool.tasks) != taskNum {
|
||||
t.Errorf("Expected %d tasks but found %d", taskNum, len(pool.tasks))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartAndStop(t *testing.T) {
|
||||
// test normal case
|
||||
{
|
||||
pool := NewPool()
|
||||
// create channel with buffer
|
||||
ch1 := make(chan struct{}, 5)
|
||||
ch2 := make(chan struct{}, 5)
|
||||
// test one-time job
|
||||
t1 := &task{
|
||||
interval: 0,
|
||||
fn: func(ctx context.Context) {
|
||||
ch1 <- struct{}{}
|
||||
},
|
||||
}
|
||||
// test interval job
|
||||
t2 := &task{
|
||||
interval: 100 * time.Millisecond,
|
||||
fn: func(ctx context.Context) {
|
||||
ch2 <- struct{}{}
|
||||
},
|
||||
}
|
||||
|
||||
pool.tasks = []*task{t1, t2}
|
||||
|
||||
ctx1, cancel1 := context.WithCancel(context.Background())
|
||||
defer cancel1()
|
||||
pool.Start(ctx1)
|
||||
|
||||
// Let it run for a bit
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
// ch1 should only have one element as it's a one time job
|
||||
assert.Equal(t, 1, len(ch1))
|
||||
// ch2 should have elements over 2 as sleep 300ms and interval is 100ms
|
||||
assert.Greater(t, len(ch2), 2)
|
||||
pool.Stop()
|
||||
close(ch1)
|
||||
close(ch2)
|
||||
}
|
||||
|
||||
// test context timeout case
|
||||
{
|
||||
pool := NewPool()
|
||||
ch1 := make(chan struct{}, 2)
|
||||
t1 := &task{
|
||||
interval: 100 * time.Millisecond,
|
||||
fn: func(ctx context.Context) {
|
||||
ch1 <- struct{}{}
|
||||
},
|
||||
}
|
||||
|
||||
pool.tasks = []*task{t1}
|
||||
ctx1, cancel1 := context.WithTimeout(context.Background(), 50*time.Millisecond)
|
||||
defer cancel1()
|
||||
pool.Start(ctx1)
|
||||
// Let it run for a bit
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
assert.Equal(t, 1, len(ch1))
|
||||
pool.Stop()
|
||||
close(ch1)
|
||||
}
|
||||
}
|
28
src/lib/shuffle.go
Normal file
28
src/lib/shuffle.go
Normal file
@ -0,0 +1,28 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package lib
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ShuffleStringSlice shuffles the string slice in place.
|
||||
func ShuffleStringSlice(slice []string) {
|
||||
rd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
rd.Shuffle(len(slice), func(i, j int) {
|
||||
slice[i], slice[j] = slice[j], slice[i]
|
||||
})
|
||||
}
|
@ -17,16 +17,50 @@ package dao
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/lib"
|
||||
"github.com/goharbor/harbor/src/lib/cache"
|
||||
"github.com/goharbor/harbor/src/lib/config"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/gtask"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
|
||||
// init the db config
|
||||
_ "github.com/goharbor/harbor/src/pkg/config/db"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// register the execution status refresh task if enable the async update
|
||||
if interval := config.GetExecutionStatusRefreshIntervalSeconds(); interval > 0 {
|
||||
gtask.DefaultPool().AddTask(scanAndRefreshOutdateStatus, time.Duration(interval)*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterExecutionStatusChangePostFunc(vendor string, fc ExecutionStatusChangePostFunc) {
|
||||
executionStatusChangePostFuncRegistry[vendor] = fc
|
||||
}
|
||||
|
||||
var (
|
||||
// ExecDAO is the global execution dao
|
||||
ExecDAO = NewExecutionDAO()
|
||||
executionStatusChangePostFuncRegistry = map[string]ExecutionStatusChangePostFunc{}
|
||||
// execStatusOutdateKeyRegex is the regex for the execution status outdate key,
|
||||
// the regex used to parse exec id and vendor type from the key.
|
||||
// e.g. execution:id:100:vendor:REPLICATION:status_outdate
|
||||
execStatusOutdateKeyRegex = regexp.MustCompile(`execution:id:(\d+):vendor:([A-Z_]+):status_outdate`)
|
||||
)
|
||||
|
||||
// ExecutionStatusChangePostFunc is the function called after the execution status changed
|
||||
type ExecutionStatusChangePostFunc func(ctx context.Context, executionID int64, status string) (err error)
|
||||
|
||||
// ExecutionDAO is the data access object interface for execution
|
||||
type ExecutionDAO interface {
|
||||
// Count returns the total count of executions according to the query
|
||||
@ -50,6 +84,9 @@ type ExecutionDAO interface {
|
||||
// If the status is changed, the returning "statusChanged" is set as "true" and the current status indicates
|
||||
// the changed status
|
||||
RefreshStatus(ctx context.Context, id int64) (statusChanged bool, currentStatus string, err error)
|
||||
// AsyncRefreshStatus refreshes the status of the specified execution in the async mode, which will register
|
||||
// a update flag in the redis and then wait for global periodic job to scan and update the status to db finally.
|
||||
AsyncRefreshStatus(ctx context.Context, id int64, vendor string) (err error)
|
||||
}
|
||||
|
||||
// NewExecutionDAO returns an instance of ExecutionDAO
|
||||
@ -376,3 +413,88 @@ func buildInClauseSQLForExtraAttrs(keys []string) string {
|
||||
return fmt.Sprintf("select id from execution where extra_attrs->%s->>?=?", s)
|
||||
}
|
||||
}
|
||||
|
||||
func buildExecStatusOutdateKey(id int64, vendor string) string {
|
||||
return fmt.Sprintf("execution:id:%d:vendor:%s:status_outdate", id, vendor)
|
||||
}
|
||||
|
||||
func extractExecIDVendorFromKey(key string) (int64, string, error) {
|
||||
// input: execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate
|
||||
// output: [execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate 100 GARBAGE_COLLECTION]
|
||||
matches := execStatusOutdateKeyRegex.FindStringSubmatch(key)
|
||||
if len(matches) < 3 {
|
||||
return 0, "", errors.Errorf("invalid format: %s", key)
|
||||
}
|
||||
|
||||
id, err := strconv.ParseInt(matches[1], 10, 64)
|
||||
if err != nil {
|
||||
return 0, matches[2], err
|
||||
}
|
||||
|
||||
return id, matches[2], nil
|
||||
}
|
||||
|
||||
func (e *executionDAO) AsyncRefreshStatus(ctx context.Context, id int64, vendor string) (err error) {
|
||||
key := buildExecStatusOutdateKey(id, vendor)
|
||||
if cache.Default().Contains(ctx, key) {
|
||||
// return earlier if already have the key
|
||||
return nil
|
||||
}
|
||||
// save the key to redis, the value is useless so set it to empty
|
||||
return cache.Default().Save(ctx, key, "")
|
||||
}
|
||||
|
||||
// scanAndRefreshOutdateStatus scans the outdate execution status from redis and then refresh the status to db,
|
||||
// do not want to expose to external use so keep it as private.
|
||||
func scanAndRefreshOutdateStatus(ctx context.Context) {
|
||||
keys, err := cache.Default().Keys(ctx, "execution:id:")
|
||||
if err != nil {
|
||||
log.Errorf("failed to scan the outdate executions, error: %v", err)
|
||||
return
|
||||
}
|
||||
// return earlier if no keys found which represents no outdate execution
|
||||
if len(keys) == 0 {
|
||||
log.Debug("skip to refresh, no outdate execution status found")
|
||||
return
|
||||
}
|
||||
// TODO: refactor
|
||||
// shuffle the keys to avoid the conflict and improve efficiency when multiple core instance existed,
|
||||
// but currently if multiple instances get the same set of keys at the same time, then eventually everyone
|
||||
// will still need to repeat the same work(refresh same execution), which needs to be optimized later.
|
||||
lib.ShuffleStringSlice(keys)
|
||||
|
||||
log.Infof("scanned out %d executions with outdate status, refresh status to db", len(keys))
|
||||
var succeed, failed int64
|
||||
// update the execution status execution to db
|
||||
for _, key := range keys {
|
||||
execID, vendor, err := extractExecIDVendorFromKey(key)
|
||||
if err != nil {
|
||||
log.Errorf("failed to extract execution id from key %s, error: %v", key, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
statusChanged, currentStatus, err := ExecDAO.RefreshStatus(ctx, execID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to refresh the status of execution %d, error: %v", execID, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
succeed++
|
||||
log.Debugf("refresh the status of execution %d successfully, new status: %s", execID, currentStatus)
|
||||
// run the status change post function
|
||||
// just print error log, not return error for post action
|
||||
if fc, exist := executionStatusChangePostFuncRegistry[vendor]; exist && statusChanged {
|
||||
if err = fc(ctx, execID, currentStatus); err != nil {
|
||||
logger.Errorf("failed to run the execution status change post function for execution %d, error: %v", execID, err)
|
||||
}
|
||||
}
|
||||
// delete the key from redis, it does not matter if the deletion fails, wait for the next round.
|
||||
if err = cache.Default().Delete(ctx, key); err != nil {
|
||||
log.Errorf("failed to delete the key %s in cache, error: %v", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("refresh outdate execution status done, %d succeed, %d failed", succeed, failed)
|
||||
}
|
||||
|
@ -19,13 +19,14 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/cache"
|
||||
_ "github.com/goharbor/harbor/src/lib/cache/memory"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type executionDAOTestSuite struct {
|
||||
@ -43,6 +44,9 @@ func (e *executionDAOTestSuite) SetupSuite() {
|
||||
e.executionDAO = &executionDAO{
|
||||
taskDAO: e.taskDao,
|
||||
}
|
||||
// initializes cache for testing
|
||||
err := cache.Initialize(cache.Memory, "")
|
||||
e.NoError(err)
|
||||
}
|
||||
|
||||
func (e *executionDAOTestSuite) SetupTest() {
|
||||
@ -327,6 +331,65 @@ func (e *executionDAOTestSuite) TestRefreshStatus() {
|
||||
e.Empty(execution.EndTime)
|
||||
}
|
||||
|
||||
func (e *executionDAOTestSuite) TestAsyncRefreshStatus() {
|
||||
err := e.executionDAO.AsyncRefreshStatus(e.ctx, e.executionID, "GC")
|
||||
e.NoError(err)
|
||||
defer cache.Default().Delete(e.ctx, buildExecStatusOutdateKey(e.executionID, "GC"))
|
||||
e.True(cache.Default().Contains(e.ctx, buildExecStatusOutdateKey(e.executionID, "GC")))
|
||||
}
|
||||
|
||||
func (e *executionDAOTestSuite) TestScanAndRefreshOutdateStatus() {
|
||||
// create execution1 with 1 running task
|
||||
id1, err := e.executionDAO.Create(e.ctx, &Execution{
|
||||
VendorType: "test1",
|
||||
Trigger: "test",
|
||||
ExtraAttrs: `{"key":"value"}`,
|
||||
})
|
||||
e.NoError(err)
|
||||
defer e.executionDAO.Delete(e.ctx, id1)
|
||||
|
||||
tid1, err := e.taskDao.Create(e.ctx, &Task{
|
||||
ExecutionID: id1,
|
||||
Status: job.RunningStatus.String(),
|
||||
StatusCode: job.RunningStatus.Code(),
|
||||
ExtraAttrs: `{}`,
|
||||
})
|
||||
e.NoError(err)
|
||||
defer e.taskDao.Delete(e.ctx, tid1)
|
||||
|
||||
// create execution1 with 1 error task
|
||||
id2, err := e.executionDAO.Create(e.ctx, &Execution{
|
||||
VendorType: "test2",
|
||||
Trigger: "test",
|
||||
ExtraAttrs: `{"key":"value"}`,
|
||||
})
|
||||
e.NoError(err)
|
||||
defer e.executionDAO.Delete(e.ctx, id2)
|
||||
|
||||
tid2, err := e.taskDao.Create(e.ctx, &Task{
|
||||
ExecutionID: id2,
|
||||
Status: job.ErrorStatus.String(),
|
||||
StatusCode: job.ErrorStatus.Code(),
|
||||
ExtraAttrs: `{}`,
|
||||
})
|
||||
e.NoError(err)
|
||||
defer e.taskDao.Delete(e.ctx, tid2)
|
||||
|
||||
// async refresh the status
|
||||
err = e.executionDAO.AsyncRefreshStatus(e.ctx, id1, "GC")
|
||||
e.NoError(err)
|
||||
err = e.executionDAO.AsyncRefreshStatus(e.ctx, id2, "GC")
|
||||
e.NoError(err)
|
||||
// test scan out and refresh
|
||||
scanAndRefreshOutdateStatus(e.ctx)
|
||||
exec1, err := e.executionDAO.Get(e.ctx, id1)
|
||||
e.NoError(err)
|
||||
e.Equal(job.RunningStatus.String(), exec1.Status)
|
||||
exec2, err := e.executionDAO.Get(e.ctx, id2)
|
||||
e.NoError(err)
|
||||
e.Equal(job.ErrorStatus.String(), exec2.Status)
|
||||
}
|
||||
|
||||
func TestExecutionDAOSuite(t *testing.T) {
|
||||
suite.Run(t, &executionDAOTestSuite{})
|
||||
}
|
||||
@ -353,3 +416,36 @@ func Test_buildInClauseSQLForExtraAttrs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_extractExecIDVendorFromKey(t *testing.T) {
|
||||
type args struct {
|
||||
key string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantID int64
|
||||
wantVendor string
|
||||
wantErr bool
|
||||
}{
|
||||
{"invalid format", args{"invalid:foo:bar"}, 0, "", true},
|
||||
{"invalid execution id", args{"execution:id:12abc:vendor:GC:status_outdate"}, 0, "", true},
|
||||
{"invalid vendor type", args{"execution:id:100:vendor:foo:status_outdate"}, 0, "", true},
|
||||
{"valid", args{"execution:id:100:vendor:GARBAGE_COLLECTION:status_outdate"}, 100, "GARBAGE_COLLECTION", false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, got1, err := extractExecIDVendorFromKey(tt.args.key)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("extractExecIDVendorFromKey() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if got != tt.wantID {
|
||||
t.Errorf("extractExecIDVendorFromKey() got = %v, want %v", got, tt.wantID)
|
||||
}
|
||||
if got1 != tt.wantVendor {
|
||||
t.Errorf("extractExecIDVendorFromKey() got1 = %v, want %v", got1, tt.wantVendor)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,10 @@ package task
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/config"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
@ -28,6 +30,8 @@ import (
|
||||
var (
|
||||
// HkHandler is a global instance of the HookHandler
|
||||
HkHandler = NewHookHandler()
|
||||
// once do the one time work
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
// NewHookHandler creates a hook handler instance
|
||||
@ -44,10 +48,20 @@ type HookHandler struct {
|
||||
executionDAO dao.ExecutionDAO
|
||||
}
|
||||
|
||||
func (h *HookHandler) init() {
|
||||
// register the post actions to execution dao
|
||||
once.Do(func() {
|
||||
for vendor, fc := range executionStatusChangePostFuncRegistry {
|
||||
dao.RegisterExecutionStatusChangePostFunc(vendor, dao.ExecutionStatusChangePostFunc(fc))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Handle the job status changing webhook
|
||||
func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error {
|
||||
logger := log.GetLogger(ctx)
|
||||
|
||||
h.init()
|
||||
jobID := sc.JobID
|
||||
// the "JobID" field of some kinds of jobs are set as "87bbdee19bed5ce09c48a149@1605104520" which contains "@".
|
||||
// In this case, read the parent periodical job ID from "sc.Metadata.UpstreamJobID"
|
||||
@ -93,17 +107,22 @@ func (h *HookHandler) Handle(ctx context.Context, sc *job.StatusChange) error {
|
||||
logger.Errorf("failed to run the task status change post function for task %d: %v", task.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// update execution status
|
||||
statusChanged, currentStatus, err := h.executionDAO.RefreshStatus(ctx, task.ExecutionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// run the status change post function
|
||||
if fc, exist := executionStatusChangePostFuncRegistry[execution.VendorType]; exist && statusChanged {
|
||||
if err = fc(ctx, task.ExecutionID, currentStatus); err != nil {
|
||||
logger.Errorf("failed to run the execution status change post function for execution %d: %v", task.ExecutionID, err)
|
||||
// execution status refresh interval <= 0 means update the status immediately
|
||||
if config.GetExecutionStatusRefreshIntervalSeconds() <= 0 {
|
||||
// update execution status immediately which may have optimistic lock
|
||||
statusChanged, currentStatus, err := h.executionDAO.RefreshStatus(ctx, task.ExecutionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// run the status change post function
|
||||
if fc, exist := executionStatusChangePostFuncRegistry[execution.VendorType]; exist && statusChanged {
|
||||
if err = fc(ctx, task.ExecutionID, currentStatus); err != nil {
|
||||
logger.Errorf("failed to run the execution status change post function for execution %d: %v", task.ExecutionID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
// by default, the execution status is updated in asynchronous mode
|
||||
return h.executionDAO.AsyncRefreshStatus(ctx, task.ExecutionID, task.VendorType)
|
||||
}
|
||||
|
@ -80,17 +80,21 @@ func (h *hookHandlerTestSuite) TestHandle() {
|
||||
ID: 1,
|
||||
VendorType: "test",
|
||||
}, nil)
|
||||
h.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(true, job.RunningStatus.String(), nil)
|
||||
sc = &job.StatusChange{
|
||||
Status: job.SuccessStatus.String(),
|
||||
Metadata: &job.StatsInfo{
|
||||
Revision: time.Now().Unix(),
|
||||
},
|
||||
|
||||
// test update status non-immediately when receive the hook
|
||||
{
|
||||
h.execDAO.On("AsyncRefreshStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
sc = &job.StatusChange{
|
||||
Status: job.SuccessStatus.String(),
|
||||
Metadata: &job.StatsInfo{
|
||||
Revision: time.Now().Unix(),
|
||||
},
|
||||
}
|
||||
err = h.handler.Handle(nil, sc)
|
||||
h.Require().Nil(err)
|
||||
h.taskDAO.AssertExpectations(h.T())
|
||||
h.execDAO.AssertExpectations(h.T())
|
||||
}
|
||||
err = h.handler.Handle(nil, sc)
|
||||
h.Require().Nil(err)
|
||||
h.taskDAO.AssertExpectations(h.T())
|
||||
h.execDAO.AssertExpectations(h.T())
|
||||
}
|
||||
|
||||
func TestHookHandlerTestSuite(t *testing.T) {
|
||||
|
@ -16,6 +16,20 @@ type mockExecutionDAO struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// AsyncRefreshStatus provides a mock function with given fields: ctx, id, vendor
|
||||
func (_m *mockExecutionDAO) AsyncRefreshStatus(ctx context.Context, id int64, vendor string) error {
|
||||
ret := _m.Called(ctx, id, vendor)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64, string) error); ok {
|
||||
r0 = rf(ctx, id, vendor)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Count provides a mock function with given fields: ctx, query
|
||||
func (_m *mockExecutionDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
|
||||
ret := _m.Called(ctx, query)
|
||||
|
@ -44,6 +44,8 @@ class TestLogRotation(unittest.TestCase, object):
|
||||
latest_job = self.purge.get_latest_purge_job()
|
||||
self.purge.stop_purge_execution(latest_job.id)
|
||||
# 3. Verify purge audit log job status is Stopped
|
||||
# wait more 5s for status update after stop
|
||||
time.sleep(5)
|
||||
job_status = self.purge.get_purge_job(latest_job.id).job_status
|
||||
self.assertEqual(self.purge.get_purge_job(latest_job.id).job_status, "Stopped")
|
||||
# 4. Create a purge audit log job
|
||||
|
@ -59,6 +59,7 @@ sudo make compile build prepare COMPILETAG=compile_golangimage GOBUILDTAGS="incl
|
||||
|
||||
# set the debugging env
|
||||
echo "GC_TIME_WINDOW_HOURS=0" | sudo tee -a ./make/common/config/core/env
|
||||
echo "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS=5" | sudo tee -a ./make/common/config/core/env
|
||||
sudo make start
|
||||
|
||||
# waiting 5 minutes to start
|
||||
|
Loading…
Reference in New Issue
Block a user