waveterm/pkg/blockstore/blockstore_cache.go

376 lines
10 KiB
Go
Raw Normal View History

2024-05-13 06:59:42 +02:00
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockstore
import (
2024-05-13 09:33:46 +02:00
"bytes"
2024-05-13 06:59:42 +02:00
"fmt"
"log"
2024-05-13 06:59:42 +02:00
"sync"
"sync/atomic"
2024-05-19 09:26:53 +02:00
"time"
2024-05-13 06:59:42 +02:00
)
type cacheKey struct {
BlockId string
Name string
}
// note about "Dirty" and "Flushing" fields:
// - Dirty is set to true when the entry is modified
// - Flushing is set to true when the entry is being flushed to disk
// note these fields can *only* be set to true while holding the store lock
// but the flusher may set them to false without the lock (when the flusher no longer will read the entry fields)
// the flusher *must* unset Dirty first, then Flushing
// other code should test Flushing before Dirty
// that means you *cannot* write a field in a cache entry if Flushing.Load() is true (you must make a copy)
2024-05-13 06:59:42 +02:00
type DataCacheEntry struct {
2024-05-13 22:40:25 +02:00
Dirty *atomic.Bool
Flushing *atomic.Bool
PartIdx int
Data []byte // capacity is always BlockDataPartSize
2024-05-13 06:59:42 +02:00
}
type FileCacheEntry struct {
Dirty *atomic.Bool
Flushing *atomic.Bool
File BlockFile
2024-05-13 06:59:42 +02:00
}
2024-05-18 21:31:54 +02:00
type WriteIntention struct {
Parts map[int]int
Append bool
Replace bool
2024-05-18 21:31:54 +02:00
}
2024-05-13 06:59:42 +02:00
// invariants:
// - we only modify CacheEntry fields when we are holding the BlockStore lock
// - FileEntry can be nil, if pinned
// - FileEntry.File is never updated in place, the entire FileEntry is replaced
// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced
// - when pinned, the cache entry is never removed
// this allows us to flush the cache entry to disk without holding the lock
2024-05-19 09:26:53 +02:00
// if there is a dirty data entry, then FileEntry must also be dirty
2024-05-13 06:59:42 +02:00
type CacheEntry struct {
2024-05-18 21:31:54 +02:00
BlockId string
Name string
PinCount int
Deleted bool
WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention
2024-05-18 21:31:54 +02:00
FileEntry *FileCacheEntry
2024-05-19 09:26:53 +02:00
DataEntries map[int]*DataCacheEntry
2024-05-19 21:22:55 +02:00
FlushErrors int
2024-05-13 06:59:42 +02:00
}
2024-05-18 21:31:54 +02:00
//lint:ignore U1000 used for testing
2024-05-13 09:33:46 +02:00
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.PinCount, e.Deleted, e.WriteIntentions)
2024-05-13 09:33:46 +02:00
if e.FileEntry != nil {
fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File)
}
2024-05-19 09:26:53 +02:00
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", idx, dce.Dirty.Load(), string(dce.Data))
2024-05-13 09:33:46 +02:00
}
buf.WriteString("}\n")
return buf.String()
}
2024-05-18 21:31:54 +02:00
//lint:ignore U1000 used for testing
2024-05-13 09:33:46 +02:00
func (s *BlockStore) dump() string {
s.Lock.Lock()
defer s.Lock.Unlock()
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache)))
for _, v := range s.Cache {
entryStr := v.dump()
buf.WriteString(entryStr)
buf.WriteString("\n")
}
return buf.String()
2024-05-13 22:40:25 +02:00
}
2024-05-13 09:33:46 +02:00
2024-05-13 22:40:25 +02:00
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
return &DataCacheEntry{
Dirty: &atomic.Bool{},
Flushing: &atomic.Bool{},
PartIdx: partIdx,
Data: make([]byte, 0, partDataSize),
}
2024-05-13 09:33:46 +02:00
}
2024-05-13 09:08:50 +02:00
// for testing
func (s *BlockStore) getCacheSize() int {
s.Lock.Lock()
defer s.Lock.Unlock()
return len(s.Cache)
}
// for testing
func (s *BlockStore) clearCache() {
s.Lock.Lock()
defer s.Lock.Unlock()
s.Cache = make(map[cacheKey]*CacheEntry)
}
2024-05-19 09:26:53 +02:00
func (e *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
if e.DataEntries[partIdx] == nil {
2024-05-13 22:40:25 +02:00
e.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
2024-05-13 06:59:42 +02:00
}
return e.DataEntries[partIdx]
}
2024-05-13 22:40:25 +02:00
func (dce *DataCacheEntry) clonePart() *DataCacheEntry {
rtn := makeDataCacheEntry(dce.PartIdx)
copy(rtn.Data, dce.Data)
if dce.Dirty.Load() {
rtn.Dirty.Store(true)
}
return rtn
}
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
if dce.Flushing.Load() {
dce = dce.clonePart()
}
2024-05-13 09:02:32 +02:00
leftInPart := partDataSize - offset
2024-05-13 06:59:42 +02:00
toWrite := int64(len(data))
if toWrite > leftInPart {
toWrite = leftInPart
}
if int64(len(dce.Data)) < offset+toWrite {
dce.Data = dce.Data[:offset+toWrite]
}
copy(dce.Data[offset:], data[:toWrite])
dce.Dirty.Store(true)
2024-05-13 22:40:25 +02:00
return toWrite, dce
2024-05-13 06:59:42 +02:00
}
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
2024-05-19 09:26:53 +02:00
endWriteOffset := offset + int64(len(data))
if replace {
2024-05-19 09:26:53 +02:00
entry.DataEntries = make(map[int]*DataCacheEntry)
}
2024-05-13 06:59:42 +02:00
for len(data) > 0 {
2024-05-13 09:02:32 +02:00
partIdx := int(offset / partDataSize)
2024-05-13 06:59:42 +02:00
if entry.FileEntry.File.Opts.Circular {
2024-05-13 09:02:32 +02:00
maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize)
2024-05-13 06:59:42 +02:00
partIdx = partIdx % maxPart
}
2024-05-13 09:02:32 +02:00
partOffset := offset % partDataSize
2024-05-19 09:26:53 +02:00
partData := entry.getOrCreateDataCacheEntry(partIdx)
2024-05-13 22:40:25 +02:00
nw, newDce := partData.writeToPart(partOffset, data)
entry.DataEntries[partIdx] = newDce
2024-05-13 06:59:42 +02:00
data = data[nw:]
offset += nw
}
2024-05-19 09:26:53 +02:00
entry.modifyFileData(func(file *BlockFile) {
if endWriteOffset > file.Size || replace {
file.Size = endWriteOffset
}
})
2024-05-13 06:59:42 +02:00
}
type BlockStore struct {
Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry
NextIntentionId int
2024-05-19 21:42:05 +02:00
IsFlushing bool
2024-05-13 06:59:42 +02:00
}
2024-05-18 21:31:54 +02:00
func makeCacheEntry(blockId string, name string) *CacheEntry {
return &CacheEntry{
BlockId: blockId,
Name: name,
PinCount: 0,
WriteIntentions: make(map[int]WriteIntention),
2024-05-18 21:31:54 +02:00
FileEntry: nil,
2024-05-19 09:26:53 +02:00
DataEntries: make(map[int]*DataCacheEntry),
2024-05-19 21:22:55 +02:00
FlushErrors: 0,
2024-05-18 21:31:54 +02:00
}
}
2024-05-13 06:59:42 +02:00
func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
if shouldCreate {
2024-05-18 21:31:54 +02:00
entry = makeCacheEntry(blockId, name)
2024-05-13 06:59:42 +02:00
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
}
}
f(entry)
}
func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil || entry.Deleted || entry.FileEntry == nil {
return fmt.Errorf("file not found")
}
return f(entry)
}
func (s *BlockStore) pinCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
2024-05-18 21:31:54 +02:00
entry = makeCacheEntry(blockId, name)
2024-05-13 06:59:42 +02:00
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
}
entry.PinCount++
}
func (s *BlockStore) setWriteIntention(blockId string, name string, intention WriteIntention) int {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return 0
}
intentionId := s.NextIntentionId
s.NextIntentionId++
entry.WriteIntentions[intentionId] = intention
return intentionId
}
func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionId int) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
2024-05-19 09:26:53 +02:00
warningCount.Add(1)
log.Printf("warning: cannot find write intention to clear %q %q", blockId, name)
return
}
delete(entry.WriteIntentions, intentionId)
}
2024-05-13 06:59:42 +02:00
func (s *BlockStore) unpinCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
2024-05-19 09:26:53 +02:00
warningCount.Add(1)
log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name)
2024-05-13 06:59:42 +02:00
return
}
entry.PinCount--
}
// getFileFromCache returns the file from the cache if it exists
// makes a copy, so it can be used by the caller
2024-05-13 06:59:42 +02:00
// return (file, cached)
func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return nil, false
}
if entry.Deleted {
return nil, true
}
if entry.FileEntry == nil {
return nil, false
}
return entry.FileEntry.File.DeepCopy(), true
}
func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) {
var fileEntry = e.FileEntry
if e.FileEntry.Flushing.Load() {
// must make a copy
fileEntry = &FileCacheEntry{
Dirty: &atomic.Bool{},
Flushing: &atomic.Bool{},
File: *e.FileEntry.File.DeepCopy(),
2024-05-13 06:59:42 +02:00
}
e.FileEntry = fileEntry
2024-05-13 06:59:42 +02:00
}
// always set to dirty (we're modifying it)
fileEntry.Dirty.Store(true)
2024-05-19 09:26:53 +02:00
fileEntry.File.ModTs = time.Now().UnixMilli()
fn(&fileEntry.File)
2024-05-13 06:59:42 +02:00
}
2024-05-13 22:40:25 +02:00
2024-05-19 21:22:55 +02:00
// also sets Flushing to true on fileentry / dataentries
func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*FileCacheEntry, []*DataCacheEntry) {
2024-05-13 22:40:25 +02:00
s.Lock.Lock()
defer s.Lock.Unlock()
2024-05-19 21:22:55 +02:00
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return nil, nil
}
2024-05-13 22:40:25 +02:00
if entry.Deleted || entry.FileEntry == nil {
return nil, nil
}
var dirtyData []*DataCacheEntry
for _, dce := range entry.DataEntries {
if dce != nil && dce.Dirty.Load() {
2024-05-19 21:22:55 +02:00
dce.Flushing.Store(true)
2024-05-13 22:40:25 +02:00
dirtyData = append(dirtyData, dce)
}
}
if !entry.FileEntry.Dirty.Load() && len(dirtyData) == 0 {
return nil, nil
}
for _, data := range dirtyData {
data.Flushing.Store(true)
}
2024-05-19 21:36:25 +02:00
entry.FileEntry.Flushing.Store(true)
2024-05-13 22:40:25 +02:00
return entry.FileEntry, dirtyData
}
2024-05-19 09:26:53 +02:00
func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool {
if entry.FileEntry == nil {
warningCount.Add(1)
log.Printf("warning: checking pinned, but no FileEntry %q %q", entry.BlockId, entry.Name)
return false
}
lastIncomplete := entry.FileEntry.File.getLastIncompletePartNum()
for _, intention := range entry.WriteIntentions {
if intention.Append && partIdx == lastIncomplete {
return true
}
if intention.Parts[partIdx] > 0 {
return true
}
// note "replace" does not pin anything
}
return false
}
// removes clean data entries (if they aren't pinned)
// and if the entire cache entry is not in use and is clean, removes the entry
func (s *BlockStore) cleanCacheEntry(blockId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
if entry == nil {
return
}
var hasDirtyData bool
for _, dce := range entry.DataEntries {
if dce.Flushing.Load() || dce.Dirty.Load() || entry.isDataBlockPinned(dce.PartIdx) {
hasDirtyData = true
continue
}
delete(entry.DataEntries, dce.PartIdx)
}
if hasDirtyData || (entry.FileEntry != nil && entry.FileEntry.Dirty.Load()) {
return
}
if entry.PinCount > 0 {
return
}
if len(entry.WriteIntentions) > 0 {
return
}
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
}