Merge pull request #8258 from steven-zou/feature/tag_retention_job

implement retention job
This commit is contained in:
Steven Zou 2019-07-10 14:31:04 +08:00 committed by GitHub
commit 1575d90523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 41 deletions

View File

@ -21,12 +21,12 @@ type Client interface {
// Get the tag candidates under the repository // Get the tag candidates under the repository
// //
// Arguments: // Arguments:
// repo string : name of the repository with namespace // repo *res.Repository : repository info
// //
// Returns: // Returns:
// []*res.Candidate : candidates returned // []*res.Candidate : candidates returned
// error : common error if any errors occurred // error : common error if any errors occurred
GetCandidates(repo string) ([]*res.Candidate, error) GetCandidates(repo *res.Repository) ([]*res.Candidate, error)
// Delete the specified candidate // Delete the specified candidate
// //
@ -47,8 +47,10 @@ func New() Client {
type basicClient struct{} type basicClient struct{}
// GetCandidates gets the tag candidates under the repository // GetCandidates gets the tag candidates under the repository
func (bc *basicClient) GetCandidates(repo string) ([]*res.Candidate, error) { func (bc *basicClient) GetCandidates(repo *res.Repository) ([]*res.Candidate, error) {
return nil, nil results := make([]*res.Candidate, 0)
return results, nil
} }
// Deletes the specified candidate // Deletes the specified candidate

View File

@ -15,11 +15,26 @@
package retention package retention
import ( import (
"encoding/json"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/res"
"github.com/pkg/errors"
)
const (
// ParamRepo ...
ParamRepo = "repository"
// ParamMeta ...
ParamMeta = "liteMeta"
) )
// Job of running retention process // Job of running retention process
type Job struct{} type Job struct {
// client used to talk to core
client Client
}
// MaxFails of the job // MaxFails of the job
func (pj *Job) MaxFails() uint { func (pj *Job) MaxFails() uint {
@ -33,10 +48,89 @@ func (pj *Job) ShouldRetry() bool {
// Validate the parameters // Validate the parameters
func (pj *Job) Validate(params job.Parameters) error { func (pj *Job) Validate(params job.Parameters) error {
if _, err := getParamRepo(params); err != nil {
return err
}
if _, err := getParamMeta(params); err != nil {
return err
}
return nil return nil
} }
// Run the job // Run the job
func (pj *Job) Run(ctx job.Context, params job.Parameters) error { func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
// logger for logging
myLogger := ctx.GetLogger()
// Parameters have been validated, ignore error checking
repo, _ := getParamRepo(params)
liteMeta, _ := getParamMeta(params)
// Retrieve all the candidates under the specified repository
allCandidates, err := pj.client.GetCandidates(repo)
if err != nil {
return logError(myLogger, err)
}
// Build the processor
builder := policy.NewBuilder(allCandidates)
processor, err := builder.Build(liteMeta)
if err != nil {
return logError(myLogger, err)
}
// Run the flow
results, err := processor.Process(allCandidates)
if err != nil {
return logError(myLogger, err)
}
// Check in the results
bytes, err := json.Marshal(results)
if err != nil {
return logError(myLogger, err)
}
if err := ctx.Checkin(string(bytes)); err != nil {
return logError(myLogger, err)
}
return nil return nil
} }
func logError(logger logger.Interface, err error) error {
wrappedErr := errors.Wrap(err, "retention job")
logger.Error(wrappedErr)
return wrappedErr
}
func getParamRepo(params job.Parameters) (*res.Repository, error) {
v, ok := params[ParamRepo]
if !ok {
return nil, errors.Errorf("missing parameter: %s", ParamRepo)
}
repo, ok := v.(*res.Repository)
if !ok {
return nil, errors.Errorf("invalid parameter: %s", ParamRepo)
}
return repo, nil
}
func getParamMeta(params job.Parameters) (*policy.LiteMeta, error) {
v, ok := params[ParamMeta]
if !ok {
return nil, errors.Errorf("missing parameter: %s", ParamMeta)
}
meta, ok := v.(*policy.LiteMeta)
if !ok {
return nil, errors.Errorf("invalid parameter: %s", ParamMeta)
}
return meta, nil
}

View File

@ -29,12 +29,12 @@ type Builder interface {
// Builds runnable processor // Builds runnable processor
// //
// Arguments: // Arguments:
// rawPolicy string : the simple retention policy with JSON format // policy *LiteMeta : the simple metadata of retention policy
// //
// Returns: // Returns:
// Processor : a processor implementation to process the candidates // Processor : a processor implementation to process the candidates
// error : common error object if any errors occurred // error : common error object if any errors occurred
Build(rawPolicy string) (alg.Processor, error) Build(policy *LiteMeta) (alg.Processor, error)
} }
// NewBuilder news a basic builder // NewBuilder news a basic builder
@ -50,22 +50,16 @@ type basicBuilder struct {
} }
// Build policy processor from the raw policy // Build policy processor from the raw policy
func (bb *basicBuilder) Build(rawPolicy string) (alg.Processor, error) { func (bb *basicBuilder) Build(policy *LiteMeta) (alg.Processor, error) {
if len(rawPolicy) == 0 { if policy == nil {
return nil, errors.New("empty raw policy to build processor") return nil, errors.New("nil policy to build processor")
} }
// Decode metadata switch policy.Algorithm {
liteMeta := &LiteMeta{}
if err := liteMeta.Decode(rawPolicy); err != nil {
return nil, errors.Wrap(err, "build policy processor")
}
switch liteMeta.Algorithm {
case AlgorithmOR: case AlgorithmOR:
// New OR processor // New OR processor
p := or.New() p := or.New()
for _, r := range liteMeta.Rules { for _, r := range policy.Rules {
evaluator, err := rule.Get(r.Template, r.Parameters) evaluator, err := rule.Get(r.Template, r.Parameters)
if err != nil { if err != nil {
return nil, err return nil, err
@ -94,5 +88,5 @@ func (bb *basicBuilder) Build(rawPolicy string) (alg.Processor, error) {
default: default:
} }
return nil, errors.Errorf("algorithm %s is not supported", liteMeta.Algorithm) return nil, errors.Errorf("algorithm %s is not supported", policy.Algorithm)
} }

View File

@ -15,9 +15,7 @@
package policy package policy
import ( import (
"encoding/json"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule" "github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/pkg/errors"
) )
const ( const (
@ -82,21 +80,3 @@ type LiteMeta struct {
// Rule collection // Rule collection
Rules []rule.Metadata `json:"rules"` Rules []rule.Metadata `json:"rules"`
} }
// Encode the lit meta by compressed json string
func (lm *LiteMeta) Encode() (string, error) {
data, err := json.Marshal(lm)
if err != nil {
return "", err
}
return string(data), nil
}
// Decode the lite meta from the data string
func (lm *LiteMeta) Decode(data string) error {
if len(data) == 0 {
return errors.New("no data for decoding")
}
return json.Unmarshal([]byte(data), lm)
}

View File

@ -26,6 +26,17 @@ const (
Chart = "chart" Chart = "chart"
) )
// Repository of candidate
type Repository struct {
// Namespace
Namespace string
// Repository name
Name string
// So far we need the kind of repository and retrieve candidates with different APIs
// TODO: REMOVE IT IN THE FUTURE IF WE SUPPORT UNIFIED ARTIFACT MODEL
Kind string
}
// Candidate for retention processor to match // Candidate for retention processor to match
type Candidate struct { type Candidate struct {
// Namespace // Namespace

View File

@ -16,7 +16,7 @@ package res
// Result keeps the action result // Result keeps the action result
type Result struct { type Result struct {
Target *Candidate Target *Candidate `json:"target"`
// nil error means success // nil error means success
Error error Error error `json:"error"`
} }