From 306e519dbe92970698da6cb7f99758837b4c938c Mon Sep 17 00:00:00 2001 From: fanjiankong Date: Sat, 8 May 2021 18:59:13 +0800 Subject: [PATCH] Fix tencentcr adapter. 1. Limit API qps for the adapter. 2. Allow set qps via env. 3. Fix Tencnet SDK pagenation. 4. Fix resource filter. Signed-off-by: fanjiankong --- src/go.mod | 1 + src/go.sum | 6 + src/pkg/reg/adapter/tencentcr/adapter.go | 15 +- src/pkg/reg/adapter/tencentcr/adapter_test.go | 67 ++-- .../adapter/tencentcr/artifact_registry.go | 56 ++-- src/pkg/reg/adapter/tencentcr/ratelimiter.go | 37 ++ .../reg/adapter/tencentcr/ratelimiter_test.go | 31 ++ src/pkg/reg/adapter/tencentcr/sdk.go | 15 +- .../andres-erbsen/clock/.travis.yml | 7 + .../github.com/andres-erbsen/clock/LICENSE | 21 ++ .../github.com/andres-erbsen/clock/README.md | 104 ++++++ .../github.com/andres-erbsen/clock/clock.go | 317 ++++++++++++++++++ src/vendor/go.uber.org/ratelimit/.gitignore | 6 + src/vendor/go.uber.org/ratelimit/CHANGELOG.md | 23 ++ src/vendor/go.uber.org/ratelimit/LICENSE | 21 ++ src/vendor/go.uber.org/ratelimit/Makefile | 46 +++ src/vendor/go.uber.org/ratelimit/README.md | 46 +++ src/vendor/go.uber.org/ratelimit/go.mod | 9 + src/vendor/go.uber.org/ratelimit/go.sum | 17 + .../go.uber.org/ratelimit/limiter_atomic.go | 110 ++++++ .../ratelimit/limiter_mutexbased.go | 88 +++++ src/vendor/go.uber.org/ratelimit/ratelimit.go | 135 ++++++++ src/vendor/modules.txt | 5 + 23 files changed, 1110 insertions(+), 73 deletions(-) create mode 100644 src/pkg/reg/adapter/tencentcr/ratelimiter.go create mode 100644 src/pkg/reg/adapter/tencentcr/ratelimiter_test.go create mode 100644 src/vendor/github.com/andres-erbsen/clock/.travis.yml create mode 100644 src/vendor/github.com/andres-erbsen/clock/LICENSE create mode 100644 src/vendor/github.com/andres-erbsen/clock/README.md create mode 100644 src/vendor/github.com/andres-erbsen/clock/clock.go create mode 100644 src/vendor/go.uber.org/ratelimit/.gitignore create mode 100644 src/vendor/go.uber.org/ratelimit/CHANGELOG.md create mode 100644 src/vendor/go.uber.org/ratelimit/LICENSE create mode 100644 src/vendor/go.uber.org/ratelimit/Makefile create mode 100644 src/vendor/go.uber.org/ratelimit/README.md create mode 100644 src/vendor/go.uber.org/ratelimit/go.mod create mode 100644 src/vendor/go.uber.org/ratelimit/go.sum create mode 100644 src/vendor/go.uber.org/ratelimit/limiter_atomic.go create mode 100644 src/vendor/go.uber.org/ratelimit/limiter_mutexbased.go create mode 100644 src/vendor/go.uber.org/ratelimit/ratelimit.go diff --git a/src/go.mod b/src/go.mod index cd133248d..575044865 100644 --- a/src/go.mod +++ b/src/go.mod @@ -66,6 +66,7 @@ require ( github.com/tencentcloud/tencentcloud-sdk-go v1.0.62 github.com/theupdateframework/notary v0.6.1 github.com/vmihailenco/msgpack/v5 v5.0.0-rc.2 + go.uber.org/ratelimit v0.2.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20200707034311-ab3426394381 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d diff --git a/src/go.sum b/src/go.sum index 0689b9c43..76af6fbb1 100644 --- a/src/go.sum +++ b/src/go.sum @@ -102,6 +102,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190726115642-cd293c93fd97 h1:bNE5ID4C3YOkROfvBjXJUG53gyb+8az3TQN02LqnGBk= github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190726115642-cd293c93fd97/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -986,9 +988,13 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= +go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= diff --git a/src/pkg/reg/adapter/tencentcr/adapter.go b/src/pkg/reg/adapter/tencentcr/adapter.go index 6a5340655..47089f41a 100644 --- a/src/pkg/reg/adapter/tencentcr/adapter.go +++ b/src/pkg/reg/adapter/tencentcr/adapter.go @@ -5,7 +5,9 @@ import ( "fmt" "net/http" "net/url" + "os" "path" + "strconv" "strings" "github.com/docker/distribution/registry/client/auth/challenge" @@ -28,6 +30,11 @@ var ( ) func init() { + var envTcrQPSLimit, _ = strconv.Atoi(os.Getenv("TCR_QPS_LIMIT")) + if envTcrQPSLimit > 1 && envTcrQPSLimit < tcrQPSLimit { + tcrQPSLimit = envTcrQPSLimit + } + if err := adp.RegisterFactory(model.RegistryTypeTencentTcr, new(factory)); err != nil { log.Errorf("failed to register factory for %s: %v", model.RegistryTypeTencentTcr, err) return @@ -53,7 +60,7 @@ func (f *factory) AdapterPattern() *model.AdapterPattern { } func getAdapterInfo() *model.AdapterPattern { - return nil + return &model.AdapterPattern{} } type adapter struct { @@ -127,7 +134,11 @@ func newAdapter(registry *model.Registry) (a *adapter, err error) { registry.URL, registryURL.Host, *instanceInfo.PublicDomain, *instanceInfo.RegionName, *instanceInfo.RegistryId) // rebuild TCR SDK client - client, err = tcr.NewClient(tcrCredential, *instanceInfo.RegionName, cfp) + client = &tcr.Client{} + client.Init(*instanceInfo.RegionName). + WithCredential(tcrCredential). + WithProfile(cfp). + WithHttpTransport(newRateLimitedTransport(tcrQPSLimit, http.DefaultTransport)) if err != nil { return } diff --git a/src/pkg/reg/adapter/tencentcr/adapter_test.go b/src/pkg/reg/adapter/tencentcr/adapter_test.go index 165c11243..cc561a5d4 100644 --- a/src/pkg/reg/adapter/tencentcr/adapter_test.go +++ b/src/pkg/reg/adapter/tencentcr/adapter_test.go @@ -15,6 +15,7 @@ import ( "github.com/goharbor/harbor/src/pkg/reg/adapter/native" "github.com/goharbor/harbor/src/pkg/reg/model" "github.com/stretchr/testify/assert" + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors" tcr "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/tcr/v20190924" ) @@ -110,41 +111,41 @@ func TestAdapter_NewAdapter_InvalidAKSK(t *testing.T) { assert.Nil(t, adapter) } -// func TestAdapter_NewAdapter_Ok(t *testing.T) { -// adapter, err := newAdapter(&model.Registry{ -// Type: model.RegistryTypeTencentTcr, -// Credential: &model.Credential{ -// AccessKey: mockAccessKey, -// AccessSecret: mockAccessSecret, -// }, -// URL: "https://harbor-community.tencentcloudcr.com", -// }) -// if sdkerr, ok := err.(*errors.TencentCloudSDKError); ok { -// log.Infof("sdk error, error=%v", sdkerr) -// return -// } -// assert.NotNil(t, adapter) -// assert.Nil(t, err) +func TestAdapter_NewAdapter_Ok(t *testing.T) { + adapter, err := newAdapter(&model.Registry{ + Type: model.RegistryTypeTencentTcr, + Credential: &model.Credential{ + AccessKey: mockAccessKey, + AccessSecret: mockAccessSecret, + }, + URL: "https://harbor-community.tencentcloudcr.com", + }) + if sdkerr, ok := err.(*errors.TencentCloudSDKError); ok { + log.Infof("sdk error, error=%v", sdkerr) + return + } + assert.NotNil(t, adapter) + assert.Nil(t, err) -// } +} -// func TestAdapter_NewAdapter_InsecureOk(t *testing.T) { -// adapter, err := newAdapter(&model.Registry{ -// Type: model.RegistryTypeTencentTcr, -// Credential: &model.Credential{ -// AccessKey: mockAccessKey, -// AccessSecret: mockAccessSecret, -// }, -// Insecure: true, -// URL: "https://harbor-community.tencentcloudcr.com", -// }) -// if sdkerr, ok := err.(*errors.TencentCloudSDKError); ok { -// log.Infof("sdk error, error=%v", sdkerr) -// return -// } -// assert.NotNil(t, adapter) -// assert.Nil(t, err) -// } +func TestAdapter_NewAdapter_InsecureOk(t *testing.T) { + adapter, err := newAdapter(&model.Registry{ + Type: model.RegistryTypeTencentTcr, + Credential: &model.Credential{ + AccessKey: mockAccessKey, + AccessSecret: mockAccessSecret, + }, + Insecure: true, + URL: "https://harbor-community.tencentcloudcr.com", + }) + if sdkerr, ok := err.(*errors.TencentCloudSDKError); ok { + log.Infof("sdk error, error=%v", sdkerr) + return + } + assert.NotNil(t, adapter) + assert.Nil(t, err) +} func getMockAdapter(t *testing.T, hasCred, health bool) (*adapter, *httptest.Server) { server := test.NewServer( diff --git a/src/pkg/reg/adapter/tencentcr/artifact_registry.go b/src/pkg/reg/adapter/tencentcr/artifact_registry.go index 761075ba0..d95b84650 100644 --- a/src/pkg/reg/adapter/tencentcr/artifact_registry.go +++ b/src/pkg/reg/adapter/tencentcr/artifact_registry.go @@ -7,13 +7,13 @@ import ( "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/lib/log" adp "github.com/goharbor/harbor/src/pkg/reg/adapter" + "github.com/goharbor/harbor/src/pkg/reg/filter" "github.com/goharbor/harbor/src/pkg/reg/model" "github.com/goharbor/harbor/src/pkg/reg/util" - tcr "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/tcr/v20190924" ) -const ( - tcrQPSLimit = 15 +var ( + tcrQPSLimit = 10 ) /** @@ -48,50 +48,42 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) (resources []*model.Re log.Debugf("[tencent-tcr.FetchArtifacts] namespaces=%v", namespaces) // 2. list repos - var filteredRepos []tcr.TcrRepositoryInfo + // var filteredRepos []tcr.TcrRepositoryInfo + var repos []*model.Repository + var repositories []*model.Repository for _, ns := range namespaces { - var repos []tcr.TcrRepositoryInfo - repos, err = a.listReposByNamespace(ns) + tcrRepos, err := a.listReposByNamespace(ns) if err != nil { - return + return nil, err } - log.Debugf("[tencent-tcr.FetchArtifacts] namespace=%s, repositories=%d", ns, len(repos)) - if _, ok := util.IsSpecificPathComponent(repoPattern); ok { - log.Debugf("[tencent-tcr.FetchArtifacts] specific_repos=%s", repoPattern) - // TODO: Check repo is exist. - filteredRepos = append(filteredRepos, repos...) - } else { - // 3. filter repos - for _, repo := range repos { - var ok bool - ok, err = util.Match(repoPattern, *repo.Name) - log.Debugf("[tencent-tcr.FetchArtifacts] namespace=%s, repository=%s, repoPattern=%s, Match=%v", *repo.Namespace, *repo.Name, repoPattern, ok) - if err != nil { - return - } - if ok { - filteredRepos = append(filteredRepos, repo) - } - } + if len(tcrRepos) == 0 { + continue + } + for _, tcrRepo := range tcrRepos { + repositories = append(repositories, &model.Repository{ + Name: *tcrRepo.Name, + }) } } - log.Debugf("[tencent-tcr.FetchArtifacts] filteredRepos=%d", len(filteredRepos)) + repos, _ = filter.DoFilterRepositories(repositories, filters) + log.Debugf("[tencent-tcr.FetchArtifacts] filteredRepos=%d", len(repos)) // 4. list images - var rawResources = make([]*model.Resource, len(filteredRepos)) + var rawResources = make([]*model.Resource, len(repos)) runner := utils.NewLimitedConcurrentRunner(tcrQPSLimit) - for i, r := range filteredRepos { + for i, r := range repos { // !copy index := i repo := r runner.AddTask(func() error { var images []string - _, images, err = a.getImages(*repo.Namespace, *repo.Name, "") + repoArr := strings.Split(repo.Name, "/") + _, images, err = a.getImages(repoArr[0], strings.Join(repoArr[1:], "/"), "") if err != nil { - return fmt.Errorf("[tencent-tcr.FetchArtifacts.listImages] repo=%s, error=%v", *repo.Name, err) + return fmt.Errorf("[tencent-tcr.FetchArtifacts.listImages] runner=%d repo=%s, error=%v", index, repo.Name, err) } var filteredImages []string @@ -110,7 +102,7 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) (resources []*model.Re filteredImages = images } - log.Debugf("[tencent-tcr.FetchArtifacts] repo=%s, images=%v, filteredImages=%v", *repo.Name, images, filteredImages) + log.Debugf("[tencent-tcr.FetchArtifacts] repo=%s, images=%v, filteredImages=%v", repo.Name, images, filteredImages) if len(filteredImages) > 0 { rawResources[index] = &model.Resource{ @@ -118,7 +110,7 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) (resources []*model.Re Registry: a.registry, Metadata: &model.ResourceMetadata{ Repository: &model.Repository{ - Name: *repo.Name, + Name: repo.Name, }, Vtags: filteredImages, }, diff --git a/src/pkg/reg/adapter/tencentcr/ratelimiter.go b/src/pkg/reg/adapter/tencentcr/ratelimiter.go new file mode 100644 index 000000000..64f662d7f --- /dev/null +++ b/src/pkg/reg/adapter/tencentcr/ratelimiter.go @@ -0,0 +1,37 @@ +package tencentcr + +import ( + "net/http" + "sync" + + "go.uber.org/ratelimit" +) + +type limitTransport struct { + http.RoundTripper + limiter ratelimit.Limiter +} + +var _ http.RoundTripper = limitTransport{} + +func (t limitTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.limiter.Take() + return t.RoundTripper.RoundTrip(req) +} + +var limiterOnce sync.Once +var limiter ratelimit.Limiter + +func newLimiter(rate int) ratelimit.Limiter { + limiterOnce.Do(func() { + limiter = ratelimit.New(rate) + }) + return limiter +} + +func newRateLimitedTransport(rate int, transport http.RoundTripper) http.RoundTripper { + return &limitTransport{ + RoundTripper: transport, + limiter: newLimiter(rate), + } +} diff --git a/src/pkg/reg/adapter/tencentcr/ratelimiter_test.go b/src/pkg/reg/adapter/tencentcr/ratelimiter_test.go new file mode 100644 index 000000000..743e6737e --- /dev/null +++ b/src/pkg/reg/adapter/tencentcr/ratelimiter_test.go @@ -0,0 +1,31 @@ +package tencentcr + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_newRateLimitedTransport(t *testing.T) { + tests := []struct { + name string + rate int + transport http.RoundTripper + }{ + {"1qps", 1, http.DefaultTransport}, + } + var req = &http.Request{} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := newRateLimitedTransport(tt.rate, tt.transport) + start := time.Now() + for i := 0; i <= tt.rate; i++ { + got.RoundTrip(req) + } + used := int64(time.Since(start).Milliseconds()) / int64(tt.rate) + assert.GreaterOrEqualf(t, used/int64(tt.rate), int64(1e3/tt.rate), "used %d ms per req", used/int64(tt.rate)) + }) + } +} diff --git a/src/pkg/reg/adapter/tencentcr/sdk.go b/src/pkg/reg/adapter/tencentcr/sdk.go index 889b1b423..328d8ea77 100644 --- a/src/pkg/reg/adapter/tencentcr/sdk.go +++ b/src/pkg/reg/adapter/tencentcr/sdk.go @@ -150,7 +150,8 @@ func (a *adapter) listReposByNamespace(namespace string) (repos []tcr.TcrReposit req.Limit = a.pageSize var resp = tcr.NewDescribeRepositoriesResponse() - var page int64 + var page int64 = 1 + var repositories []string for { req.Offset = common.Int64Ptr(page) resp, err = a.tcrClient.DescribeRepositories(req) @@ -161,10 +162,12 @@ func (a *adapter) listReposByNamespace(namespace string) (repos []tcr.TcrReposit size := len(resp.Response.RepositoryList) for i, repo := range resp.Response.RepositoryList { - log.Debugf("[tencent-tcr.listReposByNamespace.DescribeRepositories] Retrives page=%d repo(%d/%d)=%s", page, i, size, *repo.Name) + log.Debugf("[tencent-tcr.listReposByNamespace.DescribeRepositories] Retrives total=%d page=%d repo(%d/%d)=%s", *resp.Response.TotalCount, page, i, size, *repo.Name) repos = append(repos, *repo) + repositories = append(repositories, *repo.Name) } + log.Debugf("[tencent-tcr.listReposByNamespace.DescribeRepositories] total=%d now=%d page=%d,repositories=%v", *resp.Response.TotalCount, len(repos), page, repositories) if len(repos) == int(*resp.Response.TotalCount) { log.Debugf("[tencent-tcr.listReposByNamespace.DescribeRepositories] Retrives all repos.") break @@ -198,10 +201,10 @@ func (a *adapter) getImages(namespace, repo, tag string) (images []*tcr.TcrImage } var resp = tcr.NewDescribeImagesResponse() - var page int64 + var page int64 = 1 for { - log.Debugf("[tencent-tcr.getImages] registryID=%s, namespace=%s, repo=%s, tag=%s, page=%d", - *a.registryID, namespace, repo, tag, page) + log.Debugf("[tencent-tcr.getImages] registryID=%s, namespace=%s, repo=%s, tag(s)=%d, page=%d", + *a.registryID, namespace, repo, len(imageNames), page) req.Offset = &page resp, err = a.tcrClient.DescribeImages(req) if err != nil { @@ -214,7 +217,7 @@ func (a *adapter) getImages(namespace, repo, tag string) (images []*tcr.TcrImage imageNames = append(imageNames, *image.ImageVersion) } - if len(images) == int(*resp.Response.TotalCount) { + if len(imageNames) == int(*resp.Response.TotalCount) { break } page++ diff --git a/src/vendor/github.com/andres-erbsen/clock/.travis.yml b/src/vendor/github.com/andres-erbsen/clock/.travis.yml new file mode 100644 index 000000000..ca785e519 --- /dev/null +++ b/src/vendor/github.com/andres-erbsen/clock/.travis.yml @@ -0,0 +1,7 @@ +language: go +go: + - 1.3 + - 1.4 + - release + - tip +sudo: false diff --git a/src/vendor/github.com/andres-erbsen/clock/LICENSE b/src/vendor/github.com/andres-erbsen/clock/LICENSE new file mode 100644 index 000000000..ddf4e001e --- /dev/null +++ b/src/vendor/github.com/andres-erbsen/clock/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Ben Johnson, Copyright (c) 2015 Yahoo Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/vendor/github.com/andres-erbsen/clock/README.md b/src/vendor/github.com/andres-erbsen/clock/README.md new file mode 100644 index 000000000..f744e7664 --- /dev/null +++ b/src/vendor/github.com/andres-erbsen/clock/README.md @@ -0,0 +1,104 @@ +clock [![Build Status](https://travis-ci.org/andres-erbsen/clock.svg)](https://travis-ci.org/andres-erbsen/clock) [![Coverage Status](https://coveralls.io/repos/andres-erbsen/clock/badge.png?branch=master)](https://coveralls.io/r/andres-erbsen/clock?branch=master) [![GoDoc](https://godoc.org/github.com/andres-erbsen/clock?status.png)](https://godoc.org/github.com/andres-erbsen/clock) ![Project status](http://img.shields.io/status/experimental.png?color=red) +===== + +Clock is a small library for mocking time in Go. It provides an interface +around the standard library's [`time`][time] package so that the application +can use the realtime clock while tests can use the mock clock. + +[time]: http://golang.org/pkg/time/ + + +## Usage + +### Realtime Clock + +Your application can maintain a `Clock` variable that will allow realtime and +mock clocks to be interchangable. For example, if you had an `Application` type: + +```go +import "github.com/andres-erbsen/clock" + +type Application struct { + Clock clock.Clock +} +``` + +You could initialize it to use the realtime clock like this: + +```go +var app Application +app.Clock = clock.New() +... +``` + +Then all timers and time-related functionality should be performed from the +`Clock` variable. + + +### Mocking time + +In your tests, you will want to use a `Mock` clock: + +```go +import ( + "testing" + + "github.com/andres-erbsen/clock" +) + +func TestApplication_DoSomething(t *testing.T) { + mock := clock.NewMock() + app := Application{Clock: mock} + ... +} +``` + +Now that you've initialized your application to use the mock clock, you can +adjust the time programmatically. The mock clock always starts from the Unix +epoch (midnight, Jan 1, 1970 UTC). + + +### Controlling time + +The mock clock provides the same functions that the standard library's `time` +package provides. For example, to find the current time, you use the `Now()` +function: + +```go +mock := clock.NewMock() + +// Find the current time. +mock.Now().UTC() // 1970-01-01 00:00:00 +0000 UTC + +// Move the clock forward. +mock.Add(2 * time.Hour) + +// Check the time again. It's 2 hours later! +mock.Now().UTC() // 1970-01-01 02:00:00 +0000 UTC +``` + +Timers and Tickers are also controlled by this same mock clock. They will only +execute when the clock is moved forward: + +``` +mock := clock.NewMock() +count := 0 + +// Kick off a timer to increment every 1 mock second. +go func() { + ticker := clock.Ticker(1 * time.Second) + for { + <-ticker.C + count++ + } +}() +runtime.Gosched() + +// Move the clock forward 10 second. +mock.Add(10 * time.Second) + +// This prints 10. +fmt.Println(count) +``` + + diff --git a/src/vendor/github.com/andres-erbsen/clock/clock.go b/src/vendor/github.com/andres-erbsen/clock/clock.go new file mode 100644 index 000000000..b58b70320 --- /dev/null +++ b/src/vendor/github.com/andres-erbsen/clock/clock.go @@ -0,0 +1,317 @@ +package clock + +import ( + "sort" + "sync" + "time" +) + +// Clock represents an interface to the functions in the standard library time +// package. Two implementations are available in the clock package. The first +// is a real-time clock which simply wraps the time package's functions. The +// second is a mock clock which will only make forward progress when +// programmatically adjusted. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + Now() time.Time + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Ticker(d time.Duration) *Ticker + Timer(d time.Duration) *Timer +} + +// New returns an instance of a real-time clock. +func New() Clock { + return &clock{} +} + +// clock implements a real-time clock by simply wrapping the time package functions. +type clock struct{} + +func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (c *clock) Now() time.Time { return time.Now() } + +func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } + +func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } + +func (c *clock) Ticker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{C: t.C, ticker: t} +} + +func (c *clock) Timer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{C: t.C, timer: t} +} + +// Mock represents a mock clock that only moves forward programmically. +// It can be preferable to a real-time clock when testing time-based functionality. +type Mock struct { + mu sync.Mutex + now time.Time // current time + timers clockTimers // tickers & timers +} + +// NewMock returns an instance of a mock clock. +// The current time of the mock clock on initialization is the Unix epoch. +func NewMock() *Mock { + return &Mock{now: time.Unix(0, 0)} +} + +// Add moves the current time of the mock clock forward by the duration. +// This should only be called from a single goroutine at a time. +func (m *Mock) Add(d time.Duration) { + // Calculate the final current time. + t := m.now.Add(d) + + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure the other goroutines get handled. + gosched() +} + +// Sets the current time of the mock clock to a specific one. +// This should only be called from a single goroutine at a time. +func (m *Mock) Set(t time.Time) { + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure the other goroutines get handled. + gosched() +} + +// runNextTimer executes the next timer in chronological order and moves the +// current time to the timer's next tick time. The next time is not executed if +// it's next time if after the max time. Returns true if a timer is executed. +func (m *Mock) runNextTimer(max time.Time) bool { + m.mu.Lock() + + // Sort timers by time. + sort.Sort(m.timers) + + // If we have no more timers then exit. + if len(m.timers) == 0 { + m.mu.Unlock() + return false + } + + // Retrieve next timer. Exit if next tick is after new time. + t := m.timers[0] + if t.Next().After(max) { + m.mu.Unlock() + return false + } + + // Move "now" forward and unlock clock. + m.now = t.Next() + m.mu.Unlock() + + // Execute timer. + t.Tick(m.now) + return true +} + +// After waits for the duration to elapse and then sends the current time on the returned channel. +func (m *Mock) After(d time.Duration) <-chan time.Time { + return m.Timer(d).C +} + +// AfterFunc waits for the duration to elapse and then executes a function. +// A Timer is returned that can be stopped. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + t := m.Timer(d) + t.C = nil + t.fn = f + return t +} + +// Now returns the current wall time on the mock clock. +func (m *Mock) Now() time.Time { + m.mu.Lock() + defer m.mu.Unlock() + return m.now +} + +// Sleep pauses the goroutine for the given duration on the mock clock. +// The clock must be moved forward in a separate goroutine. +func (m *Mock) Sleep(d time.Duration) { + <-m.After(d) +} + +// Tick is a convenience function for Ticker(). +// It will return a ticker channel that cannot be stopped. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + return m.Ticker(d).C +} + +// Ticker creates a new instance of Ticker. +func (m *Mock) Ticker(d time.Duration) *Ticker { + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time, 1) + t := &Ticker{ + C: ch, + c: ch, + mock: m, + d: d, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTicker)(t)) + return t +} + +// Timer creates a new instance of Timer. +func (m *Mock) Timer(d time.Duration) *Timer { + ch := make(chan time.Time, 1) + t := &Timer{ + C: ch, + c: ch, + mock: m, + next: m.Now().Add(d), + } + m.addTimer((*internalTimer)(t)) + return t +} + +func (m *Mock) addTimer(t *internalTimer) { + m.mu.Lock() + defer m.mu.Unlock() + m.timers = append(m.timers, t) +} + +func (m *Mock) removeClockTimer(t clockTimer) bool { + m.mu.Lock() + defer m.mu.Unlock() + ret := false + for i, timer := range m.timers { + if timer == t { + ret = true + copy(m.timers[i:], m.timers[i+1:]) + m.timers[len(m.timers)-1] = nil + m.timers = m.timers[:len(m.timers)-1] + break + } + } + sort.Sort(m.timers) + return ret +} + +// clockTimer represents an object with an associated start time. +type clockTimer interface { + Next() time.Time + Tick(time.Time) +} + +// clockTimers represents a list of sortable timers. +type clockTimers []clockTimer + +func (a clockTimers) Len() int { return len(a) } +func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) } + +// Timer represents a single event. +// The current time will be sent on C, unless the timer was created by AfterFunc. +type Timer struct { + C <-chan time.Time + c chan time.Time + timer *time.Timer // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + fn func() // AfterFunc function, if set +} + +// Stop turns off the timer. +func (t *Timer) Stop() bool { + if t.timer != nil { + return t.timer.Stop() + } + return t.mock.removeClockTimer((*internalTimer)(t)) +} + +// Reset changes the timer to expire after duration d. It returns true if the +// timer had been active, false if the timer had expired or been stopped. +func (t *Timer) Reset(d time.Duration) bool { + if t.timer != nil { + return t.timer.Reset(d) + } + ret := t.mock.removeClockTimer((*internalTimer)(t)) + t.next = t.mock.Now().Add(d) + t.mock.addTimer((*internalTimer)(t)) + return ret +} + +type internalTimer Timer + +func (t *internalTimer) Next() time.Time { return t.next } +func (t *internalTimer) Tick(now time.Time) { + if t.fn != nil { + t.fn() + } else { + select { + case t.c <- now: + default: + } + } + t.mock.removeClockTimer((*internalTimer)(t)) + gosched() +} + +// Ticker holds a channel that receives "ticks" at regular intervals. +type Ticker struct { + C <-chan time.Time + c chan time.Time + ticker *time.Ticker // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + d time.Duration // time between ticks +} + +// Stop turns off the ticker. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + } else { + t.mock.removeClockTimer((*internalTicker)(t)) + } +} + +type internalTicker Ticker + +func (t *internalTicker) Next() time.Time { return t.next } +func (t *internalTicker) Tick(now time.Time) { + select { + case t.c <- now: + default: + } + t.next = now.Add(t.d) + gosched() +} + +// Sleep momentarily so that other goroutines can process. +func gosched() { time.Sleep(1 * time.Millisecond) } diff --git a/src/vendor/go.uber.org/ratelimit/.gitignore b/src/vendor/go.uber.org/ratelimit/.gitignore new file mode 100644 index 000000000..aa346ac44 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/.gitignore @@ -0,0 +1,6 @@ +/bin +/vendor +cover.html +cover.out + +*.swp diff --git a/src/vendor/go.uber.org/ratelimit/CHANGELOG.md b/src/vendor/go.uber.org/ratelimit/CHANGELOG.md new file mode 100644 index 000000000..b2b89cf73 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/CHANGELOG.md @@ -0,0 +1,23 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) +and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + +## v0.2.0 - 2021-03-02 +### Added +- Allow configuring the limiter with custom slack. + [#64](https://github.com/uber-go/ratelimit/pull/64) +- Allow configuring the limiter per arbitrary time duration. + [#54](https://github.com/uber-go/ratelimit/pull/54) +### Changed +- Switched from Glide to Go Modules. +### Fixed +- Fix not working slack. + [#60](https://github.com/uber-go/ratelimit/pull/60) + +## v0.1.0 +### Fixed +- Changed the import path for `go.uber.org/atomic` to its newer, canonical + import path. + [#18](https://github.com/uber-go/ratelimit/issues/18) diff --git a/src/vendor/go.uber.org/ratelimit/LICENSE b/src/vendor/go.uber.org/ratelimit/LICENSE new file mode 100644 index 000000000..0f3edc861 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Uber Technologies, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/src/vendor/go.uber.org/ratelimit/Makefile b/src/vendor/go.uber.org/ratelimit/Makefile new file mode 100644 index 000000000..5bab5d77a --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/Makefile @@ -0,0 +1,46 @@ +# Directory to put `go install`ed binaries in. +export GOBIN ?= $(shell pwd)/bin + +GO_FILES := $(shell \ + find . '(' -path '*/.*' -o -path './vendor' ')' -prune \ + -o -name '*.go' -print | cut -b3-) + +.PHONY: bench +bench: + go test -bench=. ./... + +bin/golint: tools/go.mod + @cd tools && go install golang.org/x/lint/golint + +bin/staticcheck: tools/go.mod + @cd tools && go install honnef.co/go/tools/cmd/staticcheck + +.PHONY: build +build: + go build ./... + +.PHONY: cover +cover: + go test -coverprofile=cover.out -coverpkg=./... -v ./... + go tool cover -html=cover.out -o cover.html + +.PHONY: gofmt +gofmt: + $(eval FMT_LOG := $(shell mktemp -t gofmt.XXXXX)) + @gofmt -e -s -l $(GO_FILES) > $(FMT_LOG) || true + @[ ! -s "$(FMT_LOG)" ] || (echo "gofmt failed:" | cat - $(FMT_LOG) && false) + +.PHONY: golint +golint: bin/golint + @$(GOBIN)/golint -set_exit_status ./... + +.PHONY: lint +lint: gofmt golint staticcheck + +.PHONY: staticcheck +staticcheck: bin/staticcheck + @$(GOBIN)/staticcheck ./... + +.PHONY: test +test: + go test -race ./... diff --git a/src/vendor/go.uber.org/ratelimit/README.md b/src/vendor/go.uber.org/ratelimit/README.md new file mode 100644 index 000000000..a05a2a888 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/README.md @@ -0,0 +1,46 @@ +# Go rate limiter [![GoDoc][doc-img]][doc] [![Coverage Status][cov-img]][cov] ![test][test-img] + +This package provides a Golang implementation of the leaky-bucket rate limit algorithm. +This implementation refills the bucket based on the time elapsed between +requests instead of requiring an interval clock to fill the bucket discretely. + +Create a rate limiter with a maximum number of operations to perform per second. +Call Take() before each operation. Take will sleep until you can continue. + +```go +import ( + "fmt" + "time" + + "go.uber.org/ratelimit" +) + +func main() { + rl := ratelimit.New(100) // per second + + prev := time.Now() + for i := 0; i < 10; i++ { + now := rl.Take() + fmt.Println(i, now.Sub(prev)) + prev = now + } + + // Output: + // 0 0 + // 1 10ms + // 2 10ms + // 3 10ms + // 4 10ms + // 5 10ms + // 6 10ms + // 7 10ms + // 8 10ms + // 9 10ms +} +``` + +[cov-img]: https://codecov.io/gh/uber-go/ratelimit/branch/master/graph/badge.svg?token=zhLeUjjrm2 +[cov]: https://codecov.io/gh/uber-go/ratelimit +[doc-img]: https://pkg.go.dev/badge/go.uber.org/ratelimit +[doc]: https://pkg.go.dev/go.uber.org/ratelimit +[test-img]: https://github.com/uber-go/ratelimit/workflows/test/badge.svg diff --git a/src/vendor/go.uber.org/ratelimit/go.mod b/src/vendor/go.uber.org/ratelimit/go.mod new file mode 100644 index 000000000..7487c4209 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/go.mod @@ -0,0 +1,9 @@ +module go.uber.org/ratelimit + +go 1.14 + +require ( + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 + github.com/stretchr/testify v1.6.1 + go.uber.org/atomic v1.7.0 +) diff --git a/src/vendor/go.uber.org/ratelimit/go.sum b/src/vendor/go.uber.org/ratelimit/go.sum new file mode 100644 index 000000000..15512eae8 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/go.sum @@ -0,0 +1,17 @@ +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/vendor/go.uber.org/ratelimit/limiter_atomic.go b/src/vendor/go.uber.org/ratelimit/limiter_atomic.go new file mode 100644 index 000000000..745aa4cb5 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/limiter_atomic.go @@ -0,0 +1,110 @@ +// Copyright (c) 2016,2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ratelimit // import "go.uber.org/ratelimit" + +import ( + "time" + + "sync/atomic" + "unsafe" +) + +type state struct { + last time.Time + sleepFor time.Duration +} + +type atomicLimiter struct { + state unsafe.Pointer + //lint:ignore U1000 Padding is unused but it is crucial to maintain performance + // of this rate limiter in case of collocation with other frequently accessed memory. + padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing. + + perRequest time.Duration + maxSlack time.Duration + clock Clock +} + +// newAtomicBased returns a new atomic based limiter. +func newAtomicBased(rate int, opts ...Option) *atomicLimiter { + // TODO consider moving config building to the implementation + // independent code. + config := buildConfig(opts) + perRequest := config.per / time.Duration(rate) + l := &atomicLimiter{ + perRequest: perRequest, + maxSlack: -1 * time.Duration(config.slack) * perRequest, + clock: config.clock, + } + + initialState := state{ + last: time.Time{}, + sleepFor: 0, + } + atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) + return l +} + +// Take blocks to ensure that the time spent between multiple +// Take calls is on average time.Second/rate. +func (t *atomicLimiter) Take() time.Time { + var ( + newState state + taken bool + interval time.Duration + ) + for !taken { + now := t.clock.Now() + + previousStatePointer := atomic.LoadPointer(&t.state) + oldState := (*state)(previousStatePointer) + + newState = state{ + last: now, + sleepFor: oldState.sleepFor, + } + + // If this is our first request, then we allow it. + if oldState.last.IsZero() { + taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) + continue + } + + // sleepFor calculates how much time we should sleep based on + // the perRequest budget and how long the last request took. + // Since the request may take longer than the budget, this number + // can get negative, and is summed across requests. + newState.sleepFor += t.perRequest - now.Sub(oldState.last) + // We shouldn't allow sleepFor to get too negative, since it would mean that + // a service that slowed down a lot for a short period of time would get + // a much higher RPS following that. + if newState.sleepFor < t.maxSlack { + newState.sleepFor = t.maxSlack + } + if newState.sleepFor > 0 { + newState.last = newState.last.Add(newState.sleepFor) + interval, newState.sleepFor = newState.sleepFor, 0 + } + taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) + } + t.clock.Sleep(interval) + return newState.last +} diff --git a/src/vendor/go.uber.org/ratelimit/limiter_mutexbased.go b/src/vendor/go.uber.org/ratelimit/limiter_mutexbased.go new file mode 100644 index 000000000..1408f1c51 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/limiter_mutexbased.go @@ -0,0 +1,88 @@ +// Copyright (c) 2016,2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ratelimit // import "go.uber.org/ratelimit" + +import ( + "sync" + "time" +) + +type mutexLimiter struct { + sync.Mutex + last time.Time + sleepFor time.Duration + perRequest time.Duration + maxSlack time.Duration + clock Clock +} + +// newMutexBased returns a new atomic based limiter. +func newMutexBased(rate int, opts ...Option) *mutexLimiter { + // TODO consider moving config building to the implementation + // independent code. + config := buildConfig(opts) + perRequest := config.per / time.Duration(rate) + l := &mutexLimiter{ + perRequest: perRequest, + maxSlack: -1 * time.Duration(config.slack) * perRequest, + clock: config.clock, + } + return l +} + +// Take blocks to ensure that the time spent between multiple +// Take calls is on average time.Second/rate. +func (t *mutexLimiter) Take() time.Time { + t.Lock() + defer t.Unlock() + + now := t.clock.Now() + + // If this is our first request, then we allow it. + if t.last.IsZero() { + t.last = now + return t.last + } + + // sleepFor calculates how much time we should sleep based on + // the perRequest budget and how long the last request took. + // Since the request may take longer than the budget, this number + // can get negative, and is summed across requests. + t.sleepFor += t.perRequest - now.Sub(t.last) + + // We shouldn't allow sleepFor to get too negative, since it would mean that + // a service that slowed down a lot for a short period of time would get + // a much higher RPS following that. + if t.sleepFor < t.maxSlack { + t.sleepFor = t.maxSlack + } + + // If sleepFor is positive, then we should sleep now. + if t.sleepFor > 0 { + t.clock.Sleep(t.sleepFor) + t.last = now.Add(t.sleepFor) + t.sleepFor = 0 + } else { + t.last = now + } + + return t.last +} diff --git a/src/vendor/go.uber.org/ratelimit/ratelimit.go b/src/vendor/go.uber.org/ratelimit/ratelimit.go new file mode 100644 index 000000000..b5b16e576 --- /dev/null +++ b/src/vendor/go.uber.org/ratelimit/ratelimit.go @@ -0,0 +1,135 @@ +// Copyright (c) 2016,2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ratelimit // import "go.uber.org/ratelimit" + +import ( + "time" + + "github.com/andres-erbsen/clock" +) + +// Note: This file is inspired by: +// https://github.com/prashantv/go-bench/blob/master/ratelimit + +// Limiter is used to rate-limit some process, possibly across goroutines. +// The process is expected to call Take() before every iteration, which +// may block to throttle the goroutine. +type Limiter interface { + // Take should block to make sure that the RPS is met. + Take() time.Time +} + +// Clock is the minimum necessary interface to instantiate a rate limiter with +// a clock or mock clock, compatible with clocks created using +// github.com/andres-erbsen/clock. +type Clock interface { + Now() time.Time + Sleep(time.Duration) +} + +// config configures a limiter. +type config struct { + clock Clock + slack int + per time.Duration +} + +// New returns a Limiter that will limit to the given RPS. +func New(rate int, opts ...Option) Limiter { + return newAtomicBased(rate, opts...) +} + +// buildConfig combines defaults with options. +func buildConfig(opts []Option) config { + c := config{ + clock: clock.New(), + slack: 10, + per: time.Second, + } + + for _, opt := range opts { + opt.apply(&c) + } + return c +} + +// Option configures a Limiter. +type Option interface { + apply(*config) +} + +type clockOption struct { + clock Clock +} + +func (o clockOption) apply(c *config) { + c.clock = o.clock +} + +// WithClock returns an option for ratelimit.New that provides an alternate +// Clock implementation, typically a mock Clock for testing. +func WithClock(clock Clock) Option { + return clockOption{clock: clock} +} + +type slackOption int + +func (o slackOption) apply(c *config) { + c.slack = int(o) +} + +// WithoutSlack configures the limiter to be strict and not to accumulate +// previously "unspent" requests for future bursts of traffic. +var WithoutSlack Option = slackOption(0) + +// WithSlack configures custom slack. +// Slack allows the limiter to accumulate "unspent" requests +// for future bursts of traffic. +func WithSlack(slack int) Option { + return slackOption(slack) +} + +type perOption time.Duration + +func (p perOption) apply(c *config) { + c.per = time.Duration(p) +} + +// Per allows configuring limits for different time windows. +// +// The default window is one second, so New(100) produces a one hundred per +// second (100 Hz) rate limiter. +// +// New(2, Per(60*time.Second)) creates a 2 per minute rate limiter. +func Per(per time.Duration) Option { + return perOption(per) +} + +type unlimited struct{} + +// NewUnlimited returns a RateLimiter that is not limited. +func NewUnlimited() Limiter { + return unlimited{} +} + +func (unlimited) Take() time.Time { + return time.Now() +} diff --git a/src/vendor/modules.txt b/src/vendor/modules.txt index cb6130007..ac37ce61e 100644 --- a/src/vendor/modules.txt +++ b/src/vendor/modules.txt @@ -53,6 +53,8 @@ github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests github.com/aliyun/alibaba-cloud-sdk-go/sdk/responses github.com/aliyun/alibaba-cloud-sdk-go/sdk/utils github.com/aliyun/alibaba-cloud-sdk-go/services/cr +# github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 +github.com/andres-erbsen/clock # github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 github.com/asaskevich/govalidator # github.com/astaxie/beego v1.12.1 @@ -518,6 +520,9 @@ go.mongodb.org/mongo-driver/bson/bsonrw go.mongodb.org/mongo-driver/bson/bsontype go.mongodb.org/mongo-driver/bson/primitive go.mongodb.org/mongo-driver/x/bsonx/bsoncore +# go.uber.org/ratelimit v0.2.0 +## explicit +go.uber.org/ratelimit # golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 ## explicit golang.org/x/crypto/acme