From 8ac6bdbbb01985dafcd453b4b3896477916be5dc Mon Sep 17 00:00:00 2001 From: Wang Yan Date: Thu, 11 Jul 2019 20:26:52 +0800 Subject: [PATCH] Add quota workflow for quota 1, apply count for manifest if it's a new image 2, insert data for artifact and artifact_blob Signed-off-by: wang yan --- src/common/dao/artifact.go | 38 +++ src/common/dao/artifact_test.go | 39 ++++ src/common/models/artifact.go | 9 + src/core/middlewares/chain.go | 4 +- src/core/middlewares/config.go | 4 +- src/core/middlewares/countquota/handler.go | 218 ++++++++++++++++++ src/core/middlewares/registryproxy/handler.go | 126 ++++++++-- src/core/middlewares/regquota/handler.go | 74 ------ src/core/middlewares/util/util.go | 27 +++ 9 files changed, 441 insertions(+), 98 deletions(-) create mode 100644 src/core/middlewares/countquota/handler.go delete mode 100644 src/core/middlewares/regquota/handler.go diff --git a/src/common/dao/artifact.go b/src/common/dao/artifact.go index d8ba7d86c..ac2b067ee 100644 --- a/src/common/dao/artifact.go +++ b/src/common/dao/artifact.go @@ -15,6 +15,7 @@ package dao import ( + "github.com/astaxie/beego/orm" "github.com/goharbor/harbor/src/common/models" "strings" "time" @@ -34,6 +35,12 @@ func AddArtifact(af *models.Artifact) (int64, error) { return id, nil } +// UpdateArtifactDigest ... +func UpdateArtifactDigest(af *models.Artifact) error { + _, err := GetOrmer().Update(af, "digest") + return err +} + // DeleteArtifact ... func DeleteArtifact(id int64) error { _, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete() @@ -58,3 +65,34 @@ func DeleteByTag(projectID int, repo, tag string) error { } return nil } + +// ListArtifacts list artifacts according to the query conditions +func ListArtifacts(query *models.ArtifactQuery) ([]*models.Artifact, error) { + qs := getArtifactQuerySetter(query) + if query.Size > 0 { + qs = qs.Limit(query.Size) + if query.Page > 0 { + qs = qs.Offset((query.Page - 1) * query.Size) + } + } + afs := []*models.Artifact{} + _, err := qs.All(&afs) + return afs, err +} + +func getArtifactQuerySetter(query *models.ArtifactQuery) orm.QuerySeter { + qs := GetOrmer().QueryTable(&models.Artifact{}) + if query.PID != 0 { + qs = qs.Filter("PID", query.PID) + } + if len(query.Repo) > 0 { + qs = qs.Filter("Repo", query.Repo) + } + if len(query.Tag) > 0 { + qs = qs.Filter("Tag", query.Tag) + } + if len(query.Digest) > 0 { + qs = qs.Filter("Digest", query.Digest) + } + return qs +} diff --git a/src/common/dao/artifact_test.go b/src/common/dao/artifact_test.go index 5c9405087..1869f2982 100644 --- a/src/common/dao/artifact_test.go +++ b/src/common/dao/artifact_test.go @@ -40,6 +40,24 @@ func TestAddArtifact(t *testing.T) { } +func TestUpdateArtifactDigest(t *testing.T) { + af := &models.Artifact{ + PID: 1, + Repo: "hello-world", + Tag: "v2.0", + Digest: "4321abcd", + Kind: "image", + } + + // add + _, err := AddArtifact(af) + require.Nil(t, err) + + af.Digest = "update_4321abcd" + require.Nil(t, UpdateArtifactDigest(af)) + assert.Equal(t, af.Digest, "update_4321abcd") +} + func TestDeleteArtifact(t *testing.T) { af := &models.Artifact{ PID: 1, @@ -90,3 +108,24 @@ func TestDeleteArtifactByTag(t *testing.T) { err = DeleteByTag(1, "hello-world", "v1.2") require.Nil(t, err) } + +func TestListArtifacts(t *testing.T) { + af := &models.Artifact{ + PID: 1, + Repo: "hello-world", + Tag: "v3.0", + Digest: "TestListArtifacts", + Kind: "image", + } + // add + _, err := AddArtifact(af) + require.Nil(t, err) + + afs, err := ListArtifacts(&models.ArtifactQuery{ + PID: 1, + Repo: "hello-world", + Tag: "v3.0", + }) + require.Nil(t, err) + assert.Equal(t, 1, len(afs)) +} diff --git a/src/common/models/artifact.go b/src/common/models/artifact.go index 63abd9760..fa6760702 100644 --- a/src/common/models/artifact.go +++ b/src/common/models/artifact.go @@ -21,3 +21,12 @@ type Artifact struct { func (af *Artifact) TableName() string { return "artifact" } + +// ArtifactQuery ... +type ArtifactQuery struct { + PID int64 + Repo string + Tag string + Digest string + Pagination +} diff --git a/src/core/middlewares/chain.go b/src/core/middlewares/chain.go index 4ea46e41e..4141022ec 100644 --- a/src/core/middlewares/chain.go +++ b/src/core/middlewares/chain.go @@ -18,10 +18,10 @@ import ( "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/middlewares/blobquota" "github.com/goharbor/harbor/src/core/middlewares/contenttrust" + "github.com/goharbor/harbor/src/core/middlewares/countquota" "github.com/goharbor/harbor/src/core/middlewares/listrepo" "github.com/goharbor/harbor/src/core/middlewares/multiplmanifest" "github.com/goharbor/harbor/src/core/middlewares/readonly" - "github.com/goharbor/harbor/src/core/middlewares/regquota" "github.com/goharbor/harbor/src/core/middlewares/url" "github.com/goharbor/harbor/src/core/middlewares/vulnerable" "github.com/justinas/alice" @@ -65,7 +65,7 @@ func (b *DefaultCreator) geMiddleware(mName string) alice.Constructor { LISTREPO: func(next http.Handler) http.Handler { return listrepo.New(next) }, CONTENTTRUST: func(next http.Handler) http.Handler { return contenttrust.New(next) }, VULNERABLE: func(next http.Handler) http.Handler { return vulnerable.New(next) }, - REGQUOTA: func(next http.Handler) http.Handler { return regquota.New(next) }, + COUNTQUOTA: func(next http.Handler) http.Handler { return countquota.New(next) }, BLOBQUOTA: func(next http.Handler) http.Handler { return blobquota.New(next) }, } return middlewares[mName] diff --git a/src/core/middlewares/config.go b/src/core/middlewares/config.go index 633d97f0e..8560c5225 100644 --- a/src/core/middlewares/config.go +++ b/src/core/middlewares/config.go @@ -22,9 +22,9 @@ const ( LISTREPO = "listrepo" CONTENTTRUST = "contenttrust" VULNERABLE = "vulnerable" - REGQUOTA = "regquota" + COUNTQUOTA = "countquota" BLOBQUOTA = "blobquota" ) // Middlewares with sequential organization -var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, BLOBQUOTA, REGQUOTA} +var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, BLOBQUOTA, COUNTQUOTA} diff --git a/src/core/middlewares/countquota/handler.go b/src/core/middlewares/countquota/handler.go new file mode 100644 index 000000000..b9e5a7546 --- /dev/null +++ b/src/core/middlewares/countquota/handler.go @@ -0,0 +1,218 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package countquota + +import ( + "bytes" + "context" + "errors" + "fmt" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/garyburd/redigo/redis" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" + common_util "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/common/utils/log" + common_redis "github.com/goharbor/harbor/src/common/utils/redis" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/core/middlewares/util" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" +) + +const ( + dialConnectionTimeout = 30 * time.Second + dialReadTimeout = time.Minute + 10*time.Second + dialWriteTimeout = 10 * time.Second +) + +// ErrRequireQuota ... +var ErrRequireQuota = errors.New("cannot get quota on project for request") + +type countQuotaHandler struct { + next http.Handler + mfInfo *util.MfInfo +} + +// New ... +func New(next http.Handler) http.Handler { + return &countQuotaHandler{ + next: next, + } +} + +// ServeHTTP manifest ... +func (cqh *countQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + match, repository, tag := util.MatchPushManifest(req) + if match { + mfInfo := &util.MfInfo{ + Repository: repository, + Tag: tag, + } + cqh.mfInfo = mfInfo + + mediaType := req.Header.Get("Content-Type") + if mediaType == schema1.MediaTypeManifest || + mediaType == schema1.MediaTypeSignedManifest || + mediaType == schema2.MediaTypeManifest { + + tagLock, err := cqh.tryLockTag() + if err != nil { + log.Warningf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err) + http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err)), http.StatusInternalServerError) + return + } + cqh.mfInfo.TagLock = tagLock + + data, err := ioutil.ReadAll(req.Body) + if err != nil { + cqh.tryFreeTag() + log.Warningf("Error occurred when to copy manifest body %v", err) + http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to decode manifest body %v", err)), http.StatusInternalServerError) + return + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(data)) + + manifest, desc, err := distribution.UnmarshalManifest(mediaType, data) + if err != nil { + cqh.tryFreeTag() + log.Warningf("Error occurred when to Unmarshal Manifest %v", err) + http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to Unmarshal Manifest %v", err)), http.StatusInternalServerError) + return + } + cqh.mfInfo.Refrerence = manifest.References() + cqh.mfInfo.Digest = desc.Digest.String() + + projectID, err := cqh.getProjectID(strings.Split(repository, "/")[0]) + if err != nil { + log.Warningf("Error occurred when to get project ID %v", err) + return + } + cqh.mfInfo.ProjectID = projectID + + imageExist, af, err := cqh.imageExist() + if err != nil { + cqh.tryFreeTag() + log.Warningf("Error occurred when to check Manifest existence by repo and tag name %v", err) + http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to check Manifest existence %v", err)), http.StatusInternalServerError) + return + } + cqh.mfInfo.Exist = imageExist + if imageExist { + if af.Digest != cqh.mfInfo.Digest { + cqh.mfInfo.DigestChanged = true + } + } else { + quotaRes := "a.ResourceList{ + quota.ResourceCount: 1, + } + err := cqh.tryRequireQuota(quotaRes) + if err != nil { + cqh.tryFreeTag() + log.Errorf("Cannot get quota for the manifest %v", err) + if err == ErrRequireQuota { + http.Error(rw, util.MarshalError("StatusNotAcceptable", fmt.Sprintf("Cannot get quota for the manifest %v", err)), http.StatusNotAcceptable) + return + } + http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to require quota for the manifest %v", err)), http.StatusInternalServerError) + return + } + cqh.mfInfo.Quota = quotaRes + } + + *req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, mfInfo))) + } + + } + + cqh.next.ServeHTTP(rw, req) +} + +// tryLockTag locks tag with redis ... +func (cqh *countQuotaHandler) tryLockTag() (*common_redis.Mutex, error) { + con, err := redis.DialURL( + config.GetRedisOfRegURL(), + redis.DialConnectTimeout(dialConnectionTimeout), + redis.DialReadTimeout(dialReadTimeout), + redis.DialWriteTimeout(dialWriteTimeout), + ) + if err != nil { + return nil, err + } + tagLock := common_redis.New(con, cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag, common_util.GenerateRandomString()) + success, err := tagLock.Require() + if err != nil { + return nil, err + } + if !success { + return nil, fmt.Errorf("unable to lock tag: %s ", cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag) + } + return tagLock, nil +} + +func (cqh *countQuotaHandler) tryFreeTag() { + _, err := cqh.mfInfo.TagLock.Free() + if err != nil { + log.Warningf("Error to unlock tag: %s, with error: %v ", cqh.mfInfo.Tag, err) + } +} + +// check the existence of a artifact, if exist, the method will return the artifact model +func (cqh *countQuotaHandler) imageExist() (exist bool, af *models.Artifact, err error) { + artifactQuery := &models.ArtifactQuery{ + PID: cqh.mfInfo.ProjectID, + Repo: cqh.mfInfo.Repository, + Tag: cqh.mfInfo.Tag, + } + afs, err := dao.ListArtifacts(artifactQuery) + if err != nil { + log.Errorf("Error occurred when to get project ID %v", err) + return false, nil, err + } + if len(afs) > 0 { + return true, afs[0], nil + } + return false, nil, nil +} + +func (cqh *countQuotaHandler) tryRequireQuota(quotaRes *quota.ResourceList) error { + quotaMgr, err := quota.NewManager("project", strconv.FormatInt(cqh.mfInfo.ProjectID, 10)) + if err != nil { + log.Errorf("Error occurred when to new quota manager %v", err) + return err + } + if err := quotaMgr.AddResources(*quotaRes); err != nil { + log.Errorf("Cannot get quota for the manifest %v", err) + return ErrRequireQuota + } + return nil +} + +func (cqh *countQuotaHandler) getProjectID(name string) (int64, error) { + project, err := dao.GetProjectByName(name) + if err != nil { + return 0, err + } + if project != nil { + return project.ProjectID, nil + } + return 0, fmt.Errorf("project %s is not found", name) +} diff --git a/src/core/middlewares/registryproxy/handler.go b/src/core/middlewares/registryproxy/handler.go index 2558b9717..71b1ee479 100644 --- a/src/core/middlewares/registryproxy/handler.go +++ b/src/core/middlewares/registryproxy/handler.go @@ -15,13 +15,19 @@ package registryproxy import ( + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/middlewares/util" + "github.com/pkg/errors" "net/http" "net/http/httputil" "net/url" + "strconv" "strings" + "time" ) type proxyHandler struct { @@ -81,27 +87,14 @@ func director(target *url.URL, req *http.Request) { // Modify the http response func modifyResponse(res *http.Response) error { - - if res.Request.Method == http.MethodPut { - // PUT manifest - matchMF, _, _ := util.MatchManifestURL(res.Request) - if matchMF { - if res.StatusCode == http.StatusCreated { - log.Infof("we need to insert data here ... ") - } else if res.StatusCode >= 202 || res.StatusCode <= 511 { - log.Infof("we need to roll back data here ... ") - } - } - - // PUT blob - matchBB, _ := util.MatchPutBlobURL(res.Request) - if matchBB { - if res.StatusCode != http.StatusCreated { - log.Infof("we need to rollback DB and unlock digest ... ") - } - } + matchMF, _, _ := util.MatchPushManifest(res.Request) + if matchMF { + return handlerPutManifest(res) + } + matchBB, _ := util.MatchPutBlobURL(res.Request) + if matchBB { + return handlerPutBlob(res) } - return nil } @@ -117,6 +110,99 @@ func singleJoiningSlash(a, b string) string { return a + b } +func handlerPutManifest(res *http.Response) error { + mfInfo := res.Request.Context().Value(util.MFInfokKey) + mf, ok := mfInfo.(*util.MfInfo) + if !ok { + return errors.New("failed to convert manifest information context into MfInfo") + } + + defer func() { + _, err := mf.TagLock.Free() + if err != nil { + log.Errorf("Error to unlock in response handler, %v", err) + } + if err := mf.TagLock.Conn.Close(); err != nil { + log.Errorf("Error to close redis connection in response handler, %v", err) + } + }() + + // 201 + if res.StatusCode == http.StatusCreated { + af := &models.Artifact{ + PID: mf.ProjectID, + Repo: mf.Repository, + Tag: mf.Tag, + Digest: mf.Digest, + PushTime: time.Now(), + Kind: "Docker-Image", + } + + // insert or update + if !mf.Exist { + _, err := dao.AddArtifact(af) + if err != nil { + log.Errorf("Error to add artifact, %v", err) + return err + } + } + if mf.DigestChanged { + err := dao.UpdateArtifactDigest(af) + if err != nil { + log.Errorf("Error to add artifact, %v", err) + return err + } + } + + if !mf.Exist || mf.DigestChanged { + afnbs := []*models.ArtifactAndBlob{} + for _, d := range mf.Refrerence { + afnb := &models.ArtifactAndBlob{ + DigestAF: mf.Digest, + DigestBlob: d.Digest.String(), + } + afnbs = append(afnbs, afnb) + } + if err := dao.AddArtifactNBlobs(afnbs); err != nil { + log.Errorf("Error to add artifact and blobs in proxy response handler, %v", err) + return err + } + } + + } else if res.StatusCode >= 300 || res.StatusCode <= 511 { + if !mf.Exist { + success := subtractResources(mf) + if !success { + return errors.New("Error to release resource booked for the manifest") + } + } + } + + return nil +} + +func handlerPutBlob(res *http.Response) error { + if res.StatusCode != http.StatusCreated { + log.Infof("we need to rollback DB and unlock digest ... ") + } + return nil +} + +// used to release resource for failure case +func subtractResources(mfInfo *util.MfInfo) bool { + quotaMgr, err := quota.NewManager("project", strconv.FormatInt(mfInfo.ProjectID, 10)) + if err != nil { + log.Errorf("Error occurred when to new quota manager %v", err) + return false + } + + if err := quotaMgr.SubtractResources(*mfInfo.Quota); err != nil { + log.Errorf("Cannot get quota for the manifest %v", err) + return false + } + return true +} + // ServeHTTP ... func (ph proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { ph.handler.ServeHTTP(rw, req) diff --git a/src/core/middlewares/regquota/handler.go b/src/core/middlewares/regquota/handler.go deleted file mode 100644 index 465e99b9b..000000000 --- a/src/core/middlewares/regquota/handler.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package regquota - -import ( - "bytes" - "fmt" - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/schema1" - "github.com/docker/distribution/manifest/schema2" - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/core/middlewares/util" - "io/ioutil" - "net/http" -) - -type regQuotaHandler struct { - next http.Handler -} - -// New ... -func New(next http.Handler) http.Handler { - return ®QuotaHandler{ - next: next, - } -} - -// ServeHTTP PATCH manifest ... -func (rqh regQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - match, _, _ := util.MatchManifestURL(req) - if match { - var mfSize int64 - var mfDigest string - mediaType := req.Header.Get("Content-Type") - if req.Method == http.MethodPut { - if mediaType == schema1.MediaTypeManifest || - mediaType == schema1.MediaTypeSignedManifest || - mediaType == schema2.MediaTypeManifest { - data, err := ioutil.ReadAll(req.Body) - if err != nil { - log.Warningf("Error occurred when to copy manifest body %v", err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to decode manifest body %v", err)), http.StatusInternalServerError) - return - } - req.Body = ioutil.NopCloser(bytes.NewBuffer(data)) - - _, desc, err := distribution.UnmarshalManifest(mediaType, data) - if err != nil { - log.Warningf("Error occurred when to Unmarshal Manifest %v", err) - http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to Unmarshal Manifest %v", err)), http.StatusInternalServerError) - return - } - mfDigest = desc.Digest.String() - mfSize = desc.Size - log.Infof("manifest digest... %s", mfDigest) - log.Infof("manifest size... %v", mfSize) - } - } - } - - rqh.next.ServeHTTP(rw, req) -} diff --git a/src/core/middlewares/util/util.go b/src/core/middlewares/util/util.go index 296caeede..6cf95ce9e 100644 --- a/src/core/middlewares/util/util.go +++ b/src/core/middlewares/util/util.go @@ -16,9 +16,12 @@ package util import ( "encoding/json" + "github.com/docker/distribution" "github.com/goharbor/harbor/src/common/models" + "github.com/goharbor/harbor/src/common/quota" "github.com/goharbor/harbor/src/common/utils/clair" "github.com/goharbor/harbor/src/common/utils/log" + common_redis "github.com/goharbor/harbor/src/common/utils/redis" "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/promgr" "github.com/goharbor/harbor/src/pkg/scan/whitelist" @@ -38,6 +41,8 @@ const ( // TokenUsername ... // TODO: temp solution, remove after vmware/harbor#2242 is resolved. TokenUsername = "harbor-core" + // MFInfokKey the context key for image tag redis lock + MFInfokKey = contextKey("ManifestLock") ) // ImageInfo ... @@ -48,6 +53,28 @@ type ImageInfo struct { Digest string } +// MfInfo ... +type MfInfo struct { + // basic information of a manifest + ProjectID int64 + Repository string + Tag string + Digest string + + // Exist is to index the existing of the manifest in DB. If false, it's an new image for uploading. + Exist bool + // DigestChanged true means the manifest exists but digest is changed. + // Probably it's a new image with existing repo/tag name or overwrite. + DigestChanged bool + + // used to block multiple push on same image. + TagLock *common_redis.Mutex + Refrerence []distribution.Descriptor + + // Quota is the resource applied for the manifest upload request. + Quota *quota.ResourceList +} + // JSONError wraps a concrete Code and Message, it's readable for docker deamon. type JSONError struct { Code string `json:"code,omitempty"`