Do some refine for the scheduler

1. Accept vendorType and vendorID when creating the schedule
2. Provide more methods in the scheduler interface to reduce the duplicated works of callers
3. Use a new ormer and transaction when creating the schedule

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2020-07-28 13:31:51 +08:00
parent 0ae8133a60
commit d6288a43e8
14 changed files with 386 additions and 186 deletions

View File

@ -68,15 +68,16 @@ CREATE TABLE IF NOT EXISTS p2p_preheat_policy (
UNIQUE (name, project_id)
);
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_type varchar(16);
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS vendor_id int;
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS cron varchar(64);
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS execution_id int;
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_name varchar(128);
ALTER TABLE schedule ADD COLUMN IF NOT EXISTS callback_func_param text;
/*abstract the cron, callback function parameters from table retention_policy*/
UPDATE schedule
SET cron = retention.cron, callback_func_name = 'RetentionCallback',
callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}')
SET vendor_type= 'RETENTION', vendor_id=retention.id, cron = retention.cron,
callback_func_name = 'RETENTION', callback_func_param=concat('{"PolicyID":', retention.id, ',"Trigger":"Schedule"}')
FROM (
SELECT id, data::json->'trigger'->'references'->>'job_id' AS schedule_id,
data::json->'trigger'->'settings'->>'cron' AS cron
@ -93,7 +94,7 @@ DECLARE
BEGIN
FOR sched IN SELECT * FROM schedule
LOOP
INSERT INTO execution (vendor_type, trigger) VALUES ('SCHEDULER', 'MANUAL') RETURNING id INTO exec_id;
INSERT INTO execution (vendor_type, vendor_id, trigger) VALUES ('SCHEDULER', sched.id, 'MANUAL') RETURNING id INTO exec_id;
IF sched.status = 'Pending' THEN
status_code = 0;
ELSIF sched.status = 'Scheduled' THEN
@ -106,11 +107,8 @@ BEGIN
status_code = 0;
END IF;
INSERT INTO task (execution_id, job_id, status, status_code, status_revision, run_count) VALUES (exec_id, sched.job_id, sched.status, status_code, 0, 0);
UPDATE schedule SET execution_id=exec_id WHERE id = sched.id;
END LOOP;
END $$;
ALTER TABLE schedule DROP COLUMN IF EXISTS job_id;
ALTER TABLE schedule DROP COLUMN IF EXISTS status;
ALTER TABLE schedule ADD CONSTRAINT schedule_execution FOREIGN KEY (execution_id) REFERENCES execution(id);
ALTER TABLE schedule DROP COLUMN IF EXISTS status;

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/instance"
@ -238,8 +239,8 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
len(schema.Trigger.Settings.Cron) > 0 {
// schedule and update policy
schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id})
if err != nil {
if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, schema.Trigger.Settings.Cron,
SchedulerCallback, TriggerParam{PolicyID: id}); err != nil {
return 0, err
}
@ -248,7 +249,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
}
if err != nil {
if e := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID); e != nil {
if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil {
return 0, errors.Wrap(e, err.Error())
}
@ -293,12 +294,12 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
}
var cron = schema.Trigger.Settings.Cron
var oldJobID = s0.Trigger.Settings.JobID
var oldCron = s0.Trigger.Settings.Cron
var needUn bool
var needSch bool
if s0.Trigger.Type != schema.Trigger.Type {
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 {
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && len(oldCron) > 0 {
needUn = true
}
if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 {
@ -309,7 +310,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
if schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
s0.Trigger.Settings.Cron != cron {
// unschedule old
if oldJobID > 0 {
if len(oldCron) > 0 {
needUn = true
}
// schedule new
@ -323,7 +324,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
// unschedule old
if needUn {
err = c.scheduler.UnSchedule(ctx, oldJobID)
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID)
if err != nil {
return err
}
@ -331,11 +332,10 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
// schedule new
if needSch {
jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID})
if err != nil {
if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, cron, SchedulerCallback,
TriggerParam{PolicyID: schema.ID}); err != nil {
return err
}
schema.Trigger.Settings.JobID = jobid
if err := schema.Encode(); err != nil {
// Possible
// TODO: Refactor
@ -355,8 +355,8 @@ func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
if err != nil {
return err
}
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 {
err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID)
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 {
err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id)
if err != nil {
return err
}

View File

@ -204,10 +204,10 @@ func (s *preheatSuite) TestCreatePolicy() {
FiltersStr: `[{"type":"repository","value":"harbor*"},{"type":"tag","value":"2*"}]`,
TriggerStr: fmt.Sprintf(`{"type":"%s", "trigger_setting":{"cron":"* * * * */1"}}`, policy.TriggerTypeScheduled),
}
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil)
s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil)
s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil)
s.fakeScheduler.On("UnScheduleByVendor", s.ctx, mock.Anything, mock.Anything).Return(nil)
id, err := s.controller.CreatePolicy(s.ctx, policy)
s.NoError(err)
s.Equal(int64(1), id)
@ -231,7 +231,6 @@ func (s *preheatSuite) TestGetPolicyByName() {
func (s *preheatSuite) TestUpdatePolicy() {
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
p0.Trigger.Settings.JobID = 1
p0.Trigger.Settings.Cron = "* * * * */1"
p0.Filters = []*policy.Filter{
{
@ -272,7 +271,6 @@ func (s *preheatSuite) TestUpdatePolicy() {
func (s *preheatSuite) TestDeletePolicy() {
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
p0.Trigger.Settings.JobID = 1
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil)
err := s.controller.DeletePolicy(s.ctx, 1)

View File

@ -99,8 +99,7 @@ type Trigger struct {
Type TriggerType `json:"type"`
Settings struct {
// The cron string for scheduled trigger.
Cron string `json:"cron,omitempty"`
JobID int64 `json:"job_id,omitempty"`
Cron string `json:"cron,omitempty"`
} `json:"trigger_setting,omitempty"`
}

View File

@ -66,7 +66,8 @@ type DefaultAPIController struct {
const (
// SchedulerCallback ...
SchedulerCallback = "RetentionCallback"
SchedulerCallback = "RETENTION"
schedulerVendorType = "RETENTION"
)
// TriggerParam ...
@ -82,26 +83,23 @@ func (r *DefaultAPIController) GetRetention(id int64) (*policy.Metadata, error)
// CreateRetention Create Retention
func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) (int64, error) {
if p.Trigger.Kind == policy.TriggerKindSchedule {
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
if ok && len(cron.(string)) > 0 {
jobid, err := r.scheduler.Schedule(orm.Context(), cron.(string), SchedulerCallback, TriggerParam{
PolicyID: p.ID,
Trigger: ExecutionTriggerSchedule,
})
if err != nil {
return 0, err
}
if p.Trigger.References == nil {
p.Trigger.References = map[string]interface{}{}
}
p.Trigger.References[policy.TriggerReferencesJobid] = jobid
}
}
id, err := r.manager.CreatePolicy(p)
if err != nil {
return 0, err
}
if p.Trigger.Kind == policy.TriggerKindSchedule {
cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron]
if ok && len(cron.(string)) > 0 {
if _, err = r.scheduler.Schedule(orm.Context(), schedulerVendorType, id, cron.(string), SchedulerCallback, TriggerParam{
PolicyID: id,
Trigger: ExecutionTriggerSchedule,
}); err != nil {
return 0, err
}
}
}
return id, nil
}
@ -142,24 +140,26 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error {
return fmt.Errorf("not support Trigger %s", p.Trigger.Kind)
}
}
if err = r.manager.UpdatePolicy(p); err != nil {
return err
}
if needUn {
err = r.scheduler.UnSchedule(orm.Context(), p0.Trigger.References[policy.TriggerReferencesJobid].(int64))
err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, p.ID)
if err != nil {
return err
}
}
if needSch {
jobid, err := r.scheduler.Schedule(orm.Context(), p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
_, err := r.scheduler.Schedule(orm.Context(), schedulerVendorType, p.ID, p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{
PolicyID: p.ID,
Trigger: ExecutionTriggerSchedule,
})
if err != nil {
return err
}
p.Trigger.References[policy.TriggerReferencesJobid] = jobid
}
return r.manager.UpdatePolicy(p)
return nil
}
// DeleteRetention Delete Retention
@ -169,7 +169,7 @@ func (r *DefaultAPIController) DeleteRetention(id int64) error {
return err
}
if p.Trigger.Kind == policy.TriggerKindSchedule && len(p.Trigger.Settings[policy.TriggerSettingsCron].(string)) > 0 {
err = r.scheduler.UnSchedule(orm.Context(), p.Trigger.References[policy.TriggerReferencesJobid].(int64))
err = r.scheduler.UnScheduleByVendor(orm.Context(), schedulerVendorType, id)
if err != nil {
return err
}

View File

@ -19,6 +19,7 @@ import (
"strings"
"testing"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
@ -219,17 +220,23 @@ func (s *ControllerTestSuite) TestExecution() {
type fakeRetentionScheduler struct {
}
func (f *fakeRetentionScheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
func (f *fakeRetentionScheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) {
return 111, nil
}
func (f *fakeRetentionScheduler) UnSchedule(ctx context.Context, id int64) error {
func (f *fakeRetentionScheduler) UnScheduleByID(ctx context.Context, id int64) error {
return nil
}
func (f *fakeRetentionScheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
return nil
}
func (f *fakeRetentionScheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) {
return nil, nil
}
func (f *fakeRetentionScheduler) ListSchedules(ctx context.Context, q *q.Query) ([]*scheduler.Schedule, error) {
return nil, nil
}
type fakeLauncher struct {
}

View File

@ -120,11 +120,6 @@ func (d *DefaultManager) GetPolicy(id int64) (*policy.Metadata, error) {
return nil, err
}
p.ID = id
if p.Trigger.Settings != nil {
if _, ok := p.Trigger.References[policy.TriggerReferencesJobid]; ok {
p.Trigger.References[policy.TriggerReferencesJobid] = int64(p.Trigger.References[policy.TriggerReferencesJobid].(float64))
}
}
return p, nil
}

View File

@ -28,8 +28,6 @@ const (
// TriggerKindSchedule Schedule
TriggerKindSchedule = "Schedule"
// TriggerReferencesJobid job_id
TriggerReferencesJobid = "job_id"
// TriggerSettingsCron cron
TriggerSettingsCron = "cron"
@ -97,10 +95,6 @@ type Trigger struct {
// Settings for the specified trigger
// '[cron]="* 22 11 * * *"' for the 'Schedule'
Settings map[string]interface{} `json:"settings" valid:"Required"`
// References of the trigger
// e.g: schedule job ID
References map[string]interface{} `json:"references"`
}
// Scope definition

View File

@ -21,7 +21,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task"
)
@ -70,20 +69,21 @@ func callbackFuncExist(name string) bool {
}
func triggerCallback(ctx context.Context, task *task.Task, change *job.StatusChange) (err error) {
schedules, err := Sched.(*scheduler).dao.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": task.ExecutionID,
},
})
execution, err := Sched.(*scheduler).execMgr.Get(ctx, task.ExecutionID)
if err != nil {
return err
}
if len(schedules) == 0 {
return fmt.Errorf("the schedule whose execution ID is %d not found", task.ExecutionID)
if execution.VendorType != JobNameScheduler {
return fmt.Errorf("the vendor type of execution %d isn't %s: %s",
task.ExecutionID, JobNameScheduler, execution.VendorType)
}
callbackFunc, err := getCallbackFunc(schedules[0].CallbackFuncName)
schedule, err := Sched.(*scheduler).dao.Get(ctx, execution.VendorID)
if err != nil {
return err
}
return callbackFunc(schedules[0].CallbackFuncParam)
callbackFunc, err := getCallbackFunc(schedule.CallbackFuncName)
if err != nil {
return err
}
return callbackFunc(schedule.CallbackFuncParam)
}

View File

@ -30,8 +30,9 @@ func init() {
type schedule struct {
ID int64 `orm:"pk;auto;column(id)"`
VendorType string `orm:"column(vendor_type)"`
VendorID int64 `orm:"column(vendor_id)"`
CRON string `orm:"column(cron)"`
ExecutionID int64 `orm:"column(execution_id)"`
CallbackFuncName string `orm:"column(callback_func_name)"`
CallbackFuncParam string `orm:"column(callback_func_param)"`
CreationTime time.Time `orm:"column(creation_time)"`
@ -56,10 +57,6 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) {
}
id, err := ormer.Insert(schedule)
if err != nil {
if e := orm.AsForeignKeyError(err,
"the schedule tries to reference a non existing execution %d", schedule.ExecutionID); e != nil {
err = e
}
return 0, err
}
return id, nil

View File

@ -22,33 +22,27 @@ import (
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/stretchr/testify/suite"
)
type daoTestSuite struct {
suite.Suite
dao DAO
execMgr task.ExecutionManager
ctx context.Context
id int64
execID int64
dao DAO
ctx context.Context
id int64
}
func (d *daoTestSuite) SetupSuite() {
d.dao = &dao{}
d.execMgr = task.NewExecutionManager()
common_dao.PrepareTestForPostgresSQL()
d.ctx = orm.Context()
}
func (d *daoTestSuite) SetupTest() {
execID, err := d.execMgr.Create(d.ctx, "vendor", 0, "trigger")
d.Require().Nil(err)
d.execID = execID
schedule := &schedule{
VendorType: "Vendor",
VendorID: 1,
CRON: "0 * * * * *",
ExecutionID: execID,
CallbackFuncName: "callback_func_01",
CallbackFuncParam: "callback_func_params",
}
@ -59,19 +53,10 @@ func (d *daoTestSuite) SetupTest() {
func (d *daoTestSuite) TearDownTest() {
d.Require().Nil(d.dao.Delete(d.ctx, d.id))
d.Require().Nil(d.execMgr.Delete(d.ctx, d.execID))
}
func (d *daoTestSuite) TestCreate() {
// the happy pass is covered in SetupTest
// foreign key error
_, err := d.dao.Create(d.ctx, &schedule{
CRON: "0 * * * * *",
ExecutionID: 10000,
CallbackFuncName: "callback_func",
})
d.True(errors.IsErr(err, errors.ViolateForeignKeyConstraintCode))
}
func (d *daoTestSuite) TestList() {

View File

@ -20,9 +20,11 @@ import (
"fmt"
"time"
beegoorm "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task"
cronlib "github.com/robfig/cron"
@ -36,6 +38,8 @@ var (
// Schedule describes the detail information about the created schedule
type Schedule struct {
ID int64 `json:"id"`
VendorType string `json:"vendor_type"`
VendorID int64 `json:"vendor_id"`
CRON string `json:"cron"`
Status string `json:"status"` // status of the underlying task(jobservice job)
CreationTime time.Time `json:"creation_time"`
@ -49,13 +53,20 @@ type Schedule struct {
type Scheduler interface {
// Schedule creates a task which calls the specified callback function periodically
// The callback function needs to be registered first
// The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.),
// and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention).
// The "params" is passed to the callback function as encoded json string, so the callback
// function must decode it before using
Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error)
// UnSchedule the created schedule instance
UnSchedule(ctx context.Context, id int64) error
Schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
callbackFuncName string, params interface{}) (int64, error)
// UnScheduleByID the schedule specified by ID
UnScheduleByID(ctx context.Context, id int64) error
// UnScheduleByVendor the schedule specified by vendor
UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error
// GetSchedule gets the schedule specified by ID
GetSchedule(ctx context.Context, id int64) (*Schedule, error)
// ListSchedules according to the query
ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error)
}
// New returns an instance of the default scheduler
@ -73,7 +84,38 @@ type scheduler struct {
taskMgr task.Manager
}
func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
// Currently all database operations inside one request handling are covered by
// one transaction, which means if any one of the operations fails, all of them
// will be roll back. As the scheduler creates jobservice jobs that cannot be
// roll back by the transaction, this will cause some unexpected data inconsistence
// in some cases.
// The implementation of "Schedule" replaces the ormer with a new one in the context
// to out of control from the global transaction, and uses a new transaction that only
// covers the logic inside the function
func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
callbackFuncName string, params interface{}) (int64, error) {
var scheduleID int64
f := func(ctx context.Context) error {
id, err := s.schedule(ctx, vendorType, vendorID, cron, callbackFuncName, params)
if err != nil {
return err
}
scheduleID = id
return nil
}
ctx = orm.NewContext(ctx, beegoorm.NewOrm())
if err := orm.WithTransaction(f)(ctx); err != nil {
return 0, err
}
return scheduleID, nil
}
func (s *scheduler) schedule(ctx context.Context, vendorType string, vendorID int64, cron string,
callbackFuncName string, params interface{}) (int64, error) {
if len(vendorType) == 0 {
return 0, fmt.Errorf("empty vendor type")
}
if _, err := cronlib.Parse(cron); err != nil {
return 0, errors.New(nil).WithCode(errors.BadRequestCode).
WithMessage("invalid cron %s: %v", cron, err)
@ -82,15 +124,11 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
return 0, fmt.Errorf("callback function %s not found", callbackFuncName)
}
execID, err := s.execMgr.Create(ctx, JobNameScheduler, 0, task.ExecutionTriggerManual)
if err != nil {
return 0, err
}
now := time.Now()
sched := &schedule{
VendorType: vendorType,
VendorID: vendorID,
CRON: cron,
ExecutionID: execID,
CallbackFuncName: callbackFuncName,
CreationTime: now,
UpdateTime: now,
@ -102,15 +140,19 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
}
sched.CallbackFuncParam = string(paramsData)
}
// create schedule record
// when status/checkin hook comes, the database record must exist,
// when checkin hook comes, the database record must exist,
// so the database record must be created first before submitting job
id, err := s.dao.Create(ctx, sched)
if err != nil {
return 0, err
}
execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual)
if err != nil {
return 0, err
}
taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
Name: JobNameScheduler,
Metadata: &job.Metadata{
@ -121,6 +163,15 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
if err != nil {
return 0, err
}
// make sure the created task is stopped if got any error in the following steps
defer func() {
if err == nil {
return
}
if err := s.taskMgr.Stop(ctx, taskID); err != nil {
log.Errorf("failed to stop the task %d: %v", taskID, err)
}
}()
// when task manager creating a task, it creates the task database record first and
// then submits the job to jobservice. If the submitting failed, it doesn't return
// any error. So we check the task status to make sure the job is submitted to jobservice
@ -130,44 +181,76 @@ func (s *scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
return 0, err
}
if task.Status == job.ErrorStatus.String() {
return 0, fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
// assign the error to "err" to trigger the defer function to clean up the created task
err = fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
return 0, err
}
return id, nil
}
func (s *scheduler) UnSchedule(ctx context.Context, id int64) error {
schedule, err := s.dao.Get(ctx, id)
func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {
executions, err := s.execMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": JobNameScheduler,
"VendorID": id,
},
})
if err != nil {
if errors.IsNotFoundErr(err) {
log.Warningf("trying to unschedule a non existing schedule %d, skip directly", id)
return nil
}
return err
}
if err = s.execMgr.Stop(ctx, schedule.ExecutionID); err != nil {
return err
}
// after the stop called, the execution cannot be stopped immediately,
// use the for loop to make sure the execution be in final status before deleting it
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
if err != nil {
if len(executions) > 0 {
executionID := executions[0].ID
if err = s.execMgr.Stop(ctx, executionID); err != nil {
return err
}
if job.Status(exec.Status).Final() {
// delete schedule record
if err = s.dao.Delete(ctx, id); err != nil {
final := false
// after the stop called, the execution cannot be stopped immediately, and the execution
// cannot be deleted if it's status isn't in final status, so use the for loop to make
// sure the execution be in final status before deleting it
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
exec, err := s.execMgr.Get(ctx, executionID)
if err != nil {
return err
}
// delete execution
return s.execMgr.Delete(ctx, schedule.ExecutionID)
if job.Status(exec.Status).Final() {
final = true
break
}
time.Sleep(t)
}
if !final {
return fmt.Errorf("failed to unschedule the schedule %d: the execution %d isn't in final status", id, executionID)
}
// delete execution
if err = s.execMgr.Delete(ctx, executionID); err != nil {
return err
}
time.Sleep(t)
}
return fmt.Errorf("failed to unschedule the schedule %d: the execution isn't in final status", id)
// delete schedule record
return s.dao.Delete(ctx, id)
}
func (s *scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
q := &q.Query{
Keywords: map[string]interface{}{
"VendorType": vendorType,
},
}
if vendorID > 0 {
q.Keywords["VendorID"] = vendorID
}
schedules, err := s.dao.List(ctx, q)
if err != nil {
return err
}
for _, schedule := range schedules {
if err = s.UnScheduleByID(ctx, schedule.ID); err != nil {
return err
}
}
return nil
}
func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) {
@ -175,17 +258,49 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error
if err != nil {
return nil, err
}
return s.convertSchedule(ctx, schedule)
}
func (s *scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) {
schedules, err := s.dao.List(ctx, query)
if err != nil {
return nil, err
}
var scheds []*Schedule
for _, schedule := range schedules {
sched, err := s.convertSchedule(ctx, schedule)
if err != nil {
return nil, err
}
scheds = append(scheds, sched)
}
return scheds, nil
}
func (s *scheduler) convertSchedule(ctx context.Context, schedule *schedule) (*Schedule, error) {
schd := &Schedule{
ID: schedule.ID,
VendorType: schedule.VendorType,
VendorID: schedule.VendorID,
CRON: schedule.CRON,
CreationTime: schedule.CreationTime,
UpdateTime: schedule.UpdateTime,
}
exec, err := s.execMgr.Get(ctx, schedule.ExecutionID)
executions, err := s.execMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": JobNameScheduler,
"VendorID": schedule.ID,
},
})
if err != nil {
return nil, err
}
schd.Status = exec.Status
if len(executions) == 0 {
// if no execution found for the schedule, mark it's status as error
schd.Status = job.ErrorStatus.String()
} else {
schd.Status = executions[0].Status
}
return schd, nil
}
@ -195,13 +310,23 @@ func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error
// We can remove the method and the hook endpoint after several releases
func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChange) error {
scheduler := Sched.(*scheduler)
schedule, err := scheduler.dao.Get(ctx, scheduleID)
executions, err := scheduler.execMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": JobNameScheduler,
"VendorID": scheduleID,
},
})
if err != nil {
return err
}
if len(executions) == 0 {
return errors.New(nil).WithCode(errors.NotFoundCode).
WithMessage("no execution found for the schedule %d", scheduleID)
}
tasks, err := scheduler.taskMgr.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": schedule.ExecutionID,
"ExecutionID": executions[0].ID,
},
})
if err != nil {
@ -209,7 +334,7 @@ func HandleLegacyHook(ctx context.Context, scheduleID int64, sc *job.StatusChang
}
if len(tasks) == 0 {
return errors.New(nil).WithCode(errors.NotFoundCode).
WithMessage("no task references the execution %d", schedule.ExecutionID)
WithMessage("no task found for the execution %d", executions[0].ID)
}
return task.NewHookHandler().Handle(ctx, tasks[0].ID, sc)
}

View File

@ -16,7 +16,6 @@ package scheduler
import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/goharbor/harbor/src/testing/mock"
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
@ -49,24 +48,29 @@ func (s *schedulerTestSuite) SetupTest() {
}
func (s *schedulerTestSuite) TestSchedule() {
// empty vendor type
id, err := s.scheduler.Schedule(nil, "", 0, "0 * * * * *", "callback", nil)
s.NotNil(err)
// invalid cron
id, err := s.scheduler.Schedule(nil, "", "callback", nil)
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "callback", nil)
s.NotNil(err)
// callback function not exist
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "not-exist", nil)
id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "not-exist", nil)
s.NotNil(err)
// failed to submit to jobservice
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.dao.On("Create", mock.Anything, mock.Anything).Return(int64(1), nil)
s.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.taskMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Task{
ID: 1,
ExecutionID: 1,
Status: job.ErrorStatus.String(),
}, nil)
_, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
_, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param")
s.Require().NotNil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
@ -84,7 +88,7 @@ func (s *schedulerTestSuite) TestSchedule() {
ExecutionID: 1,
Status: job.SuccessStatus.String(),
}, nil)
id, err = s.scheduler.Schedule(nil, "0 * * * * *", "callback", "param")
id, err = s.scheduler.Schedule(nil, "vendor", 1, "0 * * * * *", "callback", "param")
s.Require().Nil(err)
s.Equal(int64(1), id)
s.dao.AssertExpectations(s.T())
@ -92,29 +96,19 @@ func (s *schedulerTestSuite) TestSchedule() {
s.taskMgr.AssertExpectations(s.T())
}
func (s *schedulerTestSuite) TestUnSchedule() {
// not existing schedule
s.dao.On("Get", mock.Anything, mock.Anything).Return(nil, errors.NotFoundError(nil))
err := s.scheduler.UnSchedule(nil, 10000)
s.Nil(err)
s.dao.AssertExpectations(s.T())
// reset mocks
s.SetupTest()
func (s *schedulerTestSuite) TestUnScheduleByID() {
// the underlying task isn't stopped
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
ID: 1,
CRON: "0 * * * * *",
ExecutionID: 1,
CallbackFuncName: "callback",
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
err = s.scheduler.UnSchedule(nil, 1)
err := s.scheduler.UnScheduleByID(nil, 1)
s.NotNil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
@ -123,11 +117,10 @@ func (s *schedulerTestSuite) TestUnSchedule() {
s.SetupTest()
// pass
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
ID: 1,
CRON: "0 * * * * *",
ExecutionID: 1,
CallbackFuncName: "callback",
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
@ -136,26 +129,96 @@ func (s *schedulerTestSuite) TestUnSchedule() {
}, nil)
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
err = s.scheduler.UnSchedule(nil, 1)
err = s.scheduler.UnScheduleByID(nil, 1)
s.Nil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
}
func (s *schedulerTestSuite) TestUnScheduleByVendor() {
s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{
{
ID: 1,
},
}, nil)
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.StoppedStatus.String(),
}, nil)
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
err := s.scheduler.UnScheduleByVendor(nil, "vendor", 1)
s.Nil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
}
func (s *schedulerTestSuite) TestGetSchedule() {
// no execution for the schedule
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
ID: 1,
CRON: "0 * * * * *",
ExecutionID: 1,
ID: 1,
VendorType: "vendor",
VendorID: 1,
CRON: "0 * * * * *",
}, nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.SuccessStatus.String(),
}, nil)
schedule, err := s.scheduler.GetSchedule(nil, 1)
s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil)
schd, err := s.scheduler.GetSchedule(nil, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schedule.CRON)
s.Equal(job.SuccessStatus.String(), schedule.Status)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.ErrorStatus.String(), schd.Status)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
// reset mocks
s.SetupTest()
// pass
s.dao.On("Get", mock.Anything, mock.Anything).Return(&schedule{
ID: 1,
VendorType: "vendor",
VendorID: 1,
CRON: "0 * * * * *",
}, nil)
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
Status: job.SuccessStatus.String(),
},
}, nil)
schd, err = s.scheduler.GetSchedule(nil, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.SuccessStatus.String(), schd.Status)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
}
func (s *schedulerTestSuite) TestListSchedules() {
s.dao.On("List", mock.Anything, mock.Anything).Return([]*schedule{
{
ID: 1,
VendorType: "vendor",
VendorID: 1,
CRON: "0 * * * * *",
},
}, nil)
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
Status: job.SuccessStatus.String(),
},
}, nil)
schds, err := s.scheduler.ListSchedules(nil, nil)
s.Require().Nil(err)
s.Require().Len(schds, 1)
s.Equal("0 * * * * *", schds[0].CRON)
s.Equal(job.SuccessStatus.String(), schds[0].Status)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
}

View File

@ -5,8 +5,10 @@ package scheduler
import (
context "context"
scheduler "github.com/goharbor/harbor/src/pkg/scheduler"
q "github.com/goharbor/harbor/src/lib/q"
mock "github.com/stretchr/testify/mock"
scheduler "github.com/goharbor/harbor/src/pkg/scheduler"
)
// Scheduler is an autogenerated mock type for the Scheduler type
@ -37,20 +39,22 @@ func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Sche
return r0, r1
}
// Schedule provides a mock function with given fields: ctx, cron, callbackFuncName, params
func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName string, params interface{}) (int64, error) {
ret := _m.Called(ctx, cron, callbackFuncName, params)
// ListSchedules provides a mock function with given fields: ctx, query
func (_m *Scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) {
ret := _m.Called(ctx, query)
var r0 int64
if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}) int64); ok {
r0 = rf(ctx, cron, callbackFuncName, params)
var r0 []*scheduler.Schedule
if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*scheduler.Schedule); ok {
r0 = rf(ctx, query)
} else {
r0 = ret.Get(0).(int64)
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*scheduler.Schedule)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, string, interface{}) error); ok {
r1 = rf(ctx, cron, callbackFuncName, params)
if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok {
r1 = rf(ctx, query)
} else {
r1 = ret.Error(1)
}
@ -58,8 +62,29 @@ func (_m *Scheduler) Schedule(ctx context.Context, cron string, callbackFuncName
return r0, r1
}
// UnSchedule provides a mock function with given fields: ctx, id
func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error {
// Schedule provides a mock function with given fields: ctx, vendorType, vendorID, cron, callbackFuncName, params
func (_m *Scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cron string, callbackFuncName string, params interface{}) (int64, error) {
ret := _m.Called(ctx, vendorType, vendorID, cron, callbackFuncName, params)
var r0 int64
if rf, ok := ret.Get(0).(func(context.Context, string, int64, string, string, interface{}) int64); ok {
r0 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, int64, string, string, interface{}) error); ok {
r1 = rf(ctx, vendorType, vendorID, cron, callbackFuncName, params)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UnScheduleByID provides a mock function with given fields: ctx, id
func (_m *Scheduler) UnScheduleByID(ctx context.Context, id int64) error {
ret := _m.Called(ctx, id)
var r0 error
@ -71,3 +96,17 @@ func (_m *Scheduler) UnSchedule(ctx context.Context, id int64) error {
return r0
}
// UnScheduleByVendor provides a mock function with given fields: ctx, vendorType, vendorID
func (_m *Scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
ret := _m.Called(ctx, vendorType, vendorID)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, int64) error); ok {
r0 = rf(ctx, vendorType, vendorID)
} else {
r0 = ret.Error(0)
}
return r0
}