Merge pull request #7127 from cd1989/dockerhub-adaptor

Implement dockerhub adapter
This commit is contained in:
Wenkai Yin 2019-04-12 20:25:18 +08:00 committed by GitHub
commit 5ee30c7909
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 706 additions and 9 deletions

View File

@ -28,6 +28,8 @@ import (
_ "github.com/goharbor/harbor/src/replication/ng/transfer/repository"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
// register the DockerHub adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/dockerhub"
)
// Replication implements the job interface

View File

@ -0,0 +1,387 @@
package dockerhub
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"github.com/goharbor/harbor/src/common/utils/log"
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/util"
)
func init() {
if err := adp.RegisterFactory(model.RegistryTypeDockerHub, func(registry *model.Registry) (adp.Adapter, error) {
client, err := NewClient(registry)
if err != nil {
return nil, err
}
return &adapter{
client: client,
registry: registry,
DefaultImageRegistry: adp.NewDefaultImageRegistry(&model.Registry{
Name: registry.Name,
URL: registryURL,
Credential: registry.Credential,
Insecure: registry.Insecure,
}),
}, nil
}); err != nil {
log.Errorf("Register adapter factory for %s error: %v", model.RegistryTypeDockerHub, err)
return
}
log.Infof("Factory for adapter %s registered", model.RegistryTypeDockerHub)
}
type adapter struct {
*adp.DefaultImageRegistry
registry *model.Registry
client *Client
}
// Ensure '*adapter' implements interface 'Adapter'.
var _ adp.Adapter = (*adapter)(nil)
// Info returns information of the registry
func (a *adapter) Info() (*model.RegistryInfo, error) {
return &model.RegistryInfo{
Type: model.RegistryTypeDockerHub,
SupportNamespace: true,
SupportedResourceTypes: []model.ResourceType{
model.ResourceTypeRepository,
},
SupportedResourceFilters: []*model.FilterStyle{
{
Type: model.FilterTypeName,
Style: model.FilterStyleTypeText,
},
{
Type: model.FilterTypeTag,
Style: model.FilterStyleTypeText,
},
},
SupportedTriggers: []model.TriggerType{
model.TriggerTypeManual,
model.TriggerTypeScheduled,
},
}, nil
}
// ConvertResourceMetadata converts the namespace and repository part of the resource metadata
// to the one that the adapter can handle
func (a *adapter) ConvertResourceMetadata(meta *model.ResourceMetadata, namespace *model.Namespace) (*model.ResourceMetadata, error) {
return meta, nil
}
// PrepareForPush does the prepare work that needed for pushing/uploading the resource
// eg: create the namespace or repository
func (a *adapter) PrepareForPush(resource *model.Resource) error {
if resource == nil {
return errors.New("the resource cannot be null")
}
if resource.Metadata == nil {
return errors.New("the metadata of resource cannot be null")
}
if resource.Metadata.Namespace == nil {
return errors.New("the namespace of resource cannot be null")
}
if len(resource.Metadata.Namespace.Name) == 0 {
return errors.New("the name of the namespace cannot be null")
}
err := a.CreateNamespace(&model.Namespace{
Name: resource.Metadata.Namespace.Name,
})
if err != nil {
return fmt.Errorf("create namespace '%s' in DockerHub error: %v", resource.Metadata.Namespace.Name, err)
}
return nil
}
// ListNamespaces lists namespaces from DockerHub with the provided query conditions.
func (a *adapter) ListNamespaces(query *model.NamespaceQuery) ([]*model.Namespace, error) {
resp, err := a.client.Do(http.MethodGet, listNamespacePath, nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode/100 != 2 {
log.Errorf("list namespace error: %s", string(body))
return nil, fmt.Errorf("%s", string(body))
}
namespaces := NamespacesResp{}
err = json.Unmarshal(body, &namespaces)
if err != nil {
return nil, err
}
var result []*model.Namespace
for _, ns := range namespaces.Namespaces {
// If query set, skip the namespace that doesn't match the query.
if query != nil && len(query.Name) > 0 && strings.Index(ns, query.Name) != -1 {
continue
}
result = append(result, &model.Namespace{
Name: ns,
})
}
return result, nil
}
// CreateNamespace creates a new namespace in DockerHub
func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
ns, err := a.getNamespace(namespace.Name)
if err != nil {
return fmt.Errorf("check existence of namespace '%s' error: %v", namespace.Name, err)
}
// If the namespace already exist, return succeeded directly.
if ns != nil {
log.Infof("Namespace %s already exist in DockerHub, skip it.", namespace.Name)
return nil
}
req := &NewOrgReq{
Name: namespace.Name,
FullName: namespace.GetStringMetadata(metadataKeyFullName, namespace.Name),
Company: namespace.GetStringMetadata(metadataKeyCompany, namespace.Name),
}
b, err := json.Marshal(req)
if err != nil {
return err
}
resp, err := a.client.Do(http.MethodPost, createNamespacePath, bytes.NewReader(b))
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode/100 != 2 {
log.Errorf("create namespace error: %d -- %s", resp.StatusCode, string(body))
return fmt.Errorf("%d -- %s", resp.StatusCode, body)
}
return nil
}
// GetNamespace gets a namespace from DockerHub.
func (a *adapter) GetNamespace(namespace string) (*model.Namespace, error) {
return &model.Namespace{
Name: namespace,
}, nil
}
// getNamespace get namespace from DockerHub, if the namespace not found, two nil would be returned.
func (a *adapter) getNamespace(namespace string) (*model.Namespace, error) {
resp, err := a.client.Do(http.MethodGet, getNamespacePath(namespace), nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusNotFound {
return nil, nil
}
if resp.StatusCode/100 != 2 {
log.Errorf("create namespace error: %d -- %s", resp.StatusCode, string(body))
return nil, fmt.Errorf("%d -- %s", resp.StatusCode, body)
}
return &model.Namespace{
Name: namespace,
}, nil
}
// FetchImages fetches images
func (a *adapter) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) {
var repos []Repo
nameFilter, err := a.getStringFilterValue(model.FilterTypeName, filters)
if err != nil {
return nil, err
}
tagFilter, err := a.getStringFilterValue(model.FilterTypeTag, filters)
if err != nil {
return nil, err
}
for _, ns := range namespaces {
page := 1
pageSize := 100
for {
pageRepos, err := a.getRepos(ns, "", page, pageSize)
if err != nil {
return nil, fmt.Errorf("get repos for namespace '%s' from DockerHub error: %v", ns, err)
}
repos = append(repos, pageRepos.Repos...)
if len(pageRepos.Next) == 0 {
break
}
page++
}
}
log.Infof("%d repos found for namespaces: %v", len(repos), namespaces)
var resources []*model.Resource
// TODO(ChenDe): Get tags for repos in parallel
for _, repo := range repos {
// If name filter set, skip repos that don't match the filter pattern.
if len(nameFilter) != 0 {
m, err := util.Match(nameFilter, repo.Name)
if err != nil {
return nil, fmt.Errorf("match repo name '%s' against pattern '%s' error: %v", repo.Name, nameFilter, err)
}
if !m {
continue
}
}
var tags []string
page := 1
pageSize := 100
for {
pageTags, err := a.getTags(repo.Namespace, repo.Name, page, pageSize)
if err != nil {
return nil, fmt.Errorf("get tags for repo '%s/%s' from DockerHub error: %v", repo.Namespace, repo.Name, err)
}
for _, t := range pageTags.Tags {
// If tag filter set, skip tags that don't match the filter pattern.
if len(tagFilter) != 0 {
m, err := util.Match(tagFilter, t.Name)
if err != nil {
return nil, fmt.Errorf("match tag name '%s' against pattern '%s' error: %v", t.Name, tagFilter, err)
}
if !m {
continue
}
}
tags = append(tags, t.Name)
}
if len(pageTags.Next) == 0 {
break
}
page++
}
// If the repo has no tags, skip it
if len(tags) == 0 {
continue
}
resources = append(resources, &model.Resource{
Type: model.ResourceTypeRepository,
Registry: a.registry,
Metadata: &model.ResourceMetadata{
Namespace: &model.Namespace{
Name: repo.Namespace,
},
Repository: &model.Repository{
Name: repo.Name,
},
Vtags: tags,
},
})
}
return resources, nil
}
// getRepos gets a page of repos from DockerHub
func (a *adapter) getRepos(namespace, name string, page, pageSize int) (*ReposResp, error) {
resp, err := a.client.Do(http.MethodGet, listReposPath(namespace, name, page, pageSize), nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode/100 != 2 {
log.Errorf("list repos error: %d -- %s", resp.StatusCode, string(body))
return nil, fmt.Errorf("%d -- %s", resp.StatusCode, body)
}
repos := &ReposResp{}
err = json.Unmarshal(body, repos)
if err != nil {
return nil, fmt.Errorf("unmarshal repos list %s error: %v", string(body), err)
}
return repos, nil
}
// getTags gets a page of tags for a repo from DockerHub
func (a *adapter) getTags(namespace, repo string, page, pageSize int) (*TagsResp, error) {
resp, err := a.client.Do(http.MethodGet, listTagsPath(namespace, repo, page, pageSize), nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode/100 != 2 {
log.Errorf("list tags error: %d -- %s", resp.StatusCode, string(body))
return nil, fmt.Errorf("%d -- %s", resp.StatusCode, body)
}
tags := &TagsResp{}
err = json.Unmarshal(body, tags)
if err != nil {
return nil, fmt.Errorf("unmarshal tags list %s error: %v", string(body), err)
}
return tags, nil
}
// getFilter gets specific type filter value from filters list.
func (a *adapter) getStringFilterValue(filterType model.FilterType, filters []*model.Filter) (string, error) {
for _, f := range filters {
if f.Type == filterType {
v, ok := f.Value.(string)
if !ok {
msg := fmt.Sprintf("expect filter value to be string, but got: %v", f.Value)
log.Error(msg)
return "", errors.New(msg)
}
return v, nil
}
}
return "", nil
}

View File

@ -0,0 +1,50 @@
package dockerhub
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
adp "github.com/goharbor/harbor/src/replication/ng/adapter"
"github.com/goharbor/harbor/src/replication/ng/model"
)
const (
testUser = ""
testPassword = ""
)
func getAdapter(t *testing.T) adp.Adapter {
assert := assert.New(t)
factory, err := adp.GetFactory(model.RegistryTypeDockerHub)
assert.Nil(err)
assert.NotNil(factory)
adapter, err := factory(&model.Registry{
Type: model.RegistryTypeDockerHub,
Credential: &model.Credential{
AccessKey: testUser,
AccessSecret: testPassword,
},
})
assert.Nil(err)
assert.NotNil(adapter)
return adapter
}
func TestListNamespaces(t *testing.T) {
if testUser == "" {
return
}
assert := assert.New(t)
adapter := getAdapter(t)
namespaces, err := adapter.ListNamespaces(nil)
assert.Nil(err)
for _, ns := range namespaces {
fmt.Println(ns.Name)
}
}

View File

@ -0,0 +1,110 @@
package dockerhub
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// Client is a client to interact with DockerHub
type Client struct {
client *http.Client
token string
host string
credential LoginCredential
}
// NewClient creates a new DockerHub client.
func NewClient(registry *model.Registry) (*Client, error) {
client := &Client{
host: registry.URL,
client: http.DefaultClient,
credential: LoginCredential{
User: registry.Credential.AccessKey,
Password: registry.Credential.AccessSecret,
},
}
// Login to DockerHub to get access token, default expire date is 30d.
err := client.refreshToken()
if err != nil {
return nil, fmt.Errorf("login to dockerhub error: %v", err)
}
return client, nil
}
// refreshToken login to DockerHub with user/password, and retrieve access token.
func (c *Client) refreshToken() error {
b, err := json.Marshal(c.credential)
if err != nil {
return fmt.Errorf("marshal credential error: %v", err)
}
request, err := http.NewRequest(http.MethodPost, baseURL+loginPath, bytes.NewReader(b))
if err != nil {
return err
}
request.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(request)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode/100 != 2 {
return fmt.Errorf("login to dockerhub error: %s", string(body))
}
token := &TokenResp{}
err = json.Unmarshal(body, token)
if err != nil {
return fmt.Errorf("unmarshal token response error: %v", err)
}
c.token = token.Token
return nil
}
// Do performs http request to DockerHub, it will set token automatically.
func (c *Client) Do(method, path string, body io.Reader) (*http.Response, error) {
url := baseURL + path
log.Infof("%s %s", method, url)
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
if body != nil || method == http.MethodPost || method == http.MethodPut {
req.Header.Set("Content-Type", "application/json")
}
req.Header.Set("Authorization", fmt.Sprintf("JWT %s", c.token))
resp, err := c.client.Do(req)
if err != nil {
log.Errorf("unexpected error: %v", err)
return nil, err
}
if resp.StatusCode/100 != 2 {
b, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return nil, err
}
return nil, fmt.Errorf("unexpected %d error from dockerhub: %s", resp.StatusCode, string(b))
}
return resp, nil
}

View File

@ -0,0 +1,30 @@
package dockerhub
import "fmt"
const (
baseURL = "https://hub.docker.com"
registryURL = "https://registry-1.docker.io"
loginPath = "/v2/users/login/"
listNamespacePath = "/v2/repositories/namespaces"
createNamespacePath = "/v2/orgs/"
metadataKeyCompany = "company"
metadataKeyFullName = "fullName"
)
func getNamespacePath(namespace string) string {
return fmt.Sprintf("/v2/orgs/%s/", namespace)
}
func listReposPath(namespace, name string, page, pageSize int) string {
if len(name) == 0 {
return fmt.Sprintf("/v2/repositories/%s/?page=%d&page_size=%d", namespace, page, pageSize)
}
return fmt.Sprintf("/v2/repositories/%s/?name=%s&page=%d&page_size=%d", namespace, name, page, pageSize)
}
func listTagsPath(namespace, repo string, page, pageSize int) string {
return fmt.Sprintf("/v2/repositories/%s/%s/tags/?page=%d&page_size=%d", namespace, repo, page, pageSize)
}

View File

@ -0,0 +1,92 @@
package dockerhub
// LoginCredential is request to login.
type LoginCredential struct {
User string `json:"username"`
Password string `json:"password"`
}
// TokenResp is response of login.
type TokenResp struct {
Token string `json:"token"`
}
// NamespacesResp is namespace list responsed from DockerHub.
type NamespacesResp struct {
// Namespaces is a list of namespaces
Namespaces []string `json:"namespaces"`
}
// NewOrgReq is request to create a new org as namespace.
type NewOrgReq struct {
// Name is name of the namespace
Name string `json:"orgname"`
// FullName ...
FullName string `json:"full_name"`
// Company ...
Company string `json:"company"`
// Location ...
Location string `json:"location"`
// ProfileUrl ...
ProfileURL string `json:"profile_url"`
// GravatarEmail ...
GravatarEmail string `json:"gravatar_email"`
}
// Repo describes a repo in DockerHub
type Repo struct {
// User ...
User string `json:"user"`
// Name of the repo
Name string `json:"name"`
// Namespace of the repo
Namespace string `json:"namespace"`
// RepoType is type of the repo, e.g. 'image'
RepoType string `json:"repository_type"`
// Status ...
Status int `json:"status"`
// Description ...
Description string `json:"description"`
// IsPrivate indicates whether the repo is private
IsPrivate bool `json:"is_private"`
// IsAutomated ...
IsAutomated bool `json:"is_automated"`
// CanEdit ...
CanEdit bool `json:"can_edit"`
// StarCount ..
StarCount int `json:"star_count"`
// PullCount ...
PullCount int `json:"pull_count"`
}
// ReposResp is response of repo list request
type ReposResp struct {
// Count is total number of repos
Count int `json:"count"`
// Next is the URL of the next page
Next string `json:"next"`
// Previous is the URL of the previous page
Previous string `json:"previous"`
// Repos is repo list
Repos []Repo `json:"results"`
}
// Tag describes a tag in DockerHub
type Tag struct {
// Name of the tag
Name string `json:"name"`
// FullSize is size of the image
FullSize int64 `json:"full_size"`
}
// TagsResp is response of tag list request
type TagsResp struct {
// Count is total number of repos
Count int `json:"count"`
// Next is the URL of the next page
Next string `json:"next"`
// Previous is the URL of the previous page
Previous string `json:"previous"`
// Repos is tags list
Tags []Tag `json:"results"`
}

View File

@ -218,7 +218,6 @@ func (a *adapter) PrepareForPush(resource *model.Resource) error {
public = false
break
}
}
project.Metadata = map[string]interface{}{
"public": public,

View File

@ -22,6 +22,19 @@ type Namespace struct {
Metadata map[string]interface{} `json:"metadata"`
}
// GetStringMetadata get a string value metadata from the namespace, if not found, return the default value.
func (n *Namespace) GetStringMetadata(key string, defaultValue string) string {
if n.Metadata == nil {
return defaultValue
}
if v, ok := n.Metadata[key]; ok {
return v.(string)
}
return defaultValue
}
// NamespaceQuery defines the query condition for listing namespaces
type NamespaceQuery struct {
Name string

View File

@ -23,7 +23,8 @@ import (
// const definition
const (
// RegistryTypeHarbor indicates registry type harbor
RegistryTypeHarbor RegistryType = "harbor"
RegistryTypeHarbor RegistryType = "harbor"
RegistryTypeDockerHub RegistryType = "dockerHub"
FilterStyleTypeText = "input"
FilterStyleTypeRadio = "radio"

View File

@ -61,6 +61,7 @@ func (c *copyFlow) Run(interface{}) (int, error) {
if err != nil {
return 0, err
}
if err = prepareForPush(dstAdapter, dstResources); err != nil {
return 0, err
}
@ -71,6 +72,7 @@ func (c *copyFlow) Run(interface{}) (int, error) {
if err = createTasks(c.executionMgr, c.executionID, items); err != nil {
return 0, err
}
return schedule(c.scheduler, c.executionMgr, items)
}

View File

@ -64,6 +64,7 @@ func (d *deletionFlow) Run(interface{}) (int, error) {
if err != nil {
return 0, err
}
items, err := preprocess(d.scheduler, srcResources, dstResources)
if err != nil {
return 0, err
@ -71,5 +72,6 @@ func (d *deletionFlow) Run(interface{}) (int, error) {
if err = createTasks(d.executionMgr, d.executionID, items); err != nil {
return 0, err
}
return schedule(d.scheduler, d.executionMgr, items)
}

View File

@ -241,6 +241,7 @@ func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.Sc
if item.DstResource.Deleted {
operation = "deletion"
}
task := &models.Task{
ExecutionID: executionID,
Status: models.TaskStatusInitialized,
@ -249,6 +250,7 @@ func createTasks(mgr execution.Manager, executionID int64, items []*scheduler.Sc
DstResource: getResourceName(item.DstResource),
Operation: operation,
}
id, err := mgr.CreateTask(task)
if err != nil {
// if failed to create the task for one of the items,
@ -317,5 +319,10 @@ func getResourceName(res *model.Resource) string {
if len(meta.Vtags) == 0 {
return meta.GetResourceName()
}
return meta.GetResourceName() + ":[" + strings.Join(meta.Vtags, ",") + "]"
if len(meta.Vtags) <= 5 {
return meta.GetResourceName() + ":[" + strings.Join(meta.Vtags, ",") + "]"
}
return fmt.Sprintf("%s:[%s ... %d in total]", meta.GetResourceName(), strings.Join(meta.Vtags[:5], ","), len(meta.Vtags))
}

View File

@ -30,6 +30,8 @@ import (
"github.com/goharbor/harbor/src/replication/ng/registry"
// register the Harbor adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/dockerhub"
// register the DockerHub adapter
_ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor"
)

View File

@ -17,31 +17,31 @@ type MockJobClient struct {
// GetJobLog ...
func (mjc *MockJobClient) GetJobLog(uuid string) ([]byte, error) {
if uuid == "500" {
return nil, &http.Error{500, "Server side error"}
return nil, &http.Error{500, "server side error"}
}
if mjc.validUUID(uuid) {
return []byte("some log"), nil
}
return nil, &http.Error{404, "Not Found"}
return nil, &http.Error{404, "not Found"}
}
// SubmitJob ...
func (mjc *MockJobClient) SubmitJob(data *models.JobData) (string, error) {
if data.Name == job.ImageScanAllJob || data.Name == job.ImageReplicate || data.Name == job.ImageGC || data.Name == job.ImageScanJob {
if data.Name == job.ImageScanAllJob || data.Name == job.Replication || data.Name == job.ImageGC || data.Name == job.ImageScanJob {
uuid := fmt.Sprintf("u-%d", rand.Int())
mjc.JobUUID = append(mjc.JobUUID, uuid)
return uuid, nil
}
return "", fmt.Errorf("Unsupported job %s", data.Name)
return "", fmt.Errorf("unsupported job %s", data.Name)
}
// PostAction ...
func (mjc *MockJobClient) PostAction(uuid, action string) error {
if "500" == uuid {
return &http.Error{500, "Server side error"}
return &http.Error{500, "server side error"}
}
if !mjc.validUUID(uuid) {
return &http.Error{404, "Not Found"}
return &http.Error{404, "not Found"}
}
return nil
}