mirror of
https://github.com/wavetermdev/waveterm.git
synced 2025-03-25 15:49:29 +01:00
implement WriteAt, refactored and tested append, writeat still needs testing
This commit is contained in:
parent
38870f9c6e
commit
e1eecae6d3
@ -20,6 +20,7 @@ const InitialLockDelay = 10 * time.Millisecond
|
||||
const InitialLockTries = 5
|
||||
const LockDelay = 100 * time.Millisecond
|
||||
|
||||
// File objects are *not* multithread safe, operations must be externally synchronized
|
||||
type File struct {
|
||||
OSFile *os.File
|
||||
Version byte
|
||||
@ -72,8 +73,10 @@ func (f *File) flock(ctx context.Context, lockType int) error {
|
||||
}
|
||||
|
||||
func (f *File) unflock() {
|
||||
syscall.Flock(int(f.OSFile.Fd()), syscall.LOCK_UN) // ignore error (nothing to do about it anyway)
|
||||
f.FlockStatus = 0
|
||||
if f.FlockStatus != 0 {
|
||||
syscall.Flock(int(f.OSFile.Fd()), syscall.LOCK_UN) // ignore error (nothing to do about it anyway)
|
||||
f.FlockStatus = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -127,6 +130,10 @@ func CreateCirFile(fileName string, maxSize int64) (*File, error) {
|
||||
return rtn, nil
|
||||
}
|
||||
|
||||
func (f *File) Close() error {
|
||||
return f.OSFile.Close()
|
||||
}
|
||||
|
||||
func (f *File) ReadMeta(ctx context.Context) error {
|
||||
err := f.flock(ctx, syscall.LOCK_SH)
|
||||
if err != nil {
|
||||
@ -341,16 +348,13 @@ func (f *File) ReadNext(ctx context.Context, buf []byte, offset int64) (int64, i
|
||||
}
|
||||
|
||||
func (f *File) ensureFreeSpace(requiredSpace int64) error {
|
||||
if f.StartPos == FilePosEmpty {
|
||||
return nil
|
||||
}
|
||||
chunks := f.getFileChunks()
|
||||
curSpace := f.MaxSize - totalChunksSize(chunks)
|
||||
if curSpace >= requiredSpace {
|
||||
return nil
|
||||
}
|
||||
neededSpace := requiredSpace - curSpace
|
||||
if requiredSpace >= f.MaxSize {
|
||||
if requiredSpace >= f.MaxSize || f.StartPos == FilePosEmpty {
|
||||
f.StartPos = FilePosEmpty
|
||||
f.EndPos = 0
|
||||
f.FileOffset += neededSpace
|
||||
@ -361,6 +365,126 @@ func (f *File) ensureFreeSpace(requiredSpace int64) error {
|
||||
return f.writeMeta()
|
||||
}
|
||||
|
||||
// does not implement io.WriterAt (needs context)
|
||||
func (f *File) WriteAt(ctx context.Context, buf []byte, writePos int64) error {
|
||||
if writePos < 0 {
|
||||
return fmt.Errorf("WriteAt got invalid writePos[%d]", writePos)
|
||||
}
|
||||
err := f.flock(ctx, syscall.LOCK_EX)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.unflock()
|
||||
err = f.readMeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunks := f.getFileChunks()
|
||||
currentSize := totalChunksSize(chunks)
|
||||
if writePos < f.FileOffset {
|
||||
negOffset := f.FileOffset - writePos
|
||||
if negOffset >= int64(len(buf)) {
|
||||
return nil
|
||||
}
|
||||
buf = buf[negOffset:]
|
||||
writePos = f.FileOffset
|
||||
}
|
||||
if writePos > f.FileOffset+currentSize {
|
||||
// fill gap with zero bytes
|
||||
posOffset := writePos - (f.FileOffset + currentSize)
|
||||
err = f.ensureFreeSpace(int64(posOffset))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var zeroBuf []byte
|
||||
if posOffset >= f.MaxSize {
|
||||
zeroBuf = make([]byte, f.MaxSize)
|
||||
} else {
|
||||
zeroBuf = make([]byte, posOffset)
|
||||
}
|
||||
err = f.internalAppendData(zeroBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// recalc chunks/currentSize
|
||||
chunks = f.getFileChunks()
|
||||
currentSize = totalChunksSize(chunks)
|
||||
// after writing the zero bytes, writePos == f.FileOffset+currentSize (the rest is a straight append)
|
||||
}
|
||||
// now writePos >= f.FileOffset && writePos <= f.FileOffset+currentSize (check invariant)
|
||||
if writePos < f.FileOffset || writePos > f.FileOffset+currentSize {
|
||||
panic(fmt.Sprintf("invalid writePos, invariant violated writepos[%d] fileoffset[%d] currentsize[%d]", writePos, f.FileOffset, currentSize))
|
||||
}
|
||||
// overwrite existing data (in chunks). advance by writePosOffset
|
||||
writePosOffset := writePos - f.FileOffset
|
||||
if writePosOffset < currentSize {
|
||||
advChunks := advanceChunks(chunks, writePosOffset)
|
||||
nw, err := f.writeToChunks(buf, advChunks, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf = buf[nw:]
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// buf contains what was unwritten. this unwritten data is now just a straight append
|
||||
return f.internalAppendData(buf)
|
||||
}
|
||||
|
||||
// try writing to chunks, returns (nw, error)
|
||||
func (f *File) writeToChunks(buf []byte, chunks []fileChunk, updatePos bool) (int64, error) {
|
||||
var numWrite int64
|
||||
for _, chunk := range chunks {
|
||||
if numWrite >= int64(len(buf)) {
|
||||
break
|
||||
}
|
||||
if chunk.Len == 0 {
|
||||
continue
|
||||
}
|
||||
toWrite := int64(len(buf)) - numWrite
|
||||
if toWrite > chunk.Len {
|
||||
toWrite = chunk.Len
|
||||
}
|
||||
nw, err := f.OSFile.WriteAt(buf[numWrite:numWrite+toWrite], chunk.StartPos+HeaderLen)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if updatePos {
|
||||
if chunk.StartPos+int64(nw) > f.FileDataSize {
|
||||
f.FileDataSize = chunk.StartPos + int64(nw)
|
||||
}
|
||||
if f.StartPos == FilePosEmpty {
|
||||
f.StartPos = chunk.StartPos
|
||||
}
|
||||
f.EndPos = chunk.StartPos + int64(nw) - 1
|
||||
}
|
||||
numWrite += int64(nw)
|
||||
}
|
||||
return numWrite, nil
|
||||
}
|
||||
|
||||
func (f *File) internalAppendData(buf []byte) error {
|
||||
err := f.ensureFreeSpace(int64(len(buf)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(buf) >= int(f.MaxSize) {
|
||||
buf = buf[len(buf)-int(f.MaxSize):]
|
||||
}
|
||||
chunks := f.getFreeChunks()
|
||||
// don't track nw because we know we have enough free space to write entire buf
|
||||
_, err = f.writeToChunks(buf, chunks, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = f.writeMeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *File) AppendData(ctx context.Context, buf []byte) error {
|
||||
err := f.flock(ctx, syscall.LOCK_EX)
|
||||
if err != nil {
|
||||
@ -371,42 +495,5 @@ func (f *File) AppendData(ctx context.Context, buf []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = f.ensureFreeSpace(int64(len(buf)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(buf) >= int(f.MaxSize) {
|
||||
buf = buf[len(buf)-int(f.MaxSize):]
|
||||
}
|
||||
chunks := f.getFreeChunks()
|
||||
numWrite := 0
|
||||
for _, chunk := range chunks {
|
||||
if numWrite >= len(buf) {
|
||||
break
|
||||
}
|
||||
if chunk.Len == 0 {
|
||||
continue
|
||||
}
|
||||
toRead := len(buf) - numWrite
|
||||
if toRead > int(chunk.Len) {
|
||||
toRead = int(chunk.Len)
|
||||
}
|
||||
nw, err := f.OSFile.WriteAt(buf[numWrite:numWrite+toRead], chunk.StartPos+HeaderLen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if chunk.StartPos+int64(nw) > f.FileDataSize {
|
||||
f.FileDataSize = chunk.StartPos + int64(nw)
|
||||
}
|
||||
if f.StartPos == FilePosEmpty {
|
||||
f.StartPos = chunk.StartPos
|
||||
}
|
||||
f.EndPos = chunk.StartPos + int64(nw) - 1
|
||||
numWrite += nw
|
||||
}
|
||||
err = f.writeMeta()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return f.internalAppendData(buf)
|
||||
}
|
||||
|
@ -96,17 +96,13 @@ func TestFile(t *testing.T) {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen)
|
||||
if f.StartPos != FilePosEmpty || f.EndPos != 0 || f.FileDataSize != 0 {
|
||||
t.Fatalf("metadata error (1): %#v", f)
|
||||
}
|
||||
validateMeta(t, "1", f, FilePosEmpty, 0, 0, 0)
|
||||
err = f.AppendData(context.Background(), []byte("hello"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
}
|
||||
validateFileSize(t, f1Name, HeaderLen+5)
|
||||
if f.StartPos != 0 || f.EndPos != 4 || f.FileDataSize != 5 {
|
||||
t.Fatalf("metadata error (2): %#v", f)
|
||||
}
|
||||
validateMeta(t, "2", f, 0, 4, 5, 0)
|
||||
err = f.AppendData(context.Background(), []byte(" foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot append data: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user