This commit is contained in:
Jirka Vrba 2024-04-29 18:45:49 +02:00 committed by GitHub
commit cbd6b542dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 57 additions and 6 deletions

View File

@ -21,7 +21,9 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/lib/log"
@ -124,6 +126,55 @@ func getAdapterInfo() *model.AdapterPattern {
return info
}
// Rate-limit aware wrapper function for client.Do()
// - Avoids being hit by limit by pausing requests when less than 'lowMark' requests remaining.
// - Pauses for given time when limit is hit.
// - Allows 2 more attempts before giving up.
// Reason: Observed (02/2024) penalty for hitting the limit is 120s, normal reset is 60s,
// so it is better to not hit the wall.
func (a *adapter) limitAwareDo(method string, path string, body io.Reader) (*http.Response, error) {
const lowMark = 8
var attemptsLeft = 3
for attemptsLeft > 0 {
clientResp, clientErr := a.client.Do(method, path, body)
if clientErr != nil {
return clientResp, clientErr
}
if clientResp.StatusCode != http.StatusTooManyRequests {
reqsLeft, err := strconv.ParseInt(clientResp.Header.Get("x-ratelimit-remaining"), 10, 64)
if err != nil {
return clientResp, clientErr
}
if reqsLeft < lowMark {
resetTSC, err := strconv.ParseInt(clientResp.Header.Get("x-ratelimit-reset"), 10, 64)
if err == nil {
dur := time.Until(time.Unix(resetTSC, 0))
log.Infof("Rate-limit exhaustion eminent, sleeping for %.1f seconds", dur.Seconds())
time.Sleep(dur)
log.Info("Sleep finished, resuming operation")
}
}
return clientResp, clientErr
}
var dur = time.Duration(0)
seconds, err := strconv.ParseInt(clientResp.Header.Get("retry-after"), 10, 64)
if err != nil {
expireTime, err := http.ParseTime(clientResp.Header.Get("retry-after"))
if err != nil {
return nil, errors.New("blocked by dockerhub rate-limit and missing retry-after header")
}
dur = time.Until(expireTime)
} else {
dur = time.Duration(seconds) * time.Second
}
log.Infof("Rate-limit exhausted, sleeping for %.1f seconds", dur.Seconds())
time.Sleep(dur)
log.Info("Sleep finished, resuming operation")
attemptsLeft--
}
return nil, errors.New("unable to get past dockerhub rate-limit")
}
// PrepareForPush does the prepare work that needed for pushing/uploading the resource
// eg: create the namespace or repository
func (a *adapter) PrepareForPush(resources []*model.Resource) error {
@ -159,7 +210,7 @@ func (a *adapter) PrepareForPush(resources []*model.Resource) error {
}
func (a *adapter) listNamespaces() ([]string, error) {
resp, err := a.client.Do(http.MethodGet, listNamespacePath, nil)
resp, err := a.limitAwareDo(http.MethodGet, listNamespacePath, nil)
if err != nil {
return nil, err
}
@ -207,7 +258,7 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
return err
}
resp, err := a.client.Do(http.MethodPost, createNamespacePath, bytes.NewReader(b))
resp, err := a.limitAwareDo(http.MethodPost, createNamespacePath, bytes.NewReader(b))
if err != nil {
return err
}
@ -228,7 +279,7 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
// getNamespace get namespace from DockerHub, if the namespace not found, two nil would be returned.
func (a *adapter) getNamespace(namespace string) (*model.Namespace, error) {
resp, err := a.client.Do(http.MethodGet, getNamespacePath(namespace), nil)
resp, err := a.limitAwareDo(http.MethodGet, getNamespacePath(namespace), nil)
if err != nil {
return nil, err
}
@ -389,7 +440,7 @@ func (a *adapter) DeleteManifest(repository, reference string) error {
return fmt.Errorf("dockerhub only support repo in format <namespace>/<name>, but got: %s", repository)
}
resp, err := a.client.Do(http.MethodDelete, deleteTagPath(parts[0], parts[1], reference), nil)
resp, err := a.limitAwareDo(http.MethodDelete, deleteTagPath(parts[0], parts[1], reference), nil)
if err != nil {
return err
}
@ -410,7 +461,7 @@ func (a *adapter) DeleteManifest(repository, reference string) error {
// getRepos gets a page of repos from DockerHub
func (a *adapter) getRepos(namespace, name string, page, pageSize int) (*ReposResp, error) {
resp, err := a.client.Do(http.MethodGet, listReposPath(namespace, name, page, pageSize), nil)
resp, err := a.limitAwareDo(http.MethodGet, listReposPath(namespace, name, page, pageSize), nil)
if err != nil {
return nil, err
}
@ -437,7 +488,7 @@ func (a *adapter) getRepos(namespace, name string, page, pageSize int) (*ReposRe
// getTags gets a page of tags for a repo from DockerHub
func (a *adapter) getTags(namespace, repo string, page, pageSize int) (*TagsResp, error) {
resp, err := a.client.Do(http.MethodGet, listTagsPath(namespace, repo, page, pageSize), nil)
resp, err := a.limitAwareDo(http.MethodGet, listTagsPath(namespace, repo, page, pageSize), nil)
if err != nil {
return nil, err
}