From 621d2f20f3954fa6750d1b5b1bf5c865e0175ce1 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 26 Apr 2019 10:43:15 +0800 Subject: [PATCH] Return immediately after creating the execution record when starting the replication When fetching resources is slow, the starting replication request may timeout, this commit returns immediately after creating the execution record when starting the replication Signed-off-by: Wenkai Yin --- src/replication/operation/controller.go | 53 ++++++++++----- src/replication/operation/controller_test.go | 69 +++++++++++++++++--- 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index c56bf42f9..90ffaea28 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -42,16 +42,26 @@ type Controller interface { GetTaskLog(int64) ([]byte, error) } +const ( + maxReplicators = 1024 +) + // NewController returns a controller implementation func NewController(js job.Client) Controller { - return &controller{ + ctl := &controller{ + replicators: make(chan struct{}, maxReplicators), executionMgr: execution.NewDefaultManager(), scheduler: scheduler.NewScheduler(js), flowCtl: flow.NewController(), } + for i := 0; i < maxReplicators; i++ { + ctl.replicators <- struct{}{} + } + return ctl } type controller struct { + replicators chan struct{} flowCtl flow.Controller executionMgr execution.Manager scheduler scheduler.Scheduler @@ -68,24 +78,31 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso if err != nil { return 0, err } - - flow := c.createFlow(id, policy, resource) - if n, err := c.flowCtl.Start(flow); err != nil { - // only update the execution when got error. - // if got no error, it will be updated automatically - // when listing the execution records - if e := c.executionMgr.Update(&models.Execution{ - ID: id, - Status: models.ExecutionStatusFailed, - StatusText: err.Error(), - Total: n, - Failed: n, - }, "Status", "StatusText", "Total", "Failed"); e != nil { - log.Errorf("failed to update the execution %d: %v", id, e) + // control the count of concurrent replication requests + log.Debugf("waiting for the available replicator ...") + <-c.replicators + log.Debugf("got an available replicator, starting the replication ...") + go func() { + defer func() { + c.replicators <- struct{}{} + }() + flow := c.createFlow(id, policy, resource) + if n, err := c.flowCtl.Start(flow); err != nil { + // only update the execution when got error. + // if got no error, it will be updated automatically + // when listing the execution records + if e := c.executionMgr.Update(&models.Execution{ + ID: id, + Status: models.ExecutionStatusFailed, + StatusText: err.Error(), + Total: n, + Failed: n, + }, "Status", "StatusText", "Total", "Failed"); e != nil { + log.Errorf("failed to update the execution %d: %v", id, e) + } + log.Errorf("the execution %d failed: %v", id, err) } - log.Errorf("the execution %d failed: %v", id, err) - } - + }() return id, nil } diff --git a/src/replication/operation/controller_test.go b/src/replication/operation/controller_test.go index 482022e46..81c928631 100644 --- a/src/replication/operation/controller_test.go +++ b/src/replication/operation/controller_test.go @@ -17,6 +17,7 @@ package operation import ( "errors" "io" + "os" "testing" "github.com/docker/distribution" @@ -198,16 +199,25 @@ func (f *fakedAdapter) DeleteChart(name, version string) error { return nil } -var ctl = &controller{ - executionMgr: &fakedExecutionManager{}, - scheduler: &fakedScheduler{}, - flowCtl: flow.NewController(), +var ctl *controller + +func TestMain(m *testing.M) { + ctl = &controller{ + replicators: make(chan struct{}, 1), + executionMgr: &fakedExecutionManager{}, + scheduler: &fakedScheduler{}, + flowCtl: flow.NewController(), + } + ctl.replicators <- struct{}{} + os.Exit(m.Run()) } func TestStartReplication(t *testing.T) { err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory) require.Nil(t, err) config.Config = &config.Configuration{} + + // policy is disabled policy := &model.Policy{ SrcRegistry: &model.Registry{ Type: model.RegistryTypeHarbor, @@ -225,24 +235,67 @@ func TestStartReplication(t *testing.T) { Vtags: []string{"1.0", "2.0"}, }, } - // policy is disabled _, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.NotNil(t, err) - policy.Enabled = true // replicate resource deletion - resource.Deleted = true + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } + resource = &model.Resource{ + Type: model.ResourceTypeImage, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "library/hello-world", + }, + Vtags: []string{"1.0", "2.0"}, + }, + Deleted: true, + } id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // replicate resource copy - resource.Deleted = false + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } + resource = &model.Resource{ + Type: model.ResourceTypeImage, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "library/hello-world", + }, + Vtags: []string{"1.0", "2.0"}, + }, + Deleted: false, + } id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // nil resource + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id)