mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 18:25:56 +01:00
Migrate replication job to the new jobservice
This commit is contained in:
parent
aab34efa49
commit
ed08a42e4b
@ -3,7 +3,7 @@ sudo: true
|
||||
language: go
|
||||
|
||||
go:
|
||||
- 1.7.3
|
||||
- 1.9.2
|
||||
|
||||
go_import_path: github.com/vmware/harbor
|
||||
|
||||
|
@ -3,4 +3,10 @@ package job
|
||||
const (
|
||||
//ImageScanJob is name of scan job it will be used as key to register to job service.
|
||||
ImageScanJob = "IMAGE_SCAN"
|
||||
// ImageReplicationTransfer : the name of replication transfer job in job service
|
||||
ImageReplicationTransfer = "IMAGE_REPLICATION_TRANSFER"
|
||||
// ImageReplicationDelete : the name of replication delete job in job service
|
||||
ImageReplicationDelete = "IMAGE_REPLICATION_DELETE"
|
||||
// ImagePeriodReplication : the name of period replication job in job service
|
||||
ImagePeriodReplication = "IMAGE_PERIOD_REPLICATION"
|
||||
)
|
||||
|
@ -1,10 +1,11 @@
|
||||
package job
|
||||
|
||||
// ScanJobParms holds parameters used to submit jobs to jobservice
|
||||
type ScanJobParms struct {
|
||||
JobID int64 `json:"job_int_id"`
|
||||
Repository string `json:"repository"`
|
||||
Tag string `json:"tag"`
|
||||
Secret string `json: "job_service_secret"`
|
||||
Secret string `json:"job_service_secret"`
|
||||
RegistryURL string `json:"registry_url"`
|
||||
ClairEndpoint string `json:"clair_endpoint"`
|
||||
TokenEndpoint string `json:"token_endpoint"`
|
||||
|
@ -16,13 +16,12 @@ package auth
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/vmware/harbor/src/common/http/modifier"
|
||||
)
|
||||
|
||||
// Credential ...
|
||||
type Credential interface {
|
||||
// AddAuthorization adds authorization information to request
|
||||
AddAuthorization(req *http.Request)
|
||||
}
|
||||
type Credential modifier.Modifier
|
||||
|
||||
// Implements interface Credential
|
||||
type basicAuthCredential struct {
|
||||
@ -42,6 +41,12 @@ func (b *basicAuthCredential) AddAuthorization(req *http.Request) {
|
||||
req.SetBasicAuth(b.username, b.password)
|
||||
}
|
||||
|
||||
// implement github.com/vmware/harbor/src/common/http/modifier.Modifier
|
||||
func (b *basicAuthCredential) Modify(req *http.Request) error {
|
||||
b.AddAuthorization(req)
|
||||
return nil
|
||||
}
|
||||
|
||||
type cookieCredential struct {
|
||||
cookie *http.Cookie
|
||||
}
|
||||
@ -57,3 +62,9 @@ func NewCookieCredential(c *http.Cookie) Credential {
|
||||
func (c *cookieCredential) AddAuthorization(req *http.Request) {
|
||||
req.AddCookie(c.cookie)
|
||||
}
|
||||
|
||||
// implement github.com/vmware/harbor/src/common/http/modifier.Modifier
|
||||
func (c *cookieCredential) Modify(req *http.Request) error {
|
||||
c.AddAuthorization(req)
|
||||
return nil
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ func TestAddAuthorizationOfBasicAuthCredential(t *testing.T) {
|
||||
t.Fatalf("failed to create request: %v", err)
|
||||
}
|
||||
|
||||
cred.AddAuthorization(req)
|
||||
cred.Modify(req)
|
||||
|
||||
usr, pwd, ok := req.BasicAuth()
|
||||
if !ok {
|
||||
@ -53,7 +53,7 @@ func TestAddAuthorizationOfCookieCredential(t *testing.T) {
|
||||
t.Fatalf("failed to create request: %v", err)
|
||||
}
|
||||
|
||||
cred.AddAuthorization(req)
|
||||
cred.Modify(req)
|
||||
|
||||
ck, err := req.Cookie("name")
|
||||
if err != nil {
|
||||
|
@ -59,7 +59,7 @@ func getToken(client *http.Client, credential Credential, realm, service string,
|
||||
}
|
||||
|
||||
if credential != nil {
|
||||
credential.AddAuthorization(req)
|
||||
credential.Modify(req)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
|
126
src/jobservice_v2/job/impl/replication/delete.go
Normal file
126
src/jobservice_v2/job/impl/replication/delete.go
Normal file
@ -0,0 +1,126 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
common_http "github.com/vmware/harbor/src/common/http"
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
"github.com/vmware/harbor/src/common/utils/registry/auth"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
)
|
||||
|
||||
// Deleter deletes repository or images on the destination registry
|
||||
type Deleter struct {
|
||||
ctx env.JobContext
|
||||
repository *repository
|
||||
dstRegistry *registry
|
||||
logger *log.Logger
|
||||
retry bool
|
||||
}
|
||||
|
||||
// ShouldRetry : retry if the error is network error
|
||||
func (d *Deleter) ShouldRetry() bool {
|
||||
return d.retry
|
||||
}
|
||||
|
||||
// MaxFails ...
|
||||
func (d *Deleter) MaxFails() uint {
|
||||
return 3
|
||||
}
|
||||
|
||||
// Validate ....
|
||||
func (d *Deleter) Validate(params map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run ...
|
||||
func (d *Deleter) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
err := d.run(ctx, params)
|
||||
d.retry = retry(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *Deleter) run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
if err := d.init(ctx, params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.delete()
|
||||
}
|
||||
|
||||
func (d *Deleter) init(ctx env.JobContext, params map[string]interface{}) error {
|
||||
// TODO
|
||||
d.logger = log.DefaultLogger()
|
||||
d.ctx = ctx
|
||||
|
||||
if canceled(d.ctx) {
|
||||
d.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
d.repository = &repository{
|
||||
name: params["repository"].(string),
|
||||
}
|
||||
if tags, ok := params["tags"]; ok {
|
||||
tgs := tags.([]interface{})
|
||||
for _, tg := range tgs {
|
||||
d.repository.tags = append(d.repository.tags, tg.(string))
|
||||
}
|
||||
}
|
||||
|
||||
url := params["dst_registry_url"].(string)
|
||||
insecure := params["dst_registry_insecure"].(bool)
|
||||
cred := auth.NewBasicAuthCredential(
|
||||
params["dst_registry_username"].(string),
|
||||
params["dst_registry_password"].(string))
|
||||
|
||||
var err error
|
||||
d.dstRegistry, err = initRegistry(url, insecure, cred, d.repository.name)
|
||||
if err != nil {
|
||||
d.logger.Errorf("failed to create client for destination registry: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
d.logger.Infof("initialization completed: repository: %s, tags: %v, destination URL: %s, insecure: %v",
|
||||
d.repository.name, d.repository.tags, d.dstRegistry.url, d.dstRegistry.insecure)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Deleter) delete() error {
|
||||
repository := d.repository.name
|
||||
tags := d.repository.tags
|
||||
if len(tags) == 0 {
|
||||
if canceled(d.ctx) {
|
||||
d.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
if err := d.dstRegistry.DeleteRepository(repository); err != nil {
|
||||
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound {
|
||||
d.logger.Warningf("repository %s not found", repository)
|
||||
return nil
|
||||
}
|
||||
d.logger.Errorf("failed to delete repository %s: %v", repository, err)
|
||||
return err
|
||||
}
|
||||
d.logger.Infof("repository %s has been deleted", repository)
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
if canceled(d.ctx) {
|
||||
d.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
if err := d.dstRegistry.DeleteImage(repository, tag); err != nil {
|
||||
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound {
|
||||
d.logger.Warningf("image %s:%s not found", repository, tag)
|
||||
return nil
|
||||
}
|
||||
d.logger.Errorf("failed to delete image %s:%s: %v", repository, tag, err)
|
||||
return err
|
||||
}
|
||||
d.logger.Infof("image %s:%s has been deleted", repository, tag)
|
||||
}
|
||||
return nil
|
||||
}
|
79
src/jobservice_v2/job/impl/replication/registry.go
Normal file
79
src/jobservice_v2/job/impl/replication/registry.go
Normal file
@ -0,0 +1,79 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
common_http "github.com/vmware/harbor/src/common/http"
|
||||
"github.com/vmware/harbor/src/common/models"
|
||||
reg "github.com/vmware/harbor/src/common/utils/registry"
|
||||
)
|
||||
|
||||
type repository struct {
|
||||
name string
|
||||
tags []string
|
||||
}
|
||||
|
||||
// registry wraps operations of Harbor UI and docker registry into one struct
|
||||
type registry struct {
|
||||
reg.Repository // docker registry client
|
||||
client *common_http.Client // Harbor client
|
||||
url string
|
||||
insecure bool
|
||||
}
|
||||
|
||||
func (r *registry) GetProject(name string) (*models.Project, error) {
|
||||
url, err := url.Parse(strings.TrimRight(r.url, "/") + "/api/projects")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := url.Query()
|
||||
q.Set("name", name)
|
||||
url.RawQuery = q.Encode()
|
||||
|
||||
projects := []*models.Project{}
|
||||
if err = r.client.Get(url.String(), &projects); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, project := range projects {
|
||||
if project.Name == name {
|
||||
return project, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("project %s not found", name)
|
||||
}
|
||||
|
||||
func (r *registry) CreateProject(project *models.Project) error {
|
||||
// only replicate the public property of project
|
||||
pro := struct {
|
||||
models.ProjectRequest
|
||||
Public int `json:"public"`
|
||||
}{
|
||||
ProjectRequest: models.ProjectRequest{
|
||||
Name: project.Name,
|
||||
Metadata: map[string]string{
|
||||
models.ProMetaPublic: strconv.FormatBool(project.IsPublic()),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// put "public" property in both metadata and public field to keep compatibility
|
||||
// with old version API(<=1.2.0)
|
||||
if project.IsPublic() {
|
||||
pro.Public = 1
|
||||
}
|
||||
|
||||
return r.client.Post(strings.TrimRight(r.url, "/")+"/api/projects/", pro)
|
||||
}
|
||||
|
||||
func (r *registry) DeleteRepository(repository string) error {
|
||||
return r.client.Delete(strings.TrimRight(r.url, "/") + "/api/repositories/" + repository)
|
||||
}
|
||||
|
||||
func (r *registry) DeleteImage(repository, tag string) error {
|
||||
return r.client.Delete(strings.TrimRight(r.url, "/") + "/api/repositories/" + repository + "/tags/" + tag)
|
||||
}
|
348
src/jobservice_v2/job/impl/replication/replicate.go
Normal file
348
src/jobservice_v2/job/impl/replication/replicate.go
Normal file
@ -0,0 +1,348 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest/schema1"
|
||||
"github.com/docker/distribution/manifest/schema2"
|
||||
common_http "github.com/vmware/harbor/src/common/http"
|
||||
"github.com/vmware/harbor/src/common/http/modifier"
|
||||
"github.com/vmware/harbor/src/common/models"
|
||||
"github.com/vmware/harbor/src/common/utils"
|
||||
"github.com/vmware/harbor/src/common/utils/log"
|
||||
reg "github.com/vmware/harbor/src/common/utils/registry"
|
||||
"github.com/vmware/harbor/src/common/utils/registry/auth"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
job_utils "github.com/vmware/harbor/src/jobservice_v2/job/impl/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
errCanceled = errors.New("the job is canceled")
|
||||
)
|
||||
|
||||
// Replicator replicates images from source registry to the destination one
|
||||
type Replicator struct {
|
||||
ctx env.JobContext
|
||||
repository *repository
|
||||
srcRegistry *registry
|
||||
dstRegistry *registry
|
||||
logger *log.Logger
|
||||
retry bool
|
||||
}
|
||||
|
||||
// ShouldRetry : retry if the error is network error
|
||||
func (r *Replicator) ShouldRetry() bool {
|
||||
return r.retry
|
||||
}
|
||||
|
||||
// MaxFails ...
|
||||
func (r *Replicator) MaxFails() uint {
|
||||
return 3
|
||||
}
|
||||
|
||||
// Validate ....
|
||||
func (r *Replicator) Validate(params map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run ...
|
||||
func (r *Replicator) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
err := r.run(ctx, params)
|
||||
r.retry = retry(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Replicator) run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
// initialize
|
||||
if err := r.init(ctx, params); err != nil {
|
||||
return err
|
||||
}
|
||||
// try to create project on destination registry
|
||||
if err := r.createProject(); err != nil {
|
||||
return err
|
||||
}
|
||||
// replicate the images
|
||||
for _, tag := range r.repository.tags {
|
||||
digest, manifest, err := r.pullManifest(tag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.transferLayers(tag, manifest.References()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.pushManifest(tag, digest, manifest); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Replicator) init(ctx env.JobContext, params map[string]interface{}) error {
|
||||
// TODO
|
||||
r.logger = log.DefaultLogger()
|
||||
r.ctx = ctx
|
||||
|
||||
if canceled(r.ctx) {
|
||||
r.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
// init images that need to be replicated
|
||||
r.repository = &repository{
|
||||
name: params["repository"].(string),
|
||||
}
|
||||
if tags, ok := params["tags"]; ok {
|
||||
tgs := tags.([]interface{})
|
||||
for _, tg := range tgs {
|
||||
r.repository.tags = append(r.repository.tags, tg.(string))
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
// init source registry client
|
||||
srcURL := params["src_registry_url"].(string)
|
||||
srcInsecure := params["src_registry_insecure"].(bool)
|
||||
srcCred := auth.NewCookieCredential(&http.Cookie{
|
||||
Name: models.UISecretCookie,
|
||||
Value: os.Getenv("JOBSERVICE_SECRET"),
|
||||
})
|
||||
srcTokenServiceURL := ""
|
||||
if stsu, ok := params["src_token_service_url"]; ok {
|
||||
srcTokenServiceURL = stsu.(string)
|
||||
}
|
||||
|
||||
if len(srcTokenServiceURL) > 0 {
|
||||
r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name, srcTokenServiceURL)
|
||||
} else {
|
||||
r.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, r.repository.name)
|
||||
}
|
||||
if err != nil {
|
||||
r.logger.Errorf("failed to create client for source registry: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// init destination registry client
|
||||
dstURL := params["dst_registry_url"].(string)
|
||||
dstInsecure := params["dst_registry_insecure"].(bool)
|
||||
dstCred := auth.NewBasicAuthCredential(
|
||||
params["dst_registry_username"].(string),
|
||||
params["dst_registry_password"].(string))
|
||||
r.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, r.repository.name)
|
||||
if err != nil {
|
||||
r.logger.Errorf("failed to create client for destination registry: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// get the tag list first if it is null
|
||||
if len(r.repository.tags) == 0 {
|
||||
tags, err := r.srcRegistry.ListTag()
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while listing tags for the source repository: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(tags) == 0 {
|
||||
err = fmt.Errorf("empty tag list for repository %s", r.repository.name)
|
||||
r.logger.Error(err)
|
||||
return err
|
||||
}
|
||||
r.repository.tags = tags
|
||||
}
|
||||
|
||||
r.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v",
|
||||
r.repository.name, r.repository.tags, r.srcRegistry.url, r.srcRegistry.insecure, r.dstRegistry.url, r.dstRegistry.insecure)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func initRegistry(url string, insecure bool, credential modifier.Modifier,
|
||||
repository string, tokenServiceURL ...string) (*registry, error) {
|
||||
registry := ®istry{
|
||||
url: url,
|
||||
insecure: insecure,
|
||||
}
|
||||
|
||||
// use the same transport for clients connecting to docker registry and Harbor UI
|
||||
transport := reg.GetHTTPTransport(insecure)
|
||||
|
||||
authorizer := auth.NewStandardTokenAuthorizer(&http.Client{
|
||||
Transport: transport,
|
||||
}, credential, tokenServiceURL...)
|
||||
uam := &job_utils.UserAgentModifier{
|
||||
UserAgent: "harbor-registry-client",
|
||||
}
|
||||
repositoryClient, err := reg.NewRepository(repository, url,
|
||||
&http.Client{
|
||||
Transport: reg.NewTransport(transport, authorizer, uam),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
registry.Repository = *repositoryClient
|
||||
|
||||
registry.client = common_http.NewClient(
|
||||
&http.Client{
|
||||
Transport: transport,
|
||||
}, credential)
|
||||
return registry, nil
|
||||
}
|
||||
|
||||
func (r *Replicator) createProject() error {
|
||||
if canceled(r.ctx) {
|
||||
r.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
p, _ := utils.ParseRepository(r.repository.name)
|
||||
project, err := r.srcRegistry.GetProject(p)
|
||||
if err != nil {
|
||||
r.logger.Errorf("failed to get project %s from source registry: %v", p, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = r.dstRegistry.CreateProject(project); err != nil {
|
||||
// other jobs may be also doing the same thing when the current job
|
||||
// is creating project or the project has already exist, so when the
|
||||
// response code is 409, continue to do next step
|
||||
if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict {
|
||||
r.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p)
|
||||
return nil
|
||||
}
|
||||
|
||||
r.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err)
|
||||
return err
|
||||
}
|
||||
r.logger.Infof("project %s is created on destination registry", p)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Replicator) pullManifest(tag string) (string, distribution.Manifest, error) {
|
||||
if canceled(r.ctx) {
|
||||
r.logger.Warning(errCanceled.Error())
|
||||
return "", nil, errCanceled
|
||||
}
|
||||
|
||||
acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest}
|
||||
digest, mediaType, payload, err := r.srcRegistry.PullManifest(tag, acceptMediaTypes)
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v",
|
||||
r.repository.name, tag, err)
|
||||
return "", nil, err
|
||||
}
|
||||
r.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s",
|
||||
r.repository.name, tag, digest)
|
||||
|
||||
if strings.Contains(mediaType, "application/json") {
|
||||
mediaType = schema1.MediaTypeManifest
|
||||
}
|
||||
|
||||
manifest, _, err := reg.UnMarshal(mediaType, payload)
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while parsing manifest: %v", err)
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return digest, manifest, nil
|
||||
}
|
||||
|
||||
func (r *Replicator) transferLayers(tag string, blobs []distribution.Descriptor) error {
|
||||
repository := r.repository.name
|
||||
|
||||
// all blobs(layers and config)
|
||||
for _, blob := range blobs {
|
||||
if canceled(r.ctx) {
|
||||
r.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
digest := blob.Digest.String()
|
||||
exist, err := r.dstRegistry.BlobExist(digest)
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v",
|
||||
digest, repository, tag, err)
|
||||
return err
|
||||
}
|
||||
if exist {
|
||||
r.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip",
|
||||
digest, repository, tag)
|
||||
continue
|
||||
}
|
||||
|
||||
r.logger.Infof("transferring blob %s of %s:%s to the destination registry ...",
|
||||
digest, repository, tag)
|
||||
size, data, err := r.srcRegistry.PullBlob(digest)
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v",
|
||||
digest, repository, tag, err)
|
||||
return err
|
||||
}
|
||||
if data != nil {
|
||||
defer data.Close()
|
||||
}
|
||||
if err = r.dstRegistry.PushBlob(digest, size, data); err != nil {
|
||||
r.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v",
|
||||
digest, repository, tag, err)
|
||||
return err
|
||||
}
|
||||
r.logger.Infof("blob %s of %s:%s transferred to the destination registry completed",
|
||||
digest, repository, tag)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Replicator) pushManifest(tag, digest string, manifest distribution.Manifest) error {
|
||||
if canceled(r.ctx) {
|
||||
r.logger.Warning(errCanceled.Error())
|
||||
return errCanceled
|
||||
}
|
||||
|
||||
repository := r.repository.name
|
||||
_, exist, err := r.dstRegistry.ManifestExist(digest)
|
||||
if err != nil {
|
||||
r.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest",
|
||||
repository, tag, err)
|
||||
} else {
|
||||
if exist {
|
||||
r.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing",
|
||||
repository, tag)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
mediaType, data, err := manifest.Payload()
|
||||
if err != nil {
|
||||
r.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v",
|
||||
repository, tag, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = r.dstRegistry.PushManifest(tag, mediaType, data); err != nil {
|
||||
r.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v",
|
||||
repository, tag, err)
|
||||
return err
|
||||
}
|
||||
r.logger.Infof("manifest of %s:%s has been pushed to the destination registry",
|
||||
repository, tag)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func canceled(ctx env.JobContext) bool {
|
||||
_, canceled := ctx.OPCommand()
|
||||
return canceled
|
||||
}
|
||||
|
||||
func retry(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := err.(net.Error)
|
||||
return ok
|
||||
}
|
25
src/jobservice_v2/job/impl/replication/replicate_test.go
Normal file
25
src/jobservice_v2/job/impl/replication/replicate_test.go
Normal file
@ -0,0 +1,25 @@
|
||||
package replication
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMaxFails(t *testing.T) {
|
||||
r := &Replicator{}
|
||||
assert.Equal(t, uint(3), r.MaxFails())
|
||||
}
|
||||
|
||||
func TestValidate(t *testing.T) {
|
||||
r := &Replicator{}
|
||||
require.Nil(t, r.Validate(nil))
|
||||
}
|
||||
|
||||
func TestShouldRetry(t *testing.T) {
|
||||
r := &Replicator{}
|
||||
assert.False(t, r.retry)
|
||||
r.retry = true
|
||||
assert.True(t, r.retry)
|
||||
}
|
@ -20,8 +20,8 @@ func NewRepositoryClient(endpoint string, insecure bool, credential auth.Credent
|
||||
Transport: transport,
|
||||
}, credential, tokenServiceEndpoint)
|
||||
|
||||
uam := &userAgentModifier{
|
||||
userAgent: "harbor-registry-client",
|
||||
uam := &UserAgentModifier{
|
||||
UserAgent: "harbor-registry-client",
|
||||
}
|
||||
|
||||
return registry.NewRepository(repository, endpoint, &http.Client{
|
||||
@ -43,8 +43,8 @@ func NewRepositoryClientForJobservice(repository, internalRegistryURL, secret, i
|
||||
Transport: transport,
|
||||
}, credential, internalTokenServiceURL)
|
||||
|
||||
uam := &userAgentModifier{
|
||||
userAgent: "harbor-registry-client",
|
||||
uam := &UserAgentModifier{
|
||||
UserAgent: "harbor-registry-client",
|
||||
}
|
||||
|
||||
return registry.NewRepository(repository, internalRegistryURL, &http.Client{
|
||||
@ -52,13 +52,14 @@ func NewRepositoryClientForJobservice(repository, internalRegistryURL, secret, i
|
||||
})
|
||||
}
|
||||
|
||||
type userAgentModifier struct {
|
||||
userAgent string
|
||||
// UserAgentModifier adds the "User-Agent" header to the request
|
||||
type UserAgentModifier struct {
|
||||
UserAgent string
|
||||
}
|
||||
|
||||
// Modify adds user-agent header to the request
|
||||
func (u *userAgentModifier) Modify(req *http.Request) error {
|
||||
req.Header.Set(http.CanonicalHeaderKey("User-Agent"), u.userAgent)
|
||||
func (u *UserAgentModifier) Modify(req *http.Request) error {
|
||||
req.Header.Set(http.CanonicalHeaderKey("User-Agent"), u.UserAgent)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/vmware/harbor/src/jobservice_v2/core"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job/impl"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job/impl/replication"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job/impl/scan"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/pool"
|
||||
)
|
||||
@ -143,7 +144,12 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg config.Conf
|
||||
ctx.ErrorChan <- err
|
||||
return redisWorkerPool //avoid nil pointer issue
|
||||
}
|
||||
if err := redisWorkerPool.RegisterJob(job.ImageScanJob, (*scan.ClairJob)(nil)); err != nil {
|
||||
if err := redisWorkerPool.RegisterJobs(
|
||||
map[string]interface{}{
|
||||
job.ImageScanJob: (*scan.ClairJob)(nil),
|
||||
job.ImageReplicationTransfer: (*replication.Replicator)(nil),
|
||||
job.ImageReplicationDelete: (*replication.Deleter)(nil),
|
||||
}); err != nil {
|
||||
//exit
|
||||
ctx.ErrorChan <- err
|
||||
return redisWorkerPool //avoid nil pointer issue
|
||||
|
@ -85,8 +85,10 @@ func init() {
|
||||
log.Fatalf("failed to get database configurations: %v", err)
|
||||
}
|
||||
dao.InitDatabase(database)
|
||||
_, file, _, _ := runtime.Caller(1)
|
||||
apppath, _ := filepath.Abs(filepath.Dir(filepath.Join(file, ".."+string(filepath.Separator))))
|
||||
_, file, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(file)
|
||||
dir = filepath.Join(dir, "..")
|
||||
apppath, _ := filepath.Abs(dir)
|
||||
beego.BConfig.WebConfig.Session.SessionOn = true
|
||||
beego.TestBeegoInit(apppath)
|
||||
|
||||
@ -138,7 +140,7 @@ func init() {
|
||||
beego.Router("/api/replications", &ReplicationAPI{})
|
||||
beego.Router("/api/labels", &LabelAPI{}, "post:Post;get:List")
|
||||
beego.Router("/api/labels/:id([0-9]+", &LabelAPI{}, "get:Get;put:Put;delete:Delete")
|
||||
beego.Router("/api/ping", &SystemInfoAPI{}, "get:Ping")
|
||||
beego.Router("/api/ping", &SystemInfoAPI{}, "get:Ping")
|
||||
_ = updateInitPassword(1, "Harbor12345")
|
||||
|
||||
if err := core.Init(); err != nil {
|
||||
|
@ -48,8 +48,10 @@ import (
|
||||
//var admin *usrInfo
|
||||
|
||||
func init() {
|
||||
_, file, _, _ := runtime.Caller(1)
|
||||
apppath, _ := filepath.Abs(filepath.Dir(filepath.Join(file, ".."+string(filepath.Separator))))
|
||||
_, file, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(file)
|
||||
dir = filepath.Join(dir, "..")
|
||||
apppath, _ := filepath.Abs(dir)
|
||||
beego.BConfig.WebConfig.Session.SessionOn = true
|
||||
beego.TestBeegoInit(apppath)
|
||||
beego.AddTemplateExt("htm")
|
||||
|
Loading…
Reference in New Issue
Block a user