mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 18:25:56 +01:00
Merge pull request #12610 from ywk253100/200727_scheduler
Do some refine for the scheduler
This commit is contained in:
commit
3b6cdb8732
@ -68,15 +68,16 @@ CREATE TABLE IF NOT EXISTS p2p_preheat_policy (
|
||||
UNIQUE (name, project_id)
|
||||
);
|
||||
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_type varchar(16);
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_id int;
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS cron varchar(64);
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS execution_id int;
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_name varchar(128);
|
||||
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_param text;
|
||||
|
||||
/*abstract the cron, callback function parameters from table retention_policy*/
|
||||
UPDATE schedule
|
||||
SET cron = retention.cron, callback_func_name = 'RetentionCallback',
|
||||
callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}')
|
||||
SET vendor_type= 'RETENTION', vendor_id=retention.id, cron = retention.cron,
|
||||
callback_func_name = 'RETENTION', callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}')
|
||||
FROM (
|
||||
SELECT id, data::json->'trigger'->'references'->>'job_id' AS schedule_id,
|
||||
data::json->'trigger'->'settings'->>'cron' AS cron
|
||||
@ -93,7 +94,7 @@ DECLARE
|
||||
BEGIN
|
||||
FOR sched IN SELECT * FROM schedule
|
||||
LOOP
|
||||
INSERT INTO execution (vendor_type, trigger) VALUES ('SCHEDULER', 'MANUAL') RETURNING id INTO exec_id;
|
||||
INSERT INTO execution (vendor_type, vendor_id, trigger) VALUES ('SCHEDULER', sched.id, 'MANUAL') RETURNING id INTO exec_id;
|
||||
IF sched.status = 'Pending' THEN
|
||||
status_code = 0;
|
||||
ELSIF sched.status = 'Scheduled' THEN
|
||||
@ -106,11 +107,8 @@ BEGIN
|
||||
status_code = 0;
|
||||
END IF;
|
||||
INSERT INTO task (execution_id, job_id, status, status_code, status_revision, run_count) VALUES (exec_id, sched.job_id, sched.status, status_code, 0, 0);
|
||||
UPDATE schedule SET execution_id=exec_id WHERE id = sched.id;
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
ALTER TABLE schedule DROP COLUMN IF EXISTS job_id;
|
||||
ALTER TABLE schedule DROP COLUMN IF EXISTS status;
|
||||
|
||||
ALTER TABLE schedule ADD CONSTRAINT schedule_execution FOREIGN KEY (execution_id) REFERENCES execution(id);
|
||||
ALTER TABLE schedule DROP COLUMN IF EXISTS status;
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/instance"
|
||||
@ -238,8 +239,8 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
|
||||
len(schema.Trigger.Settings.Cron) > 0 {
|
||||
// schedule and update policy
|
||||
schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id})
|
||||
if err != nil {
|
||||
if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, schema.Trigger.Settings.Cron,
|
||||
SchedulerCallback, TriggerParam{PolicyID: id}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -248,7 +249,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if e := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID); e != nil {
|
||||
if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil {
|
||||
return 0, errors.Wrap(e, err.Error())
|
||||
}
|
||||
|
||||
@ -293,12 +294,12 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
}
|
||||
|
||||
var cron = schema.Trigger.Settings.Cron
|
||||
var oldJobID = s0.Trigger.Settings.JobID
|
||||
var oldCron = s0.Trigger.Settings.Cron
|
||||
var needUn bool
|
||||
var needSch bool
|
||||
|
||||
if s0.Trigger.Type != schema.Trigger.Type {
|
||||
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 {
|
||||
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && len(oldCron) > 0 {
|
||||
needUn = true
|
||||
}
|
||||
if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 {
|
||||
@ -309,7 +310,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
if schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
|
||||
s0.Trigger.Settings.Cron != cron {
|
||||
// unschedule old
|
||||
if oldJobID > 0 {
|
||||
if len(oldCron) > 0 {
|
||||
needUn = true
|
||||
}
|
||||
// schedule new
|
||||
@ -323,7 +324,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
|
||||
// unschedule old
|
||||
if needUn {
|
||||
err = c.scheduler.UnSchedule(ctx, oldJobID)
|
||||
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -331,11 +332,10 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
|
||||
|
||||
// schedule new
|
||||
if needSch {
|
||||
jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID})
|
||||
if err != nil {
|
||||
if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, cron, SchedulerCallback,
|
||||
TriggerParam{PolicyID: schema.ID}); err != nil {
|
||||
return err
|
||||
}
|
||||
schema.Trigger.Settings.JobID = jobid
|
||||
if err := schema.Encode(); err != nil {
|
||||
// Possible
|
||||
// TODO: Refactor
|
||||
@ -355,8 +355,8 @@ func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 {
|
||||
err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID)
|
||||
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 {
|
||||
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -204,10 +204,10 @@ func (s *preheatSuite) TestCreatePolicy() {
|
||||
FiltersStr: `[{"type":"repository","value":"harbor*"},{"type":"tag","value":"2*"}]`,
|
||||
TriggerStr: fmt.Sprintf(`{"type":"%s", "trigger_setting":{"cron":"* * * * */1"}}`, policy.TriggerTypeScheduled),
|
||||
}
|
||||
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil)
|
||||
s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil)
|
||||
s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil)
|
||||
s.fakeScheduler.On("UnScheduleByVendor", s.ctx, mock.Anything, mock.Anything).Return(nil)
|
||||
id, err := s.controller.CreatePolicy(s.ctx, policy)
|
||||
s.NoError(err)
|
||||
s.Equal(int64(1), id)
|
||||
@ -231,7 +231,6 @@ func (s *preheatSuite) TestGetPolicyByName() {
|
||||
|
||||
func (s *preheatSuite) TestUpdatePolicy() {
|
||||
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||
p0.Trigger.Settings.JobID = 1
|
||||
p0.Trigger.Settings.Cron = "* * * * */1"
|
||||
p0.Filters = []*policy.Filter{
|
||||
{
|
||||
@ -272,7 +271,6 @@ func (s *preheatSuite) TestUpdatePolicy() {
|
||||
|
||||
func (s *preheatSuite) TestDeletePolicy() {
|
||||
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||
p0.Trigger.Settings.JobID = 1
|
||||
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
|
||||
s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil)
|
||||
err := s.controller.DeletePolicy(s.ctx, 1)
|
||||
|
@ -99,8 +99,7 @@ type Trigger struct {
|
||||
Type TriggerType `json:"type"`
|
||||
Settings struct {
|
||||
// The cron string for scheduled trigger.
|
||||
Cron string `json:"cron,omitempty"`
|
||||
JobID int64 `json:"job_id,omitempty"`
|
||||
Cron string `json:"cron,omitempty"`
|
||||
} `json:"trigger_setting,omitempty"`
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,8 @@ type DefaultAPIController struct {
|
||||
|
||||
const (
|
||||
// SchedulerCallback ...
|
||||
SchedulerCallback = "RetentionCallback"
|
||||
SchedulerCallback = "RETENTION"
|
||||
schedulerVendorType = "RETENTION"
|
||||
)
|
||||
|
||||
// TriggerParam ...
|
||||
@ -82,26 +83,23 @@ func (r *DefaultAPIController) GetRetention(id int64) (*policy.Metadata, error)
|
||||
|
||||
// CreateRetention Create Retention
|
||||
func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) (int64, error) {
|
||||
if p.Trigger.Kind == policy.TriggerKindSchedule {
|
||||
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
|
||||
if ok && len(cron.(string)) > 0 {
|
||||
jobid, err := r.scheduler.Schedule(orm.Context(), cron.(string), SchedulerCallback, TriggerParam{
|
||||
PolicyID: p.ID,
|
||||
Trigger: ExecutionTriggerSchedule,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if p.Trigger.References == nil {
|
||||
p.Trigger.References = map[string]interface{}{}
|
||||
}
|
||||
p.Trigger.References[policy.TriggerReferencesJobid] = jobid
|
||||
}
|
||||
}
|
||||
id, err := r.manager.CreatePolicy(p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if p.Trigger.Kind == policy.TriggerKindSchedule {
|
||||
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
|
||||
if ok && len(cron.(string)) > 0 {
|
||||
if _, err = r.scheduler.Schedule(orm.Context(), schedulerVendorType, id, cron.(string), SchedulerCallback, TriggerParam{
|
||||
PolicyID: id,
|
||||
Trigger: ExecutionTriggerSchedule,
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
@ -142,24 +140,26 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error {
|
||||
return fmt.Errorf("not support Trigger %s", p.Trigger.Kind)
|
||||
}
|
||||
}
|
||||
if err = r.manager.UpdatePolicy(p); err != nil {
|
||||
return err
|
||||
}
|
||||
if needUn {
|
||||
err = r.scheduler.UnSchedule(orm.Context(), p0.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
||||
err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, p.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if needSch {
|
||||
jobid, err := r.scheduler.Schedule(orm.Context(), p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
|
||||
_, err := r.scheduler.Schedule(orm.Context(), schedulerVendorType, p.ID, p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
|
||||
PolicyID: p.ID,
|
||||
Trigger: ExecutionTriggerSchedule,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.Trigger.References[policy.TriggerReferencesJobid] = jobid
|
||||
}
|
||||
|
||||
return r.manager.UpdatePolicy(p)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteRetention Delete Retention
|
||||
@ -169,7 +169,7 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error {
|
||||
return err
|
||||
}
|
||||
if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 {
|
||||
err = r.scheduler.UnSchedule(orm.Context(), p.Trigger.References[policy.TriggerReferencesJobid].(int64))
|
||||
err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||
@ -219,17 +220,23 @@ func (s *ControllerTestSuite) TestExecution() {
|
||||
type fakeRetentionScheduler struct {
|
||||
}
|
||||
|
||||
func (f *fakeRetentionScheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||
func (f *fakeRetentionScheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||
return 111, nil
|
||||
}
|
||||
|
||||
func (f *fakeRetentionScheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||
func (f *fakeRetentionScheduler) UnScheduleByID(ctx context.Context, id int64) error {
|
||||
return nil
|
||||
}
|
||||
func (f *fakeRetentionScheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeRetentionScheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakeRetentionScheduler) ListSchedules(ctx context.Context, q *q.Query) ([]*scheduler.Schedule, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type fakeLauncher struct {
|
||||
}
|
||||
|
@ -120,11 +120,6 @@ func (d *DefaultManager) GetPolicy(id int64) (*policy.Metadata, error) {
|
||||
return nil, err
|
||||
}
|
||||
p.ID = id
|
||||
if p.Trigger.Settings != nil {
|
||||
if _, ok := p.Trigger.References[policy.TriggerReferencesJobid]; ok {
|
||||
p.Trigger.References[policy.TriggerReferencesJobid] = int64(p.Trigger.References[policy.TriggerReferencesJobid].(float64))
|
||||
}
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
@ -28,8 +28,6 @@ const (
|
||||
// TriggerKindSchedule Schedule
|
||||
TriggerKindSchedule = "Schedule"
|
||||
|
||||
// TriggerReferencesJobid job_id
|
||||
TriggerReferencesJobid = "job_id"
|
||||
// TriggerSettingsCron cron
|
||||
TriggerSettingsCron = "cron"
|
||||
|
||||
@ -97,10 +95,6 @@ type Trigger struct {
|
||||
// Settings for the specified trigger
|
||||
// '[cron]="* 22 11 * * *"' for the 'Schedule'
|
||||
Settings map[string]interface{} `json:"settings" valid:"Required"`
|
||||
|
||||
// References of the trigger
|
||||
// e.g: schedule job ID
|
||||
References map[string]interface{} `json:"references"`
|
||||
}
|
||||
|
||||
// Scope definition
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
)
|
||||
|
||||
@ -70,20 +69,21 @@ func callbackFuncExist(name string) bool {
|
||||
}
|
||||
|
||||
func triggerCallback(ctx context.Context, task *task.Task, change *job.StatusChange) (err error) {
|
||||
schedules, err := Sched.(*scheduler).dao.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"ExecutionID": task.ExecutionID,
|
||||
},
|
||||
})
|
||||
execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(schedules) == 0 {
|
||||
return fmt.Errorf("the schedule whose execution ID is %d not found", task.ExecutionID)
|
||||
if execution.VendorType != JobNameScheduler {
|
||||
return fmt.Errorf("the vendor type of execution %d isn't %s: %s",
|
||||
task.ExecutionID, JobNameScheduler, execution.VendorType)
|
||||
}
|
||||
callbackFunc, err := getCallbackFunc(schedules[0].CallbackFuncName)
|
||||
schedule, err := Sched.(*scheduler).dao.Get(ctx, execution.VendorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return callbackFunc(schedules[0].CallbackFuncParam)
|
||||
callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return callbackFunc(schedule.CallbackFuncParam)
|
||||
}
|
||||
|
@ -30,8 +30,9 @@ func init() {
|
||||
|
||||
type schedule struct {
|
||||
ID int64 `orm:"pk;auto;column(id)"`
|
||||
VendorType string `orm:"column(vendor_type)"`
|
||||
VendorID int64 `orm:"column(vendor_id)"`
|
||||
CRON string `orm:"column(cron)"`
|
||||
ExecutionID int64 `orm:"column(execution_id)"`
|
||||
CallbackFuncName string `orm:"column(callback_func_name)"`
|
||||
CallbackFuncParam string `orm:"column(callback_func_param)"`
|
||||
CreationTime time.Time `orm:"column(creation_time)"`
|
||||
@ -56,10 +57,6 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) {
|
||||
}
|
||||
id, err := ormer.Insert(schedule)
|
||||
if err != nil {
|
||||
if e := orm.AsForeignKeyError(err,
|
||||
"the schedule tries to reference a non existing execution %d", schedule.ExecutionID); e != nil {
|
||||
err = e
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
|
@ -22,33 +22,27 @@ import (
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type daoTestSuite struct {
|
||||
suite.Suite
|
||||
dao DAO
|
||||
execMgr task.ExecutionManager
|
||||
ctx context.Context
|
||||
id int64
|
||||
execID int64
|
||||
dao DAO
|
||||
ctx context.Context
|
||||
id int64
|
||||
}
|
||||
|
||||
func (d *daoTestSuite) SetupSuite() {
|
||||
d.dao = &dao{}
|
||||
d.execMgr = task.NewExecutionManager()
|
||||
common_dao.PrepareTestForPostgresSQL()
|
||||
d.ctx = orm.Context()
|
||||
}
|
||||
|
||||
func (d *daoTestSuite) SetupTest() {
|
||||
execID, err := d.execMgr.Create(d.ctx, "vendor", 0, "trigger")
|
||||
d.Require().Nil(err)
|
||||
d.execID = execID
|
||||
schedule := &schedule{
|
||||
VendorType: "Vendor",
|
||||
VendorID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
ExecutionID: execID,
|
||||
CallbackFuncName: "callback_func_01",
|
||||
CallbackFuncParam: "callback_func_params",
|
||||
}
|
||||
@ -59,19 +53,10 @@ func (d *daoTestSuite) SetupTest() {
|
||||
|
||||
func (d *daoTestSuite) TearDownTest() {
|
||||
d.Require().Nil(d.dao.Delete(d.ctx, d.id))
|
||||
d.Require().Nil(d.execMgr.Delete(d.ctx, d.execID))
|
||||
}
|
||||
|
||||
func (d *daoTestSuite) TestCreate() {
|
||||
// the happy pass is covered in SetupTest
|
||||
|
||||
// foreign key error
|
||||
_, err := d.dao.Create(d.ctx, &schedule{
|
||||
CRON: "0 * * * * *",
|
||||
ExecutionID: 10000,
|
||||
CallbackFuncName: "callback_func",
|
||||
})
|
||||
d.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode))
|
||||
}
|
||||
|
||||
func (d *daoTestSuite) TestList() {
|
||||
|
@ -20,9 +20,11 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
beegoorm "github.com/astaxie/beego/orm"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
cronlib "github.com/robfig/cron"
|
||||
@ -36,6 +38,8 @@ var (
|
||||
// Schedule describes the detail information about the created schedule
|
||||
type Schedule struct {
|
||||
ID int64 `json:"id"`
|
||||
VendorType string `json:"vendor_type"`
|
||||
VendorID int64 `json:"vendor_id"`
|
||||
CRON string `json:"cron"`
|
||||
Status string `json:"status"` // status of the underlying task(jobservice job)
|
||||
CreationTime time.Time `json:"creation_time"`
|
||||
@ -49,13 +53,20 @@ type Schedule struct {
|
||||
type Scheduler interface {
|
||||
// Schedule creates a task which calls the specified callback function periodically
|
||||
// The callback function needs to be registered first
|
||||
// The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.),
|
||||
// and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention).
|
||||
// The "params" is passed to the callback function as encoded json string, so the callback
|
||||
// function must decode it before using
|
||||
Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error)
|
||||
// UnSchedule the created schedule instance
|
||||
UnSchedule(ctx context.Context, id int64) error
|
||||
Schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
|
||||
callbackFuncName string, params interface{}) (int64, error)
|
||||
// UnScheduleByID the schedule specified by ID
|
||||
UnScheduleByID(ctx context.Context, id int64) error
|
||||
// UnScheduleByVendor the schedule specified by vendor
|
||||
UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error
|
||||
// GetSchedule gets the schedule specified by ID
|
||||
GetSchedule(ctx context.Context, id int64) (*Schedule, error)
|
||||
// ListSchedules according to the query
|
||||
ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error)
|
||||
}
|
||||
|
||||
// New returns an instance of the default scheduler
|
||||
@ -73,7 +84,38 @@ type scheduler struct {
|
||||
taskMgr task.Manager
|
||||
}
|
||||
|
||||
func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||
// Currently all database operations inside one request handling are covered by
|
||||
// one transaction, which means if any one of the operations fails, all of them
|
||||
// will be roll back. As the scheduler creates jobservice jobs that cannot be
|
||||
// roll back by the transaction, this will cause some unexpected data inconsistence
|
||||
// in some cases.
|
||||
// The implementation of "Schedule" replaces the ormer with a new one in the context
|
||||
// to out of control from the global transaction, and uses a new transaction that only
|
||||
// covers the logic inside the function
|
||||
func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
|
||||
callbackFuncName string, params interface{}) (int64, error) {
|
||||
var scheduleID int64
|
||||
f := func(ctx context.Context) error {
|
||||
id, err := s.schedule(ctx, vendorType, vendorID, cron, callbackFuncName, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
scheduleID = id
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx = orm.NewContext(ctx, beegoorm.NewOrm())
|
||||
if err := orm.WithTransaction(f)(ctx); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return scheduleID, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
|
||||
callbackFuncName string, params interface{}) (int64, error) {
|
||||
if len(vendorType) == 0 {
|
||||
return 0, fmt.Errorf("empty vendor type")
|
||||
}
|
||||
if _, err := cronlib.Parse(cron); err != nil {
|
||||
return 0, errors.New(nil).WithCode(errors.BadRequestCode).
|
||||
WithMessage("invalid cron %s: %v", cron, err)
|
||||
@ -82,15 +124,11 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
|
||||
return 0, fmt.Errorf("callback function %s not found", callbackFuncName)
|
||||
}
|
||||
|
||||
execID, err := s.execMgr.Create(ctx, JobNameScheduler, 0, task.ExecutionTriggerManual)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
sched := &schedule{
|
||||
VendorType: vendorType,
|
||||
VendorID: vendorID,
|
||||
CRON: cron,
|
||||
ExecutionID: execID,
|
||||
CallbackFuncName: callbackFuncName,
|
||||
CreationTime: now,
|
||||
UpdateTime: now,
|
||||
@ -102,15 +140,19 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
|
||||
}
|
||||
sched.CallbackFuncParam = string(paramsData)
|
||||
}
|
||||
|
||||
// create schedule record
|
||||
// when status/checkin hook comes, the database record must exist,
|
||||
// when checkin hook comes, the database record must exist,
|
||||
// so the database record must be created first before submitting job
|
||||
id, err := s.dao.Create(ctx, sched)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
|
||||
Name: JobNameScheduler,
|
||||
Metadata: &job.Metadata{
|
||||
@ -121,6 +163,15 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// make sure the created task is stopped if got any error in the following steps
|
||||
defer func() {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if err := s.taskMgr.Stop(ctx, taskID); err != nil {
|
||||
log.Errorf("failed to stop the task %d: %v", taskID, err)
|
||||
}
|
||||
}()
|
||||
// when task manager creating a task, it creates the task database record first and
|
||||
// then submits the job to jobservice. If the submitting failed, it doesn't return
|
||||
// any error. So we check the task status to make sure the job is submitted to jobservice
|
||||
@ -130,44 +181,76 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
|
||||
return 0, err
|
||||
}
|
||||
if task.Status == job.ErrorStatus.String() {
|
||||
return 0, fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
|
||||
// assign the error to "err" to trigger the defer function to clean up the created task
|
||||
err = fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||
schedule, err := s.dao.Get(ctx, id)
|
||||
func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {
|
||||
executions, err := s.execMgr.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": JobNameScheduler,
|
||||
"VendorID": id,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
if errors.IsNotFoundErr(err) {
|
||||
log.Warningf("trying to unschedule a non existing schedule %d, skip directly", id)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err = s.execMgr.Stop(ctx, schedule.ExecutionID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// after the stop called, the execution cannot be stopped immediately,
|
||||
// use the for loop to make sure the execution be in final status before deleting it
|
||||
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
|
||||
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
|
||||
if err != nil {
|
||||
if len(executions) > 0 {
|
||||
executionID := executions[0].ID
|
||||
if err = s.execMgr.Stop(ctx, executionID); err != nil {
|
||||
return err
|
||||
}
|
||||
if job.Status(exec.Status).Final() {
|
||||
// delete schedule record
|
||||
if err = s.dao.Delete(ctx, id); err != nil {
|
||||
final := false
|
||||
// after the stop called, the execution cannot be stopped immediately, and the execution
|
||||
// cannot be deleted if it's status isn't in final status, so use the for loop to make
|
||||
// sure the execution be in final status before deleting it
|
||||
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
|
||||
exec, err := s.execMgr.Get(ctx, executionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// delete execution
|
||||
return s.execMgr.Delete(ctx, schedule.ExecutionID)
|
||||
if job.Status(exec.Status).Final() {
|
||||
final = true
|
||||
break
|
||||
}
|
||||
time.Sleep(t)
|
||||
}
|
||||
if !final {
|
||||
return fmt.Errorf("failed to unschedule the schedule %d: the execution %d isn't in final status", id, executionID)
|
||||
}
|
||||
// delete execution
|
||||
if err = s.execMgr.Delete(ctx, executionID); err != nil {
|
||||
return err
|
||||
}
|
||||
time.Sleep(t)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to unschedule the schedule %d: the execution isn't in final status", id)
|
||||
// delete schedule record
|
||||
return s.dao.Delete(ctx, id)
|
||||
}
|
||||
|
||||
func (s *scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
|
||||
q := &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": vendorType,
|
||||
},
|
||||
}
|
||||
if vendorID > 0 {
|
||||
q.Keywords["VendorID"] = vendorID
|
||||
}
|
||||
schedules, err := s.dao.List(ctx, q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, schedule := range schedules {
|
||||
if err = s.UnScheduleByID(ctx, schedule.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) {
|
||||
@ -175,17 +258,49 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.convertSchedule(ctx, schedule)
|
||||
}
|
||||
|
||||
func (s *scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) {
|
||||
schedules, err := s.dao.List(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var scheds []*Schedule
|
||||
for _, schedule := range schedules {
|
||||
sched, err := s.convertSchedule(ctx, schedule)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scheds = append(scheds, sched)
|
||||
}
|
||||
return scheds, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) convertSchedule(ctx context.Context, schedule *schedule) (*Schedule, error) {
|
||||
schd := &Schedule{
|
||||
ID: schedule.ID,
|
||||
VendorType: schedule.VendorType,
|
||||
VendorID: schedule.VendorID,
|
||||
CRON: schedule.CRON,
|
||||
CreationTime: schedule.CreationTime,
|
||||
UpdateTime: schedule.UpdateTime,
|
||||
}
|
||||
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
|
||||
executions, err := s.execMgr.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": JobNameScheduler,
|
||||
"VendorID": schedule.ID,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
schd.Status = exec.Status
|
||||
if len(executions) == 0 {
|
||||
// if no execution found for the schedule, mark it's status as error
|
||||
schd.Status = job.ErrorStatus.String()
|
||||
} else {
|
||||
schd.Status = executions[0].Status
|
||||
}
|
||||
return schd, nil
|
||||
}
|
||||
|
||||
@ -195,13 +310,23 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error
|
||||
// We can remove the method and the hook endpoint after several releases
|
||||
func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChange) error {
|
||||
scheduler := Sched.(*scheduler)
|
||||
schedule, err := scheduler.dao.Get(ctx, scheduleID)
|
||||
executions, err := scheduler.execMgr.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"VendorType": JobNameScheduler,
|
||||
"VendorID": scheduleID,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(executions) == 0 {
|
||||
return errors.New(nil).WithCode(errors.NotFoundCode).
|
||||
WithMessage("no execution found for the schedule %d", scheduleID)
|
||||
}
|
||||
|
||||
tasks, err := scheduler.taskMgr.List(ctx, &q.Query{
|
||||
Keywords: map[string]interface{}{
|
||||
"ExecutionID": schedule.ExecutionID,
|
||||
"ExecutionID": executions[0].ID,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@ -209,7 +334,7 @@ func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChang
|
||||
}
|
||||
if len(tasks) == 0 {
|
||||
return errors.New(nil).WithCode(errors.NotFoundCode).
|
||||
WithMessage("no task references the execution %d", schedule.ExecutionID)
|
||||
WithMessage("no task found for the execution %d", executions[0].ID)
|
||||
}
|
||||
return task.NewHookHandler().Handle(ctx, tasks[0].ID, sc)
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package scheduler
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
"github.com/goharbor/harbor/src/testing/mock"
|
||||
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
|
||||
@ -49,24 +48,29 @@ func (s *schedulerTestSuite) SetupTest() {
|
||||
}
|
||||
|
||||
func (s *schedulerTestSuite) TestSchedule() {
|
||||
// empty vendor type
|
||||
id, err := s.scheduler.Schedule(nil, "", 0, "0 * * * * *", "callback", nil)
|
||||
s.NotNil(err)
|
||||
|
||||
// invalid cron
|
||||
id, err := s.scheduler.Schedule(nil, "", "callback", nil)
|
||||
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "callback", nil)
|
||||
s.NotNil(err)
|
||||
|
||||
// callback function not exist
|
||||
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "not-exist", nil)
|
||||
id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "not-exist", nil)
|
||||
s.NotNil(err)
|
||||
|
||||
// failed to submit to jobservice
|
||||
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{
|
||||
ID: 1,
|
||||
ExecutionID: 1,
|
||||
Status: job.ErrorStatus.String(),
|
||||
}, nil)
|
||||
_, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
|
||||
s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
_, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param")
|
||||
s.Require().NotNil(err)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
@ -84,7 +88,7 @@ func (s *schedulerTestSuite) TestSchedule() {
|
||||
ExecutionID: 1,
|
||||
Status: job.SuccessStatus.String(),
|
||||
}, nil)
|
||||
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
|
||||
id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param")
|
||||
s.Require().Nil(err)
|
||||
s.Equal(int64(1), id)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
@ -92,29 +96,19 @@ func (s *schedulerTestSuite) TestSchedule() {
|
||||
s.taskMgr.AssertExpectations(s.T())
|
||||
}
|
||||
|
||||
func (s *schedulerTestSuite) TestUnSchedule() {
|
||||
// not existing schedule
|
||||
s.dao.On("Get", mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil))
|
||||
err := s.scheduler.UnSchedule(nil, 10000)
|
||||
s.Nil(err)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
|
||||
// reset mocks
|
||||
s.SetupTest()
|
||||
|
||||
func (s *schedulerTestSuite) TestUnScheduleByID() {
|
||||
// the underlying task isn't stopped
|
||||
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||
ID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
ExecutionID: 1,
|
||||
CallbackFuncName: "callback",
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil)
|
||||
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
ID: 1,
|
||||
Status: job.RunningStatus.String(),
|
||||
}, nil)
|
||||
err = s.scheduler.UnSchedule(nil, 1)
|
||||
err := s.scheduler.UnScheduleByID(nil, 1)
|
||||
s.NotNil(err)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
@ -123,11 +117,10 @@ func (s *schedulerTestSuite) TestUnSchedule() {
|
||||
s.SetupTest()
|
||||
|
||||
// pass
|
||||
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||
ID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
ExecutionID: 1,
|
||||
CallbackFuncName: "callback",
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil)
|
||||
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
@ -136,26 +129,96 @@ func (s *schedulerTestSuite) TestUnSchedule() {
|
||||
}, nil)
|
||||
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||
err = s.scheduler.UnSchedule(nil, 1)
|
||||
err = s.scheduler.UnScheduleByID(nil, 1)
|
||||
s.Nil(err)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
}
|
||||
|
||||
func (s *schedulerTestSuite) TestUnScheduleByVendor() {
|
||||
s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil)
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
},
|
||||
}, nil)
|
||||
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
|
||||
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
ID: 1,
|
||||
Status: job.StoppedStatus.String(),
|
||||
}, nil)
|
||||
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
|
||||
err := s.scheduler.UnScheduleByVendor(nil, "vendor", 1)
|
||||
s.Nil(err)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
}
|
||||
|
||||
func (s *schedulerTestSuite) TestGetSchedule() {
|
||||
// no execution for the schedule
|
||||
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||
ID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
ExecutionID: 1,
|
||||
ID: 1,
|
||||
VendorType: "vendor",
|
||||
VendorID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
}, nil)
|
||||
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
|
||||
ID: 1,
|
||||
Status: job.SuccessStatus.String(),
|
||||
}, nil)
|
||||
schedule, err := s.scheduler.GetSchedule(nil, 1)
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil)
|
||||
schd, err := s.scheduler.GetSchedule(nil, 1)
|
||||
s.Require().Nil(err)
|
||||
s.Equal("0 * * * * *", schedule.CRON)
|
||||
s.Equal(job.SuccessStatus.String(), schedule.Status)
|
||||
s.Equal("0 * * * * *", schd.CRON)
|
||||
s.Equal(job.ErrorStatus.String(), schd.Status)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
|
||||
// reset mocks
|
||||
s.SetupTest()
|
||||
|
||||
// pass
|
||||
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
|
||||
ID: 1,
|
||||
VendorType: "vendor",
|
||||
VendorID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
}, nil)
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
Status: job.SuccessStatus.String(),
|
||||
},
|
||||
}, nil)
|
||||
schd, err = s.scheduler.GetSchedule(nil, 1)
|
||||
s.Require().Nil(err)
|
||||
s.Equal("0 * * * * *", schd.CRON)
|
||||
s.Equal(job.SuccessStatus.String(), schd.Status)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
}
|
||||
|
||||
func (s *schedulerTestSuite) TestListSchedules() {
|
||||
s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{
|
||||
{
|
||||
ID: 1,
|
||||
VendorType: "vendor",
|
||||
VendorID: 1,
|
||||
CRON: "0 * * * * *",
|
||||
},
|
||||
}, nil)
|
||||
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
|
||||
{
|
||||
ID: 1,
|
||||
Status: job.SuccessStatus.String(),
|
||||
},
|
||||
}, nil)
|
||||
schds, err := s.scheduler.ListSchedules(nil, nil)
|
||||
s.Require().Nil(err)
|
||||
s.Require().Len(schds, 1)
|
||||
s.Equal("0 * * * * *", schds[0].CRON)
|
||||
s.Equal(job.SuccessStatus.String(), schds[0].Status)
|
||||
s.dao.AssertExpectations(s.T())
|
||||
s.execMgr.AssertExpectations(s.T())
|
||||
}
|
||||
|
@ -5,8 +5,10 @@ package scheduler
|
||||
import (
|
||||
context "context"
|
||||
|
||||
scheduler "github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
q "github.com/goharbor/harbor/src/lib/q"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
scheduler "github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
)
|
||||
|
||||
// Scheduler is an autogenerated mock type for the Scheduler type
|
||||
@ -37,20 +39,22 @@ func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Sche
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Schedule provides a mock function with given fields: ctx, cron, callbackFuncName, params
|
||||
func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||
ret := _m.Called(ctx, cron, callbackFuncName, params)
|
||||
// ListSchedules provides a mock function with given fields: ctx, query
|
||||
func (_m *Scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) {
|
||||
ret := _m.Called(ctx, query)
|
||||
|
||||
var r0 int64
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}) int64); ok {
|
||||
r0 = rf(ctx, cron, callbackFuncName, params)
|
||||
var r0 []*scheduler.Schedule
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*scheduler.Schedule); ok {
|
||||
r0 = rf(ctx, query)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*scheduler.Schedule)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}) error); ok {
|
||||
r1 = rf(ctx, cron, callbackFuncName, params)
|
||||
if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok {
|
||||
r1 = rf(ctx, query)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
@ -58,8 +62,29 @@ func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// UnSchedule provides a mock function with given fields: ctx, id
|
||||
func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||
// Schedule provides a mock function with given fields: ctx, vendorType, vendorID, cron, callbackFuncName, params
|
||||
func (_m *Scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) {
|
||||
ret := _m.Called(ctx, vendorType, vendorID, cron, callbackFuncName, params)
|
||||
|
||||
var r0 int64
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64, string, string, interface{}) int64); ok {
|
||||
r0 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params)
|
||||
} else {
|
||||
r0 = ret.Get(0).(int64)
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, int64, string, string, interface{}) error); ok {
|
||||
r1 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// UnScheduleByID provides a mock function with given fields: ctx, id
|
||||
func (_m *Scheduler) UnScheduleByID(ctx context.Context, id int64) error {
|
||||
ret := _m.Called(ctx, id)
|
||||
|
||||
var r0 error
|
||||
@ -71,3 +96,17 @@ func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error {
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// UnScheduleByVendor provides a mock function with given fields: ctx, vendorType, vendorID
|
||||
func (_m *Scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
|
||||
ret := _m.Called(ctx, vendorType, vendorID)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok {
|
||||
r0 = rf(ctx, vendorType, vendorID)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user