diff --git a/src/common/notifier/notification_handler.go b/src/common/notifier/notification_handler.go new file mode 100644 index 000000000..5d4508aa3 --- /dev/null +++ b/src/common/notifier/notification_handler.go @@ -0,0 +1,15 @@ +package notifier + +//NotificationHandler defines what operations a notification handler +//should have. +type NotificationHandler interface { + //Handle the event when it coming. + //value might be optional, it depends on usages. + Handle(value interface{}) error + + //IsStateful returns whether the handler is stateful or not. + //If handler is stateful, it will not be triggerred in parallel. + //Otherwise, the handler will be triggered concurrently if more + //than one same handler are matched the topics. + IsStateful() bool +} diff --git a/src/common/notifier/notifier.go b/src/common/notifier/notifier.go new file mode 100644 index 000000000..5b92247ff --- /dev/null +++ b/src/common/notifier/notifier.go @@ -0,0 +1,228 @@ +package notifier + +import ( + "errors" + "fmt" + "reflect" + "strings" + "sync" + + "github.com/vmware/harbor/src/common/utils/log" +) + +//HandlerIndexer is setup the relationship between the handler type and +//instance. +type HandlerIndexer map[string]NotificationHandler + +//Notification wraps the topic and related data value if existing. +type Notification struct { + //Topic of notification + //Required + Topic string + + //Value of notification. + //Optional + Value interface{} +} + +//HandlerChannel provides not only the chan itself but also the count of +//handlers related with this chan. +type HandlerChannel struct { + //To indicate how many handler instances bound with this chan. + boundCount uint32 + + //The chan for controling concurrent executions. + channel chan bool +} + +//NotificationWatcher is defined to accept the events published +//by the sender and match it with pre-registered notification handler +//and then trigger the execution of the found handler. +type NotificationWatcher struct { + //For handle concurrent scenario. + *sync.RWMutex + + //To keep the registered handlers in memory. + //Each topic can register multiple handlers. + //Each handler can bind to multiple topics. + handlers map[string]HandlerIndexer + + //Keep the channels which are used to control the concurrent executions + //of multiple stateful handlers with same type. + handlerChannels map[string]*HandlerChannel +} + +//notificationWatcher is a default notification watcher in package level. +var notificationWatcher = NewNotificationWatcher() + +//NewNotificationWatcher is constructor of NotificationWatcher. +func NewNotificationWatcher() *NotificationWatcher { + return &NotificationWatcher{ + new(sync.RWMutex), + make(map[string]HandlerIndexer), + make(map[string]*HandlerChannel), + } +} + +//Handle the related topic with the specified handler. +func (nw *NotificationWatcher) Handle(topic string, handler NotificationHandler) error { + if strings.TrimSpace(topic) == "" { + return errors.New("Empty topic is not supported") + } + + if handler == nil { + return errors.New("Nil handler can not be registered") + } + + defer nw.Unlock() + nw.Lock() + + t := reflect.TypeOf(handler).String() + if indexer, ok := nw.handlers[topic]; ok { + if _, existing := indexer[t]; existing { + return fmt.Errorf("Topic %s has already register the handler with type %s", topic, t) + } + + indexer[t] = handler + } else { + newIndexer := make(HandlerIndexer) + newIndexer[t] = handler + nw.handlers[topic] = newIndexer + } + + if handler.IsStateful() { + //First time + if handlerChan, ok := nw.handlerChannels[t]; !ok { + nw.handlerChannels[t] = &HandlerChannel{1, make(chan bool, 1)} + } else { + //Already have chan, just increase count + handlerChan.boundCount++ + } + } + + return nil +} + +//UnHandle is to revoke the registered handler with the specified topic. +//'handler' is optional, the type name of the handler. If it's empty value, +//then revoke the whole topic, otherwise only revoke the specified handler. +func (nw *NotificationWatcher) UnHandle(topic string, handler string) error { + if strings.TrimSpace(topic) == "" { + return errors.New("Empty topic is not supported") + } + + defer nw.Unlock() + nw.Lock() + + var revokeHandler = func(indexer HandlerIndexer, handlerType string) bool { + //Find the specified one + if hd, existing := indexer[handlerType]; existing { + delete(indexer, handlerType) + if len(indexer) == 0 { + //No handler existing, then remove topic + delete(nw.handlers, topic) + } + + //Update channel counter or remove channel + if hd.IsStateful() { + if theChan, yes := nw.handlerChannels[handlerType]; yes { + theChan.boundCount-- + if theChan.boundCount == 0 { + //Empty, then remove the channel + delete(nw.handlerChannels, handlerType) + } + } + } + + return true + } + + return false + } + + if indexer, ok := nw.handlers[topic]; ok { + if strings.TrimSpace(handler) == "" { + for t := range indexer { + revokeHandler(indexer, t) + } + + return nil + } + + //Revoke the specified handler. + if revokeHandler(indexer, handler) { + return nil + } + } + + return fmt.Errorf("Failed to revoke handler %s with topic %s", handler, topic) +} + +//Notify that notification is coming. +func (nw *NotificationWatcher) Notify(notification Notification) error { + if strings.TrimSpace(notification.Topic) == "" { + return errors.New("Empty topic can not be notified") + } + + nw.RLock() + defer nw.RUnlock() + + var ( + indexer HandlerIndexer + ok bool + handlers = []NotificationHandler{} + ) + if indexer, ok = nw.handlers[notification.Topic]; !ok { + return fmt.Errorf("No handlers registered for handling topic %s", notification.Topic) + } + + for _, h := range indexer { + handlers = append(handlers, h) + } + + //Trigger handlers + for _, h := range handlers { + var handlerChan chan bool + + if h.IsStateful() { + t := reflect.TypeOf(h).String() + handlerChan = nw.handlerChannels[t].channel + } + go func(hd NotificationHandler, ch chan bool) { + if hd.IsStateful() && ch != nil { + ch <- true + } + go func() { + defer func() { + if hd.IsStateful() && ch != nil { + <-ch + } + }() + if err := hd.Handle(notification.Value); err != nil { + //Currently, we just log the error + log.Errorf("Error occurred when triggerring handler %s of topic %s: %s\n", reflect.TypeOf(hd).String(), notification.Topic, err.Error()) + } + }() + }(h, handlerChan) + } + + return nil +} + +//Subscribe is a wrapper utility method for NotificationWatcher.handle() +func Subscribe(topic string, handler NotificationHandler) error { + return notificationWatcher.Handle(topic, handler) +} + +//UnSubscribe is a wrapper utility method for NotificationWatcher.UnHandle() +func UnSubscribe(topic string, handler string) error { + return notificationWatcher.UnHandle(topic, handler) +} + +//Publish is a wrapper utility method for NotificationWatcher.notify() +func Publish(topic string, value interface{}) error { + return notificationWatcher.Notify(Notification{ + Topic: topic, + Value: value, + }) +} diff --git a/src/common/notifier/notifier_test.go b/src/common/notifier/notifier_test.go new file mode 100644 index 000000000..4d3499a9d --- /dev/null +++ b/src/common/notifier/notifier_test.go @@ -0,0 +1,142 @@ +package notifier + +import ( + "reflect" + "testing" + "time" +) + +var statefulData int + +type fakeStatefulHandler struct { + number int +} + +func (fsh *fakeStatefulHandler) IsStateful() bool { + return true +} + +func (fsh *fakeStatefulHandler) Handle(v interface{}) error { + increment := 0 + if v != nil && reflect.TypeOf(v).Kind() == reflect.Int { + increment = v.(int) + } + statefulData += increment + return nil +} + +type fakeStatelessHandler struct{} + +func (fsh *fakeStatelessHandler) IsStateful() bool { + return false +} + +func (fsh *fakeStatelessHandler) Handle(v interface{}) error { + return nil +} + +func TestSubscribeAndUnSubscribe(t *testing.T) { + err := Subscribe("topic1", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic1", &fakeStatelessHandler{}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatelessHandler{}) + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 2 { + t.Fail() + } + + if indexer, ok := notificationWatcher.handlers["topic1"]; !ok { + t.Fail() + } else { + if len(indexer) != 2 { + t.Fail() + } + } + + if len(notificationWatcher.handlerChannels) != 1 { + t.Fail() + } + + err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlerChannels) != 0 { + t.Fail() + } + + err = UnSubscribe("topic1", "") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 1 { + t.Fail() + } + + err = UnSubscribe("topic2", "") + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 0 { + t.Fail() + } +} + +func TestPublish(t *testing.T) { + err := Subscribe("topic1", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + err = Subscribe("topic2", &fakeStatefulHandler{0}) + if err != nil { + t.Fatal(err) + } + + if len(notificationWatcher.handlers) != 2 { + t.Fail() + } + + Publish("topic1", 100) + Publish("topic2", 50) + + //Waiting for async is done + <-time.After(1 * time.Second) + + if statefulData != 150 { + t.Fatalf("Expect execution result %d, but got %d", 150, statefulData) + } + + err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } + + err = UnSubscribe("topic2", "*notifier.fakeStatefulHandler") + if err != nil { + t.Fatal(err) + } +} diff --git a/src/common/notifier/scan_policy_notitification_handler.go b/src/common/notifier/scan_policy_notitification_handler.go new file mode 100644 index 000000000..56c1bf885 --- /dev/null +++ b/src/common/notifier/scan_policy_notitification_handler.go @@ -0,0 +1,72 @@ +package notifier + +import ( + "errors" + "reflect" + + "time" + + "github.com/vmware/harbor/src/common/scheduler" + "github.com/vmware/harbor/src/common/scheduler/policy" + "github.com/vmware/harbor/src/common/scheduler/task" +) + +const ( + //PolicyTypeDaily specify the policy type is "daily" + PolicyTypeDaily = "daily" + + alternatePolicy = "Alternate Policy" +) + +//ScanPolicyNotification is defined for pass the policy change data. +type ScanPolicyNotification struct { + //Type is used to keep the scan policy type: "none","daily" and "refresh". + Type string + + //DailyTime is used when the type is 'daily', the offset with UTC time 00:00. + DailyTime int64 +} + +//ScanPolicyNotificationHandler is defined to handle the changes of scanning +//policy. +type ScanPolicyNotificationHandler struct{} + +//IsStateful to indicate this handler is stateful. +func (s *ScanPolicyNotificationHandler) IsStateful() bool { + //Policy change should be done one by one. + return true +} + +//Handle the policy change notification. +func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { + if value == nil { + return errors.New("ScanPolicyNotificationHandler can not handle nil value") + } + + if reflect.TypeOf(value).Kind() != reflect.Struct || + reflect.TypeOf(value).String() != "notifier.ScanPolicyNotification" { + return errors.New("ScanPolicyNotificationHandler can not handle value with invalid type") + } + + notification := value.(ScanPolicyNotification) + + hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy) + if notification.Type == PolicyTypeDaily { + if !hasScheduled { + schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ + Duration: 24 * time.Hour, + OffsetTime: notification.DailyTime, + }) + attachTask := task.NewScanAllTask() + schedulePolicy.AttachTasks(attachTask) + + return scheduler.DefaultScheduler.Schedule(schedulePolicy) + } + } else { + if hasScheduled { + return scheduler.DefaultScheduler.UnSchedule(alternatePolicy) + } + } + + return nil +} diff --git a/src/common/notifier/scan_policy_notitification_handler_test.go b/src/common/notifier/scan_policy_notitification_handler_test.go new file mode 100644 index 000000000..46efffbe5 --- /dev/null +++ b/src/common/notifier/scan_policy_notitification_handler_test.go @@ -0,0 +1,54 @@ +package notifier + +import ( + "testing" + "time" + + "github.com/vmware/harbor/src/common/scheduler" +) + +var testingScheduler = scheduler.DefaultScheduler + +func TestScanPolicyNotificationHandler(t *testing.T) { + //Scheduler should be running. + testingScheduler.Start() + if !testingScheduler.IsRunning() { + t.Fatal("scheduler should be running") + } + + handler := &ScanPolicyNotificationHandler{} + if !handler.IsStateful() { + t.Fail() + } + + utcTime := time.Now().UTC().Unix() + notification := ScanPolicyNotification{"daily", utcTime + 3600} + if err := handler.Handle(notification); err != nil { + t.Fatal(err) + } + + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if !testingScheduler.HasScheduled("Alternate Policy") { + t.Fatal("Handler does not work") + } + + notification2 := ScanPolicyNotification{"none", 0} + if err := handler.Handle(notification2); err != nil { + t.Fatal(err) + } + + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if testingScheduler.HasScheduled("Alternate Policy") { + t.Fail() + } + + //Clear + testingScheduler.Stop() + //Waiting for everything is ready. + <-time.After(1 * time.Second) + if testingScheduler.IsRunning() { + t.Fatal("scheduler should be stopped") + } +} diff --git a/src/common/notifier/topics.go b/src/common/notifier/topics.go new file mode 100644 index 000000000..869ff1055 --- /dev/null +++ b/src/common/notifier/topics.go @@ -0,0 +1,11 @@ +package notifier + +import ( + "github.com/vmware/harbor/src/common" +) + +//Define global topic names +const ( + //ScanAllPolicyTopic is for notifying the change of scanning all policy. + ScanAllPolicyTopic = common.ScanAllPolicy +) diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go new file mode 100644 index 000000000..38a20e92f --- /dev/null +++ b/src/common/scheduler/policy/alternate_policy.go @@ -0,0 +1,154 @@ +package policy + +import ( + "errors" + "time" + + "github.com/vmware/harbor/src/common/scheduler/task" +) + +//AlternatePolicyConfiguration store the related configurations for alternate policy. +type AlternatePolicyConfiguration struct { + //Duration is the interval of executing attached tasks. + Duration time.Duration + + //OffsetTime is the execution time point of each turn + //It's a number to indicate the seconds offset to the 00:00 of UTC time. + OffsetTime int64 +} + +//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope. +type AlternatePolicy struct { + //Keep the attached tasks. + tasks []task.Task + + //Policy configurations. + config *AlternatePolicyConfiguration + + //Generate time ticks with specified duration. + ticker *time.Ticker + + //To indicated whether policy is completed. + isEnabled bool + + //Channel used to send evaluation result signals. + evaluation chan bool + + //Channel used to notify policy termination. + done chan bool + + //Channel used to receive terminate signal. + terminator chan bool +} + +//NewAlternatePolicy is constructor of creating AlternatePolicy. +func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy { + return &AlternatePolicy{ + tasks: []task.Task{}, + config: config, + isEnabled: false, + } +} + +//GetConfig returns the current configuration options of this policy. +func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration { + return alp.config +} + +//Name is an implementation of same method in policy interface. +func (alp *AlternatePolicy) Name() string { + return "Alternate Policy" +} + +//Tasks is an implementation of same method in policy interface. +func (alp *AlternatePolicy) Tasks() []task.Task { + copyList := []task.Task{} + if alp.tasks != nil && len(alp.tasks) > 0 { + copyList = append(copyList, alp.tasks...) + } + + return copyList +} + +//Done is an implementation of same method in policy interface. +func (alp *AlternatePolicy) Done() <-chan bool { + return alp.done +} + +//AttachTasks is an implementation of same method in policy interface. +func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error { + if tasks == nil || len(tasks) == 0 { + return errors.New("No tasks can be attached") + } + + alp.tasks = append(alp.tasks, tasks...) + + return nil +} + +//Disable is an implementation of same method in policy interface. +func (alp *AlternatePolicy) Disable() error { + //Stop the ticker + if alp.ticker != nil { + alp.ticker.Stop() + } + + //Stop the evaluation goroutine + alp.terminator <- true + alp.ticker = nil + + return nil +} + +//Evaluate is an implementation of same method in policy interface. +func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { + //Keep idempotent + if alp.isEnabled && alp.evaluation != nil { + return alp.evaluation, nil + } + + alp.done = make(chan bool) + alp.terminator = make(chan bool) + alp.evaluation = make(chan bool) + + go func() { + defer func() { + alp.isEnabled = false + }() + timeNow := time.Now().UTC() + + //Reach the execution time point? + utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60) + diff := alp.config.OffsetTime - utcTime + if diff < 0 { + diff += 24 * 3600 + } + if diff > 0 { + //Wait for a while. + select { + case <-time.After(time.Duration(diff) * time.Second): + case <-alp.terminator: + return + } + } + + //Trigger the first tick. + alp.evaluation <- true + + //Start the ticker for repeat checking. + alp.ticker = time.NewTicker(alp.config.Duration) + for { + select { + case <-alp.ticker.C: + alp.evaluation <- true + case <-alp.terminator: + return + } + } + }() + + //Enabled + alp.isEnabled = true + + return alp.evaluation, nil +} diff --git a/src/common/scheduler/policy/alternate_policy_test.go b/src/common/scheduler/policy/alternate_policy_test.go new file mode 100644 index 000000000..5340b04eb --- /dev/null +++ b/src/common/scheduler/policy/alternate_policy_test.go @@ -0,0 +1,114 @@ +package policy + +import ( + "testing" + "time" +) + +type fakeTask struct { + number int +} + +func (ft *fakeTask) Name() string { + return "for testing" +} + +func (ft *fakeTask) Run() error { + ft.number++ + return nil +} + +func TestBasic(t *testing.T) { + tp := NewAlternatePolicy(&AlternatePolicyConfiguration{}) + err := tp.AttachTasks(&fakeTask{number: 100}) + if err != nil { + t.Fail() + } + + if tp.GetConfig() == nil { + t.Fail() + } + + if tp.Name() != "Alternate Policy" { + t.Fail() + } + + tks := tp.Tasks() + if tks == nil || len(tks) != 1 { + t.Fail() + } + +} + +func TestEvaluatePolicy(t *testing.T) { + now := time.Now().UTC() + utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) + tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ + Duration: 1 * time.Second, + OffsetTime: utcOffset + 1, + }) + err := tp.AttachTasks(&fakeTask{number: 100}) + if err != nil { + t.Fail() + } + ch, _ := tp.Evaluate() + counter := 0 + + for i := 0; i < 3; i++ { + select { + case <-ch: + counter++ + case <-time.After(2 * time.Second): + continue + } + } + + if counter != 3 { + t.Fail() + } + + tp.Disable() +} + +func TestDisablePolicy(t *testing.T) { + now := time.Now().UTC() + utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) + tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ + Duration: 1 * time.Second, + OffsetTime: utcOffset + 1, + }) + err := tp.AttachTasks(&fakeTask{number: 100}) + if err != nil { + t.Fail() + } + ch, _ := tp.Evaluate() + counter := 0 + terminate := make(chan bool) + defer func() { + terminate <- true + }() + go func() { + for { + select { + case <-ch: + counter++ + case <-terminate: + return + case <-time.After(6 * time.Second): + return + } + } + }() + time.Sleep(2 * time.Second) + if tp.Disable() != nil { + t.Fatal("Failed to disable policy") + } + //Waiting for everything is stable + <-time.After(1 * time.Second) + //Copy value + copiedCounter := counter + time.Sleep(2 * time.Second) + if counter != copiedCounter { + t.Fatalf("Policy is still running after calling Disable() %d=%d", copiedCounter, counter) + } +} diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go new file mode 100644 index 000000000..ad11cf5df --- /dev/null +++ b/src/common/scheduler/policy/policy.go @@ -0,0 +1,39 @@ +package policy + +import ( + "github.com/vmware/harbor/src/common/scheduler/task" +) + +//Policy is an if-then logic to determine how the attached tasks should be +//executed based on the evaluation result of the defined conditions. +//E.g: +// Daily execute TASK between 2017/06/24 and 2018/06/23 +// Execute TASK at 2017/09/01 14:30:00 +// +//Each policy should have a name to identify itself. +//Please be aware that policy with no tasks will be treated as invalid. +// +type Policy interface { + //Name will return the name of the policy. + Name() string + + //Tasks will return the attached tasks with this policy. + Tasks() []task.Task + + //AttachTasks is to attach tasks to this policy + AttachTasks(...task.Task) error + + //Done will setup a channel for other components to check whether or not + //the policy is completed. Possibly designed for the none loop policy. + Done() <-chan bool + + //Evaluate the policy based on its definition and return the result via + //result channel. Policy is enabled after it is evaluated. + //Make sure Evaluate is idempotent, that means one policy can be only enabled + //only once even if Evaluate is called more than one times. + Evaluate() (<-chan bool, error) + + //Disable the enabled policy and release all the allocated resources. + //Disable should also send signal to the terminated channel which returned by Done. + Disable() error +} diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go new file mode 100644 index 000000000..3073f6343 --- /dev/null +++ b/src/common/scheduler/scheduler.go @@ -0,0 +1,265 @@ +package scheduler + +import ( + "github.com/vmware/harbor/src/common/scheduler/policy" + "github.com/vmware/harbor/src/common/utils/log" + + "errors" + "fmt" + "reflect" + "strings" + "time" +) + +const ( + defaultQueueSize = 10 + + statSchedulePolicy = "Schedule Policy" + statUnSchedulePolicy = "Unschedule Policy" + statTaskRun = "Task Run" + statTaskComplete = "Task Complete" + statTaskFail = "Task Fail" +) + +//StatItem is defined for the stat metrics. +type StatItem struct { + //Metrics catalog + Type string + + //The stat value + Value uint32 + + //Attach some other info + Attachment interface{} +} + +//StatSummary is used to collect some metrics of scheduler. +type StatSummary struct { + //Count of scheduled policy + PolicyCount uint32 + + //Total count of tasks + Tasks uint32 + + //Count of successfully complete tasks + CompletedTasks uint32 + + //Count of tasks with errors + TasksWithError uint32 +} + +//Configuration defines configuration of Scheduler. +type Configuration struct { + QueueSize uint8 +} + +//Scheduler is designed for scheduling policies. +type Scheduler struct { + //Related configuration options for scheduler. + config *Configuration + + //Store to keep the references of scheduled policies. + policies Store + + //Queue for accepting the scheduling polices. + scheduleQueue chan policy.Policy + + //Queue for receiving policy unschedule request or complete signal. + unscheduleQueue chan string + + //Channel for receiving stat metrics. + statChan chan *StatItem + + //Channel for terminate scheduler damon. + terminateChan chan bool + + //The stat metrics of scheduler. + stats *StatSummary + + //To indicate whether scheduler is running or not + isRunning bool +} + +//DefaultScheduler is a default scheduler. +var DefaultScheduler = NewScheduler(nil) + +//NewScheduler is constructor for creating a scheduler. +func NewScheduler(config *Configuration) *Scheduler { + var qSize uint8 = defaultQueueSize + if config != nil && config.QueueSize > 0 { + qSize = config.QueueSize + } + + sq := make(chan policy.Policy, qSize) + usq := make(chan string, qSize) + stChan := make(chan *StatItem, 4) + tc := make(chan bool, 1) + + store := NewConcurrentStore() + return &Scheduler{ + config: config, + policies: store, + scheduleQueue: sq, + unscheduleQueue: usq, + statChan: stChan, + terminateChan: tc, + stats: &StatSummary{ + PolicyCount: 0, + Tasks: 0, + CompletedTasks: 0, + TasksWithError: 0, + }, + isRunning: false, + } +} + +//Start the scheduler damon. +func (sch *Scheduler) Start() { + if sch.isRunning { + return + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Runtime error in scheduler:%s\n", r) + } + }() + defer func() { + sch.isRunning = false + }() + for { + select { + case <-sch.terminateChan: + //Exit + return + case p := <-sch.scheduleQueue: + //Schedule the policy. + watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue) + + //Keep the policy for future use after it's successfully scheduled. + sch.policies.Put(p.Name(), watcher) + + //Enable it. + watcher.Start() + + sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} + case name := <-sch.unscheduleQueue: + //Find the watcher. + watcher := sch.policies.Remove(name) + if watcher != nil && watcher.IsRunning() { + watcher.Stop() + } + + sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} + + case stat := <-sch.statChan: + { + switch stat.Type { + case statSchedulePolicy: + sch.stats.PolicyCount += stat.Value + break + case statUnSchedulePolicy: + sch.stats.PolicyCount -= stat.Value + break + case statTaskRun: + sch.stats.Tasks += stat.Value + break + case statTaskComplete: + sch.stats.CompletedTasks += stat.Value + break + case statTaskFail: + sch.stats.TasksWithError += stat.Value + break + default: + break + } + log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n", + sch.stats.PolicyCount, + sch.stats.Tasks, + sch.stats.CompletedTasks, + sch.stats.TasksWithError) + + if stat.Attachment != nil && + reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" { + log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error()) + } + } + + } + } + }() + + sch.isRunning = true + log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339)) +} + +//Stop the scheduler damon. +func (sch *Scheduler) Stop() { + if !sch.isRunning { + return + } + + //Terminate damon firstly to stop receiving signals. + sch.terminateChan <- true + + //Stop all watchers. + for _, wt := range sch.policies.GetAll() { + wt.Stop() + } + + //Clear resources + sch.policies.Clear() + + log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339)) +} + +//Schedule and enable the policy. +func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error { + if scheduledPolicy == nil { + return errors.New("nil is not Policy object") + } + + if strings.TrimSpace(scheduledPolicy.Name()) == "" { + return errors.New("Policy should be assigned a name") + } + + tasks := scheduledPolicy.Tasks() + if tasks == nil || len(tasks) == 0 { + return errors.New("Policy must attach task(s)") + } + + if sch.policies.Exists(scheduledPolicy.Name()) { + return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name()) + } + + //Schedule the policy. + sch.scheduleQueue <- scheduledPolicy + + return nil +} + +//UnSchedule the specified policy from the enabled policies list. +func (sch *Scheduler) UnSchedule(policyName string) error { + if strings.TrimSpace(policyName) == "" { + return errors.New("Empty policy name is invalid") + } + + if !sch.policies.Exists(policyName) { + return fmt.Errorf("Policy %s is not existing", policyName) + } + + //Unschedule the policy. + sch.unscheduleQueue <- policyName + + return nil +} + +//IsRunning to indicate whether the scheduler is running. +func (sch *Scheduler) IsRunning() bool { + return sch.isRunning +} + +//HasScheduled is to check whether the given policy has been scheduled or not. +func (sch *Scheduler) HasScheduled(policyName string) bool { + return sch.policies.Exists(policyName) +} diff --git a/src/common/scheduler/scheduler_store.go b/src/common/scheduler/scheduler_store.go new file mode 100644 index 000000000..9a94d3b70 --- /dev/null +++ b/src/common/scheduler/scheduler_store.go @@ -0,0 +1,136 @@ +package scheduler + +import ( + "strings" + "sync" +) + +//Store define the basic operations for storing and managing policy watcher. +//The concrete implementation should consider concurrent supporting scenario. +// +type Store interface { + //Put a new policy in. + Put(key string, value *Watcher) + + //Get the corresponding policy with the key. + Get(key string) *Watcher + + //Exists is to check if the key existing in the store. + Exists(key string) bool + + //Remove the specified policy and return its reference. + Remove(key string) *Watcher + + //Size return the total count of items in store. + Size() uint32 + + //GetAll is to get all the items in the store. + GetAll() []*Watcher + + //Clear store. + Clear() +} + +//ConcurrentStore implements Store interface and supports concurrent operations. +type ConcurrentStore struct { + //Read-write mutex to synchronize the data map. + *sync.RWMutex + + //Map used to keep the policy list. + data map[string]*Watcher +} + +//NewConcurrentStore is used to create a new store and return the pointer reference. +func NewConcurrentStore() *ConcurrentStore { + mutex := new(sync.RWMutex) + data := make(map[string]*Watcher) + + return &ConcurrentStore{mutex, data} +} + +//Put a policy into store. +func (cs *ConcurrentStore) Put(key string, value *Watcher) { + if strings.TrimSpace(key) == "" || value == nil { + return + } + + defer cs.Unlock() + + cs.Lock() + cs.data[key] = value +} + +//Get policy via key. +func (cs *ConcurrentStore) Get(key string) *Watcher { + if strings.TrimSpace(key) == "" { + return nil + } + + defer cs.RUnlock() + + cs.RLock() + return cs.data[key] +} + +//Exists is used to check whether or not the key exists in store. +func (cs *ConcurrentStore) Exists(key string) bool { + if strings.TrimSpace(key) == "" { + return false + } + + defer cs.RUnlock() + + cs.RLock() + _, ok := cs.data[key] + + return ok +} + +//Remove is to delete the specified policy. +func (cs *ConcurrentStore) Remove(key string) *Watcher { + if !cs.Exists(key) { + return nil + } + + defer cs.Unlock() + + cs.Lock() + if wt, ok := cs.data[key]; ok { + delete(cs.data, key) + return wt + } + + return nil +} + +//Size return the total count of items in store. +func (cs *ConcurrentStore) Size() uint32 { + return (uint32)(len(cs.data)) +} + +//GetAll to get all the items of store. +func (cs *ConcurrentStore) GetAll() []*Watcher { + all := []*Watcher{} + + defer cs.RUnlock() + cs.RLock() + for _, v := range cs.data { + all = append(all, v) + } + + return all +} + +//Clear all the items in store. +func (cs *ConcurrentStore) Clear() { + if cs.Size() == 0 { + return + } + + defer cs.Unlock() + cs.Lock() + + for k := range cs.data { + delete(cs.data, k) + } +} diff --git a/src/common/scheduler/scheduler_store_test.go b/src/common/scheduler/scheduler_store_test.go new file mode 100644 index 000000000..11dc1d8c2 --- /dev/null +++ b/src/common/scheduler/scheduler_store_test.go @@ -0,0 +1,71 @@ +package scheduler + +import ( + "testing" +) + +func TestPut(t *testing.T) { + store := NewConcurrentStore() + if store == nil { + t.Fatal("Failed to creat store instance") + } + + store.Put("testing", NewWatcher(nil, nil, nil)) + if store.Size() != 1 { + t.Fail() + } +} + +func TestGet(t *testing.T) { + store := NewConcurrentStore() + if store == nil { + t.Fatal("Failed to creat store instance") + } + store.Put("testing", NewWatcher(nil, nil, nil)) + w := store.Get("testing") + if w == nil { + t.Fail() + } +} + +func TestRemove(t *testing.T) { + store := NewConcurrentStore() + if store == nil { + t.Fatal("Failed to creat store instance") + } + store.Put("testing", NewWatcher(nil, nil, nil)) + if !store.Exists("testing") { + t.Fail() + } + w := store.Remove("testing") + if w == nil { + t.Fail() + } +} + +func TestExisting(t *testing.T) { + store := NewConcurrentStore() + if store == nil { + t.Fatal("Failed to creat store instance") + } + store.Put("testing", NewWatcher(nil, nil, nil)) + if !store.Exists("testing") { + t.Fail() + } + if store.Exists("fake_key") { + t.Fail() + } +} + +func TestGetAll(t *testing.T) { + store := NewConcurrentStore() + if store == nil { + t.Fatal("Failed to creat store instance") + } + store.Put("testing", NewWatcher(nil, nil, nil)) + store.Put("testing2", NewWatcher(nil, nil, nil)) + list := store.GetAll() + if list == nil || len(list) != 2 { + t.Fail() + } +} diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go new file mode 100644 index 000000000..f038e1802 --- /dev/null +++ b/src/common/scheduler/scheduler_test.go @@ -0,0 +1,142 @@ +package scheduler + +import ( + "testing" + "time" + + "github.com/vmware/harbor/src/common/scheduler/task" +) + +type fakePolicy struct { + tasks []task.Task + done chan bool + evaluation chan bool + terminate chan bool + ticker *time.Ticker +} + +func (fp *fakePolicy) Name() string { + return "testing policy" +} + +func (fp *fakePolicy) Tasks() []task.Task { + return fp.tasks +} + +func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error { + fp.tasks = append(fp.tasks, tasks...) + return nil +} + +func (fp *fakePolicy) Done() <-chan bool { + return fp.done +} + +func (fp *fakePolicy) Evaluate() (<-chan bool, error) { + fp.evaluation = make(chan bool, 1) + fp.done = make(chan bool) + fp.terminate = make(chan bool) + + fp.evaluation <- true + go func() { + fp.ticker = time.NewTicker(1 * time.Second) + for { + select { + case <-fp.terminate: + return + case <-fp.ticker.C: + fp.evaluation <- true + } + } + }() + return fp.evaluation, nil +} + +func (fp *fakePolicy) Disable() error { + if fp.ticker != nil { + fp.ticker.Stop() + } + + fp.terminate <- true + return nil +} + +type fakeTask struct { + number int +} + +func (ft *fakeTask) Name() string { + return "for testing" +} + +func (ft *fakeTask) Run() error { + ft.number++ + return nil +} + +//Wacher will be tested together with scheduler. +func TestScheduler(t *testing.T) { + DefaultScheduler.Start() + if DefaultScheduler.policies.Size() != 0 { + t.Fail() + } + + if DefaultScheduler.stats.PolicyCount != 0 { + t.Fail() + } + + if !DefaultScheduler.IsRunning() { + t.Fatal("Scheduler is not started") + } + + fp := &fakePolicy{ + tasks: []task.Task{}, + } + fk := &fakeTask{number: 100} + fp.AttachTasks(fk) + + if DefaultScheduler.Schedule(fp) != nil { + t.Fatal("Schedule policy failed") + } + //Waiting for everything is stable + time.Sleep(1 * time.Second) + if DefaultScheduler.policies.Size() == 0 { + t.Fatal("No policy in the store after calling Schedule()") + } + if DefaultScheduler.stats.PolicyCount != 1 { + t.Fatal("Policy stats do not match") + } + + time.Sleep(2 * time.Second) + if fk.number == 100 { + t.Fatal("Task is not triggered") + } + if DefaultScheduler.stats.Tasks == 0 { + t.Fail() + } + if DefaultScheduler.stats.CompletedTasks == 0 { + t.Fail() + } + + if DefaultScheduler.UnSchedule(fp.Name()) != nil { + t.Fatal("Unschedule policy failed") + } + //Waiting for everything is stable + time.Sleep(1 * time.Second) + + if DefaultScheduler.stats.PolicyCount != 0 { + t.Fatal("Policy count does not match after calling UnSchedule()") + } + copiedValue := DefaultScheduler.stats.CompletedTasks + <-time.After(2 * time.Second) + + if copiedValue != DefaultScheduler.stats.CompletedTasks { + t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", copiedValue, DefaultScheduler.stats.CompletedTasks) + } + + DefaultScheduler.Stop() + <-time.After(1 * time.Second) + if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() { + t.Fatal("Scheduler is still running after stopping") + } +} diff --git a/src/common/scheduler/task/scan_all_task.go b/src/common/scheduler/task/scan_all_task.go new file mode 100644 index 000000000..c7d70d838 --- /dev/null +++ b/src/common/scheduler/task/scan_all_task.go @@ -0,0 +1,23 @@ +package task + +import ( + "github.com/vmware/harbor/src/ui/utils" +) + +//ScanAllTask is task of scanning all tags. +type ScanAllTask struct{} + +//NewScanAllTask is constructor of creating ScanAllTask. +func NewScanAllTask() *ScanAllTask { + return &ScanAllTask{} +} + +//Name returns the name of the task. +func (sat *ScanAllTask) Name() string { + return "scan all" +} + +//Run the actions. +func (sat *ScanAllTask) Run() error { + return utils.ScanAllImages() +} diff --git a/src/common/scheduler/task/scan_all_task_test.go b/src/common/scheduler/task/scan_all_task_test.go new file mode 100644 index 000000000..18ac9202b --- /dev/null +++ b/src/common/scheduler/task/scan_all_task_test.go @@ -0,0 +1,16 @@ +package task + +import ( + "testing" +) + +func TestTask(t *testing.T) { + tk := NewScanAllTask() + if tk == nil { + t.Fail() + } + + if tk.Name() != "scan all" { + t.Fail() + } +} diff --git a/src/common/scheduler/task/task.go b/src/common/scheduler/task/task.go new file mode 100644 index 000000000..162a6501a --- /dev/null +++ b/src/common/scheduler/task/task.go @@ -0,0 +1,10 @@ +package task + +//Task is used to synchronously run specific action(s). +type Task interface { + //Name should return the name of the task. + Name() string + + //Run the concrete code here + Run() error +} diff --git a/src/common/scheduler/watcher.go b/src/common/scheduler/watcher.go new file mode 100644 index 000000000..feb2f7739 --- /dev/null +++ b/src/common/scheduler/watcher.go @@ -0,0 +1,139 @@ +package scheduler + +import ( + "github.com/vmware/harbor/src/common/scheduler/policy" + "github.com/vmware/harbor/src/common/scheduler/task" + "github.com/vmware/harbor/src/common/utils/log" + + "fmt" +) + +//Watcher is an asynchronous runner to provide an evaluation environment for the policy. +type Watcher struct { + //The target policy. + p policy.Policy + + //The channel for receive stop signal. + cmdChan chan bool + + //Indicate whether the watch is started and running. + isRunning bool + + //Report stats to scheduler. + stats chan *StatItem + + //If policy is automatically completed, report the policy to scheduler. + doneChan chan string +} + +//NewWatcher is used as a constructor. +func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher { + return &Watcher{ + p: p, + cmdChan: make(chan bool), + isRunning: false, + stats: st, + doneChan: done, + } +} + +//Start the running. +func (wc *Watcher) Start() { + if wc.isRunning { + return + } + + if wc.p == nil { + return + } + + go func(pl policy.Policy) { + defer func() { + if r := recover(); r != nil { + log.Errorf("Runtime error in watcher:%s\n", r) + } + }() + + defer func() { + wc.isRunning = false + }() + + evalChan, err := pl.Evaluate() + if err != nil { + log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error()) + return + } + done := pl.Done() + + for { + select { + case <-evalChan: + { + //Start to run the attached tasks. + for _, t := range pl.Tasks() { + go func(tk task.Task) { + defer func() { + if r := recover(); r != nil { + st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)} + if wc.stats != nil { + wc.stats <- st + } + } + }() + err := tk.Run() + + //Report task execution stats. + st := &StatItem{statTaskComplete, 1, err} + if err != nil { + st.Type = statTaskFail + } + if wc.stats != nil { + wc.stats <- st + } + }(t) + + //Report task run stats. + st := &StatItem{statTaskRun, 1, nil} + if wc.stats != nil { + wc.stats <- st + } + } + } + case <-done: + { + //Policy is automatically completed. + //Report policy change stats. + if wc.doneChan != nil { + wc.doneChan <- wc.p.Name() + } + + return + } + case <-wc.cmdChan: + //Exit goroutine. + return + } + } + }(wc.p) + + wc.isRunning = true +} + +//Stop the running. +func (wc *Watcher) Stop() { + if !wc.isRunning { + return + } + + //Disable policy. + if wc.p != nil { + wc.p.Disable() + } + //Stop watcher. + wc.cmdChan <- true +} + +//IsRunning to indicate if the watcher is still running. +func (wc *Watcher) IsRunning() bool { + return wc.isRunning +} diff --git a/src/common/utils/utils.go b/src/common/utils/utils.go index ff1fcd2b1..5436e17b3 100644 --- a/src/common/utils/utils.go +++ b/src/common/utils/utils.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "net/url" + "reflect" "strconv" "strings" "time" @@ -127,6 +128,48 @@ func ParseTimeStamp(timestamp string) (*time.Time, error) { return &t, nil } +//ConvertMapToStruct is used to fill the specified struct with map. +func ConvertMapToStruct(object interface{}, valuesInMap map[string]interface{}) error { + if object == nil { + return fmt.Errorf("nil struct is not supported") + } + + if reflect.TypeOf(object).Kind() != reflect.Ptr { + return fmt.Errorf("object should be referred by pointer") + } + + for k, v := range valuesInMap { + if err := setField(object, k, v); err != nil { + return err + } + } + + return nil +} + +func setField(object interface{}, field string, value interface{}) error { + structValue := reflect.ValueOf(object).Elem() + + structFieldValue := structValue.FieldByName(field) + if !structFieldValue.IsValid() { + return fmt.Errorf("No such field: %s in obj", field) + } + + if !structFieldValue.CanSet() { + return fmt.Errorf("Cannot set value for field %s", field) + } + + structFieldType := structFieldValue.Type() + val := reflect.ValueOf(value) + if structFieldType != val.Type() { + return errors.New("Provided value type didn't match object field type") + } + + structFieldValue.Set(val) + + return nil +} + // ParseProjectIDOrName parses value to ID(int64) or name(string) func ParseProjectIDOrName(value interface{}) (int64, string, error) { if value == nil { diff --git a/src/common/utils/utils_test.go b/src/common/utils/utils_test.go index fda1695b7..82b7b5e9e 100644 --- a/src/common/utils/utils_test.go +++ b/src/common/utils/utils_test.go @@ -243,3 +243,23 @@ func TestParseHarborIDOrName(t *testing.T) { assert.Equal(t, int64(0), id) assert.Equal(t, "project", name) } + +type testingStruct struct { + Name string + Count int +} + +func TestConvertMapToStruct(t *testing.T) { + dataMap := make(map[string]interface{}) + dataMap["Name"] = "testing" + dataMap["Count"] = 100 + + obj := &testingStruct{} + if err := ConvertMapToStruct(obj, dataMap); err != nil { + t.Fatal(err) + } else { + if obj.Name != "testing" || obj.Count != 100 { + t.Fail() + } + } +} diff --git a/src/ui/api/config.go b/src/ui/api/config.go index 7b5af6cce..e1854d702 100644 --- a/src/ui/api/config.go +++ b/src/ui/api/config.go @@ -190,6 +190,9 @@ func (c *ConfigAPI) Put() { log.Errorf("failed to load configurations: %v", err) c.CustomAbort(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) } + + //Everything is ok, detect the configurations to confirm if the option we are caring is changed. + watchConfigChanges(cfg) } // Reset system configurations diff --git a/src/ui/api/utils.go b/src/ui/api/utils.go index 2fff9de26..0919c31c8 100644 --- a/src/ui/api/utils.go +++ b/src/ui/api/utils.go @@ -17,14 +17,17 @@ package api import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" + "reflect" "sort" "strings" "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils/clair" registry_error "github.com/vmware/harbor/src/common/utils/error" @@ -496,3 +499,35 @@ func transformVulnerabilities(layerWithVuln *models.ClairLayerEnvelope) []*model } return res } + +//Watch the configuration changes. +func watchConfigChanges(cfg map[string]interface{}) error { + if cfg == nil { + return errors.New("Empty configurations") + } + + //Currently only watch the scan all policy change. + if v, ok := cfg[notifier.ScanAllPolicyTopic]; ok { + if reflect.TypeOf(v).Kind() == reflect.Map { + policyCfg := &models.ScanAllPolicy{} + if err := utils.ConvertMapToStruct(policyCfg, v.(map[string]interface{})); err != nil { + return err + } + + policyNotification := notifier.ScanPolicyNotification{ + Type: policyCfg.Type, + DailyTime: 0, + } + + if t, yes := policyCfg.Parm["daily_time"]; yes { + if reflect.TypeOf(t).Kind() == reflect.Int { + policyNotification.DailyTime = (int64)(t.(int)) + } + } + + return notifier.Publish(notifier.ScanAllPolicyTopic, policyNotification) + } + } + + return nil +} diff --git a/src/ui/config/config.go b/src/ui/config/config.go index e85cd69ff..785b0efba 100644 --- a/src/ui/config/config.go +++ b/src/ui/config/config.go @@ -143,7 +143,7 @@ func Reset() error { return mg.Reset() } -// Upload uploads all system configutations to admin server +// Upload uploads all system configurations to admin server func Upload(cfg map[string]interface{}) error { return mg.Upload(cfg) } diff --git a/src/ui/main.go b/src/ui/main.go index 5a3651912..7f7f57e47 100644 --- a/src/ui/main.go +++ b/src/ui/main.go @@ -17,6 +17,7 @@ package main import ( "fmt" "os" + "reflect" "github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils/log" @@ -26,6 +27,8 @@ import ( "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/notifier" + "github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/ui/api" _ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/ldap" @@ -98,6 +101,26 @@ func main() { log.Error(err) } + //Enable the policy scheduler here. + scheduler.DefaultScheduler.Start() + + //Subscribe the policy change topic. + notifier.Subscribe(notifier.ScanAllPolicyTopic, ¬ifier.ScanPolicyNotificationHandler{}) + + //Get policy configuration. + scanAllPolicy := config.ScanAllPolicy() + if scanAllPolicy.Type == notifier.PolicyTypeDaily { + dailyTime := 0 + if t, ok := scanAllPolicy.Parm["daily_time"]; ok { + if reflect.TypeOf(t).Kind() == reflect.Int { + dailyTime = t.(int) + } + } + + //Send notification to handle first policy change. + notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)}) + } + filter.Init() beego.InsertFilter("/*", beego.BeforeRouter, filter.SecurityFilter) diff --git a/src/ui_ng/lib/src/config/config.ts b/src/ui_ng/lib/src/config/config.ts index a5672b35a..383e490f7 100644 --- a/src/ui_ng/lib/src/config/config.ts +++ b/src/ui_ng/lib/src/config/config.ts @@ -100,7 +100,7 @@ export class Configuration { this.verify_remote_cert = new BoolValueItem(false, true); this.scan_all_policy = new ComplexValueItem({ type: "daily", - parameters: { + parameter: { daily_time: 0 } }, true); diff --git a/src/ui_ng/lib/src/config/registry-config.component.html.ts b/src/ui_ng/lib/src/config/registry-config.component.html.ts index 6402681bd..98c594fa8 100644 --- a/src/ui_ng/lib/src/config/registry-config.component.html.ts +++ b/src/ui_ng/lib/src/config/registry-config.component.html.ts @@ -1,7 +1,12 @@ export const REGISTRY_CONFIG_HTML: string = `
- - - + + + +
+ + +
+
`; \ No newline at end of file diff --git a/src/ui_ng/lib/src/config/registry-config.component.spec.ts b/src/ui_ng/lib/src/config/registry-config.component.spec.ts index ed1a2894e..65dfc83a3 100644 --- a/src/ui_ng/lib/src/config/registry-config.component.spec.ts +++ b/src/ui_ng/lib/src/config/registry-config.component.spec.ts @@ -8,6 +8,7 @@ import { ReplicationConfigComponent } from './replication/replication-config.com import { SystemSettingsComponent } from './system/system-settings.component'; import { VulnerabilityConfigComponent } from './vulnerability/vulnerability-config.component'; import { RegistryConfigComponent } from './registry-config.component'; +import { ConfirmationDialogComponent } from '../confirmation-dialog/confirmation-dialog.component'; import { ConfigurationService, @@ -29,7 +30,7 @@ describe('RegistryConfigComponent (inline template)', () => { mockConfig.verify_remote_cert.value = true; mockConfig.scan_all_policy.value = { type: "daily", - parameters: { + parameter: { daily_time: 0 } }; @@ -46,7 +47,8 @@ describe('RegistryConfigComponent (inline template)', () => { ReplicationConfigComponent, SystemSettingsComponent, VulnerabilityConfigComponent, - RegistryConfigComponent + RegistryConfigComponent, + ConfirmationDialogComponent ], providers: [ ErrorHandler, diff --git a/src/ui_ng/lib/src/config/registry-config.component.ts b/src/ui_ng/lib/src/config/registry-config.component.ts index f3b21a94c..49c61fbd4 100644 --- a/src/ui_ng/lib/src/config/registry-config.component.ts +++ b/src/ui_ng/lib/src/config/registry-config.component.ts @@ -1,10 +1,22 @@ -import { Component, OnInit, EventEmitter, Output } from '@angular/core'; +import { Component, OnInit, EventEmitter, Output, ViewChild } from '@angular/core'; import { Configuration, ComplexValueItem } from './config'; import { REGISTRY_CONFIG_HTML } from './registry-config.component.html'; import { ConfigurationService } from '../service/index'; import { toPromise } from '../utils'; import { ErrorHandler } from '../error-handler'; +import { + ReplicationConfigComponent, + SystemSettingsComponent, + VulnerabilityConfigComponent +} from './index'; + +import { ConfirmationState, ConfirmationTargets, ConfirmationButtons } from '../shared/shared.const'; +import { ConfirmationDialogComponent } from '../confirmation-dialog/confirmation-dialog.component'; +import { ConfirmationMessage } from '../confirmation-dialog/confirmation-message'; +import { ConfirmationAcknowledgement } from '../confirmation-dialog/confirmation-state-message'; +import { TranslateService } from '@ngx-translate/core'; + @Component({ selector: 'hbr-registry-config', @@ -13,27 +25,56 @@ import { ErrorHandler } from '../error-handler'; export class RegistryConfigComponent implements OnInit { config: Configuration = new Configuration(); configCopy: Configuration; + onGoing: boolean = false; - @Output() configChanged: EventEmitter = new EventEmitter(); + @ViewChild("replicationConfig") replicationCfg: ReplicationConfigComponent; + @ViewChild("systemSettings") systemSettings: SystemSettingsComponent; + @ViewChild("vulnerabilityConfig") vulnerabilityCfg: VulnerabilityConfigComponent; + @ViewChild("cfgConfirmationDialog") confirmationDlg: ConfirmationDialogComponent; constructor( private configService: ConfigurationService, - private errorHandler: ErrorHandler + private errorHandler: ErrorHandler, + private translate: TranslateService ) { } + get shouldDisable(): boolean { + return !this.isValid() || !this.hasChanges() || this.onGoing; + } + ngOnInit(): void { //Initialize this.load(); } + isValid(): boolean { + return this.replicationCfg && + this.replicationCfg.isValid && + this.systemSettings && + this.systemSettings.isValid && + this.vulnerabilityCfg && + this.vulnerabilityCfg.isValid; + } + + hasChanges(): boolean { + return !this._isEmptyObject(this.getChanges()); + } + //Load configurations load(): void { + this.onGoing = true; toPromise(this.configService.getConfigurations()) .then((config: Configuration) => { - this.configCopy = Object.assign({}, config); + this.onGoing = false; + + this.configCopy = this._clone(config); this.config = config; }) - .catch(error => this.errorHandler.error(error)); + .catch(error => { + this.onGoing = false; + + this.errorHandler.error(error); + }); } //Save configuration changes @@ -45,26 +86,48 @@ export class RegistryConfigComponent implements OnInit { return; } - //Fix policy parameters issue - let scanningAllPolicy = changes["scan_all_policy"]; - if (scanningAllPolicy && - scanningAllPolicy.type !== "daily" && - scanningAllPolicy.parameters) { - delete (scanningAllPolicy.parameters); - } - + this.onGoing = true; toPromise(this.configService.saveConfigurations(changes)) .then(() => { - this.configChanged.emit(changes); + this.onGoing = false; + + this.translate.get("CONFIG.SAVE_SUCCESS").subscribe((res: string) => { + this.errorHandler.info(res); + }); + //Reload to fetch all the updates + this.load(); }) - .catch(error => this.errorHandler.error(error)); + .catch(error => { + this.onGoing = false; + this.errorHandler.error(error); + }); + } + + //Cancel the changes if have + cancel(): void { + let msg = new ConfirmationMessage( + "CONFIG.CONFIRM_TITLE", + "CONFIG.CONFIRM_SUMMARY", + "", + {}, + ConfirmationTargets.CONFIG + ); + this.confirmationDlg.open(msg); + } + + //Confirm cancel + confirmCancel(ack: ConfirmationAcknowledgement): void { + if (ack && ack.source === ConfirmationTargets.CONFIG && + ack.state === ConfirmationState.CONFIRMED) { + this.reset(); + } } reset(): void { //Reset to the values of copy let changes: { [key: string]: any | any[] } = this.getChanges(); for (let prop in changes) { - this.config[prop] = Object.assign({}, this.configCopy[prop]); + this.config[prop] = this._clone(this.configCopy[prop]); } } @@ -107,4 +170,10 @@ export class RegistryConfigComponent implements OnInit { _isEmptyObject(obj: any): boolean { return !obj || JSON.stringify(obj) === "{}"; } + + //Deeper clone all + _clone(srcObj: any): any { + if (!srcObj) return null; + return JSON.parse(JSON.stringify(srcObj)); + } } \ No newline at end of file diff --git a/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.template.ts b/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.template.ts index 67c0fc373..361f4cb1b 100644 --- a/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.template.ts +++ b/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.template.ts @@ -5,7 +5,7 @@ export const VULNERABILITY_CONFIG_HTML: string = `
- diff --git a/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.ts b/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.ts index ee57b9894..b8cceb360 100644 --- a/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.ts +++ b/src/ui_ng/lib/src/config/vulnerability/vulnerability-config.component.ts @@ -30,13 +30,13 @@ export class VulnerabilityConfigComponent { this.config = cfg; if (this.config.scan_all_policy && this.config.scan_all_policy.value) { - if (this.config.scan_all_policy.value.type === "daily"){ - if(!this.config.scan_all_policy.value.parameters){ - this.config.scan_all_policy.value.parameters = { - daily_time: 0 - }; - } + if (this.config.scan_all_policy.value.type === "daily") { + if (!this.config.scan_all_policy.value.parameter) { + this.config.scan_all_policy.value.parameter = { + daily_time: 0 + }; } + } } this.configChange.emit(this.config); } @@ -51,8 +51,8 @@ export class VulnerabilityConfigComponent { } let timeOffset: number = 0;//seconds - if (this.config.scan_all_policy.value.parameters) { - let daily_time = this.config.scan_all_policy.value.parameters.daily_time; + if (this.config.scan_all_policy.value.parameter) { + let daily_time = this.config.scan_all_policy.value.parameter.daily_time; if (daily_time && typeof daily_time === "number") { timeOffset = +daily_time; } @@ -99,8 +99,9 @@ export class VulnerabilityConfigComponent { return; } - if (!this.config.scan_all_policy.value.parameters) { - this.config.scan_all_policy.value.parameters = { + //Double confirm inner parameter existing. + if (!this.config.scan_all_policy.value.parameter) { + this.config.scan_all_policy.value.parameter = { daily_time: 0 }; } @@ -124,7 +125,41 @@ export class VulnerabilityConfigComponent { utcTimes -= ONE_DAY_SECONDS; } - this.config.scan_all_policy.value.parameters.daily_time = utcTimes; + this.config.scan_all_policy.value.parameter.daily_time = utcTimes; + } + + //Scanning type + get scanningType(): string { + if (this.config && + this.config.scan_all_policy && + this.config.scan_all_policy.value) { + return this.config.scan_all_policy.value.type; + } else { + //default + return "none"; + } + } + + set scanningType(v: string) { + if (this.config && + this.config.scan_all_policy && + this.config.scan_all_policy.value) { + let type: string = (v && v.trim() !== "") ? v : "none"; + this.config.scan_all_policy.value.type = type; + if (type !== "daily") { + //No parameter + if (this.config.scan_all_policy.value.parameter) { + delete (this.config.scan_all_policy.value.parameter); + } + } else { + //Has parameter + if (!this.config.scan_all_policy.value.parameter) { + this.config.scan_all_policy.value.parameter = { + daily_time: 0 + }; + } + } + } } @ViewChild("systemConfigFrom") systemSettingsForm: NgForm; diff --git a/src/ui_ng/package.json b/src/ui_ng/package.json index 190f33685..56b3f804d 100644 --- a/src/ui_ng/package.json +++ b/src/ui_ng/package.json @@ -31,7 +31,7 @@ "clarity-icons": "^0.9.8", "clarity-ui": "^0.9.8", "core-js": "^2.4.1", - "harbor-ui": "^0.2.40", + "harbor-ui": "^0.2.52", "intl": "^1.2.5", "mutationobserver-shim": "^0.3.2", "ngx-cookie": "^1.0.0", diff --git a/src/ui_ng/src/app/config/config.component.html b/src/ui_ng/src/app/config/config.component.html index 060a4b21e..b5b19707f 100644 --- a/src/ui_ng/src/app/config/config.component.html +++ b/src/ui_ng/src/app/config/config.component.html @@ -16,7 +16,7 @@
@@ -41,7 +41,6 @@ -
\ No newline at end of file diff --git a/src/ui_ng/src/app/config/config.component.ts b/src/ui_ng/src/app/config/config.component.ts index b99734116..f239e8d45 100644 --- a/src/ui_ng/src/app/config/config.component.ts +++ b/src/ui_ng/src/app/config/config.component.ts @@ -71,11 +71,6 @@ export class ConfigurationComponent implements OnInit, OnDestroy { private appConfigService: AppConfigService, private session: SessionService) { } - consoleTest(): void { - console.log(this.allConfig, this.originalCopy); - console.log("-------------"); - console.log(this.getChanges()); - } isCurrentTabLink(tabId: string): boolean { return this.currentTabId === tabId; }