waveterm/pkg/vdom/vdomclient/streamingresp.go
2024-11-02 10:58:13 -07:00

130 lines
3.0 KiB
Go

// Copyright 2024, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package vdomclient
import (
"bytes"
"net/http"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
)
const maxChunkSize = 64 * 1024 // 64KB maximum chunk size
// StreamingResponseWriter implements http.ResponseWriter interface to stream response
// data through a channel rather than buffering it in memory. This is particularly
// useful for handling large responses like video streams or file downloads.
type StreamingResponseWriter struct {
header http.Header
statusCode int
respChan chan<- wshrpc.RespOrErrorUnion[wshrpc.VDomUrlRequestResponse]
headerSent bool
buffer *bytes.Buffer
}
func NewStreamingResponseWriter(respChan chan<- wshrpc.RespOrErrorUnion[wshrpc.VDomUrlRequestResponse]) *StreamingResponseWriter {
return &StreamingResponseWriter{
header: make(http.Header),
statusCode: http.StatusOK,
respChan: respChan,
headerSent: false,
buffer: bytes.NewBuffer(make([]byte, 0, maxChunkSize)),
}
}
func (w *StreamingResponseWriter) Header() http.Header {
return w.header
}
func (w *StreamingResponseWriter) WriteHeader(statusCode int) {
if w.headerSent {
return
}
w.statusCode = statusCode
w.headerSent = true
headers := make(map[string]string)
for key, values := range w.header {
if len(values) > 0 {
headers[key] = values[0]
}
}
w.respChan <- wshrpc.RespOrErrorUnion[wshrpc.VDomUrlRequestResponse]{
Response: wshrpc.VDomUrlRequestResponse{
StatusCode: w.statusCode,
Headers: headers,
},
}
}
// sendChunk sends a single chunk of exactly maxChunkSize (or less)
func (w *StreamingResponseWriter) sendChunk(data []byte) {
if len(data) == 0 {
return
}
chunk := make([]byte, len(data))
copy(chunk, data)
w.respChan <- wshrpc.RespOrErrorUnion[wshrpc.VDomUrlRequestResponse]{
Response: wshrpc.VDomUrlRequestResponse{
Body: chunk,
},
}
}
func (w *StreamingResponseWriter) Write(data []byte) (int, error) {
if !w.headerSent {
w.WriteHeader(http.StatusOK)
}
originalLen := len(data)
// If we already have data in the buffer
if w.buffer.Len() > 0 {
// Fill the buffer up to maxChunkSize
spaceInBuffer := maxChunkSize - w.buffer.Len()
if spaceInBuffer > 0 {
// How much of the new data can fit in the buffer
toBuffer := spaceInBuffer
if toBuffer > len(data) {
toBuffer = len(data)
}
w.buffer.Write(data[:toBuffer])
data = data[toBuffer:] // Advance data slice
}
// If buffer is full, send it
if w.buffer.Len() == maxChunkSize {
w.sendChunk(w.buffer.Bytes())
w.buffer.Reset()
}
}
// Send any full chunks from data
for len(data) >= maxChunkSize {
w.sendChunk(data[:maxChunkSize])
data = data[maxChunkSize:]
}
// Buffer any remaining data
if len(data) > 0 {
w.buffer.Write(data)
}
return originalLen, nil
}
func (w *StreamingResponseWriter) Close() error {
if !w.headerSent {
w.WriteHeader(http.StatusOK)
}
if w.buffer.Len() > 0 {
w.sendChunk(w.buffer.Bytes())
w.buffer.Reset()
}
return nil
}