mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-21 21:32:13 +01:00
updates to allow parsing non-packets into raw packets (to see stderr output), and new sessionwatcher
This commit is contained in:
parent
ecceb67f20
commit
5176128346
@ -128,7 +128,6 @@ func doMainRun(pk *packet.RunPacketType, sender *packet.PacketSender) {
|
||||
|
||||
func doGetCmd(tailer *cmdtail.Tailer, pk *packet.GetCmdPacketType, sender *packet.PacketSender) error {
|
||||
// non-tail packets?
|
||||
sender.SendPacket(packet.MakeMessagePacket(fmt.Sprintf("getcmd %s", pk.CmdId)))
|
||||
err := tailer.AddWatch(pk)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -161,7 +160,6 @@ func doMain() {
|
||||
return
|
||||
}
|
||||
go tailer.Run()
|
||||
sender.SendPacket(packet.MakeMessagePacket(fmt.Sprintf("starting scripthaus runner @ %s", scHomeDir)))
|
||||
initPacket := packet.MakeRunnerInitPacket()
|
||||
initPacket.Env = os.Environ()
|
||||
initPacket.HomeDir = homeDir
|
||||
|
@ -10,12 +10,9 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/google/uuid"
|
||||
"github.com/scripthaus-dev/sh2-runner/pkg/base"
|
||||
"github.com/scripthaus-dev/sh2-runner/pkg/packet"
|
||||
@ -26,11 +23,15 @@ const MaxDataBytes = 4096
|
||||
type TailPos struct {
|
||||
CmdKey CmdKey
|
||||
Running bool // an active tailer sending data
|
||||
Version int
|
||||
FilePtyLen int64
|
||||
FileRunLen int64
|
||||
TailPtyPos int64
|
||||
TailRunPos int64
|
||||
Follow bool
|
||||
}
|
||||
|
||||
func (pos TailPos) IsCurrent() bool {
|
||||
return pos.TailPtyPos >= pos.FilePtyLen && pos.TailRunPos >= pos.FileRunLen
|
||||
}
|
||||
|
||||
type CmdKey struct {
|
||||
@ -41,9 +42,8 @@ type CmdKey struct {
|
||||
type Tailer struct {
|
||||
Lock *sync.Mutex
|
||||
WatchList map[CmdKey]TailPos
|
||||
Sessions map[string]bool
|
||||
Watcher *fsnotify.Watcher
|
||||
ScHomeDir string
|
||||
Watcher *SessionWatcher
|
||||
Sender *packet.PacketSender
|
||||
}
|
||||
|
||||
@ -55,11 +55,10 @@ func MakeTailer(sender *packet.PacketSender) (*Tailer, error) {
|
||||
rtn := &Tailer{
|
||||
Lock: &sync.Mutex{},
|
||||
WatchList: make(map[CmdKey]TailPos),
|
||||
Sessions: make(map[string]bool),
|
||||
ScHomeDir: scHomeDir,
|
||||
Sender: sender,
|
||||
}
|
||||
rtn.Watcher, err = fsnotify.NewWatcher()
|
||||
rtn.Watcher, err = MakeSessionWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -105,8 +104,6 @@ func (t *Tailer) makeCmdDataPacket(fileNames *base.CommandFileNames, pos TailPos
|
||||
return dataPacket
|
||||
}
|
||||
|
||||
var updateFileRe = regexp.MustCompile("/([a-z0-9-]+)/([a-z0-9-]+)\\.(ptyout|runout)$")
|
||||
|
||||
// returns (data-packet, keepRunning)
|
||||
func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, bool) {
|
||||
t.Lock.Lock()
|
||||
@ -150,6 +147,18 @@ func (t *Tailer) runSingleDataTransfer(key CmdKey) (*packet.CmdDataPacketType, b
|
||||
return dataPacket, pos.Running
|
||||
}
|
||||
|
||||
func (t *Tailer) checkRemoveNoFollow(cmdKey CmdKey) {
|
||||
t.Lock.Lock()
|
||||
defer t.Lock.Unlock()
|
||||
pos, foundPos := t.WatchList[cmdKey]
|
||||
if !foundPos {
|
||||
return
|
||||
}
|
||||
if !pos.Follow {
|
||||
delete(t.WatchList, cmdKey)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tailer) RunDataTransfer(key CmdKey) {
|
||||
for {
|
||||
dataPacket, keepRunning := t.runSingleDataTransfer(key)
|
||||
@ -157,73 +166,78 @@ func (t *Tailer) RunDataTransfer(key CmdKey) {
|
||||
t.Sender.SendPacket(dataPacket)
|
||||
}
|
||||
if !keepRunning {
|
||||
t.checkRemoveNoFollow(key)
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tailer) UpdateFile(relFileName string) {
|
||||
m := updateFileRe.FindStringSubmatch(relFileName)
|
||||
if m == nil {
|
||||
// should already hold t.Lock
|
||||
func (t *Tailer) tryStartRun_nolock(pos TailPos) {
|
||||
if pos.Running || pos.IsCurrent() {
|
||||
return
|
||||
}
|
||||
finfo, err := os.Stat(relFileName)
|
||||
if err != nil {
|
||||
t.Sender.SendMessage("error stating file '%s': %w", relFileName, err)
|
||||
pos.Running = true
|
||||
t.WatchList[pos.CmdKey] = pos
|
||||
go t.RunDataTransfer(pos.CmdKey)
|
||||
}
|
||||
|
||||
func (t *Tailer) updateFile(event FileUpdateEvent) {
|
||||
if event.Err != nil {
|
||||
t.Sender.SendMessage("error in FileUpdateEvent %s/%s: %v", event.SessionId, event.CmdId, event.Err)
|
||||
return
|
||||
}
|
||||
isPtyFile := m[3] == "ptyout"
|
||||
cmdKey := CmdKey{m[1], m[2]}
|
||||
fileSize := finfo.Size()
|
||||
cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId}
|
||||
t.Lock.Lock()
|
||||
defer t.Lock.Unlock()
|
||||
pos, foundPos := t.WatchList[cmdKey]
|
||||
if !foundPos {
|
||||
return
|
||||
}
|
||||
if isPtyFile {
|
||||
pos.FilePtyLen = fileSize
|
||||
} else {
|
||||
pos.FileRunLen = fileSize
|
||||
if event.FileType == FileTypePty {
|
||||
pos.FilePtyLen = event.Size
|
||||
} else if event.FileType == FileTypeRun {
|
||||
pos.FileRunLen = event.Size
|
||||
}
|
||||
t.WatchList[cmdKey] = pos
|
||||
if !pos.Running && (pos.FilePtyLen > pos.TailPtyPos || pos.FileRunLen > pos.TailRunPos) {
|
||||
go t.RunDataTransfer(cmdKey)
|
||||
}
|
||||
t.tryStartRun_nolock(pos)
|
||||
}
|
||||
|
||||
func (t *Tailer) Run() {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-t.Watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if (event.Op&fsnotify.Write == fsnotify.Write) || (event.Op&fsnotify.Create == fsnotify.Create) {
|
||||
t.UpdateFile(event.Name)
|
||||
}
|
||||
|
||||
case err, ok := <-t.Watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// what to do with watcher error?
|
||||
t.Sender.SendMessage("error in tailer '%v'", err)
|
||||
func (t *Tailer) Run() error {
|
||||
go func() {
|
||||
for event := range t.Watcher.EventCh {
|
||||
t.updateFile(event)
|
||||
}
|
||||
}
|
||||
}()
|
||||
err := t.Watcher.Run(nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func max(v1 int64, v2 int64) int64 {
|
||||
if v1 > v2 {
|
||||
return v1
|
||||
}
|
||||
return v2
|
||||
}
|
||||
|
||||
// also converts negative positions to positive positions
|
||||
func (tp *TailPos) fillFilePos(scHomeDir string) {
|
||||
fileNames := base.MakeCommandFileNamesWithHome(scHomeDir, tp.CmdKey.SessionId, tp.CmdKey.CmdId)
|
||||
ptyInfo, _ := os.Stat(fileNames.PtyOutFile)
|
||||
if ptyInfo != nil {
|
||||
tp.FilePtyLen = ptyInfo.Size()
|
||||
}
|
||||
if tp.TailPtyPos < 0 {
|
||||
tp.TailPtyPos = max(0, tp.FilePtyLen-tp.TailPtyPos)
|
||||
}
|
||||
runoutInfo, _ := os.Stat(fileNames.RunnerOutFile)
|
||||
if runoutInfo != nil {
|
||||
tp.FileRunLen = runoutInfo.Size()
|
||||
}
|
||||
if tp.TailRunPos < 0 {
|
||||
tp.TailRunPos = max(0, tp.FileRunLen-tp.TailRunPos)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error {
|
||||
@ -241,17 +255,13 @@ func (t *Tailer) AddWatch(getPacket *packet.GetCmdPacketType) error {
|
||||
t.Lock.Lock()
|
||||
defer t.Lock.Unlock()
|
||||
key := CmdKey{getPacket.SessionId, getPacket.CmdId}
|
||||
if !t.Sessions[getPacket.SessionId] {
|
||||
sessionDir := path.Join(t.ScHomeDir, base.SessionsDirBaseName, getPacket.SessionId)
|
||||
err = t.Watcher.Add(sessionDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error adding watcher for session dir '%s': %v", sessionDir, err)
|
||||
}
|
||||
t.Sessions[getPacket.SessionId] = true
|
||||
err = t.Watcher.WatchSession(getPacket.SessionId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error trying to watch sesion '%s': %v", getPacket.SessionId, err)
|
||||
}
|
||||
oldPos := t.WatchList[key]
|
||||
pos := TailPos{CmdKey: key, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Version: oldPos.Version + 1}
|
||||
pos := TailPos{CmdKey: key, TailPtyPos: getPacket.PtyPos, TailRunPos: getPacket.RunPos, Follow: getPacket.Tail}
|
||||
pos.fillFilePos(t.ScHomeDir)
|
||||
t.WatchList[key] = pos
|
||||
t.tryStartRun_nolock(pos)
|
||||
return nil
|
||||
}
|
||||
|
161
pkg/cmdtail/sessionwatcher.go
Normal file
161
pkg/cmdtail/sessionwatcher.go
Normal file
@ -0,0 +1,161 @@
|
||||
// Copyright 2022 Dashborg Inc
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package cmdtail
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/google/uuid"
|
||||
"github.com/scripthaus-dev/sh2-runner/pkg/base"
|
||||
)
|
||||
|
||||
const FileTypePty = "ptyout"
|
||||
const FileTypeRun = "runout"
|
||||
const eventChSize = 10
|
||||
|
||||
type FileUpdateEvent struct {
|
||||
SessionId string
|
||||
CmdId string
|
||||
FileType string
|
||||
Size int64
|
||||
Err error
|
||||
}
|
||||
|
||||
type SessionWatcher struct {
|
||||
Lock *sync.Mutex
|
||||
Sessions map[string]bool
|
||||
ScHomeDir string
|
||||
Watcher *fsnotify.Watcher
|
||||
EventCh chan FileUpdateEvent
|
||||
Err error
|
||||
Running bool
|
||||
}
|
||||
|
||||
func MakeSessionWatcher() (*SessionWatcher, error) {
|
||||
scHomeDir, err := base.GetScHomeDir()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rtn := &SessionWatcher{
|
||||
Lock: &sync.Mutex{},
|
||||
Sessions: make(map[string]bool),
|
||||
ScHomeDir: scHomeDir,
|
||||
EventCh: make(chan FileUpdateEvent, eventChSize),
|
||||
}
|
||||
rtn.Watcher, err = fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rtn, nil
|
||||
}
|
||||
|
||||
func (w *SessionWatcher) UnWatchSession(sessionId string) error {
|
||||
_, err := uuid.Parse(sessionId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("WatchSession, bad sessionid '%s': %w", sessionId, err)
|
||||
}
|
||||
w.Lock.Lock()
|
||||
defer w.Lock.Unlock()
|
||||
if !w.Sessions[sessionId] {
|
||||
return nil
|
||||
}
|
||||
sessionDir := path.Join(w.ScHomeDir, base.SessionsDirBaseName, sessionId)
|
||||
err = w.Watcher.Remove(sessionDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.Sessions[sessionId] = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *SessionWatcher) WatchSession(sessionId string) error {
|
||||
_, err := uuid.Parse(sessionId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("WatchSession, bad sessionid '%s': %w", sessionId, err)
|
||||
}
|
||||
|
||||
w.Lock.Lock()
|
||||
defer w.Lock.Unlock()
|
||||
if w.Sessions[sessionId] {
|
||||
return nil
|
||||
}
|
||||
sessionDir := path.Join(w.ScHomeDir, base.SessionsDirBaseName, sessionId)
|
||||
err = w.Watcher.Add(sessionDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.Sessions[sessionId] = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *SessionWatcher) setRunning() bool {
|
||||
w.Lock.Lock()
|
||||
defer w.Lock.Unlock()
|
||||
if w.Running {
|
||||
return false
|
||||
}
|
||||
w.Running = true
|
||||
return true
|
||||
}
|
||||
|
||||
var swUpdateFileRe = regexp.MustCompile("/([a-z0-9-]+)/([a-z0-9-]+)\\.(ptyout|runout)$")
|
||||
|
||||
func (w *SessionWatcher) updateFile(relFileName string) {
|
||||
m := swUpdateFileRe.FindStringSubmatch(relFileName)
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
event := FileUpdateEvent{SessionId: m[1], CmdId: m[2], FileType: m[3]}
|
||||
finfo, err := os.Stat(relFileName)
|
||||
if err != nil {
|
||||
event.Err = err
|
||||
w.EventCh <- event
|
||||
return
|
||||
}
|
||||
event.Size = finfo.Size()
|
||||
w.EventCh <- event
|
||||
return
|
||||
}
|
||||
|
||||
func (w *SessionWatcher) Run(stopCh chan bool) error {
|
||||
ok := w.setRunning()
|
||||
if !ok {
|
||||
return fmt.Errorf("Cannot run SessionWatcher (alreaady running)")
|
||||
}
|
||||
defer func() {
|
||||
w.Lock.Lock()
|
||||
defer w.Lock.Unlock()
|
||||
w.Running = false
|
||||
close(w.EventCh)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-w.Watcher.Events:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if (event.Op&fsnotify.Write == fsnotify.Write) || (event.Op&fsnotify.Create == fsnotify.Create) {
|
||||
w.updateFile(event.Name)
|
||||
}
|
||||
|
||||
case err, ok := <-w.Watcher.Errors:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Got error in SessionWatcher: %w", err)
|
||||
|
||||
case <-stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -8,10 +8,13 @@ package packet
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -28,6 +31,7 @@ const RunnerInitPacketStr = "runnerinit"
|
||||
const CdPacketStr = "cd"
|
||||
const CdResponseStr = "cdresp"
|
||||
const CmdDataPacketStr = "cmddata"
|
||||
const RawPacketStr = "raw"
|
||||
|
||||
var TypeStrToFactory map[string]reflect.Type
|
||||
|
||||
@ -46,7 +50,7 @@ func init() {
|
||||
TypeStrToFactory[CdPacketStr] = reflect.TypeOf(CdPacketType{})
|
||||
TypeStrToFactory[CdResponseStr] = reflect.TypeOf(CdResponseType{})
|
||||
TypeStrToFactory[CmdDataPacketStr] = reflect.TypeOf(CmdDataPacketType{})
|
||||
|
||||
TypeStrToFactory[RawPacketStr] = reflect.TypeOf(RawPacketType{})
|
||||
}
|
||||
|
||||
func MakePacket(packetType string) (PacketType, error) {
|
||||
@ -63,11 +67,13 @@ type CmdDataPacketType struct {
|
||||
SessionId string `json:"sessionid"`
|
||||
CmdId string `json:"cmdid"`
|
||||
PtyPos int64 `json:"ptypos"`
|
||||
PtyLen int64 `json:"ptylen"`
|
||||
RunPos int64 `json:"runpos"`
|
||||
RunLen int64 `json:"runlen"`
|
||||
PtyData string `json:"ptydata"`
|
||||
RunData string `json:"rundata"`
|
||||
Done bool `json:"done"`
|
||||
Error string `json:"error"`
|
||||
NotFound bool `json:"notfound,omitempty"`
|
||||
}
|
||||
|
||||
func (*CmdDataPacketType) GetType() string {
|
||||
@ -149,6 +155,19 @@ func MakeCdResponse() *CdResponseType {
|
||||
return &CdResponseType{Type: CdResponseStr}
|
||||
}
|
||||
|
||||
type RawPacketType struct {
|
||||
Type string `json:"type"`
|
||||
Data string `json:"data"`
|
||||
}
|
||||
|
||||
func (*RawPacketType) GetType() string {
|
||||
return RawPacketStr
|
||||
}
|
||||
|
||||
func MakeRawPacket(val string) *RawPacketType {
|
||||
return &RawPacketType{Type: RawPacketStr, Data: val}
|
||||
}
|
||||
|
||||
type MessagePacketType struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
@ -288,12 +307,16 @@ func SendPacket(w io.Writer, packet PacketType) error {
|
||||
if packet == nil {
|
||||
return nil
|
||||
}
|
||||
barr, err := json.Marshal(packet)
|
||||
jsonBytes, err := json.Marshal(packet)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling '%s' packet: %w", packet.GetType(), err)
|
||||
}
|
||||
barr = append(barr, '\n')
|
||||
_, err = w.Write(barr)
|
||||
var outBuf bytes.Buffer
|
||||
outBuf.WriteByte('\n')
|
||||
outBuf.WriteString(fmt.Sprintf("##%d", len(jsonBytes)))
|
||||
outBuf.Write(jsonBytes)
|
||||
outBuf.WriteByte('\n')
|
||||
_, err = w.Write(outBuf.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -392,7 +415,22 @@ func PacketParser(input io.Reader) chan PacketType {
|
||||
rtnCh <- errPacket
|
||||
return
|
||||
}
|
||||
pk, err := ParseJsonPacket([]byte(line))
|
||||
if line == "\n" {
|
||||
continue
|
||||
}
|
||||
// ##[len][json]\n
|
||||
// ##14{"hello":true}\n
|
||||
bracePos := strings.Index(line, "{")
|
||||
if !strings.HasPrefix(line, "##") || bracePos == -1 {
|
||||
rtnCh <- MakeRawPacket(line[:len(line)-1])
|
||||
continue
|
||||
}
|
||||
packetLen, err := strconv.Atoi(line[2:bracePos])
|
||||
if err != nil || packetLen != len(line)-bracePos-1 {
|
||||
rtnCh <- MakeRawPacket(line[:len(line)-1])
|
||||
continue
|
||||
}
|
||||
pk, err := ParseJsonPacket([]byte(line[bracePos:]))
|
||||
if err != nil {
|
||||
errPk := MakeErrorPacket(fmt.Sprintf("parsing packet json from input: %v", err))
|
||||
rtnCh <- errPk
|
||||
|
@ -205,17 +205,22 @@ func RunCommand(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecT
|
||||
}, nil
|
||||
}
|
||||
|
||||
func GetExitCode(err error) int {
|
||||
if err == nil {
|
||||
return 0
|
||||
}
|
||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||
return exitErr.ExitCode()
|
||||
} else {
|
||||
return -1
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ShExecType) WaitForCommand(cmdId string) *packet.CmdDonePacketType {
|
||||
err := c.Cmd.Wait()
|
||||
exitErr := c.Cmd.Wait()
|
||||
endTs := time.Now()
|
||||
cmdDuration := endTs.Sub(c.StartTs)
|
||||
exitCode := 0
|
||||
if err != nil {
|
||||
exitErr, ok := err.(*exec.ExitError)
|
||||
if ok {
|
||||
exitCode = exitErr.ExitCode()
|
||||
}
|
||||
}
|
||||
exitCode := GetExitCode(exitErr)
|
||||
donePacket := packet.MakeCmdDonePacket()
|
||||
donePacket.Ts = endTs.UnixMilli()
|
||||
donePacket.CmdId = cmdId
|
||||
|
Loading…
Reference in New Issue
Block a user