mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-12 10:50:44 +01:00
Merge pull request #2799 from vmware/fix_issue_#2793
Fix issue of detecting configuration changes
This commit is contained in:
commit
e6368ab8a0
39
src/common/notifier/config_watcher.go
Normal file
39
src/common/notifier/config_watcher.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package notifier
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/vmware/harbor/src/common/models"
|
||||||
|
"github.com/vmware/harbor/src/common/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
//WatchConfigChanges is used to watch the configuration changes.
|
||||||
|
func WatchConfigChanges(cfg map[string]interface{}) error {
|
||||||
|
if cfg == nil {
|
||||||
|
return errors.New("Empty configurations")
|
||||||
|
}
|
||||||
|
|
||||||
|
//Currently only watch the scan all policy change.
|
||||||
|
if v, ok := cfg[ScanAllPolicyTopic]; ok {
|
||||||
|
policyCfg := &models.ScanAllPolicy{}
|
||||||
|
if err := utils.ConvertMapToStruct(policyCfg, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
policyNotification := ScanPolicyNotification{
|
||||||
|
Type: policyCfg.Type,
|
||||||
|
DailyTime: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
if t, yes := policyCfg.Parm["daily_time"]; yes {
|
||||||
|
if reflect.TypeOf(t).Kind() == reflect.Int {
|
||||||
|
policyNotification.DailyTime = (int64)(t.(int))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Publish(ScanAllPolicyTopic, policyNotification)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
57
src/common/notifier/config_watcher_test.go
Normal file
57
src/common/notifier/config_watcher_test.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package notifier
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var jsonText = `
|
||||||
|
{
|
||||||
|
"scan_all_policy": {
|
||||||
|
"type": "daily",
|
||||||
|
"parameter": {
|
||||||
|
"daily_time": <PLACE_HOLDER>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestWatchConfiguration(t *testing.T) {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
offset := (now.Hour()+1)*3600 + now.Minute()*60
|
||||||
|
jsonT := strings.Replace(jsonText, "<PLACE_HOLDER>", strconv.Itoa(offset), -1)
|
||||||
|
v := make(map[string]interface{})
|
||||||
|
if err := json.Unmarshal([]byte(jsonT), &v); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := WatchConfigChanges(v); err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "No handlers registered") {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var jsonText2 = `
|
||||||
|
{
|
||||||
|
"scan_all_policy": {
|
||||||
|
"type": "none"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func TestWatchConfiguration2(t *testing.T) {
|
||||||
|
v := make(map[string]interface{})
|
||||||
|
if err := json.Unmarshal([]byte(jsonText2), &v); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := WatchConfigChanges(v); err != nil {
|
||||||
|
if !strings.Contains(err.Error(), "No handlers registered") {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -183,7 +183,6 @@ func (nw *NotificationWatcher) Notify(notification Notification) error {
|
|||||||
//Trigger handlers
|
//Trigger handlers
|
||||||
for _, h := range handlers {
|
for _, h := range handlers {
|
||||||
var handlerChan chan bool
|
var handlerChan chan bool
|
||||||
|
|
||||||
if h.IsStateful() {
|
if h.IsStateful() {
|
||||||
t := reflect.TypeOf(h).String()
|
t := reflect.TypeOf(h).String()
|
||||||
handlerChan = nw.handlerChannels[t].channel
|
handlerChan = nw.handlerChannels[t].channel
|
||||||
|
@ -2,11 +2,14 @@ package notifier
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/vmware/harbor/src/common/scheduler"
|
||||||
)
|
)
|
||||||
|
|
||||||
var statefulData int
|
var statefulData int32
|
||||||
|
|
||||||
type fakeStatefulHandler struct {
|
type fakeStatefulHandler struct {
|
||||||
number int
|
number int
|
||||||
@ -21,7 +24,7 @@ func (fsh *fakeStatefulHandler) Handle(v interface{}) error {
|
|||||||
if v != nil && reflect.TypeOf(v).Kind() == reflect.Int {
|
if v != nil && reflect.TypeOf(v).Kind() == reflect.Int {
|
||||||
increment = v.(int)
|
increment = v.(int)
|
||||||
}
|
}
|
||||||
statefulData += increment
|
atomic.AddInt32(&statefulData, (int32)(increment))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,11 +129,12 @@ func TestPublish(t *testing.T) {
|
|||||||
//Waiting for async is done
|
//Waiting for async is done
|
||||||
<-time.After(1 * time.Second)
|
<-time.After(1 * time.Second)
|
||||||
|
|
||||||
if statefulData != 150 {
|
finalData := atomic.LoadInt32(&statefulData)
|
||||||
t.Fatalf("Expect execution result %d, but got %d", 150, statefulData)
|
if finalData != 150 {
|
||||||
|
t.Fatalf("Expect execution result %d, but got %d", 150, finalData)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = UnSubscribe("topic1", "*notifier.fakeStatefulHandler")
|
err = UnSubscribe("topic1", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -139,4 +143,81 @@ func TestPublish(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Clear stateful data.
|
||||||
|
atomic.StoreInt32(&statefulData, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrentPublish(t *testing.T) {
|
||||||
|
err := Subscribe("topic1", &fakeStatefulHandler{0})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(notificationWatcher.handlers) != 1 {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
//Publish in a short interval.
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
Publish("topic1", 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
//Waiting for async is done
|
||||||
|
<-time.After(1 * time.Second)
|
||||||
|
|
||||||
|
finalData := atomic.LoadInt32(&statefulData)
|
||||||
|
if finalData != 1000 {
|
||||||
|
t.Fatalf("Expect execution result %d, but got %d", 1000, finalData)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = UnSubscribe("topic1", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
//Clear stateful data.
|
||||||
|
atomic.StoreInt32(&statefulData, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConcurrentPublishWithScanPolicyHandler(t *testing.T) {
|
||||||
|
scheduler.DefaultScheduler.Start()
|
||||||
|
if !scheduler.DefaultScheduler.IsRunning() {
|
||||||
|
t.Fatal("Policy scheduler is not started")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := Subscribe("testing_topic", &ScanPolicyNotificationHandler{}); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
if len(notificationWatcher.handlers) != 1 {
|
||||||
|
t.Fatal("Handler is not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
utcTime := time.Now().UTC().Unix()
|
||||||
|
notification := ScanPolicyNotification{"daily", utcTime + 3600}
|
||||||
|
for i := 1; i <= 10; i++ {
|
||||||
|
notification.DailyTime += (int64)(i)
|
||||||
|
if err := Publish("testing_topic", notification); err != nil {
|
||||||
|
t.Fatalf("index=%d, error=%s", i, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Wating for everything is ready.
|
||||||
|
<-time.After(2 * time.Second)
|
||||||
|
|
||||||
|
if err := UnSubscribe("testing_topic", ""); err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(notificationWatcher.handlers) != 0 {
|
||||||
|
t.Fatal("Handler is not unregistered")
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduler.DefaultScheduler.Stop()
|
||||||
|
//Wating for everything is ready.
|
||||||
|
<-time.After(1 * time.Second)
|
||||||
|
if scheduler.DefaultScheduler.IsRunning() {
|
||||||
|
t.Fatal("Policy scheduler is not stopped")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/vmware/harbor/src/common/scheduler"
|
"github.com/vmware/harbor/src/common/scheduler"
|
||||||
@ -15,6 +16,9 @@ const (
|
|||||||
//PolicyTypeDaily specify the policy type is "daily"
|
//PolicyTypeDaily specify the policy type is "daily"
|
||||||
PolicyTypeDaily = "daily"
|
PolicyTypeDaily = "daily"
|
||||||
|
|
||||||
|
//PolicyTypeNone specify the policy type is "none"
|
||||||
|
PolicyTypeNone = "none"
|
||||||
|
|
||||||
alternatePolicy = "Alternate Policy"
|
alternatePolicy = "Alternate Policy"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -53,6 +57,44 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
|
|||||||
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
|
hasScheduled := scheduler.DefaultScheduler.HasScheduled(alternatePolicy)
|
||||||
if notification.Type == PolicyTypeDaily {
|
if notification.Type == PolicyTypeDaily {
|
||||||
if !hasScheduled {
|
if !hasScheduled {
|
||||||
|
//Schedule a new policy.
|
||||||
|
return schedulePolicy(notification)
|
||||||
|
}
|
||||||
|
|
||||||
|
//To check and compare if the related parameter is changed.
|
||||||
|
if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil {
|
||||||
|
policyCandidate := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
|
||||||
|
Duration: 24 * time.Hour,
|
||||||
|
OffsetTime: notification.DailyTime,
|
||||||
|
})
|
||||||
|
if !pl.Equal(policyCandidate) {
|
||||||
|
//Parameter changed.
|
||||||
|
//Unschedule policy.
|
||||||
|
if err := scheduler.DefaultScheduler.UnSchedule(alternatePolicy); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//Schedule a new policy.
|
||||||
|
return schedulePolicy(notification)
|
||||||
|
}
|
||||||
|
//Same policy configuration, do nothing
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.New("Inconsistent policy scheduling status")
|
||||||
|
} else if notification.Type == PolicyTypeNone {
|
||||||
|
if hasScheduled {
|
||||||
|
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("Notification type %s is not supported", notification.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//Schedule policy.
|
||||||
|
func schedulePolicy(notification ScanPolicyNotification) error {
|
||||||
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
|
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
|
||||||
Duration: 24 * time.Hour,
|
Duration: 24 * time.Hour,
|
||||||
OffsetTime: notification.DailyTime,
|
OffsetTime: notification.DailyTime,
|
||||||
@ -62,11 +104,3 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
|
|||||||
|
|
||||||
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
|
return scheduler.DefaultScheduler.Schedule(schedulePolicy)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if hasScheduled {
|
|
||||||
return scheduler.DefaultScheduler.UnSchedule(alternatePolicy)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/vmware/harbor/src/common/scheduler"
|
"github.com/vmware/harbor/src/common/scheduler"
|
||||||
|
"github.com/vmware/harbor/src/common/scheduler/policy"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testingScheduler = scheduler.DefaultScheduler
|
var testingScheduler = scheduler.DefaultScheduler
|
||||||
@ -27,19 +28,37 @@ func TestScanPolicyNotificationHandler(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Waiting for everything is ready.
|
|
||||||
<-time.After(1 * time.Second)
|
|
||||||
if !testingScheduler.HasScheduled("Alternate Policy") {
|
if !testingScheduler.HasScheduled("Alternate Policy") {
|
||||||
t.Fatal("Handler does not work")
|
t.Fatal("Handler does not work")
|
||||||
}
|
}
|
||||||
|
|
||||||
notification2 := ScanPolicyNotification{"none", 0}
|
//Policy parameter changed.
|
||||||
|
notification2 := ScanPolicyNotification{"daily", utcTime + 7200}
|
||||||
if err := handler.Handle(notification2); err != nil {
|
if err := handler.Handle(notification2); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//Waiting for everything is ready.
|
if !testingScheduler.HasScheduled("Alternate Policy") {
|
||||||
<-time.After(1 * time.Second)
|
t.Fatal("Handler does not work [2]")
|
||||||
|
}
|
||||||
|
pl := testingScheduler.GetPolicy("Alternate Policy")
|
||||||
|
if pl == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
spl := pl.(*policy.AlternatePolicy)
|
||||||
|
cfg := spl.GetConfig()
|
||||||
|
if cfg == nil {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if cfg.OffsetTime != utcTime+7200 {
|
||||||
|
t.Fatal("Policy is not updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
notification3 := ScanPolicyNotification{"none", 0}
|
||||||
|
if err := handler.Handle(notification3); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
if testingScheduler.HasScheduled("Alternate Policy") {
|
if testingScheduler.HasScheduled("Alternate Policy") {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,7 @@ func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
|
|||||||
tasks: []task.Task{},
|
tasks: []task.Task{},
|
||||||
config: config,
|
config: config,
|
||||||
isEnabled: false,
|
isEnabled: false,
|
||||||
|
terminator: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,7 +109,6 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
alp.done = make(chan bool)
|
alp.done = make(chan bool)
|
||||||
alp.terminator = make(chan bool)
|
|
||||||
alp.evaluation = make(chan bool)
|
alp.evaluation = make(chan bool)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -152,3 +152,23 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
|
|||||||
|
|
||||||
return alp.evaluation, nil
|
return alp.evaluation, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Equal is an implementation of same method in policy interface.
|
||||||
|
func (alp *AlternatePolicy) Equal(p Policy) bool {
|
||||||
|
if p == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
pl, ok := p.(*AlternatePolicy)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := pl.GetConfig()
|
||||||
|
cfg2 := alp.GetConfig()
|
||||||
|
if (cfg == nil && cfg2 != nil) || (cfg != nil && cfg2 == nil) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime)
|
||||||
|
}
|
||||||
|
@ -34,6 +34,11 @@ type Policy interface {
|
|||||||
Evaluate() (<-chan bool, error)
|
Evaluate() (<-chan bool, error)
|
||||||
|
|
||||||
//Disable the enabled policy and release all the allocated resources.
|
//Disable the enabled policy and release all the allocated resources.
|
||||||
//Disable should also send signal to the terminated channel which returned by Done.
|
|
||||||
Disable() error
|
Disable() error
|
||||||
|
|
||||||
|
//Equal will compare the two policies based on related factors if existing such as confgiuration etc.
|
||||||
|
//to determine whether the two policies are same ones or not. Please pay attention that, not every policy
|
||||||
|
//needs to support this method. If no need, please directly return false to indicate each policies are
|
||||||
|
//different.
|
||||||
|
Equal(p Policy) bool
|
||||||
}
|
}
|
||||||
|
@ -61,9 +61,6 @@ type Scheduler struct {
|
|||||||
//Store to keep the references of scheduled policies.
|
//Store to keep the references of scheduled policies.
|
||||||
policies Store
|
policies Store
|
||||||
|
|
||||||
//Queue for accepting the scheduling polices.
|
|
||||||
scheduleQueue chan policy.Policy
|
|
||||||
|
|
||||||
//Queue for receiving policy unschedule request or complete signal.
|
//Queue for receiving policy unschedule request or complete signal.
|
||||||
unscheduleQueue chan string
|
unscheduleQueue chan string
|
||||||
|
|
||||||
@ -90,7 +87,6 @@ func NewScheduler(config *Configuration) *Scheduler {
|
|||||||
qSize = config.QueueSize
|
qSize = config.QueueSize
|
||||||
}
|
}
|
||||||
|
|
||||||
sq := make(chan policy.Policy, qSize)
|
|
||||||
usq := make(chan string, qSize)
|
usq := make(chan string, qSize)
|
||||||
stChan := make(chan *StatItem, 4)
|
stChan := make(chan *StatItem, 4)
|
||||||
tc := make(chan bool, 1)
|
tc := make(chan bool, 1)
|
||||||
@ -99,7 +95,6 @@ func NewScheduler(config *Configuration) *Scheduler {
|
|||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
config: config,
|
config: config,
|
||||||
policies: store,
|
policies: store,
|
||||||
scheduleQueue: sq,
|
|
||||||
unscheduleQueue: usq,
|
unscheduleQueue: usq,
|
||||||
statChan: stChan,
|
statChan: stChan,
|
||||||
terminateChan: tc,
|
terminateChan: tc,
|
||||||
@ -125,33 +120,26 @@ func (sch *Scheduler) Start() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
//Exit and clear.
|
||||||
sch.isRunning = false
|
sch.isRunning = false
|
||||||
|
//Stop all watchers.
|
||||||
|
for _, wt := range sch.policies.GetAll() {
|
||||||
|
wt.Stop()
|
||||||
|
}
|
||||||
|
//Clear resources
|
||||||
|
sch.policies.Clear()
|
||||||
|
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sch.terminateChan:
|
case <-sch.terminateChan:
|
||||||
//Exit
|
//Exit
|
||||||
return
|
return
|
||||||
case p := <-sch.scheduleQueue:
|
|
||||||
//Schedule the policy.
|
|
||||||
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
|
|
||||||
|
|
||||||
//Keep the policy for future use after it's successfully scheduled.
|
|
||||||
sch.policies.Put(p.Name(), watcher)
|
|
||||||
|
|
||||||
//Enable it.
|
|
||||||
watcher.Start()
|
|
||||||
|
|
||||||
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
|
|
||||||
case name := <-sch.unscheduleQueue:
|
case name := <-sch.unscheduleQueue:
|
||||||
//Find the watcher.
|
//Unscheduled when policy is completed.
|
||||||
watcher := sch.policies.Remove(name)
|
if err := sch.UnSchedule(name); err != nil {
|
||||||
if watcher != nil && watcher.IsRunning() {
|
log.Error(err.Error())
|
||||||
watcher.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
|
|
||||||
|
|
||||||
case stat := <-sch.statChan:
|
case stat := <-sch.statChan:
|
||||||
{
|
{
|
||||||
switch stat.Type {
|
switch stat.Type {
|
||||||
@ -199,18 +187,8 @@ func (sch *Scheduler) Stop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//Terminate damon firstly to stop receiving signals.
|
//Terminate damon to stop receiving signals.
|
||||||
sch.terminateChan <- true
|
sch.terminateChan <- true
|
||||||
|
|
||||||
//Stop all watchers.
|
|
||||||
for _, wt := range sch.policies.GetAll() {
|
|
||||||
wt.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
//Clear resources
|
|
||||||
sch.policies.Clear()
|
|
||||||
|
|
||||||
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//Schedule and enable the policy.
|
//Schedule and enable the policy.
|
||||||
@ -233,7 +211,16 @@ func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Schedule the policy.
|
//Schedule the policy.
|
||||||
sch.scheduleQueue <- scheduledPolicy
|
watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue)
|
||||||
|
//Enable it.
|
||||||
|
watcher.Start()
|
||||||
|
|
||||||
|
//Keep the policy for future use after it's successfully scheduled.
|
||||||
|
sch.policies.Put(scheduledPolicy.Name(), watcher)
|
||||||
|
|
||||||
|
//Update stats and log info.
|
||||||
|
log.Infof("Policy %s is scheduled", scheduledPolicy.Name())
|
||||||
|
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -249,7 +236,17 @@ func (sch *Scheduler) UnSchedule(policyName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Unschedule the policy.
|
//Unschedule the policy.
|
||||||
sch.unscheduleQueue <- policyName
|
//Find the watcher.
|
||||||
|
watcher := sch.policies.Remove(policyName)
|
||||||
|
if watcher != nil && watcher.IsRunning() {
|
||||||
|
watcher.Stop()
|
||||||
|
|
||||||
|
//Update stats and log info.
|
||||||
|
log.Infof("Policy %s is unscheduled", policyName)
|
||||||
|
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
|
||||||
|
} else {
|
||||||
|
log.Warningf("Inconsistent worker status for policy '%s'.\n", policyName)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -263,3 +260,13 @@ func (sch *Scheduler) IsRunning() bool {
|
|||||||
func (sch *Scheduler) HasScheduled(policyName string) bool {
|
func (sch *Scheduler) HasScheduled(policyName string) bool {
|
||||||
return sch.policies.Exists(policyName)
|
return sch.policies.Exists(policyName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//GetPolicy is used to get related policy reference by its name.
|
||||||
|
func (sch *Scheduler) GetPolicy(policyName string) policy.Policy {
|
||||||
|
wk := sch.policies.Get(policyName)
|
||||||
|
if wk != nil {
|
||||||
|
return wk.p
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/vmware/harbor/src/common/scheduler/policy"
|
||||||
"github.com/vmware/harbor/src/common/scheduler/task"
|
"github.com/vmware/harbor/src/common/scheduler/task"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -61,6 +62,10 @@ func (fp *fakePolicy) Disable() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fp *fakePolicy) Equal(policy.Policy) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type fakeTask struct {
|
type fakeTask struct {
|
||||||
number int
|
number int
|
||||||
}
|
}
|
||||||
@ -98,13 +103,11 @@ func TestScheduler(t *testing.T) {
|
|||||||
if DefaultScheduler.Schedule(fp) != nil {
|
if DefaultScheduler.Schedule(fp) != nil {
|
||||||
t.Fatal("Schedule policy failed")
|
t.Fatal("Schedule policy failed")
|
||||||
}
|
}
|
||||||
//Waiting for everything is stable
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
if DefaultScheduler.policies.Size() == 0 {
|
if DefaultScheduler.policies.Size() == 0 {
|
||||||
t.Fatal("No policy in the store after calling Schedule()")
|
t.Fatal("No policy in the store after calling Schedule()")
|
||||||
}
|
}
|
||||||
if DefaultScheduler.stats.PolicyCount != 1 {
|
if DefaultScheduler.GetPolicy(fp.Name()) == nil {
|
||||||
t.Fatal("Policy stats do not match")
|
t.Fatal("Failed to get poicy by name")
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
@ -121,12 +124,11 @@ func TestScheduler(t *testing.T) {
|
|||||||
if DefaultScheduler.UnSchedule(fp.Name()) != nil {
|
if DefaultScheduler.UnSchedule(fp.Name()) != nil {
|
||||||
t.Fatal("Unschedule policy failed")
|
t.Fatal("Unschedule policy failed")
|
||||||
}
|
}
|
||||||
//Waiting for everything is stable
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
if DefaultScheduler.stats.PolicyCount != 0 {
|
if DefaultScheduler.policies.Size() != 0 {
|
||||||
t.Fatal("Policy count does not match after calling UnSchedule()")
|
t.Fatal("Policy count does not match after calling UnSchedule()")
|
||||||
}
|
}
|
||||||
|
<-time.After(1 * time.Second)
|
||||||
copiedValue := DefaultScheduler.stats.CompletedTasks
|
copiedValue := DefaultScheduler.stats.CompletedTasks
|
||||||
<-time.After(2 * time.Second)
|
<-time.After(2 * time.Second)
|
||||||
|
|
||||||
|
@ -56,6 +56,7 @@ func (wc *Watcher) Start() {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
wc.isRunning = false
|
wc.isRunning = false
|
||||||
|
log.Infof("Work for policy %s is stopped.\n", wc.p.Name())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
evalChan, err := pl.Evaluate()
|
evalChan, err := pl.Evaluate()
|
||||||
|
@ -16,6 +16,7 @@ package utils
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@ -129,45 +130,21 @@ func ParseTimeStamp(timestamp string) (*time.Time, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//ConvertMapToStruct is used to fill the specified struct with map.
|
//ConvertMapToStruct is used to fill the specified struct with map.
|
||||||
func ConvertMapToStruct(object interface{}, valuesInMap map[string]interface{}) error {
|
func ConvertMapToStruct(object interface{}, values interface{}) error {
|
||||||
if object == nil {
|
if object == nil {
|
||||||
return fmt.Errorf("nil struct is not supported")
|
return errors.New("nil struct is not supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
if reflect.TypeOf(object).Kind() != reflect.Ptr {
|
if reflect.TypeOf(object).Kind() != reflect.Ptr {
|
||||||
return fmt.Errorf("object should be referred by pointer")
|
return errors.New("object should be referred by pointer")
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range valuesInMap {
|
bytes, err := json.Marshal(values)
|
||||||
if err := setField(object, k, v); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return json.Unmarshal(bytes, object)
|
||||||
}
|
|
||||||
|
|
||||||
func setField(object interface{}, field string, value interface{}) error {
|
|
||||||
structValue := reflect.ValueOf(object).Elem()
|
|
||||||
|
|
||||||
structFieldValue := structValue.FieldByName(field)
|
|
||||||
if !structFieldValue.IsValid() {
|
|
||||||
return fmt.Errorf("No such field: %s in obj", field)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !structFieldValue.CanSet() {
|
|
||||||
return fmt.Errorf("Cannot set value for field %s", field)
|
|
||||||
}
|
|
||||||
|
|
||||||
structFieldType := structFieldValue.Type()
|
|
||||||
val := reflect.ValueOf(value)
|
|
||||||
if structFieldType != val.Type() {
|
|
||||||
return errors.New("Provided value type didn't match object field type")
|
|
||||||
}
|
|
||||||
|
|
||||||
structFieldValue.Set(val)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseProjectIDOrName parses value to ID(int64) or name(string)
|
// ParseProjectIDOrName parses value to ID(int64) or name(string)
|
||||||
|
@ -192,7 +192,9 @@ func (c *ConfigAPI) Put() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Everything is ok, detect the configurations to confirm if the option we are caring is changed.
|
//Everything is ok, detect the configurations to confirm if the option we are caring is changed.
|
||||||
watchConfigChanges(cfg)
|
if err := watchConfigChanges(cfg); err != nil {
|
||||||
|
log.Errorf("Failed to watch configuration change with error: %s\n", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset system configurations
|
// Reset system configurations
|
||||||
|
@ -17,11 +17,9 @@ package api
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -501,33 +499,7 @@ func transformVulnerabilities(layerWithVuln *models.ClairLayerEnvelope) []*model
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Watch the configuration changes.
|
//Watch the configuration changes.
|
||||||
|
//Wrap the same method in common utils.
|
||||||
func watchConfigChanges(cfg map[string]interface{}) error {
|
func watchConfigChanges(cfg map[string]interface{}) error {
|
||||||
if cfg == nil {
|
return notifier.WatchConfigChanges(cfg)
|
||||||
return errors.New("Empty configurations")
|
|
||||||
}
|
|
||||||
|
|
||||||
//Currently only watch the scan all policy change.
|
|
||||||
if v, ok := cfg[notifier.ScanAllPolicyTopic]; ok {
|
|
||||||
if reflect.TypeOf(v).Kind() == reflect.Map {
|
|
||||||
policyCfg := &models.ScanAllPolicy{}
|
|
||||||
if err := utils.ConvertMapToStruct(policyCfg, v.(map[string]interface{})); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
policyNotification := notifier.ScanPolicyNotification{
|
|
||||||
Type: policyCfg.Type,
|
|
||||||
DailyTime: 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
if t, yes := policyCfg.Parm["daily_time"]; yes {
|
|
||||||
if reflect.TypeOf(t).Kind() == reflect.Int {
|
|
||||||
policyNotification.DailyTime = (int64)(t.(int))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return notifier.Publish(notifier.ScanAllPolicyTopic, policyNotification)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,9 @@ func initRouters() {
|
|||||||
beego.SetStaticPath("/static", "./static")
|
beego.SetStaticPath("/static", "./static")
|
||||||
beego.SetStaticPath("/i18n", "./static/i18n")
|
beego.SetStaticPath("/i18n", "./static/i18n")
|
||||||
|
|
||||||
|
// standalone
|
||||||
|
if !config.WithAdmiral() {
|
||||||
|
//Disable page access in integration mode.
|
||||||
//Page Controllers:
|
//Page Controllers:
|
||||||
beego.Router("/", &controllers.IndexController{})
|
beego.Router("/", &controllers.IndexController{})
|
||||||
beego.Router("/sign-in", &controllers.IndexController{})
|
beego.Router("/sign-in", &controllers.IndexController{})
|
||||||
@ -56,8 +59,6 @@ func initRouters() {
|
|||||||
beego.Router("/harbor/tags", &controllers.IndexController{})
|
beego.Router("/harbor/tags", &controllers.IndexController{})
|
||||||
beego.Router("/harbor/configs", &controllers.IndexController{})
|
beego.Router("/harbor/configs", &controllers.IndexController{})
|
||||||
|
|
||||||
// standalone
|
|
||||||
if !config.WithAdmiral() {
|
|
||||||
beego.Router("/login", &controllers.CommonController{}, "post:Login")
|
beego.Router("/login", &controllers.CommonController{}, "post:Login")
|
||||||
beego.Router("/log_out", &controllers.CommonController{}, "get:LogOut")
|
beego.Router("/log_out", &controllers.CommonController{}, "get:LogOut")
|
||||||
beego.Router("/reset", &controllers.CommonController{}, "post:ResetPassword")
|
beego.Router("/reset", &controllers.CommonController{}, "post:ResetPassword")
|
||||||
|
Loading…
Reference in New Issue
Block a user