diff --git a/db/migrations-blockstore/000001_init.up.sql b/db/migrations-blockstore/000001_init.up.sql index dd4b70a0e..fce5a6986 100644 --- a/db/migrations-blockstore/000001_init.up.sql +++ b/db/migrations-blockstore/000001_init.up.sql @@ -1,11 +1,10 @@ CREATE TABLE db_block_file ( blockid varchar(36) NOT NULL, name varchar(200) NOT NULL, - maxsize bigint NOT NULL, - circular boolean NOT NULL, size bigint NOT NULL, createdts bigint NOT NULL, modts bigint NOT NULL, + opts json NOT NULL, meta json NOT NULL, PRIMARY KEY (blockid, name) ); diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index e25b084ee..95514f4a8 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -14,11 +14,13 @@ import ( "time" ) -const PartDataSize = 64 * 1024 +const DefaultPartDataSize = 64 * 1024 const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 -var GlobalBlockStore *BlockStore = &BlockStore{ +var partDataSize int64 = DefaultPartDataSize // overridden in tests + +var GBS *BlockStore = &BlockStore{ Lock: &sync.Mutex{}, Cache: make(map[cacheKey]*CacheEntry), FlushTime: DefaultFlushTime, @@ -81,6 +83,11 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, if opts.Circular && opts.IJson { return fmt.Errorf("circular file cannot be ijson") } + if opts.Circular { + if opts.MaxSize%partDataSize != 0 { + opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize + } + } now := time.Now().UnixMilli() file := &BlockFile{ BlockId: blockId, @@ -124,7 +131,7 @@ func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*Bl if ok { return file, nil } - return dbGetFile(ctx, blockId, name) + return dbGetBlockFile(ctx, blockId, name) } func stripNils[T any](arr []*T) []*T { @@ -163,7 +170,7 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error { file, ok := s.getFileFromCache(blockId, name) if !ok { - dbFile, err := dbGetFile(ctx, blockId, name) + dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return fmt.Errorf("error getting file: %v", err) } @@ -195,7 +202,7 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri } return file, nil } - dbFile, err := dbGetFile(ctx, blockId, name) + dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return nil, fmt.Errorf("error getting file: %v", err) } @@ -222,16 +229,16 @@ func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name stri } func (f *BlockFile) getLastIncompletePartNum() int { - if f.Size%PartDataSize == 0 { + if f.Size%partDataSize == 0 { return NoPartIdx } return f.partIdxAtOffset(f.Size) } func (f *BlockFile) partIdxAtOffset(offset int64) int { - partIdx := int(offset / PartDataSize) + partIdx := int(offset / partDataSize) if f.Opts.Circular { - maxPart := int(f.Opts.MaxSize / PartDataSize) + maxPart := int(f.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } return partIdx @@ -363,9 +370,9 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of } } var partsNeeded []int - lastPartOffset := (offset + size) % PartDataSize - endOffsetOfLastPart := offset + size - lastPartOffset + PartDataSize - for i := offset; i < endOffsetOfLastPart; i += PartDataSize { + lastPartOffset := (offset + size) % partDataSize + endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize + for i := offset; i < endOffsetOfLastPart; i += partDataSize { partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) } dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) @@ -398,12 +405,12 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of partDataEntry := dataEntries[partIdx] var partData []byte if partDataEntry == nil { - partData = make([]byte, PartDataSize) + partData = make([]byte, partDataSize) } else { - partData = partDataEntry.Data[0:PartDataSize] + partData = partDataEntry.Data[0:partDataSize] } - partOffset := curReadOffset % PartDataSize - amtToRead := minInt64(PartDataSize-partOffset, amtLeftToRead) + partOffset := curReadOffset % partDataSize + amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead) rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) amtLeftToRead -= amtToRead curReadOffset += amtToRead diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index ab88eacef..dc6ba26bc 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -50,7 +50,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { if create && e.DataEntries[partIdx] == nil { e.DataEntries[partIdx] = &DataCacheEntry{ PartIdx: partIdx, - Data: make([]byte, 0, PartDataSize), + Data: make([]byte, 0, partDataSize), Dirty: &atomic.Bool{}, } } @@ -58,7 +58,7 @@ func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { } func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { - leftInPart := PartDataSize - offset + leftInPart := partDataSize - offset toWrite := int64(len(data)) if toWrite > leftInPart { toWrite = leftInPart @@ -73,12 +73,12 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 { func (entry *CacheEntry) writeAt(offset int64, data []byte) { for len(data) > 0 { - partIdx := int(offset / PartDataSize) + partIdx := int(offset / partDataSize) if entry.FileEntry.File.Opts.Circular { - maxPart := int(entry.FileEntry.File.Opts.MaxSize / PartDataSize) + maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } - partOffset := offset % PartDataSize + partOffset := offset % partDataSize partData := entry.ensurePart(partIdx, true) nw := partData.writeToPart(partOffset, data) data = data[nw:] diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index 583737ef7..03d338e7d 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -12,7 +12,7 @@ func dbInsertFile(ctx context.Context, file *BlockFile) error { // will fail if file already exists return WithTx(ctx, func(tx *TxWrap) error { query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)" - tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, file.Opts, file.Meta) + tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, dbutil.QuickJson(file.Opts), dbutil.QuickJson(file.Meta)) return nil }) } @@ -46,12 +46,11 @@ func dbDeleteBlock(ctx context.Context, blockId string) error { }) } -func dbGetFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { +func dbGetBlockFile(ctx context.Context, blockId string, name string) (*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) { - var file BlockFile query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?" - tx.Get(&file, query, blockId, name) - return &file, nil + file := dbutil.GetMappable[*BlockFile](tx, query, blockId, name) + return file, nil }) } @@ -80,9 +79,8 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) { - var files []*BlockFile query := "SELECT * FROM db_block_file WHERE blockid = ?" - tx.Select(&files, query, blockId) + files := dbutil.SelectMappable[*BlockFile](tx, query, blockId) return files, nil }) } @@ -99,7 +97,7 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri } if fileEntry.Dirty.Load() { query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?` - tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, fileEntry.File.Opts, fileEntry.File.Meta, fileEntry.File.BlockId, fileEntry.File.Name) + tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name) } dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` for _, dataEntry := range dataEntries { diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go new file mode 100644 index 000000000..2fad077f1 --- /dev/null +++ b/pkg/blockstore/blockstore_test.go @@ -0,0 +1,79 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package blockstore + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" +) + +func initDb(t *testing.T) { + t.Logf("initializing db for %q", t.Name()) + useTestingDb = true + partDataSize = 64 + err := MigrateBlockstore(false) + if err != nil { + t.Fatalf("error migrating blockstore: %v", err) + } +} + +func cleanupDb(t *testing.T) { + t.Logf("cleaning up db for %q", t.Name()) + globalDBLock.Lock() + defer globalDBLock.Unlock() + if globalDB != nil { + globalDB.Close() + globalDB = nil + } + globalDBErr = nil + useTestingDb = false + partDataSize = DefaultPartDataSize +} + +func TestCreate(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + file, err := GBS.Stat(ctx, blockId, "testfile") + if err != nil { + t.Fatalf("error stating file: %v", err) + } + if file == nil { + t.Fatalf("file not found") + } + if file.BlockId != blockId { + t.Fatalf("block id mismatch") + } + if file.Name != "testfile" { + t.Fatalf("name mismatch") + } + if file.Size != 0 { + t.Fatalf("size mismatch") + } + if file.CreatedTs == 0 { + t.Fatalf("created ts zero") + } + if file.ModTs == 0 { + t.Fatalf("mod ts zero") + } + if file.CreatedTs != file.ModTs { + t.Fatalf("create ts != mod ts") + } + if len(file.Meta) != 0 { + t.Fatalf("meta should have no values") + } + if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { + t.Fatalf("opts not empty") + } +} diff --git a/pkg/blockstore/dbsetup.go b/pkg/blockstore/dbsetup.go index 500158d7e..49f7f9019 100644 --- a/pkg/blockstore/dbsetup.go +++ b/pkg/blockstore/dbsetup.go @@ -14,7 +14,7 @@ import ( "github.com/wavetermdev/thenextwave/pkg/wavebase" "github.com/golang-migrate/migrate/v4" - _ "github.com/golang-migrate/migrate/v4/database/sqlite3" + sqlite3migrate "github.com/golang-migrate/migrate/v4/database/sqlite3" "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/jmoiron/sqlx" _ "github.com/mattn/go-sqlite3" @@ -35,9 +35,10 @@ var dbWrap *SingleConnDBGetter = &SingleConnDBGetter{SingleConnLock: &sync.Mutex var globalDBLock = &sync.Mutex{} var globalDB *sqlx.DB var globalDBErr error +var useTestingDb bool // just for testing (forces GetDB() to return an in-memory db) func InitBlockstore() error { - err := MigrateBlockstore() + err := MigrateBlockstore(false) if err != nil { return err } @@ -63,6 +64,16 @@ func GetDB(ctx context.Context) (*sqlx.DB, error) { globalDBLock.Lock() defer globalDBLock.Unlock() if globalDB == nil && globalDBErr == nil { + if useTestingDb { + dbName := ":memory:" + globalDB, globalDBErr = sqlx.Open("sqlite3", dbName) + if globalDBErr != nil { + log.Printf("[db] in-memory db err: %v\n", globalDBErr) + } else { + log.Printf("[db] using in-memory db\n") + } + return globalDB, globalDBErr + } dbName := GetDBName() globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName)) if globalDBErr != nil { @@ -110,15 +121,24 @@ func MakeBlockstoreMigrate() (*migrate.Migrate, error) { if err != nil { return nil, fmt.Errorf("opening iofs: %w", err) } - dbUrl := fmt.Sprintf("sqlite3://%s", GetDBName()) - m, err := migrate.NewWithSourceInstance("iofs", fsVar, dbUrl) + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + db, err := GetDB(ctx) + if err != nil { + return nil, err + } + mdriver, err := sqlite3migrate.WithInstance(db.DB, &sqlite3migrate.Config{}) + if err != nil { + return nil, fmt.Errorf("making blockstore migration driver: %w", err) + } + m, err := migrate.NewWithInstance("iofs", fsVar, "sqlite3", mdriver) if err != nil { return nil, fmt.Errorf("making blockstore migration db[%s]: %w", GetDBName(), err) } return m, nil } -func MigrateBlockstore() error { +func MigrateBlockstore(shouldClose bool) error { log.Printf("migrate blockstore\n") m, err := MakeBlockstoreMigrate() if err != nil { @@ -131,7 +151,9 @@ func MigrateBlockstore() error { if err != nil { return fmt.Errorf("cannot get current migration version: %v", err) } - defer m.Close() + if shouldClose { + defer m.Close() + } err = m.Up() if err != nil && err != migrate.ErrNoChange { return fmt.Errorf("migrating blockstore: %w", err)