mirror of
https://github.com/goharbor/harbor.git
synced 2024-09-30 06:18:02 +02:00
introduce worker pool to handle jobs
This commit is contained in:
parent
2752213ed1
commit
5c5643b9ce
@ -6,9 +6,10 @@ import (
|
|||||||
"github.com/vmware/harbor/utils/log"
|
"github.com/vmware/harbor/utils/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var JobQueue chan int64 = make(chan int64)
|
||||||
|
|
||||||
func Schedule(jobID int64) {
|
func Schedule(jobID int64) {
|
||||||
//TODO: introduce jobqueue to better control concurrent job numbers
|
JobQueue <- jobID
|
||||||
go HandleRepJob(jobID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleRepJob(id int64) {
|
func HandleRepJob(id int64) {
|
||||||
|
@ -3,12 +3,82 @@ package main
|
|||||||
import (
|
import (
|
||||||
"github.com/astaxie/beego"
|
"github.com/astaxie/beego"
|
||||||
"github.com/vmware/harbor/dao"
|
"github.com/vmware/harbor/dao"
|
||||||
_ "github.com/vmware/harbor/job/imgout"
|
"github.com/vmware/harbor/job"
|
||||||
// "github.com/vmware/harbor/utils/log"
|
"github.com/vmware/harbor/utils/log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultMaxWorkers int = 10
|
||||||
|
|
||||||
|
type Worker struct {
|
||||||
|
ID int
|
||||||
|
RepJobs chan int64
|
||||||
|
quit chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Start() {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
WorkerPool <- w
|
||||||
|
select {
|
||||||
|
case jobID := <-w.RepJobs:
|
||||||
|
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
|
||||||
|
job.HandleRepJob(jobID)
|
||||||
|
case q := <-w.quit:
|
||||||
|
if q {
|
||||||
|
log.Debugf("worker: %d, will stop.", w.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Stop() {
|
||||||
|
go func() {
|
||||||
|
w.quit <- true
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var WorkerPool chan *Worker
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
dao.InitDB()
|
dao.InitDB()
|
||||||
initRouters()
|
initRouters()
|
||||||
|
initWorkerPool()
|
||||||
|
go dispatch()
|
||||||
beego.Run()
|
beego.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func initWorkerPool() {
|
||||||
|
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
|
||||||
|
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
|
||||||
|
maxWorkers := int(maxWorkers64)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
|
||||||
|
maxWorkers = defaultMaxWorkers
|
||||||
|
}
|
||||||
|
WorkerPool = make(chan *Worker, maxWorkers)
|
||||||
|
for i := 0; i < maxWorkers; i++ {
|
||||||
|
worker := &Worker{
|
||||||
|
ID: i,
|
||||||
|
RepJobs: make(chan int64),
|
||||||
|
quit: make(chan bool),
|
||||||
|
}
|
||||||
|
worker.Start()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dispatch() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case job := <-job.JobQueue:
|
||||||
|
go func(jobID int64) {
|
||||||
|
log.Debugf("Trying to dispatch job: %d", jobID)
|
||||||
|
worker := <-WorkerPool
|
||||||
|
worker.RepJobs <- jobID
|
||||||
|
}(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -6,5 +6,6 @@ export LOG_LEVEL=debug
|
|||||||
export UI_URL=http://127.0.0.1/
|
export UI_URL=http://127.0.0.1/
|
||||||
export UI_USR=admin
|
export UI_USR=admin
|
||||||
export UI_PWD=Harbor12345
|
export UI_PWD=Harbor12345
|
||||||
|
export MAX_JOB_WORKERS=1
|
||||||
|
|
||||||
./jobservice
|
./jobservice
|
||||||
|
Loading…
Reference in New Issue
Block a user