diff --git a/src/replication/ng/scheduler/schedule.go b/src/replication/ng/scheduler/schedule.go new file mode 100644 index 000000000..7b070a556 --- /dev/null +++ b/src/replication/ng/scheduler/schedule.go @@ -0,0 +1,92 @@ +package schedule + +import ( + "encoding/json" + "time" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/jobservice/opm" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// DefaultReplicator provides a default implement for Replicator +type DefaultReplicator struct { + client job.Client +} + +// NewDefaultReplicator returns an instance of DefaultReplicator +func NewDefaultReplicator(client job.Client) *DefaultReplicator { + return &DefaultReplicator{ + client: client, + } +} + +// Scheduler to schedule the tasks to transfer resource data +type Scheduler interface { + // Schedule tasks + Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) + StopTransfer(jobID string) error +} + +// Schedule the task to transfer resouce data +func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) { + var tasks []*model.Task + for _, destResource := range destResources { + + for _, srcResource := range srcResources { + task := &model.Task{} + task.ResourceType = srcResource.Type + task.StartTime = time.Now().UTC() + src, err := json.Marshal(srcResource) + if err != nil { + log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.SrcResource = string(src) + dest, err := json.Marshal(destResource) + if err != nil { + log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error()) + task.Status = "Error" + tasks = append(tasks, task) + continue + } + task.DstResource = string(dest) + + newjob := &models.JobData{ + Metadata: &models.JobMetadata{ + JobKind: job.JobKindGeneric, + }, + } + + newjob.Name = job.ImageTransfer + newjob.Parameters = map[string]interface{}{ + "src_resource": srcResource, + "dst_resource": destResource, + } + uuid, err := d.client.SubmitJob(newjob) + if err != nil { + log.Errorf("failed to submit the job from %v to %v.err:%s!", srcResource, destResource, err.Error()) + } + task.JobID = uuid + task.Status = "" + tasks = append(tasks, task) + + } + } + return tasks, nil +} + +// StopTransfer to stop the transfer job +func (d *DefaultReplicator) StopTransfer(jobID string) error { + + err := d.client.PostAction(jobID, opm.CtlCommandStop) + if err != nil { + return err + } + return nil + +} diff --git a/src/replication/ng/scheduler/schedule_test.go b/src/replication/ng/scheduler/schedule_test.go new file mode 100644 index 000000000..c05578c11 --- /dev/null +++ b/src/replication/ng/scheduler/schedule_test.go @@ -0,0 +1,78 @@ +package schedule + +import ( + "testing" + + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +var replicator *DefaultReplicator + +func init() { + replicator = NewDefaultReplicator(TestClient{}) +} + +type TestClient struct { +} + +func (client TestClient) SubmitJob(*models.JobData) (string, error) { + return "submited-uuid", nil +} +func (client TestClient) GetJobLog(uuid string) ([]byte, error) { + return []byte("job log"), nil +} +func (client TestClient) PostAction(uuid, action string) error { + return nil +} + +func TestDefaultReplicator_Schedule(t *testing.T) { + + srcResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "1", + "dst_registry": "1", + "namespace": "1", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + destResource := &model.Resource{ + Metadata: &model.ResourceMetadata{ + Namespace: &model.Namespace{ + Metadata: map[string]interface{}{ + "resource": "1", + "dst_registry": "1", + "namespace": "1", + }, + }, + Vtags: []string{"latest"}, + Labels: []string{"latest"}, + }, + Registry: &model.Registry{ + Credential: &model.Credential{}, + }, + } + tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) + if err != nil { + t.Error(err) + } + for _, task := range tasks { + t.Log(*task) + } + +} + +func TestDefaultReplicator_StopTransfer(t *testing.T) { + err := replicator.StopTransfer("job_id") + if err != nil { + t.Error(err) + } +}