update tailer to send packets to a channel instead of hard coding to PacketSender

This commit is contained in:
sawka 2022-06-16 01:10:56 -07:00
parent 5176128346
commit c6165f15f4
3 changed files with 38 additions and 6 deletions

View File

@ -154,7 +154,7 @@ func doMain() {
}
packetCh := packet.PacketParser(os.Stdin)
sender := packet.MakePacketSender(os.Stdout)
tailer, err := cmdtail.MakeTailer(sender)
tailer, err := cmdtail.MakeTailer(sender.SendCh)
if err != nil {
packet.SendErrorPacket(os.Stdout, err.Error())
return

View File

@ -44,10 +44,10 @@ type Tailer struct {
WatchList map[CmdKey]TailPos
ScHomeDir string
Watcher *SessionWatcher
Sender *packet.PacketSender
SendCh chan packet.PacketType
}
func MakeTailer(sender *packet.PacketSender) (*Tailer, error) {
func MakeTailer(sendCh chan packet.PacketType) (*Tailer, error) {
scHomeDir, err := base.GetScHomeDir()
if err != nil {
return nil, err
@ -56,7 +56,7 @@ func MakeTailer(sender *packet.PacketSender) (*Tailer, error) {
Lock: &sync.Mutex{},
WatchList: make(map[CmdKey]TailPos),
ScHomeDir: scHomeDir,
Sender: sender,
SendCh: sendCh,
}
rtn.Watcher, err = MakeSessionWatcher()
if err != nil {
@ -163,7 +163,7 @@ func (t *Tailer) RunDataTransfer(key CmdKey) {
for {
dataPacket, keepRunning := t.runSingleDataTransfer(key)
if dataPacket != nil {
t.Sender.SendPacket(dataPacket)
t.SendCh <- dataPacket
}
if !keepRunning {
t.checkRemoveNoFollow(key)
@ -185,7 +185,7 @@ func (t *Tailer) tryStartRun_nolock(pos TailPos) {
func (t *Tailer) updateFile(event FileUpdateEvent) {
if event.Err != nil {
t.Sender.SendMessage("error in FileUpdateEvent %s/%s: %v", event.SessionId, event.CmdId, event.Err)
t.SendCh <- packet.FmtMessagePacket("error in FileUpdateEvent %s/%s: %v", event.SessionId, event.CmdId, event.Err)
return
}
cmdKey := CmdKey{SessionId: event.SessionId, CmdId: event.CmdId}

View File

@ -181,6 +181,11 @@ func MakeMessagePacket(message string) *MessagePacketType {
return &MessagePacketType{Type: MessagePacketStr, Message: message}
}
func FmtMessagePacket(fmtStr string, args ...interface{}) *MessagePacketType {
message := fmt.Sprintf(fmtStr, args...)
return &MessagePacketType{Type: MessagePacketStr, Message: message}
}
type RunnerInitPacketType struct {
Type string `json:"type"`
ScHomeDir string `json:"schomedir"`
@ -444,3 +449,30 @@ func PacketParser(input io.Reader) chan PacketType {
}()
return rtnCh
}
type ErrorReporter interface {
ReportError(err error)
}
func PacketToByteArrBridge(pkCh chan PacketType, byteCh chan []byte, errorReporter ErrorReporter, closeOnDone bool) {
go func() {
defer func() {
if closeOnDone {
close(byteCh)
}
}()
for pk := range pkCh {
if pk == nil {
continue
}
jsonBytes, err := json.Marshal(pk)
if err != nil {
if errorReporter != nil {
errorReporter.ReportError(fmt.Errorf("error marshaling packet: %w", err))
}
continue
}
byteCh <- jsonBytes
}
}()
}