refactor job service (#2348)

Author: Daniel Jiang, 2017-05-22 22:33:20 -07:00, committed by GitHub
parent 4884ec7835
commit 1c441b17be
17 changed files with 532 additions and 260 deletions

View File

@ -80,6 +80,14 @@ func GetOrmer() orm.Ormer {
return globalOrm
}
// ClearTable is a shortcut for test cases; it should be called only from test code.
func ClearTable(table string) error {
o := GetOrmer()
sql := fmt.Sprintf("delete from %s where 1=1", table)
_, err := o.Raw(sql).Exec()
return err
}
func paginateForRawSQL(sql string, limit, offset int64) string {
return fmt.Sprintf("%s limit %d offset %d", sql, limit, offset)
}
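
For reference, a minimal sketch of how the two helpers above might combine in a test; the test name and query are illustrative, not part of this commit:

    package dao

    import (
        "testing"

        "github.com/astaxie/beego/orm"
        "github.com/vmware/harbor/src/common/models"
    )

    // Sketch: build a paginated raw query, run it, then wipe the table when done.
    func TestListJobsPaged(t *testing.T) {
        o := GetOrmer()
        sql := paginateForRawSQL("select id from replication_job order by id", 10, 0)
        // sql == "select id from replication_job order by id limit 10 offset 0"
        var rows []orm.Params
        if _, err := o.Raw(sql).Values(&rows); err != nil {
            t.Fatal(err)
        }
        if err := ClearTable(models.RepJobTable); err != nil {
            t.Fatal(err)
        }
    }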

View File

@ -15,7 +15,6 @@
package dao
import (
"fmt"
"os"
"strconv"
"testing"
@ -42,13 +41,6 @@ func execUpdate(o orm.Ormer, sql string, params ...interface{}) error {
return nil
}
func clearTable(table string) error {
o := GetOrmer()
sql := fmt.Sprintf("delete from %s where 1=1", table)
_, err := o.Raw(sql).Exec()
return err
}
func clearUp(username string) {
var err error
@ -1659,7 +1651,7 @@ func TestAddScanJob(t *testing.T) {
assert.Equal(sj1.Tag, r1.Tag)
assert.Equal(sj1.Status, r1.Status)
assert.Equal(sj1.Repository, r1.Repository)
err = clearTable(ScanJobTable)
err = ClearTable(models.ScanJobTable)
assert.Nil(err)
}
@ -1685,7 +1677,7 @@ func TestGetScanJobs(t *testing.T) {
assert.Equal(1, len(r))
assert.Equal(sj2.Tag, r[0].Tag)
assert.Nil(err)
err = clearTable(ScanJobTable)
err = ClearTable(models.ScanJobTable)
assert.Nil(err)
}
@ -1700,7 +1692,7 @@ func TestUpdateScanJobStatus(t *testing.T) {
assert.Equal("newstatus", j.Status)
err = UpdateScanJobStatus(id+9, "newstatus")
assert.NotNil(err)
err = clearTable(ScanJobTable)
err = ClearTable(models.ScanJobTable)
assert.Nil(err)
}

View File

@ -22,9 +22,6 @@ import (
"time"
)
// ScanJobTable is the table name of scan jobs.
const ScanJobTable = "img_scan_job"
// AddScanJob ...
func AddScanJob(job models.ScanJob) (int64, error) {
o := GetOrmer()
@ -77,5 +74,5 @@ func scanJobQs(limit ...int) orm.QuerySeter {
if len(limit) == 1 {
l = limit[0]
}
return o.QueryTable(ScanJobTable).Limit(l)
return o.QueryTable(models.ScanJobTable).Limit(l)
}

View File

@ -28,6 +28,12 @@ const (
RepOpDelete string = "delete"
//UISecretCookie is the cookie name to contain the UI secret
UISecretCookie string = "secret"
//RepTargetTable is the table name for replication targets
RepTargetTable = "replication_target"
//RepJobTable is the table name for replication jobs
RepJobTable = "replication_job"
//RepPolicyTable is the table name for replication policies
RepPolicyTable = "replication_policy"
)
// RepPolicy is the model for a replication policy, which associates a project with a target (destination)
@ -132,15 +138,15 @@ func (r *RepTarget) Valid(v *validation.Validation) {
//TableName is required by beego orm to map RepTarget to table replication_target
func (r *RepTarget) TableName() string {
return "replication_target"
return RepTargetTable
}
//TableName is required by beego orm to map RepJob to table replication_job
func (r *RepJob) TableName() string {
return "replication_job"
return RepJobTable
}
//TableName is required by beego orm to map RepPolicy to table replication_policy
func (r *RepPolicy) TableName() string {
return "replication_policy"
return RepPolicyTable
}
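
Centralizing the names lets DAO queries and test cleanup reference the same constant that TableName returns. A small illustrative sketch; the helper is hypothetical, and it assumes RepJob's status column is mapped as "status":

    // Hypothetical dao-side helper: query replication jobs via the shared
    // constant instead of repeating the "replication_job" string literal.
    func pendingRepJobs(o orm.Ormer) orm.QuerySeter {
        return o.QueryTable(models.RepJobTable).Filter("status", models.JobPending)
    }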

View File

@ -16,6 +16,9 @@ package models
import "time"
//ScanJobTable is the name of the table whose data is mapped by the ScanJob struct.
const ScanJobTable = "img_scan_job"
//ScanJob is the model to represent a job for image scan in DB.
type ScanJob struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
@ -29,5 +32,5 @@ type ScanJob struct {
//TableName is required by beego orm to map ScanJob to table img_scan_job
func (s *ScanJob) TableName() string {
return "img_scan_job"
return ScanJobTable
}

View File

@ -129,3 +129,8 @@ func NewCapacityHandle() (func(http.ResponseWriter, *http.Request), error) {
}
return Handler(resp), nil
}
// GetDefaultConfigMap returns the default config map for easier modification.
func GetDefaultConfigMap() map[string]interface{} {
return adminServerDefaultConfig
}
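
The intended use is what the jobservice tests later in this diff do: start from the default map, override only the keys a test cares about, and boot the mock admin server. A condensed sketch:

    // Sketch: customize the default config before creating a mock admin server.
    conf := test.GetDefaultConfigMap()
    conf[common.MySQLHost] = "127.0.0.1" // override only what the test needs
    server, err := test.NewAdminserver(conf)
    if err != nil {
        log.Fatalf("failed to create a mock admin server: %v", err)
    }
    defer server.Close()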

View File

@ -28,7 +28,6 @@ import (
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/config"
"github.com/vmware/harbor/src/jobservice/job"
"github.com/vmware/harbor/src/jobservice/utils"
)
// ReplicationJob handles /api/replicationJobs /api/replicationJobs/:id/log
@ -126,8 +125,10 @@ func (rj *ReplicationJob) addJob(repo string, policyID int64, operation string,
if err != nil {
return err
}
repJob := job.NewRepJob(id)
log.Debugf("Send job to scheduler, job id: %d", id)
job.Schedule(id)
job.Schedule(repJob)
return nil
}
@ -153,11 +154,13 @@ func (rj *ReplicationJob) HandleAction() {
rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop")
return
}
var jobIDList []int64
var repJobs []job.Job
for _, j := range jobs {
jobIDList = append(jobIDList, j.ID)
//transform the data record into a job struct that can be handled by the state machine.
repJob := job.NewRepJob(j.ID)
repJobs = append(repJobs, repJob)
}
job.WorkerPool.StopJobs(jobIDList)
job.WorkerPools[job.ReplicationType].StopJobs(repJobs)
}
// GetLog gets logs of the job
@ -169,13 +172,8 @@ func (rj *ReplicationJob) GetLog() {
rj.RenderError(http.StatusBadRequest, "Invalid job id")
return
}
logFile, err := utils.GetJobLogPath(jid)
if err != nil {
log.Errorf("failed to get log path of job %s: %v", idStr, err)
rj.RenderError(http.StatusInternalServerError,
http.StatusText(http.StatusInternalServerError))
return
}
repJob := job.NewRepJob(jid)
logFile := repJob.LogPath()
rj.Ctx.Output.Download(logFile)
}
@ -191,9 +189,7 @@ func getRepoList(projectID int64) ([]string, error) {
if err != nil {
return repositories, err
}
req.AddCookie(&http.Cookie{Name: models.UISecretCookie, Value: config.JobserviceSecret()})
resp, err := client.Do(req)
if err != nil {
return repositories, err
@ -219,7 +215,6 @@ func getRepoList(projectID int64) ([]string, error) {
if err = json.Unmarshal(body, &list); err != nil {
return repositories, err
}
repositories = append(repositories, list...)
links := u.ParseLink(resp.Header.Get(http.CanonicalHeaderKey("link")))

View File

@ -14,9 +14,155 @@
package job
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/jobservice/config"
"os"
"testing"
)
func TestMain(t *testing.T) {
var repJobID int64
func TestMain(m *testing.M) {
//Init config...
conf := test.GetDefaultConfigMap()
if len(os.Getenv("MYSQL_HOST")) > 0 {
conf[common.MySQLHost] = os.Getenv("MYSQL_HOST")
}
if len(os.Getenv("MYSQL_USR")) > 0 {
conf[common.MySQLUsername] = os.Getenv("MYSQL_USR")
}
if len(os.Getenv("MYSQL_PWD")) > 0 {
conf[common.MySQLPassword] = os.Getenv("MYSQL_PWD")
}
server, err := test.NewAdminserver(conf)
if err != nil {
log.Fatalf("failed to create a mock admin server: %v", err)
}
defer server.Close()
if err := os.Setenv("ADMIN_SERVER_URL", server.URL); err != nil {
log.Fatalf("failed to set env %s: %v", "ADMIN_SERVER_URL", err)
}
secretKeyPath := "/tmp/secretkey"
_, err = test.GenerateKey(secretKeyPath)
if err != nil {
log.Fatalf("failed to generate secret key: %v", err)
}
defer os.Remove(secretKeyPath)
if err := os.Setenv("KEY_PATH", secretKeyPath); err != nil {
log.Fatalf("failed to set env %s: %v", "KEY_PATH", err)
}
if err := config.Init(); err != nil {
log.Fatalf("failed to initialize configurations: %v", err)
}
dbSetting, err := config.Database()
if err != nil {
log.Fatalf("failed to get db configurations: %v", err)
}
if err := dao.InitDatabase(dbSetting); err != nil {
log.Fatalf("failed to initialised databse, error: %v", err)
}
//prepare data
if err := prepareRepJobData(); err != nil {
log.Fatalf("failed to initialised databse, error: %v", err)
}
rc := m.Run()
clearRepJobData()
if rc != 0 {
os.Exit(rc)
}
}
func TestRepJob(t *testing.T) {
rj := NewRepJob(repJobID)
assert := assert.New(t)
err := rj.Init()
assert.Nil(err)
assert.Equal(repJobID, rj.ID())
assert.Equal(ReplicationType, rj.Type())
p := fmt.Sprintf("/var/log/jobs/job_%d.log", repJobID)
assert.Equal(p, rj.LogPath())
err = rj.UpdateStatus(models.JobRetrying)
assert.Nil(err)
j, err := dao.GetRepJob(repJobID)
assert.Equal(models.JobRetrying, j.Status)
assert.Equal(1, rj.parm.Enabled)
assert.True(rj.parm.Insecure)
rj2 := NewRepJob(99999)
err = rj2.Init()
assert.NotNil(err)
}
func TestStatusUpdater(t *testing.T) {
assert := assert.New(t)
rj := NewRepJob(repJobID)
su := &StatusUpdater{rj, models.JobFinished}
su.Enter()
su.Exit()
j, err := dao.GetRepJob(repJobID)
assert.Nil(err)
assert.Equal(models.JobFinished, j.Status)
}
func prepareRepJobData() error {
if err := clearRepJobData(); err != nil {
return err
}
regURL, err := config.LocalRegURL()
if err != nil {
return err
}
target := models.RepTarget{
Name: "name",
URL: regURL,
Username: "username",
Password: "password",
}
targetID, err := dao.AddRepTarget(target)
if err != nil {
return err
}
policy := models.RepPolicy{
ProjectID: 1,
Enabled: 1,
TargetID: targetID,
Description: "whatever",
Name: "mypolicy",
}
policyID, err := dao.AddRepPolicy(policy)
if err != nil {
return err
}
job := models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
TagList: []string{"12.01", "14.04", "latest"},
}
id, err := dao.AddRepJob(job)
if err != nil {
return err
}
repJobID = id
return nil
}
func clearRepJobData() error {
if err := dao.ClearTable(models.RepJobTable); err != nil {
return err
}
if err := dao.ClearTable(models.RepPolicyTable); err != nil {
return err
}
if err := dao.ClearTable(models.RepTargetTable); err != nil {
return err
}
return nil
}

src/jobservice/job/jobs.go (new file, 168 lines)
View File

@ -0,0 +1,168 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"github.com/vmware/harbor/src/common/dao"
uti "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/jobservice/config"
"fmt"
)
// Type is for job Type
type Type int
const (
// ReplicationType is the Type to identify a replication job.
ReplicationType Type = iota
// ScanType is the Type to identify an image scanning job.
ScanType
)
func (t Type) String() string {
if ReplicationType == t {
return "Replication"
} else if ScanType == t {
return "Scan"
} else {
return "Unknown"
}
}
//Job is the abstraction for image replication and image scan jobs.
type Job interface {
//ID returns the id of the job
ID() int64
Type() Type
LogPath() string
UpdateStatus(status string) error
Init() error
//Parm() interface{}
}
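
This commit only ships RepJob; ScanType has no implementation yet. A hypothetical skeleton of what a scan job could look like against this interface (scanJob below is illustrative, not part of the commit):

    // Hypothetical scan job satisfying the Job interface.
    type scanJob struct {
        id int64
    }

    func (sj *scanJob) ID() int64       { return sj.id }
    func (sj *scanJob) Type() Type      { return ScanType }
    func (sj *scanJob) LogPath() string { return GetJobLogPath(config.LogDir(), sj.id) }
    func (sj *scanJob) UpdateStatus(status string) error {
        return dao.UpdateScanJobStatus(sj.id, status)
    }
    func (sj *scanJob) Init() error { return nil } // would load scan parameters from the DB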
// RepJobParm wraps the parameters of a replication job
type RepJobParm struct {
LocalRegURL string
TargetURL string
TargetUsername string
TargetPassword string
Repository string
Tags []string
Enabled int
Operation string
Insecure bool
}
// RepJob implements Job interface, represents a replication job.
type RepJob struct {
id int64
parm *RepJobParm
}
// ID returns the ID of the replication job
func (rj *RepJob) ID() int64 {
return rj.id
}
// Type returns the type of the replication job; it is always ReplicationType
func (rj *RepJob) Type() Type {
return ReplicationType
}
// LogPath returns the absolute path of the log file for this replication job.
func (rj *RepJob) LogPath() string {
return GetJobLogPath(config.LogDir(), rj.id)
}
// UpdateStatus ...
func (rj *RepJob) UpdateStatus(status string) error {
return dao.UpdateRepJobStatus(rj.id, status)
}
// String ...
func (rj *RepJob) String() string {
return fmt.Sprintf("{JobID: %d, JobType: %v}", rj.ID(), rj.Type())
}
// Init prepares the parm of the replication job by loading job, policy, and target data from the DB
func (rj *RepJob) Init() error {
//init parms
job, err := dao.GetRepJob(rj.id)
if err != nil {
return fmt.Errorf("Failed to get job, error: %v", err)
}
if job == nil {
return fmt.Errorf("The job doesn't exist in DB, job id: %d", rj.id)
}
policy, err := dao.GetRepPolicy(job.PolicyID)
if err != nil {
return fmt.Errorf("Failed to get policy, error: %v", err)
}
if policy == nil {
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
}
regURL, err := config.LocalRegURL()
if err != nil {
return err
}
verify, err := config.VerifyRemoteCert()
if err != nil {
return err
}
rj.parm = &RepJobParm{
LocalRegURL: regURL,
Repository: job.Repository,
Tags: job.TagList,
Enabled: policy.Enabled,
Operation: job.Operation,
Insecure: !verify,
}
if policy.Enabled == 0 {
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
if err != nil {
return fmt.Errorf("Failed to get target, error: %v", err)
}
if target == nil {
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
}
rj.parm.TargetURL = target.URL
rj.parm.TargetUsername = target.Username
pwd := target.Password
if len(pwd) != 0 {
key, err := config.SecretKey()
if err != nil {
return err
}
pwd, err = uti.ReversibleDecrypt(pwd, key)
if err != nil {
return fmt.Errorf("failed to decrypt password: %v", err)
}
}
rj.parm.TargetPassword = pwd
return nil
}
// NewRepJob returns a pointer to RepJob, which implements the Job interface.
// Given that the API only has the id, it calls this func to get an instance that can be maneuvered by the state machine.
func NewRepJob(id int64) *RepJob {
return &RepJob{id: id}
}
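
Usage mirrors the API handler earlier in this diff: callers construct the job from its DB id and hand it to the scheduler, and Init runs later when the state machine resets onto the job. A sketch:

    // Sketch: API-side scheduling; the worker's state machine calls Init later.
    rj := job.NewRepJob(id)
    job.Schedule(rj)

    // Direct consumers (e.g. tests) may initialize eagerly instead:
    rj2 := job.NewRepJob(id)
    if err := rj2.Init(); err != nil {
        log.Errorf("failed to load parameters of job %d: %v", id, err)
    }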

View File

@ -12,25 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
package job
import (
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/config"
)
// NewLogger creates a logger for the specified job
func NewLogger(jobID int64) (*log.Logger, error) {
logFile, err := GetJobLogPath(jobID)
if err != nil {
return nil, err
}
func NewLogger(j Job) (*log.Logger, error) {
logFile := j.LogPath()
d := filepath.Dir(logFile)
if _, err := os.Stat(d); os.IsNotExist(err) {
err := os.MkdirAll(d, 0660)
@ -40,14 +35,14 @@ func NewLogger(jobID int64) (*log.Logger, error) {
}
f, err := os.OpenFile(logFile, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
log.Errorf("Failed to open log file %s, the log of job %d will be printed to standard output, the error: %v", logFile, jobID, err)
log.Errorf("Failed to open log file %s, the log of job %v will be printed to standard output, the error: %v", logFile, j, err)
f = os.Stdout
}
return log.New(f, log.NewTextFormatter(), log.InfoLevel), nil
}
// GetJobLogPath returns the absolute path in which the job log file is located.
func GetJobLogPath(jobID int64) (string, error) {
func GetJobLogPath(base string, jobID int64) string {
f := fmt.Sprintf("job_%d.log", jobID)
k := jobID / 1000
p := ""
@ -64,6 +59,6 @@ func GetJobLogPath(jobID int64) (string, error) {
p = filepath.Join(d, p)
}
p = filepath.Join(config.LogDir(), p, f)
return p, nil
p = filepath.Join(base, p, f)
return p
}
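
The id is bucketed by thousands into nested directories; the loop body is partly elided in this hunk, but assuming it peels one directory level per three decimal digits of the quotient, the results would look like:

    // Illustrative outputs under the assumption above:
    GetJobLogPath("/var/log/jobs", 42)      // "/var/log/jobs/job_42.log"
    GetJobLogPath("/var/log/jobs", 12345)   // "/var/log/jobs/12/job_12345.log"
    GetJobLogPath("/var/log/jobs", 1234567) // "/var/log/jobs/1/234/job_1234567.log"

The test in job_test.go above relies on the first form, since its job ids stay below 1000.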

View File

@ -19,17 +19,17 @@ import (
"time"
)
var jobQueue = make(chan int64)
var jobQueue = make(chan Job)
// Schedule puts a job into the job queue.
func Schedule(jobID int64) {
jobQueue <- jobID
func Schedule(j Job) {
jobQueue <- j
}
// Reschedule is called by the state machine to retry a job
func Reschedule(jobID int64) {
log.Debugf("Job %d will be rescheduled in 5 minutes", jobID)
func Reschedule(j Job) {
log.Debugf("Job %v will be rescheduled in 5 minutes", j)
time.Sleep(5 * time.Minute)
log.Debugf("Rescheduling job %d", jobID)
Schedule(jobID)
log.Debugf("Rescheduling job %v", j)
Schedule(j)
}
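
Note that jobQueue is an unbuffered channel, so Schedule blocks until Dispatch (in workerpool.go, later in this diff) receives the job; that is also why Retry.Enter runs Reschedule in a goroutine rather than inline. A wiring sketch matching main.go:

    // Sketch: start the dispatcher once, then producers can schedule jobs.
    go job.Dispatch()
    job.Schedule(job.NewRepJob(42)) // returns once the dispatcher takes it off the queue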

View File

@ -17,7 +17,6 @@ package job
import (
"time"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
)
@ -35,19 +34,20 @@ type StateHandler interface {
// StatusUpdater implements the StateHandler interface; it updates the status of a job in the DB when the job enters
// a state.
type StatusUpdater struct {
JobID int64
State string
Job Job
Status string
}
// Enter updates the status of a job and returns the "_continue" status to tell the state machine to move on.
// If the status is a final one, it returns an empty string and the state machine will stop.
func (su StatusUpdater) Enter() (string, error) {
err := dao.UpdateRepJobStatus(su.JobID, su.State)
//err := dao.UpdateRepJobStatus(su.JobID, su.State)
err := su.Job.UpdateStatus(su.Status)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
log.Warningf("Failed to update state of job: %v, status: %s, error: %v", su.Job, su.Status, err)
}
var next = models.JobContinue
if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished {
if su.Status == models.JobStopped || su.Status == models.JobError || su.Status == models.JobFinished {
next = ""
}
return next, err
@ -61,16 +61,16 @@ func (su StatusUpdater) Exit() error {
// Retry handles the special "retrying" state: it updates the status in the DB and reschedules the job
// via the scheduler
type Retry struct {
JobID int64
Job Job
}
// Enter ...
func (jr Retry) Enter() (string, error) {
err := dao.UpdateRepJobStatus(jr.JobID, models.JobRetrying)
err := jr.Job.UpdateStatus(models.JobRetrying)
if err != nil {
log.Errorf("Failed to update state of job :%d to Retrying, error: %v", jr.JobID, err)
log.Errorf("Failed to update state of job: %v to Retrying, error: %v", jr.Job, err)
}
go Reschedule(jr.JobID)
go Reschedule(jr.Job)
return "", err
}

View File

@ -18,31 +18,15 @@ import (
"fmt"
"sync"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
uti "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/config"
"github.com/vmware/harbor/src/jobservice/replication"
"github.com/vmware/harbor/src/jobservice/utils"
)
// RepJobParm wraps the parm of a job
type RepJobParm struct {
LocalRegURL string
TargetURL string
TargetUsername string
TargetPassword string
Repository string
Tags []string
Enabled int
Operation string
Insecure bool
}
// SM is the state machine to handle job, it handles one job at a time.
type SM struct {
JobID int64
CurrentJob Job
CurrentState string
PreviousState string
//The states that don't have to exist in transition map, such as "Error", "Canceled"
@ -51,19 +35,18 @@ type SM struct {
Handlers map[string]StateHandler
desiredState string
Logger *log.Logger
Parms *RepJobParm
lock *sync.Mutex
}
// EnterState transitions the state machine from the current state to the state given as the parameter.
// It returns the next state the state machine should transition to.
func (sm *SM) EnterState(s string) (string, error) {
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
log.Debugf("Job: %v, transiting from State: %s, to State: %s", sm.CurrentJob, sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
_, isForced := sm.ForcedStates[s]
if !exist && !isForced {
return "", fmt.Errorf("job id: %d, transition from %s to %s does not exist", sm.JobID, sm.CurrentState, s)
return "", fmt.Errorf("job: %v, transition from %s to %s does not exist", sm.CurrentJob, sm.CurrentState, s)
}
exitHandler, ok := sm.Handlers[sm.CurrentState]
if ok {
@ -71,7 +54,7 @@ func (sm *SM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
log.Debugf("Job: %d, no handler found for state:%s, skip", sm.CurrentJob, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next = models.JobContinue
@ -81,11 +64,11 @@ func (sm *SM) EnterState(s string) (string, error) {
return "", err
}
} else {
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s)
log.Debugf("Job: %v, no handler found for state:%s, skip", sm.CurrentJob, s)
}
sm.PreviousState = sm.CurrentState
sm.CurrentState = s
log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s)
log.Debugf("Job: %v, transition succeeded, current state: %s", sm.CurrentJob, s)
return next, nil
}
@ -94,10 +77,10 @@ func (sm *SM) EnterState(s string) (string, error) {
// will enter error state if there's more than one possible path when next state is "_continue"
func (sm *SM) Start(s string) {
n, err := sm.EnterState(s)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
log.Debugf("Job: %v, next state from handler: %s", sm.CurrentJob, n)
for len(n) > 0 && err == nil {
if d := sm.getDesiredState(); len(d) > 0 {
log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d)
log.Debugf("Job: %v, Desired state: %s, will ignore the next state from handler", sm.CurrentJob, d)
n = d
sm.setDesiredState("")
continue
@ -106,19 +89,19 @@ func (sm *SM) Start(s string) {
for n = range sm.Transitions[sm.CurrentState] {
break
}
log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n)
log.Debugf("Job: %v, Continue to state: %s", sm.CurrentJob, n)
continue
}
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState]))
log.Errorf("Job: %v, next state is continue but there are %d possible next states in transition table", sm.CurrentJob, len(sm.Transitions[sm.CurrentState]))
err = fmt.Errorf("Unable to continue")
break
}
n, err = sm.EnterState(n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
log.Debugf("Job: %v, next state from handler: %s", sm.CurrentJob, n)
}
if err != nil {
log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err)
log.Warningf("Job: %v, the statemachin will enter error state due to error: %v", sm.CurrentJob, err)
sm.EnterState(models.JobError)
}
}
@ -144,16 +127,16 @@ func (sm *SM) RemoveTransition(from string, to string) {
// Stop will set the desired state as "stopped" such that when the next transition happens the state machine will stop handling the current job
// and the worker can release itself to the workerpool.
func (sm *SM) Stop(id int64) {
log.Debugf("Trying to stop the job: %d", id)
func (sm *SM) Stop(job Job) {
log.Debugf("Trying to stop the job: %v", job)
sm.lock.Lock()
defer sm.lock.Unlock()
//need to check if the sm switched to other job
if id == sm.JobID {
if job.ID() == sm.CurrentJob.ID() && job.Type() == sm.CurrentJob.Type() {
sm.desiredState = models.JobStopped
log.Debugf("Desired state of job %d is set to stopped", id)
log.Debugf("Desired state of job %v is set to stopped", job)
} else {
log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id)
log.Debugf("State machine has switched to job %v, so the action to stop job %v will be ignored", sm.CurrentJob, job)
}
}
@ -182,125 +165,103 @@ func (sm *SM) Init() {
}
}
// Reset resets the state machine so it will start handling another job.
func (sm *SM) Reset(jid int64) (err error) {
//To ensure the new jobID is visible to the thread to stop the SM
// Reset resets the state machine and, after prerequisite checks, starts handling the job.
func (sm *SM) Reset(j Job) error {
//To ensure the Job is visible to the thread that stops the SM
sm.lock.Lock()
sm.JobID = jid
sm.CurrentJob = j
sm.desiredState = ""
sm.lock.Unlock()
sm.Logger, err = utils.NewLogger(sm.JobID)
if err != nil {
return
}
//init parms
job, err := dao.GetRepJob(sm.JobID)
if err != nil {
return fmt.Errorf("Failed to get job, error: %v", err)
}
if job == nil {
return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID)
}
policy, err := dao.GetRepPolicy(job.PolicyID)
if err != nil {
return fmt.Errorf("Failed to get policy, error: %v", err)
}
if policy == nil {
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
}
regURL, err := config.LocalRegURL()
var err error
sm.Logger, err = NewLogger(j)
if err != nil {
return err
}
verify, err := config.VerifyRemoteCert()
if err != nil {
return err
}
sm.Parms = &RepJobParm{
LocalRegURL: regURL,
Repository: job.Repository,
Tags: job.TagList,
Enabled: policy.Enabled,
Operation: job.Operation,
Insecure: !verify,
}
if policy.Enabled == 0 {
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
if err != nil {
return fmt.Errorf("Failed to get target, error: %v", err)
}
if target == nil {
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
}
sm.Parms.TargetURL = target.URL
sm.Parms.TargetUsername = target.Username
pwd := target.Password
if len(pwd) != 0 {
key, err := config.SecretKey()
if err != nil {
return err
}
pwd, err = uti.ReversibleDecrypt(pwd, key)
if err != nil {
return fmt.Errorf("failed to decrypt password: %v", err)
}
}
sm.Parms.TargetPassword = pwd
//init states handlers
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning})
sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{sm.JobID, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{sm.JobID, models.JobStopped}
sm.Handlers[models.JobRetrying] = Retry{sm.JobID}
switch sm.Parms.Operation {
case models.RepOpTransfer:
addImgTransferTransition(sm)
case models.RepOpDelete:
addImgDeleteTransition(sm)
default:
err = fmt.Errorf("unsupported operation: %s", sm.Parms.Operation)
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{sm.CurrentJob, models.JobRunning})
sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.CurrentJob, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{sm.CurrentJob, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{sm.CurrentJob, models.JobStopped}
sm.Handlers[models.JobRetrying] = Retry{sm.CurrentJob}
if err := sm.CurrentJob.Init(); err != nil {
return err
}
if err := sm.initTransitions(); err != nil {
return err
}
return sm.kickOff()
}
return err
func (sm *SM) kickOff() error {
if repJob, ok := sm.CurrentJob.(*RepJob); ok {
if repJob.parm.Enabled == 0 {
log.Debugf("The policy of job:%v is disabled, will cancel the job", repJob)
if err := repJob.UpdateStatus(models.JobCanceled); err != nil {
log.Warningf("Failed to update status of job: %v to 'canceled', error: %v", repJob, err)
}
//a disabled job must not be started; this mirrors the pre-refactor worker behavior
return nil
}
}
sm.Start(models.JobRunning)
return nil
}
func (sm *SM) initTransitions() error {
switch sm.CurrentJob.Type() {
case ReplicationType:
repJob, ok := sm.CurrentJob.(*RepJob)
if !ok {
//Shouldn't be here.
return fmt.Errorf("The job: %v is not a type of RepJob", sm.CurrentJob)
}
jobParm := repJob.parm
if jobParm.Operation == models.RepOpTransfer {
addImgTransferTransition(sm, jobParm)
} else if jobParm.Operation == models.RepOpDelete {
addImgDeleteTransition(sm, jobParm)
} else {
return fmt.Errorf("unsupported operation: %s", jobParm.Operation)
}
case ScanType:
log.Debugf("TODO for scan job, job: %v", sm.CurrentJob)
return nil
default:
return fmt.Errorf("Unsupported job type: %v", sm.CurrentJob.Type())
}
return nil
}
//for testing only
/*
func addTestTransition(sm *SM) error {
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{img: sm.Parms.Repository, logger: sm.Logger})
return nil
}
*/
func addImgTransferTransition(sm *SM) {
base := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.JobserviceSecret(),
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
sm.Parms.Insecure, sm.Parms.Tags, sm.Logger)
func addImgTransferTransition(sm *SM, parm *RepJobParm) {
base := replication.InitBaseHandler(parm.Repository, parm.LocalRegURL, config.JobserviceSecret(),
parm.TargetURL, parm.TargetUsername, parm.TargetPassword,
parm.Insecure, parm.Tags, sm.Logger)
sm.AddTransition(models.JobRunning, replication.StateInitialize, &replication.Initializer{BaseHandler: base})
sm.AddTransition(replication.StateInitialize, replication.StateCheck, &replication.Checker{BaseHandler: base})
sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.CurrentJob, models.JobFinished})
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base})
sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
}
func addImgDeleteTransition(sm *SM) {
deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL,
sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Parms.Insecure, sm.Logger)
func addImgDeleteTransition(sm *SM, parm *RepJobParm) {
deleter := replication.NewDeleter(parm.Repository, parm.Tags, parm.TargetURL,
parm.TargetUsername, parm.TargetPassword, parm.Insecure, sm.Logger)
sm.AddTransition(models.JobRunning, replication.StateDelete, deleter)
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished})
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.CurrentJob, models.JobFinished})
}
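
Each edge pairs a target state with a StateHandler, and the string returned by Enter drives the next hop: models.JobContinue follows the single outgoing edge, an empty string stops the machine. A minimal hypothetical handler and flow, in the spirit of the commented-out test transition above:

    // Hypothetical no-op handler: tell the machine to continue along the
    // single outgoing edge of the current state.
    type noopHandler struct{}

    func (h noopHandler) Enter() (string, error) { return models.JobContinue, nil }
    func (h noopHandler) Exit() error            { return nil }

    // Hypothetical flow: running -> cleanup -> finished.
    func addCleanupTransition(sm *SM) {
        sm.AddTransition(models.JobRunning, "cleanup", noopHandler{})
        sm.AddTransition("cleanup", models.JobFinished, &StatusUpdater{sm.CurrentJob, models.JobFinished})
    }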

View File

@ -15,29 +15,35 @@
package job
import (
"github.com/vmware/harbor/src/common/dao"
"fmt"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/config"
)
// workerPool is a set of workers; each worker is associated with a state machine for handling jobs.
// It consists of a channel for free workers and a list of all workers
type workerPool struct {
poolType Type
workerChan chan *Worker
workerList []*Worker
}
// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs.
// it consists of a channel for free workers and a list to all workers
var WorkerPool *workerPool
// WorkerPools is a map that contains the worker pools for different types of jobs.
var WorkerPools = make(map[Type]*workerPool)
//TODO: remove the hard code?
const maxScanWorker = 3
// StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by a worker.
func (wp *workerPool) StopJobs(jobs []int64) {
func (wp *workerPool) StopJobs(jobs []Job) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
for _, j := range jobs {
for _, w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
w.SM.Stop(id)
if w.SM.CurrentJob.ID() == j.ID() {
log.Debugf("found a worker whose job ID is %d, type: %v, will try to stop it", j.ID(), j.Type())
w.SM.Stop(j)
}
}
}
@ -46,24 +52,31 @@ func (wp *workerPool) StopJobs(jobs []int64) {
// Worker consists of a job channel from which the worker gets the next job to handle, and a pointer to a state machine;
// the actual work to handle the job is done via the state machine.
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
ID int
Type Type
Jobs chan Job
queue chan *Worker
SM *SM
quit chan bool
}
// String ...
func (w *Worker) String() string {
return fmt.Sprintf("{ID: %d, Type: %v}", w.ID, w.Type)
}
// Start is a loop in which the worker gets a job from its channel and handles it.
func (w *Worker) Start() {
go func() {
for {
WorkerPool.workerChan <- w
w.queue <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case job := <-w.Jobs:
log.Debugf("worker: %v, will handle job: %v", w, job)
w.handle(job)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
log.Debugf("worker: %v, will stop.", w)
return
}
}
@ -78,54 +91,57 @@ func (w *Worker) Stop() {
}()
}
func (w *Worker) handleRepJob(id int64) {
err := w.SM.Reset(id)
func (w *Worker) handle(job Job) {
err := w.SM.Reset(job)
if err != nil {
log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err)
err2 := dao.UpdateRepJobStatus(id, models.JobError)
log.Errorf("Worker %v, failed to re-initialize statemachine for job: %v, error: %v", w, job, err)
err2 := job.UpdateStatus(models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2)
log.Errorf("Failed to update job status to ERROR, job: %v, error:%v", job, err2)
}
return
}
if w.SM.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled, will cancel the job", id)
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
w.SM.Logger.Info("The job has been canceled")
} else {
w.SM.Start(models.JobRunning)
}
}
// NewWorker returns a pointer to new instance of worker
func NewWorker(id int) *Worker {
func NewWorker(id int, wp *workerPool) *Worker {
w := &Worker{
ID: id,
RepJobs: make(chan int64),
quit: make(chan bool),
SM: &SM{},
ID: id,
Jobs: make(chan Job),
quit: make(chan bool),
queue: wp.workerChan,
SM: &SM{},
}
w.SM.Init()
return w
}
// InitWorkerPool create workers according to configuration.
func InitWorkerPool() error {
n, err := config.MaxJobWorkers()
// InitWorkerPools creates worker pools for different types of jobs.
func InitWorkerPools() error {
if len(WorkerPools) > 0 {
return fmt.Errorf("The WorkerPool map has been initialised")
}
maxRepWorker, err := config.MaxJobWorkers()
if err != nil {
return err
}
WorkerPool = &workerPool{
WorkerPools[ReplicationType] = createWorkerPool(maxRepWorker)
WorkerPools[ScanType] = createWorkerPool(maxScanWorker)
return nil
}
//createWorkerPool creates a pool with n workers
func createWorkerPool(n int) *workerPool {
wp := &workerPool{
workerChan: make(chan *Worker, n),
workerList: make([]*Worker, 0, n),
}
for i := 0; i < n; i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList, worker)
worker := NewWorker(i, wp)
wp.workerList = append(wp.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
log.Debugf("worker %v started", worker)
}
return nil
return wp
}
// Dispatch listens to the jobQueue of the job service, picks a free worker from the matching worker pool, and assigns the job to it.
@ -133,10 +149,10 @@ func Dispatch() {
for {
select {
case job := <-jobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool.workerChan
worker.RepJobs <- jobID
go func(job Job) {
log.Debugf("Trying to dispatch job: %v", job)
worker := <-WorkerPools[job.Type()].workerChan
worker.Jobs <- job
}(job)
}
}

View File

@ -42,13 +42,14 @@ func main() {
}
initRouters()
job.InitWorkerPool()
job.InitWorkerPools()
go job.Dispatch()
resumeJobs()
beego.Run()
}
func resumeJobs() {
//TODO: may need to resume scan jobs also?
log.Debugf("Trying to resume halted jobs...")
err := dao.ResetRunningJobs()
if err != nil {
@ -57,8 +58,9 @@ func resumeJobs() {
jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying)
if err == nil {
for _, j := range jobs {
log.Debugf("Resuming job: %d", j.ID)
job.Schedule(j.ID)
rj := job.NewRepJob(j.ID)
log.Debugf("Resuming job: %v", rj)
job.Schedule(rj)
}
} else {
log.Warningf("Failed to jobs to resume, error: %v", err)

View File

@ -1,22 +0,0 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"testing"
)
func TestMain(t *testing.T) {
}

View File

@ -258,7 +258,7 @@ func (ra *RepositoryAPI) Delete() {
}
if project == nil {
log.Error("project %s not found", projectName)
log.Errorf("project %s not found", projectName)
return
}