add ws reconnect handlers -- wps + route announcements

This commit is contained in:
sawka 2024-09-16 19:35:36 -07:00
parent 9bf30ce121
commit dbdcc49ff2
5 changed files with 53 additions and 19 deletions

View File

@ -24,6 +24,12 @@ type WaveEventUnsubscribe = {
const fileSubjects = new Map<string, SubjectWithRef<WSFileEventData>>(); const fileSubjects = new Map<string, SubjectWithRef<WSFileEventData>>();
const waveEventSubjects = new Map<string, WaveEventSubjectContainer[]>(); const waveEventSubjects = new Map<string, WaveEventSubjectContainer[]>();
function wpsReconnectHandler() {
for (const eventType of waveEventSubjects.keys()) {
updateWaveEventSub(eventType);
}
}
function makeWaveReSubCommand(eventType: string): RpcMessage { function makeWaveReSubCommand(eventType: string): RpcMessage {
let subjects = waveEventSubjects.get(eventType); let subjects = waveEventSubjects.get(eventType);
if (subjects == null) { if (subjects == null) {
@ -134,4 +140,4 @@ function handleWaveEvent(event: WaveEvent) {
} }
} }
export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe }; export { getFileSubject, handleWaveEvent, waveEventSubscribe, waveEventUnsubscribe, wpsReconnectHandler };

View File

@ -1,9 +1,24 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
import debug from "debug";
import { sprintf } from "sprintf-js"; import { sprintf } from "sprintf-js";
const dlog = debug("wave:ws");
const MaxWebSocketSendSize = 1024 * 1024; // 1MB const MaxWebSocketSendSize = 1024 * 1024; // 1MB
const reconnectHandlers: (() => void)[] = [];
function addWSReconnectHandler(handler: () => void) {
reconnectHandlers.push(handler);
}
function removeWSReconnectHandler(handler: () => void) {
const index = this.reconnectHandlers.indexOf(handler);
if (index > -1) {
reconnectHandlers.splice(index, 1);
}
}
type WSEventCallback = (arg0: WSEventType) => void; type WSEventCallback = (arg0: WSEventType) => void;
@ -29,20 +44,12 @@ class WSControl {
setInterval(this.sendPing.bind(this), 5000); setInterval(this.sendPing.bind(this), 5000);
} }
log(str: string) {
const ts = Date.now();
this.wsLog.push("[" + ts + "] " + str);
if (this.wsLog.length > 50) {
this.wsLog.splice(0, this.wsLog.length - 50);
}
}
connectNow(desc: string) { connectNow(desc: string) {
if (this.open) { if (this.open) {
return; return;
} }
this.lastReconnectTime = Date.now(); this.lastReconnectTime = Date.now();
this.log(sprintf("try reconnect (%s)", desc)); dlog("try reconnect:", desc);
this.opening = true; this.opening = true;
this.wsConn = new WebSocket(this.baseHostPort + "/ws?windowid=" + this.windowId); this.wsConn = new WebSocket(this.baseHostPort + "/ws?windowid=" + this.windowId);
this.wsConn.onopen = this.onopen.bind(this); this.wsConn.onopen = this.onopen.bind(this);
@ -61,7 +68,7 @@ class WSControl {
} }
this.reconnectTimes++; this.reconnectTimes++;
if (this.reconnectTimes > 20) { if (this.reconnectTimes > 20) {
this.log("cannot connect, giving up"); dlog("cannot connect, giving up");
return; return;
} }
const timeoutArr = [0, 0, 2, 5, 10, 10, 30, 60]; const timeoutArr = [0, 0, 2, 5, 10, 10, 30, 60];
@ -73,7 +80,7 @@ class WSControl {
timeout = 1; timeout = 1;
} }
if (timeout > 0) { if (timeout > 0) {
this.log(sprintf("sleeping %ds", timeout)); dlog(sprintf("sleeping %ds", timeout));
} }
setTimeout(() => { setTimeout(() => {
this.connectNow(String(this.reconnectTimes)); this.connectNow(String(this.reconnectTimes));
@ -83,9 +90,9 @@ class WSControl {
onclose(event: any) { onclose(event: any) {
// console.log("close", event); // console.log("close", event);
if (event.wasClean) { if (event.wasClean) {
this.log("connection closed"); dlog("connection closed");
} else { } else {
this.log("connection error/disconnected"); dlog("connection error/disconnected");
} }
if (this.open || this.opening) { if (this.open || this.opening) {
this.open = false; this.open = false;
@ -95,9 +102,12 @@ class WSControl {
} }
onopen() { onopen() {
this.log("connection open"); dlog("connection open");
this.open = true; this.open = true;
this.opening = false; this.opening = false;
for (let handler of reconnectHandlers) {
handler();
}
this.runMsgQueue(); this.runMsgQueue();
// reconnectTimes is reset in onmessage:hello // reconnectTimes is reset in onmessage:hello
} }
@ -174,4 +184,4 @@ class WSControl {
} }
} }
export { WSControl }; export { addWSReconnectHandler, removeWSReconnectHandler, WSControl };

View File

@ -34,6 +34,17 @@ class WshRouter {
this.upstreamClient = upstreamClient; this.upstreamClient = upstreamClient;
} }
reannounceRoutes() {
for (const [routeId, client] of this.routeMap) {
const announceMsg: RpcMessage = {
command: "routeannounce",
data: routeId,
source: routeId,
};
this.upstreamClient.recvRpcMessage(announceMsg);
}
}
// returns true if the message was sent // returns true if the message was sent
_sendRoutedMessage(msg: RpcMessage, destRouteId: string): boolean { _sendRoutedMessage(msg: RpcMessage, destRouteId: string): boolean {
const client = this.routeMap.get(destRouteId); const client = this.routeMap.get(destRouteId);

View File

@ -1,10 +1,11 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
import { wpsReconnectHandler } from "@/app/store/wps";
import { WshClient } from "@/app/store/wshclient"; import { WshClient } from "@/app/store/wshclient";
import { makeWindowRouteId, WshRouter } from "@/app/store/wshrouter"; import { makeWindowRouteId, WshRouter } from "@/app/store/wshrouter";
import { getWSServerEndpoint } from "@/util/endpoints"; import { getWSServerEndpoint } from "@/util/endpoints";
import { WSControl } from "./ws"; import { addWSReconnectHandler, WSControl } from "./ws";
let globalWS: WSControl; let globalWS: WSControl;
let DefaultRouter: WshRouter; let DefaultRouter: WshRouter;
@ -113,7 +114,7 @@ if (globalThis.window != null) {
globalThis["consumeGenerator"] = consumeGenerator; globalThis["consumeGenerator"] = consumeGenerator;
} }
function initWshrpc(windowId: string) { function initWshrpc(windowId: string): WSControl {
DefaultRouter = new WshRouter(new UpstreamWshRpcProxy()); DefaultRouter = new WshRouter(new UpstreamWshRpcProxy());
const handleFn = (event: WSEventType) => { const handleFn = (event: WSEventType) => {
DefaultRouter.recvRpcMessage(event.data); DefaultRouter.recvRpcMessage(event.data);
@ -123,6 +124,11 @@ function initWshrpc(windowId: string) {
globalWS.connectNow("connectWshrpc"); globalWS.connectNow("connectWshrpc");
WindowRpcClient = new WshClient(makeWindowRouteId(windowId)); WindowRpcClient = new WshClient(makeWindowRouteId(windowId));
DefaultRouter.registerRoute(WindowRpcClient.routeId, WindowRpcClient); DefaultRouter.registerRoute(WindowRpcClient.routeId, WindowRpcClient);
addWSReconnectHandler(() => {
DefaultRouter.reannounceRoutes();
});
addWSReconnectHandler(wpsReconnectHandler);
return globalWS;
} }
function sendWSCommand(cmd: WSCommandType) { function sendWSCommand(cmd: WSCommandType) {

View File

@ -59,7 +59,8 @@ document.addEventListener("DOMContentLoaded", async () => {
console.log("DOMContentLoaded"); console.log("DOMContentLoaded");
// Init WPS event handlers // Init WPS event handlers
initWshrpc(windowId); const globalWS = initWshrpc(windowId);
(window as any).globalWS = globalWS;
(window as any).WindowRpcClient = WindowRpcClient; (window as any).WindowRpcClient = WindowRpcClient;
await loadConnStatus(); await loadConnStatus();
initGlobalWaveEventSubs(); initGlobalWaveEventSubs();