enable testing, bug fixes

This commit is contained in:
sawka 2024-05-13 00:02:32 -07:00
parent 4da5a4f610
commit 68a6605209
6 changed files with 141 additions and 36 deletions

View File

@ -1,11 +1,10 @@
CREATE TABLE db_block_file ( CREATE TABLE db_block_file (
blockid varchar(36) NOT NULL, blockid varchar(36) NOT NULL,
name varchar(200) NOT NULL, name varchar(200) NOT NULL,
maxsize bigint NOT NULL,
circular boolean NOT NULL,
size bigint NOT NULL, size bigint NOT NULL,
createdts bigint NOT NULL, createdts bigint NOT NULL,
modts bigint NOT NULL, modts bigint NOT NULL,
opts json NOT NULL,
meta json NOT NULL, meta json NOT NULL,
PRIMARY KEY (blockid, name) PRIMARY KEY (blockid, name)
); );

View File

@ -14,11 +14,13 @@ import (
"time" "time"
) )
const PartDataSize = 64 * 1024 const DefaultPartDataSize = 64 * 1024
const DefaultFlushTime = 5 * time.Second const DefaultFlushTime = 5 * time.Second
const NoPartIdx = -1 const NoPartIdx = -1
var GlobalBlockStore *BlockStore = &BlockStore{ var partDataSize int64 = DefaultPartDataSize // overridden in tests
var GBS *BlockStore = &BlockStore{
Lock: &sync.Mutex{}, Lock: &sync.Mutex{},
Cache: make(map[cacheKey]*CacheEntry), Cache: make(map[cacheKey]*CacheEntry),
FlushTime: DefaultFlushTime, FlushTime: DefaultFlushTime,
@ -81,6 +83,11 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
if opts.Circular && opts.IJson { if opts.Circular && opts.IJson {
return fmt.Errorf("circular file cannot be ijson") return fmt.Errorf("circular file cannot be ijson")
} }
if opts.Circular {
if opts.MaxSize%partDataSize != 0 {
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
}
}
now := time.Now().UnixMilli() now := time.Now().UnixMilli()
file := &BlockFile{ file := &BlockFile{
BlockId: blockId, BlockId: blockId,
@ -124,7 +131,7 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl
if ok { if ok {
return file, nil return file, nil
} }
return dbGetFile(ctx, blockId, name) return dbGetBlockFile(ctx, blockId, name)
} }
func stripNils[T any](arr []*T) []*T { func stripNils[T any](arr []*T) []*T {
@ -163,7 +170,7 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error { func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error {
file, ok := s.getFileFromCache(blockId, name) file, ok := s.getFileFromCache(blockId, name)
if !ok { if !ok {
dbFile, err := dbGetFile(ctx, blockId, name) dbFile, err := dbGetBlockFile(ctx, blockId, name)
if err != nil { if err != nil {
return fmt.Errorf("error getting file: %v", err) return fmt.Errorf("error getting file: %v", err)
} }
@ -195,7 +202,7 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri
} }
return file, nil return file, nil
} }
dbFile, err := dbGetFile(ctx, blockId, name) dbFile, err := dbGetBlockFile(ctx, blockId, name)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting file: %v", err) return nil, fmt.Errorf("error getting file: %v", err)
} }
@ -222,16 +229,16 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri
} }
func (f *BlockFile) getLastIncompletePartNum() int { func (f *BlockFile) getLastIncompletePartNum() int {
if f.Size%PartDataSize == 0 { if f.Size%partDataSize == 0 {
return NoPartIdx return NoPartIdx
} }
return f.partIdxAtOffset(f.Size) return f.partIdxAtOffset(f.Size)
} }
func (f *BlockFile) partIdxAtOffset(offset int64) int { func (f *BlockFile) partIdxAtOffset(offset int64) int {
partIdx := int(offset / PartDataSize) partIdx := int(offset / partDataSize)
if f.Opts.Circular { if f.Opts.Circular {
maxPart := int(f.Opts.MaxSize / PartDataSize) maxPart := int(f.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart partIdx = partIdx % maxPart
} }
return partIdx return partIdx
@ -363,9 +370,9 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
} }
} }
var partsNeeded []int var partsNeeded []int
lastPartOffset := (offset + size) % PartDataSize lastPartOffset := (offset + size) % partDataSize
endOffsetOfLastPart := offset + size - lastPartOffset + PartDataSize endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize
for i := offset; i < endOffsetOfLastPart; i += PartDataSize { for i := offset; i < endOffsetOfLastPart; i += partDataSize {
partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) partsNeeded = append(partsNeeded, file.partIdxAtOffset(i))
} }
dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded)
@ -398,12 +405,12 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
partDataEntry := dataEntries[partIdx] partDataEntry := dataEntries[partIdx]
var partData []byte var partData []byte
if partDataEntry == nil { if partDataEntry == nil {
partData = make([]byte, PartDataSize) partData = make([]byte, partDataSize)
} else { } else {
partData = partDataEntry.Data[0:PartDataSize] partData = partDataEntry.Data[0:partDataSize]
} }
partOffset := curReadOffset % PartDataSize partOffset := curReadOffset % partDataSize
amtToRead := minInt64(PartDataSize-partOffset, amtLeftToRead) amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...)
amtLeftToRead -= amtToRead amtLeftToRead -= amtToRead
curReadOffset += amtToRead curReadOffset += amtToRead

View File

@ -50,7 +50,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
if create && e.DataEntries[partIdx] == nil { if create && e.DataEntries[partIdx] == nil {
e.DataEntries[partIdx] = &DataCacheEntry{ e.DataEntries[partIdx] = &DataCacheEntry{
PartIdx: partIdx, PartIdx: partIdx,
Data: make([]byte, 0, PartDataSize), Data: make([]byte, 0, partDataSize),
Dirty: &atomic.Bool{}, Dirty: &atomic.Bool{},
} }
} }
@ -58,7 +58,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
} }
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 {
leftInPart := PartDataSize - offset leftInPart := partDataSize - offset
toWrite := int64(len(data)) toWrite := int64(len(data))
if toWrite > leftInPart { if toWrite > leftInPart {
toWrite = leftInPart toWrite = leftInPart
@ -73,12 +73,12 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 {
func (entry *CacheEntry) writeAt(offset int64, data []byte) { func (entry *CacheEntry) writeAt(offset int64, data []byte) {
for len(data) > 0 { for len(data) > 0 {
partIdx := int(offset / PartDataSize) partIdx := int(offset / partDataSize)
if entry.FileEntry.File.Opts.Circular { if entry.FileEntry.File.Opts.Circular {
maxPart := int(entry.FileEntry.File.Opts.MaxSize / PartDataSize) maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart partIdx = partIdx % maxPart
} }
partOffset := offset % PartDataSize partOffset := offset % partDataSize
partData := entry.ensurePart(partIdx, true) partData := entry.ensurePart(partIdx, true)
nw := partData.writeToPart(partOffset, data) nw := partData.writeToPart(partOffset, data)
data = data[nw:] data = data[nw:]

View File

@ -12,7 +12,7 @@ func dbInsertFile(ctx context.Context, file *BlockFile) error {
// will fail if file already exists // will fail if file already exists
return WithTx(ctx, func(tx *TxWrap) error { return WithTx(ctx, func(tx *TxWrap) error {
query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, file.Opts, file.Meta) tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta))
return nil return nil
}) })
} }
@ -46,12 +46,11 @@ func dbDeleteBlock(ctx context.Context, blockId string) error {
}) })
} }
func dbGetFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
var file BlockFile
query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?" query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
tx.Get(&file, query, blockId, name) file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name)
return &file, nil return file, nil
}) })
} }
@ -80,9 +79,8 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in
func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) {
var files []*BlockFile
query := "SELECT * FROM db_block_file WHERE blockid = ?" query := "SELECT * FROM db_block_file WHERE blockid = ?"
tx.Select(&files, query, blockId) files := dbutil.SelectMappable[*BlockFile](tx, query, blockId)
return files, nil return files, nil
}) })
} }
@ -99,7 +97,7 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri
} }
if fileEntry.Dirty.Load() { if fileEntry.Dirty.Load() {
query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?` query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?`
tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, fileEntry.File.Opts, fileEntry.File.Meta, fileEntry.File.BlockId, fileEntry.File.Name) tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name)
} }
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
for _, dataEntry := range dataEntries { for _, dataEntry := range dataEntries {

View File

@ -0,0 +1,79 @@
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
"context"
"testing"
"time"
"github.com/google/uuid"
)
func initDb(t *testing.T) {
t.Logf("initializing db for %q", t.Name())
useTestingDb = true
partDataSize = 64
err := MigrateBlockstore(false)
if err != nil {
t.Fatalf("error migrating blockstore: %v", err)
}
}
func cleanupDb(t *testing.T) {
t.Logf("cleaning up db for %q", t.Name())
globalDBLock.Lock()
defer globalDBLock.Unlock()
if globalDB != nil {
globalDB.Close()
globalDB = nil
}
globalDBErr = nil
useTestingDb = false
partDataSize = DefaultPartDataSize
}
func TestCreate(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file == nil {
t.Fatalf("file not found")
}
if file.BlockId != blockId {
t.Fatalf("block id mismatch")
}
if file.Name != "testfile" {
t.Fatalf("name mismatch")
}
if file.Size != 0 {
t.Fatalf("size mismatch")
}
if file.CreatedTs == 0 {
t.Fatalf("created ts zero")
}
if file.ModTs == 0 {
t.Fatalf("mod ts zero")
}
if file.CreatedTs != file.ModTs {
t.Fatalf("create ts != mod ts")
}
if len(file.Meta) != 0 {
t.Fatalf("meta should have no values")
}
if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 {
t.Fatalf("opts not empty")
}
}

View File

@ -14,7 +14,7 @@ import (
"github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/wavetermdev/thenextwave/pkg/wavebase"
"github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/sqlite3" sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs" "github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
@ -35,9 +35,10 @@ var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex
var globalDBLock = &sync.Mutex{} var globalDBLock = &sync.Mutex{}
var globalDB *sqlx.DB var globalDB *sqlx.DB
var globalDBErr error var globalDBErr error
var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db)
func InitBlockstore() error { func InitBlockstore() error {
err := MigrateBlockstore() err := MigrateBlockstore(false)
if err != nil { if err != nil {
return err return err
} }
@ -63,6 +64,16 @@ func GetDB(ctx context.Context) (*sqlx.DB, error) {
globalDBLock.Lock() globalDBLock.Lock()
defer globalDBLock.Unlock() defer globalDBLock.Unlock()
if globalDB == nil && globalDBErr == nil { if globalDB == nil && globalDBErr == nil {
if useTestingDb {
dbName := ":memory:"
globalDB, globalDBErr = sqlx.Open("sqlite3", dbName)
if globalDBErr != nil {
log.Printf("[db] in-memory db err: %v\n", globalDBErr)
} else {
log.Printf("[db] using in-memory db\n")
}
return globalDB, globalDBErr
}
dbName := GetDBName() dbName := GetDBName()
globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName)) globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
if globalDBErr != nil { if globalDBErr != nil {
@ -110,15 +121,24 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("opening iofs: %w", err) return nil, fmt.Errorf("opening iofs: %w", err)
} }
dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) defer cancelFn()
db, err := GetDB(ctx)
if err != nil {
return nil, err
}
mdriver, err := sqlite3migrate.WithInstance(db.DB, &sqlite3migrate.Config{})
if err != nil {
return nil, fmt.Errorf("making blockstore migration driver: %w", err)
}
m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver)
if err != nil { if err != nil {
return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err) return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err)
} }
return m, nil return m, nil
} }
func MigrateBlockstore() error { func MigrateBlockstore(shouldClose bool) error {
log.Printf("migrate blockstore\n") log.Printf("migrate blockstore\n")
m, err := MakeBlockstoreMigrate() m, err := MakeBlockstoreMigrate()
if err != nil { if err != nil {
@ -131,7 +151,9 @@ func MigrateBlockstore() error {
if err != nil { if err != nil {
return fmt.Errorf("cannot get current migration version: %v", err) return fmt.Errorf("cannot get current migration version: %v", err)
} }
if shouldClose {
defer m.Close() defer m.Close()
}
err = m.Up() err = m.Up()
if err != nil && err != migrate.ErrNoChange { if err != nil && err != migrate.ErrNoChange {
return fmt.Errorf("migrating blockstore: %w", err) return fmt.Errorf("migrating blockstore: %w", err)