Merge branch 'job-service' into new-ui-with-sync-image

This commit is contained in:
kunw 2016-06-03 14:10:53 +08:00
commit 86e13b6c02
56 changed files with 3197 additions and 236 deletions

2
.gitignore vendored
View File

@ -3,6 +3,6 @@ Deploy/config/registry/config.yml
Deploy/config/ui/env
Deploy/config/ui/app.conf
Deploy/config/db/env
Deploy/harbor.cfg
Deploy/config/jobservice/env
ui/ui
*.pyc

View File

@ -5,13 +5,25 @@ go:
go_import_path: github.com/vmware/harbor
service:
- mysql
#service:
# - mysql
env: DB_HOST=127.0.0.1 DB_PORT=3306 DB_USR=root DB_PWD=
install:
- sudo apt-get update && sudo apt-get install -y libldap2-dev
- sudo apt-get update && sudo apt-get install -y libldap2-dev
- sudo apt-get remove mysql-common mysql-server-5.5 mysql-server-core-5.5 mysql-client-5.5 mysql-client-core-5.5
- sudo apt-get autoremove
- sudo apt-get install libaio1
- wget -O mysql-5.6.14.deb http://dev.mysql.com/get/Downloads/MySQL-5.6/mysql-5.6.14-debian6.0-x86_64.deb/from/http://cdn.mysql.com/
- sudo dpkg -i mysql-5.6.14.deb
- sudo cp /opt/mysql/server-5.6/support-files/mysql.server /etc/init.d/mysql.server
- sudo ln -s /opt/mysql/server-5.6/bin/* /usr/bin/
- sudo sed -i'' 's/table_cache/table_open_cache/' /etc/mysql/my.cnf
- sudo sed -i'' 's/log_slow_queries/slow_query_log/' /etc/mysql/my.cnf
- sudo sed -i'' 's/basedir[^=]\+=.*$/basedir = \/opt\/mysql\/server-5.6/' /etc/mysql/my.cnf
- sudo /etc/init.d/mysql.server start
- mysql --version
- go get -d github.com/docker/distribution
- go get -d github.com/docker/libtrust
- go get -d github.com/go-sql-driver/mysql

View File

@ -103,6 +103,43 @@ create table access_log (
FOREIGN KEY (project_id) REFERENCES project (project_id)
);
create table replication_policy (
id int NOT NULL AUTO_INCREMENT,
name varchar(256),
project_id int NOT NULL,
target_id int NOT NULL,
enabled tinyint(1) NOT NULL DEFAULT 1,
description text,
cron_str varchar(256),
start_time timestamp NULL,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table replication_target (
id int NOT NULL AUTO_INCREMENT,
name varchar(64),
url varchar(64),
username varchar(40),
password varchar(40),
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table replication_job (
id int NOT NULL AUTO_INCREMENT,
status varchar(64) NOT NULL,
policy_id int NOT NULL,
repository varchar(256) NOT NULL,
operation varchar(64) NOT NULL,
tags varchar(16384),
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table properties (
k varchar(64) NOT NULL,
v varchar(128) NOT NULL,

View File

@ -55,6 +55,19 @@ services:
options:
syslog-address: "tcp://127.0.0.1:1514"
syslog-tag: "ui"
jobservice:
build:
context: ../
dockerfile: Dockerfile.job
env_file:
- ./config/jobservice/env
depends_on:
- ui
logging:
driver: "syslog"
options:
syslog-address: "tcp://127.0.0.1:1514"
syslog-tag: "jobservice"
proxy:
image: library/nginx:1.9
volumes:

View File

@ -2,7 +2,7 @@
#The IP address or hostname to access admin UI and registry service.
#DO NOT use localhost or 127.0.0.1, because Harbor needs to be accessed by external clients.
hostname = reg.mydomain.com
hostname = reg.mydomain.org
#The protocol for accessing the UI and token/notification service, by default it is http.
#It can be set to https if ssl is enabled on nginx.
@ -38,6 +38,9 @@ self_registration = on
#Turn on or off the customize your certicate
customize_crt = on
#Number of job workers in job service, default is 10
max_job_workers = 10
#fill in your certicate message
crt_country = CN
crt_state = State

View File

@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals # We require Python 2.6 or later
from string import Template
import random
import string
import os
import sys
from io import open
@ -44,13 +46,15 @@ crt_organization = rcp.get("configuration", "crt_organization")
crt_organizationalunit = rcp.get("configuration", "crt_organizationalunit")
crt_commonname = rcp.get("configuration", "crt_commonname")
crt_email = rcp.get("configuration", "crt_email")
max_job_workers = rcp.get("configuration", "max_job_workers")
########
ui_secret = ''.join(random.choice(string.ascii_letters+string.digits) for i in range(16))
base_dir = os.path.dirname(__file__)
config_dir = os.path.join(base_dir, "config")
templates_dir = os.path.join(base_dir, "templates")
ui_config_dir = os.path.join(config_dir,"ui")
if not os.path.exists(ui_config_dir):
os.makedirs(os.path.join(config_dir, "ui"))
@ -59,6 +63,10 @@ db_config_dir = os.path.join(config_dir, "db")
if not os.path.exists(db_config_dir):
os.makedirs(os.path.join(config_dir, "db"))
job_config_dir = os.path.join(config_dir, "jobservice")
if not os.path.exists(job_config_dir):
os.makedirs(job_config_dir)
def render(src, dest, **kw):
t = Template(open(src, 'r').read())
with open(dest, 'w') as f:
@ -69,8 +77,9 @@ ui_conf_env = os.path.join(config_dir, "ui", "env")
ui_conf = os.path.join(config_dir, "ui", "app.conf")
registry_conf = os.path.join(config_dir, "registry", "config.yml")
db_conf_env = os.path.join(config_dir, "db", "env")
job_conf_env = os.path.join(config_dir, "jobservice", "env")
conf_files = [ ui_conf, ui_conf_env, registry_conf, db_conf_env ]
conf_files = [ ui_conf, ui_conf_env, registry_conf, db_conf_env, job_conf_env ]
def rmdir(cf):
for f in cf:
if os.path.exists(f):
@ -87,7 +96,8 @@ render(os.path.join(templates_dir, "ui", "env"),
harbor_admin_password=harbor_admin_password,
ldap_url=ldap_url,
ldap_basedn=ldap_basedn,
self_registration=self_registration)
self_registration=self_registration,
ui_secret=ui_secret)
render(os.path.join(templates_dir, "ui", "app.conf"),
ui_conf,
@ -107,6 +117,13 @@ render(os.path.join(templates_dir, "db", "env"),
db_conf_env,
db_password=db_password)
render(os.path.join(templates_dir, "jobservice", "env"),
job_conf_env,
db_password=db_password,
ui_secret=ui_secret,
max_job_workers=max_job_workers,
ui_url=ui_url)
def validate_crt_subj(dirty_subj):
subj_list = [item for item in dirty_subj.strip().split("/") \
if len(item.split("=")) == 2 and len(item.split("=")[1]) > 0]

View File

@ -0,0 +1,9 @@
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_USR=root
MYSQL_PWD=$db_password
UI_SECRET=$ui_secret
HARBOR_URL=$ui_url
MAX_JOB_WORKERS=10
LOG_LEVEL=debug
GODEBUG=netdns=cgo

View File

@ -10,6 +10,7 @@ HARBOR_URL=$hostname
AUTH_MODE=$auth_mode
LDAP_URL=$ldap_url
LDAP_BASE_DN=$ldap_basedn
UI_SECRET=$ui_secret
SELF_REGISTRATION=$self_registration
LOG_LEVEL=debug
GODEBUG=netdns=cgo

19
Dockerfile.job Normal file
View File

@ -0,0 +1,19 @@
FROM golang:1.6.2
MAINTAINER jiangd@vmware.com
RUN apt-get update \
&& apt-get install -y libldap2-dev \
&& rm -r /var/lib/apt/lists/*
COPY . /go/src/github.com/vmware/harbor
WORKDIR /go/src/github.com/vmware/harbor/jobservice
RUN go get -d github.com/docker/distribution \
&& go get -d github.com/docker/libtrust \
&& go get -d github.com/go-sql-driver/mysql \
&& go build -v -a -o /go/bin/harbor_jobservice \
&& chmod u+x /go/bin/harbor_jobservice
ADD ./jobservice/conf /go/bin/conf
WORKDIR /go/bin/
ENTRYPOINT ["/go/bin/harbor_jobservice"]

184
api/jobs/replication.go Normal file
View File

@ -0,0 +1,184 @@
package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strconv"
"github.com/vmware/harbor/api"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/job/utils"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
// ReplicationJob handles /api/replicationJobs /api/replicationJobs/:id/log
// /api/replicationJobs/actions
type ReplicationJob struct {
api.BaseAPI
}
// ReplicationReq holds informations of request for /api/replicationJobs
type ReplicationReq struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
Operation string `json:"operation"`
TagList []string `json:"tags"`
}
// Post creates replication jobs according to the policy.
func (rj *ReplicationJob) Post() {
var data ReplicationReq
rj.DecodeJSONReq(&data)
log.Debugf("data: %+v", data)
p, err := dao.GetRepPolicy(data.PolicyID)
if err != nil {
log.Errorf("Failed to get policy, error: %v", err)
rj.RenderError(http.StatusInternalServerError, fmt.Sprintf("Failed to get policy, id: %d", data.PolicyID))
return
}
if p == nil {
log.Errorf("Policy not found, id: %d", data.PolicyID)
rj.RenderError(http.StatusNotFound, fmt.Sprintf("Policy not found, id: %d", data.PolicyID))
return
}
if len(data.Repo) == 0 { // sync all repositories
repoList, err := getRepoList(p.ProjectID)
if err != nil {
log.Errorf("Failed to get repository list, project id: %d, error: %v", p.ProjectID, err)
rj.RenderError(http.StatusInternalServerError, err.Error())
return
}
log.Debugf("repo list: %v", repoList)
for _, repo := range repoList {
err := rj.addJob(repo, data.PolicyID, models.RepOpTransfer)
if err != nil {
log.Errorf("Failed to insert job record, error: %v", err)
rj.RenderError(http.StatusInternalServerError, err.Error())
return
}
}
} else { // sync a single repository
var op string
if len(data.Operation) > 0 {
op = data.Operation
} else {
op = models.RepOpTransfer
}
err := rj.addJob(data.Repo, data.PolicyID, op, data.TagList...)
if err != nil {
log.Errorf("Failed to insert job record, error: %v", err)
rj.RenderError(http.StatusInternalServerError, err.Error())
return
}
}
}
func (rj *ReplicationJob) addJob(repo string, policyID int64, operation string, tags ...string) error {
j := models.RepJob{
Repository: repo,
PolicyID: policyID,
Operation: operation,
TagList: tags,
}
log.Debugf("Creating job for repo: %s, policy: %d", repo, policyID)
id, err := dao.AddRepJob(j)
if err != nil {
return err
}
log.Debugf("Send job to scheduler, job id: %d", id)
job.Schedule(id)
return nil
}
// RepActionReq holds informations of request for /api/replicationJobs/actions
type RepActionReq struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}
// HandleAction supports some operations to all the jobs of one policy
func (rj *ReplicationJob) HandleAction() {
var data RepActionReq
rj.DecodeJSONReq(&data)
//Currently only support stop action
if data.Action != "stop" {
log.Errorf("Unrecognized action: %s", data.Action)
rj.RenderError(http.StatusBadRequest, fmt.Sprintf("Unrecongized action: %s", data.Action))
return
}
jobs, err := dao.GetRepJobToStop(data.PolicyID)
if err != nil {
log.Errorf("Failed to get jobs to stop, error: %v", err)
rj.RenderError(http.StatusInternalServerError, "Faild to get jobs to stop")
return
}
var jobIDList []int64
for _, j := range jobs {
jobIDList = append(jobIDList, j.ID)
}
job.WorkerPool.StopJobs(jobIDList)
}
// GetLog gets logs of the job
func (rj *ReplicationJob) GetLog() {
idStr := rj.Ctx.Input.Param(":id")
jid, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
log.Errorf("Error parsing job id: %s, error: %v", idStr, err)
rj.RenderError(http.StatusBadRequest, "Invalid job id")
return
}
logFile := utils.GetJobLogPath(jid)
rj.Ctx.Output.Download(logFile)
}
// calls the api from UI to get repo list
func getRepoList(projectID int64) ([]string, error) {
/*
uiUser := os.Getenv("UI_USR")
if len(uiUser) == 0 {
uiUser = "admin"
}
uiPwd := os.Getenv("UI_PWD")
if len(uiPwd) == 0 {
uiPwd = "Harbor12345"
}
*/
uiURL := config.LocalHarborURL()
client := &http.Client{}
req, err := http.NewRequest("GET", uiURL+"/api/repositories?project_id="+strconv.Itoa(int(projectID)), nil)
if err != nil {
log.Errorf("Error when creating request: %v", err)
return nil, err
}
//req.SetBasicAuth(uiUser, uiPwd)
req.AddCookie(&http.Cookie{Name: models.UISecretCookie, Value: config.UISecret()})
//dump, err := httputil.DumpRequest(req, true)
//log.Debugf("req: %q", dump)
resp, err := client.Do(req)
if err != nil {
log.Errorf("Error when calling UI api to get repositories, error: %v", err)
return nil, err
}
if resp.StatusCode != http.StatusOK {
log.Errorf("Unexpected status code: %d", resp.StatusCode)
dump, _ := httputil.DumpResponse(resp, true)
log.Debugf("response: %q", dump)
return nil, fmt.Errorf("Unexpected status code when getting repository list: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("Failed to read the response body, error: %v", err)
return nil, err
}
var repoList []string
err = json.Unmarshal(body, &repoList)
return repoList, err
}

119
api/replication_job.go Normal file
View File

@ -0,0 +1,119 @@
package api
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
// RepJobAPI handles request to /api/replicationJobs /api/replicationJobs/:id/log
type RepJobAPI struct {
BaseAPI
jobID int64
}
// Prepare validates that whether user has system admin role
func (ra *RepJobAPI) Prepare() {
uid := ra.ValidateUser()
isAdmin, err := dao.IsAdminRole(uid)
if err != nil {
log.Errorf("Failed to Check if the user is admin, error: %v, uid: %d", err, uid)
}
if !isAdmin {
ra.CustomAbort(http.StatusForbidden, "")
}
idStr := ra.Ctx.Input.Param(":id")
if len(idStr) != 0 {
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
ra.CustomAbort(http.StatusBadRequest, "ID is invalid")
}
ra.jobID = id
}
}
// Get gets all the jobs according to the policy
func (ra *RepJobAPI) Get() {
policyID, err := ra.GetInt64("policy_id")
if err != nil {
log.Errorf("Failed to get policy id, error: %v", err)
ra.RenderError(http.StatusBadRequest, "Invalid policy id")
return
}
jobs, err := dao.GetRepJobByPolicy(policyID)
if err != nil {
log.Errorf("Failed to query job from db, error: %v", err)
ra.RenderError(http.StatusInternalServerError, "Failed to query job")
return
}
ra.Data["json"] = jobs
ra.ServeJSON()
}
// Delete ...
func (ra *RepJobAPI) Delete() {
if ra.jobID == 0 {
ra.CustomAbort(http.StatusBadRequest, "id is nil")
}
job, err := dao.GetRepJob(ra.jobID)
if err != nil {
log.Errorf("failed to get job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if job.Status == models.JobPending || job.Status == models.JobRunning {
ra.CustomAbort(http.StatusBadRequest, fmt.Sprintf("job is %s, can not be deleted", job.Status))
}
if err = dao.DeleteRepJob(ra.jobID); err != nil {
log.Errorf("failed to deleted job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// GetLog ...
func (ra *RepJobAPI) GetLog() {
if ra.jobID == 0 {
ra.CustomAbort(http.StatusBadRequest, "id is nil")
}
resp, err := http.Get(buildJobLogURL(strconv.FormatInt(ra.jobID, 10)))
if err != nil {
log.Errorf("failed to get log for job %d: %v", ra.jobID, err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if resp.StatusCode == http.StatusOK {
for key, values := range resp.Header {
for _, value := range values {
ra.Ctx.ResponseWriter.Header().Set(key, value)
}
}
if _, err = io.Copy(ra.Ctx.ResponseWriter, resp.Body); err != nil {
log.Errorf("failed to write log to response; %v", err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Errorf("failed to read reponse body: %v", err)
ra.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
ra.CustomAbort(resp.StatusCode, string(b))
}
//TODO:add Post handler to call job service API to submit jobs by policy

134
api/replication_policy.go Normal file
View File

@ -0,0 +1,134 @@
package api
import (
"fmt"
"net/http"
"strconv"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
// RepPolicyAPI handles /api/replicationPolicies /api/replicationPolicies/:id/enablement
type RepPolicyAPI struct {
BaseAPI
policyID int64
policy *models.RepPolicy
}
// Prepare validates whether the user has system admin role
// and parsed the policy ID if it exists
func (pa *RepPolicyAPI) Prepare() {
uid := pa.ValidateUser()
var err error
isAdmin, err := dao.IsAdminRole(uid)
if err != nil {
log.Errorf("Failed to Check if the user is admin, error: %v, uid: %d", err, uid)
}
if !isAdmin {
pa.CustomAbort(http.StatusForbidden, "")
}
idStr := pa.Ctx.Input.Param(":id")
if len(idStr) > 0 {
pa.policyID, err = strconv.ParseInt(idStr, 10, 64)
if err != nil {
log.Errorf("Error parsing policy id: %s, error: %v", idStr, err)
pa.CustomAbort(http.StatusBadRequest, "invalid policy id")
}
p, err := dao.GetRepPolicy(pa.policyID)
if err != nil {
log.Errorf("Error occurred in GetRepPolicy, error: %v", err)
pa.CustomAbort(http.StatusInternalServerError, "Internal error.")
}
if p == nil {
pa.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy does not exist, id: %v", pa.policyID))
}
pa.policy = p
}
}
// Get gets all the policies according to the project
func (pa *RepPolicyAPI) Get() {
projectID, err := pa.GetInt64("project_id")
if err != nil {
log.Errorf("Failed to get project id, error: %v", err)
pa.RenderError(http.StatusBadRequest, "Invalid project id")
return
}
policies, err := dao.GetRepPolicyByProject(projectID)
if err != nil {
log.Errorf("Failed to query policies from db, error: %v", err)
pa.RenderError(http.StatusInternalServerError, "Failed to query policies")
return
}
pa.Data["json"] = policies
pa.ServeJSON()
}
// Post creates a policy, and if it is enbled, the replication will be triggered right now.
func (pa *RepPolicyAPI) Post() {
policy := models.RepPolicy{}
pa.DecodeJSONReq(&policy)
pid, err := dao.AddRepPolicy(policy)
if err != nil {
log.Errorf("Failed to add policy to DB, error: %v", err)
pa.RenderError(http.StatusInternalServerError, "Internal Error")
return
}
if policy.Enabled == 1 {
go func() {
if err := TriggerReplication(pid, "", nil, models.RepOpTransfer); err != nil {
log.Errorf("failed to trigger replication of %d: %v", pid, err)
} else {
log.Infof("replication of %d triggered", pid)
}
}()
}
pa.Redirect(http.StatusCreated, strconv.FormatInt(pid, 10))
}
type enablementReq struct {
Enabled int `json:"enabled"`
}
// UpdateEnablement changes the enablement of the policy
func (pa *RepPolicyAPI) UpdateEnablement() {
e := enablementReq{}
pa.DecodeJSONReq(&e)
if e.Enabled != 0 && e.Enabled != 1 {
pa.RenderError(http.StatusBadRequest, "invalid enabled value")
return
}
if pa.policy.Enabled == e.Enabled {
return
}
if err := dao.UpdateRepPolicyEnablement(pa.policyID, e.Enabled); err != nil {
log.Errorf("Failed to update policy enablement in DB, error: %v", err)
pa.RenderError(http.StatusInternalServerError, "Internal Error")
return
}
if e.Enabled == 1 {
go func() {
if err := TriggerReplication(pa.policyID, "", nil, models.RepOpTransfer); err != nil {
log.Errorf("failed to trigger replication of %d: %v", pa.policyID, err)
} else {
log.Infof("replication of %d triggered", pa.policyID)
}
}()
} else {
go func() {
if err := postReplicationAction(pa.policyID, "stop"); err != nil {
log.Errorf("failed to stop replication of %d: %v", pa.policyID, err)
} else {
log.Infof("try to stop replication of %d", pa.policyID)
}
}()
}
}

View File

@ -27,11 +27,14 @@ import (
"github.com/docker/distribution/manifest/schema1"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/service/cache"
svc_utils "github.com/vmware/harbor/service/utils"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry"
registry_error "github.com/vmware/harbor/utils/registry/error"
"github.com/vmware/harbor/utils/registry/auth"
"github.com/vmware/harbor/utils/registry/errors"
)
// RepositoryAPI handles request to /api/repositories /api/repositories/tags /api/repositories/manifests, the parm has to be put
@ -62,7 +65,13 @@ func (ra *RepositoryAPI) Get() {
}
if p.Public == 0 {
userID := ra.ValidateUser()
var userID int
if svc_utils.VerifySecret(ra.Ctx.Request) {
userID = 1
} else {
userID = ra.ValidateUser()
}
if !checkProjectPermission(userID, projectID) {
ra.RenderError(http.StatusForbidden, "")
@ -70,7 +79,7 @@ func (ra *RepositoryAPI) Get() {
}
}
repoList, err := svc_utils.GetRepoFromCache()
repoList, err := cache.GetRepoFromCache()
if err != nil {
log.Errorf("Failed to get repo from cache, error: %v", err)
ra.RenderError(http.StatusInternalServerError, "internal sever error")
@ -117,14 +126,12 @@ func (ra *RepositoryAPI) Delete() {
if len(tag) == 0 {
tagList, err := rc.ListTag()
if err != nil {
e, ok := errors.ParseError(err)
if ok {
log.Info(e)
ra.CustomAbort(e.StatusCode, e.Message)
} else {
log.Error(err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
if regErr, ok := err.(*registry_error.Error); ok {
ra.CustomAbort(regErr.StatusCode, regErr.Detail)
}
log.Errorf("error occurred while listing tags of %s: %v", repoName, err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
}
tags = append(tags, tagList...)
} else {
@ -133,20 +140,21 @@ func (ra *RepositoryAPI) Delete() {
for _, t := range tags {
if err := rc.DeleteTag(t); err != nil {
e, ok := errors.ParseError(err)
if ok {
ra.CustomAbort(e.StatusCode, e.Message)
} else {
log.Error(err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
if regErr, ok := err.(*registry_error.Error); ok {
ra.CustomAbort(regErr.StatusCode, regErr.Detail)
}
log.Errorf("error occurred while deleting tags of %s: %v", repoName, err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
}
log.Infof("delete tag: %s %s", repoName, t)
go TriggerReplicationByRepository(repoName, []string{t}, models.RepOpDelete)
}
go func() {
log.Debug("refreshing catalog cache")
if err := svc_utils.RefreshCatalogCache(); err != nil {
if err := cache.RefreshCatalogCache(); err != nil {
log.Errorf("error occurred while refresh catalog cache: %v", err)
}
}()
@ -175,13 +183,12 @@ func (ra *RepositoryAPI) GetTags() {
ts, err := rc.ListTag()
if err != nil {
e, ok := errors.ParseError(err)
if ok {
ra.CustomAbort(e.StatusCode, e.Message)
} else {
log.Error(err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
if regErr, ok := err.(*registry_error.Error); ok {
ra.CustomAbort(regErr.StatusCode, regErr.Detail)
}
log.Errorf("error occurred while listing tags of %s: %v", repoName, err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
}
tags = append(tags, ts...)
@ -212,13 +219,12 @@ func (ra *RepositoryAPI) GetManifests() {
mediaTypes := []string{schema1.MediaTypeManifest}
_, _, payload, err := rc.PullManifest(tag, mediaTypes)
if err != nil {
e, ok := errors.ParseError(err)
if ok {
ra.CustomAbort(e.StatusCode, e.Message)
} else {
log.Error(err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
if regErr, ok := err.(*registry_error.Error); ok {
ra.CustomAbort(regErr.StatusCode, regErr.Detail)
}
log.Errorf("error occurred while getting manifest of %s:%s: %v", repoName, tag, err)
ra.CustomAbort(http.StatusInternalServerError, "internal error")
}
mani := models.Manifest{}
err = json.Unmarshal(payload, &mani)

View File

@ -22,7 +22,7 @@ import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
svc_utils "github.com/vmware/harbor/service/utils"
"github.com/vmware/harbor/service/cache"
"github.com/vmware/harbor/utils"
"github.com/vmware/harbor/utils/log"
)
@ -85,7 +85,7 @@ func (s *SearchAPI) Get() {
}
}
repositories, err2 := svc_utils.GetRepoFromCache()
repositories, err2 := cache.GetRepoFromCache()
if err2 != nil {
log.Errorf("Failed to get repos from cache, error: %v", err2)
s.CustomAbort(http.StatusInternalServerError, "Failed to get repositories search result")

View File

@ -21,7 +21,7 @@ import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
svc_utils "github.com/vmware/harbor/service/utils"
"github.com/vmware/harbor/service/cache"
"github.com/vmware/harbor/utils/log"
)
@ -88,7 +88,7 @@ func (s *StatisticAPI) Get() {
//getReposByProject returns repo numbers of specified project
func getRepoCountByProject(projectName string) int {
repoList, err := svc_utils.GetRepoFromCache()
repoList, err := cache.GetRepoFromCache()
if err != nil {
log.Errorf("Failed to get repo from cache, error: %v", err)
return 0
@ -107,7 +107,7 @@ func getRepoCountByProject(projectName string) int {
//getTotalRepoCount returns total repo count
func getTotalRepoCount() int {
repoList, err := svc_utils.GetRepoFromCache()
repoList, err := cache.GetRepoFromCache()
if err != nil {
log.Errorf("Failed to get repo from cache, error: %v", err)
return 0

245
api/target.go Normal file
View File

@ -0,0 +1,245 @@
/*
Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils"
"github.com/vmware/harbor/utils/log"
registry_util "github.com/vmware/harbor/utils/registry"
"github.com/vmware/harbor/utils/registry/auth"
registry_error "github.com/vmware/harbor/utils/registry/error"
)
// TargetAPI handles request to /api/targets/ping /api/targets/{}
type TargetAPI struct {
BaseAPI
}
// Prepare validates the user
func (t *TargetAPI) Prepare() {
userID := t.ValidateUser()
isSysAdmin, err := dao.IsAdminRole(userID)
if err != nil {
log.Errorf("error occurred in IsAdminRole: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if !isSysAdmin {
t.CustomAbort(http.StatusForbidden, http.StatusText(http.StatusForbidden))
}
}
// Ping validates whether the target is reachable and whether the credential is valid
func (t *TargetAPI) Ping() {
var endpoint, username, password string
idStr := t.GetString("id")
if len(idStr) != 0 {
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
t.CustomAbort(http.StatusBadRequest, fmt.Sprintf("id %s is invalid", idStr))
}
target, err := dao.GetRepTarget(id)
if err != nil {
log.Errorf("failed to get target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if target == nil {
t.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
}
endpoint = target.URL
username = target.Username
password = target.Password
if len(password) != 0 {
password, err = utils.ReversibleDecrypt(password)
if err != nil {
log.Errorf("failed to decrypt password: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
} else {
endpoint = t.GetString("endpoint")
if len(endpoint) == 0 {
t.CustomAbort(http.StatusBadRequest, "id or endpoint is needed")
}
username = t.GetString("username")
password = t.GetString("password")
}
credential := auth.NewBasicAuthCredential(username, password)
registry, err := registry_util.NewRegistryWithCredential(endpoint, credential)
if err != nil {
// timeout, dns resolve error, connection refused, etc.
if urlErr, ok := err.(*url.Error); ok {
if netErr, ok := urlErr.Err.(net.Error); ok {
t.CustomAbort(http.StatusBadRequest, netErr.Error())
}
t.CustomAbort(http.StatusBadRequest, urlErr.Error())
}
log.Errorf("failed to create registry client: %#v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if err = registry.Ping(); err != nil {
if regErr, ok := err.(*registry_error.Error); ok {
t.CustomAbort(regErr.StatusCode, regErr.Detail)
}
log.Errorf("failed to ping registry %s: %v", registry.Endpoint.String(), err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// Get ...
func (t *TargetAPI) Get() {
id := t.getIDFromURL()
// list targets
if id == 0 {
targets, err := dao.GetAllRepTargets()
if err != nil {
log.Errorf("failed to get all targets: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
for _, target := range targets {
target.Password = ""
}
t.Data["json"] = targets
t.ServeJSON()
return
}
target, err := dao.GetRepTarget(id)
if err != nil {
log.Errorf("failed to get target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if target == nil {
t.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
}
if len(target.Password) != 0 {
pwd, err := utils.ReversibleDecrypt(target.Password)
if err != nil {
log.Errorf("failed to decrypt password: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
target.Password = pwd
}
t.Data["json"] = target
t.ServeJSON()
}
// Post ...
func (t *TargetAPI) Post() {
target := &models.RepTarget{}
t.DecodeJSONReq(target)
if len(target.Name) == 0 || len(target.URL) == 0 {
t.CustomAbort(http.StatusBadRequest, "name or URL is nil")
}
if len(target.Password) != 0 {
target.Password = utils.ReversibleEncrypt(target.Password)
}
id, err := dao.AddRepTarget(*target)
if err != nil {
log.Errorf("failed to add target: %v", err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
t.Redirect(http.StatusCreated, strconv.FormatInt(id, 10))
}
// Put ...
func (t *TargetAPI) Put() {
id := t.getIDFromURL()
if id == 0 {
t.CustomAbort(http.StatusBadRequest, http.StatusText(http.StatusBadRequest))
}
target := &models.RepTarget{}
t.DecodeJSONReq(target)
if target.ID == 0 {
target.ID = id
}
if len(target.Password) != 0 {
target.Password = utils.ReversibleEncrypt(target.Password)
}
if err := dao.UpdateRepTarget(*target); err != nil {
log.Errorf("failed to update target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
// Delete ...
func (t *TargetAPI) Delete() {
id := t.getIDFromURL()
if id == 0 {
t.CustomAbort(http.StatusBadRequest, http.StatusText(http.StatusBadRequest))
}
target, err := dao.GetRepTarget(id)
if err != nil {
log.Errorf("failed to get target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
if target == nil {
t.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound))
}
if err = dao.DeleteRepTarget(id); err != nil {
log.Errorf("failed to delete target %d: %v", id, err)
t.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
}
}
func (t *TargetAPI) getIDFromURL() int64 {
idStr := t.Ctx.Input.Param(":id")
if len(idStr) == 0 {
return 0
}
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
t.CustomAbort(http.StatusBadRequest, "invalid ID in request URL")
}
return id
}

View File

@ -16,6 +16,14 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
@ -81,3 +89,142 @@ func checkUserExists(name string) int {
}
return 0
}
// TriggerReplication triggers the replication according to the policy
func TriggerReplication(policyID int64, repository string,
tags []string, operation string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Repo string `json:"repository"`
Operation string `json:"operation"`
TagList []string `json:"tags"`
}{
PolicyID: policyID,
Repo: repository,
TagList: tags,
Operation: operation,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationURL()
resp, err := http.DefaultClient.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}
// GetPoliciesByRepository returns policies according the repository
func GetPoliciesByRepository(repository string) ([]*models.RepPolicy, error) {
repository = strings.TrimSpace(repository)
repository = strings.TrimRight(repository, "/")
projectName := repository[:strings.LastIndex(repository, "/")]
project, err := dao.GetProjectByName(projectName)
if err != nil {
return nil, err
}
policies, err := dao.GetRepPolicyByProject(project.ProjectID)
if err != nil {
return nil, err
}
return policies, nil
}
// TriggerReplicationByRepository triggers the replication according to the repository
func TriggerReplicationByRepository(repository string, tags []string, operation string) {
policies, err := GetPoliciesByRepository(repository)
if err != nil {
log.Errorf("failed to get policies for repository %s: %v", repository, err)
return
}
for _, policy := range policies {
if err := TriggerReplication(policy.ID, repository, tags, operation); err != nil {
log.Errorf("failed to trigger replication of policy %d for %s: %v", policy.ID, repository, err)
} else {
log.Infof("replication of policy %d for %s triggered", policy.ID, repository)
}
}
}
func postReplicationAction(policyID int64, acton string) error {
data := struct {
PolicyID int64 `json:"policy_id"`
Action string `json:"action"`
}{
PolicyID: policyID,
Action: acton,
}
b, err := json.Marshal(&data)
if err != nil {
return err
}
url := buildReplicationActionURL()
resp, err := http.DefaultClient.Post(url, "application/json", bytes.NewBuffer(b))
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}
func buildReplicationURL() string {
url := getJobServiceURL()
return fmt.Sprintf("%s/api/jobs/replication", url)
}
func buildJobLogURL(jobID string) string {
url := getJobServiceURL()
return fmt.Sprintf("%s/api/jobs/replication/%s/log", url, jobID)
}
func buildReplicationActionURL() string {
url := getJobServiceURL()
return fmt.Sprintf("%s/api/jobs/replication/actions", url)
}
func getJobServiceURL() string {
url := os.Getenv("JOB_SERVICE_URL")
url = strings.TrimSpace(url)
url = strings.TrimRight(url, "/")
if len(url) == 0 {
url = "http://jobservice"
}
return url
}

View File

@ -65,6 +65,7 @@ func GenerateRandomString() (string, error) {
//InitDB initializes the database
func InitDB() {
// orm.Debug = true
orm.RegisterDriver("mysql", orm.DRMySQL)
addr := os.Getenv("MYSQL_HOST")
port := os.Getenv("MYSQL_PORT")

View File

@ -20,20 +20,18 @@ import (
"testing"
"time"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/models"
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
func execUpdate(o orm.Ormer, sql string, params interface{}) error {
func execUpdate(o orm.Ormer, sql string, params ...interface{}) error {
p, err := o.Raw(sql).Prepare()
if err != nil {
return err
}
defer p.Close()
_, err = p.Exec(params)
_, err = p.Exec(params...)
if err != nil {
return err
}
@ -97,6 +95,19 @@ func clearUp(username string) {
o.Rollback()
log.Error(err)
}
err = execUpdate(o, `delete from replication_job where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_policy where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_target where id < 99`)
if err != nil {
log.Error(err)
}
o.Commit()
}
@ -716,6 +727,332 @@ func TestDeleteUser(t *testing.T) {
}
}
var targetID, policyID, policyID2, policyID3, jobID, jobID2, jobID3 int64
func TestAddRepTarget(t *testing.T) {
target := models.RepTarget{
URL: "127.0.0.1:5000",
Username: "admin",
Password: "admin",
}
//_, err := AddRepTarget(target)
id, err := AddRepTarget(target)
t.Logf("added target, id: %d", id)
if err != nil {
t.Errorf("Error occurred in AddRepTarget: %v", err)
} else {
targetID = id
}
id2 := id + 99
tgt, err := GetRepTarget(id2)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id2)
}
if tgt != nil {
t.Errorf("There should not be a target with id: %d", id2)
}
tgt, err = GetRepTarget(id)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, id)
}
if tgt == nil {
t.Errorf("Unable to find a target with id: %d", id)
}
if tgt.URL != "127.0.0.1:5000" {
t.Errorf("Unexpected url in target: %s, expected 127.0.0.1:5000", tgt.URL)
}
if tgt.Username != "admin" {
t.Errorf("Unexpected username in target: %s, expected admin", tgt.Username)
}
}
func TestAddRepPolicy(t *testing.T) {
policy := models.RepPolicy{
ProjectID: 1,
Enabled: 1,
TargetID: targetID,
Description: "whatever",
Name: "mypolicy",
}
id, err := AddRepPolicy(policy)
t.Logf("added policy, id: %d", id)
if err != nil {
t.Errorf("Error occurred in AddRepPolicy: %v", err)
} else {
policyID = id
}
p, err := GetRepPolicy(id)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, id)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", id)
}
if p.Name != "mypolicy" || p.TargetID != targetID || p.Enabled != 1 || p.Description != "whatever" {
t.Errorf("The data does not match, expected: Name: mypolicy, TargetID: %d, Enabled: 1, Description: whatever;\n result: Name: %s, TargetID: %d, Enabled: %d, Description: %s",
targetID, p.Name, p.TargetID, p.Enabled, p.Description)
}
var tm = time.Now().AddDate(0, 0, -1)
if !p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
}
func TestDisableRepPolicy(t *testing.T) {
err := DisableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 1 {
t.Errorf("The Enabled value of replication policy is still 1 after disabled, id: %d", policyID)
}
}
func TestEnableRepPolicy(t *testing.T) {
err := EnableRepPolicy(policyID)
if err != nil {
t.Errorf("Failed to disable policy, id: %d", policyID)
}
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID)
}
if p.Enabled == 0 {
t.Errorf("The Enabled value of replication policy is still 0 after disabled, id: %d", policyID)
}
}
func TestAddRepPolicy2(t *testing.T) {
policy2 := models.RepPolicy{
ProjectID: 3,
Enabled: 0,
TargetID: 3,
Description: "whatever",
Name: "mypolicy",
}
policyID2, err := AddRepPolicy(policy2)
t.Logf("added policy, id: %d", policyID2)
if err != nil {
t.Errorf("Error occurred in AddRepPolicy: %v", err)
}
p, err := GetRepPolicy(policyID2)
if err != nil {
t.Errorf("Error occurred in GetPolicy: %v, id: %d", err, policyID2)
}
if p == nil {
t.Errorf("Unable to find a policy with id: %d", policyID2)
}
var tm time.Time
if p.StartTime.After(tm) {
t.Errorf("Unexpected start_time: %v", p.StartTime)
}
}
func TestAddRepJob(t *testing.T) {
job := models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
TagList: []string{"12.01", "14.04", "latest"},
}
id, err := AddRepJob(job)
if err != nil {
t.Errorf("Error occurred in AddRepJob: %v", err)
return
}
jobID = id
j, err := GetRepJob(id)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, id)
return
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", id)
return
}
if j.Status != models.JobPending || j.Repository != "library/ubuntu" || j.PolicyID != policyID || j.Operation != "transfer" || len(j.TagList) != 3 {
t.Errorf("Expected data of job, id: %d, Status: %s, Repository: library/ubuntu, PolicyID: %d, Operation: transfer, taglist length 3"+
"but in returned data:, Status: %s, Repository: %s, Operation: %s, PolicyID: %d, TagList: %v", id, models.JobPending, policyID, j.Status, j.Repository, j.Operation, j.PolicyID, j.TagList)
return
}
}
func TestUpdateRepJobStatus(t *testing.T) {
err := UpdateRepJobStatus(jobID, models.JobFinished)
if err != nil {
t.Errorf("Error occured in UpdateRepJobStatus, error: %v, id: %d", err, jobID)
return
}
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occurred in GetRepJob: %v, id: %d", err, jobID)
}
if j == nil {
t.Errorf("Unable to find a job with id: %d", jobID)
}
if j.Status != models.JobFinished {
t.Errorf("Job's status: %s, expected: %s, id: %d", j.Status, models.JobFinished, jobID)
}
err = UpdateRepJobStatus(jobID, models.JobPending)
if err != nil {
t.Errorf("Error occured in UpdateRepJobStatus when update it back to status pending, error: %v, id: %d", err, jobID)
return
}
}
func TestGetRepPolicyByProject(t *testing.T) {
p1, err := GetRepPolicyByProject(99)
if err != nil {
t.Errorf("Error occured in GetRepPolicyByProject:%v, project ID: %d", err, 99)
return
}
if len(p1) > 0 {
t.Errorf("Unexpected length of policy list, expected: 0, in fact: %d, project id: %d", len(p1), 99)
return
}
p2, err := GetRepPolicyByProject(1)
if err != nil {
t.Errorf("Error occuered in GetRepPolicyByProject:%v, project ID: %d", err, 2)
return
}
if len(p2) != 1 {
t.Errorf("Unexpected length of policy list, expected: 1, in fact: %d, project id: %d", len(p2), 1)
return
}
if p2[0].ID != policyID {
t.Errorf("Unexpecred policy id in result, expected: %d, in fact: %d", policyID, p2[0].ID)
return
}
}
func TestGetRepJobByPolicy(t *testing.T) {
jobs, err := GetRepJobByPolicy(999)
if err != nil {
log.Errorf("Error occured in GetRepJobByPolicy: %v, policy ID: %d", err, 999)
return
}
if len(jobs) > 0 {
log.Errorf("Unexpected length of jobs, expected: 0, in fact: %d", len(jobs))
return
}
jobs, err = GetRepJobByPolicy(policyID)
if err != nil {
log.Errorf("Error occured in GetRepJobByPolicy: %v, policy ID: %d", err, policyID)
return
}
if len(jobs) != 1 {
log.Errorf("Unexpected length of jobs, expected: 1, in fact: %d", len(jobs))
return
}
if jobs[0].ID != jobID {
log.Errorf("Unexpected job ID in the result, expected: %d, in fact: %d", jobID, jobs[0].ID)
return
}
}
func TestGetRepoJobToStop(t *testing.T) {
jobs := [...]models.RepJob{
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobRunning,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobFinished,
},
models.RepJob{
Repository: "library/ubuntu",
PolicyID: policyID,
Operation: "transfer",
Status: models.JobCanceled,
},
}
var err error
for _, j := range jobs {
_, err = AddRepJob(j)
if err != nil {
log.Errorf("Failed to add Job: %+v, error: %v", j, err)
return
}
}
res, err := GetRepJobToStop(policyID)
if err != nil {
log.Errorf("Failed to Get Jobs, error: %v", err)
return
}
//time.Sleep(15 * time.Second)
if len(res) != 2 {
log.Errorf("Expected length of stoppable jobs, expected:2, in fact: %d", len(res))
return
}
}
func TestDeleteRepTarget(t *testing.T) {
err := DeleteRepTarget(targetID)
if err != nil {
t.Errorf("Error occured in DeleteRepTarget: %v, id: %d", err, targetID)
return
}
t.Logf("deleted target, id: %d", targetID)
tgt, err := GetRepTarget(targetID)
if err != nil {
t.Errorf("Error occurred in GetTarget: %v, id: %d", err, targetID)
}
if tgt != nil {
t.Errorf("Able to find target after deletion, id: %d", targetID)
}
}
func TestDeleteRepPolicy(t *testing.T) {
err := DeleteRepPolicy(policyID)
if err != nil {
t.Errorf("Error occured in DeleteRepPolicy: %v, id: %d", err, policyID)
return
}
t.Logf("delete rep policy, id: %d", policyID)
p, err := GetRepPolicy(policyID)
if err != nil {
t.Errorf("Error occured in GetRepPolicy:%v", err)
}
if p != nil {
t.Errorf("Able to find rep policy after deletion, id: %d", policyID)
}
}
func TestDeleteRepJob(t *testing.T) {
err := DeleteRepJob(jobID)
if err != nil {
t.Errorf("Error occured in DeleteRepJob: %v, id: %d", err, jobID)
return
}
t.Logf("deleted rep job, id: %d", jobID)
j, err := GetRepJob(jobID)
if err != nil {
t.Errorf("Error occured in GetRepJob:%v", err)
}
if j != nil {
t.Errorf("Able to find rep job after deletion, id: %d", jobID)
}
}
func TestGetOrmer(t *testing.T) {
o := GetOrmer()
if o == nil {

198
dao/replication_job.go Normal file
View File

@ -0,0 +1,198 @@
package dao
import (
"fmt"
"strings"
"github.com/astaxie/beego/orm"
"github.com/vmware/harbor/models"
)
// AddRepTarget ...
func AddRepTarget(target models.RepTarget) (int64, error) {
o := orm.NewOrm()
return o.Insert(&target)
}
// GetRepTarget ...
func GetRepTarget(id int64) (*models.RepTarget, error) {
o := orm.NewOrm()
t := models.RepTarget{ID: id}
err := o.Read(&t)
if err == orm.ErrNoRows {
return nil, nil
}
return &t, err
}
// DeleteRepTarget ...
func DeleteRepTarget(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepTarget{ID: id})
return err
}
// UpdateRepTarget ...
func UpdateRepTarget(target models.RepTarget) error {
o := orm.NewOrm()
if len(target.Password) != 0 {
_, err := o.Update(&target)
return err
}
_, err := o.Update(&target, "URL", "Name", "Username")
return err
}
// GetAllRepTargets ...
func GetAllRepTargets() ([]*models.RepTarget, error) {
o := orm.NewOrm()
qs := o.QueryTable(&models.RepTarget{})
var targets []*models.RepTarget
_, err := qs.All(&targets)
return targets, err
}
// AddRepPolicy ...
func AddRepPolicy(policy models.RepPolicy) (int64, error) {
o := orm.NewOrm()
sqlTpl := `insert into replication_policy (name, project_id, target_id, enabled, description, cron_str, start_time, creation_time, update_time ) values (?, ?, ?, ?, ?, ?, %s, NOW(), NOW())`
var sql string
if policy.Enabled == 1 {
sql = fmt.Sprintf(sqlTpl, "NOW()")
} else {
sql = fmt.Sprintf(sqlTpl, "NULL")
}
p, err := o.Raw(sql).Prepare()
if err != nil {
return 0, err
}
r, err := p.Exec(policy.Name, policy.ProjectID, policy.TargetID, policy.Enabled, policy.Description, policy.CronStr)
if err != nil {
return 0, err
}
id, err := r.LastInsertId()
return id, err
}
// GetRepPolicy ...
func GetRepPolicy(id int64) (*models.RepPolicy, error) {
o := orm.NewOrm()
p := models.RepPolicy{ID: id}
err := o.Read(&p)
if err == orm.ErrNoRows {
return nil, nil
}
return &p, err
}
// GetRepPolicyByProject ...
func GetRepPolicyByProject(projectID int64) ([]*models.RepPolicy, error) {
var res []*models.RepPolicy
o := orm.NewOrm()
_, err := o.QueryTable("replication_policy").Filter("project_id", projectID).All(&res)
return res, err
}
// DeleteRepPolicy ...
func DeleteRepPolicy(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepPolicy{ID: id})
return err
}
// UpdateRepPolicyEnablement ...
func UpdateRepPolicyEnablement(id int64, enabled int) error {
o := orm.NewOrm()
p := models.RepPolicy{
ID: id,
Enabled: enabled}
_, err := o.Update(&p, "Enabled")
return err
}
// EnableRepPolicy ...
func EnableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 1)
}
// DisableRepPolicy ...
func DisableRepPolicy(id int64) error {
return UpdateRepPolicyEnablement(id, 0)
}
// AddRepJob ...
func AddRepJob(job models.RepJob) (int64, error) {
o := orm.NewOrm()
if len(job.Status) == 0 {
job.Status = models.JobPending
}
if len(job.TagList) > 0 {
job.Tags = strings.Join(job.TagList, ",")
}
return o.Insert(&job)
}
// GetRepJob ...
func GetRepJob(id int64) (*models.RepJob, error) {
o := orm.NewOrm()
j := models.RepJob{ID: id}
err := o.Read(&j)
if err == orm.ErrNoRows {
return nil, nil
}
genTagListForJob(&j)
return &j, nil
}
// GetRepJobByPolicy ...
func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).All(&res)
genTagListForJob(res...)
return res, err
}
// GetRepJobToStop get jobs that are possibly being handled by workers of a certain policy.
func GetRepJobToStop(policyID int64) ([]*models.RepJob, error) {
var res []*models.RepJob
_, err := repJobPolicyIDQs(policyID).Filter("status__in", models.JobPending, models.JobRunning).All(&res)
genTagListForJob(res...)
return res, err
}
func repJobPolicyIDQs(policyID int64) orm.QuerySeter {
o := orm.NewOrm()
return o.QueryTable("replication_job").Filter("policy_id", policyID)
}
// DeleteRepJob ...
func DeleteRepJob(id int64) error {
o := orm.NewOrm()
_, err := o.Delete(&models.RepJob{ID: id})
return err
}
// UpdateRepJobStatus ...
func UpdateRepJobStatus(id int64, status string) error {
o := orm.NewOrm()
j := models.RepJob{
ID: id,
Status: status,
}
num, err := o.Update(&j, "Status")
if num == 0 {
err = fmt.Errorf("Failed to update replication job with id: %d %s", id, err.Error())
}
return err
}
func genTagListForJob(jobs ...*models.RepJob) {
for _, j := range jobs {
if len(j.Tags) > 0 {
j.TagList = strings.Split(j.Tags, ",")
}
}
}

79
job/config/config.go Normal file
View File

@ -0,0 +1,79 @@
package config
import (
"fmt"
"os"
"strconv"
"github.com/vmware/harbor/utils/log"
)
const defaultMaxWorkers int = 10
var maxJobWorkers int
var localURL string
var logDir string
var uiSecret string
func init() {
maxWorkersEnv := os.Getenv("MAX_JOB_WORKERS")
maxWorkers64, err := strconv.ParseInt(maxWorkersEnv, 10, 32)
maxJobWorkers = int(maxWorkers64)
if err != nil {
log.Warningf("Failed to parse max works setting, error: %v, the default value: %d will be used", err, defaultMaxWorkers)
maxJobWorkers = defaultMaxWorkers
}
localURL = os.Getenv("HARBOR_URL")
if len(localURL) == 0 {
localURL = "http://registry:5000/"
}
logDir = os.Getenv("LOG_DIR")
if len(logDir) == 0 {
logDir = "/var/log"
}
f, err := os.Open(logDir)
defer f.Close()
if err != nil {
panic(err)
}
finfo, err := f.Stat()
if err != nil {
panic(err)
}
if !finfo.IsDir() {
panic(fmt.Sprintf("%s is not a direcotry", logDir))
}
uiSecret = os.Getenv("UI_SECRET")
if len(uiSecret) == 0 {
panic("UI Secret is not set")
}
log.Debugf("config: maxJobWorkers: %d", maxJobWorkers)
log.Debugf("config: localHarborURL: %s", localURL)
log.Debugf("config: logDir: %s", logDir)
log.Debugf("config: uiSecret: ******")
}
// MaxJobWorkers ...
func MaxJobWorkers() int {
return maxJobWorkers
}
// LocalHarborURL returns the local registry url, job service will use this URL to pull manifest and repository.
func LocalHarborURL() string {
return localURL
}
// LogDir returns the absolute path to which the log file will be written
func LogDir() string {
return logDir
}
// UISecret will return the value of secret cookie for jobsevice to call UI API.
func UISecret() string {
return uiSecret
}

119
job/replication/delete.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
const (
// StateDelete ...
StateDelete = "delete"
)
// Deleter deletes repository or tags
type Deleter struct {
repository string // prject_name/repo_name
tags []string
dstURL string // url of target registry
dstUsr string // username ...
dstPwd string // username ...
logger *log.Logger
}
// NewDeleter returns a Deleter
func NewDeleter(repository string, tags []string, dstURL, dstUsr, dstPwd string, logger *log.Logger) *Deleter {
deleter := &Deleter{
repository: repository,
tags: tags,
dstURL: dstURL,
dstUsr: dstUsr,
dstPwd: dstPwd,
logger: logger,
}
deleter.logger.Infof("initialization completed: repository: %s, tags: %v, destination URL: %s, destination user: %s",
deleter.repository, deleter.tags, deleter.dstURL, deleter.dstUsr)
return deleter
}
// Exit ...
func (d *Deleter) Exit() error {
return nil
}
// Enter deletes repository or tags
func (d *Deleter) Enter() (string, error) {
url := strings.TrimRight(d.dstURL, "/") + "/api/repositories/"
// delete repository
if len(d.tags) == 0 {
u := url + "?repo_name=" + d.repository
if err := del(u, d.dstUsr, d.dstPwd); err != nil {
d.logger.Errorf("an error occurred while deleting repository %s on %s with user %s: %v", d.repository, d.dstURL, d.dstUsr, err)
return "", err
}
d.logger.Infof("repository %s on %s has been deleted", d.repository, d.dstURL)
return models.JobFinished, nil
}
// delele tags
for _, tag := range d.tags {
u := url + "?repo_name=" + d.repository + "&tag=" + tag
if err := del(u, d.dstUsr, d.dstPwd); err != nil {
d.logger.Errorf("an error occurred while deleting repository %s:%s on %s with user %s: %v", d.repository, tag, d.dstURL, d.dstUsr, err)
return "", err
}
d.logger.Infof("repository %s:%s on %s has been deleted", d.repository, tag, d.dstURL)
}
return models.JobFinished, nil
}
func del(url, username, password string) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return err
}
req.SetBasicAuth(username, password)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("%d %s", resp.StatusCode, string(b))
}

76
job/replication/runner.go Normal file
View File

@ -0,0 +1,76 @@
package replication
/*
import (
"encoding/json"
//"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
"github.com/vmware/harbor/models"
"time"
)
const (
jobType = "transfer_img_out"
)
type Runner struct {
job.JobSM
Logger job.Logger
parm ImgOutParm
}
type ImgPuller struct {
job.DummyHandler
img string
logger job.Logger
}
func (ip ImgPuller) Enter() (string, error) {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return "push-img", nil
}
type ImgPusher struct {
job.DummyHandler
targetURL string
logger job.Logger
}
func (ip ImgPusher) Enter() (string, error) {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return job.JobContinue, nil
}
func init() {
job.Register(jobType, Runner{})
}
func (r Runner) Run(je models.JobEntry) error {
err := r.init(je)
if err != nil {
return err
}
r.Start(job.JobRunning)
return nil
}
func (r *Runner) init(je models.JobEntry) error {
r.JobID = je.ID
r.InitJobSM()
err := json.Unmarshal([]byte(je.ParmsStr), &r.parm)
if err != nil {
return err
}
r.Logger = job.Logger{je.ID}
r.AddTransition(job.JobRunning, "pull-img", ImgPuller{DummyHandler: job.DummyHandler{JobID: r.JobID}, img: r.parm.Image, logger: r.Logger})
//only handle on target for now
url := r.parm.Targets[0].URL
r.AddTransition("pull-img", "push-img", ImgPusher{DummyHandler: job.DummyHandler{JobID: r.JobID}, targetURL: url, logger: r.Logger})
r.AddTransition("push-img", job.JobFinished, job.StatusUpdater{job.DummyHandler{JobID: r.JobID}, job.JobFinished})
return nil
}
*/

399
job/replication/transfer.go Normal file
View File

@ -0,0 +1,399 @@
/*
Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry"
"github.com/vmware/harbor/utils/registry/auth"
)
const (
// StateCheck ...
StateCheck = "check"
// StatePullManifest ...
StatePullManifest = "pull_manifest"
// StateTransferBlob ...
StateTransferBlob = "transfer_blob"
// StatePushManifest ...
StatePushManifest = "push_manifest"
)
// BaseHandler holds informations shared by other state handlers
type BaseHandler struct {
project string // project_name
repository string // prject_name/repo_name
tags []string
srcURL string // url of source registry
dstURL string // url of target registry
dstUsr string // username ...
dstPwd string // password ...
srcClient *registry.Repository
dstClient *registry.Repository
manifest distribution.Manifest // manifest of tags[0]
blobs []string // blobs need to be transferred for tags[0]
blobsExistence map[string]bool //key: digest of blob, value: existence
logger *log.Logger
}
// InitBaseHandler initializes a BaseHandler: creating clients for source and destination registry,
// listing tags of the repository if parameter tags is nil.
func InitBaseHandler(repository, srcURL, srcSecret,
dstURL, dstUsr, dstPwd string, tags []string, logger *log.Logger) (*BaseHandler, error) {
logger.Infof("initializing: repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
repository, tags, srcURL, dstURL, dstUsr)
base := &BaseHandler{
repository: repository,
tags: tags,
srcURL: srcURL,
dstURL: dstURL,
dstUsr: dstUsr,
dstPwd: dstPwd,
blobsExistence: make(map[string]bool, 10),
logger: logger,
}
base.project = getProjectName(base.repository)
c := &http.Cookie{Name: models.UISecretCookie, Value: srcSecret}
srcCred := auth.NewCookieCredential(c)
// srcCred := auth.NewBasicAuthCredential("admin", "Harbor12345")
srcClient, err := registry.NewRepositoryWithCredential(base.repository, base.srcURL, srcCred)
if err != nil {
return nil, err
}
base.srcClient = srcClient
dstCred := auth.NewBasicAuthCredential(base.dstUsr, base.dstPwd)
dstClient, err := registry.NewRepositoryWithCredential(base.repository, base.dstURL, dstCred)
if err != nil {
return nil, err
}
base.dstClient = dstClient
if len(base.tags) == 0 {
tags, err := base.srcClient.ListTag()
if err != nil {
return nil, err
}
base.tags = tags
}
base.logger.Infof("initialization completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
base.project, base.repository, base.tags, base.srcURL, base.dstURL, base.dstUsr)
return base, nil
}
// Exit ...
func (b *BaseHandler) Exit() error {
return nil
}
func getProjectName(repository string) string {
repository = strings.TrimSpace(repository)
repository = strings.TrimRight(repository, "/")
return repository[:strings.LastIndex(repository, "/")]
}
// Checker checks the existence of project and the user's privlege to the project
type Checker struct {
*BaseHandler
}
// Enter check existence of project, if it does not exist, create it,
// if it exists, check whether the user has write privilege to it.
func (c *Checker) Enter() (string, error) {
exist, canWrite, err := c.projectExist()
if err != nil {
c.logger.Errorf("an error occurred while checking existence of project %s on %s with user %s : %v", c.project, c.dstURL, c.dstUsr, err)
return "", err
}
if !exist {
if err := c.createProject(); err != nil {
c.logger.Errorf("an error occurred while creating project %s on %s with user %s : %v", c.project, c.dstURL, c.dstUsr, err)
return "", err
}
c.logger.Infof("project %s is created on %s with user %s", c.project, c.dstURL, c.dstUsr)
return StatePullManifest, nil
}
c.logger.Infof("project %s already exists on %s", c.project, c.dstURL)
if !canWrite {
err = fmt.Errorf("the user %s is unauthorized to write to project %s on %s", c.dstUsr, c.project, c.dstURL)
c.logger.Errorf("%v", err)
return "", err
}
c.logger.Infof("the user %s has write privilege to project %s on %s", c.dstUsr, c.project, c.dstURL)
return StatePullManifest, nil
}
// check the existence of project, if it exists, returning whether the user has write privilege to it
func (c *Checker) projectExist() (exist, canWrite bool, err error) {
url := strings.TrimRight(c.dstURL, "/") + "/api/projects/?project_name=" + c.project
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return
}
req.SetBasicAuth(c.dstUsr, c.dstPwd)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
if resp.StatusCode == http.StatusNotFound {
return
}
if resp.StatusCode == http.StatusUnauthorized {
exist = true
return
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
if resp.StatusCode == http.StatusOK {
projects := make([]models.Project, 1)
if err = json.Unmarshal(data, &projects); err != nil {
return
}
if len(projects) == 0 {
return
}
for _, project := range projects {
if project.Name == c.project {
exist = true
canWrite = (project.Role == models.PROJECTADMIN ||
project.Role == models.DEVELOPER)
break
}
}
return
}
err = fmt.Errorf("an error occurred while checking existen of project %s on %s with user %s: %d %s",
c.project, c.dstURL, c.dstUsr, resp.StatusCode, string(data))
return
}
func (c *Checker) createProject() error {
// TODO handle publicity of project
project := struct {
ProjectName string `json:"project_name"`
Public bool `json:"public"`
}{
ProjectName: c.project,
}
data, err := json.Marshal(project)
if err != nil {
return err
}
url := strings.TrimRight(c.dstURL, "/") + "/api/projects/"
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
if err != nil {
return err
}
req.SetBasicAuth(c.dstUsr, c.dstPwd)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusCreated {
defer resp.Body.Close()
message, err := ioutil.ReadAll(resp.Body)
if err != nil {
c.logger.Errorf("an error occurred while reading message from response: %v", err)
}
return fmt.Errorf("failed to create project %s on %s with user %s: %d %s",
c.project, c.dstURL, c.dstUsr, resp.StatusCode, string(message))
}
return nil
}
// ManifestPuller pulls the manifest of a tag. And if no tag needs to be pulled,
// the next state that state machine should enter is "finished".
type ManifestPuller struct {
*BaseHandler
}
// Enter pulls manifest of a tag and checks if all blobs exist in the destination registry
func (m *ManifestPuller) Enter() (string, error) {
if len(m.tags) == 0 {
m.logger.Infof("no tag needs to be replicated, next state is \"finished\"")
return models.JobFinished, nil
}
name := m.repository
tag := m.tags[0]
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
digest, mediaType, payload, err := m.srcClient.PullManifest(tag, acceptMediaTypes)
if err != nil {
m.logger.Errorf("an error occurred while pulling manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
return "", err
}
m.logger.Infof("manifest of %s:%s pulled successfully from %s: %s", name, tag, m.srcURL, digest)
if strings.Contains(mediaType, "application/json") {
mediaType = schema1.MediaTypeManifest
}
manifest, _, err := registry.UnMarshal(mediaType, payload)
if err != nil {
m.logger.Errorf("an error occurred while parsing manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
return "", err
}
m.manifest = manifest
// all blobs(layers and config)
var blobs []string
for _, discriptor := range manifest.References() {
blobs = append(blobs, discriptor.Digest.String())
}
// config is also need to be transferred if the schema of manifest is v2
manifest2, ok := manifest.(*schema2.DeserializedManifest)
if ok {
blobs = append(blobs, manifest2.Target().Digest.String())
}
m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs)
for _, blob := range blobs {
exist, ok := m.blobsExistence[blob]
if !ok {
exist, err = m.dstClient.BlobExist(blob)
if err != nil {
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err)
return "", err
}
m.blobsExistence[blob] = exist
}
if !exist {
m.blobs = append(m.blobs, blob)
} else {
m.logger.Infof("blob %s of %s:%s already exists in %s", blob, name, tag, m.dstURL)
}
}
m.logger.Infof("blobs of %s:%s need to be transferred to %s: %v", name, tag, m.dstURL, m.blobs)
return StateTransferBlob, nil
}
// BlobTransfer transfers blobs of a tag
type BlobTransfer struct {
*BaseHandler
}
// Enter pulls blobs and then pushs them to destination registry.
func (b *BlobTransfer) Enter() (string, error) {
name := b.repository
tag := b.tags[0]
for _, blob := range b.blobs {
b.logger.Infof("transferring blob %s of %s:%s to %s ...", blob, name, tag, b.dstURL)
size, data, err := b.srcClient.PullBlob(blob)
if err != nil {
b.logger.Errorf("an error occurred while pulling blob %s of %s:%s from %s: %v", blob, name, tag, b.srcURL, err)
return "", err
}
if err = b.dstClient.PushBlob(blob, size, data); err != nil {
b.logger.Errorf("an error occurred while pushing blob %s of %s:%s to %s : %v", blob, name, tag, b.dstURL, err)
return "", err
}
b.logger.Infof("blob %s of %s:%s transferred to %s completed", blob, name, tag, b.dstURL)
}
return StatePushManifest, nil
}
// ManifestPusher pushs the manifest to destination registry
type ManifestPusher struct {
*BaseHandler
}
// Enter checks the existence of manifest in the source registry first, and if it
// exists, pushs it to destination registry. The checking operation is to avoid
// the situation that the tag is deleted during the blobs transfering
func (m *ManifestPusher) Enter() (string, error) {
name := m.repository
tag := m.tags[0]
_, exist, err := m.srcClient.ManifestExist(tag)
if err != nil {
m.logger.Infof("an error occurred while checking the existence of manifest of %s:%s on %s: %v", name, tag, m.srcURL, err)
return "", err
}
if !exist {
m.logger.Infof("manifest of %s:%s does not exist on source registry %s, cancel manifest pushing", name, tag, m.srcURL)
} else {
m.logger.Infof("manifest of %s:%s exists on source registry %s, continue manifest pushing", name, tag, m.srcURL)
mediaType, data, err := m.manifest.Payload()
if err != nil {
m.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", name, tag, err)
return "", err
}
if _, err = m.dstClient.PushManifest(tag, mediaType, data); err != nil {
m.logger.Errorf("an error occurred while pushing manifest of %s:%s to %s : %v", name, tag, m.dstURL, err)
return "", err
}
m.logger.Infof("manifest of %s:%s has been pushed to %s", name, tag, m.dstURL)
}
m.tags = m.tags[1:]
m.manifest = nil
m.blobs = nil
return StatePullManifest, nil
}

8
job/scheduler.go Normal file
View File

@ -0,0 +1,8 @@
package job
var jobQueue = make(chan int64)
// Schedule put a job id into job queue.
func Schedule(jobID int64) {
jobQueue <- jobID
}

85
job/statehandlers.go Normal file
View File

@ -0,0 +1,85 @@
package job
import (
"time"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
// StateHandler handles transition, it associates with each state, will be called when
// SM enters and exits a state during a transition.
type StateHandler interface {
// Enter returns the next state, if it returns empty string the SM will hold the current state or
// or decide the next state.
Enter() (string, error)
//Exit should be idempotent
Exit() error
}
// DummyHandler is the default implementation of StateHander interface, which has empty Enter and Exit methods.
type DummyHandler struct {
JobID int64
}
// Enter ...
func (dh DummyHandler) Enter() (string, error) {
return "", nil
}
// Exit ...
func (dh DummyHandler) Exit() error {
return nil
}
// StatusUpdater implements the StateHandler interface which updates the status of a job in DB when the job enters
// a status.
type StatusUpdater struct {
DummyHandler
State string
}
// Enter updates the status of a job and returns "_continue" status to tell state machine to move on.
// If the status is a final status it returns empty string and the state machine will be stopped.
func (su StatusUpdater) Enter() (string, error) {
err := dao.UpdateRepJobStatus(su.JobID, su.State)
if err != nil {
log.Warningf("Failed to update state of job: %d, state: %s, error: %v", su.JobID, su.State, err)
}
var next = models.JobContinue
if su.State == models.JobStopped || su.State == models.JobError || su.State == models.JobFinished {
next = ""
}
return next, err
}
// ImgPuller was for testing
type ImgPuller struct {
DummyHandler
img string
logger *log.Logger
}
// Enter ...
func (ip ImgPuller) Enter() (string, error) {
ip.logger.Infof("I'm pretending to pull img:%s, then sleep 30s", ip.img)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return "push-img", nil
}
// ImgPusher is a statehandler for testing
type ImgPusher struct {
DummyHandler
targetURL string
logger *log.Logger
}
// Enter ...
func (ip ImgPusher) Enter() (string, error) {
ip.logger.Infof("I'm pretending to push img to:%s, then sleep 30s", ip.targetURL)
time.Sleep(30 * time.Second)
ip.logger.Infof("wake up from sleep....")
return models.JobContinue, nil
}

269
job/statemachine.go Normal file
View File

@ -0,0 +1,269 @@
package job
import (
"fmt"
"sync"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/job/replication"
"github.com/vmware/harbor/job/utils"
"github.com/vmware/harbor/models"
uti "github.com/vmware/harbor/utils"
"github.com/vmware/harbor/utils/log"
)
// RepJobParm wraps the parm of a job
type RepJobParm struct {
LocalRegURL string
TargetURL string
TargetUsername string
TargetPassword string
Repository string
Tags []string
Enabled int
Operation string
}
// SM is the state machine to handle job, it handles one job at a time.
type SM struct {
JobID int64
CurrentState string
PreviousState string
//The states that don't have to exist in transition map, such as "Error", "Canceled"
ForcedStates map[string]struct{}
Transitions map[string]map[string]struct{}
Handlers map[string]StateHandler
desiredState string
Logger *log.Logger
Parms *RepJobParm
lock *sync.Mutex
}
// EnterState transit the statemachine from the current state to the state in parameter.
// It returns the next state the statemachine should tranit to.
func (sm *SM) EnterState(s string) (string, error) {
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
targets, ok := sm.Transitions[sm.CurrentState]
_, exist := targets[s]
_, isForced := sm.ForcedStates[s]
if !exist && !isForced {
return "", fmt.Errorf("Job id: %d, transition from %s to %s does not exist!", sm.JobID, sm.CurrentState, s)
}
exitHandler, ok := sm.Handlers[sm.CurrentState]
if ok {
if err := exitHandler.Exit(); err != nil {
return "", err
}
} else {
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
}
enterHandler, ok := sm.Handlers[s]
var next = models.JobContinue
var err error
if ok {
if next, err = enterHandler.Enter(); err != nil {
return "", err
}
} else {
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s)
}
sm.PreviousState = sm.CurrentState
sm.CurrentState = s
log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s)
return next, nil
}
// Start kicks off the statemachine to transit from current state to s, and moves on
// It will search the transit map if the next state is "_continue", and
// will enter error state if there's more than one possible path when next state is "_continue"
func (sm *SM) Start(s string) {
n, err := sm.EnterState(s)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
for len(n) > 0 && err == nil {
if d := sm.getDesiredState(); len(d) > 0 {
log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d)
n = d
sm.setDesiredState("")
continue
}
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
for n = range sm.Transitions[sm.CurrentState] {
break
}
log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n)
continue
}
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState]))
err = fmt.Errorf("Unable to continue")
break
}
n, err = sm.EnterState(n)
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
}
if err != nil {
log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err)
sm.EnterState(models.JobError)
}
}
// AddTransition add a transition to the transition table of state machine, the handler is the handler of target state "to"
func (sm *SM) AddTransition(from string, to string, h StateHandler) {
_, ok := sm.Transitions[from]
if !ok {
sm.Transitions[from] = make(map[string]struct{})
}
sm.Transitions[from][to] = struct{}{}
sm.Handlers[to] = h
}
// RemoveTransition removes a transition from transition table of the state machine
func (sm *SM) RemoveTransition(from string, to string) {
_, ok := sm.Transitions[from]
if !ok {
return
}
delete(sm.Transitions[from], to)
}
// Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job
// and the worker can release itself to the workerpool.
func (sm *SM) Stop(id int64) {
log.Debugf("Trying to stop the job: %d", id)
sm.lock.Lock()
defer sm.lock.Unlock()
//need to check if the sm switched to other job
if id == sm.JobID {
sm.desiredState = models.JobStopped
log.Debugf("Desired state of job %d is set to stopped", id)
} else {
log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id)
}
}
func (sm *SM) getDesiredState() string {
sm.lock.Lock()
defer sm.lock.Unlock()
return sm.desiredState
}
func (sm *SM) setDesiredState(s string) {
sm.lock.Lock()
defer sm.lock.Unlock()
sm.desiredState = s
}
// Init initialzie the state machine, it will be called once in the lifecycle of state machine.
func (sm *SM) Init() {
sm.lock = &sync.Mutex{}
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.ForcedStates = map[string]struct{}{
models.JobError: struct{}{},
models.JobStopped: struct{}{},
models.JobCanceled: struct{}{},
}
}
// Reset resets the state machine so it will start handling another job.
func (sm *SM) Reset(jid int64) error {
//To ensure the new jobID is visible to the thread to stop the SM
sm.lock.Lock()
sm.JobID = jid
sm.desiredState = ""
sm.lock.Unlock()
//init parms
job, err := dao.GetRepJob(sm.JobID)
if err != nil {
return fmt.Errorf("Failed to get job, error: %v", err)
}
if job == nil {
return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID)
}
policy, err := dao.GetRepPolicy(job.PolicyID)
if err != nil {
return fmt.Errorf("Failed to get policy, error: %v", err)
}
if policy == nil {
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
}
sm.Parms = &RepJobParm{
LocalRegURL: config.LocalHarborURL(),
Repository: job.Repository,
Tags: job.TagList,
Enabled: policy.Enabled,
Operation: job.Operation,
}
if policy.Enabled == 0 {
//worker will cancel this job
return nil
}
target, err := dao.GetRepTarget(policy.TargetID)
if err != nil {
return fmt.Errorf("Failed to get target, error: %v", err)
}
if target == nil {
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
}
sm.Parms.TargetURL = target.URL
sm.Parms.TargetUsername = target.Username
pwd := target.Password
if len(pwd) != 0 {
pwd, err = uti.ReversibleDecrypt(pwd)
if err != nil {
return fmt.Errorf("failed to decrypt password: %v", err)
}
}
sm.Parms.TargetPassword = pwd
//init states handlers
sm.Logger = utils.NewLogger(sm.JobID)
sm.Handlers = make(map[string]StateHandler)
sm.Transitions = make(map[string]map[string]struct{})
sm.CurrentState = models.JobPending
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
switch sm.Parms.Operation {
case models.RepOpTransfer:
err = addImgTransferTransition(sm)
case models.RepOpDelete:
err = addImgDeleteTransition(sm)
default:
err = fmt.Errorf("unsupported operation: %s", sm.Parms.Operation)
}
return err
}
func addImgTransferTransition(sm *SM) error {
base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(),
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
sm.Parms.Tags, sm.Logger)
if err != nil {
return err
}
sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base})
sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base})
sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
return nil
}
func addImgDeleteTransition(sm *SM) error {
deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL,
sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Logger)
sm.AddTransition(models.JobRunning, replication.StateDelete, deleter)
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
return nil
}

27
job/utils/logger.go Normal file
View File

@ -0,0 +1,27 @@
package utils
import (
"fmt"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/utils/log"
"os"
"path/filepath"
)
// NewLogger create a logger for a speicified job
func NewLogger(jobID int64) *log.Logger {
logFile := GetJobLogPath(jobID)
f, err := os.OpenFile(logFile, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
if err != nil {
log.Errorf("Failed to open log file %s, the log of job %d will be printed to standard output", logFile, jobID)
f = os.Stdout
}
return log.New(f, log.NewTextFormatter(), log.InfoLevel)
}
// GetJobLogPath returns the absolute path in which the job log file is located.
func GetJobLogPath(jobID int64) string {
fn := fmt.Sprintf("job_%d.log", jobID)
return filepath.Join(config.LogDir(), fn)
}

123
job/workerpool.go Normal file
View File

@ -0,0 +1,123 @@
package job
import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
)
type workerPool struct {
workerChan chan *Worker
workerList []*Worker
}
// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs.
// it consists of a channel for free workers and a list to all workers
var WorkerPool *workerPool
// StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker.
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
for _, w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
w.SM.Stop(id)
}
}
}
}
// Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine,
// the actual work to handle the job is done via state machine.
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
}
// Start is a loop worker gets id from its channel and handle it.
func (w *Worker) Start() {
go func() {
for {
WorkerPool.workerChan <- w
select {
case jobID := <-w.RepJobs:
log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
// Stop ...
func (w *Worker) Stop() {
go func() {
w.quit <- true
}()
}
func (w *Worker) handleRepJob(id int64) {
err := w.SM.Reset(id)
if err != nil {
log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err)
err2 := dao.UpdateRepJobStatus(id, models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2)
}
return
}
if w.SM.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled, will cancel the job")
_ = dao.UpdateRepJobStatus(id, models.JobCanceled)
} else {
w.SM.Start(models.JobRunning)
}
}
// NewWorker returns a pointer to new instance of worker
func NewWorker(id int) *Worker {
w := &Worker{
ID: id,
RepJobs: make(chan int64),
quit: make(chan bool),
SM: &SM{},
}
w.SM.Init()
return w
}
// InitWorkerPool create workers according to configuration.
func InitWorkerPool() {
WorkerPool = &workerPool{
workerChan: make(chan *Worker, config.MaxJobWorkers()),
workerList: make([]*Worker, 0, config.MaxJobWorkers()),
}
for i := 0; i < config.MaxJobWorkers(); i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
}
// Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it.
func Dispatch() {
for {
select {
case job := <-jobQueue:
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d", jobID)
worker := <-WorkerPool.workerChan
worker.RepJobs <- jobID
}(job)
}
}
}

5
jobservice/conf/app.conf Normal file
View File

@ -0,0 +1,5 @@
appname = jobservice
runmode = dev
[dev]
httpport = 80

10
jobservice/error.json Normal file
View File

@ -0,0 +1,10 @@
{
"job_type": "notexist",
"options": {
"whatever": "whatever"
},
"parms": {
"test": "test"
},
"cron_str": ""
}

15
jobservice/main.go Normal file
View File

@ -0,0 +1,15 @@
package main
import (
"github.com/astaxie/beego"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job"
)
func main() {
dao.InitDB()
initRouters()
job.InitWorkerPool()
go job.Dispatch()
beego.Run()
}

12
jobservice/my_start.sh Executable file
View File

@ -0,0 +1,12 @@
export MYSQL_HOST=127.0.0.1
export MYSQL_PORT=3306
export MYSQL_USR=root
export MYSQL_PWD=root123
export LOG_LEVEL=debug
export LOCAL_HARBOR_URL=http://127.0.0.1/
export UI_SECRET=abcdef
#export UI_USR=admin
#export UI_PWD=Harbor12345
export MAX_JOB_WORKERS=1
./jobservice

3
jobservice/populate.sql Normal file
View File

@ -0,0 +1,3 @@
use registry;
insert into replication_target (name, url, username, password) values ('test', 'http://10.117.171.31', 'admin', 'Harbor12345');
insert into replication_policy (name, project_id, target_id, enabled, start_time) value ('test_policy', 1, 1, 1, NOW());

13
jobservice/router.go Normal file
View File

@ -0,0 +1,13 @@
package main
import (
api "github.com/vmware/harbor/api/jobs"
"github.com/astaxie/beego"
)
func initRouters() {
beego.Router("/api/jobs/replication", &api.ReplicationJob{})
beego.Router("/api/jobs/replication/:id/log", &api.ReplicationJob{}, "get:GetLog")
beego.Router("/api/jobs/replication/actions", &api.ReplicationJob{}, "post:HandleAction")
}

7
jobservice/start_db.sh Executable file
View File

@ -0,0 +1,7 @@
#export MYQL_ROOT_PASSWORD=root123
docker run --name harbor_mysql -d -e MYSQL_ROOT_PASSWORD=root123 -p 3306:3306 -v /devdata/database:/var/lib/mysql harbor/mysql:dev
echo "sleep 10 seconds..."
sleep 10
mysql -h 127.0.0.1 -uroot -proot123 < ./populate.sql

4
jobservice/stop.json Normal file
View File

@ -0,0 +1,4 @@
{
"policy_id":1,
"action":"stop"
}

View File

@ -0,0 +1 @@
{"policy_id": 1}

View File

@ -0,0 +1 @@
{"policy_id": 1, "repository":"library/ubuntu", "tags":["12.04","11.11"]}

17
jobservice/test.json Normal file
View File

@ -0,0 +1,17 @@
{
"job_type": "transfer_img_out",
"options": {
"whatever": "whatever"
},
"parms": {
"secret": "mysecret",
"image": "ubuntu",
"targets": [{
"url": "127.0.0.1:5000",
"username": "admin",
"password": "admin"
}]
},
"cron_str": ""
}

11
models/base.go Normal file
View File

@ -0,0 +1,11 @@
package models
import (
"github.com/astaxie/beego/orm"
)
func init() {
orm.RegisterModel(new(RepTarget),
new(RepPolicy),
new(RepJob))
}

84
models/replication_job.go Normal file
View File

@ -0,0 +1,84 @@
package models
import (
"time"
)
const (
//JobPending ...
JobPending string = "pending"
//JobRunning ...
JobRunning string = "running"
//JobError ...
JobError string = "error"
//JobStopped ...
JobStopped string = "stopped"
//JobFinished ...
JobFinished string = "finished"
//JobCanceled ...
JobCanceled string = "canceled"
//JobContinue is the status returned by statehandler to tell statemachine to move to next possible state based on trasition table.
JobContinue string = "_continue"
//RepOpTransfer represents the operation of a job to transfer repository to a remote registry/harbor instance.
RepOpTransfer string = "transfer"
//RepOpDelete represents the operation of a job to remove repository from a remote registry/harbor instance.
RepOpDelete string = "delete"
//UISecretCookie is the cookie name to contain the UI secret
UISecretCookie string = "uisecret"
)
// RepPolicy is the model for a replication policy, which associate to a project and a target (destination)
type RepPolicy struct {
ID int64 `orm:"column(id)" json:"id"`
ProjectID int64 `orm:"column(project_id)" json:"project_id"`
TargetID int64 `orm:"column(target_id)" json:"target_id"`
Name string `orm:"column(name)" json:"name"`
// Target RepTarget `orm:"-" json:"target"`
Enabled int `orm:"column(enabled)" json:"enabled"`
Description string `orm:"column(description)" json:"description"`
CronStr string `orm:"column(cron_str)" json:"cron_str"`
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// RepJob is the model for a replication job, which is the execution unit on job service, currently it is used to transfer/remove
// a repository to/from a remote registry instance.
type RepJob struct {
ID int64 `orm:"column(id)" json:"id"`
Status string `orm:"column(status)" json:"status"`
Repository string `orm:"column(repository)" json:"repository"`
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
Operation string `orm:"column(operation)" json:"operation"`
Tags string `orm:"column(tags)" json:"-"`
TagList []string `orm:"-" json:"tags"`
// Policy RepPolicy `orm:"-" json:"policy"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// RepTarget is the model for a replication targe, i.e. destination, which wraps the endpoint URL and username/password of a remote registry.
type RepTarget struct {
ID int64 `orm:"column(id)" json:"id"`
URL string `orm:"column(url)" json:"url"`
Name string `orm:"column(name)" json:"name"`
Username string `orm:"column(username)" json:"username"`
Password string `orm:"column(password)" json:"password"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
//TableName is required by by beego orm to map RepTarget to table replication_target
func (rt *RepTarget) TableName() string {
return "replication_target"
}
//TableName is required by by beego orm to map RepJob to table replication_job
func (rj *RepJob) TableName() string {
return "replication_job"
}
//TableName is required by by beego orm to map RepPolicy to table replication_policy
func (rp *RepPolicy) TableName() string {
return "replication_policy"
}

View File

@ -13,7 +13,7 @@
limitations under the License.
*/
package utils
package cache
import (
"os"

View File

@ -20,10 +20,12 @@ import (
"regexp"
"strings"
"github.com/vmware/harbor/api"
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/models"
svc_utils "github.com/vmware/harbor/service/utils"
"github.com/vmware/harbor/service/cache"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry"
"github.com/astaxie/beego"
)
@ -54,7 +56,8 @@ func (n *NotificationHandler) Post() {
log.Errorf("Failed to match the media type against pattern, error: %v", err)
matched = false
}
if matched && strings.HasPrefix(e.Request.UserAgent, "docker") {
if matched && (strings.HasPrefix(e.Request.UserAgent, "docker") ||
strings.ToLower(strings.TrimSpace(e.Request.UserAgent)) == strings.ToLower(registry.UserAgent)) {
username = e.Actor.Name
action = e.Action
repo = e.Target.Repository
@ -67,14 +70,21 @@ func (n *NotificationHandler) Post() {
if username == "" {
username = "anonymous"
}
if action == "pull" && username == "job-service-user" {
return
}
go dao.AccessLog(username, project, repo, repoTag, action)
if action == "push" {
go func() {
err2 := svc_utils.RefreshCatalogCache()
err2 := cache.RefreshCatalogCache()
if err2 != nil {
log.Errorf("Error happens when refreshing cache: %v", err2)
}
}()
go api.TriggerReplicationByRepository(repo, []string{repoTag}, models.RepOpTransfer)
}
}
}

View File

@ -21,7 +21,7 @@ import (
"github.com/vmware/harbor/auth"
"github.com/vmware/harbor/models"
//svc_utils "github.com/vmware/harbor/service/utils"
svc_utils "github.com/vmware/harbor/service/utils"
"github.com/vmware/harbor/utils/log"
"github.com/astaxie/beego"
@ -38,20 +38,27 @@ type Handler struct {
// checkes the permission agains local DB and generates jwt token.
func (h *Handler) Get() {
var username, password string
request := h.Ctx.Request
log.Infof("request url: %v", request.URL.String())
username, password, _ := request.BasicAuth()
authenticated := authenticate(username, password)
service := h.GetString("service")
scopes := h.GetStrings("scope")
if len(scopes) == 0 && !authenticated {
log.Info("login request with invalid credentials")
h.CustomAbort(http.StatusUnauthorized, "")
}
access := GetResourceActions(scopes)
for _, a := range access {
FilterAccess(username, authenticated, a)
log.Infof("request url: %v", request.URL.String())
if svc_utils.VerifySecret(request) {
log.Debugf("Will grant all access as this request is from job service with legal secret.")
username = "job-service-user"
} else {
username, password, _ = request.BasicAuth()
authenticated := authenticate(username, password)
if len(scopes) == 0 && !authenticated {
log.Info("login request with invalid credentials")
h.CustomAbort(http.StatusUnauthorized, "")
}
for _, a := range access {
FilterAccess(username, authenticated, a)
}
}
h.serveToken(username, service, access)
}

33
service/utils/utils.go Normal file
View File

@ -0,0 +1,33 @@
/*
Copyright (c) 2016 VMware, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package utils contains methods to support security, cache, and webhook functions.
package utils
import (
"github.com/vmware/harbor/utils/log"
"net/http"
"os"
)
// VerifySecret verifies the UI_SECRET cookie in a http request.
func VerifySecret(r *http.Request) bool {
secret := os.Getenv("UI_SECRET")
c, err := r.Cookie("uisecret")
if err != nil {
log.Errorf("Failed to get secret cookie, error: %v", err)
}
return c != nil && c.Value == secret
}

View File

@ -52,17 +52,22 @@ func initRouters() {
//API:
beego.Router("/api/search", &api.SearchAPI{})
beego.Router("/api/projects/:pid/members/?:mid", &api.ProjectMemberAPI{})
beego.Router("/api/projects/:pid([0-9]+)/members/?:mid", &api.ProjectMemberAPI{})
beego.Router("/api/projects/", &api.ProjectAPI{}, "get:List")
beego.Router("/api/projects/?:id", &api.ProjectAPI{})
beego.Router("/api/statistics", &api.StatisticAPI{})
beego.Router("/api/projects/:id/logs/filter", &api.ProjectAPI{}, "post:FilterAccessLog")
beego.Router("/api/users", &api.UserAPI{})
beego.Router("/api/projects/:id([0-9]+)/logs/filter", &api.ProjectAPI{}, "post:FilterAccessLog")
beego.Router("/api/users/?:id", &api.UserAPI{})
beego.Router("/api/users/:id/password", &api.UserAPI{}, "put:ChangePassword")
beego.Router("/api/users/:id([0-9]+)/password", &api.UserAPI{}, "put:ChangePassword")
beego.Router("/api/repositories", &api.RepositoryAPI{})
beego.Router("/api/repositories/tags", &api.RepositoryAPI{}, "get:GetTags")
beego.Router("/api/repositories/manifests", &api.RepositoryAPI{}, "get:GetManifests")
beego.Router("/api/jobs/replication/?:id([0-9]+)", &api.RepJobAPI{})
beego.Router("/api/jobs/replication/:id([0-9]+)/log", &api.RepJobAPI{}, "get:GetLog")
beego.Router("/api/policies/replication", &api.RepPolicyAPI{})
beego.Router("/api/policies/replication/:id([0-9]+)/enablement", &api.RepPolicyAPI{}, "put:UpdateEnablement")
beego.Router("/api/targets/?:id([0-9]+)", &api.TargetAPI{})
beego.Router("/api/targets/ping", &api.TargetAPI{}, "post:Ping")
//external service that hosted on harbor process:
beego.Router("/service/notifications", &service.NotificationHandler{})

View File

@ -17,6 +17,7 @@ package utils
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"golang.org/x/crypto/pbkdf2"
@ -26,3 +27,14 @@ import (
func Encrypt(content string, salt string) string {
return fmt.Sprintf("%x", pbkdf2.Key([]byte(content), []byte(salt), 4096, 16, sha1.New))
}
// ReversibleEncrypt encrypts the str with base64
func ReversibleEncrypt(str string) string {
return base64.StdEncoding.EncodeToString([]byte(str))
}
// ReversibleDecrypt decrypts the str with base64
func ReversibleDecrypt(str string) (string, error) {
b, err := base64.StdEncoding.DecodeString(str)
return string(b), err
}

View File

@ -27,7 +27,7 @@ import (
var logger = New(os.Stdout, NewTextFormatter(), WarningLevel)
func init() {
logger.callDepth = 3
logger.callDepth = 4
// TODO add item in configuaration file
lvl := os.Getenv("LOG_LEVEL")
@ -52,6 +52,7 @@ type Logger struct {
fmtter Formatter
lvl Level
callDepth int
skipLine bool
mu sync.Mutex
}
@ -61,7 +62,7 @@ func New(out io.Writer, fmtter Formatter, lvl Level) *Logger {
out: out,
fmtter: fmtter,
lvl: lvl,
callDepth: 2,
callDepth: 3,
}
}
@ -121,8 +122,7 @@ func (l *Logger) output(record *Record) (err error) {
// Debug ...
func (l *Logger) Debug(v ...interface{}) {
if l.lvl <= DebugLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprint(v...), line, DebugLevel)
record := NewRecord(time.Now(), fmt.Sprint(v...), l.getLine(), DebugLevel)
l.output(record)
}
}
@ -130,8 +130,7 @@ func (l *Logger) Debug(v ...interface{}) {
// Debugf ...
func (l *Logger) Debugf(format string, v ...interface{}) {
if l.lvl <= DebugLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), line, DebugLevel)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), l.getLine(), DebugLevel)
l.output(record)
}
}
@ -171,8 +170,7 @@ func (l *Logger) Warningf(format string, v ...interface{}) {
// Error ...
func (l *Logger) Error(v ...interface{}) {
if l.lvl <= ErrorLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprint(v...), line, ErrorLevel)
record := NewRecord(time.Now(), fmt.Sprint(v...), l.getLine(), ErrorLevel)
l.output(record)
}
}
@ -180,8 +178,7 @@ func (l *Logger) Error(v ...interface{}) {
// Errorf ...
func (l *Logger) Errorf(format string, v ...interface{}) {
if l.lvl <= ErrorLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), line, ErrorLevel)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), l.getLine(), ErrorLevel)
l.output(record)
}
}
@ -189,8 +186,7 @@ func (l *Logger) Errorf(format string, v ...interface{}) {
// Fatal ...
func (l *Logger) Fatal(v ...interface{}) {
if l.lvl <= FatalLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprint(v...), line, FatalLevel)
record := NewRecord(time.Now(), fmt.Sprint(v...), l.getLine(), FatalLevel)
l.output(record)
}
os.Exit(1)
@ -199,13 +195,19 @@ func (l *Logger) Fatal(v ...interface{}) {
// Fatalf ...
func (l *Logger) Fatalf(format string, v ...interface{}) {
if l.lvl <= FatalLevel {
line := line(l.callDepth)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), line, FatalLevel)
record := NewRecord(time.Now(), fmt.Sprintf(format, v...), l.getLine(), FatalLevel)
l.output(record)
}
os.Exit(1)
}
func (l *Logger) getLine() string {
if l.skipLine {
return ""
}
return line(l.callDepth)
}
// Debug ...
func Debug(v ...interface{}) {
logger.Debug(v...)

View File

@ -42,3 +42,19 @@ func NewBasicAuthCredential(username, password string) Credential {
func (b *basicAuthCredential) AddAuthorization(req *http.Request) {
req.SetBasicAuth(b.username, b.password)
}
type cookieCredential struct {
cookie *http.Cookie
}
// NewCookieCredential initialize a cookie based crendential handler, the cookie in parameter will be added to request to registry
// if this crendential is attached to a registry client.
func NewCookieCredential(c *http.Cookie) Credential {
return &cookieCredential{
cookie: c,
}
}
func (c *cookieCredential) AddAuthorization(req *http.Request) {
req.AddCookie(c.cookie)
}

View File

@ -27,7 +27,7 @@ import (
token_util "github.com/vmware/harbor/service/token"
"github.com/vmware/harbor/utils/log"
registry_errors "github.com/vmware/harbor/utils/registry/errors"
registry_error "github.com/vmware/harbor/utils/registry/error"
)
type scope struct {
@ -75,7 +75,9 @@ func (t *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]str
hasFrom = true
}
scopes = append(scopes, t.scope)
if t.scope != nil {
scopes = append(scopes, t.scope)
}
expired := true
@ -143,11 +145,14 @@ func NewStandardTokenHandler(credential Credential, scopeType, scopeName string,
credential: credential,
}
handler.scope = &scope{
Type: scopeType,
Name: scopeName,
Actions: scopeActions,
if len(scopeType) != 0 || len(scopeName) != 0 {
handler.scope = &scope{
Type: scopeType,
Name: scopeName,
Actions: scopeActions,
}
}
handler.tg = handler.generateToken
return handler
@ -182,10 +187,9 @@ func (s *standardTokenHandler) generateToken(realm, service string, scopes []str
return
}
if resp.StatusCode != http.StatusOK {
err = registry_errors.Error{
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
}
@ -203,12 +207,14 @@ func (s *standardTokenHandler) generateToken(realm, service string, scopes []str
expiresIn = tk.ExpiresIn
t, err := time.Parse(time.RFC3339, tk.IssuedAt)
if err != nil {
log.Errorf("error occurred while parsing issued_at: %v", err)
err = nil
} else {
issuedAt = &t
if len(tk.IssuedAt) != 0 {
t, err := time.Parse(time.RFC3339, tk.IssuedAt)
if err != nil {
log.Errorf("error occurred while parsing issued_at: %v", err)
err = nil
} else {
issuedAt = &t
}
}
log.Debug("get token from token server")

View File

@ -13,27 +13,19 @@
limitations under the License.
*/
package errors
package error
import (
"fmt"
)
// Error : if response's status code is not 200 or does not meet requirement,
// an Error instance will be returned
// Error : if response is returned but the status code is not 200, an Error instance will be returned
type Error struct {
StatusCode int
StatusText string
Message string
Detail string
}
// Error ...
func (e Error) Error() string {
return fmt.Sprintf("%d %s %s", e.StatusCode, e.StatusText, e.Message)
}
// ParseError parses err, if err is type Error, convert it to Error
func ParseError(err error) (Error, bool) {
e, ok := err.(Error)
return e, ok
// Error returns the details as string
func (e *Error) Error() string {
return fmt.Sprintf("%d %s", e.StatusCode, e.Detail)
}

View File

@ -25,7 +25,12 @@ import (
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry/auth"
"github.com/vmware/harbor/utils/registry/errors"
registry_error "github.com/vmware/harbor/utils/registry/error"
)
const (
// UserAgent is used to decorate the request so it can be identified by webhook.
UserAgent string = "registry-client"
)
// Registry holds information of a registry entity
@ -78,6 +83,36 @@ func NewRegistryWithUsername(endpoint, username string) (*Registry, error) {
return registry, nil
}
// NewRegistryWithCredential returns a Registry instance which associate to a crendential.
// And Credential is essentially a decorator for client to docorate the request before sending it to the registry.
func NewRegistryWithCredential(endpoint string, credential auth.Credential) (*Registry, error) {
endpoint = strings.TrimSpace(endpoint)
endpoint = strings.TrimRight(endpoint, "/")
if !strings.HasPrefix(endpoint, "http://") &&
!strings.HasPrefix(endpoint, "https://") {
endpoint = "http://" + endpoint
}
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
client, err := newClient(endpoint, "", credential, "", "", "")
if err != nil {
return nil, err
}
registry := &Registry{
Endpoint: u,
client: client,
}
log.Debugf("initialized a registry client with credential: %s", endpoint)
return registry, nil
}
// Catalog ...
func (r *Registry) Catalog() ([]string, error) {
repos := []string{}
@ -89,11 +124,7 @@ func (r *Registry) Catalog() ([]string, error) {
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return repos, e
}
return repos, err
return repos, parseError(err)
}
defer resp.Body.Close()
@ -117,10 +148,48 @@ func (r *Registry) Catalog() ([]string, error) {
return repos, nil
}
return repos, errors.Error{
return repos, &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}
// Ping ...
func (r *Registry) Ping() error {
req, err := http.NewRequest("GET", buildPingURL(r.Endpoint.String()), nil)
if err != nil {
return err
}
resp, err := r.client.Do(req)
if err != nil {
// if urlErr, ok := err.(*url.Error); ok {
// if regErr, ok := urlErr.Err.(*registry_error.Error); ok {
// return &registry_error.Error{
// StatusCode: regErr.StatusCode,
// Detail: regErr.Detail,
// }
// }
// return urlErr.Err
// }
return parseError(err)
}
if resp.StatusCode == http.StatusOK {
return nil
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return &registry_error.Error{
StatusCode: resp.StatusCode,
Detail: string(b),
}
}
@ -149,8 +218,9 @@ func newClient(endpoint, username string, credential auth.Credential,
challenges := auth.ParseChallengeFromResponse(resp)
authorizer := auth.NewRequestAuthorizer(handlers, challenges)
headerModifier := NewHeaderModifier(map[string]string{http.CanonicalHeaderKey("User-Agent"): UserAgent})
transport := NewTransport(http.DefaultTransport, []RequestModifier{authorizer})
transport := NewTransport(http.DefaultTransport, []RequestModifier{authorizer, headerModifier})
return &http.Client{
Transport: transport,
}, nil

View File

@ -19,6 +19,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
@ -29,7 +30,7 @@ import (
"github.com/docker/distribution/manifest/schema2"
"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry/auth"
"github.com/vmware/harbor/utils/registry/errors"
registry_error "github.com/vmware/harbor/utils/registry/error"
)
// Repository holds information of a repository entity
@ -111,15 +112,14 @@ func NewRepositoryWithUsername(name, endpoint, username string) (*Repository, er
return repository, nil
}
// try to convert err to errors.Error if it is
func isUnauthorizedError(err error) (bool, error) {
if strings.Contains(err.Error(), http.StatusText(http.StatusUnauthorized)) {
return true, errors.Error{
StatusCode: http.StatusUnauthorized,
StatusText: http.StatusText(http.StatusUnauthorized),
func parseError(err error) error {
if urlErr, ok := err.(*url.Error); ok {
if regErr, ok := urlErr.Err.(*registry_error.Error); ok {
return regErr
}
return urlErr.Err
}
return false, err
return err
}
// ListTag ...
@ -132,11 +132,7 @@ func (r *Repository) ListTag() ([]string, error) {
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return tags, e
}
return tags, err
return tags, parseError(err)
}
defer resp.Body.Close()
@ -159,10 +155,9 @@ func (r *Repository) ListTag() ([]string, error) {
return tags, nil
}
return tags, errors.Error{
return tags, &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}
@ -179,11 +174,7 @@ func (r *Repository) ManifestExist(reference string) (digest string, exist bool,
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
err = e
return
}
err = parseError(err)
return
}
@ -204,10 +195,9 @@ func (r *Repository) ManifestExist(reference string) (digest string, exist bool,
return
}
err = errors.Error{
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
}
@ -225,11 +215,7 @@ func (r *Repository) PullManifest(reference string, acceptMediaTypes []string) (
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
err = e
return
}
err = parseError(err)
return
}
@ -246,10 +232,9 @@ func (r *Repository) PullManifest(reference string, acceptMediaTypes []string) (
return
}
err = errors.Error{
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
@ -266,11 +251,7 @@ func (r *Repository) PushManifest(reference, mediaType string, payload []byte) (
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
err = e
return
}
err = parseError(err)
return
}
@ -286,10 +267,9 @@ func (r *Repository) PushManifest(reference, mediaType string, payload []byte) (
return
}
err = errors.Error{
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
@ -304,11 +284,7 @@ func (r *Repository) DeleteManifest(digest string) error {
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return e
}
return err
return parseError(err)
}
if resp.StatusCode == http.StatusAccepted {
@ -322,10 +298,9 @@ func (r *Repository) DeleteManifest(digest string) error {
return err
}
return errors.Error{
return &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}
@ -337,9 +312,8 @@ func (r *Repository) DeleteTag(tag string) error {
}
if !exist {
return errors.Error{
return &registry_error.Error{
StatusCode: http.StatusNotFound,
StatusText: http.StatusText(http.StatusNotFound),
}
}
@ -355,11 +329,7 @@ func (r *Repository) BlobExist(digest string) (bool, error) {
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return false, e
}
return false, err
return false, parseError(err)
}
if resp.StatusCode == http.StatusOK {
@ -377,15 +347,14 @@ func (r *Repository) BlobExist(digest string) (bool, error) {
return false, err
}
return false, errors.Error{
return false, &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}
// PullBlob ...
func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error) {
func (r *Repository) PullBlob(digest string) (size int64, data io.ReadCloser, err error) {
req, err := http.NewRequest("GET", buildBlobURL(r.Endpoint.String(), r.Name, digest), nil)
if err != nil {
return
@ -393,17 +362,7 @@ func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
err = e
return
}
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
err = parseError(err)
return
}
@ -413,14 +372,19 @@ func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error
if err != nil {
return
}
data = b
data = resp.Body
return
}
err = errors.Error{
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
@ -432,11 +396,7 @@ func (r *Repository) initiateBlobUpload(name string) (location, uploadUUID strin
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
err = e
return
}
err = parseError(err)
return
}
@ -453,28 +413,23 @@ func (r *Repository) initiateBlobUpload(name string) (location, uploadUUID strin
return
}
err = errors.Error{
err = &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
return
}
func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data []byte) error {
req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), bytes.NewReader(data))
func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data io.Reader) error {
req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), data)
if err != nil {
return err
}
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return e
}
return err
return parseError(err)
}
if resp.StatusCode == http.StatusCreated {
@ -488,25 +443,14 @@ func (r *Repository) monolithicBlobUpload(location, digest string, size int64, d
return err
}
return errors.Error{
return &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}
// PushBlob ...
func (r *Repository) PushBlob(digest string, size int64, data []byte) error {
exist, err := r.BlobExist(digest)
if err != nil {
return err
}
if exist {
log.Infof("blob already exists, skip pushing: %s %s", r.Name, digest)
return nil
}
func (r *Repository) PushBlob(digest string, size int64, data io.Reader) error {
location, _, err := r.initiateBlobUpload(r.Name)
if err != nil {
return err
@ -524,11 +468,7 @@ func (r *Repository) DeleteBlob(digest string) error {
resp, err := r.client.Do(req)
if err != nil {
ok, e := isUnauthorizedError(err)
if ok {
return e
}
return err
return parseError(err)
}
if resp.StatusCode == http.StatusAccepted {
@ -542,10 +482,9 @@ func (r *Repository) DeleteBlob(digest string) error {
return err
}
return errors.Error{
return &registry_error.Error{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
Message: string(b),
Detail: string(b),
}
}

View File

@ -25,9 +25,8 @@ import (
"testing"
"time"
//"github.com/vmware/harbor/utils/log"
"github.com/vmware/harbor/utils/registry/auth"
"github.com/vmware/harbor/utils/registry/errors"
"github.com/vmware/harbor/utils/registry/error"
)
var (
@ -164,12 +163,12 @@ func TestListTagWithInvalidCredential(t *testing.T) {
t.Error(err)
}
_, err = client.ListTag()
if err != nil {
e, ok := errors.ParseError(err)
if _, err = client.ListTag(); err != nil {
e, ok := err.(*error.Error)
if ok && e.StatusCode == http.StatusUnauthorized {
return
}
t.Error(err)
return
}

View File

@ -26,6 +26,26 @@ type RequestModifier interface {
ModifyRequest(*http.Request) error
}
// HeaderModifier adds headers to request
type HeaderModifier struct {
headers map[string]string
}
// NewHeaderModifier ...
func NewHeaderModifier(headers map[string]string) *HeaderModifier {
return &HeaderModifier{
headers: headers,
}
}
// ModifyRequest adds headers to the request
func (h *HeaderModifier) ModifyRequest(req *http.Request) error {
for key, value := range h.headers {
req.Header.Add(key, value)
}
return nil
}
// Transport holds information about base transport and modifiers
type Transport struct {
transport http.RoundTripper