From 5e655c7c551e545871738df5fe2ef3458c97fdb7 Mon Sep 17 00:00:00 2001 From: Mike Sawka Date: Thu, 20 Jun 2024 16:01:55 -0700 Subject: [PATCH] refactor cmdqueue out of blockcontroller (#65) --- frontend/app/store/global.ts | 1 + pkg/blockcontroller/blockcontroller.go | 221 ++--------------- pkg/cmdqueue/cmdqueue.go | 268 +++++++++++++++++++++ pkg/service/blockservice/blockservice.go | 19 +- pkg/service/objectservice/objectservice.go | 5 +- pkg/web/ws.go | 19 +- pkg/wshutil/wshcommands.go | 15 ++ 7 files changed, 328 insertions(+), 220 deletions(-) create mode 100644 pkg/cmdqueue/cmdqueue.go diff --git a/frontend/app/store/global.ts b/frontend/app/store/global.ts index 3b6a9583b..11a3d5609 100644 --- a/frontend/app/store/global.ts +++ b/frontend/app/store/global.ts @@ -76,6 +76,7 @@ const atoms = { settingsConfigAtom: settingsConfigAtom, tabAtom: tabAtom, }; +(window as any).globalAtoms = atoms; // key is "eventType" or "eventType|oref" const eventSubjects = new Map>(); diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 29a0c0fa2..48aaa7d47 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -34,6 +34,11 @@ const ( BlockFile_Html = "html" // used for alt html layout ) +const ( + DefaultTermMaxFileSize = 256 * 1024 + DefaultHtmlMaxFileSize = 256 * 1024 +) + const DefaultTimeout = 2 * time.Second var globalLock = &sync.Mutex{} @@ -45,8 +50,11 @@ type BlockInputUnion struct { TermSize *shellexec.TermSize `json:"termsize,omitempty"` } +type RunCmdFnType = func(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (wshutil.ResponseDataType, error) + type BlockController struct { Lock *sync.Mutex + TabId string BlockId string BlockDef *wstore.BlockDef InputCh chan wshutil.BlockCommand @@ -54,6 +62,7 @@ type BlockController struct { CreatedHtmlFile bool ShellProc *shellexec.ShellProc ShellInputCh chan *BlockInputUnion + RunCmdFn RunCmdFnType } func (bc *BlockController) WithLock(f func()) { @@ -101,10 +110,7 @@ func (bc *BlockController) Close() { } } -const DefaultTermMaxFileSize = 256 * 1024 -const DefaultHtmlMaxFileSize = 256 * 1024 - -func handleAppendBlockFile(blockId string, blockFile string, data []byte) error { +func HandleAppendBlockFile(blockId string, blockFile string, data []byte) error { ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) defer cancelFn() err := filestore.WFS.AppendData(ctx, blockId, blockFile, data) @@ -124,32 +130,6 @@ func handleAppendBlockFile(blockId string, blockFile string, data []byte) error return nil } -func handleAppendIJsonFile(blockId string, blockFile string, cmd map[string]any, tryCreate bool) error { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - if blockFile == BlockFile_Html && tryCreate { - err := filestore.WFS.MakeFile(ctx, blockId, blockFile, nil, filestore.FileOptsType{MaxSize: DefaultHtmlMaxFileSize, IJson: true}) - if err != nil && err != filestore.ErrAlreadyExists { - return fmt.Errorf("error creating blockfile[html]: %w", err) - } - } - err := filestore.WFS.AppendIJson(ctx, blockId, blockFile, cmd) - if err != nil { - return fmt.Errorf("error appending to blockfile(ijson): %w", err) - } - eventbus.SendEvent(eventbus.WSEventType{ - EventType: "blockfile", - ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(), - Data: &eventbus.WSFileEventData{ - ZoneId: blockId, - FileName: blockFile, - FileOp: eventbus.FileOp_Append, - Data64: base64.StdEncoding.EncodeToString([]byte("{}")), - }, - }) - return nil -} - func (bc *BlockController) resetTerminalState() { ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) defer cancelFn() @@ -165,114 +145,12 @@ func (bc *BlockController) resetTerminalState() { } } -func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) { - if strings.Contains(simpleId, ":") { - rtn, err := waveobj.ParseORef(simpleId) - if err != nil { - return nil, fmt.Errorf("error parsing simple id: %w", err) - } - return &rtn, nil - } - return wstore.DBResolveEasyOID(ctx, simpleId) -} - -func staticHandleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) { - oref, err := waveobj.ParseORef(cmd.ORef) - if err != nil { - return nil, fmt.Errorf("error parsing oref: %w", err) - } - obj, err := wstore.DBGetORef(ctx, oref) - if err != nil { - return nil, fmt.Errorf("error getting object: %w", err) - } - if obj == nil { - return nil, fmt.Errorf("object not found: %s", oref) - } - return waveobj.GetMeta(obj), nil -} - -func staticHandleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, curBlockId string) (map[string]any, error) { - var oref *waveobj.ORef - if cmd.ORef != "" { - orefVal, err := waveobj.ParseORef(cmd.ORef) - if err != nil { - return nil, fmt.Errorf("error parsing oref: %w", err) - } - oref = &orefVal - } else { - orefVal := waveobj.MakeORef(wstore.OType_Block, curBlockId) - oref = &orefVal - } - log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta) - obj, err := wstore.DBGetORef(ctx, *oref) - if err != nil { - return nil, fmt.Errorf("error getting object: %w", err) - } - if obj == nil { - return nil, nil - } - meta := waveobj.GetMeta(obj) - if meta == nil { - meta = make(map[string]any) - } - for k, v := range cmd.Meta { - if v == nil { - delete(meta, k) - continue - } - meta[k] = v - } - waveobj.SetMeta(obj, meta) - err = wstore.DBUpdate(ctx, obj) - if err != nil { - return nil, fmt.Errorf("error updating block: %w", err) - } - // send a waveobj:update event - updatedBlock, err := wstore.DBGetORef(ctx, *oref) - if err != nil { - return nil, fmt.Errorf("error getting object (2): %w", err) - } - eventbus.SendEvent(eventbus.WSEventType{ - EventType: "waveobj:update", - ORef: oref.String(), - Data: wstore.WaveObjUpdate{ - UpdateType: wstore.UpdateType_Update, - OType: updatedBlock.GetOType(), - OID: waveobj.GetOID(updatedBlock), - Obj: updatedBlock, - }, - }) - return nil, nil -} - -func staticHandleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) { - rtn := make(map[string]any) - for _, simpleId := range cmd.Ids { - oref, err := resolveSimpleId(ctx, simpleId) - if err != nil || oref == nil { - continue - } - rtn[simpleId] = oref.String() - } - return rtn, nil -} - func (bc *BlockController) waveOSCMessageHandler(ctx context.Context, cmd wshutil.BlockCommand, respFn wshutil.ResponseFnType) (wshutil.ResponseDataType, error) { if strings.HasPrefix(cmd.GetCommand(), "controller:") { bc.InputCh <- cmd return nil, nil } - switch cmd.GetCommand() { - case wshutil.BlockCommand_GetMeta: - return staticHandleGetMeta(ctx, cmd.(*wshutil.BlockGetMetaCommand)) - case wshutil.Command_ResolveIds: - return staticHandleResolveIds(ctx, cmd.(*wshutil.ResolveIdsCommand)) - case wshutil.Command_CreateBlock: - return nil, nil - } - - ProcessStaticCommand(bc.BlockId, cmd) - return nil, nil + return bc.RunCmdFn(ctx, cmd, wshutil.CmdContextType{BlockId: bc.BlockId, TabId: bc.TabId}) } func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { @@ -317,7 +195,7 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { for { nr, err := ptyBuffer.Read(buf) if nr > 0 { - err := handleAppendBlockFile(bc.BlockId, BlockFile_Main, buf[:nr]) + err := HandleAppendBlockFile(bc.BlockId, BlockFile_Main, buf[:nr]) if err != nil { log.Printf("error appending to blockfile: %v\n", err) } @@ -411,7 +289,7 @@ func (bc *BlockController) Run(bdata *wstore.Block) { } } -func StartBlockController(ctx context.Context, blockId string) error { +func StartBlockController(ctx context.Context, tabId string, blockId string, runCmdFn RunCmdFnType) error { blockData, err := wstore.DBMustGet[*wstore.Block](ctx, blockId) if err != nil { return fmt.Errorf("error getting block: %w", err) @@ -430,10 +308,12 @@ func StartBlockController(ctx context.Context, blockId string) error { return nil } bc := &BlockController{ - Lock: &sync.Mutex{}, - BlockId: blockId, - Status: "init", - InputCh: make(chan wshutil.BlockCommand), + Lock: &sync.Mutex{}, + TabId: tabId, + BlockId: blockId, + Status: "init", + InputCh: make(chan wshutil.BlockCommand), + RunCmdFn: runCmdFn, } blockControllerMap[blockId] = bc go bc.Run(blockData) @@ -454,66 +334,3 @@ func GetBlockController(blockId string) *BlockController { defer globalLock.Unlock() return blockControllerMap[blockId] } - -func ProcessStaticCommand(blockId string, cmdGen wshutil.BlockCommand) error { - ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) - defer cancelFn() - switch cmd := cmdGen.(type) { - case *wshutil.BlockSetViewCommand: - log.Printf("SETVIEW: %s | %q\n", blockId, cmd.View) - block, err := wstore.DBGet[*wstore.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block: %w", err) - } - block.View = cmd.View - err = wstore.DBUpdate(ctx, block) - if err != nil { - return fmt.Errorf("error updating block: %w", err) - } - // send a waveobj:update event - updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block: %w", err) - } - eventbus.SendEvent(eventbus.WSEventType{ - EventType: "waveobj:update", - ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(), - Data: wstore.WaveObjUpdate{ - UpdateType: wstore.UpdateType_Update, - OType: wstore.OType_Block, - OID: blockId, - Obj: updatedBlock, - }, - }) - return nil - case *wshutil.BlockSetMetaCommand: - _, err := staticHandleSetMeta(ctx, cmd, blockId) - if err != nil { - return err - } - return nil - - case *wshutil.BlockMessageCommand: - log.Printf("MESSAGE: %s | %q\n", blockId, cmd.Message) - return nil - - case *wshutil.BlockAppendFileCommand: - log.Printf("APPENDFILE: %s | %q | len:%d\n", blockId, cmd.FileName, len(cmd.Data)) - err := handleAppendBlockFile(blockId, cmd.FileName, cmd.Data) - if err != nil { - return fmt.Errorf("error appending blockfile: %w", err) - } - return nil - - case *wshutil.BlockAppendIJsonCommand: - log.Printf("APPENDIJSON: %s | %q\n", blockId, cmd.FileName) - err := handleAppendIJsonFile(blockId, cmd.FileName, cmd.Data, true) - if err != nil { - return fmt.Errorf("error appending blockfile(ijson): %w", err) - } - return nil - - default: - return fmt.Errorf("unknown command type %T", cmdGen) - } -} diff --git a/pkg/cmdqueue/cmdqueue.go b/pkg/cmdqueue/cmdqueue.go new file mode 100644 index 000000000..315243d04 --- /dev/null +++ b/pkg/cmdqueue/cmdqueue.go @@ -0,0 +1,268 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package cmdqueue + +import ( + "context" + "encoding/base64" + "fmt" + "log" + "runtime/debug" + "strings" + "time" + + "github.com/wavetermdev/thenextwave/pkg/blockcontroller" + "github.com/wavetermdev/thenextwave/pkg/eventbus" + "github.com/wavetermdev/thenextwave/pkg/filestore" + "github.com/wavetermdev/thenextwave/pkg/waveobj" + "github.com/wavetermdev/thenextwave/pkg/wshutil" + "github.com/wavetermdev/thenextwave/pkg/wstore" +) + +const DefaultTimeout = 2 * time.Second +const CmdQueueSize = 100 + +func RunCmd(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (rtnData wshutil.ResponseDataType, rtnErr error) { + defer func() { + if r := recover(); r != nil { + log.Printf("PANIC: %v\n", r) + debug.PrintStack() + rtnData = nil + rtnErr = fmt.Errorf("panic: %v", r) + return + } + }() + blockId := cmdCtx.BlockId + bcCmd, ok := cmd.(wshutil.BlockControllerCommand) + if ok && bcCmd.GetBlockId() != "" { + blockId = bcCmd.GetBlockId() + } + if strings.HasPrefix(cmd.GetCommand(), "controller:") { + // send to block controller + bc := blockcontroller.GetBlockController(blockId) + if bc == nil { + return nil, fmt.Errorf("block controller not found for block %q", blockId) + } + bc.InputCh <- cmd + return nil, nil + } + switch typedCmd := cmd.(type) { + case *wshutil.BlockGetMetaCommand: + return handleGetMeta(ctx, typedCmd) + case *wshutil.ResolveIdsCommand: + return handleResolveIds(ctx, typedCmd) + case *wshutil.BlockSetMetaCommand: + return handleSetMeta(ctx, typedCmd, cmdCtx) + case *wshutil.BlockSetViewCommand: + return handleSetView(ctx, typedCmd, cmdCtx) + case *wshutil.BlockMessageCommand: + log.Printf("MESSAGE: %s | %q\n", blockId, typedCmd.Message) + return nil, nil + case *wshutil.BlockAppendFileCommand: + log.Printf("APPENDFILE: %s | %q | len:%d\n", blockId, typedCmd.FileName, len(typedCmd.Data)) + err := handleAppendBlockFile(blockId, typedCmd.FileName, typedCmd.Data) + if err != nil { + return nil, fmt.Errorf("error appending blockfile: %w", err) + } + return nil, nil + case *wshutil.BlockAppendIJsonCommand: + log.Printf("APPENDIJSON: %s | %q\n", blockId, typedCmd.FileName) + err := handleAppendIJsonFile(blockId, typedCmd.FileName, typedCmd.Data, true) + if err != nil { + return nil, fmt.Errorf("error appending blockfile(ijson): %w", err) + } + return nil, nil + case *wshutil.CreateBlockCommand: + return handleCreateBlock(ctx, typedCmd, cmdCtx) + default: + return nil, fmt.Errorf("unknown command: %q", cmd.GetCommand()) + } +} + +func handleSetView(ctx context.Context, cmd *wshutil.BlockSetViewCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) { + log.Printf("SETVIEW: %s | %q\n", cmdCtx.BlockId, cmd.View) + block, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId) + if err != nil { + return nil, fmt.Errorf("error getting block: %w", err) + } + block.View = cmd.View + err = wstore.DBUpdate(ctx, block) + if err != nil { + return nil, fmt.Errorf("error updating block: %w", err) + } + // send a waveobj:update event + updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId) + if err != nil { + return nil, fmt.Errorf("error getting block: %w", err) + } + eventbus.SendEvent(eventbus.WSEventType{ + EventType: "waveobj:update", + ORef: waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId).String(), + Data: wstore.WaveObjUpdate{ + UpdateType: wstore.UpdateType_Update, + OType: wstore.OType_Block, + OID: cmdCtx.BlockId, + Obj: updatedBlock, + }, + }) + return nil, nil +} + +func handleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) { + oref, err := waveobj.ParseORef(cmd.ORef) + if err != nil { + return nil, fmt.Errorf("error parsing oref: %w", err) + } + obj, err := wstore.DBGetORef(ctx, oref) + if err != nil { + return nil, fmt.Errorf("error getting object: %w", err) + } + if obj == nil { + return nil, fmt.Errorf("object not found: %s", oref) + } + return waveobj.GetMeta(obj), nil +} + +func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) { + if strings.Contains(simpleId, ":") { + rtn, err := waveobj.ParseORef(simpleId) + if err != nil { + return nil, fmt.Errorf("error parsing simple id: %w", err) + } + return &rtn, nil + } + return wstore.DBResolveEasyOID(ctx, simpleId) +} + +func handleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) { + rtn := make(map[string]any) + for _, simpleId := range cmd.Ids { + oref, err := resolveSimpleId(ctx, simpleId) + if err != nil || oref == nil { + continue + } + rtn[simpleId] = oref.String() + } + return rtn, nil +} + +func handleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) { + var oref *waveobj.ORef + if cmd.ORef != "" { + orefVal, err := waveobj.ParseORef(cmd.ORef) + if err != nil { + return nil, fmt.Errorf("error parsing oref: %w", err) + } + oref = &orefVal + } else { + orefVal := waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId) + oref = &orefVal + } + log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta) + obj, err := wstore.DBGetORef(ctx, *oref) + if err != nil { + return nil, fmt.Errorf("error getting object: %w", err) + } + if obj == nil { + return nil, nil + } + meta := waveobj.GetMeta(obj) + if meta == nil { + meta = make(map[string]any) + } + for k, v := range cmd.Meta { + if v == nil { + delete(meta, k) + continue + } + meta[k] = v + } + waveobj.SetMeta(obj, meta) + err = wstore.DBUpdate(ctx, obj) + if err != nil { + return nil, fmt.Errorf("error updating block: %w", err) + } + // send a waveobj:update event + updatedBlock, err := wstore.DBGetORef(ctx, *oref) + if err != nil { + return nil, fmt.Errorf("error getting object (2): %w", err) + } + eventbus.SendEvent(eventbus.WSEventType{ + EventType: "waveobj:update", + ORef: oref.String(), + Data: wstore.WaveObjUpdate{ + UpdateType: wstore.UpdateType_Update, + OType: updatedBlock.GetOType(), + OID: waveobj.GetOID(updatedBlock), + Obj: updatedBlock, + }, + }) + return nil, nil +} + +func handleAppendBlockFile(blockId string, blockFile string, data []byte) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + err := filestore.WFS.AppendData(ctx, blockId, blockFile, data) + if err != nil { + return fmt.Errorf("error appending to blockfile: %w", err) + } + eventbus.SendEvent(eventbus.WSEventType{ + EventType: "blockfile", + ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(), + Data: &eventbus.WSFileEventData{ + ZoneId: blockId, + FileName: blockFile, + FileOp: eventbus.FileOp_Append, + Data64: base64.StdEncoding.EncodeToString(data), + }, + }) + return nil +} + +func handleAppendIJsonFile(blockId string, blockFile string, cmd map[string]any, tryCreate bool) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + if blockFile == blockcontroller.BlockFile_Html && tryCreate { + err := filestore.WFS.MakeFile(ctx, blockId, blockFile, nil, filestore.FileOptsType{MaxSize: blockcontroller.DefaultHtmlMaxFileSize, IJson: true}) + if err != nil && err != filestore.ErrAlreadyExists { + return fmt.Errorf("error creating blockfile[html]: %w", err) + } + } + err := filestore.WFS.AppendIJson(ctx, blockId, blockFile, cmd) + if err != nil { + return fmt.Errorf("error appending to blockfile(ijson): %w", err) + } + eventbus.SendEvent(eventbus.WSEventType{ + EventType: "blockfile", + ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(), + Data: &eventbus.WSFileEventData{ + ZoneId: blockId, + FileName: blockFile, + FileOp: eventbus.FileOp_Append, + Data64: base64.StdEncoding.EncodeToString([]byte("{}")), + }, + }) + return nil +} + +func handleCreateBlock(ctx context.Context, cmd *wshutil.CreateBlockCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) { + tabId := cmdCtx.TabId + if cmd.TabId != "" { + tabId = cmd.TabId + } + log.Printf("handleCreateBlock %s %v\n", tabId, cmd.BlockDef) + blockData, err := wstore.CreateBlock(ctx, tabId, cmd.BlockDef, cmd.RtOpts) + log.Printf("blockData: %v err:%v\n", blockData, err) + if err != nil { + return nil, fmt.Errorf("error creating block: %w", err) + } + if blockData.Controller != "" { + err = blockcontroller.StartBlockController(ctx, cmd.TabId, blockData.OID, RunCmd) + if err != nil { + return nil, fmt.Errorf("error starting block controller: %w", err) + } + } + return map[string]any{"blockId": blockData.OID}, nil +} diff --git a/pkg/service/blockservice/blockservice.go b/pkg/service/blockservice/blockservice.go index 499d763a8..2f0a15503 100644 --- a/pkg/service/blockservice/blockservice.go +++ b/pkg/service/blockservice/blockservice.go @@ -6,10 +6,9 @@ package blockservice import ( "context" "fmt" - "strings" "time" - "github.com/wavetermdev/thenextwave/pkg/blockcontroller" + "github.com/wavetermdev/thenextwave/pkg/cmdqueue" "github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" "github.com/wavetermdev/thenextwave/pkg/wshutil" @@ -29,17 +28,11 @@ func (bs *BlockService) SendCommand_Meta() tsgenmeta.MethodMeta { } } -func (bs *BlockService) SendCommand(blockId string, cmd wshutil.BlockCommand) error { - if strings.HasPrefix(cmd.GetCommand(), "controller:") { - bc := blockcontroller.GetBlockController(blockId) - if bc == nil { - return fmt.Errorf("block controller not found for block %q", blockId) - } - bc.InputCh <- cmd - } else { - blockcontroller.ProcessStaticCommand(blockId, cmd) - } - return nil +func (bs *BlockService) SendCommand(uiContext wstore.UIContext, blockId string, cmd wshutil.BlockCommand) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + _, err := cmdqueue.RunCmd(ctx, cmd, wshutil.CmdContextType{BlockId: blockId, TabId: uiContext.ActiveTabId}) + return err } func (bs *BlockService) SaveTerminalState(ctx context.Context, blockId string, state string, stateType string, ptyOffset int64) error { diff --git a/pkg/service/objectservice/objectservice.go b/pkg/service/objectservice/objectservice.go index e682ed75b..00e446d4d 100644 --- a/pkg/service/objectservice/objectservice.go +++ b/pkg/service/objectservice/objectservice.go @@ -11,6 +11,7 @@ import ( "time" "github.com/wavetermdev/thenextwave/pkg/blockcontroller" + "github.com/wavetermdev/thenextwave/pkg/cmdqueue" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wstore" @@ -136,7 +137,7 @@ func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string) return nil, fmt.Errorf("error getting tab: %w", err) } for _, blockId := range tab.BlockIds { - blockErr := blockcontroller.StartBlockController(ctx, blockId) + blockErr := blockcontroller.StartBlockController(ctx, tabId, blockId, cmdqueue.RunCmd) if blockErr != nil { // we don't want to fail the set active tab operation if a block controller fails to start log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr) @@ -173,7 +174,7 @@ func (svc *ObjectService) CreateBlock(uiContext wstore.UIContext, blockDef *wsto return "", nil, fmt.Errorf("error creating block: %w", err) } if blockData.Controller != "" { - err = blockcontroller.StartBlockController(ctx, blockData.OID) + err = blockcontroller.StartBlockController(ctx, uiContext.ActiveTabId, blockData.OID, cmdqueue.RunCmd) if err != nil { return "", nil, fmt.Errorf("error starting block controller: %w", err) } diff --git a/pkg/web/ws.go b/pkg/web/ws.go index 4ea23af10..d8a08eff1 100644 --- a/pkg/web/ws.go +++ b/pkg/web/ws.go @@ -4,6 +4,7 @@ package web import ( + "context" "encoding/json" "fmt" "log" @@ -15,8 +16,8 @@ import ( "github.com/google/uuid" "github.com/gorilla/mux" "github.com/gorilla/websocket" + "github.com/wavetermdev/thenextwave/pkg/cmdqueue" "github.com/wavetermdev/thenextwave/pkg/eventbus" - "github.com/wavetermdev/thenextwave/pkg/service/blockservice" "github.com/wavetermdev/thenextwave/pkg/web/webcmd" "github.com/wavetermdev/thenextwave/pkg/wshutil" ) @@ -26,6 +27,8 @@ const wsWriteWaitTimeout = 10 * time.Second const wsPingPeriodTickTime = 10 * time.Second const wsInitialPingTime = 1 * time.Second +const DefaultCommandTimeout = 2 * time.Second + func RunWebSocketServer() { gr := mux.NewRouter() gr.HandleFunc("/ws", HandleWs) @@ -99,13 +102,23 @@ func processWSCommand(jmsg map[string]any, outputCh chan any) { Command: wshutil.BlockCommand_Input, TermSize: &cmd.TermSize, } - blockservice.BlockServiceInstance.SendCommand(cmd.BlockId, blockCmd) + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultCommandTimeout) + defer cancelFn() + _, err = cmdqueue.RunCmd(ctx, blockCmd, wshutil.CmdContextType{BlockId: cmd.BlockId}) + if err != nil { + log.Printf("error running command %q: %v\n", blockCmd.Command, err) + } case *webcmd.BlockInputWSCommand: blockCmd := &wshutil.BlockInputCommand{ Command: wshutil.BlockCommand_Input, InputData64: cmd.InputData64, } - blockservice.BlockServiceInstance.SendCommand(cmd.BlockId, blockCmd) + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultCommandTimeout) + defer cancelFn() + _, err = cmdqueue.RunCmd(ctx, blockCmd, wshutil.CmdContextType{BlockId: cmd.BlockId}) + if err != nil { + log.Printf("error running command %q: %v\n", blockCmd.Command, err) + } } } diff --git a/pkg/wshutil/wshcommands.go b/pkg/wshutil/wshcommands.go index 05935e1f9..8f9b8175e 100644 --- a/pkg/wshutil/wshcommands.go +++ b/pkg/wshutil/wshcommands.go @@ -52,6 +52,11 @@ func CommandTypeUnionMeta() tsgenmeta.TypeUnionMeta { } } +type CmdContextType struct { + BlockId string + TabId string +} + type baseCommand struct { Command string `json:"command"` } @@ -60,6 +65,10 @@ type BlockCommand interface { GetCommand() string } +type BlockControllerCommand interface { + GetBlockId() string +} + type BlockCommandWrapper struct { BlockCommand } @@ -86,6 +95,7 @@ func ParseCmdMap(cmdMap map[string]any) (BlockCommand, error) { } type BlockInputCommand struct { + BlockId string `json:"blockid"` Command string `json:"command" tstype:"\"controller:input\""` InputData64 string `json:"inputdata64,omitempty"` SigName string `json:"signame,omitempty"` @@ -96,6 +106,10 @@ func (ic *BlockInputCommand) GetCommand() string { return BlockCommand_Input } +func (ic *BlockInputCommand) GetBlockId() string { + return ic.BlockId +} + type ResolveIdsCommand struct { Command string `json:"command" tstype:"\"resolveids\""` Ids []string `json:"ids"` @@ -164,6 +178,7 @@ func (bwc *BlockAppendIJsonCommand) GetCommand() string { type CreateBlockCommand struct { Command string `json:"command" tstype:"\"createblock\""` + TabId string `json:"tabid"` BlockDef *wstore.BlockDef `json:"blockdef"` RtOpts *wstore.RuntimeOpts `json:"rtopts,omitempty"` }