diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 19dc00f8d..19f947efa 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -10,6 +10,7 @@ package blockstore import ( "context" "fmt" + "log" "sync" "sync/atomic" "time" @@ -19,6 +20,7 @@ const DefaultPartDataSize = 64 * 1024 const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 +var warningCount = &atomic.Int32{} var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} @@ -311,9 +313,7 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str if err != nil { return fmt.Errorf("error getting file part: %v", err) } - maxPart := maxOfIntArr(parts) return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.ensurePart(maxPart, false) for partIdx, partData := range partDataMap { if entry.DataEntries[partIdx] != nil { // someone beat us to it @@ -325,20 +325,9 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str }) } -func (entry *CacheEntry) writeAtToCache(offset int64, data []byte, replace bool) { - endWrite := offset + int64(len(data)) - entry.writeAt(offset, data, replace) - entry.modifyFileData(func(file *BlockFile) { - if endWrite > file.Size || replace { - file.Size = endWrite - } - file.ModTs = time.Now().UnixMilli() - }) -} - func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error { return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAtToCache(entry.FileEntry.File.Size, data, false) + entry.writeAt(entry.FileEntry.File.Size, data, false) return nil }) } @@ -409,7 +398,7 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, return fmt.Errorf("error loading file info: %v", err) } return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAtToCache(0, data, true) + entry.writeAt(0, data, true) return nil }) } @@ -448,7 +437,7 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o return fmt.Errorf("error loading data parts: %v", err) } return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAtToCache(offset, data, false) + entry.writeAt(offset, data, false) return nil }) } @@ -531,22 +520,35 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) return s.ReadAt(ctx, blockId, name, 0, file.Size) } -func (s *BlockStore) FlushCache(ctx context.Context) error { +func (s *BlockStore) getDirtyCacheKeys() []cacheKey { var dirtyCacheKeys []cacheKey s.Lock.Lock() + defer s.Lock.Unlock() for key, entry := range s.Cache { if entry.FileEntry != nil && entry.FileEntry.Dirty.Load() { dirtyCacheKeys = append(dirtyCacheKeys, key) - continue - } - for _, dataEntry := range entry.DataEntries { - if dataEntry != nil && dataEntry.Dirty.Load() { - dirtyCacheKeys = append(dirtyCacheKeys, key) - break - } } } - s.Lock.Unlock() + return dirtyCacheKeys +} + +func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) error { + // todo + return nil +} + +func (s *BlockStore) FlushCache(ctx context.Context) error { + // get a copy of dirty keys so we can iterate without the lock + dirtyCacheKeys := s.getDirtyCacheKeys() + for _, key := range dirtyCacheKeys { + err := s.flushFile(ctx, key.BlockId, key.Name) + if err != nil { + // if error is not transient, we should probably delete the offending entry :/ + log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err) + continue + } + s.cleanCacheEntry(key.BlockId, key.Name) + } return nil } diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 242025878..d6679893c 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -10,6 +10,7 @@ import ( "log" "sync" "sync/atomic" + "time" ) type cacheKey struct { @@ -51,6 +52,7 @@ type WriteIntention struct { // - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced // - when pinned, the cache entry is never removed // this allows us to flush the cache entry to disk without holding the lock +// if there is a dirty data entry, then FileEntry must also be dirty type CacheEntry struct { BlockId string Name string @@ -58,7 +60,7 @@ type CacheEntry struct { Deleted bool WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention FileEntry *FileCacheEntry - DataEntries []*DataCacheEntry + DataEntries map[int]*DataCacheEntry } //lint:ignore U1000 used for testing @@ -68,10 +70,8 @@ func (e *CacheEntry) dump() string { if e.FileEntry != nil { fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File) } - for i, dce := range e.DataEntries { - if dce != nil { - fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", i, dce.Dirty.Load(), string(dce.Data)) - } + for idx, dce := range e.DataEntries { + fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", idx, dce.Dirty.Load(), string(dce.Data)) } buf.WriteString("}\n") return buf.String() @@ -114,11 +114,8 @@ func (s *BlockStore) clearCache() { s.Cache = make(map[cacheKey]*CacheEntry) } -func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry { - for len(e.DataEntries) <= partIdx { - e.DataEntries = append(e.DataEntries, nil) - } - if create && e.DataEntries[partIdx] == nil { +func (e *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry { + if e.DataEntries[partIdx] == nil { e.DataEntries[partIdx] = makeDataCacheEntry(partIdx) } return e.DataEntries[partIdx] @@ -151,8 +148,9 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC } func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) { + endWriteOffset := offset + int64(len(data)) if replace { - entry.DataEntries = nil + entry.DataEntries = make(map[int]*DataCacheEntry) } for len(data) > 0 { partIdx := int(offset / partDataSize) @@ -161,12 +159,17 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) { partIdx = partIdx % maxPart } partOffset := offset % partDataSize - partData := entry.ensurePart(partIdx, true) + partData := entry.getOrCreateDataCacheEntry(partIdx) nw, newDce := partData.writeToPart(partOffset, data) entry.DataEntries[partIdx] = newDce data = data[nw:] offset += nw } + entry.modifyFileData(func(file *BlockFile) { + if endWriteOffset > file.Size || replace { + file.Size = endWriteOffset + } + }) } type BlockStore struct { @@ -182,7 +185,7 @@ func makeCacheEntry(blockId string, name string) *CacheEntry { PinCount: 0, WriteIntentions: make(map[int]WriteIntention), FileEntry: nil, - DataEntries: nil, + DataEntries: make(map[int]*DataCacheEntry), } } @@ -238,6 +241,7 @@ func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionI defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { + warningCount.Add(1) log.Printf("warning: cannot find write intention to clear %q %q", blockId, name) return } @@ -249,30 +253,13 @@ func (s *BlockStore) unpinCacheEntry(blockId string, name string) { defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { + warningCount.Add(1) log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name) return } entry.PinCount-- } -// returns true if the entry was deleted (or there is no cache entry) -func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) bool { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - return true - } - if entry.PinCount > 0 { - return false - } - if len(entry.WriteIntentions) > 0 { - return false - } - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) - return true -} - // getFileFromCache returns the file from the cache if it exists // makes a copy, so it can be used by the caller // return (file, cached) @@ -305,6 +292,7 @@ func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) { } // always set to dirty (we're modifying it) fileEntry.Dirty.Store(true) + fileEntry.File.ModTs = time.Now().UnixMilli() fn(&fileEntry.File) } @@ -335,7 +323,6 @@ func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, [] func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error { fileEntry, dirtyData := s.getDirtyDataEntries(entry) if fileEntry == nil && len(dirtyData) == 0 { - s.tryDeleteCacheEntry(entry.BlockId, entry.Name) return nil } err := dbWriteCacheEntry(ctx, fileEntry, dirtyData) @@ -344,3 +331,51 @@ func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error { } return nil } + +func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool { + if entry.FileEntry == nil { + warningCount.Add(1) + log.Printf("warning: checking pinned, but no FileEntry %q %q", entry.BlockId, entry.Name) + return false + } + lastIncomplete := entry.FileEntry.File.getLastIncompletePartNum() + for _, intention := range entry.WriteIntentions { + if intention.Append && partIdx == lastIncomplete { + return true + } + if intention.Parts[partIdx] > 0 { + return true + } + // note "replace" does not pin anything + } + return false +} + +// removes clean data entries (if they aren't pinned) +// and if the entire cache entry is not in use and is clean, removes the entry +func (s *BlockStore) cleanCacheEntry(blockId string, name string) { + s.Lock.Lock() + defer s.Lock.Unlock() + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return + } + var hasDirtyData bool + for _, dce := range entry.DataEntries { + if dce.Flushing.Load() || dce.Dirty.Load() || entry.isDataBlockPinned(dce.PartIdx) { + hasDirtyData = true + continue + } + delete(entry.DataEntries, dce.PartIdx) + } + if hasDirtyData || (entry.FileEntry != nil && entry.FileEntry.Dirty.Load()) { + return + } + if entry.PinCount > 0 { + return + } + if len(entry.WriteIntentions) > 0 { + return + } + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) +} diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 5e94560cd..8f7887eb2 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "log" + "sync/atomic" "testing" "time" @@ -18,6 +19,7 @@ func initDb(t *testing.T) { t.Logf("initializing db for %q", t.Name()) useTestingDb = true partDataSize = 50 + warningCount = &atomic.Int32{} stopFlush.Store(true) err := InitBlockstore() if err != nil { @@ -34,6 +36,9 @@ func cleanupDb(t *testing.T) { useTestingDb = false partDataSize = DefaultPartDataSize GBS.clearCache() + if warningCount.Load() > 0 { + t.Errorf("warning count: %d", warningCount.Load()) + } } func TestCreate(t *testing.T) {