Merge pull request #7262 from ywk253100/190401_harbor_adapter

Use the secret to auth the replication for local Harbor
This commit is contained in:
Wenkai Yin 2019-04-01 17:26:56 +08:00 committed by GitHub
commit df2ae63308
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 130 additions and 80 deletions

View File

@ -17,10 +17,12 @@ package harbor
import (
"fmt"
"net/http"
// "strconv"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier"
common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/common/utils/log"
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
@ -31,11 +33,8 @@ import (
// TODO add UT
func init() {
// TODO passing coreServiceURL and tokenServiceURL
coreServiceURL := "http://core:8080"
tokenServiceURL := ""
if err := adp.RegisterFactory(model.RegistryTypeHarbor, func(registry *model.Registry) (adp.Adapter, error) {
return newAdapter(registry, coreServiceURL, tokenServiceURL), nil
return newAdapter(registry), nil
}); err != nil {
log.Errorf("failed to register factory for %s: %v", model.RegistryTypeHarbor, err)
return
@ -50,11 +49,8 @@ type adapter struct {
client *common_http.Client
}
// The registry URL and core service URL are different when the adapter
// is created for a local Harbor. If the "coreServicrURL" is null, the
// registry URL will be used as the coreServiceURL instead
func newAdapter(registry *model.Registry, coreServiceURL string,
tokenServiceURL string) *adapter {
func newAdapter(registry *model.Registry) *adapter {
// TODO use the global transport
transport := registry_pkg.GetHTTPTransport(registry.Insecure)
modifiers := []modifier.Modifier{
&auth.UserAgentModifier{
@ -62,15 +58,23 @@ func newAdapter(registry *model.Registry, coreServiceURL string,
},
}
if registry.Credential != nil {
authorizer := auth.NewBasicAuthCredential(
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
var authorizer modifier.Modifier
if registry.Credential.Type == model.CredentialTypeSecret {
authorizer = common_http_auth.NewSecretAuthorizer(registry.Credential.AccessSecret)
} else {
authorizer = auth.NewBasicAuthCredential(
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
}
modifiers = append(modifiers, authorizer)
}
// The registry URL and core service URL are different when the adapter
// is created for a local Harbor. If the "registry.CoreURL" is null, the
// registry URL will be used as the coreServiceURL instead
url := registry.URL
if len(coreServiceURL) > 0 {
url = coreServiceURL
if len(registry.CoreURL) > 0 {
url = registry.CoreURL
}
return &adapter{
@ -80,7 +84,7 @@ func newAdapter(registry *model.Registry, coreServiceURL string,
&http.Client{
Transport: transport,
}, modifiers...),
DefaultImageRegistry: adp.NewDefaultImageRegistry(registry, tokenServiceURL),
DefaultImageRegistry: adp.NewDefaultImageRegistry(registry),
}
}

View File

@ -16,6 +16,7 @@ package adapter
import (
"errors"
"fmt"
"io"
"net/http"
"strings"
@ -24,6 +25,7 @@ import (
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/goharbor/harbor/src/common/http/modifier"
common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth"
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/common/utils/registry/auth"
"github.com/goharbor/harbor/src/replication/ng/model"
@ -55,10 +57,8 @@ type DefaultImageRegistry struct {
clients map[string]*registry_pkg.Repository
}
// TODO: passing the tokenServiceURL
// NewDefaultImageRegistry returns an instance of DefaultImageRegistry
func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string) *DefaultImageRegistry {
func NewDefaultImageRegistry(registry *model.Registry) *DefaultImageRegistry {
// use the same HTTP connection pool for all clients
transport := registry_pkg.GetHTTPTransport(registry.Insecure)
modifiers := []modifier.Modifier{
@ -67,12 +67,23 @@ func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string
},
}
if registry.Credential != nil {
cred := auth.NewBasicAuthCredential(
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
var cred modifier.Modifier
if registry.Credential.Type == model.CredentialTypeSecret {
cred = common_http_auth.NewSecretAuthorizer(registry.Credential.AccessSecret)
} else {
cred = auth.NewBasicAuthCredential(
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
}
tokenServiceURL := ""
// the registry is a local Harbor instance if the core URL is specified,
// use the internal token service URL instead
if len(registry.CoreURL) > 0 {
tokenServiceURL = fmt.Sprintf("%s/service/token", registry.CoreURL)
}
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
Transport: transport,
}, cred, tokenServiceURL...)
}, cred, tokenServiceURL)
modifiers = append(modifiers, authorizer)
}

View File

@ -26,5 +26,7 @@ type Configuration struct {
TokenServiceURL string
JobserviceURL string
SecretKey string
Secret string
// TODO consider to use a specified secret for replication
CoreSecret string
JobserviceSecret string
}

View File

@ -154,15 +154,15 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model
func getLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
Status: "healthy",
// TODO use the service account
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
CoreURL: config.Config.CoreURL,
Status: "healthy",
Credential: &model.Credential{
Type: model.CredentialTypeBasic,
AccessKey: "admin",
AccessSecret: "Harbor12345",
Type: model.CredentialTypeSecret,
// use secret to do the auth for the local Harbor
AccessSecret: config.Config.JobserviceSecret,
},
Insecure: true,
}

View File

@ -36,11 +36,14 @@ type RegistryType string
// e.g: u/p, OAuth token
type CredentialType string
// const definitions
const (
// CredentialTypeBasic indicates credential by user name, password
CredentialTypeBasic = "basic"
// CredentialTypeOAuth indicates credential by OAuth token
CredentialTypeOAuth = "oauth"
// CredentialTypeSecret is only used by the communication of Harbor internal components
CredentialTypeSecret = "secret"
)
// Credential keeps the access key and/or secret for the related registry
@ -59,16 +62,19 @@ type Credential struct {
// Data required for the secure access way is not contained here.
// DAO layer is not considered here
type Registry struct {
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type RegistryType `json:"type"`
URL string `json:"url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
ID int64 `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type RegistryType `json:"type"`
URL string `json:"url"`
// CoreURL is only used for local harbor instance to
// avoid the requests passing through the external proxy
CoreURL string `json:"core_url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`
CreationTime time.Time `json:"creation_time"`
UpdateTime time.Time `json:"update_time"`
}
// RegistryQuery defines the query conditions for listing registries

View File

@ -67,13 +67,16 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso
}
flow := c.createFlow(id, policy, resource)
if err = c.flowCtl.Start(flow); err != nil {
if n, err := c.flowCtl.Start(flow); err != nil {
// just update the status text, the status will be updated automatically
// when listing the execution records
if e := c.executionMgr.Update(&models.Execution{
ID: id,
Status: models.ExecutionStatusFailed,
StatusText: err.Error(),
}, "StatusText"); e != nil {
Total: n,
Failed: n,
}, "Status", "StatusText", "Total", "Failed"); e != nil {
log.Errorf("failed to update the execution %d: %v", id, e)
}
log.Errorf("the execution %d failed: %v", id, err)

View File

@ -14,14 +14,15 @@
package flow
// Flow defines replication flow
// Flow defines the replication flow
type Flow interface {
Run(interface{}) error
// returns the count of tasks which have been scheduled and the error
Run(interface{}) (int, error)
}
// Controller is the controller that controls the replication flows
type Controller interface {
Start(Flow) error
Start(Flow) (int, error)
}
// NewController returns an instance of the default flow controller
@ -31,6 +32,6 @@ func NewController() Controller {
type controller struct{}
func (c *controller) Start(flow Flow) error {
func (c *controller) Start(flow Flow) (int, error) {
return flow.Run(nil)
}

View File

@ -17,18 +17,21 @@ package flow
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedFlow struct{}
func (f *fakedFlow) Run(interface{}) error {
return nil
func (f *fakedFlow) Run(interface{}) (int, error) {
return 1, nil
}
func TestStart(t *testing.T) {
flow := &fakedFlow{}
controller := NewController()
err := controller.Start(flow)
n, err := controller.Start(flow)
require.Nil(t, err)
assert.Equal(t, 1, n)
}

View File

@ -43,34 +43,34 @@ func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler,
}
}
func (c *copyFlow) Run(interface{}) error {
func (c *copyFlow) Run(interface{}) (int, error) {
srcAdapter, dstAdapter, err := initialize(c.policy)
if err != nil {
return err
return 0, err
}
srcResources, err := fetchResources(srcAdapter, c.policy)
if err != nil {
return err
return 0, err
}
if len(srcResources) == 0 {
markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID)
return nil
return 0, nil
}
dstNamespaces, err := assembleDestinationNamespaces(srcAdapter, srcResources, c.policy.DestNamespace)
if err != nil {
return err
return 0, err
}
if err = createNamespaces(dstAdapter, dstNamespaces); err != nil {
return err
return 0, err
}
dstResources := assembleDestinationResources(srcResources, c.policy.DestRegistry, c.policy.DestNamespace, c.policy.Override)
items, err := preprocess(c.scheduler, srcResources, dstResources)
if err != nil {
return err
return 0, err
}
if err = createTasks(c.executionMgr, c.executionID, items); err != nil {
return err
return 0, err
}
return schedule(c.scheduler, c.executionMgr, items)
}

View File

@ -13,6 +13,8 @@ package flow
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/require"
)
@ -29,6 +31,7 @@ func TestRunOfCopyFlow(t *testing.T) {
},
}
flow := NewCopyFlow(executionMgr, scheduler, 1, policy)
err := flow.Run(nil)
n, err := flow.Run(nil)
require.Nil(t, err)
assert.Equal(t, 2, n)
}

View File

@ -42,27 +42,27 @@ func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Schedul
}
}
func (d *deletionFlow) Run(interface{}) error {
func (d *deletionFlow) Run(interface{}) (int, error) {
// filling the registry information
for _, resource := range d.resources {
resource.Registry = d.policy.SrcRegistry
}
srcResources, err := filterResources(d.resources, d.policy.Filters)
if err != nil {
return err
return 0, err
}
if len(srcResources) == 0 {
markExecutionSuccess(d.executionMgr, d.executionID, "no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID)
return nil
return 0, nil
}
dstResources := assembleDestinationResources(srcResources, d.policy.DestRegistry, d.policy.DestNamespace, d.policy.Override)
items, err := preprocess(d.scheduler, srcResources, dstResources)
if err != nil {
return err
return 0, err
}
if err = createTasks(d.executionMgr, d.executionID, items); err != nil {
return err
return 0, err
}
return schedule(d.scheduler, d.executionMgr, items)
}

View File

@ -17,6 +17,8 @@ package flow
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/require"
)
@ -32,8 +34,17 @@ func TestRunOfDeletionFlow(t *testing.T) {
Type: model.RegistryTypeHarbor,
},
}
resources := []*model.Resource{}
resources := []*model.Resource{
{
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
},
}
flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources)
err := flow.Run(nil)
n, err := flow.Run(nil)
require.Nil(t, err)
assert.Equal(t, 1, n)
}

View File

@ -285,13 +285,15 @@ func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.Sc
}
// schedule the replication tasks and update the task's status
func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) error {
// returns the count of tasks which have been scheduled and the error
func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) (int, error) {
results, err := scheduler.Schedule(items)
if err != nil {
return fmt.Errorf("failed to schedule the tasks: %v", err)
return 0, fmt.Errorf("failed to schedule the tasks: %v", err)
}
allFailed := true
n := len(results)
for _, result := range results {
// if the task is failed to be submitted, update the status of the
// task as failure
@ -318,9 +320,9 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite
}
// if all the tasks are failed, return err
if allFailed {
return errors.New("all tasks are failed")
return n, errors.New("all tasks are failed")
}
return nil
return n, nil
}
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"

View File

@ -395,6 +395,7 @@ func TestSchedule(t *testing.T) {
TaskID: 1,
},
}
err := schedule(sched, mgr, items)
n, err := schedule(sched, mgr, items)
require.Nil(t, err)
assert.Equal(t, 1, n)
}

View File

@ -54,15 +54,16 @@ func Init() error {
return err
}
config.Config = &config.Configuration{
CoreURL: cfg.InternalCoreURL(),
RegistryURL: registryURL,
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
JobserviceURL: cfg.InternalJobServiceURL(),
SecretKey: secretKey,
Secret: cfg.CoreSecret(),
CoreURL: cfg.InternalCoreURL(),
RegistryURL: registryURL,
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
JobserviceURL: cfg.InternalJobServiceURL(),
SecretKey: secretKey,
CoreSecret: cfg.CoreSecret(),
JobserviceSecret: cfg.JobserviceSecret(),
}
// TODO use a global http transport
js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.Secret)
js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.CoreSecret)
// init registry manager
RegistryMgr = registry.NewDefaultManager()
// init policy controller
@ -74,3 +75,6 @@ func Init() error {
log.Debug("the replication initialization completed")
return nil
}
// TODO ping target API is needed as other old replication instances will
// use that

View File

@ -111,8 +111,7 @@ func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error {
return nil
}
// TODO handler the tokenServiceURL
func createRegistry(reg *model.Registry, tokenServiceURL ...string) (adapter.ImageRegistry, error) {
func createRegistry(reg *model.Registry) (adapter.ImageRegistry, error) {
factory, err := adapter.GetFactory(reg.Type)
if err != nil {
return nil, err