From 1dfc47d24e56eaf004b2a1c6fddf59a6c1fcd644 Mon Sep 17 00:00:00 2001 From: Wang Yan Date: Fri, 19 Jul 2019 17:46:41 +0800 Subject: [PATCH] Add size middleware to support quota [Add]: 1, size middleware for quota size 2, count middleware for quota artifact count [Support]: 1, put, patch, mount blob 2, put manifest [Refactor]: 1, Add handle response for middlerware 2, Remove the modifyResponse for registry proxy 3, Use the custom response writer to recored status Signed-off-by: wang yan --- src/common/dao/artifact_blob.go | 3 + src/common/dao/blob.go | 14 ++ src/common/dao/blob_test.go | 40 +++ src/core/middlewares/blobquota/handler.go | 120 --------- src/core/middlewares/chain.go | 4 +- src/core/middlewares/config.go | 4 +- src/core/middlewares/countquota/handler.go | 199 ++------------- .../middlewares/countquota/putmanifest.go | 211 ++++++++++++++++ src/core/middlewares/inlet.go | 4 +- src/core/middlewares/registryproxy/handler.go | 150 +----------- src/core/middlewares/sizequota/handler.go | 231 ++++++++++++++++++ .../middlewares/sizequota/handler_test.go | 177 ++++++++++++++ src/core/middlewares/sizequota/mountblob.go | 69 ++++++ .../middlewares/sizequota/mountblob_test.go | 85 +++++++ src/core/middlewares/sizequota/patchblob.go | 86 +++++++ .../middlewares/sizequota/patchblob_test.go | 42 ++++ src/core/middlewares/sizequota/putblob.go | 83 +++++++ .../middlewares/sizequota/putblob_test.go | 80 ++++++ src/core/middlewares/sizequota/putmanifest.go | 102 ++++++++ .../middlewares/sizequota/putmanifest_test.go | 92 +++++++ src/core/middlewares/util/reginteceptor.go | 28 +++ src/core/middlewares/util/response.go | 59 +++++ src/core/middlewares/util/response_test.go | 29 +++ src/core/middlewares/util/util.go | 159 +++++++++++- src/core/middlewares/util/util_test.go | 124 +++++++++- 25 files changed, 1743 insertions(+), 452 deletions(-) delete mode 100644 src/core/middlewares/blobquota/handler.go create mode 100644 src/core/middlewares/countquota/putmanifest.go create mode 100644 src/core/middlewares/sizequota/handler.go create mode 100644 src/core/middlewares/sizequota/handler_test.go create mode 100644 src/core/middlewares/sizequota/mountblob.go create mode 100644 src/core/middlewares/sizequota/mountblob_test.go create mode 100644 src/core/middlewares/sizequota/patchblob.go create mode 100644 src/core/middlewares/sizequota/patchblob_test.go create mode 100644 src/core/middlewares/sizequota/putblob.go create mode 100644 src/core/middlewares/sizequota/putblob_test.go create mode 100644 src/core/middlewares/sizequota/putmanifest.go create mode 100644 src/core/middlewares/sizequota/putmanifest_test.go create mode 100644 src/core/middlewares/util/reginteceptor.go create mode 100644 src/core/middlewares/util/response.go create mode 100644 src/core/middlewares/util/response_test.go diff --git a/src/common/dao/artifact_blob.go b/src/common/dao/artifact_blob.go index e0f0f089e..f1bcabb56 100644 --- a/src/common/dao/artifact_blob.go +++ b/src/common/dao/artifact_blob.go @@ -52,6 +52,9 @@ func AddArtifactNBlobs(afnbs []*models.ArtifactAndBlob) error { successNums, err := o.InsertMulti(total, afnbs) if err != nil { errInsertMultiple = err + if strings.Contains(err.Error(), "duplicate key value violates unique constraint") { + errInsertMultiple = errors.Wrap(errInsertMultiple, ErrDupRows.Error()) + } err := o.Rollback() if err != nil { log.Errorf("fail to rollback when to insert multiple artifact and blobs, %v", err) diff --git a/src/common/dao/blob.go b/src/common/dao/blob.go index e63d218d2..9a50bc3bd 100644 --- a/src/common/dao/blob.go +++ b/src/common/dao/blob.go @@ -2,6 +2,7 @@ package dao import ( "fmt" + "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "strings" @@ -48,3 +49,16 @@ func DeleteBlob(digest string) error { _, err := o.QueryTable("blob").Filter("digest", digest).Delete() return err } + +// HasBlobInProject ... +func HasBlobInProject(projectID int64, digest string) (bool, error) { + var res []orm.Params + num, err := GetOrmer().Raw(`SELECT * FROM artifact af LEFT JOIN artifact_blob afnb ON af.digest = afnb.digest_af WHERE af.project_id = ? and afnb.digest_blob = ? `, projectID, digest).Values(&res) + if err != nil { + return false, err + } + if num == 0 { + return false, nil + } + return true, nil +} diff --git a/src/common/dao/blob_test.go b/src/common/dao/blob_test.go index 0ae70ff5c..9d5403563 100644 --- a/src/common/dao/blob_test.go +++ b/src/common/dao/blob_test.go @@ -63,3 +63,43 @@ func TestDeleteBlob(t *testing.T) { err = DeleteBlob(blob.Digest) require.Nil(t, err) } + +func TestHasBlobInProject(t *testing.T) { + af := &models.Artifact{ + PID: 1, + Repo: "TestHasBlobInProject", + Tag: "latest", + Digest: "tttt", + Kind: "image", + } + + // add + _, err := AddArtifact(af) + require.Nil(t, err) + + afnb1 := &models.ArtifactAndBlob{ + DigestAF: "tttt", + DigestBlob: "zzza", + } + afnb2 := &models.ArtifactAndBlob{ + DigestAF: "tttt", + DigestBlob: "zzzb", + } + afnb3 := &models.ArtifactAndBlob{ + DigestAF: "tttt", + DigestBlob: "zzzc", + } + + var afnbs []*models.ArtifactAndBlob + afnbs = append(afnbs, afnb1) + afnbs = append(afnbs, afnb2) + afnbs = append(afnbs, afnb3) + + // add + err = AddArtifactNBlobs(afnbs) + require.Nil(t, err) + + has, err := HasBlobInProject(1, "zzzb") + require.Nil(t, err) + assert.True(t, has) +} diff --git a/src/core/middlewares/blobquota/handler.go b/src/core/middlewares/blobquota/handler.go deleted file mode 100644 index 91558fa57..000000000 --- a/src/core/middlewares/blobquota/handler.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package blobquota - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/core/middlewares/util" - "github.com/opencontainers/go-digest" - "github.com/pkg/errors" - "net/http" - "time" -) - -type blobQuotaHandler struct { - next http.Handler -} - -// New ... -func New(next http.Handler) http.Handler { - return &blobQuotaHandler{ - next: next, - } -} - -// ServeHTTP ... -func (bqh blobQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if req.Method == http.MethodPut { - match, _ := util.MatchPutBlobURL(req) - if match { - dgstStr := req.FormValue("digest") - if dgstStr == "" { - http.Error(rw, util.MarshalError("InternalServerError", "blob digest missing"), http.StatusInternalServerError) - return - } - dgst, err := digest.Parse(dgstStr) - if err != nil { - http.Error(rw, util.MarshalError("InternalServerError", "blob digest parsing failed"), http.StatusInternalServerError) - return - } - // ToDo lock digest with redis - - // ToDo read placeholder from config - state, err := hmacKey("placeholder").unpackUploadState(req.FormValue("_state")) - if err != nil { - http.Error(rw, util.MarshalError("InternalServerError", "failed to decode state"), http.StatusInternalServerError) - return - } - log.Infof("we need to insert blob data into DB.") - log.Infof("blob digest, %v", dgst) - log.Infof("blob size, %v", state.Offset) - } - - } - bqh.next.ServeHTTP(rw, req) -} - -// blobUploadState captures the state serializable state of the blob upload. -type blobUploadState struct { - // name is the primary repository under which the blob will be linked. - Name string - - // UUID identifies the upload. - UUID string - - // offset contains the current progress of the upload. - Offset int64 - - // StartedAt is the original start time of the upload. - StartedAt time.Time -} - -type hmacKey string - -var errInvalidSecret = errors.New("invalid secret") - -// unpackUploadState unpacks and validates the blob upload state from the -// token, using the hmacKey secret. -func (secret hmacKey) unpackUploadState(token string) (blobUploadState, error) { - var state blobUploadState - - tokenBytes, err := base64.URLEncoding.DecodeString(token) - if err != nil { - return state, err - } - mac := hmac.New(sha256.New, []byte(secret)) - - if len(tokenBytes) < mac.Size() { - return state, errInvalidSecret - } - - macBytes := tokenBytes[:mac.Size()] - messageBytes := tokenBytes[mac.Size():] - - mac.Write(messageBytes) - if !hmac.Equal(mac.Sum(nil), macBytes) { - return state, errInvalidSecret - } - - if err := json.Unmarshal(messageBytes, &state); err != nil { - return state, err - } - - return state, nil -} diff --git a/src/core/middlewares/chain.go b/src/core/middlewares/chain.go index 4141022ec..fba7a9300 100644 --- a/src/core/middlewares/chain.go +++ b/src/core/middlewares/chain.go @@ -16,12 +16,12 @@ package middlewares import ( "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/core/middlewares/blobquota" "github.com/goharbor/harbor/src/core/middlewares/contenttrust" "github.com/goharbor/harbor/src/core/middlewares/countquota" "github.com/goharbor/harbor/src/core/middlewares/listrepo" "github.com/goharbor/harbor/src/core/middlewares/multiplmanifest" "github.com/goharbor/harbor/src/core/middlewares/readonly" + "github.com/goharbor/harbor/src/core/middlewares/sizequota" "github.com/goharbor/harbor/src/core/middlewares/url" "github.com/goharbor/harbor/src/core/middlewares/vulnerable" "github.com/justinas/alice" @@ -65,8 +65,8 @@ func (b *DefaultCreator) geMiddleware(mName string) alice.Constructor { LISTREPO: func(next http.Handler) http.Handler { return listrepo.New(next) }, CONTENTTRUST: func(next http.Handler) http.Handler { return contenttrust.New(next) }, VULNERABLE: func(next http.Handler) http.Handler { return vulnerable.New(next) }, + SIZEQUOTA: func(next http.Handler) http.Handler { return sizequota.New(next) }, COUNTQUOTA: func(next http.Handler) http.Handler { return countquota.New(next) }, - BLOBQUOTA: func(next http.Handler) http.Handler { return blobquota.New(next) }, } return middlewares[mName] } diff --git a/src/core/middlewares/config.go b/src/core/middlewares/config.go index 8560c5225..fb9ea5f75 100644 --- a/src/core/middlewares/config.go +++ b/src/core/middlewares/config.go @@ -22,9 +22,9 @@ const ( LISTREPO = "listrepo" CONTENTTRUST = "contenttrust" VULNERABLE = "vulnerable" + SIZEQUOTA = "sizequota" COUNTQUOTA = "countquota" - BLOBQUOTA = "blobquota" ) // Middlewares with sequential organization -var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, BLOBQUOTA, COUNTQUOTA} +var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, SIZEQUOTA, COUNTQUOTA} diff --git a/src/core/middlewares/countquota/handler.go b/src/core/middlewares/countquota/handler.go index b9e5a7546..c08078177 100644 --- a/src/core/middlewares/countquota/handler.go +++ b/src/core/middlewares/countquota/handler.go @@ -15,41 +15,14 @@ package countquota import ( - "bytes" - "context" - "errors" "fmt" - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/schema1" - "github.com/docker/distribution/manifest/schema2" - "github.com/garyburd/redigo/redis" - "github.com/goharbor/harbor/src/common/dao" - "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/quota" - common_util "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" - common_redis "github.com/goharbor/harbor/src/common/utils/redis" - "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/middlewares/util" - "io/ioutil" "net/http" - "strconv" - "strings" - "time" ) -const ( - dialConnectionTimeout = 30 * time.Second - dialReadTimeout = time.Minute + 10*time.Second - dialWriteTimeout = 10 * time.Second -) - -// ErrRequireQuota ... -var ErrRequireQuota = errors.New("cannot get quota on project for request") - type countQuotaHandler struct { - next http.Handler - mfInfo *util.MfInfo + next http.Handler } // New ... @@ -61,158 +34,32 @@ func New(next http.Handler) http.Handler { // ServeHTTP manifest ... func (cqh *countQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - match, repository, tag := util.MatchPushManifest(req) - if match { - mfInfo := &util.MfInfo{ - Repository: repository, - Tag: tag, - } - cqh.mfInfo = mfInfo - - mediaType := req.Header.Get("Content-Type") - if mediaType == schema1.MediaTypeManifest || - mediaType == schema1.MediaTypeSignedManifest || - mediaType == schema2.MediaTypeManifest { - - tagLock, err := cqh.tryLockTag() - if err != nil { - log.Warningf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err)), http.StatusInternalServerError) - return - } - cqh.mfInfo.TagLock = tagLock - - data, err := ioutil.ReadAll(req.Body) - if err != nil { - cqh.tryFreeTag() - log.Warningf("Error occurred when to copy manifest body %v", err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to decode manifest body %v", err)), http.StatusInternalServerError) - return - } - req.Body = ioutil.NopCloser(bytes.NewBuffer(data)) - - manifest, desc, err := distribution.UnmarshalManifest(mediaType, data) - if err != nil { - cqh.tryFreeTag() - log.Warningf("Error occurred when to Unmarshal Manifest %v", err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to Unmarshal Manifest %v", err)), http.StatusInternalServerError) - return - } - cqh.mfInfo.Refrerence = manifest.References() - cqh.mfInfo.Digest = desc.Digest.String() - - projectID, err := cqh.getProjectID(strings.Split(repository, "/")[0]) - if err != nil { - log.Warningf("Error occurred when to get project ID %v", err) - return - } - cqh.mfInfo.ProjectID = projectID - - imageExist, af, err := cqh.imageExist() - if err != nil { - cqh.tryFreeTag() - log.Warningf("Error occurred when to check Manifest existence by repo and tag name %v", err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to check Manifest existence %v", err)), http.StatusInternalServerError) - return - } - cqh.mfInfo.Exist = imageExist - if imageExist { - if af.Digest != cqh.mfInfo.Digest { - cqh.mfInfo.DigestChanged = true - } - } else { - quotaRes := "a.ResourceList{ - quota.ResourceCount: 1, - } - err := cqh.tryRequireQuota(quotaRes) - if err != nil { - cqh.tryFreeTag() - log.Errorf("Cannot get quota for the manifest %v", err) - if err == ErrRequireQuota { - http.Error(rw, util.MarshalError("StatusNotAcceptable", fmt.Sprintf("Cannot get quota for the manifest %v", err)), http.StatusNotAcceptable) - return - } - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to require quota for the manifest %v", err)), http.StatusInternalServerError) - return - } - cqh.mfInfo.Quota = quotaRes - } - - *req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, mfInfo))) - } - + countInteceptor := getInteceptor(req) + if countInteceptor == nil { + cqh.next.ServeHTTP(rw, req) + return + } + // handler request + if err := countInteceptor.HandleRequest(req); err != nil { + log.Warningf("Error occurred when to handle request in count quota handler: %v", err) + http.Error(rw, util.MarshalError("InternalError", fmt.Sprintf("Error occurred when to handle request in count quota handler: %v", err)), + http.StatusInternalServerError) + return } - cqh.next.ServeHTTP(rw, req) + + // handler response + countInteceptor.HandleResponse(*rw.(*util.CustomResponseWriter), req) } -// tryLockTag locks tag with redis ... -func (cqh *countQuotaHandler) tryLockTag() (*common_redis.Mutex, error) { - con, err := redis.DialURL( - config.GetRedisOfRegURL(), - redis.DialConnectTimeout(dialConnectionTimeout), - redis.DialReadTimeout(dialReadTimeout), - redis.DialWriteTimeout(dialWriteTimeout), - ) - if err != nil { - return nil, err - } - tagLock := common_redis.New(con, cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag, common_util.GenerateRandomString()) - success, err := tagLock.Require() - if err != nil { - return nil, err - } - if !success { - return nil, fmt.Errorf("unable to lock tag: %s ", cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag) - } - return tagLock, nil -} - -func (cqh *countQuotaHandler) tryFreeTag() { - _, err := cqh.mfInfo.TagLock.Free() - if err != nil { - log.Warningf("Error to unlock tag: %s, with error: %v ", cqh.mfInfo.Tag, err) - } -} - -// check the existence of a artifact, if exist, the method will return the artifact model -func (cqh *countQuotaHandler) imageExist() (exist bool, af *models.Artifact, err error) { - artifactQuery := &models.ArtifactQuery{ - PID: cqh.mfInfo.ProjectID, - Repo: cqh.mfInfo.Repository, - Tag: cqh.mfInfo.Tag, - } - afs, err := dao.ListArtifacts(artifactQuery) - if err != nil { - log.Errorf("Error occurred when to get project ID %v", err) - return false, nil, err - } - if len(afs) > 0 { - return true, afs[0], nil - } - return false, nil, nil -} - -func (cqh *countQuotaHandler) tryRequireQuota(quotaRes *quota.ResourceList) error { - quotaMgr, err := quota.NewManager("project", strconv.FormatInt(cqh.mfInfo.ProjectID, 10)) - if err != nil { - log.Errorf("Error occurred when to new quota manager %v", err) - return err - } - if err := quotaMgr.AddResources(*quotaRes); err != nil { - log.Errorf("Cannot get quota for the manifest %v", err) - return ErrRequireQuota +func getInteceptor(req *http.Request) util.RegInterceptor { + // PUT /v2//manifests/ + matchPushMF, repository, tag := util.MatchPushManifest(req) + if matchPushMF { + mfInfo := util.MfInfo{} + mfInfo.Repository = repository + mfInfo.Tag = tag + return NewPutManifestInterceptor(&mfInfo) } return nil } - -func (cqh *countQuotaHandler) getProjectID(name string) (int64, error) { - project, err := dao.GetProjectByName(name) - if err != nil { - return 0, err - } - if project != nil { - return project.ProjectID, nil - } - return 0, fmt.Errorf("project %s is not found", name) -} diff --git a/src/core/middlewares/countquota/putmanifest.go b/src/core/middlewares/countquota/putmanifest.go new file mode 100644 index 000000000..1c1f3c789 --- /dev/null +++ b/src/core/middlewares/countquota/putmanifest.go @@ -0,0 +1,211 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package countquota + +import ( + "context" + "errors" + "fmt" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" + common_util "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/common/utils/log" + common_redis "github.com/goharbor/harbor/src/common/utils/redis" + "github.com/goharbor/harbor/src/core/middlewares/util" + "net/http" + "strings" + "time" +) + +// PutManifestInterceptor ... +type PutManifestInterceptor struct { + mfInfo *util.MfInfo +} + +// NewPutManifestInterceptor ... +func NewPutManifestInterceptor(mfInfo *util.MfInfo) *PutManifestInterceptor { + return &PutManifestInterceptor{ + mfInfo: mfInfo, + } +} + +// HandleRequest ... +// The context has already contain mfinfo as it was put by size quota handler. +func (pmi *PutManifestInterceptor) HandleRequest(req *http.Request) error { + mfInfo := req.Context().Value(util.MFInfokKey) + mf, ok := mfInfo.(*util.MfInfo) + if !ok { + return errors.New("failed to get manifest infor from context") + } + + tagLock, err := tryLockTag(mf) + if err != nil { + return fmt.Errorf("error occurred when to lock tag %s:%s with digest %v", mf.Repository, mf.Tag, err) + } + mf.TagLock = tagLock + + imageExist, af, err := imageExist(mf) + if err != nil { + tryFreeTag(mf) + return fmt.Errorf("error occurred when to check Manifest existence %v", err) + } + mf.Exist = imageExist + if imageExist { + if af.Digest != mf.Digest { + mf.DigestChanged = true + } + } else { + quotaRes := "a.ResourceList{ + quota.ResourceCount: 1, + } + err := util.TryRequireQuota(mf.ProjectID, quotaRes) + if err != nil { + tryFreeTag(mf) + log.Errorf("Cannot get quota for the manifest %v", err) + if err == util.ErrRequireQuota { + return err + } + return fmt.Errorf("error occurred when to require quota for the manifest %v", err) + } + mf.Quota = quotaRes + } + *req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, mf))) + return nil +} + +// HandleResponse ... +func (pmi *PutManifestInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) { + mfInfo := req.Context().Value(util.MFInfokKey) + mf, ok := mfInfo.(*util.MfInfo) + if !ok { + log.Error("failed to convert manifest information context into MfInfo") + return + } + defer func() { + _, err := mf.TagLock.Free() + if err != nil { + log.Errorf("Error to unlock in response handler, %v", err) + } + if err := mf.TagLock.Conn.Close(); err != nil { + log.Errorf("Error to close redis connection in response handler, %v", err) + } + }() + + // 201 + if rw.Status() == http.StatusCreated { + af := &models.Artifact{ + PID: mf.ProjectID, + Repo: mf.Repository, + Tag: mf.Tag, + Digest: mf.Digest, + PushTime: time.Now(), + Kind: "Docker-Image", + } + + // insert or update + if !mf.Exist { + _, err := dao.AddArtifact(af) + if err != nil { + log.Errorf("Error to add artifact, %v", err) + return + } + } + if mf.DigestChanged { + err := dao.UpdateArtifactDigest(af) + if err != nil { + log.Errorf("Error to add artifact, %v", err) + return + } + } + + if !mf.Exist || mf.DigestChanged { + afnbs := []*models.ArtifactAndBlob{} + self := &models.ArtifactAndBlob{ + DigestAF: mf.Digest, + DigestBlob: mf.Digest, + } + afnbs = append(afnbs, self) + for _, d := range mf.Refrerence { + afnb := &models.ArtifactAndBlob{ + DigestAF: mf.Digest, + DigestBlob: d.Digest.String(), + } + afnbs = append(afnbs, afnb) + } + if err := dao.AddArtifactNBlobs(afnbs); err != nil { + if strings.Contains(err.Error(), dao.ErrDupRows.Error()) { + log.Warning("the artifact and blobs have already in the DB, it maybe an existing image with different tag") + return + } + log.Errorf("Error to add artifact and blobs in proxy response handler, %v", err) + return + } + } + + } else if rw.Status() >= 300 || rw.Status() <= 511 { + if !mf.Exist { + success := util.TryFreeQuota(mf.ProjectID, mf.Quota) + if !success { + log.Error("error to release resource booked for the manifest") + return + } + } + } + + return +} + +// tryLockTag locks tag with redis ... +func tryLockTag(mfInfo *util.MfInfo) (*common_redis.Mutex, error) { + con, err := util.GetRegRedisCon() + if err != nil { + return nil, err + } + tagLock := common_redis.New(con, "Quota::manifest-lock::"+mfInfo.Repository+":"+mfInfo.Tag, common_util.GenerateRandomString()) + success, err := tagLock.Require() + if err != nil { + return nil, err + } + if !success { + return nil, fmt.Errorf("unable to lock tag: %s ", mfInfo.Repository+":"+mfInfo.Tag) + } + return tagLock, nil +} + +func tryFreeTag(mfInfo *util.MfInfo) { + _, err := mfInfo.TagLock.Free() + if err != nil { + log.Warningf("Error to unlock tag: %s, with error: %v ", mfInfo.Tag, err) + } +} + +// check the existence of a artifact, if exist, the method will return the artifact model +func imageExist(mfInfo *util.MfInfo) (exist bool, af *models.Artifact, err error) { + artifactQuery := &models.ArtifactQuery{ + PID: mfInfo.ProjectID, + Repo: mfInfo.Repository, + Tag: mfInfo.Tag, + } + afs, err := dao.ListArtifacts(artifactQuery) + if err != nil { + log.Errorf("Error occurred when to get project ID %v", err) + return false, nil, err + } + if len(afs) > 0 { + return true, afs[0], nil + } + return false, nil, nil +} diff --git a/src/core/middlewares/inlet.go b/src/core/middlewares/inlet.go index d7a505613..65854e87a 100644 --- a/src/core/middlewares/inlet.go +++ b/src/core/middlewares/inlet.go @@ -17,6 +17,7 @@ package middlewares import ( "errors" "github.com/goharbor/harbor/src/core/middlewares/registryproxy" + "github.com/goharbor/harbor/src/core/middlewares/util" "net/http" ) @@ -35,5 +36,6 @@ func Init() error { // Handle handles the request. func Handle(rw http.ResponseWriter, req *http.Request) { - head.ServeHTTP(rw, req) + customResW := util.NewCustomResponseWriter(rw) + head.ServeHTTP(customResW, req) } diff --git a/src/core/middlewares/registryproxy/handler.go b/src/core/middlewares/registryproxy/handler.go index 71b1ee479..72a9f02f0 100644 --- a/src/core/middlewares/registryproxy/handler.go +++ b/src/core/middlewares/registryproxy/handler.go @@ -15,19 +15,11 @@ package registryproxy import ( - "github.com/goharbor/harbor/src/common/dao" - "github.com/goharbor/harbor/src/common/models" - "github.com/goharbor/harbor/src/common/quota" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/core/middlewares/util" - "github.com/pkg/errors" "net/http" "net/http/httputil" "net/url" - "strconv" - "strings" - "time" ) type proxyHandler struct { @@ -58,151 +50,11 @@ func New(urls ...string) http.Handler { } return &proxyHandler{ - handler: &httputil.ReverseProxy{ - Director: func(req *http.Request) { - director(targetURL, req) - }, - ModifyResponse: modifyResponse, - }, + handler: httputil.NewSingleHostReverseProxy(targetURL), } } -// Overwrite the http requests -func director(target *url.URL, req *http.Request) { - targetQuery := target.RawQuery - req.URL.Scheme = target.Scheme - req.URL.Host = target.Host - req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) - if targetQuery == "" || req.URL.RawQuery == "" { - req.URL.RawQuery = targetQuery + req.URL.RawQuery - } else { - req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery - } - if _, ok := req.Header["User-Agent"]; !ok { - // explicitly disable User-Agent so it's not set to default value - req.Header.Set("User-Agent", "") - } -} - -// Modify the http response -func modifyResponse(res *http.Response) error { - matchMF, _, _ := util.MatchPushManifest(res.Request) - if matchMF { - return handlerPutManifest(res) - } - matchBB, _ := util.MatchPutBlobURL(res.Request) - if matchBB { - return handlerPutBlob(res) - } - return nil -} - -func singleJoiningSlash(a, b string) string { - aslash := strings.HasSuffix(a, "/") - bslash := strings.HasPrefix(b, "/") - switch { - case aslash && bslash: - return a + b[1:] - case !aslash && !bslash: - return a + "/" + b - } - return a + b -} - -func handlerPutManifest(res *http.Response) error { - mfInfo := res.Request.Context().Value(util.MFInfokKey) - mf, ok := mfInfo.(*util.MfInfo) - if !ok { - return errors.New("failed to convert manifest information context into MfInfo") - } - - defer func() { - _, err := mf.TagLock.Free() - if err != nil { - log.Errorf("Error to unlock in response handler, %v", err) - } - if err := mf.TagLock.Conn.Close(); err != nil { - log.Errorf("Error to close redis connection in response handler, %v", err) - } - }() - - // 201 - if res.StatusCode == http.StatusCreated { - af := &models.Artifact{ - PID: mf.ProjectID, - Repo: mf.Repository, - Tag: mf.Tag, - Digest: mf.Digest, - PushTime: time.Now(), - Kind: "Docker-Image", - } - - // insert or update - if !mf.Exist { - _, err := dao.AddArtifact(af) - if err != nil { - log.Errorf("Error to add artifact, %v", err) - return err - } - } - if mf.DigestChanged { - err := dao.UpdateArtifactDigest(af) - if err != nil { - log.Errorf("Error to add artifact, %v", err) - return err - } - } - - if !mf.Exist || mf.DigestChanged { - afnbs := []*models.ArtifactAndBlob{} - for _, d := range mf.Refrerence { - afnb := &models.ArtifactAndBlob{ - DigestAF: mf.Digest, - DigestBlob: d.Digest.String(), - } - afnbs = append(afnbs, afnb) - } - if err := dao.AddArtifactNBlobs(afnbs); err != nil { - log.Errorf("Error to add artifact and blobs in proxy response handler, %v", err) - return err - } - } - - } else if res.StatusCode >= 300 || res.StatusCode <= 511 { - if !mf.Exist { - success := subtractResources(mf) - if !success { - return errors.New("Error to release resource booked for the manifest") - } - } - } - - return nil -} - -func handlerPutBlob(res *http.Response) error { - if res.StatusCode != http.StatusCreated { - log.Infof("we need to rollback DB and unlock digest ... ") - } - return nil -} - -// used to release resource for failure case -func subtractResources(mfInfo *util.MfInfo) bool { - quotaMgr, err := quota.NewManager("project", strconv.FormatInt(mfInfo.ProjectID, 10)) - if err != nil { - log.Errorf("Error occurred when to new quota manager %v", err) - return false - } - - if err := quotaMgr.SubtractResources(*mfInfo.Quota); err != nil { - log.Errorf("Cannot get quota for the manifest %v", err) - return false - } - return true -} - // ServeHTTP ... func (ph proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { ph.handler.ServeHTTP(rw, req) diff --git a/src/core/middlewares/sizequota/handler.go b/src/core/middlewares/sizequota/handler.go new file mode 100644 index 000000000..e9ceddc98 --- /dev/null +++ b/src/core/middlewares/sizequota/handler.go @@ -0,0 +1,231 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "errors" + "fmt" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" + common_util "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/common/utils/log" + common_redis "github.com/goharbor/harbor/src/common/utils/redis" + "github.com/goharbor/harbor/src/core/middlewares/util" + "net/http" + "strings" + "time" +) + +type sizeQuotaHandler struct { + next http.Handler +} + +// New ... +func New(next http.Handler) http.Handler { + return &sizeQuotaHandler{ + next: next, + } +} + +// ServeHTTP ... +func (sqh *sizeQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + sizeInteceptor := getInteceptor(req) + if sizeInteceptor == nil { + sqh.next.ServeHTTP(rw, req) + return + } + + // handler request + if err := sizeInteceptor.HandleRequest(req); err != nil { + log.Warningf("Error occurred when to handle request in size quota handler: %v", err) + http.Error(rw, util.MarshalError("InternalError", fmt.Sprintf("Error occurred when to handle request in size quota handler: %v", err)), + http.StatusInternalServerError) + return + } + sqh.next.ServeHTTP(rw, req) + + // handler response + sizeInteceptor.HandleResponse(*rw.(*util.CustomResponseWriter), req) +} + +func getInteceptor(req *http.Request) util.RegInterceptor { + // POST /v2//blobs/uploads/?mount=&from= + matchMountBlob, repository, mount, _ := util.MatchMountBlobURL(req) + if matchMountBlob { + bb := util.BlobInfo{} + bb.Repository = repository + bb.Digest = mount + return NewMountBlobInterceptor(&bb) + } + + // PUT /v2//blobs/uploads/?digest= + matchPutBlob, repository := util.MatchPutBlobURL(req) + if matchPutBlob { + bb := util.BlobInfo{} + bb.Repository = repository + return NewPutBlobInterceptor(&bb) + } + + // PUT /v2//manifests/ + matchPushMF, repository, tag := util.MatchPushManifest(req) + if matchPushMF { + bb := util.BlobInfo{} + mfInfo := util.MfInfo{} + bb.Repository = repository + mfInfo.Repository = repository + mfInfo.Tag = tag + return NewPutManifestInterceptor(&bb, &mfInfo) + } + + // PATCH /v2//blobs/uploads/ + matchPatchBlob, _ := util.MatchPatchBlobURL(req) + if matchPatchBlob { + return NewPatchBlobInterceptor() + } + + return nil +} + +func requireQuota(conn redis.Conn, blobInfo *util.BlobInfo) error { + projectID, err := util.GetProjectID(strings.Split(blobInfo.Repository, "/")[0]) + if err != nil { + return err + } + blobInfo.ProjectID = projectID + + digestLock, err := tryLockBlob(conn, blobInfo) + if err != nil { + log.Infof("failed to lock digest in redis, %v", err) + return err + } + blobInfo.DigestLock = digestLock + + blobExist, err := dao.HasBlobInProject(blobInfo.ProjectID, blobInfo.Digest) + if err != nil { + tryFreeBlob(blobInfo) + return err + } + blobInfo.Exist = blobExist + if blobExist { + return nil + } + + // only require quota for non existing blob. + quotaRes := "a.ResourceList{ + quota.ResourceStorage: blobInfo.Size, + } + err = util.TryRequireQuota(blobInfo.ProjectID, quotaRes) + if err != nil { + log.Infof("project id, %d, size %d", blobInfo.ProjectID, blobInfo.Size) + tryFreeBlob(blobInfo) + log.Errorf("cannot get quota for the blob %v", err) + return err + } + blobInfo.Quota = quotaRes + + return nil +} + +// HandleBlobCommon handles put blob complete request +// 1, add blob into DB if success +// 2, roll back resource if failure. +func HandleBlobCommon(rw util.CustomResponseWriter, req *http.Request) error { + bbInfo := req.Context().Value(util.BBInfokKey) + bb, ok := bbInfo.(*util.BlobInfo) + if !ok { + return errors.New("failed to convert blob information context into BBInfo") + } + defer func() { + _, err := bb.DigestLock.Free() + if err != nil { + log.Errorf("Error to unlock blob digest:%s in response handler, %v", bb.Digest, err) + } + if err := bb.DigestLock.Conn.Close(); err != nil { + log.Errorf("Error to close redis connection in put blob response handler, %v", err) + } + }() + + // Do nothing for a existing blob. + if bb.Exist { + return nil + } + + if rw.Status() == http.StatusCreated { + blob := &models.Blob{ + Digest: bb.Digest, + ContentType: bb.ContentType, + Size: bb.Size, + CreationTime: time.Now(), + } + _, err := dao.AddBlob(blob) + if err != nil { + return err + } + } else if rw.Status() >= 300 || rw.Status() <= 511 { + success := util.TryFreeQuota(bb.ProjectID, bb.Quota) + if !success { + return fmt.Errorf("Error to release resource booked for the blob, %d, digest: %s ", bb.ProjectID, bb.Digest) + } + } + return nil +} + +// tryLockBlob locks blob with redis ... +func tryLockBlob(conn redis.Conn, blobInfo *util.BlobInfo) (*common_redis.Mutex, error) { + // Quota::blob-lock::projectname::digest + digestLock := common_redis.New(conn, "Quota::blob-lock::"+strings.Split(blobInfo.Repository, "/")[0]+":"+blobInfo.Digest, common_util.GenerateRandomString()) + success, err := digestLock.Require() + if err != nil { + return nil, err + } + if !success { + return nil, fmt.Errorf("unable to lock digest: %s, %s ", blobInfo.Repository, blobInfo.Digest) + } + return digestLock, nil +} + +func tryFreeBlob(blobInfo *util.BlobInfo) { + _, err := blobInfo.DigestLock.Free() + if err != nil { + log.Warningf("Error to unlock digest: %s,%s with error: %v ", blobInfo.Repository, blobInfo.Digest, err) + } +} + +func rmBlobUploadUUID(conn redis.Conn, UUID string) (bool, error) { + exists, err := redis.Int(conn.Do("EXISTS", UUID)) + if err != nil { + return false, err + } + if exists == 1 { + res, err := redis.Int(conn.Do("DEL", UUID)) + if err != nil { + return false, err + } + return res == 1, nil + } + return true, nil +} + +// put blob path: /v2//blobs/uploads/ +func getUUID(path string) string { + if !strings.Contains(path, "/") { + log.Infof("it's not a valid path string: %s", path) + return "" + } + strs := strings.Split(path, "/") + return strs[len(strs)-1] +} diff --git a/src/core/middlewares/sizequota/handler_test.go b/src/core/middlewares/sizequota/handler_test.go new file mode 100644 index 000000000..b5231f16b --- /dev/null +++ b/src/core/middlewares/sizequota/handler_test.go @@ -0,0 +1,177 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "fmt" + "github.com/garyburd/redigo/redis" + utilstest "github.com/goharbor/harbor/src/common/utils/test" + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +const testingRedisHost = "REDIS_HOST" + +func TestMain(m *testing.M) { + utilstest.InitDatabaseFromEnv() + rc := m.Run() + if rc != 0 { + os.Exit(rc) + } +} + +func TestGetInteceptor(t *testing.T) { + assert := assert.New(t) + req1, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + res1 := getInteceptor(req1) + + _, ok := res1.(*PutManifestInterceptor) + assert.True(ok) + + req2, _ := http.NewRequest("POST", "http://127.0.0.1:5000/v2/library/ubuntu/TestGetInteceptor/14.04", nil) + res2 := getInteceptor(req2) + assert.Nil(res2) + +} + +func TestRequireQuota(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + assert := assert.New(t) + blobInfo := &util.BlobInfo{ + Repository: "library/test", + Digest: "sha256:abcdf123sdfefeg1246", + } + + err = requireQuota(con, blobInfo) + assert.Nil(err) + +} + +func TestGetUUID(t *testing.T) { + str1 := "test/1/2/uuid-1" + uuid1 := getUUID(str1) + assert.Equal(t, uuid1, "uuid-1") + + // not a valid path, just return empty + str2 := "test-1-2-uuid-2" + uuid2 := getUUID(str2) + assert.Equal(t, uuid2, "") +} + +func TestAddRmUUID(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + rmfail, err := rmBlobUploadUUID(con, "test-rm-uuid") + assert.Nil(t, err) + assert.True(t, rmfail) + + success, err := util.SetBunkSize(con, "test-rm-uuid", 1000) + assert.Nil(t, err) + assert.True(t, success) + + rmSuccess, err := rmBlobUploadUUID(con, "test-rm-uuid") + assert.Nil(t, err) + assert.True(t, rmSuccess) + +} + +func TestTryFreeLockBlob(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + blobInfo := util.BlobInfo{ + Repository: "lock/test", + Digest: "sha256:abcdf123sdfefeg1246", + } + + lock, err := tryLockBlob(con, &blobInfo) + assert.Nil(t, err) + blobInfo.DigestLock = lock + tryFreeBlob(&blobInfo) +} + +func TestBlobCommon(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobInfo := util.BlobInfo{ + Repository: "TestBlobCommon/test", + Digest: "sha256:abcdf12345678sdfefeg1246", + ContentType: "ContentType", + Size: 101, + Exist: false, + } + + rw := httptest.NewRecorder() + customResW := util.CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(201) + + lock, err := tryLockBlob(con, &blobInfo) + assert.Nil(t, err) + blobInfo.DigestLock = lock + + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, &blobInfo))) + + err = HandleBlobCommon(customResW, req) + assert.Nil(t, err) + +} + +func getRedisHost() string { + redisHost := os.Getenv(testingRedisHost) + if redisHost == "" { + redisHost = "127.0.0.1" // for local test + } + + return redisHost +} diff --git a/src/core/middlewares/sizequota/mountblob.go b/src/core/middlewares/sizequota/mountblob.go new file mode 100644 index 000000000..8eba2ee3b --- /dev/null +++ b/src/core/middlewares/sizequota/mountblob.go @@ -0,0 +1,69 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "fmt" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/middlewares/util" + "net/http" + "strings" +) + +// MountBlobInterceptor ... +type MountBlobInterceptor struct { + blobInfo *util.BlobInfo +} + +// NewMountBlobInterceptor ... +func NewMountBlobInterceptor(blobInfo *util.BlobInfo) *MountBlobInterceptor { + return &MountBlobInterceptor{ + blobInfo: blobInfo, + } +} + +// HandleRequest ... +func (mbi *MountBlobInterceptor) HandleRequest(req *http.Request) error { + tProjectID, err := util.GetProjectID(strings.Split(mbi.blobInfo.Repository, "/")[0]) + if err != nil { + return fmt.Errorf("error occurred when to get target project: %d, %v", tProjectID, err) + } + blob, err := dao.GetBlob(mbi.blobInfo.Digest) + if err != nil { + return err + } + if blob == nil { + return fmt.Errorf("the blob in the mount request with digest: %s doesn't exist", mbi.blobInfo.Digest) + } + mbi.blobInfo.Size = blob.Size + con, err := util.GetRegRedisCon() + if err != nil { + return err + } + if err := requireQuota(con, mbi.blobInfo); err != nil { + return err + } + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, mbi.blobInfo))) + return nil +} + +// HandleResponse ... +func (mbi *MountBlobInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) { + if err := HandleBlobCommon(rw, req); err != nil { + log.Error(err) + } +} diff --git a/src/core/middlewares/sizequota/mountblob_test.go b/src/core/middlewares/sizequota/mountblob_test.go new file mode 100644 index 000000000..7d6c07cbc --- /dev/null +++ b/src/core/middlewares/sizequota/mountblob_test.go @@ -0,0 +1,85 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "fmt" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewMountBlobInterceptor(t *testing.T) { + blobinfo := &util.BlobInfo{} + blobinfo.Repository = "TestNewMountBlobInterceptor/latest" + + bi := NewMountBlobInterceptor(blobinfo) + assert.NotNil(t, bi) +} + +func TestMountBlobHandleRequest(t *testing.T) { + blobInfo := util.BlobInfo{ + Repository: "TestHandleRequest/test", + Digest: "sha256:TestHandleRequest1234", + ContentType: "ContentType", + Size: 101, + Exist: false, + } + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + bi := NewMountBlobInterceptor(&blobInfo) + assert.NotNil(t, bi.HandleRequest(req)) +} + +func TestMountBlobHandleResponse(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobInfo := util.BlobInfo{ + Repository: "TestHandleResponse/test", + Digest: "sha256:TestHandleResponseabcdf12345678sdfefeg1246", + ContentType: "ContentType", + Size: 101, + Exist: false, + } + + rw := httptest.NewRecorder() + customResW := util.CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(201) + + lock, err := tryLockBlob(con, &blobInfo) + assert.Nil(t, err) + blobInfo.DigestLock = lock + + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, &blobInfo))) + + bi := NewMountBlobInterceptor(&blobInfo) + assert.NotNil(t, bi) + + bi.HandleResponse(customResW, req) + +} diff --git a/src/core/middlewares/sizequota/patchblob.go b/src/core/middlewares/sizequota/patchblob.go new file mode 100644 index 000000000..c5ce15d63 --- /dev/null +++ b/src/core/middlewares/sizequota/patchblob.go @@ -0,0 +1,86 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/middlewares/util" + "net/http" + "strconv" + "strings" +) + +// PatchBlobInterceptor ... +type PatchBlobInterceptor struct { +} + +// NewPatchBlobInterceptor ... +func NewPatchBlobInterceptor() *PatchBlobInterceptor { + return &PatchBlobInterceptor{} +} + +// HandleRequest do nothing for patch blob, just let the request to proxy. +func (pbi *PatchBlobInterceptor) HandleRequest(req *http.Request) error { + return nil +} + +// HandleResponse record the upload process with Range attribute, set it into redis with UUID as the key +func (pbi *PatchBlobInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) { + if rw.Status() != http.StatusAccepted { + return + } + + con, err := util.GetRegRedisCon() + if err != nil { + log.Error(err) + return + } + defer con.Close() + + uuid := rw.Header().Get("Docker-Upload-UUID") + if uuid == "" { + log.Errorf("no UUID in the patch blob response, the request path %s ", req.URL.Path) + return + } + + // Range: Range indicating the current progress of the upload. + // https://github.com/opencontainers/distribution-spec/blob/master/spec.md#get-blob-upload + patchRange := rw.Header().Get("Range") + if uuid == "" { + log.Errorf("no Range in the patch blob response, the request path %s ", req.URL.Path) + return + } + + endRange := strings.Split(patchRange, "-")[1] + size, err := strconv.ParseInt(endRange, 10, 64) + // docker registry did '-1' in the response + if size > 0 { + size = size + 1 + } + if err != nil { + log.Error(err) + return + } + success, err := util.SetBunkSize(con, uuid, size) + if err != nil { + log.Error(err) + return + } + if !success { + // ToDo discuss what to do here. + log.Warningf(" T_T: Fail to set bunk: %s size: %d in redis, it causes unable to set correct quota for the artifact.", uuid, size) + } + return +} diff --git a/src/core/middlewares/sizequota/patchblob_test.go b/src/core/middlewares/sizequota/patchblob_test.go new file mode 100644 index 000000000..843b505c1 --- /dev/null +++ b/src/core/middlewares/sizequota/patchblob_test.go @@ -0,0 +1,42 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "testing" +) + +func TestNewPatchBlobInterceptor(t *testing.T) { + bi := NewPatchBlobInterceptor() + assert.NotNil(t, bi) +} + +func TestPatchBlobHandleRequest(t *testing.T) { + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + bi := NewPatchBlobInterceptor() + assert.Nil(t, bi.HandleRequest(req)) +} + +func TestPatchBlobHandleResponse(t *testing.T) { + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + rw := httptest.NewRecorder() + customResW := util.CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(400) + NewPatchBlobInterceptor().HandleResponse(customResW, req) +} diff --git a/src/core/middlewares/sizequota/putblob.go b/src/core/middlewares/sizequota/putblob.go new file mode 100644 index 000000000..e2e75b8b3 --- /dev/null +++ b/src/core/middlewares/sizequota/putblob.go @@ -0,0 +1,83 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "errors" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/opencontainers/go-digest" + "net/http" +) + +// PutBlobInterceptor ... +type PutBlobInterceptor struct { + blobInfo *util.BlobInfo +} + +// NewPutBlobInterceptor ... +func NewPutBlobInterceptor(blobInfo *util.BlobInfo) *PutBlobInterceptor { + return &PutBlobInterceptor{ + blobInfo: blobInfo, + } +} + +// HandleRequest ... +func (pbi *PutBlobInterceptor) HandleRequest(req *http.Request) error { + // the redis connection will be closed in the put response. + con, err := util.GetRegRedisCon() + if err != nil { + return err + } + + defer func() { + if pbi.blobInfo.UUID != "" { + _, err := rmBlobUploadUUID(con, pbi.blobInfo.UUID) + if err != nil { + log.Warningf("error occurred when remove UUID for blob, %v", err) + } + } + }() + + dgstStr := req.FormValue("digest") + if dgstStr == "" { + return errors.New("blob digest missing") + } + dgst, err := digest.Parse(dgstStr) + if err != nil { + return errors.New("blob digest parsing failed") + } + + pbi.blobInfo.Digest = dgst.String() + pbi.blobInfo.UUID = getUUID(req.URL.Path) + size, err := util.GetBlobSize(con, pbi.blobInfo.UUID) + if err != nil { + return err + } + pbi.blobInfo.Size = size + if err := requireQuota(con, pbi.blobInfo); err != nil { + return err + } + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, pbi.blobInfo))) + return nil +} + +// HandleResponse ... +func (pbi *PutBlobInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) { + if err := HandleBlobCommon(rw, req); err != nil { + log.Error(err) + } +} diff --git a/src/core/middlewares/sizequota/putblob_test.go b/src/core/middlewares/sizequota/putblob_test.go new file mode 100644 index 000000000..847623c56 --- /dev/null +++ b/src/core/middlewares/sizequota/putblob_test.go @@ -0,0 +1,80 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "fmt" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewPutBlobInterceptor(t *testing.T) { + blobinfo := &util.BlobInfo{} + blobinfo.Repository = "TestNewPutBlobInterceptor/latest" + + bi := NewPutBlobInterceptor(blobinfo) + assert.NotNil(t, bi) +} + +func TestPutBlobHandleRequest(t *testing.T) { + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobinfo := &util.BlobInfo{} + blobinfo.Repository = "TestPutBlobHandleRequest/latest" + + bi := NewPutBlobInterceptor(blobinfo) + assert.NotNil(t, bi.HandleRequest(req)) +} + +func TestPutBlobHandleResponse(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobInfo := util.BlobInfo{ + Repository: "TestPutBlobHandleResponse/test", + Digest: "sha256:TestPutBlobHandleResponseabcdf12345678sdfefeg1246", + ContentType: "ContentType", + Size: 101, + Exist: false, + } + + rw := httptest.NewRecorder() + customResW := util.CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(201) + + lock, err := tryLockBlob(con, &blobInfo) + assert.Nil(t, err) + blobInfo.DigestLock = lock + + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, &blobInfo))) + + bi := NewPutBlobInterceptor(&blobInfo) + assert.NotNil(t, bi) + + bi.HandleResponse(customResW, req) +} diff --git a/src/core/middlewares/sizequota/putmanifest.go b/src/core/middlewares/sizequota/putmanifest.go new file mode 100644 index 000000000..76d87044a --- /dev/null +++ b/src/core/middlewares/sizequota/putmanifest.go @@ -0,0 +1,102 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "bytes" + "context" + "fmt" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/middlewares/util" + "io/ioutil" + "net/http" + "strings" +) + +// PutManifestInterceptor ... +type PutManifestInterceptor struct { + blobInfo *util.BlobInfo + mfInfo *util.MfInfo +} + +// NewPutManifestInterceptor ... +func NewPutManifestInterceptor(blobInfo *util.BlobInfo, mfInfo *util.MfInfo) *PutManifestInterceptor { + return &PutManifestInterceptor{ + blobInfo: blobInfo, + mfInfo: mfInfo, + } +} + +// HandleRequest ... +func (pmi *PutManifestInterceptor) HandleRequest(req *http.Request) error { + mediaType := req.Header.Get("Content-Type") + if mediaType == schema1.MediaTypeManifest || + mediaType == schema1.MediaTypeSignedManifest || + mediaType == schema2.MediaTypeManifest { + + con, err := util.GetRegRedisCon() + if err != nil { + log.Infof("failed to get registry redis connection, %v", err) + return err + } + + data, err := ioutil.ReadAll(req.Body) + if err != nil { + log.Warningf("Error occurred when to copy manifest body %v", err) + return err + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(data)) + manifest, desc, err := distribution.UnmarshalManifest(mediaType, data) + if err != nil { + log.Warningf("Error occurred when to Unmarshal Manifest %v", err) + return err + } + projectID, err := util.GetProjectID(strings.Split(pmi.mfInfo.Repository, "/")[0]) + if err != nil { + log.Warningf("Error occurred when to get project ID %v", err) + return err + } + + pmi.mfInfo.ProjectID = projectID + pmi.mfInfo.Refrerence = manifest.References() + pmi.mfInfo.Digest = desc.Digest.String() + pmi.blobInfo.ProjectID = projectID + pmi.blobInfo.Digest = desc.Digest.String() + pmi.blobInfo.Size = desc.Size + pmi.blobInfo.ContentType = mediaType + + if err := requireQuota(con, pmi.blobInfo); err != nil { + return err + } + + *req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, pmi.mfInfo))) + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, pmi.blobInfo))) + + return nil + } + + return fmt.Errorf("unsupported content type for manifest: %s", mediaType) +} + +// HandleResponse ... +func (pmi *PutManifestInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) { + if err := HandleBlobCommon(rw, req); err != nil { + log.Error(err) + return + } +} diff --git a/src/core/middlewares/sizequota/putmanifest_test.go b/src/core/middlewares/sizequota/putmanifest_test.go new file mode 100644 index 000000000..dc6b91098 --- /dev/null +++ b/src/core/middlewares/sizequota/putmanifest_test.go @@ -0,0 +1,92 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sizequota + +import ( + "context" + "fmt" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewPutManifestInterceptor(t *testing.T) { + blobinfo := &util.BlobInfo{} + blobinfo.Repository = "TestNewPutManifestInterceptor/latest" + + mfinfo := &util.MfInfo{ + Repository: "TestNewPutManifestInterceptor", + } + + mi := NewPutManifestInterceptor(blobinfo, mfinfo) + assert.NotNil(t, mi) +} + +func TestPutManifestHandleRequest(t *testing.T) { + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobinfo := &util.BlobInfo{} + blobinfo.Repository = "TestPutManifestHandleRequest/latest" + + mfinfo := &util.MfInfo{ + Repository: "TestPutManifestHandleRequest", + } + + mi := NewPutManifestInterceptor(blobinfo, mfinfo) + assert.NotNil(t, mi.HandleRequest(req)) +} + +func TestPutManifestHandleResponse(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + req, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) + blobInfo := util.BlobInfo{ + Repository: "TestPutManifestandleResponse/test", + Digest: "sha256:TestPutManifestandleResponseabcdf12345678sdfefeg1246", + ContentType: "ContentType", + Size: 101, + Exist: false, + } + + mfinfo := util.MfInfo{ + Repository: "TestPutManifestandleResponse", + } + + rw := httptest.NewRecorder() + customResW := util.CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(201) + + lock, err := tryLockBlob(con, &blobInfo) + assert.Nil(t, err) + blobInfo.DigestLock = lock + + *req = *(req.WithContext(context.WithValue(req.Context(), util.BBInfokKey, &blobInfo))) + + bi := NewPutManifestInterceptor(&blobInfo, &mfinfo) + assert.NotNil(t, bi) + + bi.HandleResponse(customResW, req) +} diff --git a/src/core/middlewares/util/reginteceptor.go b/src/core/middlewares/util/reginteceptor.go new file mode 100644 index 000000000..902b66f0a --- /dev/null +++ b/src/core/middlewares/util/reginteceptor.go @@ -0,0 +1,28 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "net/http" +) + +// RegInterceptor ... +type RegInterceptor interface { + // HandleRequest ... + HandleRequest(req *http.Request) error + + // HandleResponse won't return any error + HandleResponse(rw CustomResponseWriter, req *http.Request) +} diff --git a/src/core/middlewares/util/response.go b/src/core/middlewares/util/response.go new file mode 100644 index 000000000..48e3f0cda --- /dev/null +++ b/src/core/middlewares/util/response.go @@ -0,0 +1,59 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "net/http" +) + +// CustomResponseWriter write the response code into the status +type CustomResponseWriter struct { + http.ResponseWriter + status int + wroteHeader bool +} + +// NewCustomResponseWriter ... +func NewCustomResponseWriter(w http.ResponseWriter) *CustomResponseWriter { + return &CustomResponseWriter{ResponseWriter: w} +} + +// Status ... +func (w *CustomResponseWriter) Status() int { + return w.status +} + +// Header ... +func (w CustomResponseWriter) Header() http.Header { + return w.ResponseWriter.Header() +} + +// Write ... +func (w *CustomResponseWriter) Write(p []byte) (n int, err error) { + if !w.wroteHeader { + w.WriteHeader(http.StatusOK) + } + return w.ResponseWriter.Write(p) +} + +// WriteHeader ... +func (w *CustomResponseWriter) WriteHeader(code int) { + w.ResponseWriter.WriteHeader(code) + if w.wroteHeader { + return + } + w.status = code + w.wroteHeader = true +} diff --git a/src/core/middlewares/util/response_test.go b/src/core/middlewares/util/response_test.go new file mode 100644 index 000000000..40ec59c4e --- /dev/null +++ b/src/core/middlewares/util/response_test.go @@ -0,0 +1,29 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCustomResponseWriter(t *testing.T) { + rw := httptest.NewRecorder() + customResW := CustomResponseWriter{ResponseWriter: rw} + customResW.WriteHeader(501) + assert.Equal(t, customResW.Status(), 501) +} diff --git a/src/core/middlewares/util/util.go b/src/core/middlewares/util/util.go index 6cf95ce9e..8e1df00e2 100644 --- a/src/core/middlewares/util/util.go +++ b/src/core/middlewares/util/util.go @@ -16,7 +16,11 @@ package util import ( "encoding/json" + "errors" + "fmt" "github.com/docker/distribution" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/quota" "github.com/goharbor/harbor/src/common/utils/clair" @@ -28,11 +32,16 @@ import ( "net/http" "net/http/httptest" "regexp" + "strconv" "strings" + "time" ) type contextKey string +// ErrRequireQuota ... +var ErrRequireQuota = errors.New("cannot get quota on project for request") + const ( manifestURLPattern = `^/v2/((?:[a-z0-9]+(?:[._-][a-z0-9]+)*/)+)manifests/([\w][\w.:-]{0,127})` blobURLPattern = `^/v2/((?:[a-z0-9]+(?:[._-][a-z0-9]+)*/)+)blobs/uploads/` @@ -42,7 +51,16 @@ const ( // TODO: temp solution, remove after vmware/harbor#2242 is resolved. TokenUsername = "harbor-core" // MFInfokKey the context key for image tag redis lock - MFInfokKey = contextKey("ManifestLock") + MFInfokKey = contextKey("ManifestInfo") + // BBInfokKey the context key for image tag redis lock + BBInfokKey = contextKey("BlobInfo") + + // DialConnectionTimeout ... + DialConnectionTimeout = 30 * time.Second + // DialReadTimeout ... + DialReadTimeout = time.Minute + 10*time.Second + // DialWriteTimeout ... + DialWriteTimeout = 10 * time.Second ) // ImageInfo ... @@ -53,6 +71,24 @@ type ImageInfo struct { Digest string } +// BlobInfo ... +type BlobInfo struct { + UUID string + ProjectID int64 + ContentType string + Size int64 + Repository string + Tag string + + // Exist is to index the existing of the manifest in DB. If false, it's an new image for uploading. + Exist bool + + Digest string + DigestLock *common_redis.Mutex + // Quota is the resource applied for the manifest upload request. + Quota *quota.ResourceList +} + // MfInfo ... type MfInfo struct { // basic information of a manifest @@ -133,6 +169,24 @@ func MatchPutBlobURL(req *http.Request) (bool, string) { return false, "" } +// MatchPatchBlobURL ... +func MatchPatchBlobURL(req *http.Request) (bool, string) { + if req.Method != http.MethodPatch { + return false, "" + } + re, err := regexp.Compile(blobURLPattern) + if err != nil { + log.Errorf("error to match put blob url, %v", err) + return false, "" + } + s := re.FindStringSubmatch(req.URL.Path) + if len(s) == 2 { + s[1] = strings.TrimSuffix(s[1], "/") + return true, s[1] + } + return false, "" +} + // MatchPullManifest checks if the request looks like a request to pull manifest. If it is returns the image and tag/sha256 digest as 2nd and 3rd return values func MatchPullManifest(req *http.Request) (bool, string, string) { if req.Method != http.MethodGet { @@ -149,6 +203,33 @@ func MatchPushManifest(req *http.Request) (bool, string, string) { return MatchManifestURL(req) } +// MatchMountBlobURL POST /v2//blobs/uploads/?mount=&from= +// If match, will return repo, mount and from as the 2nd, 3th and 4th. +func MatchMountBlobURL(req *http.Request) (bool, string, string, string) { + if req.Method != http.MethodPost { + return false, "", "", "" + } + re, err := regexp.Compile(blobURLPattern) + if err != nil { + log.Errorf("error to match post blob url, %v", err) + return false, "", "", "" + } + s := re.FindStringSubmatch(req.URL.Path) + if len(s) == 2 { + s[1] = strings.TrimSuffix(s[1], "/") + mount := req.FormValue("mount") + if mount == "" { + return false, "", "", "" + } + from := req.FormValue("from") + if from == "" { + return false, "", "", "" + } + return true, s[1], mount, from + } + return false, "", "", "" +} + // CopyResp ... func CopyResp(rec *httptest.ResponseRecorder, rw http.ResponseWriter) { for k, v := range rec.Header() { @@ -218,3 +299,79 @@ func NewPMSPolicyChecker(pm promgr.ProjectManager) PolicyChecker { func GetPolicyChecker() PolicyChecker { return NewPMSPolicyChecker(config.GlobalProjectMgr) } + +// TryRequireQuota ... +func TryRequireQuota(projectID int64, quotaRes *quota.ResourceList) error { + quotaMgr, err := quota.NewManager("project", strconv.FormatInt(projectID, 10)) + if err != nil { + log.Errorf("Error occurred when to new quota manager %v", err) + return err + } + if err := quotaMgr.AddResources(*quotaRes); err != nil { + log.Errorf("cannot get quota for the project resource: %d, err: %v", projectID, err) + return ErrRequireQuota + } + return nil +} + +// TryFreeQuota used to release resource for failure case +func TryFreeQuota(projectID int64, qres *quota.ResourceList) bool { + quotaMgr, err := quota.NewManager("project", strconv.FormatInt(projectID, 10)) + if err != nil { + log.Errorf("Error occurred when to new quota manager %v", err) + return false + } + + if err := quotaMgr.SubtractResources(*qres); err != nil { + log.Errorf("cannot release quota for the project resource: %d, err: %v", projectID, err) + return false + } + return true +} + +// GetBlobSize blob size with UUID in redis +func GetBlobSize(conn redis.Conn, uuid string) (int64, error) { + exists, err := redis.Int(conn.Do("EXISTS", uuid)) + if err != nil { + return 0, err + } + if exists == 1 { + size, err := redis.Int64(conn.Do("GET", uuid)) + if err != nil { + return 0, err + } + return size, nil + } + return 0, nil +} + +// SetBunkSize sets the temp size for blob bunk with its uuid. +func SetBunkSize(conn redis.Conn, uuid string, size int64) (bool, error) { + setRes, err := redis.String(conn.Do("SET", uuid, size)) + if err != nil { + return false, err + } + return setRes == "OK", nil +} + +// GetProjectID ... +func GetProjectID(name string) (int64, error) { + project, err := dao.GetProjectByName(name) + if err != nil { + return 0, err + } + if project != nil { + return project.ProjectID, nil + } + return 0, fmt.Errorf("project %s is not found", name) +} + +// GetRegRedisCon ... +func GetRegRedisCon() (redis.Conn, error) { + return redis.DialURL( + config.GetRedisOfRegURL(), + redis.DialConnectTimeout(DialConnectionTimeout), + redis.DialReadTimeout(DialReadTimeout), + redis.DialWriteTimeout(DialWriteTimeout), + ) +} diff --git a/src/core/middlewares/util/util_test.go b/src/core/middlewares/util/util_test.go index be7264eeb..736c81a05 100644 --- a/src/core/middlewares/util/util_test.go +++ b/src/core/middlewares/util/util_test.go @@ -16,6 +16,7 @@ package util import ( "github.com/goharbor/harbor/src/common" + "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" notarytest "github.com/goharbor/harbor/src/common/utils/notary/test" testutils "github.com/goharbor/harbor/src/common/utils/test" @@ -23,19 +24,26 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "fmt" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/common/quota" "net/http" "net/http/httptest" "os" "testing" + "time" ) var endpoint = "10.117.4.142" var notaryServer *httptest.Server +const testingRedisHost = "REDIS_HOST" + var admiralEndpoint = "http://127.0.0.1:8282" var token = "" func TestMain(m *testing.M) { + testutils.InitDatabaseFromEnv() notaryServer = notarytest.NewNotaryServer(endpoint) defer notaryServer.Close() var defaultConfig = map[string]interface{}{ @@ -107,6 +115,40 @@ func TestMatchPutBlob(t *testing.T) { assert.False(res3, "%s %v is not a request to put blob", req3.Method, req3.URL) } +func TestMatchMountBlobURL(t *testing.T) { + assert := assert.New(t) + req1, _ := http.NewRequest("POST", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/?mount=digtest123&from=testrepo", nil) + res1, repo1, mount, from := MatchMountBlobURL(req1) + assert.True(res1, "%s %v is not a request to mount blob", req1.Method, req1.URL) + assert.Equal("library/ubuntu", repo1) + assert.Equal("digtest123", mount) + assert.Equal("testrepo", from) + + req2, _ := http.NewRequest("PATCH", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/?mount=digtest123&from=testrepo", nil) + res2, _, _, _ := MatchMountBlobURL(req2) + assert.False(res2, "%s %v is a request to mount blob", req2.Method, req2.URL) + + req3, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/?mount=digtest123&from=testrepo", nil) + res3, _, _, _ := MatchMountBlobURL(req3) + assert.False(res3, "%s %v is not a request to put blob", req3.Method, req3.URL) +} + +func TestPatchBlobURL(t *testing.T) { + assert := assert.New(t) + req1, _ := http.NewRequest("PATCH", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/1234-1234-abcd", nil) + res1, repo1 := MatchPatchBlobURL(req1) + assert.True(res1, "%s %v is not a request to patch blob", req1.Method, req1.URL) + assert.Equal("library/ubuntu", repo1) + + req2, _ := http.NewRequest("POST", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/1234-1234-abcd", nil) + res2, _ := MatchPatchBlobURL(req2) + assert.False(res2, "%s %v is a request to patch blob", req2.Method, req2.URL) + + req3, _ := http.NewRequest("PUT", "http://127.0.0.1:5000/v2/library/ubuntu/blobs/uploads/?mount=digtest123&from=testrepo", nil) + res3, _ := MatchPatchBlobURL(req3) + assert.False(res3, "%s %v is not a request to patch blob", req3.Method, req3.URL) +} + func TestMatchPushManifest(t *testing.T) { assert := assert.New(t) req1, _ := http.NewRequest("POST", "http://127.0.0.1:5000/v2/library/ubuntu/manifests/14.04", nil) @@ -170,7 +212,6 @@ func TestPMSPolicyChecker(t *testing.T) { if err := config.Init(); err != nil { panic(err) } - testutils.InitDatabaseFromEnv() config.Upload(defaultConfigAdmiral) @@ -218,3 +259,84 @@ func TestMarshalError(t *testing.T) { js2 := MarshalError("DENIED", "The action is denied") assert.Equal("{\"errors\":[{\"code\":\"DENIED\",\"message\":\"The action is denied\",\"detail\":\"The action is denied\"}]}", js2) } + +func TestTryRequireQuota(t *testing.T) { + quotaRes := "a.ResourceList{ + quota.ResourceStorage: 100, + } + err := TryRequireQuota(1, quotaRes) + assert.Nil(t, err) +} + +func TestTryFreeQuota(t *testing.T) { + quotaRes := "a.ResourceList{ + quota.ResourceStorage: 1, + } + success := TryFreeQuota(1, quotaRes) + assert.True(t, success) +} + +func TestGetBlobSize(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + size, err := GetBlobSize(con, "test-TestGetBlobSize") + assert.Nil(t, err) + assert.Equal(t, size, int64(0)) +} + +func TestSetBunkSize(t *testing.T) { + con, err := redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", getRedisHost(), 6379), + redis.DialConnectTimeout(30*time.Second), + redis.DialReadTimeout(time.Minute+10*time.Second), + redis.DialWriteTimeout(10*time.Second), + ) + assert.Nil(t, err) + defer con.Close() + + size, err := GetBlobSize(con, "TestSetBunkSize") + assert.Nil(t, err) + assert.Equal(t, size, int64(0)) + + _, err = SetBunkSize(con, "TestSetBunkSize", 123) + assert.Nil(t, err) + + size1, err := GetBlobSize(con, "TestSetBunkSize") + assert.Nil(t, err) + assert.Equal(t, size1, int64(123)) +} + +func TestGetProjectID(t *testing.T) { + name := "project_for_TestGetProjectID" + project := models.Project{ + OwnerID: 1, + Name: name, + } + + id, err := dao.AddProject(project) + if err != nil { + t.Fatalf("failed to add project: %v", err) + } + + idget, err := GetProjectID(name) + assert.Nil(t, err) + assert.Equal(t, id, idget) +} + +func getRedisHost() string { + redisHost := os.Getenv(testingRedisHost) + if redisHost == "" { + redisHost = "127.0.0.1" // for local test + } + + return redisHost +}