mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-11 18:38:14 +01:00
Merge pull request #8488 from ywk253100/190730_launcher
Refactor retention launcher
This commit is contained in:
commit
eb69ae98eb
@ -33,3 +33,8 @@ func (c *client) DeleteChart(project, repository, version string) error {
|
||||
url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s/%s", project, repository, version))
|
||||
return c.httpclient.Delete(url)
|
||||
}
|
||||
|
||||
func (c *client) DeleteChartRepository(project, repository string) error {
|
||||
url := c.buildURL(fmt.Sprintf("/api/chartrepo/%s/charts/%s", project, repository))
|
||||
return c.httpclient.Delete(url)
|
||||
}
|
||||
|
@ -37,12 +37,14 @@ type Client interface {
|
||||
type ImageClient interface {
|
||||
ListAllImages(project, repository string) ([]*models.TagResp, error)
|
||||
DeleteImage(project, repository, tag string) error
|
||||
DeleteImageRepository(project, repository string) error
|
||||
}
|
||||
|
||||
// ChartClient defines the methods that a chart client should implement
|
||||
type ChartClient interface {
|
||||
ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error)
|
||||
DeleteChart(project, repository, version string) error
|
||||
DeleteChartRepository(project, repository string) error
|
||||
}
|
||||
|
||||
// New returns an instance of the client which is a default implement for Client
|
||||
|
@ -33,3 +33,8 @@ func (c *client) DeleteImage(project, repository, tag string) error {
|
||||
url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s/tags/%s", project, repository, tag))
|
||||
return c.httpclient.Delete(url)
|
||||
}
|
||||
|
||||
func (c *client) DeleteImageRepository(project, repository string) error {
|
||||
url := c.buildURL(fmt.Sprintf("/api/repositories/%s/%s", project, repository))
|
||||
return c.httpclient.Delete(url)
|
||||
}
|
||||
|
@ -1,11 +1,12 @@
|
||||
package retention
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/pkg/retention/dep"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy/rule"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type ControllerTestSuite struct {
|
||||
|
@ -148,7 +148,17 @@ func (bc *basicClient) GetCandidates(repository *res.Repository) ([]*res.Candida
|
||||
|
||||
// DeleteRepository deletes the specified repository
|
||||
func (bc *basicClient) DeleteRepository(repo *res.Repository) error {
|
||||
return errors.New("not implemented")
|
||||
if repo == nil {
|
||||
return errors.New("repository is nil")
|
||||
}
|
||||
switch repo.Kind {
|
||||
case res.Image:
|
||||
return bc.coreClient.DeleteImageRepository(repo.Namespace, repo.Name)
|
||||
case res.Chart:
|
||||
return bc.coreClient.DeleteChartRepository(repo.Namespace, repo.Name)
|
||||
default:
|
||||
return fmt.Errorf("unsupported repository kind: %s", repo.Kind)
|
||||
}
|
||||
}
|
||||
|
||||
// Deletes the specified candidate
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/res/selectors/index"
|
||||
|
||||
cjob "github.com/goharbor/harbor/src/common/job"
|
||||
@ -26,7 +27,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/common/utils"
|
||||
"github.com/goharbor/harbor/src/common/utils/log"
|
||||
"github.com/goharbor/harbor/src/core/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/pkg/project"
|
||||
"github.com/goharbor/harbor/src/pkg/repository"
|
||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||
@ -82,6 +82,13 @@ func NewLauncher(projectMgr project.Manager, repositoryMgr repository.Manager,
|
||||
}
|
||||
}
|
||||
|
||||
type jobData struct {
|
||||
TaskID int64
|
||||
Repository res.Repository
|
||||
JobName string
|
||||
JobParams map[string]interface{}
|
||||
}
|
||||
|
||||
type launcher struct {
|
||||
retentionMgr Manager
|
||||
projectMgr project.Manager
|
||||
@ -91,38 +98,34 @@ type launcher struct {
|
||||
chartServerEnabled bool
|
||||
}
|
||||
|
||||
type jobData struct {
|
||||
repository res.Repository
|
||||
policy lwp.Metadata
|
||||
taskID int64
|
||||
}
|
||||
|
||||
func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool) (int64, error) {
|
||||
if ply == nil {
|
||||
return 0, launcherError(fmt.Errorf("the policy is nil"))
|
||||
}
|
||||
// no rules, return directly
|
||||
if len(ply.Rules) == 0 {
|
||||
log.Debugf("no rules for policy %d, skip", ply.ID)
|
||||
return 0, nil
|
||||
}
|
||||
scope := ply.Scope
|
||||
if scope == nil {
|
||||
return 0, launcherError(fmt.Errorf("the scope of policy is nil"))
|
||||
}
|
||||
|
||||
allRepositories := make(map[res.Repository]struct{}, 0)
|
||||
repositoryRules := make(map[res.Repository]*lwp.Metadata, 0)
|
||||
level := scope.Level
|
||||
var projectCandidates []*res.Candidate
|
||||
var allProjects []*res.Candidate
|
||||
var err error
|
||||
if level == "system" {
|
||||
// get projects
|
||||
projectCandidates, err = getProjects(l.projectMgr)
|
||||
allProjects, err = getProjects(l.projectMgr)
|
||||
if err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, rule := range ply.Rules {
|
||||
projectCandidates := allProjects
|
||||
switch level {
|
||||
case "system":
|
||||
// filter projects according to the project selectors
|
||||
@ -150,7 +153,15 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
|
||||
if err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
repositoryCandidates = append(repositoryCandidates, repositories...)
|
||||
for _, repository := range repositories {
|
||||
repo := res.Repository{
|
||||
Namespace: repository.Namespace,
|
||||
Name: repository.Repository,
|
||||
Kind: repository.Kind,
|
||||
}
|
||||
allRepositories[repo] = struct{}{}
|
||||
repositoryCandidates = append(repositoryCandidates, repository)
|
||||
}
|
||||
}
|
||||
// filter repositories according to the repository selectors
|
||||
for _, repositorySelector := range rule.ScopeSelectors["repository"] {
|
||||
@ -179,67 +190,116 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
|
||||
repositoryRules[reposit].Rules = append(repositoryRules[reposit].Rules, &rule)
|
||||
}
|
||||
}
|
||||
// no tasks need to be submitted
|
||||
if len(repositoryRules) == 0 {
|
||||
|
||||
// create job data list
|
||||
jobDatas, err := createJobs(allRepositories, repositoryRules, isDryRun)
|
||||
if err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
|
||||
// no jobs, return directly
|
||||
if len(jobDatas) == 0 {
|
||||
log.Debugf("no candidates for policy %d, skip", ply.ID)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// create task records
|
||||
jobDatas := make([]*jobData, 0)
|
||||
// create task records in database
|
||||
if err = l.createTasks(executionID, jobDatas); err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
|
||||
// submit jobs to jobservice
|
||||
if err = l.submitJobs(jobDatas); err != nil {
|
||||
return 0, launcherError(err)
|
||||
}
|
||||
|
||||
return int64(len(jobDatas)), nil
|
||||
}
|
||||
|
||||
func createJobs(allRepositories map[res.Repository]struct{},
|
||||
repositoryRules map[res.Repository]*lwp.Metadata, isDryRun bool) ([]*jobData, error) {
|
||||
jobDatas := []*jobData{}
|
||||
for repository, policy := range repositoryRules {
|
||||
jobData := &jobData{
|
||||
Repository: repository,
|
||||
JobName: job.Retention,
|
||||
JobParams: make(map[string]interface{}, 3),
|
||||
}
|
||||
// set dry run
|
||||
jobData.JobParams[ParamDryRun] = isDryRun
|
||||
// set repository
|
||||
repoJSON, err := repository.ToJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobData.JobParams[ParamRepo] = repoJSON
|
||||
// set retention policy
|
||||
policyJSON, err := policy.ToJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobData.JobParams[ParamMeta] = policyJSON
|
||||
jobDatas = append(jobDatas, jobData)
|
||||
}
|
||||
for repository := range allRepositories {
|
||||
if _, exist := repositoryRules[repository]; exist {
|
||||
continue
|
||||
}
|
||||
jobData := &jobData{
|
||||
Repository: repository,
|
||||
JobName: job.RetentionDel,
|
||||
JobParams: make(map[string]interface{}, 2),
|
||||
}
|
||||
// set dry run
|
||||
jobData.JobParams[ParamDryRun] = isDryRun
|
||||
// set repository
|
||||
repoJSON, err := repository.ToJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
jobData.JobParams[ParamRepo] = repoJSON
|
||||
jobDatas = append(jobDatas, jobData)
|
||||
}
|
||||
return jobDatas, nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
func (l *launcher) createTasks(executionID int64, jobDatas []*jobData) error {
|
||||
now := time.Now()
|
||||
for repo, p := range repositoryRules {
|
||||
for _, jobData := range jobDatas {
|
||||
taskID, err := l.retentionMgr.CreateTask(&Task{
|
||||
ExecutionID: executionID,
|
||||
Repository: repo.Name,
|
||||
Repository: jobData.Repository.Name,
|
||||
StartTime: now,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, launcherError(err)
|
||||
return err
|
||||
}
|
||||
jobDatas = append(jobDatas, &jobData{
|
||||
repository: repo,
|
||||
policy: *p,
|
||||
taskID: taskID,
|
||||
})
|
||||
jobData.TaskID = taskID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// create task records in database
|
||||
func (l *launcher) submitJobs(jobDatas []*jobData) error {
|
||||
allFailed := true
|
||||
for _, jobData := range jobDatas {
|
||||
task := &Task{
|
||||
ID: jobData.TaskID,
|
||||
}
|
||||
props := []string{"Status"}
|
||||
j := &models.JobData{
|
||||
Name: job.Retention,
|
||||
Name: jobData.JobName,
|
||||
Metadata: &models.JobMetadata{
|
||||
JobKind: job.KindGeneric,
|
||||
},
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.taskID),
|
||||
Parameters: make(map[string]interface{}, 3),
|
||||
StatusHook: fmt.Sprintf("%s/service/notifications/jobs/retention/task/%d", l.internalCoreURL, jobData.TaskID),
|
||||
Parameters: jobData.JobParams,
|
||||
}
|
||||
|
||||
var (
|
||||
repoJSON, policyJSON string
|
||||
)
|
||||
// Set dry run
|
||||
j.Parameters[ParamDryRun] = isDryRun
|
||||
// Set repository
|
||||
if repoJSON, err = jobData.repository.ToJSON(); err == nil {
|
||||
j.Parameters[ParamRepo] = repoJSON
|
||||
// Set retention policy
|
||||
if policyJSON, err = jobData.policy.ToJSON(); err == nil {
|
||||
j.Parameters[ParamMeta] = policyJSON
|
||||
}
|
||||
}
|
||||
|
||||
var jobID string
|
||||
if err == nil {
|
||||
// Submit job
|
||||
jobID, err = l.jobserviceClient.SubmitJob(j)
|
||||
}
|
||||
|
||||
task := &Task{
|
||||
ID: jobData.taskID,
|
||||
}
|
||||
props := []string{"Status"}
|
||||
// Submit job
|
||||
jobID, err := l.jobserviceClient.SubmitJob(j)
|
||||
if err != nil {
|
||||
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.taskID, err)))
|
||||
log.Error(launcherError(fmt.Errorf("failed to submit task %d: %v", jobData.TaskID, err)))
|
||||
task.Status = cmodels.JobError
|
||||
task.EndTime = time.Now()
|
||||
props = append(props, "EndTime")
|
||||
@ -253,11 +313,10 @@ func (l *launcher) Launch(ply *policy.Metadata, executionID int64, isDryRun bool
|
||||
log.Errorf("failed to update the status of task %d: %v", task.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if allFailed {
|
||||
return 0, launcherError(fmt.Errorf("all tasks failed"))
|
||||
return launcherError(fmt.Errorf("all tasks failed"))
|
||||
}
|
||||
return int64(len(jobDatas)), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *launcher) Stop(executionID int64) error {
|
||||
|
@ -137,19 +137,26 @@ type launchTestSuite struct {
|
||||
}
|
||||
|
||||
func (l *launchTestSuite) SetupTest() {
|
||||
pro := &models.Project{
|
||||
pro1 := &models.Project{
|
||||
ProjectID: 1,
|
||||
Name: "library",
|
||||
}
|
||||
pro2 := &models.Project{
|
||||
ProjectID: 2,
|
||||
Name: "test",
|
||||
}
|
||||
l.projectMgr = &fakeProjectManager{
|
||||
projects: []*models.Project{
|
||||
pro,
|
||||
pro1, pro2,
|
||||
}}
|
||||
l.repositoryMgr = &fakeRepositoryManager{
|
||||
imageRepositories: []*models.RepoRecord{
|
||||
{
|
||||
Name: "library/image",
|
||||
},
|
||||
{
|
||||
Name: "test/image",
|
||||
},
|
||||
},
|
||||
chartRepositories: []*chartserver.ChartInfo{
|
||||
{
|
||||
@ -166,7 +173,7 @@ func (l *launchTestSuite) SetupTest() {
|
||||
func (l *launchTestSuite) TestGetProjects() {
|
||||
projects, err := getProjects(l.projectMgr)
|
||||
require.Nil(l.T(), err)
|
||||
assert.Equal(l.T(), 1, len(projects))
|
||||
assert.Equal(l.T(), 2, len(projects))
|
||||
assert.Equal(l.T(), int64(1), projects[0].NamespaceID)
|
||||
assert.Equal(l.T(), "library", projects[0].Namespace)
|
||||
}
|
||||
@ -174,13 +181,10 @@ func (l *launchTestSuite) TestGetProjects() {
|
||||
func (l *launchTestSuite) TestGetRepositories() {
|
||||
repositories, err := getRepositories(l.projectMgr, l.repositoryMgr, 1, true)
|
||||
require.Nil(l.T(), err)
|
||||
assert.Equal(l.T(), 2, len(repositories))
|
||||
assert.Equal(l.T(), 3, len(repositories))
|
||||
assert.Equal(l.T(), "library", repositories[0].Namespace)
|
||||
assert.Equal(l.T(), "image", repositories[0].Repository)
|
||||
assert.Equal(l.T(), "image", repositories[0].Kind)
|
||||
assert.Equal(l.T(), "library", repositories[1].Namespace)
|
||||
assert.Equal(l.T(), "chart", repositories[1].Repository)
|
||||
assert.Equal(l.T(), "chart", repositories[1].Kind)
|
||||
}
|
||||
|
||||
func (l *launchTestSuite) TestLaunch() {
|
||||
@ -224,7 +228,7 @@ func (l *launchTestSuite) TestLaunch() {
|
||||
{
|
||||
Kind: "doublestar",
|
||||
Decoration: "nsMatches",
|
||||
Pattern: "**",
|
||||
Pattern: "library",
|
||||
},
|
||||
},
|
||||
"repository": {
|
||||
@ -240,7 +244,7 @@ func (l *launchTestSuite) TestLaunch() {
|
||||
}
|
||||
n, err = launcher.Launch(ply, 1, false)
|
||||
require.Nil(l.T(), err)
|
||||
assert.Equal(l.T(), int64(2), n)
|
||||
assert.Equal(l.T(), int64(3), n)
|
||||
}
|
||||
|
||||
func (l *launchTestSuite) TestStop() {
|
||||
|
@ -33,6 +33,11 @@ func (d *DumbCoreClient) DeleteImage(project, repository, tag string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteImageRepository ...
|
||||
func (d *DumbCoreClient) DeleteImageRepository(project, repository string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListAllCharts ...
|
||||
func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserver.ChartVersion, error) {
|
||||
return nil, nil
|
||||
@ -42,3 +47,8 @@ func (d *DumbCoreClient) ListAllCharts(project, repository string) ([]*chartserv
|
||||
func (d *DumbCoreClient) DeleteChart(project, repository, version string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteChartRepository ...
|
||||
func (d *DumbCoreClient) DeleteChartRepository(project, repository string) error {
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user