mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-26 20:26:13 +01:00
refactor, associate statemachine to worker and reuse it
This commit is contained in:
parent
0e4d358bd7
commit
dd4c346826
@ -33,6 +33,11 @@ func (rj *ReplicationJob) Post() {
|
||||
rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID))
|
||||
return
|
||||
}
|
||||
if p == nil {
|
||||
log.Errorf("Policy not found, id: %d", data.PolicyID)
|
||||
rj.RenderError(http.StatusNotFound, fmt.Sprintf("Policy not found, id: %d", data.PolicyID))
|
||||
return
|
||||
}
|
||||
repoList, err := getRepoList(p.ProjectID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err)
|
||||
|
38
job/config/config.go
Normal file
38
job/config/config.go
Normal file
@ -0,0 +1,38 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const defaultMaxWorkers int = 10
|
||||
|
||||
var maxJobWorkers int
|
||||
var localRegURL string
|
||||
|
||||
func init() {
|
||||
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
|
||||
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
|
||||
maxJobWorkers = int(maxWorkers64)
|
||||
if err != nil {
|
||||
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
|
||||
maxJobWorkers = defaultMaxWorkers
|
||||
}
|
||||
|
||||
localRegURL := os.Getenv("LOCAL_REGISTRY_URL")
|
||||
if len(localRegURL) == 0 {
|
||||
localRegURL = "http://registry:5000/"
|
||||
}
|
||||
|
||||
log.Debugf("config: maxJobWorkers: %d", maxJobWorkers)
|
||||
log.Debugf("config: localRegURL: %s", localRegURL)
|
||||
}
|
||||
|
||||
func MaxJobWorkers() int {
|
||||
return maxJobWorkers
|
||||
}
|
||||
|
||||
func LocalRegURL() string {
|
||||
return localRegURL
|
||||
}
|
@ -1,32 +1,7 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
)
|
||||
|
||||
var JobQueue chan int64 = make(chan int64)
|
||||
|
||||
func Schedule(jobID int64) {
|
||||
JobQueue <- jobID
|
||||
}
|
||||
|
||||
func HandleRepJob(id int64) {
|
||||
sm := &JobSM{JobID: id}
|
||||
err := sm.Init()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to initialize statemachine, error: %v", err)
|
||||
err2 := dao.UpdateRepJobStatus(id, models.JobError)
|
||||
if err2 != nil {
|
||||
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
|
||||
}
|
||||
return
|
||||
}
|
||||
if sm.Parms.Enabled == 0 {
|
||||
log.Debugf("The policy of job:%d is disabled, will cancel the job")
|
||||
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
|
||||
} else {
|
||||
sm.Start(models.JobRunning)
|
||||
}
|
||||
}
|
||||
|
@ -2,10 +2,10 @@ package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job/config"
|
||||
"github.com/vmware/harbor/job/imgout"
|
||||
"github.com/vmware/harbor/job/utils"
|
||||
"github.com/vmware/harbor/models"
|
||||
@ -137,12 +137,17 @@ func (sm *JobSM) setDesiredState(s string) {
|
||||
sm.desiredState = s
|
||||
}
|
||||
|
||||
func (sm *JobSM) Init() error {
|
||||
func (sm *JobSM) Init() {
|
||||
sm.lock = &sync.Mutex{}
|
||||
sm.Handlers = make(map[string]StateHandler)
|
||||
sm.Transitions = make(map[string]map[string]struct{})
|
||||
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
|
||||
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
|
||||
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
|
||||
}
|
||||
func (sm *JobSM) Reset(jid int64) error {
|
||||
sm.JobID = jid
|
||||
//init parms
|
||||
regURL := os.Getenv("LOCAL_REGISTRY_URL")
|
||||
if len(regURL) == 0 {
|
||||
regURL = "http://registry:5000/"
|
||||
}
|
||||
job, err := dao.GetRepJob(sm.JobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get job, error: %v", err)
|
||||
@ -158,7 +163,7 @@ func (sm *JobSM) Init() error {
|
||||
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
|
||||
}
|
||||
sm.Parms = &RepJobParm{
|
||||
LocalRegURL: regURL,
|
||||
LocalRegURL: config.LocalRegURL(),
|
||||
Repository: job.Repository,
|
||||
Enabled: policy.Enabled,
|
||||
Operation: job.Operation,
|
||||
@ -178,14 +183,8 @@ func (sm *JobSM) Init() error {
|
||||
sm.Parms.TargetUsername = target.Username
|
||||
sm.Parms.TargetPassword = target.Password
|
||||
//init states handlers
|
||||
sm.lock = &sync.Mutex{}
|
||||
sm.Handlers = make(map[string]StateHandler)
|
||||
sm.Transitions = make(map[string]map[string]struct{})
|
||||
sm.Logger = utils.Logger{sm.JobID}
|
||||
sm.CurrentState = models.JobPending
|
||||
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
|
||||
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
|
||||
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
|
||||
if sm.Parms.Operation == models.RepOpTransfer {
|
||||
/*
|
||||
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
|
||||
@ -207,13 +206,11 @@ func addImgOutTransition(sm *JobSM) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sm.AddTransition(models.JobRunning, imgout.StateCheck, &imgout.Checker{BaseHandler: base})
|
||||
sm.AddTransition(imgout.StateCheck, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
|
||||
sm.AddTransition(imgout.StatePullManifest, imgout.StateTransferBlob, &imgout.BlobTransfer{BaseHandler: base})
|
||||
sm.AddTransition(imgout.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
|
||||
sm.AddTransition(imgout.StateTransferBlob, imgout.StatePushManifest, &imgout.ManifestPusher{BaseHandler: base})
|
||||
sm.AddTransition(imgout.StatePushManifest, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
92
job/workerpool.go
Normal file
92
job/workerpool.go
Normal file
@ -0,0 +1,92 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job/config"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
)
|
||||
|
||||
var WorkerPool chan *Worker
|
||||
|
||||
type Worker struct {
|
||||
ID int
|
||||
RepJobs chan int64
|
||||
SM *JobSM
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func (w *Worker) Start() {
|
||||
go func() {
|
||||
for {
|
||||
WorkerPool <- w
|
||||
select {
|
||||
case jobID := <-w.RepJobs:
|
||||
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
|
||||
w.handleRepJob(jobID)
|
||||
case q := <-w.quit:
|
||||
if q {
|
||||
log.Debugf("worker: %d, will stop.", w.ID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *Worker) Stop() {
|
||||
go func() {
|
||||
w.quit <- true
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *Worker) handleRepJob(id int64) {
|
||||
err := w.SM.Reset(id)
|
||||
if err != nil {
|
||||
log.Errorf("Worker %d, failed to re-initialize statemachine, error: %v", err)
|
||||
err2 := dao.UpdateRepJobStatus(id, models.JobError)
|
||||
if err2 != nil {
|
||||
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
|
||||
}
|
||||
return
|
||||
}
|
||||
if w.SM.Parms.Enabled == 0 {
|
||||
log.Debugf("The policy of job:%d is disabled, will cancel the job")
|
||||
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
|
||||
} else {
|
||||
w.SM.Start(models.JobRunning)
|
||||
}
|
||||
}
|
||||
|
||||
func NewWorker(id int) *Worker {
|
||||
w := &Worker{
|
||||
ID: id,
|
||||
RepJobs: make(chan int64),
|
||||
quit: make(chan bool),
|
||||
SM: &JobSM{},
|
||||
}
|
||||
w.SM.Init()
|
||||
return w
|
||||
}
|
||||
|
||||
func InitWorkerPool() {
|
||||
WorkerPool = make(chan *Worker, config.MaxJobWorkers())
|
||||
for i := 0; i < config.MaxJobWorkers(); i++ {
|
||||
worker := NewWorker(i)
|
||||
worker.Start()
|
||||
log.Debugf("worker %d started", worker.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func Dispatch() {
|
||||
for {
|
||||
select {
|
||||
case job := <-JobQueue:
|
||||
go func(jobID int64) {
|
||||
log.Debugf("Trying to dispatch job: %d", jobID)
|
||||
worker := <-WorkerPool
|
||||
worker.RepJobs <- jobID
|
||||
}(job)
|
||||
}
|
||||
}
|
||||
}
|
@ -4,81 +4,12 @@ import (
|
||||
"github.com/astaxie/beego"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const defaultMaxWorkers int = 10
|
||||
|
||||
type Worker struct {
|
||||
ID int
|
||||
RepJobs chan int64
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func (w *Worker) Start() {
|
||||
go func() {
|
||||
for {
|
||||
WorkerPool <- w
|
||||
select {
|
||||
case jobID := <-w.RepJobs:
|
||||
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
|
||||
job.HandleRepJob(jobID)
|
||||
case q := <-w.quit:
|
||||
if q {
|
||||
log.Debugf("worker: %d, will stop.", w.ID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *Worker) Stop() {
|
||||
go func() {
|
||||
w.quit <- true
|
||||
}()
|
||||
}
|
||||
|
||||
var WorkerPool chan *Worker
|
||||
|
||||
func main() {
|
||||
dao.InitDB()
|
||||
initRouters()
|
||||
initWorkerPool()
|
||||
go dispatch()
|
||||
job.InitWorkerPool()
|
||||
go job.Dispatch()
|
||||
beego.Run()
|
||||
}
|
||||
|
||||
func initWorkerPool() {
|
||||
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
|
||||
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
|
||||
maxWorkers := int(maxWorkers64)
|
||||
if err != nil {
|
||||
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
|
||||
maxWorkers = defaultMaxWorkers
|
||||
}
|
||||
WorkerPool = make(chan *Worker, maxWorkers)
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
worker := &Worker{
|
||||
ID: i,
|
||||
RepJobs: make(chan int64),
|
||||
quit: make(chan bool),
|
||||
}
|
||||
worker.Start()
|
||||
}
|
||||
}
|
||||
|
||||
func dispatch() {
|
||||
for {
|
||||
select {
|
||||
case job := <-job.JobQueue:
|
||||
go func(jobID int64) {
|
||||
log.Debugf("Trying to dispatch job: %d", jobID)
|
||||
worker := <-WorkerPool
|
||||
worker.RepJobs <- jobID
|
||||
}(job)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,3 @@
|
||||
use registry;
|
||||
insert into replication_target (name, url, username, password) values ('test', '192.168.0.2:5000', 'testuser', 'passw0rd');
|
||||
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());
|
||||
|
@ -1,2 +1,7 @@
|
||||
#export MYQL_ROOT_PASSWORD=root123
|
||||
docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev
|
||||
|
||||
echo "sleep 10 seconds..."
|
||||
sleep 10
|
||||
|
||||
mysql -h 127.0.0.1 -uroot -proot123 < ./populate.sql
|
||||
|
Loading…
Reference in New Issue
Block a user