Merge branch 'replication_ng' into replication_ng_execution_upgrade

This commit is contained in:
Wenkai Yin 2019-03-18 18:12:22 +08:00 committed by GitHub
commit 79217e2485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 136 additions and 100 deletions

View File

@ -13,8 +13,8 @@ const (
ImageReplicate = "IMAGE_REPLICATE" ImageReplicate = "IMAGE_REPLICATE"
// ImageGC the name of image garbage collection job in job service // ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC" ImageGC = "IMAGE_GC"
// ImageReplication : the name of image replication job in job service // Replication : the name of the replication job in job service
ImageReplication = "IMAGE_REPLICATION" Replication = "REPLICATION"
// JobKindGeneric : Kind of generic job // JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic" JobKindGeneric = "Generic"

View File

@ -166,7 +166,10 @@ func (r *ReplicationOperationAPI) ListTasks() {
query := &models.TaskQuery{ query := &models.TaskQuery{
ExecutionID: executionID, ExecutionID: executionID,
ResourceType: r.GetString("resource_type"), ResourceType: r.GetString("resource_type"),
Statuses: []string{r.GetString("status")}, }
status := r.GetString("status")
if len(status) > 0 {
query.Statuses = []string{status}
} }
query.Page, query.Size = r.GetPaginationParams() query.Page, query.Size = r.GetPaginationParams()
total, tasks, err := ng.OperationCtl.ListTasks(query) total, tasks, err := ng.OperationCtl.ListTasks(query)

View File

@ -26,6 +26,8 @@ import (
_ "github.com/goharbor/harbor/src/replication/ng/transfer/chart" _ "github.com/goharbor/harbor/src/replication/ng/transfer/chart"
// import repository transfer // import repository transfer
_ "github.com/goharbor/harbor/src/replication/ng/transfer/repository" _ "github.com/goharbor/harbor/src/replication/ng/transfer/repository"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
) )
// Replication implements the job interface // Replication implements the job interface

View File

@ -206,13 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
} }
if err := redisWorkerPool.RegisterJobs( if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{ map[string]interface{}{
job.ImageScanJob: (*scan.ClairJob)(nil), job.ImageScanJob: (*scan.ClairJob)(nil),
job.ImageScanAllJob: (*scan.All)(nil), job.ImageScanAllJob: (*scan.All)(nil),
job.ImageTransfer: (*replication.Transfer)(nil), job.ImageTransfer: (*replication.Transfer)(nil),
job.ImageDelete: (*replication.Deleter)(nil), job.ImageDelete: (*replication.Deleter)(nil),
job.ImageReplicate: (*replication.Replicator)(nil), job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil), job.ImageGC: (*gc.GarbageCollector)(nil),
job.ImageReplication: (*ng.Replication)(nil), job.Replication: (*ng.Replication)(nil),
}); err != nil { }); err != nil {
// exit // exit
return nil, err return nil, err

View File

@ -17,7 +17,7 @@ package harbor
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strconv" // "strconv"
common_http "github.com/goharbor/harbor/src/common/http" common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier" "github.com/goharbor/harbor/src/common/http/modifier"
@ -96,27 +96,32 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
Name string `json:"project_name"` Name string `json:"project_name"`
Metadata map[string]interface{} `json:"metadata"` Metadata map[string]interface{} `json:"metadata"`
}{ }{
Name: namespace.Name, Name: namespace.Name,
Metadata: namespace.Metadata,
} }
// handle the public of the project
if meta, exist := namespace.Metadata["public"]; exist {
public := true
// if one of them is "private", the set the public as false
for _, value := range meta.(map[string]interface{}) {
b, err := strconv.ParseBool(value.(string))
if err != nil {
return err
}
if !b {
public = false
break
}
// TODO
/*
// handle the public of the project
if meta, exist := namespace.Metadata["public"]; exist {
public := true
// if one of them is "private", the set the public as false
for _, value := range meta.(map[string]interface{}) {
b, err := strconv.ParseBool(value.(string))
if err != nil {
return err
}
if !b {
public = false
break
}
}
project.Metadata = map[string]interface{}{
"public": public,
}
} }
project.Metadata = map[string]interface{}{ */
"public": public,
}
}
err := a.client.Post(a.coreServiceURL+"/api/projects", project) err := a.client.Post(a.coreServiceURL+"/api/projects", project)
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict { if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict {

View File

@ -77,6 +77,13 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error
return id, nil return id, nil
} }
// if no resources need to be replicated, mark the execution success and return
if len(flow.srcResources) == 0 {
flow.markExecutionSuccess("no resources need to be replicated")
log.Infof("no resources need to be replicated for the execution %d, skip", id)
return id, nil
}
// create the namespace on the destination registry // create the namespace on the destination registry
if err = flow.createNamespace(); err != nil { if err = flow.createNamespace(); err != nil {
log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err) log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err)

View File

@ -39,7 +39,8 @@ type flow struct {
srcAdapter adapter.Adapter srcAdapter adapter.Adapter
dstAdapter adapter.Adapter dstAdapter adapter.Adapter
executionID int64 executionID int64
resources []*model.Resource srcResources []*model.Resource
dstResources []*model.Resource
executionMgr execution.Manager executionMgr execution.Manager
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
scheduleItems []*scheduler.ScheduleItem scheduleItems []*scheduler.ScheduleItem
@ -151,7 +152,7 @@ func (f *flow) fetchResources() error {
} }
// TODO consider whether the logic can be refactored by using reflect // TODO consider whether the logic can be refactored by using reflect
resources := []*model.Resource{} srcResources := []*model.Resource{}
for _, typ := range resTypes { for _, typ := range resTypes {
if typ == model.ResourceTypeRepository { if typ == model.ResourceTypeRepository {
reg, ok := f.srcAdapter.(adapter.ImageRegistry) reg, ok := f.srcAdapter.(adapter.ImageRegistry)
@ -165,12 +166,38 @@ func (f *flow) fetchResources() error {
f.markExecutionFailure(err) f.markExecutionFailure(err)
return err return err
} }
resources = append(resources, res...) srcResources = append(srcResources, res...)
continue continue
} }
// TODO add support for chart // TODO add support for chart
} }
f.resources = resources
dstResources := []*model.Resource{}
for _, srcResource := range srcResources {
dstResource := &model.Resource{
Type: srcResource.Type,
Metadata: &model.ResourceMetadata{
Name: srcResource.Metadata.Name,
Namespace: srcResource.Metadata.Namespace,
Vtags: srcResource.Metadata.Vtags,
},
Registry: f.dstRegistry,
ExtendedInfo: srcResource.ExtendedInfo,
Deleted: srcResource.Deleted,
Override: f.policy.Override,
}
// TODO check whether the logic is applied to chart
// if the destination namespace is specified, use the specified one
if len(f.policy.DestNamespace) > 0 {
dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name,
srcResource.Metadata.Namespace, f.policy.DestNamespace, 1)
dstResource.Metadata.Namespace = f.policy.DestNamespace
}
dstResources = append(dstResources, dstResource)
}
f.srcResources = srcResources
f.dstResources = dstResources
log.Debugf("resources for the execution %d fetched from the source registry", f.executionID) log.Debugf("resources for the execution %d fetched from the source registry", f.executionID)
return nil return nil
@ -201,55 +228,36 @@ func (f *flow) createNamespace() error {
// }, // },
// }, // },
// } // }
metadata := map[string]interface{}{} // TODO merge the metadata of different namespaces
for _, srcNamespace := range f.policy.SrcNamespaces { namespaces := []*model.Namespace{}
namespace, err := f.srcAdapter.GetNamespace(srcNamespace) for i, resource := range f.dstResources {
namespace := &model.Namespace{
Name: resource.Metadata.Namespace,
}
// get the metadata of the namespace from the source registry
ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace)
if err != nil { if err != nil {
f.markExecutionFailure(err) f.markExecutionFailure(err)
return err return err
} }
for key, value := range namespace.Metadata { namespace.Metadata = ns.Metadata
var m map[string]interface{} namespaces = append(namespaces, namespace)
if metadata[key] == nil { }
m = map[string]interface{}{}
} else { for _, namespace := range namespaces {
m = metadata[key].(map[string]interface{}) if err := f.dstAdapter.CreateNamespace(namespace); err != nil {
} f.markExecutionFailure(err)
m[namespace.Name] = value return err
} }
log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID)
} }
if err := f.dstAdapter.CreateNamespace(&model.Namespace{
Name: f.policy.DestNamespace,
Metadata: metadata,
}); err != nil {
f.markExecutionFailure(err)
return err
}
log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID)
return nil return nil
} }
func (f *flow) preprocess() error { func (f *flow) preprocess() error {
dstResources := []*model.Resource{} items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources)
for _, srcResource := range f.resources {
dstResource := &model.Resource{
Type: srcResource.Type,
Metadata: &model.ResourceMetadata{
Name: srcResource.Metadata.Name,
Namespace: f.policy.DestNamespace,
Vtags: srcResource.Metadata.Vtags,
},
Registry: f.dstRegistry,
ExtendedInfo: srcResource.ExtendedInfo,
Deleted: srcResource.Deleted,
Override: f.policy.Override,
}
dstResources = append(dstResources, dstResource)
}
items, err := f.scheduler.Preprocess(f.resources, dstResources)
if err != nil { if err != nil {
f.markExecutionFailure(err) f.markExecutionFailure(err)
return err return err
@ -277,6 +285,7 @@ func (f *flow) createTasks() error {
f.markExecutionFailure(err) f.markExecutionFailure(err)
return err return err
} }
item.TaskID = id item.TaskID = id
log.Debugf("task record %d for the execution %d created", log.Debugf("task record %d for the execution %d created",
id, f.executionID) id, f.executionID)
@ -311,7 +320,7 @@ func (f *flow) schedule() error {
ID: result.TaskID, ID: result.TaskID,
JobID: result.JobID, JobID: result.JobID,
StartTime: time.Now(), StartTime: time.Now(),
}); err != nil { }, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update task %d: %v", result.TaskID, err) log.Errorf("failed to update task %d: %v", result.TaskID, err)
} }
log.Debugf("the task %d scheduled", result.TaskID) log.Debugf("the task %d scheduled", result.TaskID)
@ -344,6 +353,20 @@ func (f *flow) markExecutionFailure(err error) {
} }
} }
func (f *flow) markExecutionSuccess(msg string) {
log.Debugf("the execution %d is marked as success", f.executionID)
err := f.executionMgr.Update(
&models.Execution{
ID: f.executionID,
Status: models.ExecutionStatusSucceed,
StatusText: msg,
EndTime: time.Now(),
}, "Status", "StatusText", "EndTime")
if err != nil {
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
}
}
// return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" // return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]"
// if the resource has vtags // if the resource has vtags
func getResourceName(res *model.Resource) string { func getResourceName(res *model.Resource) string {

View File

@ -19,17 +19,15 @@ package ng
import ( import (
"fmt" "fmt"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/execution" "github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
"github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation" "github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/registry" "github.com/goharbor/harbor/src/replication/ng/registry"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
) )
var ( var (
@ -49,8 +47,8 @@ func Init() error {
PolicyMgr = policy.NewDefaultManager() PolicyMgr = policy.NewDefaultManager()
// init ExecutionMgr // init ExecutionMgr
executionMgr := execution.NewDefaultManager() executionMgr := execution.NewDefaultManager()
// TODO init scheduler // init scheduler
var scheduler scheduler.Scheduler scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret())
flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler) flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler)
if err != nil { if err != nil {

View File

@ -27,15 +27,16 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
) )
// DefaultReplicator provides a default implement for Replicator type defaultScheduler struct {
type DefaultReplicator struct {
client job.Client client job.Client
} }
// NewDefaultReplicator returns an instance of DefaultReplicator // TODO use the service account?
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{ // NewScheduler returns an instance of Scheduler
client: client, func NewScheduler(jobserviceURL, secret string) Scheduler {
return &defaultScheduler{
client: job.NewDefaultClient(jobserviceURL, secret),
} }
} }
@ -66,7 +67,7 @@ type Scheduler interface {
} }
// Preprocess the resources and returns the item list that can be scheduled // Preprocess the resources and returns the item list that can be scheduled
func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { func (d *defaultScheduler) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) {
if len(srcResources) != len(destResources) { if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources") err := errors.New("srcResources has different length with destResources")
return nil, err return nil, err
@ -85,7 +86,7 @@ func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResou
} }
// Schedule transfer the tasks to jobs,and then submit these jobs to job service. // Schedule transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) {
var results []*ScheduleResult var results []*ScheduleResult
for _, item := range items { for _, item := range items {
result := &ScheduleResult{ result := &ScheduleResult{
@ -103,7 +104,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult,
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID), StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID),
} }
job.Name = common_job.ImageTransfer job.Name = common_job.Replication
src, err := json.Marshal(item.SrcResource) src, err := json.Marshal(item.SrcResource)
if err != nil { if err != nil {
result.Error = err result.Error = err
@ -133,8 +134,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult,
} }
// Stop the transfer job // Stop the transfer job
func (d *DefaultReplicator) Stop(id string) error { func (d *defaultScheduler) Stop(id string) error {
err := d.client.PostAction(id, opm.CtlCommandStop) err := d.client.PostAction(id, opm.CtlCommandStop)
if err != nil { if err != nil {
return err return err

View File

@ -8,10 +8,8 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
) )
var replicator *DefaultReplicator var scheduler = &defaultScheduler{
client: TestClient{},
func init() {
replicator = NewDefaultReplicator(TestClient{})
} }
type TestClient struct { type TestClient struct {
@ -27,7 +25,7 @@ func (client TestClient) PostAction(uuid, action string) error {
return nil return nil
} }
func TestDefaultReplicator_Preprocess(t *testing.T) { func TestPreprocess(t *testing.T) {
items, err := generateData() items, err := generateData()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -42,8 +40,8 @@ func TestDefaultReplicator_Preprocess(t *testing.T) {
} }
func TestDefaultReplicator_Stop(t *testing.T) { func TestStop(t *testing.T) {
err := replicator.Stop("id") err := scheduler.Stop("id")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -70,6 +68,6 @@ func generateData() ([]*ScheduleItem, error) {
Credential: &model.Credential{}, Credential: &model.Credential{},
}, },
} }
items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) items, err := scheduler.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource})
return items, err return items, err
} }