Add flag for connection rate limit and slow zero-progress operations

This commit is contained in:
Geoff Bourne 2019-07-09 07:22:20 -05:00
parent 7c7f0e1b3c
commit 433dd2233b
3 changed files with 38 additions and 17 deletions

View File

@ -24,6 +24,7 @@ var (
inKubeCluster = flag.Bool("in-kube-cluster", false, "Use in-cluster kubernetes config") inKubeCluster = flag.Bool("in-kube-cluster", false, "Use in-cluster kubernetes config")
cpuProfile = flag.String("cpu-profile", "", "Enables CPU profiling and writes to given path") cpuProfile = flag.String("cpu-profile", "", "Enables CPU profiling and writes to given path")
debug = flag.Bool("debug", false, "Enable debug logs") debug = flag.Bool("debug", false, "Enable debug logs")
connRateLimit = flag.Int("connection-rate-limit", 1, "Max number of connections to allow per second")
) )
var ( var (
@ -71,7 +72,10 @@ func main() {
server.Routes.RegisterAll(parseMappings(*mappings)) server.Routes.RegisterAll(parseMappings(*mappings))
server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port))) if *connRateLimit < 1 {
*connRateLimit = 1
}
server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit)
if *apiBinding != "" { if *apiBinding != "" {
server.StartApiServer(*apiBinding) server.StartApiServer(*apiBinding)

View File

@ -7,6 +7,7 @@ import (
"io" "io"
"net" "net"
"strings" "strings"
"time"
) )
func ReadPacket(reader io.Reader, addr net.Addr) (*Packet, error) { func ReadPacket(reader io.Reader, addr net.Addr) (*Packet, error) {
@ -70,6 +71,15 @@ func ReadFrame(reader io.Reader, addr net.Addr) (*Frame, error) {
WithField("total", total). WithField("total", total).
WithField("length", frame.Length). WithField("length", frame.Length).
Debug("Reading frame content") Debug("Reading frame content")
if n == 0 {
logrus.
WithField("client", addr).
WithField("frame", frame).
Debug("No progress on frame reading")
time.Sleep(100 * time.Millisecond)
}
} }
logrus. logrus.

View File

@ -11,14 +11,13 @@ import (
) )
const ( const (
connectionsLimitPerSec = 1 handshakeTimeout = 2 * time.Second
handshakeTimeout = 30 * time.Second
) )
var noDeadline time.Time var noDeadline time.Time
type IConnector interface { type IConnector interface {
StartAcceptingConnections(ctx context.Context, listenAddress string) error StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error
} }
var Connector IConnector = &connectorImpl{} var Connector IConnector = &connectorImpl{}
@ -26,7 +25,7 @@ var Connector IConnector = &connectorImpl{}
type connectorImpl struct { type connectorImpl struct {
} }
func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAddress string) error { func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error {
ln, err := net.Listen("tcp", listenAddress) ln, err := net.Listen("tcp", listenAddress)
if err != nil { if err != nil {
@ -35,16 +34,16 @@ func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAdd
} }
logrus.WithField("listenAddress", listenAddress).Info("Listening for Minecraft client connections") logrus.WithField("listenAddress", listenAddress).Info("Listening for Minecraft client connections")
go c.acceptConnections(ctx, ln) go c.acceptConnections(ctx, ln, connRateLimit)
return nil return nil
} }
func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener) { func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener, connRateLimit int) {
//noinspection GoUnhandledErrorResult //noinspection GoUnhandledErrorResult
defer ln.Close() defer ln.Close()
limiter := time.Tick(time.Second / connectionsLimitPerSec) limiter := time.Tick(time.Second / time.Duration(connRateLimit))
for { for {
select { select {
@ -67,7 +66,9 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
defer frontendConn.Close() defer frontendConn.Close()
clientAddr := frontendConn.RemoteAddr() clientAddr := frontendConn.RemoteAddr()
logrus.WithFields(logrus.Fields{"clientAddr": clientAddr}).Info("Got connection") logrus.
WithField("client", clientAddr).
Info("Got connection")
inspectionBuffer := new(bytes.Buffer) inspectionBuffer := new(bytes.Buffer)
@ -76,7 +77,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
if err := frontendConn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { if err := frontendConn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
logrus. logrus.
WithError(err). WithError(err).
WithField("client", frontendConn). WithField("client", clientAddr).
Error("Failed to set read deadline") Error("Failed to set read deadline")
return return
} }
@ -128,14 +129,14 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
if err = frontendConn.SetReadDeadline(noDeadline); err != nil { if err = frontendConn.SetReadDeadline(noDeadline); err != nil {
logrus. logrus.
WithError(err). WithError(err).
WithField("client", frontendConn). WithField("client", clientAddr).
Error("Failed to clear read deadline") Error("Failed to clear read deadline")
return return
} }
pumpConnections(ctx, frontendConn, backendConn) pumpConnections(ctx, frontendConn, backendConn)
} else { } else {
logrus. logrus.
WithField("client", frontendConn). WithField("client", clientAddr).
WithField("packetID", packet.PacketID). WithField("packetID", packet.PacketID).
Error("Unexpected packetID, expected handshake") Error("Unexpected packetID, expected handshake")
return return
@ -145,17 +146,20 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
//noinspection GoUnhandledErrorResult //noinspection GoUnhandledErrorResult
defer backendConn.Close() defer backendConn.Close()
clientAddr := frontendConn.RemoteAddr()
errors := make(chan error, 2) errors := make(chan error, 2)
go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend") go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr)
go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend") go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr)
for { for {
select { select {
case err := <-errors: case err := <-errors:
if err != io.EOF { if err != io.EOF {
logrus.WithError(err).Error("Error observed on connection relay") logrus.WithError(err).
WithField("client", clientAddr).
Error("Error observed on connection relay")
} }
return return
@ -166,10 +170,13 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
} }
} }
func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string) { func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) {
amount, err := io.Copy(outgoing, incoming) amount, err := io.Copy(outgoing, incoming)
if err != nil { if err != nil {
errors <- err errors <- err
} }
logrus.WithField("amount", amount).Infof("Finished relay %s->%s", from, to) logrus.
WithField("client", clientAddr).
WithField("amount", amount).
Infof("Finished relay %s->%s", from, to)
} }