mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-06 19:18:22 +01:00
348 lines
8.9 KiB
Go
348 lines
8.9 KiB
Go
// Copyright 2024, Command Line Inc.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package filestore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io/fs"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type cacheKey struct {
|
|
ZoneId string
|
|
Name string
|
|
}
|
|
|
|
type FileStore struct {
|
|
Lock *sync.Mutex
|
|
Cache map[cacheKey]*CacheEntry
|
|
IsFlushing bool
|
|
}
|
|
|
|
type DataCacheEntry struct {
|
|
PartIdx int
|
|
Data []byte // capacity is always ZoneDataPartSize
|
|
}
|
|
|
|
// if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
|
|
type CacheEntry struct {
|
|
PinCount int // this is synchronzed with the FileStore lock (not the entry lock)
|
|
|
|
Lock *sync.Mutex
|
|
ZoneId string
|
|
Name string
|
|
File *WaveFile
|
|
DataEntries map[int]*DataCacheEntry
|
|
FlushErrors int
|
|
}
|
|
|
|
//lint:ignore U1000 used for testing
|
|
func (e *CacheEntry) dump() string {
|
|
var buf bytes.Buffer
|
|
fmt.Fprintf(&buf, "CacheEntry [ZoneId: %q, Name: %q] PinCount: %d\n", e.ZoneId, e.Name, e.PinCount)
|
|
fmt.Fprintf(&buf, " FileEntry: %v\n", e.File)
|
|
for idx, dce := range e.DataEntries {
|
|
fmt.Fprintf(&buf, " DataEntry[%d]: %q\n", idx, string(dce.Data))
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
func makeDataCacheEntry(partIdx int) *DataCacheEntry {
|
|
return &DataCacheEntry{
|
|
PartIdx: partIdx,
|
|
Data: make([]byte, 0, partDataSize),
|
|
}
|
|
}
|
|
|
|
// will create new entries
|
|
func (s *FileStore) getEntryAndPin(zoneId string, name string) *CacheEntry {
|
|
s.Lock.Lock()
|
|
defer s.Lock.Unlock()
|
|
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
|
|
if entry == nil {
|
|
entry = makeCacheEntry(zoneId, name)
|
|
s.Cache[cacheKey{ZoneId: zoneId, Name: name}] = entry
|
|
}
|
|
entry.PinCount++
|
|
return entry
|
|
}
|
|
|
|
func (s *FileStore) unpinEntryAndTryDelete(zoneId string, name string) {
|
|
s.Lock.Lock()
|
|
defer s.Lock.Unlock()
|
|
entry := s.Cache[cacheKey{ZoneId: zoneId, Name: name}]
|
|
if entry == nil {
|
|
return
|
|
}
|
|
entry.PinCount--
|
|
if entry.PinCount <= 0 && entry.File == nil {
|
|
delete(s.Cache, cacheKey{ZoneId: zoneId, Name: name})
|
|
}
|
|
}
|
|
|
|
func (entry *CacheEntry) clear() {
|
|
entry.File = nil
|
|
entry.DataEntries = make(map[int]*DataCacheEntry)
|
|
entry.FlushErrors = 0
|
|
}
|
|
|
|
func (entry *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
|
|
if entry.DataEntries[partIdx] == nil {
|
|
entry.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
|
|
}
|
|
return entry.DataEntries[partIdx]
|
|
}
|
|
|
|
// returns err if file does not exist
|
|
func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
|
|
if entry.File != nil {
|
|
return nil
|
|
}
|
|
file, err := entry.loadFileForRead(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
entry.File = file
|
|
return nil
|
|
}
|
|
|
|
// does not populate the cache entry, returns err if file does not exist
|
|
func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*WaveFile, error) {
|
|
if entry.File != nil {
|
|
return entry.File, nil
|
|
}
|
|
file, err := dbGetZoneFile(ctx, entry.ZoneId, entry.Name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting file: %w", err)
|
|
}
|
|
if file == nil {
|
|
return nil, fs.ErrNotExist
|
|
}
|
|
return file, nil
|
|
}
|
|
|
|
func withLock(s *FileStore, zoneId string, name string, fn func(*CacheEntry) error) error {
|
|
entry := s.getEntryAndPin(zoneId, name)
|
|
defer s.unpinEntryAndTryDelete(zoneId, name)
|
|
entry.Lock.Lock()
|
|
defer entry.Lock.Unlock()
|
|
return fn(entry)
|
|
}
|
|
|
|
func withLockRtn[T any](s *FileStore, zoneId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
|
|
var rtnVal T
|
|
rtnErr := withLock(s, zoneId, name, func(entry *CacheEntry) error {
|
|
var err error
|
|
rtnVal, err = fn(entry)
|
|
return err
|
|
})
|
|
return rtnVal, rtnErr
|
|
}
|
|
|
|
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
|
|
leftInPart := partDataSize - offset
|
|
toWrite := int64(len(data))
|
|
if toWrite > leftInPart {
|
|
toWrite = leftInPart
|
|
}
|
|
if int64(len(dce.Data)) < offset+toWrite {
|
|
dce.Data = dce.Data[:offset+toWrite]
|
|
}
|
|
copy(dce.Data[offset:], data[:toWrite])
|
|
return toWrite, dce
|
|
}
|
|
|
|
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
|
|
if replace {
|
|
entry.File.Size = 0
|
|
}
|
|
if entry.File.Opts.Circular {
|
|
startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
|
|
if offset+int64(len(data)) <= startCirFileOffset {
|
|
// write is before the start of the circular file
|
|
return
|
|
}
|
|
if offset < startCirFileOffset {
|
|
// truncate data (from the front), update offset
|
|
truncateAmt := startCirFileOffset - offset
|
|
data = data[truncateAmt:]
|
|
offset += truncateAmt
|
|
}
|
|
if int64(len(data)) > entry.File.Opts.MaxSize {
|
|
// truncate data (from the front), update offset
|
|
truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
|
|
data = data[truncateAmt:]
|
|
offset += truncateAmt
|
|
}
|
|
}
|
|
endWriteOffset := offset + int64(len(data))
|
|
if replace {
|
|
entry.DataEntries = make(map[int]*DataCacheEntry)
|
|
}
|
|
for len(data) > 0 {
|
|
partIdx := int(offset / partDataSize)
|
|
if entry.File.Opts.Circular {
|
|
maxPart := int(entry.File.Opts.MaxSize / partDataSize)
|
|
partIdx = partIdx % maxPart
|
|
}
|
|
partOffset := offset % partDataSize
|
|
partData := entry.getOrCreateDataCacheEntry(partIdx)
|
|
nw, newDce := partData.writeToPart(partOffset, data)
|
|
entry.DataEntries[partIdx] = newDce
|
|
data = data[nw:]
|
|
offset += nw
|
|
}
|
|
if endWriteOffset > entry.File.Size || replace {
|
|
entry.File.Size = endWriteOffset
|
|
}
|
|
entry.File.ModTs = time.Now().UnixMilli()
|
|
}
|
|
|
|
// returns (realOffset, data, error)
|
|
func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
|
|
if offset < 0 {
|
|
return 0, nil, fmt.Errorf("offset cannot be negative")
|
|
}
|
|
file, err := entry.loadFileForRead(ctx)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
if readFull {
|
|
size = file.Size - offset
|
|
}
|
|
if offset+size > file.Size {
|
|
size = file.Size - offset
|
|
}
|
|
if file.Opts.Circular {
|
|
realDataOffset := int64(0)
|
|
if file.Size > file.Opts.MaxSize {
|
|
realDataOffset = file.Size - file.Opts.MaxSize
|
|
}
|
|
if offset < realDataOffset {
|
|
truncateAmt := realDataOffset - offset
|
|
offset += truncateAmt
|
|
size -= truncateAmt
|
|
}
|
|
}
|
|
partMap := file.computePartMap(offset, size)
|
|
dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
// combine the entries into a single byte slice
|
|
// note that we only want part of the first and last part depending on offset and size
|
|
rtnData := make([]byte, 0, size)
|
|
amtLeftToRead := size
|
|
curReadOffset := offset
|
|
for amtLeftToRead > 0 {
|
|
partIdx := file.partIdxAtOffset(curReadOffset)
|
|
partDataEntry := dataEntryMap[partIdx]
|
|
var partData []byte
|
|
if partDataEntry == nil {
|
|
partData = make([]byte, partDataSize)
|
|
} else {
|
|
partData = partDataEntry.Data[0:partDataSize]
|
|
}
|
|
partOffset := curReadOffset % partDataSize
|
|
amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
|
|
rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
|
|
amtLeftToRead -= amtToRead
|
|
curReadOffset += amtToRead
|
|
}
|
|
return offset, rtnData, nil
|
|
}
|
|
|
|
func prunePartsWithCache(dataEntries map[int]*DataCacheEntry, parts []int) []int {
|
|
var rtn []int
|
|
for _, partIdx := range parts {
|
|
if dataEntries[partIdx] != nil {
|
|
continue
|
|
}
|
|
rtn = append(rtn, partIdx)
|
|
}
|
|
return rtn
|
|
}
|
|
|
|
func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int) error {
|
|
parts = prunePartsWithCache(entry.DataEntries, parts)
|
|
if len(parts) == 0 {
|
|
// parts are already loaded
|
|
return nil
|
|
}
|
|
dbDataParts, err := dbGetFileParts(ctx, entry.ZoneId, entry.Name, parts)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting data parts: %w", err)
|
|
}
|
|
for partIdx, dce := range dbDataParts {
|
|
entry.DataEntries[partIdx] = dce
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) (map[int]*DataCacheEntry, error) {
|
|
if len(parts) == 0 {
|
|
return nil, nil
|
|
}
|
|
dbParts := prunePartsWithCache(entry.DataEntries, parts)
|
|
var dbDataParts map[int]*DataCacheEntry
|
|
if len(dbParts) > 0 {
|
|
var err error
|
|
dbDataParts, err = dbGetFileParts(ctx, entry.ZoneId, entry.Name, dbParts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting data parts: %w", err)
|
|
}
|
|
}
|
|
rtn := make(map[int]*DataCacheEntry)
|
|
for _, partIdx := range parts {
|
|
if entry.DataEntries[partIdx] != nil {
|
|
rtn[partIdx] = entry.DataEntries[partIdx]
|
|
continue
|
|
}
|
|
if dbDataParts[partIdx] != nil {
|
|
rtn[partIdx] = dbDataParts[partIdx]
|
|
continue
|
|
}
|
|
// part not found
|
|
}
|
|
return rtn, nil
|
|
}
|
|
|
|
func makeCacheEntry(zoneId string, name string) *CacheEntry {
|
|
return &CacheEntry{
|
|
Lock: &sync.Mutex{},
|
|
ZoneId: zoneId,
|
|
Name: name,
|
|
PinCount: 0,
|
|
File: nil,
|
|
DataEntries: make(map[int]*DataCacheEntry),
|
|
FlushErrors: 0,
|
|
}
|
|
}
|
|
|
|
func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
|
|
if entry.File == nil {
|
|
return nil
|
|
}
|
|
err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
|
|
if ctx.Err() != nil {
|
|
// transient error
|
|
return ctx.Err()
|
|
}
|
|
if err != nil {
|
|
flushErrorCount.Add(1)
|
|
entry.FlushErrors++
|
|
if entry.FlushErrors > 3 {
|
|
entry.clear()
|
|
return fmt.Errorf("too many flush errors (clearing entry): %w", err)
|
|
}
|
|
return err
|
|
}
|
|
// clear cache entry (data is now in db)
|
|
entry.clear()
|
|
return nil
|
|
}
|