rename blockstore to filestore. rename blockid to zoneid.

This commit is contained in:
sawka 2024-06-03 13:03:21 -07:00
parent f7c2897904
commit c191fc8945
14 changed files with 385 additions and 367 deletions

View File

@ -5,8 +5,8 @@ package db
import "embed" import "embed"
//go:embed migrations-blockstore/*.sql //go:embed migrations-filestore/*.sql
var BlockstoreMigrationFS embed.FS var FilestoreMigrationFS embed.FS
//go:embed migrations-wstore/*.sql //go:embed migrations-wstore/*.sql
var WStoreMigrationFS embed.FS var WStoreMigrationFS embed.FS

View File

@ -1,3 +0,0 @@
DROP TABLE block_file;
DROP TABLE block_data;

View File

@ -0,0 +1,3 @@
DROP TABLE db_wave_file;
DROP TABLE db_file_data;

View File

@ -1,19 +1,19 @@
CREATE TABLE db_block_file ( CREATE TABLE db_wave_file (
blockid varchar(36) NOT NULL, zoneid varchar(36) NOT NULL,
name varchar(200) NOT NULL, name varchar(200) NOT NULL,
size bigint NOT NULL, size bigint NOT NULL,
createdts bigint NOT NULL, createdts bigint NOT NULL,
modts bigint NOT NULL, modts bigint NOT NULL,
opts json NOT NULL, opts json NOT NULL,
meta json NOT NULL, meta json NOT NULL,
PRIMARY KEY (blockid, name) PRIMARY KEY (zoneid, name)
); );
CREATE TABLE db_block_data ( CREATE TABLE db_file_data (
blockid varchar(36) NOT NULL, zoneid varchar(36) NOT NULL,
name varchar(200) NOT NULL, name varchar(200) NOT NULL,
partidx int NOT NULL, partidx int NOT NULL,
data blob NOT NULL, data blob NOT NULL,
PRIMARY KEY(blockid, name, partidx) PRIMARY KEY(zoneid, name, partidx)
); );

14
main.go
View File

@ -22,8 +22,8 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service/blockservice" "github.com/wavetermdev/thenextwave/pkg/service/blockservice"
"github.com/wavetermdev/thenextwave/pkg/service/clientservice" "github.com/wavetermdev/thenextwave/pkg/service/clientservice"
"github.com/wavetermdev/thenextwave/pkg/service/fileservice" "github.com/wavetermdev/thenextwave/pkg/service/fileservice"
@ -114,7 +114,7 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) {
return return
} }
file, err := blockstore.GBS.Stat(r.Context(), blockId, name) file, err := filestore.WFS.Stat(r.Context(), blockId, name)
if err == fs.ErrNotExist { if err == fs.ErrNotExist {
http.NotFound(w, r) http.NotFound(w, r)
return return
@ -131,8 +131,8 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size)) w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size))
w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr)) w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr))
w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat)) w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat))
for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize { for offset := file.DataStartIdx(); offset < file.Size; offset += filestore.DefaultPartDataSize {
_, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize) _, data, err := filestore.WFS.ReadAt(r.Context(), blockId, name, offset, filestore.DefaultPartDataSize)
if err != nil { if err != nil {
if offset == 0 { if offset == 0 {
http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError)
@ -174,7 +174,7 @@ func doShutdown(reason string) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
// TODO deal with flush in progress // TODO deal with flush in progress
blockstore.GBS.FlushCache(ctx) filestore.WFS.FlushCache(ctx)
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
os.Exit(0) os.Exit(0)
} }
@ -203,9 +203,9 @@ func main() {
} }
log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir()) log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir())
err = blockstore.InitBlockstore() err = filestore.InitFilestore()
if err != nil { if err != nil {
log.Printf("error initializing blockstore: %v\n", err) log.Printf("error initializing filestore: %v\n", err)
return return
} }
err = wstore.InitWStore() err = wstore.InitWStore()

View File

@ -16,8 +16,8 @@ import (
"github.com/creack/pty" "github.com/creack/pty"
"github.com/wailsapp/wails/v3/pkg/application" "github.com/wailsapp/wails/v3/pkg/application"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus" "github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/shellexec" "github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/wstore" "github.com/wavetermdev/thenextwave/pkg/wstore"
) )
@ -93,7 +93,7 @@ const DefaultTermMaxFileSize = 256 * 1024
func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error { func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout) ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn() defer cancelFn()
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", data) err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", data)
if err != nil { if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err) return fmt.Errorf("error appending to blockfile: %w", err)
} }
@ -118,7 +118,7 @@ func (bc *BlockController) resetTerminalState() {
buf.WriteString("\x1b[?25h") // show cursor buf.WriteString("\x1b[?25h") // show cursor
buf.WriteString("\x1b[?1000l") // disable mouse tracking buf.WriteString("\x1b[?1000l") // disable mouse tracking
buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n") buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n")
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", buf.Bytes()) err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", buf.Bytes())
if err != nil { if err != nil {
log.Printf("error appending to blockfile (terminal reset): %v\n", err) log.Printf("error appending to blockfile (terminal reset): %v\n", err)
} }
@ -128,11 +128,11 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error {
// create a circular blockfile for the output // create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn() defer cancelFn()
err := blockstore.GBS.MakeFile(ctx, bc.BlockId, "main", nil, blockstore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true}) err := filestore.WFS.MakeFile(ctx, bc.BlockId, "main", nil, filestore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true})
if err != nil && err != blockstore.ErrAlreadyExists { if err != nil && err != filestore.ErrAlreadyExists {
return fmt.Errorf("error creating blockfile: %w", err) return fmt.Errorf("error creating blockfile: %w", err)
} }
if err == blockstore.ErrAlreadyExists { if err == filestore.ErrAlreadyExists {
// reset the terminal state // reset the terminal state
bc.resetTerminalState() bc.resetTerminalState()
} }

View File

@ -0,0 +1,4 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockcontroller

View File

@ -1,117 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
"context"
"fmt"
"os"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrAlreadyExists = fmt.Errorf("file already exists")
func dbInsertFile(ctx context.Context, file *BlockFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?"
if tx.Exists(query, file.BlockId, file.Name) {
return ErrAlreadyExists
}
query = "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})
}
func dbDeleteFile(ctx context.Context, blockId string, name string) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
return nil
})
}
func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var files []string
query := "SELECT name FROM db_block_file WHERE blockid = ?"
tx.Select(&files, query, blockId)
return files, nil
})
}
func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name)
return file, nil
})
}
func dbGetAllBlockIds(ctx context.Context) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var ids []string
query := "SELECT DISTINCT blockid FROM db_block_file"
tx.Select(&ids, query)
return ids, nil
})
}
func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
return nil, nil
}
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
var data []*DataCacheEntry
query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts))
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data)
d.Data = newData
}
rtn[d.PartIdx] = d
}
return rtn, nil
})
}
func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ?"
files := dbutil.SelectMappable[*BlockFile](tx, query, blockId)
return files, nil
})
}
func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
if !tx.Exists(query, file.BlockId, file.Name) {
// since deletion is synchronous this stops us from writing to a deleted file
return os.ErrNotExist
}
// we don't update CreatedTs or Opts
query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?`
tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name)
if replace {
query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`
tx.Exec(query, file.BlockId, file.Name)
}
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
for partIdx, dataEntry := range dataEntries {
if partIdx != dataEntry.PartIdx {
panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx))
}
tx.Exec(dataPartQuery, file.BlockId, file.Name, dataEntry.PartIdx, dataEntry.Data)
}
return nil
})
}

View File

@ -1,9 +1,9 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
package blockstore package filestore
// the blockstore package implements a write cache for block files // the blockstore package implements a write cache for wave files
// it is not a read cache (reads still go to the DB -- unless items are in the cache) // it is not a read cache (reads still go to the DB -- unless items are in the cache)
// but all writes only go to the cache, and then the cache is periodically flushed to the DB // but all writes only go to the cache, and then the cache is periodically flushed to the DB
@ -29,7 +29,7 @@ var flushErrorCount = &atomic.Int32{}
var partDataSize int64 = DefaultPartDataSize // overridden in tests var partDataSize int64 = DefaultPartDataSize // overridden in tests
var stopFlush = &atomic.Bool{} var stopFlush = &atomic.Bool{}
var GBS *BlockStore = &BlockStore{ var WFS *FileStore = &FileStore{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
Cache: make(map[cacheKey]*CacheEntry), Cache: make(map[cacheKey]*CacheEntry),
} }
@ -42,9 +42,9 @@ type FileOptsType struct {
type FileMeta = map[string]any type FileMeta = map[string]any
type BlockFile struct { type WaveFile struct {
// these fields are static (not updated) // these fields are static (not updated)
BlockId string `json:"blockid"` ZoneId string `json:"zoneid"`
Name string `json:"name"` Name string `json:"name"`
Opts FileOptsType `json:"opts"` Opts FileOptsType `json:"opts"`
CreatedTs int64 `json:"createdts"` CreatedTs int64 `json:"createdts"`
@ -57,7 +57,7 @@ type BlockFile struct {
// for regular files this is just Size // for regular files this is just Size
// for circular files this is min(Size, MaxSize) // for circular files this is min(Size, MaxSize)
func (f BlockFile) DataLength() int64 { func (f WaveFile) DataLength() int64 {
if f.Opts.Circular { if f.Opts.Circular {
return minInt64(f.Size, f.Opts.MaxSize) return minInt64(f.Size, f.Opts.MaxSize)
} }
@ -66,7 +66,7 @@ func (f BlockFile) DataLength() int64 {
// for regular files this is just 0 // for regular files this is just 0
// for circular files this is the index of the first byte of data we have // for circular files this is the index of the first byte of data we have
func (f BlockFile) DataStartIdx() int64 { func (f WaveFile) DataStartIdx() int64 {
if f.Opts.Circular && f.Size > f.Opts.MaxSize { if f.Opts.Circular && f.Size > f.Opts.MaxSize {
return f.Size - f.Opts.MaxSize return f.Size - f.Opts.MaxSize
} }
@ -82,7 +82,7 @@ func copyMeta(meta FileMeta) FileMeta {
return newMeta return newMeta
} }
func (f *BlockFile) DeepCopy() *BlockFile { func (f *WaveFile) DeepCopy() *WaveFile {
if f == nil { if f == nil {
return nil return nil
} }
@ -91,19 +91,19 @@ func (f *BlockFile) DeepCopy() *BlockFile {
return &newFile return &newFile
} }
func (BlockFile) UseDBMap() {} func (WaveFile) UseDBMap() {}
type BlockData struct { type FileData struct {
BlockId string `json:"blockid"` ZoneId string `json:"zoneid"`
Name string `json:"name"` Name string `json:"name"`
PartIdx int `json:"partidx"` PartIdx int `json:"partidx"`
Data []byte `json:"data"` Data []byte `json:"data"`
} }
func (BlockData) UseDBMap() {} func (FileData) UseDBMap() {}
// synchronous (does not interact with the cache) // synchronous (does not interact with the cache)
func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error { func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, meta FileMeta, opts FileOptsType) error {
if opts.MaxSize < 0 { if opts.MaxSize < 0 {
return fmt.Errorf("max size must be non-negative") return fmt.Errorf("max size must be non-negative")
} }
@ -118,13 +118,13 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
} }
} }
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
if entry.File != nil { if entry.File != nil {
return fs.ErrExist return fs.ErrExist
} }
now := time.Now().UnixMilli() now := time.Now().UnixMilli()
file := &BlockFile{ file := &WaveFile{
BlockId: blockId, ZoneId: zoneId,
Name: name, Name: name,
Size: 0, Size: 0,
CreatedTs: now, CreatedTs: now,
@ -136,9 +136,9 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
}) })
} }
func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error { func (s *FileStore) DeleteFile(ctx context.Context, zoneId string, name string) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := dbDeleteFile(ctx, blockId, name) err := dbDeleteFile(ctx, zoneId, name)
if err != nil { if err != nil {
return fmt.Errorf("error deleting file: %v", err) return fmt.Errorf("error deleting file: %v", err)
} }
@ -147,20 +147,20 @@ func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string
}) })
} }
func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { func (s *FileStore) DeleteZone(ctx context.Context, zoneId string) error {
fileNames, err := dbGetBlockFileNames(ctx, blockId) fileNames, err := dbGetZoneFileNames(ctx, zoneId)
if err != nil { if err != nil {
return fmt.Errorf("error getting block files: %v", err) return fmt.Errorf("error getting zone files: %v", err)
} }
for _, name := range fileNames { for _, name := range fileNames {
s.DeleteFile(ctx, blockId, name) s.DeleteFile(ctx, zoneId, name)
} }
return nil return nil
} }
// if file doesn't exsit, returns fs.ErrNotExist // if file doesn't exsit, returns fs.ErrNotExist
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) { func (s *FileStore) Stat(ctx context.Context, zoneId string, name string) (*WaveFile, error) {
return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) { return withLockRtn(s, zoneId, name, func(entry *CacheEntry) (*WaveFile, error) {
file, err := entry.loadFileForRead(ctx) file, err := entry.loadFileForRead(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting file: %v", err) return nil, fmt.Errorf("error getting file: %v", err)
@ -169,13 +169,13 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl
}) })
} }
func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { func (s *FileStore) ListFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) {
files, err := dbGetBlockFiles(ctx, blockId) files, err := dbGetZoneFiles(ctx, zoneId)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting block files: %v", err) return nil, fmt.Errorf("error getting zone files: %v", err)
} }
for idx, file := range files { for idx, file := range files {
withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error { withLock(s, file.ZoneId, file.Name, func(entry *CacheEntry) error {
if entry.File != nil { if entry.File != nil {
files[idx] = entry.File.DeepCopy() files[idx] = entry.File.DeepCopy()
} }
@ -185,8 +185,8 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil
return files, nil return files, nil
} }
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error { func (s *FileStore) WriteMeta(ctx context.Context, zoneId string, name string, meta FileMeta, merge bool) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx) err := entry.loadFileIntoCache(ctx)
if err != nil { if err != nil {
return err return err
@ -207,8 +207,8 @@ func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string,
}) })
} }
func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error { func (s *FileStore) WriteFile(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx) err := entry.loadFileIntoCache(ctx)
if err != nil { if err != nil {
return err return err
@ -219,11 +219,11 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string,
}) })
} }
func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, offset int64, data []byte) error {
if offset < 0 { if offset < 0 {
return fmt.Errorf("offset must be non-negative") return fmt.Errorf("offset must be non-negative")
} }
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx) err := entry.loadFileIntoCache(ctx)
if err != nil { if err != nil {
return err return err
@ -243,8 +243,8 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
}) })
} }
func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error { return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx) err := entry.loadFileIntoCache(ctx)
if err != nil { if err != nil {
return err return err
@ -262,14 +262,14 @@ func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string
}) })
} }
func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) {
return dbGetAllBlockIds(ctx) return dbGetAllZoneIds(ctx)
} }
// returns (offset, data, error) // returns (offset, data, error)
// we return the offset because the offset may have been adjusted if the size was too big (for circular files) // we return the offset because the offset may have been adjusted if the size was too big (for circular files)
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) { func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, blockId, name, func(entry *CacheEntry) error { withLock(s, zoneId, name, func(entry *CacheEntry) error {
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false) rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
return nil return nil
}) })
@ -277,8 +277,8 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
} }
// returns (offset, data, error) // returns (offset, data, error)
func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) { func (s *FileStore) ReadFile(ctx context.Context, zoneId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, blockId, name, func(entry *CacheEntry) error { withLock(s, zoneId, name, func(entry *CacheEntry) error {
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true) rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true)
return nil return nil
}) })
@ -291,7 +291,7 @@ type FlushStats struct {
NumCommitted int NumCommitted int
} }
func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) { func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
wasFlushing := s.setUnlessFlushing() wasFlushing := s.setUnlessFlushing()
if wasFlushing { if wasFlushing {
return stats, fmt.Errorf("flush already in progress") return stats, fmt.Errorf("flush already in progress")
@ -306,7 +306,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e
dirtyCacheKeys := s.getDirtyCacheKeys() dirtyCacheKeys := s.getDirtyCacheKeys()
stats.NumDirtyEntries = len(dirtyCacheKeys) stats.NumDirtyEntries = len(dirtyCacheKeys)
for _, key := range dirtyCacheKeys { for _, key := range dirtyCacheKeys {
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error { err := withLock(s, key.ZoneId, key.Name, func(entry *CacheEntry) error {
return entry.flushToDB(ctx, false) return entry.flushToDB(ctx, false)
}) })
if ctx.Err() != nil { if ctx.Err() != nil {
@ -323,7 +323,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e
/////////////////////////////////// ///////////////////////////////////
func (f *BlockFile) partIdxAtOffset(offset int64) int { func (f *WaveFile) partIdxAtOffset(offset int64) int {
partIdx := int(offset / partDataSize) partIdx := int(offset / partDataSize)
if f.Opts.Circular { if f.Opts.Circular {
maxPart := int(f.Opts.MaxSize / partDataSize) maxPart := int(f.Opts.MaxSize / partDataSize)
@ -351,11 +351,11 @@ func getPartIdxsFromMap(partMap map[int]int) []int {
} }
// returns a map of partIdx to amount of data to write to that part // returns a map of partIdx to amount of data to write to that part
func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int { func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int {
partMap := make(map[int]int) partMap := make(map[int]int)
endOffset := startOffset + size endOffset := startOffset + size
startBlockOffset := startOffset - (startOffset % partDataSize) startFileOffset := startOffset - (startOffset % partDataSize)
for testOffset := startBlockOffset; testOffset < endOffset; testOffset += partDataSize { for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize {
partIdx := file.partIdxAtOffset(testOffset) partIdx := file.partIdxAtOffset(testOffset)
partStartOffset := testOffset partStartOffset := testOffset
partEndOffset := testOffset + partDataSize partEndOffset := testOffset + partDataSize
@ -372,7 +372,7 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int
return partMap return partMap
} }
func (s *BlockStore) getDirtyCacheKeys() []cacheKey { func (s *FileStore) getDirtyCacheKeys() []cacheKey {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
var dirtyCacheKeys []cacheKey var dirtyCacheKeys []cacheKey
@ -384,14 +384,14 @@ func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
return dirtyCacheKeys return dirtyCacheKeys
} }
func (s *BlockStore) setIsFlushing(flushing bool) { func (s *FileStore) setIsFlushing(flushing bool) {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
s.IsFlushing = flushing s.IsFlushing = flushing
} }
// returns old value of IsFlushing // returns old value of IsFlushing
func (s *BlockStore) setUnlessFlushing() bool { func (s *FileStore) setUnlessFlushing() bool {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
if s.IsFlushing { if s.IsFlushing {
@ -401,26 +401,26 @@ func (s *BlockStore) setUnlessFlushing() bool {
return false return false
} }
func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) { func (s *FileStore) runFlushWithNewContext() (FlushStats, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime) ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
defer cancelFn() defer cancelFn()
return s.FlushCache(ctx) return s.FlushCache(ctx)
} }
func (s *BlockStore) runFlusher() { func (s *FileStore) runFlusher() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Printf("panic in blockstore flusher: %v\n", r) log.Printf("panic in filestore flusher: %v\n", r)
debug.PrintStack() debug.PrintStack()
} }
}() }()
for { for {
stats, err := s.runFlushWithNewContext() stats, err := s.runFlushWithNewContext()
if err != nil || stats.NumDirtyEntries > 0 { if err != nil || stats.NumDirtyEntries > 0 {
log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err) log.Printf("filestore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
} }
if stopFlush.Load() { if stopFlush.Load() {
log.Printf("blockstore flusher stopping\n") log.Printf("filestore flusher stopping\n")
return return
} }
time.Sleep(DefaultFlushTime) time.Sleep(DefaultFlushTime)

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
package blockstore package filestore
import ( import (
"bytes" "bytes"
@ -13,11 +13,11 @@ import (
) )
type cacheKey struct { type cacheKey struct {
BlockId string ZoneId string
Name string Name string
} }
type BlockStore struct { type FileStore struct {
Lock *sync.Mutex Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry Cache map[cacheKey]*CacheEntry
IsFlushing bool IsFlushing bool
@ -25,17 +25,17 @@ type BlockStore struct {
type DataCacheEntry struct { type DataCacheEntry struct {
PartIdx int PartIdx int
Data []byte // capacity is always BlockDataPartSize Data []byte // capacity is always ZoneDataPartSize
} }
// if File or DataEntries are not nil then they are dirty (need to be flushed to disk) // if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
type CacheEntry struct { type CacheEntry struct {
PinCount int // this is synchronzed with the BlockStore lock (not the entry lock) PinCount int // this is synchronzed with the FileStore lock (not the entry lock)
Lock *sync.Mutex Lock *sync.Mutex
BlockId string ZoneId string
Name string Name string
File *BlockFile File *WaveFile
DataEntries map[int]*DataCacheEntry DataEntries map[int]*DataCacheEntry
FlushErrors int FlushErrors int
} }
@ -43,7 +43,7 @@ type CacheEntry struct {
//lint:ignore U1000 used for testing //lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string { func (e *CacheEntry) dump() string {
var buf bytes.Buffer var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount) fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount)
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File) fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries { for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data)) fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
@ -59,28 +59,28 @@ func makeDataCacheEntry(partIdx int) *DataCacheEntry {
} }
// will create new entries // will create new entries
func (s *BlockStore) getEntryAndPin(blockId string, name string) *CacheEntry { func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil { if entry == nil {
entry = makeCacheEntry(blockId, name) entry = makeCacheEntry(zoneId, name)
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry
} }
entry.PinCount++ entry.PinCount++
return entry return entry
} }
func (s *BlockStore) unpinEntryAndTryDelete(blockId string, name string) { func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil { if entry == nil {
return return
} }
entry.PinCount-- entry.PinCount--
if entry.PinCount <= 0 && entry.File == nil { if entry.PinCount <= 0 && entry.File == nil {
delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name})
} }
} }
@ -111,11 +111,11 @@ func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
} }
// does not populate the cache entry, returns err if file does not exist // does not populate the cache entry, returns err if file does not exist
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error) { func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) {
if entry.File != nil { if entry.File != nil {
return entry.File, nil return entry.File, nil
} }
file, err := dbGetBlockFile(ctx, entry.BlockId, entry.Name) file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting file: %w", err) return nil, fmt.Errorf("error getting file: %w", err)
} }
@ -125,17 +125,17 @@ func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error
return file, nil return file, nil
} }
func withLock(s *BlockStore, blockId string, name string, fn func(*CacheEntry) error) error { func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error {
entry := s.getEntryAndPin(blockId, name) entry := s.getEntryAndPin(zoneId, name)
defer s.unpinEntryAndTryDelete(blockId, name) defer s.unpinEntryAndTryDelete(zoneId, name)
entry.Lock.Lock() entry.Lock.Lock()
defer entry.Lock.Unlock() defer entry.Lock.Unlock()
return fn(entry) return fn(entry)
} }
func withLockRtn[T any](s *BlockStore, blockId string, name string, fn func(*CacheEntry) (T, error)) (T, error) { func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
var rtnVal T var rtnVal T
rtnErr := withLock(s, blockId, name, func(entry *CacheEntry) error { rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error {
var err error var err error
rtnVal, err = fn(entry) rtnVal, err = fn(entry)
return err return err
@ -273,7 +273,7 @@ func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int
// parts are already loaded // parts are already loaded
return nil return nil
} }
dbDataParts, err := dbGetFileParts(ctx, entry.BlockId, entry.Name, parts) dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts)
if err != nil { if err != nil {
return fmt.Errorf("error getting data parts: %w", err) return fmt.Errorf("error getting data parts: %w", err)
} }
@ -291,7 +291,7 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int)
var dbDataParts map[int]*DataCacheEntry var dbDataParts map[int]*DataCacheEntry
if len(dbParts) > 0 { if len(dbParts) > 0 {
var err error var err error
dbDataParts, err = dbGetFileParts(ctx, entry.BlockId, entry.Name, dbParts) dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting data parts: %w", err) return nil, fmt.Errorf("error getting data parts: %w", err)
} }
@ -311,10 +311,10 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int)
return rtn, nil return rtn, nil
} }
func makeCacheEntry(blockId string, name string) *CacheEntry { func makeCacheEntry(zoneId string, name string) *CacheEntry {
return &CacheEntry{ return &CacheEntry{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
BlockId: blockId, ZoneId: zoneId,
Name: name, Name: name,
PinCount: 0, PinCount: 0,
File: nil, File: nil,

View File

@ -0,0 +1,117 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package filestore
import (
"context"
"fmt"
"os"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrAlreadyExists = fmt.Errorf("file already exists")
func dbInsertFile(ctx context.Context, file *WaveFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?"
if tx.Exists(query, file.ZoneId, file.Name) {
return ErrAlreadyExists
}
query = "INSERT INTO db_wave_file (zoneid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.ZoneId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})
}
func dbDeleteFile(ctx context.Context, zoneId string, name string) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := "DELETE FROM db_wave_file WHERE zoneid = ? AND name = ?"
tx.Exec(query, zoneId, name)
query = "DELETE FROM db_file_data WHERE zoneid = ? AND name = ?"
tx.Exec(query, zoneId, name)
return nil
})
}
func dbGetZoneFileNames(ctx context.Context, zoneId string) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var files []string
query := "SELECT name FROM db_wave_file WHERE zoneid = ?"
tx.Select(&files, query, zoneId)
return files, nil
})
}
func dbGetZoneFile(ctx context.Context, zoneId string, name string) (*WaveFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*WaveFile, error) {
query := "SELECT * FROM db_wave_file WHERE zoneid = ? AND name = ?"
file := dbutil.GetMappable[*WaveFile](tx, query, zoneId, name)
return file, nil
})
}
func dbGetAllZoneIds(ctx context.Context) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var ids []string
query := "SELECT DISTINCT zoneid FROM db_wave_file"
tx.Select(&ids, query)
return ids, nil
})
}
func dbGetFileParts(ctx context.Context, zoneId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
return nil, nil
}
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
var data []*DataCacheEntry
query := "SELECT partidx, data FROM db_file_data WHERE zoneid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
tx.Select(&data, query, zoneId, name, dbutil.QuickJsonArr(parts))
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data)
d.Data = newData
}
rtn[d.PartIdx] = d
}
return rtn, nil
})
}
func dbGetZoneFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*WaveFile, error) {
query := "SELECT * FROM db_wave_file WHERE zoneid = ?"
files := dbutil.SelectMappable[*WaveFile](tx, query, zoneId)
return files, nil
})
}
func dbWriteCacheEntry(ctx context.Context, file *WaveFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?`
if !tx.Exists(query, file.ZoneId, file.Name) {
// since deletion is synchronous this stops us from writing to a deleted file
return os.ErrNotExist
}
// we don't update CreatedTs or Opts
query = `UPDATE db_wave_file SET size = ?, modts = ?, meta = ? WHERE zoneid = ? AND name = ?`
tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.ZoneId, file.Name)
if replace {
query = `DELETE FROM db_file_data WHERE zoneid = ? AND name = ?`
tx.Exec(query, file.ZoneId, file.Name)
}
dataPartQuery := `REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?, ?, ?, ?)`
for partIdx, dataEntry := range dataEntries {
if partIdx != dataEntry.PartIdx {
panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx))
}
tx.Exec(dataPartQuery, file.ZoneId, file.Name, dataEntry.PartIdx, dataEntry.Data)
}
return nil
})
}

View File

@ -1,9 +1,9 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
package blockstore package filestore
// setup for blockstore db // setup for filestore db
// includes migration support and txwrap setup // includes migration support and txwrap setup
import ( import (
@ -23,14 +23,14 @@ import (
dbfs "github.com/wavetermdev/thenextwave/db" dbfs "github.com/wavetermdev/thenextwave/db"
) )
const BlockstoreDBName = "blockstore.db" const FilestoreDBName = "filestore.db"
type TxWrap = txwrap.TxWrap type TxWrap = txwrap.TxWrap
var globalDB *sqlx.DB var globalDB *sqlx.DB
var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db) var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
func InitBlockstore() error { func InitFilestore() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn() defer cancelFn()
var err error var err error
@ -38,20 +38,20 @@ func InitBlockstore() error {
if err != nil { if err != nil {
return err return err
} }
err = migrateutil.Migrate("blockstore", globalDB.DB, dbfs.BlockstoreMigrationFS, "migrations-blockstore") err = migrateutil.Migrate("filestore", globalDB.DB, dbfs.FilestoreMigrationFS, "migrations-filestore")
if err != nil { if err != nil {
return err return err
} }
if !stopFlush.Load() { if !stopFlush.Load() {
go GBS.runFlusher() go WFS.runFlusher()
} }
log.Printf("blockstore initialized\n") log.Printf("filestore initialized\n")
return nil return nil
} }
func GetDBName() string { func GetDBName() string {
waveHome := wavebase.GetWaveHomeDir() waveHome := wavebase.GetWaveHomeDir()
return path.Join(waveHome, BlockstoreDBName) return path.Join(waveHome, FilestoreDBName)
} }
func MakeDB(ctx context.Context) (*sqlx.DB, error) { func MakeDB(ctx context.Context) (*sqlx.DB, error) {

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc. // Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
package blockstore package filestore
import ( import (
"bytes" "bytes"
@ -24,9 +24,9 @@ func initDb(t *testing.T) {
partDataSize = 50 partDataSize = 50
warningCount = &atomic.Int32{} warningCount = &atomic.Int32{}
stopFlush.Store(true) stopFlush.Store(true)
err := InitBlockstore() err := InitFilestore()
if err != nil { if err != nil {
t.Fatalf("error initializing blockstore: %v", err) t.Fatalf("error initializing filestore: %v", err)
} }
} }
@ -38,7 +38,7 @@ func cleanupDb(t *testing.T) {
} }
useTestingDb = false useTestingDb = false
partDataSize = DefaultPartDataSize partDataSize = DefaultPartDataSize
GBS.clearCache() WFS.clearCache()
if warningCount.Load() > 0 { if warningCount.Load() > 0 {
t.Errorf("warning count: %d", warningCount.Load()) t.Errorf("warning count: %d", warningCount.Load())
} }
@ -47,24 +47,24 @@ func cleanupDb(t *testing.T) {
} }
} }
func (s *BlockStore) getCacheSize() int { func (s *FileStore) getCacheSize() int {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
return len(s.Cache) return len(s.Cache)
} }
func (s *BlockStore) clearCache() { func (s *FileStore) clearCache() {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
s.Cache = make(map[cacheKey]*CacheEntry) s.Cache = make(map[cacheKey]*CacheEntry)
} }
//lint:ignore U1000 used for testing //lint:ignore U1000 used for testing
func (s *BlockStore) dump() string { func (s *FileStore) dump() string {
s.Lock.Lock() s.Lock.Lock()
defer s.Lock.Unlock() defer s.Lock.Unlock()
var buf bytes.Buffer var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache))) buf.WriteString(fmt.Sprintf("FileStore %d entries\n", len(s.Cache)))
for _, v := range s.Cache { for _, v := range s.Cache {
entryStr := v.dump() entryStr := v.dump()
buf.WriteString(entryStr) buf.WriteString(entryStr)
@ -79,20 +79,20 @@ func TestCreate(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
file, err := GBS.Stat(ctx, blockId, "testfile") file, err := WFS.Stat(ctx, zoneId, "testfile")
if err != nil { if err != nil {
t.Fatalf("error stating file: %v", err) t.Fatalf("error stating file: %v", err)
} }
if file == nil { if file == nil {
t.Fatalf("file not found") t.Fatalf("file not found")
} }
if file.BlockId != blockId { if file.ZoneId != zoneId {
t.Fatalf("block id mismatch") t.Fatalf("zone id mismatch")
} }
if file.Name != "testfile" { if file.Name != "testfile" {
t.Fatalf("name mismatch") t.Fatalf("name mismatch")
@ -115,30 +115,30 @@ func TestCreate(t *testing.T) {
if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 {
t.Fatalf("opts not empty") t.Fatalf("opts not empty")
} }
blockIds, err := GBS.GetAllBlockIds(ctx) zoneIds, err := WFS.GetAllZoneIds(ctx)
if err != nil { if err != nil {
t.Fatalf("error getting block ids: %v", err) t.Fatalf("error getting zone ids: %v", err)
} }
if len(blockIds) != 1 { if len(zoneIds) != 1 {
t.Fatalf("block id count mismatch") t.Fatalf("zone id count mismatch")
} }
if blockIds[0] != blockId { if zoneIds[0] != zoneId {
t.Fatalf("block id mismatch") t.Fatalf("zone id mismatch")
} }
err = GBS.DeleteFile(ctx, blockId, "testfile") err = WFS.DeleteFile(ctx, zoneId, "testfile")
if err != nil { if err != nil {
t.Fatalf("error deleting file: %v", err) t.Fatalf("error deleting file: %v", err)
} }
blockIds, err = GBS.GetAllBlockIds(ctx) zoneIds, err = WFS.GetAllZoneIds(ctx)
if err != nil { if err != nil {
t.Fatalf("error getting block ids: %v", err) t.Fatalf("error getting zone ids: %v", err)
} }
if len(blockIds) != 0 { if len(zoneIds) != 0 {
t.Fatalf("block id count mismatch") t.Fatalf("zone id count mismatch")
} }
} }
func containsFile(arr []*BlockFile, name string) bool { func containsFile(arr []*WaveFile, name string) bool {
for _, f := range arr { for _, f := range arr {
if f.Name == name { if f.Name == name {
return true return true
@ -153,30 +153,30 @@ func TestDelete(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.DeleteFile(ctx, blockId, "testfile") err = WFS.DeleteFile(ctx, zoneId, "testfile")
if err != nil { if err != nil {
t.Fatalf("error deleting file: %v", err) t.Fatalf("error deleting file: %v", err)
} }
_, err = GBS.Stat(ctx, blockId, "testfile") _, err = WFS.Stat(ctx, zoneId, "testfile")
if err == nil || errors.Is(err, fs.ErrNotExist) { if err == nil || errors.Is(err, fs.ErrNotExist) {
t.Errorf("expected file not found error") t.Errorf("expected file not found error")
} }
// create two files in same block, use DeleteBlock to delete // create two files in same zone, use DeleteZone to delete
err = GBS.MakeFile(ctx, blockId, "testfile1", nil, FileOptsType{}) err = WFS.MakeFile(ctx, zoneId, "testfile1", nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.MakeFile(ctx, blockId, "testfile2", nil, FileOptsType{}) err = WFS.MakeFile(ctx, zoneId, "testfile2", nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
files, err := GBS.ListFiles(ctx, blockId) files, err := WFS.ListFiles(ctx, zoneId)
if err != nil { if err != nil {
t.Fatalf("error listing files: %v", err) t.Fatalf("error listing files: %v", err)
} }
@ -186,11 +186,11 @@ func TestDelete(t *testing.T) {
if !containsFile(files, "testfile1") || !containsFile(files, "testfile2") { if !containsFile(files, "testfile1") || !containsFile(files, "testfile2") {
t.Fatalf("file names mismatch") t.Fatalf("file names mismatch")
} }
err = GBS.DeleteBlock(ctx, blockId) err = WFS.DeleteZone(ctx, zoneId)
if err != nil { if err != nil {
t.Fatalf("error deleting block: %v", err) t.Fatalf("error deleting zone: %v", err)
} }
files, err = GBS.ListFiles(ctx, blockId) files, err = WFS.ListFiles(ctx, zoneId)
if err != nil { if err != nil {
t.Fatalf("error listing files: %v", err) t.Fatalf("error listing files: %v", err)
} }
@ -216,19 +216,19 @@ func TestSetMeta(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
if GBS.getCacheSize() != 0 { if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch -- should have 0 entries after create") t.Errorf("cache size mismatch -- should have 0 entries after create")
} }
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false) err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false)
if err != nil { if err != nil {
t.Fatalf("error setting meta: %v", err) t.Fatalf("error setting meta: %v", err)
} }
file, err := GBS.Stat(ctx, blockId, "testfile") file, err := WFS.Stat(ctx, zoneId, "testfile")
if err != nil { if err != nil {
t.Fatalf("error stating file: %v", err) t.Fatalf("error stating file: %v", err)
} }
@ -236,14 +236,14 @@ func TestSetMeta(t *testing.T) {
t.Fatalf("file not found") t.Fatalf("file not found")
} }
checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta") checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta")
if GBS.getCacheSize() != 1 { if WFS.getCacheSize() != 1 {
t.Errorf("cache size mismatch") t.Errorf("cache size mismatch")
} }
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true) err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true)
if err != nil { if err != nil {
t.Fatalf("error setting meta: %v", err) t.Fatalf("error setting meta: %v", err)
} }
file, err = GBS.Stat(ctx, blockId, "testfile") file, err = WFS.Stat(ctx, zoneId, "testfile")
if err != nil { if err != nil {
t.Fatalf("error stating file: %v", err) t.Fatalf("error stating file: %v", err)
} }
@ -252,15 +252,15 @@ func TestSetMeta(t *testing.T) {
} }
checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta") checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta")
err = GBS.WriteMeta(ctx, blockId, "testfile-notexist", map[string]any{"a": 6}, true) err = WFS.WriteMeta(ctx, zoneId, "testfile-notexist", map[string]any{"a": 6}, true)
if err == nil { if err == nil {
t.Fatalf("expected error setting meta") t.Fatalf("expected error setting meta")
} }
err = nil err = nil
} }
func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) { func checkFileSize(t *testing.T, ctx context.Context, zoneId string, name string, size int64) {
file, err := GBS.Stat(ctx, blockId, name) file, err := WFS.Stat(ctx, zoneId, name)
if err != nil { if err != nil {
t.Errorf("error stating file %q: %v", name, err) t.Errorf("error stating file %q: %v", name, err)
return return
@ -274,8 +274,8 @@ func checkFileSize(t *testing.T, ctx context.Context, blockId string, name strin
} }
} }
func checkFileData(t *testing.T, ctx context.Context, blockId string, name string, data string) { func checkFileData(t *testing.T, ctx context.Context, zoneId string, name string, data string) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name) _, rdata, err := WFS.ReadFile(ctx, zoneId, name)
if err != nil { if err != nil {
t.Errorf("error reading data for file %q: %v", name, err) t.Errorf("error reading data for file %q: %v", name, err)
return return
@ -285,8 +285,8 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin
} }
} }
func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name string, val byte, expected int) { func checkFileByteCount(t *testing.T, ctx context.Context, zoneId string, name string, val byte, expected int) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name) _, rdata, err := WFS.ReadFile(ctx, zoneId, name)
if err != nil { if err != nil {
t.Errorf("error reading data for file %q: %v", name, err) t.Errorf("error reading data for file %q: %v", name, err)
return return
@ -302,8 +302,8 @@ func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name
} }
} }
func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) { func checkFileDataAt(t *testing.T, ctx context.Context, zoneId string, name string, offset int64, data string) {
_, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data))) _, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, int64(len(data)))
if err != nil { if err != nil {
t.Errorf("error reading data for file %q: %v", name, err) t.Errorf("error reading data for file %q: %v", name, err)
return return
@ -319,26 +319,26 @@ func TestAppend(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
fileName := "t2" fileName := "t2"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.AppendData(ctx, blockId, fileName, []byte("hello")) err = WFS.AppendData(ctx, zoneId, fileName, []byte("hello"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
// fmt.Print(GBS.dump()) // fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 5) checkFileSize(t, ctx, zoneId, fileName, 5)
checkFileData(t, ctx, blockId, fileName, "hello") checkFileData(t, ctx, zoneId, fileName, "hello")
err = GBS.AppendData(ctx, blockId, fileName, []byte(" world")) err = WFS.AppendData(ctx, zoneId, fileName, []byte(" world"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
// fmt.Print(GBS.dump()) // fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 11) checkFileSize(t, ctx, zoneId, fileName, 11)
checkFileData(t, ctx, blockId, fileName, "hello world") checkFileData(t, ctx, zoneId, fileName, "hello world")
} }
func TestWriteFile(t *testing.T) { func TestWriteFile(t *testing.T) {
@ -347,43 +347,43 @@ func TestWriteFile(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
fileName := "t3" fileName := "t3"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, fileName, "hello world!") checkFileData(t, ctx, zoneId, fileName, "hello world!")
err = GBS.WriteFile(ctx, blockId, fileName, []byte("goodbye world!")) err = WFS.WriteFile(ctx, zoneId, fileName, []byte("goodbye world!"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, fileName, "goodbye world!") checkFileData(t, ctx, zoneId, fileName, "goodbye world!")
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello")) err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, fileName, "hello") checkFileData(t, ctx, zoneId, fileName, "hello")
// circular file // circular file
err = GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) err = WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple")) err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana")
} }
func TestCircularWrites(t *testing.T) { func TestCircularWrites(t *testing.T) {
@ -391,66 +391,66 @@ func TestCircularWrites(t *testing.T) {
defer cleanupDb(t) defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50}) err := WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 ")) err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 "))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, "c1", "123456789 123456789 123456789 123456789 123456789 ") checkFileData(t, ctx, zoneId, "c1", "123456789 123456789 123456789 123456789 123456789 ")
err = GBS.AppendData(ctx, blockId, "c1", []byte("apple")) err = WFS.AppendData(ctx, zoneId, "c1", []byte("apple"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.WriteAt(ctx, blockId, "c1", 0, []byte("foo")) err = WFS.WriteAt(ctx, zoneId, "c1", 0, []byte("foo"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
// content should be unchanged because write is before the beginning of circular offset // content should be unchanged because write is before the beginning of circular offset
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple") checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.WriteAt(ctx, blockId, "c1", 5, []byte("a")) err = WFS.WriteAt(ctx, zoneId, "c1", 5, []byte("a"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileSize(t, ctx, blockId, "c1", 55) checkFileSize(t, ctx, zoneId, "c1", 55)
checkFileData(t, ctx, blockId, "c1", "a789 123456789 123456789 123456789 123456789 apple") checkFileData(t, ctx, zoneId, "c1", "a789 123456789 123456789 123456789 123456789 apple")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana")) err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileSize(t, ctx, blockId, "c1", 62) checkFileSize(t, ctx, zoneId, "c1", 62)
checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana") checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana")
err = GBS.WriteAt(ctx, blockId, "c1", 20, []byte("foo")) err = WFS.WriteAt(ctx, zoneId, "c1", 20, []byte("foo"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileSize(t, ctx, blockId, "c1", 62) checkFileSize(t, ctx, zoneId, "c1", 62)
checkFileData(t, ctx, blockId, "c1", "3456789 foo456789 123456789 123456789 apple banana") checkFileData(t, ctx, zoneId, "c1", "3456789 foo456789 123456789 123456789 apple banana")
offset, _, _ := GBS.ReadFile(ctx, blockId, "c1") offset, _, _ := WFS.ReadFile(ctx, zoneId, "c1")
if offset != 12 { if offset != 12 {
t.Errorf("offset mismatch: expected 12, got %d", offset) t.Errorf("offset mismatch: expected 12, got %d", offset)
} }
err = GBS.AppendData(ctx, blockId, "c1", []byte(" world")) err = WFS.AppendData(ctx, zoneId, "c1", []byte(" world"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileSize(t, ctx, blockId, "c1", 68) checkFileSize(t, ctx, zoneId, "c1", 68)
offset, _, _ = GBS.ReadFile(ctx, blockId, "c1") offset, _, _ = WFS.ReadFile(ctx, zoneId, "c1")
if offset != 18 { if offset != 18 {
t.Errorf("offset mismatch: expected 18, got %d", offset) t.Errorf("offset mismatch: expected 18, got %d", offset)
} }
checkFileData(t, ctx, blockId, "c1", "9 foo456789 123456789 123456789 apple banana world") checkFileData(t, ctx, zoneId, "c1", "9 foo456789 123456789 123456789 apple banana world")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789")) err = WFS.AppendData(ctx, zoneId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789"))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileSize(t, ctx, blockId, "c1", 128) checkFileSize(t, ctx, zoneId, "c1", 128)
checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789") checkFileData(t, ctx, zoneId, "c1", " 123456789 123456789 123456789 bar456789 123456789")
err = withLock(GBS, blockId, "c1", func(entry *CacheEntry) error { err = withLock(WFS, zoneId, "c1", func(entry *CacheEntry) error {
if entry == nil { if entry == nil {
return fmt.Errorf("entry not found") return fmt.Errorf("entry not found")
} }
@ -478,30 +478,30 @@ func TestMultiPart(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
fileName := "m2" fileName := "m2"
data := makeText(80) data := makeText(80)
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.AppendData(ctx, blockId, fileName, []byte(data)) err = WFS.AppendData(ctx, zoneId, fileName, []byte(data))
if err != nil { if err != nil {
t.Fatalf("error appending data: %v", err) t.Fatalf("error appending data: %v", err)
} }
checkFileSize(t, ctx, blockId, fileName, 80) checkFileSize(t, ctx, zoneId, fileName, 80)
checkFileData(t, ctx, blockId, fileName, data) checkFileData(t, ctx, zoneId, fileName, data)
_, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10) _, barr, err := WFS.ReadAt(ctx, zoneId, fileName, 42, 10)
if err != nil { if err != nil {
t.Fatalf("error reading data: %v", err) t.Fatalf("error reading data: %v", err)
} }
if string(barr) != data[42:52] { if string(barr) != data[42:52] {
t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr)) t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr))
} }
GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world")) WFS.WriteAt(ctx, zoneId, fileName, 49, []byte("world"))
checkFileSize(t, ctx, blockId, fileName, 80) checkFileSize(t, ctx, zoneId, fileName, 80)
checkFileDataAt(t, ctx, blockId, fileName, 49, "world") checkFileDataAt(t, ctx, zoneId, fileName, 49, "world")
checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4") checkFileDataAt(t, ctx, zoneId, fileName, 48, "8world4")
} }
func testIntMapsEq(t *testing.T, msg string, m map[int]int, expected map[int]int) { func testIntMapsEq(t *testing.T, msg string, m map[int]int, expected map[int]int) {
@ -521,7 +521,7 @@ func TestComputePartMap(t *testing.T) {
defer func() { defer func() {
partDataSize = DefaultPartDataSize partDataSize = DefaultPartDataSize
}() }()
file := &BlockFile{} file := &WaveFile{}
m := file.computePartMap(0, 250) m := file.computePartMap(0, 250)
testIntMapsEq(t, "map1", m, map[int]int{0: 100, 1: 100, 2: 50}) testIntMapsEq(t, "map1", m, map[int]int{0: 100, 1: 100, 2: 50})
m = file.computePartMap(110, 40) m = file.computePartMap(110, 40)
@ -535,7 +535,7 @@ func TestComputePartMap(t *testing.T) {
testIntMapsEq(t, "map5", m, map[int]int{8: 80, 9: 100, 10: 100, 11: 60}) testIntMapsEq(t, "map5", m, map[int]int{8: 80, 9: 100, 10: 100, 11: 60})
// now test circular // now test circular
file = &BlockFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}} file = &WaveFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}}
m = file.computePartMap(10, 250) m = file.computePartMap(10, 250)
testIntMapsEq(t, "map6", m, map[int]int{0: 90, 1: 100, 2: 60}) testIntMapsEq(t, "map6", m, map[int]int{0: 90, 1: 100, 2: 60})
m = file.computePartMap(990, 40) m = file.computePartMap(990, 40)
@ -554,31 +554,31 @@ func TestSimpleDBFlush(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
fileName := "t1" fileName := "t1"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!"))
if err != nil { if err != nil {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, fileName, "hello world!") checkFileData(t, ctx, zoneId, fileName, "hello world!")
_, err = GBS.FlushCache(ctx) _, err = WFS.FlushCache(ctx)
if err != nil { if err != nil {
t.Fatalf("error flushing cache: %v", err) t.Fatalf("error flushing cache: %v", err)
} }
if GBS.getCacheSize() != 0 { if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch") t.Errorf("cache size mismatch")
} }
checkFileData(t, ctx, blockId, fileName, "hello world!") checkFileData(t, ctx, zoneId, fileName, "hello world!")
if GBS.getCacheSize() != 0 { if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch (after read)") t.Errorf("cache size mismatch (after read)")
} }
checkFileDataAt(t, ctx, blockId, fileName, 6, "world!") checkFileDataAt(t, ctx, zoneId, fileName, 6, "world!")
checkFileSize(t, ctx, blockId, fileName, 12) checkFileSize(t, ctx, zoneId, fileName, 12)
checkFileByteCount(t, ctx, blockId, fileName, 'l', 3) checkFileByteCount(t, ctx, zoneId, fileName, 'l', 3)
} }
func TestConcurrentAppend(t *testing.T) { func TestConcurrentAppend(t *testing.T) {
@ -586,9 +586,9 @@ func TestConcurrentAppend(t *testing.T) {
defer cleanupDb(t) defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn() defer cancelFn()
blockId := uuid.New().String() zoneId := uuid.New().String()
fileName := "t1" fileName := "t1"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil { if err != nil {
t.Fatalf("error creating file: %v", err) t.Fatalf("error creating file: %v", err)
} }
@ -600,23 +600,23 @@ func TestConcurrentAppend(t *testing.T) {
const hexChars = "0123456789abcdef" const hexChars = "0123456789abcdef"
ch := hexChars[n] ch := hexChars[n]
for j := 0; j < 100; j++ { for j := 0; j < 100; j++ {
err := GBS.AppendData(ctx, blockId, fileName, []byte{ch}) err := WFS.AppendData(ctx, zoneId, fileName, []byte{ch})
if err != nil { if err != nil {
t.Errorf("error appending data (%d): %v", n, err) t.Errorf("error appending data (%d): %v", n, err)
} }
if j == 50 { if j == 50 {
// ignore error here (concurrent flushing) // ignore error here (concurrent flushing)
GBS.FlushCache(ctx) WFS.FlushCache(ctx)
} }
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
checkFileSize(t, ctx, blockId, fileName, 1600) checkFileSize(t, ctx, zoneId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100)
GBS.FlushCache(ctx) WFS.FlushCache(ctx)
checkFileSize(t, ctx, blockId, fileName, 1600) checkFileSize(t, ctx, zoneId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100) checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100) checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100)
} }

View File

@ -4,17 +4,21 @@
package fileservice package fileservice
import ( import (
"context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn" "github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
) )
const MaxFileSize = 10 * 1024 * 1024 // 10M const MaxFileSize = 10 * 1024 * 1024 // 10M
const DefaultTimeout = 2 * time.Second
type FileService struct{} type FileService struct{}
@ -104,3 +108,13 @@ func (fs *FileService) ReadFile(path string) (*FullFile, error) {
Data64: base64.StdEncoding.EncodeToString(barr), Data64: base64.StdEncoding.EncodeToString(barr),
}, nil }, nil
} }
func (fs *FileService) GetWaveFile(id string, path string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
file, err := filestore.WFS.Stat(ctx, id, path)
if err != nil {
return nil, fmt.Errorf("error getting file: %w", err)
}
return file, nil
}