Merge pull request #285 from reasonerjt/job-service

Job service, add comments to pass golint
This commit is contained in:
Daniel Jiang 2016-05-30 17:43:37 +08:00
commit e3085a31bc
15 changed files with 90 additions and 186 deletions

View File

@ -1,76 +0,0 @@
package dao
import (
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
// "github.com/vmware/harbor/utils/log"
)
func AddJob(entry models.JobEntry) (int64, error) {
sql := `insert into job (job_type, status, options, parms, cron_str, creation_time, update_time) values (?,"pending",?,?,?,NOW(),NOW())`
o := orm.NewOrm()
p, err := o.Raw(sql).Prepare()
if err != nil {
return 0, err
}
r, err := p.Exec(entry.Type, entry.OptionsStr, entry.ParmsStr, entry.CronStr)
if err != nil {
return 0, err
}
id, err := r.LastInsertId()
return id, err
}
func AddJobLog(id int64, level string, message string) error {
sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())`
//log.Debugf("trying to add a log for job:%d", id)
o := orm.NewOrm()
p, err := o.Raw(sql).Prepare()
if err != nil {
return err
}
_, err = p.Exec(id, level, message)
return err
}
func UpdateJobStatus(id int64, status string) error {
o := orm.NewOrm()
sql := "update job set status=?, update_time=NOW() where job_id=?"
_, err := o.Raw(sql, status, id).Exec()
return err
}
func ListJobs() ([]models.JobEntry, error) {
o := orm.NewOrm()
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j`
var res []models.JobEntry
_, err := o.Raw(sql).QueryRows(&res)
if err != nil {
return nil, err
}
return res, err
}
func GetJob(id int64) (*models.JobEntry, error) {
o := orm.NewOrm()
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j where j.job_id = ?`
var res []models.JobEntry
p := make([]interface{}, 1)
p = append(p, id)
n, err := o.Raw(sql, p).QueryRows(&res)
if n == 0 {
return nil, err
}
return &res[0], err
}
func GetJobLogs(jobID int64) ([]models.JobLog, error) {
o := orm.NewOrm()
var res []models.JobLog
p := make([]interface{}, 1)
p = append(p, jobID)
sql := `select l.log_id, l.job_id, l.level, l.message, l.creation_time, l.update_time from job_log l where l.job_id = ?`
_, err := o.Raw(sql, p).QueryRows(&res)
return res, err
}

View File

@ -58,18 +58,22 @@ func init() {
log.Debugf("config: uiSecret: ******")
}
// MaxJobWorkers ...
func MaxJobWorkers() int {
return maxJobWorkers
}
// LocalHarborURL returns the local registry url, job service will use this URL to pull manifest and repository.
func LocalHarborURL() string {
return localURL
}
// LogDir returns the absolute path to which the log file will be written
func LogDir() string {
return logDir
}
// UISecret will return the value of secret cookie for jobsevice to call UI API.
func UISecret() string {
return uiSecret
}

View File

@ -1,13 +0,0 @@
package replication
type ImgOutParm struct {
Secret string `json:"secret"`
Image string `json:"image"`
Targets []*RegistryInfo `json:"targets"`
}
type RegistryInfo struct {
URL string `json:"url"`
Username string `json:"username"`
Password string `json:"password"`
}

View File

@ -1,36 +0,0 @@
package job
import (
"fmt"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"sync"
)
type JobRunner interface {
Run(je models.JobEntry) error
}
var runners map[string]*JobRunner = make(map[string]*JobRunner)
var runnerLock = &sync.Mutex{}
func Register(jobType string, runner JobRunner) {
runnerLock.Lock()
defer runnerLock.Unlock()
runners[jobType] = &runner
log.Debugf("runnter for job type:%s has been registered", jobType)
}
func RunnerExists(jobType string) bool {
_, ok := runners[jobType]
return ok
}
func run(je models.JobEntry) error {
runner, ok := runners[je.Type]
if !ok {
return fmt.Errorf("Runner for job type: %s does not exist")
}
(*runner).Run(je)
return nil
}

View File

@ -1,7 +1,8 @@
package job
var JobQueue chan int64 = make(chan int64)
var jobQueue = make(chan int64)
// Schedule put a job id into job queue.
func Schedule(jobID int64) {
JobQueue <- jobID
jobQueue <- jobID
}

View File

@ -18,41 +18,50 @@ type StateHandler interface {
Exit() error
}
// DummyHandler is the default implementation of StateHander interface, which has empty Enter and Exit methods.
type DummyHandler struct {
JobID int64
}
// Enter ...
func (dh DummyHandler) Enter() (string, error) {
return "", nil
}
// Exit ...
func (dh DummyHandler) Exit() error {
return nil
}
// StatusUpdater implements the StateHandler interface which updates the status of a job in DB when the job enters
// a status.
type StatusUpdater struct {
DummyHandler
State string
}
// Enter updates the status of a job and returns "_continue" status to tell state machine to move on.
// If the status is a final status it returns empty string and the state machine will be stopped.
func (su StatusUpdater) Enter() (string, error) {
err := dao.UpdateRepJobStatus(su.JobID, su.State)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
}
var next string = models.JobContinue
var next = models.JobContinue
if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished {
next = ""
}
return next, err
}
// ImgPuller was for testing
type ImgPuller struct {
DummyHandler
img string
logger *log.Logger
}
// Enter ...
func (ip ImgPuller) Enter() (string, error) {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
time.Sleep(30 * time.Second)
@ -60,12 +69,14 @@ func (ip ImgPuller) Enter() (string, error) {
return "push-img", nil
}
// ImgPusher is a statehandler for testing
type ImgPusher struct {
DummyHandler
targetURL string
logger *log.Logger
}
// Enter ...
func (ip ImgPusher) Enter() (string, error) {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
time.Sleep(30 * time.Second)

View File

@ -13,6 +13,7 @@ import (
"github.com/vmware/harbor/utils/log"
)
// RepJobParm wraps the parm of a job
type RepJobParm struct {
LocalRegURL string
TargetURL string
@ -24,7 +25,8 @@ type RepJobParm struct {
Operation string
}
type JobSM struct {
// SM is the state machine to handle job, it handles one job at a time.
type SM struct {
JobID int64
CurrentState string
PreviousState string
@ -38,9 +40,9 @@ type JobSM struct {
lock *sync.Mutex
}
// EnsterState transit the statemachine from the current state to the state in parameter.
// EnterState transit the statemachine from the current state to the state in parameter.
// It returns the next state the statemachine should tranit to.
func (sm *JobSM) EnterState(s string) (string, error) {
func (sm *SM) EnterState(s string) (string, error) {
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
@ -57,7 +59,7 @@ func (sm *JobSM) EnterState(s string) (string, error) {
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next string = models.JobContinue
var next = models.JobContinue
var err error
if ok {
if next, err = enterHandler.Enter(); err != nil {
@ -75,7 +77,7 @@ func (sm *JobSM) EnterState(s string) (string, error) {
// Start kicks off the statemachine to transit from current state to s, and moves on
// It will search the transit map if the next state is "_continue", and
// will enter error state if there's more than one possible path when next state is "_continue"
func (sm *JobSM) Start(s string) {
func (sm *SM) Start(s string) {
n, err := sm.EnterState(s)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
for len(n) > 0 && err == nil {
@ -106,7 +108,8 @@ func (sm *JobSM) Start(s string) {
}
}
func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
// AddTransition add a transition to the transition table of state machine, the handler is the handler of target state "to"
func (sm *SM) AddTransition(from string, to string, h StateHandler) {
_, ok := sm.Transitions[from]
if !ok {
sm.Transitions[from] = make(map[string]struct{})
@ -115,7 +118,8 @@ func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
sm.Handlers[to] = h
}
func (sm *JobSM) RemoveTransition(from string, to string) {
// RemoveTransition removes a transition from transition table of the state machine
func (sm *SM) RemoveTransition(from string, to string) {
_, ok := sm.Transitions[from]
if !ok {
return
@ -123,7 +127,9 @@ func (sm *JobSM) RemoveTransition(from string, to string) {
delete(sm.Transitions[from], to)
}
func (sm *JobSM) Stop(id int64) {
// Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job
// and the worker can release itself to the workerpool.
func (sm *SM) Stop(id int64) {
log.Debugf("Trying to stop the job: %d", id)
sm.lock.Lock()
defer sm.lock.Unlock()
@ -136,19 +142,20 @@ func (sm *JobSM) Stop(id int64) {
}
}
func (sm *JobSM) getDesiredState() string {
func (sm *SM) getDesiredState() string {
sm.lock.Lock()
defer sm.lock.Unlock()
return sm.desiredState
}
func (sm *JobSM) setDesiredState(s string) {
func (sm *SM) setDesiredState(s string) {
sm.lock.Lock()
defer sm.lock.Unlock()
sm.desiredState = s
}
func (sm *JobSM) Init() {
// Init initialzie the state machine, it will be called once in the lifecycle of state machine.
func (sm *SM) Init() {
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
@ -159,7 +166,8 @@ func (sm *JobSM) Init() {
}
}
func (sm *JobSM) Reset(jid int64) error {
// Reset resets the state machine so it will start handling another job.
func (sm *SM) Reset(jid int64) error {
//To ensure the new jobID is visible to the thread to stop the SM
sm.lock.Lock()
sm.JobID = jid
@ -234,7 +242,7 @@ func (sm *JobSM) Reset(jid int64) error {
return err
}
func addImgTransferTransition(sm *JobSM) error {
func addImgTransferTransition(sm *SM) error {
base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(),
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
sm.Parms.Tags, sm.Logger)
@ -250,7 +258,7 @@ func addImgTransferTransition(sm *JobSM) error {
return nil
}
func addImgDeleteTransition(sm *JobSM) error {
func addImgDeleteTransition(sm *SM) error {
deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL,
sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Logger)

View File

@ -9,6 +9,7 @@ import (
"path/filepath"
)
// NewLogger create a logger for a speicified job
func NewLogger(jobID int64) *log.Logger {
logFile := GetJobLogPath(jobID)
f, err := os.OpenFile(logFile, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
@ -19,6 +20,7 @@ func NewLogger(jobID int64) *log.Logger {
return log.New(f, log.NewTextFormatter(), log.InfoLevel)
}
// GetJobLogPath returns the absolute path in which the job log file is located.
func GetJobLogPath(jobID int64) string {
fn := fmt.Sprintf("job_%d.log", jobID)
return filepath.Join(config.LogDir(), fn)

View File

@ -12,8 +12,11 @@ type workerPool struct {
workerList []*Worker
}
// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs.
// it consists of a channel for free workers and a list to all workers
var WorkerPool *workerPool
// StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker.
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
@ -26,13 +29,16 @@ func (wp *workerPool) StopJobs(jobs []int64) {
}
}
// Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine,
// the actual work to handle the job is done via state machine.
type Worker struct {
ID int
RepJobs chan int64
SM *JobSM
SM *SM
quit chan bool
}
// Start is a loop worker gets id from its channel and handle it.
func (w *Worker) Start() {
go func() {
for {
@ -51,6 +57,7 @@ func (w *Worker) Start() {
}()
}
// Stop ...
func (w *Worker) Stop() {
go func() {
w.quit <- true
@ -75,17 +82,19 @@ func (w *Worker) handleRepJob(id int64) {
}
}
// NewWorker returns a pointer to new instance of worker
func NewWorker(id int) *Worker {
w := &Worker{
ID: id,
RepJobs: make(chan int64),
quit: make(chan bool),
SM: &JobSM{},
SM: &SM{},
}
w.SM.Init()
return w
}
// InitWorkerPool create workers according to configuration.
func InitWorkerPool() {
WorkerPool = &workerPool{
workerChan: make(chan *Worker, config.MaxJobWorkers()),
@ -99,10 +108,11 @@ func InitWorkerPool() {
}
}
// Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it.
func Dispatch() {
for {
select {
case job := <-JobQueue:
case job := <-jobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool.workerChan

View File

@ -7,7 +7,7 @@ import (
)
func initRouters() {
beego.Router("/api/replicationJobs", &api.ReplicationJob{})
beego.Router("/api/replicationJobs/:id/log", &api.ReplicationJob{}, "get:GetLog")
beego.Router("/api/replicationJobs/actions", &api.ReplicationJob{}, "post:HandleAction")
beego.Router("/api/jobs/replication", &api.ReplicationJob{})
beego.Router("/api/jobs/replication/:id/log", &api.ReplicationJob{}, "get:GetLog")
beego.Router("/api/jobs/replication/actions", &api.ReplicationJob{}, "post:HandleAction")
}

View File

@ -1,30 +0,0 @@
package models
import (
"time"
)
type JobEntry struct {
ID int64 `orm:"column(job_id)" json:"job_id"`
Type string `orm:"column(job_type)" json:"job_type"`
OptionsStr string `orm:"column(options)"`
ParmsStr string `orm:"column(parms)"`
Status string `orm:"column(status)" json:"status"`
Options map[string]interface{} `json:"options"`
Parms map[string]interface{} `json:"parms"`
Enabled int `orm:"column(enabled)" json:"enabled"`
CronStr string `orm:"column(cron_str)" json:"cron_str"`
TriggeredBy string `orm:"column(triggered_by)" json:"triggered_by"`
CreationTime time.Time `orm:"creation_time" json:"creation_time"`
UpdateTime time.Time `orm:"update_time" json:"update_time"`
Logs []JobLog `json:"logs"`
}
type JobLog struct {
ID int64 `orm:"column(log_id)" json:"log_id"`
JobID int64 `orm:"column(job_id)" json:"job_id"`
Level string `orm:"column(level)" json:"level"`
Message string `orm:"column(message)" json:"message"`
CreationTime time.Time `orm:"creation_time" json:"creation_time"`
UpdateTime time.Time `orm:"update_time" json:"update_time"`
}

View File

@ -5,20 +5,29 @@ import (
)
const (
JobPending string = "pending"
JobRunning string = "running"
JobError string = "error"
JobStopped string = "stopped"
//JobPending ...
JobPending string = "pending"
//JobRunning ...
JobRunning string = "running"
//JobError ...
JobError string = "error"
//JobStopped ...
JobStopped string = "stopped"
//JobFinished ...
JobFinished string = "finished"
//JobCanceled ...
JobCanceled string = "canceled"
// statemachine will move to next possible state based on trasition table
JobContinue string = "_continue"
//JobContinue is the status returned by statehandler to tell statemachine to move to next possible state based on trasition table.
JobContinue string = "_continue"
//RepOpTransfer represents the operation of a job to transfer repository to a remote registry/harbor instance.
RepOpTransfer string = "transfer"
RepOpDelete string = "delete"
//cookie name to contain the UI secret
//RepOpDelete represents the operation of a job to remove repository from a remote registry/harbor instance.
RepOpDelete string = "delete"
//UISecretCookie is the cookie name to contain the UI secret
UISecretCookie string = "uisecret"
)
// RepPolicy is the model for a replication policy, which associate to a project and a target (destination)
type RepPolicy struct {
ID int64 `orm:"column(id)" json:"id"`
ProjectID int64 `orm:"column(project_id)" json:"project_id"`
@ -33,6 +42,8 @@ type RepPolicy struct {
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// RepJob is the model for a replication job, which is the execution unit on job service, currently it is used to transfer/remove
// a repository to/from a remote registry instance.
type RepJob struct {
ID int64 `orm:"column(id)" json:"id"`
Status string `orm:"column(status)" json:"status"`
@ -46,6 +57,7 @@ type RepJob struct {
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// RepTarget is the model for a replication targe, i.e. destination, which wraps the endpoint URL and username/password of a remote registry.
type RepTarget struct {
ID int64 `orm:"column(id)" json:"id"`
URL string `orm:"column(url)" json:"url"`
@ -56,14 +68,17 @@ type RepTarget struct {
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
//TableName is required by by beego orm to map RepTarget to table replication_target
func (rt *RepTarget) TableName() string {
return "replication_target"
}
//TableName is required by by beego orm to map RepJob to table replication_job
func (rj *RepJob) TableName() string {
return "replication_job"
}
//TableName is required by by beego orm to map RepPolicy to table replication_policy
func (rp *RepPolicy) TableName() string {
return "replication_policy"
}

View File

@ -12,6 +12,8 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package utils contains methods to support security, cache, and webhook functions.
package utils
import (
@ -20,6 +22,7 @@ import (
"os"
)
// VerifySecret verifies the UI_SECRET cookie in a http request.
func VerifySecret(r *http.Request) bool {
secret := os.Getenv("UI_SECRET")
c, err := r.Cookie("uisecret")

View File

@ -47,6 +47,8 @@ type cookieCredential struct {
cookie *http.Cookie
}
// NewCookieCredential initialize a cookie based crendential handler, the cookie in parameter will be added to request to registry
// if this crendential is attached to a registry client.
func NewCookieCredential(c *http.Cookie) Credential {
return &cookieCredential{
cookie: c,

View File

@ -29,6 +29,7 @@ import (
)
const (
// UserAgent is used to decorate the request so it can be identified by webhook.
UserAgent string = "registry-client"
)
@ -82,6 +83,8 @@ func NewRegistryWithUsername(endpoint, username string) (*Registry, error) {
return registry, nil
}
// NewRegistryWithCredential returns a Registry instance which associate to a crendential.
// And Credential is essentially a decorator for client to docorate the request before sending it to the registry.
func NewRegistryWithCredential(endpoint string, credential auth.Credential) (*Registry, error) {
endpoint = strings.TrimSpace(endpoint)
endpoint = strings.TrimRight(endpoint, "/")