diff --git a/dao/job.go b/dao/job.go deleted file mode 100644 index c5e6f45b3..000000000 --- a/dao/job.go +++ /dev/null @@ -1,76 +0,0 @@ -package dao - -import ( - "github.com/astaxie/beego/orm" - "github.com/vmware/harbor/models" - // "github.com/vmware/harbor/utils/log" -) - -func AddJob(entry models.JobEntry) (int64, error) { - - sql := `insert into job (job_type, status, options, parms, cron_str, creation_time, update_time) values (?,"pending",?,?,?,NOW(),NOW())` - o := orm.NewOrm() - p, err := o.Raw(sql).Prepare() - if err != nil { - return 0, err - } - r, err := p.Exec(entry.Type, entry.OptionsStr, entry.ParmsStr, entry.CronStr) - if err != nil { - return 0, err - } - id, err := r.LastInsertId() - return id, err -} - -func AddJobLog(id int64, level string, message string) error { - sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())` - //log.Debugf("trying to add a log for job:%d", id) - o := orm.NewOrm() - p, err := o.Raw(sql).Prepare() - if err != nil { - return err - } - _, err = p.Exec(id, level, message) - return err -} - -func UpdateJobStatus(id int64, status string) error { - o := orm.NewOrm() - sql := "update job set status=?, update_time=NOW() where job_id=?" - _, err := o.Raw(sql, status, id).Exec() - return err -} - -func ListJobs() ([]models.JobEntry, error) { - o := orm.NewOrm() - sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j` - var res []models.JobEntry - _, err := o.Raw(sql).QueryRows(&res) - if err != nil { - return nil, err - } - return res, err -} - -func GetJob(id int64) (*models.JobEntry, error) { - o := orm.NewOrm() - sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j where j.job_id = ?` - var res []models.JobEntry - p := make([]interface{}, 1) - p = append(p, id) - n, err := o.Raw(sql, p).QueryRows(&res) - if n == 0 { - return nil, err - } - return &res[0], err -} - -func GetJobLogs(jobID int64) ([]models.JobLog, error) { - o := orm.NewOrm() - var res []models.JobLog - p := make([]interface{}, 1) - p = append(p, jobID) - sql := `select l.log_id, l.job_id, l.level, l.message, l.creation_time, l.update_time from job_log l where l.job_id = ?` - _, err := o.Raw(sql, p).QueryRows(&res) - return res, err -} diff --git a/job/config/config.go b/job/config/config.go index 770d3625e..650fa1188 100644 --- a/job/config/config.go +++ b/job/config/config.go @@ -58,18 +58,22 @@ func init() { log.Debugf("config: uiSecret: ******") } +// MaxJobWorkers ... func MaxJobWorkers() int { return maxJobWorkers } +// LocalHarborURL returns the local registry url, job service will use this URL to pull manifest and repository. func LocalHarborURL() string { return localURL } +// LogDir returns the absolute path to which the log file will be written func LogDir() string { return logDir } +// UISecret will return the value of secret cookie for jobsevice to call UI API. func UISecret() string { return uiSecret } diff --git a/job/replication/parm.go b/job/replication/parm.go deleted file mode 100644 index 68deb4f2d..000000000 --- a/job/replication/parm.go +++ /dev/null @@ -1,13 +0,0 @@ -package replication - -type ImgOutParm struct { - Secret string `json:"secret"` - Image string `json:"image"` - Targets []*RegistryInfo `json:"targets"` -} - -type RegistryInfo struct { - URL string `json:"url"` - Username string `json:"username"` - Password string `json:"password"` -} diff --git a/job/runner.go b/job/runner.go deleted file mode 100644 index 66ae4de3e..000000000 --- a/job/runner.go +++ /dev/null @@ -1,36 +0,0 @@ -package job - -import ( - "fmt" - "github.com/vmware/harbor/models" - "github.com/vmware/harbor/utils/log" - "sync" -) - -type JobRunner interface { - Run(je models.JobEntry) error -} - -var runners map[string]*JobRunner = make(map[string]*JobRunner) -var runnerLock = &sync.Mutex{} - -func Register(jobType string, runner JobRunner) { - runnerLock.Lock() - defer runnerLock.Unlock() - runners[jobType] = &runner - log.Debugf("runnter for job type:%s has been registered", jobType) -} - -func RunnerExists(jobType string) bool { - _, ok := runners[jobType] - return ok -} - -func run(je models.JobEntry) error { - runner, ok := runners[je.Type] - if !ok { - return fmt.Errorf("Runner for job type: %s does not exist") - } - (*runner).Run(je) - return nil -} diff --git a/job/scheduler.go b/job/scheduler.go index 726943e47..387d92942 100644 --- a/job/scheduler.go +++ b/job/scheduler.go @@ -1,7 +1,8 @@ package job -var JobQueue chan int64 = make(chan int64) +var jobQueue = make(chan int64) +// Schedule put a job id into job queue. func Schedule(jobID int64) { - JobQueue <- jobID + jobQueue <- jobID } diff --git a/job/statehandlers.go b/job/statehandlers.go index 10c72c6b5..15907396b 100644 --- a/job/statehandlers.go +++ b/job/statehandlers.go @@ -18,41 +18,50 @@ type StateHandler interface { Exit() error } +// DummyHandler is the default implementation of StateHander interface, which has empty Enter and Exit methods. type DummyHandler struct { JobID int64 } +// Enter ... func (dh DummyHandler) Enter() (string, error) { return "", nil } +// Exit ... func (dh DummyHandler) Exit() error { return nil } +// StatusUpdater implements the StateHandler interface which updates the status of a job in DB when the job enters +// a status. type StatusUpdater struct { DummyHandler State string } +// Enter updates the status of a job and returns "_continue" status to tell state machine to move on. +// If the status is a final status it returns empty string and the state machine will be stopped. func (su StatusUpdater) Enter() (string, error) { err := dao.UpdateRepJobStatus(su.JobID, su.State) if err != nil { log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err) } - var next string = models.JobContinue + var next = models.JobContinue if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished { next = "" } return next, err } +// ImgPuller was for testing type ImgPuller struct { DummyHandler img string logger *log.Logger } +// Enter ... func (ip ImgPuller) Enter() (string, error) { ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img) time.Sleep(30 * time.Second) @@ -60,12 +69,14 @@ func (ip ImgPuller) Enter() (string, error) { return "push-img", nil } +// ImgPusher is a statehandler for testing type ImgPusher struct { DummyHandler targetURL string logger *log.Logger } +// Enter ... func (ip ImgPusher) Enter() (string, error) { ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL) time.Sleep(30 * time.Second) diff --git a/job/statemachine.go b/job/statemachine.go index 99ef38124..5c8233988 100644 --- a/job/statemachine.go +++ b/job/statemachine.go @@ -13,6 +13,7 @@ import ( "github.com/vmware/harbor/utils/log" ) +// RepJobParm wraps the parm of a job type RepJobParm struct { LocalRegURL string TargetURL string @@ -24,7 +25,8 @@ type RepJobParm struct { Operation string } -type JobSM struct { +// SM is the state machine to handle job, it handles one job at a time. +type SM struct { JobID int64 CurrentState string PreviousState string @@ -38,9 +40,9 @@ type JobSM struct { lock *sync.Mutex } -// EnsterState transit the statemachine from the current state to the state in parameter. +// EnterState transit the statemachine from the current state to the state in parameter. // It returns the next state the statemachine should tranit to. -func (sm *JobSM) EnterState(s string) (string, error) { +func (sm *SM) EnterState(s string) (string, error) { log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s) targets, ok := sm.Transitions[sm.CurrentState] _, exist := targets[s] @@ -57,7 +59,7 @@ func (sm *JobSM) EnterState(s string) (string, error) { log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState) } enterHandler, ok := sm.Handlers[s] - var next string = models.JobContinue + var next = models.JobContinue var err error if ok { if next, err = enterHandler.Enter(); err != nil { @@ -75,7 +77,7 @@ func (sm *JobSM) EnterState(s string) (string, error) { // Start kicks off the statemachine to transit from current state to s, and moves on // It will search the transit map if the next state is "_continue", and // will enter error state if there's more than one possible path when next state is "_continue" -func (sm *JobSM) Start(s string) { +func (sm *SM) Start(s string) { n, err := sm.EnterState(s) log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n) for len(n) > 0 && err == nil { @@ -106,7 +108,8 @@ func (sm *JobSM) Start(s string) { } } -func (sm *JobSM) AddTransition(from string, to string, h StateHandler) { +// AddTransition add a transition to the transition table of state machine, the handler is the handler of target state "to" +func (sm *SM) AddTransition(from string, to string, h StateHandler) { _, ok := sm.Transitions[from] if !ok { sm.Transitions[from] = make(map[string]struct{}) @@ -115,7 +118,8 @@ func (sm *JobSM) AddTransition(from string, to string, h StateHandler) { sm.Handlers[to] = h } -func (sm *JobSM) RemoveTransition(from string, to string) { +// RemoveTransition removes a transition from transition table of the state machine +func (sm *SM) RemoveTransition(from string, to string) { _, ok := sm.Transitions[from] if !ok { return @@ -123,7 +127,9 @@ func (sm *JobSM) RemoveTransition(from string, to string) { delete(sm.Transitions[from], to) } -func (sm *JobSM) Stop(id int64) { +// Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job +// and the worker can release itself to the workerpool. +func (sm *SM) Stop(id int64) { log.Debugf("Trying to stop the job: %d", id) sm.lock.Lock() defer sm.lock.Unlock() @@ -136,19 +142,20 @@ func (sm *JobSM) Stop(id int64) { } } -func (sm *JobSM) getDesiredState() string { +func (sm *SM) getDesiredState() string { sm.lock.Lock() defer sm.lock.Unlock() return sm.desiredState } -func (sm *JobSM) setDesiredState(s string) { +func (sm *SM) setDesiredState(s string) { sm.lock.Lock() defer sm.lock.Unlock() sm.desiredState = s } -func (sm *JobSM) Init() { +// Init initialzie the state machine, it will be called once in the lifecycle of state machine. +func (sm *SM) Init() { sm.lock = &sync.Mutex{} sm.Handlers = make(map[string]StateHandler) sm.Transitions = make(map[string]map[string]struct{}) @@ -159,7 +166,8 @@ func (sm *JobSM) Init() { } } -func (sm *JobSM) Reset(jid int64) error { +// Reset resets the state machine so it will start handling another job. +func (sm *SM) Reset(jid int64) error { //To ensure the new jobID is visible to the thread to stop the SM sm.lock.Lock() sm.JobID = jid @@ -234,7 +242,7 @@ func (sm *JobSM) Reset(jid int64) error { return err } -func addImgTransferTransition(sm *JobSM) error { +func addImgTransferTransition(sm *SM) error { base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(), sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Parms.Tags, sm.Logger) @@ -250,7 +258,7 @@ func addImgTransferTransition(sm *JobSM) error { return nil } -func addImgDeleteTransition(sm *JobSM) error { +func addImgDeleteTransition(sm *SM) error { deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Logger) diff --git a/job/utils/logger.go b/job/utils/logger.go index 2905e3f50..0a245753c 100644 --- a/job/utils/logger.go +++ b/job/utils/logger.go @@ -9,6 +9,7 @@ import ( "path/filepath" ) +// NewLogger create a logger for a speicified job func NewLogger(jobID int64) *log.Logger { logFile := GetJobLogPath(jobID) f, err := os.OpenFile(logFile, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660) @@ -19,6 +20,7 @@ func NewLogger(jobID int64) *log.Logger { return log.New(f, log.NewTextFormatter(), log.InfoLevel) } +// GetJobLogPath returns the absolute path in which the job log file is located. func GetJobLogPath(jobID int64) string { fn := fmt.Sprintf("job_%d.log", jobID) return filepath.Join(config.LogDir(), fn) diff --git a/job/workerpool.go b/job/workerpool.go index e55126078..0d6d2e352 100644 --- a/job/workerpool.go +++ b/job/workerpool.go @@ -12,8 +12,11 @@ type workerPool struct { workerList []*Worker } +// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs. +// it consists of a channel for free workers and a list to all workers var WorkerPool *workerPool +// StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker. func (wp *workerPool) StopJobs(jobs []int64) { log.Debugf("Works working on jobs: %v will be stopped", jobs) for _, id := range jobs { @@ -26,13 +29,16 @@ func (wp *workerPool) StopJobs(jobs []int64) { } } +// Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine, +// the actual work to handle the job is done via state machine. type Worker struct { ID int RepJobs chan int64 - SM *JobSM + SM *SM quit chan bool } +// Start is a loop worker gets id from its channel and handle it. func (w *Worker) Start() { go func() { for { @@ -51,6 +57,7 @@ func (w *Worker) Start() { }() } +// Stop ... func (w *Worker) Stop() { go func() { w.quit <- true @@ -75,17 +82,19 @@ func (w *Worker) handleRepJob(id int64) { } } +// NewWorker returns a pointer to new instance of worker func NewWorker(id int) *Worker { w := &Worker{ ID: id, RepJobs: make(chan int64), quit: make(chan bool), - SM: &JobSM{}, + SM: &SM{}, } w.SM.Init() return w } +// InitWorkerPool create workers according to configuration. func InitWorkerPool() { WorkerPool = &workerPool{ workerChan: make(chan *Worker, config.MaxJobWorkers()), @@ -99,10 +108,11 @@ func InitWorkerPool() { } } +// Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it. func Dispatch() { for { select { - case job := <-JobQueue: + case job := <-jobQueue: go func(jobID int64) { log.Debugf("Trying to dispatch job: %d", jobID) worker := <-WorkerPool.workerChan diff --git a/jobservice/router.go b/jobservice/router.go index 6a87bd995..e917fcc90 100644 --- a/jobservice/router.go +++ b/jobservice/router.go @@ -7,7 +7,7 @@ import ( ) func initRouters() { - beego.Router("/api/replicationJobs", &api.ReplicationJob{}) - beego.Router("/api/replicationJobs/:id/log", &api.ReplicationJob{}, "get:GetLog") - beego.Router("/api/replicationJobs/actions", &api.ReplicationJob{}, "post:HandleAction") + beego.Router("/api/jobs/replication", &api.ReplicationJob{}) + beego.Router("/api/jobs/replication/:id/log", &api.ReplicationJob{}, "get:GetLog") + beego.Router("/api/jobs/replication/actions", &api.ReplicationJob{}, "post:HandleAction") } diff --git a/models/job.go b/models/job.go deleted file mode 100644 index 89562894c..000000000 --- a/models/job.go +++ /dev/null @@ -1,30 +0,0 @@ -package models - -import ( - "time" -) - -type JobEntry struct { - ID int64 `orm:"column(job_id)" json:"job_id"` - Type string `orm:"column(job_type)" json:"job_type"` - OptionsStr string `orm:"column(options)"` - ParmsStr string `orm:"column(parms)"` - Status string `orm:"column(status)" json:"status"` - Options map[string]interface{} `json:"options"` - Parms map[string]interface{} `json:"parms"` - Enabled int `orm:"column(enabled)" json:"enabled"` - CronStr string `orm:"column(cron_str)" json:"cron_str"` - TriggeredBy string `orm:"column(triggered_by)" json:"triggered_by"` - CreationTime time.Time `orm:"creation_time" json:"creation_time"` - UpdateTime time.Time `orm:"update_time" json:"update_time"` - Logs []JobLog `json:"logs"` -} - -type JobLog struct { - ID int64 `orm:"column(log_id)" json:"log_id"` - JobID int64 `orm:"column(job_id)" json:"job_id"` - Level string `orm:"column(level)" json:"level"` - Message string `orm:"column(message)" json:"message"` - CreationTime time.Time `orm:"creation_time" json:"creation_time"` - UpdateTime time.Time `orm:"update_time" json:"update_time"` -} diff --git a/models/replication_job.go b/models/replication_job.go index 70907ea38..a10d26761 100644 --- a/models/replication_job.go +++ b/models/replication_job.go @@ -5,20 +5,29 @@ import ( ) const ( - JobPending string = "pending" - JobRunning string = "running" - JobError string = "error" - JobStopped string = "stopped" + //JobPending ... + JobPending string = "pending" + //JobRunning ... + JobRunning string = "running" + //JobError ... + JobError string = "error" + //JobStopped ... + JobStopped string = "stopped" + //JobFinished ... JobFinished string = "finished" + //JobCanceled ... JobCanceled string = "canceled" - // statemachine will move to next possible state based on trasition table - JobContinue string = "_continue" + //JobContinue is the status returned by statehandler to tell statemachine to move to next possible state based on trasition table. + JobContinue string = "_continue" + //RepOpTransfer represents the operation of a job to transfer repository to a remote registry/harbor instance. RepOpTransfer string = "transfer" - RepOpDelete string = "delete" - //cookie name to contain the UI secret + //RepOpDelete represents the operation of a job to remove repository from a remote registry/harbor instance. + RepOpDelete string = "delete" + //UISecretCookie is the cookie name to contain the UI secret UISecretCookie string = "uisecret" ) +// RepPolicy is the model for a replication policy, which associate to a project and a target (destination) type RepPolicy struct { ID int64 `orm:"column(id)" json:"id"` ProjectID int64 `orm:"column(project_id)" json:"project_id"` @@ -33,6 +42,8 @@ type RepPolicy struct { UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` } +// RepJob is the model for a replication job, which is the execution unit on job service, currently it is used to transfer/remove +// a repository to/from a remote registry instance. type RepJob struct { ID int64 `orm:"column(id)" json:"id"` Status string `orm:"column(status)" json:"status"` @@ -46,6 +57,7 @@ type RepJob struct { UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` } +// RepTarget is the model for a replication targe, i.e. destination, which wraps the endpoint URL and username/password of a remote registry. type RepTarget struct { ID int64 `orm:"column(id)" json:"id"` URL string `orm:"column(url)" json:"url"` @@ -56,14 +68,17 @@ type RepTarget struct { UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` } +//TableName is required by by beego orm to map RepTarget to table replication_target func (rt *RepTarget) TableName() string { return "replication_target" } +//TableName is required by by beego orm to map RepJob to table replication_job func (rj *RepJob) TableName() string { return "replication_job" } +//TableName is required by by beego orm to map RepPolicy to table replication_policy func (rp *RepPolicy) TableName() string { return "replication_policy" } diff --git a/service/utils/utils.go b/service/utils/utils.go index 8863faa09..ae54bc3d3 100644 --- a/service/utils/utils.go +++ b/service/utils/utils.go @@ -12,6 +12,8 @@ See the License for the specific language governing permissions and limitations under the License. */ + +// Package utils contains methods to support security, cache, and webhook functions. package utils import ( @@ -20,6 +22,7 @@ import ( "os" ) +// VerifySecret verifies the UI_SECRET cookie in a http request. func VerifySecret(r *http.Request) bool { secret := os.Getenv("UI_SECRET") c, err := r.Cookie("uisecret") diff --git a/utils/registry/auth/credential.go b/utils/registry/auth/credential.go index 08f9295a0..562e41974 100644 --- a/utils/registry/auth/credential.go +++ b/utils/registry/auth/credential.go @@ -47,6 +47,8 @@ type cookieCredential struct { cookie *http.Cookie } +// NewCookieCredential initialize a cookie based crendential handler, the cookie in parameter will be added to request to registry +// if this crendential is attached to a registry client. func NewCookieCredential(c *http.Cookie) Credential { return &cookieCredential{ cookie: c, diff --git a/utils/registry/registry.go b/utils/registry/registry.go index 1b944664f..974b6d2b7 100644 --- a/utils/registry/registry.go +++ b/utils/registry/registry.go @@ -29,6 +29,7 @@ import ( ) const ( + // UserAgent is used to decorate the request so it can be identified by webhook. UserAgent string = "registry-client" ) @@ -82,6 +83,8 @@ func NewRegistryWithUsername(endpoint, username string) (*Registry, error) { return registry, nil } +// NewRegistryWithCredential returns a Registry instance which associate to a crendential. +// And Credential is essentially a decorator for client to docorate the request before sending it to the registry. func NewRegistryWithCredential(endpoint string, credential auth.Credential) (*Registry, error) { endpoint = strings.TrimSpace(endpoint) endpoint = strings.TrimRight(endpoint, "/")