// Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 // wave pubsub system package wps import ( "strings" "sync" "github.com/wavetermdev/thenextwave/pkg/util/utilfn" "github.com/wavetermdev/thenextwave/pkg/wshrpc" ) // this broker interface is mostly generic // strong typing and event types can be defined elsewhere type Client interface { SendEvent(routeId string, event wshrpc.WaveEvent) } type BrokerSubscription struct { AllSubs []string // routeids subscribed to "all" events ScopeSubs map[string][]string // routeids subscribed to specific scopes StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them) } type BrokerType struct { Lock *sync.Mutex Client Client SubMap map[string]*BrokerSubscription } var Broker = &BrokerType{ Lock: &sync.Mutex{}, SubMap: make(map[string]*BrokerSubscription), } func scopeHasStarMatch(scope string) bool { parts := strings.Split(scope, ":") for _, part := range parts { if part == "*" || part == "**" { return true } } return false } func (b *BrokerType) SetClient(client Client) { b.Lock.Lock() defer b.Lock.Unlock() b.Client = client } func (b *BrokerType) GetClient() Client { b.Lock.Lock() defer b.Lock.Unlock() return b.Client } // if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one) func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) { if sub.Event == "" { return } b.Lock.Lock() defer b.Lock.Unlock() b.unsubscribe_nolock(subRouteId, sub.Event) bs := b.SubMap[sub.Event] if bs == nil { bs = &BrokerSubscription{ AllSubs: []string{}, ScopeSubs: make(map[string][]string), StarSubs: make(map[string][]string), } b.SubMap[sub.Event] = bs } if sub.AllScopes { bs.AllSubs = utilfn.AddElemToSliceUniq(bs.AllSubs, subRouteId) return } for _, scope := range sub.Scopes { starMatch := scopeHasStarMatch(scope) if starMatch { addStrToScopeMap(bs.StarSubs, scope, subRouteId) } else { addStrToScopeMap(bs.ScopeSubs, scope, subRouteId) } } } func (bs *BrokerSubscription) IsEmpty() bool { return len(bs.AllSubs) == 0 && len(bs.ScopeSubs) == 0 && len(bs.StarSubs) == 0 } func removeStrFromScopeMap(scopeMap map[string][]string, scope string, routeId string) { scopeSubs := scopeMap[scope] scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId) if len(scopeSubs) == 0 { delete(scopeMap, scope) } else { scopeMap[scope] = scopeSubs } } func removeStrFromScopeMapAll(scopeMap map[string][]string, routeId string) { for scope, scopeSubs := range scopeMap { scopeSubs = utilfn.RemoveElemFromSlice(scopeSubs, routeId) if len(scopeSubs) == 0 { delete(scopeMap, scope) } else { scopeMap[scope] = scopeSubs } } } func addStrToScopeMap(scopeMap map[string][]string, scope string, routeId string) { scopeSubs := scopeMap[scope] scopeSubs = utilfn.AddElemToSliceUniq(scopeSubs, routeId) scopeMap[scope] = scopeSubs } func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) { b.Lock.Lock() defer b.Lock.Unlock() b.unsubscribe_nolock(subRouteId, eventName) } func (b *BrokerType) unsubscribe_nolock(subRouteId string, eventName string) { bs := b.SubMap[eventName] if bs == nil { return } bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId) for scope := range bs.ScopeSubs { removeStrFromScopeMap(bs.ScopeSubs, scope, subRouteId) } for scope := range bs.StarSubs { removeStrFromScopeMap(bs.StarSubs, scope, subRouteId) } if bs.IsEmpty() { delete(b.SubMap, eventName) } } func (b *BrokerType) UnsubscribeAll(subRouteId string) { b.Lock.Lock() defer b.Lock.Unlock() for eventType, bs := range b.SubMap { bs.AllSubs = utilfn.RemoveElemFromSlice(bs.AllSubs, subRouteId) removeStrFromScopeMapAll(bs.StarSubs, subRouteId) removeStrFromScopeMapAll(bs.ScopeSubs, subRouteId) if bs.IsEmpty() { delete(b.SubMap, eventType) } } } func (b *BrokerType) Publish(event wshrpc.WaveEvent) { client := b.GetClient() if client == nil { return } routeIds := b.getMatchingRouteIds(event) for _, routeId := range routeIds { client.SendEvent(routeId, event) } } func (b *BrokerType) getMatchingRouteIds(event wshrpc.WaveEvent) []string { b.Lock.Lock() defer b.Lock.Unlock() bs := b.SubMap[event.Event] if bs == nil { return nil } routeIds := make(map[string]bool) for _, routeId := range bs.AllSubs { routeIds[routeId] = true } for _, scope := range event.Scopes { for _, routeId := range bs.ScopeSubs[scope] { routeIds[routeId] = true } for starScope := range bs.StarSubs { if utilfn.StarMatchString(starScope, scope, ":") { for _, routeId := range bs.StarSubs[starScope] { routeIds[routeId] = true } } } } var rtn []string for routeId := range routeIds { rtn = append(rtn, routeId) } return rtn }