waveterm/pkg/packet/parser.go

104 lines
2.1 KiB
Go

// Copyright 2022 Dashborg Inc
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package packet
import (
"bufio"
"fmt"
"io"
"strconv"
"strings"
"sync"
)
type PacketParser struct {
Lock *sync.Mutex
MainCh chan PacketType
}
func CombinePacketParsers(p1 *PacketParser, p2 *PacketParser) *PacketParser {
rtnParser := &PacketParser{
Lock: &sync.Mutex{},
MainCh: make(chan PacketType),
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for v := range p1.MainCh {
rtnParser.MainCh <- v
}
}()
go func() {
defer wg.Done()
for v := range p2.MainCh {
rtnParser.MainCh <- v
}
}()
go func() {
wg.Wait()
close(rtnParser.MainCh)
}()
return rtnParser
}
func MakePacketParser(input io.Reader) *PacketParser {
parser := &PacketParser{
Lock: &sync.Mutex{},
MainCh: make(chan PacketType),
}
bufReader := bufio.NewReader(input)
go func() {
defer func() {
close(parser.MainCh)
}()
for {
line, err := bufReader.ReadString('\n')
if err == io.EOF {
return
}
if err != nil {
errPacket := MakeErrorPacket(fmt.Sprintf("reading packets from input: %v", err))
parser.MainCh <- errPacket
return
}
if line == "\n" {
continue
}
// ##[len][json]\n
// ##14{"hello":true}\n
bracePos := strings.Index(line, "{")
if !strings.HasPrefix(line, "##") || bracePos == -1 {
parser.MainCh <- MakeRawPacket(line[:len(line)-1])
continue
}
packetLen := -1
if line[2:bracePos] != "N" {
packetLen, err = strconv.Atoi(line[2:bracePos])
if err != nil || packetLen != len(line)-bracePos-1 {
parser.MainCh <- MakeRawPacket(line[:len(line)-1])
continue
}
}
pk, err := ParseJsonPacket([]byte(line[bracePos:]))
if err != nil {
errPk := MakeErrorPacket(fmt.Sprintf("parsing packet json from input: %v", err))
parser.MainCh <- errPk
return
}
if pk.GetType() == DonePacketStr {
return
}
if pk.GetType() == PingPacketStr {
continue
}
parser.MainCh <- pk
}
}()
return parser
}