Merge pull request #3705 from ywk253100/171127_subscribe

Publish replication notification for manual, schedule and immediate trigger
This commit is contained in:
Wenkai Yin 2017-12-04 17:19:21 +08:00 committed by GitHub
commit 075fab93c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 458 additions and 208 deletions

View File

@ -1,13 +0,0 @@
package notifier
import (
"github.com/vmware/harbor/src/replication/event"
)
//Subscribe related topics
func init() {
//Listen the related event topics
Subscribe(event.StartReplicationTopic, &event.StartReplicationHandler{})
Subscribe(event.ReplicationEventTopicOnPush, &event.OnPushHandler{})
Subscribe(event.ReplicationEventTopicOnDeletion, &event.OnDeletionHandler{})
}

View File

@ -0,0 +1,31 @@
package replication
import (
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/replication/event/notification"
"github.com/vmware/harbor/src/replication/event/topic"
)
//Task is the task for triggering one replication
type Task struct {
PolicyID int64
}
//NewTask is constructor of creating ReplicationTask
func NewTask(policyID int64) *Task {
return &Task{
PolicyID: policyID,
}
}
//Name returns the name of this task
func (t *Task) Name() string {
return "replication"
}
//Run the actions here
func (t *Task) Run() error {
return notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
PolicyID: t.PolicyID,
})
}

View File

@ -1,9 +1,9 @@
package task package replication
import "testing" import "testing"
func TestReplicationTask(t *testing.T) { func TestTask(t *testing.T) {
tk := NewReplicationTask() tk := NewTask(1)
if tk == nil { if tk == nil {
t.Fail() t.Fail()
} }

View File

@ -1,24 +0,0 @@
package task
import (
"errors"
)
//ReplicationTask is the task for triggering one replication
type ReplicationTask struct{}
//NewReplicationTask is constructor of creating ReplicationTask
func NewReplicationTask() *ReplicationTask {
return &ReplicationTask{}
}
//Name returns the name of this task
func (rt *ReplicationTask) Name() string {
return "replication"
}
//Run the actions here
func (rt *ReplicationTask) Run() error {
//Trigger the replication here
return errors.New("Not implemented")
}

View File

@ -0,0 +1,65 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"github.com/vmware/harbor/src/common/models"
)
// FakeWatchItemDAO is the fake implement for the dao.WatchItemDAO
type FakeWatchItemDAO struct {
items []models.WatchItem
}
// Add ...
func (f *FakeWatchItemDAO) Add(item *models.WatchItem) (int64, error) {
f.items = append(f.items, *item)
return int64(len(f.items) + 1), nil
}
// DeleteByPolicyID : delete the WatchItem specified by policy ID
func (f *FakeWatchItemDAO) DeleteByPolicyID(policyID int64) error {
for i, item := range f.items {
if item.PolicyID == policyID {
f.items = append(f.items[:i], f.items[i+1:]...)
break
}
}
return nil
}
// Get returns WatchItem list according to the namespace and operation
func (f *FakeWatchItemDAO) Get(namespace, operation string) ([]models.WatchItem, error) {
items := []models.WatchItem{}
for _, item := range f.items {
if item.Namespace != namespace {
continue
}
if operation == "push" {
if item.OnPush {
items = append(items, item)
}
}
if operation == "delete" {
if item.OnDeletion {
items = append(items, item)
}
}
}
return items, nil
}

View File

@ -22,4 +22,9 @@ const (
TriggerScheduleDaily = "daily" TriggerScheduleDaily = "daily"
//TriggerScheduleWeekly : type of scheduling is 'weekly' //TriggerScheduleWeekly : type of scheduling is 'weekly'
TriggerScheduleWeekly = "weekly" TriggerScheduleWeekly = "weekly"
//OperationPush : operation for pushing images
OperationPush = "push"
//OperationDelete : operation for deleting images
OperationDelete = "delete"
) )

View File

@ -135,17 +135,19 @@ func (ctl *Controller) UpdatePolicy(updatedPolicy models.ReplicationPolicy) erro
} }
} }
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
return err
}
if reset { if reset {
if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil { if err = ctl.triggerManager.UnsetTrigger(id, *originPolicy.Trigger); err != nil {
return err return err
} }
if err = ctl.policyManager.UpdatePolicy(updatedPolicy); err != nil {
return err
}
return ctl.triggerManager.SetupTrigger(&updatedPolicy) return ctl.triggerManager.SetupTrigger(&updatedPolicy)
} }
return ctl.policyManager.UpdatePolicy(updatedPolicy) return nil
} }
//RemovePolicy will remove the specified policy and clean the related settings //RemovePolicy will remove the specified policy and clean the related settings
@ -180,9 +182,9 @@ func (ctl *Controller) GetPolicies(query models.QueryParameter) ([]models.Replic
//Replicate starts one replication defined in the specified policy; //Replicate starts one replication defined in the specified policy;
//Can be launched by the API layer and related triggers. //Can be launched by the API layer and related triggers.
func (ctl *Controller) Replicate(policyID int64, item ...*models.FilterItem) error { func (ctl *Controller) Replicate(policyID int64, metadate ...map[string]interface{}) error {
fmt.Printf("replicating %d ...\n", policyID) fmt.Printf("replicating %d, metadata: %v ...\n", policyID, metadate)
return nil return nil
} }

View File

@ -0,0 +1,39 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/event/topic"
)
//Subscribe related topics
func init() {
//Listen the related event topics
handlers := map[string]notifier.NotificationHandler{
topic.StartReplicationTopic: &StartReplicationHandler{},
topic.ReplicationEventTopicOnPush: &OnPushHandler{},
topic.ReplicationEventTopicOnDeletion: &OnDeletionHandler{},
}
for topic, handler := range handlers {
if err := notifier.Subscribe(topic, handler); err != nil {
log.Errorf("failed to subscribe topic %s: %v", topic, err)
continue
}
log.Debugf("topic %s is subscribed", topic)
}
}

View File

@ -0,0 +1,34 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notification
//OnPushNotification contains the data required by this handler
type OnPushNotification struct {
//The name of the image that is being pushed
Image string
}
//OnDeletionNotification contains the data required by this handler
type OnDeletionNotification struct {
//The name of the image that is being deleted
Image string
}
//StartReplicationNotification contains data required by this handler
type StartReplicationNotification struct {
//ID of the policy
PolicyID int64
Metadata map[string]interface{}
}

View File

@ -1,3 +1,17 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event package event
import ( import (
@ -5,19 +19,13 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/event/notification"
) )
//OnDeletionHandler implements the notification handler interface to handle image on push event. //OnDeletionHandler implements the notification handler interface to handle image on push event.
type OnDeletionHandler struct{} type OnDeletionHandler struct{}
//OnDeletionNotification contains the data required by this handler
type OnDeletionNotification struct {
//The name of the project where the being pushed images are located
ProjectName string
}
//Handle implements the same method of notification handler interface //Handle implements the same method of notification handler interface
func (oph *OnDeletionHandler) Handle(value interface{}) error { func (oph *OnDeletionHandler) Handle(value interface{}) error {
if value == nil { if value == nil {
@ -25,32 +33,12 @@ func (oph *OnDeletionHandler) Handle(value interface{}) error {
} }
vType := reflect.TypeOf(value) vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "event.OnDeletionNotification" { if vType.Kind() != reflect.Struct || vType.String() != "notification.OnDeletionNotification" {
return fmt.Errorf("Mismatch value type of OnDeletionHandler, expect %s but got %s", "event.OnDeletionNotification", vType.String()) return fmt.Errorf("Mismatch value type of OnDeletionHandler, expect %s but got %s", "notification.OnDeletionNotification", vType.String())
} }
notification := value.(OnDeletionNotification) notification := value.(notification.OnDeletionNotification)
//TODO:Call projectManager to get the projectID return checkAndTriggerReplication(notification.Image, replication.OperationDelete)
fmt.Println(notification.ProjectName)
query := models.QueryParameter{
ProjectID: 0,
}
policies, err := core.DefaultController.GetPolicies(query)
if err != nil {
return err
}
if policies != nil && len(policies) > 0 {
for _, p := range policies {
//Error accumulated and then return?
if err := core.DefaultController.Replicate(p.ID); err != nil {
//TODO:Log error
fmt.Println(err.Error())
}
}
}
return nil
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface

View File

@ -0,0 +1,43 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/replication/event/notification"
)
func TestHandleOfOnDeletionHandler(t *testing.T) {
dao.DefaultDatabaseWatchItemDAO = &test.FakeWatchItemDAO{}
handler := &OnDeletionHandler{}
assert.NotNil(t, handler.Handle(nil))
assert.NotNil(t, handler.Handle(map[string]string{}))
assert.NotNil(t, handler.Handle(struct{}{}))
assert.Nil(t, handler.Handle(notification.OnDeletionNotification{
Image: "library/hello-world:latest",
}))
}
func TestIsStatefulOfOnDeletionHandler(t *testing.T) {
handler := &OnDeletionHandler{}
assert.False(t, handler.IsStateful())
}

View File

@ -1,3 +1,17 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event package event
import ( import (
@ -5,19 +19,19 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/event/notification"
"github.com/vmware/harbor/src/replication/event/topic"
"github.com/vmware/harbor/src/replication/models" "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/trigger"
) )
//OnPushHandler implements the notification handler interface to handle image on push event. //OnPushHandler implements the notification handler interface to handle image on push event.
type OnPushHandler struct{} type OnPushHandler struct{}
//OnPushNotification contains the data required by this handler
type OnPushNotification struct {
//The ID of the project where the being pushed images are located
ProjectID int
}
//Handle implements the same method of notification handler interface //Handle implements the same method of notification handler interface
func (oph *OnPushHandler) Handle(value interface{}) error { func (oph *OnPushHandler) Handle(value interface{}) error {
if value == nil { if value == nil {
@ -25,31 +39,13 @@ func (oph *OnPushHandler) Handle(value interface{}) error {
} }
vType := reflect.TypeOf(value) vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "event.OnPushNotification" { if vType.Kind() != reflect.Struct || vType.String() != "notification.OnPushNotification" {
return fmt.Errorf("Mismatch value type of OnPushHandler, expect %s but got %s", "event.OnPushNotification", vType.String()) return fmt.Errorf("Mismatch value type of OnPushHandler, expect %s but got %s", "notification.OnPushNotification", vType.String())
} }
notification := value.(OnDeletionNotification) notification := value.(notification.OnPushNotification)
//TODO:Call projectManager to get the projectID
fmt.Println(notification.ProjectName)
query := models.QueryParameter{
ProjectID: 0,
}
policies, err := core.DefaultController.GetPolicies(query) return checkAndTriggerReplication(notification.Image, replication.OperationPush)
if err != nil {
return err
}
if policies != nil && len(policies) > 0 {
for _, p := range policies {
if err := core.DefaultController.Replicate(p.ID); err != nil {
//TODO:Log error
fmt.Println(err.Error())
}
}
}
return nil
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface
@ -57,3 +53,40 @@ func (oph *OnPushHandler) IsStateful() bool {
//Statless //Statless
return false return false
} }
// checks whether replication policy is set on the resource, if is, trigger the replication
func checkAndTriggerReplication(image, operation string) error {
project, _ := utils.ParseRepository(image)
watchItems, err := trigger.DefaultWatchList.Get(project, operation)
if err != nil {
return fmt.Errorf("failed to get watch list for resource %s, operation %s: %v",
image, operation, err)
}
if len(watchItems) == 0 {
log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation)
return nil
}
for _, watchItem := range watchItems {
item := &models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: image,
Metadata: map[string]interface{}{
"operation": operation,
},
}
if err := notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
PolicyID: watchItem.PolicyID,
Metadata: map[string]interface{}{
"": []*models.FilterItem{item},
},
}); err != nil {
return fmt.Errorf("failed to publish replication topic for resource %s, operation %s, policy %d: %v",
image, operation, watchItem.PolicyID, err)
}
log.Infof("replication topic for resource %s, operation %s, policy %d triggered",
image, operation, watchItem.PolicyID)
}
return nil
}

View File

@ -0,0 +1,43 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/utils/test"
"github.com/vmware/harbor/src/replication/event/notification"
)
func TestHandleOfOnPushHandler(t *testing.T) {
dao.DefaultDatabaseWatchItemDAO = &test.FakeWatchItemDAO{}
handler := &OnPushHandler{}
assert.NotNil(t, handler.Handle(nil))
assert.NotNil(t, handler.Handle(map[string]string{}))
assert.NotNil(t, handler.Handle(struct{}{}))
assert.Nil(t, handler.Handle(notification.OnPushNotification{
Image: "library/hello-world:latest",
}))
}
func TestIsStatefulOfOnPushHandler(t *testing.T) {
handler := &OnPushHandler{}
assert.False(t, handler.IsStateful())
}

View File

@ -1,3 +1,17 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event package event
import ( import (
@ -6,17 +20,12 @@ import (
"reflect" "reflect"
"github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/replication/core"
"github.com/vmware/harbor/src/replication/event/notification"
) )
//StartReplicationHandler implements the notification handler interface to handle start replication requests. //StartReplicationHandler implements the notification handler interface to handle start replication requests.
type StartReplicationHandler struct{} type StartReplicationHandler struct{}
//StartReplicationNotification contains data required by this handler
type StartReplicationNotification struct {
//ID of the policy
PolicyID int64
}
//Handle implements the same method of notification handler interface //Handle implements the same method of notification handler interface
func (srh *StartReplicationHandler) Handle(value interface{}) error { func (srh *StartReplicationHandler) Handle(value interface{}) error {
if value == nil { if value == nil {
@ -24,18 +33,18 @@ func (srh *StartReplicationHandler) Handle(value interface{}) error {
} }
vType := reflect.TypeOf(value) vType := reflect.TypeOf(value)
if vType.Kind() != reflect.Struct || vType.String() != "core.StartReplicationNotification" { if vType.Kind() != reflect.Struct || vType.String() != "notification.StartReplicationNotification" {
return fmt.Errorf("Mismatch value type of StartReplicationHandler, expect %s but got %s", "core.StartReplicationNotification", vType.String()) return fmt.Errorf("Mismatch value type of StartReplicationHandler, expect %s but got %s", "notification.StartReplicationNotification", vType.String())
} }
notification := value.(StartReplicationNotification) notification := value.(notification.StartReplicationNotification)
if notification.PolicyID <= 0 { if notification.PolicyID <= 0 {
return errors.New("Invalid policy") return errors.New("Invalid policy")
} }
//Start replication //Start replication
//TODO:
return core.DefaultController.Replicate(notification.PolicyID) return core.DefaultController.Replicate(notification.PolicyID, notification.Metadata)
} }
//IsStateful implements the same method of notification handler interface //IsStateful implements the same method of notification handler interface

View File

@ -0,0 +1,41 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/vmware/harbor/src/replication/event/notification"
)
func TestHandle(t *testing.T) {
handler := &StartReplicationHandler{}
assert.NotNil(t, handler.Handle(nil))
assert.NotNil(t, handler.Handle(map[string]string{}))
assert.NotNil(t, handler.Handle(struct{}{}))
assert.NotNil(t, handler.Handle(notification.StartReplicationNotification{
PolicyID: -1,
}))
assert.Nil(t, handler.Handle(notification.StartReplicationNotification{
PolicyID: 1,
}))
}
func TestIsStateful(t *testing.T) {
handler := &StartReplicationHandler{}
assert.False(t, handler.IsStateful())
}

View File

@ -1,4 +1,4 @@
package event package topic
const ( const (
//ReplicationEventTopicOnPush : OnPush event //ReplicationEventTopicOnPush : OnPush event

View File

@ -5,7 +5,7 @@ import (
"github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/common/scheduler"
"github.com/vmware/harbor/src/common/scheduler/policy" "github.com/vmware/harbor/src/common/scheduler/policy"
"github.com/vmware/harbor/src/common/scheduler/task" replication_task "github.com/vmware/harbor/src/common/scheduler/task/replication"
"github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication"
) )
@ -42,7 +42,7 @@ func (st *ScheduleTrigger) Setup() error {
} }
schedulePolicy := policy.NewAlternatePolicy(assembleName(st.params.PolicyID), config) schedulePolicy := policy.NewAlternatePolicy(assembleName(st.params.PolicyID), config)
attachTask := task.NewReplicationTask() attachTask := replication_task.NewTask(st.params.PolicyID)
schedulePolicy.AttachTasks(attachTask) schedulePolicy.AttachTasks(attachTask)
return scheduler.DefaultScheduler.Schedule(schedulePolicy) return scheduler.DefaultScheduler.Schedule(schedulePolicy)
} }

View File

@ -20,55 +20,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/utils/test"
) )
type fakeWatchItemDAO struct {
items []models.WatchItem
}
func (f *fakeWatchItemDAO) Add(item *models.WatchItem) (int64, error) {
f.items = append(f.items, *item)
return int64(len(f.items) + 1), nil
}
// Delete the WatchItem specified by policy ID
func (f *fakeWatchItemDAO) DeleteByPolicyID(policyID int64) error {
for i, item := range f.items {
if item.PolicyID == policyID {
f.items = append(f.items[:i], f.items[i+1:]...)
break
}
}
return nil
}
// Get returns WatchItem list according to the namespace and operation
func (f *fakeWatchItemDAO) Get(namespace, operation string) ([]models.WatchItem, error) {
items := []models.WatchItem{}
for _, item := range f.items {
if item.Namespace != namespace {
continue
}
if operation == "push" {
if item.OnPush {
items = append(items, item)
}
}
if operation == "delete" {
if item.OnDeletion {
items = append(items, item)
}
}
}
return items, nil
}
func TestMethodsOfWatchList(t *testing.T) { func TestMethodsOfWatchList(t *testing.T) {
dao.DefaultDatabaseWatchItemDAO = &fakeWatchItemDAO{} dao.DefaultDatabaseWatchItemDAO = &test.FakeWatchItemDAO{}
var policyID int64 = 1 var policyID int64 = 1

View File

@ -40,6 +40,7 @@ import (
"github.com/dghubble/sling" "github.com/dghubble/sling"
//for test env prepare //for test env prepare
_ "github.com/vmware/harbor/src/replication/event"
_ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/db"
_ "github.com/vmware/harbor/src/ui/auth/ldap" _ "github.com/vmware/harbor/src/ui/auth/ldap"
) )

View File

@ -17,7 +17,11 @@ package api
import ( import (
"fmt" "fmt"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/replication/core"
"github.com/vmware/harbor/src/replication/event/notification"
"github.com/vmware/harbor/src/replication/event/topic"
"github.com/vmware/harbor/src/ui/api/models" "github.com/vmware/harbor/src/ui/api/models"
) )
@ -56,8 +60,11 @@ func (r *ReplicationAPI) Post() {
return return
} }
if err = core.DefaultController.Replicate(replication.PolicyID); err != nil { if err = notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{
r.HandleInternalServerError(fmt.Sprintf("failed to trigger the replication policy %d: %v", replication.PolicyID, err)) PolicyID: replication.PolicyID,
}); err != nil {
r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err))
return return
} }
log.Infof("replication topic for policy %d triggered", replication.PolicyID)
} }

View File

@ -26,12 +26,15 @@ import (
"github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/manifest/schema2"
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/clair" "github.com/vmware/harbor/src/common/utils/clair"
registry_error "github.com/vmware/harbor/src/common/utils/error" registry_error "github.com/vmware/harbor/src/common/utils/error"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/notary" "github.com/vmware/harbor/src/common/utils/notary"
"github.com/vmware/harbor/src/common/utils/registry" "github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/replication/event/notification"
"github.com/vmware/harbor/src/replication/event/topic"
"github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/config"
uiutils "github.com/vmware/harbor/src/ui/utils" uiutils "github.com/vmware/harbor/src/ui/utils"
) )
@ -255,7 +258,17 @@ func (ra *RepositoryAPI) Delete() {
} }
log.Infof("delete tag: %s:%s", repoName, t) log.Infof("delete tag: %s:%s", repoName, t)
go CheckAndTriggerReplication(repoName+":"+t, "delete") go func() {
image := repoName + ":" + t
err := notifier.Publish(topic.ReplicationEventTopicOnDeletion, notification.OnDeletionNotification{
Image: image,
})
if err != nil {
log.Errorf("failed to publish on deletion topic for resource %s: %v", image, err)
return
}
log.Debugf("the on deletion topic for resource %s published", image)
}()
go func(tag string) { go func(tag string) {
if err := dao.AddAccessLog(models.AccessLog{ if err := dao.AddAccessLog(models.AccessLog{

View File

@ -32,10 +32,6 @@ import (
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/common/utils/registry" "github.com/vmware/harbor/src/common/utils/registry"
"github.com/vmware/harbor/src/common/utils/registry/auth" "github.com/vmware/harbor/src/common/utils/registry/auth"
"github.com/vmware/harbor/src/replication"
"github.com/vmware/harbor/src/replication/core"
rep_models "github.com/vmware/harbor/src/replication/models"
"github.com/vmware/harbor/src/replication/trigger"
"github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/config"
"github.com/vmware/harbor/src/ui/promgr" "github.com/vmware/harbor/src/ui/promgr"
"github.com/vmware/harbor/src/ui/service/token" "github.com/vmware/harbor/src/ui/service/token"
@ -81,39 +77,6 @@ func checkUserExists(name string) int {
return 0 return 0
} }
// CheckAndTriggerReplication checks whether replication policy is set
// on the resource, if is, trigger the replication
func CheckAndTriggerReplication(image, operation string) {
project, _ := utils.ParseRepository(image)
watchItems, err := trigger.DefaultWatchList.Get(project, operation)
if err != nil {
log.Errorf("failed to get watch list for resource %s, operation %s: %v", image, operation, err)
return
}
if len(watchItems) == 0 {
log.Debugf("no replication should be triggered for resource %s, operation %s, skip", image, operation)
return
}
for _, watchItem := range watchItems {
// TODO define a new type ReplicationItem to wrap FilterItem and operation.
// Maybe change the FilterItem to interface and define a type Resource to
// implement FilterItem is better?
item := &rep_models.FilterItem{
Kind: replication.FilterItemKindTag,
Value: image,
Metadata: map[string]interface{}{
"operation": operation,
},
}
if err := core.DefaultController.Replicate(watchItem.PolicyID, item); err != nil {
log.Errorf("failed to trigger replication for resource: %s, operation: %s: %v", image, operation, err)
return
}
log.Infof("replication for resource: %s, operation: %s triggered", image, operation)
}
}
// TriggerReplication triggers the replication according to the policy // TriggerReplication triggers the replication according to the policy
// TODO remove // TODO remove
func TriggerReplication(policyID int64, repository string, func TriggerReplication(policyID int64, repository string,

View File

@ -29,6 +29,7 @@ import (
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/common/scheduler"
_ "github.com/vmware/harbor/src/replication/event"
"github.com/vmware/harbor/src/ui/api" "github.com/vmware/harbor/src/ui/api"
_ "github.com/vmware/harbor/src/ui/auth/db" _ "github.com/vmware/harbor/src/ui/auth/db"
_ "github.com/vmware/harbor/src/ui/auth/ldap" _ "github.com/vmware/harbor/src/ui/auth/ldap"

View File

@ -23,8 +23,11 @@ import (
"github.com/vmware/harbor/src/common/dao" "github.com/vmware/harbor/src/common/dao"
clairdao "github.com/vmware/harbor/src/common/dao/clair" clairdao "github.com/vmware/harbor/src/common/dao/clair"
"github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/notifier"
"github.com/vmware/harbor/src/common/utils" "github.com/vmware/harbor/src/common/utils"
"github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/common/utils/log"
rep_notification "github.com/vmware/harbor/src/replication/event/notification"
"github.com/vmware/harbor/src/replication/event/topic"
"github.com/vmware/harbor/src/ui/api" "github.com/vmware/harbor/src/ui/api"
"github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/config"
uiutils "github.com/vmware/harbor/src/ui/utils" uiutils "github.com/vmware/harbor/src/ui/utils"
@ -104,7 +107,17 @@ func (n *NotificationHandler) Post() {
} }
}() }()
go api.CheckAndTriggerReplication(repository+":"+tag, "push") go func() {
image := repository + ":" + tag
err := notifier.Publish(topic.ReplicationEventTopicOnPush, rep_notification.OnPushNotification{
Image: image,
})
if err != nil {
log.Errorf("failed to publish on push topic for resource %s: %v", image, err)
return
}
log.Debugf("the on push topic for resource %s published", image)
}()
if autoScanEnabled(pro) { if autoScanEnabled(pro) {
last, err := clairdao.GetLastUpdate() last, err := clairdao.GetLastUpdate()