mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-02 18:39:05 +01:00
checkpoint on blockstore
This commit is contained in:
parent
14772c8f61
commit
1695ec46b1
@ -10,6 +10,7 @@ package blockstore
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -19,6 +20,7 @@ const DefaultPartDataSize = 64 * 1024
|
||||
const DefaultFlushTime = 5 * time.Second
|
||||
const NoPartIdx = -1
|
||||
|
||||
var warningCount = &atomic.Int32{}
|
||||
var partDataSize int64 = DefaultPartDataSize // overridden in tests
|
||||
var stopFlush = &atomic.Bool{}
|
||||
|
||||
@ -311,9 +313,7 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting file part: %v", err)
|
||||
}
|
||||
maxPart := maxOfIntArr(parts)
|
||||
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||
entry.ensurePart(maxPart, false)
|
||||
for partIdx, partData := range partDataMap {
|
||||
if entry.DataEntries[partIdx] != nil {
|
||||
// someone beat us to it
|
||||
@ -325,20 +325,9 @@ func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name str
|
||||
})
|
||||
}
|
||||
|
||||
func (entry *CacheEntry) writeAtToCache(offset int64, data []byte, replace bool) {
|
||||
endWrite := offset + int64(len(data))
|
||||
entry.writeAt(offset, data, replace)
|
||||
entry.modifyFileData(func(file *BlockFile) {
|
||||
if endWrite > file.Size || replace {
|
||||
file.Size = endWrite
|
||||
}
|
||||
file.ModTs = time.Now().UnixMilli()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error {
|
||||
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||
entry.writeAtToCache(entry.FileEntry.File.Size, data, false)
|
||||
entry.writeAt(entry.FileEntry.File.Size, data, false)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@ -409,7 +398,7 @@ func (s *BlockStore) WriteFile(ctx context.Context, blockId string, name string,
|
||||
return fmt.Errorf("error loading file info: %v", err)
|
||||
}
|
||||
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||
entry.writeAtToCache(0, data, true)
|
||||
entry.writeAt(0, data, true)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@ -448,7 +437,7 @@ func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, o
|
||||
return fmt.Errorf("error loading data parts: %v", err)
|
||||
}
|
||||
return s.withLockExists(blockId, name, func(entry *CacheEntry) error {
|
||||
entry.writeAtToCache(offset, data, false)
|
||||
entry.writeAt(offset, data, false)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@ -531,22 +520,35 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string)
|
||||
return s.ReadAt(ctx, blockId, name, 0, file.Size)
|
||||
}
|
||||
|
||||
func (s *BlockStore) FlushCache(ctx context.Context) error {
|
||||
func (s *BlockStore) getDirtyCacheKeys() []cacheKey {
|
||||
var dirtyCacheKeys []cacheKey
|
||||
s.Lock.Lock()
|
||||
defer s.Lock.Unlock()
|
||||
for key, entry := range s.Cache {
|
||||
if entry.FileEntry != nil && entry.FileEntry.Dirty.Load() {
|
||||
dirtyCacheKeys = append(dirtyCacheKeys, key)
|
||||
continue
|
||||
}
|
||||
for _, dataEntry := range entry.DataEntries {
|
||||
if dataEntry != nil && dataEntry.Dirty.Load() {
|
||||
dirtyCacheKeys = append(dirtyCacheKeys, key)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
s.Lock.Unlock()
|
||||
return dirtyCacheKeys
|
||||
}
|
||||
|
||||
func (s *BlockStore) flushFile(ctx context.Context, blockId string, name string) error {
|
||||
// todo
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BlockStore) FlushCache(ctx context.Context) error {
|
||||
// get a copy of dirty keys so we can iterate without the lock
|
||||
dirtyCacheKeys := s.getDirtyCacheKeys()
|
||||
for _, key := range dirtyCacheKeys {
|
||||
err := s.flushFile(ctx, key.BlockId, key.Name)
|
||||
if err != nil {
|
||||
// if error is not transient, we should probably delete the offending entry :/
|
||||
log.Printf("error flushing file %s/%s: %v", key.BlockId, key.Name, err)
|
||||
continue
|
||||
}
|
||||
s.cleanCacheEntry(key.BlockId, key.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type cacheKey struct {
|
||||
@ -51,6 +52,7 @@ type WriteIntention struct {
|
||||
// - DataCacheEntry items are never updated in place, the entire DataCacheEntry is replaced
|
||||
// - when pinned, the cache entry is never removed
|
||||
// this allows us to flush the cache entry to disk without holding the lock
|
||||
// if there is a dirty data entry, then FileEntry must also be dirty
|
||||
type CacheEntry struct {
|
||||
BlockId string
|
||||
Name string
|
||||
@ -58,7 +60,7 @@ type CacheEntry struct {
|
||||
Deleted bool
|
||||
WriteIntentions map[int]WriteIntention // map from intentionid -> WriteIntention
|
||||
FileEntry *FileCacheEntry
|
||||
DataEntries []*DataCacheEntry
|
||||
DataEntries map[int]*DataCacheEntry
|
||||
}
|
||||
|
||||
//lint:ignore U1000 used for testing
|
||||
@ -68,10 +70,8 @@ func (e *CacheEntry) dump() string {
|
||||
if e.FileEntry != nil {
|
||||
fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File)
|
||||
}
|
||||
for i, dce := range e.DataEntries {
|
||||
if dce != nil {
|
||||
fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", i, dce.Dirty.Load(), string(dce.Data))
|
||||
}
|
||||
for idx, dce := range e.DataEntries {
|
||||
fmt.Fprintf(&buf, "DataEntry[%d][%v]: %q\n", idx, dce.Dirty.Load(), string(dce.Data))
|
||||
}
|
||||
buf.WriteString("}\n")
|
||||
return buf.String()
|
||||
@ -114,11 +114,8 @@ func (s *BlockStore) clearCache() {
|
||||
s.Cache = make(map[cacheKey]*CacheEntry)
|
||||
}
|
||||
|
||||
func (e *CacheEntry) ensurePart(partIdx int, create bool) *DataCacheEntry {
|
||||
for len(e.DataEntries) <= partIdx {
|
||||
e.DataEntries = append(e.DataEntries, nil)
|
||||
}
|
||||
if create && e.DataEntries[partIdx] == nil {
|
||||
func (e *CacheEntry) getOrCreateDataCacheEntry(partIdx int) *DataCacheEntry {
|
||||
if e.DataEntries[partIdx] == nil {
|
||||
e.DataEntries[partIdx] = makeDataCacheEntry(partIdx)
|
||||
}
|
||||
return e.DataEntries[partIdx]
|
||||
@ -151,8 +148,9 @@ func (dce *DataCacheEntry) writeToPart(offset int64, data []byte) (int64, *DataC
|
||||
}
|
||||
|
||||
func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
|
||||
endWriteOffset := offset + int64(len(data))
|
||||
if replace {
|
||||
entry.DataEntries = nil
|
||||
entry.DataEntries = make(map[int]*DataCacheEntry)
|
||||
}
|
||||
for len(data) > 0 {
|
||||
partIdx := int(offset / partDataSize)
|
||||
@ -161,12 +159,17 @@ func (entry *CacheEntry) writeAt(offset int64, data []byte, replace bool) {
|
||||
partIdx = partIdx % maxPart
|
||||
}
|
||||
partOffset := offset % partDataSize
|
||||
partData := entry.ensurePart(partIdx, true)
|
||||
partData := entry.getOrCreateDataCacheEntry(partIdx)
|
||||
nw, newDce := partData.writeToPart(partOffset, data)
|
||||
entry.DataEntries[partIdx] = newDce
|
||||
data = data[nw:]
|
||||
offset += nw
|
||||
}
|
||||
entry.modifyFileData(func(file *BlockFile) {
|
||||
if endWriteOffset > file.Size || replace {
|
||||
file.Size = endWriteOffset
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type BlockStore struct {
|
||||
@ -182,7 +185,7 @@ func makeCacheEntry(blockId string, name string) *CacheEntry {
|
||||
PinCount: 0,
|
||||
WriteIntentions: make(map[int]WriteIntention),
|
||||
FileEntry: nil,
|
||||
DataEntries: nil,
|
||||
DataEntries: make(map[int]*DataCacheEntry),
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,6 +241,7 @@ func (s *BlockStore) clearWriteIntention(blockId string, name string, intentionI
|
||||
defer s.Lock.Unlock()
|
||||
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||
if entry == nil {
|
||||
warningCount.Add(1)
|
||||
log.Printf("warning: cannot find write intention to clear %q %q", blockId, name)
|
||||
return
|
||||
}
|
||||
@ -249,30 +253,13 @@ func (s *BlockStore) unpinCacheEntry(blockId string, name string) {
|
||||
defer s.Lock.Unlock()
|
||||
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||
if entry == nil {
|
||||
warningCount.Add(1)
|
||||
log.Printf("warning: unpinning non-existent cache entry %q %q", blockId, name)
|
||||
return
|
||||
}
|
||||
entry.PinCount--
|
||||
}
|
||||
|
||||
// returns true if the entry was deleted (or there is no cache entry)
|
||||
func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) bool {
|
||||
s.Lock.Lock()
|
||||
defer s.Lock.Unlock()
|
||||
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||
if entry == nil {
|
||||
return true
|
||||
}
|
||||
if entry.PinCount > 0 {
|
||||
return false
|
||||
}
|
||||
if len(entry.WriteIntentions) > 0 {
|
||||
return false
|
||||
}
|
||||
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
|
||||
return true
|
||||
}
|
||||
|
||||
// getFileFromCache returns the file from the cache if it exists
|
||||
// makes a copy, so it can be used by the caller
|
||||
// return (file, cached)
|
||||
@ -305,6 +292,7 @@ func (e *CacheEntry) modifyFileData(fn func(*BlockFile)) {
|
||||
}
|
||||
// always set to dirty (we're modifying it)
|
||||
fileEntry.Dirty.Store(true)
|
||||
fileEntry.File.ModTs = time.Now().UnixMilli()
|
||||
fn(&fileEntry.File)
|
||||
}
|
||||
|
||||
@ -335,7 +323,6 @@ func (s *BlockStore) getDirtyDataEntries(entry *CacheEntry) (*FileCacheEntry, []
|
||||
func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error {
|
||||
fileEntry, dirtyData := s.getDirtyDataEntries(entry)
|
||||
if fileEntry == nil && len(dirtyData) == 0 {
|
||||
s.tryDeleteCacheEntry(entry.BlockId, entry.Name)
|
||||
return nil
|
||||
}
|
||||
err := dbWriteCacheEntry(ctx, fileEntry, dirtyData)
|
||||
@ -344,3 +331,51 @@ func (s *BlockStore) flushEntry(ctx context.Context, entry *CacheEntry) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (entry *CacheEntry) isDataBlockPinned(partIdx int) bool {
|
||||
if entry.FileEntry == nil {
|
||||
warningCount.Add(1)
|
||||
log.Printf("warning: checking pinned, but no FileEntry %q %q", entry.BlockId, entry.Name)
|
||||
return false
|
||||
}
|
||||
lastIncomplete := entry.FileEntry.File.getLastIncompletePartNum()
|
||||
for _, intention := range entry.WriteIntentions {
|
||||
if intention.Append && partIdx == lastIncomplete {
|
||||
return true
|
||||
}
|
||||
if intention.Parts[partIdx] > 0 {
|
||||
return true
|
||||
}
|
||||
// note "replace" does not pin anything
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// removes clean data entries (if they aren't pinned)
|
||||
// and if the entire cache entry is not in use and is clean, removes the entry
|
||||
func (s *BlockStore) cleanCacheEntry(blockId string, name string) {
|
||||
s.Lock.Lock()
|
||||
defer s.Lock.Unlock()
|
||||
entry := s.Cache[cacheKey{BlockId: blockId, Name: name}]
|
||||
if entry == nil {
|
||||
return
|
||||
}
|
||||
var hasDirtyData bool
|
||||
for _, dce := range entry.DataEntries {
|
||||
if dce.Flushing.Load() || dce.Dirty.Load() || entry.isDataBlockPinned(dce.PartIdx) {
|
||||
hasDirtyData = true
|
||||
continue
|
||||
}
|
||||
delete(entry.DataEntries, dce.PartIdx)
|
||||
}
|
||||
if hasDirtyData || (entry.FileEntry != nil && entry.FileEntry.Dirty.Load()) {
|
||||
return
|
||||
}
|
||||
if entry.PinCount > 0 {
|
||||
return
|
||||
}
|
||||
if len(entry.WriteIntentions) > 0 {
|
||||
return
|
||||
}
|
||||
delete(s.Cache, cacheKey{BlockId: blockId, Name: name})
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -18,6 +19,7 @@ func initDb(t *testing.T) {
|
||||
t.Logf("initializing db for %q", t.Name())
|
||||
useTestingDb = true
|
||||
partDataSize = 50
|
||||
warningCount = &atomic.Int32{}
|
||||
stopFlush.Store(true)
|
||||
err := InitBlockstore()
|
||||
if err != nil {
|
||||
@ -34,6 +36,9 @@ func cleanupDb(t *testing.T) {
|
||||
useTestingDb = false
|
||||
partDataSize = DefaultPartDataSize
|
||||
GBS.clearCache()
|
||||
if warningCount.Load() > 0 {
|
||||
t.Errorf("warning count: %d", warningCount.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user