// Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 package blockstore // the blockstore package implements a write cache for block files // it is not a read cache (reads still go to the DB -- unless items are in the cache) // but all writes only go to the cache, and then the cache is periodically flushed to the DB import ( "context" "fmt" "sync" "sync/atomic" "time" ) const DefaultPartDataSize = 64 * 1024 const DefaultFlushTime = 5 * time.Second const NoPartIdx = -1 var partDataSize int64 = DefaultPartDataSize // overridden in tests var stopFlush = &atomic.Bool{} var GBS *BlockStore = &BlockStore{ Lock: &sync.Mutex{}, Cache: make(map[cacheKey]*CacheEntry), } type FileOptsType struct { MaxSize int64 Circular bool IJson bool } type FileMeta = map[string]any type BlockFile struct { BlockId string `json:"blockid"` Name string `json:"name"` Size int64 `json:"size"` CreatedTs int64 `json:"createdts"` ModTs int64 `json:"modts"` Opts FileOptsType `json:"opts"` Meta FileMeta `json:"meta"` } func copyMeta(meta FileMeta) FileMeta { newMeta := make(FileMeta) for k, v := range meta { newMeta[k] = v } return newMeta } func (f *BlockFile) DeepCopy() *BlockFile { if f == nil { return nil } newFile := *f newFile.Meta = copyMeta(f.Meta) return &newFile } func (BlockFile) UseDBMap() {} type BlockData struct { BlockId string `json:"blockid"` Name string `json:"name"` PartIdx int `json:"partidx"` Data []byte `json:"data"` } func (BlockData) UseDBMap() {} // synchronous (does not interact with the cache) func (s *BlockStore) MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error { if opts.MaxSize < 0 { return fmt.Errorf("max size must be non-negative") } if opts.Circular && opts.MaxSize <= 0 { return fmt.Errorf("circular file must have a max size") } if opts.Circular && opts.IJson { return fmt.Errorf("circular file cannot be ijson") } if opts.Circular { if opts.MaxSize%partDataSize != 0 { opts.MaxSize = (opts.MaxSize/partDataSize + 1) * partDataSize } } now := time.Now().UnixMilli() file := &BlockFile{ BlockId: blockId, Name: name, Size: 0, CreatedTs: now, ModTs: now, Opts: opts, Meta: meta, } return dbInsertFile(ctx, file) } func (s *BlockStore) DeleteFile(ctx context.Context, blockId string, name string) error { err := dbDeleteFile(ctx, blockId, name) if err != nil { return fmt.Errorf("error deleting file: %v", err) } s.withLock(blockId, name, false, func(entry *CacheEntry) { if entry == nil { return } entry.Deleted = true }) return nil } func (s *BlockStore) DeleteBlock(ctx context.Context, blockId string) error { fileNames, err := dbGetBlockFileNames(ctx, blockId) if err != nil { return fmt.Errorf("error getting block files: %v", err) } for _, name := range fileNames { s.DeleteFile(ctx, blockId, name) } return nil } func (s *BlockStore) Stat(ctx context.Context, blockId string, name string) (*BlockFile, error) { file, ok := s.getFileFromCache(blockId, name) if ok { return file, nil } return dbGetBlockFile(ctx, blockId, name) } func stripNils[T any](arr []*T) []*T { newArr := make([]*T, 0) for _, item := range arr { if item == nil { continue } newArr = append(newArr, item) } return newArr } func (s *BlockStore) ListFiles(ctx context.Context, blockId string) ([]*BlockFile, error) { files, err := dbGetBlockFiles(ctx, blockId) if err != nil { return nil, fmt.Errorf("error getting block files: %v", err) } // now we wash the files through the cache var hasNils bool for idx, dbFile := range files { cacheFile, ok := s.getFileFromCache(dbFile.BlockId, dbFile.Name) if ok { if cacheFile == nil { hasNils = true } files[idx] = cacheFile } } if hasNils { files = stripNils(files) } return files, nil } func (s *BlockStore) WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta, merge bool) error { file, ok := s.getFileFromCache(blockId, name) if !ok { dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return fmt.Errorf("error getting file: %v", err) } file = dbFile } if file == nil { return fmt.Errorf("file not found") } var rtnErr error s.withLock(blockId, name, true, func(entry *CacheEntry) { if entry.Deleted { rtnErr = fmt.Errorf("file is deleted") return } newFileEntry := entry.copyOrCreateFileEntry(file) if merge { for k, v := range meta { if v == nil { delete(newFileEntry.File.Meta, k) continue } newFileEntry.File.Meta[k] = v } } else { newFileEntry.File.Meta = meta } entry.FileEntry = newFileEntry entry.FileEntry.File.ModTs = time.Now().UnixMilli() entry.Version++ }) return rtnErr } func (s *BlockStore) loadFileInfo(ctx context.Context, blockId string, name string) (*BlockFile, error) { file, ok := s.getFileFromCache(blockId, name) if ok { if file == nil { return nil, fmt.Errorf("file not found") } return file, nil } dbFile, err := dbGetBlockFile(ctx, blockId, name) if err != nil { return nil, fmt.Errorf("error getting file: %v", err) } if dbFile == nil { return nil, fmt.Errorf("file not found") } var rtnErr error rtnFile := dbFile s.withLock(blockId, name, true, func(entry *CacheEntry) { if entry.Deleted { rtnFile = nil rtnErr = fmt.Errorf("file is deleted") return } if entry.FileEntry != nil { // someone beat us to it rtnFile = entry.FileEntry.File.DeepCopy() return } entry.FileEntry = entry.copyOrCreateFileEntry(dbFile) // returns dbFile, nil }) return rtnFile, rtnErr } func (f *BlockFile) getLastIncompletePartNum() int { if f.Size%partDataSize == 0 { return NoPartIdx } return f.partIdxAtOffset(f.Size) } func (f *BlockFile) partIdxAtOffset(offset int64) int { partIdx := int(offset / partDataSize) if f.Opts.Circular { maxPart := int(f.Opts.MaxSize / partDataSize) partIdx = partIdx % maxPart } return partIdx } // blockfile must be loaded func (s *BlockStore) loadLastDataBlock(ctx context.Context, blockId string, name string) error { var partIdx int err := s.withLockExists(blockId, name, func(entry *CacheEntry) error { partIdx = entry.FileEntry.File.getLastIncompletePartNum() return nil }) if err != nil { return err } if partIdx == NoPartIdx { return nil } return s.loadDataParts(ctx, blockId, name, []int{partIdx}) } func maxOfIntArr(arr []int) int { if len(arr) == 0 { return 0 } max := arr[0] for _, v := range arr[1:] { if v > max { max = v } } return max } func (s *BlockStore) loadDataParts(ctx context.Context, blockId string, name string, parts []int) error { partDataMap, err := dbGetFileParts(ctx, blockId, name, parts) if err != nil { return fmt.Errorf("error getting file part: %v", err) } maxPart := maxOfIntArr(parts) return s.withLockExists(blockId, name, func(entry *CacheEntry) error { entry.ensurePart(maxPart, false) for partIdx, partData := range partDataMap { if entry.DataEntries[partIdx] != nil { // someone beat us to it continue } entry.DataEntries[partIdx] = partData } return nil }) } func (s *BlockStore) writeAt_nolock(entry *CacheEntry, offset int64, data []byte) { endWrite := offset + int64(len(data)) entry.writeAt(offset, data) if endWrite > entry.FileEntry.File.Size { entry.FileEntry.File.Size = endWrite } entry.FileEntry.File.ModTs = time.Now().UnixMilli() entry.Version++ } func (s *BlockStore) appendDataToCache(blockId string, name string, data []byte) error { return s.withLockExists(blockId, name, func(entry *CacheEntry) error { s.writeAt_nolock(entry, entry.FileEntry.File.Size, data) return nil }) } func (s *BlockStore) AppendData(ctx context.Context, blockId string, name string, data []byte) error { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) _, err := s.loadFileInfo(ctx, blockId, name) if err != nil { return fmt.Errorf("error loading file info: %v", err) } err = s.loadLastDataBlock(ctx, blockId, name) if err != nil { return fmt.Errorf("error loading last data block: %v", err) } err = s.appendDataToCache(blockId, name, data) if err != nil { return fmt.Errorf("error appending data: %v", err) } return nil } func (s *BlockStore) GetAllBlockIds(ctx context.Context) ([]string, error) { return dbGetAllBlockIds(ctx) } func (s *BlockStore) WriteAt(ctx context.Context, blockId string, name string, offset int64, data []byte) error { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) file, err := s.loadFileInfo(ctx, blockId, name) if err != nil { return fmt.Errorf("error loading file info: %v", err) } startWriteIdx := offset endWriteIdx := offset + int64(len(data)) startPartIdx := file.partIdxAtOffset(startWriteIdx) endPartIdx := file.partIdxAtOffset(endWriteIdx) err = s.loadDataParts(ctx, blockId, name, []int{startPartIdx, endPartIdx}) if err != nil { return fmt.Errorf("error loading data parts: %v", err) } return s.withLockExists(blockId, name, func(entry *CacheEntry) error { s.writeAt_nolock(entry, offset, data) return nil }) } // returns (offset, data, error) // we return the offset because the offset may have been adjusted if the size was too big (for circular files) func (s *BlockStore) ReadAt(ctx context.Context, blockId string, name string, offset int64, size int64) (int64, []byte, error) { s.pinCacheEntry(blockId, name) defer s.unpinCacheEntry(blockId, name) file, err := s.Stat(ctx, blockId, name) if err != nil { return 0, nil, fmt.Errorf("error getting file: %v", err) } if file.Opts.Circular { // we can do this check here because MaxSize for file cannot be modified if size > file.Opts.MaxSize { // just read the last maxsize bytes sizeTooBig := size - file.Opts.MaxSize offset += sizeTooBig } } var partsNeeded []int lastPartOffset := (offset + size) % partDataSize endOffsetOfLastPart := offset + size - lastPartOffset + partDataSize for i := offset; i < endOffsetOfLastPart; i += partDataSize { partsNeeded = append(partsNeeded, file.partIdxAtOffset(i)) } dataEntries, err := dbGetFileParts(ctx, blockId, name, partsNeeded) if err != nil { return 0, nil, fmt.Errorf("error loading data parts: %v", err) } // wash the entries through the cache err = s.withLockExists(blockId, name, func(entry *CacheEntry) error { if offset+size > entry.FileEntry.File.Size { // limit read to the actual size of the file size = entry.FileEntry.File.Size - offset } for _, partIdx := range partsNeeded { if len(entry.DataEntries) <= partIdx || entry.DataEntries[partIdx] == nil { continue } dataEntries[partIdx] = entry.DataEntries[partIdx] } return nil }) if err != nil { return 0, nil, fmt.Errorf("error reconciling cache entries: %v", err) } // combine the entries into a single byte slice // note that we only want part of the first and last part depending on offset and size var rtn []byte amtLeftToRead := size curReadOffset := offset for amtLeftToRead > 0 { partIdx := file.partIdxAtOffset(curReadOffset) partDataEntry := dataEntries[partIdx] var partData []byte if partDataEntry == nil { partData = make([]byte, partDataSize) } else { partData = partDataEntry.Data[0:partDataSize] } partOffset := curReadOffset % partDataSize amtToRead := minInt64(partDataSize-partOffset, amtLeftToRead) rtn = append(rtn, partData[partOffset:partOffset+amtToRead]...) amtLeftToRead -= amtToRead curReadOffset += amtToRead } return offset, rtn, nil } func (s *BlockStore) ReadFile(ctx context.Context, blockId string, name string) (int64, []byte, error) { file, err := s.Stat(ctx, blockId, name) if err != nil { return 0, nil, fmt.Errorf("error getting file: %v", err) } if file == nil { return 0, nil, fmt.Errorf("file not found") } return s.ReadAt(ctx, blockId, name, 0, file.Size) } func minInt64(a, b int64) int64 { if a < b { return a } return b }