Fix bug in replication

This commit fixes bugs found in the implement of replciation NG

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-03-10 11:14:23 +08:00
parent 2d1bf58a88
commit 258b22a9a5
10 changed files with 136 additions and 100 deletions

View File

@ -13,8 +13,8 @@ const (
ImageReplicate = "IMAGE_REPLICATE"
// ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC"
// ImageReplication : the name of image replication job in job service
ImageReplication = "IMAGE_REPLICATION"
// Replication : the name of the replication job in job service
Replication = "REPLICATION"
// JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"

View File

@ -166,7 +166,10 @@ func (r *ReplicationOperationAPI) ListTasks() {
query := &models.TaskQuery{
ExecutionID: executionID,
ResourceType: r.GetString("resource_type"),
Statuses: []string{r.GetString("status")},
}
status := r.GetString("status")
if len(status) > 0 {
query.Statuses = []string{status}
}
query.Page, query.Size = r.GetPaginationParams()
total, tasks, err := ng.OperationCtl.ListTasks(query)

View File

@ -26,6 +26,8 @@ import (
_ "github.com/goharbor/harbor/src/replication/ng/transfer/chart"
// import repository transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/repository"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
)
// Replication implements the job interface

View File

@ -212,7 +212,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageReplication: (*ng.Replication)(nil),
job.Replication: (*ng.Replication)(nil),
}); err != nil {
// exit
return nil, err

View File

@ -17,7 +17,7 @@ package harbor
import (
"fmt"
"net/http"
"strconv"
// "strconv"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier"
@ -97,7 +97,11 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
Metadata map[string]interface{} `json:"metadata"`
}{
Name: namespace.Name,
Metadata: namespace.Metadata,
}
// TODO
/*
// handle the public of the project
if meta, exist := namespace.Metadata["public"]; exist {
public := true
@ -117,6 +121,7 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
"public": public,
}
}
*/
err := a.client.Post(a.coreServiceURL+"/api/projects", project)
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict {

View File

@ -77,6 +77,13 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error
return id, nil
}
// if no resources need to be replicated, mark the execution success and return
if len(flow.srcResources) == 0 {
flow.markExecutionSuccess("no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", id)
return id, nil
}
// create the namespace on the destination registry
if err = flow.createNamespace(); err != nil {
log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err)

View File

@ -39,7 +39,8 @@ type flow struct {
srcAdapter adapter.Adapter
dstAdapter adapter.Adapter
executionID int64
resources []*model.Resource
srcResources []*model.Resource
dstResources []*model.Resource
executionMgr execution.Manager
scheduler scheduler.Scheduler
scheduleItems []*scheduler.ScheduleItem
@ -151,7 +152,7 @@ func (f *flow) fetchResources() error {
}
// TODO consider whether the logic can be refactored by using reflect
resources := []*model.Resource{}
srcResources := []*model.Resource{}
for _, typ := range resTypes {
if typ == model.ResourceTypeRepository {
reg, ok := f.srcAdapter.(adapter.ImageRegistry)
@ -165,12 +166,38 @@ func (f *flow) fetchResources() error {
f.markExecutionFailure(err)
return err
}
resources = append(resources, res...)
srcResources = append(srcResources, res...)
continue
}
// TODO add support for chart
}
f.resources = resources
dstResources := []*model.Resource{}
for _, srcResource := range srcResources {
dstResource := &model.Resource{
Type: srcResource.Type,
Metadata: &model.ResourceMetadata{
Name: srcResource.Metadata.Name,
Namespace: srcResource.Metadata.Namespace,
Vtags: srcResource.Metadata.Vtags,
},
Registry: f.dstRegistry,
ExtendedInfo: srcResource.ExtendedInfo,
Deleted: srcResource.Deleted,
Override: f.policy.Override,
}
// TODO check whether the logic is applied to chart
// if the destination namespace is specified, use the specified one
if len(f.policy.DestNamespace) > 0 {
dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name,
srcResource.Metadata.Namespace, f.policy.DestNamespace, 1)
dstResource.Metadata.Namespace = f.policy.DestNamespace
}
dstResources = append(dstResources, dstResource)
}
f.srcResources = srcResources
f.dstResources = dstResources
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
return nil
@ -201,55 +228,36 @@ func (f *flow) createNamespace() error {
// },
// },
// }
metadata := map[string]interface{}{}
for _, srcNamespace := range f.policy.SrcNamespaces {
namespace, err := f.srcAdapter.GetNamespace(srcNamespace)
// TODO merge the metadata of different namespaces
namespaces := []*model.Namespace{}
for i, resource := range f.dstResources {
namespace := &model.Namespace{
Name: resource.Metadata.Namespace,
}
// get the metadata of the namespace from the source registry
ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace)
if err != nil {
f.markExecutionFailure(err)
return err
}
for key, value := range namespace.Metadata {
var m map[string]interface{}
if metadata[key] == nil {
m = map[string]interface{}{}
} else {
m = metadata[key].(map[string]interface{})
}
m[namespace.Name] = value
}
namespace.Metadata = ns.Metadata
namespaces = append(namespaces, namespace)
}
if err := f.dstAdapter.CreateNamespace(&model.Namespace{
Name: f.policy.DestNamespace,
Metadata: metadata,
}); err != nil {
for _, namespace := range namespaces {
if err := f.dstAdapter.CreateNamespace(namespace); err != nil {
f.markExecutionFailure(err)
return err
}
log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID)
log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID)
}
return nil
}
func (f *flow) preprocess() error {
dstResources := []*model.Resource{}
for _, srcResource := range f.resources {
dstResource := &model.Resource{
Type: srcResource.Type,
Metadata: &model.ResourceMetadata{
Name: srcResource.Metadata.Name,
Namespace: f.policy.DestNamespace,
Vtags: srcResource.Metadata.Vtags,
},
Registry: f.dstRegistry,
ExtendedInfo: srcResource.ExtendedInfo,
Deleted: srcResource.Deleted,
Override: f.policy.Override,
}
dstResources = append(dstResources, dstResource)
}
items, err := f.scheduler.Preprocess(f.resources, dstResources)
items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources)
if err != nil {
f.markExecutionFailure(err)
return err
@ -277,6 +285,7 @@ func (f *flow) createTasks() error {
f.markExecutionFailure(err)
return err
}
item.TaskID = id
log.Debugf("task record %d for the execution %d created",
id, f.executionID)
@ -311,7 +320,7 @@ func (f *flow) schedule() error {
ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(),
}); err != nil {
}, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update task %d: %v", result.TaskID, err)
}
log.Debugf("the task %d scheduled", result.TaskID)
@ -344,6 +353,20 @@ func (f *flow) markExecutionFailure(err error) {
}
}
func (f *flow) markExecutionSuccess(msg string) {
log.Debugf("the execution %d is marked as success", f.executionID)
err := f.executionMgr.Update(
&models.Execution{
ID: f.executionID,
Status: models.ExecutionStatusSucceed,
StatusText: msg,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
}
}
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
// if the resource has vtags
func getResourceName(res *model.Resource) string {

View File

@ -19,17 +19,15 @@ package ng
import (
"fmt"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
)
var (
@ -49,8 +47,8 @@ func Init() error {
PolicyMgr = policy.NewDefaultManager()
// init ExecutionMgr
executionMgr := execution.NewDefaultManager()
// TODO init scheduler
var scheduler scheduler.Scheduler
// init scheduler
scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret())
flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler)
if err != nil {

View File

@ -27,15 +27,16 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
type defaultScheduler struct {
client job.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client,
// TODO use the service account?
// NewScheduler returns an instance of Scheduler
func NewScheduler(jobserviceURL, secret string) Scheduler {
return &defaultScheduler{
client: job.NewDefaultClient(jobserviceURL, secret),
}
}
@ -66,7 +67,7 @@ type Scheduler interface {
}
// Preprocess the resources and returns the item list that can be scheduled
func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) {
func (d *defaultScheduler) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) {
if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources")
return nil, err
@ -85,7 +86,7 @@ func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResou
}
// Schedule transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) {
func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) {
var results []*ScheduleResult
for _, item := range items {
result := &ScheduleResult{
@ -103,7 +104,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult,
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID),
}
job.Name = common_job.ImageTransfer
job.Name = common_job.Replication
src, err := json.Marshal(item.SrcResource)
if err != nil {
result.Error = err
@ -133,8 +134,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult,
}
// Stop the transfer job
func (d *DefaultReplicator) Stop(id string) error {
func (d *defaultScheduler) Stop(id string) error {
err := d.client.PostAction(id, opm.CtlCommandStop)
if err != nil {
return err

View File

@ -8,10 +8,8 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
var replicator *DefaultReplicator
func init() {
replicator = NewDefaultReplicator(TestClient{})
var scheduler = &defaultScheduler{
client: TestClient{},
}
type TestClient struct {
@ -27,7 +25,7 @@ func (client TestClient) PostAction(uuid, action string) error {
return nil
}
func TestDefaultReplicator_Preprocess(t *testing.T) {
func TestPreprocess(t *testing.T) {
items, err := generateData()
if err != nil {
t.Error(err)
@ -42,8 +40,8 @@ func TestDefaultReplicator_Preprocess(t *testing.T) {
}
func TestDefaultReplicator_Stop(t *testing.T) {
err := replicator.Stop("id")
func TestStop(t *testing.T) {
err := scheduler.Stop("id")
if err != nil {
t.Error(err)
}
@ -70,6 +68,6 @@ func generateData() ([]*ScheduleItem, error) {
Credential: &model.Credential{},
},
}
items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource})
items, err := scheduler.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource})
return items, err
}