diff --git a/main-mshell.go b/main-mshell.go index 4e58eba23..351130a0c 100644 --- a/main-mshell.go +++ b/main-mshell.go @@ -13,6 +13,7 @@ import ( "strings" "github.com/scripthaus-dev/mshell/pkg/base" + "github.com/scripthaus-dev/mshell/pkg/cmdtail" "github.com/scripthaus-dev/mshell/pkg/packet" "github.com/scripthaus-dev/mshell/pkg/server" "github.com/scripthaus-dev/mshell/pkg/shexec" @@ -73,13 +74,13 @@ import ( // }() // } -// func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error { -// err := tailer.AddWatch(pk) -// if err != nil { -// return err -// } -// return nil -// } +func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error { + err := tailer.AddWatch(pk) + if err != nil { + return err + } + return nil +} // func doMain() { // homeDir := base.GetHomeDir() @@ -176,11 +177,16 @@ func handleSingle() { return } if runPacket.Detached { - err := shexec.RunCommandDetached(runPacket, sender) + cmd, startPk, err := shexec.RunCommandDetached(runPacket, sender) if err != nil { sender.SendErrorResponse(runPacket.ReqId, err) return } + sender.SendPacket(startPk) + sender.Close() + sender.WaitForDone() + cmd.DetachedWait(startPk) + return } else { cmd, err := shexec.RunCommandSimple(runPacket, sender) if err != nil { @@ -188,7 +194,7 @@ func handleSingle() { return } defer cmd.Close() - startPacket := cmd.MakeCmdStartPacket() + startPacket := cmd.MakeCmdStartPacket(runPacket.ReqId) sender.SendPacket(startPacket) cmd.RunRemoteIOAndWait(packetParser, sender) return diff --git a/pkg/cmdtail/cmdtail.go b/pkg/cmdtail/cmdtail.go index c301c0cfc..d39c56e11 100644 --- a/pkg/cmdtail/cmdtail.go +++ b/pkg/cmdtail/cmdtail.go @@ -7,6 +7,7 @@ package cmdtail import ( + "encoding/base64" "fmt" "io" "os" @@ -89,6 +90,12 @@ func (t *Tailer) updateTailPos_nolock(cmdKey base.CommandKey, reqId string, pos t.WatchList[cmdKey] = entry } +func (t *Tailer) removeTailPos(cmdKey base.CommandKey, reqId string) { + t.Lock.Lock() + defer t.Lock.Unlock() + t.removeTailPos_nolock(cmdKey, reqId) +} + func (t *Tailer) removeTailPos_nolock(cmdKey base.CommandKey, reqId string) { entry, found := t.WatchList[cmdKey] if !found { @@ -107,16 +114,6 @@ func (t *Tailer) removeTailPos_nolock(cmdKey base.CommandKey, reqId string) { t.Watcher.Remove(fileNames.RunnerOutFile) } -func (t *Tailer) updateEntrySizes_nolock(cmdKey base.CommandKey, ptyLen int64, runLen int64) { - entry, found := t.WatchList[cmdKey] - if !found { - return - } - entry.FilePtyLen = ptyLen - entry.FileRunLen = runLen - t.WatchList[cmdKey] = entry -} - func (t *Tailer) getEntryAndPos_nolock(cmdKey base.CommandKey, reqId string) (CmdWatchEntry, TailPos, bool) { entry, found := t.WatchList[cmdKey] if !found { @@ -159,90 +156,98 @@ func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]b return buf[0:nr], nil } -func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, entry CmdWatchEntry, pos TailPos) *packet.CmdDataPacketType { - dataPacket := packet.MakeCmdDataPacket() - dataPacket.RespId = pos.ReqId +func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, entry CmdWatchEntry, pos TailPos) (*packet.CmdDataPacketType, error) { + dataPacket := packet.MakeCmdDataPacket(pos.ReqId) dataPacket.CK = entry.CmdKey dataPacket.PtyPos = pos.TailPtyPos dataPacket.RunPos = pos.TailRunPos if entry.FilePtyLen > pos.TailPtyPos { ptyData, err := t.readDataFromFile(fileNames.PtyOutFile, pos.TailPtyPos, MaxDataBytes) if err != nil { - dataPacket.Error = err.Error() - return dataPacket + return nil, err } - dataPacket.PtyData = string(ptyData) + dataPacket.PtyData64 = base64.StdEncoding.EncodeToString(ptyData) dataPacket.PtyDataLen = len(ptyData) } if entry.FileRunLen > pos.TailRunPos { runData, err := t.readDataFromFile(fileNames.RunnerOutFile, pos.TailRunPos, MaxDataBytes) if err != nil { - dataPacket.Error = err.Error() - return dataPacket + return nil, err } - dataPacket.RunData = string(runData) + dataPacket.RunData64 = base64.StdEncoding.EncodeToString(runData) dataPacket.RunDataLen = len(runData) } - return dataPacket + return dataPacket, nil } // returns (data-packet, keepRunning) -func (t *Tailer) runSingleDataTransfer(key base.CommandKey, reqId string) (*packet.CmdDataPacketType, bool) { +func (t *Tailer) runSingleDataTransfer(key base.CommandKey, reqId string) (*packet.CmdDataPacketType, bool, error) { t.Lock.Lock() entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId) t.Lock.Unlock() if !foundPos { - return nil, false + return nil, false, nil } fileNames := base.MakeCommandFileNamesWithHome(t.MHomeDir, key) - dataPacket := t.makeCmdDataPacket(fileNames, entry, pos) + dataPacket, dataErr := t.makeCmdDataPacket(fileNames, entry, pos) t.Lock.Lock() defer t.Lock.Unlock() entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId) if !foundPos { - return nil, false + return nil, false, nil } // pos was updated between first and second get, throw out data-packet and re-run if pos.TailPtyPos != dataPacket.PtyPos || pos.TailRunPos != dataPacket.RunPos { - return nil, true + return nil, true, nil } - if dataPacket.Error != "" { + if dataErr != nil { // error, so return error packet, and stop running pos.Running = false t.updateTailPos_nolock(key, reqId, pos) - return dataPacket, false + return nil, false, dataErr } - pos.TailPtyPos += int64(len(dataPacket.PtyData)) - pos.TailRunPos += int64(len(dataPacket.RunData)) + pos.TailPtyPos += int64(dataPacket.PtyDataLen) + pos.TailRunPos += int64(dataPacket.RunDataLen) if pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen { // we caught up, tail position equals file length pos.Running = false } t.updateTailPos_nolock(key, reqId, pos) - return dataPacket, pos.Running + return dataPacket, pos.Running, nil } -func (t *Tailer) checkRemoveNoFollow(cmdKey base.CommandKey, reqId string) { +// returns (removed) +func (t *Tailer) checkRemoveNoFollow(cmdKey base.CommandKey, reqId string) bool { t.Lock.Lock() defer t.Lock.Unlock() _, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId) if !foundPos { - return + return false } if !pos.Follow { t.removeTailPos_nolock(cmdKey, reqId) + return true } + return false } func (t *Tailer) RunDataTransfer(key base.CommandKey, reqId string) { for { - dataPacket, keepRunning := t.runSingleDataTransfer(key, reqId) + dataPacket, keepRunning, err := t.runSingleDataTransfer(key, reqId) if dataPacket != nil { t.Sender.SendPacket(dataPacket) } + if err != nil { + t.removeTailPos(key, reqId) + t.Sender.SendErrorResponse(reqId, err) + break + } if !keepRunning { - t.checkRemoveNoFollow(key, reqId) + removed := t.checkRemoveNoFollow(key, reqId) + if removed { + t.Sender.SendResponse(reqId, true) + } break } time.Sleep(10 * time.Millisecond) @@ -254,7 +259,6 @@ func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) { return } if pos.IsCurrent(entry) { - return } pos.Running = true @@ -344,6 +348,19 @@ func (t *Tailer) RemoveWatch(pk *packet.UntailCmdPacketType) { t.removeTailPos_nolock(pk.CK, pk.ReqId) } +func (t *Tailer) AddFileWatches_nolock(fileNames *base.CommandFileNames) error { + err := t.Watcher.Add(fileNames.PtyOutFile) + if err != nil { + return err + } + err = t.Watcher.Add(fileNames.RunnerOutFile) + if err != nil { + t.Watcher.Remove(fileNames.PtyOutFile) // best effort clean up + return err + } + return nil +} + func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error { if err := getPacket.CK.Validate("getcmd"); err != nil { return err @@ -357,16 +374,7 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error { key := getPacket.CK entry, foundEntry := t.WatchList[key] if !foundEntry { - // add watches, initialize entry - err := t.Watcher.Add(fileNames.PtyOutFile) - if err != nil { - return err - } - err = t.Watcher.Add(fileNames.RunnerOutFile) - if err != nil { - t.Watcher.Remove(fileNames.PtyOutFile) // best effort clean up - return err - } + // initialize entry, add watches entry = CmdWatchEntry{CmdKey: key} entry.fillFilePos(t.MHomeDir) } @@ -387,6 +395,14 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error { pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative } entry.updateTailPos(pos.ReqId, pos) + if !pos.Follow && pos.IsCurrent(entry) { + // don't add to t.WatchList, don't t.AddFileWatches_nolock, send rpc response + go func() { t.Sender.SendResponse(getPacket.ReqId, true) }() + return nil + } + if !foundEntry { + t.AddFileWatches_nolock(fileNames) + } t.WatchList[key] = entry t.tryStartRun_nolock(entry, pos) return nil diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index dcdd5f093..b6142406d 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -109,12 +109,10 @@ type CmdDataPacketType struct { PtyLen int64 `json:"ptylen"` RunPos int64 `json:"runpos"` RunLen int64 `json:"runlen"` - PtyData string `json:"ptydata"` + PtyData64 string `json:"ptydata64"` PtyDataLen int `json:"ptydatalen"` - RunData string `json:"rundata"` + RunData64 string `json:"rundata64"` RunDataLen int `json:"rundatalen"` - Error string `json:"error"` - NotFound bool `json:"notfound,omitempty"` } func (*CmdDataPacketType) GetType() string { @@ -125,8 +123,12 @@ func (p *CmdDataPacketType) GetResponseId() string { return p.RespId } -func MakeCmdDataPacket() *CmdDataPacketType { - return &CmdDataPacketType{Type: CmdDataPacketStr} +func (*CmdDataPacketType) GetResponseDone() bool { + return false +} + +func MakeCmdDataPacket(reqId string) *CmdDataPacketType { + return &CmdDataPacketType{Type: CmdDataPacketStr, RespId: reqId} } type PingPacketType struct { @@ -326,6 +328,10 @@ func (p *ResponsePacketType) GetResponseId() string { return p.RespId } +func (*ResponsePacketType) GetResponseDone() bool { + return true +} + func MakeErrorResponsePacket(reqId string, err error) *ResponsePacketType { return &ResponsePacketType{Type: ResponsePacketStr, RespId: reqId, Error: err.Error()} } @@ -421,8 +427,8 @@ func (p *CmdDonePacketType) GetCK() base.CommandKey { return p.CK } -func MakeCmdDonePacket() *CmdDonePacketType { - return &CmdDonePacketType{Type: CmdDonePacketStr} +func MakeCmdDonePacket(ck base.CommandKey) *CmdDonePacketType { + return &CmdDonePacketType{Type: CmdDonePacketStr, CK: ck} } type CmdStartPacketType struct { @@ -442,8 +448,12 @@ func (p *CmdStartPacketType) GetResponseId() string { return p.RespId } -func MakeCmdStartPacket() *CmdStartPacketType { - return &CmdStartPacketType{Type: CmdStartPacketStr} +func (*CmdStartPacketType) GetResponseDone() bool { + return true +} + +func MakeCmdStartPacket(reqId string) *CmdStartPacketType { + return &CmdStartPacketType{Type: CmdStartPacketStr, RespId: reqId} } type TermSize struct { @@ -534,6 +544,7 @@ type RpcPacketType interface { type RpcResponsePacketType interface { GetType() string GetResponseId() string + GetResponseDone() bool } type CommandPacketType interface { diff --git a/pkg/packet/parser.go b/pkg/packet/parser.go index 5d2c91e32..d09dac47b 100644 --- a/pkg/packet/parser.go +++ b/pkg/packet/parser.go @@ -8,6 +8,7 @@ package packet import ( "bufio" + "context" "io" "strconv" "strings" @@ -17,9 +18,15 @@ import ( type PacketParser struct { Lock *sync.Mutex MainCh chan PacketType + RpcMap map[string]*RpcEntry Err error } +type RpcEntry struct { + ReqId string + RespCh chan RpcResponsePacketType +} + func CombinePacketParsers(p1 *PacketParser, p2 *PacketParser) *PacketParser { rtnParser := &PacketParser{ Lock: &sync.Mutex{}, @@ -46,6 +53,70 @@ func CombinePacketParsers(p1 *PacketParser, p2 *PacketParser) *PacketParser { return rtnParser } +// should have already registered rpc +func (p *PacketParser) WaitForResponse(ctx context.Context, reqId string) RpcResponsePacketType { + entry := p.getRpcEntry(reqId, false) + if entry == nil { + return nil + } + defer p.UnRegisterRpc(reqId) + select { + case resp := <-entry.RespCh: + return resp + case <-ctx.Done(): + return nil + } +} + +func (p *PacketParser) UnRegisterRpc(reqId string) { + p.Lock.Lock() + defer p.Lock.Unlock() + entry := p.RpcMap[reqId] + if entry != nil { + close(entry.RespCh) + delete(p.RpcMap, reqId) + } +} + +func (p *PacketParser) RegisterRpc(reqId string, queueSize int) chan RpcResponsePacketType { + p.Lock.Lock() + defer p.Lock.Unlock() + ch := make(chan RpcResponsePacketType, queueSize) + entry := &RpcEntry{ReqId: reqId, RespCh: ch} + p.RpcMap[reqId] = entry + return ch +} + +func (p *PacketParser) getRpcEntry(reqId string, remove bool) *RpcEntry { + p.Lock.Lock() + defer p.Lock.Unlock() + entry := p.RpcMap[reqId] + if entry != nil && remove { + delete(p.RpcMap, reqId) + close(entry.RespCh) + } + return entry +} + +func (p *PacketParser) trySendRpcResponse(respPk RpcResponsePacketType) bool { + p.Lock.Lock() + defer p.Lock.Unlock() + entry := p.RpcMap[respPk.GetResponseId()] + if entry == nil { + return false + } + // nonblocking send + select { + case entry.RespCh <- respPk: + default: + } + if respPk.GetResponseDone() { + delete(p.RpcMap, respPk.GetResponseId()) + close(entry.RespCh) + } + return true +} + func (p *PacketParser) GetErr() error { p.Lock.Lock() defer p.Lock.Unlock() @@ -108,6 +179,12 @@ func MakePacketParser(input io.Reader) *PacketParser { if pk.GetType() == PingPacketStr { continue } + if respPk, ok := pk.(RpcResponsePacketType); ok { + sent := parser.trySendRpcResponse(respPk) + if sent { + continue + } + } parser.MainCh <- pk } }() diff --git a/pkg/server/server.go b/pkg/server/server.go index fa736aa50..ab66c35e5 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -161,6 +161,8 @@ func RunServer() (int, error) { if server.Debug { fmt.Printf("PK> %s\n", packet.AsString(pk)) } + + // run-start combo ok, runPacket := builder.ProcessPacket(pk) if server.Debug { fmt.Printf("PP> %s | %v\n", pk.GetType(), ok) @@ -179,6 +181,8 @@ func RunServer() (int, error) { server.Sender.SendPacket(startPk) continue } + + // command packet if cmdPk, ok := pk.(packet.CommandPacketType); ok { server.ProcessCommandPacket(cmdPk) continue diff --git a/pkg/shexec/shexec.go b/pkg/shexec/shexec.go index 5981fdcad..8eb2c46f5 100644 --- a/pkg/shexec/shexec.go +++ b/pkg/shexec/shexec.go @@ -129,8 +129,8 @@ func (c *ShExecType) Close() { } } -func (c *ShExecType) MakeCmdStartPacket() *packet.CmdStartPacketType { - startPacket := packet.MakeCmdStartPacket() +func (c *ShExecType) MakeCmdStartPacket(reqId string) *packet.CmdStartPacketType { + startPacket := packet.MakeCmdStartPacket(reqId) startPacket.Ts = time.Now().UnixMilli() startPacket.CK = c.CK startPacket.Pid = c.Cmd.Process.Pid @@ -848,21 +848,67 @@ func SetupSignalsForDetach() { }() } -func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) error { +func (cmd *ShExecType) DetachedWait(startPacket *packet.CmdStartPacketType) { + // after Start(), any output/errors must go to DetachedOutput + // close stdin/stdout/stderr, but wait for cmdstart packet to get sent + nullFd, err := os.OpenFile("/dev/null", os.O_RDWR, 0) + if err != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot open /dev/null: %w", err)) + } + if nullFd != nil { + err := unix.Dup2(int(nullFd.Fd()), int(os.Stdin.Fd())) + if err != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) + } + err = unix.Dup2(int(nullFd.Fd()), int(os.Stdout.Fd())) + if err != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) + } + err = unix.Dup2(int(nullFd.Fd()), int(os.Stderr.Fd())) + if err != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) + } + } + cmd.DetachedOutput.SendPacket(startPacket) + ptyOutFd, err := os.OpenFile(cmd.FileNames.PtyOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot open ptyout file '%s': %w", cmd.FileNames.PtyOutFile, err)) + // don't return (command is already running) + } + go func() { + // copy pty output to .ptyout file + _, copyErr := io.Copy(ptyOutFd, cmd.CmdPty) + if copyErr != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("copying pty output to ptyout file: %w", copyErr)) + } + }() + go func() { + // copy .stdin fifo contents to pty input + copyFifoErr := MakeAndCopyStdinFifo(cmd.CmdPty, cmd.FileNames.StdinFifo) + if copyFifoErr != nil { + cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("reading from stdin fifo: %w", copyFifoErr)) + } + }() + donePacket := cmd.WaitForCommand() + cmd.DetachedOutput.SendPacket(donePacket) + return +} + +func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) { fileNames, err := base.GetCommandFileNames(pk.CK) if err != nil { - return err + return nil, nil, err } ptyOutInfo, err := os.Stat(fileNames.PtyOutFile) if err == nil { // non-nil error will be caught by regular OpenFile below // must have size 0 if ptyOutInfo.Size() != 0 { - return fmt.Errorf("cmdkey '%s' was already used (ptyout len=%d)", pk.CK, ptyOutInfo.Size()) + return nil, nil, fmt.Errorf("cmdkey '%s' was already used (ptyout len=%d)", pk.CK, ptyOutInfo.Size()) } } cmdPty, cmdTty, err := pty.Open() if err != nil { - return fmt.Errorf("opening new pty: %w", err) + return nil, nil, fmt.Errorf("opening new pty: %w", err) } pty.Setsize(cmdPty, GetWinsize(pk)) defer func() { @@ -874,72 +920,26 @@ func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) e cmd.Detached = true cmd.RunnerOutFd, err = os.OpenFile(fileNames.RunnerOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) if err != nil { - return fmt.Errorf("cannot open runout file '%s': %w", fileNames.RunnerOutFile, err) - } - nullFd, err := os.OpenFile("/dev/null", os.O_RDWR, 0) - if err != nil { - return fmt.Errorf("cannot open /dev/null: %w", err) + return nil, nil, fmt.Errorf("cannot open runout file '%s': %w", fileNames.RunnerOutFile, err) } cmd.DetachedOutput = packet.MakePacketSender(cmd.RunnerOutFd) ecmd, err := MakeDetachedExecCmd(pk, cmdTty) if err != nil { - return err + return nil, nil, err } cmd.Cmd = ecmd SetupSignalsForDetach() err = ecmd.Start() if err != nil { - return fmt.Errorf("starting command: %w", err) + return nil, nil, fmt.Errorf("starting command: %w", err) } for _, fd := range ecmd.ExtraFiles { if fd != cmdTty { fd.Close() } } - // after Start(), any errors must go to DetachedOutput - // close stdin/stdout/stderr, but wait for cmdstart packet to get sent - startPacket := cmd.MakeCmdStartPacket() - go func() { - sender.SendPacket(startPacket) - sender.Close() - sender.WaitForDone() - fmt.Printf("sender done! start: %v\n", startPacket) - err = unix.Dup2(int(nullFd.Fd()), int(os.Stdin.Fd())) - if err != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) - } - err = unix.Dup2(int(nullFd.Fd()), int(os.Stdout.Fd())) - if err != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) - } - err = unix.Dup2(int(nullFd.Fd()), int(os.Stderr.Fd())) - if err != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("cannot dup2 stdin to /dev/null: %w", err)) - } - cmd.DetachedOutput.SendPacket(startPacket) - }() - ptyOutFd, err := os.OpenFile(fileNames.PtyOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) - if err != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("cannot open ptyout file '%s': %w", fileNames.PtyOutFile, err)) - // don't return (command is already running) - } - go func() { - // copy pty output to .ptyout file - _, copyErr := io.Copy(ptyOutFd, cmdPty) - if copyErr != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("copying pty output to ptyout file: %w", copyErr)) - } - }() - go func() { - // copy .stdin fifo contents to pty input - copyFifoErr := MakeAndCopyStdinFifo(cmdPty, fileNames.StdinFifo) - if copyFifoErr != nil { - cmd.DetachedOutput.SendCmdError(pk.CK, fmt.Errorf("reading from stdin fifo: %w", copyFifoErr)) - } - }() - donePacket := cmd.WaitForCommand() - cmd.DetachedOutput.SendPacket(donePacket) - return nil + startPacket := cmd.MakeCmdStartPacket(pk.ReqId) + return cmd, startPacket, nil } func GetExitCode(err error) int { @@ -958,9 +958,8 @@ func (c *ShExecType) WaitForCommand() *packet.CmdDonePacketType { endTs := time.Now() cmdDuration := endTs.Sub(c.StartTs) exitCode := GetExitCode(exitErr) - donePacket := packet.MakeCmdDonePacket() + donePacket := packet.MakeCmdDonePacket(c.CK) donePacket.Ts = endTs.UnixMilli() - donePacket.CK = c.CK donePacket.ExitCode = exitCode donePacket.DurationMs = int64(cmdDuration / time.Millisecond) if c.FileNames != nil {