mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-06 19:18:22 +01:00
252 lines
5.7 KiB
Go
252 lines
5.7 KiB
Go
|
// Copyright 2024, Command Line Inc.
|
||
|
// SPDX-License-Identifier: Apache-2.0
|
||
|
|
||
|
package topicbus
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
MaxMaxEntries = 1000
|
||
|
MaxMaxDataLen = 1024 * 1024
|
||
|
MaxTotalData = 50 * 1024 * 1024
|
||
|
NotifyQueueLen = 100
|
||
|
|
||
|
DefaultMaxEntries = 1
|
||
|
DefaultMaxDataLen = 1024
|
||
|
)
|
||
|
|
||
|
type TopicOpts struct {
|
||
|
MaxEntries int
|
||
|
MaxDataLen int
|
||
|
}
|
||
|
|
||
|
type Topic struct {
|
||
|
ZoneId string
|
||
|
Name string
|
||
|
Opts TopicOpts
|
||
|
Data [][]byte
|
||
|
StartPos int
|
||
|
Size int
|
||
|
DataSize int
|
||
|
Subs map[string]bool
|
||
|
}
|
||
|
|
||
|
type topicKey struct {
|
||
|
ZoneId string
|
||
|
Name string
|
||
|
}
|
||
|
|
||
|
type TopicNotify struct {
|
||
|
SubscribeId string
|
||
|
ZoneId string
|
||
|
Name string
|
||
|
Data []byte
|
||
|
}
|
||
|
|
||
|
type Bus struct {
|
||
|
lock *sync.Mutex
|
||
|
topics map[topicKey]*Topic
|
||
|
subsReverse map[string]map[topicKey]bool
|
||
|
curDataLen int64
|
||
|
notifyCh chan TopicNotify
|
||
|
}
|
||
|
|
||
|
var GlobalBus *Bus
|
||
|
|
||
|
func InitGlobalBus(notifyCh chan TopicNotify) {
|
||
|
GlobalBus = &Bus{
|
||
|
lock: &sync.Mutex{},
|
||
|
topics: make(map[topicKey]*Topic),
|
||
|
subsReverse: make(map[string]map[topicKey]bool),
|
||
|
curDataLen: 0,
|
||
|
notifyCh: notifyCh,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *Bus) TopicSubscribe(zoneId string, name string, subscribeId string, opts TopicOpts) (*Topic, error) {
|
||
|
if opts.MaxEntries < 0 {
|
||
|
return nil, fmt.Errorf("max entries must not be negative")
|
||
|
}
|
||
|
if opts.MaxDataLen < 0 {
|
||
|
return nil, fmt.Errorf("max data length must not be negative")
|
||
|
}
|
||
|
if opts.MaxEntries == 0 {
|
||
|
opts.MaxEntries = DefaultMaxEntries
|
||
|
}
|
||
|
if opts.MaxDataLen == 0 {
|
||
|
opts.MaxDataLen = DefaultMaxDataLen
|
||
|
}
|
||
|
if opts.MaxEntries > MaxMaxEntries {
|
||
|
return nil, fmt.Errorf("max entries exceeds limit")
|
||
|
}
|
||
|
if opts.MaxDataLen > MaxMaxDataLen {
|
||
|
return nil, fmt.Errorf("max data length exceeds limit")
|
||
|
}
|
||
|
if opts.MaxEntries <= 0 || opts.MaxDataLen <= 0 {
|
||
|
return nil, fmt.Errorf("max entries and max data length must be positive")
|
||
|
}
|
||
|
if subscribeId == "" {
|
||
|
return nil, fmt.Errorf("subscribe id must be provided")
|
||
|
}
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
if b.curDataLen > MaxTotalData {
|
||
|
return nil, fmt.Errorf("total data exceeds limit")
|
||
|
}
|
||
|
key := topicKey{ZoneId: zoneId, Name: name}
|
||
|
topic := b.topics[key]
|
||
|
if topic == nil {
|
||
|
topic = &Topic{
|
||
|
ZoneId: zoneId,
|
||
|
Name: name,
|
||
|
Opts: opts,
|
||
|
Data: make([][]byte, opts.MaxEntries),
|
||
|
StartPos: 0,
|
||
|
Size: 0,
|
||
|
Subs: make(map[string]bool),
|
||
|
}
|
||
|
b.curDataLen += int64(opts.MaxDataLen)
|
||
|
}
|
||
|
if topic.Opts.MaxDataLen < opts.MaxDataLen {
|
||
|
b.curDataLen += int64(opts.MaxDataLen - topic.Opts.MaxDataLen)
|
||
|
topic.Opts.MaxDataLen = opts.MaxDataLen
|
||
|
}
|
||
|
if topic.Opts.MaxEntries < opts.MaxEntries {
|
||
|
topic.Opts.MaxEntries = opts.MaxEntries
|
||
|
newData := make([][]byte, opts.MaxEntries)
|
||
|
copy(newData, topic.Data)
|
||
|
topic.Data = newData
|
||
|
}
|
||
|
topic.Subs[subscribeId] = true
|
||
|
subMap := b.subsReverse[subscribeId]
|
||
|
if subMap == nil {
|
||
|
subMap = make(map[topicKey]bool)
|
||
|
b.subsReverse[subscribeId] = subMap
|
||
|
}
|
||
|
subMap[key] = true
|
||
|
return topic, nil
|
||
|
}
|
||
|
|
||
|
func (t *Topic) nextIdx(idx int) int {
|
||
|
return (t.StartPos + idx) % t.Opts.MaxEntries
|
||
|
}
|
||
|
|
||
|
func (t *Topic) writePos() int {
|
||
|
return (t.StartPos + t.Size) % t.Opts.MaxEntries
|
||
|
}
|
||
|
|
||
|
// returns subscribers to notify
|
||
|
func (b *Bus) Publish(zoneId string, name string, data []byte) error {
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
key := topicKey{ZoneId: zoneId, Name: name}
|
||
|
topic := b.topics[key]
|
||
|
if topic == nil {
|
||
|
return nil
|
||
|
}
|
||
|
if len(data) > topic.Opts.MaxDataLen {
|
||
|
return fmt.Errorf("data too large")
|
||
|
}
|
||
|
if topic.Size < topic.Opts.MaxEntries {
|
||
|
topic.Data[topic.writePos()] = data
|
||
|
topic.Size++
|
||
|
topic.DataSize += len(data)
|
||
|
} else {
|
||
|
topic.DataSize += len(data) - len(topic.Data[topic.StartPos])
|
||
|
topic.Data[topic.StartPos] = data
|
||
|
topic.StartPos = topic.nextIdx(topic.StartPos)
|
||
|
}
|
||
|
// remove data items to make DataSize < MaxDataLen
|
||
|
// we know it will fit because len(data) <= MaxDataLen
|
||
|
for topic.DataSize > topic.Opts.MaxDataLen {
|
||
|
topic.DataSize -= len(topic.Data[topic.StartPos])
|
||
|
topic.Data[topic.StartPos] = nil
|
||
|
topic.StartPos = topic.nextIdx(topic.StartPos)
|
||
|
topic.Size--
|
||
|
}
|
||
|
for sub := range topic.Subs {
|
||
|
// yes, this can block, it will lock then entire bus which will create backpressure
|
||
|
// this implementation is good enough for now, but can be improved in the future
|
||
|
b.notifyCh <- TopicNotify{
|
||
|
SubscribeId: sub,
|
||
|
ZoneId: zoneId,
|
||
|
Name: name,
|
||
|
Data: data,
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (t *Topic) lastN(n int) [][]byte {
|
||
|
if t == nil {
|
||
|
return nil
|
||
|
}
|
||
|
if n > t.Size {
|
||
|
n = t.Size
|
||
|
}
|
||
|
data := make([][]byte, n)
|
||
|
for i := 0; i < n; i++ {
|
||
|
idx := (t.StartPos + i) % t.Opts.MaxEntries
|
||
|
data[i] = t.Data[idx]
|
||
|
}
|
||
|
return data
|
||
|
}
|
||
|
|
||
|
func (b *Bus) GetLastN(zoneId string, name string, n int) [][]byte {
|
||
|
if n <= 0 {
|
||
|
return nil
|
||
|
}
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
key := topicKey{ZoneId: zoneId, Name: name}
|
||
|
topic := b.topics[key]
|
||
|
return topic.lastN(n)
|
||
|
}
|
||
|
|
||
|
func (b *Bus) GetAll(zoneId string, name string) [][]byte {
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
key := topicKey{ZoneId: zoneId, Name: name}
|
||
|
topic := b.topics[key]
|
||
|
return topic.lastN(topic.Size)
|
||
|
}
|
||
|
|
||
|
func (b *Bus) Subscribe(zoneId string, name string, subscribeId string) {
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
key := topicKey{ZoneId: zoneId, Name: name}
|
||
|
topic := b.topics[key]
|
||
|
if topic != nil {
|
||
|
topic.Subs[subscribeId] = true
|
||
|
}
|
||
|
subMap := b.subsReverse[subscribeId]
|
||
|
if subMap == nil {
|
||
|
subMap = make(map[topicKey]bool)
|
||
|
b.subsReverse[subscribeId] = subMap
|
||
|
}
|
||
|
subMap[key] = true
|
||
|
}
|
||
|
|
||
|
func (b *Bus) Unsubscribe(subscribeId string) {
|
||
|
b.lock.Lock()
|
||
|
defer b.lock.Unlock()
|
||
|
subMap := b.subsReverse[subscribeId]
|
||
|
if subMap == nil {
|
||
|
return
|
||
|
}
|
||
|
for key := range subMap {
|
||
|
topic := b.topics[key]
|
||
|
if topic == nil {
|
||
|
continue
|
||
|
}
|
||
|
delete(topic.Subs, subscribeId)
|
||
|
if len(topic.Subs) == 0 {
|
||
|
delete(b.topics, key)
|
||
|
}
|
||
|
}
|
||
|
delete(b.subsReverse, subscribeId)
|
||
|
}
|