From c600027d7274ec5d55b7a0bb68fabd81fda7b1e3 Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 15 Sep 2022 00:17:23 -0700 Subject: [PATCH] remote pty work --- pkg/cmdrunner/cmdrunner.go | 5 +++-- pkg/remote/remote.go | 26 +++++++++++++++++++++++--- pkg/sstore/updatebus.go | 13 ++++++++++--- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/pkg/cmdrunner/cmdrunner.go b/pkg/cmdrunner/cmdrunner.go index 9c58bbaa6..187bfa0d5 100644 --- a/pkg/cmdrunner/cmdrunner.go +++ b/pkg/cmdrunner/cmdrunner.go @@ -564,8 +564,9 @@ func RemoteShowCommand(ctx context.Context, pk *scpacket.FeCommandPacketType) (s } return sstore.ModelUpdate{ Info: &sstore.InfoMsgType{ - InfoTitle: fmt.Sprintf("show remote [%s] info", ids.Remote.DisplayName), - InfoLines: splitLinesForInfo(buf.String()), + InfoTitle: fmt.Sprintf("show remote [%s] info", ids.Remote.DisplayName), + InfoLines: splitLinesForInfo(buf.String()), + PtyRemoteId: state.RemoteId, }, }, nil } diff --git a/pkg/remote/remote.go b/pkg/remote/remote.go index bff2b4df2..dca06d90a 100644 --- a/pkg/remote/remote.go +++ b/pkg/remote/remote.go @@ -30,6 +30,9 @@ const RemoteTypeMShell = "mshell" const DefaultTerm = "xterm-256color" const DefaultMaxPtySize = 1024 * 1024 const CircBufSize = 64 * 1024 +const RemoteTermRows = 10 +const RemoteTermCols = 80 +const PtyReadBufSize = 100 const MShellServerCommand = ` PATH=$PATH:~/.mshell; @@ -457,6 +460,7 @@ func (msh *MShellProc) addControllingTty(ecmd *exec.Cmd) (*os.File, error) { if err != nil { return nil, err } + pty.Setsize(cmdPty, &pty.Winsize{Rows: RemoteTermRows, Cols: RemoteTermCols}) msh.ControllingPty = cmdPty ecmd.ExtraFiles = append(ecmd.ExtraFiles, cmdTty) if ecmd.SysProcAttr == nil { @@ -524,7 +528,21 @@ func (msh *MShellProc) writeToPtyBuffer_nolock(strFmt string, args ...interface{ } else { realStr = realStr[1:] } - msh.PtyBuffer.Write([]byte(realStr)) + curOffset := msh.PtyBuffer.TotalWritten() + data := []byte(realStr) + msh.PtyBuffer.Write(data) + sendRemotePtyUpdate(msh.Remote.RemoteId, curOffset, data) +} + +func sendRemotePtyUpdate(remoteId string, dataOffset int64, data []byte) { + data64 := base64.StdEncoding.EncodeToString(data) + update := &sstore.PtyDataUpdate{ + RemoteId: remoteId, + PtyPos: dataOffset, + PtyData64: data64, + PtyDataLen: int64(len(data)), + } + sstore.MainBus.SendUpdate("", update) } func (msh *MShellProc) Launch() { @@ -534,7 +552,7 @@ func (msh *MShellProc) Launch() { msh.WriteToPtyBuffer("cannot launch archived remote\n") return } - msh.WriteToPtyBuffer("connecting to %s\n", remoteCopy.GetName()) + msh.WriteToPtyBuffer("connecting to %s...\n", remoteCopy.GetName()) sshOpts := convertSSHOpts(remoteCopy.SSHOpts) sshOpts.SSHErrorsToTty = true ecmd := sshOpts.MakeSSHExecCmd(MShellServerCommand) @@ -551,7 +569,7 @@ func (msh *MShellProc) Launch() { } }() go func() { - buf := make([]byte, 100) + buf := make([]byte, PtyReadBufSize) for { n, readErr := cmdPty.Read(buf) if readErr == io.EOF { @@ -562,7 +580,9 @@ func (msh *MShellProc) Launch() { break } msh.WithLock(func() { + curOffset := msh.PtyBuffer.TotalWritten() msh.PtyBuffer.Write(buf[0:n]) + sendRemotePtyUpdate(remoteCopy.RemoteId, curOffset, buf[0:n]) }) } }() diff --git a/pkg/sstore/updatebus.go b/pkg/sstore/updatebus.go index 524b5af3f..7942739aa 100644 --- a/pkg/sstore/updatebus.go +++ b/pkg/sstore/updatebus.go @@ -9,6 +9,7 @@ var MainBus *UpdateBus = MakeUpdateBus() const PtyDataUpdateStr = "pty" const ModelUpdateStr = "model" +const UpdateChSize = 100 type UpdatePacket interface { UpdateType() string @@ -86,6 +87,7 @@ type InfoMsgType struct { InfoCompsMore bool `json:"infocompssmore,omitempty"` InfoLines []string `json:"infolines,omitempty"` TimeoutMs int64 `json:"timeoutms,omitempty"` + PtyRemoteId string `json:"ptyremoteid,omitempty"` } type HistoryInfoType struct { @@ -134,12 +136,12 @@ func (bus *UpdateBus) RegisterChannel(clientId string, sessionId string) chan in if found { close(uch.Ch) uch.SessionId = sessionId - uch.Ch = make(chan interface{}) + uch.Ch = make(chan interface{}, UpdateChSize) } else { uch = UpdateChannel{ ClientId: clientId, SessionId: sessionId, - Ch: make(chan interface{}), + Ch: make(chan interface{}, UpdateChSize), } } bus.Channels[clientId] = uch @@ -161,7 +163,12 @@ func (bus *UpdateBus) SendUpdate(sessionId string, update interface{}) { defer bus.Lock.Unlock() for _, uch := range bus.Channels { if uch.Match(sessionId) { - uch.Ch <- update + select { + case uch.Ch <- update: + + default: + fmt.Printf("[error] dropped update on updatebus uch clientid=%s\n", uch.ClientId) + } } } }