mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-02 04:51:22 +01:00
Fix bug found in scheduler
The scheduler hook handler doesn't parse the job status struct when handling the hook. This commit fixes it. Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
c279b7f3e9
commit
5c286d799f
@ -15,7 +15,9 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/goharbor/harbor/src/pkg/retention"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
@ -121,12 +123,16 @@ func Init() error {
|
||||
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repositoryMgr, retentionScheduler, retentionLauncher)
|
||||
|
||||
callbackFun := func(p interface{}) error {
|
||||
r, ok := p.(retention.TriggerParam)
|
||||
if ok {
|
||||
_, err := retentionController.TriggerRetentionExec(r.PolicyID, r.Trigger, false)
|
||||
return err
|
||||
str, ok := p.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("the type of param %v isn't string", p)
|
||||
}
|
||||
return errors.New("bad retention callback param")
|
||||
param := &retention.TriggerParam{}
|
||||
if err := json.Unmarshal([]byte(str), param); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal the param: %v", err)
|
||||
}
|
||||
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
|
||||
return err
|
||||
}
|
||||
err := scheduler.Register(retention.SchedulerCallback, callbackFun)
|
||||
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/job/models"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/api"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||
"github.com/goharbor/harbor/src/pkg/scheduler/hook"
|
||||
)
|
||||
|
||||
@ -31,7 +32,13 @@ type Handler struct {
|
||||
|
||||
// Handle ...
|
||||
func (h *Handler) Handle() {
|
||||
log.Debugf("received scheduler hook event for schedule %s", h.GetStringFromPath(":id"))
|
||||
|
||||
var data models.JobStatusChange
|
||||
if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data); err != nil {
|
||||
log.Errorf("failed to decode hook event: %v", err)
|
||||
return
|
||||
}
|
||||
// status update
|
||||
if len(data.CheckIn) == 0 {
|
||||
schedulerID, err := h.GetInt64FromPath(":id")
|
||||
@ -43,6 +50,7 @@ func (h *Handler) Handle() {
|
||||
h.SendInternalServerError(fmt.Errorf("failed to update status of job %s: %v", data.JobID, err))
|
||||
return
|
||||
}
|
||||
log.Debugf("handle status update hook event for schedule %s completed", h.GetStringFromPath(":id"))
|
||||
return
|
||||
}
|
||||
|
||||
@ -53,7 +61,7 @@ func (h *Handler) Handle() {
|
||||
log.Errorf("failed to unmarshal parameters from check in message: %v", err)
|
||||
return
|
||||
}
|
||||
callbackFuncNameParam, exist := params["callback_func_name"]
|
||||
callbackFuncNameParam, exist := params[scheduler.JobParamCallbackFunc]
|
||||
if !exist {
|
||||
log.Error("cannot get the parameter \"callback_func_name\" from the check in message")
|
||||
return
|
||||
@ -63,8 +71,9 @@ func (h *Handler) Handle() {
|
||||
log.Errorf("invalid \"callback_func_name\": %v", callbackFuncName)
|
||||
return
|
||||
}
|
||||
if err := hook.GlobalController.Run(callbackFuncName, params["params"]); err != nil {
|
||||
if err := hook.GlobalController.Run(callbackFuncName, params[scheduler.JobParamCallbackFuncParams]); err != nil {
|
||||
log.Errorf("failed to run the callback function %s: %v", callbackFuncName, err)
|
||||
return
|
||||
}
|
||||
log.Debugf("callback function %s called for schedule %s", callbackFuncName, h.GetStringFromPath(":id"))
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
@ -29,9 +30,10 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// const definitions
|
||||
const (
|
||||
jobParamCallbackFunc = "callback_func"
|
||||
jobParamCallbackFuncParams = "params"
|
||||
JobParamCallbackFunc = "callback_func"
|
||||
JobParamCallbackFuncParams = "params"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -46,6 +48,8 @@ type CallbackFunc func(interface{}) error
|
||||
|
||||
// Scheduler provides the capability to run a periodic task, a callback function
|
||||
// needs to be registered before using the scheduler
|
||||
// The "params" is passed to the callback function specified by "callbackFuncName"
|
||||
// as encoded json string, so the callback function must decode it before using
|
||||
type Scheduler interface {
|
||||
Schedule(cron string, callbackFuncName string, params interface{}) (int64, error)
|
||||
UnSchedule(id int64) error
|
||||
@ -119,6 +123,15 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// if got error in the following steps, delete the schedule record in database
|
||||
defer func() {
|
||||
if err != nil {
|
||||
e := s.manager.Delete(scheduleID)
|
||||
if e != nil {
|
||||
log.Errorf("failed to delete the schedule %d: %v", scheduleID, e)
|
||||
}
|
||||
}
|
||||
}()
|
||||
log.Debugf("the schedule record %d created", scheduleID)
|
||||
|
||||
// submit scheduler job to Jobservice
|
||||
@ -126,8 +139,7 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
|
||||
jd := &models.JobData{
|
||||
Name: JobNameScheduler,
|
||||
Parameters: map[string]interface{}{
|
||||
jobParamCallbackFunc: callbackFuncName,
|
||||
jobParamCallbackFuncParams: params,
|
||||
JobParamCallbackFunc: callbackFuncName,
|
||||
},
|
||||
Metadata: &models.JobMetadata{
|
||||
JobKind: job.JobKindPeriodic,
|
||||
@ -135,15 +147,26 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
|
||||
},
|
||||
StatusHook: statusHookURL,
|
||||
}
|
||||
if params != nil {
|
||||
var paramsData []byte
|
||||
paramsData, err = json.Marshal(params)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
jd.Parameters[JobParamCallbackFuncParams] = string(paramsData)
|
||||
}
|
||||
jobID, err := s.jobserviceClient.SubmitJob(jd)
|
||||
if err != nil {
|
||||
// if failed to submit to Jobservice, delete the schedule record in database
|
||||
e := s.manager.Delete(scheduleID)
|
||||
if e != nil {
|
||||
log.Errorf("failed to delete the schedule %d: %v", scheduleID, e)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
// if got error in the following steps, stop the scheduler job
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil {
|
||||
log.Errorf("failed to stop the scheduler job %s: %v", jobID, e)
|
||||
}
|
||||
}
|
||||
}()
|
||||
log.Debugf("the scheduler job submitted to Jobservice, job ID: %s", jobID)
|
||||
|
||||
// populate the job ID for the schedule
|
||||
@ -152,14 +175,6 @@ func (s *scheduler) Schedule(cron string, callbackFuncName string, params interf
|
||||
JobID: jobID,
|
||||
}, "JobID")
|
||||
if err != nil {
|
||||
// stop the scheduler job
|
||||
if e := s.jobserviceClient.PostAction(jobID, job.JobActionStop); e != nil {
|
||||
log.Errorf("failed to stop the scheduler job %s: %v", jobID, e)
|
||||
}
|
||||
// delete the schedule record
|
||||
if e := s.manager.Delete(scheduleID); e != nil {
|
||||
log.Errorf("failed to delete the schedule record %d: %v", scheduleID, e)
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -172,7 +187,8 @@ func (s *scheduler) UnSchedule(id int64) error {
|
||||
return err
|
||||
}
|
||||
if schedule == nil {
|
||||
return fmt.Errorf("the schedule record %d not found", id)
|
||||
log.Warningf("the schedule record %d not found", id)
|
||||
return nil
|
||||
}
|
||||
if err = s.jobserviceClient.PostAction(schedule.JobID, job.JobActionStop); err != nil {
|
||||
herr, ok := err.(*chttp.Error)
|
||||
|
Loading…
Reference in New Issue
Block a user