mirror of
https://github.com/goharbor/harbor.git
synced 2025-03-10 21:52:11 +01:00
Add Harbor adapter for replication
Implement the replication adapter for Harbor registry Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
689412b4d0
commit
cabef73980
src
@ -278,7 +278,7 @@ func NewStandardTokenAuthorizer(client *http.Client, credential Credential,
|
|||||||
// 1. performance issue
|
// 1. performance issue
|
||||||
// 2. the realm field returned by registry is an IP which can not reachable
|
// 2. the realm field returned by registry is an IP which can not reachable
|
||||||
// inside Harbor
|
// inside Harbor
|
||||||
if len(customizedTokenService) > 0 {
|
if len(customizedTokenService) > 0 && len(customizedTokenService[0]) > 0 {
|
||||||
generator.realm = customizedTokenService[0]
|
generator.realm = customizedTokenService[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ func fakedFactory(*model.Registry) (adapter.Adapter, error) {
|
|||||||
func TestReplicationAdapterAPIGet(t *testing.T) {
|
func TestReplicationAdapterAPIGet(t *testing.T) {
|
||||||
err := adapter.RegisterFactory(
|
err := adapter.RegisterFactory(
|
||||||
&adapter.Info{
|
&adapter.Info{
|
||||||
Type: "harbor",
|
Type: "test",
|
||||||
SupportedResourceTypes: []model.ResourceType{"image"},
|
SupportedResourceTypes: []model.ResourceType{"image"},
|
||||||
}, fakedFactory)
|
}, fakedFactory)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
@ -73,7 +73,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) {
|
|||||||
{
|
{
|
||||||
request: &testingRequest{
|
request: &testingRequest{
|
||||||
method: http.MethodGet,
|
method: http.MethodGet,
|
||||||
url: "/api/replication/adapters/harbor",
|
url: "/api/replication/adapters/test",
|
||||||
},
|
},
|
||||||
code: http.StatusUnauthorized,
|
code: http.StatusUnauthorized,
|
||||||
},
|
},
|
||||||
@ -81,7 +81,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) {
|
|||||||
{
|
{
|
||||||
request: &testingRequest{
|
request: &testingRequest{
|
||||||
method: http.MethodGet,
|
method: http.MethodGet,
|
||||||
url: "/api/replication/adapters/harbor",
|
url: "/api/replication/adapters/test",
|
||||||
credential: nonSysAdmin,
|
credential: nonSysAdmin,
|
||||||
},
|
},
|
||||||
code: http.StatusForbidden,
|
code: http.StatusForbidden,
|
||||||
@ -99,7 +99,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) {
|
|||||||
{
|
{
|
||||||
request: &testingRequest{
|
request: &testingRequest{
|
||||||
method: http.MethodGet,
|
method: http.MethodGet,
|
||||||
url: "/api/replication/adapters/harbor",
|
url: "/api/replication/adapters/test",
|
||||||
credential: sysAdmin,
|
credential: sysAdmin,
|
||||||
},
|
},
|
||||||
code: http.StatusOK,
|
code: http.StatusOK,
|
||||||
|
@ -74,8 +74,11 @@ func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac
|
|||||||
// ListExecutions ...
|
// ListExecutions ...
|
||||||
func (r *ReplicationOperationAPI) ListExecutions() {
|
func (r *ReplicationOperationAPI) ListExecutions() {
|
||||||
query := &models.ExecutionQuery{
|
query := &models.ExecutionQuery{
|
||||||
Statuses: []string{r.GetString("status")},
|
Trigger: r.GetString("trigger"),
|
||||||
Trigger: r.GetString("trigger"),
|
}
|
||||||
|
|
||||||
|
if len(r.GetString("status")) > 0 {
|
||||||
|
query.Statuses = []string{r.GetString("status")}
|
||||||
}
|
}
|
||||||
if len(r.GetString("policy_id")) > 0 {
|
if len(r.GetString("policy_id")) > 0 {
|
||||||
policyID, err := r.GetInt64("policy_id")
|
policyID, err := r.GetInt64("policy_id")
|
||||||
|
@ -15,45 +15,203 @@
|
|||||||
package harbor
|
package harbor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
common_http "github.com/goharbor/harbor/src/common/http"
|
||||||
|
"github.com/goharbor/harbor/src/common/http/modifier"
|
||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
registry_pkg "github.com/goharbor/harbor/src/common/utils/registry"
|
||||||
|
"github.com/goharbor/harbor/src/common/utils/registry/auth"
|
||||||
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
|
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
harbor model.RegistryType = "Harbor"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// TODO add more information to the info
|
// TODO add more information to the info
|
||||||
info := &adp.Info{
|
info := &adp.Info{
|
||||||
Type: harbor,
|
Type: model.RegistryTypeHarbor,
|
||||||
SupportedResourceTypes: []model.ResourceType{model.ResourceTypeRepository},
|
SupportedResourceTypes: []model.ResourceType{model.ResourceTypeRepository},
|
||||||
}
|
}
|
||||||
|
// TODO passing coreServiceURL and tokenServiceURL
|
||||||
|
coreServiceURL := "http://core:8080"
|
||||||
|
tokenServiceURL := ""
|
||||||
if err := adp.RegisterFactory(info, func(registry *model.Registry) (adp.Adapter, error) {
|
if err := adp.RegisterFactory(info, func(registry *model.Registry) (adp.Adapter, error) {
|
||||||
return newAdapter(registry), nil
|
return newAdapter(registry, coreServiceURL, tokenServiceURL), nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Errorf("failed to register factory for %s: %v", harbor, err)
|
log.Errorf("failed to register factory for %s: %v", model.RegistryTypeHarbor, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Infof("the factory for adapter %s registered", harbor)
|
log.Infof("the factory for adapter %s registered", model.RegistryTypeHarbor)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO implement the functions
|
|
||||||
type adapter struct {
|
type adapter struct {
|
||||||
*adp.DefaultImageRegistry
|
*adp.DefaultImageRegistry
|
||||||
|
registry *model.Registry
|
||||||
|
coreServiceURL string
|
||||||
|
client *common_http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAdapter(registry *model.Registry) *adapter {
|
// The registry URL and core service URL are different when the adapter
|
||||||
return &adapter{}
|
// is created for a local Harbor. If the "coreServicrURL" is null, the
|
||||||
|
// registry URL will be used as the coreServiceURL instead
|
||||||
|
func newAdapter(registry *model.Registry, coreServiceURL string,
|
||||||
|
tokenServiceURL string) *adapter {
|
||||||
|
transport := registry_pkg.GetHTTPTransport(registry.Insecure)
|
||||||
|
modifiers := []modifier.Modifier{
|
||||||
|
&auth.UserAgentModifier{
|
||||||
|
UserAgent: adp.UserAgentReplicator,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if registry.Credential != nil {
|
||||||
|
authorizer := auth.NewBasicAuthCredential(
|
||||||
|
registry.Credential.AccessKey,
|
||||||
|
registry.Credential.AccessSecret)
|
||||||
|
modifiers = append(modifiers, authorizer)
|
||||||
|
}
|
||||||
|
|
||||||
|
url := registry.URL
|
||||||
|
if len(coreServiceURL) > 0 {
|
||||||
|
url = coreServiceURL
|
||||||
|
}
|
||||||
|
|
||||||
|
return &adapter{
|
||||||
|
registry: registry,
|
||||||
|
coreServiceURL: url,
|
||||||
|
client: common_http.NewClient(
|
||||||
|
&http.Client{
|
||||||
|
Transport: transport,
|
||||||
|
}, modifiers...),
|
||||||
|
DefaultImageRegistry: adp.NewDefaultImageRegistry(registry, tokenServiceURL),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO implement the function
|
||||||
func (a *adapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
|
func (a *adapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
func (a *adapter) CreateNamespace(*model.Namespace) error {
|
func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
|
||||||
return nil
|
project := &struct {
|
||||||
|
Name string `json:"project_name"`
|
||||||
|
Metadata map[string]interface{} `json:"metadata"`
|
||||||
|
}{
|
||||||
|
Name: namespace.Name,
|
||||||
|
}
|
||||||
|
// handle the public of the project
|
||||||
|
if meta, exist := namespace.Metadata["public"]; exist {
|
||||||
|
public := true
|
||||||
|
// if one of them is "private", the set the public as false
|
||||||
|
for _, value := range meta.(map[string]interface{}) {
|
||||||
|
b, err := strconv.ParseBool(value.(string))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !b {
|
||||||
|
public = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
project.Metadata = map[string]interface{}{
|
||||||
|
"public": public,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := a.client.Post(a.coreServiceURL+"/api/projects", project)
|
||||||
|
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict {
|
||||||
|
log.Debugf("got 409 when trying to create project %s", namespace.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
func (a *adapter) GetNamespace(string) (*model.Namespace, error) {
|
func (a *adapter) GetNamespace(namespace string) (*model.Namespace, error) {
|
||||||
return nil, nil
|
project, err := a.getProject(namespace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &model.Namespace{
|
||||||
|
Name: namespace,
|
||||||
|
Metadata: project.Metadata,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO implement filter
|
||||||
|
func (a *adapter) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
|
||||||
|
resources := []*model.Resource{}
|
||||||
|
for _, namespace := range namespaces {
|
||||||
|
project, err := a.getProject(namespace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
repositories := []*repository{}
|
||||||
|
url := fmt.Sprintf("%s/api/repositories?project_id=%d", a.coreServiceURL, project.ID)
|
||||||
|
if err = a.client.Get(url, &repositories); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, repository := range repositories {
|
||||||
|
url := fmt.Sprintf("%s/api/repositories/%s/tags", a.coreServiceURL, repository.Name)
|
||||||
|
tags := []*tag{}
|
||||||
|
if err = a.client.Get(url, &tags); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
vtags := []string{}
|
||||||
|
for _, tag := range tags {
|
||||||
|
vtags = append(vtags, tag.Name)
|
||||||
|
}
|
||||||
|
resources = append(resources, &model.Resource{
|
||||||
|
Type: model.ResourceTypeRepository,
|
||||||
|
Registry: a.registry,
|
||||||
|
Metadata: &model.ResourceMetadata{
|
||||||
|
Namespace: namespace,
|
||||||
|
Name: repository.Name,
|
||||||
|
Vtags: vtags,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type project struct {
|
||||||
|
ID int64 `json:"project_id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Metadata map[string]interface{} `json:"metadata"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type repository struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type tag struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *adapter) getProject(name string) (*project, error) {
|
||||||
|
// TODO need an API to exact match project by name
|
||||||
|
projects := []*project{}
|
||||||
|
url := fmt.Sprintf("%s/api/projects?name=%s&page=1&page_size=1000", a.coreServiceURL, name)
|
||||||
|
if err := a.client.Get(url, &projects); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pro := range projects {
|
||||||
|
if pro.Name == name {
|
||||||
|
p := &project{
|
||||||
|
ID: pro.ID,
|
||||||
|
Name: name,
|
||||||
|
}
|
||||||
|
if pro.Metadata != nil {
|
||||||
|
metadata := map[string]interface{}{}
|
||||||
|
for key, value := range pro.Metadata {
|
||||||
|
metadata[key] = value
|
||||||
|
}
|
||||||
|
p.Metadata = metadata
|
||||||
|
}
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("project %s not found", name)
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ package execution
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/core/utils"
|
"github.com/goharbor/harbor/src/core/utils"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao"
|
"github.com/goharbor/harbor/src/replication/ng/dao"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
@ -63,8 +64,8 @@ type DefaultManager struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewDefaultManager ...
|
// NewDefaultManager ...
|
||||||
func NewDefaultManager() (Manager, error) {
|
func NewDefaultManager() Manager {
|
||||||
return &DefaultManager{}, nil
|
return &DefaultManager{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new execution
|
// Create a new execution
|
||||||
|
@ -1,17 +1,18 @@
|
|||||||
package execution
|
package execution
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/dao"
|
"github.com/goharbor/harbor/src/common/dao"
|
||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var executionManager, _ = NewDefaultManager()
|
var executionManager = NewDefaultManager()
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
databases := []string{"postgresql"}
|
databases := []string{"postgresql"}
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
@ -204,6 +205,7 @@ func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.R
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStartReplication(t *testing.T) {
|
func TestStartReplication(t *testing.T) {
|
||||||
|
config.InitWithSettings(nil)
|
||||||
err := adapter.RegisterFactory(
|
err := adapter.RegisterFactory(
|
||||||
&adapter.Info{
|
&adapter.Info{
|
||||||
Type: "faked_registry",
|
Type: "faked_registry",
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/replication/ng/execution"
|
"github.com/goharbor/harbor/src/replication/ng/execution"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/utils/log"
|
"github.com/goharbor/harbor/src/common/utils/log"
|
||||||
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
"github.com/goharbor/harbor/src/replication/ng/adapter"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
"github.com/goharbor/harbor/src/replication/ng/dao/models"
|
||||||
"github.com/goharbor/harbor/src/replication/ng/model"
|
"github.com/goharbor/harbor/src/replication/ng/model"
|
||||||
@ -53,45 +54,71 @@ func newFlow(policy *model.Policy, registryMgr registry.Manager,
|
|||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
}
|
}
|
||||||
|
|
||||||
// get source registry
|
// TODO consider to put registry model in the policy directly rather than just the registry ID?
|
||||||
srcRegistry, err := registryMgr.Get(policy.SrcRegistryID)
|
url, err := config.RegistryURL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
|
return nil, fmt.Errorf("failed to get the registry URL: %v", err)
|
||||||
}
|
}
|
||||||
if srcRegistry == nil {
|
registry := &model.Registry{
|
||||||
return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
|
Type: model.RegistryTypeHarbor,
|
||||||
|
Name: "Local",
|
||||||
|
URL: url,
|
||||||
|
// TODO use the service account
|
||||||
|
Credential: &model.Credential{
|
||||||
|
Type: model.CredentialTypeBasic,
|
||||||
|
AccessKey: "admin",
|
||||||
|
AccessSecret: "Harbor12345",
|
||||||
|
},
|
||||||
|
Insecure: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// get source registry
|
||||||
|
if policy.SrcRegistryID != 0 {
|
||||||
|
srcRegistry, err := registryMgr.Get(policy.SrcRegistryID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err)
|
||||||
|
}
|
||||||
|
if srcRegistry == nil {
|
||||||
|
return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID)
|
||||||
|
}
|
||||||
|
f.srcRegistry = srcRegistry
|
||||||
|
} else {
|
||||||
|
f.srcRegistry = registry
|
||||||
}
|
}
|
||||||
f.srcRegistry = srcRegistry
|
|
||||||
|
|
||||||
// get destination registry
|
// get destination registry
|
||||||
dstRegistry, err := registryMgr.Get(policy.DestRegistryID)
|
if policy.DestRegistryID != 0 {
|
||||||
if err != nil {
|
dstRegistry, err := registryMgr.Get(policy.DestRegistryID)
|
||||||
return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err)
|
||||||
|
}
|
||||||
|
if dstRegistry == nil {
|
||||||
|
return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
|
||||||
|
}
|
||||||
|
f.dstRegistry = dstRegistry
|
||||||
|
} else {
|
||||||
|
f.dstRegistry = registry
|
||||||
}
|
}
|
||||||
if dstRegistry == nil {
|
|
||||||
return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID)
|
|
||||||
}
|
|
||||||
f.dstRegistry = dstRegistry
|
|
||||||
|
|
||||||
// create the source registry adapter
|
// create the source registry adapter
|
||||||
srcFactory, err := adapter.GetFactory(srcRegistry.Type)
|
srcFactory, err := adapter.GetFactory(f.srcRegistry.Type)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err)
|
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.srcRegistry.Type, err)
|
||||||
}
|
}
|
||||||
srcAdapter, err := srcFactory(srcRegistry)
|
srcAdapter, err := srcFactory(f.srcRegistry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err)
|
return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", f.srcRegistry.URL, err)
|
||||||
}
|
}
|
||||||
f.srcAdapter = srcAdapter
|
f.srcAdapter = srcAdapter
|
||||||
|
|
||||||
// create the destination registry adapter
|
// create the destination registry adapter
|
||||||
dstFactory, err := adapter.GetFactory(dstRegistry.Type)
|
dstFactory, err := adapter.GetFactory(f.dstRegistry.Type)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err)
|
return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.dstRegistry.Type, err)
|
||||||
}
|
}
|
||||||
dstAdapter, err := dstFactory(dstRegistry)
|
dstAdapter, err := dstFactory(f.dstRegistry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err)
|
return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", f.dstRegistry.URL, err)
|
||||||
}
|
}
|
||||||
f.dstAdapter = dstAdapter
|
f.dstAdapter = dstAdapter
|
||||||
|
|
||||||
@ -150,7 +177,30 @@ func (f *flow) fetchResources() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *flow) createNamespace() error {
|
func (f *flow) createNamespace() error {
|
||||||
// merge the metadata of all source namespaces
|
// Merge the metadata of all source namespaces
|
||||||
|
// eg:
|
||||||
|
// We have two source namespaces:
|
||||||
|
// {
|
||||||
|
// Name: "source01",
|
||||||
|
// Metadata: {"public": true}
|
||||||
|
// }
|
||||||
|
// and
|
||||||
|
// {
|
||||||
|
// Name: "source02",
|
||||||
|
// Metadata: {"public": false}
|
||||||
|
// }
|
||||||
|
// The name of the destination namespace is "destination",
|
||||||
|
// after merging the metadata, the destination namespace
|
||||||
|
// looks like this:
|
||||||
|
// {
|
||||||
|
// Name: "destination",
|
||||||
|
// Metadata: {
|
||||||
|
// "public": {
|
||||||
|
// "source01": true,
|
||||||
|
// "source02": false,
|
||||||
|
// },
|
||||||
|
// },
|
||||||
|
// }
|
||||||
metadata := map[string]interface{}{}
|
metadata := map[string]interface{}{}
|
||||||
for _, srcNamespace := range f.policy.SrcNamespaces {
|
for _, srcNamespace := range f.policy.SrcNamespaces {
|
||||||
namespace, err := f.srcAdapter.GetNamespace(srcNamespace)
|
namespace, err := f.srcAdapter.GetNamespace(srcNamespace)
|
||||||
@ -159,7 +209,13 @@ func (f *flow) createNamespace() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for key, value := range namespace.Metadata {
|
for key, value := range namespace.Metadata {
|
||||||
metadata[namespace.Name+":"+key] = value
|
var m map[string]interface{}
|
||||||
|
if metadata[key] == nil {
|
||||||
|
m = map[string]interface{}{}
|
||||||
|
} else {
|
||||||
|
m = metadata[key].(map[string]interface{})
|
||||||
|
}
|
||||||
|
m[namespace.Name] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,7 +338,7 @@ func (f *flow) markExecutionFailure(err error) {
|
|||||||
Status: models.ExecutionStatusFailed,
|
Status: models.ExecutionStatusFailed,
|
||||||
StatusText: statusText,
|
StatusText: statusText,
|
||||||
EndTime: time.Now(),
|
EndTime: time.Now(),
|
||||||
})
|
}, "Status", "StatusText", "EndTime")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
|
||||||
}
|
}
|
||||||
|
@ -47,9 +47,8 @@ func Init() error {
|
|||||||
RegistryMgr = registry.NewDefaultManager()
|
RegistryMgr = registry.NewDefaultManager()
|
||||||
// init policy manager
|
// init policy manager
|
||||||
PolicyMgr = policy.NewDefaultManager()
|
PolicyMgr = policy.NewDefaultManager()
|
||||||
|
// init ExecutionMgr
|
||||||
// TODO init ExecutionMgr
|
executionMgr := execution.NewDefaultManager()
|
||||||
var executionMgr execution.Manager
|
|
||||||
// TODO init scheduler
|
// TODO init scheduler
|
||||||
var scheduler scheduler.Scheduler
|
var scheduler scheduler.Scheduler
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user