mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-25 00:02:48 +01:00
refactor: implement a lock free quota (#15399)
Signed-off-by: He Weiwei <hweiwei@vmware.com>
This commit is contained in:
parent
51480b7ec4
commit
ecc1a04c92
@ -2,3 +2,7 @@
|
||||
DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true );
|
||||
|
||||
ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS speed_kb int;
|
||||
|
||||
/* add version fields for lock free quota */
|
||||
ALTER TABLE quota ADD COLUMN IF NOT EXISTS version bigint DEFAULT 0;
|
||||
ALTER TABLE quota_usage ADD COLUMN IF NOT EXISTS version bigint DEFAULT 0;
|
||||
|
@ -23,19 +23,17 @@ import (
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
redislib "github.com/goharbor/harbor/src/lib/redis"
|
||||
"github.com/goharbor/harbor/src/lib/retry"
|
||||
"github.com/goharbor/harbor/src/pkg/quota"
|
||||
"github.com/goharbor/harbor/src/pkg/quota/driver"
|
||||
"github.com/goharbor/harbor/src/pkg/quota/types"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
|
||||
// quota driver
|
||||
_ "github.com/goharbor/harbor/src/controller/quota/driver"
|
||||
)
|
||||
|
||||
var (
|
||||
// expire reserved resources when no actions on the key of the reserved resources in redis during 1 hour
|
||||
defaultReservedExpiration = time.Hour
|
||||
defaultRetryTimeout = time.Minute * 5
|
||||
)
|
||||
|
||||
var (
|
||||
@ -82,8 +80,7 @@ type Controller interface {
|
||||
// NewController creates an instance of the default quota controller
|
||||
func NewController() Controller {
|
||||
return &controller{
|
||||
reservedExpiration: defaultReservedExpiration,
|
||||
quotaMgr: quota.Mgr,
|
||||
quotaMgr: quota.Mgr,
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,107 +164,46 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
func (c *controller) getReservedResources(ctx context.Context, reference, referenceID string) (types.ResourceList, error) {
|
||||
conn := redislib.DefaultPool().Get()
|
||||
defer conn.Close()
|
||||
|
||||
key := reservedResourcesKey(reference, referenceID)
|
||||
|
||||
str, err := redis.String(conn.Do("GET", key))
|
||||
if err == redis.ErrNil {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return types.NewResourceList(str)
|
||||
}
|
||||
|
||||
func (c *controller) setReservedResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error {
|
||||
conn := redislib.DefaultPool().Get()
|
||||
defer conn.Close()
|
||||
|
||||
key := reservedResourcesKey(reference, referenceID)
|
||||
|
||||
reply, err := redis.String(conn.Do("SET", key, resources.String(), "EX", int64(c.reservedExpiration/time.Second)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reply != "OK" {
|
||||
return fmt.Errorf("bad reply value")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) reserveResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error {
|
||||
reserve := func(ctx context.Context) error {
|
||||
q, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID)
|
||||
func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
|
||||
f := func() error {
|
||||
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
|
||||
if err != nil {
|
||||
return err
|
||||
return retry.Abort(err)
|
||||
}
|
||||
|
||||
hardLimits, err := q.GetHard()
|
||||
if err != nil {
|
||||
return err
|
||||
return retry.Abort(err)
|
||||
}
|
||||
|
||||
used, err := q.GetUsed()
|
||||
if err != nil {
|
||||
return err
|
||||
return retry.Abort(err)
|
||||
}
|
||||
|
||||
reserved, err := c.getReservedResources(ctx, reference, referenceID)
|
||||
newUsed, err := op(hardLimits, used)
|
||||
if err != nil {
|
||||
log.G(ctx).Errorf("failed to get reserved resources for %s %s, error: %v", reference, referenceID, err)
|
||||
return err
|
||||
return retry.Abort(err)
|
||||
}
|
||||
|
||||
newReserved := types.Add(reserved, resources)
|
||||
q.SetUsed(newUsed)
|
||||
|
||||
if err := quota.IsSafe(hardLimits, types.Add(used, reserved), types.Add(used, newReserved), false); err != nil {
|
||||
return errors.DeniedError(err).WithMessage("Quota exceeded when processing the request of %v", err)
|
||||
err = c.quotaMgr.Update(ctx, q)
|
||||
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
|
||||
return retry.Abort(err)
|
||||
}
|
||||
|
||||
if err := c.setReservedResources(ctx, reference, referenceID, newReserved); err != nil {
|
||||
log.G(ctx).Errorf("failed to set reserved resources for %s %s, error: %v", reference, referenceID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
return orm.WithTransaction(reserve)(ctx)
|
||||
}
|
||||
|
||||
func (c *controller) unreserveResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error {
|
||||
unreserve := func(ctx context.Context) error {
|
||||
if _, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reserved, err := c.getReservedResources(ctx, reference, referenceID)
|
||||
if err != nil {
|
||||
log.G(ctx).Errorf("failed to get reserved resources for %s %s, error: %v", reference, referenceID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
newReserved := types.Subtract(reserved, resources)
|
||||
// ensure that new used is never negative
|
||||
if negativeUsed := types.IsNegative(newReserved); len(negativeUsed) > 0 {
|
||||
return fmt.Errorf("reserved resources is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed))
|
||||
}
|
||||
|
||||
if err := c.setReservedResources(ctx, reference, referenceID, newReserved); err != nil {
|
||||
log.G(ctx).Errorf("failed to set reserved resources for %s %s, error: %v", reference, referenceID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
options := []retry.Option{
|
||||
retry.Timeout(defaultRetryTimeout),
|
||||
retry.Backoff(false),
|
||||
retry.Callback(func(err error, sleep time.Duration) {
|
||||
log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err)
|
||||
}),
|
||||
}
|
||||
|
||||
return orm.WithTransaction(unreserve)(ctx)
|
||||
return retry.Retry(f, options...)
|
||||
}
|
||||
|
||||
func (c *controller) Refresh(ctx context.Context, reference, referenceID string, options ...Option) error {
|
||||
@ -278,44 +214,17 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
|
||||
|
||||
opts := newOptions(options...)
|
||||
|
||||
refresh := func(ctx context.Context) error {
|
||||
q, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hardLimits, err := q.GetHard()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
used, err := q.GetUsed()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
calculateUsage := func() (types.ResourceList, error) {
|
||||
newUsed, err := driver.CalculateUsage(ctx, referenceID)
|
||||
if err != nil {
|
||||
log.G(ctx).Errorf("failed to calculate quota usage for %s %s, error: %v", reference, referenceID, err)
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ensure that new used is never negative
|
||||
if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 {
|
||||
return fmt.Errorf("quota usage is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed))
|
||||
}
|
||||
|
||||
if err := quota.IsSafe(hardLimits, used, newUsed, opts.IgnoreLimitation); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
q.SetUsed(newUsed)
|
||||
q.UpdateTime = time.Now()
|
||||
|
||||
return c.quotaMgr.Update(ctx, q)
|
||||
return newUsed, err
|
||||
}
|
||||
|
||||
return orm.WithTransaction(refresh)(ctx)
|
||||
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation))
|
||||
}
|
||||
|
||||
func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
|
||||
@ -323,28 +232,26 @@ func (c *controller) Request(ctx context.Context, reference, referenceID string,
|
||||
return f()
|
||||
}
|
||||
|
||||
if err := c.reserveResources(ctx, reference, referenceID, resources); err != nil {
|
||||
if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources)); err != nil {
|
||||
log.G(ctx).Errorf("reserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := c.unreserveResources(ctx, reference, referenceID, resources); err != nil {
|
||||
// ignore this error because reserved resources will be expired
|
||||
// when no actions on the key of the reserved resources in redis during sometimes
|
||||
log.G(ctx).Warningf("unreserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err)
|
||||
err := f()
|
||||
|
||||
if err != nil {
|
||||
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources)); er != nil {
|
||||
// ignore this error, the quota usage will be correct when users do operations which will call refresh quota
|
||||
log.G(ctx).Warningf("rollback resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, er)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := f(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.Refresh(ctx, reference, referenceID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *controller) Update(ctx context.Context, u *quota.Quota) error {
|
||||
update := func(ctx context.Context) error {
|
||||
q, err := c.quotaMgr.GetByRefForUpdate(ctx, u.Reference, u.ReferenceID)
|
||||
f := func() error {
|
||||
q, err := c.quotaMgr.GetByRef(ctx, u.Reference, u.ReferenceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -361,11 +268,15 @@ func (c *controller) Update(ctx context.Context, u *quota.Quota) error {
|
||||
}
|
||||
}
|
||||
|
||||
q.UpdateTime = time.Now()
|
||||
return c.quotaMgr.Update(ctx, q)
|
||||
}
|
||||
|
||||
return orm.WithTransaction(update)(ctx)
|
||||
options := []retry.Option{
|
||||
retry.Timeout(defaultRetryTimeout),
|
||||
retry.Backoff(false),
|
||||
}
|
||||
|
||||
return retry.Retry(f, options...)
|
||||
}
|
||||
|
||||
// Driver returns quota driver for the reference
|
||||
@ -388,6 +299,46 @@ func Validate(ctx context.Context, reference string, hardLimits types.ResourceLi
|
||||
return d.Validate(hardLimits)
|
||||
}
|
||||
|
||||
func reservedResourcesKey(reference, referenceID string) string {
|
||||
return fmt.Sprintf("quota:%s:%s:reserved", reference, referenceID)
|
||||
func reserveResources(resources types.ResourceList) func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
return func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
newUsed := types.Add(used, resources)
|
||||
|
||||
if err := quota.IsSafe(hardLimits, used, newUsed, false); err != nil {
|
||||
return nil, errors.DeniedError(err).WithMessage("Quota exceeded when processing the request of %v", err)
|
||||
}
|
||||
|
||||
return newUsed, nil
|
||||
}
|
||||
}
|
||||
|
||||
func rollbackResources(resources types.ResourceList) func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
return func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
newUsed := types.Subtract(used, resources)
|
||||
// ensure that new used is never negative
|
||||
if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 {
|
||||
return nil, fmt.Errorf("resources is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed))
|
||||
}
|
||||
|
||||
return newUsed, nil
|
||||
}
|
||||
}
|
||||
|
||||
func refreshResources(calculateUsage func() (types.ResourceList, error), ignoreLimitation bool) func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
return func(hardLimits, used types.ResourceList) (types.ResourceList, error) {
|
||||
newUsed, err := calculateUsage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ensure that new used is never negative
|
||||
if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 {
|
||||
return nil, fmt.Errorf("quota usage is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed))
|
||||
}
|
||||
|
||||
if err := quota.IsSafe(hardLimits, used, newUsed, ignoreLimitation); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newUsed, nil
|
||||
}
|
||||
}
|
||||
|
@ -17,9 +17,7 @@ package quota
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/pkg/quota"
|
||||
@ -51,81 +49,20 @@ func (suite *ControllerTestSuite) SetupTest() {
|
||||
driver.Register(suite.reference, suite.driver)
|
||||
|
||||
suite.quotaMgr = "atesting.Manager{}
|
||||
suite.ctl = &controller{quotaMgr: suite.quotaMgr, reservedExpiration: defaultReservedExpiration}
|
||||
suite.ctl = &controller{quotaMgr: suite.quotaMgr}
|
||||
|
||||
hardLimits := types.ResourceList{types.ResourceStorage: 100}
|
||||
suite.quota = "a.Quota{Hard: hardLimits.String(), Used: types.Zero(hardLimits).String()}
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) PrepareForUpdate(q *quota.Quota, newUsage interface{}) {
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(q, nil)
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRef").Return(q, nil)
|
||||
|
||||
mock.OnAnything(suite.driver, "CalculateUsage").Return(newUsage, nil)
|
||||
|
||||
mock.OnAnything(suite.quotaMgr, "Update").Return(nil)
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) TestGetReservedResources() {
|
||||
reservedExpiration := time.Second * 3
|
||||
ctl := &controller{reservedExpiration: reservedExpiration}
|
||||
|
||||
reference, referenceID := "reference", uuid.New().String()
|
||||
|
||||
{
|
||||
resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID)
|
||||
suite.Nil(err)
|
||||
suite.Len(resources, 0)
|
||||
}
|
||||
|
||||
suite.Nil(ctl.setReservedResources(context.TODO(), reference, referenceID, types.ResourceList{types.ResourceStorage: 100}))
|
||||
|
||||
{
|
||||
resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID)
|
||||
suite.Nil(err)
|
||||
suite.Len(resources, 1)
|
||||
}
|
||||
|
||||
time.Sleep(reservedExpiration * 2)
|
||||
|
||||
{
|
||||
resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID)
|
||||
suite.Nil(err)
|
||||
suite.Len(resources, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) TestReserveResources() {
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(suite.quota, nil)
|
||||
|
||||
ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{})
|
||||
referenceID := uuid.New().String()
|
||||
resources := types.ResourceList{types.ResourceStorage: 100}
|
||||
|
||||
ctl := suite.ctl.(*controller)
|
||||
|
||||
suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources))
|
||||
|
||||
suite.Error(ctl.reserveResources(ctx, suite.reference, referenceID, resources))
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) TestUnreserveResources() {
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(suite.quota, nil)
|
||||
|
||||
ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{})
|
||||
referenceID := uuid.New().String()
|
||||
resources := types.ResourceList{types.ResourceStorage: 100}
|
||||
|
||||
ctl := suite.ctl.(*controller)
|
||||
|
||||
suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources))
|
||||
|
||||
suite.Error(ctl.reserveResources(ctx, suite.reference, referenceID, resources))
|
||||
|
||||
suite.Nil(ctl.unreserveResources(ctx, suite.reference, referenceID, resources))
|
||||
|
||||
suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources))
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) TestRefresh() {
|
||||
suite.PrepareForUpdate(suite.quota, types.ResourceList{types.ResourceStorage: 0})
|
||||
|
||||
@ -174,6 +111,7 @@ func (suite *ControllerTestSuite) TestNoResourcesRequest() {
|
||||
|
||||
suite.Nil(suite.ctl.Request(ctx, suite.reference, referenceID, nil, func() error { return nil }))
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) TestRequest() {
|
||||
suite.PrepareForUpdate(suite.quota, nil)
|
||||
|
||||
@ -207,25 +145,3 @@ func (suite *ControllerTestSuite) TestRequestFunctionFailed() {
|
||||
func TestControllerTestSuite(t *testing.T) {
|
||||
suite.Run(t, &ControllerTestSuite{})
|
||||
}
|
||||
|
||||
func BenchmarkGetReservedResources(b *testing.B) {
|
||||
ctl := &controller{reservedExpiration: defaultReservedExpiration}
|
||||
|
||||
ctx := context.TODO()
|
||||
reference, referenceID := "reference", uuid.New().String()
|
||||
ctl.setReservedResources(ctx, reference, referenceID, types.ResourceList{types.ResourceStorage: 100})
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
ctl.getReservedResources(ctx, reference, referenceID)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSetReservedResources(b *testing.B) {
|
||||
ctl := &controller{reservedExpiration: defaultReservedExpiration}
|
||||
|
||||
ctx := context.TODO()
|
||||
for i := 0; i < b.N; i++ {
|
||||
s := strconv.Itoa(i)
|
||||
ctl.setReservedResources(ctx, "reference"+s, s, types.ResourceList{types.ResourceStorage: 100})
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
proModels "github.com/goharbor/harbor/src/pkg/project/models"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
@ -24,6 +23,7 @@ import (
|
||||
"github.com/goharbor/harbor/src/controller/project"
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/project/models"
|
||||
"github.com/goharbor/harbor/src/pkg/quota"
|
||||
"github.com/goharbor/harbor/src/pkg/quota/driver"
|
||||
"github.com/goharbor/harbor/src/pkg/quota/types"
|
||||
@ -76,21 +76,21 @@ func (suite *RefreshForProjectsTestSuite) TestRefreshForProjects() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
startProjectID := rand.Int63()
|
||||
var firstPageProjects, secondPageProjects []*proModels.Project
|
||||
var firstPageProjects, secondPageProjects []*models.Project
|
||||
for i := 0; i < 50; i++ {
|
||||
firstPageProjects = append(firstPageProjects, &proModels.Project{
|
||||
firstPageProjects = append(firstPageProjects, &models.Project{
|
||||
ProjectID: startProjectID + int64(i),
|
||||
})
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
secondPageProjects = append(secondPageProjects, &proModels.Project{
|
||||
secondPageProjects = append(secondPageProjects, &models.Project{
|
||||
ProjectID: startProjectID + 50 + int64(i),
|
||||
})
|
||||
}
|
||||
|
||||
page := 1
|
||||
mock.OnAnything(suite.projectCtl, "List").Return(func(context.Context, *q.Query, ...project.Option) []*proModels.Project {
|
||||
mock.OnAnything(suite.projectCtl, "List").Return(func(context.Context, *q.Query, ...project.Option) []*models.Project {
|
||||
defer func() {
|
||||
page++
|
||||
}()
|
||||
@ -109,7 +109,6 @@ func (suite *RefreshForProjectsTestSuite) TestRefreshForProjects() {
|
||||
q.SetUsed(types.ResourceList{types.ResourceStorage: 0})
|
||||
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRef").Return(q, nil)
|
||||
mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(q, nil)
|
||||
mock.OnAnything(suite.quotaMgr, "Update").Return(nil)
|
||||
mock.OnAnything(suite.driver, "CalculateUsage").Return(types.ResourceList{types.ResourceStorage: 1}, nil)
|
||||
|
||||
|
@ -23,6 +23,9 @@ import (
|
||||
var (
|
||||
// ErrNoRows error from the beego orm
|
||||
ErrNoRows = orm.ErrNoRows
|
||||
|
||||
// ErrOptimisticLock error when update object failed
|
||||
ErrOptimisticLock = errors.New("the object has been modified; please apply your changes to the latest version and try again")
|
||||
)
|
||||
|
||||
// WrapNotFoundError wrap error as NotFoundError when it is orm.ErrNoRows otherwise return err
|
||||
|
@ -15,6 +15,7 @@
|
||||
package retry
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
@ -31,12 +32,30 @@ var (
|
||||
ErrRetryTimeout = errors.New("retry timeout")
|
||||
)
|
||||
|
||||
type abort struct {
|
||||
cause error
|
||||
}
|
||||
|
||||
func (a *abort) Error() string {
|
||||
if a.cause != nil {
|
||||
return fmt.Sprintf("retry abort, error: %v", a.cause)
|
||||
}
|
||||
|
||||
return "retry abort"
|
||||
}
|
||||
|
||||
// Abort wrap err to stop the Retry function
|
||||
func Abort(err error) error {
|
||||
return &abort{cause: err}
|
||||
}
|
||||
|
||||
// Options options for the retry functions
|
||||
type Options struct {
|
||||
InitialInterval time.Duration // the initial interval for retring after failure, default 100 milliseconds
|
||||
MaxInterval time.Duration // the max interval for retring after failure, default 1 second
|
||||
Timeout time.Duration // the total time before returning if something is wrong, default 1 minute
|
||||
Callback func(err error, sleep time.Duration) // the callback function for Retry when the f called failed
|
||||
Backoff bool
|
||||
}
|
||||
|
||||
// Option ...
|
||||
@ -70,12 +89,21 @@ func Callback(callback func(err error, sleep time.Duration)) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Backoff set backoff
|
||||
func Backoff(backoff bool) Option {
|
||||
return func(opts *Options) {
|
||||
opts.Backoff = backoff
|
||||
}
|
||||
}
|
||||
|
||||
// Retry retry until f run successfully or timeout
|
||||
//
|
||||
// NOTE: This function will use exponential backoff and jitter for retrying, see
|
||||
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information
|
||||
func Retry(f func() error, options ...Option) error {
|
||||
opts := &Options{}
|
||||
opts := &Options{
|
||||
Backoff: true,
|
||||
}
|
||||
|
||||
for _, o := range options {
|
||||
o(opts)
|
||||
@ -93,11 +121,15 @@ func Retry(f func() error, options ...Option) error {
|
||||
opts.Timeout = time.Minute
|
||||
}
|
||||
|
||||
b := &backoff.Backoff{
|
||||
Min: opts.InitialInterval,
|
||||
Max: opts.MaxInterval,
|
||||
Factor: 2,
|
||||
Jitter: true,
|
||||
var b *backoff.Backoff
|
||||
|
||||
if opts.Backoff {
|
||||
b = &backoff.Backoff{
|
||||
Min: opts.InitialInterval,
|
||||
Max: opts.MaxInterval,
|
||||
Factor: 2,
|
||||
Jitter: true,
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
@ -113,7 +145,16 @@ func Retry(f func() error, options ...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
sleep := b.Duration()
|
||||
var ab *abort
|
||||
if errors.As(err, &ab) {
|
||||
return ab.cause
|
||||
}
|
||||
|
||||
var sleep time.Duration
|
||||
if opts.Backoff {
|
||||
sleep = b.Duration()
|
||||
}
|
||||
|
||||
if opts.Callback != nil {
|
||||
opts.Callback(err, sleep)
|
||||
}
|
||||
|
@ -22,7 +22,17 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestRetryUntil(t *testing.T) {
|
||||
func TestAbort(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
e1 := Abort(nil)
|
||||
assert.Equal("retry abort", e1.Error())
|
||||
|
||||
e2 := Abort(fmt.Errorf("failed to call func"))
|
||||
assert.Equal("retry abort, error: failed to call func", e2.Error())
|
||||
}
|
||||
|
||||
func TestRetry(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
i := 0
|
||||
@ -68,4 +78,16 @@ func TestRetryUntil(t *testing.T) {
|
||||
|
||||
assert.Error(err)
|
||||
assert.Equal("retry timeout: always failed", err.Error())
|
||||
|
||||
i = 0
|
||||
f4 := func() error {
|
||||
if i == 3 {
|
||||
return Abort(fmt.Errorf("abort"))
|
||||
}
|
||||
|
||||
i++
|
||||
return fmt.Errorf("error")
|
||||
}
|
||||
assert.Error(Retry(f4, InitialInterval(time.Second), MaxInterval(time.Second), Timeout(time.Second*5)))
|
||||
assert.LessOrEqual(i, 3)
|
||||
}
|
||||
|
@ -16,7 +16,9 @@ package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
@ -42,9 +44,6 @@ type DAO interface {
|
||||
// GetByRef returns quota by reference object
|
||||
GetByRef(ctx context.Context, reference, referenceID string) (*models.Quota, error)
|
||||
|
||||
// GetByRefForUpdate get quota by reference object and lock it for update
|
||||
GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*models.Quota, error)
|
||||
|
||||
// Update update quota
|
||||
Update(ctx context.Context, quota *models.Quota) error
|
||||
|
||||
@ -171,47 +170,57 @@ func (d *dao) GetByRef(ctx context.Context, reference, referenceID string) (*mod
|
||||
return toQuota(quota, usage), nil
|
||||
}
|
||||
|
||||
func (d *dao) GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*models.Quota, error) {
|
||||
o, err := orm.FromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
quota := &Quota{Reference: reference, ReferenceID: referenceID}
|
||||
if err := o.ReadForUpdate(quota, "reference", "reference_id"); err != nil {
|
||||
return nil, orm.WrapNotFoundError(err, "quota not found for (%s, %s)", reference, referenceID)
|
||||
}
|
||||
|
||||
usage := &QuotaUsage{Reference: reference, ReferenceID: referenceID}
|
||||
if err := o.ReadForUpdate(usage, "reference", "reference_id"); err != nil {
|
||||
return nil, orm.WrapNotFoundError(err, "quota usage not found for (%s, %s)", reference, referenceID)
|
||||
}
|
||||
|
||||
return toQuota(quota, usage), nil
|
||||
}
|
||||
|
||||
func (d *dao) Update(ctx context.Context, quota *models.Quota) error {
|
||||
o, err := orm.FromContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if quota.UsedChanged {
|
||||
usage := &QuotaUsage{ID: quota.ID, Used: quota.Used, UpdateTime: quota.UpdateTime}
|
||||
if quota.UsedChanged && quota.HardChanged {
|
||||
return errors.New("not support change both hard and used of the quota")
|
||||
}
|
||||
|
||||
_, err := o.Update(usage, "used", "update_time")
|
||||
if err != nil {
|
||||
return err
|
||||
if !quota.UsedChanged && !quota.HardChanged {
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
sql string
|
||||
params []interface{}
|
||||
)
|
||||
|
||||
if quota.UsedChanged {
|
||||
sql = "UPDATE quota_usage SET used = ?, update_time = ?, version = ? WHERE id = ? AND version = ?"
|
||||
params = []interface{}{
|
||||
quota.Used,
|
||||
time.Now(),
|
||||
getVersion(quota.UsedVersion),
|
||||
quota.ID,
|
||||
quota.UsedVersion,
|
||||
}
|
||||
} else {
|
||||
sql = "UPDATE quota SET hard = ?, update_time = ?, version = ? WHERE id = ? AND version = ?"
|
||||
params = []interface{}{
|
||||
quota.Hard,
|
||||
time.Now(),
|
||||
getVersion(quota.HardVersion),
|
||||
quota.ID,
|
||||
quota.HardVersion,
|
||||
}
|
||||
}
|
||||
|
||||
if quota.HardChanged {
|
||||
md := &Quota{ID: quota.ID, Hard: quota.Hard, UpdateTime: quota.UpdateTime}
|
||||
result, err := o.Raw(sql, params...).Exec()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := o.Update(md, "hard", "update_time")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
num, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if num == 0 {
|
||||
return orm.ErrOptimisticLock
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -231,7 +240,9 @@ SELECT
|
||||
a.reference,
|
||||
a.reference_id,
|
||||
a.hard,
|
||||
a.version as hard_version,
|
||||
b.used,
|
||||
b.version as used_version,
|
||||
b.creation_time,
|
||||
b.update_time
|
||||
FROM
|
||||
@ -270,6 +281,16 @@ func toQuota(quota *Quota, usage *QuotaUsage) *models.Quota {
|
||||
ReferenceID: quota.ReferenceID,
|
||||
Hard: quota.Hard,
|
||||
Used: usage.Used,
|
||||
HardVersion: quota.Version,
|
||||
UsedVersion: usage.Version,
|
||||
CreationTime: quota.CreationTime,
|
||||
}
|
||||
}
|
||||
|
||||
func getVersion(current int64) int64 {
|
||||
if math.MaxInt64 == current {
|
||||
return 0
|
||||
}
|
||||
|
||||
return current + 1
|
||||
}
|
||||
|
@ -15,11 +15,8 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/orm"
|
||||
"github.com/goharbor/harbor/src/lib/q"
|
||||
"github.com/goharbor/harbor/src/pkg/quota/types"
|
||||
htesting "github.com/goharbor/harbor/src/testing"
|
||||
@ -136,48 +133,6 @@ func (suite *DaoTestSuite) TestGetByRef() {
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *DaoTestSuite) TestGetByRefForUpdate() {
|
||||
hardLimits := types.ResourceList{types.ResourceStorage: 100}
|
||||
usage := types.ResourceList{types.ResourceStorage: 0}
|
||||
|
||||
reference, referenceID := "project", "5"
|
||||
id, err := suite.dao.Create(suite.Context(), reference, referenceID, hardLimits, usage)
|
||||
suite.Nil(err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
count := int64(10)
|
||||
|
||||
for i := int64(0); i < count; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
f := func(ctx context.Context) error {
|
||||
q, err := suite.dao.GetByRefForUpdate(ctx, reference, referenceID)
|
||||
suite.Nil(err)
|
||||
|
||||
used, _ := q.GetUsed()
|
||||
used[types.ResourceStorage]++
|
||||
q.SetUsed(used)
|
||||
|
||||
suite.dao.Update(ctx, q)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
orm.WithTransaction(f)(suite.Context())
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
{
|
||||
q, err := suite.dao.Get(suite.Context(), id)
|
||||
suite.Nil(err)
|
||||
used, _ := q.GetUsed()
|
||||
suite.Equal(count, used[types.ResourceStorage])
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *DaoTestSuite) TestUpdate() {
|
||||
hardLimits := types.ResourceList{types.ResourceStorage: 100}
|
||||
usage := types.ResourceList{types.ResourceStorage: 0}
|
||||
@ -191,7 +146,16 @@ func (suite *DaoTestSuite) TestUpdate() {
|
||||
{
|
||||
q, err := suite.dao.Get(suite.Context(), id)
|
||||
if suite.Nil(err) {
|
||||
q.SetHard(newHardLimits).SetUsed(newUsage)
|
||||
q.SetHard(newHardLimits)
|
||||
|
||||
suite.Nil(suite.dao.Update(suite.Context(), q))
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
q, err := suite.dao.Get(suite.Context(), id)
|
||||
if suite.Nil(err) {
|
||||
q.SetUsed(newUsage)
|
||||
|
||||
suite.Nil(suite.dao.Update(suite.Context(), q))
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ type Quota struct {
|
||||
Hard string `orm:"column(hard);type(jsonb)" json:"-"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
Version int64 `orm:"column(version)" json:"-"`
|
||||
}
|
||||
|
||||
// TableName returns table name for orm
|
||||
@ -48,6 +49,7 @@ type QuotaUsage struct {
|
||||
Used string `orm:"column(used);type(jsonb)" json:"-"`
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
Version int64 `orm:"column(version)" json:"-"`
|
||||
}
|
||||
|
||||
// TableName returns table name for orm
|
||||
|
@ -44,9 +44,6 @@ type Manager interface {
|
||||
// GetByRef returns quota by reference object
|
||||
GetByRef(ctx context.Context, reference, referenceID string) (*Quota, error)
|
||||
|
||||
// GetByRefForUpdate returns quota by reference and reference id for update
|
||||
GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*Quota, error)
|
||||
|
||||
// Update update quota
|
||||
Update(ctx context.Context, quota *Quota) error
|
||||
|
||||
@ -102,16 +99,8 @@ func (m *manager) GetByRef(ctx context.Context, reference, referenceID string) (
|
||||
return m.dao.GetByRef(ctx, reference, referenceID)
|
||||
}
|
||||
|
||||
func (m *manager) GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*Quota, error) {
|
||||
return m.dao.GetByRefForUpdate(ctx, reference, referenceID)
|
||||
}
|
||||
|
||||
func (m *manager) Update(ctx context.Context, q *Quota) error {
|
||||
h := func(ctx context.Context) error {
|
||||
return m.dao.Update(ctx, q)
|
||||
}
|
||||
|
||||
return orm.WithTransaction(h)(ctx)
|
||||
return m.dao.Update(ctx, q)
|
||||
}
|
||||
|
||||
func (m *manager) List(ctx context.Context, query *q.Query) ([]*Quota, error) {
|
||||
|
@ -34,6 +34,9 @@ type Quota struct {
|
||||
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
|
||||
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
|
||||
|
||||
HardVersion int64 `orm:"column(hard_version)" json:"-"`
|
||||
UsedVersion int64 `orm:"column(used_version)" json:"-"`
|
||||
|
||||
HardChanged bool `orm:"-" json:"-"`
|
||||
UsedChanged bool `orm:"-" json:"-"`
|
||||
}
|
||||
|
@ -127,29 +127,6 @@ func (_m *Manager) GetByRef(ctx context.Context, reference string, referenceID s
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetByRefForUpdate provides a mock function with given fields: ctx, reference, referenceID
|
||||
func (_m *Manager) GetByRefForUpdate(ctx context.Context, reference string, referenceID string) (*models.Quota, error) {
|
||||
ret := _m.Called(ctx, reference, referenceID)
|
||||
|
||||
var r0 *models.Quota
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string) *models.Quota); ok {
|
||||
r0 = rf(ctx, reference, referenceID)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*models.Quota)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
|
||||
r1 = rf(ctx, reference, referenceID)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// List provides a mock function with given fields: ctx, query
|
||||
func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*models.Quota, error) {
|
||||
ret := _m.Called(ctx, query)
|
||||
|
Loading…
Reference in New Issue
Block a user