waveterm/frontend/app/store/wps.ts
Evan Simkowitz 936d4bfb30
Migrate websocket eventbus messages to wps (#367)
This migrates all remaining eventbus events sent over the websocket to
use the wps interface. WPS is more flexible for registering events and
callbacks and provides support for more reliable unsubscribes and
resubscribes.
2024-09-11 18:03:55 -07:00

142 lines
4.3 KiB
TypeScript

import { isBlank } from "@/util/util";
import { Subject } from "rxjs";
import { sendRawRpcMessage, setRpcEventHandlerFn } from "./wshrpc";
// Base shape of a wave event registration: a callback plus an optional scope filter.
type WaveEventSubject = {
// Callback invoked with each matching event.
handler: (event: WaveEvent) => void;
// Optional scope filter. When blank, the handler receives every event of its type;
// otherwise it is only called for events whose `scopes` list includes this value.
scope?: string;
};
// A registered handler as stored in `waveEventSubjects`.
type WaveEventSubjectContainer = WaveEventSubject & {
// Unique registration id (generated in waveEventSubscribe), used to find the entry on unsubscribe.
id: string;
};
// Argument accepted by waveEventSubscribe: a handler (with optional scope) for one event type.
type WaveEventSubscription = WaveEventSubject & {
// Event type to listen for; also the key under which the handler is stored.
eventType: string;
};
// Identifies a single registration to remove in waveEventUnsubscribe.
type WaveEventUnsubscribe = {
// Registration id that was assigned when the handler was subscribed.
id: string;
// Event type the handler was registered under.
eventType: string;
};
// Reference-counted file subjects; key is "zoneId|fileName" (built in getFileSubject).
const fileSubjects = new Map<string, SubjectWithRef<WSFileEventData>>();
// Registered wave event handlers; key is the event type. Scope filtering is stored
// per handler (WaveEventSubjectContainer.scope), not encoded in the key.
const waveEventSubjects = new Map<string, WaveEventSubjectContainer[]>();
/**
 * Builds the RPC message that describes the current subscription state for
 * one event type.
 *
 * - No handlers registered for the type: an "eventunsub" message carrying the
 *   event type.
 * - At least one handler without a scope: an "eventsub" request with
 *   `allscopes` set and an empty scope list.
 * - Otherwise: an "eventsub" request listing the scope of every registered
 *   handler, in registration order (duplicates are not removed).
 *
 * @param eventType event type whose registrations should be summarized
 * @returns the message to send to the backend
 */
function makeWaveReSubCommand(eventType: string): RpcMessage {
    const registered = waveEventSubjects.get(eventType);
    if (registered == null) {
        return { command: "eventunsub", data: eventType };
    }
    // A single unscoped handler means we need every event of this type.
    const wantsAllScopes = registered.some((entry) => isBlank(entry.scope));
    const request: SubscriptionRequest = {
        event: eventType,
        scopes: wantsAllScopes ? [] : registered.map((entry) => entry.scope),
        allscopes: wantsAllScopes,
    };
    return { command: "eventsub", data: request };
}
/**
 * Sends the backend the current subscription state for one event type.
 * The message is built by makeWaveReSubCommand and sent with sendRawRpcMessage.
 *
 * @param eventType event type whose subscription should be refreshed
 */
function updateWaveEventSub(eventType: string) {
    sendRawRpcMessage(makeWaveReSubCommand(eventType));
}
/**
 * Registers one or more wave event handlers and refreshes the backend
 * subscription for every affected event type.
 *
 * Entries that are null or have no handler are skipped. Previously such an
 * entry aborted the whole call: the function returned `undefined` instead of
 * an unsubscribe callback, and any handlers already registered by the same
 * call were left in place with no resubscribe sent and no way to remove them.
 *
 * @param subscriptions handlers to register, each with its event type and optional scope
 * @returns a callback that removes exactly the handlers registered by this call;
 *          always a function, even when nothing was registered
 */
function waveEventSubscribe(...subscriptions: WaveEventSubscription[]): () => void {
    const unsubs: WaveEventUnsubscribe[] = [];
    const eventTypeSet = new Set<string>();
    for (const subscription of subscriptions) {
        // console.log("waveEventSubscribe", subscription);
        if (subscription == null || subscription.handler == null) {
            // Nothing to call for this entry; keep processing the rest of the batch.
            continue;
        }
        const id: string = crypto.randomUUID();
        let subjects = waveEventSubjects.get(subscription.eventType);
        if (subjects == null) {
            subjects = [];
            waveEventSubjects.set(subscription.eventType, subjects);
        }
        const subcont: WaveEventSubjectContainer = { id, handler: subscription.handler, scope: subscription.scope };
        subjects.push(subcont);
        unsubs.push({ id, eventType: subscription.eventType });
        eventTypeSet.add(subscription.eventType);
    }
    // One resubscribe per distinct event type, however many handlers were added.
    for (const eventType of eventTypeSet) {
        updateWaveEventSub(eventType);
    }
    return () => waveEventUnsubscribe(...unsubs);
}
/**
 * Removes previously registered wave event handlers and refreshes the backend
 * subscription for every event type that actually changed.
 *
 * Entries that refer to an unknown event type or registration id (for example
 * a handler that was already removed) are skipped. Previously such an entry
 * aborted the whole call: later entries stayed registered, and event types
 * already modified never had their backend subscription refreshed.
 *
 * @param unsubscribes (id, eventType) pairs identifying the registrations to remove
 */
function waveEventUnsubscribe(...unsubscribes: WaveEventUnsubscribe[]) {
    const eventTypeSet = new Set<string>();
    for (const unsubscribe of unsubscribes) {
        const subjects = waveEventSubjects.get(unsubscribe.eventType);
        if (subjects == null) {
            continue;
        }
        const idx = subjects.findIndex((s) => s.id === unsubscribe.id);
        if (idx === -1) {
            continue;
        }
        subjects.splice(idx, 1);
        if (subjects.length === 0) {
            // Drop the empty list so makeWaveReSubCommand emits an "eventunsub".
            waveEventSubjects.delete(unsubscribe.eventType);
        }
        eventTypeSet.add(unsubscribe.eventType);
    }
    // One refresh per distinct event type that lost at least one handler.
    for (const eventType of eventTypeSet) {
        updateWaveEventSub(eventType);
    }
}
/**
 * Returns the shared, reference-counted subject for a zone/file pair,
 * creating it on first use. Every call increments the subject's `refCount`.
 *
 * Each caller is expected to call `release()` on the returned subject when
 * done. When the count reaches zero the subject is completed and removed from
 * the cache, so a later call creates a fresh subject.
 *
 * @param zoneId zone identifier; first half of the cache key
 * @param fileName file name; second half of the cache key
 * @returns the cached or newly created subject
 */
function getFileSubject(zoneId: string, fileName: string): SubjectWithRef<WSFileEventData> {
    const cacheKey = `${zoneId}|${fileName}`;
    const cached = fileSubjects.get(cacheKey);
    if (cached != null) {
        cached.refCount++;
        return cached;
    }
    // The ref-counting fields are attached to a plain Subject after construction.
    const created = new Subject<any>() as any;
    created.refCount = 0;
    created.release = () => {
        created.refCount--;
        if (created.refCount === 0) {
            created.complete();
            fileSubjects.delete(cacheKey);
        }
    };
    fileSubjects.set(cacheKey, created);
    created.refCount++;
    return created;
}
/**
 * Dispatches an incoming wave event to the handlers registered for its type.
 *
 * A handler with a blank scope receives every event of the type. A scoped
 * handler is only called when the event's `scopes` list contains its scope.
 *
 * Dispatch runs over a snapshot of the registration list. Handlers may
 * unsubscribe during dispatch, which splices the live list; iterating the
 * live list would then skip the handler after the removed one. Handlers
 * removed during dispatch are not called, and handlers added during dispatch
 * are first called on the next event.
 *
 * @param event the event received from the backend
 */
function handleWaveEvent(event: WaveEvent) {
    // console.log("handleWaveEvent", event);
    const subjects = waveEventSubjects.get(event.event);
    if (subjects == null) {
        return;
    }
    for (const scont of [...subjects]) {
        if (!subjects.includes(scont)) {
            // Unsubscribed by an earlier handler during this dispatch.
            continue;
        }
        if (isBlank(scont.scope)) {
            scont.handler(event);
            continue;
        }
        if (event.scopes == null) {
            continue;
        }
        if (event.scopes.includes(scont.scope)) {
            scont.handler(event);
        }
    }
}
/**
 * Initializes this module by registering handleWaveEvent as the RPC event
 * handler through setRpcEventHandlerFn.
 * NOTE(review): presumably must run once before any events are expected — confirm against wshrpc.
 */
function initWps() {
setRpcEventHandlerFn(handleWaveEvent);
}
// Public API of this module; the remaining helpers above are not exported.
export { getFileSubject, initWps, waveEventSubscribe, waveEventUnsubscribe };