job service rework

This commit is contained in:
Tan Jiang 2016-05-10 19:38:50 +08:00
parent f0a2bc879a
commit 4541ed27f0
15 changed files with 731 additions and 97 deletions

View File

@ -103,19 +103,41 @@ create table access_log (
FOREIGN KEY (project_id) REFERENCES project (project_id)
);
create table job (
job_id int NOT NULL AUTO_INCREMENT,
job_type varchar(64) NOT NULL,
status varchar(64) NOT NULL,
options text,
parms text,
create table replication_policy (
id int NOT NULL AUTO_INCREMENT,
name varchar(256),
project_id int NOT NULL,
target_id int NOT NULL,
enabled tinyint(1) NOT NULL DEFAULT 1,
description text,
cron_str varchar(256),
triggered_by varchar(64),
creation_time timestamp,
update_time timestamp,
PRIMARY KEY (job_id)
);
start_time timestamp,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table replication_target (
id int NOT NULL AUTO_INCREMENT,
name varchar(64),
url varchar(64),
username varchar(40),
password varchar(40),
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table replication_job (
id int NOT NULL AUTO_INCREMENT,
status varchar(64) NOT NULL,
policy_id int NOT NULL,
repository varchar(256) NOT NULL,
operation varchar(64) NOT NULL,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table job_log (
log_id int NOT NULL AUTO_INCREMENT,
@ -125,7 +147,7 @@ create table job_log (
creation_time timestamp,
update_time timestamp,
PRIMARY KEY (log_id),
FOREIGN KEY (job_id) REFERENCES job (job_id)
FOREIGN KEY (job_id) REFERENCES replication_job (id)
);
create table properties (

View File

@ -1,5 +1,6 @@
package api
/*
import (
"encoding/json"
"fmt"
@ -15,6 +16,7 @@ type JobAPI struct {
BaseAPI
}
func (ja *JobAPI) Post() {
var je models.JobEntry
ja.DecodeJSONReq(&je)
@ -82,4 +84,4 @@ func (ja *JobAPI) Get() {
ja.Data["json"] = jobs
}
ja.ServeJSON()
}
}*/

106
api/replication.go Normal file
View File

@ -0,0 +1,106 @@
package api
import (
"encoding/json"
"fmt"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"io/ioutil"
"net/http"
"net/http/httputil"
"os"
"strconv"
"strings"
)
type ReplicationJob struct {
BaseAPI
}
type ReplicationReq struct {
PolicyID int64 `json:"policy_id"`
}
func (rj *ReplicationJob) Post() {
var data ReplicationReq
rj.DecodeJSONReq(&data)
log.Debugf("data: %+v", data)
p, err := dao.GetRepPolicy(data.PolicyID)
if err != nil {
log.Errorf("Failed to get policy, error: %v", err)
rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID))
return
}
repoList, err := getRepoList(p.ProjectID)
if err != nil {
log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err)
rj.RenderError(http.StatusInternalServerError, err.Error())
return
}
log.Debugf("repo list: %v", repoList)
for _, repo := range repoList {
j := models.RepJob{
Repository: repo,
PolicyID: data.PolicyID,
Operation: models.RepOpTransfer,
}
log.Debugf("Creating job for repo: %s, policy: %d", repo, data.PolicyID)
id, err := dao.AddRepJob(j)
if err != nil {
log.Errorf("Failed to insert job record, error: %v", err)
rj.RenderError(http.StatusInternalServerError, err.Error())
return
}
log.Debugf("Send job to scheduler, job id: %d", id)
job.Schedule(id)
}
}
// calls the api from UI to get repo list
func getRepoList(projectID int64) ([]string, error) {
uiURL := os.Getenv("UI_URL")
if len(uiURL) == 0 {
uiURL = "ui"
}
if !strings.HasSuffix(uiURL, "/") {
uiURL += "/"
}
//TODO:Use secret key instead
uiUser := os.Getenv("UI_USR")
if len(uiUser) == 0 {
uiUser = "admin"
}
uiPwd := os.Getenv("UI_PWD")
if len(uiPwd) == 0 {
uiPwd = "Harbor12345"
}
client := &http.Client{}
req, err := http.NewRequest("GET", uiURL+"api/repositories?project_id="+strconv.Itoa(int(projectID)), nil)
if err != nil {
log.Errorf("Error when creating request: %v")
return nil, err
}
req.SetBasicAuth(uiUser, uiPwd)
resp, err := client.Do(req)
if err != nil {
log.Errorf("Error when calling UI api to get repositories, error: %v", err)
return nil, err
}
if resp.StatusCode != http.StatusOK {
log.Errorf("Unexpected status code: %d", resp.StatusCode)
dump, _ := httputil.DumpResponse(resp, true)
log.Debugf("response: %q", dump)
return nil, fmt.Errorf("Unexpected status code when getting repository list: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("Failed to read the response body, error: %v")
return nil, err
}
var repoList []string
err = json.Unmarshal(body, &repoList)
return repoList, err
}

View File

@ -20,11 +20,9 @@ import (
"testing"
"time"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/models"
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
func execUpdate(o orm.Ormer, sql string, params interface{}) error {
@ -660,3 +658,227 @@ func TestDeleteUser(t *testing.T) {
t.Errorf("user is not nil after deletion, user: %+v", user)
}
}
var targetID, policyID, jobID int64
func TestAddRepTarget(t *testing.T) {
target := models.RepTarget{
URL: "127.0.0.1:5000",
Username: "admin",
Password: "admin",
}
//_, err := AddRepTarget(target)
id, err := AddRepTarget(target)
t.Logf("added target, id: %d", id)
if err != nil {
t.Errorf("Error occurred in AddRepTarget: %v", err)
} else {
targetID = id
}
id2 := id + 99
tgt, err := GetRepTarget(id2)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id2)
}
if tgt != nil {
t.Errorf("There should not be a target with id: %d", id2)
}
tgt, err = GetRepTarget(id)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id)
}
if tgt == nil {
t.Errorf("Unable to find a target with id: %d", id)
}
if tgt.URL != "127.0.0.1:5000" {
t.Errorf("Unexpected url in target: %s, expected 127.0.0.1:5000", tgt.URL)
}
if tgt.Username != "admin" {
t.Errorf("Unexpected username in target: %s, expected admin", tgt.Username)
}
}
func TestAddRepPolicy(t *testing.T) {
policy := models.RepPolicy{
ProjectID: 1,
Enabled: 1,
TargetID: targetID,
Description: "whatever",
Name: "mypolicy",
}
id, err := AddRepPolicy(policy)
t.Logf("added policy, id: %d", id)
if err != nil {
t.Errorf("Error occurred in AddRepPolicy: %v", err)
} else {
policyID = id
}
p, err := GetRepPolicy(id)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, id)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", id)
}
if p.Name != "mypolicy" || p.TargetID != targetID || p.Enabled != 1 || p.Description != "whatever" {
t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Enabled: 1, Description: whatever;\n result: Name: %s, TargetID: %d, Enabled: %d, Description: %s",
targetID, p.Name, p.TargetID, p.Enabled, p.Description)
}
var tm time.Time = time.Now().AddDate(0, 0, -1)
if !p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
}
func TestDisableRepPolicy(t *testing.T) {
err := DisableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 1 {
t.Errorf("The Enabled value of replication policy is still 1 after disabled, id: %d", policyID)
}
}
func TestEnableRepPolicy(t *testing.T) {
err := EnableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 0 {
t.Errorf("The Enabled value of replication policy is still 0 after disabled, id: %d", policyID)
}
}
func TestAddRepPolicy2(t *testing.T) {
policy2 := models.RepPolicy{
ProjectID: 3,
Enabled: 0,
TargetID: 3,
Description: "whatever",
Name: "mypolicy",
}
id, err := AddRepPolicy(policy2)
t.Logf("added policy, id: %d", id)
if err != nil {
t.Errorf("Error occurred in AddRepPolicy: %v", err)
}
p, err := GetRepPolicy(id)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, id)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", id)
}
var tm time.Time
if p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
}
func TestAddRepJob(t *testing.T) {
job := models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
}
id, err := AddRepJob(job)
if err != nil {
t.Errorf("Error occurred in AddRepJob: %v", err)
} else {
jobID = id
}
j, err := GetRepJob(id)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", id)
}
if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" {
t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, "+
"but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID)
}
}
func TestUpdateRepJobStatus(t *testing.T) {
err := UpdateRepJobStatus(jobID, models.JobFinished)
if err != nil {
t.Errorf("Error occured in UpdateRepJobStatus, error: %v, id: %d", err, jobID)
return
}
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, jobID)
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", jobID)
}
if j.Status != models.JobFinished {
t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID)
}
}
func TestDeleteRepTarget(t *testing.T) {
err := DeleteRepTarget(targetID)
if err != nil {
t.Errorf("Error occured in DeleteRepTarget: %v, id: %d", err, targetID)
return
}
t.Logf("deleted target, id: %d", targetID)
tgt, err := GetRepTarget(targetID)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, targetID)
}
if tgt != nil {
t.Errorf("Able to find target after deletion, id: %d", targetID)
}
}
func TestDeleteRepPolicy(t *testing.T) {
err := DeleteRepPolicy(policyID)
if err != nil {
t.Errorf("Error occured in DeleteRepPolicy: %v, id: %d", err, policyID)
return
}
t.Logf("delete rep policy, id: %d", policyID)
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occured in GetRepPolicy:%v", err)
}
if p != nil {
t.Errorf("Able to find rep policy after deletion, id: %d", policyID)
}
}
func TestDeleteRepJob(t *testing.T) {
err := DeleteRepJob(jobID)
if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
return
}
t.Logf("deleted rep job, id: %d", jobID)
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err)
}
if j != nil {
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
}
}

113
dao/replication_job.go Normal file
View File

@ -0,0 +1,113 @@
package dao
import (
"fmt"
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
)
func AddRepTarget(target models.RepTarget) (int64, error) {
o := orm.NewOrm()
return o.Insert(&target)
}
func GetRepTarget(id int64) (*models.RepTarget, error) {
o := orm.NewOrm()
t := models.RepTarget{ID: id}
err := o.Read(&t)
if err == orm.ErrNoRows {
return nil, nil
}
return &t, err
}
func DeleteRepTarget(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepTarget{ID: id})
return err
}
func AddRepPolicy(policy models.RepPolicy) (int64, error) {
o := orm.NewOrm()
sqlTpl := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, start_time, creation_time, update_time ) values (?, ?, ?, ?, ?, ?, %s, NOW(), NOW())`
var sql string
if policy.Enabled == 1 {
sql = fmt.Sprintf(sqlTpl, "NOW()")
} else {
sql = fmt.Sprintf(sqlTpl, "NULL")
}
p, err := o.Raw(sql).Prepare()
if err != nil {
return 0, err
}
r, err := p.Exec(policy.Name, policy.ProjectID, policy.TargetID, policy.Enabled, policy.Description, policy.CronStr)
if err != nil {
return 0, err
}
id, err := r.LastInsertId()
return id, err
}
func GetRepPolicy(id int64) (*models.RepPolicy, error) {
o := orm.NewOrm()
p := models.RepPolicy{ID: id}
err := o.Read(&p)
if err == orm.ErrNoRows {
return nil, nil
}
return &p, err
}
func DeleteRepPolicy(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepPolicy{ID: id})
return err
}
func updateRepPolicyEnablement(id int64, enabled int) error {
o := orm.NewOrm()
p := models.RepPolicy{
ID: id,
Enabled: enabled}
num, err := o.Update(&p, "Enabled")
if num == 0 {
err = fmt.Errorf("Failed to update replication policy with id: %d", id)
}
return err
}
func EnableRepPolicy(id int64) error {
return updateRepPolicyEnablement(id, 1)
}
func DisableRepPolicy(id int64) error {
return updateRepPolicyEnablement(id, 0)
}
func AddRepJob(job models.RepJob) (int64, error) {
o := orm.NewOrm()
if len(job.Status) == 0 {
job.Status = models.JobPending
}
return o.Insert(&job)
}
func GetRepJob(id int64) (*models.RepJob, error) {
o := orm.NewOrm()
j := models.RepJob{ID: id}
err := o.Read(&j)
if err == orm.ErrNoRows {
return nil, nil
}
return &j, err
}
func DeleteRepJob(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepJob{ID: id})
return err
}
func UpdateRepJobStatus(id int64, status string) error {
o := orm.NewOrm()
j := models.RepJob{
ID: id,
Status: status,
}
num, err := o.Update(&j, "Status")
if num == 0 {
err = fmt.Errorf("Failed to update replication job with id: %d", id)
}
return err
}

View File

@ -1,5 +1,6 @@
package imgout
/*
import (
"encoding/json"
//"github.com/vmware/harbor/dao"
@ -72,3 +73,4 @@ func (r *Runner) init(je models.JobEntry) error {
r.AddTransition("push-img", job.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, job.JobFinished})
return nil
}
*/

View File

@ -1,32 +1,31 @@
package job
import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"os"
"strconv"
)
var lock chan bool
func Schedule(jobID int64) {
//TODO: introduce jobqueue to better control concurrent job numbers
go HandleRepJob(jobID)
}
const defaultMaxJobs int64 = 10
func init() {
maxJobsEnv := os.Getenv("MAX_CONCURRENT_JOB")
maxJobs, err := strconv.ParseInt(maxJobsEnv, 10, 32)
func HandleRepJob(id int64) {
sm := &JobSM{JobID: id}
err := sm.Init()
if err != nil {
log.Warningf("Failed to parse max job setting, error: %v, the default value: %d will be used", err, defaultMaxJobs)
maxJobs = defaultMaxJobs
log.Errorf("Failed to initialize statemachine, error: %v")
err2 := dao.UpdateRepJobStatus(id, models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
}
return
}
if sm.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled, will cancel the job")
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
} else {
sm.Start(models.JobRunning)
}
lock = make(chan bool, maxJobs)
}
func Schedule(job models.JobEntry) {
log.Infof("job: %d will be scheduled", job.ID)
//TODO: add support for cron string when needed.
go func() {
lock <- true
defer func() { <-lock }()
log.Infof("running job: %d", job.ID)
run(job)
}()
}

73
job/statehandlers.go Normal file
View File

@ -0,0 +1,73 @@
package job
import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"time"
)
// StateHandler handles transition, it associates with each state, will be called when
// SM enters and exits a state during a transition.
type StateHandler interface {
// Enter returns the next state, if it returns empty string the SM will hold the current state or
// or decide the next state.
Enter() (string, error)
//Exit should be idempotent
Exit() error
}
type DummyHandler struct {
JobID int64
}
func (dh DummyHandler) Enter() (string, error) {
return "", nil
}
func (dh DummyHandler) Exit() error {
return nil
}
type StatusUpdater struct {
DummyHandler
State string
}
func (su StatusUpdater) Enter() (string, error) {
err := dao.UpdateRepJobStatus(su.JobID, su.State)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
}
var next string = models.JobContinue
if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished {
next = ""
}
return next, err
}
type ImgPuller struct {
DummyHandler
img string
logger Logger
}
func (ip ImgPuller) Enter() (string, error) {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return "push-img", nil
}
type ImgPusher struct {
DummyHandler
targetURL string
logger Logger
}
func (ip ImgPusher) Enter() (string, error) {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return models.JobContinue, nil
}

View File

@ -3,59 +3,22 @@ package job
import (
"fmt"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"os"
"sync"
)
// StateHandler handles transition, it associates with each state, will be called when
// SM enters and exits a state during a transition.
type StateHandler interface {
// Enter returns the next state, if it returns empty string the SM will hold the current state or
// or decide the next state.
Enter() (string, error)
//Exit should be idempotent
Exit() error
type RepJobParm struct {
LocalRegURL string
TargetURL string
TargetUsername string
TargetPassword string
Repository string
Enabled int
Operation string
}
type DummyHandler struct {
JobID int64
}
func (dh DummyHandler) Enter() (string, error) {
return "", nil
}
func (dh DummyHandler) Exit() error {
return nil
}
type StatusUpdater struct {
DummyHandler
State string
}
func (su StatusUpdater) Enter() (string, error) {
err := dao.UpdateJobStatus(su.JobID, su.State)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
}
var next string = JobContinue
if su.State == JobStopped || su.State == JobError || su.State == JobFinished {
next = ""
}
return next, err
}
const (
JobPending string = "pending"
JobRunning string = "running"
JobError string = "error"
JobStopped string = "stopped"
JobFinished string = "finished"
// statemachine will move to next possible state based on trasition table
JobContinue string = "_continue"
)
type JobSM struct {
JobID int64
CurrentState string
@ -64,8 +27,10 @@ type JobSM struct {
ForcedStates map[string]struct{}
Transitions map[string]map[string]struct{}
Handlers map[string]StateHandler
lock *sync.Mutex
desiredState string
Logger Logger
Parms *RepJobParm
lock *sync.Mutex
}
// EnsterState transit the statemachine from the current state to the state in parameter.
@ -87,7 +52,7 @@ func (sm *JobSM) EnterState(s string) (string, error) {
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next string = JobContinue
var next string = models.JobContinue
var err error
if ok {
if next, err = enterHandler.Enter(); err != nil {
@ -115,14 +80,14 @@ func (sm *JobSM) Start(s string) {
sm.setDesiredState("")
continue
}
if n == JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
for n = range sm.Transitions[sm.CurrentState] {
break
}
log.Debugf("Continue to state: %s", n)
continue
}
if n == JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState]))
err = fmt.Errorf("Unable to continue")
break
@ -132,7 +97,7 @@ func (sm *JobSM) Start(s string) {
}
if err != nil {
log.Warningf("The statemachin will enter error state due to error: %v", err)
sm.EnterState(JobError)
sm.EnterState(models.JobError)
}
}
@ -154,7 +119,7 @@ func (sm *JobSM) RemoveTransition(from string, to string) {
}
func (sm *JobSM) Stop() {
sm.setDesiredState(JobStopped)
sm.setDesiredState(models.JobStopped)
}
func (sm *JobSM) getDesiredState() string {
@ -169,12 +134,60 @@ func (sm *JobSM) setDesiredState(s string) {
sm.desiredState = s
}
func (sm *JobSM) InitJobSM() {
func (sm *JobSM) Init() error {
//init parms
regURL := os.Getenv("LOCAL_REGISTRY_URL")
if len(regURL) == 0 {
regURL = "http://registry:5000/"
}
job, err := dao.GetRepJob(sm.JobID)
if err != nil {
return fmt.Errorf("Failed to get job, error: %v", err)
}
if job == nil {
return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID)
}
policy, err := dao.GetRepPolicy(job.PolicyID)
if err != nil {
return fmt.Errorf("Failed to get policy, error: %v", err)
}
if policy == nil {
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
}
sm.Parms = &RepJobParm{
LocalRegURL: regURL,
Repository: job.Repository,
Enabled: policy.Enabled,
Operation: job.Operation,
}
if policy.Enabled == 0 {
//handler will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
if err != nil {
return fmt.Errorf("Failed to get target, error: %v", err)
}
if target == nil {
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
}
sm.Parms.TargetURL = target.URL
sm.Parms.TargetUsername = target.Username
sm.Parms.TargetPassword = target.Password
//init states handlers
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.CurrentState = JobPending
sm.AddTransition(JobPending, JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, JobRunning})
sm.Handlers[JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobError}
sm.Handlers[JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, JobStopped}
sm.Logger = Logger{sm.JobID}
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
if sm.Parms.Operation == models.RepOpTransfer {
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
//only handle on target for now
sm.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: DummyHandler{JobID: sm.JobID}, targetURL: sm.Parms.TargetURL, logger: sm.Logger})
sm.AddTransition("push-img", models.JobFinished, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
}
return nil
}

View File

@ -3,5 +3,8 @@ export MYSQL_PORT=3306
export MYSQL_USR=root
export MYSQL_PWD=root123
export LOG_LEVEL=debug
export UI_URL=http://127.0.0.1/
export UI_USR=admin
export UI_PWD=Harbor12345
./jobservice

1
jobservice/newtest.json Normal file
View File

@ -0,0 +1 @@
{"policy_id": 1}

2
jobservice/populate.sql Normal file
View File

@ -0,0 +1,2 @@
insert into replication_target (name, url, username, password) values ('test', '192.168.0.2:5000', 'testuser', 'passw0rd');
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());

View File

@ -7,5 +7,5 @@ import (
)
func initRouters() {
beego.Router("/api/jobs/?:id", &api.JobAPI{})
beego.Router("/api/jobs/replication", &api.ReplicationJob{})
}

11
models/base.go Normal file
View File

@ -0,0 +1,11 @@
package models
import (
"github.com/astaxie/beego/orm"
)
func init() {
orm.RegisterModel(new(RepTarget),
new(RepPolicy),
new(RepJob))
}

65
models/replication_job.go Normal file
View File

@ -0,0 +1,65 @@
package models
import (
"time"
)
const (
JobPending string = "pending"
JobRunning string = "running"
JobError string = "error"
JobStopped string = "stopped"
JobFinished string = "finished"
JobCanceled string = "canceled"
// statemachine will move to next possible state based on trasition table
JobContinue string = "_continue"
RepOpTransfer string = "transfer"
RepOpDelete string = "delete"
)
type RepPolicy struct {
ID int64 `orm:"column(id)" json:"id"`
ProjectID int64 `orm:"column(project_id)" json:"project_id"`
TargetID int64 `orm:"column(target_id)" json:"target_id"`
Name string `orm:"column(name)" json:"name"`
Target RepTarget `orm:"-" json:"target"`
Enabled int `orm:"column(enabled)" json:"enabled"`
Description string `orm:"column(description)" json:"description"`
CronStr string `orm:"column(cron_str)" json:"cron_str"`
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
type RepJob struct {
ID int64 `orm:"column(id)" json:"id"`
Status string `orm:"column(status)" json:"status"`
Repository string `orm:"column(repository)" json:"repository"`
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
Operation string `orm:"column(operation)" json:"operation"`
Policy RepPolicy `orm:"-" json:"policy"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
type RepTarget struct {
ID int64 `orm:"column(id)" json:"id"`
URL string `orm:"column(url)" json:"url"`
Name string `orm:"column(name)" json:"name"`
Username string `orm:"column(username)" json:"username"`
Password string `orm:"column(password)" json:"password"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
func (rt *RepTarget) TableName() string {
return "replication_target"
}
func (rj *RepJob) TableName() string {
return "replication_job"
}
func (rp *RepPolicy) TableName() string {
return "replication_policy"
}