From 44be165edf64a25f4084bfdc94d3bc2ab3e1fac6 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 7 Jul 2017 00:38:38 +0800 Subject: [PATCH] Enable policy scheduler in ui main --- src/common/notifier/notification_handler.go | 15 ++ src/common/notifier/notifier.go | 228 ++++++++++++++++++ src/common/notifier/notifier_test.go | 142 +++++++++++ .../scan_policy_notitification_handler.go | 72 ++++++ ...scan_policy_notitification_handler_test.go | 54 +++++ src/common/notifier/topics.go | 11 + .../scheduler/policy/alternate_policy.go | 23 +- src/common/scheduler/policy/policy.go | 4 +- src/common/scheduler/scheduler.go | 13 +- src/common/scheduler/scheduler_test.go | 6 +- src/common/scheduler/watcher.go | 8 +- src/ui/config/config.go | 44 +++- src/ui/main.go | 20 ++ 13 files changed, 617 insertions(+), 23 deletions(-) create mode 100644 src/common/notifier/notification_handler.go create mode 100644 src/common/notifier/notifier.go create mode 100644 src/common/notifier/notifier_test.go create mode 100644 src/common/notifier/scan_policy_notitification_handler.go create mode 100644 src/common/notifier/scan_policy_notitification_handler_test.go create mode 100644 src/common/notifier/topics.go diff --git a/src/common/notifier/notification_handler.go b/src/common/notifier/notification_handler.go new file mode 100644 index 000000000..5d4508aa3 --- /dev/null +++ b/src/common/notifier/notification_handler.go @@ -0,0 +1,15 @@ +package notifier + +//NotificationHandler defines what operations a notification handler +//should have. +type NotificationHandler interface { + //Handle the event when it coming. + //value might be optional, it depends on usages. + Handle(value interface{}) error + + //IsStateful returns whether the handler is stateful or not. + //If handler is stateful, it will not be triggerred in parallel. + //Otherwise, the handler will be triggered concurrently if more + //than one same handler are matched the topics. + IsStateful() bool +} diff --git a/src/common/notifier/notifier.go b/src/common/notifier/notifier.go new file mode 100644 index 000000000..5b92247ff --- /dev/null +++ b/src/common/notifier/notifier.go @@ -0,0 +1,228 @@ +package notifier + +import ( + "errors" + "fmt" + "reflect" + "strings" + "sync" + + "github.com/vmware/harbor/src/common/utils/log" +) + +//HandlerIndexer is setup the relationship between the handler type and +//instance. +type HandlerIndexer map[string]NotificationHandler + +//Notification wraps the topic and related data value if existing. +type Notification struct { + //Topic of notification + //Required + Topic string + + //Value of notification. + //Optional + Value interface{} +} + +//HandlerChannel provides not only the chan itself but also the count of +//handlers related with this chan. +type HandlerChannel struct { + //To indicate how many handler instances bound with this chan. + boundCount uint32 + + //The chan for controling concurrent executions. + channel chan bool +} + +//NotificationWatcher is defined to accept the events published +//by the sender and match it with pre-registered notification handler +//and then trigger the execution of the found handler. +type NotificationWatcher struct { + //For handle concurrent scenario. + *sync.RWMutex + + //To keep the registered handlers in memory. + //Each topic can register multiple handlers. + //Each handler can bind to multiple topics. + handlers map[string]HandlerIndexer + + //Keep the channels which are used to control the concurrent executions + //of multiple stateful handlers with same type. + handlerChannels map[string]*HandlerChannel +} + +//notificationWatcher is a default notification watcher in package level. +var notificationWatcher = NewNotificationWatcher() + +//NewNotificationWatcher is constructor of NotificationWatcher. +func NewNotificationWatcher() *NotificationWatcher { + return &NotificationWatcher{ + new(sync.RWMutex), + make(map[string]HandlerIndexer), + make(map[string]*HandlerChannel), + } +} + +//Handle the related topic with the specified handler. +func (nw *NotificationWatcher) Handle(topic string, handler NotificationHandler) error { + if strings.TrimSpace(topic) == "" { + return errors.New("Empty topic is not supported") + } + + if handler == nil { + return errors.New("Nil handler can not be registered") + } + + defer nw.Unlock() + nw.Lock() + + t := reflect.TypeOf(handler).String() + if indexer, ok := nw.handlers[topic]; ok { + if _, existing := indexer[t]; existing { + return fmt.Errorf("Topic %s has already register the handler with type %s", topic, t) + } + + indexer[t] = handler + } else { + newIndexer := make(HandlerIndexer) + newIndexer[t] = handler + nw.handlers[topic] = newIndexer + } + + if handler.IsStateful() { + //First time + if handlerChan, ok := nw.handlerChannels[t]; !ok { + nw.handlerChannels[t] = &HandlerChannel{1, make(chan bool, 1)} + } else { + //Already have chan, just increase count + handlerChan.boundCount++ + } + } + + return nil +} + +//UnHandle is to revoke the registered handler with the specified topic. +//'handler' is optional, the type name of the handler. If it's empty value, +//then revoke the whole topic, otherwise only revoke the specified handler. +func (nw *NotificationWatcher) UnHandle(topic string, handler string) error { + if strings.TrimSpace(topic) == "" { + return errors.New("Empty topic is not supported") + } + + defer nw.Unlock() + nw.Lock() + + var revokeHandler = func(indexer HandlerIndexer, handlerType string) bool { + //Find the specified one + if hd, existing := indexer[handlerType]; existing { + delete(indexer, handlerType) + if len(indexer) == 0 { + //No handler existing, then remove topic + delete(nw.handlers, topic) + } + + //Update channel counter or remove channel + if hd.IsStateful() { + if theChan, yes := nw.handlerChannels[handlerType]; yes { + theChan.boundCount-- + if theChan.boundCount == 0 { + //Empty, then remove the channel + delete(nw.handlerChannels, handlerType) + } + } + } + + return true + } + + return false + } + + if indexer, ok := nw.handlers[topic]; ok { + if strings.TrimSpace(handler) == "" { + for t := range indexer { + revokeHandler(indexer, t) + } + + return nil + } + + //Revoke the specified handler. + if revokeHandler(indexer, handler) { + return nil + } + } + + return fmt.Errorf("Failed to revoke handler %s with topic %s", handler, topic) +} + +//Notify that notification is coming. +func (nw *NotificationWatcher) Notify(notification Notification) error { + if strings.TrimSpace(notification.Topic) == "" { + return errors.New("Empty topic can not be notified") + } + + nw.RLock() + defer nw.RUnlock() + + var ( + indexer HandlerIndexer + ok bool + handlers = []NotificationHandler{} + ) + if indexer, ok = nw.handlers[notification.Topic]; !ok { + return fmt.Errorf("No handlers registered for handling topic %s", notification.Topic) + } + + for _, h := range indexer { + handlers = append(handlers, h) + } + + //Trigger handlers + for _, h := range handlers { + var handlerChan chan bool + + if h.IsStateful() { + t := reflect.TypeOf(h).String() + handlerChan = nw.handlerChannels[t].channel + } + go func(hd NotificationHandler, ch chan bool) { + if hd.IsStateful() && ch != nil { + ch <- true + } + go func() { + defer func() { + if hd.IsStateful() && ch != nil { + <-ch + } + }() + if err := hd.Handle(notification.Value); err != nil { + //Currently, we just log the error + log.Errorf("Error occurred when triggerring handler %s of topic %s: %s\n", reflect.TypeOf(hd).String(), notification.Topic, err.Error()) + } + }() + }(h, handlerChan) + } + + return nil +} + +//Subscribe is a wrapper utility method for NotificationWatcher.handle() +func Subscribe(topic string, handler NotificationHandler) error { + return notificationWatcher.Handle(topic, handler) +} + +//UnSubscribe is a wrapper utility method for NotificationWatcher.UnHandle() +func UnSubscribe(topic string, handler string) error { + return notificationWatcher.UnHandle(topic, handler) +} + +//Publish is a wrapper utility method for NotificationWatcher.notify() +func Publish(topic string, value interface{}) error { + return notificationWatcher.Notify(Notification{ + Topic: topic, + Value: value, + }) +} diff --git a/src/common/notifier/notifier_test.go b/src/common/notifier/notifier_test.go new file mode 100644 index 000000000..4d3499a9d --- /dev/null +++ b/src/common/notifier/notifier_test.go @@ -0,0 +1,142 @@ +package notifier + +import ( + "reflect" + "testing" + "time" +) + +var statefulData int + +type fakeStatefulHandler struct { + number int +} + +func (fsh *fakeStatefulHandler) IsStateful() bool { + return true +} + +func (fsh *fakeStatefulHandler) Handle(v interface{}) error { + increment := 0 + if v != nil && reflect.TypeOf(v).Kind() == reflect.Int { + increment = v.(int) + } + statefulData += increment + return nil +} + +type fakeStatelessHandler struct{} + +func (fsh *fakeStatelessHandler) IsStateful() bool { + return false +} + +func (fsh *fakeStatelessHandler) Handle(v interface{}) error { + return nil +} + +func TestSubscribeAndUnSubscribe(t *testing.T) { + err := Subscribe("topic1", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic1", &fakeStatelessHandler{}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatelessHandler{}) + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 2 { + t.Fail() + } + + if indexer, ok := notificationWatcher.handlers["topic1"]; !ok { + t.Fail() + } else { + if len(indexer) != 2 { + t.Fail() + } + } + + if len(notificationWatcher.handlerChannels) != 1 { + t.Fail() + } + + err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlerChannels) != 0 { + t.Fail() + } + + err = UnSubscribe("topic1", "") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 1 { + t.Fail() + } + + err = UnSubscribe("topic2", "") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 0 { + t.Fail() + } +} + +func TestPublish(t *testing.T) { + err := Subscribe("topic1", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 2 { + t.Fail() + } + + Publish("topic1", 100) + Publish("topic2", 50) + + //Waiting for async is done + <-time.After(1 * time.Second) + + if statefulData != 150 { + t.Fatalf("Expect execution result %d, but got %d", 150, statefulData) + } + + err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } +} diff --git a/src/common/notifier/scan_policy_notitification_handler.go b/src/common/notifier/scan_policy_notitification_handler.go new file mode 100644 index 000000000..56c1bf885 --- /dev/null +++ b/src/common/notifier/scan_policy_notitification_handler.go @@ -0,0 +1,72 @@ +package notifier + +import ( + "errors" + "reflect" + + "time" + + "github.com/vmware/harbor/src/common/scheduler" + "github.com/vmware/harbor/src/common/scheduler/policy" + "github.com/vmware/harbor/src/common/scheduler/task" +) + +const ( + //PolicyTypeDaily specify the policy type is "daily" + PolicyTypeDaily = "daily" + + alternatePolicy = "Alternate Policy" +) + +//ScanPolicyNotification is defined for pass the policy change data. +type ScanPolicyNotification struct { + //Type is used to keep the scan policy type: "none","daily" and "refresh". + Type string + + //DailyTime is used when the type is 'daily', the offset with UTC time 00:00. + DailyTime int64 +} + +//ScanPolicyNotificationHandler is defined to handle the changes of scanning +//policy. +type ScanPolicyNotificationHandler struct{} + +//IsStateful to indicate this handler is stateful. +func (s *ScanPolicyNotificationHandler) IsStateful() bool { + //Policy change should be done one by one. + return true +} + +//Handle the policy change notification. +func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { + if value == nil { + return errors.New("ScanPolicyNotificationHandler can not handle nil value") + } + + if reflect.TypeOf(value).Kind() != reflect.Struct || + reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" { + return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type") + } + + notification := value.(ScanPolicyNotification) + + hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy) + if notification.Type == PolicyTypeDaily { + if !hasScheduled { + schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ + Duration: 24 * time.Hour, + OffsetTime: notification.DailyTime, + }) + attachTask := task.NewScanAllTask() + schedulePolicy.AttachTasks(attachTask) + + return scheduler.DefaultScheduler.Schedule(schedulePolicy) + } + } else { + if hasScheduled { + return scheduler.DefaultScheduler.UnSchedule(alternatePolicy) + } + } + + return nil +} diff --git a/src/common/notifier/scan_policy_notitification_handler_test.go b/src/common/notifier/scan_policy_notitification_handler_test.go new file mode 100644 index 000000000..46efffbe5 --- /dev/null +++ b/src/common/notifier/scan_policy_notitification_handler_test.go @@ -0,0 +1,54 @@ +package notifier + +import ( + "testing" + "time" + + "github.com/vmware/harbor/src/common/scheduler" +) + +var testingScheduler = scheduler.DefaultScheduler + +func TestScanPolicyNotificationHandler(t *testing.T) { + //Scheduler should be running. + testingScheduler.Start() + if !testingScheduler.IsRunning() { + t.Fatal("scheduler should be running") + } + + handler := &ScanPolicyNotificationHandler{} + if !handler.IsStateful() { + t.Fail() + } + + utcTime := time.Now().UTC().Unix() + notification := ScanPolicyNotification{"daily", utcTime + 3600} + if err := handler.Handle(notification); err != nil { + t.Fatal(err) + } + + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if !testingScheduler.HasScheduled("Alternate Policy") { + t.Fatal("Handler does not work") + } + + notification2 := ScanPolicyNotification{"none", 0} + if err := handler.Handle(notification2); err != nil { + t.Fatal(err) + } + + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if testingScheduler.HasScheduled("Alternate Policy") { + t.Fail() + } + + //Clear + testingScheduler.Stop() + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if testingScheduler.IsRunning() { + t.Fatal("scheduler should be stopped") + } +} diff --git a/src/common/notifier/topics.go b/src/common/notifier/topics.go new file mode 100644 index 000000000..869ff1055 --- /dev/null +++ b/src/common/notifier/topics.go @@ -0,0 +1,11 @@ +package notifier + +import ( + "github.com/vmware/harbor/src/common" +) + +//Define global topic names +const ( + //ScanAllPolicyTopic is for notifying the change of scanning all policy. + ScanAllPolicyTopic = common.ScanAllPolicy +) diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go index a126f7468..4fa6ecf7f 100644 --- a/src/common/scheduler/policy/alternate_policy.go +++ b/src/common/scheduler/policy/alternate_policy.go @@ -79,7 +79,7 @@ func (alp *AlternatePolicy) Tasks() []task.Task { } //Done is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Done() chan bool { +func (alp *AlternatePolicy) Done() <-chan bool { return alp.done } @@ -96,8 +96,6 @@ func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error { //Disable is an implementation of same method in policy interface. func (alp *AlternatePolicy) Disable() error { - alp.isEnabled = false - //Stop the ticker if alp.ticker != nil { alp.ticker.Stop() @@ -111,7 +109,7 @@ func (alp *AlternatePolicy) Disable() error { } //Evaluate is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Evaluate() chan EvaluationResult { +func (alp *AlternatePolicy) Evaluate() <-chan EvaluationResult { //Keep idempotent if alp.isEnabled && alp.evaluation != nil { return alp.evaluation @@ -122,6 +120,9 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult { alp.evaluation = make(chan EvaluationResult) go func() { + defer func() { + alp.isEnabled = false + }() timeNow := time.Now().UTC() timeSeconds := timeNow.Unix() @@ -130,13 +131,16 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult { if alp.config.EndTimestamp > 0 && timeSeconds >= alp.config.EndTimestamp { //Invalid configuration, exit. alp.done <- true - alp.isEnabled = false return } if alp.config.StartTimestamp > 0 && timeSeconds < alp.config.StartTimestamp { //Let's hold on for a while. forWhile := alp.config.StartTimestamp - timeSeconds - time.Sleep(time.Duration(forWhile) * time.Second) + select { + case <-time.After(time.Duration(forWhile) * time.Second): + case <-alp.terminator: + return + } } //Reach the execution time point? @@ -147,7 +151,11 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult { } if diff > 0 { //Wait for a while. - time.Sleep(time.Duration(diff) * time.Second) + select { + case <-time.After(time.Duration(diff) * time.Second): + case <-alp.terminator: + return + } } //Trigger the first tick. @@ -163,7 +171,6 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult { if alp.config.EndTimestamp > 0 && time >= alp.config.EndTimestamp { //Ploicy is done. alp.done <- true - alp.isEnabled = false if alp.ticker != nil { alp.ticker.Stop() } diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go index 38ba2da4e..b012c6f9b 100644 --- a/src/common/scheduler/policy/policy.go +++ b/src/common/scheduler/policy/policy.go @@ -25,13 +25,13 @@ type Policy interface { //Done will setup a channel for other components to check whether or not //the policy is completed. Possibly designed for the none loop policy. - Done() chan bool + Done() <-chan bool //Evaluate the policy based on its definition and return the result via //result channel. Policy is enabled after it is evaluated. //Make sure Evaluate is idempotent, that means one policy can be only enabled //only once even if Evaluate is called more than one times. - Evaluate() chan EvaluationResult + Evaluate() <-chan EvaluationResult //Disable the enabled policy and release all the allocated resources. //Disable should also send signal to the terminated channel which returned by Done. diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go index 38e5b204e..3073f6343 100644 --- a/src/common/scheduler/scheduler.go +++ b/src/common/scheduler/scheduler.go @@ -93,7 +93,7 @@ func NewScheduler(config *Configuration) *Scheduler { sq := make(chan policy.Policy, qSize) usq := make(chan string, qSize) stChan := make(chan *StatItem, 4) - tc := make(chan bool, 2) + tc := make(chan bool, 1) store := NewConcurrentStore() return &Scheduler{ @@ -129,6 +129,9 @@ func (sch *Scheduler) Start() { }() for { select { + case <-sch.terminateChan: + //Exit + return case p := <-sch.scheduleQueue: //Schedule the policy. watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue) @@ -148,9 +151,6 @@ func (sch *Scheduler) Start() { } sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} - case <-sch.terminateChan: - //Exit - return case stat := <-sch.statChan: { @@ -258,3 +258,8 @@ func (sch *Scheduler) UnSchedule(policyName string) error { func (sch *Scheduler) IsRunning() bool { return sch.isRunning } + +//HasScheduled is to check whether the given policy has been scheduled or not. +func (sch *Scheduler) HasScheduled(policyName string) bool { + return sch.policies.Exists(policyName) +} diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go index a25d5527f..4396a2275 100644 --- a/src/common/scheduler/scheduler_test.go +++ b/src/common/scheduler/scheduler_test.go @@ -29,11 +29,11 @@ func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error { return nil } -func (fp *fakePolicy) Done() chan bool { +func (fp *fakePolicy) Done() <-chan bool { return fp.done } -func (fp *fakePolicy) Evaluate() chan policy.EvaluationResult { +func (fp *fakePolicy) Evaluate() <-chan policy.EvaluationResult { fp.evaluation = make(chan policy.EvaluationResult, 2) fp.done = make(chan bool) fp.terminate = make(chan bool) @@ -136,7 +136,7 @@ func TestScheduler(t *testing.T) { } DefaultScheduler.Stop() - if DefaultScheduler.policies.Size() != 0 { + if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() { t.Fatal("Scheduler is not cleared after stopping") } } diff --git a/src/common/scheduler/watcher.go b/src/common/scheduler/watcher.go index 2fe32a127..2e1b53f11 100644 --- a/src/common/scheduler/watcher.go +++ b/src/common/scheduler/watcher.go @@ -54,6 +54,10 @@ func (wc *Watcher) Start() { } }() + defer func() { + wc.isRunning = false + }() + evalChan := pl.Evaluate() done := pl.Done() @@ -94,8 +98,6 @@ func (wc *Watcher) Start() { case <-done: { //Policy is automatically completed. - wc.isRunning = false - //Report policy change stats. if wc.doneChan != nil { wc.doneChan <- wc.p.Name() @@ -125,8 +127,6 @@ func (wc *Watcher) Stop() { } //Stop watcher. wc.cmdChan <- true - - wc.isRunning = false } //IsRunning to indicate if the watcher is still running. diff --git a/src/ui/config/config.go b/src/ui/config/config.go index 2c0287dbc..6daea3fa6 100644 --- a/src/ui/config/config.go +++ b/src/ui/config/config.go @@ -20,13 +20,17 @@ import ( "fmt" "net/http" "os" + "reflect" "strings" + "errors" + "github.com/vmware/harbor/src/adminserver/client" "github.com/vmware/harbor/src/adminserver/client/auth" "github.com/vmware/harbor/src/common" comcfg "github.com/vmware/harbor/src/common/config" "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/secret" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/ui/projectmanager" @@ -143,9 +147,17 @@ func Reset() error { return mg.Reset() } -// Upload uploads all system configutations to admin server +// Upload uploads all system configurations to admin server func Upload(cfg map[string]interface{}) error { - return mg.Upload(cfg) + err := mg.Upload(cfg) + if err == nil { + //Watch configuration changes after updating. + if er := watchConfigChanges(cfg); er != nil { + log.Errorf("Error occurred when watching configuration changes: %s\n", er.Error()) + } + } + + return err } // GetSystemCfg returns the system configurations @@ -400,3 +412,31 @@ func ScanAllPolicy() models.ScanAllPolicy { func WithAdmiral() bool { return len(AdmiralEndpoint()) > 0 } + +func watchConfigChanges(cfg map[string]interface{}) error { + if cfg == nil { + return errors.New("Empty configurations") + } + + //Currently only watch the scan all policy change. + if v, ok := cfg[notifier.ScanAllPolicyTopic]; ok { + if reflect.TypeOf(v).Kind() == reflect.Struct && + reflect.TypeOf(v).String() == "models.ScanAllPolicy" { + policyCfg := v.(models.ScanAllPolicy) + policyNotification := notifier.ScanPolicyNotification{ + Type: policyCfg.Type, + DailyTime: 0, + } + + if t, yes := policyCfg.Parm["daily_time"]; yes { + if reflect.TypeOf(t).Kind() == reflect.Int { + policyNotification.DailyTime = t.(int64) + } + } + + return notifier.Publish(notifier.ScanAllPolicyTopic, policyNotification) + } + } + + return nil +} diff --git a/src/ui/main.go b/src/ui/main.go index 5a3651912..c21913219 100644 --- a/src/ui/main.go +++ b/src/ui/main.go @@ -26,6 +26,8 @@ import ( "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/notifier" + "github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/ui/api" _ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/ldap" @@ -98,6 +100,24 @@ func main() { log.Error(err) } + //Enable the policy scheduler here. + scheduler.DefaultScheduler.Start() + + //Subscribe the policy change topic. + notifier.Subscribe(notifier.ScanAllPolicyTopic, ¬ifier.ScanPolicyNotificationHandler{}) + + //Get policy configuration. + scanAllPolicy := config.ScanAllPolicy() + if scanAllPolicy.Type == notifier.PolicyTypeDaily { + dailyTime := 0 + if t, ok := scanAllPolicy.Parm["daily_time"]; ok { + dailyTime = t + } + + //Send notification to handle first policy change. + notifier.publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{scanAllPolicy.Type, dailyTime}) + } + filter.Init() beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter)