waveterm/pkg/filestore/blockstore.go

436 lines
11 KiB
Go
Raw Normal View History

// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package filestore
// the blockstore package implements a write cache for wave files
2024-05-13 06:59:42 +02:00
// it is not a read cache (reads still go to the DB -- unless items are in the cache)
// but all writes only go to the cache, and then the cache is periodically flushed to the DB
import (
2024-05-13 06:59:42 +02:00
"context"
"fmt"
2024-05-20 08:27:21 +02:00
"io/fs"
2024-05-29 03:27:38 +02:00
"log"
"runtime/debug"
2024-05-13 06:59:42 +02:00
"sync"
2024-05-13 22:40:25 +02:00
"sync/atomic"
2024-05-13 06:59:42 +02:00
"time"
)
2024-05-13 09:02:32 +02:00
const DefaultPartDataSize = 64 * 1024
2024-05-13 06:59:42 +02:00
const DefaultFlushTime = 5 * time.Second
const NoPartIdx = -1
2024-05-19 21:22:55 +02:00
// for unit tests
2024-05-19 09:26:53 +02:00
var warningCount = &atomic.Int32{}
2024-05-19 21:22:55 +02:00
var flushErrorCount = &atomic.Int32{}
2024-05-13 09:02:32 +02:00
var partDataSize int64 = DefaultPartDataSize // overridden in tests
2024-05-13 22:40:25 +02:00
var stopFlush = &atomic.Bool{}
2024-05-13 09:02:32 +02:00
var WFS *FileStore = &FileStore{
2024-05-20 07:40:27 +02:00
Lock: &sync.Mutex{},
Cache: make(map[cacheKey]*CacheEntry),
2024-05-13 06:59:42 +02:00
}
type FileOptsType struct {
MaxSize int64 `json:"maxsize,omitempty"`
Circular bool `json:"circular,omitempty"`
IJson bool `json:"ijson,omitempty"`
}
2024-05-13 06:59:42 +02:00
type FileMeta = map[string]any
type WaveFile struct {
// these fields are static (not updated)
ZoneId string `json:"zoneid"`
2024-05-13 06:59:42 +02:00
Name string `json:"name"`
Opts FileOptsType `json:"opts"`
CreatedTs int64 `json:"createdts"`
// these fields are mutable
Size int64 `json:"size"`
ModTs int64 `json:"modts"`
Meta FileMeta `json:"meta"` // only top-level keys can be updated (lower levels are immutable)
2024-05-13 06:59:42 +02:00
}
// for regular files this is just Size
// for circular files this is min(Size, MaxSize)
func (f WaveFile) DataLength() int64 {
if f.Opts.Circular {
return minInt64(f.Size, f.Opts.MaxSize)
}
return f.Size
}
// for regular files this is just 0
// for circular files this is the index of the first byte of data we have
func (f WaveFile) DataStartIdx() int64 {
if f.Opts.Circular && f.Size > f.Opts.MaxSize {
return f.Size - f.Opts.MaxSize
}
return 0
}
// this works because lower levels are immutable
2024-05-13 06:59:42 +02:00
func copyMeta(meta FileMeta) FileMeta {
newMeta := make(FileMeta)
for k, v := range meta {
newMeta[k] = v
}
return newMeta
}
func (f *WaveFile) DeepCopy() *WaveFile {
2024-05-13 06:59:42 +02:00
if f == nil {
return nil
}
newFile := *f
newFile.Meta = copyMeta(f.Meta)
return &newFile
}
func (WaveFile) UseDBMap() {}
2024-05-13 06:59:42 +02:00
type FileData struct {
ZoneId string `json:"zoneid"`
2024-05-13 06:59:42 +02:00
Name string `json:"name"`
PartIdx int `json:"partidx"`
Data []byte `json:"data"`
}
func (FileData) UseDBMap() {}
2024-05-13 06:59:42 +02:00
// synchronous (does not interact with the cache)
func (s *FileStore) MakeFile(ctx context.Context, zoneId string, name string, meta FileMeta, opts FileOptsType) error {
2024-05-13 06:59:42 +02:00
if opts.MaxSize < 0 {
return fmt.Errorf("max size must be non-negative")
}
if opts.Circular && opts.MaxSize <= 0 {
return fmt.Errorf("circular file must have a max size")
}
if opts.Circular && opts.IJson {
return fmt.Errorf("circular file cannot be ijson")
}
2024-05-13 09:02:32 +02:00
if opts.Circular {
if opts.MaxSize%partDataSize != 0 {
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
}
}
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
if entry.File != nil {
2024-05-20 08:27:21 +02:00
return fs.ErrExist
2024-05-18 21:31:54 +02:00
}
2024-05-20 07:40:27 +02:00
now := time.Now().UnixMilli()
file := &WaveFile{
ZoneId: zoneId,
2024-05-20 07:40:27 +02:00
Name: name,
Size: 0,
CreatedTs: now,
ModTs: now,
Opts: opts,
Meta: meta,
2024-05-18 21:31:54 +02:00
}
2024-05-20 07:40:27 +02:00
return dbInsertFile(ctx, file)
2024-05-18 21:31:54 +02:00
})
2024-05-13 06:59:42 +02:00
}
func (s *FileStore) DeleteFile(ctx context.Context, zoneId string, name string) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
err := dbDeleteFile(ctx, zoneId, name)
2024-05-20 07:40:27 +02:00
if err != nil {
return fmt.Errorf("error deleting file: %v", err)
2024-05-18 21:31:54 +02:00
}
2024-05-20 07:40:27 +02:00
entry.clear()
return nil
2024-05-13 06:59:42 +02:00
})
}
func (s *FileStore) DeleteZone(ctx context.Context, zoneId string) error {
fileNames, err := dbGetZoneFileNames(ctx, zoneId)
2024-05-13 06:59:42 +02:00
if err != nil {
return fmt.Errorf("error getting zone files: %v", err)
2024-05-13 06:59:42 +02:00
}
for _, name := range fileNames {
s.DeleteFile(ctx, zoneId, name)
2024-05-13 06:59:42 +02:00
}
return nil
}
2024-05-20 08:27:21 +02:00
// if file doesn't exsit, returns fs.ErrNotExist
func (s *FileStore) Stat(ctx context.Context, zoneId string, name string) (*WaveFile, error) {
return withLockRtn(s, zoneId, name, func(entry *CacheEntry) (*WaveFile, error) {
2024-05-20 07:40:27 +02:00
file, err := entry.loadFileForRead(ctx)
if err != nil {
return nil, fmt.Errorf("error getting file: %v", err)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
return file.DeepCopy(), nil
})
}
func (s *FileStore) ListFiles(ctx context.Context, zoneId string) ([]*WaveFile, error) {
files, err := dbGetZoneFiles(ctx, zoneId)
if err != nil {
return nil, fmt.Errorf("error getting zone files: %v", err)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
for idx, file := range files {
withLock(s, file.ZoneId, file.Name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
if entry.File != nil {
files[idx] = entry.File.DeepCopy()
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
return nil
})
}
2024-05-13 06:59:42 +02:00
return files, nil
}
func (s *FileStore) WriteMeta(ctx context.Context, zoneId string, name string, meta FileMeta, merge bool) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
}
if merge {
for k, v := range meta {
if v == nil {
delete(entry.File.Meta, k)
continue
2024-05-13 09:12:55 +02:00
}
2024-05-20 07:40:27 +02:00
entry.File.Meta[k] = v
2024-05-13 09:12:55 +02:00
}
2024-05-20 07:40:27 +02:00
} else {
entry.File.Meta = meta
}
entry.File.ModTs = time.Now().UnixMilli()
return nil
2024-05-13 06:59:42 +02:00
})
}
func (s *FileStore) WriteFile(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
entry.writeAt(0, data, true)
2024-05-20 08:27:21 +02:00
// since WriteFile can *truncate* the file, we need to flush the file to the DB immediately
return entry.flushToDB(ctx, true)
2024-05-20 07:40:27 +02:00
})
}
func (s *FileStore) WriteAt(ctx context.Context, zoneId string, name string, offset int64, data []byte) error {
2024-05-20 07:40:27 +02:00
if offset < 0 {
return fmt.Errorf("offset must be non-negative")
2024-05-13 06:59:42 +02:00
}
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
file := entry.File
if offset > file.Size {
return fmt.Errorf("offset is past the end of the file")
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
partMap := file.computePartMap(offset, int64(len(data)))
incompleteParts := incompletePartsFromMap(partMap)
err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
if err != nil {
return err
}
2024-05-20 08:48:08 +02:00
entry.writeAt(offset, data, false)
2024-05-20 07:40:27 +02:00
return nil
2024-05-13 06:59:42 +02:00
})
}
func (s *FileStore) AppendData(ctx context.Context, zoneId string, name string, data []byte) error {
return withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
err := entry.loadFileIntoCache(ctx)
if err != nil {
return err
}
2024-05-20 08:48:08 +02:00
partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
incompleteParts := incompletePartsFromMap(partMap)
if len(incompleteParts) > 0 {
err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
2024-05-20 07:40:27 +02:00
if err != nil {
return err
}
}
entry.writeAt(entry.File.Size, data, false)
2024-05-13 06:59:42 +02:00
return nil
})
}
func (s *FileStore) GetAllZoneIds(ctx context.Context) ([]string, error) {
return dbGetAllZoneIds(ctx)
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
// returns (offset, data, error)
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
func (s *FileStore) ReadAt(ctx context.Context, zoneId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
2024-05-13 06:59:42 +02:00
return nil
})
2024-05-20 07:40:27 +02:00
return
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
// returns (offset, data, error)
func (s *FileStore) ReadFile(ctx context.Context, zoneId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
withLock(s, zoneId, name, func(entry *CacheEntry) error {
2024-05-20 07:40:27 +02:00
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true)
2024-05-13 06:59:42 +02:00
return nil
})
2024-05-20 07:40:27 +02:00
return
2024-05-13 06:59:42 +02:00
}
2024-05-29 03:27:38 +02:00
type FlushStats struct {
FlushDuration time.Duration
NumDirtyEntries int
NumCommitted int
}
func (s *FileStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
2024-05-20 07:40:27 +02:00
wasFlushing := s.setUnlessFlushing()
if wasFlushing {
2024-05-29 03:27:38 +02:00
return stats, fmt.Errorf("flush already in progress")
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
defer s.setIsFlushing(false)
2024-05-29 03:27:38 +02:00
startTime := time.Now()
defer func() {
stats.FlushDuration = time.Since(startTime)
}()
2024-05-20 07:40:27 +02:00
// get a copy of dirty keys so we can iterate without the lock
dirtyCacheKeys := s.getDirtyCacheKeys()
2024-05-29 03:27:38 +02:00
stats.NumDirtyEntries = len(dirtyCacheKeys)
2024-05-20 07:40:27 +02:00
for _, key := range dirtyCacheKeys {
err := withLock(s, key.ZoneId, key.Name, func(entry *CacheEntry) error {
2024-05-20 08:27:21 +02:00
return entry.flushToDB(ctx, false)
2024-05-20 07:40:27 +02:00
})
if ctx.Err() != nil {
// transient error (also must stop the loop)
2024-05-29 03:27:38 +02:00
return stats, ctx.Err()
2024-05-20 07:40:27 +02:00
}
if err != nil {
2024-05-29 03:27:38 +02:00
return stats, fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
2024-05-20 07:40:27 +02:00
}
2024-05-29 03:27:38 +02:00
stats.NumCommitted++
2024-05-13 06:59:42 +02:00
}
2024-05-29 03:27:38 +02:00
return stats, nil
2024-05-13 06:59:42 +02:00
}
2024-05-20 07:40:27 +02:00
///////////////////////////////////
func (f *WaveFile) partIdxAtOffset(offset int64) int {
2024-05-20 07:40:27 +02:00
partIdx := int(offset / partDataSize)
if f.Opts.Circular {
maxPart := int(f.Opts.MaxSize / partDataSize)
partIdx = partIdx % maxPart
}
return partIdx
2024-05-13 06:59:42 +02:00
}
func incompletePartsFromMap(partMap map[int]int) []int {
var incompleteParts []int
for partIdx, size := range partMap {
if size != int(partDataSize) {
incompleteParts = append(incompleteParts, partIdx)
}
}
return incompleteParts
}
2024-05-20 07:40:27 +02:00
func getPartIdxsFromMap(partMap map[int]int) []int {
var partIdxs []int
for partIdx := range partMap {
partIdxs = append(partIdxs, partIdx)
}
return partIdxs
}
2024-05-18 21:31:54 +02:00
// returns a map of partIdx to amount of data to write to that part
func (file *WaveFile) computePartMap(startOffset int64, size int64) map[int]int {
2024-05-18 21:31:54 +02:00
partMap := make(map[int]int)
endOffset := startOffset + size
startFileOffset := startOffset - (startOffset % partDataSize)
for testOffset := startFileOffset; testOffset < endOffset; testOffset += partDataSize {
2024-05-18 21:31:54 +02:00
partIdx := file.partIdxAtOffset(testOffset)
partStartOffset := testOffset
partEndOffset := testOffset + partDataSize
partWriteStartOffset := 0
partWriteEndOffset := int(partDataSize)
if startOffset > partStartOffset && startOffset < partEndOffset {
partWriteStartOffset = int(startOffset - partStartOffset)
}
if endOffset > partStartOffset && endOffset < partEndOffset {
partWriteEndOffset = int(endOffset - partStartOffset)
}
partMap[partIdx] = partWriteEndOffset - partWriteStartOffset
}
return partMap
}
func (s *FileStore) getDirtyCacheKeys() []cacheKey {
s.Lock.Lock()
2024-05-19 09:26:53 +02:00
defer s.Lock.Unlock()
2024-05-20 07:40:27 +02:00
var dirtyCacheKeys []cacheKey
for key, entry := range s.Cache {
2024-05-20 07:40:27 +02:00
if entry.File != nil {
dirtyCacheKeys = append(dirtyCacheKeys, key)
}
2024-05-19 09:26:53 +02:00
}
return dirtyCacheKeys
}
func (s *FileStore) setIsFlushing(flushing bool) {
2024-05-19 21:42:05 +02:00
s.Lock.Lock()
defer s.Lock.Unlock()
s.IsFlushing = flushing
}
// returns old value of IsFlushing
func (s *FileStore) setUnlessFlushing() bool {
2024-05-19 21:42:05 +02:00
s.Lock.Lock()
defer s.Lock.Unlock()
if s.IsFlushing {
return true
}
s.IsFlushing = true
return false
2024-05-29 03:27:38 +02:00
}
func (s *FileStore) runFlushWithNewContext() (FlushStats, error) {
2024-05-29 03:27:38 +02:00
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
defer cancelFn()
return s.FlushCache(ctx)
}
2024-05-19 21:42:05 +02:00
func (s *FileStore) runFlusher() {
2024-05-29 03:27:38 +02:00
defer func() {
if r := recover(); r != nil {
log.Printf("panic in filestore flusher: %v\n", r)
2024-05-29 03:27:38 +02:00
debug.PrintStack()
}
}()
for {
stats, err := s.runFlushWithNewContext()
if err != nil || stats.NumDirtyEntries > 0 {
log.Printf("filestore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
2024-05-29 03:27:38 +02:00
}
if stopFlush.Load() {
log.Printf("filestore flusher stopping\n")
2024-05-29 03:27:38 +02:00
return
}
time.Sleep(DefaultFlushTime)
}
2024-05-19 21:42:05 +02:00
}
2024-05-13 06:59:42 +02:00
func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}