diff --git a/Deploy/db/registry.sql b/Deploy/db/registry.sql index 9e4a342b3..da3850c55 100644 --- a/Deploy/db/registry.sql +++ b/Deploy/db/registry.sql @@ -102,6 +102,31 @@ create table access_log ( FOREIGN KEY (project_id) REFERENCES project (project_id) ); +create table job ( + job_id int NOT NULL AUTO_INCREMENT, + job_type varchar(64) NOT NULL, + status varchar(64) NOT NULL, + options text, + parms text, + enabled tinyint(1) NOT NULL DEFAULT 1, + cron_str varchar(256), + triggered_by varchar(64), + creation_time timestamp, + update_time timestamp, + PRIMARY KEY (job_id) +); + +create table job_log ( + log_id int NOT NULL AUTO_INCREMENT, + job_id int NOT NULL, + level varchar(64) NOT NULL, + message text, + creation_time timestamp, + update_time timestamp, + PRIMARY KEY (log_id), + FOREIGN KEY (job_id) REFERENCES job (job_id) + ); + create table properties ( k varchar(64) NOT NULL, v varchar(128) NOT NULL, diff --git a/api/job.go b/api/job.go new file mode 100644 index 000000000..5d5066a5f --- /dev/null +++ b/api/job.go @@ -0,0 +1,85 @@ +package api + +import ( + "encoding/json" + "fmt" + "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/job" + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" + "net/http" + "strconv" +) + +type JobAPI struct { + BaseAPI +} + +func (ja *JobAPI) Post() { + var je models.JobEntry + ja.DecodeJSONReq(&je) + res, err := json.Marshal(je.Options) + if !job.RunnerExists(je.Type) { + log.Errorf("runner for type %s is not registered", je.Type) + ja.RenderError(http.StatusBadRequest, fmt.Sprintf("runner for type %s is not registered", je.Type)) + return + } + je.OptionsStr = string(res) + if err != nil { + log.Warningf("Error marshaling options: %v", err) + } + res, err = json.Marshal(je.Parms) + je.ParmsStr = string(res) + if err != nil { + log.Warningf("Error marshaling parms: %v", err) + } + jobID, err := dao.AddJob(je) + if err != nil { + log.Errorf("Failed to add job to DB, error: %v", err) + ja.RenderError(http.StatusInternalServerError, "Failed to add job") + return + } + je.ID = jobID + log.Debugf("job Id:%d, type: %s", je.ID, je.Type) + job.Schedule(je) +} + +func (ja *JobAPI) Get() { + idStr := ja.Ctx.Input.Param(":id") + if len(idStr) > 0 { + jobID, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + log.Errorf("Failed to parse job id in url: %s", idStr) + ja.RenderError(http.StatusBadRequest, "invalid job id") + return + } + je, err := dao.GetJob(jobID) + if err != nil { + log.Errorf("Failed to query job from db, error: %v", err) + ja.RenderError(http.StatusInternalServerError, "Failed to query job") + return + } + if je == nil { + log.Errorf("job does not exist, id: %d", jobID) + ja.RenderError(http.StatusNotFound, "") + return + } + logs, err := dao.GetJobLogs(jobID) + if err != nil { + log.Errorf("Failed to get job logs, error: %v", err) + ja.RenderError(http.StatusInternalServerError, "Failed to query job") + return + } + je.Logs = logs + ja.Data["json"] = je + } else { + jobs, err := dao.ListJobs() + if err != nil { + log.Errorf("Failed to list jobs, error:%v", err) + ja.RenderError(http.StatusInternalServerError, "Failed to query job") + } + log.Debugf("jobs: %v", jobs) + ja.Data["json"] = jobs + } + ja.ServeJSON() +} diff --git a/dao/job.go b/dao/job.go new file mode 100644 index 000000000..82e0a1e47 --- /dev/null +++ b/dao/job.go @@ -0,0 +1,84 @@ +package dao + +import ( + "github.com/astaxie/beego/orm" + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" +) + +const ( + JobPending string = "pending" + JobRunning string = "running" + JobError string = "error" + JobStopped string = "stopped" + JobFinished string = "finished" +) + +func AddJob(entry models.JobEntry) (int64, error) { + + sql := `insert into job (job_type, status, options, parms, cron_str, creation_time, update_time) values (?,"pending",?,?,?,NOW(),NOW())` + o := orm.NewOrm() + p, err := o.Raw(sql).Prepare() + if err != nil { + return 0, err + } + r, err := p.Exec(entry.Type, entry.OptionsStr, entry.ParmsStr, entry.CronStr) + if err != nil { + return 0, err + } + id, err := r.LastInsertId() + return id, err +} + +func AddJobLog(id int64, level string, message string) error { + sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())` + log.Debugf("trying to add a log for job:%d", id) + o := orm.NewOrm() + p, err := o.Raw(sql).Prepare() + if err != nil { + return err + } + _, err = p.Exec(id, level, message) + return err +} + +func UpdateJobStatus(id int64, status string) error { + o := orm.NewOrm() + sql := "update job set status=?, update_time=NOW() where job_id=?" + _, err := o.Raw(sql, status, id).Exec() + return err +} + +func ListJobs() ([]models.JobEntry, error) { + o := orm.NewOrm() + sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j` + var res []models.JobEntry + _, err := o.Raw(sql).QueryRows(&res) + if err != nil { + return nil, err + } + return res, err +} + +func GetJob(id int64) (*models.JobEntry, error) { + o := orm.NewOrm() + sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j where j.job_id = ?` + var res []models.JobEntry + p := make([]interface{}, 1) + p = append(p, id) + n, err := o.Raw(sql, p).QueryRows(&res) + if n == 0 { + return nil, err + } + return &res[0], err +} + +func GetJobLogs(jobID int64) ([]models.JobLog, error) { + o := orm.NewOrm() + var res []models.JobLog + p := make([]interface{}, 1) + p = append(p, jobID) + sql := `select l.log_id, l.job_id, l.level, l.message, l.creation_time, l.update_time from job_log l where l.job_id = ?` + _, err := o.Raw(sql, p).QueryRows(&res) + return res, err +} diff --git a/job/imgout/parm.go b/job/imgout/parm.go new file mode 100644 index 000000000..d1ff6bc8d --- /dev/null +++ b/job/imgout/parm.go @@ -0,0 +1,13 @@ +package imgout + +type ImgOutParm struct { + Secret string `json:"secret"` + Image string `json:"image"` + Targets []*RegistryInfo `json:"targets"` +} + +type RegistryInfo struct { + URL string `json:"url"` + Username string `json:"username"` + Password string `json:"password"` +} diff --git a/job/imgout/runner.go b/job/imgout/runner.go new file mode 100644 index 000000000..7bd012c8b --- /dev/null +++ b/job/imgout/runner.go @@ -0,0 +1,82 @@ +package imgout + +import ( + "encoding/json" + "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/job" + "github.com/vmware/harbor/models" + "time" +) + +const ( + jobType = "transfer_img_out" +) + +type Runner struct { + job.JobSM + Logger job.Logger + parm ImgOutParm +} + +type ImgPuller struct { + job.DummyHandler + img string + logger job.Logger +} + +func (ip ImgPuller) Enter() error { + ip.logger.Infof("I'm pretending to pull img:%s, then sleep 10s", ip.img) + time.Sleep(10 * time.Second) + ip.logger.Infof("wake up from sleep....") + return nil +} + +type ImgPusher struct { + job.DummyHandler + targetURL string + logger job.Logger +} + +func (ip ImgPusher) Enter() error { + ip.logger.Infof("I'm pretending to push img to:%s, then sleep 10s", ip.targetURL) + time.Sleep(10 * time.Second) + ip.logger.Infof("wake up from sleep....") + return nil +} + +func init() { + job.Register(jobType, Runner{}) +} + +func (r Runner) Run(je models.JobEntry) error { + err := r.init(je) + if err != nil { + return err + } + path := []string{dao.JobRunning, "pull-img", "push-img", dao.JobFinished} + for _, state := range path { + err := r.EnterState(state) + if err != nil { + r.Logger.Errorf("Error durint transition to state: %s, error: %v", state, err) + r.EnterState(dao.JobError) + break + } + } + return nil +} + +func (r *Runner) init(je models.JobEntry) error { + r.JobID = je.ID + r.InitJobSM() + err := json.Unmarshal([]byte(je.ParmsStr), &r.parm) + if err != nil { + return err + } + r.Logger = job.Logger{je.ID} + r.AddTransition(dao.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger}) + //only handle on target for now + url := r.parm.Targets[0].URL + r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger}) + r.AddTransition("push-img", dao.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, dao.JobFinished}) + return nil +} diff --git a/job/logger.go b/job/logger.go new file mode 100644 index 000000000..0ba8bfef8 --- /dev/null +++ b/job/logger.go @@ -0,0 +1,38 @@ +package job + +import ( + "fmt" + "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/utils/log" +) + +const ( + INFO = "info" + WARN = "warning" + ERR = "error" +) + +type Logger struct { + ID int64 +} + +func (l *Logger) Infof(format string, v ...interface{}) { + err := dao.AddJobLog(l.ID, INFO, fmt.Sprintf(format, v...)) + if err != nil { + log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err) + } +} + +func (l *Logger) Warningf(format string, v ...interface{}) { + err := dao.AddJobLog(l.ID, WARN, fmt.Sprintf(format, v...)) + if err != nil { + log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err) + } +} + +func (l *Logger) Errorf(format string, v ...interface{}) { + err := dao.AddJobLog(l.ID, ERR, fmt.Sprintf(format, v...)) + if err != nil { + log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err) + } +} diff --git a/job/runner.go b/job/runner.go new file mode 100644 index 000000000..66ae4de3e --- /dev/null +++ b/job/runner.go @@ -0,0 +1,36 @@ +package job + +import ( + "fmt" + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" + "sync" +) + +type JobRunner interface { + Run(je models.JobEntry) error +} + +var runners map[string]*JobRunner = make(map[string]*JobRunner) +var runnerLock = &sync.Mutex{} + +func Register(jobType string, runner JobRunner) { + runnerLock.Lock() + defer runnerLock.Unlock() + runners[jobType] = &runner + log.Debugf("runnter for job type:%s has been registered", jobType) +} + +func RunnerExists(jobType string) bool { + _, ok := runners[jobType] + return ok +} + +func run(je models.JobEntry) error { + runner, ok := runners[je.Type] + if !ok { + return fmt.Errorf("Runner for job type: %s does not exist") + } + (*runner).Run(je) + return nil +} diff --git a/job/scheduler.go b/job/scheduler.go new file mode 100644 index 000000000..41d7c7ba1 --- /dev/null +++ b/job/scheduler.go @@ -0,0 +1,12 @@ +package job + +import ( + "github.com/vmware/harbor/models" + "github.com/vmware/harbor/utils/log" +) + +func Schedule(job models.JobEntry) { + log.Infof("job: %d will be scheduled", job.ID) + //TODO: add support for cron string when needed. + go run(job) +} diff --git a/job/statemachine.go b/job/statemachine.go new file mode 100644 index 000000000..f7ec6a01c --- /dev/null +++ b/job/statemachine.go @@ -0,0 +1,104 @@ +package job + +import ( + "fmt" + "github.com/vmware/harbor/dao" + "github.com/vmware/harbor/utils/log" +) + +type StateHandler interface { + Enter() error + //Exit should be idempotent + Exit() error +} + +type DummyHandler struct { + JobID int64 +} + +func (dh DummyHandler) Enter() error { + return nil +} + +func (dh DummyHandler) Exit() error { + return nil +} + +type StatusUpdater struct { + DummyHandler + State string +} + +func (su StatusUpdater) Enter() error { + err := dao.UpdateJobStatus(su.JobID, su.State) + if err != nil { + log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err) + } + return err +} + +type JobSM struct { + JobID int64 + CurrentState string + PreviousState string + //The states that don't have to exist in transition map, such as "Error", "Canceled" + ForcedStates map[string]struct{} + Transitions map[string]map[string]struct{} + Handlers map[string]StateHandler +} + +func (sm *JobSM) EnterState(s string) error { + log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s) + targets, ok := sm.Transitions[sm.CurrentState] + _, exist := targets[s] + _, isForced := sm.ForcedStates[s] + if !exist && !isForced { + return fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s) + } + exitHandler, ok := sm.Handlers[sm.CurrentState] + if ok { + if err := exitHandler.Exit(); err != nil { + return err + } + } else { + log.Debugf("No handler found for state:%s, skip", sm.CurrentState) + } + enterHandler, ok := sm.Handlers[s] + if ok { + if err := enterHandler.Enter(); err != nil { + return err + } + } else { + log.Debugf("No handler found for state:%s, skip", s) + } + sm.PreviousState = sm.CurrentState + sm.CurrentState = s + log.Debugf("Transition succeeded, current state: %s", s) + return nil +} + +func (sm *JobSM) AddTransition(from string, to string, h StateHandler) { + _, ok := sm.Transitions[from] + if !ok { + sm.Transitions[from] = make(map[string]struct{}) + } + sm.Transitions[from][to] = struct{}{} + sm.Handlers[to] = h +} + +func (sm *JobSM) RemoveTransition(from string, to string) { + _, ok := sm.Transitions[from] + if !ok { + return + } + delete(sm.Transitions[from], to) +} + +func (sm *JobSM) InitJobSM() { + sm.Handlers = make(map[string]StateHandler) + sm.Transitions = make(map[string]map[string]struct{}) + sm.CurrentState = dao.JobPending + log.Debugf("sm.Handlers: %v", sm.Handlers) + sm.AddTransition(dao.JobPending, dao.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobRunning}) + sm.Handlers[dao.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, dao.JobError} +} diff --git a/jobservice/error.json b/jobservice/error.json new file mode 100644 index 000000000..3db697999 --- /dev/null +++ b/jobservice/error.json @@ -0,0 +1,10 @@ +{ +"job_type": "notexist", +"options": { + "whatever": "whatever" + }, +"parms": { + "test": "test" + }, +"cron_str": "" +} diff --git a/jobservice/main.go b/jobservice/main.go new file mode 100644 index 000000000..51efa3046 --- /dev/null +++ b/jobservice/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "github.com/astaxie/beego" + "github.com/vmware/harbor/dao" + _ "github.com/vmware/harbor/job/imgout" + // "github.com/vmware/harbor/utils/log" +) + +func main() { + dao.InitDB() + initRouters() + beego.Run() +} diff --git a/jobservice/my_start.sh b/jobservice/my_start.sh new file mode 100755 index 000000000..d2274be48 --- /dev/null +++ b/jobservice/my_start.sh @@ -0,0 +1,7 @@ +export MYSQL_HOST=127.0.0.1 +export MYSQL_PORT=3306 +export MYSQL_USR=root +export MYSQL_PWD=root123 +export LOG_LEVEL=debug + +./jobservice diff --git a/jobservice/router.go b/jobservice/router.go new file mode 100644 index 000000000..eb70e52fa --- /dev/null +++ b/jobservice/router.go @@ -0,0 +1,11 @@ +package main + +import ( + "github.com/vmware/harbor/api" + + "github.com/astaxie/beego" +) + +func initRouters() { + beego.Router("/api/jobs/?:id", &api.JobAPI{}) +} diff --git a/jobservice/start_db.sh b/jobservice/start_db.sh new file mode 100755 index 000000000..46695a557 --- /dev/null +++ b/jobservice/start_db.sh @@ -0,0 +1,2 @@ +#export MYQL_ROOT_PASSWORD=root123 +docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev diff --git a/jobservice/test.json b/jobservice/test.json new file mode 100644 index 000000000..2a5083d89 --- /dev/null +++ b/jobservice/test.json @@ -0,0 +1,17 @@ +{ + "job_type": "transfer_img_out", + "options": { + "whatever": "whatever" + }, + "parms": { + "secret": "mysecret", + "image": "ubuntu", + "targets": [{ + "url": "127.0.0.1:5000", + "username": "admin", + "password": "admin" + }] + + }, + "cron_str": "" +}