refine scheduler according to review comments

This commit is contained in:
Steven Zou 2017-07-05 19:13:49 +08:00
parent 4b2bae4558
commit 82e17fade1
11 changed files with 77 additions and 63 deletions

View File

@ -1,8 +1,11 @@
package policy
import "github.com/vmware/harbor/src/common/scheduler/task"
import "errors"
import "time"
import (
"errors"
"time"
"github.com/vmware/harbor/src/common/scheduler/task"
)
//AlternatePolicyConfiguration store the related configurations for alternate policy.
type AlternatePolicyConfiguration struct {

View File

@ -9,7 +9,7 @@ type fakeTask struct {
number int
}
func (ft *fakeTask) TaskName() string {
func (ft *fakeTask) Name() string {
return "for testing"
}

View File

@ -1,6 +1,8 @@
package policy
import "github.com/vmware/harbor/src/common/scheduler/task"
import (
"github.com/vmware/harbor/src/common/scheduler/task"
)
//Policy is an if-then logic to determine how the attached tasks should be
//executed based on the evaluation result of the defined conditions.

View File

@ -1,13 +1,15 @@
package scheduler
import "github.com/vmware/harbor/src/common/scheduler/policy"
import "github.com/vmware/harbor/src/common/utils/log"
import (
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/utils/log"
import "errors"
import "strings"
import "fmt"
import "reflect"
import "time"
"errors"
"fmt"
"reflect"
"strings"
"time"
)
const (
defaultQueueSize = 10
@ -74,8 +76,8 @@ type Scheduler struct {
//The stat metrics of scheduler.
stats *StatSummary
//To indicate whether scheduler is stopped or not
stopped bool
//To indicate whether scheduler is running or not
isRunning bool
}
//DefaultScheduler is a default scheduler.
@ -93,7 +95,7 @@ func NewScheduler(config *Configuration) *Scheduler {
stChan := make(chan *StatItem, 4)
tc := make(chan bool, 2)
store := NewConcurrentStore(10)
store := NewConcurrentStore()
return &Scheduler{
config: config,
policies: store,
@ -107,13 +109,13 @@ func NewScheduler(config *Configuration) *Scheduler {
CompletedTasks: 0,
TasksWithError: 0,
},
stopped: true,
isRunning: false,
}
}
//Start the scheduler damon.
func (sch *Scheduler) Start() {
if !sch.stopped {
if sch.isRunning {
return
}
go func() {
@ -123,7 +125,7 @@ func (sch *Scheduler) Start() {
}
}()
defer func() {
sch.stopped = true
sch.isRunning = false
}()
for {
select {
@ -187,13 +189,13 @@ func (sch *Scheduler) Start() {
}
}()
sch.stopped = false
sch.isRunning = true
log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339))
}
//Stop the scheduler damon.
func (sch *Scheduler) Stop() {
if sch.stopped {
if !sch.isRunning {
return
}
@ -227,7 +229,7 @@ func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
}
if sch.policies.Exists(scheduledPolicy.Name()) {
return errors.New("Duplicated policy")
return fmt.Errorf("Duplicated policy: %s", scheduledPolicy.Name())
}
//Schedule the policy.
@ -252,7 +254,7 @@ func (sch *Scheduler) UnSchedule(policyName string) error {
return nil
}
//IsStopped to indicate whether the scheduler is stopped
func (sch *Scheduler) IsStopped() bool {
return sch.stopped
//IsRunning to indicate whether the scheduler is running.
func (sch *Scheduler) IsRunning() bool {
return sch.isRunning
}

View File

@ -1,9 +1,9 @@
package scheduler
import "sync"
import "strings"
const defaultSize = 10
import (
"strings"
"sync"
)
//Store define the basic operations for storing and managing policy watcher.
//The concrete implementation should consider concurrent supporting scenario.
@ -34,20 +34,16 @@ type Store interface {
//ConcurrentStore implements Store interface and supports concurrent operations.
type ConcurrentStore struct {
//Read-write mutex to synchronize the data map.
mutex *sync.RWMutex
*sync.RWMutex
//Map used to keep the policy list.
data map[string]*Watcher
}
//NewConcurrentStore is used to create a new store and return the pointer reference.
func NewConcurrentStore(initialSize uint32) *ConcurrentStore {
var initSize uint32 = defaultSize
if initialSize > 0 {
initSize = initialSize
}
func NewConcurrentStore() *ConcurrentStore {
mutex := new(sync.RWMutex)
data := make(map[string]*Watcher, initSize)
data := make(map[string]*Watcher)
return &ConcurrentStore{mutex, data}
}
@ -58,9 +54,9 @@ func (cs *ConcurrentStore) Put(key string, value *Watcher) {
return
}
defer cs.mutex.Unlock()
defer cs.Unlock()
cs.mutex.Lock()
cs.Lock()
cs.data[key] = value
}
@ -70,9 +66,9 @@ func (cs *ConcurrentStore) Get(key string) *Watcher {
return nil
}
defer cs.mutex.RUnlock()
defer cs.RUnlock()
cs.mutex.RLock()
cs.RLock()
return cs.data[key]
}
@ -82,9 +78,9 @@ func (cs *ConcurrentStore) Exists(key string) bool {
return false
}
defer cs.mutex.RUnlock()
defer cs.RUnlock()
cs.mutex.RLock()
cs.RLock()
_, ok := cs.data[key]
return ok
@ -96,9 +92,9 @@ func (cs *ConcurrentStore) Remove(key string) *Watcher {
return nil
}
defer cs.mutex.Unlock()
defer cs.Unlock()
cs.mutex.Lock()
cs.Lock()
if wt, ok := cs.data[key]; ok {
delete(cs.data, key)
return wt
@ -116,8 +112,8 @@ func (cs *ConcurrentStore) Size() uint32 {
func (cs *ConcurrentStore) GetAll() []*Watcher {
all := []*Watcher{}
defer cs.mutex.RUnlock()
cs.mutex.RLock()
defer cs.RUnlock()
cs.RLock()
for _, v := range cs.data {
all = append(all, v)
}
@ -131,8 +127,8 @@ func (cs *ConcurrentStore) Clear() {
return
}
defer cs.mutex.Unlock()
cs.mutex.Lock()
defer cs.Unlock()
cs.Lock()
for k := range cs.data {
delete(cs.data, k)

View File

@ -5,7 +5,7 @@ import (
)
func TestPut(t *testing.T) {
store := NewConcurrentStore(10)
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
}
@ -17,7 +17,7 @@ func TestPut(t *testing.T) {
}
func TestGet(t *testing.T) {
store := NewConcurrentStore(10)
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
}
@ -29,7 +29,7 @@ func TestGet(t *testing.T) {
}
func TestRemove(t *testing.T) {
store := NewConcurrentStore(10)
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
}
@ -44,7 +44,7 @@ func TestRemove(t *testing.T) {
}
func TestExisting(t *testing.T) {
store := NewConcurrentStore(10)
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
}
@ -58,7 +58,7 @@ func TestExisting(t *testing.T) {
}
func TestGetAll(t *testing.T) {
store := NewConcurrentStore(10)
store := NewConcurrentStore()
if store == nil {
t.Fatal("Failed to creat store instance")
}

View File

@ -66,7 +66,7 @@ type fakeTask struct {
number int
}
func (ft *fakeTask) TaskName() string {
func (ft *fakeTask) Name() string {
return "for testing"
}
@ -86,7 +86,7 @@ func TestScheduler(t *testing.T) {
t.Fail()
}
if DefaultScheduler.IsStopped() {
if !DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is not started")
}

View File

@ -1,6 +1,8 @@
package task
import "github.com/vmware/harbor/src/ui/utils"
import (
"github.com/vmware/harbor/src/ui/utils"
)
//ScanAllTask is task of scanning all tags.
type ScanAllTask struct{}
@ -10,8 +12,8 @@ func NewScanAllTask() *ScanAllTask {
return &ScanAllTask{}
}
//TaskName returns the name of the task.
func (sat *ScanAllTask) TaskName() string {
//Name returns the name of the task.
func (sat *ScanAllTask) Name() string {
return "scan all"
}

View File

@ -1,6 +1,8 @@
package task
import "testing"
import (
"testing"
)
func TestTask(t *testing.T) {
tk := NewScanAllTask()
@ -8,7 +10,7 @@ func TestTask(t *testing.T) {
t.Fail()
}
if tk.TaskName() != "scan all" {
if tk.Name() != "scan all" {
t.Fail()
}
}

View File

@ -2,8 +2,8 @@ package task
//Task is used to synchronously run specific action(s).
type Task interface {
//TaskName should return the name of the task.
TaskName() string
//Name should return the name of the task.
Name() string
//Run the concrete code here
Run() error

View File

@ -4,6 +4,8 @@ import (
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task"
"github.com/vmware/harbor/src/common/utils/log"
"fmt"
)
//Watcher is an asynchronous runner to provide an evaluation environment for the policy.
@ -64,7 +66,10 @@ func (wc *Watcher) Start() {
go func(tk task.Task) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in task execution:%s\n", r)
st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)}
if wc.stats != nil {
wc.stats <- st
}
}
}()
err := tk.Run()
@ -92,7 +97,9 @@ func (wc *Watcher) Start() {
wc.isRunning = false
//Report policy change stats.
if wc.doneChan != nil {
wc.doneChan <- wc.p.Name()
}
return
}