connection handling / block controller handling (#326)

This commit is contained in:
Mike Sawka 2024-09-05 00:21:08 -07:00 committed by GitHub
parent b796ec9729
commit 3e0ca6b41e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 690 additions and 313 deletions

View File

@ -18,6 +18,7 @@ import (
"time"
"github.com/wavetermdev/thenextwave/pkg/authkey"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service"
"github.com/wavetermdev/thenextwave/pkg/telemetry"
@ -53,6 +54,7 @@ func doShutdown(reason string) {
log.Printf("shutting down: %s\n", reason)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
go blockcontroller.StopAllBlockControllers()
shutdownActivityUpdate()
sendTelemetryWrapper()
// TODO deal with flush in progress
@ -61,7 +63,7 @@ func doShutdown(reason string) {
if watcher != nil {
watcher.Close()
}
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
os.Exit(0)
})
}

View File

@ -276,6 +276,45 @@
}
}
.connstatus-overlay {
position: absolute;
top: var(--header-height);
left: 2px;
right: 2px;
background-color: rgba(0, 0, 0, 0.3);
z-index: var(--zindex-block-mask-inner);
display: flex;
align-items: center;
justify-content: flex-start;
flex-direction: column;
overflow: hidden;
.connstatus-mainelem {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 20px;
width: 100%;
background-color: rgba(60, 60, 60, 0.65);
backdrop-filter: blur(3px);
font: var(--base-font);
color: var(--secondary-text-color);
.connstatus-error {
color: var(--error-color);
}
.connstatus-actions {
margin-top: 10px;
display: flex;
gap: 5px;
flex-direction: row;
flex-wrap: wrap;
}
}
}
.block-mask {
position: absolute;
top: 0;
@ -288,12 +327,12 @@
border-radius: calc(var(--block-border-radius) + 2px);
z-index: var(--zindex-block-mask-inner);
&.is-layoutmode {
&.show-block-mask {
user-select: none;
pointer-events: auto;
}
&.is-layoutmode .block-mask-inner {
&.show-block-mask .block-mask-inner {
margin-top: var(--header-height); // TODO fix this magic
background-color: rgba(0, 0, 0, 0.5);
height: calc(100% - var(--header-height));

View File

@ -7,7 +7,12 @@ import { PreviewModel, PreviewView, makePreviewModel } from "@/app/view/preview/
import { ErrorBoundary } from "@/element/errorboundary";
import { CenteredDiv } from "@/element/quickelems";
import { NodeModel, useDebouncedNodeInnerRect } from "@/layout/index";
import { counterInc, getViewModel, registerViewModel, unregisterViewModel } from "@/store/global";
import {
counterInc,
getBlockComponentModel,
registerBlockComponentModel,
unregisterBlockComponentModel,
} from "@/store/global";
import * as WOS from "@/store/wos";
import { getElemAsStr } from "@/util/focusutil";
import * as util from "@/util/util";
@ -251,14 +256,15 @@ const Block = React.memo((props: BlockProps) => {
counterInc("render-Block");
counterInc("render-Block-" + props.nodeModel.blockId.substring(0, 8));
const [blockData, loading] = WOS.useWaveObjectValue<Block>(WOS.makeORef("block", props.nodeModel.blockId));
let viewModel = getViewModel(props.nodeModel.blockId);
const bcm = getBlockComponentModel(props.nodeModel.blockId);
let viewModel = bcm?.viewModel;
if (viewModel == null || viewModel.viewType != blockData?.meta?.view) {
viewModel = makeViewModel(props.nodeModel.blockId, blockData?.meta?.view, props.nodeModel);
registerViewModel(props.nodeModel.blockId, viewModel);
registerBlockComponentModel(props.nodeModel.blockId, { viewModel });
}
React.useEffect(() => {
return () => {
unregisterViewModel(props.nodeModel.blockId);
unregisterBlockComponentModel(props.nodeModel.blockId);
};
}, []);
if (loading || util.isBlank(props.nodeModel.blockId) || blockData == null) {

View File

@ -13,7 +13,15 @@ import {
import { Button } from "@/app/element/button";
import { TypeAheadModal } from "@/app/modals/typeaheadmodal";
import { ContextMenuModel } from "@/app/store/contextmenu";
import { atoms, globalStore, useBlockAtom, useSettingsKeyAtom, WOS } from "@/app/store/global";
import {
atoms,
getBlockComponentModel,
getConnStatusAtom,
globalStore,
useBlockAtom,
useSettingsKeyAtom,
WOS,
} from "@/app/store/global";
import * as services from "@/app/store/services";
import { WshServer } from "@/app/store/wshserver";
import { MagnifyIcon } from "@/element/magnify";
@ -61,14 +69,6 @@ function handleHeaderContextMenu(
},
},
];
const blockController = blockData?.meta?.controller;
if (!util.isBlank(blockController)) {
menu.push({ type: "separator" });
menu.push({
label: "Restart Controller",
click: () => WshServer.ControllerRestartCommand({ blockid: blockData.oid }),
});
}
const extraItems = viewModel?.getSettingsMenuItems?.();
if (extraItems && extraItems.length > 0) menu.push({ type: "separator" }, ...extraItems);
menu.push(
@ -256,13 +256,70 @@ function renderHeaderElements(headerTextUnion: HeaderElem[], preview: boolean):
return headerTextElems;
}
const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => {
const ConnStatusOverlay = React.memo(
({
nodeModel,
viewModel,
changeConnModalAtom,
}: {
nodeModel: NodeModel;
viewModel: ViewModel;
changeConnModalAtom: jotai.PrimitiveAtom<boolean>;
}) => {
const [blockData] = WOS.useWaveObjectValue<Block>(WOS.makeORef("block", nodeModel.blockId));
const [connModalOpen, setConnModalOpen] = jotai.useAtom(changeConnModalAtom);
const connName = blockData.meta?.connection;
const connStatus = jotai.useAtomValue(getConnStatusAtom(connName));
const isLayoutMode = jotai.useAtomValue(atoms.controlShiftDelayAtom);
const handleTryReconnect = React.useCallback(() => {
const prtn = WshServer.ConnConnectCommand(connName, { timeout: 60000 });
prtn.catch((e) => console.log("error reconnecting", connName, e));
}, [connName]);
const handleSwitchConnection = React.useCallback(() => {
setConnModalOpen(true);
}, [setConnModalOpen]);
if (isLayoutMode || connStatus.status == "connected" || connModalOpen) {
return null;
}
let statusText = `Disconnected from "${connName}"`;
let showReconnect = true;
if (connStatus.status == "connecting") {
statusText = `Connecting to "${connName}"...`;
showReconnect = false;
}
return (
<div className="connstatus-overlay">
<div className="connstatus-mainelem">
<div style={{ marginBottom: 5 }}>{statusText}</div>
{!util.isBlank(connStatus.error) ? (
<div className="connstatus-error">error: {connStatus.error}</div>
) : null}
{showReconnect ? (
<div className="connstatus-actions">
<Button className="secondary" onClick={handleTryReconnect}>
<i className="fa-sharp fa-solid fa-arrow-right-arrow-left" style={{ marginRight: 5 }} />
Reconnect Now
</Button>
<Button className="secondary" onClick={handleSwitchConnection}>
<i className="fa-sharp fa-solid fa-arrow-right-arrow-left" style={{ marginRight: 5 }} />
Switch Connection
</Button>
</div>
) : null}
</div>
</div>
);
}
);
const BlockMask = React.memo(({ nodeModel }: { nodeModel: NodeModel }) => {
const isFocused = jotai.useAtomValue(nodeModel.isFocused);
const blockNum = jotai.useAtomValue(nodeModel.blockNum);
const isLayoutMode = jotai.useAtomValue(atoms.controlShiftDelayAtom);
const [blockData] = WOS.useWaveObjectValue<Block>(WOS.makeORef("block", nodeModel.blockId));
const style: React.CSSProperties = {};
let showBlockMask = false;
if (!isFocused && blockData?.meta?.["frame:bordercolor"]) {
style.borderColor = blockData.meta["frame:bordercolor"];
}
@ -271,6 +328,7 @@ const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => {
}
let innerElem = null;
if (isLayoutMode) {
showBlockMask = true;
innerElem = (
<div className="block-mask-inner">
<div className="bignum">{blockNum}</div>
@ -278,11 +336,11 @@ const BlockMask = ({ nodeModel }: { nodeModel: NodeModel }) => {
);
}
return (
<div className={clsx("block-mask", { "is-layoutmode": isLayoutMode })} style={style}>
<div className={clsx("block-mask", { "show-block-mask": showBlockMask })} style={style}>
{innerElem}
</div>
);
};
});
const BlockFrame_Default_Component = (props: BlockFrameProps) => {
const { nodeModel, viewModel, blockModel, preview, numBlocksInTab, children } = props;
@ -290,10 +348,42 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => {
const isFocused = jotai.useAtomValue(nodeModel.isFocused);
const viewIconUnion = util.useAtomValueSafe(viewModel.viewIcon) ?? blockViewToIcon(blockData?.meta?.view);
const customBg = util.useAtomValueSafe(viewModel.blockBg);
const manageConnection = util.useAtomValueSafe(viewModel.manageConnection);
const changeConnModalAtom = useBlockAtom(nodeModel.blockId, "changeConn", () => {
return jotai.atom(false);
}) as jotai.PrimitiveAtom<boolean>;
const connBtnRef = React.useRef<HTMLDivElement>();
React.useEffect(() => {
if (!manageConnection) {
return;
}
const bcm = getBlockComponentModel(nodeModel.blockId);
if (bcm != null) {
bcm.openSwitchConnection = () => {
globalStore.set(changeConnModalAtom, true);
};
}
return () => {
const bcm = getBlockComponentModel(nodeModel.blockId);
if (bcm != null) {
bcm.openSwitchConnection = null;
}
};
}, [manageConnection]);
React.useEffect(() => {
// on mount, if manageConnection, call ConnEnsure
if (!manageConnection || blockData == null || preview) {
return;
}
const connName = blockData?.meta?.connection;
if (!util.isBlank(connName)) {
console.log("ensure conn", nodeModel.blockId, connName);
WshServer.ConnEnsureCommand(connName, { timeout: 60000 }).catch((e) => {
console.log("error ensuring connection", nodeModel.blockId, connName, e);
});
}
}, [manageConnection, blockData]);
const viewIconElem = getViewIconElem(viewIconUnion, blockData);
const innerStyle: React.CSSProperties = {};
if (!preview && customBg?.bg != null) {
@ -319,6 +409,7 @@ const BlockFrame_Default_Component = (props: BlockFrameProps) => {
ref={blockModel?.blockRef}
>
<BlockMask nodeModel={nodeModel} />
<ConnStatusOverlay nodeModel={nodeModel} viewModel={viewModel} changeConnModalAtom={changeConnModalAtom} />
<div className="block-frame-default-inner" style={innerStyle}>
<BlockFrame_Header {...props} connBtnRef={connBtnRef} changeConnModalAtom={changeConnModalAtom} />
{preview ? previewElem : children}
@ -359,6 +450,12 @@ const ChangeConnectionBlockModal = React.memo(
const isNodeFocused = jotai.useAtomValue(nodeModel.isFocused);
const changeConnection = React.useCallback(
async (connName: string) => {
if (connName == "") {
connName = null;
}
if (connName == blockData?.meta?.connection) {
return;
}
const oldCwd = blockData?.meta?.file ?? "";
let newCwd: string;
if (oldCwd == "") {
@ -370,10 +467,14 @@ const ChangeConnectionBlockModal = React.memo(
oref: WOS.makeORef("block", blockId),
meta: { connection: connName, file: newCwd },
});
await services.BlockService.EnsureConnection(blockId).catch((e) => console.log(e));
await WshServer.ControllerRestartCommand({ blockid: blockId });
const tabId = globalStore.get(atoms.activeTabId);
try {
await WshServer.ConnEnsureCommand(connName, { timeout: 60000 });
} catch (e) {
console.log("error connecting", blockId, connName, e);
}
},
[blockId]
[blockId, blockData]
);
const handleTypeAheadKeyDown = React.useCallback(
(waveEvent: WaveKeyboardEvent): boolean => {

View File

@ -160,6 +160,7 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string }
const [blockData] = WOS.useWaveObjectValue<Block>(WOS.makeORef("block", blockId));
const hasController = !util.isBlank(blockData?.meta?.controller);
const [controllerStatus, setControllerStatus] = React.useState<BlockControllerRuntimeStatus>(null);
const [gotInitialStatus, setGotInitialStatus] = React.useState(false);
const connection = blockData?.meta?.connection ?? "local";
const connStatusAtom = getConnStatusAtom(connection);
const connStatus = jotai.useAtomValue(connStatusAtom);
@ -169,6 +170,7 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string }
}
const initialRTStatus = services.BlockService.GetControllerStatus(blockId);
initialRTStatus.then((rts) => {
setGotInitialStatus(true);
setControllerStatus(rts);
});
const unsubFn = waveEventSubscribe("controllerstatus", makeORef("block", blockId), (event) => {
@ -179,25 +181,19 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string }
unsubFn();
};
}, [hasController]);
if (!hasController) {
if (!hasController || !gotInitialStatus) {
return null;
}
if (
controllerStatus == null ||
(controllerStatus?.status == "running" && controllerStatus?.shellprocstatus == "running")
) {
if (controllerStatus?.shellprocstatus == "running") {
return null;
}
if (connStatus?.status != "connected") {
return null;
}
const controllerStatusElem = (
<i
key="controller-status"
className="fa-sharp fa-solid fa-triangle-exclamation"
title="Controller Is Not Running"
style={{ color: "var(--error-color)" }}
/>
<div className="iconbutton disabled" key="controller-status">
<i className="fa-sharp fa-solid fa-triangle-exclamation" title="Shell Process Is Not Running" />
</div>
);
return controllerStatusElem;
});
@ -206,7 +202,7 @@ export const ConnectionButton = React.memo(
React.forwardRef<HTMLDivElement, ConnectionButtonProps>(
({ connection, changeConnModalAtom }: ConnectionButtonProps, ref) => {
const [connModalOpen, setConnModalOpen] = jotai.useAtom(changeConnModalAtom);
const isLocal = util.isBlank(connection) || connection == "local";
const isLocal = util.isBlank(connection);
const connStatusAtom = getConnStatusAtom(connection);
const connStatus = jotai.useAtomValue(connStatusAtom);
let showDisconnectedSlash = false;

View File

@ -23,7 +23,7 @@ let PLATFORM: NodeJS.Platform = "darwin";
const globalStore = jotai.createStore();
let atoms: GlobalAtomsType;
let globalEnvironment: "electron" | "renderer";
const blockViewModelMap = new Map<string, ViewModel>();
const blockComponentModelMap = new Map<string, BlockComponentModel>();
const Counters = new Map<string, number>();
const ConnStatusMap = new Map<string, jotai.PrimitiveAtom<ConnStatus>>();
@ -526,16 +526,16 @@ async function openLink(uri: string, forceOpenInternally = false) {
}
}
function registerViewModel(blockId: string, viewModel: ViewModel) {
blockViewModelMap.set(blockId, viewModel);
function registerBlockComponentModel(blockId: string, bcm: BlockComponentModel) {
blockComponentModelMap.set(blockId, bcm);
}
function unregisterViewModel(blockId: string) {
blockViewModelMap.delete(blockId);
function unregisterBlockComponentModel(blockId: string) {
blockComponentModelMap.delete(blockId);
}
function getViewModel(blockId: string): ViewModel {
return blockViewModelMap.get(blockId);
function getBlockComponentModel(blockId: string): BlockComponentModel {
return blockComponentModelMap.get(blockId);
}
function refocusNode(blockId: string) {
@ -548,8 +548,8 @@ function refocusNode(blockId: string) {
return;
}
layoutModel.focusNode(layoutNodeId.id);
const viewModel = getViewModel(blockId);
const ok = viewModel?.giveFocus?.();
const bcm = getBlockComponentModel(blockId);
const ok = bcm?.viewModel?.giveFocus?.();
if (!ok) {
const inputElem = document.getElementById(`${blockId}-dummy-focus`);
inputElem?.focus();
@ -604,14 +604,26 @@ function subscribeToConnEvents() {
function getConnStatusAtom(conn: string): jotai.PrimitiveAtom<ConnStatus> {
let rtn = ConnStatusMap.get(conn);
if (rtn == null) {
const connStatus: ConnStatus = {
connection: conn,
connected: false,
error: null,
status: "disconnected",
hasconnected: false,
};
rtn = jotai.atom(connStatus);
if (util.isBlank(conn)) {
// create a fake "local" status atom that's always connected
const connStatus: ConnStatus = {
connection: conn,
connected: true,
error: null,
status: "connected",
hasconnected: true,
};
rtn = jotai.atom(connStatus);
} else {
const connStatus: ConnStatus = {
connection: conn,
connected: false,
error: null,
status: "disconnected",
hasconnected: false,
};
rtn = jotai.atom(connStatus);
}
ConnStatusMap.set(conn, rtn);
}
return rtn;
@ -625,13 +637,13 @@ export {
createBlock,
fetchWaveFile,
getApi,
getBlockComponentModel,
getConnStatusAtom,
getEventORefSubject,
getEventSubject,
getFileSubject,
getObjectId,
getUserName,
getViewModel,
globalStore,
globalWS,
initGlobal,
@ -641,12 +653,12 @@ export {
openLink,
PLATFORM,
refocusNode,
registerViewModel,
registerBlockComponentModel,
sendWSCommand,
setNodeFocus,
setPlatform,
subscribeToConnEvents,
unregisterViewModel,
unregisterBlockComponentModel,
useBlockAtom,
useBlockCache,
useBlockDataLoaded,

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { atoms, createBlock, getApi, getViewModel, globalStore, refocusNode, WOS } from "@/app/store/global";
import { atoms, createBlock, getApi, getBlockComponentModel, globalStore, refocusNode, WOS } from "@/app/store/global";
import * as services from "@/app/store/services";
import {
deleteLayoutModelForTab,
@ -160,7 +160,14 @@ function appHandleKeyDown(waveEvent: WaveKeyboardEvent): boolean {
const focusedNode = globalStore.get(layoutModel.focusedNode);
const blockId = focusedNode?.data?.blockId;
if (blockId != null && shouldDispatchToBlock(waveEvent)) {
const viewModel = getViewModel(blockId);
const bcm = getBlockComponentModel(blockId);
if (bcm.openSwitchConnection != null) {
if (keyutil.checkKeyPressed(waveEvent, "Cmd:g")) {
bcm.openSwitchConnection();
return true;
}
}
const viewModel = bcm?.viewModel;
if (viewModel?.keyDownHandler) {
const handledByBlock = viewModel.keyDownHandler(waveEvent);
if (handledByBlock) {

View File

@ -7,9 +7,6 @@ import * as WOS from "./wos";
// blockservice.BlockService (block)
class BlockServiceType {
EnsureConnection(arg2: string): Promise<void> {
return WOS.callBackendService("block", "EnsureConnection", Array.from(arguments))
}
GetControllerStatus(arg2: string): Promise<BlockControllerRuntimeStatus> {
return WOS.callBackendService("block", "GetControllerStatus", Array.from(arguments))
}

View File

@ -47,9 +47,14 @@ class WshServerType {
return WOS.wshServerRpcHelper_call("controllerinput", data, opts);
}
// command "controllerrestart" [call]
ControllerRestartCommand(data: CommandBlockRestartData, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("controllerrestart", data, opts);
// command "controllerresync" [call]
ControllerResyncCommand(data: CommandControllerResyncData, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("controllerresync", data, opts);
}
// command "controllerstop" [call]
ControllerStopCommand(data: string, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("controllerstop", data, opts);
}
// command "createblock" [call]

View File

@ -3,7 +3,7 @@
import { useHeight } from "@/app/hook/useHeight";
import { useWidth } from "@/app/hook/useWidth";
import { globalStore, waveEventSubscribe, WOS } from "@/store/global";
import { getConnStatusAtom, globalStore, waveEventSubscribe, WOS } from "@/store/global";
import { WshServer } from "@/store/wshserver";
import * as util from "@/util/util";
import * as Plot from "@observablehq/plot";
@ -61,6 +61,7 @@ class CpuPlotViewModel {
metrics: jotai.Atom<string[]>;
connection: jotai.Atom<string>;
manageConnection: jotai.Atom<boolean>;
connStatus: jotai.Atom<ConnStatus>;
constructor(blockId: string) {
this.viewType = "cpuplot";
@ -122,6 +123,12 @@ class CpuPlotViewModel {
});
this.dataAtom = jotai.atom(this.getDefaultData());
this.loadInitialData();
this.connStatus = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const connName = blockData?.meta?.connection;
const connAtom = getConnStatusAtom(connName);
return get(connAtom);
});
}
async loadInitialData() {
@ -165,18 +172,24 @@ function makeCpuPlotViewModel(blockId: string): CpuPlotViewModel {
const plotColors = ["#58C142", "#FFC107", "#FF5722", "#2196F3", "#9C27B0", "#00BCD4", "#FFEB3B", "#795548"];
function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) {
const containerRef = React.useRef<HTMLInputElement>();
const plotData = jotai.useAtomValue(model.dataAtom);
const addPlotData = jotai.useSetAtom(model.addDataAtom);
const parentHeight = useHeight(containerRef);
const parentWidth = useWidth(containerRef);
const yvals = jotai.useAtomValue(model.metrics);
type CpuPlotViewProps = {
blockId: string;
model: CpuPlotViewModel;
};
function CpuPlotView({ model, blockId }: CpuPlotViewProps) {
const connName = jotai.useAtomValue(model.connection);
const lastConnName = React.useRef(connName);
const connStatus = jotai.useAtomValue(model.connStatus);
const addPlotData = jotai.useSetAtom(model.addDataAtom);
const loading = jotai.useAtomValue(model.loadingAtom);
React.useEffect(() => {
if (connStatus?.status != "connected") {
return;
}
if (lastConnName.current !== connName) {
lastConnName.current = connName;
model.loadInitialData();
}
const unsubFn = waveEventSubscribe("sysinfo", connName, (event: WaveEvent) => {
@ -191,6 +204,22 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) {
unsubFn();
};
}, [connName]);
React.useEffect(() => {}, [connName]);
if (connStatus?.status != "connected") {
return null;
}
if (loading) {
return null;
}
return <CpuPlotViewInner key={connStatus?.connection ?? "local"} blockId={blockId} model={model} />;
}
const CpuPlotViewInner = React.memo(({ model }: CpuPlotViewProps) => {
const containerRef = React.useRef<HTMLInputElement>();
const plotData = jotai.useAtomValue(model.dataAtom);
const parentHeight = useHeight(containerRef);
const parentWidth = useWidth(containerRef);
const yvals = jotai.useAtomValue(model.metrics);
React.useEffect(() => {
const marks: Plot.Markish[] = [];
@ -254,6 +283,6 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) {
}, [plotData, parentHeight, parentWidth]);
return <div className="plot-view" ref={containerRef} />;
}
});
export { CpuPlotView, CpuPlotViewModel, makeCpuPlotViewModel };

View File

@ -7,7 +7,7 @@ import { tryReinjectKey } from "@/app/store/keymodel";
import { WshServer } from "@/app/store/wshserver";
import { Markdown } from "@/element/markdown";
import { NodeModel } from "@/layout/index";
import { createBlock, globalStore, refocusNode } from "@/store/global";
import { createBlock, getConnStatusAtom, globalStore, refocusNode } from "@/store/global";
import * as services from "@/store/services";
import * as WOS from "@/store/wos";
import { getWebServerEndpoint } from "@/util/endpoints";
@ -98,6 +98,7 @@ export class PreviewModel implements ViewModel {
specializedView: jotai.Atom<Promise<{ specializedView?: string; errorStr?: string }>>;
loadableSpecializedView: jotai.Atom<Loadable<{ specializedView?: string; errorStr?: string }>>;
manageConnection: jotai.Atom<boolean>;
connStatus: jotai.Atom<ConnStatus>;
metaFilePath: jotai.Atom<string>;
statFilePath: jotai.Atom<Promise<string>>;
@ -146,10 +147,15 @@ export class PreviewModel implements ViewModel {
this.monacoRef = createRef();
this.viewIcon = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const mimeTypeLoadable = get(this.fileMimeTypeLoadable);
if (blockData?.meta?.icon) {
return blockData.meta.icon;
}
const connStatus = get(this.connStatus);
if (connStatus?.status != "connected") {
return null;
}
const fileName = get(this.metaFilePath);
const mimeTypeLoadable = get(this.fileMimeTypeLoadable);
const mimeType = util.jotaiLoadableValue(mimeTypeLoadable, "");
if (mimeType == "directory") {
return {
@ -189,9 +195,19 @@ export class PreviewModel implements ViewModel {
});
this.viewName = jotai.atom("Preview");
this.viewText = jotai.atom((get) => {
let headerPath = get(this.metaFilePath);
const connStatus = get(this.connStatus);
if (connStatus?.status != "connected") {
return [
{
elemtype: "text",
text: headerPath,
className: "preview-filename",
},
];
}
const loadableSV = get(this.loadableSpecializedView);
const isCeView = loadableSV.state == "hasData" && loadableSV.data.specializedView == "codeedit";
let headerPath = get(this.metaFilePath);
const loadableFileInfo = get(this.loadableFileInfo);
if (loadableFileInfo.state == "hasData") {
headerPath = loadableFileInfo.data?.path;
@ -248,6 +264,10 @@ export class PreviewModel implements ViewModel {
] as HeaderElem[];
});
this.preIconButton = jotai.atom((get) => {
const connStatus = get(this.connStatus);
if (connStatus?.status != "connected") {
return null;
}
const mimeType = util.jotaiLoadableValue(get(this.fileMimeTypeLoadable), "");
if (mimeType == "directory") {
return null;
@ -259,6 +279,10 @@ export class PreviewModel implements ViewModel {
};
});
this.endIconButtons = jotai.atom((get) => {
const connStatus = get(this.connStatus);
if (connStatus?.status != "connected") {
return null;
}
const mimeType = util.jotaiLoadableValue(get(this.fileMimeTypeLoadable), "");
const loadableSV = get(this.loadableSpecializedView);
const isCeView = loadableSV.state == "hasData" && loadableSV.data.specializedView == "codeedit";
@ -356,6 +380,12 @@ export class PreviewModel implements ViewModel {
this.loadableSpecializedView = loadable(this.specializedView);
this.canPreview = jotai.atom(false);
this.loadableFileInfo = loadable(this.statFile);
this.connStatus = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const connName = blockData?.meta?.connection;
const connAtom = getConnStatusAtom(connName);
return get(connAtom);
});
}
markdownShowTocToggle() {
@ -831,6 +861,10 @@ function PreviewView({
contentRef: React.RefObject<HTMLDivElement>;
model: PreviewModel;
}) {
const connStatus = jotai.useAtomValue(model.connStatus);
if (connStatus?.status != "connected") {
return null;
}
return (
<>
<OpenFileModal blockId={blockId} model={model} blockRef={blockRef} />

View File

@ -3,7 +3,7 @@
import { WshServer } from "@/app/store/wshserver";
import { VDomView } from "@/app/view/term/vdom";
import { WOS, atoms, getEventORefSubject, globalStore, useSettingsPrefixAtom } from "@/store/global";
import { WOS, atoms, getConnStatusAtom, getEventORefSubject, globalStore, useSettingsPrefixAtom } from "@/store/global";
import * as services from "@/store/services";
import * as keyutil from "@/util/keyutil";
import * as util from "@/util/util";
@ -108,6 +108,7 @@ class TermViewModel {
viewName: jotai.Atom<string>;
blockBg: jotai.Atom<MetaType>;
manageConnection: jotai.Atom<boolean>;
connStatus: jotai.Atom<ConnStatus>;
constructor(blockId: string) {
this.viewType = "term";
@ -142,10 +143,12 @@ class TermViewModel {
}
return null;
});
}
resetConnection() {
WshServer.ControllerRestartCommand({ blockid: this.blockId });
this.connStatus = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const connName = blockData?.meta?.connection;
const connAtom = getConnStatusAtom(connName);
return get(connAtom);
});
}
giveFocus(): boolean {
@ -172,21 +175,39 @@ class TermViewModel {
const fullConfig = globalStore.get(atoms.fullConfigAtom);
const termThemes = fullConfig?.termthemes ?? {};
const termThemeKeys = Object.keys(termThemes);
termThemeKeys.sort((a, b) => {
return termThemes[a]["display:order"] - termThemes[b]["display:order"];
});
const fullMenu: ContextMenuItem[] = [];
const submenu: ContextMenuItem[] = termThemeKeys.map((themeName) => {
return {
label: termThemes[themeName]["display:name"] ?? themeName,
click: () => this.setTerminalTheme(themeName),
};
});
return [
{
label: "Themes",
submenu: submenu,
fullMenu.push({
label: "Themes",
submenu: submenu,
});
fullMenu.push({ type: "separator" });
fullMenu.push({
label: "Force Restart Controller",
click: () => {
const termsize = {
rows: this.termRef.current?.terminal?.rows,
cols: this.termRef.current?.terminal?.cols,
};
const prtn = WshServer.ControllerResyncCommand({
tabid: globalStore.get(atoms.activeTabId),
blockid: this.blockId,
forcerestart: true,
rtopts: { termsize: termsize },
});
prtn.catch((e) => console.log("error controller resync (force restart)", e));
},
];
});
return fullMenu;
}
}
@ -199,6 +220,28 @@ interface TerminalViewProps {
model: TermViewModel;
}
const TermResyncHandler = React.memo(({ blockId, model }: TerminalViewProps) => {
const connStatus = jotai.useAtomValue(model.connStatus);
const [lastConnStatus, setLastConnStatus] = React.useState<ConnStatus>(connStatus);
React.useEffect(() => {
if (!model.termRef.current?.hasResized) {
return;
}
const isConnected = connStatus?.status == "connected";
const wasConnected = lastConnStatus?.status == "connected";
const curConnName = connStatus?.connection;
const lastConnName = lastConnStatus?.connection;
if (isConnected == wasConnected && curConnName == lastConnName) {
return;
}
model.termRef.current?.resyncController("resync handler");
setLastConnStatus(connStatus);
}, [connStatus]);
return null;
});
const TerminalView = ({ blockId, model }: TerminalViewProps) => {
const viewRef = React.createRef<HTMLDivElement>();
const connectElemRef = React.useRef<HTMLDivElement>(null);
@ -257,7 +300,9 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
}
if (shellProcStatusRef.current != "running" && keyutil.checkKeyPressed(waveEvent, "Enter")) {
// restart
WshServer.ControllerRestartCommand({ blockid: blockId });
const tabId = globalStore.get(atoms.activeTabId);
const prtn = WshServer.ControllerResyncCommand({ tabid: tabId, blockid: blockId });
prtn.catch((e) => console.log("error controller resync (enter)", blockId, e));
return false;
}
return true;
@ -352,6 +397,7 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
return (
<div className={clsx("view-term", "term-mode-" + termMode)} ref={viewRef}>
<TermResyncHandler blockId={blockId} model={model} />
<TermThemeUpdater blockId={blockId} termRef={termRef} />
<TermStickers config={stickerConfig} />
<div key="conntectElem" className="term-connectelem" ref={connectElemRef}></div>

View File

@ -2,7 +2,16 @@
// SPDX-License-Identifier: Apache-2.0
import { WshServer } from "@/app/store/wshserver";
import { PLATFORM, WOS, fetchWaveFile, getFileSubject, openLink, sendWSCommand } from "@/store/global";
import {
PLATFORM,
WOS,
atoms,
fetchWaveFile,
getFileSubject,
globalStore,
openLink,
sendWSCommand,
} from "@/store/global";
import * as services from "@/store/services";
import * as util from "@/util/util";
import { base64ToArray, fireAndForget } from "@/util/util";
@ -11,9 +20,12 @@ import { WebLinksAddon } from "@xterm/addon-web-links";
import { WebglAddon } from "@xterm/addon-webgl";
import * as TermTypes from "@xterm/xterm";
import { Terminal } from "@xterm/xterm";
import debug from "debug";
import { debounce } from "throttle-debounce";
import { FitAddon } from "./fitaddon";
const dlog = debug("wave:termwrap");
const TermFileName = "term";
const TermCacheFileName = "cache:term:full";
@ -49,6 +61,7 @@ export class TermWrap {
heldData: Uint8Array[];
handleResize_debounced: () => void;
isRunning: boolean;
hasResized: boolean;
constructor(
blockId: string,
@ -60,13 +73,13 @@ export class TermWrap {
this.blockId = blockId;
this.ptyOffset = 0;
this.dataBytesProcessed = 0;
this.hasResized = false;
this.terminal = new Terminal(options);
this.fitAddon = new FitAddon();
this.fitAddon.noScrollbar = PLATFORM == "darwin";
this.serializeAddon = new SerializeAddon();
this.terminal.loadAddon(this.fitAddon);
this.terminal.loadAddon(this.serializeAddon);
this.terminal.loadAddon(
new WebLinksAddon((e, uri) => {
e.preventDefault();
@ -208,18 +221,35 @@ export class TermWrap {
}
}
async resyncController(reason: string) {
dlog("resync controller", this.blockId, reason);
const tabId = globalStore.get(atoms.activeTabId);
const rtOpts: RuntimeOpts = { termsize: { rows: this.terminal.rows, cols: this.terminal.cols } };
try {
await WshServer.ControllerResyncCommand({ tabid: tabId, blockid: this.blockId, rtopts: rtOpts });
} catch (e) {
console.log(`error controller resync (${reason})`, this.blockId, e);
}
}
handleResize() {
const oldRows = this.terminal.rows;
const oldCols = this.terminal.cols;
this.fitAddon.fit();
if (oldRows !== this.terminal.rows || oldCols !== this.terminal.cols) {
const termSize: TermSize = { rows: this.terminal.rows, cols: this.terminal.cols };
const wsCommand: SetBlockTermSizeWSCommand = {
wscommand: "setblocktermsize",
blockid: this.blockId,
termsize: { rows: this.terminal.rows, cols: this.terminal.cols },
termsize: termSize,
};
sendWSCommand(wsCommand);
}
dlog("resize", `${this.terminal.rows}x${this.terminal.cols}`, `${oldRows}x${oldCols}`, this.hasResized);
if (!this.hasResized) {
this.hasResized = true;
this.resyncController("initial resize");
}
}
processAndCacheData() {

View File

@ -240,6 +240,11 @@ declare global {
version: string;
buildTime: number;
}
type BlockComponentModel = {
openSwitchConnection?: () => void;
viewModel: ViewModel;
};
}
export {};

View File

@ -15,8 +15,8 @@ declare global {
// blockcontroller.BlockControllerRuntimeStatus
type BlockControllerRuntimeStatus = {
blockid: string;
status: string;
shellprocstatus?: string;
shellprocconnname?: string;
};
// waveobj.BlockDef
@ -58,17 +58,20 @@ declare global {
termsize?: TermSize;
};
// wshrpc.CommandBlockRestartData
type CommandBlockRestartData = {
blockid: string;
};
// wshrpc.CommandBlockSetViewData
type CommandBlockSetViewData = {
blockid: string;
view: string;
};
// wshrpc.CommandControllerResyncData
type CommandControllerResyncData = {
forcerestart?: boolean;
tabid: string;
blockid: string;
rtopts?: RuntimeOpts;
};
// wshrpc.CommandCreateBlockData
type CommandCreateBlockData = {
tabid: string;

View File

@ -72,7 +72,10 @@ document.addEventListener("DOMContentLoaded", async () => {
const fullConfig = await services.FileService.GetFullConfig();
console.log("fullconfig", fullConfig);
globalStore.set(atoms.fullConfigAtom, fullConfig);
services.ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait
const prtn = services.ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait
prtn.catch((e) => {
console.log("error on initial SetActiveTab", e);
});
const reactElem = React.createElement(App, null, null);
const elem = document.getElementById("main");
const root = createRoot(elem);

View File

@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/fs"
@ -39,7 +38,6 @@ const (
)
const (
Status_Init = "init"
Status_Running = "running"
Status_Done = "done"
)
@ -66,18 +64,16 @@ type BlockController struct {
TabId string
BlockId string
BlockDef *waveobj.BlockDef
Status string
CreatedHtmlFile bool
ShellProc *shellexec.ShellProc
ShellInputCh chan *BlockInputUnion
ShellProcStatus string
StopCh chan bool
}
type BlockControllerRuntimeStatus struct {
BlockId string `json:"blockid"`
Status string `json:"status"`
ShellProcStatus string `json:"shellprocstatus,omitempty"`
BlockId string `json:"blockid"`
ShellProcStatus string `json:"shellprocstatus,omitempty"`
ShellProcConnName string `json:"shellprocconnname,omitempty"`
}
func (bc *BlockController) WithLock(f func()) {
@ -90,25 +86,14 @@ func (bc *BlockController) GetRuntimeStatus() *BlockControllerRuntimeStatus {
var rtn BlockControllerRuntimeStatus
bc.WithLock(func() {
rtn.BlockId = bc.BlockId
rtn.Status = bc.Status
rtn.ShellProcStatus = bc.ShellProcStatus
if bc.ShellProc != nil {
rtn.ShellProcConnName = bc.ShellProc.ConnName
}
})
return &rtn
}
func jsonDeepCopy(val map[string]any) (map[string]any, error) {
barr, err := json.Marshal(val)
if err != nil {
return nil, err
}
var rtn map[string]any
err = json.Unmarshal(barr, &rtn)
if err != nil {
return nil, err
}
return rtn, nil
}
func (bc *BlockController) getShellProc() *shellexec.ShellProc {
bc.Lock.Lock()
defer bc.Lock.Unlock()
@ -136,6 +121,7 @@ func (bc *BlockController) UpdateControllerAndSendUpdate(updateFn func() bool) {
Event: wshrpc.Event_ControllerStatus,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, bc.TabId).String(),
waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(),
},
Data: rtStatus,
}
@ -228,19 +214,11 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
bc.resetTerminalState()
}
err = nil
if bc.getShellProc() != nil {
bcInitStatus := bc.GetRuntimeStatus()
if bcInitStatus.ShellProcStatus == Status_Running {
return nil
}
var shellProcErr error
bc.WithLock(func() {
if bc.ShellProc != nil {
shellProcErr = fmt.Errorf("shell process already running")
return
}
})
if shellProcErr != nil {
return shellProcErr
}
// TODO better sync here (don't let two starts happen at the same times)
remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "")
var cmdStr string
cmdOpts := shellexec.CommandOptsType{
@ -288,10 +266,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
if err != nil {
return err
}
conn := conncontroller.GetConn(credentialCtx, opts, true)
conn := conncontroller.GetConn(credentialCtx, opts, false)
connStatus := conn.DeriveConnStatus()
if connStatus.Error != "" {
return fmt.Errorf("error connecting to remote: %s", connStatus.Error)
if connStatus.Status != conncontroller.Status_Connected {
return fmt.Errorf("not connected, cannot start shellproc")
}
if !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) {
jwtStr, err := wshutil.MakeClientJWTToken(wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()}, conn.GetDomainSocketName())
@ -300,7 +278,7 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
}
cmdOpts.Env[wshutil.WaveJwtTokenVarName] = jwtStr
}
shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn.Client)
shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
return err
}
@ -335,12 +313,14 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
// handles regular output from the pty (goes to the blockfile and xterm)
defer func() {
log.Printf("[shellproc] pty-read loop done\n")
// needs synchronization
bc.ShellProc.Close()
close(bc.ShellInputCh)
bc.ShellProc = nil
bc.ShellInputCh = nil
bc.WithLock(func() {
// so no other events are sent
bc.ShellInputCh = nil
})
// to stop the inputCh loop
time.Sleep(100 * time.Millisecond)
close(shellInputCh) // don't use bc.ShellInputCh (it's nil)
}()
buf := make([]byte, 4096)
for {
@ -365,6 +345,7 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
log.Printf("[shellproc] shellInputCh loop done\n")
}()
// handles input from the shellInputCh, sent to pty
// use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated)
for ic := range shellInputCh {
if len(ic.InputData) > 0 {
bc.ShellProc.Cmd.Write(ic.InputData)
@ -446,23 +427,7 @@ func setTermSize(ctx context.Context, blockId string, termSize waveobj.TermSize)
return nil
}
func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any) {
defer func() {
bc.UpdateControllerAndSendUpdate(func() bool {
if bc.Status == Status_Running {
bc.Status = Status_Done
return true
}
return false
})
globalLock.Lock()
defer globalLock.Unlock()
delete(blockControllerMap, bc.BlockId)
}()
bc.UpdateControllerAndSendUpdate(func() bool {
bc.Status = Status_Running
return true
})
func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts) {
controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
log.Printf("unknown controller %q\n", controllerName)
@ -477,51 +442,136 @@ func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any) {
runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true)
if runOnStart {
go func() {
err := bc.DoRunShellCommand(&RunShellOpts{TermSize: getTermSize(bdata)}, bdata.Meta)
var termSize waveobj.TermSize
if rtOpts != nil {
termSize = rtOpts.TermSize
} else {
termSize = getTermSize(bdata)
}
err := bc.DoRunShellCommand(&RunShellOpts{TermSize: termSize}, bdata.Meta)
if err != nil {
log.Printf("error running shell: %v\n", err)
}
}()
}
<-bc.StopCh
}
func (bc *BlockController) SendInput(inputUnion *BlockInputUnion) error {
if bc.ShellInputCh == nil {
var shellInputCh chan *BlockInputUnion
bc.WithLock(func() {
shellInputCh = bc.ShellInputCh
})
if shellInputCh == nil {
return fmt.Errorf("no shell input chan")
}
bc.ShellInputCh <- inputUnion
shellInputCh <- inputUnion
return nil
}
func (bc *BlockController) RestartController() error {
// kill the command if it's running
bc.Lock.Lock()
if bc.ShellProc != nil {
bc.ShellProc.Close()
}
bc.Lock.Unlock()
// wait for process to complete
if bc.ShellProc != nil {
doneCh := bc.ShellProc.DoneCh
<-doneCh
}
// restart controller
bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), bc.BlockId)
func CheckConnStatus(blockId string) error {
bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
err = bc.DoRunShellCommand(&RunShellOpts{TermSize: getTermSize(bdata)}, bdata.Meta)
connName := bdata.Meta.GetString(waveobj.MetaKey_Connection, "")
if connName == "" {
return nil
}
opts, err := remote.ParseOpts(connName)
if err != nil {
log.Printf("error running shell command: %v\n", err)
return fmt.Errorf("error parsing connection name: %w", err)
}
conn := conncontroller.GetConn(context.Background(), opts, false)
connStatus := conn.DeriveConnStatus()
if connStatus.Status != conncontroller.Status_Connected {
return fmt.Errorf("not connected: %s", connStatus.Status)
}
return nil
}
func StartBlockController(ctx context.Context, tabId string, blockId string) error {
log.Printf("start blockcontroller %q\n", blockId)
func (bc *BlockController) StopShellProc(shouldWait bool) {
bc.Lock.Lock()
defer bc.Lock.Unlock()
if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done {
return
}
bc.ShellProc.Close()
if shouldWait {
doneCh := bc.ShellProc.DoneCh
<-doneCh
}
}
func getOrCreateBlockController(tabId string, blockId string, controllerName string) *BlockController {
var createdController bool
var bc *BlockController
defer func() {
if !createdController || bc == nil {
return
}
bc.UpdateControllerAndSendUpdate(func() bool {
return true
})
}()
globalLock.Lock()
defer globalLock.Unlock()
bc = blockControllerMap[blockId]
if bc == nil {
bc = &BlockController{
Lock: &sync.Mutex{},
ControllerType: controllerName,
TabId: tabId,
BlockId: blockId,
ShellProcStatus: Status_Done,
}
blockControllerMap[blockId] = bc
createdController = true
}
return bc
}
func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error {
if tabId == "" || blockId == "" {
return fmt.Errorf("invalid tabId or blockId passed to ResyncController")
}
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "")
curBc := GetBlockController(blockId)
if controllerName == "" {
if curBc != nil {
StopBlockController(blockId)
}
return nil
}
// check if conn is different, if so, stop the current controller
if curBc != nil {
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName {
StopBlockController(blockId)
}
}
// now if there is a conn, ensure it is connected
if connName != "" {
err = CheckConnStatus(blockId)
if err != nil {
return fmt.Errorf("cannot start shellproc: %w", err)
}
}
if curBc == nil {
return startBlockController(ctx, tabId, blockId, rtOpts)
}
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus != Status_Running {
return startBlockController(ctx, tabId, blockId, rtOpts)
}
return nil
}
func startBlockController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error {
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
@ -534,23 +584,17 @@ func StartBlockController(ctx context.Context, tabId string, blockId string) err
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
return fmt.Errorf("unknown controller %q", controllerName)
}
globalLock.Lock()
defer globalLock.Unlock()
if _, ok := blockControllerMap[blockId]; ok {
// already running
return nil
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
log.Printf("start blockcontroller %s %q (%q)\n", blockId, controllerName, connName)
err = CheckConnStatus(blockId)
if err != nil {
return fmt.Errorf("cannot start shellproc: %w", err)
}
bc := &BlockController{
Lock: &sync.Mutex{},
ControllerType: controllerName,
TabId: tabId,
BlockId: blockId,
Status: Status_Init,
ShellProcStatus: Status_Init,
StopCh: make(chan bool),
bc := getOrCreateBlockController(tabId, blockId, controllerName)
bcStatus := bc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Done {
go bc.run(blockData, blockData.Meta, rtOpts)
}
blockControllerMap[blockId] = bc
go bc.run(blockData, blockData.Meta)
return nil
}
@ -561,8 +605,32 @@ func StopBlockController(blockId string) {
}
if bc.getShellProc() != nil {
bc.ShellProc.Close()
<-bc.ShellProc.DoneCh
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProcStatus = Status_Done
return true
})
}
}
func getControllerList() []*BlockController {
globalLock.Lock()
defer globalLock.Unlock()
var rtn []*BlockController
for _, bc := range blockControllerMap {
rtn = append(rtn, bc)
}
return rtn
}
func StopAllBlockControllers() {
clist := getControllerList()
for _, bc := range clist {
if bc.ShellProcStatus == Status_Running {
go StopBlockController(bc.BlockId)
}
}
close(bc.StopCh)
}
func GetBlockController(blockId string) *BlockController {

View File

@ -25,7 +25,6 @@ import (
"github.com/wavetermdev/thenextwave/pkg/util/shellutil"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wps"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
@ -40,6 +39,8 @@ const (
Status_Error = "error"
)
const DefaultConnectionTimeout = 60 * time.Second
var globalLock = &sync.Mutex{}
var clientControllerMap = make(map[remote.SSHOpts]*SSHConn)
@ -163,7 +164,7 @@ func (conn *SSHConn) OpenDomainSocketListener() error {
return fmt.Errorf("error generating random string: %w", err)
}
sockName := fmt.Sprintf("/tmp/waveterm-%s.sock", randStr)
log.Printf("remote domain socket %s %q\n", conn.GetName(), sockName)
log.Printf("remote domain socket %s %q\n", conn.GetName(), conn.GetDomainSocketName())
listener, err := client.ListenUnix(sockName)
if err != nil {
return fmt.Errorf("unable to request connection domain socket: %v", err)
@ -251,6 +252,13 @@ func (conn *SSHConn) StartConnServer() error {
log.Printf("[conncontroller:%s] error reading output: %v\n", conn.GetName(), readErr)
}
}()
regCtx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
err = wshutil.DefaultRouter.WaitForRegister(regCtx, wshutil.MakeConnectionRouteId(rpcCtx.Conn))
if err != nil {
return fmt.Errorf("timeout waiting for connserver to register")
}
time.Sleep(300 * time.Millisecond) // TODO remove this sleep (but we need to wait until connserver is "ready")
return nil
}
@ -373,6 +381,7 @@ func (conn *SSHConn) Connect(ctx context.Context) error {
connectAllowed = true
}
})
log.Printf("Connect %s\n", conn.GetName())
if !connectAllowed {
return fmt.Errorf("cannot connect to %q when status is %q", conn.GetName(), conn.GetStatus())
}
@ -467,45 +476,31 @@ func GetConn(ctx context.Context, opts *remote.SSHOpts, shouldConnect bool) *SSH
}
// Convenience function for ensuring a connection is established
func EnsureConnection(ctx context.Context, blockData *waveobj.Block) error {
connectionName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
if connectionName == "" {
func EnsureConnection(ctx context.Context, connName string) error {
if connName == "" {
return nil
}
credentialCtx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Second)
defer cancelFunc()
opts, err := remote.ParseOpts(connectionName)
connOpts, err := remote.ParseOpts(connName)
if err != nil {
return err
return fmt.Errorf("error parsing connection name: %w", err)
}
conn := GetConn(credentialCtx, opts, true)
statusChan := make(chan string, 1)
go func() {
// we need to wait for connected/disconnected/error
// to ensure the connection has been established before
// continuing in the original thread
for {
// GetStatus has a lock which makes this reasonable to loop over
status := conn.GetStatus()
if credentialCtx.Err() != nil {
// prevent infinite loop from context
statusChan <- Status_Error
return
}
if status == Status_Connected || status == Status_Disconnected || status == Status_Error {
statusChan <- status
return
}
}
}()
status := <-statusChan
if status == Status_Error {
return fmt.Errorf("connection error: %v", conn.Error)
} else if status == Status_Disconnected {
return fmt.Errorf("disconnected: %v", conn.Error)
conn := GetConn(ctx, connOpts, false)
if conn == nil {
return fmt.Errorf("connection not found: %s", connName)
}
connStatus := conn.DeriveConnStatus()
switch connStatus.Status {
case Status_Connected:
return nil
case Status_Connecting:
return conn.WaitForConnect(ctx)
case Status_Init, Status_Disconnected:
return conn.Connect(ctx)
case Status_Error:
return fmt.Errorf("connection error: %s", connStatus.Error)
default:
return fmt.Errorf("unknown connection status %q", connStatus.Status)
}
return nil
}
func DisconnectClient(opts *remote.SSHOpts) error {

View File

@ -292,9 +292,6 @@ func CpHostToRemote(client *ssh.Client, sourcePath string, destPath string) erro
func InstallClientRcFiles(client *ssh.Client) error {
path := GetWshPath(client)
log.Printf("path to wsh searched is: %s", path)
log.Printf("in bytes is: %v", []byte(path))
log.Printf("in bytes expected would be: %v", []byte("~/.waveterm/bin/wsh"))
session, err := client.NewSession()
if err != nil {
// this is a true error that should stop further progress

View File

@ -11,7 +11,6 @@ import (
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/remote/conncontroller"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
@ -34,10 +33,7 @@ func (bs *BlockService) SendCommand_Meta() tsgenmeta.MethodMeta {
func (bs *BlockService) GetControllerStatus(ctx context.Context, blockId string) (*blockcontroller.BlockControllerRuntimeStatus, error) {
bc := blockcontroller.GetBlockController(blockId)
if bc == nil {
return &blockcontroller.BlockControllerRuntimeStatus{
BlockId: blockId,
Status: "stopped",
}, nil
return nil, nil
}
return bc.GetRuntimeStatus(), nil
}
@ -84,11 +80,3 @@ func (bs *BlockService) SaveWaveAiData(ctx context.Context, blockId string, hist
}
return nil
}
func (bs *BlockService) EnsureConnection(ctx context.Context, blockId string) error {
block, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return err
}
return conncontroller.EnsureConnection(ctx, block)
}

View File

@ -6,12 +6,9 @@ package objectservice
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/remote/conncontroller"
"github.com/wavetermdev/thenextwave/pkg/tsgen/tsgenmeta"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wcore"
@ -22,6 +19,7 @@ import (
type ObjectService struct{}
const DefaultTimeout = 2 * time.Second
const ConnContextTimeout = 60 * time.Second
func parseORef(oref string) (*waveobj.ORef, error) {
fields := strings.Split(oref, ":")
@ -133,22 +131,6 @@ func (svc *ObjectService) SetActiveTab(uiContext waveobj.UIContext, tabId string
if err != nil {
return nil, fmt.Errorf("error getting tab: %w", err)
}
for _, blockId := range tab.BlockIds {
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
}
err = conncontroller.EnsureConnection(ctx, blockData)
if err != nil {
return nil, fmt.Errorf("unable to ensure connection: %v", err)
}
blockErr := blockcontroller.StartBlockController(ctx, tabId, blockId)
if blockErr != nil {
// we don't want to fail the set active tab operation if a block controller fails to start
log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr)
continue
}
}
blockORefs := tab.GetBlockORefs()
blocks, err := wstore.DBSelectORefs(ctx, blockORefs)
if err != nil {

View File

@ -2,7 +2,9 @@ package shellexec
import (
"io"
"os"
"os/exec"
"time"
"github.com/creack/pty"
"golang.org/x/crypto/ssh"
@ -10,6 +12,7 @@ import (
type ConnInterface interface {
Kill()
KillGraceful(time.Duration)
Wait() error
Start() error
StdinPipe() (io.WriteCloser, error)
@ -32,6 +35,22 @@ func (cw CmdWrap) Wait() error {
return cw.Cmd.Wait()
}
func (cw CmdWrap) KillGraceful(timeout time.Duration) {
if cw.Cmd.Process == nil {
return
}
if cw.Cmd.ProcessState != nil && cw.Cmd.ProcessState.Exited() {
return
}
cw.Cmd.Process.Signal(os.Interrupt)
go func() {
time.Sleep(timeout)
if cw.Cmd.ProcessState == nil || !cw.Cmd.ProcessState.Exited() {
cw.Cmd.Process.Kill() // force kill if it is already not exited
}
}()
}
func (cw CmdWrap) Start() error {
defer func() {
for _, extraFile := range cw.Cmd.ExtraFiles {
@ -75,6 +94,10 @@ func (sw SessionWrap) Kill() {
sw.Session.Close()
}
func (sw SessionWrap) KillGraceful(timeout time.Duration) {
sw.Kill()
}
func (sw SessionWrap) Wait() error {
return sw.Session.Wait()
}

View File

@ -15,16 +15,19 @@ import (
"strings"
"sync"
"syscall"
"time"
"github.com/creack/pty"
"github.com/wavetermdev/thenextwave/pkg/remote"
"github.com/wavetermdev/thenextwave/pkg/remote/conncontroller"
"github.com/wavetermdev/thenextwave/pkg/util/shellutil"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"golang.org/x/crypto/ssh"
)
const DefaultGracefulKillWait = 400 * time.Millisecond
type CommandOptsType struct {
Interactive bool `json:"interactive,omitempty"`
Login bool `json:"login,omitempty"`
@ -33,6 +36,7 @@ type CommandOptsType struct {
}
type ShellProc struct {
ConnName string
Cmd ConnInterface
CloseOnce *sync.Once
DoneCh chan any // closed after proc.Wait() returns
@ -40,7 +44,7 @@ type ShellProc struct {
}
func (sp *ShellProc) Close() {
sp.Cmd.Kill()
sp.Cmd.KillGraceful(DefaultGracefulKillWait)
go func() {
waitErr := sp.Cmd.Wait()
sp.SetWaitErrorAndSignalDone(waitErr)
@ -134,7 +138,8 @@ func (pp *PipePty) WriteString(s string) (n int, err error) {
return pp.Write([]byte(s))
}
func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts CommandOptsType, client *ssh.Client) (*ShellProc, error) {
func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts CommandOptsType, conn *conncontroller.SSHConn) (*ShellProc, error) {
client := conn.GetClient()
shellPath, err := remote.DetectShell(client)
if err != nil {
return nil, err
@ -244,7 +249,7 @@ func StartRemoteShellProc(termSize waveobj.TermSize, cmdStr string, cmdOpts Comm
pipePty.Close()
return nil, err
}
return &ShellProc{Cmd: sessionWrap, CloseOnce: &sync.Once{}, DoneCh: make(chan any)}, nil
return &ShellProc{Cmd: sessionWrap, ConnName: conn.GetName(), CloseOnce: &sync.Once{}, DoneCh: make(chan any)}, nil
}
func isZshShell(shellPath string) bool {

View File

@ -11,7 +11,6 @@ import (
"github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/remote/conncontroller"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wps"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
@ -24,6 +23,7 @@ import (
// TODO bring Tx infra into wcore
const DefaultTimeout = 2 * time.Second
const DefaultActivateBlockTimeout = 60 * time.Second
func DeleteBlock(ctx context.Context, tabId string, blockId string) error {
err := wstore.DeleteBlock(ctx, tabId, blockId)
@ -174,17 +174,5 @@ func CreateBlock(ctx context.Context, tabId string, blockDef *waveobj.BlockDef,
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
err = conncontroller.EnsureConnection(ctx, blockData)
if err != nil {
return nil, fmt.Errorf("unable to ensure connection: %v", err)
}
controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName != "" {
err = blockcontroller.StartBlockController(ctx, tabId, blockData.OID)
if err != nil {
return nil, fmt.Errorf("error starting block controller: %w", err)
}
}
return blockData, nil
}

View File

@ -60,9 +60,15 @@ func ControllerInputCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockInputData
return err
}
// command "controllerrestart", wshserver.ControllerRestartCommand
func ControllerRestartCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockRestartData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "controllerrestart", data, opts)
// command "controllerresync", wshserver.ControllerResyncCommand
func ControllerResyncCommand(w *wshutil.WshRpc, data wshrpc.CommandControllerResyncData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "controllerresync", data, opts)
return err
}
// command "controllerstop", wshserver.ControllerStopCommand
func ControllerStopCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "controllerstop", data, opts)
return err
}

View File

@ -41,6 +41,8 @@ const (
Command_SetView = "setview"
Command_ControllerInput = "controllerinput"
Command_ControllerRestart = "controllerrestart"
Command_ControllerStop = "controllerstop"
Command_ControllerResync = "controllerresync"
Command_FileAppend = "fileappend"
Command_FileAppendIJson = "fileappendijson"
Command_ResolveIds = "resolveids"
@ -84,7 +86,8 @@ type WshRpcInterface interface {
SetMetaCommand(ctx context.Context, data CommandSetMetaData) error
SetViewCommand(ctx context.Context, data CommandBlockSetViewData) error
ControllerInputCommand(ctx context.Context, data CommandBlockInputData) error
ControllerRestartCommand(ctx context.Context, data CommandBlockRestartData) error
ControllerStopCommand(ctx context.Context, blockId string) error
ControllerResyncCommand(ctx context.Context, data CommandControllerResyncData) error
FileAppendCommand(ctx context.Context, data CommandFileData) error
FileAppendIJsonCommand(ctx context.Context, data CommandAppendIJsonData) error
ResolveIdsCommand(ctx context.Context, data CommandResolveIdsData) (CommandResolveIdsRtnData, error)
@ -217,8 +220,11 @@ type CommandBlockSetViewData struct {
View string `json:"view"`
}
type CommandBlockRestartData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
type CommandControllerResyncData struct {
ForceRestart bool `json:"forcerestart,omitempty"`
TabId string `json:"tabid" wshcontext:"TabId"`
BlockId string `json:"blockid" wshcontext:"BlockId"`
RtOpts *waveobj.RuntimeOpts `json:"rtopts,omitempty"`
}
type CommandBlockInputData struct {

View File

@ -284,12 +284,20 @@ func (ws *WshServer) SetViewCommand(ctx context.Context, data wshrpc.CommandBloc
return nil
}
func (ws *WshServer) ControllerRestartCommand(ctx context.Context, data wshrpc.CommandBlockRestartData) error {
bc := blockcontroller.GetBlockController(data.BlockId)
func (ws *WshServer) ControllerStopCommand(ctx context.Context, blockId string) error {
bc := blockcontroller.GetBlockController(blockId)
if bc == nil {
return fmt.Errorf("block controller not found for block %q", data.BlockId)
return nil
}
return bc.RestartController()
bc.StopShellProc(true)
return nil
}
func (ws *WshServer) ControllerResyncCommand(ctx context.Context, data wshrpc.CommandControllerResyncData) error {
if data.ForceRestart {
blockcontroller.StopBlockController(data.BlockId)
}
return blockcontroller.ResyncController(ctx, data.TabId, data.BlockId, data.RtOpts)
}
func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.CommandBlockInputData) error {
@ -476,27 +484,7 @@ func (ws *WshServer) ConnStatusCommand(ctx context.Context) ([]wshrpc.ConnStatus
}
func (ws *WshServer) ConnEnsureCommand(ctx context.Context, connName string) error {
connOpts, err := remote.ParseOpts(connName)
if err != nil {
return fmt.Errorf("error parsing connection name: %w", err)
}
conn := conncontroller.GetConn(ctx, connOpts, false)
if conn == nil {
return fmt.Errorf("connection not found: %s", connName)
}
connStatus := conn.DeriveConnStatus()
switch connStatus.Status {
case conncontroller.Status_Connected:
return nil
case conncontroller.Status_Connecting:
return conn.WaitForConnect(ctx)
case conncontroller.Status_Init, conncontroller.Status_Disconnected:
return conn.Connect(ctx)
case conncontroller.Status_Error:
return fmt.Errorf("connection error: %s", connStatus.Error)
default:
return fmt.Errorf("unknown connection status %q", connStatus.Status)
}
return conncontroller.EnsureConnection(ctx, connName)
}
func (ws *WshServer) ConnDisconnectCommand(ctx context.Context, connName string) error {

View File

@ -4,11 +4,13 @@
package wshutil
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/wavetermdev/thenextwave/pkg/wps"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
@ -237,6 +239,20 @@ func (router *WshRouter) runServer() {
}
}
func (router *WshRouter) WaitForRegister(ctx context.Context, routeId string) error {
for {
if router.GetRpc(routeId) != nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(30 * time.Millisecond):
continue
}
}
}
// this will also consume the output channel of the abstract client
func (router *WshRouter) RegisterRoute(routeId string, rpc AbstractRpcClient) {
if routeId == SysRoute {