From 258b22a9a59c89469d1e52db00272aa552b836ae Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Sun, 10 Mar 2019 11:14:23 +0800 Subject: [PATCH] Fix bug in replication This commit fixes bugs found in the implement of replciation NG Signed-off-by: Wenkai Yin --- src/common/job/const.go | 4 +- src/core/api/replication_execution.go | 5 +- .../job/impl/replication/ng/replication.go | 2 + src/jobservice/runtime/bootstrap.go | 14 +-- src/replication/ng/adapter/harbor/adapter.go | 43 ++++--- src/replication/ng/flow/controller.go | 7 ++ src/replication/ng/flow/flow.go | 109 +++++++++++------- src/replication/ng/replication.go | 16 ++- src/replication/ng/scheduler/scheduler.go | 22 ++-- .../ng/scheduler/scheduler_test.go | 14 +-- 10 files changed, 136 insertions(+), 100 deletions(-) diff --git a/src/common/job/const.go b/src/common/job/const.go index 08876ec68..fa492a477 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -13,8 +13,8 @@ const ( ImageReplicate = "IMAGE_REPLICATE" // ImageGC the name of image garbage collection job in job service ImageGC = "IMAGE_GC" - // ImageReplication : the name of image replication job in job service - ImageReplication = "IMAGE_REPLICATION" + // Replication : the name of the replication job in job service + Replication = "REPLICATION" // JobKindGeneric : Kind of generic job JobKindGeneric = "Generic" diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index f250ba40e..45af40ef6 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -166,7 +166,10 @@ func (r *ReplicationOperationAPI) ListTasks() { query := &models.TaskQuery{ ExecutionID: executionID, ResourceType: r.GetString("resource_type"), - Statuses: []string{r.GetString("status")}, + } + status := r.GetString("status") + if len(status) > 0 { + query.Statuses = []string{status} } query.Page, query.Size = r.GetPaginationParams() total, tasks, err := ng.OperationCtl.ListTasks(query) diff --git a/src/jobservice/job/impl/replication/ng/replication.go b/src/jobservice/job/impl/replication/ng/replication.go index ec0ec5a08..da49eb829 100644 --- a/src/jobservice/job/impl/replication/ng/replication.go +++ b/src/jobservice/job/impl/replication/ng/replication.go @@ -26,6 +26,8 @@ import ( _ "github.com/goharbor/harbor/src/replication/ng/transfer/chart" // import repository transfer _ "github.com/goharbor/harbor/src/replication/ng/transfer/repository" + // register the Harbor adapter + _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" ) // Replication implements the job interface diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index c4b3ed4cb..79e7c0d04 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -206,13 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ - job.ImageScanJob: (*scan.ClairJob)(nil), - job.ImageScanAllJob: (*scan.All)(nil), - job.ImageTransfer: (*replication.Transfer)(nil), - job.ImageDelete: (*replication.Deleter)(nil), - job.ImageReplicate: (*replication.Replicator)(nil), - job.ImageGC: (*gc.GarbageCollector)(nil), - job.ImageReplication: (*ng.Replication)(nil), + job.ImageScanJob: (*scan.ClairJob)(nil), + job.ImageScanAllJob: (*scan.All)(nil), + job.ImageTransfer: (*replication.Transfer)(nil), + job.ImageDelete: (*replication.Deleter)(nil), + job.ImageReplicate: (*replication.Replicator)(nil), + job.ImageGC: (*gc.GarbageCollector)(nil), + job.Replication: (*ng.Replication)(nil), }); err != nil { // exit return nil, err diff --git a/src/replication/ng/adapter/harbor/adapter.go b/src/replication/ng/adapter/harbor/adapter.go index b394486c4..135d1b574 100644 --- a/src/replication/ng/adapter/harbor/adapter.go +++ b/src/replication/ng/adapter/harbor/adapter.go @@ -17,7 +17,7 @@ package harbor import ( "fmt" "net/http" - "strconv" + // "strconv" common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/http/modifier" @@ -96,27 +96,32 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error { Name string `json:"project_name"` Metadata map[string]interface{} `json:"metadata"` }{ - Name: namespace.Name, + Name: namespace.Name, + Metadata: namespace.Metadata, } - // handle the public of the project - if meta, exist := namespace.Metadata["public"]; exist { - public := true - // if one of them is "private", the set the public as false - for _, value := range meta.(map[string]interface{}) { - b, err := strconv.ParseBool(value.(string)) - if err != nil { - return err - } - if !b { - public = false - break - } + // TODO + /* + // handle the public of the project + if meta, exist := namespace.Metadata["public"]; exist { + public := true + // if one of them is "private", the set the public as false + for _, value := range meta.(map[string]interface{}) { + b, err := strconv.ParseBool(value.(string)) + if err != nil { + return err + } + if !b { + public = false + break + } + + } + project.Metadata = map[string]interface{}{ + "public": public, + } } - project.Metadata = map[string]interface{}{ - "public": public, - } - } + */ err := a.client.Post(a.coreServiceURL+"/api/projects", project) if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict { diff --git a/src/replication/ng/flow/controller.go b/src/replication/ng/flow/controller.go index dcb35c0b3..49240249b 100644 --- a/src/replication/ng/flow/controller.go +++ b/src/replication/ng/flow/controller.go @@ -77,6 +77,13 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error return id, nil } + // if no resources need to be replicated, mark the execution success and return + if len(flow.srcResources) == 0 { + flow.markExecutionSuccess("no resources need to be replicated") + log.Infof("no resources need to be replicated for the execution %d, skip", id) + return id, nil + } + // create the namespace on the destination registry if err = flow.createNamespace(); err != nil { log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err) diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index ec43667fb..d45bd852f 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -39,7 +39,8 @@ type flow struct { srcAdapter adapter.Adapter dstAdapter adapter.Adapter executionID int64 - resources []*model.Resource + srcResources []*model.Resource + dstResources []*model.Resource executionMgr execution.Manager scheduler scheduler.Scheduler scheduleItems []*scheduler.ScheduleItem @@ -151,7 +152,7 @@ func (f *flow) fetchResources() error { } // TODO consider whether the logic can be refactored by using reflect - resources := []*model.Resource{} + srcResources := []*model.Resource{} for _, typ := range resTypes { if typ == model.ResourceTypeRepository { reg, ok := f.srcAdapter.(adapter.ImageRegistry) @@ -165,12 +166,38 @@ func (f *flow) fetchResources() error { f.markExecutionFailure(err) return err } - resources = append(resources, res...) + srcResources = append(srcResources, res...) continue } // TODO add support for chart } - f.resources = resources + + dstResources := []*model.Resource{} + for _, srcResource := range srcResources { + dstResource := &model.Resource{ + Type: srcResource.Type, + Metadata: &model.ResourceMetadata{ + Name: srcResource.Metadata.Name, + Namespace: srcResource.Metadata.Namespace, + Vtags: srcResource.Metadata.Vtags, + }, + Registry: f.dstRegistry, + ExtendedInfo: srcResource.ExtendedInfo, + Deleted: srcResource.Deleted, + Override: f.policy.Override, + } + // TODO check whether the logic is applied to chart + // if the destination namespace is specified, use the specified one + if len(f.policy.DestNamespace) > 0 { + dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name, + srcResource.Metadata.Namespace, f.policy.DestNamespace, 1) + dstResource.Metadata.Namespace = f.policy.DestNamespace + } + dstResources = append(dstResources, dstResource) + } + + f.srcResources = srcResources + f.dstResources = dstResources log.Debugf("resources for the execution %d fetched from the source registry", f.executionID) return nil @@ -201,55 +228,36 @@ func (f *flow) createNamespace() error { // }, // }, // } - metadata := map[string]interface{}{} - for _, srcNamespace := range f.policy.SrcNamespaces { - namespace, err := f.srcAdapter.GetNamespace(srcNamespace) + // TODO merge the metadata of different namespaces + namespaces := []*model.Namespace{} + for i, resource := range f.dstResources { + namespace := &model.Namespace{ + Name: resource.Metadata.Namespace, + } + // get the metadata of the namespace from the source registry + ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace) if err != nil { f.markExecutionFailure(err) return err } - for key, value := range namespace.Metadata { - var m map[string]interface{} - if metadata[key] == nil { - m = map[string]interface{}{} - } else { - m = metadata[key].(map[string]interface{}) - } - m[namespace.Name] = value + namespace.Metadata = ns.Metadata + namespaces = append(namespaces, namespace) + } + + for _, namespace := range namespaces { + if err := f.dstAdapter.CreateNamespace(namespace); err != nil { + f.markExecutionFailure(err) + return err } + + log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID) } - if err := f.dstAdapter.CreateNamespace(&model.Namespace{ - Name: f.policy.DestNamespace, - Metadata: metadata, - }); err != nil { - f.markExecutionFailure(err) - return err - } - - log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID) return nil } func (f *flow) preprocess() error { - dstResources := []*model.Resource{} - for _, srcResource := range f.resources { - dstResource := &model.Resource{ - Type: srcResource.Type, - Metadata: &model.ResourceMetadata{ - Name: srcResource.Metadata.Name, - Namespace: f.policy.DestNamespace, - Vtags: srcResource.Metadata.Vtags, - }, - Registry: f.dstRegistry, - ExtendedInfo: srcResource.ExtendedInfo, - Deleted: srcResource.Deleted, - Override: f.policy.Override, - } - dstResources = append(dstResources, dstResource) - } - - items, err := f.scheduler.Preprocess(f.resources, dstResources) + items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources) if err != nil { f.markExecutionFailure(err) return err @@ -277,6 +285,7 @@ func (f *flow) createTasks() error { f.markExecutionFailure(err) return err } + item.TaskID = id log.Debugf("task record %d for the execution %d created", id, f.executionID) @@ -311,7 +320,7 @@ func (f *flow) schedule() error { ID: result.TaskID, JobID: result.JobID, StartTime: time.Now(), - }); err != nil { + }, "JobID", "StartTime"); err != nil { log.Errorf("failed to update task %d: %v", result.TaskID, err) } log.Debugf("the task %d scheduled", result.TaskID) @@ -344,6 +353,20 @@ func (f *flow) markExecutionFailure(err error) { } } +func (f *flow) markExecutionSuccess(msg string) { + log.Debugf("the execution %d is marked as success", f.executionID) + err := f.executionMgr.Update( + &models.Execution{ + ID: f.executionID, + Status: models.ExecutionStatusSucceed, + StatusText: msg, + EndTime: time.Now(), + }, "Status", "StatusText", "EndTime") + if err != nil { + log.Errorf("failed to update the execution %d: %v", f.executionID, err) + } +} + // return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" // if the resource has vtags func getResourceName(res *model.Resource) string { diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index e5e79ed31..8f12a7577 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -19,17 +19,15 @@ package ng import ( "fmt" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/execution" - - "github.com/goharbor/harbor/src/replication/ng/policy" - - "github.com/goharbor/harbor/src/replication/ng/scheduler" - - // register the Harbor adapter - _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/operation" + "github.com/goharbor/harbor/src/replication/ng/policy" "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" + // register the Harbor adapter + _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" ) var ( @@ -49,8 +47,8 @@ func Init() error { PolicyMgr = policy.NewDefaultManager() // init ExecutionMgr executionMgr := execution.NewDefaultManager() - // TODO init scheduler - var scheduler scheduler.Scheduler + // init scheduler + scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret()) flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler) if err != nil { diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index 17a2013af..47a93d83f 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -27,15 +27,16 @@ import ( "github.com/goharbor/harbor/src/replication/ng/model" ) -// DefaultReplicator provides a default implement for Replicator -type DefaultReplicator struct { +type defaultScheduler struct { client job.Client } -// NewDefaultReplicator returns an instance of DefaultReplicator -func NewDefaultReplicator(client job.Client) *DefaultReplicator { - return &DefaultReplicator{ - client: client, +// TODO use the service account? + +// NewScheduler returns an instance of Scheduler +func NewScheduler(jobserviceURL, secret string) Scheduler { + return &defaultScheduler{ + client: job.NewDefaultClient(jobserviceURL, secret), } } @@ -66,7 +67,7 @@ type Scheduler interface { } // Preprocess the resources and returns the item list that can be scheduled -func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { +func (d *defaultScheduler) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { if len(srcResources) != len(destResources) { err := errors.New("srcResources has different length with destResources") return nil, err @@ -85,7 +86,7 @@ func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResou } // Schedule transfer the tasks to jobs,and then submit these jobs to job service. -func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { +func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { var results []*ScheduleResult for _, item := range items { result := &ScheduleResult{ @@ -103,7 +104,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID), } - job.Name = common_job.ImageTransfer + job.Name = common_job.Replication src, err := json.Marshal(item.SrcResource) if err != nil { result.Error = err @@ -133,8 +134,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, } // Stop the transfer job -func (d *DefaultReplicator) Stop(id string) error { - +func (d *defaultScheduler) Stop(id string) error { err := d.client.PostAction(id, opm.CtlCommandStop) if err != nil { return err diff --git a/src/replication/ng/scheduler/scheduler_test.go b/src/replication/ng/scheduler/scheduler_test.go index 1d620757f..1ff75b6e8 100644 --- a/src/replication/ng/scheduler/scheduler_test.go +++ b/src/replication/ng/scheduler/scheduler_test.go @@ -8,10 +8,8 @@ import ( "github.com/goharbor/harbor/src/replication/ng/model" ) -var replicator *DefaultReplicator - -func init() { - replicator = NewDefaultReplicator(TestClient{}) +var scheduler = &defaultScheduler{ + client: TestClient{}, } type TestClient struct { @@ -27,7 +25,7 @@ func (client TestClient) PostAction(uuid, action string) error { return nil } -func TestDefaultReplicator_Preprocess(t *testing.T) { +func TestPreprocess(t *testing.T) { items, err := generateData() if err != nil { t.Error(err) @@ -42,8 +40,8 @@ func TestDefaultReplicator_Preprocess(t *testing.T) { } -func TestDefaultReplicator_Stop(t *testing.T) { - err := replicator.Stop("id") +func TestStop(t *testing.T) { + err := scheduler.Stop("id") if err != nil { t.Error(err) } @@ -70,6 +68,6 @@ func generateData() ([]*ScheduleItem, error) { Credential: &model.Credential{}, }, } - items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) + items, err := scheduler.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) return items, err }