From 00958b8fed8d93ec2d9ba2a90955e6a10cbe52f7 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 19 Aug 2024 15:01:00 -0700 Subject: [PATCH] refactor --- frontend/app/store/wshserver.ts | 7 ++++++- frontend/app/view/cpuplot/cpuplot.tsx | 5 +++-- frontend/types/gotypes.d.ts | 12 ++++++------ pkg/wshrpc/wshclient/wshclient.go | 9 +++++++-- pkg/wshrpc/wshrpctypes.go | 12 +++++++++++- pkg/wshrpc/wshserver/wshserver.go | 16 ++++++++-------- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/frontend/app/store/wshserver.ts b/frontend/app/store/wshserver.ts index fc2b15686..8afebe4d8 100644 --- a/frontend/app/store/wshserver.ts +++ b/frontend/app/store/wshserver.ts @@ -102,6 +102,11 @@ class WshServerType { return WOS.wshServerRpcHelper_call("remotefileinfo", data, opts); } + // command "remotestreamcpudata" [responsestream] + RemoteStreamCpuDataCommand(opts?: RpcOpts): AsyncGenerator { + return WOS.wshServerRpcHelper_responsestream("remotestreamcpudata", null, opts); + } + // command "remotestreamfile" [responsestream] RemoteStreamFileCommand(data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator { return WOS.wshServerRpcHelper_responsestream("remotestreamfile", data, opts); @@ -128,7 +133,7 @@ class WshServerType { } // command "streamcpudata" [responsestream] - StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator { + StreamCpuDataCommand(data: CpuDataRequest, opts?: RpcOpts): AsyncGenerator { return WOS.wshServerRpcHelper_responsestream("streamcpudata", data, opts); } diff --git a/frontend/app/view/cpuplot/cpuplot.tsx b/frontend/app/view/cpuplot/cpuplot.tsx index 61e6d1c43..e52d3205a 100644 --- a/frontend/app/view/cpuplot/cpuplot.tsx +++ b/frontend/app/view/cpuplot/cpuplot.tsx @@ -14,7 +14,7 @@ import * as React from "react"; import "./cpuplot.less"; type Point = { - time: number; + time: number; // note this is in seconds not milliseconds value: number; }; @@ -89,7 +89,8 @@ function CpuPlotView({ model }: { model: CpuPlotViewModel }) { ); try { for await (const datum of dataGen) { - addPlotData(datum); + const ts = datum.ts; + addPlotData({ time: ts / 1000, value: datum.values?.["cpu"] }); } } catch (e) { console.log(e); diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 86b3c7916..8b03d9bef 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -158,12 +158,6 @@ declare global { count: number; }; - // wshrpc.CpuDataType - type CpuDataType = { - time: number; - value: number; - }; - // wstore.FileDef type FileDef = { filetype?: string; @@ -437,6 +431,12 @@ declare global { disablewebgl: boolean; }; + // wshrpc.TimeSeriesData + type TimeSeriesData = { + ts: number; + values: {[key: string]: number}; + }; + // wstore.UIContext type UIContext = { windowid: string; diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 1d28245c3..cd9942de6 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -125,6 +125,11 @@ func RemoteFileInfoCommand(w *wshutil.WshRpc, data string, opts *wshrpc.RpcOpts) return resp, err } +// command "remotestreamcpudata", wshserver.RemoteStreamCpuDataCommand +func RemoteStreamCpuDataCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] { + return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "remotestreamcpudata", nil, opts) +} + // command "remotestreamfile", wshserver.RemoteStreamFileCommand func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteStreamFileRtnData] { return sendRpcRequestResponseStreamHelper[wshrpc.CommandRemoteStreamFileRtnData](w, "remotestreamfile", data, opts) @@ -155,8 +160,8 @@ func SetViewCommand(w *wshutil.WshRpc, data wshrpc.CommandBlockSetViewData, opts } // command "streamcpudata", wshserver.StreamCpuDataCommand -func StreamCpuDataCommand(w *wshutil.WshRpc, data wshrpc.CpuDataRequest, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType] { - return sendRpcRequestResponseStreamHelper[wshrpc.CpuDataType](w, "streamcpudata", data, opts) +func StreamCpuDataCommand(w *wshutil.WshRpc, data wshrpc.CpuDataRequest, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] { + return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "streamcpudata", data, opts) } // command "streamtest", wshserver.StreamTestCommand diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index d647d0868..489a93078 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -85,7 +85,7 @@ type WshRpcInterface interface { EventUnsubAllCommand(ctx context.Context) error StreamTestCommand(ctx context.Context) chan RespOrErrorUnion[int] StreamWaveAiCommand(ctx context.Context, request OpenAiStreamRequest) chan RespOrErrorUnion[OpenAIPacketType] - StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[CpuDataType] + StreamCpuDataCommand(ctx context.Context, request CpuDataRequest) chan RespOrErrorUnion[TimeSeriesData] TestCommand(ctx context.Context, data string) error // remotes @@ -93,6 +93,7 @@ type WshRpcInterface interface { RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error) RemoteFileDeleteCommand(ctx context.Context, path string) error RemoteWriteFileCommand(ctx context.Context, data CommandRemoteWriteFileData) error + RemoteStreamCpuDataCommand(ctx context.Context) chan RespOrErrorUnion[TimeSeriesData] } // for frontend @@ -292,3 +293,12 @@ type CommandRemoteWriteFileData struct { Data64 string `json:"data64"` CreateMode os.FileMode `json:"createmode,omitempty"` } + +const ( + TimeSeries_Cpu = "cpu" +) + +type TimeSeriesData struct { + Ts int64 `json:"ts"` + Values map[string]float64 `json:"values"` +} diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index e1176a509..b86af5df1 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -99,8 +99,8 @@ func (ws *WshServer) StreamWaveAiCommand(ctx context.Context, request wshrpc.Ope return waveai.RunLocalCompletionStream(ctx, request) } -func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType] { - rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]) +func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.CpuDataRequest) chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData] { + rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]) go func() { defer close(rtn) MakePlotData(ctx, request.Id) @@ -110,7 +110,7 @@ func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.Cp now := time.Now() percent, err := cpu.Percent(0, false) if err != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: err} + rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err} } var value float64 if len(percent) > 0 { @@ -118,23 +118,23 @@ func (ws *WshServer) StreamCpuDataCommand(ctx context.Context, request wshrpc.Cp } else { value = 0.0 } - cpuData := wshrpc.CpuDataType{Time: now.UnixMilli() / 1000, Value: value} - rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Response: cpuData} + cpuData := wshrpc.TimeSeriesData{Ts: now.UnixMilli(), Values: map[string]float64{wshrpc.TimeSeries_Cpu: value}} + rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Response: cpuData} time.Sleep(time.Second * 1) // this will end the goroutine if the block is closed err = SavePlotData(ctx, request.Id, "") if err != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: err} + rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: err} return } blockData, getBlockDataErr := wstore.DBMustGet[*wstore.Block](ctx, request.Id) if getBlockDataErr != nil { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: getBlockDataErr} + rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: getBlockDataErr} return } count := blockData.Meta.GetInt(wstore.MetaKey_Count, 0) if count != request.Count { - rtn <- wshrpc.RespOrErrorUnion[wshrpc.CpuDataType]{Error: fmt.Errorf("new instance created. canceling old goroutine")} + rtn <- wshrpc.RespOrErrorUnion[wshrpc.TimeSeriesData]{Error: fmt.Errorf("new instance created. canceling old goroutine")} return }