From b73acde051f5061f8dbbed8daa858994b6f28328 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Wed, 10 Apr 2019 13:08:17 +0800 Subject: [PATCH] Support the migration for scheduled replication rule from previous version of Harbor Support the migration for scheduled replication rule from previous version of Harbor Signed-off-by: Wenkai Yin --- src/common/job/const.go | 10 +- src/core/router.go | 2 +- src/jobservice/job/impl/replication/delete.go | 139 ------- .../job/impl/replication/delete_test.go | 38 -- .../ng}/scheduler.go | 14 +- .../job/impl/replication/registry.go | 93 ----- .../job/impl/replication/replicate.go | 97 ----- .../job/impl/replication/replicate_test.go | 36 -- .../job/impl/replication/transfer.go | 368 ------------------ .../job/impl/replication/transfer_test.go | 38 -- src/jobservice/runtime/bootstrap.go | 15 +- .../ng/policy/scheduler/scheduler.go | 13 +- .../ng/policy/scheduler/scheduler_test.go | 4 +- 13 files changed, 25 insertions(+), 842 deletions(-) delete mode 100644 src/jobservice/job/impl/replication/delete.go delete mode 100644 src/jobservice/job/impl/replication/delete_test.go rename src/jobservice/job/impl/{scheduler => replication/ng}/scheduler.go (85%) delete mode 100644 src/jobservice/job/impl/replication/registry.go delete mode 100644 src/jobservice/job/impl/replication/replicate.go delete mode 100644 src/jobservice/job/impl/replication/replicate_test.go delete mode 100644 src/jobservice/job/impl/replication/transfer.go delete mode 100644 src/jobservice/job/impl/replication/transfer_test.go diff --git a/src/common/job/const.go b/src/common/job/const.go index a465ec680..656a1d547 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -5,18 +5,12 @@ const ( ImageScanJob = "IMAGE_SCAN" // ImageScanAllJob is the name of "scanall" job in job service ImageScanAllJob = "IMAGE_SCAN_ALL" - // ImageTransfer : the name of image transfer job in job service - ImageTransfer = "IMAGE_TRANSFER" - // ImageDelete : the name of image delete job in job service - ImageDelete = "IMAGE_DELETE" - // ImageReplicate : the name of image replicate job in job service - ImageReplicate = "IMAGE_REPLICATE" // ImageGC the name of image garbage collection job in job service ImageGC = "IMAGE_GC" // Replication : the name of the replication job in job service Replication = "REPLICATION" - // Scheduler : the name of the scheduler job in job service - Scheduler = "SCHEDULER" + // ReplicationScheduler : the name of the replication scheduler job in job service + ReplicationScheduler = "IMAGE_REPLICATE" // JobKindGeneric : Kind of generic job JobKindGeneric = "Generic" diff --git a/src/core/router.go b/src/core/router.go index 43932370d..cb55b7403 100644 --- a/src/core/router.go +++ b/src/core/router.go @@ -126,7 +126,7 @@ func initRouters() { beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle") beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan") beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob") - beego.Router("/service/notifications/jobs/replication/schedule/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationScheduleJob") + beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationScheduleJob") beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask") beego.Router("/service/token", &token.Handler{}) diff --git a/src/jobservice/job/impl/replication/delete.go b/src/jobservice/job/impl/replication/delete.go deleted file mode 100644 index 6651b2795..000000000 --- a/src/jobservice/job/impl/replication/delete.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "net/http" - - common_http "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/jobservice/env" - "github.com/goharbor/harbor/src/jobservice/logger" -) - -// Deleter deletes repository or images on the destination registry -type Deleter struct { - ctx env.JobContext - repository *repository - dstRegistry *registry - logger logger.Interface - retry bool -} - -// ShouldRetry : retry if the error is network error -func (d *Deleter) ShouldRetry() bool { - return d.retry -} - -// MaxFails ... -func (d *Deleter) MaxFails() uint { - return 3 -} - -// Validate .... -func (d *Deleter) Validate(params map[string]interface{}) error { - return nil -} - -// Run ... -func (d *Deleter) Run(ctx env.JobContext, params map[string]interface{}) error { - err := d.run(ctx, params) - d.retry = retry(err) - return err -} - -func (d *Deleter) run(ctx env.JobContext, params map[string]interface{}) error { - if err := d.init(ctx, params); err != nil { - return err - } - - return d.delete() -} - -func (d *Deleter) init(ctx env.JobContext, params map[string]interface{}) error { - d.logger = ctx.GetLogger() - d.ctx = ctx - - if canceled(d.ctx) { - d.logger.Warning(errCanceled.Error()) - return errCanceled - } - - d.repository = &repository{ - name: params["repository"].(string), - } - if tags, ok := params["tags"]; ok { - tgs := tags.([]interface{}) - for _, tg := range tgs { - d.repository.tags = append(d.repository.tags, tg.(string)) - } - } - - url := params["dst_registry_url"].(string) - insecure := params["dst_registry_insecure"].(bool) - cred := auth.NewBasicAuthCredential( - params["dst_registry_username"].(string), - params["dst_registry_password"].(string)) - - var err error - d.dstRegistry, err = initRegistry(url, insecure, cred, d.repository.name) - if err != nil { - d.logger.Errorf("failed to create client for destination registry: %v", err) - return err - } - - d.logger.Infof("initialization completed: repository: %s, tags: %v, destination URL: %s, insecure: %v", - d.repository.name, d.repository.tags, d.dstRegistry.url, d.dstRegistry.insecure) - - return nil -} - -func (d *Deleter) delete() error { - repository := d.repository.name - tags := d.repository.tags - if len(tags) == 0 { - if canceled(d.ctx) { - d.logger.Warning(errCanceled.Error()) - return errCanceled - } - if err := d.dstRegistry.DeleteRepository(repository); err != nil { - if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound { - d.logger.Warningf("repository %s not found", repository) - return nil - } - d.logger.Errorf("failed to delete repository %s: %v", repository, err) - return err - } - d.logger.Infof("repository %s has been deleted", repository) - return nil - } - - for _, tag := range tags { - if canceled(d.ctx) { - d.logger.Warning(errCanceled.Error()) - return errCanceled - } - if err := d.dstRegistry.DeleteImage(repository, tag); err != nil { - if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusNotFound { - d.logger.Warningf("image %s:%s not found", repository, tag) - return nil - } - d.logger.Errorf("failed to delete image %s:%s: %v", repository, tag, err) - return err - } - d.logger.Infof("image %s:%s has been deleted", repository, tag) - } - return nil -} diff --git a/src/jobservice/job/impl/replication/delete_test.go b/src/jobservice/job/impl/replication/delete_test.go deleted file mode 100644 index c41cb11ee..000000000 --- a/src/jobservice/job/impl/replication/delete_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package replication - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestMaxFailsOfDeleter(t *testing.T) { - d := &Deleter{} - assert.Equal(t, uint(3), d.MaxFails()) -} - -func TestValidateOfDeleter(t *testing.T) { - d := &Deleter{} - require.Nil(t, d.Validate(nil)) -} - -func TestShouldRetryOfDeleter(t *testing.T) { - d := &Deleter{} - assert.False(t, d.ShouldRetry()) - d.retry = true - assert.True(t, d.ShouldRetry()) -} diff --git a/src/jobservice/job/impl/scheduler/scheduler.go b/src/jobservice/job/impl/replication/ng/scheduler.go similarity index 85% rename from src/jobservice/job/impl/scheduler/scheduler.go rename to src/jobservice/job/impl/replication/ng/scheduler.go index 1854ed411..295d63150 100644 --- a/src/jobservice/job/impl/scheduler/scheduler.go +++ b/src/jobservice/job/impl/replication/ng/scheduler.go @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package scheduler +package ng import ( + "fmt" "net/http" "os" + "github.com/goharbor/harbor/src/replication/ng/model" + common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/http/modifier/auth" reg "github.com/goharbor/harbor/src/common/utils/registry" @@ -57,12 +60,17 @@ func (s *Scheduler) Run(ctx env.JobContext, params map[string]interface{}) error logger := ctx.GetLogger() url := params["url"].(string) - data := params["data"] + url = fmt.Sprintf("%s/api/replication/executions?trigger=%s", url, model.TriggerTypeScheduled) + policyID := (int64)(params["policy_id"].(float64)) cred := auth.NewSecretAuthorizer(os.Getenv("JOBSERVICE_SECRET")) client := common_http.NewClient(&http.Client{ Transport: reg.GetHTTPTransport(true), }, cred) - if err := client.Post(url, data); err != nil { + if err := client.Post(url, struct { + PolicyID int64 `json:"policy_id"` + }{ + PolicyID: policyID, + }); err != nil { logger.Errorf("failed to run the schedule job: %v", err) return err } diff --git a/src/jobservice/job/impl/replication/registry.go b/src/jobservice/job/impl/replication/registry.go deleted file mode 100644 index 8f0c07507..000000000 --- a/src/jobservice/job/impl/replication/registry.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "fmt" - "net/url" - "strconv" - "strings" - - common_http "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/models" - reg "github.com/goharbor/harbor/src/common/utils/registry" -) - -type repository struct { - name string - tags []string -} - -// registry wraps operations of Harbor UI and docker registry into one struct -type registry struct { - reg.Repository // docker registry client - client *common_http.Client // Harbor client - url string - insecure bool -} - -func (r *registry) GetProject(name string) (*models.Project, error) { - url, err := url.Parse(strings.TrimRight(r.url, "/") + "/api/projects") - if err != nil { - return nil, err - } - q := url.Query() - q.Set("name", name) - url.RawQuery = q.Encode() - - projects := []*models.Project{} - if err = r.client.Get(url.String(), &projects); err != nil { - return nil, err - } - - for _, project := range projects { - if project.Name == name { - return project, nil - } - } - - return nil, fmt.Errorf("project %s not found", name) -} - -func (r *registry) CreateProject(project *models.Project) error { - // only replicate the public property of project - pro := struct { - models.ProjectRequest - Public int `json:"public"` - }{ - ProjectRequest: models.ProjectRequest{ - Name: project.Name, - Metadata: map[string]string{ - models.ProMetaPublic: strconv.FormatBool(project.IsPublic()), - }, - }, - } - - // put "public" property in both metadata and public field to keep compatibility - // with old version API(<=1.2.0) - if project.IsPublic() { - pro.Public = 1 - } - - return r.client.Post(strings.TrimRight(r.url, "/")+"/api/projects/", pro) -} - -func (r *registry) DeleteRepository(repository string) error { - return r.client.Delete(strings.TrimRight(r.url, "/") + "/api/repositories/" + repository) -} - -func (r *registry) DeleteImage(repository, tag string) error { - return r.client.Delete(strings.TrimRight(r.url, "/") + "/api/repositories/" + repository + "/tags/" + tag) -} diff --git a/src/jobservice/job/impl/replication/replicate.go b/src/jobservice/job/impl/replication/replicate.go deleted file mode 100644 index 7b469ad1d..000000000 --- a/src/jobservice/job/impl/replication/replicate.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "fmt" - "net/http" - - common_http "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/http/modifier/auth" - reg "github.com/goharbor/harbor/src/common/utils/registry" - "github.com/goharbor/harbor/src/jobservice/env" - "github.com/goharbor/harbor/src/jobservice/logger" -) - -// Replicator call UI's API to start a repliation according to the policy ID -// passed in parameters -type Replicator struct { - ctx env.JobContext - url string // the URL of UI service - insecure bool - policyID int64 - client *common_http.Client - logger logger.Interface -} - -// ShouldRetry ... -func (r *Replicator) ShouldRetry() bool { - return false -} - -// MaxFails ... -func (r *Replicator) MaxFails() uint { - return 0 -} - -// Validate .... -func (r *Replicator) Validate(params map[string]interface{}) error { - return nil -} - -// Run ... -func (r *Replicator) Run(ctx env.JobContext, params map[string]interface{}) error { - if err := r.init(ctx, params); err != nil { - return err - } - return r.replicate() -} - -func (r *Replicator) init(ctx env.JobContext, params map[string]interface{}) error { - r.logger = ctx.GetLogger() - r.ctx = ctx - if canceled(r.ctx) { - r.logger.Warning(errCanceled.Error()) - return errCanceled - } - - r.policyID = (int64)(params["policy_id"].(float64)) - r.url = params["url"].(string) - r.insecure = params["insecure"].(bool) - cred := auth.NewSecretAuthorizer(secret()) - - r.client = common_http.NewClient(&http.Client{ - Transport: reg.GetHTTPTransport(r.insecure), - }, cred) - - r.logger.Infof("initialization completed: policy ID: %d, URL: %s, insecure: %v", - r.policyID, r.url, r.insecure) - - return nil -} - -func (r *Replicator) replicate() error { - if err := r.client.Post(fmt.Sprintf("%s/api/replications", r.url), struct { - PolicyID int64 `json:"policy_id"` - }{ - PolicyID: r.policyID, - }); err != nil { - r.logger.Errorf("failed to send the replication request to %s: %v", r.url, err) - return err - } - r.logger.Info("the replication request has been sent successfully") - return nil - -} diff --git a/src/jobservice/job/impl/replication/replicate_test.go b/src/jobservice/job/impl/replication/replicate_test.go deleted file mode 100644 index 285f60c3c..000000000 --- a/src/jobservice/job/impl/replication/replicate_test.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package replication - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestMaxFailsOfReplicator(t *testing.T) { - r := &Replicator{} - assert.Equal(t, uint(0), r.MaxFails()) -} - -func TestValidateOfReplicator(t *testing.T) { - r := &Replicator{} - require.Nil(t, r.Validate(nil)) -} - -func TestShouldRetryOfReplicator(t *testing.T) { - r := &Replicator{} - assert.False(t, r.ShouldRetry()) -} diff --git a/src/jobservice/job/impl/replication/transfer.go b/src/jobservice/job/impl/replication/transfer.go deleted file mode 100644 index a949e45b1..000000000 --- a/src/jobservice/job/impl/replication/transfer.go +++ /dev/null @@ -1,368 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "errors" - "fmt" - "net" - "net/http" - "os" - "strings" - - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/schema1" - "github.com/docker/distribution/manifest/schema2" - common_http "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/common/http/modifier" - httpauth "github.com/goharbor/harbor/src/common/http/modifier/auth" - "github.com/goharbor/harbor/src/common/utils" - reg "github.com/goharbor/harbor/src/common/utils/registry" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/jobservice/env" - job_utils "github.com/goharbor/harbor/src/jobservice/job/impl/utils" - "github.com/goharbor/harbor/src/jobservice/logger" -) - -var ( - errCanceled = errors.New("the job is canceled") -) - -// Transfer images from source registry to the destination one -type Transfer struct { - ctx env.JobContext - repository *repository - srcRegistry *registry - dstRegistry *registry - logger logger.Interface - retry bool -} - -// ShouldRetry : retry if the error is network error -func (t *Transfer) ShouldRetry() bool { - return t.retry -} - -// MaxFails ... -func (t *Transfer) MaxFails() uint { - return 3 -} - -// Validate .... -func (t *Transfer) Validate(params map[string]interface{}) error { - return nil -} - -// Run ... -func (t *Transfer) Run(ctx env.JobContext, params map[string]interface{}) error { - err := t.run(ctx, params) - t.retry = retry(err) - return err -} - -func (t *Transfer) run(ctx env.JobContext, params map[string]interface{}) error { - // initialize - if err := t.init(ctx, params); err != nil { - return err - } - // try to create project on destination registry - if err := t.createProject(); err != nil { - return err - } - // replicate the images - for _, tag := range t.repository.tags { - digest, manifest, err := t.pullManifest(tag) - if err != nil { - return err - } - if err := t.transferLayers(tag, manifest.References()); err != nil { - return err - } - if err := t.pushManifest(tag, digest, manifest); err != nil { - return err - } - } - - return nil -} - -func (t *Transfer) init(ctx env.JobContext, params map[string]interface{}) error { - t.logger = ctx.GetLogger() - t.ctx = ctx - - if canceled(t.ctx) { - t.logger.Warning(errCanceled.Error()) - return errCanceled - } - - // init images that need to be replicated - t.repository = &repository{ - name: params["repository"].(string), - } - if tags, ok := params["tags"]; ok { - tgs := tags.([]interface{}) - for _, tg := range tgs { - t.repository.tags = append(t.repository.tags, tg.(string)) - } - } - - var err error - // init source registry client - srcURL := params["src_registry_url"].(string) - srcInsecure := params["src_registry_insecure"].(bool) - srcCred := httpauth.NewSecretAuthorizer(secret()) - srcTokenServiceURL := "" - if stsu, ok := params["src_token_service_url"]; ok { - srcTokenServiceURL = stsu.(string) - } - - if len(srcTokenServiceURL) > 0 { - t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name, srcTokenServiceURL) - } else { - t.srcRegistry, err = initRegistry(srcURL, srcInsecure, srcCred, t.repository.name) - } - if err != nil { - t.logger.Errorf("failed to create client for source registry: %v", err) - return err - } - - // init destination registry client - dstURL := params["dst_registry_url"].(string) - dstInsecure := params["dst_registry_insecure"].(bool) - dstCred := auth.NewBasicAuthCredential( - params["dst_registry_username"].(string), - params["dst_registry_password"].(string)) - t.dstRegistry, err = initRegistry(dstURL, dstInsecure, dstCred, t.repository.name) - if err != nil { - t.logger.Errorf("failed to create client for destination registry: %v", err) - return err - } - - // get the tag list first if it is null - if len(t.repository.tags) == 0 { - tags, err := t.srcRegistry.ListTag() - if err != nil { - t.logger.Errorf("an error occurred while listing tags for the source repository: %v", err) - return err - } - - if len(tags) == 0 { - err = fmt.Errorf("empty tag list for repository %s", t.repository.name) - t.logger.Error(err) - return err - } - t.repository.tags = tags - } - - t.logger.Infof("initialization completed: repository: %s, tags: %v, source registry: URL-%s insecure-%v, destination registry: URL-%s insecure-%v", - t.repository.name, t.repository.tags, t.srcRegistry.url, t.srcRegistry.insecure, t.dstRegistry.url, t.dstRegistry.insecure) - - return nil -} - -func initRegistry(url string, insecure bool, credential modifier.Modifier, - repository string, tokenServiceURL ...string) (*registry, error) { - registry := ®istry{ - url: url, - insecure: insecure, - } - - // use the same transport for clients connecting to docker registry and Harbor UI - transport := reg.GetHTTPTransport(insecure) - - authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ - Transport: transport, - }, credential, tokenServiceURL...) - uam := &job_utils.UserAgentModifier{ - UserAgent: "harbor-registry-client", - } - repositoryClient, err := reg.NewRepository(repository, url, - &http.Client{ - Transport: reg.NewTransport(transport, authorizer, uam), - }) - if err != nil { - return nil, err - } - registry.Repository = *repositoryClient - - registry.client = common_http.NewClient( - &http.Client{ - Transport: transport, - }, credential) - return registry, nil -} - -func (t *Transfer) createProject() error { - if canceled(t.ctx) { - t.logger.Warning(errCanceled.Error()) - return errCanceled - } - p, _ := utils.ParseRepository(t.repository.name) - project, err := t.srcRegistry.GetProject(p) - if err != nil { - t.logger.Errorf("failed to get project %s from source registry: %v", p, err) - return err - } - - if err = t.dstRegistry.CreateProject(project); err != nil { - // other jobs may be also doing the same thing when the current job - // is creating project or the project has already exist, so when the - // response code is 409, continue to do next step - if e, ok := err.(*common_http.Error); ok && e.Code == http.StatusConflict { - t.logger.Warningf("the status code is 409 when creating project %s on destination registry, try to do next step", p) - return nil - } - - t.logger.Errorf("an error occurred while creating project %s on destination registry: %v", p, err) - return err - } - t.logger.Infof("project %s is created on destination registry", p) - return nil -} - -func (t *Transfer) pullManifest(tag string) (string, distribution.Manifest, error) { - if canceled(t.ctx) { - t.logger.Warning(errCanceled.Error()) - return "", nil, errCanceled - } - - acceptMediaTypes := []string{schema1.MediaTypeManifest, schema2.MediaTypeManifest} - digest, mediaType, payload, err := t.srcRegistry.PullManifest(tag, acceptMediaTypes) - if err != nil { - t.logger.Errorf("an error occurred while pulling manifest of %s:%s from source registry: %v", - t.repository.name, tag, err) - return "", nil, err - } - t.logger.Infof("manifest of %s:%s pulled successfully from source registry: %s", - t.repository.name, tag, digest) - - if strings.Contains(mediaType, "application/json") { - mediaType = schema1.MediaTypeManifest - } - - manifest, _, err := reg.UnMarshal(mediaType, payload) - if err != nil { - t.logger.Errorf("an error occurred while parsing manifest: %v", err) - return "", nil, err - } - - return digest, manifest, nil -} - -func (t *Transfer) transferLayers(tag string, blobs []distribution.Descriptor) error { - repository := t.repository.name - - // all blobs(layers and config) - for _, blob := range blobs { - if canceled(t.ctx) { - t.logger.Warning(errCanceled.Error()) - return errCanceled - } - - digest := blob.Digest.String() - - if blob.MediaType == schema2.MediaTypeForeignLayer { - t.logger.Infof("blob %s of %s:%s is an foreign layer, skip", digest, repository, tag) - continue - } - - exist, err := t.dstRegistry.BlobExist(digest) - if err != nil { - t.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on destination registry: %v", - digest, repository, tag, err) - return err - } - if exist { - t.logger.Infof("blob %s of %s:%s already exists on the destination registry, skip", - digest, repository, tag) - continue - } - - t.logger.Infof("transferring blob %s of %s:%s to the destination registry ...", - digest, repository, tag) - size, data, err := t.srcRegistry.PullBlob(digest) - if err != nil { - t.logger.Errorf("an error occurred while pulling blob %s of %s:%s from the source registry: %v", - digest, repository, tag, err) - return err - } - if data != nil { - defer data.Close() - } - if err = t.dstRegistry.PushBlob(digest, size, data); err != nil { - t.logger.Errorf("an error occurred while pushing blob %s of %s:%s to the distination registry: %v", - digest, repository, tag, err) - return err - } - t.logger.Infof("blob %s of %s:%s transferred to the destination registry completed", - digest, repository, tag) - } - - return nil -} - -func (t *Transfer) pushManifest(tag, digest string, manifest distribution.Manifest) error { - if canceled(t.ctx) { - t.logger.Warning(errCanceled.Error()) - return errCanceled - } - - repository := t.repository.name - dgt, exist, err := t.dstRegistry.ManifestExist(tag) - if err != nil { - t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest", - repository, tag, err) - } else { - if exist && dgt == digest { - t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing", - repository, tag) - return nil - } - } - - mediaType, data, err := manifest.Payload() - if err != nil { - t.logger.Errorf("an error occurred while getting payload of manifest for %s:%s : %v", - repository, tag, err) - return err - } - - if _, err = t.dstRegistry.PushManifest(tag, mediaType, data); err != nil { - t.logger.Errorf("an error occurred while pushing manifest of %s:%s to the destination registry: %v", - repository, tag, err) - return err - } - t.logger.Infof("manifest of %s:%s has been pushed to the destination registry", - repository, tag) - - return nil -} - -func canceled(ctx env.JobContext) bool { - _, canceled := ctx.OPCommand() - return canceled -} - -func retry(err error) bool { - if err == nil { - return false - } - _, ok := err.(net.Error) - return ok -} - -func secret() string { - return os.Getenv("JOBSERVICE_SECRET") -} diff --git a/src/jobservice/job/impl/replication/transfer_test.go b/src/jobservice/job/impl/replication/transfer_test.go deleted file mode 100644 index 585fd29c8..000000000 --- a/src/jobservice/job/impl/replication/transfer_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package replication - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestMaxFailsOfTransfer(t *testing.T) { - r := &Transfer{} - assert.Equal(t, uint(3), r.MaxFails()) -} - -func TestValidateOfTransfer(t *testing.T) { - r := &Transfer{} - require.Nil(t, r.Validate(nil)) -} - -func TestShouldRetryOfTransfer(t *testing.T) { - r := &Transfer{} - assert.False(t, r.ShouldRetry()) - r.retry = true - assert.True(t, r.ShouldRetry()) -} diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index ed3c8c8c9..7b5d6e1fc 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -31,10 +31,8 @@ import ( jsjob "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/job/impl/gc" - "github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng" "github.com/goharbor/harbor/src/jobservice/job/impl/scan" - "github.com/goharbor/harbor/src/jobservice/job/impl/scheduler" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/pool" @@ -207,14 +205,11 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ - job.ImageScanJob: (*scan.ClairJob)(nil), - job.ImageScanAllJob: (*scan.All)(nil), - job.ImageTransfer: (*replication.Transfer)(nil), - job.ImageDelete: (*replication.Deleter)(nil), - job.ImageReplicate: (*replication.Replicator)(nil), - job.ImageGC: (*gc.GarbageCollector)(nil), - job.Replication: (*ng.Replication)(nil), - job.Scheduler: (*scheduler.Scheduler)(nil), + job.ImageScanJob: (*scan.ClairJob)(nil), + job.ImageScanAllJob: (*scan.All)(nil), + job.ImageGC: (*gc.GarbageCollector)(nil), + job.Replication: (*ng.Replication)(nil), + job.ReplicationScheduler: (*ng.Scheduler)(nil), }); err != nil { // exit return nil, err diff --git a/src/replication/ng/policy/scheduler/scheduler.go b/src/replication/ng/policy/scheduler/scheduler.go index 791fb65e9..850a3bd9d 100644 --- a/src/replication/ng/policy/scheduler/scheduler.go +++ b/src/replication/ng/policy/scheduler/scheduler.go @@ -19,8 +19,6 @@ import ( "net/http" "time" - "github.com/goharbor/harbor/src/replication/ng/model" - common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/job" job_models "github.com/goharbor/harbor/src/common/job/models" @@ -62,15 +60,12 @@ func (s *scheduler) Schedule(policyID int64, cron string) error { } log.Debugf("the schedule job record %d added", id) - replicateURL := fmt.Sprintf("%s/api/replication/executions?trigger=%s", config.Config.CoreURL, model.TriggerTypeScheduled) - statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id) + statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/%d", config.Config.CoreURL, id) jobID, err := s.jobservice.SubmitJob(&job_models.JobData{ - Name: job.Scheduler, + Name: job.ReplicationScheduler, Parameters: map[string]interface{}{ - "url": replicateURL, - "data": &models.Execution{ - PolicyID: policyID, - }, + "url": config.Config.CoreURL, + "policy_id": policyID, }, Metadata: &job_models.JobMetadata{ JobKind: job.JobKindPeriodic, diff --git a/src/replication/ng/policy/scheduler/scheduler_test.go b/src/replication/ng/policy/scheduler/scheduler_test.go index e048daec9..d15fb6264 100644 --- a/src/replication/ng/policy/scheduler/scheduler_test.go +++ b/src/replication/ng/policy/scheduler/scheduler_test.go @@ -142,9 +142,9 @@ func TestSchedule(t *testing.T) { require.Equal(t, 1, len(sjs)) assert.Equal(t, uuid, sjs[0].JobID) - execution, ok := js.jobData.Parameters["data"].(*rep_models.Execution) + policyID, ok := js.jobData.Parameters["policy_id"].(int64) require.True(t, ok) - assert.Equal(t, policyID, execution.PolicyID) + assert.Equal(t, policyID, policyID) } func TestUnschedule(t *testing.T) {