feat(schedule):sync schedules in db to js datastore

- add a sync worker to sync db schedules when js starting. add missing ones and clear dirty ones.
- update task model to contain status revision info
- update job lifecycle tracker save() method
- update job ACK model
- add UT cases
- update malformat comments

fix #15323

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2021-09-03 19:39:44 +08:00
parent ff617950b7
commit 6b5cd3a7a5
13 changed files with 808 additions and 14 deletions

View File

@ -63,10 +63,10 @@ type Context interface {
// flag to indicate if have command
OPCommand() (OPCommand, bool)
// Return the logger
// GetLogger returns the logger
GetLogger() logger.Interface
// Get tracker
// Tracker of job.
Tracker() Tracker
}

View File

@ -15,6 +15,8 @@
package job
import (
"encoding/json"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/lib/errors"
)
@ -78,6 +80,16 @@ type ACK struct {
CheckInAt int64 `json:"check_in_at"`
}
// JSON of ACK.
func (a *ACK) JSON() string {
str, err := json.Marshal(a)
if err != nil {
return ""
}
return string(str)
}
// ActionRequest defines for triggering job action like stop/cancel.
type ActionRequest struct {
Action string `json:"action"`

View File

@ -335,10 +335,20 @@ func (bt *basicTracker) Save() (err error) {
}
// Set update timestamp
args = append(args, "update_time", time.Now().Unix())
// Set the first revision
args = append(args, "revision", time.Now().Unix())
// Set the first revision if it is not set.
rev := time.Now().Unix()
if stats.Info.Revision > 0 {
rev = stats.Info.Revision
}
args = append(args, "revision", rev)
// ACK data is saved/updated not via tracker, so ignore the ACK saving
// For restoring if ACK is not nil.
if stats.Info.HookAck != nil {
ack := stats.Info.HookAck.JSON()
if len(ack) > 0 {
args = append(args, "ack")
}
}
// Do it in a transaction
err = conn.Send("MULTI")

View File

@ -0,0 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mgt
//go:generate mockery --name Manager --output . --outpkg mgt --filename mock_manager.go --structname MockManager --inpackage

View File

@ -0,0 +1,142 @@
// Code generated by mockery v2.1.0. DO NOT EDIT.
package mgt
import (
job "github.com/goharbor/harbor/src/jobservice/job"
mock "github.com/stretchr/testify/mock"
query "github.com/goharbor/harbor/src/jobservice/common/query"
)
// MockManager is an autogenerated mock type for the Manager type
type MockManager struct {
mock.Mock
}
// GetJob provides a mock function with given fields: jobID
func (_m *MockManager) GetJob(jobID string) (*job.Stats, error) {
ret := _m.Called(jobID)
var r0 *job.Stats
if rf, ok := ret.Get(0).(func(string) *job.Stats); ok {
r0 = rf(jobID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*job.Stats)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(jobID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetJobs provides a mock function with given fields: q
func (_m *MockManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
ret := _m.Called(q)
var r0 []*job.Stats
if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok {
r0 = rf(q)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*job.Stats)
}
}
var r1 int64
if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok {
r1 = rf(q)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok {
r2 = rf(q)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// GetPeriodicExecution provides a mock function with given fields: pID, q
func (_m *MockManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
ret := _m.Called(pID, q)
var r0 []*job.Stats
if rf, ok := ret.Get(0).(func(string, *query.Parameter) []*job.Stats); ok {
r0 = rf(pID, q)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*job.Stats)
}
}
var r1 int64
if rf, ok := ret.Get(1).(func(string, *query.Parameter) int64); ok {
r1 = rf(pID, q)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(string, *query.Parameter) error); ok {
r2 = rf(pID, q)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// GetScheduledJobs provides a mock function with given fields: q
func (_m *MockManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
ret := _m.Called(q)
var r0 []*job.Stats
if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok {
r0 = rf(q)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*job.Stats)
}
}
var r1 int64
if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok {
r1 = rf(q)
} else {
r1 = ret.Get(1).(int64)
}
var r2 error
if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok {
r2 = rf(q)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// SaveJob provides a mock function with given fields: _a0
func (_m *MockManager) SaveJob(_a0 *job.Stats) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(*job.Stats) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -16,6 +16,7 @@ package period
import (
"context"
"math/rand"
"time"
"github.com/goharbor/harbor/src/jobservice/errs"
@ -86,7 +87,7 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) {
return -1, err
}
pid := time.Now().Unix()
pid := time.Now().Unix() + rand.Int63n(10)
// Save to redis db
if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil {

View File

@ -0,0 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period
//go:generate mockery --name Scheduler --output . --outpkg period --filename mock_scheduler.go --structname MockScheduler --inpackage

View File

@ -0,0 +1,50 @@
// Code generated by mockery v2.1.0. DO NOT EDIT.
package period
import mock "github.com/stretchr/testify/mock"
// MockScheduler is an autogenerated mock type for the Scheduler type
type MockScheduler struct {
mock.Mock
}
// Schedule provides a mock function with given fields: policy
func (_m *MockScheduler) Schedule(policy *Policy) (int64, error) {
ret := _m.Called(policy)
var r0 int64
if rf, ok := ret.Get(0).(func(*Policy) int64); ok {
r0 = rf(policy)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(*Policy) error); ok {
r1 = rf(policy)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Start provides a mock function with given fields:
func (_m *MockScheduler) Start() {
_m.Called()
}
// UnSchedule provides a mock function with given fields: policyID
func (_m *MockScheduler) UnSchedule(policyID string) error {
ret := _m.Called(policyID)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(policyID)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -19,6 +19,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@ -40,6 +41,8 @@ import (
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/mgt"
"github.com/goharbor/harbor/src/jobservice/migration"
"github.com/goharbor/harbor/src/jobservice/period"
sync2 "github.com/goharbor/harbor/src/jobservice/sync"
"github.com/goharbor/harbor/src/jobservice/worker"
"github.com/goharbor/harbor/src/jobservice/worker/cworker"
"github.com/goharbor/harbor/src/lib/errors"
@ -49,6 +52,7 @@ import (
"github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/pkg/scan"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/gomodule/redigo/redis"
)
@ -59,7 +63,9 @@ const (
)
// JobService ...
var JobService = &Bootstrap{}
var JobService = &Bootstrap{
syncEnabled: true,
}
// workerPoolID
var workerPoolID string
@ -67,6 +73,7 @@ var workerPoolID string
// Bootstrap is coordinating process to help load and start the other components to serve.
type Bootstrap struct {
jobContextInitializer job.ContextInitializer
syncEnabled bool
}
// SetJobContextInitializer set the job context initializer
@ -103,6 +110,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
var (
backendWorker worker.Interface
manager mgt.Manager
syncWorker *sync2.Worker
)
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
// Number of workers
@ -175,6 +183,29 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
if err = lcmCtl.Serve(); err != nil {
return errors.Errorf("start life cycle controller error: %s", err)
}
// Initialize sync worker
if bs.syncEnabled {
syncWorker = sync2.New(3).
WithContext(rootContext).
UseManager(manager).
UseScheduler(period.NewScheduler(rootContext.SystemContext, namespace, redisPool, lcmCtl)).
WithCoreInternalAddr(strings.TrimSuffix(config.GetCoreURL(), "/")).
UseCoreScheduler(scheduler.Sched).
UseCoreExecutionManager(task.ExecMgr).
UseCoreTaskManager(task.Mgr).
WithPolicyLoader(func() ([]*period.Policy, error) {
conn := redisPool.Get()
defer conn.Close()
return period.Load(namespace, conn)
})
// Start sync worker
// Not block the regular process.
if err := syncWorker.Start(); err != nil {
logger.Error(err)
}
}
} else {
return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend)
}
@ -225,7 +256,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
sig <- os.Interrupt
}
// Wait everyone exit
// Wait everyone exits.
rootContext.WG.Wait()
return

View File

@ -17,14 +17,16 @@ package runtime
import (
"context"
"fmt"
"testing"
"time"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)
// BootStrapTestSuite tests bootstrap
@ -38,6 +40,8 @@ type BootStrapTestSuite struct {
// SetupSuite prepares test suite
func (suite *BootStrapTestSuite) SetupSuite() {
dao.PrepareTestForPostgresSQL()
// Load configurations
err := config.DefaultConfig.Load("../config_test.yml", true)
require.NoError(suite.T(), err, "load configurations error: %s", err)
@ -51,7 +55,9 @@ func (suite *BootStrapTestSuite) SetupSuite() {
err = logger.Init(suite.ctx)
require.NoError(suite.T(), err, "init logger: nil error expected but got %s", err)
suite.jobService = &Bootstrap{}
suite.jobService = &Bootstrap{
syncEnabled: false,
}
suite.jobService.SetJobContextInitializer(nil)
}

View File

@ -0,0 +1,368 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"context"
"fmt"
"time"
o "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/mgt"
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
)
// PolicyLoader is a func template to load schedule policies from js datastore.
type PolicyLoader func() ([]*period.Policy, error)
// Worker is designed to sync the schedules in the database into jobservice datastore.
type Worker struct {
// context of Worker.
context *env.Context
// Indicate whether a new round should be run.
lastErr error
// How many rounds have been run.
round uint8
// The max number of rounds for repeated runs.
maxRounds uint8
// Periodical job scheduler.
scheduler period.Scheduler
// Job stats manager.
manager mgt.Manager
// Internal addr of core.
internalCoreAddr string
// Scheduler from core.
coreScheduler scheduler.Scheduler
// Execution manager from core.
coreExecutionManager task.ExecutionManager
// Task manager from core
coreTaskManager task.Manager
// Loader for loading polices from the js store.
policyLoader PolicyLoader
}
// New sync worker.
func New(maxRounds uint8) *Worker {
return &Worker{
maxRounds: maxRounds,
}
}
// WithContext set context.
func (w *Worker) WithContext(ctx *env.Context) *Worker {
w.context = ctx
return w
}
// UseScheduler refers the period.Scheduler.
func (w *Worker) UseScheduler(scheduler period.Scheduler) *Worker {
w.scheduler = scheduler
return w
}
// WithCoreInternalAddr sets the internal addr of core.
func (w *Worker) WithCoreInternalAddr(addr string) *Worker {
w.internalCoreAddr = addr
return w
}
// UseManager refers the mgt.Manager.
func (w *Worker) UseManager(mgr mgt.Manager) *Worker {
w.manager = mgr
return w
}
// UseCoreScheduler refers the core scheduler.
func (w *Worker) UseCoreScheduler(scheduler scheduler.Scheduler) *Worker {
w.coreScheduler = scheduler
return w
}
// UseCoreExecutionManager refers the core execution manager.
func (w *Worker) UseCoreExecutionManager(executionMgr task.ExecutionManager) *Worker {
w.coreExecutionManager = executionMgr
return w
}
// UseCoreTaskManager refers the core task manager.
func (w *Worker) UseCoreTaskManager(taskManager task.Manager) *Worker {
w.coreTaskManager = taskManager
return w
}
// WithPolicyLoader determines the policy loader func.
func (w *Worker) WithPolicyLoader(loader PolicyLoader) *Worker {
w.policyLoader = loader
return w
}
// Start the loop in none-blocking way.
func (w *Worker) Start() error {
if err := w.validate(); err != nil {
return err
}
w.context.WG.Add(1)
// Run
go func() {
defer func() {
w.context.WG.Done()
}()
ctx := orm.NewContext(w.context.SystemContext, o.NewOrm())
ctlChan := make(chan struct{}, 1)
ctlChan <- struct{}{}
for {
select {
case <-ctlChan:
w.round++
if w.round == w.maxRounds {
return
}
if err := w.Run(ctx); err == nil {
return
}
// Wait for a while and retry then.
time.AfterFunc(1*time.Minute, func() {
ctlChan <- struct{}{}
})
case <-w.context.SystemContext.Done():
logger.Info("Context cancel signal received:sync worker exit")
return
}
}
}()
return nil
}
// Run one round.
func (w *Worker) Run(ctx context.Context) error {
// Start sync schedules.
logger.Infof("Start to sync schedules in database to jobservice: round[%d].", w.round)
// Get all the schedules from the database first.
// Use the default scheduler.
schedules, err := w.coreScheduler.ListSchedules(ctx, &q.Query{})
if err != nil {
// We can not proceed.
// A non-nil error will cause a follow-up retry later.
return errors.Wrap(err, "list all the schedules in the database")
}
// Exit earlier if no schedules found.
if len(schedules) == 0 {
// Log and gracefully exit.
logger.Info("No schedules found in the database.")
return nil
}
// Get schedule records from the jobservice datastore.
polices, err := w.policyLoader()
if err != nil {
return errors.Wrap(err, "load schedule records from jobservice store")
}
// Define a function to get the policy with the specified ID.
getPolicy := func(jobID string) *period.Policy {
for _, p := range polices {
if p.ID == jobID {
return p
}
}
return nil
}
jobHash := make(map[string]struct{})
restoreCounter := 0
clearCounter := 0
// Sync now.
for _, sch := range schedules {
// Get the corresponding task.
t, err := w.getTask(ctx, sch)
if err != nil {
// Log and skip
logger.Error(err)
w.lastErr = err
continue
}
// Recorded
jobHash[t.JobID] = struct{}{}
// Get policy
p := getPolicy(t.JobID)
if p == nil {
// Need to restore this missing schedule.
if err := w.restore(sch.CRON, t); err != nil {
// Log and skip
logger.Error(err)
w.lastErr = err
} else {
restoreCounter++
logger.Infof("Sync: restore missing schedule: taskID=%d, jobID=%s, cron=%s", t.ID, t.JobID, sch.CRON)
}
}
}
// Clear the dirty ones.
for _, p := range polices {
_, ok := jobHash[p.ID]
if p.JobName == scheduler.JobNameScheduler && !ok {
if err := w.scheduler.UnSchedule(p.ID); err != nil {
logger.Error(err)
} else {
clearCounter++
logger.Infof("Sync: unschedule dirty schedule: %s:%s", p.JobName, p.ID)
}
}
}
logger.Infof("End sync schedules in database to jobservice: round[%d].", w.round)
logger.Infof("Found %d schedules, restore %d missing schedules, clear %d dirty schedules", len(schedules), restoreCounter, clearCounter)
return w.lastErr
}
func (w *Worker) restore(cron string, t *task.Task) error {
p := &period.Policy{
ID: t.JobID,
JobName: scheduler.JobNameScheduler,
CronSpec: cron,
WebHookURL: fmt.Sprintf("%s/service/notifications/tasks/%d", w.internalCoreAddr, t.ID),
}
// Schedule the policy.
numericID, err := w.scheduler.Schedule(p)
if err != nil {
return errors.Wrap(err, "schedule policy")
}
res := &job.Stats{
Info: &job.StatsInfo{
JobID: p.ID,
JobName: p.JobName,
Status: job.ScheduledStatus.String(),
JobKind: job.KindPeriodic,
CronSpec: cron,
WebHookURL: p.WebHookURL,
NumericPID: numericID,
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", p.ID),
Revision: t.StatusRevision,
},
}
// Keep status synced.
if res.Info.Revision > 0 {
res.Info.HookAck = &job.ACK{
Revision: t.StatusRevision,
Status: job.ScheduledStatus.String(),
}
}
// Save the stats.
return w.manager.SaveJob(res)
}
// validate whether the worker is ready or not.
func (w *Worker) validate() error {
if w.context == nil {
return errors.New("missing context")
}
if w.manager == nil {
return errors.New("missing stats manager")
}
if w.scheduler == nil {
return errors.New("missing period scheduler")
}
if len(w.internalCoreAddr) == 0 {
return errors.New("empty internal addr of core")
}
if w.coreScheduler == nil {
return errors.New("missing core scheduler")
}
if w.coreExecutionManager == nil {
return errors.New("missing core execution manager")
}
if w.coreTaskManager == nil {
return errors.New("missing core task manager")
}
if w.policyLoader == nil {
return errors.New("missing policy loader")
}
return nil
}
// getTask gets the task associated with the specified schedule.
// Here is an assumption that each schedule has only one execution as well as only one task under the execution.
func (w *Worker) getTask(ctx context.Context, schedule *scheduler.Schedule) (*task.Task, error) {
// Get associated execution first.
executions, err := w.coreExecutionManager.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"vendor_type": scheduler.JobNameScheduler,
"vendor_id": schedule.ID,
},
})
if err != nil {
return nil, err
}
if len(executions) == 0 {
return nil, errors.Errorf("no execution found for schedule: %s:%d", schedule.VendorType, schedule.VendorID)
}
theOne := executions[0]
// Now get the execution.
tasks, err := w.coreTaskManager.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"execution_id": theOne.ID,
},
})
if err != nil {
return nil, err
}
if len(tasks) == 0 {
return nil, errors.Errorf("no task found for execution: %s:%d", schedule.VendorType, theOne.ID)
}
return tasks[0], nil
}

View File

@ -0,0 +1,138 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
import (
"context"
"sync"
"testing"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/mgt"
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/goharbor/harbor/src/testing/mock"
ts "github.com/goharbor/harbor/src/testing/pkg/scheduler"
tt "github.com/goharbor/harbor/src/testing/pkg/task"
"github.com/stretchr/testify/suite"
)
// WorkerTestSuite is test suite for testing sync.Worker.
type WorkerTestSuite struct {
suite.Suite
worker *Worker
}
// TestWorker is the entry method of WorkerTestSuite.
func TestWorker(t *testing.T) {
suite.Run(t, &WorkerTestSuite{})
}
// SetupSuite sets up suite.
func (suite *WorkerTestSuite) SetupSuite() {
sysContext := context.TODO()
dao.PrepareTestForPostgresSQL()
getPolicies := func() ([]*period.Policy, error) {
return []*period.Policy{
// Dirty data in js datastore.
{
ID: "8ff2aabb977077b84b4d5f1b",
JobName: scheduler.JobNameScheduler,
CronSpec: "0 0 0 * * 0",
WebHookURL: "http://core:8080/service/notifications/tasks/250",
NumericID: 1630667250,
},
}, nil
}
// Mock methods
//
tss := &ts.Scheduler{}
tss.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler.Schedule{
{
ID: 550,
CRON: "0 0 0 * * *",
},
}, nil)
// The missing schedule in database.
tte := &tt.ExecutionManager{}
tte.On("List", mock.Anything, &q.Query{
Keywords: map[string]interface{}{
"vendor_type": scheduler.JobNameScheduler,
"vendor_id": (int64)(550),
},
}).Return([]*task.Execution{
{
ID: 1550,
},
}, nil)
ttm := &tt.Manager{}
ttm.On("List", mock.Anything, &q.Query{
Keywords: map[string]interface{}{
"execution_id": (int64)(1550),
},
}).Return([]*task.Task{
{
ID: 2550,
ExecutionID: 1550,
JobID: "f754ccdd123664b2acb971d9",
},
}, nil)
pms := &period.MockScheduler{}
pms.On("Schedule", &period.Policy{
ID: "f754ccdd123664b2acb971d9",
JobName: scheduler.JobNameScheduler,
CronSpec: "0 0 0 * * *",
WebHookURL: "http://core:8080/service/notifications/tasks/2550",
}).Return((int64)(1630667500), nil)
pms.On("UnSchedule", "8ff2aabb977077b84b4d5f1b").Return(nil)
mmm := &mgt.MockManager{}
mmm.On("SaveJob", mock.Anything).Return(nil)
suite.worker = New(3).
WithContext(&env.Context{
SystemContext: sysContext,
WG: &sync.WaitGroup{},
ErrorChan: make(chan error, 1),
}).UseCoreScheduler(tss).
UseCoreExecutionManager(tte).
UseCoreTaskManager(ttm).
UseScheduler(pms).
UseManager(mmm).
WithCoreInternalAddr("http://core:8080").
WithPolicyLoader(getPolicies)
}
// TestStart test Start().
func (suite *WorkerTestSuite) TestStart() {
err := suite.worker.Start()
suite.NoError(err, "start worker")
}
// TestRun test Run().
func (suite *WorkerTestSuite) TestRun() {
err := suite.worker.Run(context.TODO())
suite.NoError(err, "run worker")
}

View File

@ -82,9 +82,10 @@ type Task struct {
// the time that the task record created
CreationTime time.Time `json:"creation_time"`
// the time that the underlying job starts
StartTime time.Time `json:"start_time"`
UpdateTime time.Time `json:"update_time"`
EndTime time.Time `json:"end_time"`
StartTime time.Time `json:"start_time"`
UpdateTime time.Time `json:"update_time"`
EndTime time.Time `json:"end_time"`
StatusRevision int64 `json:"status_revision"`
}
// From constructs a task from DAO model
@ -100,6 +101,7 @@ func (t *Task) From(task *dao.Task) {
t.StartTime = task.StartTime
t.UpdateTime = task.UpdateTime
t.EndTime = task.EndTime
t.StatusRevision = task.StatusRevision
if len(task.ExtraAttrs) > 0 {
extras := map[string]interface{}{}
if err := json.Unmarshal([]byte(task.ExtraAttrs), &extras); err != nil {