Merge pull request #6917 from yuanshuhan/replication_ng

scheduler module for replication
This commit is contained in:
Wenkai Yin 2019-03-05 16:15:31 +08:00 committed by GitHub
commit e1a7bfa32d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 174 additions and 0 deletions

View File

@ -15,9 +15,30 @@
package scheduler
import (
"encoding/json"
"errors"
"fmt"
"github.com/goharbor/harbor/src/common/job"
common_job "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct {
client job.Client
}
// NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client job.Client) *DefaultReplicator {
return &DefaultReplicator{
client: client,
}
}
// ScheduleItem is an item that can be scheduled
type ScheduleItem struct {
TaskID int64 // used as the param in the hook
@ -42,3 +63,81 @@ type Scheduler interface {
// Stop the job specified by ID
Stop(id string) error
}
// Preprocess the resources and returns the item list that can be scheduled
func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) {
if len(srcResources) != len(destResources) {
err := errors.New("srcResources has different length with destResources")
return nil, err
}
var items []*ScheduleItem
for index, srcResource := range srcResources {
destResource := destResources[index]
item := &ScheduleItem{
SrcResource: srcResource,
DstResource: destResource,
}
items = append(items, item)
}
return items, nil
}
// Schedule transfer the tasks to jobs,and then submit these jobs to job service.
func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) {
var results []*ScheduleResult
for _, item := range items {
result := &ScheduleResult{
TaskID: item.TaskID,
}
if item.TaskID == 0 {
result.Error = errors.New("some tasks do not have a ID")
results = append(results, result)
continue
}
job := &models.JobData{
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID),
}
job.Name = common_job.ImageTransfer
src, err := json.Marshal(item.SrcResource)
if err != nil {
result.Error = err
results = append(results, result)
continue
}
dest, err := json.Marshal(item.DstResource)
if err != nil {
result.Error = err
results = append(results, result)
continue
}
job.Parameters = map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
}
_, joberr := d.client.SubmitJob(job)
if joberr != nil {
result.Error = joberr
results = append(results, result)
continue
}
results = append(results, result)
}
return results, nil
}
// Stop the transfer job
func (d *DefaultReplicator) Stop(id string) error {
err := d.client.PostAction(id, opm.CtlCommandStop)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,75 @@
package scheduler
import (
"encoding/json"
"testing"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/replication/ng/model"
)
var replicator *DefaultReplicator
func init() {
replicator = NewDefaultReplicator(TestClient{})
}
type TestClient struct {
}
func (client TestClient) SubmitJob(*models.JobData) (string, error) {
return "submited-uuid", nil
}
func (client TestClient) GetJobLog(uuid string) ([]byte, error) {
return []byte("job log"), nil
}
func (client TestClient) PostAction(uuid, action string) error {
return nil
}
func TestDefaultReplicator_Preprocess(t *testing.T) {
items, err := generateData()
if err != nil {
t.Error(err)
}
for _, item := range items {
content, err := json.Marshal(item)
if err != nil {
t.Error(err)
}
t.Log(string(content))
}
}
func TestDefaultReplicator_Stop(t *testing.T) {
err := replicator.Stop("id")
if err != nil {
t.Error(err)
}
}
func generateData() ([]*ScheduleItem, error) {
srcResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: "namespace1",
Vtags: []string{"latest"},
Labels: []string{"latest"},
},
Registry: &model.Registry{
Credential: &model.Credential{},
},
}
destResource := &model.Resource{
Metadata: &model.ResourceMetadata{
Namespace: "namespace2",
Vtags: []string{"v1", "v2"},
Labels: []string{"latest"},
},
Registry: &model.Registry{
Credential: &model.Credential{},
},
}
items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource})
return items, err
}