diff --git a/src/common/scheduler/policy/alternate_policy.go b/src/common/scheduler/policy/alternate_policy.go deleted file mode 100644 index da9e7ba63..000000000 --- a/src/common/scheduler/policy/alternate_policy.go +++ /dev/null @@ -1,251 +0,0 @@ -package policy - -import ( - "errors" - "fmt" - "sync" - "time" - - "github.com/goharbor/harbor/src/common/scheduler/task" - "github.com/goharbor/harbor/src/common/utils/log" -) - -const ( - oneDay = 24 * 3600 -) - -// AlternatePolicyConfiguration store the related configurations for alternate policy. -type AlternatePolicyConfiguration struct { - // Duration is the interval of executing attached tasks. - // E.g: 24*3600 for daily - // 7*24*3600 for weekly - Duration time.Duration - - // An integer to indicate the the weekday of the week. Please be noted that Sunday is 7. - // Use default value 0 to indicate weekday is not set. - // To support by weekly function. - Weekday int8 - - // OffsetTime is the execution time point of each turn - // It's a number to indicate the seconds offset to the 00:00 of UTC time. - OffsetTime int64 -} - -// AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope. -type AlternatePolicy struct { - // To sync the related operations. - *sync.RWMutex - - // Keep the attached tasks. - tasks task.Store - - // Policy configurations. - config *AlternatePolicyConfiguration - - // To indicated whether policy is enabled or not. - isEnabled bool - - // Channel used to send evaluation result signals. - evaluation chan bool - - // Channel used to notify policy termination. - done chan bool - - // Channel used to receive terminate signal. - terminator chan bool - - // Unique name of this policy to support multiple instances - name string -} - -// NewAlternatePolicy is constructor of creating AlternatePolicy. -// Accept name and configuration as parameters. -func NewAlternatePolicy(name string, config *AlternatePolicyConfiguration) *AlternatePolicy { - return &AlternatePolicy{ - RWMutex: new(sync.RWMutex), - tasks: task.NewDefaultStore(), - config: config, - isEnabled: false, - terminator: make(chan bool), - name: name, - } -} - -// GetConfig returns the current configuration options of this policy. -func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration { - return alp.config -} - -// Name is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Name() string { - return alp.name -} - -// Tasks is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Tasks() []task.Task { - return alp.tasks.GetTasks() -} - -// Done is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Done() <-chan bool { - return alp.done -} - -// AttachTasks is an implementation of same method in policy interface. -func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error { - if len(tasks) == 0 { - return errors.New("No tasks can be attached") - } - - alp.tasks.AddTasks(tasks...) - - return nil -} - -// Disable is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Disable() error { - alp.Lock() - if !alp.isEnabled { - alp.Unlock() - return fmt.Errorf("Instance of policy %s is not enabled", alp.Name()) - } - - // Set state to disabled - alp.isEnabled = false - alp.Unlock() - - // Stop the evaluation goroutine - alp.terminator <- true - - return nil -} - -// Evaluate is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) { - // Lock for state changing - defer alp.Unlock() - alp.Lock() - - // Check if configuration is valid - if !alp.isValidConfig() { - return nil, errors.New("Policy configuration is not valid") - } - - // Check if policy instance is still running - if alp.isEnabled { - return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name()) - } - - // Keep idempotent - if alp.evaluation != nil { - return alp.evaluation, nil - } - - alp.done = make(chan bool) - alp.evaluation = make(chan bool) - - go func() { - var ( - waitingTime int64 - ) - timeNow := time.Now().UTC() - - // Reach the execution time point? - // Weekday is set - if alp.config.Weekday > 0 { - targetWeekday := (alp.config.Weekday + 7) % 7 - currentWeekday := timeNow.Weekday() - weekdayDiff := (int)(targetWeekday - (int8)(currentWeekday)) - if weekdayDiff < 0 { - weekdayDiff += 7 - } - waitingTime = (int64)(weekdayDiff * oneDay) - } - - // Time - utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60) - diff := alp.config.OffsetTime - utcTime - if waitingTime > 0 { - waitingTime += diff - } else { - waitingTime = diff - if waitingTime < 0 { - waitingTime += oneDay - } - } - - // Let's wait for a while - if waitingTime > 0 { - // Wait for a while. - log.Infof("Waiting for %d seconds after comparing offset %d and utc time %d\n", diff, alp.config.OffsetTime, utcTime) - select { - case <-time.After(time.Duration(waitingTime) * time.Second): - case <-alp.terminator: - return - } - } - - // Trigger the first tick. - alp.evaluation <- true - - // Start the ticker for repeat checking. - tk := time.NewTicker(alp.config.Duration) - defer func() { - if tk != nil { - tk.Stop() - } - }() - - for { - select { - case <-tk.C: - if alp.IsEnabled() { - alp.evaluation <- true - } - case <-alp.terminator: - return - } - } - }() - - // Enabled - alp.isEnabled = true - - return alp.evaluation, nil -} - -// Equal is an implementation of same method in policy interface. -func (alp *AlternatePolicy) Equal(p Policy) bool { - if p == nil { - return false - } - - pl, ok := p.(*AlternatePolicy) - if !ok { - return false - } - - cfg := pl.GetConfig() - cfg2 := alp.GetConfig() - if (cfg == nil && cfg2 != nil) || (cfg != nil && cfg2 == nil) { - return false - } - - return cfg == nil || - (cfg.Duration == cfg2.Duration && - cfg.OffsetTime == cfg2.OffsetTime && - cfg.Weekday == cfg2.Weekday) -} - -// IsEnabled is an implementation of same method in policy interface. -func (alp *AlternatePolicy) IsEnabled() bool { - defer alp.RUnlock() - alp.RLock() - - return alp.isEnabled -} - -// Check if the config is valid. At least it should have the configurations for supporting daily policy. -func (alp *AlternatePolicy) isValidConfig() bool { - return alp.config != nil && alp.config.Duration > 0 && alp.config.OffsetTime >= 0 -} diff --git a/src/common/scheduler/policy/alternate_policy_test.go b/src/common/scheduler/policy/alternate_policy_test.go deleted file mode 100644 index 0492c3426..000000000 --- a/src/common/scheduler/policy/alternate_policy_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package policy - -import ( - "sync/atomic" - "testing" - "time" -) - -const ( - testPolicyName = "TestingPolicy" -) - -type fakeTask struct { - number int32 -} - -func (ft *fakeTask) Name() string { - return "for testing" -} - -func (ft *fakeTask) Run() error { - atomic.AddInt32(&(ft.number), 1) - return nil -} - -func (ft *fakeTask) Number() int32 { - return atomic.LoadInt32(&ft.number) -} - -func TestBasic(t *testing.T) { - tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{}) - err := tp.AttachTasks(&fakeTask{number: 100}) - if err != nil { - t.Fail() - } - - if tp.GetConfig() == nil { - t.Fatal("nil config") - } - - if tp.Name() != testPolicyName { - t.Fatalf("Wrong name %s", tp.Name()) - } - - tks := tp.Tasks() - if tks == nil || len(tks) != 1 { - t.Fail() - } - -} - -func TestEvaluatePolicy(t *testing.T) { - now := time.Now().UTC() - utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) - tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ - Duration: 1 * time.Second, - OffsetTime: utcOffset + 1, - }) - err := tp.AttachTasks(&fakeTask{number: 100}) - if err != nil { - t.Fail() - } - ch, _ := tp.Evaluate() - var counter int32 - - for i := 0; i < 3; i++ { - select { - case <-ch: - atomic.AddInt32(&counter, 1) - case <-time.After(2 * time.Second): - continue - } - } - - if atomic.LoadInt32(&counter) != 3 { - t.Fail() - } - - tp.Disable() -} - -func TestDisablePolicy(t *testing.T) { - now := time.Now().UTC() - utcOffset := (int64)(now.Hour()*3600 + now.Minute()*60) - tp := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ - Duration: 1 * time.Second, - OffsetTime: utcOffset + 1, - }) - err := tp.AttachTasks(&fakeTask{number: 100}) - if err != nil { - t.Fail() - } - ch, _ := tp.Evaluate() - var counter int32 - terminate := make(chan bool) - defer func() { - terminate <- true - }() - go func() { - for { - select { - case <-ch: - atomic.AddInt32(&counter, 1) - case <-terminate: - return - case <-time.After(6 * time.Second): - return - } - } - }() - time.Sleep(2 * time.Second) - if tp.Disable() != nil { - t.Fatal("Failed to disable policy") - } - // Waiting for everything is stable - <-time.After(1 * time.Second) - // Copy value - var copiedCounter int32 - atomic.StoreInt32(&copiedCounter, atomic.LoadInt32(&counter)) - time.Sleep(2 * time.Second) - if atomic.LoadInt32(&counter) != atomic.LoadInt32(&copiedCounter) { - t.Fatalf("Policy is still running after calling Disable() %d=%d", atomic.LoadInt32(&copiedCounter), atomic.LoadInt32(&counter)) - } -} - -func TestPolicyEqual(t *testing.T) { - tp1 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ - Duration: 1 * time.Second, - OffsetTime: 8000, - }) - - tp2 := NewAlternatePolicy(testPolicyName+"2", &AlternatePolicyConfiguration{ - Duration: 100 * time.Second, - OffsetTime: 8000, - }) - - if tp1.Equal(tp2) { - t.Fatal("tp1 should not equal tp2") - } - - tp3 := NewAlternatePolicy(testPolicyName, &AlternatePolicyConfiguration{ - Duration: 1 * time.Second, - OffsetTime: 8000, - }) - - if !tp1.Equal(tp3) { - t.Fatal("tp1 should equal tp3") - } -} diff --git a/src/common/scheduler/policy/policy.go b/src/common/scheduler/policy/policy.go deleted file mode 100644 index 6957d4ef0..000000000 --- a/src/common/scheduler/policy/policy.go +++ /dev/null @@ -1,48 +0,0 @@ -package policy - -import ( - "github.com/goharbor/harbor/src/common/scheduler/task" -) - -// Policy is an if-then logic to determine how the attached tasks should be -// executed based on the evaluation result of the defined conditions. -// E.g: -// Daily execute TASK between 2017/06/24 and 2018/06/23 -// Execute TASK at 2017/09/01 14:30:00 -// -// Each policy should have a name to identify itself. -// Please be aware that policy with no tasks will be treated as invalid. -// -type Policy interface { - // Name will return the name of the policy. - // If the policy supports multiple instances, please make sure the name is unique as an UUID. - Name() string - - // Tasks will return the attached tasks with this policy. - Tasks() []task.Task - - // AttachTasks is to attach tasks to this policy - AttachTasks(...task.Task) error - - // Done will setup a channel for other components to check whether or not - // the policy is completed. Possibly designed for the none loop policy. - Done() <-chan bool - - // Evaluate the policy based on its definition and return the result via - // result channel. Policy is enabled after it is evaluated. - // Make sure Evaluate is idempotent, that means one policy can be only enabled - // only once even if Evaluate is called more than one times. - Evaluate() (<-chan bool, error) - - // Disable the enabled policy and release all the allocated resources. - Disable() error - - // Equal will compare the two policies based on related factors if existing such as confgiuration etc. - // to determine whether the two policies are same ones or not. Please pay attention that, not every policy - // needs to support this method. If no need, please directly return false to indicate each policies are - // different. - Equal(p Policy) bool - - // IsEnabled is to indicate whether the policy is enabled or not (disabled). - IsEnabled() bool -} diff --git a/src/common/scheduler/policy/uuid.go b/src/common/scheduler/policy/uuid.go deleted file mode 100644 index 1f001e00b..000000000 --- a/src/common/scheduler/policy/uuid.go +++ /dev/null @@ -1,22 +0,0 @@ -package policy - -import ( - "crypto/rand" - "fmt" - "io" -) - -// NewUUID will generate a new UUID. -// Code copied from https://play.golang.org/p/4FkNSiUDMg -func newUUID() (string, error) { - uuid := make([]byte, 16) - n, err := io.ReadFull(rand.Reader, uuid) - if n != len(uuid) || err != nil { - return "", err - } - - uuid[8] = uuid[8]&^0xc0 | 0x80 - uuid[6] = uuid[6]&^0xf0 | 0x40 - - return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil -} diff --git a/src/common/scheduler/scheduler.go b/src/common/scheduler/scheduler.go deleted file mode 100644 index 311ac17ac..000000000 --- a/src/common/scheduler/scheduler.go +++ /dev/null @@ -1,311 +0,0 @@ -package scheduler - -import ( - "github.com/goharbor/harbor/src/common/scheduler/policy" - "github.com/goharbor/harbor/src/common/utils/log" - - "errors" - "fmt" - "reflect" - "strings" - "sync" - "time" -) - -const ( - defaultQueueSize = 10 - - statSchedulePolicy = "Schedule Policy" - statUnSchedulePolicy = "Unschedule Policy" - statTaskRun = "Task Run" - statTaskComplete = "Task Complete" - statTaskFail = "Task Fail" -) - -// StatItem is defined for the stat metrics. -type StatItem struct { - // Metrics catalog - Type string - - // The stat value - Value uint32 - - // Attach some other info - Attachment interface{} -} - -// StatSummary is used to collect some metrics of scheduler. -type StatSummary struct { - // Count of scheduled policy - PolicyCount uint32 - - // Total count of tasks - Tasks uint32 - - // Count of successfully complete tasks - CompletedTasks uint32 - - // Count of tasks with errors - TasksWithError uint32 -} - -// Configuration defines configuration of Scheduler. -type Configuration struct { - QueueSize uint8 -} - -// Scheduler is designed for scheduling policies. -type Scheduler struct { - // Mutex for sync controlling. - *sync.RWMutex - - // Related configuration options for scheduler. - config *Configuration - - // Store to keep the references of scheduled policies. - policies Store - - // Queue for receiving policy scheduling request - scheduleQueue chan *Watcher - - // Queue for receiving policy unscheduling request or complete signal. - unscheduleQueue chan *Watcher - - // Channel for receiving stat metrics. - statChan chan *StatItem - - // Channel for terminate scheduler damon. - terminateChan chan bool - - // The stat metrics of scheduler. - stats *StatSummary - - // To indicate whether scheduler is running or not - isRunning bool -} - -// DefaultScheduler is a default scheduler. -var DefaultScheduler = NewScheduler(nil) - -// NewScheduler is constructor for creating a scheduler. -func NewScheduler(config *Configuration) *Scheduler { - var qSize uint8 = defaultQueueSize - if config != nil && config.QueueSize > 0 { - qSize = config.QueueSize - } - - sq := make(chan *Watcher, qSize) - usq := make(chan *Watcher, qSize) - stChan := make(chan *StatItem, 4) - tc := make(chan bool, 1) - - store := NewDefaultStore() - return &Scheduler{ - RWMutex: new(sync.RWMutex), - config: config, - policies: store, - scheduleQueue: sq, - unscheduleQueue: usq, - statChan: stChan, - terminateChan: tc, - stats: &StatSummary{ - PolicyCount: 0, - Tasks: 0, - CompletedTasks: 0, - TasksWithError: 0, - }, - isRunning: false, - } -} - -// Start the scheduler damon. -func (sch *Scheduler) Start() { - sch.Lock() - defer sch.Unlock() - - // If scheduler is already running - if sch.isRunning { - return - } - - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Runtime error in scheduler:%s\n", r) - } - }() - defer func() { - // Clear resources - sch.policies.Clear() - log.Infof("Policy scheduler stop at %s\n", time.Now().UTC().Format(time.RFC3339)) - }() - for { - select { - case <-sch.terminateChan: - // Exit - return - case wt := <-sch.scheduleQueue: - // If status is stopped, no requests should be served - if !sch.IsRunning() { - continue - } - go func(watcher *Watcher) { - if watcher != nil && watcher.p != nil { - // Enable it. - watcher.Start() - - // Update stats and log info. - log.Infof("Policy %s is scheduled", watcher.p.Name()) - sch.statChan <- &StatItem{statSchedulePolicy, 1, nil} - } - }(wt) - case wt := <-sch.unscheduleQueue: - // If status is stopped, no requests should be served - if !sch.IsRunning() { - continue - } - go func(watcher *Watcher) { - if watcher != nil && watcher.IsRunning() { - watcher.Stop() - - // Update stats and log info. - log.Infof("Policy %s is unscheduled", watcher.p.Name()) - sch.statChan <- &StatItem{statUnSchedulePolicy, 1, nil} - } - }(wt) - case stat := <-sch.statChan: - { - // If status is stopped, no requests should be served - if !sch.IsRunning() { - continue - } - switch stat.Type { - case statSchedulePolicy: - sch.stats.PolicyCount += stat.Value - break - case statUnSchedulePolicy: - sch.stats.PolicyCount -= stat.Value - break - case statTaskRun: - sch.stats.Tasks += stat.Value - break - case statTaskComplete: - sch.stats.CompletedTasks += stat.Value - break - case statTaskFail: - sch.stats.TasksWithError += stat.Value - break - default: - break - } - log.Infof("Policies:%d, Tasks:%d, CompletedTasks:%d, FailedTasks:%d\n", - sch.stats.PolicyCount, - sch.stats.Tasks, - sch.stats.CompletedTasks, - sch.stats.TasksWithError) - - if stat.Attachment != nil && - reflect.TypeOf(stat.Attachment).String() == "*errors.errorString" { - log.Errorf("%s: %s\n", stat.Type, stat.Attachment.(error).Error()) - } - } - - } - } - }() - - sch.isRunning = true - log.Infof("Policy scheduler start at %s\n", time.Now().UTC().Format(time.RFC3339)) -} - -// Stop the scheduler damon. -func (sch *Scheduler) Stop() { - // Lock for state changing - sch.Lock() - - // Check if the scheduler is running - if !sch.isRunning { - sch.Unlock() - return - } - - sch.isRunning = false - sch.Unlock() - - // Terminate damon to stop receiving signals. - sch.terminateChan <- true -} - -// Schedule and enable the policy. -func (sch *Scheduler) Schedule(scheduledPolicy policy.Policy) error { - if scheduledPolicy == nil { - return errors.New("nil is not Policy object") - } - - if strings.TrimSpace(scheduledPolicy.Name()) == "" { - return errors.New("Policy should be assigned a name") - } - - tasks := scheduledPolicy.Tasks() - if tasks == nil || len(tasks) == 0 { - return errors.New("Policy must attach task(s)") - } - - // Try to schedule the policy. - // Keep the policy for future use after it's successfully scheduled. - watcher := NewWatcher(scheduledPolicy, sch.statChan, sch.unscheduleQueue) - if err := sch.policies.Put(scheduledPolicy.Name(), watcher); err != nil { - return err - } - - // Schedule the policy - sch.scheduleQueue <- watcher - - return nil -} - -// UnSchedule the specified policy from the enabled policies list. -func (sch *Scheduler) UnSchedule(policyName string) error { - if strings.TrimSpace(policyName) == "" { - return errors.New("Empty policy name is invalid") - } - - // Find the watcher. - watcher := sch.policies.Remove(policyName) - if watcher == nil { - return fmt.Errorf("Policy %s is not existing", policyName) - } - - // Unschedule the policy. - sch.unscheduleQueue <- watcher - - return nil -} - -// IsRunning to indicate whether the scheduler is running. -func (sch *Scheduler) IsRunning() bool { - sch.RLock() - defer sch.RUnlock() - - return sch.isRunning -} - -// HasScheduled is to check whether the given policy has been scheduled or not. -func (sch *Scheduler) HasScheduled(policyName string) bool { - return sch.policies.Exists(policyName) -} - -// GetPolicy is used to get related policy reference by its name. -func (sch *Scheduler) GetPolicy(policyName string) policy.Policy { - wk := sch.policies.Get(policyName) - if wk != nil { - return wk.p - } - - return nil -} - -// PolicyCount returns the count of currently scheduled policies in the scheduler. -func (sch *Scheduler) PolicyCount() uint32 { - return sch.policies.Size() -} diff --git a/src/common/scheduler/scheduler_store.go b/src/common/scheduler/scheduler_store.go deleted file mode 100644 index 08c31b46c..000000000 --- a/src/common/scheduler/scheduler_store.go +++ /dev/null @@ -1,145 +0,0 @@ -package scheduler - -import ( - "errors" - "fmt" - "strings" - "sync" -) - -// Store define the basic operations for storing and managing policy watcher. -type Store interface { - // Put a new policy in. - Put(key string, value *Watcher) error - - // Get the corresponding policy with the key. - Get(key string) *Watcher - - // Exists is to check if the key existing in the store. - Exists(key string) bool - - // Remove the specified policy and return its reference. - Remove(key string) *Watcher - - // Size return the total count of items in store. - Size() uint32 - - // GetAll is to get all the items in the store. - GetAll() []*Watcher - - // Clear store. - Clear() -} - -// DefaultStore implements Store interface to keep the scheduled policies. -// Not support concurrent sync. -type DefaultStore struct { - // Support sync locking - *sync.RWMutex - - // Map used to keep the policy list. - data map[string]*Watcher -} - -// NewDefaultStore is used to create a new store and return the pointer reference. -func NewDefaultStore() *DefaultStore { - return &DefaultStore{new(sync.RWMutex), make(map[string]*Watcher)} -} - -// Put a policy into store. -func (cs *DefaultStore) Put(key string, value *Watcher) error { - if strings.TrimSpace(key) == "" || value == nil { - return errors.New("Bad arguments") - } - - cs.Lock() - defer cs.Unlock() - - if _, ok := cs.data[key]; ok { - return fmt.Errorf("Duplicayed policy with name %s", key) - } - - cs.data[key] = value - - return nil -} - -// Get policy via key. -func (cs *DefaultStore) Get(key string) *Watcher { - if strings.TrimSpace(key) == "" { - return nil - } - - cs.RLock() - defer cs.RUnlock() - - return cs.data[key] -} - -// Exists is used to check whether or not the key exists in store. -func (cs *DefaultStore) Exists(key string) bool { - if strings.TrimSpace(key) == "" { - return false - } - - cs.RLock() - defer cs.RUnlock() - - _, ok := cs.data[key] - - return ok -} - -// Remove is to delete the specified policy. -func (cs *DefaultStore) Remove(key string) *Watcher { - if strings.TrimSpace(key) == "" { - return nil - } - - cs.Lock() - defer cs.Unlock() - - if wt, ok := cs.data[key]; ok { - delete(cs.data, key) - return wt - } - - return nil -} - -// Size return the total count of items in store. -func (cs *DefaultStore) Size() uint32 { - cs.RLock() - defer cs.RUnlock() - - return (uint32)(len(cs.data)) -} - -// GetAll to get all the items of store. -func (cs *DefaultStore) GetAll() []*Watcher { - cs.RLock() - defer cs.RUnlock() - - all := []*Watcher{} - - for _, v := range cs.data { - all = append(all, v) - } - - return all -} - -// Clear all the items in store. -func (cs *DefaultStore) Clear() { - cs.Lock() - defer cs.Unlock() - - if (uint32)(len(cs.data)) == 0 { - return - } - - for k, v := range cs.data { - delete(cs.data, k) - v.Stop() - } -} diff --git a/src/common/scheduler/scheduler_store_test.go b/src/common/scheduler/scheduler_store_test.go deleted file mode 100644 index 75aa38bd4..000000000 --- a/src/common/scheduler/scheduler_store_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package scheduler - -import ( - "testing" -) - -func TestPut(t *testing.T) { - store := NewDefaultStore() - if store == nil { - t.Fatal("Failed to creat store instance") - } - - store.Put("testing", NewWatcher(nil, nil, nil)) - if store.Size() != 1 { - t.Fail() - } -} - -func TestGet(t *testing.T) { - store := NewDefaultStore() - if store == nil { - t.Fatal("Failed to creat store instance") - } - store.Put("testing", NewWatcher(nil, nil, nil)) - w := store.Get("testing") - if w == nil { - t.Fail() - } -} - -func TestRemove(t *testing.T) { - store := NewDefaultStore() - if store == nil { - t.Fatal("Failed to creat store instance") - } - store.Put("testing", NewWatcher(nil, nil, nil)) - if !store.Exists("testing") { - t.Fail() - } - w := store.Remove("testing") - if w == nil { - t.Fail() - } -} - -func TestExisting(t *testing.T) { - store := NewDefaultStore() - if store == nil { - t.Fatal("Failed to creat store instance") - } - store.Put("testing", NewWatcher(nil, nil, nil)) - if !store.Exists("testing") { - t.Fail() - } - if store.Exists("fake_key") { - t.Fail() - } -} - -func TestGetAll(t *testing.T) { - store := NewDefaultStore() - if store == nil { - t.Fatal("Failed to creat store instance") - } - store.Put("testing", NewWatcher(nil, nil, nil)) - store.Put("testing2", NewWatcher(nil, nil, nil)) - list := store.GetAll() - if list == nil || len(list) != 2 { - t.Fail() - } -} diff --git a/src/common/scheduler/scheduler_test.go b/src/common/scheduler/scheduler_test.go deleted file mode 100644 index 63109728a..000000000 --- a/src/common/scheduler/scheduler_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package scheduler - -import ( - "sync/atomic" - "testing" - "time" - - "github.com/goharbor/harbor/src/common/scheduler/policy" - "github.com/goharbor/harbor/src/common/scheduler/task" -) - -type fakePolicy struct { - tasks []task.Task - done chan bool - evaluation chan bool - terminate chan bool - ticker *time.Ticker -} - -func (fp *fakePolicy) Name() string { - return "testing policy" -} - -func (fp *fakePolicy) Tasks() []task.Task { - return fp.tasks -} - -func (fp *fakePolicy) AttachTasks(tasks ...task.Task) error { - fp.tasks = append(fp.tasks, tasks...) - return nil -} - -func (fp *fakePolicy) Done() <-chan bool { - return fp.done -} - -func (fp *fakePolicy) Evaluate() (<-chan bool, error) { - fp.evaluation = make(chan bool, 1) - fp.done = make(chan bool) - fp.terminate = make(chan bool) - - fp.evaluation <- true - go func() { - fp.ticker = time.NewTicker(1 * time.Second) - for { - select { - case <-fp.terminate: - return - case <-fp.ticker.C: - fp.evaluation <- true - } - } - }() - return fp.evaluation, nil -} - -func (fp *fakePolicy) Disable() error { - if fp.ticker != nil { - fp.ticker.Stop() - } - - fp.terminate <- true - return nil -} - -func (fp *fakePolicy) Equal(policy.Policy) bool { - return false -} - -func (fp *fakePolicy) IsEnabled() bool { - return true -} - -type fakeTask struct { - number int32 -} - -func (ft *fakeTask) Name() string { - return "for testing" -} - -func (ft *fakeTask) Run() error { - atomic.AddInt32(&(ft.number), 1) - return nil -} - -func (ft *fakeTask) Number() int32 { - return atomic.LoadInt32(&(ft.number)) -} - -// Wacher will be tested together with scheduler. -func TestScheduler(t *testing.T) { - DefaultScheduler.Start() - if DefaultScheduler.policies.Size() != 0 { - t.Fail() - } - - if DefaultScheduler.stats.PolicyCount != 0 { - t.Fail() - } - - if !DefaultScheduler.IsRunning() { - t.Fatal("Scheduler is not started") - } - - fp := &fakePolicy{ - tasks: []task.Task{}, - } - fk := &fakeTask{number: 100} - fp.AttachTasks(fk) - - if DefaultScheduler.Schedule(fp) != nil { - t.Fatal("Schedule policy failed") - } - if DefaultScheduler.policies.Size() == 0 { - t.Fatal("No policy in the store after calling Schedule()") - } - if DefaultScheduler.GetPolicy(fp.Name()) == nil { - t.Fatal("Failed to get poicy by name") - } - - time.Sleep(2 * time.Second) - if fk.Number() == 100 { - t.Fatal("Task is not triggered") - } - - if DefaultScheduler.UnSchedule(fp.Name()) != nil { - t.Fatal("Unschedule policy failed") - } - - if DefaultScheduler.PolicyCount() != 0 { - t.Fatal("Policy count does not match after calling UnSchedule()") - } - - var copiedValue int32 - <-time.After(1 * time.Second) - atomic.StoreInt32(&copiedValue, fk.Number()) - <-time.After(2 * time.Second) - - if atomic.LoadInt32(&copiedValue) != fk.Number() { - t.Fatalf("Policy is still enabled after calling UnSchedule(),%d=%d", atomic.LoadInt32(&copiedValue), fk.Number()) - } - - DefaultScheduler.Stop() - <-time.After(1 * time.Second) - if DefaultScheduler.PolicyCount() != 0 || DefaultScheduler.IsRunning() { - t.Fatal("Scheduler is still running after stopping") - } -} diff --git a/src/common/scheduler/task/replication/replication_task.go b/src/common/scheduler/task/replication/replication_task.go deleted file mode 100644 index d70c7d92a..000000000 --- a/src/common/scheduler/task/replication/replication_task.go +++ /dev/null @@ -1,31 +0,0 @@ -package replication - -import ( - "github.com/goharbor/harbor/src/core/notifier" - "github.com/goharbor/harbor/src/replication/event/notification" - "github.com/goharbor/harbor/src/replication/event/topic" -) - -// Task is the task for triggering one replication -type Task struct { - PolicyID int64 -} - -// NewTask is constructor of creating ReplicationTask -func NewTask(policyID int64) *Task { - return &Task{ - PolicyID: policyID, - } -} - -// Name returns the name of this task -func (t *Task) Name() string { - return "replication" -} - -// Run the actions here -func (t *Task) Run() error { - return notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{ - PolicyID: t.PolicyID, - }) -} diff --git a/src/common/scheduler/task/replication/replication_task_test.go b/src/common/scheduler/task/replication/replication_task_test.go deleted file mode 100644 index a914b46f9..000000000 --- a/src/common/scheduler/task/replication/replication_task_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package replication - -import "testing" - -func TestTask(t *testing.T) { - tk := NewTask(1) - if tk == nil { - t.Fail() - } - - if tk.Name() != "replication" { - t.Fail() - } -} diff --git a/src/common/scheduler/task/scan_all_task.go b/src/common/scheduler/task/scan_all_task.go deleted file mode 100644 index 11837d0c2..000000000 --- a/src/common/scheduler/task/scan_all_task.go +++ /dev/null @@ -1,23 +0,0 @@ -package task - -import ( - "github.com/goharbor/harbor/src/core/utils" -) - -// ScanAllTask is task of scanning all tags. -type ScanAllTask struct{} - -// NewScanAllTask is constructor of creating ScanAllTask. -func NewScanAllTask() *ScanAllTask { - return &ScanAllTask{} -} - -// Name returns the name of the task. -func (sat *ScanAllTask) Name() string { - return "scan all" -} - -// Run the actions. -func (sat *ScanAllTask) Run() error { - return utils.ScanAllImages() -} diff --git a/src/common/scheduler/task/scan_all_task_test.go b/src/common/scheduler/task/scan_all_task_test.go deleted file mode 100644 index b7482fbfc..000000000 --- a/src/common/scheduler/task/scan_all_task_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package task - -import ( - "testing" -) - -func TestScanAllTask(t *testing.T) { - tk := NewScanAllTask() - if tk == nil { - t.Fail() - } - - if tk.Name() != "scan all" { - t.Fail() - } -} diff --git a/src/common/scheduler/task/task.go b/src/common/scheduler/task/task.go deleted file mode 100644 index d5eb189b9..000000000 --- a/src/common/scheduler/task/task.go +++ /dev/null @@ -1,10 +0,0 @@ -package task - -// Task is used to synchronously run specific action(s). -type Task interface { - // Name should return the name of the task. - Name() string - - // Run the concrete code here - Run() error -} diff --git a/src/common/scheduler/task/task_list.go b/src/common/scheduler/task/task_list.go deleted file mode 100644 index d46095b26..000000000 --- a/src/common/scheduler/task/task_list.go +++ /dev/null @@ -1,55 +0,0 @@ -package task - -import ( - "sync" -) - -// Store is designed to keep the tasks. -type Store interface { - // GetTasks return the current existing list in store. - GetTasks() []Task - - // AddTasks is used to append tasks to the list. - AddTasks(tasks ...Task) -} - -// DefaultStore is the default implemetation of Store interface. -type DefaultStore struct { - // To sync the related operations. - *sync.RWMutex - - // The space to keep the tasks. - tasks []Task -} - -// NewDefaultStore is constructor method for DefaultStore. -func NewDefaultStore() *DefaultStore { - return &DefaultStore{new(sync.RWMutex), []Task{}} -} - -// GetTasks implements the same method in Store interface. -func (ds *DefaultStore) GetTasks() []Task { - copyList := []Task{} - - ds.RLock() - defer ds.RUnlock() - - if ds.tasks != nil && len(ds.tasks) > 0 { - copyList = append(copyList, ds.tasks...) - } - - return copyList -} - -// AddTasks implements the same method in Store interface. -func (ds *DefaultStore) AddTasks(tasks ...Task) { - // Double confirm. - if ds.tasks == nil { - ds.tasks = []Task{} - } - - ds.Lock() - defer ds.Unlock() - - ds.tasks = append(ds.tasks, tasks...) -} diff --git a/src/common/scheduler/task/task_list_test.go b/src/common/scheduler/task/task_list_test.go deleted file mode 100644 index a874ab255..000000000 --- a/src/common/scheduler/task/task_list_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package task - -import ( - "sync/atomic" - "testing" - "time" -) - -func TestTaskList(t *testing.T) { - ds := NewDefaultStore() - if ds.tasks == nil { - t.Fatal("Failed to create store") - } - - go func() { - var count int32 - for { - ds.AddTasks(NewScanAllTask()) - atomic.AddInt32(&count, 1) - time.Sleep(100 * time.Millisecond) - if atomic.LoadInt32(&count) > 9 { - return - } - } - }() - go func() { - var count int32 - for { - ds.GetTasks() - atomic.AddInt32(&count, 1) - time.Sleep(100 * time.Millisecond) - if atomic.LoadInt32(&count) > 8 { - return - } - } - }() - - <-time.After(2 * time.Second) - - var taskCount int32 - atomic.StoreInt32(&taskCount, (int32)(len(ds.GetTasks()))) - - if atomic.LoadInt32(&taskCount) != 10 { - t.Fatalf("Expect %d tasks but got %d", 10, atomic.LoadInt32(&taskCount)) - } -} diff --git a/src/common/scheduler/watcher.go b/src/common/scheduler/watcher.go deleted file mode 100644 index 36223fab0..000000000 --- a/src/common/scheduler/watcher.go +++ /dev/null @@ -1,162 +0,0 @@ -package scheduler - -import ( - "github.com/goharbor/harbor/src/common/scheduler/policy" - "github.com/goharbor/harbor/src/common/scheduler/task" - "github.com/goharbor/harbor/src/common/utils/log" - - "fmt" - "sync" -) - -// Watcher is an asynchronous runner to provide an evaluation environment for the policy. -type Watcher struct { - // Locker to sync related operations. - *sync.RWMutex - - // The target policy. - p policy.Policy - - // The channel for receive stop signal. - cmdChan chan bool - - // Indicate whether the watcher is started and running. - isRunning bool - - // Report stats to scheduler. - stats chan *StatItem - - // If policy is automatically completed, report the policy to scheduler. - doneChan chan *Watcher -} - -// NewWatcher is used as a constructor. -func NewWatcher(p policy.Policy, st chan *StatItem, done chan *Watcher) *Watcher { - return &Watcher{ - RWMutex: new(sync.RWMutex), - p: p, - cmdChan: make(chan bool), - isRunning: false, - stats: st, - doneChan: done, - } -} - -// Start the running. -func (wc *Watcher) Start() { - // Lock for state changing - wc.Lock() - defer wc.Unlock() - - if wc.isRunning { - return - } - - if wc.p == nil { - return - } - - go func(pl policy.Policy) { - defer func() { - if r := recover(); r != nil { - log.Errorf("Runtime error in watcher:%s\n", r) - } - }() - - evalChan, err := pl.Evaluate() - if err != nil { - log.Errorf("Failed to evaluate ploicy %s with error: %s\n", pl.Name(), err.Error()) - return - } - done := pl.Done() - - for { - select { - case <-evalChan: - { - // If worker is not running, should not response any requests. - if !wc.IsRunning() { - continue - } - - log.Infof("Receive evaluation signal from policy '%s'\n", pl.Name()) - // Start to run the attached tasks. - for _, t := range pl.Tasks() { - go func(tk task.Task) { - defer func() { - if r := recover(); r != nil { - st := &StatItem{statTaskFail, 1, fmt.Errorf("Runtime error in task execution:%s", r)} - if wc.stats != nil { - wc.stats <- st - } - } - }() - err := tk.Run() - - // Report task execution stats. - st := &StatItem{statTaskComplete, 1, err} - if err != nil { - st.Type = statTaskFail - } - if wc.stats != nil { - wc.stats <- st - } - }(t) - - // Report task run stats. - st := &StatItem{statTaskRun, 1, nil} - if wc.stats != nil { - wc.stats <- st - } - } - } - case <-done: - { - // Policy is automatically completed. - // Report policy change stats. - if wc.doneChan != nil { - wc.doneChan <- wc - } - - return - } - case <-wc.cmdChan: - // Exit goroutine. - return - } - } - }(wc.p) - - wc.isRunning = true -} - -// Stop the running. -func (wc *Watcher) Stop() { - // Lock for state changing - wc.Lock() - if !wc.isRunning { - wc.Unlock() - return - } - - wc.isRunning = false - wc.Unlock() - - // Disable policy. - if wc.p != nil { - wc.p.Disable() - } - - // Stop watcher. - wc.cmdChan <- true - - log.Infof("Worker for policy %s is stopped.\n", wc.p.Name()) -} - -// IsRunning to indicate if the watcher is still running. -func (wc *Watcher) IsRunning() bool { - wc.RLock() - defer wc.RUnlock() - - return wc.isRunning -}