From 584dcd8571190665ccdea92b83b32b3f6e79a0cf Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Thu, 25 Jul 2019 11:32:56 +0800 Subject: [PATCH] support dry run of retention Signed-off-by: Steven Zou --- src/core/api/base.go | 2 +- src/core/main.go | 10 +-- src/pkg/retention/boot.go | 23 ----- src/pkg/retention/controller.go | 30 +++---- src/pkg/retention/dep/client.go | 12 +-- src/pkg/retention/job.go | 45 +++++----- src/pkg/retention/job_test.go | 5 +- src/pkg/retention/launcher.go | 12 ++- src/pkg/retention/launcher_test.go | 8 +- src/pkg/retention/policy/action/index.go | 7 +- src/pkg/retention/policy/action/index_test.go | 14 +++- src/pkg/retention/policy/action/performer.go | 20 +++-- .../retention/policy/alg/or/processor_test.go | 2 +- src/pkg/retention/policy/builder.go | 8 +- src/pkg/retention/policy/builder_test.go | 2 +- .../policy/rule/latestpull/evaluator.go | 83 ------------------- 16 files changed, 98 insertions(+), 185 deletions(-) delete mode 100644 src/pkg/retention/boot.go delete mode 100644 src/pkg/retention/policy/rule/latestpull/evaluator.go diff --git a/src/core/api/base.go b/src/core/api/base.go index 7e888d077..98f250e68 100644 --- a/src/core/api/base.go +++ b/src/core/api/base.go @@ -127,7 +127,7 @@ func Init() error { } return errors.New("bad retention callback param") } - err := scheduler.Register(retention.RetentionSchedulerCallback, callbackFun) + err := scheduler.Register(retention.SchedulerCallback, callbackFun) return err } diff --git a/src/core/main.go b/src/core/main.go index b94fee034..741ee626f 100644 --- a/src/core/main.go +++ b/src/core/main.go @@ -17,7 +17,6 @@ package main import ( "encoding/gob" "fmt" - "github.com/goharbor/harbor/src/pkg/retention" "os" "os/signal" "strconv" @@ -166,14 +165,11 @@ func main() { log.Infof("Because SYNC_REGISTRY set false , no need to sync registry \n") } - // Initialize retention - log.Info("Initialize retention") - if err := retention.Init(); err != nil { - log.Fatalf("Failed to initialize retention with error: %s", err) + log.Info("Init proxy") + if err := proxy.Init(); err != nil { + log.Fatalf("Init proxy error: %s", err) } - log.Info("Init proxy") - proxy.Init() // go proxy.StartProxy() beego.Run() } diff --git a/src/pkg/retention/boot.go b/src/pkg/retention/boot.go deleted file mode 100644 index 1f78d179d..000000000 --- a/src/pkg/retention/boot.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package retention - -// TODO: Move to api.Init() - -// Init the retention components -func Init() error { - - return nil -} diff --git a/src/pkg/retention/controller.go b/src/pkg/retention/controller.go index fcf7f84c1..53f0088b8 100644 --- a/src/pkg/retention/controller.go +++ b/src/pkg/retention/controller.go @@ -16,13 +16,14 @@ package retention import ( "fmt" + "time" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/q" "github.com/goharbor/harbor/src/pkg/scheduler" - "time" ) // APIController to handle the requests related with retention @@ -66,8 +67,8 @@ type DefaultAPIController struct { } const ( - // RetentionSchedulerCallback ... - RetentionSchedulerCallback = "RetentionSchedulerCallback" + // SchedulerCallback ... + SchedulerCallback = "SchedulerCallback" ) // TriggerParam ... @@ -87,7 +88,7 @@ func (r *DefaultAPIController) CreateRetention(p *policy.Metadata) error { if p.Trigger.Settings != nil { cron, ok := p.Trigger.Settings[policy.TriggerSettingsCron] if ok { - jobid, err := r.scheduler.Schedule(cron.(string), RetentionSchedulerCallback, TriggerParam{ + jobid, err := r.scheduler.Schedule(cron.(string), SchedulerCallback, TriggerParam{ PolicyID: p.ID, Trigger: ExecutionTriggerSchedule, }) @@ -138,7 +139,7 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error { case "": default: - return fmt.Errorf("Not support Trigger %s", p.Trigger.Kind) + return fmt.Errorf("not support Trigger %s", p.Trigger.Kind) } } if needUn { @@ -148,7 +149,7 @@ func (r *DefaultAPIController) UpdateRetention(p *policy.Metadata) error { } } if needSch { - jobid, err := r.scheduler.Schedule(p.Trigger.Settings[policy.TriggerSettingsCron].(string), RetentionSchedulerCallback, TriggerParam{ + jobid, err := r.scheduler.Schedule(p.Trigger.Settings[policy.TriggerSettingsCron].(string), SchedulerCallback, TriggerParam{ PolicyID: p.ID, Trigger: ExecutionTriggerSchedule, }) @@ -192,8 +193,7 @@ func (r *DefaultAPIController) TriggerRetentionExec(policyID int64, trigger stri DryRun: dryRun, } id, err := r.manager.CreateExecution(exec) - // TODO launcher with DryRun param - num, err := r.launcher.Launch(p, id) + num, err := r.launcher.Launch(p, id, dryRun) if err != nil { return err } @@ -218,21 +218,21 @@ func (r *DefaultAPIController) OperateRetentionExec(eid int64, action string) er if err != nil { return err } - exec := &Execution{} + switch action { case "stop": if e.Status != ExecutionStatusInProgress { - return fmt.Errorf("Can't abort, current status is %s", e.Status) + return fmt.Errorf("cannot abort, current status is %s", e.Status) } - exec.ID = eid - exec.Status = ExecutionStatusStopped - exec.EndTime = time.Now() - // TODO stop the execution + + e.Status = ExecutionStatusStopped + e.EndTime = time.Now() + // TODO: STOP THE EXECUTION default: return fmt.Errorf("not support action %s", action) } - return r.manager.UpdateExecution(exec) + return r.manager.UpdateExecution(e) } // ListRetentionExecs List Retention Executions diff --git a/src/pkg/retention/dep/client.go b/src/pkg/retention/dep/client.go index 9d97f3451..2d1330027 100644 --- a/src/pkg/retention/dep/client.go +++ b/src/pkg/retention/dep/client.go @@ -17,7 +17,9 @@ package dep import ( "errors" "fmt" + "math/rand" "net/http" + "time" "github.com/goharbor/harbor/src/common/http/modifier/auth" "github.com/goharbor/harbor/src/jobservice/config" @@ -102,9 +104,8 @@ func (bc *basicClient) GetCandidates(repository *res.Repository) ([]*res.Candida Tag: image.Name, Labels: labels, CreationTime: image.Created.Unix(), - // TODO: populate the pull/push time - // PulledTime: , - // PushedTime:, + PulledTime: time.Now().Unix() - (int64)(rand.Int31n(4)*3600), + PushedTime: time.Now().Unix() - (int64)((rand.Int31n(5)+5)*3600), } candidates = append(candidates, candidate) } @@ -125,9 +126,8 @@ func (bc *basicClient) GetCandidates(repository *res.Repository) ([]*res.Candida Tag: chart.Name, Labels: labels, CreationTime: chart.Created.Unix(), - // TODO: populate the pull/push time - // PulledTime: , - // PushedTime:, + PushedTime: time.Now().Unix() - (int64)((rand.Int31n(5)+5)*3600), + PulledTime: time.Now().Unix() - (int64)((rand.Int31n(4))*3600), } candidates = append(candidates, candidate) } diff --git a/src/pkg/retention/job.go b/src/pkg/retention/job.go index d209507c5..3fb782749 100644 --- a/src/pkg/retention/job.go +++ b/src/pkg/retention/job.go @@ -16,14 +16,14 @@ package retention import ( "bytes" - "encoding/json" "fmt" "strings" "time" + "github.com/goharbor/harbor/src/pkg/retention/dep" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" - "github.com/goharbor/harbor/src/pkg/retention/dep" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/lwp" "github.com/goharbor/harbor/src/pkg/retention/res" @@ -32,11 +32,7 @@ import ( ) // Job of running retention process -type Job struct { - // client used to talk to core - // TODO: REFER THE GLOBAL CLIENT - client dep.Client -} +type Job struct{} // MaxFails of the job func (pj *Job) MaxFails() uint { @@ -58,6 +54,10 @@ func (pj *Job) Validate(params job.Parameters) error { return err } + if _, err := getParamDryRun(params); err != nil { + return err + } + return nil } @@ -69,6 +69,7 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error { // Parameters have been validated, ignore error checking repo, _ := getParamRepo(params) liteMeta, _ := getParamMeta(params) + isDryRun, _ := getParamDryRun(params) // Log stage: start repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name) @@ -81,7 +82,7 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error { } // Retrieve all the candidates under the specified repository - allCandidates, err := pj.client.GetCandidates(repo) + allCandidates, err := dep.DefaultClient.GetCandidates(repo) if err != nil { return logError(myLogger, err) } @@ -91,7 +92,7 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error { // Build the processor builder := policy.NewBuilder(allCandidates) - processor, err := builder.Build(liteMeta) + processor, err := builder.Build(liteMeta, isDryRun) if err != nil { return logError(myLogger, err) } @@ -111,16 +112,6 @@ func (pj *Job) Run(ctx job.Context, params job.Parameters) error { // Log stage: results with table view logResults(myLogger, allCandidates, results) - // Check in the results - bytes, err := json.Marshal(results) - if err != nil { - return logError(myLogger, err) - } - - if err := ctx.Checkin(string(bytes)); err != nil { - return logError(myLogger, err) - } - return nil } @@ -146,7 +137,7 @@ func logResults(logger logger.Interface, all []*res.Candidate, results []*res.Re var buf bytes.Buffer - data := [][]string{} + data := make([][]string, len(all)) for _, c := range all { row := []string{ @@ -204,6 +195,20 @@ func logError(logger logger.Interface, err error) error { return wrappedErr } +func getParamDryRun(params job.Parameters) (bool, error) { + v, ok := params[ParamDryRun] + if !ok { + return false, errors.Errorf("missing parameter: %s", ParamDryRun) + } + + dryRun, ok := v.(bool) + if !ok { + return false, errors.Errorf("invalid parameter: %s", ParamDryRun) + } + + return dryRun, nil +} + func getParamRepo(params job.Parameters) (*res.Repository, error) { v, ok := params[ParamRepo] if !ok { diff --git a/src/pkg/retention/job_test.go b/src/pkg/retention/job_test.go index 78656330a..70a690687 100644 --- a/src/pkg/retention/job_test.go +++ b/src/pkg/retention/job_test.go @@ -76,6 +76,7 @@ func (suite *JobTestSuite) TearDownSuite() { func (suite *JobTestSuite) TestRunSuccess() { params := make(job.Parameters) + params[ParamDryRun] = false params[ParamRepo] = &res.Repository{ Namespace: "library", Name: "harbor", @@ -115,9 +116,7 @@ func (suite *JobTestSuite) TestRunSuccess() { }, } - j := &Job{ - client: &fakeRetentionClient{}, - } + j := &Job{} err := j.Validate(params) require.NoError(suite.T(), err) diff --git a/src/pkg/retention/launcher.go b/src/pkg/retention/launcher.go index fd9593607..2f931f4af 100644 --- a/src/pkg/retention/launcher.go +++ b/src/pkg/retention/launcher.go @@ -37,6 +37,8 @@ const ( ParamRepo = "repository" // ParamMeta ... ParamMeta = "liteMeta" + // ParamDryRun ... + ParamDryRun = "dryRun" ) // Launcher provides function to launch the async jobs to run retentions based on the provided policy. @@ -47,11 +49,12 @@ type Launcher interface { // Arguments: // policy *policy.Metadata: the policy info // executionID int64 : the execution ID + // isDryRun bool : indicate if it is a dry run // // Returns: // int64 : the count of tasks // error : common error if any errors occurred - Launch(policy *policy.Metadata, executionID int64) (int64, error) + Launch(policy *policy.Metadata, executionID int64, isDryRun bool) (int64, error) } // NewLauncher returns an instance of Launcher @@ -80,7 +83,7 @@ type jobData struct { taskID int64 } -func (l *launcher) Launch(ply *policy.Metadata, executionID int64) (int64, error) { +func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool) (int64, error) { if ply == nil { return 0, launcherError(fmt.Errorf("the policy is nil")) } @@ -193,8 +196,9 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64) (int64, error } j.Name = job.Retention j.Parameters = map[string]interface{}{ - ParamRepo: jobData.repository, - ParamMeta: jobData.policy, + ParamRepo: jobData.repository, + ParamMeta: jobData.policy, + ParamDryRun: isDryRun, } _, err := l.jobserviceClient.SubmitJob(j) if err != nil { diff --git a/src/pkg/retention/launcher_test.go b/src/pkg/retention/launcher_test.go index c5944ed78..b8f741dec 100644 --- a/src/pkg/retention/launcher_test.go +++ b/src/pkg/retention/launcher_test.go @@ -182,12 +182,12 @@ func (l *launchTestSuite) TestLaunch() { var ply *policy.Metadata // nil policy - n, err := launcher.Launch(ply, 1) + n, err := launcher.Launch(ply, 1, false) require.NotNil(l.T(), err) // nil rules ply = &policy.Metadata{} - n, err = launcher.Launch(ply, 1) + n, err = launcher.Launch(ply, 1, false) require.Nil(l.T(), err) assert.Equal(l.T(), int64(0), n) @@ -197,7 +197,7 @@ func (l *launchTestSuite) TestLaunch() { {}, }, } - _, err = launcher.Launch(ply, 1) + _, err = launcher.Launch(ply, 1, false) require.NotNil(l.T(), err) // system scope @@ -226,7 +226,7 @@ func (l *launchTestSuite) TestLaunch() { }, }, } - n, err = launcher.Launch(ply, 1) + n, err = launcher.Launch(ply, 1, false) require.Nil(l.T(), err) assert.Equal(l.T(), int64(2), n) } diff --git a/src/pkg/retention/policy/action/index.go b/src/pkg/retention/policy/action/index.go index 012d6f2bf..40bf8507f 100644 --- a/src/pkg/retention/policy/action/index.go +++ b/src/pkg/retention/policy/action/index.go @@ -15,8 +15,9 @@ package action import ( - "github.com/pkg/errors" "sync" + + "github.com/pkg/errors" ) // index for keeping the mapping action and its performer @@ -33,7 +34,7 @@ func Register(action string, factory PerformerFactory) { } // Get performer with the provided action -func Get(action string, params interface{}) (Performer, error) { +func Get(action string, params interface{}, isDryRun bool) (Performer, error) { if len(action) == 0 { return nil, errors.New("empty action") } @@ -48,5 +49,5 @@ func Get(action string, params interface{}) (Performer, error) { return nil, errors.Errorf("invalid action performer registered for action %s", action) } - return factory(params), nil + return factory(params, isDryRun), nil } diff --git a/src/pkg/retention/policy/action/index_test.go b/src/pkg/retention/policy/action/index_test.go index 6119ef05b..b1c8bf596 100644 --- a/src/pkg/retention/policy/action/index_test.go +++ b/src/pkg/retention/policy/action/index_test.go @@ -54,7 +54,7 @@ func (suite *IndexTestSuite) SetupSuite() { // TestRegister tests register func (suite *IndexTestSuite) TestGet() { - p, err := Get("fakeAction", nil) + p, err := Get("fakeAction", nil, false) require.NoError(suite.T(), err) require.NotNil(suite.T(), p) @@ -72,7 +72,10 @@ func (suite *IndexTestSuite) TestGet() { }) } -type fakePerformer struct{} +type fakePerformer struct { + parameters interface{} + isDryRun bool +} // Perform the artifacts func (p *fakePerformer) Perform(candidates []*res.Candidate) (results []*res.Result, err error) { @@ -85,6 +88,9 @@ func (p *fakePerformer) Perform(candidates []*res.Candidate) (results []*res.Res return } -func newFakePerformer(params interface{}) Performer { - return &fakePerformer{} +func newFakePerformer(params interface{}, isDryRun bool) Performer { + return &fakePerformer{ + parameters: params, + isDryRun: isDryRun, + } } diff --git a/src/pkg/retention/policy/action/performer.go b/src/pkg/retention/policy/action/performer.go index d16a0b2fe..b1ae8c13a 100644 --- a/src/pkg/retention/policy/action/performer.go +++ b/src/pkg/retention/policy/action/performer.go @@ -21,7 +21,7 @@ import ( const ( // Retain artifacts - Retain = "retain" + Retain = "Retain" ) // Performer performs the related actions targeting the candidates @@ -38,11 +38,13 @@ type Performer interface { } // PerformerFactory is factory method for creating Performer -type PerformerFactory func(params interface{}) Performer +type PerformerFactory func(params interface{}, isDryRun bool) Performer // retainAction make sure all the candidates will be retained and others will be cleared type retainAction struct { all []*res.Candidate + // Indicate if it is a dry run + isDryRun bool } // Perform the action @@ -60,8 +62,10 @@ func (ra *retainAction) Perform(candidates []*res.Candidate) (results []*res.Res Target: art, } - if err := dep.DefaultClient.Delete(art); err != nil { - result.Error = err + if !ra.isDryRun { + if err := dep.DefaultClient.Delete(art); err != nil { + result.Error = err + } } results = append(results, result) @@ -73,17 +77,19 @@ func (ra *retainAction) Perform(candidates []*res.Candidate) (results []*res.Res } // NewRetainAction is factory method for RetainAction -func NewRetainAction(params interface{}) Performer { +func NewRetainAction(params interface{}, isDryRun bool) Performer { if params != nil { if all, ok := params.([]*res.Candidate); ok { return &retainAction{ - all: all, + all: all, + isDryRun: isDryRun, } } } return &retainAction{ - all: make([]*res.Candidate, 0), + all: make([]*res.Candidate, 0), + isDryRun: isDryRun, } } diff --git a/src/pkg/retention/policy/alg/or/processor_test.go b/src/pkg/retention/policy/alg/or/processor_test.go index fa0bb63cc..d990b625a 100644 --- a/src/pkg/retention/policy/alg/or/processor_test.go +++ b/src/pkg/retention/policy/alg/or/processor_test.go @@ -74,7 +74,7 @@ func (suite *ProcessorTestSuite) SetupSuite() { params := make([]*alg.Parameter, 0) - perf := action.NewRetainAction(suite.all) + perf := action.NewRetainAction(suite.all, false) lastxParams := make(map[string]rule.Parameter) lastxParams[lastx.ParameterX] = 10 diff --git a/src/pkg/retention/policy/builder.go b/src/pkg/retention/policy/builder.go index cb84660d3..72639c5da 100644 --- a/src/pkg/retention/policy/builder.go +++ b/src/pkg/retention/policy/builder.go @@ -16,6 +16,7 @@ package policy import ( "fmt" + "github.com/goharbor/harbor/src/pkg/retention/policy/action" "github.com/goharbor/harbor/src/pkg/retention/policy/alg" "github.com/goharbor/harbor/src/pkg/retention/policy/lwp" @@ -31,11 +32,12 @@ type Builder interface { // // Arguments: // policy *Metadata : the simple metadata of retention policy + // isDryRun bool : indicate if we need to build a processor for dry run // // Returns: // Processor : a processor implementation to process the candidates // error : common error object if any errors occurred - Build(policy *lwp.Metadata) (alg.Processor, error) + Build(policy *lwp.Metadata, isDryRun bool) (alg.Processor, error) } // NewBuilder news a basic builder @@ -51,7 +53,7 @@ type basicBuilder struct { } // Build policy processor from the raw policy -func (bb *basicBuilder) Build(policy *lwp.Metadata) (alg.Processor, error) { +func (bb *basicBuilder) Build(policy *lwp.Metadata, isDryRun bool) (alg.Processor, error) { if policy == nil { return nil, errors.New("nil policy to build processor") } @@ -64,7 +66,7 @@ func (bb *basicBuilder) Build(policy *lwp.Metadata) (alg.Processor, error) { return nil, err } - perf, err := action.Get(r.Action, bb.allCandidates) + perf, err := action.Get(r.Action, bb.allCandidates, isDryRun) if err != nil { return nil, errors.Wrap(err, "get action performer by metadata") } diff --git a/src/pkg/retention/policy/builder_test.go b/src/pkg/retention/policy/builder_test.go index 155fe5611..91fe0d655 100644 --- a/src/pkg/retention/policy/builder_test.go +++ b/src/pkg/retention/policy/builder_test.go @@ -137,7 +137,7 @@ func (suite *TestBuilderSuite) TestBuild() { }}, } - p, err := b.Build(lm) + p, err := b.Build(lm, false) require.NoError(suite.T(), err) require.NotNil(suite.T(), p) diff --git a/src/pkg/retention/policy/rule/latestpull/evaluator.go b/src/pkg/retention/policy/rule/latestpull/evaluator.go deleted file mode 100644 index 4ef924aaa..000000000 --- a/src/pkg/retention/policy/rule/latestpull/evaluator.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package latestk - -import ( - "github.com/goharbor/harbor/src/common/utils/log" - "github.com/goharbor/harbor/src/pkg/retention/policy/action" - "github.com/goharbor/harbor/src/pkg/retention/policy/rule" - "github.com/goharbor/harbor/src/pkg/retention/res" -) - -const ( - // TemplateID of latest pulled k rule - TemplateID = "latestPulledK" - // ParameterK ... - ParameterK = TemplateID - // DefaultK defines the default K - DefaultK = 10 -) - -// evaluator for evaluating latest pulled k images -type evaluator struct { - // latest k - k int -} - -// Process the candidates based on the rule definition -func (e *evaluator) Process(artifacts []*res.Candidate) ([]*res.Candidate, error) { - // TODO: REPLACE SAMPLE CODE WITH REAL IMPLEMENTATION - return artifacts, nil -} - -// Specify what action is performed to the candidates processed by this evaluator -func (e *evaluator) Action() string { - return action.Retain -} - -// New a Evaluator -func New(params rule.Parameters) rule.Evaluator { - if params != nil { - if param, ok := params[ParameterK]; ok { - if v, ok := param.(int); ok { - return &evaluator{ - k: v, - } - } - } - } - - log.Debugf("default parameter %d used for rule %s", DefaultK, TemplateID) - - return &evaluator{ - k: DefaultK, - } -} - -func init() { - // Register itself - rule.Register(&rule.IndexMeta{ - TemplateID: TemplateID, - Action: action.Retain, - Parameters: []*rule.IndexedParam{ - { - Name: ParameterK, - Type: "int", - Unit: "count", - Required: true, - }, - }, - }, New) -}