Skip to content

Commit 4096fd7

Browse files
committed
Add stream support for CLI relayminer relay cmd
1 parent a631ef5 commit 4096fd7

File tree

3 files changed

+429
-140
lines changed

3 files changed

+429
-140
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 199 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package cmd
33

44
import (
5+
"bufio"
56
"bytes"
67
"context"
78
"encoding/json"
@@ -12,6 +13,8 @@ import (
1213
"net/http"
1314
"net/url"
1415
"os"
16+
"strings"
17+
"time"
1518

1619
"github.com/cosmos/cosmos-sdk/client"
1720
cosmosflags "github.com/cosmos/cosmos-sdk/client/flags"
@@ -23,6 +26,7 @@ import (
2326
"github.com/pokt-network/poktroll/cmd/flags"
2427
"github.com/pokt-network/poktroll/pkg/polylog"
2528
"github.com/pokt-network/poktroll/pkg/polylog/polyzero"
29+
"github.com/pokt-network/poktroll/pkg/relayer/proxy"
2630
apptypes "github.com/pokt-network/poktroll/x/application/types"
2731
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
2832
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
@@ -178,7 +182,12 @@ func runRelay(cmd *cobra.Command, args []string) error {
178182
logger.Error().Err(err).Msg("❌ Error connecting to gRPC")
179183
return err
180184
}
181-
defer grpcConn.Close()
185+
defer func(grpcConn *grpc.ClientConn) {
186+
err := grpcConn.Close()
187+
if err != nil {
188+
logger.Error().Err(err).Msg("❌ Error closing gRPC connection")
189+
}
190+
}(grpcConn)
182191
logger.Info().Msgf("✅ gRPC connection initialized: %v", grpcConn)
183192

184193
// Create a connection to the POKT full node
@@ -357,124 +366,234 @@ func runRelay(cmd *cobra.Command, args []string) error {
357366
}
358367
logger.Info().Msgf("✅ Endpoint URL parsed: %v", reqUrl)
359368

369+
// Create http client
370+
backendClient := &http.Client{
371+
Timeout: 600 * time.Second,
372+
}
373+
374+
ctxWithTimeout, cancelFn := context.WithTimeout(context.Background(), 600*time.Second)
375+
defer cancelFn()
376+
360377
// Send multiple requests sequentially as specified by the count flag
361378
for i := 1; i <= flagRelayRequestCount; i++ {
362-
if flagRelayRequestCount > 1 {
363-
logger.Info().Msgf("📤 Sending request %d of %d", i, flagRelayRequestCount)
364-
}
365379

366380
// Create the HTTP request with the relay request body
367-
httpReq := &http.Request{
368-
Method: http.MethodPost,
369-
URL: reqUrl,
370-
Header: http.Header{
371-
"Content-Type": []string{"application/json"},
372-
},
373-
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
374-
}
375-
376-
// Send the HTTP request containing the signed relay request
377-
httpResp, err := http.DefaultClient.Do(httpReq)
381+
httpReq, err := http.NewRequestWithContext(
382+
ctxWithTimeout,
383+
http.MethodPost, // This is the method to the Relay Miner node
384+
reqUrl.String(),
385+
io.NopCloser(bytes.NewReader(relayReqBz)),
386+
)
378387
if err != nil {
379-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d", i)
388+
logger.Error().Err(err).Msg("❌ Error creating relay request")
380389
continue
381390
}
382391

383-
if httpResp.StatusCode != http.StatusOK {
384-
logger.Error().Err(err).Msgf("❌ Error sending relay request %d due to response status code %d", i, httpResp.StatusCode)
385-
continue
386-
}
387-
388-
// Read the response
389-
respBz, err := io.ReadAll(httpResp.Body)
392+
// Send the request HTTP request containing the signed relay request
393+
httpResp, err := backendClient.Do(httpReq)
390394
if err != nil {
391-
logger.Error().Err(err).Msgf("❌ Error reading response %d", i)
395+
logger.Error().Err(err).Msg("❌ Error sending relay request")
392396
continue
393397
}
394398

395-
// This is intentionally not a defer because the loop could introduce memory leaks,
396-
// performance issues and bad connection management for high flagRelayRequestCount values
397-
httpResp.Body.Close()
398-
399399
// Ensure the supplier operator signature is present
400400
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
401401
if supplierSignerAddress == "" {
402402
logger.Error().Msg("❌ Supplier operator signature is missing")
403+
proxy.CloseBody(logger, httpResp.Body)
403404
continue
404405
}
405-
406406
// Ensure the supplier operator address matches the expected address
407407
if flagRelaySupplier == "" {
408-
if flagRelayRequestCount == 1 {
409-
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
410-
}
408+
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
411409
} else if supplierSignerAddress != flagRelaySupplier {
412410
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
411+
proxy.CloseBody(logger, httpResp.Body)
413412
continue
414413
}
415414

416-
// Validate the relay response
417-
relayResp, err := sdk.ValidateRelayResponse(
418-
ctx,
419-
sdk.SupplierAddress(supplierSignerAddress),
420-
respBz,
421-
&accountClient,
422-
)
423-
if err != nil {
424-
logger.Error().Err(err).Msgf("❌ Error validating response %d", i)
415+
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
416+
417+
// Handle response according to type
418+
if proxy.IsStreamingResponse(httpResp) {
419+
streamErr := processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
420+
proxy.CloseBody(logger, httpResp.Body)
421+
if streamErr != nil {
422+
logger.Error().Err(streamErr).Msg("❌ Stream errored")
423+
}
424+
} else {
425+
// Normal, non-streaming request
426+
reqErr := processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
427+
proxy.CloseBody(logger, httpResp.Body)
428+
if reqErr != nil {
429+
logger.Error().Err(reqErr).Msg("❌ Request errored")
430+
}
431+
}
432+
433+
// This is intentionally not a defer because the loop could introduce memory leaks,
434+
// performance issues and bad connection management for high flagRelayRequestCount values
435+
proxy.CloseBody(logger, httpResp.Body)
436+
}
437+
438+
return nil
439+
}
440+
441+
// Handles the Pocket Network stream response from a Relay Miner.
442+
//
443+
// This functions uses an scanner that chunks the incomming response using the
444+
// defined split function.
445+
// Then it checks if the chunk is correctly signed, and tries to unmarshal it
446+
// if the stream is of type SSE.
447+
func processStreamRequest(ctx context.Context,
448+
httpResp *http.Response,
449+
supplierSignerAddress string,
450+
accountClient sdk.AccountClient,
451+
logger polylog.Logger) error {
452+
logger.Info().Msgf("🌊 Handling streaming response with status:")
453+
454+
// Check if this is SSE (used below, if this is SSE we will unmarshal)
455+
isSSE := strings.Contains(strings.ToLower(httpResp.Header.Get("Content-Type")), "text/event-stream")
456+
if isSSE {
457+
logger.Info().Msgf("🔍 Detected SSE stream, we will try to unmarshal.")
458+
}
459+
460+
// Start handling the body chunks
461+
scanner := bufio.NewScanner(httpResp.Body)
462+
// Assign the custom stream splitter
463+
scanner.Split(proxy.ScanEvents)
464+
// Scan
465+
for scanner.Scan() {
466+
// Get chunck
467+
line := scanner.Bytes()
468+
if len(line) == 0 {
425469
continue
426470
}
427-
// Deserialize the relay response
428-
backendHttpResponse, err := sdktypes.DeserializeHTTPResponse(relayResp.Payload)
471+
logger.Info().Msgf("📦 Read chunk of length %d", len(line))
472+
// Check and retrieve backend chunk
473+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, line, accountClient, logger)
429474
if err != nil {
430-
logger.Error().Err(err).Msgf("❌ Error deserializing response payload %d", i)
431-
continue
475+
return err
432476
}
433477

434-
// Unmarshal the HTTP response body into jsonMap
435-
var jsonMap map[string]interface{}
436-
if err := json.Unmarshal(backendHttpResponse.BodyBz, &jsonMap); err != nil {
437-
logger.Error().Err(err).Msgf("❌ Error unmarshaling response into a JSON map %d", i)
478+
// get string body
479+
stringBody := string(backendHttpResponse.BodyBz)
480+
481+
if !isSSE {
482+
// Just print content and continue
483+
logger.Info().Msgf("Chunk String Content: %s", stringBody)
438484
continue
439485
}
440486

441-
// Log response details
442-
if flagRelayRequestCount > 1 {
443-
logger.Info().Msgf("✅ Request %d: Status code %d, Response size %d bytes", i, backendHttpResponse.StatusCode, len(respBz))
487+
// This is SSE, unmarshal
488+
trimmedPrefix := strings.TrimPrefix(stringBody, "data: ")
489+
stringJson := strings.TrimSuffix(trimmedPrefix, "\n")
490+
if len(stringJson) == 0 {
491+
// this was probably a delimiter
492+
continue
493+
} else if stringJson == "[DONE]" {
494+
// SSE end
495+
logger.Info().Msgf("✅ SSE Done")
444496
} else {
445-
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
446-
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
447-
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
448-
}
449-
450-
// If "jsonrpc" key exists, try to further deserialize "result".
451-
// Only do this once for the first request.
452-
if flagRelayRequestCount == 1 || i == 1 {
453-
if _, ok := jsonMap["jsonrpc"]; ok {
454-
resultRaw, exists := jsonMap["result"]
455-
if exists {
456-
switch v := resultRaw.(type) {
457-
case map[string]interface{}:
458-
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
459-
case []interface{}:
460-
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
461-
case string:
462-
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
463-
case float64, bool, nil:
464-
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
465-
default:
466-
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
467-
}
468-
}
469-
if flagRelayRequestCount > 1 {
470-
logger.Debug().Msg("⚠️ Will be skipping JSON-RPC deserialization for subsequent requests")
471-
}
497+
// Umarshal
498+
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
499+
if err != nil {
500+
logger.Info().Msgf("Received: %s | Stripped: %s", stringBody, stringJson)
501+
return err
472502
}
473503
}
474504
}
505+
return nil
506+
}
507+
508+
func processNormalRequest(ctx context.Context,
509+
httpResp *http.Response,
510+
supplierSignerAddress string,
511+
accountClient sdk.AccountClient,
512+
logger polylog.Logger) error {
513+
// Read the response
514+
respBz, err := io.ReadAll(httpResp.Body)
515+
if err != nil {
516+
logger.Error().Err(err).Msg("❌ Error reading response")
517+
return err
518+
}
519+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
520+
521+
// Check signature and get backend response
522+
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, respBz, accountClient, logger)
523+
if err != nil {
524+
return err
525+
}
526+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
475527

528+
// Log response details
476529
if flagRelayRequestCount > 1 {
477-
logger.Info().Msgf("✅ Successfully sent %d relay requests", flagRelayRequestCount)
530+
logger.Info().Msgf("✅ Status code %d, Response size %d bytes", backendHttpResponse.StatusCode, len(respBz))
531+
} else {
532+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
533+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
534+
}
535+
536+
err = unmarshalAndPrintResponse(backendHttpResponse.BodyBz, logger)
537+
if err != nil {
538+
return err
539+
}
540+
541+
return nil
542+
}
543+
544+
func checkAndGetBackendResponse(ctx context.Context,
545+
supplierSignerAddress string,
546+
respBz []byte,
547+
accountClient sdk.AccountClient,
548+
logger polylog.Logger) (backendHttpResponse *sdktypes.POKTHTTPResponse, err error) {
549+
// Validate the relay response
550+
relayResp, err := sdk.ValidateRelayResponse(
551+
ctx,
552+
sdk.SupplierAddress(supplierSignerAddress),
553+
respBz,
554+
&accountClient,
555+
)
556+
if err != nil {
557+
logger.Error().Err(err).Msg("❌ Error validating response")
558+
return
559+
}
560+
// Deserialize the relay response
561+
backendHttpResponse, err = sdktypes.DeserializeHTTPResponse(relayResp.Payload)
562+
if err != nil {
563+
logger.Error().Err(err).Msg("❌ Error deserializing response payload")
564+
return
565+
}
566+
567+
return
568+
}
569+
570+
func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
571+
var jsonMap map[string]interface{}
572+
// Unmarshal the HTTP response body into jsonMap
573+
if err := json.Unmarshal(BodyBz, &jsonMap); err != nil {
574+
logger.Error().Err(err).Msg("❌ Error deserializing backend response payload")
575+
return err
576+
}
577+
578+
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
579+
580+
// If "jsonrpc" key exists, try to further deserialize "result"
581+
if _, ok := jsonMap["jsonrpc"]; ok {
582+
resultRaw, exists := jsonMap["result"]
583+
if exists {
584+
switch v := resultRaw.(type) {
585+
case map[string]interface{}:
586+
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
587+
case []interface{}:
588+
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
589+
case string:
590+
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
591+
case float64, bool, nil:
592+
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
593+
default:
594+
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
595+
}
596+
}
478597
}
479598

480599
return nil

0 commit comments

Comments
 (0)