From 333a979529128394c61954b690d17a0577c64c39 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 18:27:38 -0700 Subject: [PATCH 1/7] hook up blockstore flusher --- pkg/blockstore/blockstore.go | 49 +++++++++++++++++-- .../{dbsetup.go => blockstore_dbsetup.go} | 3 ++ pkg/blockstore/blockstore_test.go | 2 +- 3 files changed, 48 insertions(+), 6 deletions(-) rename pkg/blockstore/{dbsetup.go => blockstore_dbsetup.go} (97%) diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index f770b51df..fcf19f9fd 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -11,6 +11,8 @@ import ( "context" "fmt" "io/fs" + "log" + "runtime/debug" "sync" "sync/atomic" "time" @@ -265,28 +267,40 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) return } -func (s *BlockStore) FlushCache(ctx context.Context) error { +type FlushStats struct { + FlushDuration time.Duration + NumDirtyEntries int + NumCommitted int +} + +func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) { wasFlushing := s.setUnlessFlushing() if wasFlushing { - return fmt.Errorf("flush already in progress") + return stats, fmt.Errorf("flush already in progress") } defer s.setIsFlushing(false) + startTime := time.Now() + defer func() { + stats.FlushDuration = time.Since(startTime) + }() // get a copy of dirty keys so we can iterate without the lock dirtyCacheKeys := s.getDirtyCacheKeys() + stats.NumDirtyEntries = len(dirtyCacheKeys) for _, key := range dirtyCacheKeys { err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error { return entry.flushToDB(ctx, false) }) if ctx.Err() != nil { // transient error (also must stop the loop) - return ctx.Err() + return stats, ctx.Err() } if err != nil { - return fmt.Errorf("error flushing cache entry[%v]: %v", key, err) + return stats, fmt.Errorf("error flushing cache entry[%v]: %v", key, err) } + stats.NumCommitted++ } - return nil + return stats, nil } /////////////////////////////////// @@ -367,7 +381,32 @@ func (s *BlockStore) setUnlessFlushing() bool { } s.IsFlushing = true return false +} +func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime) + defer cancelFn() + return s.FlushCache(ctx) +} + +func (s *BlockStore) runFlusher() { + defer func() { + if r := recover(); r != nil { + log.Printf("panic in blockstore flusher: %v\n", r) + debug.PrintStack() + } + }() + for { + stats, err := s.runFlushWithNewContext() + if err != nil || stats.NumDirtyEntries > 0 { + log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err) + } + if stopFlush.Load() { + log.Printf("blockstore flusher stopping\n") + return + } + time.Sleep(DefaultFlushTime) + } } func minInt64(a, b int64) int64 { diff --git a/pkg/blockstore/dbsetup.go b/pkg/blockstore/blockstore_dbsetup.go similarity index 97% rename from pkg/blockstore/dbsetup.go rename to pkg/blockstore/blockstore_dbsetup.go index a4937ab97..51d66911b 100644 --- a/pkg/blockstore/dbsetup.go +++ b/pkg/blockstore/blockstore_dbsetup.go @@ -42,6 +42,9 @@ func InitBlockstore() error { if err != nil { return err } + if !stopFlush.Load() { + go GBS.runFlusher() + } log.Printf("blockstore initialized\n") return nil } diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 170f58973..f2fc475ed 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -565,7 +565,7 @@ func TestSimpleDBFlush(t *testing.T) { t.Fatalf("error writing data: %v", err) } checkFileData(t, ctx, blockId, fileName, "hello world!") - err = GBS.FlushCache(ctx) + _, err = GBS.FlushCache(ctx) if err != nil { t.Fatalf("error flushing cache: %v", err) } From 4d3ab0e822be59760ee1f21ead336cc53a5e2950 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 18:34:10 -0700 Subject: [PATCH 2/7] install a shutdown handler --- main.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/main.go b/main.go index cb923c3a2..a21674157 100644 --- a/main.go +++ b/main.go @@ -11,8 +11,11 @@ import ( "fmt" "log" "net/http" + "os" + "os/signal" "runtime" "strings" + "syscall" "time" "github.com/wavetermdev/thenextwave/pkg/blockstore" @@ -101,6 +104,27 @@ func (wah waveAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wah.AssetHandler.ServeHTTP(w, r) } +func doShutdown(reason string) { + log.Printf("shutting down: %s\n", reason) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + // TODO deal with flush in progress + blockstore.GBS.FlushCache(ctx) + time.Sleep(200 * time.Millisecond) + os.Exit(0) +} + +func installShutdownSignalHandlers() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) + go func() { + for sig := range sigCh { + doShutdown(fmt.Sprintf("got signal %v", sig)) + break + } + }() +} + func main() { err := wavebase.EnsureWaveHomeDir() if err != nil { @@ -129,6 +153,7 @@ func main() { log.Printf("error ensuring initial data: %v\n", err) return } + installShutdownSignalHandlers() app := application.New(application.Options{ Name: "NextWave", From ae24e46eceb2bfadb00d632d43f31a387afaaee5 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 21:08:00 -0700 Subject: [PATCH 3/7] start blockcontrollers on switch tab --- pkg/service/objectservice/objectservice.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/service/objectservice/objectservice.go b/pkg/service/objectservice/objectservice.go index 372ec1b77..1d047acdf 100644 --- a/pkg/service/objectservice/objectservice.go +++ b/pkg/service/objectservice/objectservice.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "log" "strings" "time" @@ -108,6 +109,19 @@ func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string) if err != nil { return nil, fmt.Errorf("error setting active tab: %w", err) } + // check all blocks in tab and start controllers (if necessary) + tab, err := wstore.DBMustGet[*wstore.Tab](ctx, tabId) + if err != nil { + return nil, fmt.Errorf("error getting tab: %w", err) + } + for _, blockId := range tab.BlockIds { + blockErr := blockcontroller.StartBlockController(ctx, blockId) + if blockErr != nil { + // we don't want to fail the set active tab operation if a block controller fails to start + log.Printf("error starting block controller (blockid:%s): %w", blockId, blockErr) + continue + } + } return updatesRtn(ctx, nil) } From bc18869b2ed1f76c38d1c38fd644ee980e3d2317 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 21:11:50 -0700 Subject: [PATCH 4/7] restart block controllers for initial tab when creating a window as well --- main.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/main.go b/main.go index a21674157..0699ddec7 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,18 @@ func createWindow(windowData *wstore.Window, app *application.App) { eventbus.UnregisterWailsWindow(window.ID()) }) window.Show() + go func() { + time.Sleep(100 * time.Millisecond) + objectService := &objectservice.ObjectService{} + uiContext := wstore.UIContext{ + WindowId: windowData.OID, + ActiveTabId: windowData.ActiveTabId, + } + _, err := objectService.SetActiveTab(uiContext, windowData.ActiveTabId) + if err != nil { + log.Printf("error setting active tab for new window: %v\n", err) + } + }() } type waveAssetHandler struct { From bff46d98221c4f7b06dc53f3eec2bc5c577a008a Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 21:44:47 -0700 Subject: [PATCH 5/7] write pty output to blockstore. initialize blockstore file on controller start. create frontend api to read the blockfile --- main.go | 45 ++++++++++++++++++++++++++ pkg/blockcontroller/blockcontroller.go | 45 ++++++++++++++++++++------ pkg/blockstore/blockstore.go | 24 ++++++++++++-- pkg/blockstore/blockstore_dbops.go | 8 ++++- 4 files changed, 109 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 0699ddec7..0a295aca7 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "context" "embed" "fmt" + "io/fs" "log" "net/http" "os" @@ -18,6 +19,7 @@ import ( "syscall" "time" + "github.com/google/uuid" "github.com/wavetermdev/thenextwave/pkg/blockstore" "github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/service/blockservice" @@ -98,13 +100,56 @@ type waveAssetHandler struct { AssetHandler http.Handler } +func serveBlockFile(w http.ResponseWriter, r *http.Request) { + blockId := r.URL.Query().Get("blockid") + name := r.URL.Query().Get("name") + if _, err := uuid.Parse(blockId); err != nil { + http.Error(w, fmt.Sprintf("invalid blockid: %v", err), http.StatusBadRequest) + return + } + if name == "" { + http.Error(w, "name is required", http.StatusBadRequest) + return + + } + file, err := blockstore.GBS.Stat(r.Context(), blockId, name) + if err == fs.ErrNotExist { + http.NotFound(w, r) + return + } + if err != nil { + http.Error(w, fmt.Sprintf("error getting file info: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size)) + for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize { + _, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize) + if err != nil { + if offset == 0 { + http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError) + } else { + // nothing to do, the headers have already been sent + log.Printf("error reading file %s/%s @ %d: %v\n", blockId, name, offset, err) + } + return + } + w.Write(data) + } +} + func serveWaveUrls(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Cache-Control", "no-cache") if r.URL.Path == "/wave/stream-file" { fileName := r.URL.Query().Get("path") fileName = wavebase.ExpandHomeDir(fileName) http.ServeFile(w, r, fileName) return } + if r.URL.Path == "/wave/blockfile" { + serveBlockFile(w, r) + return + } http.NotFound(w, r) } diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index d78f4edb9..69f951dd4 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -15,6 +15,7 @@ import ( "github.com/creack/pty" "github.com/wailsapp/wails/v3/pkg/application" + "github.com/wavetermdev/thenextwave/pkg/blockstore" "github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/wstore" @@ -86,7 +87,35 @@ func (bc *BlockController) Close() { } } +const DefaultTermMaxFileSize = 256 * 1024 + +func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", data) + if err != nil { + return fmt.Errorf("error appending to blockfile: %w", err) + } + eventbus.SendEvent(application.WailsEvent{ + Name: "block:ptydata", + Data: map[string]any{ + "blockid": bc.BlockId, + "blockfile": "main", + "ptydata": base64.StdEncoding.EncodeToString(data), + "seqnum": seqNum, + }, + }) + return nil +} + func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { + // create a circular blockfile for the output + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + err := blockstore.GBS.MakeFile(ctx, bc.BlockId, "main", nil, blockstore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true}) + if err != nil && err != blockstore.ErrAlreadyExists { + return fmt.Errorf("error creating blockfile: %w", err) + } if bc.getShellProc() != nil { return nil } @@ -114,15 +143,13 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { for { nr, err := bc.ShellProc.Pty.Read(buf) seqNum++ - eventbus.SendEvent(application.WailsEvent{ - Name: "block:ptydata", - Data: map[string]any{ - "blockid": bc.BlockId, - "blockfile": "main", - "ptydata": base64.StdEncoding.EncodeToString(buf[:nr]), - "seqnum": seqNum, - }, - }) + if nr > 0 { + handleDataErr := bc.handleShellProcData(buf[:nr], seqNum) + if handleDataErr != nil { + log.Printf("error handling shell data: %v\n", handleDataErr) + break + } + } if err == io.EOF { break } diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index fcf19f9fd..516cbaffc 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -35,9 +35,9 @@ var GBS *BlockStore = &BlockStore{ } type FileOptsType struct { - MaxSize int64 - Circular bool - IJson bool + MaxSize int64 `json:"maxsize,omitempty"` + Circular bool `json:"circular,omitempty"` + IJson bool `json:"ijson,omitempty"` } type FileMeta = map[string]any @@ -55,6 +55,24 @@ type BlockFile struct { Meta FileMeta `json:"meta"` // only top-level keys can be updated (lower levels are immutable) } +// for regular files this is just Size +// for circular files this is min(Size, MaxSize) +func (f BlockFile) DataLength() int64 { + if f.Opts.Circular { + return minInt64(f.Size, f.Opts.MaxSize) + } + return f.Size +} + +// for regular files this is just 0 +// for circular files this is the index of the first byte of data we have +func (f BlockFile) DataStartIdx() int64 { + if f.Opts.Circular && f.Size > f.Opts.MaxSize { + return f.Size - f.Opts.MaxSize + } + return 0 +} + // this works because lower levels are immutable func copyMeta(meta FileMeta) FileMeta { newMeta := make(FileMeta) diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index 5b9a41964..112c25aaf 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -11,10 +11,16 @@ import ( "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil" ) +var ErrAlreadyExists = fmt.Errorf("file already exists") + func dbInsertFile(ctx context.Context, file *BlockFile) error { // will fail if file already exists return WithTx(ctx, func(tx *TxWrap) error { - query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" + query := "SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?" + if tx.Exists(query, file.BlockId, file.Name) { + return ErrAlreadyExists + } + query = "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta)) return nil }) From 30ef0113385a2153ddf28378214aaa4309caaec4 Mon Sep 17 00:00:00 2001 From: sawka Date: Tue, 28 May 2024 22:54:49 -0700 Subject: [PATCH 6/7] add x-blockfileinfo and last-modified to blockfile handler --- main.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/main.go b/main.go index 0a295aca7..505f68230 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,8 @@ package main import ( "context" "embed" + "encoding/base64" + "encoding/json" "fmt" "io/fs" "log" @@ -121,8 +123,14 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("error getting file info: %v", err), http.StatusInternalServerError) return } + jsonFileBArr, err := json.Marshal(file) + if err != nil { + http.Error(w, fmt.Sprintf("error serializing file info: %v", err), http.StatusInternalServerError) + } w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size)) + w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr)) + w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat)) for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize { _, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize) if err != nil { From 02cda396e8def5b8e6012d4ecbd0658ca6cb7858 Mon Sep 17 00:00:00 2001 From: sawka Date: Wed, 29 May 2024 00:28:25 -0700 Subject: [PATCH 7/7] restore terminal state when loading term view --- frontend/app/view/term.tsx | 46 +++++++++++++++++++++- pkg/blockcontroller/blockcontroller.go | 20 ++++++++++ pkg/service/objectservice/objectservice.go | 2 +- 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/frontend/app/view/term.tsx b/frontend/app/view/term.tsx index e49685ddb..c06cc0372 100644 --- a/frontend/app/view/term.tsx +++ b/frontend/app/view/term.tsx @@ -40,14 +40,21 @@ function getThemeFromCSSVars(el: Element): ITheme { return theme; } +type InitialLoadDataType = { + loaded: boolean; + heldData: Uint8Array[]; +}; + const TerminalView = ({ blockId }: { blockId: string }) => { const connectElemRef = React.useRef(null); const termRef = React.useRef(null); + const initialLoadRef = React.useRef({ loaded: false, heldData: [] }); React.useEffect(() => { if (!connectElemRef.current) { return; } + console.log("terminal created"); const term = new Terminal({ theme: getThemeFromCSSVars(connectElemRef.current), fontSize: 12, @@ -89,7 +96,11 @@ const TerminalView = ({ blockId }: { blockId: string }) => { blockSubject.subscribe((data) => { // base64 decode const decodedData = base64ToArray(data.ptydata); - term.write(decodedData); + if (initialLoadRef.current.loaded) { + term.write(decodedData); + } else { + initialLoadRef.current.heldData.push(decodedData); + } }); return () => { @@ -99,6 +110,39 @@ const TerminalView = ({ blockId }: { blockId: string }) => { }; }, [connectElemRef.current]); + React.useEffect(() => { + if (!termRef.current) { + return; + } + // load data from blockfile + const startTs = Date.now(); + let loadedBytes = 0; + const localTerm = termRef.current; // avoids devmode double effect running issue (terminal gets created twice) + const usp = new URLSearchParams(); + usp.set("blockid", blockId); + usp.set("name", "main"); + fetch("/wave/blockfile?" + usp.toString()) + .then((resp) => { + if (resp.ok) { + return resp.arrayBuffer(); + } + console.log("error loading blockfile", resp.status, resp.statusText); + }) + .then((data: ArrayBuffer) => { + const uint8View = new Uint8Array(data); + localTerm.write(uint8View); + loadedBytes = uint8View.byteLength; + }) + .finally(() => { + initialLoadRef.current.heldData.forEach((data) => { + localTerm.write(data); + }); + initialLoadRef.current.loaded = true; + initialLoadRef.current.heldData = []; + console.log(`terminal loaded blockfile ${loadedBytes} bytes, ${Date.now() - startTs}ms`); + }); + }, [termRef.current]); + return (
diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index 69f951dd4..df82bc49d 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -4,6 +4,7 @@ package blockcontroller import ( + "bytes" "context" "encoding/base64" "encoding/json" @@ -108,6 +109,21 @@ func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error { return nil } +func (bc *BlockController) resetTerminalState() { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + var buf bytes.Buffer + // buf.WriteString("\x1b[?1049l") // disable alternative buffer + buf.WriteString("\x1b[0m") // reset attributes + buf.WriteString("\x1b[?25h") // show cursor + buf.WriteString("\x1b[?1000l") // disable mouse tracking + buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n") + err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", buf.Bytes()) + if err != nil { + log.Printf("error appending to blockfile (terminal reset): %v\n", err) + } +} + func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { // create a circular blockfile for the output ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) @@ -116,6 +132,10 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { if err != nil && err != blockstore.ErrAlreadyExists { return fmt.Errorf("error creating blockfile: %w", err) } + if err == blockstore.ErrAlreadyExists { + // reset the terminal state + bc.resetTerminalState() + } if bc.getShellProc() != nil { return nil } diff --git a/pkg/service/objectservice/objectservice.go b/pkg/service/objectservice/objectservice.go index 1d047acdf..6fb3c844b 100644 --- a/pkg/service/objectservice/objectservice.go +++ b/pkg/service/objectservice/objectservice.go @@ -118,7 +118,7 @@ func (svc *ObjectService) SetActiveTab(uiContext wstore.UIContext, tabId string) blockErr := blockcontroller.StartBlockController(ctx, blockId) if blockErr != nil { // we don't want to fail the set active tab operation if a block controller fails to start - log.Printf("error starting block controller (blockid:%s): %w", blockId, blockErr) + log.Printf("error starting block controller (blockid:%s): %v", blockId, blockErr) continue } }