Merge pull request #8288 from wy65701436/mf-workflow-base

Add quota workflow for manifest
This commit is contained in:
Wang Yan 2019-07-17 11:25:30 +08:00 committed by GitHub
commit 31c3413ddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 441 additions and 98 deletions

View File

@ -15,6 +15,7 @@
package dao package dao
import ( import (
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
"strings" "strings"
"time" "time"
@ -34,6 +35,12 @@ func AddArtifact(af *models.Artifact) (int64, error) {
return id, nil return id, nil
} }
// UpdateArtifactDigest ...
func UpdateArtifactDigest(af *models.Artifact) error {
_, err := GetOrmer().Update(af, "digest")
return err
}
// DeleteArtifact ... // DeleteArtifact ...
func DeleteArtifact(id int64) error { func DeleteArtifact(id int64) error {
_, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete() _, err := GetOrmer().QueryTable(&models.Artifact{}).Filter("ID", id).Delete()
@ -58,3 +65,34 @@ func DeleteByTag(projectID int, repo, tag string) error {
} }
return nil return nil
} }
// ListArtifacts list artifacts according to the query conditions
func ListArtifacts(query *models.ArtifactQuery) ([]*models.Artifact, error) {
qs := getArtifactQuerySetter(query)
if query.Size > 0 {
qs = qs.Limit(query.Size)
if query.Page > 0 {
qs = qs.Offset((query.Page - 1) * query.Size)
}
}
afs := []*models.Artifact{}
_, err := qs.All(&afs)
return afs, err
}
func getArtifactQuerySetter(query *models.ArtifactQuery) orm.QuerySeter {
qs := GetOrmer().QueryTable(&models.Artifact{})
if query.PID != 0 {
qs = qs.Filter("PID", query.PID)
}
if len(query.Repo) > 0 {
qs = qs.Filter("Repo", query.Repo)
}
if len(query.Tag) > 0 {
qs = qs.Filter("Tag", query.Tag)
}
if len(query.Digest) > 0 {
qs = qs.Filter("Digest", query.Digest)
}
return qs
}

View File

@ -40,6 +40,24 @@ func TestAddArtifact(t *testing.T) {
} }
func TestUpdateArtifactDigest(t *testing.T) {
af := &models.Artifact{
PID: 1,
Repo: "hello-world",
Tag: "v2.0",
Digest: "4321abcd",
Kind: "image",
}
// add
_, err := AddArtifact(af)
require.Nil(t, err)
af.Digest = "update_4321abcd"
require.Nil(t, UpdateArtifactDigest(af))
assert.Equal(t, af.Digest, "update_4321abcd")
}
func TestDeleteArtifact(t *testing.T) { func TestDeleteArtifact(t *testing.T) {
af := &models.Artifact{ af := &models.Artifact{
PID: 1, PID: 1,
@ -90,3 +108,24 @@ func TestDeleteArtifactByTag(t *testing.T) {
err = DeleteByTag(1, "hello-world", "v1.2") err = DeleteByTag(1, "hello-world", "v1.2")
require.Nil(t, err) require.Nil(t, err)
} }
func TestListArtifacts(t *testing.T) {
af := &models.Artifact{
PID: 1,
Repo: "hello-world",
Tag: "v3.0",
Digest: "TestListArtifacts",
Kind: "image",
}
// add
_, err := AddArtifact(af)
require.Nil(t, err)
afs, err := ListArtifacts(&models.ArtifactQuery{
PID: 1,
Repo: "hello-world",
Tag: "v3.0",
})
require.Nil(t, err)
assert.Equal(t, 1, len(afs))
}

View File

@ -21,3 +21,12 @@ type Artifact struct {
func (af *Artifact) TableName() string { func (af *Artifact) TableName() string {
return "artifact" return "artifact"
} }
// ArtifactQuery ...
type ArtifactQuery struct {
PID int64
Repo string
Tag string
Digest string
Pagination
}

View File

@ -18,10 +18,10 @@ import (
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/middlewares/blobquota" "github.com/goharbor/harbor/src/core/middlewares/blobquota"
"github.com/goharbor/harbor/src/core/middlewares/contenttrust" "github.com/goharbor/harbor/src/core/middlewares/contenttrust"
"github.com/goharbor/harbor/src/core/middlewares/countquota"
"github.com/goharbor/harbor/src/core/middlewares/listrepo" "github.com/goharbor/harbor/src/core/middlewares/listrepo"
"github.com/goharbor/harbor/src/core/middlewares/multiplmanifest" "github.com/goharbor/harbor/src/core/middlewares/multiplmanifest"
"github.com/goharbor/harbor/src/core/middlewares/readonly" "github.com/goharbor/harbor/src/core/middlewares/readonly"
"github.com/goharbor/harbor/src/core/middlewares/regquota"
"github.com/goharbor/harbor/src/core/middlewares/url" "github.com/goharbor/harbor/src/core/middlewares/url"
"github.com/goharbor/harbor/src/core/middlewares/vulnerable" "github.com/goharbor/harbor/src/core/middlewares/vulnerable"
"github.com/justinas/alice" "github.com/justinas/alice"
@ -65,7 +65,7 @@ func (b *DefaultCreator) geMiddleware(mName string) alice.Constructor {
LISTREPO: func(next http.Handler) http.Handler { return listrepo.New(next) }, LISTREPO: func(next http.Handler) http.Handler { return listrepo.New(next) },
CONTENTTRUST: func(next http.Handler) http.Handler { return contenttrust.New(next) }, CONTENTTRUST: func(next http.Handler) http.Handler { return contenttrust.New(next) },
VULNERABLE: func(next http.Handler) http.Handler { return vulnerable.New(next) }, VULNERABLE: func(next http.Handler) http.Handler { return vulnerable.New(next) },
REGQUOTA: func(next http.Handler) http.Handler { return regquota.New(next) }, COUNTQUOTA: func(next http.Handler) http.Handler { return countquota.New(next) },
BLOBQUOTA: func(next http.Handler) http.Handler { return blobquota.New(next) }, BLOBQUOTA: func(next http.Handler) http.Handler { return blobquota.New(next) },
} }
return middlewares[mName] return middlewares[mName]

View File

@ -22,9 +22,9 @@ const (
LISTREPO = "listrepo" LISTREPO = "listrepo"
CONTENTTRUST = "contenttrust" CONTENTTRUST = "contenttrust"
VULNERABLE = "vulnerable" VULNERABLE = "vulnerable"
REGQUOTA = "regquota" COUNTQUOTA = "countquota"
BLOBQUOTA = "blobquota" BLOBQUOTA = "blobquota"
) )
// Middlewares with sequential organization // Middlewares with sequential organization
var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, BLOBQUOTA, REGQUOTA} var Middlewares = []string{READONLY, URL, MUITIPLEMANIFEST, LISTREPO, CONTENTTRUST, VULNERABLE, BLOBQUOTA, COUNTQUOTA}

View File

@ -0,0 +1,218 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package countquota
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/garyburd/redigo/redis"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
common_util "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
common_redis "github.com/goharbor/harbor/src/common/utils/redis"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/middlewares/util"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
)
const (
dialConnectionTimeout = 30 * time.Second
dialReadTimeout = time.Minute + 10*time.Second
dialWriteTimeout = 10 * time.Second
)
// ErrRequireQuota ...
var ErrRequireQuota = errors.New("cannot get quota on project for request")
type countQuotaHandler struct {
next http.Handler
mfInfo *util.MfInfo
}
// New ...
func New(next http.Handler) http.Handler {
return &countQuotaHandler{
next: next,
}
}
// ServeHTTP manifest ...
func (cqh *countQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
match, repository, tag := util.MatchPushManifest(req)
if match {
mfInfo := &util.MfInfo{
Repository: repository,
Tag: tag,
}
cqh.mfInfo = mfInfo
mediaType := req.Header.Get("Content-Type")
if mediaType == schema1.MediaTypeManifest ||
mediaType == schema1.MediaTypeSignedManifest ||
mediaType == schema2.MediaTypeManifest {
tagLock, err := cqh.tryLockTag()
if err != nil {
log.Warningf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to lock tag %s:%s with digest %v", repository, tag, err)), http.StatusInternalServerError)
return
}
cqh.mfInfo.TagLock = tagLock
data, err := ioutil.ReadAll(req.Body)
if err != nil {
cqh.tryFreeTag()
log.Warningf("Error occurred when to copy manifest body %v", err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to decode manifest body %v", err)), http.StatusInternalServerError)
return
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(data))
manifest, desc, err := distribution.UnmarshalManifest(mediaType, data)
if err != nil {
cqh.tryFreeTag()
log.Warningf("Error occurred when to Unmarshal Manifest %v", err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to Unmarshal Manifest %v", err)), http.StatusInternalServerError)
return
}
cqh.mfInfo.Refrerence = manifest.References()
cqh.mfInfo.Digest = desc.Digest.String()
projectID, err := cqh.getProjectID(strings.Split(repository, "/")[0])
if err != nil {
log.Warningf("Error occurred when to get project ID %v", err)
return
}
cqh.mfInfo.ProjectID = projectID
imageExist, af, err := cqh.imageExist()
if err != nil {
cqh.tryFreeTag()
log.Warningf("Error occurred when to check Manifest existence by repo and tag name %v", err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to check Manifest existence %v", err)), http.StatusInternalServerError)
return
}
cqh.mfInfo.Exist = imageExist
if imageExist {
if af.Digest != cqh.mfInfo.Digest {
cqh.mfInfo.DigestChanged = true
}
} else {
quotaRes := &quota.ResourceList{
quota.ResourceCount: 1,
}
err := cqh.tryRequireQuota(quotaRes)
if err != nil {
cqh.tryFreeTag()
log.Errorf("Cannot get quota for the manifest %v", err)
if err == ErrRequireQuota {
http.Error(rw, util.MarshalError("StatusNotAcceptable", fmt.Sprintf("Cannot get quota for the manifest %v", err)), http.StatusNotAcceptable)
return
}
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to require quota for the manifest %v", err)), http.StatusInternalServerError)
return
}
cqh.mfInfo.Quota = quotaRes
}
*req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, mfInfo)))
}
}
cqh.next.ServeHTTP(rw, req)
}
// tryLockTag locks tag with redis ...
func (cqh *countQuotaHandler) tryLockTag() (*common_redis.Mutex, error) {
con, err := redis.DialURL(
config.GetRedisOfRegURL(),
redis.DialConnectTimeout(dialConnectionTimeout),
redis.DialReadTimeout(dialReadTimeout),
redis.DialWriteTimeout(dialWriteTimeout),
)
if err != nil {
return nil, err
}
tagLock := common_redis.New(con, cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag, common_util.GenerateRandomString())
success, err := tagLock.Require()
if err != nil {
return nil, err
}
if !success {
return nil, fmt.Errorf("unable to lock tag: %s ", cqh.mfInfo.Repository+":"+cqh.mfInfo.Tag)
}
return tagLock, nil
}
func (cqh *countQuotaHandler) tryFreeTag() {
_, err := cqh.mfInfo.TagLock.Free()
if err != nil {
log.Warningf("Error to unlock tag: %s, with error: %v ", cqh.mfInfo.Tag, err)
}
}
// check the existence of a artifact, if exist, the method will return the artifact model
func (cqh *countQuotaHandler) imageExist() (exist bool, af *models.Artifact, err error) {
artifactQuery := &models.ArtifactQuery{
PID: cqh.mfInfo.ProjectID,
Repo: cqh.mfInfo.Repository,
Tag: cqh.mfInfo.Tag,
}
afs, err := dao.ListArtifacts(artifactQuery)
if err != nil {
log.Errorf("Error occurred when to get project ID %v", err)
return false, nil, err
}
if len(afs) > 0 {
return true, afs[0], nil
}
return false, nil, nil
}
func (cqh *countQuotaHandler) tryRequireQuota(quotaRes *quota.ResourceList) error {
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(cqh.mfInfo.ProjectID, 10))
if err != nil {
log.Errorf("Error occurred when to new quota manager %v", err)
return err
}
if err := quotaMgr.AddResources(*quotaRes); err != nil {
log.Errorf("Cannot get quota for the manifest %v", err)
return ErrRequireQuota
}
return nil
}
func (cqh *countQuotaHandler) getProjectID(name string) (int64, error) {
project, err := dao.GetProjectByName(name)
if err != nil {
return 0, err
}
if project != nil {
return project.ProjectID, nil
}
return 0, fmt.Errorf("project %s is not found", name)
}

View File

@ -15,13 +15,19 @@
package registryproxy package registryproxy
import ( import (
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/middlewares/util" "github.com/goharbor/harbor/src/core/middlewares/util"
"github.com/pkg/errors"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"strconv"
"strings" "strings"
"time"
) )
type proxyHandler struct { type proxyHandler struct {
@ -81,27 +87,14 @@ func director(target *url.URL, req *http.Request) {
// Modify the http response // Modify the http response
func modifyResponse(res *http.Response) error { func modifyResponse(res *http.Response) error {
matchMF, _, _ := util.MatchPushManifest(res.Request)
if res.Request.Method == http.MethodPut { if matchMF {
// PUT manifest return handlerPutManifest(res)
matchMF, _, _ := util.MatchManifestURL(res.Request) }
if matchMF { matchBB, _ := util.MatchPutBlobURL(res.Request)
if res.StatusCode == http.StatusCreated { if matchBB {
log.Infof("we need to insert data here ... ") return handlerPutBlob(res)
} else if res.StatusCode >= 202 || res.StatusCode <= 511 {
log.Infof("we need to roll back data here ... ")
}
}
// PUT blob
matchBB, _ := util.MatchPutBlobURL(res.Request)
if matchBB {
if res.StatusCode != http.StatusCreated {
log.Infof("we need to rollback DB and unlock digest ... ")
}
}
} }
return nil return nil
} }
@ -117,6 +110,99 @@ func singleJoiningSlash(a, b string) string {
return a + b return a + b
} }
func handlerPutManifest(res *http.Response) error {
mfInfo := res.Request.Context().Value(util.MFInfokKey)
mf, ok := mfInfo.(*util.MfInfo)
if !ok {
return errors.New("failed to convert manifest information context into MfInfo")
}
defer func() {
_, err := mf.TagLock.Free()
if err != nil {
log.Errorf("Error to unlock in response handler, %v", err)
}
if err := mf.TagLock.Conn.Close(); err != nil {
log.Errorf("Error to close redis connection in response handler, %v", err)
}
}()
// 201
if res.StatusCode == http.StatusCreated {
af := &models.Artifact{
PID: mf.ProjectID,
Repo: mf.Repository,
Tag: mf.Tag,
Digest: mf.Digest,
PushTime: time.Now(),
Kind: "Docker-Image",
}
// insert or update
if !mf.Exist {
_, err := dao.AddArtifact(af)
if err != nil {
log.Errorf("Error to add artifact, %v", err)
return err
}
}
if mf.DigestChanged {
err := dao.UpdateArtifactDigest(af)
if err != nil {
log.Errorf("Error to add artifact, %v", err)
return err
}
}
if !mf.Exist || mf.DigestChanged {
afnbs := []*models.ArtifactAndBlob{}
for _, d := range mf.Refrerence {
afnb := &models.ArtifactAndBlob{
DigestAF: mf.Digest,
DigestBlob: d.Digest.String(),
}
afnbs = append(afnbs, afnb)
}
if err := dao.AddArtifactNBlobs(afnbs); err != nil {
log.Errorf("Error to add artifact and blobs in proxy response handler, %v", err)
return err
}
}
} else if res.StatusCode >= 300 || res.StatusCode <= 511 {
if !mf.Exist {
success := subtractResources(mf)
if !success {
return errors.New("Error to release resource booked for the manifest")
}
}
}
return nil
}
func handlerPutBlob(res *http.Response) error {
if res.StatusCode != http.StatusCreated {
log.Infof("we need to rollback DB and unlock digest ... ")
}
return nil
}
// used to release resource for failure case
func subtractResources(mfInfo *util.MfInfo) bool {
quotaMgr, err := quota.NewManager("project", strconv.FormatInt(mfInfo.ProjectID, 10))
if err != nil {
log.Errorf("Error occurred when to new quota manager %v", err)
return false
}
if err := quotaMgr.SubtractResources(*mfInfo.Quota); err != nil {
log.Errorf("Cannot get quota for the manifest %v", err)
return false
}
return true
}
// ServeHTTP ... // ServeHTTP ...
func (ph proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func (ph proxyHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
ph.handler.ServeHTTP(rw, req) ph.handler.ServeHTTP(rw, req)

View File

@ -1,74 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package regquota
import (
"bytes"
"fmt"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/middlewares/util"
"io/ioutil"
"net/http"
)
type regQuotaHandler struct {
next http.Handler
}
// New ...
func New(next http.Handler) http.Handler {
return &regQuotaHandler{
next: next,
}
}
// ServeHTTP PATCH manifest ...
func (rqh regQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
match, _, _ := util.MatchManifestURL(req)
if match {
var mfSize int64
var mfDigest string
mediaType := req.Header.Get("Content-Type")
if req.Method == http.MethodPut {
if mediaType == schema1.MediaTypeManifest ||
mediaType == schema1.MediaTypeSignedManifest ||
mediaType == schema2.MediaTypeManifest {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Warningf("Error occurred when to copy manifest body %v", err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to decode manifest body %v", err)), http.StatusInternalServerError)
return
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(data))
_, desc, err := distribution.UnmarshalManifest(mediaType, data)
if err != nil {
log.Warningf("Error occurred when to Unmarshal Manifest %v", err)
http.Error(rw, util.MarshalError("InternalServerError", fmt.Sprintf("Error occurred when to Unmarshal Manifest %v", err)), http.StatusInternalServerError)
return
}
mfDigest = desc.Digest.String()
mfSize = desc.Size
log.Infof("manifest digest... %s", mfDigest)
log.Infof("manifest size... %v", mfSize)
}
}
}
rqh.next.ServeHTTP(rw, req)
}

View File

@ -16,9 +16,12 @@ package util
import ( import (
"encoding/json" "encoding/json"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/common/utils/clair" "github.com/goharbor/harbor/src/common/utils/clair"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
common_redis "github.com/goharbor/harbor/src/common/utils/redis"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr" "github.com/goharbor/harbor/src/core/promgr"
"github.com/goharbor/harbor/src/pkg/scan/whitelist" "github.com/goharbor/harbor/src/pkg/scan/whitelist"
@ -38,6 +41,8 @@ const (
// TokenUsername ... // TokenUsername ...
// TODO: temp solution, remove after vmware/harbor#2242 is resolved. // TODO: temp solution, remove after vmware/harbor#2242 is resolved.
TokenUsername = "harbor-core" TokenUsername = "harbor-core"
// MFInfokKey the context key for image tag redis lock
MFInfokKey = contextKey("ManifestLock")
) )
// ImageInfo ... // ImageInfo ...
@ -48,6 +53,28 @@ type ImageInfo struct {
Digest string Digest string
} }
// MfInfo ...
type MfInfo struct {
// basic information of a manifest
ProjectID int64
Repository string
Tag string
Digest string
// Exist is to index the existing of the manifest in DB. If false, it's an new image for uploading.
Exist bool
// DigestChanged true means the manifest exists but digest is changed.
// Probably it's a new image with existing repo/tag name or overwrite.
DigestChanged bool
// used to block multiple push on same image.
TagLock *common_redis.Mutex
Refrerence []distribution.Descriptor
// Quota is the resource applied for the manifest upload request.
Quota *quota.ResourceList
}
// JSONError wraps a concrete Code and Message, it's readable for docker deamon. // JSONError wraps a concrete Code and Message, it's readable for docker deamon.
type JSONError struct { type JSONError struct {
Code string `json:"code,omitempty"` Code string `json:"code,omitempty"`