diff --git a/waveshell/main-waveshell.go b/waveshell/main-waveshell.go index d861cf77c..43493b3c4 100644 --- a/waveshell/main-waveshell.go +++ b/waveshell/main-waveshell.go @@ -156,7 +156,7 @@ func readFullRunPacket(packetParser *packet.PacketParser) (*packet.RunPacketType } func handleSingle(fromServer bool) { - packetParser := packet.MakePacketParser(os.Stdin, false) + packetParser := packet.MakePacketParser(os.Stdin, nil) sender := packet.MakePacketSender(os.Stdout, nil) defer func() { sender.Close() diff --git a/waveshell/pkg/packet/parser.go b/waveshell/pkg/packet/parser.go index 8b1a207a5..5be054b9e 100644 --- a/waveshell/pkg/packet/parser.go +++ b/waveshell/pkg/packet/parser.go @@ -177,13 +177,22 @@ func (p *PacketParser) SetErr(err error) { } } -func MakePacketParser(input io.Reader, rpcHandler bool) *PacketParser { +type PacketParserOpts struct { + RpcHandler bool + IgnoreUntilValid bool +} + +func MakePacketParser(input io.Reader, opts *PacketParserOpts) *PacketParser { + if opts == nil { + opts = &PacketParserOpts{} + } parser := &PacketParser{ Lock: &sync.Mutex{}, MainCh: make(chan PacketType), RpcMap: make(map[string]*RpcEntry), - RpcHandler: rpcHandler, + RpcHandler: opts.RpcHandler, } + ignoreUntilValid := opts.IgnoreUntilValid bufReader := bufio.NewReader(input) go func() { defer func() { @@ -204,11 +213,15 @@ func MakePacketParser(input io.Reader, rpcHandler bool) *PacketParser { // ##[len][json]\n // ##14{"hello":true}\n // ##N{...} + hasPrefix := strings.HasPrefix(line, "##") bracePos := strings.Index(line, "{") - if !strings.HasPrefix(line, "##") || bracePos == -1 { - parser.MainCh <- MakeRawPacket(line[:len(line)-1]) + if !hasPrefix || bracePos == -1 { + if !ignoreUntilValid { + parser.MainCh <- MakeRawPacket(line[:len(line)-1]) + } continue } + ignoreUntilValid = false packetLen := -1 if line[2:bracePos] != "N" { packetLen, err = strconv.Atoi(line[2:bracePos]) diff --git a/waveshell/pkg/server/server.go b/waveshell/pkg/server/server.go index 33475c295..164597b89 100644 --- a/waveshell/pkg/server/server.go +++ b/waveshell/pkg/server/server.go @@ -717,7 +717,7 @@ func RunServer() (int, error) { if debug { packet.GlobalDebug = true } - server.MainInput = packet.MakePacketParser(os.Stdin, false) + server.MainInput = packet.MakePacketParser(os.Stdin, nil) server.Sender = packet.MakePacketSender(os.Stdout, server.packetSenderErrorHandler) defer server.Close() var err error diff --git a/waveshell/pkg/shexec/client.go b/waveshell/pkg/shexec/client.go index ab9e0c02e..22770f924 100644 --- a/waveshell/pkg/shexec/client.go +++ b/waveshell/pkg/shexec/client.go @@ -50,8 +50,8 @@ func MakeClientProc(ctx context.Context, ecmd *exec.Cmd) (*ClientProc, *packet.I return nil, nil, fmt.Errorf("running local client: %w", err) } sender := packet.MakePacketSender(inputWriter, nil) - stdoutPacketParser := packet.MakePacketParser(stdoutReader, false) - stderrPacketParser := packet.MakePacketParser(stderrReader, false) + stdoutPacketParser := packet.MakePacketParser(stdoutReader, &packet.PacketParserOpts{IgnoreUntilValid: true}) + stderrPacketParser := packet.MakePacketParser(stderrReader, nil) packetParser := packet.CombinePacketParsers(stdoutPacketParser, stderrPacketParser, true) cproc := &ClientProc{ Cmd: ecmd, diff --git a/waveshell/pkg/shexec/shexec.go b/waveshell/pkg/shexec/shexec.go index a44c0db55..ce8103326 100644 --- a/waveshell/pkg/shexec/shexec.go +++ b/waveshell/pkg/shexec/shexec.go @@ -754,7 +754,7 @@ func RunInstallFromCmd(ctx context.Context, ecmd *exec.Cmd, tryDetect bool, mshe if mshellStream != nil { sendMShellBinary(inputWriter, mshellStream) } - packetParser := packet.MakePacketParser(stdoutReader, false) + packetParser := packet.MakePacketParser(stdoutReader, nil) err = ecmd.Start() if err != nil { return fmt.Errorf("running ssh command: %w", err) @@ -887,8 +887,8 @@ func RunClientSSHCommandAndWait(runPacket *packet.RunPacketType, fdContext FdCon return nil, fmt.Errorf("running ssh command: %w", err) } defer cmd.Close() - stdoutPacketParser := packet.MakePacketParser(stdoutReader, false) - stderrPacketParser := packet.MakePacketParser(stderrReader, false) + stdoutPacketParser := packet.MakePacketParser(stdoutReader, nil) + stderrPacketParser := packet.MakePacketParser(stderrReader, nil) packetParser := packet.CombinePacketParsers(stdoutPacketParser, stderrPacketParser, false) sender := packet.MakePacketSender(inputWriter, nil) versionOk := false diff --git a/wavesrv/pkg/remote/remote.go b/wavesrv/pkg/remote/remote.go index 193ced352..513e5eadf 100644 --- a/wavesrv/pkg/remote/remote.go +++ b/wavesrv/pkg/remote/remote.go @@ -43,6 +43,11 @@ const RemoteTermCols = 80 const PtyReadBufSize = 100 const RemoteConnectTimeout = 15 * time.Second +// we add this ping packet to the MShellServer Commands in order to deal with spurious SSH output +// basically we guarantee the parser will see a valid packet (either an init error or a ping) +// so we can pass ignoreUntilValid to PacketParser +const PrintPingPacket = `printf "\n##N{\"type\": \"ping\"}\n"` + const MShellServerCommandFmt = ` PATH=$PATH:~/.mshell; which mshell-[%VERSION%] > /dev/null; @@ -50,6 +55,7 @@ if [[ "$?" -ne 0 ]] then printf "\n##N{\"type\": \"init\", \"notfound\": true, \"uname\": \"%s | %s\"}\n" "$(uname -s)" "$(uname -m)" else + [%PINGPACKET%] mshell-[%VERSION%] --server fi ` @@ -60,14 +66,16 @@ func MakeLocalMShellCommandStr(isSudo bool) (string, error) { return "", err } if isSudo { - return fmt.Sprintf("sudo %s --server", mshellPath), nil + return fmt.Sprintf(`%s; sudo %s --server`, PrintPingPacket, mshellPath), nil } else { - return fmt.Sprintf("%s --server", mshellPath), nil + return fmt.Sprintf(`%s; %s --server`, PrintPingPacket, mshellPath), nil } } func MakeServerCommandStr() string { - return strings.ReplaceAll(MShellServerCommandFmt, "[%VERSION%]", semver.MajorMinor(scbase.MShellVersion)) + rtn := strings.ReplaceAll(MShellServerCommandFmt, "[%VERSION%]", semver.MajorMinor(scbase.MShellVersion)) + rtn = strings.ReplaceAll(rtn, "[%PINGPACKET%]", PrintPingPacket) + return rtn } const (