Implement the operation controller

This commit implements the operation controller. The operation controller wraps the flow controller and execution manager to provide capabilities for the upper level

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-02-21 16:52:53 +08:00
parent 60a6113323
commit 8894a27d2d
6 changed files with 277 additions and 2 deletions

View File

@ -41,7 +41,7 @@ type Manager interface {
GetTask(int64) (*model.Task, error)
// Update the task, the "props" are the properties of task
// that need to be updated, it cannot include "status". If
// you want to update the status, use "UpdateTask" instead
// you want to update the status, use "UpdateTaskStatus" instead
UpdateTask(task *model.Task, props ...string) error
// UpdateTaskStatus only updates the task status. If "statusCondition"
// presents, only the tasks whose status equal to "statusCondition"

View File

@ -55,7 +55,7 @@ type defaultController struct {
scheduler scheduler.Scheduler
}
// Replicate according the to policy ID
// Start a replication according to the policy
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
log.Infof("starting the replication based on the policy %d ...", policy.ID)

View File

@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// Controller handles the replication-related operations: start,
// stop, query, etc.
type Controller interface {
StartReplication(policy *model.Policy) (int64, error)
StopReplication(int64) error
ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error)
GetExecution(int64) (*model.Execution, error)
ListTasks(...*model.TaskQuery) (int64, []*model.Task, error)
GetTaskLog(int64) ([]byte, error)
}
// NewController returns a controller implementation
func NewController(flowCtl flow.Controller, executionMgr execution.Manager) Controller {
return &defaultController{
flowCtl: flowCtl,
executionMgr: executionMgr,
}
}
type defaultController struct {
flowCtl flow.Controller
executionMgr execution.Manager
}
func (d *defaultController) StartReplication(policy *model.Policy) (int64, error) {
return d.flowCtl.StartReplication(policy)
}
func (d *defaultController) StopReplication(executionID int64) error {
return d.flowCtl.StopReplication(executionID)
}
func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return d.executionMgr.List(query...)
}
func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) {
return d.executionMgr.Get(executionID)
}
func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) {
return d.executionMgr.ListTasks(query...)
}
func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) {
return d.executionMgr.GetTaskLog(taskID)
}

View File

@ -0,0 +1,126 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operation
import (
"testing"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakedFlowController struct{}
func (f *fakedFlowController) StartReplication(policy *model.Policy) (int64, error) {
return 1, nil
}
func (f *fakedFlowController) StopReplication(int64) error {
return nil
}
type fakedExecutionManager struct{}
func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) {
return 1, []*model.Execution{
{
ID: 1,
},
}, nil
}
func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) {
return &model.Execution{
ID: 1,
}, nil
}
func (f *fakedExecutionManager) Update(*model.Execution, ...string) error {
return nil
}
func (f *fakedExecutionManager) Remove(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) {
return 1, nil
}
func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) {
return 1, []*model.Task{
{
ID: 1,
},
}, nil
}
func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) {
return nil, nil
}
func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error {
return nil
}
func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error {
return nil
}
func (f *fakedExecutionManager) RemoveTask(int64) error {
return nil
}
func (f *fakedExecutionManager) RemoveAllTasks(int64) error {
return nil
}
func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
return []byte("message"), nil
}
var ctl = NewController(&fakedFlowController{}, &fakedExecutionManager{})
func TestStartReplication(t *testing.T) {
id, err := ctl.StartReplication(nil)
require.Nil(t, err)
assert.Equal(t, int64(1), id)
}
func TestStopReplication(t *testing.T) {
err := ctl.StopReplication(1)
require.Nil(t, err)
}
func TestListExecutions(t *testing.T) {
n, executions, err := ctl.ListExecutions()
require.Nil(t, err)
assert.Equal(t, int64(1), n)
assert.Equal(t, int64(1), executions[0].ID)
}
func TestGetExecution(t *testing.T) {
execution, err := ctl.GetExecution(1)
require.Nil(t, err)
assert.Equal(t, int64(1), execution.ID)
}
func TestListTasks(t *testing.T) {
n, tasks, err := ctl.ListTasks()
require.Nil(t, err)
assert.Equal(t, int64(1), n)
assert.Equal(t, int64(1), tasks[0].ID)
}
func TestGetTaskLog(t *testing.T) {
log, err := ctl.GetTaskLog(1)
require.Nil(t, err)
assert.Equal(t, "message", string(log))
}

View File

@ -0,0 +1,54 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package ng ...
// TODO rename the package name after removing ng
package ng
import (
"fmt"
"github.com/goharbor/harbor/src/replication/ng/scheduler"
"github.com/goharbor/harbor/src/replication/ng/execution"
"github.com/goharbor/harbor/src/replication/ng/flow"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/registry"
)
var (
// RegistryMgr is a global registry manager
RegistryMgr registry.Manager
// ExecutionMgr is a global execution manager
ExecutionMgr execution.Manager
// OperationCtl is a global operation controller
OperationCtl operation.Controller
)
// Init the global variables
func Init() error {
// TODO init RegistryMgr
// TODO init ExecutionMgr
// TODO init scheduler
var scheduler scheduler.Scheduler
flowCtl, err := flow.NewController(RegistryMgr, ExecutionMgr, scheduler)
if err != nil {
return fmt.Errorf("failed to create the flow controller: %v", err)
}
OperationCtl = operation.NewController(flowCtl, ExecutionMgr)
return nil
}

View File

@ -0,0 +1,31 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package ng ...
// TODO rename the package name after removing ng
package ng
import (
"testing"
// "github.com/stretchr/testify/assert"
// "github.com/stretchr/testify/require"
)
func TestInit(t *testing.T) {
// TODO add testing code
// err := Init()
// require.Nil(t, err)
// assert.NotNil(t, OperationCtl)
// TODO add check for RegistryMgr and ExecutionMgr
}