mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-11 18:38:14 +01:00
490 lines
14 KiB
Go
490 lines
14 KiB
Go
/*
|
|
Copyright (c) 2016 VMware, Inc. All Rights Reserved.
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package replication
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/manifest/schema1"
|
|
"github.com/docker/distribution/manifest/schema2"
|
|
"github.com/vmware/harbor/dao"
|
|
"github.com/vmware/harbor/models"
|
|
"github.com/vmware/harbor/utils/log"
|
|
"github.com/vmware/harbor/utils/registry"
|
|
"github.com/vmware/harbor/utils/registry/auth"
|
|
)
|
|
|
|
const (
|
|
// StateInitialize ...
|
|
StateInitialize = "initialize"
|
|
// StateCheck ...
|
|
StateCheck = "check"
|
|
// StatePullManifest ...
|
|
StatePullManifest = "pull_manifest"
|
|
// StateTransferBlob ...
|
|
StateTransferBlob = "transfer_blob"
|
|
// StatePushManifest ...
|
|
StatePushManifest = "push_manifest"
|
|
)
|
|
|
|
var (
|
|
// ErrConflict represents http 409 error
|
|
ErrConflict = errors.New("conflict")
|
|
)
|
|
|
|
// BaseHandler holds informations shared by other state handlers
|
|
type BaseHandler struct {
|
|
project string // project_name
|
|
repository string // prject_name/repo_name
|
|
tags []string
|
|
|
|
srcURL string // url of source registry
|
|
srcSecret string
|
|
|
|
dstURL string // url of target registry
|
|
dstUsr string // username ...
|
|
dstPwd string // password ...
|
|
|
|
insecure bool // whether skip secure check when using https
|
|
|
|
srcClient *registry.Repository
|
|
dstClient *registry.Repository
|
|
|
|
manifest distribution.Manifest // manifest of tags[0]
|
|
digest string //digest of tags[0]'s manifest
|
|
blobs []string // blobs need to be transferred for tags[0]
|
|
|
|
blobsExistence map[string]bool //key: digest of blob, value: existence
|
|
|
|
logger *log.Logger
|
|
}
|
|
|
|
// InitBaseHandler initializes a BaseHandler.
|
|
func InitBaseHandler(repository, srcURL, srcSecret,
|
|
dstURL, dstUsr, dstPwd string, insecure bool, tags []string, logger *log.Logger) *BaseHandler {
|
|
|
|
base := &BaseHandler{
|
|
repository: repository,
|
|
tags: tags,
|
|
srcURL: srcURL,
|
|
srcSecret: srcSecret,
|
|
dstURL: dstURL,
|
|
dstUsr: dstUsr,
|
|
dstPwd: dstPwd,
|
|
insecure: insecure,
|
|
blobsExistence: make(map[string]bool, 10),
|
|
logger: logger,
|
|
}
|
|
|
|
base.project = getProjectName(base.repository)
|
|
|
|
return base
|
|
}
|
|
|
|
// Exit ...
|
|
func (b *BaseHandler) Exit() error {
|
|
return nil
|
|
}
|
|
|
|
func getProjectName(repository string) string {
|
|
repository = strings.TrimSpace(repository)
|
|
repository = strings.TrimRight(repository, "/")
|
|
return repository[:strings.LastIndex(repository, "/")]
|
|
}
|
|
|
|
// Initializer creates clients for source and destination registry,
|
|
// lists tags of the repository if parameter tags is nil.
|
|
type Initializer struct {
|
|
*BaseHandler
|
|
}
|
|
|
|
// Enter ...
|
|
func (i *Initializer) Enter() (string, error) {
|
|
i.logger.Infof("initializing: repository: %s, tags: %v, source URL: %s, destination URL: %s, insecure: %v, destination user: %s",
|
|
i.repository, i.tags, i.srcURL, i.dstURL, i.insecure, i.dstUsr)
|
|
|
|
state, err := i.enter()
|
|
if err != nil && retry(err) {
|
|
i.logger.Info("waiting for retrying...")
|
|
return models.JobRetrying, nil
|
|
}
|
|
|
|
return state, err
|
|
}
|
|
|
|
func (i *Initializer) enter() (string, error) {
|
|
c := &http.Cookie{Name: models.UISecretCookie, Value: i.srcSecret}
|
|
srcCred := auth.NewCookieCredential(c)
|
|
srcClient, err := newRepositoryClient(i.srcURL, i.insecure, srcCred,
|
|
i.repository, "repository", i.repository, "pull", "push", "*")
|
|
if err != nil {
|
|
i.logger.Errorf("an error occurred while creating source repository client: %v", err)
|
|
return "", err
|
|
}
|
|
i.srcClient = srcClient
|
|
|
|
dstCred := auth.NewBasicAuthCredential(i.dstUsr, i.dstPwd)
|
|
dstClient, err := newRepositoryClient(i.dstURL, i.insecure, dstCred,
|
|
i.repository, "repository", i.repository, "pull", "push", "*")
|
|
if err != nil {
|
|
i.logger.Errorf("an error occurred while creating destination repository client: %v", err)
|
|
return "", err
|
|
}
|
|
i.dstClient = dstClient
|
|
|
|
if len(i.tags) == 0 {
|
|
tags, err := i.srcClient.ListTag()
|
|
if err != nil {
|
|
i.logger.Errorf("an error occurred while listing tags for source repository: %v", err)
|
|
return "", err
|
|
}
|
|
i.tags = tags
|
|
}
|
|
|
|
i.logger.Infof("initialization completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, insecure: %v, destination user: %s",
|
|
i.project, i.repository, i.tags, i.srcURL, i.dstURL, i.insecure, i.dstUsr)
|
|
|
|
return StateCheck, nil
|
|
}
|
|
|
|
// Checker checks the existence of project and the user's privlege to the project
|
|
type Checker struct {
|
|
*BaseHandler
|
|
}
|
|
|
|
// Enter check existence of project, if it does not exist, create it,
|
|
// if it exists, check whether the user has write privilege to it.
|
|
func (c *Checker) Enter() (string, error) {
|
|
state, err := c.enter()
|
|
if err != nil && retry(err) {
|
|
c.logger.Info("waiting for retrying...")
|
|
return models.JobRetrying, nil
|
|
}
|
|
|
|
return state, err
|
|
}
|
|
|
|
func (c *Checker) enter() (string, error) {
|
|
project, err := dao.GetProjectByName(c.project)
|
|
if err != nil {
|
|
c.logger.Errorf("an error occurred while getting project %s in DB: %v", c.project, err)
|
|
return "", err
|
|
}
|
|
|
|
err = c.createProject(project.Public == 1)
|
|
if err == nil {
|
|
c.logger.Infof("project %s is created on %s with user %s", c.project, c.dstURL, c.dstUsr)
|
|
return StatePullManifest, nil
|
|
}
|
|
|
|
// other job may be also doing the same thing when the current job
|
|
// is creating project, so when the response code is 409, continue
|
|
// to do next step
|
|
if err == ErrConflict {
|
|
c.logger.Warningf("the status code is 409 when creating project %s on %s with user %s, try to do next step", c.project, c.dstURL, c.dstUsr)
|
|
return StatePullManifest, nil
|
|
}
|
|
|
|
c.logger.Errorf("an error occurred while creating project %s on %s with user %s : %v", c.project, c.dstURL, c.dstUsr, err)
|
|
|
|
return "", err
|
|
}
|
|
|
|
func (c *Checker) createProject(isPublic bool) error {
|
|
project := struct {
|
|
ProjectName string `json:"project_name"`
|
|
Public bool `json:"public"`
|
|
}{
|
|
ProjectName: c.project,
|
|
Public: isPublic,
|
|
}
|
|
|
|
data, err := json.Marshal(project)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
url := strings.TrimRight(c.dstURL, "/") + "/api/projects/"
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.SetBasicAuth(c.dstUsr, c.dstPwd)
|
|
|
|
client := &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{
|
|
InsecureSkipVerify: c.insecure,
|
|
},
|
|
},
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
// version 0.1.1's reponse code is 200
|
|
if resp.StatusCode == http.StatusCreated ||
|
|
resp.StatusCode == http.StatusOK {
|
|
return nil
|
|
}
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
return ErrConflict
|
|
}
|
|
|
|
message, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
c.logger.Errorf("an error occurred while reading message from response: %v", err)
|
|
}
|
|
|
|
return fmt.Errorf("failed to create project %s on %s with user %s: %d %s",
|
|
c.project, c.dstURL, c.dstUsr, resp.StatusCode, string(message))
|
|
}
|
|
|
|
// ManifestPuller pulls the manifest of a tag. And if no tag needs to be pulled,
|
|
// the next state that state machine should enter is "finished".
|
|
type ManifestPuller struct {
|
|
*BaseHandler
|
|
}
|
|
|
|
// Enter pulls manifest of a tag and checks if all blobs exist in the destination registry
|
|
func (m *ManifestPuller) Enter() (string, error) {
|
|
state, err := m.enter()
|
|
if err != nil && retry(err) {
|
|
m.logger.Info("waiting for retrying...")
|
|
return models.JobRetrying, nil
|
|
}
|
|
|
|
return state, err
|
|
|
|
}
|
|
|
|
func (m *ManifestPuller) enter() (string, error) {
|
|
if len(m.tags) == 0 {
|
|
m.logger.Infof("no tag needs to be replicated, next state is \"finished\"")
|
|
return models.JobFinished, nil
|
|
}
|
|
|
|
name := m.repository
|
|
tag := m.tags[0]
|
|
|
|
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
|
|
digest, mediaType, payload, err := m.srcClient.PullManifest(tag, acceptMediaTypes)
|
|
if err != nil {
|
|
m.logger.Errorf("an error occurred while pulling manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
|
|
return "", err
|
|
}
|
|
m.digest = digest
|
|
m.logger.Infof("manifest of %s:%s pulled successfully from %s: %s", name, tag, m.srcURL, digest)
|
|
|
|
if strings.Contains(mediaType, "application/json") {
|
|
mediaType = schema1.MediaTypeManifest
|
|
}
|
|
|
|
manifest, _, err := registry.UnMarshal(mediaType, payload)
|
|
if err != nil {
|
|
m.logger.Errorf("an error occurred while parsing manifest of %s:%s from %s: %v", name, tag, m.srcURL, err)
|
|
return "", err
|
|
}
|
|
|
|
m.manifest = manifest
|
|
|
|
// all blobs(layers and config)
|
|
var blobs []string
|
|
|
|
for _, discriptor := range manifest.References() {
|
|
blobs = append(blobs, discriptor.Digest.String())
|
|
}
|
|
|
|
// config is also need to be transferred if the schema of manifest is v2
|
|
manifest2, ok := manifest.(*schema2.DeserializedManifest)
|
|
if ok {
|
|
blobs = append(blobs, manifest2.Target().Digest.String())
|
|
}
|
|
|
|
m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs)
|
|
|
|
for _, blob := range blobs {
|
|
exist, ok := m.blobsExistence[blob]
|
|
if !ok {
|
|
exist, err = m.dstClient.BlobExist(blob)
|
|
if err != nil {
|
|
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err)
|
|
return "", err
|
|
}
|
|
m.blobsExistence[blob] = exist
|
|
}
|
|
|
|
if !exist {
|
|
m.blobs = append(m.blobs, blob)
|
|
} else {
|
|
m.logger.Infof("blob %s of %s:%s already exists in %s", blob, name, tag, m.dstURL)
|
|
}
|
|
}
|
|
m.logger.Infof("blobs of %s:%s need to be transferred to %s: %v", name, tag, m.dstURL, m.blobs)
|
|
|
|
return StateTransferBlob, nil
|
|
}
|
|
|
|
// BlobTransfer transfers blobs of a tag
|
|
type BlobTransfer struct {
|
|
*BaseHandler
|
|
}
|
|
|
|
// Enter pulls blobs and then pushs them to destination registry.
|
|
func (b *BlobTransfer) Enter() (string, error) {
|
|
state, err := b.enter()
|
|
if err != nil && retry(err) {
|
|
b.logger.Info("waiting for retrying...")
|
|
return models.JobRetrying, nil
|
|
}
|
|
|
|
return state, err
|
|
|
|
}
|
|
|
|
func (b *BlobTransfer) enter() (string, error) {
|
|
name := b.repository
|
|
tag := b.tags[0]
|
|
for _, blob := range b.blobs {
|
|
b.logger.Infof("transferring blob %s of %s:%s to %s ...", blob, name, tag, b.dstURL)
|
|
size, data, err := b.srcClient.PullBlob(blob)
|
|
if err != nil {
|
|
b.logger.Errorf("an error occurred while pulling blob %s of %s:%s from %s: %v", blob, name, tag, b.srcURL, err)
|
|
return "", err
|
|
}
|
|
if data != nil {
|
|
defer data.Close()
|
|
}
|
|
if err = b.dstClient.PushBlob(blob, size, data); err != nil {
|
|
b.logger.Errorf("an error occurred while pushing blob %s of %s:%s to %s : %v", blob, name, tag, b.dstURL, err)
|
|
return "", err
|
|
}
|
|
b.logger.Infof("blob %s of %s:%s transferred to %s completed", blob, name, tag, b.dstURL)
|
|
}
|
|
|
|
return StatePushManifest, nil
|
|
}
|
|
|
|
// ManifestPusher pushs the manifest to destination registry
|
|
type ManifestPusher struct {
|
|
*BaseHandler
|
|
}
|
|
|
|
// Enter checks the existence of manifest in the source registry first, and if it
|
|
// exists, pushs it to destination registry. The checking operation is to avoid
|
|
// the situation that the tag is deleted during the blobs transfering
|
|
func (m *ManifestPusher) Enter() (string, error) {
|
|
state, err := m.enter()
|
|
if err != nil && retry(err) {
|
|
m.logger.Info("waiting for retrying...")
|
|
return models.JobRetrying, nil
|
|
}
|
|
|
|
return state, err
|
|
|
|
}
|
|
|
|
func (m *ManifestPusher) enter() (string, error) {
|
|
name := m.repository
|
|
tag := m.tags[0]
|
|
_, exist, err := m.srcClient.ManifestExist(tag)
|
|
if err != nil {
|
|
m.logger.Infof("an error occurred while checking the existence of manifest of %s:%s on %s: %v", name, tag, m.srcURL, err)
|
|
return "", err
|
|
}
|
|
if !exist {
|
|
m.logger.Infof("manifest of %s:%s does not exist on source registry %s, cancel manifest pushing", name, tag, m.srcURL)
|
|
} else {
|
|
m.logger.Infof("manifest of %s:%s exists on source registry %s, continue manifest pushing", name, tag, m.srcURL)
|
|
|
|
digest, manifestExist, err := m.dstClient.ManifestExist(tag)
|
|
if manifestExist && digest == m.digest {
|
|
m.logger.Infof("manifest of %s:%s exists on destination registry %s, skip manifest pushing", name, tag, m.dstURL)
|
|
|
|
m.tags = m.tags[1:]
|
|
m.manifest = nil
|
|
m.digest = ""
|
|
m.blobs = nil
|
|
|
|
return StatePullManifest, nil
|
|
}
|
|
|
|
mediaType, data, err := m.manifest.Payload()
|
|
if err != nil {
|
|
m.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", name, tag, err)
|
|
return "", err
|
|
}
|
|
|
|
if _, err = m.dstClient.PushManifest(tag, mediaType, data); err != nil {
|
|
m.logger.Errorf("an error occurred while pushing manifest of %s:%s to %s : %v", name, tag, m.dstURL, err)
|
|
return "", err
|
|
}
|
|
m.logger.Infof("manifest of %s:%s has been pushed to %s", name, tag, m.dstURL)
|
|
}
|
|
|
|
m.tags = m.tags[1:]
|
|
m.manifest = nil
|
|
m.digest = ""
|
|
m.blobs = nil
|
|
|
|
return StatePullManifest, nil
|
|
}
|
|
|
|
func newRepositoryClient(endpoint string, insecure bool, credential auth.Credential, repository, scopeType, scopeName string,
|
|
scopeActions ...string) (*registry.Repository, error) {
|
|
|
|
authorizer := auth.NewStandardTokenAuthorizer(credential, insecure, scopeType, scopeName, scopeActions...)
|
|
|
|
store, err := auth.NewAuthorizerStore(endpoint, insecure, authorizer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
uam := &userAgentModifier{
|
|
userAgent: "harbor-registry-client",
|
|
}
|
|
|
|
client, err := registry.NewRepositoryWithModifiers(repository, endpoint, insecure, store, uam)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
type userAgentModifier struct {
|
|
userAgent string
|
|
}
|
|
|
|
// Modify adds user-agent header to the request
|
|
func (u *userAgentModifier) Modify(req *http.Request) error {
|
|
req.Header.Set(http.CanonicalHeaderKey("User-Agent"), u.userAgent)
|
|
return nil
|
|
}
|