scheduler module to schedule the transfer job of resouce

Signed-off-by: Yuan Lei <371304458@qq.com>
This commit is contained in:
Yuan Lei 2019-01-30 14:49:05 +08:00
parent 0115067277
commit 956a4c4f05
2 changed files with 170 additions and 0 deletions

View File

@ -0,0 +1,92 @@
package schedule
import (
"encoding/json"
"time"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client job.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client,
}
}
// Scheduler to schedule the tasks to transfer resource data
type Scheduler interface {
// Schedule tasks
Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error)
StopTransfer(jobID string) error
}
// Schedule the task to transfer resouce data
func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) {
var tasks []*model.Task
for _, destResource := range destResources {
for _, srcResource := range srcResources {
task := &model.Task{}
task.ResourceType = srcResource.Type
task.StartTime = time.Now().UTC()
src, err := json.Marshal(srcResource)
if err != nil {
log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.SrcResource = string(src)
dest, err := json.Marshal(destResource)
if err != nil {
log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.DstResource = string(dest)
newjob := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
}
newjob.Name = job.ImageTransfer
newjob.Parameters = map[string]interface{}{
"src_resource": srcResource,
"dst_resource": destResource,
}
uuid, err := d.client.SubmitJob(newjob)
if err != nil {
log.Errorf("failed to submit the job from %v to %v.err:%s!", srcResource, destResource, err.Error())
}
task.JobID = uuid
task.Status = ""
tasks = append(tasks, task)
}
}
return tasks, nil
}
// StopTransfer to stop the transfer job
func (d *DefaultReplicator) StopTransfer(jobID string) error {
err := d.client.PostAction(jobID, opm.CtlCommandStop)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,78 @@
package schedule
import (
"testing"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/replication/ng/model"
)
var replicator *DefaultReplicator
func init() {
replicator = NewDefaultReplicator(TestClient{})
}
type TestClient struct {
}
func (client TestClient) SubmitJob(*models.JobData) (string, error) {
return "submited-uuid", nil
}
func (client TestClient) GetJobLog(uuid string) ([]byte, error) {
return []byte("job log"), nil
}
func (client TestClient) PostAction(uuid, action string) error {
return nil
}
func TestDefaultReplicator_Schedule(t *testing.T) {
srcResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: &model.Namespace{
Metadata: map[string]interface{}{
"resource": "1",
"dst_registry": "1",
"namespace": "1",
},
},
Vtags: []string{"latest"},
Labels: []string{"latest"},
},
Registry: &model.Registry{
Credential: &model.Credential{},
},
}
destResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: &model.Namespace{
Metadata: map[string]interface{}{
"resource": "1",
"dst_registry": "1",
"namespace": "1",
},
},
Vtags: []string{"latest"},
Labels: []string{"latest"},
},
Registry: &model.Registry{
Credential: &model.Credential{},
},
}
tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource})
if err != nil {
t.Error(err)
}
for _, task := range tasks {
t.Log(*task)
}
}
func TestDefaultReplicator_StopTransfer(t *testing.T) {
err := replicator.StopTransfer("job_id")
if err != nil {
t.Error(err)
}
}