two level routing (based on how network switches work) (#241)

This commit is contained in:
Mike Sawka 2024-08-16 16:49:49 -07:00 committed by GitHub
parent 03587184a0
commit ae7d85630b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 132 additions and 62 deletions

View File

@ -150,8 +150,7 @@ func shutdownActivityUpdate() {
func createMainWshClient() {
rpc := wshserver.GetMainRpcClient()
wshutil.DefaultRouter.RegisterRoute("wavesrv", rpc)
wshutil.DefaultRouter.SetDefaultRoute("wavesrv")
wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc)
wps.Broker.SetClient(wshutil.DefaultRouter)
}

View File

@ -7,6 +7,11 @@ import * as WOS from "./wos";
// WshServerCommandToDeclMap
class WshServerType {
// command "announce" [call]
AnnounceCommand(data: string, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("announce", data, opts);
}
// command "authenticate" [call]
AuthenticateCommand(data: string, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("authenticate", data, opts);

View File

@ -11,6 +11,12 @@ import (
"github.com/wavetermdev/thenextwave/pkg/waveobj"
)
// command "announce", wshserver.AnnounceCommand
func AnnounceCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "announce", data, opts)
return err
}
// command "authenticate", wshserver.AuthenticateCommand
func AuthenticateCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "authenticate", data, opts)

View File

@ -25,6 +25,7 @@ const (
const (
Command_Authenticate = "authenticate"
Command_Announce = "announce" // special (for routing)
Command_Message = "message"
Command_GetMeta = "getmeta"
Command_SetMeta = "setmeta"
@ -60,6 +61,7 @@ type RespOrErrorUnion[T any] struct {
type WshRpcInterface interface {
AuthenticateCommand(ctx context.Context, data string) error
AnnounceCommand(ctx context.Context, data string) error // (special) announces a new route to the main router
MessageCommand(ctx context.Context, data CommandMessageData) error
GetMetaCommand(ctx context.Context, data CommandGetMetaData) (wstore.MetaMapType, error)

View File

@ -7,34 +7,46 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"log"
"sync"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
)
const DefaultRoute = "wavesrv"
const SysRoute = "sys" // this route doesn't exist, just a placeholder for system messages
// this works like a network switch
type routeInfo struct {
RpcId string
SourceRouteId string
DestRouteId string
}
type msgAndRoute struct {
msgBytes []byte
fromRouteId string
}
type WshRouter struct {
Lock *sync.Mutex
DefaultRoute string
RouteMap map[string]AbstractRpcClient
RpcMap map[string]*routeInfo
InputCh chan []byte
Lock *sync.Mutex
RouteMap map[string]AbstractRpcClient // routeid => client
UpstreamClient AbstractRpcClient // upstream client (if we are not the terminal router)
AnnouncedRoutes map[string]string // routeid => local routeid
RpcMap map[string]*routeInfo // rpcid => routeinfo
InputCh chan msgAndRoute
}
var DefaultRouter = NewWshRouter()
func NewWshRouter() *WshRouter {
rtn := &WshRouter{
Lock: &sync.Mutex{},
RouteMap: make(map[string]AbstractRpcClient),
RpcMap: make(map[string]*routeInfo),
InputCh: make(chan []byte, DefaultInputChSize),
Lock: &sync.Mutex{},
RouteMap: make(map[string]AbstractRpcClient),
AnnouncedRoutes: make(map[string]string),
RpcMap: make(map[string]*routeInfo),
InputCh: make(chan msgAndRoute, DefaultInputChSize),
}
go rtn.runServer()
return rtn
@ -47,15 +59,14 @@ func noRouteErr(routeId string) error {
return fmt.Errorf("no route for %q", routeId)
}
func (router *WshRouter) SendEvent(fullRoute string, event wshrpc.WaveEvent) {
nextRoute, routeId := popRoute(fullRoute)
func (router *WshRouter) SendEvent(routeId string, event wshrpc.WaveEvent) {
rpc := router.GetRpc(routeId)
if rpc == nil {
return
}
msg := RpcMessage{
Command: wshrpc.Command_Event,
Route: nextRoute,
Route: routeId,
Data: event,
}
msgBytes, err := json.Marshal(msg)
@ -76,7 +87,7 @@ func (router *WshRouter) handleNoRoute(msg RpcMessage) {
// no response needed, but send message back to source
respMsg := RpcMessage{Command: wshrpc.Command_Message, Route: msg.Source, Data: wshrpc.CommandMessageData{Message: nrErr.Error()}}
respBytes, _ := json.Marshal(respMsg)
router.InputCh <- respBytes
router.InputCh <- msgAndRoute{msgBytes: respBytes, fromRouteId: SysRoute}
return
}
// send error response
@ -86,10 +97,13 @@ func (router *WshRouter) handleNoRoute(msg RpcMessage) {
Error: nrErr.Error(),
}
respBytes, _ := json.Marshal(response)
router.InputCh <- respBytes
router.InputCh <- msgAndRoute{msgBytes: respBytes, fromRouteId: SysRoute}
}
func (router *WshRouter) registerRouteInfo(rpcId string, sourceRouteId string, destRouteId string) {
if rpcId == "" {
return
}
router.Lock.Lock()
defer router.Lock.Unlock()
router.RpcMap[rpcId] = &routeInfo{RpcId: rpcId, SourceRouteId: sourceRouteId, DestRouteId: destRouteId}
@ -107,27 +121,74 @@ func (router *WshRouter) getRouteInfo(rpcId string) *routeInfo {
return router.RpcMap[rpcId]
}
func (router *WshRouter) handleAnnounceMessage(msg RpcMessage, input msgAndRoute) {
// if we have an upstream, send it there
// if we don't (we are the terminal router), then add it to our announced route map
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(input.msgBytes)
return
}
if msg.Source == input.fromRouteId {
// not necessary to save the id mapping
return
}
router.Lock.Lock()
defer router.Lock.Unlock()
router.AnnouncedRoutes[msg.Source] = input.fromRouteId
}
func (router *WshRouter) getAnnouncedRoute(routeId string) string {
router.Lock.Lock()
defer router.Lock.Unlock()
return router.AnnouncedRoutes[routeId]
}
// returns true if message was sent, false if failed
func (router *WshRouter) sendRoutedMessage(msgBytes []byte, routeId string) bool {
rpc := router.GetRpc(routeId)
if rpc != nil {
rpc.SendRpcMessage(msgBytes)
return true
}
upstream := router.GetUpstreamClient()
if upstream != nil {
upstream.SendRpcMessage(msgBytes)
return true
} else {
// we are the upstream, so consult our announced routes map
localRouteId := router.getAnnouncedRoute(routeId)
rpc := router.GetRpc(localRouteId)
if rpc == nil {
return false
}
rpc.SendRpcMessage(msgBytes)
return true
}
}
func (router *WshRouter) runServer() {
for msgBytes := range router.InputCh {
for input := range router.InputCh {
msgBytes := input.msgBytes
var msg RpcMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
fmt.Println("error unmarshalling message: ", err)
continue
}
var routeId string
msg.Route, routeId = popRoute(msg.Route)
routeId := msg.Route
if msg.Command == wshrpc.Command_Announce {
router.handleAnnounceMessage(msg, input)
continue
}
if msg.Command != "" {
// new comand, setup new rpc
rpc := router.GetRpc(routeId)
if rpc == nil {
ok := router.sendRoutedMessage(msgBytes, routeId)
if !ok {
router.handleNoRoute(msg)
continue
}
if msg.ReqId != "" {
router.registerRouteInfo(msg.ReqId, msg.Source, routeId)
}
rpc.SendRpcMessage(msgBytes)
router.registerRouteInfo(msg.ReqId, msg.Source, routeId)
continue
}
// look at reqid or resid to route correctly
@ -137,10 +198,8 @@ func (router *WshRouter) runServer() {
// no route info, nothing to do
continue
}
rpc := router.GetRpc(routeInfo.DestRouteId)
if rpc != nil {
rpc.SendRpcMessage(msgBytes)
}
// no need to check the return value here (noop if failed)
router.sendRoutedMessage(msgBytes, routeInfo.DestRouteId)
continue
} else if msg.ResId != "" {
routeInfo := router.getRouteInfo(msg.ResId)
@ -148,10 +207,7 @@ func (router *WshRouter) runServer() {
// no route info, nothing to do
continue
}
rpc := router.GetRpc(routeInfo.SourceRouteId)
if rpc != nil {
rpc.SendRpcMessage(msgBytes)
}
router.sendRoutedMessage(msgBytes, routeInfo.SourceRouteId)
if !msg.Cont {
router.unregisterRouteInfo(msg.ResId)
}
@ -163,28 +219,23 @@ func (router *WshRouter) runServer() {
}
}
func addRoute(curRoute string, newRoute string) string {
if curRoute == "" {
return newRoute
}
return curRoute + "," + newRoute
}
// returns (newRoute, poppedRoute)
func popRoute(curRoute string) (string, string) {
routes := strings.Split(curRoute, ",")
if len(routes) == 1 {
return "", curRoute
}
return strings.Join(routes[:len(routes)-1], ","), routes[len(routes)-1]
}
// this will also consume the output channel of the abstract client
func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
if routeId == SysRoute {
// cannot register sys route
log.Printf("error: WshRouter cannot register sys route\n")
return
}
router.Lock.Lock()
defer router.Lock.Unlock()
router.RouteMap[routeId] = rpc
go func() {
// announce
if router.GetUpstreamClient() != nil {
announceMsg := RpcMessage{Command: wshrpc.Command_Announce, Source: routeId}
announceBytes, _ := json.Marshal(announceMsg)
router.GetUpstreamClient().SendRpcMessage(announceBytes)
}
for {
msgBytes, ok := rpc.RecvRpcMessage()
if !ok {
@ -196,14 +247,18 @@ func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
continue
}
if rpcMsg.Command != "" {
// new command, add source (for backward routing)
rpcMsg.Source = addRoute(rpcMsg.Source, routeId)
if rpcMsg.Source == "" {
rpcMsg.Source = routeId
}
if rpcMsg.Route == "" {
rpcMsg.Route = DefaultRoute
}
msgBytes, err = json.Marshal(rpcMsg)
if err != nil {
continue
}
}
router.InputCh <- msgBytes
router.InputCh <- msgAndRoute{msgBytes: msgBytes, fromRouteId: routeId}
}
}()
}
@ -214,18 +269,21 @@ func (router *WshRouter) UnregisterRoute(routeId string) {
delete(router.RouteMap, routeId)
}
func (router *WshRouter) SetDefaultRoute(routeId string) {
router.Lock.Lock()
defer router.Lock.Unlock()
router.DefaultRoute = routeId
}
// this may return nil (returns default only for empty routeId)
func (router *WshRouter) GetRpc(routeId string) AbstractRpcClient {
router.Lock.Lock()
defer router.Lock.Unlock()
if routeId == "" {
routeId = router.DefaultRoute
}
return router.RouteMap[routeId]
}
func (router *WshRouter) SetUpstreamClient(rpc AbstractRpcClient) {
router.Lock.Lock()
defer router.Lock.Unlock()
router.UpstreamClient = rpc
}
func (router *WshRouter) GetUpstreamClient() AbstractRpcClient {
router.Lock.Lock()
defer router.Lock.Unlock()
return router.UpstreamClient
}