Merge pull request #7 from wavetermdev/sawka/wstore

WaveObj + objectservice + WaveObjectStore

New distributed object system for Wave. Every object has an otype + oid (oref), and can be persisted to the DB and gotten from the DB using those attributes. The frontend tracks a cache of objects (based on orefs) and the store is integrated with jotai atoms for use with react components.

The WaveObjectStore passes a new UIContext through to the backend, and will seamlessly handle updated/deleted objects passed back (updating the store).

The backend DB operations are now all generic. As the updates happen they update the context adding the updated/deleted objects to seamlessly pass them back to the client.

Simplified global.ts store. Have a new way to force an object to be present in the cache at the top level (await loadAndPin) along with integration with nice loading flags for async fetching (or integration with React.Suspense)
This commit is contained in:
Mike Sawka 2024-05-27 16:40:26 -07:00 committed by GitHub
commit cdaa85f92f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 2076 additions and 353 deletions

View File

@ -34,7 +34,7 @@ Now to run the dev version of the app:
wails3 dev wails3 dev
``` ```
You should see a very poorly laid out app :) You should see the app!
Now to build a MacOS application: Now to build a MacOS application:

View File

@ -0,0 +1,26 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"reflect"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
func main() {
tsTypesMap := make(map[reflect.Type]string)
var waveObj waveobj.WaveObj
waveobj.GenerateTSType(reflect.TypeOf(waveobj.ORef{}), tsTypesMap)
waveobj.GenerateTSType(reflect.TypeOf(&waveObj).Elem(), tsTypesMap)
for _, rtype := range wstore.AllWaveObjTypes() {
waveobj.GenerateTSType(rtype, tsTypesMap)
}
for _, ts := range tsTypesMap {
fmt.Print(ts)
fmt.Print("\n")
}
}

View File

@ -7,3 +7,6 @@ import "embed"
//go:embed migrations-blockstore/*.sql //go:embed migrations-blockstore/*.sql
var BlockstoreMigrationFS embed.FS var BlockstoreMigrationFS embed.FS
//go:embed migrations-wstore/*.sql
var WStoreMigrationFS embed.FS

View File

@ -1,20 +1,29 @@
CREATE TABLE db_client ( CREATE TABLE db_client (
clientid varchar(36) PRIMARY KEY, -- unnecessary, but useful to have a PK oid varchar(36) PRIMARY KEY,
version int NOT NULL,
data json NOT NULL
);
CREATE TABLE db_window (
oid varchar(36) PRIMARY KEY,
version int NOT NULL,
data json NOT NULL data json NOT NULL
); );
CREATE TABLE db_workspace ( CREATE TABLE db_workspace (
workspaceid varchar(36) PRIMARY KEY, oid varchar(36) PRIMARY KEY,
version int NOT NULL,
data json NOT NULL data json NOT NULL
); );
CREATE TABLE db_tab ( CREATE TABLE db_tab (
tabid varchar(36) PRIMARY KEY, oid varchar(36) PRIMARY KEY,
version int NOT NULL,
data json NOT NULL data json NOT NULL
); );
CREATE TABLE db_block ( CREATE TABLE db_block (
blockid varchar(36) PRIMARY KEY, oid varchar(36) PRIMARY KEY,
tabid varchar(36) NOT NULL, -- the tab this block belongs to version int NOT NULL,
data json NOT NULL data json NOT NULL
); );

View File

@ -9,6 +9,7 @@ import { Workspace } from "@/app/workspace/workspace";
import { globalStore, atoms } from "@/store/global"; import { globalStore, atoms } from "@/store/global";
import "../../public/style.less"; import "../../public/style.less";
import { CenteredDiv } from "./element/quickelems";
const App = () => { const App = () => {
return ( return (
@ -19,6 +20,16 @@ const App = () => {
}; };
const AppInner = () => { const AppInner = () => {
const client = jotai.useAtomValue(atoms.client);
const windowData = jotai.useAtomValue(atoms.waveWindow);
if (client == null || windowData == null) {
return (
<div className="mainapp">
<div className="titlebar"></div>
<CenteredDiv>invalid configuration, client or window was not loaded</CenteredDiv>
</div>
);
}
return ( return (
<div className="mainapp"> <div className="mainapp">
<div className="titlebar"></div> <div className="titlebar"></div>

View File

@ -3,8 +3,7 @@
import * as React from "react"; import * as React from "react";
import * as jotai from "jotai"; import * as jotai from "jotai";
import { atoms, blockDataMap, removeBlockFromTab } from "@/store/global"; import * as WOS from "@/store/wos";
import { TerminalView } from "@/app/view/term"; import { TerminalView } from "@/app/view/term";
import { PreviewView } from "@/app/view/preview"; import { PreviewView } from "@/app/view/preview";
import { PlotView } from "@/app/view/plotview"; import { PlotView } from "@/app/view/plotview";
@ -17,7 +16,7 @@ const Block = ({ tabId, blockId }: { tabId: string; blockId: string }) => {
const [dims, setDims] = React.useState({ width: 0, height: 0 }); const [dims, setDims] = React.useState({ width: 0, height: 0 });
function handleClose() { function handleClose() {
removeBlockFromTab(tabId, blockId); WOS.DeleteBlock(blockId);
} }
React.useEffect(() => { React.useEffect(() => {
@ -31,10 +30,12 @@ const Block = ({ tabId, blockId }: { tabId: string; blockId: string }) => {
setDims({ width: newWidth, height: newHeight }); setDims({ width: newWidth, height: newHeight });
} }
}, [blockRef.current]); }, [blockRef.current]);
let blockElem: JSX.Element = null; let blockElem: JSX.Element = null;
const blockAtom = blockDataMap.get(blockId); const [blockData, blockDataLoading] = WOS.useWaveObjectValue<Block>(WOS.makeORef("block", blockId));
const blockData = jotai.useAtomValue(blockAtom); if (blockDataLoading) {
if (blockData.view === "term") { blockElem = <CenteredDiv>Loading...</CenteredDiv>;
} else if (blockData.view === "term") {
blockElem = <TerminalView blockId={blockId} />; blockElem = <TerminalView blockId={blockId} />;
} else if (blockData.view === "preview") { } else if (blockData.view === "preview") {
blockElem = <PreviewView blockId={blockId} />; blockElem = <PreviewView blockId={blockId} />;

View File

@ -3,6 +3,10 @@
import "./quickelems.less"; import "./quickelems.less";
function CenteredLoadingDiv() {
return <CenteredDiv>loading...</CenteredDiv>;
}
function CenteredDiv({ children }: { children: React.ReactNode }) { function CenteredDiv({ children }: { children: React.ReactNode }) {
return ( return (
<div className="centered-div"> <div className="centered-div">
@ -11,4 +15,4 @@ function CenteredDiv({ children }: { children: React.ReactNode }) {
); );
} }
export { CenteredDiv as CenteredDiv }; export { CenteredDiv, CenteredLoadingDiv };

View File

@ -2,26 +2,56 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
import * as jotai from "jotai"; import * as jotai from "jotai";
import { atomFamily } from "jotai/utils";
import { v4 as uuidv4 } from "uuid";
import * as rxjs from "rxjs"; import * as rxjs from "rxjs";
import type { WailsEvent } from "@wailsio/runtime/types/events";
import { Events } from "@wailsio/runtime"; import { Events } from "@wailsio/runtime";
import { produce } from "immer"; import * as WOS from "./wos";
import { BlockService } from "@/bindings/blockservice";
const globalStore = jotai.createStore(); const globalStore = jotai.createStore();
const urlParams = new URLSearchParams(window.location.search);
const tabId1 = uuidv4(); const globalWindowId = urlParams.get("windowid");
const globalClientId = urlParams.get("clientid");
const tabArr: TabData[] = [{ name: "Tab 1", tabid: tabId1, blockIds: [] }]; const windowIdAtom = jotai.atom(null) as jotai.PrimitiveAtom<string>;
const blockDataMap = new Map<string, jotai.Atom<BlockData>>(); const clientIdAtom = jotai.atom(null) as jotai.PrimitiveAtom<string>;
const blockAtomCache = new Map<string, Map<string, jotai.Atom<any>>>(); globalStore.set(windowIdAtom, globalWindowId);
globalStore.set(clientIdAtom, globalClientId);
const uiContextAtom = jotai.atom((get) => {
const windowData = get(windowDataAtom);
const uiContext: UIContext = {
windowid: get(atoms.windowId),
activetabid: windowData.activetabid,
};
return uiContext;
}) as jotai.Atom<UIContext>;
const clientAtom: jotai.Atom<Client> = jotai.atom((get) => {
const clientId = get(clientIdAtom);
if (clientId == null) {
return null;
}
return WOS.getStaticObjectValue(WOS.makeORef("client", clientId), get);
});
const windowDataAtom: jotai.Atom<WaveWindow> = jotai.atom((get) => {
const windowId = get(windowIdAtom);
if (windowId == null) {
return null;
}
return WOS.getStaticObjectValue(WOS.makeORef("window", windowId), get);
});
const workspaceAtom: jotai.Atom<Workspace> = jotai.atom((get) => {
const windowData = get(windowDataAtom);
if (windowData == null) {
return null;
}
return WOS.getStaticObjectValue(WOS.makeORef("workspace", windowData.workspaceid), get);
});
const atoms = { const atoms = {
activeTabId: jotai.atom<string>(tabId1), // initialized in wave.ts (will not be null inside of application)
tabsAtom: jotai.atom<TabData[]>(tabArr), windowId: windowIdAtom,
blockDataMap: blockDataMap, clientId: clientIdAtom,
uiContext: uiContextAtom,
client: clientAtom,
waveWindow: windowDataAtom,
workspace: workspaceAtom,
}; };
type SubjectWithRef<T> = rxjs.Subject<T> & { refCount: number; release: () => void }; type SubjectWithRef<T> = rxjs.Subject<T> & { refCount: number; release: () => void };
@ -60,19 +90,7 @@ Events.On("block:ptydata", (event: any) => {
subject.next(data); subject.next(data);
}); });
function addBlockIdToTab(tabId: string, blockId: string) { const blockAtomCache = new Map<string, Map<string, jotai.Atom<any>>>();
let tabArr = globalStore.get(atoms.tabsAtom);
const newTabArr = produce(tabArr, (draft) => {
const tab = draft.find((tab) => tab.tabid == tabId);
tab.blockIds.push(blockId);
});
globalStore.set(atoms.tabsAtom, newTabArr);
}
function removeBlock(blockId: string) {
blockDataMap.delete(blockId);
blockAtomCache.delete(blockId);
}
function useBlockAtom<T>(blockId: string, name: string, makeFn: () => jotai.Atom<T>): jotai.Atom<T> { function useBlockAtom<T>(blockId: string, name: string, makeFn: () => jotai.Atom<T>): jotai.Atom<T> {
let blockCache = blockAtomCache.get(blockId); let blockCache = blockAtomCache.get(blockId);
@ -84,19 +102,9 @@ function useBlockAtom<T>(blockId: string, name: string, makeFn: () => jotai.Atom
if (atom == null) { if (atom == null) {
atom = makeFn(); atom = makeFn();
blockCache.set(name, atom); blockCache.set(name, atom);
console.log("New BlockAtom", blockId, name);
} }
return atom as jotai.Atom<T>; return atom as jotai.Atom<T>;
} }
function removeBlockFromTab(tabId: string, blockId: string) { export { globalStore, atoms, getBlockSubject, useBlockAtom, WOS };
let tabArr = globalStore.get(atoms.tabsAtom);
const newTabArr = produce(tabArr, (draft) => {
const tab = draft.find((tab) => tab.tabid == tabId);
tab.blockIds = tab.blockIds.filter((id) => id !== blockId);
});
globalStore.set(atoms.tabsAtom, newTabArr);
removeBlock(blockId);
BlockService.CloseBlock(blockId);
}
export { globalStore, atoms, getBlockSubject, addBlockIdToTab, blockDataMap, useBlockAtom, removeBlockFromTab };

287
frontend/app/store/wos.ts Normal file
View File

@ -0,0 +1,287 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// WaveObjectStore
import * as React from "react";
import * as jotai from "jotai";
import { Events } from "@wailsio/runtime";
import { Call as $Call } from "@wailsio/runtime";
import { globalStore, atoms } from "./global";
type WaveObjectDataItemType<T extends WaveObj> = {
value: T;
loading: boolean;
};
type WaveObjectValue<T extends WaveObj> = {
pendingPromise: Promise<T>;
dataAtom: jotai.PrimitiveAtom<WaveObjectDataItemType<T>>;
refCount: number;
holdTime: number;
};
function splitORef(oref: string): [string, string] {
let parts = oref.split(":");
if (parts.length != 2) {
throw new Error("invalid oref");
}
return [parts[0], parts[1]];
}
function isBlank(str: string): boolean {
return str == null || str == "";
}
function isBlankNum(num: number): boolean {
return num == null || isNaN(num) || num == 0;
}
function isValidWaveObj(val: WaveObj): boolean {
if (val == null) {
return false;
}
if (isBlank(val.otype) || isBlank(val.oid) || isBlankNum(val.version)) {
return false;
}
return true;
}
function makeORef(otype: string, oid: string): string {
if (isBlank(otype) || isBlank(oid)) {
return null;
}
return `${otype}:${oid}`;
}
function GetObject<T>(oref: string): Promise<T> {
let prtn = $Call.ByName(
"github.com/wavetermdev/thenextwave/pkg/service/objectservice.ObjectService.GetObject",
oref
);
return prtn;
}
const waveObjectValueCache = new Map<string, WaveObjectValue<any>>();
function clearWaveObjectCache() {
waveObjectValueCache.clear();
}
const defaultHoldTime = 5000; // 5-seconds
function createWaveValueObject<T extends WaveObj>(oref: string, shouldFetch: boolean): WaveObjectValue<T> {
const wov = { pendingPromise: null, dataAtom: null, refCount: 0, holdTime: Date.now() + 5000 };
wov.dataAtom = jotai.atom({ value: null, loading: true });
if (!shouldFetch) {
return wov;
}
let startTs = Date.now();
let localPromise = GetObject<T>(oref);
wov.pendingPromise = localPromise;
localPromise.then((val) => {
if (wov.pendingPromise != localPromise) {
return;
}
const [otype, oid] = splitORef(oref);
if (val != null) {
if (val["otype"] != otype) {
throw new Error("GetObject returned wrong type");
}
if (val["oid"] != oid) {
throw new Error("GetObject returned wrong id");
}
}
wov.pendingPromise = null;
globalStore.set(wov.dataAtom, { value: val, loading: false });
console.log("WaveObj resolved", oref, Date.now() - startTs + "ms");
});
return wov;
}
function loadAndPinWaveObject<T>(oref: string): Promise<T> {
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
wov = createWaveValueObject(oref, true);
waveObjectValueCache.set(oref, wov);
}
wov.refCount++;
if (wov.pendingPromise == null) {
const dataValue = globalStore.get(wov.dataAtom);
return Promise.resolve(dataValue.value);
}
return wov.pendingPromise;
}
function useWaveObjectValueWithSuspense<T>(oref: string): T {
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
wov = createWaveValueObject(oref, true);
waveObjectValueCache.set(oref, wov);
}
React.useEffect(() => {
wov.refCount++;
return () => {
wov.refCount--;
};
}, [oref]);
const dataValue = jotai.useAtomValue(wov.dataAtom);
if (dataValue.loading) {
throw wov.pendingPromise;
}
return dataValue.value;
}
function useWaveObjectValue<T>(oref: string): [T, boolean] {
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
wov = createWaveValueObject(oref, true);
waveObjectValueCache.set(oref, wov);
}
React.useEffect(() => {
wov.refCount++;
return () => {
wov.refCount--;
};
}, [oref]);
const atomVal = jotai.useAtomValue(wov.dataAtom);
return [atomVal.value, atomVal.loading];
}
function useWaveObject<T>(oref: string): [T, boolean, (T) => void] {
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
wov = createWaveValueObject(oref, true);
waveObjectValueCache.set(oref, wov);
}
React.useEffect(() => {
wov.refCount++;
return () => {
wov.refCount--;
};
}, [oref]);
const [atomVal, setAtomVal] = jotai.useAtom(wov.dataAtom);
const simpleSet = (val: T) => {
setAtomVal({ value: val, loading: false });
};
return [atomVal.value, atomVal.loading, simpleSet];
}
function updateWaveObject(update: WaveObjUpdate) {
if (update == null) {
return;
}
let oref = makeORef(update.otype, update.oid);
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
wov = createWaveValueObject(oref, false);
waveObjectValueCache.set(oref, wov);
}
if (update.updatetype == "delete") {
console.log("WaveObj deleted", oref);
globalStore.set(wov.dataAtom, { value: null, loading: false });
} else {
if (!isValidWaveObj(update.obj)) {
console.log("invalid wave object update", update);
return;
}
let curValue: WaveObjectDataItemType<WaveObj> = globalStore.get(wov.dataAtom);
if (curValue.value != null && curValue.value.version >= update.obj.version) {
return;
}
console.log("WaveObj updated", oref);
globalStore.set(wov.dataAtom, { value: update.obj, loading: false });
}
wov.holdTime = Date.now() + defaultHoldTime;
return;
}
function updateWaveObjects(vals: WaveObjUpdate[]) {
for (let val of vals) {
updateWaveObject(val);
}
}
function cleanWaveObjectCache() {
let now = Date.now();
for (let [oref, wov] of waveObjectValueCache) {
if (wov.refCount == 0 && wov.holdTime < now) {
waveObjectValueCache.delete(oref);
}
}
}
Events.On("waveobj:update", (event: any) => {
const data: WaveObjUpdate[] = event?.data;
if (data == null) {
return;
}
if (!Array.isArray(data)) {
console.log("invalid waveobj:update, not an array", data);
return;
}
if (data.length == 0) {
return;
}
updateWaveObjects(data);
});
function wrapObjectServiceCall<T>(fnName: string, ...args: any[]): Promise<T> {
const uiContext = globalStore.get(atoms.uiContext);
const startTs = Date.now();
let prtn = $Call.ByName(
"github.com/wavetermdev/thenextwave/pkg/service/objectservice.ObjectService." + fnName,
uiContext,
...args
);
prtn = prtn.then((val) => {
console.log("Call", fnName, Date.now() - startTs + "ms");
if (val.updates) {
updateWaveObjects(val.updates);
}
return val;
});
return prtn;
}
function getStaticObjectValue<T>(oref: string, getFn: jotai.Getter): T {
let wov = waveObjectValueCache.get(oref);
if (wov == null) {
return null;
}
const atomVal = getFn(wov.dataAtom);
return atomVal.value;
}
export function AddTabToWorkspace(tabName: string, activateTab: boolean): Promise<{ tabId: string }> {
return wrapObjectServiceCall("AddTabToWorkspace", tabName, activateTab);
}
export function SetActiveTab(tabId: string): Promise<void> {
return wrapObjectServiceCall("SetActiveTab", tabId);
}
export function CreateBlock(blockDef: BlockDef, rtOpts: RuntimeOpts): Promise<{ blockId: string }> {
return wrapObjectServiceCall("CreateBlock", blockDef, rtOpts);
}
export function DeleteBlock(blockId: string): Promise<void> {
return wrapObjectServiceCall("DeleteBlock", blockId);
}
export function CloseTab(tabId: string): Promise<void> {
return wrapObjectServiceCall("CloseTab", tabId);
}
export {
makeORef,
useWaveObject,
useWaveObjectValue,
useWaveObjectValueWithSuspense,
loadAndPinWaveObject,
clearWaveObjectCache,
updateWaveObject,
updateWaveObjects,
cleanWaveObjectCache,
getStaticObjectValue,
};

View File

@ -5,21 +5,29 @@ import * as React from "react";
import * as jotai from "jotai"; import * as jotai from "jotai";
import { Block } from "@/app/block/block"; import { Block } from "@/app/block/block";
import { atoms } from "@/store/global"; import { atoms } from "@/store/global";
import * as WOS from "@/store/wos";
import "./tab.less"; import "./tab.less";
import { CenteredDiv, CenteredLoadingDiv } from "../element/quickelems";
const TabContent = ({ tabId }: { tabId: string }) => { const TabContent = ({ tabId }: { tabId: string }) => {
const tabs = jotai.useAtomValue(atoms.tabsAtom); const [tabData, tabLoading] = WOS.useWaveObjectValue<Tab>(WOS.makeORef("tab", tabId));
const tabData = tabs.find((tab) => tab.tabid === tabId); if (tabLoading) {
return <CenteredLoadingDiv />;
}
if (!tabData) { if (!tabData) {
return <div className="tabcontent">Tab not found</div>; return (
<div className="tabcontent">
<CenteredDiv>Tab Not Found</CenteredDiv>
</div>
);
} }
return ( return (
<div className="tabcontent"> <div className="tabcontent">
{tabData.blockIds.map((blockId: string) => { {tabData.blockids.map((blockId: string) => {
return ( return (
<div key={blockId} className="block-container"> <div key={blockId} className="block-container">
<Block tabId={tabId} blockId={blockId} /> <Block key={blockId} tabId={tabId} blockId={blockId} />
</div> </div>
); );
})} })}

View File

@ -3,14 +3,16 @@
import * as React from "react"; import * as React from "react";
import * as jotai from "jotai"; import * as jotai from "jotai";
import { atoms, blockDataMap, useBlockAtom } from "@/store/global"; import { atoms, useBlockAtom } from "@/store/global";
import { Markdown } from "@/element/markdown"; import { Markdown } from "@/element/markdown";
import { FileService, FileInfo, FullFile } from "@/bindings/fileservice"; import { FileService, FileInfo, FullFile } from "@/bindings/fileservice";
import * as util from "@/util/util"; import * as util from "@/util/util";
import { CenteredDiv } from "../element/quickelems"; import { CenteredDiv } from "../element/quickelems";
import { DirectoryTable } from "@/element/directorytable"; import { DirectoryTable } from "@/element/directorytable";
import * as WOS from "@/store/wos";
import "./view.less"; import "./view.less";
import { first } from "rxjs";
const MaxFileSize = 1024 * 1024 * 10; // 10MB const MaxFileSize = 1024 * 1024 * 10; // 10MB
@ -61,10 +63,17 @@ function DirectoryPreview({ contentAtom }: { contentAtom: jotai.Atom<Promise<str
} }
function PreviewView({ blockId }: { blockId: string }) { function PreviewView({ blockId }: { blockId: string }) {
const blockDataAtom: jotai.Atom<BlockData> = blockDataMap.get(blockId); const blockData = WOS.useWaveObjectValueWithSuspense<Block>(WOS.makeORef("block", blockId));
if (blockData == null) {
return (
<div className="view-preview">
<CenteredDiv>Block Not Found</CenteredDiv>
</div>
);
}
const fileNameAtom = useBlockAtom(blockId, "preview:filename", () => const fileNameAtom = useBlockAtom(blockId, "preview:filename", () =>
jotai.atom<string>((get) => { jotai.atom<string>((get) => {
return get(blockDataAtom)?.meta?.file; return blockData?.meta?.file;
}) })
); );
const statFileAtom = useBlockAtom(blockId, "preview:statfile", () => const statFileAtom = useBlockAtom(blockId, "preview:statfile", () =>

View File

@ -55,9 +55,24 @@
height: 100%; height: 100%;
border-right: 1px solid var(--border-color); border-right: 1px solid var(--border-color);
cursor: pointer; cursor: pointer;
position: relative;
&.active { &.active {
background-color: var(--highlight-bg-color); background-color: var(--highlight-bg-color);
} }
&.active:hover .tab-close {
display: block;
}
.tab-close {
position: absolute;
display: none;
padding: 5px;
right: 2px;
top: 5px;
cursor: pointer;
}
} }
.tab-add { .tab-add {

View File

@ -5,37 +5,46 @@ import * as React from "react";
import * as jotai from "jotai"; import * as jotai from "jotai";
import { TabContent } from "@/app/tab/tab"; import { TabContent } from "@/app/tab/tab";
import { clsx } from "clsx"; import { clsx } from "clsx";
import { atoms, addBlockIdToTab, blockDataMap } from "@/store/global"; import { atoms } from "@/store/global";
import { v4 as uuidv4 } from "uuid"; import * as WOS from "@/store/wos";
import { BlockService } from "@/bindings/blockservice"; import { CenteredLoadingDiv, CenteredDiv } from "../element/quickelems";
import "./workspace.less"; import "./workspace.less";
function Tab({ tab }: { tab: TabData }) { function Tab({ tabId }: { tabId: string }) {
const [activeTab, setActiveTab] = jotai.useAtom(atoms.activeTabId); const windowData = jotai.useAtomValue(atoms.waveWindow);
const [tabData, tabLoading] = WOS.useWaveObjectValue<Tab>(WOS.makeORef("tab", tabId));
function setActiveTab() {
WOS.SetActiveTab(tabId);
}
function handleCloseTab() {
WOS.CloseTab(tabId);
}
return ( return (
<div className={clsx("tab", { active: activeTab === tab.tabid })} onClick={() => setActiveTab(tab.tabid)}> <div
{tab.name} className={clsx("tab", { active: tabData != null && windowData.activetabid === tabData.oid })}
onClick={() => setActiveTab()}
>
<div className="tab-close" onClick={() => handleCloseTab()}>
<div>
<i className="fa fa-solid fa-xmark" />
</div>
</div>
{tabData?.name ?? "..."}
</div> </div>
); );
} }
function TabBar() { function TabBar({ workspace }: { workspace: Workspace }) {
const [tabData, setTabData] = jotai.useAtom(atoms.tabsAtom);
const [activeTab, setActiveTab] = jotai.useAtom(atoms.activeTabId);
const tabs = jotai.useAtomValue(atoms.tabsAtom);
function handleAddTab() { function handleAddTab() {
const newTabId = uuidv4(); const newTabName = `Tab-${workspace.tabids.length + 1}`;
const newTabName = "Tab " + (tabData.length + 1); WOS.AddTabToWorkspace(newTabName, true);
setTabData([...tabData, { name: newTabName, tabid: newTabId, blockIds: [] }]);
setActiveTab(newTabId);
} }
const tabIds = workspace?.tabids ?? [];
return ( return (
<div className="tab-bar"> <div className="tab-bar">
{tabs.map((tab, idx) => { {tabIds.map((tabid, idx) => {
return <Tab key={idx} tab={tab} />; return <Tab key={idx} tabId={tabid} />;
})} })}
<div className="tab-add" onClick={() => handleAddTab()}> <div className="tab-add" onClick={() => handleAddTab()}>
<i className="fa fa-solid fa-plus fa-fw" /> <i className="fa fa-solid fa-plus fa-fw" />
@ -45,14 +54,12 @@ function TabBar() {
} }
function Widgets() { function Widgets() {
const activeTabId = jotai.useAtomValue(atoms.activeTabId); const windowData = jotai.useAtomValue(atoms.waveWindow);
const activeTabId = windowData.activetabid;
async function createBlock(blockDef: BlockDef) { async function createBlock(blockDef: BlockDef) {
const rtOpts = { termsize: { rows: 25, cols: 80 } }; const rtOpts: RuntimeOpts = { termsize: { rows: 25, cols: 80 } };
const rtnBlock: BlockData = (await BlockService.CreateBlock(blockDef, rtOpts)) as BlockData; await WOS.CreateBlock(blockDef, rtOpts);
const newBlockAtom = jotai.atom(rtnBlock);
blockDataMap.set(rtnBlock.blockid, newBlockAtom);
addBlockIdToTab(activeTabId, rtnBlock.blockid);
} }
async function clickTerminal() { async function clickTerminal() {
@ -72,7 +79,7 @@ function Widgets() {
} }
async function clickPlot() { async function clickPlot() {
const plotDef = { const plotDef: BlockDef = {
view: "plot", view: "plot",
}; };
createBlock(plotDef); createBlock(plotDef);
@ -105,17 +112,25 @@ function Widgets() {
); );
} }
function Workspace() { function WorkspaceElem() {
const activeTabId = jotai.useAtomValue(atoms.activeTabId); const windowData = jotai.useAtomValue(atoms.waveWindow);
const activeTabId = windowData?.activetabid;
const ws = jotai.useAtomValue(atoms.workspace);
return ( return (
<div className="workspace"> <div className="workspace">
<TabBar /> <TabBar workspace={ws} />
<div className="workspace-tabcontent"> <div className="workspace-tabcontent">
<TabContent key={activeTabId} tabId={activeTabId} /> {activeTabId == "" ? (
<CenteredDiv>No Active Tab</CenteredDiv>
) : (
<>
<TabContent key={windowData.workspaceid} tabId={activeTabId} />
<Widgets /> <Widgets />
</>
)}
</div> </div>
</div> </div>
); );
} }
export { Workspace }; export { WorkspaceElem as Workspace };

View File

@ -2,21 +2,42 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
declare global { declare global {
type MetaDataType = Record<string, any>; type UIContext = {
windowid: string;
type TabData = { activetabid: string;
name: string;
tabid: string;
blockIds: string[];
}; };
type BlockData = { type ORef = {
blockid: string; otype: string;
oid: string;
};
type WaveObj = {
otype: string;
oid: string;
version: number;
};
type WaveObjUpdate = {
updatetype: "update" | "delete";
otype: string;
oid: string;
obj?: WaveObj;
};
type Block = WaveObj & {
blockdef: BlockDef; blockdef: BlockDef;
controller: string; controller: string;
controllerstatus: string;
view: string; view: string;
meta?: MetaDataType; meta?: { [key: string]: any };
runtimeopts?: RuntimeOpts;
};
type BlockDef = {
controller?: string;
view?: string;
files?: { [key: string]: FileDef };
meta?: { [key: string]: any };
}; };
type FileDef = { type FileDef = {
@ -24,14 +45,62 @@ declare global {
path?: string; path?: string;
url?: string; url?: string;
content?: string; content?: string;
meta?: MetaDataType; meta?: { [key: string]: any };
}; };
type BlockDef = { type TermSize = {
controller?: string; rows: number;
view: string; cols: number;
files?: FileDef[]; };
meta?: MetaDataType;
type Client = {
otype: string;
oid: string;
version: number;
mainwindowid: string;
};
type Tab = {
otype: string;
oid: string;
version: number;
name: string;
blockids: string[];
};
type Point = {
x: number;
y: number;
};
type WinSize = {
width: number;
height: number;
};
type Workspace = {
otype: string;
oid: string;
version: number;
name: string;
tabids: string[];
};
type RuntimeOpts = {
termsize?: TermSize;
winsize?: WinSize;
};
type WaveWindow = {
otype: string;
oid: string;
version: number;
workspaceid: string;
activetabid: string;
activeblockmap: { [key: string]: string };
pos: Point;
winsize: WinSize;
lastfocusts: number;
}; };
} }

View File

@ -5,14 +5,34 @@ import * as React from "react";
import { createRoot } from "react-dom/client"; import { createRoot } from "react-dom/client";
import { App } from "./app/app"; import { App } from "./app/app";
import { loadFonts } from "./util/fontutil"; import { loadFonts } from "./util/fontutil";
import { ClientService } from "@/bindings/clientservice";
import { Client } from "@/gopkg/wstore";
import { globalStore, atoms } from "@/store/global";
import * as WOS from "@/store/wos";
import * as wailsRuntime from "@wailsio/runtime";
import * as wstore from "@/gopkg/wstore";
import * as gdata from "@/store/global";
import { immerable } from "immer";
const urlParams = new URLSearchParams(window.location.search);
const windowId = urlParams.get("windowid");
const clientId = urlParams.get("clientid");
loadFonts(); loadFonts();
document.addEventListener("DOMContentLoaded", () => { console.log("Wave Starting");
document.addEventListener("DOMContentLoaded", async () => {
console.log("DOMContentLoaded");
// ensures client/window are loaded into the cache before rendering
await WOS.loadAndPinWaveObject<Client>(WOS.makeORef("client", clientId));
const waveWindow = await WOS.loadAndPinWaveObject<WaveWindow>(WOS.makeORef("window", windowId));
await WOS.loadAndPinWaveObject<Workspace>(WOS.makeORef("workspace", waveWindow.workspaceid));
let reactElem = React.createElement(App, null, null); let reactElem = React.createElement(App, null, null);
let elem = document.getElementById("main"); let elem = document.getElementById("main");
let root = createRoot(elem); let root = createRoot(elem);
document.fonts.ready.then(() => { document.fonts.ready.then(() => {
console.log("Wave First Render");
root.render(reactElem); root.render(reactElem);
}); });
}); });

1
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/google/uuid v1.4.0 github.com/google/uuid v1.4.0
github.com/jmoiron/sqlx v1.4.0 github.com/jmoiron/sqlx v1.4.0
github.com/mattn/go-sqlite3 v1.14.22 github.com/mattn/go-sqlite3 v1.14.22
github.com/mitchellh/mapstructure v1.5.0
github.com/sawka/txwrap v0.2.0 github.com/sawka/txwrap v0.2.0
github.com/wailsapp/wails/v3 v3.0.0-alpha.0 github.com/wailsapp/wails/v3 v3.0.0-alpha.0
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94

2
go.sum
View File

@ -91,6 +91,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=

61
main.go
View File

@ -6,17 +6,23 @@ package main
// Note, main.go needs to be in the root of the project for the go:embed directive to work. // Note, main.go needs to be in the root of the project for the go:embed directive to work.
import ( import (
"context"
"embed" "embed"
"fmt"
"log" "log"
"net/http" "net/http"
"runtime" "runtime"
"strings" "strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockstore" "github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/service/blockservice" "github.com/wavetermdev/thenextwave/pkg/service/blockservice"
"github.com/wavetermdev/thenextwave/pkg/service/clientservice"
"github.com/wavetermdev/thenextwave/pkg/service/fileservice" "github.com/wavetermdev/thenextwave/pkg/service/fileservice"
"github.com/wavetermdev/thenextwave/pkg/service/objectservice"
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wstore"
"github.com/wailsapp/wails/v3/pkg/application" "github.com/wailsapp/wails/v3/pkg/application"
"github.com/wailsapp/wails/v3/pkg/events" "github.com/wailsapp/wails/v3/pkg/events"
@ -32,10 +38,10 @@ func createAppMenu(app *application.App) *application.Menu {
menu := application.NewMenu() menu := application.NewMenu()
menu.AddRole(application.AppMenu) menu.AddRole(application.AppMenu)
fileMenu := menu.AddSubmenu("File") fileMenu := menu.AddSubmenu("File")
newWindow := fileMenu.Add("New Window") // newWindow := fileMenu.Add("New Window")
newWindow.OnClick(func(appContext *application.Context) { // newWindow.OnClick(func(appContext *application.Context) {
createWindow(app) // createWindow(app)
}) // })
closeWindow := fileMenu.Add("Close Window") closeWindow := fileMenu.Add("Close Window")
closeWindow.OnClick(func(appContext *application.Context) { closeWindow.OnClick(func(appContext *application.Context) {
app.CurrentWindow().Close() app.CurrentWindow().Close()
@ -47,7 +53,11 @@ func createAppMenu(app *application.App) *application.Menu {
return menu return menu
} }
func createWindow(app *application.App) { func createWindow(windowData *wstore.Window, app *application.App) {
client, err := wstore.DBGetSingleton[*wstore.Client](context.Background())
if err != nil {
panic(fmt.Errorf("error getting client data: %w", err))
}
window := app.NewWebviewWindowWithOptions(application.WebviewWindowOptions{ window := app.NewWebviewWindowWithOptions(application.WebviewWindowOptions{
Title: "Wave Terminal", Title: "Wave Terminal",
Mac: application.MacWindow{ Mac: application.MacWindow{
@ -55,13 +65,18 @@ func createWindow(app *application.App) {
Backdrop: application.MacBackdropTranslucent, Backdrop: application.MacBackdropTranslucent,
TitleBar: application.MacTitleBarHiddenInset, TitleBar: application.MacTitleBarHiddenInset,
}, },
BackgroundColour: application.NewRGB(27, 38, 54), BackgroundColour: application.NewRGB(0, 0, 0),
URL: "/public/index.html", URL: "/public/index.html?windowid=" + windowData.OID + "&clientid=" + client.OID,
X: windowData.Pos.X,
Y: windowData.Pos.Y,
Width: windowData.WinSize.Width,
Height: windowData.WinSize.Height,
}) })
eventbus.RegisterWailsWindow(window) eventbus.RegisterWailsWindow(window, windowData.OID)
window.On(events.Common.WindowClosing, func(event *application.WindowEvent) { window.On(events.Common.WindowClosing, func(event *application.WindowEvent) {
eventbus.UnregisterWailsWindow(window.ID()) eventbus.UnregisterWailsWindow(window.ID())
}) })
window.Show()
} }
type waveAssetHandler struct { type waveAssetHandler struct {
@ -104,6 +119,16 @@ func main() {
log.Printf("error initializing blockstore: %v\n", err) log.Printf("error initializing blockstore: %v\n", err)
return return
} }
err = wstore.InitWStore()
if err != nil {
log.Printf("error initializing wstore: %v\n", err)
return
}
err = wstore.EnsureInitialData()
if err != nil {
log.Printf("error ensuring initial data: %v\n", err)
return
}
app := application.New(application.Options{ app := application.New(application.Options{
Name: "NextWave", Name: "NextWave",
@ -111,6 +136,8 @@ func main() {
Services: []application.Service{ Services: []application.Service{
application.NewService(&fileservice.FileService{}), application.NewService(&fileservice.FileService{}),
application.NewService(&blockservice.BlockService{}), application.NewService(&blockservice.BlockService{}),
application.NewService(&clientservice.ClientService{}),
application.NewService(&objectservice.ObjectService{}),
}, },
Icon: appIcon, Icon: appIcon,
Assets: application.AssetOptions{ Assets: application.AssetOptions{
@ -124,7 +151,23 @@ func main() {
app.SetMenu(menu) app.SetMenu(menu)
eventbus.RegisterWailsApp(app) eventbus.RegisterWailsApp(app)
createWindow(app) setupCtx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
client, err := wstore.DBGetSingleton[*wstore.Client](setupCtx)
if err != nil {
log.Printf("error getting client data: %v\n", err)
return
}
mainWindow, err := wstore.DBGet[*wstore.Window](setupCtx, client.MainWindowId)
if err != nil {
log.Printf("error getting main window: %v\n", err)
return
}
if mainWindow == nil {
log.Printf("no main window data\n")
return
}
createWindow(mainWindow, app)
eventbus.Start() eventbus.Start()
defer eventbus.Shutdown() defer eventbus.Shutdown()

View File

@ -4,15 +4,16 @@
package blockcontroller package blockcontroller
import ( import (
"context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log" "log"
"sync" "sync"
"time"
"github.com/creack/pty" "github.com/creack/pty"
"github.com/google/uuid"
"github.com/wailsapp/wails/v3/pkg/application" "github.com/wailsapp/wails/v3/pkg/application"
"github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/shellexec"
@ -24,6 +25,8 @@ const (
BlockController_Cmd = "cmd" BlockController_Cmd = "cmd"
) )
const DefaultTimeout = 2 * time.Second
var globalLock = &sync.Mutex{} var globalLock = &sync.Mutex{}
var blockControllerMap = make(map[string]*BlockController) var blockControllerMap = make(map[string]*BlockController)
@ -32,11 +35,18 @@ type BlockController struct {
BlockId string BlockId string
BlockDef *wstore.BlockDef BlockDef *wstore.BlockDef
InputCh chan BlockCommand InputCh chan BlockCommand
Status string
ShellProc *shellexec.ShellProc ShellProc *shellexec.ShellProc
ShellInputCh chan *InputCommand ShellInputCh chan *InputCommand
} }
func (bc *BlockController) WithLock(f func()) {
bc.Lock.Lock()
defer bc.Lock.Unlock()
f()
}
func jsonDeepCopy(val map[string]any) (map[string]any, error) { func jsonDeepCopy(val map[string]any) (map[string]any, error) {
barr, err := json.Marshal(val) barr, err := json.Marshal(val)
if err != nil { if err != nil {
@ -50,38 +60,6 @@ func jsonDeepCopy(val map[string]any) (map[string]any, error) {
return rtn, nil return rtn, nil
} }
func CreateBlock(bdef *wstore.BlockDef, rtOpts *wstore.RuntimeOpts) (*wstore.Block, error) {
blockId := uuid.New().String()
blockData := &wstore.Block{
Lock: &sync.Mutex{},
BlockId: blockId,
BlockDef: bdef,
Controller: bdef.Controller,
View: bdef.View,
RuntimeOpts: rtOpts,
}
var err error
blockData.Meta, err = jsonDeepCopy(bdef.Meta)
if err != nil {
return nil, fmt.Errorf("error copying meta: %w", err)
}
wstore.BlockMap.Set(blockId, blockData)
if blockData.Controller != "" {
StartBlockController(blockId, blockData)
}
return blockData, nil
}
func CloseBlock(blockId string) {
bc := GetBlockController(blockId)
if bc == nil {
return
}
bc.Close()
close(bc.InputCh)
wstore.BlockMap.Delete(blockId)
}
func (bc *BlockController) setShellProc(shellProc *shellexec.ShellProc) error { func (bc *BlockController) setShellProc(shellProc *shellexec.ShellProc) error {
bc.Lock.Lock() bc.Lock.Lock()
defer bc.Lock.Unlock() defer bc.Lock.Unlock()
@ -179,10 +157,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error {
func (bc *BlockController) Run(bdata *wstore.Block) { func (bc *BlockController) Run(bdata *wstore.Block) {
defer func() { defer func() {
bdata.WithLock(func() { bc.WithLock(func() {
// if the controller had an error status, don't change it // if the controller had an error status, don't change it
if bdata.ControllerStatus == "running" { if bc.Status == "running" {
bdata.ControllerStatus = "done" bc.Status = "done"
} }
}) })
eventbus.SendEvent(application.WailsEvent{ eventbus.SendEvent(application.WailsEvent{
@ -193,8 +171,8 @@ func (bc *BlockController) Run(bdata *wstore.Block) {
defer globalLock.Unlock() defer globalLock.Unlock()
delete(blockControllerMap, bc.BlockId) delete(blockControllerMap, bc.BlockId)
}() }()
bdata.WithLock(func() { bc.WithLock(func() {
bdata.ControllerStatus = "running" bc.Status = "running"
}) })
// only controller is "shell" for now // only controller is "shell" for now
@ -218,26 +196,42 @@ func (bc *BlockController) Run(bdata *wstore.Block) {
} }
} }
func StartBlockController(blockId string, bdata *wstore.Block) { func StartBlockController(ctx context.Context, blockId string) error {
if bdata.Controller != BlockController_Shell { blockData, err := wstore.DBMustGet[*wstore.Block](ctx, blockId)
log.Printf("unknown controller %q\n", bdata.Controller) if err != nil {
bdata.WithLock(func() { return fmt.Errorf("error getting block: %w", err)
bdata.ControllerStatus = "error" }
}) if blockData.Controller == "" {
return // nothing to start
return nil
}
if blockData.Controller != BlockController_Shell {
return fmt.Errorf("unknown controller %q", blockData.Controller)
} }
globalLock.Lock() globalLock.Lock()
defer globalLock.Unlock() defer globalLock.Unlock()
if _, ok := blockControllerMap[blockId]; ok { if _, ok := blockControllerMap[blockId]; ok {
return // already running
return nil
} }
bc := &BlockController{ bc := &BlockController{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
BlockId: blockId, BlockId: blockId,
Status: "init",
InputCh: make(chan BlockCommand), InputCh: make(chan BlockCommand),
} }
blockControllerMap[blockId] = bc blockControllerMap[blockId] = bc
go bc.Run(bdata) go bc.Run(blockData)
return nil
}
func StopBlockController(blockId string) {
bc := GetBlockController(blockId)
if bc == nil {
return
}
bc.Close()
close(bc.InputCh)
} }
func GetBlockController(blockId string) *BlockController { func GetBlockController(blockId string) *BlockController {
@ -246,23 +240,34 @@ func GetBlockController(blockId string) *BlockController {
return blockControllerMap[blockId] return blockControllerMap[blockId]
} }
func ProcessStaticCommand(blockId string, cmdGen BlockCommand) { func ProcessStaticCommand(blockId string, cmdGen BlockCommand) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
switch cmd := cmdGen.(type) { switch cmd := cmdGen.(type) {
case *MessageCommand: case *MessageCommand:
log.Printf("MESSAGE: %s | %q\n", blockId, cmd.Message) log.Printf("MESSAGE: %s | %q\n", blockId, cmd.Message)
return nil
case *SetViewCommand: case *SetViewCommand:
log.Printf("SETVIEW: %s | %q\n", blockId, cmd.View) log.Printf("SETVIEW: %s | %q\n", blockId, cmd.View)
block := wstore.BlockMap.Get(blockId) block, err := wstore.DBGet[*wstore.Block](ctx, blockId)
if block != nil { if err != nil {
block.WithLock(func() { return fmt.Errorf("error getting block: %w", err)
block.View = cmd.View
})
} }
block.View = cmd.View
err = wstore.DBUpdate(ctx, block)
if err != nil {
return fmt.Errorf("error updating block: %w", err)
}
return nil
case *SetMetaCommand: case *SetMetaCommand:
log.Printf("SETMETA: %s | %v\n", blockId, cmd.Meta) log.Printf("SETMETA: %s | %v\n", blockId, cmd.Meta)
block := wstore.BlockMap.Get(blockId) block, err := wstore.DBGet[*wstore.Block](ctx, blockId)
if block != nil { if err != nil {
block.WithLock(func() { return fmt.Errorf("error getting block: %w", err)
}
if block == nil {
return nil
}
for k, v := range cmd.Meta { for k, v := range cmd.Meta {
if v == nil { if v == nil {
delete(block.Meta, k) delete(block.Meta, k)
@ -270,7 +275,12 @@ func ProcessStaticCommand(blockId string, cmdGen BlockCommand) {
} }
block.Meta[k] = v block.Meta[k] = v
} }
}) err = wstore.DBUpdate(ctx, block)
} if err != nil {
return fmt.Errorf("error updating block: %w", err)
}
return nil
default:
return fmt.Errorf("unknown command type %T", cmdGen)
} }
} }

View File

@ -13,11 +13,9 @@ import (
"path" "path"
"time" "time"
"github.com/wavetermdev/thenextwave/pkg/util/migrateutil"
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/golang-migrate/migrate/v4"
sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/sawka/txwrap" "github.com/sawka/txwrap"
@ -40,7 +38,7 @@ func InitBlockstore() error {
if err != nil { if err != nil {
return err return err
} }
err = MigrateBlockstore() err = migrateutil.Migrate("blockstore", globalDB.DB, dbfs.BlockstoreMigrationFS, "migrations-blockstore")
if err != nil { if err != nil {
return err return err
} }
@ -79,61 +77,3 @@ func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) { func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) {
return txwrap.WithTxRtn(ctx, globalDB, fn) return txwrap.WithTxRtn(ctx, globalDB, fn)
} }
func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
fsVar, err := iofs.New(dbfs.BlockstoreMigrationFS, "migrations-blockstore")
if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err)
}
mdriver, err := sqlite3migrate.WithInstance(globalDB.DB, &sqlite3migrate.Config{})
if err != nil {
return nil, fmt.Errorf("making blockstore migration driver: %w", err)
}
m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver)
if err != nil {
return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err)
}
return m, nil
}
func MigrateBlockstore() error {
log.Printf("migrate blockstore\n")
m, err := MakeBlockstoreMigrate()
if err != nil {
return err
}
curVersion, dirty, err := GetMigrateVersion(m)
if dirty {
return fmt.Errorf("cannot migrate up, database is dirty")
}
if err != nil {
return fmt.Errorf("cannot get current migration version: %v", err)
}
err = m.Up()
if err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("migrating blockstore: %w", err)
}
newVersion, _, err := GetMigrateVersion(m)
if err != nil {
return fmt.Errorf("cannot get new migration version: %v", err)
}
if newVersion != curVersion {
log.Printf("[db] blockstore migration done, version %d -> %d\n", curVersion, newVersion)
}
return nil
}
func GetMigrateVersion(m *migrate.Migrate) (uint, bool, error) {
if m == nil {
var err error
m, err = MakeBlockstoreMigrate()
if err != nil {
return 0, false, err
}
}
curVersion, dirty, err := m.Version()
if err == migrate.ErrNilVersion {
return 0, false, nil
}
return curVersion, dirty, err
}

View File

@ -5,11 +5,13 @@ package eventbus
import ( import (
"errors" "errors"
"fmt"
"log" "log"
"runtime/debug" "runtime/debug"
"sync" "sync"
"github.com/wailsapp/wails/v3/pkg/application" "github.com/wailsapp/wails/v3/pkg/application"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
) )
const EventBufferSize = 50 const EventBufferSize = 50
@ -24,9 +26,16 @@ type WindowEvent struct {
Event application.WailsEvent Event application.WailsEvent
} }
type WindowWatchData struct {
Window *application.WebviewWindow
WaveWindowId string
WailsWindowId uint
WatchedORefs map[waveobj.ORef]bool
}
var globalLock = &sync.Mutex{} var globalLock = &sync.Mutex{}
var wailsApp *application.App var wailsApp *application.App
var wailsWindowMap = make(map[uint]*application.WebviewWindow) var wailsWindowMap = make(map[uint]*WindowWatchData)
func Start() { func Start() {
go processEvents() go processEvents()
@ -42,10 +51,18 @@ func RegisterWailsApp(app *application.App) {
wailsApp = app wailsApp = app
} }
func RegisterWailsWindow(window *application.WebviewWindow) { func RegisterWailsWindow(window *application.WebviewWindow, windowId string) {
globalLock.Lock() globalLock.Lock()
defer globalLock.Unlock() defer globalLock.Unlock()
wailsWindowMap[window.ID()] = window if _, found := wailsWindowMap[window.ID()]; found {
panic(fmt.Errorf("wails window already registered with eventbus: %d", window.ID()))
}
wailsWindowMap[window.ID()] = &WindowWatchData{
Window: window,
WailsWindowId: window.ID(),
WaveWindowId: "",
WatchedORefs: make(map[waveobj.ORef]bool),
}
} }
func UnregisterWailsWindow(windowId uint) { func UnregisterWailsWindow(windowId uint) {
@ -56,18 +73,18 @@ func UnregisterWailsWindow(windowId uint) {
func emitEventToWindow(event WindowEvent) { func emitEventToWindow(event WindowEvent) {
globalLock.Lock() globalLock.Lock()
window := wailsWindowMap[event.WindowId] wdata := wailsWindowMap[event.WindowId]
globalLock.Unlock() globalLock.Unlock()
if window != nil { if wdata != nil {
window.DispatchWailsEvent(&event.Event) wdata.Window.DispatchWailsEvent(&event.Event)
} }
} }
func emitEventToAllWindows(event *application.WailsEvent) { func emitEventToAllWindows(event *application.WailsEvent) {
globalLock.Lock() globalLock.Lock()
wins := make([]*application.WebviewWindow, 0, len(wailsWindowMap)) wins := make([]*application.WebviewWindow, 0, len(wailsWindowMap))
for _, window := range wailsWindowMap { for _, wdata := range wailsWindowMap {
wins = append(wins, window) wins = append(wins, wdata.Window)
} }
globalLock.Unlock() globalLock.Unlock()
for _, window := range wins { for _, window := range wins {
@ -79,6 +96,25 @@ func SendEvent(event application.WailsEvent) {
EventCh <- event EventCh <- event
} }
func findWindowIdsByORef(oref waveobj.ORef) []uint {
globalLock.Lock()
defer globalLock.Unlock()
var ids []uint
for _, wdata := range wailsWindowMap {
if wdata.WatchedORefs[oref] {
ids = append(ids, wdata.WailsWindowId)
}
}
return ids
}
func SendORefEvent(oref waveobj.ORef, event application.WailsEvent) {
wins := findWindowIdsByORef(oref)
for _, windowId := range wins {
SendWindowEvent(windowId, event)
}
}
func SendEventNonBlocking(event application.WailsEvent) error { func SendEventNonBlocking(event application.WailsEvent) error {
select { select {
case EventCh <- event: case EventCh <- event:

View File

@ -6,52 +6,14 @@ package blockservice
import ( import (
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller" "github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wstore"
) )
type BlockService struct{} type BlockService struct{}
func (bs *BlockService) CreateBlock(bdefMap map[string]any, rtOptsMap map[string]any) (map[string]any, error) { const DefaultTimeout = 2 * time.Second
var bdef wstore.BlockDef
err := utilfn.JsonMapToStruct(bdefMap, &bdef)
if err != nil {
return nil, fmt.Errorf("error unmarshalling BlockDef: %w", err)
}
var rtOpts wstore.RuntimeOpts
err = utilfn.JsonMapToStruct(rtOptsMap, &rtOpts)
if err != nil {
return nil, fmt.Errorf("error unmarshalling RuntimeOpts: %w", err)
}
blockData, err := blockcontroller.CreateBlock(&bdef, &rtOpts)
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
rtnMap, err := utilfn.StructToJsonMap(blockData)
if err != nil {
return nil, fmt.Errorf("error marshalling BlockData: %w", err)
}
return rtnMap, nil
}
func (bs *BlockService) CloseBlock(blockId string) {
blockcontroller.CloseBlock(blockId)
}
func (bs *BlockService) GetBlockData(blockId string) (map[string]any, error) {
blockData := wstore.BlockMap.Get(blockId)
if blockData == nil {
return nil, nil
}
rtnMap, err := utilfn.StructToJsonMap(blockData)
if err != nil {
return nil, fmt.Errorf("error marshalling BlockData: %w", err)
}
return rtnMap, nil
}
func (bs *BlockService) SendCommand(blockId string, cmdMap map[string]any) error { func (bs *BlockService) SendCommand(blockId string, cmdMap map[string]any) error {
cmd, err := blockcontroller.ParseCmdMap(cmdMap) cmd, err := blockcontroller.ParseCmdMap(cmdMap)

View File

@ -0,0 +1,56 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package clientservice
import (
"context"
"fmt"
"time"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
type ClientService struct{}
const DefaultTimeout = 2 * time.Second
func (cs *ClientService) GetClientData() (*wstore.Client, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
clientData, err := wstore.DBGetSingleton[*wstore.Client](ctx)
if err != nil {
return nil, fmt.Errorf("error getting client data: %w", err)
}
return clientData, nil
}
func (cs *ClientService) GetWorkspace(workspaceId string) (*wstore.Workspace, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ws, err := wstore.DBGet[*wstore.Workspace](ctx, workspaceId)
if err != nil {
return nil, fmt.Errorf("error getting workspace: %w", err)
}
return ws, nil
}
func (cs *ClientService) GetTab(tabId string) (*wstore.Tab, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
tab, err := wstore.DBGet[*wstore.Tab](ctx, tabId)
if err != nil {
return nil, fmt.Errorf("error getting tab: %w", err)
}
return tab, nil
}
func (cs *ClientService) GetWindow(windowId string) (*wstore.Window, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
window, err := wstore.DBGet[*wstore.Window](ctx, windowId)
if err != nil {
return nil, fmt.Errorf("error getting window: %w", err)
}
return window, nil
}

View File

@ -0,0 +1,181 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package objectservice
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
type ObjectService struct{}
const DefaultTimeout = 2 * time.Second
func parseORef(oref string) (*waveobj.ORef, error) {
fields := strings.Split(oref, ":")
if len(fields) != 2 {
return nil, fmt.Errorf("invalid object reference: %q", oref)
}
return &waveobj.ORef{OType: fields[0], OID: fields[1]}, nil
}
func (svc *ObjectService) GetObject(orefStr string) (any, error) {
oref, err := parseORef(orefStr)
if err != nil {
return nil, err
}
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
obj, err := wstore.DBGetORef(ctx, *oref)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
rtn, err := waveobj.ToJsonMap(obj)
return rtn, err
}
func (svc *ObjectService) GetObjects(orefStrArr []string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
var orefArr []waveobj.ORef
for _, orefStr := range orefStrArr {
orefObj, err := parseORef(orefStr)
if err != nil {
return nil, err
}
orefArr = append(orefArr, *orefObj)
}
return wstore.DBSelectORefs(ctx, orefArr)
}
func updatesRtn(ctx context.Context, rtnVal map[string]any) (any, error) {
updates := wstore.ContextGetUpdates(ctx)
if len(updates) == 0 {
return nil, nil
}
updateArr := make([]wstore.WaveObjUpdate, 0, len(updates))
for _, update := range updates {
updateArr = append(updateArr, update)
}
jval, err := json.Marshal(updateArr)
if err != nil {
return nil, fmt.Errorf("error converting updates to JSON: %w", err)
}
if rtnVal == nil {
rtnVal = make(map[string]any)
}
rtnVal["updates"] = json.RawMessage(jval)
return rtnVal, nil
}
func (svc *ObjectService) AddTabToWorkspace(uiContext wstore.UIContext, tabName string, activateTab bool) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ctx = wstore.ContextWithUpdates(ctx)
windowData, err := wstore.DBMustGet[*wstore.Window](ctx, uiContext.WindowId)
if err != nil {
return nil, fmt.Errorf("error getting window: %w", err)
}
tab, err := wstore.CreateTab(ctx, windowData.WorkspaceId, tabName)
if err != nil {
return nil, fmt.Errorf("error creating tab: %w", err)
}
if activateTab {
err = wstore.SetActiveTab(ctx, uiContext.WindowId, tab.OID)
if err != nil {
return nil, fmt.Errorf("error setting active tab: %w", err)
}
}
rtn := make(map[string]any)
rtn["tabid"] = waveobj.GetOID(tab)
return updatesRtn(ctx, rtn)
}
func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ctx = wstore.ContextWithUpdates(ctx)
err := wstore.SetActiveTab(ctx, uiContext.WindowId, tabId)
if err != nil {
return nil, fmt.Errorf("error setting active tab: %w", err)
}
return updatesRtn(ctx, nil)
}
func (svc *ObjectService) CreateBlock(uiContext wstore.UIContext, blockDef *wstore.BlockDef, rtOpts *wstore.RuntimeOpts) (any, error) {
if uiContext.ActiveTabId == "" {
return nil, fmt.Errorf("no active tab")
}
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ctx = wstore.ContextWithUpdates(ctx)
blockData, err := wstore.CreateBlock(ctx, uiContext.ActiveTabId, blockDef, rtOpts)
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
if blockData.Controller != "" {
err = blockcontroller.StartBlockController(ctx, blockData.OID)
if err != nil {
return nil, fmt.Errorf("error starting block controller: %w", err)
}
}
rtn := make(map[string]any)
rtn["blockid"] = blockData.OID
return updatesRtn(ctx, rtn)
}
func (svc *ObjectService) DeleteBlock(uiContext wstore.UIContext, blockId string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ctx = wstore.ContextWithUpdates(ctx)
err := wstore.DeleteBlock(ctx, uiContext.ActiveTabId, blockId)
if err != nil {
return nil, fmt.Errorf("error deleting block: %w", err)
}
blockcontroller.StopBlockController(blockId)
return updatesRtn(ctx, nil)
}
func (svc *ObjectService) CloseTab(uiContext wstore.UIContext, tabId string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
ctx = wstore.ContextWithUpdates(ctx)
window, err := wstore.DBMustGet[*wstore.Window](ctx, uiContext.WindowId)
if err != nil {
return nil, fmt.Errorf("error getting window: %w", err)
}
tab, err := wstore.DBMustGet[*wstore.Tab](ctx, tabId)
if err != nil {
return nil, fmt.Errorf("error getting tab: %w", err)
}
for _, blockId := range tab.BlockIds {
blockcontroller.StopBlockController(blockId)
}
err = wstore.CloseTab(ctx, window.WorkspaceId, tabId)
if err != nil {
return nil, fmt.Errorf("error closing tab: %w", err)
}
if window.ActiveTabId == tabId {
ws, err := wstore.DBMustGet[*wstore.Workspace](ctx, window.WorkspaceId)
if err != nil {
return nil, fmt.Errorf("error getting workspace: %w", err)
}
var newActiveTabId string
if len(ws.TabIds) > 0 {
newActiveTabId = ws.TabIds[0]
} else {
newActiveTabId = ""
}
wstore.SetActiveTab(ctx, uiContext.WindowId, newActiveTabId)
}
return updatesRtn(ctx, nil)
}

View File

@ -0,0 +1,67 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package migrateutil
import (
"database/sql"
"fmt"
"io/fs"
"log"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/source/iofs"
sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3"
)
func GetMigrateVersion(m *migrate.Migrate) (uint, bool, error) {
curVersion, dirty, err := m.Version()
if err == migrate.ErrNilVersion {
return 0, false, nil
}
return curVersion, dirty, err
}
func MakeMigrate(storeName string, db *sql.DB, migrationFS fs.FS, migrationsName string) (*migrate.Migrate, error) {
fsVar, err := iofs.New(migrationFS, migrationsName)
if err != nil {
return nil, fmt.Errorf("opening fs: %w", err)
}
mdriver, err := sqlite3migrate.WithInstance(db, &sqlite3migrate.Config{})
if err != nil {
return nil, fmt.Errorf("making %s migration driver: %w", storeName, err)
}
m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver)
if err != nil {
return nil, fmt.Errorf("making %s migration: %w", storeName, err)
}
return m, nil
}
func Migrate(storeName string, db *sql.DB, migrationFS fs.FS, migrationsName string) error {
log.Printf("migrate %s\n", storeName)
m, err := MakeMigrate(storeName, db, migrationFS, migrationsName)
if err != nil {
return err
}
curVersion, dirty, err := GetMigrateVersion(m)
if dirty {
return fmt.Errorf("%s, migrate up, database is dirty", storeName)
}
if err != nil {
return fmt.Errorf("%s, cannot get current migration version: %v", storeName, err)
}
err = m.Up()
if err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("migrating %s: %w", storeName, err)
}
newVersion, _, err := GetMigrateVersion(m)
if err != nil {
return fmt.Errorf("%s, cannot get new migration version: %v", storeName, err)
}
if newVersion != curVersion {
log.Printf("[db] %s migration done, version %d -> %d\n", storeName, curVersion, newVersion)
}
return nil
}

346
pkg/waveobj/waveobj.go Normal file
View File

@ -0,0 +1,346 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package waveobj
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"github.com/mitchellh/mapstructure"
)
const (
OTypeKeyName = "otype"
OIDKeyName = "oid"
VersionKeyName = "version"
OIDGoFieldName = "OID"
VersionGoFieldName = "Version"
)
type ORef struct {
OType string `json:"otype"`
OID string `json:"oid"`
}
type WaveObj interface {
GetOType() string // should not depend on object state (should work with nil value)
}
type waveObjDesc struct {
RType reflect.Type
OIDField reflect.StructField
VersionField reflect.StructField
}
var waveObjMap = sync.Map{}
var waveObjRType = reflect.TypeOf((*WaveObj)(nil)).Elem()
func RegisterType(rtype reflect.Type) {
if rtype.Kind() != reflect.Ptr {
panic(fmt.Sprintf("wave object must be a pointer for %v", rtype))
}
if !rtype.Implements(waveObjRType) {
panic(fmt.Sprintf("wave object must implement WaveObj for %v", rtype))
}
waveObj := reflect.Zero(rtype).Interface().(WaveObj)
otype := waveObj.GetOType()
if otype == "" {
panic(fmt.Sprintf("otype is empty for %v", rtype))
}
oidField, found := rtype.Elem().FieldByName(OIDGoFieldName)
if !found {
panic(fmt.Sprintf("missing OID field for %v", rtype))
}
if oidField.Type.Kind() != reflect.String {
panic(fmt.Sprintf("OID field must be string for %v", rtype))
}
if oidField.Tag.Get("json") != OIDKeyName {
panic(fmt.Sprintf("OID field json tag must be %q for %v", OIDKeyName, rtype))
}
versionField, found := rtype.Elem().FieldByName(VersionGoFieldName)
if !found {
panic(fmt.Sprintf("missing Version field for %v", rtype))
}
if versionField.Type.Kind() != reflect.Int {
panic(fmt.Sprintf("Version field must be int for %v", rtype))
}
if versionField.Tag.Get("json") != VersionKeyName {
panic(fmt.Sprintf("Version field json tag must be %q for %v", VersionKeyName, rtype))
}
_, found = waveObjMap.Load(otype)
if found {
panic(fmt.Sprintf("otype %q already registered", otype))
}
waveObjMap.Store(otype, &waveObjDesc{
RType: rtype,
OIDField: oidField,
VersionField: versionField,
})
}
func getWaveObjDesc(otype string) *waveObjDesc {
desc, _ := waveObjMap.Load(otype)
if desc == nil {
return nil
}
return desc.(*waveObjDesc)
}
func GetOID(waveObj WaveObj) string {
desc := getWaveObjDesc(waveObj.GetOType())
if desc == nil {
return ""
}
return reflect.ValueOf(waveObj).Elem().FieldByIndex(desc.OIDField.Index).String()
}
func SetOID(waveObj WaveObj, oid string) {
desc := getWaveObjDesc(waveObj.GetOType())
if desc == nil {
return
}
reflect.ValueOf(waveObj).Elem().FieldByIndex(desc.OIDField.Index).SetString(oid)
}
func GetVersion(waveObj WaveObj) int {
desc := getWaveObjDesc(waveObj.GetOType())
if desc == nil {
return 0
}
return int(reflect.ValueOf(waveObj).Elem().FieldByIndex(desc.VersionField.Index).Int())
}
func SetVersion(waveObj WaveObj, version int) {
desc := getWaveObjDesc(waveObj.GetOType())
if desc == nil {
return
}
reflect.ValueOf(waveObj).Elem().FieldByIndex(desc.VersionField.Index).SetInt(int64(version))
}
func ToJsonMap(w WaveObj) (map[string]any, error) {
m := make(map[string]any)
dconfig := &mapstructure.DecoderConfig{
Result: &m,
TagName: "json",
}
decoder, err := mapstructure.NewDecoder(dconfig)
if err != nil {
return nil, err
}
err = decoder.Decode(w)
if err != nil {
return nil, err
}
m[OTypeKeyName] = w.GetOType()
m[OIDKeyName] = GetOID(w)
m[VersionKeyName] = GetVersion(w)
return m, nil
}
func ToJson(w WaveObj) ([]byte, error) {
m, err := ToJsonMap(w)
if err != nil {
return nil, err
}
return json.Marshal(m)
}
func FromJson(data []byte) (WaveObj, error) {
var m map[string]any
err := json.Unmarshal(data, &m)
if err != nil {
return nil, err
}
otype, ok := m[OTypeKeyName].(string)
if !ok {
return nil, fmt.Errorf("missing otype")
}
desc := getWaveObjDesc(otype)
if desc == nil {
return nil, fmt.Errorf("unknown otype: %s", otype)
}
wobj := reflect.Zero(desc.RType).Interface().(WaveObj)
dconfig := &mapstructure.DecoderConfig{
Result: &wobj,
TagName: "json",
}
decoder, err := mapstructure.NewDecoder(dconfig)
if err != nil {
return nil, err
}
err = decoder.Decode(m)
if err != nil {
return nil, err
}
return wobj, nil
}
func FromJsonGen[T WaveObj](data []byte) (T, error) {
obj, err := FromJson(data)
if err != nil {
var zero T
return zero, err
}
rtn, ok := obj.(T)
if !ok {
var zero T
return zero, fmt.Errorf("type mismatch got %T, expected %T", obj, zero)
}
return rtn, nil
}
func getTSFieldName(field reflect.StructField) string {
jsonTag := field.Tag.Get("json")
if jsonTag != "" {
parts := strings.Split(jsonTag, ",")
namePart := parts[0]
if namePart != "" {
if namePart == "-" {
return ""
}
return namePart
}
// if namePart is empty, still uses default
}
return field.Name
}
func isFieldOmitEmpty(field reflect.StructField) bool {
jsonTag := field.Tag.Get("json")
if jsonTag != "" {
parts := strings.Split(jsonTag, ",")
if len(parts) > 1 {
for _, part := range parts[1:] {
if part == "omitempty" {
return true
}
}
}
}
return false
}
func typeToTSType(t reflect.Type) (string, []reflect.Type) {
switch t.Kind() {
case reflect.String:
return "string", nil
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Float32, reflect.Float64:
return "number", nil
case reflect.Bool:
return "boolean", nil
case reflect.Slice, reflect.Array:
elemType, subTypes := typeToTSType(t.Elem())
if elemType == "" {
return "", nil
}
return fmt.Sprintf("%s[]", elemType), subTypes
case reflect.Map:
if t.Key().Kind() != reflect.String {
return "", nil
}
elemType, subTypes := typeToTSType(t.Elem())
if elemType == "" {
return "", nil
}
return fmt.Sprintf("{[key: string]: %s}", elemType), subTypes
case reflect.Struct:
return t.Name(), []reflect.Type{t}
case reflect.Ptr:
return typeToTSType(t.Elem())
case reflect.Interface:
return "any", nil
default:
return "", nil
}
}
var tsRenameMap = map[string]string{
"Window": "WaveWindow",
}
func generateTSTypeInternal(rtype reflect.Type) (string, []reflect.Type) {
var buf bytes.Buffer
waveObjType := reflect.TypeOf((*WaveObj)(nil)).Elem()
tsTypeName := rtype.Name()
if tsRename, ok := tsRenameMap[tsTypeName]; ok {
tsTypeName = tsRename
}
var isWaveObj bool
if rtype.Implements(waveObjType) || reflect.PointerTo(rtype).Implements(waveObjType) {
isWaveObj = true
buf.WriteString(fmt.Sprintf("type %s = WaveObj & {\n", tsTypeName))
} else {
buf.WriteString(fmt.Sprintf("type %s = {\n", tsTypeName))
}
var subTypes []reflect.Type
for i := 0; i < rtype.NumField(); i++ {
field := rtype.Field(i)
if field.PkgPath != "" {
continue
}
fieldName := getTSFieldName(field)
if fieldName == "" {
continue
}
if isWaveObj && (fieldName == OTypeKeyName || fieldName == OIDKeyName || fieldName == VersionKeyName) {
continue
}
optMarker := ""
if isFieldOmitEmpty(field) {
optMarker = "?"
}
tsTypeTag := field.Tag.Get("tstype")
if tsTypeTag != "" {
buf.WriteString(fmt.Sprintf(" %s%s: %s;\n", fieldName, optMarker, tsTypeTag))
continue
}
tsType, fieldSubTypes := typeToTSType(field.Type)
if tsType == "" {
continue
}
subTypes = append(subTypes, fieldSubTypes...)
buf.WriteString(fmt.Sprintf(" %s%s: %s;\n", fieldName, optMarker, tsType))
}
buf.WriteString("};\n")
return buf.String(), subTypes
}
func GenerateWaveObjTSType() string {
var buf bytes.Buffer
buf.WriteString("type WaveObj = {\n")
buf.WriteString(" otype: string;\n")
buf.WriteString(" oid: string;\n")
buf.WriteString(" version: number;\n")
buf.WriteString("};\n")
return buf.String()
}
func GenerateTSType(rtype reflect.Type, tsTypesMap map[reflect.Type]string) {
if rtype == nil {
return
}
if rtype.Kind() == reflect.Ptr {
rtype = rtype.Elem()
}
if _, ok := tsTypesMap[rtype]; ok {
return
}
if rtype == waveObjRType {
tsTypesMap[rtype] = GenerateWaveObjTSType()
return
}
tsType, subTypes := generateTSTypeInternal(rtype)
tsTypesMap[rtype] = tsType
for _, subType := range subTypes {
GenerateTSType(subType, tsTypesMap)
}
}

View File

@ -0,0 +1,30 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package waveobj
import (
"log"
"reflect"
"testing"
)
type TestBlock struct {
BlockId string `json:"blockid" waveobj:"oid"`
Name string `json:"name"`
}
func (TestBlock) GetOType() string {
return "block"
}
func TestGenerate(t *testing.T) {
log.Printf("Testing Generate\n")
tsMap := make(map[reflect.Type]string)
var waveObj WaveObj
GenerateTSType(reflect.TypeOf(&waveObj).Elem(), tsMap)
GenerateTSType(reflect.TypeOf(TestBlock{}), tsMap)
for k, v := range tsMap {
log.Printf("Type: %v, TS:\n%s\n", k, v)
}
}

View File

@ -4,45 +4,233 @@
package wstore package wstore
import ( import (
"bytes"
"context"
"encoding/json"
"fmt" "fmt"
"sync" "log"
"reflect"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/util/ds" "github.com/wavetermdev/thenextwave/pkg/waveobj"
) )
var WorkspaceMap = ds.NewSyncMap[*Workspace]() var waveObjUpdateKey = struct{}{}
var TabMap = ds.NewSyncMap[*Tab]()
var BlockMap = ds.NewSyncMap[*Block]() func init() {
for _, rtype := range AllWaveObjTypes() {
waveobj.RegisterType(rtype)
}
}
type contextUpdatesType struct {
UpdatesStack []map[waveobj.ORef]WaveObjUpdate
}
func dumpUpdateStack(updates *contextUpdatesType) {
log.Printf("dumpUpdateStack len:%d\n", len(updates.UpdatesStack))
for idx, update := range updates.UpdatesStack {
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf(" [%d]:", idx))
for k := range update {
buf.WriteString(fmt.Sprintf(" %s:%s", k.OType, k.OID))
}
buf.WriteString("\n")
log.Print(buf.String())
}
}
func ContextWithUpdates(ctx context.Context) context.Context {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal != nil {
return ctx
}
return context.WithValue(ctx, waveObjUpdateKey, &contextUpdatesType{
UpdatesStack: []map[waveobj.ORef]WaveObjUpdate{make(map[waveobj.ORef]WaveObjUpdate)},
})
}
func ContextGetUpdates(ctx context.Context) map[waveobj.ORef]WaveObjUpdate {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return nil
}
updates := updatesVal.(*contextUpdatesType)
if len(updates.UpdatesStack) == 1 {
return updates.UpdatesStack[0]
}
rtn := make(map[waveobj.ORef]WaveObjUpdate)
for _, update := range updates.UpdatesStack {
for k, v := range update {
rtn[k] = v
}
}
return rtn
}
func ContextGetUpdate(ctx context.Context, oref waveobj.ORef) *WaveObjUpdate {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return nil
}
updates := updatesVal.(*contextUpdatesType)
for idx := len(updates.UpdatesStack) - 1; idx >= 0; idx-- {
if obj, ok := updates.UpdatesStack[idx][oref]; ok {
return &obj
}
}
return nil
}
func ContextAddUpdate(ctx context.Context, update WaveObjUpdate) {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return
}
updates := updatesVal.(*contextUpdatesType)
oref := waveobj.ORef{
OType: update.OType,
OID: update.OID,
}
updates.UpdatesStack[len(updates.UpdatesStack)-1][oref] = update
}
func ContextUpdatesBeginTx(ctx context.Context) context.Context {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return ctx
}
updates := updatesVal.(*contextUpdatesType)
updates.UpdatesStack = append(updates.UpdatesStack, make(map[waveobj.ORef]WaveObjUpdate))
return ctx
}
func ContextUpdatesCommitTx(ctx context.Context) {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return
}
updates := updatesVal.(*contextUpdatesType)
if len(updates.UpdatesStack) <= 1 {
panic(fmt.Errorf("no updates transaction to commit"))
}
// merge the last two updates
curUpdateMap := updates.UpdatesStack[len(updates.UpdatesStack)-1]
prevUpdateMap := updates.UpdatesStack[len(updates.UpdatesStack)-2]
for k, v := range curUpdateMap {
prevUpdateMap[k] = v
}
updates.UpdatesStack = updates.UpdatesStack[:len(updates.UpdatesStack)-1]
}
func ContextUpdatesRollbackTx(ctx context.Context) {
updatesVal := ctx.Value(waveObjUpdateKey)
if updatesVal == nil {
return
}
updates := updatesVal.(*contextUpdatesType)
if len(updates.UpdatesStack) <= 1 {
panic(fmt.Errorf("no updates transaction to rollback"))
}
updates.UpdatesStack = updates.UpdatesStack[:len(updates.UpdatesStack)-1]
}
type WaveObjTombstone struct {
OType string `json:"otype"`
OID string `json:"oid"`
}
const (
UpdateType_Update = "update"
UpdateType_Delete = "delete"
)
type WaveObjUpdate struct {
UpdateType string `json:"updatetype"`
OType string `json:"otype"`
OID string `json:"oid"`
Obj waveobj.WaveObj `json:"obj,omitempty"`
}
func (update WaveObjUpdate) MarshalJSON() ([]byte, error) {
rtn := make(map[string]any)
rtn["updatetype"] = update.UpdateType
rtn["otype"] = update.OType
rtn["oid"] = update.OID
if update.Obj != nil {
var err error
rtn["obj"], err = waveobj.ToJsonMap(update.Obj)
if err != nil {
return nil, err
}
}
return json.Marshal(rtn)
}
type UIContext struct {
WindowId string `json:"windowid"`
ActiveTabId string `json:"activetabid"`
}
type Client struct { type Client struct {
DefaultWorkspaceId string `json:"defaultworkspaceid"` OID string `json:"oid"`
Version int `json:"version"`
MainWindowId string `json:"mainwindowid"`
}
func (*Client) GetOType() string {
return "client"
}
func AllWaveObjTypes() []reflect.Type {
return []reflect.Type{
reflect.TypeOf(&Client{}),
reflect.TypeOf(&Window{}),
reflect.TypeOf(&Workspace{}),
reflect.TypeOf(&Tab{}),
reflect.TypeOf(&Block{}),
}
}
// stores the ui-context of the window
// workspaceid, active tab, active block within each tab, window size, etc.
type Window struct {
OID string `json:"oid"`
Version int `json:"version"`
WorkspaceId string `json:"workspaceid"`
ActiveTabId string `json:"activetabid"`
ActiveBlockMap map[string]string `json:"activeblockmap"` // map from tabid to blockid
Pos Point `json:"pos"`
WinSize WinSize `json:"winsize"`
LastFocusTs int64 `json:"lastfocusts"`
}
func (*Window) GetOType() string {
return "window"
} }
type Workspace struct { type Workspace struct {
Lock *sync.Mutex `json:"-"` OID string `json:"oid"`
WorkspaceId string `json:"workspaceid"` Version int `json:"version"`
Name string `json:"name"`
TabIds []string `json:"tabids"` TabIds []string `json:"tabids"`
} }
func (ws *Workspace) WithLock(f func()) { func (*Workspace) GetOType() string {
ws.Lock.Lock() return "workspace"
defer ws.Lock.Unlock()
f()
} }
type Tab struct { type Tab struct {
Lock *sync.Mutex `json:"-"` OID string `json:"oid"`
TabId string `json:"tabid"` Version int `json:"version"`
Name string `json:"name"` Name string `json:"name"`
BlockIds []string `json:"blockids"` BlockIds []string `json:"blockids"`
} }
func (tab *Tab) WithLock(f func()) { func (*Tab) GetOType() string {
tab.Lock.Lock() return "tab"
defer tab.Lock.Unlock()
f()
} }
type FileDef struct { type FileDef struct {
@ -54,7 +242,7 @@ type FileDef struct {
} }
type BlockDef struct { type BlockDef struct {
Controller string `json:"controller"` Controller string `json:"controller,omitempty"`
View string `json:"view,omitempty"` View string `json:"view,omitempty"`
Files map[string]*FileDef `json:"files,omitempty"` Files map[string]*FileDef `json:"files,omitempty"`
Meta map[string]any `json:"meta,omitempty"` Meta map[string]any `json:"meta,omitempty"`
@ -65,56 +253,205 @@ type RuntimeOpts struct {
WinSize WinSize `json:"winsize,omitempty"` WinSize WinSize `json:"winsize,omitempty"`
} }
type Point struct {
X int `json:"x"`
Y int `json:"y"`
}
type WinSize struct { type WinSize struct {
Width int `json:"width"` Width int `json:"width"`
Height int `json:"height"` Height int `json:"height"`
} }
type Block struct { type Block struct {
Lock *sync.Mutex `json:"-"` OID string `json:"oid"`
BlockId string `json:"blockid"` Version int `json:"version"`
BlockDef *BlockDef `json:"blockdef"` BlockDef *BlockDef `json:"blockdef"`
Controller string `json:"controller"` Controller string `json:"controller"`
ControllerStatus string `json:"controllerstatus"`
View string `json:"view"` View string `json:"view"`
Meta map[string]any `json:"meta,omitempty"` Meta map[string]any `json:"meta,omitempty"`
RuntimeOpts *RuntimeOpts `json:"runtimeopts,omitempty"` RuntimeOpts *RuntimeOpts `json:"runtimeopts,omitempty"`
} }
func (b *Block) WithLock(f func()) { func (*Block) GetOType() string {
b.Lock.Lock() return "block"
defer b.Lock.Unlock()
f()
} }
func CreateTab(workspaceId string, name string) (*Tab, error) { func CreateTab(ctx context.Context, workspaceId string, name string) (*Tab, error) {
tab := &Tab{ return WithTxRtn(ctx, func(tx *TxWrap) (*Tab, error) {
Lock: &sync.Mutex{}, ws, _ := DBGet[*Workspace](tx.Context(), workspaceId)
TabId: uuid.New().String(),
Name: name,
BlockIds: []string{},
}
TabMap.Set(tab.TabId, tab)
ws := WorkspaceMap.Get(workspaceId)
if ws == nil { if ws == nil {
return nil, fmt.Errorf("workspace not found: %q", workspaceId) return nil, fmt.Errorf("workspace not found: %q", workspaceId)
} }
ws.WithLock(func() { tab := &Tab{
ws.TabIds = append(ws.TabIds, tab.TabId) OID: uuid.New().String(),
}) Name: name,
BlockIds: []string{},
}
ws.TabIds = append(ws.TabIds, tab.OID)
DBInsert(tx.Context(), tab)
DBUpdate(tx.Context(), ws)
return tab, nil return tab, nil
})
} }
func CreateWorkspace() (*Workspace, error) { func CreateWorkspace(ctx context.Context) (*Workspace, error) {
ws := &Workspace{ ws := &Workspace{
Lock: &sync.Mutex{}, OID: uuid.New().String(),
WorkspaceId: uuid.New().String(),
TabIds: []string{}, TabIds: []string{},
} }
WorkspaceMap.Set(ws.WorkspaceId, ws) DBInsert(ctx, ws)
_, err := CreateTab(ws.WorkspaceId, "Tab 1")
if err != nil {
return nil, err
}
return ws, nil return ws, nil
} }
func SetActiveTab(ctx context.Context, windowId string, tabId string) error {
return WithTx(ctx, func(tx *TxWrap) error {
window, _ := DBGet[*Window](tx.Context(), windowId)
if window == nil {
return fmt.Errorf("window not found: %q", windowId)
}
if tabId != "" {
tab, _ := DBGet[*Tab](tx.Context(), tabId)
if tab == nil {
return fmt.Errorf("tab not found: %q", tabId)
}
}
window.ActiveTabId = tabId
DBUpdate(tx.Context(), window)
return nil
})
}
func CreateBlock(ctx context.Context, tabId string, blockDef *BlockDef, rtOpts *RuntimeOpts) (*Block, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*Block, error) {
tab, _ := DBGet[*Tab](tx.Context(), tabId)
if tab == nil {
return nil, fmt.Errorf("tab not found: %q", tabId)
}
blockId := uuid.New().String()
blockData := &Block{
OID: blockId,
BlockDef: blockDef,
Controller: blockDef.Controller,
View: blockDef.View,
RuntimeOpts: rtOpts,
Meta: blockDef.Meta,
}
DBInsert(tx.Context(), blockData)
tab.BlockIds = append(tab.BlockIds, blockId)
DBUpdate(tx.Context(), tab)
return blockData, nil
})
}
func findStringInSlice(slice []string, val string) int {
for idx, v := range slice {
if v == val {
return idx
}
}
return -1
}
func DeleteBlock(ctx context.Context, tabId string, blockId string) error {
return WithTx(ctx, func(tx *TxWrap) error {
tab, _ := DBGet[*Tab](tx.Context(), tabId)
if tab == nil {
return fmt.Errorf("tab not found: %q", tabId)
}
blockIdx := findStringInSlice(tab.BlockIds, blockId)
if blockIdx == -1 {
return nil
}
tab.BlockIds = append(tab.BlockIds[:blockIdx], tab.BlockIds[blockIdx+1:]...)
DBUpdate(tx.Context(), tab)
DBDelete(tx.Context(), "block", blockId)
return nil
})
}
func CloseTab(ctx context.Context, workspaceId string, tabId string) error {
return WithTx(ctx, func(tx *TxWrap) error {
ws, _ := DBGet[*Workspace](tx.Context(), workspaceId)
if ws == nil {
return fmt.Errorf("workspace not found: %q", workspaceId)
}
tab, _ := DBGet[*Tab](tx.Context(), tabId)
if tab == nil {
return fmt.Errorf("tab not found: %q", tabId)
}
tabIdx := findStringInSlice(ws.TabIds, tabId)
if tabIdx == -1 {
return nil
}
ws.TabIds = append(ws.TabIds[:tabIdx], ws.TabIds[tabIdx+1:]...)
DBUpdate(tx.Context(), ws)
DBDelete(tx.Context(), "tab", tabId)
for _, blockId := range tab.BlockIds {
DBDelete(tx.Context(), "block", blockId)
}
return nil
})
}
func EnsureInitialData() error {
// does not need to run in a transaction since it is called on startup
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
clientCount, err := DBGetCount[*Client](ctx)
if err != nil {
return fmt.Errorf("error getting client count: %w", err)
}
if clientCount > 0 {
return nil
}
windowId := uuid.New().String()
workspaceId := uuid.New().String()
tabId := uuid.New().String()
client := &Client{
OID: uuid.New().String(),
MainWindowId: windowId,
}
err = DBInsert(ctx, client)
if err != nil {
return fmt.Errorf("error inserting client: %w", err)
}
window := &Window{
OID: windowId,
WorkspaceId: workspaceId,
ActiveTabId: tabId,
ActiveBlockMap: make(map[string]string),
Pos: Point{
X: 100,
Y: 100,
},
WinSize: WinSize{
Width: 800,
Height: 600,
},
}
err = DBInsert(ctx, window)
if err != nil {
return fmt.Errorf("error inserting window: %w", err)
}
ws := &Workspace{
OID: workspaceId,
Name: "default",
TabIds: []string{tabId},
}
err = DBInsert(ctx, ws)
if err != nil {
return fmt.Errorf("error inserting workspace: %w", err)
}
tab := &Tab{
OID: tabId,
Name: "Tab-1",
BlockIds: []string{},
}
err = DBInsert(ctx, tab)
if err != nil {
return fmt.Errorf("error inserting tab: %w", err)
}
return nil
}

203
pkg/wstore/wstore_dbops.go Normal file
View File

@ -0,0 +1,203 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wstore
import (
"context"
"fmt"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
)
var ErrNotFound = fmt.Errorf("not found")
func waveObjTableName(w waveobj.WaveObj) string {
return "db_" + w.GetOType()
}
func tableNameFromOType(otype string) string {
return "db_" + otype
}
func tableNameGen[T waveobj.WaveObj]() string {
var zeroObj T
return tableNameFromOType(zeroObj.GetOType())
}
func getOTypeGen[T waveobj.WaveObj]() string {
var zeroObj T
return zeroObj.GetOType()
}
func DBGetCount[T waveobj.WaveObj](ctx context.Context) (int, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (int, error) {
table := tableNameGen[T]()
query := fmt.Sprintf("SELECT count(*) FROM %s", table)
return tx.GetInt(query), nil
})
}
type idDataType struct {
OId string
Version int
Data []byte
}
func genericCastWithErr[T any](v any, err error) (T, error) {
if err != nil {
var zeroVal T
return zeroVal, err
}
return v.(T), err
}
func DBGetSingleton[T waveobj.WaveObj](ctx context.Context) (T, error) {
rtn, err := DBGetSingletonByType(ctx, getOTypeGen[T]())
return genericCastWithErr[T](rtn, err)
}
func DBGetSingletonByType(ctx context.Context, otype string) (waveobj.WaveObj, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (waveobj.WaveObj, error) {
table := tableNameFromOType(otype)
query := fmt.Sprintf("SELECT oid, version, data FROM %s LIMIT 1", table)
var row idDataType
tx.Get(&row, query)
rtn, err := waveobj.FromJson(row.Data)
if err != nil {
return rtn, err
}
waveobj.SetVersion(rtn, row.Version)
return rtn, nil
})
}
func DBGet[T waveobj.WaveObj](ctx context.Context, id string) (T, error) {
rtn, err := DBGetORef(ctx, waveobj.ORef{OType: getOTypeGen[T](), OID: id})
return genericCastWithErr[T](rtn, err)
}
func DBMustGet[T waveobj.WaveObj](ctx context.Context, id string) (T, error) {
rtn, err := DBGetORef(ctx, waveobj.ORef{OType: getOTypeGen[T](), OID: id})
if err != nil {
var zeroVal T
return zeroVal, err
}
if rtn == nil {
var zeroVal T
return zeroVal, ErrNotFound
}
return rtn.(T), nil
}
func DBGetORef(ctx context.Context, oref waveobj.ORef) (waveobj.WaveObj, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (waveobj.WaveObj, error) {
table := tableNameFromOType(oref.OType)
query := fmt.Sprintf("SELECT oid, version, data FROM %s WHERE oid = ?", table)
var row idDataType
tx.Get(&row, query, oref.OID)
rtn, err := waveobj.FromJson(row.Data)
if err != nil {
return rtn, err
}
waveobj.SetVersion(rtn, row.Version)
return rtn, nil
})
}
func dbSelectOIDs(ctx context.Context, otype string, oids []string) ([]waveobj.WaveObj, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]waveobj.WaveObj, error) {
table := tableNameFromOType(otype)
query := fmt.Sprintf("SELECT oid, version, data FROM %s WHERE oid IN (SELECT value FROM json_each(?))", table)
var rows []idDataType
tx.Select(&rows, query, oids)
rtn := make([]waveobj.WaveObj, 0, len(rows))
for _, row := range rows {
waveObj, err := waveobj.FromJson(row.Data)
if err != nil {
return nil, err
}
waveobj.SetVersion(waveObj, row.Version)
rtn = append(rtn, waveObj)
}
return rtn, nil
})
}
func DBSelectORefs(ctx context.Context, orefs []waveobj.ORef) ([]waveobj.WaveObj, error) {
oidsByType := make(map[string][]string)
for _, oref := range orefs {
oidsByType[oref.OType] = append(oidsByType[oref.OType], oref.OID)
}
return WithTxRtn(ctx, func(tx *TxWrap) ([]waveobj.WaveObj, error) {
rtn := make([]waveobj.WaveObj, 0, len(orefs))
for otype, oids := range oidsByType {
rtnArr, err := dbSelectOIDs(tx.Context(), otype, oids)
if err != nil {
return nil, err
}
rtn = append(rtn, rtnArr...)
}
return rtn, nil
})
}
func DBSelectMap[T waveobj.WaveObj](ctx context.Context, ids []string) (map[string]T, error) {
rtnArr, err := dbSelectOIDs(ctx, getOTypeGen[T](), ids)
if err != nil {
return nil, err
}
rtnMap := make(map[string]T)
for _, obj := range rtnArr {
rtnMap[waveobj.GetOID(obj)] = obj.(T)
}
return rtnMap, nil
}
func DBDelete(ctx context.Context, otype string, id string) error {
return WithTx(ctx, func(tx *TxWrap) error {
table := tableNameFromOType(otype)
query := fmt.Sprintf("DELETE FROM %s WHERE oid = ?", table)
tx.Exec(query, id)
ContextAddUpdate(ctx, WaveObjUpdate{UpdateType: UpdateType_Delete, OType: otype, OID: id})
return nil
})
}
func DBUpdate(ctx context.Context, val waveobj.WaveObj) error {
oid := waveobj.GetOID(val)
if oid == "" {
return fmt.Errorf("cannot update %T value with empty id", val)
}
jsonData, err := waveobj.ToJson(val)
if err != nil {
return err
}
return WithTx(ctx, func(tx *TxWrap) error {
table := waveObjTableName(val)
query := fmt.Sprintf("UPDATE %s SET data = ?, version = version+1 WHERE oid = ? RETURNING version", table)
newVersion := tx.GetInt(query, jsonData, oid)
waveobj.SetVersion(val, newVersion)
ContextAddUpdate(ctx, WaveObjUpdate{UpdateType: UpdateType_Update, OType: val.GetOType(), OID: oid, Obj: val})
return nil
})
}
func DBInsert(ctx context.Context, val waveobj.WaveObj) error {
oid := waveobj.GetOID(val)
if oid == "" {
return fmt.Errorf("cannot insert %T value with empty id", val)
}
jsonData, err := waveobj.ToJson(val)
if err != nil {
return err
}
return WithTx(ctx, func(tx *TxWrap) error {
table := waveObjTableName(val)
waveobj.SetVersion(val, 1)
query := fmt.Sprintf("INSERT INTO %s (oid, version, data) VALUES (?, ?, ?)", table)
tx.Exec(query, oid, 1, jsonData)
ContextAddUpdate(ctx, WaveObjUpdate{UpdateType: UpdateType_Update, OType: val.GetOType(), OID: oid, Obj: val})
return nil
})
}

View File

@ -12,7 +12,10 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sawka/txwrap" "github.com/sawka/txwrap"
"github.com/wavetermdev/thenextwave/pkg/util/migrateutil"
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
dbfs "github.com/wavetermdev/thenextwave/db"
) )
const WStoreDBName = "waveterm.db" const WStoreDBName = "waveterm.db"
@ -29,7 +32,7 @@ func InitWStore() error {
if err != nil { if err != nil {
return err return err
} }
err = MigrateWStore() err = migrateutil.Migrate("wstore", globalDB.DB, dbfs.WStoreMigrationFS, "migrations-wstore")
if err != nil { if err != nil {
return err return err
} }
@ -52,6 +55,26 @@ func MakeDB(ctx context.Context) (*sqlx.DB, error) {
return rtn, nil return rtn, nil
} }
func MigrateWStore() error { func WithTx(ctx context.Context, fn func(tx *TxWrap) error) (rtnErr error) {
return nil ContextUpdatesBeginTx(ctx)
defer func() {
if rtnErr != nil {
ContextUpdatesRollbackTx(ctx)
} else {
ContextUpdatesCommitTx(ctx)
}
}()
return txwrap.WithTx(ctx, globalDB, fn)
}
func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (rtnVal RT, rtnErr error) {
ContextUpdatesBeginTx(ctx)
defer func() {
if rtnErr != nil {
ContextUpdatesRollbackTx(ctx)
} else {
ContextUpdatesCommitTx(ctx)
}
}()
return txwrap.WithTxRtn(ctx, globalDB, fn)
} }

View File

@ -20,6 +20,7 @@
"@/store/*": ["frontend/app/store/*"], "@/store/*": ["frontend/app/store/*"],
"@/element/*": ["frontend/app/element/*"], "@/element/*": ["frontend/app/element/*"],
"@/bindings/*": ["frontend/bindings/github.com/wavetermdev/thenextwave/pkg/service/*"], "@/bindings/*": ["frontend/bindings/github.com/wavetermdev/thenextwave/pkg/service/*"],
"@/gopkg/*": ["frontend/bindings/github.com/wavetermdev/thenextwave/pkg/*"],
} }
} }
} }