diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 6a28e3776..a369b5ee4 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -10,7 +10,7 @@ package blockstore import ( "context" "fmt" - "log" + "os" "sync" "sync/atomic" "time" @@ -28,10 +28,8 @@ var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} var GBS *BlockStore = &BlockStore{ - Lock: &sync.Mutex{}, - Cache: make(map[cacheKey]*CacheEntry), - NextIntentionId: 1, - IsFlushing: false, + Lock: &sync.Mutex{}, + Cache: make(map[cacheKey]*CacheEntry), } type FileOptsType struct { @@ -100,55 +98,33 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize } } - var cacheErr error - s.withLock(blockId, name, false, func(entry *CacheEntry) { - if entry == nil { - return + return withLock(s, blockId, name, func(entry *CacheEntry) error { + if entry.File != nil { + return os.ErrExist } - if !entry.Deleted { - cacheErr = fmt.Errorf("file exists") - return + now := time.Now().UnixMilli() + file := &BlockFile{ + BlockId: blockId, + Name: name, + Size: 0, + CreatedTs: now, + ModTs: now, + Opts: opts, + Meta: meta, } - // deleted is set. check intentions - if entry.PinCount == 0 && len(entry.WriteIntentions) == 0 { - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) - return - } - cacheErr = fmt.Errorf("file is deleted but has active requests") + return dbInsertFile(ctx, file) }) - if cacheErr != nil { - return cacheErr - } - now := time.Now().UnixMilli() - file := &BlockFile{ - BlockId: blockId, - Name: name, - Size: 0, - CreatedTs: now, - ModTs: now, - Opts: opts, - Meta: meta, - } - return dbInsertFile(ctx, file) } func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error { - err := dbDeleteFile(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error deleting file: %v", err) - } - s.withLock(blockId, name, false, func(entry *CacheEntry) { - if entry == nil { - return - } - if entry.PinCount > 0 || len(entry.WriteIntentions) > 0 { - // mark as deleted if we have a active requests - entry.Deleted = true - } else { - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + return withLock(s, blockId, name, func(entry *CacheEntry) error { + err := dbDeleteFile(ctx, blockId, name) + if err != nil { + return fmt.Errorf("error deleting file: %v", err) } + entry.clear() + return nil }) - return nil } func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { @@ -162,23 +138,15 @@ func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { return nil } +// if file doesn't exsit, returns os.ErrNotExist func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) { - file, ok := s.getFileFromCache(blockId, name) - if ok { - return file, nil - } - return dbGetBlockFile(ctx, blockId, name) -} - -func stripNils[T any](arr []*T) []*T { - newArr := make([]*T, 0) - for _, item := range arr { - if item == nil { - continue + return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) { + file, err := entry.loadFileForRead(ctx) + if err != nil { + return nil, fmt.Errorf("error getting file: %v", err) } - newArr = append(newArr, item) - } - return newArr + return file.DeepCopy(), nil + }) } func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { @@ -186,87 +154,153 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil if err != nil { return nil, fmt.Errorf("error getting block files: %v", err) } - // now we wash the files through the cache - var hasNils bool - for idx, dbFile := range files { - cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name) - if ok { - if cacheFile == nil { - hasNils = true + for idx, file := range files { + withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error { + if entry.File != nil { + files[idx] = entry.File.DeepCopy() } - files[idx] = cacheFile - } - } - if hasNils { - files = stripNils(files) + return nil + }) } return files, nil } func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error { - s.pinCacheEntry(blockId, name) - defer s.unpinCacheEntry(blockId, name) - _, err := s.loadFileInfo(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error loading file info: %v", err) - } - return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.modifyFileData(func(file *BlockFile) { - if merge { - for k, v := range meta { - if v == nil { - delete(file.Meta, k) - continue - } - file.Meta[k] = v + return withLock(s, blockId, name, func(entry *CacheEntry) error { + err := entry.loadFileIntoCache(ctx) + if err != nil { + return err + } + if merge { + for k, v := range meta { + if v == nil { + delete(entry.File.Meta, k) + continue } - } else { - file.Meta = meta + entry.File.Meta[k] = v } - }) + } else { + entry.File.Meta = meta + } + entry.File.ModTs = time.Now().UnixMilli() return nil }) } -func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) { - file, ok := s.getFileFromCache(blockId, name) - if ok { - if file == nil { - return nil, fmt.Errorf("file not found") +func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error { + return withLock(s, blockId, name, func(entry *CacheEntry) error { + err := entry.loadFileIntoCache(ctx) + if err != nil { + return err } - return file, nil - } - dbFile, err := dbGetBlockFile(ctx, blockId, name) - if err != nil { - return nil, fmt.Errorf("error getting file: %v", err) - } - if dbFile == nil { - return nil, fmt.Errorf("file not found") - } - var rtnErr error - rtnFile := dbFile - // cannot use withLockExists because we're setting entry.FileEntry! - s.withLock(blockId, name, true, func(entry *CacheEntry) { - if entry.Deleted { - rtnFile = nil - rtnErr = fmt.Errorf("file is deleted") - return - } - if entry.FileEntry != nil { - // someone beat us to it - rtnFile = entry.FileEntry.File.DeepCopy() - return - } - entry.FileEntry = &FileCacheEntry{ - Dirty: &atomic.Bool{}, - Flushing: &atomic.Bool{}, - File: *dbFile.DeepCopy(), // make a copy since File must be immutable - } - // returns dbFile, nil + entry.writeAt(0, data, true) + return nil }) - return rtnFile, rtnErr } +func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { + if offset < 0 { + return fmt.Errorf("offset must be non-negative") + } + return withLock(s, blockId, name, func(entry *CacheEntry) error { + err := entry.loadFileIntoCache(ctx) + if err != nil { + return err + } + file := entry.File + if offset > file.Size { + return fmt.Errorf("offset is past the end of the file") + } + if file.Opts.Circular { + startCirFileOffset := file.Size - file.Opts.MaxSize + if offset+int64(len(data)) < startCirFileOffset { + // write is before the start of the circular file + return nil + } + if offset < startCirFileOffset { + amtBeforeStart := startCirFileOffset - offset + offset += amtBeforeStart + data = data[amtBeforeStart:] + } + } + partMap := file.computePartMap(offset, int64(len(data))) + incompleteParts := incompletePartsFromMap(partMap) + err = entry.loadDataPartsIntoCache(ctx, incompleteParts) + if err != nil { + return err + } + entry.writeAt(offset, data, true) + return nil + }) +} + +func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { + return withLock(s, blockId, name, func(entry *CacheEntry) error { + err := entry.loadFileIntoCache(ctx) + if err != nil { + return err + } + lastPartIdx := entry.File.getLastIncompletePartNum() + if lastPartIdx != NoPartIdx { + err = entry.loadDataPartsIntoCache(ctx, []int{lastPartIdx}) + if err != nil { + return err + } + } + entry.writeAt(entry.File.Size, data, false) + return nil + }) +} + +func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { + return dbGetAllBlockIds(ctx) +} + +// returns (offset, data, error) +// we return the offset because the offset may have been adjusted if the size was too big (for circular files) +func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) { + withLock(s, blockId, name, func(entry *CacheEntry) error { + rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false) + return nil + }) + return +} + +// returns (offset, data, error) +func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) { + withLock(s, blockId, name, func(entry *CacheEntry) error { + rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true) + return nil + }) + return +} + +func (s *BlockStore) FlushCache(ctx context.Context) error { + wasFlushing := s.setUnlessFlushing() + if wasFlushing { + return fmt.Errorf("flush already in progress") + } + defer s.setIsFlushing(false) + + // get a copy of dirty keys so we can iterate without the lock + dirtyCacheKeys := s.getDirtyCacheKeys() + for _, key := range dirtyCacheKeys { + err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error { + return entry.flushToDB(ctx) + }) + if ctx.Err() != nil { + // transient error (also must stop the loop) + return ctx.Err() + } + if err != nil { + return fmt.Errorf("error flushing cache entry[%v]: %v", key, err) + } + } + return nil +} + +/////////////////////////////////// + func (f *BlockFile) getLastIncompletePartNum() int { if f.Size%partDataSize == 0 { return NoPartIdx @@ -283,83 +317,6 @@ func (f *BlockFile) partIdxAtOffset(offset int64) int { return partIdx } -// blockfile must be loaded -func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error { - var partIdx int - err := s.withLockExists(blockId, name, func(entry *CacheEntry) error { - partIdx = entry.FileEntry.File.getLastIncompletePartNum() - return nil - }) - if err != nil { - return err - } - if partIdx == NoPartIdx { - return nil - } - return s.loadDataParts(ctx, blockId, name, []int{partIdx}) -} - -func maxOfIntArr(arr []int) int { - if len(arr) == 0 { - return 0 - } - max := arr[0] - for _, v := range arr[1:] { - if v > max { - max = v - } - } - return max -} - -func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error { - partDataMap, err := dbGetFileParts(ctx, blockId, name, parts) - if err != nil { - return fmt.Errorf("error getting file part: %v", err) - } - return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - for partIdx, partData := range partDataMap { - if entry.DataEntries[partIdx] != nil { - // someone beat us to it - continue - } - entry.DataEntries[partIdx] = partData - } - return nil - }) -} - -func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error { - return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAt(entry.FileEntry.File.Size, data, false) - return nil - }) -} - -func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { - s.pinCacheEntry(blockId, name) - defer s.unpinCacheEntry(blockId, name) - intentionId := s.setWriteIntention(blockId, name, WriteIntention{Append: true}) - defer s.clearWriteIntention(blockId, name, intentionId) - _, err := s.loadFileInfo(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error loading file info: %v", err) - } - err = s.loadLastDataBlock(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error loading last data block: %v", err) - } - err = s.appendDataToCache(blockId, name, data) - if err != nil { - return fmt.Errorf("error appending data: %v", err) - } - return nil -} - -func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { - return dbGetAllBlockIds(ctx) -} - func incompletePartsFromMap(partMap map[int]int) []int { var incompleteParts []int for partIdx, size := range partMap { @@ -370,6 +327,14 @@ func incompletePartsFromMap(partMap map[int]int) []int { return incompleteParts } +func getPartIdxsFromMap(partMap map[int]int) []int { + var partIdxs []int + for partIdx := range partMap { + partIdxs = append(partIdxs, partIdx) + } + return partIdxs +} + // returns a map of partIdx to amount of data to write to that part func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int { partMap := make(map[int]int) @@ -392,189 +357,18 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int return partMap } -func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error { - s.pinCacheEntry(blockId, name) - defer s.unpinCacheEntry(blockId, name) - intentionId := s.setWriteIntention(blockId, name, WriteIntention{Replace: true}) - defer s.clearWriteIntention(blockId, name, intentionId) - _, err := s.loadFileInfo(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error loading file info: %v", err) - } - return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAt(0, data, true) - return nil - }) -} - -func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { - s.pinCacheEntry(blockId, name) - defer s.unpinCacheEntry(blockId, name) - file, err := s.loadFileInfo(ctx, blockId, name) - if err != nil { - return fmt.Errorf("error loading file info: %v", err) - } - if offset < 0 { - return fmt.Errorf("offset must be non-negative") - } - if offset > file.Size { - return fmt.Errorf("offset is past the end of the file") - } - if file.Opts.Circular { - startCirFileOffset := file.Size - file.Opts.MaxSize - if offset+int64(len(data)) < startCirFileOffset { - // write is before the start of the circular file - return nil - } - if offset < startCirFileOffset { - amtBeforeStart := startCirFileOffset - offset - offset += amtBeforeStart - data = data[amtBeforeStart:] - } - } - partMap := file.computePartMap(offset, int64(len(data))) - intentionId := s.setWriteIntention(blockId, name, WriteIntention{Parts: partMap}) - defer s.clearWriteIntention(blockId, name, intentionId) - incompleteParts := incompletePartsFromMap(partMap) - err = s.loadDataParts(ctx, blockId, name, incompleteParts) - if err != nil { - return fmt.Errorf("error loading data parts: %v", err) - } - return s.withLockExists(blockId, name, func(entry *CacheEntry) error { - entry.writeAt(offset, data, false) - return nil - }) -} - -// returns (offset, data, error) -// we return the offset because the offset may have been adjusted if the size was too big (for circular files) -func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) { - file, err := s.Stat(ctx, blockId, name) - if err != nil { - return 0, nil, fmt.Errorf("error getting file: %v", err) - } - if file.Opts.Circular { - // we can do this check here because MaxSize for file cannot be modified - if size > file.Opts.MaxSize { - // just read the last maxsize bytes - sizeTooBig := size - file.Opts.MaxSize - offset += sizeTooBig - } - } - partMap := file.computePartMap(offset, size) - var partsNeeded []int - for partIdx := range partMap { - partsNeeded = append(partsNeeded, partIdx) - } - dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) - if err != nil { - return 0, nil, fmt.Errorf("error loading data parts: %v", err) - } - // wash the entries through the cache - s.withLock(blockId, name, false, func(entry *CacheEntry) { - if entry == nil || entry.FileEntry == nil { - return - } - if offset+size > entry.FileEntry.File.Size { - // limit read to the actual size of the file - size = entry.FileEntry.File.Size - offset - } - for _, partIdx := range partsNeeded { - if len(entry.DataEntries) <= partIdx || entry.DataEntries[partIdx] == nil { - continue - } - dataEntries[partIdx] = entry.DataEntries[partIdx] - } - }) - // combine the entries into a single byte slice - // note that we only want part of the first and last part depending on offset and size - var rtn []byte - amtLeftToRead := size - curReadOffset := offset - for amtLeftToRead > 0 { - partIdx := file.partIdxAtOffset(curReadOffset) - partDataEntry := dataEntries[partIdx] - var partData []byte - if partDataEntry == nil { - partData = make([]byte, partDataSize) - } else { - partData = partDataEntry.Data[0:partDataSize] - } - partOffset := curReadOffset % partDataSize - amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead) - rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) - amtLeftToRead -= amtToRead - curReadOffset += amtToRead - } - return offset, rtn, nil -} - -func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (int64, []byte, error) { - file, err := s.Stat(ctx, blockId, name) - if err != nil { - return 0, nil, fmt.Errorf("error getting file: %v", err) - } - if file == nil { - return 0, nil, fmt.Errorf("file not found") - } - return s.ReadAt(ctx, blockId, name, 0, file.Size) -} - func (s *BlockStore) getDirtyCacheKeys() []cacheKey { - var dirtyCacheKeys []cacheKey s.Lock.Lock() defer s.Lock.Unlock() + var dirtyCacheKeys []cacheKey for key, entry := range s.Cache { - if entry.FileEntry != nil && entry.FileEntry.Dirty.Load() { + if entry.File != nil { dirtyCacheKeys = append(dirtyCacheKeys, key) } } return dirtyCacheKeys } -func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) (rtnErr error) { - fileEntry, dataEntries := s.getDirtyDataEntriesForFlush(blockId, name) - if fileEntry == nil { - return nil - } - defer func() { - // clear flushing flags (always) - // clear dirty flags if no error - // no lock required, note that we unset dirty before flushing - if rtnErr == nil { - fileEntry.Dirty.Store(false) - } - fileEntry.Flushing.Store(false) - for _, dataEntry := range dataEntries { - if rtnErr == nil { - dataEntry.Dirty.Store(false) - } - dataEntry.Flushing.Store(false) - } - }() - rtnErr = dbWriteCacheEntry(ctx, fileEntry, dataEntries) - if rtnErr != nil { - rtnErr = fmt.Errorf("error writing cache entry: %v", rtnErr) - return rtnErr - } - return nil -} - -func (s *BlockStore) incrementFlushErrors(blockId string, name string) int { - var rtn int - s.withLock(blockId, name, false, func(entry *CacheEntry) { - entry.FlushErrors++ - rtn = entry.FlushErrors - }) - return rtn -} - -func (s *BlockStore) deleteCacheEntry(blockId string, name string) { - s.Lock.Lock() - defer s.Lock.Unlock() - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) -} - func (s *BlockStore) setIsFlushing(flushing bool) { s.Lock.Lock() defer s.Lock.Unlock() @@ -593,38 +387,6 @@ func (s *BlockStore) setUnlessFlushing() bool { } -func (s *BlockStore) FlushCache(ctx context.Context) error { - wasFlushing := s.setUnlessFlushing() - if wasFlushing { - return fmt.Errorf("flush already in progress") - } - defer s.setIsFlushing(false) - - // get a copy of dirty keys so we can iterate without the lock - dirtyCacheKeys := s.getDirtyCacheKeys() - for _, key := range dirtyCacheKeys { - err := s.flushFile(ctx, key.BlockId, key.Name) - if err != nil { - if ctx.Err() != nil { - // context error is transient - return fmt.Errorf("context error: %v", ctx.Err()) - } - // if error is not transient, we should probably delete the offending entry :/ - log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err) - flushErrorCount.Add(1) - totalErrors := s.incrementFlushErrors(key.BlockId, key.Name) - if totalErrors >= 3 { - s.deleteCacheEntry(key.BlockId, key.Name) - log.Printf("too many errors flushing file %s/%s, clear entry", key.BlockId, key.Name) - - } - continue - } - s.cleanCacheEntry(key.BlockId, key.Name) - } - return nil -} - func minInt64(a, b int64) int64 { if a < b { return a diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 09505f97b..a2af2eef2 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -4,11 +4,10 @@ package blockstore import ( - "bytes" + "context" "fmt" - "log" + "os" "sync" - "sync/atomic" "time" ) @@ -17,123 +16,122 @@ type cacheKey struct { Name string } -// note about "Dirty" and "Flushing" fields: -// - Dirty is set to true when the entry is modified -// - Flushing is set to true when the entry is being flushed to disk -// note these fields can *only* be set to true while holding the store lock -// but the flusher may set them to false without the lock (when the flusher no longer will read the entry fields) -// the flusher *must* unset Dirty first, then Flushing -// other code should test Flushing before Dirty -// that means you *cannot* write a field in a cache entry if Flushing.Load() is true (you must make a copy) +type BlockStore struct { + Lock *sync.Mutex + Cache map[cacheKey]*CacheEntry + IsFlushing bool +} + type DataCacheEntry struct { - Dirty *atomic.Bool - Flushing *atomic.Bool - PartIdx int - Data []byte // capacity is always BlockDataPartSize + PartIdx int + Data []byte // capacity is always BlockDataPartSize } -type FileCacheEntry struct { - Dirty *atomic.Bool - Flushing *atomic.Bool - File BlockFile -} - -type WriteIntention struct { - Parts map[int]int - Append bool - Replace bool -} - -// invariants: -// - we only modify CacheEntry fields when we are holding the BlockStore lock -// - FileEntry can be nil, if pinned -// - FileEntry.File is never updated in place, the entire FileEntry is replaced -// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced -// - when pinned, the cache entry is never removed -// this allows us to flush the cache entry to disk without holding the lock -// if there is a dirty data entry, then FileEntry must also be dirty +// if File or DataEntries are not nil then they are dirty (need to be flushed to disk) type CacheEntry struct { - BlockId string - Name string - PinCount int - Deleted bool - WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention - FileEntry *FileCacheEntry - DataEntries map[int]*DataCacheEntry - FlushErrors int -} + PinCount int // this is synchronzed with the BlockStore lock (not the entry lock) -//lint:ignore U1000 used for testing -func (e *CacheEntry) dump() string { - var buf bytes.Buffer - fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.PinCount, e.Deleted, e.WriteIntentions) - if e.FileEntry != nil { - fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File) - } - for idx, dce := range e.DataEntries { - fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", idx, dce.Dirty.Load(), string(dce.Data)) - } - buf.WriteString("}\n") - return buf.String() -} - -//lint:ignore U1000 used for testing -func (s *BlockStore) dump() string { - s.Lock.Lock() - defer s.Lock.Unlock() - var buf bytes.Buffer - buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache))) - for _, v := range s.Cache { - entryStr := v.dump() - buf.WriteString(entryStr) - buf.WriteString("\n") - } - return buf.String() + Lock *sync.Mutex + BlockId string + Name string + File *BlockFile + DataEntries map[int]*DataCacheEntry + FlushErrors int } func makeDataCacheEntry(partIdx int) *DataCacheEntry { return &DataCacheEntry{ - Dirty: &atomic.Bool{}, - Flushing: &atomic.Bool{}, - PartIdx: partIdx, - Data: make([]byte, 0, partDataSize), + PartIdx: partIdx, + Data: make([]byte, 0, partDataSize), } } -// for testing -func (s *BlockStore) getCacheSize() int { +// will create new entries +func (s *BlockStore) getEntryAndPin(blockId string, name string) *CacheEntry { s.Lock.Lock() defer s.Lock.Unlock() - return len(s.Cache) + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + entry = makeCacheEntry(blockId, name) + s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry + } + entry.PinCount++ + return entry } -// for testing -func (s *BlockStore) clearCache() { +func (s *BlockStore) unpinEntryAndTryDelete(blockId string, name string) { s.Lock.Lock() defer s.Lock.Unlock() - s.Cache = make(map[cacheKey]*CacheEntry) + entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] + if entry == nil { + return + } + entry.PinCount-- + if entry.PinCount <= 0 && entry.File == nil { + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + } } -func (e *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry { - if e.DataEntries[partIdx] == nil { - e.DataEntries[partIdx] = makeDataCacheEntry(partIdx) - } - return e.DataEntries[partIdx] +func (entry *CacheEntry) clear() { + entry.File = nil + entry.DataEntries = make(map[int]*DataCacheEntry) + entry.FlushErrors = 0 } -func (dce *DataCacheEntry) clonePart() *DataCacheEntry { - rtn := makeDataCacheEntry(dce.PartIdx) - copy(rtn.Data, dce.Data) - if dce.Dirty.Load() { - rtn.Dirty.Store(true) +func (entry *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry { + if entry.DataEntries[partIdx] == nil { + entry.DataEntries[partIdx] = makeDataCacheEntry(partIdx) } - return rtn + return entry.DataEntries[partIdx] +} + +// returns err if file does not exist +func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error { + if entry.File != nil { + return nil + } + file, err := entry.loadFileForRead(ctx) + if err != nil { + return err + } + entry.File = file + return nil +} + +// does not populate the cache entry, returns err if file does not exist +func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error) { + if entry.File != nil { + return entry.File, nil + } + file, err := dbGetBlockFile(ctx, entry.BlockId, entry.Name) + if err != nil { + return nil, fmt.Errorf("error getting file: %w", err) + } + if file == nil { + return nil, os.ErrNotExist + } + return file, nil +} + +func withLock(s *BlockStore, blockId string, name string, fn func(*CacheEntry) error) error { + entry := s.getEntryAndPin(blockId, name) + defer s.unpinEntryAndTryDelete(blockId, name) + entry.Lock.Lock() + defer entry.Lock.Unlock() + return fn(entry) +} + +func withLockRtn[T any](s *BlockStore, blockId string, name string, fn func(*CacheEntry) (T, error)) (T, error) { + var rtnVal T + rtnErr := withLock(s, blockId, name, func(entry *CacheEntry) error { + var err error + rtnVal, err = fn(entry) + return err + }) + return rtnVal, rtnErr } func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) { - if dce.Flushing.Load() { - dce = dce.clonePart() - } leftInPart := partDataSize - offset toWrite := int64(len(data)) if toWrite > leftInPart { @@ -143,7 +141,6 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC dce.Data = dce.Data[:offset+toWrite] } copy(dce.Data[offset:], data[:toWrite]) - dce.Dirty.Store(true) return toWrite, dce } @@ -154,8 +151,8 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) { } for len(data) > 0 { partIdx := int(offset / partDataSize) - if entry.FileEntry.File.Opts.Circular { - maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize) + if entry.File.Opts.Circular { + maxPart := int(entry.File.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } partOffset := offset % partDataSize @@ -165,211 +162,150 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) { data = data[nw:] offset += nw } - entry.modifyFileData(func(file *BlockFile) { - if endWriteOffset > file.Size || replace { - file.Size = endWriteOffset - } - }) + if endWriteOffset > entry.File.Size || replace { + entry.File.Size = endWriteOffset + } + entry.File.ModTs = time.Now().UnixMilli() } -type BlockStore struct { - Lock *sync.Mutex - Cache map[cacheKey]*CacheEntry - NextIntentionId int - IsFlushing bool +// returns (realOffset, data, error) +func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) { + if offset < 0 { + return 0, nil, fmt.Errorf("offset cannot be negative") + } + file, err := entry.loadFileForRead(ctx) + if err != nil { + return 0, nil, err + } + if readFull { + size = file.Size - offset + } + if offset+size > file.Size { + size = file.Size - offset + } + if file.Opts.Circular { + realDataOffset := int64(0) + if file.Size > file.Opts.MaxSize { + realDataOffset = file.Size - file.Opts.MaxSize + } + if offset < realDataOffset { + offset = realDataOffset + } + } + partMap := entry.File.computePartMap(offset, size) + dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap)) + if err != nil { + return 0, nil, err + } + // combine the entries into a single byte slice + // note that we only want part of the first and last part depending on offset and size + rtnData := make([]byte, 0, size) + amtLeftToRead := size + curReadOffset := offset + for amtLeftToRead > 0 { + partIdx := file.partIdxAtOffset(curReadOffset) + partDataEntry := dataEntryMap[partIdx] + var partData []byte + if partDataEntry == nil { + partData = make([]byte, partDataSize) + } else { + partData = partDataEntry.Data[0:partDataSize] + } + partOffset := curReadOffset % partDataSize + amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead) + rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...) + amtLeftToRead -= amtToRead + curReadOffset += amtToRead + } + return offset, rtnData, nil +} + +func prunePartsWithCache(dataEntries map[int]*DataCacheEntry, parts []int) []int { + var rtn []int + for _, partIdx := range parts { + if dataEntries[partIdx] != nil { + continue + } + rtn = append(rtn, partIdx) + } + return rtn +} + +func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int) error { + parts = prunePartsWithCache(entry.DataEntries, parts) + if len(parts) == 0 { + // parts are already loaded + return nil + } + dbDataParts, err := dbGetFileParts(ctx, entry.BlockId, entry.Name, parts) + if err != nil { + return fmt.Errorf("error getting data parts: %w", err) + } + for partIdx, dce := range dbDataParts { + entry.DataEntries[partIdx] = dce + } + return nil +} + +func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) (map[int]*DataCacheEntry, error) { + if len(parts) == 0 { + return nil, nil + } + dbParts := prunePartsWithCache(entry.DataEntries, parts) + var dbDataParts map[int]*DataCacheEntry + if len(dbParts) > 0 { + var err error + dbDataParts, err = dbGetFileParts(ctx, entry.BlockId, entry.Name, dbParts) + if err != nil { + return nil, fmt.Errorf("error getting data parts: %w", err) + } + } + rtn := make(map[int]*DataCacheEntry) + for _, partIdx := range parts { + if entry.DataEntries[partIdx] != nil { + rtn[partIdx] = entry.DataEntries[partIdx] + continue + } + if dbDataParts[partIdx] != nil { + rtn[partIdx] = dbDataParts[partIdx] + continue + } + // part not found + } + return rtn, nil } func makeCacheEntry(blockId string, name string) *CacheEntry { return &CacheEntry{ - BlockId: blockId, - Name: name, - PinCount: 0, - WriteIntentions: make(map[int]WriteIntention), - FileEntry: nil, - DataEntries: make(map[int]*DataCacheEntry), - FlushErrors: 0, + Lock: &sync.Mutex{}, + BlockId: blockId, + Name: name, + PinCount: 0, + File: nil, + DataEntries: make(map[int]*DataCacheEntry), + FlushErrors: 0, } } -func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - if shouldCreate { - entry = makeCacheEntry(blockId, name) - s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry +func (entry *CacheEntry) flushToDB(ctx context.Context) error { + if entry.File == nil { + return nil + } + err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries) + if ctx.Err() != nil { + // transient error + return ctx.Err() + } + if err != nil { + flushErrorCount.Add(1) + entry.FlushErrors++ + if entry.FlushErrors > 3 { + entry.clear() + return fmt.Errorf("too many flush errors (clearing entry): %w", err) } + return err } - f(entry) -} - -func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil || entry.Deleted || entry.FileEntry == nil { - return fmt.Errorf("file not found") - } - return f(entry) -} - -func (s *BlockStore) pinCacheEntry(blockId string, name string) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - entry = makeCacheEntry(blockId, name) - s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry - } - entry.PinCount++ -} - -func (s *BlockStore) setWriteIntention(blockId string, name string, intention WriteIntention) int { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - return 0 - } - intentionId := s.NextIntentionId - s.NextIntentionId++ - entry.WriteIntentions[intentionId] = intention - return intentionId -} - -func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionId int) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - warningCount.Add(1) - log.Printf("warning: cannot find write intention to clear %q %q", blockId, name) - return - } - delete(entry.WriteIntentions, intentionId) -} - -func (s *BlockStore) unpinCacheEntry(blockId string, name string) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - warningCount.Add(1) - log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name) - return - } - entry.PinCount-- -} - -// getFileFromCache returns the file from the cache if it exists -// makes a copy, so it can be used by the caller -// return (file, cached) -func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - return nil, false - } - if entry.Deleted { - return nil, true - } - if entry.FileEntry == nil { - return nil, false - } - return entry.FileEntry.File.DeepCopy(), true -} - -func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) { - var fileEntry = e.FileEntry - if e.FileEntry.Flushing.Load() { - // must make a copy - fileEntry = &FileCacheEntry{ - Dirty: &atomic.Bool{}, - Flushing: &atomic.Bool{}, - File: *e.FileEntry.File.DeepCopy(), - } - e.FileEntry = fileEntry - } - // always set to dirty (we're modifying it) - fileEntry.Dirty.Store(true) - fileEntry.File.ModTs = time.Now().UnixMilli() - fn(&fileEntry.File) -} - -// also sets Flushing to true on fileentry / dataentries -func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*FileCacheEntry, []*DataCacheEntry) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - return nil, nil - } - if entry.Deleted || entry.FileEntry == nil { - return nil, nil - } - var dirtyData []*DataCacheEntry - for _, dce := range entry.DataEntries { - if dce != nil && dce.Dirty.Load() { - dce.Flushing.Store(true) - dirtyData = append(dirtyData, dce) - } - } - if !entry.FileEntry.Dirty.Load() && len(dirtyData) == 0 { - return nil, nil - } - for _, data := range dirtyData { - data.Flushing.Store(true) - } - entry.FileEntry.Flushing.Store(true) - return entry.FileEntry, dirtyData -} - -func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool { - if entry.FileEntry == nil { - warningCount.Add(1) - log.Printf("warning: checking pinned, but no FileEntry %q %q", entry.BlockId, entry.Name) - return false - } - lastIncomplete := entry.FileEntry.File.getLastIncompletePartNum() - for _, intention := range entry.WriteIntentions { - if intention.Append && partIdx == lastIncomplete { - return true - } - if intention.Parts[partIdx] > 0 { - return true - } - // note "replace" does not pin anything - } - return false -} - -// removes clean data entries (if they aren't pinned) -// and if the entire cache entry is not in use and is clean, removes the entry -func (s *BlockStore) cleanCacheEntry(blockId string, name string) { - s.Lock.Lock() - defer s.Lock.Unlock() - entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] - if entry == nil { - return - } - var hasDirtyData bool - for _, dce := range entry.DataEntries { - if dce.Flushing.Load() || dce.Dirty.Load() || entry.isDataBlockPinned(dce.PartIdx) { - hasDirtyData = true - continue - } - delete(entry.DataEntries, dce.PartIdx) - } - if hasDirtyData || (entry.FileEntry != nil && entry.FileEntry.Dirty.Load()) { - return - } - if entry.PinCount > 0 { - return - } - if len(entry.WriteIntentions) > 0 { - return - } - delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + // clear cache entry (data is now in db) + entry.clear() + return nil } diff --git a/pkg/blockstore/blockstore_dbops.go b/pkg/blockstore/blockstore_dbops.go index fc0dedcd5..7829de448 100644 --- a/pkg/blockstore/blockstore_dbops.go +++ b/pkg/blockstore/blockstore_dbops.go @@ -3,7 +3,7 @@ package blockstore import ( "context" "fmt" - "sync/atomic" + "os" "github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil" ) @@ -54,14 +54,15 @@ func dbGetAllBlockIds(ctx context.Context) ([]string, error) { } func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) { + if len(parts) == 0 { + return nil, nil + } return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) { var data []*DataCacheEntry query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))" tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts)) rtn := make(map[int]*DataCacheEntry) for _, d := range data { - d.Dirty = &atomic.Bool{} - d.Flushing = &atomic.Bool{} if cap(d.Data) != int(partDataSize) { newData := make([]byte, len(d.Data), partDataSize) copy(newData, d.Data) @@ -81,26 +82,22 @@ func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) }) } -func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntries []*DataCacheEntry) error { - if fileEntry == nil { - return fmt.Errorf("fileEntry or fileEntry.File is nil") - } +func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry) error { return WithTx(ctx, func(tx *TxWrap) error { query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?` - if !tx.Exists(query, fileEntry.File.BlockId, fileEntry.File.Name) { + if !tx.Exists(query, file.BlockId, file.Name) { // since deletion is synchronous this stops us from writing to a deleted file - return fmt.Errorf("file not found in db") - } - if fileEntry.Dirty.Load() { - query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?` - tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name) + return os.ErrNotExist } + // we don't update CreatedTs or Opts + query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?` + tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name) dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)` - for _, dataEntry := range dataEntries { - if dataEntry == nil || !dataEntry.Dirty.Load() { - continue + for partIdx, dataEntry := range dataEntries { + if partIdx != dataEntry.PartIdx { + panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx)) } - tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data) + tx.Exec(dataPartQuery, file.BlockId, file.Name, dataEntry.PartIdx, dataEntry.Data) } return nil }) diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 7658b9462..f381c372b 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -45,6 +45,43 @@ func cleanupDb(t *testing.T) { } } +func (s *BlockStore) getCacheSize() int { + s.Lock.Lock() + defer s.Lock.Unlock() + return len(s.Cache) +} + +func (s *BlockStore) clearCache() { + s.Lock.Lock() + defer s.Lock.Unlock() + s.Cache = make(map[cacheKey]*CacheEntry) +} + +//lint:ignore U1000 used for testing +func (e *CacheEntry) dump() string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount) + fmt.Fprintf(&buf, " FileEntry: %v\n", e.File) + for idx, dce := range e.DataEntries { + fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data)) + } + return buf.String() +} + +//lint:ignore U1000 used for testing +func (s *BlockStore) dump() string { + s.Lock.Lock() + defer s.Lock.Unlock() + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache))) + for _, v := range s.Cache { + entryStr := v.dump() + buf.WriteString(entryStr) + buf.WriteString("\n") + } + return buf.String() +} + func TestCreate(t *testing.T) { initDb(t) defer cleanupDb(t) @@ -426,14 +463,14 @@ func TestCircularWrites(t *testing.T) { } checkFileSize(t, ctx, blockId, "c1", 128) checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789") - GBS.withLock(blockId, "c1", false, func(entry *CacheEntry) { + err = withLock(GBS, blockId, "c1", func(entry *CacheEntry) error { if entry == nil { - err = fmt.Errorf("entry not found") - return + return fmt.Errorf("entry not found") } if len(entry.DataEntries) != 1 { - err = fmt.Errorf("data entries mismatch: expected 1, got %d", len(entry.DataEntries)) + return fmt.Errorf("data entries mismatch: expected 1, got %d", len(entry.DataEntries)) } + return nil }) if err != nil { t.Fatalf("error checking data entries: %v", err)