waveterm/pkg/cmdqueue/cmdqueue.go

269 lines
8.3 KiB
Go
Raw Normal View History

// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package cmdqueue
import (
"context"
"encoding/base64"
"fmt"
"log"
"runtime/debug"
"strings"
"time"
"github.com/wavetermdev/thenextwave/pkg/blockcontroller"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/waveobj"
"github.com/wavetermdev/thenextwave/pkg/wshutil"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
const DefaultTimeout = 2 * time.Second
const CmdQueueSize = 100
func RunCmd(ctx context.Context, cmd wshutil.BlockCommand, cmdCtx wshutil.CmdContextType) (rtnData wshutil.ResponseDataType, rtnErr error) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC: %v\n", r)
debug.PrintStack()
rtnData = nil
rtnErr = fmt.Errorf("panic: %v", r)
return
}
}()
blockId := cmdCtx.BlockId
bcCmd, ok := cmd.(wshutil.BlockControllerCommand)
if ok && bcCmd.GetBlockId() != "" {
blockId = bcCmd.GetBlockId()
}
if strings.HasPrefix(cmd.GetCommand(), "controller:") {
// send to block controller
bc := blockcontroller.GetBlockController(blockId)
if bc == nil {
return nil, fmt.Errorf("block controller not found for block %q", blockId)
}
bc.InputCh <- cmd
return nil, nil
}
switch typedCmd := cmd.(type) {
case *wshutil.BlockGetMetaCommand:
return handleGetMeta(ctx, typedCmd)
case *wshutil.ResolveIdsCommand:
return handleResolveIds(ctx, typedCmd)
case *wshutil.BlockSetMetaCommand:
return handleSetMeta(ctx, typedCmd, cmdCtx)
case *wshutil.BlockSetViewCommand:
return handleSetView(ctx, typedCmd, cmdCtx)
case *wshutil.BlockMessageCommand:
log.Printf("MESSAGE: %s | %q\n", blockId, typedCmd.Message)
return nil, nil
case *wshutil.BlockAppendFileCommand:
log.Printf("APPENDFILE: %s | %q | len:%d\n", blockId, typedCmd.FileName, len(typedCmd.Data))
err := handleAppendBlockFile(blockId, typedCmd.FileName, typedCmd.Data)
if err != nil {
return nil, fmt.Errorf("error appending blockfile: %w", err)
}
return nil, nil
case *wshutil.BlockAppendIJsonCommand:
log.Printf("APPENDIJSON: %s | %q\n", blockId, typedCmd.FileName)
err := handleAppendIJsonFile(blockId, typedCmd.FileName, typedCmd.Data, true)
if err != nil {
return nil, fmt.Errorf("error appending blockfile(ijson): %w", err)
}
return nil, nil
case *wshutil.CreateBlockCommand:
return handleCreateBlock(ctx, typedCmd, cmdCtx)
default:
return nil, fmt.Errorf("unknown command: %q", cmd.GetCommand())
}
}
func handleSetView(ctx context.Context, cmd *wshutil.BlockSetViewCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
log.Printf("SETVIEW: %s | %q\n", cmdCtx.BlockId, cmd.View)
block, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
}
block.View = cmd.View
err = wstore.DBUpdate(ctx, block)
if err != nil {
return nil, fmt.Errorf("error updating block: %w", err)
}
// send a waveobj:update event
updatedBlock, err := wstore.DBGet[*wstore.Block](ctx, cmdCtx.BlockId)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: "waveobj:update",
ORef: waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId).String(),
Data: wstore.WaveObjUpdate{
UpdateType: wstore.UpdateType_Update,
OType: wstore.OType_Block,
OID: cmdCtx.BlockId,
Obj: updatedBlock,
},
})
return nil, nil
}
func handleGetMeta(ctx context.Context, cmd *wshutil.BlockGetMetaCommand) (map[string]any, error) {
oref, err := waveobj.ParseORef(cmd.ORef)
if err != nil {
return nil, fmt.Errorf("error parsing oref: %w", err)
}
obj, err := wstore.DBGetORef(ctx, oref)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil, fmt.Errorf("object not found: %s", oref)
}
return waveobj.GetMeta(obj), nil
}
func resolveSimpleId(ctx context.Context, simpleId string) (*waveobj.ORef, error) {
if strings.Contains(simpleId, ":") {
rtn, err := waveobj.ParseORef(simpleId)
if err != nil {
return nil, fmt.Errorf("error parsing simple id: %w", err)
}
return &rtn, nil
}
return wstore.DBResolveEasyOID(ctx, simpleId)
}
func handleResolveIds(ctx context.Context, cmd *wshutil.ResolveIdsCommand) (map[string]any, error) {
rtn := make(map[string]any)
for _, simpleId := range cmd.Ids {
oref, err := resolveSimpleId(ctx, simpleId)
if err != nil || oref == nil {
continue
}
rtn[simpleId] = oref.String()
}
return rtn, nil
}
func handleSetMeta(ctx context.Context, cmd *wshutil.BlockSetMetaCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
var oref *waveobj.ORef
if cmd.ORef != "" {
orefVal, err := waveobj.ParseORef(cmd.ORef)
if err != nil {
return nil, fmt.Errorf("error parsing oref: %w", err)
}
oref = &orefVal
} else {
orefVal := waveobj.MakeORef(wstore.OType_Block, cmdCtx.BlockId)
oref = &orefVal
}
log.Printf("SETMETA: %s | %v\n", oref, cmd.Meta)
obj, err := wstore.DBGetORef(ctx, *oref)
if err != nil {
return nil, fmt.Errorf("error getting object: %w", err)
}
if obj == nil {
return nil, nil
}
meta := waveobj.GetMeta(obj)
if meta == nil {
meta = make(map[string]any)
}
for k, v := range cmd.Meta {
if v == nil {
delete(meta, k)
continue
}
meta[k] = v
}
waveobj.SetMeta(obj, meta)
err = wstore.DBUpdate(ctx, obj)
if err != nil {
return nil, fmt.Errorf("error updating block: %w", err)
}
// send a waveobj:update event
updatedBlock, err := wstore.DBGetORef(ctx, *oref)
if err != nil {
return nil, fmt.Errorf("error getting object (2): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: "waveobj:update",
ORef: oref.String(),
Data: wstore.WaveObjUpdate{
UpdateType: wstore.UpdateType_Update,
OType: updatedBlock.GetOType(),
OID: waveobj.GetOID(updatedBlock),
Obj: updatedBlock,
},
})
return nil, nil
}
func handleAppendBlockFile(blockId string, blockFile string, data []byte) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := filestore.WFS.AppendData(ctx, blockId, blockFile, data)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: "blockfile",
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString(data),
},
})
return nil
}
func handleAppendIJsonFile(blockId string, blockFile string, cmd map[string]any, tryCreate bool) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
if blockFile == blockcontroller.BlockFile_Html && tryCreate {
err := filestore.WFS.MakeFile(ctx, blockId, blockFile, nil, filestore.FileOptsType{MaxSize: blockcontroller.DefaultHtmlMaxFileSize, IJson: true})
if err != nil && err != filestore.ErrAlreadyExists {
return fmt.Errorf("error creating blockfile[html]: %w", err)
}
}
err := filestore.WFS.AppendIJson(ctx, blockId, blockFile, cmd)
if err != nil {
return fmt.Errorf("error appending to blockfile(ijson): %w", err)
}
eventbus.SendEvent(eventbus.WSEventType{
EventType: "blockfile",
ORef: waveobj.MakeORef(wstore.OType_Block, blockId).String(),
Data: &eventbus.WSFileEventData{
ZoneId: blockId,
FileName: blockFile,
FileOp: eventbus.FileOp_Append,
Data64: base64.StdEncoding.EncodeToString([]byte("{}")),
},
})
return nil
}
func handleCreateBlock(ctx context.Context, cmd *wshutil.CreateBlockCommand, cmdCtx wshutil.CmdContextType) (map[string]any, error) {
tabId := cmdCtx.TabId
if cmd.TabId != "" {
tabId = cmd.TabId
}
log.Printf("handleCreateBlock %s %v\n", tabId, cmd.BlockDef)
blockData, err := wstore.CreateBlock(ctx, tabId, cmd.BlockDef, cmd.RtOpts)
log.Printf("blockData: %v err:%v\n", blockData, err)
if err != nil {
return nil, fmt.Errorf("error creating block: %w", err)
}
if blockData.Controller != "" {
err = blockcontroller.StartBlockController(ctx, cmd.TabId, blockData.OID, RunCmd)
if err != nil {
return nil, fmt.Errorf("error starting block controller: %w", err)
}
}
return map[string]any{"blockId": blockData.OID}, nil
}