Implement policy scheduler

This commit is contained in:
Steven Zou 2017-07-05 02:17:25 +08:00
parent b51b3ea5ac
commit 2bc557b559
12 changed files with 1141 additions and 0 deletions

View File

@ -0,0 +1,185 @@
package policy
import "github.com/vmware/harbor/src/common/scheduler/task"
import "errors"
import "time"
//AlternatePolicyConfiguration store the related configurations for alternate policy.
type AlternatePolicyConfiguration struct {
//The interval of executing attached tasks.
Duration time.Duration
//The execution time point of each turn
//It's a number to indicate the seconds offset to the 00:00 of UTC time.
OffsetTime int64
//Time should be later than start time.
//If set <=0 value, no limitation.
StartTimestamp int64
//Time should be earlier than end time.
//If set <=0 value, no limitation.
EndTimestamp int64
}
//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
type AlternatePolicy struct {
//Keep the attached tasks.
tasks []task.Task
//Policy configurations.
config *AlternatePolicyConfiguration
//Generate time ticks with specified duration.
ticker *time.Ticker
//To indicated whether policy is completed.
isEnabled bool
//Channel used to send evaluation result signals.
evaluation chan EvaluationResult
//Channel used to notify policy termination.
done chan bool
//Channel used to receive terminate signal.
terminator chan bool
}
//NewAlternatePolicy is constructor of creating AlternatePolicy.
func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
return &AlternatePolicy{
tasks: []task.Task{},
config: config,
isEnabled: false,
}
}
//GetConfig returns the current configuration options of this policy.
func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration {
return alp.config
}
//Name is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Name() string {
return "Alternate Policy"
}
//Tasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Tasks() []task.Task {
copyList := []task.Task{}
if alp.tasks != nil && len(alp.tasks) > 0 {
copyList = append(copyList, alp.tasks...)
}
return copyList
}
//Done is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Done() chan bool {
return alp.done
}
//AttachTasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
if tasks == nil || len(tasks) == 0 {
return errors.New("No tasks can be attached")
}
alp.tasks = append(alp.tasks, tasks...)
return nil
}
//Disable is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Disable() error {
alp.isEnabled = false
//Stop the ticker
if alp.ticker != nil {
alp.ticker.Stop()
}
//Stop the evaluation goroutine
alp.terminator <- true
alp.ticker = nil
return nil
}
//Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() chan EvaluationResult {
//Keep idempotent
if alp.isEnabled && alp.evaluation != nil {
return alp.evaluation
}
alp.done = make(chan bool)
alp.terminator = make(chan bool)
alp.evaluation = make(chan EvaluationResult)
go func() {
timeNow := time.Now().UTC()
timeSeconds := timeNow.Unix()
//Pre-check
//If now is still in the specified time scope.
if alp.config.EndTimestamp > 0 && timeSeconds >= alp.config.EndTimestamp {
//Invalid configuration, exit.
alp.done <- true
alp.isEnabled = false
return
}
if alp.config.StartTimestamp > 0 && timeSeconds < alp.config.StartTimestamp {
//Let's hold on for a while.
forWhile := alp.config.StartTimestamp - timeSeconds
time.Sleep(time.Duration(forWhile) * time.Second)
}
//Reach the execution time point?
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
diff := alp.config.OffsetTime - utcTime
if diff < 0 {
diff += 24 * 3600
}
if diff > 0 {
//Wait for a while.
time.Sleep(time.Duration(diff) * time.Second)
}
//Trigger the first tick.
alp.evaluation <- EvaluationResult{}
//Start the ticker for repeat checking.
alp.ticker = time.NewTicker(alp.config.Duration)
for {
select {
case now := <-alp.ticker.C:
{
time := now.UTC().Unix()
if alp.config.EndTimestamp > 0 && time >= alp.config.EndTimestamp {
//Ploicy is done.
alp.done <- true
alp.isEnabled = false
if alp.ticker != nil {
alp.ticker.Stop()
}
alp.ticker = nil
return
}
res := EvaluationResult{}
alp.evaluation <- res
}
case <-alp.terminator:
return
}
}
}()
//Enabled
alp.isEnabled = true
return alp.evaluation
}

View File

@ -0,0 +1,120 @@
package policy
import (
"testing"
"time"
)
type fakeTask struct {
number int
}
func (ft *fakeTask) TaskName() string {
return "for testing"
}
func (ft *fakeTask) Run() error {
ft.number++
return nil
}
func TestBasic(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{})
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
t.Fail()
}
if tp.GetConfig() == nil {
t.Fail()
}
if tp.Name() != "Alternate Policy" {
t.Fail()
}
tks := tp.Tasks()
if tks == nil || len(tks) != 1 {
t.Fail()
}
}
func TestEvaluatePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
StartTimestamp: -1,
EndTimestamp: now.Add(3 * time.Second).Unix(),
})
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
t.Fail()
}
ch := tp.Evaluate()
done := tp.Done()
counter := 0
READ_SIGNAL:
for {
select {
case <-ch:
counter++
case <-done:
break READ_SIGNAL
case <-time.After(5 * time.Second):
t.Fail()
return
}
}
if counter != 2 {
t.Fail()
}
}
func TestDisablePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
StartTimestamp: -1,
EndTimestamp: -1,
})
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
t.Fail()
}
ch := tp.Evaluate()
counter := 0
terminate := make(chan bool)
defer func() {
terminate <- true
}()
go func() {
for {
select {
case <-ch:
counter++
case <-terminate:
return
case <-time.After(6 * time.Second):
return
}
}
}()
time.Sleep(2 * time.Second)
if tp.Disable() != nil {
t.Fatal("Failed to disable policy")
}
//Waiting for everything is stabel
<-time.After(1 * time.Second)
//Copy value
copiedCounter := counter
time.Sleep(2 * time.Second)
if counter != copiedCounter {
t.Fatalf("Policy is still running after calling Disable() %d=%d", copiedCounter, counter)
}
}

View File

@ -0,0 +1,15 @@
package policy
//EvaluationResult is defined to carry the policy evaluated result.
//
//Filed 'Result' is optional.
//Filed 'Error' is optional
//
type EvaluationResult struct {
//Policy is successfully evaluated and the related information can
//be contained in Result if have.
Result interface{}
//Policy is failed to evaluated.
Error error
}

View File

@ -0,0 +1,37 @@
package policy
import "github.com/vmware/harbor/src/common/scheduler/task"
//Policy is an if-then logic to determine how the attached tasks should be
//executed based on the evaluation result of the defined conditions.
//E.g:
// Daily execute TASK between 2017/06/24 and 2018/06/23
// Execute TASK at 2017/09/01 14:30:00
//
//Each policy should have a name to identify itself.
//Please be aware that policy with no tasks will be treated as invalid.
//
type Policy interface {
//Return the name of the policy.
Name() string
//Return the attached tasks with this policy.
Tasks() []task.Task
//Attach tasks to this policy
AttachTasks(...task.Task) error
//Done will setup a channel for other components to check whether or not
//the policy is completed. Possibly designed for the none loop policy.
Done() chan bool
//Evaluate the policy based on its definition and return the result via
//result channel. Policy is enabled after it is evaluated.
//Make sure Evaluate is idempotent, that means one policy can be only enabled
//only once even if Evaluate is called more than one times.
Evaluate() chan EvaluationResult
//Disable the enabled policy and release all the allocated resources.
//Disable should also send signal to the terminated channel which returned by Done.
Disable() error
}

View File

@ -0,0 +1,258 @@
package scheduler
import "github.com/vmware/harbor/src/common/scheduler/policy"
import "github.com/vmware/harbor/src/common/utils/log"
import "errors"
import "strings"
import "fmt"
import "reflect"
import "time"
const (
defaultQueueSize = 10
statSchedulePolicy = "Schedule Policy"
statUnSchedulePolicy = "Unschedule Policy"
statTaskRun = "Task Run"
statTaskComplete = "Task Complete"
statTaskFail = "Task Fail"
)
//StatItem is defined for the stat metrics.
type StatItem struct {
//Metrics catalog
Type string
//The stat value
Value uint32
//Attach some other info
Attachment interface{}
}
//StatSummary is used to collect some metrics of scheduler.
type StatSummary struct {
//Count of scheduled policy
PolicyCount uint32
//Total count of tasks
Tasks uint32
//Count of successfully complete tasks
CompletedTasks uint32
//Count of tasks with errors
TasksWithError uint32
}
//Configuration defines configuration of Scheduler.
type Configuration struct {
QueueSize uint8
}
//Scheduler is designed for scheduling policies.
type Scheduler struct {
//Related configuration options for scheduler.
config *Configuration
//Store to keep the references of scheduled policies.
policies Store
//Queue for accepting the scheduling polices.
scheduleQueue chan policy.Policy
//Queue for receiving policy unschedule request or complete signal.
unscheduleQueue chan string
//Channel for receiving stat metrics.
statChan chan *StatItem
//Channel for terminate scheduler damon.
terminateChan chan bool
//The stat metrics of scheduler.
stats *StatSummary
//To indicate whether scheduler is stopped or not
stopped bool
}
//DefaultScheduler is a default scheduler.
var DefaultScheduler = NewScheduler(nil)
//NewScheduler is constructor for creating a scheduler.
func NewScheduler(config *Configuration) *Scheduler {
var qSize uint8 = defaultQueueSize
if config != nil && config.QueueSize > 0 {
qSize = config.QueueSize
}
sq := make(chan policy.Policy, qSize)
usq := make(chan string, qSize)
stChan := make(chan *StatItem, 4)
tc := make(chan bool, 2)
store := NewConcurrentStore(10)
return &Scheduler{
config: config,
policies: store,
scheduleQueue: sq,
unscheduleQueue: usq,
statChan: stChan,
terminateChan: tc,
stats: &StatSummary{
PolicyCount: 0,
Tasks: 0,
CompletedTasks: 0,
TasksWithError: 0,
},
stopped: true,
}
}
//Start the scheduler damon.
func (sch *Scheduler) Start() {
if !sch.stopped {
return
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in scheduler:%s\n", r)
}
}()
defer func() {
sch.stopped = true
}()
for {
select {
case p := <-sch.scheduleQueue:
//Schedule the policy.
watcher := NewWatcher(p, sch.statChan, sch.unscheduleQueue)
//Keep the policy for future use after it's successfully scheduled.
sch.policies.Put(p.Name(), watcher)
//Enable it.
watcher.Start()
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
case name := <-sch.unscheduleQueue:
//Find the watcher.
watcher := sch.policies.Remove(name)
if watcher != nil && watcher.IsRunning() {
watcher.Stop()
}
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
case <-sch.terminateChan:
//Exit
return
case stat := <-sch.statChan:
{
switch stat.Type {
case statSchedulePolicy:
sch.stats.PolicyCount += stat.Value
break
case statUnSchedulePolicy:
sch.stats.PolicyCount -= stat.Value
break
case statTaskRun:
sch.stats.Tasks += stat.Value
break
case statTaskComplete:
sch.stats.CompletedTasks += stat.Value
break
case statTaskFail:
sch.stats.TasksWithError += stat.Value
break
default:
break
}
log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n",
sch.stats.PolicyCount,
sch.stats.Tasks,
sch.stats.CompletedTasks,
sch.stats.TasksWithError)
if stat.Attachment != nil &&
reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" {
log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error())
}
}
}
}
}()
sch.stopped = false
log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339))
}
//Stop the scheduler damon.
func (sch *Scheduler) Stop() {
if sch.stopped {
return
}
//Terminate damon firstly to stop receiving signals.
sch.terminateChan <- true
//Stop all watchers.
for _, wt := range sch.policies.GetAll() {
wt.Stop()
}
//Clear resources
sch.policies.Clear()
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
}
//Schedule and enable the policy.
func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
if scheduledPolicy == nil {
return errors.New("nil is not Policy object")
}
if strings.TrimSpace(scheduledPolicy.Name()) == "" {
return errors.New("Policy should be assigned a name")
}
tasks := scheduledPolicy.Tasks()
if tasks == nil || len(tasks) == 0 {
return errors.New("Policy must attach task(s)")
}
if sch.policies.Exists(scheduledPolicy.Name()) {
return errors.New("Duplicated policy")
}
//Schedule the policy.
sch.scheduleQueue <- scheduledPolicy
return nil
}
//UnSchedule the specified policy from the enabled policies list.
func (sch *Scheduler) UnSchedule(policyName string) error {
if strings.TrimSpace(policyName) == "" {
return errors.New("Empty policy name is invalid")
}
if !sch.policies.Exists(policyName) {
return fmt.Errorf("Policy %s is not existing", policyName)
}
//Unschedule the policy.
sch.unscheduleQueue <- policyName
return nil
}
//IsStopped to indicate whether the scheduler is stopped
func (sch *Scheduler) IsStopped() bool {
return sch.stopped
}

View File

@ -0,0 +1,140 @@
package scheduler
import "sync"
import "strings"
const defaultSize = 10
//Store define the basic operations for storing and managing policy watcher.
//The concrete implementation should consider concurrent supporting scenario.
//
type Store interface {
//Put a new policy in.
Put(key string, value *Watcher)
//Get the corresponding policy with the key.
Get(key string) *Watcher
//Check if the key existing in the store.
Exists(key string) bool
//Remove the specified policy and return its reference.
Remove(key string) *Watcher
//Size return the total count of items in store.
Size() uint32
//Get all the items in the store.
GetAll() []*Watcher
//Clear store.
Clear()
}
//ConcurrentStore implements Store interface and supports concurrent operations.
type ConcurrentStore struct {
//Read-write mutex to synchronize the data map.
mutex *sync.RWMutex
//Map used to keep the policy list.
data map[string]*Watcher
}
//NewConcurrentStore is used to create a new store and return the pointer reference.
func NewConcurrentStore(initialSize uint32) *ConcurrentStore {
var initSize uint32 = defaultSize
if initialSize > 0 {
initSize = initialSize
}
mutex := new(sync.RWMutex)
data := make(map[string]*Watcher, initSize)
return &ConcurrentStore{mutex, data}
}
//Put a policy into store.
func (cs *ConcurrentStore) Put(key string, value *Watcher) {
if strings.TrimSpace(key) == "" || value == nil {
return
}
defer cs.mutex.Unlock()
cs.mutex.Lock()
cs.data[key] = value
}
//Get policy via key.
func (cs *ConcurrentStore) Get(key string) *Watcher {
if strings.TrimSpace(key) == "" {
return nil
}
defer cs.mutex.RUnlock()
cs.mutex.RLock()
return cs.data[key]
}
//Exists is used to check whether or not the key exists in store.
func (cs *ConcurrentStore) Exists(key string) bool {
if strings.TrimSpace(key) == "" {
return false
}
defer cs.mutex.RUnlock()
cs.mutex.RLock()
_, ok := cs.data[key]
return ok
}
//Remove is to delete the specified policy.
func (cs *ConcurrentStore) Remove(key string) *Watcher {
if !cs.Exists(key) {
return nil
}
defer cs.mutex.Unlock()
cs.mutex.Lock()
if wt, ok := cs.data[key]; ok {
delete(cs.data, key)
return wt
}
return nil
}
//Size return the total count of items in store.
func (cs *ConcurrentStore) Size() uint32 {
return (uint32)(len(cs.data))
}
//Get all the items of store.
func (cs *ConcurrentStore) GetAll() []*Watcher {
all := []*Watcher{}
defer cs.mutex.RUnlock()
cs.mutex.RLock()
for _, v := range cs.data {
all = append(all, v)
}
return all
}
//Clear all the items in store.
func (cs *ConcurrentStore) Clear() {
if cs.Size() == 0 {
return
}
defer cs.mutex.Unlock()
cs.mutex.Lock()
for k := range cs.data {
delete(cs.data, k)
}
}

View File

@ -0,0 +1,71 @@
package scheduler
import (
"testing"
)
func TestPut(t *testing.T) {
store := NewConcurrentStore(10)
if store == nil {
t.Fatal("Failed to creat store instance")
}
store.Put("testing", NewWatcher(nil, nil, nil))
if store.Size() != 1 {
t.Fail()
}
}
func TestGet(t *testing.T) {
store := NewConcurrentStore(10)
if store == nil {
t.Fatal("Failed to creat store instance")
}
store.Put("testing", NewWatcher(nil, nil, nil))
w := store.Get("testing")
if w == nil {
t.Fail()
}
}
func TestRemove(t *testing.T) {
store := NewConcurrentStore(10)
if store == nil {
t.Fatal("Failed to creat store instance")
}
store.Put("testing", NewWatcher(nil, nil, nil))
if !store.Exists("testing") {
t.Fail()
}
w := store.Remove("testing")
if w == nil {
t.Fail()
}
}
func TestExisting(t *testing.T) {
store := NewConcurrentStore(10)
if store == nil {
t.Fatal("Failed to creat store instance")
}
store.Put("testing", NewWatcher(nil, nil, nil))
if !store.Exists("testing") {
t.Fail()
}
if store.Exists("fake_key") {
t.Fail()
}
}
func TestGetAll(t *testing.T) {
store := NewConcurrentStore(10)
if store == nil {
t.Fatal("Failed to creat store instance")
}
store.Put("testing", NewWatcher(nil, nil, nil))
store.Put("testing2", NewWatcher(nil, nil, nil))
list := store.GetAll()
if list == nil || len(list) != 2 {
t.Fail()
}
}

View File

@ -0,0 +1,142 @@
package scheduler
import (
"testing"
"time"
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task"
)
type fakePolicy struct {
tasks []task.Task
done chan bool
evaluation chan policy.EvaluationResult
terminate chan bool
ticker *time.Ticker
}
func (fp *fakePolicy) Name() string {
return "testing policy"
}
func (fp *fakePolicy) Tasks() []task.Task {
return fp.tasks
}
func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error {
fp.tasks = append(fp.tasks, tasks...)
return nil
}
func (fp *fakePolicy) Done() chan bool {
return fp.done
}
func (fp *fakePolicy) Evaluate() chan policy.EvaluationResult {
fp.evaluation = make(chan policy.EvaluationResult, 2)
fp.done = make(chan bool)
fp.terminate = make(chan bool)
fp.evaluation <- policy.EvaluationResult{}
go func() {
fp.ticker = time.NewTicker(1 * time.Second)
for {
select {
case <-fp.terminate:
return
case <-fp.ticker.C:
fp.evaluation <- policy.EvaluationResult{}
}
}
}()
return fp.evaluation
}
func (fp *fakePolicy) Disable() error {
if fp.ticker != nil {
fp.ticker.Stop()
}
fp.terminate <- true
return nil
}
type fakeTask struct {
number int
}
func (ft *fakeTask) TaskName() string {
return "for testing"
}
func (ft *fakeTask) Run() error {
ft.number++
return nil
}
//Wacher will be tested together with scheduler.
func TestScheduler(t *testing.T) {
DefaultScheduler.Start()
if DefaultScheduler.policies.Size() != 0 {
t.Fail()
}
if DefaultScheduler.stats.PolicyCount != 0 {
t.Fail()
}
if DefaultScheduler.IsStopped() {
t.Fatal("Scheduler is not started")
}
fp := &fakePolicy{
tasks: []task.Task{},
}
fk := &fakeTask{number: 100}
fp.AttachTasks(fk)
if DefaultScheduler.Schedule(fp) != nil {
t.Fatal("Schedule policy failed")
}
//Waiting for everything is stable
time.Sleep(1 * time.Second)
if DefaultScheduler.policies.Size() == 0 {
t.Fatal("No policy in the store after calling Schedule()")
}
if DefaultScheduler.stats.PolicyCount != 1 {
t.Fatal("Policy stats do not match")
}
time.Sleep(2 * time.Second)
if fk.number == 100 {
t.Fatal("Task is not triggered")
}
if DefaultScheduler.stats.Tasks == 0 {
t.Fail()
}
if DefaultScheduler.stats.CompletedTasks == 0 {
t.Fail()
}
if DefaultScheduler.UnSchedule(fp.Name()) != nil {
t.Fatal("Unschedule policy failed")
}
//Waiting for everything is stable
time.Sleep(1 * time.Second)
if DefaultScheduler.stats.PolicyCount != 0 {
t.Fatal("Policy count does not match after calling UnSchedule()")
}
copiedValue := DefaultScheduler.stats.CompletedTasks
<-time.After(2 * time.Second)
if copiedValue != DefaultScheduler.stats.CompletedTasks {
t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", copiedValue, DefaultScheduler.stats.CompletedTasks)
}
DefaultScheduler.Stop()
if DefaultScheduler.policies.Size() != 0 {
t.Fatal("Scheduler is not cleared after stopping")
}
}

View File

@ -0,0 +1,21 @@
package task
import "github.com/vmware/harbor/src/ui/utils"
//ScanAllTask is task of scanning all tags.
type ScanAllTask struct{}
//NewScanAllTask is constructor of creating ScanAllTask.
func NewScanAllTask() *ScanAllTask {
return &ScanAllTask{}
}
//TaskName returns the name of the task.
func (sat *ScanAllTask) TaskName() string {
return "scan all"
}
//Run the actions.
func (sat *ScanAllTask) Run() error {
return utils.ScanAllImages()
}

View File

@ -0,0 +1,14 @@
package task
import "testing"
func TestTask(t *testing.T) {
tk := NewScanAllTask()
if tk == nil {
t.Fail()
}
if tk.TaskName() != "scan all" {
t.Fail()
}
}

View File

@ -0,0 +1,10 @@
package task
//Task is used to synchronously run specific action(s).
type Task interface {
//Name of the task.
TaskName() string
//Run the concrete code here
Run() error
}

View File

@ -0,0 +1,128 @@
package scheduler
import (
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task"
"github.com/vmware/harbor/src/common/utils/log"
)
//Watcher is an asynchronous runner to provide an evaluation environment for the policy.
type Watcher struct {
//The target policy.
p policy.Policy
//The channel for receive stop signal.
cmdChan chan bool
//Indicate whether the watch is started and running.
isRunning bool
//Report stats to scheduler.
stats chan *StatItem
//If policy is automatically completed, report the policy to scheduler.
doneChan chan string
}
//NewWatcher is used as a constructor.
func NewWatcher(p policy.Policy, st chan *StatItem, done chan string) *Watcher {
return &Watcher{
p: p,
cmdChan: make(chan bool),
isRunning: false,
stats: st,
doneChan: done,
}
}
//Start the running.
func (wc *Watcher) Start() {
if wc.isRunning {
return
}
if wc.p == nil {
return
}
go func(pl policy.Policy) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in watcher:%s\n", r)
}
}()
evalChan := pl.Evaluate()
done := pl.Done()
for {
select {
case <-evalChan:
{
//Start to run the attached tasks.
for _, t := range pl.Tasks() {
go func(tk task.Task) {
defer func() {
if r := recover(); r != nil {
log.Errorf("Runtime error in task execution:%s\n", r)
}
}()
err := tk.Run()
//Report task execution stats.
st := &StatItem{statTaskComplete, 1, err}
if err != nil {
st.Type = statTaskFail
}
if wc.stats != nil {
wc.stats <- st
}
}(t)
//Report task run stats.
st := &StatItem{statTaskRun, 1, nil}
if wc.stats != nil {
wc.stats <- st
}
}
}
case <-done:
{
//Policy is automatically completed.
wc.isRunning = false
//Report policy change stats.
wc.doneChan <- wc.p.Name()
return
}
case <-wc.cmdChan:
//Exit goroutine.
return
}
}
}(wc.p)
wc.isRunning = true
}
//Stop the running.
func (wc *Watcher) Stop() {
if !wc.isRunning {
return
}
//Disable policy.
if wc.p != nil {
wc.p.Disable()
}
//Stop watcher.
wc.cmdChan <- true
wc.isRunning = false
}
//IsRunning to indicate if the watcher is still running.
func (wc *Watcher) IsRunning() bool {
return wc.isRunning
}