fix quota sync issues

1, fix #8858, add retry to ping backend service
2, fix #8859, split the blobs data when larger then 65535

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
wang yan 2019-08-28 16:24:27 +08:00
parent 3f5ecd1f9c
commit 942e793f20
5 changed files with 71 additions and 8 deletions

View File

@ -34,6 +34,7 @@ func AddBlobToProject(blobID, projectID int64) (int64, error) {
}
// AddBlobsToProject ...
// Note: pq has limitation on support parameters, the maximum length of blobs is 65535
func AddBlobsToProject(projectID int64, blobs ...*models.Blob) (int64, error) {
if len(blobs) == 0 {
return 0, nil

View File

@ -41,6 +41,29 @@ func TestAddBlobToProject(t *testing.T) {
require.Nil(t, err)
}
func TestAddBlobsToProject(t *testing.T) {
var blobs []*models.Blob
pid, err := AddProject(models.Project{
Name: "TestAddBlobsToProject_project1",
OwnerID: 1,
})
require.Nil(t, err)
for i := 0; i < 8888; i++ {
blob := &models.Blob{
Digest: digest.FromString(utils.GenerateRandomString()).String(),
Size: 100,
}
_, err := AddBlob(blob)
require.Nil(t, err)
blobs = append(blobs, blob)
}
cnt, err := AddBlobsToProject(pid, blobs...)
require.Nil(t, err)
require.Equal(t, cnt, int64(8888))
}
func TestHasBlobInProject(t *testing.T) {
_, blob, err := GetOrCreateBlob(&models.Blob{
Digest: digest.FromString(utils.GenerateRandomString()).String(),

View File

@ -52,7 +52,7 @@ var (
// Ping ...
func (rm *Migrator) Ping() error {
return api.HealthCheckerRegistry["chartmuseum"].Check()
return quota.Check(api.HealthCheckerRegistry["chartmuseum"].Check)
}
// Dump ...

View File

@ -22,7 +22,9 @@ import (
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr"
"github.com/goharbor/harbor/src/pkg/types"
"math/rand"
"strconv"
"time"
)
// QuotaMigrator ...
@ -78,6 +80,7 @@ func Register(name string, adapter Instance) {
// Sync ...
func Sync(pm promgr.ProjectManager, populate bool) error {
rand.Seed(time.Now().UnixNano())
totalUsage := make(map[string][]ProjectUsage)
for name, instanceFunc := range adapters {
if !config.WithChartMuseum() {
@ -86,22 +89,31 @@ func Sync(pm promgr.ProjectManager, populate bool) error {
}
}
adapter := instanceFunc(pm)
log.Infof("[Quota-Sync]:: start to ping server ... [%s]", name)
if err := adapter.Ping(); err != nil {
log.Infof("[Quota-Sync]:: fail to ping server ... [%s], quit sync ...", name)
return err
}
log.Infof("[Quota-Sync]:: success to ping server ... [%s]", name)
log.Infof("[Quota-Sync]:: start to dump data from server ... [%s]", name)
data, err := adapter.Dump()
if err != nil {
log.Infof("[Quota-Sync]:: fail to dump data from server ... [%s], quit sync ...", name)
return err
}
log.Infof("[Quota-Sync]:: success to dump data from server ... [%s]", name)
usage, err := adapter.Usage(data)
if err != nil {
return err
}
totalUsage[name] = usage
if populate {
log.Infof("[Quota-Sync]:: start to persist data for server ... [%s]", name)
if err := adapter.Persist(data); err != nil {
log.Infof("[Quota-Sync]:: fail to persist data from server ... [%s], quit sync ...", name)
return err
}
log.Infof("[Quota-Sync]:: success to persist data for server ... [%s]", name)
}
}
merged := mergeUsage(totalUsage)
@ -111,6 +123,11 @@ func Sync(pm promgr.ProjectManager, populate bool) error {
return nil
}
// Check ...
func Check(f func() error) error {
return retry(10, 2*time.Second, f)
}
// mergeUsage merges the usage of adapters
func mergeUsage(total map[string][]ProjectUsage) []ProjectUsage {
if !config.WithChartMuseum() {
@ -171,3 +188,14 @@ func ensureQuota(usages []ProjectUsage) error {
}
return nil
}
func retry(attempts int, sleep time.Duration, f func() error) error {
if err := f(); err != nil {
if attempts--; attempts > 0 {
time.Sleep(sleep)
return retry(attempts, sleep, f)
}
return err
}
return nil
}

View File

@ -48,7 +48,7 @@ func NewRegistryMigrator(pm promgr.ProjectManager) quota.QuotaMigrator {
// Ping ...
func (rm *Migrator) Ping() error {
return api.HealthCheckerRegistry["registry"].Check()
return quota.Check(api.HealthCheckerRegistry["registry"].Check)
}
// Dump ...
@ -182,7 +182,9 @@ func (rm *Migrator) Usage(projects []quota.ProjectInfo) ([]quota.ProjectUsage, e
// Persist ...
func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
for _, project := range projects {
total := len(projects)
for i, project := range projects {
log.Infof("[Quota-Sync]:: start to persist artifact&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
for _, repo := range project.Repos {
if err := persistAf(repo.Afs); err != nil {
return err
@ -194,6 +196,7 @@ func (rm *Migrator) Persist(projects []quota.ProjectInfo) error {
return err
}
}
log.Infof("[Quota-Sync]:: success to persist artifact&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
}
if err := persistPB(projects); err != nil {
return err
@ -250,7 +253,9 @@ func persistBlob(blobs []*models.Blob) error {
}
func persistPB(projects []quota.ProjectInfo) error {
for _, project := range projects {
total := len(projects)
for i, project := range projects {
log.Infof("[Quota-Sync]:: start to persist project&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
var blobs = make(map[string]int64)
var blobsOfPro []*models.Blob
for _, repo := range project.Repos {
@ -275,11 +280,17 @@ func persistPB(projects []quota.ProjectInfo) error {
log.Error(err)
return err
}
_, err = dao.AddBlobsToProject(pro.ProjectID, blobsOfPro...)
if err != nil {
log.Error(err)
return err
for _, blobOfPro := range blobsOfPro {
_, err = dao.AddBlobToProject(blobOfPro.ID, pro.ProjectID)
if err != nil {
log.Error(err)
if err == dao.ErrDupRows {
continue
}
return err
}
}
log.Infof("[Quota-Sync]:: success to persist project&blob for project: %s, progress... [%d/%d]", project.Name, i, total)
}
return nil
}