Merge pull request #219 from reasonerjt/job-service

introduce workerpool to handle jobs
This commit is contained in:
Daniel Jiang 2016-05-12 17:47:08 +08:00
commit 4b20df7a95
12 changed files with 200 additions and 30 deletions

View File

@ -47,7 +47,7 @@ http {
location /v2/ {
proxy_pass http://registry/v2/;
proxy_set_header Host $host;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

View File

@ -92,7 +92,7 @@ func (pma *ProjectMemberAPI) Get() {
}
pma.Data["json"] = userList
} else { //return detail of a member
roleList, err := dao.GetUserProjectRoles(pma.memberID, pid)
roleList, err := listRoles(pma.memberID, pid)
if err != nil {
log.Errorf("Error occurred in GetUserProjectRoles, error: %v", err)
pma.CustomAbort(http.StatusInternalServerError, "Internal error.")
@ -241,3 +241,27 @@ func (pma *ProjectMemberAPI) Delete() {
return
}
}
//sysadmin has all privileges to all projects
func listRoles(userID int, projectID int64) ([]models.Role, error) {
roles := make([]models.Role, 1)
isSysAdmin, err := dao.IsAdminRole(userID)
if err != nil {
return roles, err
}
if isSysAdmin {
role, err := dao.GetRoleByID(models.PROJECTADMIN)
if err != nil {
return roles, err
}
roles = append(roles, *role)
return roles, nil
}
rs, err := dao.GetUserProjectRoles(userID, projectID)
if err != nil {
return roles, err
}
roles = append(roles, rs...)
return roles, nil
}

View File

@ -190,6 +190,16 @@ func (p *ProjectAPI) FilterAccessLog() {
}
func isProjectAdmin(userID int, pid int64) bool {
isSysAdmin, err := dao.IsAdminRole(userID)
if err != nil {
log.Errorf("Error occurred in IsAdminRole, returning false, error: %v", err)
return false
}
if isSysAdmin {
return true
}
rolelist, err := dao.GetUserProjectRoles(userID, pid)
if err != nil {
log.Errorf("Error occurred in GetUserProjectRoles, returning false, error: %v", err)

View File

@ -38,17 +38,36 @@ type searchResult struct {
}
// Get ...
func (n *SearchAPI) Get() {
userID, ok := n.GetSession("userId").(int)
func (s *SearchAPI) Get() {
userID, ok := s.GetSession("userId").(int)
if !ok {
userID = dao.NonExistUserID
}
keyword := n.GetString("q")
projects, err := dao.QueryRelevantProjects(userID)
keyword := s.GetString("q")
isSysAdmin, err := dao.IsAdminRole(userID)
if err != nil {
log.Errorf("Failed to get projects of user id: %d, error: %v", userID, err)
n.CustomAbort(http.StatusInternalServerError, "Failed to get project search result")
log.Errorf("failed to check whether the user %d is system admin: %v", userID, err)
s.CustomAbort(http.StatusInternalServerError, "internal error")
}
var projects []models.Project
if isSysAdmin {
projects, err = dao.GetAllProjects()
if err != nil {
log.Errorf("failed to get all projects: %v", err)
s.CustomAbort(http.StatusInternalServerError, "internal error")
}
} else {
projects, err = dao.GetUserRelevantProjects(userID)
if err != nil {
log.Errorf("failed to get user %d 's relevant projects: %v", userID, err)
s.CustomAbort(http.StatusInternalServerError, "internal error")
}
}
projectSorter := &utils.ProjectSorter{Projects: projects}
sort.Sort(projectSorter)
projectResult := []map[string]interface{}{}
@ -69,17 +88,17 @@ func (n *SearchAPI) Get() {
repositories, err2 := svc_utils.GetRepoFromCache()
if err2 != nil {
log.Errorf("Failed to get repos from cache, error: %v", err2)
n.CustomAbort(http.StatusInternalServerError, "Failed to get repositories search result")
s.CustomAbort(http.StatusInternalServerError, "Failed to get repositories search result")
}
sort.Strings(repositories)
repositoryResult := filterRepositories(repositories, projects, keyword)
result := &searchResult{Project: projectResult, Repository: repositoryResult}
n.Data["json"] = result
n.ServeJSON()
s.Data["json"] = result
s.ServeJSON()
}
func filterRepositories(repositories []string, projects []models.Project, keyword string) []map[string]interface{} {
var i, j int = 0, 0
i, j := 0, 0
result := []map[string]interface{}{}
for i < len(repositories) && j < len(projects) {
r := &utils.Repository{Name: repositories[i]}

View File

@ -351,7 +351,7 @@ func TestChangeUserPasswordWithIncorrectOldPassword(t *testing.T) {
}
func TestQueryRelevantProjectsWhenNoProjectAdded(t *testing.T) {
projects, err := QueryRelevantProjects(currentUser.UserID)
projects, err := GetUserRelevantProjects(currentUser.UserID)
if err != nil {
t.Errorf("Error occurred in QueryRelevantProjects: %v", err)
}
@ -629,10 +629,10 @@ func TestProjectPermission(t *testing.T) {
}
}
func TestQueryRelevantProjects(t *testing.T) {
projects, err := QueryRelevantProjects(currentUser.UserID)
func TestGetUserRelevantProjects(t *testing.T) {
projects, err := GetUserRelevantProjects(currentUser.UserID)
if err != nil {
t.Errorf("Error occurred in QueryRelevantProjects: %v", err)
t.Errorf("Error occurred in GetUserRelevantProjects: %v", err)
}
if len(projects) != 2 {
t.Errorf("Expected length of relevant projects is 2, but actual: %d, the projects: %+v", len(projects), projects)
@ -642,6 +642,19 @@ func TestQueryRelevantProjects(t *testing.T) {
}
}
func TestGetAllProjects(t *testing.T) {
projects, err := GetAllProjects()
if err != nil {
t.Errorf("Error occurred in GetAllProjects: %v", err)
}
if len(projects) != 2 {
t.Errorf("Expected length of projects is 2, but actual: %d, the projects: %+v", len(projects), projects)
}
if projects[1].Name != projectName {
t.Errorf("Expected project name in the list: %s, actual: %s", projectName, projects[1].Name)
}
}
func TestAddProjectMember(t *testing.T) {
err := AddProjectMember(currentProject.ProjectID, 1, models.DEVELOPER)
if err != nil {

View File

@ -208,18 +208,35 @@ func ToggleProjectPublicity(projectID int64, publicity int) error {
return err
}
// QueryRelevantProjects returns all projects that the user is a member of.
func QueryRelevantProjects(userID int) ([]models.Project, error) {
// GetUserRelevantProjects returns a project list,
// which satisfies the following conditions:
// 1. the project is not deleted
// 2. the prject is public or the user is a member of the project
func GetUserRelevantProjects(userID int) ([]models.Project, error) {
o := orm.NewOrm()
sql := `select distinct p.project_id, p.name, p.public
from project p
left join project_member pm on p.project_id = pm.project_id
left join user u on u.user_id = pm.user_id
where u.user_id = ? or p.public = 1 and p.deleted = 0`
var res []models.Project
_, err := o.Raw(sql, userID).QueryRows(&res)
if err != nil {
where (pm.user_id = ? or p.public = 1) and p.deleted = 0`
var projects []models.Project
if _, err := o.Raw(sql, userID).QueryRows(&projects); err != nil {
return nil, err
}
return res, err
return projects, nil
}
// GetAllProjects returns all projects which are not deleted
func GetAllProjects() ([]models.Project, error) {
o := orm.NewOrm()
sql := `select project_id, name, public
from project
where deleted = 0`
var projects []models.Project
if _, err := o.Raw(sql).QueryRows(&projects); err != nil {
return nil, err
}
return projects, nil
}

View File

@ -73,3 +73,18 @@ func IsAdminRole(userIDOrUsername interface{}) (bool, error) {
return user.HasAdminRole == 1, nil
}
// GetRoleByID ...
func GetRoleByID(id int) (*models.Role, error) {
o := orm.NewOrm()
sql := `select *
from role
where role_id = ?`
var role models.Role
if err := o.Raw(sql, id).QueryRow(&role); err != nil {
return nil, err
}
return &role, nil
}

View File

@ -207,7 +207,7 @@ $ rm -r /data/registry
By default, the data of database and image files in the registry are persisted in the directory **/data/** of the target machine. When Harbor's containers are removed and recreated, the data remain unchanged. Harbor leverages rsyslog to collect the logs of each container, by default the log files are stored in the directory **/var/log/harbor/** on Harbor's host.
##Troubleshooting
1.When setting up Harbor behind another nginx proxy or elastic load balancing, remove the below line if the proxy already has similar settings. Be sure to remove the line under these 3 sections: "location /", "location /v2/" and "location /service/".
1.When setting up Harbor behind another nginx proxy or elastic load balancing, remove the below line if the proxy already has similar settings. Be sure to edit Deploy/config/nginx/nginx.conf and remove the line under these 3 sections: "location /", "location /v2/" and "location /service/".
```
proxy_set_header X-Forwarded-Proto $scheme;
```

View File

@ -574,7 +574,7 @@ paths:
description: Retrieved tags from a relevant repository successfully.
500:
description: Unexpected internal errors.
/repositories/manifest:
/repositories/manifests:
get:
summary: Get manifests of a relevant repository.
description: |

View File

@ -6,9 +6,10 @@ import (
"github.com/vmware/harbor/utils/log"
)
var JobQueue chan int64 = make(chan int64)
func Schedule(jobID int64) {
//TODO: introduce jobqueue to better control concurrent job numbers
go HandleRepJob(jobID)
JobQueue <- jobID
}
func HandleRepJob(id int64) {

View File

@ -3,12 +3,82 @@ package main
import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
_ "github.com/vmware/harbor/job/imgout"
// "github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/utils/log"
"os"
"strconv"
)
const defaultMaxWorkers int = 10
type Worker struct {
ID int
RepJobs chan int64
quit chan bool
}
func (w *Worker) Start() {
go func() {
for {
WorkerPool <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
job.HandleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
var WorkerPool chan *Worker
func main() {
dao.InitDB()
initRouters()
initWorkerPool()
go dispatch()
beego.Run()
}
func initWorkerPool() {
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
maxWorkers := int(maxWorkers64)
if err != nil {
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
maxWorkers = defaultMaxWorkers
}
WorkerPool = make(chan *Worker, maxWorkers)
for i := 0; i < maxWorkers; i++ {
worker := &Worker{
ID: i,
RepJobs: make(chan int64),
quit: make(chan bool),
}
worker.Start()
}
}
func dispatch() {
for {
select {
case job := <-job.JobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool
worker.RepJobs <- jobID
}(job)
}
}
}

View File

@ -6,5 +6,6 @@ export LOG_LEVEL=debug
export UI_URL=http://127.0.0.1/
export UI_USR=admin
export UI_PWD=Harbor12345
export MAX_JOB_WORKERS=1
./jobservice