Merge pull request #226 from reasonerjt/job-service

refactor, associate statemachine to worker and reuse it
This commit is contained in:
Daniel Jiang 2016-05-13 21:45:33 +08:00
commit c95fbb6e3f
8 changed files with 155 additions and 111 deletions

View File

@ -33,6 +33,11 @@ func (rj *ReplicationJob) Post() {
rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID))
return
}
if p == nil {
log.Errorf("Policy not found, id: %d", data.PolicyID)
rj.RenderError(http.StatusNotFound, fmt.Sprintf("Policy not found, id: %d", data.PolicyID))
return
}
repoList, err := getRepoList(p.ProjectID)
if err != nil {
log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err)

38
job/config/config.go Normal file
View File

@ -0,0 +1,38 @@
package config
import (
"github.com/vmware/harbor/utils/log"
"os"
"strconv"
)
const defaultMaxWorkers int = 10
var maxJobWorkers int
var localRegURL string
func init() {
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
maxJobWorkers = int(maxWorkers64)
if err != nil {
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
maxJobWorkers = defaultMaxWorkers
}
localRegURL := os.Getenv("LOCAL_REGISTRY_URL")
if len(localRegURL) == 0 {
localRegURL = "http://registry:5000/"
}
log.Debugf("config: maxJobWorkers: %d", maxJobWorkers)
log.Debugf("config: localRegURL: %s", localRegURL)
}
func MaxJobWorkers() int {
return maxJobWorkers
}
func LocalRegURL() string {
return localRegURL
}

View File

@ -1,32 +1,7 @@
package job
import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
var JobQueue chan int64 = make(chan int64)
func Schedule(jobID int64) {
JobQueue <- jobID
}
func HandleRepJob(id int64) {
sm := &JobSM{JobID: id}
err := sm.Init()
if err != nil {
log.Errorf("Failed to initialize statemachine, error: %v", err)
err2 := dao.UpdateRepJobStatus(id, models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
}
return
}
if sm.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled, will cancel the job")
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
} else {
sm.Start(models.JobRunning)
}
}

View File

@ -2,10 +2,10 @@ package job
import (
"fmt"
"os"
"sync"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/job/imgout"
"github.com/vmware/harbor/job/utils"
"github.com/vmware/harbor/models"
@ -137,12 +137,17 @@ func (sm *JobSM) setDesiredState(s string) {
sm.desiredState = s
}
func (sm *JobSM) Init() error {
func (sm *JobSM) Init() {
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
}
func (sm *JobSM) Reset(jid int64) error {
sm.JobID = jid
//init parms
regURL := os.Getenv("LOCAL_REGISTRY_URL")
if len(regURL) == 0 {
regURL = "http://registry:5000/"
}
job, err := dao.GetRepJob(sm.JobID)
if err != nil {
return fmt.Errorf("Failed to get job, error: %v", err)
@ -158,7 +163,7 @@ func (sm *JobSM) Init() error {
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
}
sm.Parms = &RepJobParm{
LocalRegURL: regURL,
LocalRegURL: config.LocalRegURL(),
Repository: job.Repository,
Enabled: policy.Enabled,
Operation: job.Operation,
@ -178,14 +183,8 @@ func (sm *JobSM) Init() error {
sm.Parms.TargetUsername = target.Username
sm.Parms.TargetPassword = target.Password
//init states handlers
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.Logger = utils.Logger{sm.JobID}
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
if sm.Parms.Operation == models.RepOpTransfer {
/*
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
@ -207,13 +206,11 @@ func addImgOutTransition(sm *JobSM) error {
if err != nil {
return err
}
sm.AddTransition(models.JobRunning, imgout.StateCheck, &imgout.Checker{BaseHandler: base})
sm.AddTransition(imgout.StateCheck, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
sm.AddTransition(imgout.StatePullManifest, imgout.StateTransferBlob, &imgout.BlobTransfer{BaseHandler: base})
sm.AddTransition(imgout.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(imgout.StateTransferBlob, imgout.StatePushManifest, &imgout.ManifestPusher{BaseHandler: base})
sm.AddTransition(imgout.StatePushManifest, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
return nil
}

92
job/workerpool.go Normal file
View File

@ -0,0 +1,92 @@
package job
import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
var WorkerPool chan *Worker
type Worker struct {
ID int
RepJobs chan int64
SM *JobSM
quit chan bool
}
func (w *Worker) Start() {
go func() {
for {
WorkerPool <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
func (w *Worker) handleRepJob(id int64) {
err := w.SM.Reset(id)
if err != nil {
log.Errorf("Worker %d, failed to re-initialize statemachine, error: %v", err)
err2 := dao.UpdateRepJobStatus(id, models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
}
return
}
if w.SM.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled, will cancel the job")
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
} else {
w.SM.Start(models.JobRunning)
}
}
func NewWorker(id int) *Worker {
w := &Worker{
ID: id,
RepJobs: make(chan int64),
quit: make(chan bool),
SM: &JobSM{},
}
w.SM.Init()
return w
}
func InitWorkerPool() {
WorkerPool = make(chan *Worker, config.MaxJobWorkers())
for i := 0; i < config.MaxJobWorkers(); i++ {
worker := NewWorker(i)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
}
func Dispatch() {
for {
select {
case job := <-JobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool
worker.RepJobs <- jobID
}(job)
}
}
}

View File

@ -4,81 +4,12 @@ import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/utils/log"
"os"
"strconv"
)
const defaultMaxWorkers int = 10
type Worker struct {
ID int
RepJobs chan int64
quit chan bool
}
func (w *Worker) Start() {
go func() {
for {
WorkerPool <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
job.HandleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
var WorkerPool chan *Worker
func main() {
dao.InitDB()
initRouters()
initWorkerPool()
go dispatch()
job.InitWorkerPool()
go job.Dispatch()
beego.Run()
}
func initWorkerPool() {
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
maxWorkers := int(maxWorkers64)
if err != nil {
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
maxWorkers = defaultMaxWorkers
}
WorkerPool = make(chan *Worker, maxWorkers)
for i := 0; i < maxWorkers; i++ {
worker := &Worker{
ID: i,
RepJobs: make(chan int64),
quit: make(chan bool),
}
worker.Start()
}
}
func dispatch() {
for {
select {
case job := <-job.JobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool
worker.RepJobs <- jobID
}(job)
}
}
}

View File

@ -1,2 +1,3 @@
use registry;
insert into replication_target (name, url, username, password) values ('test', '192.168.0.2:5000', 'testuser', 'passw0rd');
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());

View File

@ -1,2 +1,7 @@
#export MYQL_ROOT_PASSWORD=root123
docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev
echo "sleep 10 seconds..."
sleep 10
mysql -h 127.0.0.1 -uroot -proot123 < ./populate.sql