Merge pull request #16861 from chlins/feat/cache-layer-for-manifest

feat(manifest): introduce cache layer for manifest (#16459)
This commit is contained in:
Chenyu Zhang 2022-05-27 14:03:42 +08:00 committed by GitHub
commit b356d58253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 500 additions and 5 deletions

View File

@ -61,6 +61,11 @@ func (r *ResponseBuffer) Header() http.Header {
return r.header
}
// Buffer returns the bytes currently held by the ResponseBuffer, i.e. the
// result of calling Bytes() on its internal buffer.
// NOTE(review): if r.buffer is a bytes.Buffer, the returned slice presumably
// aliases the buffer's storage and is only valid until the next write —
// confirm callers do not retain or mutate it.
func (r *ResponseBuffer) Buffer() []byte {
return r.buffer.Bytes()
}
// Flush the status code, header and data into the underlying response writer
func (r *ResponseBuffer) Flush() (int, error) {
r.flushed = true

View File

@ -15,10 +15,11 @@
package lib
import (
"github.com/stretchr/testify/suite"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/suite"
)
type responseBufferTestSuite struct {
@ -61,6 +62,14 @@ func (r *responseBufferTestSuite) TestHeader() {
r.Equal("v", r.buffer.header.Get("k"))
r.Empty(r.recorder.Header())
}
// TestBuffer checks that data written to the response buffer is exposed
// unchanged through Buffer().
func (r *responseBufferTestSuite) TestBuffer() {
	payload := []byte{'a'}

	_, writeErr := r.buffer.Write(payload)
	r.NoError(writeErr)

	r.Equal(payload, r.buffer.Buffer())
}
func (r *responseBufferTestSuite) TestFlush() {
r.buffer.WriteHeader(http.StatusOK)
_, err := r.buffer.Write([]byte{'a'})

View File

@ -31,6 +31,8 @@ const (
ResourceTypeProjectMeta = "project_metadata"
// ResourceTypeRepository defines repository type.
ResourceTypeRepository = "repository"
// ResourceTypeManifest defines manifest type.
ResourceTypeManifest = "manifest"
)
// Manager is the interface for resource cache manager.

View File

@ -0,0 +1,136 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"time"
libcache "github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/cached"
)
// Compile-time check that *manager satisfies CachedManager.
var _ CachedManager = (*manager)(nil)
// ManifestManager is the manager for manifest.
// It declares the cache operations on manifest content; every operation
// identifies the manifest by its digest.
type ManifestManager interface {
// Save stores the manifest content in the cache under the given digest.
Save(ctx context.Context, digest string, manifest []byte) error
// Get returns the manifest content cached for the given digest, or an
// error when it cannot be fetched.
Get(ctx context.Context, digest string) ([]byte, error)
// Delete removes the manifest cached for the given digest.
Delete(ctx context.Context, digest string) error
}
// CachedManager is the interface that combines the manifest manager with the
// common resource cache manager, so both sets of operations are available
// through a single type.
type CachedManager interface {
// ManifestManager is the manager for manifest.
ManifestManager
// Manager is the common interface for resource cache.
cached.Manager
}
// manager is the cached manager implemented by redis.
type manager struct {
// client returns the cache client used for every cache operation. It is a
// function rather than a value so the client is resolved on each call
// (tests replace it with a mock).
client func() libcache.Cache
// keyBuilder builds the cache object key for a manifest.
keyBuilder *cached.ObjectKey
// lifetime is the cache life time passed to the client when saving.
lifetime time.Duration
}
// NewManager builds the redis-backed manifest cache manager. The manager
// resolves the default cache client on every call, builds keys for the
// manifest resource type and takes its entry lifetime from the configured
// cache expiration hours.
func NewManager() *manager {
	mgr := new(manager)
	mgr.client = func() libcache.Cache {
		return libcache.Default()
	}
	mgr.keyBuilder = cached.NewObjectKey(cached.ResourceTypeManifest)
	mgr.lifetime = time.Duration(config.CacheExpireHours()) * time.Hour
	return mgr
}
// Save writes the manifest content to the cache under the key built from its
// digest, passing the manager's lifetime to the cache client. A key-building
// failure is returned as is.
func (m *manager) Save(ctx context.Context, digest string, manifest []byte) error {
	cacheKey, fmtErr := m.keyBuilder.Format("digest", digest)
	if fmtErr == nil {
		return m.client().Save(ctx, cacheKey, manifest, m.lifetime)
	}
	return fmtErr
}
// Get fetches the manifest content cached for the digest. It returns a nil
// slice together with the error when the key cannot be built or the cache
// fetch fails.
func (m *manager) Get(ctx context.Context, digest string) ([]byte, error) {
	cacheKey, err := m.keyBuilder.Format("digest", digest)
	if err != nil {
		return nil, err
	}

	var content []byte
	if fetchErr := m.client().Fetch(ctx, cacheKey, &content); fetchErr != nil {
		return nil, fetchErr
	}
	return content, nil
}
// Delete removes the manifest cached for the digest. The cache deletion is
// run through retry.Retry; a key-building failure is returned without
// touching the cache.
func (m *manager) Delete(ctx context.Context, digest string) error {
	cacheKey, err := m.keyBuilder.Format("digest", digest)
	if err != nil {
		return err
	}

	remove := func() error {
		return m.client().Delete(ctx, cacheKey)
	}
	return retry.Retry(remove)
}
// ResourceType returns the resource type handled by this manager, which is
// always cached.ResourceTypeManifest. The context is not used.
func (m *manager) ResourceType(ctx context.Context) string {
return cached.ResourceTypeManifest
}
// CountCache reports how many keys the cache client returns for the manifest
// resource type prefix.
func (m *manager) CountCache(ctx context.Context) (int64, error) {
	// the resource type doubles as the key prefix
	prefix := m.ResourceType(ctx)

	matched, err := m.client().Keys(ctx, prefix)
	if err != nil {
		return 0, err
	}

	total := int64(len(matched))
	return total, nil
}
// DeleteCache deletes the cache entry stored under the given raw key. Unlike
// Delete, the key is passed to the client as is (no key building) and the
// call is not retried.
func (m *manager) DeleteCache(ctx context.Context, key string) error {
return m.client().Delete(ctx, key)
}
// FlushAll deletes every key the cache client returns for the manifest
// resource type prefix. Deletion continues past individual failures; all
// collected failures are returned together, or nil when every delete
// succeeded.
func (m *manager) FlushAll(ctx context.Context) error {
	// the resource type doubles as the key prefix
	cachedKeys, err := m.client().Keys(ctx, m.ResourceType(ctx))
	if err != nil {
		return err
	}

	var failures errors.Errors
	for i := range cachedKeys {
		if delErr := m.client().Delete(ctx, cachedKeys[i]); delErr != nil {
			failures = append(failures, delErr)
		}
	}

	// return a literal nil on success so callers never see a non-nil
	// interface wrapping an empty error list
	if failures.Len() == 0 {
		return nil
	}
	return failures
}

View File

@ -0,0 +1,93 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"fmt"
"testing"
"github.com/goharbor/harbor/src/lib/cache"
testcache "github.com/goharbor/harbor/src/testing/lib/cache"
"github.com/goharbor/harbor/src/testing/mock"
"github.com/stretchr/testify/suite"
)
// managerTestSuite exercises the manifest cache manager against a mocked
// cache client.
type managerTestSuite struct {
suite.Suite
// cachedManager is the manager under test.
cachedManager CachedManager
// cache is the mock cache client injected into the manager.
cache *testcache.Cache
// ctx is the context passed to every manager call.
ctx context.Context
// digest is the manifest digest used by the tests.
digest string
// manifestKey is the cache key the manager is expected to build for digest.
manifestKey string
}
// SetupTest creates a fresh mock cache and a manager wired to it, and
// prepares the digest and the cache key expected for that digest.
func (m *managerTestSuite) SetupTest() {
	m.ctx = context.TODO()
	m.digest = "sha256:52f431d980baa76878329b68ddb69cb124c25efa6e206d8b0bd797a828f0528e"
	m.manifestKey = fmt.Sprintf("manifest:digest:%s", m.digest)

	m.cache = &testcache.Cache{}

	// swap the default cache client for the mock
	mgr := NewManager()
	mgr.client = func() cache.Cache {
		return m.cache
	}
	m.cachedManager = mgr
}
// TestSave expects Save to call the cache client once with the manifest key
// and to return no error.
func (m *managerTestSuite) TestSave() {
	m.cache.
		On("Save", mock.Anything, m.manifestKey, mock.Anything, mock.Anything).
		Return(nil).
		Once()

	m.NoError(m.cachedManager.Save(m.ctx, m.digest, []byte{}))
}
// TestGet expects Get to fetch from the cache client using the manifest key
// and to return no error.
func (m *managerTestSuite) TestGet() {
	m.cache.
		On("Fetch", mock.Anything, m.manifestKey, mock.Anything).
		Return(nil).
		Once()

	_, getErr := m.cachedManager.Get(m.ctx, m.digest)
	m.NoError(getErr)
}
// TestDelete expects Delete to remove the manifest key through the cache
// client and to return no error.
func (m *managerTestSuite) TestDelete() {
	m.cache.
		On("Delete", mock.Anything, m.manifestKey).
		Return(nil).
		Once()

	m.NoError(m.cachedManager.Delete(m.ctx, m.digest))
}
// TestResourceType expects the manager to report the "manifest" resource type.
func (m *managerTestSuite) TestResourceType() {
	m.Equal("manifest", m.cachedManager.ResourceType(m.ctx))
}
// TestCountCache expects CountCache to return the number of keys reported by
// the cache client.
func (m *managerTestSuite) TestCountCache() {
	m.cache.
		On("Keys", mock.Anything, mock.Anything).
		Return([]string{"1"}, nil).
		Once()

	count, countErr := m.cachedManager.CountCache(m.ctx)
	m.NoError(countErr)
	m.Equal(int64(1), count)
}
// TestDeleteCache expects DeleteCache to delete through the cache client and
// to return no error.
func (m *managerTestSuite) TestDeleteCache() {
	m.cache.
		On("Delete", mock.Anything, mock.Anything).
		Return(nil).
		Once()

	m.NoError(m.cachedManager.DeleteCache(m.ctx, "key"))
}
// TestFlushAll expects FlushAll to list the keys and delete each of them
// without returning an error.
func (m *managerTestSuite) TestFlushAll() {
	m.cache.
		On("Keys", mock.Anything, mock.Anything).
		Return([]string{"1"}, nil).
		Once()
	m.cache.
		On("Delete", mock.Anything, mock.Anything).
		Return(nil).
		Once()

	m.NoError(m.cachedManager.FlushAll(m.ctx))
}
func TestManager(t *testing.T) {
suite.Run(t, &managerTestSuite{})
}

View File

@ -18,6 +18,7 @@ import (
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/pkg/artifact"
cachedArtifact "github.com/goharbor/harbor/src/pkg/cached/artifact/redis"
cachedManifest "github.com/goharbor/harbor/src/pkg/cached/manifest/redis"
cachedProject "github.com/goharbor/harbor/src/pkg/cached/project/redis"
cachedProjectMeta "github.com/goharbor/harbor/src/pkg/cached/project_metadata/redis"
cachedRepo "github.com/goharbor/harbor/src/pkg/cached/repository/redis"
@ -36,6 +37,8 @@ var (
ProjectMetaMgr metadata.Manager
// RepositoryMgr is the manager for repository.
RepositoryMgr repository.Manager
// ManifestMgr is the manager for manifest.
ManifestMgr cachedManifest.CachedManager
)
// init initializes the managers for resources
@ -45,6 +48,7 @@ func init() {
initProjectMgr(cacheEnabled)
initProjectMetaMgr(cacheEnabled)
initRepositoryMgr(cacheEnabled)
initManifestManager(cacheEnabled)
}
func initArtifactMgr(cacheEnabled bool) {
@ -84,3 +88,7 @@ func initRepositoryMgr(cacheEnabled bool) {
RepositoryMgr = repoMgr
}
}
// initManifestManager sets ManifestMgr to a new cached manifest manager.
// The cacheEnabled argument is not consulted here: the manager is always
// created.
func initManifestManager(cacheEnabled bool) {
ManifestMgr = cachedManifest.NewManager()
}

View File

@ -15,6 +15,7 @@
package registry
import (
"fmt"
"net/http"
"strings"
@ -23,15 +24,27 @@ import (
"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/repository"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
lib_http "github.com/goharbor/harbor/src/lib/http"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg"
"github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/registry"
"github.com/goharbor/harbor/src/server/router"
"github.com/opencontainers/go-digest"
)
// etagMatch reports whether any entity-tag carried in the request's
// If-None-Match header equals etag, in either its quoted or unquoted form.
//
// Each header value is first compared as a whole and then, because the header
// may carry a comma-separated list of entity-tags (RFC 7232 section 3.2),
// member by member with surrounding whitespace trimmed.
//
// Adapted from
// https://github.com/distribution/distribution/blob/c202b9b0d7b79a67337dec8e1f1bafb1c7095315/registry/handlers/manifests.go#L280
func etagMatch(r *http.Request, etag string) bool {
	// the quoted form does not change across iterations, build it once
	quoted := `"` + etag + `"`
	matches := func(candidate string) bool {
		return candidate == etag || candidate == quoted // allow quoted or unquoted
	}

	for _, headerVal := range r.Header["If-None-Match"] {
		// whole-value comparison keeps the original behavior, including for
		// an etag that itself contains a comma
		if matches(headerVal) {
			return true
		}
		for _, candidate := range strings.Split(headerVal, ",") {
			if matches(strings.TrimSpace(candidate)) {
				return true
			}
		}
	}
	return false
}
// make sure the artifact exist before proxying the request to the backend registry
func getManifest(w http.ResponseWriter, req *http.Request) {
repository := router.Param(req.Context(), ":splat")
@ -49,11 +62,66 @@ func getManifest(w http.ResponseWriter, req *http.Request) {
req.URL.RawPath = req.URL.EscapedPath()
}
recorder := lib.NewResponseRecorder(w)
proxy.ServeHTTP(recorder, req)
// if the etag matches, we can return early without proxying to distribution,
// because the digest is already stored in the database.
// https://github.com/distribution/distribution/blob/c202b9b0d7b79a67337dec8e1f1bafb1c7095315/registry/handlers/manifests.go#L135
if etagMatch(req, art.Digest) {
w.WriteHeader(http.StatusNotModified)
return
}
buffer := lib.NewResponseBuffer(w)
// whether get manifest from cache
fromCache := false
// whether need to write-back cache
wbCache := false
// check cache
if config.CacheEnabled() {
manifest, err := pkg.ManifestMgr.Get(req.Context(), art.Digest)
if err == nil {
fromCache = true
// write header
buffer.Header().Set("Content-Length", fmt.Sprintf("%d", len(manifest)))
buffer.Header().Set("Content-Type", art.ManifestMediaType)
buffer.Header().Set("Docker-Distribution-Api-Version", "registry/2.0")
buffer.Header().Set("Docker-Content-Digest", art.Digest)
buffer.Header().Set("Etag", fmt.Sprintf(`"%s"`, art.Digest))
buffer.WriteHeader(http.StatusOK)
// write data from cache, no need to write body if is head request
if req.Method == http.MethodGet {
buffer.Write(manifest)
}
} else {
log.Warningf("failed to get manifest from cache, error: %v", err)
// only write cache when request is GET because HEAD request resp
// body is empty.
if req.Method == http.MethodGet {
wbCache = true
}
}
}
// proxy to registry if not from cache
if !fromCache {
proxy.ServeHTTP(buffer, req)
}
// flush data
if _, err = buffer.Flush(); err != nil {
log.Errorf("failed to flush: %v", err)
return
}
// return if not success
if !buffer.Success() {
return
}
// write back manifest to cache if needed
if wbCache {
if err = pkg.ManifestMgr.Save(req.Context(), art.Digest, buffer.Buffer()); err != nil {
log.Warningf("failed to save manifest %s to cache, error: %v", art.Digest, err)
}
}
// fire event, ignore the HEAD request and pulling request from replication service
if !recorder.Success() || req.Method == http.MethodHead ||
req.UserAgent() == registry.UserAgent {
if req.Method == http.MethodHead || req.UserAgent() == registry.UserAgent {
return
}
@ -90,6 +158,13 @@ func deleteManifest(w http.ResponseWriter, req *http.Request) {
return
}
w.WriteHeader(http.StatusAccepted)
// clean cache if enabled
if config.CacheEnabled() {
if err = pkg.ManifestMgr.Delete(req.Context(), art.Digest); err != nil {
log.Errorf("failed to delete manifest cache: %v", err)
}
}
}
func putManifest(w http.ResponseWriter, req *http.Request) {

View File

@ -21,6 +21,8 @@ import (
"testing"
beegocontext "github.com/beego/beego/context"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/pkg"
"github.com/goharbor/harbor/src/server/router"
"github.com/goharbor/harbor/src/controller/artifact"
@ -29,6 +31,7 @@ import (
arttesting "github.com/goharbor/harbor/src/testing/controller/artifact"
repotesting "github.com/goharbor/harbor/src/testing/controller/repository"
"github.com/goharbor/harbor/src/testing/mock"
testmanifest "github.com/goharbor/harbor/src/testing/pkg/cached/manifest/redis"
"github.com/stretchr/testify/suite"
)
@ -39,12 +42,14 @@ type manifestTestSuite struct {
originalProxy http.Handler
repoCtl *repotesting.Controller
artCtl *arttesting.Controller
cachedMgr *testmanifest.CachedManager
}
// SetupSuite records the package-level repository controller, artifact
// controller and proxy in the suite's original* fields, and creates the mock
// cached manifest manager used by the tests.
func (m *manifestTestSuite) SetupSuite() {
	m.cachedMgr = new(testmanifest.CachedManager)

	m.originalProxy = proxy
	m.originalArtCtl = artifact.Ctl
	m.originalRepoCtl = repository.Ctl
}
func (m *manifestTestSuite) SetupTest() {
@ -52,6 +57,7 @@ func (m *manifestTestSuite) SetupTest() {
m.artCtl = &arttesting.Controller{}
repository.Ctl = m.repoCtl
artifact.Ctl = m.artCtl
pkg.ManifestMgr = m.cachedMgr
}
func (m *manifestTestSuite) TearDownTest() {
@ -94,6 +100,24 @@ func (m *manifestTestSuite) TestGetManifest() {
mock.OnAnything(m.artCtl, "GetByReference").Return(art, nil)
getManifest(w, req)
m.Equal(http.StatusOK, w.Code)
// if etag match, return 304
req = httptest.NewRequest(http.MethodGet, "/v2/library/hello-world/manifests/", nil)
w = &httptest.ResponseRecorder{}
req.Header.Set("If-None-Match", "sha256:418fb88ec412e340cdbef913b8ca1bbe8f9e8dc705f9617414c1f2c8db980180")
getManifest(w, req)
m.Equal(http.StatusNotModified, w.Code)
// should get from cache if enable cache.
config.DefaultMgr().Set(req.Context(), "cache_enabled", true)
defer config.DefaultMgr().Set(req.Context(), "cache_enabled", false)
req = httptest.NewRequest(http.MethodGet, "/v2/library/hello-world/manifests/", nil)
w = &httptest.ResponseRecorder{}
mock.OnAnything(m.cachedMgr, "Get").Return([]byte{}, nil)
getManifest(w, req)
m.Equal(http.StatusOK, w.Code)
m.Equal("sha256:418fb88ec412e340cdbef913b8ca1bbe8f9e8dc705f9617414c1f2c8db980180", w.Header().Get("Docker-Content-Digest"))
m.cachedMgr.AssertCalled(m.T(), "Get", mock.Anything, mock.Anything)
}
func (m *manifestTestSuite) TestDeleteManifest() {
@ -129,6 +153,20 @@ func (m *manifestTestSuite) TestDeleteManifest() {
mock.OnAnything(m.artCtl, "Delete").Return(nil)
deleteManifest(w, req)
m.Equal(http.StatusAccepted, w.Code)
// enable the cache for the following case.
config.DefaultMgr().Set(req.Context(), "cache_enabled", true)
defer config.DefaultMgr().Set(req.Context(), "cache_enabled", false)
// should delete cache when manifest be deleted.
req = httptest.NewRequest(http.MethodDelete, "/v2/library/hello-world/manifests/sha256:418fb88ec412e340cdbef913b8ca1bbe8f9e8dc705f9617414c1f2c8db980180", nil)
input = &beegocontext.BeegoInput{}
input.SetParam(":reference", "sha256:418fb88ec412e340cdbef913b8ca1bbe8f9e8dc705f9617414c1f2c8db980180")
*req = *(req.WithContext(context.WithValue(req.Context(), router.ContextKeyInput{}, input)))
w = &httptest.ResponseRecorder{}
mock.OnAnything(m.cachedMgr, "Delete").Return(nil)
deleteManifest(w, req)
m.Equal(http.StatusAccepted, w.Code)
m.cachedMgr.AssertCalled(m.T(), "Delete", mock.Anything, mock.Anything)
}
func (m *manifestTestSuite) TestPutManifest() {

View File

@ -0,0 +1,128 @@
// Code generated by mockery v2.1.0. DO NOT EDIT.
package redis
import (
context "context"
mock "github.com/stretchr/testify/mock"
)
// CachedManager is an autogenerated mock type for the CachedManager type
type CachedManager struct {
mock.Mock
}
// CountCache provides a mock function with given fields: ctx
func (_m *CachedManager) CountCache(ctx context.Context) (int64, error) {
ret := _m.Called(ctx)
var r0 int64
if rf, ok := ret.Get(0).(func(context.Context) int64); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(int64)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Delete provides a mock function with given fields: ctx, digest
func (_m *CachedManager) Delete(ctx context.Context, digest string) error {
ret := _m.Called(ctx, digest)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, digest)
} else {
r0 = ret.Error(0)
}
return r0
}
// DeleteCache provides a mock function with given fields: ctx, key
func (_m *CachedManager) DeleteCache(ctx context.Context, key string) error {
ret := _m.Called(ctx, key)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, key)
} else {
r0 = ret.Error(0)
}
return r0
}
// FlushAll provides a mock function with given fields: ctx
func (_m *CachedManager) FlushAll(ctx context.Context) error {
ret := _m.Called(ctx)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// Get provides a mock function with given fields: ctx, digest
func (_m *CachedManager) Get(ctx context.Context, digest string) ([]byte, error) {
ret := _m.Called(ctx, digest)
var r0 []byte
if rf, ok := ret.Get(0).(func(context.Context, string) []byte); ok {
r0 = rf(ctx, digest)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, digest)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ResourceType provides a mock function with given fields: ctx
func (_m *CachedManager) ResourceType(ctx context.Context) string {
ret := _m.Called(ctx)
var r0 string
if rf, ok := ret.Get(0).(func(context.Context) string); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Save provides a mock function with given fields: ctx, digest, manifest
func (_m *CachedManager) Save(ctx context.Context, digest string, manifest []byte) error {
ret := _m.Called(ctx, digest, manifest)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []byte) error); ok {
r0 = rf(ctx, digest, manifest)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -61,3 +61,4 @@ package pkg
//go:generate mockery --case snake --dir ../../pkg/systemartifact --name Manager --output ./systemartifact --outpkg systemartifact
//go:generate mockery --case snake --dir ../../pkg/systemartifact/ --name Selector --output ./systemartifact/cleanup --outpkg cleanup
//go:generate mockery --case snake --dir ../../pkg/systemartifact/dao --name DAO --output ./systemartifact/dao --outpkg dao
//go:generate mockery --case snake --dir ../../pkg/cached/manifest/redis --name CachedManager --output ./cached/manifest/redis --outpkg redis