Enhance the read-only API to avoid deleting operations during the job running (#17055)

Enhance the read-only API to avoid deleting operations during the job running

Fixes #16901

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
This commit is contained in:
Wenkai Yin(尹文开) 2022-06-29 08:00:17 +08:00 committed by GitHub
parent 77d28105bc
commit ab74e853ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 260 additions and 29 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/registry/interceptor/readonly"
"github.com/goharbor/harbor/src/registryctl/client"
)
@ -42,5 +43,5 @@ func initRegistryCtlClient() {
cfg := &client.Config{
Secret: os.Getenv("JOBSERVICE_SECRET"),
}
RegistryCtlClient = client.NewClient(registryCtlURL, cfg)
RegistryCtlClient = client.NewClient(registryCtlURL, cfg, readonly.NewInterceptor())
}

View File

@ -31,6 +31,7 @@ import (
"github.com/goharbor/harbor/src/pkg/artifactrash/model"
"github.com/goharbor/harbor/src/pkg/blob"
blobModels "github.com/goharbor/harbor/src/pkg/blob/models"
"github.com/goharbor/harbor/src/pkg/registry/interceptor/readonly"
"github.com/goharbor/harbor/src/registryctl/client"
)
@ -289,6 +290,10 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", idx, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
if err == readonly.Err {
return err
}
skippedBlob = true
continue
}
@ -296,7 +301,12 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger.Infof("[%d/%d] delete manifest from storage: %s", idx, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
return gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
// if the system is in read-only mode, return an Abort error to skip retrying
if err == readonly.Err {
return retry.Abort(err)
}
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", idx, total, err, sleep)
@ -308,6 +318,10 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", idx, total, art.RepositoryName, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
if err == readonly.Err {
return err
}
skippedBlob = true
continue
}
@ -333,7 +347,12 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger.Infof("[%d/%d] delete blob from storage: %s", idx, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
return gc.registryCtlClient.DeleteBlob(blob.Digest)
err := gc.registryCtlClient.DeleteBlob(blob.Digest)
// if the system is in read-only mode, return an Abort error to skip retrying
if err == readonly.Err {
return retry.Abort(err)
}
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", idx, total, err, sleep)
@ -345,6 +364,10 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
gc.logger.Errorf("[%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", idx, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
if err == readonly.Err {
return err
}
continue
}
sweepSize = sweepSize + blob.Size

View File

@ -8,6 +8,7 @@ import (
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/registry"
"github.com/goharbor/harbor/src/pkg/registry/interceptor/readonly"
"github.com/gomodule/redigo/redis"
)
@ -55,7 +56,12 @@ func v2DeleteManifest(logger logger.Interface, repository, digest string) error
return nil
}
return retry.Retry(func() error {
return registry.Cli.DeleteManifest(repository, digest)
err := registry.Cli.DeleteManifest(repository, digest)
// if the system is in read-only mode, return an Abort error to skip retrying
if err == readonly.Err {
return retry.Abort(err)
}
return err
}, retry.Callback(func(err error, sleep time.Duration) {
logger.Infof("failed to exec v2DeleteManifest, error: %v, will retry again after: %s", err, sleep)
}))

View File

@ -1,9 +1,12 @@
package logger
import (
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/jobservice/logger/backend"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/log"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/stretchr/testify/require"
"os"
"path"
@ -11,6 +14,7 @@ import (
)
func TestMain(m *testing.M) {
config.DefaultCfgManager = common.InMemoryCfgManager
// databases := []string{"mysql", "sqlite"}
databases := []string{"postgresql"}

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/goharbor/harbor/src/jobservice/logger/sweeper"
"github.com/goharbor/harbor/src/lib/config"
)
const (
@ -88,6 +89,15 @@ func (c *SweeperController) startSweeper(s sweeper.Interface) {
func (c *SweeperController) doSweeping(sid string, s sweeper.Interface) {
Debugf("Sweeper %s is under working", sid)
if err := config.Load(context.Background()); err != nil {
c.errChan <- fmt.Errorf("failed to load configurations: %v", err)
return
}
if config.ReadOnly(context.Background()) {
c.errChan <- fmt.Errorf("the system is in read only mode, cancel the sweeping")
return
}
count, err := s.Sweep()
if err != nil {
c.errChan <- fmt.Errorf("sweep logs error in %s at %d: %s", sid, time.Now().Unix(), err)

View File

@ -20,11 +20,14 @@ import (
"testing"
"time"
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/tests"
libcfg "github.com/goharbor/harbor/src/lib/config"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@ -42,6 +45,8 @@ type BootStrapTestSuite struct {
func (suite *BootStrapTestSuite) SetupSuite() {
dao.PrepareTestForPostgresSQL()
libcfg.DefaultCfgManager = common.InMemoryCfgManager
// Load configurations
err := config.DefaultConfig.Load("../config_test.yml", true)
require.NoError(suite.T(), err, "load configurations error: %s", err)

View File

@ -75,7 +75,9 @@ func (c *Cache) Fetch(ctx context.Context, key string, value interface{}) error
e := v.(*entry)
if e.isExpirated() {
err := c.Delete(ctx, c.opts.Key(key))
if err != nil {
log.Errorf("failed to delete cache in Fetch() method when it's expired, error: %v", err)
}
return cache.ErrNotFound
}

View File

@ -26,8 +26,6 @@ import (
"strings"
"time"
"github.com/goharbor/harbor/src/lib/config"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
_ "github.com/docker/distribution/manifest/ocischema" // register oci manifest unmarshal function
@ -35,8 +33,11 @@ import (
"github.com/docker/distribution/manifest/schema2"
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/registry/auth"
"github.com/goharbor/harbor/src/pkg/registry/interceptor"
"github.com/goharbor/harbor/src/pkg/registry/interceptor/readonly"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
)
@ -46,7 +47,7 @@ var (
Cli = func() Client {
url, _ := config.RegistryURL()
username, password := config.RegistryCredential()
return NewClient(url, username, password, false)
return NewClient(url, username, password, false, readonly.NewInterceptor())
}()
accepts = []string{
@ -101,10 +102,17 @@ type Client interface {
// NewClient creates a registry client with the default authorizer which determines the auth scheme
// of the registry automatically and calls the corresponding underlying authorizers(basic/bearer) to
// do the auth work. If a customized authorizer is needed, use "NewClientWithAuthorizer" instead
func NewClient(url, username, password string, insecure bool) Client {
func NewClient(url, username, password string, insecure bool, interceptors ...interceptor.Interceptor) Client {
authorizer := auth.NewAuthorizer(username, password, insecure)
return NewClientWithAuthorizer(url, authorizer, insecure, interceptors...)
}
// NewClientWithAuthorizer creates a registry client with the provided authorizer
func NewClientWithAuthorizer(url string, authorizer lib.Authorizer, insecure bool, interceptors ...interceptor.Interceptor) Client {
return &client{
url: url,
authorizer: auth.NewAuthorizer(username, password, insecure),
authorizer: authorizer,
interceptors: interceptors,
client: &http.Client{
Transport: commonhttp.GetHTTPTransport(commonhttp.WithInsecure(insecure)),
Timeout: 30 * time.Minute,
@ -112,20 +120,10 @@ func NewClient(url, username, password string, insecure bool) Client {
}
}
// NewClientWithAuthorizer creates a registry client with the provided authorizer
func NewClientWithAuthorizer(url string, authorizer lib.Authorizer, insecure bool) Client {
return &client{
url: url,
authorizer: authorizer,
client: &http.Client{
Transport: commonhttp.GetHTTPTransport(commonhttp.WithInsecure(insecure)),
},
}
}
type client struct {
url string
authorizer lib.Authorizer
interceptors []interceptor.Interceptor
client *http.Client
}
@ -510,6 +508,11 @@ func (c *client) Do(req *http.Request) (*http.Response, error) {
}
func (c *client) do(req *http.Request) (*http.Response, error) {
for _, interceptor := range c.interceptors {
if err := interceptor.Intercept(req); err != nil {
return nil, err
}
}
if c.authorizer != nil {
if err := c.authorizer.Modify(req); err != nil {
return nil, err

View File

@ -0,0 +1,22 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package interceptor
import "net/http"
// Interceptor intercepts the request
type Interceptor interface {
Intercept(req *http.Request) error
}

View File

@ -0,0 +1,88 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package readonly
import (
"context"
"errors"
"github.com/goharbor/harbor/src/lib/cache/memory"
"net/http"
"time"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/config"
itcp "github.com/goharbor/harbor/src/pkg/registry/interceptor"
)
// Err indicates the system is in read only mode
var (
Err = errors.New("the system is in read only mode, cancel the request")
key = "read-only"
)
// NewInterceptor creates an interceptor that intercepts any requests if the system is set to read-only
func NewInterceptor() itcp.Interceptor {
// ignore the error as the New return nil error
cache, _ := memory.New(cache.Options{
Expiration: 5 * time.Second,
Codec: cache.DefaultCodec(),
})
return &interceptor{cache: cache}
}
type interceptor struct {
cache cache.Cache
}
func (i *interceptor) Intercept(req *http.Request) error {
switch req.Method {
case http.MethodGet, http.MethodHead, http.MethodOptions:
return nil
}
isReadOnly, err := i.isReadOnly(req.Context())
if err != nil {
return err
}
if isReadOnly {
return Err
}
return nil
}
func (i *interceptor) isReadOnly(ctx context.Context) (bool, error) {
var (
isReadOnly bool
err error
)
// return the cached value if exists
if err = i.cache.Fetch(ctx, key, &isReadOnly); err == nil {
return isReadOnly, nil
}
if err != cache.ErrNotFound {
return false, err
}
// no cache, get the config via API
if err := config.Load(ctx); err != nil {
return false, err
}
isReadOnly = config.ReadOnly(ctx)
if err := i.cache.Save(ctx, key, &isReadOnly); err != nil {
return false, err
}
return isReadOnly, nil
}

View File

@ -0,0 +1,59 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package readonly
import (
"context"
"net/http"
"testing"
"time"
"github.com/goharbor/harbor/src/common"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/cache/memory"
"github.com/goharbor/harbor/src/lib/config"
_ "github.com/goharbor/harbor/src/pkg/config/inmemory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntercept(t *testing.T) {
cache, _ := memory.New(cache.Options{
Expiration: 1 * time.Nanosecond,
Codec: cache.DefaultCodec(),
})
interceptor := &interceptor{
cache: cache,
}
// method: GET
req, _ := http.NewRequest(http.MethodGet, "", nil)
assert.Nil(t, interceptor.Intercept(req))
config.DefaultCfgManager = common.InMemoryCfgManager
// method: DELETE
// read only enable: false
req, _ = http.NewRequest(http.MethodDelete, "", nil)
assert.Nil(t, interceptor.Intercept(req))
// method: DELETE
// read only enable: true
req, _ = http.NewRequest(http.MethodDelete, "", nil)
err := config.DefaultMgr().UpdateConfig(context.Background(), map[string]interface{}{common.ReadOnly: true})
require.Nil(t, err)
time.Sleep(1 * time.Nanosecond) // make sure the cached key is expired
assert.Equal(t, Err, interceptor.Intercept(req))
}

View File

@ -24,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/pkg/registry/interceptor"
)
// const definition
@ -44,6 +45,7 @@ type Client interface {
type client struct {
baseURL string
client *common_http.Client
interceptors []interceptor.Interceptor
}
// Config contains configurations needed for client
@ -52,13 +54,14 @@ type Config struct {
}
// NewClient return an instance of Registry client
func NewClient(baseURL string, cfg *Config) Client {
func NewClient(baseURL string, cfg *Config, interceptors ...interceptor.Interceptor) Client {
baseURL = strings.TrimRight(baseURL, "/")
if !strings.Contains(baseURL, "://") {
baseURL = "http://" + baseURL
}
client := &client{
baseURL: baseURL,
interceptors: interceptors,
}
if cfg != nil {
authorizer := auth.NewSecretAuthorizer(cfg.Secret)
@ -105,6 +108,11 @@ func (c *client) DeleteManifest(repository, reference string) (err error) {
}
func (c *client) do(req *http.Request) (*http.Response, error) {
for _, interceptor := range c.interceptors {
if err := interceptor.Intercept(req); err != nil {
return nil, err
}
}
req.Header.Set("User-Agent", UserAgent)
resp, err := c.client.Do(req)
if err != nil {