Kubernetes auto scale-up support (#29)

This commit is contained in:
Michael Vorburger ⛑️ 2022-01-07 00:41:10 +01:00 committed by GitHub
parent d7c8b4628e
commit 32fb918128
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 233 additions and 78 deletions

View File

@ -12,6 +12,8 @@ Routes Minecraft client connections to backend servers based upon the requested
```text ```text
-api-binding host:port -api-binding host:port
The host:port bound for servicing API requests (env API_BINDING) The host:port bound for servicing API requests (env API_BINDING)
-auto-scale-up
Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed (env AUTO_SCALE_UP)
-connection-rate-limit int -connection-rate-limit int
Max number of connections to allow per second (env CONNECTION_RATE_LIMIT) (default 1) Max number of connections to allow per second (env CONNECTION_RATE_LIMIT) (default 1)
-cpu-profile string -cpu-profile string
@ -19,9 +21,9 @@ Routes Minecraft client connections to backend servers based upon the requested
-debug -debug
Enable debug logs (env DEBUG) Enable debug logs (env DEBUG)
-in-kube-cluster -in-kube-cluster
Use in-cluster kubernetes config (env IN_KUBE_CLUSTER) Use in-cluster Kubernetes config (env IN_KUBE_CLUSTER)
-kube-config string -kube-config string
The path to a kubernetes configuration file (env KUBE_CONFIG) The path to a Kubernetes configuration file (env KUBE_CONFIG)
-mapping string -mapping string
Comma-separated mappings of externalHostname=host:port (env MAPPING) Comma-separated mappings of externalHostname=host:port (env MAPPING)
-metrics-backend string -metrics-backend string
@ -101,9 +103,9 @@ To test out this example, I added these two entries to my "hosts" file:
# Kubernetes Usage # Kubernetes Usage
## Using kubernetes service auto-discovery ## Using Kubernetes Service auto-discovery
When running `mc-router` as a kubernetes pod and you pass the `--in-kube-cluster` command-line argument, then When running `mc-router` as a Kubernetes Pod and you pass the `--in-kube-cluster` command-line argument, then
it will automatically watch for any services annotated with it will automatically watch for any services annotated with
- `mc-router.itzg.me/externalServerName` : The value of the annotation will be registered as the external hostname Minecraft clients would used to connect to the - `mc-router.itzg.me/externalServerName` : The value of the annotation will be registered as the external hostname Minecraft clients would used to connect to the
routed service. The service's clusterIP and target port are used as the routed backend. You can use more hostnames by splitting them with comma. routed service. The service's clusterIP and target port are used as the routed backend. You can use more hostnames by splitting them with comma.
@ -140,7 +142,7 @@ metadata:
"mc-router.itzg.me/externalServerName": "external.host.name,other.host.name" "mc-router.itzg.me/externalServerName": "external.host.name,other.host.name"
``` ```
## Example kubernetes deployment ## Example Kubernetes deployment
[This example deployment](docs/k8s-example-auto.yaml) [This example deployment](docs/k8s-example-auto.yaml)
* Declares an `mc-router` service that exposes a node port 25565 * Declares an `mc-router` service that exposes a node port 25565
@ -160,6 +162,29 @@ kubectl apply -f https://raw.githubusercontent.com/itzg/mc-router/master/docs/k8
* I extended the allowed node port range by adding `--service-node-port-range=25000-32767` * I extended the allowed node port range by adding `--service-node-port-range=25000-32767`
to `/etc/kubernetes/manifests/kube-apiserver.yaml` to `/etc/kubernetes/manifests/kube-apiserver.yaml`
#### Auto Scale Up
The `-auto-scale-up` flag argument makes the router "wake up" any stopped backend servers, by changing `replicas: 0` to `replicas: 1`.
This requires using `kind: StatefulSet` instead of `kind: Service` for the Minecraft backend servers.
It also requires the `ClusterRole` to permit `get` + `update` for `statefulsets` & `statefulsets/scale`,
e.g. like this (or some equivalent more fine-grained one to only watch/list services+statefulsets, and only get+update scale):
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: services-watcher
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["watch","list"]
- apiGroups: ["apps"]
resources: ["statefulsets", "statefulsets/scale"]
verbs: ["watch","list","get","update"]
```
# Development # Development
## Building locally with Docker ## Building locally with Docker
@ -168,16 +193,26 @@ kubectl apply -f https://raw.githubusercontent.com/itzg/mc-router/master/docs/k8
docker build -t mc-router . docker build -t mc-router .
``` ```
## Build locally without Docker
After [installing Go](https://go.dev/doc/install) and doing a `go mod download` to install all required prerequisites, just like the [Dockerfile](Dockerfile) does, you can:
```bash
make test # go test -v ./...
go build ./cmd/mc-router/
```
## Skaffold ## Skaffold
For "in-cluster development" it's convenient to use https://skaffold.dev. Any changes to Go source code For "in-cluster development" it's convenient to use https://skaffold.dev. Any changes to Go source code
will trigger a go build, new container image pushed to registry with a new tag, and refresh in Kubernetes will trigger a go build, new container image pushed to registry with a new tag, and refresh in Kubernetes
with the image tag used in the deployment transparently updated to the new tag and thus new pod created pulling new images: with the image tag used in the deployment transparently updated to the new tag and thus new pod created pulling new images,
as configured by [skaffold.yaml](skaffold.yaml):
skaffold dev skaffold dev
When using Google Cloud (GCP), first create a _Docker Artifact Registry_, When using Google Cloud (GCP), first create a _Docker Artifact Registry_,
then add the _Artifact Registry Reader_ Role to the _Compute Engine default service account_ of your GKE clusterService Account_ (to avoid error like "container mc-router is waiting to start: ...-docker.pkg.dev/... can't be pulled"), then add the _Artifact Registry Reader_ Role to the _Compute Engine default service account_ of your _GKE `clusterService` Account_ (to avoid error like "container mc-router is waiting to start: ...-docker.pkg.dev/... can't be pulled"),
then use e.g. `gcloud auth configure-docker europe-docker.pkg.dev` or equivalent one time (to create a `~/.docker/config.json`), then use e.g. `gcloud auth configure-docker europe-docker.pkg.dev` or equivalent one time (to create a `~/.docker/config.json`),
and then use e.g. `--default-repo=europe-docker.pkg.dev/YOUR-PROJECT/YOUR-ARTIFACT-REGISTRY` option for `skaffold dev`. and then use e.g. `--default-repo=europe-docker.pkg.dev/YOUR-PROJECT/YOUR-ARTIFACT-REGISTRY` option for `skaffold dev`.

View File

@ -37,8 +37,9 @@ type Config struct {
CpuProfile string `usage:"Enables CPU profiling and writes to given path"` CpuProfile string `usage:"Enables CPU profiling and writes to given path"`
Debug bool `usage:"Enable debug logs"` Debug bool `usage:"Enable debug logs"`
ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"` ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"`
InKubeCluster bool `usage:"Use in-cluster kubernetes config"` InKubeCluster bool `usage:"Use in-cluster Kubernetes config"`
KubeConfig string `usage:"The path to a kubernetes configuration file"` KubeConfig string `usage:"The path to a Kubernetes configuration file"`
AutoScaleUp bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"`
MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb"` MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb"`
UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"` UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"`
MetricsBackendConfig MetricsBackendConfig MetricsBackendConfig MetricsBackendConfig
@ -113,14 +114,14 @@ func main() {
} }
if config.InKubeCluster { if config.InKubeCluster {
err = server.K8sWatcher.StartInCluster() err = server.K8sWatcher.StartInCluster(config.AutoScaleUp)
if err != nil { if err != nil {
logrus.WithError(err).Fatal("Unable to start k8s integration") logrus.WithError(err).Fatal("Unable to start k8s integration")
} else { } else {
defer server.K8sWatcher.Stop() defer server.K8sWatcher.Stop()
} }
} else if config.KubeConfig != "" { } else if config.KubeConfig != "" {
err := server.K8sWatcher.StartWithConfig(config.KubeConfig) err := server.K8sWatcher.StartWithConfig(config.KubeConfig, config.AutoScaleUp)
if err != nil { if err != nil {
logrus.WithError(err).Fatal("Unable to start k8s integration") logrus.WithError(err).Fatal("Unable to start k8s integration")
} else { } else {

View File

@ -20,6 +20,7 @@ spec:
containers: containers:
- image: itzg/mc-router:latest - image: itzg/mc-router:latest
name: mc-router name: mc-router
# Add "--auto-scale-up" here for https://github.com/itzg/mc-router/#auto-scale-up
args: ["--api-binding", ":8080", "--in-kube-cluster"] args: ["--api-binding", ":8080", "--in-kube-cluster"]
ports: ports:
- name: proxy - name: proxy

View File

@ -167,7 +167,15 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C
func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn net.Conn, func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn net.Conn,
clientAddr net.Addr, preReadContent io.Reader, serverAddress string) { clientAddr net.Addr, preReadContent io.Reader, serverAddress string) {
backendHostPort, resolvedHost := Routes.FindBackendForServerAddress(serverAddress) backendHostPort, resolvedHost, waker := Routes.FindBackendForServerAddress(ctx, serverAddress)
if waker != nil {
if err := waker(ctx); err != nil {
logrus.WithFields(logrus.Fields{"serverAddress": serverAddress}).WithError(err).Error("failed to wake up backend")
c.metrics.Errors.With("type", "wakeup_failed").Add(1)
return
}
}
if backendHostPort == "" { if backendHostPort == "" {
logrus.WithField("serverAddress", serverAddress).Warn("Unable to find registered backend") logrus.WithField("serverAddress", serverAddress).Warn("Unable to find registered backend")
c.metrics.Errors.With("type", "missing_backend").Add(1) c.metrics.Errors.With("type", "missing_backend").Add(1)

View File

@ -1,13 +1,19 @@
package server package server
import ( import (
"context"
"fmt"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1" apps "k8s.io/api/apps/v1"
autoscaling "k8s.io/api/autoscaling/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
@ -21,51 +27,57 @@ const (
) )
type IK8sWatcher interface { type IK8sWatcher interface {
StartWithConfig(kubeConfigFile string) error StartWithConfig(kubeConfigFile string, autoScaleUp bool) error
StartInCluster() error StartInCluster(autoScaleUp bool) error
Stop() Stop()
} }
var K8sWatcher IK8sWatcher = &k8sWatcherImpl{} var K8sWatcher IK8sWatcher = &k8sWatcherImpl{}
type k8sWatcherImpl struct { type k8sWatcherImpl struct {
sync.RWMutex
// The key in mappings is a Service, and the value the StatefulSet name
mappings map[string]string
clientset *kubernetes.Clientset
stop chan struct{} stop chan struct{}
} }
func (w *k8sWatcherImpl) StartInCluster() error { func (w *k8sWatcherImpl) StartInCluster(autoScaleUp bool) error {
config, err := rest.InClusterConfig() config, err := rest.InClusterConfig()
if err != nil { if err != nil {
return errors.Wrap(err, "Unable to load in-cluster config") return errors.Wrap(err, "Unable to load in-cluster config")
} }
return w.startWithLoadedConfig(config) return w.startWithLoadedConfig(config, autoScaleUp)
} }
func (w *k8sWatcherImpl) StartWithConfig(kubeConfigFile string) error { func (w *k8sWatcherImpl) StartWithConfig(kubeConfigFile string, autoScaleUp bool) error {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile) config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil { if err != nil {
return errors.Wrap(err, "Could not load kube config file") return errors.Wrap(err, "Could not load kube config file")
} }
return w.startWithLoadedConfig(config) return w.startWithLoadedConfig(config, autoScaleUp)
} }
func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config) error { func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config, autoScaleUp bool) error {
w.stop = make(chan struct{}, 1)
clientset, err := kubernetes.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {
return errors.Wrap(err, "Could not create kube clientset") return errors.Wrap(err, "Could not create kube clientset")
} }
w.clientset = clientset
watchlist := cache.NewListWatchFromClient( _, serviceController := cache.NewInformer(
cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(), clientset.CoreV1().RESTClient(),
string(v1.ResourceServices), string(core.ResourceServices),
v1.NamespaceAll, core.NamespaceAll,
fields.Everything(), fields.Everything(),
) ),
&core.Service{},
_, controller := cache.NewInformer(
watchlist,
&v1.Service{},
0, 0,
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: w.handleAdd, AddFunc: w.handleAdd,
@ -73,17 +85,64 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config) error {
UpdateFunc: w.handleUpdate, UpdateFunc: w.handleUpdate,
}, },
) )
go serviceController.Run(w.stop)
w.stop = make(chan struct{}, 1) w.mappings = make(map[string]string)
logrus.Info("Monitoring kubernetes for minecraft services") if autoScaleUp {
go controller.Run(w.stop) _, statefulSetController := cache.NewInformer(
cache.NewListWatchFromClient(
clientset.AppsV1().RESTClient(),
"statefulSets",
core.NamespaceAll,
fields.Everything(),
),
&apps.StatefulSet{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
w.mappings[statefulSet.Spec.ServiceName] = statefulSet.Name
},
DeleteFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, statefulSet.Spec.ServiceName)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldStatefulSet, ok := oldObj.(*apps.StatefulSet)
if !ok {
return
}
newStatefulSet, ok := newObj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, oldStatefulSet.Spec.ServiceName)
w.mappings[newStatefulSet.Spec.ServiceName] = newStatefulSet.Name
},
},
)
go statefulSetController.Run(w.stop)
}
logrus.Info("Monitoring Kubernetes for Minecraft services")
return nil return nil
} }
// oldObj and newObj are expected to be *v1.Service // oldObj and newObj are expected to be *v1.Service
func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) { func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
for _, oldRoutableService := range extractRoutableServices(oldObj) { for _, oldRoutableService := range w.extractRoutableServices(oldObj) {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"old": oldRoutableService, "old": oldRoutableService,
}).Debug("UPDATE") }).Debug("UPDATE")
@ -92,12 +151,12 @@ func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
} }
} }
for _, newRoutableService := range extractRoutableServices(newObj) { for _, newRoutableService := range w.extractRoutableServices(newObj) {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"new": newRoutableService, "new": newRoutableService,
}).Debug("UPDATE") }).Debug("UPDATE")
if newRoutableService.externalServiceName != "" { if newRoutableService.externalServiceName != "" {
Routes.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint) Routes.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp)
} else { } else {
Routes.SetDefaultRoute(newRoutableService.containerEndpoint) Routes.SetDefaultRoute(newRoutableService.containerEndpoint)
} }
@ -106,7 +165,7 @@ func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
// obj is expected to be a *v1.Service // obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleDelete(obj interface{}) { func (w *k8sWatcherImpl) handleDelete(obj interface{}) {
routableServices := extractRoutableServices(obj) routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices { for _, routableService := range routableServices {
if routableService != nil { if routableService != nil {
logrus.WithField("routableService", routableService).Debug("DELETE") logrus.WithField("routableService", routableService).Debug("DELETE")
@ -122,13 +181,13 @@ func (w *k8sWatcherImpl) handleDelete(obj interface{}) {
// obj is expected to be a *v1.Service // obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleAdd(obj interface{}) { func (w *k8sWatcherImpl) handleAdd(obj interface{}) {
routableServices := extractRoutableServices(obj) routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices { for _, routableService := range routableServices {
if routableService != nil { if routableService != nil {
logrus.WithField("routableService", routableService).Debug("ADD") logrus.WithField("routableService", routableService).Debug("ADD")
if routableService.externalServiceName != "" { if routableService.externalServiceName != "" {
Routes.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint) Routes.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp)
} else { } else {
Routes.SetDefaultRoute(routableService.containerEndpoint) Routes.SetDefaultRoute(routableService.containerEndpoint)
} }
@ -138,18 +197,19 @@ func (w *k8sWatcherImpl) handleAdd(obj interface{}) {
func (w *k8sWatcherImpl) Stop() { func (w *k8sWatcherImpl) Stop() {
if w.stop != nil { if w.stop != nil {
w.stop <- struct{}{} close(w.stop)
} }
} }
type routableService struct { type routableService struct {
externalServiceName string externalServiceName string
containerEndpoint string containerEndpoint string
autoScaleUp func(ctx context.Context) error
} }
// obj is expected to be a *v1.Service // obj is expected to be a *v1.Service
func extractRoutableServices(obj interface{}) []*routableService { func (w *k8sWatcherImpl) extractRoutableServices(obj interface{}) []*routableService {
service, ok := obj.(*v1.Service) service, ok := obj.(*core.Service)
if !ok { if !ok {
return nil return nil
} }
@ -158,17 +218,17 @@ func extractRoutableServices(obj interface{}) []*routableService {
if externalServiceName, exists := service.Annotations[AnnotationExternalServerName]; exists { if externalServiceName, exists := service.Annotations[AnnotationExternalServerName]; exists {
serviceNames := strings.Split(externalServiceName, ",") serviceNames := strings.Split(externalServiceName, ",")
for _, serviceName := range serviceNames { for _, serviceName := range serviceNames {
routableServices = append(routableServices, buildDetails(service, serviceName)) routableServices = append(routableServices, w.buildDetails(service, serviceName))
} }
return routableServices return routableServices
} else if _, exists := service.Annotations[AnnotationDefaultServer]; exists { } else if _, exists := service.Annotations[AnnotationDefaultServer]; exists {
return []*routableService{buildDetails(service, "")} return []*routableService{w.buildDetails(service, "")}
} }
return nil return nil
} }
func buildDetails(service *v1.Service, externalServiceName string) *routableService { func (w *k8sWatcherImpl) buildDetails(service *core.Service, externalServiceName string) *routableService {
clusterIp := service.Spec.ClusterIP clusterIp := service.Spec.ClusterIP
port := "25565" port := "25565"
for _, p := range service.Spec.Ports { for _, p := range service.Spec.Ports {
@ -179,6 +239,45 @@ func buildDetails(service *v1.Service, externalServiceName string) *routableServ
rs := &routableService{ rs := &routableService{
externalServiceName: externalServiceName, externalServiceName: externalServiceName,
containerEndpoint: net.JoinHostPort(clusterIp, port), containerEndpoint: net.JoinHostPort(clusterIp, port),
autoScaleUp: w.buildScaleUpFunction(service),
} }
return rs return rs
} }
func (w *k8sWatcherImpl) buildScaleUpFunction(service *core.Service) func(ctx context.Context) error {
return func(ctx context.Context) error {
serviceName := service.Name
if statefulSetName, exists := w.mappings[serviceName]; exists {
if scale, err := w.clientset.AppsV1().StatefulSets(service.Namespace).GetScale(ctx, statefulSetName, meta.GetOptions{}); err == nil {
replicas := scale.Status.Replicas
logrus.WithFields(logrus.Fields{
"service": serviceName,
"statefulSet": statefulSetName,
"replicas": replicas,
}).Debug("StatefulSet of Service Replicas")
if replicas == 0 {
if _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale(ctx, statefulSetName, &autoscaling.Scale{
ObjectMeta: meta.ObjectMeta{
Name: scale.Name,
Namespace: scale.Namespace,
UID: scale.UID,
ResourceVersion: scale.ResourceVersion,
},
Spec: autoscaling.ScaleSpec{Replicas: 1}}, meta.UpdateOptions{},
); err == nil {
logrus.WithFields(logrus.Fields{
"service": serviceName,
"statefulSet": statefulSetName,
"replicas": replicas,
}).Info("StatefulSet Replicas Autoscaled from 0 to 1 (wake up)")
} else {
return errors.Wrap(err, "UpdateScale for Replicas=1 failed for StatefulSet: "+statefulSetName)
}
}
} else {
return fmt.Errorf("GetScale failed for StatefulSet %s: %w", statefulSetName, err)
}
}
return nil
}
}

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"encoding/json" "encoding/json"
"testing" "testing"
@ -87,7 +88,7 @@ func TestK8sWatcherImpl_handleAddThenUpdate(t *testing.T) {
watcher.handleAdd(&initialSvc) watcher.handleAdd(&initialSvc)
for _, s := range test.initial.scenarios { for _, s := range test.initial.scenarios {
backend, _ := Routes.FindBackendForServerAddress(s.given) backend, _, _ := Routes.FindBackendForServerAddress(context.Background(), s.given)
assert.Equal(t, s.expect, backend, "initial: given=%s", s.given) assert.Equal(t, s.expect, backend, "initial: given=%s", s.given)
} }
@ -97,7 +98,7 @@ func TestK8sWatcherImpl_handleAddThenUpdate(t *testing.T) {
watcher.handleUpdate(&initialSvc, &updatedSvc) watcher.handleUpdate(&initialSvc, &updatedSvc)
for _, s := range test.update.scenarios { for _, s := range test.update.scenarios {
backend, _ := Routes.FindBackendForServerAddress(s.given) backend, _, _ := Routes.FindBackendForServerAddress(context.Background(), s.given)
assert.Equal(t, s.expect, backend, "update: given=%s", s.given) assert.Equal(t, s.expect, backend, "update: given=%s", s.given)
} }
}) })
@ -159,13 +160,13 @@ func TestK8sWatcherImpl_handleAddThenDelete(t *testing.T) {
watcher.handleAdd(&initialSvc) watcher.handleAdd(&initialSvc)
for _, s := range test.initial.scenarios { for _, s := range test.initial.scenarios {
backend, _ := Routes.FindBackendForServerAddress(s.given) backend, _, _ := Routes.FindBackendForServerAddress(context.Background(), s.given)
assert.Equal(t, s.expect, backend, "initial: given=%s", s.given) assert.Equal(t, s.expect, backend, "initial: given=%s", s.given)
} }
watcher.handleDelete(&initialSvc) watcher.handleDelete(&initialSvc)
for _, s := range test.delete { for _, s := range test.delete {
backend, _ := Routes.FindBackendForServerAddress(s.given) backend, _, _ := Routes.FindBackendForServerAddress(context.Background(), s.given)
assert.Equal(t, s.expect, backend, "update: given=%s", s.given) assert.Equal(t, s.expect, backend, "update: given=%s", s.given)
} }
}) })

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"encoding/json" "encoding/json"
"net/http" "net/http"
"strings" "strings"
@ -61,7 +62,7 @@ func routesCreateHandler(writer http.ResponseWriter, request *http.Request) {
return return
} }
Routes.CreateMapping(definition.ServerAddress, definition.Backend) Routes.CreateMapping(definition.ServerAddress, definition.Backend, func(ctx context.Context) error { return nil })
writer.WriteHeader(http.StatusCreated) writer.WriteHeader(http.StatusCreated)
} }
@ -87,12 +88,12 @@ func routesSetDefault(writer http.ResponseWriter, request *http.Request) {
type IRoutes interface { type IRoutes interface {
RegisterAll(mappings map[string]string) RegisterAll(mappings map[string]string)
// FindBackendForServerAddress returns the host:port for the external server address, if registered. // FindBackendForServerAddress returns the host:port for the external server address, if registered.
// Otherwise, an empty string is returned // Otherwise, an empty string is returned. Also returns the normalized version of the given serverAddress.
// Also returns the normalized version of the given serverAddress // The 3rd value returned is an (optional) "waker" function which a caller must invoke to wake up serverAddress.
FindBackendForServerAddress(serverAddress string) (string, string) FindBackendForServerAddress(ctx context.Context, serverAddress string) (string, string, func(ctx context.Context) error)
GetMappings() map[string]string GetMappings() map[string]string
DeleteMapping(serverAddress string) bool DeleteMapping(serverAddress string) bool
CreateMapping(serverAddress string, backend string) CreateMapping(serverAddress string, backend string, waker func(ctx context.Context) error)
SetDefaultRoute(backend string) SetDefaultRoute(backend string)
} }
@ -100,7 +101,7 @@ var Routes IRoutes = &routesImpl{}
func NewRoutes() IRoutes { func NewRoutes() IRoutes {
r := &routesImpl{ r := &routesImpl{
mappings: make(map[string]string), mappings: make(map[string]mapping),
} }
return r return r
@ -110,12 +111,20 @@ func (r *routesImpl) RegisterAll(mappings map[string]string) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.mappings = mappings r.mappings = make(map[string]mapping)
for k, v := range mappings {
r.mappings[k] = mapping{backend: v, waker: func(ctx context.Context) error { return nil }}
}
}
type mapping struct {
backend string
waker func(ctx context.Context) error
} }
type routesImpl struct { type routesImpl struct {
sync.RWMutex sync.RWMutex
mappings map[string]string mappings map[string]mapping
defaultRoute string defaultRoute string
} }
@ -127,7 +136,7 @@ func (r *routesImpl) SetDefaultRoute(backend string) {
}).Info("Using default route") }).Info("Using default route")
} }
func (r *routesImpl) FindBackendForServerAddress(serverAddress string) (string, string) { func (r *routesImpl) FindBackendForServerAddress(ctx context.Context, serverAddress string) (string, string, func(ctx context.Context) error) {
r.RLock() r.RLock()
defer r.RUnlock() defer r.RUnlock()
@ -136,11 +145,11 @@ func (r *routesImpl) FindBackendForServerAddress(serverAddress string) (string,
address := strings.ToLower(addressParts[0]) address := strings.ToLower(addressParts[0])
if r.mappings != nil { if r.mappings != nil {
if route, exists := r.mappings[address]; exists { if mapping, exists := r.mappings[address]; exists {
return route, address return mapping.backend, address, mapping.waker
} }
} }
return r.defaultRoute, address return r.defaultRoute, address, nil
} }
func (r *routesImpl) GetMappings() map[string]string { func (r *routesImpl) GetMappings() map[string]string {
@ -149,7 +158,7 @@ func (r *routesImpl) GetMappings() map[string]string {
result := make(map[string]string, len(r.mappings)) result := make(map[string]string, len(r.mappings))
for k, v := range r.mappings { for k, v := range r.mappings {
result[k] = v result[k] = v.backend
} }
return result return result
} }
@ -167,7 +176,7 @@ func (r *routesImpl) DeleteMapping(serverAddress string) bool {
} }
} }
func (r *routesImpl) CreateMapping(serverAddress string, backend string) { func (r *routesImpl) CreateMapping(serverAddress string, backend string, waker func(ctx context.Context) error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -177,5 +186,5 @@ func (r *routesImpl) CreateMapping(serverAddress string, backend string) {
"serverAddress": serverAddress, "serverAddress": serverAddress,
"backend": backend, "backend": backend,
}).Info("Creating route") }).Info("Creating route")
r.mappings[serverAddress] = backend r.mappings[serverAddress] = mapping{backend: backend, waker: waker}
} }

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -45,9 +46,9 @@ func Test_routesImpl_FindBackendForServerAddress(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
r := NewRoutes() r := NewRoutes()
r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend) r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend, func(ctx context.Context) error { return nil })
if got, server := r.FindBackendForServerAddress(tt.args.serverAddress); got != tt.want { if got, server, _ := r.FindBackendForServerAddress(context.Background(), tt.args.serverAddress); got != tt.want {
t.Errorf("routesImpl.FindBackendForServerAddress() = %v, want %v", got, tt.want) t.Errorf("routesImpl.FindBackendForServerAddress() = %v, want %v", got, tt.want)
} else { } else {
assert.Equal(t, tt.mapping.serverAddress, server) assert.Equal(t, tt.mapping.serverAddress, server)