Initial support for tracking connection metrics

This commit is contained in:
Geoff Bourne 2019-07-14 16:34:46 -05:00
parent a86eb65ca5
commit b290243d40
8 changed files with 101 additions and 16 deletions

View File

@ -25,6 +25,7 @@ var (
cpuProfile = flag.String("cpu-profile", "", "Enables CPU profiling and writes to given path")
debug = flag.Bool("debug", false, "Enable debug logs")
connRateLimit = flag.Int("connection-rate-limit", 1, "Max number of connections to allow per second")
metricsBackend = flag.String("metrics-backend", "discard", "Backend to use for metrics exposure/publishing: discard,expvar")
)
var (
@ -67,6 +68,8 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
metricsBuilder := NewMetricsBuilder()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
@ -75,7 +78,8 @@ func main() {
if *connRateLimit < 1 {
*connRateLimit = 1
}
server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit)
connector := server.NewConnector(metricsBuilder.BuildConnectorMetrics())
connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit)
if *apiBinding != "" {
server.StartApiServer(*apiBinding)

48
cmd/mc-router/metrics.go Normal file
View File

@ -0,0 +1,48 @@
package main
import (
discardMetrics "github.com/go-kit/kit/metrics/discard"
expvarMetrics "github.com/go-kit/kit/metrics/expvar"
"github.com/itzg/mc-router/server"
"github.com/sirupsen/logrus"
)
type MetricsBuilder interface {
BuildConnectorMetrics() *server.ConnectorMetrics
}
func NewMetricsBuilder() MetricsBuilder {
switch *metricsBackend {
case "discard":
return &discardMetricsBuilder{}
case "expvar":
return &expvarMetricsBuilder{}
default:
logrus.Fatalf("Unsupported metrics backend: %s", metricsBackend)
return nil
}
}
type expvarMetricsBuilder struct {
}
func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
return &server.ConnectorMetrics{
Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"),
BytesTransmitted: expvarMetrics.NewCounter("bytes"),
Connections: expvarMetrics.NewCounter("connections"),
ActiveConnections: expvarMetrics.NewGauge("active_connections"),
}
}
type discardMetricsBuilder struct {
}
func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
return &server.ConnectorMetrics{
Errors: discardMetrics.NewCounter(),
BytesTransmitted: discardMetrics.NewCounter(),
Connections: discardMetrics.NewCounter(),
ActiveConnections: discardMetrics.NewGauge(),
}
}

2
go.mod
View File

@ -3,6 +3,8 @@ module github.com/itzg/mc-router
go 1.12
require (
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/go-kit/kit v0.9.0
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.1 // indirect

4
go.sum
View File

@ -1,6 +1,10 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk=

View File

@ -1,15 +1,19 @@
package server
import (
"net/http"
"github.com/sirupsen/logrus"
"expvar"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"net/http"
)
var apiRoutes = mux.NewRouter()
func StartApiServer(apiBinding string) {
logrus.WithField("binding", apiBinding).Info("Serving API requests")
apiRoutes.Path("/vars").Handler(expvar.Handler())
go func() {
logrus.WithError(
http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed")

View File

@ -3,6 +3,7 @@ package server
import (
"bytes"
"context"
"github.com/go-kit/kit/metrics"
"github.com/itzg/mc-router/mcproto"
"github.com/juju/ratelimit"
"github.com/sirupsen/logrus"
@ -17,11 +18,12 @@ const (
var noDeadline time.Time
type IConnector interface {
type Connector interface {
StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error
}
type ConnectorMetrics struct {
Errors metrics.Counter
BytesTransmitted metrics.Counter
Connections metrics.Counter
ActiveConnections metrics.Gauge
@ -76,6 +78,7 @@ func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener,
}
func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.Conn) {
c.metrics.Connections.With("direction", "frontend").Add(1)
//noinspection GoUnhandledErrorResult
defer frontendConn.Close()
@ -94,11 +97,13 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
WithError(err).
WithField("client", clientAddr).
Error("Failed to set read deadline")
c.metrics.Errors.With("type", "read_deadline").Add(1)
return
}
packet, err := mcproto.ReadPacket(inspectionReader, clientAddr, c.state)
if err != nil {
logrus.WithError(err).WithField("clientAddr", clientAddr).Error("Failed to read packet")
c.metrics.Errors.With("type", "read").Add(1)
return
}
@ -113,6 +118,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
if err != nil {
logrus.WithError(err).WithField("clientAddr", clientAddr).
Error("Failed to read handshake")
c.metrics.Errors.With("type", "read").Add(1)
return
}
@ -131,6 +137,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
WithField("client", clientAddr).
WithField("packet", packet).
Warn("Unexpected data type for PacketIdLegacyServerListPing")
c.metrics.Errors.With("type", "unexpected_content").Add(1)
return
}
@ -147,6 +154,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
WithField("client", clientAddr).
WithField("packetID", packet.PacketID).
Error("Unexpected packetID, expected handshake")
c.metrics.Errors.With("type", "unexpected_content").Add(1)
return
}
}
@ -154,9 +162,10 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn net.Conn,
clientAddr net.Addr, preReadContent io.Reader, serverAddress string) {
backendHostPort := Routes.FindBackendForServerAddress(serverAddress)
backendHostPort, resolvedHost := Routes.FindBackendForServerAddress(serverAddress)
if backendHostPort == "" {
logrus.WithField("serverAddress", serverAddress).Warn("Unable to find registered backend")
c.metrics.Errors.With("type", "missing_backend").Add(1)
return
}
logrus.
@ -172,11 +181,18 @@ func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn
WithField("serverAddress", serverAddress).
WithField("backend", backendHostPort).
Warn("Unable to connect to backend")
c.metrics.Errors.With("type", "backend_failed").Add(1)
return
}
c.metrics.Connections.With("direction", "backend", "host", resolvedHost).Add(1)
c.metrics.ActiveConnections.Add(1)
defer c.metrics.ActiveConnections.Add(-1)
amount, err := io.Copy(backendConn, preReadContent)
if err != nil {
logrus.WithError(err).Error("Failed to write handshake to backend connection")
c.metrics.Errors.With("type", "backend_failed").Add(1)
return
}
logrus.WithField("amount", amount).Debug("Relayed handshake to backend")
@ -185,13 +201,14 @@ func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn
WithError(err).
WithField("client", clientAddr).
Error("Failed to clear read deadline")
c.metrics.Errors.With("type", "read_deadline").Add(1)
return
}
pumpConnections(ctx, frontendConn, backendConn)
c.pumpConnections(ctx, frontendConn, backendConn)
return
}
func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
func (c *connectorImpl) pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
//noinspection GoUnhandledErrorResult
defer backendConn.Close()
@ -200,8 +217,8 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
errors := make(chan error, 2)
go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr)
go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr)
go c.pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr)
go c.pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr)
select {
case err := <-errors:
@ -209,6 +226,7 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
logrus.WithError(err).
WithField("client", clientAddr).
Error("Error observed on connection relay")
c.metrics.Errors.With("type", "relay").Add(1)
}
case <-ctx.Done():
@ -216,13 +234,15 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) {
}
}
func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) {
func (c *connectorImpl) pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) {
amount, err := io.Copy(outgoing, incoming)
logrus.
WithField("client", clientAddr).
WithField("amount", amount).
Infof("Finished relay %s->%s", from, to)
c.metrics.BytesTransmitted.Add(float64(amount))
if err != nil {
errors <- err
} else {

View File

@ -87,7 +87,7 @@ type IRoutes interface {
RegisterAll(mappings map[string]string)
// FindBackendForServerAddress returns the host:port for the external server address, if registered.
// Otherwise, an empty string is returned
FindBackendForServerAddress(serverAddress string) string
FindBackendForServerAddress(serverAddress string) (string, string)
GetMappings() map[string]string
DeleteMapping(serverAddress string) bool
CreateMapping(serverAddress string, backend string)
@ -125,7 +125,7 @@ func (r *routesImpl) SetDefaultRoute(backend string) {
}).Info("Using default route")
}
func (r *routesImpl) FindBackendForServerAddress(serverAddress string) string {
func (r *routesImpl) FindBackendForServerAddress(serverAddress string) (string, string) {
r.RLock()
defer r.RUnlock()
@ -134,13 +134,13 @@ func (r *routesImpl) FindBackendForServerAddress(serverAddress string) string {
address := strings.ToLower(addressParts[0])
if r.mappings == nil {
return r.defaultRoute
return r.defaultRoute, address
} else {
if route, exists := r.mappings[address]; exists {
return route
return route, address
} else {
return r.defaultRoute
return r.defaultRoute, address
}
}
}

View File

@ -1,6 +1,7 @@
package server
import (
"github.com/stretchr/testify/assert"
"testing"
)
@ -45,8 +46,10 @@ func Test_routesImpl_FindBackendForServerAddress(t *testing.T) {
r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend)
if got := r.FindBackendForServerAddress(tt.args.serverAddress); got != tt.want {
if got, server := r.FindBackendForServerAddress(tt.args.serverAddress); got != tt.want {
t.Errorf("routesImpl.FindBackendForServerAddress() = %v, want %v", got, tt.want)
} else {
assert.Equal(t, tt.mapping.serverAddress, server)
}
})
}