From 09c3d042eac1bd5d2a4c91238f1ab047003d3314 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 18 Mar 2021 23:07:21 +0800 Subject: [PATCH] Improve the performance of replication Improve the performance of replication by introducing a new API to check whether the blob can be mounted directly Signed-off-by: Wenkai Yin --- api/v2.0/swagger.yaml | 31 +++++++ src/common/models/repo.go | 19 ++++ .../replication/flow/mock_adapter_test.go | 42 +++++++++ .../replication/transfer/image/transfer.go | 14 +++ .../transfer/image/transfer_test.go | 6 ++ src/lib/q/query.go | 4 +- src/pkg/reg/adapter/adapter.go | 2 + src/pkg/reg/adapter/harbor/v2/adapter.go | 16 ++++ src/pkg/reg/adapter/harbor/v2/client.go | 12 +++ src/pkg/reg/adapter/native/adapter.go | 5 ++ src/server/v2.0/handler/repository.go | 89 ++++++++++++++++++- 11 files changed, 236 insertions(+), 4 deletions(-) diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 9857af5de..b7f49ca84 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -447,6 +447,37 @@ paths: $ref: '#/responses/404' '500': $ref: '#/responses/500' + /repositories: + get: + summary: List all authorized repositories + description: List all authorized repositories + tags: + - repository + operationId: listAllRepositories + parameters: + - $ref: '#/parameters/requestId' + - $ref: '#/parameters/query' + - $ref: '#/parameters/sort' + - $ref: '#/parameters/page' + - $ref: '#/parameters/pageSize' + responses: + '200': + description: Success + headers: + X-Total-Count: + description: The total count of repositories + type: integer + Link: + description: Link refers to the previous page and next page + type: string + schema: + type: array + items: + $ref: '#/definitions/Repository' + '400': + $ref: '#/responses/400' + '500': + $ref: '#/responses/500' /projects/{project_name}/repositories: get: summary: List repositories diff --git a/src/common/models/repo.go b/src/common/models/repo.go index c5bdf88bf..1e6528a44 100644 --- a/src/common/models/repo.go +++ b/src/common/models/repo.go @@ -15,9 +15,13 @@ package models import ( + "context" + "fmt" "time" + "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/pkg/signature/notary/model" + "github.com/lib/pq" "github.com/theupdateframework/notary/tuf/data" ) @@ -38,6 +42,21 @@ type RepoRecord struct { UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` } +// FilterByBlobDigest filters the repositories by the blob digest +func (r *RepoRecord) FilterByBlobDigest(ctx context.Context, qs orm.QuerySeter, key string, value interface{}) orm.QuerySeter { + digest, ok := value.(string) + if !ok || len(digest) == 0 { + return qs + } + + sql := fmt.Sprintf(`select distinct(a.repository_id) + from artifact as a + join artifact_blob as ab + on a.digest = ab.digest_af + where ab.digest_blob = %s`, pq.QuoteLiteral(digest)) + return qs.FilterRaw("repository_id", fmt.Sprintf("in (%s)", sql)) +} + // TableName is required by by beego orm to map RepoRecord to table repository func (r *RepoRecord) TableName() string { return RepoTable diff --git a/src/controller/replication/flow/mock_adapter_test.go b/src/controller/replication/flow/mock_adapter_test.go index be69e611e..939f191a8 100644 --- a/src/controller/replication/flow/mock_adapter_test.go +++ b/src/controller/replication/flow/mock_adapter_test.go @@ -38,6 +38,34 @@ func (_m *mockAdapter) BlobExist(repository string, digest string) (bool, error) return r0, r1 } +// CanBeMount provides a mock function with given fields: digest +func (_m *mockAdapter) CanBeMount(digest string) (bool, string, error) { + ret := _m.Called(digest) + + var r0 bool + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(digest) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 string + if rf, ok := ret.Get(1).(func(string) string); ok { + r1 = rf(digest) + } else { + r1 = ret.Get(1).(string) + } + + var r2 error + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(digest) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // DeleteManifest provides a mock function with given fields: repository, reference func (_m *mockAdapter) DeleteManifest(repository string, reference string) error { ret := _m.Called(repository, reference) @@ -163,6 +191,20 @@ func (_m *mockAdapter) ManifestExist(repository string, reference string) (bool, return r0, r1, r2 } +// MountBlob provides a mock function with given fields: srcRepository, digest, dstRepository +func (_m *mockAdapter) MountBlob(srcRepository string, digest string, dstRepository string) error { + ret := _m.Called(srcRepository, digest, dstRepository) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, string) error); ok { + r0 = rf(srcRepository, digest, dstRepository) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // PrepareForPush provides a mock function with given fields: _a0 func (_m *mockAdapter) PrepareForPush(_a0 []*model.Resource) error { ret := _m.Called(_a0) diff --git a/src/controller/replication/transfer/image/transfer.go b/src/controller/replication/transfer/image/transfer.go index 58ac71d05..fd0bee376 100644 --- a/src/controller/replication/transfer/image/transfer.go +++ b/src/controller/replication/transfer/image/transfer.go @@ -292,6 +292,20 @@ func (t *transfer) copyBlob(srcRepo, dstRepo, digest string, sizeFromDescriptor return nil } + mount, repository, err := t.dst.CanBeMount(digest) + if err != nil { + t.logger.Errorf("failed to check whether the blob %s can be mounted on the destination registry: %v", digest, err) + return err + } + if mount { + if err = t.dst.MountBlob(repository, digest, dstRepo); err != nil { + t.logger.Errorf("failed to mount the blob %s on the destination registry: %v", digest, err) + return err + } + t.logger.Infof("the blob %s mounted from the repository %s on the destination registry directly", digest, repository) + return nil + } + size, data, err := t.src.PullBlob(srcRepo, digest) if err != nil { t.logger.Errorf("failed to pulling the blob %s: %v", digest, err) diff --git a/src/controller/replication/transfer/image/transfer_test.go b/src/controller/replication/transfer/image/transfer_test.go index 24b0fecbc..edc44d1f6 100644 --- a/src/controller/replication/transfer/image/transfer_test.go +++ b/src/controller/replication/transfer/image/transfer_test.go @@ -96,6 +96,12 @@ func (f *fakeRegistry) PushBlob(repository, digest string, size int64, blob io.R func (f *fakeRegistry) DeleteTag(repository, tag string) error { return nil } +func (f *fakeRegistry) CanBeMount(digest string) (bool, string, error) { + return false, "", nil +} +func (f *fakeRegistry) MountBlob(srcRepository, digest, dstRepository string) error { + return nil +} func TestFactory(t *testing.T) { tr, err := factory(nil, nil) diff --git a/src/lib/q/query.go b/src/lib/q/query.go index e8dc4a3cf..e1bf9c21f 100644 --- a/src/lib/q/query.go +++ b/src/lib/q/query.go @@ -114,14 +114,14 @@ func NewRange(min, max interface{}) *Range { } // NewAndList creates a new and list -func NewAndList(values ...interface{}) *AndList { +func NewAndList(values []interface{}) *AndList { return &AndList{ Values: values, } } // NewOrList creates a new or list -func NewOrList(values ...interface{}) *OrList { +func NewOrList(values []interface{}) *OrList { return &OrList{ Values: values, } diff --git a/src/pkg/reg/adapter/adapter.go b/src/pkg/reg/adapter/adapter.go index 39a8ab71c..dbac8bc6e 100644 --- a/src/pkg/reg/adapter/adapter.go +++ b/src/pkg/reg/adapter/adapter.go @@ -60,6 +60,8 @@ type ArtifactRegistry interface { BlobExist(repository, digest string) (exist bool, err error) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) PushBlob(repository, digest string, size int64, blob io.Reader) error + MountBlob(srcRepository, digest, dstRepository string) (err error) + CanBeMount(digest string) (mount bool, repository string, err error) // check whether the blob can be mounted from the remote registry DeleteTag(repository, tag string) error } diff --git a/src/pkg/reg/adapter/harbor/v2/adapter.go b/src/pkg/reg/adapter/harbor/v2/adapter.go index 204fd3366..9645d3749 100644 --- a/src/pkg/reg/adapter/harbor/v2/adapter.go +++ b/src/pkg/reg/adapter/harbor/v2/adapter.go @@ -17,6 +17,7 @@ package v2 import ( "fmt" + "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/utils" adp "github.com/goharbor/harbor/src/pkg/reg/adapter" "github.com/goharbor/harbor/src/pkg/reg/adapter/harbor/base" @@ -129,3 +130,18 @@ func (a *adapter) listArtifacts(repository string, filters []*model.Filter) ([]* } return filter.DoFilterArtifacts(artifacts, filters) } + +func (a *adapter) CanBeMount(digest string) (bool, string, error) { + repository, err := a.client.getRepositoryByBlobDigest(digest) + if err != nil { + // return false directly for the previous version of harbor which doesn't support list repositories API + if e, ok := err.(*http.Error); ok && e.Code == 404 { + return false, "", nil + } + return false, "", err + } + if len(repository) == 0 { + return false, "", nil + } + return true, repository, nil +} diff --git a/src/pkg/reg/adapter/harbor/v2/client.go b/src/pkg/reg/adapter/harbor/v2/client.go index 26c2df3ff..9ddc3e17d 100644 --- a/src/pkg/reg/adapter/harbor/v2/client.go +++ b/src/pkg/reg/adapter/harbor/v2/client.go @@ -77,3 +77,15 @@ func (c *client) deleteTag(repo, tag string) error { c.BasePath(), project, repo, tag, tag) return c.C.Delete(url) } + +func (c *client) getRepositoryByBlobDigest(digest string) (string, error) { + repositories := []*models.RepoRecord{} + url := fmt.Sprintf("%s/repositories?q=blob_digest=%s&page_size=1&page_number=1", c.BasePath(), digest) + if err := c.C.Get(url, &repositories); err != nil { + return "", err + } + if len(repositories) == 0 { + return "", nil + } + return repositories[0].Name, nil +} diff --git a/src/pkg/reg/adapter/native/adapter.go b/src/pkg/reg/adapter/native/adapter.go index be1a38258..b7cdbf675 100644 --- a/src/pkg/reg/adapter/native/adapter.go +++ b/src/pkg/reg/adapter/native/adapter.go @@ -243,3 +243,8 @@ func (a *Adapter) PingSimple() error { func (a *Adapter) DeleteTag(repository, tag string) error { return errors.New("the tag deletion isn't supported") } + +// CanBeMount isn't supported for docker registry +func (a *Adapter) CanBeMount(digest string) (mount bool, repository string, err error) { + return false, "", nil +} diff --git a/src/server/v2.0/handler/repository.go b/src/server/v2.0/handler/repository.go index ade0fe3df..1a702e7c5 100644 --- a/src/server/v2.0/handler/repository.go +++ b/src/server/v2.0/handler/repository.go @@ -17,17 +17,20 @@ package handler import ( "context" "fmt" - "github.com/goharbor/harbor/src/controller/event/metadata" - "github.com/goharbor/harbor/src/pkg/notification" "github.com/go-openapi/runtime/middleware" cmodels "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/rbac" + "github.com/goharbor/harbor/src/common/security" + "github.com/goharbor/harbor/src/common/security/local" "github.com/goharbor/harbor/src/controller/artifact" + "github.com/goharbor/harbor/src/controller/event/metadata" "github.com/goharbor/harbor/src/controller/project" "github.com/goharbor/harbor/src/controller/repository" + "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/notification" "github.com/goharbor/harbor/src/server/v2.0/handler/model" "github.com/goharbor/harbor/src/server/v2.0/models" operation "github.com/goharbor/harbor/src/server/v2.0/restapi/operations/repository" @@ -56,6 +59,88 @@ func (r *repositoryAPI) Prepare(ctx context.Context, operation string, params in return nil } +func (r *repositoryAPI) ListAllRepositories(ctx context.Context, params operation.ListAllRepositoriesParams) middleware.Responder { + // set query + query, err := r.BuildQuery(ctx, params.Q, params.Sort, params.Page, params.PageSize) + if err != nil { + return r.SendError(ctx, err) + } + secCtx, ok := security.FromContext(ctx) + if !ok { + return r.SendError(ctx, errors.UnauthorizedError(errors.New("security context not found"))) + } + if !secCtx.IsSysAdmin() && !secCtx.IsSolutionUser() { + projectIDs, err := r.listAuthorizedProjectIDs(ctx) + if err != nil { + return r.SendError(ctx, err) + } + // no authorized projects, return nil directly + if len(projectIDs) == 0 { + return operation.NewListAllRepositoriesOK(). + WithXTotalCount(0). + WithLink(r.Links(ctx, params.HTTPRequest.URL, 0, query.PageNumber, query.PageSize).String()). + WithPayload(nil) + } + orList := &q.OrList{} + for _, projectID := range projectIDs { + orList.Values = append(orList.Values, projectID) + } + query.Keywords["ProjectID"] = orList + } + + total, err := r.repoCtl.Count(ctx, query) + if err != nil { + return r.SendError(ctx, err) + } + repositories, err := r.repoCtl.List(ctx, query) + if err != nil { + return r.SendError(ctx, err) + } + var repos []*models.Repository + for _, repository := range repositories { + repos = append(repos, r.assembleRepository(ctx, model.NewRepoRecord(repository))) + } + return operation.NewListAllRepositoriesOK(). + WithXTotalCount(total). + WithLink(r.Links(ctx, params.HTTPRequest.URL, total, query.PageNumber, query.PageSize).String()). + WithPayload(repos) +} + +func (r *repositoryAPI) listAuthorizedProjectIDs(ctx context.Context) ([]int64, error) { + secCtx, ok := security.FromContext(ctx) + if !ok { + return nil, errors.UnauthorizedError(errors.New("security context not found")) + } + query := &q.Query{ + Keywords: map[string]interface{}{}, + } + if secCtx.IsAuthenticated() { + switch secCtx.(type) { + case *local.SecurityContext: + currentUser := secCtx.(*local.SecurityContext).User() + query.Keywords["member"] = &project.MemberQuery{ + UserID: currentUser.UserID, + GroupIDs: currentUser.GroupIDs, + WithPublic: true, + } + default: + query.Keywords["public"] = true + } + } else { + query.Keywords["public"] = true + } + + projects, err := r.proCtl.List(ctx, query) + if err != nil { + return nil, err + } + var ids []int64 + for _, project := range projects { + ids = append(ids, project.ProjectID) + } + return ids, nil +} + func (r *repositoryAPI) ListRepositories(ctx context.Context, params operation.ListRepositoriesParams) middleware.Responder { if err := r.RequireProjectAccess(ctx, params.ProjectName, rbac.ActionList, rbac.ResourceRepository); err != nil { return r.SendError(ctx, err)