Skip to content

Commit cee9aa7

Browse files
committed
Add stream support for CLI relayminer relay cmd
1 parent 5c08935 commit cee9aa7

File tree

3 files changed

+429
-140
lines changed

3 files changed

+429
-140
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 199 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package cmd
33

44
import (
5+
"bufio"
56
"bytes"
67
"context"
78
"encoding/json"
@@ -12,6 +13,8 @@ import (
1213
"net/http"
1314
"net/url"
1415
"os"
16+
"strings"
17+
"time"
1518

1619
"github.com/cosmos/cosmos-sdk/client"
1720
cosmosflags "github.com/cosmos/cosmos-sdk/client/flags"
@@ -22,6 +25,7 @@ import (
2225

2326
"github.com/pokt-network/poktroll/pkg/polylog"
2427
"github.com/pokt-network/poktroll/pkg/polylog/polyzero"
28+
"github.com/pokt-network/poktroll/pkg/relayer/proxy"
2529
apptypes "github.com/pokt-network/poktroll/x/application/types"
2630
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
2731
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
@@ -160,7 +164,12 @@ func runRelay(cmd *cobra.Command, args []string) error {
160164
logger.Error().Err(err).Msg("❌ Error connecting to gRPC")
161165
return err
162166
}
163-
defer grpcConn.Close()
167+
defer func(grpcConn *grpc.ClientConn) {
168+
err := grpcConn.Close()
169+
if err != nil {
170+
logger.Error().Err(err).Msg("❌ Error closing gRPC connection")
171+
}
172+
}(grpcConn)
164173
logger.Info().Msgf("✅ gRPC connection initialized: %v", grpcConn)
165174

166175
// Create a connection to the POKT full node
@@ -339,124 +348,234 @@ func runRelay(cmd *cobra.Command, args []string) error {
339348
}
340349
logger.Info().Msgf("✅ Endpoint URL parsed: %v", reqUrl)
341350

351+
// Create http client
352+
backendClient := &http.Client{
353+
Timeout: 600 * time.Second,
354+
}
355+
356+
ctxWithTimeout, cancelFn := context.WithTimeout(context.Background(), 600*time.Second)
357+
defer cancelFn()
358+
342359
// Send multiple requests sequentially as specified by the count flag
343360
for i := 1; i <= flagRelayRequestCount; i++ {
344-
if flagRelayRequestCount > 1 {
345-
logger.Info().Msgf("📤 Sending request %d of %d", i, flagRelayRequestCount)
346-
}
347361

348362
// Create the HTTP request with the relay request body
349-
httpReq := &http.Request{
350-
Method: http.MethodPost,
351-
URL: reqUrl,
352-
Header: http.Header{
353-
"Content-Type": []string{"application/json"},
354-
},
355-
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
356-
}
357-
358-
// Send the HTTP request containing the signed relay request
359-
httpResp, err := http.DefaultClient.Do(httpReq)
363+
httpReq, err := http.NewRequestWithContext(
364+
ctxWithTimeout,
365+
http.MethodPost, // This is the method to the Relay Miner node
366+
reqUrl.String(),
367+
io.NopCloser(bytes.NewReader(relayReqBz)),
368+
)
360369
if err != nil {
361-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d", i)
370+
logger.Error().Err(err).Msg("❌ Error creating relay request")
362371
continue
363372
}
364373

365-
if httpResp.StatusCode != http.StatusOK {
366-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d due to response status code %d", i, httpResp.StatusCode)
367-
continue
368-
}
369-
370-
// Read the response
371-
respBz, err := io.ReadAll(httpResp.Body)
374+
// Send the request HTTP request containing the signed relay request
375+
httpResp, err := backendClient.Do(httpReq)
372376
if err != nil {
373-
logger.Error().Err(err).Msgf("❌ Error reading response %d", i)
377+
logger.Error().Err(err).Msg("❌ Error sending relay request")
374378
continue
375379
}
376380

377-
// This is intentionally not a defer because the loop could introduce memory leaks,
378-
// performance issues and bad connection management for high flagRelayRequestCount values
379-
httpResp.Body.Close()
380-
381381
// Ensure the supplier operator signature is present
382382
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
383383
if supplierSignerAddress == "" {
384384
logger.Error().Msg("❌ Supplier operator signature is missing")
385+
proxy.CloseBody(logger, httpResp.Body)
385386
continue
386387
}
387-
388388
// Ensure the supplier operator address matches the expected address
389389
if flagRelaySupplier == "" {
390-
if flagRelayRequestCount == 1 {
391-
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
392-
}
390+
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
393391
} else if supplierSignerAddress != flagRelaySupplier {
394392
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
393+
proxy.CloseBody(logger, httpResp.Body)
395394
continue
396395
}
397396

398-
// Validate the relay response
399-
relayResp, err := sdk.ValidateRelayResponse(
400-
ctx,
401-
sdk.SupplierAddress(supplierSignerAddress),
402-
respBz,
403-
&accountClient,
404-
)
405-
if err != nil {
406-
logger.Error().Err(err).Msgf("❌ Error validating response %d", i)
397+
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
398+
399+
// Handle response according to type
400+
if proxy.IsStreamingResponse(httpResp) {
401+
streamErr := processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
402+
proxy.CloseBody(logger, httpResp.Body)
403+
if streamErr != nil {
404+
logger.Error().Err(streamErr).Msg("❌ Stream errored")
405+
}
406+
} else {
407+
// Normal, non-streaming request
408+
reqErr := processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
409+
proxy.CloseBody(logger, httpResp.Body)
410+
if reqErr != nil {
411+
logger.Error().Err(reqErr).Msg("❌ Request errored")
412+
}
413+
}
414+
415+
// This is intentionally not a defer because the loop could introduce memory leaks,
416+
// performance issues and bad connection management for high flagRelayRequestCount values
417+
proxy.CloseBody(logger, httpResp.Body)
418+
}
419+
420+
return nil
421+
}
422+
423+
// Handles the Pocket Network stream response from a Relay Miner.
424+
//
425+
// This functions uses an scanner that chunks the incomming response using the
426+
// defined split function.
427+
// Then it checks if the chunk is correctly signed, and tries to unmarshal it
428+
// if the stream is of type SSE.
429+
func processStreamRequest(ctx context.Context,
430+
httpResp *http.Response,
431+
supplierSignerAddress string,
432+
accountClient sdk.AccountClient,
433+
logger polylog.Logger) error {
434+
logger.Info().Msgf("🌊 Handling streaming response with status:")
435+
436+
// Check if this is SSE (used below, if this is SSE we will unmarshal)
437+
isSSE := strings.Contains(strings.ToLower(httpResp.Header.Get("Content-Type")), "text/event-stream")
438+
if isSSE {
439+
logger.Info().Msgf("🔍 Detected SSE stream, we will try to unmarshal.")
440+
}
441+
442+
// Start handling the body chunks
443+
scanner := bufio.NewScanner(httpResp.Body)
444+
// Assign the custom stream splitter
445+
scanner.Split(proxy.ScanEvents)
446+
// Scan
447+
for scanner.Scan() {
448+
// Get chunck
449+
line := scanner.Bytes()
450+
if len(line) == 0 {
407451
continue
408452
}
409-
// Deserialize the relay response
410-
backendHttpResponse, err := sdktypes.DeserializeHTTPResponse(relayResp.Payload)
453+
logger.Info().Msgf("📦 Read chunk of length %d", len(line))
454+
// Check and retrieve backend chunk
455+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, line, accountClient, logger)
411456
if err != nil {
412-
logger.Error().Err(err).Msgf("❌ Error deserializing response payload %d", i)
413-
continue
457+
return err
414458
}
415459

416-
// Unmarshal the HTTP response body into jsonMap
417-
var jsonMap map[string]interface{}
418-
if err := json.Unmarshal(backendHttpResponse.BodyBz, &jsonMap); err != nil {
419-
logger.Error().Err(err).Msgf("❌ Error unmarshaling response into a JSON map %d", i)
460+
// get string body
461+
stringBody := string(backendHttpResponse.BodyBz)
462+
463+
if !isSSE {
464+
// Just print content and continue
465+
logger.Info().Msgf("Chunk String Content: %s", stringBody)
420466
continue
421467
}
422468

423-
// Log response details
424-
if flagRelayRequestCount > 1 {
425-
logger.Info().Msgf("✅ Request %d: Status code %d, Response size %d bytes", i, backendHttpResponse.StatusCode, len(respBz))
469+
// This is SSE, unmarshal
470+
trimmedPrefix := strings.TrimPrefix(stringBody, "data: ")
471+
stringJson := strings.TrimSuffix(trimmedPrefix, "\n")
472+
if len(stringJson) == 0 {
473+
// this was probably a delimiter
474+
continue
475+
} else if stringJson == "[DONE]" {
476+
// SSE end
477+
logger.Info().Msgf("✅ SSE Done")
426478
} else {
427-
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
428-
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
429-
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
430-
}
431-
432-
// If "jsonrpc" key exists, try to further deserialize "result".
433-
// Only do this once for the first request.
434-
if flagRelayRequestCount == 1 || i == 1 {
435-
if _, ok := jsonMap["jsonrpc"]; ok {
436-
resultRaw, exists := jsonMap["result"]
437-
if exists {
438-
switch v := resultRaw.(type) {
439-
case map[string]interface{}:
440-
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
441-
case []interface{}:
442-
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
443-
case string:
444-
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
445-
case float64, bool, nil:
446-
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
447-
default:
448-
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
449-
}
450-
}
451-
if flagRelayRequestCount > 1 {
452-
logger.Debug().Msg("⚠️ Will be skipping JSON-RPC deserialization for subsequent requests")
453-
}
479+
// Umarshal
480+
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
481+
if err != nil {
482+
logger.Info().Msgf("Received: %s | Stripped: %s", stringBody, stringJson)
483+
return err
454484
}
455485
}
456486
}
487+
return nil
488+
}
489+
490+
func processNormalRequest(ctx context.Context,
491+
httpResp *http.Response,
492+
supplierSignerAddress string,
493+
accountClient sdk.AccountClient,
494+
logger polylog.Logger) error {
495+
// Read the response
496+
respBz, err := io.ReadAll(httpResp.Body)
497+
if err != nil {
498+
logger.Error().Err(err).Msg("❌ Error reading response")
499+
return err
500+
}
501+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
457502

503+
// Check signature and get backend response
504+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, respBz, accountClient, logger)
505+
if err != nil {
506+
return err
507+
}
508+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
509+
510+
// Log response details
458511
if flagRelayRequestCount > 1 {
459-
logger.Info().Msgf("✅ Successfully sent %d relay requests", flagRelayRequestCount)
512+
logger.Info().Msgf("✅ Status code %d, Response size %d bytes", backendHttpResponse.StatusCode, len(respBz))
513+
} else {
514+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
515+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
516+
}
517+
518+
err = unmarshalAndPrintResponse(backendHttpResponse.BodyBz, logger)
519+
if err != nil {
520+
return err
521+
}
522+
523+
return nil
524+
}
525+
526+
func checkAndGetBackendResponse(ctx context.Context,
527+
supplierSignerAddress string,
528+
respBz []byte,
529+
accountClient sdk.AccountClient,
530+
logger polylog.Logger) (backendHttpResponse *sdktypes.POKTHTTPResponse, err error) {
531+
// Validate the relay response
532+
relayResp, err := sdk.ValidateRelayResponse(
533+
ctx,
534+
sdk.SupplierAddress(supplierSignerAddress),
535+
respBz,
536+
&accountClient,
537+
)
538+
if err != nil {
539+
logger.Error().Err(err).Msg("❌ Error validating response")
540+
return
541+
}
542+
// Deserialize the relay response
543+
backendHttpResponse, err = sdktypes.DeserializeHTTPResponse(relayResp.Payload)
544+
if err != nil {
545+
logger.Error().Err(err).Msg("❌ Error deserializing response payload")
546+
return
547+
}
548+
549+
return
550+
}
551+
552+
func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
553+
var jsonMap map[string]interface{}
554+
// Unmarshal the HTTP response body into jsonMap
555+
if err := json.Unmarshal(BodyBz, &jsonMap); err != nil {
556+
logger.Error().Err(err).Msg("❌ Error deserializing backend response payload")
557+
return err
558+
}
559+
560+
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
561+
562+
// If "jsonrpc" key exists, try to further deserialize "result"
563+
if _, ok := jsonMap["jsonrpc"]; ok {
564+
resultRaw, exists := jsonMap["result"]
565+
if exists {
566+
switch v := resultRaw.(type) {
567+
case map[string]interface{}:
568+
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
569+
case []interface{}:
570+
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
571+
case string:
572+
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
573+
case float64, bool, nil:
574+
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
575+
default:
576+
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
577+
}
578+
}
460579
}
461580

462581
return nil

0 commit comments

Comments
 (0)