diff --git a/src/replication/ng/adapter/harbor/adapter.go b/src/replication/ng/adapter/harbor/adapter.go index 7a2aeb2d0..58f93831b 100644 --- a/src/replication/ng/adapter/harbor/adapter.go +++ b/src/replication/ng/adapter/harbor/adapter.go @@ -17,10 +17,12 @@ package harbor import ( "fmt" "net/http" + // "strconv" common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/http/modifier" + common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" "github.com/goharbor/harbor/src/common/utils/log" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" @@ -31,11 +33,8 @@ import ( // TODO add UT func init() { - // TODO passing coreServiceURL and tokenServiceURL - coreServiceURL := "http://core:8080" - tokenServiceURL := "" if err := adp.RegisterFactory(model.RegistryTypeHarbor, func(registry *model.Registry) (adp.Adapter, error) { - return newAdapter(registry, coreServiceURL, tokenServiceURL), nil + return newAdapter(registry), nil }); err != nil { log.Errorf("failed to register factory for %s: %v", model.RegistryTypeHarbor, err) return @@ -50,11 +49,8 @@ type adapter struct { client *common_http.Client } -// The registry URL and core service URL are different when the adapter -// is created for a local Harbor. If the "coreServicrURL" is null, the -// registry URL will be used as the coreServiceURL instead -func newAdapter(registry *model.Registry, coreServiceURL string, - tokenServiceURL string) *adapter { +func newAdapter(registry *model.Registry) *adapter { + // TODO use the global transport transport := registry_pkg.GetHTTPTransport(registry.Insecure) modifiers := []modifier.Modifier{ &auth.UserAgentModifier{ @@ -62,15 +58,23 @@ func newAdapter(registry *model.Registry, coreServiceURL string, }, } if registry.Credential != nil { - authorizer := auth.NewBasicAuthCredential( - registry.Credential.AccessKey, - registry.Credential.AccessSecret) + var authorizer modifier.Modifier + if registry.Credential.Type == model.CredentialTypeSecret { + authorizer = common_http_auth.NewSecretAuthorizer(registry.Credential.AccessSecret) + } else { + authorizer = auth.NewBasicAuthCredential( + registry.Credential.AccessKey, + registry.Credential.AccessSecret) + } modifiers = append(modifiers, authorizer) } + // The registry URL and core service URL are different when the adapter + // is created for a local Harbor. If the "registry.CoreURL" is null, the + // registry URL will be used as the coreServiceURL instead url := registry.URL - if len(coreServiceURL) > 0 { - url = coreServiceURL + if len(registry.CoreURL) > 0 { + url = registry.CoreURL } return &adapter{ @@ -80,7 +84,7 @@ func newAdapter(registry *model.Registry, coreServiceURL string, &http.Client{ Transport: transport, }, modifiers...), - DefaultImageRegistry: adp.NewDefaultImageRegistry(registry, tokenServiceURL), + DefaultImageRegistry: adp.NewDefaultImageRegistry(registry), } } diff --git a/src/replication/ng/adapter/image_registry.go b/src/replication/ng/adapter/image_registry.go index 8bf769756..2b61123e8 100644 --- a/src/replication/ng/adapter/image_registry.go +++ b/src/replication/ng/adapter/image_registry.go @@ -16,6 +16,7 @@ package adapter import ( "errors" + "fmt" "io" "net/http" "strings" @@ -24,6 +25,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/manifest/schema1" "github.com/goharbor/harbor/src/common/http/modifier" + common_http_auth "github.com/goharbor/harbor/src/common/http/modifier/auth" registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" "github.com/goharbor/harbor/src/common/utils/registry/auth" "github.com/goharbor/harbor/src/replication/ng/model" @@ -55,10 +57,8 @@ type DefaultImageRegistry struct { clients map[string]*registry_pkg.Repository } -// TODO: passing the tokenServiceURL - // NewDefaultImageRegistry returns an instance of DefaultImageRegistry -func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string) *DefaultImageRegistry { +func NewDefaultImageRegistry(registry *model.Registry) *DefaultImageRegistry { // use the same HTTP connection pool for all clients transport := registry_pkg.GetHTTPTransport(registry.Insecure) modifiers := []modifier.Modifier{ @@ -67,12 +67,23 @@ func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string }, } if registry.Credential != nil { - cred := auth.NewBasicAuthCredential( - registry.Credential.AccessKey, - registry.Credential.AccessSecret) + var cred modifier.Modifier + if registry.Credential.Type == model.CredentialTypeSecret { + cred = common_http_auth.NewSecretAuthorizer(registry.Credential.AccessSecret) + } else { + cred = auth.NewBasicAuthCredential( + registry.Credential.AccessKey, + registry.Credential.AccessSecret) + } + tokenServiceURL := "" + // the registry is a local Harbor instance if the core URL is specified, + // use the internal token service URL instead + if len(registry.CoreURL) > 0 { + tokenServiceURL = fmt.Sprintf("%s/service/token", registry.CoreURL) + } authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ Transport: transport, - }, cred, tokenServiceURL...) + }, cred, tokenServiceURL) modifiers = append(modifiers, authorizer) } diff --git a/src/replication/ng/config/config.go b/src/replication/ng/config/config.go index 00a9dad9f..27169a812 100644 --- a/src/replication/ng/config/config.go +++ b/src/replication/ng/config/config.go @@ -26,5 +26,7 @@ type Configuration struct { TokenServiceURL string JobserviceURL string SecretKey string - Secret string + // TODO consider to use a specified secret for replication + CoreSecret string + JobserviceSecret string } diff --git a/src/replication/ng/event/handler.go b/src/replication/ng/event/handler.go index 83227a25d..7141c0699 100644 --- a/src/replication/ng/event/handler.go +++ b/src/replication/ng/event/handler.go @@ -154,15 +154,15 @@ func getRegistry(registryMgr registry.Manager, registry *model.Registry) (*model func getLocalRegistry() *model.Registry { return &model.Registry{ - Type: model.RegistryTypeHarbor, - Name: "Local", - URL: config.Config.RegistryURL, - Status: "healthy", - // TODO use the service account + Type: model.RegistryTypeHarbor, + Name: "Local", + URL: config.Config.RegistryURL, + CoreURL: config.Config.CoreURL, + Status: "healthy", Credential: &model.Credential{ - Type: model.CredentialTypeBasic, - AccessKey: "admin", - AccessSecret: "Harbor12345", + Type: model.CredentialTypeSecret, + // use secret to do the auth for the local Harbor + AccessSecret: config.Config.JobserviceSecret, }, Insecure: true, } diff --git a/src/replication/ng/model/registry.go b/src/replication/ng/model/registry.go index 4efa8d0d8..853d71c96 100644 --- a/src/replication/ng/model/registry.go +++ b/src/replication/ng/model/registry.go @@ -36,11 +36,14 @@ type RegistryType string // e.g: u/p, OAuth token type CredentialType string +// const definitions const ( // CredentialTypeBasic indicates credential by user name, password CredentialTypeBasic = "basic" // CredentialTypeOAuth indicates credential by OAuth token CredentialTypeOAuth = "oauth" + // CredentialTypeSecret is only used by the communication of Harbor internal components + CredentialTypeSecret = "secret" ) // Credential keeps the access key and/or secret for the related registry @@ -59,16 +62,19 @@ type Credential struct { // Data required for the secure access way is not contained here. // DAO layer is not considered here type Registry struct { - ID int64 `json:"id"` - Name string `json:"name"` - Description string `json:"description"` - Type RegistryType `json:"type"` - URL string `json:"url"` - Credential *Credential `json:"credential"` - Insecure bool `json:"insecure"` - Status string `json:"status"` - CreationTime time.Time `json:"creation_time"` - UpdateTime time.Time `json:"update_time"` + ID int64 `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Type RegistryType `json:"type"` + URL string `json:"url"` + // CoreURL is only used for local harbor instance to + // avoid the requests passing through the external proxy + CoreURL string `json:"core_url"` + Credential *Credential `json:"credential"` + Insecure bool `json:"insecure"` + Status string `json:"status"` + CreationTime time.Time `json:"creation_time"` + UpdateTime time.Time `json:"update_time"` } // RegistryQuery defines the query conditions for listing registries diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 73ed0616d..924bd8977 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -67,13 +67,16 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso } flow := c.createFlow(id, policy, resource) - if err = c.flowCtl.Start(flow); err != nil { + if n, err := c.flowCtl.Start(flow); err != nil { // just update the status text, the status will be updated automatically // when listing the execution records if e := c.executionMgr.Update(&models.Execution{ ID: id, + Status: models.ExecutionStatusFailed, StatusText: err.Error(), - }, "StatusText"); e != nil { + Total: n, + Failed: n, + }, "Status", "StatusText", "Total", "Failed"); e != nil { log.Errorf("failed to update the execution %d: %v", id, e) } log.Errorf("the execution %d failed: %v", id, err) diff --git a/src/replication/ng/operation/flow/controller.go b/src/replication/ng/operation/flow/controller.go index a96856e41..2119cfa3f 100644 --- a/src/replication/ng/operation/flow/controller.go +++ b/src/replication/ng/operation/flow/controller.go @@ -14,14 +14,15 @@ package flow -// Flow defines replication flow +// Flow defines the replication flow type Flow interface { - Run(interface{}) error + // returns the count of tasks which have been scheduled and the error + Run(interface{}) (int, error) } // Controller is the controller that controls the replication flows type Controller interface { - Start(Flow) error + Start(Flow) (int, error) } // NewController returns an instance of the default flow controller @@ -31,6 +32,6 @@ func NewController() Controller { type controller struct{} -func (c *controller) Start(flow Flow) error { +func (c *controller) Start(flow Flow) (int, error) { return flow.Run(nil) } diff --git a/src/replication/ng/operation/flow/controller_test.go b/src/replication/ng/operation/flow/controller_test.go index f08f90af7..f06f5662a 100644 --- a/src/replication/ng/operation/flow/controller_test.go +++ b/src/replication/ng/operation/flow/controller_test.go @@ -17,18 +17,21 @@ package flow import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type fakedFlow struct{} -func (f *fakedFlow) Run(interface{}) error { - return nil +func (f *fakedFlow) Run(interface{}) (int, error) { + return 1, nil } func TestStart(t *testing.T) { flow := &fakedFlow{} controller := NewController() - err := controller.Start(flow) + n, err := controller.Start(flow) require.Nil(t, err) + assert.Equal(t, 1, n) } diff --git a/src/replication/ng/operation/flow/copy.go b/src/replication/ng/operation/flow/copy.go index f0eb1968e..f5e29ffc9 100644 --- a/src/replication/ng/operation/flow/copy.go +++ b/src/replication/ng/operation/flow/copy.go @@ -43,34 +43,34 @@ func NewCopyFlow(executionMgr execution.Manager, scheduler scheduler.Scheduler, } } -func (c *copyFlow) Run(interface{}) error { +func (c *copyFlow) Run(interface{}) (int, error) { srcAdapter, dstAdapter, err := initialize(c.policy) if err != nil { - return err + return 0, err } srcResources, err := fetchResources(srcAdapter, c.policy) if err != nil { - return err + return 0, err } if len(srcResources) == 0 { markExecutionSuccess(c.executionMgr, c.executionID, "no resources need to be replicated") log.Infof("no resources need to be replicated for the execution %d, skip", c.executionID) - return nil + return 0, nil } dstNamespaces, err := assembleDestinationNamespaces(srcAdapter, srcResources, c.policy.DestNamespace) if err != nil { - return err + return 0, err } if err = createNamespaces(dstAdapter, dstNamespaces); err != nil { - return err + return 0, err } dstResources := assembleDestinationResources(srcResources, c.policy.DestRegistry, c.policy.DestNamespace, c.policy.Override) items, err := preprocess(c.scheduler, srcResources, dstResources) if err != nil { - return err + return 0, err } if err = createTasks(c.executionMgr, c.executionID, items); err != nil { - return err + return 0, err } return schedule(c.scheduler, c.executionMgr, items) } diff --git a/src/replication/ng/operation/flow/copy_test.go b/src/replication/ng/operation/flow/copy_test.go index f5bcd9ad0..c21969728 100644 --- a/src/replication/ng/operation/flow/copy_test.go +++ b/src/replication/ng/operation/flow/copy_test.go @@ -13,6 +13,8 @@ package flow import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/goharbor/harbor/src/replication/ng/model" "github.com/stretchr/testify/require" ) @@ -29,6 +31,7 @@ func TestRunOfCopyFlow(t *testing.T) { }, } flow := NewCopyFlow(executionMgr, scheduler, 1, policy) - err := flow.Run(nil) + n, err := flow.Run(nil) require.Nil(t, err) + assert.Equal(t, 2, n) } diff --git a/src/replication/ng/operation/flow/deletion.go b/src/replication/ng/operation/flow/deletion.go index 9c61d36c7..68c1af502 100644 --- a/src/replication/ng/operation/flow/deletion.go +++ b/src/replication/ng/operation/flow/deletion.go @@ -42,27 +42,27 @@ func NewDeletionFlow(executionMgr execution.Manager, scheduler scheduler.Schedul } } -func (d *deletionFlow) Run(interface{}) error { +func (d *deletionFlow) Run(interface{}) (int, error) { // filling the registry information for _, resource := range d.resources { resource.Registry = d.policy.SrcRegistry } srcResources, err := filterResources(d.resources, d.policy.Filters) if err != nil { - return err + return 0, err } if len(srcResources) == 0 { markExecutionSuccess(d.executionMgr, d.executionID, "no resources need to be replicated") log.Infof("no resources need to be replicated for the execution %d, skip", d.executionID) - return nil + return 0, nil } dstResources := assembleDestinationResources(srcResources, d.policy.DestRegistry, d.policy.DestNamespace, d.policy.Override) items, err := preprocess(d.scheduler, srcResources, dstResources) if err != nil { - return err + return 0, err } if err = createTasks(d.executionMgr, d.executionID, items); err != nil { - return err + return 0, err } return schedule(d.scheduler, d.executionMgr, items) } diff --git a/src/replication/ng/operation/flow/deletion_test.go b/src/replication/ng/operation/flow/deletion_test.go index b923cf531..6644efe63 100644 --- a/src/replication/ng/operation/flow/deletion_test.go +++ b/src/replication/ng/operation/flow/deletion_test.go @@ -17,6 +17,8 @@ package flow import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/goharbor/harbor/src/replication/ng/model" "github.com/stretchr/testify/require" ) @@ -32,8 +34,17 @@ func TestRunOfDeletionFlow(t *testing.T) { Type: model.RegistryTypeHarbor, }, } - resources := []*model.Resource{} + resources := []*model.Resource{ + { + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + }, + } flow := NewDeletionFlow(executionMgr, scheduler, 1, policy, resources) - err := flow.Run(nil) + n, err := flow.Run(nil) require.Nil(t, err) + assert.Equal(t, 1, n) } diff --git a/src/replication/ng/operation/flow/stage.go b/src/replication/ng/operation/flow/stage.go index 9756a4f77..f47b80b4a 100644 --- a/src/replication/ng/operation/flow/stage.go +++ b/src/replication/ng/operation/flow/stage.go @@ -285,13 +285,15 @@ func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.Sc } // schedule the replication tasks and update the task's status -func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) error { +// returns the count of tasks which have been scheduled and the error +func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, items []*scheduler.ScheduleItem) (int, error) { results, err := scheduler.Schedule(items) if err != nil { - return fmt.Errorf("failed to schedule the tasks: %v", err) + return 0, fmt.Errorf("failed to schedule the tasks: %v", err) } allFailed := true + n := len(results) for _, result := range results { // if the task is failed to be submitted, update the status of the // task as failure @@ -318,9 +320,9 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite } // if all the tasks are failed, return err if allFailed { - return errors.New("all tasks are failed") + return n, errors.New("all tasks are failed") } - return nil + return n, nil } // return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" diff --git a/src/replication/ng/operation/flow/stage_test.go b/src/replication/ng/operation/flow/stage_test.go index 6edde44c1..ecc134d8d 100644 --- a/src/replication/ng/operation/flow/stage_test.go +++ b/src/replication/ng/operation/flow/stage_test.go @@ -395,6 +395,7 @@ func TestSchedule(t *testing.T) { TaskID: 1, }, } - err := schedule(sched, mgr, items) + n, err := schedule(sched, mgr, items) require.Nil(t, err) + assert.Equal(t, 1, n) } diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index 34290b663..873e3d726 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -54,15 +54,16 @@ func Init() error { return err } config.Config = &config.Configuration{ - CoreURL: cfg.InternalCoreURL(), - RegistryURL: registryURL, - TokenServiceURL: cfg.InternalTokenServiceEndpoint(), - JobserviceURL: cfg.InternalJobServiceURL(), - SecretKey: secretKey, - Secret: cfg.CoreSecret(), + CoreURL: cfg.InternalCoreURL(), + RegistryURL: registryURL, + TokenServiceURL: cfg.InternalTokenServiceEndpoint(), + JobserviceURL: cfg.InternalJobServiceURL(), + SecretKey: secretKey, + CoreSecret: cfg.CoreSecret(), + JobserviceSecret: cfg.JobserviceSecret(), } // TODO use a global http transport - js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.Secret) + js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.CoreSecret) // init registry manager RegistryMgr = registry.NewDefaultManager() // init policy controller @@ -74,3 +75,6 @@ func Init() error { log.Debug("the replication initialization completed") return nil } + +// TODO ping target API is needed as other old replication instances will +// use that diff --git a/src/replication/ng/transfer/repository/transfer.go b/src/replication/ng/transfer/repository/transfer.go index ab95dd776..891098fdd 100644 --- a/src/replication/ng/transfer/repository/transfer.go +++ b/src/replication/ng/transfer/repository/transfer.go @@ -111,8 +111,7 @@ func (t *transfer) initialize(src *model.Resource, dst *model.Resource) error { return nil } -// TODO handler the tokenServiceURL -func createRegistry(reg *model.Registry, tokenServiceURL ...string) (adapter.ImageRegistry, error) { +func createRegistry(reg *model.Registry) (adapter.ImageRegistry, error) { factory, err := adapter.GetFactory(reg.Type) if err != nil { return nil, err