diff --git a/src/models/commandrunner.ts b/src/models/commandrunner.ts index 98150ccee..5b57a0c4d 100644 --- a/src/models/commandrunner.ts +++ b/src/models/commandrunner.ts @@ -304,6 +304,10 @@ class CommandRunner { GlobalModel.clientSettingsViewModel.showClientSettingsView(); } + syncShellState() { + GlobalModel.submitCommand("sync", null, null, { nohist: "1" }, false); + } + historyView(params: HistorySearchParams) { let kwargs = { nohist: "1" }; kwargs["offset"] = String(params.offset); diff --git a/src/models/model.ts b/src/models/model.ts index ea0128bcf..ad65aa55f 100644 --- a/src/models/model.ts +++ b/src/models/model.ts @@ -1050,6 +1050,9 @@ class Model { this.activeMainView.set("session"); this.deactivateScreenLines(); this.ws.watchScreen(newActiveSessionId, newActiveScreenId); + setTimeout(() => { + GlobalCommandRunner.syncShellState(); + }, 100); } } else { console.warn("unknown update", genUpdate); diff --git a/waveshell/main-waveshell.go b/waveshell/main-waveshell.go index 77f1511d0..d032b194a 100644 --- a/waveshell/main-waveshell.go +++ b/waveshell/main-waveshell.go @@ -61,15 +61,7 @@ func handleSingle() { sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("run packets from server must have a CK: %v", err)) } if runPacket.Detached { - cmd, startPk, err := shexec.RunCommandDetached(runPacket, sender) - if err != nil { - sender.SendErrorResponse(runPacket.ReqId, err) - return - } - sender.SendPacket(startPk) - sender.Close() - sender.WaitForDone() - cmd.DetachedWait(startPk) + sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("detached mode not supported")) return } else { shexec.IgnoreSigPipe() diff --git a/waveshell/pkg/cmdtail/cmdtail.go b/waveshell/pkg/cmdtail/cmdtail.go deleted file mode 100644 index fed754ad4..000000000 --- a/waveshell/pkg/cmdtail/cmdtail.go +++ /dev/null @@ -1,473 +0,0 @@ -// Copyright 2023, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -package cmdtail - -import ( - "encoding/base64" - "fmt" - "io" - "os" - "regexp" - "sync" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/wavetermdev/waveterm/waveshell/pkg/base" - "github.com/wavetermdev/waveterm/waveshell/pkg/packet" -) - -const MaxDataBytes = 4096 -const FileTypePty = "ptyout" -const FileTypeRun = "runout" - -type Tailer struct { - Lock *sync.Mutex - WatchList map[base.CommandKey]CmdWatchEntry - Watcher *fsnotify.Watcher - Sender *packet.PacketSender - Gen FileNameGenerator - Sessions map[string]bool -} - -type TailPos struct { - ReqId string - Running bool // an active tailer sending data - TailPtyPos int64 - TailRunPos int64 - Follow bool -} - -type CmdWatchEntry struct { - CmdKey base.CommandKey - FilePtyLen int64 - FileRunLen int64 - Tails []TailPos - Done bool -} - -type FileNameGenerator interface { - PtyOutFile(ck base.CommandKey) string - RunOutFile(ck base.CommandKey) string - SessionDir(sessionId string) string -} - -func (w CmdWatchEntry) getTailPos(reqId string) (TailPos, bool) { - for _, pos := range w.Tails { - if pos.ReqId == reqId { - return pos, true - } - } - return TailPos{}, false -} - -func (w *CmdWatchEntry) updateTailPos(reqId string, newPos TailPos) { - for idx, pos := range w.Tails { - if pos.ReqId == reqId { - w.Tails[idx] = newPos - return - } - } - w.Tails = append(w.Tails, newPos) -} - -func (w *CmdWatchEntry) removeTailPos(reqId string) { - var newTails []TailPos - for _, pos := range w.Tails { - if pos.ReqId == reqId { - continue - } - newTails = append(newTails, pos) - } - w.Tails = newTails -} - -func (pos TailPos) IsCurrent(entry CmdWatchEntry) bool { - return pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen -} - -func (t *Tailer) updateTailPos_nolock(cmdKey base.CommandKey, reqId string, pos TailPos) { - entry, found := t.WatchList[cmdKey] - if !found { - return - } - entry.updateTailPos(reqId, pos) - t.WatchList[cmdKey] = entry -} - -func (t *Tailer) removeTailPos(cmdKey base.CommandKey, reqId string) { - t.Lock.Lock() - defer t.Lock.Unlock() - t.removeTailPos_nolock(cmdKey, reqId) -} - -func (t *Tailer) removeTailPos_nolock(cmdKey base.CommandKey, reqId string) { - entry, found := t.WatchList[cmdKey] - if !found { - return - } - entry.removeTailPos(reqId) - t.WatchList[cmdKey] = entry - if len(entry.Tails) == 0 { - t.removeWatch_nolock(cmdKey) - } -} - -func (t *Tailer) removeWatch_nolock(cmdKey base.CommandKey) { - // delete from watchlist, remove watches - delete(t.WatchList, cmdKey) - t.Watcher.Remove(t.Gen.PtyOutFile(cmdKey)) - t.Watcher.Remove(t.Gen.RunOutFile(cmdKey)) -} - -func (t *Tailer) getEntryAndPos_nolock(cmdKey base.CommandKey, reqId string) (CmdWatchEntry, TailPos, bool) { - entry, found := t.WatchList[cmdKey] - if !found { - return CmdWatchEntry{}, TailPos{}, false - } - pos, found := entry.getTailPos(reqId) - if !found { - return CmdWatchEntry{}, TailPos{}, false - } - return entry, pos, true -} - -func (t *Tailer) addSessionWatcher(sessionId string) error { - t.Lock.Lock() - defer t.Lock.Unlock() - - if t.Sessions[sessionId] { - return nil - } - sdir := t.Gen.SessionDir(sessionId) - err := t.Watcher.Add(sdir) - if err != nil { - return err - } - t.Sessions[sessionId] = true - return nil -} - -func (t *Tailer) removeSessionWatcher(sessionId string) { - t.Lock.Lock() - defer t.Lock.Unlock() - - if !t.Sessions[sessionId] { - return - } - sdir := t.Gen.SessionDir(sessionId) - t.Watcher.Remove(sdir) -} - -func MakeTailer(sender *packet.PacketSender, gen FileNameGenerator) (*Tailer, error) { - rtn := &Tailer{ - Lock: &sync.Mutex{}, - WatchList: make(map[base.CommandKey]CmdWatchEntry), - Sessions: make(map[string]bool), - Sender: sender, - Gen: gen, - } - var err error - rtn.Watcher, err = fsnotify.NewWatcher() - if err != nil { - return nil, err - } - return rtn, nil -} - -func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]byte, error) { - fd, err := os.Open(fileName) - defer fd.Close() - if err != nil { - return nil, err - } - buf := make([]byte, maxBytes) - nr, err := fd.ReadAt(buf, pos) - if err != nil && err != io.EOF { // ignore EOF error - return nil, err - } - return buf[0:nr], nil -} - -func (t *Tailer) makeCmdDataPacket(entry CmdWatchEntry, pos TailPos) (*packet.CmdDataPacketType, error) { - dataPacket := packet.MakeCmdDataPacket(pos.ReqId) - dataPacket.CK = entry.CmdKey - dataPacket.PtyPos = pos.TailPtyPos - dataPacket.RunPos = pos.TailRunPos - if entry.FilePtyLen > pos.TailPtyPos { - ptyData, err := t.readDataFromFile(t.Gen.PtyOutFile(entry.CmdKey), pos.TailPtyPos, MaxDataBytes) - if err != nil { - return nil, err - } - dataPacket.PtyData64 = base64.StdEncoding.EncodeToString(ptyData) - dataPacket.PtyDataLen = len(ptyData) - } - if entry.FileRunLen > pos.TailRunPos { - runData, err := t.readDataFromFile(t.Gen.RunOutFile(entry.CmdKey), pos.TailRunPos, MaxDataBytes) - if err != nil { - return nil, err - } - dataPacket.RunData64 = base64.StdEncoding.EncodeToString(runData) - dataPacket.RunDataLen = len(runData) - } - return dataPacket, nil -} - -// returns (data-packet, keepRunning) -func (t *Tailer) runSingleDataTransfer(key base.CommandKey, reqId string) (*packet.CmdDataPacketType, bool, error) { - t.Lock.Lock() - entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId) - t.Lock.Unlock() - if !foundPos { - return nil, false, nil - } - dataPacket, dataErr := t.makeCmdDataPacket(entry, pos) - - t.Lock.Lock() - defer t.Lock.Unlock() - entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId) - if !foundPos { - return nil, false, nil - } - // pos was updated between first and second get, throw out data-packet and re-run - if pos.TailPtyPos != dataPacket.PtyPos || pos.TailRunPos != dataPacket.RunPos { - return nil, true, nil - } - if dataErr != nil { - // error, so return error packet, and stop running - pos.Running = false - t.updateTailPos_nolock(key, reqId, pos) - return nil, false, dataErr - } - pos.TailPtyPos += int64(dataPacket.PtyDataLen) - pos.TailRunPos += int64(dataPacket.RunDataLen) - if pos.IsCurrent(entry) { - // we caught up, tail position equals file length - pos.Running = false - } - t.updateTailPos_nolock(key, reqId, pos) - return dataPacket, pos.Running, nil -} - -// returns (removed) -func (t *Tailer) checkRemove(cmdKey base.CommandKey, reqId string) bool { - t.Lock.Lock() - defer t.Lock.Unlock() - entry, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId) - if !foundPos { - return false - } - if !pos.IsCurrent(entry) { - return false - } - if !pos.Follow || entry.Done { - t.removeTailPos_nolock(cmdKey, reqId) - return true - } - return false -} - -func (t *Tailer) RunDataTransfer(key base.CommandKey, reqId string) { - for { - dataPacket, keepRunning, err := t.runSingleDataTransfer(key, reqId) - if dataPacket != nil { - t.Sender.SendPacket(dataPacket) - } - if err != nil { - t.removeTailPos(key, reqId) - t.Sender.SendErrorResponse(reqId, err) - break - } - if !keepRunning { - removed := t.checkRemove(key, reqId) - if removed { - t.Sender.SendResponse(reqId, true) - } - break - } - time.Sleep(10 * time.Millisecond) - } -} - -func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) { - if pos.Running { - return - } - if pos.IsCurrent(entry) { - return - } - pos.Running = true - t.updateTailPos_nolock(entry.CmdKey, pos.ReqId, pos) - go t.RunDataTransfer(entry.CmdKey, pos.ReqId) -} - -var updateFileRe = regexp.MustCompile("/([a-z0-9-]+)/([a-z0-9-]+)\\.(ptyout|runout)$") - -func (t *Tailer) updateFile(relFileName string) { - m := updateFileRe.FindStringSubmatch(relFileName) - if m == nil { - return - } - finfo, err := os.Stat(relFileName) - if err != nil { - t.Sender.SendPacket(packet.FmtMessagePacket("error trying to stat file '%s': %v", relFileName, err)) - return - } - cmdKey := base.MakeCommandKey(m[1], m[2]) - t.Lock.Lock() - defer t.Lock.Unlock() - entry, foundEntry := t.WatchList[cmdKey] - if !foundEntry { - return - } - fileType := m[3] - if fileType == FileTypePty { - entry.FilePtyLen = finfo.Size() - } else if fileType == FileTypeRun { - entry.FileRunLen = finfo.Size() - } - t.WatchList[cmdKey] = entry - for _, pos := range entry.Tails { - t.tryStartRun_nolock(entry, pos) - } -} - -func (t *Tailer) Run() { - for { - select { - case event, ok := <-t.Watcher.Events: - if !ok { - return - } - if event.Op&fsnotify.Write == fsnotify.Write { - t.updateFile(event.Name) - } - - case err, ok := <-t.Watcher.Errors: - if !ok { - return - } - // what to do with this error? just send a message - t.Sender.SendPacket(packet.FmtMessagePacket("error in tailer: %v", err)) - } - } -} - -func (t *Tailer) Close() error { - return t.Watcher.Close() -} - -func max(v1 int64, v2 int64) int64 { - if v1 > v2 { - return v1 - } - return v2 -} - -func (entry *CmdWatchEntry) fillFilePos(gen FileNameGenerator) { - ptyInfo, _ := os.Stat(gen.PtyOutFile(entry.CmdKey)) - if ptyInfo != nil { - entry.FilePtyLen = ptyInfo.Size() - } - runoutInfo, _ := os.Stat(gen.RunOutFile(entry.CmdKey)) - if runoutInfo != nil { - entry.FileRunLen = runoutInfo.Size() - } -} - -func (t *Tailer) KeyDone(key base.CommandKey) { - t.Lock.Lock() - defer t.Lock.Unlock() - entry, foundEntry := t.WatchList[key] - if !foundEntry { - return - } - entry.Done = true - var newTails []TailPos - for _, pos := range entry.Tails { - if pos.IsCurrent(entry) { - continue - } - newTails = append(newTails, pos) - } - entry.Tails = newTails - t.WatchList[key] = entry - if len(entry.Tails) == 0 { - t.removeWatch_nolock(key) - } - t.WatchList[key] = entry -} - -func (t *Tailer) RemoveWatch(pk *packet.UntailCmdPacketType) { - t.Lock.Lock() - defer t.Lock.Unlock() - t.removeTailPos_nolock(pk.CK, pk.ReqId) -} - -func (t *Tailer) AddFileWatches_nolock(key base.CommandKey, ptyOnly bool) error { - ptyName := t.Gen.PtyOutFile(key) - runName := t.Gen.RunOutFile(key) - fmt.Printf("WATCH> add %s\n", ptyName) - err := t.Watcher.Add(ptyName) - if err != nil { - return err - } - if ptyOnly { - return nil - } - err = t.Watcher.Add(runName) - if err != nil { - t.Watcher.Remove(ptyName) // best effort clean up - return err - } - return nil -} - -// returns (up-to-date/done, error) -func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) (bool, error) { - if err := getPacket.CK.Validate("getcmd"); err != nil { - return false, err - } - if getPacket.ReqId == "" { - return false, fmt.Errorf("getcmd, no reqid specified") - } - t.Lock.Lock() - defer t.Lock.Unlock() - key := getPacket.CK - entry, foundEntry := t.WatchList[key] - if !foundEntry { - // initialize entry, add watches - entry = CmdWatchEntry{CmdKey: key} - entry.fillFilePos(t.Gen) - } - pos, foundPos := entry.getTailPos(getPacket.ReqId) - if !foundPos { - // initialize a new tailpos - pos = TailPos{ReqId: getPacket.ReqId} - } - // update tailpos with new values from getpacket - pos.TailPtyPos = getPacket.PtyPos - pos.TailRunPos = getPacket.RunPos - pos.Follow = getPacket.Tail - // convert negative pos to positive - if pos.TailPtyPos < 0 { - pos.TailPtyPos = max(0, entry.FilePtyLen+pos.TailPtyPos) // + because negative - } - if pos.TailRunPos < 0 { - pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative - } - entry.updateTailPos(pos.ReqId, pos) - if !pos.Follow && pos.IsCurrent(entry) { - // don't add to t.WatchList, don't t.AddFileWatches_nolock, send rpc response - return true, nil - } - if !foundEntry { - err := t.AddFileWatches_nolock(key, getPacket.PtyOnly) - if err != nil { - return false, err - } - } - t.WatchList[key] = entry - t.tryStartRun_nolock(entry, pos) - return false, nil -} diff --git a/waveshell/pkg/packet/packet.go b/waveshell/pkg/packet/packet.go index 077e9e884..ff89de4d4 100644 --- a/waveshell/pkg/packet/packet.go +++ b/waveshell/pkg/packet/packet.go @@ -43,12 +43,10 @@ const ( DataEndPacketStr = "dataend" ResponsePacketStr = "resp" // rpc-response DonePacketStr = "done" - CmdErrorPacketStr = "cmderror" // command MessagePacketStr = "message" GetCmdPacketStr = "getcmd" // rpc UntailCmdPacketStr = "untailcmd" // rpc CdPacketStr = "cd" // rpc - CmdDataPacketStr = "cmddata" // rpc-response RawPacketStr = "raw" SpecialInputPacketStr = "sinput" // command CompGenPacketStr = "compgen" // rpc @@ -90,7 +88,6 @@ func init() { TypeStrToFactory[PingPacketStr] = reflect.TypeOf(PingPacketType{}) TypeStrToFactory[ResponsePacketStr] = reflect.TypeOf(ResponsePacketType{}) TypeStrToFactory[DonePacketStr] = reflect.TypeOf(DonePacketType{}) - TypeStrToFactory[CmdErrorPacketStr] = reflect.TypeOf(CmdErrorPacketType{}) TypeStrToFactory[MessagePacketStr] = reflect.TypeOf(MessagePacketType{}) TypeStrToFactory[CmdStartPacketStr] = reflect.TypeOf(CmdStartPacketType{}) TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{}) @@ -98,7 +95,6 @@ func init() { TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{}) TypeStrToFactory[InitPacketStr] = reflect.TypeOf(InitPacketType{}) TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{}) - TypeStrToFactory[CmdDataPacketStr] = reflect.TypeOf(CmdDataPacketType{}) TypeStrToFactory[RawPacketStr] = reflect.TypeOf(RawPacketType{}) TypeStrToFactory[SpecialInputPacketStr] = reflect.TypeOf(SpecialInputPacketType{}) TypeStrToFactory[DataPacketStr] = reflect.TypeOf(DataPacketType{}) @@ -128,7 +124,6 @@ func init() { var _ RpcResponsePacketType = (*CmdStartPacketType)(nil) var _ RpcResponsePacketType = (*ResponsePacketType)(nil) - var _ RpcResponsePacketType = (*CmdDataPacketType)(nil) var _ RpcResponsePacketType = (*StreamFileResponseType)(nil) var _ RpcResponsePacketType = (*FileDataPacketType)(nil) var _ RpcResponsePacketType = (*WriteFileReadyPacketType)(nil) @@ -155,36 +150,6 @@ func MakePacket(packetType string) (PacketType, error) { return rtn.Interface().(PacketType), nil } -type CmdDataPacketType struct { - Type string `json:"type"` - RespId string `json:"respid"` - CK base.CommandKey `json:"ck"` - PtyPos int64 `json:"ptypos"` - PtyLen int64 `json:"ptylen"` - RunPos int64 `json:"runpos"` - RunLen int64 `json:"runlen"` - PtyData64 string `json:"ptydata64"` - PtyDataLen int `json:"ptydatalen"` - RunData64 string `json:"rundata64"` - RunDataLen int `json:"rundatalen"` -} - -func (*CmdDataPacketType) GetType() string { - return CmdDataPacketStr -} - -func (p *CmdDataPacketType) GetResponseId() string { - return p.RespId -} - -func (*CmdDataPacketType) GetResponseDone() bool { - return false -} - -func MakeCmdDataPacket(reqId string) *CmdDataPacketType { - return &CmdDataPacketType{Type: CmdDataPacketStr, RespId: reqId} -} - type PingPacketType struct { Type string `json:"type"` } @@ -830,28 +795,6 @@ type BarePacketType struct { Type string `json:"type"` } -type CmdErrorPacketType struct { - Type string `json:"type"` - CK base.CommandKey `json:"ck"` - Error string `json:"error"` -} - -func (*CmdErrorPacketType) GetType() string { - return CmdErrorPacketStr -} - -func (p *CmdErrorPacketType) GetCK() base.CommandKey { - return p.CK -} - -func (p *CmdErrorPacketType) String() string { - return fmt.Sprintf("error[%s]", p.Error) -} - -func MakeCmdErrorPacket(ck base.CommandKey, err error) *CmdErrorPacketType { - return &CmdErrorPacketType{Type: CmdErrorPacketStr, CK: ck, Error: err.Error()} -} - type WriteFilePacketType struct { Type string `json:"type"` ReqId string `json:"reqid"` @@ -1074,10 +1017,6 @@ func SendPacket(w io.Writer, packet PacketType) error { return nil } -func SendCmdError(w io.Writer, ck base.CommandKey, err error) error { - return SendPacket(w, MakeCmdErrorPacket(ck, err)) -} - type PacketSender struct { Lock *sync.Mutex SendCh chan PacketType @@ -1197,10 +1136,6 @@ func (sender *PacketSender) SendPacket(pk PacketType) error { return nil } -func (sender *PacketSender) SendCmdError(ck base.CommandKey, err error) error { - return sender.SendPacket(MakeCmdErrorPacket(ck, err)) -} - func (sender *PacketSender) SendErrorResponse(reqId string, err error) error { pk := MakeErrorResponsePacket(reqId, err) return sender.SendPacket(pk) @@ -1222,17 +1157,13 @@ type UnknownPacketReporter interface { type DefaultUPR struct{} func (DefaultUPR) UnknownPacket(pk PacketType) { - if pk.GetType() == CmdErrorPacketStr { - errPacket := pk.(*CmdErrorPacketType) - // at this point, just send the error packet to stderr rather than try to do something special - fmt.Fprintf(os.Stderr, "[error] %s\n", errPacket.Error) - } else if pk.GetType() == RawPacketStr { + if pk.GetType() == RawPacketStr { rawPacket := pk.(*RawPacketType) fmt.Fprintf(os.Stderr, "%s\n", rawPacket.Data) } else if pk.GetType() == CmdStartPacketStr { return // do nothing } else { - fmt.Fprintf(os.Stderr, "[error] invalid packet received '%s'", AsExtType(pk)) + wlog.Logf("[upr] invalid packet received '%s'", AsExtType(pk)) } } diff --git a/waveshell/pkg/server/server.go b/waveshell/pkg/server/server.go index d3729eb16..9fc098ce0 100644 --- a/waveshell/pkg/server/server.go +++ b/waveshell/pkg/server/server.go @@ -151,7 +151,7 @@ func (m *MServer) ProcessCommandPacket(pk packet.CommandPacketType) { cproc := m.ClientMap[ck] m.Lock.Unlock() if cproc == nil { - m.Sender.SendCmdError(ck, fmt.Errorf("no client proc for ck '%s', pk=%s", ck, packet.AsString(pk))) + wlog.Logf("no client proc for ck %q, pk=%s", ck, packet.AsString(pk)) return } cproc.Input.SendPacket(pk) diff --git a/waveshell/pkg/shexec/shexec.go b/waveshell/pkg/shexec/shexec.go index 93446f5d5..817692bbe 100644 --- a/waveshell/pkg/shexec/shexec.go +++ b/waveshell/pkg/shexec/shexec.go @@ -1069,106 +1069,6 @@ func copyToCirFile(dest *cirfile.File, src io.Reader) error { } } -func (cmd *ShExecType) DetachedWait(startPacket *packet.CmdStartPacketType) { - // after Start(), any output/errors must go to DetachedOutput - // close stdin, redirect stdout/stderr to /dev/null, but wait for cmdstart packet to get sent - cmd.DetachedOutput.SendPacket(startPacket) - err := os.Stdin.Close() - if err != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot close stdin: %w", err)) - } - err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stdout.Fd())) - if err != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err)) - } - err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stderr.Fd())) - if err != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err)) - } - ptyOutFile, err := cirfile.CreateCirFile(cmd.FileNames.PtyOutFile, cmd.MaxPtySize) - if err != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot open ptyout file '%s': %w", cmd.FileNames.PtyOutFile, err)) - // don't return (command is already running) - } - ptyCopyDone := make(chan bool) - go func() { - // copy pty output to .ptyout file - defer close(ptyCopyDone) - defer ptyOutFile.Close() - copyErr := copyToCirFile(ptyOutFile, cmd.CmdPty) - if copyErr != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("copying pty output to ptyout file: %w", copyErr)) - } - }() - go func() { - // copy .stdin fifo contents to pty input - copyFifoErr := MakeAndCopyStdinFifo(cmd.CmdPty, cmd.FileNames.StdinFifo) - if copyFifoErr != nil { - cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("reading from stdin fifo: %w", copyFifoErr)) - } - }() - donePacket := cmd.WaitForCommand() - cmd.DetachedOutput.SendPacket(donePacket) - <-ptyCopyDone - cmd.Close() -} - -func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) { - sapi, err := shellapi.MakeShellApi(pk.ShellType) - if err != nil { - return nil, nil, err - } - fileNames, err := base.GetCommandFileNames(pk.CK) - if err != nil { - return nil, nil, err - } - runOutInfo, err := os.Stat(fileNames.RunnerOutFile) - if err == nil { // non-nil error will be caught by regular OpenFile below - // must have size 0 - if runOutInfo.Size() != 0 { - return nil, nil, fmt.Errorf("cmdkey '%s' was already used (runout len=%d)", pk.CK, runOutInfo.Size()) - } - } - cmdPty, cmdTty, err := pty.Open() - if err != nil { - return nil, nil, fmt.Errorf("opening new pty: %w", err) - } - pty.Setsize(cmdPty, GetWinsize(pk)) - defer func() { - cmdTty.Close() - }() - cmd := MakeShExec(pk.CK, nil, sapi) - cmd.FileNames = fileNames - cmd.CmdPty = cmdPty - cmd.Detached = true - cmd.MaxPtySize = DefaultMaxPtySize - if pk.TermOpts != nil && pk.TermOpts.MaxPtySize > 0 { - cmd.MaxPtySize = base.BoundInt64(pk.TermOpts.MaxPtySize, MinMaxPtySize, MaxMaxPtySize) - } - cmd.RunnerOutFd, err = os.OpenFile(fileNames.RunnerOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) - if err != nil { - return nil, nil, fmt.Errorf("cannot open runout file '%s': %w", fileNames.RunnerOutFile, err) - } - cmd.DetachedOutput = packet.MakePacketSender(cmd.RunnerOutFd, nil) - ecmd, err := MakeDetachedExecCmd(pk, cmdTty) - if err != nil { - return nil, nil, err - } - cmd.Cmd = ecmd - SetupSignalsForDetach() - err = ecmd.Start() - if err != nil { - return nil, nil, fmt.Errorf("starting command: %w", err) - } - for _, fd := range ecmd.ExtraFiles { - if fd != cmdTty { - fd.Close() - } - } - startPacket := cmd.MakeCmdStartPacket(pk.ReqId) - return cmd, startPacket, nil -} - func GetExitCode(err error) int { if err == nil { return 0 diff --git a/wavesrv/pkg/cmdrunner/cmdrunner.go b/wavesrv/pkg/cmdrunner/cmdrunner.go index 0d674efb7..2d66c5ae2 100644 --- a/wavesrv/pkg/cmdrunner/cmdrunner.go +++ b/wavesrv/pkg/cmdrunner/cmdrunner.go @@ -496,7 +496,7 @@ func getEvalDepth(ctx context.Context) int { func SyncCommand(ctx context.Context, pk *scpacket.FeCommandPacketType) (scbus.UpdatePacket, error) { ids, err := resolveUiIds(ctx, pk, R_Session|R_Screen|R_RemoteConnected) if err != nil { - return nil, fmt.Errorf("/run error: %w", err) + return nil, fmt.Errorf("/sync error: %w", err) } runPacket := packet.MakeRunPacket() runPacket.ReqId = uuid.New().String() @@ -513,22 +513,21 @@ func SyncCommand(ctx context.Context, pk *scpacket.FeCommandPacketType) (scbus.U SessionId: ids.SessionId, ScreenId: ids.ScreenId, RemotePtr: ids.Remote.RemotePtr, + Ephemeral: true, } - cmd, callback, err := remote.RunCommand(ctx, rcOpts, runPacket) + _, callback, err := remote.RunCommand(ctx, rcOpts, runPacket) if callback != nil { defer callback() } if err != nil { return nil, err } - cmd.RawCmdStr = pk.GetRawStr() - update, err := addLineForCmd(ctx, "/sync", true, ids, cmd, "terminal", nil) - if err != nil { - return nil, err - } - update.AddUpdate(sstore.InteractiveUpdate(pk.Interactive)) - scbus.MainUpdateBus.DoScreenUpdate(ids.ScreenId, update) - return nil, nil + update := scbus.MakeUpdatePacket() + update.AddUpdate(sstore.InfoMsgType{ + InfoMsg: "syncing state", + TimeoutMs: 2000, + }) + return update, nil } func getRendererArg(pk *scpacket.FeCommandPacketType) (string, error) { @@ -1178,7 +1177,8 @@ func deferWriteCmdStatus(ctx context.Context, cmd *sstore.CmdType, startTime tim donePk.Ts = time.Now().UnixMilli() donePk.ExitCode = exitCode donePk.DurationMs = duration.Milliseconds() - update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus) + update := scbus.MakeUpdatePacket() + err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus) if err != nil { // nothing to do log.Printf("error updating cmddoneinfo (in openai): %v\n", err) @@ -2554,7 +2554,8 @@ func doOpenAICompletion(cmd *sstore.CmdType, opts *sstore.OpenAIOptsType, prompt donePk.Ts = time.Now().UnixMilli() donePk.ExitCode = exitCode donePk.DurationMs = duration.Milliseconds() - update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus) + update := scbus.MakeUpdatePacket() + err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus) if err != nil { // nothing to do log.Printf("error updating cmddoneinfo (in openai): %v\n", err) @@ -2713,7 +2714,8 @@ func doOpenAIStreamCompletion(cmd *sstore.CmdType, clientId string, opts *sstore donePk.Ts = time.Now().UnixMilli() donePk.ExitCode = exitCode donePk.DurationMs = duration.Milliseconds() - update, err := sstore.UpdateCmdDoneInfo(context.Background(), ck, donePk, cmdStatus) + update := scbus.MakeUpdatePacket() + err := sstore.UpdateCmdDoneInfo(context.Background(), update, ck, donePk, cmdStatus) if err != nil { // nothing to do log.Printf("error updating cmddoneinfo (in openai): %v\n", err) diff --git a/wavesrv/pkg/remote/remote.go b/wavesrv/pkg/remote/remote.go index 07f61dec9..3304f3412 100644 --- a/wavesrv/pkg/remote/remote.go +++ b/wavesrv/pkg/remote/remote.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -160,17 +161,20 @@ type MShellProc struct { InstallCancelFn context.CancelFunc InstallErr error - RunningCmds map[base.CommandKey]RunCmdType + RunningCmds map[base.CommandKey]*RunCmdType PendingStateCmds map[pendingStateKey]base.CommandKey // key=[remoteinstance name] launcher Launcher // for conditional launch method based on ssh library in use. remove once ssh library is stabilized Client *ssh.Client } type RunCmdType struct { - SessionId string - ScreenId string - RemotePtr sstore.RemotePtrType - RunPacket *packet.RunPacketType + CK base.CommandKey + SessionId string + ScreenId string + RemotePtr sstore.RemotePtrType + RunPacket *packet.RunPacketType + Ephemeral bool + EphCancled atomic.Bool // only for Ephemeral commands, if true, then the command result should be discarded } type RemoteRuntimeState = sstore.RemoteRuntimeState @@ -704,7 +708,7 @@ func MakeMShell(r *sstore.RemoteType) *MShellProc { Status: StatusDisconnected, PtyBuffer: buf, InstallStatus: StatusDisconnected, - RunningCmds: make(map[base.CommandKey]RunCmdType), + RunningCmds: make(map[base.CommandKey]*RunCmdType), PendingStateCmds: make(map[pendingStateKey]base.CommandKey), StateMap: server.MakeShellStateMap(), launcher: LegacyLauncher{}, // for conditional launch method based on ssh library in use. remove once ssh library is stabilized @@ -1928,6 +1932,8 @@ func makeTermOpts(runPk *packet.RunPacketType) sstore.TermOpts { } // returns (ok, currentPSC) +// if ok is true, currentPSC will be nil +// if ok is false, currentPSC will be the existing pending state command (not nil) func (msh *MShellProc) testAndSetPendingStateCmd(screenId string, rptr sstore.RemotePtrType, newCK *base.CommandKey) (bool, *base.CommandKey) { key := pendingStateKey{ScreenId: screenId, RemotePtr: rptr} msh.Lock.Lock() @@ -1986,6 +1992,10 @@ type RunCommandOpts struct { // set to true to skip creating the pty file (for restarted commands) NoCreateCmdPtyFile bool + + // this command will not go into the DB, and will not have a ptyout file created + // forces special packet handling (sets RunCommandType.Ephemeral) + Ephemeral bool } // returns (CmdType, allow-updates-callback, err) @@ -2022,14 +2032,14 @@ func RunCommand(ctx context.Context, rcOpts RunCommandOpts, runPacket *packet.Ru } ok, existingPSC := msh.testAndSetPendingStateCmd(screenId, remotePtr, newPSC) if !ok { - line, _, err := sstore.GetLineCmdByLineId(ctx, screenId, existingPSC.GetCmdId()) - if err != nil { - return nil, nil, fmt.Errorf("cannot run command while a stateful command is still running: %v", err) + rct := msh.GetRunningCmd(*existingPSC) + if rct.Ephemeral { + // if the existing command is ephemeral, we cancel it and continue + rct.EphCancled.Store(true) + } else { + line, _, err := sstore.GetLineCmdByLineId(ctx, screenId, existingPSC.GetCmdId()) + return nil, nil, makePSCLineError(*existingPSC, line, err) } - if line == nil { - return nil, nil, fmt.Errorf("cannot run command while a stateful command is still running %s", *existingPSC) - } - return nil, nil, fmt.Errorf("cannot run command while a stateful command (linenum=%d) is still running", line.LineNum) } if newPSC != nil { defer func() { @@ -2121,24 +2131,37 @@ func RunCommand(ctx context.Context, rcOpts RunCommandOpts, runPacket *packet.Ru RunOut: nil, RtnState: runPacket.ReturnState, } - if !rcOpts.NoCreateCmdPtyFile { + if !rcOpts.NoCreateCmdPtyFile && !rcOpts.Ephemeral { err = sstore.CreateCmdPtyFile(ctx, cmd.ScreenId, cmd.LineId, cmd.TermOpts.MaxPtySize) if err != nil { // TODO the cmd is running, so this is a tricky error to handle return nil, nil, fmt.Errorf("cannot create local ptyout file for running command: %v", err) } } - msh.AddRunningCmd(RunCmdType{ + msh.AddRunningCmd(&RunCmdType{ + CK: runPacket.CK, SessionId: sessionId, ScreenId: screenId, RemotePtr: remotePtr, RunPacket: runPacket, + Ephemeral: rcOpts.Ephemeral, }) return cmd, func() { removeCmdWait(runPacket.CK) }, nil } -func (msh *MShellProc) AddRunningCmd(rct RunCmdType) { +// helper func to construct the proper error given what information we have +func makePSCLineError(existingPSC base.CommandKey, line *sstore.LineType, lineErr error) error { + if lineErr != nil { + return fmt.Errorf("cannot run command while a stateful command is still running: %v", lineErr) + } + if line == nil { + return fmt.Errorf("cannot run command while a stateful command is still running %s", existingPSC) + } + return fmt.Errorf("cannot run command while a stateful command (linenum=%d) is still running", line.LineNum) +} + +func (msh *MShellProc) AddRunningCmd(rct *RunCmdType) { msh.Lock.Lock() defer msh.Lock.Unlock() msh.RunningCmds[rct.RunPacket.CK] = rct @@ -2147,11 +2170,7 @@ func (msh *MShellProc) AddRunningCmd(rct RunCmdType) { func (msh *MShellProc) GetRunningCmd(ck base.CommandKey) *RunCmdType { msh.Lock.Lock() defer msh.Lock.Unlock() - rct, found := msh.RunningCmds[ck] - if !found { - return nil - } - return &rct + return msh.RunningCmds[ck] } func (msh *MShellProc) RemoveRunningCmd(ck base.CommandKey) { @@ -2241,39 +2260,74 @@ func (msh *MShellProc) notifyHangups_nolock() { scbus.MainUpdateBus.DoScreenUpdate(ck.GetGroupId(), update) go pushNumRunningCmdsUpdate(&ck, -1) } - msh.RunningCmds = make(map[base.CommandKey]RunCmdType) + msh.RunningCmds = make(map[base.CommandKey]*RunCmdType) msh.PendingStateCmds = make(map[pendingStateKey]base.CommandKey) } -func (msh *MShellProc) handleCmdDonePacket(donePk *packet.CmdDonePacketType) { - ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) - defer cancelFn() - // this will remove from RunningCmds and from PendingStateCmds - defer msh.RemoveRunningCmd(donePk.CK) +// either fullstate or statediff will be set (not both) <- this is so the result is compatible with the sstore.UpdateRemoteState function +// note that this function *does* touch the DB, if FinalStateDiff is set, will ensure that StateBase is written to DB +func (msh *MShellProc) makeStatePtrFromFinalState(ctx context.Context, donePk *packet.CmdDonePacketType) (*sstore.ShellStatePtr, map[string]string, *packet.ShellState, *packet.ShellStateDiff, error) { if donePk.FinalState != nil { - donePk.FinalState = stripScVarsFromState(donePk.FinalState) + finalState := stripScVarsFromState(donePk.FinalState) + feState := sstore.FeStateFromShellState(finalState) + statePtr := &sstore.ShellStatePtr{BaseHash: finalState.GetHashVal(false)} + return statePtr, feState, finalState, nil, nil } if donePk.FinalStateDiff != nil { - donePk.FinalStateDiff = stripScVarsFromStateDiff(donePk.FinalStateDiff) + stateDiff := stripScVarsFromStateDiff(donePk.FinalStateDiff) + feState, err := msh.getFeStateFromDiff(stateDiff) + if err != nil { + return nil, nil, nil, nil, err + } + fullState := msh.StateMap.GetStateByHash(stateDiff.GetShellType(), stateDiff.BaseHash) + if fullState != nil { + sstore.StoreStateBase(ctx, fullState) + } + diffHashArr := append(([]string)(nil), donePk.FinalStateDiff.DiffHashArr...) + diffHashArr = append(diffHashArr, donePk.FinalStateDiff.GetHashVal(false)) + statePtr := &sstore.ShellStatePtr{BaseHash: donePk.FinalStateDiff.BaseHash, DiffHashArr: diffHashArr} + return statePtr, feState, nil, stateDiff, nil } - update, err := sstore.UpdateCmdDoneInfo(ctx, donePk.CK, donePk, sstore.CmdStatusDone) - if err != nil { - msh.WriteToPtyBuffer("*error updating cmddone: %v\n", err) + return nil, nil, nil, nil, nil +} + +func (msh *MShellProc) handleCmdDonePacket(rct *RunCmdType, donePk *packet.CmdDonePacketType) { + if rct == nil { + log.Printf("cmddone packet received, but no running command found for it %q\n", donePk.CK) return } - screen, err := sstore.UpdateScreenFocusForDoneCmd(ctx, donePk.CK.GetGroupId(), donePk.CK.GetCmdId()) - if err != nil { - msh.WriteToPtyBuffer("*error trying to update screen focus type: %v\n", err) - // fall-through (nothing to do) + // this will remove from RunningCmds and from PendingStateCmds + defer msh.RemoveRunningCmd(donePk.CK) + if rct.Ephemeral && rct.EphCancled.Load() { + // do nothing when an ephemeral command is canceled + return } - if screen != nil { - update.AddUpdate(*screen) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + update := scbus.MakeUpdatePacket() + if !rct.Ephemeral { + // only update DB for non-ephemeral commands + err := sstore.UpdateCmdDoneInfo(ctx, update, donePk.CK, donePk, sstore.CmdStatusDone) + if err != nil { + msh.WriteToPtyBuffer("*error updating cmddone: %v\n", err) + return + } + screen, err := sstore.UpdateScreenFocusForDoneCmd(ctx, donePk.CK.GetGroupId(), donePk.CK.GetCmdId()) + if err != nil { + msh.WriteToPtyBuffer("*error trying to update screen focus type: %v\n", err) + // fall-through (nothing to do) + } + if screen != nil { + update.AddUpdate(*screen) + } } - rct := msh.GetRunningCmd(donePk.CK) - var statePtr *sstore.ShellStatePtr - if donePk.FinalState != nil && rct != nil { - feState := sstore.FeStateFromShellState(donePk.FinalState) - remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, donePk.FinalState, nil) + // ephemeral commands *do* update the remote state + if donePk.FinalState != nil || donePk.FinalStateDiff != nil { + statePtr, feState, finalState, finalStateDiff, err := msh.makeStatePtrFromFinalState(ctx, donePk) + if err != nil { + msh.WriteToPtyBuffer("*error trying to read final command state: %v\n", err) + } + remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, finalState, finalStateDiff) if err != nil { msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err) // fall-through (nothing to do) @@ -2281,43 +2335,28 @@ func (msh *MShellProc) handleCmdDonePacket(donePk *packet.CmdDonePacketType) { if remoteInst != nil { update.AddUpdate(sstore.MakeSessionUpdateForRemote(rct.SessionId, remoteInst)) } - statePtr = &sstore.ShellStatePtr{BaseHash: donePk.FinalState.GetHashVal(false)} - } else if donePk.FinalStateDiff != nil && rct != nil { - feState, err := msh.getFeStateFromDiff(donePk.FinalStateDiff) - if err != nil { - msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err) - // fall-through (nothing to do) - } else { - stateDiff := donePk.FinalStateDiff - fullState := msh.StateMap.GetStateByHash(stateDiff.GetShellType(), stateDiff.BaseHash) - if fullState != nil { - sstore.StoreStateBase(ctx, fullState) - } - remoteInst, err := sstore.UpdateRemoteState(ctx, rct.SessionId, rct.ScreenId, rct.RemotePtr, feState, nil, stateDiff) + // ephemeral commands *do not* update cmd state (there is no command) + if statePtr != nil && !rct.Ephemeral { + err = sstore.UpdateCmdRtnState(ctx, donePk.CK, *statePtr) if err != nil { - msh.WriteToPtyBuffer("*error trying to update remotestate: %v\n", err) + msh.WriteToPtyBuffer("*error trying to update cmd rtnstate: %v\n", err) // fall-through (nothing to do) } - if remoteInst != nil { - update.AddUpdate(sstore.MakeSessionUpdateForRemote(rct.SessionId, remoteInst)) - } - diffHashArr := append(([]string)(nil), donePk.FinalStateDiff.DiffHashArr...) - diffHashArr = append(diffHashArr, donePk.FinalStateDiff.GetHashVal(false)) - statePtr = &sstore.ShellStatePtr{BaseHash: donePk.FinalStateDiff.BaseHash, DiffHashArr: diffHashArr} - } - } - if statePtr != nil { - err = sstore.UpdateCmdRtnState(ctx, donePk.CK, *statePtr) - if err != nil { - msh.WriteToPtyBuffer("*error trying to update cmd rtnstate: %v\n", err) - // fall-through (nothing to do) } } scbus.MainUpdateBus.DoUpdate(update) } -func (msh *MShellProc) handleCmdFinalPacket(finalPk *packet.CmdFinalPacketType) { +func (msh *MShellProc) handleCmdFinalPacket(rct *RunCmdType, finalPk *packet.CmdFinalPacketType) { + if rct == nil { + // this is somewhat expected, since cmddone should have removed the running command + return + } defer msh.RemoveRunningCmd(finalPk.CK) + if rct.Ephemeral { + // just remove the running command, but there is no DB state to update in this case + return + } rtnCmd, err := sstore.GetCmdByScreenId(context.Background(), finalPk.CK.GetGroupId(), finalPk.CK.GetCmdId()) if err != nil { log.Printf("error calling GetCmdById in handleCmdFinalPacket: %v\n", err) @@ -2350,31 +2389,31 @@ func (msh *MShellProc) handleCmdFinalPacket(finalPk *packet.CmdFinalPacketType) scbus.MainUpdateBus.DoUpdate(update) } -// TODO notify FE about cmd errors -func (msh *MShellProc) handleCmdErrorPacket(errPk *packet.CmdErrorPacketType) { - err := sstore.AppendCmdErrorPk(context.Background(), errPk) - if err != nil { - msh.WriteToPtyBuffer("cmderr> [remote %s] [error] adding cmderr: %v\n", msh.GetRemoteName(), err) - return - } -} - func (msh *MShellProc) ResetDataPos(ck base.CommandKey) { msh.DataPosMap.Delete(ck) } -func (msh *MShellProc) handleDataPacket(dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) { +func (msh *MShellProc) handleDataPacket(rct *RunCmdType, dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) { + if rct == nil { + ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, fmt.Errorf("no running cmd found")) + msh.ServerProc.Input.SendPacket(ack) + return + } realData, err := base64.StdEncoding.DecodeString(dataPk.Data64) if err != nil { ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, err) msh.ServerProc.Input.SendPacket(ack) return } + if rct.Ephemeral { + ack := makeDataAckPacket(dataPk.CK, dataPk.FdNum, len(realData), nil) + msh.ServerProc.Input.SendPacket(ack) + return + } var ack *packet.DataAckPacketType if len(realData) > 0 { dataPos := dataPosMap.Get(dataPk.CK) - rcmd := msh.GetRunningCmd(dataPk.CK) - update, err := sstore.AppendToCmdPtyBlob(context.Background(), rcmd.ScreenId, dataPk.CK.GetCmdId(), realData, dataPos) + update, err := sstore.AppendToCmdPtyBlob(context.Background(), rct.ScreenId, dataPk.CK.GetCmdId(), realData, dataPos) if err != nil { ack = makeDataAckPacket(dataPk.CK, dataPk.FdNum, 0, err) } else { @@ -2388,25 +2427,6 @@ func (msh *MShellProc) handleDataPacket(dataPk *packet.DataPacketType, dataPosMa if ack != nil { msh.ServerProc.Input.SendPacket(ack) } - // log.Printf("data %s fd=%d len=%d eof=%v err=%v\n", dataPk.CK, dataPk.FdNum, len(realData), dataPk.Eof, dataPk.Error) -} - -func (msh *MShellProc) makeHandleDataPacketClosure(dataPk *packet.DataPacketType, dataPosMap *utilfn.SyncMap[base.CommandKey, int64]) func() { - return func() { - msh.handleDataPacket(dataPk, dataPosMap) - } -} - -func (msh *MShellProc) makeHandleCmdDonePacketClosure(donePk *packet.CmdDonePacketType) func() { - return func() { - msh.handleCmdDonePacket(donePk) - } -} - -func (msh *MShellProc) makeHandleCmdFinalPacketClosure(finalPk *packet.CmdFinalPacketType) func() { - return func() { - msh.handleCmdFinalPacket(finalPk) - } } func sendScreenUpdates(screens []*sstore.ScreenType) { @@ -2417,6 +2437,45 @@ func sendScreenUpdates(screens []*sstore.ScreenType) { } } +func (msh *MShellProc) processSinglePacket(pk packet.PacketType) { + if _, ok := pk.(*packet.DataAckPacketType); ok { + // TODO process ack (need to keep track of buffer size for sending) + // this is low priority though since most input is coming from keyboard and won't overflow this buffer + return + } + if dataPk, ok := pk.(*packet.DataPacketType); ok { + runCmdUpdateFn(dataPk.CK, func() { + rct := msh.GetRunningCmd(dataPk.CK) + msh.handleDataPacket(rct, dataPk, msh.DataPosMap) + }) + go pushStatusIndicatorUpdate(&dataPk.CK, sstore.StatusIndicatorLevel_Output) + return + } + if donePk, ok := pk.(*packet.CmdDonePacketType); ok { + runCmdUpdateFn(donePk.CK, func() { + rct := msh.GetRunningCmd(donePk.CK) + msh.handleCmdDonePacket(rct, donePk) + }) + return + } + if finalPk, ok := pk.(*packet.CmdFinalPacketType); ok { + runCmdUpdateFn(finalPk.CK, func() { + rct := msh.GetRunningCmd(finalPk.CK) + msh.handleCmdFinalPacket(rct, finalPk) + }) + return + } + if msgPk, ok := pk.(*packet.MessagePacketType); ok { + msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPk.CK, msgPk.Message) + return + } + if rawPk, ok := pk.(*packet.RawPacketType); ok { + msh.WriteToPtyBuffer("stderr> [remote %s] %s\n", msh.GetRemoteName(), rawPk.Data) + return + } + msh.WriteToPtyBuffer("MSH> [remote %s] unhandled packet %s\n", msh.GetRemoteName(), packet.AsString(pk)) +} + func (msh *MShellProc) ProcessPackets() { defer msh.WithLock(func() { if msh.Status == StatusConnected { @@ -2433,53 +2492,7 @@ func (msh *MShellProc) ProcessPackets() { } }) for pk := range msh.ServerProc.Output.MainCh { - if pk.GetType() == packet.DataPacketStr { - dataPk := pk.(*packet.DataPacketType) - runCmdUpdateFn(dataPk.CK, msh.makeHandleDataPacketClosure(dataPk, msh.DataPosMap)) - go pushStatusIndicatorUpdate(&dataPk.CK, sstore.StatusIndicatorLevel_Output) - continue - } - if pk.GetType() == packet.DataAckPacketStr { - // TODO process ack (need to keep track of buffer size for sending) - // this is low priority though since most input is coming from keyboard and won't overflow this buffer - continue - } - if pk.GetType() == packet.CmdDataPacketStr { - dataPacket := pk.(*packet.CmdDataPacketType) - go msh.WriteToPtyBuffer("cmd-data> [remote %s] [%s] pty=%d run=%d\n", msh.GetRemoteName(), dataPacket.CK, dataPacket.PtyDataLen, dataPacket.RunDataLen) - go pushStatusIndicatorUpdate(&dataPacket.CK, sstore.StatusIndicatorLevel_Output) - continue - } - if pk.GetType() == packet.CmdDonePacketStr { - donePk := pk.(*packet.CmdDonePacketType) - runCmdUpdateFn(donePk.CK, msh.makeHandleCmdDonePacketClosure(donePk)) - continue - } - if pk.GetType() == packet.CmdFinalPacketStr { - finalPk := pk.(*packet.CmdFinalPacketType) - runCmdUpdateFn(finalPk.CK, msh.makeHandleCmdFinalPacketClosure(finalPk)) - continue - } - if pk.GetType() == packet.CmdErrorPacketStr { - msh.handleCmdErrorPacket(pk.(*packet.CmdErrorPacketType)) - continue - } - if pk.GetType() == packet.MessagePacketStr { - msgPacket := pk.(*packet.MessagePacketType) - msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPacket.CK, msgPacket.Message) - continue - } - if pk.GetType() == packet.RawPacketStr { - rawPacket := pk.(*packet.RawPacketType) - msh.WriteToPtyBuffer("stderr> [remote %s] %s\n", msh.GetRemoteName(), rawPacket.Data) - continue - } - if pk.GetType() == packet.CmdStartPacketStr { - startPk := pk.(*packet.CmdStartPacketType) - msh.WriteToPtyBuffer("start> [remote %s] reqid=%s (%p)\n", msh.GetRemoteName(), startPk.RespId, msh.ServerProc.Output) - continue - } - msh.WriteToPtyBuffer("MSH> [remote %s] unhandled packet %s\n", msh.GetRemoteName(), packet.AsString(pk)) + msh.processSinglePacket(pk) } } diff --git a/wavesrv/pkg/sstore/dbops.go b/wavesrv/pkg/sstore/dbops.go index e4d6685dd..8accfbf82 100644 --- a/wavesrv/pkg/sstore/dbops.go +++ b/wavesrv/pkg/sstore/dbops.go @@ -916,12 +916,12 @@ func UpdateCmdForRestart(ctx context.Context, ck base.CommandKey, ts int64, cmdP }) } -func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.CmdDonePacketType, status string) (*scbus.ModelUpdatePacketType, error) { +func UpdateCmdDoneInfo(ctx context.Context, update *scbus.ModelUpdatePacketType, ck base.CommandKey, donePk *packet.CmdDonePacketType, status string) error { if donePk == nil { - return nil, fmt.Errorf("invalid cmddone packet") + return fmt.Errorf("invalid cmddone packet") } if ck.IsEmpty() { - return nil, fmt.Errorf("cannot update cmddoneinfo, empty ck") + return fmt.Errorf("cannot update cmddoneinfo, empty ck") } screenId := ck.GetGroupId() var rtnCmd *CmdType @@ -944,15 +944,12 @@ func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.C return nil }) if txErr != nil { - return nil, txErr + return txErr } if rtnCmd == nil { - return nil, fmt.Errorf("cmd data not found for ck[%s]", ck) + return fmt.Errorf("cmd data not found for ck[%s]", ck) } - - update := scbus.MakeUpdatePacket() update.AddUpdate(*rtnCmd) - // Update in-memory screen indicator status var indicator StatusIndicatorLevel if rtnCmd.ExitCode == 0 { @@ -960,15 +957,13 @@ func UpdateCmdDoneInfo(ctx context.Context, ck base.CommandKey, donePk *packet.C } else { indicator = StatusIndicatorLevel_Error } - err := SetStatusIndicatorLevel_Update(ctx, update, screenId, indicator, false) if err != nil { // This is not a fatal error, so just log it log.Printf("error setting status indicator level after done packet: %v\n", err) } IncrementNumRunningCmds_Update(update, screenId, -1) - - return update, nil + return nil } func UpdateCmdRtnState(ctx context.Context, ck base.CommandKey, statePtr ShellStatePtr) error { @@ -991,18 +986,6 @@ func UpdateCmdRtnState(ctx context.Context, ck base.CommandKey, statePtr ShellSt return nil } -func AppendCmdErrorPk(ctx context.Context, errPk *packet.CmdErrorPacketType) error { - if errPk == nil || errPk.CK.IsEmpty() { - return fmt.Errorf("invalid cmderror packet (no ck)") - } - screenId := errPk.CK.GetGroupId() - return WithTx(ctx, func(tx *TxWrap) error { - query := `UPDATE cmd SET runout = json_insert(runout, '$[#]', ?) WHERE screenid = ? AND lineid = ?` - tx.Exec(query, quickJson(errPk), screenId, lineIdFromCK(errPk.CK)) - return nil - }) -} - func ReInitFocus(ctx context.Context) error { return WithTx(ctx, func(tx *TxWrap) error { query := `UPDATE screen SET focustype = 'input'`