mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-01-06 19:18:22 +01:00
e7725e0e11
* Fix unvalidated path warning in cirfile.go * Adding test * move dir path vars into global scope, set wave home in test if not set already * separate validation code into own testable func * do removal of cir file in cleanup * rename test file names * flock should return err if waiting without done fn * update comment * use timeout context * remove redundant validation * add more error messages * use filepath.join instead * validate when creating cirfile too * support .ptyout.cf, etc. * remove validation as cirfile is "trusted", since it only accepts pre-validated input * revert to passing nil context for flock in the Create file case
573 lines
15 KiB
Go
573 lines
15 KiB
Go
// Copyright 2023, Command Line Inc.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package cirfile
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// CBUF[version] [maxsize] [fileoffset] [startpos] [endpos]
|
|
const HeaderFmt1 = "CBUF%02d %19d %19d %19d %19d\n" // 87 bytes
|
|
const HeaderLen = 256 // set to 256 for future expandability
|
|
const FullHeaderFmt = "%-255s\n" // 256 bytes (255 + newline)
|
|
const CurrentVersion = 1
|
|
const FilePosEmpty = -1 // sentinel, if startpos is set to -1, file is empty
|
|
|
|
const InitialLockDelay = 10 * time.Millisecond
|
|
const InitialLockTries = 5
|
|
const LockDelay = 100 * time.Millisecond
|
|
|
|
// File objects are *not* multithread safe, operations must be externally synchronized
|
|
type File struct {
|
|
OSFile *os.File
|
|
Version byte
|
|
MaxSize int64
|
|
FileOffset int64
|
|
StartPos int64
|
|
EndPos int64
|
|
FileDataSize int64 // size of data (does not include header size)
|
|
FlockStatus int
|
|
}
|
|
|
|
type Stat struct {
|
|
Location string
|
|
Version byte
|
|
MaxSize int64
|
|
FileOffset int64
|
|
DataSize int64
|
|
}
|
|
|
|
func (f *File) flock(ctx context.Context, lockType int) error {
|
|
err := syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB)
|
|
if err == nil {
|
|
f.FlockStatus = lockType
|
|
return nil
|
|
}
|
|
if err != syscall.EWOULDBLOCK {
|
|
return err
|
|
}
|
|
|
|
// Do not busy-wait unless we have a way to cancel the context
|
|
if ctx == nil || ctx.Done() == nil {
|
|
return syscall.EWOULDBLOCK
|
|
}
|
|
// busy-wait with context
|
|
numWaits := 0
|
|
for {
|
|
numWaits++
|
|
var timeout time.Duration
|
|
if numWaits <= InitialLockTries {
|
|
timeout = InitialLockDelay
|
|
} else {
|
|
timeout = LockDelay
|
|
}
|
|
select {
|
|
case <-time.After(timeout):
|
|
// TODO: Ineffective break statement
|
|
break
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
err = syscall.Flock(int(f.OSFile.Fd()), lockType|syscall.LOCK_NB)
|
|
if err == nil {
|
|
f.FlockStatus = lockType
|
|
return nil
|
|
}
|
|
if err != syscall.EWOULDBLOCK {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *File) unflock() {
|
|
if f.FlockStatus != 0 {
|
|
syscall.Flock(int(f.OSFile.Fd()), syscall.LOCK_UN) // ignore error (nothing to do about it anyway)
|
|
f.FlockStatus = 0
|
|
}
|
|
}
|
|
|
|
// does not read metadata because locking could block/fail. we want to be able
|
|
// to return a valid file struct without blocking.
|
|
func OpenCirFile(fileName string) (*File, error) {
|
|
fd, err := os.OpenFile(fileName, os.O_RDWR, 0777)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot open file: %w", err)
|
|
}
|
|
finfo, err := fd.Stat()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get file info: %w", err)
|
|
}
|
|
if finfo.Size() < HeaderLen {
|
|
return nil, fmt.Errorf("invalid cirfile, file length[%d] less than HeaderLen[%d]", finfo.Size(), HeaderLen)
|
|
}
|
|
rtn := &File{OSFile: fd}
|
|
return rtn, nil
|
|
}
|
|
|
|
func StatCirFile(ctx context.Context, fileName string) (*Stat, error) {
|
|
file, err := OpenCirFile(fileName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer file.Close()
|
|
fileOffset, dataSize, err := file.GetStartOffsetAndSize(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Stat{
|
|
Location: fileName,
|
|
Version: file.Version,
|
|
MaxSize: file.MaxSize,
|
|
FileOffset: fileOffset,
|
|
DataSize: dataSize,
|
|
}, nil
|
|
}
|
|
|
|
// if the file already exists, it is an error.
|
|
// there is a race condition if two goroutines try to create the same file between Stat() and Create(), so
|
|
//
|
|
// they both might get no error, but only one file will be valid. if this is a concern, this call
|
|
// should be externally synchronized.
|
|
func CreateCirFile(fileName string, maxSize int64) (*File, error) {
|
|
if maxSize <= 0 {
|
|
return nil, fmt.Errorf("invalid maxsize[%d]", maxSize)
|
|
}
|
|
_, err := os.Stat(fileName)
|
|
if err == nil {
|
|
return nil, fmt.Errorf("file[%s] already exists", fileName)
|
|
}
|
|
if !os.IsNotExist(err) {
|
|
return nil, fmt.Errorf("cannot stat: %w", err)
|
|
}
|
|
fd, err := os.Create(fileName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rtn := &File{OSFile: fd, Version: CurrentVersion, MaxSize: maxSize, StartPos: FilePosEmpty}
|
|
err = rtn.flock(nil, syscall.LOCK_EX) // pass nil context here for a fast fail if someone else is also creating the same file
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot lock file: %w", err)
|
|
}
|
|
defer rtn.unflock()
|
|
err = rtn.writeMeta()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot write metadata: %w", err)
|
|
}
|
|
return rtn, nil
|
|
}
|
|
|
|
func (f *File) Close() error {
|
|
return f.OSFile.Close()
|
|
}
|
|
|
|
func (f *File) ReadMeta(ctx context.Context) error {
|
|
err := f.flock(ctx, syscall.LOCK_SH)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.unflock()
|
|
return f.readMeta()
|
|
}
|
|
|
|
func (f *File) hasShLock() bool {
|
|
return f.FlockStatus == syscall.LOCK_EX || f.FlockStatus == syscall.LOCK_SH
|
|
}
|
|
|
|
func (f *File) hasExLock() bool {
|
|
return f.FlockStatus == syscall.LOCK_EX
|
|
}
|
|
|
|
func (f *File) readMeta() error {
|
|
if f.OSFile == nil {
|
|
return fmt.Errorf("no *os.File")
|
|
}
|
|
if !f.hasShLock() {
|
|
return fmt.Errorf("writeMeta must hold LOCK_SH")
|
|
}
|
|
_, err := f.OSFile.Seek(0, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot seek file: %w", err)
|
|
}
|
|
finfo, err := f.OSFile.Stat()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot stat file: %w", err)
|
|
}
|
|
if finfo.Size() < 256 {
|
|
return fmt.Errorf("invalid cbuf file size[%d] < 256", finfo.Size())
|
|
}
|
|
f.FileDataSize = finfo.Size() - 256
|
|
buf := make([]byte, 256)
|
|
_, err = io.ReadFull(f.OSFile, buf)
|
|
if err != nil {
|
|
return fmt.Errorf("error reading header: %w", err)
|
|
}
|
|
// currently only one version, so we don't need to have special logic here yet
|
|
_, err = fmt.Sscanf(string(buf), HeaderFmt1, &f.Version, &f.MaxSize, &f.FileOffset, &f.StartPos, &f.EndPos)
|
|
if err != nil {
|
|
return fmt.Errorf("sscanf error: %w", err)
|
|
}
|
|
if f.Version != CurrentVersion {
|
|
return fmt.Errorf("invalid cbuf version[%d]", f.Version)
|
|
}
|
|
// possible incomplete write, fix start/end pos to be within filesize
|
|
if f.FileDataSize == 0 || (f.StartPos >= f.FileDataSize && f.EndPos >= f.FileDataSize) {
|
|
f.StartPos = FilePosEmpty
|
|
f.EndPos = 0
|
|
} else if f.StartPos >= f.FileDataSize {
|
|
f.StartPos = 0
|
|
} else if f.EndPos >= f.FileDataSize {
|
|
f.EndPos = f.FileDataSize - 1
|
|
}
|
|
if f.MaxSize <= 0 || f.FileOffset < 0 || (f.StartPos < 0 && f.StartPos != FilePosEmpty) || f.StartPos >= f.MaxSize || f.EndPos < 0 || f.EndPos >= f.MaxSize {
|
|
return fmt.Errorf("invalid cbuf metadata version[%d] filedatasize[%d] maxsize[%d] fileoffset[%d] startpos[%d] endpos[%d]", f.Version, f.FileDataSize, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// no error checking of meta values
|
|
func (f *File) writeMeta() error {
|
|
if f.OSFile == nil {
|
|
return fmt.Errorf("no *os.File")
|
|
}
|
|
if !f.hasExLock() {
|
|
return fmt.Errorf("writeMeta must hold LOCK_EX")
|
|
}
|
|
_, err := f.OSFile.Seek(0, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot seek file: %w", err)
|
|
}
|
|
metaStr := fmt.Sprintf(HeaderFmt1, f.Version, f.MaxSize, f.FileOffset, f.StartPos, f.EndPos)
|
|
fullMetaStr := fmt.Sprintf(FullHeaderFmt, metaStr)
|
|
_, err = f.OSFile.WriteString(fullMetaStr)
|
|
if err != nil {
|
|
return fmt.Errorf("write error: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// returns (fileOffset, datasize, error)
|
|
// datasize is the current amount of readable data held in the cirfile
|
|
func (f *File) GetStartOffsetAndSize(ctx context.Context) (int64, int64, error) {
|
|
err := f.flock(ctx, syscall.LOCK_SH)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
chunks := f.getFileChunks()
|
|
return f.FileOffset, totalChunksSize(chunks), nil
|
|
}
|
|
|
|
type fileChunk struct {
|
|
StartPos int64
|
|
Len int64
|
|
}
|
|
|
|
func totalChunksSize(chunks []fileChunk) int64 {
|
|
var rtn int64
|
|
for _, chunk := range chunks {
|
|
rtn += chunk.Len
|
|
}
|
|
return rtn
|
|
}
|
|
|
|
func advanceChunks(chunks []fileChunk, offset int64) []fileChunk {
|
|
if offset < 0 {
|
|
panic(fmt.Sprintf("invalid negative offset: %d", offset))
|
|
}
|
|
if offset == 0 {
|
|
return chunks
|
|
}
|
|
var rtn []fileChunk
|
|
for _, chunk := range chunks {
|
|
if offset >= chunk.Len {
|
|
offset = offset - chunk.Len
|
|
continue
|
|
}
|
|
if offset == 0 {
|
|
rtn = append(rtn, chunk)
|
|
} else {
|
|
rtn = append(rtn, fileChunk{chunk.StartPos + offset, chunk.Len - offset})
|
|
offset = 0
|
|
}
|
|
}
|
|
return rtn
|
|
}
|
|
|
|
func (f *File) getFileChunks() []fileChunk {
|
|
if f.StartPos == FilePosEmpty {
|
|
return nil
|
|
}
|
|
if f.EndPos >= f.StartPos {
|
|
return []fileChunk{{f.StartPos, f.EndPos - f.StartPos + 1}}
|
|
}
|
|
return []fileChunk{
|
|
{f.StartPos, f.FileDataSize - f.StartPos},
|
|
{0, f.EndPos + 1},
|
|
}
|
|
}
|
|
|
|
func (f *File) getFreeChunks() []fileChunk {
|
|
if f.StartPos == FilePosEmpty {
|
|
return []fileChunk{{0, f.MaxSize}}
|
|
}
|
|
if (f.EndPos == f.StartPos-1) || (f.StartPos == 0 && f.EndPos == f.MaxSize-1) {
|
|
return nil
|
|
}
|
|
if f.EndPos < f.StartPos {
|
|
return []fileChunk{{f.EndPos + 1, f.StartPos - f.EndPos - 1}}
|
|
}
|
|
var rtn []fileChunk
|
|
if f.EndPos < f.MaxSize-1 {
|
|
rtn = append(rtn, fileChunk{f.EndPos + 1, f.MaxSize - f.EndPos - 1})
|
|
}
|
|
if f.StartPos > 0 {
|
|
rtn = append(rtn, fileChunk{0, f.StartPos})
|
|
}
|
|
return rtn
|
|
}
|
|
|
|
// returns (offset, data, err)
|
|
func (f *File) ReadAll(ctx context.Context) (int64, []byte, error) {
|
|
err := f.flock(ctx, syscall.LOCK_SH)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
chunks := f.getFileChunks()
|
|
curSize := totalChunksSize(chunks)
|
|
buf := make([]byte, curSize)
|
|
realOffset, nr, err := f.internalReadNext(buf, 0)
|
|
return realOffset, buf[0:nr], err
|
|
}
|
|
|
|
func (f *File) ReadAtWithMax(ctx context.Context, offset int64, maxSize int64) (int64, []byte, error) {
|
|
err := f.flock(ctx, syscall.LOCK_SH)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
chunks := f.getFileChunks()
|
|
curSize := totalChunksSize(chunks)
|
|
var buf []byte
|
|
if maxSize > curSize {
|
|
buf = make([]byte, curSize)
|
|
} else {
|
|
buf = make([]byte, maxSize)
|
|
}
|
|
realOffset, nr, err := f.internalReadNext(buf, offset)
|
|
return realOffset, buf[0:nr], err
|
|
}
|
|
|
|
func (f *File) internalReadNext(buf []byte, offset int64) (int64, int, error) {
|
|
if offset < f.FileOffset {
|
|
offset = f.FileOffset
|
|
}
|
|
relativeOffset := offset - f.FileOffset
|
|
chunks := f.getFileChunks()
|
|
curSize := totalChunksSize(chunks)
|
|
if offset >= f.FileOffset+curSize {
|
|
return f.FileOffset + curSize, 0, nil
|
|
}
|
|
chunks = advanceChunks(chunks, relativeOffset)
|
|
numRead := 0
|
|
for _, chunk := range chunks {
|
|
if numRead >= len(buf) {
|
|
break
|
|
}
|
|
toRead := len(buf) - numRead
|
|
if toRead > int(chunk.Len) {
|
|
toRead = int(chunk.Len)
|
|
}
|
|
nr, err := f.OSFile.ReadAt(buf[numRead:numRead+toRead], chunk.StartPos+HeaderLen)
|
|
if err != nil {
|
|
return offset, 0, err
|
|
}
|
|
numRead += nr
|
|
}
|
|
return offset, numRead, nil
|
|
}
|
|
|
|
// returns (realOffset, numread, error)
|
|
// will only return io.EOF when len(data) == 0, otherwise will just do a short read
|
|
func (f *File) ReadNext(ctx context.Context, buf []byte, offset int64) (int64, int, error) {
|
|
err := f.flock(ctx, syscall.LOCK_SH)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
return f.internalReadNext(buf, offset)
|
|
}
|
|
|
|
func (f *File) ensureFreeSpace(requiredSpace int64) error {
|
|
chunks := f.getFileChunks()
|
|
curSpace := f.MaxSize - totalChunksSize(chunks)
|
|
if curSpace >= requiredSpace {
|
|
return nil
|
|
}
|
|
neededSpace := requiredSpace - curSpace
|
|
if requiredSpace >= f.MaxSize || f.StartPos == FilePosEmpty {
|
|
f.StartPos = FilePosEmpty
|
|
f.EndPos = 0
|
|
f.FileOffset += neededSpace
|
|
} else {
|
|
f.StartPos = (f.StartPos + neededSpace) % f.MaxSize
|
|
f.FileOffset += neededSpace
|
|
}
|
|
return f.writeMeta()
|
|
}
|
|
|
|
// does not implement io.WriterAt (needs context)
|
|
func (f *File) WriteAt(ctx context.Context, buf []byte, writePos int64) error {
|
|
if writePos < 0 {
|
|
return fmt.Errorf("WriteAt got invalid writePos[%d]", writePos)
|
|
}
|
|
err := f.flock(ctx, syscall.LOCK_EX)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
chunks := f.getFileChunks()
|
|
currentSize := totalChunksSize(chunks)
|
|
if writePos < f.FileOffset {
|
|
negOffset := f.FileOffset - writePos
|
|
if negOffset >= int64(len(buf)) {
|
|
return nil
|
|
}
|
|
buf = buf[negOffset:]
|
|
writePos = f.FileOffset
|
|
}
|
|
if writePos > f.FileOffset+currentSize {
|
|
// fill gap with zero bytes
|
|
posOffset := writePos - (f.FileOffset + currentSize)
|
|
err = f.ensureFreeSpace(int64(posOffset))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var zeroBuf []byte
|
|
if posOffset >= f.MaxSize {
|
|
zeroBuf = make([]byte, f.MaxSize)
|
|
} else {
|
|
zeroBuf = make([]byte, posOffset)
|
|
}
|
|
err = f.internalAppendData(zeroBuf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// recalc chunks/currentSize
|
|
chunks = f.getFileChunks()
|
|
currentSize = totalChunksSize(chunks)
|
|
// after writing the zero bytes, writePos == f.FileOffset+currentSize (the rest is a straight append)
|
|
}
|
|
// now writePos >= f.FileOffset && writePos <= f.FileOffset+currentSize (check invariant)
|
|
if writePos < f.FileOffset || writePos > f.FileOffset+currentSize {
|
|
panic(fmt.Sprintf("invalid writePos, invariant violated writepos[%d] fileoffset[%d] currentsize[%d]", writePos, f.FileOffset, currentSize))
|
|
}
|
|
// overwrite existing data (in chunks). advance by writePosOffset
|
|
writePosOffset := writePos - f.FileOffset
|
|
if writePosOffset < currentSize {
|
|
advChunks := advanceChunks(chunks, writePosOffset)
|
|
nw, err := f.writeToChunks(buf, advChunks, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
buf = buf[nw:]
|
|
if len(buf) == 0 {
|
|
return nil
|
|
}
|
|
}
|
|
// buf contains what was unwritten. this unwritten data is now just a straight append
|
|
return f.internalAppendData(buf)
|
|
}
|
|
|
|
// try writing to chunks, returns (nw, error)
|
|
func (f *File) writeToChunks(buf []byte, chunks []fileChunk, updatePos bool) (int64, error) {
|
|
var numWrite int64
|
|
for _, chunk := range chunks {
|
|
if numWrite >= int64(len(buf)) {
|
|
break
|
|
}
|
|
if chunk.Len == 0 {
|
|
continue
|
|
}
|
|
toWrite := int64(len(buf)) - numWrite
|
|
if toWrite > chunk.Len {
|
|
toWrite = chunk.Len
|
|
}
|
|
nw, err := f.OSFile.WriteAt(buf[numWrite:numWrite+toWrite], chunk.StartPos+HeaderLen)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if updatePos {
|
|
if chunk.StartPos+int64(nw) > f.FileDataSize {
|
|
f.FileDataSize = chunk.StartPos + int64(nw)
|
|
}
|
|
if f.StartPos == FilePosEmpty {
|
|
f.StartPos = chunk.StartPos
|
|
}
|
|
f.EndPos = chunk.StartPos + int64(nw) - 1
|
|
}
|
|
numWrite += int64(nw)
|
|
}
|
|
return numWrite, nil
|
|
}
|
|
|
|
func (f *File) internalAppendData(buf []byte) error {
|
|
err := f.ensureFreeSpace(int64(len(buf)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(buf) >= int(f.MaxSize) {
|
|
buf = buf[len(buf)-int(f.MaxSize):]
|
|
}
|
|
chunks := f.getFreeChunks()
|
|
// don't track nw because we know we have enough free space to write entire buf
|
|
_, err = f.writeToChunks(buf, chunks, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = f.writeMeta()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *File) AppendData(ctx context.Context, buf []byte) error {
|
|
err := f.flock(ctx, syscall.LOCK_EX)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.unflock()
|
|
err = f.readMeta()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return f.internalAppendData(buf)
|
|
}
|