From 82e17fade119a47520630bbdd50dc6ba7ed41549 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 5 Jul 2017 19:13:49 +0800 Subject: [PATCH] refine scheduler according to review comments --- .../scheduler/policy/alternate_policy.go | 9 ++-- .../scheduler/policy/alternate_policy_test.go | 2 +- src/common/scheduler/policy/policy.go | 4 +- src/common/scheduler/scheduler.go | 40 +++++++++--------- src/common/scheduler/scheduler_store.go | 42 +++++++++---------- src/common/scheduler/scheduler_store_test.go | 10 ++--- src/common/scheduler/scheduler_test.go | 4 +- src/common/scheduler/task/scan_all_task.go | 8 ++-- .../scheduler/task/scan_all_task_test.go | 6 ++- src/common/scheduler/task/task.go | 4 +- src/common/scheduler/watcher.go | 11 ++++- 11 files changed, 77 insertions(+), 63 deletions(-) diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go index 2b7faa5fb0..a126f74689 100644 --- a/src/common/scheduler/policy/alternate_policy.go +++ b/src/common/scheduler/policy/alternate_policy.go @@ -1,8 +1,11 @@ package policy -import "github.com/vmware/harbor/src/common/scheduler/task" -import "errors" -import "time" +import ( + "errors" + "time" + + "github.com/vmware/harbor/src/common/scheduler/task" +) //AlternatePolicyConfiguration store the related configurations for alternate policy. type AlternatePolicyConfiguration struct { diff --git a/src/common/scheduler/policy/alternate_policy_test.go b/src/common/scheduler/policy/alternate_policy_test.go index 071ad30d1f..3dc252ad64 100644 --- a/src/common/scheduler/policy/alternate_policy_test.go +++ b/src/common/scheduler/policy/alternate_policy_test.go @@ -9,7 +9,7 @@ type fakeTask struct { number int } -func (ft *fakeTask) TaskName() string { +func (ft *fakeTask) Name() string { return "for testing" } diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go index eb248679cd..38ba2da4e7 100644 --- a/src/common/scheduler/policy/policy.go +++ b/src/common/scheduler/policy/policy.go @@ -1,6 +1,8 @@ package policy -import "github.com/vmware/harbor/src/common/scheduler/task" +import ( + "github.com/vmware/harbor/src/common/scheduler/task" +) //Policy is an if-then logic to determine how the attached tasks should be //executed based on the evaluation result of the defined conditions. diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go index 66bdeab523..38e5b204e0 100644 --- a/src/common/scheduler/scheduler.go +++ b/src/common/scheduler/scheduler.go @@ -1,13 +1,15 @@ package scheduler -import "github.com/vmware/harbor/src/common/scheduler/policy" -import "github.com/vmware/harbor/src/common/utils/log" +import ( + "github.com/vmware/harbor/src/common/scheduler/policy" + "github.com/vmware/harbor/src/common/utils/log" -import "errors" -import "strings" -import "fmt" -import "reflect" -import "time" + "errors" + "fmt" + "reflect" + "strings" + "time" +) const ( defaultQueueSize = 10 @@ -74,8 +76,8 @@ type Scheduler struct { //The stat metrics of scheduler. stats *StatSummary - //To indicate whether scheduler is stopped or not - stopped bool + //To indicate whether scheduler is running or not + isRunning bool } //DefaultScheduler is a default scheduler. @@ -93,7 +95,7 @@ func NewScheduler(config *Configuration) *Scheduler { stChan := make(chan *StatItem, 4) tc := make(chan bool, 2) - store := NewConcurrentStore(10) + store := NewConcurrentStore() return &Scheduler{ config: config, policies: store, @@ -107,13 +109,13 @@ func NewScheduler(config *Configuration) *Scheduler { CompletedTasks: 0, TasksWithError: 0, }, - stopped: true, + isRunning: false, } } //Start the scheduler damon. func (sch *Scheduler) Start() { - if !sch.stopped { + if sch.isRunning { return } go func() { @@ -123,7 +125,7 @@ func (sch *Scheduler) Start() { } }() defer func() { - sch.stopped = true + sch.isRunning = false }() for { select { @@ -187,13 +189,13 @@ func (sch *Scheduler) Start() { } }() - sch.stopped = false + sch.isRunning = true log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339)) } //Stop the scheduler damon. func (sch *Scheduler) Stop() { - if sch.stopped { + if !sch.isRunning { return } @@ -227,7 +229,7 @@ func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error { } if sch.policies.Exists(scheduledPolicy.Name()) { - return errors.New("Duplicated policy") + return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name()) } //Schedule the policy. @@ -252,7 +254,7 @@ func (sch *Scheduler) UnSchedule(policyName string) error { return nil } -//IsStopped to indicate whether the scheduler is stopped -func (sch *Scheduler) IsStopped() bool { - return sch.stopped +//IsRunning to indicate whether the scheduler is running. +func (sch *Scheduler) IsRunning() bool { + return sch.isRunning } diff --git a/src/common/scheduler/scheduler_store.go b/src/common/scheduler/scheduler_store.go index a2d690947b..9a94d3b701 100644 --- a/src/common/scheduler/scheduler_store.go +++ b/src/common/scheduler/scheduler_store.go @@ -1,9 +1,9 @@ package scheduler -import "sync" -import "strings" - -const defaultSize = 10 +import ( + "strings" + "sync" +) //Store define the basic operations for storing and managing policy watcher. //The concrete implementation should consider concurrent supporting scenario. @@ -34,20 +34,16 @@ type Store interface { //ConcurrentStore implements Store interface and supports concurrent operations. type ConcurrentStore struct { //Read-write mutex to synchronize the data map. - mutex *sync.RWMutex + *sync.RWMutex //Map used to keep the policy list. data map[string]*Watcher } //NewConcurrentStore is used to create a new store and return the pointer reference. -func NewConcurrentStore(initialSize uint32) *ConcurrentStore { - var initSize uint32 = defaultSize - if initialSize > 0 { - initSize = initialSize - } +func NewConcurrentStore() *ConcurrentStore { mutex := new(sync.RWMutex) - data := make(map[string]*Watcher, initSize) + data := make(map[string]*Watcher) return &ConcurrentStore{mutex, data} } @@ -58,9 +54,9 @@ func (cs *ConcurrentStore) Put(key string, value *Watcher) { return } - defer cs.mutex.Unlock() + defer cs.Unlock() - cs.mutex.Lock() + cs.Lock() cs.data[key] = value } @@ -70,9 +66,9 @@ func (cs *ConcurrentStore) Get(key string) *Watcher { return nil } - defer cs.mutex.RUnlock() + defer cs.RUnlock() - cs.mutex.RLock() + cs.RLock() return cs.data[key] } @@ -82,9 +78,9 @@ func (cs *ConcurrentStore) Exists(key string) bool { return false } - defer cs.mutex.RUnlock() + defer cs.RUnlock() - cs.mutex.RLock() + cs.RLock() _, ok := cs.data[key] return ok @@ -96,9 +92,9 @@ func (cs *ConcurrentStore) Remove(key string) *Watcher { return nil } - defer cs.mutex.Unlock() + defer cs.Unlock() - cs.mutex.Lock() + cs.Lock() if wt, ok := cs.data[key]; ok { delete(cs.data, key) return wt @@ -116,8 +112,8 @@ func (cs *ConcurrentStore) Size() uint32 { func (cs *ConcurrentStore) GetAll() []*Watcher { all := []*Watcher{} - defer cs.mutex.RUnlock() - cs.mutex.RLock() + defer cs.RUnlock() + cs.RLock() for _, v := range cs.data { all = append(all, v) } @@ -131,8 +127,8 @@ func (cs *ConcurrentStore) Clear() { return } - defer cs.mutex.Unlock() - cs.mutex.Lock() + defer cs.Unlock() + cs.Lock() for k := range cs.data { delete(cs.data, k) diff --git a/src/common/scheduler/scheduler_store_test.go b/src/common/scheduler/scheduler_store_test.go index 828615a334..11dc1d8c27 100644 --- a/src/common/scheduler/scheduler_store_test.go +++ b/src/common/scheduler/scheduler_store_test.go @@ -5,7 +5,7 @@ import ( ) func TestPut(t *testing.T) { - store := NewConcurrentStore(10) + store := NewConcurrentStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -17,7 +17,7 @@ func TestPut(t *testing.T) { } func TestGet(t *testing.T) { - store := NewConcurrentStore(10) + store := NewConcurrentStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -29,7 +29,7 @@ func TestGet(t *testing.T) { } func TestRemove(t *testing.T) { - store := NewConcurrentStore(10) + store := NewConcurrentStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -44,7 +44,7 @@ func TestRemove(t *testing.T) { } func TestExisting(t *testing.T) { - store := NewConcurrentStore(10) + store := NewConcurrentStore() if store == nil { t.Fatal("Failed to creat store instance") } @@ -58,7 +58,7 @@ func TestExisting(t *testing.T) { } func TestGetAll(t *testing.T) { - store := NewConcurrentStore(10) + store := NewConcurrentStore() if store == nil { t.Fatal("Failed to creat store instance") } diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go index 1fde4ef9cc..a25d5527f9 100644 --- a/src/common/scheduler/scheduler_test.go +++ b/src/common/scheduler/scheduler_test.go @@ -66,7 +66,7 @@ type fakeTask struct { number int } -func (ft *fakeTask) TaskName() string { +func (ft *fakeTask) Name() string { return "for testing" } @@ -86,7 +86,7 @@ func TestScheduler(t *testing.T) { t.Fail() } - if DefaultScheduler.IsStopped() { + if !DefaultScheduler.IsRunning() { t.Fatal("Scheduler is not started") } diff --git a/src/common/scheduler/task/scan_all_task.go b/src/common/scheduler/task/scan_all_task.go index f68b2f6fa7..c7d70d8385 100644 --- a/src/common/scheduler/task/scan_all_task.go +++ b/src/common/scheduler/task/scan_all_task.go @@ -1,6 +1,8 @@ package task -import "github.com/vmware/harbor/src/ui/utils" +import ( + "github.com/vmware/harbor/src/ui/utils" +) //ScanAllTask is task of scanning all tags. type ScanAllTask struct{} @@ -10,8 +12,8 @@ func NewScanAllTask() *ScanAllTask { return &ScanAllTask{} } -//TaskName returns the name of the task. -func (sat *ScanAllTask) TaskName() string { +//Name returns the name of the task. +func (sat *ScanAllTask) Name() string { return "scan all" } diff --git a/src/common/scheduler/task/scan_all_task_test.go b/src/common/scheduler/task/scan_all_task_test.go index 308cbb6ce3..18ac9202b0 100644 --- a/src/common/scheduler/task/scan_all_task_test.go +++ b/src/common/scheduler/task/scan_all_task_test.go @@ -1,6 +1,8 @@ package task -import "testing" +import ( + "testing" +) func TestTask(t *testing.T) { tk := NewScanAllTask() @@ -8,7 +10,7 @@ func TestTask(t *testing.T) { t.Fail() } - if tk.TaskName() != "scan all" { + if tk.Name() != "scan all" { t.Fail() } } diff --git a/src/common/scheduler/task/task.go b/src/common/scheduler/task/task.go index 7479d34958..162a6501a7 100644 --- a/src/common/scheduler/task/task.go +++ b/src/common/scheduler/task/task.go @@ -2,8 +2,8 @@ package task //Task is used to synchronously run specific action(s). type Task interface { - //TaskName should return the name of the task. - TaskName() string + //Name should return the name of the task. + Name() string //Run the concrete code here Run() error diff --git a/src/common/scheduler/watcher.go b/src/common/scheduler/watcher.go index 4846acdc12..2fe32a127f 100644 --- a/src/common/scheduler/watcher.go +++ b/src/common/scheduler/watcher.go @@ -4,6 +4,8 @@ import ( "github.com/vmware/harbor/src/common/scheduler/policy" "github.com/vmware/harbor/src/common/scheduler/task" "github.com/vmware/harbor/src/common/utils/log" + + "fmt" ) //Watcher is an asynchronous runner to provide an evaluation environment for the policy. @@ -64,7 +66,10 @@ func (wc *Watcher) Start() { go func(tk task.Task) { defer func() { if r := recover(); r != nil { - log.Errorf("Runtime error in task execution:%s\n", r) + st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)} + if wc.stats != nil { + wc.stats <- st + } } }() err := tk.Run() @@ -92,7 +97,9 @@ func (wc *Watcher) Start() { wc.isRunning = false //Report policy change stats. - wc.doneChan <- wc.p.Name() + if wc.doneChan != nil { + wc.doneChan <- wc.p.Name() + } return }