Merge pull request #4506 from ywk253100/180327_replication_job

Trigger replication job from UI with new jobservice
This commit is contained in:
Wenkai Yin 2018-03-28 12:57:04 +08:00 committed by GitHub
commit d28bad4806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 144 additions and 83 deletions

View File

@ -303,12 +303,12 @@ func (t *Transfer) pushManifest(tag, digest string, manifest distribution.Manife
} }
repository := t.repository.name repository := t.repository.name
_, exist, err := t.dstRegistry.ManifestExist(digest) dgt, exist, err := t.dstRegistry.ManifestExist(tag)
if err != nil { if err != nil {
t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest", t.logger.Warningf("an error occurred while checking the existence of manifest of %s:%s on the destination registry: %v, try to push manifest",
repository, tag, err) repository, tag, err)
} else { } else {
if exist { if exist && dgt == digest {
t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing", t.logger.Infof("manifest of %s:%s exists on the destination registry, skip manifest pushing",
repository, tag) repository, tag)
return nil return nil

View File

@ -16,11 +16,9 @@ package core
import ( import (
"fmt" "fmt"
"strings"
common_models "github.com/vmware/harbor/src/common/models" common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/jobservice/client"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/policy" "github.com/vmware/harbor/src/replication/policy"
@ -28,7 +26,7 @@ import (
"github.com/vmware/harbor/src/replication/source" "github.com/vmware/harbor/src/replication/source"
"github.com/vmware/harbor/src/replication/target" "github.com/vmware/harbor/src/replication/target"
"github.com/vmware/harbor/src/replication/trigger" "github.com/vmware/harbor/src/replication/trigger"
"github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/utils"
) )
// Controller defines the methods that a replicatoin controllter should implement // Controller defines the methods that a replicatoin controllter should implement
@ -81,7 +79,7 @@ func NewDefaultController(cfg ControllerConfig) *DefaultController {
triggerManager: trigger.NewManager(cfg.CacheCapacity), triggerManager: trigger.NewManager(cfg.CacheCapacity),
} }
ctl.replicator = replicator.NewDefaultReplicator(config.GlobalJobserviceClient) ctl.replicator = replicator.NewDefaultReplicator(utils.GetJobServiceClient())
return ctl return ctl
} }
@ -225,20 +223,25 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i
// prepare candidates for replication // prepare candidates for replication
candidates := getCandidates(&policy, ctl.sourcer, metadata...) candidates := getCandidates(&policy, ctl.sourcer, metadata...)
if len(candidates) == 0 {
log.Debugf("replicaton candidates are null, no further action needed")
}
/* targets := []*common_models.RepTarget{}
targets := []*common_models.RepTarget{} for _, targetID := range policy.TargetIDs {
for _, targetID := range policy.TargetIDs { target, err := ctl.targetManager.GetTarget(targetID)
target, err := ctl.targetManager.GetTarget(targetID) if err != nil {
if err != nil { return err
return err
}
targets = append(targets, target)
} }
*/ targets = append(targets, target)
}
// submit the replication // submit the replication
return replicate(ctl.replicator, policyID, candidates) return ctl.replicator.Replicate(&replicator.Replication{
PolicyID: policyID,
Candidates: candidates,
Targets: targets,
})
} }
func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer, func getCandidates(policy *models.ReplicationPolicy, sourcer *source.Sourcer,
@ -286,33 +289,3 @@ func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer)
return source.NewDefaultFilterChain(filters) return source.NewDefaultFilterChain(filters)
} }
func replicate(replicator replicator.Replicator, policyID int64, candidates []models.FilterItem) error {
if len(candidates) == 0 {
log.Debugf("replicaton candidates are null, no further action needed")
}
repositories := map[string][]string{}
// TODO the operation of all candidates are same for now. Update it after supporting
// replicate deletion
operation := ""
for _, candidate := range candidates {
strs := strings.SplitN(candidate.Value, ":", 2)
repositories[strs[0]] = append(repositories[strs[0]], strs[1])
operation = candidate.Operation
}
for repository, tags := range repositories {
replication := &client.Replication{
PolicyID: policyID,
Repository: repository,
Operation: operation,
Tags: tags,
}
log.Debugf("submiting replication job to jobservice: %v", replication)
if err := replicator.Replicate(replication); err != nil {
return err
}
}
return nil
}

View File

@ -23,21 +23,20 @@ import (
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/source" "github.com/vmware/harbor/src/replication/source"
"github.com/vmware/harbor/src/replication/target"
"github.com/vmware/harbor/src/replication/trigger"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
GlobalController = NewDefaultController(ControllerConfig{}) GlobalController = &DefaultController{
// set the policy manager used by GlobalController with a fake policy manager policyManager: &test.FakePolicyManager{},
controller := GlobalController.(*DefaultController) targetManager: target.NewDefaultManager(),
controller.policyManager = &test.FakePolicyManager{} sourcer: source.NewSourcer(),
triggerManager: trigger.NewManager(0),
}
os.Exit(m.Run()) os.Exit(m.Run())
} }
func TestNewDefaultController(t *testing.T) {
controller := NewDefaultController(ControllerConfig{})
assert.NotNil(t, controller)
}
func TestInit(t *testing.T) { func TestInit(t *testing.T) {
assert.Nil(t, GlobalController.Init()) assert.Nil(t, GlobalController.Init())
} }

View File

@ -15,27 +15,117 @@
package replicator package replicator
import ( import (
"github.com/vmware/harbor/src/jobservice/client" "strings"
"github.com/vmware/harbor/src/common/dao"
common_job "github.com/vmware/harbor/src/common/job"
job_models "github.com/vmware/harbor/src/common/job/models"
common_models "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/ui/config"
) )
// Replication holds information for a replication
type Replication struct {
PolicyID int64
Candidates []models.FilterItem
Targets []*common_models.RepTarget
Operation string
}
// Replicator submits the replication work to the jobservice // Replicator submits the replication work to the jobservice
type Replicator interface { type Replicator interface {
Replicate(*client.Replication) error Replicate(*Replication) error
} }
// DefaultReplicator provides a default implement for Replicator // DefaultReplicator provides a default implement for Replicator
type DefaultReplicator struct { type DefaultReplicator struct {
client client.Client client common_job.Client
} }
// NewDefaultReplicator returns an instance of DefaultReplicator // NewDefaultReplicator returns an instance of DefaultReplicator
func NewDefaultReplicator(client client.Client) *DefaultReplicator { func NewDefaultReplicator(client common_job.Client) *DefaultReplicator {
return &DefaultReplicator{ return &DefaultReplicator{
client: client, client: client,
} }
} }
// Replicate ... // Replicate ...
func (d *DefaultReplicator) Replicate(replication *client.Replication) error { func (d *DefaultReplicator) Replicate(replication *Replication) error {
return d.client.SubmitReplicationJob(replication) repositories := map[string][]string{}
// TODO the operation of all candidates are same for now. Update it after supporting
// replicate deletion
operation := ""
for _, candidate := range replication.Candidates {
strs := strings.SplitN(candidate.Value, ":", 2)
repositories[strs[0]] = append(repositories[strs[0]], strs[1])
operation = candidate.Operation
}
for _, target := range replication.Targets {
for repository, tags := range repositories {
// create job in database
id, err := dao.AddRepJob(common_models.RepJob{
PolicyID: replication.PolicyID,
Repository: repository,
TagList: tags,
Operation: operation,
})
if err != nil {
return err
}
// submit job to jobservice
log.Debugf("submiting replication job to jobservice, repository: %s, tags: %v, operation: %s, target: %s",
repository, tags, operation, target.URL)
job := &job_models.JobData{
Metadata: &job_models.JobMetadata{
JobKind: common_job.JobKindGeneric,
},
// TODO
StatusHook: "",
}
if operation == common_models.RepOpTransfer {
url, err := config.ExtEndpoint()
if err != nil {
return err
}
job.Name = common_job.ImageTransfer
job.Parameters = map[string]interface{}{
"repository": repository,
"tags": tags,
"src_registry_url": url,
"src_registry_insecure": true,
//"src_token_service_url":"",
"dst_registry_url": target.URL,
"dst_registry_insecure": target.Insecure,
"dst_registry_username": target.Username,
"dst_registry_password": target.Password,
}
} else {
job.Name = common_job.ImageDelete
job.Parameters = map[string]interface{}{
"repository": repository,
"tags": tags,
"dst_registry_url": target.URL,
"dst_registry_insecure": target.Insecure,
"dst_registry_username": target.Username,
"dst_registry_password": target.Password,
}
}
uuid, err := d.client.SubmitJob(job)
if err != nil {
return err
}
// create the mapping relationship between the jobs in database and jobservice
if err = dao.SetRepJobUUID(id, uuid); err != nil {
return err
}
}
}
return nil
} }

View File

@ -16,22 +16,8 @@ package replicator
import ( import (
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/jobservice/client"
) )
type fakeJobserviceClient struct{} func TestNewDefaultReplicator(t *testing.T) {
NewDefaultReplicator(nil)
func (f *fakeJobserviceClient) SubmitReplicationJob(replication *client.Replication) error {
return nil
}
func (f *fakeJobserviceClient) StopReplicationJobs(policyID int64) error {
return nil
}
func TestReplicate(t *testing.T) {
replicator := NewDefaultReplicator(&fakeJobserviceClient{})
assert.Nil(t, replicator.Replicate(&client.Replication{}))
} }

View File

@ -17,6 +17,8 @@ package target
import ( import (
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/ui/config"
) )
// Manager defines the methods that a target manager should implement // Manager defines the methods that a target manager should implement
@ -34,5 +36,21 @@ func NewDefaultManager() *DefaultManager {
// GetTarget ... // GetTarget ...
func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) { func (d *DefaultManager) GetTarget(id int64) (*models.RepTarget, error) {
return dao.GetRepTarget(id) target, err := dao.GetRepTarget(id)
if err != nil {
return nil, err
}
// decrypt the password
if len(target.Password) > 0 {
key, err := config.SecretKey()
if err != nil {
return nil, err
}
pwd, err := utils.ReversibleDecrypt(target.Password, key)
if err != nil {
return nil, err
}
target.Password = pwd
}
return target, nil
} }

View File

@ -96,11 +96,6 @@ func InitByURL(adminServerURL string) error {
// init project manager based on deploy mode // init project manager based on deploy mode
initProjectManager() initProjectManager()
GlobalJobserviceClient = jobservice_client.NewDefaultClient(InternalJobServiceURL(),
&jobservice_client.Config{
Secret: UISecret(),
})
return nil return nil
} }