From 201095a25921ce1d157435c077af8280300ba11a Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Mon, 17 Jul 2017 23:43:24 +0800 Subject: [PATCH] Enhance scanning policy notify handler and add more testing cases --- src/common/notifier/notifier.go | 1 - src/common/notifier/notifier_test.go | 92 ++++++++++++++++++- .../scan_policy_notitification_handler.go | 63 +++++++++++-- ...scan_policy_notitification_handler_test.go | 27 +++++- .../scheduler/policy/alternate_policy.go | 8 +- src/common/scheduler/scheduler.go | 34 +++++-- src/common/scheduler/scheduler_test.go | 3 + 7 files changed, 198 insertions(+), 30 deletions(-) diff --git a/src/common/notifier/notifier.go b/src/common/notifier/notifier.go index 5b92247ff..f93df9462 100644 --- a/src/common/notifier/notifier.go +++ b/src/common/notifier/notifier.go @@ -183,7 +183,6 @@ func (nw *NotificationWatcher) Notify(notification Notification) error { //Trigger handlers for _, h := range handlers { var handlerChan chan bool - if h.IsStateful() { t := reflect.TypeOf(h).String() handlerChan = nw.handlerChannels[t].channel diff --git a/src/common/notifier/notifier_test.go b/src/common/notifier/notifier_test.go index 4d3499a9d..8d0c99cfa 100644 --- a/src/common/notifier/notifier_test.go +++ b/src/common/notifier/notifier_test.go @@ -2,11 +2,14 @@ package notifier import ( "reflect" + "sync/atomic" "testing" "time" + + "github.com/vmware/harbor/src/common/scheduler" ) -var statefulData int +var statefulData int32 type fakeStatefulHandler struct { number int @@ -21,7 +24,7 @@ func (fsh *fakeStatefulHandler) Handle(v interface{}) error { if v != nil && reflect.TypeOf(v).Kind() == reflect.Int { increment = v.(int) } - statefulData += increment + atomic.AddInt32(&statefulData, (int32)(increment)) return nil } @@ -126,11 +129,12 @@ func TestPublish(t *testing.T) { //Waiting for async is done <-time.After(1 * time.Second) - if statefulData != 150 { - t.Fatalf("Expect execution result %d, but got %d", 150, statefulData) + finalData := atomic.LoadInt32(&statefulData) + if finalData != 150 { + t.Fatalf("Expect execution result %d, but got %d", 150, finalData) } - err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler") + err = UnSubscribe("topic1", "") if err != nil { t.Fatal(err) } @@ -139,4 +143,82 @@ func TestPublish(t *testing.T) { if err != nil { t.Fatal(err) } + + //Clear stateful data. + atomic.StoreInt32(&statefulData, 0) +} + +func TestConcurrentPublish(t *testing.T) { + err := Subscribe("topic1", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 1 { + t.Fail() + } + + //Publish in a short interval. + for i := 0; i < 10; i++ { + Publish("topic1", 100) + } + + //Waiting for async is done + <-time.After(1 * time.Second) + + finalData := atomic.LoadInt32(&statefulData) + if finalData != 1000 { + t.Fatalf("Expect execution result %d, but got %d", 1000, finalData) + } + + err = UnSubscribe("topic1", "") + if err != nil { + t.Fatal(err) + } + + //Clear stateful data. + atomic.StoreInt32(&statefulData, 0) +} + +func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) { + scheduler.DefaultScheduler.Start() + if !scheduler.DefaultScheduler.IsRunning() { + t.Fatal("Policy scheduler is not started") + } + + if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil { + t.Fatal(err.Error()) + } + if len(notificationWatcher.handlers) != 1 { + t.Fatal("Handler is not registered") + } + //Wating for everything is ready. + <-time.After(1 * time.Second) + + utcTime := time.Now().UTC().Unix() + notification := ScanPolicyNotification{"daily", utcTime + 3600} + for i := 1; i <= 10; i++ { + notification.DailyTime += (int64)(i) + if err := Publish("testing_topic", notification); err != nil { + t.Fatalf("index=%d, error=%s", i, err.Error()) + } + } + + //Wating for everything is ready. + <-time.After(2 * time.Second) + if err := UnSubscribe("testing_topic", ""); err != nil { + t.Fatal(err.Error()) + } + + if len(notificationWatcher.handlers) != 0 { + t.Fatal("Handler is not unregistered") + } + + scheduler.DefaultScheduler.Stop() + //Wating for everything is ready. + <-time.After(1 * time.Second) + if scheduler.DefaultScheduler.IsRunning() { + t.Fatal("Policy scheduler is not stopped") + } + } diff --git a/src/common/notifier/scan_policy_notitification_handler.go b/src/common/notifier/scan_policy_notitification_handler.go index 56c1bf885..518b89127 100644 --- a/src/common/notifier/scan_policy_notitification_handler.go +++ b/src/common/notifier/scan_policy_notitification_handler.go @@ -4,6 +4,7 @@ import ( "errors" "reflect" + "fmt" "time" "github.com/vmware/harbor/src/common/scheduler" @@ -15,6 +16,9 @@ const ( //PolicyTypeDaily specify the policy type is "daily" PolicyTypeDaily = "daily" + //PolicyTypeNone specify the policy type is "none" + PolicyTypeNone = "none" + alternatePolicy = "Alternate Policy" ) @@ -53,20 +57,61 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy) if notification.Type == PolicyTypeDaily { if !hasScheduled { - schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ - Duration: 24 * time.Hour, - OffsetTime: notification.DailyTime, - }) - attachTask := task.NewScanAllTask() - schedulePolicy.AttachTasks(attachTask) - - return scheduler.DefaultScheduler.Schedule(schedulePolicy) + //Schedule a new policy. + return schedulePolicy(notification) } - } else { + + //To check and compare if the related parameter is changed. + if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil { + if reflect.TypeOf(pl).Kind() == reflect.Ptr && + reflect.TypeOf(pl).String() == "*policy.AlternatePolicy" { + spl := pl.(*policy.AlternatePolicy) + plConfig := spl.GetConfig() + if plConfig != nil { + if plConfig.OffsetTime != notification.DailyTime { + //Parameter changed. + //Unschedule policy. + if err := scheduler.DefaultScheduler.UnSchedule(alternatePolicy); err != nil { + return err + } + + //Waiting for async action is done! + <-time.After(500 * time.Millisecond) + + //Reschedule policy. + return schedulePolicy(notification) + } + + //No change, do nothing. + return nil + } + + return errors.New("*policy.AlternatePolicy should not have nil configuration") + } + + return fmt.Errorf("Invalid policy type: %s", reflect.TypeOf(pl).String()) + } + + return errors.New("Inconsistent policy scheduling status") + } else if notification.Type == PolicyTypeNone { if hasScheduled { return scheduler.DefaultScheduler.UnSchedule(alternatePolicy) } + } else { + return fmt.Errorf("Notification type %s is not supported", notification.Type) } return nil } + +//Schedule policy. +func schedulePolicy(notification ScanPolicyNotification) error { + schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ + Duration: 24 * time.Hour, + OffsetTime: notification.DailyTime, + }) + attachTask := task.NewScanAllTask() + schedulePolicy.AttachTasks(attachTask) + + return scheduler.DefaultScheduler.Schedule(schedulePolicy) +} diff --git a/src/common/notifier/scan_policy_notitification_handler_test.go b/src/common/notifier/scan_policy_notitification_handler_test.go index 46efffbe5..d1e45f6ae 100644 --- a/src/common/notifier/scan_policy_notitification_handler_test.go +++ b/src/common/notifier/scan_policy_notitification_handler_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/vmware/harbor/src/common/scheduler" + "github.com/vmware/harbor/src/common/scheduler/policy" ) var testingScheduler = scheduler.DefaultScheduler @@ -33,11 +34,35 @@ func TestScanPolicyNotificationHandler(t *testing.T) { t.Fatal("Handler does not work") } - notification2 := ScanPolicyNotification{"none", 0} + //Policy parameter changed. + notification2 := ScanPolicyNotification{"daily", utcTime + 7200} if err := handler.Handle(notification2); err != nil { t.Fatal(err) } + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if !testingScheduler.HasScheduled("Alternate Policy") { + t.Fatal("Handler does not work [2]") + } + pl := testingScheduler.GetPolicy("Alternate Policy") + if pl == nil { + t.Fail() + } + spl := pl.(*policy.AlternatePolicy) + cfg := spl.GetConfig() + if cfg == nil { + t.Fail() + } + if cfg.OffsetTime != utcTime+7200 { + t.Fatal("Policy is not updated") + } + + notification3 := ScanPolicyNotification{"none", 0} + if err := handler.Handle(notification3); err != nil { + t.Fatal(err) + } + //Waiting for everything is ready. <-time.After(1 * time.Second) if testingScheduler.HasScheduled("Alternate Policy") { diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go index 38a20e92f..5a2235958 100644 --- a/src/common/scheduler/policy/alternate_policy.go +++ b/src/common/scheduler/policy/alternate_policy.go @@ -44,9 +44,10 @@ type AlternatePolicy struct { //NewAlternatePolicy is constructor of creating AlternatePolicy. func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy { return &AlternatePolicy{ - tasks: []task.Task{}, - config: config, - isEnabled: false, + tasks: []task.Task{}, + config: config, + isEnabled: false, + terminator: make(chan bool), } } @@ -108,7 +109,6 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { } alp.done = make(chan bool) - alp.terminator = make(chan bool) alp.evaluation = make(chan bool) go func() { diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go index 3073f6343..7bb97b2b2 100644 --- a/src/common/scheduler/scheduler.go +++ b/src/common/scheduler/scheduler.go @@ -133,25 +133,30 @@ func (sch *Scheduler) Start() { //Exit return case p := <-sch.scheduleQueue: - //Schedule the policy. - watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue) + if !sch.policies.Exists(p.Name()) { + //Schedule the policy. + watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue) - //Keep the policy for future use after it's successfully scheduled. - sch.policies.Put(p.Name(), watcher) + //Keep the policy for future use after it's successfully scheduled. + sch.policies.Put(p.Name(), watcher) - //Enable it. - watcher.Start() + //Enable it. + watcher.Start() - sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} + //Update stats and log info. + log.Infof("Policy %s is scheduled", p.Name()) + sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} + } case name := <-sch.unscheduleQueue: //Find the watcher. watcher := sch.policies.Remove(name) if watcher != nil && watcher.IsRunning() { watcher.Stop() + + //Update stats and log info. + log.Infof("Policy %s is unscheduled", name) + sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} } - - sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} - case stat := <-sch.statChan: { switch stat.Type { @@ -263,3 +268,12 @@ func (sch *Scheduler) IsRunning() bool { func (sch *Scheduler) HasScheduled(policyName string) bool { return sch.policies.Exists(policyName) } + +//GetPolicy is used to get related policy reference by its name. +func (sch *Scheduler) GetPolicy(policyName string) policy.Policy { + if sch.policies.Exists(policyName) { + return sch.policies.Get(policyName).p + } + + return nil +} diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go index f038e1802..27f3b1b21 100644 --- a/src/common/scheduler/scheduler_test.go +++ b/src/common/scheduler/scheduler_test.go @@ -106,6 +106,9 @@ func TestScheduler(t *testing.T) { if DefaultScheduler.stats.PolicyCount != 1 { t.Fatal("Policy stats do not match") } + if DefaultScheduler.GetPolicy(fp.Name()) == nil { + t.Fatal("Failed to get poicy by name") + } time.Sleep(2 * time.Second) if fk.number == 100 {