From c2e0c8d1f2a20be526c1782b5ea58c1268c91a6d Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Thu, 9 Nov 2017 22:27:54 +0800 Subject: [PATCH] Define the related interfaces for triggers and core controllers of replication service --- src/common/notifier/init.go | 13 ++ src/common/notifier/notifier_test.go | 20 +- src/replication/consts.go | 15 +- src/replication/core/controller.go | 121 ++++++++++ src/replication/event/on_deletion_handler.go | 57 +++++ src/replication/event/on_push_handler.go | 56 +++++ .../event/start_replication_handler.go | 45 ++++ src/replication/event/topics.go | 12 + src/replication/models/policy.go | 28 +++ src/replication/models/trigger.go | 10 + src/replication/policy/manager.go | 42 ++++ src/replication/source/sourcer.go | 8 - src/replication/trigger/cache.go | 212 ++++++++++++++++++ src/replication/trigger/cache_test.go | 53 +++++ src/replication/trigger/immediate.go | 47 ++++ src/replication/trigger/interface.go | 13 ++ src/replication/trigger/manager.go | 136 +++++++++++ src/replication/trigger/param_immediate.go | 26 +++ src/replication/trigger/param_schedule.go | 30 +++ src/replication/trigger/schedule.go | 34 +++ src/replication/trigger/trigger_param.go | 17 ++ src/replication/trigger/watch_list.go | 43 ++++ 22 files changed, 1020 insertions(+), 18 deletions(-) create mode 100644 src/common/notifier/init.go create mode 100644 src/replication/core/controller.go create mode 100644 src/replication/event/on_deletion_handler.go create mode 100644 src/replication/event/on_push_handler.go create mode 100644 src/replication/event/start_replication_handler.go create mode 100644 src/replication/event/topics.go create mode 100644 src/replication/models/policy.go create mode 100644 src/replication/models/trigger.go create mode 100644 src/replication/policy/manager.go create mode 100644 src/replication/trigger/cache.go create mode 100644 src/replication/trigger/cache_test.go create mode 100644 src/replication/trigger/immediate.go create mode 100644 src/replication/trigger/interface.go create mode 100644 src/replication/trigger/manager.go create mode 100644 src/replication/trigger/param_immediate.go create mode 100644 src/replication/trigger/param_schedule.go create mode 100644 src/replication/trigger/schedule.go create mode 100644 src/replication/trigger/trigger_param.go create mode 100644 src/replication/trigger/watch_list.go diff --git a/src/common/notifier/init.go b/src/common/notifier/init.go new file mode 100644 index 000000000..b36d1f8de --- /dev/null +++ b/src/common/notifier/init.go @@ -0,0 +1,13 @@ +package notifier + +import ( + "github.com/vmware/harbor/src/replication/event" +) + +//Subscribe related topics +func init() { + //Listen the related event topics + Subscribe(event.StartReplicationTopic, &event.StartReplicationHandler{}) + Subscribe(event.ReplicationEventTopicOnPush, &event.OnPushHandler{}) + Subscribe(event.ReplicationEventTopicOnDeletion, &event.OnDeletionHandler{}) +} diff --git a/src/common/notifier/notifier_test.go b/src/common/notifier/notifier_test.go index 9f98679ce..8bb93c59d 100644 --- a/src/common/notifier/notifier_test.go +++ b/src/common/notifier/notifier_test.go @@ -39,6 +39,7 @@ func (fsh *fakeStatelessHandler) Handle(v interface{}) error { } func TestSubscribeAndUnSubscribe(t *testing.T) { + count := len(notificationWatcher.handlers) err := Subscribe("topic1", &fakeStatefulHandler{0}) if err != nil { t.Fatal(err) @@ -59,7 +60,7 @@ func TestSubscribeAndUnSubscribe(t *testing.T) { t.Fatal(err) } - if len(notificationWatcher.handlers) != 2 { + if len(notificationWatcher.handlers) != (count + 2) { t.Fail() } @@ -94,7 +95,7 @@ func TestSubscribeAndUnSubscribe(t *testing.T) { t.Fatal(err) } - if len(notificationWatcher.handlers) != 1 { + if len(notificationWatcher.handlers) != (count + 1) { t.Fail() } @@ -103,12 +104,13 @@ func TestSubscribeAndUnSubscribe(t *testing.T) { t.Fatal(err) } - if len(notificationWatcher.handlers) != 0 { + if len(notificationWatcher.handlers) != count { t.Fail() } } func TestPublish(t *testing.T) { + count := len(notificationWatcher.handlers) err := Subscribe("topic1", &fakeStatefulHandler{0}) if err != nil { t.Fatal(err) @@ -119,7 +121,7 @@ func TestPublish(t *testing.T) { t.Fatal(err) } - if len(notificationWatcher.handlers) != 2 { + if len(notificationWatcher.handlers) != (count + 2) { t.Fail() } @@ -149,12 +151,13 @@ func TestPublish(t *testing.T) { } func TestConcurrentPublish(t *testing.T) { + count := len(notificationWatcher.handlers) err := Subscribe("topic1", &fakeStatefulHandler{0}) if err != nil { t.Fatal(err) } - if len(notificationWatcher.handlers) != 1 { + if len(notificationWatcher.handlers) != (count + 1) { t.Fail() } @@ -186,11 +189,12 @@ func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) { t.Fatal("Policy scheduler is not started") } + count := len(notificationWatcher.handlers) if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil { t.Fatal(err.Error()) } - if len(notificationWatcher.handlers) != 1 { - t.Fatal("Handler is not registered") + if len(notificationWatcher.handlers) != (count + 1) { + t.Fatalf("Handler is not registered") } utcTime := time.Now().UTC().Unix() @@ -209,7 +213,7 @@ func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) { t.Fatal(err.Error()) } - if len(notificationWatcher.handlers) != 0 { + if len(notificationWatcher.handlers) != count { t.Fatal("Handler is not unregistered") } diff --git a/src/replication/consts.go b/src/replication/consts.go index c9407cbd9..64800b994 100644 --- a/src/replication/consts.go +++ b/src/replication/consts.go @@ -8,13 +8,24 @@ const ( //FilterItemKindTag : Kind of filter item is 'tag' FilterItemKindTag = "tag" + //TODO: Refactor constants + //TriggerKindManually : kind of trigger is 'manully' TriggerKindManually = "manually" - //TriggerKindSchedule : kind of trigger is 'schedule' - TriggerKindSchedule = "schedule" //TriggerKindImmediately : kind of trigger is 'immediately' TriggerKindImmediately = "immediately" //AdaptorKindHarbor : Kind of adaptor of Harbor AdaptorKindHarbor = "Harbor" + + //TriggerKindImmediate : Kind of trigger is 'Immediate' + TriggerKindImmediate = "Immediate" + //TriggerKindSchedule : Kind of trigger is 'Schedule' + TriggerKindSchedule = "Schedule" + //TriggerKindManual : Kind of trigger is 'Manual' + TriggerKindManual = "Manual" + //TriggerScheduleDaily : type of scheduling is 'daily' + TriggerScheduleDaily = "daily" + //TriggerScheduleWeekly : type of scheduling is 'weekly' + TriggerScheduleWeekly = "weekly" ) diff --git a/src/replication/core/controller.go b/src/replication/core/controller.go new file mode 100644 index 000000000..24430e7ef --- /dev/null +++ b/src/replication/core/controller.go @@ -0,0 +1,121 @@ +package core + +import ( + "fmt" + + "github.com/vmware/harbor/src/replication" + "github.com/vmware/harbor/src/replication/models" + "github.com/vmware/harbor/src/replication/policy" + "github.com/vmware/harbor/src/replication/source" + "github.com/vmware/harbor/src/replication/trigger" +) + +//Controller is core module to cordinate and control the overall workflow of the +//replication modules. +type Controller struct { + //Indicate whether the controller has been initialized or not + initialized bool + + //Manage the policies + policyManager *policy.Manager + + //Handle the things related with source + sourcer *source.Sourcer + + //Manage the triggers of policies + triggerManager *trigger.Manager +} + +//Keep controller as singleton instance +var ( + DefaultController = NewController(ControllerConfig{}) //Use default data +) + +//ControllerConfig includes related configurations required by the controller +type ControllerConfig struct { + //The capacity of the cache storing enabled triggers + CacheCapacity int +} + +//NewController is the constructor of Controller. +func NewController(config ControllerConfig) *Controller { + //Controller refer the default instances + return &Controller{ + policyManager: policy.NewManager(), + sourcer: source.NewSourcer(), + triggerManager: trigger.NewManager(config.CacheCapacity), + } +} + +//Init will initialize the controller and the sub components +func (ctl *Controller) Init() error { + if ctl.initialized { + return nil + } + + //Build query parameters + triggerNames := []string{ + replication.TriggerKindImmediate, + replication.TriggerKindSchedule, + } + queryName := "" + for _, name := range triggerNames { + queryName = fmt.Sprintf("%s,%s", queryName, name) + } + //Enable the triggers + query := models.QueryParameter{ + TriggerName: queryName, + } + + policies := ctl.policyManager.GetPolicies(query) + if policies != nil && len(policies) > 0 { + for _, policy := range policies { + if err := ctl.triggerManager.SetupTrigger(policy.ID, policy.Trigger); err != nil { + //TODO: Log error + fmt.Printf("Error: %s", err) + //TODO:Update the status of policy + } + } + } + + //Initialize sourcer + ctl.sourcer.Init() + + ctl.initialized = true + + return nil +} + +//CreatePolicy is used to create a new policy and enable it if necessary +func (ctl *Controller) CreatePolicy(newPolicy models.ReplicationPolicy) error { + //Validate policy + //TODO: + return nil +} + +//UpdatePolicy will update the policy with new content. +//Parameter updatedPolicy must have the ID of the updated policy. +func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) error { + return nil +} + +//RemovePolicy will remove the specified policy and clean the related settings +func (ctl *Controller) RemovePolicy(policyID int) error { + return nil +} + +//GetPolicy is delegation of GetPolicy of Policy.Manager +func (ctl *Controller) GetPolicy(policyID int) models.ReplicationPolicy { + return models.ReplicationPolicy{} +} + +//GetPolicies is delegation of GetPolicies of Policy.Manager +func (ctl *Controller) GetPolicies(query models.QueryParameter) []models.ReplicationPolicy { + return nil +} + +//Replicate starts one replication defined in the specified policy; +//Can be launched by the API layer and related triggers. +func (ctl *Controller) Replicate(policyID int) error { + return nil +} diff --git a/src/replication/event/on_deletion_handler.go b/src/replication/event/on_deletion_handler.go new file mode 100644 index 000000000..c3b76e299 --- /dev/null +++ b/src/replication/event/on_deletion_handler.go @@ -0,0 +1,57 @@ +package event + +import ( + "errors" + "fmt" + "reflect" + + "github.com/vmware/harbor/src/replication/core" + "github.com/vmware/harbor/src/replication/models" +) + +//OnDeletionHandler implements the notification handler interface to handle image on push event. +type OnDeletionHandler struct{} + +//OnDeletionNotification contains the data required by this handler +type OnDeletionNotification struct { + //The name of the project where the being pushed images are located + ProjectName string +} + +//Handle implements the same method of notification handler interface +func (oph *OnDeletionHandler) Handle(value interface{}) error { + if value == nil { + return errors.New("OnDeletionHandler can not handle nil value") + } + + vType := reflect.TypeOf(value) + if vType.Kind() != reflect.Struct || vType.String() != "event.OnDeletionNotification" { + return fmt.Errorf("Mismatch value type of OnDeletionHandler, expect %s but got %s", "event.OnDeletionNotification", vType.String()) + } + + notification := value.(OnDeletionNotification) + //TODO:Call projectManager to get the projectID + fmt.Println(notification.ProjectName) + query := models.QueryParameter{ + ProjectID: 0, + } + + policies := core.DefaultController.GetPolicies(query) + if policies != nil && len(policies) > 0 { + for _, p := range policies { + //Error accumulated and then return? + if err := core.DefaultController.Replicate(p.ID); err != nil { + //TODO:Log error + fmt.Println(err.Error()) + } + } + } + + return nil +} + +//IsStateful implements the same method of notification handler interface +func (oph *OnDeletionHandler) IsStateful() bool { + //Statless + return false +} diff --git a/src/replication/event/on_push_handler.go b/src/replication/event/on_push_handler.go new file mode 100644 index 000000000..ba6eae4df --- /dev/null +++ b/src/replication/event/on_push_handler.go @@ -0,0 +1,56 @@ +package event + +import ( + "errors" + "fmt" + "reflect" + + "github.com/vmware/harbor/src/replication/core" + "github.com/vmware/harbor/src/replication/models" +) + +//OnPushHandler implements the notification handler interface to handle image on push event. +type OnPushHandler struct{} + +//OnPushNotification contains the data required by this handler +type OnPushNotification struct { + //The ID of the project where the being pushed images are located + ProjectID int +} + +//Handle implements the same method of notification handler interface +func (oph *OnPushHandler) Handle(value interface{}) error { + if value == nil { + return errors.New("OnPushHandler can not handle nil value") + } + + vType := reflect.TypeOf(value) + if vType.Kind() != reflect.Struct || vType.String() != "event.OnPushNotification" { + return fmt.Errorf("Mismatch value type of OnPushHandler, expect %s but got %s", "event.OnPushNotification", vType.String()) + } + + notification := value.(OnDeletionNotification) + //TODO:Call projectManager to get the projectID + fmt.Println(notification.ProjectName) + query := models.QueryParameter{ + ProjectID: 0, + } + + policies := core.DefaultController.GetPolicies(query) + if policies != nil && len(policies) > 0 { + for _, p := range policies { + if err := core.DefaultController.Replicate(p.ID); err != nil { + //TODO:Log error + fmt.Println(err.Error()) + } + } + } + + return nil +} + +//IsStateful implements the same method of notification handler interface +func (oph *OnPushHandler) IsStateful() bool { + //Statless + return false +} diff --git a/src/replication/event/start_replication_handler.go b/src/replication/event/start_replication_handler.go new file mode 100644 index 000000000..aee83d576 --- /dev/null +++ b/src/replication/event/start_replication_handler.go @@ -0,0 +1,45 @@ +package event + +import ( + "errors" + "fmt" + "reflect" + + "github.com/vmware/harbor/src/replication/core" +) + +//StartReplicationHandler implements the notification handler interface to handle start replication requests. +type StartReplicationHandler struct{} + +//StartReplicationNotification contains data required by this handler +type StartReplicationNotification struct { + //ID of the policy + PolicyID int +} + +//Handle implements the same method of notification handler interface +func (srh *StartReplicationHandler) Handle(value interface{}) error { + if value == nil { + return errors.New("StartReplicationHandler can not handle nil value") + } + + vType := reflect.TypeOf(value) + if vType.Kind() != reflect.Struct || vType.String() != "core.StartReplicationNotification" { + return fmt.Errorf("Mismatch value type of StartReplicationHandler, expect %s but got %s", "core.StartReplicationNotification", vType.String()) + } + + notification := value.(StartReplicationNotification) + if notification.PolicyID <= 0 { + return errors.New("Invalid policy") + } + + //Start replication + //TODO: + return core.DefaultController.Replicate(notification.PolicyID) +} + +//IsStateful implements the same method of notification handler interface +func (srh *StartReplicationHandler) IsStateful() bool { + //Stateless + return false +} diff --git a/src/replication/event/topics.go b/src/replication/event/topics.go new file mode 100644 index 000000000..5a6aa954e --- /dev/null +++ b/src/replication/event/topics.go @@ -0,0 +1,12 @@ +package event + +const ( + //ReplicationEventTopicOnPush : OnPush event + ReplicationEventTopicOnPush = "OnPush" + + //ReplicationEventTopicOnDeletion : OnDeletion event + ReplicationEventTopicOnDeletion = "OnDeletion" + + //StartReplicationTopic : Start application request + StartReplicationTopic = "StartReplication" +) diff --git a/src/replication/models/policy.go b/src/replication/models/policy.go new file mode 100644 index 000000000..47f1867fb --- /dev/null +++ b/src/replication/models/policy.go @@ -0,0 +1,28 @@ +package models + +//ReplicationPolicy defines the structure of a replication policy. +type ReplicationPolicy struct { + //UUID of the policy + ID int + + //Projects attached to this policy + RelevantProjects []int + + //The trigger of the replication + Trigger Trigger +} + +//QueryParameter defines the parameters used to do query selection. +type QueryParameter struct { + //Query by page, couple with pageSize + Page int + + //Size of each page, couple with page + PageSize int + + //Query by the name of trigger + TriggerName string + + //Query by project ID + ProjectID int +} diff --git a/src/replication/models/trigger.go b/src/replication/models/trigger.go new file mode 100644 index 000000000..f8a06b4e5 --- /dev/null +++ b/src/replication/models/trigger.go @@ -0,0 +1,10 @@ +package models + +//Trigger is replication launching approach definition +type Trigger struct { + //The name of the trigger + Name string + + //The parameters with json text format required by the trigger + Param string +} diff --git a/src/replication/policy/manager.go b/src/replication/policy/manager.go new file mode 100644 index 000000000..22890ec73 --- /dev/null +++ b/src/replication/policy/manager.go @@ -0,0 +1,42 @@ +package policy + +import ( + "github.com/vmware/harbor/src/replication/models" +) + +//Manager provides replication policy CURD capabilities. +type Manager struct{} + +//NewManager is the constructor of Manager. +func NewManager() *Manager { + return &Manager{} +} + +//GetPolicies returns all the policies +func (m *Manager) GetPolicies(query models.QueryParameter) []models.ReplicationPolicy { + return []models.ReplicationPolicy{} +} + +//GetPolicy returns the policy with the specified ID +func (m *Manager) GetPolicy(policyID int) models.ReplicationPolicy { + return models.ReplicationPolicy{} +} + +//CreatePolicy creates a new policy with the provided data; +//If creating failed, error will be returned; +//If creating succeed, ID of the new created policy will be returned. +func (m *Manager) CreatePolicy(policy models.ReplicationPolicy) (int, error) { + return 0, nil +} + +//UpdatePolicy updates the policy; +//If updating failed, error will be returned. +func (m *Manager) UpdatePolicy(policy models.ReplicationPolicy) error { + return nil +} + +//RemovePolicy removes the specified policy; +//If removing failed, error will be returned. +func (m *Manager) RemovePolicy(policyID int) error { + return nil +} diff --git a/src/replication/source/sourcer.go b/src/replication/source/sourcer.go index 4bbfc2c44..9322bb50e 100644 --- a/src/replication/source/sourcer.go +++ b/src/replication/source/sourcer.go @@ -12,9 +12,6 @@ type Sourcer struct { adaptors map[string]registry.Adaptor } -//ReplicationSourcer is default sourcer for replication service. -var ReplicationSourcer = NewSourcer() - //NewSourcer is the constructor of Sourcer func NewSourcer() *Sourcer { return &Sourcer{ @@ -37,8 +34,3 @@ func (sc *Sourcer) GetAdaptor(kind string) registry.Adaptor { return sc.adaptors[kind] } - -//Init the adaptors -func Init() { - ReplicationSourcer.Init() -} diff --git a/src/replication/trigger/cache.go b/src/replication/trigger/cache.go new file mode 100644 index 000000000..0f74bc79d --- /dev/null +++ b/src/replication/trigger/cache.go @@ -0,0 +1,212 @@ +package trigger + +import ( + "container/heap" + "fmt" + "sync" + "time" +) + +const ( + //The max count of items the cache can keep + defaultCapacity = 1000 +) + +//Item keeps more metadata of the triggers which are stored in the heap. +type Item struct { + //Which policy the trigger belong to + policyID int + + //Frequency of cache querying + //First compration factor + frequency int + + //The timestamp of being put into heap + //Second compration factor + timestamp int64 + + //The index in the heap + index int +} + +//MetaQueue implements heap.Interface and holds items which are metadata of trigger +type MetaQueue []*Item + +//Len return the size of the queue +func (mq MetaQueue) Len() int { + return len(mq) +} + +//Less is a comparator of heap +func (mq MetaQueue) Less(i, j int) bool { + return mq[i].frequency < mq[j].frequency || + (mq[i].frequency == mq[j].frequency && + mq[i].timestamp < mq[j].timestamp) +} + +//Swap the items to rebuild heap +func (mq MetaQueue) Swap(i, j int) { + mq[i], mq[j] = mq[j], mq[i] + mq[i].index = i + mq[j].index = j +} + +//Push item into heap +func (mq *MetaQueue) Push(x interface{}) { + item := x.(*Item) + n := len(*mq) + item.index = n + item.timestamp = time.Now().UTC().UnixNano() + *mq = append(*mq, item) +} + +//Pop smallest item from heap +func (mq *MetaQueue) Pop() interface{} { + old := *mq + n := len(old) + item := old[n-1] //Smallest item + item.index = -1 //For safety + *mq = old[:n-1] + return item +} + +//Update the frequency of item +func (mq *MetaQueue) Update(item *Item) { + item.frequency++ + heap.Fix(mq, item.index) +} + +//CacheItem is the data stored in the cache. +//It contains trigger and heap item references. +type CacheItem struct { + //The trigger reference + trigger Interface + + //The heap item reference + item *Item +} + +//Cache is used to cache the enabled triggers with specified capacity. +//If exceed the capacity, cached items will be adjusted with the following rules: +// The item with least usage frequency will be replaced; +// If multiple items with same usage frequency, the oldest one will be replaced. +type Cache struct { + //The max count of items this cache can keep + capacity int + + //Lock to handle concurrent case + lock *sync.RWMutex + + //Hash map for quick locating cached item + hash map[string]CacheItem + + //Heap for quick locating the trigger with least usage + queue *MetaQueue +} + +//NewCache is constructor of cache +func NewCache(capacity int) *Cache { + cap := capacity + if cap <= 0 { + cap = defaultCapacity + } + + //Initialize heap + mq := make(MetaQueue, 0) + heap.Init(&mq) + + return &Cache{ + capacity: cap, + lock: new(sync.RWMutex), + hash: make(map[string]CacheItem), + queue: &mq, + } +} + +//Get the trigger interface with the specified policy ID +func (c *Cache) Get(policyID int) Interface { + if policyID <= 0 { + return nil + } + + c.lock.RLock() + defer c.lock.RUnlock() + + k := c.key(policyID) + + if cacheItem, ok := c.hash[k]; ok { + //Update frequency + c.queue.Update(cacheItem.item) + return cacheItem.trigger + } + + return nil +} + +//Put the item into cache with ID of ploicy as key +func (c *Cache) Put(policyID int, trigger Interface) { + if policyID <= 0 || trigger == nil { + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + //Exceed the capacity? + if c.Size() >= c.capacity { + //Pop one for the new one + v := heap.Pop(c.queue) + item := v.(*Item) + //Remove from hash + delete(c.hash, c.key(item.policyID)) + } + + //Add to meta queue + item := &Item{ + policyID: policyID, + frequency: 1, + } + heap.Push(c.queue, item) + + //Cache + cacheItem := CacheItem{ + trigger: trigger, + item: item, + } + + k := c.key(policyID) + c.hash[k] = cacheItem +} + +//Remove the trigger attached to the specified policy +func (c *Cache) Remove(policyID int) Interface { + if policyID > 0 { + c.lock.Lock() + defer c.lock.Unlock() + + //If existing + k := c.key(policyID) + if cacheItem, ok := c.hash[k]; ok { + //Remove from heap + heap.Remove(c.queue, cacheItem.item.index) + + //Remove from hash + delete(c.hash, k) + + return cacheItem.trigger + } + + } + + return nil +} + +//Size return the count of triggers in the cache +func (c *Cache) Size() int { + return len(c.hash) +} + +//Generate a hash key with the policy ID +func (c *Cache) key(policyID int) string { + return fmt.Sprintf("trigger-%d", policyID) +} diff --git a/src/replication/trigger/cache_test.go b/src/replication/trigger/cache_test.go new file mode 100644 index 000000000..b7348cf26 --- /dev/null +++ b/src/replication/trigger/cache_test.go @@ -0,0 +1,53 @@ +package trigger + +import "testing" +import "time" + +func TestCache(t *testing.T) { + cache := NewCache(10) + trigger := NewImmediateTrigger(ImmediateParam{}) + + cache.Put(1, trigger) + if cache.Size() != 1 { + t.Fatalf("Invalid size, expect 1 but got %d", cache.Size()) + } + + tr := cache.Get(1) + if tr == nil { + t.Fatal("Should not get nil item") + } + + tri := cache.Remove(1) + if tri == nil || cache.Size() > 0 { + t.Fatal("Failed to remove") + } +} + +func TestCacheChange(t *testing.T) { + cache := NewCache(2) + trigger1 := NewImmediateTrigger(ImmediateParam{}) + trigger2 := NewImmediateTrigger(ImmediateParam{}) + cache.Put(1, trigger1) + cache.Put(2, trigger2) + + if cache.Size() != 2 { + t.Fatalf("Invalid size, expect 2 but got %d", cache.Size()) + } + + if tr := cache.Get(2); tr == nil { + t.Fatal("Should not get nil item") + } + + time.Sleep(100 * time.Microsecond) + + trigger3 := NewImmediateTrigger(ImmediateParam{}) + cache.Put(3, trigger3) + if cache.Size() != 2 { + t.Fatalf("Invalid size, expect 2 but got %d", cache.Size()) + } + + if tr := cache.Get(1); tr != nil { + t.Fatal("item1 should not exist") + } + +} diff --git a/src/replication/trigger/immediate.go b/src/replication/trigger/immediate.go new file mode 100644 index 000000000..af4ba3b2b --- /dev/null +++ b/src/replication/trigger/immediate.go @@ -0,0 +1,47 @@ +package trigger + +import ( + "errors" + + "github.com/vmware/harbor/src/replication" +) + +//ImmediateTrigger will setup watcher at the image pushing action to fire +//replication event at pushing happening time. +type ImmediateTrigger struct { + params ImmediateParam +} + +//NewImmediateTrigger is constructor of ImmediateTrigger +func NewImmediateTrigger(params ImmediateParam) *ImmediateTrigger { + return &ImmediateTrigger{ + params: params, + } +} + +//Kind is the implementation of same method defined in Trigger interface +func (st *ImmediateTrigger) Kind() string { + return replication.TriggerKindImmediate +} + +//Setup is the implementation of same method defined in Trigger interface +func (st *ImmediateTrigger) Setup() error { + if st.params.PolicyID <= 0 || len(st.params.Namespace) == 0 { + return errors.New("Invalid parameters for Immediate trigger") + } + + //TODO: Need more complicated logic here to handle partial updates + wt := WatchItem{ + PolicyID: st.params.PolicyID, + Namespace: st.params.Namespace, + OnDeletion: st.params.OnDeletion, + OnPush: true, + } + + return DefaultWatchList.Add(wt) +} + +//Unset is the implementation of same method defined in Trigger interface +func (st *ImmediateTrigger) Unset() error { + return errors.New("Not implemented") +} diff --git a/src/replication/trigger/interface.go b/src/replication/trigger/interface.go new file mode 100644 index 000000000..d08e75137 --- /dev/null +++ b/src/replication/trigger/interface.go @@ -0,0 +1,13 @@ +package trigger + +//Interface is certian mechanism to know when fire the replication operation. +type Interface interface { + //Kind indicates what type of the trigger is. + Kind() string + + //Setup/enable the trigger; if failed, an error would be returned. + Setup() error + + //Remove/disable the trigger; if failed, an error would be returned. + Unset() error +} diff --git a/src/replication/trigger/manager.go b/src/replication/trigger/manager.go new file mode 100644 index 000000000..daed480e0 --- /dev/null +++ b/src/replication/trigger/manager.go @@ -0,0 +1,136 @@ +package trigger + +import ( + "errors" + + "github.com/vmware/harbor/src/replication" + "github.com/vmware/harbor/src/replication/models" +) + +//Manager provides unified methods to manage the triggers of policies; +//Cache the enabled triggers, setup/unset the trigger based on the parameters +//with json format. +type Manager struct { + //Cache for triggers + cache *Cache +} + +//NewManager is the constructor of trigger manager. +//capacity is the max number of trigger references manager can keep in memory +func NewManager(capacity int) *Manager { + return &Manager{ + cache: NewCache(capacity), + } +} + +//GetTrigger returns the enabled trigger reference if existing in the cache. +func (m *Manager) GetTrigger(policyID int) Interface { + return m.cache.Get(policyID) +} + +//RemoveTrigger will disable the trigger and remove it from the cache if existing. +func (m *Manager) RemoveTrigger(policyID int) error { + trigger := m.cache.Get(policyID) + if trigger == nil { + return errors.New("Trigger is not cached, please use UnsetTrigger to disable the trigger") + } + + //Unset trigger + if err := trigger.Unset(); err != nil { + return err + } + + //Remove from cache + //No need to check the return of remove because the dirty item cached in the cache + //will be removed out finally after a certain while + m.cache.Remove(policyID) + + return nil +} + +//SetupTrigger will create the new trigger based on the provided json parameters. +//If failed, an error will be returned. +func (m *Manager) SetupTrigger(policyID int, trigger models.Trigger) error { + if policyID <= 0 { + return errors.New("Invalid policy ID") + } + + if len(trigger.Name) == 0 { + return errors.New("Invalid replication trigger definition") + } + + switch trigger.Name { + case replication.TriggerKindSchedule: + param := ScheduleParam{} + if err := param.Parse(trigger.Param); err != nil { + return err + } + //Append policy ID info + param.PolicyID = policyID + + newTrigger := NewScheduleTrigger(param) + if err := newTrigger.Setup(); err != nil { + return err + } + case replication.TriggerKindImmediate: + param := ImmediateParam{} + if err := param.Parse(trigger.Param); err != nil { + return err + } + //Append policy ID info + param.PolicyID = policyID + + newTrigger := NewImmediateTrigger(param) + if err := newTrigger.Setup(); err != nil { + return err + } + default: + //Treat as manual trigger + break + } + + return nil +} + +//UnsetTrigger will disable the trigger which is not cached in the trigger cache. +func (m *Manager) UnsetTrigger(policyID int, trigger models.Trigger) error { + if policyID <= 0 { + return errors.New("Invalid policy ID") + } + + if len(trigger.Name) == 0 { + return errors.New("Invalid replication trigger definition") + } + + switch trigger.Name { + case replication.TriggerKindSchedule: + param := ScheduleParam{} + if err := param.Parse(trigger.Param); err != nil { + return err + } + //Append policy ID info + param.PolicyID = policyID + + newTrigger := NewScheduleTrigger(param) + if err := newTrigger.Unset(); err != nil { + return err + } + case replication.TriggerKindImmediate: + param := ImmediateParam{} + if err := param.Parse(trigger.Param); err != nil { + return err + } + //Append policy ID info + param.PolicyID = policyID + + newTrigger := NewImmediateTrigger(param) + if err := newTrigger.Unset(); err != nil { + return err + } + default: + //Treat as manual trigger + break + } + + return nil +} diff --git a/src/replication/trigger/param_immediate.go b/src/replication/trigger/param_immediate.go new file mode 100644 index 000000000..492c65872 --- /dev/null +++ b/src/replication/trigger/param_immediate.go @@ -0,0 +1,26 @@ +package trigger + +import ( + "errors" +) + +//NOTES: Whether replicate the existing images when the type of trigger is +//'Immediate' is a once-effective setting which will not be persisted +// and kept as one parameter of 'Immediate' trigger. It will only be +//covered by the UI logic. + +//ImmediateParam defines the parameter of immediate trigger +type ImmediateParam struct { + //Basic parameters + BasicParam + + //Namepace + Namespace string +} + +//Parse is the implementation of same method in TriggerParam interface +//NOTES: No need to implement this method for 'Immediate' trigger as +//it does not have any parameters with json format. +func (ip ImmediateParam) Parse(param string) error { + return errors.New("Should NOT be called as it's not implemented") +} diff --git a/src/replication/trigger/param_schedule.go b/src/replication/trigger/param_schedule.go new file mode 100644 index 000000000..84ca46f44 --- /dev/null +++ b/src/replication/trigger/param_schedule.go @@ -0,0 +1,30 @@ +package trigger + +import ( + "encoding/json" + "errors" +) + +//ScheduleParam defines the parameter of schedule trigger +type ScheduleParam struct { + //Basic parameters + BasicParam + + //Daily or weekly + Type string + + //Optional, only used when type is 'weekly' + Weekday int8 + + //The time offset with the UTC 00:00 in seconds + Offtime int64 +} + +//Parse is the implementation of same method in TriggerParam interface +func (stp ScheduleParam) Parse(param string) error { + if len(param) == 0 { + return errors.New("Parameter of schedule trigger should not be empty") + } + + return json.Unmarshal([]byte(param), &stp) +} diff --git a/src/replication/trigger/schedule.go b/src/replication/trigger/schedule.go new file mode 100644 index 000000000..d7c07cadd --- /dev/null +++ b/src/replication/trigger/schedule.go @@ -0,0 +1,34 @@ +package trigger + +import ( + "errors" + + "github.com/vmware/harbor/src/replication" +) + +//ScheduleTrigger will schedule a alternate policy to provide 'daily' and 'weekly' trigger ways. +type ScheduleTrigger struct { + params ScheduleParam +} + +//NewScheduleTrigger is constructor of ScheduleTrigger +func NewScheduleTrigger(params ScheduleParam) *ScheduleTrigger { + return &ScheduleTrigger{ + params: params, + } +} + +//Kind is the implementation of same method defined in Trigger interface +func (st *ScheduleTrigger) Kind() string { + return replication.TriggerKindSchedule +} + +//Setup is the implementation of same method defined in Trigger interface +func (st *ScheduleTrigger) Setup() error { + return errors.New("Not implemented") +} + +//Unset is the implementation of same method defined in Trigger interface +func (st *ScheduleTrigger) Unset() error { + return errors.New("Not implemented") +} diff --git a/src/replication/trigger/trigger_param.go b/src/replication/trigger/trigger_param.go new file mode 100644 index 000000000..10420ef5b --- /dev/null +++ b/src/replication/trigger/trigger_param.go @@ -0,0 +1,17 @@ +package trigger + +//BasicParam contains the general parameters for all triggers +type BasicParam struct { + //ID of the related policy + PolicyID int + + //Whether delete remote replicated images if local ones are deleted + OnDeletion bool +} + +//Parameter defines operation of doing initialization from parameter json text +type Parameter interface { + //Decode parameter with json style to the owner struct + //If failed, an error will be returned + Parse(param string) error +} diff --git a/src/replication/trigger/watch_list.go b/src/replication/trigger/watch_list.go new file mode 100644 index 000000000..6a2009f29 --- /dev/null +++ b/src/replication/trigger/watch_list.go @@ -0,0 +1,43 @@ +package trigger + +//DefaultWatchList is the default instance of WatchList +var DefaultWatchList = &WatchList{} + +//WatchList contains the items which should be evaluated for replication +//when image pushing or deleting happens. +type WatchList struct{} + +//WatchItem keeps the related data for evaluation in WatchList. +type WatchItem struct { + //ID of policy + PolicyID int + + //Corresponding namespace + Namespace string + + //For deletion event + OnDeletion bool + + //For pushing event + OnPush bool +} + +//Add item to the list and persist into DB +func (wl *WatchList) Add(item WatchItem) error { + return nil +} + +//Remove the specified watch item from list +func (wl *WatchList) Remove() WatchItem { + return WatchItem{} +} + +//Update the watch item in the list +func (wl *WatchList) Update(updatedItem WatchItem) error { + return nil +} + +//Get the specified watch item +func (wl *WatchList) Get(namespace string) WatchItem { + return WatchItem{} +}