waveterm/pkg/blockcontroller/blockcontroller.go
Evan Simkowitz 936d4bfb30
Migrate websocket eventbus messages to wps (#367)
This migrates all remaining eventbus events sent over the websocket to
use the wps interface. WPS is more flexible for registering events and
callbacks and provides support for more reliable unsubscribes and
resubscribes.
2024-09-11 18:03:55 -07:00

636 lines
18 KiB
Go

// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockcontroller
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"io/fs"
"log"
"sync"
"time"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/remote"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
"github.com/wavetermdev/waveterm/pkg/shellexec"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshutil"
"github.com/wavetermdev/waveterm/pkg/wstore"
)
const (
BlockController_Shell = "shell"
BlockController_Cmd = "cmd"
)
const (
BlockFile_Term = "term" // used for main pty output
BlockFile_Html = "html" // used for alt html layout
)
const (
Status_Running = "running"
Status_Done = "done"
)
const (
DefaultTermMaxFileSize = 256 * 1024
DefaultHtmlMaxFileSize = 256 * 1024
)
const DefaultTimeout = 2 * time.Second
var globalLock = &sync.Mutex{}
var blockControllerMap = make(map[string]*BlockController)
type BlockInputUnion struct {
InputData []byte `json:"inputdata,omitempty"`
SigName string `json:"signame,omitempty"`
TermSize *waveobj.TermSize `json:"termsize,omitempty"`
}
type BlockController struct {
Lock *sync.Mutex
ControllerType string
TabId string
BlockId string
BlockDef *waveobj.BlockDef
CreatedHtmlFile bool
ShellProc *shellexec.ShellProc
ShellInputCh chan *BlockInputUnion
ShellProcStatus string
}
type BlockControllerRuntimeStatus struct {
BlockId string `json:"blockid"`
ShellProcStatus string `json:"shellprocstatus,omitempty"`
ShellProcConnName string `json:"shellprocconnname,omitempty"`
}
func (bc *BlockController) WithLock(f func()) {
bc.Lock.Lock()
defer bc.Lock.Unlock()
f()
}
func (bc *BlockController) GetRuntimeStatus() *BlockControllerRuntimeStatus {
var rtn BlockControllerRuntimeStatus
bc.WithLock(func() {
rtn.BlockId = bc.BlockId
rtn.ShellProcStatus = bc.ShellProcStatus
if bc.ShellProc != nil {
rtn.ShellProcConnName = bc.ShellProc.ConnName
}
})
return &rtn
}
func (bc *BlockController) getShellProc() *shellexec.ShellProc {
bc.Lock.Lock()
defer bc.Lock.Unlock()
return bc.ShellProc
}
type RunShellOpts struct {
TermSize waveobj.TermSize `json:"termsize,omitempty"`
}
func (bc *BlockController) UpdateControllerAndSendUpdate(updateFn func() bool) {
var sendUpdate bool
bc.WithLock(func() {
sendUpdate = updateFn()
})
if sendUpdate {
rtStatus := bc.GetRuntimeStatus()
log.Printf("sending blockcontroller update %#v\n", rtStatus)
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_ControllerStatus,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, bc.TabId).String(),
waveobj.MakeORef(waveobj.OType_Block, bc.BlockId).String(),
},
Data: rtStatus,
})
}
}
func HandleTruncateBlockFile(blockId string, blockFile string) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := filestore.WFS.WriteFile(ctx, blockId, blockFile, nil)
if err == fs.ErrNotExist {
return nil
}
if err != nil {
return fmt.Errorf("error truncating blockfile: %w", err)
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, blockId).String()},
Data: &wps.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: wps.FileOp_Truncate,
},
})
return nil
}
func HandleAppendBlockFile(blockId string, blockFile string, data []byte) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := filestore.WFS.AppendData(ctx, blockId, blockFile, data)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Block, blockId).String(),
},
Data: &wps.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: wps.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(data),
},
})
return nil
}
func (bc *BlockController) resetTerminalState() {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
var shouldTruncate bool
blockData, getBlockDataErr := wstore.DBMustGet[*waveobj.Block](ctx, bc.BlockId)
if getBlockDataErr == nil {
shouldTruncate = blockData.Meta.GetBool(waveobj.MetaKey_CmdClearOnRestart, false)
}
if shouldTruncate {
err := HandleTruncateBlockFile(bc.BlockId, BlockFile_Term)
if err != nil {
log.Printf("error truncating term blockfile: %v\n", err)
}
return
}
// controller type = "shell"
var buf bytes.Buffer
// buf.WriteString("\x1b[?1049l") // disable alternative buffer
buf.WriteString("\x1b[0m") // reset attributes
buf.WriteString("\x1b[?25h") // show cursor
buf.WriteString("\x1b[?1000l") // disable mouse tracking
buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n")
err := filestore.WFS.AppendData(ctx, bc.BlockId, BlockFile_Term, buf.Bytes())
if err != nil {
log.Printf("error appending to blockfile (terminal reset): %v\n", err)
}
}
func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj.MetaMapType) error {
// create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
err := filestore.WFS.MakeFile(ctx, bc.BlockId, BlockFile_Term, nil, filestore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true})
if err != nil && err != fs.ErrExist {
err = fs.ErrExist
return fmt.Errorf("error creating blockfile: %w", err)
}
if err == fs.ErrExist {
// reset the terminal state
bc.resetTerminalState()
}
err = nil
bcInitStatus := bc.GetRuntimeStatus()
if bcInitStatus.ShellProcStatus == Status_Running {
return nil
}
// TODO better sync here (don't let two starts happen at the same times)
remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "")
var cmdStr string
cmdOpts := shellexec.CommandOptsType{
Env: make(map[string]string),
}
if bc.ControllerType == BlockController_Shell {
cmdOpts.Interactive = true
cmdOpts.Login = true
cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "")
if cmdOpts.Cwd != "" {
cmdOpts.Cwd = wavebase.ExpandHomeDir(cmdOpts.Cwd)
}
} else if bc.ControllerType == BlockController_Cmd {
cmdStr = blockMeta.GetString(waveobj.MetaKey_Cmd, "")
if cmdStr == "" {
return fmt.Errorf("missing cmd in block meta")
}
cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "")
if cmdOpts.Cwd != "" {
cmdOpts.Cwd = wavebase.ExpandHomeDir(cmdOpts.Cwd)
}
cmdOpts.Interactive = blockMeta.GetBool(waveobj.MetaKey_CmdInteractive, false)
cmdOpts.Login = blockMeta.GetBool(waveobj.MetaKey_CmdLogin, false)
cmdEnv := blockMeta.GetMap(waveobj.MetaKey_CmdEnv)
for k, v := range cmdEnv {
if v == nil {
continue
}
if _, ok := v.(string); ok {
cmdOpts.Env[k] = v.(string)
}
if _, ok := v.(float64); ok {
cmdOpts.Env[k] = fmt.Sprintf("%v", v)
}
}
} else {
return fmt.Errorf("unknown controller type %q", bc.ControllerType)
}
var shellProc *shellexec.ShellProc
if remoteName != "" {
credentialCtx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Second)
defer cancelFunc()
opts, err := remote.ParseOpts(remoteName)
if err != nil {
return err
}
conn := conncontroller.GetConn(credentialCtx, opts, false)
connStatus := conn.DeriveConnStatus()
if connStatus.Status != conncontroller.Status_Connected {
return fmt.Errorf("not connected, cannot start shellproc")
}
if !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) {
jwtStr, err := wshutil.MakeClientJWTToken(wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()}, conn.GetDomainSocketName())
if err != nil {
return fmt.Errorf("error making jwt token: %w", err)
}
cmdOpts.Env[wshutil.WaveJwtTokenVarName] = jwtStr
}
shellProc, err = shellexec.StartRemoteShellProc(rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
return err
}
} else {
if !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false) {
jwtStr, err := wshutil.MakeClientJWTToken(wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId}, wavebase.GetDomainSocketName())
if err != nil {
return fmt.Errorf("error making jwt token: %w", err)
}
cmdOpts.Env[wshutil.WaveJwtTokenVarName] = jwtStr
}
shellProc, err = shellexec.StartShellProc(rc.TermSize, cmdStr, cmdOpts)
if err != nil {
return err
}
}
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProc = shellProc
bc.ShellProcStatus = Status_Running
return true
})
shellInputCh := make(chan *BlockInputUnion, 32)
bc.ShellInputCh = shellInputCh
// make esc sequence wshclient wshProxy
// we don't need to authenticate this wshProxy since it is coming direct
wshProxy := wshutil.MakeRpcProxy()
wshProxy.SetRpcContext(&wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId})
wshutil.DefaultRouter.RegisterRoute(wshutil.MakeControllerRouteId(bc.BlockId), wshProxy)
ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, bc.ShellProc.Cmd, wshProxy.FromRemoteCh)
go func() {
// handles regular output from the pty (goes to the blockfile and xterm)
defer func() {
log.Printf("[shellproc] pty-read loop done\n")
bc.ShellProc.Close()
bc.WithLock(func() {
// so no other events are sent
bc.ShellInputCh = nil
})
// to stop the inputCh loop
time.Sleep(100 * time.Millisecond)
close(shellInputCh) // don't use bc.ShellInputCh (it's nil)
}()
buf := make([]byte, 4096)
for {
nr, err := ptyBuffer.Read(buf)
if nr > 0 {
err := HandleAppendBlockFile(bc.BlockId, BlockFile_Term, buf[:nr])
if err != nil {
log.Printf("error appending to blockfile: %v\n", err)
}
}
if err == io.EOF {
break
}
if err != nil {
log.Printf("error reading from shell: %v\n", err)
break
}
}
}()
go func() {
defer func() {
log.Printf("[shellproc] shellInputCh loop done\n")
}()
// handles input from the shellInputCh, sent to pty
// use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated)
for ic := range shellInputCh {
if len(ic.InputData) > 0 {
bc.ShellProc.Cmd.Write(ic.InputData)
}
if ic.TermSize != nil {
log.Printf("SETTERMSIZE: %dx%d\n", ic.TermSize.Rows, ic.TermSize.Cols)
err = setTermSize(ctx, bc.BlockId, *ic.TermSize)
if err != nil {
log.Printf("error setting pty size: %v\n", err)
}
err = bc.ShellProc.Cmd.SetSize(ic.TermSize.Rows, ic.TermSize.Cols)
if err != nil {
log.Printf("error setting pty size: %v\n", err)
}
}
}
}()
go func() {
// handles outputCh -> shellInputCh
for msg := range wshProxy.ToRemoteCh {
encodedMsg := wshutil.EncodeWaveOSCBytes(wshutil.WaveServerOSC, msg)
shellInputCh <- &BlockInputUnion{InputData: encodedMsg}
}
}()
go func() {
// wait for the shell to finish
defer func() {
wshutil.DefaultRouter.UnregisterRoute(wshutil.MakeControllerRouteId(bc.BlockId))
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProcStatus = Status_Done
return true
})
log.Printf("[shellproc] shell process wait loop done\n")
}()
waitErr := shellProc.Cmd.Wait()
exitCode := shellexec.ExitCodeFromWaitErr(waitErr)
termMsg := fmt.Sprintf("\r\nprocess finished with exit code = %d\r\n\r\n", exitCode)
//HandleAppendBlockFile(bc.BlockId, BlockFile_Term, []byte("\r\n"))
HandleAppendBlockFile(bc.BlockId, BlockFile_Term, []byte(termMsg))
shellProc.SetWaitErrorAndSignalDone(waitErr)
}()
return nil
}
func getBoolFromMeta(meta map[string]any, key string, def bool) bool {
ival, found := meta[key]
if !found || ival == nil {
return def
}
if val, ok := ival.(bool); ok {
return val
}
return def
}
func getTermSize(bdata *waveobj.Block) waveobj.TermSize {
if bdata.RuntimeOpts != nil {
return bdata.RuntimeOpts.TermSize
} else {
return waveobj.TermSize{
Rows: 25,
Cols: 80,
}
}
}
func setTermSize(ctx context.Context, blockId string, termSize waveobj.TermSize) error {
ctx = waveobj.ContextWithUpdates(ctx)
bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), blockId)
if err != nil {
return fmt.Errorf("error getting block data: %v", err)
}
if bdata.RuntimeOpts == nil {
return fmt.Errorf("error from nil RuntimeOpts: %v", err)
}
bdata.RuntimeOpts.TermSize = termSize
updates := waveobj.ContextGetUpdatesRtn(ctx)
wps.Broker.SendUpdateEvents(updates)
return nil
}
func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts) {
controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
log.Printf("unknown controller %q\n", controllerName)
return
}
if getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdClearOnStart, false) {
err := HandleTruncateBlockFile(bc.BlockId, BlockFile_Term)
if err != nil {
log.Printf("error truncating term blockfile: %v\n", err)
}
}
runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true)
if runOnStart {
go func() {
var termSize waveobj.TermSize
if rtOpts != nil {
termSize = rtOpts.TermSize
} else {
termSize = getTermSize(bdata)
}
err := bc.DoRunShellCommand(&RunShellOpts{TermSize: termSize}, bdata.Meta)
if err != nil {
log.Printf("error running shell: %v\n", err)
}
}()
}
}
func (bc *BlockController) SendInput(inputUnion *BlockInputUnion) error {
var shellInputCh chan *BlockInputUnion
bc.WithLock(func() {
shellInputCh = bc.ShellInputCh
})
if shellInputCh == nil {
return fmt.Errorf("no shell input chan")
}
shellInputCh <- inputUnion
return nil
}
func CheckConnStatus(blockId string) error {
bdata, err := wstore.DBMustGet[*waveobj.Block](context.Background(), blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
connName := bdata.Meta.GetString(waveobj.MetaKey_Connection, "")
if connName == "" {
return nil
}
opts, err := remote.ParseOpts(connName)
if err != nil {
return fmt.Errorf("error parsing connection name: %w", err)
}
conn := conncontroller.GetConn(context.Background(), opts, false)
connStatus := conn.DeriveConnStatus()
if connStatus.Status != conncontroller.Status_Connected {
return fmt.Errorf("not connected: %s", connStatus.Status)
}
return nil
}
func (bc *BlockController) StopShellProc(shouldWait bool) {
bc.Lock.Lock()
defer bc.Lock.Unlock()
if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done {
return
}
bc.ShellProc.Close()
if shouldWait {
doneCh := bc.ShellProc.DoneCh
<-doneCh
}
}
func getOrCreateBlockController(tabId string, blockId string, controllerName string) *BlockController {
var createdController bool
var bc *BlockController
defer func() {
if !createdController || bc == nil {
return
}
bc.UpdateControllerAndSendUpdate(func() bool {
return true
})
}()
globalLock.Lock()
defer globalLock.Unlock()
bc = blockControllerMap[blockId]
if bc == nil {
bc = &BlockController{
Lock: &sync.Mutex{},
ControllerType: controllerName,
TabId: tabId,
BlockId: blockId,
ShellProcStatus: Status_Done,
}
blockControllerMap[blockId] = bc
createdController = true
}
return bc
}
func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error {
if tabId == "" || blockId == "" {
return fmt.Errorf("invalid tabId or blockId passed to ResyncController")
}
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "")
curBc := GetBlockController(blockId)
if controllerName == "" {
if curBc != nil {
StopBlockController(blockId)
}
return nil
}
// check if conn is different, if so, stop the current controller
if curBc != nil {
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName {
StopBlockController(blockId)
}
}
// now if there is a conn, ensure it is connected
if connName != "" {
err = CheckConnStatus(blockId)
if err != nil {
return fmt.Errorf("cannot start shellproc: %w", err)
}
}
if curBc == nil {
return startBlockController(ctx, tabId, blockId, rtOpts)
}
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus != Status_Running {
return startBlockController(ctx, tabId, blockId, rtOpts)
}
return nil
}
func startBlockController(ctx context.Context, tabId string, blockId string, rtOpts *waveobj.RuntimeOpts) error {
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
controllerName := blockData.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName == "" {
// nothing to start
return nil
}
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
return fmt.Errorf("unknown controller %q", controllerName)
}
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
log.Printf("start blockcontroller %s %q (%q)\n", blockId, controllerName, connName)
err = CheckConnStatus(blockId)
if err != nil {
return fmt.Errorf("cannot start shellproc: %w", err)
}
bc := getOrCreateBlockController(tabId, blockId, controllerName)
bcStatus := bc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Done {
go bc.run(blockData, blockData.Meta, rtOpts)
}
return nil
}
func StopBlockController(blockId string) {
bc := GetBlockController(blockId)
if bc == nil {
return
}
if bc.getShellProc() != nil {
bc.ShellProc.Close()
<-bc.ShellProc.DoneCh
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProcStatus = Status_Done
return true
})
}
}
func getControllerList() []*BlockController {
globalLock.Lock()
defer globalLock.Unlock()
var rtn []*BlockController
for _, bc := range blockControllerMap {
rtn = append(rtn, bc)
}
return rtn
}
func StopAllBlockControllers() {
clist := getControllerList()
for _, bc := range clist {
if bc.ShellProcStatus == Status_Running {
go StopBlockController(bc.BlockId)
}
}
}
func GetBlockController(blockId string) *BlockController {
globalLock.Lock()
defer globalLock.Unlock()
return blockControllerMap[blockId]
}