diff --git a/make/migrations/postgresql/0070_2.4.0_schema.up.sql b/make/migrations/postgresql/0070_2.4.0_schema.up.sql index 7eb11a1a2..7a6b0e497 100644 --- a/make/migrations/postgresql/0070_2.4.0_schema.up.sql +++ b/make/migrations/postgresql/0070_2.4.0_schema.up.sql @@ -2,3 +2,7 @@ DELETE FROM project_member pm WHERE pm.entity_type = 'u' AND EXISTS (SELECT NULL FROM harbor_user u WHERE pm.entity_id = u.user_id AND u.deleted = true ); ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS speed_kb int; + +/* add version fields for lock free quota */ +ALTER TABLE quota ADD COLUMN IF NOT EXISTS version bigint DEFAULT 0; +ALTER TABLE quota_usage ADD COLUMN IF NOT EXISTS version bigint DEFAULT 0; diff --git a/src/controller/quota/controller.go b/src/controller/quota/controller.go index 3260166eb..4e93a9bf7 100644 --- a/src/controller/quota/controller.go +++ b/src/controller/quota/controller.go @@ -23,19 +23,17 @@ import ( "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" - redislib "github.com/goharbor/harbor/src/lib/redis" + "github.com/goharbor/harbor/src/lib/retry" "github.com/goharbor/harbor/src/pkg/quota" "github.com/goharbor/harbor/src/pkg/quota/driver" "github.com/goharbor/harbor/src/pkg/quota/types" - "github.com/gomodule/redigo/redis" // quota driver _ "github.com/goharbor/harbor/src/controller/quota/driver" ) var ( - // expire reserved resources when no actions on the key of the reserved resources in redis during 1 hour - defaultReservedExpiration = time.Hour + defaultRetryTimeout = time.Minute * 5 ) var ( @@ -82,8 +80,7 @@ type Controller interface { // NewController creates an instance of the default quota controller func NewController() Controller { return &controller{ - reservedExpiration: defaultReservedExpiration, - quotaMgr: quota.Mgr, + quotaMgr: quota.Mgr, } } @@ -167,107 +164,46 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option return quotas, nil } -func (c *controller) getReservedResources(ctx context.Context, reference, referenceID string) (types.ResourceList, error) { - conn := redislib.DefaultPool().Get() - defer conn.Close() - - key := reservedResourcesKey(reference, referenceID) - - str, err := redis.String(conn.Do("GET", key)) - if err == redis.ErrNil { - return nil, nil - } else if err != nil { - return nil, err - } - - return types.NewResourceList(str) -} - -func (c *controller) setReservedResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error { - conn := redislib.DefaultPool().Get() - defer conn.Close() - - key := reservedResourcesKey(reference, referenceID) - - reply, err := redis.String(conn.Do("SET", key, resources.String(), "EX", int64(c.reservedExpiration/time.Second))) - if err != nil { - return err - } - - if reply != "OK" { - return fmt.Errorf("bad reply value") - } - - return nil -} - -func (c *controller) reserveResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error { - reserve := func(ctx context.Context) error { - q, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID) +func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error { + f := func() error { + q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID) if err != nil { - return err + return retry.Abort(err) } hardLimits, err := q.GetHard() if err != nil { - return err + return retry.Abort(err) } used, err := q.GetUsed() if err != nil { - return err + return retry.Abort(err) } - reserved, err := c.getReservedResources(ctx, reference, referenceID) + newUsed, err := op(hardLimits, used) if err != nil { - log.G(ctx).Errorf("failed to get reserved resources for %s %s, error: %v", reference, referenceID, err) - return err + return retry.Abort(err) } - newReserved := types.Add(reserved, resources) + q.SetUsed(newUsed) - if err := quota.IsSafe(hardLimits, types.Add(used, reserved), types.Add(used, newReserved), false); err != nil { - return errors.DeniedError(err).WithMessage("Quota exceeded when processing the request of %v", err) + err = c.quotaMgr.Update(ctx, q) + if err != nil && !errors.Is(err, orm.ErrOptimisticLock) { + return retry.Abort(err) } - if err := c.setReservedResources(ctx, reference, referenceID, newReserved); err != nil { - log.G(ctx).Errorf("failed to set reserved resources for %s %s, error: %v", reference, referenceID, err) - return err - } - - return nil + return err } - return orm.WithTransaction(reserve)(ctx) -} - -func (c *controller) unreserveResources(ctx context.Context, reference, referenceID string, resources types.ResourceList) error { - unreserve := func(ctx context.Context) error { - if _, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID); err != nil { - return err - } - - reserved, err := c.getReservedResources(ctx, reference, referenceID) - if err != nil { - log.G(ctx).Errorf("failed to get reserved resources for %s %s, error: %v", reference, referenceID, err) - return err - } - - newReserved := types.Subtract(reserved, resources) - // ensure that new used is never negative - if negativeUsed := types.IsNegative(newReserved); len(negativeUsed) > 0 { - return fmt.Errorf("reserved resources is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed)) - } - - if err := c.setReservedResources(ctx, reference, referenceID, newReserved); err != nil { - log.G(ctx).Errorf("failed to set reserved resources for %s %s, error: %v", reference, referenceID, err) - return err - } - - return nil + options := []retry.Option{ + retry.Timeout(defaultRetryTimeout), + retry.Backoff(false), + retry.Callback(func(err error, sleep time.Duration) { + log.G(ctx).Debugf("failed to update the quota usage for %s %s, error: %v", reference, referenceID, err) + }), } - - return orm.WithTransaction(unreserve)(ctx) + return retry.Retry(f, options...) } func (c *controller) Refresh(ctx context.Context, reference, referenceID string, options ...Option) error { @@ -278,44 +214,17 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string, opts := newOptions(options...) - refresh := func(ctx context.Context) error { - q, err := c.quotaMgr.GetByRefForUpdate(ctx, reference, referenceID) - if err != nil { - return err - } - - hardLimits, err := q.GetHard() - if err != nil { - return err - } - - used, err := q.GetUsed() - if err != nil { - return err - } - + calculateUsage := func() (types.ResourceList, error) { newUsed, err := driver.CalculateUsage(ctx, referenceID) if err != nil { log.G(ctx).Errorf("failed to calculate quota usage for %s %s, error: %v", reference, referenceID, err) - return err + return nil, err } - // ensure that new used is never negative - if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 { - return fmt.Errorf("quota usage is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed)) - } - - if err := quota.IsSafe(hardLimits, used, newUsed, opts.IgnoreLimitation); err != nil { - return err - } - - q.SetUsed(newUsed) - q.UpdateTime = time.Now() - - return c.quotaMgr.Update(ctx, q) + return newUsed, err } - return orm.WithTransaction(refresh)(ctx) + return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation)) } func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error { @@ -323,28 +232,26 @@ func (c *controller) Request(ctx context.Context, reference, referenceID string, return f() } - if err := c.reserveResources(ctx, reference, referenceID, resources); err != nil { + if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources)); err != nil { + log.G(ctx).Errorf("reserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err) return err } - defer func() { - if err := c.unreserveResources(ctx, reference, referenceID, resources); err != nil { - // ignore this error because reserved resources will be expired - // when no actions on the key of the reserved resources in redis during sometimes - log.G(ctx).Warningf("unreserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err) + err := f() + + if err != nil { + if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources)); er != nil { + // ignore this error, the quota usage will be correct when users do operations which will call refresh quota + log.G(ctx).Warningf("rollback resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, er) } - }() - - if err := f(); err != nil { - return err } - return c.Refresh(ctx, reference, referenceID) + return err } func (c *controller) Update(ctx context.Context, u *quota.Quota) error { - update := func(ctx context.Context) error { - q, err := c.quotaMgr.GetByRefForUpdate(ctx, u.Reference, u.ReferenceID) + f := func() error { + q, err := c.quotaMgr.GetByRef(ctx, u.Reference, u.ReferenceID) if err != nil { return err } @@ -361,11 +268,15 @@ func (c *controller) Update(ctx context.Context, u *quota.Quota) error { } } - q.UpdateTime = time.Now() return c.quotaMgr.Update(ctx, q) } - return orm.WithTransaction(update)(ctx) + options := []retry.Option{ + retry.Timeout(defaultRetryTimeout), + retry.Backoff(false), + } + + return retry.Retry(f, options...) } // Driver returns quota driver for the reference @@ -388,6 +299,46 @@ func Validate(ctx context.Context, reference string, hardLimits types.ResourceLi return d.Validate(hardLimits) } -func reservedResourcesKey(reference, referenceID string) string { - return fmt.Sprintf("quota:%s:%s:reserved", reference, referenceID) +func reserveResources(resources types.ResourceList) func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + return func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + newUsed := types.Add(used, resources) + + if err := quota.IsSafe(hardLimits, used, newUsed, false); err != nil { + return nil, errors.DeniedError(err).WithMessage("Quota exceeded when processing the request of %v", err) + } + + return newUsed, nil + } +} + +func rollbackResources(resources types.ResourceList) func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + return func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + newUsed := types.Subtract(used, resources) + // ensure that new used is never negative + if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 { + return nil, fmt.Errorf("resources is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed)) + } + + return newUsed, nil + } +} + +func refreshResources(calculateUsage func() (types.ResourceList, error), ignoreLimitation bool) func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + return func(hardLimits, used types.ResourceList) (types.ResourceList, error) { + newUsed, err := calculateUsage() + if err != nil { + return nil, err + } + + // ensure that new used is never negative + if negativeUsed := types.IsNegative(newUsed); len(negativeUsed) > 0 { + return nil, fmt.Errorf("quota usage is negative for resource(s): %s", quota.PrettyPrintResourceNames(negativeUsed)) + } + + if err := quota.IsSafe(hardLimits, used, newUsed, ignoreLimitation); err != nil { + return nil, err + } + + return newUsed, nil + } } diff --git a/src/controller/quota/controller_test.go b/src/controller/quota/controller_test.go index 67605d52b..e84297e5a 100644 --- a/src/controller/quota/controller_test.go +++ b/src/controller/quota/controller_test.go @@ -17,9 +17,7 @@ package quota import ( "context" "fmt" - "strconv" "testing" - "time" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/pkg/quota" @@ -51,81 +49,20 @@ func (suite *ControllerTestSuite) SetupTest() { driver.Register(suite.reference, suite.driver) suite.quotaMgr = "atesting.Manager{} - suite.ctl = &controller{quotaMgr: suite.quotaMgr, reservedExpiration: defaultReservedExpiration} + suite.ctl = &controller{quotaMgr: suite.quotaMgr} hardLimits := types.ResourceList{types.ResourceStorage: 100} suite.quota = "a.Quota{Hard: hardLimits.String(), Used: types.Zero(hardLimits).String()} } func (suite *ControllerTestSuite) PrepareForUpdate(q *quota.Quota, newUsage interface{}) { - mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(q, nil) + mock.OnAnything(suite.quotaMgr, "GetByRef").Return(q, nil) mock.OnAnything(suite.driver, "CalculateUsage").Return(newUsage, nil) mock.OnAnything(suite.quotaMgr, "Update").Return(nil) } -func (suite *ControllerTestSuite) TestGetReservedResources() { - reservedExpiration := time.Second * 3 - ctl := &controller{reservedExpiration: reservedExpiration} - - reference, referenceID := "reference", uuid.New().String() - - { - resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID) - suite.Nil(err) - suite.Len(resources, 0) - } - - suite.Nil(ctl.setReservedResources(context.TODO(), reference, referenceID, types.ResourceList{types.ResourceStorage: 100})) - - { - resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID) - suite.Nil(err) - suite.Len(resources, 1) - } - - time.Sleep(reservedExpiration * 2) - - { - resources, err := ctl.getReservedResources(context.TODO(), reference, referenceID) - suite.Nil(err) - suite.Len(resources, 0) - } -} - -func (suite *ControllerTestSuite) TestReserveResources() { - mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(suite.quota, nil) - - ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{}) - referenceID := uuid.New().String() - resources := types.ResourceList{types.ResourceStorage: 100} - - ctl := suite.ctl.(*controller) - - suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources)) - - suite.Error(ctl.reserveResources(ctx, suite.reference, referenceID, resources)) -} - -func (suite *ControllerTestSuite) TestUnreserveResources() { - mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(suite.quota, nil) - - ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{}) - referenceID := uuid.New().String() - resources := types.ResourceList{types.ResourceStorage: 100} - - ctl := suite.ctl.(*controller) - - suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources)) - - suite.Error(ctl.reserveResources(ctx, suite.reference, referenceID, resources)) - - suite.Nil(ctl.unreserveResources(ctx, suite.reference, referenceID, resources)) - - suite.Nil(ctl.reserveResources(ctx, suite.reference, referenceID, resources)) -} - func (suite *ControllerTestSuite) TestRefresh() { suite.PrepareForUpdate(suite.quota, types.ResourceList{types.ResourceStorage: 0}) @@ -174,6 +111,7 @@ func (suite *ControllerTestSuite) TestNoResourcesRequest() { suite.Nil(suite.ctl.Request(ctx, suite.reference, referenceID, nil, func() error { return nil })) } + func (suite *ControllerTestSuite) TestRequest() { suite.PrepareForUpdate(suite.quota, nil) @@ -207,25 +145,3 @@ func (suite *ControllerTestSuite) TestRequestFunctionFailed() { func TestControllerTestSuite(t *testing.T) { suite.Run(t, &ControllerTestSuite{}) } - -func BenchmarkGetReservedResources(b *testing.B) { - ctl := &controller{reservedExpiration: defaultReservedExpiration} - - ctx := context.TODO() - reference, referenceID := "reference", uuid.New().String() - ctl.setReservedResources(ctx, reference, referenceID, types.ResourceList{types.ResourceStorage: 100}) - - for i := 0; i < b.N; i++ { - ctl.getReservedResources(ctx, reference, referenceID) - } -} - -func BenchmarkSetReservedResources(b *testing.B) { - ctl := &controller{reservedExpiration: defaultReservedExpiration} - - ctx := context.TODO() - for i := 0; i < b.N; i++ { - s := strconv.Itoa(i) - ctl.setReservedResources(ctx, "reference"+s, s, types.ResourceList{types.ResourceStorage: 100}) - } -} diff --git a/src/controller/quota/util_test.go b/src/controller/quota/util_test.go index 4b7229720..0eea30500 100644 --- a/src/controller/quota/util_test.go +++ b/src/controller/quota/util_test.go @@ -16,7 +16,6 @@ package quota import ( "context" - proModels "github.com/goharbor/harbor/src/pkg/project/models" "math/rand" "testing" "time" @@ -24,6 +23,7 @@ import ( "github.com/goharbor/harbor/src/controller/project" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/project/models" "github.com/goharbor/harbor/src/pkg/quota" "github.com/goharbor/harbor/src/pkg/quota/driver" "github.com/goharbor/harbor/src/pkg/quota/types" @@ -76,21 +76,21 @@ func (suite *RefreshForProjectsTestSuite) TestRefreshForProjects() { rand.Seed(time.Now().UnixNano()) startProjectID := rand.Int63() - var firstPageProjects, secondPageProjects []*proModels.Project + var firstPageProjects, secondPageProjects []*models.Project for i := 0; i < 50; i++ { - firstPageProjects = append(firstPageProjects, &proModels.Project{ + firstPageProjects = append(firstPageProjects, &models.Project{ ProjectID: startProjectID + int64(i), }) } for i := 0; i < 10; i++ { - secondPageProjects = append(secondPageProjects, &proModels.Project{ + secondPageProjects = append(secondPageProjects, &models.Project{ ProjectID: startProjectID + 50 + int64(i), }) } page := 1 - mock.OnAnything(suite.projectCtl, "List").Return(func(context.Context, *q.Query, ...project.Option) []*proModels.Project { + mock.OnAnything(suite.projectCtl, "List").Return(func(context.Context, *q.Query, ...project.Option) []*models.Project { defer func() { page++ }() @@ -109,7 +109,6 @@ func (suite *RefreshForProjectsTestSuite) TestRefreshForProjects() { q.SetUsed(types.ResourceList{types.ResourceStorage: 0}) mock.OnAnything(suite.quotaMgr, "GetByRef").Return(q, nil) - mock.OnAnything(suite.quotaMgr, "GetByRefForUpdate").Return(q, nil) mock.OnAnything(suite.quotaMgr, "Update").Return(nil) mock.OnAnything(suite.driver, "CalculateUsage").Return(types.ResourceList{types.ResourceStorage: 1}, nil) diff --git a/src/lib/orm/error.go b/src/lib/orm/error.go index db9792e23..01f6057fa 100644 --- a/src/lib/orm/error.go +++ b/src/lib/orm/error.go @@ -23,6 +23,9 @@ import ( var ( // ErrNoRows error from the beego orm ErrNoRows = orm.ErrNoRows + + // ErrOptimisticLock error when update object failed + ErrOptimisticLock = errors.New("the object has been modified; please apply your changes to the latest version and try again") ) // WrapNotFoundError wrap error as NotFoundError when it is orm.ErrNoRows otherwise return err diff --git a/src/lib/retry/retry.go b/src/lib/retry/retry.go index f9fb06727..8e350fdf1 100644 --- a/src/lib/retry/retry.go +++ b/src/lib/retry/retry.go @@ -15,6 +15,7 @@ package retry import ( + "fmt" "math/rand" "time" @@ -31,12 +32,30 @@ var ( ErrRetryTimeout = errors.New("retry timeout") ) +type abort struct { + cause error +} + +func (a *abort) Error() string { + if a.cause != nil { + return fmt.Sprintf("retry abort, error: %v", a.cause) + } + + return "retry abort" +} + +// Abort wrap err to stop the Retry function +func Abort(err error) error { + return &abort{cause: err} +} + // Options options for the retry functions type Options struct { InitialInterval time.Duration // the initial interval for retring after failure, default 100 milliseconds MaxInterval time.Duration // the max interval for retring after failure, default 1 second Timeout time.Duration // the total time before returning if something is wrong, default 1 minute Callback func(err error, sleep time.Duration) // the callback function for Retry when the f called failed + Backoff bool } // Option ... @@ -70,12 +89,21 @@ func Callback(callback func(err error, sleep time.Duration)) Option { } } +// Backoff set backoff +func Backoff(backoff bool) Option { + return func(opts *Options) { + opts.Backoff = backoff + } +} + // Retry retry until f run successfully or timeout // // NOTE: This function will use exponential backoff and jitter for retrying, see // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information func Retry(f func() error, options ...Option) error { - opts := &Options{} + opts := &Options{ + Backoff: true, + } for _, o := range options { o(opts) @@ -93,11 +121,15 @@ func Retry(f func() error, options ...Option) error { opts.Timeout = time.Minute } - b := &backoff.Backoff{ - Min: opts.InitialInterval, - Max: opts.MaxInterval, - Factor: 2, - Jitter: true, + var b *backoff.Backoff + + if opts.Backoff { + b = &backoff.Backoff{ + Min: opts.InitialInterval, + Max: opts.MaxInterval, + Factor: 2, + Jitter: true, + } } var err error @@ -113,7 +145,16 @@ func Retry(f func() error, options ...Option) error { return nil } - sleep := b.Duration() + var ab *abort + if errors.As(err, &ab) { + return ab.cause + } + + var sleep time.Duration + if opts.Backoff { + sleep = b.Duration() + } + if opts.Callback != nil { opts.Callback(err, sleep) } diff --git a/src/lib/retry/retry_test.go b/src/lib/retry/retry_test.go index 7db179deb..3a629c02b 100644 --- a/src/lib/retry/retry_test.go +++ b/src/lib/retry/retry_test.go @@ -22,7 +22,17 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRetryUntil(t *testing.T) { +func TestAbort(t *testing.T) { + assert := assert.New(t) + + e1 := Abort(nil) + assert.Equal("retry abort", e1.Error()) + + e2 := Abort(fmt.Errorf("failed to call func")) + assert.Equal("retry abort, error: failed to call func", e2.Error()) +} + +func TestRetry(t *testing.T) { assert := assert.New(t) i := 0 @@ -68,4 +78,16 @@ func TestRetryUntil(t *testing.T) { assert.Error(err) assert.Equal("retry timeout: always failed", err.Error()) + + i = 0 + f4 := func() error { + if i == 3 { + return Abort(fmt.Errorf("abort")) + } + + i++ + return fmt.Errorf("error") + } + assert.Error(Retry(f4, InitialInterval(time.Second), MaxInterval(time.Second), Timeout(time.Second*5))) + assert.LessOrEqual(i, 3) } diff --git a/src/pkg/quota/dao/dao.go b/src/pkg/quota/dao/dao.go index ffb66f647..1a93443c4 100644 --- a/src/pkg/quota/dao/dao.go +++ b/src/pkg/quota/dao/dao.go @@ -16,7 +16,9 @@ package dao import ( "context" + "errors" "fmt" + "math" "time" "github.com/goharbor/harbor/src/lib/orm" @@ -42,9 +44,6 @@ type DAO interface { // GetByRef returns quota by reference object GetByRef(ctx context.Context, reference, referenceID string) (*models.Quota, error) - // GetByRefForUpdate get quota by reference object and lock it for update - GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*models.Quota, error) - // Update update quota Update(ctx context.Context, quota *models.Quota) error @@ -171,47 +170,57 @@ func (d *dao) GetByRef(ctx context.Context, reference, referenceID string) (*mod return toQuota(quota, usage), nil } -func (d *dao) GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*models.Quota, error) { - o, err := orm.FromContext(ctx) - if err != nil { - return nil, err - } - - quota := &Quota{Reference: reference, ReferenceID: referenceID} - if err := o.ReadForUpdate(quota, "reference", "reference_id"); err != nil { - return nil, orm.WrapNotFoundError(err, "quota not found for (%s, %s)", reference, referenceID) - } - - usage := &QuotaUsage{Reference: reference, ReferenceID: referenceID} - if err := o.ReadForUpdate(usage, "reference", "reference_id"); err != nil { - return nil, orm.WrapNotFoundError(err, "quota usage not found for (%s, %s)", reference, referenceID) - } - - return toQuota(quota, usage), nil -} - func (d *dao) Update(ctx context.Context, quota *models.Quota) error { o, err := orm.FromContext(ctx) if err != nil { return err } - if quota.UsedChanged { - usage := &QuotaUsage{ID: quota.ID, Used: quota.Used, UpdateTime: quota.UpdateTime} + if quota.UsedChanged && quota.HardChanged { + return errors.New("not support change both hard and used of the quota") + } - _, err := o.Update(usage, "used", "update_time") - if err != nil { - return err + if !quota.UsedChanged && !quota.HardChanged { + return nil + } + + var ( + sql string + params []interface{} + ) + + if quota.UsedChanged { + sql = "UPDATE quota_usage SET used = ?, update_time = ?, version = ? WHERE id = ? AND version = ?" + params = []interface{}{ + quota.Used, + time.Now(), + getVersion(quota.UsedVersion), + quota.ID, + quota.UsedVersion, + } + } else { + sql = "UPDATE quota SET hard = ?, update_time = ?, version = ? WHERE id = ? AND version = ?" + params = []interface{}{ + quota.Hard, + time.Now(), + getVersion(quota.HardVersion), + quota.ID, + quota.HardVersion, } } - if quota.HardChanged { - md := &Quota{ID: quota.ID, Hard: quota.Hard, UpdateTime: quota.UpdateTime} + result, err := o.Raw(sql, params...).Exec() + if err != nil { + return err + } - _, err := o.Update(md, "hard", "update_time") - if err != nil { - return err - } + num, err := result.RowsAffected() + if err != nil { + return err + } + + if num == 0 { + return orm.ErrOptimisticLock } return nil @@ -231,7 +240,9 @@ SELECT a.reference, a.reference_id, a.hard, + a.version as hard_version, b.used, + b.version as used_version, b.creation_time, b.update_time FROM @@ -270,6 +281,16 @@ func toQuota(quota *Quota, usage *QuotaUsage) *models.Quota { ReferenceID: quota.ReferenceID, Hard: quota.Hard, Used: usage.Used, + HardVersion: quota.Version, + UsedVersion: usage.Version, CreationTime: quota.CreationTime, } } + +func getVersion(current int64) int64 { + if math.MaxInt64 == current { + return 0 + } + + return current + 1 +} diff --git a/src/pkg/quota/dao/dao_test.go b/src/pkg/quota/dao/dao_test.go index 9228d5e6a..d901ae8a3 100644 --- a/src/pkg/quota/dao/dao_test.go +++ b/src/pkg/quota/dao/dao_test.go @@ -15,11 +15,8 @@ package dao import ( - "context" - "sync" "testing" - "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/quota/types" htesting "github.com/goharbor/harbor/src/testing" @@ -136,48 +133,6 @@ func (suite *DaoTestSuite) TestGetByRef() { } } -func (suite *DaoTestSuite) TestGetByRefForUpdate() { - hardLimits := types.ResourceList{types.ResourceStorage: 100} - usage := types.ResourceList{types.ResourceStorage: 0} - - reference, referenceID := "project", "5" - id, err := suite.dao.Create(suite.Context(), reference, referenceID, hardLimits, usage) - suite.Nil(err) - - var wg sync.WaitGroup - - count := int64(10) - - for i := int64(0); i < count; i++ { - wg.Add(1) - go func() { - defer wg.Done() - f := func(ctx context.Context) error { - q, err := suite.dao.GetByRefForUpdate(ctx, reference, referenceID) - suite.Nil(err) - - used, _ := q.GetUsed() - used[types.ResourceStorage]++ - q.SetUsed(used) - - suite.dao.Update(ctx, q) - - return nil - } - - orm.WithTransaction(f)(suite.Context()) - }() - } - wg.Wait() - - { - q, err := suite.dao.Get(suite.Context(), id) - suite.Nil(err) - used, _ := q.GetUsed() - suite.Equal(count, used[types.ResourceStorage]) - } -} - func (suite *DaoTestSuite) TestUpdate() { hardLimits := types.ResourceList{types.ResourceStorage: 100} usage := types.ResourceList{types.ResourceStorage: 0} @@ -191,7 +146,16 @@ func (suite *DaoTestSuite) TestUpdate() { { q, err := suite.dao.Get(suite.Context(), id) if suite.Nil(err) { - q.SetHard(newHardLimits).SetUsed(newUsage) + q.SetHard(newHardLimits) + + suite.Nil(suite.dao.Update(suite.Context(), q)) + } + } + + { + q, err := suite.dao.Get(suite.Context(), id) + if suite.Nil(err) { + q.SetUsed(newUsage) suite.Nil(suite.dao.Update(suite.Context(), q)) } diff --git a/src/pkg/quota/dao/model.go b/src/pkg/quota/dao/model.go index a9b83a739..1b1399b47 100644 --- a/src/pkg/quota/dao/model.go +++ b/src/pkg/quota/dao/model.go @@ -33,6 +33,7 @@ type Quota struct { Hard string `orm:"column(hard);type(jsonb)" json:"-"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` + Version int64 `orm:"column(version)" json:"-"` } // TableName returns table name for orm @@ -48,6 +49,7 @@ type QuotaUsage struct { Used string `orm:"column(used);type(jsonb)" json:"-"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` + Version int64 `orm:"column(version)" json:"-"` } // TableName returns table name for orm diff --git a/src/pkg/quota/manager.go b/src/pkg/quota/manager.go index 4d8048f5e..28d58322e 100644 --- a/src/pkg/quota/manager.go +++ b/src/pkg/quota/manager.go @@ -44,9 +44,6 @@ type Manager interface { // GetByRef returns quota by reference object GetByRef(ctx context.Context, reference, referenceID string) (*Quota, error) - // GetByRefForUpdate returns quota by reference and reference id for update - GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*Quota, error) - // Update update quota Update(ctx context.Context, quota *Quota) error @@ -102,16 +99,8 @@ func (m *manager) GetByRef(ctx context.Context, reference, referenceID string) ( return m.dao.GetByRef(ctx, reference, referenceID) } -func (m *manager) GetByRefForUpdate(ctx context.Context, reference, referenceID string) (*Quota, error) { - return m.dao.GetByRefForUpdate(ctx, reference, referenceID) -} - func (m *manager) Update(ctx context.Context, q *Quota) error { - h := func(ctx context.Context) error { - return m.dao.Update(ctx, q) - } - - return orm.WithTransaction(h)(ctx) + return m.dao.Update(ctx, q) } func (m *manager) List(ctx context.Context, query *q.Query) ([]*Quota, error) { diff --git a/src/pkg/quota/models/quota.go b/src/pkg/quota/models/quota.go index 4238f960a..9499d19c4 100644 --- a/src/pkg/quota/models/quota.go +++ b/src/pkg/quota/models/quota.go @@ -34,6 +34,9 @@ type Quota struct { CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` + HardVersion int64 `orm:"column(hard_version)" json:"-"` + UsedVersion int64 `orm:"column(used_version)" json:"-"` + HardChanged bool `orm:"-" json:"-"` UsedChanged bool `orm:"-" json:"-"` } diff --git a/src/testing/pkg/quota/manager.go b/src/testing/pkg/quota/manager.go index e0bfc87fb..2682e3172 100644 --- a/src/testing/pkg/quota/manager.go +++ b/src/testing/pkg/quota/manager.go @@ -127,29 +127,6 @@ func (_m *Manager) GetByRef(ctx context.Context, reference string, referenceID s return r0, r1 } -// GetByRefForUpdate provides a mock function with given fields: ctx, reference, referenceID -func (_m *Manager) GetByRefForUpdate(ctx context.Context, reference string, referenceID string) (*models.Quota, error) { - ret := _m.Called(ctx, reference, referenceID) - - var r0 *models.Quota - if rf, ok := ret.Get(0).(func(context.Context, string, string) *models.Quota); ok { - r0 = rf(ctx, reference, referenceID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*models.Quota) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, reference, referenceID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // List provides a mock function with given fields: ctx, query func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*models.Quota, error) { ret := _m.Called(ctx, query)