diff --git a/src/common/utils/passports.go b/src/common/utils/passports.go new file mode 100644 index 000000000..bce88be9e --- /dev/null +++ b/src/common/utils/passports.go @@ -0,0 +1,128 @@ +package utils + +import ( + "context" + "sync" + + "github.com/goharbor/harbor/src/common/utils/log" +) + +// PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool +// is used to control the concurrency of tasks, the pool size determine the max concurrency. When users +// want to start a goroutine to perform some task, they must apply a passport firstly, and after finish +// the task, the passport must be revoked. +type PassportsPool interface { + // Apply applies a passport from the pool. + Apply() bool + // Revoke revokes a passport to the pool + Revoke() bool +} + +type passportsPool struct { + passports chan struct{} + stopped <-chan struct{} +} + +// NewPassportsPool creates a passports pool with given size +func NewPassportsPool(size int, stopped <-chan struct{}) PassportsPool { + return &passportsPool{ + passports: make(chan struct{}, size), + stopped: stopped, + } +} + +// Apply applies a passport from the pool. Returning value 'true' means passport acquired +// successfully. If no available passports in the pool, 'Apply' will wait for it. If the +// all passports in the pool are turned into invalid by the 'stopped' channel, then false +// is returned, means no more passports will be dispatched. +func (p *passportsPool) Apply() bool { + select { + case p.passports <- struct{}{}: + return true + case <-p.stopped: + return false + } +} + +// Revoke revokes a passport to the pool. Returning value 'true' means passport revoked +// successfully, otherwise 'Revoke' will wait. If pool turns into invalid by 'stopped' channel +// false will be returned. +func (p *passportsPool) Revoke() bool { + select { + case <-p.passports: + return true + case <-p.stopped: + return false + } +} + +// LimitedConcurrentRunner is used to run tasks, but limit the max concurrency. +type LimitedConcurrentRunner interface { + // AddTask adds a task to run + AddTask(task func() error) + // Wait waits all the tasks to be finished + Wait() + // Cancel cancels all tasks, tasks that already started will continue to run + Cancel() + // IsCancelled checks whether context is cancelled. This happens when some task encountered + // critical errors. + IsCancelled() bool +} + +type limitedConcurrentRunner struct { + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + passportsPool PassportsPool +} + +// NewLimitedConcurrentRunner creates a runner +func NewLimitedConcurrentRunner(limit int) LimitedConcurrentRunner { + ctx, cancel := context.WithCancel(context.Background()) + return &limitedConcurrentRunner{ + wg: new(sync.WaitGroup), + ctx: ctx, + cancel: cancel, + passportsPool: NewPassportsPool(limit, ctx.Done()), + } +} + +// AddTask adds a task to run +func (r *limitedConcurrentRunner) AddTask(task func() error) { + r.wg.Add(1) + go func() { + defer func() { + r.wg.Done() + }() + + // Return false means no passport acquired, and no valid passport will be dispatched any more. + // For example, some crucial errors happened and all tasks should be cancelled. + if ok := r.passportsPool.Apply(); !ok { + return + } + defer func() { + r.passportsPool.Revoke() + }() + + err := task() + if err != nil { + log.Errorf("%v", err) + r.cancel() + } + }() +} + +// Wait waits all the tasks to be finished +func (r *limitedConcurrentRunner) Wait() { + r.wg.Wait() +} + +// Cancel cancels all tasks, tasks that already started will continue to run +func (r *limitedConcurrentRunner) Cancel() { + r.cancel() +} + +// IsCancelled checks whether context is cancelled. This happens when some task encountered critical errors. +func (r *limitedConcurrentRunner) IsCancelled() bool { + return r.ctx.Err() != nil +} diff --git a/src/replication/adapter/adapter.go b/src/replication/adapter/adapter.go index 5e944ac7e..81c49f0a9 100644 --- a/src/replication/adapter/adapter.go +++ b/src/replication/adapter/adapter.go @@ -27,6 +27,7 @@ import ( // const definition const ( UserAgentReplication = "harbor-replication-service" + MaxConcurrency = 100 ) var registry = map[model.RegistryType]Factory{} diff --git a/src/replication/adapter/aliacr/adapter.go b/src/replication/adapter/aliacr/adapter.go index 7b043942f..386f2b3c1 100644 --- a/src/replication/adapter/aliacr/adapter.go +++ b/src/replication/adapter/aliacr/adapter.go @@ -9,13 +9,13 @@ import ( "regexp" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/replication/adapter/native" - "github.com/goharbor/harbor/src/replication/util" - + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/common/utils/registry/auth" adp "github.com/goharbor/harbor/src/replication/adapter" + "github.com/goharbor/harbor/src/replication/adapter/native" "github.com/goharbor/harbor/src/replication/model" + "github.com/goharbor/harbor/src/replication/util" ) func init() { @@ -157,42 +157,62 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou } log.Debugf("FetchImages.repositories: %#v\n", repositories) - // list tags - for _, repository := range repositories { - var tags []string - tags, err = a.getTags(repository, client) - if err != nil { - return - } + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() - var filterTags []string - if tagsPattern != "" { - for _, tag := range tags { - var ok bool - ok, err = util.Match(tagsPattern, tag) - if err != nil { - return + for i, r := range repositories { + index := i + repo := r + runner.AddTask(func() error { + var tags []string + tags, err = a.getTags(repo, client) + if err != nil { + return fmt.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) + } + + var filterTags []string + if tagsPattern != "" { + for _, tag := range tags { + var ok bool + ok, err = util.Match(tagsPattern, tag) + if err != nil { + return fmt.Errorf("Match tag '%s' error: %v", tag, err) + } + if ok { + filterTags = append(filterTags, tag) + } } - if ok { - filterTags = append(filterTags, tag) + } else { + filterTags = tags + } + + if len(filterTags) > 0 { + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: filepath.Join(repo.RepoNamespace, repo.RepoName), + }, + Vtags: filterTags, + Labels: []string{}, + }, } } - } else { - filterTags = tags - } - if len(filterTags) > 0 { - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: filepath.Join(repository.RepoNamespace, repository.RepoName), - }, - Vtags: filterTags, - Labels: []string{}, - }, - }) + return nil + }) + } + runner.Wait() + + if runner.IsCancelled() { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) } } diff --git a/src/replication/adapter/dockerhub/adapter.go b/src/replication/adapter/dockerhub/adapter.go index 908b62843..3cfb692eb 100644 --- a/src/replication/adapter/dockerhub/adapter.go +++ b/src/replication/adapter/dockerhub/adapter.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/adapter/native" @@ -25,16 +26,16 @@ func init() { } func factory(registry *model.Registry) (adp.Adapter, error) { - client, err := NewClient(&model.Registry{ - URL: baseURL, // specify the URL of Docker Hub - Credential: registry.Credential, - Insecure: registry.Insecure, - }) + client, err := NewClient(registry) if err != nil { return nil, err } - dockerRegistryAdapter, err := native.NewAdapter(registry) + dockerRegistryAdapter, err := native.NewAdapter(&model.Registry{ + URL: registryURL, + Credential: registry.Credential, + Insecure: registry.Insecure, + }) if err != nil { return nil, err } @@ -246,66 +247,84 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error log.Debugf("got %d repositories for namespace %s", n, ns) } - var resources []*model.Resource - // TODO(ChenDe): Get tags for repos in parallel - for _, repo := range repos { - name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) - // If name filter set, skip repos that don't match the filter pattern. - if len(nameFilter) != 0 { - m, err := util.Match(nameFilter, name) - if err != nil { - return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) - } - if !m { - continue - } - } + var rawResources = make([]*model.Resource, len(repos)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() + for i, r := range repos { + index := i + repo := r + runner.AddTask(func() error { + name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) + log.Infof("Routine started to collect tags for repo: %s", name) - var tags []string - page := 1 - pageSize := 100 - for { - pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) - if err != nil { - return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) - } - for _, t := range pageTags.Tags { - // If tag filter set, skip tags that don't match the filter pattern. - if len(tagFilter) != 0 { - m, err := util.Match(tagFilter, t.Name) - if err != nil { - return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) - } - - if !m { - continue - } + // If name filter set, skip repos that don't match the filter pattern. + if len(nameFilter) != 0 { + m, err := util.Match(nameFilter, name) + if err != nil { + return fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err) + } + if !m { + return nil } - tags = append(tags, t.Name) } - if len(pageTags.Next) == 0 { - break + var tags []string + page := 1 + pageSize := 100 + for { + pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) + if err != nil { + return fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err) + } + for _, t := range pageTags.Tags { + // If tag filter set, skip tags that don't match the filter pattern. + if len(tagFilter) != 0 { + m, err := util.Match(tagFilter, t.Name) + if err != nil { + return fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err) + } + + if !m { + continue + } + } + tags = append(tags, t.Name) + } + + if len(pageTags.Next) == 0 { + break + } + page++ } - page++ - } - // If the repo has no tags, skip it - if len(tags) == 0 { - continue - } + if len(tags) > 0 { + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: name, + }, + Vtags: tags, + }, + } + } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: name, - }, - Vtags: tags, - }, + return nil }) } + runner.Wait() + + if runner.IsCancelled() { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } + } return resources, nil } diff --git a/src/replication/adapter/harbor/image_registry.go b/src/replication/adapter/harbor/image_registry.go index a278bb099..269627090 100644 --- a/src/replication/adapter/harbor/image_registry.go +++ b/src/replication/adapter/harbor/image_registry.go @@ -18,6 +18,7 @@ import ( "fmt" "strings" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" adp "github.com/goharbor/harbor/src/replication/adapter" "github.com/goharbor/harbor/src/replication/model" @@ -29,6 +30,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error if err != nil { return nil, err } + resources := []*model.Resource{} for _, project := range projects { repositories, err := a.getRepositories(project.ID) @@ -43,38 +45,62 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error return nil, err } } - for _, repository := range repositories { - vTags, err := a.getTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() + + for i, r := range repositories { + index := i + repo := r + runner.AddTask(func() error { + vTags, err := a.getTags(repo.Name) + if err != nil { + return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err) } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, - Metadata: project.Metadata, + if len(vTags) == 0 { + rawResources[index] = nil + return nil + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + return fmt.Errorf("Filter tags %v error: %v", vTags, err) + } + } + if len(vTags) == 0 { + rawResources[index] = nil + return nil + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + Metadata: project.Metadata, + }, + Vtags: tags, }, - Vtags: tags, - }, + } + + return nil }) } + runner.Wait() + + if runner.IsCancelled() { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } + } } return resources, nil diff --git a/src/replication/adapter/native/adapter.go b/src/replication/adapter/native/adapter.go index 46a8731d6..8f42f6a61 100644 --- a/src/replication/adapter/native/adapter.go +++ b/src/replication/adapter/native/adapter.go @@ -15,6 +15,7 @@ package native import ( + "fmt" "io" "net/http" "strings" @@ -24,6 +25,7 @@ import ( "github.com/docker/distribution/manifest/schema1" "github.com/goharbor/harbor/src/common/http/modifier" common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" + "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" @@ -159,38 +161,59 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error } } - var resources []*model.Resource - for _, repository := range repositories { - vTags, err := a.getVTags(repository.Name) - if err != nil { - return nil, err - } - if len(vTags) == 0 { - continue - } - for _, filter := range filters { - if err = filter.DoFilter(&vTags); err != nil { - return nil, err + var rawResources = make([]*model.Resource, len(repositories)) + runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency) + defer runner.Cancel() + + for i, r := range repositories { + index := i + repo := r + runner.AddTask(func() error { + vTags, err := a.getVTags(repo.Name) + if err != nil { + return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err) } - } - if len(vTags) == 0 { - continue - } - tags := []string{} - for _, vTag := range vTags { - tags = append(tags, vTag.Name) - } - resources = append(resources, &model.Resource{ - Type: model.ResourceTypeImage, - Registry: a.registry, - Metadata: &model.ResourceMetadata{ - Repository: &model.Repository{ - Name: repository.Name, + if len(vTags) == 0 { + return nil + } + for _, filter := range filters { + if err = filter.DoFilter(&vTags); err != nil { + return fmt.Errorf("Filter tags %v error: %v", vTags, err) + } + } + if len(vTags) == 0 { + return nil + } + tags := []string{} + for _, vTag := range vTags { + tags = append(tags, vTag.Name) + } + rawResources[index] = &model.Resource{ + Type: model.ResourceTypeImage, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: repo.Name, + }, + Vtags: tags, }, - Vtags: tags, - }, + } + + return nil }) } + runner.Wait() + + if runner.IsCancelled() { + return nil, fmt.Errorf("FetchImages error when collect tags for repos") + } + + var resources []*model.Resource + for _, r := range rawResources { + if r != nil { + resources = append(resources, r) + } + } return resources, nil }