Merge pull request #12549 from chlins/fix/jfrog-replication

fix(replication): fix jfrog replication when filter includes multi im…
This commit is contained in:
Wenkai Yin(尹文开) 2020-07-24 16:50:36 +08:00 committed by GitHub
commit 6df1f8ae5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 434 additions and 73 deletions

View File

@ -1,24 +1,41 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jfrog
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/goharbor/harbor/src/pkg/registry/auth/basic"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"github.com/goharbor/harbor/src/pkg/registry/auth/basic"
"github.com/goharbor/harbor/src/pkg/registry"
"github.com/goharbor/harbor/src/replication/filter"
"github.com/goharbor/harbor/src/replication/util"
"github.com/goharbor/harbor/src/common/utils"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier"
"github.com/goharbor/harbor/src/lib/log"
adp "github.com/goharbor/harbor/src/replication/adapter"
"github.com/goharbor/harbor/src/replication/adapter/native"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/util"
)
func init() {
@ -52,7 +69,7 @@ var (
type adapter struct {
*native.Adapter
registry *model.Registry
client *common_http.Client
client *client
}
var _ adp.Adapter = (*adapter)(nil)
@ -83,24 +100,10 @@ func (a *adapter) Info() (info *model.RegistryInfo, err error) {
}
func newAdapter(registry *model.Registry) (adp.Adapter, error) {
var (
modifiers = []modifier.Modifier{}
)
if registry.Credential != nil {
modifiers = append(modifiers, basic.NewAuthorizer(
registry.Credential.AccessKey,
registry.Credential.AccessSecret))
}
return &adapter{
Adapter: native.NewAdapter(registry),
registry: registry,
client: common_http.NewClient(
&http.Client{
Transport: util.GetHTTPTransport(registry.Insecure),
},
modifiers...,
),
client: newClient(registry),
}, nil
}
@ -127,7 +130,7 @@ func (a *adapter) PrepareForPush(resources []*model.Resource) error {
}
}
repositories, err := a.getLocalRepositories()
repositories, err := a.client.getDockerRepositories()
if err != nil {
log.Errorf("Get local repositories error: %v", err)
return err
@ -142,7 +145,7 @@ func (a *adapter) PrepareForPush(resources []*model.Resource) error {
if _, ok := existedRepositories[namespace]; ok {
log.Debugf("Namespace %s already existed in remote, skip create it", namespace)
} else {
err = a.createNamespace(namespace)
err = a.client.createDockerRepository(namespace)
if err != nil {
log.Errorf("Create Namespace %s error: %v", namespace, err)
return err
@ -153,66 +156,128 @@ func (a *adapter) PrepareForPush(resources []*model.Resource) error {
return nil
}
func (a *adapter) getLocalRepositories() ([]*repository, error) {
var repositories []*repository
url := fmt.Sprintf("%s/artifactory/api/repositories?type=local&packageType=docker", a.registry.URL)
req, err := http.NewRequest(http.MethodGet, url, nil)
// FetchArtifacts fetches artifacts from jfrog
func (a *adapter) FetchArtifacts(filters []*model.Filter) ([]*model.Resource, error) {
repositories, err := a.listRepositories(filters)
if err != nil {
return repositories, err
return nil, err
}
if len(repositories) == 0 {
return nil, nil
}
resp, err := a.client.Do(req)
var rawResources = make([]*model.Resource, len(repositories))
runner := utils.NewLimitedConcurrentRunner(adp.MaxConcurrency)
defer runner.Cancel()
for i, r := range repositories {
index := i
repo := r
runner.AddTask(func() error {
artifacts, err := a.listArtifacts(repo.Name, filters)
if err != nil {
return repositories, err
return fmt.Errorf("failed to list artifacts of repository %s: %v", repo.Name, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return repositories, err
}
err = json.Unmarshal(body, &repositories)
return repositories, err
}
// create repository with docker local type
// this operation needs admin
func (a *adapter) createNamespace(namespace string) error {
ns := newDefaultDockerLocalRepository(namespace)
body, err := json.Marshal(ns)
if err != nil {
return err
}
url := fmt.Sprintf("%s/artifactory/api/repositories/%s", a.registry.URL, namespace)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if len(artifacts) == 0 {
return nil
}
rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: repo.Name,
},
Artifacts: artifacts,
},
}
b, err := ioutil.ReadAll(resp.Body)
return nil
})
}
runner.Wait()
if runner.IsCancelled() {
return nil, fmt.Errorf("FetchArtifacts error when collect tags for repos")
}
var resources []*model.Resource
for _, r := range rawResources {
if r != nil {
resources = append(resources, r)
}
}
return resources, nil
}
// listRepositories lists repositories from jfrog
func (a *adapter) listRepositories(filters []*model.Filter) ([]*model.Repository, error) {
pattern := ""
for _, filter := range filters {
if filter.Type == model.FilterTypeName {
pattern = filter.Value.(string)
break
}
}
var repositories []string
// if the pattern of repository name filter is a specific repository name, just returns
// the parsed repositories and will check the existence later when filtering the tags
if paths, ok := util.IsSpecificPath(pattern); ok {
repositories = paths
} else {
// search repositories from catalog API
dockerRepos, err := a.client.getDockerRepositories()
if err != nil {
return err
return nil, err
}
return &common_http.Error{
Code: resp.StatusCode,
Message: string(b),
for _, docker := range dockerRepos {
url := fmt.Sprintf("%s/artifactory/api/docker/%s", a.client.url, docker.Key)
regClient := registry.NewClientWithAuthorizer(url, basic.NewAuthorizer(a.client.username, a.client.password), a.client.insecure)
repos, err := regClient.Catalog()
if err != nil {
return nil, err
}
for _, repo := range repos {
repositories = append(repositories, fmt.Sprintf("%s/%s", docker.Key, repo))
}
}
}
var result []*model.Repository
for _, repository := range repositories {
result = append(result, &model.Repository{
Name: repository,
})
}
return filter.DoFilterRepositories(result, filters)
}
// listArtifacts lists one repository tags
func (a *adapter) listArtifacts(repository string, filters []*model.Filter) ([]*model.Artifact, error) {
// split docker registry name and repo name
key, repoName := "", ""
s := strings.Split(repository, "/")
if len(s) > 1 {
key = s[0]
repoName = strings.Join(s[1:], "/")
}
url := fmt.Sprintf("%s/artifactory/api/docker/%s", a.client.url, key)
regClient := registry.NewClientWithAuthorizer(url, basic.NewAuthorizer(a.client.username, a.client.password), a.client.insecure)
tags, err := regClient.ListTags(repoName)
if err != nil {
return nil, err
}
var artifacts []*model.Artifact
for _, tag := range tags {
artifacts = append(artifacts, &model.Artifact{
Tags: []string{tag},
})
}
return filter.DoFilterArtifacts(artifacts, filters)
}
// PushBlob can not use naive PushBlob due to MonolithicUpload, Jfrog now just support push by chunk
@ -233,7 +298,7 @@ func (a *adapter) PushBlob(repository, digest string, size int64, blob io.Reader
req.Header.Set("Content-Range", fmt.Sprintf("0-%s", rangeSize))
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := a.client.Do(req)
resp, err := a.client.client.Do(req)
if err != nil {
return err
}
@ -263,7 +328,7 @@ func (a *adapter) preparePushBlob(repository string) (string, error) {
}
req.Header.Set(http.CanonicalHeaderKey("Content-Length"), "0")
resp, err := a.client.Do(req)
resp, err := a.client.client.Do(req)
if err != nil {
return "", err
}
@ -294,7 +359,7 @@ func (a *adapter) ackPushBlob(repository, digest, location, size string) error {
return err
}
resp, err := a.client.Do(req)
resp, err := a.client.client.Do(req)
if err != nil {
return err
}

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jfrog
import (
@ -52,6 +66,41 @@ func getMockAdapter(t *testing.T, hasCred, health bool) (*adapter, *httptest.Ser
w.WriteHeader(http.StatusOK)
},
},
&test.RequestHandlerMapping{
Method: http.MethodGet,
Pattern: fmt.Sprintf("/artifactory/api/docker/%s/v2/_catalog", "cyzhang"),
Handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"repositories": []}`))
},
},
&test.RequestHandlerMapping{
Method: http.MethodGet,
Pattern: fmt.Sprintf("/artifactory/api/docker/%s/v2/_catalog", fakeNamespace),
Handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"repositories": [
"nginx"
]
}`))
},
},
&test.RequestHandlerMapping{
Method: http.MethodGet,
Pattern: fmt.Sprintf("/artifactory/api/docker/%s/v2/%s/tags/list", fakeNamespace, "nginx"),
Handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"name": "nginx",
"tags": [
"latest",
"v1",
"v2"
]
}`))
},
},
&test.RequestHandlerMapping{
Method: http.MethodPost,
Pattern: fmt.Sprintf("/v2/%s/blobs/uploads/", fakeRepository),
@ -130,3 +179,25 @@ func TestAdapter_PushBlob(t *testing.T) {
err := a.PushBlob(fakeRepository, fakeDigest, 20, bytes.NewReader([]byte("test")))
assert.Nil(t, err)
}
func TestAdapter_FetchArtifacts(t *testing.T) {
a, s := getMockAdapter(t, true, true)
defer s.Close()
filters := []*model.Filter{
{
Type: model.FilterTypeName,
Value: "mydocker/**",
},
{
Type: model.FilterTypeTag,
Value: "v1",
},
}
res, err := a.FetchArtifacts(filters)
assert.Nil(t, err)
assert.Len(t, res, 1)
assert.Equal(t, "mydocker/nginx", res[0].Metadata.Repository.Name)
assert.Len(t, res[0].Metadata.Artifacts, 1)
assert.Equal(t, "v1", res[0].Metadata.Artifacts[0].Tags[0])
}

View File

@ -0,0 +1,122 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jfrog
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/pkg/registry/auth/basic"
"github.com/goharbor/harbor/src/replication/model"
"github.com/goharbor/harbor/src/replication/util"
)
// client is a client to interact with Jfrog
type client struct {
// client is a client to access jfrog
client *common_http.Client
url string
insecure bool
username string
password string
}
// newClient constructs a jfrog client
func newClient(reg *model.Registry) *client {
username, password := "", ""
if reg.Credential != nil {
username = reg.Credential.AccessKey
password = reg.Credential.AccessSecret
}
return &client{
client: common_http.NewClient(
&http.Client{
Transport: util.GetHTTPTransport(reg.Insecure),
},
basic.NewAuthorizer(username, password),
),
url: reg.URL,
insecure: reg.Insecure,
username: username,
password: password,
}
}
// getDockerRepositories gets docker repositories from jfrog
func (c *client) getDockerRepositories() ([]*repository, error) {
var repositories []*repository
url := fmt.Sprintf("%s/artifactory/api/repositories?packageType=docker", c.url)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return repositories, err
}
resp, err := c.client.Do(req)
if err != nil {
return repositories, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return repositories, err
}
err = json.Unmarshal(body, &repositories)
return repositories, err
}
// createDockerRepository creates docker repository on jfrog
func (c *client) createDockerRepository(name string) error {
ns := newDefaultDockerLocalRepository(name)
body, err := json.Marshal(ns)
if err != nil {
return err
}
url := fmt.Sprintf("%s/artifactory/api/repositories/%s", c.url, name)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return &common_http.Error{
Code: resp.StatusCode,
Message: string(b),
}
}

View File

@ -0,0 +1,89 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jfrog
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/goharbor/harbor/src/replication/model"
"github.com/stretchr/testify/suite"
)
type clientTestSuite struct {
suite.Suite
client *client
mockServer *httptest.Server
}
func TestClientTestSuite(t *testing.T) {
suite.Run(t, &clientTestSuite{})
}
func (c *clientTestSuite) SetupSuite() {
c.mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case "/artifactory/api/repositories?packageType=docker":
if r.Method == http.MethodGet {
w.Write([]byte(`[
{
"key": "repo1",
"description": "",
"type": "LOCAL",
"url": "http://49.4.2.82:8081/artifactory/repo1",
"packageType": "Docker"
},
{
"key": "mydocker",
"type": "LOCAL",
"url": "http://49.4.2.82:8081/artifactory/mydocker",
"packageType": "Docker"
}
]`))
return
}
w.WriteHeader(http.StatusNotImplemented)
case "/artifactory/api/repositories/test":
if r.Method == http.MethodPut {
w.WriteHeader(200)
return
}
w.WriteHeader(http.StatusNotImplemented)
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))
c.client = newClient(&model.Registry{URL: c.mockServer.URL})
}
func (c *clientTestSuite) TearDownSuite() {
c.mockServer.Close()
}
func (c *clientTestSuite) TestGetDockerRepositories() {
repos, err := c.client.getDockerRepositories()
c.NoError(err)
c.Len(repos, 2)
c.Equal("repo1", repos[0].Key)
}
func (c *clientTestSuite) TestCreateDockerRepository() {
err := c.client.createDockerRepository("test")
c.NoError(err)
}

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package jfrog
type repository struct {