Merge pull request #8508 from heww/image-count-quota

feat(quota,middleware): image count quota support
This commit is contained in:
Wang Yan 2019-08-02 13:24:36 +08:00 committed by GitHub
commit 7c3858f28c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 724 additions and 252 deletions

View File

@ -15,10 +15,11 @@
package dao
import (
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/models"
"strings"
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/models"
)
// AddArtifact ...
@ -54,16 +55,17 @@ func DeleteArtifact(id int64) error {
}
// DeleteArtifactByDigest ...
func DeleteArtifactByDigest(digest string) error {
_, err := GetOrmer().Raw(`delete from artifact where digest = ? `, digest).Exec()
func DeleteArtifactByDigest(projectID int64, repo, digest string) error {
_, err := GetOrmer().Raw(`delete from artifact where project_id = ? and repo = ? and digest = ? `,
projectID, repo, digest).Exec()
if err != nil {
return err
}
return nil
}
// DeleteByTag ...
func DeleteByTag(projectID int, repo, tag string) error {
// DeleteArtifactByTag ...
func DeleteArtifactByTag(projectID int64, repo, tag string) error {
_, err := GetOrmer().Raw(`delete from artifact where project_id = ? and repo = ? and tag = ? `,
projectID, repo, tag).Exec()
if err != nil {
@ -86,6 +88,18 @@ func ListArtifacts(query *models.ArtifactQuery) ([]*models.Artifact, error) {
return afs, err
}
// GetTotalOfArtifacts returns total of artifacts
func GetTotalOfArtifacts(query ...*models.ArtifactQuery) (int64, error) {
var qs orm.QuerySeter
if len(query) > 0 {
qs = getArtifactQuerySetter(query[0])
} else {
qs = GetOrmer().QueryTable(&models.Artifact{})
}
return qs.Count()
}
func getArtifactQuerySetter(query *models.ArtifactQuery) orm.QuerySeter {
qs := GetOrmer().QueryTable(&models.Artifact{})
if query.PID != 0 {

View File

@ -16,12 +16,11 @@ package dao
import (
"testing"
"time"
"github.com/goharbor/harbor/src/common/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"time"
)
func TestAddArtifact(t *testing.T) {
@ -111,7 +110,7 @@ func TestDeleteArtifactByDigest(t *testing.T) {
require.Nil(t, err)
// delete
err = DeleteArtifactByDigest(af.Digest)
err = DeleteArtifactByDigest(af.PID, af.Repo, af.Digest)
require.Nil(t, err)
}
@ -128,7 +127,7 @@ func TestDeleteArtifactByTag(t *testing.T) {
require.Nil(t, err)
// delete
err = DeleteByTag(1, "hello-world", "v1.2")
err = DeleteArtifactByTag(1, "hello-world", "v1.2")
require.Nil(t, err)
}
@ -152,3 +151,24 @@ func TestListArtifacts(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, 1, len(afs))
}
func TestGetTotalOfArtifacts(t *testing.T) {
af := &models.Artifact{
PID: 2,
Repo: "hello-world",
Tag: "v3.0",
Digest: "TestGetTotalOfArtifacts",
Kind: "image",
}
// add
_, err := AddArtifact(af)
require.Nil(t, err)
total, err := GetTotalOfArtifacts(&models.ArtifactQuery{
PID: 2,
Repo: "hello-world",
Tag: "v3.0",
})
require.Nil(t, err)
assert.Equal(t, int64(1), total)
}

View File

@ -42,7 +42,7 @@ var (
type deleteChartVersionBuilder struct {
}
func (m *deleteChartVersionBuilder) Build(req *http.Request) interceptor.Interceptor {
func (*deleteChartVersionBuilder) Build(req *http.Request) interceptor.Interceptor {
if req.Method != http.MethodDelete {
return nil
}
@ -78,7 +78,7 @@ func (m *deleteChartVersionBuilder) Build(req *http.Request) interceptor.Interce
type uploadChartVersionBuilder struct {
}
func (m *uploadChartVersionBuilder) Build(req *http.Request) interceptor.Interceptor {
func (*uploadChartVersionBuilder) Build(req *http.Request) interceptor.Interceptor {
if req.Method != http.MethodPost {
return nil
}

View File

@ -85,18 +85,18 @@ func chartVersionExists(namespace, chartName, version string) bool {
return !chartVersion.Removed
}
func computeQuotaForUpload(req *http.Request) types.ResourceList {
func computeQuotaForUpload(req *http.Request) (types.ResourceList, error) {
info, ok := util.ChartVersionInfoFromContext(req.Context())
if !ok {
return nil
return nil, errors.New("chart version info missing")
}
if chartVersionExists(info.Namespace, info.ChartName, info.Version) {
log.Debugf("Chart %s with version %s in namespace %s exists", info.ChartName, info.Version, info.Namespace)
return nil
return nil, nil
}
return types.ResourceList{types.ResourceCount: 1}
return types.ResourceList{types.ResourceCount: 1}, nil
}
func mutexKey(str ...string) string {

View File

@ -0,0 +1,195 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package countquota
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/middlewares/interceptor"
"github.com/goharbor/harbor/src/core/middlewares/interceptor/quota"
"github.com/goharbor/harbor/src/core/middlewares/util"
"github.com/opencontainers/go-digest"
)
var (
defaultBuilders = []interceptor.Builder{
&deleteManifestBuilder{},
&putManifestBuilder{},
}
)
type deleteManifestBuilder struct {
}
func (*deleteManifestBuilder) Build(req *http.Request) interceptor.Interceptor {
if req.Method != http.MethodDelete {
return nil
}
match, name, reference := util.MatchManifestURL(req)
if !match {
return nil
}
dgt, err := digest.Parse(reference)
if err != nil {
// Delete manifest only accept digest as reference
return nil
}
projectName := strings.Split(name, "/")[0]
project, err := dao.GetProjectByName(projectName)
if err != nil {
log.Errorf("Failed to get project %s, error: %v", projectName, err)
return nil
}
if project == nil {
log.Warningf("Project %s not found", projectName)
return nil
}
info := &util.MfInfo{
ProjectID: project.ProjectID,
Repository: name,
Digest: dgt.String(),
}
// Manifest info will be used by computeQuotaForUpload
*req = *req.WithContext(util.NewManifestInfoContext(req.Context(), info))
opts := []quota.Option{
quota.WithManager("project", strconv.FormatInt(project.ProjectID, 10)),
quota.WithAction(quota.SubtractAction),
quota.StatusCode(http.StatusAccepted),
quota.MutexKeys(mutexKey(info)),
quota.OnResources(computeQuotaForDelete),
quota.OnFulfilled(func(http.ResponseWriter, *http.Request) error {
return dao.DeleteArtifactByDigest(info.ProjectID, info.Repository, info.Digest)
}),
}
return quota.New(opts...)
}
type putManifestBuilder struct {
}
func (b *putManifestBuilder) Build(req *http.Request) interceptor.Interceptor {
if req.Method != http.MethodPut {
return nil
}
info, ok := util.ManifestInfoFromContext(req.Context())
if !ok {
// assert that manifest info will be set by others
return nil
}
opts := []quota.Option{
quota.WithManager("project", strconv.FormatInt(info.ProjectID, 10)),
quota.WithAction(quota.AddAction),
quota.StatusCode(http.StatusCreated),
quota.MutexKeys(mutexKey(info)),
quota.OnResources(computeQuotaForPut),
quota.OnFulfilled(func(http.ResponseWriter, *http.Request) error {
newManifest, overwriteTag := !info.Exist, info.DigestChanged
if newManifest {
if err := b.doNewManifest(info); err != nil {
log.Errorf("Failed to handle response for new manifest, error: %v", err)
}
} else if overwriteTag {
if err := b.doOverwriteTag(info); err != nil {
log.Errorf("Failed to handle response for overwrite tag, error: %v", err)
}
}
return nil
}),
}
return quota.New(opts...)
}
func (b *putManifestBuilder) doNewManifest(info *util.MfInfo) error {
artifact := &models.Artifact{
PID: info.ProjectID,
Repo: info.Repository,
Tag: info.Tag,
Digest: info.Digest,
PushTime: time.Now(),
Kind: "Docker-Image",
}
if _, err := dao.AddArtifact(artifact); err != nil {
return fmt.Errorf("error to add artifact, %v", err)
}
return b.attachBlobsToArtifact(info)
}
func (b *putManifestBuilder) doOverwriteTag(info *util.MfInfo) error {
artifact := &models.Artifact{
ID: info.ArtifactID,
PID: info.ProjectID,
Repo: info.Repository,
Tag: info.Tag,
Digest: info.Digest,
PushTime: time.Now(),
Kind: "Docker-Image",
}
if err := dao.UpdateArtifactDigest(artifact); err != nil {
return fmt.Errorf("error to update artifact, %v", err)
}
return b.attachBlobsToArtifact(info)
}
func (b *putManifestBuilder) attachBlobsToArtifact(info *util.MfInfo) error {
self := &models.ArtifactAndBlob{
DigestAF: info.Digest,
DigestBlob: info.Digest,
}
artifactBlobs := append([]*models.ArtifactAndBlob{}, self)
for _, d := range info.Refrerence {
artifactBlob := &models.ArtifactAndBlob{
DigestAF: info.Digest,
DigestBlob: d.Digest.String(),
}
artifactBlobs = append(artifactBlobs, artifactBlob)
}
if err := dao.AddArtifactNBlobs(artifactBlobs); err != nil {
if strings.Contains(err.Error(), dao.ErrDupRows.Error()) {
log.Warning("the artifact and blobs have already in the DB, it maybe an existing image with different tag")
return nil
}
return fmt.Errorf("error to add artifact and blobs in proxy response handler, %v", err)
}
return nil
}

View File

@ -16,50 +16,57 @@ package countquota
import (
"fmt"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/middlewares/util"
"net/http"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/middlewares/interceptor"
"github.com/goharbor/harbor/src/core/middlewares/util"
)
type countQuotaHandler struct {
next http.Handler
builders []interceptor.Builder
next http.Handler
}
// New ...
func New(next http.Handler) http.Handler {
func New(next http.Handler, builders ...interceptor.Builder) http.Handler {
if len(builders) == 0 {
builders = defaultBuilders
}
return &countQuotaHandler{
next: next,
builders: builders,
next: next,
}
}
// ServeHTTP manifest ...
func (cqh *countQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
countInteceptor := getInteceptor(req)
if countInteceptor == nil {
cqh.next.ServeHTTP(rw, req)
func (h *countQuotaHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
interceptor := h.getInterceptor(req)
if interceptor == nil {
h.next.ServeHTTP(rw, req)
return
}
// handler request
if err := countInteceptor.HandleRequest(req); err != nil {
if err := interceptor.HandleRequest(req); err != nil {
log.Warningf("Error occurred when to handle request in count quota handler: %v", err)
http.Error(rw, util.MarshalError("InternalError", fmt.Sprintf("Error occurred when to handle request in count quota handler: %v", err)),
http.StatusInternalServerError)
return
}
cqh.next.ServeHTTP(rw, req)
// handler response
countInteceptor.HandleResponse(*rw.(*util.CustomResponseWriter), req)
h.next.ServeHTTP(rw, req)
interceptor.HandleResponse(rw, req)
}
func getInteceptor(req *http.Request) util.RegInterceptor {
// PUT /v2/<name>/manifests/<reference>
matchPushMF, repository, tag := util.MatchPushManifest(req)
if matchPushMF {
mfInfo := util.MfInfo{}
mfInfo.Repository = repository
mfInfo.Tag = tag
return NewPutManifestInterceptor(&mfInfo)
func (h *countQuotaHandler) getInterceptor(req *http.Request) interceptor.Interceptor {
for _, builder := range h.builders {
interceptor := builder.Build(req)
if interceptor != nil {
return interceptor
}
}
return nil
}

View File

@ -0,0 +1,306 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package countquota
import (
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/docker/distribution"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/core/middlewares/util"
"github.com/goharbor/harbor/src/pkg/types"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/suite"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func getProjectCountUsage(projectID int64) (int64, error) {
usage := models.QuotaUsage{Reference: "project", ReferenceID: fmt.Sprintf("%d", projectID)}
err := dao.GetOrmer().Read(&usage, "reference", "reference_id")
if err != nil {
return 0, err
}
used, err := types.NewResourceList(usage.Used)
if err != nil {
return 0, err
}
return used[types.ResourceCount], nil
}
func randomString(n int) string {
const letterBytes = "abcdefghijklmnopqrstuvwxyz"
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
return string(b)
}
func doDeleteManifestRequest(projectID int64, projectName, name, dgt string, next ...http.HandlerFunc) int {
repository := fmt.Sprintf("%s/%s", projectName, name)
url := fmt.Sprintf("/v2/%s/manifests/%s", repository, dgt)
req, _ := http.NewRequest("DELETE", url, nil)
ctx := util.NewManifestInfoContext(req.Context(), &util.MfInfo{
ProjectID: projectID,
Repository: repository,
Digest: dgt,
})
rr := httptest.NewRecorder()
var n http.HandlerFunc
if len(next) > 0 {
n = next[0]
} else {
n = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusAccepted)
}
}
h := New(http.HandlerFunc(n))
h.ServeHTTP(util.NewCustomResponseWriter(rr), req.WithContext(ctx))
return rr.Code
}
func doPutManifestRequest(projectID int64, projectName, name, tag, dgt string, next ...http.HandlerFunc) int {
repository := fmt.Sprintf("%s/%s", projectName, name)
url := fmt.Sprintf("/v2/%s/manifests/%s", repository, tag)
req, _ := http.NewRequest("PUT", url, nil)
ctx := util.NewManifestInfoContext(req.Context(), &util.MfInfo{
ProjectID: projectID,
Repository: repository,
Tag: tag,
Digest: dgt,
Refrerence: []distribution.Descriptor{
{Digest: digest.FromString(randomString(15))},
{Digest: digest.FromString(randomString(15))},
},
})
rr := httptest.NewRecorder()
var n http.HandlerFunc
if len(next) > 0 {
n = next[0]
} else {
n = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusCreated)
}
}
h := New(http.HandlerFunc(n))
h.ServeHTTP(util.NewCustomResponseWriter(rr), req.WithContext(ctx))
return rr.Code
}
type HandlerSuite struct {
suite.Suite
}
func (suite *HandlerSuite) addProject(projectName string) int64 {
projectID, err := dao.AddProject(models.Project{
Name: projectName,
OwnerID: 1,
})
suite.Nil(err, fmt.Sprintf("Add project failed for %s", projectName))
return projectID
}
func (suite *HandlerSuite) checkCountUsage(expected, projectID int64) {
count, err := getProjectCountUsage(projectID)
suite.Nil(err, fmt.Sprintf("Failed to get count usage of project %d, error: %v", projectID, err))
suite.Equal(expected, count, "Failed to check count usage for project %d", projectID)
}
func (suite *HandlerSuite) TearDownTest() {
dao.ClearTable("artifact")
dao.ClearTable("blob")
dao.ClearTable("artifact_blob")
dao.ClearTable("quota")
dao.ClearTable("quota_usage")
}
func (suite *HandlerSuite) TestPutManifestCreated() {
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
dgt := digest.FromString(randomString(15)).String()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(1, projectID)
total, err := dao.GetTotalOfArtifacts(&models.ArtifactQuery{Digest: dgt})
suite.Nil(err)
suite.Equal(int64(1), total, "Artifact should be created")
if exists, err := dao.HasBlobInProject(projectID, dgt); suite.Nil(err) {
suite.True(exists)
}
// Push the photon:latest with photon:dev
code = doPutManifestRequest(projectID, projectName, "photon", "dev", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(2, projectID)
total, err = dao.GetTotalOfArtifacts(&models.ArtifactQuery{Digest: dgt})
suite.Nil(err)
suite.Equal(int64(2), total, "Artifact should be created")
// Push the photon:latest with new image
newDgt := digest.FromString(randomString(15)).String()
code = doPutManifestRequest(projectID, projectName, "photon", "latest", newDgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(2, projectID)
total, err = dao.GetTotalOfArtifacts(&models.ArtifactQuery{Digest: newDgt})
suite.Nil(err)
suite.Equal(int64(1), total, "Artifact should be updated")
}
func (suite *HandlerSuite) TestPutManifestFailed() {
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
next := func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
dgt := digest.FromString(randomString(15)).String()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt, next)
suite.Equal(http.StatusInternalServerError, code)
suite.checkCountUsage(0, projectID)
total, err := dao.GetTotalOfArtifacts(&models.ArtifactQuery{Digest: dgt})
suite.Nil(err)
suite.Equal(int64(0), total, "Artifact should not be created")
if exists, err := dao.HasBlobInProject(projectID, dgt); suite.Nil(err) {
suite.False(exists)
}
}
func (suite *HandlerSuite) TestDeleteManifestAccepted() {
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
dgt := digest.FromString(randomString(15)).String()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(1, projectID)
code = doDeleteManifestRequest(projectID, projectName, "photon", dgt)
suite.Equal(http.StatusAccepted, code)
suite.checkCountUsage(0, projectID)
}
func (suite *HandlerSuite) TestDeleteManifestFailed() {
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
dgt := digest.FromString(randomString(15)).String()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(1, projectID)
next := func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
code = doDeleteManifestRequest(projectID, projectName, "photon", dgt, next)
suite.Equal(http.StatusInternalServerError, code)
suite.checkCountUsage(1, projectID)
}
func (suite *HandlerSuite) TestDeleteManifesInMultiProjects() {
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
dgt := digest.FromString(randomString(15)).String()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(1, projectID)
{
projectName := randomString(5)
projectID := suite.addProject(projectName)
defer func() {
dao.DeleteProject(projectID)
}()
code := doPutManifestRequest(projectID, projectName, "photon", "latest", dgt)
suite.Equal(http.StatusCreated, code)
suite.checkCountUsage(1, projectID)
code = doDeleteManifestRequest(projectID, projectName, "photon", dgt)
suite.Equal(http.StatusAccepted, code)
suite.checkCountUsage(0, projectID)
}
code = doDeleteManifestRequest(projectID, projectName, "photon", dgt)
suite.Equal(http.StatusAccepted, code)
suite.checkCountUsage(0, projectID)
}
func TestMain(m *testing.M) {
dao.PrepareTestForPostgresSQL()
if result := m.Run(); result != 0 {
os.Exit(result)
}
}
func TestRunHandlerSuite(t *testing.T) {
suite.Run(t, new(HandlerSuite))
}

View File

@ -1,211 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package countquota
import (
"context"
"errors"
"fmt"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
common_util "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
common_redis "github.com/goharbor/harbor/src/common/utils/redis"
"github.com/goharbor/harbor/src/core/middlewares/util"
"net/http"
"strings"
"time"
)
// PutManifestInterceptor ...
type PutManifestInterceptor struct {
mfInfo *util.MfInfo
}
// NewPutManifestInterceptor ...
func NewPutManifestInterceptor(mfInfo *util.MfInfo) *PutManifestInterceptor {
return &PutManifestInterceptor{
mfInfo: mfInfo,
}
}
// HandleRequest ...
// The context has already contain mfinfo as it was put by size quota handler.
func (pmi *PutManifestInterceptor) HandleRequest(req *http.Request) error {
mfInfo := req.Context().Value(util.MFInfokKey)
mf, ok := mfInfo.(*util.MfInfo)
if !ok {
return errors.New("failed to get manifest infor from context")
}
tagLock, err := tryLockTag(mf)
if err != nil {
return fmt.Errorf("error occurred when to lock tag %s:%s with digest %v", mf.Repository, mf.Tag, err)
}
mf.TagLock = tagLock
imageExist, af, err := imageExist(mf)
if err != nil {
tryFreeTag(mf)
return fmt.Errorf("error occurred when to check Manifest existence %v", err)
}
mf.Exist = imageExist
if imageExist {
if af.Digest != mf.Digest {
mf.DigestChanged = true
}
} else {
quotaRes := &quota.ResourceList{
quota.ResourceCount: 1,
}
err := util.TryRequireQuota(mf.ProjectID, quotaRes)
if err != nil {
tryFreeTag(mf)
log.Errorf("Cannot get quota for the manifest %v", err)
if err == util.ErrRequireQuota {
return err
}
return fmt.Errorf("error occurred when to require quota for the manifest %v", err)
}
mf.Quota = quotaRes
}
*req = *(req.WithContext(context.WithValue(req.Context(), util.MFInfokKey, mf)))
return nil
}
// HandleResponse ...
func (pmi *PutManifestInterceptor) HandleResponse(rw util.CustomResponseWriter, req *http.Request) {
mfInfo := req.Context().Value(util.MFInfokKey)
mf, ok := mfInfo.(*util.MfInfo)
if !ok {
log.Error("failed to convert manifest information context into MfInfo")
return
}
defer func() {
_, err := mf.TagLock.Free()
if err != nil {
log.Errorf("Error to unlock in response handler, %v", err)
}
if err := mf.TagLock.Conn.Close(); err != nil {
log.Errorf("Error to close redis connection in response handler, %v", err)
}
}()
// 201
if rw.Status() == http.StatusCreated {
af := &models.Artifact{
PID: mf.ProjectID,
Repo: mf.Repository,
Tag: mf.Tag,
Digest: mf.Digest,
PushTime: time.Now(),
Kind: "Docker-Image",
}
// insert or update
if !mf.Exist {
_, err := dao.AddArtifact(af)
if err != nil {
log.Errorf("Error to add artifact, %v", err)
return
}
}
if mf.DigestChanged {
err := dao.UpdateArtifactDigest(af)
if err != nil {
log.Errorf("Error to add artifact, %v", err)
return
}
}
if !mf.Exist || mf.DigestChanged {
afnbs := []*models.ArtifactAndBlob{}
self := &models.ArtifactAndBlob{
DigestAF: mf.Digest,
DigestBlob: mf.Digest,
}
afnbs = append(afnbs, self)
for _, d := range mf.Refrerence {
afnb := &models.ArtifactAndBlob{
DigestAF: mf.Digest,
DigestBlob: d.Digest.String(),
}
afnbs = append(afnbs, afnb)
}
if err := dao.AddArtifactNBlobs(afnbs); err != nil {
if strings.Contains(err.Error(), dao.ErrDupRows.Error()) {
log.Warning("the artifact and blobs have already in the DB, it maybe an existing image with different tag")
return
}
log.Errorf("Error to add artifact and blobs in proxy response handler, %v", err)
return
}
}
} else if rw.Status() >= 300 && rw.Status() <= 511 {
if !mf.Exist {
success := util.TryFreeQuota(mf.ProjectID, mf.Quota)
if !success {
log.Error("error to release resource booked for the manifest")
return
}
}
}
return
}
// tryLockTag locks tag with redis ...
func tryLockTag(mfInfo *util.MfInfo) (*common_redis.Mutex, error) {
con, err := util.GetRegRedisCon()
if err != nil {
return nil, err
}
tagLock := common_redis.New(con, "Quota::manifest-lock::"+mfInfo.Repository+":"+mfInfo.Tag, common_util.GenerateRandomString())
success, err := tagLock.Require()
if err != nil {
return nil, err
}
if !success {
return nil, fmt.Errorf("unable to lock tag: %s ", mfInfo.Repository+":"+mfInfo.Tag)
}
return tagLock, nil
}
func tryFreeTag(mfInfo *util.MfInfo) {
_, err := mfInfo.TagLock.Free()
if err != nil {
log.Warningf("Error to unlock tag: %s, with error: %v ", mfInfo.Tag, err)
}
}
// check the existence of a artifact, if exist, the method will return the artifact model
func imageExist(mfInfo *util.MfInfo) (exist bool, af *models.Artifact, err error) {
artifactQuery := &models.ArtifactQuery{
PID: mfInfo.ProjectID,
Repo: mfInfo.Repository,
Tag: mfInfo.Tag,
}
afs, err := dao.ListArtifacts(artifactQuery)
if err != nil {
log.Errorf("Error occurred when to get project ID %v", err)
return false, nil, err
}
if len(afs) > 0 {
return true, afs[0], nil
}
return false, nil, nil
}

View File

@ -0,0 +1,92 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package countquota
import (
"errors"
"fmt"
"net/http"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/quota"
"github.com/goharbor/harbor/src/core/middlewares/util"
"github.com/goharbor/harbor/src/pkg/types"
)
func mutexKey(info *util.MfInfo) string {
if info.Tag != "" {
return "Quota::manifest-lock::" + info.Repository + ":" + info.Tag
}
return "Quota::manifest-lock::" + info.Repository + ":" + info.Digest
}
func computeQuotaForDelete(req *http.Request) (types.ResourceList, error) {
info, ok := util.ManifestInfoFromContext(req.Context())
if !ok {
return nil, errors.New("manifest info missing")
}
total, err := dao.GetTotalOfArtifacts(&models.ArtifactQuery{
PID: info.ProjectID,
Repo: info.Repository,
Digest: info.Digest,
})
if err != nil {
return nil, fmt.Errorf("error occurred when get artifacts %v ", err)
}
return types.ResourceList{types.ResourceCount: total}, nil
}
func computeQuotaForPut(req *http.Request) (types.ResourceList, error) {
info, ok := util.ManifestInfoFromContext(req.Context())
if !ok {
return nil, errors.New("manifest info missing")
}
artifact, err := getArtifact(info)
if err != nil {
return nil, fmt.Errorf("error occurred when to check Manifest existence %v", err)
}
if artifact != nil {
info.ArtifactID = artifact.ID
info.DigestChanged = artifact.Digest != info.Digest
info.Exist = true
return nil, nil
}
return quota.ResourceList{quota.ResourceCount: 1}, nil
}
// get artifact by manifest info
func getArtifact(info *util.MfInfo) (*models.Artifact, error) {
query := &models.ArtifactQuery{
PID: info.ProjectID,
Repo: info.Repository,
Tag: info.Tag,
}
artifacts, err := dao.ListArtifacts(query)
if err != nil || len(artifacts) == 0 {
return nil, err
}
return artifacts[0], nil
}

View File

@ -42,7 +42,7 @@ type Options struct {
Resources types.ResourceList
StatusCode int
OnResources func(*http.Request) types.ResourceList
OnResources func(*http.Request) (types.ResourceList, error)
OnFulfilled func(http.ResponseWriter, *http.Request) error
OnRejected func(http.ResponseWriter, *http.Request) error
OnFinally func(http.ResponseWriter, *http.Request) error
@ -114,8 +114,15 @@ func StatusCode(c int) Option {
}
// OnResources sets the interceptor on resources function
func OnResources(f func(*http.Request) types.ResourceList) Option {
func OnResources(f func(*http.Request) (types.ResourceList, error)) Option {
return func(o *Options) {
o.OnResources = f
}
}
// OnFulfilled set the success handler for interceptor
func OnFulfilled(f func(http.ResponseWriter, *http.Request) error) Option {
return func(o *Options) {
o.OnFulfilled = f
}
}

View File

@ -15,6 +15,7 @@
package quota
import (
"fmt"
"net/http"
"github.com/goharbor/harbor/src/common/utils/log"
@ -60,7 +61,11 @@ func (qi *quotaInterceptor) HandleRequest(req *http.Request) (err error) {
resources := opts.Resources
if len(resources) == 0 && opts.OnResources != nil {
resources = opts.OnResources(req)
resources, err = opts.OnResources(req)
if err != nil {
return fmt.Errorf("failed to compute the resources for quota, error: %v", err)
}
log.Debugf("Compute the resources for quota, got: %v", resources)
}
qi.resources = resources

View File

@ -113,6 +113,10 @@ type MfInfo struct {
// Exist is to index the existing of the manifest in DB. If false, it's an new image for uploading.
Exist bool
// ArtifactID is the ID of the artifact which query by repository and tag
ArtifactID int64
// DigestChanged true means the manifest exists but digest is changed.
// Probably it's a new image with existing repo/tag name or overwrite.
DigestChanged bool
@ -400,13 +404,46 @@ func GetRegRedisCon() (redis.Conn, error) {
)
}
// BlobInfoFromContext returns blob info from context
func BlobInfoFromContext(ctx context.Context) (*BlobInfo, bool) {
info, ok := ctx.Value(BBInfokKey).(*BlobInfo)
return info, ok
}
// ChartVersionInfoFromContext returns chart info from context
func ChartVersionInfoFromContext(ctx context.Context) (*ChartVersionInfo, bool) {
info, ok := ctx.Value(chartVersionInfoKey).(*ChartVersionInfo)
return info, ok
}
// ImageInfoFromContext returns image info from context
func ImageInfoFromContext(ctx context.Context) (*ImageInfo, bool) {
info, ok := ctx.Value(ImageInfoCtxKey).(*ImageInfo)
return info, ok
}
// ManifestInfoFromContext returns manifest info from context
func ManifestInfoFromContext(ctx context.Context) (*MfInfo, bool) {
info, ok := ctx.Value(MFInfokKey).(*MfInfo)
return info, ok
}
// NewBlobInfoContext returns context with blob info
func NewBlobInfoContext(ctx context.Context, info *BlobInfo) context.Context {
return context.WithValue(ctx, BBInfokKey, info)
}
// NewChartVersionInfoContext returns context with blob info
func NewChartVersionInfoContext(ctx context.Context, info *ChartVersionInfo) context.Context {
return context.WithValue(ctx, chartVersionInfoKey, info)
}
// NewImageInfoContext returns context with image info
func NewImageInfoContext(ctx context.Context, info *ImageInfo) context.Context {
return context.WithValue(ctx, ImageInfoCtxKey, info)
}
// NewManifestInfoContext returns context with manifest info
func NewManifestInfoContext(ctx context.Context, info *MfInfo) context.Context {
return context.WithValue(ctx, MFInfokKey, info)
}