write wshrpcmultiproxy. add authtoken to message, add dispose call to api. multiproxy manages a set of authenticated proxies on the wavesrv side

This commit is contained in:
sawka 2024-10-21 13:22:16 -07:00
parent 613a583513
commit c512dd8cb6
8 changed files with 203 additions and 23 deletions

View File

@ -15,18 +15,34 @@ var serverCmd = &cobra.Command{
Hidden: true, Hidden: true,
Short: "remote server to power wave blocks", Short: "remote server to power wave blocks",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: serverRun, RunE: serverRun,
PreRunE: preRunSetupRpcClient,
} }
var connServerRouter bool
func init() { func init() {
serverCmd.Flags().BoolVar(&connServerRouter, "router", false, "run in local router mode")
rootCmd.AddCommand(serverCmd) rootCmd.AddCommand(serverCmd)
} }
func serverRun(cmd *cobra.Command, args []string) { func serverRunRouter() error {
select {}
}
func serverRunNormal() error {
err := setupRpcClient(&wshremote.ServerImpl{LogWriter: os.Stdout})
if err != nil {
return err
}
WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn) WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
go wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn) go wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
RpcClient.SetServerImpl(&wshremote.ServerImpl{LogWriter: os.Stdout})
select {} // run forever select {} // run forever
} }
func serverRun(cmd *cobra.Command, args []string) error {
if connServerRouter {
return serverRunRouter()
} else {
return serverRunNormal()
}
}

View File

@ -72,6 +72,11 @@ class RpcApiType {
return client.wshRpcCall("deleteblock", data, opts); return client.wshRpcCall("deleteblock", data, opts);
} }
// command "dispose" [call]
DisposeCommand(client: WshClient, data: CommandDisposeData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("dispose", data, opts);
}
// command "eventpublish" [call] // command "eventpublish" [call]
EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise<void> { EventPublishCommand(client: WshClient, data: WaveEvent, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("eventpublish", data, opts); return client.wshRpcCall("eventpublish", data, opts);

View File

@ -63,6 +63,7 @@ declare global {
// wshrpc.CommandAuthenticateRtnData // wshrpc.CommandAuthenticateRtnData
type CommandAuthenticateRtnData = { type CommandAuthenticateRtnData = {
routeid: string; routeid: string;
authtoken?: string;
}; };
// wshrpc.CommandBlockInputData // wshrpc.CommandBlockInputData
@ -100,6 +101,11 @@ declare global {
blockid: string; blockid: string;
}; };
// wshrpc.CommandDisposeData
type CommandDisposeData = {
routeid: string;
};
// wshrpc.CommandEventReadHistoryData // wshrpc.CommandEventReadHistoryData
type CommandEventReadHistoryData = { type CommandEventReadHistoryData = {
event: string; event: string;
@ -416,6 +422,7 @@ declare global {
resid?: string; resid?: string;
timeout?: number; timeout?: number;
route?: string; route?: string;
authtoken?: string;
source?: string; source?: string;
cont?: boolean; cont?: boolean;
cancel?: boolean; cancel?: boolean;

View File

@ -92,6 +92,12 @@ func DeleteBlockCommand(w *wshutil.WshRpc, data wshrpc.CommandDeleteBlockData, o
return err return err
} }
// command "dispose", wshserver.DisposeCommand
func DisposeCommand(w *wshutil.WshRpc, data wshrpc.CommandDisposeData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "dispose", data, opts)
return err
}
// command "eventpublish", wshserver.EventPublishCommand // command "eventpublish", wshserver.EventPublishCommand
func EventPublishCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error { func EventPublishCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventpublish", data, opts) _, err := sendRpcRequestCallHelper[any](w, "eventpublish", data, opts)

View File

@ -28,6 +28,7 @@ const (
const ( const (
Command_Authenticate = "authenticate" // special Command_Authenticate = "authenticate" // special
Command_Dispose = "dispose" // special (disposes of the route, for multiproxy only)
Command_RouteAnnounce = "routeannounce" // special (for routing) Command_RouteAnnounce = "routeannounce" // special (for routing)
Command_RouteUnannounce = "routeunannounce" // special (for routing) Command_RouteUnannounce = "routeunannounce" // special (for routing)
Command_Message = "message" Command_Message = "message"
@ -83,6 +84,7 @@ type RespOrErrorUnion[T any] struct {
type WshRpcInterface interface { type WshRpcInterface interface {
AuthenticateCommand(ctx context.Context, data string) (CommandAuthenticateRtnData, error) AuthenticateCommand(ctx context.Context, data string) (CommandAuthenticateRtnData, error)
DisposeCommand(ctx context.Context, data CommandDisposeData) error
RouteAnnounceCommand(ctx context.Context) error // (special) announces a new route to the main router RouteAnnounceCommand(ctx context.Context) error // (special) announces a new route to the main router
RouteUnannounceCommand(ctx context.Context) error // (special) unannounces a route to the main router RouteUnannounceCommand(ctx context.Context) error // (special) unannounces a route to the main router
@ -201,6 +203,12 @@ func HackRpcContextIntoData(dataPtr any, rpcContext RpcContext) {
type CommandAuthenticateRtnData struct { type CommandAuthenticateRtnData struct {
RouteId string `json:"routeid"` RouteId string `json:"routeid"`
AuthToken string `json:"authtoken,omitempty"`
}
type CommandDisposeData struct {
RouteId string `json:"routeid"`
// auth token travels in the packet directly
} }
type CommandMessageData struct { type CommandMessageData struct {

View File

@ -0,0 +1,137 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshutil
import (
"encoding/json"
"fmt"
"log"
"sync"
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
)
type multiProxyRouteInfo struct {
RouteId string
AuthToken string
Proxy *WshRpcProxy
RpcContext *wshrpc.RpcContext
}
// handles messages from multiple unauthenitcated clients
type WshRpcMultiProxy struct {
Lock *sync.Mutex
RouteInfo map[string]*multiProxyRouteInfo // authtoken to info
ToRemoteCh chan []byte
FromRemoteRawCh chan []byte // raw message from the remote
}
func MakeRpcMultiProxy() *WshRpcMultiProxy {
return &WshRpcMultiProxy{
Lock: &sync.Mutex{},
RouteInfo: make(map[string]*multiProxyRouteInfo),
ToRemoteCh: make(chan []byte, DefaultInputChSize),
FromRemoteRawCh: make(chan []byte, DefaultOutputChSize),
}
}
func (p *WshRpcMultiProxy) getRouteInfo(authToken string) *multiProxyRouteInfo {
p.Lock.Lock()
defer p.Lock.Unlock()
return p.RouteInfo[authToken]
}
func (p *WshRpcMultiProxy) setRouteInfo(authToken string, routeInfo *multiProxyRouteInfo) {
p.Lock.Lock()
defer p.Lock.Unlock()
p.RouteInfo[authToken] = routeInfo
}
func (p *WshRpcMultiProxy) removeRouteInfo(authToken string) {
p.Lock.Lock()
defer p.Lock.Unlock()
delete(p.RouteInfo, authToken)
}
func (p *WshRpcMultiProxy) sendResponseError(msg RpcMessage, sendErr error) {
if msg.ReqId == "" {
// no response needed
return
}
resp := RpcMessage{
ResId: msg.ReqId,
Error: sendErr.Error(),
}
respBytes, _ := json.Marshal(resp)
p.ToRemoteCh <- respBytes
}
func (p *WshRpcMultiProxy) sendAuthResponse(msg RpcMessage, routeId string, authToken string) {
if msg.ReqId == "" {
// no response needed
return
}
resp := RpcMessage{
ResId: msg.ReqId,
Data: wshrpc.CommandAuthenticateRtnData{RouteId: routeId, AuthToken: authToken},
}
respBytes, _ := json.Marshal(resp)
p.ToRemoteCh <- respBytes
}
func (p *WshRpcMultiProxy) handleUnauthMessage(msgBytes []byte) {
var msg RpcMessage
err := json.Unmarshal(msgBytes, &msg)
if err != nil {
// nothing to do here, malformed message
return
}
if msg.Command == wshrpc.Command_Authenticate {
rpcContext, routeId, err := handleAuthenticationCommand(msg)
if err != nil {
log.Printf("error handling authentication command (multiproxy): %v\n", err)
p.sendResponseError(msg, err)
return
}
routeInfo := &multiProxyRouteInfo{
RouteId: routeId,
AuthToken: uuid.New().String(),
RpcContext: rpcContext,
}
routeInfo.Proxy = MakeRpcProxy()
routeInfo.Proxy.SetRpcContext(rpcContext)
p.setRouteInfo(routeInfo.AuthToken, routeInfo)
p.sendAuthResponse(msg, routeId, routeInfo.AuthToken)
DefaultRouter.RegisterRoute(routeId, routeInfo.Proxy)
return
}
if msg.AuthToken == "" {
p.sendResponseError(msg, fmt.Errorf("no auth token"))
return
}
routeInfo := p.getRouteInfo(msg.AuthToken)
if routeInfo == nil {
p.sendResponseError(msg, fmt.Errorf("invalid auth token"))
return
}
if msg.Source != routeInfo.RouteId {
p.sendResponseError(msg, fmt.Errorf("invalid source route for auth token"))
return
}
if msg.Command == wshrpc.Command_Dispose {
DefaultRouter.UnregisterRoute(routeInfo.RouteId)
p.removeRouteInfo(msg.AuthToken)
return
}
routeInfo.Proxy.FromRemoteCh <- msgBytes
}
func (p *WshRpcMultiProxy) RunUnauthLoop() {
// loop over unauthenticated message
// handle Authenicate commands, and pass authenticated messages to the AuthCh
for msgBytes := range p.FromRemoteRawCh {
p.handleUnauthMessage(msgBytes)
}
}

View File

@ -136,9 +136,9 @@ func (p *WshRpcProxy) SendRpcMessage(msg []byte) {
} }
func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool) { func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool) {
msgBytes, ok := <-p.FromRemoteCh msgBytes, more := <-p.FromRemoteCh
if !ok || p.RpcContext == nil { if !more || p.RpcContext == nil {
return msgBytes, ok return msgBytes, more
} }
var msg RpcMessage var msg RpcMessage
err := json.Unmarshal(msgBytes, &msg) err := json.Unmarshal(msgBytes, &msg)

View File

@ -109,6 +109,7 @@ type RpcMessage struct {
ResId string `json:"resid,omitempty"` ResId string `json:"resid,omitempty"`
Timeout int `json:"timeout,omitempty"` Timeout int `json:"timeout,omitempty"`
Route string `json:"route,omitempty"` // to route/forward requests to alternate servers Route string `json:"route,omitempty"` // to route/forward requests to alternate servers
AuthToken string `json:"authtoken,omitempty"` // needed for routing unauthenticated requests (WshRpcMultiProxy)
Source string `json:"source,omitempty"` // source route id Source string `json:"source,omitempty"` // source route id
Cont bool `json:"cont,omitempty"` // flag if additional requests/responses are forthcoming Cont bool `json:"cont,omitempty"` // flag if additional requests/responses are forthcoming
Cancel bool `json:"cancel,omitempty"` // used to cancel a streaming request or response (sent from the side that is not streaming) Cancel bool `json:"cancel,omitempty"` // used to cancel a streaming request or response (sent from the side that is not streaming)