Implement replication operation API

This commit implements the replication operation related APIs

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-02-28 16:43:07 +08:00
parent 2cce835784
commit ec2a7f9239
9 changed files with 841 additions and 8 deletions

View File

@ -1539,6 +1539,171 @@ paths:
description: User need to login first.
'500':
description: Unexpected internal errors.
/replication/executions:
get:
summary: List replication executions.
description: |
This endpoint let user list replication executions.
parameters:
- name: policy_id
in: query
type: integer
required: false
description: The policy ID.
- name: status
in: query
type: string
required: false
description: The execution status.
- name: trigger
in: query
type: string
required: false
description: The trigger mode.
- name: page
in: query
type: integer
required: false
description: The page.
- name: page_size
in: query
type: integer
required: false
description: The page size.
tags:
- Products
responses:
'200':
description: Success
schema:
type: array
items:
$ref: '#/definitions/ReplicationExecution'
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'500':
description: Unexpected internal errors.
post:
summary: Start one execution of the replication.
description: |
This endpoint is for user to start one execution of the replication.
parameters:
- name: execution
in: body
description: The execution that needs to be started, only the property "policy_id" is needed.
required: true
schema:
$ref: '#/definitions/ReplicationExecution'
tags:
- Products
responses:
'201':
description: Success.
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'415':
$ref: '#/responses/UnsupportedMediaType'
'500':
description: Unexpected internal errors.
/replication/executions/{id}:
put:
summary: Stop the execution of the replication.
description: |
This endpoint is for user to stop one execution of the replication.
parameters:
- name: id
in: path
type: integer
format: int64
description: The execution ID.
required: true
tags:
- Products
responses:
'200':
description: Success.
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'415':
$ref: '#/responses/UnsupportedMediaType'
'500':
description: Unexpected internal errors.
/replication/executions/{id}/tasks:
get:
summary: Get the task list of one execution.
description: |
This endpoint is for user to get the task list of one execution.
parameters:
- name: id
in: path
type: integer
format: int64
description: The execution ID.
required: true
tags:
- Products
responses:
'200':
description: Success.
schema:
type: array
items:
$ref: '#/definitions/ReplicationTask'
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'500':
description: Unexpected internal errors.
/replication/executions/{id}/tasks/{task_id}/log:
get:
summary: Get the log of one task.
description: |
This endpoint is for user to get the log of one task.
parameters:
- name: id
in: path
type: integer
format: int64
description: The execution ID.
required: true
- name: task_id
in: path
type: integer
format: int64
description: The task ID.
required: true
tags:
- Products
responses:
'200':
description: Success.
'400':
description: Bad request.
'401':
description: User need to login first.
'403':
description: User has no privilege for the operation.
'404':
description: Resource requested does not exist.
'500':
description: Unexpected internal errors.
/jobs/replication:
get:
summary: List filters jobs according to the policy and repository
@ -4813,3 +4978,74 @@ definitions:
description: The filters that the adapter supports
items:
type: string
ReplicationExecution:
type: object
description: The replication execution
properties:
id:
type: integer
description: The ID
policy_id:
type: integer
description: The policy ID
status:
type: string
description: The status
status_text:
type: string
description: The status text
trigger:
type: string
description: The trigger mode
total:
type: integer
description: The total count of all tasks
failed:
type: integer
description: The count of failed tasks
succeed:
type: integer
description: The count of succeed tasks
in_progress:
type: integer
description: The count of in_progress tasks
stopped:
type: integer
description: The count of stopped tasks
start_time:
type: string
description: The start time
end_time:
type: string
description: The end time
ReplicationTask:
type: object
description: The replication task
properties:
id:
type: integer
description: The ID
execution_id:
type: integer
description: The execution ID
resource_type:
type: string
description: The resource type
src_resource:
type: string
description: The source resource
dst_resource:
type: string
description: The destination resource
job_id:
type: string
description: The job ID
status:
type: string
description: The status
start_time:
type: string
description: The start time
end_time:
type: string
description: The end time

View File

@ -42,8 +42,10 @@ const (
ResourceLog = Resource("log")
ResourceMember = Resource("member")
ResourceMetadata = Resource("metadata")
ResourceReplication = Resource("replication")
ResourceReplicationJob = Resource("replication-job")
ResourceReplication = Resource("replication") // TODO remove
ResourceReplicationJob = Resource("replication-job") // TODO remove
ResourceReplicationExecution = Resource("replication-execution")
ResourceReplicationTask = Resource("replication-task")
ResourceRepository = Resource("repository")
ResourceRepositoryLabel = Resource("repository-label")
ResourceRepositoryTag = Resource("repository-tag")

View File

@ -75,6 +75,18 @@ var (
{Resource: rbac.ResourceReplicationJob, Action: rbac.ActionRead},
{Resource: rbac.ResourceReplicationJob, Action: rbac.ActionList},
{Resource: rbac.ResourceReplicationExecution, Action: rbac.ActionRead},
{Resource: rbac.ResourceReplicationExecution, Action: rbac.ActionList},
{Resource: rbac.ResourceReplicationExecution, Action: rbac.ActionCreate},
{Resource: rbac.ResourceReplicationExecution, Action: rbac.ActionUpdate},
{Resource: rbac.ResourceReplicationExecution, Action: rbac.ActionDelete},
{Resource: rbac.ResourceReplicationTask, Action: rbac.ActionRead},
{Resource: rbac.ResourceReplicationTask, Action: rbac.ActionList},
{Resource: rbac.ResourceReplicationTask, Action: rbac.ActionCreate},
{Resource: rbac.ResourceReplicationTask, Action: rbac.ActionUpdate},
{Resource: rbac.ResourceReplicationTask, Action: rbac.ActionDelete},
{Resource: rbac.ResourceLabel, Action: rbac.ActionCreate},
{Resource: rbac.ResourceLabel, Action: rbac.ActionRead},
{Resource: rbac.ResourceLabel, Action: rbac.ActionUpdate},

View File

@ -153,6 +153,10 @@ func init() {
beego.Router("/api/replication/adapters", &ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/adapters/:type", &ReplicationAdapterAPI{}, "get:Get")
beego.Router("/api/replication/executions", &ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &ReplicationOperationAPI{}, "put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &ReplicationOperationAPI{}, "get:GetTaskLog")
// Charts are controlled under projects
chartRepositoryAPIType := &ChartRepositoryAPI{}

View File

@ -0,0 +1,223 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"fmt"
"net/http"
"strconv"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// ReplicationOperationAPI handles the replication operation requests
type ReplicationOperationAPI struct {
BaseController
}
// Prepare ...
func (r *ReplicationOperationAPI) Prepare() {
r.BaseController.Prepare()
// TODO if we delegate the jobservice to trigger the scheduled replication,
// add the logic to check whether the user is a solution user
if !r.SecurityCtx.IsSysAdmin() {
if !r.SecurityCtx.IsAuthenticated() {
r.HandleUnauthorized()
return
}
r.HandleForbidden(r.SecurityCtx.GetUsername())
return
}
}
// The API is open only for system admin currently, we can use
// the code commentted below to make the API available to the
// users who have permission for all projects that the policy
// refers
/*
func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac.Resource, action rbac.Action) bool {
projects := []string{}
// pull mode
if policy.SrcRegistryID != 0 {
projects = append(projects, policy.DestNamespace)
} else {
// push mode
projects = append(projects, policy.SrcNamespaces...)
}
for _, project := range projects {
resource := rbac.NewProjectNamespace(project).Resource(resource)
if !r.SecurityCtx.Can(action, resource) {
r.HandleForbidden(r.SecurityCtx.GetUsername())
return false
}
}
return true
}
*/
// ListExecutions ...
func (r *ReplicationOperationAPI) ListExecutions() {
query := &model.ExecutionQuery{
Status: r.GetString("status"),
Trigger: r.GetString("trigger"),
}
if len(r.GetString("policy_id")) > 0 {
policyID, err := r.GetInt64("policy_id")
if err != nil || policyID <= 0 {
r.HandleBadRequest(fmt.Sprintf("invalid policy_id %s", r.GetString("policy_id")))
return
}
query.PolicyID = policyID
}
query.Page, query.Size = r.GetPaginationParams()
total, executions, err := ng.OperationCtl.ListExecutions(query)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to list executions: %v", err))
return
}
r.SetPaginationHeader(total, query.Page, query.Size)
r.WriteJSONData(executions)
}
// CreateExecution starts a replication
func (r *ReplicationOperationAPI) CreateExecution() {
execution := &model.Execution{}
r.DecodeJSONReq(execution)
policy, err := ng.PolicyMgr.Get(execution.PolicyID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", execution.PolicyID, err))
return
}
if policy == nil {
r.HandleNotFound(fmt.Sprintf("policy %d not found", execution.PolicyID))
return
}
executionID, err := ng.OperationCtl.StartReplication(policy)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to start replication for policy %d: %v", execution.PolicyID, err))
return
}
r.Redirect(http.StatusCreated, strconv.FormatInt(executionID, 10))
}
// StopExecution stops one execution of the replication
func (r *ReplicationOperationAPI) StopExecution() {
executionID, err := r.GetInt64FromPath(":id")
if err != nil || executionID <= 0 {
r.HandleBadRequest("invalid execution ID")
return
}
execution, err := ng.OperationCtl.GetExecution(executionID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get execution %d: %v", executionID, err))
return
}
if execution == nil {
r.HandleNotFound(fmt.Sprintf("execution %d not found", executionID))
return
}
if err := ng.OperationCtl.StopReplication(executionID); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to stop execution %d: %v", executionID, err))
return
}
}
// ListTasks ...
func (r *ReplicationOperationAPI) ListTasks() {
executionID, err := r.GetInt64FromPath(":id")
if err != nil || executionID <= 0 {
r.HandleBadRequest("invalid execution ID")
return
}
execution, err := ng.OperationCtl.GetExecution(executionID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get execution %d: %v", executionID, err))
return
}
if execution == nil {
r.HandleNotFound(fmt.Sprintf("execution %d not found", executionID))
return
}
query := &model.TaskQuery{
ExecutionID: executionID,
ResourceType: (model.ResourceType)(r.GetString("resource_type")),
Status: r.GetString("status"),
}
query.Page, query.Size = r.GetPaginationParams()
total, tasks, err := ng.OperationCtl.ListTasks(query)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to list tasks: %v", err))
return
}
r.SetPaginationHeader(total, query.Page, query.Size)
r.WriteJSONData(tasks)
}
// GetTaskLog ...
func (r *ReplicationOperationAPI) GetTaskLog() {
executionID, err := r.GetInt64FromPath(":id")
if err != nil || executionID <= 0 {
r.HandleBadRequest("invalid execution ID")
return
}
execution, err := ng.OperationCtl.GetExecution(executionID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get execution %d: %v", executionID, err))
return
}
if execution == nil {
r.HandleNotFound(fmt.Sprintf("execution %d not found", executionID))
return
}
taskID, err := r.GetInt64FromPath(":tid")
if err != nil || taskID <= 0 {
r.HandleBadRequest("invalid task ID")
return
}
task, err := ng.OperationCtl.GetTask(taskID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get task %d: %v", taskID, err))
return
}
if task == nil {
r.HandleNotFound(fmt.Sprintf("task %d not found", taskID))
return
}
logBytes, err := ng.OperationCtl.GetTaskLog(taskID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get log of task %d: %v", taskID, err))
return
}
r.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes)))
r.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain")
_, err = r.Ctx.ResponseWriter.Write(logBytes)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to write log of task %d: %v", taskID, err))
return
}
}

View File

@ -0,0 +1,344 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"net/http"
"testing"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/model"
)
type fakedOperationController struct{}
func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64, error) {
return 1, nil
}
func (f *fakedOperationController) StopReplication(int64) error {
return nil
}
func (f *fakedOperationController) ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return 1, []*model.Execution{
{
ID: 1,
PolicyID: 1,
},
}, nil
}
func (f *fakedOperationController) GetExecution(id int64) (*model.Execution, error) {
if id == 1 {
return &model.Execution{
ID: 1,
PolicyID: 1,
}, nil
}
return nil, nil
}
func (f *fakedOperationController) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
return 1, []*model.Task{
{
ID: 1,
ExecutionID: 1,
},
}, nil
}
func (f *fakedOperationController) GetTask(id int64) (*model.Task, error) {
if id == 1 {
return &model.Task{
ID: 1,
ExecutionID: 1,
}, nil
}
return nil, nil
}
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {
return []byte("success"), nil
}
type fakedPolicyManager struct{}
func (f *fakedPolicyManager) Create(*model.Policy) (int64, error) {
return 0, nil
}
func (f *fakedPolicyManager) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
return 0, nil, nil
}
func (f *fakedPolicyManager) Get(id int64) (*model.Policy, error) {
if id == 1 {
return &model.Policy{
ID: 1,
SrcRegistryID: 1,
SrcNamespaces: []string{"library"},
DestRegistryID: 2,
}, nil
}
return nil, nil
}
func (f *fakedPolicyManager) Update(*model.Policy, ...string) error {
return nil
}
func (f *fakedPolicyManager) Remove(int64) error {
return nil
}
func TestListExecutions(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 200
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}
func TestCreateExecution(t *testing.T) {
operationCtl := ng.OperationCtl
policyMgr := ng.PolicyMgr
defer func() {
ng.OperationCtl = operationCtl
ng.PolicyMgr = policyMgr
}()
ng.OperationCtl = &fakedOperationController{}
ng.PolicyMgr = &fakedPolicyManager{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/replication/executions",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/replication/executions",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/replication/executions",
bodyJSON: &model.Execution{
PolicyID: 2,
},
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 201
{
request: &testingRequest{
method: http.MethodPost,
url: "/api/replication/executions",
bodyJSON: &model.Execution{
PolicyID: 1,
},
credential: sysAdmin,
},
code: http.StatusCreated,
},
}
runCodeCheckingCases(t, cases...)
}
func TestStopExecution(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodPut,
url: "/api/replication/executions/1",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodPut,
url: "/api/replication/executions/1",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404
{
request: &testingRequest{
method: http.MethodPut,
url: "/api/replication/executions/2",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 200
{
request: &testingRequest{
method: http.MethodPut,
url: "/api/replication/executions/1",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}
func TestListTasks(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/2/tasks",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 200
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}
func TestGetTaskLog(t *testing.T) {
operationCtl := ng.OperationCtl
defer func() {
ng.OperationCtl = operationCtl
}()
ng.OperationCtl = &fakedOperationController{}
cases := []*codeCheckingCase{
// 401
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks/1/log",
},
code: http.StatusUnauthorized,
},
// 403
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks/1/log",
credential: nonSysAdmin,
},
code: http.StatusForbidden,
},
// 404, execution not found
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/2/tasks/1/log",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 404, task not found
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks/2/log",
credential: sysAdmin,
},
code: http.StatusNotFound,
},
// 200
{
request: &testingRequest{
method: http.MethodGet,
url: "/api/replication/executions/1/tasks/1/log",
credential: sysAdmin,
},
code: http.StatusOK,
},
}
runCodeCheckingCases(t, cases...)
}

View File

@ -101,6 +101,10 @@ func initRouters() {
beego.Router("/api/replication/adapters", &api.ReplicationAdapterAPI{}, "get:List")
beego.Router("/api/replication/adapters/:type", &api.ReplicationAdapterAPI{}, "get:Get")
beego.Router("/api/replication/executions", &api.ReplicationOperationAPI{}, "get:ListExecutions;post:CreateExecution")
beego.Router("/api/replication/executions/:id([0-9]+)", &api.ReplicationOperationAPI{}, "put:StopExecution")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks", &api.ReplicationOperationAPI{}, "get:ListTasks")
beego.Router("/api/replication/executions/:id([0-9]+)/tasks/:tid([0-9]+)/log", &api.ReplicationOperationAPI{}, "get:GetTaskLog")
beego.Router("/api/internal/configurations", &api.ConfigAPI{}, "get:GetInternalConfig;put:Put")
beego.Router("/api/configurations", &api.ConfigAPI{}, "get:Get;put:Put")

View File

@ -28,6 +28,7 @@ type Controller interface {
ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error)
GetExecution(int64) (*model.Execution, error)
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
GetTask(int64) (*model.Task, error)
GetTaskLog(int64) ([]byte, error)
}
@ -59,6 +60,9 @@ func (d *defaultController) GetExecution(executionID int64) (*model.Execution, e
func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) {
return d.executionMgr.ListTasks(query...)
}
func (d *defaultController) GetTask(id int64) (*model.Task, error) {
return d.executionMgr.GetTask(id)
}
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID)
}

View File

@ -19,19 +19,22 @@ package ng
import (
"fmt"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
var (
// PolicyMgr is a global policy manager
PolicyMgr policy.Manager
// RegistryMgr is a global registry manager
RegistryMgr registry.Manager
// ExecutionMgr is a global execution manager
ExecutionMgr execution.Manager
// OperationCtl is a global operation controller
OperationCtl operation.Controller
)
@ -40,16 +43,17 @@ var (
func Init() error {
// Init registry manager
RegistryMgr = registry.NewDefaultManager()
// TODO init PolicyMgr
// TODO init ExecutionMgr
var executionMgr execution.Manager
// TODO init scheduler
var scheduler scheduler.Scheduler
flowCtl, err := flow.NewController(RegistryMgr, ExecutionMgr, scheduler)
flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler)
if err != nil {
return fmt.Errorf("failed to create the flow controller: %v", err)
}
OperationCtl = operation.NewController(flowCtl, ExecutionMgr)
OperationCtl = operation.NewController(flowCtl, executionMgr)
return nil
}