Merge pull request #135 from reasonerjt/job-service

jobs management api
This commit is contained in:
Daniel Jiang 2016-04-20 14:43:16 +08:00
commit 406f890053
15 changed files with 540 additions and 0 deletions

View File

@ -102,6 +102,31 @@ create table access_log (
FOREIGN KEY (project_id) REFERENCES project (project_id) FOREIGN KEY (project_id) REFERENCES project (project_id)
); );
create table job (
job_id int NOT NULL AUTO_INCREMENT,
job_type varchar(64) NOT NULL,
status varchar(64) NOT NULL,
options text,
parms text,
enabled tinyint(1) NOT NULL DEFAULT 1,
cron_str varchar(256),
triggered_by varchar(64),
creation_time timestamp,
update_time timestamp,
PRIMARY KEY (job_id)
);
create table job_log (
log_id int NOT NULL AUTO_INCREMENT,
job_id int NOT NULL,
level varchar(64) NOT NULL,
message text,
creation_time timestamp,
update_time timestamp,
PRIMARY KEY (log_id),
FOREIGN KEY (job_id) REFERENCES job (job_id)
);
create table properties ( create table properties (
k varchar(64) NOT NULL, k varchar(64) NOT NULL,
v varchar(128) NOT NULL, v varchar(128) NOT NULL,

85
api/job.go Normal file
View File

@ -0,0 +1,85 @@
package api
import (
"encoding/json"
"fmt"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"net/http"
"strconv"
)
type JobAPI struct {
BaseAPI
}
func (ja *JobAPI) Post() {
var je models.JobEntry
ja.DecodeJSONReq(&je)
res, err := json.Marshal(je.Options)
if !job.RunnerExists(je.Type) {
log.Errorf("runner for type %s is not registered", je.Type)
ja.RenderError(http.StatusBadRequest, fmt.Sprintf("runner for type %s is not registered", je.Type))
return
}
je.OptionsStr = string(res)
if err != nil {
log.Warningf("Error marshaling options: %v", err)
}
res, err = json.Marshal(je.Parms)
je.ParmsStr = string(res)
if err != nil {
log.Warningf("Error marshaling parms: %v", err)
}
jobID, err := dao.AddJob(je)
if err != nil {
log.Errorf("Failed to add job to DB, error: %v", err)
ja.RenderError(http.StatusInternalServerError, "Failed to add job")
return
}
je.ID = jobID
log.Debugf("job Id:%d, type: %s", je.ID, je.Type)
job.Schedule(je)
}
func (ja *JobAPI) Get() {
idStr := ja.Ctx.Input.Param(":id")
if len(idStr) > 0 {
jobID, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
log.Errorf("Failed to parse job id in url: %s", idStr)
ja.RenderError(http.StatusBadRequest, "invalid job id")
return
}
je, err := dao.GetJob(jobID)
if err != nil {
log.Errorf("Failed to query job from db, error: %v", err)
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
return
}
if je == nil {
log.Errorf("job does not exist, id: %d", jobID)
ja.RenderError(http.StatusNotFound, "")
return
}
logs, err := dao.GetJobLogs(jobID)
if err != nil {
log.Errorf("Failed to get job logs, error: %v", err)
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
return
}
je.Logs = logs
ja.Data["json"] = je
} else {
jobs, err := dao.ListJobs()
if err != nil {
log.Errorf("Failed to list jobs, error:%v", err)
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
}
log.Debugf("jobs: %v", jobs)
ja.Data["json"] = jobs
}
ja.ServeJSON()
}

84
dao/job.go Normal file
View File

@ -0,0 +1,84 @@
package dao
import (
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
const (
JobPending string = "pending"
JobRunning string = "running"
JobError string = "error"
JobStopped string = "stopped"
JobFinished string = "finished"
)
func AddJob(entry models.JobEntry) (int64, error) {
sql := `insert into job (job_type, status, options, parms, cron_str, creation_time, update_time) values (?,"pending",?,?,?,NOW(),NOW())`
o := orm.NewOrm()
p, err := o.Raw(sql).Prepare()
if err != nil {
return 0, err
}
r, err := p.Exec(entry.Type, entry.OptionsStr, entry.ParmsStr, entry.CronStr)
if err != nil {
return 0, err
}
id, err := r.LastInsertId()
return id, err
}
func AddJobLog(id int64, level string, message string) error {
sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())`
log.Debugf("trying to add a log for job:%d", id)
o := orm.NewOrm()
p, err := o.Raw(sql).Prepare()
if err != nil {
return err
}
_, err = p.Exec(id, level, message)
return err
}
func UpdateJobStatus(id int64, status string) error {
o := orm.NewOrm()
sql := "update job set status=?, update_time=NOW() where job_id=?"
_, err := o.Raw(sql, status, id).Exec()
return err
}
func ListJobs() ([]models.JobEntry, error) {
o := orm.NewOrm()
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j`
var res []models.JobEntry
_, err := o.Raw(sql).QueryRows(&res)
if err != nil {
return nil, err
}
return res, err
}
func GetJob(id int64) (*models.JobEntry, error) {
o := orm.NewOrm()
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j where j.job_id = ?`
var res []models.JobEntry
p := make([]interface{}, 1)
p = append(p, id)
n, err := o.Raw(sql, p).QueryRows(&res)
if n == 0 {
return nil, err
}
return &res[0], err
}
func GetJobLogs(jobID int64) ([]models.JobLog, error) {
o := orm.NewOrm()
var res []models.JobLog
p := make([]interface{}, 1)
p = append(p, jobID)
sql := `select l.log_id, l.job_id, l.level, l.message, l.creation_time, l.update_time from job_log l where l.job_id = ?`
_, err := o.Raw(sql, p).QueryRows(&res)
return res, err
}

13
job/imgout/parm.go Normal file
View File

@ -0,0 +1,13 @@
package imgout
type ImgOutParm struct {
Secret string `json:"secret"`
Image string `json:"image"`
Targets []*RegistryInfo `json:"targets"`
}
type RegistryInfo struct {
URL string `json:"url"`
Username string `json:"username"`
Password string `json:"password"`
}

82
job/imgout/runner.go Normal file
View File

@ -0,0 +1,82 @@
package imgout
import (
"encoding/json"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"time"
)
const (
jobType = "transfer_img_out"
)
type Runner struct {
job.JobSM
Logger job.Logger
parm ImgOutParm
}
type ImgPuller struct {
job.DummyHandler
img string
logger job.Logger
}
func (ip ImgPuller) Enter() error {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 10s", ip.img)
time.Sleep(10 * time.Second)
ip.logger.Infof("wake up from sleep....")
return nil
}
type ImgPusher struct {
job.DummyHandler
targetURL string
logger job.Logger
}
func (ip ImgPusher) Enter() error {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 10s", ip.targetURL)
time.Sleep(10 * time.Second)
ip.logger.Infof("wake up from sleep....")
return nil
}
func init() {
job.Register(jobType, Runner{})
}
func (r Runner) Run(je models.JobEntry) error {
err := r.init(je)
if err != nil {
return err
}
path := []string{dao.JobRunning, "pull-img", "push-img", dao.JobFinished}
for _, state := range path {
err := r.EnterState(state)
if err != nil {
r.Logger.Errorf("Error durint transition to state: %s, error: %v", state, err)
r.EnterState(dao.JobError)
break
}
}
return nil
}
func (r *Runner) init(je models.JobEntry) error {
r.JobID = je.ID
r.InitJobSM()
err := json.Unmarshal([]byte(je.ParmsStr), &r.parm)
if err != nil {
return err
}
r.Logger = job.Logger{je.ID}
r.AddTransition(dao.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger})
//only handle on target for now
url := r.parm.Targets[0].URL
r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger})
r.AddTransition("push-img", dao.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, dao.JobFinished})
return nil
}

38
job/logger.go Normal file
View File

@ -0,0 +1,38 @@
package job
import (
"fmt"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/utils/log"
)
const (
INFO = "info"
WARN = "warning"
ERR = "error"
)
type Logger struct {
ID int64
}
func (l *Logger) Infof(format string, v ...interface{}) {
err := dao.AddJobLog(l.ID, INFO, fmt.Sprintf(format, v...))
if err != nil {
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
}
}
func (l *Logger) Warningf(format string, v ...interface{}) {
err := dao.AddJobLog(l.ID, WARN, fmt.Sprintf(format, v...))
if err != nil {
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
}
}
func (l *Logger) Errorf(format string, v ...interface{}) {
err := dao.AddJobLog(l.ID, ERR, fmt.Sprintf(format, v...))
if err != nil {
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
}
}

36
job/runner.go Normal file
View File

@ -0,0 +1,36 @@
package job
import (
"fmt"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"sync"
)
type JobRunner interface {
Run(je models.JobEntry) error
}
var runners map[string]*JobRunner = make(map[string]*JobRunner)
var runnerLock = &sync.Mutex{}
func Register(jobType string, runner JobRunner) {
runnerLock.Lock()
defer runnerLock.Unlock()
runners[jobType] = &runner
log.Debugf("runnter for job type:%s has been registered", jobType)
}
func RunnerExists(jobType string) bool {
_, ok := runners[jobType]
return ok
}
func run(je models.JobEntry) error {
runner, ok := runners[je.Type]
if !ok {
return fmt.Errorf("Runner for job type: %s does not exist")
}
(*runner).Run(je)
return nil
}

12
job/scheduler.go Normal file
View File

@ -0,0 +1,12 @@
package job
import (
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
func Schedule(job models.JobEntry) {
log.Infof("job: %d will be scheduled", job.ID)
//TODO: add support for cron string when needed.
go run(job)
}

104
job/statemachine.go Normal file
View File

@ -0,0 +1,104 @@
package job
import (
"fmt"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/utils/log"
)
type StateHandler interface {
Enter() error
//Exit should be idempotent
Exit() error
}
type DummyHandler struct {
JobID int64
}
func (dh DummyHandler) Enter() error {
return nil
}
func (dh DummyHandler) Exit() error {
return nil
}
type StatusUpdater struct {
DummyHandler
State string
}
func (su StatusUpdater) Enter() error {
err := dao.UpdateJobStatus(su.JobID, su.State)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
}
return err
}
type JobSM struct {
JobID int64
CurrentState string
PreviousState string
//The states that don't have to exist in transition map, such as "Error", "Canceled"
ForcedStates map[string]struct{}
Transitions map[string]map[string]struct{}
Handlers map[string]StateHandler
}
func (sm *JobSM) EnterState(s string) error {
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
_, isForced := sm.ForcedStates[s]
if !exist && !isForced {
return fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
}
exitHandler, ok := sm.Handlers[sm.CurrentState]
if ok {
if err := exitHandler.Exit(); err != nil {
return err
}
} else {
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
if ok {
if err := enterHandler.Enter(); err != nil {
return err
}
} else {
log.Debugf("No handler found for state:%s, skip", s)
}
sm.PreviousState = sm.CurrentState
sm.CurrentState = s
log.Debugf("Transition succeeded, current state: %s", s)
return nil
}
func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
_, ok := sm.Transitions[from]
if !ok {
sm.Transitions[from] = make(map[string]struct{})
}
sm.Transitions[from][to] = struct{}{}
sm.Handlers[to] = h
}
func (sm *JobSM) RemoveTransition(from string, to string) {
_, ok := sm.Transitions[from]
if !ok {
return
}
delete(sm.Transitions[from], to)
}
func (sm *JobSM) InitJobSM() {
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.CurrentState = dao.JobPending
log.Debugf("sm.Handlers: %v", sm.Handlers)
sm.AddTransition(dao.JobPending, dao.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobRunning})
sm.Handlers[dao.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobError}
}

10
jobservice/error.json Normal file
View File

@ -0,0 +1,10 @@
{
"job_type": "notexist",
"options": {
"whatever": "whatever"
},
"parms": {
"test": "test"
},
"cron_str": ""
}

14
jobservice/main.go Normal file
View File

@ -0,0 +1,14 @@
package main
import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
_ "github.com/vmware/harbor/job/imgout"
// "github.com/vmware/harbor/utils/log"
)
func main() {
dao.InitDB()
initRouters()
beego.Run()
}

7
jobservice/my_start.sh Executable file
View File

@ -0,0 +1,7 @@
export MYSQL_HOST=127.0.0.1
export MYSQL_PORT=3306
export MYSQL_USR=root
export MYSQL_PWD=root123
export LOG_LEVEL=debug
./jobservice

11
jobservice/router.go Normal file
View File

@ -0,0 +1,11 @@
package main
import (
"github.com/vmware/harbor/api"
"github.com/astaxie/beego"
)
func initRouters() {
beego.Router("/api/jobs/?:id", &api.JobAPI{})
}

2
jobservice/start_db.sh Executable file
View File

@ -0,0 +1,2 @@
#export MYQL_ROOT_PASSWORD=root123
docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev

17
jobservice/test.json Normal file
View File

@ -0,0 +1,17 @@
{
"job_type": "transfer_img_out",
"options": {
"whatever": "whatever"
},
"parms": {
"secret": "mysecret",
"image": "ubuntu",
"targets": [{
"url": "127.0.0.1:5000",
"username": "admin",
"password": "admin"
}]
},
"cron_str": ""
}