Fix data race issues of go sources
This commit is contained in:
Steven Zou 2017-08-07 21:57:03 +08:00 committed by GitHub
commit b6b232ce6a
12 changed files with 343 additions and 135 deletions

View File

@ -164,8 +164,8 @@ func (nw *NotificationWatcher) Notify(notification Notification) error {
return errors.New("Empty topic can not be notified") return errors.New("Empty topic can not be notified")
} }
nw.RLock()
defer nw.RUnlock() defer nw.RUnlock()
nw.RLock()
var ( var (
indexer HandlerIndexer indexer HandlerIndexer

View File

@ -2,6 +2,8 @@ package policy
import ( import (
"errors" "errors"
"fmt"
"sync"
"time" "time"
"github.com/vmware/harbor/src/common/scheduler/task" "github.com/vmware/harbor/src/common/scheduler/task"
@ -20,16 +22,16 @@ type AlternatePolicyConfiguration struct {
//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope. //AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
type AlternatePolicy struct { type AlternatePolicy struct {
//To sync the related operations.
*sync.RWMutex
//Keep the attached tasks. //Keep the attached tasks.
tasks []task.Task tasks task.Store
//Policy configurations. //Policy configurations.
config *AlternatePolicyConfiguration config *AlternatePolicyConfiguration
//Generate time ticks with specified duration. //To indicated whether policy is enabled or not.
ticker *time.Ticker
//To indicated whether policy is completed.
isEnabled bool isEnabled bool
//Channel used to send evaluation result signals. //Channel used to send evaluation result signals.
@ -45,7 +47,8 @@ type AlternatePolicy struct {
//NewAlternatePolicy is constructor of creating AlternatePolicy. //NewAlternatePolicy is constructor of creating AlternatePolicy.
func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy { func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
return &AlternatePolicy{ return &AlternatePolicy{
tasks: []task.Task{}, RWMutex: new(sync.RWMutex),
tasks: task.NewDefaultStore(),
config: config, config: config,
isEnabled: false, isEnabled: false,
terminator: make(chan bool), terminator: make(chan bool),
@ -64,12 +67,7 @@ func (alp *AlternatePolicy) Name() string {
//Tasks is an implementation of same method in policy interface. //Tasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Tasks() []task.Task { func (alp *AlternatePolicy) Tasks() []task.Task {
copyList := []task.Task{} return alp.tasks.GetTasks()
if alp.tasks != nil && len(alp.tasks) > 0 {
copyList = append(copyList, alp.tasks...)
}
return copyList
} }
//Done is an implementation of same method in policy interface. //Done is an implementation of same method in policy interface.
@ -83,29 +81,42 @@ func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
return errors.New("No tasks can be attached") return errors.New("No tasks can be attached")
} }
alp.tasks = append(alp.tasks, tasks...) alp.tasks.AddTasks(tasks...)
return nil return nil
} }
//Disable is an implementation of same method in policy interface. //Disable is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Disable() error { func (alp *AlternatePolicy) Disable() error {
//Stop the ticker alp.Lock()
if alp.ticker != nil { if !alp.isEnabled {
alp.ticker.Stop() alp.Unlock()
return fmt.Errorf("Instance of policy %s is not enabled", alp.Name())
} }
//Set state to disabled
alp.isEnabled = false
alp.Unlock()
//Stop the evaluation goroutine //Stop the evaluation goroutine
alp.terminator <- true alp.terminator <- true
alp.ticker = nil
return nil return nil
} }
//Evaluate is an implementation of same method in policy interface. //Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
//Lock for state changing
defer alp.Unlock()
alp.Lock()
//Check if policy instance is still running
if alp.isEnabled {
return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name())
}
//Keep idempotent //Keep idempotent
if alp.isEnabled && alp.evaluation != nil { if alp.evaluation != nil {
return alp.evaluation, nil return alp.evaluation, nil
} }
@ -113,9 +124,6 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
alp.evaluation = make(chan bool) alp.evaluation = make(chan bool)
go func() { go func() {
defer func() {
alp.isEnabled = false
}()
timeNow := time.Now().UTC() timeNow := time.Now().UTC()
//Reach the execution time point? //Reach the execution time point?
@ -138,11 +146,19 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
alp.evaluation <- true alp.evaluation <- true
//Start the ticker for repeat checking. //Start the ticker for repeat checking.
alp.ticker = time.NewTicker(alp.config.Duration) tk := time.NewTicker(alp.config.Duration)
defer func() {
if tk != nil {
tk.Stop()
}
}()
for { for {
select { select {
case <-alp.ticker.C: case <-tk.C:
alp.evaluation <- true if alp.IsEnabled() {
alp.evaluation <- true
}
case <-alp.terminator: case <-alp.terminator:
return return
} }
@ -174,3 +190,11 @@ func (alp *AlternatePolicy) Equal(p Policy) bool {
return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime) return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime)
} }
//IsEnabled is an implementation of same method in policy interface.
func (alp *AlternatePolicy) IsEnabled() bool {
defer alp.RUnlock()
alp.RLock()
return alp.isEnabled
}

View File

@ -1,12 +1,13 @@
package policy package policy
import ( import (
"sync/atomic"
"testing" "testing"
"time" "time"
) )
type fakeTask struct { type fakeTask struct {
number int number int32
} }
func (ft *fakeTask) Name() string { func (ft *fakeTask) Name() string {
@ -14,10 +15,14 @@ func (ft *fakeTask) Name() string {
} }
func (ft *fakeTask) Run() error { func (ft *fakeTask) Run() error {
ft.number++ atomic.AddInt32(&(ft.number), 1)
return nil return nil
} }
func (ft *fakeTask) Number() int32 {
return atomic.LoadInt32(&ft.number)
}
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{}) tp := NewAlternatePolicy(&AlternatePolicyConfiguration{})
err := tp.AttachTasks(&fakeTask{number: 100}) err := tp.AttachTasks(&fakeTask{number: 100})
@ -52,18 +57,18 @@ func TestEvaluatePolicy(t *testing.T) {
t.Fail() t.Fail()
} }
ch, _ := tp.Evaluate() ch, _ := tp.Evaluate()
counter := 0 var counter int32
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
select { select {
case <-ch: case <-ch:
counter++ atomic.AddInt32(&counter, 1)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
continue continue
} }
} }
if counter != 3 { if atomic.LoadInt32(&counter) != 3 {
t.Fail() t.Fail()
} }
@ -82,7 +87,7 @@ func TestDisablePolicy(t *testing.T) {
t.Fail() t.Fail()
} }
ch, _ := tp.Evaluate() ch, _ := tp.Evaluate()
counter := 0 var counter int32
terminate := make(chan bool) terminate := make(chan bool)
defer func() { defer func() {
terminate <- true terminate <- true
@ -91,7 +96,7 @@ func TestDisablePolicy(t *testing.T) {
for { for {
select { select {
case <-ch: case <-ch:
counter++ atomic.AddInt32(&counter, 1)
case <-terminate: case <-terminate:
return return
case <-time.After(6 * time.Second): case <-time.After(6 * time.Second):
@ -106,9 +111,10 @@ func TestDisablePolicy(t *testing.T) {
//Waiting for everything is stable //Waiting for everything is stable
<-time.After(1 * time.Second) <-time.After(1 * time.Second)
//Copy value //Copy value
copiedCounter := counter var copiedCounter int32
atomic.StoreInt32(&copiedCounter, atomic.LoadInt32(&counter))
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if counter != copiedCounter { if atomic.LoadInt32(&counter) != atomic.LoadInt32(&copiedCounter) {
t.Fatalf("Policy is still running after calling Disable() %d=%d", copiedCounter, counter) t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter))
} }
} }

View File

@ -41,4 +41,7 @@ type Policy interface {
//needs to support this method. If no need, please directly return false to indicate each policies are //needs to support this method. If no need, please directly return false to indicate each policies are
//different. //different.
Equal(p Policy) bool Equal(p Policy) bool
//IsEnabled is to indicate whether the policy is enabled or not (disabled).
IsEnabled() bool
} }

View File

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
) )
@ -55,14 +56,20 @@ type Configuration struct {
//Scheduler is designed for scheduling policies. //Scheduler is designed for scheduling policies.
type Scheduler struct { type Scheduler struct {
//Mutex for sync controling.
*sync.RWMutex
//Related configuration options for scheduler. //Related configuration options for scheduler.
config *Configuration config *Configuration
//Store to keep the references of scheduled policies. //Store to keep the references of scheduled policies.
policies Store policies Store
//Queue for receiving policy unschedule request or complete signal. //Queue for receiving policy scheduling request
unscheduleQueue chan string scheduleQueue chan *Watcher
//Queue for receiving policy unscheduling request or complete signal.
unscheduleQueue chan *Watcher
//Channel for receiving stat metrics. //Channel for receiving stat metrics.
statChan chan *StatItem statChan chan *StatItem
@ -87,14 +94,17 @@ func NewScheduler(config *Configuration) *Scheduler {
qSize = config.QueueSize qSize = config.QueueSize
} }
usq := make(chan string, qSize) sq := make(chan *Watcher, qSize)
usq := make(chan *Watcher, qSize)
stChan := make(chan *StatItem, 4) stChan := make(chan *StatItem, 4)
tc := make(chan bool, 1) tc := make(chan bool, 1)
store := NewConcurrentStore() store := NewDefaultStore()
return &Scheduler{ return &Scheduler{
RWMutex: new(sync.RWMutex),
config: config, config: config,
policies: store, policies: store,
scheduleQueue: sq,
unscheduleQueue: usq, unscheduleQueue: usq,
statChan: stChan, statChan: stChan,
terminateChan: tc, terminateChan: tc,
@ -110,9 +120,14 @@ func NewScheduler(config *Configuration) *Scheduler {
//Start the scheduler damon. //Start the scheduler damon.
func (sch *Scheduler) Start() { func (sch *Scheduler) Start() {
sch.Lock()
defer sch.Unlock()
//If scheduler is already running
if sch.isRunning { if sch.isRunning {
return return
} }
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -120,12 +135,6 @@ func (sch *Scheduler) Start() {
} }
}() }()
defer func() { defer func() {
//Exit and clear.
sch.isRunning = false
//Stop all watchers.
for _, wt := range sch.policies.GetAll() {
wt.Stop()
}
//Clear resources //Clear resources
sch.policies.Clear() sch.policies.Clear()
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339)) log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
@ -135,13 +144,41 @@ func (sch *Scheduler) Start() {
case <-sch.terminateChan: case <-sch.terminateChan:
//Exit //Exit
return return
case name := <-sch.unscheduleQueue: case wt := <-sch.scheduleQueue:
//Unscheduled when policy is completed. //If status is stopped, no requests should be served
if err := sch.UnSchedule(name); err != nil { if !sch.IsRunning() {
log.Error(err.Error()) continue
} }
go func(watcher *Watcher) {
if watcher != nil && watcher.p != nil {
//Enable it.
watcher.Start()
//Update stats and log info.
log.Infof("Policy %s is scheduled", watcher.p.Name())
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
}
}(wt)
case wt := <-sch.unscheduleQueue:
//If status is stopped, no requests should be served
if !sch.IsRunning() {
continue
}
go func(watcher *Watcher) {
if watcher != nil && watcher.IsRunning() {
watcher.Stop()
//Update stats and log info.
log.Infof("Policy %s is unscheduled", watcher.p.Name())
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
}
}(wt)
case stat := <-sch.statChan: case stat := <-sch.statChan:
{ {
//If status is stopped, no requests should be served
if !sch.IsRunning() {
continue
}
switch stat.Type { switch stat.Type {
case statSchedulePolicy: case statSchedulePolicy:
sch.stats.PolicyCount += stat.Value sch.stats.PolicyCount += stat.Value
@ -183,10 +220,18 @@ func (sch *Scheduler) Start() {
//Stop the scheduler damon. //Stop the scheduler damon.
func (sch *Scheduler) Stop() { func (sch *Scheduler) Stop() {
//Lock for state changing
sch.Lock()
//Check if the scheduler is running
if !sch.isRunning { if !sch.isRunning {
sch.Unlock()
return return
} }
sch.isRunning = false
sch.Unlock()
//Terminate damon to stop receiving signals. //Terminate damon to stop receiving signals.
sch.terminateChan <- true sch.terminateChan <- true
} }
@ -206,21 +251,15 @@ func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
return errors.New("Policy must attach task(s)") return errors.New("Policy must attach task(s)")
} }
if sch.policies.Exists(scheduledPolicy.Name()) { //Try to schedule the policy.
return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name()) //Keep the policy for future use after it's successfully scheduled.
watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue)
if err := sch.policies.Put(scheduledPolicy.Name(), watcher); err != nil {
return err
} }
//Schedule the policy. //Schedule the policy
watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue) sch.scheduleQueue <- watcher
//Enable it.
watcher.Start()
//Keep the policy for future use after it's successfully scheduled.
sch.policies.Put(scheduledPolicy.Name(), watcher)
//Update stats and log info.
log.Infof("Policy %s is scheduled", scheduledPolicy.Name())
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
return nil return nil
} }
@ -231,28 +270,23 @@ func (sch *Scheduler) UnSchedule(policyName string) error {
return errors.New("Empty policy name is invalid") return errors.New("Empty policy name is invalid")
} }
if !sch.policies.Exists(policyName) { //Find the watcher.
watcher := sch.policies.Remove(policyName)
if watcher == nil {
return fmt.Errorf("Policy %s is not existing", policyName) return fmt.Errorf("Policy %s is not existing", policyName)
} }
//Unschedule the policy. //Unschedule the policy.
//Find the watcher. sch.unscheduleQueue <- watcher
watcher := sch.policies.Remove(policyName)
if watcher != nil && watcher.IsRunning() {
watcher.Stop()
//Update stats and log info.
log.Infof("Policy %s is unscheduled", policyName)
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
} else {
log.Warningf("Inconsistent worker status for policy '%s'.\n", policyName)
}
return nil return nil
} }
//IsRunning to indicate whether the scheduler is running. //IsRunning to indicate whether the scheduler is running.
func (sch *Scheduler) IsRunning() bool { func (sch *Scheduler) IsRunning() bool {
sch.RLock()
defer sch.RUnlock()
return sch.isRunning return sch.isRunning
} }
@ -270,3 +304,8 @@ func (sch *Scheduler) GetPolicy(policyName string) policy.Policy {
return nil return nil
} }
//PolicyCount returns the count of currently scheduled policies in the scheduler.
func (sch *Scheduler) PolicyCount() uint32 {
return sch.policies.Size()
}

View File

@ -1,16 +1,16 @@
package scheduler package scheduler
import ( import (
"errors"
"fmt"
"strings" "strings"
"sync" "sync"
) )
//Store define the basic operations for storing and managing policy watcher. //Store define the basic operations for storing and managing policy watcher.
//The concrete implementation should consider concurrent supporting scenario.
//
type Store interface { type Store interface {
//Put a new policy in. //Put a new policy in.
Put(key string, value *Watcher) Put(key string, value *Watcher) error
//Get the corresponding policy with the key. //Get the corresponding policy with the key.
Get(key string) *Watcher Get(key string) *Watcher
@ -31,70 +31,74 @@ type Store interface {
Clear() Clear()
} }
//ConcurrentStore implements Store interface and supports concurrent operations. //DefaultStore implements Store interface to keep the scheduled policies.
type ConcurrentStore struct { //Not support concurrent sync.
//Read-write mutex to synchronize the data map. type DefaultStore struct {
//Support sync locking
*sync.RWMutex *sync.RWMutex
//Map used to keep the policy list. //Map used to keep the policy list.
data map[string]*Watcher data map[string]*Watcher
} }
//NewConcurrentStore is used to create a new store and return the pointer reference. //NewDefaultStore is used to create a new store and return the pointer reference.
func NewConcurrentStore() *ConcurrentStore { func NewDefaultStore() *DefaultStore {
mutex := new(sync.RWMutex) return &DefaultStore{new(sync.RWMutex), make(map[string]*Watcher)}
data := make(map[string]*Watcher)
return &ConcurrentStore{mutex, data}
} }
//Put a policy into store. //Put a policy into store.
func (cs *ConcurrentStore) Put(key string, value *Watcher) { func (cs *DefaultStore) Put(key string, value *Watcher) error {
if strings.TrimSpace(key) == "" || value == nil { if strings.TrimSpace(key) == "" || value == nil {
return return errors.New("Bad arguments")
} }
cs.Lock()
defer cs.Unlock() defer cs.Unlock()
cs.Lock() if _, ok := cs.data[key]; ok {
return fmt.Errorf("Duplicayed policy with name %s", key)
}
cs.data[key] = value cs.data[key] = value
return nil
} }
//Get policy via key. //Get policy via key.
func (cs *ConcurrentStore) Get(key string) *Watcher { func (cs *DefaultStore) Get(key string) *Watcher {
if strings.TrimSpace(key) == "" { if strings.TrimSpace(key) == "" {
return nil return nil
} }
cs.RLock()
defer cs.RUnlock() defer cs.RUnlock()
cs.RLock()
return cs.data[key] return cs.data[key]
} }
//Exists is used to check whether or not the key exists in store. //Exists is used to check whether or not the key exists in store.
func (cs *ConcurrentStore) Exists(key string) bool { func (cs *DefaultStore) Exists(key string) bool {
if strings.TrimSpace(key) == "" { if strings.TrimSpace(key) == "" {
return false return false
} }
cs.RLock()
defer cs.RUnlock() defer cs.RUnlock()
cs.RLock()
_, ok := cs.data[key] _, ok := cs.data[key]
return ok return ok
} }
//Remove is to delete the specified policy. //Remove is to delete the specified policy.
func (cs *ConcurrentStore) Remove(key string) *Watcher { func (cs *DefaultStore) Remove(key string) *Watcher {
if !cs.Exists(key) { if strings.TrimSpace(key) == "" {
return nil return nil
} }
cs.Lock()
defer cs.Unlock() defer cs.Unlock()
cs.Lock()
if wt, ok := cs.data[key]; ok { if wt, ok := cs.data[key]; ok {
delete(cs.data, key) delete(cs.data, key)
return wt return wt
@ -104,16 +108,20 @@ func (cs *ConcurrentStore) Remove(key string) *Watcher {
} }
//Size return the total count of items in store. //Size return the total count of items in store.
func (cs *ConcurrentStore) Size() uint32 { func (cs *DefaultStore) Size() uint32 {
cs.RLock()
defer cs.RUnlock()
return (uint32)(len(cs.data)) return (uint32)(len(cs.data))
} }
//GetAll to get all the items of store. //GetAll to get all the items of store.
func (cs *ConcurrentStore) GetAll() []*Watcher { func (cs *DefaultStore) GetAll() []*Watcher {
cs.RLock()
defer cs.RUnlock()
all := []*Watcher{} all := []*Watcher{}
defer cs.RUnlock()
cs.RLock()
for _, v := range cs.data { for _, v := range cs.data {
all = append(all, v) all = append(all, v)
} }
@ -122,15 +130,16 @@ func (cs *ConcurrentStore) GetAll() []*Watcher {
} }
//Clear all the items in store. //Clear all the items in store.
func (cs *ConcurrentStore) Clear() { func (cs *DefaultStore) Clear() {
if cs.Size() == 0 { cs.Lock()
defer cs.Unlock()
if (uint32)(len(cs.data)) == 0 {
return return
} }
defer cs.Unlock() for k, v := range cs.data {
cs.Lock()
for k := range cs.data {
delete(cs.data, k) delete(cs.data, k)
v.Stop()
} }
} }

View File

@ -5,7 +5,7 @@ import (
) )
func TestPut(t *testing.T) { func TestPut(t *testing.T) {
store := NewConcurrentStore() store := NewDefaultStore()
if store == nil { if store == nil {
t.Fatal("Failed to creat store instance") t.Fatal("Failed to creat store instance")
} }
@ -17,7 +17,7 @@ func TestPut(t *testing.T) {
} }
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
store := NewConcurrentStore() store := NewDefaultStore()
if store == nil { if store == nil {
t.Fatal("Failed to creat store instance") t.Fatal("Failed to creat store instance")
} }
@ -29,7 +29,7 @@ func TestGet(t *testing.T) {
} }
func TestRemove(t *testing.T) { func TestRemove(t *testing.T) {
store := NewConcurrentStore() store := NewDefaultStore()
if store == nil { if store == nil {
t.Fatal("Failed to creat store instance") t.Fatal("Failed to creat store instance")
} }
@ -44,7 +44,7 @@ func TestRemove(t *testing.T) {
} }
func TestExisting(t *testing.T) { func TestExisting(t *testing.T) {
store := NewConcurrentStore() store := NewDefaultStore()
if store == nil { if store == nil {
t.Fatal("Failed to creat store instance") t.Fatal("Failed to creat store instance")
} }
@ -58,7 +58,7 @@ func TestExisting(t *testing.T) {
} }
func TestGetAll(t *testing.T) { func TestGetAll(t *testing.T) {
store := NewConcurrentStore() store := NewDefaultStore()
if store == nil { if store == nil {
t.Fatal("Failed to creat store instance") t.Fatal("Failed to creat store instance")
} }

View File

@ -1,6 +1,7 @@
package scheduler package scheduler
import ( import (
"sync/atomic"
"testing" "testing"
"time" "time"
@ -66,8 +67,12 @@ func (fp *fakePolicy) Equal(policy.Policy) bool {
return false return false
} }
func (fp *fakePolicy) IsEnabled() bool {
return true
}
type fakeTask struct { type fakeTask struct {
number int number int32
} }
func (ft *fakeTask) Name() string { func (ft *fakeTask) Name() string {
@ -75,10 +80,14 @@ func (ft *fakeTask) Name() string {
} }
func (ft *fakeTask) Run() error { func (ft *fakeTask) Run() error {
ft.number++ atomic.AddInt32(&(ft.number), 1)
return nil return nil
} }
func (ft *fakeTask) Number() int32 {
return atomic.LoadInt32(&(ft.number))
}
//Wacher will be tested together with scheduler. //Wacher will be tested together with scheduler.
func TestScheduler(t *testing.T) { func TestScheduler(t *testing.T) {
DefaultScheduler.Start() DefaultScheduler.Start()
@ -111,34 +120,30 @@ func TestScheduler(t *testing.T) {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if fk.number == 100 { if fk.Number() == 100 {
t.Fatal("Task is not triggered") t.Fatal("Task is not triggered")
} }
if DefaultScheduler.stats.Tasks == 0 {
t.Fail()
}
if DefaultScheduler.stats.CompletedTasks == 0 {
t.Fail()
}
if DefaultScheduler.UnSchedule(fp.Name()) != nil { if DefaultScheduler.UnSchedule(fp.Name()) != nil {
t.Fatal("Unschedule policy failed") t.Fatal("Unschedule policy failed")
} }
if DefaultScheduler.policies.Size() != 0 { if DefaultScheduler.PolicyCount() != 0 {
t.Fatal("Policy count does not match after calling UnSchedule()") t.Fatal("Policy count does not match after calling UnSchedule()")
} }
var copiedValue int32
<-time.After(1 * time.Second) <-time.After(1 * time.Second)
copiedValue := DefaultScheduler.stats.CompletedTasks atomic.StoreInt32(&copiedValue, fk.Number())
<-time.After(2 * time.Second) <-time.After(2 * time.Second)
if copiedValue != DefaultScheduler.stats.CompletedTasks { if atomic.LoadInt32(&copiedValue) != fk.Number() {
t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", copiedValue, DefaultScheduler.stats.CompletedTasks) t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", atomic.LoadInt32(&copiedValue), fk.Number())
} }
DefaultScheduler.Stop() DefaultScheduler.Stop()
<-time.After(1 * time.Second) <-time.After(1 * time.Second)
if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() { if DefaultScheduler.PolicyCount() != 0 || DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is still running after stopping") t.Fatal("Scheduler is still running after stopping")
} }
} }

View File

@ -0,0 +1,55 @@
package task
import (
"sync"
)
//Store is designed to keep the tasks.
type Store interface {
//GetTasks return the current existing list in store.
GetTasks() []Task
//AddTasks is used to append tasks to the list.
AddTasks(tasks ...Task)
}
//DefaultStore is the default implemetation of Store interface.
type DefaultStore struct {
//To sync the related operations.
*sync.RWMutex
//The space to keep the tasks.
tasks []Task
}
//NewDefaultStore is constructor method for DefaultStore.
func NewDefaultStore() *DefaultStore {
return &DefaultStore{new(sync.RWMutex), []Task{}}
}
//GetTasks implements the same method in Store interface.
func (ds *DefaultStore) GetTasks() []Task {
copyList := []Task{}
ds.RLock()
defer ds.RUnlock()
if ds.tasks != nil && len(ds.tasks) > 0 {
copyList = append(copyList, ds.tasks...)
}
return copyList
}
//AddTasks implements the same method in Store interface.
func (ds *DefaultStore) AddTasks(tasks ...Task) {
//Double confirm.
if ds.tasks == nil {
ds.tasks = []Task{}
}
ds.Lock()
defer ds.Unlock()
ds.tasks = append(ds.tasks, tasks...)
}

View File

@ -0,0 +1,46 @@
package task
import (
"sync/atomic"
"testing"
"time"
)
func TestTaskList(t *testing.T) {
ds := NewDefaultStore()
if ds.tasks == nil {
t.Fatal("Failed to create store")
}
go func() {
var count int32
for {
ds.AddTasks(NewScanAllTask())
atomic.AddInt32(&count, 1)
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt32(&count) > 9 {
return
}
}
}()
go func() {
var count int32
for {
ds.GetTasks()
atomic.AddInt32(&count, 1)
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt32(&count) > 8 {
return
}
}
}()
<-time.After(2 * time.Second)
var taskCount int32
atomic.StoreInt32(&taskCount, (int32)(len(ds.GetTasks())))
if atomic.LoadInt32(&taskCount) != 10 {
t.Fatalf("Expect %d tasks but got %d", 10, atomic.LoadInt32(&taskCount))
}
}

View File

@ -6,29 +6,34 @@ import (
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"fmt" "fmt"
"sync"
) )
//Watcher is an asynchronous runner to provide an evaluation environment for the policy. //Watcher is an asynchronous runner to provide an evaluation environment for the policy.
type Watcher struct { type Watcher struct {
//Locker to sync related operations.
*sync.RWMutex
//The target policy. //The target policy.
p policy.Policy p policy.Policy
//The channel for receive stop signal. //The channel for receive stop signal.
cmdChan chan bool cmdChan chan bool
//Indicate whether the watch is started and running. //Indicate whether the watcher is started and running.
isRunning bool isRunning bool
//Report stats to scheduler. //Report stats to scheduler.
stats chan *StatItem stats chan *StatItem
//If policy is automatically completed, report the policy to scheduler. //If policy is automatically completed, report the policy to scheduler.
doneChan chan string doneChan chan *Watcher
} }
//NewWatcher is used as a constructor. //NewWatcher is used as a constructor.
func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher { func NewWatcher(p policy.Policy, st chan *StatItem, done chan *Watcher) *Watcher {
return &Watcher{ return &Watcher{
RWMutex: new(sync.RWMutex),
p: p, p: p,
cmdChan: make(chan bool), cmdChan: make(chan bool),
isRunning: false, isRunning: false,
@ -39,6 +44,10 @@ func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher {
//Start the running. //Start the running.
func (wc *Watcher) Start() { func (wc *Watcher) Start() {
//Lock for state changing
wc.Lock()
defer wc.Unlock()
if wc.isRunning { if wc.isRunning {
return return
} }
@ -54,11 +63,6 @@ func (wc *Watcher) Start() {
} }
}() }()
defer func() {
wc.isRunning = false
log.Infof("Work for policy %s is stopped.\n", wc.p.Name())
}()
evalChan, err := pl.Evaluate() evalChan, err := pl.Evaluate()
if err != nil { if err != nil {
log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error()) log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error())
@ -70,6 +74,11 @@ func (wc *Watcher) Start() {
select { select {
case <-evalChan: case <-evalChan:
{ {
//If worker is not running, should not response any requests.
if !wc.IsRunning() {
continue
}
log.Infof("Receive evaluation signal from policy '%s'\n", pl.Name()) log.Infof("Receive evaluation signal from policy '%s'\n", pl.Name())
//Start to run the attached tasks. //Start to run the attached tasks.
for _, t := range pl.Tasks() { for _, t := range pl.Tasks() {
@ -106,7 +115,7 @@ func (wc *Watcher) Start() {
//Policy is automatically completed. //Policy is automatically completed.
//Report policy change stats. //Report policy change stats.
if wc.doneChan != nil { if wc.doneChan != nil {
wc.doneChan <- wc.p.Name() wc.doneChan <- wc
} }
return return
@ -123,19 +132,31 @@ func (wc *Watcher) Start() {
//Stop the running. //Stop the running.
func (wc *Watcher) Stop() { func (wc *Watcher) Stop() {
//Lock for state changing
wc.Lock()
if !wc.isRunning { if !wc.isRunning {
wc.Unlock()
return return
} }
wc.isRunning = false
wc.Unlock()
//Disable policy. //Disable policy.
if wc.p != nil { if wc.p != nil {
wc.p.Disable() wc.p.Disable()
} }
//Stop watcher. //Stop watcher.
wc.cmdChan <- true wc.cmdChan <- true
log.Infof("Worker for policy %s is stopped.\n", wc.p.Name())
} }
//IsRunning to indicate if the watcher is still running. //IsRunning to indicate if the watcher is still running.
func (wc *Watcher) IsRunning() bool { func (wc *Watcher) IsRunning() bool {
wc.RLock()
defer wc.RUnlock()
return wc.isRunning return wc.isRunning
} }

View File

@ -26,7 +26,7 @@ for package in $packages
do do
listDeps $package listDeps $package
go test -cover -coverprofile=profile.tmp -coverpkg "$deps" $package go test -race -cover -coverprofile=profile.tmp -coverpkg "$deps" $package
if [ -f profile.tmp ] if [ -f profile.tmp ]
then then
cat profile.tmp | tail -n +2 >> profile.cov cat profile.tmp | tail -n +2 >> profile.cov