updates for websocket ping/pong logic, longer timeouts

This commit is contained in:
sawka 2022-06-15 18:13:16 -07:00
parent 0c1cacc20d
commit 6f7a883cfd
2 changed files with 65 additions and 24 deletions

View File

@ -26,6 +26,9 @@ const HttpWriteTimeout = 21 * time.Second
const HttpMaxHeaderBytes = 60000
const HttpTimeoutDuration = 21 * time.Second
const WebSocketServerAddr = "localhost:8081"
const MainServerAddr = "localhost:8080"
var GlobalRunnerProc *RunnerProc
type WsConnType struct {
@ -39,7 +42,7 @@ type RunnerProc struct {
Input *packet.PacketSender
Output chan packet.PacketType
WsConnMap map[string]*WsConnType
IsLocal bool
Local bool
DoneCh chan bool
}
@ -58,8 +61,7 @@ func (rp *RunnerProc) RemoveWsConn(ws *WsConnType) {
func HandleWs(w http.ResponseWriter, r *http.Request) {
shell, err := wsshell.StartWS(w, r)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("cannot ugprade websocket: %v", err)))
fmt.Printf("WebSocket Upgrade Failed %T: %v\n", w, err)
return
}
wsConn := &WsConnType{Id: uuid.New().String(), Shell: shell}
@ -68,6 +70,7 @@ func HandleWs(w http.ResponseWriter, r *http.Request) {
GlobalRunnerProc.RemoveWsConn(wsConn)
wsConn.Shell.Conn.Close()
}()
fmt.Printf("WebSocket opened %s\n", shell.RemoteAddr)
for msg := range shell.ReadChan {
jmsg := map[string]interface{}{}
err = json.Unmarshal(msg, &jmsg)
@ -181,11 +184,13 @@ func HandleRunCommand(w http.ResponseWriter, r *http.Request) {
WriteJsonSuccess(w, &runCommandResponse{Line: rtnLine})
go func() {
GlobalRunnerProc.Input.SendPacket(runPacket)
getPacket := packet.MakeGetCmdPacket()
getPacket.SessionId = runPacket.SessionId
getPacket.CmdId = runPacket.CmdId
getPacket.Tail = true
GlobalRunnerProc.Input.SendPacket(getPacket)
if !GlobalRunnerProc.Local {
getPacket := packet.MakeGetCmdPacket()
getPacket.SessionId = runPacket.SessionId
getPacket.CmdId = runPacket.CmdId
getPacket.Tail = true
GlobalRunnerProc.Input.SendPacket(getPacket)
}
}()
return
}
@ -283,7 +288,7 @@ func LaunchRunnerProc() (*RunnerProc, error) {
}
ecmd.Stderr = ecmd.Stdout // /dev/null
ecmd.Start()
rtn := &RunnerProc{Lock: &sync.Mutex{}, IsLocal: true, Cmd: ecmd, WsConnMap: make(map[string]*WsConnType)}
rtn := &RunnerProc{Lock: &sync.Mutex{}, Local: true, Cmd: ecmd, WsConnMap: make(map[string]*WsConnType)}
rtn.Output = packet.PacketParser(outputReader)
rtn.Input = packet.MakePacketSender(inputWriter)
rtn.DoneCh = make(chan bool)
@ -343,6 +348,24 @@ func (runner *RunnerProc) ProcessPackets() {
}
}
func runWebSocketServer() {
gr := mux.NewRouter()
gr.HandleFunc("/ws", HandleWs)
server := &http.Server{
Addr: WebSocketServerAddr,
ReadTimeout: HttpReadTimeout,
WriteTimeout: HttpWriteTimeout,
MaxHeaderBytes: HttpMaxHeaderBytes,
Handler: gr,
}
server.SetKeepAlivesEnabled(false)
fmt.Printf("Running websocket server on %s\n", WebSocketServerAddr)
err := server.ListenAndServe()
if err != nil {
fmt.Printf("[error] trying to run websocket server: %v\n", err)
}
}
func main() {
runnerProc, err := LaunchRunnerProc()
if err != nil {
@ -352,19 +375,19 @@ func main() {
GlobalRunnerProc = runnerProc
go runnerProc.ProcessPackets()
fmt.Printf("Started local runner pid[%d]\n", runnerProc.Cmd.Process.Pid)
go runWebSocketServer()
gr := mux.NewRouter()
gr.HandleFunc("/api/ptyout", GetPtyOut)
gr.HandleFunc("/ws", HandleWs)
gr.HandleFunc("/api/run-command", HandleRunCommand).Methods("GET", "POST", "OPTIONS")
server := &http.Server{
Addr: "localhost:8080",
Addr: MainServerAddr,
ReadTimeout: HttpReadTimeout,
WriteTimeout: HttpWriteTimeout,
MaxHeaderBytes: HttpMaxHeaderBytes,
Handler: http.TimeoutHandler(gr, HttpTimeoutDuration, "Timeout"),
}
server.SetKeepAlivesEnabled(false)
fmt.Printf("Running on http://localhost:8080\n")
fmt.Printf("Running main server on %s\n", MainServerAddr)
err = server.ListenAndServe()
if err != nil {
fmt.Printf("ERROR: %v\n", err)

View File

@ -12,6 +12,10 @@ import (
"github.com/gorilla/websocket"
)
const readWaitTimeout = 15 * time.Second
const writeWaitTimeout = 10 * time.Second
const pingPeriodTickTime = 10 * time.Second
var upgrader = websocket.Upgrader{
ReadBufferSize: 4 * 1024,
WriteBufferSize: 4 * 1024,
@ -45,31 +49,38 @@ func (ws *WSShell) NonBlockingWrite(data []byte) bool {
}
}
func (ws *WSShell) WritePing() error {
now := time.Now()
pingMessage := map[string]interface{}{"type": "ping", "stime": now.Unix()}
jsonVal, _ := json.Marshal(pingMessage)
_ = ws.Conn.SetWriteDeadline(time.Now().Add(writeWaitTimeout)) // no error
err := ws.Conn.WriteMessage(websocket.TextMessage, jsonVal)
ws.NumPings++
ws.LastPing = now
if err != nil {
return err
}
return nil
}
func (ws *WSShell) WritePump() {
writeWait := 2 * time.Second
pingPeriod := 2 * time.Second
ticker := time.NewTicker(pingPeriod)
ticker := time.NewTicker(pingPeriodTickTime)
defer func() {
ticker.Stop()
ws.Conn.Close()
}()
ws.WritePing()
for {
select {
case <-ticker.C:
now := time.Now()
pingMessage := map[string]interface{}{"type": "ping", "stime": now.Unix()}
jsonVal, _ := json.Marshal(pingMessage)
_ = ws.Conn.SetWriteDeadline(time.Now().Add(writeWait)) // no error
err := ws.Conn.WriteMessage(websocket.TextMessage, jsonVal)
ws.NumPings++
ws.LastPing = now
err := ws.WritePing()
if err != nil {
log.Printf("WritePump %s err: %v\n", ws.RemoteAddr, err)
return
}
case msgBytes := <-ws.WriteChan:
_ = ws.Conn.SetWriteDeadline(time.Now().Add(writeWait)) // no error
_ = ws.Conn.SetWriteDeadline(time.Now().Add(writeWaitTimeout)) // no error
err := ws.Conn.WriteMessage(websocket.TextMessage, msgBytes)
if err != nil {
log.Printf("WritePump %s err: %v\n", ws.RemoteAddr, err)
@ -80,7 +91,7 @@ func (ws *WSShell) WritePump() {
}
func (ws *WSShell) ReadPump() {
readWait := 5 * time.Second
readWait := readWaitTimeout
defer func() {
ws.Conn.Close()
}()
@ -104,6 +115,13 @@ func (ws *WSShell) ReadPump() {
// nothing
continue
}
if str, ok := jmsg["type"].(string); ok && str == "ping" {
now := time.Now()
pongMessage := map[string]interface{}{"type": "pong", "stime": now.Unix()}
jsonVal, _ := json.Marshal(pongMessage)
ws.WriteChan <- jsonVal
continue
}
ws.ReadChan <- message
}