Merge pull request #7178 from ywk253100/190315_chart

Replicate helm charts
This commit is contained in:
Wenkai Yin 2019-03-20 14:29:35 +08:00 committed by GitHub
commit bb76a4d97d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 531 additions and 70 deletions

View File

@ -95,12 +95,16 @@ func (c *Client) Head(url string) error {
func (c *Client) Post(url string, v ...interface{}) error {
var reader io.Reader
if len(v) > 0 {
data, err := json.Marshal(v[0])
if err != nil {
return err
}
if r, ok := v[0].(io.Reader); ok {
reader = r
} else {
data, err := json.Marshal(v[0])
if err != nil {
return err
}
reader = bytes.NewReader(data)
reader = bytes.NewReader(data)
}
}
req, err := http.NewRequest(http.MethodPost, url, reader)

View File

@ -13,3 +13,18 @@
// limitations under the License.
package adapter
import (
"io"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// ChartRegistry defines the capabilities that a chart registry should have
type ChartRegistry interface {
FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error)
ChartExist(name, version string) (bool, error)
DownloadChart(name, version string) (io.ReadCloser, error)
UploadChart(name, version string, chart io.Reader) error
DeleteChart(name, version string) error
}

View File

@ -28,11 +28,14 @@ import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
// TODO add UT
func init() {
// TODO add more information to the info
info := &adp.Info{
Type: model.RegistryTypeHarbor,
SupportedResourceTypes: []model.ResourceType{model.ResourceTypeRepository},
Type: model.RegistryTypeHarbor,
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository, model.ResourceTypeChart},
}
// TODO passing coreServiceURL and tokenServiceURL
coreServiceURL := "http://core:8080"
@ -141,59 +144,12 @@ func (a *adapter) GetNamespace(namespace string) (*model.Namespace, error) {
}, nil
}
// TODO implement filter
func (a *adapter) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
resources := []*model.Resource{}
for _, namespace := range namespaces {
project, err := a.getProject(namespace)
if err != nil {
return nil, err
}
repositories := []*repository{}
url := fmt.Sprintf("%s/api/repositories?project_id=%d", a.coreServiceURL, project.ID)
if err = a.client.Get(url, &repositories); err != nil {
return nil, err
}
for _, repository := range repositories {
url := fmt.Sprintf("%s/api/repositories/%s/tags", a.coreServiceURL, repository.Name)
tags := []*tag{}
if err = a.client.Get(url, &tags); err != nil {
return nil, err
}
vtags := []string{}
for _, tag := range tags {
vtags = append(vtags, tag.Name)
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeRepository,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Namespace: namespace,
Name: repository.Name,
Vtags: vtags,
},
})
}
}
return resources, nil
}
type project struct {
ID int64 `json:"project_id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
}
type repository struct {
Name string `json:"name"`
}
type tag struct {
Name string `json:"name"`
}
func (a *adapter) getProject(name string) (*project, error) {
// TODO need an API to exact match project by name
projects := []*project{}

View File

@ -0,0 +1,199 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package harbor
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"strings"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// TODO review the logic in this file
type chart struct {
Name string `json:"name"`
}
type chartVersion struct {
Name string `json:"name"`
Version string `json:"version"`
// TODO handle system/project level labels
// Labels string `json:"labels"`
}
type chartVersionDetail struct {
Metadata *chartVersionMetadata `json:"metadata"`
}
type chartVersionMetadata struct {
URLs []string `json:"urls"`
}
func (a *adapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
resources := []*model.Resource{}
for _, namespace := range namespaces {
url := fmt.Sprintf("%s/api/chartrepo/%s/charts", a.coreServiceURL, namespace)
charts := []*chart{}
if err := a.client.Get(url, &charts); err != nil {
return nil, err
}
for _, chart := range charts {
url := fmt.Sprintf("%s/api/chartrepo/%s/charts/%s", a.coreServiceURL, namespace, chart.Name)
chartVersions := []*chartVersion{}
if err := a.client.Get(url, &chartVersions); err != nil {
return nil, err
}
for _, version := range chartVersions {
resources = append(resources, &model.Resource{
Type: model.ResourceTypeChart,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Namespace: namespace,
Name: fmt.Sprintf("%s/%s", namespace, chart.Name),
Vtags: []string{version.Version},
},
})
}
}
}
return resources, nil
}
func (a *adapter) ChartExist(name, version string) (bool, error) {
_, err := a.getChartInfo(name, version)
if err == nil {
return true, nil
}
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusNotFound {
return false, nil
}
// TODO this is a workaround for https://github.com/goharbor/harbor/issues/7171
if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusInternalServerError {
if strings.Contains(httpErr.Message, "no chart name found") ||
strings.Contains(httpErr.Message, "No chart version found") {
return false, nil
}
}
return false, err
}
func (a *adapter) getChartInfo(name, version string) (*chartVersionDetail, error) {
project, name, err := parseChartName(name)
if err != nil {
return nil, err
}
url := fmt.Sprintf("%s/api/chartrepo/%s/charts/%s/%s", a.coreServiceURL, project, name, version)
info := &chartVersionDetail{}
if err = a.client.Get(url, info); err != nil {
return nil, err
}
return info, nil
}
func (a *adapter) DownloadChart(name, version string) (io.ReadCloser, error) {
info, err := a.getChartInfo(name, version)
if err != nil {
return nil, err
}
if info.Metadata == nil || len(info.Metadata.URLs) == 0 || len(info.Metadata.URLs[0]) == 0 {
return nil, fmt.Errorf("cannot got the download url for chart %s:%s", name, version)
}
url := strings.ToLower(info.Metadata.URLs[0])
// relative URL
if !(strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://")) {
project, _, err := parseChartName(name)
if err != nil {
return nil, err
}
url = fmt.Sprintf("%s/chartrepo/%s/%s", a.coreServiceURL, project, url)
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}
return resp.Body, nil
}
func (a *adapter) UploadChart(name, version string, chart io.Reader) error {
project, name, err := parseChartName(name)
if err != nil {
return err
}
buf := &bytes.Buffer{}
w := multipart.NewWriter(buf)
fw, err := w.CreateFormFile("chart", name+".tgz")
if err != nil {
return err
}
if _, err = io.Copy(fw, chart); err != nil {
return err
}
w.Close()
url := fmt.Sprintf("%s/api/chartrepo/%s/charts", a.coreServiceURL, project)
req, err := http.NewRequest(http.MethodPost, url, buf)
if err != nil {
return err
}
req.Header.Set("Content-Type", w.FormDataContentType())
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return &common_http.Error{
Code: resp.StatusCode,
Message: string(data),
}
}
return nil
}
func (a *adapter) DeleteChart(name, version string) error {
project, name, err := parseChartName(name)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/chartrepo/%s/charts/%s/%s", a.coreServiceURL, project, name, version)
return a.client.Delete(url)
}
func parseChartName(name string) (string, string, error) {
strs := strings.Split(name, "/")
if len(strs) == 2 && len(strs[0]) > 0 && len(strs[1]) > 0 {
return strs[0], strs[1], nil
}
return "", "", fmt.Errorf("invalid chart name format: %s", name)
}

View File

@ -0,0 +1,68 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package harbor
import (
"fmt"
"github.com/goharbor/harbor/src/replication/ng/model"
)
type repository struct {
Name string `json:"name"`
}
type tag struct {
Name string `json:"name"`
}
// TODO implement filter
func (a *adapter) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
resources := []*model.Resource{}
for _, namespace := range namespaces {
project, err := a.getProject(namespace)
if err != nil {
return nil, err
}
repositories := []*repository{}
url := fmt.Sprintf("%s/api/repositories?project_id=%d", a.coreServiceURL, project.ID)
if err = a.client.Get(url, &repositories); err != nil {
return nil, err
}
for _, repository := range repositories {
url := fmt.Sprintf("%s/api/repositories/%s/tags", a.coreServiceURL, repository.Name)
tags := []*tag{}
if err = a.client.Get(url, &tags); err != nil {
return nil, err
}
vtags := []string{}
for _, tag := range tags {
vtags = append(vtags, tag.Name)
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeRepository,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Namespace: namespace,
Name: repository.Name,
Vtags: vtags,
},
})
}
}
return resources, nil
}

View File

@ -85,7 +85,9 @@ func (f *fakedRegistryManager) HealthCheck() error {
return nil
}
type fakedExecutionManager struct{}
type fakedExecutionManager struct {
taskID int64
}
func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) {
return 1, nil
@ -106,7 +108,9 @@ func (f *fakedExecutionManager) RemoveAll(int64) error {
return nil
}
func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) {
return 1, nil
f.taskID++
id := f.taskID
return id, nil
}
func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
return 0, nil, nil
@ -203,13 +207,40 @@ func (f *fakedAdapter) PullBlob(repository, digest string) (size int64, blob io.
func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
func (f *fakedAdapter) FetchCharts(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
return []*model.Resource{
{
Type: model.ResourceTypeChart,
Metadata: &model.ResourceMetadata{
Name: "library/harbor",
Namespace: "library",
Vtags: []string{"0.2.0"},
},
},
}, nil
}
func (f *fakedAdapter) ChartExist(name, version string) (bool, error) {
return false, nil
}
func (f *fakedAdapter) DownloadChart(name, version string) (io.ReadCloser, error) {
return nil, nil
}
func (f *fakedAdapter) UploadChart(name, version string, chart io.Reader) error {
return nil
}
func (f *fakedAdapter) DeleteChart(name, version string) error {
return nil
}
func TestStartReplication(t *testing.T) {
config.InitWithSettings(nil)
err := adapter.RegisterFactory(
&adapter.Info{
Type: "faked_registry",
SupportedResourceTypes: []model.ResourceType{"repository"},
Type: "faked_registry",
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
model.ResourceTypeChart,
},
}, fakedAdapterFactory)
require.Nil(t, err)

View File

@ -154,6 +154,8 @@ func (f *flow) fetchResources() error {
// TODO consider whether the logic can be refactored by using reflect
srcResources := []*model.Resource{}
for _, typ := range resTypes {
log.Debugf("fetching %s...", typ)
// images
if typ == model.ResourceTypeRepository {
reg, ok := f.srcAdapter.(adapter.ImageRegistry)
if !ok {
@ -169,7 +171,22 @@ func (f *flow) fetchResources() error {
srcResources = append(srcResources, res...)
continue
}
// TODO add support for chart
// charts
if typ == model.ResourceTypeChart {
reg, ok := f.srcAdapter.(adapter.ChartRegistry)
if !ok {
err := fmt.Errorf("the adapter doesn't implement the ChartRegistry interface")
f.markExecutionFailure(err)
return err
}
res, err := reg.FetchCharts(f.policy.SrcNamespaces, filters)
if err != nil {
f.markExecutionFailure(err)
return err
}
srcResources = append(srcResources, res...)
continue
}
}
dstResources := []*model.Resource{}
@ -305,7 +322,7 @@ func (f *flow) schedule() error {
// if the task is failed to be submitted, update the status of the
// task as failure
if result.Error != nil {
log.Errorf("failed to schedule task %d: %v", result.TaskID, err)
log.Errorf("failed to schedule task %d: %v", result.TaskID, result.Error)
if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil {
log.Errorf("failed to update task status %d: %v", result.TaskID, err)
}

View File

@ -13,3 +13,174 @@
// limitations under the License.
package chart
import (
"errors"
"github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/replication/ng/model"
trans "github.com/goharbor/harbor/src/replication/ng/transfer"
)
var (
jobStoppedErr = errs.JobStoppedError()
)
func init() {
if err := trans.RegisterFactory(model.ResourceTypeChart, factory); err != nil {
log.Errorf("failed to register transfer factory: %v", err)
}
}
func factory(logger trans.Logger, stopFunc trans.StopFunc) (trans.Transfer, error) {
return &transfer{
logger: logger,
isStopped: stopFunc,
}, nil
}
type chart struct {
name string
version string
}
type transfer struct {
logger trans.Logger
isStopped trans.StopFunc
src adapter.ChartRegistry
dst adapter.ChartRegistry
}
func (t *transfer) Transfer(src *model.Resource, dst *model.Resource) error {
// initialize
if err := t.initialize(src, dst); err != nil {
return err
}
// delete the chart on destination registry
if dst.Deleted {
return t.delete(&chart{
name: dst.Metadata.Name,
version: dst.Metadata.Vtags[0],
})
}
srcChart := &chart{
name: src.Metadata.Name,
version: src.Metadata.Vtags[0],
}
dstChart := &chart{
name: dst.Metadata.Name,
version: dst.Metadata.Vtags[0],
}
// copy the chart from source registry to the destination
return t.copy(srcChart, dstChart, dst.Override)
}
func (t *transfer) initialize(src, dst *model.Resource) error {
if t.shouldStop() {
return jobStoppedErr
}
// create client for source registry
srcReg, err := createRegistry(src.Registry)
if err != nil {
t.logger.Errorf("failed to create client for source registry: %v", err)
return err
}
t.src = srcReg
t.logger.Infof("client for source registry [type: %s, URL: %s, insecure: %v] created",
src.Registry.Type, src.Registry.URL, src.Registry.Insecure)
// create client for destination registry
dstReg, err := createRegistry(dst.Registry)
if err != nil {
t.logger.Errorf("failed to create client for destination registry: %v", err)
return err
}
t.dst = dstReg
t.logger.Infof("client for destination registry [type: %s, URL: %s, insecure: %v] created",
dst.Registry.Type, dst.Registry.URL, dst.Registry.Insecure)
return nil
}
func createRegistry(reg *model.Registry) (adapter.ChartRegistry, error) {
factory, err := adapter.GetFactory(reg.Type)
if err != nil {
return nil, err
}
ad, err := factory(reg)
if err != nil {
return nil, err
}
registry, ok := ad.(adapter.ChartRegistry)
if !ok {
return nil, errors.New("the adapter doesn't implement the \"ChartRegistry\" interface")
}
return registry, nil
}
func (t *transfer) shouldStop() bool {
isStopped := t.isStopped()
if isStopped {
t.logger.Info("the job is stopped")
}
return isStopped
}
func (t *transfer) copy(src, dst *chart, override bool) error {
if t.shouldStop() {
return jobStoppedErr
}
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
src.name, src.version, dst.name, dst.version)
// check the existence of the chart on the destination registry
exist, err := t.dst.ChartExist(dst.name, dst.version)
if err != nil {
t.logger.Errorf("failed to check the existence of chart %s:%s on the destination registry: %v", dst.name, dst.version, err)
return err
}
if exist {
// the same name chart exists, but not allowed to override
if !override {
t.logger.Warningf("the same name chart %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
dst.name, dst.version)
return nil
}
// the same name chart exists, but allowed to override
t.logger.Warningf("the same name chart %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
dst.name, dst.version)
}
// copy the chart between the source and destination registries
chart, err := t.src.DownloadChart(src.name, src.version)
if err != nil {
t.logger.Errorf("failed to download the chart %s:%s: %v", src.name, src.version, err)
return err
}
defer chart.Close()
if err = t.dst.UploadChart(dst.name, dst.version, chart); err != nil {
t.logger.Errorf("failed to upload the chart %s:%s: %v", dst.name, dst.version, err)
return err
}
t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
src.name, src.version, dst.name, dst.version)
return nil
}
func (t *transfer) delete(chart *chart) error {
t.logger.Infof("deleting the chart %s:%s on the destination registry...", chart.name, chart.version)
if err := t.dst.DeleteChart(chart.name, chart.version); err != nil {
t.logger.Errorf("failed to delete the chart %s:%s on the destination registry", chart.name, chart.version)
return err
}
t.logger.Infof("delete the chart %s:%s on the destination registry completed", chart.name, chart.version)
return nil
}

View File

@ -31,16 +31,16 @@ import (
"github.com/stretchr/testify/require"
)
type fakeRregistry struct{}
type fakeRegistry struct{}
func (f *fakeRregistry) FetchImages([]string, []*model.Filter) ([]*model.Resource, error) {
func (f *fakeRegistry) FetchImages([]string, []*model.Filter) ([]*model.Resource, error) {
return nil, nil
}
func (f *fakeRregistry) ManifestExist(repository, reference string) (bool, string, error) {
func (f *fakeRegistry) ManifestExist(repository, reference string) (bool, string, error) {
return false, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
func (f *fakeRregistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
func (f *fakeRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) {
manifest := `{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
@ -75,17 +75,17 @@ func (f *fakeRregistry) PullManifest(repository, reference string, accepttedMedi
}
return mani, "sha256:c6b2b2c507a0944348e0303114d8d93aaaa081732b86451d9bce1f432a537bc7", nil
}
func (f *fakeRregistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
func (f *fakeRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error {
return nil
}
func (f *fakeRregistry) BlobExist(repository, digest string) (bool, error) {
func (f *fakeRegistry) BlobExist(repository, digest string) (bool, error) {
return false, nil
}
func (f *fakeRregistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
func (f *fakeRegistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) {
r := ioutil.NopCloser(bytes.NewReader([]byte{'a'}))
return 1, r, nil
}
func (f *fakeRregistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
func (f *fakeRegistry) PushBlob(repository, digest string, size int64, blob io.Reader) error {
return nil
}
@ -118,8 +118,8 @@ func TestCopy(t *testing.T) {
tr := &transfer{
logger: log.DefaultLogger(),
isStopped: stopFunc,
src: &fakeRregistry{},
dst: &fakeRregistry{},
src: &fakeRegistry{},
dst: &fakeRegistry{},
}
src := &repository{