sperate the logics to two part: 1,tasks generating. 2,tasks submit

Signed-off-by: Yuan Lei <371304458@qq.com>
This commit is contained in:
Yuan Lei 2019-02-13 17:47:07 +08:00
parent 956a4c4f05
commit bd84f5f490
2 changed files with 171 additions and 124 deletions

View File

@ -2,11 +2,15 @@ package schedule
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt"
"time" "time"
"github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job"
common_job "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
) )
@ -27,63 +31,84 @@ func NewDefaultReplicator(client job.Client) *DefaultReplicator {
type Scheduler interface { type Scheduler interface {
// Schedule tasks // Schedule tasks
Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error)
StopTransfer(jobID string) error StopExecution(executionID string) error
} }
// Schedule the task to transfer resouce data // Schedule the tasks base on resources
func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) { func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) {
if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources")
log.Errorf(err.Error())
return nil, err
}
var tasks []*model.Task var tasks []*model.Task
for _, destResource := range destResources { for index, srcResource := range srcResources {
destResource := destResources[index]
for _, srcResource := range srcResources { task := &model.Task{}
task := &model.Task{} task.ResourceType = srcResource.Type
task.ResourceType = srcResource.Type task.StartTime = time.Now().UTC()
task.StartTime = time.Now().UTC() src, err := json.Marshal(srcResource)
src, err := json.Marshal(srcResource) if err != nil {
if err != nil { log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error())
log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error()) task.Status = "Error"
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.SrcResource = string(src)
dest, err := json.Marshal(destResource)
if err != nil {
log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.DstResource = string(dest)
newjob := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
}
newjob.Name = job.ImageTransfer
newjob.Parameters = map[string]interface{}{
"src_resource": srcResource,
"dst_resource": destResource,
}
uuid, err := d.client.SubmitJob(newjob)
if err != nil {
log.Errorf("failed to submit the job from %v to %v.err:%s!", srcResource, destResource, err.Error())
}
task.JobID = uuid
task.Status = ""
tasks = append(tasks, task) tasks = append(tasks, task)
continue
} }
task.SrcResource = string(src)
dest, err := json.Marshal(destResource)
if err != nil {
log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.DstResource = string(dest)
task.Status = "Initial"
tasks = append(tasks, task)
} }
return tasks, nil return tasks, nil
} }
// StopTransfer to stop the transfer job // SubmitTasks transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) StopTransfer(jobID string) error { func (d *DefaultReplicator) SubmitTasks(tasks []*model.Task) ([]*model.Task, error) {
for _, task := range tasks {
if task.ID == 0 {
err := errors.New("task do not have ID")
log.Errorf(err.Error())
return nil, err
}
job := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), task.ID),
}
err := d.client.PostAction(jobID, opm.CtlCommandStop) job.Name = common_job.ImageTransfer
job.Parameters = map[string]interface{}{
"src_resource": task.SrcResource,
"dst_resource": task.DstResource,
}
uuid, err := d.client.SubmitJob(job)
if err != nil {
log.Errorf("failed to submit the task:%v .err:%s!", task, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.JobID = uuid
task.Status = "Pending"
tasks = append(tasks, task)
}
return tasks, nil
}
// StopExecution to stop the transfer job
func (d *DefaultReplicator) StopExecution(executionID string) error {
err := d.client.PostAction(executionID, opm.CtlCommandStop)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,78 +1,100 @@
package schedule package schedule
import ( import (
"testing" "testing"
"github.com/goharbor/harbor/src/common/job/models" "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/model"
) )
var replicator *DefaultReplicator var replicator *DefaultReplicator
func init() { func init() {
replicator = NewDefaultReplicator(TestClient{}) replicator = NewDefaultReplicator(TestClient{})
} }
type TestClient struct { type TestClient struct {
} }
func (client TestClient) SubmitJob(*models.JobData) (string, error) { func (client TestClient) SubmitJob(*models.JobData) (string, error) {
return "submited-uuid", nil return "submited-uuid", nil
} }
func (client TestClient) GetJobLog(uuid string) ([]byte, error) { func (client TestClient) GetJobLog(uuid string) ([]byte, error) {
return []byte("job log"), nil return []byte("job log"), nil
} }
func (client TestClient) PostAction(uuid, action string) error { func (client TestClient) PostAction(uuid, action string) error {
return nil return nil
} }
func TestDefaultReplicator_Schedule(t *testing.T) { func TestDefaultReplicator_Schedule(t *testing.T) {
tasks, err := generateData()
srcResource := &model.Resource{ if err != nil {
Metadata: &model.ResourceMetadata{ t.Error(err)
Namespace: &model.Namespace{ }
Metadata: map[string]interface{}{ for _, task := range tasks {
"resource": "1", t.Log(*task)
"dst_registry": "1", }
"namespace": "1",
}, }
},
Vtags: []string{"latest"}, //func TestDefaultReplicator_SubmitJobs(t *testing.T) {
Labels: []string{"latest"}, // config.Init()
}, // tasks, err := generateData()
Registry: &model.Registry{ // if err != nil {
Credential: &model.Credential{}, // t.Error(err)
}, // }
} // for _, task := range tasks {
destResource := &model.Resource{ // task.ID = 22
Metadata: &model.ResourceMetadata{ // }
Namespace: &model.Namespace{ // newTasks, newErr := replicator.SubmitTasks(tasks)
Metadata: map[string]interface{}{ // if newErr != nil {
"resource": "1", // t.Error(newErr)
"dst_registry": "1", // }
"namespace": "1", // for _, task := range newTasks {
}, // t.Log(*task)
}, // }
Vtags: []string{"latest"}, //}
Labels: []string{"latest"},
}, func TestDefaultReplicator_StopExecution(t *testing.T) {
Registry: &model.Registry{ err := replicator.StopExecution("id")
Credential: &model.Credential{}, if err != nil {
}, t.Error(err)
} }
tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource}) }
if err != nil {
t.Error(err) func generateData() ([]*model.Task, error) {
} srcResource := &model.Resource{
for _, task := range tasks { Metadata: &model.ResourceMetadata{
t.Log(*task) Namespace: &model.Namespace{
} Metadata: map[string]interface{}{
"resource": "1",
} "dst_registry": "1",
"namespace": "1",
func TestDefaultReplicator_StopTransfer(t *testing.T) { },
err := replicator.StopTransfer("job_id") },
if err != nil { Vtags: []string{"latest"},
t.Error(err) Labels: []string{"latest"},
} },
} Registry: &model.Registry{
Credential: &model.Credential{},
},
}
destResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: &model.Namespace{
Metadata: map[string]interface{}{
"resource": "2",
"dst_registry": "2",
"namespace": "2",
},
},
Vtags: []string{"latest"},
Labels: []string{"latest"},
},
Registry: &model.Registry{
Credential: &model.Credential{},
},
}
tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource})
return tasks, err
}