* fix #2382
This commit is contained in:
Daniel Jiang 2017-06-05 21:51:50 +08:00 committed by GitHub
parent 5512478593
commit c099ccf02e
2 changed files with 14 additions and 10 deletions

View File

@ -54,7 +54,7 @@ func (sm *SM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("Job: %d, no handler found for state:%s, skip", sm.CurrentJob, sm.CurrentState)
log.Debugf("Job: %v, no exit handler found for state:%s, skip", sm.CurrentJob, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next = models.JobContinue
@ -101,7 +101,7 @@ func (sm *SM) Start(s string) {
log.Debugf("Job: %v, next state from handler: %s", sm.CurrentJob, n)
}
if err != nil {
log.Warningf("Job: %v, the statemachin will enter error state due to error: %v", sm.CurrentJob, err)
log.Warningf("Job: %v, the statemachine will enter error state due to error: %v", sm.CurrentJob, err)
sm.EnterState(models.JobError)
}
}
@ -187,6 +187,7 @@ func (sm *SM) Reset(j Job) error {
sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.CurrentJob, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{sm.CurrentJob, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{sm.CurrentJob, models.JobStopped}
sm.Handlers[models.JobCanceled] = StatusUpdater{sm.CurrentJob, models.JobCanceled}
sm.Handlers[models.JobRetrying] = Retry{sm.CurrentJob}
if err := sm.CurrentJob.Init(); err != nil {
return err
@ -201,12 +202,14 @@ func (sm *SM) kickOff() error {
if repJob, ok := sm.CurrentJob.(*RepJob); ok {
if repJob.parm.Enabled == 0 {
log.Debugf("The policy of job:%v is disabled, will cancel the job", repJob)
if err := repJob.UpdateStatus(models.JobCanceled); err != nil {
log.Warningf("Failed to update status of job: %v to 'canceled', error: %v", repJob, err)
}
_, err := sm.EnterState(models.JobCanceled)
if err != nil {
log.Warningf("For job: %v, failed to update state to 'canceled', error: %v", repJob, err)
}
return err
}
}
log.Debugf("In kickOff: will start job: %v", sm.CurrentJob)
sm.Start(models.JobRunning)
return nil
}

View File

@ -103,9 +103,10 @@ func (w *Worker) handle(job Job) {
}
// NewWorker returns a pointer to new instance of worker
func NewWorker(id int, wp *workerPool) *Worker {
func NewWorker(id int, t Type, wp *workerPool) *Worker {
w := &Worker{
ID: id,
Type: t,
Jobs: make(chan Job),
quit: make(chan bool),
queue: wp.workerChan,
@ -125,19 +126,19 @@ func InitWorkerPools() error {
return err
}
WorkerPools = make(map[Type]*workerPool)
WorkerPools[ReplicationType] = createWorkerPool(maxRepWorker)
WorkerPools[ScanType] = createWorkerPool(maxScanWorker)
WorkerPools[ReplicationType] = createWorkerPool(maxRepWorker, ReplicationType)
WorkerPools[ScanType] = createWorkerPool(maxScanWorker, ScanType)
return nil
}
//createWorkerPool create workers according to parm
func createWorkerPool(n int) *workerPool {
func createWorkerPool(n int, t Type) *workerPool {
wp := &workerPool{
workerChan: make(chan *Worker, n),
workerList: make([]*Worker, 0, n),
}
for i := 0; i < n; i++ {
worker := NewWorker(i, wp)
worker := NewWorker(i, t, wp)
wp.workerList = append(wp.workerList, worker)
worker.Start()
log.Debugf("worker %v started", worker)