Implement immediate trigger and the methods of WatchList

This commit is contained in:
Wenkai Yin 2017-11-21 13:08:09 +08:00
parent 59c1160edd
commit 6b0ee138e5
18 changed files with 435 additions and 64 deletions

View File

@ -184,6 +184,17 @@ create table replication_job (
INDEX policy (policy_id),
INDEX poid_uptime (policy_id, update_time)
);
create table replication_immediate_trigger (
id int NOT NULL AUTO_INCREMENT,
policy_id int NOT NULL,
namespace varchar(256) NOT NULL,
on_push tinyint(1) NOT NULL DEFAULT 0,
on_deletion tinyint(1) NOT NULL DEFAULT 0,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
create table img_scan_job (
id int NOT NULL AUTO_INCREMENT,

View File

@ -176,6 +176,16 @@ create table replication_job (
update_time timestamp default CURRENT_TIMESTAMP
);
create table replication_immediate_trigger (
id INTEGER PRIMARY KEY,
policy_id int NOT NULL,
namespace varchar(256) NOT NULL,
on_push tinyint(1) NOT NULL DEFAULT 0,
on_deletion tinyint(1) NOT NULL DEFAULT 0,
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp default CURRENT_TIMESTAMP
);
create table img_scan_job (
id INTEGER PRIMARY KEY,

View File

@ -0,0 +1,62 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"time"
"github.com/vmware/harbor/src/common/models"
)
// DefaultDatabaseWatchItemDAO is an instance of DatabaseWatchItemDAO
var DefaultDatabaseWatchItemDAO WatchItemDAO = &DatabaseWatchItemDAO{}
// WatchItemDAO defines operations about WatchItem
type WatchItemDAO interface {
Add(*models.WatchItem) (int64, error)
DeleteByPolicyID(int64) error
Get(namespace, operation string) ([]models.WatchItem, error)
}
// DatabaseWatchItemDAO implements interface WatchItemDAO for database
type DatabaseWatchItemDAO struct{}
// Add a WatchItem
func (d *DatabaseWatchItemDAO) Add(item *models.WatchItem) (int64, error) {
now := time.Now()
item.CreationTime = now
item.UpdateTime = now
return GetOrmer().Insert(item)
}
// DeleteByPolicyID deletes the WatchItem specified by policy ID
func (d *DatabaseWatchItemDAO) DeleteByPolicyID(policyID int64) error {
_, err := GetOrmer().QueryTable(&models.WatchItem{}).Filter("PolicyID", policyID).Delete()
return err
}
// Get returns WatchItem list according to the namespace and operation
func (d *DatabaseWatchItemDAO) Get(namespace, operation string) ([]models.WatchItem, error) {
qs := GetOrmer().QueryTable(&models.WatchItem{}).Filter("Namespace", namespace)
if operation == "push" {
qs = qs.Filter("OnPush", 1)
} else if operation == "delete" {
qs = qs.Filter("OnDeletion", 1)
}
items := []models.WatchItem{}
_, err := qs.All(&items)
return items, err
}

View File

@ -0,0 +1,71 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware/harbor/src/common/models"
)
func TestMethodsOfWatchItem(t *testing.T) {
targetID, err := AddRepTarget(models.RepTarget{
Name: "test_target_for_watch_item",
URL: "http://127.0.0.1",
})
require.Nil(t, err)
defer DeleteRepTarget(targetID)
policyID, err := AddRepPolicy(models.RepPolicy{
Name: "test_policy_for_watch_item",
ProjectID: 1,
TargetID: targetID,
})
require.Nil(t, err)
defer DeleteRepPolicy(policyID)
item := &models.WatchItem{
PolicyID: policyID,
Namespace: "library",
OnPush: false,
OnDeletion: true,
}
// test Add
id, err := DefaultDatabaseWatchItemDAO.Add(item)
require.Nil(t, err)
// test Get: operation-push
items, err := DefaultDatabaseWatchItemDAO.Get("library", "push")
require.Nil(t, err)
assert.Equal(t, 0, len(items))
// test Get: operation-delete
items, err = DefaultDatabaseWatchItemDAO.Get("library", "delete")
require.Nil(t, err)
assert.Equal(t, 1, len(items))
assert.Equal(t, id, items[0].ID)
assert.Equal(t, "library", items[0].Namespace)
assert.True(t, items[0].OnDeletion)
// test DeleteByPolicyID
err = DefaultDatabaseWatchItemDAO.DeleteByPolicyID(policyID)
require.Nil(t, err)
items, err = DefaultDatabaseWatchItemDAO.Get("library", "delete")
require.Nil(t, err)
assert.Equal(t, 0, len(items))
}

View File

@ -29,5 +29,6 @@ func init() {
new(ScanJob),
new(RepoRecord),
new(ImgScanOverview),
new(ClairVulnTimestamp))
new(ClairVulnTimestamp),
new(WatchItem))
}

View File

@ -0,0 +1,35 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"time"
)
// WatchItem ...
type WatchItem struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
Namespace string `orm:"column(namespace)" json:"namespace"`
OnDeletion bool `orm:"column(on_deletion)" json:"on_deletion"`
OnPush bool `orm:"column(on_push)" json:"on_push"`
CreationTime time.Time `orm:"column(creation_time)" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time)" json:"update_time"`
}
//TableName ...
func (w *WatchItem) TableName() string {
return "replication_immediate_trigger"
}

View File

@ -8,22 +8,16 @@ const (
//FilterItemKindTag : Kind of filter item is 'tag'
FilterItemKindTag = "tag"
//TODO: Refactor constants
//TriggerKindManually : kind of trigger is 'manully'
TriggerKindManually = "manually"
//TriggerKindImmediately : kind of trigger is 'immediately'
TriggerKindImmediately = "immediately"
//AdaptorKindHarbor : Kind of adaptor of Harbor
AdaptorKindHarbor = "Harbor"
//TriggerKindImmediate : Kind of trigger is 'Immediate'
TriggerKindImmediate = "Immediate"
TriggerKindImmediate = "immediate"
//TriggerKindSchedule : Kind of trigger is 'Schedule'
TriggerKindSchedule = "Schedule"
TriggerKindSchedule = "schedule"
//TriggerKindManual : Kind of trigger is 'Manual'
TriggerKindManual = "Manual"
TriggerKindManual = "manual"
//TriggerScheduleDaily : type of scheduling is 'daily'
TriggerScheduleDaily = "daily"
//TriggerScheduleWeekly : type of scheduling is 'weekly'

View File

@ -55,7 +55,6 @@ func (ctl *Controller) Init() error {
//Build query parameters
triggerNames := []string{
replication.TriggerKindImmediate,
replication.TriggerKindSchedule,
}
queryName := ""
@ -73,7 +72,7 @@ func (ctl *Controller) Init() error {
}
if policies != nil && len(policies) > 0 {
for _, policy := range policies {
if err := ctl.triggerManager.SetupTrigger(policy.ID, *policy.Trigger); err != nil {
if err := ctl.triggerManager.SetupTrigger(&policy); err != nil {
//TODO: Log error
fmt.Printf("Error: %s", err)
//TODO:Update the status of policy
@ -96,7 +95,8 @@ func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) (int64,
return 0, err
}
if err = ctl.triggerManager.SetupTrigger(id, *newPolicy.Trigger); err != nil {
newPolicy.ID = id
if err = ctl.triggerManager.SetupTrigger(&newPolicy); err != nil {
return 0, err
}
@ -118,15 +118,34 @@ func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) erro
return fmt.Errorf("policy %d not found", id)
}
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil {
return err
reset := false
if updatedPolicy.Trigger.Kind != originPolicy.Trigger.Kind {
reset = true
} else {
switch updatedPolicy.Trigger.Kind {
case replication.TriggerKindSchedule:
if updatedPolicy.Trigger.Param != originPolicy.Trigger.Param {
reset = true
}
case replication.TriggerKindImmediate:
// Always reset immediate trigger as it is relevent with namespaces
reset = true
default:
// manual trigger, no need to reset
}
}
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
return err
if reset {
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil {
return err
}
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
return err
}
return ctl.triggerManager.SetupTrigger(&updatedPolicy)
}
return ctl.triggerManager.SetupTrigger(id, *updatedPolicy.Trigger)
return ctl.policyManager.UpdatePolicy(updatedPolicy)
}
//RemovePolicy will remove the specified policy and clean the related settings

View File

@ -14,6 +14,7 @@ type ReplicationPolicy struct {
Trigger *Trigger //The trigger of the replication
ProjectIDs []int64 //Projects attached to this policy
TargetIDs []int64
Namespaces []string // The namespaces are used to set immediate trigger
CreationTime time.Time
UpdateTime time.Time
}

View File

@ -18,8 +18,8 @@ type Trigger struct {
// Valid ...
func (t *Trigger) Valid(v *validation.Validation) {
if !(t.Kind == replication.TriggerKindImmediately ||
t.Kind == replication.TriggerKindManually ||
if !(t.Kind == replication.TriggerKindImmediate ||
t.Kind == replication.TriggerKindManual ||
t.Kind == replication.TriggerKindSchedule) {
v.SetError("kind", fmt.Sprintf("invalid trigger kind: %s", t.Kind))
}

View File

@ -1,8 +1,6 @@
package trigger
import (
"errors"
"github.com/vmware/harbor/src/replication"
)
@ -26,19 +24,20 @@ func (st *ImmediateTrigger) Kind() string {
//Setup is the implementation of same method defined in Trigger interface
func (st *ImmediateTrigger) Setup() error {
if st.params.PolicyID <= 0 || len(st.params.Namespace) == 0 {
return errors.New("Invalid parameters for Immediate trigger")
}
//TODO: Need more complicated logic here to handle partial updates
wt := WatchItem{
PolicyID: st.params.PolicyID,
Namespace: st.params.Namespace,
OnDeletion: st.params.OnDeletion,
OnPush: true,
}
for _, namespace := range st.params.Namespaces {
wt := WatchItem{
PolicyID: st.params.PolicyID,
Namespace: namespace,
OnDeletion: st.params.OnDeletion,
OnPush: true,
}
return DefaultWatchList.Add(wt)
if err := DefaultWatchList.Add(wt); err != nil {
return err
}
}
return nil
}
//Unset is the implementation of same method defined in Trigger interface

View File

@ -2,7 +2,9 @@ package trigger
import (
"errors"
"fmt"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
)
@ -50,25 +52,24 @@ func (m *Manager) RemoveTrigger(policyID int64) error {
}
*/
//SetupTrigger will create the new trigger based on the provided json parameters.
//SetupTrigger will create the new trigger based on the provided policy.
//If failed, an error will be returned.
func (m *Manager) SetupTrigger(policyID int64, trigger models.Trigger) error {
if policyID <= 0 {
return errors.New("Invalid policy ID")
}
if len(trigger.Kind) == 0 {
return errors.New("Invalid replication trigger definition")
func (m *Manager) SetupTrigger(policy *models.ReplicationPolicy) error {
if policy == nil || policy.Trigger == nil {
log.Debug("empty policy or trigger, skip trigger setup")
return nil
}
trigger := policy.Trigger
switch trigger.Kind {
case replication.TriggerKindSchedule:
param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion
newTrigger := NewScheduleTrigger(param)
if err := newTrigger.Setup(); err != nil {
@ -79,16 +80,19 @@ func (m *Manager) SetupTrigger(policyID int64, trigger models.Trigger) error {
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
//Append policy ID and whether replicate deletion
param.PolicyID = policy.ID
param.OnDeletion = policy.ReplicateDeletion
param.Namespaces = policy.Namespaces
newTrigger := NewImmediateTrigger(param)
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindManual:
// do nothing
default:
//Treat as manual trigger
break
return fmt.Errorf("invalid trigger type: %s", policy.Trigger.Kind)
}
return nil

View File

@ -1,9 +1,5 @@
package trigger
import (
"errors"
)
//NOTES: Whether replicate the existing images when the type of trigger is
//'Immediate' is a once-effective setting which will not be persisted
// and kept as one parameter of 'Immediate' trigger. It will only be
@ -14,13 +10,13 @@ type ImmediateParam struct {
//Basic parameters
BasicParam
//Namepace
Namespace string
//Namepaces
Namespaces []string
}
//Parse is the implementation of same method in TriggerParam interface
//NOTES: No need to implement this method for 'Immediate' trigger as
//it does not have any parameters with json format.
func (ip ImmediateParam) Parse(param string) error {
return errors.New("Should NOT be called as it's not implemented")
return nil
}

View File

@ -1,5 +1,10 @@
package trigger
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
)
//DefaultWatchList is the default instance of WatchList
var DefaultWatchList = &WatchList{}
@ -24,15 +29,37 @@ type WatchItem struct {
//Add item to the list and persist into DB
func (wl *WatchList) Add(item WatchItem) error {
return nil
_, err := dao.DefaultDatabaseWatchItemDAO.Add(
&models.WatchItem{
PolicyID: item.PolicyID,
Namespace: item.Namespace,
OnPush: item.OnPush,
OnDeletion: item.OnDeletion,
})
return err
}
//Remove the specified watch item from list
func (wl *WatchList) Remove(policyID int64) error {
return nil
return dao.DefaultDatabaseWatchItemDAO.DeleteByPolicyID(policyID)
}
//Get the watch items according to the namespace and operation
func (wl *WatchList) Get(namespace, operation string) ([]WatchItem, error) {
return []WatchItem{}, nil
items, err := dao.DefaultDatabaseWatchItemDAO.Get(namespace, operation)
if err != nil {
return nil, err
}
watchItems := []WatchItem{}
for _, item := range items {
watchItems = append(watchItems, WatchItem{
PolicyID: item.PolicyID,
Namespace: item.Namespace,
OnPush: item.OnPush,
OnDeletion: item.OnDeletion,
})
}
return watchItems, nil
}

View File

@ -0,0 +1,108 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trigger
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
)
type fakeWatchItemDAO struct {
items []models.WatchItem
}
func (f *fakeWatchItemDAO) Add(item *models.WatchItem) (int64, error) {
f.items = append(f.items, *item)
return int64(len(f.items) + 1), nil
}
// Delete the WatchItem specified by policy ID
func (f *fakeWatchItemDAO) DeleteByPolicyID(policyID int64) error {
for i, item := range f.items {
if item.PolicyID == policyID {
f.items = append(f.items[:i], f.items[i+1:]...)
break
}
}
return nil
}
// Get returns WatchItem list according to the namespace and operation
func (f *fakeWatchItemDAO) Get(namespace, operation string) ([]models.WatchItem, error) {
items := []models.WatchItem{}
for _, item := range f.items {
if item.Namespace != namespace {
continue
}
if operation == "push" {
if item.OnPush {
items = append(items, item)
}
}
if operation == "delete" {
if item.OnDeletion {
items = append(items, item)
}
}
}
return items, nil
}
func TestMethodsOfWatchList(t *testing.T) {
dao.DefaultDatabaseWatchItemDAO = &fakeWatchItemDAO{}
var policyID int64 = 1
// test Add
item := WatchItem{
PolicyID: policyID,
Namespace: "library",
OnDeletion: true,
OnPush: false,
}
err := DefaultWatchList.Add(item)
require.Nil(t, err)
// test Get: non-exist namespace
items, err := DefaultWatchList.Get("non-exist-namespace", "delete")
require.Nil(t, err)
assert.Equal(t, 0, len(items))
// test Get: non-exist operation
items, err = DefaultWatchList.Get("library", "non-exist-operation")
require.Nil(t, err)
assert.Equal(t, 0, len(items))
// test Get: valid params
items, err = DefaultWatchList.Get("library", "delete")
require.Nil(t, err)
assert.Equal(t, 1, len(items))
assert.Equal(t, policyID, items[0].PolicyID)
// test Remove
err = DefaultWatchList.Remove(policyID)
require.Nil(t, err)
items, err = DefaultWatchList.Get("library", "delete")
require.Nil(t, err)
assert.Equal(t, 0, len(items))
}

View File

@ -113,15 +113,16 @@ func (pa *RepPolicyAPI) Post() {
// check the existence of projects
for _, project := range policy.Projects {
exist, err := pa.ProjectMgr.Exists(project.ProjectID)
pro, err := pa.ProjectMgr.Get(project.ProjectID)
if err != nil {
pa.ParseAndHandleError(fmt.Sprintf("failed to check the existence of project %d", project.ProjectID), err)
return
}
if !exist {
if pro == nil {
pa.HandleNotFound(fmt.Sprintf("project %d not found", project.ProjectID))
return
}
project.Name = pro.Name
}
// check the existence of targets
@ -168,6 +169,34 @@ func (pa *RepPolicyAPI) Put() {
policy.ID = id
// check the existence of projects
for _, project := range policy.Projects {
pro, err := pa.ProjectMgr.Get(project.ProjectID)
if err != nil {
pa.ParseAndHandleError(fmt.Sprintf("failed to check the existence of project %d", project.ProjectID), err)
return
}
if pro == nil {
pa.HandleNotFound(fmt.Sprintf("project %d not found", project.ProjectID))
return
}
project.Name = pro.Name
}
// check the existence of targets
for _, target := range policy.Targets {
t, err := dao.GetRepTarget(target.ID)
if err != nil {
pa.HandleInternalServerError(fmt.Sprintf("failed to get target %d: %v", target.ID, err))
return
}
if t == nil {
pa.HandleNotFound(fmt.Sprintf("target %d not found", target.ID))
return
}
}
if err = core.DefaultController.UpdatePolicy(convertToRepPolicy(policy)); err != nil {
pa.HandleInternalServerError(fmt.Sprintf("failed to update policy %d: %v", id, err))
return
@ -274,6 +303,7 @@ func convertToRepPolicy(policy *api_models.ReplicationPolicy) rep_models.Replica
for _, project := range policy.Projects {
ply.ProjectIDs = append(ply.ProjectIDs, project.ProjectID)
ply.Namespaces = append(ply.Namespaces, project.Name)
}
for _, target := range policy.Targets {

View File

@ -189,7 +189,7 @@ func TestRepPolicyAPIPost(t *testing.T) {
},
},
Trigger: &rep_models.Trigger{
Kind: replication.TriggerKindManually,
Kind: replication.TriggerKindManual,
},
},
credential: sysAdmin,
@ -220,7 +220,7 @@ func TestRepPolicyAPIPost(t *testing.T) {
},
},
Trigger: &rep_models.Trigger{
Kind: replication.TriggerKindManually,
Kind: replication.TriggerKindManual,
},
},
credential: sysAdmin,
@ -251,7 +251,7 @@ func TestRepPolicyAPIPost(t *testing.T) {
},
},
Trigger: &rep_models.Trigger{
Kind: replication.TriggerKindManually,
Kind: replication.TriggerKindManual,
},
},
credential: sysAdmin,
@ -412,7 +412,7 @@ func TestRepPolicyAPIPut(t *testing.T) {
},
},
Trigger: &rep_models.Trigger{
Kind: replication.TriggerKindImmediately,
Kind: replication.TriggerKindImmediate,
},
},
credential: sysAdmin,
@ -477,6 +477,7 @@ func TestConvertToRepPolicy(t *testing.T) {
Projects: []*models.Project{
&models.Project{
ProjectID: 1,
Name: "library",
},
},
Targets: []*models.RepTarget{
@ -501,6 +502,7 @@ func TestConvertToRepPolicy(t *testing.T) {
Param: "{param}",
},
ProjectIDs: []int64{1},
Namespaces: []string{"library"},
TargetIDs: []int64{1},
},
},

View File

@ -61,3 +61,4 @@ Changelog for harbor database schema
- add column `filters` to table `replication_policy`
- add column `replicate_deletion` to table `replication_policy`
- create table `replication_immediate_trigger`