waveterm/pkg/filestore/blockstore_cache.go
2024-12-19 10:35:50 -08:00

351 lines
9.0 KiB
Go

// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package filestore
import (
"bytes"
"context"
"fmt"
"io/fs"
"sync"
"time"
)
type cacheKey struct {
ZoneId string
Name string
}
type FileStore struct {
Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry
IsFlushing bool
}
type DataCacheEntry struct {
PartIdx int
Data []byte // capacity is always ZoneDataPartSize
}
// if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
type CacheEntry struct {
PinCount int // this is synchronzed with the FileStore lock (not the entry lock)
Lock *sync.Mutex
ZoneId string
Name string
File *WaveFile
DataEntries map[int]*DataCacheEntry
FlushErrors int
}
//lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount)
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
}
return buf.String()
}
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
return &DataCacheEntry{
PartIdx: partIdx,
Data: make([]byte, 0, partDataSize),
}
}
// will create new entries
func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil {
entry = makeCacheEntry(zoneId, name)
s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry
}
entry.PinCount++
return entry
}
func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) {
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
if entry == nil {
return
}
entry.PinCount--
if entry.PinCount <= 0 && entry.File == nil {
delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name})
}
}
func (entry *CacheEntry) clear() {
entry.File = nil
entry.DataEntries = make(map[int]*DataCacheEntry)
entry.FlushErrors = 0
}
func (entry *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
if entry.DataEntries[partIdx] == nil {
entry.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
}
return entry.DataEntries[partIdx]
}
// returns err if file does not exist
func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
if entry.File != nil {
return nil
}
file, err := entry.loadFileForRead(ctx)
if err != nil {
return err
}
entry.File = file
return nil
}
// does not populate the cache entry, returns err if file does not exist
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) {
if entry.File != nil {
return entry.File, nil
}
file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name)
if err != nil {
return nil, fmt.Errorf("error getting file: %w", err)
}
if file == nil {
return nil, fs.ErrNotExist
}
return file, nil
}
func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error {
entry := s.getEntryAndPin(zoneId, name)
defer s.unpinEntryAndTryDelete(zoneId, name)
entry.Lock.Lock()
defer entry.Lock.Unlock()
return fn(entry)
}
func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
var rtnVal T
rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error {
var err error
rtnVal, err = fn(entry)
return err
})
return rtnVal, rtnErr
}
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
leftInPart := partDataSize - offset
toWrite := int64(len(data))
if toWrite > leftInPart {
toWrite = leftInPart
}
if int64(len(dce.Data)) < offset+toWrite {
dce.Data = dce.Data[:offset+toWrite]
}
copy(dce.Data[offset:], data[:toWrite])
return toWrite, dce
}
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
if replace {
entry.File.Size = 0
}
if entry.File.Opts.Circular {
startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
if offset+int64(len(data)) <= startCirFileOffset {
// write is before the start of the circular file
return
}
if offset < startCirFileOffset {
// truncate data (from the front), update offset
truncateAmt := startCirFileOffset - offset
data = data[truncateAmt:]
offset += truncateAmt
}
if int64(len(data)) > entry.File.Opts.MaxSize {
// truncate data (from the front), update offset
truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
data = data[truncateAmt:]
offset += truncateAmt
}
}
endWriteOffset := offset + int64(len(data))
if replace {
entry.DataEntries = make(map[int]*DataCacheEntry)
}
for len(data) > 0 {
partIdx := int(offset / partDataSize)
if entry.File.Opts.Circular {
maxPart := int(entry.File.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart
}
partOffset := offset % partDataSize
partData := entry.getOrCreateDataCacheEntry(partIdx)
nw, newDce := partData.writeToPart(partOffset, data)
entry.DataEntries[partIdx] = newDce
data = data[nw:]
offset += nw
}
if endWriteOffset > entry.File.Size || replace {
entry.File.Size = endWriteOffset
}
entry.File.ModTs = time.Now().UnixMilli()
}
// returns (realOffset, data, error)
func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
if offset < 0 {
return 0, nil, fmt.Errorf("offset cannot be negative")
}
file, err := entry.loadFileForRead(ctx)
if err != nil {
return 0, nil, err
}
if readFull {
size = file.Size - offset
}
if offset+size > file.Size {
size = file.Size - offset
}
if file.Opts.Circular {
realDataOffset := int64(0)
if file.Size > file.Opts.MaxSize {
realDataOffset = file.Size - file.Opts.MaxSize
}
if offset < realDataOffset {
truncateAmt := realDataOffset - offset
offset += truncateAmt
size -= truncateAmt
}
if size <= 0 {
return realDataOffset, nil, nil
}
}
partMap := file.computePartMap(offset, size)
dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
if err != nil {
return 0, nil, err
}
// combine the entries into a single byte slice
// note that we only want part of the first and last part depending on offset and size
rtnData := make([]byte, 0, size)
amtLeftToRead := size
curReadOffset := offset
for amtLeftToRead > 0 {
partIdx := file.partIdxAtOffset(curReadOffset)
partDataEntry := dataEntryMap[partIdx]
var partData []byte
if partDataEntry == nil {
partData = make([]byte, partDataSize)
} else {
partData = partDataEntry.Data[0:partDataSize]
}
partOffset := curReadOffset % partDataSize
amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
amtLeftToRead -= amtToRead
curReadOffset += amtToRead
}
return offset, rtnData, nil
}
func prunePartsWithCache(dataEntries map[int]*DataCacheEntry, parts []int) []int {
var rtn []int
for _, partIdx := range parts {
if dataEntries[partIdx] != nil {
continue
}
rtn = append(rtn, partIdx)
}
return rtn
}
func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int) error {
parts = prunePartsWithCache(entry.DataEntries, parts)
if len(parts) == 0 {
// parts are already loaded
return nil
}
dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts)
if err != nil {
return fmt.Errorf("error getting data parts: %w", err)
}
for partIdx, dce := range dbDataParts {
entry.DataEntries[partIdx] = dce
}
return nil
}
func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
return nil, nil
}
dbParts := prunePartsWithCache(entry.DataEntries, parts)
var dbDataParts map[int]*DataCacheEntry
if len(dbParts) > 0 {
var err error
dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts)
if err != nil {
return nil, fmt.Errorf("error getting data parts: %w", err)
}
}
rtn := make(map[int]*DataCacheEntry)
for _, partIdx := range parts {
if entry.DataEntries[partIdx] != nil {
rtn[partIdx] = entry.DataEntries[partIdx]
continue
}
if dbDataParts[partIdx] != nil {
rtn[partIdx] = dbDataParts[partIdx]
continue
}
// part not found
}
return rtn, nil
}
func makeCacheEntry(zoneId string, name string) *CacheEntry {
return &CacheEntry{
Lock: &sync.Mutex{},
ZoneId: zoneId,
Name: name,
PinCount: 0,
File: nil,
DataEntries: make(map[int]*DataCacheEntry),
FlushErrors: 0,
}
}
func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
if entry.File == nil {
return nil
}
err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
if ctx.Err() != nil {
// transient error
return ctx.Err()
}
if err != nil {
flushErrorCount.Add(1)
entry.FlushErrors++
if entry.FlushErrors > 3 {
entry.clear()
return fmt.Errorf("too many flush errors (clearing entry): %w", err)
}
return err
}
// clear cache entry (data is now in db)
entry.clear()
return nil
}