From dc70ab4014ef56700af1f75c2534d4e7dd2538d2 Mon Sep 17 00:00:00 2001 From: Mike Sawka Date: Thu, 17 Oct 2024 23:42:55 -0700 Subject: [PATCH] ws reconnection bug + clean up logging (#1058) --- emain/docsite.ts | 5 +++- emain/emain.ts | 2 +- frontend/app/store/ws.ts | 32 +++++++++++++++++--------- frontend/app/view/webview/webview.tsx | 1 - pkg/blockcontroller/blockcontroller.go | 1 - pkg/web/ws.go | 31 +++++++++++++------------ pkg/wshrpc/wshserver/wshserver.go | 1 - 7 files changed, 42 insertions(+), 31 deletions(-) diff --git a/emain/docsite.ts b/emain/docsite.ts index ccdcc602b..ddf9d21b8 100644 --- a/emain/docsite.ts +++ b/emain/docsite.ts @@ -17,7 +17,10 @@ export async function initDocsite() { console.log("Embedded docsite is running, using embedded version for help view"); docsiteUrl = docsiteEmbeddedUrl; } else { - console.log("Embedded docsite is not running, using web version for help view", response); + console.log( + "Embedded docsite is not running, using web version for help view", + "status: " + response?.status + ); docsiteUrl = docsiteWebUrl; } } catch (error) { diff --git a/emain/emain.ts b/emain/emain.ts index de447b415..4713f45e6 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -1075,7 +1075,7 @@ async function relaunchBrowserWindows(): Promise { } for (const win of wins) { await win.readyPromise; - console.log("show", win.waveWindowId); + console.log("show window", win.waveWindowId); win.show(); } } diff --git a/frontend/app/store/ws.ts b/frontend/app/store/ws.ts index 1b11ab326..bbf5477b1 100644 --- a/frontend/app/store/ws.ts +++ b/frontend/app/store/ws.ts @@ -12,6 +12,7 @@ const dlog = debug("wave:ws"); const WarnWebSocketSendSize = 1024 * 1024; // 1MB const MaxWebSocketSendSize = 5 * 1024 * 1024; // 5MB const reconnectHandlers: (() => void)[] = []; +const StableConnTime = 2000; function addWSReconnectHandler(handler: () => void) { reconnectHandlers.push(handler); @@ -45,6 +46,7 @@ class WSControl { lastReconnectTime: number = 0; eoOpts: ElectronOverrideOpts; noReconnect: boolean = false; + onOpenTimeoutId: NodeJS.Timeout = null; constructor( baseHostPort: string, @@ -80,9 +82,15 @@ class WSControl { } : null ); - this.wsConn.onopen = this.onopen.bind(this); - this.wsConn.onmessage = this.onmessage.bind(this); - this.wsConn.onclose = this.onclose.bind(this); + this.wsConn.onopen = (e: Event) => { + this.onopen(e); + }; + this.wsConn.onmessage = (e: MessageEvent) => { + this.onmessage(e); + }; + this.wsConn.onclose = (e: CloseEvent) => { + this.onclose(e); + }; // turns out onerror is not necessary (onclose always follows onerror) // this.wsConn.onerror = this.onerror; } @@ -118,8 +126,11 @@ class WSControl { }, timeout * 1000); } - onclose(event: any) { + onclose(event: CloseEvent) { // console.log("close", event); + if (this.onOpenTimeoutId) { + clearTimeout(this.onOpenTimeoutId); + } if (event.wasClean) { dlog("connection closed"); } else { @@ -132,15 +143,18 @@ class WSControl { } } - onopen() { + onopen(e: Event) { dlog("connection open"); this.open = true; this.opening = false; + this.onOpenTimeoutId = setTimeout(() => { + this.reconnectTimes = 0; + dlog("clear reconnect times"); + }, StableConnTime); for (let handler of reconnectHandlers) { handler(); } this.runMsgQueue(); - // reconnectTimes is reset in onmessage:hello } runMsgQueue() { @@ -157,7 +171,7 @@ class WSControl { }, 100); } - onmessage(event: any) { + onmessage(event: MessageEvent) { let eventData = null; if (event.data != null) { eventData = JSON.parse(event.data); @@ -173,10 +187,6 @@ class WSControl { // nothing return; } - if (eventData.type == "hello") { - this.reconnectTimes = 0; - return; - } if (this.messageCallback) { try { this.messageCallback(eventData); diff --git a/frontend/app/view/webview/webview.tsx b/frontend/app/view/webview/webview.tsx index caa345e4d..376e8e9c5 100644 --- a/frontend/app/view/webview/webview.tsx +++ b/frontend/app/view/webview/webview.tsx @@ -60,7 +60,6 @@ export class WebViewModel implements ViewModel { this.homepageUrl = atom((get) => { const defaultUrl = get(defaultUrlAtom); const pinnedUrl = get(this.blockAtom).meta.pinnedurl; - console.log("homepageUrl", pinnedUrl, defaultUrl); return pinnedUrl ?? defaultUrl; }); this.urlWrapperClassName = atom(""); diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index bd55082a3..4e9bc4886 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -369,7 +369,6 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj bc.ShellProc.Cmd.Write(ic.InputData) } if ic.TermSize != nil { - log.Printf("SETTERMSIZE: %dx%d\n", ic.TermSize.Rows, ic.TermSize.Cols) err = setTermSize(ctx, bc.BlockId, *ic.TermSize) if err != nil { log.Printf("error setting pty size: %v\n", err) diff --git a/pkg/web/ws.go b/pkg/web/ws.go index 4c605fa27..bde3ec167 100644 --- a/pkg/web/ws.go +++ b/pkg/web/ws.go @@ -40,10 +40,10 @@ func RunWebSocketServer(listener net.Listener) { Handler: gr, } server.SetKeepAlivesEnabled(false) - log.Printf("Running websocket server on %s\n", listener.Addr()) + log.Printf("[websocket] running websocket server on %s\n", listener.Addr()) err := server.Serve(listener) if err != nil { - log.Printf("[error] trying to run websocket server: %v\n", err) + log.Printf("[websocket] error trying to run websocket server: %v\n", err) } } @@ -81,7 +81,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan [] r := recover() if r != nil { rtnErr = fmt.Errorf("panic: %v", r) - log.Printf("panic in processMessage: %v\n", r) + log.Printf("[websocket] panic in processMessage: %v\n", r) debug.PrintStack() } if rtnErr == nil { @@ -108,7 +108,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan [] msgBytes, err := json.Marshal(rpcMsg) if err != nil { // this really should never fail since we just unmarshalled this value - log.Printf("error marshalling rpc message: %v\n", err) + log.Printf("[websocket] error marshalling rpc message: %v\n", err) return } rpcInputCh <- msgBytes @@ -125,7 +125,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan [] msgBytes, err := json.Marshal(rpcMsg) if err != nil { // this really should never fail since we just unmarshalled this value - log.Printf("error marshalling rpc message: %v\n", err) + log.Printf("[websocket] error marshalling rpc message: %v\n", err) return } rpcInputCh <- msgBytes @@ -152,7 +152,7 @@ func processMessage(jmsg map[string]any, outputCh chan any, rpcInputCh chan []by processWSCommand(jmsg, outputCh, rpcInputCh) } -func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte) { +func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte, routeId string) { readWait := wsReadWaitTimeout conn.SetReadLimit(64 * 1024) conn.SetReadDeadline(time.Now().Add(readWait)) @@ -160,13 +160,13 @@ func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInpu for { _, message, err := conn.ReadMessage() if err != nil { - log.Printf("ReadPump error: %v\n", err) + log.Printf("[websocket] ReadPump error (%s): %v\n", routeId, err) break } jmsg := map[string]any{} err = json.Unmarshal(message, &jmsg) if err != nil { - log.Printf("Error unmarshalling json: %v\n", err) + log.Printf("[websocket] error unmarshalling json: %v\n", err) break } conn.SetReadDeadline(time.Now().Add(readWait)) @@ -197,7 +197,7 @@ func WritePing(conn *websocket.Conn) error { return nil } -func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) { +func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, routeId string) { ticker := time.NewTicker(wsInitialPingTime) defer ticker.Stop() initialPing := true @@ -211,7 +211,7 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) { } else { barr, err = json.Marshal(msg) if err != nil { - log.Printf("cannot marshal websocket message: %v\n", err) + log.Printf("[websocket] cannot marshal websocket message: %v\n", err) // just loop again break } @@ -219,14 +219,14 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) { err = conn.WriteMessage(websocket.TextMessage, barr) if err != nil { conn.Close() - log.Printf("WritePump error: %v\n", err) + log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err) return } case <-ticker.C: err := WritePing(conn) if err != nil { - log.Printf("WritePump error: %v\n", err) + log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err) return } if initialPing { @@ -250,6 +250,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error { if err != nil { w.WriteHeader(http.StatusUnauthorized) w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err))) + log.Printf("[websocket] error validating authkey: %v\n", err) return err } conn, err := WebSocketUpgrader.Upgrade(w, r, nil) @@ -258,7 +259,6 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error { } defer conn.Close() wsConnId := uuid.New().String() - log.Printf("New websocket connection: windowid:%s connid:%s\n", windowId, wsConnId) outputCh := make(chan any, 100) closeCh := make(chan any) eventbus.RegisterWSChannel(wsConnId, windowId, outputCh) @@ -269,6 +269,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error { routeId = wshutil.MakeWindowRouteId(windowId) } defer eventbus.UnregisterWSChannel(wsConnId) + log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId) // we create a wshproxy to handle rpc messages to/from the window wproxy := wshutil.MakeRpcProxy() wshutil.DefaultRouter.RegisterRoute(routeId, wproxy) @@ -293,12 +294,12 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error { go func() { // read loop defer wg.Done() - ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh) + ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh, routeId) }() go func() { // write loop defer wg.Done() - WriteLoop(conn, outputCh, closeCh) + WriteLoop(conn, outputCh, closeCh, routeId) }() wg.Wait() close(wproxy.FromRemoteCh) diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index fe093250f..91facb368 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -422,7 +422,6 @@ func (ws *WshServer) EventPublishCommand(ctx context.Context, data wps.WaveEvent } func (ws *WshServer) EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error { - log.Printf("EventSubCommand: %v\n", data) rpcSource := wshutil.GetRpcSourceFromContext(ctx) if rpcSource == "" { return fmt.Errorf("no rpc source set")