2019-07-19 13:44:11 +02:00
|
|
|
// Copyright Project Harbor Authors
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package scheduler
|
|
|
|
|
|
|
|
import (
|
2020-07-09 04:53:32 +02:00
|
|
|
"context"
|
2019-08-16 07:54:57 +02:00
|
|
|
"encoding/json"
|
2019-07-19 13:44:11 +02:00
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
"github.com/goharbor/harbor/src/jobservice/job"
|
2020-04-15 16:41:45 +02:00
|
|
|
"github.com/goharbor/harbor/src/lib/errors"
|
2020-04-02 08:08:52 +02:00
|
|
|
"github.com/goharbor/harbor/src/lib/log"
|
2020-07-09 04:53:32 +02:00
|
|
|
"github.com/goharbor/harbor/src/lib/q"
|
|
|
|
"github.com/goharbor/harbor/src/pkg/task"
|
|
|
|
cronlib "github.com/robfig/cron"
|
2019-07-19 13:44:11 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
2020-07-09 04:53:32 +02:00
|
|
|
// Sched is an instance of the default scheduler that can be used globally
|
|
|
|
Sched = New()
|
2019-07-19 13:44:11 +02:00
|
|
|
)
|
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
// Schedule carries the detailed information of a created schedule.
//
// The model can be extended with more information later (e.g. how many times
// the schedule has already run, or when it is going to run next).
type Schedule struct {
	// ID identifies the schedule.
	ID int64 `json:"id"`
	// VendorType is the type of the vendor the schedule was created for.
	VendorType string `json:"vendor_type"`
	// VendorID is the ID of the vendor the schedule was created for.
	VendorID int64 `json:"vendor_id"`
	// CRONType is the cron type given when the schedule was created.
	CRONType string `json:"cron_type"`
	// CRON is the cron expression of the schedule.
	CRON string `json:"cron"`
	// ExtraAttrs holds the extra attributes attached to the schedule.
	ExtraAttrs map[string]interface{} `json:"extra_attrs"`
	// Status is the status of the underlying task (jobservice job).
	Status string `json:"status"`
	// CreationTime is when the schedule was created.
	CreationTime time.Time `json:"creation_time"`
	// UpdateTime is when the schedule was last updated.
	UpdateTime time.Time `json:"update_time"`
}
|
2019-07-19 13:44:11 +02:00
|
|
|
|
|
|
|
// Scheduler provides the capability to run a periodic task, a callback function
|
|
|
|
// needs to be registered before using the scheduler
|
|
|
|
type Scheduler interface {
|
2020-07-09 04:53:32 +02:00
|
|
|
// Schedule creates a task which calls the specified callback function periodically
|
|
|
|
// The callback function needs to be registered first
|
2020-07-28 07:31:51 +02:00
|
|
|
// The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.),
|
|
|
|
// and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention).
|
2020-07-09 04:53:32 +02:00
|
|
|
// The "params" is passed to the callback function as encoded json string, so the callback
|
|
|
|
// function must decode it before using
|
2020-09-24 10:48:56 +02:00
|
|
|
Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string,
|
2020-12-11 10:45:41 +01:00
|
|
|
cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error)
|
2020-07-28 07:31:51 +02:00
|
|
|
// UnScheduleByID the schedule specified by ID
|
|
|
|
UnScheduleByID(ctx context.Context, id int64) error
|
|
|
|
// UnScheduleByVendor the schedule specified by vendor
|
|
|
|
UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error
|
2020-07-09 04:53:32 +02:00
|
|
|
// GetSchedule gets the schedule specified by ID
|
|
|
|
GetSchedule(ctx context.Context, id int64) (*Schedule, error)
|
2020-07-28 07:31:51 +02:00
|
|
|
// ListSchedules according to the query
|
|
|
|
ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error)
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// New returns an instance of the default scheduler
|
2020-07-09 04:53:32 +02:00
|
|
|
func New() Scheduler {
|
2019-07-19 13:44:11 +02:00
|
|
|
return &scheduler{
|
2020-07-09 04:53:32 +02:00
|
|
|
dao: &dao{},
|
|
|
|
execMgr: task.ExecMgr,
|
|
|
|
taskMgr: task.Mgr,
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type scheduler struct {
|
2020-07-09 04:53:32 +02:00
|
|
|
dao DAO
|
|
|
|
execMgr task.ExecutionManager
|
|
|
|
taskMgr task.Manager
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
|
|
|
|
2020-09-24 10:48:56 +02:00
|
|
|
func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string,
|
2020-12-11 10:45:41 +01:00
|
|
|
cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error) {
|
2020-07-28 07:31:51 +02:00
|
|
|
if len(vendorType) == 0 {
|
|
|
|
return 0, fmt.Errorf("empty vendor type")
|
|
|
|
}
|
2020-07-09 04:53:32 +02:00
|
|
|
if _, err := cronlib.Parse(cron); err != nil {
|
|
|
|
return 0, errors.New(nil).WithCode(errors.BadRequestCode).
|
|
|
|
WithMessage("invalid cron %s: %v", cron, err)
|
|
|
|
}
|
2019-07-19 13:44:11 +02:00
|
|
|
if !callbackFuncExist(callbackFuncName) {
|
|
|
|
return 0, fmt.Errorf("callback function %s not found", callbackFuncName)
|
|
|
|
}
|
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
now := time.Now()
|
|
|
|
sched := &schedule{
|
2020-07-28 07:31:51 +02:00
|
|
|
VendorType: vendorType,
|
|
|
|
VendorID: vendorID,
|
2020-09-24 10:48:56 +02:00
|
|
|
CRONType: cronType,
|
2020-07-09 04:53:32 +02:00
|
|
|
CRON: cron,
|
|
|
|
CallbackFuncName: callbackFuncName,
|
|
|
|
CreationTime: now,
|
|
|
|
UpdateTime: now,
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
2020-12-21 03:17:02 +01:00
|
|
|
|
|
|
|
paramsData, err := json.Marshal(params)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
2019-08-16 07:54:57 +02:00
|
|
|
}
|
2020-12-21 03:17:02 +01:00
|
|
|
sched.CallbackFuncParam = string(paramsData)
|
|
|
|
|
|
|
|
extrasData, err := json.Marshal(extras)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
2020-12-11 10:45:41 +01:00
|
|
|
}
|
2020-12-21 03:17:02 +01:00
|
|
|
sched.ExtraAttrs = string(extrasData)
|
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
// create schedule record
|
2020-07-28 07:31:51 +02:00
|
|
|
// when checkin hook comes, the database record must exist,
|
2020-07-09 04:53:32 +02:00
|
|
|
// so the database record must be created first before submitting job
|
|
|
|
id, err := s.dao.Create(ctx, sched)
|
2019-07-19 13:44:11 +02:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2020-07-09 04:53:32 +02:00
|
|
|
|
2020-07-28 07:31:51 +02:00
|
|
|
execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
|
|
|
|
Name: JobNameScheduler,
|
|
|
|
Metadata: &job.Metadata{
|
|
|
|
JobKind: job.KindPeriodic,
|
|
|
|
Cron: cron,
|
|
|
|
},
|
|
|
|
})
|
2019-07-19 13:44:11 +02:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2020-07-28 07:31:51 +02:00
|
|
|
// make sure the created task is stopped if got any error in the following steps
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err := s.taskMgr.Stop(ctx, taskID); err != nil {
|
|
|
|
log.Errorf("failed to stop the task %d: %v", taskID, err)
|
|
|
|
}
|
|
|
|
}()
|
2020-07-09 04:53:32 +02:00
|
|
|
// when task manager creating a task, it creates the task database record first and
|
|
|
|
// then submits the job to jobservice. If the submitting failed, it doesn't return
|
|
|
|
// any error. So we check the task status to make sure the job is submitted to jobservice
|
|
|
|
// successfully here
|
|
|
|
task, err := s.taskMgr.Get(ctx, taskID)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
if task.Status == job.ErrorStatus.String() {
|
2020-07-28 07:31:51 +02:00
|
|
|
// assign the error to "err" to trigger the defer function to clean up the created task
|
|
|
|
err = fmt.Errorf("failed to create the schedule: the task status is %s", job.ErrorStatus.String())
|
|
|
|
return 0, err
|
2020-07-09 04:53:32 +02:00
|
|
|
}
|
2019-07-19 13:44:11 +02:00
|
|
|
|
2020-07-09 04:53:32 +02:00
|
|
|
return id, nil
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
|
|
|
|
2020-07-28 07:31:51 +02:00
|
|
|
func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {
|
|
|
|
executions, err := s.execMgr.List(ctx, &q.Query{
|
|
|
|
Keywords: map[string]interface{}{
|
|
|
|
"VendorType": JobNameScheduler,
|
|
|
|
"VendorID": id,
|
|
|
|
},
|
|
|
|
})
|
2019-07-19 13:44:11 +02:00
|
|
|
if err != nil {
|
2020-07-09 04:53:32 +02:00
|
|
|
return err
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
2020-07-28 07:31:51 +02:00
|
|
|
if len(executions) > 0 {
|
|
|
|
executionID := executions[0].ID
|
2020-09-22 11:47:04 +02:00
|
|
|
// stop the execution
|
|
|
|
if err = s.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
|
2019-07-19 13:44:11 +02:00
|
|
|
return err
|
|
|
|
}
|
2020-07-28 07:31:51 +02:00
|
|
|
// delete execution
|
|
|
|
if err = s.execMgr.Delete(ctx, executionID); err != nil {
|
|
|
|
return err
|
2020-07-09 04:53:32 +02:00
|
|
|
}
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
2020-07-09 04:53:32 +02:00
|
|
|
|
2020-07-28 07:31:51 +02:00
|
|
|
// delete schedule record
|
|
|
|
return s.dao.Delete(ctx, id)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *scheduler) UnScheduleByVendor(ctx context.Context, vendorType string, vendorID int64) error {
|
|
|
|
q := &q.Query{
|
|
|
|
Keywords: map[string]interface{}{
|
|
|
|
"VendorType": vendorType,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
if vendorID > 0 {
|
|
|
|
q.Keywords["VendorID"] = vendorID
|
|
|
|
}
|
|
|
|
schedules, err := s.dao.List(ctx, q)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, schedule := range schedules {
|
|
|
|
if err = s.UnScheduleByID(ctx, schedule.ID); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
2020-07-09 04:53:32 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *scheduler) GetSchedule(ctx context.Context, id int64) (*Schedule, error) {
|
|
|
|
schedule, err := s.dao.Get(ctx, id)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-07-28 07:31:51 +02:00
|
|
|
return s.convertSchedule(ctx, schedule)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) {
|
|
|
|
schedules, err := s.dao.List(ctx, query)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var scheds []*Schedule
|
|
|
|
for _, schedule := range schedules {
|
|
|
|
sched, err := s.convertSchedule(ctx, schedule)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
scheds = append(scheds, sched)
|
|
|
|
}
|
|
|
|
return scheds, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *scheduler) convertSchedule(ctx context.Context, schedule *schedule) (*Schedule, error) {
|
2020-07-09 04:53:32 +02:00
|
|
|
schd := &Schedule{
|
|
|
|
ID: schedule.ID,
|
2020-07-28 07:31:51 +02:00
|
|
|
VendorType: schedule.VendorType,
|
|
|
|
VendorID: schedule.VendorID,
|
2020-09-24 10:48:56 +02:00
|
|
|
CRONType: schedule.CRONType,
|
2020-07-09 04:53:32 +02:00
|
|
|
CRON: schedule.CRON,
|
|
|
|
CreationTime: schedule.CreationTime,
|
|
|
|
UpdateTime: schedule.UpdateTime,
|
2019-07-19 13:44:11 +02:00
|
|
|
}
|
2020-12-11 10:45:41 +01:00
|
|
|
if len(schedule.ExtraAttrs) > 0 {
|
|
|
|
extras := map[string]interface{}{}
|
|
|
|
if err := json.Unmarshal([]byte(schedule.ExtraAttrs), &extras); err != nil {
|
|
|
|
log.Errorf("failed to unmarshal the extra attributes of schedule %d: %v", schedule.ID, err)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
schd.ExtraAttrs = extras
|
|
|
|
}
|
|
|
|
|
2020-07-28 07:31:51 +02:00
|
|
|
executions, err := s.execMgr.List(ctx, &q.Query{
|
|
|
|
Keywords: map[string]interface{}{
|
|
|
|
"VendorType": JobNameScheduler,
|
|
|
|
"VendorID": schedule.ID,
|
|
|
|
},
|
|
|
|
})
|
2020-07-09 04:53:32 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-07-28 07:31:51 +02:00
|
|
|
if len(executions) == 0 {
|
|
|
|
// if no execution found for the schedule, mark it's status as error
|
|
|
|
schd.Status = job.ErrorStatus.String()
|
|
|
|
} else {
|
|
|
|
schd.Status = executions[0].Status
|
|
|
|
}
|
2020-07-09 04:53:32 +02:00
|
|
|
return schd, nil
|
|
|
|
}
|