mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-02-12 01:01:50 +01:00
remove two unused packet types, remove unused detatched command code
This commit is contained in:
parent
d1baf504ba
commit
8656387329
@ -61,15 +61,7 @@ func handleSingle() {
|
|||||||
sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("run packets from server must have a CK: %v", err))
|
sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("run packets from server must have a CK: %v", err))
|
||||||
}
|
}
|
||||||
if runPacket.Detached {
|
if runPacket.Detached {
|
||||||
cmd, startPk, err := shexec.RunCommandDetached(runPacket, sender)
|
sender.SendErrorResponse(runPacket.ReqId, fmt.Errorf("detached mode not supported"))
|
||||||
if err != nil {
|
|
||||||
sender.SendErrorResponse(runPacket.ReqId, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sender.SendPacket(startPk)
|
|
||||||
sender.Close()
|
|
||||||
sender.WaitForDone()
|
|
||||||
cmd.DetachedWait(startPk)
|
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
shexec.IgnoreSigPipe()
|
shexec.IgnoreSigPipe()
|
||||||
|
@ -1,473 +0,0 @@
|
|||||||
// Copyright 2023, Command Line Inc.
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
|
|
||||||
package cmdtail
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"regexp"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
|
||||||
"github.com/wavetermdev/waveterm/waveshell/pkg/base"
|
|
||||||
"github.com/wavetermdev/waveterm/waveshell/pkg/packet"
|
|
||||||
)
|
|
||||||
|
|
||||||
const MaxDataBytes = 4096
|
|
||||||
const FileTypePty = "ptyout"
|
|
||||||
const FileTypeRun = "runout"
|
|
||||||
|
|
||||||
type Tailer struct {
|
|
||||||
Lock *sync.Mutex
|
|
||||||
WatchList map[base.CommandKey]CmdWatchEntry
|
|
||||||
Watcher *fsnotify.Watcher
|
|
||||||
Sender *packet.PacketSender
|
|
||||||
Gen FileNameGenerator
|
|
||||||
Sessions map[string]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type TailPos struct {
|
|
||||||
ReqId string
|
|
||||||
Running bool // an active tailer sending data
|
|
||||||
TailPtyPos int64
|
|
||||||
TailRunPos int64
|
|
||||||
Follow bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type CmdWatchEntry struct {
|
|
||||||
CmdKey base.CommandKey
|
|
||||||
FilePtyLen int64
|
|
||||||
FileRunLen int64
|
|
||||||
Tails []TailPos
|
|
||||||
Done bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type FileNameGenerator interface {
|
|
||||||
PtyOutFile(ck base.CommandKey) string
|
|
||||||
RunOutFile(ck base.CommandKey) string
|
|
||||||
SessionDir(sessionId string) string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w CmdWatchEntry) getTailPos(reqId string) (TailPos, bool) {
|
|
||||||
for _, pos := range w.Tails {
|
|
||||||
if pos.ReqId == reqId {
|
|
||||||
return pos, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return TailPos{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *CmdWatchEntry) updateTailPos(reqId string, newPos TailPos) {
|
|
||||||
for idx, pos := range w.Tails {
|
|
||||||
if pos.ReqId == reqId {
|
|
||||||
w.Tails[idx] = newPos
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
w.Tails = append(w.Tails, newPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *CmdWatchEntry) removeTailPos(reqId string) {
|
|
||||||
var newTails []TailPos
|
|
||||||
for _, pos := range w.Tails {
|
|
||||||
if pos.ReqId == reqId {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newTails = append(newTails, pos)
|
|
||||||
}
|
|
||||||
w.Tails = newTails
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pos TailPos) IsCurrent(entry CmdWatchEntry) bool {
|
|
||||||
return pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) updateTailPos_nolock(cmdKey base.CommandKey, reqId string, pos TailPos) {
|
|
||||||
entry, found := t.WatchList[cmdKey]
|
|
||||||
if !found {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
entry.updateTailPos(reqId, pos)
|
|
||||||
t.WatchList[cmdKey] = entry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) removeTailPos(cmdKey base.CommandKey, reqId string) {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
t.removeTailPos_nolock(cmdKey, reqId)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) removeTailPos_nolock(cmdKey base.CommandKey, reqId string) {
|
|
||||||
entry, found := t.WatchList[cmdKey]
|
|
||||||
if !found {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
entry.removeTailPos(reqId)
|
|
||||||
t.WatchList[cmdKey] = entry
|
|
||||||
if len(entry.Tails) == 0 {
|
|
||||||
t.removeWatch_nolock(cmdKey)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) removeWatch_nolock(cmdKey base.CommandKey) {
|
|
||||||
// delete from watchlist, remove watches
|
|
||||||
delete(t.WatchList, cmdKey)
|
|
||||||
t.Watcher.Remove(t.Gen.PtyOutFile(cmdKey))
|
|
||||||
t.Watcher.Remove(t.Gen.RunOutFile(cmdKey))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) getEntryAndPos_nolock(cmdKey base.CommandKey, reqId string) (CmdWatchEntry, TailPos, bool) {
|
|
||||||
entry, found := t.WatchList[cmdKey]
|
|
||||||
if !found {
|
|
||||||
return CmdWatchEntry{}, TailPos{}, false
|
|
||||||
}
|
|
||||||
pos, found := entry.getTailPos(reqId)
|
|
||||||
if !found {
|
|
||||||
return CmdWatchEntry{}, TailPos{}, false
|
|
||||||
}
|
|
||||||
return entry, pos, true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) addSessionWatcher(sessionId string) error {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
|
|
||||||
if t.Sessions[sessionId] {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
sdir := t.Gen.SessionDir(sessionId)
|
|
||||||
err := t.Watcher.Add(sdir)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.Sessions[sessionId] = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) removeSessionWatcher(sessionId string) {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
|
|
||||||
if !t.Sessions[sessionId] {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sdir := t.Gen.SessionDir(sessionId)
|
|
||||||
t.Watcher.Remove(sdir)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MakeTailer(sender *packet.PacketSender, gen FileNameGenerator) (*Tailer, error) {
|
|
||||||
rtn := &Tailer{
|
|
||||||
Lock: &sync.Mutex{},
|
|
||||||
WatchList: make(map[base.CommandKey]CmdWatchEntry),
|
|
||||||
Sessions: make(map[string]bool),
|
|
||||||
Sender: sender,
|
|
||||||
Gen: gen,
|
|
||||||
}
|
|
||||||
var err error
|
|
||||||
rtn.Watcher, err = fsnotify.NewWatcher()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return rtn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]byte, error) {
|
|
||||||
fd, err := os.Open(fileName)
|
|
||||||
defer fd.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
buf := make([]byte, maxBytes)
|
|
||||||
nr, err := fd.ReadAt(buf, pos)
|
|
||||||
if err != nil && err != io.EOF { // ignore EOF error
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return buf[0:nr], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) makeCmdDataPacket(entry CmdWatchEntry, pos TailPos) (*packet.CmdDataPacketType, error) {
|
|
||||||
dataPacket := packet.MakeCmdDataPacket(pos.ReqId)
|
|
||||||
dataPacket.CK = entry.CmdKey
|
|
||||||
dataPacket.PtyPos = pos.TailPtyPos
|
|
||||||
dataPacket.RunPos = pos.TailRunPos
|
|
||||||
if entry.FilePtyLen > pos.TailPtyPos {
|
|
||||||
ptyData, err := t.readDataFromFile(t.Gen.PtyOutFile(entry.CmdKey), pos.TailPtyPos, MaxDataBytes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dataPacket.PtyData64 = base64.StdEncoding.EncodeToString(ptyData)
|
|
||||||
dataPacket.PtyDataLen = len(ptyData)
|
|
||||||
}
|
|
||||||
if entry.FileRunLen > pos.TailRunPos {
|
|
||||||
runData, err := t.readDataFromFile(t.Gen.RunOutFile(entry.CmdKey), pos.TailRunPos, MaxDataBytes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dataPacket.RunData64 = base64.StdEncoding.EncodeToString(runData)
|
|
||||||
dataPacket.RunDataLen = len(runData)
|
|
||||||
}
|
|
||||||
return dataPacket, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns (data-packet, keepRunning)
|
|
||||||
func (t *Tailer) runSingleDataTransfer(key base.CommandKey, reqId string) (*packet.CmdDataPacketType, bool, error) {
|
|
||||||
t.Lock.Lock()
|
|
||||||
entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId)
|
|
||||||
t.Lock.Unlock()
|
|
||||||
if !foundPos {
|
|
||||||
return nil, false, nil
|
|
||||||
}
|
|
||||||
dataPacket, dataErr := t.makeCmdDataPacket(entry, pos)
|
|
||||||
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId)
|
|
||||||
if !foundPos {
|
|
||||||
return nil, false, nil
|
|
||||||
}
|
|
||||||
// pos was updated between first and second get, throw out data-packet and re-run
|
|
||||||
if pos.TailPtyPos != dataPacket.PtyPos || pos.TailRunPos != dataPacket.RunPos {
|
|
||||||
return nil, true, nil
|
|
||||||
}
|
|
||||||
if dataErr != nil {
|
|
||||||
// error, so return error packet, and stop running
|
|
||||||
pos.Running = false
|
|
||||||
t.updateTailPos_nolock(key, reqId, pos)
|
|
||||||
return nil, false, dataErr
|
|
||||||
}
|
|
||||||
pos.TailPtyPos += int64(dataPacket.PtyDataLen)
|
|
||||||
pos.TailRunPos += int64(dataPacket.RunDataLen)
|
|
||||||
if pos.IsCurrent(entry) {
|
|
||||||
// we caught up, tail position equals file length
|
|
||||||
pos.Running = false
|
|
||||||
}
|
|
||||||
t.updateTailPos_nolock(key, reqId, pos)
|
|
||||||
return dataPacket, pos.Running, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns (removed)
|
|
||||||
func (t *Tailer) checkRemove(cmdKey base.CommandKey, reqId string) bool {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
entry, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId)
|
|
||||||
if !foundPos {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !pos.IsCurrent(entry) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !pos.Follow || entry.Done {
|
|
||||||
t.removeTailPos_nolock(cmdKey, reqId)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) RunDataTransfer(key base.CommandKey, reqId string) {
|
|
||||||
for {
|
|
||||||
dataPacket, keepRunning, err := t.runSingleDataTransfer(key, reqId)
|
|
||||||
if dataPacket != nil {
|
|
||||||
t.Sender.SendPacket(dataPacket)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.removeTailPos(key, reqId)
|
|
||||||
t.Sender.SendErrorResponse(reqId, err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !keepRunning {
|
|
||||||
removed := t.checkRemove(key, reqId)
|
|
||||||
if removed {
|
|
||||||
t.Sender.SendResponse(reqId, true)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) {
|
|
||||||
if pos.Running {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if pos.IsCurrent(entry) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pos.Running = true
|
|
||||||
t.updateTailPos_nolock(entry.CmdKey, pos.ReqId, pos)
|
|
||||||
go t.RunDataTransfer(entry.CmdKey, pos.ReqId)
|
|
||||||
}
|
|
||||||
|
|
||||||
var updateFileRe = regexp.MustCompile("/([a-z0-9-]+)/([a-z0-9-]+)\\.(ptyout|runout)$")
|
|
||||||
|
|
||||||
func (t *Tailer) updateFile(relFileName string) {
|
|
||||||
m := updateFileRe.FindStringSubmatch(relFileName)
|
|
||||||
if m == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
finfo, err := os.Stat(relFileName)
|
|
||||||
if err != nil {
|
|
||||||
t.Sender.SendPacket(packet.FmtMessagePacket("error trying to stat file '%s': %v", relFileName, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
cmdKey := base.MakeCommandKey(m[1], m[2])
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
entry, foundEntry := t.WatchList[cmdKey]
|
|
||||||
if !foundEntry {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fileType := m[3]
|
|
||||||
if fileType == FileTypePty {
|
|
||||||
entry.FilePtyLen = finfo.Size()
|
|
||||||
} else if fileType == FileTypeRun {
|
|
||||||
entry.FileRunLen = finfo.Size()
|
|
||||||
}
|
|
||||||
t.WatchList[cmdKey] = entry
|
|
||||||
for _, pos := range entry.Tails {
|
|
||||||
t.tryStartRun_nolock(entry, pos)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) Run() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-t.Watcher.Events:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if event.Op&fsnotify.Write == fsnotify.Write {
|
|
||||||
t.updateFile(event.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
case err, ok := <-t.Watcher.Errors:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// what to do with this error? just send a message
|
|
||||||
t.Sender.SendPacket(packet.FmtMessagePacket("error in tailer: %v", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) Close() error {
|
|
||||||
return t.Watcher.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func max(v1 int64, v2 int64) int64 {
|
|
||||||
if v1 > v2 {
|
|
||||||
return v1
|
|
||||||
}
|
|
||||||
return v2
|
|
||||||
}
|
|
||||||
|
|
||||||
func (entry *CmdWatchEntry) fillFilePos(gen FileNameGenerator) {
|
|
||||||
ptyInfo, _ := os.Stat(gen.PtyOutFile(entry.CmdKey))
|
|
||||||
if ptyInfo != nil {
|
|
||||||
entry.FilePtyLen = ptyInfo.Size()
|
|
||||||
}
|
|
||||||
runoutInfo, _ := os.Stat(gen.RunOutFile(entry.CmdKey))
|
|
||||||
if runoutInfo != nil {
|
|
||||||
entry.FileRunLen = runoutInfo.Size()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) KeyDone(key base.CommandKey) {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
entry, foundEntry := t.WatchList[key]
|
|
||||||
if !foundEntry {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
entry.Done = true
|
|
||||||
var newTails []TailPos
|
|
||||||
for _, pos := range entry.Tails {
|
|
||||||
if pos.IsCurrent(entry) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newTails = append(newTails, pos)
|
|
||||||
}
|
|
||||||
entry.Tails = newTails
|
|
||||||
t.WatchList[key] = entry
|
|
||||||
if len(entry.Tails) == 0 {
|
|
||||||
t.removeWatch_nolock(key)
|
|
||||||
}
|
|
||||||
t.WatchList[key] = entry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) RemoveWatch(pk *packet.UntailCmdPacketType) {
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
t.removeTailPos_nolock(pk.CK, pk.ReqId)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Tailer) AddFileWatches_nolock(key base.CommandKey, ptyOnly bool) error {
|
|
||||||
ptyName := t.Gen.PtyOutFile(key)
|
|
||||||
runName := t.Gen.RunOutFile(key)
|
|
||||||
fmt.Printf("WATCH> add %s\n", ptyName)
|
|
||||||
err := t.Watcher.Add(ptyName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if ptyOnly {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err = t.Watcher.Add(runName)
|
|
||||||
if err != nil {
|
|
||||||
t.Watcher.Remove(ptyName) // best effort clean up
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns (up-to-date/done, error)
|
|
||||||
func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) (bool, error) {
|
|
||||||
if err := getPacket.CK.Validate("getcmd"); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if getPacket.ReqId == "" {
|
|
||||||
return false, fmt.Errorf("getcmd, no reqid specified")
|
|
||||||
}
|
|
||||||
t.Lock.Lock()
|
|
||||||
defer t.Lock.Unlock()
|
|
||||||
key := getPacket.CK
|
|
||||||
entry, foundEntry := t.WatchList[key]
|
|
||||||
if !foundEntry {
|
|
||||||
// initialize entry, add watches
|
|
||||||
entry = CmdWatchEntry{CmdKey: key}
|
|
||||||
entry.fillFilePos(t.Gen)
|
|
||||||
}
|
|
||||||
pos, foundPos := entry.getTailPos(getPacket.ReqId)
|
|
||||||
if !foundPos {
|
|
||||||
// initialize a new tailpos
|
|
||||||
pos = TailPos{ReqId: getPacket.ReqId}
|
|
||||||
}
|
|
||||||
// update tailpos with new values from getpacket
|
|
||||||
pos.TailPtyPos = getPacket.PtyPos
|
|
||||||
pos.TailRunPos = getPacket.RunPos
|
|
||||||
pos.Follow = getPacket.Tail
|
|
||||||
// convert negative pos to positive
|
|
||||||
if pos.TailPtyPos < 0 {
|
|
||||||
pos.TailPtyPos = max(0, entry.FilePtyLen+pos.TailPtyPos) // + because negative
|
|
||||||
}
|
|
||||||
if pos.TailRunPos < 0 {
|
|
||||||
pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative
|
|
||||||
}
|
|
||||||
entry.updateTailPos(pos.ReqId, pos)
|
|
||||||
if !pos.Follow && pos.IsCurrent(entry) {
|
|
||||||
// don't add to t.WatchList, don't t.AddFileWatches_nolock, send rpc response
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
if !foundEntry {
|
|
||||||
err := t.AddFileWatches_nolock(key, getPacket.PtyOnly)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.WatchList[key] = entry
|
|
||||||
t.tryStartRun_nolock(entry, pos)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
@ -43,12 +43,10 @@ const (
|
|||||||
DataEndPacketStr = "dataend"
|
DataEndPacketStr = "dataend"
|
||||||
ResponsePacketStr = "resp" // rpc-response
|
ResponsePacketStr = "resp" // rpc-response
|
||||||
DonePacketStr = "done"
|
DonePacketStr = "done"
|
||||||
CmdErrorPacketStr = "cmderror" // command
|
|
||||||
MessagePacketStr = "message"
|
MessagePacketStr = "message"
|
||||||
GetCmdPacketStr = "getcmd" // rpc
|
GetCmdPacketStr = "getcmd" // rpc
|
||||||
UntailCmdPacketStr = "untailcmd" // rpc
|
UntailCmdPacketStr = "untailcmd" // rpc
|
||||||
CdPacketStr = "cd" // rpc
|
CdPacketStr = "cd" // rpc
|
||||||
CmdDataPacketStr = "cmddata" // rpc-response
|
|
||||||
RawPacketStr = "raw"
|
RawPacketStr = "raw"
|
||||||
SpecialInputPacketStr = "sinput" // command
|
SpecialInputPacketStr = "sinput" // command
|
||||||
CompGenPacketStr = "compgen" // rpc
|
CompGenPacketStr = "compgen" // rpc
|
||||||
@ -90,7 +88,6 @@ func init() {
|
|||||||
TypeStrToFactory[PingPacketStr] = reflect.TypeOf(PingPacketType{})
|
TypeStrToFactory[PingPacketStr] = reflect.TypeOf(PingPacketType{})
|
||||||
TypeStrToFactory[ResponsePacketStr] = reflect.TypeOf(ResponsePacketType{})
|
TypeStrToFactory[ResponsePacketStr] = reflect.TypeOf(ResponsePacketType{})
|
||||||
TypeStrToFactory[DonePacketStr] = reflect.TypeOf(DonePacketType{})
|
TypeStrToFactory[DonePacketStr] = reflect.TypeOf(DonePacketType{})
|
||||||
TypeStrToFactory[CmdErrorPacketStr] = reflect.TypeOf(CmdErrorPacketType{})
|
|
||||||
TypeStrToFactory[MessagePacketStr] = reflect.TypeOf(MessagePacketType{})
|
TypeStrToFactory[MessagePacketStr] = reflect.TypeOf(MessagePacketType{})
|
||||||
TypeStrToFactory[CmdStartPacketStr] = reflect.TypeOf(CmdStartPacketType{})
|
TypeStrToFactory[CmdStartPacketStr] = reflect.TypeOf(CmdStartPacketType{})
|
||||||
TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{})
|
TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{})
|
||||||
@ -98,7 +95,6 @@ func init() {
|
|||||||
TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{})
|
TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{})
|
||||||
TypeStrToFactory[InitPacketStr] = reflect.TypeOf(InitPacketType{})
|
TypeStrToFactory[InitPacketStr] = reflect.TypeOf(InitPacketType{})
|
||||||
TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{})
|
TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{})
|
||||||
TypeStrToFactory[CmdDataPacketStr] = reflect.TypeOf(CmdDataPacketType{})
|
|
||||||
TypeStrToFactory[RawPacketStr] = reflect.TypeOf(RawPacketType{})
|
TypeStrToFactory[RawPacketStr] = reflect.TypeOf(RawPacketType{})
|
||||||
TypeStrToFactory[SpecialInputPacketStr] = reflect.TypeOf(SpecialInputPacketType{})
|
TypeStrToFactory[SpecialInputPacketStr] = reflect.TypeOf(SpecialInputPacketType{})
|
||||||
TypeStrToFactory[DataPacketStr] = reflect.TypeOf(DataPacketType{})
|
TypeStrToFactory[DataPacketStr] = reflect.TypeOf(DataPacketType{})
|
||||||
@ -128,7 +124,6 @@ func init() {
|
|||||||
|
|
||||||
var _ RpcResponsePacketType = (*CmdStartPacketType)(nil)
|
var _ RpcResponsePacketType = (*CmdStartPacketType)(nil)
|
||||||
var _ RpcResponsePacketType = (*ResponsePacketType)(nil)
|
var _ RpcResponsePacketType = (*ResponsePacketType)(nil)
|
||||||
var _ RpcResponsePacketType = (*CmdDataPacketType)(nil)
|
|
||||||
var _ RpcResponsePacketType = (*StreamFileResponseType)(nil)
|
var _ RpcResponsePacketType = (*StreamFileResponseType)(nil)
|
||||||
var _ RpcResponsePacketType = (*FileDataPacketType)(nil)
|
var _ RpcResponsePacketType = (*FileDataPacketType)(nil)
|
||||||
var _ RpcResponsePacketType = (*WriteFileReadyPacketType)(nil)
|
var _ RpcResponsePacketType = (*WriteFileReadyPacketType)(nil)
|
||||||
@ -155,36 +150,6 @@ func MakePacket(packetType string) (PacketType, error) {
|
|||||||
return rtn.Interface().(PacketType), nil
|
return rtn.Interface().(PacketType), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type CmdDataPacketType struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
RespId string `json:"respid"`
|
|
||||||
CK base.CommandKey `json:"ck"`
|
|
||||||
PtyPos int64 `json:"ptypos"`
|
|
||||||
PtyLen int64 `json:"ptylen"`
|
|
||||||
RunPos int64 `json:"runpos"`
|
|
||||||
RunLen int64 `json:"runlen"`
|
|
||||||
PtyData64 string `json:"ptydata64"`
|
|
||||||
PtyDataLen int `json:"ptydatalen"`
|
|
||||||
RunData64 string `json:"rundata64"`
|
|
||||||
RunDataLen int `json:"rundatalen"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*CmdDataPacketType) GetType() string {
|
|
||||||
return CmdDataPacketStr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *CmdDataPacketType) GetResponseId() string {
|
|
||||||
return p.RespId
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*CmdDataPacketType) GetResponseDone() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func MakeCmdDataPacket(reqId string) *CmdDataPacketType {
|
|
||||||
return &CmdDataPacketType{Type: CmdDataPacketStr, RespId: reqId}
|
|
||||||
}
|
|
||||||
|
|
||||||
type PingPacketType struct {
|
type PingPacketType struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
}
|
}
|
||||||
@ -830,28 +795,6 @@ type BarePacketType struct {
|
|||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CmdErrorPacketType struct {
|
|
||||||
Type string `json:"type"`
|
|
||||||
CK base.CommandKey `json:"ck"`
|
|
||||||
Error string `json:"error"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*CmdErrorPacketType) GetType() string {
|
|
||||||
return CmdErrorPacketStr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *CmdErrorPacketType) GetCK() base.CommandKey {
|
|
||||||
return p.CK
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *CmdErrorPacketType) String() string {
|
|
||||||
return fmt.Sprintf("error[%s]", p.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func MakeCmdErrorPacket(ck base.CommandKey, err error) *CmdErrorPacketType {
|
|
||||||
return &CmdErrorPacketType{Type: CmdErrorPacketStr, CK: ck, Error: err.Error()}
|
|
||||||
}
|
|
||||||
|
|
||||||
type WriteFilePacketType struct {
|
type WriteFilePacketType struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
ReqId string `json:"reqid"`
|
ReqId string `json:"reqid"`
|
||||||
@ -1074,10 +1017,6 @@ func SendPacket(w io.Writer, packet PacketType) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SendCmdError(w io.Writer, ck base.CommandKey, err error) error {
|
|
||||||
return SendPacket(w, MakeCmdErrorPacket(ck, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
type PacketSender struct {
|
type PacketSender struct {
|
||||||
Lock *sync.Mutex
|
Lock *sync.Mutex
|
||||||
SendCh chan PacketType
|
SendCh chan PacketType
|
||||||
@ -1197,10 +1136,6 @@ func (sender *PacketSender) SendPacket(pk PacketType) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sender *PacketSender) SendCmdError(ck base.CommandKey, err error) error {
|
|
||||||
return sender.SendPacket(MakeCmdErrorPacket(ck, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sender *PacketSender) SendErrorResponse(reqId string, err error) error {
|
func (sender *PacketSender) SendErrorResponse(reqId string, err error) error {
|
||||||
pk := MakeErrorResponsePacket(reqId, err)
|
pk := MakeErrorResponsePacket(reqId, err)
|
||||||
return sender.SendPacket(pk)
|
return sender.SendPacket(pk)
|
||||||
@ -1222,17 +1157,13 @@ type UnknownPacketReporter interface {
|
|||||||
type DefaultUPR struct{}
|
type DefaultUPR struct{}
|
||||||
|
|
||||||
func (DefaultUPR) UnknownPacket(pk PacketType) {
|
func (DefaultUPR) UnknownPacket(pk PacketType) {
|
||||||
if pk.GetType() == CmdErrorPacketStr {
|
if pk.GetType() == RawPacketStr {
|
||||||
errPacket := pk.(*CmdErrorPacketType)
|
|
||||||
// at this point, just send the error packet to stderr rather than try to do something special
|
|
||||||
fmt.Fprintf(os.Stderr, "[error] %s\n", errPacket.Error)
|
|
||||||
} else if pk.GetType() == RawPacketStr {
|
|
||||||
rawPacket := pk.(*RawPacketType)
|
rawPacket := pk.(*RawPacketType)
|
||||||
fmt.Fprintf(os.Stderr, "%s\n", rawPacket.Data)
|
fmt.Fprintf(os.Stderr, "%s\n", rawPacket.Data)
|
||||||
} else if pk.GetType() == CmdStartPacketStr {
|
} else if pk.GetType() == CmdStartPacketStr {
|
||||||
return // do nothing
|
return // do nothing
|
||||||
} else {
|
} else {
|
||||||
fmt.Fprintf(os.Stderr, "[error] invalid packet received '%s'", AsExtType(pk))
|
wlog.Logf("[upr] invalid packet received '%s'", AsExtType(pk))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ func (m *MServer) ProcessCommandPacket(pk packet.CommandPacketType) {
|
|||||||
cproc := m.ClientMap[ck]
|
cproc := m.ClientMap[ck]
|
||||||
m.Lock.Unlock()
|
m.Lock.Unlock()
|
||||||
if cproc == nil {
|
if cproc == nil {
|
||||||
m.Sender.SendCmdError(ck, fmt.Errorf("no client proc for ck '%s', pk=%s", ck, packet.AsString(pk)))
|
wlog.Logf("no client proc for ck %q, pk=%s", ck, packet.AsString(pk))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cproc.Input.SendPacket(pk)
|
cproc.Input.SendPacket(pk)
|
||||||
|
@ -1069,106 +1069,6 @@ func copyToCirFile(dest *cirfile.File, src io.Reader) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *ShExecType) DetachedWait(startPacket *packet.CmdStartPacketType) {
|
|
||||||
// after Start(), any output/errors must go to DetachedOutput
|
|
||||||
// close stdin, redirect stdout/stderr to /dev/null, but wait for cmdstart packet to get sent
|
|
||||||
cmd.DetachedOutput.SendPacket(startPacket)
|
|
||||||
err := os.Stdin.Close()
|
|
||||||
if err != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot close stdin: %w", err))
|
|
||||||
}
|
|
||||||
err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stdout.Fd()))
|
|
||||||
if err != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err))
|
|
||||||
}
|
|
||||||
err = unix.Dup2(int(cmd.RunnerOutFd.Fd()), int(os.Stderr.Fd()))
|
|
||||||
if err != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot dup2 stdin to runout: %w", err))
|
|
||||||
}
|
|
||||||
ptyOutFile, err := cirfile.CreateCirFile(cmd.FileNames.PtyOutFile, cmd.MaxPtySize)
|
|
||||||
if err != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("cannot open ptyout file '%s': %w", cmd.FileNames.PtyOutFile, err))
|
|
||||||
// don't return (command is already running)
|
|
||||||
}
|
|
||||||
ptyCopyDone := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
// copy pty output to .ptyout file
|
|
||||||
defer close(ptyCopyDone)
|
|
||||||
defer ptyOutFile.Close()
|
|
||||||
copyErr := copyToCirFile(ptyOutFile, cmd.CmdPty)
|
|
||||||
if copyErr != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("copying pty output to ptyout file: %w", copyErr))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
go func() {
|
|
||||||
// copy .stdin fifo contents to pty input
|
|
||||||
copyFifoErr := MakeAndCopyStdinFifo(cmd.CmdPty, cmd.FileNames.StdinFifo)
|
|
||||||
if copyFifoErr != nil {
|
|
||||||
cmd.DetachedOutput.SendCmdError(cmd.CK, fmt.Errorf("reading from stdin fifo: %w", copyFifoErr))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
donePacket := cmd.WaitForCommand()
|
|
||||||
cmd.DetachedOutput.SendPacket(donePacket)
|
|
||||||
<-ptyCopyDone
|
|
||||||
cmd.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) {
|
|
||||||
sapi, err := shellapi.MakeShellApi(pk.ShellType)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
fileNames, err := base.GetCommandFileNames(pk.CK)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
runOutInfo, err := os.Stat(fileNames.RunnerOutFile)
|
|
||||||
if err == nil { // non-nil error will be caught by regular OpenFile below
|
|
||||||
// must have size 0
|
|
||||||
if runOutInfo.Size() != 0 {
|
|
||||||
return nil, nil, fmt.Errorf("cmdkey '%s' was already used (runout len=%d)", pk.CK, runOutInfo.Size())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cmdPty, cmdTty, err := pty.Open()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("opening new pty: %w", err)
|
|
||||||
}
|
|
||||||
pty.Setsize(cmdPty, GetWinsize(pk))
|
|
||||||
defer func() {
|
|
||||||
cmdTty.Close()
|
|
||||||
}()
|
|
||||||
cmd := MakeShExec(pk.CK, nil, sapi)
|
|
||||||
cmd.FileNames = fileNames
|
|
||||||
cmd.CmdPty = cmdPty
|
|
||||||
cmd.Detached = true
|
|
||||||
cmd.MaxPtySize = DefaultMaxPtySize
|
|
||||||
if pk.TermOpts != nil && pk.TermOpts.MaxPtySize > 0 {
|
|
||||||
cmd.MaxPtySize = base.BoundInt64(pk.TermOpts.MaxPtySize, MinMaxPtySize, MaxMaxPtySize)
|
|
||||||
}
|
|
||||||
cmd.RunnerOutFd, err = os.OpenFile(fileNames.RunnerOutFile, os.O_TRUNC|os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("cannot open runout file '%s': %w", fileNames.RunnerOutFile, err)
|
|
||||||
}
|
|
||||||
cmd.DetachedOutput = packet.MakePacketSender(cmd.RunnerOutFd, nil)
|
|
||||||
ecmd, err := MakeDetachedExecCmd(pk, cmdTty)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
cmd.Cmd = ecmd
|
|
||||||
SetupSignalsForDetach()
|
|
||||||
err = ecmd.Start()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, fmt.Errorf("starting command: %w", err)
|
|
||||||
}
|
|
||||||
for _, fd := range ecmd.ExtraFiles {
|
|
||||||
if fd != cmdTty {
|
|
||||||
fd.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
startPacket := cmd.MakeCmdStartPacket(pk.ReqId)
|
|
||||||
return cmd, startPacket, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetExitCode(err error) int {
|
func GetExitCode(err error) int {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return 0
|
return 0
|
||||||
|
@ -2350,15 +2350,6 @@ func (msh *MShellProc) handleCmdFinalPacket(finalPk *packet.CmdFinalPacketType)
|
|||||||
scbus.MainUpdateBus.DoUpdate(update)
|
scbus.MainUpdateBus.DoUpdate(update)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO notify FE about cmd errors
|
|
||||||
func (msh *MShellProc) handleCmdErrorPacket(errPk *packet.CmdErrorPacketType) {
|
|
||||||
err := sstore.AppendCmdErrorPk(context.Background(), errPk)
|
|
||||||
if err != nil {
|
|
||||||
msh.WriteToPtyBuffer("cmderr> [remote %s] [error] adding cmderr: %v\n", msh.GetRemoteName(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (msh *MShellProc) ResetDataPos(ck base.CommandKey) {
|
func (msh *MShellProc) ResetDataPos(ck base.CommandKey) {
|
||||||
msh.DataPosMap.Delete(ck)
|
msh.DataPosMap.Delete(ck)
|
||||||
}
|
}
|
||||||
@ -2444,12 +2435,6 @@ func (msh *MShellProc) ProcessPackets() {
|
|||||||
// this is low priority though since most input is coming from keyboard and won't overflow this buffer
|
// this is low priority though since most input is coming from keyboard and won't overflow this buffer
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if pk.GetType() == packet.CmdDataPacketStr {
|
|
||||||
dataPacket := pk.(*packet.CmdDataPacketType)
|
|
||||||
go msh.WriteToPtyBuffer("cmd-data> [remote %s] [%s] pty=%d run=%d\n", msh.GetRemoteName(), dataPacket.CK, dataPacket.PtyDataLen, dataPacket.RunDataLen)
|
|
||||||
go pushStatusIndicatorUpdate(&dataPacket.CK, sstore.StatusIndicatorLevel_Output)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if pk.GetType() == packet.CmdDonePacketStr {
|
if pk.GetType() == packet.CmdDonePacketStr {
|
||||||
donePk := pk.(*packet.CmdDonePacketType)
|
donePk := pk.(*packet.CmdDonePacketType)
|
||||||
runCmdUpdateFn(donePk.CK, msh.makeHandleCmdDonePacketClosure(donePk))
|
runCmdUpdateFn(donePk.CK, msh.makeHandleCmdDonePacketClosure(donePk))
|
||||||
@ -2460,10 +2445,6 @@ func (msh *MShellProc) ProcessPackets() {
|
|||||||
runCmdUpdateFn(finalPk.CK, msh.makeHandleCmdFinalPacketClosure(finalPk))
|
runCmdUpdateFn(finalPk.CK, msh.makeHandleCmdFinalPacketClosure(finalPk))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if pk.GetType() == packet.CmdErrorPacketStr {
|
|
||||||
msh.handleCmdErrorPacket(pk.(*packet.CmdErrorPacketType))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if pk.GetType() == packet.MessagePacketStr {
|
if pk.GetType() == packet.MessagePacketStr {
|
||||||
msgPacket := pk.(*packet.MessagePacketType)
|
msgPacket := pk.(*packet.MessagePacketType)
|
||||||
msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPacket.CK, msgPacket.Message)
|
msh.WriteToPtyBuffer("msg> [remote %s] [%s] %s\n", msh.GetRemoteName(), msgPacket.CK, msgPacket.Message)
|
||||||
|
@ -991,18 +991,6 @@ func UpdateCmdRtnState(ctx context.Context, ck base.CommandKey, statePtr ShellSt
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func AppendCmdErrorPk(ctx context.Context, errPk *packet.CmdErrorPacketType) error {
|
|
||||||
if errPk == nil || errPk.CK.IsEmpty() {
|
|
||||||
return fmt.Errorf("invalid cmderror packet (no ck)")
|
|
||||||
}
|
|
||||||
screenId := errPk.CK.GetGroupId()
|
|
||||||
return WithTx(ctx, func(tx *TxWrap) error {
|
|
||||||
query := `UPDATE cmd SET runout = json_insert(runout, '$[#]', ?) WHERE screenid = ? AND lineid = ?`
|
|
||||||
tx.Exec(query, quickJson(errPk), screenId, lineIdFromCK(errPk.CK))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func ReInitFocus(ctx context.Context) error {
|
func ReInitFocus(ctx context.Context) error {
|
||||||
return WithTx(ctx, func(tx *TxWrap) error {
|
return WithTx(ctx, func(tx *TxWrap) error {
|
||||||
query := `UPDATE screen SET focustype = 'input'`
|
query := `UPDATE screen SET focustype = 'input'`
|
||||||
|
Loading…
Reference in New Issue
Block a user