From 889fcac38d00ee700015d7a7ebde594e961e4e15 Mon Sep 17 00:00:00 2001
From: sawka
Date: Sun, 19 May 2024 23:27:21 -0700
Subject: [PATCH] checkpoint, bug fixes

---
 pkg/blockstore/blockstore.go       | 23 ++++----------
 pkg/blockstore/blockstore_cache.go | 48 ++++++++++++++++++++++++++----
 pkg/blockstore/blockstore_dbops.go |  6 +++-
 pkg/blockstore/blockstore_test.go  | 22 ++++----------
 4 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go
index a369b5ee4..7d506bf68 100644
--- a/pkg/blockstore/blockstore.go
+++ b/pkg/blockstore/blockstore.go
@@ -10,7 +10,7 @@ package blockstore
 import (
 	"context"
 	"fmt"
-	"os"
+	"io/fs"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -100,7 +100,7 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
 	}
 	return withLock(s, blockId, name, func(entry *CacheEntry) error {
 		if entry.File != nil {
-			return os.ErrExist
+			return fs.ErrExist
 		}
 		now := time.Now().UnixMilli()
 		file := &BlockFile{
@@ -138,7 +138,7 @@ func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
 	return nil
 }
 
-// if file doesn't exsit, returns os.ErrNotExist
+// if file doesn't exist, returns fs.ErrNotExist
 func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
 	return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) {
 		file, err := entry.loadFileForRead(ctx)
@@ -194,7 +194,8 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string,
 			return err
 		}
 		entry.writeAt(0, data, true)
-		return nil
+		// since WriteFile can *truncate* the file, we need to flush the file to the DB immediately
+		return entry.flushToDB(ctx, true)
 	})
 }
@@ -211,18 +212,6 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
 		if offset > file.Size {
 			return fmt.Errorf("offset is past the end of the file")
 		}
-		if file.Opts.Circular {
-			startCirFileOffset := file.Size - file.Opts.MaxSize
-			if offset+int64(len(data)) < startCirFileOffset {
-				// write is before the start of the circular file
-				return nil
-			}
-			if offset < startCirFileOffset {
-				amtBeforeStart := startCirFileOffset - offset
-				offset += amtBeforeStart
-				data = data[amtBeforeStart:]
-			}
-		}
 		partMap := file.computePartMap(offset, int64(len(data)))
 		incompleteParts := incompletePartsFromMap(partMap)
 		err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -286,7 +275,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) error {
 	dirtyCacheKeys := s.getDirtyCacheKeys()
 	for _, key := range dirtyCacheKeys {
 		err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
-			return entry.flushToDB(ctx)
+			return entry.flushToDB(ctx, false)
 		})
 		if ctx.Err() != nil {
 			// transient error (also must stop the loop)
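Why WriteFile now flushes immediately: a replace-write can shrink the file, and a deferred cache flush only REPLACEs the parts currently in the cache, so the old file's higher-numbered parts would survive in db_block_data. A minimal sketch of the part arithmetic under an assumed fixed part size (partsForRange and partSize are hypothetical stand-ins for the package's computePartMap, which this patch doesn't show):

```go
package main

import "fmt"

// partsForRange returns the data-part indexes covered by [offset, offset+length).
// Hypothetical helper; the real blockstore computes this via computePartMap.
func partsForRange(offset, length, partSize int64) []int {
	if length <= 0 {
		return nil
	}
	var parts []int
	for idx := offset / partSize; idx <= (offset+length-1)/partSize; idx++ {
		parts = append(parts, int(idx))
	}
	return parts
}

func main() {
	const partSize = 1024 // assumed part size, for illustration only
	// Before the replace-write the file held 5000 bytes: parts 0-4.
	fmt.Println(partsForRange(0, 5000, partSize)) // [0 1 2 3 4]
	// WriteFile replaces the contents with 100 bytes: part 0 only.
	fmt.Println(partsForRange(0, 100, partSize)) // [0]
	// A deferred flush that only REPLACEs part 0 would leave parts 1-4
	// behind in db_block_data; hence the immediate flushToDB(ctx, true),
	// which also deletes the old rows (see the dbops change below).
}
```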
diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go
index a2af2eef2..e25716136 100644
--- a/pkg/blockstore/blockstore_cache.go
+++ b/pkg/blockstore/blockstore_cache.go
@@ -4,9 +4,10 @@ package blockstore
 import (
+	"bytes"
 	"context"
 	"fmt"
-	"os"
+	"io/fs"
 	"sync"
 	"time"
 )
@@ -39,6 +40,17 @@ type CacheEntry struct {
 	FlushErrors int
 }
 
+//lint:ignore U1000 used for testing
+func (e *CacheEntry) dump() string {
+	var buf bytes.Buffer
+	fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
+	fmt.Fprintf(&buf, "  FileEntry: %v\n", e.File)
+	for idx, dce := range e.DataEntries {
+		fmt.Fprintf(&buf, "  DataEntry[%d]: %q\n", idx, string(dce.Data))
+	}
+	return buf.String()
+}
+
 func makeDataCacheEntry(partIdx int) *DataCacheEntry {
 	return &DataCacheEntry{
 		PartIdx: partIdx,
@@ -108,7 +120,7 @@ func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error
 		return nil, fmt.Errorf("error getting file: %w", err)
 	}
 	if file == nil {
-		return nil, os.ErrNotExist
+		return nil, fs.ErrNotExist
 	}
 	return file, nil
 }
@@ -145,6 +157,28 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC
 }
 
 func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
+	if replace {
+		entry.File.Size = 0
+	}
+	if entry.File.Opts.Circular {
+		startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
+		if offset+int64(len(data)) < startCirFileOffset {
+			// write is before the start of the circular file
+			return
+		}
+		if offset < startCirFileOffset {
+			// truncate data (from the front), update offset
+			truncateAmt := startCirFileOffset - offset
+			data = data[truncateAmt:]
+			offset += truncateAmt
+		}
+		if int64(len(data)) > entry.File.Opts.MaxSize {
+			// truncate data (from the front), update offset
+			truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
+			data = data[truncateAmt:]
+			offset += truncateAmt
+		}
+	}
 	endWriteOffset := offset + int64(len(data))
 	if replace {
 		entry.DataEntries = make(map[int]*DataCacheEntry)
@@ -189,10 +223,12 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
 			realDataOffset = file.Size - file.Opts.MaxSize
 		}
 		if offset < realDataOffset {
-			offset = realDataOffset
+			truncateAmt := realDataOffset - offset
+			offset += truncateAmt
+			size -= truncateAmt
 		}
 	}
-	partMap := entry.File.computePartMap(offset, size)
+	partMap := file.computePartMap(offset, size)
 	dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
 	if err != nil {
 		return 0, nil, err
@@ -287,11 +323,11 @@ func makeCacheEntry(blockId string, name string) *CacheEntry {
 	}
 }
 
-func (entry *CacheEntry) flushToDB(ctx context.Context) error {
+func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
 	if entry.File == nil {
 		return nil
 	}
-	err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries)
+	err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
 	if ctx.Err() != nil {
 		// transient error
 		return ctx.Err()
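The clamping that writeAt now applies to circular files happens in two steps: drop or front-trim writes that land before the live window, then keep only the tail of any single write larger than MaxSize. A standalone sketch of that arithmetic (clampCircularWrite is a hypothetical mirror of the hunk above, not part of the package):

```go
package main

import "fmt"

// clampCircularWrite front-truncates a write against a circular file whose
// live window is the last maxSize bytes of [0, fileSize). It returns the
// adjusted offset and data; a nil slice means the write is entirely stale.
func clampCircularWrite(offset int64, data []byte, fileSize, maxSize int64) (int64, []byte) {
	startCirFileOffset := fileSize - maxSize
	if offset+int64(len(data)) < startCirFileOffset {
		// write ends before the start of the circular window
		return 0, nil
	}
	if offset < startCirFileOffset {
		// trim the part of the write that precedes the window
		truncateAmt := startCirFileOffset - offset
		data = data[truncateAmt:]
		offset += truncateAmt
	}
	if int64(len(data)) > maxSize {
		// a single oversized write: only its tail can ever be read back
		truncateAmt := int64(len(data)) - maxSize
		data = data[truncateAmt:]
		offset += truncateAmt
	}
	return offset, data
}

func main() {
	// A circular file with maxSize 100 that has grown to 250 bytes of
	// history: only offsets [150, 250) are still live.
	off, data := clampCircularWrite(145, []byte("0123456789"), 250, 100)
	fmt.Println(off, string(data)) // 150 "56789" (first 5 bytes precede the window)
}
```

Moving this logic from WriteAt into CacheEntry.writeAt also means WriteFile's replace path gets the same clamping, instead of only the WriteAt path.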
diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go
index 7829de448..279b2b38c 100644
--- a/pkg/blockstore/blockstore_dbops.go
+++ b/pkg/blockstore/blockstore_dbops.go
@@ -82,7 +82,7 @@ func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error)
 	})
 }
 
-func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry) error {
+func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
 	return WithTx(ctx, func(tx *TxWrap) error {
 		query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
 		if !tx.Exists(query, file.BlockId, file.Name) {
@@ -92,6 +92,10 @@ func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int
 		// we don't update CreatedTs or Opts
 		query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?`
 		tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name)
+		if replace {
+			query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`
+			tx.Exec(query, file.BlockId, file.Name)
+		}
 		dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
 		for partIdx, dataEntry := range dataEntries {
 			if partIdx != dataEntry.PartIdx {
diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go
index f381c372b..8fbdfb22d 100644
--- a/pkg/blockstore/blockstore_test.go
+++ b/pkg/blockstore/blockstore_test.go
@@ -6,7 +6,9 @@ package blockstore
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
+	"io/fs"
 	"log"
 	"sync"
 	"sync/atomic"
@@ -57,17 +59,6 @@ func (s *BlockStore) clearCache() {
 	s.Cache = make(map[cacheKey]*CacheEntry)
 }
 
-//lint:ignore U1000 used for testing
-func (e *CacheEntry) dump() string {
-	var buf bytes.Buffer
-	fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
-	fmt.Fprintf(&buf, "  FileEntry: %v\n", e.File)
-	for idx, dce := range e.DataEntries {
-		fmt.Fprintf(&buf, "  DataEntry[%d]: %q\n", idx, string(dce.Data))
-	}
-	return buf.String()
-}
-
 //lint:ignore U1000 used for testing
 func (s *BlockStore) dump() string {
 	s.Lock.Lock()
@@ -171,12 +162,9 @@ func TestDelete(t *testing.T) {
 	if err != nil {
 		t.Fatalf("error deleting file: %v", err)
 	}
-	file, err := GBS.Stat(ctx, blockId, "testfile")
-	if err != nil {
-		t.Fatalf("error stating file: %v", err)
-	}
-	if file != nil {
-		t.Fatalf("file should not be found")
+	_, err = GBS.Stat(ctx, blockId, "testfile")
+	if err == nil || !errors.Is(err, fs.ErrNotExist) {
+		t.Errorf("expected file not found error")
 	}
 	// create two files in same block, use DeleteBlock to delete
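The rewritten TestDelete assertion checks the sentinel with errors.Is rather than ==. That keeps the test correct even when the sentinel arrives wrapped; loadFileForRead returns fs.ErrNotExist directly today, but neighboring paths in this patch wrap errors with %w. A minimal sketch of the difference:

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

func main() {
	// A wrapped sentinel, as produced by fmt.Errorf with the %w verb.
	err := fmt.Errorf("error getting file: %w", fs.ErrNotExist)
	fmt.Println(err == fs.ErrNotExist)          // false: wrapping breaks ==
	fmt.Println(errors.Is(err, fs.ErrNotExist)) // true: errors.Is walks the chain
}
```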