Skip to content

Commit 6c49427

Browse files
committed
added batched stream to reduce the sig&verify latency
1 parent f16d968 commit 6c49427

File tree

2 files changed

+250
-74
lines changed

2 files changed

+250
-74
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -700,22 +700,35 @@ func processStreamRequest(ctx context.Context,
700700
continue
701701
}
702702

703-
// This is SSE, unmarshal
704-
trimmedPrefix := strings.TrimPrefix(stringBody, "data: ")
705-
stringJson := strings.TrimSuffix(trimmedPrefix, "\n")
706-
if len(stringJson) == 0 {
707-
// this was probably a delimiter
708-
continue
709-
} else if stringJson == "[DONE]" {
710-
// SSE end
711-
logger.Info().Msgf("✅ SSE Done")
712-
} else {
713-
// Umarshal
714-
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
715-
if err != nil {
716-
logger.Info().Msgf("Received: %s | Stripped: %s", stringBody, stringJson)
717-
return err
703+
// This is batched SSE, split and unmarshal
704+
705+
// Split on the "data: " prefix to separate the individual SSE events in the batch
706+
linesBatched := strings.Split(stringBody, "data: ")
707+
for _, batchLine := range linesBatched {
708+
// Skip empty lines
709+
if len(strings.TrimSpace(batchLine)) == 0 {
710+
continue
718711
}
712+
713+
// Process single SSE line
714+
// trimmedPrefix := strings.TrimPrefix(batchLine, "data: ")
715+
stringJson := strings.TrimRight(batchLine, "\n")
716+
if len(stringJson) == 0 {
717+
// this was probably a delimiter
718+
continue
719+
} else if stringJson == "[DONE]" {
720+
// SSE end
721+
logger.Info().Msgf("✅ SSE Done")
722+
} else {
723+
// Unmarshal
724+
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
725+
if err != nil {
726+
logger.Info().Msgf("Received: %s | Stripped: %s", batchLine, stringJson)
727+
logger.Info().Str("stringBody", stringBody).Msg("Raw data")
728+
return err
729+
}
730+
}
731+
719732
}
720733
}
721734
return nil

pkg/relayer/proxy/http_stream.go

Lines changed: 222 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"slices"
1010
"strings"
11+
"time"
1112

1213
sdktypes "github.com/pokt-network/shannon-sdk/types"
1314
"google.golang.org/protobuf/proto"
@@ -20,12 +21,30 @@ import (
2021
// A custom delimiter used to separate chunks in a streaming response.
2122
const streamDelimiter = "||POKT_STREAM||"
2223

24+
// Batch streaming configuration
25+
const (
26+
// batchTimeThreshold is the maximum time to wait before flushing a batch (100ms)
27+
batchTimeThreshold = 100 * time.Millisecond
28+
// batchSizeThreshold is the maximum payload size before flushing a batch (100KB)
29+
batchSizeThreshold = 100 * 1024 // 100KB in bytes
30+
// batchChunksThreshold is the maximum number of chunks received before flushing a batch (100)
31+
batchChunksThreshold = 100
32+
)
33+
2334
// Target streaming types
2435
var httpStreamingTypes = []string{
2536
"text/event-stream",
2637
"application/x-ndjson",
2738
}
2839

40+
// chunkBatch accumulates multiple chunks for batch signing and writing
41+
type chunkBatch struct {
42+
chunks [][]byte // individual chunk bodies
43+
totalSize int64 // cumulative size of all chunks
44+
totalChunks int64 // number of chunks
45+
startTime time.Time // when the batch was created
46+
}
47+
2948
// isStreamingResponse determines if an HTTP response should be handled as a stream.
3049
//
3150
// Checks the "Content-Type" HTTP header against supported streaming types:
@@ -54,25 +73,119 @@ func isStreamingResponse(response *http.Response) bool {
5473
return slices.Contains(httpStreamingTypes, strings.ToLower(mediaType))
5574
}
5675

57-
// handleHttpStream processes streaming HTTP responses from backend services.
76+
// shouldFlushBatch determines if the current batch should be flushed based on
77+
// time, size, and chunk-count thresholds:
78+
// - Time threshold: 100ms has elapsed since batch creation
79+
// - Size threshold: batch size >= 100KB
80+
// - Chunk threshold: chunks number >= 100
81+
// - Force flag: when true, the thresholds are ignored and the batch is flushed if it holds at least one chunk
82+
func shouldFlushBatch(batch *chunkBatch, forceFlush bool) bool {
83+
if forceFlush {
84+
return forceFlush && batch.totalChunks > 0
85+
}
86+
87+
// Check time threshold
88+
elapsed := time.Since(batch.startTime)
89+
if elapsed >= batchTimeThreshold {
90+
return true
91+
}
92+
93+
// Check size threshold
94+
if batch.totalSize >= batchSizeThreshold {
95+
return true
96+
}
97+
98+
// Check chunk number
99+
if batch.totalChunks >= batchChunksThreshold {
100+
return true
101+
}
102+
103+
return false
104+
}
105+
106+
// flushBatch signs and writes the accumulated batch to the client
107+
func (server *relayMinerHTTPServer) flushBatch(
108+
ctx context.Context,
109+
logger polylog.Logger,
110+
batch *chunkBatch,
111+
meta *types.RelayRequestMetadata,
112+
writer http.ResponseWriter,
113+
flusher http.Flusher,
114+
) (*types.RelayResponse, error) {
115+
if batch.totalChunks == 0 {
116+
return nil, nil
117+
}
118+
119+
// Combine all chunks into a single payload
120+
combinedPayload := make([]byte, 0, batch.totalSize)
121+
for i := int64(0); i < batch.totalChunks; i++ {
122+
combinedPayload = append(combinedPayload, batch.chunks[i]...)
123+
}
124+
125+
// Wrap combined chunks in POKT HTTP response structure
126+
poktHTTPResponse := &sdktypes.POKTHTTPResponse{
127+
StatusCode: uint32(http.StatusOK),
128+
Header: make(map[string]*sdktypes.Header, 0),
129+
BodyBz: combinedPayload,
130+
}
131+
132+
// Marshal with deterministic ordering for signature consistency
133+
marshalOpts := proto.MarshalOptions{Deterministic: true}
134+
poktHTTPResponseBz, err := marshalOpts.Marshal(poktHTTPResponse)
135+
if err != nil {
136+
return nil, fmt.Errorf("❌ failed to marshal POKT HTTP response batch: %w", err)
137+
}
138+
139+
// Sign the batch once
140+
relayResponse, err := server.newRelayResponse(poktHTTPResponseBz, meta.SessionHeader, meta.SupplierOperatorAddress)
141+
if err != nil {
142+
return nil, fmt.Errorf("❌ failed to sign relay response batch: %w", err)
143+
}
144+
145+
// Serialize signed response
146+
signedBatch, err := relayResponse.Marshal()
147+
if err != nil {
148+
return nil, fmt.Errorf("❌ failed to marshal signed relay response batch: %w", err)
149+
}
150+
151+
// Append POKT stream delimiter (allows client-side batch detection)
152+
signedBatch = append(signedBatch, []byte(streamDelimiter)...)
153+
154+
// Write signed batch to client
155+
if _, err = writer.Write(signedBatch); err != nil {
156+
return nil, fmt.Errorf("❌ failed to write stream batch to client: %w", err)
157+
}
158+
159+
// Flush to ensure data reaches client with low latency
160+
flusher.Flush()
161+
162+
return relayResponse, nil
163+
}
164+
165+
// handleHttpStream processes streaming HTTP responses from backend services with batching.
58166
//
59-
// Streaming flow:
60-
// 1. Read each newline-delimited chunk from backend response
61-
// 2. Wrap chunk in POKT HTTP response structure (status code, headers, body)
62-
// 3. Sign each chunk individually using supplier's key
63-
// 4. Write signed chunk with delimiter to client
64-
// 5. Flush immediately to ensure low-latency streaming
167+
// Streaming flow with batching:
168+
// 1. Accumulate newline-delimited chunks in a batch buffer
169+
// 2. Flush batch when any of the following holds:
170+
// - 100ms has elapsed since batch creation AND at least one chunk exists
171+
// - Total accumulated payload reaches 100KB, or 100 chunks have been buffered
172+
// 3. When flushing:
173+
// - Combine all buffered chunks into single POKTHTTPResponse
174+
// - Sign batch once (not per-chunk)
175+
// - Write signed batch with delimiter to client
176+
// - Flush to ensure low-latency delivery
177+
// 4. Final batch automatically flushes when stream ends
65178
//
66-
// This enables real-time streaming for SSE and NDJSON responses while maintaining
67-
// POKT's signature verification requirements.
179+
// This batching strategy reduces signing overhead and improves throughput while
180+
// maintaining low-latency streaming (batches are flushed about every 100ms while chunks keep arriving) for SSE and NDJSON responses.
68181
//
69182
// TODO_IMPROVE: Consider adding configurable buffer size for scanner to handle
70183
// large streaming chunks (default is 64KB).
71184
// Some LLM responses may exceed this.
72185
//
73186
// Returns:
74-
// - Final relay response (contains last chunk's signature)
75-
// - Total response size across all chunks (for metrics)
187+
// - Final relay response (contains last batch's signature)
188+
// - Total response size across all batches (for metrics)
76189
// - Error if streaming fails (network errors, signature failures, etc.)
77190
func (server *relayMinerHTTPServer) handleHttpStream(
78191
ctx context.Context,
@@ -106,68 +219,118 @@ func (server *relayMinerHTTPServer) handleHttpStream(
106219
// Create scanner with default newline delimiter
107220
scanner := bufio.NewScanner(response.Body)
108221

109-
// Initialize the return values
110-
relayResponse := &types.RelayResponse{
111-
Meta: types.RelayResponseMetadata{SessionHeader: meta.SessionHeader},
222+
// Initialize batch buffer
223+
batch := &chunkBatch{
224+
chunks: make([][]byte, batchChunksThreshold),
225+
totalSize: 0,
226+
totalChunks: 0,
227+
startTime: time.Now(),
112228
}
229+
230+
// Initialize the return values
231+
var relayResponse *types.RelayResponse
113232
responseSize := float64(0)
114233

115-
// Process each chunk from backend stream
116-
for scanner.Scan() {
117-
// TODO_TECHDEBT: Need to periodically check for context cancellation to prevent signature race conditions
234+
// Create a ticker for time-based flushing
235+
ticker := time.NewTicker(batchTimeThreshold)
236+
defer ticker.Stop()
118237

119-
// Restore newline stripped by scanner (needed for protocol compatibility)
120-
line := scanner.Bytes()
121-
line = append(line, '\n')
238+
// Process chunks from backend stream with time-based and size-based batching
239+
for {
240+
select {
241+
case <-ctx.Done():
242+
logger.Debug().Msg("📦 Stream Context canceled: flushing.")
243+
// Context cancelled - flush any remaining batch before exiting
244+
if batch.totalChunks > 0 {
245+
resp, err := server.flushBatch(ctx, logger, batch, &meta, writer, flusher)
246+
if err != nil {
247+
logger.Error().Err(err).Msg("❌ failed to flush final batch on context cancellation")
248+
} else if resp != nil {
249+
relayResponse = resp
250+
responseSize += float64(resp.Size())
251+
}
252+
}
253+
return nil, responseSize, ctx.Err()
122254

123-
// Wrap chunk in POKT HTTP response structure
124-
poktHTTPResponse := &sdktypes.POKTHTTPResponse{
125-
StatusCode: uint32(http.StatusOK),
126-
Header: make(map[string]*sdktypes.Header, 0),
127-
BodyBz: line,
128-
}
255+
case <-ticker.C:
256+
logger.Debug().Msg("📦 Stream Ticker Time up: Streaming content.")
257+
// Time threshold (100ms) reached - flush if batch has chunks
258+
if shouldFlushBatch(batch, false) {
259+
resp, err := server.flushBatch(ctx, logger, batch, &meta, writer, flusher)
260+
if err != nil {
261+
return nil, responseSize, fmt.Errorf("❌ failed to flush batch on time threshold: %w", err)
262+
}
263+
if resp != nil {
264+
relayResponse = resp
265+
responseSize += float64(resp.Size())
266+
}
129267

130-
// Marshal with deterministic ordering for signature consistency
131-
marshalOpts := proto.MarshalOptions{Deterministic: true}
132-
poktHTTPResponseBz, err := marshalOpts.Marshal(poktHTTPResponse)
133-
if err != nil {
134-
return nil, 0, fmt.Errorf("❌ failed to marshal POKT HTTP response: %w", err)
135-
}
268+
// Reset batch for next round
269+
batch.totalSize = 0
270+
batch.totalChunks = 0
271+
batch.startTime = time.Now()
272+
}
136273

137-
// Sign this chunk
138-
relayResponse, err = server.newRelayResponse(poktHTTPResponseBz, meta.SessionHeader, meta.SupplierOperatorAddress)
139-
if err != nil {
140-
return nil, 0, fmt.Errorf("❌ failed to sign relay response chunk: %w", err)
141-
}
274+
default:
275+
// Read next chunk from scanner. NOTE: scanner.Scan() blocks until a full line arrives, so the ctx/ticker cases above are only re-checked between lines
276+
if !scanner.Scan() {
277+
// Stream ended - flush final batch if it has chunks
278+
logger.Debug().Msg("📦 Stream ended: flusshing last.")
279+
if batch.totalChunks > 0 {
280+
resp, err := server.flushBatch(ctx, logger, batch, &meta, writer, flusher)
281+
if err != nil {
282+
return nil, responseSize, fmt.Errorf("❌ failed to flush final batch: %w", err)
283+
}
284+
if resp != nil {
285+
relayResponse = resp
286+
responseSize += float64(resp.Size())
287+
}
288+
}
142289

143-
// Serialize signed response
144-
signedLine, err := relayResponse.Marshal()
145-
if err != nil {
146-
return nil, 0, fmt.Errorf("❌ failed to marshal signed relay response: %w", err)
147-
}
290+
// Check for scanner errors (network issues, buffer overflows, etc.)
291+
if err := scanner.Err(); err != nil {
292+
return nil, responseSize, fmt.Errorf("❌ stream scanning error: %w", err)
293+
}
148294

149-
// Track cumulative size across all chunks
150-
responseSize += float64(relayResponse.Size())
295+
// Stream ended successfully
296+
return relayResponse, responseSize, nil
297+
}
151298

152-
// Append POKT stream delimiter (allows client-side chunk detection)
153-
signedLine = append(signedLine, []byte(streamDelimiter)...)
299+
logger.Debug().Msg("📦 Stream Received: Adding to buffer.")
154300

155-
// Write signed chunk to client
156-
if _, err = writer.Write(signedLine); err != nil {
157-
return nil, 0, fmt.Errorf("❌ failed to write stream chunk to client: %w", err)
158-
}
301+
// Restore newline stripped by scanner (needed for protocol compatibility)
302+
lineBz := scanner.Bytes()
303+
line := make([]byte, len(lineBz)+1)
304+
copy(line, lineBz)
305+
line[len(lineBz)] = '\n'
159306

160-
// Flush immediately for low-latency streaming
161-
flusher.Flush()
162-
}
307+
// Add chunk to batch
308+
batch.chunks[batch.totalChunks] = line
309+
batch.totalSize += int64(len(line))
310+
batch.totalChunks += 1
163311

164-
// Check for scanner errors (network issues, buffer overflows, etc.)
165-
if err := scanner.Err(); err != nil {
166-
return nil, 0, fmt.Errorf("❌ stream scanning error: %w", err)
167-
}
312+
// Check whether any flush threshold (elapsed time, 100KB size, or chunk count) has been reached
313+
if shouldFlushBatch(batch, false) {
314+
logger.Debug().Msg("📦 Stream Buffer full: flushing.")
315+
resp, err := server.flushBatch(ctx, logger, batch, &meta, writer, flusher)
316+
if err != nil {
317+
return nil, responseSize, fmt.Errorf("❌ failed to flush batch on size threshold: %w", err)
318+
}
319+
if resp != nil {
320+
relayResponse = resp
321+
responseSize += float64(resp.Size())
322+
}
168323

169-
// Return the relay response, response size, and nil error.
170-
return relayResponse, responseSize, nil
324+
// Reset timer
325+
ticker.Reset(batchTimeThreshold)
326+
// Reset batch for next round
327+
batch.totalSize = 0
328+
batch.totalChunks = 0
329+
batch.startTime = time.Now()
330+
331+
}
332+
}
333+
}
171334
}
172335

173336
// ScanEvents is a bufio.SplitFunc that splits streaming data by the POKT stream delimiter.

0 commit comments

Comments
 (0)