From fc7c640e6b66726c756c74d5b86dccb1767cc5d7 Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 8 Aug 2024 14:51:40 -0700 Subject: [PATCH] remove topicbus (moved to wps). also have CreateWindow call CreateTab --- pkg/topicbus/topicbus.go | 251 --------------------------------------- pkg/wstore/wstore.go | 24 +--- 2 files changed, 5 insertions(+), 270 deletions(-) delete mode 100644 pkg/topicbus/topicbus.go diff --git a/pkg/topicbus/topicbus.go b/pkg/topicbus/topicbus.go deleted file mode 100644 index bb02e013b..000000000 --- a/pkg/topicbus/topicbus.go +++ /dev/null @@ -1,251 +0,0 @@ -// Copyright 2024, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -package topicbus - -import ( - "fmt" - "sync" -) - -const ( - MaxMaxEntries = 1000 - MaxMaxDataLen = 1024 * 1024 - MaxTotalData = 50 * 1024 * 1024 - NotifyQueueLen = 100 - - DefaultMaxEntries = 1 - DefaultMaxDataLen = 1024 -) - -type TopicOpts struct { - MaxEntries int - MaxDataLen int -} - -type Topic struct { - ZoneId string - Name string - Opts TopicOpts - Data [][]byte - StartPos int - Size int - DataSize int - Subs map[string]bool -} - -type topicKey struct { - ZoneId string - Name string -} - -type TopicNotify struct { - SubscribeId string - ZoneId string - Name string - Data []byte -} - -type Bus struct { - lock *sync.Mutex - topics map[topicKey]*Topic - subsReverse map[string]map[topicKey]bool - curDataLen int64 - notifyCh chan TopicNotify -} - -var GlobalBus *Bus - -func InitGlobalBus(notifyCh chan TopicNotify) { - GlobalBus = &Bus{ - lock: &sync.Mutex{}, - topics: make(map[topicKey]*Topic), - subsReverse: make(map[string]map[topicKey]bool), - curDataLen: 0, - notifyCh: notifyCh, - } -} - -func (b *Bus) TopicSubscribe(zoneId string, name string, subscribeId string, opts TopicOpts) (*Topic, error) { - if opts.MaxEntries < 0 { - return nil, fmt.Errorf("max entries must not be negative") - } - if opts.MaxDataLen < 0 { - return nil, fmt.Errorf("max data length must not be negative") - } - if opts.MaxEntries == 0 { - opts.MaxEntries = DefaultMaxEntries - } - if opts.MaxDataLen == 0 { - opts.MaxDataLen = DefaultMaxDataLen - } - if opts.MaxEntries > MaxMaxEntries { - return nil, fmt.Errorf("max entries exceeds limit") - } - if opts.MaxDataLen > MaxMaxDataLen { - return nil, fmt.Errorf("max data length exceeds limit") - } - if opts.MaxEntries <= 0 || opts.MaxDataLen <= 0 { - return nil, fmt.Errorf("max entries and max data length must be positive") - } - if subscribeId == "" { - return nil, fmt.Errorf("subscribe id must be provided") - } - b.lock.Lock() - defer b.lock.Unlock() - if b.curDataLen > MaxTotalData { - return nil, fmt.Errorf("total data exceeds limit") - } - key := topicKey{ZoneId: zoneId, Name: name} - topic := b.topics[key] - if topic == nil { - topic = &Topic{ - ZoneId: zoneId, - Name: name, - Opts: opts, - Data: make([][]byte, opts.MaxEntries), - StartPos: 0, - Size: 0, - Subs: make(map[string]bool), - } - b.curDataLen += int64(opts.MaxDataLen) - } - if topic.Opts.MaxDataLen < opts.MaxDataLen { - b.curDataLen += int64(opts.MaxDataLen - topic.Opts.MaxDataLen) - topic.Opts.MaxDataLen = opts.MaxDataLen - } - if topic.Opts.MaxEntries < opts.MaxEntries { - topic.Opts.MaxEntries = opts.MaxEntries - newData := make([][]byte, opts.MaxEntries) - copy(newData, topic.Data) - topic.Data = newData - } - topic.Subs[subscribeId] = true - subMap := b.subsReverse[subscribeId] - if subMap == nil { - subMap = make(map[topicKey]bool) - b.subsReverse[subscribeId] = subMap - } - subMap[key] = true - return topic, nil -} - -func (t *Topic) nextIdx(idx int) int { - return (t.StartPos + idx) % t.Opts.MaxEntries -} - -func (t *Topic) writePos() int { - return (t.StartPos + t.Size) % t.Opts.MaxEntries -} - -// returns subscribers to notify -func (b *Bus) Publish(zoneId string, name string, data []byte) error { - b.lock.Lock() - defer b.lock.Unlock() - key := topicKey{ZoneId: zoneId, Name: name} - topic := b.topics[key] - if topic == nil { - return nil - } - if len(data) > topic.Opts.MaxDataLen { - return fmt.Errorf("data too large") - } - if topic.Size < topic.Opts.MaxEntries { - topic.Data[topic.writePos()] = data - topic.Size++ - topic.DataSize += len(data) - } else { - topic.DataSize += len(data) - len(topic.Data[topic.StartPos]) - topic.Data[topic.StartPos] = data - topic.StartPos = topic.nextIdx(topic.StartPos) - } - // remove data items to make DataSize < MaxDataLen - // we know it will fit because len(data) <= MaxDataLen - for topic.DataSize > topic.Opts.MaxDataLen { - topic.DataSize -= len(topic.Data[topic.StartPos]) - topic.Data[topic.StartPos] = nil - topic.StartPos = topic.nextIdx(topic.StartPos) - topic.Size-- - } - for sub := range topic.Subs { - // yes, this can block, it will lock then entire bus which will create backpressure - // this implementation is good enough for now, but can be improved in the future - b.notifyCh <- TopicNotify{ - SubscribeId: sub, - ZoneId: zoneId, - Name: name, - Data: data, - } - } - return nil -} - -func (t *Topic) lastN(n int) [][]byte { - if t == nil { - return nil - } - if n > t.Size { - n = t.Size - } - data := make([][]byte, n) - for i := 0; i < n; i++ { - idx := (t.StartPos + i) % t.Opts.MaxEntries - data[i] = t.Data[idx] - } - return data -} - -func (b *Bus) GetLastN(zoneId string, name string, n int) [][]byte { - if n <= 0 { - return nil - } - b.lock.Lock() - defer b.lock.Unlock() - key := topicKey{ZoneId: zoneId, Name: name} - topic := b.topics[key] - return topic.lastN(n) -} - -func (b *Bus) GetAll(zoneId string, name string) [][]byte { - b.lock.Lock() - defer b.lock.Unlock() - key := topicKey{ZoneId: zoneId, Name: name} - topic := b.topics[key] - return topic.lastN(topic.Size) -} - -func (b *Bus) Subscribe(zoneId string, name string, subscribeId string) { - b.lock.Lock() - defer b.lock.Unlock() - key := topicKey{ZoneId: zoneId, Name: name} - topic := b.topics[key] - if topic != nil { - topic.Subs[subscribeId] = true - } - subMap := b.subsReverse[subscribeId] - if subMap == nil { - subMap = make(map[topicKey]bool) - b.subsReverse[subscribeId] = subMap - } - subMap[key] = true -} - -func (b *Bus) Unsubscribe(subscribeId string) { - b.lock.Lock() - defer b.lock.Unlock() - subMap := b.subsReverse[subscribeId] - if subMap == nil { - return - } - for key := range subMap { - topic := b.topics[key] - if topic == nil { - continue - } - delete(topic.Subs, subscribeId) - if len(topic.Subs) == 0 { - delete(b.topics, key) - } - } - delete(b.subsReverse, subscribeId) -} diff --git a/pkg/wstore/wstore.go b/pkg/wstore/wstore.go index 7e3240354..6efa82763 100644 --- a/pkg/wstore/wstore.go +++ b/pkg/wstore/wstore.go @@ -320,8 +320,6 @@ func UpdateObjectMeta(ctx context.Context, oref waveobj.ORef, meta MetaMapType) func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) { windowId := uuid.NewString() workspaceId := uuid.NewString() - tabId := uuid.NewString() - layoutNodeId := uuid.NewString() if winSize == nil { winSize = &WinSize{ Width: 1200, @@ -331,7 +329,6 @@ func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) { window := &Window{ OID: windowId, WorkspaceId: workspaceId, - ActiveTabId: tabId, ActiveBlockMap: make(map[string]string), Pos: Point{ X: 100, @@ -344,31 +341,20 @@ func CreateWindow(ctx context.Context, winSize *WinSize) (*Window, error) { return nil, fmt.Errorf("error inserting window: %w", err) } ws := &Workspace{ - OID: workspaceId, - Name: "w" + workspaceId[0:8], - TabIds: []string{tabId}, + OID: workspaceId, + Name: "w" + workspaceId[0:8], } err = DBInsert(ctx, ws) if err != nil { return nil, fmt.Errorf("error inserting workspace: %w", err) } - tab := &Tab{ - OID: tabId, - Name: "T1", - BlockIds: []string{}, - LayoutNode: layoutNodeId, - } - err = DBInsert(ctx, tab) + tab, err := CreateTab(ctx, ws.OID, "T1") if err != nil { return nil, fmt.Errorf("error inserting tab: %w", err) } - - layoutNode := &LayoutNode{ - OID: layoutNodeId, - } - err = DBInsert(ctx, layoutNode) + err = SetActiveTab(ctx, window.OID, tab.OID) if err != nil { - return nil, fmt.Errorf("error inserting layout node: %w", err) + return nil, fmt.Errorf("error setting active tab: %w", err) } client, err := DBGetSingleton[*Client](ctx) if err != nil {