From 5f29d9a3b77317620c0cda1ab1e0b010f5d1dd39 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 31 Jul 2019 19:01:44 +0800 Subject: [PATCH 1/5] Improve DockerHub adapter performance Signed-off-by: cd1989 --- src/replication/adapter/adapter.go | 1 + src/replication/adapter/dockerhub/adapter.go | 144 ++++++++++++------- 2 files changed, 94 insertions(+), 51 deletions(-) diff --git a/src/replication/adapter/adapter.go b/src/replication/adapter/adapter.go index 5e944ac7e..81c49f0a9 100644 --- a/src/replication/adapter/adapter.go +++ b/src/replication/adapter/adapter.go @@ -27,6 +27,7 @@ import ( // const definition const ( UserAgentReplication = "harbor-replication-service" + MaxConcurrency = 100 ) var registry = map[model.RegistryType]Factory{} diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 908b62843..6d36abd0f 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -8,7 +8,9 @@ import ( "io/ioutil" "net/http" "strings" + "sync" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/adapter/native" @@ -246,68 +248,108 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error log.Debugf("got %d repositories for namespace %s", n, ns) } - var resources []*model.Resource - // TODO(ChenDe): Get tags for repos in parallel - for _, repo := range repos { - name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) - // If name filter set, skip repos that don't match the filter pattern. - if len(nameFilter) != 0 { - m, err := util.Match(nameFilter, name) - if err != nil { - return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) - } - if !m { - continue - } - } + var resources = make([]*model.Resource, len(repos)) + var wg sync.WaitGroup + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) - var tags []string - page := 1 - pageSize := 100 - for { - pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) - if err != nil { - return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) - } - for _, t := range pageTags.Tags { - // If tag filter set, skip tags that don't match the filter pattern. - if len(tagFilter) != 0 { - m, err := util.Match(tagFilter, t.Name) - if err != nil { - return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) - } + for i, r := range repos { + go func(index int, repo Repo) { + wg.Add(1) + defer func() { + wg.Done() + }() - if !m { - continue + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return + } + defer func() { + passportsPool.Revoke() + }() + + name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) + log.Infof("Routine started to collect tags for repo: %s", name) + + // If name filter set, skip repos that don't match the filter pattern. + if len(nameFilter) != 0 { + m, err := util.Match(nameFilter, name) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) } + log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) + return + } + if !m { + return } - tags = append(tags, t.Name) } - if len(pageTags.Next) == 0 { - break + var tags []string + page := 1 + pageSize := 100 + for { + pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) + return + } + for _, t := range pageTags.Tags { + // If tag filter set, skip tags that don't match the filter pattern. + if len(tagFilter) != 0 { + m, err := util.Match(tagFilter, t.Name) + if err != nil { + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) + return + } + + if !m { + continue + } + } + tags = append(tags, t.Name) + } + + if len(pageTags.Next) == 0 { + break + } + page++ } - page++ - } - // If the repo has no tags, skip it - if len(tags) == 0 { - continue - } + if len(tags) == 0 { + resources[index] = nil + } else { + resources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: name, + }, + Vtags: tags, + }, + } + } + }(i, r) + } + wg.Wait() - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: name, - }, - Vtags: tags, - }, - }) + var filtered []*model.Resource + for _, r := range resources { + if r != nil { + filtered = append(filtered, r) + } } - return resources, nil + return filtered, nil } func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) { From 254aa7e2b7b90e3b96bec11a0d05109a3d7c1375 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 31 Jul 2019 22:34:36 +0800 Subject: [PATCH 2/5] Fix DockerHub adapter registry URL problem Signed-off-by: cd1989 --- src/replication/adapter/dockerhub/adapter.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 6d36abd0f..c4de449aa 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -27,16 +27,16 @@ func init() { } func factory(registry *model.Registry) (adp.Adapter, error) { - client, err := NewClient(&model.Registry{ - URL: baseURL, // specify the URL of Docker Hub - Credential: registry.Credential, - Insecure: registry.Insecure, - }) + client, err := NewClient(registry) if err != nil { return nil, err } - dockerRegistryAdapter, err := native.NewAdapter(registry) + dockerRegistryAdapter, err := native.NewAdapter(&model.Registry{ + URL: registryURL, + Credential: registry.Credential, + Insecure: registry.Insecure, + }) if err != nil { return nil, err } @@ -249,13 +249,13 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } var resources = make([]*model.Resource, len(repos)) - var wg sync.WaitGroup + var wg = new(sync.WaitGroup) var stopped = make(chan struct{}) var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) for i, r := range repos { + wg.Add(1) go func(index int, repo Repo) { - wg.Add(1) defer func() { wg.Done() }() From 1f541c890c1d0c4988910f5e9395dbcda3ccfb29 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 31 Jul 2019 23:05:36 +0800 Subject: [PATCH 3/5] Improve performance for other registry adapters Signed-off-by: cd1989 --- src/common/utils/passports.go | 61 ++++++++++ src/replication/adapter/aliacr/adapter.go | 114 ++++++++++++------ src/replication/adapter/dockerhub/adapter.go | 18 +-- .../adapter/harbor/image_registry.go | 104 +++++++++++----- src/replication/adapter/native/adapter.go | 100 ++++++++++----- 5 files changed, 296 insertions(+), 101 deletions(-) create mode 100644 src/common/utils/passports.go diff --git a/src/common/utils/passports.go b/src/common/utils/passports.go new file mode 100644 index 000000000..9f2cafb52 --- /dev/null +++ b/src/common/utils/passports.go @@ -0,0 +1,61 @@ +package utils + +// PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool +// is used to control the concurrency of tasks, the pool size determine the max concurrency. When users +// want to start a goroutine to perform some task, they must apply a passport firstly, and after finish +// the task, the passport must be revoked. +type PassportsPool interface { + // Apply applies a passport from the pool. + Apply() bool + // Revoke revokes a passport to the pool + Revoke() bool +} + +type passportsPool struct { + passports chan struct{} + stopped chan struct{} +} + +// NewPassportsPool creates a passports pool with given size +func NewPassportsPool(size int, stopped chan struct{}) PassportsPool { + return &passportsPool{ + passports: make(chan struct{}, size), + stopped: stopped, + } +} + +// Apply applies a passport from the pool. Returning value 'true' means passport acquired +// successfully. If no available passports in the pool, 'Apply' will wait for it. If the +// all passports in the pool are turned into invalid by the 'stopped' channel, then false +// is returned, means no more passports will be dispatched. +func (p *passportsPool) Apply() bool { + select { + case p.passports <- struct{}{}: + return true + case <-p.stopped: + return false + } +} + +// Revoke revokes a passport to the pool. Returning value 'true' means passport revoked +// successfully, otherwise 'Revoke' will wait. If pool turns into invalid by 'stopped' channel +// false will be returned. +func (p *passportsPool) Revoke() bool { + select { + case <-p.passports: + return true + case <-p.stopped: + return false + } +} + +// IsChannelClosed ... +func IsChannelClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + } + + return false +} diff --git a/src/replication/adapter/aliacr/adapter.go b/src/replication/adapter/aliacr/adapter.go index 7b043942f..7ca148bd0 100644 --- a/src/replication/adapter/aliacr/adapter.go +++ b/src/replication/adapter/aliacr/adapter.go @@ -7,15 +7,16 @@ import ( "net/http" "path/filepath" "regexp" + "sync" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/replication/adapter/native" - "github.com/goharbor/harbor/src/replication/util" - + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/common/utils/registry/auth" adp "github.com/goharbor/harbor/src/replication/adapter" + "github.com/goharbor/harbor/src/replication/adapter/native" "github.com/goharbor/harbor/src/replication/model" + "github.com/goharbor/harbor/src/replication/util" ) func init() { @@ -157,42 +158,83 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou } log.Debugf("FetchImages.repositories: %#v\n", repositories) - // list tags - for _, repository := range repositories { - var tags []string - tags, err = a.getTags(repository, client) - if err != nil { - return - } + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) - var filterTags []string - if tagsPattern != "" { - for _, tag := range tags { - var ok bool - ok, err = util.Match(tagsPattern, tag) - if err != nil { - return - } - if ok { - filterTags = append(filterTags, tag) - } + for i, r := range repositories { + wg.Add(1) + go func(index int, repo aliRepo) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } else { - filterTags = tags - } + defer func() { + passportsPool.Revoke() + }() - if len(filterTags) > 0 { - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: filepath.Join(repository.RepoNamespace, repository.RepoName), + var tags []string + tags, err = a.getTags(repo, client) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + + var filterTags []string + if tagsPattern != "" { + for _, tag := range tags { + var ok bool + ok, err = util.Match(tagsPattern, tag) + if err != nil { + log.Errorf("Match tag '%s' error: %v", tag, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if ok { + filterTags = append(filterTags, tag) + } + } + } else { + filterTags = tags + } + + if len(filterTags) > 0 { + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: filepath.Join(repo.RepoNamespace, repo.RepoName), + }, + Vtags: filterTags, + Labels: []string{}, }, - Vtags: filterTags, - Labels: []string{}, - }, - }) + } + } else { + rawResources[index] = nil + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) } } diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index c4de449aa..21ad04a22 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -248,7 +248,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error log.Debugf("got %d repositories for namespace %s", n, ns) } - var resources = make([]*model.Resource, len(repos)) + var rawResources = make([]*model.Resource, len(repos)) var wg = new(sync.WaitGroup) var stopped = make(chan struct{}) var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) @@ -325,9 +325,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } if len(tags) == 0 { - resources[index] = nil + rawResources[index] = nil } else { - resources[index] = &model.Resource{ + rawResources[index] = &model.Resource{ Type: model.ResourceTypeImage, Registry: a.registry, Metadata: &model.ResourceMetadata{ @@ -342,14 +342,18 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } wg.Wait() - var filtered []*model.Resource - for _, r := range resources { + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { if r != nil { - filtered = append(filtered, r) + resources = append(resources, r) } } - return filtered, nil + return resources, nil } func (a *adapter) listCandidateNamespaces(pattern string) ([]string, error) { diff --git a/src/replication/adapter/harbor/image_registry.go b/src/replication/adapter/harbor/image_registry.go index a278bb099..f65b7ea63 100644 --- a/src/replication/adapter/harbor/image_registry.go +++ b/src/replication/adapter/harbor/image_registry.go @@ -17,7 +17,9 @@ package harbor import ( "fmt" "strings" + "sync" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/model" @@ -29,6 +31,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if err != nil { return nil, err } + resources := []*model.Resource{} for _, project := range projects { repositories, err := a.getRepositories(project.ID) @@ -43,37 +46,80 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error return nil, err } } - for _, repository := range repositories { - vTags, err := a.getTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + + for i, r := range repositories { + wg.Add(1) + go func(index int, repo *adp.Repository) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, - Metadata: project.Metadata, + defer func() { + passportsPool.Revoke() + }() + + vTags, err := a.getTags(repo.Name) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if len(vTags) == 0 { + rawResources[index] = nil + return + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + log.Errorf("Filter tags %v error: %v", vTags, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + } + if len(vTags) == 0 { + rawResources[index] = nil + return + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + Metadata: project.Metadata, + }, + Vtags: tags, }, - Vtags: tags, - }, - }) + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } } } diff --git a/src/replication/adapter/native/adapter.go b/src/replication/adapter/native/adapter.go index 46a8731d6..6e710b6bb 100644 --- a/src/replication/adapter/native/adapter.go +++ b/src/replication/adapter/native/adapter.go @@ -15,6 +15,7 @@ package native import ( + "fmt" "io" "net/http" "strings" @@ -24,6 +25,7 @@ import ( "github.com/docker/distribution/manifest/schema1" "github.com/goharbor/harbor/src/common/http/modifier" common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" @@ -159,37 +161,77 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } } - var resources []*model.Resource - for _, repository := range repositories { - vTags, err := a.getVTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + rawResources := make([]*model.Resource, len(repositories)) + var wg = new(sync.WaitGroup) + var stopped = make(chan struct{}) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + + for i, r := range repositories { + wg.Add(1) + go func(index int, repo *adp.Repository) { + defer func() { + wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := passportsPool.Apply(); !ok { + return } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, + defer func() { + passportsPool.Revoke() + }() + + vTags, err := a.getVTags(repo.Name) + if err != nil { + log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + if len(vTags) == 0 { + return + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + log.Errorf("Filter tags %v error: %v", vTags, err) + if !utils.IsChannelClosed(stopped) { + close(stopped) + } + return + } + } + if len(vTags) == 0 { + return + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + }, + Vtags: tags, }, - Vtags: tags, - }, - }) + } + }(i, r) + } + wg.Wait() + + if utils.IsChannelClosed(stopped) { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } } return resources, nil From e2e540233bd7659108d773155257e4c9f6749313 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Thu, 1 Aug 2019 09:56:32 +0800 Subject: [PATCH 4/5] Use context for concurrency control Signed-off-by: cd1989 --- src/common/utils/passports.go | 15 ++----------- src/replication/adapter/aliacr/adapter.go | 17 +++++++-------- src/replication/adapter/dockerhub/adapter.go | 21 ++++++++----------- .../adapter/harbor/image_registry.go | 17 +++++++-------- src/replication/adapter/native/adapter.go | 17 +++++++-------- 5 files changed, 35 insertions(+), 52 deletions(-) diff --git a/src/common/utils/passports.go b/src/common/utils/passports.go index 9f2cafb52..128a75b51 100644 --- a/src/common/utils/passports.go +++ b/src/common/utils/passports.go @@ -13,11 +13,11 @@ type PassportsPool interface { type passportsPool struct { passports chan struct{} - stopped chan struct{} + stopped <-chan struct{} } // NewPassportsPool creates a passports pool with given size -func NewPassportsPool(size int, stopped chan struct{}) PassportsPool { +func NewPassportsPool(size int, stopped <-chan struct{}) PassportsPool { return &passportsPool{ passports: make(chan struct{}, size), stopped: stopped, @@ -48,14 +48,3 @@ func (p *passportsPool) Revoke() bool { return false } } - -// IsChannelClosed ... -func IsChannelClosed(ch <-chan struct{}) bool { - select { - case <-ch: - return true - default: - } - - return false -} diff --git a/src/replication/adapter/aliacr/adapter.go b/src/replication/adapter/aliacr/adapter.go index 7ca148bd0..95f92f87d 100644 --- a/src/replication/adapter/aliacr/adapter.go +++ b/src/replication/adapter/aliacr/adapter.go @@ -1,6 +1,7 @@ package aliacr import ( + "context" "encoding/json" "errors" "fmt" @@ -160,8 +161,8 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou rawResources := make([]*model.Resource, len(repositories)) var wg = new(sync.WaitGroup) - var stopped = make(chan struct{}) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + ctx, cancel := context.WithCancel(context.Background()) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) for i, r := range repositories { wg.Add(1) @@ -183,9 +184,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou tags, err = a.getTags(repo, client) if err != nil { log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } @@ -196,9 +195,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou ok, err = util.Match(tagsPattern, tag) if err != nil { log.Errorf("Match tag '%s' error: %v", tag, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } if ok { @@ -228,7 +225,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou } wg.Wait() - if utils.IsChannelClosed(stopped) { + err = ctx.Err() + cancel() + if err != nil { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 21ad04a22..363b2ea07 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -2,6 +2,7 @@ package dockerhub import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -250,8 +251,8 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error var rawResources = make([]*model.Resource, len(repos)) var wg = new(sync.WaitGroup) - var stopped = make(chan struct{}) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + ctx, cancel := context.WithCancel(context.Background()) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) for i, r := range repos { wg.Add(1) @@ -276,9 +277,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if len(nameFilter) != 0 { m, err := util.Match(nameFilter, name) if err != nil { - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) return } @@ -293,9 +292,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error for { pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) if err != nil { - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) return } @@ -304,9 +301,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if len(tagFilter) != 0 { m, err := util.Match(tagFilter, t.Name) if err != nil { - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) return } @@ -342,7 +337,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } wg.Wait() - if utils.IsChannelClosed(stopped) { + err = ctx.Err() + cancel() + if err != nil { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/harbor/image_registry.go b/src/replication/adapter/harbor/image_registry.go index f65b7ea63..1ae5f1685 100644 --- a/src/replication/adapter/harbor/image_registry.go +++ b/src/replication/adapter/harbor/image_registry.go @@ -15,6 +15,7 @@ package harbor import ( + "context" "fmt" "strings" "sync" @@ -49,8 +50,8 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error rawResources := make([]*model.Resource, len(repositories)) var wg = new(sync.WaitGroup) - var stopped = make(chan struct{}) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + ctx, cancel := context.WithCancel(context.Background()) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) for i, r := range repositories { wg.Add(1) @@ -71,9 +72,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error vTags, err := a.getTags(repo.Name) if err != nil { log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } if len(vTags) == 0 { @@ -83,9 +82,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error for _, filter := range filters { if err = filter.DoFilter(&vTags); err != nil { log.Errorf("Filter tags %v error: %v", vTags, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } } @@ -112,7 +109,9 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } wg.Wait() - if utils.IsChannelClosed(stopped) { + err = ctx.Err() + cancel() + if err != nil { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/native/adapter.go b/src/replication/adapter/native/adapter.go index 6e710b6bb..5353d08a4 100644 --- a/src/replication/adapter/native/adapter.go +++ b/src/replication/adapter/native/adapter.go @@ -15,6 +15,7 @@ package native import ( + "context" "fmt" "io" "net/http" @@ -163,8 +164,8 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error rawResources := make([]*model.Resource, len(repositories)) var wg = new(sync.WaitGroup) - var stopped = make(chan struct{}) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, stopped) + ctx, cancel := context.WithCancel(context.Background()) + var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) for i, r := range repositories { wg.Add(1) @@ -185,9 +186,7 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error vTags, err := a.getVTags(repo.Name) if err != nil { log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } if len(vTags) == 0 { @@ -196,9 +195,7 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error for _, filter := range filters { if err = filter.DoFilter(&vTags); err != nil { log.Errorf("Filter tags %v error: %v", vTags, err) - if !utils.IsChannelClosed(stopped) { - close(stopped) - } + cancel() return } } @@ -223,7 +220,9 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } wg.Wait() - if utils.IsChannelClosed(stopped) { + err = ctx.Err() + cancel() + if err != nil { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } From 870d7115c4fdabbde0e586a73b4d593cb6959ce9 Mon Sep 17 00:00:00 2001 From: cd1989 Date: Wed, 7 Aug 2019 14:13:39 +0800 Subject: [PATCH 5/5] Refactor code to extract a common task runner Signed-off-by: cd1989 --- src/common/utils/passports.go | 78 +++++++++++++++++++ src/replication/adapter/aliacr/adapter.go | 49 ++++-------- src/replication/adapter/dockerhub/adapter.go | 56 ++++--------- .../adapter/harbor/image_registry.go | 51 ++++-------- src/replication/adapter/native/adapter.go | 50 ++++-------- 5 files changed, 140 insertions(+), 144 deletions(-) diff --git a/src/common/utils/passports.go b/src/common/utils/passports.go index 128a75b51..bce88be9e 100644 --- a/src/common/utils/passports.go +++ b/src/common/utils/passports.go @@ -1,5 +1,12 @@ package utils +import ( + "context" + "sync" + + "github.com/goharbor/harbor/src/common/utils/log" +) + // PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool // is used to control the concurrency of tasks, the pool size determine the max concurrency. When users // want to start a goroutine to perform some task, they must apply a passport firstly, and after finish @@ -48,3 +55,74 @@ func (p *passportsPool) Revoke() bool { return false } } + +// LimitedConcurrentRunner is used to run tasks, but limit the max concurrency. +type LimitedConcurrentRunner interface { + // AddTask adds a task to run + AddTask(task func() error) + // Wait waits all the tasks to be finished + Wait() + // Cancel cancels all tasks, tasks that already started will continue to run + Cancel() + // IsCancelled checks whether context is cancelled. This happens when some task encountered + // critical errors. + IsCancelled() bool +} + +type limitedConcurrentRunner struct { + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + passportsPool PassportsPool +} + +// NewLimitedConcurrentRunner creates a runner +func NewLimitedConcurrentRunner(limit int) LimitedConcurrentRunner { + ctx, cancel := context.WithCancel(context.Background()) + return &limitedConcurrentRunner{ + wg: new(sync.WaitGroup), + ctx: ctx, + cancel: cancel, + passportsPool: NewPassportsPool(limit, ctx.Done()), + } +} + +// AddTask adds a task to run +func (r *limitedConcurrentRunner) AddTask(task func() error) { + r.wg.Add(1) + go func() { + defer func() { + r.wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := r.passportsPool.Apply(); !ok { + return + } + defer func() { + r.passportsPool.Revoke() + }() + + err := task() + if err != nil { + log.Errorf("%v", err) + r.cancel() + } + }() +} + +// Wait waits all the tasks to be finished +func (r *limitedConcurrentRunner) Wait() { + r.wg.Wait() +} + +// Cancel cancels all tasks, tasks that already started will continue to run +func (r *limitedConcurrentRunner) Cancel() { + r.cancel() +} + +// IsCancelled checks whether context is cancelled. This happens when some task encountered critical errors. +func (r *limitedConcurrentRunner) IsCancelled() bool { + return r.ctx.Err() != nil +} diff --git a/src/replication/adapter/aliacr/adapter.go b/src/replication/adapter/aliacr/adapter.go index 95f92f87d..386f2b3c1 100644 --- a/src/replication/adapter/aliacr/adapter.go +++ b/src/replication/adapter/aliacr/adapter.go @@ -1,14 +1,12 @@ package aliacr import ( - "context" "encoding/json" "errors" "fmt" "net/http" "path/filepath" "regexp" - "sync" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr" "github.com/goharbor/harbor/src/common/utils" @@ -159,33 +157,18 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou } log.Debugf("FetchImages.repositories: %#v\n", repositories) - rawResources := make([]*model.Resource, len(repositories)) - var wg = new(sync.WaitGroup) - ctx, cancel := context.WithCancel(context.Background()) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() for i, r := range repositories { - wg.Add(1) - go func(index int, repo aliRepo) { - defer func() { - wg.Done() - }() - - // Return false means no passport acquired, and no valid passport will be dispatched any more. - // For example, some crucial errors happened and all tasks should be cancelled. - if ok := passportsPool.Apply(); !ok { - return - } - defer func() { - passportsPool.Revoke() - }() - + index := i + repo := r + runner.AddTask(func() error { var tags []string tags, err = a.getTags(repo, client) if err != nil { - log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) - cancel() - return + return fmt.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) } var filterTags []string @@ -194,9 +177,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou var ok bool ok, err = util.Match(tagsPattern, tag) if err != nil { - log.Errorf("Match tag '%s' error: %v", tag, err) - cancel() - return + return fmt.Errorf("Match tag '%s' error: %v", tag, err) } if ok { filterTags = append(filterTags, tag) @@ -218,16 +199,14 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou Labels: []string{}, }, } - } else { - rawResources[index] = nil } - }(i, r) - } - wg.Wait() - err = ctx.Err() - cancel() - if err != nil { + return nil + }) + } + runner.Wait() + + if runner.IsCancelled() { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 363b2ea07..3cfb692eb 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -2,14 +2,12 @@ package dockerhub import ( "bytes" - "context" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "strings" - "sync" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" @@ -250,26 +248,12 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } var rawResources = make([]*model.Resource, len(repos)) - var wg = new(sync.WaitGroup) - ctx, cancel := context.WithCancel(context.Background()) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) - + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() for i, r := range repos { - wg.Add(1) - go func(index int, repo Repo) { - defer func() { - wg.Done() - }() - - // Return false means no passport acquired, and no valid passport will be dispatched any more. - // For example, some crucial errors happened and all tasks should be cancelled. - if ok := passportsPool.Apply(); !ok { - return - } - defer func() { - passportsPool.Revoke() - }() - + index := i + repo := r + runner.AddTask(func() error { name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) log.Infof("Routine started to collect tags for repo: %s", name) @@ -277,12 +261,10 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if len(nameFilter) != 0 { m, err := util.Match(nameFilter, name) if err != nil { - cancel() - log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) - return + return fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) } if !m { - return + return nil } } @@ -292,18 +274,14 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error for { pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) if err != nil { - cancel() - log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) - return + return fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) } for _, t := range pageTags.Tags { // If tag filter set, skip tags that don't match the filter pattern. if len(tagFilter) != 0 { m, err := util.Match(tagFilter, t.Name) if err != nil { - cancel() - log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) - return + return fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) } if !m { @@ -319,9 +297,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error page++ } - if len(tags) == 0 { - rawResources[index] = nil - } else { + if len(tags) > 0 { rawResources[index] = &model.Resource{ Type: model.ResourceTypeImage, Registry: a.registry, @@ -333,13 +309,13 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error }, } } - }(i, r) - } - wg.Wait() - err = ctx.Err() - cancel() - if err != nil { + return nil + }) + } + runner.Wait() + + if runner.IsCancelled() { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/harbor/image_registry.go b/src/replication/adapter/harbor/image_registry.go index 1ae5f1685..269627090 100644 --- a/src/replication/adapter/harbor/image_registry.go +++ b/src/replication/adapter/harbor/image_registry.go @@ -15,10 +15,8 @@ package harbor import ( - "context" "fmt" "strings" - "sync" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" @@ -48,47 +46,30 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } } - rawResources := make([]*model.Resource, len(repositories)) - var wg = new(sync.WaitGroup) - ctx, cancel := context.WithCancel(context.Background()) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() for i, r := range repositories { - wg.Add(1) - go func(index int, repo *adp.Repository) { - defer func() { - wg.Done() - }() - - // Return false means no passport acquired, and no valid passport will be dispatched any more. - // For example, some crucial errors happened and all tasks should be cancelled. - if ok := passportsPool.Apply(); !ok { - return - } - defer func() { - passportsPool.Revoke() - }() - + index := i + repo := r + runner.AddTask(func() error { vTags, err := a.getTags(repo.Name) if err != nil { - log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) - cancel() - return + return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err) } if len(vTags) == 0 { rawResources[index] = nil - return + return nil } for _, filter := range filters { if err = filter.DoFilter(&vTags); err != nil { - log.Errorf("Filter tags %v error: %v", vTags, err) - cancel() - return + return fmt.Errorf("Filter tags %v error: %v", vTags, err) } } if len(vTags) == 0 { rawResources[index] = nil - return + return nil } tags := []string{} for _, vTag := range vTags { @@ -105,13 +86,13 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error Vtags: tags, }, } - }(i, r) - } - wg.Wait() - err = ctx.Err() - cancel() - if err != nil { + return nil + }) + } + runner.Wait() + + if runner.IsCancelled() { return nil, fmt.Errorf("FetchImages error when collect tags for repos") } diff --git a/src/replication/adapter/native/adapter.go b/src/replication/adapter/native/adapter.go index 5353d08a4..8f42f6a61 100644 --- a/src/replication/adapter/native/adapter.go +++ b/src/replication/adapter/native/adapter.go @@ -15,7 +15,6 @@ package native import ( - "context" "fmt" "io" "net/http" @@ -162,45 +161,28 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } } - rawResources := make([]*model.Resource, len(repositories)) - var wg = new(sync.WaitGroup) - ctx, cancel := context.WithCancel(context.Background()) - var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done()) + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() for i, r := range repositories { - wg.Add(1) - go func(index int, repo *adp.Repository) { - defer func() { - wg.Done() - }() - - // Return false means no passport acquired, and no valid passport will be dispatched any more. - // For example, some crucial errors happened and all tasks should be cancelled. - if ok := passportsPool.Apply(); !ok { - return - } - defer func() { - passportsPool.Revoke() - }() - + index := i + repo := r + runner.AddTask(func() error { vTags, err := a.getVTags(repo.Name) if err != nil { - log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) - cancel() - return + return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err) } if len(vTags) == 0 { - return + return nil } for _, filter := range filters { if err = filter.DoFilter(&vTags); err != nil { - log.Errorf("Filter tags %v error: %v", vTags, err) - cancel() - return + return fmt.Errorf("Filter tags %v error: %v", vTags, err) } } if len(vTags) == 0 { - return + return nil } tags := []string{} for _, vTag := range vTags { @@ -216,13 +198,13 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error Vtags: tags, }, } - }(i, r) - } - wg.Wait() - err = ctx.Err() - cancel() - if err != nil { + return nil + }) + } + runner.Wait() + + if runner.IsCancelled() { return nil, fmt.Errorf("FetchImages error when collect tags for repos") }