From c6165f15f450f13ae795892c41555a5bfbd19770 Mon Sep 17 00:00:00 2001 From: sawka Date: Thu, 16 Jun 2022 01:10:56 -0700 Subject: [PATCH] update tailer to send packets to a channel instead of hard coding to PacketSender --- main-runner.go | 2 +- pkg/cmdtail/cmdtail.go | 10 +++++----- pkg/packet/packet.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/main-runner.go b/main-runner.go index 3c74d49a1..c9b9fbe70 100644 --- a/main-runner.go +++ b/main-runner.go @@ -154,7 +154,7 @@ func doMain() { } packetCh := packet.PacketParser(os.Stdin) sender := packet.MakePacketSender(os.Stdout) - tailer, err := cmdtail.MakeTailer(sender) + tailer, err := cmdtail.MakeTailer(sender.SendCh) if err != nil { packet.SendErrorPacket(os.Stdout, err.Error()) return diff --git a/pkg/cmdtail/cmdtail.go b/pkg/cmdtail/cmdtail.go index f82c41e62..e09408697 100644 --- a/pkg/cmdtail/cmdtail.go +++ b/pkg/cmdtail/cmdtail.go @@ -44,10 +44,10 @@ type Tailer struct { WatchList map[CmdKey]TailPos ScHomeDir string Watcher *SessionWatcher - Sender *packet.PacketSender + SendCh chan packet.PacketType } -func MakeTailer(sender *packet.PacketSender) (*Tailer, error) { +func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) { scHomeDir, err := base.GetScHomeDir() if err != nil { return nil, err @@ -56,7 +56,7 @@ func MakeTailer(sender *packet.PacketSender) (*Tailer, error) { Lock: &sync.Mutex{}, WatchList: make(map[CmdKey]TailPos), ScHomeDir: scHomeDir, - Sender: sender, + SendCh: sendCh, } rtn.Watcher, err = MakeSessionWatcher() if err != nil { @@ -163,7 +163,7 @@ func (t *Tailer) RunDataTransfer(key CmdKey) { for { dataPacket, keepRunning := t.runSingleDataTransfer(key) if dataPacket != nil { - t.Sender.SendPacket(dataPacket) + t.SendCh <- dataPacket } if !keepRunning { t.checkRemoveNoFollow(key) @@ -185,7 +185,7 @@ func (t *Tailer) tryStartRun_nolock(pos TailPos) { func (t *Tailer) updateFile(event FileUpdateEvent) { if event.Err != nil { - t.Sender.SendMessage("error in FileUpdateEvent %s/%s: %v", event.SessionId, event.CmdId, event.Err) + t.SendCh <- packet.FmtMessagePacket("error in FileUpdateEvent %s/%s: %v", event.SessionId, event.CmdId, event.Err) return } cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId} diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index ff56c4ac0..78f77cb02 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -181,6 +181,11 @@ func MakeMessagePacket(message string) *MessagePacketType { return &MessagePacketType{Type: MessagePacketStr, Message: message} } +func FmtMessagePacket(fmtStr string, args ...interface{}) *MessagePacketType { + message := fmt.Sprintf(fmtStr, args...) + return &MessagePacketType{Type: MessagePacketStr, Message: message} +} + type RunnerInitPacketType struct { Type string `json:"type"` ScHomeDir string `json:"schomedir"` @@ -444,3 +449,30 @@ func PacketParser(input io.Reader) chan PacketType { }() return rtnCh } + +type ErrorReporter interface { + ReportError(err error) +} + +func PacketToByteArrBridge(pkCh chan PacketType, byteCh chan []byte, errorReporter ErrorReporter, closeOnDone bool) { + go func() { + defer func() { + if closeOnDone { + close(byteCh) + } + }() + for pk := range pkCh { + if pk == nil { + continue + } + jsonBytes, err := json.Marshal(pk) + if err != nil { + if errorReporter != nil { + errorReporter.ReportError(fmt.Errorf("error marshaling packet: %w", err)) + } + continue + } + byteCh <- jsonBytes + } + }() +}