Merge branch 'job-service' of https://github.com/vmware/harbor into job-service

This commit is contained in:
Tan Jiang 2016-05-23 19:37:42 +08:00
commit 71b4dde8a9
8 changed files with 181 additions and 31 deletions

View File

@ -64,6 +64,33 @@ func (rj *ReplicationJob) Post() {
}
}
// RepActionReq is the JSON body decoded by ReplicationJob.HandleAction.
// PolicyID is read from the "policy_id" key and selects the replication
// policy whose jobs are acted upon; Action is read from the "action" key and
// names the operation to perform.
type RepActionReq struct {
	PolicyID int64  `json:"policy_id"`
	Action   string `json:"action"`
}
// HandleAction decodes a RepActionReq from the request body and applies the
// requested action to the jobs of the given replication policy.
//
// Only the "stop" action is supported: the IDs of all jobs returned by
// dao.GetRepJobToStop for the policy are passed to job.WorkerPool.StopJobs.
// Any other action is answered with 400 Bad Request, and a failure to look
// up the jobs is answered with 500 Internal Server Error.
func (rj *ReplicationJob) HandleAction() {
	var data RepActionReq
	// NOTE(review): nothing returned by DecodeJSONReq is checked here;
	// presumably it aborts the request itself on a malformed body — confirm.
	rj.DecodeJSONReq(&data)

	// Currently only the stop action is supported.
	if data.Action != "stop" {
		log.Errorf("Unrecognized action: %s", data.Action)
		rj.RenderError(http.StatusBadRequest, fmt.Sprintf("Unrecognized action: %s", data.Action))
		return
	}

	jobs, err := dao.GetRepJobToStop(data.PolicyID)
	if err != nil {
		log.Errorf("Failed to get jobs to stop, error: %v", err)
		rj.RenderError(http.StatusInternalServerError, "Failed to get jobs to stop")
		return
	}

	// The final length is known, so size the slice once up front.
	jobIDList := make([]int64, 0, len(jobs))
	for _, j := range jobs {
		jobIDList = append(jobIDList, j.ID)
	}
	job.WorkerPool.StopJobs(jobIDList)
}
// calls the api from UI to get repo list
func getRepoList(projectID int64) ([]string, error) {
uiURL := os.Getenv("UI_URL")

View File

@ -64,6 +64,7 @@ func GenerateRandomString() (string, error) {
//InitDB initializes the database
func InitDB() {
// orm.Debug = true
orm.RegisterDriver("mysql", orm.DRMySQL)
addr := os.Getenv("MYSQL_HOST")
port := os.Getenv("MYSQL_PORT")

View File

@ -25,13 +25,13 @@ import (
"github.com/vmware/harbor/utils/log"
)
func execUpdate(o orm.Ormer, sql string, params interface{}) error {
func execUpdate(o orm.Ormer, sql string, params ...interface{}) error {
p, err := o.Raw(sql).Prepare()
if err != nil {
return err
}
defer p.Close()
_, err = p.Exec(params)
_, err = p.Exec(params...)
if err != nil {
return err
}
@ -95,6 +95,19 @@ func clearUp(username string) {
o.Rollback()
log.Error(err)
}
err = execUpdate(o, `delete from replication_job where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_policy where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_target where id < 99`)
if err != nil {
log.Error(err)
}
o.Commit()
}
@ -856,19 +869,23 @@ func TestAddRepJob(t *testing.T) {
id, err := AddRepJob(job)
if err != nil {
t.Errorf("Error occurred in AddRepJob: %v", err)
return
} else {
jobID = id
}
j, err := GetRepJob(id)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)
return
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", id)
return
}
if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" {
t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, "+
"but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID)
return
}
}
@ -888,6 +905,11 @@ func TestUpdateRepJobStatus(t *testing.T) {
if j.Status != models.JobFinished {
t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID)
}
err = UpdateRepJobStatus(jobID, models.JobPending)
if err != nil {
t.Errorf("Error occured in UpdateRepJobStatus when update it back to status pending, error: %v, id: %d", err, jobID)
return
}
}
func TestGetRepPolicyByProject(t *testing.T) {
@ -941,6 +963,47 @@ func TestGetRepJobByPolicy(t *testing.T) {
}
}
// TestGetRepoJobToStop adds three jobs for policyID, with status running,
// finished and canceled, and verifies that GetRepJobToStop reports exactly
// two jobs for that policy.
//
// NOTE(review): only one of the jobs added here is in a status that
// GetRepJobToStop selects (running); the second expected job is presumably
// the pending job left behind by the earlier replication job tests — confirm
// the dependency on test order.
func TestGetRepoJobToStop(t *testing.T) {
	jobs := []models.RepJob{
		{
			Repository: "library/ubuntu",
			PolicyID:   policyID,
			Operation:  "transfer",
			Status:     models.JobRunning,
		},
		{
			Repository: "library/ubuntu",
			PolicyID:   policyID,
			Operation:  "transfer",
			Status:     models.JobFinished,
		},
		{
			Repository: "library/ubuntu",
			PolicyID:   policyID,
			Operation:  "transfer",
			Status:     models.JobCanceled,
		},
	}

	// Failures are reported through t so that they actually fail the test;
	// merely logging them would let the test pass unconditionally.
	for _, j := range jobs {
		if _, err := AddRepJob(j); err != nil {
			t.Errorf("Failed to add Job: %+v, error: %v", j, err)
			return
		}
	}

	res, err := GetRepJobToStop(policyID)
	if err != nil {
		t.Errorf("Failed to Get Jobs, error: %v", err)
		return
	}
	if len(res) != 2 {
		t.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res))
	}
}
func TestDeleteRepTarget(t *testing.T) {
err := DeleteRepTarget(targetID)
if err != nil {

View File

@ -102,11 +102,23 @@ func GetRepJob(id int64) (*models.RepJob, error) {
return &j, err
}
// GetRepJobByPolicy returns all rows of the replication_job table whose
// policy_id equals policyID, together with any error reported by the query.
func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) {
	var res []*models.RepJob
	// Run the query exactly once through the shared policy-scoped query
	// builder, so that err is declared a single time.
	_, err := repJobPolicyIDQs(policyID).All(&res)
	return res, err
}
// GetRepJobToStop returns the jobs of the given policy that may currently be
// handled by a worker, i.e. those whose status is pending or running.
func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
	var jobs []*models.RepJob
	// Narrow the policy-scoped query down to the statuses that can be stopped.
	stoppable := repJobPolicyIDQs(policyID).Filter("status__in", models.JobPending, models.JobRunning)
	_, err := stoppable.All(&jobs)
	return jobs, err
}
// repJobPolicyIDQs builds a query over the replication_job table that is
// restricted to rows whose policy_id equals policyID. Callers add further
// filters or execute it as is.
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
	return orm.NewOrm().QueryTable("replication_job").Filter("policy_id", policyID)
}
func DeleteRepJob(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepJob{ID: id})

View File

@ -39,12 +39,12 @@ type JobSM struct {
// EnterState transits the state machine from the current state to the state given in the parameter.
// It returns the next state the state machine should transit to.
func (sm *JobSM) EnterState(s string) (string, error) {
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
_, isForced := sm.ForcedStates[s]
if !exist && !isForced {
return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
return "", fmt.Errorf("Job id: %d, transition from %s to %s does not exist!", sm.JobID, sm.CurrentState, s)
}
exitHandler, ok := sm.Handlers[sm.CurrentState]
if ok {
@ -52,7 +52,7 @@ func (sm *JobSM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next string = models.JobContinue
@ -62,11 +62,11 @@ func (sm *JobSM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("No handler found for state:%s, skip", s)
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s)
}
sm.PreviousState = sm.CurrentState
sm.CurrentState = s
log.Debugf("Transition succeeded, current state: %s", s)
log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s)
return next, nil
}
@ -75,10 +75,10 @@ func (sm *JobSM) EnterState(s string) (string, error) {
// will enter error state if there's more than one possible path when next state is "_continue"
func (sm *JobSM) Start(s string) {
n, err := sm.EnterState(s)
log.Debugf("next state from handler: %s", n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
for len(n) > 0 && err == nil {
if d := sm.getDesiredState(); len(d) > 0 {
log.Debugf("Desired state: %s, will ignore the next state from handler")
log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d)
n = d
sm.setDesiredState("")
continue
@ -87,19 +87,19 @@ func (sm *JobSM) Start(s string) {
for n = range sm.Transitions[sm.CurrentState] {
break
}
log.Debugf("Continue to state: %s", n)
log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n)
continue
}
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState]))
log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState]))
err = fmt.Errorf("Unable to continue")
break
}
n, err = sm.EnterState(n)
log.Debugf("next state from handler: %s", n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
}
if err != nil {
log.Warningf("The statemachin will enter error state due to error: %v", err)
log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err)
sm.EnterState(models.JobError)
}
}
@ -121,8 +121,17 @@ func (sm *JobSM) RemoveTransition(from string, to string) {
delete(sm.Transitions[from], to)
}
func (sm *JobSM) Stop() {
sm.setDesiredState(models.JobStopped)
// Stop asks the state machine to stop the job identified by id by setting its
// desired state to models.JobStopped. The request is ignored when the state
// machine's current JobID differs from id.
func (sm *JobSM) Stop(id int64) {
	log.Debugf("Trying to stop the job: %d", id)

	sm.lock.Lock()
	defer sm.lock.Unlock()

	// JobID is compared under the lock: if the state machine has already
	// moved on to a different job, that job must not be stopped by mistake.
	if sm.JobID != id {
		log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id)
		return
	}

	sm.desiredState = models.JobStopped
	log.Debugf("Desired state of job %d is set to stopped", id)
}
func (sm *JobSM) getDesiredState() string {
@ -141,14 +150,19 @@ func (sm *JobSM) Init() {
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.ForcedStates = map[string]struct{}{
models.JobError: struct{}{},
models.JobStopped: struct{}{},
models.JobCanceled: struct{}{},
}
}
func (sm *JobSM) Reset(jid int64) error {
sm.JobID = jid
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
func (sm *JobSM) Reset(jid int64) error {
//To ensure the new jobID is visible to the thread to stop the SM
sm.lock.Lock()
sm.JobID = jid
sm.desiredState = ""
sm.lock.Unlock()
//init parms
job, err := dao.GetRepJob(sm.JobID)
@ -172,7 +186,7 @@ func (sm *JobSM) Reset(jid int64) error {
Operation: job.Operation,
}
if policy.Enabled == 0 {
//handler will cancel this job
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
@ -188,16 +202,23 @@ func (sm *JobSM) Reset(jid int64) error {
//init states handlers
sm.Logger = utils.Logger{sm.JobID}
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
if sm.Parms.Operation == models.RepOpTransfer {
/*
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
//only handle on target for now
//only handle one target for now
sm.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: DummyHandler{JobID: sm.JobID}, targetURL: sm.Parms.TargetURL, logger: sm.Logger})
sm.AddTransition("push-img", models.JobFinished, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
*/
if err = addImgOutTransition(sm); err != nil {
return err
}
}
return nil
}

View File

@ -7,7 +7,24 @@ import (
"github.com/vmware/harbor/utils/log"
)
var WorkerPool chan *Worker
// workerPool groups the workers of the job service.
//
// workerList holds every worker that was added to the pool and is what
// StopJobs iterates over; workerChan is the channel over which workers are
// handed out for dispatching.
type workerPool struct {
	workerList []*Worker
	workerChan chan *Worker
}
// WorkerPool is the package-level pool of job workers. It is assigned in
// InitWorkerPool.
var WorkerPool *workerPool
// StopJobs asks every worker in the pool whose state machine is currently
// assigned one of the given job IDs to stop that job.
//
// NOTE(review): SM.JobID is read here without holding the state machine's
// lock; SM.Stop compares the ID again under the lock before acting, so this
// looks like a cheap pre-filter — confirm that the unsynchronized read is
// acceptable.
func (wp *workerPool) StopJobs(jobs []int64) {
	log.Debugf("Works working on jobs: %v will be stopped", jobs)
	for _, jobID := range jobs {
		for _, worker := range wp.workerList {
			if worker.SM.JobID != jobID {
				continue
			}
			log.Debugf("found a worker whose job ID is %d, will try to stop it", jobID)
			worker.SM.Stop(jobID)
		}
	}
}
type Worker struct {
ID int
@ -19,7 +36,7 @@ type Worker struct {
func (w *Worker) Start() {
go func() {
for {
WorkerPool <- w
WorkerPool.workerChan <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
@ -43,10 +60,10 @@ func (w *Worker) Stop() {
func (w *Worker) handleRepJob(id int64) {
err := w.SM.Reset(id)
if err != nil {
log.Errorf("Worker %d, failed to re-initialize statemachine, error: %v", w.ID, err)
log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err)
err2 := dao.UpdateRepJobStatus(id, models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2)
}
return
}
@ -70,9 +87,13 @@ func NewWorker(id int) *Worker {
}
func InitWorkerPool() {
WorkerPool = make(chan *Worker, config.MaxJobWorkers())
WorkerPool = &workerPool{
workerChan: make(chan *Worker, config.MaxJobWorkers()),
workerList: make([]*Worker, 0, config.MaxJobWorkers()),
}
for i := 0; i < config.MaxJobWorkers(); i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
@ -84,7 +105,7 @@ func Dispatch() {
case job := <-JobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool
worker := <-WorkerPool.workerChan
worker.RepJobs <- jobID
}(job)
}

View File

@ -8,4 +8,5 @@ import (
// initRouters registers the HTTP routes of the job service with beego.
func initRouters() {
	// Replication job resource, served by api.ReplicationJob.
	beego.Router("/api/jobs/replication", new(api.ReplicationJob))
	// Actions on replication jobs: POST requests are routed to HandleAction.
	beego.Router("/api/jobs/replication/actions", new(api.ReplicationJob), "post:HandleAction")
}

4
jobservice/stop.json Normal file
View File

@ -0,0 +1,4 @@
{
"policy_id":1,
"action":"stop"
}