mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-03 06:28:06 +01:00
Revert "introduce workerpool to handle jobs"
This commit is contained in:
parent
6d6a353c1f
commit
d7c0bb66e2
@ -103,53 +103,6 @@ create table access_log (
|
||||
FOREIGN KEY (project_id) REFERENCES project (project_id)
|
||||
);
|
||||
|
||||
create table replication_policy (
|
||||
id int NOT NULL AUTO_INCREMENT,
|
||||
name varchar(256),
|
||||
project_id int NOT NULL,
|
||||
target_id int NOT NULL,
|
||||
enabled tinyint(1) NOT NULL DEFAULT 1,
|
||||
description text,
|
||||
cron_str varchar(256),
|
||||
start_time timestamp,
|
||||
creation_time timestamp default CURRENT_TIMESTAMP,
|
||||
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
create table replication_target (
|
||||
id int NOT NULL AUTO_INCREMENT,
|
||||
name varchar(64),
|
||||
url varchar(64),
|
||||
username varchar(40),
|
||||
password varchar(40),
|
||||
creation_time timestamp default CURRENT_TIMESTAMP,
|
||||
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
create table replication_job (
|
||||
id int NOT NULL AUTO_INCREMENT,
|
||||
status varchar(64) NOT NULL,
|
||||
policy_id int NOT NULL,
|
||||
repository varchar(256) NOT NULL,
|
||||
operation varchar(64) NOT NULL,
|
||||
creation_time timestamp default CURRENT_TIMESTAMP,
|
||||
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
create table job_log (
|
||||
log_id int NOT NULL AUTO_INCREMENT,
|
||||
job_id int NOT NULL,
|
||||
level varchar(64) NOT NULL,
|
||||
message text,
|
||||
creation_time timestamp,
|
||||
update_time timestamp,
|
||||
PRIMARY KEY (log_id),
|
||||
FOREIGN KEY (job_id) REFERENCES replication_job (id)
|
||||
);
|
||||
|
||||
create table properties (
|
||||
k varchar(64) NOT NULL,
|
||||
v varchar(128) NOT NULL,
|
||||
|
87
api/job.go
87
api/job.go
@ -1,87 +0,0 @@
|
||||
package api
|
||||
|
||||
/*
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type JobAPI struct {
|
||||
BaseAPI
|
||||
}
|
||||
|
||||
|
||||
func (ja *JobAPI) Post() {
|
||||
var je models.JobEntry
|
||||
ja.DecodeJSONReq(&je)
|
||||
res, err := json.Marshal(je.Options)
|
||||
if !job.RunnerExists(je.Type) {
|
||||
log.Errorf("runner for type %s is not registered", je.Type)
|
||||
ja.RenderError(http.StatusBadRequest, fmt.Sprintf("runner for type %s is not registered", je.Type))
|
||||
return
|
||||
}
|
||||
je.OptionsStr = string(res)
|
||||
if err != nil {
|
||||
log.Warningf("Error marshaling options: %v", err)
|
||||
}
|
||||
res, err = json.Marshal(je.Parms)
|
||||
je.ParmsStr = string(res)
|
||||
if err != nil {
|
||||
log.Warningf("Error marshaling parms: %v", err)
|
||||
}
|
||||
jobID, err := dao.AddJob(je)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to add job to DB, error: %v", err)
|
||||
ja.RenderError(http.StatusInternalServerError, "Failed to add job")
|
||||
return
|
||||
}
|
||||
je.ID = jobID
|
||||
log.Debugf("job Id:%d, type: %s", je.ID, je.Type)
|
||||
job.Schedule(je)
|
||||
}
|
||||
|
||||
func (ja *JobAPI) Get() {
|
||||
idStr := ja.Ctx.Input.Param(":id")
|
||||
if len(idStr) > 0 {
|
||||
jobID, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse job id in url: %s", idStr)
|
||||
ja.RenderError(http.StatusBadRequest, "invalid job id")
|
||||
return
|
||||
}
|
||||
je, err := dao.GetJob(jobID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to query job from db, error: %v", err)
|
||||
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
|
||||
return
|
||||
}
|
||||
if je == nil {
|
||||
log.Errorf("job does not exist, id: %d", jobID)
|
||||
ja.RenderError(http.StatusNotFound, "")
|
||||
return
|
||||
}
|
||||
logs, err := dao.GetJobLogs(jobID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get job logs, error: %v", err)
|
||||
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
|
||||
return
|
||||
}
|
||||
je.Logs = logs
|
||||
ja.Data["json"] = je
|
||||
} else {
|
||||
jobs, err := dao.ListJobs()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to list jobs, error:%v", err)
|
||||
ja.RenderError(http.StatusInternalServerError, "Failed to query job")
|
||||
}
|
||||
log.Debugf("jobs: %v", jobs)
|
||||
ja.Data["json"] = jobs
|
||||
}
|
||||
ja.ServeJSON()
|
||||
}*/
|
@ -1,106 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ReplicationJob struct {
|
||||
BaseAPI
|
||||
}
|
||||
|
||||
type ReplicationReq struct {
|
||||
PolicyID int64 `json:"policy_id"`
|
||||
}
|
||||
|
||||
func (rj *ReplicationJob) Post() {
|
||||
var data ReplicationReq
|
||||
rj.DecodeJSONReq(&data)
|
||||
log.Debugf("data: %+v", data)
|
||||
p, err := dao.GetRepPolicy(data.PolicyID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get policy, error: %v", err)
|
||||
rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID))
|
||||
return
|
||||
}
|
||||
repoList, err := getRepoList(p.ProjectID)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err)
|
||||
rj.RenderError(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
log.Debugf("repo list: %v", repoList)
|
||||
for _, repo := range repoList {
|
||||
j := models.RepJob{
|
||||
Repository: repo,
|
||||
PolicyID: data.PolicyID,
|
||||
Operation: models.RepOpTransfer,
|
||||
}
|
||||
log.Debugf("Creating job for repo: %s, policy: %d", repo, data.PolicyID)
|
||||
id, err := dao.AddRepJob(j)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to insert job record, error: %v", err)
|
||||
rj.RenderError(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
log.Debugf("Send job to scheduler, job id: %d", id)
|
||||
job.Schedule(id)
|
||||
}
|
||||
}
|
||||
|
||||
// calls the api from UI to get repo list
|
||||
func getRepoList(projectID int64) ([]string, error) {
|
||||
uiURL := os.Getenv("UI_URL")
|
||||
if len(uiURL) == 0 {
|
||||
uiURL = "ui"
|
||||
}
|
||||
if !strings.HasSuffix(uiURL, "/") {
|
||||
uiURL += "/"
|
||||
}
|
||||
//TODO:Use secret key instead
|
||||
uiUser := os.Getenv("UI_USR")
|
||||
if len(uiUser) == 0 {
|
||||
uiUser = "admin"
|
||||
}
|
||||
uiPwd := os.Getenv("UI_PWD")
|
||||
if len(uiPwd) == 0 {
|
||||
uiPwd = "Harbor12345"
|
||||
}
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest("GET", uiURL+"api/repositories?project_id="+strconv.Itoa(int(projectID)), nil)
|
||||
if err != nil {
|
||||
log.Errorf("Error when creating request: %v")
|
||||
return nil, err
|
||||
}
|
||||
req.SetBasicAuth(uiUser, uiPwd)
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Errorf("Error when calling UI api to get repositories, error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Errorf("Unexpected status code: %d", resp.StatusCode)
|
||||
dump, _ := httputil.DumpResponse(resp, true)
|
||||
log.Debugf("response: %q", dump)
|
||||
return nil, fmt.Errorf("Unexpected status code when getting repository list: %d", resp.StatusCode)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to read the response body, error: %v")
|
||||
return nil, err
|
||||
}
|
||||
var repoList []string
|
||||
err = json.Unmarshal(body, &repoList)
|
||||
return repoList, err
|
||||
}
|
230
dao/dao_test.go
230
dao/dao_test.go
@ -20,9 +20,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
|
||||
"github.com/vmware/harbor/models"
|
||||
|
||||
"github.com/astaxie/beego/orm"
|
||||
)
|
||||
|
||||
func execUpdate(o orm.Ormer, sql string, params interface{}) error {
|
||||
@ -733,227 +735,3 @@ func TestDeleteUser(t *testing.T) {
|
||||
t.Errorf("user is not nil after deletion, user: %+v", user)
|
||||
}
|
||||
}
|
||||
|
||||
var targetID, policyID, jobID int64
|
||||
|
||||
func TestAddRepTarget(t *testing.T) {
|
||||
target := models.RepTarget{
|
||||
URL: "127.0.0.1:5000",
|
||||
Username: "admin",
|
||||
Password: "admin",
|
||||
}
|
||||
//_, err := AddRepTarget(target)
|
||||
id, err := AddRepTarget(target)
|
||||
t.Logf("added target, id: %d", id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in AddRepTarget: %v", err)
|
||||
} else {
|
||||
targetID = id
|
||||
}
|
||||
id2 := id + 99
|
||||
tgt, err := GetRepTarget(id2)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id2)
|
||||
}
|
||||
if tgt != nil {
|
||||
t.Errorf("There should not be a target with id: %d", id2)
|
||||
}
|
||||
tgt, err = GetRepTarget(id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id)
|
||||
}
|
||||
if tgt == nil {
|
||||
t.Errorf("Unable to find a target with id: %d", id)
|
||||
}
|
||||
if tgt.URL != "127.0.0.1:5000" {
|
||||
t.Errorf("Unexpected url in target: %s, expected 127.0.0.1:5000", tgt.URL)
|
||||
}
|
||||
if tgt.Username != "admin" {
|
||||
t.Errorf("Unexpected username in target: %s, expected admin", tgt.Username)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddRepPolicy(t *testing.T) {
|
||||
policy := models.RepPolicy{
|
||||
ProjectID: 1,
|
||||
Enabled: 1,
|
||||
TargetID: targetID,
|
||||
Description: "whatever",
|
||||
Name: "mypolicy",
|
||||
}
|
||||
id, err := AddRepPolicy(policy)
|
||||
t.Logf("added policy, id: %d", id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in AddRepPolicy: %v", err)
|
||||
} else {
|
||||
policyID = id
|
||||
}
|
||||
p, err := GetRepPolicy(id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, id)
|
||||
}
|
||||
if p == nil {
|
||||
t.Errorf("Unable to find a policy with id: %d", id)
|
||||
}
|
||||
|
||||
if p.Name != "mypolicy" || p.TargetID != targetID || p.Enabled != 1 || p.Description != "whatever" {
|
||||
t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Enabled: 1, Description: whatever;\n result: Name: %s, TargetID: %d, Enabled: %d, Description: %s",
|
||||
targetID, p.Name, p.TargetID, p.Enabled, p.Description)
|
||||
}
|
||||
var tm time.Time = time.Now().AddDate(0, 0, -1)
|
||||
if !p.StartTime.After(tm) {
|
||||
t.Errorf("Unexpected start_time: %v", p.StartTime)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestDisableRepPolicy(t *testing.T) {
|
||||
err := DisableRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to disable policy, id: %d", policyID)
|
||||
}
|
||||
p, err := GetRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
|
||||
}
|
||||
if p == nil {
|
||||
t.Errorf("Unable to find a policy with id: %d", policyID)
|
||||
}
|
||||
if p.Enabled == 1 {
|
||||
t.Errorf("The Enabled value of replication policy is still 1 after disabled, id: %d", policyID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnableRepPolicy(t *testing.T) {
|
||||
err := EnableRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to disable policy, id: %d", policyID)
|
||||
}
|
||||
p, err := GetRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
|
||||
}
|
||||
if p == nil {
|
||||
t.Errorf("Unable to find a policy with id: %d", policyID)
|
||||
}
|
||||
if p.Enabled == 0 {
|
||||
t.Errorf("The Enabled value of replication policy is still 0 after disabled, id: %d", policyID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddRepPolicy2(t *testing.T) {
|
||||
policy2 := models.RepPolicy{
|
||||
ProjectID: 3,
|
||||
Enabled: 0,
|
||||
TargetID: 3,
|
||||
Description: "whatever",
|
||||
Name: "mypolicy",
|
||||
}
|
||||
id, err := AddRepPolicy(policy2)
|
||||
t.Logf("added policy, id: %d", id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in AddRepPolicy: %v", err)
|
||||
}
|
||||
p, err := GetRepPolicy(id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, id)
|
||||
}
|
||||
if p == nil {
|
||||
t.Errorf("Unable to find a policy with id: %d", id)
|
||||
}
|
||||
var tm time.Time
|
||||
if p.StartTime.After(tm) {
|
||||
t.Errorf("Unexpected start_time: %v", p.StartTime)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddRepJob(t *testing.T) {
|
||||
job := models.RepJob{
|
||||
Repository: "library/ubuntu",
|
||||
PolicyID: policyID,
|
||||
Operation: "transfer",
|
||||
}
|
||||
id, err := AddRepJob(job)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in AddRepJob: %v", err)
|
||||
} else {
|
||||
jobID = id
|
||||
}
|
||||
j, err := GetRepJob(id)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)
|
||||
}
|
||||
if j == nil {
|
||||
t.Errorf("Unable to find a job with id: %d", id)
|
||||
}
|
||||
if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" {
|
||||
t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, "+
|
||||
"but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateRepJobStatus(t *testing.T) {
|
||||
err := UpdateRepJobStatus(jobID, models.JobFinished)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in UpdateRepJobStatus, error: %v, id: %d", err, jobID)
|
||||
return
|
||||
}
|
||||
j, err := GetRepJob(jobID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, jobID)
|
||||
}
|
||||
if j == nil {
|
||||
t.Errorf("Unable to find a job with id: %d", jobID)
|
||||
}
|
||||
if j.Status != models.JobFinished {
|
||||
t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteRepTarget(t *testing.T) {
|
||||
err := DeleteRepTarget(targetID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in DeleteRepTarget: %v, id: %d", err, targetID)
|
||||
return
|
||||
}
|
||||
t.Logf("deleted target, id: %d", targetID)
|
||||
tgt, err := GetRepTarget(targetID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, targetID)
|
||||
}
|
||||
if tgt != nil {
|
||||
t.Errorf("Able to find target after deletion, id: %d", targetID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteRepPolicy(t *testing.T) {
|
||||
err := DeleteRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in DeleteRepPolicy: %v, id: %d", err, policyID)
|
||||
return
|
||||
}
|
||||
t.Logf("delete rep policy, id: %d", policyID)
|
||||
p, err := GetRepPolicy(policyID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in GetRepPolicy:%v", err)
|
||||
}
|
||||
if p != nil {
|
||||
t.Errorf("Able to find rep policy after deletion, id: %d", policyID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteRepJob(t *testing.T) {
|
||||
err := DeleteRepJob(jobID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
|
||||
return
|
||||
}
|
||||
t.Logf("deleted rep job, id: %d", jobID)
|
||||
j, err := GetRepJob(jobID)
|
||||
if err != nil {
|
||||
t.Errorf("Error occured in GetRepJob:%v", err)
|
||||
}
|
||||
if j != nil {
|
||||
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
|
||||
}
|
||||
}
|
||||
|
76
dao/job.go
76
dao/job.go
@ -1,76 +0,0 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/vmware/harbor/models"
|
||||
// "github.com/vmware/harbor/utils/log"
|
||||
)
|
||||
|
||||
func AddJob(entry models.JobEntry) (int64, error) {
|
||||
|
||||
sql := `insert into job (job_type, status, options, parms, cron_str, creation_time, update_time) values (?,"pending",?,?,?,NOW(),NOW())`
|
||||
o := orm.NewOrm()
|
||||
p, err := o.Raw(sql).Prepare()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r, err := p.Exec(entry.Type, entry.OptionsStr, entry.ParmsStr, entry.CronStr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
id, err := r.LastInsertId()
|
||||
return id, err
|
||||
}
|
||||
|
||||
func AddJobLog(id int64, level string, message string) error {
|
||||
sql := `insert into job_log (job_id, level, message, creation_time, update_time) values (?, ?, ?, NOW(), NOW())`
|
||||
//log.Debugf("trying to add a log for job:%d", id)
|
||||
o := orm.NewOrm()
|
||||
p, err := o.Raw(sql).Prepare()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = p.Exec(id, level, message)
|
||||
return err
|
||||
}
|
||||
|
||||
func UpdateJobStatus(id int64, status string) error {
|
||||
o := orm.NewOrm()
|
||||
sql := "update job set status=?, update_time=NOW() where job_id=?"
|
||||
_, err := o.Raw(sql, status, id).Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
func ListJobs() ([]models.JobEntry, error) {
|
||||
o := orm.NewOrm()
|
||||
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j`
|
||||
var res []models.JobEntry
|
||||
_, err := o.Raw(sql).QueryRows(&res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
func GetJob(id int64) (*models.JobEntry, error) {
|
||||
o := orm.NewOrm()
|
||||
sql := `select j.job_id, j.job_type, j.status, j.enabled, j.creation_time, j.update_time from job j where j.job_id = ?`
|
||||
var res []models.JobEntry
|
||||
p := make([]interface{}, 1)
|
||||
p = append(p, id)
|
||||
n, err := o.Raw(sql, p).QueryRows(&res)
|
||||
if n == 0 {
|
||||
return nil, err
|
||||
}
|
||||
return &res[0], err
|
||||
}
|
||||
|
||||
func GetJobLogs(jobID int64) ([]models.JobLog, error) {
|
||||
o := orm.NewOrm()
|
||||
var res []models.JobLog
|
||||
p := make([]interface{}, 1)
|
||||
p = append(p, jobID)
|
||||
sql := `select l.log_id, l.job_id, l.level, l.message, l.creation_time, l.update_time from job_log l where l.job_id = ?`
|
||||
_, err := o.Raw(sql, p).QueryRows(&res)
|
||||
return res, err
|
||||
}
|
@ -1,113 +0,0 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/astaxie/beego/orm"
|
||||
"github.com/vmware/harbor/models"
|
||||
)
|
||||
|
||||
func AddRepTarget(target models.RepTarget) (int64, error) {
|
||||
o := orm.NewOrm()
|
||||
return o.Insert(&target)
|
||||
}
|
||||
func GetRepTarget(id int64) (*models.RepTarget, error) {
|
||||
o := orm.NewOrm()
|
||||
t := models.RepTarget{ID: id}
|
||||
err := o.Read(&t)
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return &t, err
|
||||
}
|
||||
func DeleteRepTarget(id int64) error {
|
||||
o := orm.NewOrm()
|
||||
_, err := o.Delete(&models.RepTarget{ID: id})
|
||||
return err
|
||||
}
|
||||
|
||||
func AddRepPolicy(policy models.RepPolicy) (int64, error) {
|
||||
o := orm.NewOrm()
|
||||
sqlTpl := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, start_time, creation_time, update_time ) values (?, ?, ?, ?, ?, ?, %s, NOW(), NOW())`
|
||||
var sql string
|
||||
if policy.Enabled == 1 {
|
||||
sql = fmt.Sprintf(sqlTpl, "NOW()")
|
||||
} else {
|
||||
sql = fmt.Sprintf(sqlTpl, "NULL")
|
||||
}
|
||||
p, err := o.Raw(sql).Prepare()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r, err := p.Exec(policy.Name, policy.ProjectID, policy.TargetID, policy.Enabled, policy.Description, policy.CronStr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
id, err := r.LastInsertId()
|
||||
return id, err
|
||||
}
|
||||
func GetRepPolicy(id int64) (*models.RepPolicy, error) {
|
||||
o := orm.NewOrm()
|
||||
p := models.RepPolicy{ID: id}
|
||||
err := o.Read(&p)
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return &p, err
|
||||
}
|
||||
func DeleteRepPolicy(id int64) error {
|
||||
o := orm.NewOrm()
|
||||
_, err := o.Delete(&models.RepPolicy{ID: id})
|
||||
return err
|
||||
}
|
||||
func updateRepPolicyEnablement(id int64, enabled int) error {
|
||||
o := orm.NewOrm()
|
||||
p := models.RepPolicy{
|
||||
ID: id,
|
||||
Enabled: enabled}
|
||||
num, err := o.Update(&p, "Enabled")
|
||||
if num == 0 {
|
||||
err = fmt.Errorf("Failed to update replication policy with id: %d", id)
|
||||
}
|
||||
return err
|
||||
}
|
||||
func EnableRepPolicy(id int64) error {
|
||||
return updateRepPolicyEnablement(id, 1)
|
||||
}
|
||||
|
||||
func DisableRepPolicy(id int64) error {
|
||||
return updateRepPolicyEnablement(id, 0)
|
||||
}
|
||||
|
||||
func AddRepJob(job models.RepJob) (int64, error) {
|
||||
o := orm.NewOrm()
|
||||
if len(job.Status) == 0 {
|
||||
job.Status = models.JobPending
|
||||
}
|
||||
return o.Insert(&job)
|
||||
}
|
||||
func GetRepJob(id int64) (*models.RepJob, error) {
|
||||
o := orm.NewOrm()
|
||||
j := models.RepJob{ID: id}
|
||||
err := o.Read(&j)
|
||||
if err == orm.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return &j, err
|
||||
}
|
||||
func DeleteRepJob(id int64) error {
|
||||
o := orm.NewOrm()
|
||||
_, err := o.Delete(&models.RepJob{ID: id})
|
||||
return err
|
||||
}
|
||||
func UpdateRepJobStatus(id int64, status string) error {
|
||||
o := orm.NewOrm()
|
||||
j := models.RepJob{
|
||||
ID: id,
|
||||
Status: status,
|
||||
}
|
||||
num, err := o.Update(&j, "Status")
|
||||
if num == 0 {
|
||||
err = fmt.Errorf("Failed to update replication job with id: %d", id)
|
||||
}
|
||||
return err
|
||||
}
|
@ -1,13 +0,0 @@
|
||||
package imgout
|
||||
|
||||
type ImgOutParm struct {
|
||||
Secret string `json:"secret"`
|
||||
Image string `json:"image"`
|
||||
Targets []*RegistryInfo `json:"targets"`
|
||||
}
|
||||
|
||||
type RegistryInfo struct {
|
||||
URL string `json:"url"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
package imgout
|
||||
|
||||
/*
|
||||
import (
|
||||
"encoding/json"
|
||||
//"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job"
|
||||
"github.com/vmware/harbor/models"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
jobType = "transfer_img_out"
|
||||
)
|
||||
|
||||
type Runner struct {
|
||||
job.JobSM
|
||||
Logger job.Logger
|
||||
parm ImgOutParm
|
||||
}
|
||||
|
||||
type ImgPuller struct {
|
||||
job.DummyHandler
|
||||
img string
|
||||
logger job.Logger
|
||||
}
|
||||
|
||||
func (ip ImgPuller) Enter() (string, error) {
|
||||
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
|
||||
time.Sleep(30 * time.Second)
|
||||
ip.logger.Infof("wake up from sleep....")
|
||||
return "push-img", nil
|
||||
}
|
||||
|
||||
type ImgPusher struct {
|
||||
job.DummyHandler
|
||||
targetURL string
|
||||
logger job.Logger
|
||||
}
|
||||
|
||||
func (ip ImgPusher) Enter() (string, error) {
|
||||
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
|
||||
time.Sleep(30 * time.Second)
|
||||
ip.logger.Infof("wake up from sleep....")
|
||||
return job.JobContinue, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
job.Register(jobType, Runner{})
|
||||
}
|
||||
|
||||
func (r Runner) Run(je models.JobEntry) error {
|
||||
err := r.init(je)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Start(job.JobRunning)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) init(je models.JobEntry) error {
|
||||
r.JobID = je.ID
|
||||
r.InitJobSM()
|
||||
err := json.Unmarshal([]byte(je.ParmsStr), &r.parm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Logger = job.Logger{je.ID}
|
||||
r.AddTransition(job.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger})
|
||||
//only handle on target for now
|
||||
url := r.parm.Targets[0].URL
|
||||
r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger})
|
||||
r.AddTransition("push-img", job.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, job.JobFinished})
|
||||
return nil
|
||||
}
|
||||
*/
|
@ -1,38 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
)
|
||||
|
||||
const (
|
||||
INFO = "info"
|
||||
WARN = "warning"
|
||||
ERR = "error"
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
ID int64
|
||||
}
|
||||
|
||||
func (l *Logger) Infof(format string, v ...interface{}) {
|
||||
err := dao.AddJobLog(l.ID, INFO, fmt.Sprintf(format, v...))
|
||||
if err != nil {
|
||||
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Logger) Warningf(format string, v ...interface{}) {
|
||||
err := dao.AddJobLog(l.ID, WARN, fmt.Sprintf(format, v...))
|
||||
if err != nil {
|
||||
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Logger) Errorf(format string, v ...interface{}) {
|
||||
err := dao.AddJobLog(l.ID, ERR, fmt.Sprintf(format, v...))
|
||||
if err != nil {
|
||||
log.Warningf("Failed to add job log, id: %d, error: %v", l.ID, err)
|
||||
}
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type JobRunner interface {
|
||||
Run(je models.JobEntry) error
|
||||
}
|
||||
|
||||
var runners map[string]*JobRunner = make(map[string]*JobRunner)
|
||||
var runnerLock = &sync.Mutex{}
|
||||
|
||||
func Register(jobType string, runner JobRunner) {
|
||||
runnerLock.Lock()
|
||||
defer runnerLock.Unlock()
|
||||
runners[jobType] = &runner
|
||||
log.Debugf("runnter for job type:%s has been registered", jobType)
|
||||
}
|
||||
|
||||
func RunnerExists(jobType string) bool {
|
||||
_, ok := runners[jobType]
|
||||
return ok
|
||||
}
|
||||
|
||||
func run(je models.JobEntry) error {
|
||||
runner, ok := runners[je.Type]
|
||||
if !ok {
|
||||
return fmt.Errorf("Runner for job type: %s does not exist")
|
||||
}
|
||||
(*runner).Run(je)
|
||||
return nil
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
)
|
||||
|
||||
var JobQueue chan int64 = make(chan int64)
|
||||
|
||||
func Schedule(jobID int64) {
|
||||
JobQueue <- jobID
|
||||
}
|
||||
|
||||
func HandleRepJob(id int64) {
|
||||
sm := &JobSM{JobID: id}
|
||||
err := sm.Init()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to initialize statemachine, error: %v")
|
||||
err2 := dao.UpdateRepJobStatus(id, models.JobError)
|
||||
if err2 != nil {
|
||||
log.Errorf("Failed to update job status to ERROR, error:%v", err2)
|
||||
}
|
||||
return
|
||||
}
|
||||
if sm.Parms.Enabled == 0 {
|
||||
log.Debugf("The policy of job:%d is disabled, will cancel the job")
|
||||
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
|
||||
} else {
|
||||
sm.Start(models.JobRunning)
|
||||
}
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// StateHandler handles transition, it associates with each state, will be called when
|
||||
// SM enters and exits a state during a transition.
|
||||
type StateHandler interface {
|
||||
// Enter returns the next state, if it returns empty string the SM will hold the current state or
|
||||
// or decide the next state.
|
||||
Enter() (string, error)
|
||||
//Exit should be idempotent
|
||||
Exit() error
|
||||
}
|
||||
|
||||
type DummyHandler struct {
|
||||
JobID int64
|
||||
}
|
||||
|
||||
func (dh DummyHandler) Enter() (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (dh DummyHandler) Exit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type StatusUpdater struct {
|
||||
DummyHandler
|
||||
State string
|
||||
}
|
||||
|
||||
func (su StatusUpdater) Enter() (string, error) {
|
||||
err := dao.UpdateRepJobStatus(su.JobID, su.State)
|
||||
if err != nil {
|
||||
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
|
||||
}
|
||||
var next string = models.JobContinue
|
||||
if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished {
|
||||
next = ""
|
||||
}
|
||||
return next, err
|
||||
}
|
||||
|
||||
type ImgPuller struct {
|
||||
DummyHandler
|
||||
img string
|
||||
logger Logger
|
||||
}
|
||||
|
||||
func (ip ImgPuller) Enter() (string, error) {
|
||||
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
|
||||
time.Sleep(30 * time.Second)
|
||||
ip.logger.Infof("wake up from sleep....")
|
||||
return "push-img", nil
|
||||
}
|
||||
|
||||
type ImgPusher struct {
|
||||
DummyHandler
|
||||
targetURL string
|
||||
logger Logger
|
||||
}
|
||||
|
||||
func (ip ImgPusher) Enter() (string, error) {
|
||||
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
|
||||
time.Sleep(30 * time.Second)
|
||||
ip.logger.Infof("wake up from sleep....")
|
||||
return models.JobContinue, nil
|
||||
}
|
@ -1,193 +0,0 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/models"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type RepJobParm struct {
|
||||
LocalRegURL string
|
||||
TargetURL string
|
||||
TargetUsername string
|
||||
TargetPassword string
|
||||
Repository string
|
||||
Enabled int
|
||||
Operation string
|
||||
}
|
||||
|
||||
type JobSM struct {
|
||||
JobID int64
|
||||
CurrentState string
|
||||
PreviousState string
|
||||
//The states that don't have to exist in transition map, such as "Error", "Canceled"
|
||||
ForcedStates map[string]struct{}
|
||||
Transitions map[string]map[string]struct{}
|
||||
Handlers map[string]StateHandler
|
||||
desiredState string
|
||||
Logger Logger
|
||||
Parms *RepJobParm
|
||||
lock *sync.Mutex
|
||||
}
|
||||
|
||||
// EnsterState transit the statemachine from the current state to the state in parameter.
|
||||
// It returns the next state the statemachine should tranit to.
|
||||
func (sm *JobSM) EnterState(s string) (string, error) {
|
||||
log.Debugf("Trying to transit from State: %s, to State: %s", sm.CurrentState, s)
|
||||
targets, ok := sm.Transitions[sm.CurrentState]
|
||||
_, exist := targets[s]
|
||||
_, isForced := sm.ForcedStates[s]
|
||||
if !exist && !isForced {
|
||||
return "", fmt.Errorf("Transition from %s to %s does not exist!", sm.CurrentState, s)
|
||||
}
|
||||
exitHandler, ok := sm.Handlers[sm.CurrentState]
|
||||
if ok {
|
||||
if err := exitHandler.Exit(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
log.Debugf("No handler found for state:%s, skip", sm.CurrentState)
|
||||
}
|
||||
enterHandler, ok := sm.Handlers[s]
|
||||
var next string = models.JobContinue
|
||||
var err error
|
||||
if ok {
|
||||
if next, err = enterHandler.Enter(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
log.Debugf("No handler found for state:%s, skip", s)
|
||||
}
|
||||
sm.PreviousState = sm.CurrentState
|
||||
sm.CurrentState = s
|
||||
log.Debugf("Transition succeeded, current state: %s", s)
|
||||
return next, nil
|
||||
}
|
||||
|
||||
// Start kicks off the statemachine to transit from current state to s, and moves on
|
||||
// It will search the transit map if the next state is "_continue", and
|
||||
// will enter error state if there's more than one possible path when next state is "_continue"
|
||||
func (sm *JobSM) Start(s string) {
|
||||
n, err := sm.EnterState(s)
|
||||
log.Debugf("next state from handler: %s", n)
|
||||
for len(n) > 0 && err == nil {
|
||||
if d := sm.getDesiredState(); len(d) > 0 {
|
||||
log.Debugf("Desired state: %s, will ignore the next state from handler")
|
||||
n = d
|
||||
sm.setDesiredState("")
|
||||
continue
|
||||
}
|
||||
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
|
||||
for n = range sm.Transitions[sm.CurrentState] {
|
||||
break
|
||||
}
|
||||
log.Debugf("Continue to state: %s", n)
|
||||
continue
|
||||
}
|
||||
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
|
||||
log.Errorf("Next state is continue but there are %d possible next states in transition table", len(sm.Transitions[sm.CurrentState]))
|
||||
err = fmt.Errorf("Unable to continue")
|
||||
break
|
||||
}
|
||||
n, err = sm.EnterState(n)
|
||||
log.Debugf("next state from handler: %s", n)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warningf("The statemachin will enter error state due to error: %v", err)
|
||||
sm.EnterState(models.JobError)
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *JobSM) AddTransition(from string, to string, h StateHandler) {
|
||||
_, ok := sm.Transitions[from]
|
||||
if !ok {
|
||||
sm.Transitions[from] = make(map[string]struct{})
|
||||
}
|
||||
sm.Transitions[from][to] = struct{}{}
|
||||
sm.Handlers[to] = h
|
||||
}
|
||||
|
||||
func (sm *JobSM) RemoveTransition(from string, to string) {
|
||||
_, ok := sm.Transitions[from]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
delete(sm.Transitions[from], to)
|
||||
}
|
||||
|
||||
func (sm *JobSM) Stop() {
|
||||
sm.setDesiredState(models.JobStopped)
|
||||
}
|
||||
|
||||
func (sm *JobSM) getDesiredState() string {
|
||||
sm.lock.Lock()
|
||||
defer sm.lock.Unlock()
|
||||
return sm.desiredState
|
||||
}
|
||||
|
||||
func (sm *JobSM) setDesiredState(s string) {
|
||||
sm.lock.Lock()
|
||||
defer sm.lock.Unlock()
|
||||
sm.desiredState = s
|
||||
}
|
||||
|
||||
func (sm *JobSM) Init() error {
|
||||
//init parms
|
||||
regURL := os.Getenv("LOCAL_REGISTRY_URL")
|
||||
if len(regURL) == 0 {
|
||||
regURL = "http://registry:5000/"
|
||||
}
|
||||
job, err := dao.GetRepJob(sm.JobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get job, error: %v", err)
|
||||
}
|
||||
if job == nil {
|
||||
return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID)
|
||||
}
|
||||
policy, err := dao.GetRepPolicy(job.PolicyID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get policy, error: %v", err)
|
||||
}
|
||||
if policy == nil {
|
||||
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
|
||||
}
|
||||
sm.Parms = &RepJobParm{
|
||||
LocalRegURL: regURL,
|
||||
Repository: job.Repository,
|
||||
Enabled: policy.Enabled,
|
||||
Operation: job.Operation,
|
||||
}
|
||||
if policy.Enabled == 0 {
|
||||
//handler will cancel this job
|
||||
return nil
|
||||
}
|
||||
target, err := dao.GetRepTarget(policy.TargetID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get target, error: %v", err)
|
||||
}
|
||||
if target == nil {
|
||||
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
|
||||
}
|
||||
sm.Parms.TargetURL = target.URL
|
||||
sm.Parms.TargetUsername = target.Username
|
||||
sm.Parms.TargetPassword = target.Password
|
||||
//init states handlers
|
||||
sm.lock = &sync.Mutex{}
|
||||
sm.Handlers = make(map[string]StateHandler)
|
||||
sm.Transitions = make(map[string]map[string]struct{})
|
||||
sm.Logger = Logger{sm.JobID}
|
||||
sm.CurrentState = models.JobPending
|
||||
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
|
||||
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
|
||||
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
|
||||
if sm.Parms.Operation == models.RepOpTransfer {
|
||||
sm.AddTransition(models.JobRunning, "pull-img", ImgPuller{DummyHandler: DummyHandler{JobID: sm.JobID}, img: sm.Parms.Repository, logger: sm.Logger})
|
||||
//only handle on target for now
|
||||
sm.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: DummyHandler{JobID: sm.JobID}, targetURL: sm.Parms.TargetURL, logger: sm.Logger})
|
||||
sm.AddTransition("push-img", models.JobFinished, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,10 +0,0 @@
|
||||
{
|
||||
"job_type": "notexist",
|
||||
"options": {
|
||||
"whatever": "whatever"
|
||||
},
|
||||
"parms": {
|
||||
"test": "test"
|
||||
},
|
||||
"cron_str": ""
|
||||
}
|
@ -1,84 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego"
|
||||
"github.com/vmware/harbor/dao"
|
||||
"github.com/vmware/harbor/job"
|
||||
"github.com/vmware/harbor/utils/log"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const defaultMaxWorkers int = 10
|
||||
|
||||
type Worker struct {
|
||||
ID int
|
||||
RepJobs chan int64
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func (w *Worker) Start() {
|
||||
go func() {
|
||||
for {
|
||||
WorkerPool <- w
|
||||
select {
|
||||
case jobID := <-w.RepJobs:
|
||||
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
|
||||
job.HandleRepJob(jobID)
|
||||
case q := <-w.quit:
|
||||
if q {
|
||||
log.Debugf("worker: %d, will stop.", w.ID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *Worker) Stop() {
|
||||
go func() {
|
||||
w.quit <- true
|
||||
}()
|
||||
}
|
||||
|
||||
var WorkerPool chan *Worker
|
||||
|
||||
func main() {
|
||||
dao.InitDB()
|
||||
initRouters()
|
||||
initWorkerPool()
|
||||
go dispatch()
|
||||
beego.Run()
|
||||
}
|
||||
|
||||
func initWorkerPool() {
|
||||
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
|
||||
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
|
||||
maxWorkers := int(maxWorkers64)
|
||||
if err != nil {
|
||||
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
|
||||
maxWorkers = defaultMaxWorkers
|
||||
}
|
||||
WorkerPool = make(chan *Worker, maxWorkers)
|
||||
for i := 0; i < maxWorkers; i++ {
|
||||
worker := &Worker{
|
||||
ID: i,
|
||||
RepJobs: make(chan int64),
|
||||
quit: make(chan bool),
|
||||
}
|
||||
worker.Start()
|
||||
}
|
||||
}
|
||||
|
||||
func dispatch() {
|
||||
for {
|
||||
select {
|
||||
case job := <-job.JobQueue:
|
||||
go func(jobID int64) {
|
||||
log.Debugf("Trying to dispatch job: %d", jobID)
|
||||
worker := <-WorkerPool
|
||||
worker.RepJobs <- jobID
|
||||
}(job)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
export MYSQL_HOST=127.0.0.1
|
||||
export MYSQL_PORT=3306
|
||||
export MYSQL_USR=root
|
||||
export MYSQL_PWD=root123
|
||||
export LOG_LEVEL=debug
|
||||
export UI_URL=http://127.0.0.1/
|
||||
export UI_USR=admin
|
||||
export UI_PWD=Harbor12345
|
||||
export MAX_JOB_WORKERS=1
|
||||
|
||||
./jobservice
|
@ -1 +0,0 @@
|
||||
{"policy_id": 1}
|
@ -1,2 +0,0 @@
|
||||
insert into replication_target (name, url, username, password) values ('test', '192.168.0.2:5000', 'testuser', 'passw0rd');
|
||||
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());
|
@ -1,11 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/vmware/harbor/api"
|
||||
|
||||
"github.com/astaxie/beego"
|
||||
)
|
||||
|
||||
func initRouters() {
|
||||
beego.Router("/api/jobs/replication", &api.ReplicationJob{})
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
#export MYQL_ROOT_PASSWORD=root123
|
||||
docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev
|
@ -1,17 +0,0 @@
|
||||
{
|
||||
"job_type": "transfer_img_out",
|
||||
"options": {
|
||||
"whatever": "whatever"
|
||||
},
|
||||
"parms": {
|
||||
"secret": "mysecret",
|
||||
"image": "ubuntu",
|
||||
"targets": [{
|
||||
"url": "127.0.0.1:5000",
|
||||
"username": "admin",
|
||||
"password": "admin"
|
||||
}]
|
||||
|
||||
},
|
||||
"cron_str": ""
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego/orm"
|
||||
)
|
||||
|
||||
func init() {
|
||||
orm.RegisterModel(new(RepTarget),
|
||||
new(RepPolicy),
|
||||
new(RepJob))
|
||||
}
|
@ -1,30 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobEntry struct {
|
||||
ID int64 `orm:"column(job_id)" json:"job_id"`
|
||||
Type string `orm:"column(job_type)" json:"job_type"`
|
||||
OptionsStr string `orm:"column(options)"`
|
||||
ParmsStr string `orm:"column(parms)"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
Options map[string]interface{} `json:"options"`
|
||||
Parms map[string]interface{} `json:"parms"`
|
||||
Enabled int `orm:"column(enabled)" json:"enabled"`
|
||||
CronStr string `orm:"column(cron_str)" json:"cron_str"`
|
||||
TriggeredBy string `orm:"column(triggered_by)" json:"triggered_by"`
|
||||
CreationTime time.Time `orm:"creation_time" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"update_time" json:"update_time"`
|
||||
Logs []JobLog `json:"logs"`
|
||||
}
|
||||
|
||||
type JobLog struct {
|
||||
ID int64 `orm:"column(log_id)" json:"log_id"`
|
||||
JobID int64 `orm:"column(job_id)" json:"job_id"`
|
||||
Level string `orm:"column(level)" json:"level"`
|
||||
Message string `orm:"column(message)" json:"message"`
|
||||
CreationTime time.Time `orm:"creation_time" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"update_time" json:"update_time"`
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
JobPending string = "pending"
|
||||
JobRunning string = "running"
|
||||
JobError string = "error"
|
||||
JobStopped string = "stopped"
|
||||
JobFinished string = "finished"
|
||||
JobCanceled string = "canceled"
|
||||
// statemachine will move to next possible state based on trasition table
|
||||
JobContinue string = "_continue"
|
||||
RepOpTransfer string = "transfer"
|
||||
RepOpDelete string = "delete"
|
||||
)
|
||||
|
||||
type RepPolicy struct {
|
||||
ID int64 `orm:"column(id)" json:"id"`
|
||||
ProjectID int64 `orm:"column(project_id)" json:"project_id"`
|
||||
TargetID int64 `orm:"column(target_id)" json:"target_id"`
|
||||
Name string `orm:"column(name)" json:"name"`
|
||||
Target RepTarget `orm:"-" json:"target"`
|
||||
Enabled int `orm:"column(enabled)" json:"enabled"`
|
||||
Description string `orm:"column(description)" json:"description"`
|
||||
CronStr string `orm:"column(cron_str)" json:"cron_str"`
|
||||
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
}
|
||||
|
||||
type RepJob struct {
|
||||
ID int64 `orm:"column(id)" json:"id"`
|
||||
Status string `orm:"column(status)" json:"status"`
|
||||
Repository string `orm:"column(repository)" json:"repository"`
|
||||
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
|
||||
Operation string `orm:"column(operation)" json:"operation"`
|
||||
Policy RepPolicy `orm:"-" json:"policy"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
}
|
||||
|
||||
type RepTarget struct {
|
||||
ID int64 `orm:"column(id)" json:"id"`
|
||||
URL string `orm:"column(url)" json:"url"`
|
||||
Name string `orm:"column(name)" json:"name"`
|
||||
Username string `orm:"column(username)" json:"username"`
|
||||
Password string `orm:"column(password)" json:"password"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
}
|
||||
|
||||
func (rt *RepTarget) TableName() string {
|
||||
return "replication_target"
|
||||
}
|
||||
|
||||
func (rj *RepJob) TableName() string {
|
||||
return "replication_job"
|
||||
}
|
||||
|
||||
func (rp *RepPolicy) TableName() string {
|
||||
return "replication_policy"
|
||||
}
|
Loading…
Reference in New Issue
Block a user