From 956a4c4f05f7c09ae254798667e2701fe62d8eea Mon Sep 17 00:00:00 2001 From: Yuan Lei <371304458@qq.com> Date: Wed, 30 Jan 2019 14:49:05 +0800 Subject: [PATCH 1/3] scheduler module to schedule the transfer job of resouce Signed-off-by: Yuan Lei <371304458@qq.com> --- src/replication/ng/scheduler/schedule.go | 92 +++++++++++++++++++ src/replication/ng/scheduler/schedule_test.go | 78 ++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 src/replication/ng/scheduler/schedule.go create mode 100644 src/replication/ng/scheduler/schedule_test.go diff --git a/src/replication/ng/scheduler/schedule.go b/src/replication/ng/scheduler/schedule.go new file mode 100644 index 000000000..7b070a556 --- /dev/null +++ b/src/replication/ng/scheduler/schedule.go @@ -0,0 +1,92 @@ +package schedule + +import ( + "encoding/json" + "time" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/jobservice/opm" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// DefaultReplicator provides a default implement for Replicator +type DefaultReplicator struct { + client job.Client +} + +// NewDefaultReplicator returns an instance of DefaultReplicator +func NewDefaultReplicator(client job.Client) *DefaultReplicator { + return &DefaultReplicator{ + client: client, + } +} + +// Scheduler to schedule the tasks to transfer resource data +type Scheduler interface { + // Schedule tasks + Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) + StopTransfer(jobID string) error +} + +// Schedule the task to transfer resouce data +func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) { + var tasks []*model.Task + for _, destResource := range destResources { + + for _, srcResource := range srcResources { + task := &model.Task{} + task.ResourceType = srcResource.Type + task.StartTime = time.Now().UTC() + src, err := json.Marshal(srcResource) + if err != nil { + log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.SrcResource = string(src) + dest, err := json.Marshal(destResource) + if err != nil { + log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.DstResource = string(dest) + + newjob := &models.JobData{ + Metadata: &models.JobMetadata{ + JobKind: job.JobKindGeneric, + }, + } + + newjob.Name = job.ImageTransfer + newjob.Parameters = map[string]interface{}{ + "src_resource": srcResource, + "dst_resource": destResource, + } + uuid, err := d.client.SubmitJob(newjob) + if err != nil { + log.Errorf("failed to submit the job from %v to %v.err:%s!", srcResource, destResource, err.Error()) + } + task.JobID = uuid + task.Status = "" + tasks = append(tasks, task) + + } + } + return tasks, nil +} + +// StopTransfer to stop the transfer job +func (d *DefaultReplicator) StopTransfer(jobID string) error { + + err := d.client.PostAction(jobID, opm.CtlCommandStop) + if err != nil { + return err + } + return nil + +} diff --git a/src/replication/ng/scheduler/schedule_test.go b/src/replication/ng/scheduler/schedule_test.go new file mode 100644 index 000000000..c05578c11 --- /dev/null +++ b/src/replication/ng/scheduler/schedule_test.go @@ -0,0 +1,78 @@ +package schedule + +import ( + "testing" + + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +var replicator *DefaultReplicator + +func init() { + replicator = NewDefaultReplicator(TestClient{}) +} + +type TestClient struct { +} + +func (client TestClient) SubmitJob(*models.JobData) (string, error) { + return "submited-uuid", nil +} +func (client TestClient) GetJobLog(uuid string) ([]byte, error) { + return []byte("job log"), nil +} +func (client TestClient) PostAction(uuid, action string) error { + return nil +} + +func TestDefaultReplicator_Schedule(t *testing.T) { + + srcResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "1", + "dst_registry": "1", + "namespace": "1", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + destResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "1", + "dst_registry": "1", + "namespace": "1", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) + if err != nil { + t.Error(err) + } + for _, task := range tasks { + t.Log(*task) + } + +} + +func TestDefaultReplicator_StopTransfer(t *testing.T) { + err := replicator.StopTransfer("job_id") + if err != nil { + t.Error(err) + } +} From bd84f5f490d09c52373cddfad5bcffdd3297aa94 Mon Sep 17 00:00:00 2001 From: Yuan Lei <371304458@qq.com> Date: Wed, 13 Feb 2019 17:47:07 +0800 Subject: [PATCH 2/3] sperate the logics to two part: 1,tasks generating. 2,tasks submit Signed-off-by: Yuan Lei <371304458@qq.com> --- src/replication/ng/scheduler/schedule.go | 117 +++++++----- src/replication/ng/scheduler/schedule_test.go | 178 ++++++++++-------- 2 files changed, 171 insertions(+), 124 deletions(-) diff --git a/src/replication/ng/scheduler/schedule.go b/src/replication/ng/scheduler/schedule.go index 7b070a556..1afe7b64a 100644 --- a/src/replication/ng/scheduler/schedule.go +++ b/src/replication/ng/scheduler/schedule.go @@ -2,11 +2,15 @@ package schedule import ( "encoding/json" + "errors" + "fmt" "time" "github.com/goharbor/harbor/src/common/job" + common_job "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -27,63 +31,84 @@ func NewDefaultReplicator(client job.Client) *DefaultReplicator { type Scheduler interface { // Schedule tasks Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) - StopTransfer(jobID string) error + StopExecution(executionID string) error } -// Schedule the task to transfer resouce data +// Schedule the tasks base on resources func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) { + if len(srcResources) != len(destResources) { + err := errors.New("srcResources has different length with destResources") + log.Errorf(err.Error()) + return nil, err + } var tasks []*model.Task - for _, destResource := range destResources { - - for _, srcResource := range srcResources { - task := &model.Task{} - task.ResourceType = srcResource.Type - task.StartTime = time.Now().UTC() - src, err := json.Marshal(srcResource) - if err != nil { - log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) - task.Status = "Error" - tasks = append(tasks, task) - continue - } - task.SrcResource = string(src) - dest, err := json.Marshal(destResource) - if err != nil { - log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error()) - task.Status = "Error" - tasks = append(tasks, task) - continue - } - task.DstResource = string(dest) - - newjob := &models.JobData{ - Metadata: &models.JobMetadata{ - JobKind: job.JobKindGeneric, - }, - } - - newjob.Name = job.ImageTransfer - newjob.Parameters = map[string]interface{}{ - "src_resource": srcResource, - "dst_resource": destResource, - } - uuid, err := d.client.SubmitJob(newjob) - if err != nil { - log.Errorf("failed to submit the job from %v to %v.err:%s!", srcResource, destResource, err.Error()) - } - task.JobID = uuid - task.Status = "" + for index, srcResource := range srcResources { + destResource := destResources[index] + task := &model.Task{} + task.ResourceType = srcResource.Type + task.StartTime = time.Now().UTC() + src, err := json.Marshal(srcResource) + if err != nil { + log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) + task.Status = "Error" tasks = append(tasks, task) - + continue } + task.SrcResource = string(src) + dest, err := json.Marshal(destResource) + if err != nil { + log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.DstResource = string(dest) + task.Status = "Initial" + tasks = append(tasks, task) + } return tasks, nil } -// StopTransfer to stop the transfer job -func (d *DefaultReplicator) StopTransfer(jobID string) error { +// SubmitTasks transfer the tasks to jobs,and then submit these jobs to job service. +func (d *DefaultReplicator) SubmitTasks(tasks []*model.Task) ([]*model.Task, error) { + for _, task := range tasks { + if task.ID == 0 { + err := errors.New("task do not have ID") + log.Errorf(err.Error()) + return nil, err + } + job := &models.JobData{ + Metadata: &models.JobMetadata{ + JobKind: job.JobKindGeneric, + }, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), task.ID), + } - err := d.client.PostAction(jobID, opm.CtlCommandStop) + job.Name = common_job.ImageTransfer + job.Parameters = map[string]interface{}{ + "src_resource": task.SrcResource, + "dst_resource": task.DstResource, + } + uuid, err := d.client.SubmitJob(job) + if err != nil { + log.Errorf("failed to submit the task:%v .err:%s!", task, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.JobID = uuid + task.Status = "Pending" + tasks = append(tasks, task) + } + return tasks, nil + +} + +// StopExecution to stop the transfer job +func (d *DefaultReplicator) StopExecution(executionID string) error { + + err := d.client.PostAction(executionID, opm.CtlCommandStop) if err != nil { return err } diff --git a/src/replication/ng/scheduler/schedule_test.go b/src/replication/ng/scheduler/schedule_test.go index c05578c11..de6429c29 100644 --- a/src/replication/ng/scheduler/schedule_test.go +++ b/src/replication/ng/scheduler/schedule_test.go @@ -1,78 +1,100 @@ -package schedule - -import ( - "testing" - - "github.com/goharbor/harbor/src/common/job/models" - "github.com/goharbor/harbor/src/replication/ng/model" -) - -var replicator *DefaultReplicator - -func init() { - replicator = NewDefaultReplicator(TestClient{}) -} - -type TestClient struct { -} - -func (client TestClient) SubmitJob(*models.JobData) (string, error) { - return "submited-uuid", nil -} -func (client TestClient) GetJobLog(uuid string) ([]byte, error) { - return []byte("job log"), nil -} -func (client TestClient) PostAction(uuid, action string) error { - return nil -} - -func TestDefaultReplicator_Schedule(t *testing.T) { - - srcResource := &model.Resource{ - Metadata: &model.ResourceMetadata{ - Namespace: &model.Namespace{ - Metadata: map[string]interface{}{ - "resource": "1", - "dst_registry": "1", - "namespace": "1", - }, - }, - Vtags: []string{"latest"}, - Labels: []string{"latest"}, - }, - Registry: &model.Registry{ - Credential: &model.Credential{}, - }, - } - destResource := &model.Resource{ - Metadata: &model.ResourceMetadata{ - Namespace: &model.Namespace{ - Metadata: map[string]interface{}{ - "resource": "1", - "dst_registry": "1", - "namespace": "1", - }, - }, - Vtags: []string{"latest"}, - Labels: []string{"latest"}, - }, - Registry: &model.Registry{ - Credential: &model.Credential{}, - }, - } - tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) - if err != nil { - t.Error(err) - } - for _, task := range tasks { - t.Log(*task) - } - -} - -func TestDefaultReplicator_StopTransfer(t *testing.T) { - err := replicator.StopTransfer("job_id") - if err != nil { - t.Error(err) - } -} +package schedule + +import ( + "testing" + + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +var replicator *DefaultReplicator + +func init() { + replicator = NewDefaultReplicator(TestClient{}) +} + +type TestClient struct { +} + +func (client TestClient) SubmitJob(*models.JobData) (string, error) { + return "submited-uuid", nil +} +func (client TestClient) GetJobLog(uuid string) ([]byte, error) { + return []byte("job log"), nil +} +func (client TestClient) PostAction(uuid, action string) error { + return nil +} + +func TestDefaultReplicator_Schedule(t *testing.T) { + tasks, err := generateData() + if err != nil { + t.Error(err) + } + for _, task := range tasks { + t.Log(*task) + } + +} + +//func TestDefaultReplicator_SubmitJobs(t *testing.T) { +// config.Init() +// tasks, err := generateData() +// if err != nil { +// t.Error(err) +// } +// for _, task := range tasks { +// task.ID = 22 +// } +// newTasks, newErr := replicator.SubmitTasks(tasks) +// if newErr != nil { +// t.Error(newErr) +// } +// for _, task := range newTasks { +// t.Log(*task) +// } +//} + +func TestDefaultReplicator_StopExecution(t *testing.T) { + err := replicator.StopExecution("id") + if err != nil { + t.Error(err) + } +} + +func generateData() ([]*model.Task, error) { + srcResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "1", + "dst_registry": "1", + "namespace": "1", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + destResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "2", + "dst_registry": "2", + "namespace": "2", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) + return tasks, err +} From 7108b5c58c18575cc5ecfe5b6097170fcfaf1a00 Mon Sep 17 00:00:00 2001 From: Yuan Lei <371304458@qq.com> Date: Fri, 15 Feb 2019 17:55:26 +0800 Subject: [PATCH 3/3] update scheduler interface to follow the new standard Signed-off-by: Yuan Lei <371304458@qq.com> --- src/replication/ng/scheduler/schedule.go | 117 --------------- src/replication/ng/scheduler/scheduler.go | 134 ++++++++++++++++++ .../{schedule_test.go => scheduler_test.go} | 47 +++--- 3 files changed, 162 insertions(+), 136 deletions(-) delete mode 100644 src/replication/ng/scheduler/schedule.go create mode 100644 src/replication/ng/scheduler/scheduler.go rename src/replication/ng/scheduler/{schedule_test.go => scheduler_test.go} (63%) diff --git a/src/replication/ng/scheduler/schedule.go b/src/replication/ng/scheduler/schedule.go deleted file mode 100644 index 1afe7b64a..000000000 --- a/src/replication/ng/scheduler/schedule.go +++ /dev/null @@ -1,117 +0,0 @@ -package schedule - -import ( - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/goharbor/harbor/src/common/job" - common_job "github.com/goharbor/harbor/src/common/job" - "github.com/goharbor/harbor/src/common/job/models" - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/jobservice/opm" - "github.com/goharbor/harbor/src/replication/ng/model" -) - -// DefaultReplicator provides a default implement for Replicator -type DefaultReplicator struct { - client job.Client -} - -// NewDefaultReplicator returns an instance of DefaultReplicator -func NewDefaultReplicator(client job.Client) *DefaultReplicator { - return &DefaultReplicator{ - client: client, - } -} - -// Scheduler to schedule the tasks to transfer resource data -type Scheduler interface { - // Schedule tasks - Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) - StopExecution(executionID string) error -} - -// Schedule the tasks base on resources -func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) { - if len(srcResources) != len(destResources) { - err := errors.New("srcResources has different length with destResources") - log.Errorf(err.Error()) - return nil, err - } - var tasks []*model.Task - for index, srcResource := range srcResources { - destResource := destResources[index] - task := &model.Task{} - task.ResourceType = srcResource.Type - task.StartTime = time.Now().UTC() - src, err := json.Marshal(srcResource) - if err != nil { - log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) - task.Status = "Error" - tasks = append(tasks, task) - continue - } - task.SrcResource = string(src) - dest, err := json.Marshal(destResource) - if err != nil { - log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error()) - task.Status = "Error" - tasks = append(tasks, task) - continue - } - task.DstResource = string(dest) - task.Status = "Initial" - tasks = append(tasks, task) - - } - return tasks, nil -} - -// SubmitTasks transfer the tasks to jobs,and then submit these jobs to job service. -func (d *DefaultReplicator) SubmitTasks(tasks []*model.Task) ([]*model.Task, error) { - for _, task := range tasks { - if task.ID == 0 { - err := errors.New("task do not have ID") - log.Errorf(err.Error()) - return nil, err - } - job := &models.JobData{ - Metadata: &models.JobMetadata{ - JobKind: job.JobKindGeneric, - }, - StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), task.ID), - } - - job.Name = common_job.ImageTransfer - job.Parameters = map[string]interface{}{ - "src_resource": task.SrcResource, - "dst_resource": task.DstResource, - } - uuid, err := d.client.SubmitJob(job) - if err != nil { - log.Errorf("failed to submit the task:%v .err:%s!", task, err.Error()) - task.Status = "Error" - tasks = append(tasks, task) - continue - } - task.JobID = uuid - task.Status = "Pending" - tasks = append(tasks, task) - } - return tasks, nil - -} - -// StopExecution to stop the transfer job -func (d *DefaultReplicator) StopExecution(executionID string) error { - - err := d.client.PostAction(executionID, opm.CtlCommandStop) - if err != nil { - return err - } - return nil - -} diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go new file mode 100644 index 000000000..4088c391a --- /dev/null +++ b/src/replication/ng/scheduler/scheduler.go @@ -0,0 +1,134 @@ +package scheduler + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/goharbor/harbor/src/common/job" + common_job "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/jobservice/opm" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// DefaultReplicator provides a default implement for Replicator +type DefaultReplicator struct { + client job.Client +} + +// NewDefaultReplicator returns an instance of DefaultReplicator +func NewDefaultReplicator(client job.Client) *DefaultReplicator { + return &DefaultReplicator{ + client: client, + } +} + +// ScheduleItem is an item that can be scheduled +type ScheduleItem struct { + TaskID int64 // used as the param in the hook + SrcResource *model.Resource + DstResource *model.Resource +} + +// ScheduleResult is the result of the schedule for one item +type ScheduleResult struct { + TaskID int64 + Error error +} + +// Scheduler schedules +type Scheduler interface { + // Preprocess the resources and returns the item list that can be scheduled + Preprocess([]*model.Resource, []*model.Resource) ([]*ScheduleItem, error) + // Schedule the items. If got error when scheduling one of the items, + // the error should be put in the corresponding ScheduleResult and the + // returning error of this function should be nil + Schedule([]*ScheduleItem) ([]*ScheduleResult, error) + // Stop the job specified by ID + Stop(id string) error +} + +// Preprocess the resources and returns the item list that can be scheduled +func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { + if len(srcResources) != len(destResources) { + err := errors.New("srcResources has different length with destResources") + log.Errorf(err.Error()) + return nil, err + } + var items []*ScheduleItem + for index, srcResource := range srcResources { + destResource := destResources[index] + item := &ScheduleItem{ + SrcResource: srcResource, + DstResource: destResource, + } + items = append(items, item) + + } + return items, nil +} + +// Schedule transfer the tasks to jobs,and then submit these jobs to job service. +func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { + var results []*ScheduleResult + for _, item := range items { + if item.TaskID == 0 { + err := errors.New("some tasks do not have a ID") + log.Errorf(err.Error()) + return nil, err + } + result := &ScheduleResult{ + TaskID: item.TaskID, + } + job := &models.JobData{ + Metadata: &models.JobMetadata{ + JobKind: job.JobKindGeneric, + }, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), item.TaskID), + } + + job.Name = common_job.ImageTransfer + src, err := json.Marshal(item.SrcResource) + if err != nil { + log.Errorf("failed to marshal the srcResource of %v.err:%s!", item.SrcResource, err.Error()) + result.Error = err + results = append(results, result) + continue + } + dest, err := json.Marshal(item.DstResource) + if err != nil { + log.Errorf("failed to marshal the dstResource of %v.err:%s!", item.DstResource, err.Error()) + result.Error = err + results = append(results, result) + continue + } + job.Parameters = map[string]interface{}{ + "src_resource": string(src), + "dst_resource": string(dest), + } + _, joberr := d.client.SubmitJob(job) + if joberr != nil { + log.Errorf("failed to submit the task:%v .err:%s!", item, joberr.Error()) + result.Error = joberr + results = append(results, result) + continue + } + results = append(results, result) + } + return results, nil + +} + +// Stop the transfer job +func (d *DefaultReplicator) Stop(id string) error { + + err := d.client.PostAction(id, opm.CtlCommandStop) + if err != nil { + return err + } + return nil + +} diff --git a/src/replication/ng/scheduler/schedule_test.go b/src/replication/ng/scheduler/scheduler_test.go similarity index 63% rename from src/replication/ng/scheduler/schedule_test.go rename to src/replication/ng/scheduler/scheduler_test.go index de6429c29..8a4ed0bb9 100644 --- a/src/replication/ng/scheduler/schedule_test.go +++ b/src/replication/ng/scheduler/scheduler_test.go @@ -1,6 +1,7 @@ -package schedule +package scheduler import ( + "encoding/json" "testing" "github.com/goharbor/harbor/src/common/job/models" @@ -26,43 +27,51 @@ func (client TestClient) PostAction(uuid, action string) error { return nil } -func TestDefaultReplicator_Schedule(t *testing.T) { - tasks, err := generateData() +func TestDefaultReplicator_Preprocess(t *testing.T) { + items, err := generateData() if err != nil { t.Error(err) } - for _, task := range tasks { - t.Log(*task) + for _, item := range items { + content, err := json.Marshal(item) + if err != nil { + t.Error(err) + } + t.Log(string(content)) } } -//func TestDefaultReplicator_SubmitJobs(t *testing.T) { -// config.Init() -// tasks, err := generateData() +//func TestDefaultReplicator_Schedule(t *testing.T) { +// // config.Init() +// items, err := generateData() // if err != nil { // t.Error(err) // } -// for _, task := range tasks { -// task.ID = 22 +// for _, item := range items { +// item.TaskID = 22 // } -// newTasks, newErr := replicator.SubmitTasks(tasks) +// results, newErr := replicator.Schedule(items) // if newErr != nil { // t.Error(newErr) // } -// for _, task := range newTasks { -// t.Log(*task) +// for _, result := range results { +// content, err := json.Marshal(result) +// if err != nil { +// t.Error(err) +// } +// t.Log(string(content)) // } //} - -func TestDefaultReplicator_StopExecution(t *testing.T) { - err := replicator.StopExecution("id") +// +func TestDefaultReplicator_Stop(t *testing.T) { + err := replicator.Stop("id") if err != nil { t.Error(err) } } -func generateData() ([]*model.Task, error) { +func generateData() ([]*ScheduleItem, error) { srcResource := &model.Resource{ Metadata: &model.ResourceMetadata{ Namespace: &model.Namespace{ @@ -95,6 +104,6 @@ func generateData() ([]*model.Task, error) { Credential: &model.Credential{}, }, } - tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) - return tasks, err + items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) + return items, err }