Merge remote-tracking branch 'upstream/job-service' into sync_image

This commit is contained in:
Wenkai Yin 2016-05-12 18:55:49 +08:00
commit e4f4ef76d1
4 changed files with 77 additions and 5 deletions

View File

@ -574,7 +574,7 @@ paths:
description: Retrieved tags from a relevant repository successfully.
500:
description: Unexpected internal errors.
/repositories/manifest:
/repositories/manifests:
get:
summary: Get manifests of a relevant repository.
description: |

View File

@ -6,9 +6,10 @@ import (
"github.com/vmware/harbor/utils/log"
)
var JobQueue chan int64 = make(chan int64)
func Schedule(jobID int64) {
//TODO: introduce jobqueue to better control concurrent job numbers
go HandleRepJob(jobID)
JobQueue <- jobID
}
func HandleRepJob(id int64) {

View File

@ -3,12 +3,82 @@ package main
import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
_ "github.com/vmware/harbor/job/imgout"
// "github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/utils/log"
"os"
"strconv"
)
const defaultMaxWorkers int = 10
type Worker struct {
ID int
RepJobs chan int64
quit chan bool
}
func (w *Worker) Start() {
go func() {
for {
WorkerPool <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
job.HandleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
var WorkerPool chan *Worker
func main() {
dao.InitDB()
initRouters()
initWorkerPool()
go dispatch()
beego.Run()
}
func initWorkerPool() {
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
maxWorkers := int(maxWorkers64)
if err != nil {
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
maxWorkers = defaultMaxWorkers
}
WorkerPool = make(chan *Worker, maxWorkers)
for i := 0; i < maxWorkers; i++ {
worker := &Worker{
ID: i,
RepJobs: make(chan int64),
quit: make(chan bool),
}
worker.Start()
}
}
func dispatch() {
for {
select {
case job := <-job.JobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool
worker.RepJobs <- jobID
}(job)
}
}
}

View File

@ -6,5 +6,6 @@ export LOG_LEVEL=debug
export UI_URL=http://127.0.0.1/
export UI_USR=admin
export UI_PWD=Harbor12345
export MAX_JOB_WORKERS=1
./jobservice