hook up blockstore flusher

This commit is contained in:
sawka 2024-05-28 18:27:38 -07:00
parent 7a54b79bda
commit 333a979529
3 changed files with 48 additions and 6 deletions

View File

@ -11,6 +11,8 @@ import (
"context" "context"
"fmt" "fmt"
"io/fs" "io/fs"
"log"
"runtime/debug"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -265,28 +267,40 @@ func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string)
return return
} }
func (s *BlockStore) FlushCache(ctx context.Context) error { type FlushStats struct {
FlushDuration time.Duration
NumDirtyEntries int
NumCommitted int
}
func (s *BlockStore) FlushCache(ctx context.Context) (stats FlushStats, rtnErr error) {
wasFlushing := s.setUnlessFlushing() wasFlushing := s.setUnlessFlushing()
if wasFlushing { if wasFlushing {
return fmt.Errorf("flush already in progress") return stats, fmt.Errorf("flush already in progress")
} }
defer s.setIsFlushing(false) defer s.setIsFlushing(false)
startTime := time.Now()
defer func() {
stats.FlushDuration = time.Since(startTime)
}()
// get a copy of dirty keys so we can iterate without the lock // get a copy of dirty keys so we can iterate without the lock
dirtyCacheKeys := s.getDirtyCacheKeys() dirtyCacheKeys := s.getDirtyCacheKeys()
stats.NumDirtyEntries = len(dirtyCacheKeys)
for _, key := range dirtyCacheKeys { for _, key := range dirtyCacheKeys {
err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error { err := withLock(s, key.BlockId, key.Name, func(entry *CacheEntry) error {
return entry.flushToDB(ctx, false) return entry.flushToDB(ctx, false)
}) })
if ctx.Err() != nil { if ctx.Err() != nil {
// transient error (also must stop the loop) // transient error (also must stop the loop)
return ctx.Err() return stats, ctx.Err()
} }
if err != nil { if err != nil {
return fmt.Errorf("error flushing cache entry[%v]: %v", key, err) return stats, fmt.Errorf("error flushing cache entry[%v]: %v", key, err)
} }
stats.NumCommitted++
} }
return nil return stats, nil
} }
/////////////////////////////////// ///////////////////////////////////
@ -367,7 +381,32 @@ func (s *BlockStore) setUnlessFlushing() bool {
} }
s.IsFlushing = true s.IsFlushing = true
return false return false
}
func (s *BlockStore) runFlushWithNewContext() (FlushStats, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultFlushTime)
defer cancelFn()
return s.FlushCache(ctx)
}
func (s *BlockStore) runFlusher() {
defer func() {
if r := recover(); r != nil {
log.Printf("panic in blockstore flusher: %v\n", r)
debug.PrintStack()
}
}()
for {
stats, err := s.runFlushWithNewContext()
if err != nil || stats.NumDirtyEntries > 0 {
log.Printf("blockstore flush: %d/%d entries flushed, err:%v\n", stats.NumCommitted, stats.NumDirtyEntries, err)
}
if stopFlush.Load() {
log.Printf("blockstore flusher stopping\n")
return
}
time.Sleep(DefaultFlushTime)
}
} }
func minInt64(a, b int64) int64 { func minInt64(a, b int64) int64 {

View File

@ -42,6 +42,9 @@ func InitBlockstore() error {
if err != nil { if err != nil {
return err return err
} }
if !stopFlush.Load() {
go GBS.runFlusher()
}
log.Printf("blockstore initialized\n") log.Printf("blockstore initialized\n")
return nil return nil
} }

View File

@ -565,7 +565,7 @@ func TestSimpleDBFlush(t *testing.T) {
t.Fatalf("error writing data: %v", err) t.Fatalf("error writing data: %v", err)
} }
checkFileData(t, ctx, blockId, fileName, "hello world!") checkFileData(t, ctx, blockId, fileName, "hello world!")
err = GBS.FlushCache(ctx) _, err = GBS.FlushCache(ctx)
if err != nil { if err != nil {
t.Fatalf("error flushing cache: %v", err) t.Fatalf("error flushing cache: %v", err)
} }