Merge pull request #2705 from steven-zou/master

Implement policy scheduler
This commit is contained in:
Steven Zou 2017-07-11 16:21:25 +08:00 committed by GitHub
commit 0ade32eef5
32 changed files with 1903 additions and 43 deletions

View File

@ -0,0 +1,15 @@
package notifier
//NotificationHandler defines what operations a notification handler
//should have.
type NotificationHandler interface {
//Handle the event when it coming.
//value might be optional, it depends on usages.
Handle(value interface{}) error
//IsStateful returns whether the handler is stateful or not.
//If handler is stateful, it will not be triggerred in parallel.
//Otherwise, the handler will be triggered concurrently if more
//than one same handler are matched the topics.
IsStateful() bool

View File

@ -0,0 +1,228 @@
package notifier
import (
//HandlerIndexer is setup the relationship between the handler type and
type HandlerIndexer map[string]NotificationHandler
//Notification wraps the topic and related data value if existing.
type Notification struct {
//Topic of notification
Topic string
//Value of notification.
Value interface{}
//HandlerChannel provides not only the chan itself but also the count of
//handlers related with this chan.
type HandlerChannel struct {
//To indicate how many handler instances bound with this chan.
boundCount uint32
//The chan for controling concurrent executions.
channel chan bool
//NotificationWatcher is defined to accept the events published
//by the sender and match it with pre-registered notification handler
//and then trigger the execution of the found handler.
type NotificationWatcher struct {
//For handle concurrent scenario.
//To keep the registered handlers in memory.
//Each topic can register multiple handlers.
//Each handler can bind to multiple topics.
handlers map[string]HandlerIndexer
//Keep the channels which are used to control the concurrent executions
//of multiple stateful handlers with same type.
handlerChannels map[string]*HandlerChannel
//notificationWatcher is a default notification watcher in package level.
var notificationWatcher = NewNotificationWatcher()
//NewNotificationWatcher is constructor of NotificationWatcher.
func NewNotificationWatcher() *NotificationWatcher {
return &NotificationWatcher{
//Handle the related topic with the specified handler.
func (nw *NotificationWatcher) Handle(topic string, handler NotificationHandler) error {
if strings.TrimSpace(topic) == "" {
return errors.New("Empty topic is not supported")
if handler == nil {
return errors.New("Nil handler can not be registered")
defer nw.Unlock()
t := reflect.TypeOf(handler).String()
if indexer, ok := nw.handlers[topic]; ok {
if _, existing := indexer[t]; existing {
return fmt.Errorf("Topic %s has already register the handler with type %s", topic, t)
indexer[t] = handler
} else {
newIndexer := make(HandlerIndexer)
newIndexer[t] = handler
nw.handlers[topic] = newIndexer
if handler.IsStateful() {
//First time
if handlerChan, ok := nw.handlerChannels[t]; !ok {
nw.handlerChannels[t] = &HandlerChannel{1, make(chan bool, 1)}
} else {
//Already have chan, just increase count
return nil
//UnHandle is to revoke the registered handler with the specified topic.
//'handler' is optional, the type name of the handler. If it's empty value,
//then revoke the whole topic, otherwise only revoke the specified handler.
func (nw *NotificationWatcher) UnHandle(topic string, handler string) error {
if strings.TrimSpace(topic) == "" {
return errors.New("Empty topic is not supported")
defer nw.Unlock()
var revokeHandler = func(indexer HandlerIndexer, handlerType string) bool {
//Find the specified one
if hd, existing := indexer[handlerType]; existing {
delete(indexer, handlerType)
if len(indexer) == 0 {
//No handler existing, then remove topic
delete(nw.handlers, topic)
//Update channel counter or remove channel
if hd.IsStateful() {
if theChan, yes := nw.handlerChannels[handlerType]; yes {
if theChan.boundCount == 0 {
//Empty, then remove the channel
delete(nw.handlerChannels, handlerType)
return true
return false
if indexer, ok := nw.handlers[topic]; ok {
if strings.TrimSpace(handler) == "" {
for t := range indexer {
revokeHandler(indexer, t)
return nil
//Revoke the specified handler.
if revokeHandler(indexer, handler) {
return nil
return fmt.Errorf("Failed to revoke handler %s with topic %s", handler, topic)
//Notify that notification is coming.
func (nw *NotificationWatcher) Notify(notification Notification) error {
if strings.TrimSpace(notification.Topic) == "" {
return errors.New("Empty topic can not be notified")
defer nw.RUnlock()
var (
indexer HandlerIndexer
ok bool
handlers = []NotificationHandler{}
if indexer, ok = nw.handlers[notification.Topic]; !ok {
return fmt.Errorf("No handlers registered for handling topic %s", notification.Topic)
for _, h := range indexer {
handlers = append(handlers, h)
//Trigger handlers
for _, h := range handlers {
var handlerChan chan bool
if h.IsStateful() {
t := reflect.TypeOf(h).String()
handlerChan = nw.handlerChannels[t].channel
go func(hd NotificationHandler, ch chan bool) {
if hd.IsStateful() && ch != nil {
ch <- true
go func() {
defer func() {
if hd.IsStateful() && ch != nil {
if err := hd.Handle(notification.Value); err != nil {
//Currently, we just log the error
log.Errorf("Error occurred when triggerring handler %s of topic %s: %s\n", reflect.TypeOf(hd).String(), notification.Topic, err.Error())
}(h, handlerChan)
return nil
//Subscribe is a wrapper utility method for NotificationWatcher.handle()
func Subscribe(topic string, handler NotificationHandler) error {
return notificationWatcher.Handle(topic, handler)
//UnSubscribe is a wrapper utility method for NotificationWatcher.UnHandle()
func UnSubscribe(topic string, handler string) error {
return notificationWatcher.UnHandle(topic, handler)
//Publish is a wrapper utility method for NotificationWatcher.notify()
func Publish(topic string, value interface{}) error {
return notificationWatcher.Notify(Notification{
Topic: topic,
Value: value,

View File

@ -0,0 +1,142 @@
package notifier
import (
var statefulData int
type fakeStatefulHandler struct {
number int
func (fsh *fakeStatefulHandler) IsStateful() bool {
return true
func (fsh *fakeStatefulHandler) Handle(v interface{}) error {
increment := 0
if v != nil && reflect.TypeOf(v).Kind() == reflect.Int {
increment = v.(int)
statefulData += increment
return nil
type fakeStatelessHandler struct{}
func (fsh *fakeStatelessHandler) IsStateful() bool {
return false
func (fsh *fakeStatelessHandler) Handle(v interface{}) error {
return nil
func TestSubscribeAndUnSubscribe(t *testing.T) {
err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil {
err = Subscribe("topic1", &fakeStatelessHandler{})
if err != nil {
err = Subscribe("topic2", &fakeStatefulHandler{0})
if err != nil {
err = Subscribe("topic2", &fakeStatelessHandler{})
if err != nil {
if len(notificationWatcher.handlers) != 2 {
if indexer, ok := notificationWatcher.handlers["topic1"]; !ok {
} else {
if len(indexer) != 2 {
if len(notificationWatcher.handlerChannels) != 1 {
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
if err != nil {
err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler")
if err != nil {
if len(notificationWatcher.handlerChannels) != 0 {
err = UnSubscribe("topic1", "")
if err != nil {
if len(notificationWatcher.handlers) != 1 {
err = UnSubscribe("topic2", "")
if err != nil {
if len(notificationWatcher.handlers) != 0 {
func TestPublish(t *testing.T) {
err := Subscribe("topic1", &fakeStatefulHandler{0})
if err != nil {
err = Subscribe("topic2", &fakeStatefulHandler{0})
if err != nil {
if len(notificationWatcher.handlers) != 2 {
Publish("topic1", 100)
Publish("topic2", 50)
//Waiting for async is done
<-time.After(1 * time.Second)
if statefulData != 150 {
t.Fatalf("Expect execution result %d, but got %d", 150, statefulData)
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
if err != nil {
err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler")
if err != nil {

View File

@ -0,0 +1,72 @@
package notifier
import (
const (
//PolicyTypeDaily specify the policy type is "daily"
PolicyTypeDaily = "daily"
alternatePolicy = "Alternate Policy"
//ScanPolicyNotification is defined for pass the policy change data.
type ScanPolicyNotification struct {
//Type is used to keep the scan policy type: "none","daily" and "refresh".
Type string
//DailyTime is used when the type is 'daily', the offset with UTC time 00:00.
DailyTime int64
//ScanPolicyNotificationHandler is defined to handle the changes of scanning
type ScanPolicyNotificationHandler struct{}
//IsStateful to indicate this handler is stateful.
func (s *ScanPolicyNotificationHandler) IsStateful() bool {
//Policy change should be done one by one.
return true
//Handle the policy change notification.
func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
if value == nil {
return errors.New("ScanPolicyNotificationHandler can not handle nil value")
if reflect.TypeOf(value).Kind() != reflect.Struct ||
reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" {
return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type")
notification := value.(ScanPolicyNotification)
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
if notification.Type == PolicyTypeDaily {
if !hasScheduled {
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
Duration: 24 * time.Hour,
OffsetTime: notification.DailyTime,
attachTask := task.NewScanAllTask()
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
} else {
if hasScheduled {
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
return nil

View File

@ -0,0 +1,54 @@
package notifier
import (
var testingScheduler = scheduler.DefaultScheduler
func TestScanPolicyNotificationHandler(t *testing.T) {
//Scheduler should be running.
if !testingScheduler.IsRunning() {
t.Fatal("scheduler should be running")
handler := &ScanPolicyNotificationHandler{}
if !handler.IsStateful() {
utcTime := time.Now().UTC().Unix()
notification := ScanPolicyNotification{"daily", utcTime + 3600}
if err := handler.Handle(notification); err != nil {
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if !testingScheduler.HasScheduled("Alternate Policy") {
t.Fatal("Handler does not work")
notification2 := ScanPolicyNotification{"none", 0}
if err := handler.Handle(notification2); err != nil {
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if testingScheduler.HasScheduled("Alternate Policy") {
//Waiting for everything is ready.
<-time.After(1 * time.Second)
if testingScheduler.IsRunning() {
t.Fatal("scheduler should be stopped")

View File

@ -0,0 +1,11 @@
package notifier
import (
//Define global topic names
const (
//ScanAllPolicyTopic is for notifying the change of scanning all policy.
ScanAllPolicyTopic = common.ScanAllPolicy

View File

@ -0,0 +1,154 @@
package policy
import (
//AlternatePolicyConfiguration store the related configurations for alternate policy.
type AlternatePolicyConfiguration struct {
//Duration is the interval of executing attached tasks.
Duration time.Duration
//OffsetTime is the execution time point of each turn
//It's a number to indicate the seconds offset to the 00:00 of UTC time.
OffsetTime int64
//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
type AlternatePolicy struct {
//Keep the attached tasks.
tasks []task.Task
//Policy configurations.
config *AlternatePolicyConfiguration
//Generate time ticks with specified duration.
ticker *time.Ticker
//To indicated whether policy is completed.
isEnabled bool
//Channel used to send evaluation result signals.
evaluation chan bool
//Channel used to notify policy termination.
done chan bool
//Channel used to receive terminate signal.
terminator chan bool
//NewAlternatePolicy is constructor of creating AlternatePolicy.
func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
return &AlternatePolicy{
tasks: []task.Task{},
config: config,
isEnabled: false,
//GetConfig returns the current configuration options of this policy.
func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration {
return alp.config
//Name is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Name() string {
return "Alternate Policy"
//Tasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Tasks() []task.Task {
copyList := []task.Task{}
if alp.tasks != nil && len(alp.tasks) > 0 {
copyList = append(copyList, alp.tasks...)
return copyList
//Done is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Done() <-chan bool {
return alp.done
//AttachTasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
if tasks == nil || len(tasks) == 0 {
return errors.New("No tasks can be attached")
alp.tasks = append(alp.tasks, tasks...)
return nil
//Disable is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Disable() error {
//Stop the ticker
if alp.ticker != nil {
//Stop the evaluation goroutine
alp.terminator <- true
alp.ticker = nil
return nil
//Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
//Keep idempotent
if alp.isEnabled && alp.evaluation != nil {
return alp.evaluation, nil
alp.done = make(chan bool)
alp.terminator = make(chan bool)
alp.evaluation = make(chan bool)
go func() {
defer func() {
alp.isEnabled = false
timeNow := time.Now().UTC()
//Reach the execution time point?
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
diff := alp.config.OffsetTime - utcTime
if diff < 0 {
diff += 24 * 3600
if diff > 0 {
//Wait for a while.
select {
case <-time.After(time.Duration(diff) * time.Second):
case <-alp.terminator:
//Trigger the first tick.
alp.evaluation <- true
//Start the ticker for repeat checking.
alp.ticker = time.NewTicker(alp.config.Duration)
for {
select {
case <-alp.ticker.C:
alp.evaluation <- true
case <-alp.terminator:
alp.isEnabled = true
return alp.evaluation, nil

View File

@ -0,0 +1,114 @@
package policy
import (
type fakeTask struct {
number int
func (ft *fakeTask) Name() string {
return "for testing"
func (ft *fakeTask) Run() error {
return nil
func TestBasic(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{})
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
if tp.GetConfig() == nil {
if tp.Name() != "Alternate Policy" {
tks := tp.Tasks()
if tks == nil || len(tks) != 1 {
func TestEvaluatePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
ch, _ := tp.Evaluate()
counter := 0
for i := 0; i < 3; i++ {
select {
case <-ch:
case <-time.After(2 * time.Second):
if counter != 3 {
func TestDisablePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
ch, _ := tp.Evaluate()
counter := 0
terminate := make(chan bool)
defer func() {
terminate <- true
go func() {
for {
select {
case <-ch:
case <-terminate:
case <-time.After(6 * time.Second):
time.Sleep(2 * time.Second)
if tp.Disable() != nil {
t.Fatal("Failed to disable policy")
//Waiting for everything is stable
<-time.After(1 * time.Second)
//Copy value
copiedCounter := counter
time.Sleep(2 * time.Second)
if counter != copiedCounter {
t.Fatalf("Policy is still running after calling Disable() %d=%d", copiedCounter, counter)

View File

@ -0,0 +1,39 @@
package policy
import (
//Policy is an if-then logic to determine how the attached tasks should be
//executed based on the evaluation result of the defined conditions.
// Daily execute TASK between 2017/06/24 and 2018/06/23
// Execute TASK at 2017/09/01 14:30:00
//Each policy should have a name to identify itself.
//Please be aware that policy with no tasks will be treated as invalid.
type Policy interface {
//Name will return the name of the policy.
Name() string
//Tasks will return the attached tasks with this policy.
Tasks() []task.Task
//AttachTasks is to attach tasks to this policy
AttachTasks(...task.Task) error
//Done will setup a channel for other components to check whether or not
//the policy is completed. Possibly designed for the none loop policy.
Done() <-chan bool
//Evaluate the policy based on its definition and return the result via
//result channel. Policy is enabled after it is evaluated.
//Make sure Evaluate is idempotent, that means one policy can be only enabled
//only once even if Evaluate is called more than one times.
Evaluate() (<-chan bool, error)
//Disable the enabled policy and release all the allocated resources.
//Disable should also send signal to the terminated channel which returned by Done.
Disable() error

View File

@ -0,0 +1,265 @@
package scheduler
import (
const (
defaultQueueSize = 10
statSchedulePolicy = "Schedule Policy"
statUnSchedulePolicy = "Unschedule Policy"
statTaskRun = "Task Run"
statTaskComplete = "Task Complete"
statTaskFail = "Task Fail"
//StatItem is defined for the stat metrics.
type StatItem struct {
//Metrics catalog
Type string
//The stat value
Value uint32
//Attach some other info
Attachment interface{}
//StatSummary is used to collect some metrics of scheduler.
type StatSummary struct {
//Count of scheduled policy
PolicyCount uint32
//Total count of tasks
Tasks uint32
//Count of successfully complete tasks
CompletedTasks uint32
//Count of tasks with errors
TasksWithError uint32
//Configuration defines configuration of Scheduler.
type Configuration struct {
QueueSize uint8
//Scheduler is designed for scheduling policies.
type Scheduler struct {
//Related configuration options for scheduler.
config *Configuration
//Store to keep the references of scheduled policies.
policies Store
//Queue for accepting the scheduling polices.
scheduleQueue chan policy.Policy
//Queue for receiving policy unschedule request or complete signal.
unscheduleQueue chan string
//Channel for receiving stat metrics.
statChan chan *StatItem
//Channel for terminate scheduler damon.
terminateChan chan bool
//The stat metrics of scheduler.
stats *StatSummary
//To indicate whether scheduler is running or not
isRunning bool
//DefaultScheduler is a default scheduler.
var DefaultScheduler = NewScheduler(nil)
//NewScheduler is constructor for creating a scheduler.
func NewScheduler(config *Configuration) *Scheduler {
var qSize uint8 = defaultQueueSize
if config != nil && config.QueueSize > 0 {
qSize = config.QueueSize
sq := make(chan policy.Policy, qSize)
usq := make(chan string, qSize)
stChan := make(chan *StatItem, 4)
tc := make(chan bool, 1)
store := NewConcurrentStore()
return &Scheduler{
config: config,
policies: store,
scheduleQueue: sq,
unscheduleQueue: usq,
statChan: stChan,
terminateChan: tc,
stats: &StatSummary{
PolicyCount: 0,
Tasks: 0,
CompletedTasks: 0,
TasksWithError: 0,
isRunning: false,
//Start the scheduler damon.
func (sch *Scheduler) Start() {
if sch.isRunning {
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in scheduler:%s\n", r)
defer func() {
sch.isRunning = false
for {
select {
case <-sch.terminateChan:
case p := <-sch.scheduleQueue:
//Schedule the policy.
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
//Keep the policy for future use after it's successfully scheduled.
sch.policies.Put(p.Name(), watcher)
//Enable it.
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
case name := <-sch.unscheduleQueue:
//Find the watcher.
watcher := sch.policies.Remove(name)
if watcher != nil && watcher.IsRunning() {
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
case stat := <-sch.statChan:
switch stat.Type {
case statSchedulePolicy:
sch.stats.PolicyCount += stat.Value
case statUnSchedulePolicy:
sch.stats.PolicyCount -= stat.Value
case statTaskRun:
sch.stats.Tasks += stat.Value
case statTaskComplete:
sch.stats.CompletedTasks += stat.Value
case statTaskFail:
sch.stats.TasksWithError += stat.Value
log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n",
if stat.Attachment != nil &&
reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" {
log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error())
sch.isRunning = true
log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339))
//Stop the scheduler damon.
func (sch *Scheduler) Stop() {
if !sch.isRunning {
//Terminate damon firstly to stop receiving signals.
sch.terminateChan <- true
//Stop all watchers.
for _, wt := range sch.policies.GetAll() {
//Clear resources
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
//Schedule and enable the policy.
func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
if scheduledPolicy == nil {
return errors.New("nil is not Policy object")
if strings.TrimSpace(scheduledPolicy.Name()) == "" {
return errors.New("Policy should be assigned a name")
tasks := scheduledPolicy.Tasks()
if tasks == nil || len(tasks) == 0 {
return errors.New("Policy must attach task(s)")
if sch.policies.Exists(scheduledPolicy.Name()) {
return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name())
//Schedule the policy.
sch.scheduleQueue <- scheduledPolicy
return nil
//UnSchedule the specified policy from the enabled policies list.
func (sch *Scheduler) UnSchedule(policyName string) error {
if strings.TrimSpace(policyName) == "" {
return errors.New("Empty policy name is invalid")
if !sch.policies.Exists(policyName) {
return fmt.Errorf("Policy %s is not existing", policyName)
//Unschedule the policy.
sch.unscheduleQueue <- policyName
return nil
//IsRunning to indicate whether the scheduler is running.
func (sch *Scheduler) IsRunning() bool {
return sch.isRunning
//HasScheduled is to check whether the given policy has been scheduled or not.
func (sch *Scheduler) HasScheduled(policyName string) bool {
return sch.policies.Exists(policyName)

View File

@ -0,0 +1,136 @@
package scheduler
import (
//Store define the basic operations for storing and managing policy watcher.
//The concrete implementation should consider concurrent supporting scenario.
type Store interface {
//Put a new policy in.
Put(key string, value *Watcher)
//Get the corresponding policy with the key.
Get(key string) *Watcher
//Exists is to check if the key existing in the store.
Exists(key string) bool
//Remove the specified policy and return its reference.
Remove(key string) *Watcher
//Size return the total count of items in store.
Size() uint32
//GetAll is to get all the items in the store.
GetAll() []*Watcher
//Clear store.
//ConcurrentStore implements Store interface and supports concurrent operations.
type ConcurrentStore struct {
//Read-write mutex to synchronize the data map.
//Map used to keep the policy list.
data map[string]*Watcher
//NewConcurrentStore is used to create a new store and return the pointer reference.
func NewConcurrentStore() *ConcurrentStore {
mutex := new(sync.RWMutex)
data := make(map[string]*Watcher)
return &ConcurrentStore{mutex, data}
//Put a policy into store.
func (cs *ConcurrentStore) Put(key string, value *Watcher) {
if strings.TrimSpace(key) == "" || value == nil {
defer cs.Unlock()
cs.Lock()[key] = value
//Get policy via key.
func (cs *ConcurrentStore) Get(key string) *Watcher {
if strings.TrimSpace(key) == "" {
return nil
defer cs.RUnlock()
//Exists is used to check whether or not the key exists in store.
func (cs *ConcurrentStore) Exists(key string) bool {
if strings.TrimSpace(key) == "" {
return false
defer cs.RUnlock()
_, ok :=[key]
return ok
//Remove is to delete the specified policy.
func (cs *ConcurrentStore) Remove(key string) *Watcher {
if !cs.Exists(key) {
return nil
defer cs.Unlock()
if wt, ok :=[key]; ok {
delete(, key)
return wt
return nil
//Size return the total count of items in store.
func (cs *ConcurrentStore) Size() uint32 {
return (uint32)(len(
//GetAll to get all the items of store.
func (cs *ConcurrentStore) GetAll() []*Watcher {
all := []*Watcher{}
defer cs.RUnlock()
for _, v := range {
all = append(all, v)
return all
//Clear all the items in store.
func (cs *ConcurrentStore) Clear() {
if cs.Size() == 0 {
defer cs.Unlock()
for k := range {
delete(, k)

View File

@ -0,0 +1,71 @@
package scheduler
import (
func TestPut(t *testing.T) {
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
store.Put("testing", NewWatcher(nil, nil, nil))
if store.Size() != 1 {
func TestGet(t *testing.T) {
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
store.Put("testing", NewWatcher(nil, nil, nil))
w := store.Get("testing")
if w == nil {
func TestRemove(t *testing.T) {
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
store.Put("testing", NewWatcher(nil, nil, nil))
if !store.Exists("testing") {
w := store.Remove("testing")
if w == nil {
func TestExisting(t *testing.T) {
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
store.Put("testing", NewWatcher(nil, nil, nil))
if !store.Exists("testing") {
if store.Exists("fake_key") {
func TestGetAll(t *testing.T) {
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
store.Put("testing", NewWatcher(nil, nil, nil))
store.Put("testing2", NewWatcher(nil, nil, nil))
list := store.GetAll()
if list == nil || len(list) != 2 {

View File

@ -0,0 +1,142 @@
package scheduler
import (
type fakePolicy struct {
tasks []task.Task
done chan bool
evaluation chan bool
terminate chan bool
ticker *time.Ticker
func (fp *fakePolicy) Name() string {
return "testing policy"
func (fp *fakePolicy) Tasks() []task.Task {
return fp.tasks
func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error {
fp.tasks = append(fp.tasks, tasks...)
return nil
func (fp *fakePolicy) Done() <-chan bool {
return fp.done
func (fp *fakePolicy) Evaluate() (<-chan bool, error) {
fp.evaluation = make(chan bool, 1)
fp.done = make(chan bool)
fp.terminate = make(chan bool)
fp.evaluation <- true
go func() {
fp.ticker = time.NewTicker(1 * time.Second)
for {
select {
case <-fp.terminate:
case <-fp.ticker.C:
fp.evaluation <- true
return fp.evaluation, nil
func (fp *fakePolicy) Disable() error {
if fp.ticker != nil {
fp.terminate <- true
return nil
type fakeTask struct {
number int
func (ft *fakeTask) Name() string {
return "for testing"
func (ft *fakeTask) Run() error {
return nil
//Wacher will be tested together with scheduler.
func TestScheduler(t *testing.T) {
if DefaultScheduler.policies.Size() != 0 {
if DefaultScheduler.stats.PolicyCount != 0 {
if !DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is not started")
fp := &fakePolicy{
tasks: []task.Task{},
fk := &fakeTask{number: 100}
if DefaultScheduler.Schedule(fp) != nil {
t.Fatal("Schedule policy failed")
//Waiting for everything is stable
time.Sleep(1 * time.Second)
if DefaultScheduler.policies.Size() == 0 {
t.Fatal("No policy in the store after calling Schedule()")
if DefaultScheduler.stats.PolicyCount != 1 {
t.Fatal("Policy stats do not match")
time.Sleep(2 * time.Second)
if fk.number == 100 {
t.Fatal("Task is not triggered")
if DefaultScheduler.stats.Tasks == 0 {
if DefaultScheduler.stats.CompletedTasks == 0 {
if DefaultScheduler.UnSchedule(fp.Name()) != nil {
t.Fatal("Unschedule policy failed")
//Waiting for everything is stable
time.Sleep(1 * time.Second)
if DefaultScheduler.stats.PolicyCount != 0 {
t.Fatal("Policy count does not match after calling UnSchedule()")
copiedValue := DefaultScheduler.stats.CompletedTasks
<-time.After(2 * time.Second)
if copiedValue != DefaultScheduler.stats.CompletedTasks {
t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", copiedValue, DefaultScheduler.stats.CompletedTasks)
<-time.After(1 * time.Second)
if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is still running after stopping")

View File

@ -0,0 +1,23 @@
package task
import (
//ScanAllTask is task of scanning all tags.
type ScanAllTask struct{}
//NewScanAllTask is constructor of creating ScanAllTask.
func NewScanAllTask() *ScanAllTask {
return &ScanAllTask{}
//Name returns the name of the task.
func (sat *ScanAllTask) Name() string {
return "scan all"
//Run the actions.
func (sat *ScanAllTask) Run() error {
return utils.ScanAllImages()

View File

@ -0,0 +1,16 @@
package task
import (
func TestTask(t *testing.T) {
tk := NewScanAllTask()
if tk == nil {
if tk.Name() != "scan all" {

View File

@ -0,0 +1,10 @@
package task
//Task is used to synchronously run specific action(s).
type Task interface {
//Name should return the name of the task.
Name() string
//Run the concrete code here
Run() error

View File

@ -0,0 +1,139 @@
package scheduler
import (
//Watcher is an asynchronous runner to provide an evaluation environment for the policy.
type Watcher struct {
//The target policy.
p policy.Policy
//The channel for receive stop signal.
cmdChan chan bool
//Indicate whether the watch is started and running.
isRunning bool
//Report stats to scheduler.
stats chan *StatItem
//If policy is automatically completed, report the policy to scheduler.
doneChan chan string
//NewWatcher is used as a constructor.
func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher {
return &Watcher{
p: p,
cmdChan: make(chan bool),
isRunning: false,
stats: st,
doneChan: done,
//Start the running.
func (wc *Watcher) Start() {
if wc.isRunning {
if wc.p == nil {
go func(pl policy.Policy) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in watcher:%s\n", r)
defer func() {
wc.isRunning = false
evalChan, err := pl.Evaluate()
if err != nil {
log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error())
done := pl.Done()
for {
select {
case <-evalChan:
//Start to run the attached tasks.
for _, t := range pl.Tasks() {
go func(tk task.Task) {
defer func() {
if r := recover(); r != nil {
st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)}
if wc.stats != nil {
wc.stats <- st
err := tk.Run()
//Report task execution stats.
st := &StatItem{statTaskComplete, 1, err}
if err != nil {
st.Type = statTaskFail
if wc.stats != nil {
wc.stats <- st
//Report task run stats.
st := &StatItem{statTaskRun, 1, nil}
if wc.stats != nil {
wc.stats <- st
case <-done:
//Policy is automatically completed.
//Report policy change stats.
if wc.doneChan != nil {
wc.doneChan <- wc.p.Name()
case <-wc.cmdChan:
//Exit goroutine.
wc.isRunning = true
//Stop the running.
func (wc *Watcher) Stop() {
if !wc.isRunning {
//Disable policy.
if wc.p != nil {
//Stop watcher.
wc.cmdChan <- true
//IsRunning to indicate if the watcher is still running.
func (wc *Watcher) IsRunning() bool {
return wc.isRunning

View File

@ -20,6 +20,7 @@ import (
@ -127,6 +128,48 @@ func ParseTimeStamp(timestamp string) (*time.Time, error) {
return &t, nil
//ConvertMapToStruct is used to fill the specified struct with map.
func ConvertMapToStruct(object interface{}, valuesInMap map[string]interface{}) error {
if object == nil {
return fmt.Errorf("nil struct is not supported")
if reflect.TypeOf(object).Kind() != reflect.Ptr {
return fmt.Errorf("object should be referred by pointer")
for k, v := range valuesInMap {
if err := setField(object, k, v); err != nil {
return err
return nil
func setField(object interface{}, field string, value interface{}) error {
structValue := reflect.ValueOf(object).Elem()
structFieldValue := structValue.FieldByName(field)
if !structFieldValue.IsValid() {
return fmt.Errorf("No such field: %s in obj", field)
if !structFieldValue.CanSet() {
return fmt.Errorf("Cannot set value for field %s", field)
structFieldType := structFieldValue.Type()
val := reflect.ValueOf(value)
if structFieldType != val.Type() {
return errors.New("Provided value type didn't match object field type")
return nil
// ParseProjectIDOrName parses value to ID(int64) or name(string)
func ParseProjectIDOrName(value interface{}) (int64, string, error) {
if value == nil {

View File

@ -243,3 +243,23 @@ func TestParseHarborIDOrName(t *testing.T) {
assert.Equal(t, int64(0), id)
assert.Equal(t, "project", name)
type testingStruct struct {
Name string
Count int
func TestConvertMapToStruct(t *testing.T) {
dataMap := make(map[string]interface{})
dataMap["Name"] = "testing"
dataMap["Count"] = 100
obj := &testingStruct{}
if err := ConvertMapToStruct(obj, dataMap); err != nil {
} else {
if obj.Name != "testing" || obj.Count != 100 {

View File

@ -190,6 +190,9 @@ func (c *ConfigAPI) Put() {
log.Errorf("failed to load configurations: %v", err)
c.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
//Everything is ok, detect the configurations to confirm if the option we are caring is changed.
// Reset system configurations

View File

@ -17,14 +17,17 @@ package api
import (
registry_error ""
@ -496,3 +499,35 @@ func transformVulnerabilities(layerWithVuln *models.ClairLayerEnvelope) []*model
return res
//Watch the configuration changes.
func watchConfigChanges(cfg map[string]interface{}) error {
if cfg == nil {
return errors.New("Empty configurations")
//Currently only watch the scan all policy change.
if v, ok := cfg[notifier.ScanAllPolicyTopic]; ok {
if reflect.TypeOf(v).Kind() == reflect.Map {
policyCfg := &models.ScanAllPolicy{}
if err := utils.ConvertMapToStruct(policyCfg, v.(map[string]interface{})); err != nil {
return err
policyNotification := notifier.ScanPolicyNotification{
Type: policyCfg.Type,
DailyTime: 0,
if t, yes := policyCfg.Parm["daily_time"]; yes {
if reflect.TypeOf(t).Kind() == reflect.Int {
policyNotification.DailyTime = (int64)(t.(int))
return notifier.Publish(notifier.ScanAllPolicyTopic, policyNotification)
return nil

View File

@ -143,7 +143,7 @@ func Reset() error {
return mg.Reset()
// Upload uploads all system configutations to admin server
// Upload uploads all system configurations to admin server
func Upload(cfg map[string]interface{}) error {
return mg.Upload(cfg)

View File

@ -17,6 +17,7 @@ package main
import (
@ -26,6 +27,8 @@ import (
_ ""
_ ""
@ -98,6 +101,26 @@ func main() {
//Enable the policy scheduler here.
//Subscribe the policy change topic.
notifier.Subscribe(notifier.ScanAllPolicyTopic, &notifier.ScanPolicyNotificationHandler{})
//Get policy configuration.
scanAllPolicy := config.ScanAllPolicy()
if scanAllPolicy.Type == notifier.PolicyTypeDaily {
dailyTime := 0
if t, ok := scanAllPolicy.Parm["daily_time"]; ok {
if reflect.TypeOf(t).Kind() == reflect.Int {
dailyTime = t.(int)
//Send notification to handle first policy change.
notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)})
beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter)

View File

@ -100,7 +100,7 @@ export class Configuration {
this.verify_remote_cert = new BoolValueItem(false, true);
this.scan_all_policy = new ComplexValueItem({
type: "daily",
parameters: {
parameter: {
daily_time: 0
}, true);

View File

@ -1,7 +1,12 @@
export const REGISTRY_CONFIG_HTML: string = `
<replication-config [(replicationConfig)]="config"></replication-config>
<system-settings [(systemSettings)]="config"></system-settings>
<vulnerability-config [(vulnerabilityConfig)]="config"></vulnerability-config>
<replication-config #replicationConfig [(replicationConfig)]="config"></replication-config>
<system-settings #systemSettings [(systemSettings)]="config"></system-settings>
<vulnerability-config #vulnerabilityConfig [(vulnerabilityConfig)]="config"></vulnerability-config>
<button type="button" class="btn btn-primary" (click)="save()" [disabled]="shouldDisable">{{'BUTTON.SAVE' | translate}}</button>
<button type="button" class="btn btn-outline" (click)="cancel()" [disabled]="shouldDisable">{{'BUTTON.CANCEL' | translate}}</button>
<confirmation-dialog #cfgConfirmationDialog (confirmAction)="confirmCancel($event)"></confirmation-dialog>

View File

@ -8,6 +8,7 @@ import { ReplicationConfigComponent } from './replication/
import { SystemSettingsComponent } from './system/system-settings.component';
import { VulnerabilityConfigComponent } from './vulnerability/vulnerability-config.component';
import { RegistryConfigComponent } from './registry-config.component';
import { ConfirmationDialogComponent } from '../confirmation-dialog/confirmation-dialog.component';
import {
@ -29,7 +30,7 @@ describe('RegistryConfigComponent (inline template)', () => {
mockConfig.verify_remote_cert.value = true;
mockConfig.scan_all_policy.value = {
type: "daily",
parameters: {
parameter: {
daily_time: 0
@ -46,7 +47,8 @@ describe('RegistryConfigComponent (inline template)', () => {
providers: [

View File

@ -1,10 +1,22 @@
import { Component, OnInit, EventEmitter, Output } from '@angular/core';
import { Component, OnInit, EventEmitter, Output, ViewChild } from '@angular/core';
import { Configuration, ComplexValueItem } from './config';
import { REGISTRY_CONFIG_HTML } from './registry-config.component.html';
import { ConfigurationService } from '../service/index';
import { toPromise } from '../utils';
import { ErrorHandler } from '../error-handler';
import {
} from './index';
import { ConfirmationState, ConfirmationTargets, ConfirmationButtons } from '../shared/shared.const';
import { ConfirmationDialogComponent } from '../confirmation-dialog/confirmation-dialog.component';
import { ConfirmationMessage } from '../confirmation-dialog/confirmation-message';
import { ConfirmationAcknowledgement } from '../confirmation-dialog/confirmation-state-message';
import { TranslateService } from '@ngx-translate/core';
selector: 'hbr-registry-config',
@ -13,27 +25,56 @@ import { ErrorHandler } from '../error-handler';
export class RegistryConfigComponent implements OnInit {
config: Configuration = new Configuration();
configCopy: Configuration;
onGoing: boolean = false;
@Output() configChanged: EventEmitter<any> = new EventEmitter<any>();
@ViewChild("replicationConfig") replicationCfg: ReplicationConfigComponent;
@ViewChild("systemSettings") systemSettings: SystemSettingsComponent;
@ViewChild("vulnerabilityConfig") vulnerabilityCfg: VulnerabilityConfigComponent;
@ViewChild("cfgConfirmationDialog") confirmationDlg: ConfirmationDialogComponent;
private configService: ConfigurationService,
private errorHandler: ErrorHandler
private errorHandler: ErrorHandler,
private translate: TranslateService
) { }
get shouldDisable(): boolean {
return !this.isValid() || !this.hasChanges() || this.onGoing;
ngOnInit(): void {
isValid(): boolean {
return this.replicationCfg &&
this.replicationCfg.isValid &&
this.systemSettings &&
this.systemSettings.isValid &&
this.vulnerabilityCfg &&
hasChanges(): boolean {
return !this._isEmptyObject(this.getChanges());
//Load configurations
load(): void {
this.onGoing = true;
.then((config: Configuration) => {
this.configCopy = Object.assign({}, config);
this.onGoing = false;
this.configCopy = this._clone(config);
this.config = config;
.catch(error => this.errorHandler.error(error));
.catch(error => {
this.onGoing = false;
//Save configuration changes
@ -45,26 +86,48 @@ export class RegistryConfigComponent implements OnInit {
//Fix policy parameters issue
let scanningAllPolicy = changes["scan_all_policy"];
if (scanningAllPolicy &&
scanningAllPolicy.type !== "daily" &&
scanningAllPolicy.parameters) {
delete (scanningAllPolicy.parameters);
this.onGoing = true;
.then(() => {
this.onGoing = false;
this.translate.get("CONFIG.SAVE_SUCCESS").subscribe((res: string) => {;
//Reload to fetch all the updates
.catch(error => this.errorHandler.error(error));
.catch(error => {
this.onGoing = false;
//Cancel the changes if have
cancel(): void {
let msg = new ConfirmationMessage(
//Confirm cancel
confirmCancel(ack: ConfirmationAcknowledgement): void {
if (ack && ack.source === ConfirmationTargets.CONFIG &&
ack.state === ConfirmationState.CONFIRMED) {
reset(): void {
//Reset to the values of copy
let changes: { [key: string]: any | any[] } = this.getChanges();
for (let prop in changes) {
this.config[prop] = Object.assign({}, this.configCopy[prop]);
this.config[prop] = this._clone(this.configCopy[prop]);
@ -107,4 +170,10 @@ export class RegistryConfigComponent implements OnInit {
_isEmptyObject(obj: any): boolean {
return !obj || JSON.stringify(obj) === "{}";
//Deeper clone all
_clone(srcObj: any): any {
if (!srcObj) return null;
return JSON.parse(JSON.stringify(srcObj));

View File

@ -5,7 +5,7 @@ export const VULNERABILITY_CONFIG_HTML: string = `
<div class="form-group">
<label for="scanAllPolicy">{{ 'CONFIG.SCANNING.SCAN_ALL' | translate }}</label>
<div class="select">
<select id="scanAllPolicy" name="scanAllPolicy" [disabled]="!editable" [(ngModel)]="vulnerabilityConfig.scan_all_policy.value.type">
<select id="scanAllPolicy" name="scanAllPolicy" [disabled]="!editable" [(ngModel)]="scanningType">
<option value="none">{{ 'CONFIG.SCANNING.NONE_POLICY' | translate }}</option>
<option value="daily">{{ 'CONFIG.SCANNING.DAILY_POLICY' | translate }}</option>
<option value="on_refresh">{{ 'CONFIG.SCANNING.REFRESH_POLICY' | translate }}</option>

View File

@ -30,9 +30,9 @@ export class VulnerabilityConfigComponent {
this.config = cfg;
if (this.config.scan_all_policy &&
this.config.scan_all_policy.value) {
if (this.config.scan_all_policy.value.type === "daily"){
this.config.scan_all_policy.value.parameters = {
if (this.config.scan_all_policy.value.type === "daily") {
if (!this.config.scan_all_policy.value.parameter) {
this.config.scan_all_policy.value.parameter = {
daily_time: 0
@ -51,8 +51,8 @@ export class VulnerabilityConfigComponent {
let timeOffset: number = 0;//seconds
if (this.config.scan_all_policy.value.parameters) {
let daily_time = this.config.scan_all_policy.value.parameters.daily_time;
if (this.config.scan_all_policy.value.parameter) {
let daily_time = this.config.scan_all_policy.value.parameter.daily_time;
if (daily_time && typeof daily_time === "number") {
timeOffset = +daily_time;
@ -99,8 +99,9 @@ export class VulnerabilityConfigComponent {
if (!this.config.scan_all_policy.value.parameters) {
this.config.scan_all_policy.value.parameters = {
//Double confirm inner parameter existing.
if (!this.config.scan_all_policy.value.parameter) {
this.config.scan_all_policy.value.parameter = {
daily_time: 0
@ -124,7 +125,41 @@ export class VulnerabilityConfigComponent {
utcTimes -= ONE_DAY_SECONDS;
this.config.scan_all_policy.value.parameters.daily_time = utcTimes;
this.config.scan_all_policy.value.parameter.daily_time = utcTimes;
//Scanning type
get scanningType(): string {
if (this.config &&
this.config.scan_all_policy &&
this.config.scan_all_policy.value) {
return this.config.scan_all_policy.value.type;
} else {
return "none";
set scanningType(v: string) {
if (this.config &&
this.config.scan_all_policy &&
this.config.scan_all_policy.value) {
let type: string = (v && v.trim() !== "") ? v : "none";
this.config.scan_all_policy.value.type = type;
if (type !== "daily") {
//No parameter
if (this.config.scan_all_policy.value.parameter) {
delete (this.config.scan_all_policy.value.parameter);
} else {
//Has parameter
if (!this.config.scan_all_policy.value.parameter) {
this.config.scan_all_policy.value.parameter = {
daily_time: 0
@ViewChild("systemConfigFrom") systemSettingsForm: NgForm;

View File

@ -31,7 +31,7 @@
"clarity-icons": "^0.9.8",
"clarity-ui": "^0.9.8",
"core-js": "^2.4.1",
"harbor-ui": "^0.2.40",
"harbor-ui": "^0.2.52",
"intl": "^1.2.5",
"mutationobserver-shim": "^0.3.2",
"ngx-cookie": "^1.0.0",

View File

@ -16,7 +16,7 @@
<button id="config-system" class="btn btn-link nav-link" aria-controls="system_settings" []='isCurrentTabLink("config-system")' type="button" (click)='tabLinkClick("config-system")'>{{'CONFIG.SYSTEM' | translate }}</button>
<li role="presentation" class="nav-item">
<button id="config-vulnerability" class="btn btn-link nav-link" aria-controls="vulnerability" []='isCurrentTabLink("config-vulnerability")' type="button" (click)='tabLinkClick("config-vulnerability")'>Vulnerability</button>
<button id="config-vulnerability" class="btn btn-link nav-link" aria-controls="vulnerability" []='isCurrentTabLink("config-vulnerability")' type="button" (click)='tabLinkClick("config-vulnerability")'>{{'VULNERABILITY.SINGULAR' | translate}}</button>
<section id="authentication" role="tabpanel" aria-labelledby="config-auth" [hidden]='!isCurrentTabContent("authentication")'>
@ -41,7 +41,6 @@
<button type="button" class="btn btn-outline" (click)="testLDAPServer()" *ngIf="showLdapServerBtn" [disabled]="!isLDAPConfigValid()">{{'BUTTON.TEST_LDAP' | translate}}</button>
<span id="forTestingMail" class="spinner spinner-inline" [hidden]="hideMailTestingSpinner"></span>
<span id="forTestingLDAP" class="spinner spinner-inline" [hidden]="hideLDAPTestingSpinner"></span>
<button type="button" class="btn btn-primary" (click)="consoleTest()">CONSOLE</button>

View File

@ -71,11 +71,6 @@ export class ConfigurationComponent implements OnInit, OnDestroy {
private appConfigService: AppConfigService,
private session: SessionService) { }
consoleTest(): void {
console.log(this.allConfig, this.originalCopy);
isCurrentTabLink(tabId: string): boolean {
return this.currentTabId === tabId;