refine or.processor implementation and also add UT case

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2019-07-09 19:00:35 +08:00
parent 3409065438
commit 185f4f3861
13 changed files with 183 additions and 31 deletions

View File

@ -32,11 +32,11 @@ func (pj *Job) ShouldRetry() bool {
}
// Validate the parameters
func (pj *Job) Validate(params Parameters) error {
func (pj *Job) Validate(params job.Parameters) error {
return nil
}
// Run the job
func (pj *Job) Run(ctx job.Context, params Parameters) error {
func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
return nil
}

View File

@ -16,7 +16,7 @@ package retention
import "time"
// Retention execution
// Execution of retention
type Execution struct {
ID string `json:"id"`
PolicyID string `json:"policy_id"`
@ -25,7 +25,7 @@ type Execution struct {
Status string `json:"status"`
}
// Retention history
// History of retention
type History struct {
ExecutionID string `json:"execution_id"`
Rule struct {

View File

@ -34,11 +34,11 @@ func (pj *PeriodicJob) ShouldRetry() bool {
}
// Validate the parameters
func (pj *PeriodicJob) Validate(params Parameters) error {
func (pj *PeriodicJob) Validate(params job.Parameters) error {
return nil
}
// Run the job
func (pj *PeriodicJob) Run(ctx job.Context, params Parameters) error {
func (pj *PeriodicJob) Run(ctx job.Context, params job.Parameters) error {
return ctx.Checkin(fmt.Sprintf("pong=%d", time.Now().Unix()))
}

View File

@ -44,7 +44,16 @@ type retainAction struct {
// Perform the action
func (ra *retainAction) Perform(candidates []*res.Candidate) ([]*res.Result, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
results := make([]*res.Result, 0)
for _, c := range candidates {
results = append(results, &res.Result{
Target: c,
})
}
return results, nil
}
// NewRetainAction is factory method for RetainAction

View File

@ -52,7 +52,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
// collect errors by wrapping
err error
// collect processed candidates
processedCandidates = make(map[string][]*res.Candidate)
processedCandidates = make(map[string]cHash)
)
// for sync
@ -67,30 +67,36 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
// control chan
done := make(chan bool, 1)
defer func() {
// signal the result listener loop exit
done <- true
}()
// go routine for receiving results/error
go func() {
defer func() {
// done
done <- true
}()
for {
select {
case result := <-resChan:
if _, ok := processedCandidates[result.action]; !ok {
processedCandidates[result.action] = make([]*res.Candidate, 0)
if result == nil {
// chan is closed
return
}
processedCandidates[result.action] = append(processedCandidates[result.action], result.processed...)
if _, ok := processedCandidates[result.action]; !ok {
processedCandidates[result.action] = make(cHash)
}
listByAction := processedCandidates[result.action]
for _, rp := range result.processed {
// remove duplicated ones
listByAction[rp.Hash()] = rp
}
case e := <-errChan:
if err == nil {
err = errors.Wrap(e, "artifact processing error")
} else {
err = errors.Wrap(e, err.Error())
}
case <-done:
// exit
return
}
}
}()
@ -143,6 +149,10 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
// waiting for all the rules are evaluated
wg.Wait()
// close result chan
close(resChan)
// check if the receiving loop exists
<-done
if err != nil {
return nil, err
@ -150,11 +160,13 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
results := make([]*res.Result, 0)
// Perform actions
for act, candidates := range processedCandidates {
for act, hash := range processedCandidates {
var attachedErr error
cl := hash.toList()
if pf, ok := p.performers[act]; ok {
if theRes, err := pf.Perform(candidates); err != nil {
if theRes, err := pf.Perform(cl); err != nil {
attachedErr = err
} else {
results = append(results, theRes...)
@ -164,7 +176,7 @@ func (p *processor) Process(artifacts []*res.Candidate) ([]*res.Result, error) {
}
if attachedErr != nil {
for _, c := range candidates {
for _, c := range cl {
results = append(results, &res.Result{
Target: c,
Error: attachedErr,
@ -187,3 +199,15 @@ func (p *processor) AddEvaluator(evaluator rule.Evaluator, selectors []res.Selec
func (p *processor) AddActionPerformer(action string, performer action.Performer) {
p.performers[action] = performer
}
type cHash map[string]*res.Candidate
func (ch cHash) toList() []*res.Candidate {
l := make([]*res.Candidate, 0)
for _, v := range ch {
l = append(l, v)
}
return l
}

View File

@ -0,0 +1,103 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package or
import (
"github.com/goharbor/harbor/src/pkg/retention/policy/action"
"github.com/goharbor/harbor/src/pkg/retention/policy/alg"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule/lastx"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule/latestk"
"github.com/goharbor/harbor/src/pkg/retention/res"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/label"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/regexp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)
// ProcessorTestSuite is suite for testing processor
type ProcessorTestSuite struct {
suite.Suite
p alg.Processor
all []*res.Candidate
}
// TestProcessor is entrance for ProcessorTestSuite
func TestProcessor(t *testing.T) {
suite.Run(t, new(ProcessorTestSuite))
}
// SetupSuite ...
func (suite *ProcessorTestSuite) SetupSuite() {
suite.all = []*res.Candidate{
{
Namespace: "library",
Repository: "harbor",
Kind: "image",
Tag: "latest",
PushedTime: time.Now().Unix(),
Labels: []string{"L1", "L2"},
},
{
Namespace: "library",
Repository: "harbor",
Kind: "image",
Tag: "dev",
PushedTime: time.Now().Unix(),
Labels: []string{"L3"},
},
}
p := New()
p.AddActionPerformer(action.Retain, action.NewRetainAction(suite.all))
lastxParams := make(map[string]rule.Parameter)
lastxParams[lastx.ParameterX] = 10
p.AddEvaluator(lastx.New(lastxParams), []res.Selector{
regexp.New(regexp.Matches, "*dev*"),
label.New(label.With, "L1,L2"),
})
latestKParams := make(map[string]rule.Parameter)
latestKParams[latestk.ParameterK] = 10
p.AddEvaluator(latestk.New(latestKParams), []res.Selector{
label.New(label.With, "L3"),
})
suite.p = p
}
// TearDownSuite ...
func (suite *ProcessorTestSuite) TearDownSuite() {}
// TestProcess tests process method
func (suite *ProcessorTestSuite) TestProcess() {
results, err := suite.p.Process(suite.all)
require.NoError(suite.T(), err)
assert.Equal(suite.T(), 2, len(results))
assert.Condition(suite.T(), func() bool {
for _, r := range results {
if r.Error != nil {
return false
}
}
return true
}, "no errors in the returned result list")
}

View File

@ -32,5 +32,5 @@ type Evaluator interface {
Action() string
}
// RuleFactory defines a factory method for creating rule evaluator
type RuleFactory func(parameters Parameters) Evaluator
// Factory defines a factory method for creating rule evaluator
type Factory func(parameters Parameters) Evaluator

View File

@ -37,7 +37,7 @@ type IndexMeta struct {
type IndexedParam struct {
Name string `json:"name"`
//Type of the param
// Type of the param
// "int", "string" or "[]string"
Type string `json:"type"`
@ -50,11 +50,11 @@ type IndexedParam struct {
type indexedItem struct {
Meta *IndexMeta
Factory RuleFactory
Factory Factory
}
// Register the rule evaluator with the corresponding rule template
func Register(meta *IndexMeta, factory RuleFactory) {
func Register(meta *IndexMeta, factory Factory) {
if meta == nil || factory == nil || len(meta.TemplateID) == 0 {
// do nothing
return

View File

@ -38,7 +38,8 @@ type evaluator struct {
// Process the candidates based on the rule definition
func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}
// Specify what action is performed to the candidates processed by this evaluator

View File

@ -38,7 +38,8 @@ type evaluator struct {
// Process the candidates based on the rule definition
func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}
// Specify what action is performed to the candidates processed by this evaluator

View File

@ -14,6 +14,11 @@
package res
import (
"encoding/base64"
"fmt"
)
const (
// Image kind
Image = "image"
@ -37,3 +42,10 @@ type Candidate struct {
// Labels attached with the candidate
Labels []string
}
// Hash code based on the candidate info for differentiation
func (c *Candidate) Hash() string {
raw := fmt.Sprintf("%s:%s/%s:%s", c.Kind, c.Namespace, c.Repository, c.Tag)
return base64.StdEncoding.EncodeToString([]byte(raw))
}

View File

@ -40,7 +40,8 @@ type selector struct {
// Select candidates by regular expressions
func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}
// New is factory method for list selector

View File

@ -39,7 +39,8 @@ type selector struct {
// Select candidates by regular expressions
func (s *selector) Select(artifacts []*res.Candidate) ([]*res.Candidate, error) {
return nil, nil
// TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION
return artifacts, nil
}
// New is factory method for regexp selector