diff --git a/.gitignore b/.gitignore index 8251fd4cd..09be8ccc0 100644 --- a/.gitignore +++ b/.gitignore @@ -6,9 +6,4 @@ Deploy/config/db/env Deploy/config/jobservice/env ui/ui *.pyc -jobservice/*.sql -jobservice/*.sh -jobservice/*.json -jobservice/jobservice - - +jobservice/test diff --git a/job/scheduler.go b/job/scheduler.go index fddfdcafe..a721f235e 100644 --- a/job/scheduler.go +++ b/job/scheduler.go @@ -1,23 +1,36 @@ /* - Copyright (c) 2016 VMware, Inc. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright (c) 2016 VMware, Inc. All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package job +import ( + "github.com/vmware/harbor/utils/log" + "time" +) + var jobQueue = make(chan int64) // Schedule put a job id into job queue. func Schedule(jobID int64) { jobQueue <- jobID } + +// Reschedule is called by statemachine to retry a job +func Reschedule(jobID int64) { + log.Debugf("Job %d will be rescheduled in 5 minutes", jobID) + time.Sleep(5 * time.Minute) + log.Debugf("Rescheduling job %d", jobID) + Schedule(jobID) +} diff --git a/job/statehandlers.go b/job/statehandlers.go index 60e852422..d4a855e77 100644 --- a/job/statehandlers.go +++ b/job/statehandlers.go @@ -1,16 +1,16 @@ /* - Copyright (c) 2016 VMware, Inc. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright (c) 2016 VMware, Inc. All Rights Reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package job @@ -33,25 +33,10 @@ type StateHandler interface { Exit() error } -// DummyHandler is the default implementation of StateHander interface, which has empty Enter and Exit methods. -type DummyHandler struct { - JobID int64 -} - -// Enter ... -func (dh DummyHandler) Enter() (string, error) { - return "", nil -} - -// Exit ... -func (dh DummyHandler) Exit() error { - return nil -} - // StatusUpdater implements the StateHandler interface which updates the status of a job in DB when the job enters // a status. type StatusUpdater struct { - DummyHandler + JobID int64 State string } @@ -69,9 +54,34 @@ func (su StatusUpdater) Enter() (string, error) { return next, err } +// Exit ... +func (su StatusUpdater) Exit() error { + return nil +} + +// Retry handles a special "retrying" in which case it will update the status in DB and reschedule the job +// via scheduler +type Retry struct { + JobID int64 +} + +// Enter ... +func (jr Retry) Enter() (string, error) { + err := dao.UpdateRepJobStatus(jr.JobID, models.JobRetrying) + if err != nil { + log.Errorf("Failed to update state of job :%d to Retrying, error: %v", jr.JobID, err) + } + go Reschedule(jr.JobID) + return "", err +} + +// Exit ... +func (jr Retry) Exit() error { + return nil +} + // ImgPuller was for testing type ImgPuller struct { - DummyHandler img string logger *log.Logger } @@ -80,13 +90,17 @@ type ImgPuller struct { func (ip ImgPuller) Enter() (string, error) { ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img) time.Sleep(30 * time.Second) - ip.logger.Infof("wake up from sleep....") - return "push-img", nil + ip.logger.Infof("wake up from sleep.... testing retry") + return models.JobRetrying, nil +} + +// Exit ... +func (ip ImgPuller) Exit() error { + return nil } // ImgPusher is a statehandler for testing type ImgPusher struct { - DummyHandler targetURL string logger *log.Logger } @@ -95,6 +109,11 @@ type ImgPusher struct { func (ip ImgPusher) Enter() (string, error) { ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL) time.Sleep(30 * time.Second) - ip.logger.Infof("wake up from sleep....") - return models.JobContinue, nil + ip.logger.Infof("wake up from sleep.... testing retry") + return models.JobRetrying, nil +} + +// Exit ... +func (ip ImgPusher) Exit() error { + return nil } diff --git a/job/statemachine.go b/job/statemachine.go index 959a9f797..8a1c007ff 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -179,6 +179,7 @@ func (sm *SM) Init() { models.JobError: struct{}{}, models.JobStopped: struct{}{}, models.JobCanceled: struct{}{}, + models.JobRetrying: struct{}{}, } } @@ -243,9 +244,11 @@ func (sm *SM) Reset(jid int64) error { sm.Transitions = make(map[string]map[string]struct{}) sm.CurrentState = models.JobPending - sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning}) - sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError} - sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped} + sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning}) + sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning}) + sm.Handlers[models.JobError] = StatusUpdater{sm.JobID, models.JobError} + sm.Handlers[models.JobStopped] = StatusUpdater{sm.JobID, models.JobStopped} + sm.Handlers[models.JobRetrying] = Retry{sm.JobID} switch sm.Parms.Operation { case models.RepOpTransfer: @@ -259,6 +262,12 @@ func (sm *SM) Reset(jid int64) error { return err } +//for testing onlly +func addTestTransition(sm *SM) error { + sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{img: sm.Parms.Repository, logger: sm.Logger}) + return nil +} + func addImgTransferTransition(sm *SM) error { base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(), sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword, @@ -269,7 +278,7 @@ func addImgTransferTransition(sm *SM) error { sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base}) sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base}) - sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) + sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished}) sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base}) sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) return nil @@ -283,7 +292,7 @@ func addImgDeleteTransition(sm *SM) error { } sm.AddTransition(models.JobRunning, replication.StateDelete, deleter) - sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) + sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished}) return nil } diff --git a/jobservice/main.go b/jobservice/main.go index d960a0904..6f54b09e4 100644 --- a/jobservice/main.go +++ b/jobservice/main.go @@ -38,13 +38,13 @@ func resumeJobs() { if err != nil { log.Warningf("Failed to reset all running jobs to pending, error: %v", err) } - jobs, err := dao.GetRepJobByStatus(models.JobPending) + jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying) if err == nil { for _, j := range jobs { - log.Debugf("Rescheduling job: %d", j.ID) + log.Debugf("Resuming job: %d", j.ID) job.Schedule(j.ID) } } else { - log.Warningf("Failed to get pending jobs, error: %v", err) + log.Warningf("Failed to jobs to resume, error: %v", err) } }