From c191fc89453906467944abd2ff34fc7869914f11 Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 3 Jun 2024 13:03:21 -0700 Subject: [PATCH] rename blockstore to filestore. rename blockid to zoneid. --- db/db.go | 4 +- db/migrations-blockstore/000001_init.down.sql | 3 - db/migrations-filestore/000001_init.down.sql | 3 + .../000001_init.up.sql | 12 +- main.go | 14 +- pkg/blockcontroller/blockcontroller.go | 12 +- pkg/blockcontroller/shell_controller.go | 4 + pkg/blockstore/blockstore_dbops.go | 117 -------- pkg/{blockstore => filestore}/blockstore.go | 114 ++++---- .../blockstore_cache.go | 54 ++-- pkg/filestore/blockstore_dbops.go | 117 ++++++++ .../blockstore_dbsetup.go | 16 +- .../blockstore_test.go | 268 +++++++++--------- pkg/service/fileservice/fileservice.go | 14 + 14 files changed, 385 insertions(+), 367 deletions(-) delete mode 100644 db/migrations-blockstore/000001_init.down.sql create mode 100644 db/migrations-filestore/000001_init.down.sql rename db/{migrations-blockstore => migrations-filestore}/000001_init.up.sql (56%) create mode 100644 pkg/blockcontroller/shell_controller.go delete mode 100644 pkg/blockstore/blockstore_dbops.go rename pkg/{blockstore => filestore}/blockstore.go (68%) rename pkg/{blockstore => filestore}/blockstore_cache.go (83%) create mode 100644 pkg/filestore/blockstore_dbops.go rename pkg/{blockstore => filestore}/blockstore_dbsetup.go (83%) rename pkg/{blockstore => filestore}/blockstore_test.go (59%) diff --git a/db/db.go b/db/db.go index b13b72fbb..3dfaf3fb8 100644 --- a/db/db.go +++ b/db/db.go @@ -5,8 +5,8 @@ package db import "embed" -//go:embed migrations-blockstore/*.sql -var BlockstoreMigrationFS embed.FS +//go:embed migrations-filestore/*.sql +var FilestoreMigrationFS embed.FS //go:embed migrations-wstore/*.sql var WStoreMigrationFS embed.FS diff --git a/db/migrations-blockstore/000001_init.down.sql b/db/migrations-blockstore/000001_init.down.sql deleted file mode 100644 index 1cb72b3fd..000000000 --- a/db/migrations-blockstore/000001_init.down.sql +++ /dev/null @@ -1,3 +0,0 @@ -DROP TABLE block_file; - -DROP TABLE block_data; diff --git a/db/migrations-filestore/000001_init.down.sql b/db/migrations-filestore/000001_init.down.sql new file mode 100644 index 000000000..534c404e7 --- /dev/null +++ b/db/migrations-filestore/000001_init.down.sql @@ -0,0 +1,3 @@ +DROP TABLE db_wave_file; + +DROP TABLE db_file_data; diff --git a/db/migrations-blockstore/000001_init.up.sql b/db/migrations-filestore/000001_init.up.sql similarity index 56% rename from db/migrations-blockstore/000001_init.up.sql rename to db/migrations-filestore/000001_init.up.sql index fce5a6986..af9fcf8c0 100644 --- a/db/migrations-blockstore/000001_init.up.sql +++ b/db/migrations-filestore/000001_init.up.sql @@ -1,19 +1,19 @@ -CREATE TABLE db_block_file ( - blockid varchar(36) NOT NULL, +CREATE TABLE db_wave_file ( + zoneid varchar(36) NOT NULL, name varchar(200) NOT NULL, size bigint NOT NULL, createdts bigint NOT NULL, modts bigint NOT NULL, opts json NOT NULL, meta json NOT NULL, - PRIMARY KEY (blockid, name) + PRIMARY KEY (zoneid, name) ); -CREATE TABLE db_block_data ( - blockid varchar(36) NOT NULL, +CREATE TABLE db_file_data ( + zoneid varchar(36) NOT NULL, name varchar(200) NOT NULL, partidx int NOT NULL, data blob NOT NULL, - PRIMARY KEY(blockid, name, partidx) + PRIMARY KEY(zoneid, name, partidx) ); diff --git a/main.go b/main.go index 505f68230..a4be0f6e4 100644 --- a/main.go +++ b/main.go @@ -22,8 +22,8 @@ import ( "time" "github.com/google/uuid" - "github.com/wavetermdev/thenextwave/pkg/blockstore" "github.com/wavetermdev/thenextwave/pkg/eventbus" + "github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/service/blockservice" "github.com/wavetermdev/thenextwave/pkg/service/clientservice" "github.com/wavetermdev/thenextwave/pkg/service/fileservice" @@ -114,7 +114,7 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) { return } - file, err := blockstore.GBS.Stat(r.Context(), blockId, name) + file, err := filestore.WFS.Stat(r.Context(), blockId, name) if err == fs.ErrNotExist { http.NotFound(w, r) return @@ -131,8 +131,8 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size)) w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr)) w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat)) - for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize { - _, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize) + for offset := file.DataStartIdx(); offset < file.Size; offset += filestore.DefaultPartDataSize { + _, data, err := filestore.WFS.ReadAt(r.Context(), blockId, name, offset, filestore.DefaultPartDataSize) if err != nil { if offset == 0 { http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError) @@ -174,7 +174,7 @@ func doShutdown(reason string) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() // TODO deal with flush in progress - blockstore.GBS.FlushCache(ctx) + filestore.WFS.FlushCache(ctx) time.Sleep(200 * time.Millisecond) os.Exit(0) } @@ -203,9 +203,9 @@ func main() { } log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir()) - err = blockstore.InitBlockstore() + err = filestore.InitFilestore() if err != nil { - log.Printf("error initializing blockstore: %v\n", err) + log.Printf("error initializing filestore: %v\n", err) return } err = wstore.InitWStore() diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index df82bc49d..a7580617d 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -16,8 +16,8 @@ import ( "github.com/creack/pty" "github.com/wailsapp/wails/v3/pkg/application" - "github.com/wavetermdev/thenextwave/pkg/blockstore" "github.com/wavetermdev/thenextwave/pkg/eventbus" + "github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/wstore" ) @@ -93,7 +93,7 @@ const DefaultTermMaxFileSize = 256 * 1024 func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error { ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) defer cancelFn() - err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", data) + err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", data) if err != nil { return fmt.Errorf("error appending to blockfile: %w", err) } @@ -118,7 +118,7 @@ func (bc *BlockController) resetTerminalState() { buf.WriteString("\x1b[?25h") // show cursor buf.WriteString("\x1b[?1000l") // disable mouse tracking buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n") - err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", buf.Bytes()) + err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", buf.Bytes()) if err != nil { log.Printf("error appending to blockfile (terminal reset): %v\n", err) } @@ -128,11 +128,11 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error { // create a circular blockfile for the output ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) defer cancelFn() - err := blockstore.GBS.MakeFile(ctx, bc.BlockId, "main", nil, blockstore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true}) - if err != nil && err != blockstore.ErrAlreadyExists { + err := filestore.WFS.MakeFile(ctx, bc.BlockId, "main", nil, filestore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true}) + if err != nil && err != filestore.ErrAlreadyExists { return fmt.Errorf("error creating blockfile: %w", err) } - if err == blockstore.ErrAlreadyExists { + if err == filestore.ErrAlreadyExists { // reset the terminal state bc.resetTerminalState() } diff --git a/pkg/blockcontroller/shell_controller.go b/pkg/blockcontroller/shell_controller.go new file mode 100644 index 000000000..26eb62525 --- /dev/null +++ b/pkg/blockcontroller/shell_controller.go @@ -0,0 +1,4 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockcontroller diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go deleted file mode 100644 index 112c25aaf..000000000 --- a/pkg/blockstore/blockstore_dbops.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2024, Command Line Inc. -// SPDX-License-Identifier: Apache-2.0 - -package blockstore - -import ( - "context" - "fmt" - "os" - - "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil" -) - -var ErrAlreadyExists = fmt.Errorf("file already exists") - -func dbInsertFile(ctx context.Context, file *BlockFile) error { - // will fail if file already exists - return WithTx(ctx, func(tx *TxWrap) error { - query := "SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?" - if tx.Exists(query, file.BlockId, file.Name) { - return ErrAlreadyExists - } - query = "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" - tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta)) - return nil - }) -} - -func dbDeleteFile(ctx context.Context, blockId string, name string) error { - return WithTx(ctx, func(tx *TxWrap) error { - query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?" - tx.Exec(query, blockId, name) - query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?" - tx.Exec(query, blockId, name) - return nil - }) -} - -func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) { - return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { - var files []string - query := "SELECT name FROM db_block_file WHERE blockid = ?" - tx.Select(&files, query, blockId) - return files, nil - }) -} - -func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { - return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) { - query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?" - file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name) - return file, nil - }) -} - -func dbGetAllBlockIds(ctx context.Context) ([]string, error) { - return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { - var ids []string - query := "SELECT DISTINCT blockid FROM db_block_file" - tx.Select(&ids, query) - return ids, nil - }) -} - -func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) { - if len(parts) == 0 { - return nil, nil - } - return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) { - var data []*DataCacheEntry - query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))" - tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts)) - rtn := make(map[int]*DataCacheEntry) - for _, d := range data { - if cap(d.Data) != int(partDataSize) { - newData := make([]byte, len(d.Data), partDataSize) - copy(newData, d.Data) - d.Data = newData - } - rtn[d.PartIdx] = d - } - return rtn, nil - }) -} - -func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { - return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) { - query := "SELECT * FROM db_block_file WHERE blockid = ?" - files := dbutil.SelectMappable[*BlockFile](tx, query, blockId) - return files, nil - }) -} - -func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error { - return WithTx(ctx, func(tx *TxWrap) error { - query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?` - if !tx.Exists(query, file.BlockId, file.Name) { - // since deletion is synchronous this stops us from writing to a deleted file - return os.ErrNotExist - } - // we don't update CreatedTs or Opts - query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?` - tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name) - if replace { - query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?` - tx.Exec(query, file.BlockId, file.Name) - } - dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` - for partIdx, dataEntry := range dataEntries { - if partIdx != dataEntry.PartIdx { - panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx)) - } - tx.Exec(dataPartQuery, file.BlockId, file.Name, dataEntry.PartIdx, dataEntry.Data) - } - return nil - }) -} diff --git a/pkg/blockstore/blockstore.go b/pkg/filestore/blockstore.go similarity index 68% rename from pkg/blockstore/blockstore.go rename to pkg/filestore/blockstore.go index 516cbaffc..121314035 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/filestore/blockstore.go @@ -1,9 +1,9 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -package blockstore +package filestore -// the blockstore package implements a write cache for block files +// the blockstore package implements a write cache for wave files // it is not a read cache (reads still go to the DB -- unless items are in the cache) // but all writes only go to the cache, and then the cache is periodically flushed to the DB @@ -29,7 +29,7 @@ var flushErrorCount = &atomic.Int32{} var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} -var GBS *BlockStore = &BlockStore{ +var WFS *FileStore = &FileStore{ Lock: &sync.Mutex{}, Cache: make(map[cacheKey]*CacheEntry), } @@ -42,9 +42,9 @@ type FileOptsType struct { type FileMeta = map[string]any -type BlockFile struct { +type WaveFile struct { // these fields are static (not updated) - BlockId string `json:"blockid"` + ZoneId string `json:"zoneid"` Name string `json:"name"` Opts FileOptsType `json:"opts"` CreatedTs int64 `json:"createdts"` @@ -57,7 +57,7 @@ type BlockFile struct { // for regular files this is just Size // for circular files this is min(Size, MaxSize) -func (f BlockFile) DataLength() int64 { +func (f WaveFile) DataLength() int64 { if f.Opts.Circular { return minInt64(f.Size, f.Opts.MaxSize) } @@ -66,7 +66,7 @@ func (f BlockFile) DataLength() int64 { // for regular files this is just 0 // for circular files this is the index of the first byte of data we have -func (f BlockFile) DataStartIdx() int64 { +func (f WaveFile) DataStartIdx() int64 { if f.Opts.Circular && f.Size > f.Opts.MaxSize { return f.Size - f.Opts.MaxSize } @@ -82,7 +82,7 @@ func copyMeta(meta FileMeta) FileMeta { return newMeta } -func (f *BlockFile) DeepCopy() *BlockFile { +func (f *WaveFile) DeepCopy() *WaveFile { if f == nil { return nil } @@ -91,19 +91,19 @@ func (f *BlockFile) DeepCopy() *BlockFile { return &newFile } -func (BlockFile) UseDBMap() {} +func (WaveFile) UseDBMap() {} -type BlockData struct { - BlockId string `json:"blockid"` +type FileData struct { + ZoneId string `json:"zoneid"` Name string `json:"name"` PartIdx int `json:"partidx"` Data []byte `json:"data"` } -func (BlockData) UseDBMap() {} +func (FileData) UseDBMap() {} // synchronous (does not interact with the cache) -func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error { +func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, meta FileMeta, opts FileOptsType) error { if opts.MaxSize < 0 { return fmt.Errorf("max size must be non-negative") } @@ -118,13 +118,13 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize } } - return withLock(s, blockId, name, func(entry *CacheEntry) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { if entry.File != nil { return fs.ErrExist } now := time.Now().UnixMilli() - file := &BlockFile{ - BlockId: blockId, + file := &WaveFile{ + ZoneId: zoneId, Name: name, Size: 0, CreatedTs: now, @@ -136,9 +136,9 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, }) } -func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error { - return withLock(s, blockId, name, func(entry *CacheEntry) error { - err := dbDeleteFile(ctx, blockId, name) +func (s *FileStore) DeleteFile(ctx context.Context, zoneId string, name string) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { + err := dbDeleteFile(ctx, zoneId, name) if err != nil { return fmt.Errorf("error deleting file: %v", err) } @@ -147,20 +147,20 @@ func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string }) } -func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { - fileNames, err := dbGetBlockFileNames(ctx, blockId) +func (s *FileStore) DeleteZone(ctx context.Context, zoneId string) error { + fileNames, err := dbGetZoneFileNames(ctx, zoneId) if err != nil { - return fmt.Errorf("error getting block files: %v", err) + return fmt.Errorf("error getting zone files: %v", err) } for _, name := range fileNames { - s.DeleteFile(ctx, blockId, name) + s.DeleteFile(ctx, zoneId, name) } return nil } // if file doesn't exsit, returns fs.ErrNotExist -func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) { - return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) { +func (s *FileStore) Stat(ctx context.Context, zoneId string, name string) (*WaveFile, error) { + return withLockRtn(s, zoneId, name, func(entry *CacheEntry) (*WaveFile, error) { file, err := entry.loadFileForRead(ctx) if err != nil { return nil, fmt.Errorf("error getting file: %v", err) @@ -169,13 +169,13 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl }) } -func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { - files, err := dbGetBlockFiles(ctx, blockId) +func (s *FileStore) ListFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) { + files, err := dbGetZoneFiles(ctx, zoneId) if err != nil { - return nil, fmt.Errorf("error getting block files: %v", err) + return nil, fmt.Errorf("error getting zone files: %v", err) } for idx, file := range files { - withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error { + withLock(s, file.ZoneId, file.Name, func(entry *CacheEntry) error { if entry.File != nil { files[idx] = entry.File.DeepCopy() } @@ -185,8 +185,8 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil return files, nil } -func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error { - return withLock(s, blockId, name, func(entry *CacheEntry) error { +func (s *FileStore) WriteMeta(ctx context.Context, zoneId string, name string, meta FileMeta, merge bool) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { err := entry.loadFileIntoCache(ctx) if err != nil { return err @@ -207,8 +207,8 @@ func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, }) } -func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error { - return withLock(s, blockId, name, func(entry *CacheEntry) error { +func (s *FileStore) WriteFile(ctx context.Context, zoneId string, name string, data []byte) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { err := entry.loadFileIntoCache(ctx) if err != nil { return err @@ -219,11 +219,11 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, }) } -func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { +func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, offset int64, data []byte) error { if offset < 0 { return fmt.Errorf("offset must be non-negative") } - return withLock(s, blockId, name, func(entry *CacheEntry) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { err := entry.loadFileIntoCache(ctx) if err != nil { return err @@ -243,8 +243,8 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o }) } -func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { - return withLock(s, blockId, name, func(entry *CacheEntry) error { +func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string, data []byte) error { + return withLock(s, zoneId, name, func(entry *CacheEntry) error { err := entry.loadFileIntoCache(ctx) if err != nil { return err @@ -262,14 +262,14 @@ func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string }) } -func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { - return dbGetAllBlockIds(ctx) +func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) { + return dbGetAllZoneIds(ctx) } // returns (offset, data, error) // we return the offset because the offset may have been adjusted if the size was too big (for circular files) -func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) { - withLock(s, blockId, name, func(entry *CacheEntry) error { +func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) { + withLock(s, zoneId, name, func(entry *CacheEntry) error { rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false) return nil }) @@ -277,8 +277,8 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of } // returns (offset, data, error) -func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) { - withLock(s, blockId, name, func(entry *CacheEntry) error { +func (s *FileStore) ReadFile(ctx context.Context, zoneId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) { + withLock(s, zoneId, name, func(entry *CacheEntry) error { rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true) return nil }) @@ -291,7 +291,7 @@ type FlushStats struct { NumCommitted int } -func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) { +func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) { wasFlushing := s.setUnlessFlushing() if wasFlushing { return stats, fmt.Errorf("flush already in progress") @@ -306,7 +306,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e dirtyCacheKeys := s.getDirtyCacheKeys() stats.NumDirtyEntries = len(dirtyCacheKeys) for _, key := range dirtyCacheKeys { - err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error { + err := withLock(s, key.ZoneId, key.Name, func(entry *CacheEntry) error { return entry.flushToDB(ctx, false) }) if ctx.Err() != nil { @@ -323,7 +323,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e /////////////////////////////////// -func (f *BlockFile) partIdxAtOffset(offset int64) int { +func (f *WaveFile) partIdxAtOffset(offset int64) int { partIdx := int(offset / partDataSize) if f.Opts.Circular { maxPart := int(f.Opts.MaxSize / partDataSize) @@ -351,11 +351,11 @@ func getPartIdxsFromMap(partMap map[int]int) []int { } // returns a map of partIdx to amount of data to write to that part -func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int { +func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int { partMap := make(map[int]int) endOffset := startOffset + size - startBlockOffset := startOffset - (startOffset % partDataSize) - for testOffset := startBlockOffset; testOffset < endOffset; testOffset += partDataSize { + startFileOffset := startOffset - (startOffset % partDataSize) + for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize { partIdx := file.partIdxAtOffset(testOffset) partStartOffset := testOffset partEndOffset := testOffset + partDataSize @@ -372,7 +372,7 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int return partMap } -func (s *BlockStore) getDirtyCacheKeys() []cacheKey { +func (s *FileStore) getDirtyCacheKeys() []cacheKey { s.Lock.Lock() defer s.Lock.Unlock() var dirtyCacheKeys []cacheKey @@ -384,14 +384,14 @@ func (s *BlockStore) getDirtyCacheKeys() []cacheKey { return dirtyCacheKeys } -func (s *BlockStore) setIsFlushing(flushing bool) { +func (s *FileStore) setIsFlushing(flushing bool) { s.Lock.Lock() defer s.Lock.Unlock() s.IsFlushing = flushing } // returns old value of IsFlushing -func (s *BlockStore) setUnlessFlushing() bool { +func (s *FileStore) setUnlessFlushing() bool { s.Lock.Lock() defer s.Lock.Unlock() if s.IsFlushing { @@ -401,26 +401,26 @@ func (s *BlockStore) setUnlessFlushing() bool { return false } -func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) { +func (s *FileStore) runFlushWithNewContext() (FlushStats, error) { ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime) defer cancelFn() return s.FlushCache(ctx) } -func (s *BlockStore) runFlusher() { +func (s *FileStore) runFlusher() { defer func() { if r := recover(); r != nil { - log.Printf("panic in blockstore flusher: %v\n", r) + log.Printf("panic in filestore flusher: %v\n", r) debug.PrintStack() } }() for { stats, err := s.runFlushWithNewContext() if err != nil || stats.NumDirtyEntries > 0 { - log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err) + log.Printf("filestore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err) } if stopFlush.Load() { - log.Printf("blockstore flusher stopping\n") + log.Printf("filestore flusher stopping\n") return } time.Sleep(DefaultFlushTime) diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/filestore/blockstore_cache.go similarity index 83% rename from pkg/blockstore/blockstore_cache.go rename to pkg/filestore/blockstore_cache.go index 60f6c27f9..f8608654b 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/filestore/blockstore_cache.go @@ -1,7 +1,7 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -package blockstore +package filestore import ( "bytes" @@ -13,11 +13,11 @@ import ( ) type cacheKey struct { - BlockId string - Name string + ZoneId string + Name string } -type BlockStore struct { +type FileStore struct { Lock *sync.Mutex Cache map[cacheKey]*CacheEntry IsFlushing bool @@ -25,17 +25,17 @@ type BlockStore struct { type DataCacheEntry struct { PartIdx int - Data []byte // capacity is always BlockDataPartSize + Data []byte // capacity is always ZoneDataPartSize } // if File or DataEntries are not nil then they are dirty (need to be flushed to disk) type CacheEntry struct { - PinCount int // this is synchronzed with the BlockStore lock (not the entry lock) + PinCount int // this is synchronzed with the FileStore lock (not the entry lock) Lock *sync.Mutex - BlockId string + ZoneId string Name string - File *BlockFile + File *WaveFile DataEntries map[int]*DataCacheEntry FlushErrors int } @@ -43,7 +43,7 @@ type CacheEntry struct { //lint:ignore U1000 used for testing func (e *CacheEntry) dump() string { var buf bytes.Buffer - fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount) + fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount) fmt.Fprintf(&buf, " FileEntry: %v\n", e.File) for idx, dce := range e.DataEntries { fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data)) @@ -59,28 +59,28 @@ func makeDataCacheEntry(partIdx int) *DataCacheEntry { } // will create new entries -func (s *BlockStore) getEntryAndPin(blockId string, name string) *CacheEntry { +func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry { s.Lock.Lock() defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}] if entry == nil { - entry = makeCacheEntry(blockId, name) - s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry + entry = makeCacheEntry(zoneId, name) + s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry } entry.PinCount++ return entry } -func (s *BlockStore) unpinEntryAndTryDelete(blockId string, name string) { +func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) { s.Lock.Lock() defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}] if entry == nil { return } entry.PinCount-- if entry.PinCount <= 0 && entry.File == nil { - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name}) } } @@ -111,11 +111,11 @@ func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error { } // does not populate the cache entry, returns err if file does not exist -func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error) { +func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) { if entry.File != nil { return entry.File, nil } - file, err := dbGetBlockFile(ctx, entry.BlockId, entry.Name) + file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name) if err != nil { return nil, fmt.Errorf("error getting file: %w", err) } @@ -125,17 +125,17 @@ func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error return file, nil } -func withLock(s *BlockStore, blockId string, name string, fn func(*CacheEntry) error) error { - entry := s.getEntryAndPin(blockId, name) - defer s.unpinEntryAndTryDelete(blockId, name) +func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error { + entry := s.getEntryAndPin(zoneId, name) + defer s.unpinEntryAndTryDelete(zoneId, name) entry.Lock.Lock() defer entry.Lock.Unlock() return fn(entry) } -func withLockRtn[T any](s *BlockStore, blockId string, name string, fn func(*CacheEntry) (T, error)) (T, error) { +func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) { var rtnVal T - rtnErr := withLock(s, blockId, name, func(entry *CacheEntry) error { + rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error { var err error rtnVal, err = fn(entry) return err @@ -273,7 +273,7 @@ func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int // parts are already loaded return nil } - dbDataParts, err := dbGetFileParts(ctx, entry.BlockId, entry.Name, parts) + dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts) if err != nil { return fmt.Errorf("error getting data parts: %w", err) } @@ -291,7 +291,7 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) var dbDataParts map[int]*DataCacheEntry if len(dbParts) > 0 { var err error - dbDataParts, err = dbGetFileParts(ctx, entry.BlockId, entry.Name, dbParts) + dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts) if err != nil { return nil, fmt.Errorf("error getting data parts: %w", err) } @@ -311,10 +311,10 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) return rtn, nil } -func makeCacheEntry(blockId string, name string) *CacheEntry { +func makeCacheEntry(zoneId string, name string) *CacheEntry { return &CacheEntry{ Lock: &sync.Mutex{}, - BlockId: blockId, + ZoneId: zoneId, Name: name, PinCount: 0, File: nil, diff --git a/pkg/filestore/blockstore_dbops.go b/pkg/filestore/blockstore_dbops.go new file mode 100644 index 000000000..e82de5b47 --- /dev/null +++ b/pkg/filestore/blockstore_dbops.go @@ -0,0 +1,117 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package filestore + +import ( + "context" + "fmt" + "os" + + "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil" +) + +var ErrAlreadyExists = fmt.Errorf("file already exists") + +func dbInsertFile(ctx context.Context, file *WaveFile) error { + // will fail if file already exists + return WithTx(ctx, func(tx *TxWrap) error { + query := "SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?" + if tx.Exists(query, file.ZoneId, file.Name) { + return ErrAlreadyExists + } + query = "INSERT INTO db_wave_file (zoneid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" + tx.Exec(query, file.ZoneId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta)) + return nil + }) +} + +func dbDeleteFile(ctx context.Context, zoneId string, name string) error { + return WithTx(ctx, func(tx *TxWrap) error { + query := "DELETE FROM db_wave_file WHERE zoneid = ? AND name = ?" + tx.Exec(query, zoneId, name) + query = "DELETE FROM db_file_data WHERE zoneid = ? AND name = ?" + tx.Exec(query, zoneId, name) + return nil + }) +} + +func dbGetZoneFileNames(ctx context.Context, zoneId string) ([]string, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { + var files []string + query := "SELECT name FROM db_wave_file WHERE zoneid = ?" + tx.Select(&files, query, zoneId) + return files, nil + }) +} + +func dbGetZoneFile(ctx context.Context, zoneId string, name string) (*WaveFile, error) { + return WithTxRtn(ctx, func(tx *TxWrap) (*WaveFile, error) { + query := "SELECT * FROM db_wave_file WHERE zoneid = ? AND name = ?" + file := dbutil.GetMappable[*WaveFile](tx, query, zoneId, name) + return file, nil + }) +} + +func dbGetAllZoneIds(ctx context.Context) ([]string, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) { + var ids []string + query := "SELECT DISTINCT zoneid FROM db_wave_file" + tx.Select(&ids, query) + return ids, nil + }) +} + +func dbGetFileParts(ctx context.Context, zoneId string, name string, parts []int) (map[int]*DataCacheEntry, error) { + if len(parts) == 0 { + return nil, nil + } + return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) { + var data []*DataCacheEntry + query := "SELECT partidx, data FROM db_file_data WHERE zoneid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))" + tx.Select(&data, query, zoneId, name, dbutil.QuickJsonArr(parts)) + rtn := make(map[int]*DataCacheEntry) + for _, d := range data { + if cap(d.Data) != int(partDataSize) { + newData := make([]byte, len(d.Data), partDataSize) + copy(newData, d.Data) + d.Data = newData + } + rtn[d.PartIdx] = d + } + return rtn, nil + }) +} + +func dbGetZoneFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) { + return WithTxRtn(ctx, func(tx *TxWrap) ([]*WaveFile, error) { + query := "SELECT * FROM db_wave_file WHERE zoneid = ?" + files := dbutil.SelectMappable[*WaveFile](tx, query, zoneId) + return files, nil + }) +} + +func dbWriteCacheEntry(ctx context.Context, file *WaveFile, dataEntries map[int]*DataCacheEntry, replace bool) error { + return WithTx(ctx, func(tx *TxWrap) error { + query := `SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?` + if !tx.Exists(query, file.ZoneId, file.Name) { + // since deletion is synchronous this stops us from writing to a deleted file + return os.ErrNotExist + } + // we don't update CreatedTs or Opts + query = `UPDATE db_wave_file SET size = ?, modts = ?, meta = ? WHERE zoneid = ? AND name = ?` + tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.ZoneId, file.Name) + if replace { + query = `DELETE FROM db_file_data WHERE zoneid = ? AND name = ?` + tx.Exec(query, file.ZoneId, file.Name) + } + dataPartQuery := `REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?, ?, ?, ?)` + for partIdx, dataEntry := range dataEntries { + if partIdx != dataEntry.PartIdx { + panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx)) + } + tx.Exec(dataPartQuery, file.ZoneId, file.Name, dataEntry.PartIdx, dataEntry.Data) + } + return nil + }) +} diff --git a/pkg/blockstore/blockstore_dbsetup.go b/pkg/filestore/blockstore_dbsetup.go similarity index 83% rename from pkg/blockstore/blockstore_dbsetup.go rename to pkg/filestore/blockstore_dbsetup.go index 51d66911b..63bcf88d9 100644 --- a/pkg/blockstore/blockstore_dbsetup.go +++ b/pkg/filestore/blockstore_dbsetup.go @@ -1,9 +1,9 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -package blockstore +package filestore -// setup for blockstore db +// setup for filestore db // includes migration support and txwrap setup import ( @@ -23,14 +23,14 @@ import ( dbfs "github.com/wavetermdev/thenextwave/db" ) -const BlockstoreDBName = "blockstore.db" +const FilestoreDBName = "filestore.db" type TxWrap = txwrap.TxWrap var globalDB *sqlx.DB var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db) -func InitBlockstore() error { +func InitFilestore() error { ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) defer cancelFn() var err error @@ -38,20 +38,20 @@ func InitBlockstore() error { if err != nil { return err } - err = migrateutil.Migrate("blockstore", globalDB.DB, dbfs.BlockstoreMigrationFS, "migrations-blockstore") + err = migrateutil.Migrate("filestore", globalDB.DB, dbfs.FilestoreMigrationFS, "migrations-filestore") if err != nil { return err } if !stopFlush.Load() { - go GBS.runFlusher() + go WFS.runFlusher() } - log.Printf("blockstore initialized\n") + log.Printf("filestore initialized\n") return nil } func GetDBName() string { waveHome := wavebase.GetWaveHomeDir() - return path.Join(waveHome, BlockstoreDBName) + return path.Join(waveHome, FilestoreDBName) } func MakeDB(ctx context.Context) (*sqlx.DB, error) { diff --git a/pkg/blockstore/blockstore_test.go b/pkg/filestore/blockstore_test.go similarity index 59% rename from pkg/blockstore/blockstore_test.go rename to pkg/filestore/blockstore_test.go index f2fc475ed..1fda2e009 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/filestore/blockstore_test.go @@ -1,7 +1,7 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 -package blockstore +package filestore import ( "bytes" @@ -24,9 +24,9 @@ func initDb(t *testing.T) { partDataSize = 50 warningCount = &atomic.Int32{} stopFlush.Store(true) - err := InitBlockstore() + err := InitFilestore() if err != nil { - t.Fatalf("error initializing blockstore: %v", err) + t.Fatalf("error initializing filestore: %v", err) } } @@ -38,7 +38,7 @@ func cleanupDb(t *testing.T) { } useTestingDb = false partDataSize = DefaultPartDataSize - GBS.clearCache() + WFS.clearCache() if warningCount.Load() > 0 { t.Errorf("warning count: %d", warningCount.Load()) } @@ -47,24 +47,24 @@ func cleanupDb(t *testing.T) { } } -func (s *BlockStore) getCacheSize() int { +func (s *FileStore) getCacheSize() int { s.Lock.Lock() defer s.Lock.Unlock() return len(s.Cache) } -func (s *BlockStore) clearCache() { +func (s *FileStore) clearCache() { s.Lock.Lock() defer s.Lock.Unlock() s.Cache = make(map[cacheKey]*CacheEntry) } //lint:ignore U1000 used for testing -func (s *BlockStore) dump() string { +func (s *FileStore) dump() string { s.Lock.Lock() defer s.Lock.Unlock() var buf bytes.Buffer - buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache))) + buf.WriteString(fmt.Sprintf("FileStore %d entries\n", len(s.Cache))) for _, v := range s.Cache { entryStr := v.dump() buf.WriteString(entryStr) @@ -79,20 +79,20 @@ func TestCreate(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() - err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + zoneId := uuid.New().String() + err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - file, err := GBS.Stat(ctx, blockId, "testfile") + file, err := WFS.Stat(ctx, zoneId, "testfile") if err != nil { t.Fatalf("error stating file: %v", err) } if file == nil { t.Fatalf("file not found") } - if file.BlockId != blockId { - t.Fatalf("block id mismatch") + if file.ZoneId != zoneId { + t.Fatalf("zone id mismatch") } if file.Name != "testfile" { t.Fatalf("name mismatch") @@ -115,30 +115,30 @@ func TestCreate(t *testing.T) { if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { t.Fatalf("opts not empty") } - blockIds, err := GBS.GetAllBlockIds(ctx) + zoneIds, err := WFS.GetAllZoneIds(ctx) if err != nil { - t.Fatalf("error getting block ids: %v", err) + t.Fatalf("error getting zone ids: %v", err) } - if len(blockIds) != 1 { - t.Fatalf("block id count mismatch") + if len(zoneIds) != 1 { + t.Fatalf("zone id count mismatch") } - if blockIds[0] != blockId { - t.Fatalf("block id mismatch") + if zoneIds[0] != zoneId { + t.Fatalf("zone id mismatch") } - err = GBS.DeleteFile(ctx, blockId, "testfile") + err = WFS.DeleteFile(ctx, zoneId, "testfile") if err != nil { t.Fatalf("error deleting file: %v", err) } - blockIds, err = GBS.GetAllBlockIds(ctx) + zoneIds, err = WFS.GetAllZoneIds(ctx) if err != nil { - t.Fatalf("error getting block ids: %v", err) + t.Fatalf("error getting zone ids: %v", err) } - if len(blockIds) != 0 { - t.Fatalf("block id count mismatch") + if len(zoneIds) != 0 { + t.Fatalf("zone id count mismatch") } } -func containsFile(arr []*BlockFile, name string) bool { +func containsFile(arr []*WaveFile, name string) bool { for _, f := range arr { if f.Name == name { return true @@ -153,30 +153,30 @@ func TestDelete(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() - err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + zoneId := uuid.New().String() + err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.DeleteFile(ctx, blockId, "testfile") + err = WFS.DeleteFile(ctx, zoneId, "testfile") if err != nil { t.Fatalf("error deleting file: %v", err) } - _, err = GBS.Stat(ctx, blockId, "testfile") + _, err = WFS.Stat(ctx, zoneId, "testfile") if err == nil || errors.Is(err, fs.ErrNotExist) { t.Errorf("expected file not found error") } - // create two files in same block, use DeleteBlock to delete - err = GBS.MakeFile(ctx, blockId, "testfile1", nil, FileOptsType{}) + // create two files in same zone, use DeleteZone to delete + err = WFS.MakeFile(ctx, zoneId, "testfile1", nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.MakeFile(ctx, blockId, "testfile2", nil, FileOptsType{}) + err = WFS.MakeFile(ctx, zoneId, "testfile2", nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - files, err := GBS.ListFiles(ctx, blockId) + files, err := WFS.ListFiles(ctx, zoneId) if err != nil { t.Fatalf("error listing files: %v", err) } @@ -186,11 +186,11 @@ func TestDelete(t *testing.T) { if !containsFile(files, "testfile1") || !containsFile(files, "testfile2") { t.Fatalf("file names mismatch") } - err = GBS.DeleteBlock(ctx, blockId) + err = WFS.DeleteZone(ctx, zoneId) if err != nil { - t.Fatalf("error deleting block: %v", err) + t.Fatalf("error deleting zone: %v", err) } - files, err = GBS.ListFiles(ctx, blockId) + files, err = WFS.ListFiles(ctx, zoneId) if err != nil { t.Fatalf("error listing files: %v", err) } @@ -216,19 +216,19 @@ func TestSetMeta(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() - err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + zoneId := uuid.New().String() + err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - if GBS.getCacheSize() != 0 { + if WFS.getCacheSize() != 0 { t.Errorf("cache size mismatch -- should have 0 entries after create") } - err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false) + err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false) if err != nil { t.Fatalf("error setting meta: %v", err) } - file, err := GBS.Stat(ctx, blockId, "testfile") + file, err := WFS.Stat(ctx, zoneId, "testfile") if err != nil { t.Fatalf("error stating file: %v", err) } @@ -236,14 +236,14 @@ func TestSetMeta(t *testing.T) { t.Fatalf("file not found") } checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta") - if GBS.getCacheSize() != 1 { + if WFS.getCacheSize() != 1 { t.Errorf("cache size mismatch") } - err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true) + err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true) if err != nil { t.Fatalf("error setting meta: %v", err) } - file, err = GBS.Stat(ctx, blockId, "testfile") + file, err = WFS.Stat(ctx, zoneId, "testfile") if err != nil { t.Fatalf("error stating file: %v", err) } @@ -252,15 +252,15 @@ func TestSetMeta(t *testing.T) { } checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta") - err = GBS.WriteMeta(ctx, blockId, "testfile-notexist", map[string]any{"a": 6}, true) + err = WFS.WriteMeta(ctx, zoneId, "testfile-notexist", map[string]any{"a": 6}, true) if err == nil { t.Fatalf("expected error setting meta") } err = nil } -func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) { - file, err := GBS.Stat(ctx, blockId, name) +func checkFileSize(t *testing.T, ctx context.Context, zoneId string, name string, size int64) { + file, err := WFS.Stat(ctx, zoneId, name) if err != nil { t.Errorf("error stating file %q: %v", name, err) return @@ -274,8 +274,8 @@ func checkFileSize(t *testing.T, ctx context.Context, blockId string, name strin } } -func checkFileData(t *testing.T, ctx context.Context, blockId string, name string, data string) { - _, rdata, err := GBS.ReadFile(ctx, blockId, name) +func checkFileData(t *testing.T, ctx context.Context, zoneId string, name string, data string) { + _, rdata, err := WFS.ReadFile(ctx, zoneId, name) if err != nil { t.Errorf("error reading data for file %q: %v", name, err) return @@ -285,8 +285,8 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin } } -func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name string, val byte, expected int) { - _, rdata, err := GBS.ReadFile(ctx, blockId, name) +func checkFileByteCount(t *testing.T, ctx context.Context, zoneId string, name string, val byte, expected int) { + _, rdata, err := WFS.ReadFile(ctx, zoneId, name) if err != nil { t.Errorf("error reading data for file %q: %v", name, err) return @@ -302,8 +302,8 @@ func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name } } -func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) { - _, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data))) +func checkFileDataAt(t *testing.T, ctx context.Context, zoneId string, name string, offset int64, data string) { + _, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, int64(len(data))) if err != nil { t.Errorf("error reading data for file %q: %v", name, err) return @@ -319,26 +319,26 @@ func TestAppend(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() + zoneId := uuid.New().String() fileName := "t2" - err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.AppendData(ctx, blockId, fileName, []byte("hello")) + err = WFS.AppendData(ctx, zoneId, fileName, []byte("hello")) if err != nil { t.Fatalf("error appending data: %v", err) } // fmt.Print(GBS.dump()) - checkFileSize(t, ctx, blockId, fileName, 5) - checkFileData(t, ctx, blockId, fileName, "hello") - err = GBS.AppendData(ctx, blockId, fileName, []byte(" world")) + checkFileSize(t, ctx, zoneId, fileName, 5) + checkFileData(t, ctx, zoneId, fileName, "hello") + err = WFS.AppendData(ctx, zoneId, fileName, []byte(" world")) if err != nil { t.Fatalf("error appending data: %v", err) } // fmt.Print(GBS.dump()) - checkFileSize(t, ctx, blockId, fileName, 11) - checkFileData(t, ctx, blockId, fileName, "hello world") + checkFileSize(t, ctx, zoneId, fileName, 11) + checkFileData(t, ctx, zoneId, fileName, "hello world") } func TestWriteFile(t *testing.T) { @@ -347,43 +347,43 @@ func TestWriteFile(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() + zoneId := uuid.New().String() fileName := "t3" - err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) + err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, fileName, "hello world!") - err = GBS.WriteFile(ctx, blockId, fileName, []byte("goodbye world!")) + checkFileData(t, ctx, zoneId, fileName, "hello world!") + err = WFS.WriteFile(ctx, zoneId, fileName, []byte("goodbye world!")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, fileName, "goodbye world!") - err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello")) + checkFileData(t, ctx, zoneId, fileName, "goodbye world!") + err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, fileName, "hello") + checkFileData(t, ctx, zoneId, fileName, "hello") // circular file - err = GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) + err = WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple")) + err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") - err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) + checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana")) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") + checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana") } func TestCircularWrites(t *testing.T) { @@ -391,66 +391,66 @@ func TestCircularWrites(t *testing.T) { defer cleanupDb(t) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() - err := GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) + zoneId := uuid.New().String() + err := WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 ")) + err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 ")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, "c1", "123456789 123456789 123456789 123456789 123456789 ") - err = GBS.AppendData(ctx, blockId, "c1", []byte("apple")) + checkFileData(t, ctx, zoneId, "c1", "123456789 123456789 123456789 123456789 123456789 ") + err = WFS.AppendData(ctx, zoneId, "c1", []byte("apple")) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") - err = GBS.WriteAt(ctx, blockId, "c1", 0, []byte("foo")) + checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = WFS.WriteAt(ctx, zoneId, "c1", 0, []byte("foo")) if err != nil { t.Fatalf("error writing data: %v", err) } // content should be unchanged because write is before the beginning of circular offset - checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") - err = GBS.WriteAt(ctx, blockId, "c1", 5, []byte("a")) + checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple") + err = WFS.WriteAt(ctx, zoneId, "c1", 5, []byte("a")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileSize(t, ctx, blockId, "c1", 55) - checkFileData(t, ctx, blockId, "c1", "a789 123456789 123456789 123456789 123456789 apple") - err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) + checkFileSize(t, ctx, zoneId, "c1", 55) + checkFileData(t, ctx, zoneId, "c1", "a789 123456789 123456789 123456789 123456789 apple") + err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana")) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileSize(t, ctx, blockId, "c1", 62) - checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") - err = GBS.WriteAt(ctx, blockId, "c1", 20, []byte("foo")) + checkFileSize(t, ctx, zoneId, "c1", 62) + checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana") + err = WFS.WriteAt(ctx, zoneId, "c1", 20, []byte("foo")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileSize(t, ctx, blockId, "c1", 62) - checkFileData(t, ctx, blockId, "c1", "3456789 foo456789 123456789 123456789 apple banana") - offset, _, _ := GBS.ReadFile(ctx, blockId, "c1") + checkFileSize(t, ctx, zoneId, "c1", 62) + checkFileData(t, ctx, zoneId, "c1", "3456789 foo456789 123456789 123456789 apple banana") + offset, _, _ := WFS.ReadFile(ctx, zoneId, "c1") if offset != 12 { t.Errorf("offset mismatch: expected 12, got %d", offset) } - err = GBS.AppendData(ctx, blockId, "c1", []byte(" world")) + err = WFS.AppendData(ctx, zoneId, "c1", []byte(" world")) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileSize(t, ctx, blockId, "c1", 68) - offset, _, _ = GBS.ReadFile(ctx, blockId, "c1") + checkFileSize(t, ctx, zoneId, "c1", 68) + offset, _, _ = WFS.ReadFile(ctx, zoneId, "c1") if offset != 18 { t.Errorf("offset mismatch: expected 18, got %d", offset) } - checkFileData(t, ctx, blockId, "c1", "9 foo456789 123456789 123456789 apple banana world") - err = GBS.AppendData(ctx, blockId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789")) + checkFileData(t, ctx, zoneId, "c1", "9 foo456789 123456789 123456789 apple banana world") + err = WFS.AppendData(ctx, zoneId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789")) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileSize(t, ctx, blockId, "c1", 128) - checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789") - err = withLock(GBS, blockId, "c1", func(entry *CacheEntry) error { + checkFileSize(t, ctx, zoneId, "c1", 128) + checkFileData(t, ctx, zoneId, "c1", " 123456789 123456789 123456789 bar456789 123456789") + err = withLock(WFS, zoneId, "c1", func(entry *CacheEntry) error { if entry == nil { return fmt.Errorf("entry not found") } @@ -478,30 +478,30 @@ func TestMultiPart(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() + zoneId := uuid.New().String() fileName := "m2" data := makeText(80) - err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.AppendData(ctx, blockId, fileName, []byte(data)) + err = WFS.AppendData(ctx, zoneId, fileName, []byte(data)) if err != nil { t.Fatalf("error appending data: %v", err) } - checkFileSize(t, ctx, blockId, fileName, 80) - checkFileData(t, ctx, blockId, fileName, data) - _, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10) + checkFileSize(t, ctx, zoneId, fileName, 80) + checkFileData(t, ctx, zoneId, fileName, data) + _, barr, err := WFS.ReadAt(ctx, zoneId, fileName, 42, 10) if err != nil { t.Fatalf("error reading data: %v", err) } if string(barr) != data[42:52] { t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr)) } - GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world")) - checkFileSize(t, ctx, blockId, fileName, 80) - checkFileDataAt(t, ctx, blockId, fileName, 49, "world") - checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4") + WFS.WriteAt(ctx, zoneId, fileName, 49, []byte("world")) + checkFileSize(t, ctx, zoneId, fileName, 80) + checkFileDataAt(t, ctx, zoneId, fileName, 49, "world") + checkFileDataAt(t, ctx, zoneId, fileName, 48, "8world4") } func testIntMapsEq(t *testing.T, msg string, m map[int]int, expected map[int]int) { @@ -521,7 +521,7 @@ func TestComputePartMap(t *testing.T) { defer func() { partDataSize = DefaultPartDataSize }() - file := &BlockFile{} + file := &WaveFile{} m := file.computePartMap(0, 250) testIntMapsEq(t, "map1", m, map[int]int{0: 100, 1: 100, 2: 50}) m = file.computePartMap(110, 40) @@ -535,7 +535,7 @@ func TestComputePartMap(t *testing.T) { testIntMapsEq(t, "map5", m, map[int]int{8: 80, 9: 100, 10: 100, 11: 60}) // now test circular - file = &BlockFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}} + file = &WaveFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}} m = file.computePartMap(10, 250) testIntMapsEq(t, "map6", m, map[int]int{0: 90, 1: 100, 2: 60}) m = file.computePartMap(990, 40) @@ -554,31 +554,31 @@ func TestSimpleDBFlush(t *testing.T) { ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() + zoneId := uuid.New().String() fileName := "t1" - err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } - err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) + err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!")) if err != nil { t.Fatalf("error writing data: %v", err) } - checkFileData(t, ctx, blockId, fileName, "hello world!") - _, err = GBS.FlushCache(ctx) + checkFileData(t, ctx, zoneId, fileName, "hello world!") + _, err = WFS.FlushCache(ctx) if err != nil { t.Fatalf("error flushing cache: %v", err) } - if GBS.getCacheSize() != 0 { + if WFS.getCacheSize() != 0 { t.Errorf("cache size mismatch") } - checkFileData(t, ctx, blockId, fileName, "hello world!") - if GBS.getCacheSize() != 0 { + checkFileData(t, ctx, zoneId, fileName, "hello world!") + if WFS.getCacheSize() != 0 { t.Errorf("cache size mismatch (after read)") } - checkFileDataAt(t, ctx, blockId, fileName, 6, "world!") - checkFileSize(t, ctx, blockId, fileName, 12) - checkFileByteCount(t, ctx, blockId, fileName, 'l', 3) + checkFileDataAt(t, ctx, zoneId, fileName, 6, "world!") + checkFileSize(t, ctx, zoneId, fileName, 12) + checkFileByteCount(t, ctx, zoneId, fileName, 'l', 3) } func TestConcurrentAppend(t *testing.T) { @@ -586,9 +586,9 @@ func TestConcurrentAppend(t *testing.T) { defer cleanupDb(t) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) defer cancelFn() - blockId := uuid.New().String() + zoneId := uuid.New().String() fileName := "t1" - err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{}) if err != nil { t.Fatalf("error creating file: %v", err) } @@ -600,23 +600,23 @@ func TestConcurrentAppend(t *testing.T) { const hexChars = "0123456789abcdef" ch := hexChars[n] for j := 0; j < 100; j++ { - err := GBS.AppendData(ctx, blockId, fileName, []byte{ch}) + err := WFS.AppendData(ctx, zoneId, fileName, []byte{ch}) if err != nil { t.Errorf("error appending data (%d): %v", n, err) } if j == 50 { // ignore error here (concurrent flushing) - GBS.FlushCache(ctx) + WFS.FlushCache(ctx) } } }(i) } wg.Wait() - checkFileSize(t, ctx, blockId, fileName, 1600) - checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) - checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) - GBS.FlushCache(ctx) - checkFileSize(t, ctx, blockId, fileName, 1600) - checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) - checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) + checkFileSize(t, ctx, zoneId, fileName, 1600) + checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100) + checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100) + WFS.FlushCache(ctx) + checkFileSize(t, ctx, zoneId, fileName, 1600) + checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100) + checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100) } diff --git a/pkg/service/fileservice/fileservice.go b/pkg/service/fileservice/fileservice.go index 0178d6f1b..01fbca6bd 100644 --- a/pkg/service/fileservice/fileservice.go +++ b/pkg/service/fileservice/fileservice.go @@ -4,17 +4,21 @@ package fileservice import ( + "context" "encoding/base64" "encoding/json" "fmt" "os" "path/filepath" + "time" + "github.com/wavetermdev/thenextwave/pkg/filestore" "github.com/wavetermdev/thenextwave/pkg/util/utilfn" "github.com/wavetermdev/thenextwave/pkg/wavebase" ) const MaxFileSize = 10 * 1024 * 1024 // 10M +const DefaultTimeout = 2 * time.Second type FileService struct{} @@ -104,3 +108,13 @@ func (fs *FileService) ReadFile(path string) (*FullFile, error) { Data64: base64.StdEncoding.EncodeToString(barr), }, nil } + +func (fs *FileService) GetWaveFile(id string, path string) (any, error) { + ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancelFn() + file, err := filestore.WFS.Stat(ctx, id, path) + if err != nil { + return nil, fmt.Errorf("error getting file: %w", err) + } + return file, nil +}