From e4888623551e11d8059a23066ab68d62e0810496 Mon Sep 17 00:00:00 2001 From: Mike Sawka Date: Fri, 30 Aug 2024 11:33:04 -0700 Subject: [PATCH] remote sysinfo data plotting (#294) --- Taskfile.yml | 2 +- cmd/server/main-server.go | 3 +- cmd/wsh/cmd/wshcmd-connserver.go | 3 +- frontend/app/store/wshserver.ts | 5 + frontend/app/view/cpuplot/cpuplot.tsx | 219 +++++++++++++++++++------- frontend/types/gotypes.d.ts | 11 ++ pkg/waveobj/metaconsts.go | 4 + pkg/waveobj/wtypemeta.go | 4 + pkg/wps/wps.go | 86 +++++++++- pkg/wshrpc/wshclient/wshclient.go | 6 + pkg/wshrpc/wshremote/sysinfo.go | 69 ++++++++ pkg/wshrpc/wshrpctypes.go | 18 ++- pkg/wshrpc/wshserver/wshserver.go | 51 +----- 13 files changed, 370 insertions(+), 111 deletions(-) create mode 100644 pkg/wshrpc/wshremote/sysinfo.go diff --git a/Taskfile.yml b/Taskfile.yml index 967aa34ef..47428ebc0 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -184,7 +184,7 @@ tasks: - "cmd/generatego/*.go" - "cmd/generatets/*.go" - "pkg/service/**/*.go" - - "pkg/waveobj/wtype.go" + - "pkg/waveobj/*.go" - "pkg/wconfig/**/*.go" - "pkg/wstore/*.go" - "pkg/wshrpc/**/*.go" diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 81efae9fe..03da995ad 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -157,7 +157,8 @@ func createMainWshClient() { rpc := wshserver.GetMainRpcClient() wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc) wps.Broker.SetClient(wshutil.DefaultRouter) - localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{}, &wshremote.ServerImpl{}) + localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{}) + go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName) wshutil.DefaultRouter.RegisterRoute(wshutil.MakeConnectionRouteId(wshrpc.LocalConnName), localConnWsh) } diff --git a/cmd/wsh/cmd/wshcmd-connserver.go b/cmd/wsh/cmd/wshcmd-connserver.go index 519a41a65..f9cc5d651 100644 --- a/cmd/wsh/cmd/wshcmd-connserver.go +++ b/cmd/wsh/cmd/wshcmd-connserver.go @@ -23,7 +23,8 @@ func init() { } func serverRun(cmd *cobra.Command, args []string) { - WriteStdout("running wsh connserver\n") + WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn) + go wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn) RpcClient.SetServerImpl(&wshremote.ServerImpl{LogWriter: os.Stdout}) select {} // run forever diff --git a/frontend/app/store/wshserver.ts b/frontend/app/store/wshserver.ts index f193ccbc5..a7e8aa749 100644 --- a/frontend/app/store/wshserver.ts +++ b/frontend/app/store/wshserver.ts @@ -42,6 +42,11 @@ class WshServerType { return WOS.wshServerRpcHelper_call("eventpublish", data, opts); } + // command "eventreadhistory" [call] + EventReadHistoryCommand(data: CommandEventReadHistoryData, opts?: RpcOpts): Promise { + return WOS.wshServerRpcHelper_call("eventreadhistory", data, opts); + } + // command "eventrecv" [call] EventRecvCommand(data: WaveEvent, opts?: RpcOpts): Promise { return WOS.wshServerRpcHelper_call("eventrecv", data, opts); diff --git a/frontend/app/view/cpuplot/cpuplot.tsx b/frontend/app/view/cpuplot/cpuplot.tsx index d9322012d..97289620b 100644 --- a/frontend/app/view/cpuplot/cpuplot.tsx +++ b/frontend/app/view/cpuplot/cpuplot.tsx @@ -3,8 +3,9 @@ import { useHeight } from "@/app/hook/useHeight"; import { useWidth } from "@/app/hook/useWidth"; -import { WOS } from "@/store/global"; +import { globalStore, waveEventSubscribe, WOS } from "@/store/global"; import { WshServer } from "@/store/wshserver"; +import * as util from "@/util/util"; import * as Plot from "@observablehq/plot"; import dayjs from "dayjs"; import * as htl from "htl"; @@ -13,11 +14,36 @@ import * as React from "react"; import "./cpuplot.less"; -type Point = { - time: number; // note this is in seconds not milliseconds - value: number; +const DefaultNumPoints = 120; + +type DataItem = { + ts: number; + [k: string]: number; }; +const SysInfoMetricNames = { + cpu: "CPU %", + "mem:total": "Memory Total", + "mem:used": "Memory Used", + "mem:free": "Memory Free", + "mem:available": "Memory Available", +}; +for (let i = 0; i < 32; i++) { + SysInfoMetricNames[`cpu:${i}`] = `CPU[${i}] %`; +} + +function convertWaveEventToDataItem(event: WaveEvent): DataItem { + const eventData: TimeSeriesData = event.data; + if (eventData == null || eventData.ts == null || eventData.values == null) { + return null; + } + const dataItem = { ts: eventData.ts }; + for (const key in eventData.values) { + dataItem[key] = eventData.values[key]; + } + return dataItem; +} + class CpuPlotViewModel { viewType: string; blockAtom: jotai.Atom; @@ -27,24 +53,54 @@ class CpuPlotViewModel { viewIcon: jotai.Atom; viewText: jotai.Atom; viewName: jotai.Atom; - dataAtom: jotai.PrimitiveAtom>; - addDataAtom: jotai.WritableAtom; - width: number; + dataAtom: jotai.PrimitiveAtom>; + addDataAtom: jotai.WritableAtom; incrementCount: jotai.WritableAtom>; + loadingAtom: jotai.PrimitiveAtom; + numPoints: jotai.Atom; + metrics: jotai.Atom; + connection: jotai.Atom; + manageConnection: jotai.Atom; constructor(blockId: string) { this.viewType = "cpuplot"; this.blockId = blockId; this.blockAtom = WOS.getWaveObjectAtom(`block:${blockId}`); - this.width = 100; - this.dataAtom = jotai.atom(this.getDefaultData()); - this.addDataAtom = jotai.atom(null, (get, set, point) => { - // not efficient but should be okay for a demo? - const data = get(this.dataAtom); - const newData = [...data.slice(1), point]; - set(this.dataAtom, newData); + this.addDataAtom = jotai.atom(null, (get, set, points) => { + const targetLen = get(this.numPoints) + 1; + let data = get(this.dataAtom); + try { + if (data.length > targetLen) { + data = data.slice(data.length - targetLen); + } + if (data.length < targetLen) { + const defaultData = this.getDefaultData(); + data = [...defaultData.slice(defaultData.length - targetLen + data.length), ...data]; + } + const newData = [...data.slice(points.length), ...points]; + set(this.dataAtom, newData); + } catch (e) { + console.log("Error adding data to cpuplot", e); + } + }); + this.manageConnection = jotai.atom(true); + this.loadingAtom = jotai.atom(true); + this.numPoints = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const metaNumPoints = blockData?.meta?.["graph:numpoints"]; + if (metaNumPoints == null || metaNumPoints <= 0) { + return DefaultNumPoints; + } + return metaNumPoints; + }); + this.metrics = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const metrics = blockData?.meta?.["graph:metrics"]; + if (metrics == null || !Array.isArray(metrics)) { + return ["cpu"]; + } + return metrics; }); - this.viewIcon = jotai.atom((get) => { return "chart-line"; // should not be hardcoded }); @@ -56,14 +112,47 @@ class CpuPlotViewModel { const count = meta.count ?? 0; await WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { count: count + 1 } }); }); + this.connection = jotai.atom((get) => { + const blockData = get(this.blockAtom); + const connValue = blockData?.meta?.connection; + if (util.isBlank(connValue)) { + return "local"; + } + return connValue; + }); + this.dataAtom = jotai.atom(this.getDefaultData()); + this.loadInitialData(); } - getDefaultData(): Array { + async loadInitialData() { + globalStore.set(this.loadingAtom, true); + try { + const numPoints = globalStore.get(this.numPoints); + const connName = globalStore.get(this.connection); + const initialData = await WshServer.EventReadHistoryCommand({ + event: "sysinfo", + scope: connName, + maxitems: numPoints, + }); + if (initialData == null) { + return; + } + const initialDataItems: DataItem[] = initialData.map(convertWaveEventToDataItem); + globalStore.set(this.addDataAtom, initialDataItems); + } catch (e) { + console.log("Error loading initial data for cpuplot", e); + } finally { + globalStore.set(this.loadingAtom, false); + } + } + + getDefaultData(): Array { // set it back one to avoid backwards line being possible - const currentTime = Date.now() / 1000 - 1; - const points = []; - for (let i = this.width; i > -1; i--) { - points.push({ time: currentTime - i, value: 0 }); + const numPoints = globalStore.get(this.numPoints); + const currentTime = Date.now() - 1000; + const points: DataItem[] = []; + for (let i = numPoints; i > -1; i--) { + points.push({ ts: currentTime - i * 1000 }); } return points; } @@ -74,59 +163,83 @@ function makeCpuPlotViewModel(blockId: string): CpuPlotViewModel { return cpuPlotViewModel; } +const plotColors = ["#58C142", "#FFC107", "#FF5722", "#2196F3", "#9C27B0", "#00BCD4", "#FFEB3B", "#795548"]; + function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) { const containerRef = React.useRef(); const plotData = jotai.useAtomValue(model.dataAtom); const addPlotData = jotai.useSetAtom(model.addDataAtom); const parentHeight = useHeight(containerRef); const parentWidth = useWidth(containerRef); - const block = jotai.useAtomValue(model.blockAtom); - const incrementCount = jotai.useSetAtom(model.incrementCount); // temporary + const yvals = jotai.useAtomValue(model.metrics); + const connName = jotai.useAtomValue(model.connection); + const lastConnName = React.useRef(connName); React.useEffect(() => { - const temp = async () => { - await incrementCount(); - const dataGen = WshServer.StreamCpuDataCommand( - { id: model.blockId, count: (block.meta?.count ?? 0) + 1 }, - { timeout: 999999999, noresponse: false } - ); - try { - for await (const datum of dataGen) { - const data = { time: datum.ts / 1000, value: datum.values?.["cpu"] }; - addPlotData(data); - } - } catch (e) { - console.log(e); + if (lastConnName.current !== connName) { + model.loadInitialData(); + } + const unsubFn = waveEventSubscribe("sysinfo", connName, (event: WaveEvent) => { + const loading = globalStore.get(model.loadingAtom); + if (loading) { + return; } + const dataItem = convertWaveEventToDataItem(event); + addPlotData([dataItem]); + }); + return () => { + unsubFn(); }; - temp(); - }, []); + }, [connName]); React.useEffect(() => { - const plot = Plot.plot({ - x: { grid: true, label: "time", tickFormat: (d) => `${dayjs.unix(d).format("HH:mm:ss")}` }, - y: { label: "%", domain: [0, 100] }, - width: parentWidth, - height: parentHeight, - marks: [ - () => htl.svg` + const marks: Plot.Markish[] = []; + marks.push( + () => htl.svg` - `, + ` + ); + if (yvals.length == 0) { + // nothing + } else if (yvals.length == 1) { + marks.push( Plot.lineY(plotData, { - stroke: "#58C142", + stroke: plotColors[0], strokeWidth: 2, - x: "time", - y: "value", - }), + x: "ts", + y: yvals[0], + }) + ); + marks.push( Plot.areaY(plotData, { fill: "url(#gradient)", - x: "time", - y: "value", - }), - ], + x: "ts", + y: yvals[0], + }) + ); + } else { + let idx = 0; + for (const yval of yvals) { + marks.push( + Plot.lineY(plotData, { + stroke: plotColors[idx % plotColors.length], + strokeWidth: 1, + x: "ts", + y: yval, + }) + ); + idx++; + } + } + const plot = Plot.plot({ + x: { grid: true, label: "time", tickFormat: (d) => `${dayjs.unix(d / 1000).format("HH:mm:ss")}` }, + y: { label: "%", domain: [0, 100] }, + width: parentWidth, + height: parentHeight, + marks: marks, }); if (plot !== undefined) { diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index e8a2d2297..d1091facb 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -82,6 +82,13 @@ declare global { blockid: string; }; + // wshrpc.CommandEventReadHistoryData + type CommandEventReadHistoryData = { + event: string; + scope: string; + maxitems: number; + }; + // wshrpc.CommandFileData type CommandFileData = { zoneid: string; @@ -250,6 +257,9 @@ declare global { "cmd:env"?: {[key: string]: string}; "cmd:cwd"?: string; "cmd:nowsh"?: boolean; + "graph:*"?: boolean; + "graph:numpoints"?: number; + "graph:metrics"?: string[]; bg?: string; "bg:*"?: boolean; "bg:opacity"?: number; @@ -556,6 +566,7 @@ declare global { event: string; scopes?: string[]; sender?: string; + persist?: number; data?: any; }; diff --git a/pkg/waveobj/metaconsts.go b/pkg/waveobj/metaconsts.go index a9d23d7ae..7cfd72429 100644 --- a/pkg/waveobj/metaconsts.go +++ b/pkg/waveobj/metaconsts.go @@ -43,6 +43,10 @@ const ( MetaKey_CmdCwd = "cmd:cwd" MetaKey_CmdNoWsh = "cmd:nowsh" + MetaKey_GraphClear = "graph:*" + MetaKey_GraphNumPoints = "graph:numpoints" + MetaKey_GraphMetrics = "graph:metrics" + MetaKey_Bg = "bg" MetaKey_BgClear = "bg:*" MetaKey_BgOpacity = "bg:opacity" diff --git a/pkg/waveobj/wtypemeta.go b/pkg/waveobj/wtypemeta.go index f6056e9dc..cfd52e9e1 100644 --- a/pkg/waveobj/wtypemeta.go +++ b/pkg/waveobj/wtypemeta.go @@ -43,6 +43,10 @@ type MetaTSType struct { CmdCwd string `json:"cmd:cwd,omitempty"` CmdNoWsh bool `json:"cmd:nowsh,omitempty"` + GraphClear bool `json:"graph:*,omitempty"` + GraphNumPoints int `json:"graph:numpoints,omitempty"` + GraphMetrics []string `json:"graph:metrics,omitempty"` + // for tabs Bg string `json:"bg,omitempty"` BgClear bool `json:"bg:*,omitempty"` diff --git a/pkg/wps/wps.go b/pkg/wps/wps.go index 79797b2ff..fb13649e6 100644 --- a/pkg/wps/wps.go +++ b/pkg/wps/wps.go @@ -5,6 +5,7 @@ package wps import ( + "log" "strings" "sync" @@ -15,6 +16,9 @@ import ( // this broker interface is mostly generic // strong typing and event types can be defined elsewhere +const MaxPersist = 4096 +const ReMakeArrThreshold = 10 * 1024 + type Client interface { SendEvent(routeId string, event wshrpc.WaveEvent) } @@ -25,15 +29,27 @@ type BrokerSubscription struct { StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them) } +type persistKey struct { + Event string + Scope string +} + +type persistEventWrap struct { + ArrTotalAdds int + Events []*wshrpc.WaveEvent +} + type BrokerType struct { - Lock *sync.Mutex - Client Client - SubMap map[string]*BrokerSubscription + Lock *sync.Mutex + Client Client + SubMap map[string]*BrokerSubscription + PersistMap map[persistKey]*persistEventWrap } var Broker = &BrokerType{ - Lock: &sync.Mutex{}, - SubMap: make(map[string]*BrokerSubscription), + Lock: &sync.Mutex{}, + SubMap: make(map[string]*BrokerSubscription), + PersistMap: make(map[persistKey]*persistEventWrap), } func scopeHasStarMatch(scope string) bool { @@ -60,6 +76,7 @@ func (b *BrokerType) GetClient() Client { // if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one) func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) { + log.Printf("[wps] sub %s %s\n", subRouteId, sub.Event) if sub.Event == "" { return } @@ -121,6 +138,7 @@ func addStrToScopeMap(scopeMap map[string][]string, scope string, routeId string } func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) { + log.Printf("[wps] unsub %s %s\n", subRouteId, eventName) b.Lock.Lock() defer b.Lock.Unlock() b.unsubscribe_nolock(subRouteId, eventName) @@ -156,7 +174,65 @@ func (b *BrokerType) UnsubscribeAll(subRouteId string) { } } +// does not take wildcards, use "" for all +func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*wshrpc.WaveEvent { + if maxItems <= 0 { + return nil + } + b.Lock.Lock() + defer b.Lock.Unlock() + key := persistKey{Event: eventType, Scope: scope} + pe := b.PersistMap[key] + if pe == nil || len(pe.Events) == 0 { + return nil + } + if maxItems > len(pe.Events) { + maxItems = len(pe.Events) + } + // return new arr + rtn := make([]*wshrpc.WaveEvent, maxItems) + copy(rtn, pe.Events[len(pe.Events)-maxItems:]) + return rtn +} + +func (b *BrokerType) persistEvent(event wshrpc.WaveEvent) { + if event.Persist <= 0 { + return + } + numPersist := event.Persist + if numPersist > MaxPersist { + numPersist = MaxPersist + } + scopeMap := make(map[string]bool) + for _, scope := range event.Scopes { + scopeMap[scope] = true + } + scopeMap[""] = true + b.Lock.Lock() + defer b.Lock.Unlock() + for scope := range scopeMap { + key := persistKey{Event: event.Event, Scope: scope} + pe := b.PersistMap[key] + if pe == nil { + pe = &persistEventWrap{ + ArrTotalAdds: 0, + Events: make([]*wshrpc.WaveEvent, 0, event.Persist), + } + b.PersistMap[key] = pe + } + pe.Events = append(pe.Events, &event) + pe.ArrTotalAdds++ + if pe.ArrTotalAdds > ReMakeArrThreshold { + pe.Events = append([]*wshrpc.WaveEvent{}, pe.Events...) + pe.ArrTotalAdds = len(pe.Events) + } + } +} + func (b *BrokerType) Publish(event wshrpc.WaveEvent) { + if event.Persist > 0 { + b.persistEvent(event) + } client := b.GetClient() if client == nil { return diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 3aef251d8..ce14cf666 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -53,6 +53,12 @@ func EventPublishCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc. return err } +// command "eventreadhistory", wshserver.EventReadHistoryCommand +func EventReadHistoryCommand(w *wshutil.WshRpc, data wshrpc.CommandEventReadHistoryData, opts *wshrpc.RpcOpts) ([]*wshrpc.WaveEvent, error) { + resp, err := sendRpcRequestCallHelper[[]*wshrpc.WaveEvent](w, "eventreadhistory", data, opts) + return resp, err +} + // command "eventrecv", wshserver.EventRecvCommand func EventRecvCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "eventrecv", data, opts) diff --git a/pkg/wshrpc/wshremote/sysinfo.go b/pkg/wshrpc/wshremote/sysinfo.go new file mode 100644 index 000000000..8616f0b95 --- /dev/null +++ b/pkg/wshrpc/wshremote/sysinfo.go @@ -0,0 +1,69 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshremote + +import ( + "log" + "strconv" + "time" + + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" + "github.com/wavetermdev/thenextwave/pkg/wshrpc" + "github.com/wavetermdev/thenextwave/pkg/wshrpc/wshclient" + "github.com/wavetermdev/thenextwave/pkg/wshutil" +) + +func getCpuData(values map[string]float64) { + percentArr, err := cpu.Percent(0, false) + if err != nil { + return + } + if len(percentArr) > 0 { + values[wshrpc.TimeSeries_Cpu] = percentArr[0] + } + percentArr, err = cpu.Percent(0, true) + if err != nil { + return + } + for idx, percent := range percentArr { + values[wshrpc.TimeSeries_Cpu+":"+strconv.Itoa(idx)] = percent + } +} + +func getMemData(values map[string]float64) { + memData, err := mem.VirtualMemory() + if err != nil { + return + } + values["mem:total"] = float64(memData.Total) + values["mem:available"] = float64(memData.Available) + values["mem:used"] = float64(memData.Used) + values["mem:free"] = float64(memData.Free) +} + +func generateSingleServerData(client *wshutil.WshRpc, connName string) { + now := time.Now() + values := make(map[string]float64) + getCpuData(values) + getMemData(values) + tsData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: values} + event := wshrpc.WaveEvent{ + Event: wshrpc.Event_SysInfo, + Scopes: []string{connName}, + Data: tsData, + Persist: 1024, + } + wshclient.EventPublishCommand(client, event, &wshrpc.RpcOpts{NoResponse: true}) +} + +func RunSysInfoLoop(client *wshutil.WshRpc, connName string) { + defer func() { + log.Printf("sysinfo loop ended conn:%s\n", connName) + }() + for { + generateSingleServerData(client, connName) + time.Sleep(1 * time.Second) + } +} diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index f1326325a..583cdb25e 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -27,6 +27,7 @@ const ( const ( Event_BlockClose = "blockclose" Event_ConnChange = "connchange" + Event_SysInfo = "sysinfo" ) const ( @@ -50,6 +51,7 @@ const ( Command_EventSub = "eventsub" Command_EventUnsub = "eventunsub" Command_EventUnsubAll = "eventunsuball" + Command_EventReadHistory = "eventreadhistory" Command_StreamTest = "streamtest" Command_StreamWaveAi = "streamwaveai" Command_StreamCpuData = "streamcpudata" @@ -86,6 +88,7 @@ type WshRpcInterface interface { EventSubCommand(ctx context.Context, data SubscriptionRequest) error EventUnsubCommand(ctx context.Context, data string) error EventUnsubAllCommand(ctx context.Context) error + EventReadHistoryCommand(ctx context.Context, data CommandEventReadHistoryData) ([]*WaveEvent, error) StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int] StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType] StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData] @@ -226,10 +229,11 @@ type CommandDeleteBlockData struct { } type WaveEvent struct { - Event string `json:"event"` - Scopes []string `json:"scopes,omitempty"` - Sender string `json:"sender,omitempty"` - Data any `json:"data,omitempty"` + Event string `json:"event"` + Scopes []string `json:"scopes,omitempty"` + Sender string `json:"sender,omitempty"` + Persist int `json:"persist,omitempty"` + Data any `json:"data,omitempty"` } func (e WaveEvent) HasScope(scope string) bool { @@ -242,6 +246,12 @@ type SubscriptionRequest struct { AllScopes bool `json:"allscopes,omitempty"` } +type CommandEventReadHistoryData struct { + Event string `json:"event"` + Scope string `json:"scope"` + MaxItems int `json:"maxitems"` +} + type OpenAiStreamRequest struct { ClientId string `json:"clientid,omitempty"` Opts *OpenAIOptsType `json:"opts"` diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 723bab3a1..ea8f7aa62 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -15,7 +15,6 @@ import ( "strings" "time" - "github.com/shirou/gopsutil/v4/cpu" "github.com/wavetermdev/thenextwave/pkg/blockcontroller" "github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/filestore" @@ -102,51 +101,6 @@ func (ws *WshServer) StreamWaveAiCommand(ctx context.Context, request wshrpc.Ope return waveai.RunLocalCompletionStream(ctx, request) } -func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] { - rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]) - go func() { - defer close(rtn) - MakePlotData(ctx, request.Id) - // we can use the err from MakePlotData to determine if a routine is already running - // but we still need a way to close it or get data from it - for { - now := time.Now() - percent, err := cpu.Percent(0, false) - if err != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err} - } - var value float64 - if len(percent) > 0 { - value = percent[0] - } else { - value = 0.0 - } - cpuData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: map[string]float64{wshrpc.TimeSeries_Cpu: value}} - rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Response: cpuData} - time.Sleep(time.Second * 1) - // this will end the goroutine if the block is closed - err = SavePlotData(ctx, request.Id, "") - if err != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err} - return - } - blockData, getBlockDataErr := wstore.DBMustGet[*waveobj.Block](ctx, request.Id) - if getBlockDataErr != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: getBlockDataErr} - return - } - count := blockData.Meta.GetInt(waveobj.MetaKey_Count, 0) - if count != request.Count { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: fmt.Errorf("new instance created. canceling old goroutine")} - return - } - - } - }() - - return rtn -} - func MakePlotData(ctx context.Context, blockId string) error { block, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) if err != nil { @@ -473,6 +427,11 @@ func (ws *WshServer) EventUnsubAllCommand(ctx context.Context) error { return nil } +func (ws *WshServer) EventReadHistoryCommand(ctx context.Context, data wshrpc.CommandEventReadHistoryData) ([]*wshrpc.WaveEvent, error) { + events := wps.Broker.ReadEventHistory(data.Event, data.Scope, data.MaxItems) + return events, nil +} + func (ws *WshServer) SetConfigCommand(ctx context.Context, data waveobj.MetaMapType) error { return wconfig.SetBaseConfigValue(data) }