mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-02 18:39:05 +01:00
fix websocket reconnect error (#1064)
This commit is contained in:
parent
455d45f1b9
commit
62a1149a8d
@ -30,6 +30,9 @@ const wsInitialPingTime = 1 * time.Second
|
|||||||
|
|
||||||
const DefaultCommandTimeout = 2 * time.Second
|
const DefaultCommandTimeout = 2 * time.Second
|
||||||
|
|
||||||
|
var GlobalLock = &sync.Mutex{}
|
||||||
|
var RouteToConnMap = map[string]string{} // routeid => connid
|
||||||
|
|
||||||
func RunWebSocketServer(listener net.Listener) {
|
func RunWebSocketServer(listener net.Listener) {
|
||||||
gr := mux.NewRouter()
|
gr := mux.NewRouter()
|
||||||
gr.HandleFunc("/ws", HandleWs)
|
gr.HandleFunc("/ws", HandleWs)
|
||||||
@ -240,6 +243,31 @@ func WriteLoop(conn *websocket.Conn, outputCh chan any, closeCh chan any, routeI
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func registerConn(wsConnId string, routeId string, wproxy *wshutil.WshRpcProxy) {
|
||||||
|
GlobalLock.Lock()
|
||||||
|
defer GlobalLock.Unlock()
|
||||||
|
curConnId := RouteToConnMap[routeId]
|
||||||
|
if curConnId != "" {
|
||||||
|
log.Printf("[websocket] warning: replacing existing connection for route %q\n", routeId)
|
||||||
|
wshutil.DefaultRouter.UnregisterRoute(routeId)
|
||||||
|
}
|
||||||
|
RouteToConnMap[routeId] = wsConnId
|
||||||
|
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unregisterConn(wsConnId string, routeId string) {
|
||||||
|
GlobalLock.Lock()
|
||||||
|
defer GlobalLock.Unlock()
|
||||||
|
curConnId := RouteToConnMap[routeId]
|
||||||
|
if curConnId != wsConnId {
|
||||||
|
// only unregister if we are the current connection (otherwise we were already removed)
|
||||||
|
log.Printf("[websocket] warning: trying to unregister connection %q for route %q but it is not the current connection (ignoring)\n", wsConnId, routeId)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(RouteToConnMap, routeId)
|
||||||
|
wshutil.DefaultRouter.UnregisterRoute(routeId)
|
||||||
|
}
|
||||||
|
|
||||||
func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
|
func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
|
||||||
windowId := r.URL.Query().Get("windowid")
|
windowId := r.URL.Query().Get("windowid")
|
||||||
if windowId == "" {
|
if windowId == "" {
|
||||||
@ -261,23 +289,19 @@ func HandleWsInternal(w http.ResponseWriter, r *http.Request) error {
|
|||||||
wsConnId := uuid.New().String()
|
wsConnId := uuid.New().String()
|
||||||
outputCh := make(chan any, 100)
|
outputCh := make(chan any, 100)
|
||||||
closeCh := make(chan any)
|
closeCh := make(chan any)
|
||||||
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
|
|
||||||
var routeId string
|
var routeId string
|
||||||
if windowId == wshutil.ElectronRoute {
|
if windowId == wshutil.ElectronRoute {
|
||||||
routeId = wshutil.ElectronRoute
|
routeId = wshutil.ElectronRoute
|
||||||
} else {
|
} else {
|
||||||
routeId = wshutil.MakeWindowRouteId(windowId)
|
routeId = wshutil.MakeWindowRouteId(windowId)
|
||||||
}
|
}
|
||||||
defer eventbus.UnregisterWSChannel(wsConnId)
|
|
||||||
log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId)
|
log.Printf("[websocket] new connection: windowid:%s connid:%s routeid:%s\n", windowId, wsConnId, routeId)
|
||||||
// we create a wshproxy to handle rpc messages to/from the window
|
eventbus.RegisterWSChannel(wsConnId, windowId, outputCh)
|
||||||
wproxy := wshutil.MakeRpcProxy()
|
defer eventbus.UnregisterWSChannel(wsConnId)
|
||||||
wshutil.DefaultRouter.RegisterRoute(routeId, wproxy)
|
wproxy := wshutil.MakeRpcProxy() // we create a wshproxy to handle rpc messages to/from the window
|
||||||
defer func() {
|
defer close(wproxy.ToRemoteCh)
|
||||||
wshutil.DefaultRouter.UnregisterRoute(routeId)
|
registerConn(wsConnId, routeId, wproxy)
|
||||||
close(wproxy.ToRemoteCh)
|
defer unregisterConn(wsConnId, routeId)
|
||||||
}()
|
|
||||||
// WshServerFactoryFn(rpcInputCh, rpcOutputCh, wshrpc.RpcContext{})
|
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(2)
|
wg.Add(2)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -274,10 +274,14 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
|
|||||||
log.Printf("[router] registering wsh route %q\n", routeId)
|
log.Printf("[router] registering wsh route %q\n", routeId)
|
||||||
router.Lock.Lock()
|
router.Lock.Lock()
|
||||||
defer router.Lock.Unlock()
|
defer router.Lock.Unlock()
|
||||||
|
alreadyExists := router.RouteMap[routeId] != nil
|
||||||
|
if alreadyExists {
|
||||||
|
log.Printf("[router] warning: route %q already exists (replacing)\n", routeId)
|
||||||
|
}
|
||||||
router.RouteMap[routeId] = rpc
|
router.RouteMap[routeId] = rpc
|
||||||
go func() {
|
go func() {
|
||||||
// announce
|
// announce
|
||||||
if router.GetUpstreamClient() != nil {
|
if !alreadyExists && router.GetUpstreamClient() != nil {
|
||||||
announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId}
|
announceMsg := RpcMessage{Command: wshrpc.Command_RouteAnnounce, Source: routeId}
|
||||||
announceBytes, _ := json.Marshal(announceMsg)
|
announceBytes, _ := json.Marshal(announceMsg)
|
||||||
router.GetUpstreamClient().SendRpcMessage(announceBytes)
|
router.GetUpstreamClient().SendRpcMessage(announceBytes)
|
||||||
|
Loading…
Reference in New Issue
Block a user