Mirror of https://github.com/goharbor/harbor.git
Feat: enable mTLS in core

Add mTLS-related code in core.

Signed-off-by: DQ <dengq@vmware.com>
This commit is contained in:
parent a4855cca36
commit da359f609f
@@ -21,7 +21,7 @@ http {
   upstream core {
     {% if internal_tls.enabled %}
-      server core:10443;
+      server core:8443;
     {% else %}
       server core:8080;
     {% endif %}
@@ -348,9 +348,9 @@ def parse_yaml_config(config_file_path, with_notary, with_clair, with_trivy, wit

     if config_dict['internal_tls'].enabled:
         config_dict['registry_controller_url'] = 'https://registryctl:8443'
-        config_dict['core_url'] = 'https://core:10443'
-        config_dict['core_local_url'] = 'https://127.0.0.1:10443'
-        config_dict['token_service_url'] = 'https://core:10443/service/token'
+        config_dict['core_url'] = 'https://core:8443'
+        config_dict['core_local_url'] = 'https://127.0.0.1:8443'
+        config_dict['token_service_url'] = 'https://core:8443/service/token'
         config_dict['jobservice_url'] = 'https://jobservice:8443'
         # config_dict['clair_adapter_url'] = 'https://clair-adapter:8443'
         # config_dict['notary_url'] = 'https://notary-server:4443'
@@ -2,7 +2,9 @@ package driver

 import (
 	"errors"
-	"github.com/goharbor/harbor/src/common/http"
+	"net/http"
+
+	commonhttp "github.com/goharbor/harbor/src/common/http"
 	"github.com/goharbor/harbor/src/common/http/modifier"
 	"github.com/goharbor/harbor/src/common/utils/log"
 )
@@ -10,12 +12,17 @@ import (
 // RESTDriver - config store driver based on REST API
 type RESTDriver struct {
 	configRESTURL string
-	client        *http.Client
+	client        *commonhttp.Client
 }

 // NewRESTDriver - Create RESTDriver
 func NewRESTDriver(configRESTURL string, modifiers ...modifier.Modifier) *RESTDriver {
-	return &RESTDriver{configRESTURL: configRESTURL, client: http.NewClient(nil, modifiers...)}
+	if commonhttp.InternalTLSEnabled() {
+		tr := commonhttp.GetHTTPTransport(commonhttp.InternalTransport)
+		return &RESTDriver{configRESTURL: configRESTURL, client: commonhttp.NewClient(&http.Client{Transport: tr}, modifiers...)}
+	}
+	return &RESTDriver{configRESTURL: configRESTURL, client: commonhttp.NewClient(nil, modifiers...)}
 }

 // Load - load config data from REST server
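With this change, TLS selection for the config REST driver happens once, inside NewRESTDriver. A minimal usage sketch (the configuration URL is illustrative, and Load is assumed to return the config map and an error, as its doc comment suggests):

// Hypothetical caller; the URL is an example value, not taken from this commit.
driver := NewRESTDriver("https://core:8443/api/internal/configurations")
cfg, err := driver.Load()
if err != nil {
	log.Errorf("failed to load configurations from core: %v", err)
}
_ = cfg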
@@ -28,17 +28,24 @@ import (
 	"reflect"
 )

-// Client is a util for common HTTP operations, such Get, Head, Post, Put and Delete.
-// Use Do instead if those methods can not meet your requirement
-type Client struct {
-	modifiers []modifier.Modifier
-	client    *http.Client
-}
+const (
+	// DefaultTransport used to get the default http Transport
+	DefaultTransport = iota
+	// InsecureTransport used to get the insecure http Transport
+	InsecureTransport
+	// InternalTransport used to get the internal secure http Transport
+	InternalTransport
+	// SecureTransport used to get the external secure http Transport
+	SecureTransport
+)

-var defaultHTTPTransport, secureHTTPTransport, insecureHTTPTransport *http.Transport
+var (
+	secureHTTPTransport   *http.Transport
+	insecureHTTPTransport *http.Transport
+	internalTransport     *http.Transport
+)

 func init() {
-	defaultHTTPTransport = &http.Transport{}

 	secureHTTPTransport = &http.Transport{
 		Proxy: http.ProxyFromEnvironment,
@@ -46,23 +53,55 @@ func init() {
 			InsecureSkipVerify: false,
 		},
 	}

 	insecureHTTPTransport = &http.Transport{
 		Proxy: http.ProxyFromEnvironment,
 		TLSClientConfig: &tls.Config{
 			InsecureSkipVerify: true,
 		},
 	}
+
+	initInternalTransport()
 }

+// Client is a util for common HTTP operations, such Get, Head, Post, Put and Delete.
+// Use Do instead if those methods can not meet your requirement
+type Client struct {
+	modifiers []modifier.Modifier
+	client    *http.Client
+}
+
+func initInternalTransport() {
+	if InternalTLSEnabled() {
+		tlsConfig, err := GetInternalTLSConfig()
+		if err != nil {
+			panic(err)
+		}
+		internalTransport = &http.Transport{
+			TLSClientConfig: tlsConfig,
+		}
+	} else {
+		internalTransport = &http.Transport{
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: true,
+			},
+		}
+	}
+}
+
 // GetHTTPTransport returns HttpTransport based on insecure configuration
-func GetHTTPTransport(insecure ...bool) *http.Transport {
-	if len(insecure) == 0 {
-		return defaultHTTPTransport
+func GetHTTPTransport(clientType uint) *http.Transport {
+	switch clientType {
+	case SecureTransport:
+		return secureHTTPTransport.Clone()
+	case InsecureTransport:
+		return insecureHTTPTransport.Clone()
+	case InternalTransport:
+		return internalTransport.Clone()
+	default:
+		// default Transport is secure one
+		return secureHTTPTransport.Clone()
 	}
-	if insecure[0] {
-		return insecureHTTPTransport
-	}
-	return secureHTTPTransport
 }

 // NewClient creates an instance of Client.
@@ -74,9 +113,7 @@ func NewClient(c *http.Client, modifiers ...modifier.Modifier) *Client {
 	}
 	if client.client == nil {
 		client.client = &http.Client{
-			Transport: &http.Transport{
-				Proxy: http.ProxyFromEnvironment,
-			},
+			Transport: GetHTTPTransport(DefaultTransport),
 		}
 	}
 	if len(modifiers) > 0 {
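The transport constants above replace the old variadic bool argument. A short sketch of how a caller picks a transport (this mirrors the pattern used elsewhere in this commit; it assumes the commonhttp alias plus net/http and time imports, and the timeout value is illustrative):

// Client for service-to-service calls inside the Harbor deployment.
internalCli := commonhttp.NewClient(&http.Client{
	Transport: commonhttp.GetHTTPTransport(commonhttp.InternalTransport),
	Timeout:   30 * time.Second, // illustrative value
})

// Client for external endpoints, with certificate verification enabled.
externalCli := commonhttp.NewClient(&http.Client{
	Transport: commonhttp.GetHTTPTransport(commonhttp.SecureTransport),
})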
@@ -7,8 +7,8 @@ import (
 )

 func TestGetHTTPTransport(t *testing.T) {
-	transport := GetHTTPTransport(true)
+	transport := GetHTTPTransport(InsecureTransport)
 	assert.True(t, transport.TLSClientConfig.InsecureSkipVerify)
-	transport = GetHTTPTransport(false)
+	transport = GetHTTPTransport(SecureTransport)
 	assert.False(t, transport.TLSClientConfig.InsecureSkipVerify)
 }
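A natural follow-up case, not part of this commit: when INTERNAL_TLS_ENABLED is unset, initInternalTransport() falls back to an InsecureSkipVerify transport, so a hypothetical extra test in the same file could assert that:

func TestGetHTTPTransportInternal(t *testing.T) {
	// Assumes INTERNAL_TLS_ENABLED is not set in the test environment, so the
	// internal transport was built with InsecureSkipVerify set to true.
	transport := GetHTTPTransport(InternalTransport)
	assert.True(t, transport.TLSClientConfig.InsecureSkipVerify)
}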
src/common/http/tls.go (new file, 90 lines)
@@ -0,0 +1,90 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"os"
	"strings"

	"github.com/goharbor/harbor/src/common/utils/log"
)

const (
	// Internal TLS ENV
	internalTLSEnable   = "INTERNAL_TLS_ENABLED"
	internalTLSKeyPath  = "INTERNAL_TLS_KEY_PATH"
	internalTLSCertPath = "INTERNAL_TLS_CERT_PATH"
	internalTrustCAPath = "INTERNAL_TLS_TRUST_CA_PATH"
)

// InternalTLSEnabled returns whether internal TLS is enabled
func InternalTLSEnabled() bool {
	iTLSEnabled := os.Getenv(internalTLSEnable)
	if strings.ToLower(iTLSEnabled) == "true" {
		return true
	}
	return false
}

// GetInternalCA used to get the internal trust CA cert from env
func GetInternalCA(caPool *x509.CertPool) *x509.CertPool {
	if caPool == nil {
		caPool = x509.NewCertPool()
	}

	caPath := os.Getenv(internalTrustCAPath)
	if caPath != "" {
		caCert, err := ioutil.ReadFile(caPath)
		if err != nil {
			log.Errorf("read ca file %s failure: %v", caPath, err)
		}
		if ok := caPool.AppendCertsFromPEM(caCert); !ok {
			log.Errorf("failed to append ca to ca pool")
		} else {
			log.Infof("appended trustCA %s successfully", caPath)
		}
	}

	return caPool
}

// GetInternalCertPair used to get the internal cert and key pair from the environment
func GetInternalCertPair() (tls.Certificate, error) {
	crtPath := os.Getenv(internalTLSCertPath)
	keyPath := os.Getenv(internalTLSKeyPath)
	cert, err := tls.LoadX509KeyPair(crtPath, keyPath)
	return cert, err
}

// GetInternalTLSConfig returns a tls.Config for internal HTTPS communication
func GetInternalTLSConfig() (*tls.Config, error) {
	// generate ca pool
	caCertPool := GetInternalCA(nil)

	// generate key pair
	cert, err := GetInternalCertPair()
	if err != nil {
		return nil, fmt.Errorf("internal TLS enabled but can't get cert file: %w", err)
	}

	return &tls.Config{
		RootCAs:      caCertPool,
		Certificates: []tls.Certificate{cert},
	}, nil
}
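Everything in tls.go is driven by the four environment variables declared at the top of the file. A small sketch of how a test in the same package might exercise the enable flag (hypothetical, assuming os and testing are imported):

func TestInternalTLSEnabled(t *testing.T) {
	// The variable name matches the internalTLSEnable constant; the check is case-insensitive.
	os.Setenv("INTERNAL_TLS_ENABLED", "True")
	if !InternalTLSEnabled() {
		t.Error("expected internal TLS to be reported as enabled")
	}
	os.Setenv("INTERNAL_TLS_ENABLED", "false")
	if InternalTLSEnabled() {
		t.Error("expected internal TLS to be reported as disabled")
	}
}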
@@ -62,10 +62,13 @@ func Init() {
 // NewDefaultClient creates a default client based on endpoint and secret.
 func NewDefaultClient(endpoint, secret string) *DefaultClient {
 	var c *commonhttp.Client
+	httpCli := &http.Client{
+		Transport: commonhttp.GetHTTPTransport(commonhttp.InternalTransport),
+	}
 	if len(secret) > 0 {
-		c = commonhttp.NewClient(nil, auth.NewSecretAuthorizer(secret))
+		c = commonhttp.NewClient(httpCli, auth.NewSecretAuthorizer(secret))
 	} else {
-		c = commonhttp.NewClient(nil)
+		c = commonhttp.NewClient(httpCli)
 	}
 	e := strings.TrimRight(endpoint, "/")
 	return &DefaultClient{
@@ -74,7 +77,35 @@ func NewDefaultClient(endpoint, secret string) *DefaultClient {
 	}
 }

-// SubmitJob call jobserivce API to submit a job and returns the job's UUID.
+// NewReplicationClient used to create a client for replication
+func NewReplicationClient(endpoint, secret string) *DefaultClient {
+	var tr *http.Transport
+	if endpoint == config.InternalCoreURL() {
+		tr = commonhttp.GetHTTPTransport(commonhttp.InternalTransport)
+	} else {
+		tr = commonhttp.GetHTTPTransport(commonhttp.DefaultTransport)
+	}
+
+	var c *commonhttp.Client
+	if len(secret) > 0 {
+		c = commonhttp.NewClient(&http.Client{
+			Transport: tr,
+		},
+			auth.NewSecretAuthorizer(secret))
+	} else {
+		c = commonhttp.NewClient(&http.Client{
+			Transport: tr,
+		})
+	}
+
+	e := strings.TrimRight(endpoint, "/")
+	return &DefaultClient{
+		endpoint: e,
+		client:   c,
+	}
+}
+
+// SubmitJob call jobservice API to submit a job and returns the job's UUID.
 func (d *DefaultClient) SubmitJob(jd *models.JobData) (string, error) {
 	url := d.endpoint + "/api/v1/jobs"
 	jq := models.JobRequest{
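The split between the two constructors is about which transport to trust: NewDefaultClient always targets the in-cluster jobservice and therefore uses the internal transport, while NewReplicationClient only does so when the endpoint is the internal core URL. An illustrative call site (the endpoint and secret variables here are examples, not code from this commit):

// Jobservice is always an internal component, so the internal transport applies.
jsClient := NewDefaultClient("https://jobservice:8443", coreSecret)

// A replication endpoint may be a remote Harbor; the transport is chosen from the endpoint.
replClient := NewReplicationClient(remoteEndpoint, remoteSecret)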
@@ -131,7 +131,8 @@ func HTTPStatusCodeHealthChecker(method string, url string, header http.Header,
 	}

 	client := httputil.NewClient(&http.Client{
-		Timeout: timeout,
+		Transport: httputil.GetHTTPTransport(httputil.InternalTransport),
+		Timeout:   timeout,
 	})
 	resp, err := client.Do(req)
 	if err != nil {
src/core/api/quota/registry/registry.go (new file, 461 lines)
@@ -0,0 +1,461 @@
// Copyright 2018 Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

import (
	"strings"
	"sync"
	"time"

	"github.com/goharbor/harbor/src/common"
	"github.com/goharbor/harbor/src/common/dao"
	"github.com/goharbor/harbor/src/common/models"
	common_quota "github.com/goharbor/harbor/src/common/quota"
	"github.com/goharbor/harbor/src/common/utils/log"
	"github.com/goharbor/harbor/src/core/api"
	quota "github.com/goharbor/harbor/src/core/api/quota"
	"github.com/goharbor/harbor/src/core/promgr"
	"github.com/goharbor/harbor/src/pkg/registry"
	"github.com/pkg/errors"
)

// Migrator ...
type Migrator struct {
	pm promgr.ProjectManager
}

// NewRegistryMigrator returns a new Migrator.
func NewRegistryMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
	migrator := Migrator{
		pm: pm,
	}
	return &migrator
}

// Ping ...
func (rm *Migrator) Ping() error {
	return quota.Check(api.HealthCheckerRegistry["registry"].Check)
}

// Dump ...
func (rm *Migrator) Dump() ([]quota.ProjectInfo, error) {
	var (
		projects []quota.ProjectInfo
		wg       sync.WaitGroup
		err      error
	)

	reposInRegistry, err := registry.Cli.Catalog()
	if err != nil {
		return nil, err
	}

	// repoMap : map[project_name : []repo list]
	repoMap := make(map[string][]string)
	for _, item := range reposInRegistry {
		projectName := strings.Split(item, "/")[0]
		pro, err := rm.pm.Get(projectName)
		if err != nil {
			log.Errorf("failed to get project %s: %v", projectName, err)
			continue
		}
		if pro == nil {
			continue
		}
		_, exist := repoMap[pro.Name]
		if !exist {
			repoMap[pro.Name] = []string{item}
		} else {
			repos := repoMap[pro.Name]
			repos = append(repos, item)
			repoMap[pro.Name] = repos
		}
	}
	repoMap, err = rm.appendEmptyProject(repoMap)
	if err != nil {
		log.Errorf("failed to add empty projects: %v", err)
		return nil, err
	}

	wg.Add(len(repoMap))
	errChan := make(chan error, 1)
	infoChan := make(chan interface{})
	done := make(chan bool, 1)

	go func() {
		defer func() {
			done <- true
		}()

		for {
			select {
			case result := <-infoChan:
				if result == nil {
					return
				}
				project, ok := result.(quota.ProjectInfo)
				if ok {
					projects = append(projects, project)
				}

			case e := <-errChan:
				if err == nil {
					err = errors.Wrap(e, "quota sync error on getting info of project")
				} else {
					err = errors.Wrap(e, err.Error())
				}
			}
		}
	}()

	for project, repos := range repoMap {
		go func(project string, repos []string) {
			defer wg.Done()
			info, err := infoOfProject(project, repos)
			if err != nil {
				errChan <- err
				return
			}
			infoChan <- info
		}(project, repos)
	}

	wg.Wait()
	close(infoChan)

	// wait for all project info
	<-done

	if err != nil {
		return nil, err
	}

	return projects, nil
}

// As the catalog API cannot list the empty projects in Harbor, the empty projects need to be appended into the repo info
// so that the quota syncer can add 0 usage into quota usage.
func (rm *Migrator) appendEmptyProject(repoMap map[string][]string) (map[string][]string, error) {
	var withEmptyProjects map[string][]string
	all, err := dao.GetProjects(nil)
	if err != nil {
		return withEmptyProjects, err
	}
	withEmptyProjects = repoMap
	for _, pro := range all {
		_, exist := repoMap[pro.Name]
		if !exist {
			withEmptyProjects[pro.Name] = []string{}
		}
	}
	return withEmptyProjects, nil
}

// Usage ...
// registry needs to merge the shared blobs of different repositories.
func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, error) {
	var pros []quota.ProjectUsage

	for _, project := range projects {
		var size, count int64
		var blobs = make(map[string]int64)

		// usage count
		for _, repo := range project.Repos {
			count = count + int64(len(repo.Afs))
			// Because there are shared blobs between repositories, duplicate items need to be removed.
			for _, blob := range repo.Blobs {
				_, exist := blobs[blob.Digest]
				// foreign blob won't be calculated
				if !exist && blob.ContentType != common.ForeignLayer {
					blobs[blob.Digest] = blob.Size
				}
			}
		}
		// size
		for _, item := range blobs {
			size = size + item
		}

		proUsage := quota.ProjectUsage{
			Project: project.Name,
			Used: common_quota.ResourceList{
				common_quota.ResourceCount:   count,
				common_quota.ResourceStorage: size,
			},
		}
		pros = append(pros, proUsage)
	}

	return pros, nil
}

// Persist ...
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
	total := len(projects)
	for i, project := range projects {
		log.Infof("[Quota-Sync]:: start to persist artifact&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
		for _, repo := range project.Repos {
			if err := persistAf(repo.Afs); err != nil {
				return err
			}
			if err := persistAfnbs(repo.Afnbs); err != nil {
				return err
			}
			if err := persistBlob(repo.Blobs); err != nil {
				return err
			}
		}
		log.Infof("[Quota-Sync]:: success to persist artifact&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
	}
	if err := persistPB(projects); err != nil {
		return err
	}
	return nil
}

func persistAf(afs []*models.Artifact) error {
	if len(afs) != 0 {
		for _, af := range afs {
			_, err := dao.AddArtifact(af)
			if err != nil {
				if err == dao.ErrDupRows {
					continue
				}
				log.Error(err)
				return err
			}
		}
	}
	return nil
}

func persistAfnbs(afnbs []*models.ArtifactAndBlob) error {
	if len(afnbs) != 0 {
		for _, afnb := range afnbs {
			_, err := dao.AddArtifactNBlob(afnb)
			if err != nil {
				if err == dao.ErrDupRows {
					continue
				}
				log.Error(err)
				return err
			}
		}
	}
	return nil
}

func persistBlob(blobs []*models.Blob) error {
	if len(blobs) != 0 {
		for _, blob := range blobs {
			_, err := dao.AddBlob(blob)
			if err != nil {
				if err == dao.ErrDupRows {
					continue
				}
				log.Error(err)
				return err
			}
		}
	}
	return nil
}

func persistPB(projects []quota.ProjectInfo) error {
	total := len(projects)
	for i, project := range projects {
		log.Infof("[Quota-Sync]:: start to persist project&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
		var blobs = make(map[string]int64)
		var blobsOfPro []*models.Blob
		for _, repo := range project.Repos {
			for _, blob := range repo.Blobs {
				_, exist := blobs[blob.Digest]
				if exist {
					continue
				}
				blobs[blob.Digest] = blob.Size
				blobInDB, err := dao.GetBlob(blob.Digest)
				if err != nil {
					log.Error(err)
					return err
				}
				if blobInDB != nil {
					blobsOfPro = append(blobsOfPro, blobInDB)
				}
			}
		}
		pro, err := dao.GetProjectByName(project.Name)
		if err != nil {
			log.Error(err)
			return err
		}
		_, err = dao.AddBlobsToProject(pro.ProjectID, blobsOfPro...)
		if err != nil {
			if err == dao.ErrDupRows {
				continue
			}
			log.Error(err)
			return err
		}
		log.Infof("[Quota-Sync]:: success to persist project&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
	}
	return nil
}

func infoOfProject(project string, repoList []string) (quota.ProjectInfo, error) {
	var (
		repos []quota.RepoData
		wg    sync.WaitGroup
		err   error
	)
	wg.Add(len(repoList))

	errChan := make(chan error, 1)
	infoChan := make(chan interface{})
	done := make(chan bool, 1)

	pro, err := dao.GetProjectByName(project)
	if err != nil {
		log.Error(err)
		return quota.ProjectInfo{}, err
	}

	go func() {
		defer func() {
			done <- true
		}()

		for {
			select {
			case result := <-infoChan:
				if result == nil {
					return
				}
				repoData, ok := result.(quota.RepoData)
				if ok {
					repos = append(repos, repoData)
				}

			case e := <-errChan:
				if err == nil {
					err = errors.Wrap(e, "quota sync error on getting info of repo")
				} else {
					err = errors.Wrap(e, err.Error())
				}
			}
		}
	}()

	for _, repo := range repoList {
		go func(pid int64, repo string) {
			defer func() {
				wg.Done()
			}()
			info, err := infoOfRepo(pid, repo)
			if err != nil {
				errChan <- err
				return
			}
			infoChan <- info
		}(pro.ProjectID, repo)
	}

	wg.Wait()
	close(infoChan)

	<-done

	if err != nil {
		return quota.ProjectInfo{}, err
	}

	return quota.ProjectInfo{
		Name:  project,
		Repos: repos,
	}, nil
}

func infoOfRepo(pid int64, repo string) (quota.RepoData, error) {
	tags, err := registry.Cli.ListTags(repo)
	if err != nil {
		return quota.RepoData{}, err
	}
	var afnbs []*models.ArtifactAndBlob
	var afs []*models.Artifact
	var blobs []*models.Blob

	for _, tag := range tags {
		manifest, digest, err := registry.Cli.PullManifest(repo, tag)
		if err != nil {
			log.Error(err)
			// To work around issue https://github.com/goharbor/harbor/issues/9299, just log the error and do not raise it.
			// Let the sync process pass, but the 'Unknown manifest' will not be counted into the size and count of quota usage.
			// Users can still view these images with size 0 in Harbor.
			continue
		}
		mediaType, payload, err := manifest.Payload()
		if err != nil {
			return quota.RepoData{}, err
		}
		// self
		afnb := &models.ArtifactAndBlob{
			DigestAF:   digest,
			DigestBlob: digest,
		}
		afnbs = append(afnbs, afnb)
		// add manifest as a blob.
		blob := &models.Blob{
			Digest:       digest,
			ContentType:  mediaType,
			Size:         int64(len(payload)),
			CreationTime: time.Now(),
		}
		blobs = append(blobs, blob)
		for _, layer := range manifest.References() {
			afnb := &models.ArtifactAndBlob{
				DigestAF:   digest,
				DigestBlob: layer.Digest.String(),
			}
			afnbs = append(afnbs, afnb)
			blob := &models.Blob{
				Digest:       layer.Digest.String(),
				ContentType:  layer.MediaType,
				Size:         layer.Size,
				CreationTime: time.Now(),
			}
			blobs = append(blobs, blob)
		}
		af := &models.Artifact{
			PID:          pid,
			Repo:         repo,
			Tag:          tag,
			Digest:       digest,
			Kind:         "Docker-Image",
			CreationTime: time.Now(),
		}
		afs = append(afs, af)
	}
	return quota.RepoData{
		Name:  repo,
		Afs:   afs,
		Afnbs: afnbs,
		Blobs: blobs,
	}, nil
}

func init() {
	quota.Register("registry", NewRegistryMigrator)
}
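Dump and infoOfProject above share one concurrency shape: fan out one goroutine per item, push results and errors onto channels, and let a single collector goroutine aggregate until the result channel is closed. A stripped-down generic sketch of that pattern (names and types here are illustrative, not from the file; it assumes the sync package is imported):

func collectAll(items []string, work func(string) (string, error)) ([]string, error) {
	var (
		results []string
		wg      sync.WaitGroup
		err     error
	)
	resChan := make(chan interface{})
	errChan := make(chan error, 1)
	done := make(chan bool, 1)

	// Collector: drain results until resChan is closed; keep the first error seen.
	go func() {
		defer func() { done <- true }()
		for {
			select {
			case r := <-resChan:
				if r == nil { // a closed channel yields nil, so the collector exits
					return
				}
				results = append(results, r.(string))
			case e := <-errChan:
				if err == nil {
					err = e
				}
			}
		}
	}()

	// Fan out: one goroutine per item.
	wg.Add(len(items))
	for _, item := range items {
		go func(item string) {
			defer wg.Done()
			out, e := work(item)
			if e != nil {
				errChan <- e
				return
			}
			resChan <- out
		}(item)
	}

	wg.Wait()
	close(resChan)
	<-done
	return results, err
}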
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"os"
 	"os/signal"
+	"strings"
 	"syscall"
 	"time"

@@ -161,8 +162,21 @@ func main() {

 	server.RegisterRoutes()

-	log.Infof("Version: %s, Git commit: %s", version.ReleaseVersion, version.GitCommit)
+	iTLSEnabled := os.Getenv("INTERNAL_TLS_ENABLED")
+	if strings.ToLower(iTLSEnabled) == "true" {
+		log.Info("internal TLS enabled, Init TLS ...")
+		iTLSKeyPath := os.Getenv("INTERNAL_TLS_KEY_PATH")
+		iTLSCertPath := os.Getenv("INTERNAL_TLS_CERT_PATH")
+		iTrustCA := os.Getenv("INTERNAL_TLS_TRUST_CA_PATH")
+
+		log.Infof("load client key: %s client cert: %s", iTLSKeyPath, iTLSCertPath)
+		beego.BConfig.Listen.EnableHTTPS = true
+		beego.BConfig.Listen.HTTPSPort = 8443
+		beego.BConfig.Listen.HTTPSKeyFile = iTLSKeyPath
+		beego.BConfig.Listen.HTTPSCertFile = iTLSCertPath
+	}
+
+	log.Infof("Version: %s, Git commit: %s", version.ReleaseVersion, version.GitCommit)
 	beego.RunWithMiddleWares("", middlewares.MiddleWares()...)
 }

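For readers who don't use beego: the block above is the moral equivalent of the following standard-library setup (a sketch only; Harbor itself serves through beego exactly as shown in the diff, and the handler and ports here are illustrative):

package main

import (
	"log"
	"net/http"
	"os"
	"strings"
)

func main() {
	mux := http.NewServeMux() // stand-in for the real router

	if strings.ToLower(os.Getenv("INTERNAL_TLS_ENABLED")) == "true" {
		key := os.Getenv("INTERNAL_TLS_KEY_PATH")
		cert := os.Getenv("INTERNAL_TLS_CERT_PATH")
		// Serve HTTPS on 8443 with the internally issued cert and key.
		log.Fatal(http.ListenAndServeTLS(":8443", cert, key, mux))
	}
	// Fall back to plain HTTP when internal TLS is disabled.
	log.Fatal(http.ListenAndServe(":8080", mux))
}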
@@ -296,6 +296,9 @@ func (a *adapter) getProject(name string) (*project, error) {
 // when harbor is deployed on Kubernetes
 func (a *adapter) getURL() string {
 	if a.registry.Type == model.RegistryTypeHarbor && a.registry.Name == "Local" {
+		if common_http.InternalTLSEnabled() {
+			return "https://core:8443"
+		}
 		return "http://127.0.0.1:8080"
 	}
 	return a.url