mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 10:15:35 +01:00
fix: reduce the high db cpu usage for tag retention (#17296)
1. Add two indexes to database migrations. 2. Skip refresh quota in middleware for requests from jobservice. 3. Refresh quota by self in the end of tag retention job. Closes: #14708 Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
parent
914d9a5526
commit
70a95a9696
@ -51,3 +51,7 @@ BEGIN
|
|||||||
END IF;
|
END IF;
|
||||||
END LOOP;
|
END LOOP;
|
||||||
END $$;
|
END $$;
|
||||||
|
|
||||||
|
/* Add indexes to improve the performance of tag retention */
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_artifact_blob_digest_blob ON artifact_blob (digest_blob);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_artifact_digest_project_id ON artifact (digest,project_id);
|
@ -163,7 +163,7 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
|
|||||||
return quotas, nil
|
return quotas, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
|
func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
|
||||||
f := func() error {
|
f := func() error {
|
||||||
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
|
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -202,6 +202,11 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
|
|||||||
log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err)
|
log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err)
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
// append for override default retry options
|
||||||
|
if len(retryOpts) > 0 {
|
||||||
|
options = append(options, retryOpts...)
|
||||||
|
}
|
||||||
|
|
||||||
return retry.Retry(f, options...)
|
return retry.Retry(f, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,7 +228,7 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
|
|||||||
return newUsed, err
|
return newUsed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation))
|
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
|
func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
package quota
|
package quota
|
||||||
|
|
||||||
|
import "github.com/goharbor/harbor/src/lib/retry"
|
||||||
|
|
||||||
// Option option for `Refresh` method of `Controller`
|
// Option option for `Refresh` method of `Controller`
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
@ -21,6 +23,8 @@ type Option func(*Options)
|
|||||||
type Options struct {
|
type Options struct {
|
||||||
IgnoreLimitation bool
|
IgnoreLimitation bool
|
||||||
WithReferenceObject bool
|
WithReferenceObject bool
|
||||||
|
// RetryOptions is the sets of options but for retry function.
|
||||||
|
RetryOptions []retry.Option
|
||||||
}
|
}
|
||||||
|
|
||||||
// IgnoreLimitation set IgnoreLimitation for the Options
|
// IgnoreLimitation set IgnoreLimitation for the Options
|
||||||
@ -37,6 +41,13 @@ func WithReferenceObject() func(*Options) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRetryOptions set RetryOptions to Options
|
||||||
|
func WithRetryOptions(retryOpts []retry.Option) func(*Options) {
|
||||||
|
return func(opts *Options) {
|
||||||
|
opts.RetryOptions = retryOpts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newOptions(options ...Option) *Options {
|
func newOptions(options ...Option) *Options {
|
||||||
opts := &Options{}
|
opts := &Options{}
|
||||||
for _, f := range options {
|
for _, f := range options {
|
||||||
|
@ -47,10 +47,10 @@ type ChartClient interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New returns an instance of the client which is a default implement for Client
|
// New returns an instance of the client which is a default implement for Client
|
||||||
func New(url string, httpclient *http.Client, authorizer modifier.Modifier) Client {
|
func New(url string, httpclient *http.Client, modifiers ...modifier.Modifier) Client {
|
||||||
return &client{
|
return &client{
|
||||||
url: url,
|
url: url,
|
||||||
httpclient: chttp.NewClient(httpclient, authorizer),
|
httpclient: chttp.NewClient(httpclient, modifiers...),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/http/modifier/auth"
|
"github.com/goharbor/harbor/src/common/http/modifier/auth"
|
||||||
"github.com/goharbor/harbor/src/jobservice/config"
|
"github.com/goharbor/harbor/src/jobservice/config"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/selector"
|
"github.com/goharbor/harbor/src/lib/selector"
|
||||||
"github.com/goharbor/harbor/src/pkg/clients/core"
|
"github.com/goharbor/harbor/src/pkg/clients/core"
|
||||||
)
|
)
|
||||||
@ -59,6 +60,17 @@ type Client interface {
|
|||||||
Delete(candidate *selector.Candidate) error
|
Delete(candidate *selector.Candidate) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type injectVendorType struct{}
|
||||||
|
|
||||||
|
// injectVendorType injects vendor type to request header.
|
||||||
|
func (i *injectVendorType) Modify(req *http.Request) error {
|
||||||
|
if req != nil {
|
||||||
|
req.Header.Set("VendorType", job.Retention)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewClient new a basic client
|
// NewClient new a basic client
|
||||||
func NewClient(client ...*http.Client) Client {
|
func NewClient(client ...*http.Client) Client {
|
||||||
var c *http.Client
|
var c *http.Client
|
||||||
@ -73,7 +85,7 @@ func NewClient(client ...*http.Client) Client {
|
|||||||
internalCoreURL := config.GetCoreURL()
|
internalCoreURL := config.GetCoreURL()
|
||||||
jobserviceSecret := config.GetAuthSecret()
|
jobserviceSecret := config.GetAuthSecret()
|
||||||
authorizer := auth.NewSecretAuthorizer(jobserviceSecret)
|
authorizer := auth.NewSecretAuthorizer(jobserviceSecret)
|
||||||
coreClient := core.New(internalCoreURL, c, authorizer)
|
coreClient := core.New(internalCoreURL, c, authorizer, &injectVendorType{})
|
||||||
|
|
||||||
return &basicClient{
|
return &basicClient{
|
||||||
internalCoreURL: internalCoreURL,
|
internalCoreURL: internalCoreURL,
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package dep
|
package dep
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -143,6 +144,17 @@ func (c *clientTestSuite) TestDelete() {
|
|||||||
require.NotNil(c.T(), err)
|
require.NotNil(c.T(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *clientTestSuite) TestInjectVendorType() {
|
||||||
|
injector := &injectVendorType{}
|
||||||
|
req, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
|
||||||
|
assert.NoError(c.T(), err)
|
||||||
|
assert.Equal(c.T(), "", req.Header.Get("VendorType"))
|
||||||
|
// after injecting should appear vendor type in header
|
||||||
|
err = injector.Modify(req)
|
||||||
|
assert.NoError(c.T(), err)
|
||||||
|
assert.Equal(c.T(), "RETENTION", req.Header.Get("VendorType"))
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientTestSuite(t *testing.T) {
|
func TestClientTestSuite(t *testing.T) {
|
||||||
suite.Run(t, new(clientTestSuite))
|
suite.Run(t, new(clientTestSuite))
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package retention
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
@ -23,9 +24,11 @@ import (
|
|||||||
|
|
||||||
"github.com/olekukonko/tablewriter"
|
"github.com/olekukonko/tablewriter"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/controller/quota"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
"github.com/goharbor/harbor/src/lib/retry"
|
||||||
"github.com/goharbor/harbor/src/lib/selector"
|
"github.com/goharbor/harbor/src/lib/selector"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||||
@ -116,6 +119,14 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
|
|||||||
return logError(myLogger, err)
|
return logError(myLogger, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// refreshQuota after the deleting candidates
|
||||||
|
if !isDryRun {
|
||||||
|
if err = refreshQuota(ctx.SystemContext(), results); err != nil {
|
||||||
|
// just log error if refresh quota error
|
||||||
|
myLogger.Errorf("Refresh quota error after deleting candidates, error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Log stage: results with table view
|
// Log stage: results with table view
|
||||||
logResults(myLogger, allCandidates, results)
|
logResults(myLogger, allCandidates, results)
|
||||||
|
|
||||||
@ -288,3 +299,29 @@ func getParamMeta(params job.Parameters) (*lwp.Metadata, error) {
|
|||||||
|
|
||||||
return meta, nil
|
return meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// refreshQuota refreshes quota by deleted results.
|
||||||
|
func refreshQuota(ctx context.Context, results []*selector.Result) error {
|
||||||
|
projects := make(map[int64]struct{})
|
||||||
|
for _, res := range results {
|
||||||
|
if res != nil && res.Target != nil {
|
||||||
|
projects[res.Target.NamespaceID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// refresh quota by project
|
||||||
|
for pid := range projects {
|
||||||
|
// retry options, enable backoff to reduce the db CPU resource usage.
|
||||||
|
opts := []retry.Option{
|
||||||
|
retry.Backoff(true),
|
||||||
|
// the interval value was determined based on experimental results as a way to achieve a faster total time with less cpu.
|
||||||
|
retry.InitialInterval(5 * time.Second),
|
||||||
|
retry.MaxInterval(10 * time.Second),
|
||||||
|
}
|
||||||
|
if err := quota.Ctl.Refresh(ctx, quota.ProjectReference, fmt.Sprintf("%d", pid), quota.WithRetryOptions(opts)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/pkg/quota"
|
"github.com/goharbor/harbor/src/pkg/quota"
|
||||||
"github.com/goharbor/harbor/src/pkg/quota/types"
|
"github.com/goharbor/harbor/src/pkg/quota/types"
|
||||||
"github.com/goharbor/harbor/src/server/middleware"
|
"github.com/goharbor/harbor/src/server/middleware"
|
||||||
|
"github.com/goharbor/harbor/src/server/middleware/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -219,6 +220,15 @@ func RefreshMiddleware(config RefreshConfig, skipers ...middleware.Skipper) func
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if the request is from jobservice and is retention job, ignore refresh as tag retention
|
||||||
|
// delete artifact, and if the number of artifact is large that will
|
||||||
|
// cause huge db CPU resource for refresh quota, so ignore here and let
|
||||||
|
// task call the refresh on the own initiative.
|
||||||
|
if security.FromJobservice(r) && security.FromJobRetention(r) {
|
||||||
|
logger.Debugf("quota is skipped for %s %s, because this request is from jobservice retention job", reference, referenceID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err = quotaController.Refresh(r.Context(), reference, referenceID, cq.IgnoreLimitation(config.IgnoreLimitation)); err != nil {
|
if err = quotaController.Refresh(r.Context(), reference, referenceID, cq.IgnoreLimitation(config.IgnoreLimitation)); err != nil {
|
||||||
logger.Errorf("refresh quota for %s %s failed, error: %v", reference, referenceID, err)
|
logger.Errorf("refresh quota for %s %s failed, error: %v", reference, referenceID, err)
|
||||||
|
|
||||||
|
@ -3,6 +3,10 @@ package security
|
|||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
commonsecret "github.com/goharbor/harbor/src/common/secret"
|
||||||
|
"github.com/goharbor/harbor/src/common/security"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
)
|
)
|
||||||
|
|
||||||
func bearerToken(req *http.Request) string {
|
func bearerToken(req *http.Request) string {
|
||||||
@ -16,3 +20,22 @@ func bearerToken(req *http.Request) string {
|
|||||||
}
|
}
|
||||||
return strings.TrimSpace(token[1])
|
return strings.TrimSpace(token[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FromJobservice detects whether this request is from jobservice.
|
||||||
|
func FromJobservice(req *http.Request) bool {
|
||||||
|
sc, ok := security.FromContext(req.Context())
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// check whether the user is jobservice user
|
||||||
|
return sc.GetUsername() == commonsecret.JobserviceUser
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromJobRetention detects whether this request is from tag retention job.
|
||||||
|
func FromJobRetention(req *http.Request) bool {
|
||||||
|
if req != nil && req.Header != nil {
|
||||||
|
return req.Header.Get("VendorType") == job.Retention
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -4,6 +4,11 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common/models"
|
||||||
|
"github.com/goharbor/harbor/src/common/security"
|
||||||
|
"github.com/goharbor/harbor/src/common/security/local"
|
||||||
|
securitysecret "github.com/goharbor/harbor/src/common/security/secret"
|
||||||
|
"github.com/goharbor/harbor/src/lib/config"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -38,3 +43,34 @@ func TestBearerToken(t *testing.T) {
|
|||||||
assert.Equal(t, c.token, bearerToken(c.request))
|
assert.Equal(t, c.token, bearerToken(c.request))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFromJobservice(t *testing.T) {
|
||||||
|
// no security ctx should return false
|
||||||
|
req1, _ := http.NewRequest(http.MethodHead, "/api", nil)
|
||||||
|
assert.False(t, FromJobservice(req1))
|
||||||
|
// other username should return false
|
||||||
|
req2, _ := http.NewRequest(http.MethodHead, "/api", nil)
|
||||||
|
secCtx1 := local.NewSecurityContext(&models.User{UserID: 1, Username: "test-user"})
|
||||||
|
req2 = req2.WithContext(security.NewContext(req2.Context(), secCtx1))
|
||||||
|
assert.False(t, FromJobservice(req2))
|
||||||
|
// secret ctx from jobservice should return true
|
||||||
|
req3, _ := http.NewRequest(http.MethodHead, "/api", nil)
|
||||||
|
config.Init()
|
||||||
|
secCtx2 := securitysecret.NewSecurityContext(config.JobserviceSecret(), config.SecretStore)
|
||||||
|
req3 = req3.WithContext(security.NewContext(req3.Context(), secCtx2))
|
||||||
|
assert.True(t, FromJobservice(req3))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFromJobRetention(t *testing.T) {
|
||||||
|
// return false if req is nil
|
||||||
|
assert.False(t, FromJobRetention(nil))
|
||||||
|
// return false if req has no header
|
||||||
|
req1, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, FromJobRetention(req1))
|
||||||
|
// return true if header has retention vendor type
|
||||||
|
req2, err := http.NewRequest("GET", "http://localhost:8080/api", nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
req2.Header.Set("VendorType", "RETENTION")
|
||||||
|
assert.True(t, FromJobRetention(req2))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user