ws reconnection bug + clean up logging (#1058)

This commit is contained in:
Mike Sawka 2024-10-17 23:42:55 -07:00 committed by GitHub
parent 123b627640
commit dc70ab4014
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 42 additions and 31 deletions

View File

@ -17,7 +17,10 @@ export async function initDocsite() {
console.log("Embedded docsite is running, using embedded version for help view"); console.log("Embedded docsite is running, using embedded version for help view");
docsiteUrl = docsiteEmbeddedUrl; docsiteUrl = docsiteEmbeddedUrl;
} else { } else {
console.log("Embedded docsite is not running, using web version for help view", response); console.log(
"Embedded docsite is not running, using web version for help view",
"status: " + response?.status
);
docsiteUrl = docsiteWebUrl; docsiteUrl = docsiteWebUrl;
} }
} catch (error) { } catch (error) {

View File

@ -1075,7 +1075,7 @@ async function relaunchBrowserWindows(): Promise<void> {
} }
for (const win of wins) { for (const win of wins) {
await win.readyPromise; await win.readyPromise;
console.log("show", win.waveWindowId); console.log("show window", win.waveWindowId);
win.show(); win.show();
} }
} }

View File

@ -12,6 +12,7 @@ const dlog = debug("wave:ws");
const WarnWebSocketSendSize = 1024 * 1024; // 1MB const WarnWebSocketSendSize = 1024 * 1024; // 1MB
const MaxWebSocketSendSize = 5 * 1024 * 1024; // 5MB const MaxWebSocketSendSize = 5 * 1024 * 1024; // 5MB
const reconnectHandlers: (() => void)[] = []; const reconnectHandlers: (() => void)[] = [];
const StableConnTime = 2000;
function addWSReconnectHandler(handler: () => void) { function addWSReconnectHandler(handler: () => void) {
reconnectHandlers.push(handler); reconnectHandlers.push(handler);
@ -45,6 +46,7 @@ class WSControl {
lastReconnectTime: number = 0; lastReconnectTime: number = 0;
eoOpts: ElectronOverrideOpts; eoOpts: ElectronOverrideOpts;
noReconnect: boolean = false; noReconnect: boolean = false;
onOpenTimeoutId: NodeJS.Timeout = null;
constructor( constructor(
baseHostPort: string, baseHostPort: string,
@ -80,9 +82,15 @@ class WSControl {
} }
: null : null
); );
this.wsConn.onopen = this.onopen.bind(this); this.wsConn.onopen = (e: Event) => {
this.wsConn.onmessage = this.onmessage.bind(this); this.onopen(e);
this.wsConn.onclose = this.onclose.bind(this); };
this.wsConn.onmessage = (e: MessageEvent) => {
this.onmessage(e);
};
this.wsConn.onclose = (e: CloseEvent) => {
this.onclose(e);
};
// turns out onerror is not necessary (onclose always follows onerror) // turns out onerror is not necessary (onclose always follows onerror)
// this.wsConn.onerror = this.onerror; // this.wsConn.onerror = this.onerror;
} }
@ -118,8 +126,11 @@ class WSControl {
}, timeout * 1000); }, timeout * 1000);
} }
onclose(event: any) { onclose(event: CloseEvent) {
// console.log("close", event); // console.log("close", event);
if (this.onOpenTimeoutId) {
clearTimeout(this.onOpenTimeoutId);
}
if (event.wasClean) { if (event.wasClean) {
dlog("connection closed"); dlog("connection closed");
} else { } else {
@ -132,15 +143,18 @@ class WSControl {
} }
} }
onopen() { onopen(e: Event) {
dlog("connection open"); dlog("connection open");
this.open = true; this.open = true;
this.opening = false; this.opening = false;
this.onOpenTimeoutId = setTimeout(() => {
this.reconnectTimes = 0;
dlog("clear reconnect times");
}, StableConnTime);
for (let handler of reconnectHandlers) { for (let handler of reconnectHandlers) {
handler(); handler();
} }
this.runMsgQueue(); this.runMsgQueue();
// reconnectTimes is reset in onmessage:hello
} }
runMsgQueue() { runMsgQueue() {
@ -157,7 +171,7 @@ class WSControl {
}, 100); }, 100);
} }
onmessage(event: any) { onmessage(event: MessageEvent) {
let eventData = null; let eventData = null;
if (event.data != null) { if (event.data != null) {
eventData = JSON.parse(event.data); eventData = JSON.parse(event.data);
@ -173,10 +187,6 @@ class WSControl {
// nothing // nothing
return; return;
} }
if (eventData.type == "hello") {
this.reconnectTimes = 0;
return;
}
if (this.messageCallback) { if (this.messageCallback) {
try { try {
this.messageCallback(eventData); this.messageCallback(eventData);

View File

@ -60,7 +60,6 @@ export class WebViewModel implements ViewModel {
this.homepageUrl = atom((get) => { this.homepageUrl = atom((get) => {
const defaultUrl = get(defaultUrlAtom); const defaultUrl = get(defaultUrlAtom);
const pinnedUrl = get(this.blockAtom).meta.pinnedurl; const pinnedUrl = get(this.blockAtom).meta.pinnedurl;
console.log("homepageUrl", pinnedUrl, defaultUrl);
return pinnedUrl ?? defaultUrl; return pinnedUrl ?? defaultUrl;
}); });
this.urlWrapperClassName = atom(""); this.urlWrapperClassName = atom("");

View File

@ -369,7 +369,6 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
bc.ShellProc.Cmd.Write(ic.InputData) bc.ShellProc.Cmd.Write(ic.InputData)
} }
if ic.TermSize != nil { if ic.TermSize != nil {
log.Printf("SETTERMSIZE: %dx%d\n", ic.TermSize.Rows, ic.TermSize.Cols)
err = setTermSize(ctx, bc.BlockId, *ic.TermSize) err = setTermSize(ctx, bc.BlockId, *ic.TermSize)
if err != nil { if err != nil {
log.Printf("error setting pty size: %v\n", err) log.Printf("error setting pty size: %v\n", err)

View File

@ -40,10 +40,10 @@ func RunWebSocketServer(listener net.Listener) {
Handler: gr, Handler: gr,
} }
server.SetKeepAlivesEnabled(false) server.SetKeepAlivesEnabled(false)
log.Printf("Running websocket server on %s\n", listener.Addr()) log.Printf("[websocket] running websocket server on %s\n", listener.Addr())
err := server.Serve(listener) err := server.Serve(listener)
if err != nil { if err != nil {
log.Printf("[error] trying to run websocket server: %v\n", err) log.Printf("[websocket] error trying to run websocket server: %v\n", err)
} }
} }
@ -81,7 +81,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
r := recover() r := recover()
if r != nil { if r != nil {
rtnErr = fmt.Errorf("panic: %v", r) rtnErr = fmt.Errorf("panic: %v", r)
log.Printf("panic in processMessage: %v\n", r) log.Printf("[websocket] panic in processMessage: %v\n", r)
debug.PrintStack() debug.PrintStack()
} }
if rtnErr == nil { if rtnErr == nil {
@ -108,7 +108,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
msgBytes, err := json.Marshal(rpcMsg) msgBytes, err := json.Marshal(rpcMsg)
if err != nil { if err != nil {
// this really should never fail since we just unmarshalled this value // this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err) log.Printf("[websocket] error marshalling rpc message: %v\n", err)
return return
} }
rpcInputCh <- msgBytes rpcInputCh <- msgBytes
@ -125,7 +125,7 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan []
msgBytes, err := json.Marshal(rpcMsg) msgBytes, err := json.Marshal(rpcMsg)
if err != nil { if err != nil {
// this really should never fail since we just unmarshalled this value // this really should never fail since we just unmarshalled this value
log.Printf("error marshalling rpc message: %v\n", err) log.Printf("[websocket] error marshalling rpc message: %v\n", err)
return return
} }
rpcInputCh <- msgBytes rpcInputCh <- msgBytes
@ -152,7 +152,7 @@ func processMessage(jmsg map[string]any, outputCh chan any, rpcInputCh chan []by
processWSCommand(jmsg, outputCh, rpcInputCh) processWSCommand(jmsg, outputCh, rpcInputCh)
} }
func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte) { func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInputCh chan []byte, routeId string) {
readWait := wsReadWaitTimeout readWait := wsReadWaitTimeout
conn.SetReadLimit(64 * 1024) conn.SetReadLimit(64 * 1024)
conn.SetReadDeadline(time.Now().Add(readWait)) conn.SetReadDeadline(time.Now().Add(readWait))
@ -160,13 +160,13 @@ func ReadLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, rpcInpu
for { for {
_, message, err := conn.ReadMessage() _, message, err := conn.ReadMessage()
if err != nil { if err != nil {
log.Printf("ReadPump error: %v\n", err) log.Printf("[websocket] ReadPump error (%s): %v\n", routeId, err)
break break
} }
jmsg := map[string]any{} jmsg := map[string]any{}
err = json.Unmarshal(message, &jmsg) err = json.Unmarshal(message, &jmsg)
if err != nil { if err != nil {
log.Printf("Error unmarshalling json: %v\n", err) log.Printf("[websocket] error unmarshalling json: %v\n", err)
break break
} }
conn.SetReadDeadline(time.Now().Add(readWait)) conn.SetReadDeadline(time.Now().Add(readWait))
@ -197,7 +197,7 @@ func WritePing(conn *websocket.Conn) error {
return nil return nil
} }
func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) { func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, routeId string) {
ticker := time.NewTicker(wsInitialPingTime) ticker := time.NewTicker(wsInitialPingTime)
defer ticker.Stop() defer ticker.Stop()
initialPing := true initialPing := true
@ -211,7 +211,7 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
} else { } else {
barr, err = json.Marshal(msg) barr, err = json.Marshal(msg)
if err != nil { if err != nil {
log.Printf("cannot marshal websocket message: %v\n", err) log.Printf("[websocket] cannot marshal websocket message: %v\n", err)
// just loop again // just loop again
break break
} }
@ -219,14 +219,14 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any) {
err = conn.WriteMessage(websocket.TextMessage, barr) err = conn.WriteMessage(websocket.TextMessage, barr)
if err != nil { if err != nil {
conn.Close() conn.Close()
log.Printf("WritePump error: %v\n", err) log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err)
return return
} }
case <-ticker.C: case <-ticker.C:
err := WritePing(conn) err := WritePing(conn)
if err != nil { if err != nil {
log.Printf("WritePump error: %v\n", err) log.Printf("[websocket] WritePump error (%s): %v\n", routeId, err)
return return
} }
if initialPing { if initialPing {
@ -250,6 +250,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
if err != nil { if err != nil {
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err))) w.Write([]byte(fmt.Sprintf("error validating authkey: %v", err)))
log.Printf("[websocket] error validating authkey: %v\n", err)
return err return err
} }
conn, err := WebSocketUpgrader.Upgrade(w, r, nil) conn, err := WebSocketUpgrader.Upgrade(w, r, nil)
@ -258,7 +259,6 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
} }
defer conn.Close() defer conn.Close()
wsConnId := uuid.New().String() wsConnId := uuid.New().String()
log.Printf("New websocket connection: windowid:%s connid:%s\n", windowId, wsConnId)
outputCh := make(chan any, 100) outputCh := make(chan any, 100)
closeCh := make(chan any) closeCh := make(chan any)
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh) eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
@ -269,6 +269,7 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
routeId = wshutil.MakeWindowRouteId(windowId) routeId = wshutil.MakeWindowRouteId(windowId)
} }
defer eventbus.UnregisterWSChannel(wsConnId) defer eventbus.UnregisterWSChannel(wsConnId)
log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId)
// we create a wshproxy to handle rpc messages to/from the window // we create a wshproxy to handle rpc messages to/from the window
wproxy := wshutil.MakeRpcProxy() wproxy := wshutil.MakeRpcProxy()
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy) wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
@ -293,12 +294,12 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
go func() { go func() {
// read loop // read loop
defer wg.Done() defer wg.Done()
ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh) ReadLoop(conn, outputCh, closeCh, wproxy.FromRemoteCh, routeId)
}() }()
go func() { go func() {
// write loop // write loop
defer wg.Done() defer wg.Done()
WriteLoop(conn, outputCh, closeCh) WriteLoop(conn, outputCh, closeCh, routeId)
}() }()
wg.Wait() wg.Wait()
close(wproxy.FromRemoteCh) close(wproxy.FromRemoteCh)

View File

@ -422,7 +422,6 @@ func (ws *WshServer) EventPublishCommand(ctx context.Context, data wps.WaveEvent
} }
func (ws *WshServer) EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error { func (ws *WshServer) EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error {
log.Printf("EventSubCommand: %v\n", data)
rpcSource := wshutil.GetRpcSourceFromContext(ctx) rpcSource := wshutil.GetRpcSourceFromContext(ctx)
if rpcSource == "" { if rpcSource == "" {
return fmt.Errorf("no rpc source set") return fmt.Errorf("no rpc source set")