From dbdcc49ff2dafdc22712cf2d967b38c51ac97878 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 16 Sep 2024 19:35:36 -0700 Subject: [PATCH] add ws reconnect handlers -- wps + route announcements --- frontend/app/store/wps.ts | 8 ++++++- frontend/app/store/ws.ts | 40 ++++++++++++++++++++------------ frontend/app/store/wshrouter.ts | 11 +++++++++ frontend/app/store/wshrpcutil.ts | 10 ++++++-- frontend/wave.ts | 3 ++- 5 files changed, 53 insertions(+), 19 deletions(-) diff --git a/frontend/app/store/wps.ts b/frontend/app/store/wps.ts index e094237a9..28dc2cf55 100644 --- a/frontend/app/store/wps.ts +++ b/frontend/app/store/wps.ts @@ -24,6 +24,12 @@ type WaveEventUnsubscribe = { const fileSubjects = new Map>(); const waveEventSubjects = new Map(); +function wpsReconnectHandler() { + for (const eventType of waveEventSubjects.keys()) { + updateWaveEventSub(eventType); + } +} + function makeWaveReSubCommand(eventType: string): RpcMessage { let subjects = waveEventSubjects.get(eventType); if (subjects == null) { @@ -134,4 +140,4 @@ function handleWaveEvent(event: WaveEvent) { } } -export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe }; +export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe, wpsReconnectHandler }; diff --git a/frontend/app/store/ws.ts b/frontend/app/store/ws.ts index c372108b0..5ab134ead 100644 --- a/frontend/app/store/ws.ts +++ b/frontend/app/store/ws.ts @@ -1,9 +1,24 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 +import debug from "debug"; import { sprintf } from "sprintf-js"; +const dlog = debug("wave:ws"); + const MaxWebSocketSendSize = 1024 * 1024; // 1MB +const reconnectHandlers: (() => void)[] = []; + +function addWSReconnectHandler(handler: () => void) { + reconnectHandlers.push(handler); +} + +function removeWSReconnectHandler(handler: () => void) { + const index = this.reconnectHandlers.indexOf(handler); + if (index > -1) { + reconnectHandlers.splice(index, 1); + } +} type WSEventCallback = (arg0: WSEventType) => void; @@ -29,20 +44,12 @@ class WSControl { setInterval(this.sendPing.bind(this), 5000); } - log(str: string) { - const ts = Date.now(); - this.wsLog.push("[" + ts + "] " + str); - if (this.wsLog.length > 50) { - this.wsLog.splice(0, this.wsLog.length - 50); - } - } - connectNow(desc: string) { if (this.open) { return; } this.lastReconnectTime = Date.now(); - this.log(sprintf("try reconnect (%s)", desc)); + dlog("try reconnect:", desc); this.opening = true; this.wsConn = new WebSocket(this.baseHostPort + "/ws?windowid=" + this.windowId); this.wsConn.onopen = this.onopen.bind(this); @@ -61,7 +68,7 @@ class WSControl { } this.reconnectTimes++; if (this.reconnectTimes > 20) { - this.log("cannot connect, giving up"); + dlog("cannot connect, giving up"); return; } const timeoutArr = [0, 0, 2, 5, 10, 10, 30, 60]; @@ -73,7 +80,7 @@ class WSControl { timeout = 1; } if (timeout > 0) { - this.log(sprintf("sleeping %ds", timeout)); + dlog(sprintf("sleeping %ds", timeout)); } setTimeout(() => { this.connectNow(String(this.reconnectTimes)); @@ -83,9 +90,9 @@ class WSControl { onclose(event: any) { // console.log("close", event); if (event.wasClean) { - this.log("connection closed"); + dlog("connection closed"); } else { - this.log("connection error/disconnected"); + dlog("connection error/disconnected"); } if (this.open || this.opening) { this.open = false; @@ -95,9 +102,12 @@ class WSControl { } onopen() { - this.log("connection open"); + dlog("connection open"); this.open = true; this.opening = false; + for (let handler of reconnectHandlers) { + handler(); + } this.runMsgQueue(); // reconnectTimes is reset in onmessage:hello } @@ -174,4 +184,4 @@ class WSControl { } } -export { WSControl }; +export { addWSReconnectHandler, removeWSReconnectHandler, WSControl }; diff --git a/frontend/app/store/wshrouter.ts b/frontend/app/store/wshrouter.ts index e9d1df06a..87a77cf57 100644 --- a/frontend/app/store/wshrouter.ts +++ b/frontend/app/store/wshrouter.ts @@ -34,6 +34,17 @@ class WshRouter { this.upstreamClient = upstreamClient; } + reannounceRoutes() { + for (const [routeId, client] of this.routeMap) { + const announceMsg: RpcMessage = { + command: "routeannounce", + data: routeId, + source: routeId, + }; + this.upstreamClient.recvRpcMessage(announceMsg); + } + } + // returns true if the message was sent _sendRoutedMessage(msg: RpcMessage, destRouteId: string): boolean { const client = this.routeMap.get(destRouteId); diff --git a/frontend/app/store/wshrpcutil.ts b/frontend/app/store/wshrpcutil.ts index 95047efc3..44f0577a6 100644 --- a/frontend/app/store/wshrpcutil.ts +++ b/frontend/app/store/wshrpcutil.ts @@ -1,10 +1,11 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 +import { wpsReconnectHandler } from "@/app/store/wps"; import { WshClient } from "@/app/store/wshclient"; import { makeWindowRouteId, WshRouter } from "@/app/store/wshrouter"; import { getWSServerEndpoint } from "@/util/endpoints"; -import { WSControl } from "./ws"; +import { addWSReconnectHandler, WSControl } from "./ws"; let globalWS: WSControl; let DefaultRouter: WshRouter; @@ -113,7 +114,7 @@ if (globalThis.window != null) { globalThis["consumeGenerator"] = consumeGenerator; } -function initWshrpc(windowId: string) { +function initWshrpc(windowId: string): WSControl { DefaultRouter = new WshRouter(new UpstreamWshRpcProxy()); const handleFn = (event: WSEventType) => { DefaultRouter.recvRpcMessage(event.data); @@ -123,6 +124,11 @@ function initWshrpc(windowId: string) { globalWS.connectNow("connectWshrpc"); WindowRpcClient = new WshClient(makeWindowRouteId(windowId)); DefaultRouter.registerRoute(WindowRpcClient.routeId, WindowRpcClient); + addWSReconnectHandler(() => { + DefaultRouter.reannounceRoutes(); + }); + addWSReconnectHandler(wpsReconnectHandler); + return globalWS; } function sendWSCommand(cmd: WSCommandType) { diff --git a/frontend/wave.ts b/frontend/wave.ts index 9be135584..c314196f7 100644 --- a/frontend/wave.ts +++ b/frontend/wave.ts @@ -59,7 +59,8 @@ document.addEventListener("DOMContentLoaded", async () => { console.log("DOMContentLoaded"); // Init WPS event handlers - initWshrpc(windowId); + const globalWS = initWshrpc(windowId); + (window as any).globalWS = globalWS; (window as any).WindowRpcClient = WindowRpcClient; await loadConnStatus(); initGlobalWaveEventSubs();