diff --git a/wavesrv/go.mod b/wavesrv/go.mod index e95b03fae..cf02a32f0 100644 --- a/wavesrv/go.mod +++ b/wavesrv/go.mod @@ -5,6 +5,7 @@ go 1.22 toolchain go1.22.0 require ( + github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 github.com/alessio/shellescape v1.4.1 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 github.com/creack/pty v1.1.18 diff --git a/wavesrv/go.sum b/wavesrv/go.sum index ca70668e5..1ecce907b 100644 --- a/wavesrv/go.sum +++ b/wavesrv/go.sum @@ -1,3 +1,5 @@ +github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs= +github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0= github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 h1:7Ip0wMmLHLRJdrloDxZfhMm0xrLXZS8+COSu2bXmEQs= @@ -55,6 +57,7 @@ github.com/sawka/txwrap v0.1.2 h1:v8xS0Z1LE7/6vMZA81PYihI+0TSR6Zm1MalzzBIuXKc= github.com/sawka/txwrap v0.1.2/go.mod h1:T3nlw2gVpuolo6/XEetvBbk1oMXnY978YmBFy1UyHvw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/wavetermdev/ssh_config v0.0.0-20240306041034-17e2087ebde2 h1:onqZrJVap1sm15AiIGTfWzdr6cEF0KdtddeuuOVhzyY= @@ -71,6 +74,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= mvdan.cc/sh/v3 v3.7.0 h1:lSTjdP/1xsddtaKfGg7Myu7DnlHItd3/M2tomOcNNBg= diff --git a/wavesrv/pkg/blockstore/blockstore.go b/wavesrv/pkg/blockstore/blockstore.go new file mode 100644 index 000000000..ff1f1286d --- /dev/null +++ b/wavesrv/pkg/blockstore/blockstore.go @@ -0,0 +1,623 @@ +package blockstore + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math" + "os" + "strings" + "sync" + "time" + + "github.com/alecthomas/units" +) + +type FileOptsType struct { + MaxSize int64 + Circular bool + IJson bool +} + +type FileMeta = map[string]any + +type FileInfo struct { + BlockId string + Name string + Size int64 + CreatedTs int64 + ModTs int64 + Opts FileOptsType + Meta FileMeta +} + +const MaxBlockSize = int64(128 * units.Kilobyte) +const DefaultFlushTimeout = 1 * time.Second + +type CacheEntry struct { + Lock *sync.Mutex + CacheTs int64 + Info *FileInfo + DataBlocks []*CacheBlock + Refs int64 +} + +func (c *CacheEntry) IncRefs() { + c.Refs += 1 +} + +func (c *CacheEntry) DecRefs() { + c.Refs -= 1 +} + +type CacheBlock struct { + data []byte + size int + dirty bool 
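+	// size mirrors len(data); dirty marks blocks with writes that
+	// FlushCache has not yet persisted to the block_data table.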
+}
+
+func MakeCacheEntry(info *FileInfo) *CacheEntry {
+	rtn := &CacheEntry{Lock: &sync.Mutex{}, CacheTs: int64(time.Now().UnixMilli()), Info: info, DataBlocks: []*CacheBlock{}, Refs: 0}
+	return rtn
+}
+
+// BlockStore describes the operations the block store exposes; every method
+// takes a context.Context.
+type BlockStore interface {
+	MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error
+	WriteFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType, data []byte) (int, error)
+	AppendData(ctx context.Context, blockId string, name string, p []byte) (int, error)
+	WriteAt(ctx context.Context, blockId string, name string, p []byte, off int64) (int, error)
+	ReadAt(ctx context.Context, blockId string, name string, p *[]byte, off int64) (int, error)
+	Stat(ctx context.Context, blockId string, name string) (FileInfo, error)
+	CollapseIJson(ctx context.Context, blockId string, name string) error
+	WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error
+	DeleteFile(ctx context.Context, blockId string, name string) error
+	DeleteBlock(ctx context.Context, blockId string) error
+	ListFiles(ctx context.Context, blockId string) []*FileInfo
+	FlushCache(ctx context.Context) error
+	GetAllBlockIds(ctx context.Context) []string
+}
+
+var cache map[string]*CacheEntry = make(map[string]*CacheEntry)
+var globalLock *sync.Mutex = &sync.Mutex{}
+var appendLock *sync.Mutex = &sync.Mutex{}
+var flushTimeout = DefaultFlushTimeout
+var lastWriteTime time.Time
+
+func InsertFileIntoDB(ctx context.Context, fileInfo FileInfo) error {
+	metaJson, err := json.Marshal(fileInfo.Meta)
+	if err != nil {
+		return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, err)
+	}
+	txErr := WithTx(ctx, func(tx *TxWrap) error {
+		query := `INSERT INTO block_file VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
+		tx.Exec(query, fileInfo.BlockId, fileInfo.Name, fileInfo.Opts.MaxSize, fileInfo.Opts.Circular, fileInfo.Size, fileInfo.CreatedTs, fileInfo.ModTs, metaJson)
+		return nil
+	})
+	if txErr != nil {
+		return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, txErr)
+	}
+	return nil
+}
+
+func WriteFileToDB(ctx context.Context, fileInfo FileInfo) error {
+	metaJson, err := json.Marshal(fileInfo.Meta)
+	if err != nil {
+		return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, err)
+	}
+	txErr := WithTx(ctx, func(tx *TxWrap) error {
+		query := `UPDATE block_file SET blockid = ?, name = ?, maxsize = ?, circular = ?, size = ?, createdts = ?, modts = ?, meta = ? where blockid = ?
and name = ?` + tx.Exec(query, fileInfo.BlockId, fileInfo.Name, fileInfo.Opts.MaxSize, fileInfo.Opts.Circular, fileInfo.Size, fileInfo.CreatedTs, fileInfo.ModTs, metaJson, fileInfo.BlockId, fileInfo.Name) + return nil + }) + if txErr != nil { + return fmt.Errorf("Error writing file %s to db: %v", fileInfo.Name, txErr) + } + return nil + +} + +func WriteDataBlockToDB(ctx context.Context, blockId string, name string, index int, data []byte) error { + txErr := WithTx(ctx, func(tx *TxWrap) error { + query := `REPLACE INTO block_data values (?, ?, ?, ?)` + tx.Exec(query, blockId, name, index, data) + return nil + }) + if txErr != nil { + return fmt.Errorf("Error writing data block to db: %v", txErr) + } + return nil +} + +func MakeFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType) error { + curTs := time.Now().UnixMilli() + fileInfo := FileInfo{BlockId: blockId, Name: name, Size: 0, CreatedTs: curTs, ModTs: curTs, Opts: opts, Meta: meta} + err := InsertFileIntoDB(ctx, fileInfo) + if err != nil { + return err + } + curCacheEntry := MakeCacheEntry(&fileInfo) + SetCacheEntry(ctx, GetCacheId(blockId, name), curCacheEntry) + return nil +} + +func WriteToCacheBlockNum(ctx context.Context, blockId string, name string, p []byte, pos int, length int, cacheNum int, pullFromDB bool) (int64, int, error) { + cacheEntry, err := GetCacheEntryOrPopulate(ctx, blockId, name) + if err != nil { + return 0, 0, err + } + cacheEntry.IncRefs() + cacheEntry.Lock.Lock() + defer cacheEntry.Lock.Unlock() + block, err := GetCacheBlock(ctx, blockId, name, cacheNum, pullFromDB) + if err != nil { + return 0, 0, fmt.Errorf("Error getting cache block: %v", err) + } + var bytesWritten = 0 + blockLen := len(block.data) + fileMaxSize := cacheEntry.Info.Opts.MaxSize + maxWriteSize := fileMaxSize - (int64(cacheNum) * MaxBlockSize) + numLeftPad := int64(0) + if pos > blockLen { + numLeftPad = int64(pos - blockLen) + leftPadBytes := []byte{} + for index := 0; index < int(numLeftPad); index++ { + leftPadBytes = append(leftPadBytes, 0) + } + leftPadPos := int64(pos) - numLeftPad + b, err := WriteToCacheBuf(&block.data, leftPadBytes, int(leftPadPos), int(numLeftPad), maxWriteSize) + if err != nil { + return int64(b), b, err + } + numLeftPad = int64(b) + cacheEntry.Info.Size += (int64(cacheNum) * MaxBlockSize) + } + b, writeErr := WriteToCacheBuf(&block.data, p, pos, length, maxWriteSize) + bytesWritten += b + blockLenDiff := len(block.data) - blockLen + block.size = len(block.data) + cacheEntry.Info.Size += int64(blockLenDiff) + block.dirty = true + cacheEntry.DecRefs() + return numLeftPad, bytesWritten, writeErr +} + +func ReadFromCacheBlock(ctx context.Context, blockId string, name string, block *CacheBlock, p *[]byte, pos int, length int, destOffset int, maxRead int64) (int, error) { + defer func() { + if r := recover(); r != nil { + log.Printf("recovered from crash %v ", r) + log.Printf("values: %v %v %v %v %v %v", pos, length, destOffset, maxRead, p, block) + os.Exit(0) + } + }() + if pos > len(block.data) { + return 0, fmt.Errorf("Reading past end of cache block, should never happen") + } + bytesWritten := 0 + index := pos + for ; index < length+pos; index++ { + if int64(index) >= maxRead { + return index - pos, fmt.Errorf(MaxSizeError) + } + if index >= len(block.data) { + return bytesWritten, nil + } + destIndex := index - pos + destOffset + if destIndex >= len(*p) { + return bytesWritten, nil + } + (*p)[destIndex] = block.data[index] + bytesWritten++ + } + if int64(index) >= maxRead { + 
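+		// maxRead caps this block's readable range at the file's MaxSize;
+		// the MaxSizeError sentinel lets ReadAt wrap circular files back
+		// to offset 0.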
return bytesWritten, fmt.Errorf(MaxSizeError) + } + return bytesWritten, nil +} + +const MaxSizeError = "Hit Max Size" + +func WriteToCacheBuf(buf *[]byte, p []byte, pos int, length int, maxWrite int64) (int, error) { + bytesToWrite := length + if pos > len(*buf) { + return 0, fmt.Errorf("writing to a position (%v) in the cache that doesn't exist yet, something went wrong", pos) + } + if int64(pos+bytesToWrite) > MaxBlockSize { + return 0, fmt.Errorf("writing more bytes than max block size, not allowed - length of bytes to write: %v, length of cache: %v", bytesToWrite, len(*buf)) + } + for index := pos; index < bytesToWrite+pos; index++ { + if index-pos >= len(p) { + return len(p), nil + } + if int64(index) >= maxWrite { + return index - pos, fmt.Errorf(MaxSizeError) + } + curByte := p[index-pos] + if len(*buf) == index { + *buf = append(*buf, curByte) + } else { + (*buf)[index] = curByte + } + } + return bytesToWrite, nil +} + +func GetCacheId(blockId string, name string) string { + return blockId + "~SEP~" + name +} + +func GetValuesFromCacheId(cacheId string) (blockId string, name string) { + vals := strings.Split(cacheId, "~SEP~") + if len(vals) == 2 { + return vals[0], vals[1] + } else { + log.Println("Failure in GetValuesFromCacheId, this should never happen") + return "", "" + } +} + +func GetCacheEntry(ctx context.Context, blockId string, name string) (*CacheEntry, bool) { + globalLock.Lock() + defer globalLock.Unlock() + if curCacheEntry, found := cache[GetCacheId(blockId, name)]; found { + return curCacheEntry, true + } else { + return nil, false + } +} + +func GetCacheEntryOrPopulate(ctx context.Context, blockId string, name string) (*CacheEntry, error) { + if cacheEntry, found := GetCacheEntry(ctx, blockId, name); found { + return cacheEntry, nil + } else { + log.Printf("populating cache entry\n") + _, err := Stat(ctx, blockId, name) + if err != nil { + return nil, err + } + if cacheEntry, found := GetCacheEntry(ctx, blockId, name); found { + return cacheEntry, nil + } else { + return nil, fmt.Errorf("Error getting cache entry %v %v", blockId, name) + } + } + +} + +func SetCacheEntry(ctx context.Context, cacheId string, cacheEntry *CacheEntry) { + globalLock.Lock() + defer globalLock.Unlock() + if _, found := cache[cacheId]; found { + return + } + cache[cacheId] = cacheEntry +} + +func DeleteCacheEntry(ctx context.Context, blockId string, name string) { + globalLock.Lock() + defer globalLock.Unlock() + delete(cache, GetCacheId(blockId, name)) +} + +func GetCacheBlock(ctx context.Context, blockId string, name string, cacheNum int, pullFromDB bool) (*CacheBlock, error) { + curCacheEntry, err := GetCacheEntryOrPopulate(ctx, blockId, name) + if err != nil { + return nil, err + } + if len(curCacheEntry.DataBlocks) < cacheNum+1 { + for index := len(curCacheEntry.DataBlocks); index < cacheNum+1; index++ { + curCacheEntry.DataBlocks = append(curCacheEntry.DataBlocks, nil) + } + } + if curCacheEntry.DataBlocks[cacheNum] == nil { + var curCacheBlock *CacheBlock + if pullFromDB { + cacheData, err := GetCacheFromDB(ctx, blockId, name, 0, MaxBlockSize, int64(cacheNum)) + if err != nil { + return nil, err + } + curCacheBlock = &CacheBlock{data: *cacheData, size: len(*cacheData), dirty: false} + curCacheEntry.DataBlocks[cacheNum] = curCacheBlock + } else { + curCacheBlock = &CacheBlock{data: []byte{}, size: 0, dirty: false} + curCacheEntry.DataBlocks[cacheNum] = curCacheBlock + } + return curCacheBlock, nil + } else { + return curCacheEntry.DataBlocks[cacheNum], nil + } +} + +func 
DeepCopyFileInfo(fInfo *FileInfo) *FileInfo { + fInfoMeta := make(FileMeta) + for k, v := range fInfo.Meta { + fInfoMeta[k] = v + } + fInfoOpts := fInfo.Opts + fInfoCopy := &FileInfo{BlockId: fInfo.BlockId, Name: fInfo.Name, Size: fInfo.Size, CreatedTs: fInfo.CreatedTs, ModTs: fInfo.ModTs, Opts: fInfoOpts, Meta: fInfoMeta} + return fInfoCopy +} + +func Stat(ctx context.Context, blockId string, name string) (*FileInfo, error) { + cacheEntry, found := GetCacheEntry(ctx, blockId, name) + if found { + return DeepCopyFileInfo(cacheEntry.Info), nil + } + curCacheEntry := MakeCacheEntry(nil) + curCacheEntry.Lock.Lock() + defer curCacheEntry.Lock.Unlock() + fInfo, err := GetFileInfo(ctx, blockId, name) + if err != nil { + return nil, err + } + curCacheEntry.Info = fInfo + SetCacheEntry(ctx, GetCacheId(blockId, name), curCacheEntry) + return DeepCopyFileInfo(fInfo), nil +} + +func SetFlushTimeout(newTimeout time.Duration) { + flushTimeout = newTimeout +} + +func GetClockString(t time.Time) string { + hour, min, sec := t.Clock() + return fmt.Sprintf("%v:%v:%v", hour, min, sec) +} + +func StartFlushTimer(ctx context.Context) { + curTime := time.Now() + writeTimePassed := curTime.UnixNano() - lastWriteTime.UnixNano() + if writeTimePassed >= int64(flushTimeout) { + lastWriteTime = curTime + go func() { + time.Sleep(flushTimeout) + FlushCache(ctx) + }() + } +} + +func WriteAt(ctx context.Context, blockId string, name string, p []byte, off int64) (int, error) { + return WriteAtHelper(ctx, blockId, name, p, off, true) +} + +func WriteAtHelper(ctx context.Context, blockId string, name string, p []byte, off int64, flushCache bool) (int, error) { + bytesToWrite := len(p) + bytesWritten := 0 + curCacheNum := int(math.Floor(float64(off) / float64(MaxBlockSize))) + numCaches := int(math.Ceil(float64(bytesToWrite) / float64(MaxBlockSize))) + cacheOffset := off - (int64(curCacheNum) * MaxBlockSize) + if (cacheOffset + int64(bytesToWrite)) > MaxBlockSize { + numCaches += 1 + } + fInfo, err := Stat(ctx, blockId, name) + if err != nil { + return 0, fmt.Errorf("Write At err: %v", err) + } + if off > fInfo.Opts.MaxSize && fInfo.Opts.Circular { + numOver := off / fInfo.Opts.MaxSize + off = off - (numOver * fInfo.Opts.MaxSize) + } + for index := curCacheNum; index < curCacheNum+numCaches; index++ { + cacheOffset := off - (int64(index) * MaxBlockSize) + bytesToWriteToCurCache := int(math.Min(float64(bytesToWrite), float64(MaxBlockSize-cacheOffset))) + pullFromDB := true + if cacheOffset == 0 && int64(bytesToWriteToCurCache) == MaxBlockSize { + pullFromDB = false + } + _, b, err := WriteToCacheBlockNum(ctx, blockId, name, p, int(cacheOffset), bytesToWriteToCurCache, index, pullFromDB) + bytesWritten += b + bytesToWrite -= b + off += int64(b) + if err != nil { + if err.Error() == MaxSizeError { + if fInfo.Opts.Circular { + p = p[int64(b):] + b, err := WriteAtHelper(ctx, blockId, name, p, 0, false) + bytesWritten += b + if err != nil { + return bytesWritten, fmt.Errorf("Write to cache error: %v", err) + } + break + } + } else { + return bytesWritten, fmt.Errorf("Write to cache error: %v", err) + } + } + if len(p) == b { + break + } + p = p[int64(b):] + } + if flushCache { + StartFlushTimer(ctx) + } + return bytesWritten, nil +} + +func GetAllBlockSizes(dataBlocks []*CacheBlock) (int, int) { + rtn := 0 + numNil := 0 + for idx, block := range dataBlocks { + if block == nil { + numNil += 1 + continue + } + rtn += block.size + if block.size != len(block.data) { + log.Printf("error: block %v has incorrect block size : %v %v", 
idx, block.size, len(block.data)) + } + } + return rtn, numNil +} + +func FlushCache(ctx context.Context) error { + for _, cacheEntry := range cache { + err := WriteFileToDB(ctx, *cacheEntry.Info) + if err != nil { + return err + } + clearEntry := true + cacheEntry.Lock.Lock() + for index, block := range cacheEntry.DataBlocks { + if block == nil || block.size == 0 { + continue + } + if !block.dirty { + clearEntry = false + continue + } + err := WriteDataBlockToDB(ctx, cacheEntry.Info.BlockId, cacheEntry.Info.Name, index, block.data) + if err != nil { + return err + } + cacheEntry.DataBlocks[index] = nil + } + cacheEntry.Lock.Unlock() + if clearEntry && cacheEntry.Refs <= 0 { + DeleteCacheEntry(ctx, cacheEntry.Info.BlockId, cacheEntry.Info.Name) + } + } + return nil +} + +func ReadAt(ctx context.Context, blockId string, name string, p *[]byte, off int64) (int, error) { + bytesRead := 0 + fInfo, err := Stat(ctx, blockId, name) + if err != nil { + return 0, fmt.Errorf("Read At err: %v", err) + } + if off > fInfo.Opts.MaxSize && fInfo.Opts.Circular { + numOver := off / fInfo.Opts.MaxSize + off = off - (numOver * fInfo.Opts.MaxSize) + } + if off > fInfo.Size { + return 0, fmt.Errorf("Read At error: tried to read past the end of the file") + } + endReadPos := math.Min(float64(int64(len(*p))+off), float64(fInfo.Size)) + bytesToRead := int64(endReadPos) - off + curCacheNum := int(math.Floor(float64(off) / float64(MaxBlockSize))) + numCaches := int(math.Ceil(float64(bytesToRead) / float64(MaxBlockSize))) + cacheOffset := off - (int64(curCacheNum) * MaxBlockSize) + if (cacheOffset + int64(bytesToRead)) > MaxBlockSize { + numCaches += 1 + } + for index := curCacheNum; index < curCacheNum+numCaches; index++ { + curCacheBlock, err := GetCacheBlock(ctx, blockId, name, index, true) + if err != nil { + return bytesRead, fmt.Errorf("Error getting cache block: %v", err) + } + cacheOffset := off - (int64(index) * MaxBlockSize) + if cacheOffset < 0 { + return bytesRead, nil + } + bytesToReadFromCurCache := int(math.Min(float64(bytesToRead), float64(MaxBlockSize-cacheOffset))) + fileMaxSize := fInfo.Opts.MaxSize + maxReadSize := fileMaxSize - (int64(index) * MaxBlockSize) + b, err := ReadFromCacheBlock(ctx, blockId, name, curCacheBlock, p, int(cacheOffset), bytesToReadFromCurCache, bytesRead, maxReadSize) + if b == 0 { + log.Printf("something wrong %v %v %v %v %v %v %v %v", index, off, cacheOffset, curCacheNum, numCaches, bytesRead, bytesToRead, curCacheBlock) + cacheEntry, _ := GetCacheEntry(ctx, blockId, name) + blockSize, numNil := GetAllBlockSizes(cacheEntry.DataBlocks) + maybeDBSize := int64(numNil) * MaxBlockSize + maybeFullSize := int64(blockSize) + maybeDBSize + log.Printf("block actual sizes: %v %v %v %v %v\n", blockSize, numNil, maybeDBSize, maybeFullSize, len(cacheEntry.DataBlocks)) + } + bytesRead += b + bytesToRead -= int64(b) + off += int64(b) + + if err != nil { + if err.Error() == MaxSizeError { + if fInfo.Opts.Circular { + off = 0 + newP := (*p)[b:] + b, err := ReadAt(ctx, blockId, name, &newP, off) + bytesRead += b + if err != nil { + return bytesRead, err + } + break + } + } else { + return bytesRead, fmt.Errorf("Read from cache error: %v", err) + } + } + } + return bytesRead, nil +} + +func AppendData(ctx context.Context, blockId string, name string, p []byte) (int, error) { + appendLock.Lock() + defer appendLock.Unlock() + fInfo, err := Stat(ctx, blockId, name) + if err != nil { + return 0, fmt.Errorf("Append stat error: %v", err) + } + return WriteAt(ctx, blockId, name, p, fInfo.Size) +} 
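+
+// exampleBlockFileRoundTrip is an illustrative sketch of the intended call
+// sequence (the helper itself is hypothetical and not referenced elsewhere):
+// MakeFile creates the metadata row, AppendData writes through the cache,
+// ReadAt reads the bytes back, and FlushCache persists dirty blocks to sqlite.
+func exampleBlockFileRoundTrip(ctx context.Context, blockId string) error {
+	opts := FileOptsType{MaxSize: MaxBlockSize * 2, Circular: false, IJson: false}
+	if err := MakeFile(ctx, blockId, "example-file", FileMeta{"purpose": "demo"}, opts); err != nil {
+		return err
+	}
+	if _, err := AppendData(ctx, blockId, "example-file", []byte("hello")); err != nil {
+		return err
+	}
+	readBuf := make([]byte, 5)
+	if _, err := ReadAt(ctx, blockId, "example-file", &readBuf, 0); err != nil {
+		return err
+	}
+	return FlushCache(ctx)
+}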
+
+func DeleteFile(ctx context.Context, blockId string, name string) error {
+	DeleteCacheEntry(ctx, blockId, name)
+	err := DeleteFileFromDB(ctx, blockId, name)
+	return err
+}
+
+func DeleteBlock(ctx context.Context, blockId string) error {
+	for cacheId := range cache {
+		curBlockId, name := GetValuesFromCacheId(cacheId)
+		if curBlockId == blockId {
+			err := DeleteFile(ctx, blockId, name)
+			if err != nil {
+				return fmt.Errorf("Error deleting %v %v: %v", blockId, name, err)
+			}
+		}
+	}
+	err := DeleteBlockFromDB(ctx, blockId)
+	return err
+}
+
+func WriteFile(ctx context.Context, blockId string, name string, meta FileMeta, opts FileOptsType, data []byte) (int, error) {
+	err := MakeFile(ctx, blockId, name, meta, opts)
+	if err != nil {
+		return 0, err
+	}
+	return AppendData(ctx, blockId, name, data)
+}
+
+func WriteMeta(ctx context.Context, blockId string, name string, meta FileMeta) error {
+	// stat first so that we can make sure the cache entry is populated
+	_, err := Stat(ctx, blockId, name)
+	if err != nil {
+		return err
+	}
+	cacheEntry, found := GetCacheEntry(ctx, blockId, name)
+	if !found {
+		return fmt.Errorf("WriteMeta error: cache entry not found")
+	}
+	cacheEntry.Lock.Lock()
+	defer cacheEntry.Lock.Unlock()
+	cacheEntry.Info.Meta = meta
+	return nil
+}
+
+func ListFiles(ctx context.Context, blockId string) []*FileInfo {
+	fInfoArr, err := GetAllFilesInDBForBlockId(ctx, blockId)
+	if err != nil {
+		return nil
+	}
+	return fInfoArr
+}
+
+func ListAllFiles(ctx context.Context) []*FileInfo {
+	fInfoArr, err := GetAllFilesInDB(ctx)
+	if err != nil {
+		return nil
+	}
+	return fInfoArr
+}
+
+func GetAllBlockIds(ctx context.Context) []string {
+	rtn, err := GetAllBlockIdsInDB(ctx)
+	if err != nil {
+		return nil
+	}
+	return rtn
+}
diff --git a/wavesrv/pkg/blockstore/blockstore_dbops.go b/wavesrv/pkg/blockstore/blockstore_dbops.go
new file mode 100644
index 000000000..0eefa247d
--- /dev/null
+++ b/wavesrv/pkg/blockstore/blockstore_dbops.go
@@ -0,0 +1,242 @@
+package blockstore
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+	"path"
+	"sync"
+
+	"github.com/jmoiron/sqlx"
+	_ "github.com/mattn/go-sqlite3"
+	"github.com/sawka/txwrap"
+	"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
+	"github.com/wavetermdev/waveterm/wavesrv/pkg/scbase"
+)
+
+const DBFileName = "blockstore.db"
+
+type SingleConnDBGetter struct {
+	SingleConnLock *sync.Mutex
+}
+
+var dbWrap *SingleConnDBGetter
+
+type TxWrap = txwrap.TxWrap
+
+func InitDBState() {
+	dbWrap = &SingleConnDBGetter{SingleConnLock: &sync.Mutex{}}
+}
+
+func (dbg *SingleConnDBGetter) GetDB(ctx context.Context) (*sqlx.DB, error) {
+	db, err := GetDB(ctx)
+	if err != nil {
+		return nil, err
+	}
+	dbg.SingleConnLock.Lock()
+	return db, nil
+}
+
+func (dbg *SingleConnDBGetter) ReleaseDB(db *sqlx.DB) {
+	dbg.SingleConnLock.Unlock()
+}
+
+func WithTx(ctx context.Context, fn func(tx *TxWrap) error) error {
+	return txwrap.DBGWithTx(ctx, dbWrap, fn)
+}
+
+func WithTxRtn[RT any](ctx context.Context, fn func(tx *TxWrap) (RT, error)) (RT, error) {
+	var rtn RT
+	txErr := WithTx(ctx, func(tx *TxWrap) error {
+		temp, err := fn(tx)
+		if err != nil {
+			return err
+		}
+		rtn = temp
+		return nil
+	})
+	return rtn, txErr
+}
+
+var globalDBLock = &sync.Mutex{}
+var globalDB *sqlx.DB
+var globalDBErr error
+
+func GetDBName() string {
+	scHome := scbase.GetWaveHomeDir()
+	return path.Join(scHome, DBFileName)
+}
+
+func GetDB(ctx context.Context) (*sqlx.DB, error) {
+	if txwrap.IsTxWrapContext(ctx) {
+		return nil, fmt.Errorf("cannot call GetDB from within a running transaction")
+	}
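+	// Lazily open a single shared *sqlx.DB under globalDBLock; subsequent
+	// calls return the cached handle (or the cached open error).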
+	globalDBLock.Lock()
+	defer globalDBLock.Unlock()
+	if globalDB == nil && globalDBErr == nil {
+		dbName := GetDBName()
+		globalDB, globalDBErr = sqlx.Open("sqlite3", fmt.Sprintf("file:%s?cache=shared&mode=rwc&_journal_mode=WAL&_busy_timeout=5000", dbName))
+		if globalDBErr != nil {
+			globalDBErr = fmt.Errorf("opening db[%s]: %w", dbName, globalDBErr)
+			log.Printf("[db] error: %v\n", globalDBErr)
+		} else {
+			log.Printf("[db] successfully opened db %s\n", dbName)
+		}
+	}
+	return globalDB, globalDBErr
+}
+
+func CloseDB() {
+	globalDBLock.Lock()
+	defer globalDBLock.Unlock()
+	if globalDB == nil {
+		return
+	}
+	err := globalDB.Close()
+	if err != nil {
+		log.Printf("[db] error closing database: %v\n", err)
+	}
+	globalDB = nil
+}
+
+func (f *FileInfo) ToMap() map[string]interface{} {
+	rtn := make(map[string]interface{})
+	log.Printf("fileInfo ToMap is unimplemented!")
+	return rtn
+}
+
+func (fInfo *FileInfo) FromMap(m map[string]interface{}) bool {
+	fileOpts := FileOptsType{}
+	dbutil.QuickSetBool(&fileOpts.Circular, m, "circular")
+	dbutil.QuickSetInt64(&fileOpts.MaxSize, m, "maxsize")
+
+	var metaJson []byte
+	dbutil.QuickSetBytes(&metaJson, m, "meta")
+	var fileMeta FileMeta
+	err := json.Unmarshal(metaJson, &fileMeta)
+	if err != nil {
+		return false
+	}
+	dbutil.QuickSetStr(&fInfo.BlockId, m, "blockid")
+	dbutil.QuickSetStr(&fInfo.Name, m, "name")
+	dbutil.QuickSetInt64(&fInfo.Size, m, "size")
+	dbutil.QuickSetInt64(&fInfo.CreatedTs, m, "createdts")
+	dbutil.QuickSetInt64(&fInfo.ModTs, m, "modts")
+	fInfo.Opts = fileOpts
+	fInfo.Meta = fileMeta
+	return true
+}
+
+func GetFileInfo(ctx context.Context, blockId string, name string) (*FileInfo, error) {
+	fInfoArr, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]*FileInfo, error) {
+		var rtn []*FileInfo
+		query := `SELECT * FROM block_file WHERE blockid = ? AND name = ?`
+		marr := tx.SelectMaps(query, blockId, name)
+		for _, m := range marr {
+			rtn = append(rtn, dbutil.FromMap[*FileInfo](m))
+		}
+		return rtn, nil
+	})
+	if txErr != nil {
+		return nil, fmt.Errorf("GetFileInfo database error: %v", txErr)
+	}
+	if len(fInfoArr) > 1 {
+		return nil, fmt.Errorf("GetFileInfo duplicate files in database")
+	}
+	if len(fInfoArr) == 0 {
+		return nil, fmt.Errorf("GetFileInfo: File not found")
+	}
+	fInfo := fInfoArr[0]
+	return fInfo, nil
+}
+
+func GetCacheFromDB(ctx context.Context, blockId string, name string, off int64, length int64, cacheNum int64) (*[]byte, error) {
+	return WithTxRtn(ctx, func(tx *TxWrap) (*[]byte, error) {
+		var cacheData *[]byte
+		query := `SELECT substr(data,?,?) FROM block_data WHERE blockid = ? AND name = ? and partidx = ?`
+		tx.Get(&cacheData, query, off, length+1, blockId, name, cacheNum)
+		if cacheData == nil {
+			cacheData = &[]byte{}
+		}
+		return cacheData, nil
+	})
+}
+
+func DeleteFileFromDB(ctx context.Context, blockId string, name string) error {
+	txErr := WithTx(ctx, func(tx *TxWrap) error {
+		query := `DELETE from block_file where blockid = ? AND name = ?`
+		tx.Exec(query, blockId, name)
+		return nil
+	})
+	if txErr != nil {
+		return txErr
+	}
+	txErr = WithTx(ctx, func(tx *TxWrap) error {
+		query := `DELETE from block_data where blockid = ? AND name = ?`
+		tx.Exec(query, blockId, name)
+		return nil
+	})
+	if txErr != nil {
+		return txErr
+	}
+	return nil
+}
+
+func DeleteBlockFromDB(ctx context.Context, blockId string) error {
+	txErr := WithTx(ctx, func(tx *TxWrap) error {
+		query := `DELETE from block_file where blockid = ?`
+		tx.Exec(query, blockId)
+		return nil
+	})
+	if txErr != nil {
+		return txErr
+	}
+	txErr = WithTx(ctx, func(tx *TxWrap) error {
+		query := `DELETE from block_data where blockid = ?`
+		tx.Exec(query, blockId)
+		return nil
+	})
+	if txErr != nil {
+		return txErr
+	}
+	return nil
+}
+
+func GetAllFilesInDBForBlockId(ctx context.Context, blockId string) ([]*FileInfo, error) {
+	return WithTxRtn(ctx, func(tx *TxWrap) ([]*FileInfo, error) {
+		var rtn []*FileInfo
+		query := `SELECT * FROM block_file where blockid = ?`
+		marr := tx.SelectMaps(query, blockId)
+		for _, m := range marr {
+			rtn = append(rtn, dbutil.FromMap[*FileInfo](m))
+		}
+		return rtn, nil
+	})
+}
+
+func GetAllFilesInDB(ctx context.Context) ([]*FileInfo, error) {
+	return WithTxRtn(ctx, func(tx *TxWrap) ([]*FileInfo, error) {
+		var rtn []*FileInfo
+		query := `SELECT * FROM block_file`
+		marr := tx.SelectMaps(query)
+		for _, m := range marr {
+			rtn = append(rtn, dbutil.FromMap[*FileInfo](m))
+		}
+		return rtn, nil
+	})
+}
+
+func GetAllBlockIdsInDB(ctx context.Context) ([]string, error) {
+	return WithTxRtn(ctx, func(tx *TxWrap) ([]string, error) {
+		var rtn []string
+		query := `SELECT DISTINCT blockid FROM block_file`
+		marr := tx.SelectMaps(query)
+		for _, m := range marr {
+			var blockId string
+			dbutil.QuickSetStr(&blockId, m, "blockid")
+			rtn = append(rtn, blockId)
+		}
+		return rtn, nil
+	})
+}
diff --git a/wavesrv/pkg/blockstore/blockstore_test.go b/wavesrv/pkg/blockstore/blockstore_test.go
new file mode 100644
index 000000000..5d539bced
--- /dev/null
+++ b/wavesrv/pkg/blockstore/blockstore_test.go
@@ -0,0 +1,1077 @@
+package blockstore
+
+import (
+	"bytes"
+	"context"
+	"crypto/md5"
+	"crypto/rand"
+	"log"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/alecthomas/units"
+
+	"github.com/wavetermdev/waveterm/wavesrv/pkg/dbutil"
+)
+
+type TestBlockType struct {
+	BlockId string
+	Name    string
+	Partidx int
+	Data    []byte
+}
+
+func (b *TestBlockType) ToMap() map[string]interface{} {
+	rtn := make(map[string]interface{})
+	return rtn
+}
+
+func (b *TestBlockType) FromMap(m map[string]interface{}) bool {
+	dbutil.QuickSetStr(&b.BlockId, m, "blockid")
+	dbutil.QuickSetStr(&b.Name, m, "name")
+	dbutil.QuickSetInt(&b.Partidx, m, "partidx")
+	dbutil.QuickSetBytes(&b.Data, m, "data")
+	return true
+}
+
+func Cleanup(t *testing.T, ctx context.Context) {
+	DeleteBlock(ctx, "test-block-id")
+}
+
+func CleanupName(t *testing.T, ctx context.Context, blockId string) {
+	DeleteBlock(ctx, blockId)
+}
+
+func TestGetDB(t *testing.T) {
+	GetDBTimeout := 10 * time.Second
+	ctx, cancelFn := context.WithTimeout(context.Background(), GetDBTimeout)
+	defer cancelFn()
+	_, err := GetDB(ctx)
+	if err != nil {
+		t.Errorf("TestInitDB error: %v", err)
+	}
+	CloseDB()
+}
+
+func SimpleAssert(t *testing.T, condition bool, description string) {
+	if !condition {
+		t.Errorf("Simple Assert <%s> Failed", description)
+	} else {
+		log.Printf("Simple Assert <%s> Passed", description)
+	}
+}
+
+func SimpleFatalAssert(t *testing.T, condition bool, description string) {
+	if !condition {
+		t.Fatalf("Simple Assert <%s> Failed", description)
+	} else {
+		log.Printf("Simple Assert <%s> Passed", description)
+	}
+}
+
+func InsertIntoBlockData(t *testing.T, ctx context.Context, blockId string, name string,
partidx int, data []byte) { + txErr := WithTx(ctx, func(tx *TxWrap) error { + query := `INSERT into block_data values (?, ?, ?, ?)` + tx.Exec(query, blockId, name, partidx, data) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error inserting into block_data table: %v", txErr) + } +} + +func TestTx(t *testing.T) { + ctx := context.Background() + SetFlushTimeout(2 * time.Minute) + InitDBState() + txErr := WithTx(ctx, func(tx *TxWrap) error { + query := `INSERT into block_data values ('test-block-id', 'test-file-name', 0, 256)` + tx.Exec(query) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error inserting into block_data table: %v", txErr) + } + txErr = WithTx(ctx, func(tx *TxWrap) error { + query := `INSERT into block_data values (?, ?, ?, ?)` + tx.Exec(query, "test-block-id", "test-file-name-2", 1, []byte{110, 200, 50, 45}) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error inserting into block_data table: %v", txErr) + } + block_data, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]*TestBlockType, error) { + var rtn []*TestBlockType + query := `SELECT * FROM block_data where blockid = 'test-block-id'` + marr := tx.SelectMaps(query) + for _, m := range marr { + rtn = append(rtn, dbutil.FromMap[*TestBlockType](m)) + } + return rtn, nil + }) + if txErr != nil { + t.Errorf("TestTx error getting block data: %v", txErr) + } + SimpleAssert(t, len(block_data) == 2, "select-num-entries") + log.Printf("Block Data: ") + log.Printf("%v", block_data[0]) + log.Printf("%v", block_data[1]) + SimpleAssert(t, block_data[0].Name == "test-file-name", "first-entry-name-correct") + SimpleAssert(t, len(block_data[1].Data) == 4, "blob-data-correct-length") + txErr = WithTx(ctx, func(tx *TxWrap) error { + query := `DELETE from block_data where blockid = 'test-block-id'` + tx.Exec(query) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error deleting test entries: %v", txErr) + } + CloseDB() +} +func TestMultipleChunks(t *testing.T) { + ctx := context.Background() + InitDBState() + InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 0, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 1, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 2, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 3, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-1", 4, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-2", 0, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-2", 1, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-2", 2, make([]byte, 5)) + InsertIntoBlockData(t, ctx, "test-block-id", "file-2", 3, make([]byte, 5)) + data, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]*TestBlockType, error) { + var rtn []*TestBlockType + query := `SELECT * FROM block_data where name = 'file-1'` + marr := tx.SelectMaps(query) + for _, m := range marr { + rtn = append(rtn, dbutil.FromMap[*TestBlockType](m)) + } + return rtn, nil + }) + if txErr != nil { + t.Errorf("TestMultipleChunks error getting chunks from file-1 %v", txErr) + } + SimpleAssert(t, len(data) == 5, "file-1 num parts == 5") + data, txErr = WithTxRtn(ctx, func(tx *TxWrap) ([]*TestBlockType, error) { + var rtn []*TestBlockType + query := `SELECT * FROM block_data where name = 'file-2'` + marr := tx.SelectMaps(query) + for _, m := range marr { + rtn = append(rtn, dbutil.FromMap[*TestBlockType](m)) + } + return rtn, nil + }) + if txErr != nil { + t.Errorf("TestMultipleChunks error getting 
chunks from file-2 %v", txErr) + } + SimpleAssert(t, len(data) == 4, "file-2 num parts == 4") + txErr = WithTx(ctx, func(tx *TxWrap) error { + query := `DELETE from block_data where blockid = 'test-block-id'` + tx.Exec(query) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error deleting test entries: %v", txErr) + } +} + +func TestMakeFile(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: 0, Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + data, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]*FileInfo, error) { + var rtn []*FileInfo + query := `SELECT * FROM block_file WHERE name = 'file-1'` + marr := tx.SelectMaps(query) + for _, m := range marr { + rtn = append(rtn, dbutil.FromMap[*FileInfo](m)) + } + return rtn, nil + }) + if txErr != nil { + t.Errorf("TestMakeFile err getting file-1 info %v", txErr) + } + log.Printf("data: %v", data) + SimpleAssert(t, len(data) == 1, "no duplicate files") + curFileInfo := data[0] + log.Printf("cur file info: %v", curFileInfo) + SimpleAssert(t, curFileInfo.Name == "file-1", "correct file name") + SimpleAssert(t, curFileInfo.Meta["test-descriptor"] == true, "meta correct") + curCacheEntry := cache[GetCacheId("test-block-id", "file-1")] + curFileInfo = curCacheEntry.Info + log.Printf("cache entry: %v", curCacheEntry) + SimpleAssert(t, curFileInfo.Name == "file-1", "cache correct file name") + SimpleAssert(t, curFileInfo.Meta["test-descriptor"] == true, "cache meta correct") + txErr = WithTx(ctx, func(tx *TxWrap) error { + query := `DELETE from block_file where blockid = 'test-block-id'` + tx.Exec(query) + return nil + }) + if txErr != nil { + t.Errorf("TestTx error deleting test entries: %v", txErr) + } + Cleanup(t, ctx) +} + +func TestWriteAt(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + log.Printf("Max Block Size: %v", MaxBlockSize) + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + cacheData, err := GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0) + if err != nil { + t.Errorf("Write At error: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + cacheData, err = GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == len(testBytesToWrite), "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err := Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct 
fInfo size") + bytesWritten, err = WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, int64(bytesWritten)) + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == (2*len(testBytesToWrite)), "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err = Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + testBytesToWrite = []byte{'B', 'E', 'S', 'T'} + bytesWritten, err = WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0) + if err != nil { + t.Errorf("Write At error: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + cacheData, err = GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == 22, "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err = Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + bytesWritten, err = WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 11) + if err != nil { + t.Errorf("Write At error: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + cacheData, err = GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == 22, "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err = Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + Cleanup(t, ctx) +} + +func TestWriteAtLeftPad(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + log.Printf("Max Block Size: %v", MaxBlockSize) + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 11) + if err != nil { + t.Errorf("Write At error: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + log.Printf("LEFT PAD bytes written: %v\n", bytesWritten) + SimpleAssert(t, bytesWritten == 11, "Correct num bytes written") + cacheData, err := GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { 
+ t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == 22, "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err := Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v %v %v", fInfo, fInfo.Size, len(cacheData.data)) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + Cleanup(t, ctx) +} + +func TestReadAt(t *testing.T) { + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + log.Printf("Max Block Size: %v", MaxBlockSize) + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0) + if err != nil { + t.Errorf("Write Aterror: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + cacheData, err := GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == len(testBytesToWrite), "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err := Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + + var read []byte = make([]byte, 16) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &read, 0) + if err != nil { + t.Errorf("Read error: %v", err) + } + SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read") + log.Printf("bytes read: %v string: %s", read, string(read)) + + read = make([]byte, 16) + bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &read, 4) + if err != nil { + t.Errorf("Read error: %v", err) + } + SimpleAssert(t, bytesRead == (11-4), "Correct num bytes read") + log.Printf("bytes read: %v string: %s", read, string(read)) + Cleanup(t, ctx) +} + +func TestFlushCache(t *testing.T) { + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + log.Printf("Max Block Size: %v", MaxBlockSize) + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0) + if err != nil { + t.Errorf("Write At error: %v", err) + } else { + log.Printf("Write at no errors: %v", bytesWritten) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written") + cacheData, err := GetCacheBlock(ctx, "test-block-id", "file-1", 0, false) + if err != nil { + t.Errorf("Error getting cache: %v", err) + } + log.Printf("Cache data received: %v str: %s", 
cacheData, string(cacheData.data)) + SimpleAssert(t, len(cacheData.data) == len(testBytesToWrite), "Correct num bytes received") + SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size") + fInfo, err := Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("Stat Error: %v", err) + } + log.Printf("Got stat: %v", fInfo) + SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size") + + FlushCache(ctx) + + var read []byte = make([]byte, 32) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &read, 0) + if err != nil { + t.Errorf("Read error: %v", err) + } + SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read") + log.Printf("bytes read: %v string: %s", read, string(read)) + + read = make([]byte, 32) + bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &read, 4) + if err != nil { + t.Errorf("Read error: %v", err) + } + SimpleAssert(t, bytesRead == (11-4), "Correct num bytes read") + log.Printf("bytes read: %v string: %s", read, string(read)) + dbData, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]byte, error) { + var cacheData *[]byte = &[]byte{} + query := `SELECT data from block_data where blockid = 'test-block-id' and name = 'file-1'` + tx.Get(&cacheData, query) + return *cacheData, nil + }) + if txErr != nil { + t.Errorf("get data from db error: %v", txErr) + } + log.Printf("DB Data: %v", dbData) + Cleanup(t, ctx) +} + +var largeDataFlushFullWriteSize int64 = int64(1024 * units.Megabyte) + +func WriteLargeDataFlush(t *testing.T, ctx context.Context) { + writeSize := int64(64 - 16) + fullWriteSize := largeDataFlushFullWriteSize + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + writeIndex := int64(0) + writeBuf := make([]byte, writeSize) + numWrites := fullWriteSize / writeSize + hashBuf := make([]byte, 16) + for i := 0; i < int(numWrites); i++ { + rand.Read(writeBuf) + hash := md5.New() + _, err := hash.Write(hashBuf) + if err != nil { + t.Errorf("hashing hashbuf error: %v", err) + } + _, err = hash.Write(writeBuf) + if err != nil { + t.Errorf("hashing writebuf error: %v", err) + } + copy(hashBuf, hash.Sum(nil)) + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", writeBuf, writeIndex) + if err != nil { + log.Printf("error: %v", err) + t.Errorf("Write At error: %v\n", err) + } + writeIndex += int64(bytesWritten) + } + log.Printf("final hash: %v writeBuf: %v bytesWritten: %v", hashBuf, writeBuf, writeIndex) + + FlushCache(ctx) + + readBuf := make([]byte, writeSize) + readHashBuf := make([]byte, 16) + readIndex := int64(0) + for i := 0; i < int(numWrites); i++ { + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, readIndex) + readIndex += int64(bytesRead) + hash := md5.New() + _, err = hash.Write(readHashBuf) + if err != nil { + t.Errorf("hashing hashbuf error: %v", err) + } + _, err = hash.Write(readBuf) + if err != nil { + t.Errorf("hashing readbuf error: %v", err) + } + copy(readHashBuf, hash.Sum(nil)) + } + log.Printf("final hash: %v readBuf: %v, bytesRead: %v", readHashBuf, readBuf, readIndex) + SimpleAssert(t, bytes.Equal(readHashBuf, hashBuf), "hashes are equal") +} +func TestWriteAtMaxSize(t *testing.T) { + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(4), 
Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0) + if err != nil { + t.Errorf("Write at error: %v", err) + } + SimpleAssert(t, bytesWritten == 4, "Correct num bytes written") + readTest := []byte{'T', 'E', 'S', 'T'} + readBuf := make([]byte, len(testBytesToWrite)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, 0) + log.Printf("readbuf: %v\n", readBuf) + SimpleAssert(t, bytesRead == 4, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read") + Cleanup(t, ctx) +} + +func TestWriteAtMaxSizeMultipleBlocks(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(MaxBlockSize * 2), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, (MaxBlockSize*2)-4) + if err != nil { + t.Errorf("Write at error: %v", err) + } + SimpleAssert(t, bytesWritten == 4, "Correct num bytes written") + readTest := []byte{'T', 'E', 'S', 'T'} + readBuf := make([]byte, len(testBytesToWrite)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, (MaxBlockSize*2)-4) + log.Printf("readbuf multiple: %v %v %v\n", readBuf, bytesRead, bytesWritten) + SimpleAssert(t, bytesRead == 4, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read") + Cleanup(t, ctx) +} + +func TestWriteAtCircular(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(MaxBlockSize * 2), Circular: true, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, (MaxBlockSize*2)-4) + if err != nil { + t.Errorf("Write at error: %v", err) + } + SimpleAssert(t, bytesWritten == 11, "Correct num bytes written") + + readTest := []byte{'T', 'E', 'S', 'T'} + readBuf := make([]byte, len(testBytesToWrite)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, (MaxBlockSize*2)-4) + SimpleAssert(t, bytesRead == 11, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read") + SimpleAssert(t, bytes.Equal(readBuf, testBytesToWrite), "Correct bytes read") + log.Printf("readbuf circular %v %v %v", readBuf, string(readBuf), bytesRead) + + readTest = []byte{'M', 'E', 'S', 'S', 'A', 'G', 'E'} + readBuf = make([]byte, len(testBytesToWrite)) + bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &readBuf, 0) + SimpleAssert(t, bytesRead == 7, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:7], readTest), "Correct bytes read") + log.Printf("readbuf circular %v %v, %v", readBuf, string(readBuf), bytesRead) + Cleanup(t, ctx) +} + +func TestWriteAtCircularWierdOffset(t 
*testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileSize := MaxBlockSize*2 - 500 + fileOpts := FileOptsType{MaxSize: int64(fileSize), Circular: true, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + log.Printf("first mk") + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, (fileSize)-4) + log.Printf("end mk") + if err != nil { + t.Errorf("Write at error: %v", err) + } + SimpleAssert(t, bytesWritten == 11, "Correct num bytes written") + + readTest := []byte{'T', 'E', 'S', 'T'} + readBuf := make([]byte, len(testBytesToWrite)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, (fileSize)-4) + if err != nil { + t.Errorf("Read at error: %v", err) + } + SimpleAssert(t, bytesRead == 11, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:4], readTest), "Correct bytes read") + SimpleAssert(t, bytes.Equal(readBuf, testBytesToWrite), "Correct bytes read") + log.Printf("readbuf circular %v %v bytesRead: %v", readBuf, string(readBuf), bytesRead) + + readTest = []byte{'M', 'E', 'S', 'S', 'A', 'G', 'E'} + readBuf = make([]byte, len(testBytesToWrite)) + bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &readBuf, 0) + if err != nil { + t.Errorf("Read at error: %v", err) + } + SimpleAssert(t, bytesRead == 7, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf[:7], readTest), "Correct bytes read") + log.Printf("readbuf circular %v %v, %v", readBuf, string(readBuf), bytesRead) + Cleanup(t, ctx) +} + +func TestAppend(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileSize := MaxBlockSize*2 - 500 + fileOpts := FileOptsType{MaxSize: int64(fileSize), Circular: true, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + testAppendBytes1 := []byte{'T', 'E', 'S', 'T'} + log.Printf("append mk1\n") + bytesWritten, err := AppendData(ctx, "test-block-id", "file-1", testAppendBytes1) + if err != nil { + t.Errorf("Append Error: %v", err) + } + log.Printf("append mk2\n") + SimpleAssert(t, bytesWritten == len(testAppendBytes1), "Correct num bytes written") + readBuf := make([]byte, len(testAppendBytes1)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, 0) + log.Printf("read buf : %v", string(readBuf)) + if err != nil { + t.Errorf("Read Error: %v", err) + } + SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf, testAppendBytes1), "Correct bytes read") + testAppendBytes2 := []byte{'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err = AppendData(ctx, "test-block-id", "file-1", testAppendBytes2) + if err != nil { + t.Errorf("Append Error: %v", err) + } + SimpleAssert(t, bytesWritten == len(testAppendBytes2), "Correct num bytes written") + readTestBytes := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + readBuf = make([]byte, len(readTestBytes)) + bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &readBuf, 0) + log.Printf("read buf : %v", string(readBuf)) + if err != nil { + t.Errorf("Read Error: %v", err) + } + SimpleAssert(t, bytesRead == bytesWritten+4, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf, 
readTestBytes), "Correct bytes read") + Cleanup(t, ctx) +} + +func AppendSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + writeBuf := make([]byte, 1) + rand.Read(writeBuf) + bytesWritten, err := AppendData(ctx, "test-block-id-sync", "file-1", writeBuf) + if err != nil { + t.Errorf("Worker append err: %v", err) + } + SimpleAssert(t, bytesWritten == 1, "Correct bytes written") +} +func TestAppendSync(t *testing.T) { + InitDBState() + var wg sync.WaitGroup + numWorkers := 10 + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id-sync", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + FlushCache(ctx) + for index := 0; index < numWorkers; index++ { + wg.Add(1) + go AppendSyncWorker(t, ctx, &wg) + } + wg.Wait() + readBuf := make([]byte, numWorkers) + bytesRead, err := ReadAt(ctx, "test-block-id-sync", "file-1", &readBuf, 0) + if err != nil { + t.Errorf("Read Error: %v", err) + } + log.Printf("read buf : %v", readBuf) + SimpleAssert(t, bytesRead == numWorkers, "Correct bytes read") + CleanupName(t, ctx, "test-block-id-sync") +} + +func TestAppendSyncMultiple(t *testing.T) { + numTests := 100 + for index := 0; index < numTests; index++ { + TestAppendSync(t) + log.Printf("finished test: %v", index) + } +} + +func WriteAtSyncWorker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, index int64) { + defer wg.Done() + writeBuf := make([]byte, 1) + rand.Read(writeBuf) + bytesWritten, err := WriteAt(ctx, "test-block-id-sync", "file-1", writeBuf, index) + if err != nil { + t.Errorf("Worker append err: %v", err) + } + log.Printf("worker bytes written: %v %v", bytesWritten, index) + SimpleAssert(t, bytesWritten == 1 || bytesWritten == int(index+1), "Correct bytes written") +} + +func TestWriteAtSync(t *testing.T) { + InitDBState() + var wg sync.WaitGroup + numWorkers := 10 + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id-sync", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + FlushCache(ctx) + for index := 0; index < numWorkers; index++ { + wg.Add(1) + go WriteAtSyncWorker(t, ctx, &wg, int64(index)) + } + wg.Wait() + readBuf := make([]byte, numWorkers) + bytesRead, err := ReadAt(ctx, "test-block-id-sync", "file-1", &readBuf, 0) + if err != nil { + t.Errorf("Read Error: %v", err) + } + log.Printf("read buf : %v", readBuf) + SimpleAssert(t, bytesRead == numWorkers, "Correct num bytes read") + CleanupName(t, ctx, "test-block-id-sync") +} + +func TestWriteAtSyncMultiple(t *testing.T) { + numTests := 100 + for index := 0; index < numTests; index++ { + TestWriteAtSync(t) + } +} + +func TestWriteFile(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'} + bytesWritten, err := WriteFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts, testBytesToWrite) + if err != nil { + t.Errorf("write at error: %v", err) + } + SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes 
written") + var read []byte = make([]byte, len(testBytesToWrite)) + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &read, 0) + if err != nil { + t.Errorf("Read error: %v", err) + } + SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read") + log.Printf("bytes read: %v string: %s", read, string(read)) + SimpleAssert(t, bytes.Equal(read, testBytesToWrite), "Correct bytes read") + Cleanup(t, ctx) +} + +func TestWriteMeta(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + fInfo, err := Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("stat error: %v", err) + } + SimpleAssert(t, fInfo.Meta["test-descriptor"] == true, "Retrieved meta correctly") + fInfo.Meta["second-test-descriptor"] = "test1" + fInfo, err = Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("stat error: %v", err) + } + log.Printf("meta: %v", fInfo.Meta) + SimpleAssert(t, fInfo.Meta["second-test-descriptor"] != "test1", "Stat returned deep copy") + fInfo.Meta["second-test-descriptor"] = "test1" + err = WriteMeta(ctx, "test-block-id", "file-1", fInfo.Meta) + if err != nil { + t.Errorf("write meta error: %v", err) + } + fInfo, err = Stat(ctx, "test-block-id", "file-1") + if err != nil { + t.Errorf("stat error: %v", err) + } + log.Printf("meta: %v", fInfo.Meta) + SimpleAssert(t, fInfo.Meta["second-test-descriptor"] == "test1", "Retrieved second meta correctly") + Cleanup(t, ctx) +} + +func TestGetAllBlockIds(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-2", "file-1", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-2", "file-2", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-3", "file-2", fileMeta, fileOpts) + if err != nil { + t.Errorf("error making file: %v", err) + } + blockIds := GetAllBlockIds(ctx) + log.Printf("blockids: %v", blockIds) + testBlockIdArr := []string{"test-block-id", "test-block-id-2", "test-block-id-3"} + for idx, val := range blockIds { + SimpleAssert(t, testBlockIdArr[idx] == val, "Correct blockid value") + CleanupName(t, ctx, val) + } +} + +func TestListFiles(t *testing.T) { + InitDBState() + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-2", "file-1", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-2", "file-2", fileMeta, fileOpts) + err = MakeFile(ctx, "test-block-id-3", "file-2", fileMeta, fileOpts) + if err != nil { + t.Errorf("error making file: %v", err) + } + files := ListFiles(ctx, "test-block-id-2") + blockid_2_files := []string{"file-1", "file-2"} + log.Printf("files: %v", files) + for idx, val := range files { + SimpleAssert(t, val.Name == blockid_2_files[idx], "Correct file name") + } + blockid_1_files := []string{"file-1"} + files = ListFiles(ctx, "test-block-id") + log.Printf("files: %v", files) + 
+func TestListFiles(t *testing.T) {
+	InitDBState()
+	ctx := context.Background()
+	fileMeta := make(FileMeta)
+	fileMeta["test-descriptor"] = true
+	fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+	// create files across three blocks, failing fast on any error
+	for _, f := range []struct{ blockId, name string }{
+		{"test-block-id", "file-1"},
+		{"test-block-id-2", "file-1"},
+		{"test-block-id-2", "file-2"},
+		{"test-block-id-3", "file-2"},
+	} {
+		if err := MakeFile(ctx, f.blockId, f.name, fileMeta, fileOpts); err != nil {
+			t.Fatalf("error making file %s/%s: %v", f.blockId, f.name, err)
+		}
+	}
+	files := ListFiles(ctx, "test-block-id-2")
+	blockid_2_files := []string{"file-1", "file-2"}
+	log.Printf("files: %v", files)
+	for idx, val := range files {
+		SimpleAssert(t, val.Name == blockid_2_files[idx], "Correct file name")
+	}
+	blockid_1_files := []string{"file-1"}
+	files = ListFiles(ctx, "test-block-id")
+	log.Printf("files: %v", files)
+	for idx, val := range files {
+		SimpleAssert(t, val.Name == blockid_1_files[idx], "Correct file name")
+	}
+	CleanupName(t, ctx, "test-block-id")
+	CleanupName(t, ctx, "test-block-id-2")
+	CleanupName(t, ctx, "test-block-id-3")
+}
+
+func TestFlushTimer(t *testing.T) {
+	testFlushTimeout := 10 * time.Second
+	SetFlushTimeout(testFlushTimeout)
+	InitDBState()
+	ctx := context.Background()
+	fileMeta := make(FileMeta)
+	fileMeta["test-descriptor"] = true
+	fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false}
+	err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts)
+	if err != nil {
+		t.Fatalf("MakeFile error: %v", err)
+	}
+	log.Printf("Max Block Size: %v", MaxBlockSize)
+	testBytesToWrite := []byte{'T', 'E', 'S', 'T', 'M', 'E', 'S', 'S', 'A', 'G', 'E'}
+	bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", testBytesToWrite, 0)
+	if err != nil {
+		t.Errorf("Write At error: %v", err)
+	} else {
+		log.Printf("Write at no errors: %v", bytesWritten)
+	}
+	SimpleAssert(t, bytesWritten == len(testBytesToWrite), "Correct num bytes written")
+	cacheData, err := GetCacheBlock(ctx, "test-block-id", "file-1", 0, false)
+	if err != nil {
+		t.Errorf("Error getting cache: %v", err)
+	}
+	log.Printf("Cache data received: %v str: %s", cacheData, string(cacheData.data))
+	SimpleAssert(t, len(cacheData.data) == len(testBytesToWrite), "Correct num bytes received")
+	SimpleAssert(t, len(cacheData.data) == cacheData.size, "Correct cache size")
+	fInfo, err := Stat(ctx, "test-block-id", "file-1")
+	if err != nil {
+		t.Errorf("Stat Error: %v", err)
+	}
+	log.Printf("Got stat: %v", fInfo)
+	SimpleAssert(t, int64(len(cacheData.data)) == fInfo.Size, "Correct fInfo size")
+	time.Sleep(testFlushTimeout)
+	var read []byte = make([]byte, 32)
+	bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &read, 0)
+	if err != nil {
+		t.Errorf("Read error: %v", err)
+	}
+	SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read")
+	log.Printf("bytes read: %v string: %s", read, string(read))
+
+	read = make([]byte, 32)
+	bytesRead, err = ReadAt(ctx, "test-block-id", "file-1", &read, 4)
+	if err != nil {
+		t.Errorf("Read error: %v", err)
+	}
+	SimpleAssert(t, bytesRead == (11-4), "Correct num bytes read")
+	log.Printf("bytes read: %v string: %s", read, string(read))
+	dbData, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]byte, error) {
+		var cacheData *[]byte = &[]byte{}
+		query := `SELECT data from block_data where blockid = 'test-block-id' and name = 'file-1'`
+		tx.Get(&cacheData, query)
+		return *cacheData, nil
+	})
+	if txErr != nil {
+		t.Errorf("get data from db error: %v", txErr)
+	}
+	log.Printf("DB Data: %v", dbData)
+	Cleanup(t, ctx)
+}
+
+func TestFlushTimerMultiple(t *testing.T) {
+	testFlushTimeout := 1 * time.Second
+	SetFlushTimeout(testFlushTimeout)
+	numTests := 10
+	for index := 0; index < numTests; index++ {
+		TestWriteAt(t)
+		time.Sleep(500 * time.Millisecond)
+	}
+}
+
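+// Reviewer sketch (assumption about the on-disk layout, inferred from
+// MaxBlockSize and the partidx column in schema.sql): file offsets map to
+// fixed-size parts, so the write at MaxBlockSize+15 in TestWriteAtMiddle
+// below should land in the second block of the file:
+//
+//	partIdx := writeOff / MaxBlockSize // 1 for an offset of MaxBlockSize+15
+//	partOff := writeOff % MaxBlockSize // 15 bytes into that block
+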
"test-block-id", "file-1", &readBuf, writeOff) + log.Printf("readBuf: %v %v", readBuf, string(readBuf)) + SimpleAssert(t, bytesRead == bytesWritten, "Correct num bytes read") + SimpleAssert(t, bytes.Equal(readBuf, testBytesToWrite), "read correct bytes") + Cleanup(t, ctx) +} + +func TestWriteLargeDataFlush(t *testing.T) { + ctx := context.Background() + WriteLargeDataFlush(t, ctx) + Cleanup(t, ctx) +} + +func TestWriteLargeDataNoFlush(t *testing.T) { + InitDBState() + writeSize := int64(64 - 16) + fullWriteSize := int64(1024 * units.Megabyte) + ctx := context.Background() + fileMeta := make(FileMeta) + fileMeta["test-descriptor"] = true + fileOpts := FileOptsType{MaxSize: int64(5 * units.Gigabyte), Circular: false, IJson: false} + err := MakeFile(ctx, "test-block-id", "file-1", fileMeta, fileOpts) + if err != nil { + t.Fatalf("MakeFile error: %v", err) + } + writeIndex := int64(0) + writeBuf := make([]byte, writeSize) + numWrites := fullWriteSize / writeSize + hashBuf := make([]byte, 16) + for i := 0; i < int(numWrites); i++ { + rand.Read(writeBuf) + hash := md5.New() + _, err := hash.Write(hashBuf) + if err != nil { + t.Errorf("hashing hashbuf error: %v", err) + } + _, err = hash.Write(writeBuf) + if err != nil { + t.Errorf("hashing writebuf error: %v", err) + } + copy(hashBuf, hash.Sum(nil)) + bytesWritten, err := WriteAt(ctx, "test-block-id", "file-1", writeBuf, writeIndex) + if int64(bytesWritten) != writeSize { + log.Printf("write issue: %v %v \n", bytesWritten, writeSize) + } + if err != nil { + log.Printf("error: %v", err) + t.Errorf("Write At error: %v\n", err) + } + writeIndex += int64(bytesWritten) + } + log.Printf("final hash: %v writeBuf: %v bytesWritten: %v", hashBuf, writeBuf, writeIndex) + + readBuf := make([]byte, writeSize) + readHashBuf := make([]byte, 16) + readIndex := int64(0) + for i := 0; i < int(numWrites); i++ { + bytesRead, err := ReadAt(ctx, "test-block-id", "file-1", &readBuf, readIndex) + /*if int64(bytesRead) != writeSize { + log.Printf("read issue: %v %v \n", bytesRead, writeSize) + } */ + readIndex += int64(bytesRead) + hash := md5.New() + _, err = hash.Write(readHashBuf) + if err != nil { + t.Errorf("hashing hashbuf error: %v", err) + } + _, err = hash.Write(readBuf) + if err != nil { + t.Errorf("hashing readbuf error: %v", err) + } + copy(readHashBuf, hash.Sum(nil)) + } + log.Printf("final hash: %v readBuf: %v, bytesRead: %v", readHashBuf, readBuf, readIndex) + SimpleAssert(t, bytes.Equal(readHashBuf, hashBuf), "hashes are equal") + Cleanup(t, ctx) +} + +// saving this code for later +/* + + cacheData, txErr := WithTxRtn(ctx, func(tx *TxWrap) ([]byte, error) { + var cacheData *[]byte + query := `SELECT data from block_data where blockid = 'test-block-id' and name = 'file-1'` + log.Printf("mk2") + tx.Get(&cacheData, query) + log.Printf("mk3: %v", cacheData) + return *cacheData, nil + }) +*/ diff --git a/wavesrv/pkg/blockstore/schema.sql b/wavesrv/pkg/blockstore/schema.sql new file mode 100644 index 000000000..f947c8fad --- /dev/null +++ b/wavesrv/pkg/blockstore/schema.sql @@ -0,0 +1,21 @@ +CREATE TABLE schema_migrations (version uint64,dirty bool); +CREATE UNIQUE INDEX version_unique ON schema_migrations (version); +CREATE TABLE block_file ( + blockid varchar(36) NOT NULL, + name varchar(200) NOT NULL, + maxsize bigint NOT NULL, + circular boolean NOT NULL, + size bigint NOT NULL, + createdts bigint NOT NULL, + modts bigint NOT NULL, + meta json NOT NULL, + PRIMARY KEY (blockid, name) +); + +CREATE TABLE block_data ( + blockid varchar(36) NOT NULL, + name 
diff --git a/wavesrv/pkg/blockstore/schema.sql b/wavesrv/pkg/blockstore/schema.sql
new file mode 100644
index 000000000..f947c8fad
--- /dev/null
+++ b/wavesrv/pkg/blockstore/schema.sql
@@ -0,0 +1,21 @@
+CREATE TABLE schema_migrations (version uint64,dirty bool);
+CREATE UNIQUE INDEX version_unique ON schema_migrations (version);
+CREATE TABLE block_file (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    maxsize bigint NOT NULL,
+    circular boolean NOT NULL,
+    size bigint NOT NULL,
+    createdts bigint NOT NULL,
+    modts bigint NOT NULL,
+    meta json NOT NULL,
+    PRIMARY KEY (blockid, name)
+);
+
+CREATE TABLE block_data (
+    blockid varchar(36) NOT NULL,
+    name varchar(200) NOT NULL,
+    partidx int NOT NULL,
+    data blob NOT NULL,
+    PRIMARY KEY(blockid, name, partidx)
+);
\ No newline at end of file