Refactor retention launcher

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-07-30 12:42:01 +08:00
parent 63d16d4b6b
commit 61f54cc4a9
8 changed files with 162 additions and 66 deletions

View File

@ -33,3 +33,8 @@ func (c *client) DeleteChart(project, repository, version string) error {
url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s/%s", project, repository, version)) url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s/%s", project, repository, version))
return c.httpclient.Delete(url) return c.httpclient.Delete(url)
} }
func (c *client) DeleteChartRepository(project, repository string) error {
url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s", project, repository))
return c.httpclient.Delete(url)
}

View File

@ -37,12 +37,14 @@ type Client interface {
type ImageClient interface { type ImageClient interface {
ListAllImages(project, repository string) ([]*models.TagResp, error) ListAllImages(project, repository string) ([]*models.TagResp, error)
DeleteImage(project, repository, tag string) error DeleteImage(project, repository, tag string) error
DeleteImageRepository(project, repository string) error
} }
// ChartClient defines the methods that a chart client should implement // ChartClient defines the methods that a chart client should implement
type ChartClient interface { type ChartClient interface {
ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error)
DeleteChart(project, repository, version string) error DeleteChart(project, repository, version string) error
DeleteChartRepository(project, repository string) error
} }
// New returns an instance of the client which is a default implement for Client // New returns an instance of the client which is a default implement for Client

View File

@ -33,3 +33,8 @@ func (c *client) DeleteImage(project, repository, tag string) error {
url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s/tags/%s", project, repository, tag)) url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s/tags/%s", project, repository, tag))
return c.httpclient.Delete(url) return c.httpclient.Delete(url)
} }
func (c *client) DeleteImageRepository(project, repository string) error {
url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s", project, repository))
return c.httpclient.Delete(url)
}

View File

@ -1,11 +1,12 @@
package retention package retention
import ( import (
"testing"
"github.com/goharbor/harbor/src/pkg/retention/dep" "github.com/goharbor/harbor/src/pkg/retention/dep"
"github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule" "github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"testing"
) )
type ControllerTestSuite struct { type ControllerTestSuite struct {

View File

@ -148,7 +148,17 @@ func (bc *basicClient) GetCandidates(repository *res.Repository) ([]*res.Candida
// DeleteRepository deletes the specified repository // DeleteRepository deletes the specified repository
func (bc *basicClient) DeleteRepository(repo *res.Repository) error { func (bc *basicClient) DeleteRepository(repo *res.Repository) error {
return errors.New("not implemented") if repo == nil {
return errors.New("repository is nil")
}
switch repo.Kind {
case res.Image:
return bc.coreClient.DeleteImageRepository(repo.Namespace, repo.Name)
case res.Chart:
return bc.coreClient.DeleteChartRepository(repo.Namespace, repo.Name)
default:
return fmt.Errorf("unsupported repository kind: %s", repo.Kind)
}
} }
// Deletes the specified candidate // Deletes the specified candidate

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/index" "github.com/goharbor/harbor/src/pkg/retention/res/selectors/index"
cjob "github.com/goharbor/harbor/src/common/job" cjob "github.com/goharbor/harbor/src/common/job"
@ -26,7 +27,6 @@ import (
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy"
@ -82,6 +82,13 @@ func NewLauncher(projectMgr project.Manager, repositoryMgr repository.Manager,
} }
} }
type jobData struct {
TaskID int64
Repository res.Repository
JobName string
JobParams map[string]interface{}
}
type launcher struct { type launcher struct {
retentionMgr Manager retentionMgr Manager
projectMgr project.Manager projectMgr project.Manager
@ -91,38 +98,34 @@ type launcher struct {
chartServerEnabled bool chartServerEnabled bool
} }
type jobData struct {
repository res.Repository
policy lwp.Metadata
taskID int64
}
func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool) (int64, error) { func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool) (int64, error) {
if ply == nil { if ply == nil {
return 0, launcherError(fmt.Errorf("the policy is nil")) return 0, launcherError(fmt.Errorf("the policy is nil"))
} }
// no rules, return directly // no rules, return directly
if len(ply.Rules) == 0 { if len(ply.Rules) == 0 {
log.Debugf("no rules for policy %d, skip", ply.ID)
return 0, nil return 0, nil
} }
scope := ply.Scope scope := ply.Scope
if scope == nil { if scope == nil {
return 0, launcherError(fmt.Errorf("the scope of policy is nil")) return 0, launcherError(fmt.Errorf("the scope of policy is nil"))
} }
allRepositories := make(map[res.Repository]struct{}, 0)
repositoryRules := make(map[res.Repository]*lwp.Metadata, 0) repositoryRules := make(map[res.Repository]*lwp.Metadata, 0)
level := scope.Level level := scope.Level
var projectCandidates []*res.Candidate var allProjects []*res.Candidate
var err error var err error
if level == "system" { if level == "system" {
// get projects // get projects
projectCandidates, err = getProjects(l.projectMgr) allProjects, err = getProjects(l.projectMgr)
if err != nil { if err != nil {
return 0, launcherError(err) return 0, launcherError(err)
} }
} }
for _, rule := range ply.Rules { for _, rule := range ply.Rules {
projectCandidates := allProjects
switch level { switch level {
case "system": case "system":
// filter projects according to the project selectors // filter projects according to the project selectors
@ -150,7 +153,15 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
if err != nil { if err != nil {
return 0, launcherError(err) return 0, launcherError(err)
} }
repositoryCandidates = append(repositoryCandidates, repositories...) for _, repository := range repositories {
repo := res.Repository{
Namespace: repository.Namespace,
Name: repository.Repository,
Kind: repository.Kind,
}
allRepositories[repo] = struct{}{}
repositoryCandidates = append(repositoryCandidates, repository)
}
} }
// filter repositories according to the repository selectors // filter repositories according to the repository selectors
for _, repositorySelector := range rule.ScopeSelectors["repository"] { for _, repositorySelector := range rule.ScopeSelectors["repository"] {
@ -179,67 +190,116 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
repositoryRules[reposit].Rules = append(repositoryRules[reposit].Rules, &rule) repositoryRules[reposit].Rules = append(repositoryRules[reposit].Rules, &rule)
} }
} }
// no tasks need to be submitted
if len(repositoryRules) == 0 {
return 0, nil
}
// create task records // create job data list
jobDatas := make([]*jobData, 0) jobDatas, err := createJobs(allRepositories, repositoryRules, isDryRun)
now := time.Now()
for repo, p := range repositoryRules {
taskID, err := l.retentionMgr.CreateTask(&Task{
ExecutionID: executionID,
Repository: repo.Name,
StartTime: now,
})
if err != nil { if err != nil {
return 0, launcherError(err) return 0, launcherError(err)
} }
jobDatas = append(jobDatas, &jobData{
repository: repo, // no jobs, return directly
policy: *p, if len(jobDatas) == 0 {
taskID: taskID, log.Debugf("no candidates for policy %d, skip", ply.ID)
}) return 0, nil
} }
// create task records in database
if err = l.createTasks(executionID, jobDatas); err != nil {
return 0, launcherError(err)
}
// submit jobs to jobservice
if err = l.submitJobs(jobDatas); err != nil {
return 0, launcherError(err)
}
return int64(len(jobDatas)), nil
}
func createJobs(allRepositories map[res.Repository]struct{},
repositoryRules map[res.Repository]*lwp.Metadata, isDryRun bool) ([]*jobData, error) {
jobDatas := []*jobData{}
for repository, policy := range repositoryRules {
jobData := &jobData{
Repository: repository,
JobName: job.Retention,
JobParams: make(map[string]interface{}, 3),
}
// set dry run
jobData.JobParams[ParamDryRun] = isDryRun
// set repository
repoJSON, err := repository.ToJSON()
if err != nil {
return nil, err
}
jobData.JobParams[ParamRepo] = repoJSON
// set retention policy
policyJSON, err := policy.ToJSON()
if err != nil {
return nil, err
}
jobData.JobParams[ParamMeta] = policyJSON
jobDatas = append(jobDatas, jobData)
}
for repository := range allRepositories {
if _, exist := repositoryRules[repository]; exist {
continue
}
jobData := &jobData{
Repository: repository,
JobName: job.RetentionDel,
JobParams: make(map[string]interface{}, 2),
}
// set dry run
jobData.JobParams[ParamDryRun] = isDryRun
// set repository
repoJSON, err := repository.ToJSON()
if err != nil {
return nil, err
}
jobData.JobParams[ParamRepo] = repoJSON
jobDatas = append(jobDatas, jobData)
}
return jobDatas, nil
}
// create task records in database
func (l *launcher) createTasks(executionID int64, jobDatas []*jobData) error {
now := time.Now()
for _, jobData := range jobDatas {
taskID, err := l.retentionMgr.CreateTask(&Task{
ExecutionID: executionID,
Repository: jobData.Repository.Name,
StartTime: now,
})
if err != nil {
return err
}
jobData.TaskID = taskID
}
return nil
}
// create task records in database
func (l *launcher) submitJobs(jobDatas []*jobData) error {
allFailed := true allFailed := true
for _, jobData := range jobDatas { for _, jobData := range jobDatas {
task := &Task{
ID: jobData.TaskID,
}
props := []string{"Status"}
j := &models.JobData{ j := &models.JobData{
Name: job.Retention, Name: jobData.JobName,
Metadata: &models.JobMetadata{ Metadata: &models.JobMetadata{
JobKind: job.KindGeneric, JobKind: job.KindGeneric,
}, },
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.taskID), StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.TaskID),
Parameters: make(map[string]interface{}, 3), Parameters: jobData.JobParams,
} }
var (
repoJSON, policyJSON string
)
// Set dry run
j.Parameters[ParamDryRun] = isDryRun
// Set repository
if repoJSON, err = jobData.repository.ToJSON(); err == nil {
j.Parameters[ParamRepo] = repoJSON
// Set retention policy
if policyJSON, err = jobData.policy.ToJSON(); err == nil {
j.Parameters[ParamMeta] = policyJSON
}
}
var jobID string
if err == nil {
// Submit job // Submit job
jobID, err = l.jobserviceClient.SubmitJob(j) jobID, err := l.jobserviceClient.SubmitJob(j)
}
task := &Task{
ID: jobData.taskID,
}
props := []string{"Status"}
if err != nil { if err != nil {
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.taskID, err))) log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.TaskID, err)))
task.Status = cmodels.JobError task.Status = cmodels.JobError
task.EndTime = time.Now() task.EndTime = time.Now()
props = append(props, "EndTime") props = append(props, "EndTime")
@ -253,11 +313,10 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
log.Errorf("failed to update the status of task %d: %v", task.ID, err) log.Errorf("failed to update the status of task %d: %v", task.ID, err)
} }
} }
if allFailed { if allFailed {
return 0, launcherError(fmt.Errorf("all tasks failed")) return launcherError(fmt.Errorf("all tasks failed"))
} }
return int64(len(jobDatas)), nil return nil
} }
func (l *launcher) Stop(executionID int64) error { func (l *launcher) Stop(executionID int64) error {

View File

@ -137,19 +137,26 @@ type launchTestSuite struct {
} }
func (l *launchTestSuite) SetupTest() { func (l *launchTestSuite) SetupTest() {
pro := &models.Project{ pro1 := &models.Project{
ProjectID: 1, ProjectID: 1,
Name: "library", Name: "library",
} }
pro2 := &models.Project{
ProjectID: 2,
Name: "test",
}
l.projectMgr = &fakeProjectManager{ l.projectMgr = &fakeProjectManager{
projects: []*models.Project{ projects: []*models.Project{
pro, pro1, pro2,
}} }}
l.repositoryMgr = &fakeRepositoryManager{ l.repositoryMgr = &fakeRepositoryManager{
imageRepositories: []*models.RepoRecord{ imageRepositories: []*models.RepoRecord{
{ {
Name: "library/image", Name: "library/image",
}, },
{
Name: "test/image",
},
}, },
chartRepositories: []*chartserver.ChartInfo{ chartRepositories: []*chartserver.ChartInfo{
{ {
@ -166,7 +173,7 @@ func (l *launchTestSuite) SetupTest() {
func (l *launchTestSuite) TestGetProjects() { func (l *launchTestSuite) TestGetProjects() {
projects, err := getProjects(l.projectMgr) projects, err := getProjects(l.projectMgr)
require.Nil(l.T(), err) require.Nil(l.T(), err)
assert.Equal(l.T(), 1, len(projects)) assert.Equal(l.T(), 2, len(projects))
assert.Equal(l.T(), int64(1), projects[0].NamespaceID) assert.Equal(l.T(), int64(1), projects[0].NamespaceID)
assert.Equal(l.T(), "library", projects[0].Namespace) assert.Equal(l.T(), "library", projects[0].Namespace)
} }
@ -174,13 +181,10 @@ func (l *launchTestSuite) TestGetProjects() {
func (l *launchTestSuite) TestGetRepositories() { func (l *launchTestSuite) TestGetRepositories() {
repositories, err := getRepositories(l.projectMgr, l.repositoryMgr, 1, true) repositories, err := getRepositories(l.projectMgr, l.repositoryMgr, 1, true)
require.Nil(l.T(), err) require.Nil(l.T(), err)
assert.Equal(l.T(), 2, len(repositories)) assert.Equal(l.T(), 3, len(repositories))
assert.Equal(l.T(), "library", repositories[0].Namespace) assert.Equal(l.T(), "library", repositories[0].Namespace)
assert.Equal(l.T(), "image", repositories[0].Repository) assert.Equal(l.T(), "image", repositories[0].Repository)
assert.Equal(l.T(), "image", repositories[0].Kind) assert.Equal(l.T(), "image", repositories[0].Kind)
assert.Equal(l.T(), "library", repositories[1].Namespace)
assert.Equal(l.T(), "chart", repositories[1].Repository)
assert.Equal(l.T(), "chart", repositories[1].Kind)
} }
func (l *launchTestSuite) TestLaunch() { func (l *launchTestSuite) TestLaunch() {
@ -224,7 +228,7 @@ func (l *launchTestSuite) TestLaunch() {
{ {
Kind: "doublestar", Kind: "doublestar",
Decoration: "nsMatches", Decoration: "nsMatches",
Pattern: "**", Pattern: "library",
}, },
}, },
"repository": { "repository": {
@ -240,7 +244,7 @@ func (l *launchTestSuite) TestLaunch() {
} }
n, err = launcher.Launch(ply, 1, false) n, err = launcher.Launch(ply, 1, false)
require.Nil(l.T(), err) require.Nil(l.T(), err)
assert.Equal(l.T(), int64(2), n) assert.Equal(l.T(), int64(3), n)
} }
func (l *launchTestSuite) TestStop() { func (l *launchTestSuite) TestStop() {

View File

@ -33,6 +33,11 @@ func (d *DumbCoreClient) DeleteImage(project, repository, tag string) error {
return nil return nil
} }
// DeleteImageRepository ...
func (d *DumbCoreClient) DeleteImageRepository(project, repository string) error {
return nil
}
// ListAllCharts ... // ListAllCharts ...
func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) { func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) {
return nil, nil return nil, nil
@ -42,3 +47,8 @@ func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserv
func (d *DumbCoreClient) DeleteChart(project, repository, version string) error { func (d *DumbCoreClient) DeleteChart(project, repository, version string) error {
return nil return nil
} }
// DeleteChartRepository ...
func (d *DumbCoreClient) DeleteChartRepository(project, repository string) error {
return nil
}