conn updates 3 (#1711)

lots of misc connection refactoring / fixes:

* adds blocklogger as a way to writing logging information from the backend directly to the a terminal block
* use blocklogger in conncontroller
* use blocklogger in sshclient
* fix remote name in password prompt
* use sh -c to get around shell weirdness
* remove cmd.exe special cases
* use GetWatcher().GetFullConfig() rather than re-reading the config file
* change order of things we do when establishing a connection.  ask for wsh up front.  then do domain socket, then connserver
* reduce number of sessions required in the common case when wsh is already installed.  running the connserver is now a "multi-command" which checks if it is installed, then asks for the version
* send jwt token over stdin instead of in initial command string
* fix focus bug for frontend conn modal
* track more information in connstatus
* simplify wshinstall function
* add nowshreason
* other misc cleanup
This commit is contained in:
Mike Sawka 2025-01-10 14:09:32 -08:00 committed by GitHub
parent 37929d90c1
commit ba5f929b3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 682 additions and 241 deletions

View File

@ -17,6 +17,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/authkey"
"github.com/wavetermdev/waveterm/pkg/blockcontroller"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
@ -297,6 +298,7 @@ func main() {
go stdinReadWatch()
go telemetryLoop()
configWatcher()
blocklogger.InitBlockLogger()
webListener, err := web.MakeTCPListener("web")
if err != nil {
log.Printf("error creating web listener: %v\n", err)

View File

@ -128,7 +128,11 @@ func connReinstallRun(cmd *cobra.Command, args []string) error {
if err := validateConnectionName(connName); err != nil {
return err
}
err := wshclient.ConnReinstallWshCommand(RpcClient, connName, &wshrpc.RpcOpts{Timeout: 60000})
data := wshrpc.ConnExtData{
ConnName: connName,
LogBlockId: RpcContext.BlockId,
}
err := wshclient.ConnReinstallWshCommand(RpcClient, data, &wshrpc.RpcOpts{Timeout: 60000})
if err != nil {
return fmt.Errorf("reinstalling connection: %w", err)
}
@ -173,7 +177,11 @@ func connConnectRun(cmd *cobra.Command, args []string) error {
if err := validateConnectionName(connName); err != nil {
return err
}
err := wshclient.ConnConnectCommand(RpcClient, wshrpc.ConnRequest{Host: connName}, &wshrpc.RpcOpts{Timeout: 60000})
data := wshrpc.ConnRequest{
Host: connName,
LogBlockId: RpcContext.BlockId,
}
err := wshclient.ConnConnectCommand(RpcClient, data, &wshrpc.RpcOpts{Timeout: 60000})
if err != nil {
return fmt.Errorf("connecting connection: %w", err)
}
@ -186,7 +194,11 @@ func connEnsureRun(cmd *cobra.Command, args []string) error {
if err := validateConnectionName(connName); err != nil {
return err
}
err := wshclient.ConnEnsureCommand(RpcClient, connName, &wshrpc.RpcOpts{Timeout: 60000})
data := wshrpc.ConnExtData{
ConnName: connName,
LogBlockId: RpcContext.BlockId,
}
err := wshclient.ConnEnsureCommand(RpcClient, data, &wshrpc.RpcOpts{Timeout: 60000})
if err != nil {
return fmt.Errorf("ensuring connection: %w", err)
}

View File

@ -39,7 +39,8 @@ func sshRun(cmd *cobra.Command, args []string) (rtnErr error) {
}
// first, make a connection independent of the block
connOpts := wshrpc.ConnRequest{
Host: sshArg,
Host: sshArg,
LogBlockId: blockId,
Keywords: wshrpc.ConnKeywords{
SshIdentityFile: identityFiles,
},

View File

@ -23,10 +23,10 @@ import {
getSettingsKeyAtom,
getUserName,
globalStore,
refocusNode,
useBlockAtom,
WOS,
} from "@/app/store/global";
import { globalRefocusWithTimeout } from "@/app/store/keymodel";
import { RpcApi } from "@/app/store/wshclientapi";
import { TabRpcClient } from "@/app/store/wshrpcutil";
import { ErrorBoundary } from "@/element/errorboundary";
@ -356,7 +356,11 @@ const ConnStatusOverlay = React.memo(
}, [width, connStatus, setShowError]);
const handleTryReconnect = React.useCallback(() => {
const prtn = RpcApi.ConnConnectCommand(TabRpcClient, { host: connName }, { timeout: 60000 });
const prtn = RpcApi.ConnConnectCommand(
TabRpcClient,
{ host: connName, logblockid: nodeModel.blockId },
{ timeout: 60000 }
);
prtn.catch((e) => console.log("error reconnecting", connName, e));
}, [connName]);
@ -541,7 +545,11 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => {
const connName = blockData?.meta?.connection;
if (!util.isBlank(connName)) {
console.log("ensure conn", nodeModel.blockId, connName);
RpcApi.ConnEnsureCommand(TabRpcClient, connName, { timeout: 60000 }).catch((e) => {
RpcApi.ConnEnsureCommand(
TabRpcClient,
{ connname: connName, logblockid: nodeModel.blockId },
{ timeout: 60000 }
).catch((e) => {
console.log("error ensuring connection", nodeModel.blockId, connName, e);
});
}
@ -691,7 +699,11 @@ const ChangeConnectionBlockModal = React.memo(
meta: { connection: connName, file: newCwd },
});
try {
await RpcApi.ConnEnsureCommand(TabRpcClient, connName, { timeout: 60000 });
await RpcApi.ConnEnsureCommand(
TabRpcClient,
{ connname: connName, logblockid: blockId },
{ timeout: 60000 }
);
} catch (e) {
console.log("error connecting", blockId, connName, e);
}
@ -756,7 +768,7 @@ const ChangeConnectionBlockModal = React.memo(
onSelect: async (_: string) => {
const prtn = RpcApi.ConnConnectCommand(
TabRpcClient,
{ host: connStatus.connection },
{ host: connStatus.connection, logblockid: blockId },
{ timeout: 60000 }
);
prtn.catch((e) => console.log("error reconnecting", connStatus.connection, e));
@ -879,12 +891,13 @@ const ChangeConnectionBlockModal = React.memo(
} else {
changeConnection(rowItem.value);
globalStore.set(changeConnModalAtom, false);
globalRefocusWithTimeout(10);
}
}
if (keyutil.checkKeyPressed(waveEvent, "Escape")) {
globalStore.set(changeConnModalAtom, false);
setConnSelected("");
refocusNode(blockId);
globalRefocusWithTimeout(10);
return true;
}
if (keyutil.checkKeyPressed(waveEvent, "ArrowUp")) {
@ -916,6 +929,7 @@ const ChangeConnectionBlockModal = React.memo(
onSelect={(selected: string) => {
changeConnection(selected);
globalStore.set(changeConnModalAtom, false);
globalRefocusWithTimeout(10);
}}
selectIndex={rowIndex}
autoFocus={isNodeFocused}

View File

@ -146,6 +146,12 @@ function handleCmdI() {
globalRefocus();
}
function globalRefocusWithTimeout(timeoutVal: number) {
setTimeout(() => {
globalRefocus();
}, timeoutVal);
}
function globalRefocus() {
const layoutModel = getLayoutModelForStaticTab();
const focusedNode = globalStore.get(layoutModel.focusedNode);
@ -403,6 +409,7 @@ export {
getAllGlobalKeyBindings,
getSimpleControlShiftAtom,
globalRefocus,
globalRefocusWithTimeout,
registerControlShiftStateUpdateHandler,
registerElectronReinjectKeyHandler,
registerGlobalKeys,

View File

@ -38,7 +38,7 @@ class RpcApiType {
}
// command "connensure" [call]
ConnEnsureCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
ConnEnsureCommand(client: WshClient, data: ConnExtData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connensure", data, opts);
}
@ -48,7 +48,7 @@ class RpcApiType {
}
// command "connreinstallwsh" [call]
ConnReinstallWshCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
ConnReinstallWshCommand(client: WshClient, data: ConnExtData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("connreinstallwsh", data, opts);
}
@ -57,6 +57,11 @@ class RpcApiType {
return client.wshRpcCall("connstatus", null, opts);
}
// command "controllerappendoutput" [call]
ControllerAppendOutputCommand(client: WshClient, data: CommandControllerAppendOutputData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerappendoutput", data, opts);
}
// command "controllerinput" [call]
ControllerInputCommand(client: WshClient, data: CommandBlockInputData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("controllerinput", data, opts);

View File

@ -364,7 +364,7 @@ export class PreviewModel implements ViewModel {
this.connection = atom<Promise<string>>(async (get) => {
const connName = get(this.blockAtom)?.meta?.connection;
try {
await RpcApi.ConnEnsureCommand(TabRpcClient, connName, { timeout: 60000 });
await RpcApi.ConnEnsureCommand(TabRpcClient, { connname: connName }, { timeout: 60000 });
globalStore.set(this.connectionError, "");
} catch (e) {
globalStore.set(this.connectionError, e as string);

View File

@ -682,6 +682,45 @@ class TermViewModel implements ViewModel {
},
});
}
const debugConn = blockData?.meta?.["term:conndebug"];
fullMenu.push({
label: "Debug Connection",
submenu: [
{
label: "Off",
type: "checkbox",
checked: !debugConn,
click: () => {
RpcApi.SetMetaCommand(TabRpcClient, {
oref: WOS.makeORef("block", this.blockId),
meta: { "term:conndebug": null },
});
},
},
{
label: "Info",
type: "checkbox",
checked: debugConn == "info",
click: () => {
RpcApi.SetMetaCommand(TabRpcClient, {
oref: WOS.makeORef("block", this.blockId),
meta: { "term:conndebug": "info" },
});
},
},
{
label: "Verbose",
type: "checkbox",
checked: debugConn == "debug",
click: () => {
RpcApi.SetMetaCommand(TabRpcClient, {
oref: WOS.makeORef("block", this.blockId),
meta: { "term:conndebug": "debug" },
});
},
},
],
});
return fullMenu;
}
}

View File

@ -125,6 +125,12 @@ declare global {
view: string;
};
// wshrpc.CommandControllerAppendOutputData
type CommandControllerAppendOutputData = {
blockid: string;
data64: string;
};
// wshrpc.CommandControllerResyncData
type CommandControllerResyncData = {
forcerestart?: boolean;
@ -286,11 +292,18 @@ declare global {
metamaptype: MetaType;
};
// wshrpc.ConnExtData
type ConnExtData = {
connname: string;
logblockid?: string;
};
// wshrpc.ConnKeywords
type ConnKeywords = {
"conn:wshenabled"?: boolean;
"conn:askbeforewshinstall"?: boolean;
"conn:overrideconfig"?: boolean;
"conn:wshpath"?: string;
"display:hidden"?: boolean;
"display:order"?: number;
"term:*"?: boolean;
@ -317,6 +330,7 @@ declare global {
type ConnRequest = {
host: string;
keywords?: ConnKeywords;
logblockid?: string;
};
// wshrpc.ConnStatus
@ -329,6 +343,8 @@ declare global {
activeconnnum: number;
error?: string;
wsherror?: string;
nowshreason?: string;
wshversion?: string;
};
// wshrpc.CpuDataRequest
@ -494,6 +510,7 @@ declare global {
"term:vdomtoolbarblockid"?: string;
"term:transparency"?: number;
"term:allowbracketedpaste"?: boolean;
"term:conndebug"?: string;
"web:zoom"?: number;
"web:hidenav"?: boolean;
"markdown:fontsize"?: number;

View File

@ -16,6 +16,7 @@ import (
"sync/atomic"
"time"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote"
@ -375,9 +376,7 @@ func (bc *BlockController) setupAndStartShellProcess(rc *RunShellOpts, blockMeta
} else {
shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
conn.WithLock(func() {
conn.WshError = err.Error()
})
conn.SetWshError(err)
conn.WshEnabled.Store(false)
log.Printf("error starting remote shell proc with wsh: %v", err)
log.Print("attempting install without wsh")
@ -759,6 +758,13 @@ func getOrCreateBlockController(tabId string, blockId string, controllerName str
return bc
}
func formatConnNameForLog(connName string) string {
if connName == "" {
return "local"
}
return connName
}
func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts, force bool) error {
if tabId == "" || blockId == "" {
return fmt.Errorf("invalid tabId or blockId passed to ResyncController")
@ -769,6 +775,7 @@ func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts
}
if force {
StopBlockController(blockId)
time.Sleep(100 * time.Millisecond) // TODO see if we can remove this (the "process finished with exit code" message comes out after we start reconnecting otherwise)
}
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "")
@ -784,8 +791,10 @@ func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts
if curBc != nil {
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName {
blocklogger.Infof(ctx, "\n[conndebug] stopping blockcontroller due to conn change %q => %q\n", formatConnNameForLog(bcStatus.ShellProcConnName), formatConnNameForLog(connName))
log.Printf("stopping blockcontroller %s due to conn change\n", blockId)
StopBlockControllerAndSetStatus(blockId, Status_Init)
time.Sleep(100 * time.Millisecond) // TODO see if we can remove this (the "process finished with exit code" message comes out after we start reconnecting otherwise)
}
}
// now if there is a conn, ensure it is connected

View File

@ -0,0 +1,92 @@
// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blocklogger
import (
"context"
"encoding/base64"
"fmt"
"log"
"strings"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
)
// Buffer size for the output channel
const outputBufferSize = 1000
var outputChan chan wshrpc.CommandControllerAppendOutputData
func InitBlockLogger() {
outputChan = make(chan wshrpc.CommandControllerAppendOutputData, outputBufferSize)
// Start the output runner
go outputRunner()
}
func outputRunner() {
defer log.Printf("blocklogger: outputRunner exiting")
client := wshclient.GetBareRpcClient()
for data := range outputChan {
// Process each output request synchronously, waiting for response
wshclient.ControllerAppendOutputCommand(client, data, nil)
}
}
type logBlockIdContextKeyType struct{}
var logBlockIdContextKey = logBlockIdContextKeyType{}
type logBlockIdData struct {
BlockId string
Verbose bool
}
func ContextWithLogBlockId(ctx context.Context, blockId string, verbose bool) context.Context {
return context.WithValue(ctx, logBlockIdContextKey, &logBlockIdData{BlockId: blockId, Verbose: verbose})
}
func getLogBlockData(ctx context.Context) *logBlockIdData {
if ctx == nil {
return nil
}
dataPtr := ctx.Value(logBlockIdContextKey)
if dataPtr == nil {
return nil
}
return dataPtr.(*logBlockIdData)
}
func queueLogData(data wshrpc.CommandControllerAppendOutputData) {
select {
case outputChan <- data:
default:
}
}
func writeLogf(blockId string, format string, args []any) {
logStr := fmt.Sprintf(format, args...)
logStr = strings.ReplaceAll(logStr, "\n", "\r\n")
data := wshrpc.CommandControllerAppendOutputData{
BlockId: blockId,
Data64: base64.StdEncoding.EncodeToString([]byte(logStr)),
}
queueLogData(data)
}
func Infof(ctx context.Context, format string, args ...any) {
logData := getLogBlockData(ctx)
if logData == nil {
return
}
writeLogf(logData.BlockId, format, args)
}
func Debugf(ctx context.Context, format string, args ...interface{}) {
logData := getLogBlockData(ctx)
if logData == nil || !logData.Verbose {
return
}
writeLogf(logData.BlockId, format, args)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/kevinburke/ssh_config"
"github.com/skeema/knownhosts"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/genconn"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote"
@ -56,16 +57,24 @@ type SSHConn struct {
WshEnabled *atomic.Bool
Opts *remote.SSHOpts
Client *ssh.Client
SockName string
DomainSockName string // if "", then no domain socket
DomainSockListener net.Listener
ConnController *ssh.Session
Error string
WshError string
NoWshReason string
WshVersion string
HasWaiter *atomic.Bool
LastConnectTime int64
ActiveConnNum int
}
var ConnServerCmdTemplate = strings.TrimSpace(`
%s version || echo "not-installed"
read jwt_token
WAVETERM_JWT="$jwt_token" %s connserver
`)
func GetAllConnStatus() []wshrpc.ConnStatus {
globalLock.Lock()
defer globalLock.Unlock()
@ -96,15 +105,22 @@ func (conn *SSHConn) DeriveConnStatus() wshrpc.ConnStatus {
return wshrpc.ConnStatus{
Status: conn.Status,
Connected: conn.Status == Status_Connected,
WshEnabled: conn.WshEnabled.Load(),
Connection: conn.Opts.String(),
HasConnected: (conn.LastConnectTime > 0),
ActiveConnNum: conn.ActiveConnNum,
Error: conn.Error,
WshEnabled: conn.WshEnabled.Load(),
WshError: conn.WshError,
NoWshReason: conn.NoWshReason,
WshVersion: conn.WshVersion,
}
}
func (conn *SSHConn) Infof(ctx context.Context, format string, args ...any) {
log.Print(fmt.Sprintf("[conn:%s] ", conn.GetName()) + fmt.Sprintf(format, args...))
blocklogger.Infof(ctx, "[conndebug] "+format, args...)
}
func (conn *SSHConn) FireConnChangeEvent() {
status := conn.DeriveConnStatus()
event := wps.WaveEvent{
@ -143,6 +159,7 @@ func (conn *SSHConn) close_nolock() {
if conn.DomainSockListener != nil {
conn.DomainSockListener.Close()
conn.DomainSockListener = nil
conn.DomainSockName = ""
}
if conn.ConnController != nil {
conn.ConnController.Close()
@ -157,7 +174,7 @@ func (conn *SSHConn) close_nolock() {
func (conn *SSHConn) GetDomainSocketName() string {
conn.Lock.Lock()
defer conn.Lock.Unlock()
return conn.SockName
return conn.DomainSockName
}
func (conn *SSHConn) GetStatus() string {
@ -171,14 +188,10 @@ func (conn *SSHConn) GetName() string {
return conn.Opts.String()
}
func (conn *SSHConn) OpenDomainSocketListener() error {
var allowed bool
conn.WithLock(func() {
if conn.Status != Status_Connecting {
allowed = false
} else {
allowed = true
}
func (conn *SSHConn) OpenDomainSocketListener(ctx context.Context) error {
conn.Infof(ctx, "running OpenDomainSocketListener...\n")
allowed := WithLockRtn(conn, func() bool {
return conn.Status == Status_Connecting
})
if !allowed {
return fmt.Errorf("cannot open domain socket for %q when status is %q", conn.GetName(), conn.GetStatus())
@ -189,39 +202,57 @@ func (conn *SSHConn) OpenDomainSocketListener() error {
return fmt.Errorf("error generating random string: %w", err)
}
sockName := fmt.Sprintf("/tmp/waveterm-%s.sock", randStr)
log.Printf("remote domain socket %s %q\n", conn.GetName(), conn.GetDomainSocketName())
conn.Infof(ctx, "generated domain socket name %s\n", sockName)
listener, err := client.ListenUnix(sockName)
if err != nil {
return fmt.Errorf("unable to request connection domain socket: %v", err)
}
conn.WithLock(func() {
conn.SockName = sockName
conn.DomainSockName = sockName
conn.DomainSockListener = listener
})
conn.Infof(ctx, "successfully connected domain socket\n")
go func() {
defer func() {
panichandler.PanicHandler("conncontroller:OpenDomainSocketListener", recover())
}()
defer conn.WithLock(func() {
conn.DomainSockListener = nil
conn.SockName = ""
conn.DomainSockName = ""
})
wshutil.RunWshRpcOverListener(listener)
}()
return nil
}
func (conn *SSHConn) StartConnServer() error {
var allowed bool
conn.WithLock(func() {
if conn.Status != Status_Connecting {
allowed = false
} else {
allowed = true
}
// expects the output of `wsh version` which looks like `wsh v0.10.4` or "not-installed"
// returns (up-to-date, semver, error)
// if not up to date, or error, version might be ""
func isWshVersionUpToDate(wshVersionLine string) (bool, string, error) {
wshVersionLine = strings.TrimSpace(wshVersionLine)
if wshVersionLine == "not-installed" {
return false, "", nil
}
parts := strings.Fields(wshVersionLine)
if len(parts) != 2 {
return false, "", fmt.Errorf("unexpected version format: %s", wshVersionLine)
}
clientVersion := parts[1]
expectedVersion := fmt.Sprintf("v%s", wavebase.WaveVersion)
if semver.Compare(clientVersion, expectedVersion) < 0 {
return false, clientVersion, nil
}
return true, clientVersion, nil
}
// returns (needsInstall, clientVersion, error)
func (conn *SSHConn) StartConnServer(ctx context.Context) (bool, string, error) {
conn.Infof(ctx, "running StartConnServer...\n")
allowed := WithLockRtn(conn, func() bool {
return conn.Status == Status_Connecting
})
if !allowed {
return fmt.Errorf("cannot start conn server for %q when status is %q", conn.GetName(), conn.GetStatus())
return false, "", fmt.Errorf("cannot start conn server for %q when status is %q", conn.GetName(), conn.GetStatus())
}
client := conn.GetClient()
wshPath := remote.GetWshPath(client)
@ -232,29 +263,49 @@ func (conn *SSHConn) StartConnServer() error {
sockName := conn.GetDomainSocketName()
jwtToken, err := wshutil.MakeClientJWTToken(rpcCtx, sockName)
if err != nil {
return fmt.Errorf("unable to create jwt token for conn controller: %w", err)
return false, "", fmt.Errorf("unable to create jwt token for conn controller: %w", err)
}
sshSession, err := client.NewSession()
if err != nil {
return fmt.Errorf("unable to create ssh session for conn controller: %w", err)
return false, "", fmt.Errorf("unable to create ssh session for conn controller: %w", err)
}
pipeRead, pipeWrite := io.Pipe()
sshSession.Stdout = pipeWrite
sshSession.Stderr = pipeWrite
shellPath, err := remote.DetectShell(client)
stdinPipe, err := sshSession.StdinPipe()
if err != nil {
return err
}
var cmdStr string
if remote.IsPowershell(shellPath) {
cmdStr = fmt.Sprintf("$env:%s=\"%s\"; %s connserver", wshutil.WaveJwtTokenVarName, jwtToken, wshPath)
} else {
cmdStr = fmt.Sprintf("%s=\"%s\" %s connserver", wshutil.WaveJwtTokenVarName, jwtToken, wshPath)
return false, "", fmt.Errorf("unable to get stdin pipe: %w", err)
}
cmdStr := fmt.Sprintf(ConnServerCmdTemplate, wshPath, wshPath)
log.Printf("starting conn controller: %s\n", cmdStr)
err = sshSession.Start(cmdStr)
shWrappedCmdStr := fmt.Sprintf("sh -c %s", genconn.HardQuote(cmdStr))
err = sshSession.Start(shWrappedCmdStr)
if err != nil {
return fmt.Errorf("unable to start conn controller: %w", err)
return false, "", fmt.Errorf("unable to start conn controller command: %w", err)
}
linesChan := wshutil.StreamToLinesChan(pipeRead)
versionLine, err := wshutil.ReadLineWithTimeout(linesChan, 2*time.Second)
if err != nil {
sshSession.Close()
return false, "", fmt.Errorf("error reading wsh version: %w", err)
}
conn.Infof(ctx, "got connserver version: %s\n", strings.TrimSpace(versionLine))
isUpToDate, clientVersion, err := isWshVersionUpToDate(versionLine)
if err != nil {
sshSession.Close()
return false, "", fmt.Errorf("error checking wsh version: %w", err)
}
conn.Infof(ctx, "connserver update to date: %v\n", isUpToDate)
if !isUpToDate {
sshSession.Close()
return true, clientVersion, nil
}
// write the jwt
conn.Infof(ctx, "writing jwt token to connserver\n")
_, err = fmt.Fprintf(stdinPipe, "%s\n", jwtToken)
if err != nil {
sshSession.Close()
return false, clientVersion, fmt.Errorf("failed to write JWT token: %w", err)
}
conn.WithLock(func() {
conn.ConnController = sshSession
@ -265,35 +316,46 @@ func (conn *SSHConn) StartConnServer() error {
panichandler.PanicHandler("conncontroller:sshSession.Wait", recover())
}()
// wait for termination, clear the controller
var waitErr error
defer conn.WithLock(func() {
if conn.ConnController != nil {
conn.WshEnabled.Store(false)
conn.NoWshReason = "connserver terminated"
if waitErr != nil {
conn.WshError = fmt.Sprintf("connserver terminated unexpectedly with error: %v", waitErr)
}
}
conn.ConnController = nil
})
waitErr := sshSession.Wait()
waitErr = sshSession.Wait()
log.Printf("conn controller (%q) terminated: %v", conn.GetName(), waitErr)
}()
go func() {
defer func() {
panichandler.PanicHandler("conncontroller:sshSession-output", recover())
}()
readErr := wshutil.StreamToLines(pipeRead, func(line []byte) {
lineStr := string(line)
if !strings.HasSuffix(lineStr, "\n") {
lineStr += "\n"
for output := range linesChan {
if output.Error != nil {
log.Printf("[conncontroller:%s:output] error: %v\n", conn.GetName(), output.Error)
continue
}
log.Printf("[conncontroller:%s:output] %s", conn.GetName(), lineStr)
})
if readErr != nil && readErr != io.EOF {
log.Printf("[conncontroller:%s] error reading output: %v\n", conn.GetName(), readErr)
line := output.Line
if !strings.HasSuffix(line, "\n") {
line += "\n"
}
log.Printf("[conncontroller:%s:output] %s", conn.GetName(), line)
}
}()
conn.Infof(ctx, "connserver started, waiting for route to be registered\n")
regCtx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
err = wshutil.DefaultRouter.WaitForRegister(regCtx, wshutil.MakeConnectionRouteId(rpcCtx.Conn))
if err != nil {
return fmt.Errorf("timeout waiting for connserver to register")
return false, clientVersion, fmt.Errorf("timeout waiting for connserver to register")
}
time.Sleep(300 * time.Millisecond) // TODO remove this sleep (but we need to wait until connserver is "ready")
return nil
conn.Infof(ctx, "connserver is registered and ready\n")
return false, clientVersion, nil
}
type WshInstallOpts struct {
@ -307,78 +369,78 @@ func (wise *WshInstallSkipError) Error() string {
return "skipping wsh installation"
}
func (conn *SSHConn) CheckAndInstallWsh(ctx context.Context, clientDisplayName string, opts *WshInstallOpts) error {
if opts == nil {
opts = &WshInstallOpts{}
var queryTextTemplate = strings.TrimSpace(`
Wave requires Wave Shell Extensions to be
installed on %q
to ensure a seamless experience.
Would you like to install them?
`)
// returns (allowed, error)
func (conn *SSHConn) getPermissionToInstallWsh(ctx context.Context, clientDisplayName string) (bool, error) {
conn.Infof(ctx, "running getPermissionToInstallWsh...\n")
queryText := fmt.Sprintf(queryTextTemplate, clientDisplayName)
title := "Install Wave Shell Extensions"
request := &userinput.UserInputRequest{
ResponseType: "confirm",
QueryText: queryText,
Title: title,
Markdown: true,
CheckBoxMsg: "Automatically install for all connections",
OkLabel: "Install wsh",
CancelLabel: "No wsh",
}
conn.Infof(ctx, "requesting user confirmation...\n")
response, err := userinput.GetUserInput(ctx, request)
if err != nil {
conn.Infof(ctx, "error getting user input: %v\n", err)
return false, err
}
conn.Infof(ctx, "user response to allowing wsh: %v\n", response.Confirm)
meta := make(map[string]any)
meta["conn:wshenabled"] = response.Confirm
conn.Infof(ctx, "writing conn:wshenabled=%v to connections.json\n", response.Confirm)
err = wconfig.SetConnectionsConfigValue(conn.GetName(), meta)
if err != nil {
log.Printf("warning: error writing to connections file: %v", err)
}
if !response.Confirm {
return false, nil
}
if response.CheckboxStat {
conn.Infof(ctx, "writing conn:askbeforewshinstall=false to settings.json\n")
meta := waveobj.MetaMapType{
wconfig.ConfigKey_ConnAskBeforeWshInstall: false,
}
setConfigErr := wconfig.SetBaseConfigValue(meta)
if setConfigErr != nil {
// this is not a critical error, just log and continue
log.Printf("warning: error writing to base config file: %v", err)
}
}
return true, nil
}
func (conn *SSHConn) InstallWsh(ctx context.Context) error {
conn.Infof(ctx, "running installWsh...\n")
client := conn.GetClient()
if client == nil {
return fmt.Errorf("client is nil")
conn.Infof(ctx, "ERROR ssh client is not connected, cannot install\n")
return fmt.Errorf("ssh client is not connected, cannot install")
}
// check that correct wsh extensions are installed
expectedVersion := fmt.Sprintf("v%s", wavebase.WaveVersion)
clientVersion, err := remote.GetWshVersion(client)
if err == nil && !opts.Force && semver.Compare(clientVersion, expectedVersion) >= 0 {
return nil
}
var queryText string
var title string
if opts.Force {
queryText = fmt.Sprintf("ReInstalling Wave Shell Extensions (%s) on `%s`\n", wavebase.WaveVersion, clientDisplayName)
title = "Install Wave Shell Extensions"
} else if err != nil {
queryText = fmt.Sprintf("Wave requires Wave Shell Extensions to be \n"+
"installed on `%s` \n"+
"to ensure a seamless experience. \n\n"+
"Would you like to install them?", clientDisplayName)
title = "Install Wave Shell Extensions"
} else {
// don't ask for upgrading the version
opts.NoUserPrompt = true
}
if !opts.NoUserPrompt {
request := &userinput.UserInputRequest{
ResponseType: "confirm",
QueryText: queryText,
Title: title,
Markdown: true,
CheckBoxMsg: "Automatically install for all connections",
OkLabel: "Install wsh",
CancelLabel: "No wsh",
}
response, err := userinput.GetUserInput(ctx, request)
if err != nil {
return err
}
if !response.Confirm {
meta := make(map[string]any)
meta["conn:wshenabled"] = false
err = wconfig.SetConnectionsConfigValue(conn.GetName(), meta)
if err != nil {
log.Printf("warning: error writing to connections file: %v", err)
}
return &WshInstallSkipError{}
}
if response.CheckboxStat {
meta := waveobj.MetaMapType{
wconfig.ConfigKey_ConnAskBeforeWshInstall: false,
}
err := wconfig.SetBaseConfigValue(meta)
if err != nil {
return fmt.Errorf("error setting conn:askbeforewshinstall value: %w", err)
}
}
}
log.Printf("attempting to install wsh to `%s`", clientDisplayName)
clientOs, clientArch, err := remote.GetClientPlatform(ctx, genconn.MakeSSHShellClient(client))
if err != nil {
conn.Infof(ctx, "ERROR detecting client platform: %v\n", err)
return err
}
conn.Infof(ctx, "detected remote platform os:%s arch:%s\n", clientOs, clientArch)
err = remote.CpWshToRemote(ctx, client, clientOs, clientArch)
if err != nil {
return fmt.Errorf("error installing wsh to remote: %w", err)
conn.Infof(ctx, "ERROR copying wsh binary to remote: %v\n", err)
return fmt.Errorf("error copying wsh binary to remote: %w", err)
}
log.Printf("successfully installed wsh on %s\n", conn.GetName())
conn.Infof(ctx, "successfully installed wsh\n")
return nil
}
@ -422,6 +484,7 @@ func (conn *SSHConn) WaitForConnect(ctx context.Context) error {
// does not return an error since that error is stored inside of SSHConn
func (conn *SSHConn) Connect(ctx context.Context, connFlags *wshrpc.ConnKeywords) error {
blocklogger.Infof(ctx, "\n")
var connectAllowed bool
conn.WithLock(func() {
if conn.Status == Status_Connecting || conn.Status == Status_Connected {
@ -432,14 +495,16 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wshrpc.ConnKeywords
connectAllowed = true
}
})
log.Printf("Connect %s\n", conn.GetName())
if !connectAllowed {
conn.Infof(ctx, "cannot connect to when status is %q\n", conn.GetStatus())
return fmt.Errorf("cannot connect to %q when status is %q", conn.GetName(), conn.GetStatus())
}
conn.Infof(ctx, "trying to connect to %q...\n", conn.GetName())
conn.FireConnChangeEvent()
err := conn.connectInternal(ctx, connFlags)
conn.WithLock(func() {
if err != nil {
conn.Infof(ctx, "ERROR %v\n\n", err)
conn.Status = Status_Error
conn.Error = err.Error()
conn.close_nolock()
@ -447,6 +512,7 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wshrpc.ConnKeywords
Conn: map[string]int{"ssh:connecterror": 1},
}, "ssh-connconnect")
} else {
conn.Infof(ctx, "successfully connected (wsh:%v)\n\n", conn.WshEnabled.Load())
conn.Status = Status_Connected
conn.LastConnectTime = time.Now().UnixMilli()
if conn.ActiveConnNum == 0 {
@ -465,7 +531,7 @@ func (conn *SSHConn) Connect(ctx context.Context, connFlags *wshrpc.ConnKeywords
// logic for saving connection and potential flags (we only save once a connection has been made successfully)
// at the moment, identity files is the only saved flag
var identityFiles []string
existingConfig := wconfig.ReadFullConfig()
existingConfig := wconfig.GetWatcher().GetFullConfig()
existingConnection, ok := existingConfig.Connections[conn.GetName()]
if ok {
identityFiles = existingConnection.SshIdentityFile
@ -499,79 +565,143 @@ func (conn *SSHConn) WithLock(fn func()) {
fn()
}
func (conn *SSHConn) connectInternal(ctx context.Context, connFlags *wshrpc.ConnKeywords) error {
client, _, err := remote.ConnectToClient(ctx, conn.Opts, nil, 0, connFlags)
if err != nil {
log.Printf("error: failed to connect to client %s: %s\n", conn.GetName(), err)
return err
}
fmtAddr := knownhosts.Normalize(fmt.Sprintf("%s@%s", client.User(), client.RemoteAddr().String()))
clientDisplayName := fmt.Sprintf("%s (%s)", conn.GetName(), fmtAddr)
conn.WithLock(func() {
conn.Client = client
})
config := wconfig.ReadFullConfig()
func WithLockRtn[T any](conn *SSHConn, fn func() T) T {
conn.Lock.Lock()
defer conn.Lock.Unlock()
return fn()
}
// returns (enable-wsh, ask-before-install)
func (conn *SSHConn) getConnWshSettings() (bool, bool) {
config := wconfig.GetWatcher().GetFullConfig()
enableWsh := config.Settings.ConnWshEnabled
askBeforeInstall := config.Settings.ConnAskBeforeWshInstall
askBeforeInstall := wconfig.DefaultBoolPtr(config.Settings.ConnAskBeforeWshInstall, true)
connSettings, ok := config.Connections[conn.GetName()]
if ok {
if connSettings.ConnWshEnabled != nil {
enableWsh = *connSettings.ConnWshEnabled
}
if connSettings.ConnAskBeforeWshInstall != nil {
// if the connection object exists, and conn:askbeforewshinstall is not set, the user must have allowed it
// TODO: in v0.12+ this should be removed. we'll explicitly write a "false" into the connection object on successful connection
if connSettings.ConnAskBeforeWshInstall == nil {
askBeforeInstall = false
} else {
askBeforeInstall = *connSettings.ConnAskBeforeWshInstall
}
}
if enableWsh {
installErr := conn.CheckAndInstallWsh(ctx, clientDisplayName, &WshInstallOpts{NoUserPrompt: !askBeforeInstall})
if errors.Is(installErr, &WshInstallSkipError{}) {
// skips are not true errors
conn.WithLock(func() {
conn.WshEnabled.Store(false)
})
} else if installErr != nil {
log.Printf("error: unable to install wsh shell extensions for %s: %v\n", conn.GetName(), err)
log.Print("attempting to run with nowsh instead")
conn.WithLock(func() {
conn.WshError = installErr.Error()
})
conn.WshEnabled.Store(false)
} else {
conn.WshEnabled.Store(true)
}
return enableWsh, askBeforeInstall
}
if conn.WshEnabled.Load() {
dsErr := conn.OpenDomainSocketListener()
var csErr error
if dsErr != nil {
log.Printf("error: unable to open domain socket listener for %s: %v\n", conn.GetName(), dsErr)
} else {
csErr = conn.StartConnServer()
if csErr != nil {
log.Printf("error: unable to start conn server for %s: %v\n", conn.GetName(), csErr)
}
}
if dsErr != nil || csErr != nil {
log.Print("attempting to run with nowsh instead")
var errmsgs []string
if dsErr != nil {
errmsgs = append(errmsgs, fmt.Sprintf("domain socket error: %s", dsErr.Error()))
}
if csErr != nil {
errmsgs = append(errmsgs, fmt.Sprintf("conn server error: %s", csErr.Error()))
}
combinedErr := fmt.Errorf("%s", strings.Join(errmsgs, " | "))
conn.WithLock(func() {
conn.WshError = combinedErr.Error()
})
conn.WshEnabled.Store(false)
}
}
} else {
conn.WshEnabled.Store(false)
type WshCheckResult struct {
WshEnabled bool
ClientVersion string
NoWshReason string
WshError error
}
// returns (wsh-enabled, clientVersion, text-reason, wshError)
func (conn *SSHConn) tryEnableWsh(ctx context.Context, clientDisplayName string) WshCheckResult {
conn.Infof(ctx, "running tryEnableWsh...\n")
enableWsh, askBeforeInstall := conn.getConnWshSettings()
conn.Infof(ctx, "wsh settings enable:%v ask:%v\n", enableWsh, askBeforeInstall)
if !enableWsh {
return WshCheckResult{NoWshReason: "conn:wshenabled set to false"}
}
conn.HasWaiter.Store(true)
if askBeforeInstall {
allowInstall, err := conn.getPermissionToInstallWsh(ctx, clientDisplayName)
if err != nil {
log.Printf("error getting permission to install wsh: %v\n", err)
return WshCheckResult{NoWshReason: "error getting user permission to install", WshError: err}
}
if !allowInstall {
return WshCheckResult{NoWshReason: "user selected not to install wsh extensions"}
}
}
err := conn.OpenDomainSocketListener(ctx)
if err != nil {
conn.Infof(ctx, "ERROR opening domain socket listener: %v\n", err)
err = fmt.Errorf("error opening domain socket listener: %w", err)
return WshCheckResult{NoWshReason: "error opening domain socket", WshError: err}
}
needsInstall, clientVersion, err := conn.StartConnServer(ctx)
if err != nil {
conn.Infof(ctx, "ERROR starting conn server: %v\n", err)
err = fmt.Errorf("error starting conn server: %w", err)
return WshCheckResult{NoWshReason: "error starting connserver", WshError: err}
}
if needsInstall {
conn.Infof(ctx, "connserver needs to be (re)installed\n")
err = conn.InstallWsh(ctx)
if err != nil {
conn.Infof(ctx, "ERROR installing wsh: %v\n", err)
err = fmt.Errorf("error installing wsh: %w", err)
return WshCheckResult{NoWshReason: "error installing wsh/connserver", WshError: err}
}
needsInstall, clientVersion, err = conn.StartConnServer(ctx)
if err != nil {
conn.Infof(ctx, "ERROR starting conn server (after install): %v\n", err)
err = fmt.Errorf("error starting conn server (after install): %w", err)
return WshCheckResult{NoWshReason: "error starting connserver", WshError: err}
}
if needsInstall {
conn.Infof(ctx, "conn server not installed correctly (after install)\n")
err = fmt.Errorf("conn server not installed correctly (after install)")
return WshCheckResult{NoWshReason: "connserver not installed properly", WshError: err}
}
return WshCheckResult{WshEnabled: true, ClientVersion: clientVersion}
} else {
return WshCheckResult{WshEnabled: true, ClientVersion: clientVersion}
}
}
func (conn *SSHConn) persistWshInstalled(ctx context.Context, result WshCheckResult) {
conn.WshEnabled.Store(result.WshEnabled)
conn.SetWshError(result.WshError)
conn.WithLock(func() {
conn.NoWshReason = result.NoWshReason
conn.WshVersion = result.ClientVersion
})
config := wconfig.GetWatcher().GetFullConfig()
connSettings, ok := config.Connections[conn.GetName()]
if ok && connSettings.ConnWshEnabled != nil {
return
}
meta := make(map[string]any)
meta["conn:wshenabled"] = result.WshEnabled
err := wconfig.SetConnectionsConfigValue(conn.GetName(), meta)
if err != nil {
conn.Infof(ctx, "WARN could not write conn:wshenabled=%v to connections.json: %v\n", result.WshEnabled, err)
log.Printf("warning: error writing to connections file: %v", err)
}
// doesn't return an error since none of this is required for connection to work
}
// returns (connect-error)
func (conn *SSHConn) connectInternal(ctx context.Context, connFlags *wshrpc.ConnKeywords) error {
conn.Infof(ctx, "connectInternal %s\n", conn.GetName())
client, _, err := remote.ConnectToClient(ctx, conn.Opts, nil, 0, connFlags)
if err != nil {
conn.Infof(ctx, "ERROR ConnectToClient: %s\n", remote.SimpleMessageFromPossibleConnectionError(err))
log.Printf("error: failed to connect to client %s: %s\n", conn.GetName(), err)
return err
}
conn.WithLock(func() {
conn.Client = client
})
go conn.waitForDisconnect()
fmtAddr := knownhosts.Normalize(fmt.Sprintf("%s@%s", client.User(), client.RemoteAddr().String()))
conn.Infof(ctx, "normalized knownhosts address: %s\n", fmtAddr)
clientDisplayName := fmt.Sprintf("%s (%s)", conn.GetName(), fmtAddr)
wshResult := conn.tryEnableWsh(ctx, clientDisplayName)
if !wshResult.WshEnabled {
if wshResult.WshError != nil {
conn.Infof(ctx, "ERROR enabling wsh: %v\n", wshResult.WshError)
conn.Infof(ctx, "will connect with wsh disabled\n")
} else {
conn.Infof(ctx, "wsh not enabled: %s\n", wshResult.NoWshReason)
}
}
conn.persistWshInstalled(ctx, wshResult)
return nil
}
@ -597,6 +727,22 @@ func (conn *SSHConn) waitForDisconnect() {
})
}
func (conn *SSHConn) SetWshError(err error) {
conn.WithLock(func() {
if err == nil {
conn.WshError = ""
} else {
conn.WshError = err.Error()
}
})
}
func (conn *SSHConn) ClearWshError() {
conn.WithLock(func() {
conn.WshError = ""
})
}
func getConnInternal(opts *remote.SSHOpts) *SSHConn {
globalLock.Lock()
defer globalLock.Unlock()
@ -743,7 +889,7 @@ func GetConnectionsList() ([]string, error) {
func GetConnectionsFromInternalConfig() []string {
var internalNames []string
config := wconfig.ReadFullConfig()
config := wconfig.GetWatcher().GetFullConfig()
for internalName := range config.Connections {
if strings.HasPrefix(internalName, "wsl://") {
// don't add wsl conns to this list

View File

@ -20,7 +20,6 @@ import (
"github.com/wavetermdev/waveterm/pkg/util/shellutil"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"golang.org/x/crypto/ssh"
"golang.org/x/mod/semver"
)
var userHostRe = regexp.MustCompile(`^([a-zA-Z0-9][a-zA-Z0-9._@\\-]*@)?([a-zA-Z0-9][a-zA-Z0-9.-]*)(?::([0-9]+))?$`)
@ -55,32 +54,6 @@ func DetectShell(client *ssh.Client) (string, error) {
return fmt.Sprintf(`"%s"`, strings.TrimSpace(string(out))), nil
}
// returns a valid semver version string
func GetWshVersion(client *ssh.Client) (string, error) {
wshPath := GetWshPath(client)
session, err := client.NewSession()
if err != nil {
return "", err
}
out, err := session.Output(wshPath + " version")
if err != nil {
return "", err
}
// output is expected to be in the form of "wsh v0.10.4"
// should strip off the "wsh" prefix, and return a semver object
fields := strings.Fields(strings.TrimSpace(string(out)))
if len(fields) != 2 {
return "", fmt.Errorf("unexpected output from wsh version: %s", out)
}
wshVersion := strings.TrimSpace(fields[1])
if !semver.IsValid(wshVersion) {
return "", fmt.Errorf("invalid semver version: %s", wshVersion)
}
return wshVersion, nil
}
func GetWshPath(client *ssh.Client) string {
defaultPath := wavebase.RemoteFullWshBinPath
session, err := client.NewSession()

View File

@ -23,6 +23,7 @@ import (
"github.com/kevinburke/ssh_config"
"github.com/skeema/knownhosts"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/trimquotes"
"github.com/wavetermdev/waveterm/pkg/userinput"
@ -72,9 +73,19 @@ type ConnectionError struct {
func (ce ConnectionError) Error() string {
if ce.CurrentClient == nil {
return fmt.Sprintf("Connecting to %+#v, Error: %v", ce.NextOpts, ce.Err)
return fmt.Sprintf("Connecting to %s, Error: %v", ce.NextOpts, ce.Err)
}
return fmt.Sprintf("Connecting from %v to %+#v (jump number %d), Error: %v", ce.CurrentClient, ce.NextOpts, ce.JumpNum, ce.Err)
return fmt.Sprintf("Connecting from %v to %s (jump number %d), Error: %v", ce.CurrentClient, ce.NextOpts, ce.JumpNum, ce.Err)
}
func SimpleMessageFromPossibleConnectionError(err error) string {
if err == nil {
return ""
}
if ce, ok := err.(ConnectionError); ok {
return ce.Err.Error()
}
return err.Error()
}
// This exists to trick the ssh library into continuing to try
@ -142,6 +153,7 @@ func createPublicKeyCallback(connCtx context.Context, sshKeywords *wshrpc.ConnKe
return nil, ConnectionError{ConnectionDebugInfo: debugInfo, Err: fmt.Errorf("no identity files remaining")}
}
identityFile := (*identityFilesPtr)[0]
blocklogger.Infof(connCtx, "[conndebug] trying keyfile %q...\n", identityFile)
*identityFilesPtr = (*identityFilesPtr)[1:]
privateKey, ok := existingKeys[identityFile]
if !ok {
@ -208,6 +220,7 @@ func createPublicKeyCallback(connCtx context.Context, sshKeywords *wshrpc.ConnKe
func createInteractivePasswordCallbackPrompt(connCtx context.Context, remoteDisplayName string, debugInfo *ConnectionDebugInfo) func() (secret string, err error) {
return func() (secret string, err error) {
blocklogger.Infof(connCtx, "[conndebug] Password Authentication requested from connection %s...\n", remoteDisplayName)
ctx, cancelFn := context.WithTimeout(connCtx, 60*time.Second)
defer cancelFn()
queryText := fmt.Sprintf(
@ -222,8 +235,10 @@ func createInteractivePasswordCallbackPrompt(connCtx context.Context, remoteDisp
}
response, err := userinput.GetUserInput(ctx, request)
if err != nil {
blocklogger.Infof(connCtx, "[conndebug] ERROR Password Authentication failed: %v\n", SimpleMessageFromPossibleConnectionError(err))
return "", ConnectionError{ConnectionDebugInfo: debugInfo, Err: err}
}
blocklogger.Infof(connCtx, "[conndebug] got password from user, sending to ssh\n")
return response.Text, nil
}
}
@ -557,7 +572,10 @@ func createClientConfig(connCtx context.Context, sshKeywords *wshrpc.ConnKeyword
chosenUser := utilfn.SafeDeref(sshKeywords.SshUser)
chosenHostName := utilfn.SafeDeref(sshKeywords.SshHostName)
chosenPort := utilfn.SafeDeref(sshKeywords.SshPort)
remoteName := chosenUser + xknownhosts.Normalize(chosenHostName+":"+chosenPort)
remoteName := xknownhosts.Normalize(chosenHostName + ":" + chosenPort)
if chosenUser != "" {
remoteName = chosenUser + "@" + remoteName
}
var authSockSigners []ssh.Signer
var agentClient agent.ExtendedAgent
@ -619,24 +637,31 @@ func connectInternal(ctx context.Context, networkAddr string, clientConfig *ssh.
var err error
if currentClient == nil {
d := net.Dialer{Timeout: clientConfig.Timeout}
blocklogger.Infof(ctx, "[conndebug] ssh dial %s\n", networkAddr)
clientConn, err = d.DialContext(ctx, "tcp", networkAddr)
if err != nil {
blocklogger.Infof(ctx, "[conndebug] ERROR dial error: %v\n", err)
return nil, err
}
} else {
blocklogger.Infof(ctx, "[conndebug] ssh dial (from client) %s\n", networkAddr)
clientConn, err = currentClient.DialContext(ctx, "tcp", networkAddr)
if err != nil {
blocklogger.Infof(ctx, "[conndebug] ERROR dial error: %v\n", err)
return nil, err
}
}
c, chans, reqs, err := ssh.NewClientConn(clientConn, networkAddr, clientConfig)
if err != nil {
blocklogger.Infof(ctx, "[conndebug] ERROR ssh auth/negotiation: %s\n", SimpleMessageFromPossibleConnectionError(err))
return nil, err
}
blocklogger.Infof(ctx, "[conndebug] successful ssh connection to %s\n", networkAddr)
return ssh.NewClient(c, chans, reqs), nil
}
func ConnectToClient(connCtx context.Context, opts *SSHOpts, currentClient *ssh.Client, jumpNum int32, connFlags *wshrpc.ConnKeywords) (*ssh.Client, int32, error) {
blocklogger.Infof(connCtx, "[conndebug] ConnectToClient %s (jump:%d)...\n", opts.String(), jumpNum)
debugInfo := &ConnectionDebugInfo{
CurrentClient: currentClient,
NextOpts: opts,
@ -660,7 +685,7 @@ func ConnectToClient(connCtx context.Context, opts *SSHOpts, currentClient *ssh.
}
rawName := opts.String()
fullConfig := wconfig.ReadFullConfig()
fullConfig := wconfig.GetWatcher().GetFullConfig()
internalSshConfigKeywords, ok := fullConfig.Connections[rawName]
if !ok {
internalSshConfigKeywords = wshrpc.ConnKeywords{}

View File

@ -95,6 +95,7 @@ const (
MetaKey_TermVDomToolbarBlockId = "term:vdomtoolbarblockid"
MetaKey_TermTransparency = "term:transparency"
MetaKey_TermAllowBracketedPaste = "term:allowbracketedpaste"
MetaKey_TermConnDebug = "term:conndebug"
MetaKey_WebZoom = "web:zoom"
MetaKey_WebHideNav = "web:hidenav"

View File

@ -96,6 +96,7 @@ type MetaTSType struct {
TermVDomToolbarBlockId string `json:"term:vdomtoolbarblockid,omitempty"`
TermTransparency *float64 `json:"term:transparency,omitempty"` // default 0.5
TermAllowBracketedPaste *bool `json:"term:allowbracketedpaste,omitempty"`
TermConnDebug string `json:"term:conndebug,omitempty"` // null, info, debug
WebZoom float64 `json:"web:zoom,omitempty"`
WebHideNav *bool `json:"web:hidenav,omitempty"`

View File

@ -115,9 +115,9 @@ type SettingsType struct {
TelemetryClear bool `json:"telemetry:*,omitempty"`
TelemetryEnabled bool `json:"telemetry:enabled,omitempty"`
ConnClear bool `json:"conn:*,omitempty"`
ConnAskBeforeWshInstall bool `json:"conn:askbeforewshinstall,omitempty"`
ConnWshEnabled bool `json:"conn:wshenabled,omitempty"`
ConnClear bool `json:"conn:*,omitempty"`
ConnAskBeforeWshInstall *bool `json:"conn:askbeforewshinstall,omitempty"`
ConnWshEnabled bool `json:"conn:wshenabled,omitempty"`
}
type ConfigError struct {
@ -136,6 +136,13 @@ type FullConfigType struct {
ConfigErrors []ConfigError `json:"configerrors" configfile:"-"`
}
func DefaultBoolPtr(arg *bool, def bool) bool {
if arg == nil {
return def
}
return *arg
}
func goBackWS(barr []byte, offset int) int {
if offset >= len(barr) {
offset = offset - 1
@ -307,6 +314,8 @@ func readConfigPart(partName string, simpleMerge bool) (waveobj.MetaMapType, []C
return mergeMetaMap(rtn, homeConfigs, simpleMerge), allErrs
}
// this function should only be called by the wconfig code.
// in golang code, the best way to get the current config is via the watcher -- wconfig.GetWatcher().GetFullConfig()
func ReadFullConfig() FullConfigType {
var fullConfig FullConfigType
configRType := reflect.TypeOf(fullConfig)

View File

@ -50,7 +50,7 @@ func ConnDisconnectCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts)
}
// command "connensure", wshserver.ConnEnsureCommand
func ConnEnsureCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
func ConnEnsureCommand(w *wshutil.WshRpc, data wshrpc.ConnExtData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "connensure", data, opts)
return err
}
@ -62,7 +62,7 @@ func ConnListCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) ([]string, error)
}
// command "connreinstallwsh", wshserver.ConnReinstallWshCommand
func ConnReinstallWshCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
func ConnReinstallWshCommand(w *wshutil.WshRpc, data wshrpc.ConnExtData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "connreinstallwsh", data, opts)
return err
}
@ -73,6 +73,12 @@ func ConnStatusCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) ([]wshrpc.ConnSt
return resp, err
}
// command "controllerappendoutput", wshserver.ControllerAppendOutputCommand
func ControllerAppendOutputCommand(w *wshutil.WshRpc, data wshrpc.CommandControllerAppendOutputData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "controllerappendoutput", data, opts)
return err
}
// command "controllerinput", wshserver.ControllerInputCommand
func ControllerInputCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockInputData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "controllerinput", data, opts)

View File

@ -118,6 +118,7 @@ type WshRpcInterface interface {
ControllerInputCommand(ctx context.Context, data CommandBlockInputData) error
ControllerStopCommand(ctx context.Context, blockId string) error
ControllerResyncCommand(ctx context.Context, data CommandControllerResyncData) error
ControllerAppendOutputCommand(ctx context.Context, data CommandControllerAppendOutputData) error
ResolveIdsCommand(ctx context.Context, data CommandResolveIdsData) (CommandResolveIdsRtnData, error)
CreateBlockCommand(ctx context.Context, data CommandCreateBlockData) (waveobj.ORef, error)
CreateSubBlockCommand(ctx context.Context, data CommandCreateSubBlockData) (waveobj.ORef, error)
@ -154,8 +155,8 @@ type WshRpcInterface interface {
// connection functions
ConnStatusCommand(ctx context.Context) ([]ConnStatus, error)
WslStatusCommand(ctx context.Context) ([]ConnStatus, error)
ConnEnsureCommand(ctx context.Context, connName string) error
ConnReinstallWshCommand(ctx context.Context, connName string) error
ConnEnsureCommand(ctx context.Context, data ConnExtData) error
ConnReinstallWshCommand(ctx context.Context, data ConnExtData) error
ConnConnectCommand(ctx context.Context, connRequest ConnRequest) error
ConnDisconnectCommand(ctx context.Context, connName string) error
ConnListCommand(ctx context.Context) ([]string, error)
@ -311,6 +312,11 @@ type CommandControllerResyncData struct {
RtOpts *waveobj.RuntimeOpts `json:"rtopts,omitempty"`
}
type CommandControllerAppendOutputData struct {
BlockId string `json:"blockid"`
Data64 string `json:"data64"`
}
type CommandBlockInputData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
InputData64 string `json:"inputdata64,omitempty"`
@ -459,9 +465,10 @@ type CommandRemoteWriteFileData struct {
}
type ConnKeywords struct {
ConnWshEnabled *bool `json:"conn:wshenabled,omitempty"`
ConnAskBeforeWshInstall *bool `json:"conn:askbeforewshinstall,omitempty"`
ConnOverrideConfig bool `json:"conn:overrideconfig,omitempty"`
ConnWshEnabled *bool `json:"conn:wshenabled,omitempty"`
ConnAskBeforeWshInstall *bool `json:"conn:askbeforewshinstall,omitempty"`
ConnOverrideConfig bool `json:"conn:overrideconfig,omitempty"`
ConnWshPath string `json:"conn:wshpath,omitempty"`
DisplayHidden *bool `json:"display:hidden,omitempty"`
DisplayOrder float32 `json:"display:order,omitempty"`
@ -488,8 +495,9 @@ type ConnKeywords struct {
}
type ConnRequest struct {
Host string `json:"host"`
Keywords ConnKeywords `json:"keywords,omitempty"`
Host string `json:"host"`
Keywords ConnKeywords `json:"keywords,omitempty"`
LogBlockId string `json:"logblockid,omitempty"`
}
const (
@ -534,6 +542,8 @@ type ConnStatus struct {
ActiveConnNum int `json:"activeconnnum"`
Error string `json:"error,omitempty"`
WshError string `json:"wsherror,omitempty"`
NoWshReason string `json:"nowshreason,omitempty"`
WshVersion string `json:"wshversion,omitempty"`
}
type WebSelectorOpts struct {
@ -646,3 +656,8 @@ type ActivityUpdate struct {
WshCmds map[string]int `json:"wshcmds,omitempty"`
Conn map[string]int `json:"conn,omitempty"`
}
type ConnExtData struct {
ConnName string `json:"connname"`
LogBlockId string `json:"logblockid,omitempty"`
}

View File

@ -19,6 +19,7 @@ import (
"github.com/skratchdot/open-golang/open"
"github.com/wavetermdev/waveterm/pkg/blockcontroller"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote"
@ -237,6 +238,7 @@ func (ws *WshServer) ControllerStopCommand(ctx context.Context, blockId string)
}
func (ws *WshServer) ControllerResyncCommand(ctx context.Context, data wshrpc.CommandControllerResyncData) error {
ctx = termCtxWithLogBlockId(ctx, data.BlockId)
return blockcontroller.ResyncController(ctx, data.TabId, data.BlockId, data.RtOpts, data.ForceRestart)
}
@ -260,6 +262,19 @@ func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.Com
return bc.SendInput(inputUnion)
}
func (ws *WshServer) ControllerAppendOutputCommand(ctx context.Context, data wshrpc.CommandControllerAppendOutputData) error {
outputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(data.Data64)))
nw, err := base64.StdEncoding.Decode(outputBuf, []byte(data.Data64))
if err != nil {
return fmt.Errorf("error decoding output data: %w", err)
}
err = blockcontroller.HandleAppendBlockFile(data.BlockId, blockcontroller.BlockFile_Term, outputBuf[:nw])
if err != nil {
return fmt.Errorf("error appending to block file: %w", err)
}
return nil
}
func (ws *WshServer) FileCreateCommand(ctx context.Context, data wshrpc.CommandFileCreateData) error {
var fileOpts filestore.FileOptsType
if data.Opts != nil {
@ -596,12 +611,28 @@ func (ws *WshServer) WslStatusCommand(ctx context.Context) ([]wshrpc.ConnStatus,
return rtn, nil
}
func (ws *WshServer) ConnEnsureCommand(ctx context.Context, connName string) error {
if strings.HasPrefix(connName, "wsl://") {
distroName := strings.TrimPrefix(connName, "wsl://")
func termCtxWithLogBlockId(ctx context.Context, logBlockId string) context.Context {
if logBlockId == "" {
return ctx
}
block, err := wstore.DBMustGet[*waveobj.Block](ctx, logBlockId)
if err != nil {
return ctx
}
connDebug := block.Meta.GetString(waveobj.MetaKey_TermConnDebug, "")
if connDebug == "" {
return ctx
}
return blocklogger.ContextWithLogBlockId(ctx, logBlockId, connDebug == "debug")
}
func (ws *WshServer) ConnEnsureCommand(ctx context.Context, data wshrpc.ConnExtData) error {
ctx = termCtxWithLogBlockId(ctx, data.LogBlockId)
if strings.HasPrefix(data.ConnName, "wsl://") {
distroName := strings.TrimPrefix(data.ConnName, "wsl://")
return wsl.EnsureConnection(ctx, distroName)
}
return conncontroller.EnsureConnection(ctx, connName)
return conncontroller.EnsureConnection(ctx, data.ConnName)
}
func (ws *WshServer) ConnDisconnectCommand(ctx context.Context, connName string) error {
@ -625,6 +656,7 @@ func (ws *WshServer) ConnDisconnectCommand(ctx context.Context, connName string)
}
func (ws *WshServer) ConnConnectCommand(ctx context.Context, connRequest wshrpc.ConnRequest) error {
ctx = termCtxWithLogBlockId(ctx, connRequest.LogBlockId)
connName := connRequest.Host
if strings.HasPrefix(connName, "wsl://") {
distroName := strings.TrimPrefix(connName, "wsl://")
@ -645,7 +677,9 @@ func (ws *WshServer) ConnConnectCommand(ctx context.Context, connRequest wshrpc.
return conn.Connect(ctx, &connRequest.Keywords)
}
func (ws *WshServer) ConnReinstallWshCommand(ctx context.Context, connName string) error {
func (ws *WshServer) ConnReinstallWshCommand(ctx context.Context, data wshrpc.ConnExtData) error {
ctx = termCtxWithLogBlockId(ctx, data.LogBlockId)
connName := data.ConnName
if strings.HasPrefix(connName, "wsl://") {
distroName := strings.TrimPrefix(connName, "wsl://")
conn := wsl.GetWslConn(ctx, distroName, false)
@ -662,7 +696,7 @@ func (ws *WshServer) ConnReinstallWshCommand(ctx context.Context, connName strin
if conn == nil {
return fmt.Errorf("connection not found: %s", connName)
}
return conn.CheckAndInstallWsh(ctx, connName, &conncontroller.WshInstallOpts{Force: true, NoUserPrompt: true})
return conn.InstallWsh(ctx)
}
func (ws *WshServer) ConnListCommand(ctx context.Context) ([]string, error) {
@ -708,9 +742,7 @@ func (ws *WshServer) DismissWshFailCommand(ctx context.Context, connName string)
if conn == nil {
return fmt.Errorf("connection %s not found", connName)
}
conn.WithLock(func() {
conn.WshError = ""
})
conn.ClearWshError()
conn.FireConnChangeEvent()
return nil
}

View File

@ -5,8 +5,10 @@ package wshutil
import (
"bytes"
"context"
"fmt"
"io"
"time"
)
// special I/O wrappers for wshrpc
@ -55,6 +57,38 @@ func StreamToLines(input io.Reader, lineFn func([]byte)) error {
}
}
type LineOutput struct {
Line string
Error error
}
// starts a goroutine to drive the channel
func StreamToLinesChan(input io.Reader) chan LineOutput {
ch := make(chan LineOutput)
go func() {
defer close(ch)
err := StreamToLines(input, func(line []byte) {
ch <- LineOutput{Line: string(line)}
})
if err != nil && err != io.EOF {
ch <- LineOutput{Error: err}
}
}()
return ch
}
func ReadLineWithTimeout(ch chan LineOutput, timeout time.Duration) (string, error) {
select {
case output := <-ch:
if output.Error != nil {
return "", output.Error
}
return output.Line, nil
case <-time.After(timeout):
return "", context.DeadlineExceeded
}
}
func AdaptStreamToMsgCh(input io.Reader, output chan []byte) error {
return StreamToLines(input, func(line []byte) {
output <- line

View File

@ -446,8 +446,9 @@ func (conn *WslConn) connectInternal(ctx context.Context) error {
if err != nil {
return err
}
config := wconfig.ReadFullConfig()
installErr := conn.CheckAndInstallWsh(ctx, conn.GetName(), &WshInstallOpts{NoUserPrompt: !config.Settings.ConnAskBeforeWshInstall})
config := wconfig.GetWatcher().GetFullConfig()
wshAsk := wconfig.DefaultBoolPtr(config.Settings.ConnAskBeforeWshInstall, true)
installErr := conn.CheckAndInstallWsh(ctx, conn.GetName(), &WshInstallOpts{NoUserPrompt: !wshAsk})
if installErr != nil {
return fmt.Errorf("conncontroller %s wsh install error: %v", conn.GetName(), installErr)
}