Migrate websocket eventbus messages to wps (#367)

This migrates all remaining eventbus events sent over the websocket to
use the wps interface. WPS is more flexible for registering events and
callbacks and provides support for more reliable unsubscribes and
resubscribes.
This commit is contained in:
Evan Simkowitz 2024-09-11 18:03:55 -07:00 committed by GitHub
parent 4bfb96b001
commit 936d4bfb30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 519 additions and 549 deletions

View File

@ -28,6 +28,7 @@ func GenerateWshClient() {
"github.com/wavetermdev/waveterm/pkg/wshrpc",
"github.com/wavetermdev/waveterm/pkg/waveobj",
"github.com/wavetermdev/waveterm/pkg/wconfig",
"github.com/wavetermdev/waveterm/pkg/wps",
})
wshDeclMap := wshrpc.GenerateWshCommandDeclMap()
for _, key := range utilfn.GetOrderedMapKeys(wshDeclMap) {

View File

@ -10,6 +10,7 @@ import (
"github.com/spf13/cobra"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
)
@ -64,11 +65,11 @@ func editorRun(cmd *cobra.Command, args []string) {
return
}
doneCh := make(chan bool)
RpcClient.EventListener.On("blockclose", func(event *wshrpc.WaveEvent) {
RpcClient.EventListener.On("blockclose", func(event *wps.WaveEvent) {
if event.HasScope(blockRef.String()) {
close(doneCh)
}
})
wshclient.EventSubCommand(RpcClient, wshrpc.SubscriptionRequest{Event: "blockclose", Scopes: []string{blockRef.String()}}, nil)
wshclient.EventSubCommand(RpcClient, wps.SubscriptionRequest{Event: "blockclose", Scopes: []string{blockRef.String()}}, nil)
<-doneCh
}

View File

@ -2,9 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
import { NumActiveConnColors } from "@/app/block/blockframe";
import { getConnStatusAtom, waveEventSubscribe, WOS } from "@/app/store/global";
import { getConnStatusAtom, WOS } from "@/app/store/global";
import * as services from "@/app/store/services";
import { makeORef } from "@/app/store/wos";
import { waveEventSubscribe } from "@/store/wps";
import * as util from "@/util/util";
import clsx from "clsx";
import * as jotai from "jotai";
@ -160,9 +161,13 @@ export const ControllerStatusIcon = React.memo(({ blockId }: { blockId: string }
setGotInitialStatus(true);
setControllerStatus(rts);
});
const unsubFn = waveEventSubscribe("controllerstatus", makeORef("block", blockId), (event) => {
const cstatus: BlockControllerRuntimeStatus = event.data;
setControllerStatus(cstatus);
const unsubFn = waveEventSubscribe({
eventType: "controllerstatus",
scope: makeORef("block", blockId),
handler: (event) => {
const cstatus: BlockControllerRuntimeStatus = event.data;
setControllerStatus(cstatus);
},
});
return () => {
unsubFn();

View File

@ -1,7 +1,6 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { handleIncomingRpcMessage, sendRawRpcMessage } from "@/app/store/wshrpc";
import {
getLayoutModelForActiveTab,
getLayoutModelForTabById,
@ -9,23 +8,22 @@ import {
LayoutTreeInsertNodeAction,
newLayoutNode,
} from "@/layout/index";
import { getWebServerEndpoint, getWSServerEndpoint } from "@/util/endpoints";
import { getWebServerEndpoint } from "@/util/endpoints";
import { fetch } from "@/util/fetchutil";
import * as util from "@/util/util";
import * as jotai from "jotai";
import * as rxjs from "rxjs";
import { getPrefixedSettings, isBlank } from "@/util/util";
import { atom, Atom, createStore, PrimitiveAtom, useAtomValue } from "jotai";
import { modalsModel } from "./modalmodel";
import * as services from "./services";
import { ClientService, ObjectService } from "./services";
import * as WOS from "./wos";
import { WSControl } from "./ws";
import { getFileSubject, waveEventSubscribe } from "./wps";
let PLATFORM: NodeJS.Platform = "darwin";
const globalStore = jotai.createStore();
const globalStore = createStore();
let atoms: GlobalAtomsType;
let globalEnvironment: "electron" | "renderer";
const blockComponentModelMap = new Map<string, BlockComponentModel>();
const Counters = new Map<string, number>();
const ConnStatusMap = new Map<string, jotai.PrimitiveAtom<ConnStatus>>();
const ConnStatusMap = new Map<string, PrimitiveAtom<ConnStatus>>();
type GlobalInitOptions = {
platform: NodeJS.Platform;
@ -45,18 +43,18 @@ function setPlatform(platform: NodeJS.Platform) {
}
function initGlobalAtoms(initOpts: GlobalInitOptions) {
const windowIdAtom = jotai.atom(initOpts.windowId) as jotai.PrimitiveAtom<string>;
const clientIdAtom = jotai.atom(initOpts.clientId) as jotai.PrimitiveAtom<string>;
const uiContextAtom = jotai.atom((get) => {
const windowIdAtom = atom(initOpts.windowId) as PrimitiveAtom<string>;
const clientIdAtom = atom(initOpts.clientId) as PrimitiveAtom<string>;
const uiContextAtom = atom((get) => {
const windowData = get(windowDataAtom);
const uiContext: UIContext = {
windowid: get(atoms.windowId),
activetabid: windowData?.activetabid,
};
return uiContext;
}) as jotai.Atom<UIContext>;
}) as Atom<UIContext>;
const isFullScreenAtom = jotai.atom(false) as jotai.PrimitiveAtom<boolean>;
const isFullScreenAtom = atom(false) as PrimitiveAtom<boolean>;
try {
getApi().onFullScreenChange((isFullScreen) => {
globalStore.set(isFullScreenAtom, isFullScreen);
@ -65,7 +63,7 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
// do nothing
}
const showAboutModalAtom = jotai.atom(false) as jotai.PrimitiveAtom<boolean>;
const showAboutModalAtom = atom(false) as PrimitiveAtom<boolean>;
try {
getApi().onMenuItemAbout(() => {
modalsModel.pushModal("AboutModal");
@ -74,14 +72,14 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
// do nothing
}
const clientAtom: jotai.Atom<Client> = jotai.atom((get) => {
const clientAtom: Atom<Client> = atom((get) => {
const clientId = get(clientIdAtom);
if (clientId == null) {
return null;
}
return WOS.getObjectValue(WOS.makeORef("client", clientId), get);
});
const windowDataAtom: jotai.Atom<WaveWindow> = jotai.atom((get) => {
const windowDataAtom: Atom<WaveWindow> = atom((get) => {
const windowId = get(windowIdAtom);
if (windowId == null) {
return null;
@ -89,33 +87,33 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
const rtn = WOS.getObjectValue<WaveWindow>(WOS.makeORef("window", windowId), get);
return rtn;
});
const workspaceAtom: jotai.Atom<Workspace> = jotai.atom((get) => {
const workspaceAtom: Atom<Workspace> = atom((get) => {
const windowData = get(windowDataAtom);
if (windowData == null) {
return null;
}
return WOS.getObjectValue(WOS.makeORef("workspace", windowData.workspaceid), get);
});
const fullConfigAtom = jotai.atom(null) as jotai.PrimitiveAtom<FullConfigType>;
const settingsAtom = jotai.atom((get) => {
const fullConfigAtom = atom(null) as PrimitiveAtom<FullConfigType>;
const settingsAtom = atom((get) => {
return get(fullConfigAtom)?.settings ?? {};
}) as jotai.Atom<SettingsType>;
const tabAtom: jotai.Atom<Tab> = jotai.atom((get) => {
}) as Atom<SettingsType>;
const tabAtom: Atom<Tab> = atom((get) => {
const windowData = get(windowDataAtom);
if (windowData == null) {
return null;
}
return WOS.getObjectValue(WOS.makeORef("tab", windowData.activetabid), get);
});
const activeTabIdAtom: jotai.Atom<string> = jotai.atom((get) => {
const activeTabIdAtom: Atom<string> = atom((get) => {
const windowData = get(windowDataAtom);
if (windowData == null) {
return null;
}
return windowData.activetabid;
});
const controlShiftDelayAtom = jotai.atom(false);
const updaterStatusAtom = jotai.atom<UpdaterStatus>("up-to-date") as jotai.PrimitiveAtom<UpdaterStatus>;
const controlShiftDelayAtom = atom(false);
const updaterStatusAtom = atom<UpdaterStatus>("up-to-date") as PrimitiveAtom<UpdaterStatus>;
try {
globalStore.set(updaterStatusAtom, getApi().getUpdaterStatus());
getApi().onUpdaterStatusChange((status) => {
@ -125,11 +123,11 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
// do nothing
}
const reducedMotionSettingAtom = jotai.atom((get) => get(settingsAtom)?.["window:reducedmotion"]);
const reducedMotionSystemPreferenceAtom = jotai.atom(false);
const reducedMotionSettingAtom = atom((get) => get(settingsAtom)?.["window:reducedmotion"]);
const reducedMotionSystemPreferenceAtom = atom(false);
// Composite of the prefers-reduced-motion media query and the window:reducedmotion user setting.
const prefersReducedMotionAtom = jotai.atom((get) => {
const prefersReducedMotionAtom = atom((get) => {
const reducedMotionSetting = get(reducedMotionSettingAtom);
const reducedMotionSystemPreference = get(reducedMotionSystemPreferenceAtom);
return reducedMotionSetting || reducedMotionSystemPreference;
@ -144,9 +142,9 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
});
}
const typeAheadModalAtom = jotai.atom({});
const modalOpen = jotai.atom(false);
const allConnStatusAtom = jotai.atom<ConnStatus[]>((get) => {
const typeAheadModalAtom = atom({});
const modalOpen = atom(false);
const allConnStatusAtom = atom<ConnStatus[]>((get) => {
const connStatuses = Array.from(ConnStatusMap.values()).map((atom) => get(atom));
return connStatuses;
});
@ -172,115 +170,44 @@ function initGlobalAtoms(initOpts: GlobalInitOptions) {
};
}
type WaveEventSubjectContainer = {
id: string;
handler: (event: WaveEvent) => void;
scope: string;
};
// key is "eventType" or "eventType|oref"
const eventSubjects = new Map<string, SubjectWithRef<WSEventType>>();
const fileSubjects = new Map<string, SubjectWithRef<WSFileEventData>>();
const waveEventSubjects = new Map<string, WaveEventSubjectContainer[]>();
function getSubjectInternal(subjectKey: string): SubjectWithRef<WSEventType> {
let subject = eventSubjects.get(subjectKey);
if (subject == null) {
subject = new rxjs.Subject<any>() as any;
subject.refCount = 0;
subject.release = () => {
subject.refCount--;
if (subject.refCount === 0) {
subject.complete();
eventSubjects.delete(subjectKey);
}
};
eventSubjects.set(subjectKey, subject);
}
subject.refCount++;
return subject;
}
function getEventSubject(eventType: string): SubjectWithRef<WSEventType> {
return getSubjectInternal(eventType);
}
function getEventORefSubject(eventType: string, oref: string): SubjectWithRef<WSEventType> {
return getSubjectInternal(eventType + "|" + oref);
}
function makeWaveReSubCommand(eventType: string): RpcMessage {
let subjects = waveEventSubjects.get(eventType);
if (subjects == null) {
return { command: "eventunsub", data: eventType };
}
let subreq: SubscriptionRequest = { event: eventType, scopes: [], allscopes: false };
for (const scont of subjects) {
if (util.isBlank(scont.scope)) {
subreq.allscopes = true;
subreq.scopes = [];
break;
function initGlobalWaveEventSubs() {
waveEventSubscribe(
{
eventType: "waveobj:update",
handler: (event) => {
// console.log("waveobj:update wave event handler", event);
const update: WaveObjUpdate = event.data;
WOS.updateWaveObject(update);
},
},
{
eventType: "config",
handler: (event) => {
// console.log("config wave event handler", event);
const fullConfig = (event.data as WatcherUpdate).fullconfig;
globalStore.set(atoms.fullConfigAtom, fullConfig);
},
},
{
eventType: "userinput",
handler: (event) => {
// console.log("userinput event handler", event);
const data: UserInputRequest = event.data;
modalsModel.pushModal("UserInputModal", { ...data });
},
},
{
eventType: "blockfile",
handler: (event) => {
// console.log("blockfile event update", event);
const fileData: WSFileEventData = event.data;
const fileSubject = getFileSubject(fileData.zoneid, fileData.filename);
if (fileSubject != null) {
fileSubject.next(fileData);
}
},
}
subreq.scopes.push(scont.scope);
}
return { command: "eventsub", data: subreq };
}
function updateWaveEventSub(eventType: string) {
const command = makeWaveReSubCommand(eventType);
sendRawRpcMessage(command);
}
function waveEventSubscribe(eventType: string, scope: string, handler: (event: WaveEvent) => void): () => void {
if (handler == null) {
return;
}
const id = crypto.randomUUID();
const subject = new rxjs.Subject() as any;
const scont: WaveEventSubjectContainer = { id, scope, handler };
let subjects = waveEventSubjects.get(eventType);
if (subjects == null) {
subjects = [];
waveEventSubjects.set(eventType, subjects);
}
subjects.push(scont);
updateWaveEventSub(eventType);
return () => waveEventUnsubscribe(eventType, id);
}
function waveEventUnsubscribe(eventType: string, id: string) {
let subjects = waveEventSubjects.get(eventType);
if (subjects == null) {
return;
}
const idx = subjects.findIndex((s) => s.id === id);
if (idx === -1) {
return;
}
subjects.splice(idx, 1);
if (subjects.length === 0) {
waveEventSubjects.delete(eventType);
}
updateWaveEventSub(eventType);
}
function getFileSubject(zoneId: string, fileName: string): SubjectWithRef<WSFileEventData> {
const subjectKey = zoneId + "|" + fileName;
let subject = fileSubjects.get(subjectKey);
if (subject == null) {
subject = new rxjs.Subject<any>() as any;
subject.refCount = 0;
subject.release = () => {
subject.refCount--;
if (subject.refCount === 0) {
subject.complete();
fileSubjects.delete(subjectKey);
}
};
fileSubjects.set(subjectKey, subject);
}
subject.refCount++;
return subject;
);
}
const blockCache = new Map<string, Map<string, any>>();
@ -299,45 +226,45 @@ function useBlockCache<T>(blockId: string, name: string, makeFn: () => T): T {
return value as T;
}
const settingsAtomCache = new Map<string, jotai.Atom<any>>();
const settingsAtomCache = new Map<string, Atom<any>>();
function useSettingsKeyAtom<T extends keyof SettingsType>(key: T): jotai.Atom<SettingsType[T]> {
let atom = settingsAtomCache.get(key) as jotai.Atom<SettingsType[T]>;
if (atom == null) {
atom = jotai.atom((get) => {
function useSettingsKeyAtom<T extends keyof SettingsType>(key: T): Atom<SettingsType[T]> {
let settingsKeyAtom = settingsAtomCache.get(key) as Atom<SettingsType[T]>;
if (settingsKeyAtom == null) {
settingsKeyAtom = atom((get) => {
const settings = get(atoms.settingsAtom);
if (settings == null) {
return null;
}
return settings[key];
});
settingsAtomCache.set(key, atom);
settingsAtomCache.set(key, settingsKeyAtom);
}
return atom;
return settingsKeyAtom;
}
function useSettingsPrefixAtom(prefix: string): jotai.Atom<SettingsType> {
function useSettingsPrefixAtom(prefix: string): Atom<SettingsType> {
// TODO: use a shallow equal here to make this more efficient
let atom = settingsAtomCache.get(prefix + ":");
if (atom == null) {
atom = jotai.atom((get) => {
let settingsPrefixAtom = settingsAtomCache.get(prefix + ":") as Atom<SettingsType>;
if (settingsPrefixAtom == null) {
settingsPrefixAtom = atom((get) => {
const settings = get(atoms.settingsAtom);
if (settings == null) {
return {};
}
return util.getPrefixedSettings(settings, prefix);
return getPrefixedSettings(settings, prefix);
});
settingsAtomCache.set(prefix + ":", atom);
settingsAtomCache.set(prefix + ":", settingsPrefixAtom);
}
return atom;
return settingsPrefixAtom;
}
const blockAtomCache = new Map<string, Map<string, jotai.Atom<any>>>();
const blockAtomCache = new Map<string, Map<string, Atom<any>>>();
function useBlockAtom<T>(blockId: string, name: string, makeFn: () => jotai.Atom<T>): jotai.Atom<T> {
function useBlockAtom<T>(blockId: string, name: string, makeFn: () => Atom<T>): Atom<T> {
let blockCache = blockAtomCache.get(blockId);
if (blockCache == null) {
blockCache = new Map<string, jotai.Atom<any>>();
blockCache = new Map<string, Atom<any>>();
blockAtomCache.set(blockId, blockCache);
}
let atom = blockCache.get(name);
@ -346,106 +273,16 @@ function useBlockAtom<T>(blockId: string, name: string, makeFn: () => jotai.Atom
blockCache.set(name, atom);
console.log("New BlockAtom", blockId, name);
}
return atom as jotai.Atom<T>;
return atom as Atom<T>;
}
function useBlockDataLoaded(blockId: string): boolean {
const loadedAtom = useBlockAtom<boolean>(blockId, "block-loaded", () => {
return WOS.getWaveObjectLoadingAtom(WOS.makeORef("block", blockId));
});
return jotai.useAtomValue(loadedAtom);
return useAtomValue(loadedAtom);
}
let globalWS: WSControl = null;
function handleWaveEvent(event: WaveEvent) {
const subjects = waveEventSubjects.get(event.event);
if (subjects == null) {
return;
}
for (const scont of subjects) {
if (util.isBlank(scont.scope)) {
scont.handler(event);
continue;
}
if (event.scopes == null) {
continue;
}
if (event.scopes.includes(scont.scope)) {
scont.handler(event);
}
}
}
function handleWSEventMessage(msg: WSEventType) {
if (msg.eventtype == null) {
console.warn("unsupported WSEvent", msg);
return;
}
if (msg.eventtype == "config") {
const fullConfig = (msg.data as WatcherUpdate).fullconfig;
globalStore.set(atoms.fullConfigAtom, fullConfig);
return;
}
if (msg.eventtype == "userinput") {
const data: UserInputRequest = msg.data;
modalsModel.pushModal("UserInputModal", { ...data });
return;
}
if (msg.eventtype == "blockfile") {
const fileData: WSFileEventData = msg.data;
const fileSubject = getFileSubject(fileData.zoneid, fileData.filename);
if (fileSubject != null) {
fileSubject.next(fileData);
}
return;
}
if (msg.eventtype == "rpc") {
const rpcMsg: RpcMessage = msg.data;
handleIncomingRpcMessage(rpcMsg, handleWaveEvent);
return;
}
// we send to two subjects just eventType and eventType|oref
// we don't use getORefSubject here because we don't want to create a new subject
const eventSubject = eventSubjects.get(msg.eventtype);
if (eventSubject != null) {
eventSubject.next(msg);
}
const eventOrefSubject = eventSubjects.get(msg.eventtype + "|" + msg.oref);
if (eventOrefSubject != null) {
eventOrefSubject.next(msg);
}
}
function handleWSMessage(msg: any) {
if (msg == null) {
return;
}
if (msg.eventtype != null) {
handleWSEventMessage(msg);
}
}
function initWS() {
const windowId = globalStore.get(atoms.windowId);
globalWS = new WSControl(getWSServerEndpoint(), globalStore, windowId, "", (msg) => {
handleWSMessage(msg);
});
globalWS.connectNow("initWS");
}
function sendWSCommand(command: WSCommandType) {
globalWS.pushMessage(command);
}
// more code that could be moved into an init
// here we want to set up a "waveobj:update" handler
const waveobjUpdateSubject = getEventSubject("waveobj:update");
waveobjUpdateSubject.subscribe((msg: WSEventType) => {
const update: WaveObjUpdate = msg.data;
WOS.updateWaveObject(update);
});
/**
* Get the preload api.
*/
@ -455,7 +292,7 @@ function getApi(): ElectronApi {
async function createBlock(blockDef: BlockDef, magnified = false): Promise<string> {
const rtOpts: RuntimeOpts = { termsize: { rows: 25, cols: 80 } };
const blockId = await services.ObjectService.CreateBlock(blockDef, rtOpts);
const blockId = await ObjectService.CreateBlock(blockDef, rtOpts);
const insertNodeAction: LayoutTreeInsertNodeAction = {
type: LayoutTreeActionType.InsertNode,
node: newLayoutNode(undefined, undefined, undefined, { blockId }),
@ -608,7 +445,7 @@ function countersPrint() {
}
async function loadConnStatus() {
const connStatusArr = await services.ClientService.GetAllConnStatus();
const connStatusArr = await ClientService.GetAllConnStatus();
if (connStatusArr == null) {
return;
}
@ -619,25 +456,28 @@ async function loadConnStatus() {
}
function subscribeToConnEvents() {
waveEventSubscribe("connchange", null, (event: WaveEvent) => {
try {
const connStatus = event.data as ConnStatus;
if (connStatus == null || util.isBlank(connStatus.connection)) {
return;
waveEventSubscribe({
eventType: "connchange",
handler: (event: WaveEvent) => {
try {
const connStatus = event.data as ConnStatus;
if (connStatus == null || isBlank(connStatus.connection)) {
return;
}
console.log("connstatus update", connStatus);
let curAtom = getConnStatusAtom(connStatus.connection);
globalStore.set(curAtom, connStatus);
} catch (e) {
console.log("connchange error", e);
}
console.log("connstatus update", connStatus);
let curAtom = getConnStatusAtom(connStatus.connection);
globalStore.set(curAtom, connStatus);
} catch (e) {
console.log("connchange error", e);
}
},
});
}
function getConnStatusAtom(conn: string): jotai.PrimitiveAtom<ConnStatus> {
function getConnStatusAtom(conn: string): PrimitiveAtom<ConnStatus> {
let rtn = ConnStatusMap.get(conn);
if (rtn == null) {
if (util.isBlank(conn)) {
if (isBlank(conn)) {
// create a fake "local" status atom that's always connected
const connStatus: ConnStatus = {
connection: conn,
@ -647,7 +487,7 @@ function getConnStatusAtom(conn: string): jotai.PrimitiveAtom<ConnStatus> {
hasconnected: true,
activeconnnum: 0,
};
rtn = jotai.atom(connStatus);
rtn = atom(connStatus);
} else {
const connStatus: ConnStatus = {
connection: conn,
@ -657,7 +497,7 @@ function getConnStatusAtom(conn: string): jotai.PrimitiveAtom<ConnStatus> {
hasconnected: false,
activeconnnum: 0,
};
rtn = jotai.atom(connStatus);
rtn = atom(connStatus);
}
ConnStatusMap.set(conn, rtn);
}
@ -674,23 +514,18 @@ export {
getApi,
getBlockComponentModel,
getConnStatusAtom,
getEventORefSubject,
getEventSubject,
getFileSubject,
getHostName,
getObjectId,
getUserName,
globalStore,
globalWS,
initGlobal,
initWS,
initGlobalWaveEventSubs,
isDev,
loadConnStatus,
openLink,
PLATFORM,
refocusNode,
registerBlockComponentModel,
sendWSCommand,
setNodeFocus,
setPlatform,
subscribeToConnEvents,
@ -700,7 +535,5 @@ export {
useBlockDataLoaded,
useSettingsKeyAtom,
useSettingsPrefixAtom,
waveEventSubscribe,
waveEventUnsubscribe,
WOS,
};

View File

@ -5,10 +5,10 @@
import { getWebServerEndpoint } from "@/util/endpoints";
import { fetch } from "@/util/fetchutil";
import * as jotai from "jotai";
import * as React from "react";
import { atom, Atom, Getter, PrimitiveAtom, Setter, useAtomValue } from "jotai";
import { useEffect } from "react";
import { atoms, globalStore } from "./global";
import * as services from "./services";
import { ObjectService } from "./services";
type WaveObjectDataItemType<T extends WaveObj> = {
value: T;
@ -17,7 +17,7 @@ type WaveObjectDataItemType<T extends WaveObj> = {
type WaveObjectValue<T extends WaveObj> = {
pendingPromise: Promise<T>;
dataAtom: jotai.PrimitiveAtom<WaveObjectDataItemType<T>>;
dataAtom: PrimitiveAtom<WaveObjectDataItemType<T>>;
refCount: number;
holdTime: number;
};
@ -132,7 +132,7 @@ const defaultHoldTime = 5000; // 5-seconds
function createWaveValueObject<T extends WaveObj>(oref: string, shouldFetch: boolean): WaveObjectValue<T> {
const wov = { pendingPromise: null, dataAtom: null, refCount: 0, holdTime: Date.now() + 5000 };
wov.dataAtom = jotai.atom({ value: null, loading: true });
wov.dataAtom = atom({ value: null, loading: true });
if (!shouldFetch) {
return wov;
}
@ -180,7 +180,7 @@ function loadAndPinWaveObject<T extends WaveObj>(oref: string): Promise<T> {
function getWaveObjectAtom<T extends WaveObj>(oref: string): WritableWaveObjectAtom<T> {
const wov = getWaveObjectValue<T>(oref);
return jotai.atom(
return atom(
(get) => get(wov.dataAtom).value,
(_get, set, value: T) => {
setObjectValue(value, set, true);
@ -188,9 +188,9 @@ function getWaveObjectAtom<T extends WaveObj>(oref: string): WritableWaveObjectA
);
}
function getWaveObjectLoadingAtom(oref: string): jotai.Atom<boolean> {
function getWaveObjectLoadingAtom(oref: string): Atom<boolean> {
const wov = getWaveObjectValue(oref);
return jotai.atom((get) => {
return atom((get) => {
const dataValue = get(wov.dataAtom);
if (dataValue.loading) {
return null;
@ -201,13 +201,13 @@ function getWaveObjectLoadingAtom(oref: string): jotai.Atom<boolean> {
function useWaveObjectValue<T extends WaveObj>(oref: string): [T, boolean] {
const wov = getWaveObjectValue<T>(oref);
React.useEffect(() => {
useEffect(() => {
wov.refCount++;
return () => {
wov.refCount--;
};
}, [oref]);
const atomVal = jotai.useAtomValue(wov.dataAtom);
const atomVal = useAtomValue(wov.dataAtom);
return [atomVal.value, atomVal.loading];
}
@ -254,7 +254,7 @@ function cleanWaveObjectCache() {
// gets the value of a WaveObject from the cache.
// should provide getFn if it is available (e.g. inside of a jotai atom)
// otherwise it will use the globalStore.get function
function getObjectValue<T extends WaveObj>(oref: string, getFn?: jotai.Getter): T {
function getObjectValue<T extends WaveObj>(oref: string, getFn?: Getter): T {
const wov = getWaveObjectValue<T>(oref);
if (getFn == null) {
getFn = globalStore.get;
@ -266,7 +266,7 @@ function getObjectValue<T extends WaveObj>(oref: string, getFn?: jotai.Getter):
// sets the value of a WaveObject in the cache.
// should provide setFn if it is available (e.g. inside of a jotai atom)
// otherwise it will use the globalStore.set function
function setObjectValue<T extends WaveObj>(value: T, setFn?: jotai.Setter, pushToServer?: boolean) {
function setObjectValue<T extends WaveObj>(value: T, setFn?: Setter, pushToServer?: boolean) {
const oref = makeORef(value.otype, value.oid);
const wov = getWaveObjectValue(oref, false);
if (wov === undefined) {
@ -277,7 +277,7 @@ function setObjectValue<T extends WaveObj>(value: T, setFn?: jotai.Setter, pushT
}
setFn(wov.dataAtom, { value: value, loading: false });
if (pushToServer) {
services.ObjectService.UpdateObject(value, false);
ObjectService.UpdateObject(value, false);
}
}

141
frontend/app/store/wps.ts Normal file
View File

@ -0,0 +1,141 @@
import { isBlank } from "@/util/util";
import { Subject } from "rxjs";
import { sendRawRpcMessage, setRpcEventHandlerFn } from "./wshrpc";
type WaveEventSubject = {
handler: (event: WaveEvent) => void;
scope?: string;
};
type WaveEventSubjectContainer = WaveEventSubject & {
id: string;
};
type WaveEventSubscription = WaveEventSubject & {
eventType: string;
};
type WaveEventUnsubscribe = {
id: string;
eventType: string;
};
// key is "eventType" or "eventType|oref"
const fileSubjects = new Map<string, SubjectWithRef<WSFileEventData>>();
const waveEventSubjects = new Map<string, WaveEventSubjectContainer[]>();
function makeWaveReSubCommand(eventType: string): RpcMessage {
let subjects = waveEventSubjects.get(eventType);
if (subjects == null) {
return { command: "eventunsub", data: eventType };
}
let subreq: SubscriptionRequest = { event: eventType, scopes: [], allscopes: false };
for (const scont of subjects) {
if (isBlank(scont.scope)) {
subreq.allscopes = true;
subreq.scopes = [];
break;
}
subreq.scopes.push(scont.scope);
}
return { command: "eventsub", data: subreq };
}
function updateWaveEventSub(eventType: string) {
const command = makeWaveReSubCommand(eventType);
// console.log("updateWaveEventSub", eventType, command);
sendRawRpcMessage(command);
}
function waveEventSubscribe(...subscriptions: WaveEventSubscription[]): () => void {
const unsubs: WaveEventUnsubscribe[] = [];
const eventTypeSet = new Set<string>();
for (const subscription of subscriptions) {
// console.log("waveEventSubscribe", subscription);
if (subscription.handler == null) {
return;
}
const id: string = crypto.randomUUID();
let subjects = waveEventSubjects.get(subscription.eventType);
if (subjects == null) {
subjects = [];
waveEventSubjects.set(subscription.eventType, subjects);
}
const subcont: WaveEventSubjectContainer = { id, handler: subscription.handler, scope: subscription.scope };
subjects.push(subcont);
unsubs.push({ id, eventType: subscription.eventType });
eventTypeSet.add(subscription.eventType);
}
for (const eventType of eventTypeSet) {
updateWaveEventSub(eventType);
}
return () => waveEventUnsubscribe(...unsubs);
}
function waveEventUnsubscribe(...unsubscribes: WaveEventUnsubscribe[]) {
const eventTypeSet = new Set<string>();
for (const unsubscribe of unsubscribes) {
let subjects = waveEventSubjects.get(unsubscribe.eventType);
if (subjects == null) {
return;
}
const idx = subjects.findIndex((s) => s.id === unsubscribe.id);
if (idx === -1) {
return;
}
subjects.splice(idx, 1);
if (subjects.length === 0) {
waveEventSubjects.delete(unsubscribe.eventType);
}
eventTypeSet.add(unsubscribe.eventType);
}
for (const eventType of eventTypeSet) {
updateWaveEventSub(eventType);
}
}
function getFileSubject(zoneId: string, fileName: string): SubjectWithRef<WSFileEventData> {
const subjectKey = zoneId + "|" + fileName;
let subject = fileSubjects.get(subjectKey);
if (subject == null) {
subject = new Subject<any>() as any;
subject.refCount = 0;
subject.release = () => {
subject.refCount--;
if (subject.refCount === 0) {
subject.complete();
fileSubjects.delete(subjectKey);
}
};
fileSubjects.set(subjectKey, subject);
}
subject.refCount++;
return subject;
}
function handleWaveEvent(event: WaveEvent) {
// console.log("handleWaveEvent", event);
const subjects = waveEventSubjects.get(event.event);
if (subjects == null) {
return;
}
for (const scont of subjects) {
if (isBlank(scont.scope)) {
scont.handler(event);
continue;
}
if (event.scopes == null) {
continue;
}
if (event.scopes.includes(scont.scope)) {
scont.handler(event);
}
}
}
function initWps() {
setRpcEventHandlerFn(handleWaveEvent);
}
export { getFileSubject, initWps, waveEventSubscribe, waveEventUnsubscribe };

View File

@ -1,45 +1,31 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import * as jotai from "jotai";
import { sprintf } from "sprintf-js";
const MaxWebSocketSendSize = 1024 * 1024; // 1MB
type JotaiStore = {
get: <Value>(atom: jotai.Atom<Value>) => Value;
set: <Value>(atom: jotai.WritableAtom<Value, [Value], void>, value: Value) => void;
};
type WSEventCallback = (arg0: WSEventType) => void;
class WSControl {
wsConn: any;
open: jotai.WritableAtom<boolean, [boolean], void>;
open: boolean;
opening: boolean = false;
reconnectTimes: number = 0;
msgQueue: any[] = [];
windowId: string;
messageCallback: (any) => void = null;
messageCallback: WSEventCallback;
watchSessionId: string = null;
watchScreenId: string = null;
wsLog: string[] = [];
authKey: string;
baseHostPort: string;
lastReconnectTime: number = 0;
jotaiStore: JotaiStore;
constructor(
baseHostPort: string,
jotaiStore: JotaiStore,
windowId: string,
authKey: string,
messageCallback: (any) => void
) {
constructor(baseHostPort: string, windowId: string, messageCallback: WSEventCallback) {
this.baseHostPort = baseHostPort;
this.messageCallback = messageCallback;
this.windowId = windowId;
this.authKey = authKey;
this.open = jotai.atom(false);
this.jotaiStore = jotaiStore;
this.open = false;
setInterval(this.sendPing.bind(this), 5000);
}
@ -51,16 +37,8 @@ class WSControl {
}
}
setOpen(val: boolean) {
this.jotaiStore.set(this.open, val);
}
isOpen() {
return this.jotaiStore.get(this.open);
}
connectNow(desc: string) {
if (this.isOpen()) {
if (this.open) {
return;
}
this.lastReconnectTime = Date.now();
@ -75,7 +53,7 @@ class WSControl {
}
reconnect(forceClose?: boolean) {
if (this.isOpen()) {
if (this.open) {
if (forceClose) {
this.wsConn.close(); // this will force a reconnect
}
@ -109,8 +87,8 @@ class WSControl {
} else {
this.log("connection error/disconnected");
}
if (this.isOpen() || this.opening) {
this.setOpen(false);
if (this.open || this.opening) {
this.open = false;
this.opening = false;
this.reconnect();
}
@ -118,14 +96,14 @@ class WSControl {
onopen() {
this.log("connection open");
this.setOpen(true);
this.open = true;
this.opening = false;
this.runMsgQueue();
// reconnectTimes is reset in onmessage:hello
}
runMsgQueue() {
if (!this.isOpen()) {
if (!this.open) {
return;
}
if (this.msgQueue.length == 0) {
@ -168,14 +146,14 @@ class WSControl {
}
sendPing() {
if (!this.isOpen()) {
if (!this.open) {
return;
}
this.wsConn.send(JSON.stringify({ type: "ping", stime: Date.now() }));
}
sendMessage(data: WSCommandType) {
if (!this.isOpen()) {
if (!this.open) {
return;
}
const msg = JSON.stringify(data);
@ -188,7 +166,7 @@ class WSControl {
}
pushMessage(data: WSCommandType) {
if (!this.isOpen()) {
if (!this.open) {
this.msgQueue.push(data);
return;
}

View File

@ -1,7 +1,8 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { globalWS } from "./global";
import { getWSServerEndpoint } from "@/util/endpoints";
import { WSControl } from "./ws";
type RpcEntry = {
reqId: string;
@ -119,12 +120,12 @@ async function* rpcResponseGenerator(
function sendRpcCancel(reqid: string) {
const rpcMsg: RpcMessage = { reqid: reqid, cancel: true };
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: rpcMsg };
globalWS.pushMessage(wsMsg);
sendWSCommand(wsMsg);
}
function sendRpcCommand(msg: RpcMessage): AsyncGenerator<RpcMessage, void, boolean> {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
globalWS.pushMessage(wsMsg);
sendWSCommand(wsMsg);
if (msg.reqid == null) {
return null;
}
@ -135,19 +136,30 @@ function sendRpcCommand(msg: RpcMessage): AsyncGenerator<RpcMessage, void, boole
function sendRawRpcMessage(msg: RpcMessage) {
const wsMsg: WSRpcCommand = { wscommand: "rpc", message: msg };
globalWS.pushMessage(wsMsg);
sendWSCommand(wsMsg);
}
const notFoundLogMap = new Map<string, boolean>();
function handleIncomingRpcMessage(msg: RpcMessage, eventHandlerFn: (event: WaveEvent) => void) {
let rpcEventHandlerFn: (evt: WaveEvent) => void;
function setRpcEventHandlerFn(fn: (evt: WaveEvent) => void) {
if (rpcEventHandlerFn) {
throw new Error("wshrpc.setRpcEventHandlerFn called more than once");
}
rpcEventHandlerFn = fn;
}
function handleIncomingRpcMessage(event: WSEventType) {
if (event.eventtype !== "rpc") {
console.warn("unsupported ws event type:", event.eventtype, event);
}
const msg: RpcMessage = event.data;
const isRequest = msg.command != null || msg.reqid != null;
if (isRequest) {
// handle events
if (msg.command == "eventrecv") {
if (eventHandlerFn != null) {
eventHandlerFn(msg.data);
}
rpcEventHandlerFn?.(msg.data);
return;
}
if (msg.command == "message") {
@ -195,10 +207,24 @@ if (globalThis.window != null) {
globalThis["consumeGenerator"] = consumeGenerator;
}
let globalWS: WSControl;
function initWshrpc(windowId: string) {
globalWS = new WSControl(getWSServerEndpoint(), windowId, handleIncomingRpcMessage);
globalWS.connectNow("connectWshrpc");
}
function sendWSCommand(cmd: WSCommandType) {
globalWS?.sendMessage(cmd);
}
export {
handleIncomingRpcMessage,
initWshrpc,
sendRawRpcMessage,
sendRpcCommand,
sendWSCommand,
setRpcEventHandlerFn,
wshServerRpcHelper_call,
wshServerRpcHelper_responsestream,
};

View File

@ -3,7 +3,7 @@
import { useHeight } from "@/app/hook/useHeight";
import { useWidth } from "@/app/hook/useWidth";
import { getConnStatusAtom, globalStore, waveEventSubscribe, WOS } from "@/store/global";
import { getConnStatusAtom, globalStore, WOS } from "@/store/global";
import { WshServer } from "@/store/wshserver";
import * as util from "@/util/util";
import * as Plot from "@observablehq/plot";
@ -12,6 +12,7 @@ import * as htl from "htl";
import * as jotai from "jotai";
import * as React from "react";
import { waveEventSubscribe } from "@/app/store/wps";
import "./cpuplot.less";
const DefaultNumPoints = 120;
@ -192,13 +193,17 @@ function CpuPlotView({ model, blockId }: CpuPlotViewProps) {
lastConnName.current = connName;
model.loadInitialData();
}
const unsubFn = waveEventSubscribe("sysinfo", connName, (event: WaveEvent) => {
const loading = globalStore.get(model.loadingAtom);
if (loading) {
return;
}
const dataItem = convertWaveEventToDataItem(event);
addPlotData([dataItem]);
const unsubFn = waveEventSubscribe({
eventType: "sysinfo",
scope: connName,
handler: (event) => {
const loading = globalStore.get(model.loadingAtom);
if (loading) {
return;
}
const dataItem = convertWaveEventToDataItem(event);
addPlotData([dataItem]);
},
});
return () => {
unsubFn();

View File

@ -1,9 +1,10 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { waveEventSubscribe } from "@/app/store/wps";
import { WshServer } from "@/app/store/wshserver";
import { VDomView } from "@/app/view/term/vdom";
import { WOS, atoms, getConnStatusAtom, getEventORefSubject, globalStore, useSettingsPrefixAtom } from "@/store/global";
import { WOS, atoms, getConnStatusAtom, globalStore, useSettingsPrefixAtom } from "@/store/global";
import * as services from "@/store/services";
import * as keyutil from "@/util/keyutil";
import * as util from "@/util/util";
@ -379,12 +380,15 @@ const TerminalView = ({ blockId, model }: TerminalViewProps) => {
initialRTStatus.then((rts) => {
updateShellProcStatus(rts?.shellprocstatus);
});
const bcSubject = getEventORefSubject("blockcontroller:status", WOS.makeORef("block", blockId));
const sub = bcSubject.subscribe((data: WSEventType) => {
let bcRTS: BlockControllerRuntimeStatus = data.data;
updateShellProcStatus(bcRTS?.shellprocstatus);
return waveEventSubscribe({
eventType: "controllerstatus",
scope: WOS.makeORef("block", blockId),
handler: (event) => {
console.log("term waveEvent handler", event);
let bcRTS: BlockControllerRuntimeStatus = event.data;
updateShellProcStatus(bcRTS?.shellprocstatus);
},
});
return () => sub.unsubscribe();
}, []);
let stickerConfig = {

View File

@ -1,17 +1,10 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { getFileSubject } from "@/app/store/wps";
import { sendWSCommand } from "@/app/store/wshrpc";
import { WshServer } from "@/app/store/wshserver";
import {
PLATFORM,
WOS,
atoms,
fetchWaveFile,
getFileSubject,
globalStore,
openLink,
sendWSCommand,
} from "@/store/global";
import { PLATFORM, WOS, atoms, fetchWaveFile, globalStore, openLink } from "@/store/global";
import * as services from "@/store/services";
import * as util from "@/util/util";
import { base64ToArray, fireAndForget } from "@/util/util";

View File

@ -444,7 +444,7 @@ declare global {
display: StickerDisplayOptsType;
};
// wshrpc.SubscriptionRequest
// wps.SubscriptionRequest
type SubscriptionRequest = {
event: string;
scopes?: string[];
@ -560,7 +560,7 @@ declare global {
data: any;
};
// eventbus.WSFileEventData
// wps.WSFileEventData
type WSFileEventData = {
zoneid: string;
filename: string;
@ -579,7 +579,7 @@ declare global {
fullconfig: FullConfigType;
};
// wshrpc.WaveEvent
// wps.WaveEvent
type WaveEvent = {
event: string;
scopes?: string[];

View File

@ -1,11 +1,15 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
import { App } from "@/app/app";
import {
registerControlShiftStateUpdateHandler,
registerElectronReinjectKeyHandler,
registerGlobalKeys,
} from "@/app/store/keymodel";
import { FileService, ObjectService } from "@/app/store/services";
import { initWps } from "@/app/store/wps";
import { initWshrpc } from "@/app/store/wshrpc";
import { WshServer } from "@/app/store/wshserver";
import { loadMonaco } from "@/app/view/codeeditor/codeeditor";
import { getLayoutModelForActiveTab } from "@/layout/index";
@ -15,19 +19,16 @@ import {
countersPrint,
getApi,
globalStore,
globalWS,
initGlobal,
initWS,
initGlobalWaveEventSubs,
loadConnStatus,
subscribeToConnEvents,
} from "@/store/global";
import * as services from "@/store/services";
import * as WOS from "@/store/wos";
import * as keyutil from "@/util/keyutil";
import * as React from "react";
import { loadFonts } from "@/util/fontutil";
import { setKeyUtilPlatform } from "@/util/keyutil";
import { createElement } from "react";
import { createRoot } from "react-dom/client";
import { App } from "./app/app";
import { loadFonts } from "./util/fontutil";
const platform = getApi().getPlatform();
const urlParams = new URLSearchParams(window.location.search);
@ -39,10 +40,9 @@ console.log("clientid", clientId, "windowid", windowId);
initGlobal({ clientId, windowId, platform, environment: "renderer" });
keyutil.setKeyUtilPlatform(platform);
setKeyUtilPlatform(platform);
loadFonts();
(window as any).globalWS = globalWS;
(window as any).WOS = WOS;
(window as any).globalStore = globalStore;
(window as any).globalAtoms = atoms;
@ -56,27 +56,33 @@ document.title = `The Next Wave (${windowId.substring(0, 8)})`;
document.addEventListener("DOMContentLoaded", async () => {
console.log("DOMContentLoaded");
// Init WPS event handlers
initWshrpc(windowId);
await loadConnStatus();
initWps();
initGlobalWaveEventSubs();
subscribeToConnEvents();
// ensures client/window/workspace are loaded into the cache before rendering
const client = await WOS.loadAndPinWaveObject<Client>(WOS.makeORef("client", clientId));
const waveWindow = await WOS.loadAndPinWaveObject<WaveWindow>(WOS.makeORef("window", windowId));
await WOS.loadAndPinWaveObject<Workspace>(WOS.makeORef("workspace", waveWindow.workspaceid));
const initialTab = await WOS.loadAndPinWaveObject<Tab>(WOS.makeORef("tab", waveWindow.activetabid));
await WOS.loadAndPinWaveObject<LayoutState>(WOS.makeORef("layout", initialTab.layoutstate));
initWS();
await loadConnStatus();
subscribeToConnEvents();
registerGlobalKeys();
registerElectronReinjectKeyHandler();
registerControlShiftStateUpdateHandler();
setTimeout(loadMonaco, 30);
const fullConfig = await services.FileService.GetFullConfig();
const fullConfig = await FileService.GetFullConfig();
console.log("fullconfig", fullConfig);
globalStore.set(atoms.fullConfigAtom, fullConfig);
const prtn = services.ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait
const prtn = ObjectService.SetActiveTab(waveWindow.activetabid); // no need to wait
prtn.catch((e) => {
console.log("error on initial SetActiveTab", e);
});
const reactElem = React.createElement(App, null, null);
const reactElem = createElement(App, null, null);
const elem = document.getElementById("main");
const root = createRoot(elem);
document.fonts.ready.then(() => {

View File

@ -14,7 +14,6 @@ import (
"sync"
"time"
"github.com/wavetermdev/waveterm/pkg/eventbus"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/remote"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
@ -112,20 +111,14 @@ func (bc *BlockController) UpdateControllerAndSendUpdate(updateFn func() bool) {
if sendUpdate {
rtStatus := bc.GetRuntimeStatus()
log.Printf("sending blockcontroller update %#v\n", rtStatus)
go eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockControllerStatus,
ORef: waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(),
Data: rtStatus,
})
waveEvent := wshrpc.WaveEvent{
Event: wshrpc.Event_ControllerStatus,
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_ControllerStatus,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, bc.TabId).String(),
waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(),
},
Data: rtStatus,
}
wps.Broker.Publish(waveEvent)
})
}
}
@ -139,13 +132,13 @@ func HandleTruncateBlockFile(blockId string, blockFile string) error {
if err != nil {
return fmt.Errorf("error truncating blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(waveobj.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, blockId).String()},
Data: &wps.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Truncate,
FileOp: wps.FileOp_Truncate,
},
})
return nil
@ -159,13 +152,15 @@ func HandleAppendBlockFile(blockId string, blockFile string, data []byte) error
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(waveobj.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Block, blockId).String(),
},
Data: &wps.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Append,
FileOp: wps.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(data),
},
})
@ -423,7 +418,7 @@ func setTermSize(ctx context.Context, blockId string, termSize waveobj.TermSize)
}
bdata.RuntimeOpts.TermSize = termSize
updates := waveobj.ContextGetUpdatesRtn(ctx)
eventbus.SendUpdateEvents(updates)
wps.Broker.SendUpdateEvents(updates)
return nil
}

View File

@ -15,15 +15,9 @@ import (
)
const (
WSEvent_WaveObjUpdate = "waveobj:update"
WSEvent_BlockFile = "blockfile"
WSEvent_Config = "config"
WSEvent_UserInput = "userinput"
WSEvent_BlockControllerStatus = "blockcontroller:status"
WSEvent_LayoutAction = "layoutaction"
WSEvent_ElectronNewWindow = "electron:newwindow"
WSEvent_ElectronCloseWindow = "electron:closewindow"
WSEvent_Rpc = "rpc"
WSEvent_ElectronNewWindow = "electron:newwindow"
WSEvent_ElectronCloseWindow = "electron:closewindow"
WSEvent_Rpc = "rpc"
)
type WSEventType struct {
@ -32,19 +26,6 @@ type WSEventType struct {
Data any `json:"data"`
}
const (
FileOp_Append = "append"
FileOp_Truncate = "truncate"
FileOp_Invalidate = "invalidate"
)
type WSFileEventData struct {
ZoneId string `json:"zoneid"`
FileName string `json:"filename"`
FileOp string `json:"fileop"`
Data64 string `json:"data64"`
}
type WindowWatchData struct {
WindowWSCh chan any
WaveWindowId string
@ -97,40 +78,6 @@ func BusyWaitForWindowId(windowId string, timeout time.Duration) bool {
}
}
func getAllWatches() []*WindowWatchData {
globalLock.Lock()
defer globalLock.Unlock()
watches := make([]*WindowWatchData, 0, len(wsMap))
for _, wdata := range wsMap {
watches = append(watches, wdata)
}
return watches
}
func SendEventToWindow(windowId string, event WSEventType) {
wwdArr := getWindowWatchesForWindowId(windowId)
for _, wdata := range wwdArr {
wdata.WindowWSCh <- event
}
}
func SendEvent(event WSEventType) {
wwdArr := getAllWatches()
for _, wdata := range wwdArr {
wdata.WindowWSCh <- event
}
}
func SendUpdateEvents(updates waveobj.UpdatesRtnType) {
for _, update := range updates {
SendEvent(WSEventType{
EventType: WSEvent_WaveObjUpdate,
ORef: waveobj.MakeORef(update.OType, update.OID).String(),
Data: update,
})
}
}
func SendEventToElectron(event WSEventType) {
barr, err := json.Marshal(event)
if err != nil {

View File

@ -85,8 +85,8 @@ func (conn *SSHConn) DeriveConnStatus() wshrpc.ConnStatus {
func (conn *SSHConn) FireConnChangeEvent() {
status := conn.DeriveConnStatus()
event := wshrpc.WaveEvent{
Event: wshrpc.Event_ConnChange,
event := wps.WaveEvent{
Event: wps.Event_ConnChange,
Scopes: []string{
fmt.Sprintf("connection:%s", conn.GetName()),
},

View File

@ -20,6 +20,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wconfig"
"github.com/wavetermdev/waveterm/pkg/web/webcmd"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshutil"
)
@ -33,7 +34,7 @@ var ExtraTypes = []any{
service.WebReturnType{},
waveobj.UIContext{},
eventbus.WSEventType{},
eventbus.WSFileEventData{},
wps.WSFileEventData{},
waveobj.LayoutActionData{},
filestore.WaveFile{},
wconfig.FullConfigType{},

View File

@ -11,7 +11,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/pkg/eventbus"
"github.com/wavetermdev/waveterm/pkg/wps"
)
var MainUserInputHandler = UserInputHandler{Channels: make(map[string](chan *UserInputResponse), 1)}
@ -60,9 +60,9 @@ func (ui *UserInputHandler) unregisterChannel(id string) {
}
func (ui *UserInputHandler) sendRequestToFrontend(request *UserInputRequest) {
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_UserInput,
Data: request,
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_UserInput,
Data: request,
})
}

View File

@ -10,8 +10,8 @@ import (
"sync"
"github.com/fsnotify/fsnotify"
"github.com/wavetermdev/waveterm/pkg/eventbus"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/wps"
)
var configDirAbsPath = filepath.Join(wavebase.GetWaveHomeDir(), wavebase.ConfigDir)
@ -95,9 +95,9 @@ func (w *Watcher) Close() {
func (w *Watcher) broadcast(message WatcherUpdate) {
// send to frontend
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_Config,
Data: message,
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_Config,
Data: message,
})
}

View File

@ -13,7 +13,6 @@ import (
"github.com/wavetermdev/waveterm/pkg/blockcontroller"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wstore"
)
@ -36,8 +35,8 @@ func DeleteBlock(ctx context.Context, tabId string, blockId string) error {
}
func sendBlockCloseEvent(tabId string, blockId string) {
waveEvent := wshrpc.WaveEvent{
Event: wshrpc.Event_BlockClose,
waveEvent := wps.WaveEvent{
Event: wps.Event_BlockClose,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, tabId).String(),
waveobj.MakeORef(waveobj.OType_Block, blockId).String(),

View File

@ -10,7 +10,7 @@ import (
"sync"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/waveobj"
)
// this broker interface is mostly generic
@ -20,7 +20,7 @@ const MaxPersist = 4096
const ReMakeArrThreshold = 10 * 1024
type Client interface {
SendEvent(routeId string, event wshrpc.WaveEvent)
SendEvent(routeId string, event WaveEvent)
}
type BrokerSubscription struct {
@ -36,7 +36,7 @@ type persistKey struct {
type persistEventWrap struct {
ArrTotalAdds int
Events []*wshrpc.WaveEvent
Events []*WaveEvent
}
type BrokerType struct {
@ -75,7 +75,7 @@ func (b *BrokerType) GetClient() Client {
}
// if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)
func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) {
func (b *BrokerType) Subscribe(subRouteId string, sub SubscriptionRequest) {
log.Printf("[wps] sub %s %s\n", subRouteId, sub.Event)
if sub.Event == "" {
return
@ -175,7 +175,7 @@ func (b *BrokerType) UnsubscribeAll(subRouteId string) {
}
// does not take wildcards, use "" for all
func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*wshrpc.WaveEvent {
func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*WaveEvent {
if maxItems <= 0 {
return nil
}
@ -190,12 +190,12 @@ func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems i
maxItems = len(pe.Events)
}
// return new arr
rtn := make([]*wshrpc.WaveEvent, maxItems)
rtn := make([]*WaveEvent, maxItems)
copy(rtn, pe.Events[len(pe.Events)-maxItems:])
return rtn
}
func (b *BrokerType) persistEvent(event wshrpc.WaveEvent) {
func (b *BrokerType) persistEvent(event WaveEvent) {
if event.Persist <= 0 {
return
}
@ -216,20 +216,21 @@ func (b *BrokerType) persistEvent(event wshrpc.WaveEvent) {
if pe == nil {
pe = &persistEventWrap{
ArrTotalAdds: 0,
Events: make([]*wshrpc.WaveEvent, 0, event.Persist),
Events: make([]*WaveEvent, 0, event.Persist),
}
b.PersistMap[key] = pe
}
pe.Events = append(pe.Events, &event)
pe.ArrTotalAdds++
if pe.ArrTotalAdds > ReMakeArrThreshold {
pe.Events = append([]*wshrpc.WaveEvent{}, pe.Events...)
pe.Events = append([]*WaveEvent{}, pe.Events...)
pe.ArrTotalAdds = len(pe.Events)
}
}
}
func (b *BrokerType) Publish(event wshrpc.WaveEvent) {
func (b *BrokerType) Publish(event WaveEvent) {
// log.Printf("BrokerType.Publish: %v\n", event)
if event.Persist > 0 {
b.persistEvent(event)
}
@ -243,7 +244,17 @@ func (b *BrokerType) Publish(event wshrpc.WaveEvent) {
}
}
func (b *BrokerType) getMatchingRouteIds(event wshrpc.WaveEvent) []string {
func (b *BrokerType) SendUpdateEvents(updates waveobj.UpdatesRtnType) {
for _, update := range updates {
b.Publish(WaveEvent{
Event: Event_WaveObjUpdate,
Scopes: []string{waveobj.MakeORef(update.OType, update.OID).String()},
Data: update,
})
}
}
func (b *BrokerType) getMatchingRouteIds(event WaveEvent) []string {
b.Lock.Lock()
defer b.Lock.Unlock()
bs := b.SubMap[event.Event]
@ -270,5 +281,6 @@ func (b *BrokerType) getMatchingRouteIds(event wshrpc.WaveEvent) []string {
for routeId := range routeIds {
rtn = append(rtn, routeId)
}
// log.Printf("getMatchingRouteIds %v %v\n", event, rtn)
return rtn
}

45
pkg/wps/wpstypes.go Normal file
View File

@ -0,0 +1,45 @@
package wps
import "github.com/wavetermdev/waveterm/pkg/util/utilfn"
const (
Event_BlockClose = "blockclose"
Event_ConnChange = "connchange"
Event_SysInfo = "sysinfo"
Event_ControllerStatus = "controllerstatus"
Event_WaveObjUpdate = "waveobj:update"
Event_BlockFile = "blockfile"
Event_Config = "config"
Event_UserInput = "userinput"
)
type WaveEvent struct {
Event string `json:"event"`
Scopes []string `json:"scopes,omitempty"`
Sender string `json:"sender,omitempty"`
Persist int `json:"persist,omitempty"`
Data any `json:"data,omitempty"`
}
func (e WaveEvent) HasScope(scope string) bool {
return utilfn.ContainsStr(e.Scopes, scope)
}
type SubscriptionRequest struct {
Event string `json:"event"`
Scopes []string `json:"scopes,omitempty"`
AllScopes bool `json:"allscopes,omitempty"`
}
const (
FileOp_Append = "append"
FileOp_Truncate = "truncate"
FileOp_Invalidate = "invalidate"
)
type WSFileEventData struct {
ZoneId string `json:"zoneid"`
FileName string `json:"filename"`
FileOp string `json:"fileop"`
Data64 string `json:"data64"`
}

View File

@ -10,6 +10,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wconfig"
"github.com/wavetermdev/waveterm/pkg/wps"
)
// command "announce", wshserver.AnnounceCommand
@ -91,25 +92,25 @@ func DeleteBlockCommand(w *wshutil.WshRpc, data wshrpc.CommandDeleteBlockData, o
}
// command "eventpublish", wshserver.EventPublishCommand
func EventPublishCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.RpcOpts) error {
func EventPublishCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventpublish", data, opts)
return err
}
// command "eventreadhistory", wshserver.EventReadHistoryCommand
func EventReadHistoryCommand(w *wshutil.WshRpc, data wshrpc.CommandEventReadHistoryData, opts *wshrpc.RpcOpts) ([]*wshrpc.WaveEvent, error) {
resp, err := sendRpcRequestCallHelper[[]*wshrpc.WaveEvent](w, "eventreadhistory", data, opts)
func EventReadHistoryCommand(w *wshutil.WshRpc, data wshrpc.CommandEventReadHistoryData, opts *wshrpc.RpcOpts) ([]*wps.WaveEvent, error) {
resp, err := sendRpcRequestCallHelper[[]*wps.WaveEvent](w, "eventreadhistory", data, opts)
return resp, err
}
// command "eventrecv", wshserver.EventRecvCommand
func EventRecvCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.RpcOpts) error {
func EventRecvCommand(w *wshutil.WshRpc, data wps.WaveEvent, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventrecv", data, opts)
return err
}
// command "eventsub", wshserver.EventSubCommand
func EventSubCommand(w *wshutil.WshRpc, data wshrpc.SubscriptionRequest, opts *wshrpc.RpcOpts) error {
func EventSubCommand(w *wshutil.WshRpc, data wps.SubscriptionRequest, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventsub", data, opts)
return err
}

View File

@ -10,6 +10,7 @@ import (
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
@ -49,8 +50,8 @@ func generateSingleServerData(client *wshutil.WshRpc, connName string) {
getCpuData(values)
getMemData(values)
tsData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: values}
event := wshrpc.WaveEvent{
Event: wshrpc.Event_SysInfo,
event := wps.WaveEvent{
Event: wps.Event_SysInfo,
Scopes: []string{connName},
Data: tsData,
Persist: 1024,

View File

@ -11,9 +11,9 @@ import (
"reflect"
"github.com/wavetermdev/waveterm/pkg/ijson"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wconfig"
"github.com/wavetermdev/waveterm/pkg/wps"
)
const LocalConnName = "local"
@ -25,13 +25,6 @@ const (
RpcType_Complex = "complex" // streaming request/response
)
const (
Event_BlockClose = "blockclose"
Event_ConnChange = "connchange"
Event_SysInfo = "sysinfo"
Event_ControllerStatus = "controllerstatus"
)
const (
Command_Authenticate = "authenticate" // special
Command_Announce = "announce" // special (for routing)
@ -96,11 +89,11 @@ type WshRpcInterface interface {
DeleteBlockCommand(ctx context.Context, data CommandDeleteBlockData) error
FileWriteCommand(ctx context.Context, data CommandFileData) error
FileReadCommand(ctx context.Context, data CommandFileData) (string, error)
EventPublishCommand(ctx context.Context, data WaveEvent) error
EventSubCommand(ctx context.Context, data SubscriptionRequest) error
EventPublishCommand(ctx context.Context, data wps.WaveEvent) error
EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error
EventUnsubCommand(ctx context.Context, data string) error
EventUnsubAllCommand(ctx context.Context) error
EventReadHistoryCommand(ctx context.Context, data CommandEventReadHistoryData) ([]*WaveEvent, error)
EventReadHistoryCommand(ctx context.Context, data CommandEventReadHistoryData) ([]*wps.WaveEvent, error)
StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int]
StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType]
StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData]
@ -116,7 +109,7 @@ type WshRpcInterface interface {
ConnListCommand(ctx context.Context) ([]string, error)
// eventrecv is special, it's handled internally by WshRpc with EventListener
EventRecvCommand(ctx context.Context, data WaveEvent) error
EventRecvCommand(ctx context.Context, data wps.WaveEvent) error
// remotes
RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[CommandRemoteStreamFileRtnData]
@ -252,24 +245,6 @@ type CommandDeleteBlockData struct {
BlockId string `json:"blockid" wshcontext:"BlockId"`
}
type WaveEvent struct {
Event string `json:"event"`
Scopes []string `json:"scopes,omitempty"`
Sender string `json:"sender,omitempty"`
Persist int `json:"persist,omitempty"`
Data any `json:"data,omitempty"`
}
func (e WaveEvent) HasScope(scope string) bool {
return utilfn.ContainsStr(e.Scopes, scope)
}
type SubscriptionRequest struct {
Event string `json:"event"`
Scopes []string `json:"scopes,omitempty"`
AllScopes bool `json:"allscopes,omitempty"`
}
type CommandEventReadHistoryData struct {
Event string `json:"event"`
Scope string `json:"scope"`

View File

@ -18,7 +18,6 @@ import (
"time"
"github.com/wavetermdev/waveterm/pkg/blockcontroller"
"github.com/wavetermdev/waveterm/pkg/eventbus"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/remote"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
@ -169,9 +168,9 @@ func sendWaveObjUpdate(oref waveobj.ORef) {
log.Printf("error getting object for update event: %v", err)
return
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_WaveObjUpdate,
ORef: oref.String(),
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_WaveObjUpdate,
Scopes: []string{oref.String()},
Data: waveobj.WaveObjUpdate{
UpdateType: waveobj.UpdateType_Update,
OType: waveObj.GetOType(),
@ -263,7 +262,7 @@ func (ws *WshServer) CreateBlockCommand(ctx context.Context, data wshrpc.Command
return nil, fmt.Errorf("error queuing layout action: %w", err)
}
updates := waveobj.ContextGetUpdatesRtn(ctx)
eventbus.SendUpdateEvents(updates)
wps.Broker.SendUpdateEvents(updates)
return &waveobj.ORef{OType: waveobj.OType_Block, OID: blockRef.OID}, nil
}
@ -280,7 +279,7 @@ func (ws *WshServer) SetViewCommand(ctx context.Context, data wshrpc.CommandBloc
return fmt.Errorf("error updating block: %w", err)
}
updates := waveobj.ContextGetUpdatesRtn(ctx)
eventbus.SendUpdateEvents(updates)
wps.Broker.SendUpdateEvents(updates)
return nil
}
@ -329,13 +328,13 @@ func (ws *WshServer) FileWriteCommand(ctx context.Context, data wshrpc.CommandFi
if err != nil {
return fmt.Errorf("error writing to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String(),
Data: &eventbus.WSFileEventData{
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: data.ZoneId,
FileName: data.FileName,
FileOp: eventbus.FileOp_Invalidate,
FileOp: wps.FileOp_Invalidate,
},
})
return nil
@ -358,13 +357,13 @@ func (ws *WshServer) FileAppendCommand(ctx context.Context, data wshrpc.CommandF
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String(),
Data: &eventbus.WSFileEventData{
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: data.ZoneId,
FileName: data.FileName,
FileOp: eventbus.FileOp_Append,
FileOp: wps.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(dataBuf),
},
})
@ -383,13 +382,13 @@ func (ws *WshServer) FileAppendIJsonCommand(ctx context.Context, data wshrpc.Com
if err != nil {
return fmt.Errorf("error appending to blockfile(ijson): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: eventbus.WSEvent_BlockFile,
ORef: waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String(),
Data: &eventbus.WSFileEventData{
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, data.ZoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: data.ZoneId,
FileName: data.FileName,
FileOp: eventbus.FileOp_Append,
FileOp: wps.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString([]byte("{}")),
},
})
@ -421,15 +420,15 @@ func (ws *WshServer) DeleteBlockCommand(ctx context.Context, data wshrpc.Command
BlockId: data.BlockId,
})
updates := waveobj.ContextGetUpdatesRtn(ctx)
eventbus.SendUpdateEvents(updates)
wps.Broker.SendUpdateEvents(updates)
return nil
}
func (ws *WshServer) EventRecvCommand(ctx context.Context, data wshrpc.WaveEvent) error {
func (ws *WshServer) EventRecvCommand(ctx context.Context, data wps.WaveEvent) error {
return nil
}
func (ws *WshServer) EventPublishCommand(ctx context.Context, data wshrpc.WaveEvent) error {
func (ws *WshServer) EventPublishCommand(ctx context.Context, data wps.WaveEvent) error {
rpcSource := wshutil.GetRpcSourceFromContext(ctx)
if rpcSource == "" {
return fmt.Errorf("no rpc source set")
@ -441,7 +440,8 @@ func (ws *WshServer) EventPublishCommand(ctx context.Context, data wshrpc.WaveEv
return nil
}
func (ws *WshServer) EventSubCommand(ctx context.Context, data wshrpc.SubscriptionRequest) error {
func (ws *WshServer) EventSubCommand(ctx context.Context, data wps.SubscriptionRequest) error {
log.Printf("EventSubCommand: %v\n", data)
rpcSource := wshutil.GetRpcSourceFromContext(ctx)
if rpcSource == "" {
return fmt.Errorf("no rpc source set")
@ -468,7 +468,7 @@ func (ws *WshServer) EventUnsubAllCommand(ctx context.Context) error {
return nil
}
func (ws *WshServer) EventReadHistoryCommand(ctx context.Context, data wshrpc.CommandEventReadHistoryData) ([]*wshrpc.WaveEvent, error) {
func (ws *WshServer) EventReadHistoryCommand(ctx context.Context, data wshrpc.CommandEventReadHistoryData) ([]*wps.WaveEvent, error) {
events := wps.Broker.ReadEventHistory(data.Event, data.Scope, data.MaxItems)
return events, nil
}

View File

@ -7,14 +7,14 @@ import (
"sync"
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wps"
)
// event inverter. converts WaveEvents to a listener.On() API
type singleListener struct {
Id string
Fn func(*wshrpc.WaveEvent)
Fn func(*wps.WaveEvent)
}
type EventListener struct {
@ -29,7 +29,7 @@ func MakeEventListener() *EventListener {
}
}
func (el *EventListener) On(eventName string, fn func(*wshrpc.WaveEvent)) string {
func (el *EventListener) On(eventName string, fn func(*wps.WaveEvent)) string {
id := uuid.New().String()
el.Lock.Lock()
defer el.Lock.Unlock()
@ -59,7 +59,7 @@ func (el *EventListener) getListeners(eventName string) []singleListener {
return el.Listeners[eventName]
}
func (el *EventListener) RecvEvent(e *wshrpc.WaveEvent) {
func (el *EventListener) RecvEvent(e *wps.WaveEvent) {
larr := el.getListeners(e.Event)
for _, sl := range larr {
sl.Fn(e)

View File

@ -80,7 +80,7 @@ func noRouteErr(routeId string) error {
return fmt.Errorf("no route for %q", routeId)
}
func (router *WshRouter) SendEvent(routeId string, event wshrpc.WaveEvent) {
func (router *WshRouter) SendEvent(routeId string, event wps.WaveEvent) {
rpc := router.GetRpc(routeId)
if rpc == nil {
return

View File

@ -17,6 +17,7 @@ import (
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
)
@ -257,7 +258,7 @@ func (w *WshRpc) handleRequest(req *RpcMessage) {
// invalid
return
}
var waveEvent wshrpc.WaveEvent
var waveEvent wps.WaveEvent
err := utilfn.ReUnmarshal(&waveEvent, req.Data)
if err != nil {
// invalid