diff --git a/src/common/notifier/notifier.go b/src/common/notifier/notifier.go index d68b9a997..d88690b8d 100644 --- a/src/common/notifier/notifier.go +++ b/src/common/notifier/notifier.go @@ -164,8 +164,8 @@ func (nw *NotificationWatcher) Notify(notification Notification) error { return errors.New("Empty topic can not be notified") } - nw.RLock() defer nw.RUnlock() + nw.RLock() var ( indexer HandlerIndexer diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go index d57a103b4..e37b466f4 100644 --- a/src/common/scheduler/policy/alternate_policy.go +++ b/src/common/scheduler/policy/alternate_policy.go @@ -2,6 +2,8 @@ package policy import ( "errors" + "fmt" + "sync" "time" "github.com/vmware/harbor/src/common/scheduler/task" @@ -20,16 +22,16 @@ type AlternatePolicyConfiguration struct { //AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope. type AlternatePolicy struct { + //To sync the related operations. + *sync.RWMutex + //Keep the attached tasks. - tasks []task.Task + tasks task.Store //Policy configurations. config *AlternatePolicyConfiguration - //Generate time ticks with specified duration. - ticker *time.Ticker - - //To indicated whether policy is completed. + //To indicated whether policy is enabled or not. isEnabled bool //Channel used to send evaluation result signals. @@ -45,7 +47,8 @@ type AlternatePolicy struct { //NewAlternatePolicy is constructor of creating AlternatePolicy. func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy { return &AlternatePolicy{ - tasks: []task.Task{}, + RWMutex: new(sync.RWMutex), + tasks: task.NewDefaultStore(), config: config, isEnabled: false, terminator: make(chan bool), @@ -64,12 +67,7 @@ func (alp *AlternatePolicy) Name() string { //Tasks is an implementation of same method in policy interface. func (alp *AlternatePolicy) Tasks() []task.Task { - copyList := []task.Task{} - if alp.tasks != nil && len(alp.tasks) > 0 { - copyList = append(copyList, alp.tasks...) - } - - return copyList + return alp.tasks.GetTasks() } //Done is an implementation of same method in policy interface. @@ -83,29 +81,42 @@ func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error { return errors.New("No tasks can be attached") } - alp.tasks = append(alp.tasks, tasks...) + alp.tasks.AddTasks(tasks...) return nil } //Disable is an implementation of same method in policy interface. func (alp *AlternatePolicy) Disable() error { - //Stop the ticker - if alp.ticker != nil { - alp.ticker.Stop() + alp.Lock() + if !alp.isEnabled { + alp.Unlock() + return fmt.Errorf("Instance of policy %s is not enabled", alp.Name()) } + //Set state to disabled + alp.isEnabled = false + alp.Unlock() + //Stop the evaluation goroutine alp.terminator <- true - alp.ticker = nil return nil } //Evaluate is an implementation of same method in policy interface. func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { + //Lock for state changing + defer alp.Unlock() + alp.Lock() + + //Check if policy instance is still running + if alp.isEnabled { + return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name()) + } + //Keep idempotent - if alp.isEnabled && alp.evaluation != nil { + if alp.evaluation != nil { return alp.evaluation, nil } @@ -113,9 +124,6 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { alp.evaluation = make(chan bool) go func() { - defer func() { - alp.isEnabled = false - }() timeNow := time.Now().UTC() //Reach the execution time point? @@ -138,11 +146,19 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { alp.evaluation <- true //Start the ticker for repeat checking. - alp.ticker = time.NewTicker(alp.config.Duration) + tk := time.NewTicker(alp.config.Duration) + defer func() { + if tk != nil { + tk.Stop() + } + }() + for { select { - case <-alp.ticker.C: - alp.evaluation <- true + case <-tk.C: + if alp.IsEnabled() { + alp.evaluation <- true + } case <-alp.terminator: return } @@ -174,3 +190,11 @@ func (alp *AlternatePolicy) Equal(p Policy) bool { return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime) } + +//IsEnabled is an implementation of same method in policy interface. +func (alp *AlternatePolicy) IsEnabled() bool { + defer alp.RUnlock() + alp.RLock() + + return alp.isEnabled +} diff --git a/src/common/scheduler/policy/alternate_policy_test.go b/src/common/scheduler/policy/alternate_policy_test.go index 5340b04eb..777fe59de 100644 --- a/src/common/scheduler/policy/alternate_policy_test.go +++ b/src/common/scheduler/policy/alternate_policy_test.go @@ -1,12 +1,13 @@ package policy import ( + "sync/atomic" "testing" "time" ) type fakeTask struct { - number int + number int32 } func (ft *fakeTask) Name() string { @@ -14,10 +15,14 @@ func (ft *fakeTask) Name() string { } func (ft *fakeTask) Run() error { - ft.number++ + atomic.AddInt32(&(ft.number), 1) return nil } +func (ft *fakeTask) Number() int32 { + return atomic.LoadInt32(&ft.number) +} + func TestBasic(t *testing.T) { tp := NewAlternatePolicy(&AlternatePolicyConfiguration{}) err := tp.AttachTasks(&fakeTask{number: 100}) @@ -52,18 +57,18 @@ func TestEvaluatePolicy(t *testing.T) { t.Fail() } ch, _ := tp.Evaluate() - counter := 0 + var counter int32 for i := 0; i < 3; i++ { select { case <-ch: - counter++ + atomic.AddInt32(&counter, 1) case <-time.After(2 * time.Second): continue } } - if counter != 3 { + if atomic.LoadInt32(&counter) != 3 { t.Fail() } @@ -82,7 +87,7 @@ func TestDisablePolicy(t *testing.T) { t.Fail() } ch, _ := tp.Evaluate() - counter := 0 + var counter int32 terminate := make(chan bool) defer func() { terminate <- true @@ -91,7 +96,7 @@ func TestDisablePolicy(t *testing.T) { for { select { case <-ch: - counter++ + atomic.AddInt32(&counter, 1) case <-terminate: return case <-time.After(6 * time.Second): @@ -106,9 +111,10 @@ func TestDisablePolicy(t *testing.T) { //Waiting for everything is stable <-time.After(1 * time.Second) //Copy value - copiedCounter := counter + var copiedCounter int32 + atomic.StoreInt32(&copiedCounter, atomic.LoadInt32(&counter)) time.Sleep(2 * time.Second) - if counter != copiedCounter { - t.Fatalf("Policy is still running after calling Disable() %d=%d", copiedCounter, counter) + if atomic.LoadInt32(&counter) != atomic.LoadInt32(&copiedCounter) { + t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter)) } } diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go index 2f50be368..732d56cca 100644 --- a/src/common/scheduler/policy/policy.go +++ b/src/common/scheduler/policy/policy.go @@ -41,4 +41,7 @@ type Policy interface { //needs to support this method. If no need, please directly return false to indicate each policies are //different. Equal(p Policy) bool + + //IsEnabled is to indicate whether the policy is enabled or not (disabled). + IsEnabled() bool } diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go index db2f15c5c..ff58cd750 100644 --- a/src/common/scheduler/scheduler.go +++ b/src/common/scheduler/scheduler.go @@ -8,6 +8,7 @@ import ( "fmt" "reflect" "strings" + "sync" "time" ) @@ -55,14 +56,20 @@ type Configuration struct { //Scheduler is designed for scheduling policies. type Scheduler struct { + //Mutex for sync controling. + *sync.RWMutex + //Related configuration options for scheduler. config *Configuration //Store to keep the references of scheduled policies. policies Store - //Queue for receiving policy unschedule request or complete signal. - unscheduleQueue chan string + //Queue for receiving policy scheduling request + scheduleQueue chan *Watcher + + //Queue for receiving policy unscheduling request or complete signal. + unscheduleQueue chan *Watcher //Channel for receiving stat metrics. statChan chan *StatItem @@ -87,14 +94,17 @@ func NewScheduler(config *Configuration) *Scheduler { qSize = config.QueueSize } - usq := make(chan string, qSize) + sq := make(chan *Watcher, qSize) + usq := make(chan *Watcher, qSize) stChan := make(chan *StatItem, 4) tc := make(chan bool, 1) - store := NewConcurrentStore() + store := NewDefaultStore() return &Scheduler{ + RWMutex: new(sync.RWMutex), config: config, policies: store, + scheduleQueue: sq, unscheduleQueue: usq, statChan: stChan, terminateChan: tc, @@ -110,9 +120,14 @@ func NewScheduler(config *Configuration) *Scheduler { //Start the scheduler damon. func (sch *Scheduler) Start() { + sch.Lock() + defer sch.Unlock() + + //If scheduler is already running if sch.isRunning { return } + go func() { defer func() { if r := recover(); r != nil { @@ -120,12 +135,6 @@ func (sch *Scheduler) Start() { } }() defer func() { - //Exit and clear. - sch.isRunning = false - //Stop all watchers. - for _, wt := range sch.policies.GetAll() { - wt.Stop() - } //Clear resources sch.policies.Clear() log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339)) @@ -135,13 +144,41 @@ func (sch *Scheduler) Start() { case <-sch.terminateChan: //Exit return - case name := <-sch.unscheduleQueue: - //Unscheduled when policy is completed. - if err := sch.UnSchedule(name); err != nil { - log.Error(err.Error()) + case wt := <-sch.scheduleQueue: + //If status is stopped, no requests should be served + if !sch.IsRunning() { + continue } + go func(watcher *Watcher) { + if watcher != nil && watcher.p != nil { + //Enable it. + watcher.Start() + + //Update stats and log info. + log.Infof("Policy %s is scheduled", watcher.p.Name()) + sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} + } + }(wt) + case wt := <-sch.unscheduleQueue: + //If status is stopped, no requests should be served + if !sch.IsRunning() { + continue + } + go func(watcher *Watcher) { + if watcher != nil && watcher.IsRunning() { + watcher.Stop() + + //Update stats and log info. + log.Infof("Policy %s is unscheduled", watcher.p.Name()) + sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} + } + }(wt) case stat := <-sch.statChan: { + //If status is stopped, no requests should be served + if !sch.IsRunning() { + continue + } switch stat.Type { case statSchedulePolicy: sch.stats.PolicyCount += stat.Value @@ -183,10 +220,18 @@ func (sch *Scheduler) Start() { //Stop the scheduler damon. func (sch *Scheduler) Stop() { + //Lock for state changing + sch.Lock() + + //Check if the scheduler is running if !sch.isRunning { + sch.Unlock() return } + sch.isRunning = false + sch.Unlock() + //Terminate damon to stop receiving signals. sch.terminateChan <- true } @@ -206,21 +251,15 @@ func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error { return errors.New("Policy must attach task(s)") } - if sch.policies.Exists(scheduledPolicy.Name()) { - return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name()) + //Try to schedule the policy. + //Keep the policy for future use after it's successfully scheduled. + watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue) + if err := sch.policies.Put(scheduledPolicy.Name(), watcher); err != nil { + return err } - //Schedule the policy. - watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue) - //Enable it. - watcher.Start() - - //Keep the policy for future use after it's successfully scheduled. - sch.policies.Put(scheduledPolicy.Name(), watcher) - - //Update stats and log info. - log.Infof("Policy %s is scheduled", scheduledPolicy.Name()) - sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} + //Schedule the policy + sch.scheduleQueue <- watcher return nil } @@ -231,28 +270,23 @@ func (sch *Scheduler) UnSchedule(policyName string) error { return errors.New("Empty policy name is invalid") } - if !sch.policies.Exists(policyName) { + //Find the watcher. + watcher := sch.policies.Remove(policyName) + if watcher == nil { return fmt.Errorf("Policy %s is not existing", policyName) } //Unschedule the policy. - //Find the watcher. - watcher := sch.policies.Remove(policyName) - if watcher != nil && watcher.IsRunning() { - watcher.Stop() - - //Update stats and log info. - log.Infof("Policy %s is unscheduled", policyName) - sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} - } else { - log.Warningf("Inconsistent worker status for policy '%s'.\n", policyName) - } + sch.unscheduleQueue <- watcher return nil } //IsRunning to indicate whether the scheduler is running. func (sch *Scheduler) IsRunning() bool { + sch.RLock() + defer sch.RUnlock() + return sch.isRunning } @@ -270,3 +304,8 @@ func (sch *Scheduler) GetPolicy(policyName string) policy.Policy { return nil } + +//PolicyCount returns the count of currently scheduled policies in the scheduler. +func (sch *Scheduler) PolicyCount() uint32 { + return sch.policies.Size() +} diff --git a/src/common/scheduler/scheduler_store.go b/src/common/scheduler/scheduler_store.go index 9a94d3b70..dac97cc9d 100644 --- a/src/common/scheduler/scheduler_store.go +++ b/src/common/scheduler/scheduler_store.go @@ -1,16 +1,16 @@ package scheduler import ( + "errors" + "fmt" "strings" "sync" ) //Store define the basic operations for storing and managing policy watcher. -//The concrete implementation should consider concurrent supporting scenario. -// type Store interface { //Put a new policy in. - Put(key string, value *Watcher) + Put(key string, value *Watcher) error //Get the corresponding policy with the key. Get(key string) *Watcher @@ -31,70 +31,74 @@ type Store interface { Clear() } -//ConcurrentStore implements Store interface and supports concurrent operations. -type ConcurrentStore struct { - //Read-write mutex to synchronize the data map. +//DefaultStore implements Store interface to keep the scheduled policies. +//Not support concurrent sync. +type DefaultStore struct { + //Support sync locking *sync.RWMutex //Map used to keep the policy list. data map[string]*Watcher } -//NewConcurrentStore is used to create a new store and return the pointer reference. -func NewConcurrentStore() *ConcurrentStore { - mutex := new(sync.RWMutex) - data := make(map[string]*Watcher) - - return &ConcurrentStore{mutex, data} +//NewDefaultStore is used to create a new store and return the pointer reference. +func NewDefaultStore() *DefaultStore { + return &DefaultStore{new(sync.RWMutex), make(map[string]*Watcher)} } //Put a policy into store. -func (cs *ConcurrentStore) Put(key string, value *Watcher) { +func (cs *DefaultStore) Put(key string, value *Watcher) error { if strings.TrimSpace(key) == "" || value == nil { - return + return errors.New("Bad arguments") } + cs.Lock() defer cs.Unlock() - cs.Lock() + if _, ok := cs.data[key]; ok { + return fmt.Errorf("Duplicayed policy with name %s", key) + } + cs.data[key] = value + + return nil } //Get policy via key. -func (cs *ConcurrentStore) Get(key string) *Watcher { +func (cs *DefaultStore) Get(key string) *Watcher { if strings.TrimSpace(key) == "" { return nil } + cs.RLock() defer cs.RUnlock() - cs.RLock() return cs.data[key] } //Exists is used to check whether or not the key exists in store. -func (cs *ConcurrentStore) Exists(key string) bool { +func (cs *DefaultStore) Exists(key string) bool { if strings.TrimSpace(key) == "" { return false } + cs.RLock() defer cs.RUnlock() - cs.RLock() _, ok := cs.data[key] return ok } //Remove is to delete the specified policy. -func (cs *ConcurrentStore) Remove(key string) *Watcher { - if !cs.Exists(key) { +func (cs *DefaultStore) Remove(key string) *Watcher { + if strings.TrimSpace(key) == "" { return nil } + cs.Lock() defer cs.Unlock() - cs.Lock() if wt, ok := cs.data[key]; ok { delete(cs.data, key) return wt @@ -104,16 +108,20 @@ func (cs *ConcurrentStore) Remove(key string) *Watcher { } //Size return the total count of items in store. -func (cs *ConcurrentStore) Size() uint32 { +func (cs *DefaultStore) Size() uint32 { + cs.RLock() + defer cs.RUnlock() + return (uint32)(len(cs.data)) } //GetAll to get all the items of store. -func (cs *ConcurrentStore) GetAll() []*Watcher { +func (cs *DefaultStore) GetAll() []*Watcher { + cs.RLock() + defer cs.RUnlock() + all := []*Watcher{} - defer cs.RUnlock() - cs.RLock() for _, v := range cs.data { all = append(all, v) } @@ -122,15 +130,16 @@ func (cs *ConcurrentStore) GetAll() []*Watcher { } //Clear all the items in store. -func (cs *ConcurrentStore) Clear() { - if cs.Size() == 0 { +func (cs *DefaultStore) Clear() { + cs.Lock() + defer cs.Unlock() + + if (uint32)(len(cs.data)) == 0 { return } - defer cs.Unlock() - cs.Lock() - - for k := range cs.data { + for k, v := range cs.data { delete(cs.data, k) + v.Stop() } } diff --git a/src/common/scheduler/scheduler_store_test.go b/src/common/scheduler/scheduler_store_test.go index 11dc1d8c2..75aa38bd4 100644 --- a/src/common/scheduler/scheduler_store_test.go +++ b/src/common/scheduler/scheduler_store_test.go @@ -5,7 +5,7 @@ import ( ) func TestPut(t *testing.T) { - store := NewConcurrentStore() + store := NewDefaultStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -17,7 +17,7 @@ func TestPut(t *testing.T) { } func TestGet(t *testing.T) { - store := NewConcurrentStore() + store := NewDefaultStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -29,7 +29,7 @@ func TestGet(t *testing.T) { } func TestRemove(t *testing.T) { - store := NewConcurrentStore() + store := NewDefaultStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -44,7 +44,7 @@ func TestRemove(t *testing.T) { } func TestExisting(t *testing.T) { - store := NewConcurrentStore() + store := NewDefaultStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -58,7 +58,7 @@ func TestExisting(t *testing.T) { } func TestGetAll(t *testing.T) { - store := NewConcurrentStore() + store := NewDefaultStore() if store == nil { t.Fatal("Failed to creat store instance") } diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go index 4e7379494..262f17a3c 100644 --- a/src/common/scheduler/scheduler_test.go +++ b/src/common/scheduler/scheduler_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "sync/atomic" "testing" "time" @@ -66,8 +67,12 @@ func (fp *fakePolicy) Equal(policy.Policy) bool { return false } +func (fp *fakePolicy) IsEnabled() bool { + return true +} + type fakeTask struct { - number int + number int32 } func (ft *fakeTask) Name() string { @@ -75,10 +80,14 @@ func (ft *fakeTask) Name() string { } func (ft *fakeTask) Run() error { - ft.number++ + atomic.AddInt32(&(ft.number), 1) return nil } +func (ft *fakeTask) Number() int32 { + return atomic.LoadInt32(&(ft.number)) +} + //Wacher will be tested together with scheduler. func TestScheduler(t *testing.T) { DefaultScheduler.Start() @@ -111,34 +120,30 @@ func TestScheduler(t *testing.T) { } time.Sleep(2 * time.Second) - if fk.number == 100 { + if fk.Number() == 100 { t.Fatal("Task is not triggered") } - if DefaultScheduler.stats.Tasks == 0 { - t.Fail() - } - if DefaultScheduler.stats.CompletedTasks == 0 { - t.Fail() - } if DefaultScheduler.UnSchedule(fp.Name()) != nil { t.Fatal("Unschedule policy failed") } - if DefaultScheduler.policies.Size() != 0 { + if DefaultScheduler.PolicyCount() != 0 { t.Fatal("Policy count does not match after calling UnSchedule()") } + + var copiedValue int32 <-time.After(1 * time.Second) - copiedValue := DefaultScheduler.stats.CompletedTasks + atomic.StoreInt32(&copiedValue, fk.Number()) <-time.After(2 * time.Second) - if copiedValue != DefaultScheduler.stats.CompletedTasks { - t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", copiedValue, DefaultScheduler.stats.CompletedTasks) + if atomic.LoadInt32(&copiedValue) != fk.Number() { + t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", atomic.LoadInt32(&copiedValue), fk.Number()) } DefaultScheduler.Stop() <-time.After(1 * time.Second) - if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() { + if DefaultScheduler.PolicyCount() != 0 || DefaultScheduler.IsRunning() { t.Fatal("Scheduler is still running after stopping") } } diff --git a/src/common/scheduler/task/task_list.go b/src/common/scheduler/task/task_list.go new file mode 100644 index 000000000..2c5d4eb40 --- /dev/null +++ b/src/common/scheduler/task/task_list.go @@ -0,0 +1,55 @@ +package task + +import ( + "sync" +) + +//Store is designed to keep the tasks. +type Store interface { + //GetTasks return the current existing list in store. + GetTasks() []Task + + //AddTasks is used to append tasks to the list. + AddTasks(tasks ...Task) +} + +//DefaultStore is the default implemetation of Store interface. +type DefaultStore struct { + //To sync the related operations. + *sync.RWMutex + + //The space to keep the tasks. + tasks []Task +} + +//NewDefaultStore is constructor method for DefaultStore. +func NewDefaultStore() *DefaultStore { + return &DefaultStore{new(sync.RWMutex), []Task{}} +} + +//GetTasks implements the same method in Store interface. +func (ds *DefaultStore) GetTasks() []Task { + copyList := []Task{} + + ds.RLock() + defer ds.RUnlock() + + if ds.tasks != nil && len(ds.tasks) > 0 { + copyList = append(copyList, ds.tasks...) + } + + return copyList +} + +//AddTasks implements the same method in Store interface. +func (ds *DefaultStore) AddTasks(tasks ...Task) { + //Double confirm. + if ds.tasks == nil { + ds.tasks = []Task{} + } + + ds.Lock() + defer ds.Unlock() + + ds.tasks = append(ds.tasks, tasks...) +} diff --git a/src/common/scheduler/task/task_list_test.go b/src/common/scheduler/task/task_list_test.go new file mode 100644 index 000000000..a874ab255 --- /dev/null +++ b/src/common/scheduler/task/task_list_test.go @@ -0,0 +1,46 @@ +package task + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestTaskList(t *testing.T) { + ds := NewDefaultStore() + if ds.tasks == nil { + t.Fatal("Failed to create store") + } + + go func() { + var count int32 + for { + ds.AddTasks(NewScanAllTask()) + atomic.AddInt32(&count, 1) + time.Sleep(100 * time.Millisecond) + if atomic.LoadInt32(&count) > 9 { + return + } + } + }() + go func() { + var count int32 + for { + ds.GetTasks() + atomic.AddInt32(&count, 1) + time.Sleep(100 * time.Millisecond) + if atomic.LoadInt32(&count) > 8 { + return + } + } + }() + + <-time.After(2 * time.Second) + + var taskCount int32 + atomic.StoreInt32(&taskCount, (int32)(len(ds.GetTasks()))) + + if atomic.LoadInt32(&taskCount) != 10 { + t.Fatalf("Expect %d tasks but got %d", 10, atomic.LoadInt32(&taskCount)) + } +} diff --git a/src/common/scheduler/watcher.go b/src/common/scheduler/watcher.go index 23067dab5..8f133c70b 100644 --- a/src/common/scheduler/watcher.go +++ b/src/common/scheduler/watcher.go @@ -6,29 +6,34 @@ import ( "github.com/vmware/harbor/src/common/utils/log" "fmt" + "sync" ) //Watcher is an asynchronous runner to provide an evaluation environment for the policy. type Watcher struct { + //Locker to sync related operations. + *sync.RWMutex + //The target policy. p policy.Policy //The channel for receive stop signal. cmdChan chan bool - //Indicate whether the watch is started and running. + //Indicate whether the watcher is started and running. isRunning bool //Report stats to scheduler. stats chan *StatItem //If policy is automatically completed, report the policy to scheduler. - doneChan chan string + doneChan chan *Watcher } //NewWatcher is used as a constructor. -func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher { +func NewWatcher(p policy.Policy, st chan *StatItem, done chan *Watcher) *Watcher { return &Watcher{ + RWMutex: new(sync.RWMutex), p: p, cmdChan: make(chan bool), isRunning: false, @@ -39,6 +44,10 @@ func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher { //Start the running. func (wc *Watcher) Start() { + //Lock for state changing + wc.Lock() + defer wc.Unlock() + if wc.isRunning { return } @@ -54,11 +63,6 @@ func (wc *Watcher) Start() { } }() - defer func() { - wc.isRunning = false - log.Infof("Work for policy %s is stopped.\n", wc.p.Name()) - }() - evalChan, err := pl.Evaluate() if err != nil { log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error()) @@ -70,6 +74,11 @@ func (wc *Watcher) Start() { select { case <-evalChan: { + //If worker is not running, should not response any requests. + if !wc.IsRunning() { + continue + } + log.Infof("Receive evaluation signal from policy '%s'\n", pl.Name()) //Start to run the attached tasks. for _, t := range pl.Tasks() { @@ -106,7 +115,7 @@ func (wc *Watcher) Start() { //Policy is automatically completed. //Report policy change stats. if wc.doneChan != nil { - wc.doneChan <- wc.p.Name() + wc.doneChan <- wc } return @@ -123,19 +132,31 @@ func (wc *Watcher) Start() { //Stop the running. func (wc *Watcher) Stop() { + //Lock for state changing + wc.Lock() if !wc.isRunning { + wc.Unlock() return } + wc.isRunning = false + wc.Unlock() + //Disable policy. if wc.p != nil { wc.p.Disable() } + //Stop watcher. wc.cmdChan <- true + + log.Infof("Worker for policy %s is stopped.\n", wc.p.Name()) } //IsRunning to indicate if the watcher is still running. func (wc *Watcher) IsRunning() bool { + wc.RLock() + defer wc.RUnlock() + return wc.isRunning } diff --git a/tests/coverage4gotest.sh b/tests/coverage4gotest.sh index fa025a6b5..b29ebf66e 100755 --- a/tests/coverage4gotest.sh +++ b/tests/coverage4gotest.sh @@ -26,7 +26,7 @@ for package in $packages do listDeps $package - go test -cover -coverprofile=profile.tmp -coverpkg "$deps" $package + go test -race -cover -coverprofile=profile.tmp -coverpkg "$deps" $package if [ -f profile.tmp ] then cat profile.tmp | tail -n +2 >> profile.cov