Merge pull request #3 from wavetermdev/sawka/new-blockstore

new blockstore

Commit: 501b05a3e3
@@ -10,7 +10,7 @@ package blockstore

 import (
 	"context"
 	"fmt"
-	"log"
+	"io/fs"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -28,10 +28,8 @@ var partDataSize int64 = DefaultPartDataSize // overridden in tests

 var stopFlush = &atomic.Bool{}

 var GBS *BlockStore = &BlockStore{
 	Lock:  &sync.Mutex{},
 	Cache: make(map[cacheKey]*CacheEntry),
-	NextIntentionId: 1,
-	IsFlushing:      false,
 }

 type FileOptsType struct {
@@ -100,55 +98,33 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string,
 			opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize
 		}
 	}
-	var cacheErr error
-	s.withLock(blockId, name, false, func(entry *CacheEntry) {
-		if entry == nil {
-			return
-		}
-		if !entry.Deleted {
-			cacheErr = fmt.Errorf("file exists")
-			return
-		}
-		// deleted is set. check intentions
-		if entry.PinCount == 0 && len(entry.WriteIntentions) == 0 {
-			delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
-			return
-		}
-		cacheErr = fmt.Errorf("file is deleted but has active requests")
-	})
-	if cacheErr != nil {
-		return cacheErr
-	}
-	now := time.Now().UnixMilli()
-	file := &BlockFile{
-		BlockId:   blockId,
-		Name:      name,
-		Size:      0,
-		CreatedTs: now,
-		ModTs:     now,
-		Opts:      opts,
-		Meta:      meta,
-	}
-	return dbInsertFile(ctx, file)
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		if entry.File != nil {
+			return fs.ErrExist
+		}
+		now := time.Now().UnixMilli()
+		file := &BlockFile{
+			BlockId:   blockId,
+			Name:      name,
+			Size:      0,
+			CreatedTs: now,
+			ModTs:     now,
+			Opts:      opts,
+			Meta:      meta,
+		}
+		return dbInsertFile(ctx, file)
+	})
 }

 func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error {
-	err := dbDeleteFile(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error deleting file: %v", err)
-	}
-	s.withLock(blockId, name, false, func(entry *CacheEntry) {
-		if entry == nil {
-			return
-		}
-		if entry.PinCount > 0 || len(entry.WriteIntentions) > 0 {
-			// mark as deleted if we have active requests
-			entry.Deleted = true
-		} else {
-			delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		err := dbDeleteFile(ctx, blockId, name)
+		if err != nil {
+			return fmt.Errorf("error deleting file: %v", err)
 		}
+		entry.clear()
+		return nil
 	})
-	return nil
 }

 func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
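
The new MakeFile and DeleteFile route everything through a single withLock helper instead of the old pin/write-intention choreography. A minimal, self-contained sketch of that pattern follows; the types and names here are illustrative, not the actual waveterm ones:

// Sketch of the per-entry locking pattern: a store-level mutex guards the
// map, while each entry carries its own mutex plus a pin count so it cannot
// be evicted while a caller holds it.
package main

import (
	"fmt"
	"sync"
)

type entry struct {
	lock sync.Mutex
	pins int
	data string
}

type store struct {
	lock  sync.Mutex
	cache map[string]*entry
}

func (s *store) getAndPin(key string) *entry {
	s.lock.Lock()
	defer s.lock.Unlock()
	e := s.cache[key]
	if e == nil {
		e = &entry{}
		s.cache[key] = e
	}
	e.pins++
	return e
}

func (s *store) unpin(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	if e := s.cache[key]; e != nil {
		e.pins--
		if e.pins <= 0 && e.data == "" {
			delete(s.cache, key) // drop empty, unused entries
		}
	}
}

// withLock pins the entry, locks it, runs fn, then unlocks and unpins.
func (s *store) withLock(key string, fn func(*entry) error) error {
	e := s.getAndPin(key)
	defer s.unpin(key)
	e.lock.Lock()
	defer e.lock.Unlock()
	return fn(e)
}

func main() {
	s := &store{cache: make(map[string]*entry)}
	_ = s.withLock("block1/file1", func(e *entry) error {
		e.data = "hello"
		return nil
	})
	fmt.Println("entries:", len(s.cache))
}

The pin count is what lets withLock release the store lock while the entry lock is held: the entry cannot be evicted out from under the callback.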
@@ -162,23 +138,15 @@ func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
 	return nil
 }

+// if file doesn't exist, returns fs.ErrNotExist
 func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
-	file, ok := s.getFileFromCache(blockId, name)
-	if ok {
-		return file, nil
-	}
-	return dbGetBlockFile(ctx, blockId, name)
-}
-
-func stripNils[T any](arr []*T) []*T {
-	newArr := make([]*T, 0)
-	for _, item := range arr {
-		if item == nil {
-			continue
-		}
-		newArr = append(newArr, item)
-	}
-	return newArr
+	return withLockRtn(s, blockId, name, func(entry *CacheEntry) (*BlockFile, error) {
+		file, err := entry.loadFileForRead(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("error getting file: %v", err)
+		}
+		return file.DeepCopy(), nil
+	})
 }

 func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
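
With Stat now surfacing fs.ErrNotExist for missing files, callers can distinguish "not found" from real failures with errors.Is. One caveat worth noting: errors.Is only sees through wrap chains built with %w, so the %v wrapping in the hunk above would break the chain. A hypothetical caller, with stat standing in for the real method:

// Demo of testing for fs.ErrNotExist through a %w-wrapped error chain.
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

type BlockFile struct{ Size int64 }

func stat(name string) (*BlockFile, error) {
	// wrapping with %w (not %v) is what keeps errors.Is working
	return nil, fmt.Errorf("error getting file: %w", fs.ErrNotExist)
}

func main() {
	if _, err := stat("missing"); errors.Is(err, fs.ErrNotExist) {
		fmt.Println("file does not exist")
	}
}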
@@ -186,94 +154,143 @@ func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFil
 	if err != nil {
 		return nil, fmt.Errorf("error getting block files: %v", err)
 	}
-	// now we wash the files through the cache
-	var hasNils bool
-	for idx, dbFile := range files {
-		cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name)
-		if ok {
-			if cacheFile == nil {
-				hasNils = true
-			}
-			files[idx] = cacheFile
-		}
-	}
-	if hasNils {
-		files = stripNils(files)
+	for idx, file := range files {
+		withLock(s, file.BlockId, file.Name, func(entry *CacheEntry) error {
+			if entry.File != nil {
+				files[idx] = entry.File.DeepCopy()
+			}
+			return nil
+		})
 	}
 	return files, nil
 }

 func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error {
-	s.pinCacheEntry(blockId, name)
-	defer s.unpinCacheEntry(blockId, name)
-	_, err := s.loadFileInfo(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error loading file info: %v", err)
-	}
-	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		entry.modifyFileData(func(file *BlockFile) {
-			if merge {
-				for k, v := range meta {
-					if v == nil {
-						delete(file.Meta, k)
-						continue
-					}
-					file.Meta[k] = v
-				}
-			} else {
-				file.Meta = meta
-			}
-		})
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		err := entry.loadFileIntoCache(ctx)
+		if err != nil {
+			return err
+		}
+		if merge {
+			for k, v := range meta {
+				if v == nil {
+					delete(entry.File.Meta, k)
+					continue
+				}
+				entry.File.Meta[k] = v
+			}
+		} else {
+			entry.File.Meta = meta
+		}
+		entry.File.ModTs = time.Now().UnixMilli()
 		return nil
 	})
 }

-func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) {
-	file, ok := s.getFileFromCache(blockId, name)
-	if ok {
-		if file == nil {
-			return nil, fmt.Errorf("file not found")
-		}
-		return file, nil
-	}
-	dbFile, err := dbGetBlockFile(ctx, blockId, name)
-	if err != nil {
-		return nil, fmt.Errorf("error getting file: %v", err)
-	}
-	if dbFile == nil {
-		return nil, fmt.Errorf("file not found")
-	}
-	var rtnErr error
-	rtnFile := dbFile
-	// cannot use withLockExists because we're setting entry.FileEntry!
-	s.withLock(blockId, name, true, func(entry *CacheEntry) {
-		if entry.Deleted {
-			rtnFile = nil
-			rtnErr = fmt.Errorf("file is deleted")
-			return
-		}
-		if entry.FileEntry != nil {
-			// someone beat us to it
-			rtnFile = entry.FileEntry.File.DeepCopy()
-			return
-		}
-		entry.FileEntry = &FileCacheEntry{
-			Dirty:    &atomic.Bool{},
-			Flushing: &atomic.Bool{},
-			File:     *dbFile.DeepCopy(), // make a copy since File must be immutable
-		}
-		// returns dbFile, nil
+func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error {
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		err := entry.loadFileIntoCache(ctx)
+		if err != nil {
+			return err
+		}
+		entry.writeAt(0, data, true)
+		// since WriteFile can *truncate* the file, we need to flush the file to the DB immediately
+		return entry.flushToDB(ctx, true)
 	})
-	return rtnFile, rtnErr
 }

-func (f *BlockFile) getLastIncompletePartNum() int {
-	if f.Size%partDataSize == 0 {
-		return NoPartIdx
-	}
-	return f.partIdxAtOffset(f.Size)
+func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
+	if offset < 0 {
+		return fmt.Errorf("offset must be non-negative")
+	}
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		err := entry.loadFileIntoCache(ctx)
+		if err != nil {
+			return err
+		}
+		file := entry.File
+		if offset > file.Size {
+			return fmt.Errorf("offset is past the end of the file")
+		}
+		partMap := file.computePartMap(offset, int64(len(data)))
+		incompleteParts := incompletePartsFromMap(partMap)
+		err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
+		if err != nil {
+			return err
+		}
+		entry.writeAt(offset, data, false)
+		return nil
+	})
 }
+
+func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
+	return withLock(s, blockId, name, func(entry *CacheEntry) error {
+		err := entry.loadFileIntoCache(ctx)
+		if err != nil {
+			return err
+		}
+		partMap := entry.File.computePartMap(entry.File.Size, int64(len(data)))
+		incompleteParts := incompletePartsFromMap(partMap)
+		if len(incompleteParts) > 0 {
+			err = entry.loadDataPartsIntoCache(ctx, incompleteParts)
+			if err != nil {
+				return err
+			}
+		}
+		entry.writeAt(entry.File.Size, data, false)
+		return nil
+	})
+}
+
+func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
+	return dbGetAllBlockIds(ctx)
+}
+
+// returns (offset, data, error)
+// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
+func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (rtnOffset int64, rtnData []byte, rtnErr error) {
+	withLock(s, blockId, name, func(entry *CacheEntry) error {
+		rtnOffset, rtnData, rtnErr = entry.readAt(ctx, offset, size, false)
+		return nil
+	})
+	return
+}
+
+// returns (offset, data, error)
+func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (rtnOffset int64, rtnData []byte, rtnErr error) {
+	withLock(s, blockId, name, func(entry *CacheEntry) error {
+		rtnOffset, rtnData, rtnErr = entry.readAt(ctx, 0, 0, true)
+		return nil
+	})
+	return
+}
+
+func (s *BlockStore) FlushCache(ctx context.Context) error {
+	wasFlushing := s.setUnlessFlushing()
+	if wasFlushing {
+		return fmt.Errorf("flush already in progress")
+	}
+	defer s.setIsFlushing(false)
+
+	// get a copy of dirty keys so we can iterate without the lock
+	dirtyCacheKeys := s.getDirtyCacheKeys()
+	for _, key := range dirtyCacheKeys {
+		err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
+			return entry.flushToDB(ctx, false)
+		})
+		if ctx.Err() != nil {
+			// transient error (also must stop the loop)
+			return ctx.Err()
+		}
+		if err != nil {
+			return fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
+		}
+	}
+	return nil
+}
+
+///////////////////////////////////

 func (f *BlockFile) partIdxAtOffset(offset int64) int {
 	partIdx := int(offset / partDataSize)
 	if f.Opts.Circular {
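
WriteAt and AppendData both lean on computePartMap to find which fixed-size parts a write touches, with partial coverage at both ends. The arithmetic is easier to see standalone; this is a simplified re-derivation with a small part size, not the exact waveterm implementation:

// Worked sketch of the part arithmetic: a write at (offset, size) covers a
// contiguous run of part indexes, partially at the first and last part.
package main

import "fmt"

const partDataSize int64 = 64 // the real default is larger; small for the demo

func computePartMap(offset, size int64) map[int]int {
	partMap := make(map[int]int)
	for pos := offset; pos < offset+size; {
		partIdx := int(pos / partDataSize)
		partOffset := pos % partDataSize
		amt := partDataSize - partOffset // bytes of this part we touch
		if rem := offset + size - pos; rem < amt {
			amt = rem
		}
		partMap[partIdx] = int(amt)
		pos += amt
	}
	return partMap
}

func main() {
	// a 100-byte write at offset 40 touches part 0 (24 bytes),
	// part 1 (64 bytes), and part 2 (12 bytes)
	fmt.Println(computePartMap(40, 100)) // map[0:24 1:64 2:12]
}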
@@ -283,83 +300,6 @@ func (f *BlockFile) partIdxAtOffset(offset int64) int {
 	return partIdx
 }

-// blockfile must be loaded
-func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error {
-	var partIdx int
-	err := s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		partIdx = entry.FileEntry.File.getLastIncompletePartNum()
-		return nil
-	})
-	if err != nil {
-		return err
-	}
-	if partIdx == NoPartIdx {
-		return nil
-	}
-	return s.loadDataParts(ctx, blockId, name, []int{partIdx})
-}
-
-func maxOfIntArr(arr []int) int {
-	if len(arr) == 0 {
-		return 0
-	}
-	max := arr[0]
-	for _, v := range arr[1:] {
-		if v > max {
-			max = v
-		}
-	}
-	return max
-}
-
-func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error {
-	partDataMap, err := dbGetFileParts(ctx, blockId, name, parts)
-	if err != nil {
-		return fmt.Errorf("error getting file part: %v", err)
-	}
-	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		for partIdx, partData := range partDataMap {
-			if entry.DataEntries[partIdx] != nil {
-				// someone beat us to it
-				continue
-			}
-			entry.DataEntries[partIdx] = partData
-		}
-		return nil
-	})
-}
-
-func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error {
-	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		entry.writeAt(entry.FileEntry.File.Size, data, false)
-		return nil
-	})
-}
-
-func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
-	s.pinCacheEntry(blockId, name)
-	defer s.unpinCacheEntry(blockId, name)
-	intentionId := s.setWriteIntention(blockId, name, WriteIntention{Append: true})
-	defer s.clearWriteIntention(blockId, name, intentionId)
-	_, err := s.loadFileInfo(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error loading file info: %v", err)
-	}
-	err = s.loadLastDataBlock(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error loading last data block: %v", err)
-	}
-	err = s.appendDataToCache(blockId, name, data)
-	if err != nil {
-		return fmt.Errorf("error appending data: %v", err)
-	}
-	return nil
-}
-
-func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
-	return dbGetAllBlockIds(ctx)
-}
-
 func incompletePartsFromMap(partMap map[int]int) []int {
 	var incompleteParts []int
 	for partIdx, size := range partMap {
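
Among the deletions above is the hand-rolled maxOfIntArr. If it were ever needed again, Go 1.21's slices package covers it; a hedged sketch:

// On Go 1.21+, slices.Max replaces a hand-rolled maxOfIntArr (it panics on
// an empty slice, so guard for that case where the old helper returned 0).
package main

import (
	"fmt"
	"slices"
)

func maxOfIntArr(arr []int) int {
	if len(arr) == 0 {
		return 0
	}
	return slices.Max(arr)
}

func main() {
	fmt.Println(maxOfIntArr([]int{3, 9, 4})) // 9
}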
@@ -370,6 +310,14 @@ func incompletePartsFromMap(partMap map[int]int) []int {
 	return incompleteParts
 }

+func getPartIdxsFromMap(partMap map[int]int) []int {
+	var partIdxs []int
+	for partIdx := range partMap {
+		partIdxs = append(partIdxs, partIdx)
+	}
+	return partIdxs
+}
+
 // returns a map of partIdx to amount of data to write to that part
 func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int {
 	partMap := make(map[int]int)
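
getPartIdxsFromMap, added above, returns the part indexes in Go's randomized map iteration order; callers that need a stable order (tests, sequential DB access) would sort. For example:

// Demo of extracting part indexes from a part map; sorted for determinism.
package main

import (
	"fmt"
	"sort"
)

func getPartIdxsFromMap(partMap map[int]int) []int {
	var partIdxs []int
	for partIdx := range partMap {
		partIdxs = append(partIdxs, partIdx)
	}
	return partIdxs
}

func main() {
	idxs := getPartIdxsFromMap(map[int]int{2: 12, 0: 24, 1: 64})
	sort.Ints(idxs)
	fmt.Println(idxs) // [0 1 2]
}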
@@ -392,189 +340,18 @@ func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int
 	return partMap
 }

-func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string, data []byte) error {
-	s.pinCacheEntry(blockId, name)
-	defer s.unpinCacheEntry(blockId, name)
-	intentionId := s.setWriteIntention(blockId, name, WriteIntention{Replace: true})
-	defer s.clearWriteIntention(blockId, name, intentionId)
-	_, err := s.loadFileInfo(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error loading file info: %v", err)
-	}
-	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		entry.writeAt(0, data, true)
-		return nil
-	})
-}
-
-func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
-	s.pinCacheEntry(blockId, name)
-	defer s.unpinCacheEntry(blockId, name)
-	file, err := s.loadFileInfo(ctx, blockId, name)
-	if err != nil {
-		return fmt.Errorf("error loading file info: %v", err)
-	}
-	if offset < 0 {
-		return fmt.Errorf("offset must be non-negative")
-	}
-	if offset > file.Size {
-		return fmt.Errorf("offset is past the end of the file")
-	}
-	if file.Opts.Circular {
-		startCirFileOffset := file.Size - file.Opts.MaxSize
-		if offset+int64(len(data)) < startCirFileOffset {
-			// write is before the start of the circular file
-			return nil
-		}
-		if offset < startCirFileOffset {
-			amtBeforeStart := startCirFileOffset - offset
-			offset += amtBeforeStart
-			data = data[amtBeforeStart:]
-		}
-	}
-	partMap := file.computePartMap(offset, int64(len(data)))
-	intentionId := s.setWriteIntention(blockId, name, WriteIntention{Parts: partMap})
-	defer s.clearWriteIntention(blockId, name, intentionId)
-	incompleteParts := incompletePartsFromMap(partMap)
-	err = s.loadDataParts(ctx, blockId, name, incompleteParts)
-	if err != nil {
-		return fmt.Errorf("error loading data parts: %v", err)
-	}
-	return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
-		entry.writeAt(offset, data, false)
-		return nil
-	})
-}
-
-// returns (offset, data, error)
-// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
-func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) {
-	file, err := s.Stat(ctx, blockId, name)
-	if err != nil {
-		return 0, nil, fmt.Errorf("error getting file: %v", err)
-	}
-	if file.Opts.Circular {
-		// we can do this check here because MaxSize for file cannot be modified
-		if size > file.Opts.MaxSize {
-			// just read the last maxsize bytes
-			sizeTooBig := size - file.Opts.MaxSize
-			offset += sizeTooBig
-		}
-	}
-	partMap := file.computePartMap(offset, size)
-	var partsNeeded []int
-	for partIdx := range partMap {
-		partsNeeded = append(partsNeeded, partIdx)
-	}
-	dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded)
-	if err != nil {
-		return 0, nil, fmt.Errorf("error loading data parts: %v", err)
-	}
-	// wash the entries through the cache
-	s.withLock(blockId, name, false, func(entry *CacheEntry) {
-		if entry == nil || entry.FileEntry == nil {
-			return
-		}
-		if offset+size > entry.FileEntry.File.Size {
-			// limit read to the actual size of the file
-			size = entry.FileEntry.File.Size - offset
-		}
-		for _, partIdx := range partsNeeded {
-			if len(entry.DataEntries) <= partIdx || entry.DataEntries[partIdx] == nil {
-				continue
-			}
-			dataEntries[partIdx] = entry.DataEntries[partIdx]
-		}
-	})
-	// combine the entries into a single byte slice
-	// note that we only want part of the first and last part depending on offset and size
-	var rtn []byte
-	amtLeftToRead := size
-	curReadOffset := offset
-	for amtLeftToRead > 0 {
-		partIdx := file.partIdxAtOffset(curReadOffset)
-		partDataEntry := dataEntries[partIdx]
-		var partData []byte
-		if partDataEntry == nil {
-			partData = make([]byte, partDataSize)
-		} else {
-			partData = partDataEntry.Data[0:partDataSize]
-		}
-		partOffset := curReadOffset % partDataSize
-		amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
-		rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...)
-		amtLeftToRead -= amtToRead
-		curReadOffset += amtToRead
-	}
-	return offset, rtn, nil
-}
-
-func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (int64, []byte, error) {
-	file, err := s.Stat(ctx, blockId, name)
-	if err != nil {
-		return 0, nil, fmt.Errorf("error getting file: %v", err)
-	}
-	if file == nil {
-		return 0, nil, fmt.Errorf("file not found")
-	}
-	return s.ReadAt(ctx, blockId, name, 0, file.Size)
-}
-
 func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
-	var dirtyCacheKeys []cacheKey
 	s.Lock.Lock()
 	defer s.Lock.Unlock()
+	var dirtyCacheKeys []cacheKey
 	for key, entry := range s.Cache {
-		if entry.FileEntry != nil && entry.FileEntry.Dirty.Load() {
+		if entry.File != nil {
 			dirtyCacheKeys = append(dirtyCacheKeys, key)
 		}
 	}
 	return dirtyCacheKeys
 }

-func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) (rtnErr error) {
-	fileEntry, dataEntries := s.getDirtyDataEntriesForFlush(blockId, name)
-	if fileEntry == nil {
-		return nil
-	}
-	defer func() {
-		// clear flushing flags (always)
-		// clear dirty flags if no error
-		// no lock required, note that we unset dirty before flushing
-		if rtnErr == nil {
-			fileEntry.Dirty.Store(false)
-		}
-		fileEntry.Flushing.Store(false)
-		for _, dataEntry := range dataEntries {
-			if rtnErr == nil {
-				dataEntry.Dirty.Store(false)
-			}
-			dataEntry.Flushing.Store(false)
-		}
-	}()
-	rtnErr = dbWriteCacheEntry(ctx, fileEntry, dataEntries)
-	if rtnErr != nil {
-		rtnErr = fmt.Errorf("error writing cache entry: %v", rtnErr)
-		return rtnErr
-	}
-	return nil
-}
-
-func (s *BlockStore) incrementFlushErrors(blockId string, name string) int {
-	var rtn int
-	s.withLock(blockId, name, false, func(entry *CacheEntry) {
-		entry.FlushErrors++
-		rtn = entry.FlushErrors
-	})
-	return rtn
-}
-
-func (s *BlockStore) deleteCacheEntry(blockId string, name string) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
-}
-
 func (s *BlockStore) setIsFlushing(flushing bool) {
 	s.Lock.Lock()
 	defer s.Lock.Unlock()
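
The surviving getDirtyCacheKeys illustrates a pattern used throughout the flush path: snapshot the keys while holding the store lock, then do the slow per-entry work with the lock released. A toy version:

// Sketch of the snapshot-then-iterate pattern: copy the keys under the
// store lock, release it, then process each key separately.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	lock  sync.Mutex
	cache map[string]bool // key -> dirty
}

func (s *store) dirtyKeys() []string {
	s.lock.Lock()
	defer s.lock.Unlock()
	var keys []string
	for k, dirty := range s.cache {
		if dirty {
			keys = append(keys, k)
		}
	}
	return keys
}

func main() {
	s := &store{cache: map[string]bool{"a": true, "b": false}}
	for _, k := range s.dirtyKeys() { // no lock held during this loop
		fmt.Println("flush", k)
	}
}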
@@ -593,38 +370,6 @@ func (s *BlockStore) setUnlessFlushing() bool {

 }

-func (s *BlockStore) FlushCache(ctx context.Context) error {
-	wasFlushing := s.setUnlessFlushing()
-	if wasFlushing {
-		return fmt.Errorf("flush already in progress")
-	}
-	defer s.setIsFlushing(false)
-
-	// get a copy of dirty keys so we can iterate without the lock
-	dirtyCacheKeys := s.getDirtyCacheKeys()
-	for _, key := range dirtyCacheKeys {
-		err := s.flushFile(ctx, key.BlockId, key.Name)
-		if err != nil {
-			if ctx.Err() != nil {
-				// context error is transient
-				return fmt.Errorf("context error: %v", ctx.Err())
-			}
-			// if error is not transient, we should probably delete the offending entry :/
-			log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err)
-			flushErrorCount.Add(1)
-			totalErrors := s.incrementFlushErrors(key.BlockId, key.Name)
-			if totalErrors >= 3 {
-				s.deleteCacheEntry(key.BlockId, key.Name)
-				log.Printf("too many errors flushing file %s/%s, clear entry", key.BlockId, key.Name)
-			}
-			continue
-		}
-		s.cleanCacheEntry(key.BlockId, key.Name)
-	}
-	return nil
-}
-
 func minInt64(a, b int64) int64 {
 	if a < b {
 		return a
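
The old FlushCache above is deleted in favor of the withLock-based version earlier in the diff. The stopFlush atomic near the top of the file suggests a background loop drives it; that loop isn't shown in this diff, so the following is only a guess at its shape:

// Hedged sketch (not from the diff) of a background loop that could drive
// FlushCache periodically until a stop flag is set.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

var stopFlush atomic.Bool

func flushCache(ctx context.Context) error {
	fmt.Println("flushing...")
	return nil
}

func runFlusher(interval time.Duration) {
	for !stopFlush.Load() {
		ctx, cancel := context.WithTimeout(context.Background(), interval)
		_ = flushCache(ctx) // errors would be logged in a real loop
		cancel()
		time.Sleep(interval)
	}
}

func main() {
	go runFlusher(10 * time.Millisecond)
	time.Sleep(35 * time.Millisecond)
	stopFlush.Store(true)
	time.Sleep(20 * time.Millisecond)
}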
@@ -5,10 +5,10 @@ package blockstore

 import (
 	"bytes"
+	"context"
 	"fmt"
-	"log"
+	"io/fs"
 	"sync"
-	"sync/atomic"
 	"time"
 )

@@ -17,123 +17,133 @@ type cacheKey struct {
 	Name    string
 }

-// note about "Dirty" and "Flushing" fields:
-// - Dirty is set to true when the entry is modified
-// - Flushing is set to true when the entry is being flushed to disk
-// note these fields can *only* be set to true while holding the store lock
-// but the flusher may set them to false without the lock (when the flusher no longer will read the entry fields)
-// the flusher *must* unset Dirty first, then Flushing
-// other code should test Flushing before Dirty
-// that means you *cannot* write a field in a cache entry if Flushing.Load() is true (you must make a copy)
+type BlockStore struct {
+	Lock       *sync.Mutex
+	Cache      map[cacheKey]*CacheEntry
+	IsFlushing bool
+}
+
 type DataCacheEntry struct {
-	Dirty    *atomic.Bool
-	Flushing *atomic.Bool
-	PartIdx  int
-	Data     []byte // capacity is always BlockDataPartSize
+	PartIdx int
+	Data    []byte // capacity is always BlockDataPartSize
 }

-type FileCacheEntry struct {
-	Dirty    *atomic.Bool
-	Flushing *atomic.Bool
-	File     BlockFile
-}
-
-type WriteIntention struct {
-	Parts   map[int]int
-	Append  bool
-	Replace bool
-}
-
-// invariants:
-// - we only modify CacheEntry fields when we are holding the BlockStore lock
-// - FileEntry can be nil, if pinned
-// - FileEntry.File is never updated in place, the entire FileEntry is replaced
-// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced
-// - when pinned, the cache entry is never removed
-// this allows us to flush the cache entry to disk without holding the lock
-// if there is a dirty data entry, then FileEntry must also be dirty
+// if File or DataEntries are not nil then they are dirty (need to be flushed to disk)
 type CacheEntry struct {
-	BlockId         string
-	Name            string
-	PinCount        int
-	Deleted         bool
-	WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention
-	FileEntry       *FileCacheEntry
-	DataEntries     map[int]*DataCacheEntry
-	FlushErrors     int
+	PinCount int // this is synchronized with the BlockStore lock (not the entry lock)
+
+	Lock        *sync.Mutex
+	BlockId     string
+	Name        string
+	File        *BlockFile
+	DataEntries map[int]*DataCacheEntry
+	FlushErrors int
 }

 //lint:ignore U1000 used for testing
 func (e *CacheEntry) dump() string {
 	var buf bytes.Buffer
-	fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.PinCount, e.Deleted, e.WriteIntentions)
-	if e.FileEntry != nil {
-		fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File)
-	}
+	fmt.Fprintf(&buf, "CacheEntry [BlockId: %q, Name: %q] PinCount: %d\n", e.BlockId, e.Name, e.PinCount)
+	fmt.Fprintf(&buf, "  FileEntry: %v\n", e.File)
 	for idx, dce := range e.DataEntries {
-		fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", idx, dce.Dirty.Load(), string(dce.Data))
-	}
-	buf.WriteString("}\n")
-	return buf.String()
-}
-
-//lint:ignore U1000 used for testing
-func (s *BlockStore) dump() string {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	var buf bytes.Buffer
-	buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache)))
-	for _, v := range s.Cache {
-		entryStr := v.dump()
-		buf.WriteString(entryStr)
-		buf.WriteString("\n")
+		fmt.Fprintf(&buf, "  DataEntry[%d]: %q\n", idx, string(dce.Data))
 	}
 	return buf.String()
 }

 func makeDataCacheEntry(partIdx int) *DataCacheEntry {
 	return &DataCacheEntry{
-		Dirty:    &atomic.Bool{},
-		Flushing: &atomic.Bool{},
-		PartIdx:  partIdx,
-		Data:     make([]byte, 0, partDataSize),
+		PartIdx: partIdx,
+		Data:    make([]byte, 0, partDataSize),
 	}
 }

-// for testing
-func (s *BlockStore) getCacheSize() int {
+// will create new entries
+func (s *BlockStore) getEntryAndPin(blockId string, name string) *CacheEntry {
 	s.Lock.Lock()
 	defer s.Lock.Unlock()
-	return len(s.Cache)
+	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
+	if entry == nil {
+		entry = makeCacheEntry(blockId, name)
+		s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
+	}
+	entry.PinCount++
+	return entry
 }

-// for testing
-func (s *BlockStore) clearCache() {
+func (s *BlockStore) unpinEntryAndTryDelete(blockId string, name string) {
 	s.Lock.Lock()
 	defer s.Lock.Unlock()
-	s.Cache = make(map[cacheKey]*CacheEntry)
+	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
+	if entry == nil {
+		return
+	}
+	entry.PinCount--
+	if entry.PinCount <= 0 && entry.File == nil {
+		delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
+	}
 }

-func (e *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
-	if e.DataEntries[partIdx] == nil {
-		e.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
-	}
-	return e.DataEntries[partIdx]
+func (entry *CacheEntry) clear() {
+	entry.File = nil
+	entry.DataEntries = make(map[int]*DataCacheEntry)
+	entry.FlushErrors = 0
 }

-func (dce *DataCacheEntry) clonePart() *DataCacheEntry {
-	rtn := makeDataCacheEntry(dce.PartIdx)
-	copy(rtn.Data, dce.Data)
-	if dce.Dirty.Load() {
-		rtn.Dirty.Store(true)
+func (entry *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
+	if entry.DataEntries[partIdx] == nil {
+		entry.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
 	}
-	return rtn
+	return entry.DataEntries[partIdx]
+}
+
+// returns err if file does not exist
+func (entry *CacheEntry) loadFileIntoCache(ctx context.Context) error {
+	if entry.File != nil {
+		return nil
+	}
+	file, err := entry.loadFileForRead(ctx)
+	if err != nil {
+		return err
+	}
+	entry.File = file
+	return nil
+}
+
+// does not populate the cache entry, returns err if file does not exist
+func (entry *CacheEntry) loadFileForRead(ctx context.Context) (*BlockFile, error) {
+	if entry.File != nil {
+		return entry.File, nil
+	}
+	file, err := dbGetBlockFile(ctx, entry.BlockId, entry.Name)
+	if err != nil {
+		return nil, fmt.Errorf("error getting file: %w", err)
+	}
+	if file == nil {
+		return nil, fs.ErrNotExist
+	}
+	return file, nil
+}
+
+func withLock(s *BlockStore, blockId string, name string, fn func(*CacheEntry) error) error {
+	entry := s.getEntryAndPin(blockId, name)
+	defer s.unpinEntryAndTryDelete(blockId, name)
+	entry.Lock.Lock()
+	defer entry.Lock.Unlock()
+	return fn(entry)
+}
+
+func withLockRtn[T any](s *BlockStore, blockId string, name string, fn func(*CacheEntry) (T, error)) (T, error) {
+	var rtnVal T
+	rtnErr := withLock(s, blockId, name, func(entry *CacheEntry) error {
+		var err error
+		rtnVal, err = fn(entry)
+		return err
+	})
+	return rtnVal, rtnErr
 }

 func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataCacheEntry) {
-	if dce.Flushing.Load() {
-		dce = dce.clonePart()
-	}
 	leftInPart := partDataSize - offset
 	toWrite := int64(len(data))
 	if toWrite > leftInPart {
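
withLock and withLockRtn are the heart of the new design: withLockRtn is just a generic adapter that threads a typed result out of the error-returning callback. A standalone demo with a toy entry type (not the real CacheEntry) shows how T is inferred at the call site:

// Demo of the withLockRtn shape: a generic wrapper that returns a typed
// value from a callback run under a lock.
package main

import (
	"fmt"
	"sync"
)

type entry struct {
	lock sync.Mutex
	size int64
}

func withLock(e *entry, fn func(*entry) error) error {
	e.lock.Lock()
	defer e.lock.Unlock()
	return fn(e)
}

func withLockRtn[T any](e *entry, fn func(*entry) (T, error)) (T, error) {
	var rtnVal T
	rtnErr := withLock(e, func(e *entry) error {
		var err error
		rtnVal, err = fn(e)
		return err
	})
	return rtnVal, rtnErr
}

func main() {
	e := &entry{size: 42}
	size, err := withLockRtn(e, func(e *entry) (int64, error) {
		return e.size, nil // T inferred as int64
	})
	fmt.Println(size, err) // 42 <nil>
}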
@@ -143,19 +153,40 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC
 		dce.Data = dce.Data[:offset+toWrite]
 	}
 	copy(dce.Data[offset:], data[:toWrite])
-	dce.Dirty.Store(true)
 	return toWrite, dce
 }

 func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
+	if replace {
+		entry.File.Size = 0
+	}
+	if entry.File.Opts.Circular {
+		startCirFileOffset := entry.File.Size - entry.File.Opts.MaxSize
+		if offset+int64(len(data)) <= startCirFileOffset {
+			// write is before the start of the circular file
+			return
+		}
+		if offset < startCirFileOffset {
+			// truncate data (from the front), update offset
+			truncateAmt := startCirFileOffset - offset
+			data = data[truncateAmt:]
+			offset += truncateAmt
+		}
+		if int64(len(data)) > entry.File.Opts.MaxSize {
+			// truncate data (from the front), update offset
+			truncateAmt := int64(len(data)) - entry.File.Opts.MaxSize
+			data = data[truncateAmt:]
+			offset += truncateAmt
+		}
+	}
 	endWriteOffset := offset + int64(len(data))
 	if replace {
 		entry.DataEntries = make(map[int]*DataCacheEntry)
 	}
 	for len(data) > 0 {
 		partIdx := int(offset / partDataSize)
-		if entry.FileEntry.File.Opts.Circular {
-			maxPart := int(entry.FileEntry.File.Opts.MaxSize / partDataSize)
+		if entry.File.Opts.Circular {
+			maxPart := int(entry.File.Opts.MaxSize / partDataSize)
 			partIdx = partIdx % maxPart
 		}
 		partOffset := offset % partDataSize
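
The circular-file branch added to writeAt clamps every write into the last MaxSize bytes of the logical file. Worked numbers make the two front-truncation steps clearer; this helper is a re-derivation for illustration, not the waveterm code:

// Worked example of the circular-file clamping: anything before
// fileSize-maxSize is unreachable, so data is truncated from the front and
// the offset advanced to match.
package main

import "fmt"

func clampCircular(offset int64, data []byte, fileSize, maxSize int64) (int64, []byte) {
	start := fileSize - maxSize // first offset still backed by storage
	if offset+int64(len(data)) <= start {
		return offset, nil // write is entirely before the window
	}
	if offset < start {
		trunc := start - offset
		data = data[trunc:]
		offset += trunc
	}
	if int64(len(data)) > maxSize {
		trunc := int64(len(data)) - maxSize
		data = data[trunc:]
		offset += trunc
	}
	return offset, data
}

func main() {
	// file size 200, window 128: offsets below 72 are gone
	off, data := clampCircular(60, make([]byte, 40), 200, 128)
	fmt.Println(off, len(data)) // 72 28
}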
@@ -165,211 +196,152 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
 		data = data[nw:]
 		offset += nw
 	}
-	entry.modifyFileData(func(file *BlockFile) {
-		if endWriteOffset > file.Size || replace {
-			file.Size = endWriteOffset
-		}
-	})
+	if endWriteOffset > entry.File.Size || replace {
+		entry.File.Size = endWriteOffset
+	}
+	entry.File.ModTs = time.Now().UnixMilli()
 }

-type BlockStore struct {
-	Lock            *sync.Mutex
-	Cache           map[cacheKey]*CacheEntry
-	NextIntentionId int
-	IsFlushing      bool
+// returns (realOffset, data, error)
+func (entry *CacheEntry) readAt(ctx context.Context, offset int64, size int64, readFull bool) (int64, []byte, error) {
+	if offset < 0 {
+		return 0, nil, fmt.Errorf("offset cannot be negative")
+	}
+	file, err := entry.loadFileForRead(ctx)
+	if err != nil {
+		return 0, nil, err
+	}
+	if readFull {
+		size = file.Size - offset
+	}
+	if offset+size > file.Size {
+		size = file.Size - offset
+	}
+	if file.Opts.Circular {
+		realDataOffset := int64(0)
+		if file.Size > file.Opts.MaxSize {
+			realDataOffset = file.Size - file.Opts.MaxSize
+		}
+		if offset < realDataOffset {
+			truncateAmt := realDataOffset - offset
+			offset += truncateAmt
+			size -= truncateAmt
+		}
+	}
+	partMap := file.computePartMap(offset, size)
+	dataEntryMap, err := entry.loadDataPartsForRead(ctx, getPartIdxsFromMap(partMap))
+	if err != nil {
+		return 0, nil, err
+	}
+	// combine the entries into a single byte slice
+	// note that we only want part of the first and last part depending on offset and size
+	rtnData := make([]byte, 0, size)
+	amtLeftToRead := size
+	curReadOffset := offset
+	for amtLeftToRead > 0 {
+		partIdx := file.partIdxAtOffset(curReadOffset)
+		partDataEntry := dataEntryMap[partIdx]
+		var partData []byte
+		if partDataEntry == nil {
+			partData = make([]byte, partDataSize)
+		} else {
+			partData = partDataEntry.Data[0:partDataSize]
+		}
+		partOffset := curReadOffset % partDataSize
+		amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead)
+		rtnData = append(rtnData, partData[partOffset:partOffset+amtToRead]...)
+		amtLeftToRead -= amtToRead
+		curReadOffset += amtToRead
+	}
+	return offset, rtnData, nil
+}
+
+func prunePartsWithCache(dataEntries map[int]*DataCacheEntry, parts []int) []int {
+	var rtn []int
+	for _, partIdx := range parts {
+		if dataEntries[partIdx] != nil {
+			continue
+		}
+		rtn = append(rtn, partIdx)
+	}
+	return rtn
+}
+
+func (entry *CacheEntry) loadDataPartsIntoCache(ctx context.Context, parts []int) error {
+	parts = prunePartsWithCache(entry.DataEntries, parts)
+	if len(parts) == 0 {
+		// parts are already loaded
+		return nil
+	}
+	dbDataParts, err := dbGetFileParts(ctx, entry.BlockId, entry.Name, parts)
+	if err != nil {
+		return fmt.Errorf("error getting data parts: %w", err)
+	}
+	for partIdx, dce := range dbDataParts {
+		entry.DataEntries[partIdx] = dce
+	}
+	return nil
+}
+
+func (entry *CacheEntry) loadDataPartsForRead(ctx context.Context, parts []int) (map[int]*DataCacheEntry, error) {
+	if len(parts) == 0 {
+		return nil, nil
+	}
+	dbParts := prunePartsWithCache(entry.DataEntries, parts)
+	var dbDataParts map[int]*DataCacheEntry
+	if len(dbParts) > 0 {
+		var err error
+		dbDataParts, err = dbGetFileParts(ctx, entry.BlockId, entry.Name, dbParts)
+		if err != nil {
+			return nil, fmt.Errorf("error getting data parts: %w", err)
+		}
+	}
+	rtn := make(map[int]*DataCacheEntry)
+	for _, partIdx := range parts {
+		if entry.DataEntries[partIdx] != nil {
+			rtn[partIdx] = entry.DataEntries[partIdx]
+			continue
+		}
+		if dbDataParts[partIdx] != nil {
+			rtn[partIdx] = dbDataParts[partIdx]
+			continue
+		}
+		// part not found
+	}
+	return rtn, nil
 }

 func makeCacheEntry(blockId string, name string) *CacheEntry {
 	return &CacheEntry{
-		BlockId:         blockId,
-		Name:            name,
-		PinCount:        0,
-		WriteIntentions: make(map[int]WriteIntention),
-		FileEntry:       nil,
-		DataEntries:     make(map[int]*DataCacheEntry),
-		FlushErrors:     0,
+		Lock:        &sync.Mutex{},
+		BlockId:     blockId,
+		Name:        name,
+		PinCount:    0,
+		File:        nil,
+		DataEntries: make(map[int]*DataCacheEntry),
+		FlushErrors: 0,
 	}
 }

-func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		if shouldCreate {
-			entry = makeCacheEntry(blockId, name)
-			s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
-		}
-	}
-	f(entry)
-}
-
-func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil || entry.Deleted || entry.FileEntry == nil {
-		return fmt.Errorf("file not found")
-	}
-	return f(entry)
-}
-
-func (s *BlockStore) pinCacheEntry(blockId string, name string) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		entry = makeCacheEntry(blockId, name)
-		s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
-	}
-	entry.PinCount++
-}
-
-func (s *BlockStore) setWriteIntention(blockId string, name string, intention WriteIntention) int {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		return 0
-	}
-	intentionId := s.NextIntentionId
-	s.NextIntentionId++
-	entry.WriteIntentions[intentionId] = intention
-	return intentionId
-}
-
-func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionId int) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		warningCount.Add(1)
-		log.Printf("warning: cannot find write intention to clear %q %q", blockId, name)
-		return
-	}
-	delete(entry.WriteIntentions, intentionId)
-}
-
-func (s *BlockStore) unpinCacheEntry(blockId string, name string) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		warningCount.Add(1)
-		log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name)
-		return
-	}
-	entry.PinCount--
-}
-
-// getFileFromCache returns the file from the cache if it exists
-// makes a copy, so it can be used by the caller
-// return (file, cached)
-func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		return nil, false
-	}
-	if entry.Deleted {
-		return nil, true
-	}
-	if entry.FileEntry == nil {
-		return nil, false
-	}
-	return entry.FileEntry.File.DeepCopy(), true
-}
-
-func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) {
-	var fileEntry = e.FileEntry
-	if e.FileEntry.Flushing.Load() {
-		// must make a copy
-		fileEntry = &FileCacheEntry{
-			Dirty:    &atomic.Bool{},
-			Flushing: &atomic.Bool{},
-			File:     *e.FileEntry.File.DeepCopy(),
-		}
-		e.FileEntry = fileEntry
-	}
-	// always set to dirty (we're modifying it)
-	fileEntry.Dirty.Store(true)
-	fileEntry.File.ModTs = time.Now().UnixMilli()
-	fn(&fileEntry.File)
-}
-
-// also sets Flushing to true on fileentry / dataentries
-func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*FileCacheEntry, []*DataCacheEntry) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		return nil, nil
-	}
-	if entry.Deleted || entry.FileEntry == nil {
-		return nil, nil
-	}
-	var dirtyData []*DataCacheEntry
-	for _, dce := range entry.DataEntries {
-		if dce != nil && dce.Dirty.Load() {
-			dce.Flushing.Store(true)
-			dirtyData = append(dirtyData, dce)
-		}
-	}
-	if !entry.FileEntry.Dirty.Load() && len(dirtyData) == 0 {
-		return nil, nil
-	}
-	for _, data := range dirtyData {
-		data.Flushing.Store(true)
-	}
-	entry.FileEntry.Flushing.Store(true)
-	return entry.FileEntry, dirtyData
-}
-
-func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool {
-	if entry.FileEntry == nil {
-		warningCount.Add(1)
-		log.Printf("warning: checking pinned, but no FileEntry %q %q", entry.BlockId, entry.Name)
-		return false
-	}
-	lastIncomplete := entry.FileEntry.File.getLastIncompletePartNum()
-	for _, intention := range entry.WriteIntentions {
-		if intention.Append && partIdx == lastIncomplete {
-			return true
-		}
-		if intention.Parts[partIdx] > 0 {
-			return true
-		}
-		// note "replace" does not pin anything
-	}
-	return false
-}
-
-// removes clean data entries (if they aren't pinned)
-// and if the entire cache entry is not in use and is clean, removes the entry
-func (s *BlockStore) cleanCacheEntry(blockId string, name string) {
-	s.Lock.Lock()
-	defer s.Lock.Unlock()
-	entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
-	if entry == nil {
-		return
-	}
-	var hasDirtyData bool
-	for _, dce := range entry.DataEntries {
-		if dce.Flushing.Load() || dce.Dirty.Load() || entry.isDataBlockPinned(dce.PartIdx) {
-			hasDirtyData = true
-			continue
-		}
-		delete(entry.DataEntries, dce.PartIdx)
-	}
-	if hasDirtyData || (entry.FileEntry != nil && entry.FileEntry.Dirty.Load()) {
-		return
-	}
-	if entry.PinCount > 0 {
-		return
-	}
-	if len(entry.WriteIntentions) > 0 {
-		return
-	}
-	delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
+func (entry *CacheEntry) flushToDB(ctx context.Context, replace bool) error {
+	if entry.File == nil {
+		return nil
+	}
+	err := dbWriteCacheEntry(ctx, entry.File, entry.DataEntries, replace)
+	if ctx.Err() != nil {
+		// transient error
+		return ctx.Err()
+	}
+	if err != nil {
+		flushErrorCount.Add(1)
+		entry.FlushErrors++
+		if entry.FlushErrors > 3 {
+			entry.clear()
+			return fmt.Errorf("too many flush errors (clearing entry): %w", err)
+		}
+		return err
+	}
+	// clear cache entry (data is now in db)
+	entry.clear()
+	return nil
 }
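
The new readAt reassembles a read part by part, substituting zeros for parts that were never written (the make([]byte, partDataSize) branch). A self-contained sketch of just the reassembly loop:

// Sketch of the readAt reassembly loop: walk the requested range part by
// part, copying the covered slice of each part; absent parts read as zeros.
package main

import "fmt"

const partDataSize int64 = 8

func minInt64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func read(parts map[int][]byte, offset, size int64) []byte {
	rtn := make([]byte, 0, size)
	left := size
	cur := offset
	for left > 0 {
		partIdx := int(cur / partDataSize)
		part := parts[partIdx]
		if part == nil {
			part = make([]byte, partDataSize) // missing part reads as zeros
		}
		partOff := cur % partDataSize
		amt := minInt64(partDataSize-partOff, left)
		rtn = append(rtn, part[partOff:partOff+amt]...)
		left -= amt
		cur += amt
	}
	return rtn
}

func main() {
	parts := map[int][]byte{
		0: []byte("abcdefgh"),
		1: []byte("ijklmnop"),
	}
	fmt.Printf("%q\n", read(parts, 4, 8)) // "efghijkl"
}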
@@ -3,7 +3,7 @@ package blockstore
 import (
 	"context"
 	"fmt"
-	"sync/atomic"
+	"os"

 	"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
 )
@@ -54,14 +54,15 @@ func dbGetAllBlockIds(ctx context.Context) ([]string, error) {
 }

 func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
+	if len(parts) == 0 {
+		return nil, nil
+	}
 	return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
 		var data []*DataCacheEntry
 		query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
 		tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts))
 		rtn := make(map[int]*DataCacheEntry)
 		for _, d := range data {
-			d.Dirty = &atomic.Bool{}
-			d.Flushing = &atomic.Bool{}
 			if cap(d.Data) != int(partDataSize) {
 				newData := make([]byte, len(d.Data), partDataSize)
 				copy(newData, d.Data)
@@ -81,26 +82,26 @@ func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error)
 	})
 }

-func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntries []*DataCacheEntry) error {
-	if fileEntry == nil {
-		return fmt.Errorf("fileEntry or fileEntry.File is nil")
-	}
+func dbWriteCacheEntry(ctx context.Context, file *BlockFile, dataEntries map[int]*DataCacheEntry, replace bool) error {
 	return WithTx(ctx, func(tx *TxWrap) error {
 		query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
-		if !tx.Exists(query, fileEntry.File.BlockId, fileEntry.File.Name) {
+		if !tx.Exists(query, file.BlockId, file.Name) {
 			// since deletion is synchronous this stops us from writing to a deleted file
-			return fmt.Errorf("file not found in db")
+			return os.ErrNotExist
 		}
-		if fileEntry.Dirty.Load() {
-			query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?`
-			tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, dbutil.QuickJson(fileEntry.File.Opts), dbutil.QuickJson(fileEntry.File.Meta), fileEntry.File.BlockId, fileEntry.File.Name)
+		// we don't update CreatedTs or Opts
+		query = `UPDATE db_block_file SET size = ?, modts = ?, meta = ? WHERE blockid = ? AND name = ?`
+		tx.Exec(query, file.Size, file.ModTs, dbutil.QuickJson(file.Meta), file.BlockId, file.Name)
+		if replace {
+			query = `DELETE FROM db_block_data WHERE blockid = ? AND name = ?`
+			tx.Exec(query, file.BlockId, file.Name)
 		}
 		dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
-		for _, dataEntry := range dataEntries {
-			if dataEntry == nil || !dataEntry.Dirty.Load() {
-				continue
+		for partIdx, dataEntry := range dataEntries {
+			if partIdx != dataEntry.PartIdx {
+				panic(fmt.Sprintf("partIdx:%d and dataEntry.PartIdx:%d do not match", partIdx, dataEntry.PartIdx))
 			}
-			tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data)
+			tx.Exec(dataPartQuery, file.BlockId, file.Name, dataEntry.PartIdx, dataEntry.Data)
 		}
 		return nil
 	})
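
dbWriteCacheEntry now takes a replace flag: on truncating writes it deletes all existing part rows before upserting the survivors, otherwise a shrunk file could keep stale tail parts. The invariant, modeled with a map standing in for the db_block_data table:

// Illustration of the replace-mode invariant: when a file was truncated
// (replace == true), stale parts must be deleted before surviving parts are
// upserted, or an old tail part could outlive the truncation.
package main

import "fmt"

func writeParts(table map[int][]byte, parts map[int][]byte, replace bool) {
	if replace {
		for k := range table {
			delete(table, k) // DELETE FROM db_block_data WHERE ...
		}
	}
	for idx, data := range parts {
		table[idx] = data // REPLACE INTO db_block_data ...
	}
}

func main() {
	table := map[int][]byte{0: []byte("old0"), 1: []byte("old1"), 2: []byte("old2")}
	writeParts(table, map[int][]byte{0: []byte("new0")}, true)
	fmt.Println(len(table)) // 1: parts 1 and 2 did not survive the truncate
}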
@@ -6,7 +6,9 @@ package blockstore
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
+	"io/fs"
 	"log"
 	"sync"
 	"sync/atomic"
@@ -45,6 +47,32 @@ func cleanupDb(t *testing.T) {
 	}
 }

+func (s *BlockStore) getCacheSize() int {
+	s.Lock.Lock()
+	defer s.Lock.Unlock()
+	return len(s.Cache)
+}
+
+func (s *BlockStore) clearCache() {
+	s.Lock.Lock()
+	defer s.Lock.Unlock()
+	s.Cache = make(map[cacheKey]*CacheEntry)
+}
+
+//lint:ignore U1000 used for testing
+func (s *BlockStore) dump() string {
+	s.Lock.Lock()
+	defer s.Lock.Unlock()
+	var buf bytes.Buffer
+	buf.WriteString(fmt.Sprintf("BlockStore %d entries\n", len(s.Cache)))
+	for _, v := range s.Cache {
+		entryStr := v.dump()
+		buf.WriteString(entryStr)
+		buf.WriteString("\n")
+	}
+	return buf.String()
+}
+
 func TestCreate(t *testing.T) {
 	initDb(t)
 	defer cleanupDb(t)
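
getCacheSize and clearCache move into the test file as plain test helpers. A sketch of the kind of cache-hygiene assertion they enable (toy types, not the real BlockStore):

// Sketch of a cache-hygiene check in the style of these helpers: after an
// operation completes, the cache size should be back to a known state.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	lock  sync.Mutex
	cache map[string]int
}

func (s *store) getCacheSize() int {
	s.lock.Lock()
	defer s.lock.Unlock()
	return len(s.cache)
}

func (s *store) clearCache() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.cache = make(map[string]int)
}

func main() {
	s := &store{cache: map[string]int{"b1/f1": 0}}
	if n := s.getCacheSize(); n != 1 {
		fmt.Println("unexpected cache size:", n)
	}
	s.clearCache()
	fmt.Println("size after clear:", s.getCacheSize()) // 0
}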
@@ -134,12 +162,9 @@ func TestDelete(t *testing.T) {
 	if err != nil {
 		t.Fatalf("error deleting file: %v", err)
 	}
-	file, err := GBS.Stat(ctx, blockId, "testfile")
-	if err != nil {
-		t.Fatalf("error stating file: %v", err)
-	}
-	if file != nil {
-		t.Fatalf("file should not be found")
+	_, err = GBS.Stat(ctx, blockId, "testfile")
+	if err == nil || !errors.Is(err, fs.ErrNotExist) {
+		t.Errorf("expected file not found error")
 	}
 }

 // create two files in same block, use DeleteBlock to delete
@@ -376,7 +401,6 @@ func TestCircularWrites(t *testing.T) {
 		t.Fatalf("error writing data: %v", err)
 	}
 	checkFileData(t, ctx, blockId, "c1", "123456789 123456789 123456789 123456789 123456789 ")
-
 	err = GBS.AppendData(ctx, blockId, "c1", []byte("apple"))
 	if err != nil {
 		t.Fatalf("error appending data: %v", err)
@@ -426,14 +450,14 @@ func TestCircularWrites(t *testing.T) {
 	}
 	checkFileSize(t, ctx, blockId, "c1", 128)
 	checkFileData(t, ctx, blockId, "c1", " 123456789 123456789 123456789 bar456789 123456789")
-	GBS.withLock(blockId, "c1", false, func(entry *CacheEntry) {
+	err = withLock(GBS, blockId, "c1", func(entry *CacheEntry) error {
 		if entry == nil {
-			err = fmt.Errorf("entry not found")
-			return
+			return fmt.Errorf("entry not found")
 		}
 		if len(entry.DataEntries) != 1 {
-			err = fmt.Errorf("data entries mismatch: expected 1, got %d", len(entry.DataEntries))
+			return fmt.Errorf("data entries mismatch: expected 1, got %d", len(entry.DataEntries))
 		}
+		return nil
 	})
 	if err != nil {
 		t.Fatalf("error checking data entries: %v", err)