Improve performance for other registry adapters

Signed-off-by: cd1989 <chende@caicloud.io>
This commit is contained in:
cd1989 2019-07-31 23:05:36 +08:00
parent 254aa7e2b7
commit 1f541c890c
5 changed files with 296 additions and 101 deletions

View File

@ -0,0 +1,61 @@
package utils
// PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool
// is used to control the concurrency of tasks, the pool size determine the max concurrency. When users
// want to start a goroutine to perform some task, they must apply a passport firstly, and after finish
// the task, the passport must be revoked.
type PassportsPool interface {
// Apply applies a passport from the pool.
Apply() bool
// Revoke revokes a passport to the pool
Revoke() bool
}
type passportsPool struct {
passports chan struct{}
stopped chan struct{}
}
// NewPassportsPool creates a passports pool with given size
func NewPassportsPool(size int, stopped chan struct{}) PassportsPool {
return &passportsPool{
passports: make(chan struct{}, size),
stopped: stopped,
}
}
// Apply applies a passport from the pool. Returning value 'true' means passport acquired
// successfully. If no available passports in the pool, 'Apply' will wait for it. If the
// all passports in the pool are turned into invalid by the 'stopped' channel, then false
// is returned, means no more passports will be dispatched.
func (p *passportsPool) Apply() bool {
select {
case p.passports <- struct{}{}:
return true
case <-p.stopped:
return false
}
}
// Revoke revokes a passport to the pool. Returning value 'true' means passport revoked
// successfully, otherwise 'Revoke' will wait. If pool turns into invalid by 'stopped' channel
// false will be returned.
func (p *passportsPool) Revoke() bool {
select {
case <-p.passports:
return true
case <-p.stopped:
return false
}
}
// IsChannelClosed ...
func IsChannelClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}

View File

@ -7,15 +7,16 @@ import (
"net/http" "net/http"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sync"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cr" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr"
"github.com/goharbor/harbor/src/common/utils/registry/auth" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/replication/adapter/native"
"github.com/goharbor/harbor/src/replication/util"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
adp "github.com/goharbor/harbor/src/replication/adapter" adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/adapter/native"
"github.com/goharbor/harbor/src/replication/model" "github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/util"
) )
func init() { func init() {
@ -157,42 +158,83 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou
} }
log.Debugf("FetchImages.repositories: %#v\n", repositories) log.Debugf("FetchImages.repositories: %#v\n", repositories)
// list tags rawResources := make([]*model.Resource, len(repositories))
for _, repository := range repositories { var wg = new(sync.WaitGroup)
var tags []string var stopped = make(chan struct{})
tags, err = a.getTags(repository, client) var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped)
if err != nil {
return
}
var filterTags []string for i, r := range repositories {
if tagsPattern != "" { wg.Add(1)
for _, tag := range tags { go func(index int, repo aliRepo) {
var ok bool defer func() {
ok, err = util.Match(tagsPattern, tag) wg.Done()
if err != nil { }()
return
} // Return false means no passport acquired, and no valid passport will be dispatched any more.
if ok { // For example, some crucial errors happened and all tasks should be cancelled.
filterTags = append(filterTags, tag) if ok := passportsPool.Apply(); !ok {
} return
} }
} else { defer func() {
filterTags = tags passportsPool.Revoke()
} }()
if len(filterTags) > 0 { var tags []string
resources = append(resources, &model.Resource{ tags, err = a.getTags(repo, client)
Type: model.ResourceTypeImage, if err != nil {
Registry: a.registry, log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err)
Metadata: &model.ResourceMetadata{ if !utils.IsChannelClosed(stopped) {
Repository: &model.Repository{ close(stopped)
Name: filepath.Join(repository.RepoNamespace, repository.RepoName), }
return
}
var filterTags []string
if tagsPattern != "" {
for _, tag := range tags {
var ok bool
ok, err = util.Match(tagsPattern, tag)
if err != nil {
log.Errorf("Match tag '%s' error: %v", tag, err)
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
return
}
if ok {
filterTags = append(filterTags, tag)
}
}
} else {
filterTags = tags
}
if len(filterTags) > 0 {
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: filepath.Join(repo.RepoNamespace, repo.RepoName),
},
Vtags: filterTags,
Labels: []string{},
}, },
Vtags: filterTags, }
Labels: []string{}, } else {
}, rawResources[index] = nil
}) }
}(i, r)
}
wg.Wait()
if utils.IsChannelClosed(stopped) {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
} }
} }

View File

@ -248,7 +248,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
log.Debugf("got %d repositories for namespace %s", n, ns) log.Debugf("got %d repositories for namespace %s", n, ns)
} }
var resources = make([]*model.Resource, len(repos)) var rawResources = make([]*model.Resource, len(repos))
var wg = new(sync.WaitGroup) var wg = new(sync.WaitGroup)
var stopped = make(chan struct{}) var stopped = make(chan struct{})
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped)
@ -325,9 +325,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
if len(tags) == 0 { if len(tags) == 0 {
resources[index] = nil rawResources[index] = nil
} else { } else {
resources[index] = &model.Resource{ rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage, Type: model.ResourceTypeImage,
Registry: a.registry, Registry: a.registry,
Metadata: &model.ResourceMetadata{ Metadata: &model.ResourceMetadata{
@ -342,14 +342,18 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
wg.Wait() wg.Wait()
var filtered []*model.Resource if utils.IsChannelClosed(stopped) {
for _, r := range resources { return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
var resources []*model.Resource
for _, r := range rawResources {
if r != nil { if r != nil {
filtered = append(filtered, r) resources = append(resources, r)
} }
} }
return filtered, nil return resources, nil
} }
func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) { func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) {

View File

@ -17,7 +17,9 @@ package harbor
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/adapter" adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/model" "github.com/goharbor/harbor/src/replication/model"
@ -29,6 +31,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
resources := []*model.Resource{} resources := []*model.Resource{}
for _, project := range projects { for _, project := range projects {
repositories, err := a.getRepositories(project.ID) repositories, err := a.getRepositories(project.ID)
@ -43,37 +46,80 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
return nil, err return nil, err
} }
} }
for _, repository := range repositories {
vTags, err := a.getTags(repository.Name) rawResources := make([]*model.Resource, len(repositories))
if err != nil { var wg = new(sync.WaitGroup)
return nil, err var stopped = make(chan struct{})
} var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped)
if len(vTags) == 0 {
continue for i, r := range repositories {
} wg.Add(1)
for _, filter := range filters { go func(index int, repo *adp.Repository) {
if err = filter.DoFilter(&vTags); err != nil { defer func() {
return nil, err wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
} }
} defer func() {
if len(vTags) == 0 { passportsPool.Revoke()
continue }()
}
tags := []string{} vTags, err := a.getTags(repo.Name)
for _, vTag := range vTags { if err != nil {
tags = append(tags, vTag.Name) log.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
} if !utils.IsChannelClosed(stopped) {
resources = append(resources, &model.Resource{ close(stopped)
Type: model.ResourceTypeImage, }
Registry: a.registry, return
Metadata: &model.ResourceMetadata{ }
Repository: &model.Repository{ if len(vTags) == 0 {
Name: repository.Name, rawResources[index] = nil
Metadata: project.Metadata, return
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
log.Errorf("Filter tags %v error: %v", vTags, err)
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
return
}
}
if len(vTags) == 0 {
rawResources[index] = nil
return
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repo.Name,
Metadata: project.Metadata,
},
Vtags: tags,
}, },
Vtags: tags, }
}, }(i, r)
}) }
wg.Wait()
if utils.IsChannelClosed(stopped) {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
} }
} }

View File

@ -15,6 +15,7 @@
package native package native
import ( import (
"fmt"
"io" "io"
"net/http" "net/http"
"strings" "strings"
@ -24,6 +25,7 @@ import (
"github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/manifest/schema1"
"github.com/goharbor/harbor/src/common/http/modifier" "github.com/goharbor/harbor/src/common/http/modifier"
common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth" "github.com/goharbor/harbor/src/common/utils/registry/auth"
@ -159,37 +161,77 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
} }
var resources []*model.Resource rawResources := make([]*model.Resource, len(repositories))
for _, repository := range repositories { var wg = new(sync.WaitGroup)
vTags, err := a.getVTags(repository.Name) var stopped = make(chan struct{})
if err != nil { var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped)
return nil, err
} for i, r := range repositories {
if len(vTags) == 0 { wg.Add(1)
continue go func(index int, repo *adp.Repository) {
} defer func() {
for _, filter := range filters { wg.Done()
if err = filter.DoFilter(&vTags); err != nil { }()
return nil, err
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
} }
} defer func() {
if len(vTags) == 0 { passportsPool.Revoke()
continue }()
}
tags := []string{} vTags, err := a.getVTags(repo.Name)
for _, vTag := range vTags { if err != nil {
tags = append(tags, vTag.Name) log.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
} if !utils.IsChannelClosed(stopped) {
resources = append(resources, &model.Resource{ close(stopped)
Type: model.ResourceTypeImage, }
Registry: a.registry, return
Metadata: &model.ResourceMetadata{ }
Repository: &model.Repository{ if len(vTags) == 0 {
Name: repository.Name, return
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
log.Errorf("Filter tags %v error: %v", vTags, err)
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
return
}
}
if len(vTags) == 0 {
return
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repo.Name,
},
Vtags: tags,
}, },
Vtags: tags, }
}, }(i, r)
}) }
wg.Wait()
if utils.IsChannelClosed(stopped) {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
var resources []*model.Resource
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
} }
return resources, nil return resources, nil