mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-24 01:27:49 +01:00
Merge pull request #12567 from kofj/schedule-preheat
Schedule preheat policy.
This commit is contained in:
commit
5d76a1bd51
@ -11,6 +11,12 @@ import (
|
|||||||
providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
|
providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
|
||||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/policy"
|
"github.com/goharbor/harbor/src/pkg/p2p/preheat/policy"
|
||||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
|
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/scheduler"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// SchedulerCallback ...
|
||||||
|
SchedulerCallback = "P2PPreheatCallback"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -119,14 +125,16 @@ type controller struct {
|
|||||||
// For instance
|
// For instance
|
||||||
iManager instance.Manager
|
iManager instance.Manager
|
||||||
// For policy
|
// For policy
|
||||||
pManager policy.Manager
|
pManager policy.Manager
|
||||||
|
scheduler scheduler.Scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController is constructor of controller
|
// NewController is constructor of controller
|
||||||
func NewController() Controller {
|
func NewController() Controller {
|
||||||
return &controller{
|
return &controller{
|
||||||
iManager: instance.Mgr,
|
iManager: instance.Mgr,
|
||||||
pManager: policy.Mgr,
|
pManager: policy.Mgr,
|
||||||
|
scheduler: scheduler.Sched,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,14 +205,45 @@ func (c *controller) CountPolicy(ctx context.Context, query *q.Query) (int64, er
|
|||||||
return c.pManager.Count(ctx, query)
|
return c.pManager.Count(ctx, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TriggerParam ...
|
||||||
|
type TriggerParam struct {
|
||||||
|
PolicyID int64
|
||||||
|
}
|
||||||
|
|
||||||
// CreatePolicy creates the policy.
|
// CreatePolicy creates the policy.
|
||||||
func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (int64, error) {
|
func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (id int64, err error) {
|
||||||
if schema != nil {
|
if schema != nil {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
schema.CreatedAt = now
|
schema.CreatedAt = now
|
||||||
schema.UpdatedTime = now
|
schema.UpdatedTime = now
|
||||||
}
|
}
|
||||||
return c.pManager.Create(ctx, schema)
|
|
||||||
|
id, err = c.pManager.Create(ctx, schema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if schema.Trigger != nil &&
|
||||||
|
schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
|
||||||
|
len(schema.Trigger.Settings.Cron) > 0 {
|
||||||
|
// schedule and update policy
|
||||||
|
schema.Trigger.Settings.JobID, err = c.scheduler.Schedule(ctx, schema.Trigger.Settings.Cron, SchedulerCallback, TriggerParam{PolicyID: id})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
schema.ID = id
|
||||||
|
err = c.pManager.Update(ctx, schema, "trigger")
|
||||||
|
if err != nil {
|
||||||
|
errUnsch := c.scheduler.UnSchedule(ctx, schema.Trigger.Settings.JobID)
|
||||||
|
if errUnsch != nil {
|
||||||
|
return 0, errUnsch
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPolicy gets the policy by id.
|
// GetPolicy gets the policy by id.
|
||||||
@ -222,11 +261,72 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
|
|||||||
if schema != nil {
|
if schema != nil {
|
||||||
schema.UpdatedTime = time.Now()
|
schema.UpdatedTime = time.Now()
|
||||||
}
|
}
|
||||||
|
s0, err := c.pManager.Get(ctx, schema.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var cron = schema.Trigger.Settings.Cron
|
||||||
|
var oldJobID = s0.Trigger.Settings.JobID
|
||||||
|
var needUn bool
|
||||||
|
var needSch bool
|
||||||
|
|
||||||
|
if s0.Trigger.Type != schema.Trigger.Type {
|
||||||
|
if s0.Trigger.Type == policyModels.TriggerTypeScheduled && oldJobID > 0 {
|
||||||
|
needUn = true
|
||||||
|
}
|
||||||
|
if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 {
|
||||||
|
needSch = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// not change trigger type
|
||||||
|
if schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
|
||||||
|
s0.Trigger.Settings.Cron != cron {
|
||||||
|
// unschedule old
|
||||||
|
if oldJobID > 0 {
|
||||||
|
needUn = true
|
||||||
|
}
|
||||||
|
// schedule new
|
||||||
|
if len(cron) > 0 {
|
||||||
|
// valid cron
|
||||||
|
needSch = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// unschedule old
|
||||||
|
if needUn {
|
||||||
|
err = c.scheduler.UnSchedule(ctx, oldJobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// schedule new
|
||||||
|
if needSch {
|
||||||
|
jobid, err := c.scheduler.Schedule(ctx, cron, SchedulerCallback, TriggerParam{PolicyID: schema.ID})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
schema.Trigger.Settings.JobID = jobid
|
||||||
|
}
|
||||||
|
|
||||||
return c.pManager.Update(ctx, schema, props...)
|
return c.pManager.Update(ctx, schema, props...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletePolicy deletes the policy by id.
|
// DeletePolicy deletes the policy by id.
|
||||||
func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
|
func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
|
||||||
|
s, err := c.pManager.Get(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && s.Trigger.Settings.JobID > 0 {
|
||||||
|
err = c.scheduler.UnSchedule(ctx, s.Trigger.Settings.JobID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return c.pManager.Delete(ctx, id)
|
return c.pManager.Delete(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,16 +7,17 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
|
"github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
|
||||||
providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
|
providerModel "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
|
||||||
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
|
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
|
||||||
|
"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth"
|
||||||
|
ormtesting "github.com/goharbor/harbor/src/testing/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance"
|
"github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/instance"
|
||||||
pmocks "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy"
|
pmocks "github.com/goharbor/harbor/src/testing/pkg/p2p/preheat/policy"
|
||||||
|
smocks "github.com/goharbor/harbor/src/testing/pkg/scheduler"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
@ -28,6 +29,7 @@ type preheatSuite struct {
|
|||||||
controller Controller
|
controller Controller
|
||||||
fakeInstanceMgr *instance.FakeManager
|
fakeInstanceMgr *instance.FakeManager
|
||||||
fakePolicyMgr *pmocks.FakeManager
|
fakePolicyMgr *pmocks.FakeManager
|
||||||
|
fakeScheduler *smocks.Scheduler
|
||||||
mockInstanceServer *httptest.Server
|
mockInstanceServer *httptest.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,18 +37,22 @@ func TestPreheatSuite(t *testing.T) {
|
|||||||
t.Log("Start TestPreheatSuite")
|
t.Log("Start TestPreheatSuite")
|
||||||
fakeInstanceMgr := &instance.FakeManager{}
|
fakeInstanceMgr := &instance.FakeManager{}
|
||||||
fakePolicyMgr := &pmocks.FakeManager{}
|
fakePolicyMgr := &pmocks.FakeManager{}
|
||||||
|
fakeScheduler := &smocks.Scheduler{}
|
||||||
|
|
||||||
var c = &controller{
|
var c = &controller{
|
||||||
iManager: fakeInstanceMgr,
|
iManager: fakeInstanceMgr,
|
||||||
pManager: fakePolicyMgr,
|
pManager: fakePolicyMgr,
|
||||||
|
scheduler: fakeScheduler,
|
||||||
}
|
}
|
||||||
assert.NotNil(t, c)
|
assert.NotNil(t, c)
|
||||||
|
|
||||||
|
ctx := orm.NewContext(context.TODO(), &ormtesting.FakeOrmer{})
|
||||||
suite.Run(t, &preheatSuite{
|
suite.Run(t, &preheatSuite{
|
||||||
ctx: context.Background(),
|
ctx: ctx,
|
||||||
controller: c,
|
controller: c,
|
||||||
fakeInstanceMgr: fakeInstanceMgr,
|
fakeInstanceMgr: fakeInstanceMgr,
|
||||||
fakePolicyMgr: fakePolicyMgr,
|
fakePolicyMgr: fakePolicyMgr,
|
||||||
|
fakeScheduler: fakeScheduler,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,8 +198,12 @@ func (s *preheatSuite) TestCountPolicy() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *preheatSuite) TestCreatePolicy() {
|
func (s *preheatSuite) TestCreatePolicy() {
|
||||||
policy := &policy.Schema{Name: "test"}
|
policy := &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||||
|
policy.Trigger.Settings.Cron = "* * * * */1"
|
||||||
|
s.fakeScheduler.On("Schedule", s.ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
|
||||||
s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil)
|
s.fakePolicyMgr.On("Create", s.ctx, policy).Return(int64(1), nil)
|
||||||
|
s.fakePolicyMgr.On("Update", s.ctx, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
s.fakeScheduler.On("UnSchedule", s.ctx, mock.Anything).Return(nil)
|
||||||
id, err := s.controller.CreatePolicy(s.ctx, policy)
|
id, err := s.controller.CreatePolicy(s.ctx, policy)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.Equal(int64(1), id)
|
s.Equal(int64(1), id)
|
||||||
@ -216,14 +226,31 @@ func (s *preheatSuite) TestGetPolicyByName() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *preheatSuite) TestUpdatePolicy() {
|
func (s *preheatSuite) TestUpdatePolicy() {
|
||||||
policy := &policy.Schema{Name: "test"}
|
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||||
s.fakePolicyMgr.On("Update", s.ctx, policy, mock.Anything).Return(nil)
|
p0.Trigger.Settings.JobID = 1
|
||||||
err := s.controller.UpdatePolicy(s.ctx, policy, "")
|
p0.Trigger.Settings.Cron = "* * * * */1"
|
||||||
|
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
|
||||||
|
|
||||||
|
// need change to schedule
|
||||||
|
p1 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeManual}}
|
||||||
|
s.fakePolicyMgr.On("Update", s.ctx, p1, mock.Anything).Return(nil)
|
||||||
|
err := s.controller.UpdatePolicy(s.ctx, p1, "")
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.False(policy.UpdatedTime.IsZero())
|
s.False(p1.UpdatedTime.IsZero())
|
||||||
|
|
||||||
|
// need update schedule
|
||||||
|
p2 := &policy.Schema{ID: 1, Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||||
|
p2.Trigger.Settings.Cron = "* * * * */2"
|
||||||
|
s.fakePolicyMgr.On("Update", s.ctx, p2, mock.Anything).Return(nil)
|
||||||
|
err = s.controller.UpdatePolicy(s.ctx, p2, "")
|
||||||
|
s.NoError(err)
|
||||||
|
s.False(p2.UpdatedTime.IsZero())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *preheatSuite) TestDeletePolicy() {
|
func (s *preheatSuite) TestDeletePolicy() {
|
||||||
|
var p0 = &policy.Schema{Name: "test", Trigger: &policy.Trigger{Type: policy.TriggerTypeScheduled}}
|
||||||
|
p0.Trigger.Settings.JobID = 1
|
||||||
|
s.fakePolicyMgr.On("Get", s.ctx, int64(1)).Return(p0, nil)
|
||||||
s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil)
|
s.fakePolicyMgr.On("Delete", s.ctx, int64(1)).Return(nil)
|
||||||
err := s.controller.DeletePolicy(s.ctx, 1)
|
err := s.controller.DeletePolicy(s.ctx, 1)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
@ -25,10 +25,12 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/common/rbac"
|
"github.com/goharbor/harbor/src/common/rbac"
|
||||||
"github.com/goharbor/harbor/src/common/security"
|
"github.com/goharbor/harbor/src/common/security"
|
||||||
"github.com/goharbor/harbor/src/common/utils"
|
"github.com/goharbor/harbor/src/common/utils"
|
||||||
|
"github.com/goharbor/harbor/src/controller/p2p/preheat"
|
||||||
"github.com/goharbor/harbor/src/core/config"
|
"github.com/goharbor/harbor/src/core/config"
|
||||||
"github.com/goharbor/harbor/src/core/promgr"
|
"github.com/goharbor/harbor/src/core/promgr"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/log"
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/pkg/project"
|
"github.com/goharbor/harbor/src/pkg/project"
|
||||||
"github.com/goharbor/harbor/src/pkg/repository"
|
"github.com/goharbor/harbor/src/pkg/repository"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention"
|
"github.com/goharbor/harbor/src/pkg/retention"
|
||||||
@ -190,7 +192,7 @@ func Init() error {
|
|||||||
|
|
||||||
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
|
retentionController = retention.NewAPIController(retentionMgr, projectMgr, repository.Mgr, scheduler.Sched, retentionLauncher)
|
||||||
|
|
||||||
callbackFun := func(p interface{}) error {
|
retentionCallbackFun := func(p interface{}) error {
|
||||||
str, ok := p.(string)
|
str, ok := p.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("the type of param %v isn't string", p)
|
return fmt.Errorf("the type of param %v isn't string", p)
|
||||||
@ -202,7 +204,24 @@ func Init() error {
|
|||||||
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
|
_, err := retentionController.TriggerRetentionExec(param.PolicyID, param.Trigger, false)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, callbackFun)
|
err := scheduler.RegisterCallbackFunc(retention.SchedulerCallback, retentionCallbackFun)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
p2pPreheatCallbackFun := func(p interface{}) error {
|
||||||
|
str, ok := p.(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("the type of param %v isn't string", p)
|
||||||
|
}
|
||||||
|
param := &preheat.TriggerParam{}
|
||||||
|
if err := json.Unmarshal([]byte(str), param); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal the param: %v", err)
|
||||||
|
}
|
||||||
|
_, err := preheat.Enf.EnforcePolicy(orm.Context(), param.PolicyID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = scheduler.RegisterCallbackFunc(preheat.SchedulerCallback, p2pPreheatCallbackFun)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -96,7 +96,8 @@ type Trigger struct {
|
|||||||
Type TriggerType `json:"type"`
|
Type TriggerType `json:"type"`
|
||||||
Settings struct {
|
Settings struct {
|
||||||
// The cron string for scheduled trigger.
|
// The cron string for scheduled trigger.
|
||||||
Cron string `json:"cron,omitempty"`
|
Cron string `json:"cron,omitempty"`
|
||||||
|
JobID int64 `json:"job_id,omitempty"`
|
||||||
} `json:"trigger_setting,omitempty"`
|
} `json:"trigger_setting,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,9 +16,9 @@ package retention
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/goharbor/harbor/src/lib/orm"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/pkg/project"
|
"github.com/goharbor/harbor/src/pkg/project"
|
||||||
"github.com/goharbor/harbor/src/pkg/repository"
|
"github.com/goharbor/harbor/src/pkg/repository"
|
||||||
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
"github.com/goharbor/harbor/src/pkg/retention/policy"
|
||||||
|
@ -348,13 +348,14 @@ func (api *preheatAPI) ManualPreheat(ctx context.Context, params operation.Manua
|
|||||||
return api.SendError(ctx, err)
|
return api.SendError(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = api.enforcer.EnforcePolicy(ctx, policy.ID)
|
executionID, err := api.enforcer.EnforcePolicy(ctx, policy.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return api.SendError(ctx, err)
|
return api.SendError(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: build execution URL
|
// TODO: build execution URL
|
||||||
var location = ""
|
var location = fmt.Sprintf("/projects/%s/preheat/policies/%s/executions/%d",
|
||||||
|
params.ProjectName, params.PreheatPolicyName, executionID)
|
||||||
|
|
||||||
return operation.NewManualPreheatCreated().WithLocation(location)
|
return operation.NewManualPreheatCreated().WithLocation(location)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user