Merge pull request #11 from wavetermdev/sawka/use-blockstore

use blockstore, save/restore terminal state output (tab switching or app restart)
This commit is contained in:
Mike Sawka 2024-05-29 00:30:52 -07:00 committed by GitHub
commit 2472deb379
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 281 additions and 20 deletions

View File

@ -40,14 +40,21 @@ function getThemeFromCSSVars(el: Element): ITheme {
return theme;
}
type InitialLoadDataType = {
loaded: boolean;
heldData: Uint8Array[];
};
const TerminalView = ({ blockId }: { blockId: string }) => {
const connectElemRef = React.useRef<HTMLDivElement>(null);
const termRef = React.useRef<Terminal>(null);
const initialLoadRef = React.useRef<InitialLoadDataType>({ loaded: false, heldData: [] });
React.useEffect(() => {
if (!connectElemRef.current) {
return;
}
console.log("terminal created");
const term = new Terminal({
theme: getThemeFromCSSVars(connectElemRef.current),
fontSize: 12,
@ -89,7 +96,11 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
blockSubject.subscribe((data) => {
// base64 decode
const decodedData = base64ToArray(data.ptydata);
term.write(decodedData);
if (initialLoadRef.current.loaded) {
term.write(decodedData);
} else {
initialLoadRef.current.heldData.push(decodedData);
}
});
return () => {
@ -99,6 +110,39 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
};
}, [connectElemRef.current]);
React.useEffect(() => {
if (!termRef.current) {
return;
}
// load data from blockfile
const startTs = Date.now();
let loadedBytes = 0;
const localTerm = termRef.current; // avoids devmode double effect running issue (terminal gets created twice)
const usp = new URLSearchParams();
usp.set("blockid", blockId);
usp.set("name", "main");
fetch("/wave/blockfile?" + usp.toString())
.then((resp) => {
if (resp.ok) {
return resp.arrayBuffer();
}
console.log("error loading blockfile", resp.status, resp.statusText);
})
.then((data: ArrayBuffer) => {
const uint8View = new Uint8Array(data);
localTerm.write(uint8View);
loadedBytes = uint8View.byteLength;
})
.finally(() => {
initialLoadRef.current.heldData.forEach((data) => {
localTerm.write(data);
});
initialLoadRef.current.loaded = true;
initialLoadRef.current.heldData = [];
console.log(`terminal loaded blockfile ${loadedBytes} bytes, ${Date.now() - startTs}ms`);
});
}, [termRef.current]);
return (
<div className="view-term">
<div key="conntectElem" className="term-connectelem" ref={connectElemRef}></div>

90
main.go
View File

@ -8,13 +8,20 @@ package main
import (
"context"
"embed"
"encoding/base64"
"encoding/json"
"fmt"
"io/fs"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/service/blockservice"
@ -77,19 +84,80 @@ func createWindow(windowData *wstore.Window, app *application.App) {
eventbus.UnregisterWailsWindow(window.ID())
})
window.Show()
go func() {
time.Sleep(100 * time.Millisecond)
objectService := &objectservice.ObjectService{}
uiContext := wstore.UIContext{
WindowId: windowData.OID,
ActiveTabId: windowData.ActiveTabId,
}
_, err := objectService.SetActiveTab(uiContext, windowData.ActiveTabId)
if err != nil {
log.Printf("error setting active tab for new window: %v\n", err)
}
}()
}
type waveAssetHandler struct {
AssetHandler http.Handler
}
func serveBlockFile(w http.ResponseWriter, r *http.Request) {
blockId := r.URL.Query().Get("blockid")
name := r.URL.Query().Get("name")
if _, err := uuid.Parse(blockId); err != nil {
http.Error(w, fmt.Sprintf("invalid blockid: %v", err), http.StatusBadRequest)
return
}
if name == "" {
http.Error(w, "name is required", http.StatusBadRequest)
return
}
file, err := blockstore.GBS.Stat(r.Context(), blockId, name)
if err == fs.ErrNotExist {
http.NotFound(w, r)
return
}
if err != nil {
http.Error(w, fmt.Sprintf("error getting file info: %v", err), http.StatusInternalServerError)
return
}
jsonFileBArr, err := json.Marshal(file)
if err != nil {
http.Error(w, fmt.Sprintf("error serializing file info: %v", err), http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size))
w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr))
w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat))
for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize {
_, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize)
if err != nil {
if offset == 0 {
http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError)
} else {
// nothing to do, the headers have already been sent
log.Printf("error reading file %s/%s @ %d: %v\n", blockId, name, offset, err)
}
return
}
w.Write(data)
}
}
func serveWaveUrls(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache")
if r.URL.Path == "/wave/stream-file" {
fileName := r.URL.Query().Get("path")
fileName = wavebase.ExpandHomeDir(fileName)
http.ServeFile(w, r, fileName)
return
}
if r.URL.Path == "/wave/blockfile" {
serveBlockFile(w, r)
return
}
http.NotFound(w, r)
}
@ -101,6 +169,27 @@ func (wah waveAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
wah.AssetHandler.ServeHTTP(w, r)
}
func doShutdown(reason string) {
log.Printf("shutting down: %s\n", reason)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
// TODO deal with flush in progress
blockstore.GBS.FlushCache(ctx)
time.Sleep(200 * time.Millisecond)
os.Exit(0)
}
func installShutdownSignalHandlers() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
go func() {
for sig := range sigCh {
doShutdown(fmt.Sprintf("got signal %v", sig))
break
}
}()
}
func main() {
err := wavebase.EnsureWaveHomeDir()
if err != nil {
@ -129,6 +218,7 @@ func main() {
log.Printf("error ensuring initial data: %v\n", err)
return
}
installShutdownSignalHandlers()
app := application.New(application.Options{
Name: "NextWave",

View File

@ -4,6 +4,7 @@
package blockcontroller
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
@ -15,6 +16,7 @@ import (
"github.com/creack/pty"
"github.com/wailsapp/wails/v3/pkg/application"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/wstore"
@ -86,7 +88,54 @@ func (bc *BlockController) Close() {
}
}
const DefaultTermMaxFileSize = 256 * 1024
func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", data)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
eventbus.SendEvent(application.WailsEvent{
Name: "block:ptydata",
Data: map[string]any{
"blockid": bc.BlockId,
"blockfile": "main",
"ptydata": base64.StdEncoding.EncodeToString(data),
"seqnum": seqNum,
},
})
return nil
}
func (bc *BlockController) resetTerminalState() {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
var buf bytes.Buffer
// buf.WriteString("\x1b[?1049l") // disable alternative buffer
buf.WriteString("\x1b[0m") // reset attributes
buf.WriteString("\x1b[?25h") // show cursor
buf.WriteString("\x1b[?1000l") // disable mouse tracking
buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n")
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", buf.Bytes())
if err != nil {
log.Printf("error appending to blockfile (terminal reset): %v\n", err)
}
}
func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error {
// create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
err := blockstore.GBS.MakeFile(ctx, bc.BlockId, "main", nil, blockstore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true})
if err != nil && err != blockstore.ErrAlreadyExists {
return fmt.Errorf("error creating blockfile: %w", err)
}
if err == blockstore.ErrAlreadyExists {
// reset the terminal state
bc.resetTerminalState()
}
if bc.getShellProc() != nil {
return nil
}
@ -114,15 +163,13 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error {
for {
nr, err := bc.ShellProc.Pty.Read(buf)
seqNum++
eventbus.SendEvent(application.WailsEvent{
Name: "block:ptydata",
Data: map[string]any{
"blockid": bc.BlockId,
"blockfile": "main",
"ptydata": base64.StdEncoding.EncodeToString(buf[:nr]),
"seqnum": seqNum,
},
})
if nr > 0 {
handleDataErr := bc.handleShellProcData(buf[:nr], seqNum)
if handleDataErr != nil {
log.Printf("error handling shell data: %v\n", handleDataErr)
break
}
}
if err == io.EOF {
break
}

View File

@ -11,6 +11,8 @@ import (
"context"
"fmt"
"io/fs"
"log"
"runtime/debug"
"sync"
"sync/atomic"
"time"
@ -33,9 +35,9 @@ var GBS *BlockStore = &BlockStore{
}
type FileOptsType struct {
MaxSize int64
Circular bool
IJson bool
MaxSize int64 `json:"maxsize,omitempty"`
Circular bool `json:"circular,omitempty"`
IJson bool `json:"ijson,omitempty"`
}
type FileMeta = map[string]any
@ -53,6 +55,24 @@ type BlockFile struct {
Meta FileMeta `json:"meta"` // only top-level keys can be updated (lower levels are immutable)
}
// for regular files this is just Size
// for circular files this is min(Size, MaxSize)
func (f BlockFile) DataLength() int64 {
if f.Opts.Circular {
return minInt64(f.Size, f.Opts.MaxSize)
}
return f.Size
}
// for regular files this is just 0
// for circular files this is the index of the first byte of data we have
func (f BlockFile) DataStartIdx() int64 {
if f.Opts.Circular && f.Size > f.Opts.MaxSize {
return f.Size - f.Opts.MaxSize
}
return 0
}
// this works because lower levels are immutable
func copyMeta(meta FileMeta) FileMeta {
newMeta := make(FileMeta)
@ -265,28 +285,40 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string)
return
}
func (s *BlockStore) FlushCache(ctx context.Context) error {
type FlushStats struct {
FlushDuration time.Duration
NumDirtyEntries int
NumCommitted int
}
func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
wasFlushing := s.setUnlessFlushing()
if wasFlushing {
return fmt.Errorf("flush already in progress")
return stats, fmt.Errorf("flush already in progress")
}
defer s.setIsFlushing(false)
startTime := time.Now()
defer func() {
stats.FlushDuration = time.Since(startTime)
}()
// get a copy of dirty keys so we can iterate without the lock
dirtyCacheKeys := s.getDirtyCacheKeys()
stats.NumDirtyEntries = len(dirtyCacheKeys)
for _, key := range dirtyCacheKeys {
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
return entry.flushToDB(ctx, false)
})
if ctx.Err() != nil {
// transient error (also must stop the loop)
return ctx.Err()
return stats, ctx.Err()
}
if err != nil {
return fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
return stats, fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
}
stats.NumCommitted++
}
return nil
return stats, nil
}
///////////////////////////////////
@ -367,7 +399,32 @@ func (s *BlockStore) setUnlessFlushing() bool {
}
s.IsFlushing = true
return false
}
func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
defer cancelFn()
return s.FlushCache(ctx)
}
func (s *BlockStore) runFlusher() {
defer func() {
if r := recover(); r != nil {
log.Printf("panic in blockstore flusher: %v\n", r)
debug.PrintStack()
}
}()
for {
stats, err := s.runFlushWithNewContext()
if err != nil || stats.NumDirtyEntries > 0 {
log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
}
if stopFlush.Load() {
log.Printf("blockstore flusher stopping\n")
return
}
time.Sleep(DefaultFlushTime)
}
}
func minInt64(a, b int64) int64 {

View File

@ -11,10 +11,16 @@ import (
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrAlreadyExists = fmt.Errorf("file already exists")
func dbInsertFile(ctx context.Context, file *BlockFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
query := "SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?"
if tx.Exists(query, file.BlockId, file.Name) {
return ErrAlreadyExists
}
query = "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})

View File

@ -42,6 +42,9 @@ func InitBlockstore() error {
if err != nil {
return err
}
if !stopFlush.Load() {
go GBS.runFlusher()
}
log.Printf("blockstore initialized\n")
return nil
}

View File

@ -565,7 +565,7 @@ func TestSimpleDBFlush(t *testing.T) {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, fileName, "hello world!")
err = GBS.FlushCache(ctx)
_, err = GBS.FlushCache(ctx)
if err != nil {
t.Fatalf("error flushing cache: %v", err)
}

View File

@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
@ -108,6 +109,19 @@ func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string)
if err != nil {
return nil, fmt.Errorf("error setting active tab: %w", err)
}
// check all blocks in tab and start controllers (if necessary)
tab, err := wstore.DBMustGet[*wstore.Tab](ctx, tabId)
if err != nil {
return nil, fmt.Errorf("error getting tab: %w", err)
}
for _, blockId := range tab.BlockIds {
blockErr := blockcontroller.StartBlockController(ctx, blockId)
if blockErr != nil {
// we don't want to fail the set active tab operation if a block controller fails to start
log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr)
continue
}
}
return updatesRtn(ctx, nil)
}