Update the alternate policy and corresponding task to support by weekly besides daily

This commit is contained in:
Steven Zou 2017-10-31 19:43:53 +08:00
parent 410908d863
commit cee0bcec22
8 changed files with 157 additions and 16 deletions

View File

@ -63,7 +63,7 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
//To check and compare if the related parameter is changed.
if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil {
policyCandidate := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
policyCandidate := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{
Duration: 24 * time.Hour,
OffsetTime: notification.DailyTime,
})
@ -95,7 +95,7 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error {
//Schedule policy.
func schedulePolicy(notification ScanPolicyNotification) error {
schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{
schedulePolicy := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{
Duration: 24 * time.Hour,
OffsetTime: notification.DailyTime,
})

View File

@ -10,11 +10,22 @@ import (
"github.com/vmware/harbor/src/common/utils/log"
)
const (
oneDay = 24 * 3600
)
//AlternatePolicyConfiguration store the related configurations for alternate policy.
type AlternatePolicyConfiguration struct {
//Duration is the interval of executing attached tasks.
//E.g: 24*3600 for daily
// 7*24*3600 for weekly
Duration time.Duration
//An integer to indicate the the weekday of the week. Please be noted that Sunday is 7.
//Use default value 0 to indicate weekday is not set.
//To support by weekly function.
Weekday int8
//OffsetTime is the execution time point of each turn
//It's a number to indicate the seconds offset to the 00:00 of UTC time.
OffsetTime int64
@ -42,16 +53,21 @@ type AlternatePolicy struct {
//Channel used to receive terminate signal.
terminator chan bool
//Unique name of this policy to support multiple instances
name string
}
//NewAlternatePolicy is constructor of creating AlternatePolicy.
func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy {
//Accept name and configuration as parameters.
func NewAlternatePolicy(name string, config *AlternatePolicyConfiguration) *AlternatePolicy {
return &AlternatePolicy{
RWMutex: new(sync.RWMutex),
tasks: task.NewDefaultStore(),
config: config,
isEnabled: false,
terminator: make(chan bool),
name: name,
}
}
@ -62,7 +78,7 @@ func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration {
//Name is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Name() string {
return "Alternate Policy"
return alp.name
}
//Tasks is an implementation of same method in policy interface.
@ -110,6 +126,11 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
defer alp.Unlock()
alp.Lock()
//Check if configuration is valid
if !alp.isValidConfig() {
return nil, errors.New("Policy configuration is not valid")
}
//Check if policy instance is still running
if alp.isEnabled {
return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name())
@ -124,19 +145,41 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
alp.evaluation = make(chan bool)
go func() {
var (
waitingTime int64
)
timeNow := time.Now().UTC()
//Reach the execution time point?
//Weekday is set
if alp.config.Weekday > 0 {
targetWeekday := (alp.config.Weekday + 7) % 7
currentWeekday := timeNow.Weekday()
weekdayDiff := (int)(targetWeekday - (int8)(currentWeekday))
if weekdayDiff < 0 {
weekdayDiff += 7
}
waitingTime = (int64)(weekdayDiff * oneDay)
}
//Time
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
diff := alp.config.OffsetTime - utcTime
if diff < 0 {
diff += 24 * 3600
if waitingTime > 0 {
waitingTime += diff
} else {
waitingTime = diff
if waitingTime < 0 {
waitingTime += oneDay
}
}
if diff > 0 {
//Let's wait for a while
if waitingTime > 0 {
//Wait for a while.
log.Infof("Waiting for %d seconds after comparing offset %d and utc time %d\n", diff, alp.config.OffsetTime, utcTime)
select {
case <-time.After(time.Duration(diff) * time.Second):
case <-time.After(time.Duration(waitingTime) * time.Second):
case <-alp.terminator:
return
}
@ -188,7 +231,10 @@ func (alp *AlternatePolicy) Equal(p Policy) bool {
return false
}
return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime)
return cfg == nil ||
(cfg.Duration == cfg2.Duration &&
cfg.OffsetTime == cfg2.OffsetTime &&
cfg.Weekday == cfg2.Weekday)
}
//IsEnabled is an implementation of same method in policy interface.
@ -198,3 +244,8 @@ func (alp *AlternatePolicy) IsEnabled() bool {
return alp.isEnabled
}
//Check if the config is valid. At least it should have the configurations for supporting daily policy.
func (alp *AlternatePolicy) isValidConfig() bool {
return alp.config != nil && alp.config.Duration > 0 && alp.config.OffsetTime >= 0
}

View File

@ -6,6 +6,10 @@ import (
"time"
)
const (
testPolicyName = "TestingPolicy"
)
type fakeTask struct {
number int32
}
@ -24,18 +28,18 @@ func (ft *fakeTask) Number() int32 {
}
func TestBasic(t *testing.T) {
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{})
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{})
err := tp.AttachTasks(&fakeTask{number: 100})
if err != nil {
t.Fail()
}
if tp.GetConfig() == nil {
t.Fail()
t.Fatal("nil config")
}
if tp.Name() != "Alternate Policy" {
t.Fail()
if tp.Name() != testPolicyName {
t.Fatalf("Wrong name %s", tp.Name())
}
tks := tp.Tasks()
@ -48,7 +52,7 @@ func TestBasic(t *testing.T) {
func TestEvaluatePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
})
@ -78,7 +82,7 @@ func TestEvaluatePolicy(t *testing.T) {
func TestDisablePolicy(t *testing.T) {
now := time.Now().UTC()
utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60)
tp := NewAlternatePolicy(&AlternatePolicyConfiguration{
tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: utcOffset + 1,
})
@ -118,3 +122,28 @@ func TestDisablePolicy(t *testing.T) {
t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter))
}
}
func TestPolicyEqual(t *testing.T) {
tp1 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: 8000,
})
tp2 := NewAlternatePolicy(testPolicyName+"2", &AlternatePolicyConfiguration{
Duration: 100 * time.Second,
OffsetTime: 8000,
})
if tp1.Equal(tp2) {
t.Fatal("tp1 should not equal tp2")
}
tp3 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{
Duration: 1 * time.Second,
OffsetTime: 8000,
})
if !tp1.Equal(tp3) {
t.Fatal("tp1 should equal tp3")
}
}

View File

@ -15,6 +15,7 @@ import (
//
type Policy interface {
//Name will return the name of the policy.
//If the policy supports multiple instances, please make sure the name is unique as an UUID.
Name() string
//Tasks will return the attached tasks with this policy.

View File

@ -0,0 +1,22 @@
package policy
import (
"crypto/rand"
"fmt"
"io"
)
//NewUUID will generate a new UUID.
//Code copied from https://play.golang.org/p/4FkNSiUDMg
func newUUID() (string, error) {
uuid := make([]byte, 16)
n, err := io.ReadFull(rand.Reader, uuid)
if n != len(uuid) || err != nil {
return "", err
}
uuid[8] = uuid[8]&^0xc0 | 0x80
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}

View File

@ -0,0 +1,24 @@
package task
import (
"errors"
)
//ReplicationTask is the task for triggering one replication
type ReplicationTask struct{}
//NewReplicationTask is constructor of creating ReplicationTask
func NewReplicationTask() *ReplicationTask {
return &ReplicationTask{}
}
//Name returns the name of this task
func (rt *ReplicationTask) Name() string {
return "replication"
}
//Run the actions here
func (rt *ReplicationTask) Run() error {
//Trigger the replication here
return errors.New("Not implemented")
}

View File

@ -0,0 +1,14 @@
package task
import "testing"
func TestReplicationTask(t *testing.T) {
tk := NewReplicationTask()
if tk == nil {
t.Fail()
}
if tk.Name() != "replication" {
t.Fail()
}
}

View File

@ -4,7 +4,7 @@ import (
"testing"
)
func TestTask(t *testing.T) {
func TestScanAllTask(t *testing.T) {
tk := NewScanAllTask()
if tk == nil {
t.Fail()