sync command to synchronize shell state with wave prompt (#444)

* remove two unused packet types, remove unused detatched command code

* CmdStart is invalid in this command loop

* slight refactor, remove closure funcs

* pass rct through to 'handle' funcs

* deal with rct (running command), update handler funcs accordingly

* update for runningcmdtype to be a pointer in the map (for updates)

* lots of changes related to ephemeral commands (for sync), checkpoint

* fix ephemeral setting

* sync shell state when you switch to a new tab
This commit is contained in:
Mike Sawka 2024-03-13 18:52:41 -07:00 committed by GitHub
parent 550d9c9716
commit bff51c851a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 194 additions and 839 deletions

View File

@ -304,6 +304,10 @@ class CommandRunner {
GlobalModel.clientSettingsViewModel.showClientSettingsView();
}
syncShellState() {
GlobalModel.submitCommand("sync", null, null, { nohist: "1" }, false);
}
historyView(params: HistorySearchParams) {
let kwargs = { nohist: "1" };
kwargs["offset"] = String(params.offset);

View File

@ -1050,6 +1050,9 @@ class Model {
this.activeMainView.set("session");
this.deactivateScreenLines();
this.ws.watchScreen(newActiveSessionId, newActiveScreenId);
setTimeout(() => {
GlobalCommandRunner.syncShellState();
}, 100);
}
} else {
console.warn("unknown update", genUpdate);

View File

@ -61,15 +61,7 @@ func handleSingle() {
sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("run packets from server must have a CK: %v", err))
}
if runPacket.Detached {
cmd, startPk, err := shexec.RunCommandDetached(runPacket, sender)
if err != nil {
sender.SendErrorResponse(runPacket.ReqId, err)
return
}
sender.SendPacket(startPk)
sender.Close()
sender.WaitForDone()
cmd.DetachedWait(startPk)
sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("detached mode not supported"))
return
} else {
shexec.IgnoreSigPipe()

View File

@ -1,473 +0,0 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package cmdtail
import (
"encoding/base64"
"fmt"
"io"
"os"
"regexp"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/wavetermdev/waveterm/waveshell/pkg/base"
"github.com/wavetermdev/waveterm/waveshell/pkg/packet"
)
const MaxDataBytes = 4096
const FileTypePty = "ptyout"
const FileTypeRun = "runout"
type Tailer struct {
Lock *sync.Mutex
WatchList map[base.CommandKey]CmdWatchEntry
Watcher *fsnotify.Watcher
Sender *packet.PacketSender
Gen FileNameGenerator
Sessions map[string]bool
}
type TailPos struct {
ReqId string
Running bool // an active tailer sending data
TailPtyPos int64
TailRunPos int64
Follow bool
}
type CmdWatchEntry struct {
CmdKey base.CommandKey
FilePtyLen int64
FileRunLen int64
Tails []TailPos
Done bool
}
type FileNameGenerator interface {
PtyOutFile(ck base.CommandKey) string
RunOutFile(ck base.CommandKey) string
SessionDir(sessionId string) string
}
func (w CmdWatchEntry) getTailPos(reqId string) (TailPos, bool) {
for _, pos := range w.Tails {
if pos.ReqId == reqId {
return pos, true
}
}
return TailPos{}, false
}
func (w *CmdWatchEntry) updateTailPos(reqId string, newPos TailPos) {
for idx, pos := range w.Tails {
if pos.ReqId == reqId {
w.Tails[idx] = newPos
return
}
}
w.Tails = append(w.Tails, newPos)
}
func (w *CmdWatchEntry) removeTailPos(reqId string) {
var newTails []TailPos
for _, pos := range w.Tails {
if pos.ReqId == reqId {
continue
}
newTails = append(newTails, pos)
}
w.Tails = newTails
}
func (pos TailPos) IsCurrent(entry CmdWatchEntry) bool {
return pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen
}
func (t *Tailer) updateTailPos_nolock(cmdKey base.CommandKey, reqId string, pos TailPos) {
entry, found := t.WatchList[cmdKey]
if !found {
return
}
entry.updateTailPos(reqId, pos)
t.WatchList[cmdKey] = entry
}
func (t *Tailer) removeTailPos(cmdKey base.CommandKey, reqId string) {
t.Lock.Lock()
defer t.Lock.Unlock()
t.removeTailPos_nolock(cmdKey, reqId)
}
func (t *Tailer) removeTailPos_nolock(cmdKey base.CommandKey, reqId string) {
entry, found := t.WatchList[cmdKey]
if !found {
return
}
entry.removeTailPos(reqId)
t.WatchList[cmdKey] = entry
if len(entry.Tails) == 0 {
t.removeWatch_nolock(cmdKey)
}
}
func (t *Tailer) removeWatch_nolock(cmdKey base.CommandKey) {
// delete from watchlist, remove watches
delete(t.WatchList, cmdKey)
t.Watcher.Remove(t.Gen.PtyOutFile(cmdKey))
t.Watcher.Remove(t.Gen.RunOutFile(cmdKey))
}
func (t *Tailer) getEntryAndPos_nolock(cmdKey base.CommandKey, reqId string) (CmdWatchEntry, TailPos, bool) {
entry, found := t.WatchList[cmdKey]
if !found {
return CmdWatchEntry{}, TailPos{}, false
}
pos, found := entry.getTailPos(reqId)
if !found {
return CmdWatchEntry{}, TailPos{}, false
}
return entry, pos, true
}
func (t *Tailer) addSessionWatcher(sessionId string) error {
t.Lock.Lock()
defer t.Lock.Unlock()
if t.Sessions[sessionId] {
return nil
}
sdir := t.Gen.SessionDir(sessionId)
err := t.Watcher.Add(sdir)
if err != nil {
return err
}
t.Sessions[sessionId] = true
return nil
}
func (t *Tailer) removeSessionWatcher(sessionId string) {
t.Lock.Lock()
defer t.Lock.Unlock()
if !t.Sessions[sessionId] {
return
}
sdir := t.Gen.SessionDir(sessionId)
t.Watcher.Remove(sdir)
}
func MakeTailer(sender *packet.PacketSender, gen FileNameGenerator) (*Tailer, error) {
rtn := &Tailer{
Lock: &sync.Mutex{},
WatchList: make(map[base.CommandKey]CmdWatchEntry),
Sessions: make(map[string]bool),
Sender: sender,
Gen: gen,
}
var err error
rtn.Watcher, err = fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return rtn, nil
}
func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]byte, error) {
fd, err := os.Open(fileName)
defer fd.Close()
if err != nil {
return nil, err
}
buf := make([]byte, maxBytes)
nr, err := fd.ReadAt(buf, pos)
if err != nil && err != io.EOF { // ignore EOF error
return nil, err
}
return buf[0:nr], nil
}
func (t *Tailer) makeCmdDataPacket(entry CmdWatchEntry, pos TailPos) (*packet.CmdDataPacketType, error) {
dataPacket := packet.MakeCmdDataPacket(pos.ReqId)
dataPacket.CK = entry.CmdKey
dataPacket.PtyPos = pos.TailPtyPos
dataPacket.RunPos = pos.TailRunPos
if entry.FilePtyLen > pos.TailPtyPos {
ptyData, err := t.readDataFromFile(t.Gen.PtyOutFile(entry.CmdKey), pos.TailPtyPos, MaxDataBytes)
if err != nil {
return nil, err
}
dataPacket.PtyData64 = base64.StdEncoding.EncodeToString(ptyData)
dataPacket.PtyDataLen = len(ptyData)
}
if entry.FileRunLen > pos.TailRunPos {
runData, err := t.readDataFromFile(t.Gen.RunOutFile(entry.CmdKey), pos.TailRunPos, MaxDataBytes)
if err != nil {
return nil, err
}
dataPacket.RunData64 = base64.StdEncoding.EncodeToString(runData)
dataPacket.RunDataLen = len(runData)
}
return dataPacket, nil
}
// returns (data-packet, keepRunning)
func (t *Tailer) runSingleDataTransfer(key base.CommandKey, reqId string) (*packet.CmdDataPacketType, bool, error) {
t.Lock.Lock()
entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId)
t.Lock.Unlock()
if !foundPos {
return nil, false, nil
}
dataPacket, dataErr := t.makeCmdDataPacket(entry, pos)
t.Lock.Lock()
defer t.Lock.Unlock()
entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId)
if !foundPos {
return nil, false, nil
}
// pos was updated between first and second get, throw out data-packet and re-run
if pos.TailPtyPos != dataPacket.PtyPos || pos.TailRunPos != dataPacket.RunPos {
return nil, true, nil
}
if dataErr != nil {
// error, so return error packet, and stop running
pos.Running = false
t.updateTailPos_nolock(key, reqId, pos)
return nil, false, dataErr
}
pos.TailPtyPos += int64(dataPacket.PtyDataLen)
pos.TailRunPos += int64(dataPacket.RunDataLen)
if pos.IsCurrent(entry) {
// we caught up, tail position equals file length
pos.Running = false
}
t.updateTailPos_nolock(key, reqId, pos)
return dataPacket, pos.Running, nil
}
// returns (removed)
func (t *Tailer) checkRemove(cmdKey base.CommandKey, reqId string) bool {
t.Lock.Lock()
defer t.Lock.Unlock()
entry, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId)
if !foundPos {
return false
}
if !pos.IsCurrent(entry) {
return false
}
if !pos.Follow || entry.Done {
t.removeTailPos_nolock(cmdKey, reqId)
return true
}
return false
}
func (t *Tailer) RunDataTransfer(key base.CommandKey, reqId string) {
for {
dataPacket, keepRunning, err := t.runSingleDataTransfer(key, reqId)
if dataPacket != nil {
t.Sender.SendPacket(dataPacket)
}
if err != nil {
t.removeTailPos(key, reqId)
t.Sender.SendErrorResponse(reqId, err)
break
}
if !keepRunning {
removed := t.checkRemove(key, reqId)
if removed {
t.Sender.SendResponse(reqId, true)
}
break
}
time.Sleep(10 * time.Millisecond)
}
}
func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) {
if pos.Running {
return
}
if pos.IsCurrent(entry) {
return
}
pos.Running = true
t.updateTailPos_nolock(entry.CmdKey, pos.ReqId, pos)
go t.RunDataTransfer(entry.CmdKey, pos.ReqId)
}
var updateFileRe = regexp.MustCompile("/([a-z0-9-]+)/([a-z0-9-]+)\\.(ptyout|runout)$")
func (t *Tailer) updateFile(relFileName string) {
m := updateFileRe.FindStringSubmatch(relFileName)
if m == nil {
return
}
finfo, err := os.Stat(relFileName)
if err != nil {
t.Sender.SendPacket(packet.FmtMessagePacket("error trying to stat file '%s': %v", relFileName, err))
return
}
cmdKey := base.MakeCommandKey(m[1], m[2])
t.Lock.Lock()
defer t.Lock.Unlock()
entry, foundEntry := t.WatchList[cmdKey]
if !foundEntry {
return
}
fileType := m[3]
if fileType == FileTypePty {
entry.FilePtyLen = finfo.Size()
} else if fileType == FileTypeRun {
entry.FileRunLen = finfo.Size()
}
t.WatchList[cmdKey] = entry
for _, pos := range entry.Tails {
t.tryStartRun_nolock(entry, pos)
}
}
func (t *Tailer) Run() {
for {
select {
case event, ok := <-t.Watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
t.updateFile(event.Name)
}
case err, ok := <-t.Watcher.Errors:
if !ok {
return
}
// what to do with this error? just send a message
t.Sender.SendPacket(packet.FmtMessagePacket("error in tailer: %v", err))
}
}
}
func (t *Tailer) Close() error {
return t.Watcher.Close()
}
func max(v1 int64, v2 int64) int64 {
if v1 > v2 {
return v1
}
return v2
}
func (entry *CmdWatchEntry) fillFilePos(gen FileNameGenerator) {
ptyInfo, _ := os.Stat(gen.PtyOutFile(entry.CmdKey))
if ptyInfo != nil {
entry.FilePtyLen = ptyInfo.Size()
}
runoutInfo, _ := os.Stat(gen.RunOutFile(entry.CmdKey))
if runoutInfo != nil {
entry.FileRunLen = runoutInfo.Size()
}
}
func (t *Tailer) KeyDone(key base.CommandKey) {
t.Lock.Lock()
defer t.Lock.Unlock()
entry, foundEntry := t.WatchList[key]
if !foundEntry {
return
}
entry.Done = true
var newTails []TailPos
for _, pos := range entry.Tails {
if pos.IsCurrent(entry) {
continue
}
newTails = append(newTails, pos)
}
entry.Tails = newTails
t.WatchList[key] = entry
if len(entry.Tails) == 0 {
t.removeWatch_nolock(key)
}
t.WatchList[key] = entry
}
func (t *Tailer) RemoveWatch(pk *packet.UntailCmdPacketType) {
t.Lock.Lock()
defer t.Lock.Unlock()
t.removeTailPos_nolock(pk.CK, pk.ReqId)
}
func (t *Tailer) AddFileWatches_nolock(key base.CommandKey, ptyOnly bool) error {
ptyName := t.Gen.PtyOutFile(key)
runName := t.Gen.RunOutFile(key)
fmt.Printf("WATCH> add %s\n", ptyName)
err := t.Watcher.Add(ptyName)
if err != nil {
return err
}
if ptyOnly {
return nil
}
err = t.Watcher.Add(runName)
if err != nil {
t.Watcher.Remove(ptyName) // best effort clean up
return err
}
return nil
}
// returns (up-to-date/done, error)
func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) (bool, error) {
if err := getPacket.CK.Validate("getcmd"); err != nil {
return false, err
}
if getPacket.ReqId == "" {
return false, fmt.Errorf("getcmd, no reqid specified")
}
t.Lock.Lock()
defer t.Lock.Unlock()
key := getPacket.CK
entry, foundEntry := t.WatchList[key]
if !foundEntry {
// initialize entry, add watches
entry = CmdWatchEntry{CmdKey: key}
entry.fillFilePos(t.Gen)
}
pos, foundPos := entry.getTailPos(getPacket.ReqId)
if !foundPos {
// initialize a new tailpos
pos = TailPos{ReqId: getPacket.ReqId}
}
// update tailpos with new values from getpacket
pos.TailPtyPos = getPacket.PtyPos
pos.TailRunPos = getPacket.RunPos
pos.Follow = getPacket.Tail
// convert negative pos to positive
if pos.TailPtyPos < 0 {
pos.TailPtyPos = max(0, entry.FilePtyLen+pos.TailPtyPos) // + because negative
}
if pos.TailRunPos < 0 {
pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative
}
entry.updateTailPos(pos.ReqId, pos)
if !pos.Follow && pos.IsCurrent(entry) {
// don't add to t.WatchList, don't t.AddFileWatches_nolock, send rpc response
return true, nil
}
if !foundEntry {
err := t.AddFileWatches_nolock(key, getPacket.PtyOnly)
if err != nil {
return false, err
}
}
t.WatchList[key] = entry
t.tryStartRun_nolock(entry, pos)
return false, nil
}

View File

@ -43,12 +43,10 @@ const (
DataEndPacketStr = "dataend"
ResponsePacketStr = "resp" // rpc-response
DonePacketStr = "done"
CmdErrorPacketStr = "cmderror" // command
MessagePacketStr = "message"
GetCmdPacketStr = "getcmd" // rpc
UntailCmdPacketStr = "untailcmd" // rpc
CdPacketStr = "cd" // rpc
CmdDataPacketStr = "cmddata" // rpc-response
RawPacketStr = "raw"
SpecialInputPacketStr = "sinput" // command
CompGenPacketStr = "compgen" // rpc
@ -90,7 +88,6 @@ func init() {
TypeStrToFactory[PingPacketStr] = reflect.TypeOf(PingPacketType{})
TypeStrToFactory[ResponsePacketStr] = reflect.TypeOf(ResponsePacketType{})
TypeStrToFactory[DonePacketStr] = reflect.TypeOf(DonePacketType{})
TypeStrToFactory[CmdErrorPacketStr] = reflect.TypeOf(CmdErrorPacketType{})
TypeStrToFactory[MessagePacketStr] = reflect.TypeOf(MessagePacketType{})
TypeStrToFactory[CmdStartPacketStr] = reflect.TypeOf(CmdStartPacketType{})
TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{})
@ -98,7 +95,6 @@ func init() {
TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{})
TypeStrToFactory[InitPacketStr] = reflect.TypeOf(InitPacketType{})
TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{})
TypeStrToFactory[CmdDataPacketStr] = reflect.TypeOf(CmdDataPacketType{})
TypeStrToFactory[RawPacketStr] = reflect.TypeOf(RawPacketType{})
TypeStrToFactory[SpecialInputPacketStr] = reflect.TypeOf(SpecialInputPacketType{})
TypeStrToFactory[DataPacketStr] = reflect.TypeOf(DataPacketType{})
@ -128,7 +124,6 @@ func init() {
var _ RpcResponsePacketType = (*CmdStartPacketType)(nil)
var _ RpcResponsePacketType = (*ResponsePacketType)(nil)
var _ RpcResponsePacketType = (*CmdDataPacketType)(nil)
var _ RpcResponsePacketType = (*StreamFileResponseType)(nil)
var _ RpcResponsePacketType = (*FileDataPacketType)(nil)
var _ RpcResponsePacketType = (*WriteFileReadyPacketType)(nil)
@ -155,36 +150,6 @@ func MakePacket(packetType string) (PacketType, error) {
return rtn.Interface().(PacketType), nil
}
type CmdDataPacketType struct {
Type string `json:"type"`
RespId string `json:"respid"`
CK base.CommandKey `json:"ck"`
PtyPos int64 `json:"ptypos"`
PtyLen int64 `json:"ptylen"`
RunPos int64 `json:"runpos"`
RunLen int64 `json:"runlen"`
PtyData64 string `json:"ptydata64"`
PtyDataLen int `json:"ptydatalen"`
RunData64 string `json:"rundata64"`
RunDataLen int `json:"rundatalen"`
}
func (*CmdDataPacketType) GetType() string {
return CmdDataPacketStr
}
func (p *CmdDataPacketType) GetResponseId() string {
return p.RespId
}
func (*CmdDataPacketType) GetResponseDone() bool {
return false
}
func MakeCmdDataPacket(reqId string) *CmdDataPacketType {
return &CmdDataPacketType{Type: CmdDataPacketStr, RespId: reqId}
}
type PingPacketType struct {
Type string `json:"type"`
}
@ -830,28 +795,6 @@ type BarePacketType struct {
Type string `json:"type"`
}
type CmdErrorPacketType struct {
Type string `json:"type"`
CK base.CommandKey `json:"ck"`
Error string `json:"error"`
}
func (*CmdErrorPacketType) GetType() string {
return CmdErrorPacketStr
}
func (p *CmdErrorPacketType) GetCK() base.CommandKey {
return p.CK
}
func (p *CmdErrorPacketType) String() string {
return fmt.Sprintf("error[%s]", p.Error)
}
func MakeCmdErrorPacket(ck base.CommandKey, err error) *CmdErrorPacketType {
return &CmdErrorPacketType{Type: CmdErrorPacketStr, CK: ck, Error: err.Error()}
}
type WriteFilePacketType struct {
Type string `json:"type"`
ReqId string `json:"reqid"`
@ -1074,10 +1017,6 @@ func SendPacket(w io.Writer, packet PacketType) error {
return nil
}
func SendCmdError(w io.Writer, ck base.CommandKey, err error) error {
return SendPacket(w, MakeCmdErrorPacket(ck, err))
}
type PacketSender struct {
Lock *sync.Mutex
SendCh chan PacketType
@ -1197,10 +1136,6 @@ func (sender *PacketSender) SendPacket(pk PacketType) error {
return nil
}
func (sender *PacketSender) SendCmdError(ck base.CommandKey, err error) error {
return sender.SendPacket(MakeCmdErrorPacket(ck, err))
}
func (sender *PacketSender) SendErrorResponse(reqId string, err error) error {
pk := MakeErrorResponsePacket(reqId, err)
return sender.SendPacket(pk)
@ -1222,17 +1157,13 @@ type UnknownPacketReporter interface {
type DefaultUPR struct{}
func (DefaultUPR) UnknownPacket(pk PacketType) {
if pk.GetType() == CmdErrorPacketStr {
errPacket := pk.(*CmdErrorPacketType)
// at this point, just send the error packet to stderr rather than try to do something special
fmt.Fprintf(os.Stderr, "[error] %s\n", errPacket.Error)
} else if pk.GetType() == RawPacketStr {
if pk.GetType() == RawPacketStr {
rawPacket := pk.(*RawPacketType)
fmt.Fprintf(os.Stderr, "%s\n", rawPacket.Data)
} else if pk.GetType() == CmdStartPacketStr {
return // do nothing
} else {
fmt.Fprintf(os.Stderr, "[error] invalid packet received '%s'", AsExtType(pk))
wlog.Logf("[upr] invalid packet received '%s'", AsExtType(pk))
}
}

View File

@ -151,7 +151,7 @@ func (m *MServer) ProcessCommandPacket(pk packet.CommandPacketType) {
cproc := m.ClientMap[ck]
m.Lock.Unlock()
if cproc == nil {
m.Sender.SendCmdError(ck, fmt.Errorf("no client proc for ck '%s', pk=%s", ck, packet.AsString(pk)))
wlog.Logf("no client proc for ck %q, pk=%s", ck, packet.AsString(pk))
return
}
cproc.Input.SendPacket(pk)

View File

@ -1069,106 +1069,6 @@ func copyToCirFile(dest *cirfile.File, src io.Reader) error {
}
}
func (cmd *ShExecType) DetachedWait(startPacket *packet.CmdStartPacketType) {
// after Start(), any output/errors must go to DetachedOutput
// close stdin, redirect stdout/stderr to /dev/null, but wait for cmdstart packet to get sent
cmd.DetachedOutput.SendPacket(startPacket)
err := os.Stdin.Close()
if err != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot close stdin: %w", err))
}
err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stdout.Fd()))
if err != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err))
}
err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stderr.Fd()))
if err != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err))
}
ptyOutFile, err := cirfile.CreateCirFile(cmd.FileNames.PtyOutFile, cmd.MaxPtySize)
if err != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot open ptyout file '%s': %w", cmd.FileNames.PtyOutFile, err))
// don't return (command is already running)
}
ptyCopyDone := make(chan bool)
go func() {
// copy pty output to .ptyout file
defer close(ptyCopyDone)
defer ptyOutFile.Close()
copyErr := copyToCirFile(ptyOutFile, cmd.CmdPty)
if copyErr != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("copying pty output to ptyout file: %w", copyErr))
}
}()
go func() {
// copy .stdin fifo contents to pty input
copyFifoErr := MakeAndCopyStdinFifo(cmd.CmdPty, cmd.FileNames.StdinFifo)
if copyFifoErr != nil {
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("reading from stdin fifo: %w", copyFifoErr))
}
}()
donePacket := cmd.WaitForCommand()
cmd.DetachedOutput.SendPacket(donePacket)
<-ptyCopyDone
cmd.Close()
}
func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) {
sapi, err := shellapi.MakeShellApi(pk.ShellType)
if err != nil {
return nil, nil, err
}
fileNames, err := base.GetCommandFileNames(pk.CK)
if err != nil {
return nil, nil, err
}
runOutInfo, err := os.Stat(fileNames.RunnerOutFile)
if err == nil { // non-nil error will be caught by regular OpenFile below
// must have size 0
if runOutInfo.Size() != 0 {
return nil, nil, fmt.Errorf("cmdkey '%s' was already used (runout len=%d)", pk.CK, runOutInfo.Size())
}
}
cmdPty, cmdTty, err := pty.Open()
if err != nil {
return nil, nil, fmt.Errorf("opening new pty: %w", err)
}
pty.Setsize(cmdPty, GetWinsize(pk))
defer func() {
cmdTty.Close()
}()
cmd := MakeShExec(pk.CK, nil, sapi)
cmd.FileNames = fileNames
cmd.CmdPty = cmdPty
cmd.Detached = true
cmd.MaxPtySize = DefaultMaxPtySize
if pk.TermOpts != nil && pk.TermOpts.MaxPtySize > 0 {
cmd.MaxPtySize = base.BoundInt64(pk.TermOpts.MaxPtySize, MinMaxPtySize, MaxMaxPtySize)
}
cmd.RunnerOutFd, err = os.OpenFile(fileNames.RunnerOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return nil, nil, fmt.Errorf("cannot open runout file '%s': %w", fileNames.RunnerOutFile, err)
}
cmd.DetachedOutput = packet.MakePacketSender(cmd.RunnerOutFd, nil)
ecmd, err := MakeDetachedExecCmd(pk, cmdTty)
if err != nil {
return nil, nil, err
}
cmd.Cmd = ecmd
SetupSignalsForDetach()
err = ecmd.Start()
if err != nil {
return nil, nil, fmt.Errorf("starting command: %w", err)
}
for _, fd := range ecmd.ExtraFiles {
if fd != cmdTty {
fd.Close()
}
}
startPacket := cmd.MakeCmdStartPacket(pk.ReqId)
return cmd, startPacket, nil
}
func GetExitCode(err error) int {
if err == nil {
return 0

View File

@ -496,7 +496,7 @@ func getEvalDepth(ctx context.Context) int {
func SyncCommand(ctx context.Context, pk *scpacket.FeCommandPacketType) (scbus.UpdatePacket, error) {
ids, err := resolveUiIds(ctx, pk, R_Session|R_Screen|R_RemoteConnected)
if err != nil {
return nil, fmt.Errorf("/run error: %w", err)
return nil, fmt.Errorf("/sync error: %w", err)
}
runPacket := packet.MakeRunPacket()
runPacket.ReqId = uuid.New().String()
@ -513,22 +513,21 @@ func SyncCommand(ctx context.Context, pk *scpacket.FeCommandPacketType) (scbus.U
SessionId: ids.SessionId,
ScreenId: ids.ScreenId,
RemotePtr: ids.Remote.RemotePtr,
Ephemeral: true,
}
cmd, callback, err := remote.RunCommand(ctx, rcOpts, runPacket)
_, callback, err := remote.RunCommand(ctx, rcOpts, runPacket)
if callback != nil {
defer callback()
}
if err != nil {
return nil, err
}
cmd.RawCmdStr = pk.GetRawStr()
update, err := addLineForCmd(ctx, "/sync", true, ids, cmd, "terminal", nil)
if err != nil {
return nil, err
}
update.AddUpdate(sstore.InteractiveUpdate(pk.Interactive))
scbus.MainUpdateBus.DoScreenUpdate(ids.ScreenId, update)
return nil, nil
update := scbus.MakeUpdatePacket()
update.AddUpdate(sstore.InfoMsgType{
InfoMsg: "syncing state",
TimeoutMs: 2000,
})
return update, nil
}
func getRendererArg(pk *scpacket.FeCommandPacketType) (string, error) {
@ -1178,7 +1177,8 @@ func deferWriteCmdStatus(ctx context.Context, cmd *sstore.CmdType, startTime tim
donePk.Ts = time.Now().UnixMilli()
donePk.ExitCode = exitCode
donePk.DurationMs = duration.Milliseconds()
update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus)
update := scbus.MakeUpdatePacket()
err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus)
if err != nil {
// nothing to do
log.Printf("error updating cmddoneinfo (in openai): %v\n", err)
@ -2554,7 +2554,8 @@ func doOpenAICompletion(cmd *sstore.CmdType, opts *sstore.OpenAIOptsType, prompt
donePk.Ts = time.Now().UnixMilli()
donePk.ExitCode = exitCode
donePk.DurationMs = duration.Milliseconds()
update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus)
update := scbus.MakeUpdatePacket()
err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus)
if err != nil {
// nothing to do
log.Printf("error updating cmddoneinfo (in openai): %v\n", err)
@ -2713,7 +2714,8 @@ func doOpenAIStreamCompletion(cmd *sstore.CmdType, clientId string, opts *sstore
donePk.Ts = time.Now().UnixMilli()
donePk.ExitCode = exitCode
donePk.DurationMs = duration.Milliseconds()
update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus)
update := scbus.MakeUpdatePacket()
err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus)
if err != nil {
// nothing to do
log.Printf("error updating cmddoneinfo (in openai): %v\n", err)

View File

@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@ -160,17 +161,20 @@ type MShellProc struct {
InstallCancelFn context.CancelFunc
InstallErr error
RunningCmds map[base.CommandKey]RunCmdType
RunningCmds map[base.CommandKey]*RunCmdType
PendingStateCmds map[pendingStateKey]base.CommandKey // key=[remoteinstance name]
launcher Launcher // for conditional launch method based on ssh library in use. remove once ssh library is stabilized
Client *ssh.Client
}
type RunCmdType struct {
CK base.CommandKey
SessionId string
ScreenId string
RemotePtr sstore.RemotePtrType
RunPacket *packet.RunPacketType
Ephemeral bool
EphCancled atomic.Bool // only for Ephemeral commands, if true, then the command result should be discarded
}
type RemoteRuntimeState = sstore.RemoteRuntimeState
@ -704,7 +708,7 @@ func MakeMShell(r *sstore.RemoteType) *MShellProc {
Status: StatusDisconnected,
PtyBuffer: buf,
InstallStatus: StatusDisconnected,
RunningCmds: make(map[base.CommandKey]RunCmdType),
RunningCmds: make(map[base.CommandKey]*RunCmdType),
PendingStateCmds: make(map[pendingStateKey]base.CommandKey),
StateMap: server.MakeShellStateMap(),
launcher: LegacyLauncher{}, // for conditional launch method based on ssh library in use. remove once ssh library is stabilized
@ -1928,6 +1932,8 @@ func makeTermOpts(runPk *packet.RunPacketType) sstore.TermOpts {
}
// returns (ok, currentPSC)
// if ok is true, currentPSC will be nil
// if ok is false, currentPSC will be the existing pending state command (not nil)
func (msh *MShellProc) testAndSetPendingStateCmd(screenId string, rptr sstore.RemotePtrType, newCK *base.CommandKey) (bool, *base.CommandKey) {
key := pendingStateKey{ScreenId: screenId, RemotePtr: rptr}
msh.Lock.Lock()
@ -1986,6 +1992,10 @@ type RunCommandOpts struct {
// set to true to skip creating the pty file (for restarted commands)
NoCreateCmdPtyFile bool
// this command will not go into the DB, and will not have a ptyout file created
// forces special packet handling (sets RunCommandType.Ephemeral)
Ephemeral bool
}
// returns (CmdType, allow-updates-callback, err)
@ -2022,14 +2032,14 @@ func RunCommand(ctx context.Context, rcOpts RunCommandOpts, runPacket *packet.Ru
}
ok, existingPSC := msh.testAndSetPendingStateCmd(screenId, remotePtr, newPSC)
if !ok {
rct := msh.GetRunningCmd(*existingPSC)
if rct.Ephemeral {
// if the existing command is ephemeral, we cancel it and continue
rct.EphCancled.Store(true)
} else {
line, _, err := sstore.GetLineCmdByLineId(ctx, screenId, existingPSC.GetCmdId())
if err != nil {
return nil, nil, fmt.Errorf("cannot run command while a stateful command is still running: %v", err)
return nil, nil, makePSCLineError(*existingPSC, line, err)
}
if line == nil {
return nil, nil, fmt.Errorf("cannot run command while a stateful command is still running %s", *existingPSC)
}
return nil, nil, fmt.Errorf("cannot run command while a stateful command (linenum=%d) is still running", line.LineNum)
}
if newPSC != nil {
defer func() {
@ -2121,24 +2131,37 @@ func RunCommand(ctx context.Context, rcOpts RunCommandOpts, runPacket *packet.Ru
RunOut: nil,
RtnState: runPacket.ReturnState,
}
if !rcOpts.NoCreateCmdPtyFile {
if !rcOpts.NoCreateCmdPtyFile && !rcOpts.Ephemeral {
err = sstore.CreateCmdPtyFile(ctx, cmd.ScreenId, cmd.LineId, cmd.TermOpts.MaxPtySize)
if err != nil {
// TODO the cmd is running, so this is a tricky error to handle
return nil, nil, fmt.Errorf("cannot create local ptyout file for running command: %v", err)
}
}
msh.AddRunningCmd(RunCmdType{
msh.AddRunningCmd(&RunCmdType{
CK: runPacket.CK,
SessionId: sessionId,
ScreenId: screenId,
RemotePtr: remotePtr,
RunPacket: runPacket,
Ephemeral: rcOpts.Ephemeral,
})
return cmd, func() { removeCmdWait(runPacket.CK) }, nil
}
func (msh *MShellProc) AddRunningCmd(rct RunCmdType) {
// helper func to construct the proper error given what information we have
func makePSCLineError(existingPSC base.CommandKey, line *sstore.LineType, lineErr error) error {
if lineErr != nil {
return fmt.Errorf("cannot run command while a stateful command is still running: %v", lineErr)
}
if line == nil {
return fmt.Errorf("cannot run command while a stateful command is still running %s", existingPSC)
}
return fmt.Errorf("cannot run command while a stateful command (linenum=%d) is still running", line.LineNum)
}
func (msh *MShellProc) AddRunningCmd(rct *RunCmdType) {
msh.Lock.Lock()
defer msh.Lock.Unlock()
msh.RunningCmds[rct.RunPacket.CK] = rct
@ -2147,11 +2170,7 @@ func (msh *MShellProc) AddRunningCmd(rct RunCmdType) {
func (msh *MShellProc) GetRunningCmd(ck base.CommandKey) *RunCmdType {
msh.Lock.Lock()
defer msh.Lock.Unlock()
rct, found := msh.RunningCmds[ck]
if !found {
return nil
}
return &rct
return msh.RunningCmds[ck]
}
func (msh *MShellProc) RemoveRunningCmd(ck base.CommandKey) {
@ -2241,22 +2260,54 @@ func (msh *MShellProc) notifyHangups_nolock() {
scbus.MainUpdateBus.DoScreenUpdate(ck.GetGroupId(), update)
go pushNumRunningCmdsUpdate(&ck, -1)
}
msh.RunningCmds = make(map[base.CommandKey]RunCmdType)
msh.RunningCmds = make(map[base.CommandKey]*RunCmdType)
msh.PendingStateCmds = make(map[pendingStateKey]base.CommandKey)
}
func (msh *MShellProc) handleCmdDonePacket(donePk *packet.CmdDonePacketType) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
// this will remove from RunningCmds and from PendingStateCmds
defer msh.RemoveRunningCmd(donePk.CK)
// either fullstate or statediff will be set (not both) <- this is so the result is compatible with the sstore.UpdateRemoteState function
// note that this function *does* touch the DB, if FinalStateDiff is set, will ensure that StateBase is written to DB
func (msh *MShellProc) makeStatePtrFromFinalState(ctx context.Context, donePk *packet.CmdDonePacketType) (*sstore.ShellStatePtr, map[string]string, *packet.ShellState, *packet.ShellStateDiff, error) {
if donePk.FinalState != nil {
donePk.FinalState = stripScVarsFromState(donePk.FinalState)
finalState := stripScVarsFromState(donePk.FinalState)
feState := sstore.FeStateFromShellState(finalState)
statePtr := &sstore.ShellStatePtr{BaseHash: finalState.GetHashVal(false)}
return statePtr, feState, finalState, nil, nil
}
if donePk.FinalStateDiff != nil {
donePk.FinalStateDiff = stripScVarsFromStateDiff(donePk.FinalStateDiff)
stateDiff := stripScVarsFromStateDiff(donePk.FinalStateDiff)
feState, err := msh.getFeStateFromDiff(stateDiff)
if err != nil {
return nil, nil, nil, nil, err
}
update, err := sstore.UpdateCmdDoneInfo(ctx, donePk.CK, donePk, sstore.CmdStatusDone)
fullState := msh.StateMap.GetStateByHash(stateDiff.GetShellType(), stateDiff.BaseHash)
if fullState != nil {
sstore.StoreStateBase(ctx, fullState)
}
diffHashArr := append(([]string)(nil), donePk.FinalStateDiff.DiffHashArr...)
diffHashArr = append(diffHashArr, donePk.FinalStateDiff.GetHashVal(false))
statePtr := &sstore.ShellStatePtr{BaseHash: donePk.FinalStateDiff.BaseHash, DiffHashArr: diffHashArr}
return statePtr, feState, nil, stateDiff, nil
}
return nil, nil, nil, nil, nil
}
func (msh *MShellProc) handleCmdDonePacket(rct *RunCmdType, donePk *packet.CmdDonePacketType) {
if rct == nil {
log.Printf("cmddone packet received, but no running command found for it %q\n", donePk.CK)
return
}
// this will remove from RunningCmds and from PendingStateCmds
defer msh.RemoveRunningCmd(donePk.CK)
if rct.Ephemeral && rct.EphCancled.Load() {
// do nothing when an ephemeral command is canceled
return
}
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
update := scbus.MakeUpdatePacket()
if !rct.Ephemeral {
// only update DB for non-ephemeral commands
err := sstore.UpdateCmdDoneInfo(ctx, update, donePk.CK, donePk, sstore.CmdStatusDone)
if err != nil {
msh.WriteToPtyBuffer("*error updating cmddone: %v\n", err)
return
@ -2269,11 +2320,14 @@ func (msh *MShellProc) handleCmdDonePacket(donePk *packet.CmdDonePacketType) {
if screen != nil {
update.AddUpdate(*screen)
}
rct := msh.GetRunningCmd(donePk.CK)
var statePtr *sstore.ShellStatePtr
if donePk.FinalState != nil && rct != nil {
feState := sstore.FeStateFromShellState(donePk.FinalState)
remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, donePk.FinalState, nil)
}
// ephemeral commands *do* update the remote state
if donePk.FinalState != nil || donePk.FinalStateDiff != nil {
statePtr, feState, finalState, finalStateDiff, err := msh.makeStatePtrFromFinalState(ctx, donePk)
if err != nil {
msh.WriteToPtyBuffer("*error trying to read final command state: %v\n", err)
}
remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, finalState, finalStateDiff)
if err != nil {
msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err)
// fall-through (nothing to do)
@ -2281,43 +2335,28 @@ func (msh *MShellProc) handleCmdDonePacket(donePk *packet.CmdDonePacketType) {
if remoteInst != nil {
update.AddUpdate(sstore.MakeSessionUpdateForRemote(rct.SessionId, remoteInst))
}
statePtr = &sstore.ShellStatePtr{BaseHash: donePk.FinalState.GetHashVal(false)}
} else if donePk.FinalStateDiff != nil && rct != nil {
feState, err := msh.getFeStateFromDiff(donePk.FinalStateDiff)
if err != nil {
msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err)
// fall-through (nothing to do)
} else {
stateDiff := donePk.FinalStateDiff
fullState := msh.StateMap.GetStateByHash(stateDiff.GetShellType(), stateDiff.BaseHash)
if fullState != nil {
sstore.StoreStateBase(ctx, fullState)
}
remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, nil, stateDiff)
if err != nil {
msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err)
// fall-through (nothing to do)
}
if remoteInst != nil {
update.AddUpdate(sstore.MakeSessionUpdateForRemote(rct.SessionId, remoteInst))
}
diffHashArr := append(([]string)(nil), donePk.FinalStateDiff.DiffHashArr...)
diffHashArr = append(diffHashArr, donePk.FinalStateDiff.GetHashVal(false))
statePtr = &sstore.ShellStatePtr{BaseHash: donePk.FinalStateDiff.BaseHash, DiffHashArr: diffHashArr}
}
}
if statePtr != nil {
// ephemeral commands *do not* update cmd state (there is no command)
if statePtr != nil && !rct.Ephemeral {
err = sstore.UpdateCmdRtnState(ctx, donePk.CK, *statePtr)
if err != nil {
msh.WriteToPtyBuffer("*error trying to update cmd rtnstate: %v\n", err)
// fall-through (nothing to do)
}
}
}
scbus.MainUpdateBus.DoUpdate(update)
}
func (msh *MShellProc) handleCmdFinalPacket(finalPk *packet.CmdFinalPacketType) {
func (msh *MShellProc) handleCmdFinalPacket(rct *RunCmdType, finalPk *packet.CmdFinalPacketType) {
if rct == nil {
// this is somewhat expected, since cmddone should have removed the running command
return
}
defer msh.RemoveRunningCmd(finalPk.CK)
if rct.Ephemeral {
// just remove the running command, but there is no DB state to update in this case
return
}
rtnCmd, err := sstore.GetCmdByScreenId(context.Background(), finalPk.CK.GetGroupId(), finalPk.CK.GetCmdId())
if err != nil {
log.Printf("error calling GetCmdById in handleCmdFinalPacket: %v\n", err)
@ -2350,31 +2389,31 @@ func (msh *MShellProc) handleCmdFinalPacket(finalPk *packet.CmdFinalPacketType)
scbus.MainUpdateBus.DoUpdate(update)
}
// TODO notify FE about cmd errors
func (msh *MShellProc) handleCmdErrorPacket(errPk *packet.CmdErrorPacketType) {
err := sstore.AppendCmdErrorPk(context.Background(), errPk)
if err != nil {
msh.WriteToPtyBuffer("cmderr> [remote %s] [error] adding cmderr: %v\n", msh.GetRemoteName(), err)
return
}
}
func (msh *MShellProc) ResetDataPos(ck base.CommandKey) {
msh.DataPosMap.Delete(ck)
}
func (msh *MShellProc) handleDataPacket(dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) {
func (msh *MShellProc) handleDataPacket(rct *RunCmdType, dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) {
if rct == nil {
ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, fmt.Errorf("no running cmd found"))
msh.ServerProc.Input.SendPacket(ack)
return
}
realData, err := base64.StdEncoding.DecodeString(dataPk.Data64)
if err != nil {
ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, err)
msh.ServerProc.Input.SendPacket(ack)
return
}
if rct.Ephemeral {
ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, len(realData), nil)
msh.ServerProc.Input.SendPacket(ack)
return
}
var ack *packet.DataAckPacketType
if len(realData) > 0 {
dataPos := dataPosMap.Get(dataPk.CK)
rcmd := msh.GetRunningCmd(dataPk.CK)
update, err := sstore.AppendToCmdPtyBlob(context.Background(), rcmd.ScreenId, dataPk.CK.GetCmdId(), realData, dataPos)
update, err := sstore.AppendToCmdPtyBlob(context.Background(), rct.ScreenId, dataPk.CK.GetCmdId(), realData, dataPos)
if err != nil {
ack = makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, err)
} else {
@ -2388,25 +2427,6 @@ func (msh *MShellProc) handleDataPacket(dataPk *packet.DataPacketType, dataPosMa
if ack != nil {
msh.ServerProc.Input.SendPacket(ack)
}
// log.Printf("data %s fd=%d len=%d eof=%v err=%v\n", dataPk.CK, dataPk.FdNum, len(realData), dataPk.Eof, dataPk.Error)
}
func (msh *MShellProc) makeHandleDataPacketClosure(dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) func() {
return func() {
msh.handleDataPacket(dataPk, dataPosMap)
}
}
func (msh *MShellProc) makeHandleCmdDonePacketClosure(donePk *packet.CmdDonePacketType) func() {
return func() {
msh.handleCmdDonePacket(donePk)
}
}
func (msh *MShellProc) makeHandleCmdFinalPacketClosure(finalPk *packet.CmdFinalPacketType) func() {
return func() {
msh.handleCmdFinalPacket(finalPk)
}
}
func sendScreenUpdates(screens []*sstore.ScreenType) {
@ -2417,6 +2437,45 @@ func sendScreenUpdates(screens []*sstore.ScreenType) {
}
}
func (msh *MShellProc) processSinglePacket(pk packet.PacketType) {
if _, ok := pk.(*packet.DataAckPacketType); ok {
// TODO process ack (need to keep track of buffer size for sending)
// this is low priority though since most input is coming from keyboard and won't overflow this buffer
return
}
if dataPk, ok := pk.(*packet.DataPacketType); ok {
runCmdUpdateFn(dataPk.CK, func() {
rct := msh.GetRunningCmd(dataPk.CK)
msh.handleDataPacket(rct, dataPk, msh.DataPosMap)
})
go pushStatusIndicatorUpdate(&dataPk.CK, sstore.StatusIndicatorLevel_Output)
return
}
if donePk, ok := pk.(*packet.CmdDonePacketType); ok {
runCmdUpdateFn(donePk.CK, func() {
rct := msh.GetRunningCmd(donePk.CK)
msh.handleCmdDonePacket(rct, donePk)
})
return
}
if finalPk, ok := pk.(*packet.CmdFinalPacketType); ok {
runCmdUpdateFn(finalPk.CK, func() {
rct := msh.GetRunningCmd(finalPk.CK)
msh.handleCmdFinalPacket(rct, finalPk)
})
return
}
if msgPk, ok := pk.(*packet.MessagePacketType); ok {
msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPk.CK, msgPk.Message)
return
}
if rawPk, ok := pk.(*packet.RawPacketType); ok {
msh.WriteToPtyBuffer("stderr> [remote %s] %s\n", msh.GetRemoteName(), rawPk.Data)
return
}
msh.WriteToPtyBuffer("MSH> [remote %s] unhandled packet %s\n", msh.GetRemoteName(), packet.AsString(pk))
}
func (msh *MShellProc) ProcessPackets() {
defer msh.WithLock(func() {
if msh.Status == StatusConnected {
@ -2433,53 +2492,7 @@ func (msh *MShellProc) ProcessPackets() {
}
})
for pk := range msh.ServerProc.Output.MainCh {
if pk.GetType() == packet.DataPacketStr {
dataPk := pk.(*packet.DataPacketType)
runCmdUpdateFn(dataPk.CK, msh.makeHandleDataPacketClosure(dataPk, msh.DataPosMap))
go pushStatusIndicatorUpdate(&dataPk.CK, sstore.StatusIndicatorLevel_Output)
continue
}
if pk.GetType() == packet.DataAckPacketStr {
// TODO process ack (need to keep track of buffer size for sending)
// this is low priority though since most input is coming from keyboard and won't overflow this buffer
continue
}
if pk.GetType() == packet.CmdDataPacketStr {
dataPacket := pk.(*packet.CmdDataPacketType)
go msh.WriteToPtyBuffer("cmd-data> [remote %s] [%s] pty=%d run=%d\n", msh.GetRemoteName(), dataPacket.CK, dataPacket.PtyDataLen, dataPacket.RunDataLen)
go pushStatusIndicatorUpdate(&dataPacket.CK, sstore.StatusIndicatorLevel_Output)
continue
}
if pk.GetType() == packet.CmdDonePacketStr {
donePk := pk.(*packet.CmdDonePacketType)
runCmdUpdateFn(donePk.CK, msh.makeHandleCmdDonePacketClosure(donePk))
continue
}
if pk.GetType() == packet.CmdFinalPacketStr {
finalPk := pk.(*packet.CmdFinalPacketType)
runCmdUpdateFn(finalPk.CK, msh.makeHandleCmdFinalPacketClosure(finalPk))
continue
}
if pk.GetType() == packet.CmdErrorPacketStr {
msh.handleCmdErrorPacket(pk.(*packet.CmdErrorPacketType))
continue
}
if pk.GetType() == packet.MessagePacketStr {
msgPacket := pk.(*packet.MessagePacketType)
msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPacket.CK, msgPacket.Message)
continue
}
if pk.GetType() == packet.RawPacketStr {
rawPacket := pk.(*packet.RawPacketType)
msh.WriteToPtyBuffer("stderr> [remote %s] %s\n", msh.GetRemoteName(), rawPacket.Data)
continue
}
if pk.GetType() == packet.CmdStartPacketStr {
startPk := pk.(*packet.CmdStartPacketType)
msh.WriteToPtyBuffer("start> [remote %s] reqid=%s (%p)\n", msh.GetRemoteName(), startPk.RespId, msh.ServerProc.Output)
continue
}
msh.WriteToPtyBuffer("MSH> [remote %s] unhandled packet %s\n", msh.GetRemoteName(), packet.AsString(pk))
msh.processSinglePacket(pk)
}
}

View File

@ -916,12 +916,12 @@ func UpdateCmdForRestart(ctx context.Context, ck base.CommandKey, ts int64, cmdP
})
}
func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.CmdDonePacketType, status string) (*scbus.ModelUpdatePacketType, error) {
func UpdateCmdDoneInfo(ctx context.Context, update *scbus.ModelUpdatePacketType, ck base.CommandKey, donePk *packet.CmdDonePacketType, status string) error {
if donePk == nil {
return nil, fmt.Errorf("invalid cmddone packet")
return fmt.Errorf("invalid cmddone packet")
}
if ck.IsEmpty() {
return nil, fmt.Errorf("cannot update cmddoneinfo, empty ck")
return fmt.Errorf("cannot update cmddoneinfo, empty ck")
}
screenId := ck.GetGroupId()
var rtnCmd *CmdType
@ -944,15 +944,12 @@ func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.C
return nil
})
if txErr != nil {
return nil, txErr
return txErr
}
if rtnCmd == nil {
return nil, fmt.Errorf("cmd data not found for ck[%s]", ck)
return fmt.Errorf("cmd data not found for ck[%s]", ck)
}
update := scbus.MakeUpdatePacket()
update.AddUpdate(*rtnCmd)
// Update in-memory screen indicator status
var indicator StatusIndicatorLevel
if rtnCmd.ExitCode == 0 {
@ -960,15 +957,13 @@ func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.C
} else {
indicator = StatusIndicatorLevel_Error
}
err := SetStatusIndicatorLevel_Update(ctx, update, screenId, indicator, false)
if err != nil {
// This is not a fatal error, so just log it
log.Printf("error setting status indicator level after done packet: %v\n", err)
}
IncrementNumRunningCmds_Update(update, screenId, -1)
return update, nil
return nil
}
func UpdateCmdRtnState(ctx context.Context, ck base.CommandKey, statePtr ShellStatePtr) error {
@ -991,18 +986,6 @@ func UpdateCmdRtnState(ctx context.Context, ck base.CommandKey, statePtr ShellSt
return nil
}
func AppendCmdErrorPk(ctx context.Context, errPk *packet.CmdErrorPacketType) error {
if errPk == nil || errPk.CK.IsEmpty() {
return fmt.Errorf("invalid cmderror packet (no ck)")
}
screenId := errPk.CK.GetGroupId()
return WithTx(ctx, func(tx *TxWrap) error {
query := `UPDATE cmd SET runout = json_insert(runout, '$[#]', ?) WHERE screenid = ? AND lineid = ?`
tx.Exec(query, quickJson(errPk), screenId, lineIdFromCK(errPk.CK))
return nil
})
}
func ReInitFocus(ctx context.Context) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `UPDATE screen SET focustype = 'input'`