remote sysinfo data plotting (#294)

This commit is contained in:
Mike Sawka 2024-08-30 11:33:04 -07:00 committed by GitHub
parent a2695e8c08
commit e488862355
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 370 additions and 111 deletions

View File

@ -184,7 +184,7 @@ tasks:
- "cmd/generatego/*.go" - "cmd/generatego/*.go"
- "cmd/generatets/*.go" - "cmd/generatets/*.go"
- "pkg/service/**/*.go" - "pkg/service/**/*.go"
- "pkg/waveobj/wtype.go" - "pkg/waveobj/*.go"
- "pkg/wconfig/**/*.go" - "pkg/wconfig/**/*.go"
- "pkg/wstore/*.go" - "pkg/wstore/*.go"
- "pkg/wshrpc/**/*.go" - "pkg/wshrpc/**/*.go"

View File

@ -157,7 +157,8 @@ func createMainWshClient() {
rpc := wshserver.GetMainRpcClient() rpc := wshserver.GetMainRpcClient()
wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc) wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc)
wps.Broker.SetClient(wshutil.DefaultRouter) wps.Broker.SetClient(wshutil.DefaultRouter)
localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{}, &wshremote.ServerImpl{}) localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{})
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterRoute(wshutil.MakeConnectionRouteId(wshrpc.LocalConnName), localConnWsh) wshutil.DefaultRouter.RegisterRoute(wshutil.MakeConnectionRouteId(wshrpc.LocalConnName), localConnWsh)
} }

View File

@ -23,7 +23,8 @@ func init() {
} }
func serverRun(cmd *cobra.Command, args []string) { func serverRun(cmd *cobra.Command, args []string) {
WriteStdout("running wsh connserver\n") WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
go wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
RpcClient.SetServerImpl(&wshremote.ServerImpl{LogWriter: os.Stdout}) RpcClient.SetServerImpl(&wshremote.ServerImpl{LogWriter: os.Stdout})
select {} // run forever select {} // run forever

View File

@ -42,6 +42,11 @@ class WshServerType {
return WOS.wshServerRpcHelper_call("eventpublish", data, opts); return WOS.wshServerRpcHelper_call("eventpublish", data, opts);
} }
// command "eventreadhistory" [call]
EventReadHistoryCommand(data: CommandEventReadHistoryData, opts?: RpcOpts): Promise<WaveEvent[]> {
return WOS.wshServerRpcHelper_call("eventreadhistory", data, opts);
}
// command "eventrecv" [call] // command "eventrecv" [call]
EventRecvCommand(data: WaveEvent, opts?: RpcOpts): Promise<void> { EventRecvCommand(data: WaveEvent, opts?: RpcOpts): Promise<void> {
return WOS.wshServerRpcHelper_call("eventrecv", data, opts); return WOS.wshServerRpcHelper_call("eventrecv", data, opts);

View File

@ -3,8 +3,9 @@
import { useHeight } from "@/app/hook/useHeight"; import { useHeight } from "@/app/hook/useHeight";
import { useWidth } from "@/app/hook/useWidth"; import { useWidth } from "@/app/hook/useWidth";
import { WOS } from "@/store/global"; import { globalStore, waveEventSubscribe, WOS } from "@/store/global";
import { WshServer } from "@/store/wshserver"; import { WshServer } from "@/store/wshserver";
import * as util from "@/util/util";
import * as Plot from "@observablehq/plot"; import * as Plot from "@observablehq/plot";
import dayjs from "dayjs"; import dayjs from "dayjs";
import * as htl from "htl"; import * as htl from "htl";
@ -13,11 +14,36 @@ import * as React from "react";
import "./cpuplot.less"; import "./cpuplot.less";
type Point = { const DefaultNumPoints = 120;
time: number; // note this is in seconds not milliseconds
value: number; type DataItem = {
ts: number;
[k: string]: number;
}; };
const SysInfoMetricNames = {
cpu: "CPU %",
"mem:total": "Memory Total",
"mem:used": "Memory Used",
"mem:free": "Memory Free",
"mem:available": "Memory Available",
};
for (let i = 0; i < 32; i++) {
SysInfoMetricNames[`cpu:${i}`] = `CPU[${i}] %`;
}
function convertWaveEventToDataItem(event: WaveEvent): DataItem {
const eventData: TimeSeriesData = event.data;
if (eventData == null || eventData.ts == null || eventData.values == null) {
return null;
}
const dataItem = { ts: eventData.ts };
for (const key in eventData.values) {
dataItem[key] = eventData.values[key];
}
return dataItem;
}
class CpuPlotViewModel { class CpuPlotViewModel {
viewType: string; viewType: string;
blockAtom: jotai.Atom<Block>; blockAtom: jotai.Atom<Block>;
@ -27,24 +53,54 @@ class CpuPlotViewModel {
viewIcon: jotai.Atom<string>; viewIcon: jotai.Atom<string>;
viewText: jotai.Atom<string>; viewText: jotai.Atom<string>;
viewName: jotai.Atom<string>; viewName: jotai.Atom<string>;
dataAtom: jotai.PrimitiveAtom<Array<Point>>; dataAtom: jotai.PrimitiveAtom<Array<DataItem>>;
addDataAtom: jotai.WritableAtom<unknown, [Point], void>; addDataAtom: jotai.WritableAtom<unknown, [DataItem[]], void>;
width: number;
incrementCount: jotai.WritableAtom<unknown, [], Promise<void>>; incrementCount: jotai.WritableAtom<unknown, [], Promise<void>>;
loadingAtom: jotai.PrimitiveAtom<boolean>;
numPoints: jotai.Atom<number>;
metrics: jotai.Atom<string[]>;
connection: jotai.Atom<string>;
manageConnection: jotai.Atom<boolean>;
constructor(blockId: string) { constructor(blockId: string) {
this.viewType = "cpuplot"; this.viewType = "cpuplot";
this.blockId = blockId; this.blockId = blockId;
this.blockAtom = WOS.getWaveObjectAtom<Block>(`block:${blockId}`); this.blockAtom = WOS.getWaveObjectAtom<Block>(`block:${blockId}`);
this.width = 100; this.addDataAtom = jotai.atom(null, (get, set, points) => {
this.dataAtom = jotai.atom(this.getDefaultData()); const targetLen = get(this.numPoints) + 1;
this.addDataAtom = jotai.atom(null, (get, set, point) => { let data = get(this.dataAtom);
// not efficient but should be okay for a demo? try {
const data = get(this.dataAtom); if (data.length > targetLen) {
const newData = [...data.slice(1), point]; data = data.slice(data.length - targetLen);
}
if (data.length < targetLen) {
const defaultData = this.getDefaultData();
data = [...defaultData.slice(defaultData.length - targetLen + data.length), ...data];
}
const newData = [...data.slice(points.length), ...points];
set(this.dataAtom, newData); set(this.dataAtom, newData);
} catch (e) {
console.log("Error adding data to cpuplot", e);
}
});
this.manageConnection = jotai.atom(true);
this.loadingAtom = jotai.atom(true);
this.numPoints = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const metaNumPoints = blockData?.meta?.["graph:numpoints"];
if (metaNumPoints == null || metaNumPoints <= 0) {
return DefaultNumPoints;
}
return metaNumPoints;
});
this.metrics = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const metrics = blockData?.meta?.["graph:metrics"];
if (metrics == null || !Array.isArray(metrics)) {
return ["cpu"];
}
return metrics;
}); });
this.viewIcon = jotai.atom((get) => { this.viewIcon = jotai.atom((get) => {
return "chart-line"; // should not be hardcoded return "chart-line"; // should not be hardcoded
}); });
@ -56,14 +112,47 @@ class CpuPlotViewModel {
const count = meta.count ?? 0; const count = meta.count ?? 0;
await WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { count: count + 1 } }); await WshServer.SetMetaCommand({ oref: WOS.makeORef("block", this.blockId), meta: { count: count + 1 } });
}); });
this.connection = jotai.atom((get) => {
const blockData = get(this.blockAtom);
const connValue = blockData?.meta?.connection;
if (util.isBlank(connValue)) {
return "local";
}
return connValue;
});
this.dataAtom = jotai.atom(this.getDefaultData());
this.loadInitialData();
} }
getDefaultData(): Array<Point> { async loadInitialData() {
globalStore.set(this.loadingAtom, true);
try {
const numPoints = globalStore.get(this.numPoints);
const connName = globalStore.get(this.connection);
const initialData = await WshServer.EventReadHistoryCommand({
event: "sysinfo",
scope: connName,
maxitems: numPoints,
});
if (initialData == null) {
return;
}
const initialDataItems: DataItem[] = initialData.map(convertWaveEventToDataItem);
globalStore.set(this.addDataAtom, initialDataItems);
} catch (e) {
console.log("Error loading initial data for cpuplot", e);
} finally {
globalStore.set(this.loadingAtom, false);
}
}
getDefaultData(): Array<DataItem> {
// set it back one to avoid backwards line being possible // set it back one to avoid backwards line being possible
const currentTime = Date.now() / 1000 - 1; const numPoints = globalStore.get(this.numPoints);
const points = []; const currentTime = Date.now() - 1000;
for (let i = this.width; i > -1; i--) { const points: DataItem[] = [];
points.push({ time: currentTime - i, value: 0 }); for (let i = numPoints; i > -1; i--) {
points.push({ ts: currentTime - i * 1000 });
} }
return points; return points;
} }
@ -74,59 +163,83 @@ function makeCpuPlotViewModel(blockId: string): CpuPlotViewModel {
return cpuPlotViewModel; return cpuPlotViewModel;
} }
const plotColors = ["#58C142", "#FFC107", "#FF5722", "#2196F3", "#9C27B0", "#00BCD4", "#FFEB3B", "#795548"];
function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) { function CpuPlotView({ model }: { model: CpuPlotViewModel; blockId: string }) {
const containerRef = React.useRef<HTMLInputElement>(); const containerRef = React.useRef<HTMLInputElement>();
const plotData = jotai.useAtomValue(model.dataAtom); const plotData = jotai.useAtomValue(model.dataAtom);
const addPlotData = jotai.useSetAtom(model.addDataAtom); const addPlotData = jotai.useSetAtom(model.addDataAtom);
const parentHeight = useHeight(containerRef); const parentHeight = useHeight(containerRef);
const parentWidth = useWidth(containerRef); const parentWidth = useWidth(containerRef);
const block = jotai.useAtomValue(model.blockAtom); const yvals = jotai.useAtomValue(model.metrics);
const incrementCount = jotai.useSetAtom(model.incrementCount); // temporary const connName = jotai.useAtomValue(model.connection);
const lastConnName = React.useRef(connName);
React.useEffect(() => { React.useEffect(() => {
const temp = async () => { if (lastConnName.current !== connName) {
await incrementCount(); model.loadInitialData();
const dataGen = WshServer.StreamCpuDataCommand(
{ id: model.blockId, count: (block.meta?.count ?? 0) + 1 },
{ timeout: 999999999, noresponse: false }
);
try {
for await (const datum of dataGen) {
const data = { time: datum.ts / 1000, value: datum.values?.["cpu"] };
addPlotData(data);
} }
} catch (e) { const unsubFn = waveEventSubscribe("sysinfo", connName, (event: WaveEvent) => {
console.log(e); const loading = globalStore.get(model.loadingAtom);
if (loading) {
return;
} }
const dataItem = convertWaveEventToDataItem(event);
addPlotData([dataItem]);
});
return () => {
unsubFn();
}; };
temp(); }, [connName]);
}, []);
React.useEffect(() => { React.useEffect(() => {
const plot = Plot.plot({ const marks: Plot.Markish[] = [];
x: { grid: true, label: "time", tickFormat: (d) => `${dayjs.unix(d).format("HH:mm:ss")}` }, marks.push(
y: { label: "%", domain: [0, 100] },
width: parentWidth,
height: parentHeight,
marks: [
() => htl.svg`<defs> () => htl.svg`<defs>
<linearGradient id="gradient" gradientTransform="rotate(90)"> <linearGradient id="gradient" gradientTransform="rotate(90)">
<stop offset="0%" stop-color="#58C142" stop-opacity="0.7" /> <stop offset="0%" stop-color="#58C142" stop-opacity="0.7" />
<stop offset="100%" stop-color="#58C142" stop-opacity="0" /> <stop offset="100%" stop-color="#58C142" stop-opacity="0" />
</linearGradient> </linearGradient>
</defs>`, </defs>`
);
if (yvals.length == 0) {
// nothing
} else if (yvals.length == 1) {
marks.push(
Plot.lineY(plotData, { Plot.lineY(plotData, {
stroke: "#58C142", stroke: plotColors[0],
strokeWidth: 2, strokeWidth: 2,
x: "time", x: "ts",
y: "value", y: yvals[0],
}), })
);
marks.push(
Plot.areaY(plotData, { Plot.areaY(plotData, {
fill: "url(#gradient)", fill: "url(#gradient)",
x: "time", x: "ts",
y: "value", y: yvals[0],
}), })
], );
} else {
let idx = 0;
for (const yval of yvals) {
marks.push(
Plot.lineY(plotData, {
stroke: plotColors[idx % plotColors.length],
strokeWidth: 1,
x: "ts",
y: yval,
})
);
idx++;
}
}
const plot = Plot.plot({
x: { grid: true, label: "time", tickFormat: (d) => `${dayjs.unix(d / 1000).format("HH:mm:ss")}` },
y: { label: "%", domain: [0, 100] },
width: parentWidth,
height: parentHeight,
marks: marks,
}); });
if (plot !== undefined) { if (plot !== undefined) {

View File

@ -82,6 +82,13 @@ declare global {
blockid: string; blockid: string;
}; };
// wshrpc.CommandEventReadHistoryData
type CommandEventReadHistoryData = {
event: string;
scope: string;
maxitems: number;
};
// wshrpc.CommandFileData // wshrpc.CommandFileData
type CommandFileData = { type CommandFileData = {
zoneid: string; zoneid: string;
@ -250,6 +257,9 @@ declare global {
"cmd:env"?: {[key: string]: string}; "cmd:env"?: {[key: string]: string};
"cmd:cwd"?: string; "cmd:cwd"?: string;
"cmd:nowsh"?: boolean; "cmd:nowsh"?: boolean;
"graph:*"?: boolean;
"graph:numpoints"?: number;
"graph:metrics"?: string[];
bg?: string; bg?: string;
"bg:*"?: boolean; "bg:*"?: boolean;
"bg:opacity"?: number; "bg:opacity"?: number;
@ -556,6 +566,7 @@ declare global {
event: string; event: string;
scopes?: string[]; scopes?: string[];
sender?: string; sender?: string;
persist?: number;
data?: any; data?: any;
}; };

View File

@ -43,6 +43,10 @@ const (
MetaKey_CmdCwd = "cmd:cwd" MetaKey_CmdCwd = "cmd:cwd"
MetaKey_CmdNoWsh = "cmd:nowsh" MetaKey_CmdNoWsh = "cmd:nowsh"
MetaKey_GraphClear = "graph:*"
MetaKey_GraphNumPoints = "graph:numpoints"
MetaKey_GraphMetrics = "graph:metrics"
MetaKey_Bg = "bg" MetaKey_Bg = "bg"
MetaKey_BgClear = "bg:*" MetaKey_BgClear = "bg:*"
MetaKey_BgOpacity = "bg:opacity" MetaKey_BgOpacity = "bg:opacity"

View File

@ -43,6 +43,10 @@ type MetaTSType struct {
CmdCwd string `json:"cmd:cwd,omitempty"` CmdCwd string `json:"cmd:cwd,omitempty"`
CmdNoWsh bool `json:"cmd:nowsh,omitempty"` CmdNoWsh bool `json:"cmd:nowsh,omitempty"`
GraphClear bool `json:"graph:*,omitempty"`
GraphNumPoints int `json:"graph:numpoints,omitempty"`
GraphMetrics []string `json:"graph:metrics,omitempty"`
// for tabs // for tabs
Bg string `json:"bg,omitempty"` Bg string `json:"bg,omitempty"`
BgClear bool `json:"bg:*,omitempty"` BgClear bool `json:"bg:*,omitempty"`

View File

@ -5,6 +5,7 @@
package wps package wps
import ( import (
"log"
"strings" "strings"
"sync" "sync"
@ -15,6 +16,9 @@ import (
// this broker interface is mostly generic // this broker interface is mostly generic
// strong typing and event types can be defined elsewhere // strong typing and event types can be defined elsewhere
const MaxPersist = 4096
const ReMakeArrThreshold = 10 * 1024
type Client interface { type Client interface {
SendEvent(routeId string, event wshrpc.WaveEvent) SendEvent(routeId string, event wshrpc.WaveEvent)
} }
@ -25,15 +29,27 @@ type BrokerSubscription struct {
StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them) StarSubs map[string][]string // routeids subscribed to star scope (scopes with "*" or "**" in them)
} }
type persistKey struct {
Event string
Scope string
}
type persistEventWrap struct {
ArrTotalAdds int
Events []*wshrpc.WaveEvent
}
type BrokerType struct { type BrokerType struct {
Lock *sync.Mutex Lock *sync.Mutex
Client Client Client Client
SubMap map[string]*BrokerSubscription SubMap map[string]*BrokerSubscription
PersistMap map[persistKey]*persistEventWrap
} }
var Broker = &BrokerType{ var Broker = &BrokerType{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
SubMap: make(map[string]*BrokerSubscription), SubMap: make(map[string]*BrokerSubscription),
PersistMap: make(map[persistKey]*persistEventWrap),
} }
func scopeHasStarMatch(scope string) bool { func scopeHasStarMatch(scope string) bool {
@ -60,6 +76,7 @@ func (b *BrokerType) GetClient() Client {
// if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one) // if already subscribed, this will *resubscribe* with the new subscription (remove the old one, and replace with this one)
func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) { func (b *BrokerType) Subscribe(subRouteId string, sub wshrpc.SubscriptionRequest) {
log.Printf("[wps] sub %s %s\n", subRouteId, sub.Event)
if sub.Event == "" { if sub.Event == "" {
return return
} }
@ -121,6 +138,7 @@ func addStrToScopeMap(scopeMap map[string][]string, scope string, routeId string
} }
func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) { func (b *BrokerType) Unsubscribe(subRouteId string, eventName string) {
log.Printf("[wps] unsub %s %s\n", subRouteId, eventName)
b.Lock.Lock() b.Lock.Lock()
defer b.Lock.Unlock() defer b.Lock.Unlock()
b.unsubscribe_nolock(subRouteId, eventName) b.unsubscribe_nolock(subRouteId, eventName)
@ -156,7 +174,65 @@ func (b *BrokerType) UnsubscribeAll(subRouteId string) {
} }
} }
// does not take wildcards, use "" for all
func (b *BrokerType) ReadEventHistory(eventType string, scope string, maxItems int) []*wshrpc.WaveEvent {
if maxItems <= 0 {
return nil
}
b.Lock.Lock()
defer b.Lock.Unlock()
key := persistKey{Event: eventType, Scope: scope}
pe := b.PersistMap[key]
if pe == nil || len(pe.Events) == 0 {
return nil
}
if maxItems > len(pe.Events) {
maxItems = len(pe.Events)
}
// return new arr
rtn := make([]*wshrpc.WaveEvent, maxItems)
copy(rtn, pe.Events[len(pe.Events)-maxItems:])
return rtn
}
func (b *BrokerType) persistEvent(event wshrpc.WaveEvent) {
if event.Persist <= 0 {
return
}
numPersist := event.Persist
if numPersist > MaxPersist {
numPersist = MaxPersist
}
scopeMap := make(map[string]bool)
for _, scope := range event.Scopes {
scopeMap[scope] = true
}
scopeMap[""] = true
b.Lock.Lock()
defer b.Lock.Unlock()
for scope := range scopeMap {
key := persistKey{Event: event.Event, Scope: scope}
pe := b.PersistMap[key]
if pe == nil {
pe = &persistEventWrap{
ArrTotalAdds: 0,
Events: make([]*wshrpc.WaveEvent, 0, event.Persist),
}
b.PersistMap[key] = pe
}
pe.Events = append(pe.Events, &event)
pe.ArrTotalAdds++
if pe.ArrTotalAdds > ReMakeArrThreshold {
pe.Events = append([]*wshrpc.WaveEvent{}, pe.Events...)
pe.ArrTotalAdds = len(pe.Events)
}
}
}
func (b *BrokerType) Publish(event wshrpc.WaveEvent) { func (b *BrokerType) Publish(event wshrpc.WaveEvent) {
if event.Persist > 0 {
b.persistEvent(event)
}
client := b.GetClient() client := b.GetClient()
if client == nil { if client == nil {
return return

View File

@ -53,6 +53,12 @@ func EventPublishCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.
return err return err
} }
// command "eventreadhistory", wshserver.EventReadHistoryCommand
func EventReadHistoryCommand(w *wshutil.WshRpc, data wshrpc.CommandEventReadHistoryData, opts *wshrpc.RpcOpts) ([]*wshrpc.WaveEvent, error) {
resp, err := sendRpcRequestCallHelper[[]*wshrpc.WaveEvent](w, "eventreadhistory", data, opts)
return resp, err
}
// command "eventrecv", wshserver.EventRecvCommand // command "eventrecv", wshserver.EventRecvCommand
func EventRecvCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.RpcOpts) error { func EventRecvCommand(w *wshutil.WshRpc, data wshrpc.WaveEvent, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "eventrecv", data, opts) _, err := sendRpcRequestCallHelper[any](w, "eventrecv", data, opts)

View File

@ -0,0 +1,69 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshremote
import (
"log"
"strconv"
"time"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshclient"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
)
func getCpuData(values map[string]float64) {
percentArr, err := cpu.Percent(0, false)
if err != nil {
return
}
if len(percentArr) > 0 {
values[wshrpc.TimeSeries_Cpu] = percentArr[0]
}
percentArr, err = cpu.Percent(0, true)
if err != nil {
return
}
for idx, percent := range percentArr {
values[wshrpc.TimeSeries_Cpu+":"+strconv.Itoa(idx)] = percent
}
}
func getMemData(values map[string]float64) {
memData, err := mem.VirtualMemory()
if err != nil {
return
}
values["mem:total"] = float64(memData.Total)
values["mem:available"] = float64(memData.Available)
values["mem:used"] = float64(memData.Used)
values["mem:free"] = float64(memData.Free)
}
func generateSingleServerData(client *wshutil.WshRpc, connName string) {
now := time.Now()
values := make(map[string]float64)
getCpuData(values)
getMemData(values)
tsData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: values}
event := wshrpc.WaveEvent{
Event: wshrpc.Event_SysInfo,
Scopes: []string{connName},
Data: tsData,
Persist: 1024,
}
wshclient.EventPublishCommand(client, event, &wshrpc.RpcOpts{NoResponse: true})
}
func RunSysInfoLoop(client *wshutil.WshRpc, connName string) {
defer func() {
log.Printf("sysinfo loop ended conn:%s\n", connName)
}()
for {
generateSingleServerData(client, connName)
time.Sleep(1 * time.Second)
}
}

View File

@ -27,6 +27,7 @@ const (
const ( const (
Event_BlockClose = "blockclose" Event_BlockClose = "blockclose"
Event_ConnChange = "connchange" Event_ConnChange = "connchange"
Event_SysInfo = "sysinfo"
) )
const ( const (
@ -50,6 +51,7 @@ const (
Command_EventSub = "eventsub" Command_EventSub = "eventsub"
Command_EventUnsub = "eventunsub" Command_EventUnsub = "eventunsub"
Command_EventUnsubAll = "eventunsuball" Command_EventUnsubAll = "eventunsuball"
Command_EventReadHistory = "eventreadhistory"
Command_StreamTest = "streamtest" Command_StreamTest = "streamtest"
Command_StreamWaveAi = "streamwaveai" Command_StreamWaveAi = "streamwaveai"
Command_StreamCpuData = "streamcpudata" Command_StreamCpuData = "streamcpudata"
@ -86,6 +88,7 @@ type WshRpcInterface interface {
EventSubCommand(ctx context.Context, data SubscriptionRequest) error EventSubCommand(ctx context.Context, data SubscriptionRequest) error
EventUnsubCommand(ctx context.Context, data string) error EventUnsubCommand(ctx context.Context, data string) error
EventUnsubAllCommand(ctx context.Context) error EventUnsubAllCommand(ctx context.Context) error
EventReadHistoryCommand(ctx context.Context, data CommandEventReadHistoryData) ([]*WaveEvent, error)
StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int] StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int]
StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType] StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType]
StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData] StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData]
@ -229,6 +232,7 @@ type WaveEvent struct {
Event string `json:"event"` Event string `json:"event"`
Scopes []string `json:"scopes,omitempty"` Scopes []string `json:"scopes,omitempty"`
Sender string `json:"sender,omitempty"` Sender string `json:"sender,omitempty"`
Persist int `json:"persist,omitempty"`
Data any `json:"data,omitempty"` Data any `json:"data,omitempty"`
} }
@ -242,6 +246,12 @@ type SubscriptionRequest struct {
AllScopes bool `json:"allscopes,omitempty"` AllScopes bool `json:"allscopes,omitempty"`
} }
type CommandEventReadHistoryData struct {
Event string `json:"event"`
Scope string `json:"scope"`
MaxItems int `json:"maxitems"`
}
type OpenAiStreamRequest struct { type OpenAiStreamRequest struct {
ClientId string `json:"clientid,omitempty"` ClientId string `json:"clientid,omitempty"`
Opts *OpenAIOptsType `json:"opts"` Opts *OpenAIOptsType `json:"opts"`

View File

@ -15,7 +15,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller" "github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/filestore"
@ -102,51 +101,6 @@ func (ws *WshServer) StreamWaveAiCommand(ctx context.Context, request wshrpc.Ope
return waveai.RunLocalCompletionStream(ctx, request) return waveai.RunLocalCompletionStream(ctx, request)
} }
func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] {
rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData])
go func() {
defer close(rtn)
MakePlotData(ctx, request.Id)
// we can use the err from MakePlotData to determine if a routine is already running
// but we still need a way to close it or get data from it
for {
now := time.Now()
percent, err := cpu.Percent(0, false)
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err}
}
var value float64
if len(percent) > 0 {
value = percent[0]
} else {
value = 0.0
}
cpuData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: map[string]float64{wshrpc.TimeSeries_Cpu: value}}
rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Response: cpuData}
time.Sleep(time.Second * 1)
// this will end the goroutine if the block is closed
err = SavePlotData(ctx, request.Id, "")
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err}
return
}
blockData, getBlockDataErr := wstore.DBMustGet[*waveobj.Block](ctx, request.Id)
if getBlockDataErr != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: getBlockDataErr}
return
}
count := blockData.Meta.GetInt(waveobj.MetaKey_Count, 0)
if count != request.Count {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: fmt.Errorf("new instance created. canceling old goroutine")}
return
}
}
}()
return rtn
}
func MakePlotData(ctx context.Context, blockId string) error { func MakePlotData(ctx context.Context, blockId string) error {
block, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId) block, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil { if err != nil {
@ -473,6 +427,11 @@ func (ws *WshServer) EventUnsubAllCommand(ctx context.Context) error {
return nil return nil
} }
func (ws *WshServer) EventReadHistoryCommand(ctx context.Context, data wshrpc.CommandEventReadHistoryData) ([]*wshrpc.WaveEvent, error) {
events := wps.Broker.ReadEventHistory(data.Event, data.Scope, data.MaxItems)
return events, nil
}
func (ws *WshServer) SetConfigCommand(ctx context.Context, data waveobj.MetaMapType) error { func (ws *WshServer) SetConfigCommand(ctx context.Context, data waveobj.MetaMapType) error {
return wconfig.SetBaseConfigValue(data) return wconfig.SetBaseConfigValue(data)
} }