From 61f54cc4a9e5dcd406dbb6f7803c04c50659f575 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 30 Jul 2019 12:42:01 +0800 Subject: [PATCH] Refactor retention launcher Signed-off-by: Wenkai Yin --- src/pkg/clients/core/chart.go | 5 + src/pkg/clients/core/client.go | 2 + src/pkg/clients/core/image.go | 5 + src/pkg/retention/controller_test.go | 3 +- src/pkg/retention/dep/client.go | 12 +- src/pkg/retention/launcher.go | 169 ++++++++++++++++-------- src/pkg/retention/launcher_test.go | 22 +-- src/testing/clients/dumb_core_client.go | 10 ++ 8 files changed, 162 insertions(+), 66 deletions(-) diff --git a/src/pkg/clients/core/chart.go b/src/pkg/clients/core/chart.go index 2e2cc4a7d..75d8c3983 100644 --- a/src/pkg/clients/core/chart.go +++ b/src/pkg/clients/core/chart.go @@ -33,3 +33,8 @@ func (c *client) DeleteChart(project, repository, version string) error { url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s/%s", project, repository, version)) return c.httpclient.Delete(url) } + +func (c *client) DeleteChartRepository(project, repository string) error { + url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s", project, repository)) + return c.httpclient.Delete(url) +} diff --git a/src/pkg/clients/core/client.go b/src/pkg/clients/core/client.go index 74d09bc62..2234fd17c 100644 --- a/src/pkg/clients/core/client.go +++ b/src/pkg/clients/core/client.go @@ -37,12 +37,14 @@ type Client interface { type ImageClient interface { ListAllImages(project, repository string) ([]*models.TagResp, error) DeleteImage(project, repository, tag string) error + DeleteImageRepository(project, repository string) error } // ChartClient defines the methods that a chart client should implement type ChartClient interface { ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) DeleteChart(project, repository, version string) error + DeleteChartRepository(project, repository string) error } // New returns an instance of the client which is a default implement for Client diff --git a/src/pkg/clients/core/image.go b/src/pkg/clients/core/image.go index 83530d194..1b8811790 100644 --- a/src/pkg/clients/core/image.go +++ b/src/pkg/clients/core/image.go @@ -33,3 +33,8 @@ func (c *client) DeleteImage(project, repository, tag string) error { url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s/tags/%s", project, repository, tag)) return c.httpclient.Delete(url) } + +func (c *client) DeleteImageRepository(project, repository string) error { + url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s", project, repository)) + return c.httpclient.Delete(url) +} diff --git a/src/pkg/retention/controller_test.go b/src/pkg/retention/controller_test.go index dfe9d75a3..9b7d03935 100644 --- a/src/pkg/retention/controller_test.go +++ b/src/pkg/retention/controller_test.go @@ -1,11 +1,12 @@ package retention import ( + "testing" + "github.com/goharbor/harbor/src/pkg/retention/dep" "github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy/rule" "github.com/stretchr/testify/suite" - "testing" ) type ControllerTestSuite struct { diff --git a/src/pkg/retention/dep/client.go b/src/pkg/retention/dep/client.go index c711dfd44..14417567d 100644 --- a/src/pkg/retention/dep/client.go +++ b/src/pkg/retention/dep/client.go @@ -148,7 +148,17 @@ func (bc *basicClient) GetCandidates(repository *res.Repository) ([]*res.Candida // DeleteRepository deletes the specified repository func (bc *basicClient) DeleteRepository(repo *res.Repository) error { - return errors.New("not implemented") + if repo == nil { + return errors.New("repository is nil") + } + switch repo.Kind { + case res.Image: + return bc.coreClient.DeleteImageRepository(repo.Namespace, repo.Name) + case res.Chart: + return bc.coreClient.DeleteChartRepository(repo.Namespace, repo.Name) + default: + return fmt.Errorf("unsupported repository kind: %s", repo.Kind) + } } // Deletes the specified candidate diff --git a/src/pkg/retention/launcher.go b/src/pkg/retention/launcher.go index ccec50663..42b9d7ef2 100644 --- a/src/pkg/retention/launcher.go +++ b/src/pkg/retention/launcher.go @@ -18,6 +18,7 @@ import ( "fmt" "time" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/retention/res/selectors/index" cjob "github.com/goharbor/harbor/src/common/job" @@ -26,7 +27,6 @@ import ( "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/retention/policy" @@ -82,6 +82,13 @@ func NewLauncher(projectMgr project.Manager, repositoryMgr repository.Manager, } } +type jobData struct { + TaskID int64 + Repository res.Repository + JobName string + JobParams map[string]interface{} +} + type launcher struct { retentionMgr Manager projectMgr project.Manager @@ -91,38 +98,34 @@ type launcher struct { chartServerEnabled bool } -type jobData struct { - repository res.Repository - policy lwp.Metadata - taskID int64 -} - func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool) (int64, error) { if ply == nil { return 0, launcherError(fmt.Errorf("the policy is nil")) } // no rules, return directly if len(ply.Rules) == 0 { + log.Debugf("no rules for policy %d, skip", ply.ID) return 0, nil } scope := ply.Scope if scope == nil { return 0, launcherError(fmt.Errorf("the scope of policy is nil")) } - + allRepositories := make(map[res.Repository]struct{}, 0) repositoryRules := make(map[res.Repository]*lwp.Metadata, 0) level := scope.Level - var projectCandidates []*res.Candidate + var allProjects []*res.Candidate var err error if level == "system" { // get projects - projectCandidates, err = getProjects(l.projectMgr) + allProjects, err = getProjects(l.projectMgr) if err != nil { return 0, launcherError(err) } } for _, rule := range ply.Rules { + projectCandidates := allProjects switch level { case "system": // filter projects according to the project selectors @@ -150,7 +153,15 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool if err != nil { return 0, launcherError(err) } - repositoryCandidates = append(repositoryCandidates, repositories...) + for _, repository := range repositories { + repo := res.Repository{ + Namespace: repository.Namespace, + Name: repository.Repository, + Kind: repository.Kind, + } + allRepositories[repo] = struct{}{} + repositoryCandidates = append(repositoryCandidates, repository) + } } // filter repositories according to the repository selectors for _, repositorySelector := range rule.ScopeSelectors["repository"] { @@ -179,67 +190,116 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool repositoryRules[reposit].Rules = append(repositoryRules[reposit].Rules, &rule) } } - // no tasks need to be submitted - if len(repositoryRules) == 0 { + + // create job data list + jobDatas, err := createJobs(allRepositories, repositoryRules, isDryRun) + if err != nil { + return 0, launcherError(err) + } + + // no jobs, return directly + if len(jobDatas) == 0 { + log.Debugf("no candidates for policy %d, skip", ply.ID) return 0, nil } - // create task records - jobDatas := make([]*jobData, 0) + // create task records in database + if err = l.createTasks(executionID, jobDatas); err != nil { + return 0, launcherError(err) + } + + // submit jobs to jobservice + if err = l.submitJobs(jobDatas); err != nil { + return 0, launcherError(err) + } + + return int64(len(jobDatas)), nil +} + +func createJobs(allRepositories map[res.Repository]struct{}, + repositoryRules map[res.Repository]*lwp.Metadata, isDryRun bool) ([]*jobData, error) { + jobDatas := []*jobData{} + for repository, policy := range repositoryRules { + jobData := &jobData{ + Repository: repository, + JobName: job.Retention, + JobParams: make(map[string]interface{}, 3), + } + // set dry run + jobData.JobParams[ParamDryRun] = isDryRun + // set repository + repoJSON, err := repository.ToJSON() + if err != nil { + return nil, err + } + jobData.JobParams[ParamRepo] = repoJSON + // set retention policy + policyJSON, err := policy.ToJSON() + if err != nil { + return nil, err + } + jobData.JobParams[ParamMeta] = policyJSON + jobDatas = append(jobDatas, jobData) + } + for repository := range allRepositories { + if _, exist := repositoryRules[repository]; exist { + continue + } + jobData := &jobData{ + Repository: repository, + JobName: job.RetentionDel, + JobParams: make(map[string]interface{}, 2), + } + // set dry run + jobData.JobParams[ParamDryRun] = isDryRun + // set repository + repoJSON, err := repository.ToJSON() + if err != nil { + return nil, err + } + jobData.JobParams[ParamRepo] = repoJSON + jobDatas = append(jobDatas, jobData) + } + return jobDatas, nil +} + +// create task records in database +func (l *launcher) createTasks(executionID int64, jobDatas []*jobData) error { now := time.Now() - for repo, p := range repositoryRules { + for _, jobData := range jobDatas { taskID, err := l.retentionMgr.CreateTask(&Task{ ExecutionID: executionID, - Repository: repo.Name, + Repository: jobData.Repository.Name, StartTime: now, }) if err != nil { - return 0, launcherError(err) + return err } - jobDatas = append(jobDatas, &jobData{ - repository: repo, - policy: *p, - taskID: taskID, - }) + jobData.TaskID = taskID } + return nil +} +// create task records in database +func (l *launcher) submitJobs(jobDatas []*jobData) error { allFailed := true for _, jobData := range jobDatas { + task := &Task{ + ID: jobData.TaskID, + } + props := []string{"Status"} j := &models.JobData{ - Name: job.Retention, + Name: jobData.JobName, Metadata: &models.JobMetadata{ JobKind: job.KindGeneric, }, - StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.taskID), - Parameters: make(map[string]interface{}, 3), + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.TaskID), + Parameters: jobData.JobParams, } - - var ( - repoJSON, policyJSON string - ) - // Set dry run - j.Parameters[ParamDryRun] = isDryRun - // Set repository - if repoJSON, err = jobData.repository.ToJSON(); err == nil { - j.Parameters[ParamRepo] = repoJSON - // Set retention policy - if policyJSON, err = jobData.policy.ToJSON(); err == nil { - j.Parameters[ParamMeta] = policyJSON - } - } - - var jobID string - if err == nil { - // Submit job - jobID, err = l.jobserviceClient.SubmitJob(j) - } - - task := &Task{ - ID: jobData.taskID, - } - props := []string{"Status"} + // Submit job + jobID, err := l.jobserviceClient.SubmitJob(j) if err != nil { - log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.taskID, err))) + log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.TaskID, err))) task.Status = cmodels.JobError task.EndTime = time.Now() props = append(props, "EndTime") @@ -253,11 +313,10 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool log.Errorf("failed to update the status of task %d: %v", task.ID, err) } } - if allFailed { - return 0, launcherError(fmt.Errorf("all tasks failed")) + return launcherError(fmt.Errorf("all tasks failed")) } - return int64(len(jobDatas)), nil + return nil } func (l *launcher) Stop(executionID int64) error { diff --git a/src/pkg/retention/launcher_test.go b/src/pkg/retention/launcher_test.go index 9da42861a..1b5136bbd 100644 --- a/src/pkg/retention/launcher_test.go +++ b/src/pkg/retention/launcher_test.go @@ -137,19 +137,26 @@ type launchTestSuite struct { } func (l *launchTestSuite) SetupTest() { - pro := &models.Project{ + pro1 := &models.Project{ ProjectID: 1, Name: "library", } + pro2 := &models.Project{ + ProjectID: 2, + Name: "test", + } l.projectMgr = &fakeProjectManager{ projects: []*models.Project{ - pro, + pro1, pro2, }} l.repositoryMgr = &fakeRepositoryManager{ imageRepositories: []*models.RepoRecord{ { Name: "library/image", }, + { + Name: "test/image", + }, }, chartRepositories: []*chartserver.ChartInfo{ { @@ -166,7 +173,7 @@ func (l *launchTestSuite) SetupTest() { func (l *launchTestSuite) TestGetProjects() { projects, err := getProjects(l.projectMgr) require.Nil(l.T(), err) - assert.Equal(l.T(), 1, len(projects)) + assert.Equal(l.T(), 2, len(projects)) assert.Equal(l.T(), int64(1), projects[0].NamespaceID) assert.Equal(l.T(), "library", projects[0].Namespace) } @@ -174,13 +181,10 @@ func (l *launchTestSuite) TestGetProjects() { func (l *launchTestSuite) TestGetRepositories() { repositories, err := getRepositories(l.projectMgr, l.repositoryMgr, 1, true) require.Nil(l.T(), err) - assert.Equal(l.T(), 2, len(repositories)) + assert.Equal(l.T(), 3, len(repositories)) assert.Equal(l.T(), "library", repositories[0].Namespace) assert.Equal(l.T(), "image", repositories[0].Repository) assert.Equal(l.T(), "image", repositories[0].Kind) - assert.Equal(l.T(), "library", repositories[1].Namespace) - assert.Equal(l.T(), "chart", repositories[1].Repository) - assert.Equal(l.T(), "chart", repositories[1].Kind) } func (l *launchTestSuite) TestLaunch() { @@ -224,7 +228,7 @@ func (l *launchTestSuite) TestLaunch() { { Kind: "doublestar", Decoration: "nsMatches", - Pattern: "**", + Pattern: "library", }, }, "repository": { @@ -240,7 +244,7 @@ func (l *launchTestSuite) TestLaunch() { } n, err = launcher.Launch(ply, 1, false) require.Nil(l.T(), err) - assert.Equal(l.T(), int64(2), n) + assert.Equal(l.T(), int64(3), n) } func (l *launchTestSuite) TestStop() { diff --git a/src/testing/clients/dumb_core_client.go b/src/testing/clients/dumb_core_client.go index 5af6c11e1..a0a27129b 100644 --- a/src/testing/clients/dumb_core_client.go +++ b/src/testing/clients/dumb_core_client.go @@ -33,6 +33,11 @@ func (d *DumbCoreClient) DeleteImage(project, repository, tag string) error { return nil } +// DeleteImageRepository ... +func (d *DumbCoreClient) DeleteImageRepository(project, repository string) error { + return nil +} + // ListAllCharts ... func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) { return nil, nil @@ -42,3 +47,8 @@ func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserv func (d *DumbCoreClient) DeleteChart(project, repository, version string) error { return nil } + +// DeleteChartRepository ... +func (d *DumbCoreClient) DeleteChartRepository(project, repository string) error { + return nil +}