Recover the panic of the process of replication adapter and fix bug of gitlab adapter

Recover the panic of the process of replication adapter
Fix bug of gitlab adapter
Fixes #14153

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2021-02-04 18:18:43 +08:00
parent ec83f49a1a
commit cc3e240d56
4 changed files with 55 additions and 34 deletions

View File

@ -16,6 +16,7 @@ package replication
import (
"context"
"fmt"
"time"
"github.com/goharbor/harbor/src/controller/replication/flow"
@ -94,38 +95,51 @@ func (c *controller) Start(ctx context.Context, policy *model.Policy, resource *
}
c.wp.GetWorker()
// start the replication flow in background
go func() {
defer c.wp.ReleaseWorker()
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create a new context
// with orm populated
ctxx := orm.NewContext(context.Background(), c.ormCreator.Create())
// may be submitted already when the process starts, so pass a new context
// with orm populated to the goroutine
go func(ctx context.Context) {
defer c.wp.ReleaseWorker()
// recover in case panic during the adapter process
defer func() {
if err := recover(); err != nil {
logger.Errorf("recovered from the panic: %v", err)
c.markError(ctx, id, fmt.Errorf("panic during the process"))
}
}()
// as we start a new transaction in the goroutine, the execution record may not
// be inserted yet, wait until it is ready before continue
if err := lib.RetryUntil(func() error {
_, err := c.execMgr.Get(ctxx, id)
_, err := c.execMgr.Get(ctx, id)
return err
}); err != nil {
logger.Errorf("failed to wait the execution record to be inserted: %v", err)
c.markError(ctx, id, fmt.Errorf(
"failed to wait the execution record to be inserted: %v", err))
return
}
err := c.flowCtl.Start(ctxx, id, policy, resource)
err := c.flowCtl.Start(ctx, id, policy, resource)
if err == nil {
// no err, return directly
return
}
// got error, try to stop the execution first in case that some tasks are already created
if err := c.execMgr.StopAndWait(ctxx, id, 10*time.Second); err != nil {
logger.Errorf("failed to stop the execution %d: %v", id, err)
}
if err := c.execMgr.MarkError(ctxx, id, err.Error()); err != nil {
logger.Errorf("failed to mark error for the execution %d: %v", id, err)
}
}()
c.markError(ctx, id, err)
}(orm.NewContext(context.Background(), c.ormCreator.Create()))
return id, nil
}
func (c *controller) markError(ctx context.Context, executionID int64, err error) {
logger := log.GetLogger(ctx)
// try to stop the execution first in case that some tasks are already created
if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
logger.Errorf("failed to stop the execution %d: %v", executionID, err)
}
if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil {
logger.Errorf("failed to mark error for the execution %d: %v", executionID, err)
}
}
func (c *controller) Stop(ctx context.Context, id int64) error {
return c.execMgr.Stop(ctx, id)
}

View File

@ -22,7 +22,7 @@ type factory struct {
// Create ...
func (f *factory) Create(r *model.Registry) (adp.Adapter, error) {
return newAdapter(r), nil
return newAdapter(r)
}
// AdapterPattern ...
@ -44,13 +44,17 @@ type adapter struct {
clientGitlabAPI *Client
}
func newAdapter(registry *model.Registry) *adapter {
func newAdapter(registry *model.Registry) (*adapter, error) {
client, err := NewClient(registry)
if err != nil {
return nil, err
}
return &adapter{
registry: registry,
url: registry.URL,
clientGitlabAPI: NewClient(registry),
clientGitlabAPI: client,
Adapter: native.NewAdapter(registry),
}
}, nil
}
func (a *adapter) Info() (info *model.RegistryInfo, err error) {
@ -93,7 +97,10 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) ([]*model.Resource, er
}
}
projects = a.getProjectsByPattern(nameFilter)
projects, err = a.getProjectsByPattern(nameFilter)
if err != nil {
return nil, err
}
if len(projects) == 0 {
projects, err = a.clientGitlabAPI.getProjects()
if err != nil {
@ -156,7 +163,7 @@ func (a *adapter) FetchArtifacts(filters []*model.Filter) ([]*model.Resource, er
return resources, nil
}
func (a *adapter) getProjectsByPattern(pattern string) []*Project {
func (a *adapter) getProjectsByPattern(pattern string) ([]*Project, error) {
var projects []*Project
projectset := make(map[string]bool)
var err error
@ -175,7 +182,7 @@ func (a *adapter) getProjectsByPattern(pattern string) []*Project {
projectset[substrings[1]] = true
var projectsByName, err = a.clientGitlabAPI.getProjectsByName(substrings[1])
if err != nil {
return nil
return nil, err
}
if projectsByName == nil {
continue
@ -185,24 +192,24 @@ func (a *adapter) getProjectsByPattern(pattern string) []*Project {
} else {
substrings := strings.Split(pattern, "/")
if len(substrings) < 2 {
return projects
return projects, nil
}
projectName := substrings[1]
if projectName == "*" {
return projects
return projects, nil
}
projectName = strings.Trim(projectName, "*")
if strings.Contains(projectName, "*") {
return projects
return projects, nil
}
projects, err = a.clientGitlabAPI.getProjectsByName(projectName)
if err != nil {
return projects
return nil, err
}
}
}
return projects
return projects, nil
}
func existPatterns(path string, patterns []string) bool {

View File

@ -86,7 +86,7 @@ func getAdapter(t *testing.T) adp.Adapter {
assertions.Nil(err)
assertions.NotNil(factory)
server := getServer(t)
adapter := newAdapter(&model.Registry{
adapter, err := newAdapter(&model.Registry{
Type: model.RegistryTypeGitLab,
URL: server.URL,
Credential: &model.Credential{

View File

@ -30,20 +30,20 @@ type Client struct {
}
// NewClient creates a new GitLab client.
func NewClient(registry *model.Registry) *Client {
func NewClient(registry *model.Registry) (*Client, error) {
realm, _, err := ping(&http.Client{
Transport: util.GetHTTPTransport(registry.Insecure),
}, registry.URL)
if err != nil {
return nil
return nil, err
}
if realm == "" {
return nil
return nil, fmt.Errorf("empty realm")
}
location, err := url.Parse(realm)
if err != nil {
return nil
return nil, err
}
client := &Client{
url: location.Scheme + "://" + location.Host,
@ -54,7 +54,7 @@ func NewClient(registry *model.Registry) *Client {
Transport: util.GetHTTPTransport(registry.Insecure),
}),
}
return client
return client, nil
}
// ping returns the realm, service and error