Improve DockerHub adapter performance

Signed-off-by: cd1989 <chende@caicloud.io>
This commit is contained in:
cd1989 2019-07-31 19:01:44 +08:00
parent 8ae1fe4726
commit 5f29d9a3b7
2 changed files with 94 additions and 51 deletions

View File

@ -27,6 +27,7 @@ import (
// Package-level constants.
const (
// UserAgentReplication holds the identifier "harbor-replication-service".
// NOTE(review): presumably sent as the HTTP User-Agent by replication
// clients; confirm against the callers, which are not visible here.
UserAgentReplication = "harbor-replication-service"
// MaxConcurrency is passed as the first argument to utils.NewPassportsPool
// by the DockerHub adapter when it collects tags per repository.
// NOTE(review): it appears to cap the number of concurrently running
// workers; confirm against the passports pool implementation.
MaxConcurrency = 100
)
var registry = map[model.RegistryType]Factory{}

View File

@ -8,7 +8,9 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/adapter/native"
@ -246,68 +248,108 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
log.Debugf("got %d repositories for namespace %s", n, ns)
}
var resources []*model.Resource
// TODO(ChenDe): Get tags for repos in parallel
for _, repo := range repos {
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, name)
if err != nil {
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
}
if !m {
continue
}
}
var resources = make([]*model.Resource, len(repos))
var wg sync.WaitGroup
var stopped = make(chan struct{})
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped)
var tags []string
page := 1
pageSize := 100
for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil {
return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
}
for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name)
if err != nil {
return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
}
for i, r := range repos {
go func(index int, repo Repo) {
wg.Add(1)
defer func() {
wg.Done()
}()
if !m {
continue
// A false return means no passport was acquired and no further passports will be dispatched,
// for example because a crucial error occurred and all remaining tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
}
defer func() {
passportsPool.Revoke()
}()
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
log.Infof("Routine started to collect tags for repo: %s", name)
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, name)
if err != nil {
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
return
}
if !m {
return
}
tags = append(tags, t.Name)
}
if len(pageTags.Next) == 0 {
break
var tags []string
page := 1
pageSize := 100
for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil {
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
return
}
for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name)
if err != nil {
if !utils.IsChannelClosed(stopped) {
close(stopped)
}
log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
return
}
if !m {
continue
}
}
tags = append(tags, t.Name)
}
if len(pageTags.Next) == 0 {
break
}
page++
}
page++
}
// If the repo has no tags, skip it
if len(tags) == 0 {
continue
}
if len(tags) == 0 {
resources[index] = nil
} else {
resources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: name,
},
Vtags: tags,
},
}
}
}(i, r)
}
wg.Wait()
resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: name,
},
Vtags: tags,
},
})
var filtered []*model.Resource
for _, r := range resources {
if r != nil {
filtered = append(filtered, r)
}
}
return resources, nil
return filtered, nil
}
func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) {