mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-22 21:42:49 +01:00
269 lines
8.3 KiB
Go
269 lines
8.3 KiB
Go
|
// Copyright 2024, Command Line Inc.
|
||
|
// SPDX-License-Identifier: Apache-2.0
|
||
|
|
||
|
package cmdqueue
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/base64"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"runtime/debug"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
|
||
|
"github.com/wavetermdev/thenextwave/pkg/eventbus"
|
||
|
"github.com/wavetermdev/thenextwave/pkg/filestore"
|
||
|
"github.com/wavetermdev/thenextwave/pkg/waveobj"
|
||
|
"github.com/wavetermdev/thenextwave/pkg/wshutil"
|
||
|
"github.com/wavetermdev/thenextwave/pkg/wstore"
|
||
|
)
|
||
|
|
||
|
const DefaultTimeout = 2 * time.Second
|
||
|
const CmdQueueSize = 100
|
||
|
|
||
|
func RunCmd(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (rtnData wshutil.ResponseDataType, rtnErr error) {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
log.Printf("PANIC: %v\n", r)
|
||
|
debug.PrintStack()
|
||
|
rtnData = nil
|
||
|
rtnErr = fmt.Errorf("panic: %v", r)
|
||
|
return
|
||
|
}
|
||
|
}()
|
||
|
blockId := cmdCtx.BlockId
|
||
|
bcCmd, ok := cmd.(wshutil.BlockControllerCommand)
|
||
|
if ok && bcCmd.GetBlockId() != "" {
|
||
|
blockId = bcCmd.GetBlockId()
|
||
|
}
|
||
|
if strings.HasPrefix(cmd.GetCommand(), "controller:") {
|
||
|
// send to block controller
|
||
|
bc := blockcontroller.GetBlockController(blockId)
|
||
|
if bc == nil {
|
||
|
return nil, fmt.Errorf("block controller not found for block %q", blockId)
|
||
|
}
|
||
|
bc.InputCh <- cmd
|
||
|
return nil, nil
|
||
|
}
|
||
|
switch typedCmd := cmd.(type) {
|
||
|
case *wshutil.BlockGetMetaCommand:
|
||
|
return handleGetMeta(ctx, typedCmd)
|
||
|
case *wshutil.ResolveIdsCommand:
|
||
|
return handleResolveIds(ctx, typedCmd)
|
||
|
case *wshutil.BlockSetMetaCommand:
|
||
|
return handleSetMeta(ctx, typedCmd, cmdCtx)
|
||
|
case *wshutil.BlockSetViewCommand:
|
||
|
return handleSetView(ctx, typedCmd, cmdCtx)
|
||
|
case *wshutil.BlockMessageCommand:
|
||
|
log.Printf("MESSAGE: %s | %q\n", blockId, typedCmd.Message)
|
||
|
return nil, nil
|
||
|
case *wshutil.BlockAppendFileCommand:
|
||
|
log.Printf("APPENDFILE: %s | %q | len:%d\n", blockId, typedCmd.FileName, len(typedCmd.Data))
|
||
|
err := handleAppendBlockFile(blockId, typedCmd.FileName, typedCmd.Data)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error appending blockfile: %w", err)
|
||
|
}
|
||
|
return nil, nil
|
||
|
case *wshutil.BlockAppendIJsonCommand:
|
||
|
log.Printf("APPENDIJSON: %s | %q\n", blockId, typedCmd.FileName)
|
||
|
err := handleAppendIJsonFile(blockId, typedCmd.FileName, typedCmd.Data, true)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error appending blockfile(ijson): %w", err)
|
||
|
}
|
||
|
return nil, nil
|
||
|
case *wshutil.CreateBlockCommand:
|
||
|
return handleCreateBlock(ctx, typedCmd, cmdCtx)
|
||
|
default:
|
||
|
return nil, fmt.Errorf("unknown command: %q", cmd.GetCommand())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func handleSetView(ctx context.Context, cmd *wshutil.BlockSetViewCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
|
||
|
log.Printf("SETVIEW: %s | %q\n", cmdCtx.BlockId, cmd.View)
|
||
|
block, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error getting block: %w", err)
|
||
|
}
|
||
|
block.View = cmd.View
|
||
|
err = wstore.DBUpdate(ctx, block)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error updating block: %w", err)
|
||
|
}
|
||
|
// send a waveobj:update event
|
||
|
updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error getting block: %w", err)
|
||
|
}
|
||
|
eventbus.SendEvent(eventbus.WSEventType{
|
||
|
EventType: "waveobj:update",
|
||
|
ORef: waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId).String(),
|
||
|
Data: wstore.WaveObjUpdate{
|
||
|
UpdateType: wstore.UpdateType_Update,
|
||
|
OType: wstore.OType_Block,
|
||
|
OID: cmdCtx.BlockId,
|
||
|
Obj: updatedBlock,
|
||
|
},
|
||
|
})
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
func handleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) {
|
||
|
oref, err := waveobj.ParseORef(cmd.ORef)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error parsing oref: %w", err)
|
||
|
}
|
||
|
obj, err := wstore.DBGetORef(ctx, oref)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error getting object: %w", err)
|
||
|
}
|
||
|
if obj == nil {
|
||
|
return nil, fmt.Errorf("object not found: %s", oref)
|
||
|
}
|
||
|
return waveobj.GetMeta(obj), nil
|
||
|
}
|
||
|
|
||
|
func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) {
|
||
|
if strings.Contains(simpleId, ":") {
|
||
|
rtn, err := waveobj.ParseORef(simpleId)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error parsing simple id: %w", err)
|
||
|
}
|
||
|
return &rtn, nil
|
||
|
}
|
||
|
return wstore.DBResolveEasyOID(ctx, simpleId)
|
||
|
}
|
||
|
|
||
|
func handleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) {
|
||
|
rtn := make(map[string]any)
|
||
|
for _, simpleId := range cmd.Ids {
|
||
|
oref, err := resolveSimpleId(ctx, simpleId)
|
||
|
if err != nil || oref == nil {
|
||
|
continue
|
||
|
}
|
||
|
rtn[simpleId] = oref.String()
|
||
|
}
|
||
|
return rtn, nil
|
||
|
}
|
||
|
|
||
|
func handleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
|
||
|
var oref *waveobj.ORef
|
||
|
if cmd.ORef != "" {
|
||
|
orefVal, err := waveobj.ParseORef(cmd.ORef)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error parsing oref: %w", err)
|
||
|
}
|
||
|
oref = &orefVal
|
||
|
} else {
|
||
|
orefVal := waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId)
|
||
|
oref = &orefVal
|
||
|
}
|
||
|
log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta)
|
||
|
obj, err := wstore.DBGetORef(ctx, *oref)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error getting object: %w", err)
|
||
|
}
|
||
|
if obj == nil {
|
||
|
return nil, nil
|
||
|
}
|
||
|
meta := waveobj.GetMeta(obj)
|
||
|
if meta == nil {
|
||
|
meta = make(map[string]any)
|
||
|
}
|
||
|
for k, v := range cmd.Meta {
|
||
|
if v == nil {
|
||
|
delete(meta, k)
|
||
|
continue
|
||
|
}
|
||
|
meta[k] = v
|
||
|
}
|
||
|
waveobj.SetMeta(obj, meta)
|
||
|
err = wstore.DBUpdate(ctx, obj)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error updating block: %w", err)
|
||
|
}
|
||
|
// send a waveobj:update event
|
||
|
updatedBlock, err := wstore.DBGetORef(ctx, *oref)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error getting object (2): %w", err)
|
||
|
}
|
||
|
eventbus.SendEvent(eventbus.WSEventType{
|
||
|
EventType: "waveobj:update",
|
||
|
ORef: oref.String(),
|
||
|
Data: wstore.WaveObjUpdate{
|
||
|
UpdateType: wstore.UpdateType_Update,
|
||
|
OType: updatedBlock.GetOType(),
|
||
|
OID: waveobj.GetOID(updatedBlock),
|
||
|
Obj: updatedBlock,
|
||
|
},
|
||
|
})
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
func handleAppendBlockFile(blockId string, blockFile string, data []byte) error {
|
||
|
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
|
||
|
defer cancelFn()
|
||
|
err := filestore.WFS.AppendData(ctx, blockId, blockFile, data)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error appending to blockfile: %w", err)
|
||
|
}
|
||
|
eventbus.SendEvent(eventbus.WSEventType{
|
||
|
EventType: "blockfile",
|
||
|
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
|
||
|
Data: &eventbus.WSFileEventData{
|
||
|
ZoneId: blockId,
|
||
|
FileName: blockFile,
|
||
|
FileOp: eventbus.FileOp_Append,
|
||
|
Data64: base64.StdEncoding.EncodeToString(data),
|
||
|
},
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func handleAppendIJsonFile(blockId string, blockFile string, cmd map[string]any, tryCreate bool) error {
|
||
|
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
|
||
|
defer cancelFn()
|
||
|
if blockFile == blockcontroller.BlockFile_Html && tryCreate {
|
||
|
err := filestore.WFS.MakeFile(ctx, blockId, blockFile, nil, filestore.FileOptsType{MaxSize: blockcontroller.DefaultHtmlMaxFileSize, IJson: true})
|
||
|
if err != nil && err != filestore.ErrAlreadyExists {
|
||
|
return fmt.Errorf("error creating blockfile[html]: %w", err)
|
||
|
}
|
||
|
}
|
||
|
err := filestore.WFS.AppendIJson(ctx, blockId, blockFile, cmd)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error appending to blockfile(ijson): %w", err)
|
||
|
}
|
||
|
eventbus.SendEvent(eventbus.WSEventType{
|
||
|
EventType: "blockfile",
|
||
|
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
|
||
|
Data: &eventbus.WSFileEventData{
|
||
|
ZoneId: blockId,
|
||
|
FileName: blockFile,
|
||
|
FileOp: eventbus.FileOp_Append,
|
||
|
Data64: base64.StdEncoding.EncodeToString([]byte("{}")),
|
||
|
},
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func handleCreateBlock(ctx context.Context, cmd *wshutil.CreateBlockCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
|
||
|
tabId := cmdCtx.TabId
|
||
|
if cmd.TabId != "" {
|
||
|
tabId = cmd.TabId
|
||
|
}
|
||
|
log.Printf("handleCreateBlock %s %v\n", tabId, cmd.BlockDef)
|
||
|
blockData, err := wstore.CreateBlock(ctx, tabId, cmd.BlockDef, cmd.RtOpts)
|
||
|
log.Printf("blockData: %v err:%v\n", blockData, err)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error creating block: %w", err)
|
||
|
}
|
||
|
if blockData.Controller != "" {
|
||
|
err = blockcontroller.StartBlockController(ctx, cmd.TabId, blockData.OID, RunCmd)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error starting block controller: %w", err)
|
||
|
}
|
||
|
}
|
||
|
return map[string]any{"blockId": blockData.OID}, nil
|
||
|
}
|