From dd4c34682633d113762ef23e07d1e151e4f8303b Mon Sep 17 00:00:00 2001 From: Tan Jiang Date: Fri, 13 May 2016 21:43:17 +0800 Subject: [PATCH] refactor, associate statemachine to worker and reuse it --- api/replication.go | 5 +++ job/config/config.go | 38 +++++++++++++++++ job/scheduler.go | 25 ----------- job/statemachine.go | 27 ++++++------ job/workerpool.go | 92 +++++++++++++++++++++++++++++++++++++++++ jobservice/main.go | 73 +------------------------------- jobservice/populate.sql | 1 + jobservice/start_db.sh | 5 +++ 8 files changed, 155 insertions(+), 111 deletions(-) create mode 100644 job/config/config.go create mode 100644 job/workerpool.go diff --git a/api/replication.go b/api/replication.go index ba6f81471..0c8543a80 100644 --- a/api/replication.go +++ b/api/replication.go @@ -33,6 +33,11 @@ func (rj *ReplicationJob) Post() { rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID)) return } + if p == nil { + log.Errorf("Policy not found, id: %d", data.PolicyID) + rj.RenderError(http.StatusNotFound, fmt.Sprintf("Policy not found, id: %d", data.PolicyID)) + return + } repoList, err := getRepoList(p.ProjectID) if err != nil { log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err) diff --git a/job/config/config.go b/job/config/config.go new file mode 100644 index 000000000..553db13c2 --- /dev/null +++ b/job/config/config.go @@ -0,0 +1,38 @@ +package config + +import ( + "github.com/vmware/harbor/utils/log" + "os" + "strconv" +) + +const defaultMaxWorkers int = 10 + +var maxJobWorkers int +var localRegURL string + +func init() { + maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS") + maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32) + maxJobWorkers = int(maxWorkers64) + if err != nil { + log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers) + maxJobWorkers = defaultMaxWorkers + } + + localRegURL := os.Getenv("LOCAL_REGISTRY_URL") + if len(localRegURL) == 0 { + localRegURL = "http://registry:5000/" + } + + log.Debugf("config: maxJobWorkers: %d", maxJobWorkers) + log.Debugf("config: localRegURL: %s", localRegURL) +} + +func MaxJobWorkers() int { + return maxJobWorkers +} + +func LocalRegURL() string { + return localRegURL +} diff --git a/job/scheduler.go b/job/scheduler.go index 020818147..726943e47 100644 --- a/job/scheduler.go +++ b/job/scheduler.go @@ -1,32 +1,7 @@ package job -import ( - "github.com/vmware/harbor/dao" - "github.com/vmware/harbor/models" - "github.com/vmware/harbor/utils/log" -) - var JobQueue chan int64 = make(chan int64) func Schedule(jobID int64) { JobQueue <- jobID } - -func HandleRepJob(id int64) { - sm := &JobSM{JobID: id} - err := sm.Init() - if err != nil { - log.Errorf("Failed to initialize statemachine, error: %v", err) - err2 := dao.UpdateRepJobStatus(id, models.JobError) - if err2 != nil { - log.Errorf("Failed to update job status to ERROR, error:%v", err2) - } - return - } - if sm.Parms.Enabled == 0 { - log.Debugf("The policy of job:%d is disabled, will cancel the job") - _ = dao.UpdateRepJobStatus(id, models.JobCanceled) - } else { - sm.Start(models.JobRunning) - } -} diff --git a/job/statemachine.go b/job/statemachine.go index 616c0ed40..8593d4df9 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -2,10 +2,10 @@ package job import ( "fmt" - "os" "sync" "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/job/config" "github.com/vmware/harbor/job/imgout" "github.com/vmware/harbor/job/utils" "github.com/vmware/harbor/models" @@ -137,12 +137,17 @@ func (sm *JobSM) setDesiredState(s string) { sm.desiredState = s } -func (sm *JobSM) Init() error { +func (sm *JobSM) Init() { + sm.lock = &sync.Mutex{} + sm.Handlers = make(map[string]StateHandler) + sm.Transitions = make(map[string]map[string]struct{}) + sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning}) + sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError} + sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped} +} +func (sm *JobSM) Reset(jid int64) error { + sm.JobID = jid //init parms - regURL := os.Getenv("LOCAL_REGISTRY_URL") - if len(regURL) == 0 { - regURL = "http://registry:5000/" - } job, err := dao.GetRepJob(sm.JobID) if err != nil { return fmt.Errorf("Failed to get job, error: %v", err) @@ -158,7 +163,7 @@ func (sm *JobSM) Init() error { return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID) } sm.Parms = &RepJobParm{ - LocalRegURL: regURL, + LocalRegURL: config.LocalRegURL(), Repository: job.Repository, Enabled: policy.Enabled, Operation: job.Operation, @@ -178,14 +183,8 @@ func (sm *JobSM) Init() error { sm.Parms.TargetUsername = target.Username sm.Parms.TargetPassword = target.Password //init states handlers - sm.lock = &sync.Mutex{} - sm.Handlers = make(map[string]StateHandler) - sm.Transitions = make(map[string]map[string]struct{}) sm.Logger = utils.Logger{sm.JobID} sm.CurrentState = models.JobPending - sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning}) - sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError} - sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped} if sm.Parms.Operation == models.RepOpTransfer { /* sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger}) @@ -207,13 +206,11 @@ func addImgOutTransition(sm *JobSM) error { if err != nil { return err } - sm.AddTransition(models.JobRunning, imgout.StateCheck, &imgout.Checker{BaseHandler: base}) sm.AddTransition(imgout.StateCheck, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base}) sm.AddTransition(imgout.StatePullManifest, imgout.StateTransferBlob, &imgout.BlobTransfer{BaseHandler: base}) sm.AddTransition(imgout.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished}) sm.AddTransition(imgout.StateTransferBlob, imgout.StatePushManifest, &imgout.ManifestPusher{BaseHandler: base}) sm.AddTransition(imgout.StatePushManifest, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base}) - return nil } diff --git a/job/workerpool.go b/job/workerpool.go new file mode 100644 index 000000000..8606df35e --- /dev/null +++ b/job/workerpool.go @@ -0,0 +1,92 @@ +package job + +import ( + "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/job/config" + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" +) + +var WorkerPool chan *Worker + +type Worker struct { + ID int + RepJobs chan int64 + SM *JobSM + quit chan bool +} + +func (w *Worker) Start() { + go func() { + for { + WorkerPool <- w + select { + case jobID := <-w.RepJobs: + log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) + w.handleRepJob(jobID) + case q := <-w.quit: + if q { + log.Debugf("worker: %d, will stop.", w.ID) + return + } + } + } + }() +} + +func (w *Worker) Stop() { + go func() { + w.quit <- true + }() +} + +func (w *Worker) handleRepJob(id int64) { + err := w.SM.Reset(id) + if err != nil { + log.Errorf("Worker %d, failed to re-initialize statemachine, error: %v", err) + err2 := dao.UpdateRepJobStatus(id, models.JobError) + if err2 != nil { + log.Errorf("Failed to update job status to ERROR, error:%v", err2) + } + return + } + if w.SM.Parms.Enabled == 0 { + log.Debugf("The policy of job:%d is disabled, will cancel the job") + _ = dao.UpdateRepJobStatus(id, models.JobCanceled) + } else { + w.SM.Start(models.JobRunning) + } +} + +func NewWorker(id int) *Worker { + w := &Worker{ + ID: id, + RepJobs: make(chan int64), + quit: make(chan bool), + SM: &JobSM{}, + } + w.SM.Init() + return w +} + +func InitWorkerPool() { + WorkerPool = make(chan *Worker, config.MaxJobWorkers()) + for i := 0; i < config.MaxJobWorkers(); i++ { + worker := NewWorker(i) + worker.Start() + log.Debugf("worker %d started", worker.ID) + } +} + +func Dispatch() { + for { + select { + case job := <-JobQueue: + go func(jobID int64) { + log.Debugf("Trying to dispatch job: %d", jobID) + worker := <-WorkerPool + worker.RepJobs <- jobID + }(job) + } + } +} diff --git a/jobservice/main.go b/jobservice/main.go index 322b73a88..10ea2ae79 100644 --- a/jobservice/main.go +++ b/jobservice/main.go @@ -4,81 +4,12 @@ import ( "github.com/astaxie/beego" "github.com/vmware/harbor/dao" "github.com/vmware/harbor/job" - "github.com/vmware/harbor/utils/log" - "os" - "strconv" ) -const defaultMaxWorkers int = 10 - -type Worker struct { - ID int - RepJobs chan int64 - quit chan bool -} - -func (w *Worker) Start() { - go func() { - for { - WorkerPool <- w - select { - case jobID := <-w.RepJobs: - log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) - job.HandleRepJob(jobID) - case q := <-w.quit: - if q { - log.Debugf("worker: %d, will stop.", w.ID) - return - } - } - } - }() -} - -func (w *Worker) Stop() { - go func() { - w.quit <- true - }() -} - -var WorkerPool chan *Worker - func main() { dao.InitDB() initRouters() - initWorkerPool() - go dispatch() + job.InitWorkerPool() + go job.Dispatch() beego.Run() } - -func initWorkerPool() { - maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS") - maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32) - maxWorkers := int(maxWorkers64) - if err != nil { - log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers) - maxWorkers = defaultMaxWorkers - } - WorkerPool = make(chan *Worker, maxWorkers) - for i := 0; i < maxWorkers; i++ { - worker := &Worker{ - ID: i, - RepJobs: make(chan int64), - quit: make(chan bool), - } - worker.Start() - } -} - -func dispatch() { - for { - select { - case job := <-job.JobQueue: - go func(jobID int64) { - log.Debugf("Trying to dispatch job: %d", jobID) - worker := <-WorkerPool - worker.RepJobs <- jobID - }(job) - } - } -} diff --git a/jobservice/populate.sql b/jobservice/populate.sql index 64fb2d360..9c77c6c93 100644 --- a/jobservice/populate.sql +++ b/jobservice/populate.sql @@ -1,2 +1,3 @@ +use registry; insert into replication_target (name, url, username, password) values ('test', '192.168.0.2:5000', 'testuser', 'passw0rd'); insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW()); diff --git a/jobservice/start_db.sh b/jobservice/start_db.sh index 46695a557..686173325 100755 --- a/jobservice/start_db.sh +++ b/jobservice/start_db.sh @@ -1,2 +1,7 @@ #export MYQL_ROOT_PASSWORD=root123 docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev + +echo "sleep 10 seconds..." +sleep 10 + +mysql -h 127.0.0.1 -uroot -proot123 < ./populate.sql