mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-27 02:58:05 +01:00
Merge pull request #15543 from steven-zou/feat/sync-schedules
feat(schedule):sync schedules in db to js datastore
This commit is contained in:
commit
8e1ffd022c
@ -63,10 +63,10 @@ type Context interface {
|
||||
// flag to indicate if have command
|
||||
OPCommand() (OPCommand, bool)
|
||||
|
||||
// Return the logger
|
||||
// GetLogger returns the logger
|
||||
GetLogger() logger.Interface
|
||||
|
||||
// Get tracker
|
||||
// Tracker of job.
|
||||
Tracker() Tracker
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,8 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
)
|
||||
@ -78,6 +80,16 @@ type ACK struct {
|
||||
CheckInAt int64 `json:"check_in_at"`
|
||||
}
|
||||
|
||||
// JSON of ACK.
|
||||
func (a *ACK) JSON() string {
|
||||
str, err := json.Marshal(a)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return string(str)
|
||||
}
|
||||
|
||||
// ActionRequest defines for triggering job action like stop/cancel.
|
||||
type ActionRequest struct {
|
||||
Action string `json:"action"`
|
||||
|
@ -335,10 +335,20 @@ func (bt *basicTracker) Save() (err error) {
|
||||
}
|
||||
// Set update timestamp
|
||||
args = append(args, "update_time", time.Now().Unix())
|
||||
// Set the first revision
|
||||
args = append(args, "revision", time.Now().Unix())
|
||||
// Set the first revision if it is not set.
|
||||
rev := time.Now().Unix()
|
||||
if stats.Info.Revision > 0 {
|
||||
rev = stats.Info.Revision
|
||||
}
|
||||
args = append(args, "revision", rev)
|
||||
|
||||
// ACK data is saved/updated not via tracker, so ignore the ACK saving
|
||||
// For restoring if ACK is not nil.
|
||||
if stats.Info.HookAck != nil {
|
||||
ack := stats.Info.HookAck.JSON()
|
||||
if len(ack) > 0 {
|
||||
args = append(args, "ack")
|
||||
}
|
||||
}
|
||||
|
||||
// Do it in a transaction
|
||||
err = conn.Send("MULTI")
|
||||
|
17
src/jobservice/mgt/mock.go
Normal file
17
src/jobservice/mgt/mock.go
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mgt
|
||||
|
||||
//go:generate mockery --name Manager --output . --outpkg mgt --filename mock_manager.go --structname MockManager --inpackage
|
142
src/jobservice/mgt/mock_manager.go
Normal file
142
src/jobservice/mgt/mock_manager.go
Normal file
@ -0,0 +1,142 @@
|
||||
// Code generated by mockery v2.1.0. DO NOT EDIT.
|
||||
|
||||
package mgt
|
||||
|
||||
import (
|
||||
job "github.com/goharbor/harbor/src/jobservice/job"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
query "github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
)
|
||||
|
||||
// MockManager is an autogenerated mock type for the Manager type
|
||||
type MockManager struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// GetJob provides a mock function with given fields: jobID
|
||||
func (_m *MockManager) GetJob(jobID string) (*job.Stats, error) {
|
||||
ret := _m.Called(jobID)
|
||||
|
||||
var r0 *job.Stats
|
||||
if rf, ok := ret.Get(0).(func(string) *job.Stats); ok {
|
||||
r0 = rf(jobID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*job.Stats)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||
r1 = rf(jobID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetJobs provides a mock function with given fields: q
|
||||
func (_m *MockManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
ret := _m.Called(q)
|
||||
|
||||
var r0 []*job.Stats
|
||||
if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok {
|
||||
r0 = rf(q)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*job.Stats)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 int64
|
||||
if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok {
|
||||
r1 = rf(q)
|
||||
} else {
|
||||
r1 = ret.Get(1).(int64)
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok {
|
||||
r2 = rf(q)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// GetPeriodicExecution provides a mock function with given fields: pID, q
|
||||
func (_m *MockManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
ret := _m.Called(pID, q)
|
||||
|
||||
var r0 []*job.Stats
|
||||
if rf, ok := ret.Get(0).(func(string, *query.Parameter) []*job.Stats); ok {
|
||||
r0 = rf(pID, q)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*job.Stats)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 int64
|
||||
if rf, ok := ret.Get(1).(func(string, *query.Parameter) int64); ok {
|
||||
r1 = rf(pID, q)
|
||||
} else {
|
||||
r1 = ret.Get(1).(int64)
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(string, *query.Parameter) error); ok {
|
||||
r2 = rf(pID, q)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// GetScheduledJobs provides a mock function with given fields: q
|
||||
func (_m *MockManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
ret := _m.Called(q)
|
||||
|
||||
var r0 []*job.Stats
|
||||
if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok {
|
||||
r0 = rf(q)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*job.Stats)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 int64
|
||||
if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok {
|
||||
r1 = rf(q)
|
||||
} else {
|
||||
r1 = ret.Get(1).(int64)
|
||||
}
|
||||
|
||||
var r2 error
|
||||
if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok {
|
||||
r2 = rf(q)
|
||||
} else {
|
||||
r2 = ret.Error(2)
|
||||
}
|
||||
|
||||
return r0, r1, r2
|
||||
}
|
||||
|
||||
// SaveJob provides a mock function with given fields: _a0
|
||||
func (_m *MockManager) SaveJob(_a0 *job.Stats) error {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(*job.Stats) error); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
@ -16,6 +16,7 @@ package period
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
@ -86,7 +87,7 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
pid := time.Now().Unix()
|
||||
pid := time.Now().Unix() + rand.Int63n(10)
|
||||
|
||||
// Save to redis db
|
||||
if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil {
|
||||
|
17
src/jobservice/period/mock.go
Normal file
17
src/jobservice/period/mock.go
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package period
|
||||
|
||||
//go:generate mockery --name Scheduler --output . --outpkg period --filename mock_scheduler.go --structname MockScheduler --inpackage
|
50
src/jobservice/period/mock_scheduler.go
Normal file
50
src/jobservice/period/mock_scheduler.go
Normal file
@ -0,0 +1,50 @@
|
||||
// Code generated by mockery v2.1.0. DO NOT EDIT.
|
||||
|
||||
package period
|
||||
|
||||
import mock "github.com/stretchr/testify/mock"
|
||||
|
||||
// MockScheduler is an autogenerated mock type for the Scheduler type
|
||||
type MockScheduler struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Schedule provides a mock function with given fields: policy
|
||||
func (_m *MockScheduler) Schedule(policy *Policy) (int64, error) {
|
||||
ret := _m.Called(policy)
|
||||
|
||||
var r0 int64
|
||||
if rf, ok := ret.Get(0).(func(*Policy) int64); ok {
|
||||
r0 = rf(policy)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(*Policy) error); ok {
|
||||
r1 = rf(policy)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Start provides a mock function with given fields:
|
||||
func (_m *MockScheduler) Start() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
// UnSchedule provides a mock function with given fields: policyID
|
||||
func (_m *MockScheduler) UnSchedule(policyID string) error {
|
||||
ret := _m.Called(policyID)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string) error); ok {
|
||||
r0 = rf(policyID)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
@ -19,6 +19,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -40,6 +41,8 @@ import (
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"github.com/goharbor/harbor/src/jobservice/migration"
|
||||
"github.com/goharbor/harbor/src/jobservice/period"
|
||||
sync2 "github.com/goharbor/harbor/src/jobservice/sync"
|
||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||
"github.com/goharbor/harbor/src/jobservice/worker/cworker"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
@ -49,6 +52,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/pkg/retention"
|
||||
"github.com/goharbor/harbor/src/pkg/scan"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
)
|
||||
|
||||
@ -59,7 +63,9 @@ const (
|
||||
)
|
||||
|
||||
// JobService ...
|
||||
var JobService = &Bootstrap{}
|
||||
var JobService = &Bootstrap{
|
||||
syncEnabled: true,
|
||||
}
|
||||
|
||||
// workerPoolID
|
||||
var workerPoolID string
|
||||
@ -67,6 +73,7 @@ var workerPoolID string
|
||||
// Bootstrap is coordinating process to help load and start the other components to serve.
|
||||
type Bootstrap struct {
|
||||
jobContextInitializer job.ContextInitializer
|
||||
syncEnabled bool
|
||||
}
|
||||
|
||||
// SetJobContextInitializer set the job context initializer
|
||||
@ -103,6 +110,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
var (
|
||||
backendWorker worker.Interface
|
||||
manager mgt.Manager
|
||||
syncWorker *sync2.Worker
|
||||
)
|
||||
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
||||
// Number of workers
|
||||
@ -175,6 +183,29 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
if err = lcmCtl.Serve(); err != nil {
|
||||
return errors.Errorf("start life cycle controller error: %s", err)
|
||||
}
|
||||
|
||||
// Initialize sync worker
|
||||
if bs.syncEnabled {
|
||||
syncWorker = sync2.New(3).
|
||||
WithContext(rootContext).
|
||||
UseManager(manager).
|
||||
UseScheduler(period.NewScheduler(rootContext.SystemContext, namespace, redisPool, lcmCtl)).
|
||||
WithCoreInternalAddr(strings.TrimSuffix(config.GetCoreURL(), "/")).
|
||||
UseCoreScheduler(scheduler.Sched).
|
||||
UseCoreExecutionManager(task.ExecMgr).
|
||||
UseCoreTaskManager(task.Mgr).
|
||||
WithPolicyLoader(func() ([]*period.Policy, error) {
|
||||
conn := redisPool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return period.Load(namespace, conn)
|
||||
})
|
||||
// Start sync worker
|
||||
// Not block the regular process.
|
||||
if err := syncWorker.Start(); err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend)
|
||||
}
|
||||
@ -225,7 +256,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
sig <- os.Interrupt
|
||||
}
|
||||
|
||||
// Wait everyone exit
|
||||
// Wait everyone exits.
|
||||
rootContext.WG.Wait()
|
||||
|
||||
return
|
||||
|
@ -17,14 +17,16 @@ package runtime
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// BootStrapTestSuite tests bootstrap
|
||||
@ -38,6 +40,8 @@ type BootStrapTestSuite struct {
|
||||
|
||||
// SetupSuite prepares test suite
|
||||
func (suite *BootStrapTestSuite) SetupSuite() {
|
||||
dao.PrepareTestForPostgresSQL()
|
||||
|
||||
// Load configurations
|
||||
err := config.DefaultConfig.Load("../config_test.yml", true)
|
||||
require.NoError(suite.T(), err, "load configurations error: %s", err)
|
||||
@ -51,7 +55,9 @@ func (suite *BootStrapTestSuite) SetupSuite() {
|
||||
err = logger.Init(suite.ctx)
|
||||
require.NoError(suite.T(), err, "init logger: nil error expected but got %s", err)
|
||||
|
||||
suite.jobService = &Bootstrap{}
|
||||
suite.jobService = &Bootstrap{
|
||||
syncEnabled: false,
|
||||
}
|
||||
suite.jobService.SetJobContextInitializer(nil)
|
||||
}
|
||||
|
||||
|
368
src/jobservice/sync/schedule.go
Normal file
368
src/jobservice/sync/schedule.go
Normal file
@ -0,0 +1,368 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
o "github.com/astaxie/beego/orm"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"github.com/goharbor/harbor/src/jobservice/period"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
)
|
||||
|
||||
// PolicyLoader is a func template to load schedule policies from js datastore.
|
||||
type PolicyLoader func() ([]*period.Policy, error)
|
||||
|
||||
// Worker is designed to sync the schedules in the database into jobservice datastore.
|
||||
type Worker struct {
|
||||
// context of Worker.
|
||||
context *env.Context
|
||||
// Indicate whether a new round should be run.
|
||||
lastErr error
|
||||
// How many rounds have been run.
|
||||
round uint8
|
||||
// The max number of rounds for repeated runs.
|
||||
maxRounds uint8
|
||||
// Periodical job scheduler.
|
||||
scheduler period.Scheduler
|
||||
// Job stats manager.
|
||||
manager mgt.Manager
|
||||
// Internal addr of core.
|
||||
internalCoreAddr string
|
||||
// Scheduler from core.
|
||||
coreScheduler scheduler.Scheduler
|
||||
// Execution manager from core.
|
||||
coreExecutionManager task.ExecutionManager
|
||||
// Task manager from core
|
||||
coreTaskManager task.Manager
|
||||
// Loader for loading polices from the js store.
|
||||
policyLoader PolicyLoader
|
||||
}
|
||||
|
||||
// New sync worker.
|
||||
func New(maxRounds uint8) *Worker {
|
||||
return &Worker{
|
||||
maxRounds: maxRounds,
|
||||
}
|
||||
}
|
||||
|
||||
// WithContext set context.
|
||||
func (w *Worker) WithContext(ctx *env.Context) *Worker {
|
||||
w.context = ctx
|
||||
return w
|
||||
}
|
||||
|
||||
// UseScheduler refers the period.Scheduler.
|
||||
func (w *Worker) UseScheduler(scheduler period.Scheduler) *Worker {
|
||||
w.scheduler = scheduler
|
||||
return w
|
||||
}
|
||||
|
||||
// WithCoreInternalAddr sets the internal addr of core.
|
||||
func (w *Worker) WithCoreInternalAddr(addr string) *Worker {
|
||||
w.internalCoreAddr = addr
|
||||
return w
|
||||
}
|
||||
|
||||
// UseManager refers the mgt.Manager.
|
||||
func (w *Worker) UseManager(mgr mgt.Manager) *Worker {
|
||||
w.manager = mgr
|
||||
return w
|
||||
}
|
||||
|
||||
// UseCoreScheduler refers the core scheduler.
|
||||
func (w *Worker) UseCoreScheduler(scheduler scheduler.Scheduler) *Worker {
|
||||
w.coreScheduler = scheduler
|
||||
return w
|
||||
}
|
||||
|
||||
// UseCoreExecutionManager refers the core execution manager.
|
||||
func (w *Worker) UseCoreExecutionManager(executionMgr task.ExecutionManager) *Worker {
|
||||
w.coreExecutionManager = executionMgr
|
||||
return w
|
||||
}
|
||||
|
||||
// UseCoreTaskManager refers the core task manager.
|
||||
func (w *Worker) UseCoreTaskManager(taskManager task.Manager) *Worker {
|
||||
w.coreTaskManager = taskManager
|
||||
return w
|
||||
}
|
||||
|
||||
// WithPolicyLoader determines the policy loader func.
|
||||
func (w *Worker) WithPolicyLoader(loader PolicyLoader) *Worker {
|
||||
w.policyLoader = loader
|
||||
return w
|
||||
}
|
||||
|
||||
// Start the loop in none-blocking way.
|
||||
func (w *Worker) Start() error {
|
||||
if err := w.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.context.WG.Add(1)
|
||||
// Run
|
||||
go func() {
|
||||
defer func() {
|
||||
w.context.WG.Done()
|
||||
}()
|
||||
|
||||
ctx := orm.NewContext(w.context.SystemContext, o.NewOrm())
|
||||
ctlChan := make(chan struct{}, 1)
|
||||
ctlChan <- struct{}{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctlChan:
|
||||
w.round++
|
||||
if w.round == w.maxRounds {
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.Run(ctx); err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for a while and retry then.
|
||||
time.AfterFunc(1*time.Minute, func() {
|
||||
ctlChan <- struct{}{}
|
||||
})
|
||||
case <-w.context.SystemContext.Done():
|
||||
logger.Info("Context cancel signal received:sync worker exit")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run one round.
|
||||
func (w *Worker) Run(ctx context.Context) error {
|
||||
// Start sync schedules.
|
||||
logger.Infof("Start to sync schedules in database to jobservice: round[%d].", w.round)
|
||||
|
||||
// Get all the schedules from the database first.
|
||||
// Use the default scheduler.
|
||||
schedules, err := w.coreScheduler.ListSchedules(ctx, &q.Query{})
|
||||
if err != nil {
|
||||
// We can not proceed.
|
||||
// A non-nil error will cause a follow-up retry later.
|
||||
return errors.Wrap(err, "list all the schedules in the database")
|
||||
}
|
||||
|
||||
// Exit earlier if no schedules found.
|
||||
if len(schedules) == 0 {
|
||||
// Log and gracefully exit.
|
||||
logger.Info("No schedules found in the database.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get schedule records from the jobservice datastore.
|
||||
polices, err := w.policyLoader()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "load schedule records from jobservice store")
|
||||
}
|
||||
|
||||
// Define a function to get the policy with the specified ID.
|
||||
getPolicy := func(jobID string) *period.Policy {
|
||||
for _, p := range polices {
|
||||
if p.ID == jobID {
|
||||
return p
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
jobHash := make(map[string]struct{})
|
||||
restoreCounter := 0
|
||||
clearCounter := 0
|
||||
|
||||
// Sync now.
|
||||
for _, sch := range schedules {
|
||||
// Get the corresponding task.
|
||||
t, err := w.getTask(ctx, sch)
|
||||
if err != nil {
|
||||
// Log and skip
|
||||
logger.Error(err)
|
||||
w.lastErr = err
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Recorded
|
||||
jobHash[t.JobID] = struct{}{}
|
||||
|
||||
// Get policy
|
||||
p := getPolicy(t.JobID)
|
||||
if p == nil {
|
||||
// Need to restore this missing schedule.
|
||||
if err := w.restore(sch.CRON, t); err != nil {
|
||||
// Log and skip
|
||||
logger.Error(err)
|
||||
w.lastErr = err
|
||||
} else {
|
||||
restoreCounter++
|
||||
logger.Infof("Sync: restore missing schedule: taskID=%d, jobID=%s, cron=%s", t.ID, t.JobID, sch.CRON)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the dirty ones.
|
||||
for _, p := range polices {
|
||||
_, ok := jobHash[p.ID]
|
||||
if p.JobName == scheduler.JobNameScheduler && !ok {
|
||||
if err := w.scheduler.UnSchedule(p.ID); err != nil {
|
||||
logger.Error(err)
|
||||
} else {
|
||||
clearCounter++
|
||||
logger.Infof("Sync: unschedule dirty schedule: %s:%s", p.JobName, p.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("End sync schedules in database to jobservice: round[%d].", w.round)
|
||||
logger.Infof("Found %d schedules, restore %d missing schedules, clear %d dirty schedules", len(schedules), restoreCounter, clearCounter)
|
||||
|
||||
return w.lastErr
|
||||
}
|
||||
|
||||
func (w *Worker) restore(cron string, t *task.Task) error {
|
||||
p := &period.Policy{
|
||||
ID: t.JobID,
|
||||
JobName: scheduler.JobNameScheduler,
|
||||
CronSpec: cron,
|
||||
WebHookURL: fmt.Sprintf("%s/service/notifications/tasks/%d", w.internalCoreAddr, t.ID),
|
||||
}
|
||||
|
||||
// Schedule the policy.
|
||||
numericID, err := w.scheduler.Schedule(p)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "schedule policy")
|
||||
}
|
||||
|
||||
res := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: p.ID,
|
||||
JobName: p.JobName,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
JobKind: job.KindPeriodic,
|
||||
CronSpec: cron,
|
||||
WebHookURL: p.WebHookURL,
|
||||
NumericPID: numericID,
|
||||
EnqueueTime: time.Now().Unix(),
|
||||
UpdateTime: time.Now().Unix(),
|
||||
RefLink: fmt.Sprintf("/api/v1/jobs/%s", p.ID),
|
||||
Revision: t.StatusRevision,
|
||||
},
|
||||
}
|
||||
|
||||
// Keep status synced.
|
||||
if res.Info.Revision > 0 {
|
||||
res.Info.HookAck = &job.ACK{
|
||||
Revision: t.StatusRevision,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
}
|
||||
}
|
||||
|
||||
// Save the stats.
|
||||
return w.manager.SaveJob(res)
|
||||
}
|
||||
|
||||
// validate whether the worker is ready or not.
|
||||
func (w *Worker) validate() error {
|
||||
if w.context == nil {
|
||||
return errors.New("missing context")
|
||||
}
|
||||
|
||||
if w.manager == nil {
|
||||
return errors.New("missing stats manager")
|
||||
}
|
||||
|
||||
if w.scheduler == nil {
|
||||
return errors.New("missing period scheduler")
|
||||
}
|
||||
|
||||
if len(w.internalCoreAddr) == 0 {
|
||||
return errors.New("empty internal addr of core")
|
||||
}
|
||||
|
||||
if w.coreScheduler == nil {
|
||||
return errors.New("missing core scheduler")
|
||||
}
|
||||
|
||||
if w.coreExecutionManager == nil {
|
||||
return errors.New("missing core execution manager")
|
||||
}
|
||||
|
||||
if w.coreTaskManager == nil {
|
||||
return errors.New("missing core task manager")
|
||||
}
|
||||
|
||||
if w.policyLoader == nil {
|
||||
return errors.New("missing policy loader")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getTask gets the task associated with the specified schedule.
|
||||
// Here is an assumption that each schedule has only one execution as well as only one task under the execution.
|
||||
func (w *Worker) getTask(ctx context.Context, schedule *scheduler.Schedule) (*task.Task, error) {
|
||||
// Get associated execution first.
|
||||
executions, err := w.coreExecutionManager.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"vendor_type": scheduler.JobNameScheduler,
|
||||
"vendor_id": schedule.ID,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(executions) == 0 {
|
||||
return nil, errors.Errorf("no execution found for schedule: %s:%d", schedule.VendorType, schedule.VendorID)
|
||||
}
|
||||
|
||||
theOne := executions[0]
|
||||
// Now get the execution.
|
||||
tasks, err := w.coreTaskManager.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"execution_id": theOne.ID,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(tasks) == 0 {
|
||||
return nil, errors.Errorf("no task found for execution: %s:%d", schedule.VendorType, theOne.ID)
|
||||
}
|
||||
|
||||
return tasks[0], nil
|
||||
}
|
138
src/jobservice/sync/schedule_test.go
Normal file
138
src/jobservice/sync/schedule_test.go
Normal file
@ -0,0 +1,138 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/dao"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"github.com/goharbor/harbor/src/jobservice/period"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"github.com/goharbor/harbor/src/testing/mock"
|
||||
ts "github.com/goharbor/harbor/src/testing/pkg/scheduler"
|
||||
tt "github.com/goharbor/harbor/src/testing/pkg/task"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
// WorkerTestSuite is test suite for testing sync.Worker.
|
||||
type WorkerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
worker *Worker
|
||||
}
|
||||
|
||||
// TestWorker is the entry method of WorkerTestSuite.
|
||||
func TestWorker(t *testing.T) {
|
||||
suite.Run(t, &WorkerTestSuite{})
|
||||
}
|
||||
|
||||
// SetupSuite sets up suite.
|
||||
func (suite *WorkerTestSuite) SetupSuite() {
|
||||
sysContext := context.TODO()
|
||||
|
||||
dao.PrepareTestForPostgresSQL()
|
||||
|
||||
getPolicies := func() ([]*period.Policy, error) {
|
||||
return []*period.Policy{
|
||||
// Dirty data in js datastore.
|
||||
{
|
||||
ID: "8ff2aabb977077b84b4d5f1b",
|
||||
JobName: scheduler.JobNameScheduler,
|
||||
CronSpec: "0 0 0 * * 0",
|
||||
WebHookURL: "http://core:8080/service/notifications/tasks/250",
|
||||
NumericID: 1630667250,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Mock methods
|
||||
//
|
||||
tss := &ts.Scheduler{}
|
||||
tss.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler.Schedule{
|
||||
{
|
||||
ID: 550,
|
||||
CRON: "0 0 0 * * *",
|
||||
},
|
||||
}, nil)
|
||||
|
||||
// The missing schedule in database.
|
||||
tte := &tt.ExecutionManager{}
|
||||
tte.On("List", mock.Anything, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"vendor_type": scheduler.JobNameScheduler,
|
||||
"vendor_id": (int64)(550),
|
||||
},
|
||||
}).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1550,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
ttm := &tt.Manager{}
|
||||
ttm.On("List", mock.Anything, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"execution_id": (int64)(1550),
|
||||
},
|
||||
}).Return([]*task.Task{
|
||||
{
|
||||
ID: 2550,
|
||||
ExecutionID: 1550,
|
||||
JobID: "f754ccdd123664b2acb971d9",
|
||||
},
|
||||
}, nil)
|
||||
|
||||
pms := &period.MockScheduler{}
|
||||
pms.On("Schedule", &period.Policy{
|
||||
ID: "f754ccdd123664b2acb971d9",
|
||||
JobName: scheduler.JobNameScheduler,
|
||||
CronSpec: "0 0 0 * * *",
|
||||
WebHookURL: "http://core:8080/service/notifications/tasks/2550",
|
||||
}).Return((int64)(1630667500), nil)
|
||||
pms.On("UnSchedule", "8ff2aabb977077b84b4d5f1b").Return(nil)
|
||||
|
||||
mmm := &mgt.MockManager{}
|
||||
mmm.On("SaveJob", mock.Anything).Return(nil)
|
||||
|
||||
suite.worker = New(3).
|
||||
WithContext(&env.Context{
|
||||
SystemContext: sysContext,
|
||||
WG: &sync.WaitGroup{},
|
||||
ErrorChan: make(chan error, 1),
|
||||
}).UseCoreScheduler(tss).
|
||||
UseCoreExecutionManager(tte).
|
||||
UseCoreTaskManager(ttm).
|
||||
UseScheduler(pms).
|
||||
UseManager(mmm).
|
||||
WithCoreInternalAddr("http://core:8080").
|
||||
WithPolicyLoader(getPolicies)
|
||||
}
|
||||
|
||||
// TestStart test Start().
|
||||
func (suite *WorkerTestSuite) TestStart() {
|
||||
err := suite.worker.Start()
|
||||
suite.NoError(err, "start worker")
|
||||
}
|
||||
|
||||
// TestRun test Run().
|
||||
func (suite *WorkerTestSuite) TestRun() {
|
||||
err := suite.worker.Run(context.TODO())
|
||||
suite.NoError(err, "run worker")
|
||||
}
|
@ -82,9 +82,10 @@ type Task struct {
|
||||
// the time that the task record created
|
||||
CreationTime time.Time `json:"creation_time"`
|
||||
// the time that the underlying job starts
|
||||
StartTime time.Time `json:"start_time"`
|
||||
UpdateTime time.Time `json:"update_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
UpdateTime time.Time `json:"update_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
StatusRevision int64 `json:"status_revision"`
|
||||
}
|
||||
|
||||
// From constructs a task from DAO model
|
||||
@ -100,6 +101,7 @@ func (t *Task) From(task *dao.Task) {
|
||||
t.StartTime = task.StartTime
|
||||
t.UpdateTime = task.UpdateTime
|
||||
t.EndTime = task.EndTime
|
||||
t.StatusRevision = task.StatusRevision
|
||||
if len(task.ExtraAttrs) > 0 {
|
||||
extras := map[string]interface{}{}
|
||||
if err := json.Unmarshal([]byte(task.ExtraAttrs), &extras); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user