mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-23 02:35:17 +01:00
fix: skip replication to proxy cache project
Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
parent
70155172ad
commit
b1afd2efb0
@ -20,6 +20,7 @@ import (
|
||||
|
||||
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
"github.com/goharbor/harbor/src/pkg/reg/model"
|
||||
"github.com/goharbor/harbor/src/pkg/task"
|
||||
@ -103,12 +104,29 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
|
||||
}
|
||||
|
||||
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32) error {
|
||||
for i, resource := range srcResources {
|
||||
src, err := json.Marshal(resource)
|
||||
var taskCnt int
|
||||
defer func() {
|
||||
// if no task be created, mark execution done.
|
||||
if taskCnt == 0 {
|
||||
if err := c.executionMgr.MarkDone(ctx, c.executionID, "no resources need to be replicated"); err != nil {
|
||||
logger.Errorf("failed to mark done for the execution %d: %v", c.executionID, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i, srcResource := range srcResources {
|
||||
dstResource := dstResources[i]
|
||||
// if dest resource should be skipped, ignore replicate.
|
||||
if dstResource.Skip {
|
||||
log.Warningf("skip create replication task because of dest limitation, src: %s, dst: %s", srcResource.Metadata, dstResource.Metadata)
|
||||
continue
|
||||
}
|
||||
|
||||
src, err := json.Marshal(srcResource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dest, err := json.Marshal(dstResources[i])
|
||||
dest, err := json.Marshal(dstResource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -127,11 +145,13 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
|
||||
|
||||
if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
|
||||
"operation": "copy",
|
||||
"resource_type": string(resource.Type),
|
||||
"source_resource": getResourceName(resource),
|
||||
"destination_resource": getResourceName(dstResources[i])}); err != nil {
|
||||
"resource_type": string(srcResource.Type),
|
||||
"source_resource": getResourceName(srcResource),
|
||||
"destination_resource": getResourceName(dstResource)}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
taskCnt++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -51,6 +51,17 @@ func (c *copyFlowTestSuite) TestRun() {
|
||||
},
|
||||
Override: false,
|
||||
},
|
||||
{
|
||||
Type: model.ResourceTypeArtifact,
|
||||
Metadata: &model.ResourceMetadata{
|
||||
Repository: &model.Repository{
|
||||
Name: "proxy/hello-world",
|
||||
},
|
||||
Vtags: []string{"latest"},
|
||||
},
|
||||
Override: false,
|
||||
Skip: true,
|
||||
},
|
||||
}, nil)
|
||||
adp.On("PrepareForPush", mock.Anything).Return(nil)
|
||||
|
||||
@ -60,7 +71,7 @@ func (c *copyFlowTestSuite) TestRun() {
|
||||
}, nil)
|
||||
|
||||
taskMgr := &testingTask.Manager{}
|
||||
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||
taskMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil).Once()
|
||||
policy := &repctlmodel.Policy{
|
||||
SrcRegistry: &model.Registry{
|
||||
Type: "TEST_FOR_COPY_FLOW",
|
||||
|
@ -16,10 +16,11 @@ package flow
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/goharbor/harbor/src/lib/errors"
|
||||
|
||||
repctlmodel "github.com/goharbor/harbor/src/controller/replication/model"
|
||||
"github.com/goharbor/harbor/src/lib/log"
|
||||
adp "github.com/goharbor/harbor/src/pkg/reg/adapter"
|
||||
@ -138,6 +139,7 @@ func assembleDestinationResources(resources []*model.Resource,
|
||||
Deleted: resource.Deleted,
|
||||
IsDeleteTag: resource.IsDeleteTag,
|
||||
Override: policy.Override,
|
||||
Skip: resource.Skip,
|
||||
}
|
||||
res.Metadata = &model.ResourceMetadata{
|
||||
Repository: &model.Repository{
|
||||
|
@ -182,15 +182,19 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
|
||||
return errors.Wrapf(err, "list projects with query %s", q)
|
||||
}
|
||||
|
||||
existProjects := make(map[string]*Project)
|
||||
proxyCacheProjects := make(map[string]bool)
|
||||
existProjects := make(map[string]bool)
|
||||
for _, p := range queryProjects {
|
||||
existProjects[p.Name] = p
|
||||
existProjects[p.Name] = true
|
||||
// if project with registry_id, that means this is a proxy cache project.
|
||||
if p.RegistryID > 0 {
|
||||
proxyCacheProjects[p.Name] = true
|
||||
}
|
||||
}
|
||||
|
||||
var notExistProjects []*Project
|
||||
for _, p := range projects {
|
||||
_, exist := existProjects[p.Name]
|
||||
if !exist {
|
||||
if !existProjects[p.Name] {
|
||||
notExistProjects = append(notExistProjects, p)
|
||||
}
|
||||
}
|
||||
@ -205,6 +209,17 @@ func (a *Adapter) PrepareForPush(resources []*model.Resource) error {
|
||||
}
|
||||
log.Debugf("project %s created", project.Name)
|
||||
}
|
||||
|
||||
// do filter for proxy cache projects.
|
||||
for _, res := range resources {
|
||||
paths := strings.Split(res.Metadata.Repository.Name, "/")
|
||||
projectName := paths[0]
|
||||
if proxyCacheProjects[projectName] {
|
||||
// set resource skip flag to true if it's a proxy cache project.
|
||||
res.Skip = true
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -298,6 +313,7 @@ type Project struct {
|
||||
ID int64 `json:"project_id"`
|
||||
Name string `json:"name"`
|
||||
Metadata map[string]interface{} `json:"metadata"`
|
||||
RegistryID int64 `json:"registry_id"`
|
||||
}
|
||||
|
||||
func isLocalHarbor(url string) bool {
|
||||
|
@ -170,6 +170,33 @@ func TestPrepareForPush(t *testing.T) {
|
||||
},
|
||||
})
|
||||
require.Nil(t, err)
|
||||
|
||||
// project already exists and the type is proxy cache
|
||||
server = test.NewServer(&test.RequestHandlerMapping{
|
||||
Method: http.MethodGet,
|
||||
Pattern: "/api/projects",
|
||||
Handler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`[{"name": "library", "registry_id": 1}]`))
|
||||
},
|
||||
})
|
||||
registry = &model.Registry{
|
||||
URL: server.URL,
|
||||
}
|
||||
adapter, err = New(registry)
|
||||
require.Nil(t, err)
|
||||
resources := []*model.Resource{
|
||||
{
|
||||
Metadata: &model.ResourceMetadata{
|
||||
Repository: &model.Repository{
|
||||
Name: "library/hello-world",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = adapter.PrepareForPush(resources)
|
||||
require.Nil(t, err)
|
||||
require.True(t, resources[0].Skip)
|
||||
}
|
||||
|
||||
func TestParsePublic(t *testing.T) {
|
||||
|
@ -14,6 +14,11 @@
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// the resource type
|
||||
const (
|
||||
ResourceTypeArtifact = "artifact"
|
||||
@ -33,6 +38,9 @@ type Resource struct {
|
||||
IsDeleteTag bool `json:"is_delete_tag"`
|
||||
// indicate whether the resource can be overridden
|
||||
Override bool `json:"override"`
|
||||
// Skip is a flag for resource which satisfies replication rules but should
|
||||
// be skipped because of other limits like when dest project's type is proxy cache.
|
||||
Skip bool `json:"-"`
|
||||
}
|
||||
|
||||
// ResourceMetadata of resource
|
||||
@ -55,3 +63,12 @@ type Artifact struct {
|
||||
Labels []string `json:"labels"`
|
||||
Tags []string `json:"tags"`
|
||||
}
|
||||
|
||||
func (r *ResourceMetadata) String() string {
|
||||
data, err := json.Marshal(r)
|
||||
if err == nil {
|
||||
return string(data)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("repository: %+v, artifacts: %+v, tags: %+v", r.Repository, r.Artifacts, r.Vtags)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user