Provide the function to update extra attributes in the task manager

Provide the function to update extra attributes in the task manager

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2020-12-24 10:37:32 +08:00 committed by Ziming
parent 7b4c4b76e7
commit 923a538570
6 changed files with 60 additions and 10 deletions

View File

@ -55,10 +55,11 @@ type Scheduler interface {
// The callback function needs to be registered first // The callback function needs to be registered first
// The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.), // The "vendorType" specifies the type of vendor (e.g. replication, scan, gc, retention, etc.),
// and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention). // and the "vendorID" specifies the ID of vendor if needed(e.g. policy ID for replication and retention).
// The "params" is passed to the callback function as encoded json string, so the callback // The "callbackFuncParams" is passed to the callback function as encoded json string, so the callback
// function must decode it before using // function must decode it before using
// The customized attributes can be put into the "extraAttrs"
Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string,
cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error) cron string, callbackFuncName string, callbackFuncParams interface{}, extraAttrs map[string]interface{}) (int64, error)
// UnScheduleByID the schedule specified by ID // UnScheduleByID the schedule specified by ID
UnScheduleByID(ctx context.Context, id int64) error UnScheduleByID(ctx context.Context, id int64) error
// UnScheduleByVendor the schedule specified by vendor // UnScheduleByVendor the schedule specified by vendor
@ -85,7 +86,7 @@ type scheduler struct {
} }
func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string,
cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error) { cron string, callbackFuncName string, callbackFuncParams interface{}, extraAttrs map[string]interface{}) (int64, error) {
if len(vendorType) == 0 { if len(vendorType) == 0 {
return 0, fmt.Errorf("empty vendor type") return 0, fmt.Errorf("empty vendor type")
} }
@ -108,13 +109,13 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in
UpdateTime: now, UpdateTime: now,
} }
paramsData, err := json.Marshal(params) paramsData, err := json.Marshal(callbackFuncParams)
if err != nil { if err != nil {
return 0, err return 0, err
} }
sched.CallbackFuncParam = string(paramsData) sched.CallbackFuncParam = string(paramsData)
extrasData, err := json.Marshal(extras) extrasData, err := json.Marshal(extraAttrs)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -145,3 +145,17 @@ func (_m *mockTaskManager) Stop(ctx context.Context, id int64) error {
return r0 return r0
} }
// UpdateExtraAttrs provides a mock function with given fields: ctx, id, extraAttrs
func (_m *mockTaskManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {
ret := _m.Called(ctx, id, extraAttrs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int64, map[string]interface{}) error); ok {
r0 = rf(ctx, id, extraAttrs)
} else {
r0 = ret.Error(0)
}
return r0
}

View File

@ -49,6 +49,8 @@ type Manager interface {
// List the tasks according to the query // List the tasks according to the query
// Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"' // Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"'
List(ctx context.Context, query *q.Query) (tasks []*Task, err error) List(ctx context.Context, query *q.Query) (tasks []*Task, err error)
// Update the extra attributes of the specified task
UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) (err error)
// Get the log of the specified task // Get the log of the specified task
GetLog(ctx context.Context, id int64) (log []byte, err error) GetLog(ctx context.Context, id int64) (log []byte, err error)
// Count counts total of tasks according to the query. // Count counts total of tasks according to the query.
@ -210,6 +212,18 @@ func (m *manager) List(ctx context.Context, query *q.Query) ([]*Task, error) {
return ts, nil return ts, nil
} }
func (m *manager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {
data, err := json.Marshal(extraAttrs)
if err != nil {
return err
}
return m.dao.Update(ctx, &dao.Task{
ID: id,
ExtraAttrs: string(data),
UpdateTime: time.Time{},
}, "ExtraAttrs", "UpdateTime")
}
func (m *manager) GetLog(ctx context.Context, id int64) ([]byte, error) { func (m *manager) GetLog(ctx context.Context, id int64) ([]byte, error) {
task, err := m.dao.Get(ctx, id) task, err := m.dao.Get(ctx, id)
if err != nil { if err != nil {

View File

@ -125,6 +125,13 @@ func (t *taskManagerTestSuite) TestGet() {
t.dao.AssertExpectations(t.T()) t.dao.AssertExpectations(t.T())
} }
func (t *taskManagerTestSuite) TestUpdateExtraAttrs() {
t.dao.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := t.mgr.UpdateExtraAttrs(nil, 1, map[string]interface{}{})
t.Require().Nil(err)
t.dao.AssertExpectations(t.T())
}
func (t *taskManagerTestSuite) TestList() { func (t *taskManagerTestSuite) TestList() {
t.dao.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{ t.dao.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{ {

View File

@ -62,20 +62,20 @@ func (_m *Scheduler) ListSchedules(ctx context.Context, query *q.Query) ([]*sche
return r0, r1 return r0, r1
} }
// Schedule provides a mock function with given fields: ctx, vendorType, vendorID, cronType, cron, callbackFuncName, params, extras // Schedule provides a mock function with given fields: ctx, vendorType, vendorID, cronType, cron, callbackFuncName, callbackFuncParams, extraAttrs
func (_m *Scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error) { func (_m *Scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, cron string, callbackFuncName string, callbackFuncParams interface{}, extraAttrs map[string]interface{}) (int64, error) {
ret := _m.Called(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, params, extras) ret := _m.Called(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, callbackFuncParams, extraAttrs)
var r0 int64 var r0 int64
if rf, ok := ret.Get(0).(func(context.Context, string, int64, string, string, string, interface{}, map[string]interface{}) int64); ok { if rf, ok := ret.Get(0).(func(context.Context, string, int64, string, string, string, interface{}, map[string]interface{}) int64); ok {
r0 = rf(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, params, extras) r0 = rf(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, callbackFuncParams, extraAttrs)
} else { } else {
r0 = ret.Get(0).(int64) r0 = ret.Get(0).(int64)
} }
var r1 error var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, int64, string, string, string, interface{}, map[string]interface{}) error); ok { if rf, ok := ret.Get(1).(func(context.Context, string, int64, string, string, string, interface{}, map[string]interface{}) error); ok {
r1 = rf(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, params, extras) r1 = rf(ctx, vendorType, vendorID, cronType, cron, callbackFuncName, callbackFuncParams, extraAttrs)
} else { } else {
r1 = ret.Error(1) r1 = ret.Error(1)
} }

View File

@ -147,3 +147,17 @@ func (_m *Manager) Stop(ctx context.Context, id int64) error {
return r0 return r0
} }
// UpdateExtraAttrs provides a mock function with given fields: ctx, id, extraAttrs
func (_m *Manager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error {
ret := _m.Called(ctx, id, extraAttrs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int64, map[string]interface{}) error); ok {
r0 = rf(ctx, id, extraAttrs)
} else {
r0 = ret.Error(0)
}
return r0
}