diff --git a/cmd/wsh/cmd/getmeta.go b/cmd/wsh/cmd/getmeta.go index f6274104e..d9551a459 100644 --- a/cmd/wsh/cmd/getmeta.go +++ b/cmd/wsh/cmd/getmeta.go @@ -4,8 +4,10 @@ package cmd import ( + "encoding/json" "fmt" - "os" + "log" + "strings" "github.com/spf13/cobra" "github.com/wavetermdev/thenextwave/pkg/wshutil" @@ -33,10 +35,29 @@ func getMetaRun(cmd *cobra.Command, args []string) { fmt.Printf("%v\n", err) return } + + setTermRawMode() + fullORef, err := resolveSimpleId(oref) + if err != nil { + fmt.Printf("error resolving oref: %v\r\n", err) + return + } getMetaWshCmd := &wshutil.BlockGetMetaCommand{ Command: wshutil.BlockCommand_SetMeta, - OID: oref, + ORef: fullORef, } - barr, _ := wshutil.EncodeWaveOSCMessage(getMetaWshCmd) - os.Stdout.Write(barr) + resp, err := RpcClient.SendRpcRequest(getMetaWshCmd, 2000) + if err != nil { + log.Printf("error getting metadata: %v\r\n", err) + return + } + outArr, err := json.MarshalIndent(resp, "", " ") + if err != nil { + log.Printf("error formatting metadata: %v\r\n", err) + return + } + outStr := string(outArr) + outStr = strings.ReplaceAll(outStr, "\n", "\r\n") + fmt.Print(outStr) + fmt.Print("\r\n") } diff --git a/cmd/wsh/cmd/html.go b/cmd/wsh/cmd/html.go index 1fde55b42..7d5a97dda 100644 --- a/cmd/wsh/cmd/html.go +++ b/cmd/wsh/cmd/html.go @@ -5,7 +5,6 @@ package cmd import ( "fmt" - "os" "github.com/spf13/cobra" ) @@ -14,12 +13,18 @@ func init() { rootCmd.AddCommand(htmlCmd) } +var htmlCmd = &cobra.Command{ + Use: "html", + Short: "Launch a demo html-mode terminal", + Run: htmlRun, +} + func htmlRun(cmd *cobra.Command, args []string) { defer doShutdown("normal exit", 0) setTermHtmlMode() for { var buf [1]byte - _, err := os.Stdin.Read(buf[:]) + _, err := WrappedStdin.Read(buf[:]) if err != nil { doShutdown(fmt.Sprintf("stdin closed/error (%v)", err), 1) } @@ -33,9 +38,3 @@ func htmlRun(cmd *cobra.Command, args []string) { } } } - -var htmlCmd = &cobra.Command{ - Use: "html", - Short: "Launch a demo html-mode terminal", - Run: htmlRun, -} diff --git a/cmd/wsh/cmd/root.go b/cmd/wsh/cmd/root.go index f79e4abc5..730b97c14 100644 --- a/cmd/wsh/cmd/root.go +++ b/cmd/wsh/cmd/root.go @@ -5,6 +5,7 @@ package cmd import ( "fmt" + "io" "log" "os" "os/signal" @@ -12,6 +13,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/google/uuid" "github.com/spf13/cobra" @@ -30,8 +32,11 @@ var ( var shutdownOnce sync.Once var origTermState *term.State +var madeRaw bool var usingHtmlMode bool var shutdownSignalHandlersInstalled bool +var WrappedStdin io.Reader +var RpcClient *wshutil.WshRpc func doShutdown(reason string, exitCode int) { shutdownOnce.Do(func() { @@ -42,8 +47,8 @@ func doShutdown(reason string, exitCode int) { Command: wshutil.BlockCommand_SetMeta, Meta: map[string]any{"term:mode": nil}, } - barr, _ := wshutil.EncodeWaveOSCMessage(cmd) - os.Stdout.Write(barr) + RpcClient.SendCommand(cmd) + time.Sleep(10 * time.Millisecond) } if origTermState != nil { term.Restore(int(os.Stdin.Fd()), origTermState) @@ -51,20 +56,42 @@ func doShutdown(reason string, exitCode int) { }) } -func setTermHtmlMode() { - installShutdownSignalHandlers() +// returns the wrapped stdin and a new rpc client (that wraps the stdin input and stdout output) +func setupRpcClient(handlerFn wshutil.CommandHandlerFnType) { + log.Printf("setup rpc client\r\n") + messageCh := make(chan wshutil.RpcMessage) + ptyBuf := wshutil.MakePtyBuffer(wshutil.WaveServerOSCPrefix, os.Stdin, messageCh) + rpcClient, outputCh := wshutil.MakeWshRpc(wshutil.WaveOSC, messageCh, handlerFn) + go func() { + for barr := range outputCh { + os.Stdout.Write(barr) + } + }() + WrappedStdin = ptyBuf + RpcClient = rpcClient +} + +func setTermRawMode() { + if madeRaw { + return + } origState, err := term.MakeRaw(int(os.Stdin.Fd())) if err != nil { fmt.Fprintf(os.Stderr, "Error setting raw mode: %v\n", err) return } origTermState = origState + madeRaw = true +} + +func setTermHtmlMode() { + installShutdownSignalHandlers() + setTermRawMode() cmd := &wshutil.BlockSetMetaCommand{ Command: wshutil.BlockCommand_SetMeta, Meta: map[string]any{"term:mode": "html"}, } - barr, _ := wshutil.EncodeWaveOSCMessage(cmd) - os.Stdout.Write(barr) + RpcClient.SendCommand(cmd) usingHtmlMode = true } @@ -85,7 +112,7 @@ func installShutdownSignalHandlers() { var oidRe = regexp.MustCompile(`^[0-9a-f]{8}$`) func validateEasyORef(oref string) error { - if strings.Index(oref, ":") >= 0 { + if strings.Contains(oref, ":") { _, err := waveobj.ParseORef(oref) if err != nil { return fmt.Errorf("invalid ORef: %v", err) @@ -105,7 +132,31 @@ func validateEasyORef(oref string) error { return nil } +func isFullORef(orefStr string) bool { + _, err := waveobj.ParseORef(orefStr) + return err == nil +} + +func resolveSimpleId(id string) (string, error) { + if isFullORef(id) { + return id, nil + } + resolveCmd := &wshutil.ResolveIdsCommand{ + Command: wshutil.Command_ResolveIds, + Ids: []string{id}, + } + resp, err := RpcClient.SendRpcRequest(resolveCmd, 2000) + if err != nil { + return "", err + } + if resp[id] == nil { + return "", fmt.Errorf("id not found: %q", id) + } + return resp[id].(string), nil +} + // Execute executes the root command. func Execute() error { + setupRpcClient(nil) return rootCmd.Execute() } diff --git a/cmd/wsh/cmd/setmeta.go b/cmd/wsh/cmd/setmeta.go index 7b19d0082..2058bc2c4 100644 --- a/cmd/wsh/cmd/setmeta.go +++ b/cmd/wsh/cmd/setmeta.go @@ -6,7 +6,6 @@ package cmd import ( "encoding/json" "fmt" - "os" "strconv" "strings" @@ -76,11 +75,21 @@ func setMetaRun(cmd *cobra.Command, args []string) { fmt.Printf("%v\n", err) return } + setTermRawMode() + fullORef, err := resolveSimpleId(oref) + if err != nil { + fmt.Printf("error resolving oref: %v\n", err) + return + } setMetaWshCmd := &wshutil.BlockSetMetaCommand{ Command: wshutil.BlockCommand_SetMeta, - OID: oref, + ORef: fullORef, Meta: meta, } - barr, _ := wshutil.EncodeWaveOSCMessage(setMetaWshCmd) - os.Stdout.Write(barr) + _, err = RpcClient.SendRpcRequest(setMetaWshCmd, 2000) + if err != nil { + fmt.Printf("error setting metadata: %v\n", err) + return + } + fmt.Print("metadata set\r\n") } diff --git a/electron.vite.config.ts b/electron.vite.config.ts index f7ac2e494..94e1ffbb7 100644 --- a/electron.vite.config.ts +++ b/electron.vite.config.ts @@ -47,6 +47,9 @@ export default defineConfig({ }, }, }, + server: { + open: false, + }, plugins: [ react({}), tsconfigPaths(), diff --git a/emain/emain.ts b/emain/emain.ts index b2f834264..9da5ef51c 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -2,13 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 import * as electron from "electron"; +import fs from "fs"; import * as child_process from "node:child_process"; +import os from "os"; import * as path from "path"; import * as readline from "readline"; import { debounce } from "throttle-debounce"; import * as services from "../frontend/app/store/services"; -import os from "os"; -import fs from "fs"; const electronApp = electron.app; const isDev = process.env.WAVETERM_DEV; @@ -65,7 +65,7 @@ function getWaveSrvPath(): string { function getWaveSrvPathWin(): string { const appPath = path.join(getGoAppBasePath(), "bin", "wavesrv.exe"); - return `& "${appPath}"` + return `& "${appPath}"`; } function getWaveSrvCwd(): string { @@ -85,12 +85,12 @@ function runWaveSrv(): Promise { envCopy[WaveDevVarName] = "1"; } envCopy[WaveSrvReadySignalPidVarName] = process.pid.toString(); - let waveSrvCmd: string; - if (process.platform === "win32") { - waveSrvCmd = getWaveSrvPathWin(); - } else { - waveSrvCmd = getWaveSrvPath(); - } + let waveSrvCmd: string; + if (process.platform === "win32") { + waveSrvCmd = getWaveSrvPathWin(); + } else { + waveSrvCmd = getWaveSrvPath(); + } console.log("trying to run local server", waveSrvCmd); const proc = child_process.spawn(getWaveSrvPath(), { cwd: getWaveSrvCwd(), @@ -121,23 +121,29 @@ function runWaveSrv(): Promise { terminal: false, }); rlStderr.on("line", (line) => { - if (line.includes("WAVESRV-ESTART")) { - waveSrvReadyResolve(true); - return; - } + if (line.includes("WAVESRV-ESTART")) { + waveSrvReadyResolve(true); + return; + } console.log(line); }); return rtnPromise; } -function mainResizeHandler(_: any, win: Electron.BrowserWindow) { +async function mainResizeHandler(_: any, windowId: string, win: Electron.BrowserWindow) { if (win == null || win.isDestroyed() || win.fullScreen) { return; } const bounds = win.getBounds(); - const winSize = { width: bounds.width, height: bounds.height, top: bounds.y, left: bounds.x }; - const url = new URL(getBaseHostPort() + "/api/set-winsize"); - // TODO + try { + await services.WindowService.SetWindowPosAndSize( + windowId, + { x: bounds.x, y: bounds.y }, + { width: bounds.width, height: bounds.height } + ); + } catch (e) { + console.log("error resizing window", e); + } } function shNavHandler(event: Electron.Event, url: string) { @@ -182,12 +188,29 @@ function shFrameNavHandler(event: Electron.Event primaryDisplay.workAreaSize.height) { + winHeight = primaryDisplay.workAreaSize.height; + } + if (winWidth > primaryDisplay.workAreaSize.width) { + winWidth = primaryDisplay.workAreaSize.width; + } + let winX = waveWindow.pos.x; + let winY = waveWindow.pos.y; + if (winX + winWidth > primaryDisplay.workAreaSize.width) { + winX = Math.floor((primaryDisplay.workAreaSize.width - winWidth) / 2); + } + if (winY + winHeight > primaryDisplay.workAreaSize.height) { + winY = Math.floor((primaryDisplay.workAreaSize.height - winHeight) / 2); + } const win = new electron.BrowserWindow({ - x: 200, - y: 200, + x: winX, + y: winY, titleBarStyle: "hiddenInset", - width: waveWindow.winsize.width, - height: waveWindow.winsize.height, + width: winWidth, + height: winHeight, minWidth: 500, minHeight: 300, icon: @@ -221,11 +244,11 @@ function createWindow(client: Client, waveWindow: WaveWindow): Electron.BrowserW win.webContents.on("will-frame-navigate", shFrameNavHandler); win.on( "resize", - debounce(400, (e) => mainResizeHandler(e, win)) + debounce(400, (e) => mainResizeHandler(e, waveWindow.oid, win)) ); win.on( "move", - debounce(400, (e) => mainResizeHandler(e, win)) + debounce(400, (e) => mainResizeHandler(e, waveWindow.oid, win)) ); win.webContents.on("zoom-changed", (e) => { win.webContents.send("zoom-changed"); @@ -257,10 +280,10 @@ electron.ipcMain.on("isDevServer", () => { electronApp.quit(); return; } - const waveHomeDir = getWaveHomeDir(); - if (!fs.existsSync(waveHomeDir)) { - fs.mkdirSync(waveHomeDir); - } + const waveHomeDir = getWaveHomeDir(); + if (!fs.existsSync(waveHomeDir)) { + fs.mkdirSync(waveHomeDir); + } try { await runWaveSrv(); } catch (e) { @@ -270,7 +293,7 @@ electron.ipcMain.on("isDevServer", () => { console.log("wavesrv ready signal received", ready, Date.now() - startTs, "ms"); console.log("get client data"); - let clientData = await services.ClientService.GetClientData().catch(e => console.log(e)) as Client; + let clientData = (await services.ClientService.GetClientData().catch((e) => console.log(e))) as Client; console.log("client data ready"); let windowData: WaveWindow = (await services.ObjectService.GetObject( "window:" + clientData.mainwindowid diff --git a/frontend/app/store/services.ts b/frontend/app/store/services.ts index 132a306b7..a16d23ca5 100644 --- a/frontend/app/store/services.ts +++ b/frontend/app/store/services.ts @@ -98,3 +98,13 @@ class ObjectServiceType { export const ObjectService = new ObjectServiceType() +// windowservice.WindowService (window) +class WindowServiceType { + // @returns object updates + SetWindowPosAndSize(arg2: string, arg3: Point, arg4: WinSize): Promise { + return WOS.callBackendService("window", "SetWindowPosAndSize", Array.from(arguments)) + } +} + +export const WindowService = new WindowServiceType() + diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 27bb7c88b..b0011693f 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -30,7 +30,7 @@ declare global { type BlockCommand = { command: string; - } & ( BlockAppendIJsonCommand | BlockInputCommand | BlockSetViewCommand | BlockSetMetaCommand | BlockMessageCommand | BlockAppendFileCommand ); + } & ( BlockSetMetaCommand | BlockGetMetaCommand | BlockMessageCommand | BlockAppendFileCommand | BlockAppendIJsonCommand | BlockInputCommand | BlockSetViewCommand ); // wstore.BlockDef type BlockDef = { @@ -40,6 +40,12 @@ declare global { meta?: MetaType; }; + // wshutil.BlockGetMetaCommand + type BlockGetMetaCommand = { + command: "getmeta"; + oid: string; + }; + // wshutil.BlockInputCommand type BlockInputCommand = { command: "controller:input"; @@ -57,6 +63,7 @@ declare global { // wshutil.BlockSetMetaCommand type BlockSetMetaCommand = { command: "setmeta"; + oid?: string; meta: MetaType; }; diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 60d84a885..9df62e60c 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -39,6 +39,12 @@ const DefaultTimeout = 2 * time.Second var globalLock = &sync.Mutex{} var blockControllerMap = make(map[string]*BlockController) +type BlockInputUnion struct { + InputData []byte `json:"inputdata,omitempty"` + SigName string `json:"signame,omitempty"` + TermSize *shellexec.TermSize `json:"termsize,omitempty"` +} + type BlockController struct { Lock *sync.Mutex BlockId string @@ -47,7 +53,7 @@ type BlockController struct { Status string CreatedHtmlFile bool ShellProc *shellexec.ShellProc - ShellInputCh chan *wshutil.BlockInputCommand + ShellInputCh chan *BlockInputUnion } func (bc *BlockController) WithLock(f func()) { @@ -159,6 +165,114 @@ func (bc *BlockController) resetTerminalState() { } } +func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) { + if strings.Contains(simpleId, ":") { + rtn, err := waveobj.ParseORef(simpleId) + if err != nil { + return nil, fmt.Errorf("error parsing simple id: %w", err) + } + return &rtn, nil + } + return wstore.DBResolveEasyOID(ctx, simpleId) +} + +func staticHandleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) { + oref, err := waveobj.ParseORef(cmd.ORef) + if err != nil { + return nil, fmt.Errorf("error parsing oref: %w", err) + } + obj, err := wstore.DBGetORef(ctx, oref) + if err != nil { + return nil, fmt.Errorf("error getting object: %w", err) + } + if obj == nil { + return nil, fmt.Errorf("object not found: %s", oref) + } + return waveobj.GetMeta(obj), nil +} + +func staticHandleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, curBlockId string) (map[string]any, error) { + var oref *waveobj.ORef + if cmd.ORef != "" { + orefVal, err := waveobj.ParseORef(cmd.ORef) + if err != nil { + return nil, fmt.Errorf("error parsing oref: %w", err) + } + oref = &orefVal + } else { + orefVal := waveobj.MakeORef(wstore.OType_Block, curBlockId) + oref = &orefVal + } + log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta) + obj, err := wstore.DBGetORef(ctx, *oref) + if err != nil { + return nil, fmt.Errorf("error getting object: %w", err) + } + if obj == nil { + return nil, nil + } + meta := waveobj.GetMeta(obj) + if meta == nil { + meta = make(map[string]any) + } + for k, v := range cmd.Meta { + if v == nil { + delete(meta, k) + continue + } + meta[k] = v + } + waveobj.SetMeta(obj, meta) + err = wstore.DBUpdate(ctx, obj) + if err != nil { + return nil, fmt.Errorf("error updating block: %w", err) + } + // send a waveobj:update event + updatedBlock, err := wstore.DBGetORef(ctx, *oref) + if err != nil { + return nil, fmt.Errorf("error getting object (2): %w", err) + } + eventbus.SendEvent(eventbus.WSEventType{ + EventType: "waveobj:update", + ORef: oref.String(), + Data: wstore.WaveObjUpdate{ + UpdateType: wstore.UpdateType_Update, + OType: updatedBlock.GetOType(), + OID: waveobj.GetOID(updatedBlock), + Obj: updatedBlock, + }, + }) + return nil, nil +} + +func staticHandleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) { + rtn := make(map[string]any) + for _, simpleId := range cmd.Ids { + oref, err := resolveSimpleId(ctx, simpleId) + if err != nil || oref == nil { + continue + } + rtn[simpleId] = oref.String() + } + return rtn, nil +} + +func (bc *BlockController) waveOSCMessageHandler(ctx context.Context, cmd wshutil.BlockCommand, respFn wshutil.ResponseFnType) (wshutil.ResponseDataType, error) { + if strings.HasPrefix(cmd.GetCommand(), "controller:") { + bc.InputCh <- cmd + return nil, nil + } + switch cmd.GetCommand() { + case wshutil.BlockCommand_GetMeta: + return staticHandleGetMeta(ctx, cmd.(*wshutil.BlockGetMetaCommand)) + case wshutil.Command_ResolveIds: + return staticHandleResolveIds(ctx, cmd.(*wshutil.ResolveIdsCommand)) + } + + ProcessStaticCommand(bc.BlockId, cmd) + return nil, nil +} + func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { // create a circular blockfile for the output ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) @@ -183,20 +297,13 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { bc.ShellProc.Close() return err } - shellInputCh := make(chan *wshutil.BlockInputCommand) + shellInputCh := make(chan *BlockInputUnion, 32) bc.ShellInputCh = shellInputCh - commandCh := make(chan wshutil.BlockCommand, 32) - ptyBuffer := wshutil.MakePtyBuffer(bc.ShellProc.Pty, commandCh) - go func() { - for cmd := range commandCh { - if strings.HasPrefix(cmd.GetCommand(), "controller:") { - bc.InputCh <- cmd - } else { - ProcessStaticCommand(bc.BlockId, cmd) - } - } - }() + messageCh := make(chan wshutil.RpcMessage, 32) + ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, bc.ShellProc.Pty, messageCh) + _, outputCh := wshutil.MakeWshRpc(wshutil.WaveServerOSC, messageCh, bc.waveOSCMessageHandler) go func() { + // handles regular output from the pty (goes to the blockfile and xterm) defer func() { // needs synchronization bc.ShellProc.Close() @@ -223,15 +330,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { } }() go func() { + // handles input from the shellInputCh, sent to pty for ic := range shellInputCh { - if ic.InputData64 != "" { - inputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(ic.InputData64))) - nw, err := base64.StdEncoding.Decode(inputBuf, []byte(ic.InputData64)) - if err != nil { - log.Printf("error decoding input data: %v\n", err) - continue - } - bc.ShellProc.Pty.Write(inputBuf[:nw]) + if len(ic.InputData) > 0 { + bc.ShellProc.Pty.Write(ic.InputData) } if ic.TermSize != nil { log.Printf("SETTERMSIZE: %dx%d\n", ic.TermSize.Rows, ic.TermSize.Cols) @@ -240,6 +342,13 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { log.Printf("error setting term size: %v\n", err) } } + // TODO signals + } + }() + go func() { + // handles outputCh -> shellInputCh + for out := range outputCh { + shellInputCh <- &BlockInputUnion{InputData: out} } }() return nil @@ -277,10 +386,24 @@ func (bc *BlockController) Run(bdata *wstore.Block) { for genCmd := range bc.InputCh { switch cmd := genCmd.(type) { case *wshutil.BlockInputCommand: - log.Printf("INPUT: %s | %q\n", bc.BlockId, cmd.InputData64) - if bc.ShellInputCh != nil { - bc.ShellInputCh <- cmd + if bc.ShellInputCh == nil { + continue } + inputUnion := &BlockInputUnion{ + SigName: cmd.SigName, + TermSize: cmd.TermSize, + } + if len(cmd.InputData64) > 0 { + inputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(cmd.InputData64))) + nw, err := base64.StdEncoding.Decode(inputBuf, []byte(cmd.InputData64)) + if err != nil { + log.Printf("error decoding input data: %v\n", err) + continue + } + inputUnion.InputData = inputBuf[:nw] + } + log.Printf("INPUT: %s | %q\n", bc.BlockId, string(inputUnion.InputData)) + bc.ShellInputCh <- inputUnion default: log.Printf("unknown command type %T\n", cmd) } @@ -363,44 +486,12 @@ func ProcessStaticCommand(blockId string, cmdGen wshutil.BlockCommand) error { }) return nil case *wshutil.BlockSetMetaCommand: - log.Printf("SETMETA: %s | %v\n", blockId, cmd.Meta) - block, err := wstore.DBGet[*wstore.Block](ctx, blockId) + _, err := staticHandleSetMeta(ctx, cmd, blockId) if err != nil { - return fmt.Errorf("error getting block: %w", err) + return err } - if block == nil { - return nil - } - if block.Meta == nil { - block.Meta = make(map[string]any) - } - for k, v := range cmd.Meta { - if v == nil { - delete(block.Meta, k) - continue - } - block.Meta[k] = v - } - err = wstore.DBUpdate(ctx, block) - if err != nil { - return fmt.Errorf("error updating block: %w", err) - } - // send a waveobj:update event - updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, blockId) - if err != nil { - return fmt.Errorf("error getting block: %w", err) - } - eventbus.SendEvent(eventbus.WSEventType{ - EventType: "waveobj:update", - ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(), - Data: wstore.WaveObjUpdate{ - UpdateType: wstore.UpdateType_Update, - OType: wstore.OType_Block, - OID: blockId, - Obj: updatedBlock, - }, - }) return nil + case *wshutil.BlockMessageCommand: log.Printf("MESSAGE: %s | %q\n", blockId, cmd.Message) return nil diff --git a/pkg/service/objectservice/objectservice.go b/pkg/service/objectservice/objectservice.go index a65db3c70..7e2d8599c 100644 --- a/pkg/service/objectservice/objectservice.go +++ b/pkg/service/objectservice/objectservice.go @@ -5,7 +5,6 @@ package objectservice import ( "context" - "encoding/json" "fmt" "log" "strings" @@ -72,26 +71,6 @@ func (svc *ObjectService) GetObjects(orefStrArr []string) ([]waveobj.WaveObj, er return wstore.DBSelectORefs(ctx, orefArr) } -func updatesRtn(ctx context.Context, rtnVal map[string]any) (any, error) { - updates := wstore.ContextGetUpdates(ctx) - if len(updates) == 0 { - return nil, nil - } - updateArr := make([]wstore.WaveObjUpdate, 0, len(updates)) - for _, update := range updates { - updateArr = append(updateArr, update) - } - jval, err := json.Marshal(updateArr) - if err != nil { - return nil, fmt.Errorf("error converting updates to JSON: %w", err) - } - if rtnVal == nil { - rtnVal = make(map[string]any) - } - rtnVal["updates"] = json.RawMessage(jval) - return rtnVal, nil -} - func (svc *ObjectService) AddTabToWorkspace_Meta() tsgenmeta.MethodMeta { return tsgenmeta.MethodMeta{ ArgNames: []string{"uiContext", "tabName", "activateTab"}, diff --git a/pkg/service/service.go b/pkg/service/service.go index fc72b15ce..fcf3cb17e 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -13,7 +13,9 @@ import ( "github.com/wavetermdev/thenextwave/pkg/service/clientservice" "github.com/wavetermdev/thenextwave/pkg/service/fileservice" "github.com/wavetermdev/thenextwave/pkg/service/objectservice" + "github.com/wavetermdev/thenextwave/pkg/service/windowservice" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" + "github.com/wavetermdev/thenextwave/pkg/util/utilfn" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/web/webcmd" "github.com/wavetermdev/thenextwave/pkg/wshutil" @@ -25,6 +27,7 @@ var ServiceMap = map[string]any{ "object": &objectservice.ObjectService{}, "file": &fileservice.FileService{}, "client": &clientservice.ClientService{}, + "window": &windowservice.WindowService{}, } var contextRType = reflect.TypeOf((*context.Context)(nil)).Elem() @@ -85,7 +88,7 @@ func convertNumber(argType reflect.Type, jsonArg float64) (any, error) { func convertComplex(argType reflect.Type, jsonArg any) (any, error) { nativeArgVal := reflect.New(argType) - err := waveobj.DoMapStucture(nativeArgVal.Interface(), jsonArg) + err := utilfn.DoMapStucture(nativeArgVal.Interface(), jsonArg) if err != nil { return nil, err } diff --git a/pkg/service/windowservice/windowservice.go b/pkg/service/windowservice/windowservice.go new file mode 100644 index 000000000..f9b24a6fd --- /dev/null +++ b/pkg/service/windowservice/windowservice.go @@ -0,0 +1,34 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package windowservice + +import ( + "context" + + "github.com/wavetermdev/thenextwave/pkg/wstore" +) + +type WindowService struct{} + +func (ws *WindowService) SetWindowPosAndSize(ctx context.Context, windowId string, pos *wstore.Point, size *wstore.WinSize) (wstore.UpdatesRtnType, error) { + if pos == nil && size == nil { + return nil, nil + } + ctx = wstore.ContextWithUpdates(ctx) + win, err := wstore.DBMustGet[*wstore.Window](ctx, windowId) + if err != nil { + return nil, err + } + if pos != nil { + win.Pos = *pos + } + if size != nil { + win.WinSize = *size + } + err = wstore.DBUpdate(ctx, win) + if err != nil { + return nil, err + } + return wstore.ContextGetUpdatesRtn(ctx), nil +} diff --git a/pkg/util/utilfn/utilfn.go b/pkg/util/utilfn/utilfn.go index 850483d9a..90c727ac2 100644 --- a/pkg/util/utilfn/utilfn.go +++ b/pkg/util/utilfn/utilfn.go @@ -23,6 +23,8 @@ import ( "strings" "syscall" "unicode/utf8" + + "github.com/mitchellh/mapstructure" ) var HexDigits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'} @@ -725,3 +727,16 @@ func IndentString(indent string, str string) string { } return rtn.String() } + +// does a mapstructure using "json" tags +func DoMapStucture(out any, input any) error { + dconfig := &mapstructure.DecoderConfig{ + Result: out, + TagName: "json", + } + decoder, err := mapstructure.NewDecoder(dconfig) + if err != nil { + return err + } + return decoder.Decode(input) +} diff --git a/pkg/waveobj/waveobj.go b/pkg/waveobj/waveobj.go index 0d6bc3c13..3cd7d4728 100644 --- a/pkg/waveobj/waveobj.go +++ b/pkg/waveobj/waveobj.go @@ -184,18 +184,6 @@ func SetMeta(waveObj WaveObj, meta map[string]any) { reflect.ValueOf(waveObj).Elem().FieldByIndex(desc.MetaField.Index).Set(reflect.ValueOf(meta)) } -func DoMapStucture(out any, input any) error { - dconfig := &mapstructure.DecoderConfig{ - Result: out, - TagName: "json", - } - decoder, err := mapstructure.NewDecoder(dconfig) - if err != nil { - return err - } - return decoder.Decode(input) -} - func ToJsonMap(w WaveObj) (map[string]any, error) { if w == nil { return nil, nil diff --git a/pkg/web/webcmd/webcmd.go b/pkg/web/webcmd/webcmd.go index 26fc23ce7..155072a52 100644 --- a/pkg/web/webcmd/webcmd.go +++ b/pkg/web/webcmd/webcmd.go @@ -9,7 +9,7 @@ import ( "github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" - "github.com/wavetermdev/thenextwave/pkg/waveobj" + "github.com/wavetermdev/thenextwave/pkg/util/utilfn" ) const ( @@ -48,7 +48,7 @@ func ParseWSCommandMap(cmdMap map[string]any) (WSCommandType, error) { switch cmdType { case WSCommand_SetBlockTermSize: var cmd SetBlockTermSizeWSCommand - err := waveobj.DoMapStucture(&cmd, cmdMap) + err := utilfn.DoMapStucture(&cmd, cmdMap) if err != nil { return nil, fmt.Errorf("error decoding SetBlockTermSizeWSCommand: %w", err) } diff --git a/pkg/wshutil/unmarshalhelper.go b/pkg/wshutil/unmarshalhelper.go new file mode 100644 index 000000000..65048ee4e --- /dev/null +++ b/pkg/wshutil/unmarshalhelper.go @@ -0,0 +1,102 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshutil + +import ( + "encoding/json" + "fmt" +) + +type RpcMessageUnmarshalHelper struct { + Command string + ReqId string + ResId string + M map[string]any + + Req *RpcRequest + Res *RpcResponse +} + +func (helper *RpcMessageUnmarshalHelper) UnmarshalJSON(data []byte) error { + var rmap map[string]any + if err := json.Unmarshal(data, &rmap); err != nil { + return err + } + if command, ok := rmap["command"].(string); ok { + helper.Command = command + } + if reqid, ok := rmap["reqid"].(string); ok { + helper.ReqId = reqid + } + if resid, ok := rmap["resid"].(string); ok { + helper.ResId = resid + } + if helper.ReqId != "" && helper.ResId != "" { + return fmt.Errorf("both reqid and resid cannot be set") + } + if helper.Command == "" && helper.ResId == "" { + return fmt.Errorf("either command or resid must be set") + } + helper.M = rmap + if helper.Command != "" { + // ok, this is a request, so lets parse it + req, err := helper.parseRequest() + if err != nil { + return fmt.Errorf("error parsing request: %w", err) + } + helper.Req = req + } else { + // this is a response, parse it + res, err := helper.parseResponse() + if err != nil { + return fmt.Errorf("error parsing response: %w", err) + } + helper.Res = res + } + return nil +} + +func (helper *RpcMessageUnmarshalHelper) parseRequest() (*RpcRequest, error) { + req := &RpcRequest{ + ReqId: helper.ReqId, + } + if helper.M["timeoutms"] != nil { + timeoutMs, ok := helper.M["timeoutms"].(float64) + if !ok { + return nil, fmt.Errorf("timeoutms field is not a number") + } + req.TimeoutMs = int(timeoutMs) + } + cmd, err := ParseCmdMap(helper.M) + if err != nil { + return nil, fmt.Errorf("error parsing command: %w", err) + } + req.Command = cmd + return req, nil +} + +func (helper *RpcMessageUnmarshalHelper) parseResponse() (*RpcResponse, error) { + rtn := &RpcResponse{ + ResId: helper.ResId, + Data: helper.M, + } + if helper.M["error"] != nil { + errStr, ok := helper.M["error"].(string) + if !ok { + return nil, fmt.Errorf("error field is not a string") + } + rtn.Error = errStr + } + if helper.M["cont"] != nil { + cont, ok := helper.M["cont"].(bool) + if !ok { + return nil, fmt.Errorf("cont field is not a bool") + } + rtn.Cont = cont + } + delete(rtn.Data, "resid") + delete(rtn.Data, "error") + delete(rtn.Data, "cont") + return rtn, nil +} diff --git a/pkg/wshutil/wshcmdreader.go b/pkg/wshutil/wshcmdreader.go index 655e16a05..f59f8b482 100644 --- a/pkg/wshutil/wshcmdreader.go +++ b/pkg/wshutil/wshcmdreader.go @@ -1,3 +1,6 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + package wshutil import ( @@ -5,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "log" "sync" ) @@ -22,19 +26,25 @@ type PtyBuffer struct { DataBuf *bytes.Buffer EscMode string EscSeqBuf []byte + OSCPrefix string InputReader io.Reader - CommandCh chan BlockCommand + MessageCh chan RpcMessage AtEOF bool Err error } -func MakePtyBuffer(input io.Reader, commandCh chan BlockCommand) *PtyBuffer { +// closes messageCh when input is closed (or error) +func MakePtyBuffer(oscPrefix string, input io.Reader, messageCh chan RpcMessage) *PtyBuffer { + if len(oscPrefix) != WaveOSCPrefixLen { + panic(fmt.Sprintf("invalid OSC prefix length: %d", len(oscPrefix))) + } b := &PtyBuffer{ CVar: sync.NewCond(&sync.Mutex{}), DataBuf: &bytes.Buffer{}, + OSCPrefix: oscPrefix, EscMode: Mode_Normal, InputReader: input, - CommandCh: commandCh, + MessageCh: messageCh, } go b.run() return b @@ -57,22 +67,21 @@ func (b *PtyBuffer) setEOF() { } func (b *PtyBuffer) processWaveEscSeq(escSeq []byte) { - jmsg := make(map[string]any) - err := json.Unmarshal(escSeq, &jmsg) + var helper RpcMessageUnmarshalHelper + err := json.Unmarshal(escSeq, &helper) if err != nil { - b.setErr(fmt.Errorf("error unmarshalling Wave OSC sequence data: %w", err)) + log.Printf("error unmarshalling Wave OSC sequence data: %v\n", err) return } - cmd, err := ParseCmdMap(jmsg) - if err != nil { - b.setErr(fmt.Errorf("error parsing Wave OSC command: %w", err)) - return + if helper.Req != nil { + b.MessageCh <- helper.Req + } else { + b.MessageCh <- helper.Res } - b.CommandCh <- cmd } func (b *PtyBuffer) run() { - defer close(b.CommandCh) + defer close(b.MessageCh) buf := make([]byte, 4096) for { n, err := b.InputReader.Read(buf) @@ -101,7 +110,7 @@ func (b *PtyBuffer) processData(data []byte) { } else if ch == BEL || ch == ST { // terminates the escpae sequence (is a valid Wave OSC command) b.EscMode = Mode_Normal - waveEscSeq := b.EscSeqBuf[len(WaveOSCPrefix):] + waveEscSeq := b.EscSeqBuf[WaveOSCPrefixLen:] b.EscSeqBuf = nil b.processWaveEscSeq(waveEscSeq) } else { @@ -115,21 +124,22 @@ func (b *PtyBuffer) processData(data []byte) { b.EscMode = Mode_Normal outputBuf = append(outputBuf, b.EscSeqBuf...) outputBuf = append(outputBuf, ch) - } else { - if ch == WaveOSCPrefixBytes[len(b.EscSeqBuf)] { - // we're still building what could be a Wave OSC sequence - b.EscSeqBuf = append(b.EscSeqBuf, ch) - } else { - // this is not a Wave OSC sequence, just an escape sequence - b.EscMode = Mode_Normal - outputBuf = append(outputBuf, b.EscSeqBuf...) - outputBuf = append(outputBuf, ch) - continue - } - // check to see if we have a full Wave OSC prefix - if len(b.EscSeqBuf) == len(WaveOSCPrefixBytes) { - b.EscMode = Mode_WaveEsc - } + b.EscSeqBuf = nil + continue + } + if ch != b.OSCPrefix[len(b.EscSeqBuf)] { + // this is not a Wave OSC sequence, just an escape sequence + b.EscMode = Mode_Normal + outputBuf = append(outputBuf, b.EscSeqBuf...) + outputBuf = append(outputBuf, ch) + b.EscSeqBuf = nil + continue + } + // we're still building what could be a Wave OSC sequence + b.EscSeqBuf = append(b.EscSeqBuf, ch) + // check to see if we have a full Wave OSC prefix + if len(b.EscSeqBuf) == len(b.OSCPrefix) { + b.EscMode = Mode_WaveEsc } continue } diff --git a/pkg/wshutil/wshcommands.go b/pkg/wshutil/wshcommands.go index d87929556..dfea9fa0b 100644 --- a/pkg/wshutil/wshcommands.go +++ b/pkg/wshutil/wshcommands.go @@ -23,6 +23,7 @@ const ( BlockCommand_Input = "controller:input" BlockCommand_AppendBlockFile = "blockfile:append" BlockCommand_AppendIJson = "blockfile:appendijson" + Command_ResolveIds = "resolveids" ) var CommandToTypeMap = map[string]reflect.Type{ @@ -33,6 +34,7 @@ var CommandToTypeMap = map[string]reflect.Type{ BlockCommand_Message: reflect.TypeOf(BlockMessageCommand{}), BlockCommand_AppendBlockFile: reflect.TypeOf(BlockAppendFileCommand{}), BlockCommand_AppendIJson: reflect.TypeOf(BlockAppendIJsonCommand{}), + Command_ResolveIds: reflect.TypeOf(ResolveIdsCommand{}), } func CommandTypeUnionMeta() tsgenmeta.TypeUnionMeta { @@ -91,6 +93,15 @@ func (ic *BlockInputCommand) GetCommand() string { return BlockCommand_Input } +type ResolveIdsCommand struct { + Command string `json:"command" tstype:"\"resolveids\""` + Ids []string `json:"ids"` +} + +func (ric *ResolveIdsCommand) GetCommand() string { + return Command_ResolveIds +} + type BlockSetViewCommand struct { Command string `json:"command" tstype:"\"setview\""` View string `json:"view"` @@ -102,8 +113,7 @@ func (svc *BlockSetViewCommand) GetCommand() string { type BlockGetMetaCommand struct { Command string `json:"command" tstype:"\"getmeta\""` - RpcId string `json:"rpcid"` - OID string `json:"oid"` // allows oref, 8-char oid, or full uuid + ORef string `json:"oref"` // oref string } func (gmc *BlockGetMetaCommand) GetCommand() string { @@ -112,7 +122,7 @@ func (gmc *BlockGetMetaCommand) GetCommand() string { type BlockSetMetaCommand struct { Command string `json:"command" tstype:"\"setmeta\""` - OID string `json:"oid"` // allows oref, 8-char oid, or full uuid + ORef string `json:"oref,omitempty"` // allows oref, 8-char oid, or full uuid (empty is current block) Meta map[string]any `json:"meta"` } diff --git a/pkg/wshutil/wshrpc.go b/pkg/wshutil/wshrpc.go new file mode 100644 index 000000000..59139c037 --- /dev/null +++ b/pkg/wshutil/wshrpc.go @@ -0,0 +1,330 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshutil + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "sync" + "time" + + "github.com/google/uuid" + "github.com/wavetermdev/thenextwave/pkg/util/utilfn" +) + +const DefaultTimeoutMs = 5000 +const RespChSize = 32 +const DefaultOutputChSize = 32 + +type ResponseDataType = map[string]any +type ResponseFnType = func(ResponseDataType) error +type CommandHandlerFnType = func(context.Context, BlockCommand, ResponseFnType) (ResponseDataType, error) + +type RpcMessage interface { + IsRpcRequest() bool +} + +type WshRpc struct { + Lock *sync.Mutex + InputCh chan RpcMessage + OutputCh chan []byte + OSCEsc string // either 23198 or 23199 + RpcMap map[string]*rpcData + HandlerFn CommandHandlerFnType +} + +type RpcRequest struct { + ReqId string + TimeoutMs int + Command BlockCommand +} + +func (r *RpcRequest) IsRpcRequest() bool { + return true +} + +func (r *RpcRequest) MarshalJSON() ([]byte, error) { + if r == nil { + return []byte("null"), nil + } + rtn := make(map[string]any) + utilfn.DoMapStucture(&rtn, r.Command) + rtn["command"] = r.Command.GetCommand() + if r.ReqId != "" { + rtn["reqid"] = r.ReqId + } else { + delete(rtn, "reqid") + } + if r.TimeoutMs != 0 { + rtn["timeoutms"] = float64(r.TimeoutMs) + } else { + delete(rtn, "timeoutms") + } + return json.Marshal(rtn) +} + +type RpcResponse struct { + ResId string `json:"resid"` + Error string `json:"error,omitempty"` + Cont bool `json:"cont,omitempty"` + Data map[string]any `json:"data,omitempty"` +} + +func (r *RpcResponse) IsRpcRequest() bool { + return false +} + +func (r *RpcResponse) MarshalJSON() ([]byte, error) { + rtn := make(map[string]any) + // rest goes first (since other fields will overwrite) + for k, v := range r.Data { + rtn[k] = v + } + rtn["resid"] = r.ResId + if r.Error != "" { + rtn["error"] = r.Error + } else { + delete(rtn, "error") + } + if r.Cont { + rtn["cont"] = true + } else { + delete(rtn, "cont") + } + return json.Marshal(rtn) +} + +type rpcData struct { + ResCh chan *RpcResponse + Ctx context.Context + CancelFn context.CancelFunc +} + +// oscEsc is the OSC escape sequence to use for *sending* messages +// closes outputCh when inputCh is closed/done +func MakeWshRpc(oscEsc string, inputCh chan RpcMessage, commandHandlerFn CommandHandlerFnType) (*WshRpc, chan []byte) { + if len(oscEsc) != 5 { + panic("oscEsc must be 5 characters") + } + outputCh := make(chan []byte, DefaultOutputChSize) + rtn := &WshRpc{ + Lock: &sync.Mutex{}, + InputCh: inputCh, + OutputCh: outputCh, + OSCEsc: oscEsc, + RpcMap: make(map[string]*rpcData), + HandlerFn: commandHandlerFn, + } + go rtn.runServer() + return rtn, outputCh +} + +func (w *WshRpc) handleRequest(req *RpcRequest) { + defer func() { + if r := recover(); r != nil { + errResp := &RpcResponse{ + ResId: req.ReqId, + Error: fmt.Sprintf("panic: %v", r), + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, errResp) + if err != nil { + return + } + w.OutputCh <- barr + } + }() + respFn := func(resp ResponseDataType) error { + if req.ReqId == "" { + // request is not expecting a response + return nil + } + respMsg := &RpcResponse{ + ResId: req.ReqId, + Cont: true, + Data: resp, + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, respMsg) + if err != nil { + return fmt.Errorf("error marshalling response to json: %w", err) + } + w.OutputCh <- barr + return nil + } + timeoutMs := req.TimeoutMs + if timeoutMs <= 0 { + timeoutMs = DefaultTimeoutMs + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond) + defer cancelFn() + respData, err := w.HandlerFn(ctx, req.Command, respFn) + log.Printf("handler for %q returned resp: %v\n", req.Command.GetCommand(), respData) + if req.ReqId == "" { + // no response expected + if err != nil { + log.Printf("error handling request (no response): %v\n", err) + } + return + } + if err != nil { + errResp := &RpcResponse{ + ResId: req.ReqId, + Error: err.Error(), + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, errResp) + if err != nil { + return + } + w.OutputCh <- barr + return + } + respMsg := &RpcResponse{ + ResId: req.ReqId, + Data: respData, + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, respMsg) + if err != nil { + respMsg := &RpcResponse{ + ResId: req.ReqId, + Error: err.Error(), + } + barr, _ = EncodeWaveOSCMessageEx(w.OSCEsc, respMsg) + } + w.OutputCh <- barr +} + +func (w *WshRpc) runServer() { + defer close(w.OutputCh) + for msg := range w.InputCh { + if msg.IsRpcRequest() { + if w.HandlerFn == nil { + continue + } + req := msg.(*RpcRequest) + w.handleRequest(req) + } else { + resp := msg.(*RpcResponse) + respCh := w.getResponseCh(resp.ResId) + if respCh == nil { + continue + } + respCh <- resp + if !resp.Cont { + w.unregisterRpc(resp.ResId, nil) + } + } + } +} + +func (w *WshRpc) getResponseCh(resId string) chan *RpcResponse { + if resId == "" { + return nil + } + w.Lock.Lock() + defer w.Lock.Unlock() + rd := w.RpcMap[resId] + if rd == nil { + return nil + } + return rd.ResCh +} + +func (w *WshRpc) SetHandler(handler CommandHandlerFnType) { + w.Lock.Lock() + defer w.Lock.Unlock() + w.HandlerFn = handler +} + +// no response +func (w *WshRpc) SendCommand(cmd BlockCommand) error { + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, &RpcRequest{Command: cmd}) + if err != nil { + return fmt.Errorf("error marshalling request to json: %w", err) + } + w.OutputCh <- barr + return nil +} + +func (w *WshRpc) registerRpc(reqId string, timeoutMs int) chan *RpcResponse { + w.Lock.Lock() + defer w.Lock.Unlock() + if timeoutMs <= 0 { + timeoutMs = DefaultTimeoutMs + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond) + rpcCh := make(chan *RpcResponse, RespChSize) + w.RpcMap[reqId] = &rpcData{ + ResCh: rpcCh, + Ctx: ctx, + CancelFn: cancelFn, + } + go func() { + <-ctx.Done() + w.unregisterRpc(reqId, fmt.Errorf("EC-TIME: timeout waiting for response")) + }() + return rpcCh +} + +func (w *WshRpc) unregisterRpc(reqId string, err error) { + w.Lock.Lock() + defer w.Lock.Unlock() + rd := w.RpcMap[reqId] + if rd != nil { + if err != nil { + errResp := &RpcResponse{ + ResId: reqId, + Error: err.Error(), + } + rd.ResCh <- errResp + } + close(rd.ResCh) + rd.CancelFn() + } + delete(w.RpcMap, reqId) +} + +// single response +func (w *WshRpc) SendRpcRequest(cmd BlockCommand, timeoutMs int) (map[string]any, error) { + if timeoutMs < 0 { + return nil, fmt.Errorf("timeout must be >= 0") + } + req := &RpcRequest{ + Command: cmd, + ReqId: uuid.New().String(), + TimeoutMs: timeoutMs, + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, req) + if err != nil { + return nil, fmt.Errorf("error marshalling request to ANSI esc: %w", err) + } + rpcCh := w.registerRpc(req.ReqId, timeoutMs) + defer w.unregisterRpc(req.ReqId, nil) + w.OutputCh <- barr + resp := <-rpcCh + if resp.Error != "" { + return nil, errors.New(resp.Error) + } + return resp.Data, nil +} + +// streaming response +func (w *WshRpc) SendRpcRequestEx(cmd BlockCommand, timeoutMs int) (chan *RpcResponse, error) { + if timeoutMs < 0 { + return nil, fmt.Errorf("timeout must be >= 0") + } + req := &RpcRequest{ + Command: cmd, + ReqId: uuid.New().String(), + TimeoutMs: timeoutMs, + } + barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, req) + if err != nil { + return nil, fmt.Errorf("error marshalling request to json: %w", err) + } + rpcCh := w.registerRpc(req.ReqId, timeoutMs) + w.OutputCh <- barr + return rpcCh, nil +} diff --git a/pkg/wshutil/wshutil.go b/pkg/wshutil/wshutil.go index ade3cc1f3..865fbdd84 100644 --- a/pkg/wshutil/wshutil.go +++ b/pkg/wshutil/wshutil.go @@ -11,18 +11,19 @@ import ( "reflect" ) +// these should both be 5 characters const WaveOSC = "23198" +const WaveServerOSC = "23199" +const WaveOSCPrefixLen = 5 + 3 // \x1b] + WaveOSC + ; + \x07 + const WaveOSCPrefix = "\x1b]" + WaveOSC + ";" -const WaveResponseOSC = "23199" -const WaveResponseOSCPrefix = "\x1b]" + WaveResponseOSC + ";" +const WaveServerOSCPrefix = "\x1b]" + WaveServerOSC + ";" const HexChars = "0123456789ABCDEF" const BEL = 0x07 const ST = 0x9c const ESC = 0x1b -var WaveOSCPrefixBytes = []byte(WaveOSCPrefix) - // OSC escape types // OSC 23198 ; (JSON | base64-JSON) ST // JSON = must escape all ASCII control characters ([\x00-\x1F\x7F]) @@ -31,19 +32,37 @@ var WaveOSCPrefixBytes = []byte(WaveOSCPrefix) // for responses (terminal -> program), we'll use OSC 23199 // same json format -func EncodeWaveOSCMessage(cmd BlockCommand) ([]byte, error) { - if cmd.GetCommand() == "" { - return nil, fmt.Errorf("command field not set in struct") +func copyOscPrefix(dst []byte, oscNum string) { + dst[0] = ESC + dst[1] = ']' + copy(dst[2:], oscNum) + dst[len(oscNum)+2] = ';' +} + +func oscPrefixLen(oscNum string) int { + return 3 + len(oscNum) +} + +func makeOscPrefix(oscNum string) []byte { + output := make([]byte, oscPrefixLen(oscNum)) + copyOscPrefix(output, oscNum) + return output +} + +func EncodeWaveReq(cmd BlockCommand) ([]byte, error) { + req := &RpcRequest{Command: cmd} + return EncodeWaveOSCMessage(req) +} + +func EncodeWaveOSCMessage(msg RpcMessage) ([]byte, error) { + return EncodeWaveOSCMessageEx(WaveOSC, msg) +} + +func EncodeWaveOSCMessageEx(oscNum string, msg RpcMessage) ([]byte, error) { + if msg == nil { + return nil, fmt.Errorf("nil message") } - ctype, ok := CommandToTypeMap[cmd.GetCommand()] - if !ok { - return nil, fmt.Errorf("unknown command type %q", cmd.GetCommand()) - } - cmdType := reflect.TypeOf(cmd) - if cmdType != ctype && (cmdType.Kind() == reflect.Pointer && cmdType.Elem() != ctype) { - return nil, fmt.Errorf("command type does not match %q", cmd.GetCommand()) - } - barr, err := json.Marshal(cmd) + barr, err := json.Marshal(msg) if err != nil { return nil, fmt.Errorf("error marshalling message to json: %w", err) } @@ -57,15 +76,15 @@ func EncodeWaveOSCMessage(cmd BlockCommand) ([]byte, error) { if !hasControlChars { // If no control characters, directly construct the output // \x1b] (2) + WaveOSC + ; (1) + message + \x07 (1) - output := make([]byte, len(WaveOSCPrefix)+len(barr)+1) - copy(output, WaveOSCPrefixBytes) - copy(output[len(WaveOSCPrefix):], barr) + output := make([]byte, oscPrefixLen(oscNum)+len(barr)+1) + copyOscPrefix(output, oscNum) + copy(output[oscPrefixLen(oscNum):], barr) output[len(output)-1] = BEL return output, nil } var buf bytes.Buffer - buf.Write(WaveOSCPrefixBytes) + buf.Write(makeOscPrefix(oscNum)) escSeq := [6]byte{'\\', 'u', '0', '0', '0', '0'} for _, b := range barr { if b < 0x20 || b == 0x7f {