checkpoint, bug fixes

sawka 2024-05-19 23:27:21 -07:00
parent 9129900a3e
commit 889fcac38d
4 changed files with 58 additions and 41 deletions

View File

@@ -10,7 +10,7 @@ package blockstore
import (
"context"
"fmt"
"os"
"io/fs"
"sync"
"sync/atomic"
"time"
@@ -100,7 +100,7 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
}
return withLock(s, blockId, name, func(entry *CacheEntry) error {
if entry.File != nil {
return os.ErrExist
return fs.ErrExist
}
now := time.Now().UnixMilli()
file := &BlockFile{
@@ -138,7 +138,7 @@ func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
return nil
}
// if file doesn't exist, returns os.ErrNotExist
// if file doesn't exist, returns fs.ErrNotExist
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) {
file, err := entry.loadFileForRead(ctx)
@@ -194,7 +194,8 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string,
return err
}
entry.writeAt(0, data, true)
return nil
// since WriteFile can *truncate* the file, we need to flush the file to the DB immediately
return entry.flushToDB(ctx, true)
})
}
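The new comment captures the reason for the change above: WriteFile is a truncating write, so the dirty entry cannot wait for the next periodic FlushCache pass, because the DB may still hold data parts that lie past the new end of the file. A small standalone sketch of that failure mode (plain maps stand in for the db_block_data rows; this is an illustration, not the store's real persistence code):

package main

import "fmt"

func main() {
	// an existing file persisted as three data parts (part sizes here are assumptions)
	dbParts := map[int][]byte{0: []byte("aaaa"), 1: []byte("bbbb"), 2: []byte("cc")}
	// a truncating WriteFile whose new contents fit entirely in part 0
	newParts := map[int][]byte{0: []byte("zz")}

	// upsert-only flush (old behavior): the old tail parts are never removed
	for idx, data := range newParts {
		dbParts[idx] = data
	}
	fmt.Println(len(dbParts)) // 3 -- parts 1 and 2 are stale but still stored

	// replace-style flush (flushToDB with replace=true): clear first, then upsert
	dbParts = map[int][]byte{}
	for idx, data := range newParts {
		dbParts[idx] = data
	}
	fmt.Println(len(dbParts)) // 1 -- only the truncated file's data remains
}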
@@ -211,18 +212,6 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
if offset > file.Size {
return fmt.Errorf("offset is past the end of the file")
}
if file.Opts.Circular {
startCirFileOffset := file.Size - file.Opts.MaxSize
if offset+int64(len(data)) < startCirFileOffset {
// write is before the start of the circular file
return nil
}
if offset < startCirFileOffset {
amtBeforeStart := startCirFileOffset - offset
offset += amtBeforeStart
data = data[amtBeforeStart:]
}
}
partMap := file.computePartMap(offset, int64(len(data)))
incompleteParts := incompletePartsFromMap(partMap)
err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
@@ -286,7 +275,7 @@ func (s *BlockStore) FlushCache(ctx context.Context) error {
dirtyCacheKeys := s.getDirtyCacheKeys()
for _, key := range dirtyCacheKeys {
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
return entry.flushToDB(ctx)
return entry.flushToDB(ctx, false)
})
if ctx.Err() != nil {
// transient error (also must stop the loop)
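Throughout the commit, the os error sentinels are swapped for their io/fs equivalents. The swap is behavior-preserving: since Go 1.16, os.ErrExist and os.ErrNotExist are defined as re-exports of the fs sentinels, so existing errors.Is checks keep matching. A quick verification:

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

func main() {
	fmt.Println(os.ErrNotExist == fs.ErrNotExist)          // true: os re-exports the io/fs sentinel
	fmt.Println(errors.Is(fs.ErrNotExist, os.ErrNotExist)) // true for the same reason
}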

View File

@@ -4,9 +4,10 @@
package blockstore
import (
"bytes"
"context"
"fmt"
"os"
"io/fs"
"sync"
"time"
)
@@ -39,6 +40,17 @@ type CacheEntry struct {
FlushErrors int
}
//lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
}
return buf.String()
}
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
return &DataCacheEntry{
PartIdx: partIdx,
@@ -108,7 +120,7 @@ func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error
return nil, fmt.Errorf("error getting file: %w", err)
}
if file == nil {
return nil, os.ErrNotExist
return nil, fs.ErrNotExist
}
return file, nil
}
@@ -145,6 +157,28 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC
}
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
if replace {
entry.File.Size = 0
}
if entry.File.Opts.Circular {
startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
if offset+int64(len(data)) < startCirFileOffset {
// write is before the start of the circular file
return
}
if offset < startCirFileOffset {
// truncate data (from the front), update offset
truncateAmt := startCirFileOffset - offset
data = data[truncateAmt:]
offset += truncateAmt
}
if int64(len(data)) > entry.File.Opts.MaxSize {
// truncate data (from the front), update offset
truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
data = data[truncateAmt:]
offset += truncateAmt
}
}
endWriteOffset := offset + int64(len(data))
if replace {
entry.DataEntries = make(map[int]*DataCacheEntry)
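The circular-file handling removed from WriteAt above now lives here in the cache entry's writeAt, so every write path (including the truncating WriteFile) gets the same clamp. The clamp performs up to two front-truncations: a write that ends before the live window (the last MaxSize bytes of the file) is dropped entirely, a write that starts before the window loses its leading bytes, and a write longer than MaxSize keeps only its last MaxSize bytes. A minimal standalone sketch of the arithmetic, with assumed numbers (MaxSize 100, file size 250, so the window starts at offset 150):

package main

import "fmt"

// clampCircular mirrors the front-truncation logic writeAt applies to circular files.
func clampCircular(offset int64, data []byte, fileSize, maxSize int64) (int64, []byte) {
	windowStart := fileSize - maxSize // first offset still held by the circular file
	if offset+int64(len(data)) < windowStart {
		return offset, nil // write ends before the window: nothing to do
	}
	if offset < windowStart {
		trunc := windowStart - offset // drop the bytes that fall before the window
		data = data[trunc:]
		offset += trunc
	}
	if int64(len(data)) > maxSize {
		trunc := int64(len(data)) - maxSize // keep only the last maxSize bytes
		data = data[trunc:]
		offset += trunc
	}
	return offset, data
}

func main() {
	off, data := clampCircular(120, make([]byte, 50), 250, 100)
	fmt.Println(off, len(data)) // 150 20: only the bytes inside the window get written
}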
@@ -189,10 +223,12 @@ func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, r
realDataOffset = file.Size - file.Opts.MaxSize
}
if offset < realDataOffset {
offset = realDataOffset
truncateAmt := realDataOffset - offset
offset += truncateAmt
size -= truncateAmt
}
}
partMap := entry.File.computePartMap(offset, size)
partMap := file.computePartMap(offset, size)
dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
if err != nil {
return 0, nil, err
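The read side gets the matching clamp: when a requested range starts before the live window, the offset is advanced and the size is shrunk by the same amount, so the end of the range stays where the caller put it. With the assumed numbers above (window starting at offset 150), a 60-byte read at offset 120 becomes a 30-byte read at offset 150, still ending at 180; previously only the offset moved, so the read would have covered 60 bytes ending at 210, past the range the caller asked for.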
@@ -287,11 +323,11 @@ func makeCacheEntry(blockId string, name string) *CacheEntry {
}
}
func (entry *CacheEntry) flushToDB(ctx context.Context) error {
func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
if entry.File == nil {
return nil
}
err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries)
err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
if ctx.Err() != nil {
// transient error
return ctx.Err()

View File

@@ -82,7 +82,7 @@ func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error)
})
}
func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry) error {
func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
return WithTx(ctx, func(tx *TxWrap) error {
query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
if !tx.Exists(query, file.BlockId, file.Name) {
@@ -92,6 +92,10 @@ func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int
// we don't update CreatedTs or Opts
query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?`
tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name)
if replace {
query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`
tx.Exec(query, file.BlockId, file.Name)
}
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
for partIdx, dataEntry := range dataEntries {
if partIdx != dataEntry.PartIdx {
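With replace set, the entry's existing data rows are deleted in the same transaction before the new parts are written back, so a file that shrank cannot leave orphaned high-partidx rows behind. A hedged sketch of that delete-then-upsert pattern written directly against database/sql (the project's TxWrap helper is not shown here, so this signature is an assumption):

package dbsketch

import (
	"context"
	"database/sql"
)

// writePartsTx illustrates the replace branch of dbWriteCacheEntry using database/sql.
func writePartsTx(ctx context.Context, tx *sql.Tx, blockId string, name string, parts map[int][]byte, replace bool) error {
	if replace {
		// drop every existing part so a truncated file leaves no stale rows
		if _, err := tx.ExecContext(ctx, `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`, blockId, name); err != nil {
			return err
		}
	}
	for partIdx, data := range parts {
		// same per-part upsert statement the real code uses
		if _, err := tx.ExecContext(ctx, `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`, blockId, name, partIdx, data); err != nil {
			return err
		}
	}
	return nil
}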

View File

@@ -6,7 +6,9 @@ package blockstore
import (
"bytes"
"context"
"errors"
"fmt"
"io/fs"
"log"
"sync"
"sync/atomic"
@@ -57,17 +59,6 @@ func (s *BlockStore) clearCache() {
s.Cache = make(map[cacheKey]*CacheEntry)
}
//lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
}
return buf.String()
}
//lint:ignore U1000 used for testing
func (s *BlockStore) dump() string {
s.Lock.Lock()
@@ -171,12 +162,9 @@ func TestDelete(t *testing.T) {
if err != nil {
t.Fatalf("error deleting file: %v", err)
}
file, err := GBS.Stat(ctx, blockId, "testfile")
if err != nil {
t.Fatalf("error stating file: %v", err)
}
if file != nil {
t.Fatalf("file should not be found")
_, err = GBS.Stat(ctx, blockId, "testfile")
if err == nil || !errors.Is(err, fs.ErrNotExist) {
t.Errorf("expected file not found error")
}
// create two files in same block, use DeleteBlock to delete