concurrent stress test, fix some bugs

This commit is contained in:
sawka 2024-05-19 12:36:25 -07:00
parent 4e54b8a9e1
commit 56a75d9a6a
3 changed files with 63 additions and 0 deletions

View File

@ -321,6 +321,7 @@ func (s *BlockStore) getDirtyDataEntriesForFlush(blockId string, name string) (*
for _, data := range dirtyData { for _, data := range dirtyData {
data.Flushing.Store(true) data.Flushing.Store(true)
} }
entry.FileEntry.Flushing.Store(true)
return entry.FileEntry, dirtyData return entry.FileEntry, dirtyData
} }

View File

@ -61,6 +61,7 @@ func dbGetFileParts(ctx context.Context, blockId string, name string, parts []in
rtn := make(map[int]*DataCacheEntry) rtn := make(map[int]*DataCacheEntry)
for _, d := range data { for _, d := range data {
d.Dirty = &atomic.Bool{} d.Dirty = &atomic.Bool{}
d.Flushing = &atomic.Bool{}
if cap(d.Data) != int(partDataSize) { if cap(d.Data) != int(partDataSize) {
newData := make([]byte, len(d.Data), partDataSize) newData := make([]byte, len(d.Data), partDataSize)
copy(newData, d.Data) copy(newData, d.Data)

View File

@ -8,6 +8,7 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -259,6 +260,23 @@ func checkFileData(t *testing.T, ctx context.Context, blockId string, name strin
} }
} }
func checkFileByteCount(t *testing.T, ctx context.Context, blockId string, name string, val byte, expected int) {
_, rdata, err := GBS.ReadFile(ctx, blockId, name)
if err != nil {
t.Errorf("error reading data for file %q: %v", name, err)
return
}
var count int
for _, b := range rdata {
if b == val {
count++
}
}
if count != expected {
t.Errorf("byte count mismatch for file %q: expected %d, got %d", name, expected, count)
}
}
func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) { func checkFileDataAt(t *testing.T, ctx context.Context, blockId string, name string, offset int64, data string) {
_, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data))) _, rdata, err := GBS.ReadAt(ctx, blockId, name, offset, int64(len(data)))
if err != nil { if err != nil {
@ -536,4 +554,47 @@ func TestSimpleDBFlush(t *testing.T) {
} }
checkFileDataAt(t, ctx, blockId, fileName, 6, "world!") checkFileDataAt(t, ctx, blockId, fileName, 6, "world!")
checkFileSize(t, ctx, blockId, fileName, 12) checkFileSize(t, ctx, blockId, fileName, 12)
checkFileByteCount(t, ctx, blockId, fileName, 'l', 3)
}
func TestConcurrentAppend(t *testing.T) {
initDb(t)
defer cleanupDb(t)
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFn()
blockId := uuid.New().String()
fileName := "t1"
err := GBS.MakeFile(ctx, blockId, fileName, nil, FileOptsType{})
if err != nil {
t.Fatalf("error creating file: %v", err)
}
var wg sync.WaitGroup
for i := 0; i < 16; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
const hexChars = "0123456789abcdef"
ch := hexChars[n]
for j := 0; j < 100; j++ {
err := GBS.AppendData(ctx, blockId, fileName, []byte{ch})
if err != nil {
t.Errorf("error appending data (%d): %v", n, err)
}
if j == 50 {
err = GBS.FlushCache(ctx)
if err != nil {
t.Errorf("error flushing cache: %v", err)
}
}
}
}(i)
}
wg.Wait()
checkFileSize(t, ctx, blockId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100)
GBS.FlushCache(ctx)
checkFileSize(t, ctx, blockId, fileName, 1600)
checkFileByteCount(t, ctx, blockId, fileName, 'a', 100)
checkFileByteCount(t, ctx, blockId, fileName, 'e', 100)
} }