sendpacket with context, initialize rpcmap

This commit is contained in:
sawka 2022-07-06 14:35:27 -07:00
parent 56e1ddf8e6
commit 51df0479ff
2 changed files with 21 additions and 1 deletions

View File

@ -8,6 +8,7 @@ package packet
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
@ -693,6 +694,19 @@ func (sender *PacketSender) checkStatus() error {
return nil
}
func (sender *PacketSender) SendPacketCtx(ctx context.Context, pk PacketType) error {
err := sender.checkStatus()
if err != nil {
return err
}
select {
case sender.SendCh <- pk:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (sender *PacketSender) SendPacket(pk PacketType) error {
err := sender.checkStatus()
if err != nil {

View File

@ -31,6 +31,7 @@ func CombinePacketParsers(p1 *PacketParser, p2 *PacketParser) *PacketParser {
rtnParser := &PacketParser{
Lock: &sync.Mutex{},
MainCh: make(chan PacketType),
RpcMap: make(map[string]*RpcEntry),
}
var wg sync.WaitGroup
wg.Add(2)
@ -78,7 +79,11 @@ func (p *PacketParser) UnRegisterRpc(reqId string) {
}
}
func (p *PacketParser) RegisterRpc(reqId string, queueSize int) chan RpcResponsePacketType {
func (p *PacketParser) RegisterRpc(reqId string) chan RpcResponsePacketType {
return p.RegisterRpcSz(reqId, 2)
}
func (p *PacketParser) RegisterRpcSz(reqId string, queueSize int) chan RpcResponsePacketType {
p.Lock.Lock()
defer p.Lock.Unlock()
ch := make(chan RpcResponsePacketType, queueSize)
@ -135,6 +140,7 @@ func MakePacketParser(input io.Reader) *PacketParser {
parser := &PacketParser{
Lock: &sync.Mutex{},
MainCh: make(chan PacketType),
RpcMap: make(map[string]*RpcEntry),
}
bufReader := bufio.NewReader(input)
go func() {