diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 03da995ad..74b350277 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -18,6 +18,7 @@ import ( "time" "github.com/wavetermdev/thenextwave/pkg/authkey" + "github.com/wavetermdev/thenextwave/pkg/blockcontroller" "github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/service" "github.com/wavetermdev/thenextwave/pkg/telemetry" @@ -53,6 +54,7 @@ func doShutdown(reason string) { log.Printf("shutting down: %s\n", reason) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() + go blockcontroller.StopAllBlockControllers() shutdownActivityUpdate() sendTelemetryWrapper() // TODO deal with flush in progress @@ -61,7 +63,7 @@ func doShutdown(reason string) { if watcher != nil { watcher.Close() } - time.Sleep(200 * time.Millisecond) + time.Sleep(500 * time.Millisecond) os.Exit(0) }) } diff --git a/frontend/app/block/block.less b/frontend/app/block/block.less index 2e6962306..b4718cde4 100644 --- a/frontend/app/block/block.less +++ b/frontend/app/block/block.less @@ -276,6 +276,45 @@ } } + .connstatus-overlay { + position: absolute; + top: var(--header-height); + left: 2px; + right: 2px; + background-color: rgba(0, 0, 0, 0.3); + z-index: var(--zindex-block-mask-inner); + display: flex; + align-items: center; + justify-content: flex-start; + flex-direction: column; + overflow: hidden; + + .connstatus-mainelem { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + padding: 20px; + width: 100%; + background-color: rgba(60, 60, 60, 0.65); + backdrop-filter: blur(3px); + font: var(--base-font); + color: var(--secondary-text-color); + + .connstatus-error { + color: var(--error-color); + } + + .connstatus-actions { + margin-top: 10px; + display: flex; + gap: 5px; + flex-direction: row; + flex-wrap: wrap; + } + } + } + .block-mask { position: absolute; top: 0; @@ -288,12 +327,12 @@ border-radius: calc(var(--block-border-radius) + 2px); z-index: var(--zindex-block-mask-inner); - &.is-layoutmode { + &.show-block-mask { user-select: none; pointer-events: auto; } - &.is-layoutmode .block-mask-inner { + &.show-block-mask .block-mask-inner { margin-top: var(--header-height); // TODO fix this magic background-color: rgba(0, 0, 0, 0.5); height: calc(100% - var(--header-height)); diff --git a/frontend/app/block/block.tsx b/frontend/app/block/block.tsx index 8491abbb7..02e8997c4 100644 --- a/frontend/app/block/block.tsx +++ b/frontend/app/block/block.tsx @@ -7,7 +7,12 @@ import { PreviewModel, PreviewView, makePreviewModel } from "@/app/view/preview/ import { ErrorBoundary } from "@/element/errorboundary"; import { CenteredDiv } from "@/element/quickelems"; import { NodeModel, useDebouncedNodeInnerRect } from "@/layout/index"; -import { counterInc, getViewModel, registerViewModel, unregisterViewModel } from "@/store/global"; +import { + counterInc, + getBlockComponentModel, + registerBlockComponentModel, + unregisterBlockComponentModel, +} from "@/store/global"; import * as WOS from "@/store/wos"; import { getElemAsStr } from "@/util/focusutil"; import * as util from "@/util/util"; @@ -251,14 +256,15 @@ const Block = React.memo((props: BlockProps) => { counterInc("render-Block"); counterInc("render-Block-" + props.nodeModel.blockId.substring(0, 8)); const [blockData, loading] = WOS.useWaveObjectValue(WOS.makeORef("block", props.nodeModel.blockId)); - let viewModel = getViewModel(props.nodeModel.blockId); + const bcm = getBlockComponentModel(props.nodeModel.blockId); + let viewModel = bcm?.viewModel; if (viewModel == null || viewModel.viewType != blockData?.meta?.view) { viewModel = makeViewModel(props.nodeModel.blockId, blockData?.meta?.view, props.nodeModel); - registerViewModel(props.nodeModel.blockId, viewModel); + registerBlockComponentModel(props.nodeModel.blockId, { viewModel }); } React.useEffect(() => { return () => { - unregisterViewModel(props.nodeModel.blockId); + unregisterBlockComponentModel(props.nodeModel.blockId); }; }, []); if (loading || util.isBlank(props.nodeModel.blockId) || blockData == null) { diff --git a/frontend/app/block/blockframe.tsx b/frontend/app/block/blockframe.tsx index 8469a97bd..9c45f2868 100644 --- a/frontend/app/block/blockframe.tsx +++ b/frontend/app/block/blockframe.tsx @@ -13,7 +13,15 @@ import { import { Button } from "@/app/element/button"; import { TypeAheadModal } from "@/app/modals/typeaheadmodal"; import { ContextMenuModel } from "@/app/store/contextmenu"; -import { atoms, globalStore, useBlockAtom, useSettingsKeyAtom, WOS } from "@/app/store/global"; +import { + atoms, + getBlockComponentModel, + getConnStatusAtom, + globalStore, + useBlockAtom, + useSettingsKeyAtom, + WOS, +} from "@/app/store/global"; import * as services from "@/app/store/services"; import { WshServer } from "@/app/store/wshserver"; import { MagnifyIcon } from "@/element/magnify"; @@ -61,14 +69,6 @@ function handleHeaderContextMenu( }, }, ]; - const blockController = blockData?.meta?.controller; - if (!util.isBlank(blockController)) { - menu.push({ type: "separator" }); - menu.push({ - label: "Restart Controller", - click: () => WshServer.ControllerRestartCommand({ blockid: blockData.oid }), - }); - } const extraItems = viewModel?.getSettingsMenuItems?.(); if (extraItems && extraItems.length > 0) menu.push({ type: "separator" }, ...extraItems); menu.push( @@ -256,13 +256,70 @@ function renderHeaderElements(headerTextUnion: HeaderElem[], preview: boolean): return headerTextElems; } -const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => { +const ConnStatusOverlay = React.memo( + ({ + nodeModel, + viewModel, + changeConnModalAtom, + }: { + nodeModel: NodeModel; + viewModel: ViewModel; + changeConnModalAtom: jotai.PrimitiveAtom; + }) => { + const [blockData] = WOS.useWaveObjectValue(WOS.makeORef("block", nodeModel.blockId)); + const [connModalOpen, setConnModalOpen] = jotai.useAtom(changeConnModalAtom); + const connName = blockData.meta?.connection; + const connStatus = jotai.useAtomValue(getConnStatusAtom(connName)); + const isLayoutMode = jotai.useAtomValue(atoms.controlShiftDelayAtom); + const handleTryReconnect = React.useCallback(() => { + const prtn = WshServer.ConnConnectCommand(connName, { timeout: 60000 }); + prtn.catch((e) => console.log("error reconnecting", connName, e)); + }, [connName]); + const handleSwitchConnection = React.useCallback(() => { + setConnModalOpen(true); + }, [setConnModalOpen]); + if (isLayoutMode || connStatus.status == "connected" || connModalOpen) { + return null; + } + let statusText = `Disconnected from "${connName}"`; + let showReconnect = true; + if (connStatus.status == "connecting") { + statusText = `Connecting to "${connName}"...`; + showReconnect = false; + } + return ( +
+
+
{statusText}
+ {!util.isBlank(connStatus.error) ? ( +
error: {connStatus.error}
+ ) : null} + {showReconnect ? ( +
+ + +
+ ) : null} +
+
+ ); + } +); + +const BlockMask = React.memo(({ nodeModel }: { nodeModel: NodeModel }) => { const isFocused = jotai.useAtomValue(nodeModel.isFocused); const blockNum = jotai.useAtomValue(nodeModel.blockNum); const isLayoutMode = jotai.useAtomValue(atoms.controlShiftDelayAtom); const [blockData] = WOS.useWaveObjectValue(WOS.makeORef("block", nodeModel.blockId)); - const style: React.CSSProperties = {}; + let showBlockMask = false; + if (!isFocused && blockData?.meta?.["frame:bordercolor"]) { style.borderColor = blockData.meta["frame:bordercolor"]; } @@ -271,6 +328,7 @@ const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => { } let innerElem = null; if (isLayoutMode) { + showBlockMask = true; innerElem = (
{blockNum}
@@ -278,11 +336,11 @@ const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => { ); } return ( -
+
{innerElem}
); -}; +}); const BlockFrame_Default_Component = (props: BlockFrameProps) => { const { nodeModel, viewModel, blockModel, preview, numBlocksInTab, children } = props; @@ -290,10 +348,42 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => { const isFocused = jotai.useAtomValue(nodeModel.isFocused); const viewIconUnion = util.useAtomValueSafe(viewModel.viewIcon) ?? blockViewToIcon(blockData?.meta?.view); const customBg = util.useAtomValueSafe(viewModel.blockBg); + const manageConnection = util.useAtomValueSafe(viewModel.manageConnection); const changeConnModalAtom = useBlockAtom(nodeModel.blockId, "changeConn", () => { return jotai.atom(false); }) as jotai.PrimitiveAtom; const connBtnRef = React.useRef(); + React.useEffect(() => { + if (!manageConnection) { + return; + } + const bcm = getBlockComponentModel(nodeModel.blockId); + if (bcm != null) { + bcm.openSwitchConnection = () => { + globalStore.set(changeConnModalAtom, true); + }; + } + return () => { + const bcm = getBlockComponentModel(nodeModel.blockId); + if (bcm != null) { + bcm.openSwitchConnection = null; + } + }; + }, [manageConnection]); + React.useEffect(() => { + // on mount, if manageConnection, call ConnEnsure + if (!manageConnection || blockData == null || preview) { + return; + } + const connName = blockData?.meta?.connection; + if (!util.isBlank(connName)) { + console.log("ensure conn", nodeModel.blockId, connName); + WshServer.ConnEnsureCommand(connName, { timeout: 60000 }).catch((e) => { + console.log("error ensuring connection", nodeModel.blockId, connName, e); + }); + } + }, [manageConnection, blockData]); + const viewIconElem = getViewIconElem(viewIconUnion, blockData); const innerStyle: React.CSSProperties = {}; if (!preview && customBg?.bg != null) { @@ -319,6 +409,7 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => { ref={blockModel?.blockRef} > +
{preview ? previewElem : children} @@ -359,6 +450,12 @@ const ChangeConnectionBlockModal = React.memo( const isNodeFocused = jotai.useAtomValue(nodeModel.isFocused); const changeConnection = React.useCallback( async (connName: string) => { + if (connName == "") { + connName = null; + } + if (connName == blockData?.meta?.connection) { + return; + } const oldCwd = blockData?.meta?.file ?? ""; let newCwd: string; if (oldCwd == "") { @@ -370,10 +467,14 @@ const ChangeConnectionBlockModal = React.memo( oref: WOS.makeORef("block", blockId), meta: { connection: connName, file: newCwd }, }); - await services.BlockService.EnsureConnection(blockId).catch((e) => console.log(e)); - await WshServer.ControllerRestartCommand({ blockid: blockId }); + const tabId = globalStore.get(atoms.activeTabId); + try { + await WshServer.ConnEnsureCommand(connName, { timeout: 60000 }); + } catch (e) { + console.log("error connecting", blockId, connName, e); + } }, - [blockId] + [blockId, blockData] ); const handleTypeAheadKeyDown = React.useCallback( (waveEvent: WaveKeyboardEvent): boolean => { diff --git a/frontend/app/block/blockutil.tsx b/frontend/app/block/blockutil.tsx index 5c2981c4d..33ba28736 100644 --- a/frontend/app/block/blockutil.tsx +++ b/frontend/app/block/blockutil.tsx @@ -160,6 +160,7 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string } const [blockData] = WOS.useWaveObjectValue(WOS.makeORef("block", blockId)); const hasController = !util.isBlank(blockData?.meta?.controller); const [controllerStatus, setControllerStatus] = React.useState(null); + const [gotInitialStatus, setGotInitialStatus] = React.useState(false); const connection = blockData?.meta?.connection ?? "local"; const connStatusAtom = getConnStatusAtom(connection); const connStatus = jotai.useAtomValue(connStatusAtom); @@ -169,6 +170,7 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string } } const initialRTStatus = services.BlockService.GetControllerStatus(blockId); initialRTStatus.then((rts) => { + setGotInitialStatus(true); setControllerStatus(rts); }); const unsubFn = waveEventSubscribe("controllerstatus", makeORef("block", blockId), (event) => { @@ -179,25 +181,19 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string } unsubFn(); }; }, [hasController]); - if (!hasController) { + if (!hasController || !gotInitialStatus) { return null; } - if ( - controllerStatus == null || - (controllerStatus?.status == "running" && controllerStatus?.shellprocstatus == "running") - ) { + if (controllerStatus?.shellprocstatus == "running") { return null; } if (connStatus?.status != "connected") { return null; } const controllerStatusElem = ( - +
+ +
); return controllerStatusElem; }); @@ -206,7 +202,7 @@ export const ConnectionButton = React.memo( React.forwardRef( ({ connection, changeConnModalAtom }: ConnectionButtonProps, ref) => { const [connModalOpen, setConnModalOpen] = jotai.useAtom(changeConnModalAtom); - const isLocal = util.isBlank(connection) || connection == "local"; + const isLocal = util.isBlank(connection); const connStatusAtom = getConnStatusAtom(connection); const connStatus = jotai.useAtomValue(connStatusAtom); let showDisconnectedSlash = false; diff --git a/frontend/app/store/global.ts b/frontend/app/store/global.ts index 5f0e52206..4bc1c98ef 100644 --- a/frontend/app/store/global.ts +++ b/frontend/app/store/global.ts @@ -23,7 +23,7 @@ let PLATFORM: NodeJS.Platform = "darwin"; const globalStore = jotai.createStore(); let atoms: GlobalAtomsType; let globalEnvironment: "electron" | "renderer"; -const blockViewModelMap = new Map(); +const blockComponentModelMap = new Map(); const Counters = new Map(); const ConnStatusMap = new Map>(); @@ -526,16 +526,16 @@ async function openLink(uri: string, forceOpenInternally = false) { } } -function registerViewModel(blockId: string, viewModel: ViewModel) { - blockViewModelMap.set(blockId, viewModel); +function registerBlockComponentModel(blockId: string, bcm: BlockComponentModel) { + blockComponentModelMap.set(blockId, bcm); } -function unregisterViewModel(blockId: string) { - blockViewModelMap.delete(blockId); +function unregisterBlockComponentModel(blockId: string) { + blockComponentModelMap.delete(blockId); } -function getViewModel(blockId: string): ViewModel { - return blockViewModelMap.get(blockId); +function getBlockComponentModel(blockId: string): BlockComponentModel { + return blockComponentModelMap.get(blockId); } function refocusNode(blockId: string) { @@ -548,8 +548,8 @@ function refocusNode(blockId: string) { return; } layoutModel.focusNode(layoutNodeId.id); - const viewModel = getViewModel(blockId); - const ok = viewModel?.giveFocus?.(); + const bcm = getBlockComponentModel(blockId); + const ok = bcm?.viewModel?.giveFocus?.(); if (!ok) { const inputElem = document.getElementById(`${blockId}-dummy-focus`); inputElem?.focus(); @@ -604,14 +604,26 @@ function subscribeToConnEvents() { function getConnStatusAtom(conn: string): jotai.PrimitiveAtom { let rtn = ConnStatusMap.get(conn); if (rtn == null) { - const connStatus: ConnStatus = { - connection: conn, - connected: false, - error: null, - status: "disconnected", - hasconnected: false, - }; - rtn = jotai.atom(connStatus); + if (util.isBlank(conn)) { + // create a fake "local" status atom that's always connected + const connStatus: ConnStatus = { + connection: conn, + connected: true, + error: null, + status: "connected", + hasconnected: true, + }; + rtn = jotai.atom(connStatus); + } else { + const connStatus: ConnStatus = { + connection: conn, + connected: false, + error: null, + status: "disconnected", + hasconnected: false, + }; + rtn = jotai.atom(connStatus); + } ConnStatusMap.set(conn, rtn); } return rtn; @@ -625,13 +637,13 @@ export { createBlock, fetchWaveFile, getApi, + getBlockComponentModel, getConnStatusAtom, getEventORefSubject, getEventSubject, getFileSubject, getObjectId, getUserName, - getViewModel, globalStore, globalWS, initGlobal, @@ -641,12 +653,12 @@ export { openLink, PLATFORM, refocusNode, - registerViewModel, + registerBlockComponentModel, sendWSCommand, setNodeFocus, setPlatform, subscribeToConnEvents, - unregisterViewModel, + unregisterBlockComponentModel, useBlockAtom, useBlockCache, useBlockDataLoaded, diff --git a/frontend/app/store/keymodel.ts b/frontend/app/store/keymodel.ts index 9d96fdaa4..0aafbe174 100644 --- a/frontend/app/store/keymodel.ts +++ b/frontend/app/store/keymodel.ts @@ -1,7 +1,7 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -import { atoms, createBlock, getApi, getViewModel, globalStore, refocusNode, WOS } from "@/app/store/global"; +import { atoms, createBlock, getApi, getBlockComponentModel, globalStore, refocusNode, WOS } from "@/app/store/global"; import * as services from "@/app/store/services"; import { deleteLayoutModelForTab, @@ -160,7 +160,14 @@ function appHandleKeyDown(waveEvent: WaveKeyboardEvent): boolean { const focusedNode = globalStore.get(layoutModel.focusedNode); const blockId = focusedNode?.data?.blockId; if (blockId != null && shouldDispatchToBlock(waveEvent)) { - const viewModel = getViewModel(blockId); + const bcm = getBlockComponentModel(blockId); + if (bcm.openSwitchConnection != null) { + if (keyutil.checkKeyPressed(waveEvent, "Cmd:g")) { + bcm.openSwitchConnection(); + return true; + } + } + const viewModel = bcm?.viewModel; if (viewModel?.keyDownHandler) { const handledByBlock = viewModel.keyDownHandler(waveEvent); if (handledByBlock) { diff --git a/frontend/app/store/services.ts b/frontend/app/store/services.ts index 915db790c..78e07d166 100644 --- a/frontend/app/store/services.ts +++ b/frontend/app/store/services.ts @@ -7,9 +7,6 @@ import * as WOS from "./wos"; // blockservice.BlockService (block) class BlockServiceType { - EnsureConnection(arg2: string): Promise { - return WOS.callBackendService("block", "EnsureConnection", Array.from(arguments)) - } GetControllerStatus(arg2: string): Promise { return WOS.callBackendService("block", "GetControllerStatus", Array.from(arguments)) } diff --git a/frontend/app/store/wshserver.ts b/frontend/app/store/wshserver.ts index d42375a78..1c418ce87 100644 --- a/frontend/app/store/wshserver.ts +++ b/frontend/app/store/wshserver.ts @@ -47,9 +47,14 @@ class WshServerType { return WOS.wshServerRpcHelper_call("controllerinput", data, opts); } - // command "controllerrestart" [call] - ControllerRestartCommand(data: CommandBlockRestartData, opts?: RpcOpts): Promise { - return WOS.wshServerRpcHelper_call("controllerrestart", data, opts); + // command "controllerresync" [call] + ControllerResyncCommand(data: CommandControllerResyncData, opts?: RpcOpts): Promise { + return WOS.wshServerRpcHelper_call("controllerresync", data, opts); + } + + // command "controllerstop" [call] + ControllerStopCommand(data: string, opts?: RpcOpts): Promise { + return WOS.wshServerRpcHelper_call("controllerstop", data, opts); } // command "createblock" [call] diff --git a/frontend/app/view/cpuplot/cpuplot.tsx b/frontend/app/view/cpuplot/cpuplot.tsx index 97289620b..95302e44e 100644 --- a/frontend/app/view/cpuplot/cpuplot.tsx +++ b/frontend/app/view/cpuplot/cpuplot.tsx @@ -3,7 +3,7 @@ import { useHeight } from "@/app/hook/useHeight"; import { useWidth } from "@/app/hook/useWidth"; -import { globalStore, waveEventSubscribe, WOS } from "@/store/global"; +import { getConnStatusAtom, globalStore, waveEventSubscribe, WOS } from "@/store/global"; import { WshServer } from "@/store/wshserver"; import * as util from "@/util/util"; import * as Plot from "@observablehq/plot"; @@ -61,6 +61,7 @@ class CpuPlotViewModel { metrics: jotai.Atom; connection: jotai.Atom; manageConnection: jotai.Atom; + connStatus: jotai.Atom; constructor(blockId: string) { this.viewType = "cpuplot"; @@ -122,6 +123,12 @@ class CpuPlotViewModel { }); this.dataAtom = jotai.atom(this.getDefaultData()); this.loadInitialData(); + this.connStatus = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const connName = blockData?.meta?.connection; + const connAtom = getConnStatusAtom(connName); + return get(connAtom); + }); } async loadInitialData() { @@ -165,18 +172,24 @@ function makeCpuPlotViewModel(blockId: string): CpuPlotViewModel { const plotColors = ["#58C142", "#FFC107", "#FF5722", "#2196F3", "#9C27B0", "#00BCD4", "#FFEB3B", "#795548"]; -function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) { - const containerRef = React.useRef(); - const plotData = jotai.useAtomValue(model.dataAtom); - const addPlotData = jotai.useSetAtom(model.addDataAtom); - const parentHeight = useHeight(containerRef); - const parentWidth = useWidth(containerRef); - const yvals = jotai.useAtomValue(model.metrics); +type CpuPlotViewProps = { + blockId: string; + model: CpuPlotViewModel; +}; + +function CpuPlotView({ model, blockId }: CpuPlotViewProps) { const connName = jotai.useAtomValue(model.connection); const lastConnName = React.useRef(connName); + const connStatus = jotai.useAtomValue(model.connStatus); + const addPlotData = jotai.useSetAtom(model.addDataAtom); + const loading = jotai.useAtomValue(model.loadingAtom); React.useEffect(() => { + if (connStatus?.status != "connected") { + return; + } if (lastConnName.current !== connName) { + lastConnName.current = connName; model.loadInitialData(); } const unsubFn = waveEventSubscribe("sysinfo", connName, (event: WaveEvent) => { @@ -191,6 +204,22 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) { unsubFn(); }; }, [connName]); + React.useEffect(() => {}, [connName]); + if (connStatus?.status != "connected") { + return null; + } + if (loading) { + return null; + } + return ; +} + +const CpuPlotViewInner = React.memo(({ model }: CpuPlotViewProps) => { + const containerRef = React.useRef(); + const plotData = jotai.useAtomValue(model.dataAtom); + const parentHeight = useHeight(containerRef); + const parentWidth = useWidth(containerRef); + const yvals = jotai.useAtomValue(model.metrics); React.useEffect(() => { const marks: Plot.Markish[] = []; @@ -254,6 +283,6 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) { }, [plotData, parentHeight, parentWidth]); return
; -} +}); export { CpuPlotView, CpuPlotViewModel, makeCpuPlotViewModel }; diff --git a/frontend/app/view/preview/preview.tsx b/frontend/app/view/preview/preview.tsx index 8f1048ed1..6b73f3484 100644 --- a/frontend/app/view/preview/preview.tsx +++ b/frontend/app/view/preview/preview.tsx @@ -7,7 +7,7 @@ import { tryReinjectKey } from "@/app/store/keymodel"; import { WshServer } from "@/app/store/wshserver"; import { Markdown } from "@/element/markdown"; import { NodeModel } from "@/layout/index"; -import { createBlock, globalStore, refocusNode } from "@/store/global"; +import { createBlock, getConnStatusAtom, globalStore, refocusNode } from "@/store/global"; import * as services from "@/store/services"; import * as WOS from "@/store/wos"; import { getWebServerEndpoint } from "@/util/endpoints"; @@ -98,6 +98,7 @@ export class PreviewModel implements ViewModel { specializedView: jotai.Atom>; loadableSpecializedView: jotai.Atom>; manageConnection: jotai.Atom; + connStatus: jotai.Atom; metaFilePath: jotai.Atom; statFilePath: jotai.Atom>; @@ -146,10 +147,15 @@ export class PreviewModel implements ViewModel { this.monacoRef = createRef(); this.viewIcon = jotai.atom((get) => { const blockData = get(this.blockAtom); - const mimeTypeLoadable = get(this.fileMimeTypeLoadable); if (blockData?.meta?.icon) { return blockData.meta.icon; } + const connStatus = get(this.connStatus); + if (connStatus?.status != "connected") { + return null; + } + const fileName = get(this.metaFilePath); + const mimeTypeLoadable = get(this.fileMimeTypeLoadable); const mimeType = util.jotaiLoadableValue(mimeTypeLoadable, ""); if (mimeType == "directory") { return { @@ -189,9 +195,19 @@ export class PreviewModel implements ViewModel { }); this.viewName = jotai.atom("Preview"); this.viewText = jotai.atom((get) => { + let headerPath = get(this.metaFilePath); + const connStatus = get(this.connStatus); + if (connStatus?.status != "connected") { + return [ + { + elemtype: "text", + text: headerPath, + className: "preview-filename", + }, + ]; + } const loadableSV = get(this.loadableSpecializedView); const isCeView = loadableSV.state == "hasData" && loadableSV.data.specializedView == "codeedit"; - let headerPath = get(this.metaFilePath); const loadableFileInfo = get(this.loadableFileInfo); if (loadableFileInfo.state == "hasData") { headerPath = loadableFileInfo.data?.path; @@ -248,6 +264,10 @@ export class PreviewModel implements ViewModel { ] as HeaderElem[]; }); this.preIconButton = jotai.atom((get) => { + const connStatus = get(this.connStatus); + if (connStatus?.status != "connected") { + return null; + } const mimeType = util.jotaiLoadableValue(get(this.fileMimeTypeLoadable), ""); if (mimeType == "directory") { return null; @@ -259,6 +279,10 @@ export class PreviewModel implements ViewModel { }; }); this.endIconButtons = jotai.atom((get) => { + const connStatus = get(this.connStatus); + if (connStatus?.status != "connected") { + return null; + } const mimeType = util.jotaiLoadableValue(get(this.fileMimeTypeLoadable), ""); const loadableSV = get(this.loadableSpecializedView); const isCeView = loadableSV.state == "hasData" && loadableSV.data.specializedView == "codeedit"; @@ -356,6 +380,12 @@ export class PreviewModel implements ViewModel { this.loadableSpecializedView = loadable(this.specializedView); this.canPreview = jotai.atom(false); this.loadableFileInfo = loadable(this.statFile); + this.connStatus = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const connName = blockData?.meta?.connection; + const connAtom = getConnStatusAtom(connName); + return get(connAtom); + }); } markdownShowTocToggle() { @@ -831,6 +861,10 @@ function PreviewView({ contentRef: React.RefObject; model: PreviewModel; }) { + const connStatus = jotai.useAtomValue(model.connStatus); + if (connStatus?.status != "connected") { + return null; + } return ( <> diff --git a/frontend/app/view/term/term.tsx b/frontend/app/view/term/term.tsx index 0c1d424a3..3e15890a8 100644 --- a/frontend/app/view/term/term.tsx +++ b/frontend/app/view/term/term.tsx @@ -3,7 +3,7 @@ import { WshServer } from "@/app/store/wshserver"; import { VDomView } from "@/app/view/term/vdom"; -import { WOS, atoms, getEventORefSubject, globalStore, useSettingsPrefixAtom } from "@/store/global"; +import { WOS, atoms, getConnStatusAtom, getEventORefSubject, globalStore, useSettingsPrefixAtom } from "@/store/global"; import * as services from "@/store/services"; import * as keyutil from "@/util/keyutil"; import * as util from "@/util/util"; @@ -108,6 +108,7 @@ class TermViewModel { viewName: jotai.Atom; blockBg: jotai.Atom; manageConnection: jotai.Atom; + connStatus: jotai.Atom; constructor(blockId: string) { this.viewType = "term"; @@ -142,10 +143,12 @@ class TermViewModel { } return null; }); - } - - resetConnection() { - WshServer.ControllerRestartCommand({ blockid: this.blockId }); + this.connStatus = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const connName = blockData?.meta?.connection; + const connAtom = getConnStatusAtom(connName); + return get(connAtom); + }); } giveFocus(): boolean { @@ -172,21 +175,39 @@ class TermViewModel { const fullConfig = globalStore.get(atoms.fullConfigAtom); const termThemes = fullConfig?.termthemes ?? {}; const termThemeKeys = Object.keys(termThemes); + termThemeKeys.sort((a, b) => { return termThemes[a]["display:order"] - termThemes[b]["display:order"]; }); + const fullMenu: ContextMenuItem[] = []; const submenu: ContextMenuItem[] = termThemeKeys.map((themeName) => { return { label: termThemes[themeName]["display:name"] ?? themeName, click: () => this.setTerminalTheme(themeName), }; }); - return [ - { - label: "Themes", - submenu: submenu, + fullMenu.push({ + label: "Themes", + submenu: submenu, + }); + fullMenu.push({ type: "separator" }); + fullMenu.push({ + label: "Force Restart Controller", + click: () => { + const termsize = { + rows: this.termRef.current?.terminal?.rows, + cols: this.termRef.current?.terminal?.cols, + }; + const prtn = WshServer.ControllerResyncCommand({ + tabid: globalStore.get(atoms.activeTabId), + blockid: this.blockId, + forcerestart: true, + rtopts: { termsize: termsize }, + }); + prtn.catch((e) => console.log("error controller resync (force restart)", e)); }, - ]; + }); + return fullMenu; } } @@ -199,6 +220,28 @@ interface TerminalViewProps { model: TermViewModel; } +const TermResyncHandler = React.memo(({ blockId, model }: TerminalViewProps) => { + const connStatus = jotai.useAtomValue(model.connStatus); + const [lastConnStatus, setLastConnStatus] = React.useState(connStatus); + + React.useEffect(() => { + if (!model.termRef.current?.hasResized) { + return; + } + const isConnected = connStatus?.status == "connected"; + const wasConnected = lastConnStatus?.status == "connected"; + const curConnName = connStatus?.connection; + const lastConnName = lastConnStatus?.connection; + if (isConnected == wasConnected && curConnName == lastConnName) { + return; + } + model.termRef.current?.resyncController("resync handler"); + setLastConnStatus(connStatus); + }, [connStatus]); + + return null; +}); + const TerminalView = ({ blockId, model }: TerminalViewProps) => { const viewRef = React.createRef(); const connectElemRef = React.useRef(null); @@ -257,7 +300,9 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { } if (shellProcStatusRef.current != "running" && keyutil.checkKeyPressed(waveEvent, "Enter")) { // restart - WshServer.ControllerRestartCommand({ blockid: blockId }); + const tabId = globalStore.get(atoms.activeTabId); + const prtn = WshServer.ControllerResyncCommand({ tabid: tabId, blockid: blockId }); + prtn.catch((e) => console.log("error controller resync (enter)", blockId, e)); return false; } return true; @@ -352,6 +397,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => { return (
+
diff --git a/frontend/app/view/term/termwrap.ts b/frontend/app/view/term/termwrap.ts index f5f4c7fba..8be0f2de1 100644 --- a/frontend/app/view/term/termwrap.ts +++ b/frontend/app/view/term/termwrap.ts @@ -2,7 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 import { WshServer } from "@/app/store/wshserver"; -import { PLATFORM, WOS, fetchWaveFile, getFileSubject, openLink, sendWSCommand } from "@/store/global"; +import { + PLATFORM, + WOS, + atoms, + fetchWaveFile, + getFileSubject, + globalStore, + openLink, + sendWSCommand, +} from "@/store/global"; import * as services from "@/store/services"; import * as util from "@/util/util"; import { base64ToArray, fireAndForget } from "@/util/util"; @@ -11,9 +20,12 @@ import { WebLinksAddon } from "@xterm/addon-web-links"; import { WebglAddon } from "@xterm/addon-webgl"; import * as TermTypes from "@xterm/xterm"; import { Terminal } from "@xterm/xterm"; +import debug from "debug"; import { debounce } from "throttle-debounce"; import { FitAddon } from "./fitaddon"; +const dlog = debug("wave:termwrap"); + const TermFileName = "term"; const TermCacheFileName = "cache:term:full"; @@ -49,6 +61,7 @@ export class TermWrap { heldData: Uint8Array[]; handleResize_debounced: () => void; isRunning: boolean; + hasResized: boolean; constructor( blockId: string, @@ -60,13 +73,13 @@ export class TermWrap { this.blockId = blockId; this.ptyOffset = 0; this.dataBytesProcessed = 0; + this.hasResized = false; this.terminal = new Terminal(options); this.fitAddon = new FitAddon(); this.fitAddon.noScrollbar = PLATFORM == "darwin"; this.serializeAddon = new SerializeAddon(); this.terminal.loadAddon(this.fitAddon); this.terminal.loadAddon(this.serializeAddon); - this.terminal.loadAddon( new WebLinksAddon((e, uri) => { e.preventDefault(); @@ -208,18 +221,35 @@ export class TermWrap { } } + async resyncController(reason: string) { + dlog("resync controller", this.blockId, reason); + const tabId = globalStore.get(atoms.activeTabId); + const rtOpts: RuntimeOpts = { termsize: { rows: this.terminal.rows, cols: this.terminal.cols } }; + try { + await WshServer.ControllerResyncCommand({ tabid: tabId, blockid: this.blockId, rtopts: rtOpts }); + } catch (e) { + console.log(`error controller resync (${reason})`, this.blockId, e); + } + } + handleResize() { const oldRows = this.terminal.rows; const oldCols = this.terminal.cols; this.fitAddon.fit(); if (oldRows !== this.terminal.rows || oldCols !== this.terminal.cols) { + const termSize: TermSize = { rows: this.terminal.rows, cols: this.terminal.cols }; const wsCommand: SetBlockTermSizeWSCommand = { wscommand: "setblocktermsize", blockid: this.blockId, - termsize: { rows: this.terminal.rows, cols: this.terminal.cols }, + termsize: termSize, }; sendWSCommand(wsCommand); } + dlog("resize", `${this.terminal.rows}x${this.terminal.cols}`, `${oldRows}x${oldCols}`, this.hasResized); + if (!this.hasResized) { + this.hasResized = true; + this.resyncController("initial resize"); + } } processAndCacheData() { diff --git a/frontend/types/custom.d.ts b/frontend/types/custom.d.ts index bbde56bef..efc58280e 100644 --- a/frontend/types/custom.d.ts +++ b/frontend/types/custom.d.ts @@ -240,6 +240,11 @@ declare global { version: string; buildTime: number; } + + type BlockComponentModel = { + openSwitchConnection?: () => void; + viewModel: ViewModel; + }; } export {}; diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 324ec0cfe..745a29ab2 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -15,8 +15,8 @@ declare global { // blockcontroller.BlockControllerRuntimeStatus type BlockControllerRuntimeStatus = { blockid: string; - status: string; shellprocstatus?: string; + shellprocconnname?: string; }; // waveobj.BlockDef @@ -58,17 +58,20 @@ declare global { termsize?: TermSize; }; - // wshrpc.CommandBlockRestartData - type CommandBlockRestartData = { - blockid: string; - }; - // wshrpc.CommandBlockSetViewData type CommandBlockSetViewData = { blockid: string; view: string; }; + // wshrpc.CommandControllerResyncData + type CommandControllerResyncData = { + forcerestart?: boolean; + tabid: string; + blockid: string; + rtopts?: RuntimeOpts; + }; + // wshrpc.CommandCreateBlockData type CommandCreateBlockData = { tabid: string; diff --git a/frontend/wave.ts b/frontend/wave.ts index 007fc5e50..efbcdc4a0 100644 --- a/frontend/wave.ts +++ b/frontend/wave.ts @@ -72,7 +72,10 @@ document.addEventListener("DOMContentLoaded", async () => { const fullConfig = await services.FileService.GetFullConfig(); console.log("fullconfig", fullConfig); globalStore.set(atoms.fullConfigAtom, fullConfig); - services.ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait + const prtn = services.ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait + prtn.catch((e) => { + console.log("error on initial SetActiveTab", e); + }); const reactElem = React.createElement(App, null, null); const elem = document.getElementById("main"); const root = createRoot(elem); diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 1b116ee53..8156f8317 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "encoding/base64" - "encoding/json" "fmt" "io" "io/fs" @@ -39,7 +38,6 @@ const ( ) const ( - Status_Init = "init" Status_Running = "running" Status_Done = "done" ) @@ -66,18 +64,16 @@ type BlockController struct { TabId string BlockId string BlockDef *waveobj.BlockDef - Status string CreatedHtmlFile bool ShellProc *shellexec.ShellProc ShellInputCh chan *BlockInputUnion ShellProcStatus string - StopCh chan bool } type BlockControllerRuntimeStatus struct { - BlockId string `json:"blockid"` - Status string `json:"status"` - ShellProcStatus string `json:"shellprocstatus,omitempty"` + BlockId string `json:"blockid"` + ShellProcStatus string `json:"shellprocstatus,omitempty"` + ShellProcConnName string `json:"shellprocconnname,omitempty"` } func (bc *BlockController) WithLock(f func()) { @@ -90,25 +86,14 @@ func (bc *BlockController) GetRuntimeStatus() *BlockControllerRuntimeStatus { var rtn BlockControllerRuntimeStatus bc.WithLock(func() { rtn.BlockId = bc.BlockId - rtn.Status = bc.Status rtn.ShellProcStatus = bc.ShellProcStatus + if bc.ShellProc != nil { + rtn.ShellProcConnName = bc.ShellProc.ConnName + } }) return &rtn } -func jsonDeepCopy(val map[string]any) (map[string]any, error) { - barr, err := json.Marshal(val) - if err != nil { - return nil, err - } - var rtn map[string]any - err = json.Unmarshal(barr, &rtn) - if err != nil { - return nil, err - } - return rtn, nil -} - func (bc *BlockController) getShellProc() *shellexec.ShellProc { bc.Lock.Lock() defer bc.Lock.Unlock() @@ -136,6 +121,7 @@ func (bc *BlockController) UpdateControllerAndSendUpdate(updateFn func() bool) { Event: wshrpc.Event_ControllerStatus, Scopes: []string{ waveobj.MakeORef(waveobj.OType_Tab, bc.TabId).String(), + waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(), }, Data: rtStatus, } @@ -228,19 +214,11 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj bc.resetTerminalState() } err = nil - if bc.getShellProc() != nil { + bcInitStatus := bc.GetRuntimeStatus() + if bcInitStatus.ShellProcStatus == Status_Running { return nil } - var shellProcErr error - bc.WithLock(func() { - if bc.ShellProc != nil { - shellProcErr = fmt.Errorf("shell process already running") - return - } - }) - if shellProcErr != nil { - return shellProcErr - } + // TODO better sync here (don't let two starts happen at the same times) remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "") var cmdStr string cmdOpts := shellexec.CommandOptsType{ @@ -288,10 +266,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj if err != nil { return err } - conn := conncontroller.GetConn(credentialCtx, opts, true) + conn := conncontroller.GetConn(credentialCtx, opts, false) connStatus := conn.DeriveConnStatus() - if connStatus.Error != "" { - return fmt.Errorf("error connecting to remote: %s", connStatus.Error) + if connStatus.Status != conncontroller.Status_Connected { + return fmt.Errorf("not connected, cannot start shellproc") } if !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) { jwtStr, err := wshutil.MakeClientJWTToken(wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()}, conn.GetDomainSocketName()) @@ -300,7 +278,7 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj } cmdOpts.Env[wshutil.WaveJwtTokenVarName] = jwtStr } - shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn.Client) + shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn) if err != nil { return err } @@ -335,12 +313,14 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj // handles regular output from the pty (goes to the blockfile and xterm) defer func() { log.Printf("[shellproc] pty-read loop done\n") - - // needs synchronization bc.ShellProc.Close() - close(bc.ShellInputCh) - bc.ShellProc = nil - bc.ShellInputCh = nil + bc.WithLock(func() { + // so no other events are sent + bc.ShellInputCh = nil + }) + // to stop the inputCh loop + time.Sleep(100 * time.Millisecond) + close(shellInputCh) // don't use bc.ShellInputCh (it's nil) }() buf := make([]byte, 4096) for { @@ -365,6 +345,7 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj log.Printf("[shellproc] shellInputCh loop done\n") }() // handles input from the shellInputCh, sent to pty + // use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated) for ic := range shellInputCh { if len(ic.InputData) > 0 { bc.ShellProc.Cmd.Write(ic.InputData) @@ -446,23 +427,7 @@ func setTermSize(ctx context.Context, blockId string, termSize waveobj.TermSize) return nil } -func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any) { - defer func() { - bc.UpdateControllerAndSendUpdate(func() bool { - if bc.Status == Status_Running { - bc.Status = Status_Done - return true - } - return false - }) - globalLock.Lock() - defer globalLock.Unlock() - delete(blockControllerMap, bc.BlockId) - }() - bc.UpdateControllerAndSendUpdate(func() bool { - bc.Status = Status_Running - return true - }) +func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts) { controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "") if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { log.Printf("unknown controller %q\n", controllerName) @@ -477,51 +442,136 @@ func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any) { runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true) if runOnStart { go func() { - err := bc.DoRunShellCommand(&RunShellOpts{TermSize: getTermSize(bdata)}, bdata.Meta) + var termSize waveobj.TermSize + if rtOpts != nil { + termSize = rtOpts.TermSize + } else { + termSize = getTermSize(bdata) + } + err := bc.DoRunShellCommand(&RunShellOpts{TermSize: termSize}, bdata.Meta) if err != nil { log.Printf("error running shell: %v\n", err) } }() } - <-bc.StopCh } func (bc *BlockController) SendInput(inputUnion *BlockInputUnion) error { - if bc.ShellInputCh == nil { + var shellInputCh chan *BlockInputUnion + bc.WithLock(func() { + shellInputCh = bc.ShellInputCh + }) + if shellInputCh == nil { return fmt.Errorf("no shell input chan") } - bc.ShellInputCh <- inputUnion + shellInputCh <- inputUnion return nil } -func (bc *BlockController) RestartController() error { - // kill the command if it's running - bc.Lock.Lock() - if bc.ShellProc != nil { - bc.ShellProc.Close() - } - bc.Lock.Unlock() - - // wait for process to complete - if bc.ShellProc != nil { - doneCh := bc.ShellProc.DoneCh - <-doneCh - } - - // restart controller - bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), bc.BlockId) +func CheckConnStatus(blockId string) error { + bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), blockId) if err != nil { return fmt.Errorf("error getting block: %w", err) } - err = bc.DoRunShellCommand(&RunShellOpts{TermSize: getTermSize(bdata)}, bdata.Meta) + connName := bdata.Meta.GetString(waveobj.MetaKey_Connection, "") + if connName == "" { + return nil + } + opts, err := remote.ParseOpts(connName) if err != nil { - log.Printf("error running shell command: %v\n", err) + return fmt.Errorf("error parsing connection name: %w", err) + } + conn := conncontroller.GetConn(context.Background(), opts, false) + connStatus := conn.DeriveConnStatus() + if connStatus.Status != conncontroller.Status_Connected { + return fmt.Errorf("not connected: %s", connStatus.Status) } return nil } -func StartBlockController(ctx context.Context, tabId string, blockId string) error { - log.Printf("start blockcontroller %q\n", blockId) +func (bc *BlockController) StopShellProc(shouldWait bool) { + bc.Lock.Lock() + defer bc.Lock.Unlock() + if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done { + return + } + bc.ShellProc.Close() + if shouldWait { + doneCh := bc.ShellProc.DoneCh + <-doneCh + } +} + +func getOrCreateBlockController(tabId string, blockId string, controllerName string) *BlockController { + var createdController bool + var bc *BlockController + defer func() { + if !createdController || bc == nil { + return + } + bc.UpdateControllerAndSendUpdate(func() bool { + return true + }) + }() + globalLock.Lock() + defer globalLock.Unlock() + bc = blockControllerMap[blockId] + if bc == nil { + bc = &BlockController{ + Lock: &sync.Mutex{}, + ControllerType: controllerName, + TabId: tabId, + BlockId: blockId, + ShellProcStatus: Status_Done, + } + blockControllerMap[blockId] = bc + createdController = true + } + return bc +} + +func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error { + if tabId == "" || blockId == "" { + return fmt.Errorf("invalid tabId or blockId passed to ResyncController") + } + blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) + if err != nil { + return fmt.Errorf("error getting block: %w", err) + } + connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") + controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "") + curBc := GetBlockController(blockId) + if controllerName == "" { + if curBc != nil { + StopBlockController(blockId) + } + return nil + } + // check if conn is different, if so, stop the current controller + if curBc != nil { + bcStatus := curBc.GetRuntimeStatus() + if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName { + StopBlockController(blockId) + } + } + // now if there is a conn, ensure it is connected + if connName != "" { + err = CheckConnStatus(blockId) + if err != nil { + return fmt.Errorf("cannot start shellproc: %w", err) + } + } + if curBc == nil { + return startBlockController(ctx, tabId, blockId, rtOpts) + } + bcStatus := curBc.GetRuntimeStatus() + if bcStatus.ShellProcStatus != Status_Running { + return startBlockController(ctx, tabId, blockId, rtOpts) + } + return nil +} + +func startBlockController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error { blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) if err != nil { return fmt.Errorf("error getting block: %w", err) @@ -534,23 +584,17 @@ func StartBlockController(ctx context.Context, tabId string, blockId string) err if controllerName != BlockController_Shell && controllerName != BlockController_Cmd { return fmt.Errorf("unknown controller %q", controllerName) } - globalLock.Lock() - defer globalLock.Unlock() - if _, ok := blockControllerMap[blockId]; ok { - // already running - return nil + connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") + log.Printf("start blockcontroller %s %q (%q)\n", blockId, controllerName, connName) + err = CheckConnStatus(blockId) + if err != nil { + return fmt.Errorf("cannot start shellproc: %w", err) } - bc := &BlockController{ - Lock: &sync.Mutex{}, - ControllerType: controllerName, - TabId: tabId, - BlockId: blockId, - Status: Status_Init, - ShellProcStatus: Status_Init, - StopCh: make(chan bool), + bc := getOrCreateBlockController(tabId, blockId, controllerName) + bcStatus := bc.GetRuntimeStatus() + if bcStatus.ShellProcStatus == Status_Done { + go bc.run(blockData, blockData.Meta, rtOpts) } - blockControllerMap[blockId] = bc - go bc.run(blockData, blockData.Meta) return nil } @@ -561,8 +605,32 @@ func StopBlockController(blockId string) { } if bc.getShellProc() != nil { bc.ShellProc.Close() + <-bc.ShellProc.DoneCh + bc.UpdateControllerAndSendUpdate(func() bool { + bc.ShellProcStatus = Status_Done + return true + }) + } + +} + +func getControllerList() []*BlockController { + globalLock.Lock() + defer globalLock.Unlock() + var rtn []*BlockController + for _, bc := range blockControllerMap { + rtn = append(rtn, bc) + } + return rtn +} + +func StopAllBlockControllers() { + clist := getControllerList() + for _, bc := range clist { + if bc.ShellProcStatus == Status_Running { + go StopBlockController(bc.BlockId) + } } - close(bc.StopCh) } func GetBlockController(blockId string) *BlockController { diff --git a/pkg/remote/conncontroller/conncontroller.go b/pkg/remote/conncontroller/conncontroller.go index 93271cf53..e5d7ac738 100644 --- a/pkg/remote/conncontroller/conncontroller.go +++ b/pkg/remote/conncontroller/conncontroller.go @@ -25,7 +25,6 @@ import ( "github.com/wavetermdev/thenextwave/pkg/util/shellutil" "github.com/wavetermdev/thenextwave/pkg/util/utilfn" "github.com/wavetermdev/thenextwave/pkg/wavebase" - "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wps" "github.com/wavetermdev/thenextwave/pkg/wshrpc" "github.com/wavetermdev/thenextwave/pkg/wshutil" @@ -40,6 +39,8 @@ const ( Status_Error = "error" ) +const DefaultConnectionTimeout = 60 * time.Second + var globalLock = &sync.Mutex{} var clientControllerMap = make(map[remote.SSHOpts]*SSHConn) @@ -163,7 +164,7 @@ func (conn *SSHConn) OpenDomainSocketListener() error { return fmt.Errorf("error generating random string: %w", err) } sockName := fmt.Sprintf("/tmp/waveterm-%s.sock", randStr) - log.Printf("remote domain socket %s %q\n", conn.GetName(), sockName) + log.Printf("remote domain socket %s %q\n", conn.GetName(), conn.GetDomainSocketName()) listener, err := client.ListenUnix(sockName) if err != nil { return fmt.Errorf("unable to request connection domain socket: %v", err) @@ -251,6 +252,13 @@ func (conn *SSHConn) StartConnServer() error { log.Printf("[conncontroller:%s] error reading output: %v\n", conn.GetName(), readErr) } }() + regCtx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + err = wshutil.DefaultRouter.WaitForRegister(regCtx, wshutil.MakeConnectionRouteId(rpcCtx.Conn)) + if err != nil { + return fmt.Errorf("timeout waiting for connserver to register") + } + time.Sleep(300 * time.Millisecond) // TODO remove this sleep (but we need to wait until connserver is "ready") return nil } @@ -373,6 +381,7 @@ func (conn *SSHConn) Connect(ctx context.Context) error { connectAllowed = true } }) + log.Printf("Connect %s\n", conn.GetName()) if !connectAllowed { return fmt.Errorf("cannot connect to %q when status is %q", conn.GetName(), conn.GetStatus()) } @@ -467,45 +476,31 @@ func GetConn(ctx context.Context, opts *remote.SSHOpts, shouldConnect bool) *SSH } // Convenience function for ensuring a connection is established -func EnsureConnection(ctx context.Context, blockData *waveobj.Block) error { - connectionName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "") - if connectionName == "" { +func EnsureConnection(ctx context.Context, connName string) error { + if connName == "" { return nil } - credentialCtx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Second) - defer cancelFunc() - - opts, err := remote.ParseOpts(connectionName) + connOpts, err := remote.ParseOpts(connName) if err != nil { - return err + return fmt.Errorf("error parsing connection name: %w", err) } - conn := GetConn(credentialCtx, opts, true) - statusChan := make(chan string, 1) - go func() { - // we need to wait for connected/disconnected/error - // to ensure the connection has been established before - // continuing in the original thread - for { - // GetStatus has a lock which makes this reasonable to loop over - status := conn.GetStatus() - if credentialCtx.Err() != nil { - // prevent infinite loop from context - statusChan <- Status_Error - return - } - if status == Status_Connected || status == Status_Disconnected || status == Status_Error { - statusChan <- status - return - } - } - }() - status := <-statusChan - if status == Status_Error { - return fmt.Errorf("connection error: %v", conn.Error) - } else if status == Status_Disconnected { - return fmt.Errorf("disconnected: %v", conn.Error) + conn := GetConn(ctx, connOpts, false) + if conn == nil { + return fmt.Errorf("connection not found: %s", connName) + } + connStatus := conn.DeriveConnStatus() + switch connStatus.Status { + case Status_Connected: + return nil + case Status_Connecting: + return conn.WaitForConnect(ctx) + case Status_Init, Status_Disconnected: + return conn.Connect(ctx) + case Status_Error: + return fmt.Errorf("connection error: %s", connStatus.Error) + default: + return fmt.Errorf("unknown connection status %q", connStatus.Status) } - return nil } func DisconnectClient(opts *remote.SSHOpts) error { diff --git a/pkg/remote/connutil.go b/pkg/remote/connutil.go index ab0d1d578..1cdf59ed4 100644 --- a/pkg/remote/connutil.go +++ b/pkg/remote/connutil.go @@ -292,9 +292,6 @@ func CpHostToRemote(client *ssh.Client, sourcePath string, destPath string) erro func InstallClientRcFiles(client *ssh.Client) error { path := GetWshPath(client) log.Printf("path to wsh searched is: %s", path) - log.Printf("in bytes is: %v", []byte(path)) - log.Printf("in bytes expected would be: %v", []byte("~/.waveterm/bin/wsh")) - session, err := client.NewSession() if err != nil { // this is a true error that should stop further progress diff --git a/pkg/service/blockservice/blockservice.go b/pkg/service/blockservice/blockservice.go index 0555e0c34..68163c380 100644 --- a/pkg/service/blockservice/blockservice.go +++ b/pkg/service/blockservice/blockservice.go @@ -11,7 +11,6 @@ import ( "github.com/wavetermdev/thenextwave/pkg/blockcontroller" "github.com/wavetermdev/thenextwave/pkg/filestore" - "github.com/wavetermdev/thenextwave/pkg/remote/conncontroller" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wshrpc" @@ -34,10 +33,7 @@ func (bs *BlockService) SendCommand_Meta() tsgenmeta.MethodMeta { func (bs *BlockService) GetControllerStatus(ctx context.Context, blockId string) (*blockcontroller.BlockControllerRuntimeStatus, error) { bc := blockcontroller.GetBlockController(blockId) if bc == nil { - return &blockcontroller.BlockControllerRuntimeStatus{ - BlockId: blockId, - Status: "stopped", - }, nil + return nil, nil } return bc.GetRuntimeStatus(), nil } @@ -84,11 +80,3 @@ func (bs *BlockService) SaveWaveAiData(ctx context.Context, blockId string, hist } return nil } - -func (bs *BlockService) EnsureConnection(ctx context.Context, blockId string) error { - block, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - return err - } - return conncontroller.EnsureConnection(ctx, block) -} diff --git a/pkg/service/objectservice/objectservice.go b/pkg/service/objectservice/objectservice.go index 365f623d7..e7bab8727 100644 --- a/pkg/service/objectservice/objectservice.go +++ b/pkg/service/objectservice/objectservice.go @@ -6,12 +6,9 @@ package objectservice import ( "context" "fmt" - "log" "strings" "time" - "github.com/wavetermdev/thenextwave/pkg/blockcontroller" - "github.com/wavetermdev/thenextwave/pkg/remote/conncontroller" "github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wcore" @@ -22,6 +19,7 @@ import ( type ObjectService struct{} const DefaultTimeout = 2 * time.Second +const ConnContextTimeout = 60 * time.Second func parseORef(oref string) (*waveobj.ORef, error) { fields := strings.Split(oref, ":") @@ -133,22 +131,6 @@ func (svc *ObjectService) SetActiveTab(uiContext waveobj.UIContext, tabId string if err != nil { return nil, fmt.Errorf("error getting tab: %w", err) } - for _, blockId := range tab.BlockIds { - blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) - if err != nil { - return nil, fmt.Errorf("error getting block: %w", err) - } - err = conncontroller.EnsureConnection(ctx, blockData) - if err != nil { - return nil, fmt.Errorf("unable to ensure connection: %v", err) - } - blockErr := blockcontroller.StartBlockController(ctx, tabId, blockId) - if blockErr != nil { - // we don't want to fail the set active tab operation if a block controller fails to start - log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr) - continue - } - } blockORefs := tab.GetBlockORefs() blocks, err := wstore.DBSelectORefs(ctx, blockORefs) if err != nil { diff --git a/pkg/shellexec/conninterface.go b/pkg/shellexec/conninterface.go index a2d5a7dd1..e601f8e1d 100644 --- a/pkg/shellexec/conninterface.go +++ b/pkg/shellexec/conninterface.go @@ -2,7 +2,9 @@ package shellexec import ( "io" + "os" "os/exec" + "time" "github.com/creack/pty" "golang.org/x/crypto/ssh" @@ -10,6 +12,7 @@ import ( type ConnInterface interface { Kill() + KillGraceful(time.Duration) Wait() error Start() error StdinPipe() (io.WriteCloser, error) @@ -32,6 +35,22 @@ func (cw CmdWrap) Wait() error { return cw.Cmd.Wait() } +func (cw CmdWrap) KillGraceful(timeout time.Duration) { + if cw.Cmd.Process == nil { + return + } + if cw.Cmd.ProcessState != nil && cw.Cmd.ProcessState.Exited() { + return + } + cw.Cmd.Process.Signal(os.Interrupt) + go func() { + time.Sleep(timeout) + if cw.Cmd.ProcessState == nil || !cw.Cmd.ProcessState.Exited() { + cw.Cmd.Process.Kill() // force kill if it is already not exited + } + }() +} + func (cw CmdWrap) Start() error { defer func() { for _, extraFile := range cw.Cmd.ExtraFiles { @@ -75,6 +94,10 @@ func (sw SessionWrap) Kill() { sw.Session.Close() } +func (sw SessionWrap) KillGraceful(timeout time.Duration) { + sw.Kill() +} + func (sw SessionWrap) Wait() error { return sw.Session.Wait() } diff --git a/pkg/shellexec/shellexec.go b/pkg/shellexec/shellexec.go index 056d98e66..f23050100 100644 --- a/pkg/shellexec/shellexec.go +++ b/pkg/shellexec/shellexec.go @@ -15,16 +15,19 @@ import ( "strings" "sync" "syscall" + "time" "github.com/creack/pty" "github.com/wavetermdev/thenextwave/pkg/remote" + "github.com/wavetermdev/thenextwave/pkg/remote/conncontroller" "github.com/wavetermdev/thenextwave/pkg/util/shellutil" "github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wshutil" - "golang.org/x/crypto/ssh" ) +const DefaultGracefulKillWait = 400 * time.Millisecond + type CommandOptsType struct { Interactive bool `json:"interactive,omitempty"` Login bool `json:"login,omitempty"` @@ -33,6 +36,7 @@ type CommandOptsType struct { } type ShellProc struct { + ConnName string Cmd ConnInterface CloseOnce *sync.Once DoneCh chan any // closed after proc.Wait() returns @@ -40,7 +44,7 @@ type ShellProc struct { } func (sp *ShellProc) Close() { - sp.Cmd.Kill() + sp.Cmd.KillGraceful(DefaultGracefulKillWait) go func() { waitErr := sp.Cmd.Wait() sp.SetWaitErrorAndSignalDone(waitErr) @@ -134,7 +138,8 @@ func (pp *PipePty) WriteString(s string) (n int, err error) { return pp.Write([]byte(s)) } -func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts CommandOptsType, client *ssh.Client) (*ShellProc, error) { +func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts CommandOptsType, conn *conncontroller.SSHConn) (*ShellProc, error) { + client := conn.GetClient() shellPath, err := remote.DetectShell(client) if err != nil { return nil, err @@ -244,7 +249,7 @@ func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts Comm pipePty.Close() return nil, err } - return &ShellProc{Cmd: sessionWrap, CloseOnce: &sync.Once{}, DoneCh: make(chan any)}, nil + return &ShellProc{Cmd: sessionWrap, ConnName: conn.GetName(), CloseOnce: &sync.Once{}, DoneCh: make(chan any)}, nil } func isZshShell(shellPath string) bool { diff --git a/pkg/wcore/wcore.go b/pkg/wcore/wcore.go index cb4d9e881..c817f49bb 100644 --- a/pkg/wcore/wcore.go +++ b/pkg/wcore/wcore.go @@ -11,7 +11,6 @@ import ( "github.com/google/uuid" "github.com/wavetermdev/thenextwave/pkg/blockcontroller" - "github.com/wavetermdev/thenextwave/pkg/remote/conncontroller" "github.com/wavetermdev/thenextwave/pkg/waveobj" "github.com/wavetermdev/thenextwave/pkg/wps" "github.com/wavetermdev/thenextwave/pkg/wshrpc" @@ -24,6 +23,7 @@ import ( // TODO bring Tx infra into wcore const DefaultTimeout = 2 * time.Second +const DefaultActivateBlockTimeout = 60 * time.Second func DeleteBlock(ctx context.Context, tabId string, blockId string) error { err := wstore.DeleteBlock(ctx, tabId, blockId) @@ -174,17 +174,5 @@ func CreateBlock(ctx context.Context, tabId string, blockDef *waveobj.BlockDef, if err != nil { return nil, fmt.Errorf("error creating block: %w", err) } - err = conncontroller.EnsureConnection(ctx, blockData) - if err != nil { - return nil, fmt.Errorf("unable to ensure connection: %v", err) - } - controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "") - if controllerName != "" { - err = blockcontroller.StartBlockController(ctx, tabId, blockData.OID) - if err != nil { - return nil, fmt.Errorf("error starting block controller: %w", err) - } - } - return blockData, nil } diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 8e66c51ca..fa3fc7e06 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -60,9 +60,15 @@ func ControllerInputCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockInputData return err } -// command "controllerrestart", wshserver.ControllerRestartCommand -func ControllerRestartCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockRestartData, opts *wshrpc.RpcOpts) error { - _, err := sendRpcRequestCallHelper[any](w, "controllerrestart", data, opts) +// command "controllerresync", wshserver.ControllerResyncCommand +func ControllerResyncCommand(w *wshutil.WshRpc, data wshrpc.CommandControllerResyncData, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "controllerresync", data, opts) + return err +} + +// command "controllerstop", wshserver.ControllerStopCommand +func ControllerStopCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "controllerstop", data, opts) return err } diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 88d9714f9..a6d3dd15e 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -41,6 +41,8 @@ const ( Command_SetView = "setview" Command_ControllerInput = "controllerinput" Command_ControllerRestart = "controllerrestart" + Command_ControllerStop = "controllerstop" + Command_ControllerResync = "controllerresync" Command_FileAppend = "fileappend" Command_FileAppendIJson = "fileappendijson" Command_ResolveIds = "resolveids" @@ -84,7 +86,8 @@ type WshRpcInterface interface { SetMetaCommand(ctx context.Context, data CommandSetMetaData) error SetViewCommand(ctx context.Context, data CommandBlockSetViewData) error ControllerInputCommand(ctx context.Context, data CommandBlockInputData) error - ControllerRestartCommand(ctx context.Context, data CommandBlockRestartData) error + ControllerStopCommand(ctx context.Context, blockId string) error + ControllerResyncCommand(ctx context.Context, data CommandControllerResyncData) error FileAppendCommand(ctx context.Context, data CommandFileData) error FileAppendIJsonCommand(ctx context.Context, data CommandAppendIJsonData) error ResolveIdsCommand(ctx context.Context, data CommandResolveIdsData) (CommandResolveIdsRtnData, error) @@ -217,8 +220,11 @@ type CommandBlockSetViewData struct { View string `json:"view"` } -type CommandBlockRestartData struct { - BlockId string `json:"blockid" wshcontext:"BlockId"` +type CommandControllerResyncData struct { + ForceRestart bool `json:"forcerestart,omitempty"` + TabId string `json:"tabid" wshcontext:"TabId"` + BlockId string `json:"blockid" wshcontext:"BlockId"` + RtOpts *waveobj.RuntimeOpts `json:"rtopts,omitempty"` } type CommandBlockInputData struct { diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index c9361be7a..e0a12005a 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -284,12 +284,20 @@ func (ws *WshServer) SetViewCommand(ctx context.Context, data wshrpc.CommandBloc return nil } -func (ws *WshServer) ControllerRestartCommand(ctx context.Context, data wshrpc.CommandBlockRestartData) error { - bc := blockcontroller.GetBlockController(data.BlockId) +func (ws *WshServer) ControllerStopCommand(ctx context.Context, blockId string) error { + bc := blockcontroller.GetBlockController(blockId) if bc == nil { - return fmt.Errorf("block controller not found for block %q", data.BlockId) + return nil } - return bc.RestartController() + bc.StopShellProc(true) + return nil +} + +func (ws *WshServer) ControllerResyncCommand(ctx context.Context, data wshrpc.CommandControllerResyncData) error { + if data.ForceRestart { + blockcontroller.StopBlockController(data.BlockId) + } + return blockcontroller.ResyncController(ctx, data.TabId, data.BlockId, data.RtOpts) } func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.CommandBlockInputData) error { @@ -476,27 +484,7 @@ func (ws *WshServer) ConnStatusCommand(ctx context.Context) ([]wshrpc.ConnStatus } func (ws *WshServer) ConnEnsureCommand(ctx context.Context, connName string) error { - connOpts, err := remote.ParseOpts(connName) - if err != nil { - return fmt.Errorf("error parsing connection name: %w", err) - } - conn := conncontroller.GetConn(ctx, connOpts, false) - if conn == nil { - return fmt.Errorf("connection not found: %s", connName) - } - connStatus := conn.DeriveConnStatus() - switch connStatus.Status { - case conncontroller.Status_Connected: - return nil - case conncontroller.Status_Connecting: - return conn.WaitForConnect(ctx) - case conncontroller.Status_Init, conncontroller.Status_Disconnected: - return conn.Connect(ctx) - case conncontroller.Status_Error: - return fmt.Errorf("connection error: %s", connStatus.Error) - default: - return fmt.Errorf("unknown connection status %q", connStatus.Status) - } + return conncontroller.EnsureConnection(ctx, connName) } func (ws *WshServer) ConnDisconnectCommand(ctx context.Context, connName string) error { diff --git a/pkg/wshutil/wshrouter.go b/pkg/wshutil/wshrouter.go index af5a5ef03..0d29d6776 100644 --- a/pkg/wshutil/wshrouter.go +++ b/pkg/wshutil/wshrouter.go @@ -4,11 +4,13 @@ package wshutil import ( + "context" "encoding/json" "errors" "fmt" "log" "sync" + "time" "github.com/wavetermdev/thenextwave/pkg/wps" "github.com/wavetermdev/thenextwave/pkg/wshrpc" @@ -237,6 +239,20 @@ func (router *WshRouter) runServer() { } } +func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error { + for { + if router.GetRpc(routeId) != nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(30 * time.Millisecond): + continue + } + } +} + // this will also consume the output channel of the abstract client func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) { if routeId == SysRoute {