Schedule preheat policy.

Signed-off-by: fanjiankong <fanjiankong@tencent.com>
This commit is contained in:
fanjiankong 2020-07-23 22:33:34 +08:00
parent 86bf6df0d2
commit 3653d3cdef
6 changed files with 171 additions and 23 deletions

View File

@ -11,6 +11,12 @@ import (
providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/policy" "github.com/goharbor/harbor/src/pkg/p2p/preheat/policy"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/scheduler"
)
const (
// SchedulerCallback ...
SchedulerCallback = "P2PPreheatCallback"
) )
var ( var (
@ -119,14 +125,16 @@ type controller struct {
// For instance // For instance
iManager instance.Manager iManager instance.Manager
// For policy // For policy
pManager policy.Manager pManager policy.Manager
scheduler scheduler.Scheduler
} }
// NewController is constructor of controller // NewController is constructor of controller
func NewController() Controller { func NewController() Controller {
return &controller{ return &controller{
iManager: instance.Mgr, iManager: instance.Mgr,
pManager: policy.Mgr, pManager: policy.Mgr,
scheduler: scheduler.Sched,
} }
} }
@ -197,14 +205,45 @@ func (c *controller) CountPolicy(ctx context.Context, query *q.Query) (int64, er
return c.pManager.Count(ctx, query) return c.pManager.Count(ctx, query)
} }
// TriggerParam ...
type TriggerParam struct {
PolicyID int64
}
// CreatePolicy creates the policy. // CreatePolicy creates the policy.
func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (int64, error) { func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (id int64, err error) {
if schema != nil { if schema != nil {
now := time.Now() now := time.Now()
schema.CreatedAt = now schema.CreatedAt = now
schema.UpdatedTime = now schema.UpdatedTime = now
} }
return c.pManager.Create(ctx, schema)
id, err = c.pManager.Create(ctx, schema)
if err != nil {
return
}
if schema.Trigger != nil &&
schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
len(schema.Trigger.Settings.Cron) > 0 {
// schedule and update policy
schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id})
if err != nil {
return 0, err
}
schema.ID = id
err = c.pManager.Update(ctx, schema, "trigger")
if err != nil {
errUnsch := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID)
if errUnsch != nil {
return 0, errUnsch
}
return 0, err
}
}
return
} }
// GetPolicy gets the policy by id. // GetPolicy gets the policy by id.
@ -222,11 +261,72 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
if schema != nil { if schema != nil {
schema.UpdatedTime = time.Now() schema.UpdatedTime = time.Now()
} }
s0, err := c.pManager.Get(ctx, schema.ID)
if err != nil {
return err
}
var cron = schema.Trigger.Settings.Cron
var oldJobID = s0.Trigger.Settings.JobID
var needUn bool
var needSch bool
if s0.Trigger.Type != schema.Trigger.Type {
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 {
needUn = true
}
if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 {
needSch = true
}
} else {
// not change trigger type
if schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
s0.Trigger.Settings.Cron != cron {
// unschedule old
if oldJobID > 0 {
needUn = true
}
// schedule new
if len(cron) > 0 {
// valid cron
needSch = true
}
}
}
// unschedule old
if needUn {
err = c.scheduler.UnSchedule(ctx, oldJobID)
if err != nil {
return err
}
}
// schedule new
if needSch {
jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID})
if err != nil {
return err
}
schema.Trigger.Settings.JobID = jobid
}
return c.pManager.Update(ctx, schema, props...) return c.pManager.Update(ctx, schema, props...)
} }
// DeletePolicy deletes the policy by id. // DeletePolicy deletes the policy by id.
func (c *controller) DeletePolicy(ctx context.Context, id int64) error { func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
s, err := c.pManager.Get(ctx, id)
if err != nil {
return err
}
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 {
err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID)
if err != nil {
return err
}
}
return c.pManager.Delete(ctx, id) return c.pManager.Delete(ctx, id)
} }

View File

@ -7,16 +7,17 @@ import (
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
ormtesting "github.com/goharbor/harbor/src/testing/lib/orm"
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance" "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance"
pmocks "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy" pmocks "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy"
smocks "github.com/goharbor/harbor/src/testing/pkg/scheduler"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -28,6 +29,7 @@ type preheatSuite struct {
controller Controller controller Controller
fakeInstanceMgr *instance.FakeManager fakeInstanceMgr *instance.FakeManager
fakePolicyMgr *pmocks.FakeManager fakePolicyMgr *pmocks.FakeManager
fakeScheduler *smocks.Scheduler
mockInstanceServer *httptest.Server mockInstanceServer *httptest.Server
} }
@ -35,18 +37,22 @@ func TestPreheatSuite(t *testing.T) {
t.Log("Start TestPreheatSuite") t.Log("Start TestPreheatSuite")
fakeInstanceMgr := &instance.FakeManager{} fakeInstanceMgr := &instance.FakeManager{}
fakePolicyMgr := &pmocks.FakeManager{} fakePolicyMgr := &pmocks.FakeManager{}
fakeScheduler := &smocks.Scheduler{}
var c = &controller{ var c = &controller{
iManager: fakeInstanceMgr, iManager: fakeInstanceMgr,
pManager: fakePolicyMgr, pManager: fakePolicyMgr,
scheduler: fakeScheduler,
} }
assert.NotNil(t, c) assert.NotNil(t, c)
ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{})
suite.Run(t, &preheatSuite{ suite.Run(t, &preheatSuite{
ctx: context.Background(), ctx: ctx,
controller: c, controller: c,
fakeInstanceMgr: fakeInstanceMgr, fakeInstanceMgr: fakeInstanceMgr,
fakePolicyMgr: fakePolicyMgr, fakePolicyMgr: fakePolicyMgr,
fakeScheduler: fakeScheduler,
}) })
} }
@ -192,8 +198,12 @@ func (s *preheatSuite) TestCountPolicy() {
} }
func (s *preheatSuite) TestCreatePolicy() { func (s *preheatSuite) TestCreatePolicy() {
policy := &policy.Schema{Name: "test"} policy := &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
policy.Trigger.Settings.Cron = "* * * * */1"
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil) s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil)
s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil)
s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil)
id, err := s.controller.CreatePolicy(s.ctx, policy) id, err := s.controller.CreatePolicy(s.ctx, policy)
s.NoError(err) s.NoError(err)
s.Equal(int64(1), id) s.Equal(int64(1), id)
@ -216,14 +226,31 @@ func (s *preheatSuite) TestGetPolicyByName() {
} }
func (s *preheatSuite) TestUpdatePolicy() { func (s *preheatSuite) TestUpdatePolicy() {
policy := &policy.Schema{Name: "test"} var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
s.fakePolicyMgr.On("Update", s.ctx, policy, mock.Anything).Return(nil) p0.Trigger.Settings.JobID = 1
err := s.controller.UpdatePolicy(s.ctx, policy, "") p0.Trigger.Settings.Cron = "* * * * */1"
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
// need change to schedule
p1 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeManual}}
s.fakePolicyMgr.On("Update", s.ctx, p1, mock.Anything).Return(nil)
err := s.controller.UpdatePolicy(s.ctx, p1, "")
s.NoError(err) s.NoError(err)
s.False(policy.UpdatedTime.IsZero()) s.False(p1.UpdatedTime.IsZero())
// need update schedule
p2 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
p2.Trigger.Settings.Cron = "* * * * */2"
s.fakePolicyMgr.On("Update", s.ctx, p2, mock.Anything).Return(nil)
err = s.controller.UpdatePolicy(s.ctx, p2, "")
s.NoError(err)
s.False(p2.UpdatedTime.IsZero())
} }
func (s *preheatSuite) TestDeletePolicy() { func (s *preheatSuite) TestDeletePolicy() {
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
p0.Trigger.Settings.JobID = 1
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil) s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil)
err := s.controller.DeletePolicy(s.ctx, 1) err := s.controller.DeletePolicy(s.ctx, 1)
s.NoError(err) s.NoError(err)

View File

@ -25,10 +25,12 @@ import (
"github.com/goharbor/harbor/src/common/rbac" "github.com/goharbor/harbor/src/common/rbac"
"github.com/goharbor/harbor/src/common/security" "github.com/goharbor/harbor/src/common/security"
"github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/controller/p2p/preheat"
"github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/promgr" "github.com/goharbor/harbor/src/core/promgr"
"github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/retention"
@ -190,7 +192,7 @@ func Init() error {
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher) retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
callbackFun := func(p interface{}) error { retentionCallbackFun := func(p interface{}) error {
str, ok := p.(string) str, ok := p.(string)
if !ok { if !ok {
return fmt.Errorf("the type of param %v isn't string", p) return fmt.Errorf("the type of param %v isn't string", p)
@ -202,7 +204,24 @@ func Init() error {
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false) _, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
return err return err
} }
err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, callbackFun) err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, retentionCallbackFun)
if err != nil {
return err
}
p2pPreheatCallbackFun := func(p interface{}) error {
str, ok := p.(string)
if !ok {
return fmt.Errorf("the type of param %v isn't string", p)
}
param := &preheat.TriggerParam{}
if err := json.Unmarshal([]byte(str), param); err != nil {
return fmt.Errorf("failed to unmarshal the param: %v", err)
}
_, err := preheat.Enf.EnforcePolicy(orm.Context(), param.PolicyID)
return err
}
err = scheduler.RegisterCallbackFunc(preheat.SchedulerCallback, p2pPreheatCallbackFun)
return err return err
} }

View File

@ -96,7 +96,8 @@ type Trigger struct {
Type TriggerType `json:"type"` Type TriggerType `json:"type"`
Settings struct { Settings struct {
// The cron string for scheduled trigger. // The cron string for scheduled trigger.
Cron string `json:"cron,omitempty"` Cron string `json:"cron,omitempty"`
JobID int64 `json:"job_id,omitempty"`
} `json:"trigger_setting,omitempty"` } `json:"trigger_setting,omitempty"`
} }

View File

@ -16,9 +16,9 @@ package retention
import ( import (
"fmt" "fmt"
"github.com/goharbor/harbor/src/lib/orm"
"time" "time"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/project" "github.com/goharbor/harbor/src/pkg/project"
"github.com/goharbor/harbor/src/pkg/repository" "github.com/goharbor/harbor/src/pkg/repository"
"github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy"

View File

@ -348,13 +348,14 @@ func (api *preheatAPI) ManualPreheat(ctx context.Context, params operation.Manua
return api.SendError(ctx, err) return api.SendError(ctx, err)
} }
_, err = api.enforcer.EnforcePolicy(ctx, policy.ID) executionID, err := api.enforcer.EnforcePolicy(ctx, policy.ID)
if err != nil { if err != nil {
return api.SendError(ctx, err) return api.SendError(ctx, err)
} }
// TODO: build execution URL // TODO: build execution URL
var location = "" var location = fmt.Sprintf("/projects/%s/preheat/policies/%s/executions/%d",
params.ProjectName, params.PreheatPolicyName, executionID)
return operation.NewManualPreheatCreated().WithLocation(location) return operation.NewManualPreheatCreated().WithLocation(location)
} }