mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-02 18:39:05 +01:00
remove topicbus (moved to wps). also have CreateWindow call CreateTab
This commit is contained in:
parent
4641271bfa
commit
fc7c640e6b
@ -1,251 +0,0 @@
|
|||||||
// Copyright 2024, Command Line Inc.
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
|
|
||||||
package topicbus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
MaxMaxEntries = 1000
|
|
||||||
MaxMaxDataLen = 1024 * 1024
|
|
||||||
MaxTotalData = 50 * 1024 * 1024
|
|
||||||
NotifyQueueLen = 100
|
|
||||||
|
|
||||||
DefaultMaxEntries = 1
|
|
||||||
DefaultMaxDataLen = 1024
|
|
||||||
)
|
|
||||||
|
|
||||||
type TopicOpts struct {
|
|
||||||
MaxEntries int
|
|
||||||
MaxDataLen int
|
|
||||||
}
|
|
||||||
|
|
||||||
type Topic struct {
|
|
||||||
ZoneId string
|
|
||||||
Name string
|
|
||||||
Opts TopicOpts
|
|
||||||
Data [][]byte
|
|
||||||
StartPos int
|
|
||||||
Size int
|
|
||||||
DataSize int
|
|
||||||
Subs map[string]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type topicKey struct {
|
|
||||||
ZoneId string
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
type TopicNotify struct {
|
|
||||||
SubscribeId string
|
|
||||||
ZoneId string
|
|
||||||
Name string
|
|
||||||
Data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type Bus struct {
|
|
||||||
lock *sync.Mutex
|
|
||||||
topics map[topicKey]*Topic
|
|
||||||
subsReverse map[string]map[topicKey]bool
|
|
||||||
curDataLen int64
|
|
||||||
notifyCh chan TopicNotify
|
|
||||||
}
|
|
||||||
|
|
||||||
var GlobalBus *Bus
|
|
||||||
|
|
||||||
func InitGlobalBus(notifyCh chan TopicNotify) {
|
|
||||||
GlobalBus = &Bus{
|
|
||||||
lock: &sync.Mutex{},
|
|
||||||
topics: make(map[topicKey]*Topic),
|
|
||||||
subsReverse: make(map[string]map[topicKey]bool),
|
|
||||||
curDataLen: 0,
|
|
||||||
notifyCh: notifyCh,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bus) TopicSubscribe(zoneId string, name string, subscribeId string, opts TopicOpts) (*Topic, error) {
|
|
||||||
if opts.MaxEntries < 0 {
|
|
||||||
return nil, fmt.Errorf("max entries must not be negative")
|
|
||||||
}
|
|
||||||
if opts.MaxDataLen < 0 {
|
|
||||||
return nil, fmt.Errorf("max data length must not be negative")
|
|
||||||
}
|
|
||||||
if opts.MaxEntries == 0 {
|
|
||||||
opts.MaxEntries = DefaultMaxEntries
|
|
||||||
}
|
|
||||||
if opts.MaxDataLen == 0 {
|
|
||||||
opts.MaxDataLen = DefaultMaxDataLen
|
|
||||||
}
|
|
||||||
if opts.MaxEntries > MaxMaxEntries {
|
|
||||||
return nil, fmt.Errorf("max entries exceeds limit")
|
|
||||||
}
|
|
||||||
if opts.MaxDataLen > MaxMaxDataLen {
|
|
||||||
return nil, fmt.Errorf("max data length exceeds limit")
|
|
||||||
}
|
|
||||||
if opts.MaxEntries <= 0 || opts.MaxDataLen <= 0 {
|
|
||||||
return nil, fmt.Errorf("max entries and max data length must be positive")
|
|
||||||
}
|
|
||||||
if subscribeId == "" {
|
|
||||||
return nil, fmt.Errorf("subscribe id must be provided")
|
|
||||||
}
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
if b.curDataLen > MaxTotalData {
|
|
||||||
return nil, fmt.Errorf("total data exceeds limit")
|
|
||||||
}
|
|
||||||
key := topicKey{ZoneId: zoneId, Name: name}
|
|
||||||
topic := b.topics[key]
|
|
||||||
if topic == nil {
|
|
||||||
topic = &Topic{
|
|
||||||
ZoneId: zoneId,
|
|
||||||
Name: name,
|
|
||||||
Opts: opts,
|
|
||||||
Data: make([][]byte, opts.MaxEntries),
|
|
||||||
StartPos: 0,
|
|
||||||
Size: 0,
|
|
||||||
Subs: make(map[string]bool),
|
|
||||||
}
|
|
||||||
b.curDataLen += int64(opts.MaxDataLen)
|
|
||||||
}
|
|
||||||
if topic.Opts.MaxDataLen < opts.MaxDataLen {
|
|
||||||
b.curDataLen += int64(opts.MaxDataLen - topic.Opts.MaxDataLen)
|
|
||||||
topic.Opts.MaxDataLen = opts.MaxDataLen
|
|
||||||
}
|
|
||||||
if topic.Opts.MaxEntries < opts.MaxEntries {
|
|
||||||
topic.Opts.MaxEntries = opts.MaxEntries
|
|
||||||
newData := make([][]byte, opts.MaxEntries)
|
|
||||||
copy(newData, topic.Data)
|
|
||||||
topic.Data = newData
|
|
||||||
}
|
|
||||||
topic.Subs[subscribeId] = true
|
|
||||||
subMap := b.subsReverse[subscribeId]
|
|
||||||
if subMap == nil {
|
|
||||||
subMap = make(map[topicKey]bool)
|
|
||||||
b.subsReverse[subscribeId] = subMap
|
|
||||||
}
|
|
||||||
subMap[key] = true
|
|
||||||
return topic, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Topic) nextIdx(idx int) int {
|
|
||||||
return (t.StartPos + idx) % t.Opts.MaxEntries
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Topic) writePos() int {
|
|
||||||
return (t.StartPos + t.Size) % t.Opts.MaxEntries
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns subscribers to notify
|
|
||||||
func (b *Bus) Publish(zoneId string, name string, data []byte) error {
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
key := topicKey{ZoneId: zoneId, Name: name}
|
|
||||||
topic := b.topics[key]
|
|
||||||
if topic == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if len(data) > topic.Opts.MaxDataLen {
|
|
||||||
return fmt.Errorf("data too large")
|
|
||||||
}
|
|
||||||
if topic.Size < topic.Opts.MaxEntries {
|
|
||||||
topic.Data[topic.writePos()] = data
|
|
||||||
topic.Size++
|
|
||||||
topic.DataSize += len(data)
|
|
||||||
} else {
|
|
||||||
topic.DataSize += len(data) - len(topic.Data[topic.StartPos])
|
|
||||||
topic.Data[topic.StartPos] = data
|
|
||||||
topic.StartPos = topic.nextIdx(topic.StartPos)
|
|
||||||
}
|
|
||||||
// remove data items to make DataSize < MaxDataLen
|
|
||||||
// we know it will fit because len(data) <= MaxDataLen
|
|
||||||
for topic.DataSize > topic.Opts.MaxDataLen {
|
|
||||||
topic.DataSize -= len(topic.Data[topic.StartPos])
|
|
||||||
topic.Data[topic.StartPos] = nil
|
|
||||||
topic.StartPos = topic.nextIdx(topic.StartPos)
|
|
||||||
topic.Size--
|
|
||||||
}
|
|
||||||
for sub := range topic.Subs {
|
|
||||||
// yes, this can block, it will lock then entire bus which will create backpressure
|
|
||||||
// this implementation is good enough for now, but can be improved in the future
|
|
||||||
b.notifyCh <- TopicNotify{
|
|
||||||
SubscribeId: sub,
|
|
||||||
ZoneId: zoneId,
|
|
||||||
Name: name,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Topic) lastN(n int) [][]byte {
|
|
||||||
if t == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if n > t.Size {
|
|
||||||
n = t.Size
|
|
||||||
}
|
|
||||||
data := make([][]byte, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
idx := (t.StartPos + i) % t.Opts.MaxEntries
|
|
||||||
data[i] = t.Data[idx]
|
|
||||||
}
|
|
||||||
return data
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bus) GetLastN(zoneId string, name string, n int) [][]byte {
|
|
||||||
if n <= 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
key := topicKey{ZoneId: zoneId, Name: name}
|
|
||||||
topic := b.topics[key]
|
|
||||||
return topic.lastN(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bus) GetAll(zoneId string, name string) [][]byte {
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
key := topicKey{ZoneId: zoneId, Name: name}
|
|
||||||
topic := b.topics[key]
|
|
||||||
return topic.lastN(topic.Size)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bus) Subscribe(zoneId string, name string, subscribeId string) {
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
key := topicKey{ZoneId: zoneId, Name: name}
|
|
||||||
topic := b.topics[key]
|
|
||||||
if topic != nil {
|
|
||||||
topic.Subs[subscribeId] = true
|
|
||||||
}
|
|
||||||
subMap := b.subsReverse[subscribeId]
|
|
||||||
if subMap == nil {
|
|
||||||
subMap = make(map[topicKey]bool)
|
|
||||||
b.subsReverse[subscribeId] = subMap
|
|
||||||
}
|
|
||||||
subMap[key] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bus) Unsubscribe(subscribeId string) {
|
|
||||||
b.lock.Lock()
|
|
||||||
defer b.lock.Unlock()
|
|
||||||
subMap := b.subsReverse[subscribeId]
|
|
||||||
if subMap == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for key := range subMap {
|
|
||||||
topic := b.topics[key]
|
|
||||||
if topic == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
delete(topic.Subs, subscribeId)
|
|
||||||
if len(topic.Subs) == 0 {
|
|
||||||
delete(b.topics, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delete(b.subsReverse, subscribeId)
|
|
||||||
}
|
|
@ -320,8 +320,6 @@ func UpdateObjectMeta(ctx context.Context, oref waveobj.ORef, meta MetaMapType)
|
|||||||
func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) {
|
func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) {
|
||||||
windowId := uuid.NewString()
|
windowId := uuid.NewString()
|
||||||
workspaceId := uuid.NewString()
|
workspaceId := uuid.NewString()
|
||||||
tabId := uuid.NewString()
|
|
||||||
layoutNodeId := uuid.NewString()
|
|
||||||
if winSize == nil {
|
if winSize == nil {
|
||||||
winSize = &WinSize{
|
winSize = &WinSize{
|
||||||
Width: 1200,
|
Width: 1200,
|
||||||
@ -331,7 +329,6 @@ func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) {
|
|||||||
window := &Window{
|
window := &Window{
|
||||||
OID: windowId,
|
OID: windowId,
|
||||||
WorkspaceId: workspaceId,
|
WorkspaceId: workspaceId,
|
||||||
ActiveTabId: tabId,
|
|
||||||
ActiveBlockMap: make(map[string]string),
|
ActiveBlockMap: make(map[string]string),
|
||||||
Pos: Point{
|
Pos: Point{
|
||||||
X: 100,
|
X: 100,
|
||||||
@ -344,31 +341,20 @@ func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) {
|
|||||||
return nil, fmt.Errorf("error inserting window: %w", err)
|
return nil, fmt.Errorf("error inserting window: %w", err)
|
||||||
}
|
}
|
||||||
ws := &Workspace{
|
ws := &Workspace{
|
||||||
OID: workspaceId,
|
OID: workspaceId,
|
||||||
Name: "w" + workspaceId[0:8],
|
Name: "w" + workspaceId[0:8],
|
||||||
TabIds: []string{tabId},
|
|
||||||
}
|
}
|
||||||
err = DBInsert(ctx, ws)
|
err = DBInsert(ctx, ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error inserting workspace: %w", err)
|
return nil, fmt.Errorf("error inserting workspace: %w", err)
|
||||||
}
|
}
|
||||||
tab := &Tab{
|
tab, err := CreateTab(ctx, ws.OID, "T1")
|
||||||
OID: tabId,
|
|
||||||
Name: "T1",
|
|
||||||
BlockIds: []string{},
|
|
||||||
LayoutNode: layoutNodeId,
|
|
||||||
}
|
|
||||||
err = DBInsert(ctx, tab)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error inserting tab: %w", err)
|
return nil, fmt.Errorf("error inserting tab: %w", err)
|
||||||
}
|
}
|
||||||
|
err = SetActiveTab(ctx, window.OID, tab.OID)
|
||||||
layoutNode := &LayoutNode{
|
|
||||||
OID: layoutNodeId,
|
|
||||||
}
|
|
||||||
err = DBInsert(ctx, layoutNode)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error inserting layout node: %w", err)
|
return nil, fmt.Errorf("error setting active tab: %w", err)
|
||||||
}
|
}
|
||||||
client, err := DBGetSingleton[*Client](ctx)
|
client, err := DBGetSingleton[*Client](ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user