mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-23 16:11:24 +01:00
Enhance scanning policy notify handler and add more testing cases
This commit is contained in:
parent
db58ca673d
commit
201095a259
@ -183,7 +183,6 @@ func (nw *NotificationWatcher) Notify(notification Notification) error {
|
||||
//Trigger handlers
|
||||
for _, h := range handlers {
|
||||
var handlerChan chan bool
|
||||
|
||||
if h.IsStateful() {
|
||||
t := reflect.TypeOf(h).String()
|
||||
handlerChan = nw.handlerChannels[t].channel
|
||||
|
@ -2,11 +2,14 @@ package notifier
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/common/scheduler"
|
||||
)
|
||||
|
||||
var statefulData int
|
||||
var statefulData int32
|
||||
|
||||
type fakeStatefulHandler struct {
|
||||
number int
|
||||
@ -21,7 +24,7 @@ func (fsh *fakeStatefulHandler) Handle(v interface{}) error {
|
||||
if v != nil && reflect.TypeOf(v).Kind() == reflect.Int {
|
||||
increment = v.(int)
|
||||
}
|
||||
statefulData += increment
|
||||
atomic.AddInt32(&statefulData, (int32)(increment))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -126,11 +129,12 @@ func TestPublish(t *testing.T) {
|
||||
//Waiting for async is done
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
if statefulData != 150 {
|
||||
t.Fatalf("Expect execution result %d, but got %d", 150, statefulData)
|
||||
finalData := atomic.LoadInt32(&statefulData)
|
||||
if finalData != 150 {
|
||||
t.Fatalf("Expect execution result %d, but got %d", 150, finalData)
|
||||
}
|
||||
|
||||
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
|
||||
err = UnSubscribe("topic1", "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -139,4 +143,82 @@ func TestPublish(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//Clear stateful data.
|
||||
atomic.StoreInt32(&statefulData, 0)
|
||||
}
|
||||
|
||||
func TestConcurrentPublish(t *testing.T) {
|
||||
err := Subscribe("topic1", &fakeStatefulHandler{0})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(notificationWatcher.handlers) != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
//Publish in a short interval.
|
||||
for i := 0; i < 10; i++ {
|
||||
Publish("topic1", 100)
|
||||
}
|
||||
|
||||
//Waiting for async is done
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
finalData := atomic.LoadInt32(&statefulData)
|
||||
if finalData != 1000 {
|
||||
t.Fatalf("Expect execution result %d, but got %d", 1000, finalData)
|
||||
}
|
||||
|
||||
err = UnSubscribe("topic1", "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//Clear stateful data.
|
||||
atomic.StoreInt32(&statefulData, 0)
|
||||
}
|
||||
|
||||
func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) {
|
||||
scheduler.DefaultScheduler.Start()
|
||||
if !scheduler.DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Policy scheduler is not started")
|
||||
}
|
||||
|
||||
if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if len(notificationWatcher.handlers) != 1 {
|
||||
t.Fatal("Handler is not registered")
|
||||
}
|
||||
//Wating for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
utcTime := time.Now().UTC().Unix()
|
||||
notification := ScanPolicyNotification{"daily", utcTime + 3600}
|
||||
for i := 1; i <= 10; i++ {
|
||||
notification.DailyTime += (int64)(i)
|
||||
if err := Publish("testing_topic", notification); err != nil {
|
||||
t.Fatalf("index=%d, error=%s", i, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
//Wating for everything is ready.
|
||||
<-time.After(2 * time.Second)
|
||||
if err := UnSubscribe("testing_topic", ""); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if len(notificationWatcher.handlers) != 0 {
|
||||
t.Fatal("Handler is not unregistered")
|
||||
}
|
||||
|
||||
scheduler.DefaultScheduler.Stop()
|
||||
//Wating for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
if scheduler.DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Policy scheduler is not stopped")
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/common/scheduler"
|
||||
@ -15,6 +16,9 @@ const (
|
||||
//PolicyTypeDaily specify the policy type is "daily"
|
||||
PolicyTypeDaily = "daily"
|
||||
|
||||
//PolicyTypeNone specify the policy type is "none"
|
||||
PolicyTypeNone = "none"
|
||||
|
||||
alternatePolicy = "Alternate Policy"
|
||||
)
|
||||
|
||||
@ -53,20 +57,61 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
|
||||
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
|
||||
if notification.Type == PolicyTypeDaily {
|
||||
if !hasScheduled {
|
||||
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
|
||||
Duration: 24 * time.Hour,
|
||||
OffsetTime: notification.DailyTime,
|
||||
})
|
||||
attachTask := task.NewScanAllTask()
|
||||
schedulePolicy.AttachTasks(attachTask)
|
||||
|
||||
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
|
||||
//Schedule a new policy.
|
||||
return schedulePolicy(notification)
|
||||
}
|
||||
} else {
|
||||
|
||||
//To check and compare if the related parameter is changed.
|
||||
if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil {
|
||||
if reflect.TypeOf(pl).Kind() == reflect.Ptr &&
|
||||
reflect.TypeOf(pl).String() == "*policy.AlternatePolicy" {
|
||||
spl := pl.(*policy.AlternatePolicy)
|
||||
plConfig := spl.GetConfig()
|
||||
if plConfig != nil {
|
||||
if plConfig.OffsetTime != notification.DailyTime {
|
||||
//Parameter changed.
|
||||
//Unschedule policy.
|
||||
if err := scheduler.DefaultScheduler.UnSchedule(alternatePolicy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//Waiting for async action is done!
|
||||
<-time.After(500 * time.Millisecond)
|
||||
|
||||
//Reschedule policy.
|
||||
return schedulePolicy(notification)
|
||||
}
|
||||
|
||||
//No change, do nothing.
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.New("*policy.AlternatePolicy should not have nil configuration")
|
||||
}
|
||||
|
||||
return fmt.Errorf("Invalid policy type: %s", reflect.TypeOf(pl).String())
|
||||
}
|
||||
|
||||
return errors.New("Inconsistent policy scheduling status")
|
||||
} else if notification.Type == PolicyTypeNone {
|
||||
if hasScheduled {
|
||||
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Notification type %s is not supported", notification.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//Schedule policy.
|
||||
func schedulePolicy(notification ScanPolicyNotification) error {
|
||||
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
|
||||
Duration: 24 * time.Hour,
|
||||
OffsetTime: notification.DailyTime,
|
||||
})
|
||||
attachTask := task.NewScanAllTask()
|
||||
schedulePolicy.AttachTasks(attachTask)
|
||||
|
||||
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/common/scheduler"
|
||||
"github.com/vmware/harbor/src/common/scheduler/policy"
|
||||
)
|
||||
|
||||
var testingScheduler = scheduler.DefaultScheduler
|
||||
@ -33,11 +34,35 @@ func TestScanPolicyNotificationHandler(t *testing.T) {
|
||||
t.Fatal("Handler does not work")
|
||||
}
|
||||
|
||||
notification2 := ScanPolicyNotification{"none", 0}
|
||||
//Policy parameter changed.
|
||||
notification2 := ScanPolicyNotification{"daily", utcTime + 7200}
|
||||
if err := handler.Handle(notification2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//Waiting for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
if !testingScheduler.HasScheduled("Alternate Policy") {
|
||||
t.Fatal("Handler does not work [2]")
|
||||
}
|
||||
pl := testingScheduler.GetPolicy("Alternate Policy")
|
||||
if pl == nil {
|
||||
t.Fail()
|
||||
}
|
||||
spl := pl.(*policy.AlternatePolicy)
|
||||
cfg := spl.GetConfig()
|
||||
if cfg == nil {
|
||||
t.Fail()
|
||||
}
|
||||
if cfg.OffsetTime != utcTime+7200 {
|
||||
t.Fatal("Policy is not updated")
|
||||
}
|
||||
|
||||
notification3 := ScanPolicyNotification{"none", 0}
|
||||
if err := handler.Handle(notification3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//Waiting for everything is ready.
|
||||
<-time.After(1 * time.Second)
|
||||
if testingScheduler.HasScheduled("Alternate Policy") {
|
||||
|
@ -44,9 +44,10 @@ type AlternatePolicy struct {
|
||||
//NewAlternatePolicy is constructor of creating AlternatePolicy.
|
||||
func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
|
||||
return &AlternatePolicy{
|
||||
tasks: []task.Task{},
|
||||
config: config,
|
||||
isEnabled: false,
|
||||
tasks: []task.Task{},
|
||||
config: config,
|
||||
isEnabled: false,
|
||||
terminator: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,7 +109,6 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
|
||||
}
|
||||
|
||||
alp.done = make(chan bool)
|
||||
alp.terminator = make(chan bool)
|
||||
alp.evaluation = make(chan bool)
|
||||
|
||||
go func() {
|
||||
|
@ -133,25 +133,30 @@ func (sch *Scheduler) Start() {
|
||||
//Exit
|
||||
return
|
||||
case p := <-sch.scheduleQueue:
|
||||
//Schedule the policy.
|
||||
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
|
||||
if !sch.policies.Exists(p.Name()) {
|
||||
//Schedule the policy.
|
||||
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
|
||||
|
||||
//Keep the policy for future use after it's successfully scheduled.
|
||||
sch.policies.Put(p.Name(), watcher)
|
||||
//Keep the policy for future use after it's successfully scheduled.
|
||||
sch.policies.Put(p.Name(), watcher)
|
||||
|
||||
//Enable it.
|
||||
watcher.Start()
|
||||
//Enable it.
|
||||
watcher.Start()
|
||||
|
||||
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
|
||||
//Update stats and log info.
|
||||
log.Infof("Policy %s is scheduled", p.Name())
|
||||
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
|
||||
}
|
||||
case name := <-sch.unscheduleQueue:
|
||||
//Find the watcher.
|
||||
watcher := sch.policies.Remove(name)
|
||||
if watcher != nil && watcher.IsRunning() {
|
||||
watcher.Stop()
|
||||
|
||||
//Update stats and log info.
|
||||
log.Infof("Policy %s is unscheduled", name)
|
||||
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
|
||||
}
|
||||
|
||||
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
|
||||
|
||||
case stat := <-sch.statChan:
|
||||
{
|
||||
switch stat.Type {
|
||||
@ -263,3 +268,12 @@ func (sch *Scheduler) IsRunning() bool {
|
||||
func (sch *Scheduler) HasScheduled(policyName string) bool {
|
||||
return sch.policies.Exists(policyName)
|
||||
}
|
||||
|
||||
//GetPolicy is used to get related policy reference by its name.
|
||||
func (sch *Scheduler) GetPolicy(policyName string) policy.Policy {
|
||||
if sch.policies.Exists(policyName) {
|
||||
return sch.policies.Get(policyName).p
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -106,6 +106,9 @@ func TestScheduler(t *testing.T) {
|
||||
if DefaultScheduler.stats.PolicyCount != 1 {
|
||||
t.Fatal("Policy stats do not match")
|
||||
}
|
||||
if DefaultScheduler.GetPolicy(fp.Name()) == nil {
|
||||
t.Fatal("Failed to get poicy by name")
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
if fk.number == 100 {
|
||||
|
Loading…
Reference in New Issue
Block a user