From 1f541c890c1d0c4988910f5e9395dbcda3ccfb29 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 31 Jul 2019 23:05:36 +0800 Subject: [PATCH] Improve performance for other registry adapters Signed-off-by: cd1989 --- src/common/utils/passports.go | 61 ++++++++++ src/replication/adapter/aliacr/adapter.go | 114 ++++++++++++------ src/replication/adapter/dockerhub/adapter.go | 18 +-- .../adapter/harbor/image_registry.go | 104 +++++++++++----- src/replication/adapter/native/adapter.go | 100 ++++++++++----- 5 files changed, 296 insertions(+), 101 deletions(-) create mode 100644 src/common/utils/passports.go diff --git a/src/common/utils/passports.go b/src/common/utils/passports.go new file mode 100644 index 000000000..9f2cafb52 --- /dev/null +++ b/src/common/utils/passports.go @@ -0,0 +1,61 @@ +package utils + +// PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool +// is used to control the concurrency of tasks, the pool size determine the max concurrency. When users +// want to start a goroutine to perform some task, they must apply a passport firstly, and after finish +// the task, the passport must be revoked. +type PassportsPool interface { + // Apply applies a passport from the pool. + Apply() bool + // Revoke revokes a passport to the pool + Revoke() bool +} + +type passportsPool struct { + passports chan struct{} + stopped chan struct{} +} + +// NewPassportsPool creates a passports pool with given size +func NewPassportsPool(size int, stopped chan struct{}) PassportsPool { + return &passportsPool{ + passports: make(chan struct{}, size), + stopped: stopped, + } +} + +// Apply applies a passport from the pool. Returning value 'true' means passport acquired +// successfully. If no available passports in the pool, 'Apply' will wait for it. If the +// all passports in the pool are turned into invalid by the 'stopped' channel, then false +// is returned, means no more passports will be dispatched. +func (p *passportsPool) Apply() bool { + select { + case p.passports <- struct{}{}: + return true + case <-p.stopped: + return false + } +} + +// Revoke revokes a passport to the pool. Returning value 'true' means passport revoked +// successfully, otherwise 'Revoke' will wait. If pool turns into invalid by 'stopped' channel +// false will be returned. +func (p *passportsPool) Revoke() bool { + select { + case <-p.passports: + return true + case <-p.stopped: + return false + } +} + +// IsChannelClosed ... +func IsChannelClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + } + + return false +} diff --git a/src/replication/adapter/aliacr/adapter.go b/src/replication/adapter/aliacr/adapter.go index 7b043942f..7ca148bd0 100644 --- a/src/replication/adapter/aliacr/adapter.go +++ b/src/replication/adapter/aliacr/adapter.go @@ -7,15 +7,16 @@ import ( "net/http" "path/filepath" "regexp" + "sync" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/replication/adapter/native" - "github.com/goharbor/harbor/src/replication/util" - + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/common/utils/registry/auth" adp "github.com/goharbor/harbor/src/replication/adapter" + "github.com/goharbor/harbor/src/replication/adapter/native" "github.com/goharbor/harbor/src/replication/model" + "github.com/goharbor/harbor/src/replication/util" ) func init() { @@ -157,42 +158,83 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou } log.Debugf("FetchImages.repositories: %#v\n", repositories) - // list tags - for _, repository := range repositories { - var tags []string - tags, err = a.getTags(repository, client) - if err != nil { - return - } + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) - var filterTags []string - if tagsPattern != "" { - for _, tag := range tags { - var ok bool - ok, err = util.Match(tagsPattern, tag) - if err != nil { - return - } - if ok { - filterTags = append(filterTags, tag) - } + for i, r := range repositories { + wg.Add(1) + go func(index int, repo aliRepo) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } else { - filterTags = tags - } + defer func() { + passportsPool.Revoke() + }() - if len(filterTags) > 0 { - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: filepath.Join(repository.RepoNamespace, repository.RepoName), + var tags []string + tags, err = a.getTags(repo, client) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + + var filterTags []string + if tagsPattern != "" { + for _, tag := range tags { + var ok bool + ok, err = util.Match(tagsPattern, tag) + if err != nil { + log.Errorf("Match tag '%s' error: %v", tag, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if ok { + filterTags = append(filterTags, tag) + } + } + } else { + filterTags = tags + } + + if len(filterTags) > 0 { + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: filepath.Join(repo.RepoNamespace, repo.RepoName), + }, + Vtags: filterTags, + Labels: []string{}, }, - Vtags: filterTags, - Labels: []string{}, - }, - }) + } + } else { + rawResources[index] = nil + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) } } diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index c4de449aa..21ad04a22 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -248,7 +248,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error log.Debugf("got %d repositories for namespace %s", n, ns) } - var resources = make([]*model.Resource, len(repos)) + var rawResources = make([]*model.Resource, len(repos)) var wg = new(sync.WaitGroup) var stopped = make(chan struct{}) var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) @@ -325,9 +325,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } if len(tags) == 0 { - resources[index] = nil + rawResources[index] = nil } else { - resources[index] = &model.Resource{ + rawResources[index] = &model.Resource{ Type: model.ResourceTypeImage, Registry: a.registry, Metadata: &model.ResourceMetadata{ @@ -342,14 +342,18 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } wg.Wait() - var filtered []*model.Resource - for _, r := range resources { + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { if r != nil { - filtered = append(filtered, r) + resources = append(resources, r) } } - return filtered, nil + return resources, nil } func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) { diff --git a/src/replication/adapter/harbor/image_registry.go b/src/replication/adapter/harbor/image_registry.go index a278bb099..f65b7ea63 100644 --- a/src/replication/adapter/harbor/image_registry.go +++ b/src/replication/adapter/harbor/image_registry.go @@ -17,7 +17,9 @@ package harbor import ( "fmt" "strings" + "sync" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/model" @@ -29,6 +31,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if err != nil { return nil, err } + resources := []*model.Resource{} for _, project := range projects { repositories, err := a.getRepositories(project.ID) @@ -43,37 +46,80 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error return nil, err } } - for _, repository := range repositories { - vTags, err := a.getTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + + for i, r := range repositories { + wg.Add(1) + go func(index int, repo *adp.Repository) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, - Metadata: project.Metadata, + defer func() { + passportsPool.Revoke() + }() + + vTags, err := a.getTags(repo.Name) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if len(vTags) == 0 { + rawResources[index] = nil + return + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + log.Errorf("Filter tags %v error: %v", vTags, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + } + if len(vTags) == 0 { + rawResources[index] = nil + return + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + Metadata: project.Metadata, + }, + Vtags: tags, }, - Vtags: tags, - }, - }) + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } } } diff --git a/src/replication/adapter/native/adapter.go b/src/replication/adapter/native/adapter.go index 46a8731d6..6e710b6bb 100644 --- a/src/replication/adapter/native/adapter.go +++ b/src/replication/adapter/native/adapter.go @@ -15,6 +15,7 @@ package native import ( + "fmt" "io" "net/http" "strings" @@ -24,6 +25,7 @@ import ( "github.com/docker/distribution/manifest/schema1" "github.com/goharbor/harbor/src/common/http/modifier" common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" @@ -159,37 +161,77 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } } - var resources []*model.Resource - for _, repository := range repositories { - vTags, err := a.getVTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + + for i, r := range repositories { + wg.Add(1) + go func(index int, repo *adp.Repository) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, + defer func() { + passportsPool.Revoke() + }() + + vTags, err := a.getVTags(repo.Name) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if len(vTags) == 0 { + return + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + log.Errorf("Filter tags %v error: %v", vTags, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + } + if len(vTags) == 0 { + return + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + }, + Vtags: tags, }, - Vtags: tags, - }, - }) + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } } return resources, nil