Define the related interfaces for triggers and core controllers of replication service

This commit is contained in:
Steven Zou 2017-11-09 22:27:54 +08:00
parent 41c0ff66ce
commit c2e0c8d1f2
22 changed files with 1020 additions and 18 deletions

View File

@ -0,0 +1,13 @@
package notifier
import (
"github.com/vmware/harbor/src/replication/event"
)
//Subscribe related topics
func init() {
//Listen the related event topics
Subscribe(event.StartReplicationTopic, &event.StartReplicationHandler{})
Subscribe(event.ReplicationEventTopicOnPush, &event.OnPushHandler{})
Subscribe(event.ReplicationEventTopicOnDeletion, &event.OnDeletionHandler{})
}

View File

@ -39,6 +39,7 @@ func (fsh *fakeStatelessHandler) Handle(v interface{}) error {
} }
func TestSubscribeAndUnSubscribe(t *testing.T) { func TestSubscribeAndUnSubscribe(t *testing.T) {
count := len(notificationWatcher.handlers)
err := Subscribe("topic1", &fakeStatefulHandler{0}) err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -59,7 +60,7 @@ func TestSubscribeAndUnSubscribe(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(notificationWatcher.handlers) != 2 { if len(notificationWatcher.handlers) != (count + 2) {
t.Fail() t.Fail()
} }
@ -94,7 +95,7 @@ func TestSubscribeAndUnSubscribe(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(notificationWatcher.handlers) != 1 { if len(notificationWatcher.handlers) != (count + 1) {
t.Fail() t.Fail()
} }
@ -103,12 +104,13 @@ func TestSubscribeAndUnSubscribe(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(notificationWatcher.handlers) != 0 { if len(notificationWatcher.handlers) != count {
t.Fail() t.Fail()
} }
} }
func TestPublish(t *testing.T) { func TestPublish(t *testing.T) {
count := len(notificationWatcher.handlers)
err := Subscribe("topic1", &fakeStatefulHandler{0}) err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -119,7 +121,7 @@ func TestPublish(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(notificationWatcher.handlers) != 2 { if len(notificationWatcher.handlers) != (count + 2) {
t.Fail() t.Fail()
} }
@ -149,12 +151,13 @@ func TestPublish(t *testing.T) {
} }
func TestConcurrentPublish(t *testing.T) { func TestConcurrentPublish(t *testing.T) {
count := len(notificationWatcher.handlers)
err := Subscribe("topic1", &fakeStatefulHandler{0}) err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(notificationWatcher.handlers) != 1 { if len(notificationWatcher.handlers) != (count + 1) {
t.Fail() t.Fail()
} }
@ -186,11 +189,12 @@ func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) {
t.Fatal("Policy scheduler is not started") t.Fatal("Policy scheduler is not started")
} }
count := len(notificationWatcher.handlers)
if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil { if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
if len(notificationWatcher.handlers) != 1 { if len(notificationWatcher.handlers) != (count + 1) {
t.Fatal("Handler is not registered") t.Fatalf("Handler is not registered")
} }
utcTime := time.Now().UTC().Unix() utcTime := time.Now().UTC().Unix()
@ -209,7 +213,7 @@ func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) {
t.Fatal(err.Error()) t.Fatal(err.Error())
} }
if len(notificationWatcher.handlers) != 0 { if len(notificationWatcher.handlers) != count {
t.Fatal("Handler is not unregistered") t.Fatal("Handler is not unregistered")
} }

View File

@ -8,13 +8,24 @@ const (
//FilterItemKindTag : Kind of filter item is 'tag' //FilterItemKindTag : Kind of filter item is 'tag'
FilterItemKindTag = "tag" FilterItemKindTag = "tag"
//TODO: Refactor constants
//TriggerKindManually : kind of trigger is 'manully' //TriggerKindManually : kind of trigger is 'manully'
TriggerKindManually = "manually" TriggerKindManually = "manually"
//TriggerKindSchedule : kind of trigger is 'schedule'
TriggerKindSchedule = "schedule"
//TriggerKindImmediately : kind of trigger is 'immediately' //TriggerKindImmediately : kind of trigger is 'immediately'
TriggerKindImmediately = "immediately" TriggerKindImmediately = "immediately"
//AdaptorKindHarbor : Kind of adaptor of Harbor //AdaptorKindHarbor : Kind of adaptor of Harbor
AdaptorKindHarbor = "Harbor" AdaptorKindHarbor = "Harbor"
//TriggerKindImmediate : Kind of trigger is 'Immediate'
TriggerKindImmediate = "Immediate"
//TriggerKindSchedule : Kind of trigger is 'Schedule'
TriggerKindSchedule = "Schedule"
//TriggerKindManual : Kind of trigger is 'Manual'
TriggerKindManual = "Manual"
//TriggerScheduleDaily : type of scheduling is 'daily'
TriggerScheduleDaily = "daily"
//TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly"
) )

View File

@ -0,0 +1,121 @@
package core
import (
"fmt"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/policy"
"github.com/vmware/harbor/src/replication/source"
"github.com/vmware/harbor/src/replication/trigger"
)
//Controller is core module to cordinate and control the overall workflow of the
//replication modules.
type Controller struct {
//Indicate whether the controller has been initialized or not
initialized bool
//Manage the policies
policyManager *policy.Manager
//Handle the things related with source
sourcer *source.Sourcer
//Manage the triggers of policies
triggerManager *trigger.Manager
}
//Keep controller as singleton instance
var (
DefaultController = NewController(ControllerConfig{}) //Use default data
)
//ControllerConfig includes related configurations required by the controller
type ControllerConfig struct {
//The capacity of the cache storing enabled triggers
CacheCapacity int
}
//NewController is the constructor of Controller.
func NewController(config ControllerConfig) *Controller {
//Controller refer the default instances
return &Controller{
policyManager: policy.NewManager(),
sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(config.CacheCapacity),
}
}
//Init will initialize the controller and the sub components
func (ctl *Controller) Init() error {
if ctl.initialized {
return nil
}
//Build query parameters
triggerNames := []string{
replication.TriggerKindImmediate,
replication.TriggerKindSchedule,
}
queryName := ""
for _, name := range triggerNames {
queryName = fmt.Sprintf("%s,%s", queryName, name)
}
//Enable the triggers
query := models.QueryParameter{
TriggerName: queryName,
}
policies := ctl.policyManager.GetPolicies(query)
if policies != nil && len(policies) > 0 {
for _, policy := range policies {
if err := ctl.triggerManager.SetupTrigger(policy.ID, policy.Trigger); err != nil {
//TODO: Log error
fmt.Printf("Error: %s", err)
//TODO:Update the status of policy
}
}
}
//Initialize sourcer
ctl.sourcer.Init()
ctl.initialized = true
return nil
}
//CreatePolicy is used to create a new policy and enable it if necessary
func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) error {
//Validate policy
//TODO:
return nil
}
//UpdatePolicy will update the policy with new content.
//Parameter updatedPolicy must have the ID of the updated policy.
func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error {
return nil
}
//RemovePolicy will remove the specified policy and clean the related settings
func (ctl *Controller) RemovePolicy(policyID int) error {
return nil
}
//GetPolicy is delegation of GetPolicy of Policy.Manager
func (ctl *Controller) GetPolicy(policyID int) models.ReplicationPolicy {
return models.ReplicationPolicy{}
}
//GetPolicies is delegation of GetPolicies of Policy.Manager
func (ctl *Controller) GetPolicies(query models.QueryParameter) []models.ReplicationPolicy {
return nil
}
//Replicate starts one replication defined in the specified policy;
//Can be launched by the API layer and related triggers.
func (ctl *Controller) Replicate(policyID int) error {
return nil
}

View File

@ -0,0 +1,57 @@
package event
import (
"errors"
"fmt"
"reflect"
"github.com/vmware/harbor/src/replication/core"
"github.com/vmware/harbor/src/replication/models"
)
//OnDeletionHandler implements the notification handler interface to handle image on push event.
type OnDeletionHandler struct{}
//OnDeletionNotification contains the data required by this handler
type OnDeletionNotification struct {
//The name of the project where the being pushed images are located
ProjectName string
}
//Handle implements the same method of notification handler interface
func (oph *OnDeletionHandler) Handle(value interface{}) error {
if value == nil {
return errors.New("OnDeletionHandler can not handle nil value")
}
vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "event.OnDeletionNotification" {
return fmt.Errorf("Mismatch value type of OnDeletionHandler, expect %s but got %s", "event.OnDeletionNotification", vType.String())
}
notification := value.(OnDeletionNotification)
//TODO:Call projectManager to get the projectID
fmt.Println(notification.ProjectName)
query := models.QueryParameter{
ProjectID: 0,
}
policies := core.DefaultController.GetPolicies(query)
if policies != nil && len(policies) > 0 {
for _, p := range policies {
//Error accumulated and then return?
if err := core.DefaultController.Replicate(p.ID); err != nil {
//TODO:Log error
fmt.Println(err.Error())
}
}
}
return nil
}
//IsStateful implements the same method of notification handler interface
func (oph *OnDeletionHandler) IsStateful() bool {
//Statless
return false
}

View File

@ -0,0 +1,56 @@
package event
import (
"errors"
"fmt"
"reflect"
"github.com/vmware/harbor/src/replication/core"
"github.com/vmware/harbor/src/replication/models"
)
//OnPushHandler implements the notification handler interface to handle image on push event.
type OnPushHandler struct{}
//OnPushNotification contains the data required by this handler
type OnPushNotification struct {
//The ID of the project where the being pushed images are located
ProjectID int
}
//Handle implements the same method of notification handler interface
func (oph *OnPushHandler) Handle(value interface{}) error {
if value == nil {
return errors.New("OnPushHandler can not handle nil value")
}
vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "event.OnPushNotification" {
return fmt.Errorf("Mismatch value type of OnPushHandler, expect %s but got %s", "event.OnPushNotification", vType.String())
}
notification := value.(OnDeletionNotification)
//TODO:Call projectManager to get the projectID
fmt.Println(notification.ProjectName)
query := models.QueryParameter{
ProjectID: 0,
}
policies := core.DefaultController.GetPolicies(query)
if policies != nil && len(policies) > 0 {
for _, p := range policies {
if err := core.DefaultController.Replicate(p.ID); err != nil {
//TODO:Log error
fmt.Println(err.Error())
}
}
}
return nil
}
//IsStateful implements the same method of notification handler interface
func (oph *OnPushHandler) IsStateful() bool {
//Statless
return false
}

View File

@ -0,0 +1,45 @@
package event
import (
"errors"
"fmt"
"reflect"
"github.com/vmware/harbor/src/replication/core"
)
//StartReplicationHandler implements the notification handler interface to handle start replication requests.
type StartReplicationHandler struct{}
//StartReplicationNotification contains data required by this handler
type StartReplicationNotification struct {
//ID of the policy
PolicyID int
}
//Handle implements the same method of notification handler interface
func (srh *StartReplicationHandler) Handle(value interface{}) error {
if value == nil {
return errors.New("StartReplicationHandler can not handle nil value")
}
vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "core.StartReplicationNotification" {
return fmt.Errorf("Mismatch value type of StartReplicationHandler, expect %s but got %s", "core.StartReplicationNotification", vType.String())
}
notification := value.(StartReplicationNotification)
if notification.PolicyID <= 0 {
return errors.New("Invalid policy")
}
//Start replication
//TODO:
return core.DefaultController.Replicate(notification.PolicyID)
}
//IsStateful implements the same method of notification handler interface
func (srh *StartReplicationHandler) IsStateful() bool {
//Stateless
return false
}

View File

@ -0,0 +1,12 @@
package event
const (
//ReplicationEventTopicOnPush : OnPush event
ReplicationEventTopicOnPush = "OnPush"
//ReplicationEventTopicOnDeletion : OnDeletion event
ReplicationEventTopicOnDeletion = "OnDeletion"
//StartReplicationTopic : Start application request
StartReplicationTopic = "StartReplication"
)

View File

@ -0,0 +1,28 @@
package models
//ReplicationPolicy defines the structure of a replication policy.
type ReplicationPolicy struct {
//UUID of the policy
ID int
//Projects attached to this policy
RelevantProjects []int
//The trigger of the replication
Trigger Trigger
}
//QueryParameter defines the parameters used to do query selection.
type QueryParameter struct {
//Query by page, couple with pageSize
Page int
//Size of each page, couple with page
PageSize int
//Query by the name of trigger
TriggerName string
//Query by project ID
ProjectID int
}

View File

@ -0,0 +1,10 @@
package models
//Trigger is replication launching approach definition
type Trigger struct {
//The name of the trigger
Name string
//The parameters with json text format required by the trigger
Param string
}

View File

@ -0,0 +1,42 @@
package policy
import (
"github.com/vmware/harbor/src/replication/models"
)
//Manager provides replication policy CURD capabilities.
type Manager struct{}
//NewManager is the constructor of Manager.
func NewManager() *Manager {
return &Manager{}
}
//GetPolicies returns all the policies
func (m *Manager) GetPolicies(query models.QueryParameter) []models.ReplicationPolicy {
return []models.ReplicationPolicy{}
}
//GetPolicy returns the policy with the specified ID
func (m *Manager) GetPolicy(policyID int) models.ReplicationPolicy {
return models.ReplicationPolicy{}
}
//CreatePolicy creates a new policy with the provided data;
//If creating failed, error will be returned;
//If creating succeed, ID of the new created policy will be returned.
func (m *Manager) CreatePolicy(policy models.ReplicationPolicy) (int, error) {
return 0, nil
}
//UpdatePolicy updates the policy;
//If updating failed, error will be returned.
func (m *Manager) UpdatePolicy(policy models.ReplicationPolicy) error {
return nil
}
//RemovePolicy removes the specified policy;
//If removing failed, error will be returned.
func (m *Manager) RemovePolicy(policyID int) error {
return nil
}

View File

@ -12,9 +12,6 @@ type Sourcer struct {
adaptors map[string]registry.Adaptor adaptors map[string]registry.Adaptor
} }
//ReplicationSourcer is default sourcer for replication service.
var ReplicationSourcer = NewSourcer()
//NewSourcer is the constructor of Sourcer //NewSourcer is the constructor of Sourcer
func NewSourcer() *Sourcer { func NewSourcer() *Sourcer {
return &Sourcer{ return &Sourcer{
@ -37,8 +34,3 @@ func (sc *Sourcer) GetAdaptor(kind string) registry.Adaptor {
return sc.adaptors[kind] return sc.adaptors[kind]
} }
//Init the adaptors
func Init() {
ReplicationSourcer.Init()
}

View File

@ -0,0 +1,212 @@
package trigger
import (
"container/heap"
"fmt"
"sync"
"time"
)
const (
//The max count of items the cache can keep
defaultCapacity = 1000
)
//Item keeps more metadata of the triggers which are stored in the heap.
type Item struct {
//Which policy the trigger belong to
policyID int
//Frequency of cache querying
//First compration factor
frequency int
//The timestamp of being put into heap
//Second compration factor
timestamp int64
//The index in the heap
index int
}
//MetaQueue implements heap.Interface and holds items which are metadata of trigger
type MetaQueue []*Item
//Len return the size of the queue
func (mq MetaQueue) Len() int {
return len(mq)
}
//Less is a comparator of heap
func (mq MetaQueue) Less(i, j int) bool {
return mq[i].frequency < mq[j].frequency ||
(mq[i].frequency == mq[j].frequency &&
mq[i].timestamp < mq[j].timestamp)
}
//Swap the items to rebuild heap
func (mq MetaQueue) Swap(i, j int) {
mq[i], mq[j] = mq[j], mq[i]
mq[i].index = i
mq[j].index = j
}
//Push item into heap
func (mq *MetaQueue) Push(x interface{}) {
item := x.(*Item)
n := len(*mq)
item.index = n
item.timestamp = time.Now().UTC().UnixNano()
*mq = append(*mq, item)
}
//Pop smallest item from heap
func (mq *MetaQueue) Pop() interface{} {
old := *mq
n := len(old)
item := old[n-1] //Smallest item
item.index = -1 //For safety
*mq = old[:n-1]
return item
}
//Update the frequency of item
func (mq *MetaQueue) Update(item *Item) {
item.frequency++
heap.Fix(mq, item.index)
}
//CacheItem is the data stored in the cache.
//It contains trigger and heap item references.
type CacheItem struct {
//The trigger reference
trigger Interface
//The heap item reference
item *Item
}
//Cache is used to cache the enabled triggers with specified capacity.
//If exceed the capacity, cached items will be adjusted with the following rules:
// The item with least usage frequency will be replaced;
// If multiple items with same usage frequency, the oldest one will be replaced.
type Cache struct {
//The max count of items this cache can keep
capacity int
//Lock to handle concurrent case
lock *sync.RWMutex
//Hash map for quick locating cached item
hash map[string]CacheItem
//Heap for quick locating the trigger with least usage
queue *MetaQueue
}
//NewCache is constructor of cache
func NewCache(capacity int) *Cache {
cap := capacity
if cap <= 0 {
cap = defaultCapacity
}
//Initialize heap
mq := make(MetaQueue, 0)
heap.Init(&mq)
return &Cache{
capacity: cap,
lock: new(sync.RWMutex),
hash: make(map[string]CacheItem),
queue: &mq,
}
}
//Get the trigger interface with the specified policy ID
func (c *Cache) Get(policyID int) Interface {
if policyID <= 0 {
return nil
}
c.lock.RLock()
defer c.lock.RUnlock()
k := c.key(policyID)
if cacheItem, ok := c.hash[k]; ok {
//Update frequency
c.queue.Update(cacheItem.item)
return cacheItem.trigger
}
return nil
}
//Put the item into cache with ID of ploicy as key
func (c *Cache) Put(policyID int, trigger Interface) {
if policyID <= 0 || trigger == nil {
return
}
c.lock.Lock()
defer c.lock.Unlock()
//Exceed the capacity?
if c.Size() >= c.capacity {
//Pop one for the new one
v := heap.Pop(c.queue)
item := v.(*Item)
//Remove from hash
delete(c.hash, c.key(item.policyID))
}
//Add to meta queue
item := &Item{
policyID: policyID,
frequency: 1,
}
heap.Push(c.queue, item)
//Cache
cacheItem := CacheItem{
trigger: trigger,
item: item,
}
k := c.key(policyID)
c.hash[k] = cacheItem
}
//Remove the trigger attached to the specified policy
func (c *Cache) Remove(policyID int) Interface {
if policyID > 0 {
c.lock.Lock()
defer c.lock.Unlock()
//If existing
k := c.key(policyID)
if cacheItem, ok := c.hash[k]; ok {
//Remove from heap
heap.Remove(c.queue, cacheItem.item.index)
//Remove from hash
delete(c.hash, k)
return cacheItem.trigger
}
}
return nil
}
//Size return the count of triggers in the cache
func (c *Cache) Size() int {
return len(c.hash)
}
//Generate a hash key with the policy ID
func (c *Cache) key(policyID int) string {
return fmt.Sprintf("trigger-%d", policyID)
}

View File

@ -0,0 +1,53 @@
package trigger
import "testing"
import "time"
func TestCache(t *testing.T) {
cache := NewCache(10)
trigger := NewImmediateTrigger(ImmediateParam{})
cache.Put(1, trigger)
if cache.Size() != 1 {
t.Fatalf("Invalid size, expect 1 but got %d", cache.Size())
}
tr := cache.Get(1)
if tr == nil {
t.Fatal("Should not get nil item")
}
tri := cache.Remove(1)
if tri == nil || cache.Size() > 0 {
t.Fatal("Failed to remove")
}
}
func TestCacheChange(t *testing.T) {
cache := NewCache(2)
trigger1 := NewImmediateTrigger(ImmediateParam{})
trigger2 := NewImmediateTrigger(ImmediateParam{})
cache.Put(1, trigger1)
cache.Put(2, trigger2)
if cache.Size() != 2 {
t.Fatalf("Invalid size, expect 2 but got %d", cache.Size())
}
if tr := cache.Get(2); tr == nil {
t.Fatal("Should not get nil item")
}
time.Sleep(100 * time.Microsecond)
trigger3 := NewImmediateTrigger(ImmediateParam{})
cache.Put(3, trigger3)
if cache.Size() != 2 {
t.Fatalf("Invalid size, expect 2 but got %d", cache.Size())
}
if tr := cache.Get(1); tr != nil {
t.Fatal("item1 should not exist")
}
}

View File

@ -0,0 +1,47 @@
package trigger
import (
"errors"
"github.com/vmware/harbor/src/replication"
)
//ImmediateTrigger will setup watcher at the image pushing action to fire
//replication event at pushing happening time.
type ImmediateTrigger struct {
params ImmediateParam
}
//NewImmediateTrigger is constructor of ImmediateTrigger
func NewImmediateTrigger(params ImmediateParam) *ImmediateTrigger {
return &ImmediateTrigger{
params: params,
}
}
//Kind is the implementation of same method defined in Trigger interface
func (st *ImmediateTrigger) Kind() string {
return replication.TriggerKindImmediate
}
//Setup is the implementation of same method defined in Trigger interface
func (st *ImmediateTrigger) Setup() error {
if st.params.PolicyID <= 0 || len(st.params.Namespace) == 0 {
return errors.New("Invalid parameters for Immediate trigger")
}
//TODO: Need more complicated logic here to handle partial updates
wt := WatchItem{
PolicyID: st.params.PolicyID,
Namespace: st.params.Namespace,
OnDeletion: st.params.OnDeletion,
OnPush: true,
}
return DefaultWatchList.Add(wt)
}
//Unset is the implementation of same method defined in Trigger interface
func (st *ImmediateTrigger) Unset() error {
return errors.New("Not implemented")
}

View File

@ -0,0 +1,13 @@
package trigger
//Interface is certian mechanism to know when fire the replication operation.
type Interface interface {
//Kind indicates what type of the trigger is.
Kind() string
//Setup/enable the trigger; if failed, an error would be returned.
Setup() error
//Remove/disable the trigger; if failed, an error would be returned.
Unset() error
}

View File

@ -0,0 +1,136 @@
package trigger
import (
"errors"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models"
)
//Manager provides unified methods to manage the triggers of policies;
//Cache the enabled triggers, setup/unset the trigger based on the parameters
//with json format.
type Manager struct {
//Cache for triggers
cache *Cache
}
//NewManager is the constructor of trigger manager.
//capacity is the max number of trigger references manager can keep in memory
func NewManager(capacity int) *Manager {
return &Manager{
cache: NewCache(capacity),
}
}
//GetTrigger returns the enabled trigger reference if existing in the cache.
func (m *Manager) GetTrigger(policyID int) Interface {
return m.cache.Get(policyID)
}
//RemoveTrigger will disable the trigger and remove it from the cache if existing.
func (m *Manager) RemoveTrigger(policyID int) error {
trigger := m.cache.Get(policyID)
if trigger == nil {
return errors.New("Trigger is not cached, please use UnsetTrigger to disable the trigger")
}
//Unset trigger
if err := trigger.Unset(); err != nil {
return err
}
//Remove from cache
//No need to check the return of remove because the dirty item cached in the cache
//will be removed out finally after a certain while
m.cache.Remove(policyID)
return nil
}
//SetupTrigger will create the new trigger based on the provided json parameters.
//If failed, an error will be returned.
func (m *Manager) SetupTrigger(policyID int, trigger models.Trigger) error {
if policyID <= 0 {
return errors.New("Invalid policy ID")
}
if len(trigger.Name) == 0 {
return errors.New("Invalid replication trigger definition")
}
switch trigger.Name {
case replication.TriggerKindSchedule:
param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewScheduleTrigger(param)
if err := newTrigger.Setup(); err != nil {
return err
}
case replication.TriggerKindImmediate:
param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewImmediateTrigger(param)
if err := newTrigger.Setup(); err != nil {
return err
}
default:
//Treat as manual trigger
break
}
return nil
}
//UnsetTrigger will disable the trigger which is not cached in the trigger cache.
func (m *Manager) UnsetTrigger(policyID int, trigger models.Trigger) error {
if policyID <= 0 {
return errors.New("Invalid policy ID")
}
if len(trigger.Name) == 0 {
return errors.New("Invalid replication trigger definition")
}
switch trigger.Name {
case replication.TriggerKindSchedule:
param := ScheduleParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewScheduleTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
case replication.TriggerKindImmediate:
param := ImmediateParam{}
if err := param.Parse(trigger.Param); err != nil {
return err
}
//Append policy ID info
param.PolicyID = policyID
newTrigger := NewImmediateTrigger(param)
if err := newTrigger.Unset(); err != nil {
return err
}
default:
//Treat as manual trigger
break
}
return nil
}

View File

@ -0,0 +1,26 @@
package trigger
import (
"errors"
)
//NOTES: Whether replicate the existing images when the type of trigger is
//'Immediate' is a once-effective setting which will not be persisted
// and kept as one parameter of 'Immediate' trigger. It will only be
//covered by the UI logic.
//ImmediateParam defines the parameter of immediate trigger
type ImmediateParam struct {
//Basic parameters
BasicParam
//Namepace
Namespace string
}
//Parse is the implementation of same method in TriggerParam interface
//NOTES: No need to implement this method for 'Immediate' trigger as
//it does not have any parameters with json format.
func (ip ImmediateParam) Parse(param string) error {
return errors.New("Should NOT be called as it's not implemented")
}

View File

@ -0,0 +1,30 @@
package trigger
import (
"encoding/json"
"errors"
)
//ScheduleParam defines the parameter of schedule trigger
type ScheduleParam struct {
//Basic parameters
BasicParam
//Daily or weekly
Type string
//Optional, only used when type is 'weekly'
Weekday int8
//The time offset with the UTC 00:00 in seconds
Offtime int64
}
//Parse is the implementation of same method in TriggerParam interface
func (stp ScheduleParam) Parse(param string) error {
if len(param) == 0 {
return errors.New("Parameter of schedule trigger should not be empty")
}
return json.Unmarshal([]byte(param), &stp)
}

View File

@ -0,0 +1,34 @@
package trigger
import (
"errors"
"github.com/vmware/harbor/src/replication"
)
//ScheduleTrigger will schedule a alternate policy to provide 'daily' and 'weekly' trigger ways.
type ScheduleTrigger struct {
params ScheduleParam
}
//NewScheduleTrigger is constructor of ScheduleTrigger
func NewScheduleTrigger(params ScheduleParam) *ScheduleTrigger {
return &ScheduleTrigger{
params: params,
}
}
//Kind is the implementation of same method defined in Trigger interface
func (st *ScheduleTrigger) Kind() string {
return replication.TriggerKindSchedule
}
//Setup is the implementation of same method defined in Trigger interface
func (st *ScheduleTrigger) Setup() error {
return errors.New("Not implemented")
}
//Unset is the implementation of same method defined in Trigger interface
func (st *ScheduleTrigger) Unset() error {
return errors.New("Not implemented")
}

View File

@ -0,0 +1,17 @@
package trigger
//BasicParam contains the general parameters for all triggers
type BasicParam struct {
//ID of the related policy
PolicyID int
//Whether delete remote replicated images if local ones are deleted
OnDeletion bool
}
//Parameter defines operation of doing initialization from parameter json text
type Parameter interface {
//Decode parameter with json style to the owner struct
//If failed, an error will be returned
Parse(param string) error
}

View File

@ -0,0 +1,43 @@
package trigger
//DefaultWatchList is the default instance of WatchList
var DefaultWatchList = &WatchList{}
//WatchList contains the items which should be evaluated for replication
//when image pushing or deleting happens.
type WatchList struct{}
//WatchItem keeps the related data for evaluation in WatchList.
type WatchItem struct {
//ID of policy
PolicyID int
//Corresponding namespace
Namespace string
//For deletion event
OnDeletion bool
//For pushing event
OnPush bool
}
//Add item to the list and persist into DB
func (wl *WatchList) Add(item WatchItem) error {
return nil
}
//Remove the specified watch item from list
func (wl *WatchList) Remove() WatchItem {
return WatchItem{}
}
//Update the watch item in the list
func (wl *WatchList) Update(updatedItem WatchItem) error {
return nil
}
//Get the specified watch item
func (wl *WatchList) Get(namespace string) WatchItem {
return WatchItem{}
}