diff --git a/src/replication/operation/controller.go b/src/replication/operation/controller.go index 5b7756ad9..4d233dac9 100644 --- a/src/replication/operation/controller.go +++ b/src/replication/operation/controller.go @@ -41,16 +41,26 @@ type Controller interface { GetTaskLog(int64) ([]byte, error) } +const ( + maxReplicators = 1024 +) + // NewController returns a controller implementation func NewController(js job.Client) Controller { - return &controller{ + ctl := &controller{ + replicators: make(chan struct{}, maxReplicators), executionMgr: execution.NewDefaultManager(), scheduler: scheduler.NewScheduler(js), flowCtl: flow.NewController(), } + for i := 0; i < maxReplicators; i++ { + ctl.replicators <- struct{}{} + } + return ctl } type controller struct { + replicators chan struct{} flowCtl flow.Controller executionMgr execution.Manager scheduler scheduler.Scheduler @@ -67,24 +77,31 @@ func (c *controller) StartReplication(policy *model.Policy, resource *model.Reso if err != nil { return 0, err } - - flow := c.createFlow(id, policy, resource) - if n, err := c.flowCtl.Start(flow); err != nil { - // only update the execution when got error. - // if got no error, it will be updated automatically - // when listing the execution records - if e := c.executionMgr.Update(&models.Execution{ - ID: id, - Status: models.ExecutionStatusFailed, - StatusText: err.Error(), - Total: n, - Failed: n, - }, "Status", "StatusText", "Total", "Failed"); e != nil { - log.Errorf("failed to update the execution %d: %v", id, e) + // control the count of concurrent replication requests + log.Debugf("waiting for the available replicator ...") + <-c.replicators + log.Debugf("got an available replicator, starting the replication ...") + go func() { + defer func() { + c.replicators <- struct{}{} + }() + flow := c.createFlow(id, policy, resource) + if n, err := c.flowCtl.Start(flow); err != nil { + // only update the execution when got error. + // if got no error, it will be updated automatically + // when listing the execution records + if e := c.executionMgr.Update(&models.Execution{ + ID: id, + Status: models.ExecutionStatusFailed, + StatusText: err.Error(), + Total: n, + Failed: n, + }, "Status", "StatusText", "Total", "Failed"); e != nil { + log.Errorf("failed to update the execution %d: %v", id, e) + } + log.Errorf("the execution %d failed: %v", id, err) } - log.Errorf("the execution %d failed: %v", id, err) - } - + }() return id, nil } diff --git a/src/replication/operation/controller_test.go b/src/replication/operation/controller_test.go index b2698ba9a..c19daa8c9 100644 --- a/src/replication/operation/controller_test.go +++ b/src/replication/operation/controller_test.go @@ -16,6 +16,7 @@ package operation import ( "io" + "os" "testing" "github.com/docker/distribution" @@ -197,16 +198,25 @@ func (f *fakedAdapter) DeleteChart(name, version string) error { return nil } -var ctl = &controller{ - executionMgr: &fakedExecutionManager{}, - scheduler: &fakedScheduler{}, - flowCtl: flow.NewController(), +var ctl *controller + +func TestMain(m *testing.M) { + ctl = &controller{ + replicators: make(chan struct{}, 1), + executionMgr: &fakedExecutionManager{}, + scheduler: &fakedScheduler{}, + flowCtl: flow.NewController(), + } + ctl.replicators <- struct{}{} + os.Exit(m.Run()) } func TestStartReplication(t *testing.T) { err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory) require.Nil(t, err) config.Config = &config.Configuration{} + + // policy is disabled policy := &model.Policy{ SrcRegistry: &model.Registry{ Type: model.RegistryTypeHarbor, @@ -224,24 +234,67 @@ func TestStartReplication(t *testing.T) { Vtags: []string{"1.0", "2.0"}, }, } - // policy is disabled _, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.NotNil(t, err) - policy.Enabled = true // replicate resource deletion - resource.Deleted = true + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } + resource = &model.Resource{ + Type: model.ResourceTypeImage, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "library/hello-world", + }, + Vtags: []string{"1.0", "2.0"}, + }, + Deleted: true, + } id, err := ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // replicate resource copy - resource.Deleted = false + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } + resource = &model.Resource{ + Type: model.ResourceTypeImage, + Metadata: &model.ResourceMetadata{ + Repository: &model.Repository{ + Name: "library/hello-world", + }, + Vtags: []string{"1.0", "2.0"}, + }, + Deleted: false, + } id, err = ctl.StartReplication(policy, resource, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id) // nil resource + policy = &model.Policy{ + SrcRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + DestRegistry: &model.Registry{ + Type: model.RegistryTypeHarbor, + }, + Enabled: true, + } id, err = ctl.StartReplication(policy, nil, model.TriggerTypeEventBased) require.Nil(t, err) assert.Equal(t, int64(1), id)