diff --git a/cmd/mc-router/main.go b/cmd/mc-router/main.go index 0f87f42..1c76201 100644 --- a/cmd/mc-router/main.go +++ b/cmd/mc-router/main.go @@ -24,6 +24,7 @@ var ( inKubeCluster = flag.Bool("in-kube-cluster", false, "Use in-cluster kubernetes config") cpuProfile = flag.String("cpu-profile", "", "Enables CPU profiling and writes to given path") debug = flag.Bool("debug", false, "Enable debug logs") + connRateLimit = flag.Int("connection-rate-limit", 1, "Max number of connections to allow per second") ) var ( @@ -71,7 +72,10 @@ func main() { server.Routes.RegisterAll(parseMappings(*mappings)) - server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port))) + if *connRateLimit < 1 { + *connRateLimit = 1 + } + server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit) if *apiBinding != "" { server.StartApiServer(*apiBinding) diff --git a/mcproto/read.go b/mcproto/read.go index d5831fc..ab3986e 100644 --- a/mcproto/read.go +++ b/mcproto/read.go @@ -7,6 +7,7 @@ import ( "io" "net" "strings" + "time" ) func ReadPacket(reader io.Reader, addr net.Addr) (*Packet, error) { @@ -70,6 +71,15 @@ func ReadFrame(reader io.Reader, addr net.Addr) (*Frame, error) { WithField("total", total). WithField("length", frame.Length). Debug("Reading frame content") + + if n == 0 { + logrus. + WithField("client", addr). + WithField("frame", frame). + Debug("No progress on frame reading") + + time.Sleep(100 * time.Millisecond) + } } logrus. diff --git a/server/connector.go b/server/connector.go index bc47230..6f3cf9a 100644 --- a/server/connector.go +++ b/server/connector.go @@ -11,14 +11,13 @@ import ( ) const ( - connectionsLimitPerSec = 1 - handshakeTimeout = 30 * time.Second + handshakeTimeout = 2 * time.Second ) var noDeadline time.Time type IConnector interface { - StartAcceptingConnections(ctx context.Context, listenAddress string) error + StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error } var Connector IConnector = &connectorImpl{} @@ -26,7 +25,7 @@ var Connector IConnector = &connectorImpl{} type connectorImpl struct { } -func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAddress string) error { +func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error { ln, err := net.Listen("tcp", listenAddress) if err != nil { @@ -35,16 +34,16 @@ func (c *connectorImpl) StartAcceptingConnections(ctx context.Context, listenAdd } logrus.WithField("listenAddress", listenAddress).Info("Listening for Minecraft client connections") - go c.acceptConnections(ctx, ln) + go c.acceptConnections(ctx, ln, connRateLimit) return nil } -func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener) { +func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener, connRateLimit int) { //noinspection GoUnhandledErrorResult defer ln.Close() - limiter := time.Tick(time.Second / connectionsLimitPerSec) + limiter := time.Tick(time.Second / time.Duration(connRateLimit)) for { select { @@ -67,7 +66,9 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C defer frontendConn.Close() clientAddr := frontendConn.RemoteAddr() - logrus.WithFields(logrus.Fields{"clientAddr": clientAddr}).Info("Got connection") + logrus. + WithField("client", clientAddr). + Info("Got connection") inspectionBuffer := new(bytes.Buffer) @@ -76,7 +77,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C if err := frontendConn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { logrus. WithError(err). - WithField("client", frontendConn). + WithField("client", clientAddr). Error("Failed to set read deadline") return } @@ -128,14 +129,14 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C if err = frontendConn.SetReadDeadline(noDeadline); err != nil { logrus. WithError(err). - WithField("client", frontendConn). + WithField("client", clientAddr). Error("Failed to clear read deadline") return } pumpConnections(ctx, frontendConn, backendConn) } else { logrus. - WithField("client", frontendConn). + WithField("client", clientAddr). WithField("packetID", packet.PacketID). Error("Unexpected packetID, expected handshake") return @@ -145,17 +146,20 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { //noinspection GoUnhandledErrorResult defer backendConn.Close() + clientAddr := frontendConn.RemoteAddr() errors := make(chan error, 2) - go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend") - go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend") + go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr) + go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr) for { select { case err := <-errors: if err != io.EOF { - logrus.WithError(err).Error("Error observed on connection relay") + logrus.WithError(err). + WithField("client", clientAddr). + Error("Error observed on connection relay") } return @@ -166,10 +170,13 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { } } -func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string) { +func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) { amount, err := io.Copy(outgoing, incoming) if err != nil { errors <- err } - logrus.WithField("amount", amount).Infof("Finished relay %s->%s", from, to) + logrus. + WithField("client", clientAddr). + WithField("amount", amount). + Infof("Finished relay %s->%s", from, to) }