fe wsh router + wsh client impl (#381)

This commit is contained in:
Mike Sawka 2024-09-16 11:59:39 -07:00 committed by GitHub
parent 9e405fe3c8
commit 3939648bbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 806 additions and 518 deletions

View File

@ -30,9 +30,12 @@ func generateTypesFile(tsTypesMap map[reflect.Type]string) error {
os.Exit(1)
}
err = tsgen.GenerateWshServerTypes(tsTypesMap)
if err != nil {
return fmt.Errorf("error generating wsh server types: %w", err)
}
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n")
fmt.Fprintf(fd, "declare global {\n\n")
var keys []reflect.Type
for key := range tsTypesMap {
@ -66,7 +69,7 @@ func generateServicesFile(tsTypesMap map[reflect.Type]string) error {
fmt.Fprintf(os.Stderr, "generating services file to %s\n", fd.Name())
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n")
fmt.Fprintf(fd, "import * as WOS from \"./wos\";\n\n")
orderedKeys := utilfn.GetOrderedMapKeys(service.ServiceMap)
for _, serviceName := range orderedKeys {
@ -78,29 +81,29 @@ func generateServicesFile(tsTypesMap map[reflect.Type]string) error {
return nil
}
func generateWshServerFile(tsTypeMap map[reflect.Type]string) error {
fd, err := os.Create("frontend/app/store/wshserver.ts")
func generateWshClientApiFile(tsTypeMap map[reflect.Type]string) error {
fd, err := os.Create("frontend/app/store/wshclientapi.ts")
if err != nil {
return err
}
defer fd.Close()
declMap := wshrpc.GenerateWshCommandDeclMap()
fmt.Fprintf(os.Stderr, "generating wshserver file to %s\n", fd.Name())
fmt.Fprintf(os.Stderr, "generating wshclientapi file to %s\n", fd.Name())
fmt.Fprintf(fd, "// Copyright 2024, Command Line Inc.\n")
fmt.Fprintf(fd, "// SPDX-License-Identifier: Apache-2.0\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generate.go\n\n")
fmt.Fprintf(fd, "import { wshServerRpcHelper_call, wshServerRpcHelper_responsestream } from \"./wshrpc\";\n\n")
fmt.Fprintf(fd, "// generated by cmd/generate/main-generatets.go\n\n")
fmt.Fprintf(fd, "import { WshClient } from \"./wshclient\";\n\n")
orderedKeys := utilfn.GetOrderedMapKeys(declMap)
fmt.Fprintf(fd, "// WshServerCommandToDeclMap\n")
fmt.Fprintf(fd, "class WshServerType {\n")
fmt.Fprintf(fd, "class RpcApiType {\n")
for _, methodDecl := range orderedKeys {
methodDecl := declMap[methodDecl]
methodStr := tsgen.GenerateWshServerMethod(methodDecl, tsTypeMap)
methodStr := tsgen.GenerateWshClientApiMethod(methodDecl, tsTypeMap)
fmt.Fprint(fd, methodStr)
fmt.Fprintf(fd, "\n")
}
fmt.Fprintf(fd, "}\n\n")
fmt.Fprintf(fd, "export const WshServer = new WshServerType();\n")
fmt.Fprintf(fd, "export const RpcApi = new RpcApiType();\n")
return nil
}
@ -121,7 +124,7 @@ func main() {
fmt.Fprintf(os.Stderr, "Error generating services file: %v\n", err)
os.Exit(1)
}
err = generateWshServerFile(tsTypesMap)
err = generateWshClientApiFile(tsTypesMap)
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating wshserver file: %v\n", err)
os.Exit(1)

View File

@ -27,7 +27,8 @@ import {
WOS,
} from "@/app/store/global";
import * as services from "@/app/store/services";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { ErrorBoundary } from "@/element/errorboundary";
import { IconButton } from "@/element/iconbutton";
import { MagnifyIcon } from "@/element/magnify";
@ -307,7 +308,7 @@ const ConnStatusOverlay = React.memo(
}, [width, connStatus, setShowError]);
const handleTryReconnect = React.useCallback(() => {
const prtn = WshServer.ConnConnectCommand(connName, { timeout: 60000 });
const prtn = RpcApi.ConnConnectCommand(WindowRpcClient, connName, { timeout: 60000 });
prtn.catch((e) => console.log("error reconnecting", connName, e));
}, [connName]);
@ -426,7 +427,7 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => {
const connName = blockData?.meta?.connection;
if (!util.isBlank(connName)) {
console.log("ensure conn", nodeModel.blockId, connName);
WshServer.ConnEnsureCommand(connName, { timeout: 60000 }).catch((e) => {
RpcApi.ConnEnsureCommand(WindowRpcClient, connName, { timeout: 60000 }).catch((e) => {
console.log("error ensuring connection", nodeModel.blockId, connName, e);
});
}
@ -525,7 +526,7 @@ const ChangeConnectionBlockModal = React.memo(
setConnList([]);
return;
}
const prtn = WshServer.ConnListCommand({ timeout: 2000 });
const prtn = RpcApi.ConnListCommand(WindowRpcClient, { timeout: 2000 });
prtn.then((newConnList) => {
setConnList(newConnList ?? []);
}).catch((e) => console.log("unable to load conn list from backend. using blank list: ", e));
@ -546,12 +547,12 @@ const ChangeConnectionBlockModal = React.memo(
} else {
newCwd = "~";
}
await WshServer.SetMetaCommand({
await RpcApi.SetMetaCommand(WindowRpcClient, {
oref: WOS.makeORef("block", blockId),
meta: { connection: connName, file: newCwd },
});
try {
await WshServer.ConnEnsureCommand(connName, { timeout: 60000 });
await RpcApi.ConnEnsureCommand(WindowRpcClient, connName, { timeout: 60000 });
} catch (e) {
console.log("error connecting", blockId, connName, e);
}
@ -597,7 +598,7 @@ const ChangeConnectionBlockModal = React.memo(
label: `Reconnect to ${connStatus.connection}`,
value: "",
onSelect: async (_: string) => {
const prtn = WshServer.ConnConnectCommand(connStatus.connection, { timeout: 60000 });
const prtn = RpcApi.ConnConnectCommand(WindowRpcClient, connStatus.connection, { timeout: 60000 });
prtn.catch((e) => console.log("error reconnecting", connStatus.connection, e));
},
};

View File

@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
import { CopyButton } from "@/app/element/copybutton";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { getWebServerEndpoint } from "@/util/endpoints";
import { isBlank, makeConnRoute, useAtomValueSafe } from "@/util/util";
import { clsx } from "clsx";
@ -141,7 +142,9 @@ const MarkdownImg = ({
}
const resolveFn = async () => {
const route = makeConnRoute(resolveOpts.connName);
const fileInfo = await WshServer.RemoteFileJoinCommand([resolveOpts.baseDir, props.src], { route: route });
const fileInfo = await RpcApi.RemoteFileJoinCommand(WindowRpcClient, [resolveOpts.baseDir, props.src], {
route: route,
});
const usp = new URLSearchParams();
usp.set("path", fileInfo.path);
if (!isBlank(resolveOpts.connName)) {

View File

@ -4,12 +4,13 @@
import Logo from "@/app/asset/logo.svg";
import { Button } from "@/app/element/button";
import { Toggle } from "@/app/element/toggle";
import { WshServer } from "@/app/store/wshserver";
import * as services from "@/store/services";
import { OverlayScrollbarsComponent } from "overlayscrollbars-react";
import { useEffect, useRef, useState } from "react";
import { FlexiModal } from "./modal";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import "./tos.less";
const TosModal = () => {
@ -43,7 +44,7 @@ const TosModal = () => {
};
const setTelemetry = (value: boolean) => {
WshServer.SetConfigCommand({ "telemetry:enabled": value })
RpcApi.SetConfigCommand(WindowRpcClient, { "telemetry:enabled": value })
.then(() => {
setTelemetryEnabled(value);
})

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generate/main-generate.go
// generated by cmd/generate/main-generatets.go
import * as WOS from "./wos";

View File

@ -1,6 +1,6 @@
import { isBlank } from "@/util/util";
import { Subject } from "rxjs";
import { sendRawRpcMessage, setRpcEventHandlerFn } from "./wshrpc";
import { sendRawRpcMessage } from "./wshrpcutil";
type WaveEventSubject = {
handler: (event: WaveEvent) => void;
@ -134,8 +134,4 @@ function handleWaveEvent(event: WaveEvent) {
}
}
function initWps() {
setRpcEventHandlerFn(handleWaveEvent);
}
export { getFileSubject, initWps, waveEventSubscribe, waveEventUnsubscribe };
export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe };

View File

@ -0,0 +1,150 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { sendRpcCommand, sendRpcResponse } from "@/app/store/wshrpcutil";
import * as util from "@/util/util";
const notFoundLogMap = new Map<string, boolean>();
class RpcResponseHelper {
client: WshClient;
cmdMsg: RpcMessage;
done: boolean;
constructor(client: WshClient, cmdMsg: RpcMessage) {
this.client = client;
this.cmdMsg = cmdMsg;
// if reqid is null, no response required
this.done = cmdMsg.reqid == null;
}
sendResponse(msg: RpcMessage) {
if (this.done || util.isBlank(this.cmdMsg.reqid)) {
return;
}
if (msg == null) {
msg = {};
}
msg.resid = this.cmdMsg.reqid;
msg.source = this.client.routeId;
sendRpcResponse(msg);
if (!msg.cont) {
this.done = true;
this.client.openRpcs.delete(this.cmdMsg.reqid);
}
}
}
class WshClient {
routeId: string;
openRpcs: Map<string, ClientRpcEntry> = new Map();
constructor(routeId: string) {
this.routeId = routeId;
}
wshRpcCall(command: string, data: any, opts: RpcOpts): Promise<any> {
const msg: RpcMessage = {
command: command,
data: data,
source: this.routeId,
};
if (!opts?.noresponse) {
msg.reqid = crypto.randomUUID();
}
if (opts?.timeout) {
msg.timeout = opts.timeout;
}
if (opts?.route) {
msg.route = opts.route;
}
const rpcGen = sendRpcCommand(this.openRpcs, msg);
if (rpcGen == null) {
return null;
}
const respMsgPromise = rpcGen.next(true); // pass true to force termination of rpc after 1 response (not streaming)
return respMsgPromise.then((msg: IteratorResult<any, void>) => {
return msg.value;
});
}
wshRpcStream(command: string, data: any, opts: RpcOpts): AsyncGenerator<any, void, boolean> {
if (opts?.noresponse) {
throw new Error("noresponse not supported for responsestream calls");
}
const msg: RpcMessage = {
command: command,
data: data,
reqid: crypto.randomUUID(),
source: this.routeId,
};
if (opts?.timeout) {
msg.timeout = opts.timeout;
}
if (opts?.route) {
msg.route = opts.route;
}
const rpcGen = sendRpcCommand(this.openRpcs, msg);
return rpcGen;
}
async handleIncomingCommand(msg: RpcMessage) {
// TODO implement a timeout (setTimeout + sendResponse)
const helper = new RpcResponseHelper(this, msg);
const handlerName = `handle_${msg.command}`;
let prtn: any = null;
if (handlerName in this) {
prtn = this[handlerName](helper, msg.data);
return;
} else {
prtn = this.handle_default(helper, msg);
}
try {
if (prtn instanceof Promise) {
await prtn;
}
} catch (e) {
if (!helper.done) {
helper.sendResponse({ error: e.message });
} else {
console.log(`rpc-client[${this.routeId}] command[${msg.command}] error`, e.message);
}
} finally {
if (!helper.done) {
helper.sendResponse({});
}
}
return;
}
recvRpcMessage(msg: RpcMessage) {
const isRequest = msg.command != null || msg.reqid != null;
if (isRequest) {
this.handleIncomingCommand(msg);
return;
}
if (msg.resid == null) {
console.log("rpc response missing resid", msg);
return;
}
const entry = this.openRpcs.get(msg.resid);
if (entry == null) {
if (!notFoundLogMap.has(msg.resid)) {
notFoundLogMap.set(msg.resid, true);
console.log("rpc response generator not found", msg);
}
return;
}
entry.msgFn(msg);
}
async handle_message(helper: RpcResponseHelper, data: CommandMessageData): Promise<void> {
console.log(`rpc:message[${this.routeId}]`, data?.message);
}
async handle_default(helper: RpcResponseHelper, msg: RpcMessage): Promise<void> {
throw new Error(`rpc command "${msg.command}" not supported by [${this.routeId}]`);
}
}
export { RpcResponseHelper, WshClient };

View File

@ -0,0 +1,212 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generate/main-generatets.go
import { WshClient } from "./wshclient";
// WshServerCommandToDeclMap
class RpcApiType {
// command "authenticate" [call]
AuthenticateCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<CommandAuthenticateRtnData> {
return client.wshRpcCall("authenticate", data, opts);
}
// command "connconnect" [call]
ConnConnectCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connconnect", data, opts);
}
// command "conndisconnect" [call]
ConnDisconnectCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("conndisconnect", data, opts);
}
// command "connensure" [call]
ConnEnsureCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connensure", data, opts);
}
// command "connlist" [call]
ConnListCommand(client: WshClient, opts?: RpcOpts): Promise<string[]> {
return client.wshRpcCall("connlist", null, opts);
}
// command "connreinstallwsh" [call]
ConnReinstallWshCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connreinstallwsh", data, opts);
}
// command "connstatus" [call]
ConnStatusCommand(client: WshClient, opts?: RpcOpts): Promise<ConnStatus[]> {
return client.wshRpcCall("connstatus", null, opts);
}
// command "controllerinput" [call]
ControllerInputCommand(client: WshClient, data: CommandBlockInputData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerinput", data, opts);
}
// command "controllerresync" [call]
ControllerResyncCommand(client: WshClient, data: CommandControllerResyncData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerresync", data, opts);
}
// command "controllerstop" [call]
ControllerStopCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerstop", data, opts);
}
// command "createblock" [call]
CreateBlockCommand(client: WshClient, data: CommandCreateBlockData, opts?: RpcOpts): Promise<ORef> {
return client.wshRpcCall("createblock", data, opts);
}
// command "deleteblock" [call]
DeleteBlockCommand(client: WshClient, data: CommandDeleteBlockData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("deleteblock", data, opts);
}
// command "eventpublish" [call]
EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventpublish", data, opts);
}
// command "eventreadhistory" [call]
EventReadHistoryCommand(client: WshClient, data: CommandEventReadHistoryData, opts?: RpcOpts): Promise<WaveEvent[]> {
return client.wshRpcCall("eventreadhistory", data, opts);
}
// command "eventrecv" [call]
EventRecvCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventrecv", data, opts);
}
// command "eventsub" [call]
EventSubCommand(client: WshClient, data: SubscriptionRequest, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventsub", data, opts);
}
// command "eventunsub" [call]
EventUnsubCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventunsub", data, opts);
}
// command "eventunsuball" [call]
EventUnsubAllCommand(client: WshClient, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventunsuball", null, opts);
}
// command "fileappend" [call]
FileAppendCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("fileappend", data, opts);
}
// command "fileappendijson" [call]
FileAppendIJsonCommand(client: WshClient, data: CommandAppendIJsonData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("fileappendijson", data, opts);
}
// command "fileread" [call]
FileReadCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise<string> {
return client.wshRpcCall("fileread", data, opts);
}
// command "filewrite" [call]
FileWriteCommand(client: WshClient, data: CommandFileData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("filewrite", data, opts);
}
// command "getmeta" [call]
GetMetaCommand(client: WshClient, data: CommandGetMetaData, opts?: RpcOpts): Promise<MetaType> {
return client.wshRpcCall("getmeta", data, opts);
}
// command "message" [call]
MessageCommand(client: WshClient, data: CommandMessageData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("message", data, opts);
}
// command "remotefiledelete" [call]
RemoteFileDeleteCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("remotefiledelete", data, opts);
}
// command "remotefileinfo" [call]
RemoteFileInfoCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<FileInfo> {
return client.wshRpcCall("remotefileinfo", data, opts);
}
// command "remotefilejoin" [call]
RemoteFileJoinCommand(client: WshClient, data: string[], opts?: RpcOpts): Promise<FileInfo> {
return client.wshRpcCall("remotefilejoin", data, opts);
}
// command "remotestreamcpudata" [responsestream]
RemoteStreamCpuDataCommand(client: WshClient, opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return client.wshRpcStream("remotestreamcpudata", null, opts);
}
// command "remotestreamfile" [responsestream]
RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<CommandRemoteStreamFileRtnData, void, boolean> {
return client.wshRpcStream("remotestreamfile", data, opts);
}
// command "remotewritefile" [call]
RemoteWriteFileCommand(client: WshClient, data: CommandRemoteWriteFileData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("remotewritefile", data, opts);
}
// command "resolveids" [call]
ResolveIdsCommand(client: WshClient, data: CommandResolveIdsData, opts?: RpcOpts): Promise<CommandResolveIdsRtnData> {
return client.wshRpcCall("resolveids", data, opts);
}
// command "routeannounce" [call]
RouteAnnounceCommand(client: WshClient, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("routeannounce", null, opts);
}
// command "routeunannounce" [call]
RouteUnannounceCommand(client: WshClient, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("routeunannounce", null, opts);
}
// command "setconfig" [call]
SetConfigCommand(client: WshClient, data: SettingsType, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("setconfig", data, opts);
}
// command "setmeta" [call]
SetMetaCommand(client: WshClient, data: CommandSetMetaData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("setmeta", data, opts);
}
// command "setview" [call]
SetViewCommand(client: WshClient, data: CommandBlockSetViewData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("setview", data, opts);
}
// command "streamcpudata" [responsestream]
StreamCpuDataCommand(client: WshClient, data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return client.wshRpcStream("streamcpudata", data, opts);
}
// command "streamtest" [responsestream]
StreamTestCommand(client: WshClient, opts?: RpcOpts): AsyncGenerator<number, void, boolean> {
return client.wshRpcStream("streamtest", null, opts);
}
// command "streamwaveai" [responsestream]
StreamWaveAiCommand(client: WshClient, data: OpenAiStreamRequest, opts?: RpcOpts): AsyncGenerator<OpenAIPacketType, void, boolean> {
return client.wshRpcStream("streamwaveai", data, opts);
}
// command "test" [call]
TestCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("test", data, opts);
}
}
export const RpcApi = new RpcApiType();

View File

@ -0,0 +1,155 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { handleWaveEvent } from "@/app/store/wps";
import debug from "debug";
import * as util from "../../util/util";
const dlog = debug("wave:router");
const SysRouteName = "sys";
type RouteInfo = {
rpcId: string;
sourceRouteId: string;
destRouteId: string;
};
function makeWindowRouteId(windowId: string): string {
return `window:${windowId}`;
}
function makeFeBlockRouteId(feBlockId: string): string {
return `feblock:${feBlockId}`;
}
class WshRouter {
routeMap: Map<string, AbstractWshClient>; // routeid -> client
upstreamClient: AbstractWshClient;
rpcMap: Map<string, RouteInfo>; // rpcid -> routeinfo
constructor(upstreamClient: AbstractWshClient) {
this.routeMap = new Map();
this.rpcMap = new Map();
this.upstreamClient = upstreamClient;
}
// returns true if the message was sent
_sendRoutedMessage(msg: RpcMessage, destRouteId: string): boolean {
const client = this.routeMap.get(destRouteId);
if (client) {
client.recvRpcMessage(msg);
return true;
}
if (!this.upstreamClient) {
// there should always be an upstream client
return false;
}
this.upstreamClient?.recvRpcMessage(msg);
return true;
}
_handleNoRoute(msg: RpcMessage) {
dlog("no route for message", msg);
if (util.isBlank(msg.reqid)) {
// send a message instead
if (msg.command == "message") {
return;
}
const nrMsg = { command: "message", route: msg.source, data: { message: `no route for ${msg.route}` } };
this._sendRoutedMessage(nrMsg, SysRouteName);
return;
}
// send an error response
const nrMsg = { resid: msg.reqid, error: `no route for ${msg.route}` };
this._sendRoutedMessage(nrMsg, msg.source);
}
_registerRouteInfo(reqid: string, sourceRouteId: string, destRouteId: string) {
dlog("registering route info", reqid, sourceRouteId, destRouteId);
if (util.isBlank(reqid)) {
return;
}
const routeInfo: RouteInfo = {
rpcId: reqid,
sourceRouteId: sourceRouteId,
destRouteId: destRouteId,
};
this.rpcMap.set(reqid, routeInfo);
}
recvRpcMessage(msg: RpcMessage) {
dlog("router received message", msg);
// we are a terminal node by definition, so we don't need to process with announce/unannounce messages
if (msg.command == "routeannounce" || msg.command == "routeunannounce") {
return;
}
// handle events
if (msg.command == "eventrecv") {
handleWaveEvent(msg.data);
return;
}
if (!util.isBlank(msg.command)) {
// send + register routeinfo
const ok = this._sendRoutedMessage(msg, msg.route);
if (!ok) {
this._handleNoRoute(msg);
return;
}
this._registerRouteInfo(msg.reqid, msg.source, msg.route);
return;
}
if (!util.isBlank(msg.reqid)) {
const routeInfo = this.rpcMap.get(msg.reqid);
if (!routeInfo) {
// no route info, discard
return;
}
this._sendRoutedMessage(msg, routeInfo.destRouteId);
return;
}
if (!util.isBlank(msg.resid)) {
const routeInfo = this.rpcMap.get(msg.resid);
if (!routeInfo) {
// no route info, discard
return;
}
this._sendRoutedMessage(msg, routeInfo.sourceRouteId);
if (!msg.cont) {
dlog("deleting route info", msg.resid);
this.rpcMap.delete(msg.resid);
}
return;
}
dlog("bad rpc message recevied by router, no command, reqid, or resid (discarding)", msg);
}
registerRoute(routeId: string, client: AbstractWshClient) {
if (routeId == SysRouteName) {
throw new Error(`Cannot register route with reserved name (${routeId})`);
}
dlog("registering route: ", routeId);
// announce
const announceMsg: RpcMessage = {
command: "routeannounce",
data: routeId,
source: routeId,
};
this.upstreamClient.recvRpcMessage(announceMsg);
this.routeMap.set(routeId, client);
}
unregisterRoute(routeId: string) {
dlog("unregister route: ", routeId);
// unannounce
const unannounceMsg: RpcMessage = {
command: "routeunannounce",
data: routeId,
source: routeId,
};
this.upstreamClient?.recvRpcMessage(unannounceMsg);
this.routeMap.delete(routeId);
}
}
export { makeFeBlockRouteId, makeWindowRouteId, WshRouter };

View File

@ -1,230 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { getWSServerEndpoint } from "@/util/endpoints";
import { WSControl } from "./ws";
type RpcEntry = {
reqId: string;
startTs: number;
command: string;
msgFn: (msg: RpcMessage) => void;
};
const openRpcs = new Map<string, RpcEntry>();
function wshServerRpcHelper_responsestream(
command: string,
data: any,
opts: RpcOpts
): AsyncGenerator<any, void, boolean> {
if (opts?.noresponse) {
throw new Error("noresponse not supported for responsestream calls");
}
const msg: RpcMessage = {
command: command,
data: data,
reqid: crypto.randomUUID(),
};
if (opts?.timeout) {
msg.timeout = opts.timeout;
}
if (opts?.route) {
msg.route = opts.route;
}
const rpcGen = sendRpcCommand(msg);
return rpcGen;
}
function wshServerRpcHelper_call(command: string, data: any, opts: RpcOpts): Promise<any> {
const msg: RpcMessage = {
command: command,
data: data,
};
if (!opts?.noresponse) {
msg.reqid = crypto.randomUUID();
}
if (opts?.timeout) {
msg.timeout = opts.timeout;
}
if (opts?.route) {
msg.route = opts.route;
}
const rpcGen = sendRpcCommand(msg);
if (rpcGen == null) {
return null;
}
const respMsgPromise = rpcGen.next(true); // pass true to force termination of rpc after 1 response (not streaming)
return respMsgPromise.then((msg: IteratorResult<any, void>) => {
return msg.value;
});
}
async function* rpcResponseGenerator(
command: string,
reqid: string,
timeout: number
): AsyncGenerator<any, void, boolean> {
const msgQueue: RpcMessage[] = [];
let signalFn: () => void;
let signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
let timeoutId: NodeJS.Timeout = null;
if (timeout > 0) {
timeoutId = setTimeout(() => {
msgQueue.push({ resid: reqid, error: "EC-TIME: timeout waiting for response" });
signalFn();
}, timeout);
}
const msgFn = (msg: RpcMessage) => {
msgQueue.push(msg);
signalFn();
// reset signal promise
signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
};
openRpcs.set(reqid, {
reqId: reqid,
startTs: Date.now(),
command: command,
msgFn: msgFn,
});
yield null;
try {
while (true) {
while (msgQueue.length > 0) {
const msg = msgQueue.shift()!;
if (msg.error != null) {
throw new Error(msg.error);
}
if (!msg.cont && msg.data == null) {
return;
}
const shouldTerminate = yield msg.data;
if (shouldTerminate) {
sendRpcCancel(reqid);
return;
}
if (!msg.cont) {
return;
}
}
await signalPromise;
}
} finally {
openRpcs.delete(reqid);
if (timeoutId != null) {
clearTimeout(timeoutId);
}
}
}
function sendRpcCancel(reqid: string) {
const rpcMsg: RpcMessage = { reqid: reqid, cancel: true };
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: rpcMsg };
sendWSCommand(wsMsg);
}
function sendRpcCommand(msg: RpcMessage): AsyncGenerator<RpcMessage, void, boolean> {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
sendWSCommand(wsMsg);
if (msg.reqid == null) {
return null;
}
const rtnGen = rpcResponseGenerator(msg.command, msg.reqid, msg.timeout);
rtnGen.next(); // start the generator (run the initialization/registration logic, throw away the result)
return rtnGen;
}
function sendRawRpcMessage(msg: RpcMessage) {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
sendWSCommand(wsMsg);
}
const notFoundLogMap = new Map<string, boolean>();
let rpcEventHandlerFn: (evt: WaveEvent) => void;
function setRpcEventHandlerFn(fn: (evt: WaveEvent) => void) {
if (rpcEventHandlerFn) {
throw new Error("wshrpc.setRpcEventHandlerFn called more than once");
}
rpcEventHandlerFn = fn;
}
function handleIncomingRpcMessage(event: WSEventType) {
if (event.eventtype !== "rpc") {
console.warn("unsupported ws event type:", event.eventtype, event);
}
const msg: RpcMessage = event.data;
const isRequest = msg.command != null || msg.reqid != null;
if (isRequest) {
// handle events
if (msg.command == "eventrecv") {
rpcEventHandlerFn?.(msg.data);
return;
}
if (msg.command == "message") {
if (msg.data?.oref != null) {
console.log("rpc:message", msg.data?.oref, msg.data?.message);
} else {
console.log("rpc:message", msg.data?.message);
}
return;
}
console.log("rpc command not supported", msg);
return;
}
if (msg.resid == null) {
console.log("rpc response missing resid", msg);
return;
}
const entry = openRpcs.get(msg.resid);
if (entry == null) {
if (!notFoundLogMap.has(msg.resid)) {
notFoundLogMap.set(msg.resid, true);
console.log("rpc response generator not found", msg);
}
return;
}
entry.msgFn(msg);
}
async function consumeGenerator(gen: AsyncGenerator<any, any, any>) {
let idx = 0;
try {
for await (const msg of gen) {
console.log("gen", idx, msg);
idx++;
}
const result = await gen.return(undefined);
console.log("gen done", result.value);
} catch (e) {
console.log("gen error", e);
}
}
if (globalThis.window != null) {
globalThis["consumeGenerator"] = consumeGenerator;
}
let globalWS: WSControl;
function initWshrpc(windowId: string) {
globalWS = new WSControl(getWSServerEndpoint(), windowId, handleIncomingRpcMessage);
globalWS.connectNow("connectWshrpc");
}
function sendWSCommand(cmd: WSCommandType) {
globalWS?.sendMessage(cmd);
}
export {
handleIncomingRpcMessage,
initWshrpc,
sendRawRpcMessage,
sendRpcCommand,
sendWSCommand,
setRpcEventHandlerFn,
wshServerRpcHelper_call,
wshServerRpcHelper_responsestream,
};

View File

@ -0,0 +1,147 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { WshClient } from "@/app/store/wshclient";
import { makeWindowRouteId, WshRouter } from "@/app/store/wshrouter";
import { getWSServerEndpoint } from "@/util/endpoints";
import { WSControl } from "./ws";
let globalWS: WSControl;
let DefaultRouter: WshRouter;
let WindowRpcClient: WshClient;
async function* rpcResponseGenerator(
openRpcs: Map<string, ClientRpcEntry>,
command: string,
reqid: string,
timeout: number
): AsyncGenerator<any, void, boolean> {
const msgQueue: RpcMessage[] = [];
let signalFn: () => void;
let signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
let timeoutId: NodeJS.Timeout = null;
if (timeout > 0) {
timeoutId = setTimeout(() => {
msgQueue.push({ resid: reqid, error: "EC-TIME: timeout waiting for response" });
signalFn();
}, timeout);
}
const msgFn = (msg: RpcMessage) => {
msgQueue.push(msg);
signalFn();
// reset signal promise
signalPromise = new Promise<void>((resolve) => (signalFn = resolve));
};
openRpcs.set(reqid, {
reqId: reqid,
startTs: Date.now(),
command: command,
msgFn: msgFn,
});
yield null;
try {
while (true) {
while (msgQueue.length > 0) {
const msg = msgQueue.shift()!;
if (msg.error != null) {
throw new Error(msg.error);
}
if (!msg.cont && msg.data == null) {
return;
}
const shouldTerminate = yield msg.data;
if (shouldTerminate) {
sendRpcCancel(reqid);
return;
}
if (!msg.cont) {
return;
}
}
await signalPromise;
}
} finally {
openRpcs.delete(reqid);
if (timeoutId != null) {
clearTimeout(timeoutId);
}
}
}
function sendRpcCancel(reqid: string) {
const rpcMsg: RpcMessage = { reqid: reqid, cancel: true };
DefaultRouter.recvRpcMessage(rpcMsg);
}
function sendRpcResponse(msg: RpcMessage) {
DefaultRouter.recvRpcMessage(msg);
}
function sendRpcCommand(
openRpcs: Map<string, ClientRpcEntry>,
msg: RpcMessage
): AsyncGenerator<RpcMessage, void, boolean> {
DefaultRouter.recvRpcMessage(msg);
if (msg.reqid == null) {
return null;
}
const rtnGen = rpcResponseGenerator(openRpcs, msg.command, msg.reqid, msg.timeout);
rtnGen.next(); // start the generator (run the initialization/registration logic, throw away the result)
return rtnGen;
}
function sendRawRpcMessage(msg: RpcMessage) {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
sendWSCommand(wsMsg);
}
async function consumeGenerator(gen: AsyncGenerator<any, any, any>) {
let idx = 0;
try {
for await (const msg of gen) {
console.log("gen", idx, msg);
idx++;
}
const result = await gen.return(undefined);
console.log("gen done", result.value);
} catch (e) {
console.log("gen error", e);
}
}
if (globalThis.window != null) {
globalThis["consumeGenerator"] = consumeGenerator;
}
function initWshrpc(windowId: string) {
DefaultRouter = new WshRouter(new UpstreamWshRpcProxy());
const handleFn = (event: WSEventType) => {
DefaultRouter.recvRpcMessage(event.data);
// handleIncomingRpcMessage(globalOpenRpcs, event);
};
globalWS = new WSControl(getWSServerEndpoint(), windowId, handleFn);
globalWS.connectNow("connectWshrpc");
WindowRpcClient = new WshClient(makeWindowRouteId(windowId));
DefaultRouter.registerRoute(WindowRpcClient.routeId, WindowRpcClient);
}
function sendWSCommand(cmd: WSCommandType) {
globalWS?.pushMessage(cmd);
}
class UpstreamWshRpcProxy implements AbstractWshClient {
recvRpcMessage(msg: RpcMessage): void {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
globalWS?.pushMessage(wsMsg);
}
}
export {
DefaultRouter,
initWshrpc,
sendRawRpcMessage,
sendRpcCommand,
sendRpcResponse,
sendWSCommand,
WindowRpcClient,
};

View File

@ -1,207 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generate/main-generate.go
import { wshServerRpcHelper_call, wshServerRpcHelper_responsestream } from "./wshrpc";
// WshServerCommandToDeclMap
class WshServerType {
// command "announce" [call]
AnnounceCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("announce", data, opts);
}
// command "authenticate" [call]
AuthenticateCommand(data: string, opts?: RpcOpts): Promise<CommandAuthenticateRtnData> {
return wshServerRpcHelper_call("authenticate", data, opts);
}
// command "connconnect" [call]
ConnConnectCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("connconnect", data, opts);
}
// command "conndisconnect" [call]
ConnDisconnectCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("conndisconnect", data, opts);
}
// command "connensure" [call]
ConnEnsureCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("connensure", data, opts);
}
// command "connlist" [call]
ConnListCommand(opts?: RpcOpts): Promise<string[]> {
return wshServerRpcHelper_call("connlist", null, opts);
}
// command "connreinstallwsh" [call]
ConnReinstallWshCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("connreinstallwsh", data, opts);
}
// command "connstatus" [call]
ConnStatusCommand(opts?: RpcOpts): Promise<ConnStatus[]> {
return wshServerRpcHelper_call("connstatus", null, opts);
}
// command "controllerinput" [call]
ControllerInputCommand(data: CommandBlockInputData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("controllerinput", data, opts);
}
// command "controllerresync" [call]
ControllerResyncCommand(data: CommandControllerResyncData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("controllerresync", data, opts);
}
// command "controllerstop" [call]
ControllerStopCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("controllerstop", data, opts);
}
// command "createblock" [call]
CreateBlockCommand(data: CommandCreateBlockData, opts?: RpcOpts): Promise<ORef> {
return wshServerRpcHelper_call("createblock", data, opts);
}
// command "deleteblock" [call]
DeleteBlockCommand(data: CommandDeleteBlockData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("deleteblock", data, opts);
}
// command "eventpublish" [call]
EventPublishCommand(data: WaveEvent, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("eventpublish", data, opts);
}
// command "eventreadhistory" [call]
EventReadHistoryCommand(data: CommandEventReadHistoryData, opts?: RpcOpts): Promise<WaveEvent[]> {
return wshServerRpcHelper_call("eventreadhistory", data, opts);
}
// command "eventrecv" [call]
EventRecvCommand(data: WaveEvent, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("eventrecv", data, opts);
}
// command "eventsub" [call]
EventSubCommand(data: SubscriptionRequest, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("eventsub", data, opts);
}
// command "eventunsub" [call]
EventUnsubCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("eventunsub", data, opts);
}
// command "eventunsuball" [call]
EventUnsubAllCommand(opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("eventunsuball", null, opts);
}
// command "fileappend" [call]
FileAppendCommand(data: CommandFileData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("fileappend", data, opts);
}
// command "fileappendijson" [call]
FileAppendIJsonCommand(data: CommandAppendIJsonData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("fileappendijson", data, opts);
}
// command "fileread" [call]
FileReadCommand(data: CommandFileData, opts?: RpcOpts): Promise<string> {
return wshServerRpcHelper_call("fileread", data, opts);
}
// command "filewrite" [call]
FileWriteCommand(data: CommandFileData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("filewrite", data, opts);
}
// command "getmeta" [call]
GetMetaCommand(data: CommandGetMetaData, opts?: RpcOpts): Promise<MetaType> {
return wshServerRpcHelper_call("getmeta", data, opts);
}
// command "message" [call]
MessageCommand(data: CommandMessageData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("message", data, opts);
}
// command "remotefiledelete" [call]
RemoteFileDeleteCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("remotefiledelete", data, opts);
}
// command "remotefileinfo" [call]
RemoteFileInfoCommand(data: string, opts?: RpcOpts): Promise<FileInfo> {
return wshServerRpcHelper_call("remotefileinfo", data, opts);
}
// command "remotefilejoin" [call]
RemoteFileJoinCommand(data: string[], opts?: RpcOpts): Promise<FileInfo> {
return wshServerRpcHelper_call("remotefilejoin", data, opts);
}
// command "remotestreamcpudata" [responsestream]
RemoteStreamCpuDataCommand(opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return wshServerRpcHelper_responsestream("remotestreamcpudata", null, opts);
}
// command "remotestreamfile" [responsestream]
RemoteStreamFileCommand(data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<CommandRemoteStreamFileRtnData, void, boolean> {
return wshServerRpcHelper_responsestream("remotestreamfile", data, opts);
}
// command "remotewritefile" [call]
RemoteWriteFileCommand(data: CommandRemoteWriteFileData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("remotewritefile", data, opts);
}
// command "resolveids" [call]
ResolveIdsCommand(data: CommandResolveIdsData, opts?: RpcOpts): Promise<CommandResolveIdsRtnData> {
return wshServerRpcHelper_call("resolveids", data, opts);
}
// command "setconfig" [call]
SetConfigCommand(data: SettingsType, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("setconfig", data, opts);
}
// command "setmeta" [call]
SetMetaCommand(data: CommandSetMetaData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("setmeta", data, opts);
}
// command "setview" [call]
SetViewCommand(data: CommandBlockSetViewData, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("setview", data, opts);
}
// command "streamcpudata" [responsestream]
StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return wshServerRpcHelper_responsestream("streamcpudata", data, opts);
}
// command "streamtest" [responsestream]
StreamTestCommand(opts?: RpcOpts): AsyncGenerator<number, void, boolean> {
return wshServerRpcHelper_responsestream("streamtest", null, opts);
}
// command "streamwaveai" [responsestream]
StreamWaveAiCommand(data: OpenAiStreamRequest, opts?: RpcOpts): AsyncGenerator<OpenAIPacketType, void, boolean> {
return wshServerRpcHelper_responsestream("streamwaveai", data, opts);
}
// command "test" [call]
TestCommand(data: string, opts?: RpcOpts): Promise<void> {
return wshServerRpcHelper_call("test", data, opts);
}
}
export const WshServer = new WshServerType();

View File

@ -4,7 +4,6 @@
import { useHeight } from "@/app/hook/useHeight";
import { useWidth } from "@/app/hook/useWidth";
import { getConnStatusAtom, globalStore, WOS } from "@/store/global";
import { WshServer } from "@/store/wshserver";
import * as util from "@/util/util";
import * as Plot from "@observablehq/plot";
import dayjs from "dayjs";
@ -13,6 +12,8 @@ import * as jotai from "jotai";
import * as React from "react";
import { waveEventSubscribe } from "@/app/store/wps";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import "./cpuplot.less";
const DefaultNumPoints = 120;
@ -112,7 +113,10 @@ class CpuPlotViewModel {
this.incrementCount = jotai.atom(null, async (get, set) => {
const meta = get(this.blockAtom).meta;
const count = meta.count ?? 0;
await WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { count: count + 1 } });
await RpcApi.SetMetaCommand(WindowRpcClient, {
oref: WOS.makeORef("block", this.blockId),
meta: { count: count + 1 },
});
});
this.connection = jotai.atom((get) => {
const blockData = get(this.blockAtom);
@ -137,7 +141,7 @@ class CpuPlotViewModel {
try {
const numPoints = globalStore.get(this.numPoints);
const connName = globalStore.get(this.connection);
const initialData = await WshServer.EventReadHistoryCommand({
const initialData = await RpcApi.EventReadHistoryCommand(WindowRpcClient, {
event: "sysinfo",
scope: connName,
maxitems: numPoints,

View File

@ -4,7 +4,8 @@
import { TypeAheadModal } from "@/app/modals/typeaheadmodal";
import { ContextMenuModel } from "@/app/store/contextmenu";
import { tryReinjectKey } from "@/app/store/keymodel";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { Markdown } from "@/element/markdown";
import { NodeModel } from "@/layout/index";
import { atoms, createBlock, getConnStatusAtom, globalStore, refocusNode } from "@/store/global";
@ -638,7 +639,7 @@ export class PreviewModel implements ViewModel {
}
const conn = globalStore.get(this.connection);
try {
const newFileInfo = await WshServer.RemoteFileJoinCommand([fileInfo.dir, filePath], {
const newFileInfo = await RpcApi.RemoteFileJoinCommand(WindowRpcClient, [fileInfo.dir, filePath], {
route: makeConnRoute(conn),
});
this.updateOpenFileModalAndError(false);

View File

@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
import { waveEventSubscribe } from "@/app/store/wps";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { VDomView } from "@/app/view/term/vdom";
import { WOS, atoms, getConnStatusAtom, globalStore, useSettingsPrefixAtom } from "@/store/global";
import * as services from "@/store/services";
@ -169,7 +170,10 @@ class TermViewModel {
}
setTerminalTheme(themeName: string) {
WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { "term:theme": themeName } });
RpcApi.SetMetaCommand(WindowRpcClient, {
oref: WOS.makeORef("block", this.blockId),
meta: { "term:theme": themeName },
});
}
getSettingsMenuItems(): ContextMenuItem[] {
@ -199,7 +203,7 @@ class TermViewModel {
rows: this.termRef.current?.terminal?.rows,
cols: this.termRef.current?.terminal?.cols,
};
const prtn = WshServer.ControllerResyncCommand({
const prtn = RpcApi.ControllerResyncCommand(WindowRpcClient, {
tabid: globalStore.get(atoms.activeTabId),
blockid: this.blockId,
forcerestart: true,
@ -264,7 +268,10 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) {
event.preventDefault();
event.stopPropagation();
WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } });
RpcApi.SetMetaCommand(WindowRpcClient, {
oref: WOS.makeORef("block", blockId),
meta: { "term:mode": null },
});
return false;
}
if (
@ -302,7 +309,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
if (shellProcStatusRef.current != "running" && keyutil.checkKeyPressed(waveEvent, "Enter")) {
// restart
const tabId = globalStore.get(atoms.activeTabId);
const prtn = WshServer.ControllerResyncCommand({ tabid: tabId, blockid: blockId });
const prtn = RpcApi.ControllerResyncCommand(WindowRpcClient, { tabid: tabId, blockid: blockId });
prtn.catch((e) => console.log("error controller resync (enter)", blockId, e));
return false;
}
@ -346,7 +353,10 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
const waveEvent = keyutil.adaptFromReactOrNativeKeyEvent(event);
if (keyutil.checkKeyPressed(waveEvent, "Cmd:Escape")) {
// reset term:mode
WshServer.SetMetaCommand({ oref: WOS.makeORef("block", blockId), meta: { "term:mode": null } });
RpcApi.SetMetaCommand(WindowRpcClient, {
oref: WOS.makeORef("block", blockId),
meta: { "term:mode": null },
});
return false;
}
const asciiVal = keyboardEventToASCII(event);
@ -354,7 +364,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
return false;
}
const b64data = util.stringToBase64(asciiVal);
WshServer.ControllerInputCommand({ blockid: blockId, inputdata64: b64data });
RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: blockId, inputdata64: b64data });
return true;
};

View File

@ -1,7 +1,8 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { createBlock } from "@/store/global";
import { getWebServerEndpoint } from "@/util/endpoints";
import { stringToBase64 } from "@/util/util";
@ -100,7 +101,7 @@ function TermSticker({ sticker, config }: { sticker: StickerType; config: Sticke
console.log("clickHandler", sticker.clickcmd, sticker.clickblockdef);
if (sticker.clickcmd) {
const b64data = stringToBase64(sticker.clickcmd);
WshServer.ControllerInputCommand({ blockid: config.blockId, inputdata64: b64data });
RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: config.blockId, inputdata64: b64data });
}
if (sticker.clickblockdef) {
createBlock(sticker.clickblockdef);

View File

@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
import { getFileSubject } from "@/app/store/wps";
import { sendWSCommand } from "@/app/store/wshrpc";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient, sendWSCommand } from "@/app/store/wshrpcutil";
import { PLATFORM, WOS, atoms, fetchWaveFile, globalStore, openLink } from "@/store/global";
import * as services from "@/store/services";
import * as util from "@/util/util";
@ -154,7 +154,7 @@ export class TermWrap {
handleTermData(data: string) {
const b64data = util.stringToBase64(data);
WshServer.ControllerInputCommand({ blockid: this.blockId, inputdata64: b64data });
RpcApi.ControllerInputCommand(WindowRpcClient, { blockid: this.blockId, inputdata64: b64data });
}
addFocusListener(focusFn: () => void) {
@ -219,7 +219,11 @@ export class TermWrap {
const tabId = globalStore.get(atoms.activeTabId);
const rtOpts: RuntimeOpts = { termsize: { rows: this.terminal.rows, cols: this.terminal.cols } };
try {
await WshServer.ControllerResyncCommand({ tabid: tabId, blockid: this.blockId, rtopts: rtOpts });
await RpcApi.ControllerResyncCommand(WindowRpcClient, {
tabid: tabId,
blockid: this.blockId,
rtopts: rtOpts,
});
} catch (e) {
console.log(`error controller resync (${reason})`, this.blockId, e);
}

View File

@ -3,9 +3,10 @@
import { Markdown } from "@/app/element/markdown";
import { TypingIndicator } from "@/app/element/typingindicator";
import { RpcApi } from "@/app/store/wshclientapi";
import { WindowRpcClient } from "@/app/store/wshrpcutil";
import { atoms, fetchWaveFile, getUserName, globalStore, WOS } from "@/store/global";
import { BlockService } from "@/store/services";
import { WshServer } from "@/store/wshserver";
import { adaptFromReactOrNativeKeyEvent, checkKeyPressed } from "@/util/keyutil";
import { isBlank } from "@/util/util";
import { atom, Atom, PrimitiveAtom, useAtomValue, useSetAtom, WritableAtom } from "jotai";
@ -182,7 +183,7 @@ export class WaveAiModel implements ViewModel {
opts: opts,
prompt: [...history, newPrompt],
};
const aiGen = WshServer.StreamWaveAiCommand(beMsg, { timeout: 60000 });
const aiGen = RpcApi.StreamWaveAiCommand(WindowRpcClient, beMsg, { timeout: 60000 });
let fullMsg = "";
for await (const msg of aiGen) {
fullMsg += msg.text ?? "";

View File

@ -283,6 +283,17 @@ declare global {
message: string;
expiration: number;
};
interface AbstractWshClient {
recvRpcMessage(msg: RpcMessage): void;
}
type ClientRpcEntry = {
reqId: string;
startTs: number;
command: string;
msgFn: (msg: RpcMessage) => void;
};
}
export {};

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// generated by cmd/generate/main-generate.go
// generated by cmd/generate/main-generatets.go
declare global {

View File

@ -5,10 +5,12 @@
let net: Electron.Net;
try {
import("electron").then(({ net: electronNet }) => (net = electronNet));
} catch (e) {
// do nothing
if (typeof window === "undefined") {
try {
import("electron").then(({ net: electronNet }) => (net = electronNet));
} catch (e) {
// do nothing
}
}
export function fetch(input: string | GlobalRequest | URL, init?: RequestInit): Promise<Response> {

View File

@ -8,9 +8,8 @@ import {
registerGlobalKeys,
} from "@/app/store/keymodel";
import { FileService, ObjectService } from "@/app/store/services";
import { initWps } from "@/app/store/wps";
import { initWshrpc } from "@/app/store/wshrpc";
import { WshServer } from "@/app/store/wshserver";
import { RpcApi } from "@/app/store/wshclientapi";
import { initWshrpc, WindowRpcClient } from "@/app/store/wshrpcutil";
import { loadMonaco } from "@/app/view/codeeditor/codeeditor";
import { getLayoutModelForActiveTab } from "@/layout/index";
import {
@ -47,7 +46,7 @@ loadFonts();
(window as any).WOS = WOS;
(window as any).globalStore = globalStore;
(window as any).globalAtoms = atoms;
(window as any).WshServer = WshServer;
(window as any).RpcApi = RpcApi;
(window as any).isFullScreen = false;
(window as any).countersPrint = countersPrint;
(window as any).countersClear = countersClear;
@ -61,8 +60,8 @@ document.addEventListener("DOMContentLoaded", async () => {
// Init WPS event handlers
initWshrpc(windowId);
(window as any).WindowRpcClient = WindowRpcClient;
await loadConnStatus();
initWps();
initGlobalWaveEventSubs();
subscribeToConnEvents();

View File

@ -409,17 +409,17 @@ func GenerateServiceClass(serviceName string, serviceObj any, tsTypesMap map[ref
return sb.String()
}
func GenerateWshServerMethod(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
func GenerateWshClientApiMethod(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
if methodDecl.CommandType == wshrpc.RpcType_ResponseStream {
return GenerateWshServerMethod_ResponseStream(methodDecl, tsTypesMap)
return generateWshClientApiMethod_ResponseStream(methodDecl, tsTypesMap)
} else if methodDecl.CommandType == wshrpc.RpcType_Call {
return GenerateWshServerMethod_Call(methodDecl, tsTypesMap)
return generateWshClientApiMethod_Call(methodDecl, tsTypesMap)
} else {
panic(fmt.Sprintf("cannot generate wshserver commandtype %q", methodDecl.CommandType))
}
}
func GenerateWshServerMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
func generateWshClientApiMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf(" // command %q [%s]\n", methodDecl.Command, methodDecl.CommandType))
respType := "any"
@ -433,16 +433,16 @@ func GenerateWshServerMethod_ResponseStream(methodDecl *wshrpc.WshRpcMethodDecl,
genRespType := fmt.Sprintf("AsyncGenerator<%s, void, boolean>", respType)
if methodDecl.CommandDataType != nil {
cmdDataTsName, _ := TypeToTSType(methodDecl.CommandDataType, tsTypesMap)
sb.WriteString(fmt.Sprintf(" %s(data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, genRespType))
sb.WriteString(fmt.Sprintf(" %s(client: WshClient, data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, genRespType))
} else {
sb.WriteString(fmt.Sprintf(" %s(opts?: RpcOpts): %s {\n", methodDecl.MethodName, genRespType))
sb.WriteString(fmt.Sprintf(" %s(client: WshClient, opts?: RpcOpts): %s {\n", methodDecl.MethodName, genRespType))
}
sb.WriteString(fmt.Sprintf(" return wshServerRpcHelper_responsestream(%q, %s, opts);\n", methodDecl.Command, dataName))
sb.WriteString(fmt.Sprintf(" return client.wshRpcStream(%q, %s, opts);\n", methodDecl.Command, dataName))
sb.WriteString(" }\n")
return sb.String()
}
func GenerateWshServerMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
func generateWshClientApiMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMap map[reflect.Type]string) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf(" // command %q [%s]\n", methodDecl.Command, methodDecl.CommandType))
rtnType := "Promise<void>"
@ -456,11 +456,11 @@ func GenerateWshServerMethod_Call(methodDecl *wshrpc.WshRpcMethodDecl, tsTypesMa
}
if methodDecl.CommandDataType != nil {
cmdDataTsName, _ := TypeToTSType(methodDecl.CommandDataType, tsTypesMap)
sb.WriteString(fmt.Sprintf(" %s(data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, rtnType))
sb.WriteString(fmt.Sprintf(" %s(client: WshClient, data: %s, opts?: RpcOpts): %s {\n", methodDecl.MethodName, cmdDataTsName, rtnType))
} else {
sb.WriteString(fmt.Sprintf(" %s(opts?: RpcOpts): %s {\n", methodDecl.MethodName, rtnType))
sb.WriteString(fmt.Sprintf(" %s(client: WshClient, opts?: RpcOpts): %s {\n", methodDecl.MethodName, rtnType))
}
methodBody := fmt.Sprintf(" return wshServerRpcHelper_call(%q, %s, opts);\n", methodDecl.Command, dataName)
methodBody := fmt.Sprintf(" return client.wshRpcCall(%q, %s, opts);\n", methodDecl.Command, dataName)
sb.WriteString(methodBody)
sb.WriteString(" }\n")
return sb.String()

View File

@ -13,12 +13,6 @@ import (
"github.com/wavetermdev/waveterm/pkg/wps"
)
// command "announce", wshserver.AnnounceCommand
func AnnounceCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "announce", data, opts)
return err
}
// command "authenticate", wshserver.AuthenticateCommand
func AuthenticateCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) (wshrpc.CommandAuthenticateRtnData, error) {
resp, err := sendRpcRequestCallHelper[wshrpc.CommandAuthenticateRtnData](w, "authenticate", data, opts)
@ -203,6 +197,18 @@ func ResolveIdsCommand(w *wshutil.WshRpc, data wshrpc.CommandResolveIdsData, opt
return resp, err
}
// command "routeannounce", wshserver.RouteAnnounceCommand
func RouteAnnounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "routeannounce", nil, opts)
return err
}
// command "routeunannounce", wshserver.RouteUnannounceCommand
func RouteUnannounceCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "routeunannounce", nil, opts)
return err
}
// command "setconfig", wshserver.SetConfigCommand
func SetConfigCommand(w *wshutil.WshRpc, data wconfig.MetaSettingsType, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "setconfig", data, opts)

View File

@ -26,8 +26,9 @@ const (
)
const (
Command_Authenticate = "authenticate" // special
Command_Announce = "announce" // special (for routing)
Command_Authenticate = "authenticate" // special
Command_RouteAnnounce = "routeannounce" // special (for routing)
Command_RouteUnannounce = "routeunannounce" // special (for routing)
Command_Message = "message"
Command_GetMeta = "getmeta"
Command_SetMeta = "setmeta"
@ -73,7 +74,8 @@ type RespOrErrorUnion[T any] struct {
type WshRpcInterface interface {
AuthenticateCommand(ctx context.Context, data string) (CommandAuthenticateRtnData, error)
AnnounceCommand(ctx context.Context, data string) error // (special) announces a new route to the main router
RouteAnnounceCommand(ctx context.Context) error // (special) announces a new route to the main router
RouteUnannounceCommand(ctx context.Context) error // (special) unannounces a route to the main router
MessageCommand(ctx context.Context, data CommandMessageData) error
GetMetaCommand(ctx context.Context, data CommandGetMetaData) (waveobj.MetaMapType, error)

View File

@ -158,6 +158,12 @@ func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute
router.AnnouncedRoutes[msg.Source] = input.fromRouteId
}
func (router *WshRouter) handleUnannounceMessage(msg RpcMessage) {
router.Lock.Lock()
defer router.Lock.Unlock()
delete(router.AnnouncedRoutes, msg.Source)
}
func (router *WshRouter) getAnnouncedRoute(routeId string) string {
router.Lock.Lock()
defer router.Lock.Unlock()
@ -197,10 +203,14 @@ func (router *WshRouter) runServer() {
continue
}
routeId := msg.Route
if msg.Command == wshrpc.Command_Announce {
if msg.Command == wshrpc.Command_RouteAnnounce {
router.handleAnnounceMessage(msg, input)
continue
}
if msg.Command == wshrpc.Command_RouteUnannounce {
router.handleUnannounceMessage(msg)
continue
}
if msg.Command != "" {
// new comand, setup new rpc
ok := router.sendRoutedMessage(msgBytes, routeId)
@ -267,7 +277,7 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
go func() {
// announce
if router.GetUpstreamClient() != nil {
announceMsg := RpcMessage{Command: wshrpc.Command_Announce, Source: routeId}
announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId}
announceBytes, _ := json.Marshal(announceMsg)
router.GetUpstreamClient().SendRpcMessage(announceBytes)
}
@ -303,6 +313,12 @@ func (router *WshRouter) UnregisterRoute(routeId string) {
router.Lock.Lock()
defer router.Lock.Unlock()
delete(router.RouteMap, routeId)
// clear out announced routes
for routeId, localRouteId := range router.AnnouncedRoutes {
if localRouteId == routeId {
delete(router.AnnouncedRoutes, routeId)
}
}
go func() {
wps.Broker.UnsubscribeAll(routeId)
}()