mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-22 21:42:49 +01:00
circular file buffer. metadata in header. uses flock to synchronize access. write metadata before and after writing file data.
This commit is contained in:
parent
77bd1fa7bc
commit
38870f9c6e
412
pkg/cirfile/cirfile.go
Normal file
412
pkg/cirfile/cirfile.go
Normal file
@ -0,0 +1,412 @@
|
||||
package cirfile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CBUF[version] [maxsize] [fileoffset] [startpos] [endpos]
|
||||
const HeaderFmt = "CBUF%02d %19d %19d %19d %19d\n" // 87 bytes
|
||||
const HeaderLen = 256 // set to 256 for future expandability
|
||||
const FullHeaderFmt = "%-255s\n" // 256 bytes (255 + newline)
|
||||
const CurrentVersion = 1
|
||||
const FilePosEmpty = -1 // sentinel, if startpos is set to -1, file is empty
|
||||
|
||||
const InitialLockDelay = 10 * time.Millisecond
|
||||
const InitialLockTries = 5
|
||||
const LockDelay = 100 * time.Millisecond
|
||||
|
||||
type File struct {
|
||||
OSFile *os.File
|
||||
Version byte
|
||||
MaxSize int64
|
||||
FileOffset int64
|
||||
StartPos int64
|
||||
EndPos int64
|
||||
FileDataSize int64 // size of data (does not include header size)
|
||||
FlockStatus int
|
||||
}
|
||||
|
||||
func (f *File) flock(ctx context.Context, lockType int) error {
|
||||
err := syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB)
|
||||
if err == nil {
|
||||
f.FlockStatus = lockType
|
||||
return nil
|
||||
}
|
||||
if err != syscall.EWOULDBLOCK {
|
||||
return err
|
||||
}
|
||||
if ctx == nil {
|
||||
return syscall.EWOULDBLOCK
|
||||
}
|
||||
// busy-wait with context
|
||||
numWaits := 0
|
||||
for {
|
||||
numWaits++
|
||||
var timeout time.Duration
|
||||
if numWaits <= InitialLockTries {
|
||||
timeout = InitialLockDelay
|
||||
} else {
|
||||
timeout = LockDelay
|
||||
}
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
err = syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB)
|
||||
if err == nil {
|
||||
f.FlockStatus = lockType
|
||||
return nil
|
||||
}
|
||||
if err != syscall.EWOULDBLOCK {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("could not acquire lock")
|
||||
}
|
||||
|
||||
func (f *File) unflock() {
|
||||
syscall.Flock(int(f.OSFile.Fd()), syscall.LOCK_UN) // ignore error (nothing to do about it anyway)
|
||||
f.FlockStatus = 0
|
||||
return
|
||||
}
|
||||
|
||||
// does not read metadata because locking could block/fail. we want to be able
|
||||
// to return a valid file struct without blocking.
|
||||
func OpenCirFile(fileName string) (*File, error) {
|
||||
fd, err := os.OpenFile(fileName, os.O_RDWR, 0777)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
finfo, err := fd.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if finfo.Size() < HeaderLen {
|
||||
return nil, fmt.Errorf("invalid cirfile, file length[%d] less than HeaderLen[%d]", finfo.Size(), HeaderLen)
|
||||
}
|
||||
rtn := &File{OSFile: fd}
|
||||
return rtn, nil
|
||||
}
|
||||
|
||||
// if the file already exists, it is an error.
|
||||
// there is a race condition if two goroutines try to create the same file between Stat() and Create(), so
|
||||
// they both might get no error, but only one file will be valid. if this is a concern, this call
|
||||
// should be externally synchronized.
|
||||
func CreateCirFile(fileName string, maxSize int64) (*File, error) {
|
||||
if maxSize <= 0 {
|
||||
return nil, fmt.Errorf("invalid maxsize[%d]", maxSize)
|
||||
}
|
||||
_, err := os.Stat(fileName)
|
||||
if err == nil {
|
||||
return nil, fmt.Errorf("file[%s] already exists", fileName)
|
||||
}
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("cannot stat: %w", err)
|
||||
}
|
||||
fd, err := os.Create(fileName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rtn := &File{OSFile: fd, Version: CurrentVersion, MaxSize: maxSize, StartPos: FilePosEmpty}
|
||||
err = rtn.flock(nil, syscall.LOCK_EX)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rtn.unflock()
|
||||
err = rtn.writeMeta()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rtn, nil
|
||||
}
|
||||
|
||||
func (f *File) ReadMeta(ctx context.Context) error {
|
||||
err := f.flock(ctx, syscall.LOCK_SH)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.unflock()
|
||||
return f.readMeta()
|
||||
}
|
||||
|
||||
func (f *File) hasShLock() bool {
|
||||
return f.FlockStatus == syscall.LOCK_EX || f.FlockStatus == syscall.LOCK_SH
|
||||
}
|
||||
|
||||
func (f *File) hasExLock() bool {
|
||||
return f.FlockStatus == syscall.LOCK_EX
|
||||
}
|
||||
|
||||
func (f *File) readMeta() error {
|
||||
if f.OSFile == nil {
|
||||
return fmt.Errorf("no *os.File")
|
||||
}
|
||||
if !f.hasShLock() {
|
||||
return fmt.Errorf("writeMeta must hold LOCK_SH")
|
||||
}
|
||||
_, err := f.OSFile.Seek(0, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot seek file: %w", err)
|
||||
}
|
||||
finfo, err := f.OSFile.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot stat file: %w", err)
|
||||
}
|
||||
if finfo.Size() < 256 {
|
||||
return fmt.Errorf("invalid cbuf file size[%d] < 256", finfo.Size())
|
||||
}
|
||||
f.FileDataSize = finfo.Size() - 256
|
||||
buf := make([]byte, 256)
|
||||
_, err = io.ReadFull(f.OSFile, buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading header: %w", err)
|
||||
}
|
||||
// currently only one version, so we don't need to have special logic here yet
|
||||
_, err = fmt.Sscanf(string(buf), HeaderFmt, &f.Version, &f.MaxSize, &f.FileOffset, &f.StartPos, &f.EndPos)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sscanf error: %w", err)
|
||||
}
|
||||
if f.Version != CurrentVersion {
|
||||
return fmt.Errorf("invalid cbuf version[%d]", f.Version)
|
||||
}
|
||||
// possible incomplete write, fix start/end pos to be within filesize
|
||||
if f.FileDataSize == 0 {
|
||||
f.StartPos = FilePosEmpty
|
||||
f.EndPos = 0
|
||||
} else if f.StartPos >= f.FileDataSize && f.EndPos >= f.FileDataSize {
|
||||
f.StartPos = FilePosEmpty
|
||||
f.EndPos = 0
|
||||
} else if f.StartPos >= f.FileDataSize {
|
||||
f.StartPos = 0
|
||||
} else if f.EndPos >= f.FileDataSize {
|
||||
f.EndPos = f.FileDataSize - 1
|
||||
}
|
||||
if f.MaxSize <= 0 || f.FileOffset < 0 || (f.StartPos < 0 && f.StartPos != FilePosEmpty) || f.StartPos >= f.MaxSize || f.EndPos < 0 || f.EndPos >= f.MaxSize {
|
||||
return fmt.Errorf("invalid cbuf metadata version[%d] filedatasize[%d] maxsize[%d] fileoffset[%d] startpos[%d] endpos[%d]", f.Version, f.FileDataSize, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// no error checking of meta values
|
||||
func (f *File) writeMeta() error {
|
||||
if f.OSFile == nil {
|
||||
return fmt.Errorf("no *os.File")
|
||||
}
|
||||
if !f.hasExLock() {
|
||||
return fmt.Errorf("writeMeta must hold LOCK_EX")
|
||||
}
|
||||
_, err := f.OSFile.Seek(0, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot seek file: %w", err)
|
||||
}
|
||||
metaStr := fmt.Sprintf(HeaderFmt, f.Version, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos)
|
||||
fullMetaStr := fmt.Sprintf(FullHeaderFmt, metaStr)
|
||||
_, err = f.OSFile.WriteString(fullMetaStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("write error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// returns (fileOffset, datasize, error)
|
||||
// datasize is the current amount of readable data held in the cirfile
|
||||
func (f *File) GetStartOffsetAndSize(ctx context.Context) (int64, int64, error) {
|
||||
err := f.flock(ctx, syscall.LOCK_SH)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
defer f.unflock()
|
||||
err = f.readMeta()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
chunks := f.getFileChunks()
|
||||
return f.FileOffset, totalChunksSize(chunks), nil
|
||||
}
|
||||
|
||||
type fileChunk struct {
|
||||
StartPos int64
|
||||
Len int64
|
||||
}
|
||||
|
||||
func totalChunksSize(chunks []fileChunk) int64 {
|
||||
var rtn int64
|
||||
for _, chunk := range chunks {
|
||||
rtn += chunk.Len
|
||||
}
|
||||
return rtn
|
||||
}
|
||||
|
||||
func advanceChunks(chunks []fileChunk, offset int64) []fileChunk {
|
||||
if offset < 0 {
|
||||
panic(fmt.Sprintf("invalid negative offset: %d", offset))
|
||||
}
|
||||
if offset == 0 {
|
||||
return chunks
|
||||
}
|
||||
var rtn []fileChunk
|
||||
for _, chunk := range chunks {
|
||||
if offset >= chunk.Len {
|
||||
offset = offset - chunk.Len
|
||||
continue
|
||||
}
|
||||
if offset == 0 {
|
||||
rtn = append(rtn, chunk)
|
||||
} else {
|
||||
rtn = append(rtn, fileChunk{chunk.StartPos + offset, chunk.Len - offset})
|
||||
offset = 0
|
||||
}
|
||||
}
|
||||
return rtn
|
||||
}
|
||||
|
||||
func (f *File) getFileChunks() []fileChunk {
|
||||
if f.StartPos == FilePosEmpty {
|
||||
return nil
|
||||
}
|
||||
if f.EndPos >= f.StartPos {
|
||||
return []fileChunk{fileChunk{f.StartPos, f.EndPos - f.StartPos + 1}}
|
||||
}
|
||||
return []fileChunk{
|
||||
fileChunk{f.StartPos, f.FileDataSize - f.StartPos},
|
||||
fileChunk{0, f.EndPos + 1},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *File) getFreeChunks() []fileChunk {
|
||||
if f.StartPos == FilePosEmpty {
|
||||
return []fileChunk{fileChunk{0, f.MaxSize}}
|
||||
}
|
||||
if (f.EndPos == f.StartPos-1) || (f.StartPos == 0 && f.EndPos == f.MaxSize-1) {
|
||||
return nil
|
||||
}
|
||||
if f.EndPos < f.StartPos {
|
||||
return []fileChunk{fileChunk{f.EndPos + 1, f.StartPos - f.EndPos - 1}}
|
||||
}
|
||||
var rtn []fileChunk
|
||||
if f.EndPos < f.MaxSize-1 {
|
||||
rtn = append(rtn, fileChunk{f.EndPos + 1, f.MaxSize - f.EndPos - 1})
|
||||
}
|
||||
if f.StartPos > 0 {
|
||||
rtn = append(rtn, fileChunk{0, f.StartPos})
|
||||
}
|
||||
return rtn
|
||||
}
|
||||
|
||||
// returns (realOffset, data, error)
|
||||
// will only return io.EOF when len(data) == 0, otherwise will just do a short read
|
||||
func (f *File) ReadNext(ctx context.Context, buf []byte, offset int64) (int64, int, error) {
|
||||
err := f.flock(ctx, syscall.LOCK_SH)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
defer f.unflock()
|
||||
err = f.readMeta()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if offset < f.FileOffset {
|
||||
offset = f.FileOffset
|
||||
}
|
||||
relativeOffset := offset - f.FileOffset
|
||||
chunks := f.getFileChunks()
|
||||
curSize := totalChunksSize(chunks)
|
||||
if offset >= f.FileOffset+curSize {
|
||||
return f.FileOffset + curSize, 0, nil
|
||||
}
|
||||
chunks = advanceChunks(chunks, relativeOffset)
|
||||
numRead := 0
|
||||
for _, chunk := range chunks {
|
||||
if numRead >= len(buf) {
|
||||
break
|
||||
}
|
||||
toRead := len(buf) - numRead
|
||||
if toRead > int(chunk.Len) {
|
||||
toRead = int(chunk.Len)
|
||||
}
|
||||
nr, err := f.OSFile.ReadAt(buf[numRead:numRead+toRead], chunk.StartPos+HeaderLen)
|
||||
if err != nil {
|
||||
return offset, 0, err
|
||||
}
|
||||
numRead += nr
|
||||
}
|
||||
return offset, numRead, nil
|
||||
}
|
||||
|
||||
func (f *File) ensureFreeSpace(requiredSpace int64) error {
|
||||
if f.StartPos == FilePosEmpty {
|
||||
return nil
|
||||
}
|
||||
chunks := f.getFileChunks()
|
||||
curSpace := f.MaxSize - totalChunksSize(chunks)
|
||||
if curSpace >= requiredSpace {
|
||||
return nil
|
||||
}
|
||||
neededSpace := requiredSpace - curSpace
|
||||
if requiredSpace >= f.MaxSize {
|
||||
f.StartPos = FilePosEmpty
|
||||
f.EndPos = 0
|
||||
f.FileOffset += neededSpace
|
||||
} else {
|
||||
f.StartPos = (f.StartPos + neededSpace) % f.MaxSize
|
||||
f.FileOffset += neededSpace
|
||||
}
|
||||
return f.writeMeta()
|
||||
}
|
||||
|
||||
func (f *File) AppendData(ctx context.Context, buf []byte) error {
|
||||
err := f.flock(ctx, syscall.LOCK_EX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.unflock()
|
||||
err = f.readMeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = f.ensureFreeSpace(int64(len(buf)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(buf) >= int(f.MaxSize) {
|
||||
buf = buf[len(buf)-int(f.MaxSize):]
|
||||
}
|
||||
chunks := f.getFreeChunks()
|
||||
numWrite := 0
|
||||
for _, chunk := range chunks {
|
||||
if numWrite >= len(buf) {
|
||||
break
|
||||
}
|
||||
if chunk.Len == 0 {
|
||||
continue
|
||||
}
|
||||
toRead := len(buf) - numWrite
|
||||
if toRead > int(chunk.Len) {
|
||||
toRead = int(chunk.Len)
|
||||
}
|
||||
nw, err := f.OSFile.WriteAt(buf[numWrite:numWrite+toRead], chunk.StartPos+HeaderLen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if chunk.StartPos+int64(nw) > f.FileDataSize {
|
||||
f.FileDataSize = chunk.StartPos + int64(nw)
|
||||
}
|
||||
if f.StartPos == FilePosEmpty {
|
||||
f.StartPos = chunk.StartPos
|
||||
}
|
||||
f.EndPos = chunk.StartPos + int64(nw) - 1
|
||||
numWrite += nw
|
||||
}
|
||||
err = f.writeMeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
222
pkg/cirfile/cirfile_test.go
Normal file
222
pkg/cirfile/cirfile_test.go
Normal file
@ -0,0 +1,222 @@
|
||||
package cirfile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func validateFileSize(t *testing.T, name string, size int) {
|
||||
finfo, err := os.Stat(name)
|
||||
if err != nil {
|
||||
t.Fatalf("error stating file[%s]: %v", name, err)
|
||||
}
|
||||
if int(finfo.Size()) != size {
|
||||
t.Fatalf("invalid file[%s] expected[%d] got[%d]", name, size, finfo.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func validateMeta(t *testing.T, desc string, f *File, startPos int64, endPos int64, dataSize int64, offset int64) {
|
||||
if f.StartPos != startPos || f.EndPos != endPos || f.FileDataSize != dataSize || f.FileOffset != offset {
|
||||
t.Fatalf("metadata error (%s): startpos[%d %d] endpos[%d %d] filedatasize[%d %d] fileoffset[%d %d]", desc, f.StartPos, startPos, f.EndPos, endPos, f.FileDataSize, dataSize, f.FileOffset, offset)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpFile(name string) {
|
||||
barr, _ := os.ReadFile(name)
|
||||
fmt.Printf("<<<\n%s\n>>>", string(barr))
|
||||
}
|
||||
|
||||
func makeData(size int) string {
|
||||
var rtn string
|
||||
for {
|
||||
if len(rtn) >= size {
|
||||
break
|
||||
}
|
||||
needed := size - len(rtn)
|
||||
if needed < 10 {
|
||||
rtn += "123456789\n"[0:needed]
|
||||
break
|
||||
}
|
||||
rtn += "123456789\n"
|
||||
}
|
||||
return rtn
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
f1Name := path.Join(tempDir, "f1.cf")
|
||||
f, err := OpenCirFile(f1Name)
|
||||
if err == nil || f != nil {
|
||||
t.Fatalf("OpenCirFile f1.cf should fail (no file)")
|
||||
}
|
||||
f, err = CreateCirFile(f1Name, 100)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateCirFile f1.cf failed: %v", err)
|
||||
}
|
||||
if f == nil {
|
||||
t.Fatalf("CreateCirFile f1.cf returned nil")
|
||||
}
|
||||
err = f.ReadMeta(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("cannot readmeta from f1.cf: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, 256)
|
||||
if f.Version != CurrentVersion || f.MaxSize != 100 || f.FileOffset != 0 || f.StartPos != FilePosEmpty || f.EndPos != 0 || f.FileDataSize != 0 || f.FlockStatus != 0 {
|
||||
t.Fatalf("error with initial metadata #%v", f)
|
||||
}
|
||||
buf := make([]byte, 200)
|
||||
realOffset, nr, err := f.ReadNext(context.Background(), buf, 0)
|
||||
if realOffset != 0 || nr != 0 || err != nil {
|
||||
t.Fatalf("error with empty read: real-offset[%d] nr[%d] err[%v]", realOffset, nr, err)
|
||||
}
|
||||
realOffset, nr, err = f.ReadNext(context.Background(), buf, 1000)
|
||||
if realOffset != 0 || nr != 0 || err != nil {
|
||||
t.Fatalf("error with empty read: real-offset[%d] nr[%d] err[%v]", realOffset, nr, err)
|
||||
}
|
||||
f2, err := CreateCirFile(f1Name, 100)
|
||||
if err == nil || f2 != nil {
|
||||
t.Fatalf("should be an error to create duplicate CirFile")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFile(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
f1Name := path.Join(tempDir, "f1.cf")
|
||||
f, err := CreateCirFile(f1Name, 100)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create cirfile: %v", err)
|
||||
}
|
||||
err = f.AppendData(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen)
|
||||
if f.StartPos != FilePosEmpty || f.EndPos != 0 || f.FileDataSize != 0 {
|
||||
t.Fatalf("metadata error (1): %#v", f)
|
||||
}
|
||||
err = f.AppendData(context.Background(), []byte("hello"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+5)
|
||||
if f.StartPos != 0 || f.EndPos != 4 || f.FileDataSize != 5 {
|
||||
t.Fatalf("metadata error (2): %#v", f)
|
||||
}
|
||||
err = f.AppendData(context.Background(), []byte(" foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+9)
|
||||
validateMeta(t, "3", f, 0, 8, 9, 0)
|
||||
err = f.AppendData(context.Background(), []byte("\n"+makeData(20)))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+30)
|
||||
validateMeta(t, "4", f, 0, 29, 30, 0)
|
||||
|
||||
data120 := makeData(120)
|
||||
err = f.AppendData(context.Background(), []byte(data120))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+100)
|
||||
validateMeta(t, "5", f, 0, 99, 100, 50)
|
||||
err = f.AppendData(context.Background(), []byte("foo "))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+100)
|
||||
validateMeta(t, "6", f, 4, 3, 100, 54)
|
||||
|
||||
buf := make([]byte, 5)
|
||||
realOffset, nr, err := f.ReadNext(context.Background(), buf, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot ReadNext: %v", err)
|
||||
}
|
||||
if realOffset != 54 {
|
||||
t.Fatalf("wrong realoffset got[%d] expected[%d]", realOffset, 54)
|
||||
}
|
||||
if nr != 5 {
|
||||
t.Fatalf("wrong nr got[%d] expected[%d]", nr, 5)
|
||||
}
|
||||
if string(buf[0:nr]) != "56789" {
|
||||
t.Fatalf("wrong buf return got[%s] expected[%s]", string(buf[0:nr]), "56789")
|
||||
}
|
||||
realOffset, nr, err = f.ReadNext(context.Background(), buf, 60)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot readnext: %v", err)
|
||||
}
|
||||
if realOffset != 60 && nr != 5 {
|
||||
t.Fatalf("invalid rtn realoffset[%d] nr[%d]", realOffset, nr)
|
||||
}
|
||||
if string(buf[0:nr]) != "12345" {
|
||||
t.Fatalf("invalid rtn buf[%s]", string(buf[0:nr]))
|
||||
}
|
||||
realOffset, nr, err = f.ReadNext(context.Background(), buf, 800)
|
||||
if err != nil || realOffset != 154 || nr != 0 {
|
||||
t.Fatalf("invalid past end read: err[%v] realoffset[%d] nr[%d]", err, realOffset, nr)
|
||||
}
|
||||
realOffset, nr, err = f.ReadNext(context.Background(), buf, 150)
|
||||
if err != nil || realOffset != 150 || nr != 4 || string(buf[0:nr]) != "foo " {
|
||||
t.Fatalf("invalid end read: err[%v] realoffset[%d] nr[%d] buf[%s]", err, realOffset, nr, string(buf[0:nr]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlock(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
f1Name := path.Join(tempDir, "f1.cf")
|
||||
f, err := CreateCirFile(f1Name, 100)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot create cirfile: %v", err)
|
||||
}
|
||||
fd2, err := os.OpenFile(f1Name, os.O_RDWR, 0777)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot open file: %v", err)
|
||||
}
|
||||
err = syscall.Flock(int(fd2.Fd()), syscall.LOCK_EX)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot lock fd: %v", err)
|
||||
}
|
||||
err = f.AppendData(nil, []byte("hello"))
|
||||
if err != syscall.EWOULDBLOCK {
|
||||
t.Fatalf("append should fail with EWOULDBLOCK")
|
||||
}
|
||||
timeoutCtx, _ := context.WithTimeout(context.Background(), 20*time.Millisecond)
|
||||
startTs := time.Now()
|
||||
err = f.ReadMeta(timeoutCtx)
|
||||
if err != context.DeadlineExceeded {
|
||||
t.Fatalf("readmeta should fail with context.DeadlineExceeded")
|
||||
}
|
||||
dur := time.Now().Sub(startTs)
|
||||
if dur < 20*time.Millisecond {
|
||||
t.Fatalf("readmeta should take at least 20ms")
|
||||
}
|
||||
syscall.Flock(int(fd2.Fd()), syscall.LOCK_UN)
|
||||
err = f.ReadMeta(timeoutCtx)
|
||||
if err != nil {
|
||||
t.Fatalf("readmeta err: %v", err)
|
||||
}
|
||||
err = syscall.Flock(int(fd2.Fd()), syscall.LOCK_SH)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot flock: %v", err)
|
||||
}
|
||||
err = f.AppendData(nil, []byte("hello"))
|
||||
if err != syscall.EWOULDBLOCK {
|
||||
t.Fatalf("append should fail with EWOULDBLOCK")
|
||||
}
|
||||
err = f.ReadMeta(timeoutCtx)
|
||||
if err != nil {
|
||||
t.Fatalf("readmeta err (should work because LOCK_SH): %v", err)
|
||||
}
|
||||
fd2.Close()
|
||||
err = f.AppendData(nil, []byte("hello"))
|
||||
if err != nil {
|
||||
t.Fatalf("append error (should work fd2 was closed): %v", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user