diff --git a/pkg/cirfile/cirfile.go b/pkg/cirfile/cirfile.go new file mode 100644 index 000000000..d40fdceb8 --- /dev/null +++ b/pkg/cirfile/cirfile.go @@ -0,0 +1,412 @@ +package cirfile + +import ( + "context" + "fmt" + "io" + "os" + "syscall" + "time" +) + +// CBUF[version] [maxsize] [fileoffset] [startpos] [endpos] +const HeaderFmt = "CBUF%02d %19d %19d %19d %19d\n" // 87 bytes +const HeaderLen = 256 // set to 256 for future expandability +const FullHeaderFmt = "%-255s\n" // 256 bytes (255 + newline) +const CurrentVersion = 1 +const FilePosEmpty = -1 // sentinel, if startpos is set to -1, file is empty + +const InitialLockDelay = 10 * time.Millisecond +const InitialLockTries = 5 +const LockDelay = 100 * time.Millisecond + +type File struct { + OSFile *os.File + Version byte + MaxSize int64 + FileOffset int64 + StartPos int64 + EndPos int64 + FileDataSize int64 // size of data (does not include header size) + FlockStatus int +} + +func (f *File) flock(ctx context.Context, lockType int) error { + err := syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB) + if err == nil { + f.FlockStatus = lockType + return nil + } + if err != syscall.EWOULDBLOCK { + return err + } + if ctx == nil { + return syscall.EWOULDBLOCK + } + // busy-wait with context + numWaits := 0 + for { + numWaits++ + var timeout time.Duration + if numWaits <= InitialLockTries { + timeout = InitialLockDelay + } else { + timeout = LockDelay + } + select { + case <-time.After(timeout): + break + case <-ctx.Done(): + return ctx.Err() + } + err = syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB) + if err == nil { + f.FlockStatus = lockType + return nil + } + if err != syscall.EWOULDBLOCK { + return err + } + } + return fmt.Errorf("could not acquire lock") +} + +func (f *File) unflock() { + syscall.Flock(int(f.OSFile.Fd()), syscall.LOCK_UN) // ignore error (nothing to do about it anyway) + f.FlockStatus = 0 + return +} + +// does not read metadata because locking could block/fail. we want to be able +// to return a valid file struct without blocking. +func OpenCirFile(fileName string) (*File, error) { + fd, err := os.OpenFile(fileName, os.O_RDWR, 0777) + if err != nil { + return nil, err + } + finfo, err := fd.Stat() + if err != nil { + return nil, err + } + if finfo.Size() < HeaderLen { + return nil, fmt.Errorf("invalid cirfile, file length[%d] less than HeaderLen[%d]", finfo.Size(), HeaderLen) + } + rtn := &File{OSFile: fd} + return rtn, nil +} + +// if the file already exists, it is an error. +// there is a race condition if two goroutines try to create the same file between Stat() and Create(), so +// they both might get no error, but only one file will be valid. if this is a concern, this call +// should be externally synchronized. +func CreateCirFile(fileName string, maxSize int64) (*File, error) { + if maxSize <= 0 { + return nil, fmt.Errorf("invalid maxsize[%d]", maxSize) + } + _, err := os.Stat(fileName) + if err == nil { + return nil, fmt.Errorf("file[%s] already exists", fileName) + } + if !os.IsNotExist(err) { + return nil, fmt.Errorf("cannot stat: %w", err) + } + fd, err := os.Create(fileName) + if err != nil { + return nil, err + } + rtn := &File{OSFile: fd, Version: CurrentVersion, MaxSize: maxSize, StartPos: FilePosEmpty} + err = rtn.flock(nil, syscall.LOCK_EX) + if err != nil { + return nil, err + } + defer rtn.unflock() + err = rtn.writeMeta() + if err != nil { + return nil, err + } + return rtn, nil +} + +func (f *File) ReadMeta(ctx context.Context) error { + err := f.flock(ctx, syscall.LOCK_SH) + if err != nil { + return err + } + defer f.unflock() + return f.readMeta() +} + +func (f *File) hasShLock() bool { + return f.FlockStatus == syscall.LOCK_EX || f.FlockStatus == syscall.LOCK_SH +} + +func (f *File) hasExLock() bool { + return f.FlockStatus == syscall.LOCK_EX +} + +func (f *File) readMeta() error { + if f.OSFile == nil { + return fmt.Errorf("no *os.File") + } + if !f.hasShLock() { + return fmt.Errorf("writeMeta must hold LOCK_SH") + } + _, err := f.OSFile.Seek(0, 0) + if err != nil { + return fmt.Errorf("cannot seek file: %w", err) + } + finfo, err := f.OSFile.Stat() + if err != nil { + return fmt.Errorf("cannot stat file: %w", err) + } + if finfo.Size() < 256 { + return fmt.Errorf("invalid cbuf file size[%d] < 256", finfo.Size()) + } + f.FileDataSize = finfo.Size() - 256 + buf := make([]byte, 256) + _, err = io.ReadFull(f.OSFile, buf) + if err != nil { + return fmt.Errorf("error reading header: %w", err) + } + // currently only one version, so we don't need to have special logic here yet + _, err = fmt.Sscanf(string(buf), HeaderFmt, &f.Version, &f.MaxSize, &f.FileOffset, &f.StartPos, &f.EndPos) + if err != nil { + return fmt.Errorf("sscanf error: %w", err) + } + if f.Version != CurrentVersion { + return fmt.Errorf("invalid cbuf version[%d]", f.Version) + } + // possible incomplete write, fix start/end pos to be within filesize + if f.FileDataSize == 0 { + f.StartPos = FilePosEmpty + f.EndPos = 0 + } else if f.StartPos >= f.FileDataSize && f.EndPos >= f.FileDataSize { + f.StartPos = FilePosEmpty + f.EndPos = 0 + } else if f.StartPos >= f.FileDataSize { + f.StartPos = 0 + } else if f.EndPos >= f.FileDataSize { + f.EndPos = f.FileDataSize - 1 + } + if f.MaxSize <= 0 || f.FileOffset < 0 || (f.StartPos < 0 && f.StartPos != FilePosEmpty) || f.StartPos >= f.MaxSize || f.EndPos < 0 || f.EndPos >= f.MaxSize { + return fmt.Errorf("invalid cbuf metadata version[%d] filedatasize[%d] maxsize[%d] fileoffset[%d] startpos[%d] endpos[%d]", f.Version, f.FileDataSize, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos) + } + return nil +} + +// no error checking of meta values +func (f *File) writeMeta() error { + if f.OSFile == nil { + return fmt.Errorf("no *os.File") + } + if !f.hasExLock() { + return fmt.Errorf("writeMeta must hold LOCK_EX") + } + _, err := f.OSFile.Seek(0, 0) + if err != nil { + return fmt.Errorf("cannot seek file: %w", err) + } + metaStr := fmt.Sprintf(HeaderFmt, f.Version, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos) + fullMetaStr := fmt.Sprintf(FullHeaderFmt, metaStr) + _, err = f.OSFile.WriteString(fullMetaStr) + if err != nil { + return fmt.Errorf("write error: %w", err) + } + return nil +} + +// returns (fileOffset, datasize, error) +// datasize is the current amount of readable data held in the cirfile +func (f *File) GetStartOffsetAndSize(ctx context.Context) (int64, int64, error) { + err := f.flock(ctx, syscall.LOCK_SH) + if err != nil { + return 0, 0, err + } + defer f.unflock() + err = f.readMeta() + if err != nil { + return 0, 0, err + } + chunks := f.getFileChunks() + return f.FileOffset, totalChunksSize(chunks), nil +} + +type fileChunk struct { + StartPos int64 + Len int64 +} + +func totalChunksSize(chunks []fileChunk) int64 { + var rtn int64 + for _, chunk := range chunks { + rtn += chunk.Len + } + return rtn +} + +func advanceChunks(chunks []fileChunk, offset int64) []fileChunk { + if offset < 0 { + panic(fmt.Sprintf("invalid negative offset: %d", offset)) + } + if offset == 0 { + return chunks + } + var rtn []fileChunk + for _, chunk := range chunks { + if offset >= chunk.Len { + offset = offset - chunk.Len + continue + } + if offset == 0 { + rtn = append(rtn, chunk) + } else { + rtn = append(rtn, fileChunk{chunk.StartPos + offset, chunk.Len - offset}) + offset = 0 + } + } + return rtn +} + +func (f *File) getFileChunks() []fileChunk { + if f.StartPos == FilePosEmpty { + return nil + } + if f.EndPos >= f.StartPos { + return []fileChunk{fileChunk{f.StartPos, f.EndPos - f.StartPos + 1}} + } + return []fileChunk{ + fileChunk{f.StartPos, f.FileDataSize - f.StartPos}, + fileChunk{0, f.EndPos + 1}, + } +} + +func (f *File) getFreeChunks() []fileChunk { + if f.StartPos == FilePosEmpty { + return []fileChunk{fileChunk{0, f.MaxSize}} + } + if (f.EndPos == f.StartPos-1) || (f.StartPos == 0 && f.EndPos == f.MaxSize-1) { + return nil + } + if f.EndPos < f.StartPos { + return []fileChunk{fileChunk{f.EndPos + 1, f.StartPos - f.EndPos - 1}} + } + var rtn []fileChunk + if f.EndPos < f.MaxSize-1 { + rtn = append(rtn, fileChunk{f.EndPos + 1, f.MaxSize - f.EndPos - 1}) + } + if f.StartPos > 0 { + rtn = append(rtn, fileChunk{0, f.StartPos}) + } + return rtn +} + +// returns (realOffset, data, error) +// will only return io.EOF when len(data) == 0, otherwise will just do a short read +func (f *File) ReadNext(ctx context.Context, buf []byte, offset int64) (int64, int, error) { + err := f.flock(ctx, syscall.LOCK_SH) + if err != nil { + return 0, 0, err + } + defer f.unflock() + err = f.readMeta() + if err != nil { + return 0, 0, err + } + if offset < f.FileOffset { + offset = f.FileOffset + } + relativeOffset := offset - f.FileOffset + chunks := f.getFileChunks() + curSize := totalChunksSize(chunks) + if offset >= f.FileOffset+curSize { + return f.FileOffset + curSize, 0, nil + } + chunks = advanceChunks(chunks, relativeOffset) + numRead := 0 + for _, chunk := range chunks { + if numRead >= len(buf) { + break + } + toRead := len(buf) - numRead + if toRead > int(chunk.Len) { + toRead = int(chunk.Len) + } + nr, err := f.OSFile.ReadAt(buf[numRead:numRead+toRead], chunk.StartPos+HeaderLen) + if err != nil { + return offset, 0, err + } + numRead += nr + } + return offset, numRead, nil +} + +func (f *File) ensureFreeSpace(requiredSpace int64) error { + if f.StartPos == FilePosEmpty { + return nil + } + chunks := f.getFileChunks() + curSpace := f.MaxSize - totalChunksSize(chunks) + if curSpace >= requiredSpace { + return nil + } + neededSpace := requiredSpace - curSpace + if requiredSpace >= f.MaxSize { + f.StartPos = FilePosEmpty + f.EndPos = 0 + f.FileOffset += neededSpace + } else { + f.StartPos = (f.StartPos + neededSpace) % f.MaxSize + f.FileOffset += neededSpace + } + return f.writeMeta() +} + +func (f *File) AppendData(ctx context.Context, buf []byte) error { + err := f.flock(ctx, syscall.LOCK_EX) + if err != nil { + return err + } + defer f.unflock() + err = f.readMeta() + if err != nil { + return err + } + err = f.ensureFreeSpace(int64(len(buf))) + if err != nil { + return err + } + if len(buf) >= int(f.MaxSize) { + buf = buf[len(buf)-int(f.MaxSize):] + } + chunks := f.getFreeChunks() + numWrite := 0 + for _, chunk := range chunks { + if numWrite >= len(buf) { + break + } + if chunk.Len == 0 { + continue + } + toRead := len(buf) - numWrite + if toRead > int(chunk.Len) { + toRead = int(chunk.Len) + } + nw, err := f.OSFile.WriteAt(buf[numWrite:numWrite+toRead], chunk.StartPos+HeaderLen) + if err != nil { + return err + } + if chunk.StartPos+int64(nw) > f.FileDataSize { + f.FileDataSize = chunk.StartPos + int64(nw) + } + if f.StartPos == FilePosEmpty { + f.StartPos = chunk.StartPos + } + f.EndPos = chunk.StartPos + int64(nw) - 1 + numWrite += nw + } + err = f.writeMeta() + if err != nil { + return err + } + return nil +} diff --git a/pkg/cirfile/cirfile_test.go b/pkg/cirfile/cirfile_test.go new file mode 100644 index 000000000..4e4f555c9 --- /dev/null +++ b/pkg/cirfile/cirfile_test.go @@ -0,0 +1,222 @@ +package cirfile + +import ( + "context" + "fmt" + "os" + "path" + "syscall" + "testing" + "time" +) + +func validateFileSize(t *testing.T, name string, size int) { + finfo, err := os.Stat(name) + if err != nil { + t.Fatalf("error stating file[%s]: %v", name, err) + } + if int(finfo.Size()) != size { + t.Fatalf("invalid file[%s] expected[%d] got[%d]", name, size, finfo.Size()) + } +} + +func validateMeta(t *testing.T, desc string, f *File, startPos int64, endPos int64, dataSize int64, offset int64) { + if f.StartPos != startPos || f.EndPos != endPos || f.FileDataSize != dataSize || f.FileOffset != offset { + t.Fatalf("metadata error (%s): startpos[%d %d] endpos[%d %d] filedatasize[%d %d] fileoffset[%d %d]", desc, f.StartPos, startPos, f.EndPos, endPos, f.FileDataSize, dataSize, f.FileOffset, offset) + } +} + +func dumpFile(name string) { + barr, _ := os.ReadFile(name) + fmt.Printf("<<<\n%s\n>>>", string(barr)) +} + +func makeData(size int) string { + var rtn string + for { + if len(rtn) >= size { + break + } + needed := size - len(rtn) + if needed < 10 { + rtn += "123456789\n"[0:needed] + break + } + rtn += "123456789\n" + } + return rtn +} + +func TestCreate(t *testing.T) { + tempDir := t.TempDir() + f1Name := path.Join(tempDir, "f1.cf") + f, err := OpenCirFile(f1Name) + if err == nil || f != nil { + t.Fatalf("OpenCirFile f1.cf should fail (no file)") + } + f, err = CreateCirFile(f1Name, 100) + if err != nil { + t.Fatalf("CreateCirFile f1.cf failed: %v", err) + } + if f == nil { + t.Fatalf("CreateCirFile f1.cf returned nil") + } + err = f.ReadMeta(context.Background()) + if err != nil { + t.Fatalf("cannot readmeta from f1.cf: %v", err) + } + validateFileSize(t, f1Name, 256) + if f.Version != CurrentVersion || f.MaxSize != 100 || f.FileOffset != 0 || f.StartPos != FilePosEmpty || f.EndPos != 0 || f.FileDataSize != 0 || f.FlockStatus != 0 { + t.Fatalf("error with initial metadata #%v", f) + } + buf := make([]byte, 200) + realOffset, nr, err := f.ReadNext(context.Background(), buf, 0) + if realOffset != 0 || nr != 0 || err != nil { + t.Fatalf("error with empty read: real-offset[%d] nr[%d] err[%v]", realOffset, nr, err) + } + realOffset, nr, err = f.ReadNext(context.Background(), buf, 1000) + if realOffset != 0 || nr != 0 || err != nil { + t.Fatalf("error with empty read: real-offset[%d] nr[%d] err[%v]", realOffset, nr, err) + } + f2, err := CreateCirFile(f1Name, 100) + if err == nil || f2 != nil { + t.Fatalf("should be an error to create duplicate CirFile") + } +} + +func TestFile(t *testing.T) { + tempDir := t.TempDir() + f1Name := path.Join(tempDir, "f1.cf") + f, err := CreateCirFile(f1Name, 100) + if err != nil { + t.Fatalf("cannot create cirfile: %v", err) + } + err = f.AppendData(context.Background(), nil) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen) + if f.StartPos != FilePosEmpty || f.EndPos != 0 || f.FileDataSize != 0 { + t.Fatalf("metadata error (1): %#v", f) + } + err = f.AppendData(context.Background(), []byte("hello")) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen+5) + if f.StartPos != 0 || f.EndPos != 4 || f.FileDataSize != 5 { + t.Fatalf("metadata error (2): %#v", f) + } + err = f.AppendData(context.Background(), []byte(" foo")) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen+9) + validateMeta(t, "3", f, 0, 8, 9, 0) + err = f.AppendData(context.Background(), []byte("\n"+makeData(20))) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen+30) + validateMeta(t, "4", f, 0, 29, 30, 0) + + data120 := makeData(120) + err = f.AppendData(context.Background(), []byte(data120)) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen+100) + validateMeta(t, "5", f, 0, 99, 100, 50) + err = f.AppendData(context.Background(), []byte("foo ")) + if err != nil { + t.Fatalf("cannot append data: %v", err) + } + validateFileSize(t, f1Name, HeaderLen+100) + validateMeta(t, "6", f, 4, 3, 100, 54) + + buf := make([]byte, 5) + realOffset, nr, err := f.ReadNext(context.Background(), buf, 0) + if err != nil { + t.Fatalf("cannot ReadNext: %v", err) + } + if realOffset != 54 { + t.Fatalf("wrong realoffset got[%d] expected[%d]", realOffset, 54) + } + if nr != 5 { + t.Fatalf("wrong nr got[%d] expected[%d]", nr, 5) + } + if string(buf[0:nr]) != "56789" { + t.Fatalf("wrong buf return got[%s] expected[%s]", string(buf[0:nr]), "56789") + } + realOffset, nr, err = f.ReadNext(context.Background(), buf, 60) + if err != nil { + t.Fatalf("cannot readnext: %v", err) + } + if realOffset != 60 && nr != 5 { + t.Fatalf("invalid rtn realoffset[%d] nr[%d]", realOffset, nr) + } + if string(buf[0:nr]) != "12345" { + t.Fatalf("invalid rtn buf[%s]", string(buf[0:nr])) + } + realOffset, nr, err = f.ReadNext(context.Background(), buf, 800) + if err != nil || realOffset != 154 || nr != 0 { + t.Fatalf("invalid past end read: err[%v] realoffset[%d] nr[%d]", err, realOffset, nr) + } + realOffset, nr, err = f.ReadNext(context.Background(), buf, 150) + if err != nil || realOffset != 150 || nr != 4 || string(buf[0:nr]) != "foo " { + t.Fatalf("invalid end read: err[%v] realoffset[%d] nr[%d] buf[%s]", err, realOffset, nr, string(buf[0:nr])) + } +} + +func TestFlock(t *testing.T) { + tempDir := t.TempDir() + f1Name := path.Join(tempDir, "f1.cf") + f, err := CreateCirFile(f1Name, 100) + if err != nil { + t.Fatalf("cannot create cirfile: %v", err) + } + fd2, err := os.OpenFile(f1Name, os.O_RDWR, 0777) + if err != nil { + t.Fatalf("cannot open file: %v", err) + } + err = syscall.Flock(int(fd2.Fd()), syscall.LOCK_EX) + if err != nil { + t.Fatalf("cannot lock fd: %v", err) + } + err = f.AppendData(nil, []byte("hello")) + if err != syscall.EWOULDBLOCK { + t.Fatalf("append should fail with EWOULDBLOCK") + } + timeoutCtx, _ := context.WithTimeout(context.Background(), 20*time.Millisecond) + startTs := time.Now() + err = f.ReadMeta(timeoutCtx) + if err != context.DeadlineExceeded { + t.Fatalf("readmeta should fail with context.DeadlineExceeded") + } + dur := time.Now().Sub(startTs) + if dur < 20*time.Millisecond { + t.Fatalf("readmeta should take at least 20ms") + } + syscall.Flock(int(fd2.Fd()), syscall.LOCK_UN) + err = f.ReadMeta(timeoutCtx) + if err != nil { + t.Fatalf("readmeta err: %v", err) + } + err = syscall.Flock(int(fd2.Fd()), syscall.LOCK_SH) + if err != nil { + t.Fatalf("cannot flock: %v", err) + } + err = f.AppendData(nil, []byte("hello")) + if err != syscall.EWOULDBLOCK { + t.Fatalf("append should fail with EWOULDBLOCK") + } + err = f.ReadMeta(timeoutCtx) + if err != nil { + t.Fatalf("readmeta err (should work because LOCK_SH): %v", err) + } + fd2.Close() + err = f.AppendData(nil, []byte("hello")) + if err != nil { + t.Fatalf("append error (should work fd2 was closed): %v", err) + } +}