Enable policy scheduler in ui main

This commit is contained in:
Steven Zou 2017-07-07 00:38:38 +08:00
parent 82e17fade1
commit 44be165edf
13 changed files with 617 additions and 23 deletions

View File

@ -0,0 +1,15 @@
package notifier
//NotificationHandler defines what operations a notification handler
//should have.
type NotificationHandler interface {
//Handle the event when it coming.
//value might be optional, it depends on usages.
Handle(value interface{}) error
//IsStateful returns whether the handler is stateful or not.
//If handler is stateful, it will not be triggerred in parallel.
//Otherwise, the handler will be triggered concurrently if more
//than one same handler are matched the topics.
IsStateful() bool
}

View File

@ -0,0 +1,228 @@
package notifier
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"github.com/vmware/harbor/src/common/utils/log"
)
//HandlerIndexer is setup the relationship between the handler type and
//instance.
type HandlerIndexer map[string]NotificationHandler
//Notification wraps the topic and related data value if existing.
type Notification struct {
//Topic of notification
//Required
Topic string
//Value of notification.
//Optional
Value interface{}
}
//HandlerChannel provides not only the chan itself but also the count of
//handlers related with this chan.
type HandlerChannel struct {
//To indicate how many handler instances bound with this chan.
boundCount uint32
//The chan for controling concurrent executions.
channel chan bool
}
//NotificationWatcher is defined to accept the events published
//by the sender and match it with pre-registered notification handler
//and then trigger the execution of the found handler.
type NotificationWatcher struct {
//For handle concurrent scenario.
*sync.RWMutex
//To keep the registered handlers in memory.
//Each topic can register multiple handlers.
//Each handler can bind to multiple topics.
handlers map[string]HandlerIndexer
//Keep the channels which are used to control the concurrent executions
//of multiple stateful handlers with same type.
handlerChannels map[string]*HandlerChannel
}
//notificationWatcher is a default notification watcher in package level.
var notificationWatcher = NewNotificationWatcher()
//NewNotificationWatcher is constructor of NotificationWatcher.
func NewNotificationWatcher() *NotificationWatcher {
return &NotificationWatcher{
new(sync.RWMutex),
make(map[string]HandlerIndexer),
make(map[string]*HandlerChannel),
}
}
//Handle the related topic with the specified handler.
func (nw *NotificationWatcher) Handle(topic string, handler NotificationHandler) error {
if strings.TrimSpace(topic) == "" {
return errors.New("Empty topic is not supported")
}
if handler == nil {
return errors.New("Nil handler can not be registered")
}
defer nw.Unlock()
nw.Lock()
t := reflect.TypeOf(handler).String()
if indexer, ok := nw.handlers[topic]; ok {
if _, existing := indexer[t]; existing {
return fmt.Errorf("Topic %s has already register the handler with type %s", topic, t)
}
indexer[t] = handler
} else {
newIndexer := make(HandlerIndexer)
newIndexer[t] = handler
nw.handlers[topic] = newIndexer
}
if handler.IsStateful() {
//First time
if handlerChan, ok := nw.handlerChannels[t]; !ok {
nw.handlerChannels[t] = &HandlerChannel{1, make(chan bool, 1)}
} else {
//Already have chan, just increase count
handlerChan.boundCount++
}
}
return nil
}
//UnHandle is to revoke the registered handler with the specified topic.
//'handler' is optional, the type name of the handler. If it's empty value,
//then revoke the whole topic, otherwise only revoke the specified handler.
func (nw *NotificationWatcher) UnHandle(topic string, handler string) error {
if strings.TrimSpace(topic) == "" {
return errors.New("Empty topic is not supported")
}
defer nw.Unlock()
nw.Lock()
var revokeHandler = func(indexer HandlerIndexer, handlerType string) bool {
//Find the specified one
if hd, existing := indexer[handlerType]; existing {
delete(indexer, handlerType)
if len(indexer) == 0 {
//No handler existing, then remove topic
delete(nw.handlers, topic)
}
//Update channel counter or remove channel
if hd.IsStateful() {
if theChan, yes := nw.handlerChannels[handlerType]; yes {
theChan.boundCount--
if theChan.boundCount == 0 {
//Empty, then remove the channel
delete(nw.handlerChannels, handlerType)
}
}
}
return true
}
return false
}
if indexer, ok := nw.handlers[topic]; ok {
if strings.TrimSpace(handler) == "" {
for t := range indexer {
revokeHandler(indexer, t)
}
return nil
}
//Revoke the specified handler.
if revokeHandler(indexer, handler) {
return nil
}
}
return fmt.Errorf("Failed to revoke handler %s with topic %s", handler, topic)
}
//Notify that notification is coming.
func (nw *NotificationWatcher) Notify(notification Notification) error {
if strings.TrimSpace(notification.Topic) == "" {
return errors.New("Empty topic can not be notified")
}
nw.RLock()
defer nw.RUnlock()
var (
indexer HandlerIndexer
ok bool
handlers = []NotificationHandler{}
)
if indexer, ok = nw.handlers[notification.Topic]; !ok {
return fmt.Errorf("No handlers registered for handling topic %s", notification.Topic)
}
for _, h := range indexer {
handlers = append(handlers, h)
}
//Trigger handlers
for _, h := range handlers {
var handlerChan chan bool
if h.IsStateful() {
t := reflect.TypeOf(h).String()
handlerChan = nw.handlerChannels[t].channel
}
go func(hd NotificationHandler, ch chan bool) {
if hd.IsStateful() && ch != nil {
ch <- true
}
go func() {
defer func() {
if hd.IsStateful() && ch != nil {
<-ch
}
}()
if err := hd.Handle(notification.Value); err != nil {
//Currently, we just log the error
log.Errorf("Error occurred when triggerring handler %s of topic %s: %s\n", reflect.TypeOf(hd).String(), notification.Topic, err.Error())
}
}()
}(h, handlerChan)
}
return nil
}
//Subscribe is a wrapper utility method for NotificationWatcher.handle()
func Subscribe(topic string, handler NotificationHandler) error {
return notificationWatcher.Handle(topic, handler)
}
//UnSubscribe is a wrapper utility method for NotificationWatcher.UnHandle()
func UnSubscribe(topic string, handler string) error {
return notificationWatcher.UnHandle(topic, handler)
}
//Publish is a wrapper utility method for NotificationWatcher.notify()
func Publish(topic string, value interface{}) error {
return notificationWatcher.Notify(Notification{
Topic: topic,
Value: value,
})
}

View File

@ -0,0 +1,142 @@
package notifier
import (
"reflect"
"testing"
"time"
)
var statefulData int
type fakeStatefulHandler struct {
number int
}
func (fsh *fakeStatefulHandler) IsStateful() bool {
return true
}
func (fsh *fakeStatefulHandler) Handle(v interface{}) error {
increment := 0
if v != nil && reflect.TypeOf(v).Kind() == reflect.Int {
increment = v.(int)
}
statefulData += increment
return nil
}
type fakeStatelessHandler struct{}
func (fsh *fakeStatelessHandler) IsStateful() bool {
return false
}
func (fsh *fakeStatelessHandler) Handle(v interface{}) error {
return nil
}
func TestSubscribeAndUnSubscribe(t *testing.T) {
err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil {
t.Fatal(err)
}
err = Subscribe("topic1", &fakeStatelessHandler{})
if err != nil {
t.Fatal(err)
}
err = Subscribe("topic2", &fakeStatefulHandler{0})
if err != nil {
t.Fatal(err)
}
err = Subscribe("topic2", &fakeStatelessHandler{})
if err != nil {
t.Fatal(err)
}
if len(notificationWatcher.handlers) != 2 {
t.Fail()
}
if indexer, ok := notificationWatcher.handlers["topic1"]; !ok {
t.Fail()
} else {
if len(indexer) != 2 {
t.Fail()
}
}
if len(notificationWatcher.handlerChannels) != 1 {
t.Fail()
}
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
if err != nil {
t.Fatal(err)
}
err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler")
if err != nil {
t.Fatal(err)
}
if len(notificationWatcher.handlerChannels) != 0 {
t.Fail()
}
err = UnSubscribe("topic1", "")
if err != nil {
t.Fatal(err)
}
if len(notificationWatcher.handlers) != 1 {
t.Fail()
}
err = UnSubscribe("topic2", "")
if err != nil {
t.Fatal(err)
}
if len(notificationWatcher.handlers) != 0 {
t.Fail()
}
}
func TestPublish(t *testing.T) {
err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil {
t.Fatal(err)
}
err = Subscribe("topic2", &fakeStatefulHandler{0})
if err != nil {
t.Fatal(err)
}
if len(notificationWatcher.handlers) != 2 {
t.Fail()
}
Publish("topic1", 100)
Publish("topic2", 50)
//Waiting for async is done
<-time.After(1 * time.Second)
if statefulData != 150 {
t.Fatalf("Expect execution result %d, but got %d", 150, statefulData)
}
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
if err != nil {
t.Fatal(err)
}
err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler")
if err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,72 @@
package notifier
import (
"errors"
"reflect"
"time"
"github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task"
)
const (
//PolicyTypeDaily specify the policy type is "daily"
PolicyTypeDaily = "daily"
alternatePolicy = "Alternate Policy"
)
//ScanPolicyNotification is defined for pass the policy change data.
type ScanPolicyNotification struct {
//Type is used to keep the scan policy type: "none","daily" and "refresh".
Type string
//DailyTime is used when the type is 'daily', the offset with UTC time 00:00.
DailyTime int64
}
//ScanPolicyNotificationHandler is defined to handle the changes of scanning
//policy.
type ScanPolicyNotificationHandler struct{}
//IsStateful to indicate this handler is stateful.
func (s *ScanPolicyNotificationHandler) IsStateful() bool {
//Policy change should be done one by one.
return true
}
//Handle the policy change notification.
func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
if value == nil {
return errors.New("ScanPolicyNotificationHandler can not handle nil value")
}
if reflect.TypeOf(value).Kind() != reflect.Struct ||
reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" {
return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type")
}
notification := value.(ScanPolicyNotification)
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
if notification.Type == PolicyTypeDaily {
if !hasScheduled {
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
Duration: 24 * time.Hour,
OffsetTime: notification.DailyTime,
})
attachTask := task.NewScanAllTask()
schedulePolicy.AttachTasks(attachTask)
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
}
} else {
if hasScheduled {
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
}
}
return nil
}

View File

@ -0,0 +1,54 @@
package notifier
import (
"testing"
"time"
"github.com/vmware/harbor/src/common/scheduler"
)
var testingScheduler = scheduler.DefaultScheduler
func TestScanPolicyNotificationHandler(t *testing.T) {
//Scheduler should be running.
testingScheduler.Start()
if !testingScheduler.IsRunning() {
t.Fatal("scheduler should be running")
}
handler := &ScanPolicyNotificationHandler{}
if !handler.IsStateful() {
t.Fail()
}
utcTime := time.Now().UTC().Unix()
notification := ScanPolicyNotification{"daily", utcTime + 3600}
if err := handler.Handle(notification); err != nil {
t.Fatal(err)
}
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if !testingScheduler.HasScheduled("Alternate Policy") {
t.Fatal("Handler does not work")
}
notification2 := ScanPolicyNotification{"none", 0}
if err := handler.Handle(notification2); err != nil {
t.Fatal(err)
}
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if testingScheduler.HasScheduled("Alternate Policy") {
t.Fail()
}
//Clear
testingScheduler.Stop()
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if testingScheduler.IsRunning() {
t.Fatal("scheduler should be stopped")
}
}

View File

@ -0,0 +1,11 @@
package notifier
import (
"github.com/vmware/harbor/src/common"
)
//Define global topic names
const (
//ScanAllPolicyTopic is for notifying the change of scanning all policy.
ScanAllPolicyTopic = common.ScanAllPolicy
)

View File

@ -79,7 +79,7 @@ func (alp *AlternatePolicy) Tasks() []task.Task {
}
//Done is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Done() chan bool {
func (alp *AlternatePolicy) Done() <-chan bool {
return alp.done
}
@ -96,8 +96,6 @@ func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
//Disable is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Disable() error {
alp.isEnabled = false
//Stop the ticker
if alp.ticker != nil {
alp.ticker.Stop()
@ -111,7 +109,7 @@ func (alp *AlternatePolicy) Disable() error {
}
//Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
func (alp *AlternatePolicy) Evaluate() <-chan EvaluationResult {
//Keep idempotent
if alp.isEnabled && alp.evaluation != nil {
return alp.evaluation
@ -122,6 +120,9 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
alp.evaluation = make(chan EvaluationResult)
go func() {
defer func() {
alp.isEnabled = false
}()
timeNow := time.Now().UTC()
timeSeconds := timeNow.Unix()
@ -130,13 +131,16 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
if alp.config.EndTimestamp > 0 && timeSeconds >= alp.config.EndTimestamp {
//Invalid configuration, exit.
alp.done <- true
alp.isEnabled = false
return
}
if alp.config.StartTimestamp > 0 && timeSeconds < alp.config.StartTimestamp {
//Let's hold on for a while.
forWhile := alp.config.StartTimestamp - timeSeconds
time.Sleep(time.Duration(forWhile) * time.Second)
select {
case <-time.After(time.Duration(forWhile) * time.Second):
case <-alp.terminator:
return
}
}
//Reach the execution time point?
@ -147,7 +151,11 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
}
if diff > 0 {
//Wait for a while.
time.Sleep(time.Duration(diff) * time.Second)
select {
case <-time.After(time.Duration(diff) * time.Second):
case <-alp.terminator:
return
}
}
//Trigger the first tick.
@ -163,7 +171,6 @@ func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
if alp.config.EndTimestamp > 0 && time >= alp.config.EndTimestamp {
//Ploicy is done.
alp.done <- true
alp.isEnabled = false
if alp.ticker != nil {
alp.ticker.Stop()
}

View File

@ -25,13 +25,13 @@ type Policy interface {
//Done will setup a channel for other components to check whether or not
//the policy is completed. Possibly designed for the none loop policy.
Done() chan bool
Done() <-chan bool
//Evaluate the policy based on its definition and return the result via
//result channel. Policy is enabled after it is evaluated.
//Make sure Evaluate is idempotent, that means one policy can be only enabled
//only once even if Evaluate is called more than one times.
Evaluate() chan EvaluationResult
Evaluate() <-chan EvaluationResult
//Disable the enabled policy and release all the allocated resources.
//Disable should also send signal to the terminated channel which returned by Done.

View File

@ -93,7 +93,7 @@ func NewScheduler(config *Configuration) *Scheduler {
sq := make(chan policy.Policy, qSize)
usq := make(chan string, qSize)
stChan := make(chan *StatItem, 4)
tc := make(chan bool, 2)
tc := make(chan bool, 1)
store := NewConcurrentStore()
return &Scheduler{
@ -129,6 +129,9 @@ func (sch *Scheduler) Start() {
}()
for {
select {
case <-sch.terminateChan:
//Exit
return
case p := <-sch.scheduleQueue:
//Schedule the policy.
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
@ -148,9 +151,6 @@ func (sch *Scheduler) Start() {
}
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
case <-sch.terminateChan:
//Exit
return
case stat := <-sch.statChan:
{
@ -258,3 +258,8 @@ func (sch *Scheduler) UnSchedule(policyName string) error {
func (sch *Scheduler) IsRunning() bool {
return sch.isRunning
}
//HasScheduled is to check whether the given policy has been scheduled or not.
func (sch *Scheduler) HasScheduled(policyName string) bool {
return sch.policies.Exists(policyName)
}

View File

@ -29,11 +29,11 @@ func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error {
return nil
}
func (fp *fakePolicy) Done() chan bool {
func (fp *fakePolicy) Done() <-chan bool {
return fp.done
}
func (fp *fakePolicy) Evaluate() chan policy.EvaluationResult {
func (fp *fakePolicy) Evaluate() <-chan policy.EvaluationResult {
fp.evaluation = make(chan policy.EvaluationResult, 2)
fp.done = make(chan bool)
fp.terminate = make(chan bool)
@ -136,7 +136,7 @@ func TestScheduler(t *testing.T) {
}
DefaultScheduler.Stop()
if DefaultScheduler.policies.Size() != 0 {
if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is not cleared after stopping")
}
}

View File

@ -54,6 +54,10 @@ func (wc *Watcher) Start() {
}
}()
defer func() {
wc.isRunning = false
}()
evalChan := pl.Evaluate()
done := pl.Done()
@ -94,8 +98,6 @@ func (wc *Watcher) Start() {
case <-done:
{
//Policy is automatically completed.
wc.isRunning = false
//Report policy change stats.
if wc.doneChan != nil {
wc.doneChan <- wc.p.Name()
@ -125,8 +127,6 @@ func (wc *Watcher) Stop() {
}
//Stop watcher.
wc.cmdChan <- true
wc.isRunning = false
}
//IsRunning to indicate if the watcher is still running.

View File

@ -20,13 +20,17 @@ import (
"fmt"
"net/http"
"os"
"reflect"
"strings"
"errors"
"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/adminserver/client/auth"
"github.com/vmware/harbor/src/common"
comcfg "github.com/vmware/harbor/src/common/config"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/secret"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/ui/projectmanager"
@ -143,9 +147,17 @@ func Reset() error {
return mg.Reset()
}
// Upload uploads all system configutations to admin server
// Upload uploads all system configurations to admin server
func Upload(cfg map[string]interface{}) error {
return mg.Upload(cfg)
err := mg.Upload(cfg)
if err == nil {
//Watch configuration changes after updating.
if er := watchConfigChanges(cfg); er != nil {
log.Errorf("Error occurred when watching configuration changes: %s\n", er.Error())
}
}
return err
}
// GetSystemCfg returns the system configurations
@ -400,3 +412,31 @@ func ScanAllPolicy() models.ScanAllPolicy {
func WithAdmiral() bool {
return len(AdmiralEndpoint()) > 0
}
func watchConfigChanges(cfg map[string]interface{}) error {
if cfg == nil {
return errors.New("Empty configurations")
}
//Currently only watch the scan all policy change.
if v, ok := cfg[notifier.ScanAllPolicyTopic]; ok {
if reflect.TypeOf(v).Kind() == reflect.Struct &&
reflect.TypeOf(v).String() == "models.ScanAllPolicy" {
policyCfg := v.(models.ScanAllPolicy)
policyNotification := notifier.ScanPolicyNotification{
Type: policyCfg.Type,
DailyTime: 0,
}
if t, yes := policyCfg.Parm["daily_time"]; yes {
if reflect.TypeOf(t).Kind() == reflect.Int {
policyNotification.DailyTime = t.(int64)
}
}
return notifier.Publish(notifier.ScanAllPolicyTopic, policyNotification)
}
}
return nil
}

View File

@ -26,6 +26,8 @@ import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/ui/api"
_ "github.com/vmware/harbor/src/ui/auth/db"
_ "github.com/vmware/harbor/src/ui/auth/ldap"
@ -98,6 +100,24 @@ func main() {
log.Error(err)
}
//Enable the policy scheduler here.
scheduler.DefaultScheduler.Start()
//Subscribe the policy change topic.
notifier.Subscribe(notifier.ScanAllPolicyTopic, &notifier.ScanPolicyNotificationHandler{})
//Get policy configuration.
scanAllPolicy := config.ScanAllPolicy()
if scanAllPolicy.Type == notifier.PolicyTypeDaily {
dailyTime := 0
if t, ok := scanAllPolicy.Parm["daily_time"]; ok {
dailyTime = t
}
//Send notification to handle first policy change.
notifier.publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{scanAllPolicy.Type, dailyTime})
}
filter.Init()
beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter)