diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index 44498ca2e..d7e51eba7 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -15,9 +15,30 @@ package scheduler import ( + "encoding/json" + "errors" + "fmt" + + "github.com/goharbor/harbor/src/common/job" + common_job "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/replication/ng/model" ) +// DefaultReplicator provides a default implement for Replicator +type DefaultReplicator struct { + client job.Client +} + +// NewDefaultReplicator returns an instance of DefaultReplicator +func NewDefaultReplicator(client job.Client) *DefaultReplicator { + return &DefaultReplicator{ + client: client, + } +} + // ScheduleItem is an item that can be scheduled type ScheduleItem struct { TaskID int64 // used as the param in the hook @@ -42,3 +63,81 @@ type Scheduler interface { // Stop the job specified by ID Stop(id string) error } + +// Preprocess the resources and returns the item list that can be scheduled +func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { + if len(srcResources) != len(destResources) { + err := errors.New("srcResources has different length with destResources") + return nil, err + } + var items []*ScheduleItem + for index, srcResource := range srcResources { + destResource := destResources[index] + item := &ScheduleItem{ + SrcResource: srcResource, + DstResource: destResource, + } + items = append(items, item) + + } + return items, nil +} + +// Schedule transfer the tasks to jobs,and then submit these jobs to job service. +func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { + var results []*ScheduleResult + for _, item := range items { + result := &ScheduleResult{ + TaskID: item.TaskID, + } + if item.TaskID == 0 { + result.Error = errors.New("some tasks do not have a ID") + results = append(results, result) + continue + } + job := &models.JobData{ + Metadata: &models.JobMetadata{ + JobKind: job.JobKindGeneric, + }, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID), + } + + job.Name = common_job.ImageTransfer + src, err := json.Marshal(item.SrcResource) + if err != nil { + result.Error = err + results = append(results, result) + continue + } + dest, err := json.Marshal(item.DstResource) + if err != nil { + result.Error = err + results = append(results, result) + continue + } + job.Parameters = map[string]interface{}{ + "src_resource": string(src), + "dst_resource": string(dest), + } + _, joberr := d.client.SubmitJob(job) + if joberr != nil { + result.Error = joberr + results = append(results, result) + continue + } + results = append(results, result) + } + return results, nil + +} + +// Stop the transfer job +func (d *DefaultReplicator) Stop(id string) error { + + err := d.client.PostAction(id, opm.CtlCommandStop) + if err != nil { + return err + } + return nil + +} diff --git a/src/replication/ng/scheduler/scheduler_test.go b/src/replication/ng/scheduler/scheduler_test.go new file mode 100644 index 000000000..1d620757f --- /dev/null +++ b/src/replication/ng/scheduler/scheduler_test.go @@ -0,0 +1,75 @@ +package scheduler + +import ( + "encoding/json" + "testing" + + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +var replicator *DefaultReplicator + +func init() { + replicator = NewDefaultReplicator(TestClient{}) +} + +type TestClient struct { +} + +func (client TestClient) SubmitJob(*models.JobData) (string, error) { + return "submited-uuid", nil +} +func (client TestClient) GetJobLog(uuid string) ([]byte, error) { + return []byte("job log"), nil +} +func (client TestClient) PostAction(uuid, action string) error { + return nil +} + +func TestDefaultReplicator_Preprocess(t *testing.T) { + items, err := generateData() + if err != nil { + t.Error(err) + } + for _, item := range items { + content, err := json.Marshal(item) + if err != nil { + t.Error(err) + } + t.Log(string(content)) + } + +} + +func TestDefaultReplicator_Stop(t *testing.T) { + err := replicator.Stop("id") + if err != nil { + t.Error(err) + } +} + +func generateData() ([]*ScheduleItem, error) { + srcResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: "namespace1", + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + destResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: "namespace2", + Vtags: []string{"v1", "v2"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) + return items, err +}