Improve the performance of replication

Improve the performance of replication by introducing a new API to check whether the blob can be mounted directly

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2021-03-18 23:07:21 +08:00
parent 017132611e
commit 09c3d042ea
11 changed files with 236 additions and 4 deletions

View File

@ -447,6 +447,37 @@ paths:
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/repositories:
get:
summary: List all authorized repositories
description: List all authorized repositories
tags:
- repository
operationId: listAllRepositories
parameters:
- $ref: '#/parameters/requestId'
- $ref: '#/parameters/query'
- $ref: '#/parameters/sort'
- $ref: '#/parameters/page'
- $ref: '#/parameters/pageSize'
responses:
'200':
description: Success
headers:
X-Total-Count:
description: The total count of repositories
type: integer
Link:
description: Link refers to the previous page and next page
type: string
schema:
type: array
items:
$ref: '#/definitions/Repository'
'400':
$ref: '#/responses/400'
'500':
$ref: '#/responses/500'
/projects/{project_name}/repositories:
get:
summary: List repositories

View File

@ -15,9 +15,13 @@
package models
import (
"context"
"fmt"
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/pkg/signature/notary/model"
"github.com/lib/pq"
"github.com/theupdateframework/notary/tuf/data"
)
@ -38,6 +42,21 @@ type RepoRecord struct {
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// FilterByBlobDigest filters the repositories by the blob digest
func (r *RepoRecord) FilterByBlobDigest(ctx context.Context, qs orm.QuerySeter, key string, value interface{}) orm.QuerySeter {
digest, ok := value.(string)
if !ok || len(digest) == 0 {
return qs
}
sql := fmt.Sprintf(`select distinct(a.repository_id)
from artifact as a
join artifact_blob as ab
on a.digest = ab.digest_af
where ab.digest_blob = %s`, pq.QuoteLiteral(digest))
return qs.FilterRaw("repository_id", fmt.Sprintf("in (%s)", sql))
}
// TableName is required by by beego orm to map RepoRecord to table repository
func (r *RepoRecord) TableName() string {
return RepoTable

View File

@ -38,6 +38,34 @@ func (_m *mockAdapter) BlobExist(repository string, digest string) (bool, error)
return r0, r1
}
// CanBeMount provides a mock function with given fields: digest
func (_m *mockAdapter) CanBeMount(digest string) (bool, string, error) {
ret := _m.Called(digest)
var r0 bool
if rf, ok := ret.Get(0).(func(string) bool); ok {
r0 = rf(digest)
} else {
r0 = ret.Get(0).(bool)
}
var r1 string
if rf, ok := ret.Get(1).(func(string) string); ok {
r1 = rf(digest)
} else {
r1 = ret.Get(1).(string)
}
var r2 error
if rf, ok := ret.Get(2).(func(string) error); ok {
r2 = rf(digest)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// DeleteManifest provides a mock function with given fields: repository, reference
func (_m *mockAdapter) DeleteManifest(repository string, reference string) error {
ret := _m.Called(repository, reference)
@ -163,6 +191,20 @@ func (_m *mockAdapter) ManifestExist(repository string, reference string) (bool,
return r0, r1, r2
}
// MountBlob provides a mock function with given fields: srcRepository, digest, dstRepository
func (_m *mockAdapter) MountBlob(srcRepository string, digest string, dstRepository string) error {
ret := _m.Called(srcRepository, digest, dstRepository)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, string) error); ok {
r0 = rf(srcRepository, digest, dstRepository)
} else {
r0 = ret.Error(0)
}
return r0
}
// PrepareForPush provides a mock function with given fields: _a0
func (_m *mockAdapter) PrepareForPush(_a0 []*model.Resource) error {
ret := _m.Called(_a0)

View File

@ -292,6 +292,20 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor
return nil
}
mount, repository, err := t.dst.CanBeMount(digest)
if err != nil {
t.logger.Errorf("failed to check whether the blob %s can be mounted on the destination registry: %v", digest, err)
return err
}
if mount {
if err = t.dst.MountBlob(repository, digest, dstRepo); err != nil {
t.logger.Errorf("failed to mount the blob %s on the destination registry: %v", digest, err)
return err
}
t.logger.Infof("the blob %s mounted from the repository %s on the destination registry directly", digest, repository)
return nil
}
size, data, err := t.src.PullBlob(srcRepo, digest)
if err != nil {
t.logger.Errorf("failed to pulling the blob %s: %v", digest, err)

View File

@ -96,6 +96,12 @@ func (f *fakeRegistry) PushBlob(repository, digest string, size int64, blob io.R
func (f *fakeRegistry) DeleteTag(repository, tag string) error {
return nil
}
func (f *fakeRegistry) CanBeMount(digest string) (bool, string, error) {
return false, "", nil
}
func (f *fakeRegistry) MountBlob(srcRepository, digest, dstRepository string) error {
return nil
}
func TestFactory(t *testing.T) {
tr, err := factory(nil, nil)

View File

@ -114,14 +114,14 @@ func NewRange(min, max interface{}) *Range {
}
// NewAndList creates a new and list
func NewAndList(values ...interface{}) *AndList {
func NewAndList(values []interface{}) *AndList {
return &AndList{
Values: values,
}
}
// NewOrList creates a new or list
func NewOrList(values ...interface{}) *OrList {
func NewOrList(values []interface{}) *OrList {
return &OrList{
Values: values,
}

View File

@ -60,6 +60,8 @@ type ArtifactRegistry interface {
BlobExist(repository, digest string) (exist bool, err error)
PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error)
PushBlob(repository, digest string, size int64, blob io.Reader) error
MountBlob(srcRepository, digest, dstRepository string) (err error)
CanBeMount(digest string) (mount bool, repository string, err error) // check whether the blob can be mounted from the remote registry
DeleteTag(repository, tag string) error
}

View File

@ -17,6 +17,7 @@ package v2
import (
"fmt"
"github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/utils"
adp "github.com/goharbor/harbor/src/pkg/reg/adapter"
"github.com/goharbor/harbor/src/pkg/reg/adapter/harbor/base"
@ -129,3 +130,18 @@ func (a *adapter) listArtifacts(repository string, filters []*model.Filter) ([]*
}
return filter.DoFilterArtifacts(artifacts, filters)
}
func (a *adapter) CanBeMount(digest string) (bool, string, error) {
repository, err := a.client.getRepositoryByBlobDigest(digest)
if err != nil {
// return false directly for the previous version of harbor which doesn't support list repositories API
if e, ok := err.(*http.Error); ok && e.Code == 404 {
return false, "", nil
}
return false, "", err
}
if len(repository) == 0 {
return false, "", nil
}
return true, repository, nil
}

View File

@ -77,3 +77,15 @@ func (c *client) deleteTag(repo, tag string) error {
c.BasePath(), project, repo, tag, tag)
return c.C.Delete(url)
}
func (c *client) getRepositoryByBlobDigest(digest string) (string, error) {
repositories := []*models.RepoRecord{}
url := fmt.Sprintf("%s/repositories?q=blob_digest=%s&page_size=1&page_number=1", c.BasePath(), digest)
if err := c.C.Get(url, &repositories); err != nil {
return "", err
}
if len(repositories) == 0 {
return "", nil
}
return repositories[0].Name, nil
}

View File

@ -243,3 +243,8 @@ func (a *Adapter) PingSimple() error {
func (a *Adapter) DeleteTag(repository, tag string) error {
return errors.New("the tag deletion isn't supported")
}
// CanBeMount isn't supported for docker registry
func (a *Adapter) CanBeMount(digest string) (mount bool, repository string, err error) {
return false, "", nil
}

View File

@ -17,17 +17,20 @@ package handler
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/controller/event/metadata"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/go-openapi/runtime/middleware"
cmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/rbac"
"github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/common/security/local"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/event/metadata"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/controller/repository"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/server/v2.0/handler/model"
"github.com/goharbor/harbor/src/server/v2.0/models"
operation "github.com/goharbor/harbor/src/server/v2.0/restapi/operations/repository"
@ -56,6 +59,88 @@ func (r *repositoryAPI) Prepare(ctx context.Context, operation string, params in
return nil
}
func (r *repositoryAPI) ListAllRepositories(ctx context.Context, params operation.ListAllRepositoriesParams) middleware.Responder {
// set query
query, err := r.BuildQuery(ctx, params.Q, params.Sort, params.Page, params.PageSize)
if err != nil {
return r.SendError(ctx, err)
}
secCtx, ok := security.FromContext(ctx)
if !ok {
return r.SendError(ctx, errors.UnauthorizedError(errors.New("security context not found")))
}
if !secCtx.IsSysAdmin() && !secCtx.IsSolutionUser() {
projectIDs, err := r.listAuthorizedProjectIDs(ctx)
if err != nil {
return r.SendError(ctx, err)
}
// no authorized projects, return nil directly
if len(projectIDs) == 0 {
return operation.NewListAllRepositoriesOK().
WithXTotalCount(0).
WithLink(r.Links(ctx, params.HTTPRequest.URL, 0, query.PageNumber, query.PageSize).String()).
WithPayload(nil)
}
orList := &q.OrList{}
for _, projectID := range projectIDs {
orList.Values = append(orList.Values, projectID)
}
query.Keywords["ProjectID"] = orList
}
total, err := r.repoCtl.Count(ctx, query)
if err != nil {
return r.SendError(ctx, err)
}
repositories, err := r.repoCtl.List(ctx, query)
if err != nil {
return r.SendError(ctx, err)
}
var repos []*models.Repository
for _, repository := range repositories {
repos = append(repos, r.assembleRepository(ctx, model.NewRepoRecord(repository)))
}
return operation.NewListAllRepositoriesOK().
WithXTotalCount(total).
WithLink(r.Links(ctx, params.HTTPRequest.URL, total, query.PageNumber, query.PageSize).String()).
WithPayload(repos)
}
func (r *repositoryAPI) listAuthorizedProjectIDs(ctx context.Context) ([]int64, error) {
secCtx, ok := security.FromContext(ctx)
if !ok {
return nil, errors.UnauthorizedError(errors.New("security context not found"))
}
query := &q.Query{
Keywords: map[string]interface{}{},
}
if secCtx.IsAuthenticated() {
switch secCtx.(type) {
case *local.SecurityContext:
currentUser := secCtx.(*local.SecurityContext).User()
query.Keywords["member"] = &project.MemberQuery{
UserID: currentUser.UserID,
GroupIDs: currentUser.GroupIDs,
WithPublic: true,
}
default:
query.Keywords["public"] = true
}
} else {
query.Keywords["public"] = true
}
projects, err := r.proCtl.List(ctx, query)
if err != nil {
return nil, err
}
var ids []int64
for _, project := range projects {
ids = append(ids, project.ProjectID)
}
return ids, nil
}
func (r *repositoryAPI) ListRepositories(ctx context.Context, params operation.ListRepositoriesParams) middleware.Responder {
if err := r.RequireProjectAccess(ctx, params.ProjectName, rbac.ActionList, rbac.ResourceRepository); err != nil {
return r.SendError(ctx, err)