fix first level bugs in new conncontroller --router command

This commit is contained in:
sawka 2024-10-21 18:46:18 -07:00
parent e0ffa4fa86
commit bdb2f04d76
2 changed files with 45 additions and 10 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/spf13/cobra"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshremote"
"github.com/wavetermdev/waveterm/pkg/wshutil"
"golang.org/x/crypto/ssh/terminal"
@ -108,6 +109,34 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) {
}
}
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter) (*wshutil.WshRpc, error) {
jwtToken := os.Getenv(wshutil.WaveJwtTokenVarName)
if jwtToken == "" {
return nil, fmt.Errorf("no jwt token found for connserver")
}
rpcCtx, err := wshutil.ExtractUnverifiedRpcContext(jwtToken)
if err != nil {
return nil, fmt.Errorf("error extracting rpc context from %s: %v", wshutil.WaveJwtTokenVarName, err)
}
RpcContext = *rpcCtx
inputCh := make(chan []byte, wshutil.DefaultInputChSize)
outputCh := make(chan []byte, wshutil.DefaultOutputChSize)
connServerClient := wshutil.MakeWshRpc(inputCh, outputCh, *rpcCtx, &wshremote.ServerImpl{LogWriter: os.Stdout})
upstreamClient := router.GetUpstreamClient().(*wshutil.WshRpc)
resp, err := wshclient.AuthenticateCommand(upstreamClient, jwtToken, nil)
if err != nil {
return nil, fmt.Errorf("error authenticating connserver: %v", err)
}
if resp.AuthToken == "" {
return nil, fmt.Errorf("no auth token returned from connserver")
}
log.Printf("authenticated connserver route: %s\n", resp.RouteId)
connServerClient.SetAuthToken(resp.AuthToken)
router.RegisterRoute(resp.RouteId, connServerClient, false)
wshclient.RouteAnnounceCommand(connServerClient, nil)
return connServerClient, nil
}
func serverRunRouter() error {
isTerminal := terminal.IsTerminal(int(os.Stdout.Fd()))
if isTerminal {
@ -137,8 +166,13 @@ func serverRunRouter() error {
if err != nil {
return fmt.Errorf("cannot create unix listener: %v", err)
}
runListener(unixListener, router)
go runListener(unixListener, router)
client, err := setupConnServerRpcClientWithRouter(router)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
// run the sysinfo loop
wshremote.RunSysInfoLoop(client, client.GetRpcContext().Conn)
select {}
}

View File

@ -363,9 +363,10 @@ type WriteFlusher interface {
}
// blocking, returns if there is an error, or on EOF of input
func HandleStdIOClient(logName string, input io.Reader, output WriteFlusher) {
func HandleStdIOClient(logName string, input io.Reader, output io.Writer) {
log.Printf("[%s] starting (HandleStdIOClient)\n", logName)
proxy := MakeRpcMultiProxy()
ptyBuffer := MakePtyBuffer(WaveServerOSCPrefix, input, proxy.FromRemoteRawCh)
ptyBuffer := MakePtyBuffer(WaveOSCPrefix, input, proxy.FromRemoteRawCh)
doneCh := make(chan struct{})
var doneOnce sync.Once
closeDoneCh := func() {
@ -373,20 +374,19 @@ func HandleStdIOClient(logName string, input io.Reader, output WriteFlusher) {
close(doneCh)
})
}
go func() {
proxy.RunUnauthLoop()
}()
go func() {
defer closeDoneCh()
for msg := range proxy.ToRemoteCh {
barr := EncodeWaveOSCBytes(WaveOSC, msg)
log.Printf("[%s] sending message: %s\n", logName, string(msg))
barr := EncodeWaveOSCBytes(WaveServerOSC, msg)
_, err := output.Write(barr)
if err != nil {
log.Printf("[%s] error writing to output: %v\n", logName, err)
break
}
err = output.Flush()
if err != nil {
log.Printf("[%s] error flushing output: %v\n", logName, err)
break
}
}
}()
go func() {
@ -482,5 +482,6 @@ func ExtractUnverifiedSocketName(tokenStr string) (string, error) {
if !ok {
return "", fmt.Errorf("sock claim is missing or invalid")
}
sockName = wavebase.ExpandHomeDirSafe(sockName)
return sockName, nil
}