Merge branch 'main' into sylvie/backwards-nav

This commit is contained in:
Sylvia Crowe 2024-06-03 15:55:48 -07:00
commit b12417101e
21 changed files with 567 additions and 400 deletions

View File

@ -5,8 +5,8 @@ package db
import "embed"
//go:embed migrations-blockstore/*.sql
var BlockstoreMigrationFS embed.FS
//go:embed migrations-filestore/*.sql
var FilestoreMigrationFS embed.FS
//go:embed migrations-wstore/*.sql
var WStoreMigrationFS embed.FS

View File

@ -1,3 +0,0 @@
DROP TABLE block_file;
DROP TABLE block_data;

View File

@ -0,0 +1,3 @@
DROP TABLE db_wave_file;
DROP TABLE db_file_data;

View File

@ -1,19 +1,19 @@
CREATE TABLE db_block_file (
blockid varchar(36) NOT NULL,
CREATE TABLE db_wave_file (
zoneid varchar(36) NOT NULL,
name varchar(200) NOT NULL,
size bigint NOT NULL,
createdts bigint NOT NULL,
modts bigint NOT NULL,
opts json NOT NULL,
meta json NOT NULL,
PRIMARY KEY (blockid, name)
PRIMARY KEY (zoneid, name)
);
CREATE TABLE db_block_data (
blockid varchar(36) NOT NULL,
CREATE TABLE db_file_data (
zoneid varchar(36) NOT NULL,
name varchar(200) NOT NULL,
partidx int NOT NULL,
data blob NOT NULL,
PRIMARY KEY(blockid, name, partidx)
PRIMARY KEY(zoneid, name, partidx)
);

View File

@ -59,6 +59,13 @@ function MarkdownPreview({ contentAtom }: { contentAtom: jotai.Atom<Promise<stri
function StreamingPreview({ fileInfo }: { fileInfo: FileInfo }) {
const filePath = fileInfo.path;
const streamingUrl = "/wave/stream-file?path=" + encodeURIComponent(filePath);
if (fileInfo.mimetype == "application/pdf") {
return (
<div className="view-preview view-preview-pdf">
<iframe src={streamingUrl} width="100%" height="100%" name="pdfview" />
</div>
);
}
if (fileInfo.mimetype.startsWith("video/")) {
return (
<div className="view-preview view-preview-video">
@ -153,7 +160,12 @@ function PreviewView({ blockId }: { blockId: string }) {
// handle streaming files here
let specializedView: React.ReactNode;
if (mimeType.startsWith("video/") || mimeType.startsWith("audio/") || mimeType.startsWith("image/")) {
if (
mimeType == "application/pdf" ||
mimeType.startsWith("video/") ||
mimeType.startsWith("audio/") ||
mimeType.startsWith("image/")
) {
specializedView = <StreamingPreview fileInfo={fileInfo} />;
} else if (fileInfo == null) {
specializedView = <CenteredDiv>File Not Found</CenteredDiv>;

View File

@ -114,19 +114,19 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
if (!termRef.current) {
return;
}
// load data from blockfile
// load data from filestore
const startTs = Date.now();
let loadedBytes = 0;
const localTerm = termRef.current; // avoids devmode double effect running issue (terminal gets created twice)
const usp = new URLSearchParams();
usp.set("blockid", blockId);
usp.set("zoneid", blockId);
usp.set("name", "main");
fetch("/wave/blockfile?" + usp.toString())
fetch("/wave/file?" + usp.toString())
.then((resp) => {
if (resp.ok) {
return resp.arrayBuffer();
}
console.log("error loading blockfile", resp.status, resp.statusText);
console.log("error loading file", resp.status, resp.statusText);
})
.then((data: ArrayBuffer) => {
const uint8View = new Uint8Array(data);
@ -139,7 +139,7 @@ const TerminalView = ({ blockId }: { blockId: string }) => {
});
initialLoadRef.current.loaded = true;
initialLoadRef.current.heldData = [];
console.log(`terminal loaded blockfile ${loadedBytes} bytes, ${Date.now() - startTs}ms`);
console.log(`terminal loaded file ${loadedBytes} bytes, ${Date.now() - startTs}ms`);
});
}, [termRef.current]);

View File

@ -20,7 +20,7 @@ console.log("Wave Starting");
document.addEventListener("DOMContentLoaded", async () => {
console.log("DOMContentLoaded");
// ensures client/window are loaded into the cache before rendering
// ensures client/window/workspace are loaded into the cache before rendering
await WOS.loadAndPinWaveObject<Client>(WOS.makeORef("client", clientId));
const waveWindow = await WOS.loadAndPinWaveObject<WaveWindow>(WOS.makeORef("window", windowId));
await WOS.loadAndPinWaveObject<Workspace>(WOS.makeORef("workspace", waveWindow.workspaceid));

32
main.go
View File

@ -22,8 +22,8 @@ import (
"time"
"github.com/google/uuid"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/service/blockservice"
"github.com/wavetermdev/thenextwave/pkg/service/clientservice"
"github.com/wavetermdev/thenextwave/pkg/service/fileservice"
@ -72,7 +72,7 @@ func createWindow(windowData *wstore.Window, app *application.App) {
Backdrop: application.MacBackdropTranslucent,
TitleBar: application.MacTitleBarHiddenInset,
},
BackgroundColour: application.NewRGB(0, 0, 0),
BackgroundColour: application.NewRGBA(0, 0, 0, 255),
URL: "/public/index.html?windowid=" + windowData.OID + "&clientid=" + client.OID,
X: windowData.Pos.X,
Y: windowData.Pos.Y,
@ -102,11 +102,11 @@ type waveAssetHandler struct {
AssetHandler http.Handler
}
func serveBlockFile(w http.ResponseWriter, r *http.Request) {
blockId := r.URL.Query().Get("blockid")
func serveWaveFile(w http.ResponseWriter, r *http.Request) {
zoneId := r.URL.Query().Get("zoneid")
name := r.URL.Query().Get("name")
if _, err := uuid.Parse(blockId); err != nil {
http.Error(w, fmt.Sprintf("invalid blockid: %v", err), http.StatusBadRequest)
if _, err := uuid.Parse(zoneId); err != nil {
http.Error(w, fmt.Sprintf("invalid zoneid: %v", err), http.StatusBadRequest)
return
}
if name == "" {
@ -114,7 +114,7 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) {
return
}
file, err := blockstore.GBS.Stat(r.Context(), blockId, name)
file, err := filestore.WFS.Stat(r.Context(), zoneId, name)
if err == fs.ErrNotExist {
http.NotFound(w, r)
return
@ -129,16 +129,16 @@ func serveBlockFile(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", fmt.Sprintf("%d", file.Size))
w.Header().Set("X-BlockFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr))
w.Header().Set("X-ZoneFileInfo", base64.StdEncoding.EncodeToString(jsonFileBArr))
w.Header().Set("Last-Modified", time.UnixMilli(file.ModTs).UTC().Format(http.TimeFormat))
for offset := file.DataStartIdx(); offset < file.Size; offset += blockstore.DefaultPartDataSize {
_, data, err := blockstore.GBS.ReadAt(r.Context(), blockId, name, offset, blockstore.DefaultPartDataSize)
for offset := file.DataStartIdx(); offset < file.Size; offset += filestore.DefaultPartDataSize {
_, data, err := filestore.WFS.ReadAt(r.Context(), zoneId, name, offset, filestore.DefaultPartDataSize)
if err != nil {
if offset == 0 {
http.Error(w, fmt.Sprintf("error reading file: %v", err), http.StatusInternalServerError)
} else {
// nothing to do, the headers have already been sent
log.Printf("error reading file %s/%s @ %d: %v\n", blockId, name, offset, err)
log.Printf("error reading file %s/%s @ %d: %v\n", zoneId, name, offset, err)
}
return
}
@ -154,8 +154,8 @@ func serveWaveUrls(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, fileName)
return
}
if r.URL.Path == "/wave/blockfile" {
serveBlockFile(w, r)
if r.URL.Path == "/wave/file" {
serveWaveFile(w, r)
return
}
http.NotFound(w, r)
@ -174,7 +174,7 @@ func doShutdown(reason string) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
// TODO deal with flush in progress
blockstore.GBS.FlushCache(ctx)
filestore.WFS.FlushCache(ctx)
time.Sleep(200 * time.Millisecond)
os.Exit(0)
}
@ -203,9 +203,9 @@ func main() {
}
log.Printf("wave home dir: %s\n", wavebase.GetWaveHomeDir())
err = blockstore.InitBlockstore()
err = filestore.InitFilestore()
if err != nil {
log.Printf("error initializing blockstore: %v\n", err)
log.Printf("error initializing filestore: %v\n", err)
return
}
err = wstore.InitWStore()

View File

@ -16,8 +16,8 @@ import (
"github.com/creack/pty"
"github.com/wailsapp/wails/v3/pkg/application"
"github.com/wavetermdev/thenextwave/pkg/blockstore"
"github.com/wavetermdev/thenextwave/pkg/eventbus"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/shellexec"
"github.com/wavetermdev/thenextwave/pkg/wstore"
)
@ -93,7 +93,7 @@ const DefaultTermMaxFileSize = 256 * 1024
func (bc *BlockController) handleShellProcData(data []byte, seqNum int) error {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", data)
err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", data)
if err != nil {
return fmt.Errorf("error appending to blockfile: %w", err)
}
@ -118,7 +118,7 @@ func (bc *BlockController) resetTerminalState() {
buf.WriteString("\x1b[?25h") // show cursor
buf.WriteString("\x1b[?1000l") // disable mouse tracking
buf.WriteString("\r\n\r\n(restored terminal state)\r\n\r\n")
err := blockstore.GBS.AppendData(ctx, bc.BlockId, "main", buf.Bytes())
err := filestore.WFS.AppendData(ctx, bc.BlockId, "main", buf.Bytes())
if err != nil {
log.Printf("error appending to blockfile (terminal reset): %v\n", err)
}
@ -128,11 +128,11 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts) error {
// create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
err := blockstore.GBS.MakeFile(ctx, bc.BlockId, "main", nil, blockstore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true})
if err != nil && err != blockstore.ErrAlreadyExists {
err := filestore.WFS.MakeFile(ctx, bc.BlockId, "main", nil, filestore.FileOptsType{MaxSize: DefaultTermMaxFileSize, Circular: true})
if err != nil && err != filestore.ErrAlreadyExists {
return fmt.Errorf("error creating blockfile: %w", err)
}
if err == blockstore.ErrAlreadyExists {
if err == filestore.ErrAlreadyExists {
// reset the terminal state
bc.resetTerminalState()
}

View File

@ -0,0 +1,4 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockcontroller

View File

@ -1,117 +0,0 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
"context"
"fmt"
"os"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrAlreadyExists = fmt.Errorf("file already exists")
func dbInsertFile(ctx context.Context, file *BlockFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?"
if tx.Exists(query, file.BlockId, file.Name) {
return ErrAlreadyExists
}
query = "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})
}
func dbDeleteFile(ctx context.Context, blockId string, name string) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?"
tx.Exec(query, blockId, name)
return nil
})
}
func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var files []string
query := "SELECT name FROM db_block_file WHERE blockid = ?"
tx.Select(&files, query, blockId)
return files, nil
})
}
func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name)
return file, nil
})
}
func dbGetAllBlockIds(ctx context.Context) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var ids []string
query := "SELECT DISTINCT blockid FROM db_block_file"
tx.Select(&ids, query)
return ids, nil
})
}
func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
return nil, nil
}
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
var data []*DataCacheEntry
query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts))
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data)
d.Data = newData
}
rtn[d.PartIdx] = d
}
return rtn, nil
})
}
func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) {
query := "SELECT * FROM db_block_file WHERE blockid = ?"
files := dbutil.SelectMappable[*BlockFile](tx, query, blockId)
return files, nil
})
}
func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
if !tx.Exists(query, file.BlockId, file.Name) {
// since deletion is synchronous this stops us from writing to a deleted file
return os.ErrNotExist
}
// we don't update CreatedTs or Opts
query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?`
tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name)
if replace {
query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`
tx.Exec(query, file.BlockId, file.Name)
}
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
for partIdx, dataEntry := range dataEntries {
if partIdx != dataEntry.PartIdx {
panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx))
}
tx.Exec(dataPartQuery, file.BlockId, file.Name, dataEntry.PartIdx, dataEntry.Data)
}
return nil
})
}

View File

@ -1,9 +1,9 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
package filestore
// the blockstore package implements a write cache for block files
// the blockstore package implements a write cache for wave files
// it is not a read cache (reads still go to the DB -- unless items are in the cache)
// but all writes only go to the cache, and then the cache is periodically flushed to the DB
@ -29,7 +29,7 @@ var flushErrorCount = &atomic.Int32{}
var partDataSize int64 = DefaultPartDataSize // overridden in tests
var stopFlush = &atomic.Bool{}
var GBS *BlockStore = &BlockStore{
var WFS *FileStore = &FileStore{
Lock: &sync.Mutex{},
Cache: make(map[cacheKey]*CacheEntry),
}
@ -42,9 +42,9 @@ type FileOptsType struct {
type FileMeta = map[string]any
type BlockFile struct {
type WaveFile struct {
// these fields are static (not updated)
BlockId string `json:"blockid"`
ZoneId string `json:"zoneid"`
Name string `json:"name"`
Opts FileOptsType `json:"opts"`
CreatedTs int64 `json:"createdts"`
@ -57,7 +57,7 @@ type BlockFile struct {
// for regular files this is just Size
// for circular files this is min(Size, MaxSize)
func (f BlockFile) DataLength() int64 {
func (f WaveFile) DataLength() int64 {
if f.Opts.Circular {
return minInt64(f.Size, f.Opts.MaxSize)
}
@ -66,7 +66,7 @@ func (f BlockFile) DataLength() int64 {
// for regular files this is just 0
// for circular files this is the index of the first byte of data we have
func (f BlockFile) DataStartIdx() int64 {
func (f WaveFile) DataStartIdx() int64 {
if f.Opts.Circular && f.Size > f.Opts.MaxSize {
return f.Size - f.Opts.MaxSize
}
@ -82,7 +82,7 @@ func copyMeta(meta FileMeta) FileMeta {
return newMeta
}
func (f *BlockFile) DeepCopy() *BlockFile {
func (f *WaveFile) DeepCopy() *WaveFile {
if f == nil {
return nil
}
@ -91,19 +91,19 @@ func (f *BlockFile) DeepCopy() *BlockFile {
return &newFile
}
func (BlockFile) UseDBMap() {}
func (WaveFile) UseDBMap() {}
type BlockData struct {
BlockId string `json:"blockid"`
type FileData struct {
ZoneId string `json:"zoneid"`
Name string `json:"name"`
PartIdx int `json:"partidx"`
Data []byte `json:"data"`
}
func (BlockData) UseDBMap() {}
func (FileData) UseDBMap() {}
// synchronous (does not interact with the cache)
func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error {
func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, meta FileMeta, opts FileOptsType) error {
if opts.MaxSize < 0 {
return fmt.Errorf("max size must be non-negative")
}
@ -118,13 +118,13 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
}
}
return withLock(s, blockId, name, func(entry *CacheEntry) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
if entry.File != nil {
return fs.ErrExist
}
now := time.Now().UnixMilli()
file := &BlockFile{
BlockId: blockId,
file := &WaveFile{
ZoneId: zoneId,
Name: name,
Size: 0,
CreatedTs: now,
@ -136,9 +136,9 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
})
}
func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error {
err := dbDeleteFile(ctx, blockId, name)
func (s *FileStore) DeleteFile(ctx context.Context, zoneId string, name string) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := dbDeleteFile(ctx, zoneId, name)
if err != nil {
return fmt.Errorf("error deleting file: %v", err)
}
@ -147,20 +147,20 @@ func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string
})
}
func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
fileNames, err := dbGetBlockFileNames(ctx, blockId)
func (s *FileStore) DeleteZone(ctx context.Context, zoneId string) error {
fileNames, err := dbGetZoneFileNames(ctx, zoneId)
if err != nil {
return fmt.Errorf("error getting block files: %v", err)
return fmt.Errorf("error getting zone files: %v", err)
}
for _, name := range fileNames {
s.DeleteFile(ctx, blockId, name)
s.DeleteFile(ctx, zoneId, name)
}
return nil
}
// if file doesn't exsit, returns fs.ErrNotExist
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) {
func (s *FileStore) Stat(ctx context.Context, zoneId string, name string) (*WaveFile, error) {
return withLockRtn(s, zoneId, name, func(entry *CacheEntry) (*WaveFile, error) {
file, err := entry.loadFileForRead(ctx)
if err != nil {
return nil, fmt.Errorf("error getting file: %v", err)
@ -169,13 +169,13 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl
})
}
func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
files, err := dbGetBlockFiles(ctx, blockId)
func (s *FileStore) ListFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) {
files, err := dbGetZoneFiles(ctx, zoneId)
if err != nil {
return nil, fmt.Errorf("error getting block files: %v", err)
return nil, fmt.Errorf("error getting zone files: %v", err)
}
for idx, file := range files {
withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error {
withLock(s, file.ZoneId, file.Name, func(entry *CacheEntry) error {
if entry.File != nil {
files[idx] = entry.File.DeepCopy()
}
@ -185,8 +185,8 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil
return files, nil
}
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error {
func (s *FileStore) WriteMeta(ctx context.Context, zoneId string, name string, meta FileMeta, merge bool) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
@ -207,8 +207,8 @@ func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string,
})
}
func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error {
func (s *FileStore) WriteFile(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
@ -219,11 +219,11 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string,
})
}
func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, offset int64, data []byte) error {
if offset < 0 {
return fmt.Errorf("offset must be non-negative")
}
return withLock(s, blockId, name, func(entry *CacheEntry) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
@ -243,8 +243,8 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
})
}
func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
return withLock(s, blockId, name, func(entry *CacheEntry) error {
func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
@ -262,14 +262,14 @@ func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string
})
}
func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
return dbGetAllBlockIds(ctx)
func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) {
return dbGetAllZoneIds(ctx)
}
// returns (offset, data, error)
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, blockId, name, func(entry *CacheEntry) error {
func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, zoneId, name, func(entry *CacheEntry) error {
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
return nil
})
@ -277,8 +277,8 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
}
// returns (offset, data, error)
func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, blockId, name, func(entry *CacheEntry) error {
func (s *FileStore) ReadFile(ctx context.Context, zoneId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, zoneId, name, func(entry *CacheEntry) error {
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true)
return nil
})
@ -291,7 +291,7 @@ type FlushStats struct {
NumCommitted int
}
func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
wasFlushing := s.setUnlessFlushing()
if wasFlushing {
return stats, fmt.Errorf("flush already in progress")
@ -306,7 +306,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e
dirtyCacheKeys := s.getDirtyCacheKeys()
stats.NumDirtyEntries = len(dirtyCacheKeys)
for _, key := range dirtyCacheKeys {
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
err := withLock(s, key.ZoneId, key.Name, func(entry *CacheEntry) error {
return entry.flushToDB(ctx, false)
})
if ctx.Err() != nil {
@ -323,7 +323,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr e
///////////////////////////////////
func (f *BlockFile) partIdxAtOffset(offset int64) int {
func (f *WaveFile) partIdxAtOffset(offset int64) int {
partIdx := int(offset / partDataSize)
if f.Opts.Circular {
maxPart := int(f.Opts.MaxSize / partDataSize)
@ -351,11 +351,11 @@ func getPartIdxsFromMap(partMap map[int]int) []int {
}
// returns a map of partIdx to amount of data to write to that part
func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int {
func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int {
partMap := make(map[int]int)
endOffset := startOffset + size
startBlockOffset := startOffset - (startOffset % partDataSize)
for testOffset := startBlockOffset; testOffset < endOffset; testOffset += partDataSize {
startFileOffset := startOffset - (startOffset % partDataSize)
for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize {
partIdx := file.partIdxAtOffset(testOffset)
partStartOffset := testOffset
partEndOffset := testOffset + partDataSize
@ -372,7 +372,7 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int
return partMap
}
func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
func (s *FileStore) getDirtyCacheKeys() []cacheKey {
s.Lock.Lock()
defer s.Lock.Unlock()
var dirtyCacheKeys []cacheKey
@ -384,14 +384,14 @@ func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
return dirtyCacheKeys
}
func (s *BlockStore) setIsFlushing(flushing bool) {
func (s *FileStore) setIsFlushing(flushing bool) {
s.Lock.Lock()
defer s.Lock.Unlock()
s.IsFlushing = flushing
}
// returns old value of IsFlushing
func (s *BlockStore) setUnlessFlushing() bool {
func (s *FileStore) setUnlessFlushing() bool {
s.Lock.Lock()
defer s.Lock.Unlock()
if s.IsFlushing {
@ -401,26 +401,26 @@ func (s *BlockStore) setUnlessFlushing() bool {
return false
}
func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) {
func (s *FileStore) runFlushWithNewContext() (FlushStats, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
defer cancelFn()
return s.FlushCache(ctx)
}
func (s *BlockStore) runFlusher() {
func (s *FileStore) runFlusher() {
defer func() {
if r := recover(); r != nil {
log.Printf("panic in blockstore flusher: %v\n", r)
log.Printf("panic in filestore flusher: %v\n", r)
debug.PrintStack()
}
}()
for {
stats, err := s.runFlushWithNewContext()
if err != nil || stats.NumDirtyEntries > 0 {
log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
log.Printf("filestore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
}
if stopFlush.Load() {
log.Printf("blockstore flusher stopping\n")
log.Printf("filestore flusher stopping\n")
return
}
time.Sleep(DefaultFlushTime)

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
package filestore
import (
"bytes"
@ -13,11 +13,11 @@ import (
)
type cacheKey struct {
BlockId string
Name string
ZoneId string
Name string
}
type BlockStore struct {
type FileStore struct {
Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry
IsFlushing bool
@ -25,17 +25,17 @@ type BlockStore struct {
type DataCacheEntry struct {
PartIdx int
Data []byte // capacity is always BlockDataPartSize
Data []byte // capacity is always ZoneDataPartSize
}
// if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
type CacheEntry struct {
PinCount int // this is synchronzed with the BlockStore lock (not the entry lock)
PinCount int // this is synchronzed with the FileStore lock (not the entry lock)
Lock *sync.Mutex
BlockId string
ZoneId string
Name string
File *BlockFile
File *WaveFile
DataEntries map[int]*DataCacheEntry
FlushErrors int
}
@ -43,7 +43,7 @@ type CacheEntry struct {
//lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount)
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
@ -59,28 +59,28 @@ func makeDataCacheEntry(partIdx int) *DataCacheEntry {
}
// will create new entries
func (s *BlockStore) getEntryAndPin(blockId string, name string) *CacheEntry {
func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil {
entry = makeCacheEntry(blockId, name)
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
entry = makeCacheEntry(zoneId, name)
s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry
}
entry.PinCount++
return entry
}
func (s *BlockStore) unpinEntryAndTryDelete(blockId string, name string) {
func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil {
return
}
entry.PinCount--
if entry.PinCount <= 0 && entry.File == nil {
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name})
}
}
@ -111,11 +111,11 @@ func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
}
// does not populate the cache entry, returns err if file does not exist
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error) {
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) {
if entry.File != nil {
return entry.File, nil
}
file, err := dbGetBlockFile(ctx, entry.BlockId, entry.Name)
file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name)
if err != nil {
return nil, fmt.Errorf("error getting file: %w", err)
}
@ -125,17 +125,17 @@ func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error
return file, nil
}
func withLock(s *BlockStore, blockId string, name string, fn func(*CacheEntry) error) error {
entry := s.getEntryAndPin(blockId, name)
defer s.unpinEntryAndTryDelete(blockId, name)
func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error {
entry := s.getEntryAndPin(zoneId, name)
defer s.unpinEntryAndTryDelete(zoneId, name)
entry.Lock.Lock()
defer entry.Lock.Unlock()
return fn(entry)
}
func withLockRtn[T any](s *BlockStore, blockId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
var rtnVal T
rtnErr := withLock(s, blockId, name, func(entry *CacheEntry) error {
rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error {
var err error
rtnVal, err = fn(entry)
return err
@ -273,7 +273,7 @@ func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int
// parts are already loaded
return nil
}
dbDataParts, err := dbGetFileParts(ctx, entry.BlockId, entry.Name, parts)
dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts)
if err != nil {
return fmt.Errorf("error getting data parts: %w", err)
}
@ -291,7 +291,7 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int)
var dbDataParts map[int]*DataCacheEntry
if len(dbParts) > 0 {
var err error
dbDataParts, err = dbGetFileParts(ctx, entry.BlockId, entry.Name, dbParts)
dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts)
if err != nil {
return nil, fmt.Errorf("error getting data parts: %w", err)
}
@ -311,10 +311,10 @@ func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int)
return rtn, nil
}
func makeCacheEntry(blockId string, name string) *CacheEntry {
func makeCacheEntry(zoneId string, name string) *CacheEntry {
return &CacheEntry{
Lock: &sync.Mutex{},
BlockId: blockId,
ZoneId: zoneId,
Name: name,
PinCount: 0,
File: nil,

View File

@ -0,0 +1,117 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package filestore
import (
"context"
"fmt"
"os"
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
)
var ErrAlreadyExists = fmt.Errorf("file already exists")
func dbInsertFile(ctx context.Context, file *WaveFile) error {
// will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error {
query := "SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?"
if tx.Exists(query, file.ZoneId, file.Name) {
return ErrAlreadyExists
}
query = "INSERT INTO db_wave_file (zoneid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.ZoneId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil
})
}
func dbDeleteFile(ctx context.Context, zoneId string, name string) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := "DELETE FROM db_wave_file WHERE zoneid = ? AND name = ?"
tx.Exec(query, zoneId, name)
query = "DELETE FROM db_file_data WHERE zoneid = ? AND name = ?"
tx.Exec(query, zoneId, name)
return nil
})
}
func dbGetZoneFileNames(ctx context.Context, zoneId string) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var files []string
query := "SELECT name FROM db_wave_file WHERE zoneid = ?"
tx.Select(&files, query, zoneId)
return files, nil
})
}
func dbGetZoneFile(ctx context.Context, zoneId string, name string) (*WaveFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*WaveFile, error) {
query := "SELECT * FROM db_wave_file WHERE zoneid = ? AND name = ?"
file := dbutil.GetMappable[*WaveFile](tx, query, zoneId, name)
return file, nil
})
}
func dbGetAllZoneIds(ctx context.Context) ([]string, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
var ids []string
query := "SELECT DISTINCT zoneid FROM db_wave_file"
tx.Select(&ids, query)
return ids, nil
})
}
func dbGetFileParts(ctx context.Context, zoneId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
return nil, nil
}
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
var data []*DataCacheEntry
query := "SELECT partidx, data FROM db_file_data WHERE zoneid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
tx.Select(&data, query, zoneId, name, dbutil.QuickJsonArr(parts))
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data)
d.Data = newData
}
rtn[d.PartIdx] = d
}
return rtn, nil
})
}
func dbGetZoneFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*WaveFile, error) {
query := "SELECT * FROM db_wave_file WHERE zoneid = ?"
files := dbutil.SelectMappable[*WaveFile](tx, query, zoneId)
return files, nil
})
}
func dbWriteCacheEntry(ctx context.Context, file *WaveFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT zoneid FROM db_wave_file WHERE zoneid = ? AND name = ?`
if !tx.Exists(query, file.ZoneId, file.Name) {
// since deletion is synchronous this stops us from writing to a deleted file
return os.ErrNotExist
}
// we don't update CreatedTs or Opts
query = `UPDATE db_wave_file SET size = ?, modts = ?, meta = ? WHERE zoneid = ? AND name = ?`
tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.ZoneId, file.Name)
if replace {
query = `DELETE FROM db_file_data WHERE zoneid = ? AND name = ?`
tx.Exec(query, file.ZoneId, file.Name)
}
dataPartQuery := `REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?, ?, ?, ?)`
for partIdx, dataEntry := range dataEntries {
if partIdx != dataEntry.PartIdx {
panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx))
}
tx.Exec(dataPartQuery, file.ZoneId, file.Name, dataEntry.PartIdx, dataEntry.Data)
}
return nil
})
}

View File

@ -1,9 +1,9 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
package filestore
// setup for blockstore db
// setup for filestore db
// includes migration support and txwrap setup
import (
@ -23,14 +23,14 @@ import (
dbfs "github.com/wavetermdev/thenextwave/db"
)
const BlockstoreDBName = "blockstore.db"
const FilestoreDBName = "filestore.db"
type TxWrap = txwrap.TxWrap
var globalDB *sqlx.DB
var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
func InitBlockstore() error {
func InitFilestore() error {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
var err error
@ -38,20 +38,20 @@ func InitBlockstore() error {
if err != nil {
return err
}
err = migrateutil.Migrate("blockstore", globalDB.DB, dbfs.BlockstoreMigrationFS, "migrations-blockstore")
err = migrateutil.Migrate("filestore", globalDB.DB, dbfs.FilestoreMigrationFS, "migrations-filestore")
if err != nil {
return err
}
if !stopFlush.Load() {
go GBS.runFlusher()
go WFS.runFlusher()
}
log.Printf("blockstore initialized\n")
log.Printf("filestore initialized\n")
return nil
}
func GetDBName() string {
waveHome := wavebase.GetWaveHomeDir()
return path.Join(waveHome, BlockstoreDBName)
return path.Join(waveHome, FilestoreDBName)
}
func MakeDB(ctx context.Context) (*sqlx.DB, error) {

View File

@ -1,7 +1,7 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
package filestore
import (
"bytes"
@ -24,9 +24,9 @@ func initDb(t *testing.T) {
partDataSize = 50
warningCount = &atomic.Int32{}
stopFlush.Store(true)
err := InitBlockstore()
err := InitFilestore()
if err != nil {
t.Fatalf("error initializing blockstore: %v", err)
t.Fatalf("error initializing filestore: %v", err)
}
}
@ -38,7 +38,7 @@ func cleanupDb(t *testing.T) {
}
useTestingDb = false
partDataSize = DefaultPartDataSize
GBS.clearCache()
WFS.clearCache()
if warningCount.Load() > 0 {
t.Errorf("warning count: %d", warningCount.Load())
}
@ -47,24 +47,24 @@ func cleanupDb(t *testing.T) {
}
}
func (s *BlockStore) getCacheSize() int {
func (s *FileStore) getCacheSize() int {
s.Lock.Lock()
defer s.Lock.Unlock()
return len(s.Cache)
}
func (s *BlockStore) clearCache() {
func (s *FileStore) clearCache() {
s.Lock.Lock()
defer s.Lock.Unlock()
s.Cache = make(map[cacheKey]*CacheEntry)
}
//lint:ignore U1000 used for testing
func (s *BlockStore) dump() string {
func (s *FileStore) dump() string {
s.Lock.Lock()
defer s.Lock.Unlock()
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache)))
buf.WriteString(fmt.Sprintf("FileStore %d entries\n", len(s.Cache)))
for _, v := range s.Cache {
entryStr := v.dump()
buf.WriteString(entryStr)
@ -79,20 +79,20 @@ func TestCreate(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
zoneId := uuid.New().String()
err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
file, err := WFS.Stat(ctx, zoneId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file == nil {
t.Fatalf("file not found")
}
if file.BlockId != blockId {
t.Fatalf("block id mismatch")
if file.ZoneId != zoneId {
t.Fatalf("zone id mismatch")
}
if file.Name != "testfile" {
t.Fatalf("name mismatch")
@ -115,30 +115,30 @@ func TestCreate(t *testing.T) {
if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 {
t.Fatalf("opts not empty")
}
blockIds, err := GBS.GetAllBlockIds(ctx)
zoneIds, err := WFS.GetAllZoneIds(ctx)
if err != nil {
t.Fatalf("error getting block ids: %v", err)
t.Fatalf("error getting zone ids: %v", err)
}
if len(blockIds) != 1 {
t.Fatalf("block id count mismatch")
if len(zoneIds) != 1 {
t.Fatalf("zone id count mismatch")
}
if blockIds[0] != blockId {
t.Fatalf("block id mismatch")
if zoneIds[0] != zoneId {
t.Fatalf("zone id mismatch")
}
err = GBS.DeleteFile(ctx, blockId, "testfile")
err = WFS.DeleteFile(ctx, zoneId, "testfile")
if err != nil {
t.Fatalf("error deleting file: %v", err)
}
blockIds, err = GBS.GetAllBlockIds(ctx)
zoneIds, err = WFS.GetAllZoneIds(ctx)
if err != nil {
t.Fatalf("error getting block ids: %v", err)
t.Fatalf("error getting zone ids: %v", err)
}
if len(blockIds) != 0 {
t.Fatalf("block id count mismatch")
if len(zoneIds) != 0 {
t.Fatalf("zone id count mismatch")
}
}
func containsFile(arr []*BlockFile, name string) bool {
func containsFile(arr []*WaveFile, name string) bool {
for _, f := range arr {
if f.Name == name {
return true
@ -153,30 +153,30 @@ func TestDelete(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
zoneId := uuid.New().String()
err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.DeleteFile(ctx, blockId, "testfile")
err = WFS.DeleteFile(ctx, zoneId, "testfile")
if err != nil {
t.Fatalf("error deleting file: %v", err)
}
_, err = GBS.Stat(ctx, blockId, "testfile")
_, err = WFS.Stat(ctx, zoneId, "testfile")
if err == nil || errors.Is(err, fs.ErrNotExist) {
t.Errorf("expected file not found error")
}
// create two files in same block, use DeleteBlock to delete
err = GBS.MakeFile(ctx, blockId, "testfile1", nil, FileOptsType{})
// create two files in same zone, use DeleteZone to delete
err = WFS.MakeFile(ctx, zoneId, "testfile1", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.MakeFile(ctx, blockId, "testfile2", nil, FileOptsType{})
err = WFS.MakeFile(ctx, zoneId, "testfile2", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
files, err := GBS.ListFiles(ctx, blockId)
files, err := WFS.ListFiles(ctx, zoneId)
if err != nil {
t.Fatalf("error listing files: %v", err)
}
@ -186,11 +186,11 @@ func TestDelete(t *testing.T) {
if !containsFile(files, "testfile1") || !containsFile(files, "testfile2") {
t.Fatalf("file names mismatch")
}
err = GBS.DeleteBlock(ctx, blockId)
err = WFS.DeleteZone(ctx, zoneId)
if err != nil {
t.Fatalf("error deleting block: %v", err)
t.Fatalf("error deleting zone: %v", err)
}
files, err = GBS.ListFiles(ctx, blockId)
files, err = WFS.ListFiles(ctx, zoneId)
if err != nil {
t.Fatalf("error listing files: %v", err)
}
@ -216,19 +216,19 @@ func TestSetMeta(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
zoneId := uuid.New().String()
err := WFS.MakeFile(ctx, zoneId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
if GBS.getCacheSize() != 0 {
if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch -- should have 0 entries after create")
}
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false)
err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 5, "b": "hello", "q": 8}, false)
if err != nil {
t.Fatalf("error setting meta: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
file, err := WFS.Stat(ctx, zoneId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
@ -236,14 +236,14 @@ func TestSetMeta(t *testing.T) {
t.Fatalf("file not found")
}
checkMapsEqual(t, map[string]any{"a": 5, "b": "hello", "q": 8}, file.Meta, "meta")
if GBS.getCacheSize() != 1 {
if WFS.getCacheSize() != 1 {
t.Errorf("cache size mismatch")
}
err = GBS.WriteMeta(ctx, blockId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true)
err = WFS.WriteMeta(ctx, zoneId, "testfile", map[string]any{"a": 6, "c": "world", "d": 7, "q": nil}, true)
if err != nil {
t.Fatalf("error setting meta: %v", err)
}
file, err = GBS.Stat(ctx, blockId, "testfile")
file, err = WFS.Stat(ctx, zoneId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
@ -252,15 +252,15 @@ func TestSetMeta(t *testing.T) {
}
checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta")
err = GBS.WriteMeta(ctx, blockId, "testfile-notexist", map[string]any{"a": 6}, true)
err = WFS.WriteMeta(ctx, zoneId, "testfile-notexist", map[string]any{"a": 6}, true)
if err == nil {
t.Fatalf("expected error setting meta")
}
err = nil
}
func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) {
file, err := GBS.Stat(ctx, blockId, name)
func checkFileSize(t *testing.T, ctx context.Context, zoneId string, name string, size int64) {
file, err := WFS.Stat(ctx, zoneId, name)
if err != nil {
t.Errorf("error stating file %q: %v", name, err)
return
@ -274,8 +274,8 @@ func checkFileSize(t *testing.T, ctx context.Context, blockId string, name strin
}
}
func checkFileData(t *testing.T, ctx context.Context, blockId string, name string, data string) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name)
func checkFileData(t *testing.T, ctx context.Context, zoneId string, name string, data string) {
_, rdata, err := WFS.ReadFile(ctx, zoneId, name)
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
@ -285,8 +285,8 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin
}
}
func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name string, val byte, expected int) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name)
func checkFileByteCount(t *testing.T, ctx context.Context, zoneId string, name string, val byte, expected int) {
_, rdata, err := WFS.ReadFile(ctx, zoneId, name)
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
@ -302,8 +302,8 @@ func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name
}
}
func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) {
_, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data)))
func checkFileDataAt(t *testing.T, ctx context.Context, zoneId string, name string, offset int64, data string) {
_, rdata, err := WFS.ReadAt(ctx, zoneId, name, offset, int64(len(data)))
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
@ -319,26 +319,26 @@ func TestAppend(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
zoneId := uuid.New().String()
fileName := "t2"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.AppendData(ctx, blockId, fileName, []byte("hello"))
err = WFS.AppendData(ctx, zoneId, fileName, []byte("hello"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
// fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 5)
checkFileData(t, ctx, blockId, fileName, "hello")
err = GBS.AppendData(ctx, blockId, fileName, []byte(" world"))
checkFileSize(t, ctx, zoneId, fileName, 5)
checkFileData(t, ctx, zoneId, fileName, "hello")
err = WFS.AppendData(ctx, zoneId, fileName, []byte(" world"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
// fmt.Print(GBS.dump())
checkFileSize(t, ctx, blockId, fileName, 11)
checkFileData(t, ctx, blockId, fileName, "hello world")
checkFileSize(t, ctx, zoneId, fileName, 11)
checkFileData(t, ctx, zoneId, fileName, "hello world")
}
func TestWriteFile(t *testing.T) {
@ -347,43 +347,43 @@ func TestWriteFile(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
zoneId := uuid.New().String()
fileName := "t3"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!"))
err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, fileName, "hello world!")
err = GBS.WriteFile(ctx, blockId, fileName, []byte("goodbye world!"))
checkFileData(t, ctx, zoneId, fileName, "hello world!")
err = WFS.WriteFile(ctx, zoneId, fileName, []byte("goodbye world!"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, fileName, "goodbye world!")
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello"))
checkFileData(t, ctx, zoneId, fileName, "goodbye world!")
err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, fileName, "hello")
checkFileData(t, ctx, zoneId, fileName, "hello")
// circular file
err = GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
err = WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple"))
err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 apple"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana"))
checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana")
checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana")
}
func TestCircularWrites(t *testing.T) {
@ -391,66 +391,66 @@ func TestCircularWrites(t *testing.T) {
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
zoneId := uuid.New().String()
err := WFS.MakeFile(ctx, zoneId, "c1", nil, FileOptsType{Circular: true, MaxSize: 50})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.WriteFile(ctx, blockId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 "))
err = WFS.WriteFile(ctx, zoneId, "c1", []byte("123456789 123456789 123456789 123456789 123456789 "))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, "c1", "123456789 123456789 123456789 123456789 123456789 ")
err = GBS.AppendData(ctx, blockId, "c1", []byte("apple"))
checkFileData(t, ctx, zoneId, "c1", "123456789 123456789 123456789 123456789 123456789 ")
err = WFS.AppendData(ctx, zoneId, "c1", []byte("apple"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.WriteAt(ctx, blockId, "c1", 0, []byte("foo"))
checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = WFS.WriteAt(ctx, zoneId, "c1", 0, []byte("foo"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
// content should be unchanged because write is before the beginning of circular offset
checkFileData(t, ctx, blockId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = GBS.WriteAt(ctx, blockId, "c1", 5, []byte("a"))
checkFileData(t, ctx, zoneId, "c1", "6789 123456789 123456789 123456789 123456789 apple")
err = WFS.WriteAt(ctx, zoneId, "c1", 5, []byte("a"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileSize(t, ctx, blockId, "c1", 55)
checkFileData(t, ctx, blockId, "c1", "a789 123456789 123456789 123456789 123456789 apple")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" banana"))
checkFileSize(t, ctx, zoneId, "c1", 55)
checkFileData(t, ctx, zoneId, "c1", "a789 123456789 123456789 123456789 123456789 apple")
err = WFS.AppendData(ctx, zoneId, "c1", []byte(" banana"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileSize(t, ctx, blockId, "c1", 62)
checkFileData(t, ctx, blockId, "c1", "3456789 123456789 123456789 123456789 apple banana")
err = GBS.WriteAt(ctx, blockId, "c1", 20, []byte("foo"))
checkFileSize(t, ctx, zoneId, "c1", 62)
checkFileData(t, ctx, zoneId, "c1", "3456789 123456789 123456789 123456789 apple banana")
err = WFS.WriteAt(ctx, zoneId, "c1", 20, []byte("foo"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileSize(t, ctx, blockId, "c1", 62)
checkFileData(t, ctx, blockId, "c1", "3456789 foo456789 123456789 123456789 apple banana")
offset, _, _ := GBS.ReadFile(ctx, blockId, "c1")
checkFileSize(t, ctx, zoneId, "c1", 62)
checkFileData(t, ctx, zoneId, "c1", "3456789 foo456789 123456789 123456789 apple banana")
offset, _, _ := WFS.ReadFile(ctx, zoneId, "c1")
if offset != 12 {
t.Errorf("offset mismatch: expected 12, got %d", offset)
}
err = GBS.AppendData(ctx, blockId, "c1", []byte(" world"))
err = WFS.AppendData(ctx, zoneId, "c1", []byte(" world"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileSize(t, ctx, blockId, "c1", 68)
offset, _, _ = GBS.ReadFile(ctx, blockId, "c1")
checkFileSize(t, ctx, zoneId, "c1", 68)
offset, _, _ = WFS.ReadFile(ctx, zoneId, "c1")
if offset != 18 {
t.Errorf("offset mismatch: expected 18, got %d", offset)
}
checkFileData(t, ctx, blockId, "c1", "9 foo456789 123456789 123456789 apple banana world")
err = GBS.AppendData(ctx, blockId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789"))
checkFileData(t, ctx, zoneId, "c1", "9 foo456789 123456789 123456789 apple banana world")
err = WFS.AppendData(ctx, zoneId, "c1", []byte(" 123456789 123456789 123456789 123456789 bar456789 123456789"))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileSize(t, ctx, blockId, "c1", 128)
checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789")
err = withLock(GBS, blockId, "c1", func(entry *CacheEntry) error {
checkFileSize(t, ctx, zoneId, "c1", 128)
checkFileData(t, ctx, zoneId, "c1", " 123456789 123456789 123456789 bar456789 123456789")
err = withLock(WFS, zoneId, "c1", func(entry *CacheEntry) error {
if entry == nil {
return fmt.Errorf("entry not found")
}
@ -478,30 +478,30 @@ func TestMultiPart(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
zoneId := uuid.New().String()
fileName := "m2"
data := makeText(80)
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.AppendData(ctx, blockId, fileName, []byte(data))
err = WFS.AppendData(ctx, zoneId, fileName, []byte(data))
if err != nil {
t.Fatalf("error appending data: %v", err)
}
checkFileSize(t, ctx, blockId, fileName, 80)
checkFileData(t, ctx, blockId, fileName, data)
_, barr, err := GBS.ReadAt(ctx, blockId, fileName, 42, 10)
checkFileSize(t, ctx, zoneId, fileName, 80)
checkFileData(t, ctx, zoneId, fileName, data)
_, barr, err := WFS.ReadAt(ctx, zoneId, fileName, 42, 10)
if err != nil {
t.Fatalf("error reading data: %v", err)
}
if string(barr) != data[42:52] {
t.Errorf("data mismatch: expected %q, got %q", data[42:52], string(barr))
}
GBS.WriteAt(ctx, blockId, fileName, 49, []byte("world"))
checkFileSize(t, ctx, blockId, fileName, 80)
checkFileDataAt(t, ctx, blockId, fileName, 49, "world")
checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4")
WFS.WriteAt(ctx, zoneId, fileName, 49, []byte("world"))
checkFileSize(t, ctx, zoneId, fileName, 80)
checkFileDataAt(t, ctx, zoneId, fileName, 49, "world")
checkFileDataAt(t, ctx, zoneId, fileName, 48, "8world4")
}
func testIntMapsEq(t *testing.T, msg string, m map[int]int, expected map[int]int) {
@ -521,7 +521,7 @@ func TestComputePartMap(t *testing.T) {
defer func() {
partDataSize = DefaultPartDataSize
}()
file := &BlockFile{}
file := &WaveFile{}
m := file.computePartMap(0, 250)
testIntMapsEq(t, "map1", m, map[int]int{0: 100, 1: 100, 2: 50})
m = file.computePartMap(110, 40)
@ -535,7 +535,7 @@ func TestComputePartMap(t *testing.T) {
testIntMapsEq(t, "map5", m, map[int]int{8: 80, 9: 100, 10: 100, 11: 60})
// now test circular
file = &BlockFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}}
file = &WaveFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}}
m = file.computePartMap(10, 250)
testIntMapsEq(t, "map6", m, map[int]int{0: 90, 1: 100, 2: 60})
m = file.computePartMap(990, 40)
@ -554,31 +554,31 @@ func TestSimpleDBFlush(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
zoneId := uuid.New().String()
fileName := "t1"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!"))
err = WFS.WriteFile(ctx, zoneId, fileName, []byte("hello world!"))
if err != nil {
t.Fatalf("error writing data: %v", err)
}
checkFileData(t, ctx, blockId, fileName, "hello world!")
_, err = GBS.FlushCache(ctx)
checkFileData(t, ctx, zoneId, fileName, "hello world!")
_, err = WFS.FlushCache(ctx)
if err != nil {
t.Fatalf("error flushing cache: %v", err)
}
if GBS.getCacheSize() != 0 {
if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch")
}
checkFileData(t, ctx, blockId, fileName, "hello world!")
if GBS.getCacheSize() != 0 {
checkFileData(t, ctx, zoneId, fileName, "hello world!")
if WFS.getCacheSize() != 0 {
t.Errorf("cache size mismatch (after read)")
}
checkFileDataAt(t, ctx, blockId, fileName, 6, "world!")
checkFileSize(t, ctx, blockId, fileName, 12)
checkFileByteCount(t, ctx, blockId, fileName, 'l', 3)
checkFileDataAt(t, ctx, zoneId, fileName, 6, "world!")
checkFileSize(t, ctx, zoneId, fileName, 12)
checkFileByteCount(t, ctx, zoneId, fileName, 'l', 3)
}
func TestConcurrentAppend(t *testing.T) {
@ -586,9 +586,9 @@ func TestConcurrentAppend(t *testing.T) {
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
zoneId := uuid.New().String()
fileName := "t1"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
err := WFS.MakeFile(ctx, zoneId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
@ -600,23 +600,23 @@ func TestConcurrentAppend(t *testing.T) {
const hexChars = "0123456789abcdef"
ch := hexChars[n]
for j := 0; j < 100; j++ {
err := GBS.AppendData(ctx, blockId, fileName, []byte{ch})
err := WFS.AppendData(ctx, zoneId, fileName, []byte{ch})
if err != nil {
t.Errorf("error appending data (%d): %v", n, err)
}
if j == 50 {
// ignore error here (concurrent flushing)
GBS.FlushCache(ctx)
WFS.FlushCache(ctx)
}
}
}(i)
}
wg.Wait()
checkFileSize(t, ctx, blockId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100)
GBS.FlushCache(ctx)
checkFileSize(t, ctx, blockId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100)
checkFileSize(t, ctx, zoneId, fileName, 1600)
checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100)
checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100)
WFS.FlushCache(ctx)
checkFileSize(t, ctx, zoneId, fileName, 1600)
checkFileByteCount(t, ctx, zoneId, fileName, 'a', 100)
checkFileByteCount(t, ctx, zoneId, fileName, 'e', 100)
}

View File

@ -4,17 +4,21 @@
package fileservice
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/wavetermdev/thenextwave/pkg/filestore"
"github.com/wavetermdev/thenextwave/pkg/util/utilfn"
"github.com/wavetermdev/thenextwave/pkg/wavebase"
)
const MaxFileSize = 10 * 1024 * 1024 // 10M
const DefaultTimeout = 2 * time.Second
type FileService struct{}
@ -103,3 +107,13 @@ func (fs *FileService) ReadFile(path string) (*FullFile, error) {
Data64: base64.StdEncoding.EncodeToString(barr),
}, nil
}
func (fs *FileService) GetWaveFile(id string, path string) (any, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
file, err := filestore.WFS.Stat(ctx, id, path)
if err != nil {
return nil, fmt.Errorf("error getting file: %w", err)
}
return file, nil
}

View File

@ -156,6 +156,9 @@ func SetMeta(waveObj WaveObj, meta map[string]any) {
}
func ToJsonMap(w WaveObj) (map[string]any, error) {
if w == nil {
return nil, nil
}
m := make(map[string]any)
dconfig := &mapstructure.DecoderConfig{
Result: &m,

View File

@ -14,7 +14,7 @@ import (
)
type SimpleCommandHandlerFn func(context.Context, *RpcServer, string, any) (any, error)
type StreamCommandHandlerFn func(context.Context, *RpcServer, string, any) error
type StreamCommandHandlerFn func(context.Context, *RpcServer, *RpcPacket) error
type RpcServer struct {
CVar *sync.Cond
@ -33,27 +33,54 @@ func MakeRpcServer(sendCh chan *RpcPacket, recvCh chan *RpcPacket) *RpcServer {
panic(fmt.Errorf("sendCh buffer size must be at least MaxInFlightPackets(%d)", MaxInFlightPackets))
}
rtn := &RpcServer{
CVar: sync.NewCond(&sync.Mutex{}),
NextSeqNum: &atomic.Int64{},
RespPacketsInFlight: make(map[int64]string),
AckList: nil,
RpcReqs: make(map[string]*RpcInfo),
SendCh: sendCh,
RecvCh: recvCh,
CVar: sync.NewCond(&sync.Mutex{}),
NextSeqNum: &atomic.Int64{},
RespPacketsInFlight: make(map[int64]string),
AckList: nil,
RpcReqs: make(map[string]*RpcInfo),
SendCh: sendCh,
RecvCh: recvCh,
SimpleCommandHandlers: make(map[string]SimpleCommandHandlerFn),
StreamCommandHandlers: make(map[string]StreamCommandHandlerFn),
}
go rtn.runRecvLoop()
return rtn
}
func (s *RpcServer) shouldUseStreamHandler(command string) bool {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
_, ok := s.StreamCommandHandlers[command]
return ok
}
func (s *RpcServer) getStreamHandler(command string) StreamCommandHandlerFn {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
return s.StreamCommandHandlers[command]
}
func (s *RpcServer) getSimpleHandler(command string) SimpleCommandHandlerFn {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
return s.SimpleCommandHandlers[command]
}
func (s *RpcServer) RegisterSimpleCommandHandler(command string, handler SimpleCommandHandlerFn) {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
if s.StreamCommandHandlers[command] != nil {
panic(fmt.Errorf("command %q already registered as a stream handler", command))
}
s.SimpleCommandHandlers[command] = handler
}
func (s *RpcServer) RegisterStreamCommandHandler(command string, handler StreamCommandHandlerFn) {
s.CVar.L.Lock()
defer s.CVar.L.Unlock()
if s.SimpleCommandHandlers[command] != nil {
panic(fmt.Errorf("command %q already registered as a simple handler", command))
}
s.StreamCommandHandlers[command] = handler
}
@ -67,10 +94,10 @@ func (s *RpcServer) runRecvLoop() {
for pk := range s.RecvCh {
s.handleAcks(pk.Acks)
if pk.RpcType == RpcType_Req {
if pk.ReqDone {
s.handleSimpleReq(pk)
} else {
if s.shouldUseStreamHandler(pk.Command) {
s.handleStreamReq(pk)
} else {
s.handleSimpleReq(pk)
}
continue
}
@ -163,8 +190,9 @@ func (s *RpcServer) handleAcks(acks []int64) {
func (s *RpcServer) handleSimpleReq(pk *RpcPacket) {
s.ackResp(pk.SeqNum)
handler, ok := s.SimpleCommandHandlers[pk.Command]
if !ok {
handler := s.getSimpleHandler(pk.Command)
if handler == nil {
s.sendErrorResp(pk, fmt.Errorf("unknown command: %s", pk.Command))
log.Printf("RpcServer.handleReq() unknown command: %s", pk.Command)
return
}
@ -201,11 +229,35 @@ func (s *RpcServer) grabAcks_nolock() []int64 {
return acks
}
func (s *RpcServer) sendErrorResp(pk *RpcPacket, err error) {
respPk := &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: true,
Error: err.Error(),
}
s.waitForSend(context.Background(), respPk)
}
func (s *RpcServer) makeRespPk(pk *RpcPacket, data any, done bool) *RpcPacket {
return &RpcPacket{
Command: pk.Command,
RpcId: pk.RpcId,
RpcType: RpcType_Resp,
SeqNum: s.NextSeqNum.Add(1),
RespDone: done,
Data: data,
}
}
func (s *RpcServer) handleStreamReq(pk *RpcPacket) {
s.ackResp(pk.SeqNum)
handler, ok := s.StreamCommandHandlers[pk.Command]
if !ok {
handler := s.getStreamHandler(pk.Command)
if handler == nil {
s.ackResp(pk.SeqNum)
s.sendErrorResp(pk, fmt.Errorf("unknown command: %s", pk.Command))
log.Printf("RpcServer.handleStreamReq() unknown command: %s", pk.Command)
return
}
@ -229,7 +281,7 @@ func (s *RpcServer) handleStreamReq(pk *RpcPacket) {
}()
ctx, cancelFn := makeContextFromTimeout(pk.Timeout)
defer cancelFn()
err := handler(ctx, s, pk.Command, pk.Data)
err := handler(ctx, s, pk)
if err != nil {
respPk := &RpcPacket{
Command: pk.Command,

View File

@ -5,6 +5,8 @@ package wshprc
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
@ -113,3 +115,80 @@ func TestStream(t *testing.T) {
}()
wg.Wait()
}
func TestSimpleClientServer(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
server := MakeRpcServer(recvCh, sendCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
server.RegisterSimpleCommandHandler("test", func(ctx context.Context, s *RpcServer, cmd string, data any) (any, error) {
if data != "hello" {
return nil, fmt.Errorf("expected 'hello', got '%s'", data)
}
return "world", nil
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.SimpleReq(ctx, "test", "hello")
if err != nil {
t.Errorf("SimpleReq() failed: %v", err)
return
}
if resp != "world" {
t.Errorf("SimpleReq() failed: expected 'world', got '%s'", resp)
}
}()
wg.Wait()
}
func TestStreamClientServer(t *testing.T) {
sendCh := make(chan *RpcPacket, MaxInFlightPackets)
recvCh := make(chan *RpcPacket, MaxInFlightPackets)
client := MakeRpcClient(sendCh, recvCh)
server := MakeRpcServer(recvCh, sendCh)
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
server.RegisterStreamCommandHandler("test", func(ctx context.Context, s *RpcServer, req *RpcPacket) error {
pk1 := s.makeRespPk(req, "one", false)
pk2 := s.makeRespPk(req, "two", false)
pk3 := s.makeRespPk(req, "three", true)
s.SendResponse(ctx, pk1)
s.SendResponse(ctx, pk2)
s.SendResponse(ctx, pk3)
return nil
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
respCh, err := client.StreamReq(ctx, "test", "hello", 2*time.Second)
if err != nil {
t.Errorf("StreamReq() failed: %v", err)
return
}
var result []string
for respPk := range respCh {
if respPk.Error != "" {
t.Errorf("StreamReq() failed: %v", respPk.Error)
return
}
log.Printf("got response: %#v", respPk)
result = append(result, respPk.Data.(string))
}
if len(result) != 3 {
t.Errorf("expected 3 responses, got %d", len(result))
return
}
if result[0] != "one" || result[1] != "two" || result[2] != "three" {
t.Errorf("expected 'one', 'two', 'three', got %v", result)
return
}
}()
wg.Wait()
}

View File

@ -103,7 +103,10 @@ func DBGetORef(ctx context.Context, oref waveobj.ORef) (waveobj.WaveObj, error)
table := tableNameFromOType(oref.OType)
query := fmt.Sprintf("SELECT oid, version, data FROM %s WHERE oid = ?", table)
var row idDataType
tx.Get(&row, query, oref.OID)
found := tx.Get(&row, query, oref.OID)
if !found {
return nil, nil
}
rtn, err := waveobj.FromJson(row.Data)
if err != nil {
return rtn, err