From 3939648bbb909ed00e9008a34b488adfb7d92613 Mon Sep 17 00:00:00 2001 From: Mike Sawka Date: Mon, 16 Sep 2024 11:59:39 -0700 Subject: [PATCH] fe wsh router + wsh client impl (#381) --- cmd/generatets/main-generatets.go | 25 +-- frontend/app/block/blockframe.tsx | 15 +- frontend/app/element/markdown.tsx | 7 +- frontend/app/modals/tos.tsx | 5 +- frontend/app/store/services.ts | 2 +- frontend/app/store/wps.ts | 8 +- frontend/app/store/wshclient.ts | 150 ++++++++++++++++ frontend/app/store/wshclientapi.ts | 212 +++++++++++++++++++++++ frontend/app/store/wshrouter.ts | 155 +++++++++++++++++ frontend/app/store/wshrpc.ts | 230 ------------------------- frontend/app/store/wshrpcutil.ts | 147 ++++++++++++++++ frontend/app/store/wshserver.ts | 207 ---------------------- frontend/app/view/cpuplot/cpuplot.tsx | 10 +- frontend/app/view/preview/preview.tsx | 5 +- frontend/app/view/term/term.tsx | 24 ++- frontend/app/view/term/termsticker.tsx | 5 +- frontend/app/view/term/termwrap.ts | 12 +- frontend/app/view/waveai/waveai.tsx | 5 +- frontend/types/custom.d.ts | 11 ++ frontend/types/gotypes.d.ts | 2 +- frontend/util/fetchutil.ts | 10 +- frontend/wave.ts | 9 +- pkg/tsgen/tsgen.go | 22 +-- pkg/wshrpc/wshclient/wshclient.go | 18 +- pkg/wshrpc/wshrpctypes.go | 8 +- pkg/wshutil/wshrouter.go | 20 ++- 26 files changed, 806 insertions(+), 518 deletions(-) create mode 100644 frontend/app/store/wshclient.ts create mode 100644 frontend/app/store/wshclientapi.ts create mode 100644 frontend/app/store/wshrouter.ts delete mode 100644 frontend/app/store/wshrpc.ts create mode 100644 frontend/app/store/wshrpcutil.ts delete mode 100644 frontend/app/store/wshserver.ts diff --git a/cmd/generatets/main-generatets.go b/cmd/generatets/main-generatets.go index d1db0cc74..446312adc 100644 --- a/cmd/generatets/main-generatets.go +++ b/cmd/generatets/main-generatets.go @@ -30,9 +30,12 @@ func generateTypesFile(tsTypesMap map[reflect.Type]string) error { os.Exit(1) } err = tsgen.GenerateWshServerTypes(tsTypesMap) + if err != nil { + return fmt.Errorf("error generating wsh server types: %w", err) + } fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n") fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n") - fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n") + fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n") fmt.Fprintf(fd, "declare global {\n\n") var keys []reflect.Type for key := range tsTypesMap { @@ -66,7 +69,7 @@ func generateServicesFile(tsTypesMap map[reflect.Type]string) error { fmt.Fprintf(os.Stderr, "generating services file to %s\n", fd.Name()) fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n") fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n") - fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n") + fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n") fmt.Fprintf(fd, "import * as WOS from \"./wos\";\n\n") orderedKeys := utilfn.GetOrderedMapKeys(service.ServiceMap) for _, serviceName := range orderedKeys { @@ -78,29 +81,29 @@ func generateServicesFile(tsTypesMap map[reflect.Type]string) error { return nil } -func generateWshServerFile(tsTypeMap map[reflect.Type]string) error { - fd, err := os.Create("frontend/app/store/wshserver.ts") +func generateWshClientApiFile(tsTypeMap map[reflect.Type]string) error { + fd, err := os.Create("frontend/app/store/wshclientapi.ts") if err != nil { return err } defer fd.Close() declMap := wshrpc.GenerateWshCommandDeclMap() - fmt.Fprintf(os.Stderr, "generating wshserver file to %s\n", fd.Name()) + fmt.Fprintf(os.Stderr, "generating wshclientapi file to %s\n", fd.Name()) fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n") fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n") - fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n") - fmt.Fprintf(fd, "import { wshServerRpcHelper_call, wshServerRpcHelper_responsestream } from \"./wshrpc\";\n\n") + fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n") + fmt.Fprintf(fd, "import { WshClient } from \"./wshclient\";\n\n") orderedKeys := utilfn.GetOrderedMapKeys(declMap) fmt.Fprintf(fd, "// WshServerCommandToDeclMap\n") - fmt.Fprintf(fd, "class WshServerType {\n") + fmt.Fprintf(fd, "class RpcApiType {\n") for _, methodDecl := range orderedKeys { methodDecl := declMap[methodDecl] - methodStr := tsgen.GenerateWshServerMethod(methodDecl, tsTypeMap) + methodStr := tsgen.GenerateWshClientApiMethod(methodDecl, tsTypeMap) fmt.Fprint(fd, methodStr) fmt.Fprintf(fd, "\n") } fmt.Fprintf(fd, "}\n\n") - fmt.Fprintf(fd, "export const WshServer = new WshServerType();\n") + fmt.Fprintf(fd, "export const RpcApi = new RpcApiType();\n") return nil } @@ -121,7 +124,7 @@ func main() { fmt.Fprintf(os.Stderr, "Error generating services file: %v\n", err) os.Exit(1) } - err = generateWshServerFile(tsTypesMap) + err = generateWshClientApiFile(tsTypesMap) if err != nil { fmt.Fprintf(os.Stderr, "Error generating wshserver file: %v\n", err) os.Exit(1) diff --git a/frontend/app/block/blockframe.tsx b/frontend/app/block/blockframe.tsx index b2794b640..96cef3a74 100644 --- a/frontend/app/block/blockframe.tsx +++ b/frontend/app/block/blockframe.tsx @@ -27,7 +27,8 @@ import { WOS, } from "@/app/store/global"; import * as services from "@/app/store/services"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { ErrorBoundary } from "@/element/errorboundary"; import { IconButton } from "@/element/iconbutton"; import { MagnifyIcon } from "@/element/magnify"; @@ -307,7 +308,7 @@ const ConnStatusOverlay = React.memo( }, [width, connStatus, setShowError]); const handleTryReconnect = React.useCallback(() => { - const prtn = WshServer.ConnConnectCommand(connName, { timeout: 60000 }); + const prtn = RpcApi.ConnConnectCommand(WindowRpcClient, connName, { timeout: 60000 }); prtn.catch((e) => console.log("error reconnecting", connName, e)); }, [connName]); @@ -426,7 +427,7 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => { const connName = blockData?.meta?.connection; if (!util.isBlank(connName)) { console.log("ensure conn", nodeModel.blockId, connName); - WshServer.ConnEnsureCommand(connName, { timeout: 60000 }).catch((e) => { + RpcApi.ConnEnsureCommand(WindowRpcClient, connName, { timeout: 60000 }).catch((e) => { console.log("error ensuring connection", nodeModel.blockId, connName, e); }); } @@ -525,7 +526,7 @@ const ChangeConnectionBlockModal = React.memo( setConnList([]); return; } - const prtn = WshServer.ConnListCommand({ timeout: 2000 }); + const prtn = RpcApi.ConnListCommand(WindowRpcClient, { timeout: 2000 }); prtn.then((newConnList) => { setConnList(newConnList ?? []); }).catch((e) => console.log("unable to load conn list from backend. using blank list: ", e)); @@ -546,12 +547,12 @@ const ChangeConnectionBlockModal = React.memo( } else { newCwd = "~"; } - await WshServer.SetMetaCommand({ + await RpcApi.SetMetaCommand(WindowRpcClient, { oref: WOS.makeORef("block", blockId), meta: { connection: connName, file: newCwd }, }); try { - await WshServer.ConnEnsureCommand(connName, { timeout: 60000 }); + await RpcApi.ConnEnsureCommand(WindowRpcClient, connName, { timeout: 60000 }); } catch (e) { console.log("error connecting", blockId, connName, e); } @@ -597,7 +598,7 @@ const ChangeConnectionBlockModal = React.memo( label: `Reconnect to ${connStatus.connection}`, value: "", onSelect: async (_: string) => { - const prtn = WshServer.ConnConnectCommand(connStatus.connection, { timeout: 60000 }); + const prtn = RpcApi.ConnConnectCommand(WindowRpcClient, connStatus.connection, { timeout: 60000 }); prtn.catch((e) => console.log("error reconnecting", connStatus.connection, e)); }, }; diff --git a/frontend/app/element/markdown.tsx b/frontend/app/element/markdown.tsx index 59925fdeb..1f1d64689 100644 --- a/frontend/app/element/markdown.tsx +++ b/frontend/app/element/markdown.tsx @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { CopyButton } from "@/app/element/copybutton"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { getWebServerEndpoint } from "@/util/endpoints"; import { isBlank, makeConnRoute, useAtomValueSafe } from "@/util/util"; import { clsx } from "clsx"; @@ -141,7 +142,9 @@ const MarkdownImg = ({ } const resolveFn = async () => { const route = makeConnRoute(resolveOpts.connName); - const fileInfo = await WshServer.RemoteFileJoinCommand([resolveOpts.baseDir, props.src], { route: route }); + const fileInfo = await RpcApi.RemoteFileJoinCommand(WindowRpcClient, [resolveOpts.baseDir, props.src], { + route: route, + }); const usp = new URLSearchParams(); usp.set("path", fileInfo.path); if (!isBlank(resolveOpts.connName)) { diff --git a/frontend/app/modals/tos.tsx b/frontend/app/modals/tos.tsx index cd4420c89..4a860560f 100644 --- a/frontend/app/modals/tos.tsx +++ b/frontend/app/modals/tos.tsx @@ -4,12 +4,13 @@ import Logo from "@/app/asset/logo.svg"; import { Button } from "@/app/element/button"; import { Toggle } from "@/app/element/toggle"; -import { WshServer } from "@/app/store/wshserver"; import * as services from "@/store/services"; import { OverlayScrollbarsComponent } from "overlayscrollbars-react"; import { useEffect, useRef, useState } from "react"; import { FlexiModal } from "./modal"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import "./tos.less"; const TosModal = () => { @@ -43,7 +44,7 @@ const TosModal = () => { }; const setTelemetry = (value: boolean) => { - WshServer.SetConfigCommand({ "telemetry:enabled": value }) + RpcApi.SetConfigCommand(WindowRpcClient, { "telemetry:enabled": value }) .then(() => { setTelemetryEnabled(value); }) diff --git a/frontend/app/store/services.ts b/frontend/app/store/services.ts index 78e07d166..549ea73c2 100644 --- a/frontend/app/store/services.ts +++ b/frontend/app/store/services.ts @@ -1,7 +1,7 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -// generated by cmd/generate/main-generate.go +// generated by cmd/generate/main-generatets.go import * as WOS from "./wos"; diff --git a/frontend/app/store/wps.ts b/frontend/app/store/wps.ts index b217d3da0..e094237a9 100644 --- a/frontend/app/store/wps.ts +++ b/frontend/app/store/wps.ts @@ -1,6 +1,6 @@ import { isBlank } from "@/util/util"; import { Subject } from "rxjs"; -import { sendRawRpcMessage, setRpcEventHandlerFn } from "./wshrpc"; +import { sendRawRpcMessage } from "./wshrpcutil"; type WaveEventSubject = { handler: (event: WaveEvent) => void; @@ -134,8 +134,4 @@ function handleWaveEvent(event: WaveEvent) { } } -function initWps() { - setRpcEventHandlerFn(handleWaveEvent); -} - -export { getFileSubject, initWps, waveEventSubscribe, waveEventUnsubscribe }; +export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe }; diff --git a/frontend/app/store/wshclient.ts b/frontend/app/store/wshclient.ts new file mode 100644 index 000000000..ec74c15f0 --- /dev/null +++ b/frontend/app/store/wshclient.ts @@ -0,0 +1,150 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { sendRpcCommand, sendRpcResponse } from "@/app/store/wshrpcutil"; +import * as util from "@/util/util"; + +const notFoundLogMap = new Map(); + +class RpcResponseHelper { + client: WshClient; + cmdMsg: RpcMessage; + done: boolean; + + constructor(client: WshClient, cmdMsg: RpcMessage) { + this.client = client; + this.cmdMsg = cmdMsg; + // if reqid is null, no response required + this.done = cmdMsg.reqid == null; + } + + sendResponse(msg: RpcMessage) { + if (this.done || util.isBlank(this.cmdMsg.reqid)) { + return; + } + if (msg == null) { + msg = {}; + } + msg.resid = this.cmdMsg.reqid; + msg.source = this.client.routeId; + sendRpcResponse(msg); + if (!msg.cont) { + this.done = true; + this.client.openRpcs.delete(this.cmdMsg.reqid); + } + } +} + +class WshClient { + routeId: string; + openRpcs: Map = new Map(); + + constructor(routeId: string) { + this.routeId = routeId; + } + + wshRpcCall(command: string, data: any, opts: RpcOpts): Promise { + const msg: RpcMessage = { + command: command, + data: data, + source: this.routeId, + }; + if (!opts?.noresponse) { + msg.reqid = crypto.randomUUID(); + } + if (opts?.timeout) { + msg.timeout = opts.timeout; + } + if (opts?.route) { + msg.route = opts.route; + } + const rpcGen = sendRpcCommand(this.openRpcs, msg); + if (rpcGen == null) { + return null; + } + const respMsgPromise = rpcGen.next(true); // pass true to force termination of rpc after 1 response (not streaming) + return respMsgPromise.then((msg: IteratorResult) => { + return msg.value; + }); + } + + wshRpcStream(command: string, data: any, opts: RpcOpts): AsyncGenerator { + if (opts?.noresponse) { + throw new Error("noresponse not supported for responsestream calls"); + } + const msg: RpcMessage = { + command: command, + data: data, + reqid: crypto.randomUUID(), + source: this.routeId, + }; + if (opts?.timeout) { + msg.timeout = opts.timeout; + } + if (opts?.route) { + msg.route = opts.route; + } + const rpcGen = sendRpcCommand(this.openRpcs, msg); + return rpcGen; + } + + async handleIncomingCommand(msg: RpcMessage) { + // TODO implement a timeout (setTimeout + sendResponse) + const helper = new RpcResponseHelper(this, msg); + const handlerName = `handle_${msg.command}`; + let prtn: any = null; + if (handlerName in this) { + prtn = this[handlerName](helper, msg.data); + return; + } else { + prtn = this.handle_default(helper, msg); + } + try { + if (prtn instanceof Promise) { + await prtn; + } + } catch (e) { + if (!helper.done) { + helper.sendResponse({ error: e.message }); + } else { + console.log(`rpc-client[${this.routeId}] command[${msg.command}] error`, e.message); + } + } finally { + if (!helper.done) { + helper.sendResponse({}); + } + } + return; + } + + recvRpcMessage(msg: RpcMessage) { + const isRequest = msg.command != null || msg.reqid != null; + if (isRequest) { + this.handleIncomingCommand(msg); + return; + } + if (msg.resid == null) { + console.log("rpc response missing resid", msg); + return; + } + const entry = this.openRpcs.get(msg.resid); + if (entry == null) { + if (!notFoundLogMap.has(msg.resid)) { + notFoundLogMap.set(msg.resid, true); + console.log("rpc response generator not found", msg); + } + return; + } + entry.msgFn(msg); + } + + async handle_message(helper: RpcResponseHelper, data: CommandMessageData): Promise { + console.log(`rpc:message[${this.routeId}]`, data?.message); + } + + async handle_default(helper: RpcResponseHelper, msg: RpcMessage): Promise { + throw new Error(`rpc command "${msg.command}" not supported by [${this.routeId}]`); + } +} + +export { RpcResponseHelper, WshClient }; diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts new file mode 100644 index 000000000..4687eeab7 --- /dev/null +++ b/frontend/app/store/wshclientapi.ts @@ -0,0 +1,212 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +// generated by cmd/generate/main-generatets.go + +import { WshClient } from "./wshclient"; + +// WshServerCommandToDeclMap +class RpcApiType { + // command "authenticate" [call] + AuthenticateCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("authenticate", data, opts); + } + + // command "connconnect" [call] + ConnConnectCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("connconnect", data, opts); + } + + // command "conndisconnect" [call] + ConnDisconnectCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("conndisconnect", data, opts); + } + + // command "connensure" [call] + ConnEnsureCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("connensure", data, opts); + } + + // command "connlist" [call] + ConnListCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("connlist", null, opts); + } + + // command "connreinstallwsh" [call] + ConnReinstallWshCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("connreinstallwsh", data, opts); + } + + // command "connstatus" [call] + ConnStatusCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("connstatus", null, opts); + } + + // command "controllerinput" [call] + ControllerInputCommand(client: WshClient, data: CommandBlockInputData, opts?: RpcOpts): Promise { + return client.wshRpcCall("controllerinput", data, opts); + } + + // command "controllerresync" [call] + ControllerResyncCommand(client: WshClient, data: CommandControllerResyncData, opts?: RpcOpts): Promise { + return client.wshRpcCall("controllerresync", data, opts); + } + + // command "controllerstop" [call] + ControllerStopCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("controllerstop", data, opts); + } + + // command "createblock" [call] + CreateBlockCommand(client: WshClient, data: CommandCreateBlockData, opts?: RpcOpts): Promise { + return client.wshRpcCall("createblock", data, opts); + } + + // command "deleteblock" [call] + DeleteBlockCommand(client: WshClient, data: CommandDeleteBlockData, opts?: RpcOpts): Promise { + return client.wshRpcCall("deleteblock", data, opts); + } + + // command "eventpublish" [call] + EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventpublish", data, opts); + } + + // command "eventreadhistory" [call] + EventReadHistoryCommand(client: WshClient, data: CommandEventReadHistoryData, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventreadhistory", data, opts); + } + + // command "eventrecv" [call] + EventRecvCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventrecv", data, opts); + } + + // command "eventsub" [call] + EventSubCommand(client: WshClient, data: SubscriptionRequest, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventsub", data, opts); + } + + // command "eventunsub" [call] + EventUnsubCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventunsub", data, opts); + } + + // command "eventunsuball" [call] + EventUnsubAllCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("eventunsuball", null, opts); + } + + // command "fileappend" [call] + FileAppendCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise { + return client.wshRpcCall("fileappend", data, opts); + } + + // command "fileappendijson" [call] + FileAppendIJsonCommand(client: WshClient, data: CommandAppendIJsonData, opts?: RpcOpts): Promise { + return client.wshRpcCall("fileappendijson", data, opts); + } + + // command "fileread" [call] + FileReadCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise { + return client.wshRpcCall("fileread", data, opts); + } + + // command "filewrite" [call] + FileWriteCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise { + return client.wshRpcCall("filewrite", data, opts); + } + + // command "getmeta" [call] + GetMetaCommand(client: WshClient, data: CommandGetMetaData, opts?: RpcOpts): Promise { + return client.wshRpcCall("getmeta", data, opts); + } + + // command "message" [call] + MessageCommand(client: WshClient, data: CommandMessageData, opts?: RpcOpts): Promise { + return client.wshRpcCall("message", data, opts); + } + + // command "remotefiledelete" [call] + RemoteFileDeleteCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("remotefiledelete", data, opts); + } + + // command "remotefileinfo" [call] + RemoteFileInfoCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("remotefileinfo", data, opts); + } + + // command "remotefilejoin" [call] + RemoteFileJoinCommand(client: WshClient, data: string[], opts?: RpcOpts): Promise { + return client.wshRpcCall("remotefilejoin", data, opts); + } + + // command "remotestreamcpudata" [responsestream] + RemoteStreamCpuDataCommand(client: WshClient, opts?: RpcOpts): AsyncGenerator { + return client.wshRpcStream("remotestreamcpudata", null, opts); + } + + // command "remotestreamfile" [responsestream] + RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator { + return client.wshRpcStream("remotestreamfile", data, opts); + } + + // command "remotewritefile" [call] + RemoteWriteFileCommand(client: WshClient, data: CommandRemoteWriteFileData, opts?: RpcOpts): Promise { + return client.wshRpcCall("remotewritefile", data, opts); + } + + // command "resolveids" [call] + ResolveIdsCommand(client: WshClient, data: CommandResolveIdsData, opts?: RpcOpts): Promise { + return client.wshRpcCall("resolveids", data, opts); + } + + // command "routeannounce" [call] + RouteAnnounceCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("routeannounce", null, opts); + } + + // command "routeunannounce" [call] + RouteUnannounceCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("routeunannounce", null, opts); + } + + // command "setconfig" [call] + SetConfigCommand(client: WshClient, data: SettingsType, opts?: RpcOpts): Promise { + return client.wshRpcCall("setconfig", data, opts); + } + + // command "setmeta" [call] + SetMetaCommand(client: WshClient, data: CommandSetMetaData, opts?: RpcOpts): Promise { + return client.wshRpcCall("setmeta", data, opts); + } + + // command "setview" [call] + SetViewCommand(client: WshClient, data: CommandBlockSetViewData, opts?: RpcOpts): Promise { + return client.wshRpcCall("setview", data, opts); + } + + // command "streamcpudata" [responsestream] + StreamCpuDataCommand(client: WshClient, data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator { + return client.wshRpcStream("streamcpudata", data, opts); + } + + // command "streamtest" [responsestream] + StreamTestCommand(client: WshClient, opts?: RpcOpts): AsyncGenerator { + return client.wshRpcStream("streamtest", null, opts); + } + + // command "streamwaveai" [responsestream] + StreamWaveAiCommand(client: WshClient, data: OpenAiStreamRequest, opts?: RpcOpts): AsyncGenerator { + return client.wshRpcStream("streamwaveai", data, opts); + } + + // command "test" [call] + TestCommand(client: WshClient, data: string, opts?: RpcOpts): Promise { + return client.wshRpcCall("test", data, opts); + } + +} + +export const RpcApi = new RpcApiType(); diff --git a/frontend/app/store/wshrouter.ts b/frontend/app/store/wshrouter.ts new file mode 100644 index 000000000..e9d1df06a --- /dev/null +++ b/frontend/app/store/wshrouter.ts @@ -0,0 +1,155 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { handleWaveEvent } from "@/app/store/wps"; +import debug from "debug"; +import * as util from "../../util/util"; + +const dlog = debug("wave:router"); + +const SysRouteName = "sys"; + +type RouteInfo = { + rpcId: string; + sourceRouteId: string; + destRouteId: string; +}; + +function makeWindowRouteId(windowId: string): string { + return `window:${windowId}`; +} + +function makeFeBlockRouteId(feBlockId: string): string { + return `feblock:${feBlockId}`; +} + +class WshRouter { + routeMap: Map; // routeid -> client + upstreamClient: AbstractWshClient; + rpcMap: Map; // rpcid -> routeinfo + + constructor(upstreamClient: AbstractWshClient) { + this.routeMap = new Map(); + this.rpcMap = new Map(); + this.upstreamClient = upstreamClient; + } + + // returns true if the message was sent + _sendRoutedMessage(msg: RpcMessage, destRouteId: string): boolean { + const client = this.routeMap.get(destRouteId); + if (client) { + client.recvRpcMessage(msg); + return true; + } + if (!this.upstreamClient) { + // there should always be an upstream client + return false; + } + this.upstreamClient?.recvRpcMessage(msg); + return true; + } + + _handleNoRoute(msg: RpcMessage) { + dlog("no route for message", msg); + if (util.isBlank(msg.reqid)) { + // send a message instead + if (msg.command == "message") { + return; + } + const nrMsg = { command: "message", route: msg.source, data: { message: `no route for ${msg.route}` } }; + this._sendRoutedMessage(nrMsg, SysRouteName); + return; + } + // send an error response + const nrMsg = { resid: msg.reqid, error: `no route for ${msg.route}` }; + this._sendRoutedMessage(nrMsg, msg.source); + } + + _registerRouteInfo(reqid: string, sourceRouteId: string, destRouteId: string) { + dlog("registering route info", reqid, sourceRouteId, destRouteId); + if (util.isBlank(reqid)) { + return; + } + const routeInfo: RouteInfo = { + rpcId: reqid, + sourceRouteId: sourceRouteId, + destRouteId: destRouteId, + }; + this.rpcMap.set(reqid, routeInfo); + } + + recvRpcMessage(msg: RpcMessage) { + dlog("router received message", msg); + // we are a terminal node by definition, so we don't need to process with announce/unannounce messages + if (msg.command == "routeannounce" || msg.command == "routeunannounce") { + return; + } + // handle events + if (msg.command == "eventrecv") { + handleWaveEvent(msg.data); + return; + } + if (!util.isBlank(msg.command)) { + // send + register routeinfo + const ok = this._sendRoutedMessage(msg, msg.route); + if (!ok) { + this._handleNoRoute(msg); + return; + } + this._registerRouteInfo(msg.reqid, msg.source, msg.route); + return; + } + if (!util.isBlank(msg.reqid)) { + const routeInfo = this.rpcMap.get(msg.reqid); + if (!routeInfo) { + // no route info, discard + return; + } + this._sendRoutedMessage(msg, routeInfo.destRouteId); + return; + } + if (!util.isBlank(msg.resid)) { + const routeInfo = this.rpcMap.get(msg.resid); + if (!routeInfo) { + // no route info, discard + return; + } + this._sendRoutedMessage(msg, routeInfo.sourceRouteId); + if (!msg.cont) { + dlog("deleting route info", msg.resid); + this.rpcMap.delete(msg.resid); + } + return; + } + dlog("bad rpc message recevied by router, no command, reqid, or resid (discarding)", msg); + } + + registerRoute(routeId: string, client: AbstractWshClient) { + if (routeId == SysRouteName) { + throw new Error(`Cannot register route with reserved name (${routeId})`); + } + dlog("registering route: ", routeId); + // announce + const announceMsg: RpcMessage = { + command: "routeannounce", + data: routeId, + source: routeId, + }; + this.upstreamClient.recvRpcMessage(announceMsg); + this.routeMap.set(routeId, client); + } + + unregisterRoute(routeId: string) { + dlog("unregister route: ", routeId); + // unannounce + const unannounceMsg: RpcMessage = { + command: "routeunannounce", + data: routeId, + source: routeId, + }; + this.upstreamClient?.recvRpcMessage(unannounceMsg); + this.routeMap.delete(routeId); + } +} + +export { makeFeBlockRouteId, makeWindowRouteId, WshRouter }; diff --git a/frontend/app/store/wshrpc.ts b/frontend/app/store/wshrpc.ts deleted file mode 100644 index f33612e84..000000000 --- a/frontend/app/store/wshrpc.ts +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2024, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -import { getWSServerEndpoint } from "@/util/endpoints"; -import { WSControl } from "./ws"; - -type RpcEntry = { - reqId: string; - startTs: number; - command: string; - msgFn: (msg: RpcMessage) => void; -}; - -const openRpcs = new Map(); - -function wshServerRpcHelper_responsestream( - command: string, - data: any, - opts: RpcOpts -): AsyncGenerator { - if (opts?.noresponse) { - throw new Error("noresponse not supported for responsestream calls"); - } - const msg: RpcMessage = { - command: command, - data: data, - reqid: crypto.randomUUID(), - }; - if (opts?.timeout) { - msg.timeout = opts.timeout; - } - if (opts?.route) { - msg.route = opts.route; - } - const rpcGen = sendRpcCommand(msg); - return rpcGen; -} - -function wshServerRpcHelper_call(command: string, data: any, opts: RpcOpts): Promise { - const msg: RpcMessage = { - command: command, - data: data, - }; - if (!opts?.noresponse) { - msg.reqid = crypto.randomUUID(); - } - if (opts?.timeout) { - msg.timeout = opts.timeout; - } - if (opts?.route) { - msg.route = opts.route; - } - const rpcGen = sendRpcCommand(msg); - if (rpcGen == null) { - return null; - } - const respMsgPromise = rpcGen.next(true); // pass true to force termination of rpc after 1 response (not streaming) - return respMsgPromise.then((msg: IteratorResult) => { - return msg.value; - }); -} - -async function* rpcResponseGenerator( - command: string, - reqid: string, - timeout: number -): AsyncGenerator { - const msgQueue: RpcMessage[] = []; - let signalFn: () => void; - let signalPromise = new Promise((resolve) => (signalFn = resolve)); - let timeoutId: NodeJS.Timeout = null; - if (timeout > 0) { - timeoutId = setTimeout(() => { - msgQueue.push({ resid: reqid, error: "EC-TIME: timeout waiting for response" }); - signalFn(); - }, timeout); - } - const msgFn = (msg: RpcMessage) => { - msgQueue.push(msg); - signalFn(); - // reset signal promise - signalPromise = new Promise((resolve) => (signalFn = resolve)); - }; - openRpcs.set(reqid, { - reqId: reqid, - startTs: Date.now(), - command: command, - msgFn: msgFn, - }); - yield null; - try { - while (true) { - while (msgQueue.length > 0) { - const msg = msgQueue.shift()!; - if (msg.error != null) { - throw new Error(msg.error); - } - if (!msg.cont && msg.data == null) { - return; - } - const shouldTerminate = yield msg.data; - if (shouldTerminate) { - sendRpcCancel(reqid); - return; - } - if (!msg.cont) { - return; - } - } - await signalPromise; - } - } finally { - openRpcs.delete(reqid); - if (timeoutId != null) { - clearTimeout(timeoutId); - } - } -} - -function sendRpcCancel(reqid: string) { - const rpcMsg: RpcMessage = { reqid: reqid, cancel: true }; - const wsMsg: WSRpcCommand = { wscommand: "rpc", message: rpcMsg }; - sendWSCommand(wsMsg); -} - -function sendRpcCommand(msg: RpcMessage): AsyncGenerator { - const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg }; - sendWSCommand(wsMsg); - if (msg.reqid == null) { - return null; - } - const rtnGen = rpcResponseGenerator(msg.command, msg.reqid, msg.timeout); - rtnGen.next(); // start the generator (run the initialization/registration logic, throw away the result) - return rtnGen; -} - -function sendRawRpcMessage(msg: RpcMessage) { - const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg }; - sendWSCommand(wsMsg); -} - -const notFoundLogMap = new Map(); - -let rpcEventHandlerFn: (evt: WaveEvent) => void; - -function setRpcEventHandlerFn(fn: (evt: WaveEvent) => void) { - if (rpcEventHandlerFn) { - throw new Error("wshrpc.setRpcEventHandlerFn called more than once"); - } - rpcEventHandlerFn = fn; -} - -function handleIncomingRpcMessage(event: WSEventType) { - if (event.eventtype !== "rpc") { - console.warn("unsupported ws event type:", event.eventtype, event); - } - const msg: RpcMessage = event.data; - const isRequest = msg.command != null || msg.reqid != null; - if (isRequest) { - // handle events - if (msg.command == "eventrecv") { - rpcEventHandlerFn?.(msg.data); - return; - } - if (msg.command == "message") { - if (msg.data?.oref != null) { - console.log("rpc:message", msg.data?.oref, msg.data?.message); - } else { - console.log("rpc:message", msg.data?.message); - } - return; - } - - console.log("rpc command not supported", msg); - return; - } - if (msg.resid == null) { - console.log("rpc response missing resid", msg); - return; - } - const entry = openRpcs.get(msg.resid); - if (entry == null) { - if (!notFoundLogMap.has(msg.resid)) { - notFoundLogMap.set(msg.resid, true); - console.log("rpc response generator not found", msg); - } - return; - } - entry.msgFn(msg); -} - -async function consumeGenerator(gen: AsyncGenerator) { - let idx = 0; - try { - for await (const msg of gen) { - console.log("gen", idx, msg); - idx++; - } - const result = await gen.return(undefined); - console.log("gen done", result.value); - } catch (e) { - console.log("gen error", e); - } -} - -if (globalThis.window != null) { - globalThis["consumeGenerator"] = consumeGenerator; -} - -let globalWS: WSControl; - -function initWshrpc(windowId: string) { - globalWS = new WSControl(getWSServerEndpoint(), windowId, handleIncomingRpcMessage); - globalWS.connectNow("connectWshrpc"); -} - -function sendWSCommand(cmd: WSCommandType) { - globalWS?.sendMessage(cmd); -} - -export { - handleIncomingRpcMessage, - initWshrpc, - sendRawRpcMessage, - sendRpcCommand, - sendWSCommand, - setRpcEventHandlerFn, - wshServerRpcHelper_call, - wshServerRpcHelper_responsestream, -}; diff --git a/frontend/app/store/wshrpcutil.ts b/frontend/app/store/wshrpcutil.ts new file mode 100644 index 000000000..95047efc3 --- /dev/null +++ b/frontend/app/store/wshrpcutil.ts @@ -0,0 +1,147 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import { WshClient } from "@/app/store/wshclient"; +import { makeWindowRouteId, WshRouter } from "@/app/store/wshrouter"; +import { getWSServerEndpoint } from "@/util/endpoints"; +import { WSControl } from "./ws"; + +let globalWS: WSControl; +let DefaultRouter: WshRouter; +let WindowRpcClient: WshClient; + +async function* rpcResponseGenerator( + openRpcs: Map, + command: string, + reqid: string, + timeout: number +): AsyncGenerator { + const msgQueue: RpcMessage[] = []; + let signalFn: () => void; + let signalPromise = new Promise((resolve) => (signalFn = resolve)); + let timeoutId: NodeJS.Timeout = null; + if (timeout > 0) { + timeoutId = setTimeout(() => { + msgQueue.push({ resid: reqid, error: "EC-TIME: timeout waiting for response" }); + signalFn(); + }, timeout); + } + const msgFn = (msg: RpcMessage) => { + msgQueue.push(msg); + signalFn(); + // reset signal promise + signalPromise = new Promise((resolve) => (signalFn = resolve)); + }; + openRpcs.set(reqid, { + reqId: reqid, + startTs: Date.now(), + command: command, + msgFn: msgFn, + }); + yield null; + try { + while (true) { + while (msgQueue.length > 0) { + const msg = msgQueue.shift()!; + if (msg.error != null) { + throw new Error(msg.error); + } + if (!msg.cont && msg.data == null) { + return; + } + const shouldTerminate = yield msg.data; + if (shouldTerminate) { + sendRpcCancel(reqid); + return; + } + if (!msg.cont) { + return; + } + } + await signalPromise; + } + } finally { + openRpcs.delete(reqid); + if (timeoutId != null) { + clearTimeout(timeoutId); + } + } +} + +function sendRpcCancel(reqid: string) { + const rpcMsg: RpcMessage = { reqid: reqid, cancel: true }; + DefaultRouter.recvRpcMessage(rpcMsg); +} + +function sendRpcResponse(msg: RpcMessage) { + DefaultRouter.recvRpcMessage(msg); +} + +function sendRpcCommand( + openRpcs: Map, + msg: RpcMessage +): AsyncGenerator { + DefaultRouter.recvRpcMessage(msg); + if (msg.reqid == null) { + return null; + } + const rtnGen = rpcResponseGenerator(openRpcs, msg.command, msg.reqid, msg.timeout); + rtnGen.next(); // start the generator (run the initialization/registration logic, throw away the result) + return rtnGen; +} + +function sendRawRpcMessage(msg: RpcMessage) { + const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg }; + sendWSCommand(wsMsg); +} + +async function consumeGenerator(gen: AsyncGenerator) { + let idx = 0; + try { + for await (const msg of gen) { + console.log("gen", idx, msg); + idx++; + } + const result = await gen.return(undefined); + console.log("gen done", result.value); + } catch (e) { + console.log("gen error", e); + } +} + +if (globalThis.window != null) { + globalThis["consumeGenerator"] = consumeGenerator; +} + +function initWshrpc(windowId: string) { + DefaultRouter = new WshRouter(new UpstreamWshRpcProxy()); + const handleFn = (event: WSEventType) => { + DefaultRouter.recvRpcMessage(event.data); + // handleIncomingRpcMessage(globalOpenRpcs, event); + }; + globalWS = new WSControl(getWSServerEndpoint(), windowId, handleFn); + globalWS.connectNow("connectWshrpc"); + WindowRpcClient = new WshClient(makeWindowRouteId(windowId)); + DefaultRouter.registerRoute(WindowRpcClient.routeId, WindowRpcClient); +} + +function sendWSCommand(cmd: WSCommandType) { + globalWS?.pushMessage(cmd); +} + +class UpstreamWshRpcProxy implements AbstractWshClient { + recvRpcMessage(msg: RpcMessage): void { + const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg }; + globalWS?.pushMessage(wsMsg); + } +} + +export { + DefaultRouter, + initWshrpc, + sendRawRpcMessage, + sendRpcCommand, + sendRpcResponse, + sendWSCommand, + WindowRpcClient, +}; diff --git a/frontend/app/store/wshserver.ts b/frontend/app/store/wshserver.ts deleted file mode 100644 index 489320d77..000000000 --- a/frontend/app/store/wshserver.ts +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright 2024, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -// generated by cmd/generate/main-generate.go - -import { wshServerRpcHelper_call, wshServerRpcHelper_responsestream } from "./wshrpc"; - -// WshServerCommandToDeclMap -class WshServerType { - // command "announce" [call] - AnnounceCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("announce", data, opts); - } - - // command "authenticate" [call] - AuthenticateCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("authenticate", data, opts); - } - - // command "connconnect" [call] - ConnConnectCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("connconnect", data, opts); - } - - // command "conndisconnect" [call] - ConnDisconnectCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("conndisconnect", data, opts); - } - - // command "connensure" [call] - ConnEnsureCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("connensure", data, opts); - } - - // command "connlist" [call] - ConnListCommand(opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("connlist", null, opts); - } - - // command "connreinstallwsh" [call] - ConnReinstallWshCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("connreinstallwsh", data, opts); - } - - // command "connstatus" [call] - ConnStatusCommand(opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("connstatus", null, opts); - } - - // command "controllerinput" [call] - ControllerInputCommand(data: CommandBlockInputData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("controllerinput", data, opts); - } - - // command "controllerresync" [call] - ControllerResyncCommand(data: CommandControllerResyncData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("controllerresync", data, opts); - } - - // command "controllerstop" [call] - ControllerStopCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("controllerstop", data, opts); - } - - // command "createblock" [call] - CreateBlockCommand(data: CommandCreateBlockData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("createblock", data, opts); - } - - // command "deleteblock" [call] - DeleteBlockCommand(data: CommandDeleteBlockData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("deleteblock", data, opts); - } - - // command "eventpublish" [call] - EventPublishCommand(data: WaveEvent, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventpublish", data, opts); - } - - // command "eventreadhistory" [call] - EventReadHistoryCommand(data: CommandEventReadHistoryData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventreadhistory", data, opts); - } - - // command "eventrecv" [call] - EventRecvCommand(data: WaveEvent, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventrecv", data, opts); - } - - // command "eventsub" [call] - EventSubCommand(data: SubscriptionRequest, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventsub", data, opts); - } - - // command "eventunsub" [call] - EventUnsubCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventunsub", data, opts); - } - - // command "eventunsuball" [call] - EventUnsubAllCommand(opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("eventunsuball", null, opts); - } - - // command "fileappend" [call] - FileAppendCommand(data: CommandFileData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("fileappend", data, opts); - } - - // command "fileappendijson" [call] - FileAppendIJsonCommand(data: CommandAppendIJsonData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("fileappendijson", data, opts); - } - - // command "fileread" [call] - FileReadCommand(data: CommandFileData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("fileread", data, opts); - } - - // command "filewrite" [call] - FileWriteCommand(data: CommandFileData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("filewrite", data, opts); - } - - // command "getmeta" [call] - GetMetaCommand(data: CommandGetMetaData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("getmeta", data, opts); - } - - // command "message" [call] - MessageCommand(data: CommandMessageData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("message", data, opts); - } - - // command "remotefiledelete" [call] - RemoteFileDeleteCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("remotefiledelete", data, opts); - } - - // command "remotefileinfo" [call] - RemoteFileInfoCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("remotefileinfo", data, opts); - } - - // command "remotefilejoin" [call] - RemoteFileJoinCommand(data: string[], opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("remotefilejoin", data, opts); - } - - // command "remotestreamcpudata" [responsestream] - RemoteStreamCpuDataCommand(opts?: RpcOpts): AsyncGenerator { - return wshServerRpcHelper_responsestream("remotestreamcpudata", null, opts); - } - - // command "remotestreamfile" [responsestream] - RemoteStreamFileCommand(data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator { - return wshServerRpcHelper_responsestream("remotestreamfile", data, opts); - } - - // command "remotewritefile" [call] - RemoteWriteFileCommand(data: CommandRemoteWriteFileData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("remotewritefile", data, opts); - } - - // command "resolveids" [call] - ResolveIdsCommand(data: CommandResolveIdsData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("resolveids", data, opts); - } - - // command "setconfig" [call] - SetConfigCommand(data: SettingsType, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("setconfig", data, opts); - } - - // command "setmeta" [call] - SetMetaCommand(data: CommandSetMetaData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("setmeta", data, opts); - } - - // command "setview" [call] - SetViewCommand(data: CommandBlockSetViewData, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("setview", data, opts); - } - - // command "streamcpudata" [responsestream] - StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator { - return wshServerRpcHelper_responsestream("streamcpudata", data, opts); - } - - // command "streamtest" [responsestream] - StreamTestCommand(opts?: RpcOpts): AsyncGenerator { - return wshServerRpcHelper_responsestream("streamtest", null, opts); - } - - // command "streamwaveai" [responsestream] - StreamWaveAiCommand(data: OpenAiStreamRequest, opts?: RpcOpts): AsyncGenerator { - return wshServerRpcHelper_responsestream("streamwaveai", data, opts); - } - - // command "test" [call] - TestCommand(data: string, opts?: RpcOpts): Promise { - return wshServerRpcHelper_call("test", data, opts); - } - -} - -export const WshServer = new WshServerType(); diff --git a/frontend/app/view/cpuplot/cpuplot.tsx b/frontend/app/view/cpuplot/cpuplot.tsx index 89a20622f..eb060cf58 100644 --- a/frontend/app/view/cpuplot/cpuplot.tsx +++ b/frontend/app/view/cpuplot/cpuplot.tsx @@ -4,7 +4,6 @@ import { useHeight } from "@/app/hook/useHeight"; import { useWidth } from "@/app/hook/useWidth"; import { getConnStatusAtom, globalStore, WOS } from "@/store/global"; -import { WshServer } from "@/store/wshserver"; import * as util from "@/util/util"; import * as Plot from "@observablehq/plot"; import dayjs from "dayjs"; @@ -13,6 +12,8 @@ import * as jotai from "jotai"; import * as React from "react"; import { waveEventSubscribe } from "@/app/store/wps"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import "./cpuplot.less"; const DefaultNumPoints = 120; @@ -112,7 +113,10 @@ class CpuPlotViewModel { this.incrementCount = jotai.atom(null, async (get, set) => { const meta = get(this.blockAtom).meta; const count = meta.count ?? 0; - await WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { count: count + 1 } }); + await RpcApi.SetMetaCommand(WindowRpcClient, { + oref: WOS.makeORef("block", this.blockId), + meta: { count: count + 1 }, + }); }); this.connection = jotai.atom((get) => { const blockData = get(this.blockAtom); @@ -137,7 +141,7 @@ class CpuPlotViewModel { try { const numPoints = globalStore.get(this.numPoints); const connName = globalStore.get(this.connection); - const initialData = await WshServer.EventReadHistoryCommand({ + const initialData = await RpcApi.EventReadHistoryCommand(WindowRpcClient, { event: "sysinfo", scope: connName, maxitems: numPoints, diff --git a/frontend/app/view/preview/preview.tsx b/frontend/app/view/preview/preview.tsx index 5981dbd14..d4eba1e46 100644 --- a/frontend/app/view/preview/preview.tsx +++ b/frontend/app/view/preview/preview.tsx @@ -4,7 +4,8 @@ import { TypeAheadModal } from "@/app/modals/typeaheadmodal"; import { ContextMenuModel } from "@/app/store/contextmenu"; import { tryReinjectKey } from "@/app/store/keymodel"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { Markdown } from "@/element/markdown"; import { NodeModel } from "@/layout/index"; import { atoms, createBlock, getConnStatusAtom, globalStore, refocusNode } from "@/store/global"; @@ -638,7 +639,7 @@ export class PreviewModel implements ViewModel { } const conn = globalStore.get(this.connection); try { - const newFileInfo = await WshServer.RemoteFileJoinCommand([fileInfo.dir, filePath], { + const newFileInfo = await RpcApi.RemoteFileJoinCommand(WindowRpcClient, [fileInfo.dir, filePath], { route: makeConnRoute(conn), }); this.updateOpenFileModalAndError(false); diff --git a/frontend/app/view/term/term.tsx b/frontend/app/view/term/term.tsx index 8296c47ad..3647afdce 100644 --- a/frontend/app/view/term/term.tsx +++ b/frontend/app/view/term/term.tsx @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { waveEventSubscribe } from "@/app/store/wps"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { VDomView } from "@/app/view/term/vdom"; import { WOS, atoms, getConnStatusAtom, globalStore, useSettingsPrefixAtom } from "@/store/global"; import * as services from "@/store/services"; @@ -169,7 +170,10 @@ class TermViewModel { } setTerminalTheme(themeName: string) { - WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { "term:theme": themeName } }); + RpcApi.SetMetaCommand(WindowRpcClient, { + oref: WOS.makeORef("block", this.blockId), + meta: { "term:theme": themeName }, + }); } getSettingsMenuItems(): ContextMenuItem[] { @@ -199,7 +203,7 @@ class TermViewModel { rows: this.termRef.current?.terminal?.rows, cols: this.termRef.current?.terminal?.cols, }; - const prtn = WshServer.ControllerResyncCommand({ + const prtn = RpcApi.ControllerResyncCommand(WindowRpcClient, { tabid: globalStore.get(atoms.activeTabId), blockid: this.blockId, forcerestart: true, @@ -264,7 +268,10 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) { event.preventDefault(); event.stopPropagation(); - WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } }); + RpcApi.SetMetaCommand(WindowRpcClient, { + oref: WOS.makeORef("block", blockId), + meta: { "term:mode": null }, + }); return false; } if ( @@ -302,7 +309,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { if (shellProcStatusRef.current != "running" && keyutil.checkKeyPressed(waveEvent, "Enter")) { // restart const tabId = globalStore.get(atoms.activeTabId); - const prtn = WshServer.ControllerResyncCommand({ tabid: tabId, blockid: blockId }); + const prtn = RpcApi.ControllerResyncCommand(WindowRpcClient, { tabid: tabId, blockid: blockId }); prtn.catch((e) => console.log("error controller resync (enter)", blockId, e)); return false; } @@ -346,7 +353,10 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { const waveEvent = keyutil.adaptFromReactOrNativeKeyEvent(event); if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) { // reset term:mode - WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } }); + RpcApi.SetMetaCommand(WindowRpcClient, { + oref: WOS.makeORef("block", blockId), + meta: { "term:mode": null }, + }); return false; } const asciiVal = keyboardEventToASCII(event); @@ -354,7 +364,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { return false; } const b64data = util.stringToBase64(asciiVal); - WshServer.ControllerInputCommand({ blockid: blockId, inputdata64: b64data }); + RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: blockId, inputdata64: b64data }); return true; }; diff --git a/frontend/app/view/term/termsticker.tsx b/frontend/app/view/term/termsticker.tsx index 29971e43b..2d4a4d0b6 100644 --- a/frontend/app/view/term/termsticker.tsx +++ b/frontend/app/view/term/termsticker.tsx @@ -1,7 +1,8 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { createBlock } from "@/store/global"; import { getWebServerEndpoint } from "@/util/endpoints"; import { stringToBase64 } from "@/util/util"; @@ -100,7 +101,7 @@ function TermSticker({ sticker, config }: { sticker: StickerType; config: Sticke console.log("clickHandler", sticker.clickcmd, sticker.clickblockdef); if (sticker.clickcmd) { const b64data = stringToBase64(sticker.clickcmd); - WshServer.ControllerInputCommand({ blockid: config.blockId, inputdata64: b64data }); + RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: config.blockId, inputdata64: b64data }); } if (sticker.clickblockdef) { createBlock(sticker.clickblockdef); diff --git a/frontend/app/view/term/termwrap.ts b/frontend/app/view/term/termwrap.ts index 638a98a72..2ef539188 100644 --- a/frontend/app/view/term/termwrap.ts +++ b/frontend/app/view/term/termwrap.ts @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 import { getFileSubject } from "@/app/store/wps"; -import { sendWSCommand } from "@/app/store/wshrpc"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient, sendWSCommand } from "@/app/store/wshrpcutil"; import { PLATFORM, WOS, atoms, fetchWaveFile, globalStore, openLink } from "@/store/global"; import * as services from "@/store/services"; import * as util from "@/util/util"; @@ -154,7 +154,7 @@ export class TermWrap { handleTermData(data: string) { const b64data = util.stringToBase64(data); - WshServer.ControllerInputCommand({ blockid: this.blockId, inputdata64: b64data }); + RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: this.blockId, inputdata64: b64data }); } addFocusListener(focusFn: () => void) { @@ -219,7 +219,11 @@ export class TermWrap { const tabId = globalStore.get(atoms.activeTabId); const rtOpts: RuntimeOpts = { termsize: { rows: this.terminal.rows, cols: this.terminal.cols } }; try { - await WshServer.ControllerResyncCommand({ tabid: tabId, blockid: this.blockId, rtopts: rtOpts }); + await RpcApi.ControllerResyncCommand(WindowRpcClient, { + tabid: tabId, + blockid: this.blockId, + rtopts: rtOpts, + }); } catch (e) { console.log(`error controller resync (${reason})`, this.blockId, e); } diff --git a/frontend/app/view/waveai/waveai.tsx b/frontend/app/view/waveai/waveai.tsx index 348f540b9..bf1b1b0c7 100644 --- a/frontend/app/view/waveai/waveai.tsx +++ b/frontend/app/view/waveai/waveai.tsx @@ -3,9 +3,10 @@ import { Markdown } from "@/app/element/markdown"; import { TypingIndicator } from "@/app/element/typingindicator"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { WindowRpcClient } from "@/app/store/wshrpcutil"; import { atoms, fetchWaveFile, getUserName, globalStore, WOS } from "@/store/global"; import { BlockService } from "@/store/services"; -import { WshServer } from "@/store/wshserver"; import { adaptFromReactOrNativeKeyEvent, checkKeyPressed } from "@/util/keyutil"; import { isBlank } from "@/util/util"; import { atom, Atom, PrimitiveAtom, useAtomValue, useSetAtom, WritableAtom } from "jotai"; @@ -182,7 +183,7 @@ export class WaveAiModel implements ViewModel { opts: opts, prompt: [...history, newPrompt], }; - const aiGen = WshServer.StreamWaveAiCommand(beMsg, { timeout: 60000 }); + const aiGen = RpcApi.StreamWaveAiCommand(WindowRpcClient, beMsg, { timeout: 60000 }); let fullMsg = ""; for await (const msg of aiGen) { fullMsg += msg.text ?? ""; diff --git a/frontend/types/custom.d.ts b/frontend/types/custom.d.ts index cd79683c4..e751e55ac 100644 --- a/frontend/types/custom.d.ts +++ b/frontend/types/custom.d.ts @@ -283,6 +283,17 @@ declare global { message: string; expiration: number; }; + + interface AbstractWshClient { + recvRpcMessage(msg: RpcMessage): void; + } + + type ClientRpcEntry = { + reqId: string; + startTs: number; + command: string; + msgFn: (msg: RpcMessage) => void; + }; } export {}; diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index a6b96e646..c61308762 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -1,7 +1,7 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -// generated by cmd/generate/main-generate.go +// generated by cmd/generate/main-generatets.go declare global { diff --git a/frontend/util/fetchutil.ts b/frontend/util/fetchutil.ts index 59dd68e02..7a32eab8d 100644 --- a/frontend/util/fetchutil.ts +++ b/frontend/util/fetchutil.ts @@ -5,10 +5,12 @@ let net: Electron.Net; -try { - import("electron").then(({ net: electronNet }) => (net = electronNet)); -} catch (e) { - // do nothing +if (typeof window === "undefined") { + try { + import("electron").then(({ net: electronNet }) => (net = electronNet)); + } catch (e) { + // do nothing + } } export function fetch(input: string | GlobalRequest | URL, init?: RequestInit): Promise { diff --git a/frontend/wave.ts b/frontend/wave.ts index a9564367c..9be135584 100644 --- a/frontend/wave.ts +++ b/frontend/wave.ts @@ -8,9 +8,8 @@ import { registerGlobalKeys, } from "@/app/store/keymodel"; import { FileService, ObjectService } from "@/app/store/services"; -import { initWps } from "@/app/store/wps"; -import { initWshrpc } from "@/app/store/wshrpc"; -import { WshServer } from "@/app/store/wshserver"; +import { RpcApi } from "@/app/store/wshclientapi"; +import { initWshrpc, WindowRpcClient } from "@/app/store/wshrpcutil"; import { loadMonaco } from "@/app/view/codeeditor/codeeditor"; import { getLayoutModelForActiveTab } from "@/layout/index"; import { @@ -47,7 +46,7 @@ loadFonts(); (window as any).WOS = WOS; (window as any).globalStore = globalStore; (window as any).globalAtoms = atoms; -(window as any).WshServer = WshServer; +(window as any).RpcApi = RpcApi; (window as any).isFullScreen = false; (window as any).countersPrint = countersPrint; (window as any).countersClear = countersClear; @@ -61,8 +60,8 @@ document.addEventListener("DOMContentLoaded", async () => { // Init WPS event handlers initWshrpc(windowId); + (window as any).WindowRpcClient = WindowRpcClient; await loadConnStatus(); - initWps(); initGlobalWaveEventSubs(); subscribeToConnEvents(); diff --git a/pkg/tsgen/tsgen.go b/pkg/tsgen/tsgen.go index 75726a3fb..cbde00941 100644 --- a/pkg/tsgen/tsgen.go +++ b/pkg/tsgen/tsgen.go @@ -409,17 +409,17 @@ func GenerateServiceClass(serviceName string, serviceObj any, tsTypesMap map[ref return sb.String() } -func GenerateWshServerMethod(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { +func GenerateWshClientApiMethod(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { if methodDecl.CommandType == wshrpc.RpcType_ResponseStream { - return GenerateWshServerMethod_ResponseStream(methodDecl, tsTypesMap) + return generateWshClientApiMethod_ResponseStream(methodDecl, tsTypesMap) } else if methodDecl.CommandType == wshrpc.RpcType_Call { - return GenerateWshServerMethod_Call(methodDecl, tsTypesMap) + return generateWshClientApiMethod_Call(methodDecl, tsTypesMap) } else { panic(fmt.Sprintf("cannot generate wshserver commandtype %q", methodDecl.CommandType)) } } -func GenerateWshServerMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { +func generateWshClientApiMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { var sb strings.Builder sb.WriteString(fmt.Sprintf(" // command %q [%s]\n", methodDecl.Command, methodDecl.CommandType)) respType := "any" @@ -433,16 +433,16 @@ func GenerateWshServerMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl, genRespType := fmt.Sprintf("AsyncGenerator<%s, void, boolean>", respType) if methodDecl.CommandDataType != nil { cmdDataTsName, _ := TypeToTSType(methodDecl.CommandDataType, tsTypesMap) - sb.WriteString(fmt.Sprintf(" %s(data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, genRespType)) + sb.WriteString(fmt.Sprintf(" %s(client: WshClient, data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, genRespType)) } else { - sb.WriteString(fmt.Sprintf(" %s(opts?: RpcOpts): %s {\n", methodDecl.MethodName, genRespType)) + sb.WriteString(fmt.Sprintf(" %s(client: WshClient, opts?: RpcOpts): %s {\n", methodDecl.MethodName, genRespType)) } - sb.WriteString(fmt.Sprintf(" return wshServerRpcHelper_responsestream(%q, %s, opts);\n", methodDecl.Command, dataName)) + sb.WriteString(fmt.Sprintf(" return client.wshRpcStream(%q, %s, opts);\n", methodDecl.Command, dataName)) sb.WriteString(" }\n") return sb.String() } -func GenerateWshServerMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { +func generateWshClientApiMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string { var sb strings.Builder sb.WriteString(fmt.Sprintf(" // command %q [%s]\n", methodDecl.Command, methodDecl.CommandType)) rtnType := "Promise" @@ -456,11 +456,11 @@ func GenerateWshServerMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMa } if methodDecl.CommandDataType != nil { cmdDataTsName, _ := TypeToTSType(methodDecl.CommandDataType, tsTypesMap) - sb.WriteString(fmt.Sprintf(" %s(data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, rtnType)) + sb.WriteString(fmt.Sprintf(" %s(client: WshClient, data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, rtnType)) } else { - sb.WriteString(fmt.Sprintf(" %s(opts?: RpcOpts): %s {\n", methodDecl.MethodName, rtnType)) + sb.WriteString(fmt.Sprintf(" %s(client: WshClient, opts?: RpcOpts): %s {\n", methodDecl.MethodName, rtnType)) } - methodBody := fmt.Sprintf(" return wshServerRpcHelper_call(%q, %s, opts);\n", methodDecl.Command, dataName) + methodBody := fmt.Sprintf(" return client.wshRpcCall(%q, %s, opts);\n", methodDecl.Command, dataName) sb.WriteString(methodBody) sb.WriteString(" }\n") return sb.String() diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index ee61dee3b..c3becfbda 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -13,12 +13,6 @@ import ( "github.com/wavetermdev/waveterm/pkg/wps" ) -// command "announce", wshserver.AnnounceCommand -func AnnounceCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error { - _, err := sendRpcRequestCallHelper[any](w, "announce", data, opts) - return err -} - // command "authenticate", wshserver.AuthenticateCommand func AuthenticateCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) (wshrpc.CommandAuthenticateRtnData, error) { resp, err := sendRpcRequestCallHelper[wshrpc.CommandAuthenticateRtnData](w, "authenticate", data, opts) @@ -203,6 +197,18 @@ func ResolveIdsCommand(w *wshutil.WshRpc, data wshrpc.CommandResolveIdsData, opt return resp, err } +// command "routeannounce", wshserver.RouteAnnounceCommand +func RouteAnnounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "routeannounce", nil, opts) + return err +} + +// command "routeunannounce", wshserver.RouteUnannounceCommand +func RouteUnannounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "routeunannounce", nil, opts) + return err +} + // command "setconfig", wshserver.SetConfigCommand func SetConfigCommand(w *wshutil.WshRpc, data wconfig.MetaSettingsType, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "setconfig", data, opts) diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 6ec6b4ceb..678b9498a 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -26,8 +26,9 @@ const ( ) const ( - Command_Authenticate = "authenticate" // special - Command_Announce = "announce" // special (for routing) + Command_Authenticate = "authenticate" // special + Command_RouteAnnounce = "routeannounce" // special (for routing) + Command_RouteUnannounce = "routeunannounce" // special (for routing) Command_Message = "message" Command_GetMeta = "getmeta" Command_SetMeta = "setmeta" @@ -73,7 +74,8 @@ type RespOrErrorUnion[T any] struct { type WshRpcInterface interface { AuthenticateCommand(ctx context.Context, data string) (CommandAuthenticateRtnData, error) - AnnounceCommand(ctx context.Context, data string) error // (special) announces a new route to the main router + RouteAnnounceCommand(ctx context.Context) error // (special) announces a new route to the main router + RouteUnannounceCommand(ctx context.Context) error // (special) unannounces a route to the main router MessageCommand(ctx context.Context, data CommandMessageData) error GetMetaCommand(ctx context.Context, data CommandGetMetaData) (waveobj.MetaMapType, error) diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index 12e4d7410..6bd8afb59 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -158,6 +158,12 @@ func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute router.AnnouncedRoutes[msg.Source] = input.fromRouteId } +func (router *WshRouter) handleUnannounceMessage(msg RpcMessage) { + router.Lock.Lock() + defer router.Lock.Unlock() + delete(router.AnnouncedRoutes, msg.Source) +} + func (router *WshRouter) getAnnouncedRoute(routeId string) string { router.Lock.Lock() defer router.Lock.Unlock() @@ -197,10 +203,14 @@ func (router *WshRouter) runServer() { continue } routeId := msg.Route - if msg.Command == wshrpc.Command_Announce { + if msg.Command == wshrpc.Command_RouteAnnounce { router.handleAnnounceMessage(msg, input) continue } + if msg.Command == wshrpc.Command_RouteUnannounce { + router.handleUnannounceMessage(msg) + continue + } if msg.Command != "" { // new comand, setup new rpc ok := router.sendRoutedMessage(msgBytes, routeId) @@ -267,7 +277,7 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) { go func() { // announce if router.GetUpstreamClient() != nil { - announceMsg := RpcMessage{Command: wshrpc.Command_Announce, Source: routeId} + announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId} announceBytes, _ := json.Marshal(announceMsg) router.GetUpstreamClient().SendRpcMessage(announceBytes) } @@ -303,6 +313,12 @@ func (router *WshRouter) UnregisterRoute(routeId string) { router.Lock.Lock() defer router.Lock.Unlock() delete(router.RouteMap, routeId) + // clear out announced routes + for routeId, localRouteId := range router.AnnouncedRoutes { + if localRouteId == routeId { + delete(router.AnnouncedRoutes, routeId) + } + } go func() { wps.Broker.UnsubscribeAll(routeId) }()