From 4f8e283e8e17dea28f4a381280900fb017db8f62 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 26 Mar 2019 19:00:00 +0800 Subject: [PATCH] Add event based trigger and scheduled trigger This commit implements the event based trigger and scheduled trigger in replilcation Signed-off-by: Wenkai Yin --- .../postgresql/0004_1.8.0_schema.up.sql | 13 +- src/common/dao/sqlite.go | 2 +- src/common/job/const.go | 2 + src/core/api/replication_execution.go | 8 +- src/core/api/replication_execution_test.go | 6 +- src/core/api/replication_policy_ng.go | 23 +- src/core/api/replication_policy_ng_test.go | 30 +- src/core/router.go | 2 +- .../service/notifications/jobs/handler.go | 9 +- .../service/notifications/registry/handler.go | 28 +- .../job/impl/scheduler/scheduler.go | 71 ++++ src/jobservice/runtime/bootstrap.go | 2 + src/replication/ng/dao/dao_test.go | 3 +- src/replication/ng/dao/models/base.go | 3 +- src/replication/ng/dao/models/schedule_job.go | 40 +++ src/replication/ng/dao/schedule_job.go | 92 +++++ src/replication/ng/dao/schedule_job_test.go | 83 +++++ src/replication/ng/event/event.go | 31 ++ src/replication/ng/event/handler.go | 113 +++++++ src/replication/ng/event/handler_test.go | 180 ++++++++++ src/replication/ng/model/policy.go | 28 +- src/replication/ng/model/policy_test.go | 153 +++++++++ .../ng/operation/scheduler/scheduler.go | 5 +- src/replication/ng/policy/controller.go | 36 ++ .../ng/policy/controller/controller.go | 145 ++++++++ .../ng/policy/controller/controller_test.go | 315 ++++++++++++++++++ .../ng/policy/{ => manager}/manager.go | 22 +- .../ng/policy/{ => manager}/manager_test.go | 2 +- .../ng/policy/scheduler/scheduler.go | 121 +++++++ .../ng/policy/scheduler/scheduler_test.go | 169 ++++++++++ src/replication/ng/policy/scheduler/status.go | 28 ++ src/replication/ng/replication.go | 25 +- src/replication/ng/replication_test.go | 3 +- 33 files changed, 1704 insertions(+), 89 deletions(-) create mode 100644 src/jobservice/job/impl/scheduler/scheduler.go create mode 100644 src/replication/ng/dao/models/schedule_job.go create mode 100644 src/replication/ng/dao/schedule_job.go create mode 100644 src/replication/ng/dao/schedule_job_test.go create mode 100644 src/replication/ng/event/event.go create mode 100644 src/replication/ng/event/handler.go create mode 100644 src/replication/ng/event/handler_test.go create mode 100644 src/replication/ng/model/policy_test.go create mode 100644 src/replication/ng/policy/controller.go create mode 100644 src/replication/ng/policy/controller/controller.go create mode 100644 src/replication/ng/policy/controller/controller_test.go rename src/replication/ng/policy/{ => manager}/manager.go (89%) rename src/replication/ng/policy/{ => manager}/manager_test.go (99%) create mode 100644 src/replication/ng/policy/scheduler/scheduler.go create mode 100644 src/replication/ng/policy/scheduler/scheduler_test.go create mode 100644 src/replication/ng/policy/scheduler/status.go diff --git a/make/migrations/postgresql/0004_1.8.0_schema.up.sql b/make/migrations/postgresql/0004_1.8.0_schema.up.sql index 7aa4407e3..7ab140ed2 100644 --- a/make/migrations/postgresql/0004_1.8.0_schema.up.sql +++ b/make/migrations/postgresql/0004_1.8.0_schema.up.sql @@ -91,4 +91,15 @@ create table replication_task ( end_time timestamp NULL, PRIMARY KEY (id) ); -CREATE INDEX task_execution ON replication_task (execution_id); \ No newline at end of file +CREATE INDEX task_execution ON replication_task (execution_id); + +create table replication_schedule_job ( + id SERIAL NOT NULL, + policy_id int NOT NULL, + job_id varchar(64), + status varchar(32), + creation_time timestamp default CURRENT_TIMESTAMP, + update_time timestamp NULL, + PRIMARY KEY (id) +); +CREATE INDEX replication_schedule_job_index ON replication_schedule_job (policy_id); \ No newline at end of file diff --git a/src/common/dao/sqlite.go b/src/common/dao/sqlite.go index 9cb4da039..19759ff32 100644 --- a/src/common/dao/sqlite.go +++ b/src/common/dao/sqlite.go @@ -18,7 +18,7 @@ import ( "fmt" "github.com/astaxie/beego/orm" - _ "github.com/mattn/go-sqlite3" // register sqlite driver + // _ "github.com/mattn/go-sqlite3" // register sqlite driver ) type sqlite struct { diff --git a/src/common/job/const.go b/src/common/job/const.go index fa492a477..a465ec680 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -15,6 +15,8 @@ const ( ImageGC = "IMAGE_GC" // Replication : the name of the replication job in job service Replication = "REPLICATION" + // Scheduler : the name of the scheduler job in job service + Scheduler = "SCHEDULER" // JobKindGeneric : Kind of generic job JobKindGeneric = "Generic" diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 0ed3c5a4b..5e2b3027d 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -31,9 +31,9 @@ type ReplicationOperationAPI struct { // Prepare ... func (r *ReplicationOperationAPI) Prepare() { r.BaseController.Prepare() - // TODO if we delegate the jobservice to trigger the scheduled replication, - // add the logic to check whether the user is a solution user - if !r.SecurityCtx.IsSysAdmin() { + // As we delegate the jobservice to trigger the scheduled replication, + // we need to allow the jobservice to call the API + if !(r.SecurityCtx.IsSysAdmin() || r.SecurityCtx.IsSolutionUser()) { if !r.SecurityCtx.IsAuthenticated() { r.HandleUnauthorized() return @@ -102,7 +102,7 @@ func (r *ReplicationOperationAPI) ListExecutions() { func (r *ReplicationOperationAPI) CreateExecution() { execution := &models.Execution{} r.DecodeJSONReq(execution) - policy, err := ng.PolicyMgr.Get(execution.PolicyID) + policy, err := ng.PolicyCtl.Get(execution.PolicyID) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", execution.PolicyID, err)) return diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index 3afe60fba..bfa1493a9 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -147,13 +147,13 @@ func TestListExecutions(t *testing.T) { func TestCreateExecution(t *testing.T) { operationCtl := ng.OperationCtl - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl defer func() { ng.OperationCtl = operationCtl - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr }() ng.OperationCtl = &fakedOperationController{} - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} cases := []*codeCheckingCase{ // 401 diff --git a/src/core/api/replication_policy_ng.go b/src/core/api/replication_policy_ng.go index 8ac6feca1..c5b2c1e1e 100644 --- a/src/core/api/replication_policy_ng.go +++ b/src/core/api/replication_policy_ng.go @@ -52,7 +52,7 @@ func (r *ReplicationPolicyAPI) List() { } query.Page, query.Size = r.GetPaginationParams() - total, policies, err := ng.PolicyMgr.List(query) + total, policies, err := ng.PolicyCtl.List(query) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to list policies: %v", err)) return @@ -73,20 +73,17 @@ func (r *ReplicationPolicyAPI) Create() { return } - id, err := ng.PolicyMgr.Create(policy) + id, err := ng.PolicyCtl.Create(policy) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to create the policy: %v", err)) return } - - // TODO handle replication_now? - r.Redirect(http.StatusCreated, strconv.FormatInt(id, 10)) } // make sure the policy name doesn't exist func (r *ReplicationPolicyAPI) validateName(policy *model.Policy) bool { - p, err := ng.PolicyMgr.GetByName(policy.Name) + p, err := ng.PolicyCtl.GetByName(policy.Name) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to get policy %s: %v", policy.Name, err)) return false @@ -116,8 +113,6 @@ func (r *ReplicationPolicyAPI) validateRegistry(policy *model.Policy) bool { return true } -// TODO validate trigger in create and update - // Get the specified replication policy func (r *ReplicationPolicyAPI) Get() { id, err := r.GetInt64FromPath(":id") @@ -126,7 +121,7 @@ func (r *ReplicationPolicyAPI) Get() { return } - policy, err := ng.PolicyMgr.Get(id) + policy, err := ng.PolicyCtl.Get(id) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err)) return @@ -147,7 +142,7 @@ func (r *ReplicationPolicyAPI) Update() { return } - originalPolicy, err := ng.PolicyMgr.Get(id) + originalPolicy, err := ng.PolicyCtl.Get(id) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err)) return @@ -168,8 +163,8 @@ func (r *ReplicationPolicyAPI) Update() { return } - // TODO passing the properties need to be updated? - if err := ng.PolicyMgr.Update(policy); err != nil { + policy.ID = id + if err := ng.PolicyCtl.Update(policy); err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to update the policy %d: %v", id, err)) return } @@ -183,7 +178,7 @@ func (r *ReplicationPolicyAPI) Delete() { return } - policy, err := ng.PolicyMgr.Get(id) + policy, err := ng.PolicyCtl.Get(id) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err)) return @@ -208,7 +203,7 @@ func (r *ReplicationPolicyAPI) Delete() { } } - if err := ng.PolicyMgr.Remove(id); err != nil { + if err := ng.PolicyCtl.Remove(id); err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to delete the policy %d: %v", id, err)) return } diff --git a/src/core/api/replication_policy_ng_test.go b/src/core/api/replication_policy_ng_test.go index 53a630eff..b7d3c6287 100644 --- a/src/core/api/replication_policy_ng_test.go +++ b/src/core/api/replication_policy_ng_test.go @@ -55,11 +55,11 @@ func (f *fakedRegistryManager) HealthCheck() error { } func TestReplicationPolicyAPIList(t *testing.T) { - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl defer func() { - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr }() - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} cases := []*codeCheckingCase{ // 401 { @@ -93,13 +93,13 @@ func TestReplicationPolicyAPIList(t *testing.T) { } func TestReplicationPolicyAPICreate(t *testing.T) { - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl registryMgr := ng.RegistryMgr defer func() { - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr ng.RegistryMgr = registryMgr }() - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} ng.RegistryMgr = &fakedRegistryManager{} cases := []*codeCheckingCase{ // 401 @@ -206,11 +206,11 @@ func TestReplicationPolicyAPICreate(t *testing.T) { } func TestReplicationPolicyAPIGet(t *testing.T) { - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl defer func() { - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr }() - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} cases := []*codeCheckingCase{ // 401 { @@ -253,13 +253,13 @@ func TestReplicationPolicyAPIGet(t *testing.T) { } func TestReplicationPolicyAPIUpdate(t *testing.T) { - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl registryMgr := ng.RegistryMgr defer func() { - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr ng.RegistryMgr = registryMgr }() - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} ng.RegistryMgr = &fakedRegistryManager{} cases := []*codeCheckingCase{ // 401 @@ -350,11 +350,11 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) { } func TestReplicationPolicyAPIDelete(t *testing.T) { - policyMgr := ng.PolicyMgr + policyMgr := ng.PolicyCtl defer func() { - ng.PolicyMgr = policyMgr + ng.PolicyCtl = policyMgr }() - ng.PolicyMgr = &fakedPolicyManager{} + ng.PolicyCtl = &fakedPolicyManager{} cases := []*codeCheckingCase{ // 401 { diff --git a/src/core/router.go b/src/core/router.go index 5644932d1..467e74e11 100644 --- a/src/core/router.go +++ b/src/core/router.go @@ -128,8 +128,8 @@ func initRouters() { beego.Router("/service/notifications", ®istry.NotificationHandler{}) beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle") beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan") - beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication") beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob") + beego.Router("/service/notifications/jobs/replication/schedule/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationScheduleJob") beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask") beego.Router("/service/token", &token.Handler{}) diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index 0723eaf0c..4f671223a 100644 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -25,6 +25,7 @@ import ( "github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/replication/ng" "github.com/goharbor/harbor/src/replication/ng/operation/hook" + "github.com/goharbor/harbor/src/replication/ng/policy/scheduler" ) var statusMap = map[string]string{ @@ -82,10 +83,10 @@ func (h *Handler) HandleScan() { } } -// HandleReplication handles the webhook of replication job -func (h *Handler) HandleReplication() { - log.Debugf("received replication job status update event: job-%d, status-%s", h.id, h.status) - if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil { +// HandleReplicationScheduleJob handles the webhook of replication schedule job +func (h *Handler) HandleReplicationScheduleJob() { + log.Debugf("received replication schedule job status update event: schedule-job-%d, status-%s", h.id, h.status) + if err := scheduler.UpdateStatus(h.id, h.status); err != nil { log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status) h.HandleInternalServerError(err.Error()) return diff --git a/src/core/service/notifications/registry/handler.go b/src/core/service/notifications/registry/handler.go index 1f7f086c4..7d2a2a997 100644 --- a/src/core/service/notifications/registry/handler.go +++ b/src/core/service/notifications/registry/handler.go @@ -27,10 +27,10 @@ import ( "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/config" - "github.com/goharbor/harbor/src/core/notifier" coreutils "github.com/goharbor/harbor/src/core/utils" - rep_notification "github.com/goharbor/harbor/src/replication/event/notification" - "github.com/goharbor/harbor/src/replication/event/topic" + "github.com/goharbor/harbor/src/replication/ng" + rep_event "github.com/goharbor/harbor/src/replication/ng/event" + "github.com/goharbor/harbor/src/replication/ng/model" ) // NotificationHandler handles request on /service/notifications/, which listens to registry's events. @@ -111,16 +111,22 @@ func (n *NotificationHandler) Post() { return } + // TODO: handle image delete event and chart event go func() { - image := repository + ":" + tag - err := notifier.Publish(topic.ReplicationEventTopicOnPush, rep_notification.OnPushNotification{ - Image: image, - }) - if err != nil { - log.Errorf("failed to publish on push topic for resource %s: %v", image, err) - return + e := &rep_event.Event{ + Type: rep_event.EventTypeImagePush, + Resource: &model.Resource{ + Type: model.ResourceTypeRepository, + Metadata: &model.ResourceMetadata{ + Name: repository, + Namespace: project, + Vtags: []string{tag}, + }, + }, + } + if err := ng.EventHandler.Handle(e); err != nil { + log.Errorf("failed to handle event: %v", err) } - log.Debugf("the on push topic for resource %s published", image) }() if autoScanEnabled(pro) { diff --git a/src/jobservice/job/impl/scheduler/scheduler.go b/src/jobservice/job/impl/scheduler/scheduler.go new file mode 100644 index 000000000..1854ed411 --- /dev/null +++ b/src/jobservice/job/impl/scheduler/scheduler.go @@ -0,0 +1,71 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "net/http" + "os" + + common_http "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/common/http/modifier/auth" + reg "github.com/goharbor/harbor/src/common/utils/registry" + "github.com/goharbor/harbor/src/jobservice/env" + "github.com/goharbor/harbor/src/jobservice/errs" + "github.com/goharbor/harbor/src/jobservice/opm" +) + +// Scheduler is a job running in Jobservice which can be used as +// a scheduler when submitting it as a scheduled job. It receives +// a URL and data, and post the data to the URL when it is running +type Scheduler struct { + ctx env.JobContext +} + +// ShouldRetry ... +func (s *Scheduler) ShouldRetry() bool { + return false +} + +// MaxFails ... +func (s *Scheduler) MaxFails() uint { + return 0 +} + +// Validate .... +func (s *Scheduler) Validate(params map[string]interface{}) error { + return nil +} + +// Run ... +func (s *Scheduler) Run(ctx env.JobContext, params map[string]interface{}) error { + cmd, exist := ctx.OPCommand() + if exist && cmd == opm.CtlCommandStop { + return errs.JobStoppedError() + } + logger := ctx.GetLogger() + + url := params["url"].(string) + data := params["data"] + cred := auth.NewSecretAuthorizer(os.Getenv("JOBSERVICE_SECRET")) + client := common_http.NewClient(&http.Client{ + Transport: reg.GetHTTPTransport(true), + }, cred) + if err := client.Post(url, data); err != nil { + logger.Errorf("failed to run the schedule job: %v", err) + return err + } + logger.Info("the schedule job finished") + return nil +} diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 79e7c0d04..ed3c8c8c9 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -34,6 +34,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng" "github.com/goharbor/harbor/src/jobservice/job/impl/scan" + "github.com/goharbor/harbor/src/jobservice/job/impl/scheduler" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/pool" @@ -213,6 +214,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con job.ImageReplicate: (*replication.Replicator)(nil), job.ImageGC: (*gc.GarbageCollector)(nil), job.Replication: (*ng.Replication)(nil), + job.Scheduler: (*scheduler.Scheduler)(nil), }); err != nil { // exit return nil, err diff --git a/src/replication/ng/dao/dao_test.go b/src/replication/ng/dao/dao_test.go index 2f36ac1f6..170e2679e 100644 --- a/src/replication/ng/dao/dao_test.go +++ b/src/replication/ng/dao/dao_test.go @@ -21,6 +21,7 @@ import ( "github.com/goharbor/harbor/src/common/dao" ) +// TODO clean up the file func TestMain(m *testing.M) { dao.PrepareTestForPostgresSQL() @@ -33,7 +34,7 @@ func TestMain(m *testing.M) { "job_log", "project", "project_member", "project_metadata", "properties", "registry", "replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng", "replication_target", "repository", "robot", "role", "schema_migrations", "user_group", - "replication_execution", "replication_task";`, + "replication_execution", "replication_task", "replication_schedule_job";`, `DROP FUNCTION "update_update_time_at_column"();`, } dao.PrepareTestData(clearSqls, nil) diff --git a/src/replication/ng/dao/models/base.go b/src/replication/ng/dao/models/base.go index da1221e2f..ce97d84c8 100644 --- a/src/replication/ng/dao/models/base.go +++ b/src/replication/ng/dao/models/base.go @@ -9,7 +9,8 @@ func init() { new(Registry), new(RepPolicy), new(Execution), - new(Task)) + new(Task), + new(ScheduleJob)) } // Pagination ... diff --git a/src/replication/ng/dao/models/schedule_job.go b/src/replication/ng/dao/models/schedule_job.go new file mode 100644 index 000000000..7375e489a --- /dev/null +++ b/src/replication/ng/dao/models/schedule_job.go @@ -0,0 +1,40 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO rename the package name to model + +package models + +import "time" + +// ScheduleJob is the persistent model for the schedule job which is +// used as a scheduler +type ScheduleJob struct { + ID int64 `orm:"pk;auto;column(id)" json:"id"` + PolicyID int64 `orm:"column(policy_id)" json:"policy_id"` + JobID string `orm:"column(job_id)" json:"job_id"` + Status string `orm:"column(status)" json:"status"` + CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` + UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` +} + +// TableName is required by by beego orm to map the object to the database table +func (s *ScheduleJob) TableName() string { + return "replication_schedule_job" +} + +// ScheduleJobQuery is the query used to list schedule jobs +type ScheduleJobQuery struct { + PolicyID int64 +} diff --git a/src/replication/ng/dao/schedule_job.go b/src/replication/ng/dao/schedule_job.go new file mode 100644 index 000000000..e1495e58e --- /dev/null +++ b/src/replication/ng/dao/schedule_job.go @@ -0,0 +1,92 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "time" + + "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +// ScheduleJob is the DAO for schedule job +var ScheduleJob ScheduleJobDAO = &scheduleJobDAO{} + +// ScheduleJobDAO ... +type ScheduleJobDAO interface { + Add(*models.ScheduleJob) (int64, error) + Get(int64) (*models.ScheduleJob, error) + Update(*models.ScheduleJob, ...string) error + Delete(int64) error + List(...*models.ScheduleJobQuery) ([]*models.ScheduleJob, error) +} + +type scheduleJobDAO struct{} + +func (s *scheduleJobDAO) Add(sj *models.ScheduleJob) (int64, error) { + now := time.Now() + sj.CreationTime = now + sj.UpdateTime = now + return dao.GetOrmer().Insert(sj) +} + +func (s *scheduleJobDAO) Get(id int64) (*models.ScheduleJob, error) { + sj := &models.ScheduleJob{ + ID: id, + } + if err := dao.GetOrmer().Read(sj); err != nil { + if err == orm.ErrNoRows { + return nil, nil + } + return nil, err + } + return sj, nil +} + +func (s *scheduleJobDAO) Update(sj *models.ScheduleJob, props ...string) error { + if sj.UpdateTime.IsZero() { + now := time.Now() + sj.UpdateTime = now + if len(props) > 0 { + props = append(props, "UpdateTime") + } + } + + _, err := dao.GetOrmer().Update(sj, props...) + return err +} + +func (s *scheduleJobDAO) Delete(id int64) error { + _, err := dao.GetOrmer().Delete(&models.ScheduleJob{ + ID: id, + }) + return err +} + +func (s *scheduleJobDAO) List(query ...*models.ScheduleJobQuery) ([]*models.ScheduleJob, error) { + qs := dao.GetOrmer().QueryTable(&models.ScheduleJob{}) + if len(query) > 0 && query[0] != nil { + if query[0].PolicyID > 0 { + qs = qs.Filter("PolicyID", query[0].PolicyID) + } + } + sjs := []*models.ScheduleJob{} + _, err := qs.All(&sjs) + if err != nil { + return nil, err + } + return sjs, nil +} diff --git a/src/replication/ng/dao/schedule_job_test.go b/src/replication/ng/dao/schedule_job_test.go new file mode 100644 index 000000000..3c7ffd6c9 --- /dev/null +++ b/src/replication/ng/dao/schedule_job_test.go @@ -0,0 +1,83 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +var sjID int64 + +func TestAddScheduleJob(t *testing.T) { + sj := &models.ScheduleJob{ + PolicyID: 1, + JobID: "uuid", + Status: "running", + } + id, err := ScheduleJob.Add(sj) + require.Nil(t, err) + sjID = id +} + +func TestUpdateScheduleJob(t *testing.T) { + err := ScheduleJob.Update(&models.ScheduleJob{ + ID: sjID, + Status: "success", + }, "Status") + require.Nil(t, err) +} + +func TestGetScheduleJob(t *testing.T) { + sj, err := ScheduleJob.Get(sjID) + require.Nil(t, err) + assert.Equal(t, int64(1), sj.PolicyID) + assert.Equal(t, "success", sj.Status) +} + +func TestListScheduleJobs(t *testing.T) { + // nil query + sjs, err := ScheduleJob.List() + require.Nil(t, err) + assert.Equal(t, 1, len(sjs)) + + // query + sjs, err = ScheduleJob.List(&models.ScheduleJobQuery{ + PolicyID: 1, + }) + require.Nil(t, err) + assert.Equal(t, 1, len(sjs)) + + // query + sjs, err = ScheduleJob.List(&models.ScheduleJobQuery{ + PolicyID: 2, + }) + require.Nil(t, err) + assert.Equal(t, 0, len(sjs)) +} + +func TestDeleteScheduleJob(t *testing.T) { + err := ScheduleJob.Delete(sjID) + require.Nil(t, err) + + sj, err := ScheduleJob.Get(sjID) + require.Nil(t, err) + assert.Nil(t, sj) +} diff --git a/src/replication/ng/event/event.go b/src/replication/ng/event/event.go new file mode 100644 index 000000000..414717ce9 --- /dev/null +++ b/src/replication/ng/event/event.go @@ -0,0 +1,31 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import "github.com/goharbor/harbor/src/replication/ng/model" + +// const definitions +const ( + EventTypeImagePush = "image_push" + EventTypeImageDelete = "image_delete" + EventTypeChartUpload = "chart_upload" + EventTypeChartDelete = "chart_delete" +) + +// Event is the model that defines the image/chart pull/push event +type Event struct { + Type string + Resource *model.Resource +} diff --git a/src/replication/ng/event/handler.go b/src/replication/ng/event/handler.go new file mode 100644 index 000000000..48b95e8dc --- /dev/null +++ b/src/replication/ng/event/handler.go @@ -0,0 +1,113 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import ( + "errors" + "fmt" + + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/operation" + "github.com/goharbor/harbor/src/replication/ng/policy" +) + +// Handler is the handler to handle event +type Handler interface { + Handle(event *Event) error +} + +// NewHandler ... +func NewHandler(policyCtl policy.Controller, opCtl operation.Controller) Handler { + return &handler{ + policyCtl: policyCtl, + opCtl: opCtl, + } +} + +type handler struct { + policyCtl policy.Controller + opCtl operation.Controller +} + +func (h *handler) Handle(event *Event) error { + if event == nil || event.Resource == nil || + event.Resource.Metadata == nil || + len(event.Resource.Metadata.Vtags) == 0 { + return errors.New("invalid event") + } + var policies []*model.Policy + var err error + switch event.Type { + case EventTypeImagePush, EventTypeChartUpload: + policies, err = h.getRelatedPolicies(event.Resource.Metadata.Namespace) + case EventTypeImageDelete, EventTypeChartDelete: + policies, err = h.getRelatedPolicies(event.Resource.Metadata.Namespace, true) + default: + return fmt.Errorf("unsupported event type %s", event.Type) + } + if err != nil { + return err + } + + if len(policies) == 0 { + log.Debugf("no policy found for the event %v, do nothing", event) + return nil + } + + for _, policy := range policies { + id, err := h.opCtl.StartReplication(policy, event.Resource) + if err != nil { + return err + } + log.Debugf("%s event received, the replication execution %d started", event.Type, id) + } + return nil +} + +func (h *handler) getRelatedPolicies(namespace string, replicateDeletion ...bool) ([]*model.Policy, error) { + _, policies, err := h.policyCtl.List() + if err != nil { + return nil, err + } + result := []*model.Policy{} + for _, policy := range policies { + exist := false + for _, ns := range policy.SrcNamespaces { + if ns == namespace { + exist = true + break + } + } + // contains no namespace that is specified + if !exist { + continue + } + // has no trigger + if policy.Trigger == nil { + continue + } + // trigger type isn't event based + if policy.Trigger.Type != model.TriggerTypeEventBased { + continue + } + // whether replicate deletion doesn't match the value specified in policy + if len(replicateDeletion) > 0 && replicateDeletion[0] != policy.Deletion { + continue + } + result = append(result, policy) + } + return result, nil +} diff --git a/src/replication/ng/event/handler_test.go b/src/replication/ng/event/handler_test.go new file mode 100644 index 000000000..dfcaef85b --- /dev/null +++ b/src/replication/ng/event/handler_test.go @@ -0,0 +1,180 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/goharbor/harbor/src/replication/ng/model" +) + +type fakedOperationController struct{} + +func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) { + return 1, nil +} +func (f *fakedOperationController) StopReplication(int64) error { + return nil +} +func (f *fakedOperationController) ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 0, nil, nil +} +func (f *fakedOperationController) GetExecution(id int64) (*models.Execution, error) { + return nil, nil +} +func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 0, nil, nil +} +func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) { + return nil, nil +} +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { + return nil +} +func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) { + return nil, nil +} + +type fakedPolicyController struct{} + +func (f *fakedPolicyController) Create(*model.Policy) (int64, error) { + return 0, nil +} +func (f *fakedPolicyController) List(...*model.PolicyQuery) (int64, []*model.Policy, error) { + polices := []*model.Policy{ + { + ID: 1, + SrcNamespaces: []string{"test"}, + Deletion: false, + Trigger: &model.Trigger{ + Type: model.TriggerTypeEventBased, + }, + }, + { + ID: 2, + SrcNamespaces: []string{"library"}, + Deletion: true, + Trigger: nil, + }, + { + ID: 3, + SrcNamespaces: []string{"library"}, + Deletion: false, + Trigger: &model.Trigger{ + Type: model.TriggerTypeEventBased, + }, + }, + { + ID: 4, + SrcNamespaces: []string{"library"}, + Deletion: true, + Trigger: &model.Trigger{ + Type: model.TriggerTypeEventBased, + }, + }, + } + return int64(len(polices)), polices, nil +} +func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) { + return nil, nil +} +func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) { + return nil, nil +} +func (f *fakedPolicyController) Update(*model.Policy, ...string) error { + return nil +} +func (f *fakedPolicyController) Remove(int64) error { + return nil +} +func TestGetRelatedPolicies(t *testing.T) { + handler := &handler{ + policyCtl: &fakedPolicyController{}, + } + policies, err := handler.getRelatedPolicies("library") + require.Nil(t, err) + assert.Equal(t, 2, len(policies)) + assert.Equal(t, int64(3), policies[0].ID) + assert.Equal(t, int64(4), policies[1].ID) + + policies, err = handler.getRelatedPolicies("library", true) + require.Nil(t, err) + assert.Equal(t, 1, len(policies)) + assert.Equal(t, int64(4), policies[0].ID) +} + +func TestHandle(t *testing.T) { + handler := NewHandler(&fakedPolicyController{}, &fakedOperationController{}) + // nil event + err := handler.Handle(nil) + require.NotNil(t, err) + + // nil vtags + err = handler.Handle(&Event{ + Resource: &model.Resource{ + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{}, + }, + }, + Type: EventTypeImagePush, + }) + require.NotNil(t, err) + + // unsupported event type + err = handler.Handle(&Event{ + Resource: &model.Resource{ + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + }, + Type: "unsupported", + }) + require.NotNil(t, err) + + // push image + err = handler.Handle(&Event{ + Resource: &model.Resource{ + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + }, + Type: EventTypeImagePush, + }) + require.Nil(t, err) + + // delete image + err = handler.Handle(&Event{ + Resource: &model.Resource{ + Metadata: &model.ResourceMetadata{ + Name: "library/hello-world", + Namespace: "library", + Vtags: []string{"latest"}, + }, + }, + Type: EventTypeImageDelete, + }) + require.Nil(t, err) +} diff --git a/src/replication/ng/model/policy.go b/src/replication/ng/model/policy.go index 6cb8296b1..f6f9ae541 100644 --- a/src/replication/ng/model/policy.go +++ b/src/replication/ng/model/policy.go @@ -15,6 +15,7 @@ package model import ( + "fmt" "time" "github.com/astaxie/beego/validation" @@ -57,6 +58,7 @@ type Policy struct { // Trigger Trigger *Trigger `json:"trigger"` // Settings + // TODO: rename the property name Deletion bool `json:"deletion"` // If override the image tag Override bool `json:"override"` @@ -90,7 +92,29 @@ func (p *Policy) Valid(v *validation.Validation) { } } - // TODO valid trigger and filters + // valid the filters + for _, filter := range p.Filters { + if filter.Type != FilterTypeResource && + filter.Type != FilterTypeName && + filter.Type != FilterTypeTag && + filter.Type != FilterTypeLabel { + v.SetError("filters", "invalid filter type") + break + } + } + + // valid trigger + if p.Trigger != nil { + if p.Trigger.Type != TriggerTypeManual && + p.Trigger.Type != TriggerTypeScheduled && + p.Trigger.Type != TriggerTypeEventBased { + v.SetError("trigger", "invalid trigger type") + } + if p.Trigger.Type == TriggerTypeScheduled && + (p.Trigger.Settings == nil || len(p.Trigger.Settings.Cron) == 0) { + v.SetError("trigger", fmt.Sprintf("the cron string cannot be empty when the trigger type is %s", TriggerTypeScheduled)) + } + } } // FilterType represents the type info of the filter. @@ -105,7 +129,7 @@ type Filter struct { // TriggerType represents the type of trigger. type TriggerType string -// Trigger holds info fot a trigger +// Trigger holds info for a trigger type Trigger struct { Type TriggerType `json:"type"` Settings *TriggerSettings `json:"trigger_settings"` diff --git a/src/replication/ng/model/policy_test.go b/src/replication/ng/model/policy_test.go new file mode 100644 index 000000000..21b7fb98d --- /dev/null +++ b/src/replication/ng/model/policy_test.go @@ -0,0 +1,153 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/astaxie/beego/validation" +) + +func TestValidOfPolicy(t *testing.T) { + cases := []struct { + policy *Policy + pass bool + }{ + // empty name + { + policy: &Policy{}, + pass: false, + }, + // empty source registry and destination registry + { + policy: &Policy{ + Name: "policy01", + }, + pass: false, + }, + // source registry and destination registry both not empty + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 1, + DestRegistryID: 2, + }, + pass: false, + }, + // empty source namespaces + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{}, + }, + pass: false, + }, + // empty source namespaces + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{""}, + }, + pass: false, + }, + // invalid filter + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{"library"}, + Filters: []*Filter{ + { + Type: "invalid_type", + }, + }, + }, + pass: false, + }, + // invalid trigger + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{"library"}, + Filters: []*Filter{ + { + Type: FilterTypeName, + Value: "library", + }, + }, + Trigger: &Trigger{ + Type: "invalid_type", + }, + }, + pass: false, + }, + // invalid trigger + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{"library"}, + Filters: []*Filter{ + { + Type: FilterTypeName, + Value: "library", + }, + }, + Trigger: &Trigger{ + Type: TriggerTypeScheduled, + }, + }, + pass: false, + }, + // pass + { + policy: &Policy{ + Name: "policy01", + SrcRegistryID: 0, + DestRegistryID: 1, + SrcNamespaces: []string{"library"}, + Filters: []*Filter{ + { + Type: FilterTypeName, + Value: "library", + }, + }, + Trigger: &Trigger{ + Type: TriggerTypeScheduled, + Settings: &TriggerSettings{ + Cron: "* * *", + }, + }, + }, + pass: true, + }, + } + + for _, c := range cases { + v := &validation.Validation{} + c.policy.Valid(v) + assert.Equal(t, c.pass, len(v.Errors) == 0) + } +} diff --git a/src/replication/ng/operation/scheduler/scheduler.go b/src/replication/ng/operation/scheduler/scheduler.go index f5686af83..9cbfac285 100644 --- a/src/replication/ng/operation/scheduler/scheduler.go +++ b/src/replication/ng/operation/scheduler/scheduler.go @@ -32,11 +32,12 @@ type defaultScheduler struct { } // TODO use the service account? +// TODO use the common transport // NewScheduler returns an instance of Scheduler -func NewScheduler(jobserviceURL, secret string) Scheduler { +func NewScheduler(js job.Client) Scheduler { return &defaultScheduler{ - client: job.NewDefaultClient(jobserviceURL, secret), + client: js, } } diff --git a/src/replication/ng/policy/controller.go b/src/replication/ng/policy/controller.go new file mode 100644 index 000000000..bd02133c1 --- /dev/null +++ b/src/replication/ng/policy/controller.go @@ -0,0 +1,36 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policy + +import ( + "github.com/goharbor/harbor/src/replication/ng/model" +) + +// Controller controls the replication policies +type Controller interface { + // Create new policy + Create(*model.Policy) (int64, error) + // List the policies, returns the total count, policy list and error + List(...*model.PolicyQuery) (int64, []*model.Policy, error) + // Get policy with specified ID + Get(int64) (*model.Policy, error) + // Get policy by the name + GetByName(string) (*model.Policy, error) + // Update the specified policy, the "props" are the properties of policy + // that need to be updated + Update(policy *model.Policy, props ...string) error + // Remove the specified policy + Remove(int64) error +} diff --git a/src/replication/ng/policy/controller/controller.go b/src/replication/ng/policy/controller/controller.go new file mode 100644 index 000000000..565887e29 --- /dev/null +++ b/src/replication/ng/policy/controller/controller.go @@ -0,0 +1,145 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "fmt" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/policy" + "github.com/goharbor/harbor/src/replication/ng/policy/manager" + "github.com/goharbor/harbor/src/replication/ng/policy/scheduler" +) + +// NewController returns a policy controller which can CURD and schedule policies +func NewController(js job.Client) policy.Controller { + mgr := manager.NewDefaultManager() + scheduler := scheduler.NewScheduler(js) + ctl := &controller{ + scheduler: scheduler, + } + ctl.Controller = mgr + return ctl +} + +type controller struct { + policy.Controller + scheduler scheduler.Scheduler +} + +func (c *controller) Create(policy *model.Policy) (int64, error) { + id, err := c.Controller.Create(policy) + if err != nil { + return 0, err + } + if isScheduledTrigger(policy) { + // TODO: need a way to show the schedule status to users + // maybe we can add a property "schedule status" for + // listing policy API + if err = c.scheduler.Schedule(id, policy.Trigger.Settings.Cron); err != nil { + log.Errorf("failed to schedule the policy %d: %v", id, err) + } + } + return id, nil +} + +func (c *controller) Update(policy *model.Policy, props ...string) error { + origin, err := c.Controller.Get(policy.ID) + if err != nil { + return err + } + if origin == nil { + return fmt.Errorf("policy %d not found", policy.ID) + } + // if no need to reschedule the policy, just update it + if !isScheduleTriggerChanged(origin, policy, props...) { + return c.Controller.Update(policy, props...) + } + // need to reschedule the policy + // unschedule first if needed + if isScheduledTrigger(origin) { + if err = c.scheduler.Unschedule(origin.ID); err != nil { + return fmt.Errorf("failed to unschedule the policy %d: %v", origin.ID, err) + } + } + // update the policy + if err = c.Controller.Update(policy, props...); err != nil { + return err + } + // schedule again if needed + if isScheduledTrigger(policy) { + if err = c.scheduler.Schedule(policy.ID, policy.Trigger.Settings.Cron); err != nil { + return fmt.Errorf("failed to schedule the policy %d: %v", policy.ID, err) + } + } + return nil +} + +func (c *controller) Remove(policyID int64) error { + policy, err := c.Controller.Get(policyID) + if err != nil { + return err + } + if policy == nil { + return fmt.Errorf("policy %d not found", policyID) + } + if isScheduledTrigger(policy) { + if err = c.scheduler.Unschedule(policyID); err != nil { + return err + } + } + return c.Controller.Remove(policyID) +} + +func isScheduledTrigger(policy *model.Policy) bool { + if policy == nil { + return false + } + if policy.Trigger == nil { + return false + } + return policy.Trigger.Type == model.TriggerTypeScheduled +} + +func isScheduleTriggerChanged(origin, current *model.Policy, props ...string) bool { + // doesn't update the trigger property + if len(props) > 0 { + found := false + for _, prop := range props { + if prop == "Trigger" || prop == "cron_str" { + found = true + break + } + } + if !found { + return false + } + } + + o := isScheduledTrigger(origin) + c := isScheduledTrigger(current) + // both triggers are not scheduled + if !o && !c { + return false + } + // both triggers are scheduled + if o && c { + return origin.Trigger.Settings.Cron != current.Trigger.Settings.Cron + } + // one is scheduled but the other one isn't + return true +} diff --git a/src/replication/ng/policy/controller/controller_test.go b/src/replication/ng/policy/controller/controller_test.go new file mode 100644 index 000000000..c31594f68 --- /dev/null +++ b/src/replication/ng/policy/controller/controller_test.go @@ -0,0 +1,315 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" + + "github.com/goharbor/harbor/src/replication/ng/model" +) + +type fakedPolicyController struct { + policy *model.Policy +} + +func (f *fakedPolicyController) Create(*model.Policy) (int64, error) { + return 0, nil +} +func (f *fakedPolicyController) List(...*model.PolicyQuery) (int64, []*model.Policy, error) { + return 0, nil, nil +} +func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) { + return f.policy, nil +} +func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) { + return nil, nil +} +func (f *fakedPolicyController) Update(*model.Policy, ...string) error { + return nil +} +func (f *fakedPolicyController) Remove(int64) error { + return nil +} + +type fakedScheduler struct { + scheduled bool + unscheduled bool +} + +func (f *fakedScheduler) Schedule(policyID int64, cron string) error { + f.scheduled = true + return nil +} +func (f *fakedScheduler) Unschedule(policyID int64) error { + f.unscheduled = true + return nil +} + +func TestIsScheduledTrigger(t *testing.T) { + cases := []struct { + policy *model.Policy + expected bool + }{ + // policy is nil + { + policy: nil, + expected: false, + }, + // trigger is nil + { + policy: &model.Policy{}, + expected: false, + }, + // trigger type isn't scheduled + { + policy: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeManual, + }, + }, + expected: false, + }, + // trigger type is scheduled + { + policy: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + }, + }, + expected: true, + }, + } + for _, c := range cases { + assert.Equal(t, c.expected, isScheduledTrigger(c.policy)) + } +} + +func TestIsScheduleTriggerChanged(t *testing.T) { + cases := []struct { + origin *model.Policy + current *model.Policy + props []string + expected bool + }{ + // props contains no trigger field + { + props: []string{"name"}, + expected: false, + }, + // both triggers are not scheduled + { + + origin: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeManual, + }, + }, + current: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeManual, + }, + }, + props: []string{"Trigger"}, + expected: false, + }, + // both triggers are scheduled and the crons are not same + { + + origin: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + }, + current: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 * * * *", + }, + }, + }, + props: []string{"Trigger"}, + expected: true, + }, + // both triggers are scheduled and the crons are same + { + + origin: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + }, + current: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + }, + props: []string{"Trigger"}, + expected: false, + }, + // one trigger is scheduled but the other one isn't + { + + origin: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + }, + current: &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeManual, + }, + }, + props: []string{"Trigger"}, + expected: true, + }, + } + for _, c := range cases { + assert.Equal(t, c.expected, isScheduleTriggerChanged(c.origin, c.current, c.props...)) + } +} + +func TestCreate(t *testing.T) { + scheduler := &fakedScheduler{} + ctl := &controller{ + scheduler: scheduler, + } + ctl.Controller = &fakedPolicyController{} + + // not scheduled trigger + _, err := ctl.Create(&model.Policy{}) + require.Nil(t, err) + assert.False(t, scheduler.scheduled) + + // scheduled trigger + _, err = ctl.Create(&model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + }) + require.Nil(t, err) + assert.True(t, scheduler.scheduled) +} + +func TestUpdate(t *testing.T) { + scheduler := &fakedScheduler{} + c := &fakedPolicyController{} + ctl := &controller{ + scheduler: scheduler, + } + ctl.Controller = c + + var origin, current *model.Policy + // origin policy is nil + current = &model.Policy{ + ID: 1, + } + err := ctl.Update(current) + assert.NotNil(t, err) + + // the trigger doesn't change + origin = &model.Policy{ + ID: 1, + } + c.policy = origin + current = origin + err = ctl.Update(current, "Trigger") + require.Nil(t, err) + assert.False(t, scheduler.scheduled) + assert.False(t, scheduler.unscheduled) + + // the trigger changed + origin = &model.Policy{ + ID: 1, + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + } + c.policy = origin + current = &model.Policy{ + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 * * * *", + }, + }, + } + err = ctl.Update(current, "Trigger") + require.Nil(t, err) + assert.True(t, scheduler.unscheduled) + assert.True(t, scheduler.scheduled) +} + +func TestRemove(t *testing.T) { + scheduler := &fakedScheduler{} + c := &fakedPolicyController{} + ctl := &controller{ + scheduler: scheduler, + } + ctl.Controller = c + + // policy is nil + err := ctl.Remove(1) + assert.NotNil(t, err) + + // the trigger type isn't scheduled + policy := &model.Policy{ + ID: 1, + Trigger: &model.Trigger{ + Type: model.TriggerTypeManual, + }, + } + c.policy = policy + err = ctl.Remove(1) + require.Nil(t, err) + assert.False(t, scheduler.unscheduled) + + // the trigger type is scheduled + policy = &model.Policy{ + ID: 1, + Trigger: &model.Trigger{ + Type: model.TriggerTypeScheduled, + Settings: &model.TriggerSettings{ + Cron: "03 05 * * *", + }, + }, + } + c.policy = policy + err = ctl.Remove(1) + require.Nil(t, err) + assert.True(t, scheduler.unscheduled) +} diff --git a/src/replication/ng/policy/manager.go b/src/replication/ng/policy/manager/manager.go similarity index 89% rename from src/replication/ng/policy/manager.go rename to src/replication/ng/policy/manager/manager.go index 50eb16d01..1c026467f 100644 --- a/src/replication/ng/policy/manager.go +++ b/src/replication/ng/policy/manager/manager.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package policy +package manager import ( "encoding/json" @@ -23,6 +23,7 @@ import ( "github.com/goharbor/harbor/src/replication/ng/dao" persist_models "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/policy" ) var errNilPolicyModel = errors.New("nil policy model") @@ -113,27 +114,10 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err return ply, nil } -// Manager manages replication policies -type Manager interface { - // Create new policy - Create(*model.Policy) (int64, error) - // List the policies, returns the total count, policy list and error - List(...*model.PolicyQuery) (int64, []*model.Policy, error) - // Get policy with specified ID - Get(int64) (*model.Policy, error) - // Get policy by the name - GetByName(string) (*model.Policy, error) - // Update the specified policy, the "props" are the properties of policy - // that need to be updated - Update(policy *model.Policy, props ...string) error - // Remove the specified policy - Remove(int64) error -} - // DefaultManager provides replication policy CURD capabilities. type DefaultManager struct{} -var _ Manager = &DefaultManager{} +var _ policy.Controller = &DefaultManager{} // NewDefaultManager is the constructor of DefaultManager. func NewDefaultManager() *DefaultManager { diff --git a/src/replication/ng/policy/manager_test.go b/src/replication/ng/policy/manager/manager_test.go similarity index 99% rename from src/replication/ng/policy/manager_test.go rename to src/replication/ng/policy/manager/manager_test.go index 03b1103f4..b6f3191eb 100644 --- a/src/replication/ng/policy/manager_test.go +++ b/src/replication/ng/policy/manager/manager_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package policy +package manager import ( "reflect" diff --git a/src/replication/ng/policy/scheduler/scheduler.go b/src/replication/ng/policy/scheduler/scheduler.go new file mode 100644 index 000000000..c29c8470e --- /dev/null +++ b/src/replication/ng/policy/scheduler/scheduler.go @@ -0,0 +1,121 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "fmt" + "net/http" + "time" + + common_http "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/common/job" + job_models "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/config" + "github.com/goharbor/harbor/src/replication/ng/dao" + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +// Scheduler can be used to schedule or unschedule a scheduled policy +// Currently, the default scheduler implements its capabilities by delegating +// the scheduled job of jobservice +type Scheduler interface { + Schedule(policyID int64, cron string) error + Unschedule(policyID int64) error +} + +// NewScheduler returns an instance of scheduler +func NewScheduler(js job.Client) Scheduler { + return &scheduler{ + jobservice: js, + } +} + +type scheduler struct { + jobservice job.Client +} + +func (s *scheduler) Schedule(policyID int64, cron string) error { + now := time.Now() + id, err := dao.ScheduleJob.Add(&models.ScheduleJob{ + PolicyID: policyID, + Status: job.JobServiceStatusPending, + CreationTime: now, + UpdateTime: now, + }) + if err != nil { + return err + } + log.Debugf("the schedule job record %d added", id) + + replicateURL := fmt.Sprintf("%s/api/replication/executions", config.Config.CoreURL) + statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id) + jobID, err := s.jobservice.SubmitJob(&job_models.JobData{ + Name: job.Scheduler, + Parameters: map[string]interface{}{ + "url": replicateURL, + "data": &models.Execution{ + PolicyID: policyID, + }, + }, + Metadata: &job_models.JobMetadata{ + JobKind: job.JobKindPeriodic, + Cron: cron, + }, + StatusHook: statusHookURL, + }) + if err != nil { + // clean up the record in database + if e := dao.ScheduleJob.Delete(id); e != nil { + log.Errorf("failed to delete the schedule job %d: %v", id, e) + } else { + log.Debugf("the schedule job record %d deleted", id) + } + return err + } + log.Debugf("the schedule job for policy %d submitted to the jobservice", policyID) + + err = dao.ScheduleJob.Update(&models.ScheduleJob{ + ID: id, + JobID: jobID, + }, "JobID") + log.Debugf("the policy %d scheduled", policyID) + return err +} + +func (s *scheduler) Unschedule(policyID int64) error { + sjs, err := dao.ScheduleJob.List(&models.ScheduleJobQuery{ + PolicyID: policyID, + }) + if err != nil { + return err + } + for _, sj := range sjs { + if err = s.jobservice.PostAction(sj.JobID, job.JobActionStop); err != nil { + // if the job specified by jobID is not found in jobservice, just delete + // the record from database + if e, ok := err.(*common_http.Error); !ok || e.Code != http.StatusNotFound { + return err + } + log.Debugf("the stop action for schedule job %s submitted to the jobservice", sj.JobID) + } + if err = dao.ScheduleJob.Delete(sj.ID); err != nil { + return err + } + log.Debugf("the schedule job record %d deleted", sj.ID) + } + log.Debugf("the policy %d unscheduled", policyID) + return nil +} diff --git a/src/replication/ng/policy/scheduler/scheduler_test.go b/src/replication/ng/policy/scheduler/scheduler_test.go new file mode 100644 index 000000000..e048daec9 --- /dev/null +++ b/src/replication/ng/policy/scheduler/scheduler_test.go @@ -0,0 +1,169 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "fmt" + "testing" + + "github.com/goharbor/harbor/src/replication/ng/config" + + "github.com/stretchr/testify/assert" + + "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/replication/ng/dao" + rep_models "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/stretchr/testify/require" +) + +// TODO share the faked implementation in a separated common package? +// TODO can we use a mock framework? + +var ( + uuid = "uuid" + policyID int64 = 100 +) + +type fakedJobserviceClient struct { + jobData *models.JobData + stopped bool +} + +func (f *fakedJobserviceClient) SubmitJob(jobData *models.JobData) (string, error) { + f.jobData = jobData + return uuid, nil +} +func (f *fakedJobserviceClient) GetJobLog(uuid string) ([]byte, error) { + f.stopped = true + return nil, nil +} +func (f *fakedJobserviceClient) PostAction(uuid, action string) error { + f.stopped = true + return nil +} + +type fakedScheduleJobDAO struct { + idCounter int64 + sjs map[int64]*rep_models.ScheduleJob +} + +func (f *fakedScheduleJobDAO) Add(sj *rep_models.ScheduleJob) (int64, error) { + if f.sjs == nil { + f.sjs = make(map[int64]*rep_models.ScheduleJob) + } + id := f.idCounter + 1 + sj.ID = id + f.sjs[id] = sj + return id, nil +} +func (f *fakedScheduleJobDAO) Get(id int64) (*rep_models.ScheduleJob, error) { + if f.sjs == nil { + return nil, nil + } + return f.sjs[id], nil +} +func (f *fakedScheduleJobDAO) Update(sj *rep_models.ScheduleJob, props ...string) error { + err := fmt.Errorf("schedule job %d not found", sj.ID) + if f.sjs == nil { + return err + } + j, exist := f.sjs[sj.ID] + if !exist { + return err + } + if len(props) == 0 { + f.sjs[sj.ID] = sj + return nil + } + + for _, prop := range props { + switch prop { + case "PolicyID": + j.PolicyID = sj.PolicyID + case "JobID": + j.JobID = sj.JobID + case "Status": + j.Status = sj.Status + case "UpdateTime": + j.UpdateTime = sj.UpdateTime + } + } + return nil +} +func (f *fakedScheduleJobDAO) Delete(id int64) error { + if f.sjs == nil { + return nil + } + delete(f.sjs, id) + return nil +} +func (f *fakedScheduleJobDAO) List(query ...*rep_models.ScheduleJobQuery) ([]*rep_models.ScheduleJob, error) { + var policyID int64 + if len(query) > 0 { + policyID = query[0].PolicyID + } + sjs := []*rep_models.ScheduleJob{} + for _, sj := range f.sjs { + if policyID == 0 { + sjs = append(sjs, sj) + continue + } + if sj.PolicyID == policyID { + sjs = append(sjs, sj) + } + } + return sjs, nil +} + +func TestSchedule(t *testing.T) { + config.Config = &config.Configuration{} + dao.ScheduleJob = &fakedScheduleJobDAO{} + js := &fakedJobserviceClient{} + scheduler := NewScheduler(js) + err := scheduler.Schedule(policyID, "1 * * * *") + require.Nil(t, err) + + sjs, err := dao.ScheduleJob.List(&rep_models.ScheduleJobQuery{ + PolicyID: policyID, + }) + require.Nil(t, err) + require.Equal(t, 1, len(sjs)) + assert.Equal(t, uuid, sjs[0].JobID) + + execution, ok := js.jobData.Parameters["data"].(*rep_models.Execution) + require.True(t, ok) + assert.Equal(t, policyID, execution.PolicyID) +} + +func TestUnschedule(t *testing.T) { + config.Config = &config.Configuration{} + dao.ScheduleJob = &fakedScheduleJobDAO{} + _, err := dao.ScheduleJob.Add(&rep_models.ScheduleJob{ + PolicyID: policyID, + }) + require.Nil(t, err) + js := &fakedJobserviceClient{} + scheduler := NewScheduler(js) + err = scheduler.Unschedule(policyID) + require.Nil(t, err) + + sjs, err := dao.ScheduleJob.List(&rep_models.ScheduleJobQuery{ + PolicyID: policyID, + }) + require.Nil(t, err) + require.Equal(t, 0, len(sjs)) + + assert.True(t, js.stopped) +} diff --git a/src/replication/ng/policy/scheduler/status.go b/src/replication/ng/policy/scheduler/status.go new file mode 100644 index 000000000..241b0f22a --- /dev/null +++ b/src/replication/ng/policy/scheduler/status.go @@ -0,0 +1,28 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "github.com/goharbor/harbor/src/replication/ng/dao" + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +// UpdateStatus updates the schedule job status +func UpdateStatus(id int64, status string) error { + return dao.ScheduleJob.Update(&models.ScheduleJob{ + ID: id, + Status: status, + }, "Status") +} diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index 0599fe93c..46f1a97cd 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -17,13 +17,16 @@ package ng import ( + "github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/utils/log" cfg "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/config" + "github.com/goharbor/harbor/src/replication/ng/event" "github.com/goharbor/harbor/src/replication/ng/operation" "github.com/goharbor/harbor/src/replication/ng/operation/execution" - "github.com/goharbor/harbor/src/replication/ng/operation/scheduler" + task_scheduler "github.com/goharbor/harbor/src/replication/ng/operation/scheduler" "github.com/goharbor/harbor/src/replication/ng/policy" + "github.com/goharbor/harbor/src/replication/ng/policy/controller" "github.com/goharbor/harbor/src/replication/ng/registry" // register the Harbor adapter @@ -31,12 +34,14 @@ import ( ) var ( - // PolicyMgr is a global policy manager - PolicyMgr policy.Manager + // PolicyCtl is a global policy controller + PolicyCtl policy.Controller // RegistryMgr is a global registry manager RegistryMgr registry.Manager // OperationCtl is a global operation controller OperationCtl operation.Controller + // EventHandler handles images/chart pull/push events + EventHandler event.Handler ) // Init the global variables and configurations @@ -58,13 +63,17 @@ func Init() error { SecretKey: secretKey, Secret: cfg.CoreSecret(), } - // Init registry manager + // TODO use a global http transport + js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.Secret) + // init registry manager RegistryMgr = registry.NewDefaultManager() - // init policy manager - PolicyMgr = policy.NewDefaultManager() - // init operatoin controller + // init policy controller + PolicyCtl = controller.NewController(js) + // init operation controller OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr, - scheduler.NewScheduler(config.Config.JobserviceURL, config.Config.Secret)) + task_scheduler.NewScheduler(js)) + // init event handler + EventHandler = event.NewHandler(PolicyCtl, OperationCtl) log.Debug("the replication initialization completed") return nil } diff --git a/src/replication/ng/replication_test.go b/src/replication/ng/replication_test.go index 93fcbbfea..d876ef103 100644 --- a/src/replication/ng/replication_test.go +++ b/src/replication/ng/replication_test.go @@ -38,7 +38,8 @@ func TestInit(t *testing.T) { config.InitWithSettings(nil) err = Init() require.Nil(t, err) - assert.NotNil(t, PolicyMgr) + assert.NotNil(t, PolicyCtl) assert.NotNil(t, RegistryMgr) assert.NotNil(t, OperationCtl) + assert.NotNil(t, EventHandler) }