Merge pull request #8514 from cd1989/replication-performance

Replication performance
This commit is contained in:
Wenkai Yin(尹文开) 2019-08-08 06:27:47 +08:00 committed by GitHub
commit 4add0eaba8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 365 additions and 148 deletions

View File

@ -0,0 +1,128 @@
package utils
import (
"context"
"sync"
"github.com/goharbor/harbor/src/common/utils/log"
)
// PassportsPool holds a fixed number of passports that can be applied for and
// revoked. It is used to control the concurrency of tasks: the pool size
// determines the maximum concurrency. Before starting a goroutine to perform a
// task, the caller must apply for a passport, and once the task has finished
// the passport must be revoked so that another task can use it.
type PassportsPool interface {
	// Apply acquires a passport from the pool, waiting until one is available.
	// It returns false, without acquiring anything, once the pool is stopped.
	Apply() bool
	// Revoke hands a previously acquired passport back to the pool. It returns
	// false when the pool was observed as stopped instead.
	Revoke() bool
}

// passportsPool implements PassportsPool with a buffered channel used as a
// counting semaphore: every element sitting in the channel is one passport
// currently held by a caller.
type passportsPool struct {
	// passports has a capacity equal to the pool size; sending acquires a
	// passport and receiving releases one.
	passports chan struct{}
	// stopped invalidates the pool once it is closed (or receives a value).
	stopped <-chan struct{}
}

// NewPassportsPool creates a passports pool holding 'size' passports. The pool
// becomes invalid as soon as 'stopped' is closed. A nil 'stopped' channel
// yields a pool that can never be stopped.
//
// A size below 1 is treated as 1: a zero capacity would make every Apply block
// (the channel would be unbuffered) and a negative capacity would panic.
func NewPassportsPool(size int, stopped <-chan struct{}) PassportsPool {
	if size < 1 {
		size = 1
	}
	return &passportsPool{
		passports: make(chan struct{}, size),
		stopped:   stopped,
	}
}

// Apply applies for a passport from the pool. It returns true when a passport
// was acquired. If no passport is available, Apply waits for one. If the pool
// has been invalidated through the 'stopped' channel, Apply returns false,
// meaning that no more passports will be dispatched.
func (p *passportsPool) Apply() bool {
	// Check the stop signal on its own first. A select with several ready
	// cases picks one at random, so without this check a pool that is already
	// stopped could still hand out a passport whenever a slot happens to be free.
	select {
	case <-p.stopped:
		return false
	default:
	}

	select {
	case p.passports <- struct{}{}:
		return true
	case <-p.stopped:
		return false
	}
}

// Revoke returns a passport to the pool. It returns true when the passport was
// given back; if there is nothing to give back yet, Revoke waits. If the pool
// is invalidated through the 'stopped' channel, false is returned. When the
// pool is stopped and a passport is also held, either outcome may be observed.
func (p *passportsPool) Revoke() bool {
	select {
	case <-p.passports:
		return true
	case <-p.stopped:
		return false
	}
}
// LimitedConcurrentRunner runs tasks concurrently while limiting how many of
// them may execute at the same time.
type LimitedConcurrentRunner interface {
// AddTask submits a task to be run. In the implementation in this file the
// call does not wait for the task, and a task that returns a non-nil error
// cancels the runner.
AddTask(task func() error)
// Wait blocks until every task added so far has finished or has been skipped.
Wait()
// Cancel cancels all tasks that have not started yet; tasks that already
// started will continue to run.
Cancel()
// IsCancelled reports whether the runner has been cancelled. In the
// implementation in this file that happens when Cancel is called or when
// some task returned an error.
IsCancelled() bool
}
// limitedConcurrentRunner is the LimitedConcurrentRunner implementation
// returned by NewLimitedConcurrentRunner.
type limitedConcurrentRunner struct {
// wg counts the goroutines started by AddTask so that Wait can block on them.
wg *sync.WaitGroup
// ctx is cancelled to signal that no further tasks should be started.
ctx context.Context
// cancel cancels ctx; it is invoked by Cancel and when a task returns an error.
cancel context.CancelFunc
// passportsPool caps the number of tasks running at once; it is created
// with ctx.Done() as its stop channel.
passportsPool PassportsPool
}
// NewLimitedConcurrentRunner builds a runner that executes at most 'limit'
// tasks at the same time. The runner owns a cancellable context derived from
// context.Background(); the passports pool that enforces the limit is stopped
// when that context is cancelled.
func NewLimitedConcurrentRunner(limit int) LimitedConcurrentRunner {
	runnerCtx, stop := context.WithCancel(context.Background())

	runner := &limitedConcurrentRunner{
		ctx:    runnerCtx,
		cancel: stop,
		wg:     new(sync.WaitGroup),
	}
	// The pool watches the context's Done channel as its stop signal.
	runner.passportsPool = NewPassportsPool(limit, runnerCtx.Done())
	return runner
}
// AddTask schedules a task to run in its own goroutine and returns
// immediately. The goroutine first applies for a passport, so at most as many
// tasks as the pool allows run at the same time. When the task returns a
// non-nil error, the error is logged and the runner is cancelled so that tasks
// which have not started yet are skipped. A nil task is ignored.
func (r *limitedConcurrentRunner) AddTask(task func() error) {
	// Calling a nil function would panic inside the goroutine below and take
	// the whole process down, so there is nothing useful to schedule.
	if task == nil {
		return
	}

	r.wg.Add(1)
	go func() {
		defer r.wg.Done()

		// Returning false means no passport was acquired and no valid passport
		// will be dispatched any more. For example, some crucial error happened
		// and all tasks should be cancelled.
		if ok := r.passportsPool.Apply(); !ok {
			return
		}
		defer r.passportsPool.Revoke()

		// Cancellation can race with acquiring the passport, so a passport may
		// be granted even though the runner is already cancelled. Tasks that
		// have not started must not start after cancellation.
		if r.ctx.Err() != nil {
			return
		}

		if err := task(); err != nil {
			log.Errorf("%v", err)
			r.cancel()
		}
	}()
}
// Wait blocks until every goroutine started by AddTask has returned, whether
// its task ran to completion or was skipped.
func (r *limitedConcurrentRunner) Wait() {
r.wg.Wait()
}
// Cancel cancels the runner's context so that tasks which have not started yet
// are skipped; tasks that already started will continue to run.
func (r *limitedConcurrentRunner) Cancel() {
r.cancel()
}
// IsCancelled reports whether the runner's context has been cancelled. That is
// the case after Cancel was called or after some task returned an error.
func (r *limitedConcurrentRunner) IsCancelled() bool {
	// A closed Done channel is always ready to receive; otherwise fall
	// through to the default case without blocking.
	select {
	case <-r.ctx.Done():
		return true
	default:
		return false
	}
}

View File

@ -27,6 +27,7 @@ import (
// Constants shared by the replication adapters.
const (
// UserAgentReplication is presumably the User-Agent value sent by the
// replication service on outgoing requests — confirm against its callers.
UserAgentReplication = "harbor-replication-service"
// MaxConcurrency is the limit the adapters pass to
// utils.NewLimitedConcurrentRunner when collecting tags for repositories.
MaxConcurrency = 100
)
var registry = map[model.RegistryType]Factory{}

View File

@ -9,13 +9,13 @@ import (
"regexp"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cr"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/adapter/native"
"github.com/goharbor/harbor/src/replication/util"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/adapter/native"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/util"
)
func init() {
@ -157,42 +157,62 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou
}
log.Debugf("FetchImages.repositories: %#v\n", repositories)
// list tags
for _, repository := range repositories {
var tags []string
tags, err = a.getTags(repository, client)
if err != nil {
return
}
var rawResources = make([]*model.Resource, len(repositories))
runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
defer runner.Cancel()
var filterTags []string
if tagsPattern != "" {
for _, tag := range tags {
var ok bool
ok, err = util.Match(tagsPattern, tag)
if err != nil {
return
for i, r := range repositories {
index := i
repo := r
runner.AddTask(func() error {
var tags []string
tags, err = a.getTags(repo, client)
if err != nil {
return fmt.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err)
}
var filterTags []string
if tagsPattern != "" {
for _, tag := range tags {
var ok bool
ok, err = util.Match(tagsPattern, tag)
if err != nil {
return fmt.Errorf("Match tag '%s' error: %v", tag, err)
}
if ok {
filterTags = append(filterTags, tag)
}
}
if ok {
filterTags = append(filterTags, tag)
} else {
filterTags = tags
}
if len(filterTags) > 0 {
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: filepath.Join(repo.RepoNamespace, repo.RepoName),
},
Vtags: filterTags,
Labels: []string{},
},
}
}
} else {
filterTags = tags
}
if len(filterTags) > 0 {
resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: filepath.Join(repository.RepoNamespace, repository.RepoName),
},
Vtags: filterTags,
Labels: []string{},
},
})
return nil
})
}
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
}

View File

@ -9,6 +9,7 @@ import (
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/adapter/native"
@ -25,16 +26,16 @@ func init() {
}
func factory(registry *model.Registry) (adp.Adapter, error) {
client, err := NewClient(&model.Registry{
URL: baseURL, // specify the URL of Docker Hub
Credential: registry.Credential,
Insecure: registry.Insecure,
})
client, err := NewClient(registry)
if err != nil {
return nil, err
}
dockerRegistryAdapter, err := native.NewAdapter(registry)
dockerRegistryAdapter, err := native.NewAdapter(&model.Registry{
URL: registryURL,
Credential: registry.Credential,
Insecure: registry.Insecure,
})
if err != nil {
return nil, err
}
@ -246,66 +247,84 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
log.Debugf("got %d repositories for namespace %s", n, ns)
}
var resources []*model.Resource
// TODO(ChenDe): Get tags for repos in parallel
for _, repo := range repos {
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, name)
if err != nil {
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
}
if !m {
continue
}
}
var rawResources = make([]*model.Resource, len(repos))
runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
defer runner.Cancel()
for i, r := range repos {
index := i
repo := r
runner.AddTask(func() error {
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
log.Infof("Routine started to collect tags for repo: %s", name)
var tags []string
page := 1
pageSize := 100
for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil {
return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
}
for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name)
if err != nil {
return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
}
if !m {
continue
}
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, name)
if err != nil {
return fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
}
if !m {
return nil
}
tags = append(tags, t.Name)
}
if len(pageTags.Next) == 0 {
break
var tags []string
page := 1
pageSize := 100
for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil {
return fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
}
for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name)
if err != nil {
return fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
}
if !m {
continue
}
}
tags = append(tags, t.Name)
}
if len(pageTags.Next) == 0 {
break
}
page++
}
page++
}
// If the repo has no tags, skip it
if len(tags) == 0 {
continue
}
if len(tags) > 0 {
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: name,
},
Vtags: tags,
},
}
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: name,
},
Vtags: tags,
},
return nil
})
}
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
var resources []*model.Resource
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
}
return resources, nil
}

View File

@ -18,6 +18,7 @@ import (
"fmt"
"strings"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/model"
@ -29,6 +30,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
if err != nil {
return nil, err
}
resources := []*model.Resource{}
for _, project := range projects {
repositories, err := a.getRepositories(project.ID)
@ -43,38 +45,62 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
return nil, err
}
}
for _, repository := range repositories {
vTags, err := a.getTags(repository.Name)
if err != nil {
return nil, err
}
if len(vTags) == 0 {
continue
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
return nil, err
var rawResources = make([]*model.Resource, len(repositories))
runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
defer runner.Cancel()
for i, r := range repositories {
index := i
repo := r
runner.AddTask(func() error {
vTags, err := a.getTags(repo.Name)
if err != nil {
return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
}
}
if len(vTags) == 0 {
continue
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repository.Name,
Metadata: project.Metadata,
if len(vTags) == 0 {
rawResources[index] = nil
return nil
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
return fmt.Errorf("Filter tags %v error: %v", vTags, err)
}
}
if len(vTags) == 0 {
rawResources[index] = nil
return nil
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repo.Name,
Metadata: project.Metadata,
},
Vtags: tags,
},
Vtags: tags,
},
}
return nil
})
}
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
}
}
return resources, nil

View File

@ -15,6 +15,7 @@
package native
import (
"fmt"
"io"
"net/http"
"strings"
@ -24,6 +25,7 @@ import (
"github.com/docker/distribution/manifest/schema1"
"github.com/goharbor/harbor/src/common/http/modifier"
common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log"
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
@ -159,38 +161,59 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
}
}
var resources []*model.Resource
for _, repository := range repositories {
vTags, err := a.getVTags(repository.Name)
if err != nil {
return nil, err
}
if len(vTags) == 0 {
continue
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
return nil, err
var rawResources = make([]*model.Resource, len(repositories))
runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
defer runner.Cancel()
for i, r := range repositories {
index := i
repo := r
runner.AddTask(func() error {
vTags, err := a.getVTags(repo.Name)
if err != nil {
return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
}
}
if len(vTags) == 0 {
continue
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repository.Name,
if len(vTags) == 0 {
return nil
}
for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil {
return fmt.Errorf("Filter tags %v error: %v", vTags, err)
}
}
if len(vTags) == 0 {
return nil
}
tags := []string{}
for _, vTag := range vTags {
tags = append(tags, vTag.Name)
}
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repo.Name,
},
Vtags: tags,
},
Vtags: tags,
},
}
return nil
})
}
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos")
}
var resources []*model.Resource
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
}
return resources, nil
}