update scheduler interface to follow the new standard

Signed-off-by: Yuan Lei <371304458@qq.com>
This commit is contained in:
Yuan Lei 2019-02-15 17:55:26 +08:00
parent bd84f5f490
commit 7108b5c58c
3 changed files with 162 additions and 136 deletions

View File

@ -1,117 +0,0 @@
package schedule
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/goharbor/harbor/src/common/job"
common_job "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client job.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client,
}
}
// Scheduler to schedule the tasks to transfer resource data
type Scheduler interface {
// Schedule tasks
Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error)
StopExecution(executionID string) error
}
// Schedule the tasks base on resources
func (d *DefaultReplicator) Schedule(srcResources []*model.Resource, destResources []*model.Resource) ([]*model.Task, error) {
if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources")
log.Errorf(err.Error())
return nil, err
}
var tasks []*model.Task
for index, srcResource := range srcResources {
destResource := destResources[index]
task := &model.Task{}
task.ResourceType = srcResource.Type
task.StartTime = time.Now().UTC()
src, err := json.Marshal(srcResource)
if err != nil {
log.Errorf("failed to marshal the srcResource of %v.err:%s!", srcResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.SrcResource = string(src)
dest, err := json.Marshal(destResource)
if err != nil {
log.Errorf("failed to marshal the destResource of %v.err:%s!", destResource, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.DstResource = string(dest)
task.Status = "Initial"
tasks = append(tasks, task)
}
return tasks, nil
}
// SubmitTasks transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) SubmitTasks(tasks []*model.Task) ([]*model.Task, error) {
for _, task := range tasks {
if task.ID == 0 {
err := errors.New("task do not have ID")
log.Errorf(err.Error())
return nil, err
}
job := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), task.ID),
}
job.Name = common_job.ImageTransfer
job.Parameters = map[string]interface{}{
"src_resource": task.SrcResource,
"dst_resource": task.DstResource,
}
uuid, err := d.client.SubmitJob(job)
if err != nil {
log.Errorf("failed to submit the task:%v .err:%s!", task, err.Error())
task.Status = "Error"
tasks = append(tasks, task)
continue
}
task.JobID = uuid
task.Status = "Pending"
tasks = append(tasks, task)
}
return tasks, nil
}
// StopExecution to stop the transfer job
func (d *DefaultReplicator) StopExecution(executionID string) error {
err := d.client.PostAction(executionID, opm.CtlCommandStop)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,134 @@
package scheduler
import (
"encoding/json"
"errors"
"fmt"
"github.com/goharbor/harbor/src/common/job"
common_job "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client job.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client,
}
}
// ScheduleItem is an item that can be scheduled
type ScheduleItem struct {
TaskID int64 // used as the param in the hook
SrcResource *model.Resource
DstResource *model.Resource
}
// ScheduleResult is the result of the schedule for one item
type ScheduleResult struct {
TaskID int64
Error error
}
// Scheduler schedules
type Scheduler interface {
// Preprocess the resources and returns the item list that can be scheduled
Preprocess([]*model.Resource, []*model.Resource) ([]*ScheduleItem, error)
// Schedule the items. If got error when scheduling one of the items,
// the error should be put in the corresponding ScheduleResult and the
// returning error of this function should be nil
Schedule([]*ScheduleItem) ([]*ScheduleResult, error)
// Stop the job specified by ID
Stop(id string) error
}
// Preprocess the resources and returns the item list that can be scheduled
func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) {
if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources")
log.Errorf(err.Error())
return nil, err
}
var items []*ScheduleItem
for index, srcResource := range srcResources {
destResource := destResources[index]
item := &ScheduleItem{
SrcResource: srcResource,
DstResource: destResource,
}
items = append(items, item)
}
return items, nil
}
// Schedule transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) {
var results []*ScheduleResult
for _, item := range items {
if item.TaskID == 0 {
err := errors.New("some tasks do not have a ID")
log.Errorf(err.Error())
return nil, err
}
result := &ScheduleResult{
TaskID: item.TaskID,
}
job := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.InternalCoreURL(), item.TaskID),
}
job.Name = common_job.ImageTransfer
src, err := json.Marshal(item.SrcResource)
if err != nil {
log.Errorf("failed to marshal the srcResource of %v.err:%s!", item.SrcResource, err.Error())
result.Error = err
results = append(results, result)
continue
}
dest, err := json.Marshal(item.DstResource)
if err != nil {
log.Errorf("failed to marshal the dstResource of %v.err:%s!", item.DstResource, err.Error())
result.Error = err
results = append(results, result)
continue
}
job.Parameters = map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
}
_, joberr := d.client.SubmitJob(job)
if joberr != nil {
log.Errorf("failed to submit the task:%v .err:%s!", item, joberr.Error())
result.Error = joberr
results = append(results, result)
continue
}
results = append(results, result)
}
return results, nil
}
// Stop the transfer job
func (d *DefaultReplicator) Stop(id string) error {
err := d.client.PostAction(id, opm.CtlCommandStop)
if err != nil {
return err
}
return nil
}

View File

@ -1,6 +1,7 @@
package schedule
package scheduler
import (
"encoding/json"
"testing"
"github.com/goharbor/harbor/src/common/job/models"
@ -26,43 +27,51 @@ func (client TestClient) PostAction(uuid, action string) error {
return nil
}
func TestDefaultReplicator_Schedule(t *testing.T) {
tasks, err := generateData()
func TestDefaultReplicator_Preprocess(t *testing.T) {
items, err := generateData()
if err != nil {
t.Error(err)
}
for _, task := range tasks {
t.Log(*task)
for _, item := range items {
content, err := json.Marshal(item)
if err != nil {
t.Error(err)
}
t.Log(string(content))
}
}
//func TestDefaultReplicator_SubmitJobs(t *testing.T) {
// config.Init()
// tasks, err := generateData()
//func TestDefaultReplicator_Schedule(t *testing.T) {
// // config.Init()
// items, err := generateData()
// if err != nil {
// t.Error(err)
// }
// for _, task := range tasks {
// task.ID = 22
// for _, item := range items {
// item.TaskID = 22
// }
// newTasks, newErr := replicator.SubmitTasks(tasks)
// results, newErr := replicator.Schedule(items)
// if newErr != nil {
// t.Error(newErr)
// }
// for _, task := range newTasks {
// t.Log(*task)
// for _, result := range results {
// content, err := json.Marshal(result)
// if err != nil {
// t.Error(err)
// }
// t.Log(string(content))
// }
//}
func TestDefaultReplicator_StopExecution(t *testing.T) {
err := replicator.StopExecution("id")
//
func TestDefaultReplicator_Stop(t *testing.T) {
err := replicator.Stop("id")
if err != nil {
t.Error(err)
}
}
func generateData() ([]*model.Task, error) {
func generateData() ([]*ScheduleItem, error) {
srcResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: &model.Namespace{
@ -95,6 +104,6 @@ func generateData() ([]*model.Task, error) {
Credential: &model.Credential{},
},
}
tasks, err := replicator.Schedule([]*model.Resource{srcResource}, []*model.Resource{destResource})
return tasks, err
items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource})
return items, err
}