Skip to content

Commit 4db0ec3

Browse files
committed
Add stream support for CLI relayminer relay cmd
1 parent fb8ad79 commit 4db0ec3

File tree

3 files changed

+429
-140
lines changed

3 files changed

+429
-140
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 199 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package cmd
33

44
import (
5+
"bufio"
56
"bytes"
67
"context"
78
"encoding/json"
@@ -12,6 +13,8 @@ import (
1213
"net/http"
1314
"net/url"
1415
"os"
16+
"strings"
17+
"time"
1518

1619
"github.com/cosmos/cosmos-sdk/client"
1720
cosmosflags "github.com/cosmos/cosmos-sdk/client/flags"
@@ -23,6 +26,7 @@ import (
2326
"github.com/pokt-network/poktroll/cmd/flags"
2427
"github.com/pokt-network/poktroll/pkg/polylog"
2528
"github.com/pokt-network/poktroll/pkg/polylog/polyzero"
29+
"github.com/pokt-network/poktroll/pkg/relayer/proxy"
2630
apptypes "github.com/pokt-network/poktroll/x/application/types"
2731
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
2832
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
@@ -177,7 +181,12 @@ func runRelay(cmd *cobra.Command, args []string) error {
177181
logger.Error().Err(err).Msg("❌ Error connecting to gRPC")
178182
return err
179183
}
180-
defer grpcConn.Close()
184+
defer func(grpcConn *grpc.ClientConn) {
185+
err := grpcConn.Close()
186+
if err != nil {
187+
logger.Error().Err(err).Msg("❌ Error closing gRPC connection")
188+
}
189+
}(grpcConn)
181190
logger.Info().Msgf("✅ gRPC connection initialized: %v", grpcConn)
182191

183192
// Create a connection to the POKT full node
@@ -356,124 +365,234 @@ func runRelay(cmd *cobra.Command, args []string) error {
356365
}
357366
logger.Info().Msgf("✅ Endpoint URL parsed: %v", reqUrl)
358367

368+
// Create http client
369+
backendClient := &http.Client{
370+
Timeout: 600 * time.Second,
371+
}
372+
373+
ctxWithTimeout, cancelFn := context.WithTimeout(context.Background(), 600*time.Second)
374+
defer cancelFn()
375+
359376
// Send multiple requests sequentially as specified by the count flag
360377
for i := 1; i <= flagRelayRequestCount; i++ {
361-
if flagRelayRequestCount > 1 {
362-
logger.Info().Msgf("📤 Sending request %d of %d", i, flagRelayRequestCount)
363-
}
364378

365379
// Create the HTTP request with the relay request body
366-
httpReq := &http.Request{
367-
Method: http.MethodPost,
368-
URL: reqUrl,
369-
Header: http.Header{
370-
"Content-Type": []string{"application/json"},
371-
},
372-
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
373-
}
374-
375-
// Send the HTTP request containing the signed relay request
376-
httpResp, err := http.DefaultClient.Do(httpReq)
380+
httpReq, err := http.NewRequestWithContext(
381+
ctxWithTimeout,
382+
http.MethodPost, // This is the method to the Relay Miner node
383+
reqUrl.String(),
384+
io.NopCloser(bytes.NewReader(relayReqBz)),
385+
)
377386
if err != nil {
378-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d", i)
387+
logger.Error().Err(err).Msg("❌ Error creating relay request")
379388
continue
380389
}
381390

382-
if httpResp.StatusCode != http.StatusOK {
383-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d due to response status code %d", i, httpResp.StatusCode)
384-
continue
385-
}
386-
387-
// Read the response
388-
respBz, err := io.ReadAll(httpResp.Body)
391+
// Send the request HTTP request containing the signed relay request
392+
httpResp, err := backendClient.Do(httpReq)
389393
if err != nil {
390-
logger.Error().Err(err).Msgf("❌ Error reading response %d", i)
394+
logger.Error().Err(err).Msg("❌ Error sending relay request")
391395
continue
392396
}
393397

394-
// This is intentionally not a defer because the loop could introduce memory leaks,
395-
// performance issues and bad connection management for high flagRelayRequestCount values
396-
httpResp.Body.Close()
397-
398398
// Ensure the supplier operator signature is present
399399
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
400400
if supplierSignerAddress == "" {
401401
logger.Error().Msg("❌ Supplier operator signature is missing")
402+
proxy.CloseBody(logger, httpResp.Body)
402403
continue
403404
}
404-
405405
// Ensure the supplier operator address matches the expected address
406406
if flagRelaySupplier == "" {
407-
if flagRelayRequestCount == 1 {
408-
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
409-
}
407+
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
410408
} else if supplierSignerAddress != flagRelaySupplier {
411409
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
410+
proxy.CloseBody(logger, httpResp.Body)
412411
continue
413412
}
414413

415-
// Validate the relay response
416-
relayResp, err := sdk.ValidateRelayResponse(
417-
ctx,
418-
sdk.SupplierAddress(supplierSignerAddress),
419-
respBz,
420-
&accountClient,
421-
)
422-
if err != nil {
423-
logger.Error().Err(err).Msgf("❌ Error validating response %d", i)
414+
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
415+
416+
// Handle response according to type
417+
if proxy.IsStreamingResponse(httpResp) {
418+
streamErr := processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
419+
proxy.CloseBody(logger, httpResp.Body)
420+
if streamErr != nil {
421+
logger.Error().Err(streamErr).Msg("❌ Stream errored")
422+
}
423+
} else {
424+
// Normal, non-streaming request
425+
reqErr := processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
426+
proxy.CloseBody(logger, httpResp.Body)
427+
if reqErr != nil {
428+
logger.Error().Err(reqErr).Msg("❌ Request errored")
429+
}
430+
}
431+
432+
// This is intentionally not a defer because the loop could introduce memory leaks,
433+
// performance issues and bad connection management for high flagRelayRequestCount values
434+
proxy.CloseBody(logger, httpResp.Body)
435+
}
436+
437+
return nil
438+
}
439+
440+
// Handles the Pocket Network stream response from a Relay Miner.
441+
//
442+
// This functions uses an scanner that chunks the incomming response using the
443+
// defined split function.
444+
// Then it checks if the chunk is correctly signed, and tries to unmarshal it
445+
// if the stream is of type SSE.
446+
func processStreamRequest(ctx context.Context,
447+
httpResp *http.Response,
448+
supplierSignerAddress string,
449+
accountClient sdk.AccountClient,
450+
logger polylog.Logger) error {
451+
logger.Info().Msgf("🌊 Handling streaming response with status:")
452+
453+
// Check if this is SSE (used below, if this is SSE we will unmarshal)
454+
isSSE := strings.Contains(strings.ToLower(httpResp.Header.Get("Content-Type")), "text/event-stream")
455+
if isSSE {
456+
logger.Info().Msgf("🔍 Detected SSE stream, we will try to unmarshal.")
457+
}
458+
459+
// Start handling the body chunks
460+
scanner := bufio.NewScanner(httpResp.Body)
461+
// Assign the custom stream splitter
462+
scanner.Split(proxy.ScanEvents)
463+
// Scan
464+
for scanner.Scan() {
465+
// Get chunck
466+
line := scanner.Bytes()
467+
if len(line) == 0 {
424468
continue
425469
}
426-
// Deserialize the relay response
427-
backendHttpResponse, err := sdktypes.DeserializeHTTPResponse(relayResp.Payload)
470+
logger.Info().Msgf("📦 Read chunk of length %d", len(line))
471+
// Check and retrieve backend chunk
472+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, line, accountClient, logger)
428473
if err != nil {
429-
logger.Error().Err(err).Msgf("❌ Error deserializing response payload %d", i)
430-
continue
474+
return err
431475
}
432476

433-
// Unmarshal the HTTP response body into jsonMap
434-
var jsonMap map[string]interface{}
435-
if err := json.Unmarshal(backendHttpResponse.BodyBz, &jsonMap); err != nil {
436-
logger.Error().Err(err).Msgf("❌ Error unmarshaling response into a JSON map %d", i)
477+
// get string body
478+
stringBody := string(backendHttpResponse.BodyBz)
479+
480+
if !isSSE {
481+
// Just print content and continue
482+
logger.Info().Msgf("Chunk String Content: %s", stringBody)
437483
continue
438484
}
439485

440-
// Log response details
441-
if flagRelayRequestCount > 1 {
442-
logger.Info().Msgf("✅ Request %d: Status code %d, Response size %d bytes", i, backendHttpResponse.StatusCode, len(respBz))
486+
// This is SSE, unmarshal
487+
trimmedPrefix := strings.TrimPrefix(stringBody, "data: ")
488+
stringJson := strings.TrimSuffix(trimmedPrefix, "\n")
489+
if len(stringJson) == 0 {
490+
// this was probably a delimiter
491+
continue
492+
} else if stringJson == "[DONE]" {
493+
// SSE end
494+
logger.Info().Msgf("✅ SSE Done")
443495
} else {
444-
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
445-
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
446-
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
447-
}
448-
449-
// If "jsonrpc" key exists, try to further deserialize "result".
450-
// Only do this once for the first request.
451-
if flagRelayRequestCount == 1 || i == 1 {
452-
if _, ok := jsonMap["jsonrpc"]; ok {
453-
resultRaw, exists := jsonMap["result"]
454-
if exists {
455-
switch v := resultRaw.(type) {
456-
case map[string]interface{}:
457-
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
458-
case []interface{}:
459-
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
460-
case string:
461-
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
462-
case float64, bool, nil:
463-
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
464-
default:
465-
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
466-
}
467-
}
468-
if flagRelayRequestCount > 1 {
469-
logger.Debug().Msg("⚠️ Will be skipping JSON-RPC deserialization for subsequent requests")
470-
}
496+
// Umarshal
497+
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
498+
if err != nil {
499+
logger.Info().Msgf("Received: %s | Stripped: %s", stringBody, stringJson)
500+
return err
471501
}
472502
}
473503
}
504+
return nil
505+
}
506+
507+
func processNormalRequest(ctx context.Context,
508+
httpResp *http.Response,
509+
supplierSignerAddress string,
510+
accountClient sdk.AccountClient,
511+
logger polylog.Logger) error {
512+
// Read the response
513+
respBz, err := io.ReadAll(httpResp.Body)
514+
if err != nil {
515+
logger.Error().Err(err).Msg("❌ Error reading response")
516+
return err
517+
}
518+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
519+
520+
// Check signature and get backend response
521+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, respBz, accountClient, logger)
522+
if err != nil {
523+
return err
524+
}
525+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
474526

527+
// Log response details
475528
if flagRelayRequestCount > 1 {
476-
logger.Info().Msgf("✅ Successfully sent %d relay requests", flagRelayRequestCount)
529+
logger.Info().Msgf("✅ Status code %d, Response size %d bytes", backendHttpResponse.StatusCode, len(respBz))
530+
} else {
531+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
532+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
533+
}
534+
535+
err = unmarshalAndPrintResponse(backendHttpResponse.BodyBz, logger)
536+
if err != nil {
537+
return err
538+
}
539+
540+
return nil
541+
}
542+
543+
func checkAndGetBackendResponse(ctx context.Context,
544+
supplierSignerAddress string,
545+
respBz []byte,
546+
accountClient sdk.AccountClient,
547+
logger polylog.Logger) (backendHttpResponse *sdktypes.POKTHTTPResponse, err error) {
548+
// Validate the relay response
549+
relayResp, err := sdk.ValidateRelayResponse(
550+
ctx,
551+
sdk.SupplierAddress(supplierSignerAddress),
552+
respBz,
553+
&accountClient,
554+
)
555+
if err != nil {
556+
logger.Error().Err(err).Msg("❌ Error validating response")
557+
return
558+
}
559+
// Deserialize the relay response
560+
backendHttpResponse, err = sdktypes.DeserializeHTTPResponse(relayResp.Payload)
561+
if err != nil {
562+
logger.Error().Err(err).Msg("❌ Error deserializing response payload")
563+
return
564+
}
565+
566+
return
567+
}
568+
569+
func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
570+
var jsonMap map[string]interface{}
571+
// Unmarshal the HTTP response body into jsonMap
572+
if err := json.Unmarshal(BodyBz, &jsonMap); err != nil {
573+
logger.Error().Err(err).Msg("❌ Error deserializing backend response payload")
574+
return err
575+
}
576+
577+
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
578+
579+
// If "jsonrpc" key exists, try to further deserialize "result"
580+
if _, ok := jsonMap["jsonrpc"]; ok {
581+
resultRaw, exists := jsonMap["result"]
582+
if exists {
583+
switch v := resultRaw.(type) {
584+
case map[string]interface{}:
585+
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
586+
case []interface{}:
587+
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
588+
case string:
589+
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
590+
case float64, bool, nil:
591+
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
592+
default:
593+
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
594+
}
595+
}
477596
}
478597

479598
return nil

0 commit comments

Comments
 (0)