diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 19f947efa..3f23ae9ff 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -20,7 +20,10 @@ const DefaultPartDataSize = 64 * 1024 const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 +// for unit tests var warningCount = &atomic.Int32{} +var flushErrorCount = &atomic.Int32{} + var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} @@ -445,8 +448,6 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o // returns (offset, data, error) // we return the offset because the offset may have been adjusted if the size was too big (for circular files) func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) { - s.pinCacheEntry(blockId, name) - defer s.unpinCacheEntry(blockId, name) file, err := s.Stat(ctx, blockId, name) if err != nil { return 0, nil, fmt.Errorf("error getting file: %v", err) @@ -459,18 +460,20 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of offset += sizeTooBig } } + partMap := file.computePartMap(offset, size) var partsNeeded []int - lastPartOffset := (offset + size) % partDataSize - endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize - for i := offset; i < endOffsetOfLastPart; i += partDataSize { - partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) + for partIdx := range partMap { + partsNeeded = append(partsNeeded, partIdx) } dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) if err != nil { return 0, nil, fmt.Errorf("error loading data parts: %v", err) } // wash the entries through the cache - err = s.withLockExists(blockId, name, func(entry *CacheEntry) error { + s.withLock(blockId, name, false, func(entry *CacheEntry) { + if entry == nil || entry.FileEntry == nil { + return + } if offset+size > entry.FileEntry.File.Size { // limit read to the actual size of the file size = entry.FileEntry.File.Size - offset @@ -481,11 +484,7 @@ func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, of } dataEntries[partIdx] = entry.DataEntries[partIdx] } - return nil }) - if err != nil { - return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err) - } // combine the entries into a single byte slice // note that we only want part of the first and last part depending on offset and size var rtn []byte @@ -532,19 +531,68 @@ func (s *BlockStore) getDirtyCacheKeys() []cacheKey { return dirtyCacheKeys } -func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) error { - // todo +func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) (rtnErr error) { + fileEntry, dataEntries := s.getDirtyDataEntriesForFlush(blockId, name) + if fileEntry == nil { + return nil + } + defer func() { + // clear flushing flags (always) + // clear dirty flags if no error + // no lock required, note that we unset dirty before flushing + if rtnErr == nil { + fileEntry.Dirty.Store(false) + } + fileEntry.Flushing.Store(false) + for _, dataEntry := range dataEntries { + if rtnErr == nil { + dataEntry.Dirty.Store(false) + } + dataEntry.Flushing.Store(false) + } + }() + rtnErr = dbWriteCacheEntry(ctx, fileEntry, dataEntries) + if rtnErr != nil { + rtnErr = fmt.Errorf("error writing cache entry: %v", rtnErr) + return rtnErr + } return nil } +func (s *BlockStore) incrementFlushErrors(blockId string, name string) int { + var rtn int + s.withLock(blockId, name, false, func(entry *CacheEntry) { + entry.FlushErrors++ + rtn = entry.FlushErrors + }) + return rtn +} + +func (s *BlockStore) deleteCacheEntry(blockId string, name string) { + s.Lock.Lock() + defer s.Lock.Unlock() + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) +} + func (s *BlockStore) FlushCache(ctx context.Context) error { // get a copy of dirty keys so we can iterate without the lock dirtyCacheKeys := s.getDirtyCacheKeys() for _, key := range dirtyCacheKeys { err := s.flushFile(ctx, key.BlockId, key.Name) if err != nil { + if ctx.Err() != nil { + // context error is transient + return fmt.Errorf("context error: %v", ctx.Err()) + } // if error is not transient, we should probably delete the offending entry :/ log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err) + flushErrorCount.Add(1) + totalErrors := s.incrementFlushErrors(key.BlockId, key.Name) + if totalErrors >= 3 { + s.deleteCacheEntry(key.BlockId, key.Name) + log.Printf("too many errors flushing file %s/%s, clear entry", key.BlockId, key.Name) + + } continue } s.cleanCacheEntry(key.BlockId, key.Name) diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index d6679893c..69045a6c4 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -5,7 +5,6 @@ package blockstore import ( "bytes" - "context" "fmt" "log" "sync" @@ -61,6 +60,7 @@ type CacheEntry struct { WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention FileEntry *FileCacheEntry DataEntries map[int]*DataCacheEntry + FlushErrors int } //lint:ignore U1000 used for testing @@ -186,6 +186,7 @@ func makeCacheEntry(blockId string, name string) *CacheEntry { WriteIntentions: make(map[int]WriteIntention), FileEntry: nil, DataEntries: make(map[int]*DataCacheEntry), + FlushErrors: 0, } } @@ -296,16 +297,21 @@ func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) { fn(&fileEntry.File) } -// also sets Flushing to true -func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []*DataCacheEntry) { +// also sets Flushing to true on fileentry / dataentries +func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*FileCacheEntry, []*DataCacheEntry) { s.Lock.Lock() defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return nil, nil + } if entry.Deleted || entry.FileEntry == nil { return nil, nil } var dirtyData []*DataCacheEntry for _, dce := range entry.DataEntries { if dce != nil && dce.Dirty.Load() { + dce.Flushing.Store(true) dirtyData = append(dirtyData, dce) } } @@ -318,20 +324,6 @@ func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, [] return entry.FileEntry, dirtyData } -// clean is true if the block was clean (nothing to write) -// returns (clean, error) -func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error { - fileEntry, dirtyData := s.getDirtyDataEntries(entry) - if fileEntry == nil && len(dirtyData) == 0 { - return nil - } - err := dbWriteCacheEntry(ctx, fileEntry, dirtyData) - if err != nil { - return err - } - return nil -} - func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool { if entry.FileEntry == nil { warningCount.Add(1) diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index 1b0c31061..802c4b5db 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -61,6 +61,11 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in rtn := make(map[int]*DataCacheEntry) for _, d := range data { d.Dirty = &atomic.Bool{} + if cap(d.Data) != int(partDataSize) { + newData := make([]byte, len(d.Data), partDataSize) + copy(newData, d.Data) + d.Data = newData + } rtn[d.PartIdx] = d } return rtn, nil @@ -96,16 +101,6 @@ func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntri } tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data) } - if tx.Err == nil { - // clear dirty flags - fileEntry.Dirty.Store(false) - for _, dataEntry := range dataEntries { - if dataEntry != nil { - dataEntry.Dirty.Store(false) - dataEntry.Flushing.Store(false) - } - } - } return nil }) } diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 8f7887eb2..43030f6b1 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -39,6 +39,9 @@ func cleanupDb(t *testing.T) { if warningCount.Load() > 0 { t.Errorf("warning count: %d", warningCount.Load()) } + if flushErrorCount.Load() > 0 { + t.Errorf("flush error count: %d", flushErrorCount.Load()) + } } func TestCreate(t *testing.T) { @@ -502,3 +505,35 @@ func TestComputePartMap(t *testing.T) { m = file.computePartMap(2005, 1105) testIntMapsEq(t, "map9", m, map[int]int{0: 100, 1: 10, 2: 100, 3: 100, 4: 100, 5: 100, 6: 100, 7: 100, 8: 100, 9: 100}) } + +func TestSimpleDBFlush(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + fileName := "t1" + err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.WriteFile(ctx, blockId, fileName, []byte("hello world!")) + if err != nil { + t.Fatalf("error writing data: %v", err) + } + checkFileData(t, ctx, blockId, fileName, "hello world!") + err = GBS.FlushCache(ctx) + if err != nil { + t.Fatalf("error flushing cache: %v", err) + } + if GBS.getCacheSize() != 0 { + t.Errorf("cache size mismatch") + } + checkFileData(t, ctx, blockId, fileName, "hello world!") + if GBS.getCacheSize() != 0 { + t.Errorf("cache size mismatch (after read)") + } + checkFileDataAt(t, ctx, blockId, fileName, 6, "world!") + checkFileSize(t, ctx, blockId, fileName, 12) +}