From dc4baaea27c00f022340e7e547895d40fac42b4f Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 16 Jun 2022 17:24:29 -0700 Subject: [PATCH] cmdtail now can support multiple tails on the same cmd with independent tail positions --- main-runner.go | 1 - pkg/cmdtail/cmdtail.go | 190 +++++++++++++++++++++++++++++------------ pkg/packet/packet.go | 43 +++++++--- 3 files changed, 166 insertions(+), 68 deletions(-) diff --git a/main-runner.go b/main-runner.go index c9b9fbe70..a74bcf034 100644 --- a/main-runner.go +++ b/main-runner.go @@ -127,7 +127,6 @@ func doMainRun(pk *packet.RunPacketType, sender *packet.PacketSender) { } func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error { - // non-tail packets? err := tailer.AddWatch(pk) if err != nil { return err diff --git a/pkg/cmdtail/cmdtail.go b/pkg/cmdtail/cmdtail.go index e09408697..b43e044d1 100644 --- a/pkg/cmdtail/cmdtail.go +++ b/pkg/cmdtail/cmdtail.go @@ -21,17 +21,52 @@ import ( const MaxDataBytes = 4096 type TailPos struct { - CmdKey CmdKey + ReqId string Running bool // an active tailer sending data - FilePtyLen int64 - FileRunLen int64 TailPtyPos int64 TailRunPos int64 Follow bool } -func (pos TailPos) IsCurrent() bool { - return pos.TailPtyPos >= pos.FilePtyLen && pos.TailRunPos >= pos.FileRunLen +type CmdWatchEntry struct { + CmdKey CmdKey + FilePtyLen int64 + FileRunLen int64 + Tails []TailPos +} + +func (w CmdWatchEntry) getTailPos(reqId string) (TailPos, bool) { + for _, pos := range w.Tails { + if pos.ReqId == reqId { + return pos, true + } + } + return TailPos{}, false +} + +func (w *CmdWatchEntry) updateTailPos(reqId string, pos TailPos) { + for idx, pos := range w.Tails { + if pos.ReqId == reqId { + w.Tails[idx] = pos + return + } + } + w.Tails = append(w.Tails, pos) +} + +func (w *CmdWatchEntry) removeTailPos(reqId string) { + var newTails []TailPos + for _, pos := range w.Tails { + if pos.ReqId == reqId { + continue + } + newTails = append(newTails, pos) + } + w.Tails = newTails +} + +func (pos TailPos) IsCurrent(entry CmdWatchEntry) bool { + return pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen } type CmdKey struct { @@ -41,12 +76,43 @@ type CmdKey struct { type Tailer struct { Lock *sync.Mutex - WatchList map[CmdKey]TailPos + WatchList map[CmdKey]CmdWatchEntry ScHomeDir string Watcher *SessionWatcher SendCh chan packet.PacketType } +func (t *Tailer) updateTailPos_nolock(cmdKey CmdKey, reqId string, pos TailPos) { + entry, found := t.WatchList[cmdKey] + if !found { + return + } + entry.updateTailPos(reqId, pos) + t.WatchList[cmdKey] = entry +} + +func (t *Tailer) updateEntrySizes_nolock(cmdKey CmdKey, ptyLen int64, runLen int64) { + entry, found := t.WatchList[cmdKey] + if !found { + return + } + entry.FilePtyLen = ptyLen + entry.FileRunLen = runLen + t.WatchList[cmdKey] = entry +} + +func (t *Tailer) getEntryAndPos_nolock(cmdKey CmdKey, reqId string) (CmdWatchEntry, TailPos, bool) { + entry, found := t.WatchList[cmdKey] + if !found { + return CmdWatchEntry{}, TailPos{}, false + } + pos, found := entry.getTailPos(reqId) + if !found { + return CmdWatchEntry{}, TailPos{}, false + } + return entry, pos, true +} + func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) { scHomeDir, err := base.GetScHomeDir() if err != nil { @@ -54,7 +120,7 @@ func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) { } rtn := &Tailer{ Lock: &sync.Mutex{}, - WatchList: make(map[CmdKey]TailPos), + WatchList: make(map[CmdKey]CmdWatchEntry), ScHomeDir: scHomeDir, SendCh: sendCh, } @@ -79,45 +145,48 @@ func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]b return buf[0:nr], nil } -func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, pos TailPos) *packet.CmdDataPacketType { +func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, entry CmdWatchEntry, pos TailPos) *packet.CmdDataPacketType { dataPacket := packet.MakeCmdDataPacket() - dataPacket.SessionId = pos.CmdKey.SessionId - dataPacket.CmdId = pos.CmdKey.CmdId + dataPacket.ReqId = pos.ReqId + dataPacket.SessionId = entry.CmdKey.SessionId + dataPacket.CmdId = entry.CmdKey.CmdId dataPacket.PtyPos = pos.TailPtyPos dataPacket.RunPos = pos.TailRunPos - if pos.FilePtyLen > pos.TailPtyPos { + if entry.FilePtyLen > pos.TailPtyPos { ptyData, err := t.readDataFromFile(fileNames.PtyOutFile, pos.TailPtyPos, MaxDataBytes) if err != nil { dataPacket.Error = err.Error() return dataPacket } dataPacket.PtyData = string(ptyData) + dataPacket.PtyDataLen = len(ptyData) } - if pos.FileRunLen > pos.TailRunPos { + if entry.FileRunLen > pos.TailRunPos { runData, err := t.readDataFromFile(fileNames.RunnerOutFile, pos.TailRunPos, MaxDataBytes) if err != nil { dataPacket.Error = err.Error() return dataPacket } dataPacket.RunData = string(runData) + dataPacket.RunDataLen = len(runData) } return dataPacket } // returns (data-packet, keepRunning) -func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, bool) { +func (t *Tailer) runSingleDataTransfer(key CmdKey, reqId string) (*packet.CmdDataPacketType, bool) { t.Lock.Lock() - pos, foundPos := t.WatchList[key] + entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId) t.Lock.Unlock() if !foundPos { return nil, false } fileNames := base.MakeCommandFileNamesWithHome(t.ScHomeDir, key.SessionId, key.CmdId) - dataPacket := t.makeCmdDataPacket(fileNames, pos) + dataPacket := t.makeCmdDataPacket(fileNames, entry, pos) t.Lock.Lock() defer t.Lock.Unlock() - pos, foundPos = t.WatchList[key] + entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId) if !foundPos { return nil, false } @@ -128,45 +197,44 @@ func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, b if dataPacket.Error != "" { // error, so return error packet, and stop running pos.Running = false - t.WatchList[key] = pos + t.updateTailPos_nolock(key, reqId, pos) return dataPacket, false } pos.TailPtyPos += int64(len(dataPacket.PtyData)) pos.TailRunPos += int64(len(dataPacket.RunData)) - if pos.TailPtyPos > pos.FilePtyLen { - pos.FilePtyLen = pos.TailPtyPos - } - if pos.TailRunPos > pos.FileRunLen { - pos.FileRunLen = pos.TailRunPos - } - if pos.TailPtyPos >= pos.FilePtyLen && pos.TailRunPos >= pos.FileRunLen { + if pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen { // we caught up, tail position equals file length pos.Running = false } - t.WatchList[key] = pos + t.updateTailPos_nolock(key, reqId, pos) return dataPacket, pos.Running } -func (t *Tailer) checkRemoveNoFollow(cmdKey CmdKey) { +func (t *Tailer) checkRemoveNoFollow(cmdKey CmdKey, reqId string) { t.Lock.Lock() defer t.Lock.Unlock() - pos, foundPos := t.WatchList[cmdKey] + entry, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId) if !foundPos { return } if !pos.Follow { - delete(t.WatchList, cmdKey) + entry.removeTailPos(reqId) + if len(entry.Tails) == 0 { + delete(t.WatchList, cmdKey) + } else { + t.WatchList[cmdKey] = entry + } } } -func (t *Tailer) RunDataTransfer(key CmdKey) { +func (t *Tailer) RunDataTransfer(key CmdKey, reqId string) { for { - dataPacket, keepRunning := t.runSingleDataTransfer(key) + dataPacket, keepRunning := t.runSingleDataTransfer(key, reqId) if dataPacket != nil { t.SendCh <- dataPacket } if !keepRunning { - t.checkRemoveNoFollow(key) + t.checkRemoveNoFollow(key, reqId) break } time.Sleep(10 * time.Millisecond) @@ -174,13 +242,13 @@ func (t *Tailer) RunDataTransfer(key CmdKey) { } // should already hold t.Lock -func (t *Tailer) tryStartRun_nolock(pos TailPos) { - if pos.Running || pos.IsCurrent() { +func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) { + if pos.Running || pos.IsCurrent(entry) { return } pos.Running = true - t.WatchList[pos.CmdKey] = pos - go t.RunDataTransfer(pos.CmdKey) + t.updateTailPos_nolock(entry.CmdKey, pos.ReqId, pos) + go t.RunDataTransfer(entry.CmdKey, pos.ReqId) } func (t *Tailer) updateFile(event FileUpdateEvent) { @@ -191,17 +259,19 @@ func (t *Tailer) updateFile(event FileUpdateEvent) { cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId} t.Lock.Lock() defer t.Lock.Unlock() - pos, foundPos := t.WatchList[cmdKey] - if !foundPos { + entry, foundEntry := t.WatchList[cmdKey] + if !foundEntry { return } if event.FileType == FileTypePty { - pos.FilePtyLen = event.Size + entry.FilePtyLen = event.Size } else if event.FileType == FileTypeRun { - pos.FileRunLen = event.Size + entry.FileRunLen = event.Size + } + t.WatchList[cmdKey] = entry + for _, pos := range entry.Tails { + t.tryStartRun_nolock(entry, pos) } - t.WatchList[cmdKey] = pos - t.tryStartRun_nolock(pos) } func (t *Tailer) Run() error { @@ -221,22 +291,15 @@ func max(v1 int64, v2 int64) int64 { return v2 } -// also converts negative positions to positive positions -func (tp *TailPos) fillFilePos(scHomeDir string) { - fileNames := base.MakeCommandFileNamesWithHome(scHomeDir, tp.CmdKey.SessionId, tp.CmdKey.CmdId) +func (entry *CmdWatchEntry) fillFilePos(scHomeDir string) { + fileNames := base.MakeCommandFileNamesWithHome(scHomeDir, entry.CmdKey.SessionId, entry.CmdKey.CmdId) ptyInfo, _ := os.Stat(fileNames.PtyOutFile) if ptyInfo != nil { - tp.FilePtyLen = ptyInfo.Size() - } - if tp.TailPtyPos < 0 { - tp.TailPtyPos = max(0, tp.FilePtyLen-tp.TailPtyPos) + entry.FilePtyLen = ptyInfo.Size() } runoutInfo, _ := os.Stat(fileNames.RunnerOutFile) if runoutInfo != nil { - tp.FileRunLen = runoutInfo.Size() - } - if tp.TailRunPos < 0 { - tp.TailRunPos = max(0, tp.FileRunLen-tp.TailRunPos) + entry.FileRunLen = runoutInfo.Size() } } @@ -252,6 +315,9 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error { if err != nil { return fmt.Errorf("getcmd, bad cmdid '%s': %w", getPacket.CmdId, err) } + if getPacket.ReqId == "" { + return fmt.Errorf("getcmd, no reqid specified") + } t.Lock.Lock() defer t.Lock.Unlock() key := CmdKey{getPacket.SessionId, getPacket.CmdId} @@ -259,9 +325,21 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error { if err != nil { return fmt.Errorf("error trying to watch sesion '%s': %v", getPacket.SessionId, err) } - pos := TailPos{CmdKey: key, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Follow: getPacket.Tail} - pos.fillFilePos(t.ScHomeDir) - t.WatchList[key] = pos - t.tryStartRun_nolock(pos) + entry, foundEntry := t.WatchList[key] + if !foundEntry { + entry = CmdWatchEntry{CmdKey: key} + entry.fillFilePos(t.ScHomeDir) + } + pos := TailPos{ReqId: getPacket.ReqId, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Follow: getPacket.Tail} + // convert negative pos to positive + if pos.TailPtyPos < 0 { + pos.TailPtyPos = max(0, entry.FilePtyLen+pos.TailPtyPos) // + because negative + } + if pos.TailRunPos < 0 { + pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative + } + entry.updateTailPos(pos.ReqId, pos) + t.WatchList[key] = entry + t.tryStartRun_nolock(entry, pos) return nil } diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index 78f77cb02..dec400665 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -27,6 +27,7 @@ const CmdStartPacketStr = "cmdstart" const CmdDonePacketStr = "cmddone" const ListCmdPacketStr = "lscmd" const GetCmdPacketStr = "getcmd" +const UntailCmdPacketStr = "untailcmd" const RunnerInitPacketStr = "runnerinit" const CdPacketStr = "cd" const CdResponseStr = "cdresp" @@ -46,6 +47,7 @@ func init() { TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{}) TypeStrToFactory[ListCmdPacketStr] = reflect.TypeOf(ListCmdPacketType{}) TypeStrToFactory[GetCmdPacketStr] = reflect.TypeOf(GetCmdPacketType{}) + TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{}) TypeStrToFactory[RunnerInitPacketStr] = reflect.TypeOf(RunnerInitPacketType{}) TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{}) TypeStrToFactory[CdResponseStr] = reflect.TypeOf(CdResponseType{}) @@ -63,17 +65,20 @@ func MakePacket(packetType string) (PacketType, error) { } type CmdDataPacketType struct { - Type string `json:"type"` - SessionId string `json:"sessionid"` - CmdId string `json:"cmdid"` - PtyPos int64 `json:"ptypos"` - PtyLen int64 `json:"ptylen"` - RunPos int64 `json:"runpos"` - RunLen int64 `json:"runlen"` - PtyData string `json:"ptydata"` - RunData string `json:"rundata"` - Error string `json:"error"` - NotFound bool `json:"notfound,omitempty"` + Type string `json:"type"` + ReqId string `json:"reqid"` + SessionId string `json:"sessionid"` + CmdId string `json:"cmdid"` + PtyPos int64 `json:"ptypos"` + PtyLen int64 `json:"ptylen"` + RunPos int64 `json:"runpos"` + RunLen int64 `json:"runlen"` + PtyData string `json:"ptydata"` + PtyDataLen int `json:"ptydatalen"` + RunData string `json:"rundata"` + RunDataLen int `json:"rundatalen"` + Error string `json:"error"` + NotFound bool `json:"notfound,omitempty"` } func (*CmdDataPacketType) GetType() string { @@ -96,8 +101,24 @@ func MakePingPacket() *PingPacketType { return &PingPacketType{Type: PingPacketStr} } +type UntailCmdPacketType struct { + Type string `json:"type"` + ReqId string `json:"reqid"` + SessionId string `json:"sessionid"` + CmdId string `json:"cmdid"` +} + +func (*UntailCmdPacketType) GetType() string { + return UntailCmdPacketStr +} + +func MakeUntailCmdPacket() *UntailCmdPacketType { + return &UntailCmdPacketType{Type: UntailCmdPacketStr} +} + type GetCmdPacketType struct { Type string `json:"type"` + ReqId string `json:"reqid"` SessionId string `json:"sessionid"` CmdId string `json:"cmdid"` PtyPos int64 `json:"ptypos"`