adding telemetry updates (#209)

This commit is contained in:
Mike Sawka 2024-08-08 18:24:54 -07:00 committed by GitHub
parent fc7c640e6b
commit ed0279ad72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1187 additions and 20 deletions

View File

@ -17,6 +17,9 @@ tasks:
deps:
- build:server
- build:wsh
env:
WCLOUD_ENDPOINT: "https://ot2e112zx5.execute-api.us-west-2.amazonaws.com/dev"
WCLOUD_WS_ENDPOINT: "wss://5lfzlg5crl.execute-api.us-west-2.amazonaws.com/dev/"
electron:start:
desc: Run the Electron application directly.

View File

@ -9,6 +9,7 @@ import (
"log"
"os"
"os/signal"
"runtime/debug"
"strconv"
"runtime"
@ -19,8 +20,10 @@ import (
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service"
"github.com/wavetermdev/thenextwave/pkg/telemetry"
"github.com/wavetermdev/thenextwave/pkg/util/shellutil"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wcloud"
"github.com/wavetermdev/thenextwave/pkg/wconfig"
"github.com/wavetermdev/thenextwave/pkg/web"
"github.com/wavetermdev/thenextwave/pkg/wshrpc/wshserver"
@ -31,6 +34,10 @@ import (
var WaveVersion = "0.0.0"
var BuildTime = "0"
const InitialTelemetryWait = 30 * time.Second
const TelemetryTick = 10 * time.Minute
const TelemetryInterval = 4 * time.Hour
const ReadySignalPidVarName = "WAVETERM_READY_SIGNAL_PID"
var shutdownOnce sync.Once
@ -40,6 +47,8 @@ func doShutdown(reason string) {
log.Printf("shutting down: %s\n", reason)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
shutdownActivityUpdate()
sendTelemetryWrapper()
// TODO deal with flush in progress
filestore.WFS.FlushCache(ctx)
watcher := wconfig.GetWatcher()
@ -81,12 +90,70 @@ func configWatcher() {
}
}
func telemetryLoop() {
var nextSend int64
time.Sleep(InitialTelemetryWait)
for {
if time.Now().Unix() > nextSend {
nextSend = time.Now().Add(TelemetryInterval).Unix()
sendTelemetryWrapper()
}
time.Sleep(TelemetryTick)
}
}
func sendTelemetryWrapper() {
defer func() {
r := recover()
if r == nil {
return
}
log.Printf("[error] in sendTelemetryWrapper: %v\n", r)
debug.PrintStack()
}()
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
client, err := wstore.DBGetSingleton[*wstore.Client](ctx)
if err != nil {
log.Printf("[error] getting client data for telemetry: %v\n", err)
return
}
err = wcloud.SendTelemetry(ctx, client.OID)
if err != nil {
log.Printf("[error] sending telemetry: %v\n", err)
}
}
func startupActivityUpdate() {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
activity := telemetry.ActivityUpdate{
Startup: 1,
}
activity.NumTabs, _ = wstore.DBGetCount[*wstore.Tab](ctx)
err := telemetry.UpdateActivity(ctx, activity) // set at least one record into activity (don't use go routine wrap here)
if err != nil {
log.Printf("error updating startup activity: %v\n", err)
}
}
func shutdownActivityUpdate() {
activity := telemetry.ActivityUpdate{Shutdown: 1}
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelFn()
err := telemetry.UpdateActivity(ctx, activity) // do NOT use the go routine wrap here (this needs to be synchronous)
if err != nil {
log.Printf("error updating shutdown activity: %v\n", err)
}
}
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.SetPrefix("[wavesrv] ")
blockcontroller.WshServerFactoryFn = wshserver.MakeWshServer
web.WshServerFactoryFn = wshserver.MakeWshServer
wavebase.WaveVersion = WaveVersion
wavebase.BuildTime = BuildTime
err := service.ValidateServiceMap()
if err != nil {
@ -134,7 +201,9 @@ func main() {
return
}
installShutdownSignalHandlers()
startupActivityUpdate()
go stdinReadWatch()
go telemetryLoop()
configWatcher()
webListener, err := web.MakeTCPListener("web")
if err != nil {

View File

@ -0,0 +1 @@
DROP TABLE db_activity;

View File

@ -0,0 +1,11 @@
CREATE TABLE db_activity (
day varchar(20) PRIMARY KEY,
uploaded boolean NOT NULL,
tdata json NOT NULL,
tzname varchar(50) NOT NULL,
tzoffset int NOT NULL,
clientversion varchar(20) NOT NULL,
clientarch varchar(20) NOT NULL,
buildtime varchar(20) NOT NULL DEFAULT '-',
osrelease varchar(20) NOT NULL DEFAULT '-'
);

View File

@ -38,6 +38,10 @@ let globalIsQuitting = false;
let globalIsStarting = true;
let globalIsRelaunching = false;
// for activity updates
let wasActive = true;
let wasInFg = true;
const isDev = !electronApp.isPackaged;
const isDevVite = isDev && process.env.ELECTRON_RENDERER_URL;
if (isDev) {
@ -394,7 +398,9 @@ function createBrowserWindow(
});
});
win.webContents.on("before-input-event", (e, input) => {
// console.log("before-input-event", input);
if (win.isFocused()) {
wasActive = true;
}
});
win.on(
"resize",
@ -405,6 +411,8 @@ function createBrowserWindow(
debounce(400, (e) => mainResizeHandler(e, waveWindow.oid, win))
);
win.on("focus", () => {
wasInFg = true;
wasActive = true;
if (globalIsStarting) {
return;
}
@ -588,6 +596,30 @@ electron.ipcMain.on("contextmenu-show", (event, menuDefArr: ElectronContextMenuI
event.returnValue = true;
});
async function logActiveState() {
const activeState = { fg: wasInFg, active: wasActive, open: true };
const url = new URL(getWebServerEndpoint() + "/wave/log-active-state");
try {
const resp = await fetch(url, { method: "post", body: JSON.stringify(activeState) });
if (!resp.ok) {
console.log("error logging active state", resp.status, resp.statusText);
return;
}
} catch (e) {
console.log("error logging active state", e);
} finally {
// for next iteration
wasInFg = electron.BrowserWindow.getFocusedWindow()?.isFocused() ?? false;
wasActive = false;
}
}
// this isn't perfect, but gets the job done without being complicated
function runActiveTimer() {
logActiveState();
setTimeout(runActiveTimer, 60000);
}
function convertMenuDefArrToMenu(menuDefArr: ElectronContextMenuItem[]): electron.Menu {
const menuItems: electron.MenuItem[] = [];
for (const menuDef of menuDefArr) {
@ -836,6 +868,7 @@ async function appMain() {
await electronApp.whenReady();
await relaunchBrowserWindows();
await configureAutoUpdater();
setTimeout(runActiveTimer, 5000); // start active timer, wait 5s just to be safe
globalIsStarting = false;

View File

@ -324,6 +324,7 @@ declare global {
termthemes: {[key: string]: TermThemeType};
window: WindowSettingsType;
web: WebConfigType;
telemetry: TelemetrySettingsType;
defaultmeta?: MetaType;
presets?: {[key: string]: MetaType};
};
@ -363,6 +364,11 @@ declare global {
blockids: string[];
};
// wconfig.TelemetrySettingsType
type TelemetrySettingsType = {
enabled: boolean;
};
// shellexec.TermSize
type TermSize = {
rows: number;

1
go.mod
View File

@ -21,7 +21,6 @@ require (
github.com/shirou/gopsutil/v4 v4.24.7
github.com/spf13/cobra v1.8.1
github.com/wavetermdev/htmltoken v0.1.0
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94
golang.org/x/crypto v0.25.0
golang.org/x/term v0.22.0
)

2
go.sum
View File

@ -80,8 +80,6 @@ github.com/wavetermdev/htmltoken v0.1.0 h1:RMdA9zTfnYa5jRC4RRG3XNoV5NOP8EDxpaVPj
github.com/wavetermdev/htmltoken v0.1.0/go.mod h1:5FM0XV6zNYiNza2iaTcFGj+hnMtgqumFHO31Z8euquk=
github.com/wavetermdev/ssh_config v0.0.0-20240306041034-17e2087ebde2 h1:onqZrJVap1sm15AiIGTfWzdr6cEF0KdtddeuuOVhzyY=
github.com/wavetermdev/ssh_config v0.0.0-20240306041034-17e2087ebde2/go.mod h1:q2RIzfka+BXARoNexmF9gkxEX7DmvbW9P4hIVx2Kg4M=
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 h1:/SPCxd4KHlS4eRTreYEXWFRr8WfRFBcChlV5cgkaO58=
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94/go.mod h1:ywoo7DXdYueQ0tTPhVoB+wzRTgERSE19EA3mR6KGRaI=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=

View File

@ -7,7 +7,7 @@
"productName": "TheNextWave",
"description": "An Open-Source, AI-Native, Terminal Built for Seamless Workflows",
"license": "Apache-2.0",
"version": "0.0.6",
"version": "0.1.8",
"homepage": "https://waveterm.dev",
"build": {
"appId": "dev.commandline.thenextwave"

View File

@ -9,7 +9,7 @@ import (
"io/fs"
"os"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
"github.com/wavetermdev/thenextwave/pkg/util/dbutil"
)
// can return fs.ErrExist

174
pkg/telemetry/telemetry.go Normal file
View File

@ -0,0 +1,174 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package telemetry
import (
"context"
"database/sql/driver"
"log"
"time"
"github.com/wavetermdev/thenextwave/pkg/util/daystr"
"github.com/wavetermdev/thenextwave/pkg/util/dbutil"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wconfig"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const MaxTzNameLen = 50
// "terminal" should not be in this list
var allowedRenderers = map[string]bool{
"markdown": true,
"code": true,
"openai": true,
"csv": true,
"image": true,
"pdf": true,
"media": true,
"mustache": true,
}
type ActivityUpdate struct {
FgMinutes int
ActiveMinutes int
OpenMinutes int
NumTabs int
NewTab int
Startup int
Shutdown int
BuildTime string
Renderers map[string]int
}
type ActivityType struct {
Day string `json:"day"`
Uploaded bool `json:"-"`
TData TelemetryData `json:"tdata"`
TzName string `json:"tzname"`
TzOffset int `json:"tzoffset"`
ClientVersion string `json:"clientversion"`
ClientArch string `json:"clientarch"`
BuildTime string `json:"buildtime"`
OSRelease string `json:"osrelease"`
}
type TelemetryData struct {
ActiveMinutes int `json:"activeminutes"`
FgMinutes int `json:"fgminutes"`
OpenMinutes int `json:"openminutes"`
NumTabs int `json:"numtabs"`
NewTab int `json:"newtab"`
NumStartup int `json:"numstartup,omitempty"`
NumShutdown int `json:"numshutdown,omitempty"`
Renderers map[string]int `json:"renderers,omitempty"`
}
func (tdata TelemetryData) Value() (driver.Value, error) {
return dbutil.QuickValueJson(tdata)
}
func (tdata *TelemetryData) Scan(val interface{}) error {
return dbutil.QuickScanJson(tdata, val)
}
func IsTelemetryEnabled() bool {
settings := wconfig.GetWatcher().GetSettingsConfig()
if settings.Telemetry == nil || settings.Telemetry.Enabled == nil {
return true
}
return *settings.Telemetry.Enabled
}
func IsAllowedRenderer(renderer string) bool {
return allowedRenderers[renderer]
}
// Wraps UpdateCurrentActivity, spawns goroutine, and logs errors
func GoUpdateActivityWrap(update ActivityUpdate, debugStr string) {
go func() {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
err := UpdateActivity(ctx, update)
if err != nil {
// ignore error, just log, since this is not critical
log.Printf("error updating current activity (%s): %v\n", debugStr, err)
}
}()
}
func UpdateActivity(ctx context.Context, update ActivityUpdate) error {
now := time.Now()
dayStr := daystr.GetCurDayStr()
txErr := wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
var tdata TelemetryData
query := `SELECT tdata FROM db_activity WHERE day = ?`
found := tx.Get(&tdata, query, dayStr)
if !found {
query = `INSERT INTO db_activity (day, uploaded, tdata, tzname, tzoffset, clientversion, clientarch, buildtime, osrelease)
VALUES ( ?, 0, ?, ?, ?, ?, ?, ?, ?)`
tzName, tzOffset := now.Zone()
if len(tzName) > MaxTzNameLen {
tzName = tzName[0:MaxTzNameLen]
}
tx.Exec(query, dayStr, tdata, tzName, tzOffset, wavebase.WaveVersion, wavebase.ClientArch(), wavebase.BuildTime, wavebase.UnameKernelRelease())
}
tdata.FgMinutes += update.FgMinutes
tdata.ActiveMinutes += update.ActiveMinutes
tdata.OpenMinutes += update.OpenMinutes
tdata.NewTab += update.NewTab
tdata.NumStartup += update.Startup
tdata.NumShutdown += update.Shutdown
if update.NumTabs > 0 {
tdata.NumTabs = update.NumTabs
}
if len(update.Renderers) > 0 {
if tdata.Renderers == nil {
tdata.Renderers = make(map[string]int)
}
for key, val := range update.Renderers {
tdata.Renderers[key] += val
}
}
query = `UPDATE db_activity
SET tdata = ?,
clientversion = ?,
buildtime = ?
WHERE day = ?`
tx.Exec(query, tdata, wavebase.WaveVersion, wavebase.BuildTime, dayStr)
return nil
})
if txErr != nil {
return txErr
}
return nil
}
func GetNonUploadedActivity(ctx context.Context) ([]*ActivityType, error) {
var rtn []*ActivityType
txErr := wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
query := `SELECT * FROM db_activity WHERE uploaded = 0 ORDER BY day DESC LIMIT 30`
tx.Select(&rtn, query)
return nil
})
if txErr != nil {
return nil, txErr
}
return rtn, nil
}
func MarkActivityAsUploaded(ctx context.Context, activityArr []*ActivityType) error {
dayStr := daystr.GetCurDayStr()
txErr := wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
query := `UPDATE db_activity SET uploaded = 1 WHERE day = ?`
for _, activity := range activityArr {
if activity.Day == dayStr {
continue
}
tx.Exec(query, activity.Day)
}
return nil
})
return txErr
}

104
pkg/util/daystr/daystr.go Normal file
View File

@ -0,0 +1,104 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package daystr
import (
"fmt"
"regexp"
"strconv"
"time"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
)
var customDayStrRe = regexp.MustCompile(`^((?:\d{4}-\d{2}-\d{2})|today|yesterday|bom|bow)?((?:[+-]\d+[dwm])*)$`)
var daystrRe = regexp.MustCompile(`^(\d{4})-(\d{2})-(\d{2})$`)
func GetCurDayStr() string {
now := time.Now()
dayStr := now.Format("2006-01-02")
return dayStr
}
func GetRelDayStr(relDays int) string {
now := time.Now()
dayStr := now.AddDate(0, 0, relDays).Format("2006-01-02")
return dayStr
}
// accepts a custom format string to return a daystr
// can be either a prefix, a delta, or a prefix w/ a delta
// if no prefix is given, "today" is assumed
// examples: today-2d, bow, bom+1m-1d (that's end of the month), 2024-04-01+1w
//
// prefixes:
//
// yyyy-mm-dd
// today
// yesterday
// bom (beginning of month)
// bow (beginning of week -- sunday)
//
// deltas:
//
// +[n]d, -[n]d (e.g. +1d, -5d)
// +[n]w, -[n]w (e.g. +2w)
// +[n]m, -[n]m (e.g. -1m)
// deltas can be combined e.g. +1w-2d
func GetCustomDayStr(format string) (string, error) {
m := customDayStrRe.FindStringSubmatch(format)
if m == nil {
return "", fmt.Errorf("invalid daystr format")
}
prefix, deltas := m[1], m[2]
if prefix == "" {
prefix = "today"
}
var rtnTime time.Time
now := time.Now()
switch prefix {
case "today":
rtnTime = now
case "yesterday":
rtnTime = now.AddDate(0, 0, -1)
case "bom":
rtnTime = time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
case "bow":
weekday := now.Weekday()
if weekday == time.Sunday {
rtnTime = now
} else {
rtnTime = now.AddDate(0, 0, -int(weekday))
}
default:
m = daystrRe.FindStringSubmatch(prefix)
if m == nil {
return "", fmt.Errorf("invalid prefix format")
}
year, month, day := m[1], m[2], m[3]
yearInt, monthInt, dayInt := utilfn.AtoiNoErr(year), utilfn.AtoiNoErr(month), utilfn.AtoiNoErr(day)
if yearInt == 0 || monthInt == 0 || dayInt == 0 {
return "", fmt.Errorf("invalid prefix format")
}
rtnTime = time.Date(yearInt, time.Month(monthInt), dayInt, 0, 0, 0, 0, now.Location())
}
for _, delta := range regexp.MustCompile(`[+-]\d+[dwm]`).FindAllString(deltas, -1) {
deltaVal, err := strconv.Atoi(delta[1 : len(delta)-1])
if err != nil {
return "", fmt.Errorf("invalid delta format")
}
if delta[0] == '-' {
deltaVal = -deltaVal
}
switch delta[len(delta)-1] {
case 'd':
rtnTime = rtnTime.AddDate(0, 0, deltaVal)
case 'w':
rtnTime = rtnTime.AddDate(0, 0, deltaVal*7)
case 'm':
rtnTime = rtnTime.AddDate(0, deltaVal, 0)
}
}
return rtnTime.Format("2006-01-02"), nil
}

View File

@ -0,0 +1,237 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package dbutil
import (
"fmt"
"reflect"
"strings"
"github.com/sawka/txwrap"
)
type DBMappable interface {
UseDBMap()
}
type MapEntry[T any] struct {
Key string
Val T
}
type MapConverter interface {
ToMap() map[string]interface{}
FromMap(map[string]interface{}) bool
}
type HasSimpleKey interface {
GetSimpleKey() string
}
type HasSimpleInt64Key interface {
GetSimpleKey() int64
}
type MapConverterPtr[T any] interface {
MapConverter
*T
}
type DBMappablePtr[T any] interface {
DBMappable
*T
}
func FromMap[PT MapConverterPtr[T], T any](m map[string]any) PT {
if len(m) == 0 {
return nil
}
rtn := PT(new(T))
ok := rtn.FromMap(m)
if !ok {
return nil
}
return rtn
}
func GetMapGen[PT MapConverterPtr[T], T any](tx *txwrap.TxWrap, query string, args ...interface{}) PT {
m := tx.GetMap(query, args...)
return FromMap[PT](m)
}
func GetMappable[PT DBMappablePtr[T], T any](tx *txwrap.TxWrap, query string, args ...interface{}) PT {
m := tx.GetMap(query, args...)
if len(m) == 0 {
return nil
}
rtn := PT(new(T))
FromDBMap(rtn, m)
return rtn
}
func SelectMappable[PT DBMappablePtr[T], T any](tx *txwrap.TxWrap, query string, args ...interface{}) []PT {
var rtn []PT
marr := tx.SelectMaps(query, args...)
for _, m := range marr {
if len(m) == 0 {
continue
}
val := PT(new(T))
FromDBMap(val, m)
rtn = append(rtn, val)
}
return rtn
}
func SelectMapsGen[PT MapConverterPtr[T], T any](tx *txwrap.TxWrap, query string, args ...interface{}) []PT {
var rtn []PT
marr := tx.SelectMaps(query, args...)
for _, m := range marr {
val := FromMap[PT](m)
if val != nil {
rtn = append(rtn, val)
}
}
return rtn
}
func SelectSimpleMap[T any](tx *txwrap.TxWrap, query string, args ...interface{}) map[string]T {
var rtn []MapEntry[T]
tx.Select(&rtn, query, args...)
if len(rtn) == 0 {
return nil
}
rtnMap := make(map[string]T)
for _, entry := range rtn {
rtnMap[entry.Key] = entry.Val
}
return rtnMap
}
func MakeGenMap[T HasSimpleKey](arr []T) map[string]T {
rtn := make(map[string]T)
for _, val := range arr {
rtn[val.GetSimpleKey()] = val
}
return rtn
}
func MakeGenMapInt64[T HasSimpleInt64Key](arr []T) map[int64]T {
rtn := make(map[int64]T)
for _, val := range arr {
rtn[val.GetSimpleKey()] = val
}
return rtn
}
func isStructType(rt reflect.Type) bool {
if rt.Kind() == reflect.Struct {
return true
}
if rt.Kind() == reflect.Pointer && rt.Elem().Kind() == reflect.Struct {
return true
}
return false
}
func isByteArrayType(t reflect.Type) bool {
return t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8
}
func isStringMapType(t reflect.Type) bool {
return t.Kind() == reflect.Map && t.Key().Kind() == reflect.String
}
func ToDBMap(v DBMappable, useBytes bool) map[string]interface{} {
if CheckNil(v) {
return nil
}
rv := reflect.ValueOf(v)
if rv.Kind() == reflect.Pointer {
rv = rv.Elem()
}
if rv.Kind() != reflect.Struct {
panic(fmt.Sprintf("invalid type %T (non-struct) passed to StructToDBMap", v))
}
rt := rv.Type()
m := make(map[string]interface{})
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
fieldVal := rv.FieldByIndex(field.Index)
dbName := field.Tag.Get("dbmap")
if dbName == "" {
dbName = strings.ToLower(field.Name)
}
if dbName == "-" {
continue
}
if isByteArrayType(field.Type) {
m[dbName] = fieldVal.Interface()
} else if field.Type.Kind() == reflect.Slice {
if useBytes {
m[dbName] = QuickJsonArrBytes(fieldVal.Interface())
} else {
m[dbName] = QuickJsonArr(fieldVal.Interface())
}
} else if isStructType(field.Type) || isStringMapType(field.Type) {
if useBytes {
m[dbName] = QuickJsonBytes(fieldVal.Interface())
} else {
m[dbName] = QuickJson(fieldVal.Interface())
}
} else {
m[dbName] = fieldVal.Interface()
}
}
return m
}
func FromDBMap(v DBMappable, m map[string]interface{}) {
if CheckNil(v) {
panic("StructFromDBMap, v cannot be nil")
}
rv := reflect.ValueOf(v)
if rv.Kind() == reflect.Pointer {
rv = rv.Elem()
}
if rv.Kind() != reflect.Struct {
panic(fmt.Sprintf("invalid type %T (non-struct) passed to StructFromDBMap", v))
}
rt := rv.Type()
numFields := rt.NumField()
for i := 0; i < numFields; i++ {
field := rt.Field(i)
fieldVal := rv.FieldByIndex(field.Index)
dbName := field.Tag.Get("dbmap")
if dbName == "" {
dbName = strings.ToLower(field.Name)
}
if dbName == "-" {
continue
}
if isByteArrayType(field.Type) {
barrVal := fieldVal.Addr().Interface()
QuickSetBytes(barrVal.(*[]byte), m, dbName)
} else if field.Type.Kind() == reflect.Slice {
QuickSetJsonArr(fieldVal.Addr().Interface(), m, dbName)
} else if isStructType(field.Type) || isStringMapType(field.Type) {
QuickSetJson(fieldVal.Addr().Interface(), m, dbName)
} else if field.Type.Kind() == reflect.String {
strVal := fieldVal.Addr().Interface()
QuickSetStr(strVal.(*string), m, dbName)
} else if field.Type.Kind() == reflect.Int64 {
intVal := fieldVal.Addr().Interface()
QuickSetInt64(intVal.(*int64), m, dbName)
} else if field.Type.Kind() == reflect.Int {
intVal := fieldVal.Addr().Interface()
QuickSetInt(intVal.(*int), m, dbName)
} else if field.Type.Kind() == reflect.Bool {
boolVal := fieldVal.Addr().Interface()
QuickSetBool(boolVal.(*bool), m, dbName)
} else {
panic(fmt.Sprintf("StructFromDBMap invalid field type %v in %T", fieldVal.Type(), v))
}
}
}

235
pkg/util/dbutil/dbutil.go Normal file
View File

@ -0,0 +1,235 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package dbutil
import (
"database/sql/driver"
"encoding/json"
"fmt"
"reflect"
"strconv"
)
func QuickSetStr(strVal *string, m map[string]interface{}, name string) {
v, ok := m[name]
if !ok {
return
}
ival, ok := v.(int64)
if ok {
*strVal = strconv.FormatInt(ival, 10)
return
}
str, ok := v.(string)
if !ok {
return
}
*strVal = str
}
func QuickSetInt(ival *int, m map[string]interface{}, name string) {
v, ok := m[name]
if !ok {
return
}
sqlInt, ok := v.(int)
if ok {
*ival = sqlInt
return
}
sqlInt64, ok := v.(int64)
if ok {
*ival = int(sqlInt64)
return
}
}
func QuickSetNullableInt64(ival **int64, m map[string]any, name string) {
v, ok := m[name]
if !ok {
// set to nil
return
}
sqlInt64, ok := v.(int64)
if ok {
*ival = &sqlInt64
return
}
sqlInt, ok := v.(int)
if ok {
sqlInt64 = int64(sqlInt)
*ival = &sqlInt64
return
}
}
func QuickSetInt64(ival *int64, m map[string]interface{}, name string) {
v, ok := m[name]
if !ok {
// leave as zero
return
}
sqlInt64, ok := v.(int64)
if ok {
*ival = sqlInt64
return
}
sqlInt, ok := v.(int)
if ok {
*ival = int64(sqlInt)
return
}
}
func QuickSetBool(bval *bool, m map[string]interface{}, name string) {
v, ok := m[name]
if !ok {
return
}
sqlInt, ok := v.(int64)
if ok {
if sqlInt > 0 {
*bval = true
}
return
}
sqlBool, ok := v.(bool)
if ok {
*bval = sqlBool
}
}
func QuickSetBytes(bval *[]byte, m map[string]interface{}, name string) {
v, ok := m[name]
if !ok {
return
}
sqlBytes, ok := v.([]byte)
if ok {
*bval = sqlBytes
}
}
func getByteArr(m map[string]any, name string, def string) ([]byte, bool) {
v, ok := m[name]
if !ok {
return nil, false
}
barr, ok := v.([]byte)
if !ok {
str, ok := v.(string)
if !ok {
return nil, false
}
barr = []byte(str)
}
if len(barr) == 0 {
barr = []byte(def)
}
return barr, true
}
func QuickSetJson(ptr interface{}, m map[string]interface{}, name string) {
barr, ok := getByteArr(m, name, "{}")
if !ok {
return
}
json.Unmarshal(barr, ptr)
}
func QuickSetNullableJson(ptr interface{}, m map[string]interface{}, name string) {
barr, ok := getByteArr(m, name, "null")
if !ok {
return
}
json.Unmarshal(barr, ptr)
}
func QuickSetJsonArr(ptr interface{}, m map[string]interface{}, name string) {
barr, ok := getByteArr(m, name, "[]")
if !ok {
return
}
json.Unmarshal(barr, ptr)
}
func CheckNil(v interface{}) bool {
rv := reflect.ValueOf(v)
if !rv.IsValid() {
return true
}
switch rv.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Pointer, reflect.Slice:
return rv.IsNil()
default:
return false
}
}
func QuickNullableJson(v interface{}) string {
if CheckNil(v) {
return "null"
}
barr, _ := json.Marshal(v)
return string(barr)
}
func QuickJson(v interface{}) string {
if CheckNil(v) {
return "{}"
}
barr, _ := json.Marshal(v)
return string(barr)
}
func QuickJsonBytes(v interface{}) []byte {
if CheckNil(v) {
return []byte("{}")
}
barr, _ := json.Marshal(v)
return barr
}
func QuickJsonArr(v interface{}) string {
if CheckNil(v) {
return "[]"
}
barr, _ := json.Marshal(v)
return string(barr)
}
func QuickJsonArrBytes(v interface{}) []byte {
if CheckNil(v) {
return []byte("[]")
}
barr, _ := json.Marshal(v)
return barr
}
func QuickScanJson(ptr interface{}, val interface{}) error {
barrVal, ok := val.([]byte)
if !ok {
strVal, ok := val.(string)
if !ok {
return fmt.Errorf("cannot scan '%T' into '%T'", val, ptr)
}
barrVal = []byte(strVal)
}
if len(barrVal) == 0 {
barrVal = []byte("{}")
}
return json.Unmarshal(barrVal, ptr)
}
func QuickValueJson(v interface{}) (driver.Value, error) {
if CheckNil(v) {
return "{}", nil
}
barr, err := json.Marshal(v)
if err != nil {
return nil, err
}
return string(barr), nil
}

View File

@ -20,6 +20,7 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"syscall"
"unicode/utf8"
@ -869,3 +870,11 @@ func AtomicRenameCopy(dstPath string, srcPath string, perms os.FileMode) error {
}
return nil
}
func AtoiNoErr(str string) int {
val, err := strconv.Atoi(str)
if err != nil {
return 0
}
return val
}

View File

@ -14,6 +14,7 @@ import (
openaiapi "github.com/sashabaranov/go-openai"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wcloud"
"github.com/wavetermdev/thenextwave/pkg/wshrpc"
"github.com/gorilla/websocket"
@ -108,28 +109,37 @@ func ConvertPrompt(prompt []wshrpc.OpenAIPromptMessageType) []openaiapi.ChatComp
return rtn
}
func makeAIError(err error) wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType] {
return wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: err}
}
func RunCloudCompletionStream(ctx context.Context, request wshrpc.OpenAiStreamRequest) chan wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType] {
rtn := make(chan wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType])
wsEndpoint := wcloud.GetWSEndpoint()
go func() {
defer close(rtn)
if wsEndpoint == "" {
rtn <- makeAIError(fmt.Errorf("no cloud ws endpoint found"))
return
}
if request.Opts == nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("no openai opts found")}
rtn <- makeAIError(fmt.Errorf("no openai opts found"))
return
}
websocketContext, dialCancelFn := context.WithTimeout(context.Background(), CloudWebsocketConnectTimeout)
defer dialCancelFn()
conn, _, err := websocket.DefaultDialer.DialContext(websocketContext, GetWSEndpoint(), nil)
conn, _, err := websocket.DefaultDialer.DialContext(websocketContext, wsEndpoint, nil)
if err == context.DeadlineExceeded {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, timed out connecting to cloud server: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, timed out connecting to cloud server: %v", err))
return
} else if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, websocket connect error: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, websocket connect error: %v", err))
return
}
defer func() {
err = conn.Close()
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("unable to close openai channel: %v", err)}
rtn <- makeAIError(fmt.Errorf("unable to close openai channel: %v", err))
}
}()
reqPk := MakeOpenAICloudReqPacket()
@ -139,12 +149,12 @@ func RunCloudCompletionStream(ctx context.Context, request wshrpc.OpenAiStreamRe
reqPk.MaxChoices = request.Opts.MaxChoices
configMessageBuf, err := json.Marshal(reqPk)
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, packet marshal error: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, packet marshal error: %v", err))
return
}
err = conn.WriteMessage(websocket.TextMessage, configMessageBuf)
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, websocket write config error: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, websocket write config error: %v", err))
return
}
for {
@ -154,14 +164,13 @@ func RunCloudCompletionStream(ctx context.Context, request wshrpc.OpenAiStreamRe
}
if err != nil {
log.Printf("err received: %v", err)
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, websocket error reading message: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, websocket error reading message: %v", err))
break
}
var streamResp *wshrpc.OpenAIPacketType
err = json.Unmarshal(socketMessage, &streamResp)
log.Printf("ai resp: %v", streamResp)
if err != nil {
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("OpenAI request, websocket response json decode error: %v", err)}
rtn <- makeAIError(fmt.Errorf("OpenAI request, websocket response json decode error: %v", err))
break
}
if streamResp.Error == PacketEOFStr {
@ -169,7 +178,7 @@ func RunCloudCompletionStream(ctx context.Context, request wshrpc.OpenAiStreamRe
break
} else if streamResp.Error != "" {
// use error from server directly
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Error: fmt.Errorf("%v", streamResp.Error)}
rtn <- makeAIError(fmt.Errorf("%v", streamResp.Error))
break
}
rtn <- wshrpc.RespOrErrorUnion[wshrpc.OpenAIPacketType]{Response: *streamResp}

View File

@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
@ -22,6 +23,7 @@ import (
// set by main-server.go
var WaveVersion = "0.0.0"
var BuildTime = "0"
const DefaultWaveHome = "~/.w2"
const DevWaveHome = "~/.w2-dev"
@ -179,3 +181,35 @@ func AcquireWaveLock() (*filemutex.FileMutex, error) {
err = m.TryLock()
return m, err
}
func ClientArch() string {
return fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH)
}
var releaseRegex = regexp.MustCompile(`^(\d+\.\d+\.\d+)`)
var osReleaseOnce = &sync.Once{}
var osRelease string
func unameKernelRelease() string {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
out, err := exec.CommandContext(ctx, "uname", "-r").CombinedOutput()
if err != nil {
log.Printf("error executing uname -r: %v\n", err)
return "-"
}
releaseStr := strings.TrimSpace(string(out))
m := releaseRegex.FindStringSubmatch(releaseStr)
if m == nil || len(m) < 2 {
log.Printf("invalid uname -r output: [%s]\n", releaseStr)
return "-"
}
return m[1]
}
func UnameKernelRelease() string {
osReleaseOnce.Do(func() {
osRelease = unameKernelRelease()
})
return osRelease
}

159
pkg/wcloud/wcloud.go Normal file
View File

@ -0,0 +1,159 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wcloud
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/telemetry"
"github.com/wavetermdev/thenextwave/pkg/util/daystr"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
)
const WCloudEndpoint = "https://api.waveterm.dev/central"
const WCloudEndpointVarName = "WCLOUD_ENDPOINT"
const WCloudWSEndpoint = "wss://wsapi.waveterm.dev/"
const WCloudWSEndpointVarName = "WCLOUD_WS_ENDPOINT"
const APIVersion = 1
const MaxPtyUpdateSize = (128 * 1024)
const MaxUpdatesPerReq = 10
const MaxUpdatesToDeDup = 1000
const MaxUpdateWriterErrors = 3
const WCloudDefaultTimeout = 5 * time.Second
const WCloudWebShareUpdateTimeout = 15 * time.Second
// setting to 1M to be safe (max is 6M for API-GW + Lambda, but there is base64 encoding and upload time)
// we allow one extra update past this estimated size
const MaxUpdatePayloadSize = 1 * (1024 * 1024)
const TelemetryUrl = "/telemetry"
const NoTelemetryUrl = "/no-telemetry"
const WebShareUpdateUrl = "/auth/web-share-update"
func GetEndpoint() string {
if !wavebase.IsDevMode() {
return WCloudEndpoint
}
endpoint := os.Getenv(WCloudEndpointVarName)
if endpoint == "" || !strings.HasPrefix(endpoint, "https://") {
log.Printf("Invalid wcloud dev endpoint, WCLOUD_ENDPOINT not set or invalid\n")
return ""
}
return endpoint
}
func GetWSEndpoint() string {
if !wavebase.IsDevMode() {
return WCloudWSEndpoint
}
endpoint := os.Getenv(WCloudWSEndpointVarName)
if endpoint == "" || !strings.HasPrefix(endpoint, "wss://") {
log.Printf("Invalid wcloud ws dev endpoint, WCLOUD_WS_ENDPOINT not set or invalid\n")
return ""
}
return endpoint
}
func makeAnonPostReq(ctx context.Context, apiUrl string, data interface{}) (*http.Request, error) {
endpoint := GetEndpoint()
if endpoint == "" {
return nil, errors.New("wcloud endpoint not set")
}
var dataReader io.Reader
if data != nil {
byteArr, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("error marshaling json for %s request: %v", apiUrl, err)
}
dataReader = bytes.NewReader(byteArr)
}
fullUrl := GetEndpoint() + apiUrl
req, err := http.NewRequestWithContext(ctx, "POST", fullUrl, dataReader)
if err != nil {
return nil, fmt.Errorf("error creating %s request: %v", apiUrl, err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-PromptAPIVersion", strconv.Itoa(APIVersion))
req.Header.Set("X-PromptAPIUrl", apiUrl)
req.Close = true
return req, nil
}
func doRequest(req *http.Request, outputObj interface{}) (*http.Response, error) {
apiUrl := req.Header.Get("X-PromptAPIUrl")
log.Printf("[wcloud] sending request %s %v\n", req.Method, req.URL)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error contacting wcloud %q service: %v", apiUrl, err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return resp, fmt.Errorf("error reading %q response body: %v", apiUrl, err)
}
if resp.StatusCode != http.StatusOK {
return resp, fmt.Errorf("error contacting wcloud %q service: %s", apiUrl, resp.Status)
}
if outputObj != nil && resp.Header.Get("Content-Type") == "application/json" {
err = json.Unmarshal(bodyBytes, outputObj)
if err != nil {
return resp, fmt.Errorf("error decoding json: %v", err)
}
}
return resp, nil
}
func SendTelemetry(ctx context.Context, clientId string) error {
if !telemetry.IsTelemetryEnabled() {
log.Printf("telemetry disabled, not sending\n")
return nil
}
activity, err := telemetry.GetNonUploadedActivity(ctx)
if err != nil {
return fmt.Errorf("cannot get activity: %v", err)
}
if len(activity) == 0 {
return nil
}
log.Printf("[wcloud] sending telemetry data\n")
dayStr := daystr.GetCurDayStr()
input := TelemetryInputType{ClientId: clientId, UserId: clientId, CurDay: dayStr, Activity: activity}
req, err := makeAnonPostReq(ctx, TelemetryUrl, input)
if err != nil {
return err
}
_, err = doRequest(req, nil)
if err != nil {
return err
}
err = telemetry.MarkActivityAsUploaded(ctx, activity)
if err != nil {
return fmt.Errorf("error marking activity as uploaded: %v", err)
}
return nil
}
func SendNoTelemetryUpdate(ctx context.Context, clientId string, noTelemetryVal bool) error {
req, err := makeAnonPostReq(ctx, NoTelemetryUrl, NoTelemetryInputType{ClientId: clientId, Value: noTelemetryVal})
if err != nil {
return err
}
_, err = doRequest(req, nil)
if err != nil {
return err
}
return nil
}

21
pkg/wcloud/wclouddata.go Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wcloud
import (
"github.com/wavetermdev/thenextwave/pkg/telemetry"
)
type NoTelemetryInputType struct {
ClientId string `json:"clientid"`
Value bool `json:"value"`
}
type TelemetryInputType struct {
UserId string `json:"userid"`
ClientId string `json:"clientid"`
CurDay string `json:"curday"`
DefaultShell string `json:"defaultshell"`
Activity []*telemetry.ActivityType `json:"activity"`
}

View File

@ -229,8 +229,6 @@ func (w *Watcher) broadcast(message WatcherUpdate) {
if message.Error != "" {
log.Printf("watcher: error processing update: %v. error: %s", message.Settings, message.Error)
} else {
log.Printf("watcher: update: %v", message.Settings)
}
}

View File

@ -95,6 +95,10 @@ type WindowSettingsType struct {
BgColor *string `json:"bgcolor"`
}
type TelemetrySettingsType struct {
Enabled *bool `json:"enabled"`
}
type SettingsConfigType struct {
MimeTypes map[string]MimeTypeConfigType `json:"mimetypes"`
Term TerminalConfigType `json:"term"`
@ -105,6 +109,7 @@ type SettingsConfigType struct {
TermThemes TermThemesConfigType `json:"termthemes"`
WindowSettings WindowSettingsType `json:"window"`
Web WebConfigType `json:"web"`
Telemetry *TelemetrySettingsType `json:"telemetry"`
DefaultMeta *waveobj.MetaMapType `json:"defaultmeta,omitempty"`
Presets map[string]*waveobj.MetaMapType `json:"presets,omitempty"`

View File

@ -22,7 +22,9 @@ import (
"github.com/gorilla/mux"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service"
"github.com/wavetermdev/thenextwave/pkg/telemetry"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
type WebFnType = func(http.ResponseWriter, *http.Request)
@ -233,6 +235,65 @@ func handleStreamFile(w http.ResponseWriter, r *http.Request) {
}
}
func WriteJsonError(w http.ResponseWriter, errVal error) {
w.Header().Set(ContentTypeHeaderKey, ContentTypeJson)
w.WriteHeader(http.StatusOK)
errMap := make(map[string]interface{})
errMap["error"] = errVal.Error()
barr, _ := json.Marshal(errMap)
w.Write(barr)
}
func WriteJsonSuccess(w http.ResponseWriter, data interface{}) {
w.Header().Set(ContentTypeHeaderKey, ContentTypeJson)
rtnMap := make(map[string]interface{})
rtnMap["success"] = true
if data != nil {
rtnMap["data"] = data
}
barr, err := json.Marshal(rtnMap)
if err != nil {
WriteJsonError(w, err)
return
}
w.WriteHeader(http.StatusOK)
w.Write(barr)
}
type ClientActiveState struct {
Fg bool `json:"fg"`
Active bool `json:"active"`
Open bool `json:"open"`
}
// params: fg, active, open
func handleLogActiveState(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var activeState ClientActiveState
err := decoder.Decode(&activeState)
if err != nil {
WriteJsonError(w, fmt.Errorf("error decoding json: %v", err))
return
}
activity := telemetry.ActivityUpdate{}
if activeState.Fg {
activity.FgMinutes = 1
}
if activeState.Active {
activity.ActiveMinutes = 1
}
if activeState.Open {
activity.OpenMinutes = 1
}
activity.NumTabs, _ = wstore.DBGetCount[*wstore.Tab](r.Context())
err = telemetry.UpdateActivity(r.Context(), activity)
if err != nil {
WriteJsonError(w, fmt.Errorf("error updating activity: %w", err))
return
}
WriteJsonSuccess(w, true)
}
func WebFnWrap(opts WebFnOpts, fn WebFnType) WebFnType {
return func(w http.ResponseWriter, r *http.Request) {
defer func() {
@ -300,6 +361,7 @@ func RunWebServer(listener net.Listener) {
gr.HandleFunc("/wave/stream-file", WebFnWrap(WebFnOpts{AllowCaching: true}, handleStreamFile))
gr.HandleFunc("/wave/file", WebFnWrap(WebFnOpts{AllowCaching: false}, handleWaveFile))
gr.HandleFunc("/wave/service", WebFnWrap(WebFnOpts{JsonErrors: true}, handleService))
gr.HandleFunc("/wave/log-active-state", WebFnWrap(WebFnOpts{JsonErrors: true}, handleLogActiveState))
handler := http.TimeoutHandler(gr, HttpTimeoutDuration, "Timeout")
if wavebase.IsDevMode() {
handler = handlers.CORS(handlers.AllowedOrigins([]string{"*"}))(handler)

View File

@ -11,8 +11,8 @@ import (
"time"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/util/dbutil"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrNotFound = fmt.Errorf("not found")