// Copyright 2023, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
// Package bufferedpipe provides a mechanism for writing data to a buffer and reading it later. Buffered pipes are registered in a global map and are removed from it after a fixed time-to-live. The output of a buffered pipe can be read by sending a GET request to a dedicated URL.
package bufferedpipe
import (
"bytes"
"io"
"log"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/wavesrv/pkg/scbase"
"github.com/wavetermdev/waveterm/wavesrv/pkg/waveenc"
)
const (
	// BufferedPipeMapTTL is the time-to-live of a buffered pipe in the map of
	// buffered pipes.
	BufferedPipeMapTTL = 30 * time.Second

	// BufferedPipeGetterUrl is the URL path used for getting the output of a
	// buffered pipe.
	BufferedPipeGetterUrl = "/api/buffered-pipe"
)
// BufferedPipe is a pipe that allows for lazy writing to a downstream writer.
// Data written to the pipe is held in an in-memory buffer until WriteTo is
// called.
type BufferedPipe struct {
	// Key is the unique key for the pipe.
	Key string

	// buffer holds the data that will be written to the downstream writer once
	// it is ready.
	buffer bytes.Buffer

	// closed records whether the pipe has been closed.
	closed atomic.Bool

	// bufferDataCond signals the waiting downstream writer that there is
	// either new data to write or that the pipe has been closed.
	bufferDataCond *sync.Cond

	// downstreamLock ensures that only one goroutine reads from the buffer at
	// a time.
	downstreamLock *sync.Mutex
}
// NewBufferedPipe creates a BufferedPipe keyed by a freshly generated UUID,
// registers it through SetBufferedPipe, and schedules the pipe to be closed
// once timeout has elapsed.
func NewBufferedPipe(timeout time.Duration) *BufferedPipe {
	// buffer and closed are left at their zero values: an empty buffer and an
	// open pipe.
	pipe := &BufferedPipe{
		Key:            uuid.New().String(),
		bufferDataCond: sync.NewCond(&sync.Mutex{}),
		downstreamLock: &sync.Mutex{},
	}
	SetBufferedPipe(pipe)
	// Close the pipe when the timeout expires so that a blocked WriteTo
	// returns.
	time.AfterFunc(timeout, func() { pipe.Close() })
	return pipe
}
// Get the URL for reading the output of the pipe.
func ( pipe * BufferedPipe ) GetOutputUrl ( ) ( string , error ) {
qvals := make ( url . Values )
qvals . Set ( "key" , pipe . Key )
qvals . Set ( "nonce" , uuid . New ( ) . String ( ) )
2024-04-17 01:58:17 +02:00
hmacStr , err := waveenc . ComputeUrlHmac ( [ ] byte ( scbase . WaveAuthKey ) , BufferedPipeGetterUrl , qvals )
2024-04-03 00:46:32 +02:00
if err != nil {
return "" , err
}
qvals . Set ( "hmac" , hmacStr )
return BufferedPipeGetterUrl + "?" + qvals . Encode ( ) , nil
}
// Write appends p to the pipe's buffer and then wakes any goroutine waiting
// on the pipe's condition variable. If the pipe has already been closed,
// nothing is buffered and io.ErrClosedPipe is returned.
func (pipe *BufferedPipe) Write(p []byte) (n int, err error) {
	if pipe.closed.Load() {
		return 0, io.ErrClosedPipe
	}
	cond := pipe.bufferDataCond
	cond.L.Lock()
	// Deferred calls run in reverse order: the lock is released first and the
	// waiters are signalled afterwards.
	defer cond.Broadcast()
	defer cond.L.Unlock()
	return pipe.buffer.Write(p)
}
// WriteTo writes all buffered data to w and then blocks, forwarding all
// subsequently written data until the pipe is closed.
//
// Only one goroutine may drain the pipe at a time: if another WriteTo call is
// already in progress, WriteTo returns 0 and io.ErrClosedPipe immediately.
//
// The returned count is the total number of bytes written to w, including the
// bytes accepted by a call to w that ended in an error. When w reports an
// error, that error is returned and the data that was not written stays in the
// buffer.
//
// The condition variable's lock is held while data is written to w, so
// concurrent Write calls wait until the downstream write has finished.
func (pipe *BufferedPipe) WriteTo(w io.Writer) (n int64, err error) {
	// Refuse to start a second concurrent drain of the same buffer.
	if !pipe.downstreamLock.TryLock() {
		return 0, io.ErrClosedPipe
	}
	defer pipe.downstreamLock.Unlock()

	pipe.bufferDataCond.L.Lock()
	defer pipe.bufferDataCond.L.Unlock()

	for {
		written, werr := pipe.buffer.WriteTo(w)
		// Count the bytes before looking at the error: a failed write may
		// still have delivered part of the data.
		n += written
		if werr != nil {
			return n, werr
		}
		// Once the pipe is closed there is no more data to wait for.
		if pipe.closed.Load() {
			return n, nil
		}
		// Wait for more data to be written to the buffer or for the pipe to
		// be closed.
		pipe.bufferDataCond.Wait()
	}
}
// Close marks the pipe as closed and wakes every goroutine waiting on the
// pipe's condition variable, which causes a blocking WriteTo call to return.
// It always returns nil and may be called more than once.
//
// The closed flag is set while holding the condition variable's lock. Without
// the lock, a WriteTo call that has just seen the pipe open but has not yet
// started waiting could miss the wake-up and stay blocked. As a consequence,
// Close waits for a WriteTo call that is in the middle of writing to its
// downstream writer, because that call holds the same lock.
func (pipe *BufferedPipe) Close() error {
	pipe.bufferDataCond.L.Lock()
	pipe.closed.Store(true)
	pipe.bufferDataCond.L.Unlock()
	// Broadcasting does not require the lock to be held.
	pipe.bufferDataCond.Broadcast()
	return nil
}
// Compile-time checks that *BufferedPipe implements the io.WriteCloser and
// io.WriterTo interfaces.
var (
	_ io.WriteCloser = (*BufferedPipe)(nil)
	_ io.WriterTo    = (*BufferedPipe)(nil)
)
// BufferedPipeMap is a collection of buffered pipes indexed by string key,
// together with the mutex that protects it.
type BufferedPipeMap struct {
	// _map holds the registered pipes.
	_map map[string]*BufferedPipe
	// lock is held by the package's accessors while they read or modify _map.
	lock sync.Mutex
}
// bufferedPipes is the global map of registered buffered pipes. It starts out
// empty.
var bufferedPipes = BufferedPipeMap{
	_map: map[string]*BufferedPipe{},
}
// GetBufferedPipe looks up the buffered pipe registered under key in the
// global map of buffered pipes. The second result reports whether a pipe was
// found; when it is false the returned pipe is nil.
func GetBufferedPipe(key string) (*BufferedPipe, bool) {
	bufferedPipes.lock.Lock()
	pipe, found := bufferedPipes._map[key]
	bufferedPipes.lock.Unlock()
	return pipe, found
}
// SetBufferedPipe registers pipe in the global map of buffered pipes under
// pipe.Key, replacing any pipe already stored under that key. pipe must not be
// nil.
//
// After BufferedPipeMapTTL has elapsed, the entry for that key is deleted from
// the map and pipe is closed.
func SetBufferedPipe(pipe *BufferedPipe) {
	key := pipe.Key

	bufferedPipes.lock.Lock()
	bufferedPipes._map[key] = pipe
	bufferedPipes.lock.Unlock()

	// Remove the buffered pipe after a certain amount of time.
	time.AfterFunc(BufferedPipeMapTTL, func() {
		// Hold the global lock only for the map mutation, so that closing the
		// pipe and logging cannot stall other users of the map.
		bufferedPipes.lock.Lock()
		delete(bufferedPipes._map, key)
		bufferedPipes.lock.Unlock()

		pipe.Close()
		log.Printf("removing buffered pipe %s", key)
	})
}
// HandleGetBufferedPipeOutput handles an HTTP request for the output of a
// buffered pipe. The pipe is identified by the "key" query parameter.
//
// If no pipe is registered under the key, the response is 404. Otherwise the
// pipe's output is written to the response as text/plain through
// pipe.WriteTo. If WriteTo fails before reporting any written bytes, the
// response is 500. If it fails after bytes were written, the status line and
// part of the body may already have been sent, so the error is only logged.
//
// NOTE(review): only "key" is read here; the "hmac" and "nonce" parameters
// produced by GetOutputUrl are presumably verified by the caller — confirm.
func HandleGetBufferedPipeOutput(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	pipe, ok := GetBufferedPipe(key)
	if !ok {
		http.Error(w, "buffered pipe not found", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "text/plain")
	n, err := pipe.WriteTo(w)
	if err == nil {
		return
	}
	if n > 0 {
		// Appending an error message now would corrupt the payload, and the
		// status code can no longer be changed.
		log.Printf("error writing buffered pipe %q after %d bytes: %v", key, n, err)
		return
	}
	http.Error(w, "error writing from buffer", http.StatusInternalServerError)
}