2024-07-23 23:21:19 +02:00
|
|
|
// Copyright 2024, Command Line Inc.
|
|
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
|
|
|
// wave pubsub system
|
|
|
|
package wps
|
|
|
|
|
|
|
|
import (
|
2024-07-26 22:30:11 +02:00
|
|
|
"strings"
|
2024-07-23 23:21:19 +02:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
|
2024-07-26 22:30:11 +02:00
|
|
|
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
|
2024-07-23 23:21:19 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// this broker interface is mostly generic
|
|
|
|
// strong typing and event types can be defined elsewhere
|
|
|
|
|
|
|
|
type Client interface {
|
2024-08-14 01:52:35 +02:00
|
|
|
SendEvent(routeId string, event wshrpc.WaveEvent)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
type BrokerSubscription struct {
|
2024-08-14 01:52:35 +02:00
|
|
|
AllSubs []string // routeids subscribed to "all" events
|
|
|
|
ScopeSubs map[string][]string // routeids subscribed to specific scopes
|
|
|
|
StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
|
2024-07-26 22:30:11 +02:00
|
|
|
type BrokerType struct {
|
2024-08-14 01:52:35 +02:00
|
|
|
Lock *sync.Mutex
|
|
|
|
Client Client
|
|
|
|
SubMap map[string]*BrokerSubscription
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
|
2024-07-26 22:30:11 +02:00
|
|
|
var Broker = &BrokerType{
|
2024-08-14 01:52:35 +02:00
|
|
|
Lock: &sync.Mutex{},
|
|
|
|
SubMap: make(map[string]*BrokerSubscription),
|
2024-07-26 22:30:11 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func scopeHasStarMatch(scope string) bool {
|
|
|
|
parts := strings.Split(scope, ":")
|
|
|
|
for _, part := range parts {
|
|
|
|
if part == "*" || part == "**" {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func (b *BrokerType) SetClient(client Client) {
|
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
|
|
|
b.Client = client
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *BrokerType) GetClient() Client {
|
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
|
|
|
return b.Client
|
|
|
|
}
|
|
|
|
|
2024-08-24 03:12:40 +02:00
|
|
|
// if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)
|
2024-08-14 01:52:35 +02:00
|
|
|
func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) {
|
2024-08-24 03:12:40 +02:00
|
|
|
if sub.Event == "" {
|
|
|
|
return
|
|
|
|
}
|
2024-07-23 23:21:19 +02:00
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
2024-08-24 03:12:40 +02:00
|
|
|
b.unsubscribe_nolock(subRouteId, sub.Event)
|
2024-07-23 23:21:19 +02:00
|
|
|
bs := b.SubMap[sub.Event]
|
|
|
|
if bs == nil {
|
|
|
|
bs = &BrokerSubscription{
|
|
|
|
AllSubs: []string{},
|
|
|
|
ScopeSubs: make(map[string][]string),
|
2024-07-26 22:30:11 +02:00
|
|
|
StarSubs: make(map[string][]string),
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
b.SubMap[sub.Event] = bs
|
|
|
|
}
|
|
|
|
if sub.AllScopes {
|
2024-08-14 01:52:35 +02:00
|
|
|
bs.AllSubs = utilfn.AddElemToSliceUniq(bs.AllSubs, subRouteId)
|
2024-08-24 03:12:40 +02:00
|
|
|
return
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
for _, scope := range sub.Scopes {
|
2024-07-26 22:30:11 +02:00
|
|
|
starMatch := scopeHasStarMatch(scope)
|
|
|
|
if starMatch {
|
2024-08-14 01:52:35 +02:00
|
|
|
addStrToScopeMap(bs.StarSubs, scope, subRouteId)
|
2024-07-26 22:30:11 +02:00
|
|
|
} else {
|
2024-08-14 01:52:35 +02:00
|
|
|
addStrToScopeMap(bs.ScopeSubs, scope, subRouteId)
|
2024-07-26 22:30:11 +02:00
|
|
|
}
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bs *BrokerSubscription) IsEmpty() bool {
|
2024-07-26 22:30:11 +02:00
|
|
|
return len(bs.AllSubs) == 0 && len(bs.ScopeSubs) == 0 && len(bs.StarSubs) == 0
|
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func removeStrFromScopeMap(scopeMap map[string][]string, scope string, routeId string) {
|
2024-07-26 22:30:11 +02:00
|
|
|
scopeSubs := scopeMap[scope]
|
2024-08-14 01:52:35 +02:00
|
|
|
scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId)
|
2024-07-26 22:30:11 +02:00
|
|
|
if len(scopeSubs) == 0 {
|
|
|
|
delete(scopeMap, scope)
|
|
|
|
} else {
|
|
|
|
scopeMap[scope] = scopeSubs
|
|
|
|
}
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func removeStrFromScopeMapAll(scopeMap map[string][]string, routeId string) {
|
2024-07-26 22:30:11 +02:00
|
|
|
for scope, scopeSubs := range scopeMap {
|
2024-08-14 01:52:35 +02:00
|
|
|
scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId)
|
2024-07-26 22:30:11 +02:00
|
|
|
if len(scopeSubs) == 0 {
|
|
|
|
delete(scopeMap, scope)
|
|
|
|
} else {
|
|
|
|
scopeMap[scope] = scopeSubs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func addStrToScopeMap(scopeMap map[string][]string, scope string, routeId string) {
|
2024-07-26 22:30:11 +02:00
|
|
|
scopeSubs := scopeMap[scope]
|
2024-08-14 01:52:35 +02:00
|
|
|
scopeSubs = utilfn.AddElemToSliceUniq(scopeSubs, routeId)
|
2024-07-26 22:30:11 +02:00
|
|
|
scopeMap[scope] = scopeSubs
|
|
|
|
}
|
|
|
|
|
2024-08-24 03:12:40 +02:00
|
|
|
func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) {
|
2024-07-23 23:21:19 +02:00
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
2024-08-24 03:12:40 +02:00
|
|
|
b.unsubscribe_nolock(subRouteId, eventName)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *BrokerType) unsubscribe_nolock(subRouteId string, eventName string) {
|
|
|
|
bs := b.SubMap[eventName]
|
2024-07-23 23:21:19 +02:00
|
|
|
if bs == nil {
|
|
|
|
return
|
|
|
|
}
|
2024-08-24 03:12:40 +02:00
|
|
|
bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId)
|
|
|
|
for scope := range bs.ScopeSubs {
|
|
|
|
removeStrFromScopeMap(bs.ScopeSubs, scope, subRouteId)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
2024-08-24 03:12:40 +02:00
|
|
|
for scope := range bs.StarSubs {
|
|
|
|
removeStrFromScopeMap(bs.StarSubs, scope, subRouteId)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
if bs.IsEmpty() {
|
2024-08-24 03:12:40 +02:00
|
|
|
delete(b.SubMap, eventName)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func (b *BrokerType) UnsubscribeAll(subRouteId string) {
|
2024-07-23 23:21:19 +02:00
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
|
|
|
for eventType, bs := range b.SubMap {
|
2024-08-14 01:52:35 +02:00
|
|
|
bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId)
|
|
|
|
removeStrFromScopeMapAll(bs.StarSubs, subRouteId)
|
|
|
|
removeStrFromScopeMapAll(bs.ScopeSubs, subRouteId)
|
2024-07-23 23:21:19 +02:00
|
|
|
if bs.IsEmpty() {
|
|
|
|
delete(b.SubMap, eventType)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-07-26 22:30:11 +02:00
|
|
|
func (b *BrokerType) Publish(event wshrpc.WaveEvent) {
|
2024-08-14 01:52:35 +02:00
|
|
|
client := b.GetClient()
|
|
|
|
if client == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
routeIds := b.getMatchingRouteIds(event)
|
|
|
|
for _, routeId := range routeIds {
|
|
|
|
client.SendEvent(routeId, event)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-08-14 01:52:35 +02:00
|
|
|
func (b *BrokerType) getMatchingRouteIds(event wshrpc.WaveEvent) []string {
|
2024-07-23 23:21:19 +02:00
|
|
|
b.Lock.Lock()
|
|
|
|
defer b.Lock.Unlock()
|
|
|
|
bs := b.SubMap[event.Event]
|
|
|
|
if bs == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2024-08-14 01:52:35 +02:00
|
|
|
routeIds := make(map[string]bool)
|
|
|
|
for _, routeId := range bs.AllSubs {
|
|
|
|
routeIds[routeId] = true
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
for _, scope := range event.Scopes {
|
2024-08-14 01:52:35 +02:00
|
|
|
for _, routeId := range bs.ScopeSubs[scope] {
|
|
|
|
routeIds[routeId] = true
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
2024-07-26 22:30:11 +02:00
|
|
|
for starScope := range bs.StarSubs {
|
|
|
|
if utilfn.StarMatchString(starScope, scope, ":") {
|
2024-08-14 01:52:35 +02:00
|
|
|
for _, routeId := range bs.StarSubs[starScope] {
|
|
|
|
routeIds[routeId] = true
|
2024-07-26 22:30:11 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
var rtn []string
|
2024-08-14 01:52:35 +02:00
|
|
|
for routeId := range routeIds {
|
|
|
|
rtn = append(rtn, routeId)
|
2024-07-23 23:21:19 +02:00
|
|
|
}
|
|
|
|
return rtn
|
|
|
|
}
|