Refactor the Scheduler interface

This commit refactors the Scheduler interface and makes the corresponding changes to the flow controller.

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-02-14 11:30:49 +08:00
parent 7569527f70
commit e3a9264162
6 changed files with 180 additions and 58 deletions

View File

@ -23,7 +23,7 @@ type Manager interface {
// Create a new execution
Create(*model.Execution) (int64, error)
// List the summaries of executions
List(*model.ExecutionQuery) (int64, []*model.Execution, error)
List(...*model.ExecutionQuery) (int64, []*model.Execution, error)
// Get the specified execution
Get(int64) (*model.Execution, error)
// Update the data of the specified execution, the "props" are the
@ -36,12 +36,17 @@ type Manager interface {
// Create a task
CreateTask(*model.Task) (int64, error)
// List the tasks according to the query
ListTasks(*model.TaskQuery) (int64, []*model.Task, error)
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
// Get one specified task
GetTask(int64) (*model.Task, error)
// Update the task, the "props" are the properties of task
// that need to be updated
// that need to be updated, it cannot include "status". If
// you want to update the status, use "UpdateTaskStatus" instead
UpdateTask(task *model.Task, props ...string) error
// UpdateTaskStatus only updates the task status. If "statusCondition"
// is present, only the tasks whose status equals "statusCondition"
// will be updated
UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error
// Remove one task specified by task ID
RemoveTask(int64) error
// Remove all tasks of one execution specified by the execution ID

View File

@ -83,7 +83,19 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error
return id, nil
}
// schedule the replication
// preprocess the resources
if err = flow.preprocess(); err != nil {
log.Errorf("failed to preprocess the resources for the execution %d: %v", id, err)
return id, nil
}
// create task records in database
if err = flow.createTasks(); err != nil {
log.Errorf("failed to create task records for the execution %d: %v", id, err)
return id, nil
}
// schedule the tasks
if err = flow.schedule(); err != nil {
log.Errorf("failed to schedule the execution %d: %v", id, err)
return id, nil

View File

@ -19,6 +19,7 @@ import (
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -79,7 +80,7 @@ type fakedExecutionManager struct{}
// Create is a test stub: it ignores the execution passed in and always
// returns the ID 1 with a nil error.
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) List(*model.ExecutionQuery) (int64, []*model.Execution, error) {
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
@ -95,9 +96,9 @@ func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
return 0, nil
return 1, nil
}
func (f *fakedExecutionManager) ListTasks(*model.TaskQuery) (int64, []*model.Task, error) {
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
return 0, nil, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
@ -106,6 +107,9 @@ func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
// UpdateTask is a test stub: it ignores its arguments and always returns nil.
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
return nil
}
// UpdateTaskStatus is a test stub: it ignores the task ID, the status and the
// optional status condition, and always returns nil.
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
return nil
}
// RemoveTask is a test stub: it ignores the task ID and always returns nil.
func (f *fakedExecutionManager) RemoveTask(int64) error {
return nil
}
@ -118,13 +122,25 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
type fakedScheduler struct{}
func (f *fakedScheduler) Schedule(src []*model.Resource, dst []*model.Resource) ([]*model.Task, error) {
return []*model.Task{
{
Status: model.TaskStatusPending,
JobID: "uuid",
},
}, nil
// Preprocess is a test fake: it pairs every source resource with the
// destination resource at the same index and returns the pairs as schedule
// items. The returned error is always nil.
func (f *fakedScheduler) Preprocess(src []*model.Resource, dst []*model.Resource) ([]*scheduler.ScheduleItem, error) {
	items := make([]*scheduler.ScheduleItem, 0, len(src))
	for idx := range src {
		pair := &scheduler.ScheduleItem{
			SrcResource: src[idx],
			DstResource: dst[idx],
		}
		items = append(items, pair)
	}
	return items, nil
}
// Schedule is a test fake: it reports every item as scheduled successfully by
// returning one result per item that carries the item's task ID and a nil
// Error. The returned error is always nil.
func (f *fakedScheduler) Schedule(items []*scheduler.ScheduleItem) ([]*scheduler.ScheduleResult, error) {
	results := make([]*scheduler.ScheduleResult, 0, len(items))
	for idx := range items {
		// Error is left at its zero value (nil) to signal success
		results = append(results, &scheduler.ScheduleResult{TaskID: items[idx].TaskID})
	}
	return results, nil
}
func (f *fakedScheduler) Stop(id string) error {
return nil
@ -153,7 +169,7 @@ func (f *fakedAdapter) FetchResources(namespace []string, filters []*model.Filte
{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: "hello-world",
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},

View File

@ -15,7 +15,9 @@
package flow
import (
"errors"
"fmt"
"strings"
"time"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
@ -29,15 +31,16 @@ import (
)
type flow struct {
policy *model.Policy
srcRegistry *model.Registry
dstRegistry *model.Registry
srcAdapter adapter.Adapter
dstAdapter adapter.Adapter
executionID int64
resources []*model.Resource
executionMgr execution.Manager
scheduler scheduler.Scheduler
policy *model.Policy
srcRegistry *model.Registry
dstRegistry *model.Registry
srcAdapter adapter.Adapter
dstAdapter adapter.Adapter
executionID int64
resources []*model.Resource
executionMgr execution.Manager
scheduler scheduler.Scheduler
scheduleItems []*scheduler.ScheduleItem
}
func newFlow(policy *model.Policy, registryMgr registry.Manager,
@ -143,7 +146,7 @@ func (f *flow) createNamespace() error {
return nil
}
func (f *flow) schedule() error {
func (f *flow) preprocess() error {
dstResources := []*model.Resource{}
for _, srcResource := range f.resources {
dstResource := &model.Resource{
@ -161,30 +164,78 @@ func (f *flow) schedule() error {
dstResources = append(dstResources, dstResource)
}
tasks, err := f.scheduler.Schedule(f.resources, dstResources)
items, err := f.scheduler.Preprocess(f.resources, dstResources)
if err != nil {
f.markExecutionFailure(err)
return err
}
f.scheduleItems = items
log.Debugf("the preprocess for resources of the execution %d completed",
f.executionID)
return nil
}
// createTasks persists one task record, in the "Initialized" status, for each
// schedule item of the flow and stores the ID of the created record back into
// the item. It stops at the first failure: the execution is marked as failed
// and the error is returned, so none of the items get submitted.
func (f *flow) createTasks() error {
	for _, scheduleItem := range f.scheduleItems {
		taskID, err := f.executionMgr.CreateTask(&model.Task{
			ExecutionID:  f.executionID,
			Status:       model.TaskStatusInitialized,
			ResourceType: scheduleItem.SrcResource.Type,
			SrcResource:  getResourceName(scheduleItem.SrcResource),
			DstResource:  getResourceName(scheduleItem.DstResource),
		})
		if err != nil {
			// one failed record fails the whole execution
			f.markExecutionFailure(err)
			return err
		}
		// remember the record ID on the item
		scheduleItem.TaskID = taskID
		log.Debugf("task record %d for the execution %d created",
			taskID, f.executionID)
	}
	return nil
}
func (f *flow) schedule() error {
results, err := f.scheduler.Schedule(f.scheduleItems)
if err != nil {
f.markExecutionFailure(err)
return err
}
allFailed := true
for _, task := range tasks {
if task.Status != model.TaskStatusFailed {
allFailed = false
for _, result := range results {
// if the task is failed to be submitted, update the status of the
// task as failure
if result.Error != nil {
log.Errorf("failed to schedule task %d: %v", result.TaskID, err)
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusFailed); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
}
continue
}
task.ExecutionID = f.executionID
taskID, err := f.executionMgr.CreateTask(task)
if err != nil {
f.markExecutionFailure(err)
return err
allFailed = false
// if the task is submitted successfully, update the status and start time
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
}
log.Debugf("task record %d for execution %d created", taskID, f.executionID)
if err = f.executionMgr.UpdateTask(&model.Task{
ID: result.TaskID,
StartTime: time.Now(),
}); err != nil {
log.Errorf("failed to update task %d: %v", result.TaskID, err)
}
log.Debugf("the task %d scheduled", result.TaskID)
}
// if all the tasks are failed, mark the execution failed
if allFailed {
err = errors.New("all tasks are failed")
f.markExecutionFailure(err)
return err
}
return nil
}
@ -193,6 +244,8 @@ func (f *flow) markExecutionFailure(err error) {
if err != nil {
statusText = err.Error()
}
log.Errorf("the execution %d is marked as failure because of the error: %s",
f.executionID, statusText)
err = f.executionMgr.Update(
&model.Execution{
ID: f.executionID,
@ -204,3 +257,19 @@ func (f *flow) markExecutionFailure(err error) {
log.Errorf("failed to update the execution %d: %v", f.executionID, err)
}
}
// getResourceName builds the name of a resource from its metadata. The result
// is "res_name" when the resource has no vtags and
// "res_name:[vtag1,vtag2,vtag3]" when it has some. A nil resource or a
// resource without metadata yields an empty string.
func getResourceName(res *model.Resource) string {
	if res == nil || res.Metadata == nil {
		return ""
	}
	name, vtags := res.Metadata.Name, res.Metadata.Vtags
	if len(vtags) > 0 {
		return name + ":[" + strings.Join(vtags, ",") + "]"
	}
	return name
}

View File

@ -31,28 +31,31 @@ const (
ExecutionTriggerEvent string = "Event"
ExecutionTriggerSchedule string = "Schedule"
TaskStatusFailed string = "Failed"
TaskStatusSucceed string = "Succeed"
TaskStatusStopped string = "Stopped"
TaskStatusInProgress string = "InProgress"
TaskStatusPending string = "Pending"
// The task has been persisted in db but not submitted to Jobservice
TaskStatusInitialized string = "Initialized"
TaskStatusPending string = "Pending"
TaskStatusInProgress string = "InProgress"
TaskStatusSucceed string = "Succeed"
TaskStatusFailed string = "Failed"
TaskStatusStopped string = "Stopped"
)
// Execution defines an execution of the replication
type Execution struct {
ID int64 `json:"id"`
PolicyID int64 `json:"policy_id"`
Status string `json:"status"`
StatusText string `json:"status_text"`
Trigger string `json:"trigger"`
Total int `json:"total"`
Failed int `json:"failed"`
Succeed int `json:"succeed"`
Pending int `json:"pending"`
InProgress int `json:"in_progress"`
Stopped int `json:"stopped"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
ID int64 `json:"id"`
PolicyID int64 `json:"policy_id"`
Status string `json:"status"`
StatusText string `json:"status_text"`
Trigger string `json:"trigger"`
Total int `json:"total"`
Failed int `json:"failed"`
Succeed int `json:"succeed"`
Pending int `json:"pending"`
InProgress int `json:"in_progress"`
Stopped int `json:"stopped"`
Initialized int `json:"initialized"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
}
// Task holds the information of one replication task

View File

@ -18,10 +18,27 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
// Scheduler schedules tasks to transfer resource data
// ScheduleItem is an item that can be scheduled. It pairs a source resource
// with a destination resource and carries the ID of the task that tracks it.
type ScheduleItem struct {
TaskID int64 // used as the param in the hook
// SrcResource is the source resource of the pair
SrcResource *model.Resource
// DstResource is the destination resource of the pair
DstResource *model.Resource
}
// ScheduleResult is the result of the schedule for one item
type ScheduleResult struct {
// TaskID identifies the task the result belongs to
TaskID int64
// Error is nil when the item was scheduled successfully; otherwise it
// holds the error hit while scheduling the item
Error error
}
// Scheduler preprocesses resources into schedule items and schedules them
type Scheduler interface {
// Schedule tasks for one execution
Schedule([]*model.Resource, []*model.Resource) ([]*model.Task, error)
// Stop the task specified by ID
// Preprocess the resources and return the item list that can be scheduled
Preprocess([]*model.Resource, []*model.Resource) ([]*ScheduleItem, error)
// Schedule the items. If an error occurs when scheduling one of the items,
// the error should be put in the corresponding ScheduleResult and the
// error returned by this function should be nil
Schedule([]*ScheduleItem) ([]*ScheduleResult, error)
// Stop the job specified by ID
Stop(id string) error
}