cmdtail now can support multiple tails on the same cmd with independent tail positions

This commit is contained in:
sawka 2022-06-16 17:24:29 -07:00
parent c6165f15f4
commit dc4baaea27
3 changed files with 166 additions and 68 deletions

View File

@ -127,7 +127,6 @@ func doMainRun(pk *packet.RunPacketType, sender *packet.PacketSender) {
} }
func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error { func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error {
// non-tail packets?
err := tailer.AddWatch(pk) err := tailer.AddWatch(pk)
if err != nil { if err != nil {
return err return err

View File

@ -21,17 +21,52 @@ import (
const MaxDataBytes = 4096 const MaxDataBytes = 4096
type TailPos struct { type TailPos struct {
CmdKey CmdKey ReqId string
Running bool // an active tailer sending data Running bool // an active tailer sending data
FilePtyLen int64
FileRunLen int64
TailPtyPos int64 TailPtyPos int64
TailRunPos int64 TailRunPos int64
Follow bool Follow bool
} }
func (pos TailPos) IsCurrent() bool { type CmdWatchEntry struct {
return pos.TailPtyPos >= pos.FilePtyLen && pos.TailRunPos >= pos.FileRunLen CmdKey CmdKey
FilePtyLen int64
FileRunLen int64
Tails []TailPos
}
func (w CmdWatchEntry) getTailPos(reqId string) (TailPos, bool) {
for _, pos := range w.Tails {
if pos.ReqId == reqId {
return pos, true
}
}
return TailPos{}, false
}
func (w *CmdWatchEntry) updateTailPos(reqId string, pos TailPos) {
for idx, pos := range w.Tails {
if pos.ReqId == reqId {
w.Tails[idx] = pos
return
}
}
w.Tails = append(w.Tails, pos)
}
func (w *CmdWatchEntry) removeTailPos(reqId string) {
var newTails []TailPos
for _, pos := range w.Tails {
if pos.ReqId == reqId {
continue
}
newTails = append(newTails, pos)
}
w.Tails = newTails
}
func (pos TailPos) IsCurrent(entry CmdWatchEntry) bool {
return pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen
} }
type CmdKey struct { type CmdKey struct {
@ -41,12 +76,43 @@ type CmdKey struct {
type Tailer struct { type Tailer struct {
Lock *sync.Mutex Lock *sync.Mutex
WatchList map[CmdKey]TailPos WatchList map[CmdKey]CmdWatchEntry
ScHomeDir string ScHomeDir string
Watcher *SessionWatcher Watcher *SessionWatcher
SendCh chan packet.PacketType SendCh chan packet.PacketType
} }
func (t *Tailer) updateTailPos_nolock(cmdKey CmdKey, reqId string, pos TailPos) {
entry, found := t.WatchList[cmdKey]
if !found {
return
}
entry.updateTailPos(reqId, pos)
t.WatchList[cmdKey] = entry
}
func (t *Tailer) updateEntrySizes_nolock(cmdKey CmdKey, ptyLen int64, runLen int64) {
entry, found := t.WatchList[cmdKey]
if !found {
return
}
entry.FilePtyLen = ptyLen
entry.FileRunLen = runLen
t.WatchList[cmdKey] = entry
}
func (t *Tailer) getEntryAndPos_nolock(cmdKey CmdKey, reqId string) (CmdWatchEntry, TailPos, bool) {
entry, found := t.WatchList[cmdKey]
if !found {
return CmdWatchEntry{}, TailPos{}, false
}
pos, found := entry.getTailPos(reqId)
if !found {
return CmdWatchEntry{}, TailPos{}, false
}
return entry, pos, true
}
func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) { func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) {
scHomeDir, err := base.GetScHomeDir() scHomeDir, err := base.GetScHomeDir()
if err != nil { if err != nil {
@ -54,7 +120,7 @@ func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) {
} }
rtn := &Tailer{ rtn := &Tailer{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
WatchList: make(map[CmdKey]TailPos), WatchList: make(map[CmdKey]CmdWatchEntry),
ScHomeDir: scHomeDir, ScHomeDir: scHomeDir,
SendCh: sendCh, SendCh: sendCh,
} }
@ -79,45 +145,48 @@ func (t *Tailer) readDataFromFile(fileName string, pos int64, maxBytes int) ([]b
return buf[0:nr], nil return buf[0:nr], nil
} }
func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, pos TailPos) *packet.CmdDataPacketType { func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, entry CmdWatchEntry, pos TailPos) *packet.CmdDataPacketType {
dataPacket := packet.MakeCmdDataPacket() dataPacket := packet.MakeCmdDataPacket()
dataPacket.SessionId = pos.CmdKey.SessionId dataPacket.ReqId = pos.ReqId
dataPacket.CmdId = pos.CmdKey.CmdId dataPacket.SessionId = entry.CmdKey.SessionId
dataPacket.CmdId = entry.CmdKey.CmdId
dataPacket.PtyPos = pos.TailPtyPos dataPacket.PtyPos = pos.TailPtyPos
dataPacket.RunPos = pos.TailRunPos dataPacket.RunPos = pos.TailRunPos
if pos.FilePtyLen > pos.TailPtyPos { if entry.FilePtyLen > pos.TailPtyPos {
ptyData, err := t.readDataFromFile(fileNames.PtyOutFile, pos.TailPtyPos, MaxDataBytes) ptyData, err := t.readDataFromFile(fileNames.PtyOutFile, pos.TailPtyPos, MaxDataBytes)
if err != nil { if err != nil {
dataPacket.Error = err.Error() dataPacket.Error = err.Error()
return dataPacket return dataPacket
} }
dataPacket.PtyData = string(ptyData) dataPacket.PtyData = string(ptyData)
dataPacket.PtyDataLen = len(ptyData)
} }
if pos.FileRunLen > pos.TailRunPos { if entry.FileRunLen > pos.TailRunPos {
runData, err := t.readDataFromFile(fileNames.RunnerOutFile, pos.TailRunPos, MaxDataBytes) runData, err := t.readDataFromFile(fileNames.RunnerOutFile, pos.TailRunPos, MaxDataBytes)
if err != nil { if err != nil {
dataPacket.Error = err.Error() dataPacket.Error = err.Error()
return dataPacket return dataPacket
} }
dataPacket.RunData = string(runData) dataPacket.RunData = string(runData)
dataPacket.RunDataLen = len(runData)
} }
return dataPacket return dataPacket
} }
// returns (data-packet, keepRunning) // returns (data-packet, keepRunning)
func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, bool) { func (t *Tailer) runSingleDataTransfer(key CmdKey, reqId string) (*packet.CmdDataPacketType, bool) {
t.Lock.Lock() t.Lock.Lock()
pos, foundPos := t.WatchList[key] entry, pos, foundPos := t.getEntryAndPos_nolock(key, reqId)
t.Lock.Unlock() t.Lock.Unlock()
if !foundPos { if !foundPos {
return nil, false return nil, false
} }
fileNames := base.MakeCommandFileNamesWithHome(t.ScHomeDir, key.SessionId, key.CmdId) fileNames := base.MakeCommandFileNamesWithHome(t.ScHomeDir, key.SessionId, key.CmdId)
dataPacket := t.makeCmdDataPacket(fileNames, pos) dataPacket := t.makeCmdDataPacket(fileNames, entry, pos)
t.Lock.Lock() t.Lock.Lock()
defer t.Lock.Unlock() defer t.Lock.Unlock()
pos, foundPos = t.WatchList[key] entry, pos, foundPos = t.getEntryAndPos_nolock(key, reqId)
if !foundPos { if !foundPos {
return nil, false return nil, false
} }
@ -128,45 +197,44 @@ func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, b
if dataPacket.Error != "" { if dataPacket.Error != "" {
// error, so return error packet, and stop running // error, so return error packet, and stop running
pos.Running = false pos.Running = false
t.WatchList[key] = pos t.updateTailPos_nolock(key, reqId, pos)
return dataPacket, false return dataPacket, false
} }
pos.TailPtyPos += int64(len(dataPacket.PtyData)) pos.TailPtyPos += int64(len(dataPacket.PtyData))
pos.TailRunPos += int64(len(dataPacket.RunData)) pos.TailRunPos += int64(len(dataPacket.RunData))
if pos.TailPtyPos > pos.FilePtyLen { if pos.TailPtyPos >= entry.FilePtyLen && pos.TailRunPos >= entry.FileRunLen {
pos.FilePtyLen = pos.TailPtyPos
}
if pos.TailRunPos > pos.FileRunLen {
pos.FileRunLen = pos.TailRunPos
}
if pos.TailPtyPos >= pos.FilePtyLen && pos.TailRunPos >= pos.FileRunLen {
// we caught up, tail position equals file length // we caught up, tail position equals file length
pos.Running = false pos.Running = false
} }
t.WatchList[key] = pos t.updateTailPos_nolock(key, reqId, pos)
return dataPacket, pos.Running return dataPacket, pos.Running
} }
func (t *Tailer) checkRemoveNoFollow(cmdKey CmdKey) { func (t *Tailer) checkRemoveNoFollow(cmdKey CmdKey, reqId string) {
t.Lock.Lock() t.Lock.Lock()
defer t.Lock.Unlock() defer t.Lock.Unlock()
pos, foundPos := t.WatchList[cmdKey] entry, pos, foundPos := t.getEntryAndPos_nolock(cmdKey, reqId)
if !foundPos { if !foundPos {
return return
} }
if !pos.Follow { if !pos.Follow {
entry.removeTailPos(reqId)
if len(entry.Tails) == 0 {
delete(t.WatchList, cmdKey) delete(t.WatchList, cmdKey)
} else {
t.WatchList[cmdKey] = entry
}
} }
} }
func (t *Tailer) RunDataTransfer(key CmdKey) { func (t *Tailer) RunDataTransfer(key CmdKey, reqId string) {
for { for {
dataPacket, keepRunning := t.runSingleDataTransfer(key) dataPacket, keepRunning := t.runSingleDataTransfer(key, reqId)
if dataPacket != nil { if dataPacket != nil {
t.SendCh <- dataPacket t.SendCh <- dataPacket
} }
if !keepRunning { if !keepRunning {
t.checkRemoveNoFollow(key) t.checkRemoveNoFollow(key, reqId)
break break
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -174,13 +242,13 @@ func (t *Tailer) RunDataTransfer(key CmdKey) {
} }
// should already hold t.Lock // should already hold t.Lock
func (t *Tailer) tryStartRun_nolock(pos TailPos) { func (t *Tailer) tryStartRun_nolock(entry CmdWatchEntry, pos TailPos) {
if pos.Running || pos.IsCurrent() { if pos.Running || pos.IsCurrent(entry) {
return return
} }
pos.Running = true pos.Running = true
t.WatchList[pos.CmdKey] = pos t.updateTailPos_nolock(entry.CmdKey, pos.ReqId, pos)
go t.RunDataTransfer(pos.CmdKey) go t.RunDataTransfer(entry.CmdKey, pos.ReqId)
} }
func (t *Tailer) updateFile(event FileUpdateEvent) { func (t *Tailer) updateFile(event FileUpdateEvent) {
@ -191,17 +259,19 @@ func (t *Tailer) updateFile(event FileUpdateEvent) {
cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId} cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId}
t.Lock.Lock() t.Lock.Lock()
defer t.Lock.Unlock() defer t.Lock.Unlock()
pos, foundPos := t.WatchList[cmdKey] entry, foundEntry := t.WatchList[cmdKey]
if !foundPos { if !foundEntry {
return return
} }
if event.FileType == FileTypePty { if event.FileType == FileTypePty {
pos.FilePtyLen = event.Size entry.FilePtyLen = event.Size
} else if event.FileType == FileTypeRun { } else if event.FileType == FileTypeRun {
pos.FileRunLen = event.Size entry.FileRunLen = event.Size
}
t.WatchList[cmdKey] = entry
for _, pos := range entry.Tails {
t.tryStartRun_nolock(entry, pos)
} }
t.WatchList[cmdKey] = pos
t.tryStartRun_nolock(pos)
} }
func (t *Tailer) Run() error { func (t *Tailer) Run() error {
@ -221,22 +291,15 @@ func max(v1 int64, v2 int64) int64 {
return v2 return v2
} }
// also converts negative positions to positive positions func (entry *CmdWatchEntry) fillFilePos(scHomeDir string) {
func (tp *TailPos) fillFilePos(scHomeDir string) { fileNames := base.MakeCommandFileNamesWithHome(scHomeDir, entry.CmdKey.SessionId, entry.CmdKey.CmdId)
fileNames := base.MakeCommandFileNamesWithHome(scHomeDir, tp.CmdKey.SessionId, tp.CmdKey.CmdId)
ptyInfo, _ := os.Stat(fileNames.PtyOutFile) ptyInfo, _ := os.Stat(fileNames.PtyOutFile)
if ptyInfo != nil { if ptyInfo != nil {
tp.FilePtyLen = ptyInfo.Size() entry.FilePtyLen = ptyInfo.Size()
}
if tp.TailPtyPos < 0 {
tp.TailPtyPos = max(0, tp.FilePtyLen-tp.TailPtyPos)
} }
runoutInfo, _ := os.Stat(fileNames.RunnerOutFile) runoutInfo, _ := os.Stat(fileNames.RunnerOutFile)
if runoutInfo != nil { if runoutInfo != nil {
tp.FileRunLen = runoutInfo.Size() entry.FileRunLen = runoutInfo.Size()
}
if tp.TailRunPos < 0 {
tp.TailRunPos = max(0, tp.FileRunLen-tp.TailRunPos)
} }
} }
@ -252,6 +315,9 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error {
if err != nil { if err != nil {
return fmt.Errorf("getcmd, bad cmdid '%s': %w", getPacket.CmdId, err) return fmt.Errorf("getcmd, bad cmdid '%s': %w", getPacket.CmdId, err)
} }
if getPacket.ReqId == "" {
return fmt.Errorf("getcmd, no reqid specified")
}
t.Lock.Lock() t.Lock.Lock()
defer t.Lock.Unlock() defer t.Lock.Unlock()
key := CmdKey{getPacket.SessionId, getPacket.CmdId} key := CmdKey{getPacket.SessionId, getPacket.CmdId}
@ -259,9 +325,21 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error {
if err != nil { if err != nil {
return fmt.Errorf("error trying to watch sesion '%s': %v", getPacket.SessionId, err) return fmt.Errorf("error trying to watch sesion '%s': %v", getPacket.SessionId, err)
} }
pos := TailPos{CmdKey: key, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Follow: getPacket.Tail} entry, foundEntry := t.WatchList[key]
pos.fillFilePos(t.ScHomeDir) if !foundEntry {
t.WatchList[key] = pos entry = CmdWatchEntry{CmdKey: key}
t.tryStartRun_nolock(pos) entry.fillFilePos(t.ScHomeDir)
}
pos := TailPos{ReqId: getPacket.ReqId, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Follow: getPacket.Tail}
// convert negative pos to positive
if pos.TailPtyPos < 0 {
pos.TailPtyPos = max(0, entry.FilePtyLen+pos.TailPtyPos) // + because negative
}
if pos.TailRunPos < 0 {
pos.TailRunPos = max(0, entry.FileRunLen+pos.TailRunPos) // + because negative
}
entry.updateTailPos(pos.ReqId, pos)
t.WatchList[key] = entry
t.tryStartRun_nolock(entry, pos)
return nil return nil
} }

View File

@ -27,6 +27,7 @@ const CmdStartPacketStr = "cmdstart"
const CmdDonePacketStr = "cmddone" const CmdDonePacketStr = "cmddone"
const ListCmdPacketStr = "lscmd" const ListCmdPacketStr = "lscmd"
const GetCmdPacketStr = "getcmd" const GetCmdPacketStr = "getcmd"
const UntailCmdPacketStr = "untailcmd"
const RunnerInitPacketStr = "runnerinit" const RunnerInitPacketStr = "runnerinit"
const CdPacketStr = "cd" const CdPacketStr = "cd"
const CdResponseStr = "cdresp" const CdResponseStr = "cdresp"
@ -46,6 +47,7 @@ func init() {
TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{}) TypeStrToFactory[CmdDonePacketStr] = reflect.TypeOf(CmdDonePacketType{})
TypeStrToFactory[ListCmdPacketStr] = reflect.TypeOf(ListCmdPacketType{}) TypeStrToFactory[ListCmdPacketStr] = reflect.TypeOf(ListCmdPacketType{})
TypeStrToFactory[GetCmdPacketStr] = reflect.TypeOf(GetCmdPacketType{}) TypeStrToFactory[GetCmdPacketStr] = reflect.TypeOf(GetCmdPacketType{})
TypeStrToFactory[UntailCmdPacketStr] = reflect.TypeOf(UntailCmdPacketType{})
TypeStrToFactory[RunnerInitPacketStr] = reflect.TypeOf(RunnerInitPacketType{}) TypeStrToFactory[RunnerInitPacketStr] = reflect.TypeOf(RunnerInitPacketType{})
TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{}) TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{})
TypeStrToFactory[CdResponseStr] = reflect.TypeOf(CdResponseType{}) TypeStrToFactory[CdResponseStr] = reflect.TypeOf(CdResponseType{})
@ -64,6 +66,7 @@ func MakePacket(packetType string) (PacketType, error) {
type CmdDataPacketType struct { type CmdDataPacketType struct {
Type string `json:"type"` Type string `json:"type"`
ReqId string `json:"reqid"`
SessionId string `json:"sessionid"` SessionId string `json:"sessionid"`
CmdId string `json:"cmdid"` CmdId string `json:"cmdid"`
PtyPos int64 `json:"ptypos"` PtyPos int64 `json:"ptypos"`
@ -71,7 +74,9 @@ type CmdDataPacketType struct {
RunPos int64 `json:"runpos"` RunPos int64 `json:"runpos"`
RunLen int64 `json:"runlen"` RunLen int64 `json:"runlen"`
PtyData string `json:"ptydata"` PtyData string `json:"ptydata"`
PtyDataLen int `json:"ptydatalen"`
RunData string `json:"rundata"` RunData string `json:"rundata"`
RunDataLen int `json:"rundatalen"`
Error string `json:"error"` Error string `json:"error"`
NotFound bool `json:"notfound,omitempty"` NotFound bool `json:"notfound,omitempty"`
} }
@ -96,8 +101,24 @@ func MakePingPacket() *PingPacketType {
return &PingPacketType{Type: PingPacketStr} return &PingPacketType{Type: PingPacketStr}
} }
type UntailCmdPacketType struct {
Type string `json:"type"`
ReqId string `json:"reqid"`
SessionId string `json:"sessionid"`
CmdId string `json:"cmdid"`
}
func (*UntailCmdPacketType) GetType() string {
return UntailCmdPacketStr
}
func MakeUntailCmdPacket() *UntailCmdPacketType {
return &UntailCmdPacketType{Type: UntailCmdPacketStr}
}
type GetCmdPacketType struct { type GetCmdPacketType struct {
Type string `json:"type"` Type string `json:"type"`
ReqId string `json:"reqid"`
SessionId string `json:"sessionid"` SessionId string `json:"sessionid"`
CmdId string `json:"cmdid"` CmdId string `json:"cmdid"`
PtyPos int64 `json:"ptypos"` PtyPos int64 `json:"ptypos"`