mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-08 19:38:51 +01:00
436 lines
12 KiB
Go
436 lines
12 KiB
Go
// Copyright 2024, Command Line Inc.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package blockstore
|
|
|
|
// the blockstore package implements a write cache for block files
|
|
// it is not a read cache (reads still go to the DB -- unless items are in the cache)
|
|
// but all writes only go to the cache, and then the cache is periodically flushed to the DB
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/fs"
|
|
"log"
|
|
"runtime/debug"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Default tuning values for the blockstore write cache.
const (
	// DefaultPartDataSize is the default size of one data part, in bytes (64 KiB).
	DefaultPartDataSize = 64 * 1024

	// DefaultFlushTime is both the timeout given to a background flush and the
	// pause between two background flushes.
	DefaultFlushTime = 5 * time.Second

	// NoPartIdx is the value -1.
	// NOTE(review): presumably a sentinel meaning "no part index"; its users are
	// not visible in this file -- confirm against callers.
	NoPartIdx = -1
)
|
|
|
|
// for unit tests
|
|
var warningCount = &atomic.Int32{}
|
|
var flushErrorCount = &atomic.Int32{}
|
|
|
|
var partDataSize int64 = DefaultPartDataSize // overridden in tests
|
|
var stopFlush = &atomic.Bool{}
|
|
|
|
var GBS *BlockStore = &BlockStore{
|
|
Lock: &sync.Mutex{},
|
|
Cache: make(map[cacheKey]*CacheEntry),
|
|
}
|
|
|
|
// FileOptsType holds the options a file is created with (see MakeFile, which
// validates them).
type FileOptsType struct {
	// MaxSize must be non-negative. Circular files require it to be positive,
	// and MakeFile rounds it up to a multiple of the part size for them.
	MaxSize int64 `json:"maxsize,omitempty"`

	// Circular marks a file whose readable data is capped at MaxSize bytes
	// (see BlockFile.DataLength and BlockFile.DataStartIdx).
	Circular bool `json:"circular,omitempty"`

	// IJson cannot be combined with Circular (MakeFile rejects that).
	// NOTE(review): the exact semantics of an "ijson" file are not visible
	// here -- confirm against its users.
	IJson bool `json:"ijson,omitempty"`
}
|
|
|
|
// FileMeta is an alias for map[string]any, used as the metadata of a file
// (see BlockFile.Meta).
type FileMeta = map[string]any
|
|
|
|
type BlockFile struct {
|
|
// these fields are static (not updated)
|
|
BlockId string `json:"blockid"`
|
|
Name string `json:"name"`
|
|
Opts FileOptsType `json:"opts"`
|
|
CreatedTs int64 `json:"createdts"`
|
|
|
|
// these fields are mutable
|
|
Size int64 `json:"size"`
|
|
ModTs int64 `json:"modts"`
|
|
Meta FileMeta `json:"meta"` // only top-level keys can be updated (lower levels are immutable)
|
|
}
|
|
|
|
// for regular files this is just Size
|
|
// for circular files this is min(Size, MaxSize)
|
|
func (f BlockFile) DataLength() int64 {
|
|
if f.Opts.Circular {
|
|
return minInt64(f.Size, f.Opts.MaxSize)
|
|
}
|
|
return f.Size
|
|
}
|
|
|
|
// for regular files this is just 0
|
|
// for circular files this is the index of the first byte of data we have
|
|
func (f BlockFile) DataStartIdx() int64 {
|
|
if f.Opts.Circular && f.Size > f.Opts.MaxSize {
|
|
return f.Size - f.Opts.MaxSize
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// this works because lower levels are immutable
|
|
func copyMeta(meta FileMeta) FileMeta {
|
|
newMeta := make(FileMeta)
|
|
for k, v := range meta {
|
|
newMeta[k] = v
|
|
}
|
|
return newMeta
|
|
}
|
|
|
|
func (f *BlockFile) DeepCopy() *BlockFile {
|
|
if f == nil {
|
|
return nil
|
|
}
|
|
newFile := *f
|
|
newFile.Meta = copyMeta(f.Meta)
|
|
return &newFile
|
|
}
|
|
|
|
// UseDBMap is an empty marker method.
// NOTE(review): presumably it opts BlockFile into a map-based DB mapping in
// the database layer; the consumer is not visible in this file -- confirm.
func (BlockFile) UseDBMap() {}
|
|
|
|
// BlockData is one part of a file's data, identified by (BlockId, Name, PartIdx).
type BlockData struct {
	BlockId string `json:"blockid"`
	Name    string `json:"name"`
	// PartIdx is the index of this part within the file.
	PartIdx int `json:"partidx"`
	// Data holds the bytes of this part.
	Data []byte `json:"data"`
}
|
|
|
|
// UseDBMap is an empty marker method.
// NOTE(review): presumably it opts BlockData into a map-based DB mapping in
// the database layer; the consumer is not visible in this file -- confirm.
func (BlockData) UseDBMap() {}
|
|
|
|
// synchronous (does not interact with the cache)
|
|
func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error {
|
|
if opts.MaxSize < 0 {
|
|
return fmt.Errorf("max size must be non-negative")
|
|
}
|
|
if opts.Circular && opts.MaxSize <= 0 {
|
|
return fmt.Errorf("circular file must have a max size")
|
|
}
|
|
if opts.Circular && opts.IJson {
|
|
return fmt.Errorf("circular file cannot be ijson")
|
|
}
|
|
if opts.Circular {
|
|
if opts.MaxSize%partDataSize != 0 {
|
|
opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
|
|
}
|
|
}
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
if entry.File != nil {
|
|
return fs.ErrExist
|
|
}
|
|
now := time.Now().UnixMilli()
|
|
file := &BlockFile{
|
|
BlockId: blockId,
|
|
Name: name,
|
|
Size: 0,
|
|
CreatedTs: now,
|
|
ModTs: now,
|
|
Opts: opts,
|
|
Meta: meta,
|
|
}
|
|
return dbInsertFile(ctx, file)
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error {
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
err := dbDeleteFile(ctx, blockId, name)
|
|
if err != nil {
|
|
return fmt.Errorf("error deleting file: %v", err)
|
|
}
|
|
entry.clear()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
|
|
fileNames, err := dbGetBlockFileNames(ctx, blockId)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting block files: %v", err)
|
|
}
|
|
for _, name := range fileNames {
|
|
s.DeleteFile(ctx, blockId, name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// if file doesn't exsit, returns fs.ErrNotExist
|
|
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
|
|
return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) {
|
|
file, err := entry.loadFileForRead(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting file: %v", err)
|
|
}
|
|
return file.DeepCopy(), nil
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
|
|
files, err := dbGetBlockFiles(ctx, blockId)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting block files: %v", err)
|
|
}
|
|
for idx, file := range files {
|
|
withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error {
|
|
if entry.File != nil {
|
|
files[idx] = entry.File.DeepCopy()
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
return files, nil
|
|
}
|
|
|
|
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error {
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
err := entry.loadFileIntoCache(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if merge {
|
|
for k, v := range meta {
|
|
if v == nil {
|
|
delete(entry.File.Meta, k)
|
|
continue
|
|
}
|
|
entry.File.Meta[k] = v
|
|
}
|
|
} else {
|
|
entry.File.Meta = meta
|
|
}
|
|
entry.File.ModTs = time.Now().UnixMilli()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error {
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
err := entry.loadFileIntoCache(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
entry.writeAt(0, data, true)
|
|
// since WriteFile can *truncate* the file, we need to flush the file to the DB immediately
|
|
return entry.flushToDB(ctx, true)
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
|
|
if offset < 0 {
|
|
return fmt.Errorf("offset must be non-negative")
|
|
}
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
err := entry.loadFileIntoCache(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
file := entry.File
|
|
if offset > file.Size {
|
|
return fmt.Errorf("offset is past the end of the file")
|
|
}
|
|
partMap := file.computePartMap(offset, int64(len(data)))
|
|
incompleteParts := incompletePartsFromMap(partMap)
|
|
err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
entry.writeAt(offset, data, false)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
|
|
return withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
err := entry.loadFileIntoCache(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
|
|
incompleteParts := incompletePartsFromMap(partMap)
|
|
if len(incompleteParts) > 0 {
|
|
err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
entry.writeAt(entry.File.Size, data, false)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
|
|
return dbGetAllBlockIds(ctx)
|
|
}
|
|
|
|
// returns (offset, data, error)
|
|
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
|
|
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
|
|
withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// returns (offset, data, error)
|
|
func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
|
|
withLock(s, blockId, name, func(entry *CacheEntry) error {
|
|
rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true)
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// FlushStats summarizes one FlushCache run.
type FlushStats struct {
	// FlushDuration is the wall-clock time the flush took.
	FlushDuration time.Duration
	// NumDirtyEntries is the number of cache keys selected for flushing.
	NumDirtyEntries int
	// NumCommitted is the number of those entries flushed without error.
	NumCommitted int
}
|
|
|
|
func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
|
|
wasFlushing := s.setUnlessFlushing()
|
|
if wasFlushing {
|
|
return stats, fmt.Errorf("flush already in progress")
|
|
}
|
|
defer s.setIsFlushing(false)
|
|
startTime := time.Now()
|
|
defer func() {
|
|
stats.FlushDuration = time.Since(startTime)
|
|
}()
|
|
|
|
// get a copy of dirty keys so we can iterate without the lock
|
|
dirtyCacheKeys := s.getDirtyCacheKeys()
|
|
stats.NumDirtyEntries = len(dirtyCacheKeys)
|
|
for _, key := range dirtyCacheKeys {
|
|
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
|
|
return entry.flushToDB(ctx, false)
|
|
})
|
|
if ctx.Err() != nil {
|
|
// transient error (also must stop the loop)
|
|
return stats, ctx.Err()
|
|
}
|
|
if err != nil {
|
|
return stats, fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
|
|
}
|
|
stats.NumCommitted++
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
///////////////////////////////////
|
|
|
|
func (f *BlockFile) partIdxAtOffset(offset int64) int {
|
|
partIdx := int(offset / partDataSize)
|
|
if f.Opts.Circular {
|
|
maxPart := int(f.Opts.MaxSize / partDataSize)
|
|
partIdx = partIdx % maxPart
|
|
}
|
|
return partIdx
|
|
}
|
|
|
|
func incompletePartsFromMap(partMap map[int]int) []int {
|
|
var incompleteParts []int
|
|
for partIdx, size := range partMap {
|
|
if size != int(partDataSize) {
|
|
incompleteParts = append(incompleteParts, partIdx)
|
|
}
|
|
}
|
|
return incompleteParts
|
|
}
|
|
|
|
// getPartIdxsFromMap returns the keys (part indexes) of partMap. The order
// follows map iteration and is therefore unspecified; an empty or nil map
// yields a nil slice.
func getPartIdxsFromMap(partMap map[int]int) []int {
	var idxs []int
	for idx := range partMap {
		idxs = append(idxs, idx)
	}
	return idxs
}
|
|
|
|
// returns a map of partIdx to amount of data to write to that part
|
|
func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int {
|
|
partMap := make(map[int]int)
|
|
endOffset := startOffset + size
|
|
startBlockOffset := startOffset - (startOffset % partDataSize)
|
|
for testOffset := startBlockOffset; testOffset < endOffset; testOffset += partDataSize {
|
|
partIdx := file.partIdxAtOffset(testOffset)
|
|
partStartOffset := testOffset
|
|
partEndOffset := testOffset + partDataSize
|
|
partWriteStartOffset := 0
|
|
partWriteEndOffset := int(partDataSize)
|
|
if startOffset > partStartOffset && startOffset < partEndOffset {
|
|
partWriteStartOffset = int(startOffset - partStartOffset)
|
|
}
|
|
if endOffset > partStartOffset && endOffset < partEndOffset {
|
|
partWriteEndOffset = int(endOffset - partStartOffset)
|
|
}
|
|
partMap[partIdx] = partWriteEndOffset - partWriteStartOffset
|
|
}
|
|
return partMap
|
|
}
|
|
|
|
func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
|
|
s.Lock.Lock()
|
|
defer s.Lock.Unlock()
|
|
var dirtyCacheKeys []cacheKey
|
|
for key, entry := range s.Cache {
|
|
if entry.File != nil {
|
|
dirtyCacheKeys = append(dirtyCacheKeys, key)
|
|
}
|
|
}
|
|
return dirtyCacheKeys
|
|
}
|
|
|
|
func (s *BlockStore) setIsFlushing(flushing bool) {
|
|
s.Lock.Lock()
|
|
defer s.Lock.Unlock()
|
|
s.IsFlushing = flushing
|
|
}
|
|
|
|
// returns old value of IsFlushing
|
|
func (s *BlockStore) setUnlessFlushing() bool {
|
|
s.Lock.Lock()
|
|
defer s.Lock.Unlock()
|
|
if s.IsFlushing {
|
|
return true
|
|
}
|
|
s.IsFlushing = true
|
|
return false
|
|
}
|
|
|
|
func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) {
|
|
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
|
|
defer cancelFn()
|
|
return s.FlushCache(ctx)
|
|
}
|
|
|
|
func (s *BlockStore) runFlusher() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("panic in blockstore flusher: %v\n", r)
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
for {
|
|
stats, err := s.runFlushWithNewContext()
|
|
if err != nil || stats.NumDirtyEntries > 0 {
|
|
log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
|
|
}
|
|
if stopFlush.Load() {
|
|
log.Printf("blockstore flusher stopping\n")
|
|
return
|
|
}
|
|
time.Sleep(DefaultFlushTime)
|
|
}
|
|
}
|
|
|
|
// minInt64 returns the smaller of a and b.
func minInt64(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
|