This commit is contained in:
sawka 2024-08-19 15:01:00 -07:00
parent 0d8c159101
commit 00958b8fed
6 changed files with 41 additions and 20 deletions

View File

@ -102,6 +102,11 @@ class WshServerType {
return WOS.wshServerRpcHelper_call("remotefileinfo", data, opts); return WOS.wshServerRpcHelper_call("remotefileinfo", data, opts);
} }
// command "remotestreamcpudata" [responsestream]
RemoteStreamCpuDataCommand(opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return WOS.wshServerRpcHelper_responsestream("remotestreamcpudata", null, opts);
}
// command "remotestreamfile" [responsestream] // command "remotestreamfile" [responsestream]
RemoteStreamFileCommand(data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<CommandRemoteStreamFileRtnData, void, boolean> { RemoteStreamFileCommand(data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<CommandRemoteStreamFileRtnData, void, boolean> {
return WOS.wshServerRpcHelper_responsestream("remotestreamfile", data, opts); return WOS.wshServerRpcHelper_responsestream("remotestreamfile", data, opts);
@ -128,7 +133,7 @@ class WshServerType {
} }
// command "streamcpudata" [responsestream] // command "streamcpudata" [responsestream]
StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator<CpuDataType, void, boolean> { StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator<TimeSeriesData, void, boolean> {
return WOS.wshServerRpcHelper_responsestream("streamcpudata", data, opts); return WOS.wshServerRpcHelper_responsestream("streamcpudata", data, opts);
} }

View File

@ -14,7 +14,7 @@ import * as React from "react";
import "./cpuplot.less"; import "./cpuplot.less";
type Point = { type Point = {
time: number; time: number; // note this is in seconds not milliseconds
value: number; value: number;
}; };
@ -89,7 +89,8 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel }) {
); );
try { try {
for await (const datum of dataGen) { for await (const datum of dataGen) {
addPlotData(datum); const ts = datum.ts;
addPlotData({ time: ts / 1000, value: datum.values?.["cpu"] });
} }
} catch (e) { } catch (e) {
console.log(e); console.log(e);

View File

@ -158,12 +158,6 @@ declare global {
count: number; count: number;
}; };
// wshrpc.CpuDataType
type CpuDataType = {
time: number;
value: number;
};
// wstore.FileDef // wstore.FileDef
type FileDef = { type FileDef = {
filetype?: string; filetype?: string;
@ -437,6 +431,12 @@ declare global {
disablewebgl: boolean; disablewebgl: boolean;
}; };
// wshrpc.TimeSeriesData
type TimeSeriesData = {
ts: number;
values: {[key: string]: number};
};
// wstore.UIContext // wstore.UIContext
type UIContext = { type UIContext = {
windowid: string; windowid: string;

View File

@ -125,6 +125,11 @@ func RemoteFileInfoCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts)
return resp, err return resp, err
} }
// command "remotestreamcpudata", wshserver.RemoteStreamCpuDataCommand
func RemoteStreamCpuDataCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] {
return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "remotestreamcpudata", nil, opts)
}
// command "remotestreamfile", wshserver.RemoteStreamFileCommand // command "remotestreamfile", wshserver.RemoteStreamFileCommand
func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] {
return sendRpcRequestResponseStreamHelper[wshrpc.CommandRemoteStreamFileRtnData](w, "remotestreamfile", data, opts) return sendRpcRequestResponseStreamHelper[wshrpc.CommandRemoteStreamFileRtnData](w, "remotestreamfile", data, opts)
@ -155,8 +160,8 @@ func SetViewCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockSetViewData, opts
} }
// command "streamcpudata", wshserver.StreamCpuDataCommand // command "streamcpudata", wshserver.StreamCpuDataCommand
func StreamCpuDataCommand(w *wshutil.WshRpc, data wshrpc.CpuDataRequest, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType] { func StreamCpuDataCommand(w *wshutil.WshRpc, data wshrpc.CpuDataRequest, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] {
return sendRpcRequestResponseStreamHelper[wshrpc.CpuDataType](w, "streamcpudata", data, opts) return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "streamcpudata", data, opts)
} }
// command "streamtest", wshserver.StreamTestCommand // command "streamtest", wshserver.StreamTestCommand

View File

@ -85,7 +85,7 @@ type WshRpcInterface interface {
EventUnsubAllCommand(ctx context.Context) error EventUnsubAllCommand(ctx context.Context) error
StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int] StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int]
StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType] StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType]
StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[CpuDataType] StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData]
TestCommand(ctx context.Context, data string) error TestCommand(ctx context.Context, data string) error
// remotes // remotes
@ -93,6 +93,7 @@ type WshRpcInterface interface {
RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error) RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error)
RemoteFileDeleteCommand(ctx context.Context, path string) error RemoteFileDeleteCommand(ctx context.Context, path string) error
RemoteWriteFileCommand(ctx context.Context, data CommandRemoteWriteFileData) error RemoteWriteFileCommand(ctx context.Context, data CommandRemoteWriteFileData) error
RemoteStreamCpuDataCommand(ctx context.Context) chan RespOrErrorUnion[TimeSeriesData]
} }
// for frontend // for frontend
@ -292,3 +293,12 @@ type CommandRemoteWriteFileData struct {
Data64 string `json:"data64"` Data64 string `json:"data64"`
CreateMode os.FileMode `json:"createmode,omitempty"` CreateMode os.FileMode `json:"createmode,omitempty"`
} }
const (
TimeSeries_Cpu = "cpu"
)
type TimeSeriesData struct {
Ts int64 `json:"ts"`
Values map[string]float64 `json:"values"`
}

View File

@ -99,8 +99,8 @@ func (ws *WshServer) StreamWaveAiCommand(ctx context.Context, request wshrpc.Ope
return waveai.RunLocalCompletionStream(ctx, request) return waveai.RunLocalCompletionStream(ctx, request)
} }
func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType] { func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] {
rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]) rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData])
go func() { go func() {
defer close(rtn) defer close(rtn)
MakePlotData(ctx, request.Id) MakePlotData(ctx, request.Id)
@ -110,7 +110,7 @@ func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.Cp
now := time.Now() now := time.Now()
percent, err := cpu.Percent(0, false) percent, err := cpu.Percent(0, false)
if err != nil { if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: err} rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err}
} }
var value float64 var value float64
if len(percent) > 0 { if len(percent) > 0 {
@ -118,23 +118,23 @@ func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.Cp
} else { } else {
value = 0.0 value = 0.0
} }
cpuData := wshrpc.CpuDataType{Time: now.UnixMilli() / 1000, Value: value} cpuData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: map[string]float64{wshrpc.TimeSeries_Cpu: value}}
rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Response: cpuData} rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Response: cpuData}
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// this will end the goroutine if the block is closed // this will end the goroutine if the block is closed
err = SavePlotData(ctx, request.Id, "") err = SavePlotData(ctx, request.Id, "")
if err != nil { if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: err} rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err}
return return
} }
blockData, getBlockDataErr := wstore.DBMustGet[*wstore.Block](ctx, request.Id) blockData, getBlockDataErr := wstore.DBMustGet[*wstore.Block](ctx, request.Id)
if getBlockDataErr != nil { if getBlockDataErr != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: getBlockDataErr} rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: getBlockDataErr}
return return
} }
count := blockData.Meta.GetInt(wstore.MetaKey_Count, 0) count := blockData.Meta.GetInt(wstore.MetaKey_Count, 0)
if count != request.Count { if count != request.Count {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: fmt.Errorf("new instance created. canceling old goroutine")} rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: fmt.Errorf("new instance created. canceling old goroutine")}
return return
} }