Update policy scheduler according to comments

This commit is contained in:
Steven Zou 2017-07-07 21:02:16 +08:00
parent ff889cedde
commit 7ee052b9dd
9 changed files with 62 additions and 97 deletions

View File

@ -15,14 +15,6 @@ type AlternatePolicyConfiguration struct {
//OffsetTime is the execution time point of each turn //OffsetTime is the execution time point of each turn
//It's a number to indicate the seconds offset to the 00:00 of UTC time. //It's a number to indicate the seconds offset to the 00:00 of UTC time.
OffsetTime int64 OffsetTime int64
//StartTimestamp is the time should be later than start time.
//If set <=0 value, no limitation.
StartTimestamp int64
//EndTimestamp is the time should be earlier than end time.
//If set <=0 value, no limitation.
EndTimestamp int64
} }
//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope. //AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
@ -40,7 +32,7 @@ type AlternatePolicy struct {
isEnabled bool isEnabled bool
//Channel used to send evaluation result signals. //Channel used to send evaluation result signals.
evaluation chan EvaluationResult evaluation chan bool
//Channel used to notify policy termination. //Channel used to notify policy termination.
done chan bool done chan bool
@ -109,39 +101,21 @@ func (alp *AlternatePolicy) Disable() error {
} }
//Evaluate is an implementation of same method in policy interface. //Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() <-chan EvaluationResult { func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
//Keep idempotent //Keep idempotent
if alp.isEnabled && alp.evaluation != nil { if alp.isEnabled && alp.evaluation != nil {
return alp.evaluation return alp.evaluation, nil
} }
alp.done = make(chan bool) alp.done = make(chan bool)
alp.terminator = make(chan bool) alp.terminator = make(chan bool)
alp.evaluation = make(chan EvaluationResult) alp.evaluation = make(chan bool)
go func() { go func() {
defer func() { defer func() {
alp.isEnabled = false alp.isEnabled = false
}() }()
timeNow := time.Now().UTC() timeNow := time.Now().UTC()
timeSeconds := timeNow.Unix()
//Pre-check
//If now is still in the specified time scope.
if alp.config.EndTimestamp > 0 && timeSeconds >= alp.config.EndTimestamp {
//Invalid configuration, exit.
alp.done <- true
return
}
if alp.config.StartTimestamp > 0 && timeSeconds < alp.config.StartTimestamp {
//Let's hold on for a while.
forWhile := alp.config.StartTimestamp - timeSeconds
select {
case <-time.After(time.Duration(forWhile) * time.Second):
case <-alp.terminator:
return
}
}
//Reach the execution time point? //Reach the execution time point?
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60) utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
@ -159,29 +133,14 @@ func (alp *AlternatePolicy) Evaluate() <-chan EvaluationResult {
} }
//Trigger the first tick. //Trigger the first tick.
alp.evaluation <- EvaluationResult{} alp.evaluation <- true
//Start the ticker for repeat checking. //Start the ticker for repeat checking.
alp.ticker = time.NewTicker(alp.config.Duration) alp.ticker = time.NewTicker(alp.config.Duration)
for { for {
select { select {
case now := <-alp.ticker.C: case <-alp.ticker.C:
{ alp.evaluation <- true
time := now.UTC().Unix()
if alp.config.EndTimestamp > 0 && time >= alp.config.EndTimestamp {
//Ploicy is done.
alp.done <- true
if alp.ticker != nil {
alp.ticker.Stop()
}
alp.ticker = nil
return
}
res := EvaluationResult{}
alp.evaluation <- res
}
case <-alp.terminator: case <-alp.terminator:
return return
} }
@ -191,5 +150,5 @@ func (alp *AlternatePolicy) Evaluate() <-chan EvaluationResult {
//Enabled //Enabled
alp.isEnabled = true alp.isEnabled = true
return alp.evaluation return alp.evaluation, nil
} }

View File

@ -46,32 +46,28 @@ func TestEvaluatePolicy(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second, Duration: 1 * time.Second,
OffsetTime: utcOffset + 1, OffsetTime: utcOffset + 1,
StartTimestamp: -1,
EndTimestamp: now.Add(3 * time.Second).Unix(),
}) })
err := tp.AttachTasks(&fakeTask{number: 100}) err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil { if err != nil {
t.Fail() t.Fail()
} }
ch := tp.Evaluate() ch, _ := tp.Evaluate()
done := tp.Done()
counter := 0 counter := 0
READ_SIGNAL:
for { for i := 0; i < 3; i++ {
select { select {
case <-ch: case <-ch:
counter++ counter++
case <-done: case <-time.After(2 * time.Second):
break READ_SIGNAL continue
case <-time.After(5 * time.Second):
t.Fail()
return
} }
} }
if counter != 2 { if counter != 3 {
t.Fail() t.Fail()
} }
tp.Disable()
} }
func TestDisablePolicy(t *testing.T) { func TestDisablePolicy(t *testing.T) {
@ -80,14 +76,12 @@ func TestDisablePolicy(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
Duration: 1 * time.Second, Duration: 1 * time.Second,
OffsetTime: utcOffset + 1, OffsetTime: utcOffset + 1,
StartTimestamp: -1,
EndTimestamp: -1,
}) })
err := tp.AttachTasks(&fakeTask{number: 100}) err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil { if err != nil {
t.Fail() t.Fail()
} }
ch := tp.Evaluate() ch, _ := tp.Evaluate()
counter := 0 counter := 0
terminate := make(chan bool) terminate := make(chan bool)
defer func() { defer func() {
@ -109,7 +103,7 @@ func TestDisablePolicy(t *testing.T) {
if tp.Disable() != nil { if tp.Disable() != nil {
t.Fatal("Failed to disable policy") t.Fatal("Failed to disable policy")
} }
//Waiting for everything is stabel //Waiting for everything is stable
<-time.After(1 * time.Second) <-time.After(1 * time.Second)
//Copy value //Copy value
copiedCounter := counter copiedCounter := counter

View File

@ -1,15 +0,0 @@
package policy
//EvaluationResult is defined to carry the policy evaluated result.
//
//Filed 'Result' is optional.
//Filed 'Error' is optional
//
type EvaluationResult struct {
//Policy is successfully evaluated and the related information can
//be contained in Result if have.
Result interface{}
//Policy is failed to evaluated.
Error error
}

View File

@ -31,7 +31,7 @@ type Policy interface {
//result channel. Policy is enabled after it is evaluated. //result channel. Policy is enabled after it is evaluated.
//Make sure Evaluate is idempotent, that means one policy can be only enabled //Make sure Evaluate is idempotent, that means one policy can be only enabled
//only once even if Evaluate is called more than one times. //only once even if Evaluate is called more than one times.
Evaluate() <-chan EvaluationResult Evaluate() (<-chan bool, error)
//Disable the enabled policy and release all the allocated resources. //Disable the enabled policy and release all the allocated resources.
//Disable should also send signal to the terminated channel which returned by Done. //Disable should also send signal to the terminated channel which returned by Done.

View File

@ -4,14 +4,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task" "github.com/vmware/harbor/src/common/scheduler/task"
) )
type fakePolicy struct { type fakePolicy struct {
tasks []task.Task tasks []task.Task
done chan bool done chan bool
evaluation chan policy.EvaluationResult evaluation chan bool
terminate chan bool terminate chan bool
ticker *time.Ticker ticker *time.Ticker
} }
@ -33,12 +32,12 @@ func (fp *fakePolicy) Done() <-chan bool {
return fp.done return fp.done
} }
func (fp *fakePolicy) Evaluate() <-chan policy.EvaluationResult { func (fp *fakePolicy) Evaluate() (<-chan bool, error) {
fp.evaluation = make(chan policy.EvaluationResult, 2) fp.evaluation = make(chan bool, 1)
fp.done = make(chan bool) fp.done = make(chan bool)
fp.terminate = make(chan bool) fp.terminate = make(chan bool)
fp.evaluation <- policy.EvaluationResult{} fp.evaluation <- true
go func() { go func() {
fp.ticker = time.NewTicker(1 * time.Second) fp.ticker = time.NewTicker(1 * time.Second)
for { for {
@ -46,11 +45,11 @@ func (fp *fakePolicy) Evaluate() <-chan policy.EvaluationResult {
case <-fp.terminate: case <-fp.terminate:
return return
case <-fp.ticker.C: case <-fp.ticker.C:
fp.evaluation <- policy.EvaluationResult{} fp.evaluation <- true
} }
} }
}() }()
return fp.evaluation return fp.evaluation, nil
} }
func (fp *fakePolicy) Disable() error { func (fp *fakePolicy) Disable() error {
@ -136,7 +135,8 @@ func TestScheduler(t *testing.T) {
} }
DefaultScheduler.Stop() DefaultScheduler.Stop()
<-time.After(1 * time.Second)
if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() { if DefaultScheduler.policies.Size() != 0 || DefaultScheduler.IsRunning() {
t.Fatal("Scheduler is not cleared after stopping") t.Fatal("Scheduler is still running after stopping")
} }
} }

View File

@ -58,7 +58,11 @@ func (wc *Watcher) Start() {
wc.isRunning = false wc.isRunning = false
}() }()
evalChan := pl.Evaluate() evalChan, err := pl.Evaluate()
if err != nil {
log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error())
return
}
done := pl.Done() done := pl.Done()
for { for {

View File

@ -243,3 +243,23 @@ func TestParseHarborIDOrName(t *testing.T) {
assert.Equal(t, int64(0), id) assert.Equal(t, int64(0), id)
assert.Equal(t, "project", name) assert.Equal(t, "project", name)
} }
type testingStruct struct {
Name string
Count int
}
func TestConvertMapToStruct(t *testing.T) {
dataMap := make(map[string]interface{})
dataMap["Name"] = "testing"
dataMap["Count"] = 100
obj := &testingStruct{}
if err := ConvertMapToStruct(obj, dataMap); err != nil {
t.Fatal(err)
} else {
if obj.Name != "testing" || obj.Count != 100 {
t.Fail()
}
}
}

View File

@ -21,7 +21,7 @@ var jsonText = `
func TestWatchConfiguration(t *testing.T) { func TestWatchConfiguration(t *testing.T) {
now := time.Now().UTC() now := time.Now().UTC()
offset := (now.Hour+1)*3600 + now.Minute*60 offset := (now.Hour()+1)*3600 + now.Minute()*60
jsonT := strings.Replace(jsonText, "<PLACE_HOLDER>", strconv.Itoa(offset), -1) jsonT := strings.Replace(jsonText, "<PLACE_HOLDER>", strconv.Itoa(offset), -1)
v := make(map[string]interface{}) v := make(map[string]interface{})
if err := json.Unmarshal([]byte(jsonT), &v); err != nil { if err := json.Unmarshal([]byte(jsonT), &v); err != nil {

View File

@ -17,6 +17,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"reflect"
"github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
@ -111,11 +112,13 @@ func main() {
if scanAllPolicy.Type == notifier.PolicyTypeDaily { if scanAllPolicy.Type == notifier.PolicyTypeDaily {
dailyTime := 0 dailyTime := 0
if t, ok := scanAllPolicy.Parm["daily_time"]; ok { if t, ok := scanAllPolicy.Parm["daily_time"]; ok {
dailyTime = t if reflect.TypeOf(t).Kind() == reflect.Int {
dailyTime = t.(int)
}
} }
//Send notification to handle first policy change. //Send notification to handle first policy change.
notifier.publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{scanAllPolicy.Type, dailyTime}) notifier.Publish(notifier.ScanAllPolicyTopic, notifier.ScanPolicyNotification{Type: scanAllPolicy.Type, DailyTime: (int64)(dailyTime)})
} }
filter.Init() filter.Init()