diff --git a/src/common/dao/base.go b/src/common/dao/base.go index 0a420fb1d..89e7a1479 100644 --- a/src/common/dao/base.go +++ b/src/common/dao/base.go @@ -80,6 +80,14 @@ func GetOrmer() orm.Ormer { return globalOrm } +// ClearTable is the shortcut for test cases, it should be called only in test cases. +func ClearTable(table string) error { + o := GetOrmer() + sql := fmt.Sprintf("delete from %s where 1=1", table) + _, err := o.Raw(sql).Exec() + return err +} + func paginateForRawSQL(sql string, limit, offset int64) string { return fmt.Sprintf("%s limit %d offset %d", sql, limit, offset) } diff --git a/src/common/dao/dao_test.go b/src/common/dao/dao_test.go index a7400702b..b7a18d21a 100644 --- a/src/common/dao/dao_test.go +++ b/src/common/dao/dao_test.go @@ -15,7 +15,6 @@ package dao import ( - "fmt" "os" "strconv" "testing" @@ -42,13 +41,6 @@ func execUpdate(o orm.Ormer, sql string, params ...interface{}) error { return nil } -func clearTable(table string) error { - o := GetOrmer() - sql := fmt.Sprintf("delete from %s where 1=1", table) - _, err := o.Raw(sql).Exec() - return err -} - func clearUp(username string) { var err error @@ -1659,7 +1651,7 @@ func TestAddScanJob(t *testing.T) { assert.Equal(sj1.Tag, r1.Tag) assert.Equal(sj1.Status, r1.Status) assert.Equal(sj1.Repository, r1.Repository) - err = clearTable(ScanJobTable) + err = ClearTable(models.ScanJobTable) assert.Nil(err) } @@ -1685,7 +1677,7 @@ func TestGetScanJobs(t *testing.T) { assert.Equal(1, len(r)) assert.Equal(sj2.Tag, r[0].Tag) assert.Nil(err) - err = clearTable(ScanJobTable) + err = ClearTable(models.ScanJobTable) assert.Nil(err) } @@ -1700,7 +1692,7 @@ func TestUpdateScanJobStatus(t *testing.T) { assert.Equal("newstatus", j.Status) err = UpdateScanJobStatus(id+9, "newstatus") assert.NotNil(err) - err = clearTable(ScanJobTable) + err = ClearTable(models.ScanJobTable) assert.Nil(err) } diff --git a/src/common/dao/scan_job.go b/src/common/dao/scan_job.go index 29fee7c25..9e2d06bee 100644 --- 
a/src/common/dao/scan_job.go +++ b/src/common/dao/scan_job.go @@ -22,9 +22,6 @@ import ( "time" ) -// ScanJobTable is the table name of scan jobs. -const ScanJobTable = "img_scan_job" - // AddScanJob ... func AddScanJob(job models.ScanJob) (int64, error) { o := GetOrmer() @@ -77,5 +74,5 @@ func scanJobQs(limit ...int) orm.QuerySeter { if len(limit) == 1 { l = limit[0] } - return o.QueryTable(ScanJobTable).Limit(l) + return o.QueryTable(models.ScanJobTable).Limit(l) } diff --git a/src/common/models/replication_job.go b/src/common/models/replication_job.go index b735c8a62..b02621c01 100644 --- a/src/common/models/replication_job.go +++ b/src/common/models/replication_job.go @@ -28,6 +28,12 @@ const ( RepOpDelete string = "delete" //UISecretCookie is the cookie name to contain the UI secret UISecretCookie string = "secret" + //RepTargetTable is the table name for replication targets + RepTargetTable = "replication_target" + //RepJobTable is the table name for replication jobs + RepJobTable = "replication_job" + //RepPolicyTable is table name for replication policies + RepPolicyTable = "replication_policy" ) // RepPolicy is the model for a replication policy, which associate to a project and a target (destination) @@ -132,15 +138,15 @@ func (r *RepTarget) Valid(v *validation.Validation) { //TableName is required by by beego orm to map RepTarget to table replication_target func (r *RepTarget) TableName() string { - return "replication_target" + return RepTargetTable } //TableName is required by by beego orm to map RepJob to table replication_job func (r *RepJob) TableName() string { - return "replication_job" + return RepJobTable } //TableName is required by by beego orm to map RepPolicy to table replication_policy func (r *RepPolicy) TableName() string { - return "replication_policy" + return RepPolicyTable } diff --git a/src/common/models/scan_job.go b/src/common/models/scan_job.go index b0299a861..286429cbc 100644 --- a/src/common/models/scan_job.go +++ 
b/src/common/models/scan_job.go @@ -16,6 +16,9 @@ package models import "time" +//ScanJobTable is the name of the table whose data is mapped by ScanJob struct. +const ScanJobTable = "img_scan_job" + //ScanJob is the model to represent a job for image scan in DB. type ScanJob struct { ID int64 `orm:"pk;auto;column(id)" json:"id"` @@ -29,5 +32,5 @@ type ScanJob struct { //TableName is required by by beego orm to map ScanJob to table img_scan_job func (s *ScanJob) TableName() string { - return "img_scan_job" + return ScanJobTable } diff --git a/src/common/utils/test/adminserver.go b/src/common/utils/test/adminserver.go index cb14298d2..39db32de5 100644 --- a/src/common/utils/test/adminserver.go +++ b/src/common/utils/test/adminserver.go @@ -129,3 +129,8 @@ func NewCapacityHandle() (func(http.ResponseWriter, *http.Request), error) { } return Handler(resp), nil } + +// GetDefaultConfigMap returns the defailt config map for easier modification. +func GetDefaultConfigMap() map[string]interface{} { + return adminServerDefaultConfig +} diff --git a/src/jobservice/api/replication.go b/src/jobservice/api/replication.go index 6db126af3..98a203348 100644 --- a/src/jobservice/api/replication.go +++ b/src/jobservice/api/replication.go @@ -28,7 +28,6 @@ import ( "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice/config" "github.com/vmware/harbor/src/jobservice/job" - "github.com/vmware/harbor/src/jobservice/utils" ) // ReplicationJob handles /api/replicationJobs /api/replicationJobs/:id/log @@ -126,8 +125,10 @@ func (rj *ReplicationJob) addJob(repo string, policyID int64, operation string, if err != nil { return err } + repJob := job.NewRepJob(id) + log.Debugf("Send job to scheduler, job id: %d", id) - job.Schedule(id) + job.Schedule(repJob) return nil } @@ -153,11 +154,13 @@ func (rj *ReplicationJob) HandleAction() { rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop") return } - var jobIDList []int64 + var repJobs 
[]job.Job for _, j := range jobs { - jobIDList = append(jobIDList, j.ID) + //transform the data record to job struct that can be handled by state machine. + repJob := job.NewRepJob(j.ID) + repJobs = append(repJobs, repJob) } - job.WorkerPool.StopJobs(jobIDList) + job.WorkerPools[job.ReplicationType].StopJobs(repJobs) } // GetLog gets logs of the job @@ -169,13 +172,8 @@ func (rj *ReplicationJob) GetLog() { rj.RenderError(http.StatusBadRequest, "Invalid job id") return } - logFile, err := utils.GetJobLogPath(jid) - if err != nil { - log.Errorf("failed to get log path of job %s: %v", idStr, err) - rj.RenderError(http.StatusInternalServerError, - http.StatusText(http.StatusInternalServerError)) - return - } + repJob := job.NewRepJob(jid) + logFile := repJob.LogPath() rj.Ctx.Output.Download(logFile) } @@ -191,9 +189,7 @@ func getRepoList(projectID int64) ([]string, error) { if err != nil { return repositories, err } - req.AddCookie(&http.Cookie{Name: models.UISecretCookie, Value: config.JobserviceSecret()}) - resp, err := client.Do(req) if err != nil { return repositories, err @@ -219,7 +215,6 @@ func getRepoList(projectID int64) ([]string, error) { if err = json.Unmarshal(body, &list); err != nil { return repositories, err } - repositories = append(repositories, list...) 
links := u.ParseLink(resp.Header.Get(http.CanonicalHeaderKey("link"))) diff --git a/src/jobservice/job/job_test.go b/src/jobservice/job/job_test.go index fd7ac3e49..7a8126489 100644 --- a/src/jobservice/job/job_test.go +++ b/src/jobservice/job/job_test.go @@ -14,9 +14,155 @@ package job import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/vmware/harbor/src/common" + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/common/utils/test" + "github.com/vmware/harbor/src/jobservice/config" + "os" "testing" ) -func TestMain(t *testing.T) { +var repJobID int64 + +func TestMain(m *testing.M) { + //Init config... + conf := test.GetDefaultConfigMap() + if len(os.Getenv("MYSQL_HOST")) > 0 { + conf[common.MySQLHost] = os.Getenv("MYSQL_HOST") + } + if len(os.Getenv("MYSQL_USR")) > 0 { + conf[common.MySQLUsername] = os.Getenv("MYSQL_USR") + } + if len(os.Getenv("MYSQL_PWD")) > 0 { + conf[common.MySQLPassword] = os.Getenv("MYSQL_PWD") + } + + server, err := test.NewAdminserver(conf) + if err != nil { + log.Fatalf("failed to create a mock admin server: %v", err) + } + defer server.Close() + if err := os.Setenv("ADMIN_SERVER_URL", server.URL); err != nil { + log.Fatalf("failed to set env %s: %v", "ADMIN_SERVER_URL", err) + } + secretKeyPath := "/tmp/secretkey" + _, err = test.GenerateKey(secretKeyPath) + if err != nil { + log.Fatalf("failed to generate secret key: %v", err) + } + defer os.Remove(secretKeyPath) + if err := os.Setenv("KEY_PATH", secretKeyPath); err != nil { + log.Fatalf("failed to set env %s: %v", "KEY_PATH", err) + } + if err := config.Init(); err != nil { + log.Fatalf("failed to initialize configurations: %v", err) + } + dbSetting, err := config.Database() + if err != nil { + log.Fatalf("failed to get db configurations: %v", err) + } + if err := dao.InitDatabase(dbSetting); err != nil { + log.Fatalf("failed to initialised databse, 
error: %v", err) + } + //prepare data + if err := prepareRepJobData(); err != nil { + log.Fatalf("failed to initialised databse, error: %v", err) + } + rc := m.Run() + clearRepJobData() + if rc != 0 { + os.Exit(rc) + } } +func TestRepJob(t *testing.T) { + rj := NewRepJob(repJobID) + assert := assert.New(t) + err := rj.Init() + assert.Nil(err) + assert.Equal(repJobID, rj.ID()) + assert.Equal(ReplicationType, rj.Type()) + p := fmt.Sprintf("/var/log/jobs/job_%d.log", repJobID) + assert.Equal(p, rj.LogPath()) + err = rj.UpdateStatus(models.JobRetrying) + assert.Nil(err) + j, err := dao.GetRepJob(repJobID) + assert.Equal(models.JobRetrying, j.Status) + assert.Equal(1, rj.parm.Enabled) + assert.True(rj.parm.Insecure) + rj2 := NewRepJob(99999) + err = rj2.Init() + assert.NotNil(err) +} + +func TestStatusUpdater(t *testing.T) { + assert := assert.New(t) + rj := NewRepJob(repJobID) + su := &StatusUpdater{rj, models.JobFinished} + su.Enter() + su.Exit() + j, err := dao.GetRepJob(repJobID) + assert.Nil(err) + assert.Equal(models.JobFinished, j.Status) +} + +func prepareRepJobData() error { + if err := clearRepJobData(); err != nil { + return err + } + regURL, err := config.LocalRegURL() + if err != nil { + return err + } + target := models.RepTarget{ + Name: "name", + URL: regURL, + Username: "username", + Password: "password", + } + + targetID, err := dao.AddRepTarget(target) + if err != nil { + return err + } + policy := models.RepPolicy{ + ProjectID: 1, + Enabled: 1, + TargetID: targetID, + Description: "whatever", + Name: "mypolicy", + } + policyID, err := dao.AddRepPolicy(policy) + if err != nil { + return err + } + job := models.RepJob{ + Repository: "library/ubuntu", + PolicyID: policyID, + Operation: "transfer", + TagList: []string{"12.01", "14.04", "latest"}, + } + id, err := dao.AddRepJob(job) + if err != nil { + return err + } + repJobID = id + return nil +} + +func clearRepJobData() error { + if err := dao.ClearTable(models.RepJobTable); err != nil { + return err 
+ } + if err := dao.ClearTable(models.RepPolicyTable); err != nil { + return err + } + if err := dao.ClearTable(models.RepTargetTable); err != nil { + return err + } + return nil +} diff --git a/src/jobservice/job/jobs.go b/src/jobservice/job/jobs.go new file mode 100644 index 000000000..ef33e4ad4 --- /dev/null +++ b/src/jobservice/job/jobs.go @@ -0,0 +1,168 @@ +// Copyright (c) 2017 VMware, Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package job + +import ( + "github.com/vmware/harbor/src/common/dao" + uti "github.com/vmware/harbor/src/common/utils" + "github.com/vmware/harbor/src/jobservice/config" + + "fmt" +) + +// Type is for job Type +type Type int + +const ( + // ReplicationType is the Type to identify a replication job. + ReplicationType Type = iota + // ScanType is the Type to identify a image scanning job. + ScanType +) + +func (t Type) String() string { + if ReplicationType == t { + return "Replication" + } else if ScanType == t { + return "Scan" + } else { + return "Unknown" + } +} + +//Job is abstraction for image replication and image scan jobs. 
+type Job interface { + //ID returns the id of the job + ID() int64 + Type() Type + LogPath() string + UpdateStatus(status string) error + Init() error + //Parm() interface{} +} + +// RepJobParm wraps the parm of a replication job +type RepJobParm struct { + LocalRegURL string + TargetURL string + TargetUsername string + TargetPassword string + Repository string + Tags []string + Enabled int + Operation string + Insecure bool +} + +// RepJob implements Job interface, represents a replication job. +type RepJob struct { + id int64 + parm *RepJobParm +} + +// ID returns the ID of the replication job +func (rj *RepJob) ID() int64 { + return rj.id +} + +// Type returns the type of the replication job, it should always be ReplicationType +func (rj *RepJob) Type() Type { + return ReplicationType +} + +// LogPath returns the absolute path of the particular replication job. +func (rj *RepJob) LogPath() string { + return GetJobLogPath(config.LogDir(), rj.id) +} + +// UpdateStatus ... +func (rj *RepJob) UpdateStatus(status string) error { + return dao.UpdateRepJobStatus(rj.id, status) +} + +// String ... 
+func (rj *RepJob) String() string { + return fmt.Sprintf("{JobID: %d, JobType: %v}", rj.ID(), rj.Type()) +} + +// Init prepares parm for the replication job +func (rj *RepJob) Init() error { + //init parms + job, err := dao.GetRepJob(rj.id) + if err != nil { + return fmt.Errorf("Failed to get job, error: %v", err) + } + if job == nil { + return fmt.Errorf("The job doesn't exist in DB, job id: %d", rj.id) + } + policy, err := dao.GetRepPolicy(job.PolicyID) + if err != nil { + return fmt.Errorf("Failed to get policy, error: %v", err) + } + if policy == nil { + return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID) + } + + regURL, err := config.LocalRegURL() + if err != nil { + return err + } + verify, err := config.VerifyRemoteCert() + if err != nil { + return err + } + rj.parm = &RepJobParm{ + LocalRegURL: regURL, + Repository: job.Repository, + Tags: job.TagList, + Enabled: policy.Enabled, + Operation: job.Operation, + Insecure: !verify, + } + if policy.Enabled == 0 { + //worker will cancel this job + return nil + } + target, err := dao.GetRepTarget(policy.TargetID) + if err != nil { + return fmt.Errorf("Failed to get target, error: %v", err) + } + if target == nil { + return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID) + } + rj.parm.TargetURL = target.URL + rj.parm.TargetUsername = target.Username + pwd := target.Password + + if len(pwd) != 0 { + key, err := config.SecretKey() + if err != nil { + return err + } + pwd, err = uti.ReversibleDecrypt(pwd, key) + if err != nil { + return fmt.Errorf("failed to decrypt password: %v", err) + } + } + + rj.parm.TargetPassword = pwd + return nil +} + +// NewRepJob returns a pointer to RepJob which implements the Job interface. +// Given API only gets the id, it will call this func to get a instance that can be manuevered by state machine. 
+func NewRepJob(id int64) *RepJob { + return &RepJob{id: id} +} diff --git a/src/jobservice/utils/logger.go b/src/jobservice/job/logger.go similarity index 77% rename from src/jobservice/utils/logger.go rename to src/jobservice/job/logger.go index f60896aac..aa7212e22 100644 --- a/src/jobservice/utils/logger.go +++ b/src/jobservice/job/logger.go @@ -12,25 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package utils +package job import ( "fmt" - "os" "path/filepath" "strconv" "github.com/vmware/harbor/src/common/utils/log" - "github.com/vmware/harbor/src/jobservice/config" ) // NewLogger create a logger for a speicified job -func NewLogger(jobID int64) (*log.Logger, error) { - logFile, err := GetJobLogPath(jobID) - if err != nil { - return nil, err - } +func NewLogger(j Job) (*log.Logger, error) { + logFile := j.LogPath() d := filepath.Dir(logFile) if _, err := os.Stat(d); os.IsNotExist(err) { err := os.MkdirAll(d, 0660) @@ -40,14 +35,14 @@ func NewLogger(jobID int64) (*log.Logger, error) { } f, err := os.OpenFile(logFile, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { - log.Errorf("Failed to open log file %s, the log of job %d will be printed to standard output, the error: %v", logFile, jobID, err) + log.Errorf("Failed to open log file %s, the log of job %v will be printed to standard output, the error: %v", logFile, j, err) f = os.Stdout } return log.New(f, log.NewTextFormatter(), log.InfoLevel), nil } // GetJobLogPath returns the absolute path in which the job log file is located. 
-func GetJobLogPath(jobID int64) (string, error) { +func GetJobLogPath(base string, jobID int64) string { f := fmt.Sprintf("job_%d.log", jobID) k := jobID / 1000 p := "" @@ -64,6 +59,6 @@ func GetJobLogPath(jobID int64) (string, error) { p = filepath.Join(d, p) } - p = filepath.Join(config.LogDir(), p, f) - return p, nil + p = filepath.Join(base, p, f) + return p } diff --git a/src/jobservice/job/scheduler.go b/src/jobservice/job/scheduler.go index a7c68d24e..38068f302 100644 --- a/src/jobservice/job/scheduler.go +++ b/src/jobservice/job/scheduler.go @@ -19,17 +19,17 @@ import ( "time" ) -var jobQueue = make(chan int64) +var jobQueue = make(chan Job) // Schedule put a job id into job queue. -func Schedule(jobID int64) { - jobQueue <- jobID +func Schedule(j Job) { + jobQueue <- j } // Reschedule is called by statemachine to retry a job -func Reschedule(jobID int64) { - log.Debugf("Job %d will be rescheduled in 5 minutes", jobID) +func Reschedule(j Job) { + log.Debugf("Job %v will be rescheduled in 5 minutes", j) time.Sleep(5 * time.Minute) - log.Debugf("Rescheduling job %d", jobID) - Schedule(jobID) + log.Debugf("Rescheduling job %v", j) + Schedule(j) } diff --git a/src/jobservice/job/statehandlers.go b/src/jobservice/job/statehandlers.go index 28cce6df6..56c95bca6 100644 --- a/src/jobservice/job/statehandlers.go +++ b/src/jobservice/job/statehandlers.go @@ -17,7 +17,6 @@ package job import ( "time" - "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/utils/log" ) @@ -35,19 +34,20 @@ type StateHandler interface { // StatusUpdater implements the StateHandler interface which updates the status of a job in DB when the job enters // a status. type StatusUpdater struct { - JobID int64 - State string + Job Job + Status string } // Enter updates the status of a job and returns "_continue" status to tell state machine to move on. 
// If the status is a final status it returns empty string and the state machine will be stopped. func (su StatusUpdater) Enter() (string, error) { - err := dao.UpdateRepJobStatus(su.JobID, su.State) + //err := dao.UpdateRepJobStatus(su.JobID, su.State) + err := su.Job.UpdateStatus(su.Status) if err != nil { - log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err) + log.Warningf("Failed to update state of job: %v, status: %s, error: %v", su.Job, su.Status, err) } var next = models.JobContinue - if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished { + if su.Status == models.JobStopped || su.Status == models.JobError || su.Status == models.JobFinished { next = "" } return next, err @@ -61,16 +61,16 @@ func (su StatusUpdater) Exit() error { // Retry handles a special "retrying" in which case it will update the status in DB and reschedule the job // via scheduler type Retry struct { - JobID int64 + Job Job } // Enter ... 
func (jr Retry) Enter() (string, error) { - err := dao.UpdateRepJobStatus(jr.JobID, models.JobRetrying) + err := jr.Job.UpdateStatus(models.JobRetrying) if err != nil { - log.Errorf("Failed to update state of job :%d to Retrying, error: %v", jr.JobID, err) + log.Errorf("Failed to update state of job: %v to Retrying, error: %v", jr.Job, err) } - go Reschedule(jr.JobID) + go Reschedule(jr.Job) return "", err } diff --git a/src/jobservice/job/statemachine.go b/src/jobservice/job/statemachine.go index 22bec71a3..431c86e32 100644 --- a/src/jobservice/job/statemachine.go +++ b/src/jobservice/job/statemachine.go @@ -18,31 +18,15 @@ import ( "fmt" "sync" - "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" - uti "github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice/config" "github.com/vmware/harbor/src/jobservice/replication" - "github.com/vmware/harbor/src/jobservice/utils" ) -// RepJobParm wraps the parm of a job -type RepJobParm struct { - LocalRegURL string - TargetURL string - TargetUsername string - TargetPassword string - Repository string - Tags []string - Enabled int - Operation string - Insecure bool -} - // SM is the state machine to handle job, it handles one job at a time. type SM struct { - JobID int64 + CurrentJob Job CurrentState string PreviousState string //The states that don't have to exist in transition map, such as "Error", "Canceled" @@ -51,19 +35,18 @@ type SM struct { Handlers map[string]StateHandler desiredState string Logger *log.Logger - Parms *RepJobParm lock *sync.Mutex } // EnterState transit the statemachine from the current state to the state in parameter. // It returns the next state the statemachine should tranit to. 
func (sm *SM) EnterState(s string) (string, error) { - log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s) + log.Debugf("Job: %v, transiting from State: %s, to State: %s", sm.CurrentJob, sm.CurrentState, s) targets, ok := sm.Transitions[sm.CurrentState] _, exist := targets[s] _, isForced := sm.ForcedStates[s] if !exist && !isForced { - return "", fmt.Errorf("job id: %d, transition from %s to %s does not exist", sm.JobID, sm.CurrentState, s) + return "", fmt.Errorf("job: %v, transition from %s to %s does not exist", sm.CurrentJob, sm.CurrentState, s) } exitHandler, ok := sm.Handlers[sm.CurrentState] if ok { @@ -71,7 +54,7 @@ func (sm *SM) EnterState(s string) (string, error) { return "", err } } else { - log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState) + log.Debugf("Job: %d, no handler found for state:%s, skip", sm.CurrentJob, sm.CurrentState) } enterHandler, ok := sm.Handlers[s] var next = models.JobContinue @@ -81,11 +64,11 @@ func (sm *SM) EnterState(s string) (string, error) { return "", err } } else { - log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s) + log.Debugf("Job: %v, no handler found for state:%s, skip", sm.CurrentJob, s) } sm.PreviousState = sm.CurrentState sm.CurrentState = s - log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s) + log.Debugf("Job: %v, transition succeeded, current state: %s", sm.CurrentJob, s) return next, nil } @@ -94,10 +77,10 @@ func (sm *SM) EnterState(s string) (string, error) { // will enter error state if there's more than one possible path when next state is "_continue" func (sm *SM) Start(s string) { n, err := sm.EnterState(s) - log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n) + log.Debugf("Job: %v, next state from handler: %s", sm.CurrentJob, n) for len(n) > 0 && err == nil { if d := sm.getDesiredState(); len(d) > 0 { - log.Debugf("Job id: %d. 
Desired state: %s, will ignore the next state from handler", sm.JobID, d) + log.Debugf("Job: %v, Desired state: %s, will ignore the next state from handler", sm.CurrentJob, d) n = d sm.setDesiredState("") continue @@ -106,19 +89,19 @@ func (sm *SM) Start(s string) { for n = range sm.Transitions[sm.CurrentState] { break } - log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n) + log.Debugf("Job: %v, Continue to state: %s", sm.CurrentJob, n) continue } if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 { - log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState])) + log.Errorf("Job: %v, next state is continue but there are %d possible next states in transition table", sm.CurrentJob, len(sm.Transitions[sm.CurrentState])) err = fmt.Errorf("Unable to continue") break } n, err = sm.EnterState(n) - log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n) + log.Debugf("Job: %v, next state from handler: %s", sm.CurrentJob, n) } if err != nil { - log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err) + log.Warningf("Job: %v, the statemachin will enter error state due to error: %v", sm.CurrentJob, err) sm.EnterState(models.JobError) } } @@ -144,16 +127,16 @@ func (sm *SM) RemoveTransition(from string, to string) { // Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job // and the worker can release itself to the workerpool. 
-func (sm *SM) Stop(id int64) { - log.Debugf("Trying to stop the job: %d", id) +func (sm *SM) Stop(job Job) { + log.Debugf("Trying to stop the job: %v", job) sm.lock.Lock() defer sm.lock.Unlock() //need to check if the sm switched to other job - if id == sm.JobID { + if job.ID() == sm.CurrentJob.ID() && job.Type() == sm.CurrentJob.Type() { sm.desiredState = models.JobStopped - log.Debugf("Desired state of job %d is set to stopped", id) + log.Debugf("Desired state of job %v is set to stopped", job) } else { - log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id) + log.Debugf("State machine has switched to job %v, so the action to stop job %v will be ignored", sm.CurrentJob, job) } } @@ -182,125 +165,103 @@ func (sm *SM) Init() { } } -// Reset resets the state machine so it will start handling another job. -func (sm *SM) Reset(jid int64) (err error) { - //To ensure the new jobID is visible to the thread to stop the SM +// Reset resets the state machine and after prereq checking, it will start handling the job. 
+func (sm *SM) Reset(j Job) error { + //To ensure the Job visible to the thread to stop the SM sm.lock.Lock() - sm.JobID = jid + sm.CurrentJob = j sm.desiredState = "" sm.lock.Unlock() - sm.Logger, err = utils.NewLogger(sm.JobID) - if err != nil { - return - } - //init parms - job, err := dao.GetRepJob(sm.JobID) - if err != nil { - return fmt.Errorf("Failed to get job, error: %v", err) - } - if job == nil { - return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID) - } - policy, err := dao.GetRepPolicy(job.PolicyID) - if err != nil { - return fmt.Errorf("Failed to get policy, error: %v", err) - } - if policy == nil { - return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID) - } - - regURL, err := config.LocalRegURL() + var err error + sm.Logger, err = NewLogger(j) if err != nil { return err } - verify, err := config.VerifyRemoteCert() - if err != nil { - return err - } - sm.Parms = &RepJobParm{ - LocalRegURL: regURL, - Repository: job.Repository, - Tags: job.TagList, - Enabled: policy.Enabled, - Operation: job.Operation, - Insecure: !verify, - } - if policy.Enabled == 0 { - //worker will cancel this job - return nil - } - target, err := dao.GetRepTarget(policy.TargetID) - if err != nil { - return fmt.Errorf("Failed to get target, error: %v", err) - } - if target == nil { - return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID) - } - sm.Parms.TargetURL = target.URL - sm.Parms.TargetUsername = target.Username - pwd := target.Password - - if len(pwd) != 0 { - key, err := config.SecretKey() - if err != nil { - return err - } - pwd, err = uti.ReversibleDecrypt(pwd, key) - if err != nil { - return fmt.Errorf("failed to decrypt password: %v", err) - } - } - - sm.Parms.TargetPassword = pwd - //init states handlers sm.Handlers = make(map[string]StateHandler) sm.Transitions = make(map[string]map[string]struct{}) sm.CurrentState = models.JobPending - sm.AddTransition(models.JobPending, models.JobRunning, 
StatusUpdater{sm.JobID, models.JobRunning}) - sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.JobID, models.JobRunning}) - sm.Handlers[models.JobError] = StatusUpdater{sm.JobID, models.JobError} - sm.Handlers[models.JobStopped] = StatusUpdater{sm.JobID, models.JobStopped} - sm.Handlers[models.JobRetrying] = Retry{sm.JobID} - - switch sm.Parms.Operation { - case models.RepOpTransfer: - addImgTransferTransition(sm) - case models.RepOpDelete: - addImgDeleteTransition(sm) - default: - err = fmt.Errorf("unsupported operation: %s", sm.Parms.Operation) + sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{sm.CurrentJob, models.JobRunning}) + sm.AddTransition(models.JobRetrying, models.JobRunning, StatusUpdater{sm.CurrentJob, models.JobRunning}) + sm.Handlers[models.JobError] = StatusUpdater{sm.CurrentJob, models.JobError} + sm.Handlers[models.JobStopped] = StatusUpdater{sm.CurrentJob, models.JobStopped} + sm.Handlers[models.JobRetrying] = Retry{sm.CurrentJob} + if err := sm.CurrentJob.Init(); err != nil { + return err } + if err := sm.initTransitions(); err != nil { + return err + } + return sm.kickOff() +} - return err +func (sm *SM) kickOff() error { + if repJob, ok := sm.CurrentJob.(*RepJob); ok { + if repJob.parm.Enabled == 0 { + log.Debugf("The policy of job:%v is disabled, will cancel the job", repJob) + if err := repJob.UpdateStatus(models.JobCanceled); err != nil { + log.Warningf("Failed to update status of job: %v to 'canceled', error: %v", repJob, err) + + } + } + } + sm.Start(models.JobRunning) + return nil +} + +func (sm *SM) initTransitions() error { + switch sm.CurrentJob.Type() { + case ReplicationType: + repJob, ok := sm.CurrentJob.(*RepJob) + if !ok { + //Shouldn't be here. 
+ return fmt.Errorf("The job: %v is not a type of RepJob", sm.CurrentJob) + } + jobParm := repJob.parm + if jobParm.Operation == models.RepOpTransfer { + addImgTransferTransition(sm, jobParm) + } else if jobParm.Operation == models.RepOpDelete { + addImgDeleteTransition(sm, jobParm) + } else { + return fmt.Errorf("unsupported operation: %s", jobParm.Operation) + } + case ScanType: + log.Debugf("TODO for scan job, job: %v", sm.CurrentJob) + return nil + default: + return fmt.Errorf("Unsupported job type: %v", sm.CurrentJob.Type()) + } + return nil } //for testing onlly +/* func addTestTransition(sm *SM) error { sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{img: sm.Parms.Repository, logger: sm.Logger}) return nil } +*/ -func addImgTransferTransition(sm *SM) { - base := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.JobserviceSecret(), - sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword, - sm.Parms.Insecure, sm.Parms.Tags, sm.Logger) +func addImgTransferTransition(sm *SM, parm *RepJobParm) { + base := replication.InitBaseHandler(parm.Repository, parm.LocalRegURL, config.JobserviceSecret(), + parm.TargetURL, parm.TargetUsername, parm.TargetPassword, + parm.Insecure, parm.Tags, sm.Logger) sm.AddTransition(models.JobRunning, replication.StateInitialize, &replication.Initializer{BaseHandler: base}) sm.AddTransition(replication.StateInitialize, replication.StateCheck, &replication.Checker{BaseHandler: base}) sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base}) - sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished}) + sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{sm.CurrentJob, models.JobFinished}) 
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base}) sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base}) } -func addImgDeleteTransition(sm *SM) { - deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL, - sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Parms.Insecure, sm.Logger) +func addImgDeleteTransition(sm *SM, parm *RepJobParm) { + deleter := replication.NewDeleter(parm.Repository, parm.Tags, parm.TargetURL, + parm.TargetUsername, parm.TargetPassword, parm.Insecure, sm.Logger) sm.AddTransition(models.JobRunning, replication.StateDelete, deleter) - sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.JobID, models.JobFinished}) + sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{sm.CurrentJob, models.JobFinished}) } diff --git a/src/jobservice/job/workerpool.go b/src/jobservice/job/workerpool.go index 0d8a5d314..2969a9163 100644 --- a/src/jobservice/job/workerpool.go +++ b/src/jobservice/job/workerpool.go @@ -15,29 +15,35 @@ package job import ( - "github.com/vmware/harbor/src/common/dao" + "fmt" + "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/jobservice/config" ) +// workerPool is a set of workers each worker is associate to a statemachine for handling jobs. +// it consists of a channel for free workers and a list to all workers type workerPool struct { + poolType Type workerChan chan *Worker workerList []*Worker } -// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs. -// it consists of a channel for free workers and a list to all workers -var WorkerPool *workerPool +// WorkerPools is a map contains workerpools for different types of jobs. 
+var WorkerPools map[Type]*workerPool + +//TODO: remove the hard code? +const maxScanWorker = 3 // StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker. -func (wp *workerPool) StopJobs(jobs []int64) { +func (wp *workerPool) StopJobs(jobs []Job) { log.Debugf("Works working on jobs: %v will be stopped", jobs) - for _, id := range jobs { + for _, j := range jobs { for _, w := range wp.workerList { - if w.SM.JobID == id { - log.Debugf("found a worker whose job ID is %d, will try to stop it", id) - w.SM.Stop(id) + if w.SM.CurrentJob.ID() == j.ID() { + log.Debugf("found a worker whose job ID is %d, type: %v, will try to stop it", j.ID(), j.Type()) + w.SM.Stop(j) } } } @@ -46,24 +52,31 @@ func (wp *workerPool) StopJobs(jobs []int64) { // Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine, // the actual work to handle the job is done via state machine. type Worker struct { - ID int - RepJobs chan int64 - SM *SM - quit chan bool + ID int + Type Type + Jobs chan Job + queue chan *Worker + SM *SM + quit chan bool +} + +// String ... +func (w *Worker) String() string { + return fmt.Sprintf("{ID: %d, Type: %v}", w.ID, w.Type) } // Start is a loop worker gets id from its channel and handle it. 
func (w *Worker) Start() { go func() { for { - WorkerPool.workerChan <- w + w.queue <- w select { - case jobID := <-w.RepJobs: - log.Debugf("worker: %d, will handle job: %d", w.ID, jobID) - w.handleRepJob(jobID) + case job := <-w.Jobs: + log.Debugf("worker: %v, will handle job: %v", w, job) + w.handle(job) case q := <-w.quit: if q { - log.Debugf("worker: %d, will stop.", w.ID) + log.Debugf("worker: %v, will stop.", w) return } } @@ -78,54 +91,57 @@ func (w *Worker) Stop() { }() } -func (w *Worker) handleRepJob(id int64) { - err := w.SM.Reset(id) +func (w *Worker) handle(job Job) { + err := w.SM.Reset(job) if err != nil { - log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err) - err2 := dao.UpdateRepJobStatus(id, models.JobError) + log.Errorf("Worker %v, failed to re-initialize statemachine for job: %v, error: %v", w, job, err) + err2 := job.UpdateStatus(models.JobError) if err2 != nil { - log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2) + log.Errorf("Failed to update job status to ERROR, job: %v, error:%v", job, err2) } - return - } - if w.SM.Parms.Enabled == 0 { - log.Debugf("The policy of job:%d is disabled, will cancel the job", id) - _ = dao.UpdateRepJobStatus(id, models.JobCanceled) - w.SM.Logger.Info("The job has been canceled") - } else { - w.SM.Start(models.JobRunning) } } // NewWorker returns a pointer to new instance of worker -func NewWorker(id int) *Worker { +func NewWorker(id int, wp *workerPool) *Worker { w := &Worker{ - ID: id, - RepJobs: make(chan int64), - quit: make(chan bool), - SM: &SM{}, + ID: id, + Jobs: make(chan Job), + quit: make(chan bool), + queue: wp.workerChan, + SM: &SM{}, } w.SM.Init() return w } -// InitWorkerPool create workers according to configuration. -func InitWorkerPool() error { - n, err := config.MaxJobWorkers() +// InitWorkerPools create worker pools for different types of jobs. 
+func InitWorkerPools() error { + if len(WorkerPools) > 0 { + return fmt.Errorf("worker pools have already been initialized") + } + maxRepWorker, err := config.MaxJobWorkers() if err != nil { return err } - WorkerPool = &workerPool{ + WorkerPools[ReplicationType] = createWorkerPool(maxRepWorker) + WorkerPools[ScanType] = createWorkerPool(maxScanWorker) + return nil +} + +// createWorkerPool creates a pool with n workers and starts each of them. NOTE(review): wp.poolType is never assigned here — confirm it is set elsewhere or pass the Type in. +func createWorkerPool(n int) *workerPool { + wp := &workerPool{ workerChan: make(chan *Worker, n), workerList: make([]*Worker, 0, n), } for i := 0; i < n; i++ { - worker := NewWorker(i) - WorkerPool.workerList = append(WorkerPool.workerList, worker) + worker := NewWorker(i, wp) + wp.workerList = append(wp.workerList, worker) worker.Start() - log.Debugf("worker %d started", worker.ID) + log.Debugf("worker %v started", worker) } - return nil + return wp } // Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it. @@ -133,10 +149,10 @@ func Dispatch() { for { select { case job := <-jobQueue: - go func(jobID int64) { - log.Debugf("Trying to dispatch job: %d", jobID) - worker := <-WorkerPool.workerChan - worker.RepJobs <- jobID + go func(job Job) { + log.Debugf("Trying to dispatch job: %v", job) + worker := <-WorkerPools[job.Type()].workerChan + worker.Jobs <- job }(job) } } diff --git a/src/jobservice/main.go b/src/jobservice/main.go index 48cd559e1..e62a7a0e6 100644 --- a/src/jobservice/main.go +++ b/src/jobservice/main.go @@ -42,13 +42,14 @@ func main() { } initRouters() - job.InitWorkerPool() + job.InitWorkerPools() go job.Dispatch() resumeJobs() beego.Run() } func resumeJobs() { + //TODO: may need to resume scan jobs also?
log.Debugf("Trying to resume halted jobs...") err := dao.ResetRunningJobs() if err != nil { @@ -57,8 +58,9 @@ func resumeJobs() { jobs, err := dao.GetRepJobByStatus(models.JobPending, models.JobRetrying) if err == nil { for _, j := range jobs { - log.Debugf("Resuming job: %d", j.ID) - job.Schedule(j.ID) + rj := job.NewRepJob(j.ID) + log.Debugf("Resuming job: %v", rj) + job.Schedule(rj) } } else { log.Warningf("Failed to jobs to resume, error: %v", err) diff --git a/src/jobservice/utils/utils_test.go b/src/jobservice/utils/utils_test.go deleted file mode 100644 index 2b179d5ca..000000000 --- a/src/jobservice/utils/utils_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) 2017 VMware, Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package utils - -import ( - "testing" -) - -func TestMain(t *testing.T) { -} - diff --git a/src/ui/api/repository.go b/src/ui/api/repository.go index 7dc036e63..7adcb0c5e 100644 --- a/src/ui/api/repository.go +++ b/src/ui/api/repository.go @@ -258,7 +258,7 @@ func (ra *RepositoryAPI) Delete() { } if project == nil { - log.Error("project %s not found", projectName) + log.Errorf("project %s not found", projectName) return }