waveterm/waveshell/pkg/mpio/bufreader.go
2023-10-16 21:31:13 -07:00

148 lines
2.9 KiB
Go

// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package mpio
import (
"io"
"sync"
"github.com/wavetermdev/waveterm/waveshell/pkg/packet"
)
type FdReader struct {
CVar *sync.Cond
M *Multiplexer
FdNum int
Fd io.ReadCloser
BufSize int
Closed bool
ShouldCloseFd bool
IsPty bool
}
func MakeFdReader(m *Multiplexer, fd io.ReadCloser, fdNum int, shouldCloseFd bool, isPty bool) *FdReader {
fr := &FdReader{
CVar: sync.NewCond(&sync.Mutex{}),
M: m,
FdNum: fdNum,
Fd: fd,
BufSize: 0,
ShouldCloseFd: shouldCloseFd,
IsPty: isPty,
}
return fr
}
func (r *FdReader) Close() {
r.CVar.L.Lock()
defer r.CVar.L.Unlock()
if r.Closed {
return
}
if r.Fd != nil && r.ShouldCloseFd {
r.Fd.Close()
}
r.CVar.Broadcast()
}
func (r *FdReader) GetBufSize() int {
r.CVar.L.Lock()
defer r.CVar.L.Unlock()
return r.BufSize
}
func (r *FdReader) NotifyAck(ackLen int) {
r.CVar.L.Lock()
defer r.CVar.L.Unlock()
if r.Closed {
return
}
r.BufSize -= ackLen
if r.BufSize < 0 {
r.BufSize = 0
}
r.CVar.Broadcast()
}
// !! inverse locking. must already hold the lock when you call this method.
// will *unlock*, send the packet, and then *relock* once it is done.
// this can prevent an unlikely deadlock where we are holding r.CVar.L and stuck on sender.SendCh
func (r *FdReader) sendPacket_unlock(pk packet.PacketType) {
r.CVar.L.Unlock()
defer r.CVar.L.Lock()
r.M.sendPacket(pk)
}
// returns (success)
func (r *FdReader) WriteWait(data []byte, isEof bool) bool {
r.CVar.L.Lock()
defer r.CVar.L.Unlock()
for {
bufAvail := ReadBufSize - r.BufSize
if r.Closed {
return false
}
if bufAvail == 0 {
r.CVar.Wait()
continue
}
writeLen := min(bufAvail, len(data))
pk := r.M.makeDataPacket(r.FdNum, data[0:writeLen], nil)
pk.Eof = isEof && (writeLen == len(data))
r.BufSize += writeLen
data = data[writeLen:]
r.sendPacket_unlock(pk)
if len(data) == 0 {
return true
}
// do *not* do a CVar.Wait() here -- because we *unlocked* to send the packet, we should
// recheck the condition before waiting to avoid deadlock.
}
}
func min(v1 int, v2 int) int {
if v1 <= v2 {
return v1
}
return v2
}
func (r *FdReader) isClosed() bool {
r.CVar.L.Lock()
defer r.CVar.L.Unlock()
return r.Closed
}
func (r *FdReader) ReadLoop(wg *sync.WaitGroup) {
defer r.Close()
if wg != nil {
defer wg.Done()
}
buf := make([]byte, 4096)
for {
nr, err := r.Fd.Read(buf)
if r.isClosed() {
return // should not send data or error if we already closed the fd
}
if nr > 0 || err == io.EOF {
isOpen := r.WriteWait(buf[0:nr], (err == io.EOF))
if !isOpen {
return
}
if err == io.EOF {
return
}
}
if err != nil {
if r.IsPty {
r.WriteWait(nil, true)
return
}
errPk := r.M.makeDataPacket(r.FdNum, nil, err)
r.M.sendPacket(errPk)
return
}
}
}