mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 10:15:35 +01:00
Refactor the scheduler with the task manager mechanism
Refactor the scheduler with the task manager mechanism, this will reduce the duplicate code Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
6f4e8150d5
commit
4dc4b6728c
@ -63,3 +63,50 @@ CREATE TABLE IF NOT EXISTS p2p_preheat_policy (
|
|||||||
creation_time timestamp,
|
creation_time timestamp,
|
||||||
update_time timestamp
|
update_time timestamp
|
||||||
);
|
);
|
||||||
|
|
||||||
|
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS cron varchar(64);
|
||||||
|
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS execution_id int;
|
||||||
|
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_name varchar(128);
|
||||||
|
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_param text;
|
||||||
|
|
||||||
|
/*abstract the cron, callback function parameters from table retention_policy*/
|
||||||
|
UPDATE schedule
|
||||||
|
SET cron = retention.cron, callback_func_name = 'RetentionCallback',
|
||||||
|
callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}')
|
||||||
|
FROM (
|
||||||
|
SELECT id, data::json->'trigger'->'references'->>'job_id' AS schedule_id,
|
||||||
|
data::json->'trigger'->'settings'->>'cron' AS cron
|
||||||
|
FROM retention_policy
|
||||||
|
) AS retention
|
||||||
|
WHERE schedule.id=retention.schedule_id::int;
|
||||||
|
|
||||||
|
/*create new execution and task record for each schedule*/
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
sched RECORD;
|
||||||
|
exec_id integer;
|
||||||
|
status_code integer;
|
||||||
|
BEGIN
|
||||||
|
FOR sched IN SELECT * FROM schedule
|
||||||
|
LOOP
|
||||||
|
INSERT INTO execution (vendor_type, trigger) VALUES ('SCHEDULER', 'MANUAL') RETURNING id INTO exec_id;
|
||||||
|
IF sched.status = 'Pending' THEN
|
||||||
|
status_code = 0;
|
||||||
|
ELSIF sched.status = 'Scheduled' THEN
|
||||||
|
status_code = 1;
|
||||||
|
ELSIF sched.status = 'Running' THEN
|
||||||
|
status_code = 2;
|
||||||
|
ELSIF sched.status = 'Stopped' OR sched.status = 'Error' OR sched.status = 'Success' THEN
|
||||||
|
status_code = 3;
|
||||||
|
ELSE
|
||||||
|
status_code = 0;
|
||||||
|
END IF;
|
||||||
|
INSERT INTO task (execution_id, job_id, status, status_code, status_revision, run_count) VALUES (exec_id, sched.job_id, sched.status, status_code, 0, 0);
|
||||||
|
UPDATE schedule SET execution_id=exec_id WHERE id = sched.id;
|
||||||
|
END LOOP;
|
||||||
|
END $$;
|
||||||
|
|
||||||
|
ALTER TABLE schedule DROP COLUMN IF EXISTS job_id;
|
||||||
|
ALTER TABLE schedule DROP COLUMN IF EXISTS status;
|
||||||
|
|
||||||
|
ALTER TABLE schedule ADD CONSTRAINT schedule_execution FOREIGN KEY (execution_id) REFERENCES execution(id);
|
||||||
|
@ -184,13 +184,11 @@ func Init() error {
|
|||||||
// init project manager
|
// init project manager
|
||||||
initProjectManager()
|
initProjectManager()
|
||||||
|
|
||||||
initRetentionScheduler()
|
|
||||||
|
|
||||||
retentionMgr = retention.NewManager()
|
retentionMgr = retention.NewManager()
|
||||||
|
|
||||||
retentionLauncher = retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr)
|
retentionLauncher = retention.NewLauncher(projectMgr, repository.Mgr, retentionMgr)
|
||||||
|
|
||||||
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, retentionScheduler, retentionLauncher)
|
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
|
||||||
|
|
||||||
callbackFun := func(p interface{}) error {
|
callbackFun := func(p interface{}) error {
|
||||||
str, ok := p.(string)
|
str, ok := p.(string)
|
||||||
@ -204,7 +202,7 @@ func Init() error {
|
|||||||
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
|
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err := scheduler.Register(retention.SchedulerCallback, callbackFun)
|
err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, callbackFun)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -227,7 +225,3 @@ func initChartController() error {
|
|||||||
func initProjectManager() {
|
func initProjectManager() {
|
||||||
projectMgr = project.Mgr
|
projectMgr = project.Mgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func initRetentionScheduler() {
|
|
||||||
retentionScheduler = scheduler.GlobalScheduler
|
|
||||||
}
|
|
||||||
|
@ -45,7 +45,6 @@ import (
|
|||||||
_ "github.com/goharbor/harbor/src/pkg/notifier/topic"
|
_ "github.com/goharbor/harbor/src/pkg/notifier/topic"
|
||||||
"github.com/goharbor/harbor/src/pkg/scan"
|
"github.com/goharbor/harbor/src/pkg/scan"
|
||||||
"github.com/goharbor/harbor/src/pkg/scan/dao/scanner"
|
"github.com/goharbor/harbor/src/pkg/scan/dao/scanner"
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/version"
|
"github.com/goharbor/harbor/src/pkg/version"
|
||||||
"github.com/goharbor/harbor/src/replication"
|
"github.com/goharbor/harbor/src/replication"
|
||||||
"github.com/goharbor/harbor/src/server"
|
"github.com/goharbor/harbor/src/server"
|
||||||
@ -125,9 +124,6 @@ func main() {
|
|||||||
log.Fatalf("failed to load config: %v", err)
|
log.Fatalf("failed to load config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// init the scheduler
|
|
||||||
scheduler.Init()
|
|
||||||
|
|
||||||
password, err := config.InitialAdminPassword()
|
password, err := config.InitialAdminPassword()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get admin's initial password: %v", err)
|
log.Fatalf("failed to get admin's initial password: %v", err)
|
||||||
|
@ -16,13 +16,11 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"github.com/goharbor/harbor/src/core/service/notifications"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/job/models"
|
"github.com/goharbor/harbor/src/core/service/notifications"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/hook"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Handler handles the scheduler requests
|
// Handler handles the scheduler requests
|
||||||
@ -34,46 +32,20 @@ type Handler struct {
|
|||||||
func (h *Handler) Handle() {
|
func (h *Handler) Handle() {
|
||||||
log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id"))
|
log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id"))
|
||||||
|
|
||||||
var data models.JobStatusChange
|
var data job.StatusChange
|
||||||
if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil {
|
if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil {
|
||||||
log.Errorf("failed to decode hook event: %v", err)
|
log.Errorf("failed to decode hook event: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// status update
|
|
||||||
if len(data.CheckIn) == 0 {
|
schedulerID, err := h.GetInt64FromPath(":id")
|
||||||
schedulerID, err := h.GetInt64FromPath(":id")
|
if err != nil {
|
||||||
if err != nil {
|
log.Errorf("failed to get the schedule ID: %v", err)
|
||||||
log.Errorf("failed to get the schedule ID: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := hook.GlobalController.UpdateStatus(schedulerID, data.Status); err != nil {
|
|
||||||
h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Debugf("handle status update hook event for schedule %s completed", h.GetStringFromPath(":id"))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// run callback function
|
if err = scheduler.HandleLegacyHook(h.Ctx.Request.Context(), schedulerID, &data); err != nil {
|
||||||
// just log the error message when handling check in request if got any error
|
log.Errorf("failed to handle the legacy hook: %v", err)
|
||||||
params := map[string]interface{}{}
|
|
||||||
if err := json.Unmarshal([]byte(data.CheckIn), ¶ms); err != nil {
|
|
||||||
log.Errorf("failed to unmarshal parameters from check in message: %v", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
callbackFuncNameParam, exist := params[scheduler.JobParamCallbackFunc]
|
|
||||||
if !exist {
|
|
||||||
log.Error("cannot get the parameter \"callback_func_name\" from the check in message")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
callbackFuncName, ok := callbackFuncNameParam.(string)
|
|
||||||
if !ok || len(callbackFuncName) == 0 {
|
|
||||||
log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := hook.GlobalController.Run(callbackFuncName, params[scheduler.JobParamCallbackFuncParams]); err != nil {
|
|
||||||
log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Debugf("callback function %s called for schedule %s", callbackFuncName, h.GetStringFromPath(":id"))
|
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ func WrapConflictError(err error, format string, args ...interface{}) error {
|
|||||||
// as a src/internal/error.Error with not found error code, else return nil
|
// as a src/internal/error.Error with not found error code, else return nil
|
||||||
func AsNotFoundError(err error, messageFormat string, args ...interface{}) *errors.Error {
|
func AsNotFoundError(err error, messageFormat string, args ...interface{}) *errors.Error {
|
||||||
if errors.Is(err, orm.ErrNoRows) {
|
if errors.Is(err, orm.ErrNoRows) {
|
||||||
e := errors.NotFoundError(err)
|
e := errors.NotFoundError(nil)
|
||||||
if len(messageFormat) > 0 {
|
if len(messageFormat) > 0 {
|
||||||
e.WithMessage(messageFormat, args...)
|
e.WithMessage(messageFormat, args...)
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package retention
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/pkg/project"
|
"github.com/goharbor/harbor/src/pkg/project"
|
||||||
@ -65,7 +66,7 @@ type DefaultAPIController struct {
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
// SchedulerCallback ...
|
// SchedulerCallback ...
|
||||||
SchedulerCallback = "SchedulerCallback"
|
SchedulerCallback = "RetentionCallback"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TriggerParam ...
|
// TriggerParam ...
|
||||||
@ -84,7 +85,7 @@ func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) (int64, error
|
|||||||
if p.Trigger.Kind == policy.TriggerKindSchedule {
|
if p.Trigger.Kind == policy.TriggerKindSchedule {
|
||||||
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
|
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
|
||||||
if ok && len(cron.(string)) > 0 {
|
if ok && len(cron.(string)) > 0 {
|
||||||
jobid, err := r.scheduler.Schedule(cron.(string), SchedulerCallback, TriggerParam{
|
jobid, err := r.scheduler.Schedule(orm.Context(), cron.(string), SchedulerCallback, TriggerParam{
|
||||||
PolicyID: p.ID,
|
PolicyID: p.ID,
|
||||||
Trigger: ExecutionTriggerSchedule,
|
Trigger: ExecutionTriggerSchedule,
|
||||||
})
|
})
|
||||||
@ -142,13 +143,13 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if needUn {
|
if needUn {
|
||||||
err = r.scheduler.UnSchedule(p0.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
err = r.scheduler.UnSchedule(orm.Context(), p0.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if needSch {
|
if needSch {
|
||||||
jobid, err := r.scheduler.Schedule(p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
|
jobid, err := r.scheduler.Schedule(orm.Context(), p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
|
||||||
PolicyID: p.ID,
|
PolicyID: p.ID,
|
||||||
Trigger: ExecutionTriggerSchedule,
|
Trigger: ExecutionTriggerSchedule,
|
||||||
})
|
})
|
||||||
@ -168,7 +169,7 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 {
|
if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 {
|
||||||
err = r.scheduler.UnSchedule(p.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
err = r.scheduler.UnSchedule(orm.Context(), p.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
package retention
|
package retention
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||||
"github.com/goharbor/harbor/src/testing/pkg/repository"
|
"github.com/goharbor/harbor/src/testing/pkg/repository"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"strings"
|
"strings"
|
||||||
@ -201,14 +203,18 @@ func (s *ControllerTestSuite) TestExecution() {
|
|||||||
type fakeRetentionScheduler struct {
|
type fakeRetentionScheduler struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeRetentionScheduler) Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) {
|
func (f *fakeRetentionScheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||||
return 111, nil
|
return 111, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeRetentionScheduler) UnSchedule(id int64) error {
|
func (f *fakeRetentionScheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeRetentionScheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type fakeLauncher struct {
|
type fakeLauncher struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
89
src/pkg/scheduler/callback.go
Normal file
89
src/pkg/scheduler/callback.go
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
registry = make(map[string]CallbackFunc)
|
||||||
|
)
|
||||||
|
|
||||||
|
// CallbackFunc defines the function that the scheduler calls when triggered
|
||||||
|
type CallbackFunc func(interface{}) error
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if err := task.RegisterCheckInProcessor(JobNameScheduler, triggerCallback); err != nil {
|
||||||
|
log.Errorf("failed to register check in processor for scheduler: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterCallbackFunc registers the callback function which will be called when the scheduler is triggered
|
||||||
|
func RegisterCallbackFunc(name string, callbackFunc CallbackFunc) error {
|
||||||
|
if len(name) == 0 {
|
||||||
|
return errors.New("empty name")
|
||||||
|
}
|
||||||
|
if callbackFunc == nil {
|
||||||
|
return errors.New("callback function is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, exist := registry[name]
|
||||||
|
if exist {
|
||||||
|
return fmt.Errorf("callback function %s already exists", name)
|
||||||
|
}
|
||||||
|
registry[name] = callbackFunc
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCallbackFunc(name string) (CallbackFunc, error) {
|
||||||
|
f, exist := registry[name]
|
||||||
|
if !exist {
|
||||||
|
return nil, fmt.Errorf("callback function %s not found", name)
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackFuncExist(name string) bool {
|
||||||
|
_, exist := registry[name]
|
||||||
|
return exist
|
||||||
|
}
|
||||||
|
|
||||||
|
func triggerCallback(ctx context.Context, task *task.Task, change *job.StatusChange) (err error) {
|
||||||
|
schedules, err := Sched.(*scheduler).dao.List(ctx, &q.Query{
|
||||||
|
Keywords: map[string]interface{}{
|
||||||
|
"ExecutionID": task.ExecutionID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(schedules) == 0 {
|
||||||
|
return fmt.Errorf("the schedule whose execution ID is %d not found", task.ExecutionID)
|
||||||
|
}
|
||||||
|
callbackFunc, err := getCallbackFunc(schedules[0].CallbackFuncName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return callbackFunc(schedules[0].CallbackFuncParam)
|
||||||
|
}
|
73
src/pkg/scheduler/callback_test.go
Normal file
73
src/pkg/scheduler/callback_test.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
type callbackTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *callbackTestSuite) SetupTest() {
|
||||||
|
registry = map[string]CallbackFunc{}
|
||||||
|
err := RegisterCallbackFunc("callback", func(interface{}) error { return nil })
|
||||||
|
c.Require().Nil(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *callbackTestSuite) TestRegisterCallbackFunc() {
|
||||||
|
// empty name
|
||||||
|
err := RegisterCallbackFunc("", nil)
|
||||||
|
c.NotNil(err)
|
||||||
|
|
||||||
|
// nil callback function
|
||||||
|
err = RegisterCallbackFunc("test", nil)
|
||||||
|
c.NotNil(err)
|
||||||
|
|
||||||
|
// pass
|
||||||
|
err = RegisterCallbackFunc("test", func(interface{}) error { return nil })
|
||||||
|
c.Nil(err)
|
||||||
|
|
||||||
|
// duplicate name
|
||||||
|
err = RegisterCallbackFunc("test", func(interface{}) error { return nil })
|
||||||
|
c.NotNil(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *callbackTestSuite) TestGetCallbackFunc() {
|
||||||
|
// not exist
|
||||||
|
_, err := getCallbackFunc("not-exist")
|
||||||
|
c.NotNil(err)
|
||||||
|
|
||||||
|
// pass
|
||||||
|
f, err := getCallbackFunc("callback")
|
||||||
|
c.Require().Nil(err)
|
||||||
|
c.NotNil(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *callbackTestSuite) TestCallbackFuncExist() {
|
||||||
|
// not exist
|
||||||
|
c.False(callbackFuncExist("not-exist"))
|
||||||
|
|
||||||
|
// exist
|
||||||
|
c.True(callbackFuncExist("callback"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCallbackTestSuite(t *testing.T) {
|
||||||
|
s := &callbackTestSuite{}
|
||||||
|
suite.Run(t, s)
|
||||||
|
}
|
128
src/pkg/scheduler/dao.go
Normal file
128
src/pkg/scheduler/dao.go
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
beegoorm "github.com/astaxie/beego/orm"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
beegoorm.RegisterModel(&schedule{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type schedule struct {
|
||||||
|
ID int64 `orm:"pk;auto;column(id)"`
|
||||||
|
CRON string `orm:"column(cron)"`
|
||||||
|
ExecutionID int64 `orm:"column(execution_id)"`
|
||||||
|
CallbackFuncName string `orm:"column(callback_func_name)"`
|
||||||
|
CallbackFuncParam string `orm:"column(callback_func_param)"`
|
||||||
|
CreationTime time.Time `orm:"column(creation_time)"`
|
||||||
|
UpdateTime time.Time `orm:"column(update_time)"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DAO is the data access object interface for schedule
|
||||||
|
type DAO interface {
|
||||||
|
Create(ctx context.Context, schedule *schedule) (id int64, err error)
|
||||||
|
List(ctx context.Context, query *q.Query) (schedules []*schedule, err error)
|
||||||
|
Get(ctx context.Context, id int64) (schedule *schedule, err error)
|
||||||
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
|
Update(ctx context.Context, schedule *schedule, props ...string) (err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type dao struct{}
|
||||||
|
|
||||||
|
func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) {
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
id, err := ormer.Insert(schedule)
|
||||||
|
if err != nil {
|
||||||
|
if e := orm.AsForeignKeyError(err,
|
||||||
|
"the schedule tries to reference a non existing execution %d", schedule.ExecutionID); e != nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dao) List(ctx context.Context, query *q.Query) ([]*schedule, error) {
|
||||||
|
qs, err := orm.QuerySetter(ctx, &schedule{}, query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
schedules := []*schedule{}
|
||||||
|
if _, err = qs.All(&schedules); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return schedules, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dao) Get(ctx context.Context, id int64) (*schedule, error) {
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
schedule := &schedule{
|
||||||
|
ID: id,
|
||||||
|
}
|
||||||
|
if err = ormer.Read(schedule); err != nil {
|
||||||
|
if e := orm.AsNotFoundError(err, "schedule %d not found", id); e != nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return schedule, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dao) Delete(ctx context.Context, id int64) error {
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err := ormer.Delete(&schedule{
|
||||||
|
ID: id,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
return errors.NotFoundError(nil).WithMessage("schedule %d not found", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) error {
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
n, err := ormer.Update(schedule, props...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
return errors.NotFoundError(nil).WithMessage("schedule %d not found", schedule.ID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,99 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package dao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/astaxie/beego/orm"
|
|
||||||
"github.com/goharbor/harbor/src/common/dao"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ScheduleDao defines the method that a schedule data access model should implement
|
|
||||||
type ScheduleDao interface {
|
|
||||||
Create(*model.Schedule) (int64, error)
|
|
||||||
Update(*model.Schedule, ...string) error
|
|
||||||
Delete(int64) error
|
|
||||||
Get(int64) (*model.Schedule, error)
|
|
||||||
List(...*model.ScheduleQuery) ([]*model.Schedule, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns an instance of the default schedule data access model implementation
|
|
||||||
func New() ScheduleDao {
|
|
||||||
return &scheduleDao{}
|
|
||||||
}
|
|
||||||
|
|
||||||
type scheduleDao struct{}
|
|
||||||
|
|
||||||
func (s *scheduleDao) Create(schedule *model.Schedule) (int64, error) {
|
|
||||||
if schedule == nil {
|
|
||||||
return 0, errors.New("nil schedule")
|
|
||||||
}
|
|
||||||
now := time.Now()
|
|
||||||
schedule.CreationTime = &now
|
|
||||||
schedule.UpdateTime = &now
|
|
||||||
return dao.GetOrmer().Insert(schedule)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleDao) Update(schedule *model.Schedule, cols ...string) error {
|
|
||||||
if schedule == nil {
|
|
||||||
return errors.New("nil schedule")
|
|
||||||
}
|
|
||||||
if schedule.ID <= 0 {
|
|
||||||
return fmt.Errorf("invalid ID: %d", schedule.ID)
|
|
||||||
}
|
|
||||||
now := time.Now()
|
|
||||||
schedule.UpdateTime = &now
|
|
||||||
_, err := dao.GetOrmer().Update(schedule, cols...)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleDao) Delete(id int64) error {
|
|
||||||
_, err := dao.GetOrmer().Delete(&model.Schedule{
|
|
||||||
ID: id,
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleDao) Get(id int64) (*model.Schedule, error) {
|
|
||||||
schedule := &model.Schedule{
|
|
||||||
ID: id,
|
|
||||||
}
|
|
||||||
if err := dao.GetOrmer().Read(schedule); err != nil {
|
|
||||||
if err == orm.ErrNoRows {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return schedule, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleDao) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) {
|
|
||||||
qs := dao.GetOrmer().QueryTable(&model.Schedule{})
|
|
||||||
if len(query) > 0 && query[0] != nil {
|
|
||||||
if len(query[0].JobID) > 0 {
|
|
||||||
qs = qs.Filter("JobID", query[0].JobID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
schedules := []*model.Schedule{}
|
|
||||||
_, err := qs.All(&schedules)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return schedules, nil
|
|
||||||
}
|
|
@ -1,122 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package dao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/dao"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"github.com/stretchr/testify/suite"
|
|
||||||
)
|
|
||||||
|
|
||||||
var schDao = &scheduleDao{}
|
|
||||||
|
|
||||||
type scheduleTestSuite struct {
|
|
||||||
suite.Suite
|
|
||||||
scheduleID int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) SetupSuite() {
|
|
||||||
dao.PrepareTestForPostgresSQL()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) SetupTest() {
|
|
||||||
t := s.T()
|
|
||||||
id, err := schDao.Create(&model.Schedule{
|
|
||||||
JobID: "1",
|
|
||||||
Status: "pending",
|
|
||||||
})
|
|
||||||
require.Nil(t, err)
|
|
||||||
s.scheduleID = id
|
|
||||||
}
|
|
||||||
func (s *scheduleTestSuite) TearDownTest() {
|
|
||||||
// clear
|
|
||||||
dao.GetOrmer().Raw("delete from schedule").Exec()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) TestCreate() {
|
|
||||||
t := s.T()
|
|
||||||
// nil schedule
|
|
||||||
_, err := schDao.Create(nil)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
_, err = schDao.Create(&model.Schedule{
|
|
||||||
JobID: "1",
|
|
||||||
})
|
|
||||||
require.Nil(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) TestUpdate() {
|
|
||||||
t := s.T()
|
|
||||||
// nil schedule
|
|
||||||
err := schDao.Update(nil)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// invalid ID
|
|
||||||
err = schDao.Update(&model.Schedule{})
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
err = schDao.Update(&model.Schedule{
|
|
||||||
ID: s.scheduleID,
|
|
||||||
Status: "running",
|
|
||||||
})
|
|
||||||
require.Nil(t, err)
|
|
||||||
schedule, err := schDao.Get(s.scheduleID)
|
|
||||||
require.Nil(t, err)
|
|
||||||
assert.Equal(t, "running", schedule.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) TestDelete() {
|
|
||||||
t := s.T()
|
|
||||||
err := schDao.Delete(s.scheduleID)
|
|
||||||
require.Nil(t, err)
|
|
||||||
schedule, err := schDao.Get(s.scheduleID)
|
|
||||||
require.Nil(t, err)
|
|
||||||
assert.Nil(t, schedule)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) TestGet() {
|
|
||||||
t := s.T()
|
|
||||||
schedule, err := schDao.Get(s.scheduleID)
|
|
||||||
require.Nil(t, err)
|
|
||||||
assert.Equal(t, "pending", schedule.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduleTestSuite) TestList() {
|
|
||||||
t := s.T()
|
|
||||||
// nil query
|
|
||||||
schedules, err := schDao.List()
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Equal(t, 1, len(schedules))
|
|
||||||
assert.Equal(t, s.scheduleID, schedules[0].ID)
|
|
||||||
|
|
||||||
// query by job ID
|
|
||||||
schedules, err = schDao.List(&model.ScheduleQuery{
|
|
||||||
JobID: "1",
|
|
||||||
})
|
|
||||||
require.Nil(t, err)
|
|
||||||
require.Equal(t, 1, len(schedules))
|
|
||||||
assert.Equal(t, s.scheduleID, schedules[0].ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestScheduleDao(t *testing.T) {
|
|
||||||
suite.Run(t, &scheduleTestSuite{})
|
|
||||||
}
|
|
128
src/pkg/scheduler/dao_test.go
Normal file
128
src/pkg/scheduler/dao_test.go
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
common_dao "github.com/goharbor/harbor/src/common/dao"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
type daoTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
dao DAO
|
||||||
|
execMgr task.ExecutionManager
|
||||||
|
ctx context.Context
|
||||||
|
id int64
|
||||||
|
execID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) SetupSuite() {
|
||||||
|
d.dao = &dao{}
|
||||||
|
d.execMgr = task.NewExecutionManager()
|
||||||
|
common_dao.PrepareTestForPostgresSQL()
|
||||||
|
d.ctx = orm.Context()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) SetupTest() {
|
||||||
|
execID, err := d.execMgr.Create(d.ctx, "vendor", 0, "trigger")
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.execID = execID
|
||||||
|
schedule := &schedule{
|
||||||
|
CRON: "0 * * * * *",
|
||||||
|
ExecutionID: execID,
|
||||||
|
CallbackFuncName: "callback_func_01",
|
||||||
|
CallbackFuncParam: "callback_func_params",
|
||||||
|
}
|
||||||
|
id, err := d.dao.Create(d.ctx, schedule)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.id = id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TearDownTest() {
|
||||||
|
d.Require().Nil(d.dao.Delete(d.ctx, d.id))
|
||||||
|
d.Require().Nil(d.execMgr.Delete(d.ctx, d.execID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestCreate() {
|
||||||
|
// the happy pass is covered in SetupTest
|
||||||
|
|
||||||
|
// foreign key error
|
||||||
|
_, err := d.dao.Create(d.ctx, &schedule{
|
||||||
|
CRON: "0 * * * * *",
|
||||||
|
ExecutionID: 10000,
|
||||||
|
CallbackFuncName: "callback_func",
|
||||||
|
})
|
||||||
|
d.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestList() {
|
||||||
|
schedules, err := d.dao.List(d.ctx, &q.Query{
|
||||||
|
Keywords: map[string]interface{}{
|
||||||
|
"CallbackFuncName": "callback_func_01",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Require().Len(schedules, 1)
|
||||||
|
d.Equal(d.id, schedules[0].ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestGet() {
|
||||||
|
// not found
|
||||||
|
schedule, err := d.dao.Get(d.ctx, 10000)
|
||||||
|
d.True(errors.IsNotFoundErr(err))
|
||||||
|
|
||||||
|
// pass
|
||||||
|
schedule, err = d.dao.Get(d.ctx, d.id)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Equal(d.id, schedule.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestDelete() {
|
||||||
|
// the happy pass is covered in TearDownTest
|
||||||
|
|
||||||
|
// not found
|
||||||
|
err := d.dao.Delete(d.ctx, 10000)
|
||||||
|
d.True(errors.IsNotFoundErr(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoTestSuite) TestUpdate() {
|
||||||
|
// not found
|
||||||
|
err := d.dao.Update(d.ctx, &schedule{
|
||||||
|
ID: 10000,
|
||||||
|
})
|
||||||
|
d.True(errors.IsNotFoundErr(err))
|
||||||
|
|
||||||
|
// pass
|
||||||
|
err = d.dao.Update(d.ctx, &schedule{
|
||||||
|
ID: d.id,
|
||||||
|
CRON: "* */2 * * * *",
|
||||||
|
}, "CRON")
|
||||||
|
d.Require().Nil(err)
|
||||||
|
|
||||||
|
schedule, err := d.dao.Get(d.ctx, d.id)
|
||||||
|
d.Require().Nil(err)
|
||||||
|
d.Equal("* */2 * * * *", schedule.CRON)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDaoTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &daoTestSuite{})
|
||||||
|
}
|
@ -1,59 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package hook
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GlobalController is an instance of the default controller that can be used globally
|
|
||||||
var GlobalController = NewController()
|
|
||||||
|
|
||||||
// Controller updates the scheduler job status or runs the callback function
|
|
||||||
type Controller interface {
|
|
||||||
UpdateStatus(scheduleID int64, status string) error
|
|
||||||
Run(callbackFuncName string, params interface{}) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewController returns an instance of the default controller
|
|
||||||
func NewController() Controller {
|
|
||||||
return &controller{
|
|
||||||
manager: scheduler.GlobalManager,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type controller struct {
|
|
||||||
manager scheduler.Manager
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) UpdateStatus(scheduleID int64, status string) error {
|
|
||||||
now := time.Now()
|
|
||||||
return c.manager.Update(&model.Schedule{
|
|
||||||
ID: scheduleID,
|
|
||||||
Status: status,
|
|
||||||
UpdateTime: &now,
|
|
||||||
}, "Status", "UpdateTime")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) Run(callbackFuncName string, params interface{}) error {
|
|
||||||
f, err := scheduler.GetCallbackFunc(callbackFuncName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return f(params)
|
|
||||||
}
|
|
@ -1,55 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package hook
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
schedulertesting "github.com/goharbor/harbor/src/testing/pkg/scheduler"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
var h = &controller{
|
|
||||||
manager: &schedulertesting.FakeManager{},
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestUpdateStatus(t *testing.T) {
|
|
||||||
// task not exist
|
|
||||||
err := h.UpdateStatus(1, "running")
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
h.manager.(*schedulertesting.FakeManager).Schedules = []*model.Schedule{
|
|
||||||
{
|
|
||||||
ID: 1,
|
|
||||||
Status: "",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
err = h.UpdateStatus(1, "running")
|
|
||||||
require.Nil(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRun(t *testing.T) {
|
|
||||||
// callback function not exist
|
|
||||||
err := h.Run("not-exist", nil)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
err = scheduler.Register("callback", func(interface{}) error { return nil })
|
|
||||||
require.Nil(t, err)
|
|
||||||
err = h.Run("callback", nil)
|
|
||||||
require.Nil(t, err)
|
|
||||||
}
|
|
@ -1,66 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/dao"
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// GlobalManager is an instance of the default manager that
|
|
||||||
// can be used globally
|
|
||||||
GlobalManager = NewManager()
|
|
||||||
)
|
|
||||||
|
|
||||||
// Manager manages the schedule of the scheduler
|
|
||||||
type Manager interface {
|
|
||||||
Create(*model.Schedule) (int64, error)
|
|
||||||
Update(*model.Schedule, ...string) error
|
|
||||||
Delete(int64) error
|
|
||||||
Get(int64) (*model.Schedule, error)
|
|
||||||
List(...*model.ScheduleQuery) ([]*model.Schedule, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewManager returns an instance of the default manager
|
|
||||||
func NewManager() Manager {
|
|
||||||
return &manager{
|
|
||||||
scheduleDao: dao.New(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type manager struct {
|
|
||||||
scheduleDao dao.ScheduleDao
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) Create(schedule *model.Schedule) (int64, error) {
|
|
||||||
return m.scheduleDao.Create(schedule)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) Update(schedule *model.Schedule, props ...string) error {
|
|
||||||
return m.scheduleDao.Update(schedule, props...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) Delete(id int64) error {
|
|
||||||
return m.scheduleDao.Delete(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) {
|
|
||||||
return m.scheduleDao.List(query...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) Get(id int64) (*model.Schedule, error) {
|
|
||||||
return m.scheduleDao.Get(id)
|
|
||||||
}
|
|
@ -1,110 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/suite"
|
|
||||||
)
|
|
||||||
|
|
||||||
var mgr *manager
|
|
||||||
|
|
||||||
type fakeScheduleDao struct {
|
|
||||||
schedules []*model.Schedule
|
|
||||||
mock.Mock
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeScheduleDao) Create(*model.Schedule) (int64, error) {
|
|
||||||
f.Called()
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
func (f *fakeScheduleDao) Update(*model.Schedule, ...string) error {
|
|
||||||
f.Called()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakeScheduleDao) Delete(int64) error {
|
|
||||||
f.Called()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (f *fakeScheduleDao) Get(int64) (*model.Schedule, error) {
|
|
||||||
f.Called()
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
func (f *fakeScheduleDao) List(query ...*model.ScheduleQuery) ([]*model.Schedule, error) {
|
|
||||||
f.Called()
|
|
||||||
if len(query) == 0 || query[0] == nil {
|
|
||||||
return f.schedules, nil
|
|
||||||
}
|
|
||||||
result := []*model.Schedule{}
|
|
||||||
for _, sch := range f.schedules {
|
|
||||||
if sch.JobID == query[0].JobID {
|
|
||||||
result = append(result, sch)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type managerTestSuite struct {
|
|
||||||
suite.Suite
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) SetupTest() {
|
|
||||||
// recreate schedule manager
|
|
||||||
mgr = &manager{
|
|
||||||
scheduleDao: &fakeScheduleDao{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) TestCreate() {
|
|
||||||
t := m.T()
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).On("Create", mock.Anything)
|
|
||||||
mgr.Create(nil)
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Create")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) TestUpdate() {
|
|
||||||
t := m.T()
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).On("Update", mock.Anything)
|
|
||||||
mgr.Update(nil)
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Update")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) TestDelete() {
|
|
||||||
t := m.T()
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).On("Delete", mock.Anything)
|
|
||||||
mgr.Delete(1)
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Delete")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) TestGet() {
|
|
||||||
t := m.T()
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).On("Get", mock.Anything)
|
|
||||||
mgr.Get(1)
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "Get")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *managerTestSuite) TestList() {
|
|
||||||
t := m.T()
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).On("List", mock.Anything)
|
|
||||||
mgr.List(nil)
|
|
||||||
mgr.scheduleDao.(*fakeScheduleDao).AssertCalled(t, "List")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManager(t *testing.T) {
|
|
||||||
suite.Run(t, &managerTestSuite{})
|
|
||||||
}
|
|
17
src/pkg/scheduler/mock.go
Normal file
17
src/pkg/scheduler/mock.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
//go:generate mockery -name DAO -output . -outpkg scheduler -filename mock_dao_test.go -structname mockDAO -inpkg
|
117
src/pkg/scheduler/mock_dao_test.go
Normal file
117
src/pkg/scheduler/mock_dao_test.go
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
// Code generated by mockery v1.1.2. DO NOT EDIT.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
|
||||||
|
q "github.com/goharbor/harbor/src/lib/q"
|
||||||
|
mock "github.com/stretchr/testify/mock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mockDAO is an autogenerated mock type for the DAO type
|
||||||
|
type mockDAO struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create provides a mock function with given fields: ctx, schedule
|
||||||
|
func (_m *mockDAO) Create(ctx context.Context, schd *schedule) (int64, error) {
|
||||||
|
ret := _m.Called(ctx, schd)
|
||||||
|
|
||||||
|
var r0 int64
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *schedule) int64); ok {
|
||||||
|
r0 = rf(ctx, schd)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(int64)
|
||||||
|
}
|
||||||
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *schedule) error); ok {
|
||||||
|
r1 = rf(ctx, schd)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete provides a mock function with given fields: ctx, id
|
||||||
|
func (_m *mockDAO) Delete(ctx context.Context, id int64) error {
|
||||||
|
ret := _m.Called(ctx, id)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
||||||
|
r0 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get provides a mock function with given fields: ctx, id
|
||||||
|
func (_m *mockDAO) Get(ctx context.Context, id int64) (*schedule, error) {
|
||||||
|
ret := _m.Called(ctx, id)
|
||||||
|
|
||||||
|
var r0 *schedule
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64) *schedule); ok {
|
||||||
|
r0 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*schedule)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||||
|
r1 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// List provides a mock function with given fields: ctx, query
|
||||||
|
func (_m *mockDAO) List(ctx context.Context, query *q.Query) ([]*schedule, error) {
|
||||||
|
ret := _m.Called(ctx, query)
|
||||||
|
|
||||||
|
var r0 []*schedule
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*schedule); ok {
|
||||||
|
r0 = rf(ctx, query)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).([]*schedule)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok {
|
||||||
|
r1 = rf(ctx, query)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update provides a mock function with given fields: ctx, schedule, props
|
||||||
|
func (_m *mockDAO) Update(ctx context.Context, schd *schedule, props ...string) error {
|
||||||
|
_va := make([]interface{}, len(props))
|
||||||
|
for _i := range props {
|
||||||
|
_va[_i] = props[_i]
|
||||||
|
}
|
||||||
|
var _ca []interface{}
|
||||||
|
_ca = append(_ca, ctx, schd)
|
||||||
|
_ca = append(_ca, _va...)
|
||||||
|
ret := _m.Called(_ca...)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *schedule, ...string) error); ok {
|
||||||
|
r0 = rf(ctx, schd, props...)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
@ -1,40 +0,0 @@
|
|||||||
// Copyright Project Harbor Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package model
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/astaxie/beego/orm"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
orm.RegisterModel(
|
|
||||||
new(Schedule))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedule is a record for a scheduler job
|
|
||||||
type Schedule struct {
|
|
||||||
ID int64 `orm:"pk;auto;column(id)" json:"id"`
|
|
||||||
JobID string `orm:"column(job_id)" json:"job_id"`
|
|
||||||
Status string `orm:"column(status)" json:"status"`
|
|
||||||
CreationTime *time.Time `orm:"column(creation_time)" json:"creation_time"`
|
|
||||||
UpdateTime *time.Time `orm:"column(update_time)" json:"update_time"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ScheduleQuery is query for schedule
|
|
||||||
type ScheduleQuery struct {
|
|
||||||
JobID string
|
|
||||||
}
|
|
@ -15,8 +15,6 @@
|
|||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,9 +49,5 @@ func (pj *PeriodicJob) Validate(params job.Parameters) error {
|
|||||||
|
|
||||||
// Run the job
|
// Run the job
|
||||||
func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error {
|
func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error {
|
||||||
data, err := json.Marshal(params)
|
return ctx.Checkin("checkin")
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return ctx.Checkin(string(data))
|
|
||||||
}
|
}
|
||||||
|
@ -15,194 +15,201 @@
|
|||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
chttp "github.com/goharbor/harbor/src/common/http"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/common/job"
|
|
||||||
"github.com/goharbor/harbor/src/common/job/models"
|
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
)
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
|
cronlib "github.com/robfig/cron"
|
||||||
// const definitions
|
|
||||||
const (
|
|
||||||
JobParamCallbackFunc = "callback_func"
|
|
||||||
JobParamCallbackFuncParams = "params"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// GlobalScheduler is an instance of the default scheduler that
|
// Sched is an instance of the default scheduler that can be used globally
|
||||||
// can be used globally. Call Init() to initialize it first
|
Sched = New()
|
||||||
GlobalScheduler Scheduler
|
|
||||||
registry = make(map[string]CallbackFunc)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// CallbackFunc defines the function that the scheduler calls when triggered
|
// Schedule describes the detail information about the created schedule
|
||||||
type CallbackFunc func(interface{}) error
|
type Schedule struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
CRON string `json:"cron"`
|
||||||
|
Status string `json:"status"` // status of the underlying task(jobservice job)
|
||||||
|
CreationTime time.Time `json:"creation_time"`
|
||||||
|
UpdateTime time.Time `json:"update_time"`
|
||||||
|
// we can extend this model to include more information(e.g. how many times the schedule already
|
||||||
|
// runs; when will the schedule runs next time)
|
||||||
|
}
|
||||||
|
|
||||||
// Scheduler provides the capability to run a periodic task, a callback function
|
// Scheduler provides the capability to run a periodic task, a callback function
|
||||||
// needs to be registered before using the scheduler
|
// needs to be registered before using the scheduler
|
||||||
// The "params" is passed to the callback function specified by "callbackFuncName"
|
|
||||||
// as encoded json string, so the callback function must decode it before using
|
|
||||||
type Scheduler interface {
|
type Scheduler interface {
|
||||||
Schedule(cron string, callbackFuncName string, params interface{}) (int64, error)
|
// Schedule creates a task which calls the specified callback function periodically
|
||||||
UnSchedule(id int64) error
|
// The callback function needs to be registered first
|
||||||
}
|
// The "params" is passed to the callback function as encoded json string, so the callback
|
||||||
|
// function must decode it before using
|
||||||
// Register the callback function with name, and the function will be called
|
Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error)
|
||||||
// by the scheduler when the scheduler is triggered
|
// UnSchedule the created schedule instance
|
||||||
func Register(name string, callbackFunc CallbackFunc) error {
|
UnSchedule(ctx context.Context, id int64) error
|
||||||
if len(name) == 0 {
|
// GetSchedule gets the schedule specified by ID
|
||||||
return errors.New("empty name")
|
GetSchedule(ctx context.Context, id int64) (*Schedule, error)
|
||||||
}
|
|
||||||
if callbackFunc == nil {
|
|
||||||
return errors.New("callback function is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
_, exist := registry[name]
|
|
||||||
if exist {
|
|
||||||
return fmt.Errorf("callback function %s already exists", name)
|
|
||||||
}
|
|
||||||
registry[name] = callbackFunc
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCallbackFunc returns the registered callback function specified by the name
|
|
||||||
func GetCallbackFunc(name string) (CallbackFunc, error) {
|
|
||||||
f, exist := registry[name]
|
|
||||||
if !exist {
|
|
||||||
return nil, fmt.Errorf("callback function %s not found", name)
|
|
||||||
}
|
|
||||||
return f, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func callbackFuncExist(name string) bool {
|
|
||||||
_, exist := registry[name]
|
|
||||||
return exist
|
|
||||||
}
|
|
||||||
|
|
||||||
// Init the GlobalScheduler
|
|
||||||
func Init() {
|
|
||||||
GlobalScheduler = New(config.InternalCoreURL())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns an instance of the default scheduler
|
// New returns an instance of the default scheduler
|
||||||
func New(internalCoreURL string) Scheduler {
|
func New() Scheduler {
|
||||||
return &scheduler{
|
return &scheduler{
|
||||||
internalCoreURL: internalCoreURL,
|
dao: &dao{},
|
||||||
jobserviceClient: job.GlobalClient,
|
execMgr: task.ExecMgr,
|
||||||
manager: GlobalManager,
|
taskMgr: task.Mgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type scheduler struct {
|
type scheduler struct {
|
||||||
sync.RWMutex
|
dao DAO
|
||||||
internalCoreURL string
|
execMgr task.ExecutionManager
|
||||||
manager Manager
|
taskMgr task.Manager
|
||||||
jobserviceClient job.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) Schedule(cron string, callbackFuncName string, params interface{}) (int64, error) {
|
func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||||
|
if _, err := cronlib.Parse(cron); err != nil {
|
||||||
|
return 0, errors.New(nil).WithCode(errors.BadRequestCode).
|
||||||
|
WithMessage("invalid cron %s: %v", cron, err)
|
||||||
|
}
|
||||||
if !callbackFuncExist(callbackFuncName) {
|
if !callbackFuncExist(callbackFuncName) {
|
||||||
return 0, fmt.Errorf("callback function %s not found", callbackFuncName)
|
return 0, fmt.Errorf("callback function %s not found", callbackFuncName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create schedule record
|
execID, err := s.execMgr.Create(ctx, JobNameScheduler, 0, task.ExecutionTriggerManual)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
scheduleID, err := s.manager.Create(&model.Schedule{
|
sched := &schedule{
|
||||||
CreationTime: &now,
|
CRON: cron,
|
||||||
UpdateTime: &now,
|
ExecutionID: execID,
|
||||||
|
CallbackFuncName: callbackFuncName,
|
||||||
|
CreationTime: now,
|
||||||
|
UpdateTime: now,
|
||||||
|
}
|
||||||
|
if params != nil {
|
||||||
|
paramsData, err := json.Marshal(params)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
sched.CallbackFuncParam = string(paramsData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create schedule record
|
||||||
|
// when status/checkin hook comes, the database record must exist,
|
||||||
|
// so the database record must be created first before submitting job
|
||||||
|
id, err := s.dao.Create(ctx, sched)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
|
||||||
|
Name: JobNameScheduler,
|
||||||
|
Metadata: &job.Metadata{
|
||||||
|
JobKind: job.KindPeriodic,
|
||||||
|
Cron: cron,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
// if got error in the following steps, delete the schedule record in database
|
// when task manager creating a task, it creates the task database record first and
|
||||||
defer func() {
|
// then submits the job to jobservice. If the submitting failed, it doesn't return
|
||||||
if err != nil {
|
// any error. So we check the task status to make sure the job is submitted to jobservice
|
||||||
e := s.manager.Delete(scheduleID)
|
// successfully here
|
||||||
if e != nil {
|
task, err := s.taskMgr.Get(ctx, taskID)
|
||||||
log.Errorf("failed to delete the schedule %d: %v", scheduleID, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
log.Debugf("the schedule record %d created", scheduleID)
|
|
||||||
|
|
||||||
// submit scheduler job to Jobservice
|
|
||||||
statusHookURL := fmt.Sprintf("%s/service/notifications/schedules/%d", s.internalCoreURL, scheduleID)
|
|
||||||
jd := &models.JobData{
|
|
||||||
Name: JobNameScheduler,
|
|
||||||
Parameters: map[string]interface{}{
|
|
||||||
JobParamCallbackFunc: callbackFuncName,
|
|
||||||
},
|
|
||||||
Metadata: &models.JobMetadata{
|
|
||||||
JobKind: job.JobKindPeriodic,
|
|
||||||
Cron: cron,
|
|
||||||
},
|
|
||||||
StatusHook: statusHookURL,
|
|
||||||
}
|
|
||||||
if params != nil {
|
|
||||||
var paramsData []byte
|
|
||||||
paramsData, err = json.Marshal(params)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
jd.Parameters[JobParamCallbackFuncParams] = string(paramsData)
|
|
||||||
}
|
|
||||||
jobID, err := s.jobserviceClient.SubmitJob(jd)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
// if got error in the following steps, stop the scheduler job
|
if task.Status == job.ErrorStatus.String() {
|
||||||
defer func() {
|
return 0, fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
|
||||||
if err != nil {
|
|
||||||
if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil {
|
|
||||||
log.Errorf("failed to stop the scheduler job %s: %v", jobID, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID)
|
|
||||||
|
|
||||||
// populate the job ID for the schedule
|
|
||||||
err = s.manager.Update(&model.Schedule{
|
|
||||||
ID: scheduleID,
|
|
||||||
JobID: jobID,
|
|
||||||
}, "JobID")
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return scheduleID, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) UnSchedule(id int64) error {
|
func (s *scheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||||
schedule, err := s.manager.Get(id)
|
schedule, err := s.dao.Get(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.IsNotFoundErr(err) {
|
||||||
|
log.Warningf("trying to unschedule a non existing schedule %d, skip directly", id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if schedule == nil {
|
if err = s.execMgr.Stop(ctx, schedule.ExecutionID); err != nil {
|
||||||
log.Warningf("the schedule record %d not found", id)
|
return err
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil {
|
|
||||||
herr, ok := err.(*chttp.Error)
|
// after the stop called, the execution cannot be stopped immediately,
|
||||||
// if the job specified by jobID is not found in Jobservice, just delete
|
// use the for loop to make sure the execution be in final status before deleting it
|
||||||
// the schedule record
|
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
|
||||||
if !ok || herr.Code != http.StatusNotFound {
|
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if job.Status(exec.Status).Final() {
|
||||||
|
// delete schedule record
|
||||||
|
if err = s.dao.Delete(ctx, id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// delete execution
|
||||||
|
return s.execMgr.Delete(ctx, schedule.ExecutionID)
|
||||||
|
}
|
||||||
|
time.Sleep(t)
|
||||||
}
|
}
|
||||||
log.Debugf("the stop action for job %s submitted to the Jobservice", schedule.JobID)
|
|
||||||
if err = s.manager.Delete(schedule.ID); err != nil {
|
return fmt.Errorf("failed to unschedule the schedule %d: the execution isn't in final status", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) {
|
||||||
|
schedule, err := s.dao.Get(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
schd := &Schedule{
|
||||||
|
ID: schedule.ID,
|
||||||
|
CRON: schedule.CRON,
|
||||||
|
CreationTime: schedule.CreationTime,
|
||||||
|
UpdateTime: schedule.UpdateTime,
|
||||||
|
}
|
||||||
|
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
schd.Status = exec.Status
|
||||||
|
return schd, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleLegacyHook handles the legacy web hook for scheduler
|
||||||
|
// We rewrite the implementation of scheduler with task manager mechanism in v2.1,
|
||||||
|
// this method is used to handle the job status hook for the legacy implementation
|
||||||
|
// We can remove the method and the hook endpoint after several releases
|
||||||
|
func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChange) error {
|
||||||
|
scheduler := Sched.(*scheduler)
|
||||||
|
schedule, err := scheduler.dao.Get(ctx, scheduleID)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debugf("the schedule record %d deleted", schedule.ID)
|
tasks, err := scheduler.taskMgr.List(ctx, &q.Query{
|
||||||
|
Keywords: map[string]interface{}{
|
||||||
return nil
|
"ExecutionID": schedule.ExecutionID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
return errors.New(nil).WithCode(errors.NotFoundCode).
|
||||||
|
WithMessage("no task references the execution %d", schedule.ExecutionID)
|
||||||
|
}
|
||||||
|
return task.NewHookHandler().Handle(ctx, tasks[0].ID, sc)
|
||||||
}
|
}
|
||||||
|
@ -15,97 +15,149 @@
|
|||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/goharbor/harbor/src/testing/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
schedulertesting "github.com/goharbor/harbor/src/testing/pkg/scheduler"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/goharbor/harbor/src/testing/mock"
|
||||||
|
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var sch *scheduler
|
|
||||||
|
|
||||||
type schedulerTestSuite struct {
|
type schedulerTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
|
scheduler *scheduler
|
||||||
|
dao *mockDAO
|
||||||
|
execMgr *tasktesting.FakeExecutionManager
|
||||||
|
taskMgr *tasktesting.FakeManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerTestSuite) SetupTest() {
|
func (s *schedulerTestSuite) SetupTest() {
|
||||||
t := s.T()
|
registry = map[string]CallbackFunc{}
|
||||||
// empty callback function registry before running every test case
|
err := RegisterCallbackFunc("callback", func(interface{}) error { return nil })
|
||||||
// and register a new callback function named "callback"
|
s.Require().Nil(err)
|
||||||
registry = make(map[string]CallbackFunc)
|
|
||||||
err := Register("callback", func(interface{}) error { return nil })
|
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
// recreate the scheduler object
|
s.dao = &mockDAO{}
|
||||||
sch = &scheduler{
|
s.execMgr = &tasktesting.FakeExecutionManager{}
|
||||||
jobserviceClient: &job.MockJobClient{},
|
s.taskMgr = &tasktesting.FakeManager{}
|
||||||
manager: &schedulertesting.FakeManager{},
|
|
||||||
|
s.scheduler = &scheduler{
|
||||||
|
dao: s.dao,
|
||||||
|
execMgr: s.execMgr,
|
||||||
|
taskMgr: s.taskMgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerTestSuite) TestRegister() {
|
|
||||||
t := s.T()
|
|
||||||
var name string
|
|
||||||
var callbackFun CallbackFunc
|
|
||||||
|
|
||||||
// empty name
|
|
||||||
err := Register(name, callbackFun)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// nil callback function
|
|
||||||
name = "test"
|
|
||||||
err = Register(name, callbackFun)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
callbackFun = func(interface{}) error { return nil }
|
|
||||||
err = Register(name, callbackFun)
|
|
||||||
require.Nil(t, err)
|
|
||||||
|
|
||||||
// duplicate name
|
|
||||||
err = Register(name, callbackFun)
|
|
||||||
require.NotNil(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *schedulerTestSuite) TestGetCallbackFunc() {
|
|
||||||
t := s.T()
|
|
||||||
// not exist
|
|
||||||
_, err := GetCallbackFunc("not-exist")
|
|
||||||
require.NotNil(t, err)
|
|
||||||
|
|
||||||
// pass
|
|
||||||
f, err := GetCallbackFunc("callback")
|
|
||||||
require.Nil(t, err)
|
|
||||||
assert.NotNil(t, f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *schedulerTestSuite) TestSchedule() {
|
func (s *schedulerTestSuite) TestSchedule() {
|
||||||
t := s.T()
|
// invalid cron
|
||||||
|
id, err := s.scheduler.Schedule(nil, "", "callback", nil)
|
||||||
|
s.NotNil(err)
|
||||||
|
|
||||||
// callback function not exist
|
// callback function not exist
|
||||||
_, err := sch.Schedule("0 * * * * *", "not-exist", nil)
|
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "not-exist", nil)
|
||||||
require.NotNil(t, err)
|
s.NotNil(err)
|
||||||
|
|
||||||
|
// failed to submit to jobservice
|
||||||
|
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{
|
||||||
|
ID: 1,
|
||||||
|
ExecutionID: 1,
|
||||||
|
Status: job.ErrorStatus.String(),
|
||||||
|
}, nil)
|
||||||
|
_, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
|
||||||
|
s.Require().NotNil(err)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
s.execMgr.AssertExpectations(s.T())
|
||||||
|
s.taskMgr.AssertExpectations(s.T())
|
||||||
|
|
||||||
|
// reset mocks
|
||||||
|
s.SetupTest()
|
||||||
|
|
||||||
// pass
|
// pass
|
||||||
id, err := sch.Schedule("0 * * * * *", "callback", nil)
|
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
require.Nil(t, err)
|
s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
assert.Equal(t, int64(1), id)
|
s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
|
s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{
|
||||||
|
ID: 1,
|
||||||
|
ExecutionID: 1,
|
||||||
|
Status: job.SuccessStatus.String(),
|
||||||
|
}, nil)
|
||||||
|
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
|
||||||
|
s.Require().Nil(err)
|
||||||
|
s.Equal(int64(1), id)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
s.execMgr.AssertExpectations(s.T())
|
||||||
|
s.taskMgr.AssertExpectations(s.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *schedulerTestSuite) TestUnSchedule() {
|
func (s *schedulerTestSuite) TestUnSchedule() {
|
||||||
t := s.T()
|
// not existing schedule
|
||||||
// schedule not exist
|
s.dao.On("Get", mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil))
|
||||||
err := sch.UnSchedule(1)
|
err := s.scheduler.UnSchedule(nil, 10000)
|
||||||
require.NotNil(t, err)
|
s.Nil(err)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
|
||||||
// schedule exist
|
// reset mocks
|
||||||
id, err := sch.Schedule("0 * * * * *", "callback", nil)
|
s.SetupTest()
|
||||||
require.Nil(t, err)
|
|
||||||
assert.Equal(t, int64(1), id)
|
|
||||||
|
|
||||||
err = sch.UnSchedule(id)
|
// the underlying task isn't stopped
|
||||||
require.Nil(t, err)
|
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||||
|
ID: 1,
|
||||||
|
CRON: "0 * * * * *",
|
||||||
|
ExecutionID: 1,
|
||||||
|
CallbackFuncName: "callback",
|
||||||
|
}, nil)
|
||||||
|
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||||
|
ID: 1,
|
||||||
|
Status: job.RunningStatus.String(),
|
||||||
|
}, nil)
|
||||||
|
err = s.scheduler.UnSchedule(nil, 1)
|
||||||
|
s.NotNil(err)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
s.execMgr.AssertExpectations(s.T())
|
||||||
|
|
||||||
|
// reset mocks
|
||||||
|
s.SetupTest()
|
||||||
|
|
||||||
|
// pass
|
||||||
|
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||||
|
ID: 1,
|
||||||
|
CRON: "0 * * * * *",
|
||||||
|
ExecutionID: 1,
|
||||||
|
CallbackFuncName: "callback",
|
||||||
|
}, nil)
|
||||||
|
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||||
|
ID: 1,
|
||||||
|
Status: job.StoppedStatus.String(),
|
||||||
|
}, nil)
|
||||||
|
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||||
|
err = s.scheduler.UnSchedule(nil, 1)
|
||||||
|
s.Nil(err)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
s.execMgr.AssertExpectations(s.T())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *schedulerTestSuite) TestGetSchedule() {
|
||||||
|
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||||
|
ID: 1,
|
||||||
|
CRON: "0 * * * * *",
|
||||||
|
ExecutionID: 1,
|
||||||
|
}, nil)
|
||||||
|
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||||
|
ID: 1,
|
||||||
|
Status: job.SuccessStatus.String(),
|
||||||
|
}, nil)
|
||||||
|
schedule, err := s.scheduler.GetSchedule(nil, 1)
|
||||||
|
s.Require().Nil(err)
|
||||||
|
s.Equal("0 * * * * *", schedule.CRON)
|
||||||
|
s.Equal(job.SuccessStatus.String(), schedule.Status)
|
||||||
|
s.dao.AssertExpectations(s.T())
|
||||||
|
s.execMgr.AssertExpectations(s.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScheduler(t *testing.T) {
|
func TestScheduler(t *testing.T) {
|
||||||
|
@ -28,8 +28,8 @@ var (
|
|||||||
// CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook
|
// CheckInProcessor is the processor to process the check in data which is sent by jobservice via webhook
|
||||||
type CheckInProcessor func(ctx context.Context, task *Task, change *job.StatusChange) (err error)
|
type CheckInProcessor func(ctx context.Context, task *Task, change *job.StatusChange) (err error)
|
||||||
|
|
||||||
// Register check in processor for the specific vendor type
|
// RegisterCheckInProcessor registers check in processor for the specific vendor type
|
||||||
func Register(vendorType string, processor CheckInProcessor) error {
|
func RegisterCheckInProcessor(vendorType string, processor CheckInProcessor) error {
|
||||||
if _, exist := registry[vendorType]; exist {
|
if _, exist := registry[vendorType]; exist {
|
||||||
return fmt.Errorf("check in processor for %s already exists", vendorType)
|
return fmt.Errorf("check in processor for %s already exists", vendorType)
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,11 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegister(t *testing.T) {
|
func TestRegisterCheckInProcessor(t *testing.T) {
|
||||||
err := Register("test", nil)
|
err := RegisterCheckInProcessor("test", nil)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
// already exist
|
// already exist
|
||||||
err = Register("test", nil)
|
err = RegisterCheckInProcessor("test", nil)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ func (e *executionDAO) Delete(ctx context.Context, id int64) error {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if e := orm.AsForeignKeyError(err,
|
if e := orm.AsForeignKeyError(err,
|
||||||
"the execution %d is referenced by other tasks", id); e != nil {
|
"the execution %d is referenced by other resources", id); e != nil {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -79,7 +79,8 @@ func (m *manager) Create(ctx context.Context, executionID int64, jb *Job, extraA
|
|||||||
jobID, err := m.submitJob(ctx, id, jb)
|
jobID, err := m.submitJob(ctx, id, jb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// failed to submit job to jobservice, update the status of task to error
|
// failed to submit job to jobservice, update the status of task to error
|
||||||
log.Errorf("failed to submit job to jobservice: %v", err)
|
err = fmt.Errorf("failed to submit job to jobservice: %v", err)
|
||||||
|
log.Error(err)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
err = m.dao.Update(ctx, &dao.Task{
|
err = m.dao.Update(ctx, &dao.Task{
|
||||||
ID: id,
|
ID: id,
|
||||||
@ -155,12 +156,9 @@ func (m *manager) Stop(ctx context.Context, id int64) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the task is already in final status, return directly
|
// when a task is in final status, if it's a periodic or retrying job it will
|
||||||
if job.Status(task.Status).Final() {
|
// run again in the near future, so we must operate the stop action to these final
|
||||||
log.Debugf("the task %d is in final status %s, skip", task.ID, task.Status)
|
// status jobs as well
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = m.jsClient.PostAction(task.JobID, string(job.StopCommand)); err != nil {
|
if err = m.jsClient.PostAction(task.JobID, string(job.StopCommand)); err != nil {
|
||||||
// job not found, update it's status to stop directly
|
// job not found, update it's status to stop directly
|
||||||
if err == cjob.ErrJobNotFound {
|
if err == cjob.ErrJobNotFound {
|
||||||
|
@ -70,21 +70,7 @@ func (t *taskManagerTestSuite) TestCreate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskManagerTestSuite) TestStop() {
|
func (t *taskManagerTestSuite) TestStop() {
|
||||||
// the task is in final status
|
// job not found
|
||||||
t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
|
||||||
ID: 1,
|
|
||||||
ExecutionID: 1,
|
|
||||||
Status: job.SuccessStatus.String(),
|
|
||||||
}, nil)
|
|
||||||
|
|
||||||
err := t.mgr.Stop(nil, 1)
|
|
||||||
t.Require().Nil(err)
|
|
||||||
t.dao.AssertExpectations(t.T())
|
|
||||||
|
|
||||||
// reset mock
|
|
||||||
t.SetupTest()
|
|
||||||
|
|
||||||
// the task isn't in final status, job not found
|
|
||||||
t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
||||||
ID: 1,
|
ID: 1,
|
||||||
ExecutionID: 1,
|
ExecutionID: 1,
|
||||||
@ -93,7 +79,7 @@ func (t *taskManagerTestSuite) TestStop() {
|
|||||||
t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound)
|
t.jsClient.On("PostAction", mock.Anything, mock.Anything).Return(cjob.ErrJobNotFound)
|
||||||
t.dao.On("Update", mock.Anything, mock.Anything,
|
t.dao.On("Update", mock.Anything, mock.Anything,
|
||||||
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
err = t.mgr.Stop(nil, 1)
|
err := t.mgr.Stop(nil, 1)
|
||||||
t.Require().Nil(err)
|
t.Require().Nil(err)
|
||||||
t.dao.AssertExpectations(t.T())
|
t.dao.AssertExpectations(t.T())
|
||||||
t.jsClient.AssertExpectations(t.T())
|
t.jsClient.AssertExpectations(t.T())
|
||||||
@ -101,7 +87,7 @@ func (t *taskManagerTestSuite) TestStop() {
|
|||||||
// reset mock
|
// reset mock
|
||||||
t.SetupTest()
|
t.SetupTest()
|
||||||
|
|
||||||
// the task isn't in final status
|
// pass
|
||||||
t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
t.dao.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
||||||
ID: 1,
|
ID: 1,
|
||||||
ExecutionID: 1,
|
ExecutionID: 1,
|
||||||
|
@ -20,7 +20,9 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
libhttp "github.com/goharbor/harbor/src/lib/http"
|
libhttp "github.com/goharbor/harbor/src/lib/http"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
"github.com/goharbor/harbor/src/pkg/task"
|
"github.com/goharbor/harbor/src/pkg/task"
|
||||||
"github.com/goharbor/harbor/src/server/router"
|
"github.com/goharbor/harbor/src/server/router"
|
||||||
)
|
)
|
||||||
@ -52,6 +54,11 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err = j.handler.Handle(r.Context(), taskID, sc); err != nil {
|
if err = j.handler.Handle(r.Context(), taskID, sc); err != nil {
|
||||||
|
// ignore the not found error to avoid the jobservice re-sending the hook
|
||||||
|
if errors.IsNotFoundErr(err) {
|
||||||
|
log.Warningf("got the status change hook for a non existing task %d", taskID)
|
||||||
|
return
|
||||||
|
}
|
||||||
libhttp.SendError(w, err)
|
libhttp.SendError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
17
src/testing/pkg/scheduler/mock.go
Normal file
17
src/testing/pkg/scheduler/mock.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
//go:generate mockery -dir ../../../pkg/scheduler -name Scheduler -output . -outpkg scheduler
|
@ -1,77 +1,73 @@
|
|||||||
// Copyright Project Harbor Authors
|
// Code generated by mockery v1.1.2. DO NOT EDIT.
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
context "context"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/pkg/scheduler/model"
|
scheduler "github.com/goharbor/harbor/src/pkg/scheduler"
|
||||||
|
mock "github.com/stretchr/testify/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FakeManager ...
|
// Scheduler is an autogenerated mock type for the Scheduler type
|
||||||
type FakeManager struct {
|
type Scheduler struct {
|
||||||
idCounter int64
|
mock.Mock
|
||||||
Schedules []*model.Schedule
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create ...
|
// GetSchedule provides a mock function with given fields: ctx, id
|
||||||
func (f *FakeManager) Create(schedule *model.Schedule) (int64, error) {
|
func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) {
|
||||||
f.idCounter++
|
ret := _m.Called(ctx, id)
|
||||||
id := f.idCounter
|
|
||||||
schedule.ID = id
|
|
||||||
f.Schedules = append(f.Schedules, schedule)
|
|
||||||
return id, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update ...
|
var r0 *scheduler.Schedule
|
||||||
func (f *FakeManager) Update(schedule *model.Schedule, props ...string) error {
|
if rf, ok := ret.Get(0).(func(context.Context, int64) *scheduler.Schedule); ok {
|
||||||
for i, sch := range f.Schedules {
|
r0 = rf(ctx, id)
|
||||||
if sch.ID == schedule.ID {
|
} else {
|
||||||
f.Schedules[i] = schedule
|
if ret.Get(0) != nil {
|
||||||
return nil
|
r0 = ret.Get(0).(*scheduler.Schedule)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("the execution %d not found", schedule.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete ...
|
var r1 error
|
||||||
func (f *FakeManager) Delete(id int64) error {
|
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||||
length := len(f.Schedules)
|
r1 = rf(ctx, id)
|
||||||
for i, sch := range f.Schedules {
|
} else {
|
||||||
if sch.ID == id {
|
r1 = ret.Error(1)
|
||||||
f.Schedules = f.Schedules[:i]
|
|
||||||
if i != length-1 {
|
|
||||||
f.Schedules = append(f.Schedules, f.Schedules[i+1:]...)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return fmt.Errorf("the execution %d not found", id)
|
|
||||||
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get ...
|
// Schedule provides a mock function with given fields: ctx, cron, callbackFuncName, params
|
||||||
func (f *FakeManager) Get(id int64) (*model.Schedule, error) {
|
func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||||
for _, sch := range f.Schedules {
|
ret := _m.Called(ctx, cron, callbackFuncName, params)
|
||||||
if sch.ID == id {
|
|
||||||
return sch, nil
|
var r0 int64
|
||||||
}
|
if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}) int64); ok {
|
||||||
|
r0 = rf(ctx, cron, callbackFuncName, params)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(int64)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("the execution %d not found", id)
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}) error); ok {
|
||||||
|
r1 = rf(ctx, cron, callbackFuncName, params)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
// List ...
|
// UnSchedule provides a mock function with given fields: ctx, id
|
||||||
func (f *FakeManager) List(...*model.ScheduleQuery) ([]*model.Schedule, error) {
|
func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||||
return f.Schedules, nil
|
ret := _m.Called(ctx, id)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
||||||
|
r0 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user