new wshrpc mechanism (#112)

lots of changes. new wshrpc implementation. unify websocket, web,
blockcontroller, domain sockets, and terminal inputs to all use the new
rpc system.

lots of moving files around to deal with circular dependencies

use new wshrpc as a client in wsh cmd
This commit is contained in:
Mike Sawka 2024-07-17 15:24:43 -07:00 committed by GitHub
parent b178434c0a
commit 01b5d71709
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
49 changed files with 1813 additions and 1999 deletions

View File

@ -13,10 +13,12 @@ tasks:
generate:
cmds:
- go run cmd/generate/main-generate.go
- go run cmd/generatewshclient/main-generatewshclient.go
sources:
- "cmd/generate/*.go"
- "pkg/service/**/*.go"
- "pkg/wstore/*.go"
- "pkg/wshrpc/**/*.go"
electron:dev:
cmds:

View File

@ -12,6 +12,7 @@ import (
"github.com/wavetermdev/thenextwave/pkg/service"
"github.com/wavetermdev/thenextwave/pkg/tsgen"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshserver"
)
func generateTypesFile(tsTypesMap map[reflect.Type]string) error {
@ -27,6 +28,7 @@ func generateTypesFile(tsTypesMap map[reflect.Type]string) error {
fmt.Fprintf(os.Stderr, "Error generating service types: %v\n", err)
os.Exit(1)
}
err = tsgen.GenerateWshServerTypes(tsTypesMap)
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n")
@ -71,6 +73,31 @@ func generateServicesFile(tsTypesMap map[reflect.Type]string) error {
return nil
}
func generateWshServerFile(tsTypeMap map[reflect.Type]string) error {
fd, err := os.Create("frontend/app/store/wshserver.ts")
if err != nil {
return err
}
defer fd.Close()
fmt.Fprintf(os.Stderr, "generating wshserver file to %s\n", fd.Name())
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n")
fmt.Fprintf(fd, "import * as WOS from \"./wos\";\n\n")
orderedKeys := utilfn.GetOrderedMapKeys(wshserver.WshServerCommandToDeclMap)
fmt.Fprintf(fd, "// WshServerCommandToDeclMap\n")
fmt.Fprintf(fd, "class WshServerType {\n")
for _, methodDecl := range orderedKeys {
methodDecl := wshserver.WshServerCommandToDeclMap[methodDecl]
methodStr := tsgen.GenerateWshServerMethod(methodDecl, tsTypeMap)
fmt.Fprint(fd, methodStr)
fmt.Fprintf(fd, "\n")
}
fmt.Fprintf(fd, "}\n\n")
fmt.Fprintf(fd, "export const WshServer = new WshServerType();\n")
return nil
}
func main() {
err := service.ValidateServiceMap()
if err != nil {
@ -88,4 +115,9 @@ func main() {
fmt.Fprintf(os.Stderr, "Error generating services file: %v\n", err)
os.Exit(1)
}
err = generateWshServerFile(tsTypesMap)
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating wshserver file: %v\n", err)
os.Exit(1)
}
}

View File

@ -0,0 +1,67 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"os"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshserver"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
)
func genMethod(fd *os.File, methodDecl *wshserver.WshServerMethodDecl) {
fmt.Fprintf(fd, "// command %q, wshserver.%s\n", methodDecl.Command, methodDecl.MethodName)
var dataType string
dataVarName := "nil"
if methodDecl.CommandDataType != nil {
dataType = ", data " + methodDecl.CommandDataType.String()
dataVarName = "data"
}
returnType := "error"
respName := "_"
tParamVal := "any"
if methodDecl.DefaultResponseDataType != nil {
returnType = "(" + methodDecl.DefaultResponseDataType.String() + ", error)"
respName = "resp"
tParamVal = methodDecl.DefaultResponseDataType.String()
}
fmt.Fprintf(fd, "func %s(w *wshutil.WshRpc%s, opts *wshrpc.WshRpcCommandOpts) %s {\n", methodDecl.MethodName, dataType, returnType)
if methodDecl.CommandType == wshutil.RpcType_Call {
fmt.Fprintf(fd, " %s, err := sendRpcRequestHelper[%s](w, %q, %s, opts)\n", respName, tParamVal, methodDecl.Command, dataVarName)
if methodDecl.DefaultResponseDataType != nil {
fmt.Fprintf(fd, " return resp, err\n")
} else {
fmt.Fprintf(fd, " return err\n")
}
} else {
panic("unsupported command type " + methodDecl.CommandType)
}
fmt.Fprintf(fd, "}\n\n")
}
func main() {
fd, err := os.Create("pkg/wshrpc/wshclient/wshclient.go")
if err != nil {
panic(err)
}
defer fd.Close()
fmt.Fprintf(os.Stderr, "generating wshclient file to %s\n", fd.Name())
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generatewshclient/main-generatewshclient.go\n\n")
fmt.Fprintf(fd, "package wshclient\n\n")
fmt.Fprintf(fd, "import (\n")
fmt.Fprintf(fd, " \"github.com/wavetermdev/thenextwave/pkg/wshutil\"\n")
fmt.Fprintf(fd, " \"github.com/wavetermdev/thenextwave/pkg/wshrpc\"\n")
fmt.Fprintf(fd, " \"github.com/wavetermdev/thenextwave/pkg/waveobj\"\n")
fmt.Fprintf(fd, ")\n\n")
for _, key := range utilfn.GetOrderedMapKeys(wshserver.WshServerCommandToDeclMap) {
methodDecl := wshserver.WshServerCommandToDeclMap[key]
genMethod(fd, methodDecl)
}
fmt.Fprintf(fd, "\n")
}

View File

@ -7,7 +7,6 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"strconv"
@ -17,11 +16,13 @@ import (
"syscall"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wconfig"
"github.com/wavetermdev/thenextwave/pkg/web"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshserver"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -78,6 +79,8 @@ func configWatcher() {
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.SetPrefix("[wavesrv] ")
blockcontroller.WshServerFactoryFn = wshserver.MakeWshServer
web.WshServerFactoryFn = wshserver.MakeWshServer
err := service.ValidateServiceMap()
if err != nil {
@ -118,6 +121,7 @@ func main() {
return
}
installShutdownSignalHandlers()
go stdinReadWatch()
configWatcher()
go web.RunWebSocketServer()
@ -126,14 +130,10 @@ func main() {
log.Printf("error creating web listener: %v\n", err)
return
}
var unixListener net.Listener
if runtime.GOOS != "windows" {
var err error
unixListener, err = web.MakeUnixListener()
if err != nil {
log.Printf("error creating unix listener: %v\n", err)
return
}
unixListener, err := web.MakeUnixListener()
if err != nil {
log.Printf("error creating unix listener: %v\n", err)
return
}
go func() {
pidStr := os.Getenv(ReadySignalPidVarName)

49
cmd/test/test-main.go Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package main
type WaveAppStyle struct {
BackgroundColor string `json:"backgroundColor,omitempty"`
Color string `json:"color,omitempty"`
Border string `json:"border,omitempty"`
FontSize string `json:"fontSize,omitempty"`
FontFamily string `json:"fontFamily,omitempty"`
FontWeight string `json:"fontWeight,omitempty"`
FontStyle string `json:"fontStyle,omitempty"`
TextDecoration string `json:"textDecoration,omitempty"`
}
type WaveAppMouseEvent struct {
TargetId string `json:"targetid"`
}
type WaveAppChangeEvent struct {
TargetId string `json:"targetid"`
Value string `json:"value"`
}
type WaveAppElement struct {
WaveId string `json:"waveid"`
Elem string `json:"elem"`
Props map[string]any `json:"props,omitempty"`
Handlers map[string]string `json:"handlers,omitempty"`
Children []*WaveAppElement `json:"children,omitempty"`
}
func (e *WaveAppElement) AddChild(child *WaveAppElement) {
e.Children = append(e.Children, child)
}
func (e *WaveAppElement) Style() *WaveAppStyle {
style, ok := e.Props["style"].(*WaveAppStyle)
if !ok {
style := &WaveAppStyle{}
e.Props["style"] = style
}
return style
}
func main() {
}

View File

@ -10,7 +10,8 @@ import (
"strings"
"github.com/spf13/cobra"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshclient"
)
var getMetaCmd = &cobra.Command{
@ -42,18 +43,11 @@ func getMetaRun(cmd *cobra.Command, args []string) {
fmt.Printf("error resolving oref: %v\r\n", err)
return
}
getMetaWshCmd := &wshutil.BlockGetMetaCommand{
Command: wshutil.BlockCommand_SetMeta,
ORef: fullORef,
}
resp, err := RpcClient.SendRpcRequest(getMetaWshCmd, 2000)
resp, err := wshclient.GetMetaCommand(RpcClient, wshrpc.CommandGetMetaData{ORef: *fullORef}, &wshrpc.WshRpcCommandOpts{Timeout: 2000})
if err != nil {
log.Printf("error getting metadata: %v\r\n", err)
return
}
if resp == nil {
resp = make(map[string]any)
}
if len(args) > 1 {
val, ok := resp[args[1]]
if !ok {

View File

@ -18,6 +18,8 @@ import (
"github.com/google/uuid"
"github.com/spf13/cobra"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshclient"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"golang.org/x/term"
)
@ -45,11 +47,10 @@ func doShutdown(reason string, exitCode int) {
log.Printf("shutting down: %s\r\n", reason)
}
if usingHtmlMode {
cmd := &wshutil.BlockSetMetaCommand{
Command: wshutil.BlockCommand_SetMeta,
Meta: map[string]any{"term:mode": nil},
cmd := &wshrpc.CommandSetMetaData{
Meta: map[string]any{"term:mode": nil},
}
RpcClient.SendCommand(cmd)
RpcClient.SendCommand(wshrpc.Command_SetMeta, cmd)
time.Sleep(10 * time.Millisecond)
}
if origTermState != nil {
@ -61,11 +62,13 @@ func doShutdown(reason string, exitCode int) {
// returns the wrapped stdin and a new rpc client (that wraps the stdin input and stdout output)
func setupRpcClient(handlerFn wshutil.CommandHandlerFnType) {
log.Printf("setup rpc client\r\n")
messageCh := make(chan wshutil.RpcMessage)
messageCh := make(chan []byte, 32)
outputCh := make(chan []byte, 32)
ptyBuf := wshutil.MakePtyBuffer(wshutil.WaveServerOSCPrefix, os.Stdin, messageCh)
rpcClient, outputCh := wshutil.MakeWshRpc(wshutil.WaveOSC, messageCh, handlerFn)
rpcClient := wshutil.MakeWshRpc(messageCh, outputCh, wshutil.RpcContext{}, handlerFn)
go func() {
for barr := range outputCh {
for msg := range outputCh {
barr := wshutil.EncodeWaveOSCBytes(wshutil.WaveOSC, msg)
os.Stdout.Write(barr)
}
}()
@ -89,11 +92,10 @@ func setTermRawMode() {
func setTermHtmlMode() {
installShutdownSignalHandlers()
setTermRawMode()
cmd := &wshutil.BlockSetMetaCommand{
Command: wshutil.BlockCommand_SetMeta,
Meta: map[string]any{"term:mode": "html"},
cmd := &wshrpc.CommandSetMetaData{
Meta: map[string]any{"term:mode": "html"},
}
RpcClient.SendCommand(cmd)
RpcClient.SendCommand(wshrpc.Command_SetMeta, cmd)
usingHtmlMode = true
}
@ -139,22 +141,23 @@ func isFullORef(orefStr string) bool {
return err == nil
}
func resolveSimpleId(id string) (string, error) {
func resolveSimpleId(id string) (*waveobj.ORef, error) {
if isFullORef(id) {
return id, nil
orefObj, err := waveobj.ParseORef(id)
if err != nil {
return nil, fmt.Errorf("error parsing full ORef: %v", err)
}
return &orefObj, nil
}
resolveCmd := &wshutil.ResolveIdsCommand{
Command: wshutil.Command_ResolveIds,
Ids: []string{id},
}
resp, err := RpcClient.SendRpcRequest(resolveCmd, 2000)
rtnData, err := wshclient.ResolveIdsCommand(RpcClient, wshrpc.CommandResolveIdsData{Ids: []string{id}}, &wshrpc.WshRpcCommandOpts{Timeout: 2000})
if err != nil {
return "", err
return nil, fmt.Errorf("error resolving ids: %v", err)
}
if resp[id] == nil {
return "", fmt.Errorf("id not found: %q", id)
oref, ok := rtnData.ResolvedIds[id]
if !ok {
return nil, fmt.Errorf("id not found: %q", id)
}
return resp[id].(string), nil
return &oref, nil
}
// Execute executes the root command.

View File

@ -10,7 +10,7 @@ import (
"strings"
"github.com/spf13/cobra"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
)
var setMetaCmd = &cobra.Command{
@ -80,12 +80,11 @@ func setMetaRun(cmd *cobra.Command, args []string) {
fmt.Printf("error resolving oref: %v\n", err)
return
}
setMetaWshCmd := &wshutil.BlockSetMetaCommand{
Command: wshutil.BlockCommand_SetMeta,
ORef: fullORef,
Meta: meta,
setMetaWshCmd := &wshrpc.CommandSetMetaData{
ORef: *fullORef,
Meta: meta,
}
_, err = RpcClient.SendRpcRequest(setMetaWshCmd, 2000)
_, err = RpcClient.SendRpcRequest(wshrpc.Command_SetMeta, setMetaWshCmd, 2000)
if err != nil {
fmt.Printf("error setting metadata: %v\n", err)
return

View File

@ -10,7 +10,7 @@ import (
"path/filepath"
"github.com/spf13/cobra"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -44,8 +44,7 @@ func viewRun(cmd *cobra.Command, args []string) {
log.Printf("error getting file info: %v\n", err)
}
setTermRawMode()
viewWshCmd := &wshutil.CreateBlockCommand{
Command: wshutil.Command_CreateBlock,
viewWshCmd := &wshrpc.CommandCreateBlockData{
BlockDef: &wstore.BlockDef{
View: "preview",
Meta: map[string]interface{}{
@ -53,7 +52,7 @@ func viewRun(cmd *cobra.Command, args []string) {
},
},
}
_, err = RpcClient.SendRpcRequest(viewWshCmd, 2000)
_, err = RpcClient.SendRpcRequest(wshrpc.Command_CreateBlock, viewWshCmd, 2000)
if err != nil {
log.Printf("error running view command: %v\r\n", err)
return

View File

@ -420,7 +420,6 @@ const BlockFrame = React.memo((props: BlockFrameProps) => {
});
function blockViewToIcon(view: string): string {
console.log("blockViewToIcon", view);
if (view == "term") {
return "terminal";
}

View File

@ -5,6 +5,7 @@ import { LayoutTreeAction, LayoutTreeActionType, LayoutTreeInsertNodeAction, new
import { getLayoutStateAtomForTab } from "@/faraday/lib/layoutAtom";
import { layoutTreeStateReducer } from "@/faraday/lib/layoutState";
import { handleIncomingRpcMessage } from "@/app/store/wshrpc";
import * as layoututil from "@/util/layoututil";
import { produce } from "immer";
import * as jotai from "jotai";
@ -27,8 +28,8 @@ let globalClientId: string = null;
if (typeof window !== "undefined") {
// this if statement allows us to use the code in nodejs as well
const urlParams = new URLSearchParams(window.location.search);
globalWindowId = urlParams.get("windowid") || "74eba2d0-22fc-4221-82ad-d028dd496342";
globalClientId = urlParams.get("clientid") || "f4bc1713-a364-41b3-a5c4-b000ba10d622";
globalWindowId = urlParams.get("windowid");
globalClientId = urlParams.get("clientid");
}
const windowIdAtom = jotai.atom(null) as jotai.PrimitiveAtom<string>;
const clientIdAtom = jotai.atom(null) as jotai.PrimitiveAtom<string>;
@ -223,6 +224,11 @@ function handleWSEventMessage(msg: WSEventType) {
}
return;
}
if (msg.eventtype == "rpc") {
const rpcMsg: RpcMessage = msg.data;
handleIncomingRpcMessage(rpcMsg);
return;
}
if (msg.eventtype == "layoutaction") {
const layoutAction: WSLayoutActionData = msg.data;
if (layoutAction.actiontype == LayoutTreeActionType.InsertNode) {

View File

@ -13,14 +13,9 @@ class BlockServiceType {
SaveTerminalState(arg2: string, arg3: string, arg4: string, arg5: number): Promise<void> {
return WOS.callBackendService("block", "SaveTerminalState", Array.from(arguments))
}
// send command to block
SendCommand(cmd: string, arg3: BlockCommand): Promise<void> {
return WOS.callBackendService("block", "SendCommand", Array.from(arguments))
}
}
export const BlockService = new BlockServiceType()
export const BlockService = new BlockServiceType();
// clientservice.ClientService (client)
class ClientServiceType {
@ -44,7 +39,7 @@ class ClientServiceType {
}
}
export const ClientService = new ClientServiceType()
export const ClientService = new ClientServiceType();
// fileservice.FileService (file)
class FileServiceType {
@ -71,7 +66,7 @@ class FileServiceType {
}
}
export const FileService = new FileServiceType()
export const FileService = new FileServiceType();
// objectservice.ObjectService (object)
class ObjectServiceType {
@ -126,7 +121,7 @@ class ObjectServiceType {
}
}
export const ObjectService = new ObjectServiceType()
export const ObjectService = new ObjectServiceType();
// windowservice.WindowService (window)
class WindowServiceType {
@ -150,5 +145,5 @@ class WindowServiceType {
}
}
export const WindowService = new WindowServiceType()
export const WindowService = new WindowServiceType();

View File

@ -3,8 +3,10 @@
// WaveObjectStore
import { sendRpcCommand } from "@/app/store/wshrpc";
import * as jotai from "jotai";
import * as React from "react";
import { v4 as uuidv4 } from "uuid";
import { atoms, getBackendHostPort, globalStore } from "./global";
import * as services from "./services";
@ -103,6 +105,50 @@ function callBackendService(service: string, method: string, args: any[], noUICo
return prtn;
}
function callWshServerRpc(
command: string,
data: any,
meta: WshServerCommandMeta,
opts: WshRpcCommandOpts
): Promise<any> {
let msg: RpcMessage = {
command: command,
data: data,
};
if (!opts?.noresponse) {
msg.reqid = uuidv4();
}
if (opts?.timeout) {
msg.timeout = opts.timeout;
}
if (meta.commandtype != "call") {
throw new Error("unimplemented wshserver commandtype " + meta.commandtype);
}
const rpcGen = sendRpcCommand(msg);
if (rpcGen == null) {
return null;
}
let resolveFn: (value: any) => void;
let rejectFn: (reason?: any) => void;
const prtn = new Promise((resolve, reject) => {
resolveFn = resolve;
rejectFn = reject;
});
const respMsg = rpcGen.next(true); // pass true to force termination of rpc after 1 response (not streaing)
respMsg.then((msg: IteratorResult<RpcMessage, void>) => {
if (msg.value == null) {
resolveFn(null);
}
let respMsg: RpcMessage = msg.value as RpcMessage;
if (respMsg.error != null) {
rejectFn(new Error(respMsg.error));
return;
}
resolveFn(respMsg.data);
});
return prtn;
}
const waveObjectValueCache = new Map<string, WaveObjectValue<any>>();
function clearWaveObjectCache() {
@ -320,6 +366,7 @@ function setObjectValue<T extends WaveObj>(value: T, setFn?: jotai.Setter, pushT
export {
callBackendService,
callWshServerRpc,
cleanWaveObjectCache,
clearWaveObjectCache,
getObjectValue,

View File

@ -3,19 +3,9 @@
import * as jotai from "jotai";
import { sprintf } from "sprintf-js";
import { v4 as uuidv4 } from "uuid";
const MaxWebSocketSendSize = 1024 * 1024; // 1MB
type RpcEntry = {
reqId: string;
startTs: number;
method: string;
resolve: (any) => void;
reject: (any) => void;
promise: Promise<any>;
};
type JotaiStore = {
get: <Value>(atom: jotai.Atom<Value>) => Value;
set: <Value>(atom: jotai.WritableAtom<Value, [Value], void>, value: Value) => void;
@ -35,7 +25,6 @@ class WSControl {
authKey: string;
baseHostPort: string;
lastReconnectTime: number = 0;
rpcMap: Map<string, RpcEntry> = new Map(); // reqId -> RpcEntry
jotaiStore: JotaiStore;
constructor(
@ -169,10 +158,6 @@ class WSControl {
this.reconnectTimes = 0;
return;
}
if (eventData.type == "rpcresp") {
this.handleRpcResp(eventData);
return;
}
if (this.messageCallback) {
try {
this.messageCallback(eventData);
@ -189,60 +174,20 @@ class WSControl {
this.wsConn.send(JSON.stringify({ type: "ping", stime: Date.now() }));
}
handleRpcResp(data: any) {
let reqId = data.reqid;
let rpcEntry = this.rpcMap.get(reqId);
if (rpcEntry == null) {
console.log("rpcresp for unknown reqid", reqId);
return;
}
this.rpcMap.delete(reqId);
console.log("rpcresp", rpcEntry.method, Math.round(performance.now() - rpcEntry.startTs) + "ms");
if (data.error != null) {
rpcEntry.reject(data.error);
} else {
rpcEntry.resolve(data.data);
}
}
doRpc(method: string, params: any[]): Promise<any> {
if (!this.isOpen()) {
return Promise.reject("not connected");
}
let reqId = uuidv4();
let req = { type: "rpc", method: method, params: params, reqid: reqId };
let rpcEntry: RpcEntry = {
method: method,
startTs: performance.now(),
reqId: reqId,
resolve: null,
reject: null,
promise: null,
};
let rpcPromise = new Promise((resolve, reject) => {
rpcEntry.resolve = resolve;
rpcEntry.reject = reject;
});
rpcEntry.promise = rpcPromise;
this.rpcMap.set(reqId, rpcEntry);
this.wsConn.send(JSON.stringify(req));
return rpcPromise;
}
sendMessage(data: any) {
sendMessage(data: WSCommandType) {
if (!this.isOpen()) {
return;
}
let msg = JSON.stringify(data);
const byteSize = new Blob([msg]).size;
if (byteSize > MaxWebSocketSendSize) {
console.log("ws message too large", byteSize, data.type, msg.substring(0, 100));
console.log("ws message too large", byteSize, data.wscommand, msg.substring(0, 100));
return;
}
this.wsConn.send(msg);
}
pushMessage(data: any) {
pushMessage(data: WSCommandType) {
if (!this.isOpen()) {
this.msgQueue.push(data);
return;

View File

@ -0,0 +1,88 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { globalWS } from "./global";
type RpcEntry = {
reqId: string;
startTs: number;
command: string;
msgFn: (msg: RpcMessage) => void;
};
let openRpcs = new Map<string, RpcEntry>();
async function* rpcResponseGenerator(
command: string,
reqid: string,
timeout: number
): AsyncGenerator<RpcMessage, void, boolean> {
const msgQueue: RpcMessage[] = [];
let signalFn: () => void;
let signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
let timeoutId: NodeJS.Timeout = null;
if (timeout > 0) {
timeoutId = setTimeout(() => {
msgQueue.push({ resid: reqid, error: "EC-TIME: timeout waiting for response" });
signalFn();
}, timeout);
}
const msgFn = (msg: RpcMessage) => {
msgQueue.push(msg);
signalFn();
// reset signal promise
signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
};
openRpcs.set(reqid, {
reqId: reqid,
startTs: Date.now(),
command: command,
msgFn: msgFn,
});
try {
while (true) {
while (msgQueue.length > 0) {
const msg = msgQueue.shift()!;
const shouldTerminate = yield msg;
if (shouldTerminate || !msg.cont) {
return;
}
}
await signalPromise;
}
} finally {
openRpcs.delete(reqid);
if (timeoutId != null) {
clearTimeout(timeoutId);
}
}
}
function sendRpcCommand(msg: RpcMessage): AsyncGenerator<RpcMessage, void, boolean> {
let wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
globalWS.pushMessage(wsMsg);
if (msg.reqid == null) {
return null;
}
return rpcResponseGenerator(msg.command, msg.reqid, msg.timeout);
}
function handleIncomingRpcMessage(msg: RpcMessage) {
const isRequest = msg.command != null || msg.reqid != null;
if (isRequest) {
console.log("rpc request not supported", msg);
return;
}
if (msg.resid == null) {
console.log("rpc response missing resid", msg);
return;
}
const entry = openRpcs.get(msg.resid);
if (entry == null) {
console.log("rpc response generator not found", msg);
return;
}
entry.msgFn(msg);
}
export { handleIncomingRpcMessage, sendRpcCommand };

View File

@ -0,0 +1,72 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generate/main-generate.go
import * as WOS from "./wos";
// WshServerCommandToDeclMap
class WshServerType {
// command "controller:input" [call]
BlockInputCommand(data: CommandBlockInputData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("controller:input", data, meta, opts);
}
// command "controller:restart" [call]
BlockRestartCommand(data: CommandBlockRestartData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("controller:restart", data, meta, opts);
}
// command "createblock" [call]
CreateBlockCommand(data: CommandCreateBlockData, opts?: WshRpcCommandOpts): Promise<ORef> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("createblock", data, meta, opts);
}
// command "file:append" [call]
AppendFileCommand(data: CommandAppendFileData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("file:append", data, meta, opts);
}
// command "file:appendijson" [call]
AppendIJsonCommand(data: CommandAppendIJsonData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("file:appendijson", data, meta, opts);
}
// command "getmeta" [call]
GetMetaCommand(data: CommandGetMetaData, opts?: WshRpcCommandOpts): Promise<MetaType> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("getmeta", data, meta, opts);
}
// command "message" [call]
MessageCommand(data: CommandMessageData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("message", data, meta, opts);
}
// command "resolveids" [call]
ResolveIdsCommand(data: CommandResolveIdsData, opts?: WshRpcCommandOpts): Promise<CommandResolveIdsRtnData> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("resolveids", data, meta, opts);
}
// command "setmeta" [call]
SetMetaCommand(data: CommandSetMetaData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("setmeta", data, meta, opts);
}
// command "setview" [call]
BlockSetViewCommand(data: CommandBlockSetViewData, opts?: WshRpcCommandOpts): Promise<void> {
const meta: WshServerCommandMeta = {commandtype: "call"};
return WOS.callWshServerRpc("setview", data, meta, opts);
}
}
export const WshServer = new WshServerType();

View File

@ -1,20 +1,10 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import {
WOS,
atoms,
getEventORefSubject,
globalStore,
sendWSCommand,
useBlockAtom,
useSettingsAtom,
} from "@/store/global";
import { WOS, atoms, getEventORefSubject, globalStore, useBlockAtom, useSettingsAtom } from "@/store/global";
import * as services from "@/store/services";
import * as keyutil from "@/util/keyutil";
import { FitAddon } from "@xterm/addon-fit";
import type { ITheme } from "@xterm/xterm";
import { Terminal } from "@xterm/xterm";
import clsx from "clsx";
import { produce } from "immer";
import * as jotai from "jotai";
@ -23,6 +13,7 @@ import { IJsonView } from "./ijson";
import { TermStickers } from "./termsticker";
import { TermWrap } from "./termwrap";
import { WshServer } from "@/app/store/wshserver";
import "public/xterm.css";
import "./term.less";
@ -54,23 +45,6 @@ function getThemeFromCSSVars(el: Element): ITheme {
return theme;
}
function handleResize(fitAddon: FitAddon, blockId: string, term: Terminal) {
if (term == null) {
return;
}
const oldRows = term.rows;
const oldCols = term.cols;
fitAddon.fit();
if (oldRows !== term.rows || oldCols !== term.cols) {
const wsCommand: SetBlockTermSizeWSCommand = {
wscommand: "setblocktermsize",
blockid: blockId,
termsize: { rows: term.rows, cols: term.cols },
};
sendWSCommand(wsCommand);
}
}
const keyMap = {
Enter: "\r",
Backspace: "\x7f",
@ -177,14 +151,12 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) {
event.preventDefault();
event.stopPropagation();
const metaCmd: BlockSetMetaCommand = { command: "setmeta", meta: { "term:mode": "html" } };
services.BlockService.SendCommand(this.blockId, metaCmd);
WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } });
return false;
}
if (shellProcStatusRef.current != "running" && keyutil.checkKeyPressed(waveEvent, "Enter")) {
// restart
const restartCmd: BlockRestartCommand = { command: "controller:restart", blockid: blockId };
services.BlockService.SendCommand(blockId, restartCmd);
WshServer.BlockRestartCommand({ blockid: blockId });
return false;
}
}
@ -224,8 +196,7 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
const waveEvent = keyutil.adaptFromReactOrNativeKeyEvent(event);
if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) {
// reset term:mode
const metaCmd: BlockSetMetaCommand = { command: "setmeta", meta: { "term:mode": null } };
services.BlockService.SendCommand(blockId, metaCmd);
WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } });
return false;
}
const asciiVal = keyboardEventToASCII(event);
@ -233,8 +204,7 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
return false;
}
const b64data = btoa(asciiVal);
const inputCmd: BlockInputCommand = { command: "controller:input", inputdata64: b64data, blockid: blockId };
services.BlockService.SendCommand(blockId, inputCmd);
WshServer.BlockInputCommand({ blockid: blockId, inputdata64: b64data });
return true;
};

View File

@ -1,8 +1,8 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { WshServer } from "@/app/store/wshserver";
import { createBlock, getBackendHostPort } from "@/store/global";
import * as services from "@/store/services";
import clsx from "clsx";
import * as jotai from "jotai";
import * as React from "react";
@ -98,12 +98,7 @@ function TermSticker({ sticker, config }: { sticker: StickerType; config: Sticke
console.log("clickHandler", sticker.clickcmd, sticker.clickblockdef);
if (sticker.clickcmd) {
const b64data = btoa(sticker.clickcmd);
const inputCmd: BlockInputCommand = {
command: "controller:input",
inputdata64: b64data,
blockid: config.blockId,
};
services.BlockService.SendCommand(config.blockId, inputCmd);
WshServer.BlockInputCommand({ blockid: config.blockId, inputdata64: b64data });
}
if (sticker.clickblockdef) {
createBlock(sticker.clickblockdef);

View File

@ -1,6 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { WshServer } from "@/app/store/wshserver";
import { PLATFORM, fetchWaveFile, getFileSubject, sendWSCommand } from "@/store/global";
import * as services from "@/store/services";
import { base64ToArray } from "@/util/util";
@ -75,17 +76,7 @@ export class TermWrap {
handleTermData(data: string) {
const b64data = btoa(data);
if (b64data.length < 512) {
const wsCmd: BlockInputWSCommand = { wscommand: "blockinput", blockid: this.blockId, inputdata64: b64data };
sendWSCommand(wsCmd);
} else {
const inputCmd: BlockInputCommand = {
command: "controller:input",
blockid: this.blockId,
inputdata64: b64data,
};
services.BlockService.SendCommand(this.blockId, inputCmd);
}
WshServer.BlockInputCommand({ blockid: this.blockId, inputdata64: b64data });
}
addFocusListener(focusFn: () => void) {

View File

@ -107,6 +107,73 @@ declare global {
meta: MetaType;
};
// wshrpc.CommandAppendFileData
type CommandAppendFileData = {
zoneid: string;
filename: string;
data64: string;
};
// wshrpc.CommandAppendIJsonData
type CommandAppendIJsonData = {
zoneid: string;
filename: string;
data: MetaType;
};
// wshrpc.CommandBlockInputData
type CommandBlockInputData = {
blockid: string;
inputdata64?: string;
signame?: string;
termsize?: TermSize;
};
// wshrpc.CommandBlockRestartData
type CommandBlockRestartData = {
blockid: string;
};
// wshrpc.CommandBlockSetViewData
type CommandBlockSetViewData = {
blockid: string;
view: string;
};
// wshrpc.CommandCreateBlockData
type CommandCreateBlockData = {
tabid: string;
blockdef: BlockDef;
rtopts: RuntimeOpts;
};
// wshrpc.CommandGetMetaData
type CommandGetMetaData = {
oref: ORef;
};
// wshrpc.CommandMessageData
type CommandMessageData = {
oref: ORef;
message: string;
};
// wshrpc.CommandResolveIdsData
type CommandResolveIdsData = {
ids: string[];
};
// wshrpc.CommandResolveIdsRtnData
type CommandResolveIdsRtnData = {
resolvedids: {[key: string]: ORef};
};
// wshrpc.CommandSetMetaData
type CommandSetMetaData = {
oref: ORef;
meta: MetaType;
};
// wshutil.CreateBlockCommand
type CreateBlockCommand = {
command: "createblock";
@ -115,18 +182,6 @@ declare global {
rtopts?: RuntimeOpts;
};
// wconfig.DateTimeConfigType
type DateTimeConfigType = {
locale: string;
format: DateTimeFormatConfigType;
};
// wconfig.DateTimeFormatConfigType
type DateTimeFormatConfigType = {
dateStyle: number;
timeStyle: number;
};
// wstore.FileDef
type FileDef = {
filetype?: string;
@ -185,10 +240,7 @@ declare global {
};
// waveobj.ORef
type ORef = {
otype: string;
oid: string;
};
type ORef = string;
// wstore.Point
type Point = {
@ -202,6 +254,18 @@ declare global {
ids: string[];
};
// wshutil.RpcMessage
type RpcMessage = {
command?: string;
reqid?: string;
resid?: string;
timeout?: number;
cont?: boolean;
error?: string;
datatype?: string;
data?: any;
};
// wstore.RuntimeOpts
type RuntimeOpts = {
termsize?: TermSize;
@ -218,7 +282,6 @@ declare global {
// wconfig.SettingsConfigType
type SettingsConfigType = {
mimetypes: {[key: string]: MimeTypeConfigType};
datetime: DateTimeConfigType;
term: TerminalConfigType;
widgets: WidgetsConfigType[];
blockheader: BlockHeaderOpts;
@ -273,7 +336,7 @@ declare global {
type WSCommandType = {
wscommand: string;
} & ( SetBlockTermSizeWSCommand | BlockInputWSCommand );
} & ( SetBlockTermSizeWSCommand | BlockInputWSCommand | WSRpcCommand );
// eventbus.WSEventType
type WSEventType = {
@ -297,6 +360,12 @@ declare global {
blockid: string;
};
// webcmd.WSRpcCommand
type WSRpcCommand = {
wscommand: "rpc";
message: RpcMessage;
};
// wconfig.WatcherUpdate
type WatcherUpdate = {
file: string;
@ -380,6 +449,17 @@ declare global {
meta: MetaType;
};
// wshrpc.WshRpcCommandOpts
type WshRpcCommandOpts = {
timeout: number;
noresponse: boolean;
};
// wshrpc.WshServerCommandMeta
type WshServerCommandMeta = {
commandtype: string;
};
}
export {}

View File

@ -1,6 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { WshServer } from "@/app/store/wshserver";
import { atoms, getApi, globalStore, globalWS, initWS, setPlatform } from "@/store/global";
import * as services from "@/store/services";
import * as WOS from "@/store/wos";
@ -25,6 +26,7 @@ loadFonts();
(window as any).globalWS = globalWS;
(window as any).WOS = WOS;
(window as any).globalStore = globalStore;
(window as any).WshServer = WshServer;
document.title = `The Next Wave (${windowId.substring(0, 8)})`;

View File

@ -12,7 +12,6 @@ import (
"io"
"io/fs"
"log"
"strings"
"sync"
"time"
@ -26,6 +25,9 @@ import (
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
// set by main-server.go (for dependency inversion)
var WshServerFactoryFn func(inputCh chan []byte, outputCh chan []byte, initialCtx wshutil.RpcContext) = nil
const (
BlockController_Shell = "shell"
BlockController_Cmd = "cmd"
@ -58,21 +60,18 @@ type BlockInputUnion struct {
TermSize *shellexec.TermSize `json:"termsize,omitempty"`
}
type RunCmdFnType = func(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (wshutil.ResponseDataType, error)
type BlockController struct {
Lock *sync.Mutex
ControllerType string
TabId string
BlockId string
BlockDef *wstore.BlockDef
InputCh chan wshutil.BlockCommand
Status string
CreatedHtmlFile bool
ShellProc *shellexec.ShellProc
ShellInputCh chan *BlockInputUnion
ShellProcStatus string
RunCmdFn RunCmdFnType
StopCh chan bool
}
type BlockControllerRuntimeStatus struct {
@ -206,14 +205,6 @@ func (bc *BlockController) resetTerminalState() {
}
}
func (bc *BlockController) waveOSCMessageHandler(ctx context.Context, cmd wshutil.BlockCommand, respFn wshutil.ResponseFnType) (wshutil.ResponseDataType, error) {
if strings.HasPrefix(cmd.GetCommand(), "controller:") {
bc.InputCh <- cmd
return nil, nil
}
return bc.RunCmdFn(ctx, cmd, wshutil.CmdContextType{BlockId: bc.BlockId, TabId: bc.TabId})
}
func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta map[string]any) error {
// create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
@ -307,9 +298,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta map[str
})
shellInputCh := make(chan *BlockInputUnion, 32)
bc.ShellInputCh = shellInputCh
messageCh := make(chan wshutil.RpcMessage, 32)
messageCh := make(chan []byte, 32)
ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, bc.ShellProc.Pty, messageCh)
_, outputCh := wshutil.MakeWshRpc(wshutil.WaveServerOSC, messageCh, bc.waveOSCMessageHandler)
outputCh := make(chan []byte, 32)
WshServerFactoryFn(messageCh, outputCh, wshutil.RpcContext{BlockId: bc.BlockId, TabId: bc.TabId})
go func() {
// handles regular output from the pty (goes to the blockfile and xterm)
defer func() {
@ -360,8 +352,9 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta map[str
}()
go func() {
// handles outputCh -> shellInputCh
for out := range outputCh {
shellInputCh <- &BlockInputUnion{InputData: out}
for msg := range outputCh {
encodedMsg := wshutil.EncodeWaveOSCBytes(wshutil.WaveServerOSC, msg)
shellInputCh <- &BlockInputUnion{InputData: encodedMsg}
}
}()
go func() {
@ -429,42 +422,33 @@ func (bc *BlockController) run(bdata *wstore.Block, blockMeta map[string]any) {
}
}()
}
for genCmd := range bc.InputCh {
switch cmd := genCmd.(type) {
case *wshutil.BlockInputCommand:
if bc.ShellInputCh == nil {
continue
}
inputUnion := &BlockInputUnion{
SigName: cmd.SigName,
TermSize: cmd.TermSize,
}
if len(cmd.InputData64) > 0 {
inputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(cmd.InputData64)))
nw, err := base64.StdEncoding.Decode(inputBuf, []byte(cmd.InputData64))
if err != nil {
log.Printf("error decoding input data: %v\n", err)
continue
}
inputUnion.InputData = inputBuf[:nw]
}
bc.ShellInputCh <- inputUnion
case *wshutil.BlockRestartCommand:
// TODO: if shell command is already running
// we probably want to kill it off, wait, and then restart it
err := bc.DoRunShellCommand(&RunShellOpts{TermSize: bdata.RuntimeOpts.TermSize}, bdata.Meta)
if err != nil {
log.Printf("error running shell command: %v\n", err)
}
default:
log.Printf("unknown command type %T\n", cmd)
}
}
<-bc.StopCh
}
func StartBlockController(ctx context.Context, tabId string, blockId string, runCmdFn RunCmdFnType) error {
func (bc *BlockController) SendInput(inputUnion *BlockInputUnion) error {
if bc.ShellInputCh == nil {
return fmt.Errorf("no shell input chan")
}
bc.ShellInputCh <- inputUnion
return nil
}
func (bc *BlockController) RestartController() error {
// TODO: if shell command is already running
// we probably want to kill it off, wait, and then restart it
bdata, err := wstore.DBMustGet[*wstore.Block](context.Background(), bc.BlockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
err = bc.DoRunShellCommand(&RunShellOpts{TermSize: bdata.RuntimeOpts.TermSize}, bdata.Meta)
if err != nil {
log.Printf("error running shell command: %v\n", err)
}
return nil
}
func StartBlockController(ctx context.Context, tabId string, blockId string) error {
log.Printf("start blockcontroller %q\n", blockId)
blockData, err := wstore.DBMustGet[*wstore.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
@ -488,9 +472,8 @@ func StartBlockController(ctx context.Context, tabId string, blockId string, run
TabId: tabId,
BlockId: blockId,
Status: Status_Init,
InputCh: make(chan wshutil.BlockCommand),
RunCmdFn: runCmdFn,
ShellProcStatus: Status_Init,
StopCh: make(chan bool),
}
blockControllerMap[blockId] = bc
go bc.run(blockData, blockData.Meta)
@ -505,7 +488,7 @@ func StopBlockController(blockId string) {
if bc.getShellProc() != nil {
bc.ShellProc.Close()
}
close(bc.InputCh)
close(bc.StopCh)
}
func GetBlockController(blockId string) *BlockController {

View File

@ -1,295 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package cmdqueue
import (
"context"
"encoding/base64"
"fmt"
"io/fs"
"log"
"runtime/debug"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const DefaultTimeout = 2 * time.Second
const CmdQueueSize = 100
func RunCmd(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (rtnData wshutil.ResponseDataType, rtnErr error) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC: %v\n", r)
debug.PrintStack()
rtnData = nil
rtnErr = fmt.Errorf("panic: %v", r)
return
}
}()
blockId := cmdCtx.BlockId
bcCmd, ok := cmd.(wshutil.BlockControllerCommand)
if ok && bcCmd.GetBlockId() != "" {
blockId = bcCmd.GetBlockId()
}
if strings.HasPrefix(cmd.GetCommand(), "controller:") {
// send to block controller
bc := blockcontroller.GetBlockController(blockId)
if bc == nil {
return nil, fmt.Errorf("block controller not found for block %q", blockId)
}
bc.InputCh <- cmd
return nil, nil
}
switch typedCmd := cmd.(type) {
case *wshutil.BlockGetMetaCommand:
return handleGetMeta(ctx, typedCmd)
case *wshutil.ResolveIdsCommand:
return handleResolveIds(ctx, typedCmd)
case *wshutil.BlockSetMetaCommand:
return handleSetMeta(ctx, typedCmd, cmdCtx)
case *wshutil.BlockSetViewCommand:
return handleSetView(ctx, typedCmd, cmdCtx)
case *wshutil.BlockMessageCommand:
log.Printf("MESSAGE: %s | %q\n", blockId, typedCmd.Message)
return nil, nil
case *wshutil.BlockAppendFileCommand:
log.Printf("APPENDFILE: %s | %q | len:%d\n", blockId, typedCmd.FileName, len(typedCmd.Data))
err := handleAppendBlockFile(blockId, typedCmd.FileName, typedCmd.Data)
if err != nil {
return nil, fmt.Errorf("error appending blockfile: %w", err)
}
return nil, nil
case *wshutil.BlockAppendIJsonCommand:
log.Printf("APPENDIJSON: %s | %q\n", blockId, typedCmd.FileName)
err := handleAppendIJsonFile(blockId, typedCmd.FileName, typedCmd.Data, true)
if err != nil {
return nil, fmt.Errorf("error appending blockfile(ijson): %w", err)
}
return nil, nil
case *wshutil.CreateBlockCommand:
return handleCreateBlock(ctx, typedCmd, cmdCtx)
default:
return nil, fmt.Errorf("unknown command: %q", cmd.GetCommand())
}
}
func handleSetView(ctx context.Context, cmd *wshutil.BlockSetViewCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
log.Printf("SETVIEW: %s | %q\n", cmdCtx.BlockId, cmd.View)
block, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
}
block.View = cmd.View
err = wstore.DBUpdate(ctx, block)
if err != nil {
return nil, fmt.Errorf("error updating block: %w", err)
}
// send a waveobj:update event
updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId).String(),
Data: wstore.WaveObjUpdate{
UpdateType: wstore.UpdateType_Update,
OType: wstore.OType_Block,
OID: cmdCtx.BlockId,
Obj: updatedBlock,
},
})
return nil, nil
}
func handleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) {
oref, err := waveobj.ParseORef(cmd.ORef)
if err != nil {
return nil, fmt.Errorf("error parsing oref: %w", err)
}
obj, err := wstore.DBGetORef(ctx, oref)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil, fmt.Errorf("object not found: %s", oref)
}
return waveobj.GetMeta(obj), nil
}
func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) {
if strings.Contains(simpleId, ":") {
rtn, err := waveobj.ParseORef(simpleId)
if err != nil {
return nil, fmt.Errorf("error parsing simple id: %w", err)
}
return &rtn, nil
}
return wstore.DBResolveEasyOID(ctx, simpleId)
}
func handleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) {
rtn := make(map[string]any)
for _, simpleId := range cmd.Ids {
oref, err := resolveSimpleId(ctx, simpleId)
if err != nil || oref == nil {
continue
}
rtn[simpleId] = oref.String()
}
return rtn, nil
}
func handleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
var oref *waveobj.ORef
if cmd.ORef != "" {
orefVal, err := waveobj.ParseORef(cmd.ORef)
if err != nil {
return nil, fmt.Errorf("error parsing oref: %w", err)
}
oref = &orefVal
} else {
orefVal := waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId)
oref = &orefVal
}
log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta)
obj, err := wstore.DBGetORef(ctx, *oref)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil, nil
}
meta := waveobj.GetMeta(obj)
if meta == nil {
meta = make(map[string]any)
}
for k, v := range cmd.Meta {
if v == nil {
delete(meta, k)
continue
}
meta[k] = v
}
waveobj.SetMeta(obj, meta)
err = wstore.DBUpdate(ctx, obj)
if err != nil {
return nil, fmt.Errorf("error updating block: %w", err)
}
// send a waveobj:update event
updatedBlock, err := wstore.DBGetORef(ctx, *oref)
if err != nil {
return nil, fmt.Errorf("error getting object (2): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: oref.String(),
Data: wstore.WaveObjUpdate{
UpdateType: wstore.UpdateType_Update,
OType: updatedBlock.GetOType(),
OID: waveobj.GetOID(updatedBlock),
Obj: updatedBlock,
},
})
return nil, nil
}
func handleAppendBlockFile(blockId string, blockFile string, data []byte) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := filestore.WFS.AppendData(ctx, blockId, blockFile, data)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(data),
},
})
return nil
}
func handleAppendIJsonFile(blockId string, blockFile string, cmd map[string]any, tryCreate bool) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
if blockFile == blockcontroller.BlockFile_Html && tryCreate {
err := filestore.WFS.MakeFile(ctx, blockId, blockFile, nil, filestore.FileOptsType{MaxSize: blockcontroller.DefaultHtmlMaxFileSize, IJson: true})
if err != nil && err != fs.ErrExist {
return fmt.Errorf("error creating blockfile[html]: %w", err)
}
}
err := filestore.WFS.AppendIJson(ctx, blockId, blockFile, cmd)
if err != nil {
return fmt.Errorf("error appending to blockfile(ijson): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString([]byte("{}")),
},
})
return nil
}
func sendWStoreUpdatesToEventBus(updates wstore.UpdatesRtnType) {
for _, update := range updates {
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: waveobj.MakeORef(update.OType, update.OID).String(),
Data: update,
})
}
}
func handleCreateBlock(ctx context.Context, cmd *wshutil.CreateBlockCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
ctx = wstore.ContextWithUpdates(ctx)
tabId := cmdCtx.TabId
if cmd.TabId != "" {
tabId = cmd.TabId
}
blockData, err := wstore.CreateBlock(ctx, tabId, cmd.BlockDef, cmd.RtOpts)
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
if blockData.Controller != "" {
err = blockcontroller.StartBlockController(ctx, cmd.TabId, blockData.OID, RunCmd)
if err != nil {
return nil, fmt.Errorf("error starting block controller: %w", err)
}
}
updates := wstore.ContextGetUpdatesRtn(ctx)
sendWStoreUpdatesToEventBus(updates)
windowId, err := wstore.DBFindWindowForTabId(ctx, tabId)
if err != nil {
return nil, fmt.Errorf("error finding window for tab: %w", err)
}
if windowId == "" {
return nil, fmt.Errorf("no window found for tab")
}
eventbus.SendEventToWindow(windowId, eventbus.WSEventType{
EventType: eventbus.WSEvent_LayoutAction,
Data: &eventbus.WSLayoutActionData{
ActionType: "insert",
TabId: tabId,
BlockId: blockData.OID,
},
})
return map[string]any{"blockId": blockData.OID}, nil
}

View File

@ -21,6 +21,7 @@ const (
WSEvent_BlockControllerStatus = "blockcontroller:status"
WSEvent_LayoutAction = "layoutaction"
WSEvent_ElectronNewWindow = "electron:newwindow"
WSEvent_Rpc = "rpc"
)
type WSEventType struct {

View File

@ -9,10 +9,8 @@ import (
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/cmdqueue"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -40,13 +38,6 @@ func (bs *BlockService) GetControllerStatus(ctx context.Context, blockId string)
return bc.GetRuntimeStatus(), nil
}
func (bs *BlockService) SendCommand(uiContext wstore.UIContext, blockId string, cmd wshutil.BlockCommand) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
_, err := cmdqueue.RunCmd(ctx, cmd, wshutil.CmdContextType{BlockId: blockId, TabId: uiContext.ActiveTabId})
return err
}
func (bs *BlockService) SaveTerminalState(ctx context.Context, blockId string, state string, stateType string, ptyOffset int64) error {
_, err := wstore.DBMustGet[*wstore.Block](ctx, blockId)
if err != nil {

View File

@ -91,7 +91,11 @@ func (fs *FileService) ReadFile(path string) (*FullFile, error) {
innerFilesInfo = append(innerFilesInfo, *parentFileInfo)
}
for _, innerFileEntry := range innerFilesEntries {
innerFileInfoInt, _ := innerFileEntry.Info()
innerFileInfoInt, err := innerFileEntry.Info()
if err != nil {
log.Printf("unable to get file info for (innerFileInfo) %s: %v", innerFileEntry.Name(), err)
continue
}
mimeType := utilfn.DetectMimeType(filepath.Join(finfo.Path, innerFileInfoInt.Name()))
var fileSize int64
if mimeType == "directory" {

View File

@ -11,7 +11,6 @@ import (
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/cmdqueue"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wstore"
@ -137,7 +136,7 @@ func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string)
return nil, fmt.Errorf("error getting tab: %w", err)
}
for _, blockId := range tab.BlockIds {
blockErr := blockcontroller.StartBlockController(ctx, tabId, blockId, cmdqueue.RunCmd)
blockErr := blockcontroller.StartBlockController(ctx, tabId, blockId)
if blockErr != nil {
// we don't want to fail the set active tab operation if a block controller fails to start
log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr)
@ -191,7 +190,7 @@ func (svc *ObjectService) CreateBlock(uiContext wstore.UIContext, blockDef *wsto
return "", nil, fmt.Errorf("error creating block: %w", err)
}
if blockData.Controller != "" {
err = blockcontroller.StartBlockController(ctx, uiContext.ActiveTabId, blockData.OID, cmdqueue.RunCmd)
err = blockcontroller.StartBlockController(ctx, uiContext.ActiveTabId, blockData.OID)
if err != nil {
return "", nil, fmt.Errorf("error starting block controller: %w", err)
}

View File

@ -18,7 +18,6 @@ import (
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/web/webcmd"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -39,8 +38,8 @@ var waveObjMapRType = reflect.TypeOf(map[string]waveobj.WaveObj{})
var methodMetaRType = reflect.TypeOf(tsgenmeta.MethodMeta{})
var waveObjUpdateRType = reflect.TypeOf(wstore.WaveObjUpdate{})
var uiContextRType = reflect.TypeOf((*wstore.UIContext)(nil)).Elem()
var blockCommandRType = reflect.TypeOf((*wshutil.BlockCommand)(nil)).Elem()
var wsCommandRType = reflect.TypeOf((*webcmd.WSCommandType)(nil)).Elem()
var orefRType = reflect.TypeOf((*waveobj.ORef)(nil)).Elem()
type WebCallType struct {
Service string `json:"service"`
@ -96,18 +95,7 @@ func convertComplex(argType reflect.Type, jsonArg any) (any, error) {
}
func isSpecialWaveArgType(argType reflect.Type) bool {
return argType == waveObjRType || argType == waveObjSliceRType || argType == waveObjMapRType || argType == blockCommandRType || argType == wsCommandRType
}
func convertBlockCommand(argType reflect.Type, jsonArg any) (any, error) {
if _, ok := jsonArg.(map[string]any); !ok {
return nil, fmt.Errorf("cannot convert %T to %s", jsonArg, argType)
}
cmd, err := wshutil.ParseCmdMap(jsonArg.(map[string]any))
if err != nil {
return nil, fmt.Errorf("error parsing command map: %w", err)
}
return cmd, nil
return argType == waveObjRType || argType == waveObjSliceRType || argType == waveObjMapRType || argType == wsCommandRType
}
func convertWSCommand(argType reflect.Type, jsonArg any) (any, error) {
@ -123,8 +111,15 @@ func convertWSCommand(argType reflect.Type, jsonArg any) (any, error) {
func convertSpecial(argType reflect.Type, jsonArg any) (any, error) {
jsonType := reflect.TypeOf(jsonArg)
if argType == blockCommandRType {
return convertBlockCommand(argType, jsonArg)
if argType == orefRType {
if jsonType.Kind() != reflect.String {
return nil, fmt.Errorf("cannot convert %T to %s", jsonArg, argType)
}
oref, err := waveobj.ParseORef(jsonArg.(string))
if err != nil {
return nil, fmt.Errorf("invalid oref string: %v", err)
}
return oref, nil
} else if argType == wsCommandRType {
return convertWSCommand(argType, jsonArg)
} else if argType == waveObjRType {

View File

@ -17,6 +17,8 @@ import (
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wconfig"
"github.com/wavetermdev/thenextwave/pkg/web/webcmd"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshserver"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -35,11 +37,12 @@ var ExtraTypes = []any{
filestore.WaveFile{},
wconfig.SettingsConfigType{},
wconfig.WatcherUpdate{},
wshutil.RpcMessage{},
wshrpc.WshServerCommandMeta{},
}
// add extra type unions to generate here
var TypeUnions = []tsgenmeta.TypeUnionMeta{
wshutil.CommandTypeUnionMeta(),
webcmd.WSCommandTypeUnionMeta(),
}
@ -50,6 +53,7 @@ var metaRType = reflect.TypeOf((*map[string]any)(nil)).Elem()
var uiContextRType = reflect.TypeOf((*wstore.UIContext)(nil)).Elem()
var waveObjRType = reflect.TypeOf((*waveobj.WaveObj)(nil)).Elem()
var updatesRtnRType = reflect.TypeOf(wstore.UpdatesRtnType{})
var orefRType = reflect.TypeOf((*waveobj.ORef)(nil)).Elem()
func generateTSMethodTypes(method reflect.Method, tsTypesMap map[reflect.Type]string) error {
for idx := 1; idx < method.Type.NumIn(); idx++ {
@ -260,6 +264,10 @@ func GenerateTSType(rtype reflect.Type, tsTypesMap map[reflect.Type]string) {
if _, ok := tsTypesMap[rtype]; ok {
return
}
if rtype == orefRType {
tsTypesMap[orefRType] = "// waveobj.ORef\ntype ORef = string;\n"
return
}
if rtype == waveObjRType {
tsTypesMap[rtype] = GenerateWaveObjTSType()
return
@ -377,7 +385,28 @@ func GenerateServiceClass(serviceName string, serviceObj any, tsTypesMap map[ref
isFirst = false
}
sb.WriteString("}\n\n")
sb.WriteString(fmt.Sprintf("export const %s = new %sType()\n", tsServiceName, tsServiceName))
sb.WriteString(fmt.Sprintf("export const %s = new %sType();\n", tsServiceName, tsServiceName))
return sb.String()
}
func GenerateWshServerMethod(methodDecl *wshserver.WshServerMethodDecl, tsTypesMap map[reflect.Type]string) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf(" // command %q [%s]\n", methodDecl.Command, methodDecl.CommandType))
rtnType := "Promise<void>"
if methodDecl.DefaultResponseDataType != nil {
rtnTypeName, _ := TypeToTSType(methodDecl.DefaultResponseDataType, tsTypesMap)
rtnType = fmt.Sprintf("Promise<%s>", rtnTypeName)
}
if methodDecl.CommandDataType != nil {
sb.WriteString(fmt.Sprintf(" %s(data: %s, opts?: WshRpcCommandOpts): %s {\n", methodDecl.MethodName, methodDecl.CommandDataType.Name(), rtnType))
} else {
sb.WriteString(fmt.Sprintf(" %s(opts?: WshRpcCommandOpts): %s {\n", methodDecl.MethodName, rtnType))
}
metaData := fmt.Sprintf(" const meta: WshServerCommandMeta = {commandtype: %q};\n", methodDecl.CommandType)
methodBody := fmt.Sprintf(" return WOS.callWshServerRpc(%q, data, meta, opts);\n", methodDecl.Command)
sb.WriteString(metaData)
sb.WriteString(methodBody)
sb.WriteString(" }\n")
return sb.String()
}
@ -406,3 +435,20 @@ func GenerateServiceTypes(tsTypesMap map[reflect.Type]string) error {
}
return nil
}
func GenerateWshServerTypes(tsTypesMap map[reflect.Type]string) error {
GenerateTSType(reflect.TypeOf(wshrpc.WshRpcCommandOpts{}), tsTypesMap)
for _, methodDecl := range wshserver.WshServerCommandToDeclMap {
GenerateTSType(methodDecl.CommandDataType, tsTypesMap)
if methodDecl.DefaultResponseDataType != nil {
GenerateTSType(methodDecl.DefaultResponseDataType, tsTypesMap)
}
for _, rtype := range methodDecl.RequestDataTypes {
GenerateTSType(rtype, tsTypesMap)
}
for _, rtype := range methodDecl.ResponseDataTypes {
GenerateTSType(rtype, tsTypesMap)
}
}
return nil
}

View File

@ -738,6 +738,14 @@ func IndentString(indent string, str string) string {
return rtn.String()
}
func ReUnmarshal(out any, in any) error {
barr, err := json.Marshal(in)
if err != nil {
return err
}
return json.Unmarshal(barr, out)
}
// does a mapstructure using "json" tags
func DoMapStucture(out any, input any) error {
dconfig := &mapstructure.DecoderConfig{

View File

@ -25,6 +25,7 @@ const DefaultWaveHome = "~/.w2"
const WaveHomeVarName = "WAVETERM_HOME"
const WaveDevVarName = "WAVETERM_DEV"
const WaveLockFile = "waveterm.lock"
const DomainSocketBaseName = "wave.sock"
var baseLock = &sync.Mutex{}
var ensureDirCache = map[string]bool{}
@ -64,6 +65,10 @@ func ReplaceHomeDir(pathStr string) string {
return pathStr
}
func GetDomainSocketName() string {
return filepath.Join(GetWaveHomeDir(), DomainSocketBaseName)
}
func GetWaveHomeDir() string {
homeVar := os.Getenv(WaveHomeVarName)
if homeVar != "" {

View File

@ -27,6 +27,7 @@ const (
)
type ORef struct {
// special JSON marshalling to string
OType string `json:"otype" mapstructure:"otype"`
OID string `json:"oid" mapstructure:"oid"`
}
@ -35,6 +36,29 @@ func (oref ORef) String() string {
return fmt.Sprintf("%s:%s", oref.OType, oref.OID)
}
func (oref ORef) MarshalJSON() ([]byte, error) {
return json.Marshal(oref.String())
}
func (oref ORef) IsEmpty() bool {
// either being empty is not valid
return oref.OType == "" || oref.OID == ""
}
func (oref *ORef) UnmarshalJSON(data []byte) error {
var orefStr string
err := json.Unmarshal(data, &orefStr)
if err != nil {
return err
}
parsed, err := ParseORef(orefStr)
if err != nil {
return err
}
*oref = parsed
return nil
}
func MakeORef(otype string, oid string) ORef {
return ORef{
OType: otype,

View File

@ -10,11 +10,13 @@ import (
"github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
)
const (
WSCommand_SetBlockTermSize = "setblocktermsize"
WSCommand_BlockInput = "blockinput"
WSCommand_Rpc = "rpc"
)
type WSCommandType interface {
@ -28,10 +30,20 @@ func WSCommandTypeUnionMeta() tsgenmeta.TypeUnionMeta {
Types: []reflect.Type{
reflect.TypeOf(SetBlockTermSizeWSCommand{}),
reflect.TypeOf(BlockInputWSCommand{}),
reflect.TypeOf(WSRpcCommand{}),
},
}
}
type WSRpcCommand struct {
WSCommand string `json:"wscommand" tstype:"\"rpc\""`
Message *wshutil.RpcMessage `json:"message"`
}
func (cmd *WSRpcCommand) GetWSCommand() string {
return cmd.WSCommand
}
type SetBlockTermSizeWSCommand struct {
WSCommand string `json:"wscommand" tstype:"\"setblocktermsize\""`
BlockId string `json:"blockid"`
@ -72,6 +84,13 @@ func ParseWSCommandMap(cmdMap map[string]any) (WSCommandType, error) {
return nil, fmt.Errorf("error decoding BlockInputWSCommand: %w", err)
}
return &cmd, nil
case WSCommand_Rpc:
var cmd WSRpcCommand
err := utilfn.DoMapStucture(&cmd, cmdMap)
if err != nil {
return nil, fmt.Errorf("error decoding WSRpcCommand: %w", err)
}
return &cmd, nil
default:
return nil, fmt.Errorf("unknown wscommand type %q", cmdType)
}

View File

@ -4,7 +4,6 @@
package web
import (
"context"
"encoding/json"
"fmt"
"log"
@ -16,12 +15,15 @@ import (
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/wavetermdev/thenextwave/pkg/cmdqueue"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/web/webcmd"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
)
// set by main-server.go (for dependency inversion)
var WshServerFactoryFn func(inputCh chan []byte, outputCh chan []byte, initialCtx wshutil.RpcContext) = nil
const wsReadWaitTimeout = 15 * time.Second
const wsWriteWaitTimeout = 10 * time.Second
const wsPingPeriodTickTime = 10 * time.Second
@ -76,7 +78,7 @@ func getStringFromMap(jmsg map[string]any, key string) string {
return ""
}
func processWSCommand(jmsg map[string]any, outputCh chan any) {
func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []byte) {
var rtnErr error
defer func() {
r := recover()
@ -98,34 +100,57 @@ func processWSCommand(jmsg map[string]any, outputCh chan any) {
}
switch cmd := wsCommand.(type) {
case *webcmd.SetBlockTermSizeWSCommand:
blockCmd := &wshutil.BlockInputCommand{
Command: wshutil.BlockCommand_Input,
data := wshrpc.CommandBlockInputData{
BlockId: cmd.BlockId,
TermSize: &cmd.TermSize,
}
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultCommandTimeout)
defer cancelFn()
_, err = cmdqueue.RunCmd(ctx, blockCmd, wshutil.CmdContextType{BlockId: cmd.BlockId})
if err != nil {
log.Printf("error running command %q: %v\n", blockCmd.Command, err)
rpcMsg := wshutil.RpcMessage{
Command: wshrpc.Command_BlockInput,
Data: data,
}
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err)
return
}
rpcInputCh <- msgBytes
case *webcmd.BlockInputWSCommand:
blockCmd := &wshutil.BlockInputCommand{
Command: wshutil.BlockCommand_Input,
data := wshrpc.CommandBlockInputData{
BlockId: cmd.BlockId,
InputData64: cmd.InputData64,
}
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultCommandTimeout)
defer cancelFn()
_, err = cmdqueue.RunCmd(ctx, blockCmd, wshutil.CmdContextType{BlockId: cmd.BlockId})
if err != nil {
log.Printf("error running command %q: %v\n", blockCmd.Command, err)
rpcMsg := wshutil.RpcMessage{
Command: wshrpc.Command_BlockInput,
Data: data,
}
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err)
return
}
rpcInputCh <- msgBytes
case *webcmd.WSRpcCommand:
rpcMsg := cmd.Message
if rpcMsg == nil {
return
}
msgBytes, err := json.Marshal(rpcMsg)
if err != nil {
// this really should never fail since we just unmarshalled this value
return
}
rpcInputCh <- msgBytes
}
}
func processMessage(jmsg map[string]any, outputCh chan any) {
func processMessage(jmsg map[string]any, outputCh chan any, rpcInputCh chan []byte) {
wsCommand := getStringFromMap(jmsg, "wscommand")
if wsCommand != "" {
processWSCommand(jmsg, outputCh)
processWSCommand(jmsg, outputCh, rpcInputCh)
return
}
msgType := getMessageType(jmsg)
@ -151,7 +176,7 @@ func processMessage(jmsg map[string]any, outputCh chan any) {
rtnErr = fmt.Errorf("unknown method %q", method)
}
func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte) {
readWait := wsReadWaitTimeout
conn.SetReadLimit(64 * 1024)
conn.SetReadDeadline(time.Now().Add(readWait))
@ -180,7 +205,7 @@ func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
outputCh <- pongMessage
continue
}
go processMessage(jmsg, outputCh)
go processMessage(jmsg, outputCh, rpcInputCh)
}
}
@ -253,14 +278,28 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
log.Printf("New websocket connection: windowid:%s connid:%s\n", windowId, wsConnId)
outputCh := make(chan any, 100)
closeCh := make(chan any)
rpcInputCh := make(chan []byte, 32)
rpcOutputCh := make(chan []byte, 32)
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
defer eventbus.UnregisterWSChannel(wsConnId)
WshServerFactoryFn(rpcInputCh, rpcOutputCh, wshutil.RpcContext{WindowId: windowId})
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
// no waitgroup add here
// move values from rpcOutputCh to outputCh
for msgBytes := range rpcOutputCh {
rpcWSMsg := map[string]any{
"eventtype": "rpc", // TODO don't hard code this (but def is in eventbus)
"data": json.RawMessage(msgBytes),
}
outputCh <- rpcWSMsg
}
}()
go func() {
// read loop
defer wg.Done()
ReadLoop(conn, outputCh, closeCh)
ReadLoop(conn, outputCh, closeCh, rpcInputCh)
}()
go func() {
// write loop
@ -268,5 +307,6 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
WriteLoop(conn, outputCh, closeCh)
}()
wg.Wait()
close(rpcInputCh)
return nil
}

View File

@ -1,264 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshprc
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
)
// there is a single go-routine that reads from RecvCh
type RpcClient struct {
CVar *sync.Cond
NextSeqNum *atomic.Int64
ReqPacketsInFlight map[int64]string // seqnum -> rpcId
AckList []int64
RpcReqs map[string]*RpcInfo
SendCh chan *RpcPacket
RecvCh chan *RpcPacket
}
type RpcInfo struct {
CloseSync *sync.Once
RpcId string
PacketsInFlight map[int64]bool // seqnum -> bool (for clients this is for requests, for servers it is for responses)
PkCh chan *RpcPacket // for clients this is for responses, for servers it is for requests
}
func MakeRpcClient(sendCh chan *RpcPacket, recvCh chan *RpcPacket) *RpcClient {
if cap(sendCh) < MaxInFlightPackets {
panic(fmt.Errorf("sendCh buffer size must be at least MaxInFlightPackets(%d)", MaxInFlightPackets))
}
rtn := &RpcClient{
CVar: sync.NewCond(&sync.Mutex{}),
NextSeqNum: &atomic.Int64{},
ReqPacketsInFlight: make(map[int64]string),
AckList: nil,
RpcReqs: make(map[string]*RpcInfo),
SendCh: sendCh,
RecvCh: recvCh,
}
go rtn.runRecvLoop()
return rtn
}
func (c *RpcClient) runRecvLoop() {
defer func() {
if r := recover(); r != nil {
log.Printf("RpcClient.runRecvLoop() panic: %v", r)
debug.PrintStack()
}
}()
for pk := range c.RecvCh {
if pk.RpcType == RpcType_Resp {
c.handleResp(pk)
continue
}
log.Printf("RpcClient.runRecvLoop() bad packet type: %v", pk)
}
log.Printf("RpcClient.runRecvLoop() normal exit")
}
func (c *RpcClient) getRpcInfo(rpcId string) *RpcInfo {
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
return c.RpcReqs[rpcId]
}
func (c *RpcClient) handleResp(pk *RpcPacket) {
c.handleAcks(pk.Acks)
if pk.RpcId == "" {
c.ackResp(pk.SeqNum)
log.Printf("RpcClient.handleResp() missing rpcId: %v", pk)
return
}
rpcInfo := c.getRpcInfo(pk.RpcId)
if rpcInfo == nil {
c.ackResp(pk.SeqNum)
log.Printf("RpcClient.handleResp() unknown rpcId: %v", pk)
return
}
select {
case rpcInfo.PkCh <- pk:
default:
log.Printf("RpcClient.handleResp() respCh full, dropping packet")
}
if pk.RespDone {
c.removeReqInfo(pk.RpcId, false)
}
}
func (c *RpcClient) grabAcks() []int64 {
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
acks := c.AckList
c.AckList = nil
return acks
}
func (c *RpcClient) ackResp(seqNum int64) {
if seqNum == 0 {
return
}
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
c.AckList = append(c.AckList, seqNum)
}
func (c *RpcClient) waitForReq(ctx context.Context, req *RpcPacket) (*RpcInfo, error) {
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
// issue with ctx timeout sync -- we need the cvar to be signaled fairly regularly so we can check ctx.Err()
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if len(c.RpcReqs) >= MaxOpenRpcs {
c.CVar.Wait()
continue
}
if len(c.ReqPacketsInFlight) >= MaxOpenRpcs {
c.CVar.Wait()
continue
}
if rpcInfo, ok := c.RpcReqs[req.RpcId]; ok {
if len(rpcInfo.PacketsInFlight) >= MaxUnackedPerRpc {
c.CVar.Wait()
continue
}
}
break
}
select {
case c.SendCh <- req:
default:
return nil, errors.New("SendCh Full")
}
c.ReqPacketsInFlight[req.SeqNum] = req.RpcId
rpcInfo := c.RpcReqs[req.RpcId]
if rpcInfo == nil {
rpcInfo = &RpcInfo{
CloseSync: &sync.Once{},
RpcId: req.RpcId,
PacketsInFlight: make(map[int64]bool),
PkCh: make(chan *RpcPacket, MaxUnackedPerRpc),
}
rpcInfo.PacketsInFlight[req.SeqNum] = true
c.RpcReqs[req.RpcId] = rpcInfo
}
return rpcInfo, nil
}
func (c *RpcClient) handleAcks(acks []int64) {
if len(acks) == 0 {
return
}
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
for _, ack := range acks {
rpcId, ok := c.ReqPacketsInFlight[ack]
if !ok {
continue
}
rpcInfo := c.RpcReqs[rpcId]
if rpcInfo != nil {
delete(rpcInfo.PacketsInFlight, ack)
}
delete(c.ReqPacketsInFlight, ack)
}
c.CVar.Broadcast()
}
func (c *RpcClient) removeReqInfo(rpcId string, clearSend bool) {
c.CVar.L.Lock()
defer c.CVar.L.Unlock()
rpcInfo := c.RpcReqs[rpcId]
delete(c.RpcReqs, rpcId)
if rpcInfo != nil {
if clearSend {
// unblock the recv loop if it happens to be waiting
// because the delete has already happens, it will not be able to send again on the channel
select {
case <-rpcInfo.PkCh:
default:
}
}
rpcInfo.CloseSync.Do(func() {
close(rpcInfo.PkCh)
})
}
}
func (c *RpcClient) SimpleReq(ctx context.Context, command string, data any) (any, error) {
rpcId := uuid.NewString()
seqNum := c.NextSeqNum.Add(1)
var timeoutInfo *TimeoutInfo
deadline, ok := ctx.Deadline()
if ok {
timeoutInfo = &TimeoutInfo{Deadline: deadline.UnixMilli()}
}
req := &RpcPacket{
Command: command,
RpcId: rpcId,
RpcType: RpcType_Req,
SeqNum: seqNum,
ReqDone: true,
Acks: c.grabAcks(),
Timeout: timeoutInfo,
Data: data,
}
rpcInfo, err := c.waitForReq(ctx, req)
if err != nil {
return nil, err
}
defer c.removeReqInfo(rpcId, true)
var rtnPacket *RpcPacket
select {
case <-ctx.Done():
return nil, ctx.Err()
case rtnPacket = <-rpcInfo.PkCh:
// fallthrough
}
if rtnPacket.Error != "" {
return nil, errors.New(rtnPacket.Error)
}
return rtnPacket.Data, nil
}
func (c *RpcClient) StreamReq(ctx context.Context, command string, data any, respTimeout time.Duration) (chan *RpcPacket, error) {
rpcId := uuid.NewString()
seqNum := c.NextSeqNum.Add(1)
var timeoutInfo *TimeoutInfo = &TimeoutInfo{RespPacketTimeout: respTimeout.Milliseconds()}
deadline, ok := ctx.Deadline()
if ok {
timeoutInfo.Deadline = deadline.UnixMilli()
}
req := &RpcPacket{
Command: command,
RpcId: rpcId,
RpcType: RpcType_Req,
SeqNum: seqNum,
ReqDone: true,
Acks: c.grabAcks(),
Timeout: timeoutInfo,
Data: data,
}
rpcInfo, err := c.waitForReq(ctx, req)
if err != nil {
return nil, err
}
return rpcInfo.PkCh, nil
}
func (c *RpcClient) EndStreamReq(rpcId string) {
c.removeReqInfo(rpcId, true)
}

View File

@ -1,299 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshprc
import (
"context"
"fmt"
"log"
"runtime/debug"
"sync"
"sync/atomic"
"time"
)
type SimpleCommandHandlerFn func(context.Context, *RpcServer, string, any) (any, error)
type StreamCommandHandlerFn func(context.Context, *RpcServer, *RpcPacket) error
type RpcServer struct {
CVar *sync.Cond
NextSeqNum *atomic.Int64
RespPacketsInFlight map[int64]string // seqnum -> rpcId
AckList []int64
RpcReqs map[string]*RpcInfo
SendCh chan *RpcPacket
RecvCh chan *RpcPacket
SimpleCommandHandlers map[string]SimpleCommandHandlerFn
StreamCommandHandlers map[string]StreamCommandHandlerFn
}
func MakeRpcServer(sendCh chan *RpcPacket, recvCh chan *RpcPacket) *RpcServer {
if cap(sendCh) < MaxInFlightPackets {
panic(fmt.Errorf("sendCh buffer size must be at least MaxInFlightPackets(%d)", MaxInFlightPackets))
}
rtn := &RpcServer{
CVar: sync.NewCond(&sync.Mutex{}),
NextSeqNum: &atomic.Int64{},
RespPacketsInFlight: make(map[int64]string),
AckList: nil,
RpcReqs: make(map[string]*RpcInfo),
SendCh: sendCh,
RecvCh: recvCh,
SimpleCommandHandlers: make(map[string]SimpleCommandHandlerFn),
StreamCommandHandlers: make(map[string]StreamCommandHandlerFn),
}
go rtn.runRecvLoop()
return rtn
}
func (s *RpcServer) shouldUseStreamHandler(command string) bool {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
_, ok := s.StreamCommandHandlers[command]
return ok
}
func (s *RpcServer) getStreamHandler(command string) StreamCommandHandlerFn {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
return s.StreamCommandHandlers[command]
}
func (s *RpcServer) getSimpleHandler(command string) SimpleCommandHandlerFn {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
return s.SimpleCommandHandlers[command]
}
func (s *RpcServer) RegisterSimpleCommandHandler(command string, handler SimpleCommandHandlerFn) {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
if s.StreamCommandHandlers[command] != nil {
panic(fmt.Errorf("command %q already registered as a stream handler", command))
}
s.SimpleCommandHandlers[command] = handler
}
func (s *RpcServer) RegisterStreamCommandHandler(command string, handler StreamCommandHandlerFn) {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
if s.SimpleCommandHandlers[command] != nil {
panic(fmt.Errorf("command %q already registered as a simple handler", command))
}
s.StreamCommandHandlers[command] = handler
}
func (s *RpcServer) runRecvLoop() {
defer func() {
if r := recover(); r != nil {
log.Printf("RpcServer.runRecvLoop() panic: %v", r)
debug.PrintStack()
}
}()
for pk := range s.RecvCh {
s.handleAcks(pk.Acks)
if pk.RpcType == RpcType_Req {
if s.shouldUseStreamHandler(pk.Command) {
s.handleStreamReq(pk)
} else {
s.handleSimpleReq(pk)
}
continue
}
log.Printf("RpcClient.runRecvLoop() bad packet type: %v", pk)
}
log.Printf("RpcServer.runRecvLoop() normal exit")
}
func (s *RpcServer) ackResp(seqNum int64) {
if seqNum == 0 {
return
}
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
s.AckList = append(s.AckList, seqNum)
}
func makeContextFromTimeout(timeout *TimeoutInfo) (context.Context, context.CancelFunc) {
if timeout == nil {
return context.Background(), func() {}
}
return context.WithDeadline(context.Background(), time.UnixMilli(timeout.Deadline))
}
func (s *RpcServer) SendResponse(ctx context.Context, pk *RpcPacket) error {
return s.waitForSend(ctx, pk)
}
func (s *RpcServer) waitForSend(ctx context.Context, pk *RpcPacket) error {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
for {
if ctx.Err() != nil {
return ctx.Err()
}
if len(s.RespPacketsInFlight) >= MaxInFlightPackets {
s.CVar.Wait()
continue
}
rpcInfo := s.RpcReqs[pk.RpcId]
if rpcInfo != nil {
if len(rpcInfo.PacketsInFlight) >= MaxUnackedPerRpc {
s.CVar.Wait()
continue
}
}
break
}
s.RespPacketsInFlight[pk.SeqNum] = pk.RpcId
pk.Acks = s.grabAcks_nolock()
s.SendCh <- pk
rpcInfo := s.RpcReqs[pk.RpcId]
if !pk.RespDone && rpcInfo != nil {
rpcInfo = &RpcInfo{
CloseSync: &sync.Once{},
RpcId: pk.RpcId,
PkCh: make(chan *RpcPacket, MaxUnackedPerRpc),
PacketsInFlight: make(map[int64]bool),
}
s.RpcReqs[pk.RpcId] = rpcInfo
}
if rpcInfo != nil {
rpcInfo.PacketsInFlight[pk.SeqNum] = true
}
if pk.RespDone {
delete(s.RpcReqs, pk.RpcId)
}
return nil
}
func (s *RpcServer) handleAcks(acks []int64) {
if len(acks) == 0 {
return
}
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
for _, ack := range acks {
rpcId, ok := s.RespPacketsInFlight[ack]
if !ok {
continue
}
rpcInfo := s.RpcReqs[rpcId]
if rpcInfo != nil {
delete(rpcInfo.PacketsInFlight, ack)
}
delete(s.RespPacketsInFlight, ack)
}
s.CVar.Broadcast()
}
func (s *RpcServer) handleSimpleReq(pk *RpcPacket) {
s.ackResp(pk.SeqNum)
handler := s.getSimpleHandler(pk.Command)
if handler == nil {
s.sendErrorResp(pk, fmt.Errorf("unknown command: %s", pk.Command))
log.Printf("RpcServer.handleReq() unknown command: %s", pk.Command)
return
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("RpcServer.handleReq(%q) panic: %v", pk.Command, r)
debug.PrintStack()
}
}()
ctx, cancelFn := makeContextFromTimeout(pk.Timeout)
defer cancelFn()
data, err := handler(ctx, s, pk.Command, pk.Data)
seqNum := s.NextSeqNum.Add(1)
respPk := &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: seqNum,
RespDone: true,
}
if err != nil {
respPk.Error = err.Error()
} else {
respPk.Data = data
}
s.waitForSend(ctx, respPk)
}()
}
func (s *RpcServer) grabAcks_nolock() []int64 {
acks := s.AckList
s.AckList = nil
return acks
}
func (s *RpcServer) sendErrorResp(pk *RpcPacket, err error) {
respPk := &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: true,
Error: err.Error(),
}
s.waitForSend(context.Background(), respPk)
}
func (s *RpcServer) makeRespPk(pk *RpcPacket, data any, done bool) *RpcPacket {
return &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: done,
Data: data,
}
}
func (s *RpcServer) handleStreamReq(pk *RpcPacket) {
s.ackResp(pk.SeqNum)
handler := s.getStreamHandler(pk.Command)
if handler == nil {
s.ackResp(pk.SeqNum)
s.sendErrorResp(pk, fmt.Errorf("unknown command: %s", pk.Command))
log.Printf("RpcServer.handleStreamReq() unknown command: %s", pk.Command)
return
}
go func() {
defer func() {
r := recover()
if r == nil {
return
}
log.Printf("RpcServer.handleStreamReq(%q) panic: %v", pk.Command, r)
debug.PrintStack()
respPk := &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: true,
Error: fmt.Sprintf("panic: %v", r),
}
s.waitForSend(context.Background(), respPk)
}()
ctx, cancelFn := makeContextFromTimeout(pk.Timeout)
defer cancelFn()
err := handler(ctx, s, pk)
if err != nil {
respPk := &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: true,
Error: err.Error(),
}
s.waitForSend(ctx, respPk)
return
}
// check if RespDone has been set, if not, send it here
}()
}

View File

@ -1,194 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshprc
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
)
func TestSimple(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
resp, err := client.SimpleReq(ctx, "test", "hello")
if err != nil {
t.Errorf("SimpleReq() failed: %v", err)
return
}
if resp != "world" {
t.Errorf("SimpleReq() failed: expected 'world', got '%s'", resp)
}
}()
go func() {
defer wg.Done()
req := <-sendCh
if req.Command != "test" {
t.Errorf("expected 'test', got '%s'", req.Command)
}
if req.Data != "hello" {
t.Errorf("expected 'hello', got '%s'", req.Data)
}
resp := &RpcPacket{
Command: "test",
RpcId: req.RpcId,
RpcType: RpcType_Resp,
SeqNum: 1,
RespDone: true,
Acks: []int64{req.SeqNum},
Data: "world",
}
recvCh <- resp
}()
wg.Wait()
}
func makeRpcResp(req *RpcPacket, data any, seqNum int64, done bool) *RpcPacket {
return &RpcPacket{
Command: req.Command,
RpcId: req.RpcId,
RpcType: RpcType_Resp,
SeqNum: seqNum,
RespDone: done,
Data: data,
}
}
func TestStream(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
respCh, err := client.StreamReq(ctx, "test", "hello", 1000)
if err != nil {
t.Errorf("StreamReq() failed: %v", err)
return
}
var output []string
for resp := range respCh {
if resp.Error != "" {
t.Errorf("StreamReq() failed: %v", resp.Error)
return
}
output = append(output, resp.Data.(string))
}
if len(output) != 3 {
t.Errorf("expected 3 responses, got %d (%v)", len(output), output)
return
}
if output[0] != "one" || output[1] != "two" || output[2] != "three" {
t.Errorf("expected 'one', 'two', 'three', got %v", output)
return
}
}()
go func() {
defer wg.Done()
req := <-sendCh
if req.Command != "test" {
t.Errorf("expected 'test', got '%s'", req.Command)
}
if req.Data != "hello" {
t.Errorf("expected 'hello', got '%s'", req.Data)
}
resp := makeRpcResp(req, "one", 1, false)
recvCh <- resp
resp = makeRpcResp(req, "two", 2, false)
recvCh <- resp
resp = makeRpcResp(req, "three", 3, true)
recvCh <- resp
}()
wg.Wait()
}
func TestSimpleClientServer(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
server := MakeRpcServer(recvCh, sendCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
server.RegisterSimpleCommandHandler("test", func(ctx context.Context, s *RpcServer, cmd string, data any) (any, error) {
if data != "hello" {
return nil, fmt.Errorf("expected 'hello', got '%s'", data)
}
return "world", nil
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.SimpleReq(ctx, "test", "hello")
if err != nil {
t.Errorf("SimpleReq() failed: %v", err)
return
}
if resp != "world" {
t.Errorf("SimpleReq() failed: expected 'world', got '%s'", resp)
}
}()
wg.Wait()
}
func TestStreamClientServer(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
server := MakeRpcServer(recvCh, sendCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
server.RegisterStreamCommandHandler("test", func(ctx context.Context, s *RpcServer, req *RpcPacket) error {
pk1 := s.makeRespPk(req, "one", false)
pk2 := s.makeRespPk(req, "two", false)
pk3 := s.makeRespPk(req, "three", true)
s.SendResponse(ctx, pk1)
s.SendResponse(ctx, pk2)
s.SendResponse(ctx, pk3)
return nil
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
respCh, err := client.StreamReq(ctx, "test", "hello", 2*time.Second)
if err != nil {
t.Errorf("StreamReq() failed: %v", err)
return
}
var result []string
for respPk := range respCh {
if respPk.Error != "" {
t.Errorf("StreamReq() failed: %v", respPk.Error)
return
}
log.Printf("got response: %#v", respPk)
result = append(result, respPk.Data.(string))
}
if len(result) != 3 {
t.Errorf("expected 3 responses, got %d", len(result))
return
}
if result[0] != "one" || result[1] != "two" || result[2] != "three" {
t.Errorf("expected 'one', 'two', 'three', got %v", result)
return
}
}()
wg.Wait()
}

View File

@ -0,0 +1,74 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generatewshclient/main-generatewshclient.go
package wshclient
import (
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
)
// command "controller:input", wshserver.BlockInputCommand
func BlockInputCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockInputData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "controller:input", data, opts)
return err
}
// command "controller:restart", wshserver.BlockRestartCommand
func BlockRestartCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockRestartData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "controller:restart", data, opts)
return err
}
// command "createblock", wshserver.CreateBlockCommand
func CreateBlockCommand(w *wshutil.WshRpc, data wshrpc.CommandCreateBlockData, opts *wshrpc.WshRpcCommandOpts) (*waveobj.ORef, error) {
resp, err := sendRpcRequestHelper[*waveobj.ORef](w, "createblock", data, opts)
return resp, err
}
// command "file:append", wshserver.AppendFileCommand
func AppendFileCommand(w *wshutil.WshRpc, data wshrpc.CommandAppendFileData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "file:append", data, opts)
return err
}
// command "file:appendijson", wshserver.AppendIJsonCommand
func AppendIJsonCommand(w *wshutil.WshRpc, data wshrpc.CommandAppendIJsonData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "file:appendijson", data, opts)
return err
}
// command "getmeta", wshserver.GetMetaCommand
func GetMetaCommand(w *wshutil.WshRpc, data wshrpc.CommandGetMetaData, opts *wshrpc.WshRpcCommandOpts) (map[string]interface {}, error) {
resp, err := sendRpcRequestHelper[map[string]interface {}](w, "getmeta", data, opts)
return resp, err
}
// command "message", wshserver.MessageCommand
func MessageCommand(w *wshutil.WshRpc, data wshrpc.CommandMessageData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "message", data, opts)
return err
}
// command "resolveids", wshserver.ResolveIdsCommand
func ResolveIdsCommand(w *wshutil.WshRpc, data wshrpc.CommandResolveIdsData, opts *wshrpc.WshRpcCommandOpts) (wshrpc.CommandResolveIdsRtnData, error) {
resp, err := sendRpcRequestHelper[wshrpc.CommandResolveIdsRtnData](w, "resolveids", data, opts)
return resp, err
}
// command "setmeta", wshserver.SetMetaCommand
func SetMetaCommand(w *wshutil.WshRpc, data wshrpc.CommandSetMetaData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "setmeta", data, opts)
return err
}
// command "setview", wshserver.BlockSetViewCommand
func BlockSetViewCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockSetViewData, opts *wshrpc.WshRpcCommandOpts) error {
_, err := sendRpcRequestHelper[any](w, "setview", data, opts)
return err
}

View File

@ -0,0 +1,30 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshclient
import (
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
)
func sendRpcRequestHelper[T any](w *wshutil.WshRpc, command string, data interface{}, opts *wshrpc.WshRpcCommandOpts) (T, error) {
var respData T
if opts.NoResponse {
err := w.SendCommand(command, data)
if err != nil {
return respData, err
}
return respData, nil
}
resp, err := w.SendRpcRequest(command, data, opts.Timeout)
if err != nil {
return respData, err
}
err = utilfn.ReUnmarshal(&respData, resp)
if err != nil {
return respData, err
}
return respData, nil
}

View File

@ -1,58 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshprc
import (
"context"
)
const (
MaxOpenRpcs = 10
MaxUnackedPerRpc = 10
MaxInFlightPackets = MaxOpenRpcs * MaxUnackedPerRpc
)
const (
RpcType_Req = "req"
RpcType_Resp = "resp"
)
const (
CommandType_Ack = ":ack"
CommandType_Ping = ":ping"
CommandType_Cancel = ":cancel"
CommandType_Timeout = ":timeout"
)
var rpcClientContextKey = struct{}{}
type TimeoutInfo struct {
Deadline int64 `json:"deadline,omitempty"`
ReqPacketTimeout int64 `json:"reqpackettimeout,omitempty"` // for streaming requests
RespPacketTimeout int64 `json:"resppackettimeout,omitempty"` // for streaming responses
}
type RpcPacket struct {
Command string `json:"command"`
RpcId string `json:"rpcid"`
RpcType string `json:"rpctype"`
SeqNum int64 `json:"seqnum"`
ReqDone bool `json:"reqdone"`
RespDone bool `json:"resdone"`
Acks []int64 `json:"acks,omitempty"` // seqnums acked
Timeout *TimeoutInfo `json:"timeout,omitempty"` // for initial request only
Data any `json:"data"` // json data for command
Error string `json:"error,omitempty"`
}
func GetRpcClient(ctx context.Context) *RpcClient {
if ctx == nil {
return nil
}
val := ctx.Value(rpcClientContextKey)
if val == nil {
return nil
}
return val.(*RpcClient)
}

130
pkg/wshrpc/wshrpctypes.go Normal file
View File

@ -0,0 +1,130 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// types and methods for wsh rpc calls
package wshrpc
import (
"reflect"
"github.com/wavetermdev/thenextwave/pkg/ijson"
"github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const (
Command_Message = "message"
Command_SetView = "setview"
Command_SetMeta = "setmeta"
Command_GetMeta = "getmeta"
Command_BlockInput = "controller:input"
Command_Restart = "controller:restart"
Command_AppendFile = "file:append"
Command_AppendIJson = "file:appendijson"
Command_ResolveIds = "resolveids"
Command_CreateBlock = "createblock"
)
type MetaDataType = map[string]any
var DataTypeMap = map[string]reflect.Type{
"meta": reflect.TypeOf(MetaDataType{}),
"resolveidsrtn": reflect.TypeOf(CommandResolveIdsRtnData{}),
"oref": reflect.TypeOf(waveobj.ORef{}),
}
// for frontend
type WshServerCommandMeta struct {
CommandType string `json:"commandtype"`
}
type WshRpcCommandOpts struct {
Timeout int `json:"timeout"`
NoResponse bool `json:"noresponse"`
}
func HackRpcContextIntoData(dataPtr any, rpcContext wshutil.RpcContext) {
dataVal := reflect.ValueOf(dataPtr).Elem()
dataType := dataVal.Type()
for i := 0; i < dataVal.NumField(); i++ {
field := dataVal.Field(i)
if !field.IsZero() {
continue
}
fieldType := dataType.Field(i)
tag := fieldType.Tag.Get("wshcontext")
if tag == "" {
continue
}
switch tag {
case "BlockId":
field.SetString(rpcContext.BlockId)
case "TabId":
field.SetString(rpcContext.TabId)
case "WindowId":
field.SetString(rpcContext.WindowId)
case "BlockORef":
if rpcContext.BlockId != "" {
field.Set(reflect.ValueOf(waveobj.MakeORef(wstore.OType_Block, rpcContext.BlockId)))
}
}
}
}
type CommandMessageData struct {
ORef waveobj.ORef `json:"oref" wshcontext:"BlockORef"`
Message string `json:"message"`
}
type CommandGetMetaData struct {
ORef waveobj.ORef `json:"oref" wshcontext:"BlockORef"`
}
type CommandSetMetaData struct {
ORef waveobj.ORef `json:"oref" wshcontext:"BlockORef"`
Meta MetaDataType `json:"meta"`
}
type CommandResolveIdsData struct {
Ids []string `json:"ids"`
}
type CommandResolveIdsRtnData struct {
ResolvedIds map[string]waveobj.ORef `json:"resolvedids"`
}
type CommandCreateBlockData struct {
TabId string `json:"tabid" wshcontext:"TabId"`
BlockDef *wstore.BlockDef `json:"blockdef"`
RtOpts *wstore.RuntimeOpts `json:"rtopts"`
}
type CommandBlockSetViewData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
View string `json:"view"`
}
type CommandBlockRestartData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
}
type CommandBlockInputData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
InputData64 string `json:"inputdata64,omitempty"`
SigName string `json:"signame,omitempty"`
TermSize *shellexec.TermSize `json:"termsize,omitempty"`
}
type CommandAppendFileData struct {
ZoneId string `json:"zoneid" wshcontext:"BlockId"`
FileName string `json:"filename"`
Data64 string `json:"data64"`
}
type CommandAppendIJsonData struct {
ZoneId string `json:"zoneid" wshcontext:"BlockId"`
FileName string `json:"filename"`
Data ijson.Command `json:"data"`
}

View File

@ -0,0 +1,417 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshserver
import (
"context"
"encoding/base64"
"fmt"
"io/fs"
"log"
"net"
"os"
"reflect"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const (
DefaultOutputChSize = 32
DefaultInputChSize = 32
)
type WshServer struct{}
var WshServerImpl = WshServer{}
var contextRType = reflect.TypeOf((*context.Context)(nil)).Elem()
type WshServerMethodDecl struct {
Command string
CommandType string
MethodName string
Method reflect.Value
CommandDataType reflect.Type
DefaultResponseDataType reflect.Type
RequestDataTypes []reflect.Type // for streaming requests
ResponseDataTypes []reflect.Type // for streaming responses
}
var WshServerCommandToDeclMap = map[string]*WshServerMethodDecl{
wshrpc.Command_Message: GetWshServerMethod(wshrpc.Command_Message, wshutil.RpcType_Call, "MessageCommand", WshServerImpl.MessageCommand),
wshrpc.Command_SetView: GetWshServerMethod(wshrpc.Command_SetView, wshutil.RpcType_Call, "BlockSetViewCommand", WshServerImpl.BlockSetViewCommand),
wshrpc.Command_SetMeta: GetWshServerMethod(wshrpc.Command_SetMeta, wshutil.RpcType_Call, "SetMetaCommand", WshServerImpl.SetMetaCommand),
wshrpc.Command_GetMeta: GetWshServerMethod(wshrpc.Command_GetMeta, wshutil.RpcType_Call, "GetMetaCommand", WshServerImpl.GetMetaCommand),
wshrpc.Command_ResolveIds: GetWshServerMethod(wshrpc.Command_ResolveIds, wshutil.RpcType_Call, "ResolveIdsCommand", WshServerImpl.ResolveIdsCommand),
wshrpc.Command_CreateBlock: GetWshServerMethod(wshrpc.Command_CreateBlock, wshutil.RpcType_Call, "CreateBlockCommand", WshServerImpl.CreateBlockCommand),
wshrpc.Command_Restart: GetWshServerMethod(wshrpc.Command_Restart, wshutil.RpcType_Call, "BlockRestartCommand", WshServerImpl.BlockRestartCommand),
wshrpc.Command_BlockInput: GetWshServerMethod(wshrpc.Command_BlockInput, wshutil.RpcType_Call, "BlockInputCommand", WshServerImpl.BlockInputCommand),
wshrpc.Command_AppendFile: GetWshServerMethod(wshrpc.Command_AppendFile, wshutil.RpcType_Call, "AppendFileCommand", WshServerImpl.AppendFileCommand),
wshrpc.Command_AppendIJson: GetWshServerMethod(wshrpc.Command_AppendIJson, wshutil.RpcType_Call, "AppendIJsonCommand", WshServerImpl.AppendIJsonCommand),
}
func GetWshServerMethod(command string, commandType string, methodName string, methodFunc any) *WshServerMethodDecl {
methodVal := reflect.ValueOf(methodFunc)
methodType := methodVal.Type()
if methodType.Kind() != reflect.Func {
panic(fmt.Sprintf("methodVal must be a function got [%v]", methodType))
}
if methodType.In(0) != contextRType {
panic(fmt.Sprintf("methodVal must have a context as the first argument %v", methodType))
}
var defResponseType reflect.Type
if methodType.NumOut() > 1 {
defResponseType = methodType.Out(0)
}
rtn := &WshServerMethodDecl{
Command: command,
CommandType: commandType,
MethodName: methodName,
Method: methodVal,
CommandDataType: methodType.In(1),
DefaultResponseDataType: defResponseType,
}
return rtn
}
func (ws *WshServer) MessageCommand(ctx context.Context, data wshrpc.CommandMessageData) error {
log.Printf("MESSAGE: %s | %q\n", data.ORef, data.Message)
return nil
}
func (ws *WshServer) GetMetaCommand(ctx context.Context, data wshrpc.CommandGetMetaData) (wshrpc.MetaDataType, error) {
log.Printf("calling meta: %s\n", data.ORef)
obj, err := wstore.DBGetORef(ctx, data.ORef)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil, fmt.Errorf("object not found: %s", data.ORef)
}
return waveobj.GetMeta(obj), nil
}
func (ws *WshServer) SetMetaCommand(ctx context.Context, data wshrpc.CommandSetMetaData) error {
oref := data.ORef
if oref.IsEmpty() {
return fmt.Errorf("no oref")
}
log.Printf("SETMETA: %s | %v\n", oref, data.Meta)
obj, err := wstore.DBGetORef(ctx, oref)
if err != nil {
return fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil
}
meta := waveobj.GetMeta(obj)
if meta == nil {
meta = make(map[string]any)
}
for k, v := range data.Meta {
if v == nil {
delete(meta, k)
continue
}
meta[k] = v
}
waveobj.SetMeta(obj, meta)
err = wstore.DBUpdate(ctx, obj)
if err != nil {
return fmt.Errorf("error updating block: %w", err)
}
sendWaveObjUpdate(oref)
return nil
}
func sendWaveObjUpdate(oref waveobj.ORef) {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
// send a waveobj:update event
waveObj, err := wstore.DBGetORef(ctx, oref)
if err != nil {
log.Printf("error getting object for update event: %v", err)
return
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: oref.String(),
Data: wstore.WaveObjUpdate{
UpdateType: wstore.UpdateType_Update,
OType: waveObj.GetOType(),
OID: waveobj.GetOID(waveObj),
Obj: waveObj,
},
})
}
func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) {
if strings.Contains(simpleId, ":") {
rtn, err := waveobj.ParseORef(simpleId)
if err != nil {
return nil, fmt.Errorf("error parsing simple id: %w", err)
}
return &rtn, nil
}
return wstore.DBResolveEasyOID(ctx, simpleId)
}
func (ws *WshServer) ResolveIdsCommand(ctx context.Context, data wshrpc.CommandResolveIdsData) (wshrpc.CommandResolveIdsRtnData, error) {
rtn := wshrpc.CommandResolveIdsRtnData{}
rtn.ResolvedIds = make(map[string]waveobj.ORef)
for _, simpleId := range data.Ids {
oref, err := resolveSimpleId(ctx, simpleId)
if err != nil || oref == nil {
continue
}
rtn.ResolvedIds[simpleId] = *oref
}
return rtn, nil
}
func sendWStoreUpdatesToEventBus(updates wstore.UpdatesRtnType) {
for _, update := range updates {
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: waveobj.MakeORef(update.OType, update.OID).String(),
Data: update,
})
}
}
func (ws *WshServer) CreateBlockCommand(ctx context.Context, data wshrpc.CommandCreateBlockData) (*waveobj.ORef, error) {
ctx = wstore.ContextWithUpdates(ctx)
tabId := data.TabId
if data.TabId != "" {
tabId = data.TabId
}
blockData, err := wstore.CreateBlock(ctx, tabId, data.BlockDef, data.RtOpts)
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
if blockData.Controller != "" {
// TODO
err = blockcontroller.StartBlockController(ctx, data.TabId, blockData.OID)
if err != nil {
return nil, fmt.Errorf("error starting block controller: %w", err)
}
}
updates := wstore.ContextGetUpdatesRtn(ctx)
sendWStoreUpdatesToEventBus(updates)
windowId, err := wstore.DBFindWindowForTabId(ctx, tabId)
if err != nil {
return nil, fmt.Errorf("error finding window for tab: %w", err)
}
if windowId == "" {
return nil, fmt.Errorf("no window found for tab")
}
eventbus.SendEventToWindow(windowId, eventbus.WSEventType{
EventType: eventbus.WSEvent_LayoutAction,
Data: &eventbus.WSLayoutActionData{
ActionType: "insert",
TabId: tabId,
BlockId: blockData.OID,
},
})
return &waveobj.ORef{OType: wstore.OType_Block, OID: blockData.OID}, nil
}
func (ws *WshServer) BlockSetViewCommand(ctx context.Context, data wshrpc.CommandBlockSetViewData) error {
log.Printf("SETVIEW: %s | %q\n", data.BlockId, data.View)
ctx = wstore.ContextWithUpdates(ctx)
block, err := wstore.DBGet[*wstore.Block](ctx, data.BlockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
block.View = data.View
err = wstore.DBUpdate(ctx, block)
if err != nil {
return fmt.Errorf("error updating block: %w", err)
}
updates := wstore.ContextGetUpdatesRtn(ctx)
sendWStoreUpdatesToEventBus(updates)
return nil
}
func (ws *WshServer) BlockRestartCommand(ctx context.Context, data wshrpc.CommandBlockRestartData) error {
bc := blockcontroller.GetBlockController(data.BlockId)
if bc == nil {
return fmt.Errorf("block controller not found for block %q", data.BlockId)
}
return bc.RestartController()
}
func (ws *WshServer) BlockInputCommand(ctx context.Context, data wshrpc.CommandBlockInputData) error {
bc := blockcontroller.GetBlockController(data.BlockId)
if bc == nil {
return fmt.Errorf("block controller not found for block %q", data.BlockId)
}
inputUnion := &blockcontroller.BlockInputUnion{
SigName: data.SigName,
TermSize: data.TermSize,
}
if len(data.InputData64) > 0 {
inputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(data.InputData64)))
nw, err := base64.StdEncoding.Decode(inputBuf, []byte(data.InputData64))
if err != nil {
return fmt.Errorf("error decoding input data: %w", err)
}
inputUnion.InputData = inputBuf[:nw]
}
return bc.SendInput(inputUnion)
}
func (ws *WshServer) AppendFileCommand(ctx context.Context, data wshrpc.CommandAppendFileData) error {
dataBuf, err := base64.StdEncoding.DecodeString(data.Data64)
if err != nil {
return fmt.Errorf("error decoding data64: %w", err)
}
err = filestore.WFS.AppendData(ctx, data.ZoneId, data.FileName, dataBuf)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(wstore.OType_Block, data.ZoneId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: data.ZoneId,
FileName: data.FileName,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(dataBuf),
},
})
return nil
}
func (ws *WshServer) AppendIJsonCommand(ctx context.Context, data wshrpc.CommandAppendIJsonData) error {
tryCreate := true
if data.FileName == blockcontroller.BlockFile_Html && tryCreate {
err := filestore.WFS.MakeFile(ctx, data.ZoneId, data.FileName, nil, filestore.FileOptsType{MaxSize: blockcontroller.DefaultHtmlMaxFileSize, IJson: true})
if err != nil && err != fs.ErrExist {
return fmt.Errorf("error creating blockfile[html]: %w", err)
}
}
err := filestore.WFS.AppendIJson(ctx, data.ZoneId, data.FileName, data.Data)
if err != nil {
return fmt.Errorf("error appending to blockfile(ijson): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(wstore.OType_Block, data.ZoneId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: data.ZoneId,
FileName: data.FileName,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString([]byte("{}")),
},
})
return nil
}
func decodeRtnVals(rtnVals []reflect.Value) (any, error) {
switch len(rtnVals) {
case 0:
return nil, nil
case 1:
errIf := rtnVals[0].Interface()
if errIf == nil {
return nil, nil
}
return nil, errIf.(error)
case 2:
errIf := rtnVals[1].Interface()
if errIf == nil {
return rtnVals[0].Interface(), nil
}
return rtnVals[0].Interface(), errIf.(error)
default:
return nil, fmt.Errorf("too many return values: %d", len(rtnVals))
}
}
func mainWshServerHandler(handler *wshutil.RpcResponseHandler) {
command := handler.GetCommand()
methodDecl := WshServerCommandToDeclMap[command]
if methodDecl == nil {
handler.SendResponseError(fmt.Errorf("command %q not found", command))
return
}
var callParams []reflect.Value
callParams = append(callParams, reflect.ValueOf(handler.Context()))
if methodDecl.CommandDataType != nil {
commandData := reflect.New(methodDecl.CommandDataType).Interface()
err := utilfn.ReUnmarshal(commandData, handler.GetCommandRawData())
if err != nil {
handler.SendResponseError(fmt.Errorf("error re-marshalling command data: %w", err))
return
}
wshrpc.HackRpcContextIntoData(commandData, handler.GetRpcContext())
callParams = append(callParams, reflect.ValueOf(commandData).Elem())
}
rtnVals := methodDecl.Method.Call(callParams)
rtnData, rtnErr := decodeRtnVals(rtnVals)
if rtnErr != nil {
handler.SendResponseError(rtnErr)
return
} else {
handler.SendResponse(rtnData, true)
}
}
func MakeUnixListener(sockName string) (net.Listener, error) {
os.Remove(sockName) // ignore error
rtn, err := net.Listen("unix", sockName)
if err != nil {
return nil, fmt.Errorf("error creating listener at %v: %v", sockName, err)
}
os.Chmod(sockName, 0700)
log.Printf("Server listening on %s\n", sockName)
return rtn, nil
}
func runWshRpcWithStream(conn net.Conn) {
defer conn.Close()
inputCh := make(chan []byte, DefaultInputChSize)
outputCh := make(chan []byte, DefaultOutputChSize)
go wshutil.AdaptMsgChToStream(outputCh, conn)
go wshutil.AdaptStreamToMsgCh(conn, inputCh)
wshutil.MakeWshRpc(inputCh, outputCh, wshutil.RpcContext{}, mainWshServerHandler)
}
func RunWshRpcOverListener(listener net.Listener) {
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("error accepting connection: %v\n", err)
continue
}
go runWshRpcWithStream(conn)
}
}()
}
func RunDomainSocketWshServer() error {
sockName := wavebase.GetDomainSocketName()
listener, err := MakeUnixListener(sockName)
if err != nil {
return fmt.Errorf("error starging unix listener for wsh-server: %w", err)
}
defer listener.Close()
RunWshRpcOverListener(listener)
return nil
}
func MakeWshServer(inputCh chan []byte, outputCh chan []byte, initialCtx wshutil.RpcContext) {
wshutil.MakeWshRpc(inputCh, outputCh, initialCtx, mainWshServerHandler)
}

View File

@ -1,102 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshutil
import (
"encoding/json"
"fmt"
)
type RpcMessageUnmarshalHelper struct {
Command string
ReqId string
ResId string
M map[string]any
Req *RpcRequest
Res *RpcResponse
}
func (helper *RpcMessageUnmarshalHelper) UnmarshalJSON(data []byte) error {
var rmap map[string]any
if err := json.Unmarshal(data, &rmap); err != nil {
return err
}
if command, ok := rmap["command"].(string); ok {
helper.Command = command
}
if reqid, ok := rmap["reqid"].(string); ok {
helper.ReqId = reqid
}
if resid, ok := rmap["resid"].(string); ok {
helper.ResId = resid
}
if helper.ReqId != "" && helper.ResId != "" {
return fmt.Errorf("both reqid and resid cannot be set")
}
if helper.Command == "" && helper.ResId == "" {
return fmt.Errorf("either command or resid must be set")
}
helper.M = rmap
if helper.Command != "" {
// ok, this is a request, so lets parse it
req, err := helper.parseRequest()
if err != nil {
return fmt.Errorf("error parsing request: %w", err)
}
helper.Req = req
} else {
// this is a response, parse it
res, err := helper.parseResponse()
if err != nil {
return fmt.Errorf("error parsing response: %w", err)
}
helper.Res = res
}
return nil
}
func (helper *RpcMessageUnmarshalHelper) parseRequest() (*RpcRequest, error) {
req := &RpcRequest{
ReqId: helper.ReqId,
}
if helper.M["timeoutms"] != nil {
timeoutMs, ok := helper.M["timeoutms"].(float64)
if !ok {
return nil, fmt.Errorf("timeoutms field is not a number")
}
req.TimeoutMs = int(timeoutMs)
}
cmd, err := ParseCmdMap(helper.M)
if err != nil {
return nil, fmt.Errorf("error parsing command: %w", err)
}
req.Command = cmd
return req, nil
}
func (helper *RpcMessageUnmarshalHelper) parseResponse() (*RpcResponse, error) {
rtn := &RpcResponse{
ResId: helper.ResId,
Data: helper.M,
}
if helper.M["error"] != nil {
errStr, ok := helper.M["error"].(string)
if !ok {
return nil, fmt.Errorf("error field is not a string")
}
rtn.Error = errStr
}
if helper.M["cont"] != nil {
cont, ok := helper.M["cont"].(bool)
if !ok {
return nil, fmt.Errorf("cont field is not a bool")
}
rtn.Cont = cont
}
delete(rtn.Data, "resid")
delete(rtn.Data, "error")
delete(rtn.Data, "cont")
return rtn, nil
}

View File

@ -5,10 +5,8 @@ package wshutil
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"sync"
)
@ -28,13 +26,13 @@ type PtyBuffer struct {
EscSeqBuf []byte
OSCPrefix string
InputReader io.Reader
MessageCh chan RpcMessage
MessageCh chan []byte
AtEOF bool
Err error
}
// closes messageCh when input is closed (or error)
func MakePtyBuffer(oscPrefix string, input io.Reader, messageCh chan RpcMessage) *PtyBuffer {
func MakePtyBuffer(oscPrefix string, input io.Reader, messageCh chan []byte) *PtyBuffer {
if len(oscPrefix) != WaveOSCPrefixLen {
panic(fmt.Sprintf("invalid OSC prefix length: %d", len(oscPrefix)))
}
@ -67,17 +65,7 @@ func (b *PtyBuffer) setEOF() {
}
func (b *PtyBuffer) processWaveEscSeq(escSeq []byte) {
var helper RpcMessageUnmarshalHelper
err := json.Unmarshal(escSeq, &helper)
if err != nil {
log.Printf("error unmarshalling Wave OSC sequence data: %v\n", err)
return
}
if helper.Req != nil {
b.MessageCh <- helper.Req
} else {
b.MessageCh <- helper.Res
}
b.MessageCh <- escSeq
}
func (b *PtyBuffer) run() {

View File

@ -1,206 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshutil
import (
"encoding/json"
"fmt"
"reflect"
"github.com/wavetermdev/thenextwave/pkg/ijson"
"github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const CommandKey = "command"
const (
BlockCommand_Message = "message"
BlockCommand_SetView = "setview"
BlockCommand_SetMeta = "setmeta"
BlockCommand_GetMeta = "getmeta"
BlockCommand_Input = "controller:input"
BlockCommand_Restart = "controller:restart"
BlockCommand_AppendBlockFile = "blockfile:append"
BlockCommand_AppendIJson = "blockfile:appendijson"
Command_ResolveIds = "resolveids"
Command_CreateBlock = "createblock"
)
var CommandToTypeMap = map[string]reflect.Type{
BlockCommand_Input: reflect.TypeOf(BlockInputCommand{}),
BlockCommand_Restart: reflect.TypeOf(BlockRestartCommand{}),
BlockCommand_SetView: reflect.TypeOf(BlockSetViewCommand{}),
BlockCommand_SetMeta: reflect.TypeOf(BlockSetMetaCommand{}),
BlockCommand_GetMeta: reflect.TypeOf(BlockGetMetaCommand{}),
BlockCommand_Message: reflect.TypeOf(BlockMessageCommand{}),
BlockCommand_AppendBlockFile: reflect.TypeOf(BlockAppendFileCommand{}),
BlockCommand_AppendIJson: reflect.TypeOf(BlockAppendIJsonCommand{}),
Command_ResolveIds: reflect.TypeOf(ResolveIdsCommand{}),
Command_CreateBlock: reflect.TypeOf(CreateBlockCommand{}),
}
func CommandTypeUnionMeta() tsgenmeta.TypeUnionMeta {
var rtypes []reflect.Type
orderedKeys := utilfn.GetOrderedMapKeys(CommandToTypeMap)
for _, typeKey := range orderedKeys {
rtype := CommandToTypeMap[typeKey]
rtypes = append(rtypes, rtype)
}
return tsgenmeta.TypeUnionMeta{
BaseType: reflect.TypeOf((*BlockCommand)(nil)).Elem(),
TypeFieldName: "command",
Types: rtypes,
}
}
type CmdContextType struct {
BlockId string
TabId string
}
type baseCommand struct {
Command string `json:"command"`
}
type BlockCommand interface {
GetCommand() string
}
type BlockControllerCommand interface {
GetBlockId() string
}
type BlockCommandWrapper struct {
BlockCommand
}
func ParseCmdMap(cmdMap map[string]any) (BlockCommand, error) {
cmdType, ok := cmdMap[CommandKey].(string)
if !ok {
return nil, fmt.Errorf("no %s field in command map", CommandKey)
}
mapJson, err := json.Marshal(cmdMap)
if err != nil {
return nil, fmt.Errorf("error marshalling command map: %w", err)
}
rtype := CommandToTypeMap[cmdType]
if rtype == nil {
return nil, fmt.Errorf("unknown command type %q", cmdType)
}
cmd := reflect.New(rtype).Interface()
err = json.Unmarshal(mapJson, cmd)
if err != nil {
return nil, fmt.Errorf("error unmarshalling command: %w", err)
}
return cmd.(BlockCommand), nil
}
type BlockRestartCommand struct {
Command string `json:"command" tstype:"\"controller:restart\""`
BlockId string `json:"blockid"`
}
func (rc *BlockRestartCommand) GetCommand() string {
return BlockCommand_Restart
}
func (rc *BlockRestartCommand) GetBlockId() string {
return rc.BlockId
}
type BlockInputCommand struct {
BlockId string `json:"blockid"`
Command string `json:"command" tstype:"\"controller:input\""`
InputData64 string `json:"inputdata64,omitempty"`
SigName string `json:"signame,omitempty"`
TermSize *shellexec.TermSize `json:"termsize,omitempty"`
}
func (ic *BlockInputCommand) GetCommand() string {
return BlockCommand_Input
}
func (ic *BlockInputCommand) GetBlockId() string {
return ic.BlockId
}
type ResolveIdsCommand struct {
Command string `json:"command" tstype:"\"resolveids\""`
Ids []string `json:"ids"`
}
func (ric *ResolveIdsCommand) GetCommand() string {
return Command_ResolveIds
}
type BlockSetViewCommand struct {
Command string `json:"command" tstype:"\"setview\""`
View string `json:"view"`
}
func (svc *BlockSetViewCommand) GetCommand() string {
return BlockCommand_SetView
}
type BlockGetMetaCommand struct {
Command string `json:"command" tstype:"\"getmeta\""`
ORef string `json:"oref"` // oref string
}
func (gmc *BlockGetMetaCommand) GetCommand() string {
return BlockCommand_GetMeta
}
type BlockSetMetaCommand struct {
Command string `json:"command" tstype:"\"setmeta\""`
ORef string `json:"oref,omitempty"` // allows oref, 8-char oid, or full uuid (empty is current block)
Meta map[string]any `json:"meta"`
}
func (smc *BlockSetMetaCommand) GetCommand() string {
return BlockCommand_SetMeta
}
type BlockMessageCommand struct {
Command string `json:"command" tstype:"\"message\""`
Message string `json:"message"`
}
func (bmc *BlockMessageCommand) GetCommand() string {
return BlockCommand_Message
}
type BlockAppendFileCommand struct {
Command string `json:"command" tstype:"\"blockfile:append\""`
FileName string `json:"filename"`
Data []byte `json:"data"`
}
func (bwc *BlockAppendFileCommand) GetCommand() string {
return BlockCommand_AppendBlockFile
}
type BlockAppendIJsonCommand struct {
Command string `json:"command" tstype:"\"blockfile:appendijson\""`
FileName string `json:"filename"`
Data ijson.Command `json:"data"`
}
func (bwc *BlockAppendIJsonCommand) GetCommand() string {
return BlockCommand_AppendIJson
}
type CreateBlockCommand struct {
Command string `json:"command" tstype:"\"createblock\""`
TabId string `json:"tabid"`
BlockDef *wstore.BlockDef `json:"blockdef"`
RtOpts *wstore.RuntimeOpts `json:"rtopts,omitempty"`
}
func (cbc *CreateBlockCommand) GetCommand() string {
return Command_CreateBlock
}

View File

@ -9,217 +9,204 @@ import (
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
)
const DefaultTimeoutMs = 5000
const RespChSize = 32
const DefaultOutputChSize = 32
const DefaultMessageChSize = 32
type ResponseDataType = map[string]any
type ResponseFnType = func(ResponseDataType) error
type CommandHandlerFnType = func(context.Context, BlockCommand, ResponseFnType) (ResponseDataType, error)
const (
RpcType_Call = "call" // single response (regular rpc)
RpcType_ResponseStream = "responsestream" // stream of responses (streaming rpc)
RpcType_StreamingRequest = "streamingrequest" // streaming request
RpcType_Complex = "complex" // streaming request/response
)
type RpcMessage interface {
IsRpcRequest() bool
type ResponseFnType = func(any) error
type CommandHandlerFnType = func(*RpcResponseHandler)
type wshRpcContextKey struct{}
func withWshRpcContext(ctx context.Context, wshRpc *WshRpc) context.Context {
return context.WithValue(ctx, wshRpcContextKey{}, wshRpc)
}
func GetWshRpcFromContext(ctx context.Context) *WshRpc {
rtn := ctx.Value(wshRpcContextKey{})
if rtn == nil {
return nil
}
return rtn.(*WshRpc)
}
type RpcMessage struct {
Command string `json:"command,omitempty"`
ReqId string `json:"reqid,omitempty"`
ResId string `json:"resid,omitempty"`
Timeout int `json:"timeout,omitempty"`
Cont bool `json:"cont,omitempty"`
Error string `json:"error,omitempty"`
DataType string `json:"datatype,omitempty"`
Data any `json:"data,omitempty"`
}
func (r *RpcMessage) IsRpcRequest() bool {
return r.Command != "" || r.ReqId != ""
}
func (r *RpcMessage) Validate() error {
if r.Command != "" {
if r.ResId != "" {
return fmt.Errorf("command packets may not have resid set")
}
if r.Error != "" {
return fmt.Errorf("command packets may not have error set")
}
if r.DataType != "" {
return fmt.Errorf("command packets may not have datatype set")
}
return nil
}
if r.ReqId != "" {
if r.ResId == "" {
return fmt.Errorf("request packets must have resid set")
}
if r.Timeout != 0 {
return fmt.Errorf("non-command request packets may not have timeout set")
}
return nil
}
if r.ResId != "" {
if r.Command != "" {
return fmt.Errorf("response packets may not have command set")
}
if r.ReqId == "" {
return fmt.Errorf("response packets must have reqid set")
}
if r.Timeout != 0 {
return fmt.Errorf("response packets may not have timeout set")
}
return nil
}
return fmt.Errorf("invalid packet: must have command, reqid, or resid set")
}
type RpcContext struct {
BlockId string `json:"blockid,omitempty"`
TabId string `json:"tabid,omitempty"`
WindowId string `json:"windowid,omitempty"`
}
type WshRpc struct {
Lock *sync.Mutex
InputCh chan RpcMessage
OutputCh chan []byte
OSCEsc string // either 23198 or 23199
RpcMap map[string]*rpcData
HandlerFn CommandHandlerFnType
}
type RpcRequest struct {
ReqId string
TimeoutMs int
Command BlockCommand
}
func (r *RpcRequest) IsRpcRequest() bool {
return true
}
func (r *RpcRequest) MarshalJSON() ([]byte, error) {
if r == nil {
return []byte("null"), nil
}
rtn := make(map[string]any)
utilfn.DoMapStucture(&rtn, r.Command)
rtn["command"] = r.Command.GetCommand()
if r.ReqId != "" {
rtn["reqid"] = r.ReqId
} else {
delete(rtn, "reqid")
}
if r.TimeoutMs != 0 {
rtn["timeoutms"] = float64(r.TimeoutMs)
} else {
delete(rtn, "timeoutms")
}
return json.Marshal(rtn)
}
type RpcResponse struct {
ResId string `json:"resid"`
Error string `json:"error,omitempty"`
Cont bool `json:"cont,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
func (r *RpcResponse) IsRpcRequest() bool {
return false
}
func (r *RpcResponse) MarshalJSON() ([]byte, error) {
rtn := make(map[string]any)
// rest goes first (since other fields will overwrite)
for k, v := range r.Data {
rtn[k] = v
}
rtn["resid"] = r.ResId
if r.Error != "" {
rtn["error"] = r.Error
} else {
delete(rtn, "error")
}
if r.Cont {
rtn["cont"] = true
} else {
delete(rtn, "cont")
}
return json.Marshal(rtn)
Lock *sync.Mutex
InputCh chan []byte
OutputCh chan []byte
RpcContext *atomic.Pointer[RpcContext]
RpcMap map[string]*rpcData
HandlerFn CommandHandlerFnType
}
type rpcData struct {
ResCh chan *RpcResponse
Ctx context.Context
CancelFn context.CancelFunc
ResCh chan *RpcMessage
Ctx context.Context
}
// oscEsc is the OSC escape sequence to use for *sending* messages
// closes outputCh when inputCh is closed/done
func MakeWshRpc(oscEsc string, inputCh chan RpcMessage, commandHandlerFn CommandHandlerFnType) (*WshRpc, chan []byte) {
if len(oscEsc) != 5 {
panic("oscEsc must be 5 characters")
}
outputCh := make(chan []byte, DefaultOutputChSize)
func MakeWshRpc(inputCh chan []byte, outputCh chan []byte, rpcCtx RpcContext, commandHandlerFn CommandHandlerFnType) *WshRpc {
rtn := &WshRpc{
Lock: &sync.Mutex{},
InputCh: inputCh,
OutputCh: outputCh,
OSCEsc: oscEsc,
RpcMap: make(map[string]*rpcData),
HandlerFn: commandHandlerFn,
Lock: &sync.Mutex{},
InputCh: inputCh,
OutputCh: outputCh,
RpcMap: make(map[string]*rpcData),
RpcContext: &atomic.Pointer[RpcContext]{},
HandlerFn: commandHandlerFn,
}
rtn.RpcContext.Store(&rpcCtx)
go rtn.runServer()
return rtn, outputCh
return rtn
}
func (w *WshRpc) handleRequest(req *RpcRequest) {
func (w *WshRpc) GetRpcContext() RpcContext {
rtnPtr := w.RpcContext.Load()
return *rtnPtr
}
func (w *WshRpc) SetRpcContext(ctx RpcContext) {
w.RpcContext.Store(&ctx)
}
func (w *WshRpc) handleRequest(req *RpcMessage) {
var respHandler *RpcResponseHandler
defer func() {
if r := recover(); r != nil {
errResp := &RpcResponse{
ResId: req.ReqId,
Error: fmt.Sprintf("panic: %v", r),
log.Printf("panic in handleRequest: %v\n", r)
debug.PrintStack()
if respHandler != nil {
respHandler.SendResponseError(fmt.Errorf("panic: %v", r))
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, errResp)
if err != nil {
return
}
w.OutputCh <- barr
}
}()
respFn := func(resp ResponseDataType) error {
if req.ReqId == "" {
// request is not expecting a response
return nil
}
respMsg := &RpcResponse{
ResId: req.ReqId,
Cont: true,
Data: resp,
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, respMsg)
if err != nil {
return fmt.Errorf("error marshalling response to json: %w", err)
}
w.OutputCh <- barr
return nil
}
timeoutMs := req.TimeoutMs
timeoutMs := req.Timeout
if timeoutMs <= 0 {
timeoutMs = DefaultTimeoutMs
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
ctx = withWshRpcContext(ctx, w)
defer cancelFn()
respData, err := w.HandlerFn(ctx, req.Command, respFn)
log.Printf("handler for %q returned resp: %v\n", req.Command.GetCommand(), respData)
if req.ReqId == "" {
// no response expected
if err != nil {
log.Printf("error handling request (no response): %v\n", err)
}
return
respHandler = &RpcResponseHandler{
w: w,
ctx: ctx,
reqId: req.ReqId,
command: req.Command,
commandData: req.Data,
done: &atomic.Bool{},
rpcCtx: w.GetRpcContext(),
}
if err != nil {
errResp := &RpcResponse{
ResId: req.ReqId,
Error: err.Error(),
defer func() {
if r := recover(); r != nil {
log.Printf("panic in handleRequest: %v\n", r)
debug.PrintStack()
respHandler.SendResponseError(fmt.Errorf("panic: %v", r))
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, errResp)
if err != nil {
return
}
w.OutputCh <- barr
return
respHandler.finalize()
}()
if w.HandlerFn != nil {
w.HandlerFn(respHandler)
}
respMsg := &RpcResponse{
ResId: req.ReqId,
Data: respData,
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, respMsg)
if err != nil {
respMsg := &RpcResponse{
ResId: req.ReqId,
Error: err.Error(),
}
barr, _ = EncodeWaveOSCMessageEx(w.OSCEsc, respMsg)
}
w.OutputCh <- barr
}
func (w *WshRpc) runServer() {
defer close(w.OutputCh)
for msg := range w.InputCh {
for msgBytes := range w.InputCh {
var msg RpcMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
log.Printf("wshrpc received bad message: %v\n", err)
continue
}
if msg.IsRpcRequest() {
if w.HandlerFn == nil {
continue
}
req := msg.(*RpcRequest)
w.handleRequest(req)
w.handleRequest(&msg)
} else {
resp := msg.(*RpcResponse)
respCh := w.getResponseCh(resp.ResId)
respCh := w.getResponseCh(msg.ResId)
if respCh == nil {
continue
}
respCh <- resp
if !resp.Cont {
w.unregisterRpc(resp.ResId, nil)
respCh <- &msg
if !msg.Cont {
w.unregisterRpc(msg.ResId, nil)
}
}
}
}
func (w *WshRpc) getResponseCh(resId string) chan *RpcResponse {
func (w *WshRpc) getResponseCh(resId string) chan *RpcMessage {
if resId == "" {
return nil
}
@ -238,28 +225,13 @@ func (w *WshRpc) SetHandler(handler CommandHandlerFnType) {
w.HandlerFn = handler
}
// no response
func (w *WshRpc) SendCommand(cmd BlockCommand) error {
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, &RpcRequest{Command: cmd})
if err != nil {
return fmt.Errorf("error marshalling request to json: %w", err)
}
w.OutputCh <- barr
return nil
}
func (w *WshRpc) registerRpc(reqId string, timeoutMs int) chan *RpcResponse {
func (w *WshRpc) registerRpc(ctx context.Context, reqId string) chan *RpcMessage {
w.Lock.Lock()
defer w.Lock.Unlock()
if timeoutMs <= 0 {
timeoutMs = DefaultTimeoutMs
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
rpcCh := make(chan *RpcResponse, RespChSize)
rpcCh := make(chan *RpcMessage, RespChSize)
w.RpcMap[reqId] = &rpcData{
ResCh: rpcCh,
Ctx: ctx,
CancelFn: cancelFn,
ResCh: rpcCh,
Ctx: ctx,
}
go func() {
<-ctx.Done()
@ -272,59 +244,178 @@ func (w *WshRpc) unregisterRpc(reqId string, err error) {
w.Lock.Lock()
defer w.Lock.Unlock()
rd := w.RpcMap[reqId]
if rd != nil {
if err != nil {
errResp := &RpcResponse{
ResId: reqId,
Error: err.Error(),
}
rd.ResCh <- errResp
if rd == nil {
return
}
if err != nil {
errResp := &RpcMessage{
ResId: reqId,
Error: err.Error(),
}
close(rd.ResCh)
rd.CancelFn()
rd.ResCh <- errResp
}
delete(w.RpcMap, reqId)
close(rd.ResCh)
}
// no response
func (w *WshRpc) SendCommand(command string, data any) error {
handler, err := w.SendComplexRequest(command, data, false, 0)
if err != nil {
return err
}
handler.finalize()
return nil
}
// single response
func (w *WshRpc) SendRpcRequest(cmd BlockCommand, timeoutMs int) (map[string]any, error) {
if timeoutMs < 0 {
return nil, fmt.Errorf("timeout must be >= 0")
}
req := &RpcRequest{
Command: cmd,
ReqId: uuid.New().String(),
TimeoutMs: timeoutMs,
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, req)
func (w *WshRpc) SendRpcRequest(command string, data any, timeoutMs int) (any, error) {
handler, err := w.SendComplexRequest(command, data, true, timeoutMs)
if err != nil {
return nil, fmt.Errorf("error marshalling request to ANSI esc: %w", err)
return nil, err
}
rpcCh := w.registerRpc(req.ReqId, timeoutMs)
defer w.unregisterRpc(req.ReqId, nil)
w.OutputCh <- barr
resp := <-rpcCh
defer handler.finalize()
return handler.NextResponse()
}
type RpcRequestHandler struct {
w *WshRpc
ctx context.Context
cancelFn func()
reqId string
respCh chan *RpcMessage
}
func (handler *RpcRequestHandler) Context() context.Context {
return handler.ctx
}
func (handler *RpcRequestHandler) ResponseDone() bool {
select {
case _, more := <-handler.respCh:
return !more
default:
return false
}
}
func (handler *RpcRequestHandler) NextResponse() (any, error) {
resp := <-handler.respCh
if resp.Error != "" {
return nil, errors.New(resp.Error)
}
return resp.Data, nil
}
// streaming response
func (w *WshRpc) SendRpcRequestEx(cmd BlockCommand, timeoutMs int) (chan *RpcResponse, error) {
if timeoutMs < 0 {
return nil, fmt.Errorf("timeout must be >= 0")
func (handler *RpcRequestHandler) finalize() {
if handler.cancelFn != nil {
handler.cancelFn()
}
req := &RpcRequest{
Command: cmd,
ReqId: uuid.New().String(),
TimeoutMs: timeoutMs,
if handler.reqId != "" {
handler.w.unregisterRpc(handler.reqId, nil)
}
barr, err := EncodeWaveOSCMessageEx(w.OSCEsc, req)
if err != nil {
return nil, fmt.Errorf("error marshalling request to json: %w", err)
}
rpcCh := w.registerRpc(req.ReqId, timeoutMs)
w.OutputCh <- barr
return rpcCh, nil
}
type RpcResponseHandler struct {
w *WshRpc
ctx context.Context
reqId string
command string
commandData any
rpcCtx RpcContext
done *atomic.Bool
}
func (handler *RpcResponseHandler) Context() context.Context {
return handler.ctx
}
func (handler *RpcResponseHandler) GetCommand() string {
return handler.command
}
func (handler *RpcResponseHandler) GetCommandRawData() any {
return handler.commandData
}
func (handler *RpcResponseHandler) GetRpcContext() RpcContext {
return handler.rpcCtx
}
func (handler *RpcResponseHandler) SendResponse(data any, done bool) error {
if handler.reqId == "" {
return nil // no response expected
}
if handler.done.Load() {
return fmt.Errorf("request already done, cannot send additional response")
}
if done {
handler.done.Store(true)
}
msg := &RpcMessage{
ResId: handler.reqId,
Data: data,
Cont: !done,
}
barr, err := json.Marshal(msg)
if err != nil {
return err
}
handler.w.OutputCh <- barr
return nil
}
func (handler *RpcResponseHandler) SendResponseError(err error) {
if handler.reqId == "" || handler.done.Load() {
return
}
handler.done.Store(true)
msg := &RpcMessage{
ResId: handler.reqId,
Error: err.Error(),
}
barr, _ := json.Marshal(msg) // will never fail
handler.w.OutputCh <- barr
}
func (handler *RpcResponseHandler) finalize() {
if handler.reqId == "" || handler.done.Load() {
return
}
handler.done.Store(true)
handler.SendResponse(nil, true)
}
func (handler *RpcResponseHandler) IsDone() bool {
return handler.done.Load()
}
func (w *WshRpc) SendComplexRequest(command string, data any, expectsResponse bool, timeoutMs int) (*RpcRequestHandler, error) {
if command == "" {
return nil, fmt.Errorf("command cannot be empty")
}
handler := &RpcRequestHandler{
w: w,
}
if timeoutMs < 0 {
handler.ctx = context.Background()
} else {
handler.ctx, handler.cancelFn = context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
}
if expectsResponse {
handler.reqId = uuid.New().String()
}
req := &RpcMessage{
Command: command,
ReqId: handler.reqId,
Data: data,
Timeout: timeoutMs,
}
barr, err := json.Marshal(req)
if err != nil {
return nil, err
}
handler.respCh = w.registerRpc(handler.ctx, handler.reqId)
w.OutputCh <- barr
return handler, nil
}

85
pkg/wshutil/wshrpcio.go Normal file
View File

@ -0,0 +1,85 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshutil
import (
"bytes"
"fmt"
"io"
)
// special I/O wrappers for wshrpc
// * terminal (wrap with OSC codes)
// * stream (json lines)
// * websocket (json packets)
type lineBuf struct {
buf []byte
inLongLine bool
}
const maxLineLength = 128 * 1024
func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]byte)) {
for len(readBuf) > 0 {
nlIdx := bytes.IndexByte(readBuf, '\n')
if nlIdx == -1 {
if lineBuf.inLongLine || len(lineBuf.buf)+len(readBuf) > maxLineLength {
lineBuf.buf = nil
lineBuf.inLongLine = true
return
}
lineBuf.buf = append(lineBuf.buf, readBuf...)
return
}
if !lineBuf.inLongLine && len(lineBuf.buf)+nlIdx <= maxLineLength {
line := append(lineBuf.buf, readBuf[:nlIdx]...)
lineFn(line)
}
lineBuf.buf = nil
lineBuf.inLongLine = false
readBuf = readBuf[nlIdx+1:]
}
}
func streamToLines(input io.Reader, lineFn func([]byte)) {
var lineBuf lineBuf
readBuf := make([]byte, 16*1024)
for {
n, err := input.Read(readBuf)
streamToLines_processBuf(&lineBuf, readBuf[:n], lineFn)
if err != nil {
break
}
}
}
func AdaptStreamToMsgCh(input io.Reader, output chan []byte) {
streamToLines(input, func(line []byte) {
output <- line
})
}
func AdaptMsgChToStream(outputCh chan []byte, output io.Writer) error {
for msg := range outputCh {
if _, err := output.Write(msg); err != nil {
return fmt.Errorf("error writing to output: %w", err)
}
}
return nil
}
func AdaptMsgChToPty(outputCh chan []byte, oscEsc string, output io.Writer) error {
if len(oscEsc) != 5 {
panic("oscEsc must be 5 characters")
}
for msg := range outputCh {
barr := EncodeWaveOSCBytes(oscEsc, msg)
_, err := output.Write(barr)
if err != nil {
return fmt.Errorf("error writing to output: %w", err)
}
}
return nil
}

View File

@ -5,10 +5,8 @@ package wshutil
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
)
// these should both be 5 characters
@ -49,22 +47,9 @@ func makeOscPrefix(oscNum string) []byte {
return output
}
func EncodeWaveReq(cmd BlockCommand) ([]byte, error) {
req := &RpcRequest{Command: cmd}
return EncodeWaveOSCMessage(req)
}
func EncodeWaveOSCMessage(msg RpcMessage) ([]byte, error) {
return EncodeWaveOSCMessageEx(WaveOSC, msg)
}
func EncodeWaveOSCMessageEx(oscNum string, msg RpcMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("nil message")
}
barr, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("error marshalling message to json: %w", err)
func EncodeWaveOSCBytes(oscNum string, barr []byte) []byte {
if len(oscNum) != 5 {
panic("oscNum must be 5 characters")
}
hasControlChars := false
for _, b := range barr {
@ -80,7 +65,7 @@ func EncodeWaveOSCMessageEx(oscNum string, msg RpcMessage) ([]byte, error) {
copyOscPrefix(output, oscNum)
copy(output[oscPrefixLen(oscNum):], barr)
output[len(output)-1] = BEL
return output, nil
return output
}
var buf bytes.Buffer
@ -96,38 +81,16 @@ func EncodeWaveOSCMessageEx(oscNum string, msg RpcMessage) ([]byte, error) {
}
}
buf.WriteByte(BEL)
return buf.Bytes(), nil
return buf.Bytes()
}
func decodeWaveOSCMessage(data []byte) (BlockCommand, error) {
var baseCmd baseCommand
err := json.Unmarshal(data, &baseCmd)
func EncodeWaveOSCMessageEx(oscNum string, msg *RpcMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("nil message")
}
barr, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("error unmarshalling json: %w", err)
return nil, fmt.Errorf("error marshalling message to json: %w", err)
}
rtnCmd := reflect.New(CommandToTypeMap[baseCmd.Command]).Interface()
err = json.Unmarshal(data, rtnCmd)
if err != nil {
return nil, fmt.Errorf("error unmarshalling json: %w", err)
}
return rtnCmd.(BlockCommand), nil
}
// data does not contain the escape sequence, just the innards
// this function implements the switch between JSON and base64-JSON
func DecodeWaveOSCMessage(data []byte) (BlockCommand, error) {
if len(data) == 0 {
return nil, fmt.Errorf("empty data")
}
if data[0] != '{' {
// decode base64
rtnLen := base64.StdEncoding.DecodedLen(len(data))
rtn := make([]byte, rtnLen)
nw, err := base64.StdEncoding.Decode(rtn, data)
if err != nil {
return nil, fmt.Errorf("error decoding base64: %w", err)
}
return decodeWaveOSCMessage(rtn[:nw])
}
return decodeWaveOSCMessage(data)
return EncodeWaveOSCBytes(oscNum, barr), nil
}

View File

@ -54,6 +54,10 @@ func genericCastWithErr[T any](v any, err error) (T, error) {
var zeroVal T
return zeroVal, err
}
if v == nil {
var zeroVal T
return zeroVal, nil
}
return v.(T), err
}