Merge pull request #251 from reasonerjt/job-service

stopping running jobs under a policy
This commit is contained in:
Daniel Jiang 2016-05-19 16:17:04 +08:00
commit 37d732bafc
7 changed files with 154 additions and 25 deletions

View File

@ -64,6 +64,33 @@ func (rj *ReplicationJob) Post() {
}
}
type RepActionReq struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}
func (rj *ReplicationJob) HandleAction() {
var data RepActionReq
rj.DecodeJSONReq(&data)
//Currently only support stop action
if data.Action != "stop" {
log.Errorf("Unrecognized action: %s", data.Action)
rj.RenderError(http.StatusBadRequest, fmt.Sprintf("Unrecongized action: %s", data.Action))
return
}
jobs, err := dao.GetRepJobToStop(data.PolicyID)
if err != nil {
log.Errorf("Failed to get jobs to stop, error: %v", err)
rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop")
return
}
var jobIDList []int64
for _, j := range jobs {
jobIDList = append(jobIDList, j.ID)
}
job.WorkerPool.StopJobs(jobIDList)
}
// calls the api from UI to get repo list
func getRepoList(projectID int64) ([]string, error) {
uiURL := os.Getenv("UI_URL")

View File

@ -64,6 +64,7 @@ func GenerateRandomString() (string, error) {
//InitDB initializes the database
func InitDB() {
// orm.Debug = true
orm.RegisterDriver("mysql", orm.DRMySQL)
addr := os.Getenv("MYSQL_HOST")
port := os.Getenv("MYSQL_PORT")

View File

@ -25,13 +25,13 @@ import (
"github.com/vmware/harbor/utils/log"
)
func execUpdate(o orm.Ormer, sql string, params interface{}) error {
func execUpdate(o orm.Ormer, sql string, params ...interface{}) error {
p, err := o.Raw(sql).Prepare()
if err != nil {
return err
}
defer p.Close()
_, err = p.Exec(params)
_, err = p.Exec(params...)
if err != nil {
return err
}
@ -95,6 +95,19 @@ func clearUp(username string) {
o.Rollback()
log.Error(err)
}
err = execUpdate(o, `delete from replication_job where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_policy where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_target where id < 99`)
if err != nil {
log.Error(err)
}
o.Commit()
}
@ -876,19 +889,23 @@ func TestAddRepJob(t *testing.T) {
id, err := AddRepJob(job)
if err != nil {
t.Errorf("Error occurred in AddRepJob: %v", err)
return
} else {
jobID = id
}
j, err := GetRepJob(id)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)
return
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", id)
return
}
if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" {
t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, "+
"but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID)
return
}
}
@ -908,6 +925,11 @@ func TestUpdateRepJobStatus(t *testing.T) {
if j.Status != models.JobFinished {
t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID)
}
err = UpdateRepJobStatus(jobID, models.JobPending)
if err != nil {
t.Errorf("Error occured in UpdateRepJobStatus when update it back to status pending, error: %v, id: %d", err, jobID)
return
}
}
func TestGetRepPolicyByProject(t *testing.T) {
@ -961,6 +983,47 @@ func TestGetRepJobByPolicy(t *testing.T) {
}
}
func TestGetRepoJobToStop(t *testing.T) {
jobs := [...]models.RepJob{
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobFinished,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
},
}
var err error
for _, j := range jobs {
_, err = AddRepJob(j)
if err != nil {
log.Errorf("Failed to add Job: %+v, error: %v", j, err)
return
}
}
res, err := GetRepJobToStop(policyID)
if err != nil {
log.Errorf("Failed to Get Jobs, error: %v", err)
return
}
//time.Sleep(15 * time.Second)
if len(res) != 2 {
log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res))
return
}
}
func TestDeleteRepTarget(t *testing.T) {
err := DeleteRepTarget(targetID)
if err != nil {

View File

@ -102,11 +102,23 @@ func GetRepJob(id int64) (*models.RepJob, error) {
return &j, err
}
func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) {
o := orm.NewOrm()
var res []*models.RepJob
_, err := o.QueryTable("replication_job").Filter("policy_id", policyID).All(&res)
_, err := repJobPolicyIDQs(policyID).All(&res)
return res, err
}
// GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy.
func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).Filter("status__in", models.JobPending, models.JobRunning).All(&res)
return res, err
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
o := orm.NewOrm()
return o.QueryTable("replication_job").Filter("policy_id", policyID)
}
func DeleteRepJob(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepJob{ID: id})

View File

@ -39,12 +39,12 @@ type JobSM struct {
// EnsterState transit the statemachine from the current state to the state in parameter.
// It returns the next state the statemachine should tranit to.
func (sm *JobSM) EnterState(s string) (string, error) {
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
_, isForced := sm.ForcedStates[s]
if !exist && !isForced {
return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
return "", fmt.Errorf("Job id: %d, transition from %s to %s does not exist!", sm.JobID, sm.CurrentState, s)
}
exitHandler, ok := sm.Handlers[sm.CurrentState]
if ok {
@ -52,7 +52,7 @@ func (sm *JobSM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next string = models.JobContinue
@ -62,11 +62,11 @@ func (sm *JobSM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("No handler found for state:%s, skip", s)
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s)
}
sm.PreviousState = sm.CurrentState
sm.CurrentState = s
log.Debugf("Transition succeeded, current state: %s", s)
log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s)
return next, nil
}
@ -75,10 +75,10 @@ func (sm *JobSM) EnterState(s string) (string, error) {
// will enter error state if there's more than one possible path when next state is "_continue"
func (sm *JobSM) Start(s string) {
n, err := sm.EnterState(s)
log.Debugf("next state from handler: %s", n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
for len(n) > 0 && err == nil {
if d := sm.getDesiredState(); len(d) > 0 {
log.Debugf("Desired state: %s, will ignore the next state from handler")
log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d)
n = d
sm.setDesiredState("")
continue
@ -87,19 +87,19 @@ func (sm *JobSM) Start(s string) {
for n = range sm.Transitions[sm.CurrentState] {
break
}
log.Debugf("Continue to state: %s", n)
log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n)
continue
}
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState]))
log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState]))
err = fmt.Errorf("Unable to continue")
break
}
n, err = sm.EnterState(n)
log.Debugf("next state from handler: %s", n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
}
if err != nil {
log.Warningf("The statemachin will enter error state due to error: %v", err)
log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err)
sm.EnterState(models.JobError)
}
}
@ -121,8 +121,17 @@ func (sm *JobSM) RemoveTransition(from string, to string) {
delete(sm.Transitions[from], to)
}
func (sm *JobSM) Stop() {
sm.setDesiredState(models.JobStopped)
func (sm *JobSM) Stop(id int64) {
log.Debugf("Trying to stop the job: %d", id)
sm.lock.Lock()
defer sm.lock.Unlock()
//need to check if the sm switched to other job
if id == sm.JobID {
sm.desiredState = models.JobStopped
log.Debugf("Desired state of job %d is set to stopped", id)
} else {
log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id)
}
}
func (sm *JobSM) getDesiredState() string {
@ -141,14 +150,19 @@ func (sm *JobSM) Init() {
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.ForcedStates = map[string]struct{}{
models.JobError: struct{}{},
models.JobStopped: struct{}{},
models.JobCanceled: struct{}{},
}
}
func (sm *JobSM) Reset(jid int64) error {
sm.JobID = jid
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
func (sm *JobSM) Reset(jid int64) error {
//To ensure the new jobID is visible to the thread to stop the SM
sm.lock.Lock()
sm.JobID = jid
sm.desiredState = ""
sm.lock.Unlock()
//init parms
job, err := dao.GetRepJob(sm.JobID)
@ -172,7 +186,7 @@ func (sm *JobSM) Reset(jid int64) error {
Operation: job.Operation,
}
if policy.Enabled == 0 {
//handler will cancel this job
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
@ -188,16 +202,23 @@ func (sm *JobSM) Reset(jid int64) error {
//init states handlers
sm.Logger = utils.Logger{sm.JobID}
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
if sm.Parms.Operation == models.RepOpTransfer {
/*
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
//only handle on target for now
//only handle one target for now
sm.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: DummyHandler{JobID: sm.JobID}, targetURL: sm.Parms.TargetURL, logger: sm.Logger})
sm.AddTransition("push-img", models.JobFinished, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
*/
if err = addImgOutTransition(sm); err != nil {
return err
}
}
return nil
}

View File

@ -8,4 +8,5 @@ import (
func initRouters() {
beego.Router("/api/jobs/replication", &api.ReplicationJob{})
beego.Router("/api/jobs/replication/actions", &api.ReplicationJob{}, "post:HandleAction")
}

4
jobservice/stop.json Normal file
View File

@ -0,0 +1,4 @@
{
"policy_id":1,
"action":"stop"
}