From 1fea594c3d8d50cc8a07867c3a99e9f375666474 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Sat, 24 Mar 2018 12:36:27 +0800 Subject: [PATCH] Trigger replication job from UI with new jobservice --- .../job/impl/replication/transfer.go | 4 +- src/replication/core/controller.go | 61 +++-------- src/replication/core/controller_test.go | 17 ++- src/replication/replicator/replicator.go | 102 ++++++++++++++++-- src/replication/replicator/replicator_test.go | 18 +--- src/replication/target/target.go | 20 +++- src/ui/config/config.go | 5 - 7 files changed, 144 insertions(+), 83 deletions(-) diff --git a/src/jobservice_v2/job/impl/replication/transfer.go b/src/jobservice_v2/job/impl/replication/transfer.go index 3cda41c5c..27cd5f9d0 100644 --- a/src/jobservice_v2/job/impl/replication/transfer.go +++ b/src/jobservice_v2/job/impl/replication/transfer.go @@ -303,12 +303,12 @@ func (t *Transfer) pushManifest(tag, digest string, manifest distribution.Manife } repository := t.repository.name - _, exist, err := t.dstRegistry.ManifestExist(digest) + dgt, exist, err := t.dstRegistry.ManifestExist(tag) if err != nil { t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest", repository, tag, err) } else { - if exist { + if exist && dgt == digest { t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing", repository, tag) return nil diff --git a/src/replication/core/controller.go b/src/replication/core/controller.go index 07ec4a9a1..a7b22597a 100644 --- a/src/replication/core/controller.go +++ b/src/replication/core/controller.go @@ -16,11 +16,9 @@ package core import ( "fmt" - "strings" common_models "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/utils/log" - "github.com/vmware/harbor/src/jobservice/client" "github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/policy" @@ -28,7 +26,7 @@ import ( "github.com/vmware/harbor/src/replication/source" "github.com/vmware/harbor/src/replication/target" "github.com/vmware/harbor/src/replication/trigger" - "github.com/vmware/harbor/src/ui/config" + "github.com/vmware/harbor/src/ui/utils" ) // Controller defines the methods that a replicatoin controllter should implement @@ -81,7 +79,7 @@ func NewDefaultController(cfg ControllerConfig) *DefaultController { triggerManager: trigger.NewManager(cfg.CacheCapacity), } - ctl.replicator = replicator.NewDefaultReplicator(config.GlobalJobserviceClient) + ctl.replicator = replicator.NewDefaultReplicator(utils.GetJobServiceClient()) return ctl } @@ -225,20 +223,25 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i // prepare candidates for replication candidates := getCandidates(&policy, ctl.sourcer, metadata...) + if len(candidates) == 0 { + log.Debugf("replicaton candidates are null, no further action needed") + } - /* - targets := []*common_models.RepTarget{} - for _, targetID := range policy.TargetIDs { - target, err := ctl.targetManager.GetTarget(targetID) - if err != nil { - return err - } - targets = append(targets, target) + targets := []*common_models.RepTarget{} + for _, targetID := range policy.TargetIDs { + target, err := ctl.targetManager.GetTarget(targetID) + if err != nil { + return err } - */ + targets = append(targets, target) + } // submit the replication - return replicate(ctl.replicator, policyID, candidates) + return ctl.replicator.Replicate(&replicator.Replication{ + PolicyID: policyID, + Candidates: candidates, + Targets: targets, + }) } func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, @@ -286,33 +289,3 @@ func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) return source.NewDefaultFilterChain(filters) } - -func replicate(replicator replicator.Replicator, policyID int64, candidates []models.FilterItem) error { - if len(candidates) == 0 { - log.Debugf("replicaton candidates are null, no further action needed") - } - - repositories := map[string][]string{} - // TODO the operation of all candidates are same for now. Update it after supporting - // replicate deletion - operation := "" - for _, candidate := range candidates { - strs := strings.SplitN(candidate.Value, ":", 2) - repositories[strs[0]] = append(repositories[strs[0]], strs[1]) - operation = candidate.Operation - } - - for repository, tags := range repositories { - replication := &client.Replication{ - PolicyID: policyID, - Repository: repository, - Operation: operation, - Tags: tags, - } - log.Debugf("submiting replication job to jobservice: %v", replication) - if err := replicator.Replicate(replication); err != nil { - return err - } - } - return nil -} diff --git a/src/replication/core/controller_test.go b/src/replication/core/controller_test.go index b88e2165b..cbe8b9967 100644 --- a/src/replication/core/controller_test.go +++ b/src/replication/core/controller_test.go @@ -23,21 +23,20 @@ import ( "github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/source" + "github.com/vmware/harbor/src/replication/target" + "github.com/vmware/harbor/src/replication/trigger" ) func TestMain(m *testing.M) { - GlobalController = NewDefaultController(ControllerConfig{}) - // set the policy manager used by GlobalController with a fake policy manager - controller := GlobalController.(*DefaultController) - controller.policyManager = &test.FakePolicyManager{} + GlobalController = &DefaultController{ + policyManager: &test.FakePolicyManager{}, + targetManager: target.NewDefaultManager(), + sourcer: source.NewSourcer(), + triggerManager: trigger.NewManager(0), + } os.Exit(m.Run()) } -func TestNewDefaultController(t *testing.T) { - controller := NewDefaultController(ControllerConfig{}) - assert.NotNil(t, controller) -} - func TestInit(t *testing.T) { assert.Nil(t, GlobalController.Init()) } diff --git a/src/replication/replicator/replicator.go b/src/replication/replicator/replicator.go index 37e387ce8..faa284422 100644 --- a/src/replication/replicator/replicator.go +++ b/src/replication/replicator/replicator.go @@ -15,27 +15,117 @@ package replicator import ( - "github.com/vmware/harbor/src/jobservice/client" + "strings" + + "github.com/vmware/harbor/src/common/dao" + common_job "github.com/vmware/harbor/src/common/job" + job_models "github.com/vmware/harbor/src/common/job/models" + common_models "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils/log" + "github.com/vmware/harbor/src/replication/models" + "github.com/vmware/harbor/src/ui/config" ) +// Replication holds information for a replication +type Replication struct { + PolicyID int64 + Candidates []models.FilterItem + Targets []*common_models.RepTarget + Operation string +} + // Replicator submits the replication work to the jobservice type Replicator interface { - Replicate(*client.Replication) error + Replicate(*Replication) error } // DefaultReplicator provides a default implement for Replicator type DefaultReplicator struct { - client client.Client + client common_job.Client } // NewDefaultReplicator returns an instance of DefaultReplicator -func NewDefaultReplicator(client client.Client) *DefaultReplicator { +func NewDefaultReplicator(client common_job.Client) *DefaultReplicator { return &DefaultReplicator{ client: client, } } // Replicate ... -func (d *DefaultReplicator) Replicate(replication *client.Replication) error { - return d.client.SubmitReplicationJob(replication) +func (d *DefaultReplicator) Replicate(replication *Replication) error { + repositories := map[string][]string{} + // TODO the operation of all candidates are same for now. Update it after supporting + // replicate deletion + operation := "" + for _, candidate := range replication.Candidates { + strs := strings.SplitN(candidate.Value, ":", 2) + repositories[strs[0]] = append(repositories[strs[0]], strs[1]) + operation = candidate.Operation + } + + for _, target := range replication.Targets { + for repository, tags := range repositories { + // create job in database + id, err := dao.AddRepJob(common_models.RepJob{ + PolicyID: replication.PolicyID, + Repository: repository, + TagList: tags, + Operation: operation, + }) + if err != nil { + return err + } + + // submit job to jobservice + log.Debugf("submiting replication job to jobservice, repository: %s, tags: %v, operation: %s, target: %s", + repository, tags, operation, target.URL) + job := &job_models.JobData{ + Metadata: &job_models.JobMetadata{ + JobKind: common_job.JobKindGeneric, + }, + // TODO + StatusHook: "", + } + + if operation == common_models.RepOpTransfer { + url, err := config.ExtEndpoint() + if err != nil { + return err + } + job.Name = common_job.ImageTransfer + job.Parameters = map[string]interface{}{ + "repository": repository, + "tags": tags, + "src_registry_url": url, + "src_registry_insecure": true, + //"src_token_service_url":"", + "dst_registry_url": target.URL, + "dst_registry_insecure": target.Insecure, + "dst_registry_username": target.Username, + "dst_registry_password": target.Password, + } + } else { + job.Name = common_job.ImageDelete + job.Parameters = map[string]interface{}{ + "repository": repository, + "tags": tags, + "dst_registry_url": target.URL, + "dst_registry_insecure": target.Insecure, + "dst_registry_username": target.Username, + "dst_registry_password": target.Password, + } + } + + uuid, err := d.client.SubmitJob(job) + if err != nil { + return err + } + + // create the mapping relationship between the jobs in database and jobservice + if err = dao.SetRepJobUUID(id, uuid); err != nil { + return err + } + } + } + return nil } diff --git a/src/replication/replicator/replicator_test.go b/src/replication/replicator/replicator_test.go index 934939cd0..5ae22086e 100644 --- a/src/replication/replicator/replicator_test.go +++ b/src/replication/replicator/replicator_test.go @@ -16,22 +16,8 @@ package replicator import ( "testing" - - "github.com/stretchr/testify/assert" - "github.com/vmware/harbor/src/jobservice/client" ) -type fakeJobserviceClient struct{} - -func (f *fakeJobserviceClient) SubmitReplicationJob(replication *client.Replication) error { - return nil -} - -func (f *fakeJobserviceClient) StopReplicationJobs(policyID int64) error { - return nil -} - -func TestReplicate(t *testing.T) { - replicator := NewDefaultReplicator(&fakeJobserviceClient{}) - assert.Nil(t, replicator.Replicate(&client.Replication{})) +func TestNewDefaultReplicator(t *testing.T) { + NewDefaultReplicator(nil) } diff --git a/src/replication/target/target.go b/src/replication/target/target.go index ab8e815e5..3db7e7694 100644 --- a/src/replication/target/target.go +++ b/src/replication/target/target.go @@ -17,6 +17,8 @@ package target import ( "github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils" + "github.com/vmware/harbor/src/ui/config" ) // Manager defines the methods that a target manager should implement @@ -34,5 +36,21 @@ func NewDefaultManager() *DefaultManager { // GetTarget ... func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) { - return dao.GetRepTarget(id) + target, err := dao.GetRepTarget(id) + if err != nil { + return nil, err + } + // decrypt the password + if len(target.Password) > 0 { + key, err := config.SecretKey() + if err != nil { + return nil, err + } + pwd, err := utils.ReversibleDecrypt(target.Password, key) + if err != nil { + return nil, err + } + target.Password = pwd + } + return target, nil } diff --git a/src/ui/config/config.go b/src/ui/config/config.go index a16887f58..2b29c666c 100644 --- a/src/ui/config/config.go +++ b/src/ui/config/config.go @@ -96,11 +96,6 @@ func InitByURL(adminServerURL string) error { // init project manager based on deploy mode initProjectManager() - GlobalJobserviceClient = jobservice_client.NewDefaultClient(InternalJobServiceURL(), - &jobservice_client.Config{ - Secret: UISecret(), - }) - return nil }