waveterm/pkg/filestore/blockstore_cache.go

348 lines
8.9 KiB
Go
Raw Permalink Normal View History

2024-05-13 06:59:42 +02:00
// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package filestore
2024-05-13 06:59:42 +02:00
import (
2024-05-20 08:27:21 +02:00
"bytes"
2024-05-20 07:40:27 +02:00
"context"
2024-05-13 06:59:42 +02:00
"fmt"
2024-05-20 08:27:21 +02:00
"io/fs"
2024-05-13 06:59:42 +02:00
"sync"
2024-05-19 09:26:53 +02:00
"time"
2024-05-13 06:59:42 +02:00
)
// cacheKey identifies a cache entry by the zone it belongs to and the file
// name within that zone. Both fields are strings, so the struct is comparable
// and is used directly as the key of FileStore.Cache.
type cacheKey struct {
	ZoneId string
	Name   string
}
type FileStore struct {
2024-05-20 07:40:27 +02:00
Lock *sync.Mutex
Cache map[cacheKey]*CacheEntry
IsFlushing bool
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
// DataCacheEntry holds the bytes of a single fixed-size part of a file.
type DataCacheEntry struct {
	// PartIdx is the index of this part within the file.
	PartIdx int
	// Data holds the bytes written so far; its capacity is always
	// partDataSize so it can be re-sliced in place without reallocating.
	Data []byte
}
2024-05-20 07:40:27 +02:00
// if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
2024-05-13 06:59:42 +02:00
type CacheEntry struct {
PinCount int // this is synchronzed with the FileStore lock (not the entry lock)
2024-05-13 09:33:46 +02:00
2024-05-20 07:40:27 +02:00
Lock *sync.Mutex
ZoneId string
2024-05-20 07:40:27 +02:00
Name string
File *WaveFile
2024-05-20 07:40:27 +02:00
DataEntries map[int]*DataCacheEntry
FlushErrors int
2024-05-13 22:40:25 +02:00
}
2024-05-13 09:33:46 +02:00
2024-05-20 08:27:21 +02:00
//lint:ignore U1000 used for testing
func (e *CacheEntry) dump() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount)
2024-05-20 08:27:21 +02:00
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
for idx, dce := range e.DataEntries {
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
}
return buf.String()
}
2024-05-13 22:40:25 +02:00
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
return &DataCacheEntry{
2024-05-20 07:40:27 +02:00
PartIdx: partIdx,
Data: make([]byte, 0, partDataSize),
2024-05-13 22:40:25 +02:00
}
2024-05-13 09:33:46 +02:00
}
2024-05-20 07:40:27 +02:00
// will create new entries
func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry {
2024-05-13 09:08:50 +02:00
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
2024-05-20 07:40:27 +02:00
if entry == nil {
entry = makeCacheEntry(zoneId, name)
s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry
2024-05-20 07:40:27 +02:00
}
entry.PinCount++
return entry
2024-05-13 09:08:50 +02:00
}
func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) {
2024-05-13 09:08:50 +02:00
s.Lock.Lock()
defer s.Lock.Unlock()
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
2024-05-20 07:40:27 +02:00
if entry == nil {
return
}
entry.PinCount--
if entry.PinCount <= 0 && entry.File == nil {
delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name})
2024-05-20 07:40:27 +02:00
}
}
func (entry *CacheEntry) clear() {
entry.File = nil
entry.DataEntries = make(map[int]*DataCacheEntry)
entry.FlushErrors = 0
2024-05-13 09:08:50 +02:00
}
2024-05-20 07:40:27 +02:00
func (entry *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
if entry.DataEntries[partIdx] == nil {
entry.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
return entry.DataEntries[partIdx]
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
// returns err if file does not exist
func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
if entry.File != nil {
return nil
2024-05-13 22:40:25 +02:00
}
2024-05-20 07:40:27 +02:00
file, err := entry.loadFileForRead(ctx)
if err != nil {
return err
}
entry.File = file
return nil
2024-05-13 22:40:25 +02:00
}
2024-05-20 07:40:27 +02:00
// does not populate the cache entry, returns err if file does not exist
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) {
2024-05-20 07:40:27 +02:00
if entry.File != nil {
return entry.File, nil
}
file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name)
2024-05-20 07:40:27 +02:00
if err != nil {
return nil, fmt.Errorf("error getting file: %w", err)
2024-05-13 22:40:25 +02:00
}
2024-05-20 07:40:27 +02:00
if file == nil {
2024-05-20 08:27:21 +02:00
return nil, fs.ErrNotExist
2024-05-20 07:40:27 +02:00
}
return file, nil
}
func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error {
entry := s.getEntryAndPin(zoneId, name)
defer s.unpinEntryAndTryDelete(zoneId, name)
2024-05-20 07:40:27 +02:00
entry.Lock.Lock()
defer entry.Lock.Unlock()
return fn(entry)
}
func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
2024-05-20 07:40:27 +02:00
var rtnVal T
rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
var err error
rtnVal, err = fn(entry)
return err
})
return rtnVal, rtnErr
}
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
2024-05-13 09:02:32 +02:00
leftInPart := partDataSize - offset
2024-05-13 06:59:42 +02:00
toWrite := int64(len(data))
if toWrite > leftInPart {
toWrite = leftInPart
}
if int64(len(dce.Data)) < offset+toWrite {
dce.Data = dce.Data[:offset+toWrite]
}
copy(dce.Data[offset:], data[:toWrite])
2024-05-13 22:40:25 +02:00
return toWrite, dce
2024-05-13 06:59:42 +02:00
}
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
2024-05-20 08:27:21 +02:00
if replace {
entry.File.Size = 0
}
if entry.File.Opts.Circular {
startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
2024-05-20 08:48:08 +02:00
if offset+int64(len(data)) <= startCirFileOffset {
2024-05-20 08:27:21 +02:00
// write is before the start of the circular file
return
}
if offset < startCirFileOffset {
// truncate data (from the front), update offset
truncateAmt := startCirFileOffset - offset
data = data[truncateAmt:]
offset += truncateAmt
}
if int64(len(data)) > entry.File.Opts.MaxSize {
// truncate data (from the front), update offset
truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
data = data[truncateAmt:]
offset += truncateAmt
}
}
2024-05-19 09:26:53 +02:00
endWriteOffset := offset + int64(len(data))
if replace {
2024-05-19 09:26:53 +02:00
entry.DataEntries = make(map[int]*DataCacheEntry)
}
2024-05-13 06:59:42 +02:00
for len(data) > 0 {
2024-05-13 09:02:32 +02:00
partIdx := int(offset / partDataSize)
2024-05-20 07:40:27 +02:00
if entry.File.Opts.Circular {
maxPart := int(entry.File.Opts.MaxSize / partDataSize)
2024-05-13 06:59:42 +02:00
partIdx = partIdx % maxPart
}
2024-05-13 09:02:32 +02:00
partOffset := offset % partDataSize
2024-05-19 09:26:53 +02:00
partData := entry.getOrCreateDataCacheEntry(partIdx)
2024-05-13 22:40:25 +02:00
nw, newDce := partData.writeToPart(partOffset, data)
entry.DataEntries[partIdx] = newDce
2024-05-13 06:59:42 +02:00
data = data[nw:]
offset += nw
}
2024-05-20 07:40:27 +02:00
if endWriteOffset > entry.File.Size || replace {
entry.File.Size = endWriteOffset
2024-05-18 21:31:54 +02:00
}
2024-05-20 07:40:27 +02:00
entry.File.ModTs = time.Now().UnixMilli()
2024-05-18 21:31:54 +02:00
}
2024-05-20 07:40:27 +02:00
// returns (realOffset, data, error)
func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
if offset < 0 {
return 0, nil, fmt.Errorf("offset cannot be negative")
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
file, err := entry.loadFileForRead(ctx)
if err != nil {
return 0, nil, err
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
if readFull {
size = file.Size - offset
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
if offset+size > file.Size {
size = file.Size - offset
}
2024-05-20 07:40:27 +02:00
if file.Opts.Circular {
realDataOffset := int64(0)
if file.Size > file.Opts.MaxSize {
realDataOffset = file.Size - file.Opts.MaxSize
}
if offset < realDataOffset {
2024-05-20 08:27:21 +02:00
truncateAmt := realDataOffset - offset
offset += truncateAmt
size -= truncateAmt
2024-05-20 07:40:27 +02:00
}
}
2024-05-20 08:27:21 +02:00
partMap := file.computePartMap(offset, size)
2024-05-20 07:40:27 +02:00
dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
if err != nil {
return 0, nil, err
}
// combine the entries into a single byte slice
// note that we only want part of the first and last part depending on offset and size
rtnData := make([]byte, 0, size)
amtLeftToRead := size
curReadOffset := offset
for amtLeftToRead > 0 {
partIdx := file.partIdxAtOffset(curReadOffset)
partDataEntry := dataEntryMap[partIdx]
var partData []byte
if partDataEntry == nil {
partData = make([]byte, partDataSize)
} else {
partData = partDataEntry.Data[0:partDataSize]
}
partOffset := curReadOffset % partDataSize
amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
amtLeftToRead -= amtToRead
curReadOffset += amtToRead
}
return offset, rtnData, nil
}
2024-05-20 07:40:27 +02:00
func prunePartsWithCache(dataEntries map[int]*DataCacheEntry, parts []int) []int {
var rtn []int
for _, partIdx := range parts {
if dataEntries[partIdx] != nil {
continue
}
rtn = append(rtn, partIdx)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
return rtn
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int) error {
parts = prunePartsWithCache(entry.DataEntries, parts)
if len(parts) == 0 {
// parts are already loaded
return nil
2024-05-13 06:59:42 +02:00
}
dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts)
2024-05-20 07:40:27 +02:00
if err != nil {
return fmt.Errorf("error getting data parts: %w", err)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
for partIdx, dce := range dbDataParts {
entry.DataEntries[partIdx] = dce
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
return nil
2024-05-13 06:59:42 +02:00
}
2024-05-13 22:40:25 +02:00
2024-05-20 07:40:27 +02:00
func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) (map[int]*DataCacheEntry, error) {
if len(parts) == 0 {
2024-05-13 22:40:25 +02:00
return nil, nil
}
2024-05-20 07:40:27 +02:00
dbParts := prunePartsWithCache(entry.DataEntries, parts)
var dbDataParts map[int]*DataCacheEntry
if len(dbParts) > 0 {
var err error
dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts)
2024-05-20 07:40:27 +02:00
if err != nil {
return nil, fmt.Errorf("error getting data parts: %w", err)
2024-05-13 22:40:25 +02:00
}
}
2024-05-20 07:40:27 +02:00
rtn := make(map[int]*DataCacheEntry)
for _, partIdx := range parts {
if entry.DataEntries[partIdx] != nil {
rtn[partIdx] = entry.DataEntries[partIdx]
continue
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
if dbDataParts[partIdx] != nil {
rtn[partIdx] = dbDataParts[partIdx]
continue
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
// part not found
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
return rtn, nil
2024-05-19 09:26:53 +02:00
}
func makeCacheEntry(zoneId string, name string) *CacheEntry {
2024-05-20 07:40:27 +02:00
return &CacheEntry{
Lock: &sync.Mutex{},
ZoneId: zoneId,
2024-05-20 07:40:27 +02:00
Name: name,
PinCount: 0,
File: nil,
DataEntries: make(map[int]*DataCacheEntry),
FlushErrors: 0,
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
}
2024-05-20 08:27:21 +02:00
func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
2024-05-20 07:40:27 +02:00
if entry.File == nil {
return nil
}
2024-05-20 08:27:21 +02:00
err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
2024-05-20 07:40:27 +02:00
if ctx.Err() != nil {
// transient error
return ctx.Err()
}
if err != nil {
flushErrorCount.Add(1)
entry.FlushErrors++
if entry.FlushErrors > 3 {
entry.clear()
return fmt.Errorf("too many flush errors (clearing entry): %w", err)
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
return err
2024-05-19 09:26:53 +02:00
}
2024-05-20 07:40:27 +02:00
// clear cache entry (data is now in db)
entry.clear()
return nil
2024-05-19 09:26:53 +02:00
}