From 5f29d9a3b77317620c0cda1ab1e0b010f5d1dd39 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 31 Jul 2019 19:01:44 +0800 Subject: [PATCH] Improve DockerHub adapter performance Signed-off-by: cd1989 --- src/replication/adapter/adapter.go | 1 + src/replication/adapter/dockerhub/adapter.go | 144 ++++++++++++------- 2 files changed, 94 insertions(+), 51 deletions(-) diff --git a/src/replication/adapter/adapter.go b/src/replication/adapter/adapter.go index 5e944ac7e..81c49f0a9 100644 --- a/src/replication/adapter/adapter.go +++ b/src/replication/adapter/adapter.go @@ -27,6 +27,7 @@ import ( // const definition const ( UserAgentReplication = "harbor-replication-service" + MaxConcurrency = 100 ) var registry = map[model.RegistryType]Factory{} diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 908b62843..6d36abd0f 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -8,7 +8,9 @@ import ( "io/ioutil" "net/http" "strings" + "sync" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/adapter/native" @@ -246,68 +248,108 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error log.Debugf("got %d repositories for namespace %s", n, ns) } - var resources []*model.Resource - // TODO(ChenDe): Get tags for repos in parallel - for _, repo := range repos { - name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) - // If name filter set, skip repos that don't match the filter pattern. - if len(nameFilter) != 0 { - m, err := util.Match(nameFilter, name) - if err != nil { - return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) - } - if !m { - continue - } - } + var resources = make([]*model.Resource, len(repos)) + var wg sync.WaitGroup + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) - var tags []string - page := 1 - pageSize := 100 - for { - pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) - if err != nil { - return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) - } - for _, t := range pageTags.Tags { - // If tag filter set, skip tags that don't match the filter pattern. - if len(tagFilter) != 0 { - m, err := util.Match(tagFilter, t.Name) - if err != nil { - return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) - } + for i, r := range repos { + go func(index int, repo Repo) { + wg.Add(1) + defer func() { + wg.Done() + }() - if !m { - continue + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return + } + defer func() { + passportsPool.Revoke() + }() + + name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) + log.Infof("Routine started to collect tags for repo: %s", name) + + // If name filter set, skip repos that don't match the filter pattern. + if len(nameFilter) != 0 { + m, err := util.Match(nameFilter, name) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) } + log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) + return + } + if !m { + return } - tags = append(tags, t.Name) } - if len(pageTags.Next) == 0 { - break + var tags []string + page := 1 + pageSize := 100 + for { + pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) + return + } + for _, t := range pageTags.Tags { + // If tag filter set, skip tags that don't match the filter pattern. + if len(tagFilter) != 0 { + m, err := util.Match(tagFilter, t.Name) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) + return + } + + if !m { + continue + } + } + tags = append(tags, t.Name) + } + + if len(pageTags.Next) == 0 { + break + } + page++ } - page++ - } - // If the repo has no tags, skip it - if len(tags) == 0 { - continue - } + if len(tags) == 0 { + resources[index] = nil + } else { + resources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: name, + }, + Vtags: tags, + }, + } + } + }(i, r) + } + wg.Wait() - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: name, - }, - Vtags: tags, - }, - }) + var filtered []*model.Resource + for _, r := range resources { + if r != nil { + filtered = append(filtered, r) + } } - return resources, nil + return filtered, nil } func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) {