Merge pull request #353 from reasonerjt/job-service

update schema, and resume jobs when job service restarts.
This commit is contained in:
Daniel Jiang 2016-06-20 13:28:30 +08:00 committed by GitHub
commit 044e6a7981
6 changed files with 172 additions and 37 deletions

View File

@ -123,6 +123,12 @@ create table replication_target (
url varchar(64), url varchar(64),
username varchar(40), username varchar(40),
password varchar(40), password varchar(40),
/*
target_type indicates the type of target registry,
0 means it's a harbor instance,
1 means it's a regulart registry
*/
target_type tinyint(1) NOT NULL DEFAULT 0,
creation_time timestamp default CURRENT_TIMESTAMP, creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id) PRIMARY KEY (id)

View File

@ -1107,6 +1107,24 @@ func TestGetRepJobByPolicy(t *testing.T) {
} }
} }
func TestDeleteRepJob(t *testing.T) {
err := DeleteRepJob(jobID)
if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
return
}
t.Logf("deleted rep job, id: %d", jobID)
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err)
return
}
if j != nil {
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
return
}
}
func TestFilterRepJobs(t *testing.T) { func TestFilterRepJobs(t *testing.T) {
jobs, err := FilterRepJobs("", policyID) jobs, err := FilterRepJobs("", policyID)
if err != nil { if err != nil {
@ -1145,8 +1163,11 @@ func TestGetRepoJobToStop(t *testing.T) {
}, },
} }
var err error var err error
var i int64
var ids []int64
for _, j := range jobs { for _, j := range jobs {
_, err = AddRepJob(j) i, err = AddRepJob(j)
ids = append(ids, i)
if err != nil { if err != nil {
log.Errorf("Failed to add Job: %+v, error: %v", j, err) log.Errorf("Failed to add Job: %+v, error: %v", j, err)
return return
@ -1158,10 +1179,17 @@ func TestGetRepoJobToStop(t *testing.T) {
return return
} }
//time.Sleep(15 * time.Second) //time.Sleep(15 * time.Second)
if len(res) != 2 { if len(res) != 1 {
log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res)) log.Errorf("Expected length of stoppable jobs, expected:1, in fact: %d", len(res))
return return
} }
for _, id := range ids {
err = DeleteRepJob(id)
if err != nil {
log.Errorf("Failed to delete job, id: %d, error: %v", id, err)
return
}
}
} }
func TestDeleteRepTarget(t *testing.T) { func TestDeleteRepTarget(t *testing.T) {
@ -1213,19 +1241,74 @@ func TestDeleteRepPolicy(t *testing.T) {
} }
} }
func TestDeleteRepJob(t *testing.T) { func TestResetRepJobs(t *testing.T) {
err := DeleteRepJob(jobID)
job1 := models.RepJob{
Repository: "library/ubuntua",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
}
job2 := models.RepJob{
Repository: "library/ubuntub",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
}
id1, err := AddRepJob(job1)
if err != nil { if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID) t.Errorf("Failed to add job: %+v, error: %v", job1, err)
return return
} }
t.Logf("deleted rep job, id: %d", jobID) id2, err := AddRepJob(job2)
j, err := GetRepJob(jobID)
if err != nil { if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err) t.Errorf("Failed to add job: %+v, error: %v", job2, err)
return
} }
if j != nil { err = ResetRunningJobs()
t.Errorf("Able to find rep job after deletion, id: %d", jobID) if err != nil {
t.Errorf("Failed to reset running jobs, error: %v", err)
}
j1, err := GetRepJob(id1)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id1, err)
return
}
if j1.Status != models.JobPending {
t.Errorf("The rep job: %d, status should be Pending, but infact: %s", id1, j1.Status)
return
}
j2, err := GetRepJob(id2)
if err != nil {
t.Errorf("Failed to get rep job, id: %d, error: %v", id2, err)
return
}
if j2.Status == models.JobPending {
t.Errorf("The rep job: %d, status should be Canceled, but infact: %s", id2, j2.Status)
return
}
}
func TestGetJobByStatus(t *testing.T) {
r1, err := GetRepJobByStatus(models.JobPending, models.JobRunning)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r1) != 1 {
t.Errorf("Unexpected length of result, expected 1, but in fact:%d", len(r1))
return
}
r2, err := GetRepJobByStatus(models.JobPending, models.JobCanceled)
if err != nil {
t.Errorf("Failed to run GetRepJobByStatus, error: %v", err)
}
if len(r2) != 2 {
t.Errorf("Unexpected length of result, expected 2, but in fact:%d", len(r2))
return
}
for _, j := range r2 {
DeleteRepJob(j.ID)
} }
} }

View File

@ -313,9 +313,13 @@ func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
return res, err return res, err
} }
func repJobPolicyIDQs(policyID int64) orm.QuerySeter { func repJobQs() orm.QuerySeter {
o := GetOrmer() o := GetOrmer()
return o.QueryTable("replication_job").Filter("policy_id", policyID) return o.QueryTable("replication_job")
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
return repJobQs().Filter("policy_id", policyID)
} }
// DeleteRepJob ... // DeleteRepJob ...
@ -339,6 +343,26 @@ func UpdateRepJobStatus(id int64, status string) error {
return err return err
} }
// ResetRunningJobs update all running jobs status to pending
func ResetRunningJobs() error {
o := GetOrmer()
sql := fmt.Sprintf("update replication_job set status = '%s' where status = '%s'", models.JobPending, models.JobRunning)
_, err := o.Raw(sql).Exec()
return err
}
// GetRepJobByStatus get jobs of certain statuses
func GetRepJobByStatus(status ...string) ([]*models.RepJob, error) {
var res []*models.RepJob
var t []interface{}
for _, s := range status {
t = append(t, interface{}(s))
}
_, err := repJobQs().Filter("status__in", t...).All(&res)
genTagListForJob(res...)
return res, err
}
func genTagListForJob(jobs ...*models.RepJob) { func genTagListForJob(jobs ...*models.RepJob) {
for _, j := range jobs { for _, j := range jobs {
if len(j.Tags) > 0 { if len(j.Tags) > 0 {

View File

@ -1,16 +1,16 @@
/* /*
Copyright (c) 2016 VMware, Inc. All Rights Reserved. Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package main package main
@ -19,6 +19,8 @@ import (
"github.com/astaxie/beego" "github.com/astaxie/beego"
"github.com/vmware/harbor/dao" "github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job" "github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
) )
func main() { func main() {
@ -26,5 +28,23 @@ func main() {
initRouters() initRouters()
job.InitWorkerPool() job.InitWorkerPool()
go job.Dispatch() go job.Dispatch()
resumeJobs()
beego.Run() beego.Run()
} }
func resumeJobs() {
log.Debugf("Trying to resume halted jobs...")
err := dao.ResetRunningJobs()
if err != nil {
log.Warningf("Failed to reset all running jobs to pending, error: %v", err)
}
jobs, err := dao.GetRepJobByStatus(models.JobPending)
if err == nil {
for _, j := range jobs {
log.Debugf("Rescheduling job: %d", j.ID)
job.Schedule(j.ID)
}
} else {
log.Warningf("Failed to get pending jobs, error: %v", err)
}
}

View File

@ -1,3 +1,4 @@
use registry; use registry;
insert into replication_target (name, url, username, password) values ('test', 'http://10.117.171.31', 'admin', 'Harbor12345'); insert into replication_target (name, url, username, password) values ('test', 'http://10.117.171.31', 'admin', 'Harbor12345');
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW()); insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());
insert into replication_job (status, policy_id, repository, operation) value ('running', 1, 'library/whatever', 'transfer')

View File

@ -1,16 +1,16 @@
/* /*
Copyright (c) 2016 VMware, Inc. All Rights Reserved. Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package models package models
@ -110,6 +110,7 @@ type RepTarget struct {
Name string `orm:"column(name)" json:"name"` Name string `orm:"column(name)" json:"name"`
Username string `orm:"column(username)" json:"username"` Username string `orm:"column(username)" json:"username"`
Password string `orm:"column(password)" json:"password"` Password string `orm:"column(password)" json:"password"`
Type int `orm:"column(target_type)" json:"type"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
} }