mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-26 09:31:24 +01:00
Merge pull request #6043 from reasonerjt/remove-local-sched
Remove the local scheduler
This commit is contained in:
commit
f131196781
@ -1,251 +0,0 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/scheduler/task"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
)
|
||||
|
||||
const (
|
||||
oneDay = 24 * 3600
|
||||
)
|
||||
|
||||
// AlternatePolicyConfiguration store the related configurations for alternate policy.
|
||||
type AlternatePolicyConfiguration struct {
|
||||
// Duration is the interval of executing attached tasks.
|
||||
// E.g: 24*3600 for daily
|
||||
// 7*24*3600 for weekly
|
||||
Duration time.Duration
|
||||
|
||||
// An integer to indicate the the weekday of the week. Please be noted that Sunday is 7.
|
||||
// Use default value 0 to indicate weekday is not set.
|
||||
// To support by weekly function.
|
||||
Weekday int8
|
||||
|
||||
// OffsetTime is the execution time point of each turn
|
||||
// It's a number to indicate the seconds offset to the 00:00 of UTC time.
|
||||
OffsetTime int64
|
||||
}
|
||||
|
||||
// AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
|
||||
type AlternatePolicy struct {
|
||||
// To sync the related operations.
|
||||
*sync.RWMutex
|
||||
|
||||
// Keep the attached tasks.
|
||||
tasks task.Store
|
||||
|
||||
// Policy configurations.
|
||||
config *AlternatePolicyConfiguration
|
||||
|
||||
// To indicated whether policy is enabled or not.
|
||||
isEnabled bool
|
||||
|
||||
// Channel used to send evaluation result signals.
|
||||
evaluation chan bool
|
||||
|
||||
// Channel used to notify policy termination.
|
||||
done chan bool
|
||||
|
||||
// Channel used to receive terminate signal.
|
||||
terminator chan bool
|
||||
|
||||
// Unique name of this policy to support multiple instances
|
||||
name string
|
||||
}
|
||||
|
||||
// NewAlternatePolicy is constructor of creating AlternatePolicy.
|
||||
// Accept name and configuration as parameters.
|
||||
func NewAlternatePolicy(name string, config *AlternatePolicyConfiguration) *AlternatePolicy {
|
||||
return &AlternatePolicy{
|
||||
RWMutex: new(sync.RWMutex),
|
||||
tasks: task.NewDefaultStore(),
|
||||
config: config,
|
||||
isEnabled: false,
|
||||
terminator: make(chan bool),
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// GetConfig returns the current configuration options of this policy.
|
||||
func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration {
|
||||
return alp.config
|
||||
}
|
||||
|
||||
// Name is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Name() string {
|
||||
return alp.name
|
||||
}
|
||||
|
||||
// Tasks is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Tasks() []task.Task {
|
||||
return alp.tasks.GetTasks()
|
||||
}
|
||||
|
||||
// Done is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Done() <-chan bool {
|
||||
return alp.done
|
||||
}
|
||||
|
||||
// AttachTasks is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
|
||||
if len(tasks) == 0 {
|
||||
return errors.New("No tasks can be attached")
|
||||
}
|
||||
|
||||
alp.tasks.AddTasks(tasks...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Disable is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Disable() error {
|
||||
alp.Lock()
|
||||
if !alp.isEnabled {
|
||||
alp.Unlock()
|
||||
return fmt.Errorf("Instance of policy %s is not enabled", alp.Name())
|
||||
}
|
||||
|
||||
// Set state to disabled
|
||||
alp.isEnabled = false
|
||||
alp.Unlock()
|
||||
|
||||
// Stop the evaluation goroutine
|
||||
alp.terminator <- true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Evaluate is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
|
||||
// Lock for state changing
|
||||
defer alp.Unlock()
|
||||
alp.Lock()
|
||||
|
||||
// Check if configuration is valid
|
||||
if !alp.isValidConfig() {
|
||||
return nil, errors.New("Policy configuration is not valid")
|
||||
}
|
||||
|
||||
// Check if policy instance is still running
|
||||
if alp.isEnabled {
|
||||
return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name())
|
||||
}
|
||||
|
||||
// Keep idempotent
|
||||
if alp.evaluation != nil {
|
||||
return alp.evaluation, nil
|
||||
}
|
||||
|
||||
alp.done = make(chan bool)
|
||||
alp.evaluation = make(chan bool)
|
||||
|
||||
go func() {
|
||||
var (
|
||||
waitingTime int64
|
||||
)
|
||||
timeNow := time.Now().UTC()
|
||||
|
||||
// Reach the execution time point?
|
||||
// Weekday is set
|
||||
if alp.config.Weekday > 0 {
|
||||
targetWeekday := (alp.config.Weekday + 7) % 7
|
||||
currentWeekday := timeNow.Weekday()
|
||||
weekdayDiff := (int)(targetWeekday - (int8)(currentWeekday))
|
||||
if weekdayDiff < 0 {
|
||||
weekdayDiff += 7
|
||||
}
|
||||
waitingTime = (int64)(weekdayDiff * oneDay)
|
||||
}
|
||||
|
||||
// Time
|
||||
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
|
||||
diff := alp.config.OffsetTime - utcTime
|
||||
if waitingTime > 0 {
|
||||
waitingTime += diff
|
||||
} else {
|
||||
waitingTime = diff
|
||||
if waitingTime < 0 {
|
||||
waitingTime += oneDay
|
||||
}
|
||||
}
|
||||
|
||||
// Let's wait for a while
|
||||
if waitingTime > 0 {
|
||||
// Wait for a while.
|
||||
log.Infof("Waiting for %d seconds after comparing offset %d and utc time %d\n", diff, alp.config.OffsetTime, utcTime)
|
||||
select {
|
||||
case <-time.After(time.Duration(waitingTime) * time.Second):
|
||||
case <-alp.terminator:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger the first tick.
|
||||
alp.evaluation <- true
|
||||
|
||||
// Start the ticker for repeat checking.
|
||||
tk := time.NewTicker(alp.config.Duration)
|
||||
defer func() {
|
||||
if tk != nil {
|
||||
tk.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tk.C:
|
||||
if alp.IsEnabled() {
|
||||
alp.evaluation <- true
|
||||
}
|
||||
case <-alp.terminator:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Enabled
|
||||
alp.isEnabled = true
|
||||
|
||||
return alp.evaluation, nil
|
||||
}
|
||||
|
||||
// Equal is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) Equal(p Policy) bool {
|
||||
if p == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
pl, ok := p.(*AlternatePolicy)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
cfg := pl.GetConfig()
|
||||
cfg2 := alp.GetConfig()
|
||||
if (cfg == nil && cfg2 != nil) || (cfg != nil && cfg2 == nil) {
|
||||
return false
|
||||
}
|
||||
|
||||
return cfg == nil ||
|
||||
(cfg.Duration == cfg2.Duration &&
|
||||
cfg.OffsetTime == cfg2.OffsetTime &&
|
||||
cfg.Weekday == cfg2.Weekday)
|
||||
}
|
||||
|
||||
// IsEnabled is an implementation of same method in policy interface.
|
||||
func (alp *AlternatePolicy) IsEnabled() bool {
|
||||
defer alp.RUnlock()
|
||||
alp.RLock()
|
||||
|
||||
return alp.isEnabled
|
||||
}
|
||||
|
||||
// Check if the config is valid. At least it should have the configurations for supporting daily policy.
|
||||
func (alp *AlternatePolicy) isValidConfig() bool {
|
||||
return alp.config != nil && alp.config.Duration > 0 && alp.config.OffsetTime >= 0
|
||||
}
|
@ -1,149 +0,0 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
testPolicyName = "TestingPolicy"
|
||||
)
|
||||
|
||||
type fakeTask struct {
|
||||
number int32
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Name() string {
|
||||
return "for testing"
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Run() error {
|
||||
atomic.AddInt32(&(ft.number), 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Number() int32 {
|
||||
return atomic.LoadInt32(&ft.number)
|
||||
}
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{})
|
||||
err := tp.AttachTasks(&fakeTask{number: 100})
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if tp.GetConfig() == nil {
|
||||
t.Fatal("nil config")
|
||||
}
|
||||
|
||||
if tp.Name() != testPolicyName {
|
||||
t.Fatalf("Wrong name %s", tp.Name())
|
||||
}
|
||||
|
||||
tks := tp.Tasks()
|
||||
if tks == nil || len(tks) != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEvaluatePolicy(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
|
||||
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
|
||||
Duration: 1 * time.Second,
|
||||
OffsetTime: utcOffset + 1,
|
||||
})
|
||||
err := tp.AttachTasks(&fakeTask{number: 100})
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
ch, _ := tp.Evaluate()
|
||||
var counter int32
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-ch:
|
||||
atomic.AddInt32(&counter, 1)
|
||||
case <-time.After(2 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&counter) != 3 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
tp.Disable()
|
||||
}
|
||||
|
||||
func TestDisablePolicy(t *testing.T) {
|
||||
now := time.Now().UTC()
|
||||
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
|
||||
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
|
||||
Duration: 1 * time.Second,
|
||||
OffsetTime: utcOffset + 1,
|
||||
})
|
||||
err := tp.AttachTasks(&fakeTask{number: 100})
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
ch, _ := tp.Evaluate()
|
||||
var counter int32
|
||||
terminate := make(chan bool)
|
||||
defer func() {
|
||||
terminate <- true
|
||||
}()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
atomic.AddInt32(&counter, 1)
|
||||
case <-terminate:
|
||||
return
|
||||
case <-time.After(6 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
time.Sleep(2 * time.Second)
|
||||
if tp.Disable() != nil {
|
||||
t.Fatal("Failed to disable policy")
|
||||
}
|
||||
// Waiting for everything is stable
|
||||
<-time.After(1 * time.Second)
|
||||
// Copy value
|
||||
var copiedCounter int32
|
||||
atomic.StoreInt32(&copiedCounter, atomic.LoadInt32(&counter))
|
||||
time.Sleep(2 * time.Second)
|
||||
if atomic.LoadInt32(&counter) != atomic.LoadInt32(&copiedCounter) {
|
||||
t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter))
|
||||
}
|
||||
}
|
||||
|
||||
func TestPolicyEqual(t *testing.T) {
|
||||
tp1 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
|
||||
Duration: 1 * time.Second,
|
||||
OffsetTime: 8000,
|
||||
})
|
||||
|
||||
tp2 := NewAlternatePolicy(testPolicyName+"2", &AlternatePolicyConfiguration{
|
||||
Duration: 100 * time.Second,
|
||||
OffsetTime: 8000,
|
||||
})
|
||||
|
||||
if tp1.Equal(tp2) {
|
||||
t.Fatal("tp1 should not equal tp2")
|
||||
}
|
||||
|
||||
tp3 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
|
||||
Duration: 1 * time.Second,
|
||||
OffsetTime: 8000,
|
||||
})
|
||||
|
||||
if !tp1.Equal(tp3) {
|
||||
t.Fatal("tp1 should equal tp3")
|
||||
}
|
||||
}
|
@ -1,48 +0,0 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/scheduler/task"
|
||||
)
|
||||
|
||||
// Policy is an if-then logic to determine how the attached tasks should be
|
||||
// executed based on the evaluation result of the defined conditions.
|
||||
// E.g:
|
||||
// Daily execute TASK between 2017/06/24 and 2018/06/23
|
||||
// Execute TASK at 2017/09/01 14:30:00
|
||||
//
|
||||
// Each policy should have a name to identify itself.
|
||||
// Please be aware that policy with no tasks will be treated as invalid.
|
||||
//
|
||||
type Policy interface {
|
||||
// Name will return the name of the policy.
|
||||
// If the policy supports multiple instances, please make sure the name is unique as an UUID.
|
||||
Name() string
|
||||
|
||||
// Tasks will return the attached tasks with this policy.
|
||||
Tasks() []task.Task
|
||||
|
||||
// AttachTasks is to attach tasks to this policy
|
||||
AttachTasks(...task.Task) error
|
||||
|
||||
// Done will setup a channel for other components to check whether or not
|
||||
// the policy is completed. Possibly designed for the none loop policy.
|
||||
Done() <-chan bool
|
||||
|
||||
// Evaluate the policy based on its definition and return the result via
|
||||
// result channel. Policy is enabled after it is evaluated.
|
||||
// Make sure Evaluate is idempotent, that means one policy can be only enabled
|
||||
// only once even if Evaluate is called more than one times.
|
||||
Evaluate() (<-chan bool, error)
|
||||
|
||||
// Disable the enabled policy and release all the allocated resources.
|
||||
Disable() error
|
||||
|
||||
// Equal will compare the two policies based on related factors if existing such as confgiuration etc.
|
||||
// to determine whether the two policies are same ones or not. Please pay attention that, not every policy
|
||||
// needs to support this method. If no need, please directly return false to indicate each policies are
|
||||
// different.
|
||||
Equal(p Policy) bool
|
||||
|
||||
// IsEnabled is to indicate whether the policy is enabled or not (disabled).
|
||||
IsEnabled() bool
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// NewUUID will generate a new UUID.
|
||||
// Code copied from https://play.golang.org/p/4FkNSiUDMg
|
||||
func newUUID() (string, error) {
|
||||
uuid := make([]byte, 16)
|
||||
n, err := io.ReadFull(rand.Reader, uuid)
|
||||
if n != len(uuid) || err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
uuid[8] = uuid[8]&^0xc0 | 0x80
|
||||
uuid[6] = uuid[6]&^0xf0 | 0x40
|
||||
|
||||
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
|
||||
}
|
@ -1,311 +0,0 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/scheduler/policy"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultQueueSize = 10
|
||||
|
||||
statSchedulePolicy = "Schedule Policy"
|
||||
statUnSchedulePolicy = "Unschedule Policy"
|
||||
statTaskRun = "Task Run"
|
||||
statTaskComplete = "Task Complete"
|
||||
statTaskFail = "Task Fail"
|
||||
)
|
||||
|
||||
// StatItem is defined for the stat metrics.
|
||||
type StatItem struct {
|
||||
// Metrics catalog
|
||||
Type string
|
||||
|
||||
// The stat value
|
||||
Value uint32
|
||||
|
||||
// Attach some other info
|
||||
Attachment interface{}
|
||||
}
|
||||
|
||||
// StatSummary is used to collect some metrics of scheduler.
|
||||
type StatSummary struct {
|
||||
// Count of scheduled policy
|
||||
PolicyCount uint32
|
||||
|
||||
// Total count of tasks
|
||||
Tasks uint32
|
||||
|
||||
// Count of successfully complete tasks
|
||||
CompletedTasks uint32
|
||||
|
||||
// Count of tasks with errors
|
||||
TasksWithError uint32
|
||||
}
|
||||
|
||||
// Configuration defines configuration of Scheduler.
|
||||
type Configuration struct {
|
||||
QueueSize uint8
|
||||
}
|
||||
|
||||
// Scheduler is designed for scheduling policies.
|
||||
type Scheduler struct {
|
||||
// Mutex for sync controlling.
|
||||
*sync.RWMutex
|
||||
|
||||
// Related configuration options for scheduler.
|
||||
config *Configuration
|
||||
|
||||
// Store to keep the references of scheduled policies.
|
||||
policies Store
|
||||
|
||||
// Queue for receiving policy scheduling request
|
||||
scheduleQueue chan *Watcher
|
||||
|
||||
// Queue for receiving policy unscheduling request or complete signal.
|
||||
unscheduleQueue chan *Watcher
|
||||
|
||||
// Channel for receiving stat metrics.
|
||||
statChan chan *StatItem
|
||||
|
||||
// Channel for terminate scheduler damon.
|
||||
terminateChan chan bool
|
||||
|
||||
// The stat metrics of scheduler.
|
||||
stats *StatSummary
|
||||
|
||||
// To indicate whether scheduler is running or not
|
||||
isRunning bool
|
||||
}
|
||||
|
||||
// DefaultScheduler is a default scheduler.
|
||||
var DefaultScheduler = NewScheduler(nil)
|
||||
|
||||
// NewScheduler is constructor for creating a scheduler.
|
||||
func NewScheduler(config *Configuration) *Scheduler {
|
||||
var qSize uint8 = defaultQueueSize
|
||||
if config != nil && config.QueueSize > 0 {
|
||||
qSize = config.QueueSize
|
||||
}
|
||||
|
||||
sq := make(chan *Watcher, qSize)
|
||||
usq := make(chan *Watcher, qSize)
|
||||
stChan := make(chan *StatItem, 4)
|
||||
tc := make(chan bool, 1)
|
||||
|
||||
store := NewDefaultStore()
|
||||
return &Scheduler{
|
||||
RWMutex: new(sync.RWMutex),
|
||||
config: config,
|
||||
policies: store,
|
||||
scheduleQueue: sq,
|
||||
unscheduleQueue: usq,
|
||||
statChan: stChan,
|
||||
terminateChan: tc,
|
||||
stats: &StatSummary{
|
||||
PolicyCount: 0,
|
||||
Tasks: 0,
|
||||
CompletedTasks: 0,
|
||||
TasksWithError: 0,
|
||||
},
|
||||
isRunning: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Start the scheduler damon.
|
||||
func (sch *Scheduler) Start() {
|
||||
sch.Lock()
|
||||
defer sch.Unlock()
|
||||
|
||||
// If scheduler is already running
|
||||
if sch.isRunning {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Errorf("Runtime error in scheduler:%s\n", r)
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
// Clear resources
|
||||
sch.policies.Clear()
|
||||
log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339))
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-sch.terminateChan:
|
||||
// Exit
|
||||
return
|
||||
case wt := <-sch.scheduleQueue:
|
||||
// If status is stopped, no requests should be served
|
||||
if !sch.IsRunning() {
|
||||
continue
|
||||
}
|
||||
go func(watcher *Watcher) {
|
||||
if watcher != nil && watcher.p != nil {
|
||||
// Enable it.
|
||||
watcher.Start()
|
||||
|
||||
// Update stats and log info.
|
||||
log.Infof("Policy %s is scheduled", watcher.p.Name())
|
||||
sch.statChan <- &StatItem{statSchedulePolicy, 1, nil}
|
||||
}
|
||||
}(wt)
|
||||
case wt := <-sch.unscheduleQueue:
|
||||
// If status is stopped, no requests should be served
|
||||
if !sch.IsRunning() {
|
||||
continue
|
||||
}
|
||||
go func(watcher *Watcher) {
|
||||
if watcher != nil && watcher.IsRunning() {
|
||||
watcher.Stop()
|
||||
|
||||
// Update stats and log info.
|
||||
log.Infof("Policy %s is unscheduled", watcher.p.Name())
|
||||
sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil}
|
||||
}
|
||||
}(wt)
|
||||
case stat := <-sch.statChan:
|
||||
{
|
||||
// If status is stopped, no requests should be served
|
||||
if !sch.IsRunning() {
|
||||
continue
|
||||
}
|
||||
switch stat.Type {
|
||||
case statSchedulePolicy:
|
||||
sch.stats.PolicyCount += stat.Value
|
||||
break
|
||||
case statUnSchedulePolicy:
|
||||
sch.stats.PolicyCount -= stat.Value
|
||||
break
|
||||
case statTaskRun:
|
||||
sch.stats.Tasks += stat.Value
|
||||
break
|
||||
case statTaskComplete:
|
||||
sch.stats.CompletedTasks += stat.Value
|
||||
break
|
||||
case statTaskFail:
|
||||
sch.stats.TasksWithError += stat.Value
|
||||
break
|
||||
default:
|
||||
break
|
||||
}
|
||||
log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n",
|
||||
sch.stats.PolicyCount,
|
||||
sch.stats.Tasks,
|
||||
sch.stats.CompletedTasks,
|
||||
sch.stats.TasksWithError)
|
||||
|
||||
if stat.Attachment != nil &&
|
||||
reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" {
|
||||
log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
sch.isRunning = true
|
||||
log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339))
|
||||
}
|
||||
|
||||
// Stop the scheduler damon.
|
||||
func (sch *Scheduler) Stop() {
|
||||
// Lock for state changing
|
||||
sch.Lock()
|
||||
|
||||
// Check if the scheduler is running
|
||||
if !sch.isRunning {
|
||||
sch.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
sch.isRunning = false
|
||||
sch.Unlock()
|
||||
|
||||
// Terminate damon to stop receiving signals.
|
||||
sch.terminateChan <- true
|
||||
}
|
||||
|
||||
// Schedule and enable the policy.
|
||||
func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error {
|
||||
if scheduledPolicy == nil {
|
||||
return errors.New("nil is not Policy object")
|
||||
}
|
||||
|
||||
if strings.TrimSpace(scheduledPolicy.Name()) == "" {
|
||||
return errors.New("Policy should be assigned a name")
|
||||
}
|
||||
|
||||
tasks := scheduledPolicy.Tasks()
|
||||
if tasks == nil || len(tasks) == 0 {
|
||||
return errors.New("Policy must attach task(s)")
|
||||
}
|
||||
|
||||
// Try to schedule the policy.
|
||||
// Keep the policy for future use after it's successfully scheduled.
|
||||
watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue)
|
||||
if err := sch.policies.Put(scheduledPolicy.Name(), watcher); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Schedule the policy
|
||||
sch.scheduleQueue <- watcher
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnSchedule the specified policy from the enabled policies list.
|
||||
func (sch *Scheduler) UnSchedule(policyName string) error {
|
||||
if strings.TrimSpace(policyName) == "" {
|
||||
return errors.New("Empty policy name is invalid")
|
||||
}
|
||||
|
||||
// Find the watcher.
|
||||
watcher := sch.policies.Remove(policyName)
|
||||
if watcher == nil {
|
||||
return fmt.Errorf("Policy %s is not existing", policyName)
|
||||
}
|
||||
|
||||
// Unschedule the policy.
|
||||
sch.unscheduleQueue <- watcher
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning to indicate whether the scheduler is running.
|
||||
func (sch *Scheduler) IsRunning() bool {
|
||||
sch.RLock()
|
||||
defer sch.RUnlock()
|
||||
|
||||
return sch.isRunning
|
||||
}
|
||||
|
||||
// HasScheduled is to check whether the given policy has been scheduled or not.
|
||||
func (sch *Scheduler) HasScheduled(policyName string) bool {
|
||||
return sch.policies.Exists(policyName)
|
||||
}
|
||||
|
||||
// GetPolicy is used to get related policy reference by its name.
|
||||
func (sch *Scheduler) GetPolicy(policyName string) policy.Policy {
|
||||
wk := sch.policies.Get(policyName)
|
||||
if wk != nil {
|
||||
return wk.p
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PolicyCount returns the count of currently scheduled policies in the scheduler.
|
||||
func (sch *Scheduler) PolicyCount() uint32 {
|
||||
return sch.policies.Size()
|
||||
}
|
@ -1,145 +0,0 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Store define the basic operations for storing and managing policy watcher.
|
||||
type Store interface {
|
||||
// Put a new policy in.
|
||||
Put(key string, value *Watcher) error
|
||||
|
||||
// Get the corresponding policy with the key.
|
||||
Get(key string) *Watcher
|
||||
|
||||
// Exists is to check if the key existing in the store.
|
||||
Exists(key string) bool
|
||||
|
||||
// Remove the specified policy and return its reference.
|
||||
Remove(key string) *Watcher
|
||||
|
||||
// Size return the total count of items in store.
|
||||
Size() uint32
|
||||
|
||||
// GetAll is to get all the items in the store.
|
||||
GetAll() []*Watcher
|
||||
|
||||
// Clear store.
|
||||
Clear()
|
||||
}
|
||||
|
||||
// DefaultStore implements Store interface to keep the scheduled policies.
|
||||
// Not support concurrent sync.
|
||||
type DefaultStore struct {
|
||||
// Support sync locking
|
||||
*sync.RWMutex
|
||||
|
||||
// Map used to keep the policy list.
|
||||
data map[string]*Watcher
|
||||
}
|
||||
|
||||
// NewDefaultStore is used to create a new store and return the pointer reference.
|
||||
func NewDefaultStore() *DefaultStore {
|
||||
return &DefaultStore{new(sync.RWMutex), make(map[string]*Watcher)}
|
||||
}
|
||||
|
||||
// Put a policy into store.
|
||||
func (cs *DefaultStore) Put(key string, value *Watcher) error {
|
||||
if strings.TrimSpace(key) == "" || value == nil {
|
||||
return errors.New("Bad arguments")
|
||||
}
|
||||
|
||||
cs.Lock()
|
||||
defer cs.Unlock()
|
||||
|
||||
if _, ok := cs.data[key]; ok {
|
||||
return fmt.Errorf("Duplicayed policy with name %s", key)
|
||||
}
|
||||
|
||||
cs.data[key] = value
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get policy via key.
|
||||
func (cs *DefaultStore) Get(key string) *Watcher {
|
||||
if strings.TrimSpace(key) == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
cs.RLock()
|
||||
defer cs.RUnlock()
|
||||
|
||||
return cs.data[key]
|
||||
}
|
||||
|
||||
// Exists is used to check whether or not the key exists in store.
|
||||
func (cs *DefaultStore) Exists(key string) bool {
|
||||
if strings.TrimSpace(key) == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
cs.RLock()
|
||||
defer cs.RUnlock()
|
||||
|
||||
_, ok := cs.data[key]
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
// Remove is to delete the specified policy.
|
||||
func (cs *DefaultStore) Remove(key string) *Watcher {
|
||||
if strings.TrimSpace(key) == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
cs.Lock()
|
||||
defer cs.Unlock()
|
||||
|
||||
if wt, ok := cs.data[key]; ok {
|
||||
delete(cs.data, key)
|
||||
return wt
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Size return the total count of items in store.
|
||||
func (cs *DefaultStore) Size() uint32 {
|
||||
cs.RLock()
|
||||
defer cs.RUnlock()
|
||||
|
||||
return (uint32)(len(cs.data))
|
||||
}
|
||||
|
||||
// GetAll to get all the items of store.
|
||||
func (cs *DefaultStore) GetAll() []*Watcher {
|
||||
cs.RLock()
|
||||
defer cs.RUnlock()
|
||||
|
||||
all := []*Watcher{}
|
||||
|
||||
for _, v := range cs.data {
|
||||
all = append(all, v)
|
||||
}
|
||||
|
||||
return all
|
||||
}
|
||||
|
||||
// Clear all the items in store.
|
||||
func (cs *DefaultStore) Clear() {
|
||||
cs.Lock()
|
||||
defer cs.Unlock()
|
||||
|
||||
if (uint32)(len(cs.data)) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for k, v := range cs.data {
|
||||
delete(cs.data, k)
|
||||
v.Stop()
|
||||
}
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPut(t *testing.T) {
|
||||
store := NewDefaultStore()
|
||||
if store == nil {
|
||||
t.Fatal("Failed to creat store instance")
|
||||
}
|
||||
|
||||
store.Put("testing", NewWatcher(nil, nil, nil))
|
||||
if store.Size() != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
store := NewDefaultStore()
|
||||
if store == nil {
|
||||
t.Fatal("Failed to creat store instance")
|
||||
}
|
||||
store.Put("testing", NewWatcher(nil, nil, nil))
|
||||
w := store.Get("testing")
|
||||
if w == nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
store := NewDefaultStore()
|
||||
if store == nil {
|
||||
t.Fatal("Failed to creat store instance")
|
||||
}
|
||||
store.Put("testing", NewWatcher(nil, nil, nil))
|
||||
if !store.Exists("testing") {
|
||||
t.Fail()
|
||||
}
|
||||
w := store.Remove("testing")
|
||||
if w == nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestExisting(t *testing.T) {
|
||||
store := NewDefaultStore()
|
||||
if store == nil {
|
||||
t.Fatal("Failed to creat store instance")
|
||||
}
|
||||
store.Put("testing", NewWatcher(nil, nil, nil))
|
||||
if !store.Exists("testing") {
|
||||
t.Fail()
|
||||
}
|
||||
if store.Exists("fake_key") {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAll(t *testing.T) {
|
||||
store := NewDefaultStore()
|
||||
if store == nil {
|
||||
t.Fatal("Failed to creat store instance")
|
||||
}
|
||||
store.Put("testing", NewWatcher(nil, nil, nil))
|
||||
store.Put("testing2", NewWatcher(nil, nil, nil))
|
||||
list := store.GetAll()
|
||||
if list == nil || len(list) != 2 {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -1,149 +0,0 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/common/scheduler/policy"
|
||||
"github.com/goharbor/harbor/src/common/scheduler/task"
|
||||
)
|
||||
|
||||
type fakePolicy struct {
|
||||
tasks []task.Task
|
||||
done chan bool
|
||||
evaluation chan bool
|
||||
terminate chan bool
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Name() string {
|
||||
return "testing policy"
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Tasks() []task.Task {
|
||||
return fp.tasks
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error {
|
||||
fp.tasks = append(fp.tasks, tasks...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Done() <-chan bool {
|
||||
return fp.done
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Evaluate() (<-chan bool, error) {
|
||||
fp.evaluation = make(chan bool, 1)
|
||||
fp.done = make(chan bool)
|
||||
fp.terminate = make(chan bool)
|
||||
|
||||
fp.evaluation <- true
|
||||
go func() {
|
||||
fp.ticker = time.NewTicker(1 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-fp.terminate:
|
||||
return
|
||||
case <-fp.ticker.C:
|
||||
fp.evaluation <- true
|
||||
}
|
||||
}
|
||||
}()
|
||||
return fp.evaluation, nil
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Disable() error {
|
||||
if fp.ticker != nil {
|
||||
fp.ticker.Stop()
|
||||
}
|
||||
|
||||
fp.terminate <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) Equal(policy.Policy) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (fp *fakePolicy) IsEnabled() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type fakeTask struct {
|
||||
number int32
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Name() string {
|
||||
return "for testing"
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Run() error {
|
||||
atomic.AddInt32(&(ft.number), 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ft *fakeTask) Number() int32 {
|
||||
return atomic.LoadInt32(&(ft.number))
|
||||
}
|
||||
|
||||
// Wacher will be tested together with scheduler.
|
||||
func TestScheduler(t *testing.T) {
|
||||
DefaultScheduler.Start()
|
||||
if DefaultScheduler.policies.Size() != 0 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if DefaultScheduler.stats.PolicyCount != 0 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if !DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Scheduler is not started")
|
||||
}
|
||||
|
||||
fp := &fakePolicy{
|
||||
tasks: []task.Task{},
|
||||
}
|
||||
fk := &fakeTask{number: 100}
|
||||
fp.AttachTasks(fk)
|
||||
|
||||
if DefaultScheduler.Schedule(fp) != nil {
|
||||
t.Fatal("Schedule policy failed")
|
||||
}
|
||||
if DefaultScheduler.policies.Size() == 0 {
|
||||
t.Fatal("No policy in the store after calling Schedule()")
|
||||
}
|
||||
if DefaultScheduler.GetPolicy(fp.Name()) == nil {
|
||||
t.Fatal("Failed to get poicy by name")
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
if fk.Number() == 100 {
|
||||
t.Fatal("Task is not triggered")
|
||||
}
|
||||
|
||||
if DefaultScheduler.UnSchedule(fp.Name()) != nil {
|
||||
t.Fatal("Unschedule policy failed")
|
||||
}
|
||||
|
||||
if DefaultScheduler.PolicyCount() != 0 {
|
||||
t.Fatal("Policy count does not match after calling UnSchedule()")
|
||||
}
|
||||
|
||||
var copiedValue int32
|
||||
<-time.After(1 * time.Second)
|
||||
atomic.StoreInt32(&copiedValue, fk.Number())
|
||||
<-time.After(2 * time.Second)
|
||||
|
||||
if atomic.LoadInt32(&copiedValue) != fk.Number() {
|
||||
t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", atomic.LoadInt32(&copiedValue), fk.Number())
|
||||
}
|
||||
|
||||
DefaultScheduler.Stop()
|
||||
<-time.After(1 * time.Second)
|
||||
if DefaultScheduler.PolicyCount() != 0 || DefaultScheduler.IsRunning() {
|
||||
t.Fatal("Scheduler is still running after stopping")
|
||||
}
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/core/notifier"
|
||||
"github.com/goharbor/harbor/src/replication/event/notification"
|
||||
"github.com/goharbor/harbor/src/replication/event/topic"
|
||||
)
|
||||
|
||||
// Task is the task for triggering one replication
|
||||
type Task struct {
|
||||
PolicyID int64
|
||||
}
|
||||
|
||||
// NewTask is constructor of creating ReplicationTask
|
||||
func NewTask(policyID int64) *Task {
|
||||
return &Task{
|
||||
PolicyID: policyID,
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the name of this task
|
||||
func (t *Task) Name() string {
|
||||
return "replication"
|
||||
}
|
||||
|
||||
// Run the actions here
|
||||
func (t *Task) Run() error {
|
||||
return notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
|
||||
PolicyID: t.PolicyID,
|
||||
})
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
package replication
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestTask(t *testing.T) {
|
||||
tk := NewTask(1)
|
||||
if tk == nil {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if tk.Name() != "replication" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/core/utils"
|
||||
)
|
||||
|
||||
// ScanAllTask is task of scanning all tags.
|
||||
type ScanAllTask struct{}
|
||||
|
||||
// NewScanAllTask is constructor of creating ScanAllTask.
|
||||
func NewScanAllTask() *ScanAllTask {
|
||||
return &ScanAllTask{}
|
||||
}
|
||||
|
||||
// Name returns the name of the task.
|
||||
func (sat *ScanAllTask) Name() string {
|
||||
return "scan all"
|
||||
}
|
||||
|
||||
// Run the actions.
|
||||
func (sat *ScanAllTask) Run() error {
|
||||
return utils.ScanAllImages()
|
||||
}
|
@ -1,16 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestScanAllTask(t *testing.T) {
|
||||
tk := NewScanAllTask()
|
||||
if tk == nil {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if tk.Name() != "scan all" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -1,10 +0,0 @@
|
||||
package task
|
||||
|
||||
// Task is used to synchronously run specific action(s).
|
||||
type Task interface {
|
||||
// Name should return the name of the task.
|
||||
Name() string
|
||||
|
||||
// Run the concrete code here
|
||||
Run() error
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Store is designed to keep the tasks.
|
||||
type Store interface {
|
||||
// GetTasks return the current existing list in store.
|
||||
GetTasks() []Task
|
||||
|
||||
// AddTasks is used to append tasks to the list.
|
||||
AddTasks(tasks ...Task)
|
||||
}
|
||||
|
||||
// DefaultStore is the default implemetation of Store interface.
|
||||
type DefaultStore struct {
|
||||
// To sync the related operations.
|
||||
*sync.RWMutex
|
||||
|
||||
// The space to keep the tasks.
|
||||
tasks []Task
|
||||
}
|
||||
|
||||
// NewDefaultStore is constructor method for DefaultStore.
|
||||
func NewDefaultStore() *DefaultStore {
|
||||
return &DefaultStore{new(sync.RWMutex), []Task{}}
|
||||
}
|
||||
|
||||
// GetTasks implements the same method in Store interface.
|
||||
func (ds *DefaultStore) GetTasks() []Task {
|
||||
copyList := []Task{}
|
||||
|
||||
ds.RLock()
|
||||
defer ds.RUnlock()
|
||||
|
||||
if ds.tasks != nil && len(ds.tasks) > 0 {
|
||||
copyList = append(copyList, ds.tasks...)
|
||||
}
|
||||
|
||||
return copyList
|
||||
}
|
||||
|
||||
// AddTasks implements the same method in Store interface.
|
||||
func (ds *DefaultStore) AddTasks(tasks ...Task) {
|
||||
// Double confirm.
|
||||
if ds.tasks == nil {
|
||||
ds.tasks = []Task{}
|
||||
}
|
||||
|
||||
ds.Lock()
|
||||
defer ds.Unlock()
|
||||
|
||||
ds.tasks = append(ds.tasks, tasks...)
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTaskList(t *testing.T) {
|
||||
ds := NewDefaultStore()
|
||||
if ds.tasks == nil {
|
||||
t.Fatal("Failed to create store")
|
||||
}
|
||||
|
||||
go func() {
|
||||
var count int32
|
||||
for {
|
||||
ds.AddTasks(NewScanAllTask())
|
||||
atomic.AddInt32(&count, 1)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if atomic.LoadInt32(&count) > 9 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
var count int32
|
||||
for {
|
||||
ds.GetTasks()
|
||||
atomic.AddInt32(&count, 1)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if atomic.LoadInt32(&count) > 8 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-time.After(2 * time.Second)
|
||||
|
||||
var taskCount int32
|
||||
atomic.StoreInt32(&taskCount, (int32)(len(ds.GetTasks())))
|
||||
|
||||
if atomic.LoadInt32(&taskCount) != 10 {
|
||||
t.Fatalf("Expect %d tasks but got %d", 10, atomic.LoadInt32(&taskCount))
|
||||
}
|
||||
}
|
@ -1,162 +0,0 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/common/scheduler/policy"
|
||||
"github.com/goharbor/harbor/src/common/scheduler/task"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Watcher is an asynchronous runner to provide an evaluation environment for the policy.
|
||||
type Watcher struct {
|
||||
// Locker to sync related operations.
|
||||
*sync.RWMutex
|
||||
|
||||
// The target policy.
|
||||
p policy.Policy
|
||||
|
||||
// The channel for receive stop signal.
|
||||
cmdChan chan bool
|
||||
|
||||
// Indicate whether the watcher is started and running.
|
||||
isRunning bool
|
||||
|
||||
// Report stats to scheduler.
|
||||
stats chan *StatItem
|
||||
|
||||
// If policy is automatically completed, report the policy to scheduler.
|
||||
doneChan chan *Watcher
|
||||
}
|
||||
|
||||
// NewWatcher is used as a constructor.
|
||||
func NewWatcher(p policy.Policy, st chan *StatItem, done chan *Watcher) *Watcher {
|
||||
return &Watcher{
|
||||
RWMutex: new(sync.RWMutex),
|
||||
p: p,
|
||||
cmdChan: make(chan bool),
|
||||
isRunning: false,
|
||||
stats: st,
|
||||
doneChan: done,
|
||||
}
|
||||
}
|
||||
|
||||
// Start the running.
|
||||
func (wc *Watcher) Start() {
|
||||
// Lock for state changing
|
||||
wc.Lock()
|
||||
defer wc.Unlock()
|
||||
|
||||
if wc.isRunning {
|
||||
return
|
||||
}
|
||||
|
||||
if wc.p == nil {
|
||||
return
|
||||
}
|
||||
|
||||
go func(pl policy.Policy) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Errorf("Runtime error in watcher:%s\n", r)
|
||||
}
|
||||
}()
|
||||
|
||||
evalChan, err := pl.Evaluate()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error())
|
||||
return
|
||||
}
|
||||
done := pl.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-evalChan:
|
||||
{
|
||||
// If worker is not running, should not response any requests.
|
||||
if !wc.IsRunning() {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("Receive evaluation signal from policy '%s'\n", pl.Name())
|
||||
// Start to run the attached tasks.
|
||||
for _, t := range pl.Tasks() {
|
||||
go func(tk task.Task) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)}
|
||||
if wc.stats != nil {
|
||||
wc.stats <- st
|
||||
}
|
||||
}
|
||||
}()
|
||||
err := tk.Run()
|
||||
|
||||
// Report task execution stats.
|
||||
st := &StatItem{statTaskComplete, 1, err}
|
||||
if err != nil {
|
||||
st.Type = statTaskFail
|
||||
}
|
||||
if wc.stats != nil {
|
||||
wc.stats <- st
|
||||
}
|
||||
}(t)
|
||||
|
||||
// Report task run stats.
|
||||
st := &StatItem{statTaskRun, 1, nil}
|
||||
if wc.stats != nil {
|
||||
wc.stats <- st
|
||||
}
|
||||
}
|
||||
}
|
||||
case <-done:
|
||||
{
|
||||
// Policy is automatically completed.
|
||||
// Report policy change stats.
|
||||
if wc.doneChan != nil {
|
||||
wc.doneChan <- wc
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
case <-wc.cmdChan:
|
||||
// Exit goroutine.
|
||||
return
|
||||
}
|
||||
}
|
||||
}(wc.p)
|
||||
|
||||
wc.isRunning = true
|
||||
}
|
||||
|
||||
// Stop the running.
|
||||
func (wc *Watcher) Stop() {
|
||||
// Lock for state changing
|
||||
wc.Lock()
|
||||
if !wc.isRunning {
|
||||
wc.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
wc.isRunning = false
|
||||
wc.Unlock()
|
||||
|
||||
// Disable policy.
|
||||
if wc.p != nil {
|
||||
wc.p.Disable()
|
||||
}
|
||||
|
||||
// Stop watcher.
|
||||
wc.cmdChan <- true
|
||||
|
||||
log.Infof("Worker for policy %s is stopped.\n", wc.p.Name())
|
||||
}
|
||||
|
||||
// IsRunning to indicate if the watcher is still running.
|
||||
func (wc *Watcher) IsRunning() bool {
|
||||
wc.RLock()
|
||||
defer wc.RUnlock()
|
||||
|
||||
return wc.isRunning
|
||||
}
|
Loading…
Reference in New Issue
Block a user