Merge branch 'main' into cole/file-backend-changes

commit 25259428ab
Author: Cole Lashley
Date: 2024-05-06 19:44:58 -07:00
Committed by: GitHub (no known key found for this signature in database; GPG Key ID: B5690EEEBB952194)

12 changed files with 278 additions and 172 deletions

View File

@@ -22,7 +22,7 @@ jobs:
       - uses: dashcamio/testdriver@main
         id: testdriver
         with:
-          version: v2.10.2
+          version: v2.12.5
           prerun: |
             rm ~/Desktop/WITH-LOVE-FROM-AMERICA.txt
             cd ~/actions-runner/_work/testdriver/testdriver/

View File

@@ -34,9 +34,6 @@ let wasActive = true;
 let wasInFg = true;
 let currentGlobalShortcut: string | null = null;
 let initialClientData: ClientDataType = null;
-let windows: Windows = {};
-interface Windows extends Record<string, Electron.BrowserWindow> {}
-
 checkPromptMigrate();
 ensureDir(waveHome);
@@ -325,10 +322,7 @@ function shFrameNavHandler(event: Electron.Event<Electron.WebContentsWillFrameNa
     console.log("frame navigation canceled");
 }
 
-function createWindow(id: string, clientData: ClientDataType | null): Electron.BrowserWindow {
-    if (windows[id]) {
-        console.error(`createWindow called for existing window ${id}`);
-    }
+function createWindow(clientData: ClientDataType | null): Electron.BrowserWindow {
     const bounds = calcBounds(clientData);
     setKeyUtilPlatform(platform());
     const win = new electron.BrowserWindow({
@@ -376,18 +370,9 @@ function createWindow(id: string, clientData: ClientDataType | null): Electron.B
         wasInFg = true;
         wasActive = true;
     });
-    win.on("close", () => {
-        delete windows[id];
-    });
     win.webContents.on("zoom-changed", (e) => {
         win.webContents.send("zoom-changed");
     });
-    windows[id] = win;
-    return win;
-}
-
-function createMainWindow(clientData: ClientDataType | null) {
-    const win = createWindow("main", clientData);
     win.webContents.setWindowOpenHandler(({ url, frameName }) => {
         if (url.startsWith("https://docs.waveterm.dev/")) {
             console.log("openExternal docs", url);
@@ -408,6 +393,7 @@ function createMainWindow(clientData: ClientDataType | null) {
         console.log("window-open denied", url);
         return { action: "deny" };
     });
+    return win;
 }
 
 function mainResizeHandler(_: any, win: Electron.BrowserWindow) {
@@ -673,13 +659,13 @@ async function getClientData(willRetry: boolean, retryNum: number): Promise<Clie
 }
 
 function sendWSSC() {
-    if (windows["main"] != null) {
-        if (waveSrvProc == null) {
-            windows["main"].webContents.send("wavesrv-status-change", false);
-            return;
-        }
-        windows["main"].webContents.send("wavesrv-status-change", true, waveSrvProc.pid);
-    }
+    electron.BrowserWindow.getAllWindows().forEach((win) => {
+        if (waveSrvProc == null) {
+            win.webContents.send("wavesrv-status-change", false);
+        } else {
+            win.webContents.send("wavesrv-status-change", true, waveSrvProc.pid);
+        }
+    });
 }
 
 function runWaveSrv() {
@@ -747,7 +733,7 @@ electron.ipcMain.on("context-editmenu", (_, { x, y }, opts) => {
     menu.popup({ x, y });
 });
 
-async function createMainWindowWrap() {
+async function createWindowWrap() {
     let clientData: ClientDataType | null = null;
     try {
         clientData = await getClientDataPoll(1);
@@ -755,9 +741,9 @@ async function createMainWindowWrap() {
     } catch (e) {
         console.log("error getting wavesrv clientdata", e.toString());
     }
-    createMainWindow(clientData);
+    const win = createWindow(clientData);
     if (clientData?.winsize.fullscreen) {
-        windows["main"].setFullScreen(true);
+        win.setFullScreen(true);
     }
     configureAutoUpdaterStartup(clientData);
 }
@@ -776,7 +762,7 @@ function logActiveState() {
         console.log("error logging active state", err);
     });
     // for next iteration
-    wasInFg = windows["main"]?.isFocused();
+    wasInFg = electron.BrowserWindow.getFocusedWindow()?.isFocused() ?? false;
     wasActive = false;
 }
@@ -802,9 +788,13 @@ function reregisterGlobalShortcut(shortcut: string) {
         currentGlobalShortcut = null;
         return;
     }
-    const ok = electron.globalShortcut.register(shortcut, () => {
+    const ok = electron.globalShortcut.register(shortcut, async () => {
         console.log("global shortcut triggered, showing window");
-        windows["main"]?.show();
+        if (electron.BrowserWindow.getAllWindows().length == 0) {
+            await createWindowWrap();
+        }
+        const winToShow = electron.BrowserWindow.getFocusedWindow() ?? electron.BrowserWindow.getAllWindows()[0];
+        winToShow?.show();
     });
     console.log("registered global shortcut", shortcut, ok ? "ok" : "failed");
     if (!ok) {
@@ -829,9 +819,9 @@ let lastUpdateCheck: Date = null;
  */
 function setAppUpdateStatus(status: string) {
     appUpdateStatus = status;
-    if (windows["main"] != null) {
-        windows["main"].webContents.send("app-update-status", appUpdateStatus);
-    }
+    electron.BrowserWindow.getAllWindows().forEach((window) => {
+        window.webContents.send("app-update-status", appUpdateStatus);
+    });
 }
 
 /**
@@ -915,9 +905,14 @@ async function installAppUpdate() {
         detail: "A new version has been downloaded. Restart the application to apply the updates.",
     };
 
-    await electron.dialog.showMessageBox(windows["main"], dialogOpts).then(({ response }) => {
-        if (response === 0) autoUpdater.quitAndInstall();
-    });
+    const allWindows = electron.BrowserWindow.getAllWindows();
+    if (allWindows.length > 0) {
+        await electron.dialog
+            .showMessageBox(electron.BrowserWindow.getFocusedWindow() ?? allWindows[0], dialogOpts)
+            .then(({ response }) => {
+                if (response === 0) autoUpdater.quitAndInstall();
+            });
+    }
 }
 
 electron.ipcMain.on("install-app-update", () => fireAndForget(() => installAppUpdate()));
@@ -989,10 +984,11 @@ function configureAutoUpdater(enabled: boolean) {
 }
 
 setTimeout(runActiveTimer, 5000); // start active timer, wait 5s just to be safe
 await app.whenReady();
-await createMainWindowWrap();
+await createWindowWrap();
 app.on("activate", () => {
     if (electron.BrowserWindow.getAllWindows().length === 0) {
-        createMainWindowWrap().then();
+        createWindowWrap().then();
     }
     checkForUpdates();
 });

View File

@@ -1128,6 +1128,11 @@ func main() {
         log.Printf("[error] migrate up: %v\n", err)
         return
     }
+    // err = blockstore.MigrateBlockstore()
+    // if err != nil {
+    //     log.Printf("[error] migrate blockstore: %v\n", err)
+    //     return
+    // }
     clientData, err := sstore.EnsureClientData(context.Background())
     if err != nil {
         log.Printf("[error] ensuring client data: %v\n", err)

View File

@@ -0,0 +1 @@
+-- nothing

View File

@@ -0,0 +1,19 @@
+CREATE TABLE block_file (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    maxsize bigint NOT NULL,
+    circular boolean NOT NULL,
+    size bigint NOT NULL,
+    createdts bigint NOT NULL,
+    modts bigint NOT NULL,
+    meta json NOT NULL,
+    PRIMARY KEY (blockid, name)
+);
+
+CREATE TABLE block_data (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    partidx int NOT NULL,
+    data blob NOT NULL,
+    PRIMARY KEY(blockid, name, partidx)
+);
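
For orientation: block_file holds one metadata row per (blockid, name) pair, while the bytes themselves live in block_data, chunked into parts of MaxBlockSize bytes keyed by partidx. A minimal sketch of the offset-to-part arithmetic, using the 128KB block size defined in blockstore.go later in this diff (the helper name is illustrative, not part of this PR):

    package main

    import "fmt"

    const UnitsKB = 1024
    const MaxBlockSize = int64(128 * UnitsKB)

    // partForOffset maps an absolute file offset to the block_data partidx
    // that holds the byte, plus the position within that part.
    // (Illustrative helper, not part of this PR.)
    func partForOffset(off int64) (partIdx int64, posInPart int64) {
        return off / MaxBlockSize, off % MaxBlockSize
    }

    func main() {
        part, pos := partForOffset(300000)
        fmt.Printf("offset 300000 -> partidx %d, pos %d\n", part, pos) // partidx 2, pos 37856
    }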

View File

@ -10,3 +10,6 @@ import "embed"
//go:embed migrations/*.sql //go:embed migrations/*.sql
var MigrationFS embed.FS var MigrationFS embed.FS
//go:embed blockstore-migrations/*.sql
var BlockstoreMigrationFS embed.FS

View File

@ -5,7 +5,6 @@ go 1.22
toolchain go1.22.0 toolchain go1.22.0
require ( require (
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9
github.com/alessio/shellescape v1.4.1 github.com/alessio/shellescape v1.4.1
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2
github.com/creack/pty v1.1.18 github.com/creack/pty v1.1.18

View File

@@ -1,5 +1,3 @@
-github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs=
-github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
 github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
 github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs=
@@ -57,7 +55,6 @@ github.com/sawka/txwrap v0.1.2 h1:v8xS0Z1LE7/6vMZA81PYihI+0TSR6Zm1MalzzBIuXKc=
 github.com/sawka/txwrap v0.1.2/go.mod h1:T3nlw2gVpuolo6/XEetvBbk1oMXnY978YmBFy1UyHvw=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/wavetermdev/ssh_config v0.0.0-20240306041034-17e2087ebde2 h1:onqZrJVap1sm15AiIGTfWzdr6cEF0KdtddeuuOVhzyY=
@@ -74,8 +71,6 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
 golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 mvdan.cc/sh/v3 v3.7.0 h1:lSTjdP/1xsddtaKfGg7Myu7DnlHItd3/M2tomOcNNBg=

View File

@@ -10,8 +10,6 @@ import (
     "strings"
     "sync"
     "time"
-
-    "github.com/alecthomas/units"
 )
 
 type FileOptsType struct {
@@ -32,7 +30,11 @@ type FileInfo struct {
     Meta FileMeta
 }
 
-const MaxBlockSize = int64(128 * units.Kilobyte)
+const UnitsKB = 1024
+const UnitsMB = 1024 * UnitsKB
+const UnitsGB = 1024 * UnitsMB
+
+const MaxBlockSize = int64(128 * UnitsKB)
 const DefaultFlushTimeout = 1 * time.Second
 
 type CacheEntry struct {
@@ -79,16 +81,23 @@ type BlockStore interface {
     GetAllBlockIds(ctx context.Context) []string
 }
 
-var cache map[string]*CacheEntry = make(map[string]*CacheEntry)
+var blockstoreCache map[string]*CacheEntry = make(map[string]*CacheEntry)
 var globalLock *sync.Mutex = &sync.Mutex{}
 var appendLock *sync.Mutex = &sync.Mutex{}
 var flushTimeout = DefaultFlushTimeout
 var lastWriteTime time.Time
 
+// for testing
+func clearCache() {
+    globalLock.Lock()
+    defer globalLock.Unlock()
+    blockstoreCache = make(map[string]*CacheEntry)
+}
+
 func InsertFileIntoDB(ctx context.Context, fileInfo FileInfo) error {
     metaJson, err := json.Marshal(fileInfo.Meta)
     if err != nil {
-        return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, err)
+        return fmt.Errorf("error writing file %s to db: %v", fileInfo.Name, err)
     }
     txErr := WithTx(ctx, func(tx *TxWrap) error {
         query := `INSERT INTO block_file VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
@@ -96,7 +105,7 @@ func InsertFileIntoDB(ctx context.Context, fileInfo FileInfo) error {
         return nil
     })
     if txErr != nil {
-        return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, txErr)
+        return fmt.Errorf("error writing file %s to db: %v", fileInfo.Name, txErr)
     }
     return nil
 }
@@ -104,7 +113,7 @@ func InsertFileIntoDB(ctx context.Context, fileInfo FileInfo) error {
 func WriteFileToDB(ctx context.Context, fileInfo FileInfo) error {
     metaJson, err := json.Marshal(fileInfo.Meta)
     if err != nil {
-        return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, err)
+        return fmt.Errorf("error writing file %s to db: %v", fileInfo.Name, err)
     }
     txErr := WithTx(ctx, func(tx *TxWrap) error {
         query := `UPDATE block_file SET blockid = ?, name = ?, maxsize = ?, circular = ?, size = ?, createdts = ?, modts = ?, meta = ? where blockid = ? and name = ?`
@@ -112,7 +121,7 @@ func WriteFileToDB(ctx context.Context, fileInfo FileInfo) error {
         return nil
     })
     if txErr != nil {
-        return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, txErr)
+        return fmt.Errorf("error writing file %s to db: %v", fileInfo.Name, txErr)
     }
     return nil
@@ -125,7 +134,7 @@ func WriteDataBlockToDB(ctx context.Context, blockId string, name string, index
         return nil
     })
     if txErr != nil {
-        return fmt.Errorf("Error writing data block to db: %v", txErr)
+        return fmt.Errorf("error writing data block to db: %v", txErr)
     }
     return nil
 }
@@ -152,7 +161,7 @@ func WriteToCacheBlockNum(ctx context.Context, blockId string, name string, p []
     defer cacheEntry.Lock.Unlock()
     block, err := GetCacheBlock(ctx, blockId, name, cacheNum, pullFromDB)
     if err != nil {
-        return 0, 0, fmt.Errorf("Error getting cache block: %v", err)
+        return 0, 0, fmt.Errorf("error getting cache block: %v", err)
     }
     var bytesWritten = 0
     blockLen := len(block.data)
@@ -192,7 +201,7 @@ func ReadFromCacheBlock(ctx context.Context, blockId string, name string, block
         }
     }()
     if pos > len(block.data) {
-        return 0, fmt.Errorf("Reading past end of cache block, should never happen")
+        return 0, fmt.Errorf("reading past end of cache block, should never happen")
     }
     bytesWritten := 0
     index := pos
@@ -216,7 +225,7 @@ func ReadFromCacheBlock(ctx context.Context, blockId string, name string, block
     return bytesWritten, nil
 }
 
-const MaxSizeError = "Hit Max Size"
+const MaxSizeError = "MaxSizeError"
 
 func WriteToCacheBuf(buf *[]byte, p []byte, pos int, length int, maxWrite int64) (int, error) {
     bytesToWrite := length
@@ -260,7 +269,7 @@ func GetValuesFromCacheId(cacheId string) (blockId string, name string) {
 func GetCacheEntry(ctx context.Context, blockId string, name string) (*CacheEntry, bool) {
     globalLock.Lock()
     defer globalLock.Unlock()
-    if curCacheEntry, found := cache[GetCacheId(blockId, name)]; found {
+    if curCacheEntry, found := blockstoreCache[GetCacheId(blockId, name)]; found {
         return curCacheEntry, true
     } else {
         return nil, false
@@ -279,7 +288,7 @@ func GetCacheEntryOrPopulate(ctx context.Context, blockId string, name string) (
     if cacheEntry, found := GetCacheEntry(ctx, blockId, name); found {
         return cacheEntry, nil
     } else {
-        return nil, fmt.Errorf("Error getting cache entry %v %v", blockId, name)
+        return nil, fmt.Errorf("error getting cache entry %v %v", blockId, name)
     }
 }
@@ -288,16 +297,16 @@ func GetCacheEntryOrPopulate(ctx context.Context, blockId string, name string) (
 func SetCacheEntry(ctx context.Context, cacheId string, cacheEntry *CacheEntry) {
     globalLock.Lock()
     defer globalLock.Unlock()
-    if _, found := cache[cacheId]; found {
+    if _, found := blockstoreCache[cacheId]; found {
         return
     }
-    cache[cacheId] = cacheEntry
+    blockstoreCache[cacheId] = cacheEntry
 }
 
 func DeleteCacheEntry(ctx context.Context, blockId string, name string) {
     globalLock.Lock()
     defer globalLock.Unlock()
-    delete(cache, GetCacheId(blockId, name))
+    delete(blockstoreCache, GetCacheId(blockId, name))
 }
 
 func GetCacheBlock(ctx context.Context, blockId string, name string, cacheNum int, pullFromDB bool) (*CacheBlock, error) {
@@ -392,7 +401,7 @@ func WriteAtHelper(ctx context.Context, blockId string, name string, p []byte, o
     }
     fInfo, err := Stat(ctx, blockId, name)
     if err != nil {
-        return 0, fmt.Errorf("Write At err: %v", err)
+        return 0, fmt.Errorf("WriteAt err: %v", err)
     }
     if off > fInfo.Opts.MaxSize && fInfo.Opts.Circular {
         numOver := off / fInfo.Opts.MaxSize
@@ -416,12 +425,12 @@ func WriteAtHelper(ctx context.Context, blockId string, name string, p []byte, o
                 b, err := WriteAtHelper(ctx, blockId, name, p, 0, false)
                 bytesWritten += b
                 if err != nil {
-                    return bytesWritten, fmt.Errorf("Write to cache error: %v", err)
+                    return bytesWritten, fmt.Errorf("write to cache error: %v", err)
                 }
                 break
             }
         } else {
-            return bytesWritten, fmt.Errorf("Write to cache error: %v", err)
+            return bytesWritten, fmt.Errorf("write to cache error: %v", err)
         }
     }
     if len(p) == b {
@@ -452,7 +461,7 @@ func GetAllBlockSizes(dataBlocks []*CacheBlock) (int, int) {
 }
 
 func FlushCache(ctx context.Context) error {
-    for _, cacheEntry := range cache {
+    for _, cacheEntry := range blockstoreCache {
         err := WriteFileToDB(ctx, *cacheEntry.Info)
         if err != nil {
             return err
@@ -485,14 +494,14 @@ func ReadAt(ctx context.Context, blockId string, name string, p *[]byte, off int
     bytesRead := 0
     fInfo, err := Stat(ctx, blockId, name)
     if err != nil {
-        return 0, fmt.Errorf("Read At err: %v", err)
+        return 0, fmt.Errorf("ReadAt err: %v", err)
     }
     if off > fInfo.Opts.MaxSize && fInfo.Opts.Circular {
         numOver := off / fInfo.Opts.MaxSize
         off = off - (numOver * fInfo.Opts.MaxSize)
     }
     if off > fInfo.Size {
-        return 0, fmt.Errorf("Read At error: tried to read past the end of the file")
+        return 0, fmt.Errorf("ReadAt error: tried to read past the end of the file")
     }
     endReadPos := math.Min(float64(int64(len(*p))+off), float64(fInfo.Size))
     bytesToRead := int64(endReadPos) - off
@@ -505,7 +514,7 @@ func ReadAt(ctx context.Context, blockId string, name string, p *[]byte, off int
     for index := curCacheNum; index < curCacheNum+numCaches; index++ {
         curCacheBlock, err := GetCacheBlock(ctx, blockId, name, index, true)
         if err != nil {
-            return bytesRead, fmt.Errorf("Error getting cache block: %v", err)
+            return bytesRead, fmt.Errorf("error getting cache block: %v", err)
         }
         cacheOffset := off - (int64(index) * MaxBlockSize)
         if cacheOffset < 0 {
@@ -540,7 +549,7 @@ func ReadAt(ctx context.Context, blockId string, name string, p *[]byte, off int
                 break
             }
         } else {
-            return bytesRead, fmt.Errorf("Read from cache error: %v", err)
+            return bytesRead, fmt.Errorf("read from cache error: %v", err)
         }
     }
 }
@@ -552,7 +561,7 @@ func AppendData(ctx context.Context, blockId string, name string, p []byte) (int
     defer appendLock.Unlock()
     fInfo, err := Stat(ctx, blockId, name)
     if err != nil {
-        return 0, fmt.Errorf("Append stat error: %v", err)
+        return 0, fmt.Errorf("append stat error: %v", err)
     }
     return WriteAt(ctx, blockId, name, p, fInfo.Size)
 }
@@ -564,12 +573,12 @@ func DeleteFile(ctx context.Context, blockId string, name string) error {
 }
 
 func DeleteBlock(ctx context.Context, blockId string) error {
-    for cacheId, _ := range cache {
+    for cacheId := range blockstoreCache {
         curBlockId, name := GetValuesFromCacheId(cacheId)
         if curBlockId == blockId {
             err := DeleteFile(ctx, blockId, name)
             if err != nil {
-                return fmt.Errorf("Error deleting %v %v: %v", blockId, name, err)
+                return fmt.Errorf("error deleting %v %v: %v", blockId, name, err)
             }
         }
     }
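
Note: the circular-file branches in WriteAtHelper and ReadAt above reduce any offset past Opts.MaxSize back into the live window by subtracting whole multiples of MaxSize. A small standalone sketch of that wrap arithmetic (the helper name is illustrative, not part of this PR):

    package main

    import "fmt"

    // wrapCircularOffset mirrors the reduction used for circular files:
    // offsets beyond maxSize wrap back into the [0, maxSize) window.
    // (Illustrative helper, not part of this PR.)
    func wrapCircularOffset(off int64, maxSize int64) int64 {
        if off > maxSize {
            numOver := off / maxSize
            off = off - (numOver * maxSize)
        }
        return off
    }

    func main() {
        // Two full wraps past a 100-byte window land at offset 50.
        fmt.Println(wrapCircularOffset(250, 100)) // 50
    }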

View File

@@ -8,11 +8,16 @@ import (
     "path"
     "sync"
 
+    "github.com/golang-migrate/migrate/v4"
+    _ "github.com/golang-migrate/migrate/v4/database/sqlite3"
+    "github.com/golang-migrate/migrate/v4/source/iofs"
     "github.com/jmoiron/sqlx"
     _ "github.com/mattn/go-sqlite3"
     "github.com/sawka/txwrap"
     "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
     "github.com/wavetermdev/waveterm/wavesrv/pkg/scbase"
+
+    dbfs "github.com/wavetermdev/waveterm/wavesrv/db"
 )
 
 const DBFileName = "blockstore.db"
@@ -21,12 +26,64 @@ type SingleConnDBGetter struct {
     SingleConnLock *sync.Mutex
 }
 
-var dbWrap *SingleConnDBGetter
+var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}}
 
 type TxWrap = txwrap.TxWrap
 
-func InitDBState() {
-    dbWrap = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}}
+func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
+    fsVar, err := iofs.New(dbfs.BlockstoreMigrationFS, "blockstore-migrations")
+    if err != nil {
+        return nil, fmt.Errorf("opening iofs: %w", err)
+    }
+    dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName())
+    m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl)
+    if err != nil {
+        return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err)
+    }
+    return m, nil
+}
+
+func MigrateBlockstore() error {
+    log.Printf("migrate blockstore\n")
+    m, err := MakeBlockstoreMigrate()
+    if err != nil {
+        return err
+    }
+    curVersion, dirty, err := GetMigrateVersion(m)
+    if dirty {
+        return fmt.Errorf("cannot migrate up, database is dirty")
+    }
+    if err != nil {
+        return fmt.Errorf("cannot get current migration version: %v", err)
+    }
+    defer m.Close()
+    err = m.Up()
+    if err != nil && err != migrate.ErrNoChange {
+        return fmt.Errorf("migrating blockstore: %w", err)
+    }
+    newVersion, _, err := GetMigrateVersion(m)
+    if err != nil {
+        return fmt.Errorf("cannot get new migration version: %v", err)
+    }
+    if newVersion != curVersion {
+        log.Printf("[db] blockstore migration done, version %d -> %d\n", curVersion, newVersion)
+    }
+    return nil
+}
+
+func GetMigrateVersion(m *migrate.Migrate) (uint, bool, error) {
+    if m == nil {
+        var err error
+        m, err = MakeBlockstoreMigrate()
+        if err != nil {
+            return 0, false, err
+        }
+    }
+    curVersion, dirty, err := m.Version()
+    if err == migrate.ErrNilVersion {
+        return 0, false, nil
+    }
+    return curVersion, dirty, err
 }
 
 func (dbg *SingleConnDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) {
@@ -62,8 +119,12 @@ func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT
 var globalDBLock = &sync.Mutex{}
 var globalDB *sqlx.DB
 var globalDBErr error
+var overrideDBName string
 
 func GetDBName() string {
+    if overrideDBName != "" {
+        return overrideDBName
+    }
     scHome := scbase.GetWaveHomeDir()
     return path.Join(scHome, DBFileName)
 }
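
Note: together these changes give the blockstore its own migration path plus a test seam: setting overrideDBName points GetDBName at a throwaway SQLite file before MigrateBlockstore runs (the test file later in this diff uses exactly that). A minimal same-package sketch of reading the schema version with the new helper; passing nil lets GetMigrateVersion build its own migrate instance (the caller function is illustrative, not part of this PR):

    // logBlockstoreVersion is an illustrative caller, not part of this PR.
    func logBlockstoreVersion() {
        version, dirty, err := GetMigrateVersion(nil)
        if err != nil {
            log.Printf("[db] cannot read blockstore migration version: %v\n", err)
            return
        }
        log.Printf("[db] blockstore schema version %d (dirty=%v)\n", version, dirty)
    }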

View File

@@ -6,15 +6,17 @@ import (
     "crypto/md5"
     "crypto/rand"
     "log"
+    "os"
     "sync"
     "testing"
     "time"
 
-    "github.com/alecthomas/units"
     "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
 )
 
+const testOverrideDBName = "test-blockstore.db"
+const bigFileSize = 10 * UnitsMB
+
 type TestBlockType struct {
     BlockId string
     Name    string
@@ -22,6 +24,22 @@ type TestBlockType struct {
     Data []byte
 }
 
+func initTestDb(t *testing.T) {
+    log.Printf("initTestDb: %v", t.Name())
+    os.Remove(testOverrideDBName)
+    overrideDBName = testOverrideDBName
+    err := MigrateBlockstore()
+    if err != nil {
+        t.Fatalf("MigrateBlockstore error: %v", err)
+    }
+}
+
+func cleanupTestDB(t *testing.T) {
+    clearCache()
+    CloseDB()
+    os.Remove(testOverrideDBName)
+}
+
 func (b *TestBlockType) ToMap() map[string]interface{} {
     rtn := make(map[string]interface{})
     return rtn
@@ -35,22 +53,17 @@ func (b *TestBlockType) FromMap(m map[string]interface{}) bool {
     return true
 }
 
-func Cleanup(t *testing.T, ctx context.Context) {
-    DeleteBlock(ctx, "test-block-id")
-}
-
-func CleanupName(t *testing.T, ctx context.Context, blockId string) {
-    DeleteBlock(ctx, blockId)
-}
-
 func TestGetDB(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     GetDBTimeout := 10 * time.Second
-    ctx, _ := context.WithTimeout(context.Background(), GetDBTimeout)
+    ctx, cancelFn := context.WithTimeout(context.Background(), GetDBTimeout)
+    defer cancelFn()
     _, err := GetDB(ctx)
     if err != nil {
         t.Errorf("TestInitDB error: %v", err)
     }
-    CloseDB()
 }
 
 func SimpleAssert(t *testing.T, condition bool, description string) {
@@ -82,9 +95,11 @@ func InsertIntoBlockData(t *testing.T, ctx context.Context, blockId string, name
 }
 
 func TestTx(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     SetFlushTimeout(2 * time.Minute)
-    InitDBState()
     txErr := WithTx(ctx, func(tx *TxWrap) error {
         query := `INSERT into block_data values ('test-block-id', 'test-file-name', 0, 256)`
         tx.Exec(query)
@@ -127,11 +142,13 @@ func TestTx(t *testing.T) {
     if txErr != nil {
         t.Errorf("TestTx error deleting test entries: %v", txErr)
     }
-    CloseDB()
 }
 
 func TestMultipleChunks(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
-    InitDBState()
     InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 0, make([]byte, 5))
     InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 1, make([]byte, 5))
     InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 2, make([]byte, 5))
@@ -178,7 +195,9 @@ func TestMultipleChunks(t *testing.T) {
 }
 
 func TestMakeFile(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -205,7 +224,7 @@ func TestMakeFile(t *testing.T) {
     log.Printf("cur file info: %v", curFileInfo)
     SimpleAssert(t, curFileInfo.Name == "file-1", "correct file name")
     SimpleAssert(t, curFileInfo.Meta["test-descriptor"] == true, "meta correct")
-    curCacheEntry := cache[GetCacheId("test-block-id", "file-1")]
+    curCacheEntry := blockstoreCache[GetCacheId("test-block-id", "file-1")]
     curFileInfo = curCacheEntry.Info
     log.Printf("cache entry: %v", curCacheEntry)
     SimpleAssert(t, curFileInfo.Name == "file-1", "cache correct file name")
@@ -218,15 +237,16 @@ func TestMakeFile(t *testing.T) {
     if txErr != nil {
         t.Errorf("TestTx error deleting test entries: %v", txErr)
     }
-    Cleanup(t, ctx)
 }
 
 func TestWriteAt(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -244,7 +264,10 @@ func TestWriteAt(t *testing.T) {
     } else {
         log.Printf("Write at no errors: %v", bytesWritten)
     }
-    SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written")
+    if bytesWritten != len(testBytesToWrite) {
+        t.Errorf("WriteAt error: towrite:%d written:%d err:%v\n", len(testBytesToWrite), bytesWritten, err)
+        return
+    }
     cacheData, err = GetCacheBlock(ctx, "test-block-id", "file-1", 0, false)
     if err != nil {
         t.Errorf("Error getting cache: %v", err)
@@ -313,15 +336,16 @@ func TestWriteAt(t *testing.T) {
     }
     log.Printf("Got stat: %v", fInfo)
     SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size")
-    Cleanup(t, ctx)
 }
 
 func TestWriteAtLeftPad(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -349,14 +373,16 @@ func TestWriteAtLeftPad(t *testing.T) {
     }
     log.Printf("Got stat: %v %v %v", fInfo, fInfo.Size, len(cacheData.data))
     SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size")
-    Cleanup(t, ctx)
 }
 
 func TestReadAt(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -399,14 +425,16 @@ func TestReadAt(t *testing.T) {
     }
     SimpleAssert(t, bytesRead == (11-4), "Correct num bytes read")
     log.Printf("bytes read: %v string: %s", read, string(read))
-    Cleanup(t, ctx)
 }
 
 func TestFlushCache(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -461,17 +489,16 @@ func TestFlushCache(t *testing.T) {
         t.Errorf("get data from db error: %v", txErr)
     }
     log.Printf("DB Data: %v", dbData)
-    Cleanup(t, ctx)
 }
 
-var largeDataFlushFullWriteSize int64 = int64(1024 * units.Megabyte)
+var largeDataFlushFullWriteSize int64 = 64 * UnitsKB
 
 func WriteLargeDataFlush(t *testing.T, ctx context.Context) {
     writeSize := int64(64 - 16)
     fullWriteSize := largeDataFlushFullWriteSize
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -524,6 +551,9 @@ func WriteLargeDataFlush(t *testing.T, ctx context.Context) {
     SimpleAssert(t, bytes.Equal(readHashBuf, hashBuf), "hashes are equal")
 }
 
 func TestWriteAtMaxSize(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -544,11 +574,12 @@ func TestWriteAtMaxSize(t *testing.T) {
     log.Printf("readbuf: %v\n", readBuf)
     SimpleAssert(t, bytesRead == 4, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read")
-    Cleanup(t, ctx)
 }
 
 func TestWriteAtMaxSizeMultipleBlocks(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -569,11 +600,12 @@ func TestWriteAtMaxSizeMultipleBlocks(t *testing.T) {
     log.Printf("readbuf multiple: %v %v %v\n", readBuf, bytesRead, bytesWritten)
     SimpleAssert(t, bytesRead == 4, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read")
-    Cleanup(t, ctx)
 }
 
 func TestWriteAtCircular(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -603,11 +635,12 @@ func TestWriteAtCircular(t *testing.T) {
     SimpleAssert(t, bytesRead == 7, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf[:7], readTest), "Correct bytes read")
     log.Printf("readbuf circular %v %v, %v", readBuf, string(readBuf), bytesRead)
-    Cleanup(t, ctx)
 }
 
 func TestWriteAtCircularWierdOffset(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -646,11 +679,12 @@ func TestWriteAtCircularWierdOffset(t *testing.T) {
     SimpleAssert(t, bytesRead == 7, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf[:7], readTest), "Correct bytes read")
     log.Printf("readbuf circular %v %v, %v", readBuf, string(readBuf), bytesRead)
-    Cleanup(t, ctx)
 }
 
 func TestAppend(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
@@ -691,7 +725,6 @@ func TestAppend(t *testing.T) {
     }
     SimpleAssert(t, bytesRead == bytesWritten+4, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf, readTestBytes), "Correct bytes read")
-    Cleanup(t, ctx)
 }
 
 func AppendSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup) {
@@ -705,13 +738,15 @@ func AppendSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup) {
     SimpleAssert(t, bytesWritten == 1, "Correct bytes written")
 }
 
 func TestAppendSync(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     var wg sync.WaitGroup
     numWorkers := 10
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id-sync", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -729,15 +764,6 @@ func TestAppendSync(t *testing.T) {
     }
     log.Printf("read buf : %v", readBuf)
     SimpleAssert(t, bytesRead == numWorkers, "Correct bytes read")
-    CleanupName(t, ctx, "test-block-id-sync")
-}
-
-func TestAppendSyncMultiple(t *testing.T) {
-    numTests := 100
-    for index := 0; index < numTests; index++ {
-        TestAppendSync(t)
-        log.Printf("finished test: %v", index)
-    }
 }
 
 func WriteAtSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, index int64) {
@@ -753,13 +779,15 @@ func WriteAtSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, in
 }
 
 func TestWriteAtSync(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     var wg sync.WaitGroup
     numWorkers := 10
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id-sync", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -777,22 +805,16 @@ func TestWriteAtSync(t *testing.T) {
     }
     log.Printf("read buf : %v", readBuf)
     SimpleAssert(t, bytesRead == numWorkers, "Correct num bytes read")
-    CleanupName(t, ctx, "test-block-id-sync")
-}
-
-func TestWriteAtSyncMultiple(t *testing.T) {
-    numTests := 100
-    for index := 0; index < numTests; index++ {
-        TestWriteAtSync(t)
-    }
 }
 
 func TestWriteFile(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'}
     bytesWritten, err := WriteFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts, testBytesToWrite)
     if err != nil {
@@ -807,15 +829,16 @@ func TestWriteFile(t *testing.T) {
     SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read")
     log.Printf("bytes read: %v string: %s", read, string(read))
     SimpleAssert(t, bytes.Equal(read, testBytesToWrite), "Correct bytes read")
-    Cleanup(t, ctx)
 }
 
 func TestWriteMeta(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -843,15 +866,16 @@ func TestWriteMeta(t *testing.T) {
     }
     log.Printf("meta: %v", fInfo.Meta)
     SimpleAssert(t, fInfo.Meta["second-test-descriptor"] == "test1", "Retrieved second meta correctly")
-    Cleanup(t, ctx)
 }
 
 func TestGetAllBlockIds(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     err = MakeFile(ctx, "test-block-id-2", "file-1", fileMeta, fileOpts)
     err = MakeFile(ctx, "test-block-id-2", "file-2", fileMeta, fileOpts)
@@ -864,16 +888,17 @@ func TestGetAllBlockIds(t *testing.T) {
     testBlockIdArr := []string{"test-block-id", "test-block-id-2", "test-block-id-3"}
     for idx, val := range blockIds {
         SimpleAssert(t, testBlockIdArr[idx] == val, "Correct blockid value")
-        CleanupName(t, ctx, val)
     }
 }
 
 func TestListFiles(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     err = MakeFile(ctx, "test-block-id-2", "file-1", fileMeta, fileOpts)
     err = MakeFile(ctx, "test-block-id-2", "file-2", fileMeta, fileOpts)
@@ -893,19 +918,18 @@ func TestListFiles(t *testing.T) {
     for idx, val := range files {
         SimpleAssert(t, val.Name == blockid_1_files[idx], "Correct file name")
     }
-    CleanupName(t, ctx, "test-block-id")
-    CleanupName(t, ctx, "test-block-id-2")
-    CleanupName(t, ctx, "test-block-id-3")
 }
 
 func TestFlushTimer(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     testFlushTimeout := 10 * time.Second
     SetFlushTimeout(testFlushTimeout)
-    InitDBState()
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -958,22 +982,12 @@ func TestFlushTimer(t *testing.T) {
         t.Errorf("get data from db error: %v", txErr)
     }
     log.Printf("DB Data: %v", dbData)
-    Cleanup(t, ctx)
 }
 
-func TestFlushTimerMultiple(t *testing.T) {
-    testFlushTimeout := 1 * time.Second
-    SetFlushTimeout(testFlushTimeout)
-    numTests := 10
-    for index := 0; index < numTests; index++ {
-        TestWriteAt(t)
-        time.Sleep(500 * time.Millisecond)
-    }
-}
-
-// time consuming test
 func TestWriteAtMiddle(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     WriteLargeDataFlush(t, ctx)
     testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'}
@@ -989,23 +1003,26 @@ func TestWriteAtMiddle(t *testing.T) {
     log.Printf("readBuf: %v %v", readBuf, string(readBuf))
     SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read")
     SimpleAssert(t, bytes.Equal(readBuf, testBytesToWrite), "read correct bytes")
-    Cleanup(t, ctx)
 }
 
 func TestWriteLargeDataFlush(t *testing.T) {
+    initTestDb(t)
+    defer cleanupTestDB(t)
     ctx := context.Background()
     WriteLargeDataFlush(t, ctx)
-    Cleanup(t, ctx)
 }
 
 func TestWriteLargeDataNoFlush(t *testing.T) {
-    InitDBState()
+    initTestDb(t)
+    defer cleanupTestDB(t)
     writeSize := int64(64 - 16)
-    fullWriteSize := int64(1024 * units.Megabyte)
+    fullWriteSize := int64(64 * UnitsKB)
     ctx := context.Background()
     fileMeta := make(FileMeta)
     fileMeta["test-descriptor"] = true
-    fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+    fileOpts := FileOptsType{MaxSize: bigFileSize, Circular: false, IJson: false}
     err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
     if err != nil {
         t.Fatalf("MakeFile error: %v", err)
@@ -1028,11 +1045,13 @@ func TestWriteLargeDataNoFlush(t *testing.T) {
         copy(hashBuf, hash.Sum(nil))
         bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", writeBuf, writeIndex)
         if int64(bytesWritten) != writeSize {
-            log.Printf("write issue: %v %v \n", bytesWritten, writeSize)
+            t.Errorf("write issue: %v %v %v err:%v\n", bytesWritten, writeSize, writeIndex, err)
+            return
         }
         if err != nil {
             log.Printf("error: %v", err)
             t.Errorf("Write At error: %v\n", err)
+            return
         }
         writeIndex += int64(bytesWritten)
     }
@@ -1060,7 +1079,6 @@ func TestWriteLargeDataNoFlush(t *testing.T) {
     }
     log.Printf("final hash: %v readBuf: %v, bytesRead: %v", readHashBuf, readBuf, readIndex)
     SimpleAssert(t, bytes.Equal(readHashBuf, hashBuf), "hashes are equal")
-    Cleanup(t, ctx)
 }
 
 // saving this code for later

View File

@@ -17,7 +17,7 @@ import (
     _ "github.com/golang-migrate/migrate/v4/source/file"
     "github.com/golang-migrate/migrate/v4/source/iofs"
     _ "github.com/mattn/go-sqlite3"
-    sh2db "github.com/wavetermdev/waveterm/wavesrv/db"
+    dbfs "github.com/wavetermdev/waveterm/wavesrv/db"
 
     "github.com/golang-migrate/migrate/v4"
 )
@@ -29,7 +29,7 @@ const CmdLineSpecialMigration = 20
 const RISpecialMigration = 30
 
 func MakeMigrate() (*migrate.Migrate, error) {
-    fsVar, err := iofs.New(sh2db.MigrationFS, "migrations")
+    fsVar, err := iofs.New(dbfs.MigrationFS, "migrations")
     if err != nil {
         return nil, fmt.Errorf("opening iofs: %w", err)
     }