Mirror of https://github.com/goharbor/harbor.git (synced 2025-03-01 18:21:20 +01:00)

Commit 6303785b1b: Merge branch 'job_service' of https://github.com/vmware/harbor into scan-job-migrate
@@ -123,22 +123,28 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
        return
    }

    if jobActionReq.Action == opm.CtlCommandStop {
    switch jobActionReq.Action {
    case opm.CtlCommandStop:
        if err := dh.controller.StopJob(jobID); err != nil {
            dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
            return
        }
    } else if jobActionReq.Action == opm.CtlCommandCancel {
        if err := dh.controller.StopJob(jobID); err != nil {
            dh.handleError(w, http.StatusInternalServerError, errs.StopJobError(err))
    case opm.CtlCommandCancel:
        if err := dh.controller.CancelJob(jobID); err != nil {
            dh.handleError(w, http.StatusInternalServerError, errs.CancelJobError(err))
            return
        }
    } else {
    case opm.CtlCommandRetry:
        if err := dh.controller.RetryJob(jobID); err != nil {
            dh.handleError(w, http.StatusInternalServerError, errs.RetryJobError(err))
            return
        }
    default:
        dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID)))
        return
    }

    w.WriteHeader(http.StatusOK)
    w.WriteHeader(http.StatusNoContent) //only header, no content returned
}

//HandleCheckStatusReq is implementation of method defined in interface 'Handler'
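The handler above is driven by a small JSON body carrying the requested action. The sketch below is illustrative only: the endpoint path, port, and JSON field name are assumptions and are not defined in this hunk. What the hunk does establish is that a handled action now answers 204 No Content instead of 200 OK, and an unknown action answers 501 Not Implemented.

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Hypothetical endpoint and payload shape for posting a job action.
    body := bytes.NewBufferString(`{"action": "cancel"}`)
    resp, err := http.Post("http://localhost:8080/api/v1/jobs/example-job-id", "application/json", body)
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()

    // Expect 204 when stop/cancel/retry is accepted, 501 for an unknown action name.
    fmt.Println("status:", resp.StatusCode)
}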
@@ -28,6 +28,8 @@ const (
    StopJobErrorCode
    //CancelJobErrorCode is code for the error of cancelling job
    CancelJobErrorCode
    //RetryJobErrorCode is code for the error of retry job
    RetryJobErrorCode
    //UnknownActionNameErrorCode is code for the case of unknown action name
    UnknownActionNameErrorCode
)
@@ -97,6 +99,11 @@ func CancelJobError(err error) error {
    return New(CancelJobErrorCode, "Cancel job failed with error", err.Error())
}

//RetryJobError is error for the case of retrying job failed
func RetryJobError(err error) error {
    return New(RetryJobErrorCode, "Retry job failed with error", err.Error())
}

//UnknownActionNameError is error for the case of getting unknown job action
func UnknownActionNameError(err error) error {
    return New(UnknownActionNameErrorCode, "Unknown job action name", err.Error())
@@ -124,7 +131,7 @@ type jobCancelledError struct {

//JobCancelledError is error wrapper for the case of cancelling job.
func JobCancelledError() error {
    return jobStoppedError{
    return jobCancelledError{
        baseError{
            Code: JobStoppedErrorCode,
            Err:  "Job is cancelled",
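A hedged sketch of how these helpers are meant to be consumed, based only on the signatures visible in this diff: a job surfaces cancellation through errs.JobCancelledError(), and callers separate the special stop/cancel errors from ordinary failures with the IsJob...Error checks used by the pool wrapper later in this commit.

package sample

import (
    "errors"
    "fmt"

    "github.com/vmware/harbor/src/jobservice_v2/errs"
)

// runJob stands in for a job's Run method that notices a 'cancel' control
// command and reports it through the dedicated cancelled error.
func runJob(cancelRequested bool) error {
    if cancelRequested {
        return errs.JobCancelledError()
    }
    return errors.New("some ordinary failure")
}

// classify mirrors how the redis job wrapper treats the returned error.
func classify(err error) string {
    switch {
    case errs.IsJobCancelledError(err):
        return "cancelled (kept resumable)"
    case errs.IsJobStoppedError(err):
        return "stopped (no retry needed)"
    default:
        return fmt.Sprintf("failed: %v", err)
    }
}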
@@ -4,6 +4,7 @@ package impl

import (
    "context"
    "errors"
    "reflect"

    hlog "github.com/vmware/harbor/src/common/utils/log"
@@ -21,6 +22,9 @@ type Context struct {

    //op command func
    opCommandFunc job.CheckOPCmdFunc

    //checkin func
    checkInFunc job.CheckInFunc
}

//NewContext ...
@@ -51,6 +55,14 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
        }
    }

    if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
        if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
            if funcRef, ok := checkInFunc.(job.CheckInFunc); ok {
                jContext.checkInFunc = funcRef
            }
        }
    }

    return jContext, nil
}

@@ -66,6 +78,12 @@ func (c *Context) SystemContext() context.Context {

//Checkin is bridge func for reporting detailed status
func (c *Context) Checkin(status string) error {
    if c.checkInFunc != nil {
        c.checkInFunc(status)
    } else {
        return errors.New("nil check in function")
    }

    return nil
}

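A sketch of how a caller hands the check-in callback to Build; the job ID and name below are illustrative. One detail worth noting from the code above: the value stored under "checkInFunc" must carry the concrete named type job.CheckInFunc, because Build type-asserts to exactly that type, so a plain func(string) would be silently ignored.

package sample

import (
    "fmt"

    "github.com/vmware/harbor/src/jobservice_v2/env"
    "github.com/vmware/harbor/src/jobservice_v2/job"
)

// buildJobData prepares the JobData that Build above consumes.
func buildJobData(jobID string) env.JobData {
    // Declared as job.CheckInFunc so the type assertion in Build succeeds.
    var reportProgress job.CheckInFunc = func(message string) {
        fmt.Printf("job %s checked in: %s\n", jobID, message)
    }

    jData := env.JobData{
        ID:        jobID,
        Name:      "DEMO",
        ExtraData: make(map[string]interface{}),
    }
    jData.ExtraData["checkInFunc"] = reportProgress
    return jData
}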
@@ -8,6 +8,8 @@ import (
    "strings"
    "time"

    "github.com/vmware/harbor/src/jobservice_v2/opm"

    "github.com/vmware/harbor/src/jobservice_v2/errs"

    "github.com/vmware/harbor/src/jobservice_v2/env"
@@ -18,7 +20,12 @@ type ReplicationJob struct{}

//MaxFails is implementation of same method in Interface.
func (rj *ReplicationJob) MaxFails() uint {
    return 2
    return 3
}

//ShouldRetry ...
func (rj *ReplicationJob) ShouldRetry() bool {
    return true
}

//Validate is implementation of same method in Interface.
@@ -47,13 +54,33 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{})
    fmt.Printf("params: %#v\n", params)
    fmt.Printf("context: %#v\n", ctx)

    /*if 1 != 0 {
        return errors.New("I suicide")
    }*/

    fmt.Println("check in 30%")
    ctx.Checkin("30%")
    time.Sleep(5 * time.Second)
    fmt.Println("check in 60%")
    ctx.Checkin("60%")
    time.Sleep(5 * time.Second)
    fmt.Println("check in 100%")
    ctx.Checkin("100%")
    time.Sleep(1 * time.Second)

    //HOLD ON FOR A WHILE
    fmt.Println("Holding for 20 sec")
    <-time.After(20 * time.Second)
    fmt.Println("I'm back, check if I'm stopped")
    fmt.Println("I'm back, check if I'm stopped/cancelled")

    if cmd, ok := ctx.OPCommand(); ok {
        fmt.Printf("cmd=%s\n", cmd)
        if cmd == opm.CtlCommandCancel {
            fmt.Println("exit for receiving cancel signal")
            return errs.JobCancelledError()
        }

        fmt.Println("exit for receiving stop signal")
        return errs.JobStoppedError()
    }

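The demo job above only looks at the control command once, after its 20-second pause. A long-running job would more likely poll between units of work; the sketch below is not from the Harbor source and assumes the same ctx.OPCommand/ctx.Checkin methods and errs helpers shown in this commit.

package sample

import (
    "fmt"
    "time"

    "github.com/vmware/harbor/src/jobservice_v2/env"
    "github.com/vmware/harbor/src/jobservice_v2/errs"
    "github.com/vmware/harbor/src/jobservice_v2/opm"
)

// doWork reports progress and honours 'stop'/'cancel' between work units.
func doWork(ctx env.JobContext) error {
    for done := 1; done <= 10; done++ {
        // ... one unit of real work would go here ...
        ctx.Checkin(fmt.Sprintf("%d%%", done*10))

        // Poll for a control command between units instead of only once.
        if cmd, ok := ctx.OPCommand(); ok {
            if cmd == opm.CtlCommandCancel {
                return errs.JobCancelledError()
            }
            return errs.JobStoppedError()
        }
        time.Sleep(2 * time.Second)
    }
    return nil
}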
@@ -9,14 +9,24 @@ import "github.com/vmware/harbor/src/jobservice_v2/env"
//command code for job to determine if take corresponding action.
type CheckOPCmdFunc func() (string, bool)

//CheckInFunc is designed for job to report more detailed progress info
type CheckInFunc func(message string)

//Interface defines the related injection and run entry methods.
type Interface interface {
    //Declare how many times the job can be retried if failed.
    //
    //Return:
    // uint: the failure count allowed
    // uint: the failure count allowed. If it is set to 0, then default value 4 is used.
    MaxFails() uint

    //Tell the worker pool if retry the failed job when the fails is
    //still less than the number declared by the method 'MaxFails'.
    //
    //Returns:
    // true for retry and false for non-retry
    ShouldRetry() bool

    //Indicate whether the parameters of job are valid.
    //
    //Return:
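For reference, a minimal job satisfying this interface might look as follows. The Validate and Run signatures are inferred from the ReplicationJob and worker pool hunks elsewhere in this commit, so treat them as assumptions rather than the authoritative definition.

package sample

import (
    "errors"

    "github.com/vmware/harbor/src/jobservice_v2/env"
)

// DemoJob is a hypothetical job implementation.
type DemoJob struct{}

// MaxFails returns 0, so the default of 4 attempts applies.
func (d *DemoJob) MaxFails() uint { return 0 }

// ShouldRetry lets the worker pool re-enqueue the job on failure.
func (d *DemoJob) ShouldRetry() bool { return true }

// Validate rejects obviously bad parameters up front.
func (d *DemoJob) Validate(params map[string]interface{}) error {
    if _, ok := params["target"]; !ok {
        return errors.New("missing required parameter 'target'")
    }
    return nil
}

// Run does the work and reports coarse progress through Checkin.
func (d *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error {
    ctx.Checkin("50%")
    // ... real work ...
    ctx.Checkin("100%")
    return nil
}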
@@ -1,73 +0,0 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package job

import (
    "github.com/gocraft/work"
    "github.com/vmware/harbor/src/jobservice_v2/env"
    "github.com/vmware/harbor/src/jobservice_v2/errs"
)

//StatusChangeCallback is the func called when job status changed
type StatusChangeCallback func(jobID string, status string)

//CheckOPCmdFuncFactoryFunc is used to generate CheckOPCmdFunc func for the specified job
type CheckOPCmdFuncFactoryFunc func(jobID string) CheckOPCmdFunc

//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
    job              interface{}
    context          *env.Context
    callback         StatusChangeCallback
    opCmdFuncFactory CheckOPCmdFuncFactoryFunc
}

//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statusChangeCallback StatusChangeCallback, opCmdFuncFactory CheckOPCmdFuncFactoryFunc) *RedisJob {
    return &RedisJob{j, ctx, statusChangeCallback, opCmdFuncFactory}
}

//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
    //Build job execution context
    jData := env.JobData{
        ID:        j.ID,
        Name:      j.Name,
        Args:      j.Args,
        ExtraData: make(map[string]interface{}),
    }
    jData.ExtraData["opCommandFunc"] = rj.opCmdFuncFactory(j.ID)
    execContext, err := rj.context.JobContext.Build(jData)
    if err != nil {
        return err
    }

    //Inject data
    runningJob := Wrap(rj.job)
    //Start to run
    rj.callback(j.ID, JobStatusRunning)

    //TODO: Check function should be defined
    err = runningJob.Run(execContext, j.Args)

    if err == nil {
        rj.callback(j.ID, JobStatusSuccess)
        return nil
    }

    if errs.IsJobStoppedError(err) {
        rj.callback(j.ID, JobStatusStopped)
        return nil // no need to put it into the dead queue for resume
    }

    if errs.IsJobCancelledError(err) {
        rj.callback(j.ID, JobStatusCancelled)
        return err //need to resume
    }

    rj.callback(j.ID, JobStatusError)
    return err

    //TODO:
    //Need to consider how to rm the retry option
}
@@ -1,18 +0,0 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package job

import "reflect"

//Wrap returns a new (job.)Interface based on the wrapped job handler reference.
func Wrap(j interface{}) Interface {
    theType := reflect.TypeOf(j)

    if theType.Kind() == reflect.Ptr {
        theType = theType.Elem()
    }

    //Crate new
    v := reflect.New(theType).Elem()
    return v.Addr().Interface().(Interface)
}
@@ -44,6 +44,7 @@ type JobStatData struct {
    RunAt     int64  `json:"run_at,omitempty"`
    CheckIn   string `json:"check_in,omitempty"`
    CheckInAt int64  `json:"check_in_at,omitempty"`
    DieAt     int64  `json:"die_at,omitempty"`
}

//JobPoolStats represents the health and status of the job service.
@@ -54,7 +54,7 @@ type JobStatsManager interface {
    // error if meet any problems
    Retry(jobID string) error

    //CtlCommand check if control command is fired for the specified job.
    //CtlCommand checks if control command is fired for the specified job.
    //
    //jobID string : ID of the job
    //
@@ -62,4 +62,18 @@ type JobStatsManager interface {
    // the command if it was fired
    // error if it was not fired yet to meet some other problems
    CtlCommand(jobID string) (string, error)

    //CheckIn message for the specified job like detailed progress info.
    //
    //jobID string : ID of the job
    //message string : The message being checked in
    //
    CheckIn(jobID string, message string)

    //DieAt marks the failed jobs with the time they put into dead queue.
    //
    //jobID string : ID of the job
    //dieAt int64  : the die-at timestamp (epoch seconds) of the failed job
    //
    DieAt(jobID string, dieAt int64)
}
@@ -5,6 +5,7 @@ package opm
import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "strconv"
    "time"
@@ -26,12 +27,16 @@ const (
    processBufferSize = 1024
    opSaveStats       = "save_job_stats"
    opUpdateStatus    = "update_job_status"
    opCheckIn         = "check_in"
    opDieAt           = "mark_die_at"
    maxFails          = 3

    //CtlCommandStop : command stop
    CtlCommandStop = "stop"
    //CtlCommandCancel : command cancel
    CtlCommandCancel = "cancel"
    //CtlCommandRetry : command retry
    CtlCommandRetry = "retry"

    //Copy from period.enqueuer
    periodicEnqueuerHorizon = 4 * time.Minute
@@ -90,6 +95,7 @@ func (rjs *RedisJobStatsManager) Shutdown() {
}

//Save is implementation of same method in JobStatsManager interface.
//Async method
func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
    item := &queueItem{
        op: opSaveStats,
@@ -100,6 +106,7 @@ func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
}

//Retrieve is implementation of same method in JobStatsManager interface.
//Sync method
func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error) {
    if utils.IsEmptyStr(jobID) {
        return models.JobStats{}, errors.New("empty job ID")
@@ -109,6 +116,7 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error)
}

//SetJobStatus is implementation of same method in JobStatsManager interface.
//Async method
func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
    if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(status) {
        return
@@ -135,25 +143,27 @@ func (rjs *RedisJobStatsManager) loop() {
    for {
        select {
        case item := <-rjs.processChan:
            if err := rjs.process(item); err != nil {
                item.fails++
                if item.fails < maxFails {
                    //Retry after a random interval
                    go func() {
                        timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
                        defer timer.Stop()
            go func(item *queueItem) {
                if err := rjs.process(item); err != nil {
                    item.fails++
                    if item.fails < maxFails {
                        //Retry after a random interval
                        go func() {
                            timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
                            defer timer.Stop()

                        select {
                        case <-timer.C:
                            rjs.processChan <- item
                            return
                        case <-controlChan:
                        }
                    }()
                } else {
                    log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
                            select {
                            case <-timer.C:
                                rjs.processChan <- item
                                return
                            case <-controlChan:
                            }
                        }()
                    } else {
                        log.Warningf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails)
                    }
                }
            }
            }(item)
            break
        case <-rjs.stopChan:
            rjs.doneChan <- struct{}{}
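Because the old and new bodies are interleaved above, the net effect of the change is easier to see in a consolidated sketch: each queue item is now handled in its own goroutine, so a slow or failing Redis operation no longer blocks the main select loop, and failed items are re-queued after a short random pause up to maxFails attempts. The channel and type names below are local stand-ins for the manager's fields.

package sample

import (
    "math/rand"
    "time"
)

type queueItem struct {
    op    string
    fails uint
    data  interface{}
}

// loopSketch is a consolidated reading of the refactored loop() above.
func loopSketch(processChan chan *queueItem, stopChan, doneChan, controlChan chan struct{}, process func(*queueItem) error) {
    const maxFails = 3
    for {
        select {
        case item := <-processChan:
            go func(item *queueItem) {
                if err := process(item); err != nil {
                    item.fails++
                    if item.fails < maxFails {
                        // Retry after a random interval.
                        go func() {
                            timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
                            defer timer.Stop()
                            select {
                            case <-timer.C:
                                processChan <- item // try again later
                            case <-controlChan:
                            }
                        }()
                    }
                }
            }(item)
        case <-stopChan:
            doneChan <- struct{}{}
            return
        }
    }
}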
@@ -165,7 +175,6 @@ func (rjs *RedisJobStatsManager) loop() {
}

//Stop the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Stop(jobID string) error {
    if utils.IsEmptyStr(jobID) {
        return errors.New("empty job ID")
@@ -188,12 +197,17 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error {
            }
        }
    case job.JobKindPeriodic:
        //firstly we need try to delete the job instances scheduled for this periodic job, a try best action
        rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
        //secondly delete the periodic job policy
        //firstly delete the periodic job policy
        if err := rjs.scheduler.UnSchedule(jobID); err != nil {
            return err
        }
        //secondly we need try to delete the job instances scheduled for this periodic job, a try best action
        rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
        //thirdly expire the job stats of this periodic job if exists
        if err := rjs.expirePeriodicJobStats(theJob.Stats.JobID); err != nil {
            //only logged
            log.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
        }
    default:
        break
    }
@@ -212,13 +226,64 @@ func (rjs *RedisJobStatsManager) Stop(jobID string) error {
//Cancel the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Cancel(jobID string) error {
    if utils.IsEmptyStr(jobID) {
        return errors.New("empty job ID")
    }

    theJob, err := rjs.getJobStats(jobID)
    if err != nil {
        return err
    }

    switch theJob.Stats.JobKind {
    case job.JobKindGeneric:
        if theJob.Stats.Status != job.JobStatusRunning {
            return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
        }

        //Send 'cancel' ctl command to the running instance
        if err := rjs.writeCtlCommand(jobID, CtlCommandCancel); err != nil {
            return err
        }
        break
    default:
        return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
    }

    return nil
}

//Retry the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Retry(jobID string) error {
    return nil
    if utils.IsEmptyStr(jobID) {
        return errors.New("empty job ID")
    }

    theJob, err := rjs.getJobStats(jobID)
    if err != nil {
        return err
    }

    if theJob.Stats.DieAt == 0 {
        return fmt.Errorf("job '%s' is not a retryable job", jobID)
    }

    return rjs.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
}

//CheckIn message
func (rjs *RedisJobStatsManager) CheckIn(jobID string, message string) {
    if utils.IsEmptyStr(jobID) || utils.IsEmptyStr(message) {
        return
    }

    item := &queueItem{
        op:   opCheckIn,
        data: []string{jobID, message},
    }

    rjs.processChan <- item
}

//CtlCommand checks if control command is fired for the specified job.
@@ -230,6 +295,33 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) {
    return rjs.getCrlCommand(jobID)
}

//DieAt marks the failed jobs with the time they put into dead queue.
func (rjs *RedisJobStatsManager) DieAt(jobID string, dieAt int64) {
    if utils.IsEmptyStr(jobID) || dieAt == 0 {
        return
    }

    item := &queueItem{
        op:   opDieAt,
        data: []interface{}{jobID, dieAt},
    }

    rjs.processChan <- item
}

func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error {
    conn := rjs.redisPool.Get()
    defer conn.Close()

    //The periodic job (policy) is stopped/unscheduled and then
    //the stats of periodic job now can be expired
    key := utils.KeyJobStats(rjs.namespace, jobID)
    expireTime := 24 * 60 * 60 //1 day
    _, err := conn.Do("EXPIRE", key, expireTime)

    return err
}

func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
    schedule, err := cron.Parse(cronSpec)
    if err != nil {
@@ -245,7 +337,6 @@ func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID st
    //return the last error if occurred
    for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
        epoch := t.Unix()
        log.Infof("epoch=%d\n", epoch)
        if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
            //only logged
            log.Errorf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
@@ -301,11 +392,55 @@ func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) er
    key := utils.KeyJobStats(rjs.namespace, jobID)
    args := make([]interface{}, 0, 5)
    args = append(args, key, "status", status, "update_time", time.Now().Unix())
    if status == job.JobStatusSuccess {
        //make sure the 'die_at' is reset in case it's a retrying job
        args = append(args, "die_at", 0)
    }
    _, err := conn.Do("HMSET", args...)

    return err
}

func (rjs *RedisJobStatsManager) checkIn(jobID string, message string) error {
    conn := rjs.redisPool.Get()
    defer conn.Close()

    now := time.Now().Unix()
    key := utils.KeyJobStats(rjs.namespace, jobID)
    args := make([]interface{}, 0, 7)
    args = append(args, key, "check_in", message, "check_in_at", now, "update_time", now)
    _, err := conn.Do("HMSET", args...)

    return err
}

func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error {
    conn := rjs.redisPool.Get()
    defer conn.Close()

    //Query the dead job in the time scope of [baseTime,baseTime+5]
    key := utils.RedisKeyDead(rjs.namespace)
    jobWithScores, err := utils.GetZsetByScore(rjs.redisPool, key, []int64{baseTime, baseTime + 5})
    if err != nil {
        return err
    }

    for _, jws := range jobWithScores {
        if j, err := utils.DeSerializeJob(jws.JobBytes); err == nil {
            if j.ID == jobID {
                //Found
                statsKey := utils.KeyJobStats(rjs.namespace, jobID)
                args := make([]interface{}, 0, 7)
                args = append(args, statsKey, "die_at", jws.Score, "update_time", time.Now().Unix())
                _, err := conn.Do("HMSET", args...)
                return err
            }
        }
    }

    return fmt.Errorf("seems %s is not a dead job", jobID)
}

func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, error) {
    conn := rjs.redisPool.Get()
    defer conn.Close()
@@ -365,6 +500,9 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
        case "cron_spec":
            res.Stats.CronSpec = value
            break
        case "die_at":
            v, _ := strconv.ParseInt(value, 10, 64)
            res.Stats.DieAt = v
        default:
        }
    }
@@ -397,6 +535,9 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
            "check_in_at", jobStats.Stats.CheckInAt,
        )
    }
    if jobStats.Stats.DieAt > 0 {
        args = append(args, "die_at", jobStats.Stats.DieAt)
    }

    conn.Send("HMSET", args...)
    //If job kind is periodic job, expire time should not be set
@@ -425,6 +566,12 @@ func (rjs *RedisJobStatsManager) process(item *queueItem) error {
    case opUpdateStatus:
        data := item.data.([]string)
        return rjs.updateJobStatus(data[0], data[1])
    case opCheckIn:
        data := item.data.([]string)
        return rjs.checkIn(data[0], data[1])
    case opDieAt:
        data := item.data.([]interface{})
        return rjs.dieAt(data[0].(string), data[1].(int64))
    default:
        break
    }
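The helpers above persist job stats as one Redis hash per job (fields such as status, check_in, check_in_at, die_at and update_time) written with HMSET. Below is a small redigo sketch for inspecting such a hash; the key is simply passed in, on the assumption that it is whatever utils.KeyJobStats(namespace, jobID) yields.

package sample

import (
    "fmt"

    "github.com/garyburd/redigo/redis"
)

// dumpJobStats prints the fields of one job-stats hash.
func dumpJobStats(pool *redis.Pool, statsKey string) error {
    conn := pool.Get()
    defer conn.Close()

    fields, err := redis.StringMap(conn.Do("HGETALL", statsKey))
    if err != nil {
        return err
    }
    for name, value := range fields {
        fmt.Printf("%s = %s\n", name, value)
    }
    return nil
}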
@@ -3,7 +3,6 @@
package period

import (
    "errors"
    "fmt"
    "time"

@@ -55,7 +54,7 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error {
    }

    nowEpoch := time.Now().Unix()
    jobScores, err := GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
    jobScores, err := utils.GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
    if err != nil {
        return err
    }
@@ -94,32 +93,3 @@ func (s *Sweeper) ClearOutdatedScheduledJobs() error {
    }
    return fmt.Errorf("%s", errorSummary)
}

//JobScore represents the data item with score in the redis db.
type JobScore struct {
    JobBytes []byte
    Score    int64
}

//GetZsetByScore get the items from the zset filtered by the specified score scope.
func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) {
    if pool == nil || utils.IsEmptyStr(key) || len(scores) < 2 {
        return nil, errors.New("bad arguments")
    }

    conn := pool.Get()
    defer conn.Close()

    values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES"))
    if err != nil {
        return nil, err
    }

    var jobsWithScores []JobScore

    if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
        return nil, err
    }

    return jobsWithScores, nil
}
src/jobservice_v2/pool/redis_job_wrapper.go (new file, 151 lines)
@@ -0,0 +1,151 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package pool

import (
    "time"

    "github.com/gocraft/work"
    "github.com/vmware/harbor/src/jobservice_v2/env"
    "github.com/vmware/harbor/src/jobservice_v2/errs"
    "github.com/vmware/harbor/src/jobservice_v2/job"
    "github.com/vmware/harbor/src/jobservice_v2/opm"
)

//RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
type RedisJob struct {
    job          interface{}         // the real job implementation
    context      *env.Context        //context
    statsManager opm.JobStatsManager //job stats manager
}

//NewRedisJob is constructor of RedisJob
func NewRedisJob(j interface{}, ctx *env.Context, statsManager opm.JobStatsManager) *RedisJob {
    return &RedisJob{
        job:          j,
        context:      ctx,
        statsManager: statsManager,
    }
}

//Run the job
func (rj *RedisJob) Run(j *work.Job) error {
    var cancelled = false

    execContext, err := rj.buildContext(j)
    if err != nil {
        return err
    }

    runningJob := Wrap(rj.job)
    defer func() {
        if rj.shouldDisableRetry(runningJob, j, cancelled) {
            j.Fails = 10000000000 //Make it big enough to avoid retrying
            now := time.Now().Unix()
            go func() {
                timer := time.NewTimer(2 * time.Second) //make sure the failed job is already put into the dead queue
                defer timer.Stop()

                <-timer.C

                rj.statsManager.DieAt(j.ID, now)
            }()
        }
    }()

    //Start to run
    rj.jobRunning(j.ID)
    //Inject data
    err = runningJob.Run(execContext, j.Args)

    //update the proper status
    if err == nil {
        rj.jobSucceed(j.ID)
        return nil
    }

    if errs.IsJobStoppedError(err) {
        rj.jobStopped(j.ID)
        return nil // no need to put it into the dead queue for resume
    }

    if errs.IsJobCancelledError(err) {
        rj.jobCancelled(j.ID)
        cancelled = true
        return err //need to resume
    }

    rj.jobFailed(j.ID)
    return err
}

func (rj *RedisJob) jobRunning(jobID string) {
    rj.statsManager.SetJobStatus(jobID, job.JobStatusRunning)
}

func (rj *RedisJob) jobFailed(jobID string) {
    rj.statsManager.SetJobStatus(jobID, job.JobStatusError)
}

func (rj *RedisJob) jobStopped(jobID string) {
    rj.statsManager.SetJobStatus(jobID, job.JobStatusStopped)
}

func (rj *RedisJob) jobCancelled(jobID string) {
    rj.statsManager.SetJobStatus(jobID, job.JobStatusCancelled)
}

func (rj *RedisJob) jobSucceed(jobID string) {
    rj.statsManager.SetJobStatus(jobID, job.JobStatusSuccess)
}

func (rj *RedisJob) buildContext(j *work.Job) (env.JobContext, error) {
    //Build job execution context
    jData := env.JobData{
        ID:        j.ID,
        Name:      j.Name,
        Args:      j.Args,
        ExtraData: make(map[string]interface{}),
    }

    checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc {
        return func() (string, bool) {
            cmd, err := rj.statsManager.CtlCommand(jobID)
            if err != nil {
                return "", false
            }
            return cmd, true
        }
    }

    jData.ExtraData["opCommandFunc"] = checkOPCmdFuncFactory(j.ID)

    checkInFuncFactory := func(jobID string) job.CheckInFunc {
        return func(message string) {
            rj.statsManager.CheckIn(jobID, message)
        }
    }

    jData.ExtraData["checkInFunc"] = checkInFuncFactory(j.ID)

    return rj.context.JobContext.Build(jData)
}

func (rj *RedisJob) shouldDisableRetry(j job.Interface, wj *work.Job, cancelled bool) bool {
    maxFails := j.MaxFails()
    if maxFails == 0 {
        maxFails = 4 //Consistent with backend worker pool
    }
    fails := wj.Fails
    fails++ //as the fail is not returned to backend pool yet

    if cancelled && fails < int64(maxFails) {
        return true
    }

    if !cancelled && fails < int64(maxFails) && !j.ShouldRetry() {
        return true
    }

    return false
}
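The retry-suppression rule in shouldDisableRetry is the subtle part of this wrapper, so here it is restated as a standalone sketch with the inputs spelled out: a cancelled run is parked in the dead queue (still resumable through a later retry) rather than re-run automatically, and a job that opted out through ShouldRetry() is treated the same way while attempts remain.

package sample

// shouldDisableRetrySketch mirrors the decision implemented above.
func shouldDisableRetrySketch(maxFails uint, fails int64, cancelled, shouldRetry bool) bool {
    if maxFails == 0 {
        maxFails = 4 // default used by the backend worker pool
    }
    fails++ // the current failure has not been reported back to the pool yet

    // Cancelled runs keep their remaining attempts but are not re-run automatically.
    if cancelled && fails < int64(maxFails) {
        return true
    }
    // Jobs that opted out of retrying are parked even though attempts remain.
    if !cancelled && fails < int64(maxFails) && !shouldRetry {
        return true
    }
    return false
}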
@@ -172,24 +172,10 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
        return errors.New("job must implement the job.Interface")
    }

    //Use redis job wrapper pointer to keep the data required by the job.Interface.
    statusChangeCallback := func(jobID string, status string) {
        gcwp.statsManager.SetJobStatus(jobID, status)
    }
    //Define the concrete factory method for creating 'job.CheckOPCmdFunc'.
    checkOPCmdFuncFactory := func(jobID string) job.CheckOPCmdFunc {
        return func() (string, bool) {
            cmd, err := gcwp.statsManager.CtlCommand(jobID)
            if err != nil {
                return "", false
            }
            return cmd, true
        }
    }
    redisJob := job.NewRedisJob(j, gcwp.context, statusChangeCallback, checkOPCmdFuncFactory)
    redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager)

    //Get more info from j
    theJ := job.Wrap(j)
    theJ := Wrap(j)

    gcwp.pool.JobWithOptions(name,
        work.JobOptions{MaxFails: theJ.MaxFails()},
@@ -350,7 +336,7 @@ func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {

//RetryJob retry the job
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error {
    return nil
    return gcwp.statsManager.Retry(jobID)
}

//IsKnownJob ...
@@ -365,13 +351,13 @@ func (gcwp *GoCraftWorkPool) ValidateJobParameters(jobType interface{}, params m
        return errors.New("nil job type")
    }

    theJ := job.Wrap(jobType)
    theJ := Wrap(jobType)
    return theJ.Validate(params)
}

//log the job
func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
    log.Infof("Job incoming: %s:%s", job.ID, job.Name)
    log.Infof("Job incoming: %s:%s", job.Name, job.ID)
    return next()
}

src/jobservice_v2/pool/runner.go (new file, 22 lines)
@@ -0,0 +1,22 @@
// Copyright 2018 The Harbor Authors. All rights reserved.

package pool

import (
    "reflect"

    "github.com/vmware/harbor/src/jobservice_v2/job"
)

//Wrap returns a new job.Interface based on the wrapped job handler reference.
func Wrap(j interface{}) job.Interface {
    theType := reflect.TypeOf(j)

    if theType.Kind() == reflect.Ptr {
        theType = theType.Elem()
    }

    //Create new
    v := reflect.New(theType).Elem()
    return v.Addr().Interface().(job.Interface)
}
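Wrap relies on a reflection trick: a typed nil pointer is enough to describe the job, because only the type information is used to allocate a fresh instance per run. A self-contained illustration of the same trick on a local type:

package main

import (
    "fmt"
    "reflect"
)

type greeter struct{ greeting string }

func (g *greeter) Hello() string { return "hello" }

// newOf mirrors what Wrap does: take a (possibly nil) pointer value, recover
// its element type, and allocate a fresh zero value of it.
func newOf(j interface{}) interface{} {
    theType := reflect.TypeOf(j)
    if theType.Kind() == reflect.Ptr {
        theType = theType.Elem()
    }
    v := reflect.New(theType).Elem()
    return v.Addr().Interface()
}

func main() {
    // A typed nil pointer is enough to describe the type.
    fresh := newOf((*greeter)(nil)).(*greeter)
    fmt.Println(fresh.Hello(), fresh.greeting == "") // prints: hello true
}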
@@ -43,6 +43,11 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string {
    return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
}

//RedisKeyDead returns key of the dead jobs.
func RedisKeyDead(namespace string) string {
    return RedisNamespacePrefix(namespace) + "dead"
}

var nowMock int64

//NowEpochSeconds ...
@@ -4,8 +4,11 @@
package utils

import (
    "errors"
    "os"
    "strings"

    "github.com/garyburd/redigo/redis"
)

//IsEmptyStr checks if the specified str is empty (len == 0) after trimming prefix and suffix spaces.
@@ -33,3 +36,32 @@ func FileExists(file string) bool {
func IsValidPort(port uint) bool {
    return port != 0 && port < 65536
}

//JobScore represents the data item with score in the redis db.
type JobScore struct {
    JobBytes []byte
    Score    int64
}

//GetZsetByScore gets the items from the zset filtered by the specified score scope.
func GetZsetByScore(pool *redis.Pool, key string, scores []int64) ([]JobScore, error) {
    if pool == nil || IsEmptyStr(key) || len(scores) < 2 {
        return nil, errors.New("bad arguments")
    }

    conn := pool.Get()
    defer conn.Close()

    values, err := redis.Values(conn.Do("ZRANGEBYSCORE", key, scores[0], scores[1], "WITHSCORES"))
    if err != nil {
        return nil, err
    }

    var jobsWithScores []JobScore

    if err := redis.ScanSlice(values, &jobsWithScores); err != nil {
        return nil, err
    }

    return jobsWithScores, nil
}
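Typical use of the relocated helper together with the new RedisKeyDead key, for example to list recently died jobs. The pool settings and namespace string below are illustrative only.

package main

import (
    "fmt"
    "time"

    "github.com/garyburd/redigo/redis"
    "github.com/vmware/harbor/src/jobservice_v2/utils"
)

func main() {
    pool := &redis.Pool{
        MaxIdle: 2,
        Dial: func() (redis.Conn, error) {
            return redis.Dial("tcp", "localhost:6379")
        },
    }
    defer pool.Close()

    // Query the dead-job zset for entries that died in the last hour.
    now := time.Now().Unix()
    key := utils.RedisKeyDead("harbor_job_service")
    items, err := utils.GetZsetByScore(pool, key, []int64{now - 3600, now})
    if err != nil {
        fmt.Println("query failed:", err)
        return
    }
    for _, it := range items {
        fmt.Printf("died at %d, payload %d bytes\n", it.Score, len(it.JobBytes))
    }
}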