From 3653d3cdef04643e6f483824e67bd142b375357f Mon Sep 17 00:00:00 2001 From: fanjiankong Date: Thu, 23 Jul 2020 22:33:34 +0800 Subject: [PATCH] Schedule preheat policy. Signed-off-by: fanjiankong --- src/controller/p2p/preheat/controller.go | 110 +++++++++++++++++- src/controller/p2p/preheat/controllor_test.go | 51 ++++++-- src/core/api/base.go | 23 +++- src/pkg/p2p/preheat/models/policy/policy.go | 3 +- src/pkg/retention/controller.go | 2 +- src/server/v2.0/handler/preheat.go | 5 +- 6 files changed, 171 insertions(+), 23 deletions(-) diff --git a/src/controller/p2p/preheat/controller.go b/src/controller/p2p/preheat/controller.go index 1061a734b..12645813a 100644 --- a/src/controller/p2p/preheat/controller.go +++ b/src/controller/p2p/preheat/controller.go @@ -11,6 +11,12 @@ import ( providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/policy" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider" + "github.com/goharbor/harbor/src/pkg/scheduler" +) + +const ( + // SchedulerCallback ... + SchedulerCallback = "P2PPreheatCallback" ) var ( @@ -119,14 +125,16 @@ type controller struct { // For instance iManager instance.Manager // For policy - pManager policy.Manager + pManager policy.Manager + scheduler scheduler.Scheduler } // NewController is constructor of controller func NewController() Controller { return &controller{ - iManager: instance.Mgr, - pManager: policy.Mgr, + iManager: instance.Mgr, + pManager: policy.Mgr, + scheduler: scheduler.Sched, } } @@ -197,14 +205,45 @@ func (c *controller) CountPolicy(ctx context.Context, query *q.Query) (int64, er return c.pManager.Count(ctx, query) } +// TriggerParam ... +type TriggerParam struct { + PolicyID int64 +} + // CreatePolicy creates the policy. -func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (int64, error) { +func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (id int64, err error) { if schema != nil { now := time.Now() schema.CreatedAt = now schema.UpdatedTime = now } - return c.pManager.Create(ctx, schema) + + id, err = c.pManager.Create(ctx, schema) + if err != nil { + return + } + + if schema.Trigger != nil && + schema.Trigger.Type == policyModels.TriggerTypeScheduled && + len(schema.Trigger.Settings.Cron) > 0 { + // schedule and update policy + schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id}) + if err != nil { + return 0, err + } + + schema.ID = id + err = c.pManager.Update(ctx, schema, "trigger") + if err != nil { + errUnsch := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID) + if errUnsch != nil { + return 0, errUnsch + } + return 0, err + } + } + + return } // GetPolicy gets the policy by id. @@ -222,11 +261,72 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche if schema != nil { schema.UpdatedTime = time.Now() } + s0, err := c.pManager.Get(ctx, schema.ID) + if err != nil { + return err + } + var cron = schema.Trigger.Settings.Cron + var oldJobID = s0.Trigger.Settings.JobID + var needUn bool + var needSch bool + + if s0.Trigger.Type != schema.Trigger.Type { + if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 { + needUn = true + } + if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 { + needSch = true + } + } else { + // not change trigger type + if schema.Trigger.Type == policyModels.TriggerTypeScheduled && + s0.Trigger.Settings.Cron != cron { + // unschedule old + if oldJobID > 0 { + needUn = true + } + // schedule new + if len(cron) > 0 { + // valid cron + needSch = true + } + } + + } + + // unschedule old + if needUn { + err = c.scheduler.UnSchedule(ctx, oldJobID) + if err != nil { + return err + } + } + + // schedule new + if needSch { + jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID}) + if err != nil { + return err + } + schema.Trigger.Settings.JobID = jobid + } + return c.pManager.Update(ctx, schema, props...) } // DeletePolicy deletes the policy by id. func (c *controller) DeletePolicy(ctx context.Context, id int64) error { + s, err := c.pManager.Get(ctx, id) + if err != nil { + return err + } + if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 { + err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID) + if err != nil { + return err + } + } + return c.pManager.Delete(ctx, id) } diff --git a/src/controller/p2p/preheat/controllor_test.go b/src/controller/p2p/preheat/controllor_test.go index d9e5c1b90..2fe4b3cb2 100644 --- a/src/controller/p2p/preheat/controllor_test.go +++ b/src/controller/p2p/preheat/controllor_test.go @@ -7,16 +7,17 @@ import ( "net/http/httptest" "testing" - "github.com/goharbor/harbor/src/lib/q" - - "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth" - "github.com/goharbor/harbor/src/core/config" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy" providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider" + "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth" + ormtesting "github.com/goharbor/harbor/src/testing/lib/orm" "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance" pmocks "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy" + smocks "github.com/goharbor/harbor/src/testing/pkg/scheduler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -28,6 +29,7 @@ type preheatSuite struct { controller Controller fakeInstanceMgr *instance.FakeManager fakePolicyMgr *pmocks.FakeManager + fakeScheduler *smocks.Scheduler mockInstanceServer *httptest.Server } @@ -35,18 +37,22 @@ func TestPreheatSuite(t *testing.T) { t.Log("Start TestPreheatSuite") fakeInstanceMgr := &instance.FakeManager{} fakePolicyMgr := &pmocks.FakeManager{} + fakeScheduler := &smocks.Scheduler{} var c = &controller{ - iManager: fakeInstanceMgr, - pManager: fakePolicyMgr, + iManager: fakeInstanceMgr, + pManager: fakePolicyMgr, + scheduler: fakeScheduler, } assert.NotNil(t, c) + ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{}) suite.Run(t, &preheatSuite{ - ctx: context.Background(), + ctx: ctx, controller: c, fakeInstanceMgr: fakeInstanceMgr, fakePolicyMgr: fakePolicyMgr, + fakeScheduler: fakeScheduler, }) } @@ -192,8 +198,12 @@ func (s *preheatSuite) TestCountPolicy() { } func (s *preheatSuite) TestCreatePolicy() { - policy := &policy.Schema{Name: "test"} + policy := &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} + policy.Trigger.Settings.Cron = "* * * * */1" + s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil) s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil) + s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil) + s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil) id, err := s.controller.CreatePolicy(s.ctx, policy) s.NoError(err) s.Equal(int64(1), id) @@ -216,14 +226,31 @@ func (s *preheatSuite) TestGetPolicyByName() { } func (s *preheatSuite) TestUpdatePolicy() { - policy := &policy.Schema{Name: "test"} - s.fakePolicyMgr.On("Update", s.ctx, policy, mock.Anything).Return(nil) - err := s.controller.UpdatePolicy(s.ctx, policy, "") + var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} + p0.Trigger.Settings.JobID = 1 + p0.Trigger.Settings.Cron = "* * * * */1" + s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil) + + // need change to schedule + p1 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeManual}} + s.fakePolicyMgr.On("Update", s.ctx, p1, mock.Anything).Return(nil) + err := s.controller.UpdatePolicy(s.ctx, p1, "") s.NoError(err) - s.False(policy.UpdatedTime.IsZero()) + s.False(p1.UpdatedTime.IsZero()) + + // need update schedule + p2 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} + p2.Trigger.Settings.Cron = "* * * * */2" + s.fakePolicyMgr.On("Update", s.ctx, p2, mock.Anything).Return(nil) + err = s.controller.UpdatePolicy(s.ctx, p2, "") + s.NoError(err) + s.False(p2.UpdatedTime.IsZero()) } func (s *preheatSuite) TestDeletePolicy() { + var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}} + p0.Trigger.Settings.JobID = 1 + s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil) s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil) err := s.controller.DeletePolicy(s.ctx, 1) s.NoError(err) diff --git a/src/core/api/base.go b/src/core/api/base.go index 94173aa61..ca1ab05e7 100644 --- a/src/core/api/base.go +++ b/src/core/api/base.go @@ -25,10 +25,12 @@ import ( "github.com/goharbor/harbor/src/common/rbac" "github.com/goharbor/harbor/src/common/security" "github.com/goharbor/harbor/src/common/utils" + "github.com/goharbor/harbor/src/controller/p2p/preheat" "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/promgr" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/retention" @@ -190,7 +192,7 @@ func Init() error { retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher) - callbackFun := func(p interface{}) error { + retentionCallbackFun := func(p interface{}) error { str, ok := p.(string) if !ok { return fmt.Errorf("the type of param %v isn't string", p) @@ -202,7 +204,24 @@ func Init() error { _, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false) return err } - err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, callbackFun) + err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, retentionCallbackFun) + if err != nil { + return err + } + + p2pPreheatCallbackFun := func(p interface{}) error { + str, ok := p.(string) + if !ok { + return fmt.Errorf("the type of param %v isn't string", p) + } + param := &preheat.TriggerParam{} + if err := json.Unmarshal([]byte(str), param); err != nil { + return fmt.Errorf("failed to unmarshal the param: %v", err) + } + _, err := preheat.Enf.EnforcePolicy(orm.Context(), param.PolicyID) + return err + } + err = scheduler.RegisterCallbackFunc(preheat.SchedulerCallback, p2pPreheatCallbackFun) return err } diff --git a/src/pkg/p2p/preheat/models/policy/policy.go b/src/pkg/p2p/preheat/models/policy/policy.go index aad2c68e6..69656bd69 100644 --- a/src/pkg/p2p/preheat/models/policy/policy.go +++ b/src/pkg/p2p/preheat/models/policy/policy.go @@ -96,7 +96,8 @@ type Trigger struct { Type TriggerType `json:"type"` Settings struct { // The cron string for scheduled trigger. - Cron string `json:"cron,omitempty"` + Cron string `json:"cron,omitempty"` + JobID int64 `json:"job_id,omitempty"` } `json:"trigger_setting,omitempty"` } diff --git a/src/pkg/retention/controller.go b/src/pkg/retention/controller.go index ba6859b17..c725e18f7 100644 --- a/src/pkg/retention/controller.go +++ b/src/pkg/retention/controller.go @@ -16,9 +16,9 @@ package retention import ( "fmt" - "github.com/goharbor/harbor/src/lib/orm" "time" + "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/retention/policy" diff --git a/src/server/v2.0/handler/preheat.go b/src/server/v2.0/handler/preheat.go index 5efc1d3ff..cc56d772b 100644 --- a/src/server/v2.0/handler/preheat.go +++ b/src/server/v2.0/handler/preheat.go @@ -348,13 +348,14 @@ func (api *preheatAPI) ManualPreheat(ctx context.Context, params operation.Manua return api.SendError(ctx, err) } - _, err = api.enforcer.EnforcePolicy(ctx, policy.ID) + executionID, err := api.enforcer.EnforcePolicy(ctx, policy.ID) if err != nil { return api.SendError(ctx, err) } // TODO: build execution URL - var location = "" + var location = fmt.Sprintf("/projects/%s/preheat/policies/%s/executions/%d", + params.ProjectName, params.PreheatPolicyName, executionID) return operation.NewManualPreheatCreated().WithLocation(location) }