resume running jobs when jobservice starts

This commit is contained in:
Tan Jiang 2016-06-17 18:54:29 +08:00
parent 0b25569536
commit aa9fc2a083
6 changed files with 182 additions and 49 deletions

View File

@ -123,6 +123,12 @@ create table replication_target (
url varchar(64),
username varchar(40),
password varchar(40),
/*
target_type indicates the type of target registry,
0 means it's a harbor instance,
1 means it's a regulart registry
*/
target_type tinyint(1) NOT NULL DEFAULT 0,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)

View File

@ -1092,6 +1092,22 @@ func TestGetRepJobByPolicy(t *testing.T) {
}
}
func TestDeleteRepJob(t *testing.T) {
err := DeleteRepJob(jobID)
if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
return
}
t.Logf("deleted rep job, id: %d", jobID)
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err)
}
if j != nil {
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
}
}
func TestGetRepoJobToStop(t *testing.T) {
jobs := [...]models.RepJob{
models.RepJob{
@ -1114,8 +1130,11 @@ func TestGetRepoJobToStop(t *testing.T) {
},
}
var err error
var i int64
ids := make([]int64, 0)
for _, j := range jobs {
_, err = AddRepJob(j)
i, err = AddRepJob(j)
ids = append(ids, i)
if err != nil {
log.Errorf("Failed to add Job: %+v, error: %v", j, err)
return
@ -1127,10 +1146,17 @@ func TestGetRepoJobToStop(t *testing.T) {
return
}
//time.Sleep(15 * time.Second)
if len(res) != 2 {
log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res))
if len(res) != 1 {
log.Errorf("Expected length of stoppable jobs, expected:1, in fact: %d", len(res))
return
}
for _, id := range ids {
err = DeleteRepJob(id)
if err != nil {
log.Errorf("Failed to delete job, id: %d, error: %v", id, err)
return
}
}
}
func TestDeleteRepTarget(t *testing.T) {
@ -1182,19 +1208,74 @@ func TestDeleteRepPolicy(t *testing.T) {
}
}
func TestDeleteRepJob(t *testing.T) {
err := DeleteRepJob(jobID)
func TestResetRepJobs(t *testing.T) {
job1 := models.RepJob{
Repository: "library/ubuntua",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
}
job2 := models.RepJob{
Repository: "library/ubuntub",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
}
id1, err := AddRepJob(job1)
if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
t.Errorf("Failed to add job: %+v, error: %v", job1, err)
return
}
t.Logf("deleted rep job, id: %d", jobID)
j, err := GetRepJob(jobID)
id2, err := AddRepJob(job2)
if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err)
t.Errorf("Failed to add job: %+v, error: %v", job2, err)
return
}
if j != nil {
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
err = ResetRunningJobs()
if err != nil {
t.Errorf("Failed to reset running jobs, error: %v", err)
}
j1, err := GetRepJob(id1)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id1, err)
return
}
if j1.Status != models.JobPending {
t.Errorf("The rep job: %d, status should be Pending, but infact: %s", id1, j1.Status)
return
}
j2, err := GetRepJob(id2)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id2, err)
return
}
if j2.Status == models.JobPending {
t.Errorf("The rep job: %d, status should be Canceled, but infact: %s", id2, j2.Status)
return
}
}
func TestGetJobByStatus(t *testing.T) {
r1, err := GetRepJobByStatus(models.JobPending, models.JobRunning)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r1) != 1 {
t.Errorf("Unexpected length of result, expected 1, but in fact:%d", len(r1))
return
}
r2, err := GetRepJobByStatus(models.JobPending, models.JobCanceled)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r2) != 2 {
t.Errorf("Unexpected length of result, expected 2, but in fact:%d", len(r2))
return
}
for _, j := range r2 {
DeleteRepJob(j.ID)
}
}

View File

@ -267,9 +267,13 @@ func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
return res, err
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
func repJobQs() orm.QuerySeter {
o := GetOrmer()
return o.QueryTable("replication_job").Filter("policy_id", policyID)
return o.QueryTable("replication_job")
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
return repJobQs().Filter("policy_id", policyID)
}
// DeleteRepJob ...
@ -293,6 +297,26 @@ func UpdateRepJobStatus(id int64, status string) error {
return err
}
// ResetRunningJobs update all running jobs status to pending
func ResetRunningJobs() error {
o := GetOrmer()
sql := fmt.Sprintf("update replication_job set status = '%s' where status = '%s'", models.JobPending, models.JobRunning)
_, err := o.Raw(sql).Exec()
return err
}
// GetRepJobsByStatus get jobs of certain statuses
func GetRepJobByStatus(status ...string) ([]*models.RepJob, error) {
var res []*models.RepJob
t := make([]interface{}, 0)
for _, s := range status {
t = append(t, interface{}(s))
}
_, err := repJobQs().Filter("status__in", t...).All(&res)
genTagListForJob(res...)
return res, err
}
func genTagListForJob(jobs ...*models.RepJob) {
for _, j := range jobs {
if len(j.Tags) > 0 {

View File

@ -19,6 +19,8 @@ import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
func main() {
@ -26,5 +28,23 @@ func main() {
initRouters()
job.InitWorkerPool()
go job.Dispatch()
resumeJobs()
beego.Run()
}
func resumeJobs() {
log.Debugf("Trying to resume halted jobs...")
err := dao.ResetRunningJobs()
if err != nil {
log.Warningf("Failed to reset all running jobs to pending, error: %v", err)
}
jobs, err := dao.GetRepJobByStatus(models.JobPending)
if err == nil {
for _, j := range jobs {
log.Debugf("Rescheduling job: %d", j.ID)
job.Schedule(j.ID)
}
} else {
log.Warningf("Failed to get pending jobs, error: %v", err)
}
}

View File

@ -1,3 +1,4 @@
use registry;
insert into replication_target (name, url, username, password) values ('test', 'http://10.117.171.31', 'admin', 'Harbor12345');
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());
insert into replication_job (status, policy_id, repository, operation) value ('running', 1, 'library/whatever', 'transfer')

View File

@ -110,6 +110,7 @@ type RepTarget struct {
Name string `orm:"column(name)" json:"name"`
Username string `orm:"column(username)" json:"username"`
Password string `orm:"column(password)" json:"password"`
Type int `orm:"column(target_type)" json:"type"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}