diff --git a/pkg/blockstore/blockstore.go b/pkg/blockstore/blockstore.go index 849aff884..4a71f20fc 100644 --- a/pkg/blockstore/blockstore.go +++ b/pkg/blockstore/blockstore.go @@ -89,6 +89,25 @@ func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize } } + var cacheErr error + s.withLock(blockId, name, false, func(entry *CacheEntry) { + if entry == nil { + return + } + if !entry.Deleted { + cacheErr = fmt.Errorf("file exists") + return + } + // deleted is set. check intentions + if entry.PinCount == 0 && len(entry.WriteIntentions) == 0 { + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + return + } + cacheErr = fmt.Errorf("file is deleted but has active requests") + }) + if cacheErr != nil { + return cacheErr + } now := time.Now().UnixMilli() file := &BlockFile{ BlockId: blockId, @@ -111,7 +130,12 @@ func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string if entry == nil { return } - entry.Deleted = true + if entry.PinCount > 0 || len(entry.WriteIntentions) > 0 { + // mark as deleted if we have a active requests + entry.Deleted = true + } else { + delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + } }) return nil } @@ -342,6 +366,28 @@ func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { return dbGetAllBlockIds(ctx) } +// returns a map of partIdx to amount of data to write to that part +func (file *BlockFile) computePartMap(startOffset int64, size int64) map[int]int { + partMap := make(map[int]int) + endOffset := startOffset + size + startBlockOffset := startOffset - (startOffset % partDataSize) + for testOffset := startBlockOffset; testOffset < endOffset; testOffset += partDataSize { + partIdx := file.partIdxAtOffset(testOffset) + partStartOffset := testOffset + partEndOffset := testOffset + partDataSize + partWriteStartOffset := 0 + partWriteEndOffset := int(partDataSize) + if startOffset > partStartOffset && startOffset < partEndOffset { + partWriteStartOffset = int(startOffset - partStartOffset) + } + if endOffset > partStartOffset && endOffset < partEndOffset { + partWriteEndOffset = int(endOffset - partStartOffset) + } + partMap[partIdx] = partWriteEndOffset - partWriteStartOffset + } + return partMap +} + func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) diff --git a/pkg/blockstore/blockstore_cache.go b/pkg/blockstore/blockstore_cache.go index 757a92a8d..9a5665942 100644 --- a/pkg/blockstore/blockstore_cache.go +++ b/pkg/blockstore/blockstore_cache.go @@ -28,6 +28,11 @@ type FileCacheEntry struct { File BlockFile } +type WriteIntention struct { + Parts map[int]bool + Append bool +} + // invariants: // - we only modify CacheEntry fields when we are holding the BlockStore lock // - FileEntry can be nil, if pinned @@ -36,18 +41,20 @@ type FileCacheEntry struct { // - when pinned, the cache entry is never removed // this allows us to flush the cache entry to disk without holding the lock type CacheEntry struct { - BlockId string - Name string - Version int - PinCount int - Deleted bool - FileEntry *FileCacheEntry - DataEntries []*DataCacheEntry + BlockId string + Name string + Version int + PinCount int + Deleted bool + WriteIntentions map[string]*WriteIntention // map from intentionid -> WriteIntention + FileEntry *FileCacheEntry + DataEntries []*DataCacheEntry } +//lint:ignore U1000 used for testing func (e *CacheEntry) dump() string { var buf bytes.Buffer - fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, Version: %d, PinCount: %d, Deleted: %v\n", e.BlockId, e.Name, e.Version, e.PinCount, e.Deleted) + fmt.Fprintf(&buf, "CacheEntry{\nBlockId: %q, Name: %q, Version: %d, PinCount: %d, Deleted: %v, IW: %v\n", e.BlockId, e.Name, e.Version, e.PinCount, e.Deleted, e.WriteIntentions) if e.FileEntry != nil { fmt.Fprintf(&buf, "FileEntry: %v\n", e.FileEntry.File) } @@ -60,6 +67,7 @@ func (e *CacheEntry) dump() string { return buf.String() } +//lint:ignore U1000 used for testing func (s *BlockStore) dump() string { s.Lock.Lock() defer s.Lock.Unlock() @@ -153,19 +161,24 @@ type BlockStore struct { Cache map[cacheKey]*CacheEntry } +func makeCacheEntry(blockId string, name string) *CacheEntry { + return &CacheEntry{ + BlockId: blockId, + Name: name, + PinCount: 0, + WriteIntentions: make(map[string]*WriteIntention), + FileEntry: nil, + DataEntries: nil, + } +} + func (s *BlockStore) withLock(blockId string, name string, shouldCreate bool, f func(*CacheEntry)) { s.Lock.Lock() defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { if shouldCreate { - entry = &CacheEntry{ - BlockId: blockId, - Name: name, - PinCount: 0, - FileEntry: nil, - DataEntries: nil, - } + entry = makeCacheEntry(blockId, name) s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry } } @@ -187,13 +200,7 @@ func (s *BlockStore) pinCacheEntry(blockId string, name string) { defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { - entry = &CacheEntry{ - BlockId: blockId, - Name: name, - PinCount: 0, - FileEntry: nil, - DataEntries: nil, - } + entry = makeCacheEntry(blockId, name) s.Cache[cacheKey{BlockId: blockId, Name: name}] = entry } entry.PinCount++ @@ -210,17 +217,22 @@ func (s *BlockStore) unpinCacheEntry(blockId string, name string) { entry.PinCount-- } -func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) { +// returns true if the entry was deleted (or there is no cache entry) +func (s *BlockStore) tryDeleteCacheEntry(blockId string, name string) bool { s.Lock.Lock() defer s.Lock.Unlock() entry := s.Cache[cacheKey{BlockId: blockId, Name: name}] if entry == nil { - return + return true } if entry.PinCount > 0 { - return + return false + } + if len(entry.WriteIntentions) > 0 { + return false } delete(s.Cache, cacheKey{BlockId: blockId, Name: name}) + return true } // getFileFromCache returns the file from the cache if it exists diff --git a/pkg/blockstore/blockstore_test.go b/pkg/blockstore/blockstore_test.go index 090bc46b6..a25e18194 100644 --- a/pkg/blockstore/blockstore_test.go +++ b/pkg/blockstore/blockstore_test.go @@ -6,6 +6,7 @@ package blockstore import ( "bytes" "context" + "log" "testing" "time" @@ -76,6 +77,74 @@ func TestCreate(t *testing.T) { if file.Opts.Circular || file.Opts.IJson || file.Opts.MaxSize != 0 { t.Fatalf("opts not empty") } + err = GBS.DeleteFile(ctx, blockId, "testfile") + if err != nil { + t.Fatalf("error deleting file: %v", err) + } +} + +func containsFile(arr []*BlockFile, name string) bool { + for _, f := range arr { + if f.Name == name { + return true + } + } + return false +} + +func TestDelete(t *testing.T) { + initDb(t) + defer cleanupDb(t) + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + blockId := uuid.New().String() + err := GBS.MakeFile(ctx, blockId, "testfile", nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.DeleteFile(ctx, blockId, "testfile") + if err != nil { + t.Fatalf("error deleting file: %v", err) + } + file, err := GBS.Stat(ctx, blockId, "testfile") + if err != nil { + t.Fatalf("error stating file: %v", err) + } + if file != nil { + t.Fatalf("file should not be found") + } + + // create two files in same block, use DeleteBlock to delete + err = GBS.MakeFile(ctx, blockId, "testfile1", nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + err = GBS.MakeFile(ctx, blockId, "testfile2", nil, FileOptsType{}) + if err != nil { + t.Fatalf("error creating file: %v", err) + } + files, err := GBS.ListFiles(ctx, blockId) + if err != nil { + t.Fatalf("error listing files: %v", err) + } + if len(files) != 2 { + t.Fatalf("file count mismatch") + } + if !containsFile(files, "testfile1") || !containsFile(files, "testfile2") { + t.Fatalf("file names mismatch") + } + err = GBS.DeleteBlock(ctx, blockId) + if err != nil { + t.Fatalf("error deleting block: %v", err) + } + files, err = GBS.ListFiles(ctx, blockId) + if err != nil { + t.Fatalf("error listing files: %v", err) + } + if len(files) != 0 { + t.Fatalf("file count mismatch") + } } func checkMapsEqual(t *testing.T, m1 map[string]any, m2 map[string]any, msg string) { @@ -130,6 +199,12 @@ func TestSetMeta(t *testing.T) { t.Fatalf("file not found") } checkMapsEqual(t, map[string]any{"a": 6, "b": "hello", "c": "world", "d": 7}, file.Meta, "meta") + + err = GBS.WriteMeta(ctx, blockId, "testfile-notexist", map[string]any{"a": 6}, true) + if err == nil { + t.Fatalf("expected error setting meta") + } + err = nil } func checkFileSize(t *testing.T, ctx context.Context, blockId string, name string, size int64) { @@ -236,3 +311,47 @@ func TestMultiPart(t *testing.T) { checkFileDataAt(t, ctx, blockId, fileName, 49, "world") checkFileDataAt(t, ctx, blockId, fileName, 48, "8world4") } + +func testIntMapsEq(t *testing.T, msg string, m map[int]int, expected map[int]int) { + if len(m) != len(expected) { + t.Errorf("%s: map length mismatch got:%d expected:%d", msg, len(m), len(expected)) + return + } + for k, v := range m { + if expected[k] != v { + t.Errorf("%s: value mismatch for key %d, got:%d expected:%d", msg, k, v, expected[k]) + } + } +} + +func TestComputePartMap(t *testing.T) { + partDataSize = 100 + defer func() { + partDataSize = DefaultPartDataSize + }() + file := &BlockFile{} + m := file.computePartMap(0, 250) + testIntMapsEq(t, "map1", m, map[int]int{0: 100, 1: 100, 2: 50}) + m = file.computePartMap(110, 40) + log.Printf("map2:%#v\n", m) + testIntMapsEq(t, "map2", m, map[int]int{1: 40}) + m = file.computePartMap(110, 90) + testIntMapsEq(t, "map3", m, map[int]int{1: 90}) + m = file.computePartMap(110, 91) + testIntMapsEq(t, "map4", m, map[int]int{1: 90, 2: 1}) + m = file.computePartMap(820, 340) + testIntMapsEq(t, "map5", m, map[int]int{8: 80, 9: 100, 10: 100, 11: 60}) + + // now test circular + file = &BlockFile{Opts: FileOptsType{Circular: true, MaxSize: 1000}} + m = file.computePartMap(10, 250) + testIntMapsEq(t, "map6", m, map[int]int{0: 90, 1: 100, 2: 60}) + m = file.computePartMap(990, 40) + testIntMapsEq(t, "map7", m, map[int]int{9: 10, 0: 30}) + m = file.computePartMap(990, 130) + testIntMapsEq(t, "map8", m, map[int]int{9: 10, 0: 100, 1: 20}) + m = file.computePartMap(5, 1105) + testIntMapsEq(t, "map9", m, map[int]int{0: 100, 1: 10, 2: 100, 3: 100, 4: 100, 5: 100, 6: 100, 7: 100, 8: 100, 9: 100}) + m = file.computePartMap(2005, 1105) + testIntMapsEq(t, "map9", m, map[int]int{0: 100, 1: 10, 2: 100, 3: 100, 4: 100, 5: 100, 6: 100, 7: 100, 8: 100, 9: 100}) +}