statemachine and scheduler to support retry

Tan Jiang 2016-07-05 11:11:14 +08:00
parent 9052206b99
commit 6b955cd154
5 changed files with 96 additions and 60 deletions

.gitignore
View File

@@ -6,9 +6,4 @@ Deploy/config/db/env
Deploy/config/jobservice/env
ui/ui
*.pyc
jobservice/*.sql
jobservice/*.sh
jobservice/*.json
jobservice/jobservice
jobservice/test

View File

@@ -15,9 +15,22 @@
package job
import (
"github.com/vmware/harbor/utils/log"
"time"
)
var jobQueue = make(chan int64)
// Schedule puts a job ID into the job queue.
func Schedule(jobID int64) {
jobQueue <- jobID
}
// Reschedule is called by the state machine to retry a job
func Reschedule(jobID int64) {
log.Debugf("Job %d will be rescheduled in 5 minutes", jobID)
time.Sleep(5 * time.Minute)
log.Debugf("Rescheduling job %d", jobID)
Schedule(jobID)
}
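
Note on the consumer side: Schedule and Reschedule only put IDs on jobQueue; a dispatcher outside this diff has to drain the channel for a retried job to actually run again. A minimal standalone sketch of that pattern, with hypothetical names (dispatch, runJob) that are not Harbor code:

package main

import (
	"log"
	"time"
)

// jobQueue mirrors the unbuffered channel introduced above.
var jobQueue = make(chan int64)

// Schedule puts a job ID into the queue; it blocks until a worker receives it.
func Schedule(jobID int64) {
	jobQueue <- jobID
}

// dispatch drains the queue; runJob stands in for the real job runner.
func dispatch(runJob func(int64)) {
	for id := range jobQueue {
		go runJob(id)
	}
}

func main() {
	go dispatch(func(id int64) { log.Printf("running job %d", id) })
	Schedule(42)
	time.Sleep(time.Second) // give the worker a moment before main exits
}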

View File

@@ -33,25 +33,10 @@ type StateHandler interface {
Exit() error
}
// DummyHandler is the default implementation of the StateHandler interface, with empty Enter and Exit methods.
type DummyHandler struct {
JobID int64
}
// Enter ...
func (dh DummyHandler) Enter() (string, error) {
return "", nil
}
// Exit ...
func (dh DummyHandler) Exit() error {
return nil
}
// StatusUpdater implements the StateHandler interface; it updates the status of a job in the DB when the job enters
// a state.
type StatusUpdater struct {
DummyHandler
JobID int64
State string
}
@@ -69,9 +54,34 @@ func (su StatusUpdater) Enter() (string, error) {
return next, err
}
// Exit ...
func (su StatusUpdater) Exit() error {
return nil
}
// Retry handles the special "retrying" state, in which case it updates the job status in the DB and reschedules the job
// via the scheduler
type Retry struct {
JobID int64
}
// Enter ...
func (jr Retry) Enter() (string, error) {
err := dao.UpdateRepJobStatus(jr.JobID, models.JobRetrying)
if err != nil {
log.Errorf("Failed to update state of job :%d to Retrying, error: %v", jr.JobID, err)
}
go Reschedule(jr.JobID)
return "", err
}
// Exit ...
func (jr Retry) Exit() error {
return nil
}
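
The ImgPuller/ImgPusher stubs below always return models.JobRetrying to exercise this path. In a real handler the decision would normally depend on the failure; a hypothetical sketch (transferStep, run and transient are assumed names, not Harbor code) of a step that routes only transient failures into the retry path:

// transferStep is a hypothetical StateHandler: transient failures go to the
// Retry handler above, permanent ones fall through to normal error handling.
type transferStep struct {
	JobID     int64
	run       func() error     // the actual transfer work
	transient func(error) bool // classifies a failure as retryable or not
}

// Enter ...
func (t transferStep) Enter() (string, error) {
	err := t.run()
	if err == nil {
		return models.JobFinished, nil
	}
	if t.transient(err) {
		// Returning models.JobRetrying hands the job to the Retry handler
		// registered in Reset below.
		return models.JobRetrying, nil
	}
	return "", err
}

// Exit ...
func (t transferStep) Exit() error {
	return nil
}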
// ImgPuller is a StateHandler for testing
type ImgPuller struct {
DummyHandler
img string
logger *log.Logger
}
@@ -80,13 +90,17 @@ type ImgPuller struct {
func (ip ImgPuller) Enter() (string, error) {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return "push-img", nil
ip.logger.Infof("wake up from sleep.... testing retry")
return models.JobRetrying, nil
}
// Exit ...
func (ip ImgPuller) Exit() error {
return nil
}
// ImgPusher is a StateHandler for testing
type ImgPusher struct {
DummyHandler
targetURL string
logger *log.Logger
}
@@ -95,6 +109,11 @@ type ImgPusher struct {
func (ip ImgPusher) Enter() (string, error) {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return models.JobContinue, nil
ip.logger.Infof("wake up from sleep.... testing retry")
return models.JobRetrying, nil
}
// Exit ...
func (ip ImgPusher) Exit() error {
return nil
}

View File

@@ -179,6 +179,7 @@ func (sm *SM) Init() {
models.JobError: struct{}{},
models.JobStopped: struct{}{},
models.JobCanceled: struct{}{},
models.JobRetrying: struct{}{},
}
}
@@ -243,9 +244,11 @@ func (sm *SM) Reset(jid int64) error {
sm.Transitions = make(map[string]map[string]struct{})
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning})
sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{sm.JobID, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{sm.JobID, models.JobStopped}
sm.Handlers[models.JobRetrying] = Retry{sm.JobID}
switch sm.Parms.Operation {
case models.RepOpTransfer:
@@ -259,6 +262,12 @@
return err
}
// for testing only
func addTestTransition(sm *SM) error {
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{img: sm.Parms.Repository, logger: sm.Logger})
return nil
}
func addImgTransferTransition(sm *SM) error {
base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(),
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
@@ -269,7 +278,7 @@ func addImgTransferTransition(sm *SM) error {
sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base})
sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished})
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base})
sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
return nil
@@ -283,7 +292,7 @@ func addImgDeleteTransition(sm *SM) error {
}
sm.AddTransition(models.JobRunning, replication.StateDelete, deleter)
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished})
return nil
}

View File

@@ -38,13 +38,13 @@ func resumeJobs() {
if err != nil {
log.Warningf("Failed to reset all running jobs to pending, error: %v", err)
}
jobs, err := dao.GetRepJobByStatus(models.JobPending)
jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying)
if err == nil {
for _, j := range jobs {
log.Debugf("Rescheduling job: %d", j.ID)
log.Debugf("Resuming job: %d", j.ID)
job.Schedule(j.ID)
}
} else {
log.Warningf("Failed to get pending jobs, error: %v", err)
log.Warningf("Failed to jobs to resume, error: %v", err)
}
}
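
The other half of this change is dao.GetRepJobByStatus accepting several statuses; its implementation is outside this diff. A rough sketch of what that implies at the SQL layer, with an assumed replication_job table and status column, returning only IDs for brevity (not the actual dao code):

package dao

import (
	"database/sql"
	"strings"
)

// getRepJobIDsByStatus is a hypothetical sketch: it builds an IN clause from
// the variadic status list and returns the IDs of the matching jobs.
func getRepJobIDsByStatus(db *sql.DB, statuses ...string) ([]int64, error) {
	if len(statuses) == 0 {
		return nil, nil
	}
	placeholders := make([]string, len(statuses))
	args := make([]interface{}, len(statuses))
	for i, s := range statuses {
		placeholders[i] = "?"
		args[i] = s
	}
	rows, err := db.Query(
		"select id from replication_job where status in ("+strings.Join(placeholders, ",")+")",
		args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}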