diff --git a/src/jobservice/job/context.go b/src/jobservice/job/context.go index 3651e1817..1f699e1e0 100644 --- a/src/jobservice/job/context.go +++ b/src/jobservice/job/context.go @@ -63,10 +63,10 @@ type Context interface { // flag to indicate if have command OPCommand() (OPCommand, bool) - // Return the logger + // GetLogger returns the logger GetLogger() logger.Interface - // Get tracker + // Tracker of job. Tracker() Tracker } diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go index 0fd844fe7..1a521de43 100644 --- a/src/jobservice/job/models.go +++ b/src/jobservice/job/models.go @@ -15,6 +15,8 @@ package job import ( + "encoding/json" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/lib/errors" ) @@ -78,6 +80,16 @@ type ACK struct { CheckInAt int64 `json:"check_in_at"` } +// JSON of ACK. +func (a *ACK) JSON() string { + str, err := json.Marshal(a) + if err != nil { + return "" + } + + return string(str) +} + // ActionRequest defines for triggering job action like stop/cancel. type ActionRequest struct { Action string `json:"action"` diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go index 279bbf868..08f989be2 100644 --- a/src/jobservice/job/tracker.go +++ b/src/jobservice/job/tracker.go @@ -335,10 +335,20 @@ func (bt *basicTracker) Save() (err error) { } // Set update timestamp args = append(args, "update_time", time.Now().Unix()) - // Set the first revision - args = append(args, "revision", time.Now().Unix()) + // Set the first revision if it is not set. + rev := time.Now().Unix() + if stats.Info.Revision > 0 { + rev = stats.Info.Revision + } + args = append(args, "revision", rev) - // ACK data is saved/updated not via tracker, so ignore the ACK saving + // For restoring if ACK is not nil. + if stats.Info.HookAck != nil { + ack := stats.Info.HookAck.JSON() + if len(ack) > 0 { + args = append(args, "ack") + } + } // Do it in a transaction err = conn.Send("MULTI") diff --git a/src/jobservice/mgt/mock.go b/src/jobservice/mgt/mock.go new file mode 100644 index 000000000..a8b3f4551 --- /dev/null +++ b/src/jobservice/mgt/mock.go @@ -0,0 +1,17 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mgt + +//go:generate mockery --name Manager --output . --outpkg mgt --filename mock_manager.go --structname MockManager --inpackage diff --git a/src/jobservice/mgt/mock_manager.go b/src/jobservice/mgt/mock_manager.go new file mode 100644 index 000000000..73fb5e908 --- /dev/null +++ b/src/jobservice/mgt/mock_manager.go @@ -0,0 +1,142 @@ +// Code generated by mockery v2.1.0. DO NOT EDIT. + +package mgt + +import ( + job "github.com/goharbor/harbor/src/jobservice/job" + mock "github.com/stretchr/testify/mock" + + query "github.com/goharbor/harbor/src/jobservice/common/query" +) + +// MockManager is an autogenerated mock type for the Manager type +type MockManager struct { + mock.Mock +} + +// GetJob provides a mock function with given fields: jobID +func (_m *MockManager) GetJob(jobID string) (*job.Stats, error) { + ret := _m.Called(jobID) + + var r0 *job.Stats + if rf, ok := ret.Get(0).(func(string) *job.Stats); ok { + r0 = rf(jobID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.Stats) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(jobID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetJobs provides a mock function with given fields: q +func (_m *MockManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + ret := _m.Called(q) + + var r0 []*job.Stats + if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok { + r0 = rf(q) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*job.Stats) + } + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok { + r1 = rf(q) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok { + r2 = rf(q) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetPeriodicExecution provides a mock function with given fields: pID, q +func (_m *MockManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) { + ret := _m.Called(pID, q) + + var r0 []*job.Stats + if rf, ok := ret.Get(0).(func(string, *query.Parameter) []*job.Stats); ok { + r0 = rf(pID, q) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*job.Stats) + } + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(string, *query.Parameter) int64); ok { + r1 = rf(pID, q) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, *query.Parameter) error); ok { + r2 = rf(pID, q) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// GetScheduledJobs provides a mock function with given fields: q +func (_m *MockManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + ret := _m.Called(q) + + var r0 []*job.Stats + if rf, ok := ret.Get(0).(func(*query.Parameter) []*job.Stats); ok { + r0 = rf(q) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*job.Stats) + } + } + + var r1 int64 + if rf, ok := ret.Get(1).(func(*query.Parameter) int64); ok { + r1 = rf(q) + } else { + r1 = ret.Get(1).(int64) + } + + var r2 error + if rf, ok := ret.Get(2).(func(*query.Parameter) error); ok { + r2 = rf(q) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// SaveJob provides a mock function with given fields: _a0 +func (_m *MockManager) SaveJob(_a0 *job.Stats) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*job.Stats) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/jobservice/period/basic_scheduler.go b/src/jobservice/period/basic_scheduler.go index 2797780b2..324e6c123 100644 --- a/src/jobservice/period/basic_scheduler.go +++ b/src/jobservice/period/basic_scheduler.go @@ -16,6 +16,7 @@ package period import ( "context" + "math/rand" "time" "github.com/goharbor/harbor/src/jobservice/errs" @@ -86,7 +87,7 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) { return -1, err } - pid := time.Now().Unix() + pid := time.Now().Unix() + rand.Int63n(10) // Save to redis db if _, err := conn.Do("ZADD", rds.KeyPeriodicPolicy(bs.namespace), pid, rawJSON); err != nil { diff --git a/src/jobservice/period/mock.go b/src/jobservice/period/mock.go new file mode 100644 index 000000000..e2e540d22 --- /dev/null +++ b/src/jobservice/period/mock.go @@ -0,0 +1,17 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package period + +//go:generate mockery --name Scheduler --output . --outpkg period --filename mock_scheduler.go --structname MockScheduler --inpackage diff --git a/src/jobservice/period/mock_scheduler.go b/src/jobservice/period/mock_scheduler.go new file mode 100644 index 000000000..1477cca56 --- /dev/null +++ b/src/jobservice/period/mock_scheduler.go @@ -0,0 +1,50 @@ +// Code generated by mockery v2.1.0. DO NOT EDIT. + +package period + +import mock "github.com/stretchr/testify/mock" + +// MockScheduler is an autogenerated mock type for the Scheduler type +type MockScheduler struct { + mock.Mock +} + +// Schedule provides a mock function with given fields: policy +func (_m *MockScheduler) Schedule(policy *Policy) (int64, error) { + ret := _m.Called(policy) + + var r0 int64 + if rf, ok := ret.Get(0).(func(*Policy) int64); ok { + r0 = rf(policy) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(*Policy) error); ok { + r1 = rf(policy) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Start provides a mock function with given fields: +func (_m *MockScheduler) Start() { + _m.Called() +} + +// UnSchedule provides a mock function with given fields: policyID +func (_m *MockScheduler) UnSchedule(policyID string) error { + ret := _m.Called(policyID) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(policyID) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 760ae4f84..c6573c6fc 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "os/signal" + "strings" "sync" "syscall" "time" @@ -40,6 +41,8 @@ import ( "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/mgt" "github.com/goharbor/harbor/src/jobservice/migration" + "github.com/goharbor/harbor/src/jobservice/period" + sync2 "github.com/goharbor/harbor/src/jobservice/sync" "github.com/goharbor/harbor/src/jobservice/worker" "github.com/goharbor/harbor/src/jobservice/worker/cworker" "github.com/goharbor/harbor/src/lib/errors" @@ -49,6 +52,7 @@ import ( "github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/scan" "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" "github.com/gomodule/redigo/redis" ) @@ -59,7 +63,9 @@ const ( ) // JobService ... -var JobService = &Bootstrap{} +var JobService = &Bootstrap{ + syncEnabled: true, +} // workerPoolID var workerPoolID string @@ -67,6 +73,7 @@ var workerPoolID string // Bootstrap is coordinating process to help load and start the other components to serve. type Bootstrap struct { jobContextInitializer job.ContextInitializer + syncEnabled bool } // SetJobContextInitializer set the job context initializer @@ -103,6 +110,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) var ( backendWorker worker.Interface manager mgt.Manager + syncWorker *sync2.Worker ) if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis { // Number of workers @@ -175,6 +183,29 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) if err = lcmCtl.Serve(); err != nil { return errors.Errorf("start life cycle controller error: %s", err) } + + // Initialize sync worker + if bs.syncEnabled { + syncWorker = sync2.New(3). + WithContext(rootContext). + UseManager(manager). + UseScheduler(period.NewScheduler(rootContext.SystemContext, namespace, redisPool, lcmCtl)). + WithCoreInternalAddr(strings.TrimSuffix(config.GetCoreURL(), "/")). + UseCoreScheduler(scheduler.Sched). + UseCoreExecutionManager(task.ExecMgr). + UseCoreTaskManager(task.Mgr). + WithPolicyLoader(func() ([]*period.Policy, error) { + conn := redisPool.Get() + defer conn.Close() + + return period.Load(namespace, conn) + }) + // Start sync worker + // Not block the regular process. + if err := syncWorker.Start(); err != nil { + logger.Error(err) + } + } } else { return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend) } @@ -225,7 +256,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) sig <- os.Interrupt } - // Wait everyone exit + // Wait everyone exits. rootContext.WG.Wait() return diff --git a/src/jobservice/runtime/bootstrap_test.go b/src/jobservice/runtime/bootstrap_test.go index 257e58479..94f641e51 100644 --- a/src/jobservice/runtime/bootstrap_test.go +++ b/src/jobservice/runtime/bootstrap_test.go @@ -17,14 +17,16 @@ package runtime import ( "context" "fmt" + "testing" + "time" + + "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/tests" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "testing" - "time" ) // BootStrapTestSuite tests bootstrap @@ -38,6 +40,8 @@ type BootStrapTestSuite struct { // SetupSuite prepares test suite func (suite *BootStrapTestSuite) SetupSuite() { + dao.PrepareTestForPostgresSQL() + // Load configurations err := config.DefaultConfig.Load("../config_test.yml", true) require.NoError(suite.T(), err, "load configurations error: %s", err) @@ -51,7 +55,9 @@ func (suite *BootStrapTestSuite) SetupSuite() { err = logger.Init(suite.ctx) require.NoError(suite.T(), err, "init logger: nil error expected but got %s", err) - suite.jobService = &Bootstrap{} + suite.jobService = &Bootstrap{ + syncEnabled: false, + } suite.jobService.SetJobContextInitializer(nil) } diff --git a/src/jobservice/sync/schedule.go b/src/jobservice/sync/schedule.go new file mode 100644 index 000000000..f21dca6f6 --- /dev/null +++ b/src/jobservice/sync/schedule.go @@ -0,0 +1,368 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "fmt" + "time" + + o "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/jobservice/env" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/jobservice/mgt" + "github.com/goharbor/harbor/src/jobservice/period" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" +) + +// PolicyLoader is a func template to load schedule policies from js datastore. +type PolicyLoader func() ([]*period.Policy, error) + +// Worker is designed to sync the schedules in the database into jobservice datastore. +type Worker struct { + // context of Worker. + context *env.Context + // Indicate whether a new round should be run. + lastErr error + // How many rounds have been run. + round uint8 + // The max number of rounds for repeated runs. + maxRounds uint8 + // Periodical job scheduler. + scheduler period.Scheduler + // Job stats manager. + manager mgt.Manager + // Internal addr of core. + internalCoreAddr string + // Scheduler from core. + coreScheduler scheduler.Scheduler + // Execution manager from core. + coreExecutionManager task.ExecutionManager + // Task manager from core + coreTaskManager task.Manager + // Loader for loading polices from the js store. + policyLoader PolicyLoader +} + +// New sync worker. +func New(maxRounds uint8) *Worker { + return &Worker{ + maxRounds: maxRounds, + } +} + +// WithContext set context. +func (w *Worker) WithContext(ctx *env.Context) *Worker { + w.context = ctx + return w +} + +// UseScheduler refers the period.Scheduler. +func (w *Worker) UseScheduler(scheduler period.Scheduler) *Worker { + w.scheduler = scheduler + return w +} + +// WithCoreInternalAddr sets the internal addr of core. +func (w *Worker) WithCoreInternalAddr(addr string) *Worker { + w.internalCoreAddr = addr + return w +} + +// UseManager refers the mgt.Manager. +func (w *Worker) UseManager(mgr mgt.Manager) *Worker { + w.manager = mgr + return w +} + +// UseCoreScheduler refers the core scheduler. +func (w *Worker) UseCoreScheduler(scheduler scheduler.Scheduler) *Worker { + w.coreScheduler = scheduler + return w +} + +// UseCoreExecutionManager refers the core execution manager. +func (w *Worker) UseCoreExecutionManager(executionMgr task.ExecutionManager) *Worker { + w.coreExecutionManager = executionMgr + return w +} + +// UseCoreTaskManager refers the core task manager. +func (w *Worker) UseCoreTaskManager(taskManager task.Manager) *Worker { + w.coreTaskManager = taskManager + return w +} + +// WithPolicyLoader determines the policy loader func. +func (w *Worker) WithPolicyLoader(loader PolicyLoader) *Worker { + w.policyLoader = loader + return w +} + +// Start the loop in none-blocking way. +func (w *Worker) Start() error { + if err := w.validate(); err != nil { + return err + } + + w.context.WG.Add(1) + // Run + go func() { + defer func() { + w.context.WG.Done() + }() + + ctx := orm.NewContext(w.context.SystemContext, o.NewOrm()) + ctlChan := make(chan struct{}, 1) + ctlChan <- struct{}{} + + for { + select { + case <-ctlChan: + w.round++ + if w.round == w.maxRounds { + return + } + + if err := w.Run(ctx); err == nil { + return + } + + // Wait for a while and retry then. + time.AfterFunc(1*time.Minute, func() { + ctlChan <- struct{}{} + }) + case <-w.context.SystemContext.Done(): + logger.Info("Context cancel signal received:sync worker exit") + return + } + } + }() + + return nil +} + +// Run one round. +func (w *Worker) Run(ctx context.Context) error { + // Start sync schedules. + logger.Infof("Start to sync schedules in database to jobservice: round[%d].", w.round) + + // Get all the schedules from the database first. + // Use the default scheduler. + schedules, err := w.coreScheduler.ListSchedules(ctx, &q.Query{}) + if err != nil { + // We can not proceed. + // A non-nil error will cause a follow-up retry later. + return errors.Wrap(err, "list all the schedules in the database") + } + + // Exit earlier if no schedules found. + if len(schedules) == 0 { + // Log and gracefully exit. + logger.Info("No schedules found in the database.") + return nil + } + + // Get schedule records from the jobservice datastore. + polices, err := w.policyLoader() + if err != nil { + return errors.Wrap(err, "load schedule records from jobservice store") + } + + // Define a function to get the policy with the specified ID. + getPolicy := func(jobID string) *period.Policy { + for _, p := range polices { + if p.ID == jobID { + return p + } + } + + return nil + } + + jobHash := make(map[string]struct{}) + restoreCounter := 0 + clearCounter := 0 + + // Sync now. + for _, sch := range schedules { + // Get the corresponding task. + t, err := w.getTask(ctx, sch) + if err != nil { + // Log and skip + logger.Error(err) + w.lastErr = err + + continue + } + + // Recorded + jobHash[t.JobID] = struct{}{} + + // Get policy + p := getPolicy(t.JobID) + if p == nil { + // Need to restore this missing schedule. + if err := w.restore(sch.CRON, t); err != nil { + // Log and skip + logger.Error(err) + w.lastErr = err + } else { + restoreCounter++ + logger.Infof("Sync: restore missing schedule: taskID=%d, jobID=%s, cron=%s", t.ID, t.JobID, sch.CRON) + } + } + } + + // Clear the dirty ones. + for _, p := range polices { + _, ok := jobHash[p.ID] + if p.JobName == scheduler.JobNameScheduler && !ok { + if err := w.scheduler.UnSchedule(p.ID); err != nil { + logger.Error(err) + } else { + clearCounter++ + logger.Infof("Sync: unschedule dirty schedule: %s:%s", p.JobName, p.ID) + } + } + } + + logger.Infof("End sync schedules in database to jobservice: round[%d].", w.round) + logger.Infof("Found %d schedules, restore %d missing schedules, clear %d dirty schedules", len(schedules), restoreCounter, clearCounter) + + return w.lastErr +} + +func (w *Worker) restore(cron string, t *task.Task) error { + p := &period.Policy{ + ID: t.JobID, + JobName: scheduler.JobNameScheduler, + CronSpec: cron, + WebHookURL: fmt.Sprintf("%s/service/notifications/tasks/%d", w.internalCoreAddr, t.ID), + } + + // Schedule the policy. + numericID, err := w.scheduler.Schedule(p) + if err != nil { + return errors.Wrap(err, "schedule policy") + } + + res := &job.Stats{ + Info: &job.StatsInfo{ + JobID: p.ID, + JobName: p.JobName, + Status: job.ScheduledStatus.String(), + JobKind: job.KindPeriodic, + CronSpec: cron, + WebHookURL: p.WebHookURL, + NumericPID: numericID, + EnqueueTime: time.Now().Unix(), + UpdateTime: time.Now().Unix(), + RefLink: fmt.Sprintf("/api/v1/jobs/%s", p.ID), + Revision: t.StatusRevision, + }, + } + + // Keep status synced. + if res.Info.Revision > 0 { + res.Info.HookAck = &job.ACK{ + Revision: t.StatusRevision, + Status: job.ScheduledStatus.String(), + } + } + + // Save the stats. + return w.manager.SaveJob(res) +} + +// validate whether the worker is ready or not. +func (w *Worker) validate() error { + if w.context == nil { + return errors.New("missing context") + } + + if w.manager == nil { + return errors.New("missing stats manager") + } + + if w.scheduler == nil { + return errors.New("missing period scheduler") + } + + if len(w.internalCoreAddr) == 0 { + return errors.New("empty internal addr of core") + } + + if w.coreScheduler == nil { + return errors.New("missing core scheduler") + } + + if w.coreExecutionManager == nil { + return errors.New("missing core execution manager") + } + + if w.coreTaskManager == nil { + return errors.New("missing core task manager") + } + + if w.policyLoader == nil { + return errors.New("missing policy loader") + } + + return nil +} + +// getTask gets the task associated with the specified schedule. +// Here is an assumption that each schedule has only one execution as well as only one task under the execution. +func (w *Worker) getTask(ctx context.Context, schedule *scheduler.Schedule) (*task.Task, error) { + // Get associated execution first. + executions, err := w.coreExecutionManager.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "vendor_type": scheduler.JobNameScheduler, + "vendor_id": schedule.ID, + }, + }) + + if err != nil { + return nil, err + } + + if len(executions) == 0 { + return nil, errors.Errorf("no execution found for schedule: %s:%d", schedule.VendorType, schedule.VendorID) + } + + theOne := executions[0] + // Now get the execution. + tasks, err := w.coreTaskManager.List(ctx, &q.Query{ + Keywords: map[string]interface{}{ + "execution_id": theOne.ID, + }, + }) + + if err != nil { + return nil, err + } + + if len(tasks) == 0 { + return nil, errors.Errorf("no task found for execution: %s:%d", schedule.VendorType, theOne.ID) + } + + return tasks[0], nil +} diff --git a/src/jobservice/sync/schedule_test.go b/src/jobservice/sync/schedule_test.go new file mode 100644 index 000000000..f73714d71 --- /dev/null +++ b/src/jobservice/sync/schedule_test.go @@ -0,0 +1,138 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "context" + "sync" + "testing" + + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/jobservice/env" + "github.com/goharbor/harbor/src/jobservice/mgt" + "github.com/goharbor/harbor/src/jobservice/period" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/goharbor/harbor/src/testing/mock" + ts "github.com/goharbor/harbor/src/testing/pkg/scheduler" + tt "github.com/goharbor/harbor/src/testing/pkg/task" + "github.com/stretchr/testify/suite" +) + +// WorkerTestSuite is test suite for testing sync.Worker. +type WorkerTestSuite struct { + suite.Suite + + worker *Worker +} + +// TestWorker is the entry method of WorkerTestSuite. +func TestWorker(t *testing.T) { + suite.Run(t, &WorkerTestSuite{}) +} + +// SetupSuite sets up suite. +func (suite *WorkerTestSuite) SetupSuite() { + sysContext := context.TODO() + + dao.PrepareTestForPostgresSQL() + + getPolicies := func() ([]*period.Policy, error) { + return []*period.Policy{ + // Dirty data in js datastore. + { + ID: "8ff2aabb977077b84b4d5f1b", + JobName: scheduler.JobNameScheduler, + CronSpec: "0 0 0 * * 0", + WebHookURL: "http://core:8080/service/notifications/tasks/250", + NumericID: 1630667250, + }, + }, nil + } + + // Mock methods + // + tss := &ts.Scheduler{} + tss.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler.Schedule{ + { + ID: 550, + CRON: "0 0 0 * * *", + }, + }, nil) + + // The missing schedule in database. + tte := &tt.ExecutionManager{} + tte.On("List", mock.Anything, &q.Query{ + Keywords: map[string]interface{}{ + "vendor_type": scheduler.JobNameScheduler, + "vendor_id": (int64)(550), + }, + }).Return([]*task.Execution{ + { + ID: 1550, + }, + }, nil) + + ttm := &tt.Manager{} + ttm.On("List", mock.Anything, &q.Query{ + Keywords: map[string]interface{}{ + "execution_id": (int64)(1550), + }, + }).Return([]*task.Task{ + { + ID: 2550, + ExecutionID: 1550, + JobID: "f754ccdd123664b2acb971d9", + }, + }, nil) + + pms := &period.MockScheduler{} + pms.On("Schedule", &period.Policy{ + ID: "f754ccdd123664b2acb971d9", + JobName: scheduler.JobNameScheduler, + CronSpec: "0 0 0 * * *", + WebHookURL: "http://core:8080/service/notifications/tasks/2550", + }).Return((int64)(1630667500), nil) + pms.On("UnSchedule", "8ff2aabb977077b84b4d5f1b").Return(nil) + + mmm := &mgt.MockManager{} + mmm.On("SaveJob", mock.Anything).Return(nil) + + suite.worker = New(3). + WithContext(&env.Context{ + SystemContext: sysContext, + WG: &sync.WaitGroup{}, + ErrorChan: make(chan error, 1), + }).UseCoreScheduler(tss). + UseCoreExecutionManager(tte). + UseCoreTaskManager(ttm). + UseScheduler(pms). + UseManager(mmm). + WithCoreInternalAddr("http://core:8080"). + WithPolicyLoader(getPolicies) +} + +// TestStart test Start(). +func (suite *WorkerTestSuite) TestStart() { + err := suite.worker.Start() + suite.NoError(err, "start worker") +} + +// TestRun test Run(). +func (suite *WorkerTestSuite) TestRun() { + err := suite.worker.Run(context.TODO()) + suite.NoError(err, "run worker") +} diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go index 246cb3d99..fcaf8c74d 100644 --- a/src/pkg/task/model.go +++ b/src/pkg/task/model.go @@ -82,9 +82,10 @@ type Task struct { // the time that the task record created CreationTime time.Time `json:"creation_time"` // the time that the underlying job starts - StartTime time.Time `json:"start_time"` - UpdateTime time.Time `json:"update_time"` - EndTime time.Time `json:"end_time"` + StartTime time.Time `json:"start_time"` + UpdateTime time.Time `json:"update_time"` + EndTime time.Time `json:"end_time"` + StatusRevision int64 `json:"status_revision"` } // From constructs a task from DAO model @@ -100,6 +101,7 @@ func (t *Task) From(task *dao.Task) { t.StartTime = task.StartTime t.UpdateTime = task.UpdateTime t.EndTime = task.EndTime + t.StatusRevision = task.StatusRevision if len(task.ExtraAttrs) > 0 { extras := map[string]interface{}{} if err := json.Unmarshal([]byte(task.ExtraAttrs), &extras); err != nil {