new wlog package to do distributed logging from waveshell back to wavesrv (#295)

This commit is contained in:
Mike Sawka 2024-02-15 17:42:43 -08:00 committed by GitHub
parent 07fa5bf9cb
commit e2e71898c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 121 additions and 17 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/wavetermdev/waveterm/waveshell/pkg/packet" "github.com/wavetermdev/waveterm/waveshell/pkg/packet"
"github.com/wavetermdev/waveterm/waveshell/pkg/server" "github.com/wavetermdev/waveterm/waveshell/pkg/server"
"github.com/wavetermdev/waveterm/waveshell/pkg/shexec" "github.com/wavetermdev/waveterm/waveshell/pkg/shexec"
"github.com/wavetermdev/waveterm/waveshell/pkg/wlog"
) )
var BuildTime = "0" var BuildTime = "0"
@ -39,6 +40,7 @@ func handleSingle() {
sender.Close() sender.Close()
sender.WaitForDone() sender.WaitForDone()
}() }()
wlog.LogConsumer = sender.SendLogPacket
initPacket := shexec.MakeInitPacket() initPacket := shexec.MakeInitPacket()
sender.SendPacket(initPacket) sender.SendPacket(initPacket)
if len(os.Args) >= 3 && os.Args[2] == "--version" { if len(os.Args) >= 3 && os.Args[2] == "--version" {
@ -133,11 +135,13 @@ func main() {
return return
} else if firstArg == "--single" || firstArg == "--single-from-server" { } else if firstArg == "--single" || firstArg == "--single-from-server" {
base.ProcessType = base.ProcessType_WaveShellSingle base.ProcessType = base.ProcessType_WaveShellSingle
wlog.GlobalSubsystem = base.ProcessType_WaveShellSingle
base.InitDebugLog("single") base.InitDebugLog("single")
handleSingle() handleSingle()
return return
} else if firstArg == "--server" { } else if firstArg == "--server" {
base.ProcessType = base.ProcessType_WaveShellServer base.ProcessType = base.ProcessType_WaveShellServer
wlog.GlobalSubsystem = base.ProcessType_WaveShellServer
base.InitDebugLog("server") base.InitDebugLog("server")
rtnCode, err := server.RunServer() rtnCode, err := server.RunServer()
if err != nil { if err != nil {

View File

@ -42,8 +42,8 @@ const LogRcFileName = "debug.rcfile"
const ( const (
ProcessType_Unknown = "unknown" ProcessType_Unknown = "unknown"
ProcessType_WaveSrv = "wavesrv" ProcessType_WaveSrv = "wavesrv"
ProcessType_WaveShellSingle = "waveshell-single" ProcessType_WaveShellSingle = "wsh-1"
ProcessType_WaveShellServer = "waveshell-server" ProcessType_WaveShellServer = "wsh-s"
) )
// keys are sessionids (also the key RcFilesDirBaseName) // keys are sessionids (also the key RcFilesDirBaseName)

View File

@ -14,9 +14,9 @@ import (
"os" "os"
"reflect" "reflect"
"sync" "sync"
"time"
"github.com/wavetermdev/waveterm/waveshell/pkg/base" "github.com/wavetermdev/waveterm/waveshell/pkg/base"
"github.com/wavetermdev/waveterm/waveshell/pkg/wlog"
) )
// single : <init, >run, >cmddata, >cmddone, <cmdstart, <>data, <>dataack, <cmddone // single : <init, >run, >cmddata, >cmddone, <cmdstart, <>data, <>dataack, <cmddone
@ -556,11 +556,8 @@ func MakeRawPacket(val string) *RawPacketType {
} }
type LogPacketType struct { type LogPacketType struct {
Type string `json:"type"` Type string `json:"type"`
Ts int64 `json:"ts"` // log timestamp Entry wlog.LogEntry `json:"entry"`
ReqId string `json:"reqid,omitempty"` // if this log line is related to an rpc request
ProcInfo string `json:"procinfo,omitempty"` // server/single
LogLine string `json:"logline"` // the logline data
} }
func (*LogPacketType) GetType() string { func (*LogPacketType) GetType() string {
@ -571,8 +568,8 @@ func (p *LogPacketType) String() string {
return "log" return "log"
} }
func MakeLogPacket() *LogPacketType { func MakeLogPacket(entry wlog.LogEntry) *LogPacketType {
return &LogPacketType{Type: LogPacketStr, Ts: time.Now().UnixMilli()} return &LogPacketType{Type: LogPacketStr, Entry: entry}
} }
type ShellStatePacketType struct { type ShellStatePacketType struct {
@ -975,6 +972,11 @@ type CommandPacketType interface {
GetCK() base.CommandKey GetCK() base.CommandKey
} }
type ModelUpdatePacketType struct {
Type string `json:"type"`
Updates []any `json:"updates"`
}
func AsExtType(pk PacketType) string { func AsExtType(pk PacketType) string {
if rpcPacket, ok := pk.(RpcPacketType); ok { if rpcPacket, ok := pk.(RpcPacketType); ok {
return fmt.Sprintf("%s[%s]", rpcPacket.GetType(), rpcPacket.GetReqId()) return fmt.Sprintf("%s[%s]", rpcPacket.GetType(), rpcPacket.GetReqId())
@ -1103,6 +1105,10 @@ func MakePacketSender(output io.Writer, errHandler func(*PacketSender, PacketTyp
return sender return sender
} }
func (sender *PacketSender) SendLogPacket(entry wlog.LogEntry) {
sender.SendPacket(MakeLogPacket(entry))
}
func (sender *PacketSender) goHandleError(pk PacketType, err error) { func (sender *PacketSender) goHandleError(pk PacketType, err error) {
sender.Lock.Lock() sender.Lock.Lock()
defer sender.Lock.Unlock() defer sender.Lock.Unlock()

View File

@ -10,6 +10,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"github.com/wavetermdev/waveterm/waveshell/pkg/wlog"
) )
type PacketParser struct { type PacketParser struct {
@ -243,6 +245,11 @@ func MakePacketParser(input io.Reader, opts *PacketParserOpts) *PacketParser {
if pk.GetType() == PingPacketStr { if pk.GetType() == PingPacketStr {
continue continue
} }
if pk.GetType() == LogPacketStr {
logPk := pk.(*LogPacketType)
wlog.LogLogEntry(logPk.Entry)
continue
}
if parser.RpcHandler { if parser.RpcHandler {
sent := parser.trySendRpcResponse(pk) sent := parser.trySendRpcResponse(pk)
if sent { if sent {

View File

@ -23,6 +23,7 @@ import (
"github.com/wavetermdev/waveterm/waveshell/pkg/shellapi" "github.com/wavetermdev/waveterm/waveshell/pkg/shellapi"
"github.com/wavetermdev/waveterm/waveshell/pkg/shexec" "github.com/wavetermdev/waveterm/waveshell/pkg/shexec"
"github.com/wavetermdev/waveterm/waveshell/pkg/utilfn" "github.com/wavetermdev/waveterm/waveshell/pkg/utilfn"
"github.com/wavetermdev/waveterm/waveshell/pkg/wlog"
) )
const MaxFileDataPacketSize = 16 * 1024 const MaxFileDataPacketSize = 16 * 1024
@ -741,6 +742,13 @@ func RunServer() (int, error) {
WriteErrorChOnce: &sync.Once{}, WriteErrorChOnce: &sync.Once{},
WriteFileContextMap: make(map[string]*WriteFileContext), WriteFileContextMap: make(map[string]*WriteFileContext),
} }
if debug {
packet.GlobalDebug = true
}
server.MainInput = packet.MakePacketParser(os.Stdin, nil)
server.Sender = packet.MakePacketSender(os.Stdout, server.packetSenderErrorHandler)
defer server.Close()
wlog.LogConsumer = server.Sender.SendLogPacket
go func() { go func() {
for { for {
if server.checkDone() { if server.checkDone() {
@ -750,12 +758,6 @@ func RunServer() (int, error) {
server.cleanWriteFileContexts() server.cleanWriteFileContexts()
} }
}() }()
if debug {
packet.GlobalDebug = true
}
server.MainInput = packet.MakePacketParser(os.Stdin, nil)
server.Sender = packet.MakePacketSender(os.Stdout, server.packetSenderErrorHandler)
defer server.Close()
var err error var err error
initPacket, err := shexec.MakeServerInitPacket() initPacket, err := shexec.MakeServerInitPacket()
if err != nil { if err != nil {

View File

@ -1091,7 +1091,6 @@ func (cmd *ShExecType) DetachedWait(startPacket *packet.CmdStartPacketType) {
cmd.DetachedOutput.SendPacket(donePacket) cmd.DetachedOutput.SendPacket(donePacket)
<-ptyCopyDone <-ptyCopyDone
cmd.Close() cmd.Close()
return
} }
func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) { func RunCommandDetached(pk *packet.RunPacketType, sender *packet.PacketSender) (*ShExecType, *packet.CmdStartPacketType, error) {

View File

@ -0,0 +1,77 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// implements distributed logging for waveshell processes
package wlog
import (
"fmt"
"log"
)
// wlog will send logs back to the controlling wavesrv process
// note that these logs end up on your local machine where the main Wave Terminal process is running
// wlog has no ability to send logs to a cloud service or Command Line Inc servers
// this code is written a bit strange (with globals getting set from other packages)
// because we want no dependencies so any package (including base) can use wlog
// this should match base.ProcessType (set by main)
var GlobalSubsystem string
// if not set, Logf is a no-op. will be set by main to hook up to
// the main packet.PacketSender
var LogConsumer func(LogEntry)
type LogEntry struct {
LogLine string `json:"logline"`
ReqId string `json:"reqid"`
SubSystem string `json:"subsystem"`
}
func LogLogEntry(entry LogEntry) {
if LogConsumer == nil {
return
}
LogConsumer(entry)
}
// log with a request id (if related to an rpc request)
func LogfRpc(reqId string, format string, args ...interface{}) {
if LogConsumer == nil {
return
}
logEntry := LogEntry{
LogLine: fmt.Sprintf(format, args...),
ReqId: reqId,
SubSystem: GlobalSubsystem,
}
LogConsumer(logEntry)
}
func LogfSS(subsystem string, format string, args ...interface{}) {
if LogConsumer == nil {
return
}
logEntry := LogEntry{
LogLine: fmt.Sprintf(format, args...),
ReqId: "",
SubSystem: subsystem,
}
LogConsumer(logEntry)
}
func Logf(format string, args ...interface{}) {
LogfSS(GlobalSubsystem, format, args...)
}
func LogWithLogger(entry LogEntry) {
if entry.SubSystem == "" {
entry.SubSystem = "unknown"
}
if entry.ReqId != "" {
log.Printf("[%s] reqid=%s %s", entry.SubSystem, entry.ReqId, entry.LogLine)
} else {
log.Printf("[%s] %s", entry.SubSystem, entry.LogLine)
}
}

View File

@ -33,6 +33,7 @@ import (
"github.com/wavetermdev/waveterm/waveshell/pkg/base" "github.com/wavetermdev/waveterm/waveshell/pkg/base"
"github.com/wavetermdev/waveterm/waveshell/pkg/packet" "github.com/wavetermdev/waveterm/waveshell/pkg/packet"
"github.com/wavetermdev/waveterm/waveshell/pkg/server" "github.com/wavetermdev/waveterm/waveshell/pkg/server"
"github.com/wavetermdev/waveterm/waveshell/pkg/wlog"
"github.com/wavetermdev/waveterm/wavesrv/pkg/cmdrunner" "github.com/wavetermdev/waveterm/wavesrv/pkg/cmdrunner"
"github.com/wavetermdev/waveterm/wavesrv/pkg/pcloud" "github.com/wavetermdev/waveterm/wavesrv/pkg/pcloud"
"github.com/wavetermdev/waveterm/wavesrv/pkg/releasechecker" "github.com/wavetermdev/waveterm/wavesrv/pkg/releasechecker"
@ -804,6 +805,8 @@ func doShutdown(reason string) {
func main() { func main() {
scbase.BuildTime = BuildTime scbase.BuildTime = BuildTime
base.ProcessType = base.ProcessType_WaveSrv base.ProcessType = base.ProcessType_WaveSrv
wlog.GlobalSubsystem = base.ProcessType_WaveSrv
wlog.LogConsumer = wlog.LogWithLogger
if len(os.Args) >= 2 && os.Args[1] == "--test" { if len(os.Args) >= 2 && os.Args[1] == "--test" {
log.Printf("running test fn\n") log.Printf("running test fn\n")

View File

@ -556,6 +556,9 @@ func (msh *MShellProc) GetShellPref() string {
if msh.Remote.ShellPref == sstore.ShellTypePref_Detect { if msh.Remote.ShellPref == sstore.ShellTypePref_Detect {
return msh.InitPkShellType return msh.InitPkShellType
} }
if msh.Remote.ShellPref == "" {
return packet.ShellType_bash
}
return msh.Remote.ShellPref return msh.Remote.ShellPref
} }

View File

@ -2868,6 +2868,9 @@ func GetRemoteActiveShells(ctx context.Context, remoteId string) ([]string, erro
riArr := dbutil.SelectMapsGen[*RemoteInstance](tx, query, remoteId) riArr := dbutil.SelectMapsGen[*RemoteInstance](tx, query, remoteId)
shellTypeMap := make(map[string]bool) shellTypeMap := make(map[string]bool)
for _, ri := range riArr { for _, ri := range riArr {
if ri.ShellType == "" {
continue
}
shellTypeMap[ri.ShellType] = true shellTypeMap[ri.ShellType] = true
} }
return utilfn.GetMapKeys(shellTypeMap), nil return utilfn.GetMapKeys(shellTypeMap), nil