Skip to content

Commit f61ad03

Browse files
committed
Add stream support for CLI relayminer relay cmd
1 parent 993412b commit f61ad03

File tree

3 files changed

+233
-65
lines changed

3 files changed

+233
-65
lines changed

pkg/network/http/http_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ func NewDefaultHTTPClientWithDebugMetrics() *HTTPClientWithDebugMetrics {
102102
IdleConnTimeout: 90 * time.Second, // Reduced from 300s - shorter idle to free resources
103103

104104
// Timeout settings optimized for quick failure detection
105-
TLSHandshakeTimeout: 5 * time.Second, // Fast TLS timeout since handshakes typically complete in ~100ms
106-
ResponseHeaderTimeout: 5 * time.Second, // Header timeout to allow for server processing time
105+
TLSHandshakeTimeout: 5 * time.Second, // Fast TLS timeout since handshakes typically complete in ~100ms
106+
// ResponseHeaderTimeout: 5 * time.Second, // Header timeout to allow for server processing time. TODO: This affect custom, long, timeouts. This needs to be modified when larger timeouts are needed.
107107

108108
// Performance optimizations
109109
DisableKeepAlives: false, // Enable connection reuse to reduce connection overhead

pkg/relayer/proxy/http_stream.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package proxy
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"net/http"
7+
"strings"
8+
9+
"github.com/pokt-network/poktroll/pkg/polylog"
10+
"github.com/pokt-network/poktroll/x/service/types"
11+
sdktypes "github.com/pokt-network/shannon-sdk/types"
12+
"google.golang.org/protobuf/proto"
13+
)
14+
15+
// Target streaming types
16+
var httpStreamingTypes = []string{
17+
"text/event-stream",
18+
"application/x-ndjson",
19+
}
20+
21+
const streamDelimitter = "||POKT_STREAM||"
22+
23+
// Custom split function for POKT events.
24+
// The POKT streams contains a signature and the body of the request to the
25+
// backend, so we need a custom delimiter for streaming responses.
26+
// This functions is a custom delimiter for the Stream functionality
27+
func ScanEvents(data []byte, atEOF bool) (advance int, token []byte, err error) {
28+
if atEOF && len(data) == 0 {
29+
return 0, nil, nil
30+
}
31+
32+
// Look for the POKT_STREAM delimiter
33+
if i := strings.Index(string(data), streamDelimitter); i >= 0 {
34+
// Return chunk without the delimiter
35+
return i + len(streamDelimitter), data[0:i], nil
36+
}
37+
38+
// If we're at EOF, return whatever we have
39+
if atEOF {
40+
return len(data), data, nil
41+
}
42+
43+
// Request more data
44+
return 0, nil, nil
45+
}
46+
47+
// This function looks for the supported streams.
48+
// We should be able to handle any cunked stream, but we have limited this
49+
// to the content types specified in the variable `httpStreamingTypes`
50+
//
51+
// Returns:
52+
// - boolean whether the backend response should be streamed or not
53+
func IsStreamingResponse(response *http.Response) bool {
54+
// Check if this is a streaming response
55+
contentType := strings.ToLower(response.Header.Get("Content-Type"))
56+
for _, streamType := range httpStreamingTypes {
57+
if strings.Contains(contentType, streamType) {
58+
return true
59+
}
60+
}
61+
return false
62+
}
63+
64+
// Handles the streaming backend responses.
65+
// This function takes a streaming body and:
66+
// - Reads each chunk
67+
// - Converts the chunk into a POKT HTTP response structure
68+
// - Signs each chunk
69+
// - Writes and flush to the requesting app
70+
//
71+
// This will handle any stream where the delimiter is a newline (`\n`)
72+
func (server *relayMinerHTTPServer) HandleHttpStream(
73+
response *http.Response,
74+
writer http.ResponseWriter,
75+
meta types.RelayRequestMetadata,
76+
logger polylog.Logger,
77+
) (relayResponse *types.RelayResponse, responseSize float64, err error) {
78+
// Set headers
79+
writer.Header().Set("Connection", "close")
80+
// Copy the response back to the original request
81+
for k, v := range response.Header {
82+
writer.Header()[k] = v
83+
}
84+
writer.WriteHeader(response.StatusCode)
85+
86+
// Instance the flusher
87+
flusher, ok := writer.(http.Flusher)
88+
if !ok {
89+
logger.Error().Msg("Streaming not supported.")
90+
return nil, 0, fmt.Errorf("❌ failed to open stream request")
91+
}
92+
93+
// Create scanner with default delimiter (\n)
94+
scanner := bufio.NewScanner(response.Body)
95+
// Scan for chunks
96+
for scanner.Scan() {
97+
line := scanner.Bytes()
98+
// Add back the newline that we stripped at Scan
99+
line = append(line, '\n')
100+
101+
// Create the POKT HTTP response structure
102+
poktHTTPResponse := &sdktypes.POKTHTTPResponse{
103+
StatusCode: uint32(http.StatusOK),
104+
Header: make(map[string]*sdktypes.Header, 0),
105+
BodyBz: line,
106+
}
107+
// Deterministic marshaling of response
108+
marshalOpts := proto.MarshalOptions{Deterministic: true}
109+
poktHTTPResponseBz, err := marshalOpts.Marshal(poktHTTPResponse)
110+
if err != nil {
111+
return nil, 0, fmt.Errorf("❌ failed to marshal POKT HTTP response: %w", err)
112+
}
113+
114+
// Sign response
115+
relayResponse, err = server.newRelayResponse(poktHTTPResponseBz, meta.SessionHeader, meta.SupplierOperatorAddress)
116+
if err != nil {
117+
return nil, 0, err
118+
}
119+
// Marshal response
120+
signedLine, err := relayResponse.Marshal()
121+
if err != nil {
122+
return nil, 0, err
123+
}
124+
125+
// track size, the sum of all chunks
126+
responseSize += float64(relayResponse.Size())
127+
128+
// Append custom delimiter (used by app to detect POKT streaming)
129+
signedLine = append(signedLine, []byte(streamDelimitter)...)
130+
131+
// Write to client
132+
_, err = writer.Write(signedLine)
133+
if err != nil {
134+
return nil, 0, err
135+
}
136+
137+
// Flush to ensure the stream goes asap to the app
138+
flusher.Flush()
139+
}
140+
141+
return
142+
}

pkg/relayer/proxy/sync.go

Lines changed: 89 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -328,82 +328,108 @@ func (server *relayMinerHTTPServer) serveSyncRequest(
328328
// Capture the service call request duration metric.
329329
relayer.CaptureServiceDuration(serviceId, serviceCallStartTime, httpResponse.StatusCode)
330330

331-
// Serialize the service response to be sent back to the client.
332-
// This will include the status code, headers, and body.
333-
wrappedHTTPResponse, responseBz, err := SerializeHTTPResponse(logger, httpResponse, server.serverConfig.MaxBodySize)
334-
if err != nil {
335-
logger.Error().Err(err).Msg("❌ Failed serializing the service response")
336-
return relayRequest, err
337-
}
338-
// Early close backend response body to free up pool resources.
339-
CloseBody(logger, httpResponse.Body)
340-
341-
// Pass through all backend responses including errors.
342-
// Allows clients to see real HTTP status codes from backend service.
343-
// Log non-2XX status codes for monitoring but don't block response.
344-
if httpResponse.StatusCode >= http.StatusMultipleChoices {
345-
logger.Error().
346-
Int("status_code", httpResponse.StatusCode).
347-
Str("request_url", httpRequestWithUpdatedTimeout.URL.String()).
348-
Str("request_payload_first_bytes", polylog.Preview(string(relayRequest.Payload))).
349-
Str("response_payload_first_bytes", polylog.Preview(string(wrappedHTTPResponse.BodyBz))).
350-
Msg("backend service returned a non-2XX status code. Passing it through to the client.")
331+
// Check if the response is a stream
332+
streamThis := IsStreamingResponse(httpResponse)
333+
334+
// Create empty relay response
335+
relayResponse := &types.RelayResponse{
336+
Meta: types.RelayResponseMetadata{SessionHeader: meta.SessionHeader},
337+
Payload: nil,
351338
}
352339

353-
logger.Debug().
354-
Str("relay_request_session_header", meta.SessionHeader.String()).
355-
Msg("building relay response protobuf from service response")
340+
var responseSize float64
341+
if streamThis {
342+
logger.Debug().Msg("Handling streaming request.")
356343

357-
// Check context cancellation before building relay response to prevent signature race conditions
358-
if ctxErr := ctxWithDeadline.Err(); ctxErr != nil {
359-
logger.Warn().Err(ctxErr).Msg("⚠️ Context canceled before building relay response - preventing signature race condition")
360-
return relayRequest, ErrRelayerProxyTimeout.Wrapf(
361-
"request context canceled during response building: %v",
362-
ctxErr,
363-
)
364-
}
344+
// Process and assign the relay response
345+
relayResponse, responseSize, err = server.HandleHttpStream(httpResponse, writer, meta, logger)
346+
if err != nil {
347+
return relayRequest, err
348+
}
365349

366-
// Build the relay response using the original service's response.
367-
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it
368-
// was verified to be valid and has to be the same as the relayResponse session header.
369-
relayResponse, err := server.newRelayResponse(responseBz, meta.SessionHeader, supplierOperatorAddress)
370-
if err != nil {
371-
logger.Error().Err(err).Msg("❌ Failed building the relay response")
372-
// The client should not have knowledge about the RelayMiner's issues with
373-
// building the relay response. Reply with an internal error so that the
374-
// original error is not exposed to the client.
375-
return relayRequest, ErrRelayerProxyInternalError.Wrap(err.Error())
376-
}
350+
} else {
351+
logger.Debug().Msg("Handling normal request.")
377352

378-
relay := &types.Relay{Req: relayRequest, Res: relayResponse}
353+
// Serialize the service response to be sent back to the client.
354+
// This will include the status code, headers, and body.
355+
wrappedHTTPResponse, responseBz, err := SerializeHTTPResponse(logger, httpResponse, server.serverConfig.MaxBodySize)
356+
if err != nil {
357+
logger.Error().Err(err).Msg("❌ Failed serializing the service response")
358+
return relayRequest, err
359+
}
379360

380-
// Capture the time after response time for the relay.
381-
responsePreparationEnd := time.Now()
382-
// Add response preparation duration to the logger such that any log before errors will have
383-
// as much request duration information as possible.
384-
logger = logger.With(
385-
"response_preparation_duration",
386-
time.Since(backendServiceProcessingEnd).String(),
387-
)
388-
relayer.CaptureResponsePreparationDuration(serviceId, backendServiceProcessingEnd)
361+
// Pass through all backend responses including errors.
362+
// Allows clients to see real HTTP status codes from backend service.
363+
// Log non-2XX status codes for monitoring but don't block response.
364+
if httpResponse.StatusCode >= http.StatusMultipleChoices {
365+
logger.Error().
366+
Int("status_code", httpResponse.StatusCode).
367+
Str("request_url", httpRequest.URL.String()).
368+
Str("request_payload_first_bytes", polylog.Preview(string(relayRequest.Payload))).
369+
Str("response_payload_first_bytes", polylog.Preview(string(wrappedHTTPResponse.BodyBz))).
370+
Msg("backend service returned a non-2XX status code. Passing it through to the client.")
371+
}
372+
373+
logger.Debug().
374+
Str("relay_request_session_header", meta.SessionHeader.String()).
375+
Msg("building relay response protobuf from service response")
376+
377+
// Check context cancellation before building relay response to prevent signature race conditions
378+
if ctxErr := ctxWithDeadline.Err(); ctxErr != nil {
379+
logger.Warn().Err(ctxErr).Msg("⚠️ Context canceled before building relay response - preventing signature race condition")
380+
return relayRequest, ErrRelayerProxyTimeout.Wrapf(
381+
"request context canceled during response building: %v",
382+
ctxErr,
383+
)
384+
}
385+
386+
// Build the relay response using the original service's response.
387+
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it
388+
// was verified to be valid and has to be the same as the relayResponse session header.
389+
relayResponse, err = server.newRelayResponse(responseBz, meta.SessionHeader, meta.SupplierOperatorAddress)
390+
if err != nil {
391+
logger.Error().Err(err).Msg("❌ Failed building the relay response")
392+
// The client should not have knowledge about the RelayMiner's issues with
393+
// building the relay response. Reply with an internal error so that the
394+
// original error is not exposed to the client.
395+
return relayRequest, ErrRelayerProxyInternalError.Wrap(err.Error())
396+
}
397+
398+
// Capture the time after response time for the relay.
399+
responsePreparationEnd := time.Now()
400+
// Add response preparation duration to the logger such that any log before errors will have
401+
// as much request duration information as possible.
402+
logger = logger.With(
403+
"response_preparation_duration",
404+
time.Since(backendServiceProcessingEnd).String(),
405+
)
406+
relayer.CaptureResponsePreparationDuration(serviceId, backendServiceProcessingEnd)
407+
408+
// Send the relay response to the client.
409+
err = server.sendRelayResponse(relayResponse, writer)
410+
logger = logger.With("send_response_duration", time.Since(responsePreparationEnd).String())
411+
if err != nil {
412+
// If the originHost cannot be parsed, reply with an internal error so that
413+
// the original error is not exposed to the client.
414+
clientError := ErrRelayerProxyInternalError.Wrap(err.Error())
415+
// Log current time to highlight writer i/o timeout errors.
416+
logger.Warn().Err(err).Time("current_time", time.Now()).Msg("❌ Failed sending relay response")
417+
return relayRequest, clientError
418+
}
419+
420+
// Set response size
421+
responseSize = float64(relayResponse.Size())
389422

390-
// Send the relay response to the client.
391-
err = server.sendRelayResponse(relay.Res, writer)
392-
logger = logger.With("send_response_duration", time.Since(responsePreparationEnd).String())
393-
if err != nil {
394-
// If the originHost cannot be parsed, reply with an internal error so that
395-
// the original error is not exposed to the client.
396-
clientError := ErrRelayerProxyInternalError.Wrap(err.Error())
397-
// Log current time to highlight writer i/o timeout errors.
398-
logger.Warn().Err(err).Time("current_time", time.Now()).Msg("❌ Failed sending relay response")
399-
return relayRequest, clientError
400423
}
401424

425+
// Create the relay response
426+
relay := &types.Relay{Req: relayRequest, Res: relayResponse}
427+
402428
logger.ProbabilisticDebugInfo(polylog.ProbabilisticDebugInfoProb).Msg("relay request served successfully")
403429

404430
relayer.RelaysSuccessTotal.With("service_id", serviceId).Add(1)
405431

406-
relayer.RelayResponseSizeBytes.With("service_id", serviceId).Observe(float64(relay.Res.Size()))
432+
relayer.RelayResponseSizeBytes.With("service_id", serviceId).Observe(responseSize)
407433

408434
// Verify relay reward eligibility a SECOND time AFTER completing backend request.
409435
//

0 commit comments

Comments
 (0)