working on flush, bug fixes, tests

This commit is contained in:
sawka 2024-05-19 12:22:55 -07:00
parent 1695ec46b1
commit 4e54b8a9e1
4 changed files with 110 additions and 40 deletions

View File

@ -20,7 +20,10 @@ const DefaultPartDataSize = 64 * 1024
const DefaultFlushTime = 5 * time.Second
const NoPartIdx = -1
// for unit tests
var warningCount = &atomic.Int32{}
var flushErrorCount = &atomic.Int32{}
var partDataSize int64 = DefaultPartDataSize // overridden in tests
var stopFlush = &atomic.Bool{}
@ -445,8 +448,6 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
// returns (offset, data, error)
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) {
s.pinCacheEntry(blockId, name)
defer s.unpinCacheEntry(blockId, name)
file, err := s.Stat(ctx, blockId, name)
if err != nil {
return 0, nil, fmt.Errorf("error getting file: %v", err)
@ -459,18 +460,20 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
offset += sizeTooBig
}
}
partMap := file.computePartMap(offset, size)
var partsNeeded []int
lastPartOffset := (offset + size) % partDataSize
endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize
for i := offset; i < endOffsetOfLastPart; i += partDataSize {
partsNeeded = append(partsNeeded, file.partIdxAtOffset(i))
for partIdx := range partMap {
partsNeeded = append(partsNeeded, partIdx)
}
dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded)
if err != nil {
return 0, nil, fmt.Errorf("error loading data parts: %v", err)
}
// wash the entries through the cache
err = s.withLockExists(blockId, name, func(entry *CacheEntry) error {
s.withLock(blockId, name, false, func(entry *CacheEntry) {
if entry == nil || entry.FileEntry == nil {
return
}
if offset+size > entry.FileEntry.File.Size {
// limit read to the actual size of the file
size = entry.FileEntry.File.Size - offset
@ -481,11 +484,7 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of
}
dataEntries[partIdx] = entry.DataEntries[partIdx]
}
return nil
})
if err != nil {
return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err)
}
// combine the entries into a single byte slice
// note that we only want part of the first and last part depending on offset and size
var rtn []byte
@ -532,10 +531,48 @@ func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
return dirtyCacheKeys
}
func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) error {
// todo
// flushFile writes the dirty cache state for (blockId, name) to the database.
// getDirtyDataEntriesForFlush marks the file entry and dirty data entries as
// Flushing; the deferred cleanup always clears Flushing, and additionally
// clears Dirty only when the DB write succeeded (so a failed flush is retried
// on the next FlushCache pass).  Returns nil when there is nothing to flush.
func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) (rtnErr error) {
	fileEntry, dataEntries := s.getDirtyDataEntriesForFlush(blockId, name)
	if fileEntry == nil {
		// no cache entry, entry deleted, or nothing dirty -- nothing to do
		return nil
	}
	defer func() {
		// clear flushing flags (always)
		// clear dirty flags only if there was no error
		// no lock required, note that we unset dirty before flushing
		if rtnErr == nil {
			fileEntry.Dirty.Store(false)
		}
		fileEntry.Flushing.Store(false)
		for _, dataEntry := range dataEntries {
			if rtnErr == nil {
				dataEntry.Dirty.Store(false)
			}
			dataEntry.Flushing.Store(false)
		}
	}()
	if err := dbWriteCacheEntry(ctx, fileEntry, dataEntries); err != nil {
		// wrap with %w (not %v) so callers can unwrap the underlying error
		// (e.g. distinguish transient context errors via errors.Is)
		rtnErr = fmt.Errorf("error writing cache entry: %w", err)
		return rtnErr
	}
	return nil
}
// incrementFlushErrors bumps the per-entry flush failure counter for
// (blockId, name) and returns the new total.  Returns 0 when the cache entry
// no longer exists -- withLock with createIfNotFound=false invokes the
// callback with a nil entry in that case (see ReadAt), so the nil check here
// prevents a panic if the entry was deleted concurrently.
func (s *BlockStore) incrementFlushErrors(blockId string, name string) int {
	var rtn int
	s.withLock(blockId, name, false, func(entry *CacheEntry) {
		if entry == nil {
			return
		}
		entry.FlushErrors++
		rtn = entry.FlushErrors
	})
	return rtn
}
// deleteCacheEntry removes the cache entry for (blockId, name), if present.
// Deleting a missing key is a no-op.
func (s *BlockStore) deleteCacheEntry(blockId string, name string) {
	key := cacheKey{BlockId: blockId, Name: name}
	s.Lock.Lock()
	defer s.Lock.Unlock()
	delete(s.Cache, key)
}
func (s *BlockStore) FlushCache(ctx context.Context) error {
// get a copy of dirty keys so we can iterate without the lock
@ -543,8 +580,19 @@ func (s *BlockStore) FlushCache(ctx context.Context) error {
for _, key := range dirtyCacheKeys {
err := s.flushFile(ctx, key.BlockId, key.Name)
if err != nil {
if ctx.Err() != nil {
// context error is transient
return fmt.Errorf("context error: %v", ctx.Err())
}
// if error is not transient, we should probably delete the offending entry :/
log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err)
flushErrorCount.Add(1)
totalErrors := s.incrementFlushErrors(key.BlockId, key.Name)
if totalErrors >= 3 {
s.deleteCacheEntry(key.BlockId, key.Name)
log.Printf("too many errors flushing file %s/%s, clear entry", key.BlockId, key.Name)
}
continue
}
s.cleanCacheEntry(key.BlockId, key.Name)

View File

@ -5,7 +5,6 @@ package blockstore
import (
"bytes"
"context"
"fmt"
"log"
"sync"
@ -61,6 +60,7 @@ type CacheEntry struct {
WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention
FileEntry *FileCacheEntry
DataEntries map[int]*DataCacheEntry
FlushErrors int
}
//lint:ignore U1000 used for testing
@ -186,6 +186,7 @@ func makeCacheEntry(blockId string, name string) *CacheEntry {
WriteIntentions: make(map[int]WriteIntention),
FileEntry: nil,
DataEntries: make(map[int]*DataCacheEntry),
FlushErrors: 0,
}
}
@ -296,16 +297,21 @@ func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) {
fn(&fileEntry.File)
}
// also sets Flushing to true
func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []*DataCacheEntry) {
// also sets Flushing to true on fileentry / dataentries
func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*FileCacheEntry, []*DataCacheEntry) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return nil, nil
}
if entry.Deleted || entry.FileEntry == nil {
return nil, nil
}
var dirtyData []*DataCacheEntry
for _, dce := range entry.DataEntries {
if dce != nil && dce.Dirty.Load() {
dce.Flushing.Store(true)
dirtyData = append(dirtyData, dce)
}
}
@ -318,20 +324,6 @@ func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []
return entry.FileEntry, dirtyData
}
// clean is true if the block was clean (nothing to write)
// returns (clean, error)
// flushEntry writes the dirty state of a single cache entry to the database.
// NOTE(review): this diff removes flushEntry in favor of flushFile, which
// also manages the Dirty/Flushing flags; annotated here for completeness.
func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error {
	// getDirtyDataEntries also sets Flushing on the returned entries
	fileEntry, dirtyData := s.getDirtyDataEntries(entry)
	if fileEntry == nil && len(dirtyData) == 0 {
		// nothing dirty -- nothing to write
		return nil
	}
	err := dbWriteCacheEntry(ctx, fileEntry, dirtyData)
	if err != nil {
		return err
	}
	return nil
}
func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool {
if entry.FileEntry == nil {
warningCount.Add(1)

View File

@ -61,6 +61,11 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in
rtn := make(map[int]*DataCacheEntry)
for _, d := range data {
d.Dirty = &atomic.Bool{}
if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data)
d.Data = newData
}
rtn[d.PartIdx] = d
}
return rtn, nil
@ -96,16 +101,6 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri
}
tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data)
}
if tx.Err == nil {
// clear dirty flags
fileEntry.Dirty.Store(false)
for _, dataEntry := range dataEntries {
if dataEntry != nil {
dataEntry.Dirty.Store(false)
dataEntry.Flushing.Store(false)
}
}
}
return nil
})
}

View File

@ -39,6 +39,9 @@ func cleanupDb(t *testing.T) {
if warningCount.Load() > 0 {
t.Errorf("warning count: %d", warningCount.Load())
}
if flushErrorCount.Load() > 0 {
t.Errorf("flush error count: %d", flushErrorCount.Load())
}
}
func TestCreate(t *testing.T) {
@ -502,3 +505,35 @@ func TestComputePartMap(t *testing.T) {
m = file.computePartMap(2005, 1105)
testIntMapsEq(t, "map9", m, map[int]int{0: 100, 1: 10, 2: 100, 3: 100, 4: 100, 5: 100, 6: 100, 7: 100, 8: 100, 9: 100})
}
// TestSimpleDBFlush verifies the write -> flush -> read-back path: after
// FlushCache the cache must be empty and the file contents must still be
// readable (served from the DB), including a partial read and the file size.
func TestSimpleDBFlush(t *testing.T) {
	initDb(t)
	defer cleanupDb(t)
	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFn()
	blockId := uuid.New().String()
	const fileName = "t1"
	if err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}); err != nil {
		t.Fatalf("error creating file: %v", err)
	}
	if err := GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")); err != nil {
		t.Fatalf("error writing data: %v", err)
	}
	checkFileData(t, ctx, blockId, fileName, "hello world!")
	if err := GBS.FlushCache(ctx); err != nil {
		t.Fatalf("error flushing cache: %v", err)
	}
	if GBS.getCacheSize() != 0 {
		t.Errorf("cache size mismatch")
	}
	// data must now come from the DB, and reading must not repopulate the cache
	checkFileData(t, ctx, blockId, fileName, "hello world!")
	if GBS.getCacheSize() != 0 {
		t.Errorf("cache size mismatch (after read)")
	}
	checkFileDataAt(t, ctx, blockId, fileName, 6, "world!")
	checkFileSize(t, ctx, blockId, fileName, 12)
}