mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-04 18:59:08 +01:00
reimplement blockstore, needs testing
This commit is contained in:
parent
da03fbe8f2
commit
4da5a4f610
1
go.mod
1
go.mod
@ -10,6 +10,7 @@ require (
|
|||||||
github.com/mattn/go-sqlite3 v1.14.22
|
github.com/mattn/go-sqlite3 v1.14.22
|
||||||
github.com/sawka/txwrap v0.1.2
|
github.com/sawka/txwrap v0.1.2
|
||||||
github.com/wailsapp/wails/v3 v3.0.0-alpha.0
|
github.com/wailsapp/wails/v3 v3.0.0-alpha.0
|
||||||
|
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
2
go.sum
2
go.sum
@ -123,6 +123,8 @@ github.com/wailsapp/go-webview2 v1.0.9 h1:lrU+q0cf1wgLdR69rN+ZnRtMJNaJRrcQ4ELxoO
|
|||||||
github.com/wailsapp/go-webview2 v1.0.9/go.mod h1:Uk2BePfCRzttBBjFrBmqKGJd41P6QIHeV9kTgIeOZNo=
|
github.com/wailsapp/go-webview2 v1.0.9/go.mod h1:Uk2BePfCRzttBBjFrBmqKGJd41P6QIHeV9kTgIeOZNo=
|
||||||
github.com/wailsapp/mimetype v1.4.1 h1:pQN9ycO7uo4vsUUuPeHEYoUkLVkaRntMnHJxVwYhwHs=
|
github.com/wailsapp/mimetype v1.4.1 h1:pQN9ycO7uo4vsUUuPeHEYoUkLVkaRntMnHJxVwYhwHs=
|
||||||
github.com/wailsapp/mimetype v1.4.1/go.mod h1:9aV5k31bBOv5z6u+QP8TltzvNGJPmNJD4XlAL3U+j3o=
|
github.com/wailsapp/mimetype v1.4.1/go.mod h1:9aV5k31bBOv5z6u+QP8TltzvNGJPmNJD4XlAL3U+j3o=
|
||||||
|
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94 h1:/SPCxd4KHlS4eRTreYEXWFRr8WfRFBcChlV5cgkaO58=
|
||||||
|
github.com/wavetermdev/waveterm/wavesrv v0.0.0-20240508181017-d07068c09d94/go.mod h1:ywoo7DXdYueQ0tTPhVoB+wzRTgERSE19EA3mR6KGRaI=
|
||||||
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
|
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
|
||||||
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
|
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
|
||||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||||
|
@ -3,42 +3,34 @@
|
|||||||
|
|
||||||
package blockstore
|
package blockstore
|
||||||
|
|
||||||
|
// the blockstore package implements a write cache for block files
|
||||||
|
// it is not a read cache (reads still go to the DB -- unless items are in the cache)
|
||||||
|
// but all writes only go to the cache, and then the cache is periodically flushed to the DB
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql/driver"
|
"context"
|
||||||
"encoding/json"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const PartDataSize = 64 * 1024
|
||||||
|
const DefaultFlushTime = 5 * time.Second
|
||||||
|
const NoPartIdx = -1
|
||||||
|
|
||||||
|
var GlobalBlockStore *BlockStore = &BlockStore{
|
||||||
|
Lock: &sync.Mutex{},
|
||||||
|
Cache: make(map[cacheKey]*CacheEntry),
|
||||||
|
FlushTime: DefaultFlushTime,
|
||||||
|
}
|
||||||
|
|
||||||
type FileOptsType struct {
|
type FileOptsType struct {
|
||||||
MaxSize int64
|
MaxSize int64
|
||||||
Circular bool
|
Circular bool
|
||||||
IJson bool
|
IJson bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FileOptsType) Scan(value interface{}) error {
|
type FileMeta = map[string]any
|
||||||
return json.Unmarshal(value.([]byte), f)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f FileOptsType) Value() (driver.Value, error) {
|
|
||||||
barr, err := json.Marshal(f)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return string(barr), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type FileMeta map[string]any
|
|
||||||
|
|
||||||
func (m *FileMeta) Scan(value interface{}) error {
|
|
||||||
return json.Unmarshal(value.([]byte), m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m FileMeta) Value() (driver.Value, error) {
|
|
||||||
barr, err := json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return string(barr), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type BlockFile struct {
|
type BlockFile struct {
|
||||||
BlockId string `json:"blockid"`
|
BlockId string `json:"blockid"`
|
||||||
@ -49,3 +41,379 @@ type BlockFile struct {
|
|||||||
Opts FileOptsType `json:"opts"`
|
Opts FileOptsType `json:"opts"`
|
||||||
Meta FileMeta `json:"meta"`
|
Meta FileMeta `json:"meta"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func copyMeta(meta FileMeta) FileMeta {
|
||||||
|
newMeta := make(FileMeta)
|
||||||
|
for k, v := range meta {
|
||||||
|
newMeta[k] = v
|
||||||
|
}
|
||||||
|
return newMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *BlockFile) DeepCopy() *BlockFile {
|
||||||
|
if f == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
newFile := *f
|
||||||
|
newFile.Meta = copyMeta(f.Meta)
|
||||||
|
return &newFile
|
||||||
|
}
|
||||||
|
|
||||||
|
func (BlockFile) UseDBMap() {}
|
||||||
|
|
||||||
|
type BlockData struct {
|
||||||
|
BlockId string `json:"blockid"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
PartIdx int `json:"partidx"`
|
||||||
|
Data []byte `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (BlockData) UseDBMap() {}
|
||||||
|
|
||||||
|
// synchronous (does not interact with the cache)
|
||||||
|
func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error {
|
||||||
|
if opts.MaxSize < 0 {
|
||||||
|
return fmt.Errorf("max size must be non-negative")
|
||||||
|
}
|
||||||
|
if opts.Circular && opts.MaxSize <= 0 {
|
||||||
|
return fmt.Errorf("circular file must have a max size")
|
||||||
|
}
|
||||||
|
if opts.Circular && opts.IJson {
|
||||||
|
return fmt.Errorf("circular file cannot be ijson")
|
||||||
|
}
|
||||||
|
now := time.Now().UnixMilli()
|
||||||
|
file := &BlockFile{
|
||||||
|
BlockId: blockId,
|
||||||
|
Name: name,
|
||||||
|
Size: 0,
|
||||||
|
CreatedTs: now,
|
||||||
|
ModTs: now,
|
||||||
|
Opts: opts,
|
||||||
|
Meta: meta,
|
||||||
|
}
|
||||||
|
return dbInsertFile(ctx, file)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error {
|
||||||
|
err := dbDeleteFile(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error deleting file: %v", err)
|
||||||
|
}
|
||||||
|
s.withLock(blockId, name, false, func(entry *CacheEntry) {
|
||||||
|
if entry == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.Deleted = true
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error {
|
||||||
|
fileNames, err := dbGetBlockFileNames(ctx, blockId)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting block files: %v", err)
|
||||||
|
}
|
||||||
|
for _, name := range fileNames {
|
||||||
|
s.DeleteFile(ctx, blockId, name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) {
|
||||||
|
file, ok := s.getFileFromCache(blockId, name)
|
||||||
|
if ok {
|
||||||
|
return file, nil
|
||||||
|
}
|
||||||
|
return dbGetFile(ctx, blockId, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func stripNils[T any](arr []*T) []*T {
|
||||||
|
newArr := make([]*T, 0)
|
||||||
|
for _, item := range arr {
|
||||||
|
if item == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newArr = append(newArr, item)
|
||||||
|
}
|
||||||
|
return newArr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
|
||||||
|
files, err := dbGetBlockFiles(ctx, blockId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting block files: %v", err)
|
||||||
|
}
|
||||||
|
// now we wash the files through the cache
|
||||||
|
var hasNils bool
|
||||||
|
for idx, dbFile := range files {
|
||||||
|
cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name)
|
||||||
|
if ok {
|
||||||
|
if cacheFile == nil {
|
||||||
|
hasNils = true
|
||||||
|
}
|
||||||
|
files[idx] = cacheFile
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if hasNils {
|
||||||
|
files = stripNils(files)
|
||||||
|
}
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error {
|
||||||
|
file, ok := s.getFileFromCache(blockId, name)
|
||||||
|
if !ok {
|
||||||
|
dbFile, err := dbGetFile(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting file: %v", err)
|
||||||
|
}
|
||||||
|
file = dbFile
|
||||||
|
}
|
||||||
|
if file == nil {
|
||||||
|
return fmt.Errorf("file not found")
|
||||||
|
}
|
||||||
|
var rtnErr error
|
||||||
|
s.withLock(blockId, name, true, func(entry *CacheEntry) {
|
||||||
|
if entry.Deleted {
|
||||||
|
rtnErr = fmt.Errorf("file is deleted")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newFileEntry := entry.copyOrCreateFileEntry(file)
|
||||||
|
newFileEntry.File.Meta = meta
|
||||||
|
entry.FileEntry = newFileEntry
|
||||||
|
entry.FileEntry.File.ModTs = time.Now().UnixMilli()
|
||||||
|
entry.Version++
|
||||||
|
})
|
||||||
|
return rtnErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) {
|
||||||
|
file, ok := s.getFileFromCache(blockId, name)
|
||||||
|
if ok {
|
||||||
|
if file == nil {
|
||||||
|
return nil, fmt.Errorf("file not found")
|
||||||
|
}
|
||||||
|
return file, nil
|
||||||
|
}
|
||||||
|
dbFile, err := dbGetFile(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting file: %v", err)
|
||||||
|
}
|
||||||
|
if dbFile == nil {
|
||||||
|
return nil, fmt.Errorf("file not found")
|
||||||
|
}
|
||||||
|
var rtnErr error
|
||||||
|
rtnFile := dbFile
|
||||||
|
s.withLock(blockId, name, true, func(entry *CacheEntry) {
|
||||||
|
if entry.Deleted {
|
||||||
|
rtnFile = nil
|
||||||
|
rtnErr = fmt.Errorf("file is deleted")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if entry.FileEntry != nil {
|
||||||
|
// someone beat us to it
|
||||||
|
rtnFile = entry.FileEntry.File.DeepCopy()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.FileEntry = entry.copyOrCreateFileEntry(dbFile)
|
||||||
|
// returns dbFile, nil
|
||||||
|
})
|
||||||
|
return rtnFile, rtnErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *BlockFile) getLastIncompletePartNum() int {
|
||||||
|
if f.Size%PartDataSize == 0 {
|
||||||
|
return NoPartIdx
|
||||||
|
}
|
||||||
|
return f.partIdxAtOffset(f.Size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *BlockFile) partIdxAtOffset(offset int64) int {
|
||||||
|
partIdx := int(offset / PartDataSize)
|
||||||
|
if f.Opts.Circular {
|
||||||
|
maxPart := int(f.Opts.MaxSize / PartDataSize)
|
||||||
|
partIdx = partIdx % maxPart
|
||||||
|
}
|
||||||
|
return partIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
// blockfile must be loaded
|
||||||
|
func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error {
|
||||||
|
var partIdx int
|
||||||
|
err := s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||||
|
partIdx = entry.FileEntry.File.getLastIncompletePartNum()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if partIdx == NoPartIdx {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return s.loadDataParts(ctx, blockId, name, []int{partIdx})
|
||||||
|
}
|
||||||
|
|
||||||
|
func maxOfIntArr(arr []int) int {
|
||||||
|
if len(arr) == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
max := arr[0]
|
||||||
|
for _, v := range arr[1:] {
|
||||||
|
if v > max {
|
||||||
|
max = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error {
|
||||||
|
partDataMap, err := dbGetFileParts(ctx, blockId, name, parts)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error getting file part: %v", err)
|
||||||
|
}
|
||||||
|
maxPart := maxOfIntArr(parts)
|
||||||
|
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||||
|
entry.ensurePart(maxPart, false)
|
||||||
|
for partIdx, partData := range partDataMap {
|
||||||
|
if entry.DataEntries[partIdx] != nil {
|
||||||
|
// someone beat us to it
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
entry.DataEntries[partIdx] = partData
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) {
|
||||||
|
endWrite := entry.FileEntry.File.Size + int64(len(data))
|
||||||
|
entry.writeAt(offset, data)
|
||||||
|
if endWrite > entry.FileEntry.File.Size {
|
||||||
|
entry.FileEntry.File.Size = endWrite
|
||||||
|
}
|
||||||
|
entry.FileEntry.File.ModTs = time.Now().UnixMilli()
|
||||||
|
entry.Version++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error {
|
||||||
|
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||||
|
s.writeAt_nolock(entry, entry.FileEntry.File.Size, data)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error {
|
||||||
|
s.pinCacheEntry(blockId, name)
|
||||||
|
defer s.unpinCacheEntry(blockId, name)
|
||||||
|
_, err := s.loadFileInfo(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading file info: %v", err)
|
||||||
|
}
|
||||||
|
err = s.loadLastDataBlock(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading last data block: %v", err)
|
||||||
|
}
|
||||||
|
err = s.appendDataToCache(blockId, name, data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error appending data: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) {
|
||||||
|
return dbGetAllBlockIds(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error {
|
||||||
|
s.pinCacheEntry(blockId, name)
|
||||||
|
defer s.unpinCacheEntry(blockId, name)
|
||||||
|
file, err := s.loadFileInfo(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading file info: %v", err)
|
||||||
|
}
|
||||||
|
startWriteIdx := offset
|
||||||
|
endWriteIdx := offset + int64(len(data))
|
||||||
|
startPartIdx := file.partIdxAtOffset(startWriteIdx)
|
||||||
|
endPartIdx := file.partIdxAtOffset(endWriteIdx)
|
||||||
|
err = s.loadDataParts(ctx, blockId, name, []int{startPartIdx, endPartIdx})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading data parts: %v", err)
|
||||||
|
}
|
||||||
|
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||||
|
s.writeAt_nolock(entry, offset, data)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns (offset, data, error)
|
||||||
|
// we return the offset because the offset may have been adjusted if the size was too big (for circular files)
|
||||||
|
func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) {
|
||||||
|
s.pinCacheEntry(blockId, name)
|
||||||
|
defer s.unpinCacheEntry(blockId, name)
|
||||||
|
file, err := s.Stat(ctx, blockId, name)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("error getting file: %v", err)
|
||||||
|
}
|
||||||
|
if file.Opts.Circular {
|
||||||
|
// we can do this check here because MaxSize for file cannot be modified
|
||||||
|
if size > file.Opts.MaxSize {
|
||||||
|
// just read the last maxsize bytes
|
||||||
|
sizeTooBig := size - file.Opts.MaxSize
|
||||||
|
offset += sizeTooBig
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var partsNeeded []int
|
||||||
|
lastPartOffset := (offset + size) % PartDataSize
|
||||||
|
endOffsetOfLastPart := offset + size - lastPartOffset + PartDataSize
|
||||||
|
for i := offset; i < endOffsetOfLastPart; i += PartDataSize {
|
||||||
|
partsNeeded = append(partsNeeded, file.partIdxAtOffset(i))
|
||||||
|
}
|
||||||
|
dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("error loading data parts: %v", err)
|
||||||
|
}
|
||||||
|
// wash the entries through the cache
|
||||||
|
err = s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||||
|
if offset+size > entry.FileEntry.File.Size {
|
||||||
|
// limit read to the actual size of the file
|
||||||
|
size = entry.FileEntry.File.Size - offset
|
||||||
|
}
|
||||||
|
for partIdx, _ := range dataEntries {
|
||||||
|
if entry.DataEntries[partIdx] != nil {
|
||||||
|
dataEntries[partIdx] = entry.DataEntries[partIdx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err)
|
||||||
|
}
|
||||||
|
// combine the entries into a single byte slice
|
||||||
|
// note that we only want part of the first and last part depending on offset and size
|
||||||
|
var rtn []byte
|
||||||
|
amtLeftToRead := size
|
||||||
|
curReadOffset := offset
|
||||||
|
for amtLeftToRead > 0 {
|
||||||
|
partIdx := file.partIdxAtOffset(curReadOffset)
|
||||||
|
partDataEntry := dataEntries[partIdx]
|
||||||
|
var partData []byte
|
||||||
|
if partDataEntry == nil {
|
||||||
|
partData = make([]byte, PartDataSize)
|
||||||
|
} else {
|
||||||
|
partData = partDataEntry.Data[0:PartDataSize]
|
||||||
|
}
|
||||||
|
partOffset := curReadOffset % PartDataSize
|
||||||
|
amtToRead := minInt64(PartDataSize-partOffset, amtLeftToRead)
|
||||||
|
rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...)
|
||||||
|
amtLeftToRead -= amtToRead
|
||||||
|
curReadOffset += amtToRead
|
||||||
|
}
|
||||||
|
return offset, rtn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func minInt64(a, b int64) int64 {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
194
pkg/blockstore/blockstore_cache.go
Normal file
194
pkg/blockstore/blockstore_cache.go
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
// Copyright 2024, Command Line Inc.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
package blockstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type cacheKey struct {
|
||||||
|
BlockId string
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
type DataCacheEntry struct {
|
||||||
|
Dirty *atomic.Bool
|
||||||
|
PartIdx int
|
||||||
|
Data []byte // capacity is always BlockDataPartSize
|
||||||
|
}
|
||||||
|
|
||||||
|
type FileCacheEntry struct {
|
||||||
|
Dirty *atomic.Bool
|
||||||
|
File BlockFile
|
||||||
|
}
|
||||||
|
|
||||||
|
// invariants:
|
||||||
|
// - we only modify CacheEntry fields when we are holding the BlockStore lock
|
||||||
|
// - FileEntry can be nil, if pinned
|
||||||
|
// - FileEntry.File is never updated in place, the entire FileEntry is replaced
|
||||||
|
// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced
|
||||||
|
// - when pinned, the cache entry is never removed
|
||||||
|
// this allows us to flush the cache entry to disk without holding the lock
|
||||||
|
type CacheEntry struct {
|
||||||
|
BlockId string
|
||||||
|
Name string
|
||||||
|
Version int
|
||||||
|
PinCount int
|
||||||
|
Deleted bool
|
||||||
|
FileEntry *FileCacheEntry
|
||||||
|
DataEntries []*DataCacheEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
|
||||||
|
for len(e.DataEntries) <= partIdx {
|
||||||
|
e.DataEntries = append(e.DataEntries, nil)
|
||||||
|
}
|
||||||
|
if create && e.DataEntries[partIdx] == nil {
|
||||||
|
e.DataEntries[partIdx] = &DataCacheEntry{
|
||||||
|
PartIdx: partIdx,
|
||||||
|
Data: make([]byte, 0, PartDataSize),
|
||||||
|
Dirty: &atomic.Bool{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return e.DataEntries[partIdx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) int64 {
|
||||||
|
leftInPart := PartDataSize - offset
|
||||||
|
toWrite := int64(len(data))
|
||||||
|
if toWrite > leftInPart {
|
||||||
|
toWrite = leftInPart
|
||||||
|
}
|
||||||
|
if int64(len(dce.Data)) < offset+toWrite {
|
||||||
|
dce.Data = dce.Data[:offset+toWrite]
|
||||||
|
}
|
||||||
|
copy(dce.Data[offset:], data[:toWrite])
|
||||||
|
dce.Dirty.Store(true)
|
||||||
|
return toWrite
|
||||||
|
}
|
||||||
|
|
||||||
|
func (entry *CacheEntry) writeAt(offset int64, data []byte) {
|
||||||
|
for len(data) > 0 {
|
||||||
|
partIdx := int(offset / PartDataSize)
|
||||||
|
if entry.FileEntry.File.Opts.Circular {
|
||||||
|
maxPart := int(entry.FileEntry.File.Opts.MaxSize / PartDataSize)
|
||||||
|
partIdx = partIdx % maxPart
|
||||||
|
}
|
||||||
|
partOffset := offset % PartDataSize
|
||||||
|
partData := entry.ensurePart(partIdx, true)
|
||||||
|
nw := partData.writeToPart(partOffset, data)
|
||||||
|
data = data[nw:]
|
||||||
|
offset += nw
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlockStore struct {
|
||||||
|
Lock *sync.Mutex
|
||||||
|
Cache map[cacheKey]*CacheEntry
|
||||||
|
FlushTime time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil {
|
||||||
|
if shouldCreate {
|
||||||
|
entry = &CacheEntry{
|
||||||
|
BlockId: blockId,
|
||||||
|
Name: name,
|
||||||
|
PinCount: 0,
|
||||||
|
FileEntry: nil,
|
||||||
|
DataEntries: nil,
|
||||||
|
}
|
||||||
|
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) withLockExists(blockId string, name string, f func(*CacheEntry) error) error {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil || entry.Deleted || entry.FileEntry == nil {
|
||||||
|
return fmt.Errorf("file not found")
|
||||||
|
}
|
||||||
|
return f(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) pinCacheEntry(blockId string, name string) {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil {
|
||||||
|
entry = &CacheEntry{
|
||||||
|
BlockId: blockId,
|
||||||
|
Name: name,
|
||||||
|
PinCount: 0,
|
||||||
|
FileEntry: nil,
|
||||||
|
DataEntries: nil,
|
||||||
|
}
|
||||||
|
s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry
|
||||||
|
}
|
||||||
|
entry.PinCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) unpinCacheEntry(blockId string, name string) {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil {
|
||||||
|
// this is not good
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.PinCount--
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if entry.PinCount > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFileFromCache returns the file from the cache if it exists
|
||||||
|
// return (file, cached)
|
||||||
|
func (s *BlockStore) getFileFromCache(blockId string, name string) (*BlockFile, bool) {
|
||||||
|
s.Lock.Lock()
|
||||||
|
defer s.Lock.Unlock()
|
||||||
|
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||||
|
if entry == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if entry.Deleted {
|
||||||
|
return nil, true
|
||||||
|
}
|
||||||
|
if entry.FileEntry == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return entry.FileEntry.File.DeepCopy(), true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *CacheEntry) copyOrCreateFileEntry(dbFile *BlockFile) *FileCacheEntry {
|
||||||
|
if e.FileEntry == nil {
|
||||||
|
return &FileCacheEntry{
|
||||||
|
Dirty: &atomic.Bool{},
|
||||||
|
File: *dbFile,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &FileCacheEntry{
|
||||||
|
Dirty: &atomic.Bool{},
|
||||||
|
File: *e.FileEntry.File.DeepCopy(),
|
||||||
|
}
|
||||||
|
}
|
122
pkg/blockstore/blockstore_dbops.go
Normal file
122
pkg/blockstore/blockstore_dbops.go
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
package blockstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func dbInsertFile(ctx context.Context, file *BlockFile) error {
|
||||||
|
// will fail if file already exists
|
||||||
|
return WithTx(ctx, func(tx *TxWrap) error {
|
||||||
|
query := "INSERT INTO db_block_file (blockid, name, size, createdts, modts, opts, meta) VALUES (?, ?, ?, ?, ?, ?, ?)"
|
||||||
|
tx.Exec(query, file.BlockId, file.Name, file.Size, file.CreatedTs, file.ModTs, file.Opts, file.Meta)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbDeleteFile(ctx context.Context, blockId string, name string) error {
|
||||||
|
return WithTx(ctx, func(tx *TxWrap) error {
|
||||||
|
query := "DELETE FROM db_block_file WHERE blockid = ? AND name = ?"
|
||||||
|
tx.Exec(query, blockId, name)
|
||||||
|
query = "DELETE FROM db_block_data WHERE blockid = ? AND name = ?"
|
||||||
|
tx.Exec(query, blockId, name)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbGetBlockFileNames(ctx context.Context, blockId string) ([]string, error) {
|
||||||
|
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
|
||||||
|
var files []string
|
||||||
|
query := "SELECT name FROM db_block_file WHERE blockid = ?"
|
||||||
|
tx.Select(&files, query, blockId)
|
||||||
|
return files, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbDeleteBlock(ctx context.Context, blockId string) error {
|
||||||
|
return WithTx(ctx, func(tx *TxWrap) error {
|
||||||
|
query := "DELETE FROM db_block_file WHERE blockid = ?"
|
||||||
|
tx.Exec(query, blockId)
|
||||||
|
query = "DELETE FROM db_block_data WHERE blockid = ?"
|
||||||
|
tx.Exec(query, blockId)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbGetFile(ctx context.Context, blockId string, name string) (*BlockFile, error) {
|
||||||
|
return WithTxRtn(ctx, func(tx *TxWrap) (*BlockFile, error) {
|
||||||
|
var file BlockFile
|
||||||
|
query := "SELECT * FROM db_block_file WHERE blockid = ? AND name = ?"
|
||||||
|
tx.Get(&file, query, blockId, name)
|
||||||
|
return &file, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbGetAllBlockIds(ctx context.Context) ([]string, error) {
|
||||||
|
return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
|
||||||
|
var ids []string
|
||||||
|
query := "SELECT DISTINCT blockid FROM db_block_file"
|
||||||
|
tx.Select(&ids, query)
|
||||||
|
return ids, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbGetFileParts(ctx context.Context, blockId string, name string, parts []int) (map[int]*DataCacheEntry, error) {
|
||||||
|
return WithTxRtn(ctx, func(tx *TxWrap) (map[int]*DataCacheEntry, error) {
|
||||||
|
var data []*DataCacheEntry
|
||||||
|
query := "SELECT partidx, data FROM db_block_data WHERE blockid = ? AND name = ? AND partidx IN (SELECT value FROM json_each(?))"
|
||||||
|
tx.Select(&data, query, blockId, name, dbutil.QuickJsonArr(parts))
|
||||||
|
rtn := make(map[int]*DataCacheEntry)
|
||||||
|
for _, d := range data {
|
||||||
|
d.Dirty = &atomic.Bool{}
|
||||||
|
rtn[d.PartIdx] = d
|
||||||
|
}
|
||||||
|
return rtn, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbGetBlockFiles(ctx context.Context, blockId string) ([]*BlockFile, error) {
|
||||||
|
return WithTxRtn(ctx, func(tx *TxWrap) ([]*BlockFile, error) {
|
||||||
|
var files []*BlockFile
|
||||||
|
query := "SELECT * FROM db_block_file WHERE blockid = ?"
|
||||||
|
tx.Select(&files, query, blockId)
|
||||||
|
return files, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func dbWriteCacheEntry(ctx context.Context, fileEntry *FileCacheEntry, dataEntries []*DataCacheEntry) error {
|
||||||
|
if fileEntry == nil {
|
||||||
|
return fmt.Errorf("fileEntry or fileEntry.File is nil")
|
||||||
|
}
|
||||||
|
return WithTx(ctx, func(tx *TxWrap) error {
|
||||||
|
query := `SELECT blockid FROM db_block_file WHERE blockid = ? AND name = ?`
|
||||||
|
if !tx.Exists(query, fileEntry.File.BlockId, fileEntry.File.Name) {
|
||||||
|
// since deletion is synchronous this stops us from writing to a deleted file
|
||||||
|
return fmt.Errorf("file not found in db")
|
||||||
|
}
|
||||||
|
if fileEntry.Dirty.Load() {
|
||||||
|
query := `UPDATE db_block_file SET size = ?, createdts = ?, modts = ?, opts = ?, meta = ? WHERE blockid = ? AND name = ?`
|
||||||
|
tx.Exec(query, fileEntry.File.Size, fileEntry.File.CreatedTs, fileEntry.File.ModTs, fileEntry.File.Opts, fileEntry.File.Meta, fileEntry.File.BlockId, fileEntry.File.Name)
|
||||||
|
}
|
||||||
|
dataPartQuery := `REPLACE INTO db_block_data (blockid, name, partidx, data) VALUES (?, ?, ?, ?)`
|
||||||
|
for _, dataEntry := range dataEntries {
|
||||||
|
if dataEntry == nil || !dataEntry.Dirty.Load() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tx.Exec(dataPartQuery, fileEntry.File.BlockId, fileEntry.File.Name, dataEntry.PartIdx, dataEntry.Data)
|
||||||
|
}
|
||||||
|
if tx.Err == nil {
|
||||||
|
// clear dirty flags
|
||||||
|
fileEntry.Dirty.Store(false)
|
||||||
|
for _, dataEntry := range dataEntries {
|
||||||
|
if dataEntry != nil {
|
||||||
|
dataEntry.Dirty.Store(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user