From cee0bcec22cef1958a2a283763192ca8ff116e56 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Tue, 31 Oct 2017 19:43:53 +0800 Subject: [PATCH] Update the alternate policy and corresponding task to support by weekly besides daily --- .../scan_policy_notitification_handler.go | 4 +- .../scheduler/policy/alternate_policy.go | 65 +++++++++++++++++-- .../scheduler/policy/alternate_policy_test.go | 41 ++++++++++-- src/common/scheduler/policy/policy.go | 1 + src/common/scheduler/policy/uuid.go | 22 +++++++ src/common/scheduler/task/replication_task.go | 24 +++++++ .../scheduler/task/replication_task_test.go | 14 ++++ .../scheduler/task/scan_all_task_test.go | 2 +- 8 files changed, 157 insertions(+), 16 deletions(-) create mode 100644 src/common/scheduler/policy/uuid.go create mode 100644 src/common/scheduler/task/replication_task.go create mode 100644 src/common/scheduler/task/replication_task_test.go diff --git a/src/common/notifier/scan_policy_notitification_handler.go b/src/common/notifier/scan_policy_notitification_handler.go index 076019f06..e3f7d3530 100644 --- a/src/common/notifier/scan_policy_notitification_handler.go +++ b/src/common/notifier/scan_policy_notitification_handler.go @@ -63,7 +63,7 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { //To check and compare if the related parameter is changed. if pl := scheduler.DefaultScheduler.GetPolicy(alternatePolicy); pl != nil { - policyCandidate := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ + policyCandidate := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{ Duration: 24 * time.Hour, OffsetTime: notification.DailyTime, }) @@ -95,7 +95,7 @@ func (s *ScanPolicyNotificationHandler) Handle(value interface{}) error { //Schedule policy. func schedulePolicy(notification ScanPolicyNotification) error { - schedulePolicy := policy.NewAlternatePolicy(&policy.AlternatePolicyConfiguration{ + schedulePolicy := policy.NewAlternatePolicy(alternatePolicy, &policy.AlternatePolicyConfiguration{ Duration: 24 * time.Hour, OffsetTime: notification.DailyTime, }) diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go index e37b466f4..dc1d3864f 100644 --- a/src/common/scheduler/policy/alternate_policy.go +++ b/src/common/scheduler/policy/alternate_policy.go @@ -10,11 +10,22 @@ import ( "github.com/vmware/harbor/src/common/utils/log" ) +const ( + oneDay = 24 * 3600 +) + //AlternatePolicyConfiguration store the related configurations for alternate policy. type AlternatePolicyConfiguration struct { //Duration is the interval of executing attached tasks. + //E.g: 24*3600 for daily + // 7*24*3600 for weekly Duration time.Duration + //An integer to indicate the the weekday of the week. Please be noted that Sunday is 7. + //Use default value 0 to indicate weekday is not set. + //To support by weekly function. + Weekday int8 + //OffsetTime is the execution time point of each turn //It's a number to indicate the seconds offset to the 00:00 of UTC time. OffsetTime int64 @@ -42,16 +53,21 @@ type AlternatePolicy struct { //Channel used to receive terminate signal. terminator chan bool + + //Unique name of this policy to support multiple instances + name string } //NewAlternatePolicy is constructor of creating AlternatePolicy. -func NewAlternatePolicy(config *AlternatePolicyConfiguration) *AlternatePolicy { +//Accept name and configuration as parameters. +func NewAlternatePolicy(name string, config *AlternatePolicyConfiguration) *AlternatePolicy { return &AlternatePolicy{ RWMutex: new(sync.RWMutex), tasks: task.NewDefaultStore(), config: config, isEnabled: false, terminator: make(chan bool), + name: name, } } @@ -62,7 +78,7 @@ func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration { //Name is an implementation of same method in policy interface. func (alp *AlternatePolicy) Name() string { - return "Alternate Policy" + return alp.name } //Tasks is an implementation of same method in policy interface. @@ -110,6 +126,11 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { defer alp.Unlock() alp.Lock() + //Check if configuration is valid + if !alp.isValidConfig() { + return nil, errors.New("Policy configuration is not valid") + } + //Check if policy instance is still running if alp.isEnabled { return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name()) @@ -124,19 +145,41 @@ func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { alp.evaluation = make(chan bool) go func() { + var ( + waitingTime int64 + ) timeNow := time.Now().UTC() //Reach the execution time point? + //Weekday is set + if alp.config.Weekday > 0 { + targetWeekday := (alp.config.Weekday + 7) % 7 + currentWeekday := timeNow.Weekday() + weekdayDiff := (int)(targetWeekday - (int8)(currentWeekday)) + if weekdayDiff < 0 { + weekdayDiff += 7 + } + waitingTime = (int64)(weekdayDiff * oneDay) + } + + //Time utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60) diff := alp.config.OffsetTime - utcTime - if diff < 0 { - diff += 24 * 3600 + if waitingTime > 0 { + waitingTime += diff + } else { + waitingTime = diff + if waitingTime < 0 { + waitingTime += oneDay + } } - if diff > 0 { + + //Let's wait for a while + if waitingTime > 0 { //Wait for a while. log.Infof("Waiting for %d seconds after comparing offset %d and utc time %d\n", diff, alp.config.OffsetTime, utcTime) select { - case <-time.After(time.Duration(diff) * time.Second): + case <-time.After(time.Duration(waitingTime) * time.Second): case <-alp.terminator: return } @@ -188,7 +231,10 @@ func (alp *AlternatePolicy) Equal(p Policy) bool { return false } - return cfg == nil || (cfg.Duration == cfg2.Duration && cfg.OffsetTime == cfg2.OffsetTime) + return cfg == nil || + (cfg.Duration == cfg2.Duration && + cfg.OffsetTime == cfg2.OffsetTime && + cfg.Weekday == cfg2.Weekday) } //IsEnabled is an implementation of same method in policy interface. @@ -198,3 +244,8 @@ func (alp *AlternatePolicy) IsEnabled() bool { return alp.isEnabled } + +//Check if the config is valid. At least it should have the configurations for supporting daily policy. +func (alp *AlternatePolicy) isValidConfig() bool { + return alp.config != nil && alp.config.Duration > 0 && alp.config.OffsetTime >= 0 +} diff --git a/src/common/scheduler/policy/alternate_policy_test.go b/src/common/scheduler/policy/alternate_policy_test.go index 777fe59de..5a3eda4e3 100644 --- a/src/common/scheduler/policy/alternate_policy_test.go +++ b/src/common/scheduler/policy/alternate_policy_test.go @@ -6,6 +6,10 @@ import ( "time" ) +const ( + testPolicyName = "TestingPolicy" +) + type fakeTask struct { number int32 } @@ -24,18 +28,18 @@ func (ft *fakeTask) Number() int32 { } func TestBasic(t *testing.T) { - tp := NewAlternatePolicy(&AlternatePolicyConfiguration{}) + tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{}) err := tp.AttachTasks(&fakeTask{number: 100}) if err != nil { t.Fail() } if tp.GetConfig() == nil { - t.Fail() + t.Fatal("nil config") } - if tp.Name() != "Alternate Policy" { - t.Fail() + if tp.Name() != testPolicyName { + t.Fatalf("Wrong name %s", tp.Name()) } tks := tp.Tasks() @@ -48,7 +52,7 @@ func TestBasic(t *testing.T) { func TestEvaluatePolicy(t *testing.T) { now := time.Now().UTC() utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) - tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ + tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ Duration: 1 * time.Second, OffsetTime: utcOffset + 1, }) @@ -78,7 +82,7 @@ func TestEvaluatePolicy(t *testing.T) { func TestDisablePolicy(t *testing.T) { now := time.Now().UTC() utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) - tp := NewAlternatePolicy(&AlternatePolicyConfiguration{ + tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ Duration: 1 * time.Second, OffsetTime: utcOffset + 1, }) @@ -118,3 +122,28 @@ func TestDisablePolicy(t *testing.T) { t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter)) } } + +func TestPolicyEqual(t *testing.T) { + tp1 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ + Duration: 1 * time.Second, + OffsetTime: 8000, + }) + + tp2 := NewAlternatePolicy(testPolicyName+"2", &AlternatePolicyConfiguration{ + Duration: 100 * time.Second, + OffsetTime: 8000, + }) + + if tp1.Equal(tp2) { + t.Fatal("tp1 should not equal tp2") + } + + tp3 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ + Duration: 1 * time.Second, + OffsetTime: 8000, + }) + + if !tp1.Equal(tp3) { + t.Fatal("tp1 should equal tp3") + } +} diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go index 732d56cca..4fc72c0d6 100644 --- a/src/common/scheduler/policy/policy.go +++ b/src/common/scheduler/policy/policy.go @@ -15,6 +15,7 @@ import ( // type Policy interface { //Name will return the name of the policy. + //If the policy supports multiple instances, please make sure the name is unique as an UUID. Name() string //Tasks will return the attached tasks with this policy. diff --git a/src/common/scheduler/policy/uuid.go b/src/common/scheduler/policy/uuid.go new file mode 100644 index 000000000..8bd1bd72c --- /dev/null +++ b/src/common/scheduler/policy/uuid.go @@ -0,0 +1,22 @@ +package policy + +import ( + "crypto/rand" + "fmt" + "io" +) + +//NewUUID will generate a new UUID. +//Code copied from https://play.golang.org/p/4FkNSiUDMg +func newUUID() (string, error) { + uuid := make([]byte, 16) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + return "", err + } + + uuid[8] = uuid[8]&^0xc0 | 0x80 + uuid[6] = uuid[6]&^0xf0 | 0x40 + + return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil +} diff --git a/src/common/scheduler/task/replication_task.go b/src/common/scheduler/task/replication_task.go new file mode 100644 index 000000000..aaefcc571 --- /dev/null +++ b/src/common/scheduler/task/replication_task.go @@ -0,0 +1,24 @@ +package task + +import ( + "errors" +) + +//ReplicationTask is the task for triggering one replication +type ReplicationTask struct{} + +//NewReplicationTask is constructor of creating ReplicationTask +func NewReplicationTask() *ReplicationTask { + return &ReplicationTask{} +} + +//Name returns the name of this task +func (rt *ReplicationTask) Name() string { + return "replication" +} + +//Run the actions here +func (rt *ReplicationTask) Run() error { + //Trigger the replication here + return errors.New("Not implemented") +} diff --git a/src/common/scheduler/task/replication_task_test.go b/src/common/scheduler/task/replication_task_test.go new file mode 100644 index 000000000..cd0acd2c0 --- /dev/null +++ b/src/common/scheduler/task/replication_task_test.go @@ -0,0 +1,14 @@ +package task + +import "testing" + +func TestReplicationTask(t *testing.T) { + tk := NewReplicationTask() + if tk == nil { + t.Fail() + } + + if tk.Name() != "replication" { + t.Fail() + } +} diff --git a/src/common/scheduler/task/scan_all_task_test.go b/src/common/scheduler/task/scan_all_task_test.go index 18ac9202b..b7482fbfc 100644 --- a/src/common/scheduler/task/scan_all_task_test.go +++ b/src/common/scheduler/task/scan_all_task_test.go @@ -4,7 +4,7 @@ import ( "testing" ) -func TestTask(t *testing.T) { +func TestScanAllTask(t *testing.T) { tk := NewScanAllTask() if tk == nil { t.Fail()