88 "net/http"
99 "slices"
1010 "strings"
11+ "time"
1112
1213 sdktypes "github.com/pokt-network/shannon-sdk/types"
1314 "google.golang.org/protobuf/proto"
@@ -20,12 +21,30 @@ import (
2021// A custom delimiter used to separate chunks in a streaming response.
2122const streamDelimiter = "||POKT_STREAM||"
2223
24+ // Batch streaming configuration
25+ const (
26+ // batchTimeThreshold is the maximum time to wait before flushing a batch (100ms)
27+ batchTimeThreshold = 100 * time .Millisecond
28+ // batchSizeThreshold is the maximum payload size before flushing a batch (100KB)
29+ batchSizeThreshold = 100 * 1024 // 100KB in bytes
30+ // batchChunksThreshold is the maximum number of chunks received efore flushing a batch (100)
31+ batchChunksThreshold = 100
32+ )
33+
2334// Target streaming types
2435var httpStreamingTypes = []string {
2536 "text/event-stream" ,
2637 "application/x-ndjson" ,
2738}
2839
40+ // chunkBatch accumulates multiple chunks for batch signing and writing
41+ type chunkBatch struct {
42+ chunks [][]byte // individual chunk bodies
43+ totalSize int64 // cumulative size of all chunks
44+ totalChunks int64 // number of chunks
45+ startTime time.Time // when the batch was created
46+ }
47+
2948// isStreamingResponse determines if an HTTP response should be handled as a stream.
3049//
3150// Checks the "Content-Type" HTTP header against supported streaming types:
@@ -54,25 +73,119 @@ func isStreamingResponse(response *http.Response) bool {
5473 return slices .Contains (httpStreamingTypes , strings .ToLower (mediaType ))
5574}
5675
57- // handleHttpStream processes streaming HTTP responses from backend services.
76+ // shouldFlushBatch determines if the current batch should be flushed based on
77+ // time and size thresholds:
78+ // - Time threshold: 100ms has elapsed since batch creation
79+ // - Size threshold: batch size >= 100KB
80+ // - Chunk threshold: chunks number >= 100
81+ // - Force flag: set to true to flush regardless of thresholds
82+ func shouldFlushBatch (batch * chunkBatch , forceFlush bool ) bool {
83+ if forceFlush {
84+ return forceFlush && batch .totalChunks > 0
85+ }
86+
87+ // Check time threshold
88+ elapsed := time .Since (batch .startTime )
89+ if elapsed >= batchTimeThreshold {
90+ return true
91+ }
92+
93+ // Check size threshold
94+ if batch .totalSize >= batchSizeThreshold {
95+ return true
96+ }
97+
98+ // Check chunk number
99+ if batch .totalChunks >= batchChunksThreshold {
100+ return true
101+ }
102+
103+ return false
104+ }
105+
106+ // flushBatch signs and writes the accumulated batch to the client
107+ func (server * relayMinerHTTPServer ) flushBatch (
108+ ctx context.Context ,
109+ logger polylog.Logger ,
110+ batch * chunkBatch ,
111+ meta * types.RelayRequestMetadata ,
112+ writer http.ResponseWriter ,
113+ flusher http.Flusher ,
114+ ) (* types.RelayResponse , error ) {
115+ if batch .totalChunks == 0 {
116+ return nil , nil
117+ }
118+
119+ // Combine all chunks into a single payload
120+ combinedPayload := make ([]byte , 0 , batch .totalSize )
121+ for i := int64 (0 ); i < batch .totalChunks ; i ++ {
122+ combinedPayload = append (combinedPayload , batch .chunks [i ]... )
123+ }
124+
125+ // Wrap combined chunks in POKT HTTP response structure
126+ poktHTTPResponse := & sdktypes.POKTHTTPResponse {
127+ StatusCode : uint32 (http .StatusOK ),
128+ Header : make (map [string ]* sdktypes.Header , 0 ),
129+ BodyBz : combinedPayload ,
130+ }
131+
132+ // Marshal with deterministic ordering for signature consistency
133+ marshalOpts := proto.MarshalOptions {Deterministic : true }
134+ poktHTTPResponseBz , err := marshalOpts .Marshal (poktHTTPResponse )
135+ if err != nil {
136+ return nil , fmt .Errorf ("❌ failed to marshal POKT HTTP response batch: %w" , err )
137+ }
138+
139+ // Sign the batch once
140+ relayResponse , err := server .newRelayResponse (poktHTTPResponseBz , meta .SessionHeader , meta .SupplierOperatorAddress )
141+ if err != nil {
142+ return nil , fmt .Errorf ("❌ failed to sign relay response batch: %w" , err )
143+ }
144+
145+ // Serialize signed response
146+ signedBatch , err := relayResponse .Marshal ()
147+ if err != nil {
148+ return nil , fmt .Errorf ("❌ failed to marshal signed relay response batch: %w" , err )
149+ }
150+
151+ // Append POKT stream delimiter (allows client-side batch detection)
152+ signedBatch = append (signedBatch , []byte (streamDelimiter )... )
153+
154+ // Write signed batch to client
155+ if _ , err = writer .Write (signedBatch ); err != nil {
156+ return nil , fmt .Errorf ("❌ failed to write stream batch to client: %w" , err )
157+ }
158+
159+ // Flush to ensure data reaches client with low latency
160+ flusher .Flush ()
161+
162+ return relayResponse , nil
163+ }
164+
165+ // handleHttpStream processes streaming HTTP responses from backend services with batching.
58166//
59- // Streaming flow:
60- // 1. Read each newline-delimited chunk from backend response
61- // 2. Wrap chunk in POKT HTTP response structure (status code, headers, body)
62- // 3. Sign each chunk individually using supplier's key
63- // 4. Write signed chunk with delimiter to client
64- // 5. Flush immediately to ensure low-latency streaming
167+ // Streaming flow with batching:
168+ // 1. Accumulate newline-delimited chunks in a batch buffer
169+ // 2. Flush batch when either:
170+ // - 100ms has elapsed since batch creation AND at least one chunk exists
171+ // - Total accumulated payload reaches 100KB
172+ // 3. When flushing:
173+ // - Combine all buffered chunks into single POKTHTTPResponse
174+ // - Sign batch once (not per-chunk)
175+ // - Write signed batch with delimiter to client
176+ // - Flush to ensure low-latency delivery
177+ // 4. Final batch automatically flushes when stream ends
65178//
66- // This enables real-time streaming for SSE and NDJSON responses while maintaining
67- // POKT's signature verification requirements .
179+ // This batching strategy reduces signing overhead and improves throughput while
180+ // maintaining low-latency streaming (max 100ms delay) for SSE and NDJSON responses .
68181//
69182// TODO_IMPROVE: Consider adding configurable buffer size for scanner to handle
70183// large streaming chunks (default is 64KB).
71184// Some LLM responses may exceed this.
72185//
73186// Returns:
74- // - Final relay response (contains last chunk 's signature)
75- // - Total response size across all chunks (for metrics)
187+ // - Final relay response (contains last batch 's signature)
188+ // - Total response size across all batches (for metrics)
76189// - Error if streaming fails (network errors, signature failures, etc.)
77190func (server * relayMinerHTTPServer ) handleHttpStream (
78191 ctx context.Context ,
@@ -106,68 +219,118 @@ func (server *relayMinerHTTPServer) handleHttpStream(
106219 // Create scanner with default newline delimiter
107220 scanner := bufio .NewScanner (response .Body )
108221
109- // Initialize the return values
110- relayResponse := & types.RelayResponse {
111- Meta : types.RelayResponseMetadata {SessionHeader : meta .SessionHeader },
222+ // Initialize batch buffer
223+ batch := & chunkBatch {
224+ chunks : make ([][]byte , batchChunksThreshold ),
225+ totalSize : 0 ,
226+ totalChunks : 0 ,
227+ startTime : time .Now (),
112228 }
229+
230+ // Initialize the return values
231+ var relayResponse * types.RelayResponse
113232 responseSize := float64 (0 )
114233
115- // Process each chunk from backend stream
116- for scanner . Scan () {
117- // TODO_TECHDEBT: Need to periodically check for context cancellation to prevent signature race conditions
234+ // Create a ticker for time-based flushing
235+ ticker := time . NewTicker ( batchTimeThreshold )
236+ defer ticker . Stop ()
118237
119- // Restore newline stripped by scanner (needed for protocol compatibility)
120- line := scanner .Bytes ()
121- line = append (line , '\n' )
238+ // Process chunks from backend stream with time-based and size-based batching
239+ for {
240+ select {
241+ case <- ctx .Done ():
242+ logger .Debug ().Msg ("📦 Stream Context canceled: flushing." )
243+ // Context cancelled - flush any remaining batch before exiting
244+ if batch .totalChunks > 0 {
245+ resp , err := server .flushBatch (ctx , logger , batch , & meta , writer , flusher )
246+ if err != nil {
247+ logger .Error ().Err (err ).Msg ("❌ failed to flush final batch on context cancellation" )
248+ } else if resp != nil {
249+ relayResponse = resp
250+ responseSize += float64 (resp .Size ())
251+ }
252+ }
253+ return nil , responseSize , ctx .Err ()
122254
123- // Wrap chunk in POKT HTTP response structure
124- poktHTTPResponse := & sdktypes.POKTHTTPResponse {
125- StatusCode : uint32 (http .StatusOK ),
126- Header : make (map [string ]* sdktypes.Header , 0 ),
127- BodyBz : line ,
128- }
255+ case <- ticker .C :
256+ logger .Debug ().Msg ("📦 Stream Ticker Time up: Streaming content." )
257+ // Time threshold (100ms) reached - flush if batch has chunks
258+ if shouldFlushBatch (batch , false ) {
259+ resp , err := server .flushBatch (ctx , logger , batch , & meta , writer , flusher )
260+ if err != nil {
261+ return nil , responseSize , fmt .Errorf ("❌ failed to flush batch on time threshold: %w" , err )
262+ }
263+ if resp != nil {
264+ relayResponse = resp
265+ responseSize += float64 (resp .Size ())
266+ }
129267
130- // Marshal with deterministic ordering for signature consistency
131- marshalOpts := proto.MarshalOptions {Deterministic : true }
132- poktHTTPResponseBz , err := marshalOpts .Marshal (poktHTTPResponse )
133- if err != nil {
134- return nil , 0 , fmt .Errorf ("❌ failed to marshal POKT HTTP response: %w" , err )
135- }
268+ // Reset batch for next round
269+ batch .totalSize = 0
270+ batch .totalChunks = 0
271+ batch .startTime = time .Now ()
272+ }
136273
137- // Sign this chunk
138- relayResponse , err = server .newRelayResponse (poktHTTPResponseBz , meta .SessionHeader , meta .SupplierOperatorAddress )
139- if err != nil {
140- return nil , 0 , fmt .Errorf ("❌ failed to sign relay response chunk: %w" , err )
141- }
274+ default :
275+ // Try to read next chunk from scanner (non-blocking via default case)
276+ if ! scanner .Scan () {
277+ // Stream ended - flush final batch if it has chunks
278+ logger .Debug ().Msg ("📦 Stream ended: flusshing last." )
279+ if batch .totalChunks > 0 {
280+ resp , err := server .flushBatch (ctx , logger , batch , & meta , writer , flusher )
281+ if err != nil {
282+ return nil , responseSize , fmt .Errorf ("❌ failed to flush final batch: %w" , err )
283+ }
284+ if resp != nil {
285+ relayResponse = resp
286+ responseSize += float64 (resp .Size ())
287+ }
288+ }
142289
143- // Serialize signed response
144- signedLine , err := relayResponse .Marshal ()
145- if err != nil {
146- return nil , 0 , fmt .Errorf ("❌ failed to marshal signed relay response: %w" , err )
147- }
290+ // Check for scanner errors (network issues, buffer overflows, etc.)
291+ if err := scanner .Err (); err != nil {
292+ return nil , responseSize , fmt .Errorf ("❌ stream scanning error: %w" , err )
293+ }
148294
149- // Track cumulative size across all chunks
150- responseSize += float64 (relayResponse .Size ())
295+ // Stream ended successfully
296+ return relayResponse , responseSize , nil
297+ }
151298
152- // Append POKT stream delimiter (allows client-side chunk detection)
153- signedLine = append (signedLine , []byte (streamDelimiter )... )
299+ logger .Debug ().Msg ("📦 Stream Received: Adding to buffer." )
154300
155- // Write signed chunk to client
156- if _ , err = writer .Write (signedLine ); err != nil {
157- return nil , 0 , fmt .Errorf ("❌ failed to write stream chunk to client: %w" , err )
158- }
301+ // Restore newline stripped by scanner (needed for protocol compatibility)
302+ lineBz := scanner .Bytes ()
303+ line := make ([]byte , len (lineBz )+ 1 )
304+ copy (line , lineBz )
305+ line [len (lineBz )] = '\n'
159306
160- // Flush immediately for low-latency streaming
161- flusher .Flush ()
162- }
307+ // Add chunk to batch
308+ batch .chunks [batch .totalChunks ] = line
309+ batch .totalSize += int64 (len (line ))
310+ batch .totalChunks += 1
163311
164- // Check for scanner errors (network issues, buffer overflows, etc.)
165- if err := scanner .Err (); err != nil {
166- return nil , 0 , fmt .Errorf ("❌ stream scanning error: %w" , err )
167- }
312+ // Check if batch size threshold (100KB) exceeded
313+ if shouldFlushBatch (batch , false ) {
314+ logger .Debug ().Msg ("📦 Stream Buffer full: flushing." )
315+ resp , err := server .flushBatch (ctx , logger , batch , & meta , writer , flusher )
316+ if err != nil {
317+ return nil , responseSize , fmt .Errorf ("❌ failed to flush batch on size threshold: %w" , err )
318+ }
319+ if resp != nil {
320+ relayResponse = resp
321+ responseSize += float64 (resp .Size ())
322+ }
168323
169- // Return the relay response, response size, and nil error.
170- return relayResponse , responseSize , nil
324+ // Reset timer
325+ ticker .Reset (batchTimeThreshold )
326+ // Reset batch for next round
327+ batch .totalSize = 0
328+ batch .totalChunks = 0
329+ batch .startTime = time .Now ()
330+
331+ }
332+ }
333+ }
171334}
172335
173336// ScanEvents is a bufio.SplitFunc that splits streaming data by the POKT stream delimiter.
0 commit comments