Merge pull request #7248 from ywk253100/190326_event

Add event based trigger and scheduled trigger
This commit is contained in:
Wenkai Yin 2019-03-29 14:58:09 +08:00 committed by GitHub
commit 8c7b63bac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1704 additions and 89 deletions

View File

@ -91,4 +91,15 @@ create table replication_task (
end_time timestamp NULL,
PRIMARY KEY (id)
);
CREATE INDEX task_execution ON replication_task (execution_id);
CREATE INDEX task_execution ON replication_task (execution_id);
create table replication_schedule_job (
id SERIAL NOT NULL,
policy_id int NOT NULL,
job_id varchar(64),
status varchar(32),
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp NULL,
PRIMARY KEY (id)
);
CREATE INDEX replication_schedule_job_index ON replication_schedule_job (policy_id);

View File

@ -18,7 +18,7 @@ import (
"fmt"
"github.com/astaxie/beego/orm"
_ "github.com/mattn/go-sqlite3" // register sqlite driver
// _ "github.com/mattn/go-sqlite3" // register sqlite driver
)
type sqlite struct {

View File

@ -15,6 +15,8 @@ const (
ImageGC = "IMAGE_GC"
// Replication : the name of the replication job in job service
Replication = "REPLICATION"
// Scheduler : the name of the scheduler job in job service
Scheduler = "SCHEDULER"
// JobKindGeneric : Kind of generic job
JobKindGeneric = "Generic"

View File

@ -31,9 +31,9 @@ type ReplicationOperationAPI struct {
// Prepare ...
func (r *ReplicationOperationAPI) Prepare() {
r.BaseController.Prepare()
// TODO if we delegate the jobservice to trigger the scheduled replication,
// add the logic to check whether the user is a solution user
if !r.SecurityCtx.IsSysAdmin() {
// As we delegate the jobservice to trigger the scheduled replication,
// we need to allow the jobservice to call the API
if !(r.SecurityCtx.IsSysAdmin() || r.SecurityCtx.IsSolutionUser()) {
if !r.SecurityCtx.IsAuthenticated() {
r.HandleUnauthorized()
return
@ -102,7 +102,7 @@ func (r *ReplicationOperationAPI) ListExecutions() {
func (r *ReplicationOperationAPI) CreateExecution() {
execution := &models.Execution{}
r.DecodeJSONReq(execution)
policy, err := ng.PolicyMgr.Get(execution.PolicyID)
policy, err := ng.PolicyCtl.Get(execution.PolicyID)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get policy %d: %v", execution.PolicyID, err))
return

View File

@ -147,13 +147,13 @@ func TestListExecutions(t *testing.T) {
func TestCreateExecution(t *testing.T) {
operationCtl := ng.OperationCtl
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
defer func() {
ng.OperationCtl = operationCtl
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
}()
ng.OperationCtl = &fakedOperationController{}
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
cases := []*codeCheckingCase{
// 401

View File

@ -52,7 +52,7 @@ func (r *ReplicationPolicyAPI) List() {
}
query.Page, query.Size = r.GetPaginationParams()
total, policies, err := ng.PolicyMgr.List(query)
total, policies, err := ng.PolicyCtl.List(query)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to list policies: %v", err))
return
@ -73,20 +73,17 @@ func (r *ReplicationPolicyAPI) Create() {
return
}
id, err := ng.PolicyMgr.Create(policy)
id, err := ng.PolicyCtl.Create(policy)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to create the policy: %v", err))
return
}
// TODO handle replication_now?
r.Redirect(http.StatusCreated, strconv.FormatInt(id, 10))
}
// make sure the policy name doesn't exist
func (r *ReplicationPolicyAPI) validateName(policy *model.Policy) bool {
p, err := ng.PolicyMgr.GetByName(policy.Name)
p, err := ng.PolicyCtl.GetByName(policy.Name)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get policy %s: %v", policy.Name, err))
return false
@ -116,8 +113,6 @@ func (r *ReplicationPolicyAPI) validateRegistry(policy *model.Policy) bool {
return true
}
// TODO validate trigger in create and update
// Get the specified replication policy
func (r *ReplicationPolicyAPI) Get() {
id, err := r.GetInt64FromPath(":id")
@ -126,7 +121,7 @@ func (r *ReplicationPolicyAPI) Get() {
return
}
policy, err := ng.PolicyMgr.Get(id)
policy, err := ng.PolicyCtl.Get(id)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err))
return
@ -147,7 +142,7 @@ func (r *ReplicationPolicyAPI) Update() {
return
}
originalPolicy, err := ng.PolicyMgr.Get(id)
originalPolicy, err := ng.PolicyCtl.Get(id)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err))
return
@ -168,8 +163,8 @@ func (r *ReplicationPolicyAPI) Update() {
return
}
// TODO passing the properties need to be updated?
if err := ng.PolicyMgr.Update(policy); err != nil {
policy.ID = id
if err := ng.PolicyCtl.Update(policy); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to update the policy %d: %v", id, err))
return
}
@ -183,7 +178,7 @@ func (r *ReplicationPolicyAPI) Delete() {
return
}
policy, err := ng.PolicyMgr.Get(id)
policy, err := ng.PolicyCtl.Get(id)
if err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to get the policy %d: %v", id, err))
return
@ -208,7 +203,7 @@ func (r *ReplicationPolicyAPI) Delete() {
}
}
if err := ng.PolicyMgr.Remove(id); err != nil {
if err := ng.PolicyCtl.Remove(id); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to delete the policy %d: %v", id, err))
return
}

View File

@ -55,11 +55,11 @@ func (f *fakedRegistryManager) HealthCheck() error {
}
func TestReplicationPolicyAPIList(t *testing.T) {
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
defer func() {
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
}()
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
cases := []*codeCheckingCase{
// 401
{
@ -93,13 +93,13 @@ func TestReplicationPolicyAPIList(t *testing.T) {
}
func TestReplicationPolicyAPICreate(t *testing.T) {
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() {
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}()
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{
// 401
@ -206,11 +206,11 @@ func TestReplicationPolicyAPICreate(t *testing.T) {
}
func TestReplicationPolicyAPIGet(t *testing.T) {
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
defer func() {
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
}()
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
cases := []*codeCheckingCase{
// 401
{
@ -253,13 +253,13 @@ func TestReplicationPolicyAPIGet(t *testing.T) {
}
func TestReplicationPolicyAPIUpdate(t *testing.T) {
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
registryMgr := ng.RegistryMgr
defer func() {
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
ng.RegistryMgr = registryMgr
}()
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
ng.RegistryMgr = &fakedRegistryManager{}
cases := []*codeCheckingCase{
// 401
@ -350,11 +350,11 @@ func TestReplicationPolicyAPIUpdate(t *testing.T) {
}
func TestReplicationPolicyAPIDelete(t *testing.T) {
policyMgr := ng.PolicyMgr
policyMgr := ng.PolicyCtl
defer func() {
ng.PolicyMgr = policyMgr
ng.PolicyCtl = policyMgr
}()
ng.PolicyMgr = &fakedPolicyManager{}
ng.PolicyCtl = &fakedPolicyManager{}
cases := []*codeCheckingCase{
// 401
{

View File

@ -128,8 +128,8 @@ func initRouters() {
beego.Router("/service/notifications", &registry.NotificationHandler{})
beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle")
beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan")
beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication")
beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob")
beego.Router("/service/notifications/jobs/replication/schedule/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationScheduleJob")
beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask")
beego.Router("/service/token", &token.Handler{})

View File

@ -25,6 +25,7 @@ import (
"github.com/goharbor/harbor/src/core/api"
"github.com/goharbor/harbor/src/replication/ng"
"github.com/goharbor/harbor/src/replication/ng/operation/hook"
"github.com/goharbor/harbor/src/replication/ng/policy/scheduler"
)
var statusMap = map[string]string{
@ -82,10 +83,10 @@ func (h *Handler) HandleScan() {
}
}
// HandleReplication handles the webhook of replication job
func (h *Handler) HandleReplication() {
log.Debugf("received replication job status update event: job-%d, status-%s", h.id, h.status)
if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil {
// HandleReplicationScheduleJob handles the webhook of replication schedule job
func (h *Handler) HandleReplicationScheduleJob() {
log.Debugf("received replication schedule job status update event: schedule-job-%d, status-%s", h.id, h.status)
if err := scheduler.UpdateStatus(h.id, h.status); err != nil {
log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status)
h.HandleInternalServerError(err.Error())
return

View File

@ -27,10 +27,10 @@ import (
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api"
"github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/core/notifier"
coreutils "github.com/goharbor/harbor/src/core/utils"
rep_notification "github.com/goharbor/harbor/src/replication/event/notification"
"github.com/goharbor/harbor/src/replication/event/topic"
"github.com/goharbor/harbor/src/replication/ng"
rep_event "github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng/model"
)
// NotificationHandler handles request on /service/notifications/, which listens to registry's events.
@ -111,16 +111,22 @@ func (n *NotificationHandler) Post() {
return
}
// TODO: handle image delete event and chart event
go func() {
image := repository + ":" + tag
err := notifier.Publish(topic.ReplicationEventTopicOnPush, rep_notification.OnPushNotification{
Image: image,
})
if err != nil {
log.Errorf("failed to publish on push topic for resource %s: %v", image, err)
return
e := &rep_event.Event{
Type: rep_event.EventTypeImagePush,
Resource: &model.Resource{
Type: model.ResourceTypeRepository,
Metadata: &model.ResourceMetadata{
Name: repository,
Namespace: project,
Vtags: []string{tag},
},
},
}
if err := ng.EventHandler.Handle(e); err != nil {
log.Errorf("failed to handle event: %v", err)
}
log.Debugf("the on push topic for resource %s published", image)
}()
if autoScanEnabled(pro) {

View File

@ -0,0 +1,71 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"net/http"
"os"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier/auth"
reg "github.com/goharbor/harbor/src/common/utils/registry"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/opm"
)
// Scheduler is a job running in Jobservice which can be used as
// a scheduler when submitting it as a scheduled job. It receives
// a URL and data, and post the data to the URL when it is running
type Scheduler struct {
ctx env.JobContext
}
// ShouldRetry ...
func (s *Scheduler) ShouldRetry() bool {
return false
}
// MaxFails ...
func (s *Scheduler) MaxFails() uint {
return 0
}
// Validate ....
func (s *Scheduler) Validate(params map[string]interface{}) error {
return nil
}
// Run ...
func (s *Scheduler) Run(ctx env.JobContext, params map[string]interface{}) error {
cmd, exist := ctx.OPCommand()
if exist && cmd == opm.CtlCommandStop {
return errs.JobStoppedError()
}
logger := ctx.GetLogger()
url := params["url"].(string)
data := params["data"]
cred := auth.NewSecretAuthorizer(os.Getenv("JOBSERVICE_SECRET"))
client := common_http.NewClient(&http.Client{
Transport: reg.GetHTTPTransport(true),
}, cred)
if err := client.Post(url, data); err != nil {
logger.Errorf("failed to run the schedule job: %v", err)
return err
}
logger.Info("the schedule job finished")
return nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication/ng"
"github.com/goharbor/harbor/src/jobservice/job/impl/scan"
"github.com/goharbor/harbor/src/jobservice/job/impl/scheduler"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/pool"
@ -213,6 +214,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
job.ImageReplicate: (*replication.Replicator)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.Replication: (*ng.Replication)(nil),
job.Scheduler: (*scheduler.Scheduler)(nil),
}); err != nil {
// exit
return nil, err

View File

@ -21,6 +21,7 @@ import (
"github.com/goharbor/harbor/src/common/dao"
)
// TODO clean up the file
func TestMain(m *testing.M) {
dao.PrepareTestForPostgresSQL()
@ -33,7 +34,7 @@ func TestMain(m *testing.M) {
"job_log", "project", "project_member", "project_metadata", "properties", "registry",
"replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng",
"replication_target", "repository", "robot", "role", "schema_migrations", "user_group",
"replication_execution", "replication_task";`,
"replication_execution", "replication_task", "replication_schedule_job";`,
`DROP FUNCTION "update_update_time_at_column"();`,
}
dao.PrepareTestData(clearSqls, nil)

View File

@ -9,7 +9,8 @@ func init() {
new(Registry),
new(RepPolicy),
new(Execution),
new(Task))
new(Task),
new(ScheduleJob))
}
// Pagination ...

View File

@ -0,0 +1,40 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO rename the package name to model
package models
import "time"
// ScheduleJob is the persistent model for the schedule job which is
// used as a scheduler
type ScheduleJob struct {
ID int64 `orm:"pk;auto;column(id)" json:"id"`
PolicyID int64 `orm:"column(policy_id)" json:"policy_id"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"`
UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"`
}
// TableName is required by by beego orm to map the object to the database table
func (s *ScheduleJob) TableName() string {
return "replication_schedule_job"
}
// ScheduleJobQuery is the query used to list schedule jobs
type ScheduleJobQuery struct {
PolicyID int64
}

View File

@ -0,0 +1,92 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"time"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
)
// ScheduleJob is the DAO for schedule job
var ScheduleJob ScheduleJobDAO = &scheduleJobDAO{}
// ScheduleJobDAO ...
type ScheduleJobDAO interface {
Add(*models.ScheduleJob) (int64, error)
Get(int64) (*models.ScheduleJob, error)
Update(*models.ScheduleJob, ...string) error
Delete(int64) error
List(...*models.ScheduleJobQuery) ([]*models.ScheduleJob, error)
}
type scheduleJobDAO struct{}
func (s *scheduleJobDAO) Add(sj *models.ScheduleJob) (int64, error) {
now := time.Now()
sj.CreationTime = now
sj.UpdateTime = now
return dao.GetOrmer().Insert(sj)
}
func (s *scheduleJobDAO) Get(id int64) (*models.ScheduleJob, error) {
sj := &models.ScheduleJob{
ID: id,
}
if err := dao.GetOrmer().Read(sj); err != nil {
if err == orm.ErrNoRows {
return nil, nil
}
return nil, err
}
return sj, nil
}
func (s *scheduleJobDAO) Update(sj *models.ScheduleJob, props ...string) error {
if sj.UpdateTime.IsZero() {
now := time.Now()
sj.UpdateTime = now
if len(props) > 0 {
props = append(props, "UpdateTime")
}
}
_, err := dao.GetOrmer().Update(sj, props...)
return err
}
func (s *scheduleJobDAO) Delete(id int64) error {
_, err := dao.GetOrmer().Delete(&models.ScheduleJob{
ID: id,
})
return err
}
func (s *scheduleJobDAO) List(query ...*models.ScheduleJobQuery) ([]*models.ScheduleJob, error) {
qs := dao.GetOrmer().QueryTable(&models.ScheduleJob{})
if len(query) > 0 && query[0] != nil {
if query[0].PolicyID > 0 {
qs = qs.Filter("PolicyID", query[0].PolicyID)
}
}
sjs := []*models.ScheduleJob{}
_, err := qs.All(&sjs)
if err != nil {
return nil, err
}
return sjs, nil
}

View File

@ -0,0 +1,83 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dao
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
)
var sjID int64
func TestAddScheduleJob(t *testing.T) {
sj := &models.ScheduleJob{
PolicyID: 1,
JobID: "uuid",
Status: "running",
}
id, err := ScheduleJob.Add(sj)
require.Nil(t, err)
sjID = id
}
func TestUpdateScheduleJob(t *testing.T) {
err := ScheduleJob.Update(&models.ScheduleJob{
ID: sjID,
Status: "success",
}, "Status")
require.Nil(t, err)
}
func TestGetScheduleJob(t *testing.T) {
sj, err := ScheduleJob.Get(sjID)
require.Nil(t, err)
assert.Equal(t, int64(1), sj.PolicyID)
assert.Equal(t, "success", sj.Status)
}
func TestListScheduleJobs(t *testing.T) {
// nil query
sjs, err := ScheduleJob.List()
require.Nil(t, err)
assert.Equal(t, 1, len(sjs))
// query
sjs, err = ScheduleJob.List(&models.ScheduleJobQuery{
PolicyID: 1,
})
require.Nil(t, err)
assert.Equal(t, 1, len(sjs))
// query
sjs, err = ScheduleJob.List(&models.ScheduleJobQuery{
PolicyID: 2,
})
require.Nil(t, err)
assert.Equal(t, 0, len(sjs))
}
func TestDeleteScheduleJob(t *testing.T) {
err := ScheduleJob.Delete(sjID)
require.Nil(t, err)
sj, err := ScheduleJob.Get(sjID)
require.Nil(t, err)
assert.Nil(t, sj)
}

View File

@ -0,0 +1,31 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import "github.com/goharbor/harbor/src/replication/ng/model"
// const definitions
const (
EventTypeImagePush = "image_push"
EventTypeImageDelete = "image_delete"
EventTypeChartUpload = "chart_upload"
EventTypeChartDelete = "chart_delete"
)
// Event is the model that defines the image/chart pull/push event
type Event struct {
Type string
Resource *model.Resource
}

View File

@ -0,0 +1,113 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"errors"
"fmt"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/policy"
)
// Handler is the handler to handle event
type Handler interface {
Handle(event *Event) error
}
// NewHandler ...
func NewHandler(policyCtl policy.Controller, opCtl operation.Controller) Handler {
return &handler{
policyCtl: policyCtl,
opCtl: opCtl,
}
}
type handler struct {
policyCtl policy.Controller
opCtl operation.Controller
}
func (h *handler) Handle(event *Event) error {
if event == nil || event.Resource == nil ||
event.Resource.Metadata == nil ||
len(event.Resource.Metadata.Vtags) == 0 {
return errors.New("invalid event")
}
var policies []*model.Policy
var err error
switch event.Type {
case EventTypeImagePush, EventTypeChartUpload:
policies, err = h.getRelatedPolicies(event.Resource.Metadata.Namespace)
case EventTypeImageDelete, EventTypeChartDelete:
policies, err = h.getRelatedPolicies(event.Resource.Metadata.Namespace, true)
default:
return fmt.Errorf("unsupported event type %s", event.Type)
}
if err != nil {
return err
}
if len(policies) == 0 {
log.Debugf("no policy found for the event %v, do nothing", event)
return nil
}
for _, policy := range policies {
id, err := h.opCtl.StartReplication(policy, event.Resource)
if err != nil {
return err
}
log.Debugf("%s event received, the replication execution %d started", event.Type, id)
}
return nil
}
func (h *handler) getRelatedPolicies(namespace string, replicateDeletion ...bool) ([]*model.Policy, error) {
_, policies, err := h.policyCtl.List()
if err != nil {
return nil, err
}
result := []*model.Policy{}
for _, policy := range policies {
exist := false
for _, ns := range policy.SrcNamespaces {
if ns == namespace {
exist = true
break
}
}
// contains no namespace that is specified
if !exist {
continue
}
// has no trigger
if policy.Trigger == nil {
continue
}
// trigger type isn't event based
if policy.Trigger.Type != model.TriggerTypeEventBased {
continue
}
// whether replicate deletion doesn't match the value specified in policy
if len(replicateDeletion) > 0 && replicateDeletion[0] != policy.Deletion {
continue
}
result = append(result, policy)
}
return result, nil
}

View File

@ -0,0 +1,180 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
)
type fakedOperationController struct{}
func (f *fakedOperationController) StartReplication(policy *model.Policy, resource *model.Resource) (int64, error) {
return 1, nil
}
func (f *fakedOperationController) StopReplication(int64) error {
return nil
}
func (f *fakedOperationController) ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) {
return 0, nil, nil
}
func (f *fakedOperationController) GetExecution(id int64) (*models.Execution, error) {
return nil, nil
}
func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) {
return 0, nil, nil
}
func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) {
return nil, nil
}
func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error {
return nil
}
func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) {
return nil, nil
}
type fakedPolicyController struct{}
func (f *fakedPolicyController) Create(*model.Policy) (int64, error) {
return 0, nil
}
func (f *fakedPolicyController) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
polices := []*model.Policy{
{
ID: 1,
SrcNamespaces: []string{"test"},
Deletion: false,
Trigger: &model.Trigger{
Type: model.TriggerTypeEventBased,
},
},
{
ID: 2,
SrcNamespaces: []string{"library"},
Deletion: true,
Trigger: nil,
},
{
ID: 3,
SrcNamespaces: []string{"library"},
Deletion: false,
Trigger: &model.Trigger{
Type: model.TriggerTypeEventBased,
},
},
{
ID: 4,
SrcNamespaces: []string{"library"},
Deletion: true,
Trigger: &model.Trigger{
Type: model.TriggerTypeEventBased,
},
},
}
return int64(len(polices)), polices, nil
}
func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) {
return nil, nil
}
func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) {
return nil, nil
}
func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
return nil
}
func (f *fakedPolicyController) Remove(int64) error {
return nil
}
func TestGetRelatedPolicies(t *testing.T) {
handler := &handler{
policyCtl: &fakedPolicyController{},
}
policies, err := handler.getRelatedPolicies("library")
require.Nil(t, err)
assert.Equal(t, 2, len(policies))
assert.Equal(t, int64(3), policies[0].ID)
assert.Equal(t, int64(4), policies[1].ID)
policies, err = handler.getRelatedPolicies("library", true)
require.Nil(t, err)
assert.Equal(t, 1, len(policies))
assert.Equal(t, int64(4), policies[0].ID)
}
func TestHandle(t *testing.T) {
handler := NewHandler(&fakedPolicyController{}, &fakedOperationController{})
// nil event
err := handler.Handle(nil)
require.NotNil(t, err)
// nil vtags
err = handler.Handle(&Event{
Resource: &model.Resource{
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{},
},
},
Type: EventTypeImagePush,
})
require.NotNil(t, err)
// unsupported event type
err = handler.Handle(&Event{
Resource: &model.Resource{
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
},
Type: "unsupported",
})
require.NotNil(t, err)
// push image
err = handler.Handle(&Event{
Resource: &model.Resource{
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
},
Type: EventTypeImagePush,
})
require.Nil(t, err)
// delete image
err = handler.Handle(&Event{
Resource: &model.Resource{
Metadata: &model.ResourceMetadata{
Name: "library/hello-world",
Namespace: "library",
Vtags: []string{"latest"},
},
},
Type: EventTypeImageDelete,
})
require.Nil(t, err)
}

View File

@ -15,6 +15,7 @@
package model
import (
"fmt"
"time"
"github.com/astaxie/beego/validation"
@ -57,6 +58,7 @@ type Policy struct {
// Trigger
Trigger *Trigger `json:"trigger"`
// Settings
// TODO: rename the property name
Deletion bool `json:"deletion"`
// If override the image tag
Override bool `json:"override"`
@ -90,7 +92,29 @@ func (p *Policy) Valid(v *validation.Validation) {
}
}
// TODO valid trigger and filters
// valid the filters
for _, filter := range p.Filters {
if filter.Type != FilterTypeResource &&
filter.Type != FilterTypeName &&
filter.Type != FilterTypeTag &&
filter.Type != FilterTypeLabel {
v.SetError("filters", "invalid filter type")
break
}
}
// valid trigger
if p.Trigger != nil {
if p.Trigger.Type != TriggerTypeManual &&
p.Trigger.Type != TriggerTypeScheduled &&
p.Trigger.Type != TriggerTypeEventBased {
v.SetError("trigger", "invalid trigger type")
}
if p.Trigger.Type == TriggerTypeScheduled &&
(p.Trigger.Settings == nil || len(p.Trigger.Settings.Cron) == 0) {
v.SetError("trigger", fmt.Sprintf("the cron string cannot be empty when the trigger type is %s", TriggerTypeScheduled))
}
}
}
// FilterType represents the type info of the filter.
@ -105,7 +129,7 @@ type Filter struct {
// TriggerType represents the type of trigger.
type TriggerType string
// Trigger holds info fot a trigger
// Trigger holds info for a trigger
type Trigger struct {
Type TriggerType `json:"type"`
Settings *TriggerSettings `json:"trigger_settings"`

View File

@ -0,0 +1,153 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/astaxie/beego/validation"
)
func TestValidOfPolicy(t *testing.T) {
cases := []struct {
policy *Policy
pass bool
}{
// empty name
{
policy: &Policy{},
pass: false,
},
// empty source registry and destination registry
{
policy: &Policy{
Name: "policy01",
},
pass: false,
},
// source registry and destination registry both not empty
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 1,
DestRegistryID: 2,
},
pass: false,
},
// empty source namespaces
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{},
},
pass: false,
},
// empty source namespaces
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{""},
},
pass: false,
},
// invalid filter
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
Type: "invalid_type",
},
},
},
pass: false,
},
// invalid trigger
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
Type: FilterTypeName,
Value: "library",
},
},
Trigger: &Trigger{
Type: "invalid_type",
},
},
pass: false,
},
// invalid trigger
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
Type: FilterTypeName,
Value: "library",
},
},
Trigger: &Trigger{
Type: TriggerTypeScheduled,
},
},
pass: false,
},
// pass
{
policy: &Policy{
Name: "policy01",
SrcRegistryID: 0,
DestRegistryID: 1,
SrcNamespaces: []string{"library"},
Filters: []*Filter{
{
Type: FilterTypeName,
Value: "library",
},
},
Trigger: &Trigger{
Type: TriggerTypeScheduled,
Settings: &TriggerSettings{
Cron: "* * *",
},
},
},
pass: true,
},
}
for _, c := range cases {
v := &validation.Validation{}
c.policy.Valid(v)
assert.Equal(t, c.pass, len(v.Errors) == 0)
}
}

View File

@ -32,11 +32,12 @@ type defaultScheduler struct {
}
// TODO use the service account?
// TODO use the common transport
// NewScheduler returns an instance of Scheduler
func NewScheduler(jobserviceURL, secret string) Scheduler {
func NewScheduler(js job.Client) Scheduler {
return &defaultScheduler{
client: job.NewDefaultClient(jobserviceURL, secret),
client: js,
}
}

View File

@ -0,0 +1,36 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package policy
import (
"github.com/goharbor/harbor/src/replication/ng/model"
)
// Controller controls the replication policies
type Controller interface {
// Create new policy
Create(*model.Policy) (int64, error)
// List the policies, returns the total count, policy list and error
List(...*model.PolicyQuery) (int64, []*model.Policy, error)
// Get policy with specified ID
Get(int64) (*model.Policy, error)
// Get policy by the name
GetByName(string) (*model.Policy, error)
// Update the specified policy, the "props" are the properties of policy
// that need to be updated
Update(policy *model.Policy, props ...string) error
// Remove the specified policy
Remove(int64) error
}

View File

@ -0,0 +1,145 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"fmt"
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/policy/manager"
"github.com/goharbor/harbor/src/replication/ng/policy/scheduler"
)
// NewController returns a policy controller which can CURD and schedule policies
func NewController(js job.Client) policy.Controller {
mgr := manager.NewDefaultManager()
scheduler := scheduler.NewScheduler(js)
ctl := &controller{
scheduler: scheduler,
}
ctl.Controller = mgr
return ctl
}
type controller struct {
policy.Controller
scheduler scheduler.Scheduler
}
func (c *controller) Create(policy *model.Policy) (int64, error) {
id, err := c.Controller.Create(policy)
if err != nil {
return 0, err
}
if isScheduledTrigger(policy) {
// TODO: need a way to show the schedule status to users
// maybe we can add a property "schedule status" for
// listing policy API
if err = c.scheduler.Schedule(id, policy.Trigger.Settings.Cron); err != nil {
log.Errorf("failed to schedule the policy %d: %v", id, err)
}
}
return id, nil
}
func (c *controller) Update(policy *model.Policy, props ...string) error {
origin, err := c.Controller.Get(policy.ID)
if err != nil {
return err
}
if origin == nil {
return fmt.Errorf("policy %d not found", policy.ID)
}
// if no need to reschedule the policy, just update it
if !isScheduleTriggerChanged(origin, policy, props...) {
return c.Controller.Update(policy, props...)
}
// need to reschedule the policy
// unschedule first if needed
if isScheduledTrigger(origin) {
if err = c.scheduler.Unschedule(origin.ID); err != nil {
return fmt.Errorf("failed to unschedule the policy %d: %v", origin.ID, err)
}
}
// update the policy
if err = c.Controller.Update(policy, props...); err != nil {
return err
}
// schedule again if needed
if isScheduledTrigger(policy) {
if err = c.scheduler.Schedule(policy.ID, policy.Trigger.Settings.Cron); err != nil {
return fmt.Errorf("failed to schedule the policy %d: %v", policy.ID, err)
}
}
return nil
}
func (c *controller) Remove(policyID int64) error {
policy, err := c.Controller.Get(policyID)
if err != nil {
return err
}
if policy == nil {
return fmt.Errorf("policy %d not found", policyID)
}
if isScheduledTrigger(policy) {
if err = c.scheduler.Unschedule(policyID); err != nil {
return err
}
}
return c.Controller.Remove(policyID)
}
func isScheduledTrigger(policy *model.Policy) bool {
if policy == nil {
return false
}
if policy.Trigger == nil {
return false
}
return policy.Trigger.Type == model.TriggerTypeScheduled
}
func isScheduleTriggerChanged(origin, current *model.Policy, props ...string) bool {
// doesn't update the trigger property
if len(props) > 0 {
found := false
for _, prop := range props {
if prop == "Trigger" || prop == "cron_str" {
found = true
break
}
}
if !found {
return false
}
}
o := isScheduledTrigger(origin)
c := isScheduledTrigger(current)
// both triggers are not scheduled
if !o && !c {
return false
}
// both triggers are scheduled
if o && c {
return origin.Trigger.Settings.Cron != current.Trigger.Settings.Cron
}
// one is scheduled but the other one isn't
return true
}

View File

@ -0,0 +1,315 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/replication/ng/model"
)
type fakedPolicyController struct {
policy *model.Policy
}
func (f *fakedPolicyController) Create(*model.Policy) (int64, error) {
return 0, nil
}
func (f *fakedPolicyController) List(...*model.PolicyQuery) (int64, []*model.Policy, error) {
return 0, nil, nil
}
func (f *fakedPolicyController) Get(id int64) (*model.Policy, error) {
return f.policy, nil
}
func (f *fakedPolicyController) GetByName(name string) (*model.Policy, error) {
return nil, nil
}
func (f *fakedPolicyController) Update(*model.Policy, ...string) error {
return nil
}
func (f *fakedPolicyController) Remove(int64) error {
return nil
}
type fakedScheduler struct {
scheduled bool
unscheduled bool
}
func (f *fakedScheduler) Schedule(policyID int64, cron string) error {
f.scheduled = true
return nil
}
func (f *fakedScheduler) Unschedule(policyID int64) error {
f.unscheduled = true
return nil
}
func TestIsScheduledTrigger(t *testing.T) {
cases := []struct {
policy *model.Policy
expected bool
}{
// policy is nil
{
policy: nil,
expected: false,
},
// trigger is nil
{
policy: &model.Policy{},
expected: false,
},
// trigger type isn't scheduled
{
policy: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
expected: false,
},
// trigger type is scheduled
{
policy: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
},
},
expected: true,
},
}
for _, c := range cases {
assert.Equal(t, c.expected, isScheduledTrigger(c.policy))
}
}
func TestIsScheduleTriggerChanged(t *testing.T) {
cases := []struct {
origin *model.Policy
current *model.Policy
props []string
expected bool
}{
// props contains no trigger field
{
props: []string{"name"},
expected: false,
},
// both triggers are not scheduled
{
origin: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
current: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
props: []string{"Trigger"},
expected: false,
},
// both triggers are scheduled and the crons are not same
{
origin: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
current: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 * * * *",
},
},
},
props: []string{"Trigger"},
expected: true,
},
// both triggers are scheduled and the crons are same
{
origin: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
current: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
props: []string{"Trigger"},
expected: false,
},
// one trigger is scheduled but the other one isn't
{
origin: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
},
current: &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
},
props: []string{"Trigger"},
expected: true,
},
}
for _, c := range cases {
assert.Equal(t, c.expected, isScheduleTriggerChanged(c.origin, c.current, c.props...))
}
}
func TestCreate(t *testing.T) {
scheduler := &fakedScheduler{}
ctl := &controller{
scheduler: scheduler,
}
ctl.Controller = &fakedPolicyController{}
// not scheduled trigger
_, err := ctl.Create(&model.Policy{})
require.Nil(t, err)
assert.False(t, scheduler.scheduled)
// scheduled trigger
_, err = ctl.Create(&model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
})
require.Nil(t, err)
assert.True(t, scheduler.scheduled)
}
func TestUpdate(t *testing.T) {
scheduler := &fakedScheduler{}
c := &fakedPolicyController{}
ctl := &controller{
scheduler: scheduler,
}
ctl.Controller = c
var origin, current *model.Policy
// origin policy is nil
current = &model.Policy{
ID: 1,
}
err := ctl.Update(current)
assert.NotNil(t, err)
// the trigger doesn't change
origin = &model.Policy{
ID: 1,
}
c.policy = origin
current = origin
err = ctl.Update(current, "Trigger")
require.Nil(t, err)
assert.False(t, scheduler.scheduled)
assert.False(t, scheduler.unscheduled)
// the trigger changed
origin = &model.Policy{
ID: 1,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
}
c.policy = origin
current = &model.Policy{
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 * * * *",
},
},
}
err = ctl.Update(current, "Trigger")
require.Nil(t, err)
assert.True(t, scheduler.unscheduled)
assert.True(t, scheduler.scheduled)
}
func TestRemove(t *testing.T) {
scheduler := &fakedScheduler{}
c := &fakedPolicyController{}
ctl := &controller{
scheduler: scheduler,
}
ctl.Controller = c
// policy is nil
err := ctl.Remove(1)
assert.NotNil(t, err)
// the trigger type isn't scheduled
policy := &model.Policy{
ID: 1,
Trigger: &model.Trigger{
Type: model.TriggerTypeManual,
},
}
c.policy = policy
err = ctl.Remove(1)
require.Nil(t, err)
assert.False(t, scheduler.unscheduled)
// the trigger type is scheduled
policy = &model.Policy{
ID: 1,
Trigger: &model.Trigger{
Type: model.TriggerTypeScheduled,
Settings: &model.TriggerSettings{
Cron: "03 05 * * *",
},
},
}
c.policy = policy
err = ctl.Remove(1)
require.Nil(t, err)
assert.True(t, scheduler.unscheduled)
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package policy
package manager
import (
"encoding/json"
@ -23,6 +23,7 @@ import (
"github.com/goharbor/harbor/src/replication/ng/dao"
persist_models "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/goharbor/harbor/src/replication/ng/model"
"github.com/goharbor/harbor/src/replication/ng/policy"
)
var errNilPolicyModel = errors.New("nil policy model")
@ -113,27 +114,10 @@ func convertToPersistModel(policy *model.Policy) (*persist_models.RepPolicy, err
return ply, nil
}
// Manager manages replication policies
type Manager interface {
// Create new policy
Create(*model.Policy) (int64, error)
// List the policies, returns the total count, policy list and error
List(...*model.PolicyQuery) (int64, []*model.Policy, error)
// Get policy with specified ID
Get(int64) (*model.Policy, error)
// Get policy by the name
GetByName(string) (*model.Policy, error)
// Update the specified policy, the "props" are the properties of policy
// that need to be updated
Update(policy *model.Policy, props ...string) error
// Remove the specified policy
Remove(int64) error
}
// DefaultManager provides replication policy CURD capabilities.
type DefaultManager struct{}
var _ Manager = &DefaultManager{}
var _ policy.Controller = &DefaultManager{}
// NewDefaultManager is the constructor of DefaultManager.
func NewDefaultManager() *DefaultManager {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package policy
package manager
import (
"reflect"

View File

@ -0,0 +1,121 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"fmt"
"net/http"
"time"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/job"
job_models "github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/dao"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
)
// Scheduler can be used to schedule or unschedule a scheduled policy
// Currently, the default scheduler implements its capabilities by delegating
// the scheduled job of jobservice
type Scheduler interface {
Schedule(policyID int64, cron string) error
Unschedule(policyID int64) error
}
// NewScheduler returns an instance of scheduler
func NewScheduler(js job.Client) Scheduler {
return &scheduler{
jobservice: js,
}
}
type scheduler struct {
jobservice job.Client
}
func (s *scheduler) Schedule(policyID int64, cron string) error {
now := time.Now()
id, err := dao.ScheduleJob.Add(&models.ScheduleJob{
PolicyID: policyID,
Status: job.JobServiceStatusPending,
CreationTime: now,
UpdateTime: now,
})
if err != nil {
return err
}
log.Debugf("the schedule job record %d added", id)
replicateURL := fmt.Sprintf("%s/api/replication/executions", config.Config.CoreURL)
statusHookURL := fmt.Sprintf("%s/service/notifications/jobs/replication/schedule/%d", config.Config.CoreURL, id)
jobID, err := s.jobservice.SubmitJob(&job_models.JobData{
Name: job.Scheduler,
Parameters: map[string]interface{}{
"url": replicateURL,
"data": &models.Execution{
PolicyID: policyID,
},
},
Metadata: &job_models.JobMetadata{
JobKind: job.JobKindPeriodic,
Cron: cron,
},
StatusHook: statusHookURL,
})
if err != nil {
// clean up the record in database
if e := dao.ScheduleJob.Delete(id); e != nil {
log.Errorf("failed to delete the schedule job %d: %v", id, e)
} else {
log.Debugf("the schedule job record %d deleted", id)
}
return err
}
log.Debugf("the schedule job for policy %d submitted to the jobservice", policyID)
err = dao.ScheduleJob.Update(&models.ScheduleJob{
ID: id,
JobID: jobID,
}, "JobID")
log.Debugf("the policy %d scheduled", policyID)
return err
}
func (s *scheduler) Unschedule(policyID int64) error {
sjs, err := dao.ScheduleJob.List(&models.ScheduleJobQuery{
PolicyID: policyID,
})
if err != nil {
return err
}
for _, sj := range sjs {
if err = s.jobservice.PostAction(sj.JobID, job.JobActionStop); err != nil {
// if the job specified by jobID is not found in jobservice, just delete
// the record from database
if e, ok := err.(*common_http.Error); !ok || e.Code != http.StatusNotFound {
return err
}
log.Debugf("the stop action for schedule job %s submitted to the jobservice", sj.JobID)
}
if err = dao.ScheduleJob.Delete(sj.ID); err != nil {
return err
}
log.Debugf("the schedule job record %d deleted", sj.ID)
}
log.Debugf("the policy %d unscheduled", policyID)
return nil
}

View File

@ -0,0 +1,169 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"fmt"
"testing"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/stretchr/testify/assert"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/replication/ng/dao"
rep_models "github.com/goharbor/harbor/src/replication/ng/dao/models"
"github.com/stretchr/testify/require"
)
// TODO share the faked implementation in a separated common package?
// TODO can we use a mock framework?
var (
uuid = "uuid"
policyID int64 = 100
)
type fakedJobserviceClient struct {
jobData *models.JobData
stopped bool
}
func (f *fakedJobserviceClient) SubmitJob(jobData *models.JobData) (string, error) {
f.jobData = jobData
return uuid, nil
}
func (f *fakedJobserviceClient) GetJobLog(uuid string) ([]byte, error) {
f.stopped = true
return nil, nil
}
func (f *fakedJobserviceClient) PostAction(uuid, action string) error {
f.stopped = true
return nil
}
type fakedScheduleJobDAO struct {
idCounter int64
sjs map[int64]*rep_models.ScheduleJob
}
func (f *fakedScheduleJobDAO) Add(sj *rep_models.ScheduleJob) (int64, error) {
if f.sjs == nil {
f.sjs = make(map[int64]*rep_models.ScheduleJob)
}
id := f.idCounter + 1
sj.ID = id
f.sjs[id] = sj
return id, nil
}
func (f *fakedScheduleJobDAO) Get(id int64) (*rep_models.ScheduleJob, error) {
if f.sjs == nil {
return nil, nil
}
return f.sjs[id], nil
}
func (f *fakedScheduleJobDAO) Update(sj *rep_models.ScheduleJob, props ...string) error {
err := fmt.Errorf("schedule job %d not found", sj.ID)
if f.sjs == nil {
return err
}
j, exist := f.sjs[sj.ID]
if !exist {
return err
}
if len(props) == 0 {
f.sjs[sj.ID] = sj
return nil
}
for _, prop := range props {
switch prop {
case "PolicyID":
j.PolicyID = sj.PolicyID
case "JobID":
j.JobID = sj.JobID
case "Status":
j.Status = sj.Status
case "UpdateTime":
j.UpdateTime = sj.UpdateTime
}
}
return nil
}
func (f *fakedScheduleJobDAO) Delete(id int64) error {
if f.sjs == nil {
return nil
}
delete(f.sjs, id)
return nil
}
func (f *fakedScheduleJobDAO) List(query ...*rep_models.ScheduleJobQuery) ([]*rep_models.ScheduleJob, error) {
var policyID int64
if len(query) > 0 {
policyID = query[0].PolicyID
}
sjs := []*rep_models.ScheduleJob{}
for _, sj := range f.sjs {
if policyID == 0 {
sjs = append(sjs, sj)
continue
}
if sj.PolicyID == policyID {
sjs = append(sjs, sj)
}
}
return sjs, nil
}
func TestSchedule(t *testing.T) {
config.Config = &config.Configuration{}
dao.ScheduleJob = &fakedScheduleJobDAO{}
js := &fakedJobserviceClient{}
scheduler := NewScheduler(js)
err := scheduler.Schedule(policyID, "1 * * * *")
require.Nil(t, err)
sjs, err := dao.ScheduleJob.List(&rep_models.ScheduleJobQuery{
PolicyID: policyID,
})
require.Nil(t, err)
require.Equal(t, 1, len(sjs))
assert.Equal(t, uuid, sjs[0].JobID)
execution, ok := js.jobData.Parameters["data"].(*rep_models.Execution)
require.True(t, ok)
assert.Equal(t, policyID, execution.PolicyID)
}
func TestUnschedule(t *testing.T) {
config.Config = &config.Configuration{}
dao.ScheduleJob = &fakedScheduleJobDAO{}
_, err := dao.ScheduleJob.Add(&rep_models.ScheduleJob{
PolicyID: policyID,
})
require.Nil(t, err)
js := &fakedJobserviceClient{}
scheduler := NewScheduler(js)
err = scheduler.Unschedule(policyID)
require.Nil(t, err)
sjs, err := dao.ScheduleJob.List(&rep_models.ScheduleJobQuery{
PolicyID: policyID,
})
require.Nil(t, err)
require.Equal(t, 0, len(sjs))
assert.True(t, js.stopped)
}

View File

@ -0,0 +1,28 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"github.com/goharbor/harbor/src/replication/ng/dao"
"github.com/goharbor/harbor/src/replication/ng/dao/models"
)
// UpdateStatus updates the schedule job status
func UpdateStatus(id int64, status string) error {
return dao.ScheduleJob.Update(&models.ScheduleJob{
ID: id,
Status: status,
}, "Status")
}

View File

@ -17,13 +17,16 @@
package ng
import (
"github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/utils/log"
cfg "github.com/goharbor/harbor/src/core/config"
"github.com/goharbor/harbor/src/replication/ng/config"
"github.com/goharbor/harbor/src/replication/ng/event"
"github.com/goharbor/harbor/src/replication/ng/operation"
"github.com/goharbor/harbor/src/replication/ng/operation/execution"
"github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
task_scheduler "github.com/goharbor/harbor/src/replication/ng/operation/scheduler"
"github.com/goharbor/harbor/src/replication/ng/policy"
"github.com/goharbor/harbor/src/replication/ng/policy/controller"
"github.com/goharbor/harbor/src/replication/ng/registry"
// register the Harbor adapter
@ -31,12 +34,14 @@ import (
)
var (
// PolicyMgr is a global policy manager
PolicyMgr policy.Manager
// PolicyCtl is a global policy controller
PolicyCtl policy.Controller
// RegistryMgr is a global registry manager
RegistryMgr registry.Manager
// OperationCtl is a global operation controller
OperationCtl operation.Controller
// EventHandler handles images/chart pull/push events
EventHandler event.Handler
)
// Init the global variables and configurations
@ -58,13 +63,17 @@ func Init() error {
SecretKey: secretKey,
Secret: cfg.CoreSecret(),
}
// Init registry manager
// TODO use a global http transport
js := job.NewDefaultClient(config.Config.JobserviceURL, config.Config.Secret)
// init registry manager
RegistryMgr = registry.NewDefaultManager()
// init policy manager
PolicyMgr = policy.NewDefaultManager()
// init operatoin controller
// init policy controller
PolicyCtl = controller.NewController(js)
// init operation controller
OperationCtl = operation.NewController(execution.NewDefaultManager(), RegistryMgr,
scheduler.NewScheduler(config.Config.JobserviceURL, config.Config.Secret))
task_scheduler.NewScheduler(js))
// init event handler
EventHandler = event.NewHandler(PolicyCtl, OperationCtl)
log.Debug("the replication initialization completed")
return nil
}

View File

@ -38,7 +38,8 @@ func TestInit(t *testing.T) {
config.InitWithSettings(nil)
err = Init()
require.Nil(t, err)
assert.NotNil(t, PolicyMgr)
assert.NotNil(t, PolicyCtl)
assert.NotNil(t, RegistryMgr)
assert.NotNil(t, OperationCtl)
assert.NotNil(t, EventHandler)
}