From f8f3ce65fbcd595ac67288cdb604dbdfc1b54849 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 14 Jun 2022 22:39:09 -0700 Subject: [PATCH] get cmddata sent via websocket connection --- cmd/main-server.go | 134 ++++++++++++++++++++--------------------- pkg/wsshell/wsshell.go | 10 +++ 2 files changed, 75 insertions(+), 69 deletions(-) diff --git a/cmd/main-server.go b/cmd/main-server.go index 4da0259ea..a145a3465 100644 --- a/cmd/main-server.go +++ b/cmd/main-server.go @@ -8,9 +8,9 @@ import ( "os" "os/exec" "strings" + "sync" "time" - "github.com/fsnotify/fsnotify" "github.com/google/uuid" "github.com/gorilla/mux" @@ -27,42 +27,29 @@ const HttpTimeoutDuration = 21 * time.Second var GlobalRunnerProc *RunnerProc -type PtyTailWs struct { - Shell *wsshell.WSShell - SessionId string - CmdId string - Position string - Watcher *fsnotify.Watcher +type WsConnType struct { + Id string + Shell *wsshell.WSShell } type RunnerProc struct { - Cmd *exec.Cmd - Input *packet.PacketSender - Output chan packet.PacketType + Lock *sync.Mutex + Cmd *exec.Cmd + Input *packet.PacketSender + Output chan packet.PacketType + WsConnMap map[string]*WsConnType } -func TailFile(tailWs *PtyTailWs) error { -outer: - for { - select { - case event, ok := <-tailWs.Watcher.Events: - if !ok { - break outer - } - if event.Op&fsnotify.Write == fsnotify.Write { - tailWs.Shell.WriteChan <- []byte("*") - } +func (rp *RunnerProc) AddWsConn(ws *WsConnType) { + rp.Lock.Lock() + defer rp.Lock.Unlock() + rp.WsConnMap[ws.Id] = ws +} - case _, ok := <-tailWs.Watcher.Errors: - if !ok { - break outer - } - - case <-tailWs.Shell.CloseChan: - break outer - } - } - return nil +func (rp *RunnerProc) RemoveWsConn(ws *WsConnType) { + rp.Lock.Lock() + defer rp.Lock.Unlock() + delete(rp.WsConnMap, ws.Id) } func HandleWs(w http.ResponseWriter, r *http.Request) { @@ -72,19 +59,11 @@ func HandleWs(w http.ResponseWriter, r *http.Request) { w.Write([]byte(fmt.Sprintf("cannot ugprade websocket: %v", err))) return } - defer shell.Conn.Close() - tailWs := &PtyTailWs{ - Shell: shell, - } - tailWs.Watcher, err = fsnotify.NewWatcher() - if err != nil { - fmt.Printf("Error creating watcher: %v\n", err) - return - } - defer tailWs.Watcher.Close() - go func() { - defer shell.Conn.Close() - TailFile(tailWs) + wsConn := &WsConnType{Id: uuid.New().String(), Shell: shell} + GlobalRunnerProc.AddWsConn(wsConn) + defer func() { + GlobalRunnerProc.RemoveWsConn(wsConn) + wsConn.Shell.Conn.Close() }() for msg := range shell.ReadChan { jmsg := map[string]interface{}{} @@ -93,28 +72,7 @@ func HandleWs(w http.ResponseWriter, r *http.Request) { fmt.Printf("error unmarshalling ws message: %v\n", err) break } - sessionId, ok := jmsg["sessionid"].(string) - if !ok || sessionId == "" { - fmt.Printf("bad ws message, no sessionid\n") - break - } - cmdId, ok := jmsg["cmdid"].(string) - if !ok || cmdId == "" { - fmt.Printf("bad ws message, no cmdId\n") - break - } - if tailWs.SessionId != "" { - fmt.Printf("bad ws message, sessionid already set\n") - break - } - tailWs.SessionId = sessionId - tailWs.CmdId = cmdId - pathStr := GetPtyOutFile(sessionId, cmdId) - err = tailWs.Watcher.Add(pathStr) - if err != nil { - fmt.Printf("error adding watcher: %v\n", err) - break - } + fmt.Printf("got ws message: %v\n", jmsg) } } @@ -220,6 +178,11 @@ func HandleRunCommand(w http.ResponseWriter, r *http.Request) { WriteJsonSuccess(w, &runCommandResponse{Line: rtnLine}) go func() { GlobalRunnerProc.Input.SendPacket(runPacket) + getPacket := packet.MakeGetCmdPacket() + getPacket.SessionId = runPacket.SessionId + getPacket.CmdId = runPacket.CmdId + getPacket.Tail = true + GlobalRunnerProc.Input.SendPacket(getPacket) }() return } @@ -317,15 +280,48 @@ func LaunchRunnerProc() (*RunnerProc, error) { } ecmd.Stderr = nil // /dev/null ecmd.Start() - rtn := &RunnerProc{Cmd: ecmd} + rtn := &RunnerProc{Lock: &sync.Mutex{}, Cmd: ecmd, WsConnMap: make(map[string]*WsConnType)} rtn.Output = packet.PacketParser(outputReader) rtn.Input = packet.MakePacketSender(inputWriter) return rtn, nil } -func ProcessPackets(runner *RunnerProc) { +func (runner *RunnerProc) ForwardDataPacket(pk *packet.CmdDataPacketType) int { + barr, err := json.Marshal(pk) + if err != nil { + fmt.Printf("cannot marshal cmddata packet %s/%s: %v)\n", pk.SessionId, pk.CmdId, err) + return 0 + } + runner.Lock.Lock() + defer runner.Lock.Unlock() + numSent := 0 + for _, ws := range runner.WsConnMap { + ok := ws.Shell.NonBlockingWrite(barr) + if !ok { + fmt.Printf("write was dropped, no queue space in '%s'\n", ws.Id) + continue + } + numSent++ + } + return numSent +} + +func (runner *RunnerProc) ProcessPackets() { for pk := range runner.Output { + if pk.GetType() == packet.CmdDataPacketStr { + dataPacket := pk.(*packet.CmdDataPacketType) + runner.ForwardDataPacket(dataPacket) + fmt.Printf("cmd-data %s/%s pty=%d run=%d\n", dataPacket.SessionId, dataPacket.CmdId, len(dataPacket.PtyData), len(dataPacket.RunData)) + + continue + } + if pk.GetType() == packet.RunnerInitPacketStr { + initPacket := pk.(*packet.RunnerInitPacketType) + fmt.Printf("runner-init %s\n", initPacket.ScHomeDir) + continue + } fmt.Printf("runner-packet: %v\n", pk) + } } @@ -336,7 +332,7 @@ func main() { return } GlobalRunnerProc = runnerProc - go ProcessPackets(runnerProc) + go runnerProc.ProcessPackets() fmt.Printf("Started local runner pid[%d]\n", runnerProc.Cmd.Process.Pid) gr := mux.NewRouter() gr.HandleFunc("/api/ptyout", GetPtyOut) diff --git a/pkg/wsshell/wsshell.go b/pkg/wsshell/wsshell.go index c969b6878..cd620e3b6 100644 --- a/pkg/wsshell/wsshell.go +++ b/pkg/wsshell/wsshell.go @@ -35,6 +35,16 @@ type WSShell struct { ReadChan chan []byte } +func (ws *WSShell) NonBlockingWrite(data []byte) bool { + select { + case ws.WriteChan <- data: + return true + + default: + return false + } +} + func (ws *WSShell) WritePump() { writeWait := 2 * time.Second pingPeriod := 2 * time.Second