Refactor code to extract a common task runner

Signed-off-by: cd1989 <chende@caicloud.io>
This commit is contained in:
cd1989 2019-08-07 14:13:39 +08:00
parent e2e540233b
commit 870d7115c4
5 changed files with 140 additions and 144 deletions

View File

@ -1,5 +1,12 @@
package utils package utils
import (
"context"
"sync"
"github.com/goharbor/harbor/src/common/utils/log"
)
// PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool // PassportsPool holds a given number of passports, they can be applied or be revoked. PassportsPool
// is used to control the concurrency of tasks, the pool size determine the max concurrency. When users // is used to control the concurrency of tasks, the pool size determine the max concurrency. When users
// want to start a goroutine to perform some task, they must apply a passport firstly, and after finish // want to start a goroutine to perform some task, they must apply a passport firstly, and after finish
@ -48,3 +55,74 @@ func (p *passportsPool) Revoke() bool {
return false return false
} }
} }
// LimitedConcurrentRunner is used to run tasks, but limit the max concurrency.
type LimitedConcurrentRunner interface {
// AddTask adds a task to run
AddTask(task func() error)
// Wait waits all the tasks to be finished
Wait()
// Cancel cancels all tasks, tasks that already started will continue to run
Cancel()
// IsCancelled checks whether context is cancelled. This happens when some task encountered
// critical errors.
IsCancelled() bool
}
type limitedConcurrentRunner struct {
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
passportsPool PassportsPool
}
// NewLimitedConcurrentRunner creates a runner
func NewLimitedConcurrentRunner(limit int) LimitedConcurrentRunner {
ctx, cancel := context.WithCancel(context.Background())
return &limitedConcurrentRunner{
wg: new(sync.WaitGroup),
ctx: ctx,
cancel: cancel,
passportsPool: NewPassportsPool(limit, ctx.Done()),
}
}
// AddTask adds a task to run
func (r *limitedConcurrentRunner) AddTask(task func() error) {
r.wg.Add(1)
go func() {
defer func() {
r.wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := r.passportsPool.Apply(); !ok {
return
}
defer func() {
r.passportsPool.Revoke()
}()
err := task()
if err != nil {
log.Errorf("%v", err)
r.cancel()
}
}()
}
// Wait waits all the tasks to be finished
func (r *limitedConcurrentRunner) Wait() {
r.wg.Wait()
}
// Cancel cancels all tasks, tasks that already started will continue to run
func (r *limitedConcurrentRunner) Cancel() {
r.cancel()
}
// IsCancelled checks whether context is cancelled. This happens when some task encountered critical errors.
func (r *limitedConcurrentRunner) IsCancelled() bool {
return r.ctx.Err() != nil
}

View File

@ -1,14 +1,12 @@
package aliacr package aliacr
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sync"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cr" "github.com/aliyun/alibaba-cloud-sdk-go/services/cr"
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
@ -159,33 +157,18 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou
} }
log.Debugf("FetchImages.repositories: %#v\n", repositories) log.Debugf("FetchImages.repositories: %#v\n", repositories)
rawResources := make([]*model.Resource, len(repositories)) var rawResources = make([]*model.Resource, len(repositories))
var wg = new(sync.WaitGroup) runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
ctx, cancel := context.WithCancel(context.Background()) defer runner.Cancel()
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done())
for i, r := range repositories { for i, r := range repositories {
wg.Add(1) index := i
go func(index int, repo aliRepo) { repo := r
defer func() { runner.AddTask(func() error {
wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
}
defer func() {
passportsPool.Revoke()
}()
var tags []string var tags []string
tags, err = a.getTags(repo, client) tags, err = a.getTags(repo, client)
if err != nil { if err != nil {
log.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err) return fmt.Errorf("List tags for repo '%s' error: %v", repo.RepoName, err)
cancel()
return
} }
var filterTags []string var filterTags []string
@ -194,9 +177,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou
var ok bool var ok bool
ok, err = util.Match(tagsPattern, tag) ok, err = util.Match(tagsPattern, tag)
if err != nil { if err != nil {
log.Errorf("Match tag '%s' error: %v", tag, err) return fmt.Errorf("Match tag '%s' error: %v", tag, err)
cancel()
return
} }
if ok { if ok {
filterTags = append(filterTags, tag) filterTags = append(filterTags, tag)
@ -218,16 +199,14 @@ func (a *adapter) FetchImages(filters []*model.Filter) (resources []*model.Resou
Labels: []string{}, Labels: []string{},
}, },
} }
} else {
rawResources[index] = nil
} }
}(i, r)
}
wg.Wait()
err = ctx.Err() return nil
cancel() })
if err != nil { }
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos") return nil, fmt.Errorf("FetchImages error when collect tags for repos")
} }

View File

@ -2,14 +2,12 @@ package dockerhub
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"sync"
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
@ -250,26 +248,12 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
var rawResources = make([]*model.Resource, len(repos)) var rawResources = make([]*model.Resource, len(repos))
var wg = new(sync.WaitGroup) runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
ctx, cancel := context.WithCancel(context.Background()) defer runner.Cancel()
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done())
for i, r := range repos { for i, r := range repos {
wg.Add(1) index := i
go func(index int, repo Repo) { repo := r
defer func() { runner.AddTask(func() error {
wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
}
defer func() {
passportsPool.Revoke()
}()
name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) name := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
log.Infof("Routine started to collect tags for repo: %s", name) log.Infof("Routine started to collect tags for repo: %s", name)
@ -277,12 +261,10 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
if len(nameFilter) != 0 { if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, name) m, err := util.Match(nameFilter, name)
if err != nil { if err != nil {
cancel() return fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
log.Errorf("match repo name '%s' against pattern '%s' error: %v", name, nameFilter, err)
return
} }
if !m { if !m {
return return nil
} }
} }
@ -292,18 +274,14 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
for { for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize) pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil { if err != nil {
cancel() return fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
log.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
return
} }
for _, t := range pageTags.Tags { for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern. // If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 { if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name) m, err := util.Match(tagFilter, t.Name)
if err != nil { if err != nil {
cancel() return fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
log.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
return
} }
if !m { if !m {
@ -319,9 +297,7 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
page++ page++
} }
if len(tags) == 0 { if len(tags) > 0 {
rawResources[index] = nil
} else {
rawResources[index] = &model.Resource{ rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage, Type: model.ResourceTypeImage,
Registry: a.registry, Registry: a.registry,
@ -333,13 +309,13 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
}, },
} }
} }
}(i, r)
}
wg.Wait()
err = ctx.Err() return nil
cancel() })
if err != nil { }
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos") return nil, fmt.Errorf("FetchImages error when collect tags for repos")
} }

View File

@ -15,10 +15,8 @@
package harbor package harbor
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
@ -48,47 +46,30 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
} }
rawResources := make([]*model.Resource, len(repositories)) var rawResources = make([]*model.Resource, len(repositories))
var wg = new(sync.WaitGroup) runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
ctx, cancel := context.WithCancel(context.Background()) defer runner.Cancel()
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done())
for i, r := range repositories { for i, r := range repositories {
wg.Add(1) index := i
go func(index int, repo *adp.Repository) { repo := r
defer func() { runner.AddTask(func() error {
wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
}
defer func() {
passportsPool.Revoke()
}()
vTags, err := a.getTags(repo.Name) vTags, err := a.getTags(repo.Name)
if err != nil { if err != nil {
log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
cancel()
return
} }
if len(vTags) == 0 { if len(vTags) == 0 {
rawResources[index] = nil rawResources[index] = nil
return return nil
} }
for _, filter := range filters { for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil { if err = filter.DoFilter(&vTags); err != nil {
log.Errorf("Filter tags %v error: %v", vTags, err) return fmt.Errorf("Filter tags %v error: %v", vTags, err)
cancel()
return
} }
} }
if len(vTags) == 0 { if len(vTags) == 0 {
rawResources[index] = nil rawResources[index] = nil
return return nil
} }
tags := []string{} tags := []string{}
for _, vTag := range vTags { for _, vTag := range vTags {
@ -105,13 +86,13 @@ func (a *adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
Vtags: tags, Vtags: tags,
}, },
} }
}(i, r)
}
wg.Wait()
err = ctx.Err() return nil
cancel() })
if err != nil { }
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos") return nil, fmt.Errorf("FetchImages error when collect tags for repos")
} }

View File

@ -15,7 +15,6 @@
package native package native
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -162,45 +161,28 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
} }
} }
rawResources := make([]*model.Resource, len(repositories)) var rawResources = make([]*model.Resource, len(repositories))
var wg = new(sync.WaitGroup) runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
ctx, cancel := context.WithCancel(context.Background()) defer runner.Cancel()
var passportsPool = utils.NewPassportsPool(adp.MaxConcurrency, ctx.Done())
for i, r := range repositories { for i, r := range repositories {
wg.Add(1) index := i
go func(index int, repo *adp.Repository) { repo := r
defer func() { runner.AddTask(func() error {
wg.Done()
}()
// Return false means no passport acquired, and no valid passport will be dispatched any more.
// For example, some crucial errors happened and all tasks should be cancelled.
if ok := passportsPool.Apply(); !ok {
return
}
defer func() {
passportsPool.Revoke()
}()
vTags, err := a.getVTags(repo.Name) vTags, err := a.getVTags(repo.Name)
if err != nil { if err != nil {
log.Errorf("List tags for repo '%s' error: %v", repo.Name, err) return fmt.Errorf("List tags for repo '%s' error: %v", repo.Name, err)
cancel()
return
} }
if len(vTags) == 0 { if len(vTags) == 0 {
return return nil
} }
for _, filter := range filters { for _, filter := range filters {
if err = filter.DoFilter(&vTags); err != nil { if err = filter.DoFilter(&vTags); err != nil {
log.Errorf("Filter tags %v error: %v", vTags, err) return fmt.Errorf("Filter tags %v error: %v", vTags, err)
cancel()
return
} }
} }
if len(vTags) == 0 { if len(vTags) == 0 {
return return nil
} }
tags := []string{} tags := []string{}
for _, vTag := range vTags { for _, vTag := range vTags {
@ -216,13 +198,13 @@ func (a *Adapter) FetchImages(filters []*model.Filter) ([]*model.Resource, error
Vtags: tags, Vtags: tags,
}, },
} }
}(i, r)
}
wg.Wait()
err = ctx.Err() return nil
cancel() })
if err != nil { }
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchImages error when collect tags for repos") return nil, fmt.Errorf("FetchImages error when collect tags for repos")
} }