Skip to content

Commit 0836e61

Browse files
committed
fixed wromg mege | dropped cmd_relay suport
1 parent a34b558 commit 0836e61

File tree

2 files changed

+84
-241
lines changed

2 files changed

+84
-241
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 83 additions & 209 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package cmd
33

44
import (
5-
"bufio"
65
"bytes"
76
"context"
87
"encoding/json"
@@ -14,8 +13,6 @@ import (
1413
"net/url"
1514
"os"
1615
"time"
17-
"strings"
18-
"time"
1916

2017
"github.com/cosmos/cosmos-sdk/client"
2118
cosmosflags "github.com/cosmos/cosmos-sdk/client/flags"
@@ -27,7 +24,6 @@ import (
2724
"github.com/pokt-network/poktroll/cmd/flags"
2825
"github.com/pokt-network/poktroll/pkg/polylog"
2926
"github.com/pokt-network/poktroll/pkg/polylog/polyzero"
30-
"github.com/pokt-network/poktroll/pkg/relayer/proxy"
3127
apptypes "github.com/pokt-network/poktroll/x/application/types"
3228
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
3329
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
@@ -182,12 +178,7 @@ func runRelay(cmd *cobra.Command, args []string) error {
182178
logger.Error().Err(err).Msg("❌ Error connecting to gRPC")
183179
return err
184180
}
185-
defer func(grpcConn *grpc.ClientConn) {
186-
err = grpcConn.Close()
187-
if err != nil {
188-
logger.Error().Err(err).Msg("❌ Error closing gRPC connection")
189-
}
190-
}(grpcConn)
181+
defer grpcConn.Close()
191182
logger.Info().Msgf("✅ gRPC connection initialized: %v", grpcConn)
192183

193184
// Create a connection to the POKT full node
@@ -390,34 +381,20 @@ func runRelay(cmd *cobra.Command, args []string) error {
390381

391382
beforeRequestSendingTime := time.Now()
392383

393-
// Parse the endpoint URL
394-
reqUrl, err := url.Parse(endpointUrl)
395-
if err != nil {
396-
logger.Error().Err(err).Msg("❌ Error parsing endpoint URL")
397-
return err
398-
}
399-
logger.Info().Msgf("✅ Endpoint URL parsed: %v", reqUrl)
400-
401-
// Create http client
402-
backendClient := &http.Client{
403-
Timeout: 600 * time.Second,
404-
}
405-
406-
ctxWithTimeout, cancelFn := context.WithTimeout(context.Background(), 600*time.Second)
407-
defer cancelFn()
408-
409-
// Send multiple requests sequentially as specified by the count flag
410-
for i := 1; i <= flagRelayRequestCount; i++ {
411-
412384
// Create the HTTP request with the relay request body
413-
httpReq, err := http.NewRequestWithContext(
414-
ctxWithTimeout,
415-
http.MethodPost, // This is the method to the Relay Miner node
416-
reqUrl.String(),
417-
io.NopCloser(bytes.NewReader(relayReqBz)),
418-
)
385+
httpReq := &http.Request{
386+
Method: http.MethodPost,
387+
URL: reqUrl,
388+
Header: http.Header{
389+
"Content-Type": []string{"application/json"},
390+
},
391+
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
392+
}
393+
394+
// Send the HTTP request containing the signed relay request
395+
httpResp, err := http.DefaultClient.Do(httpReq)
419396
if err != nil {
420-
logger.Error().Err(err).Msg("❌ Error creating relay request")
397+
logger.Error().Err(err).Msgf("❌ Error sending relay request %d", i)
421398
continue
422399
}
423400

@@ -434,213 +411,110 @@ func runRelay(cmd *cobra.Command, args []string) error {
434411
// Read the response
435412
respBz, err := io.ReadAll(httpResp.Body)
436413
if err != nil {
437-
logger.Error().Err(err).Msg("❌ Error sending relay request")
414+
logger.Error().Err(err).Msgf("❌ Error reading response %d", i)
438415
continue
439416
}
440417

418+
// This is intentionally not a defer because the loop could introduce memory leaks,
419+
// performance issues and bad connection management for high flagRelayRequestCount values
420+
httpResp.Body.Close()
421+
441422
// Ensure the supplier operator signature is present
442423
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
443424
if supplierSignerAddress == "" {
444425
logger.Error().Msg("❌ Supplier operator signature is missing")
445-
proxy.CloseBody(logger, httpResp.Body)
446426
continue
447427
}
428+
448429
// Ensure the supplier operator address matches the expected address
449430
if flagRelaySupplier == "" {
450-
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
431+
if flagRelayRequestCount == 1 {
432+
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
433+
}
451434
} else if supplierSignerAddress != flagRelaySupplier {
452435
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
453-
proxy.CloseBody(logger, httpResp.Body)
454436
continue
455437
}
456438

457-
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
458-
459-
// Handle response according to type
460-
if proxy.IsStreamingResponse(httpResp) {
461-
streamErr := processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
462-
proxy.CloseBody(logger, httpResp.Body)
463-
if streamErr != nil {
464-
logger.Error().Err(streamErr).Msg("❌ Stream errored")
465-
}
466-
} else {
467-
// Normal, non-streaming request
468-
reqErr := processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
469-
proxy.CloseBody(logger, httpResp.Body)
470-
if reqErr != nil {
471-
logger.Error().Err(reqErr).Msg("❌ Request errored")
472-
}
473-
}
474-
475-
// This is intentionally not a defer because the loop could introduce memory leaks,
476-
// performance issues and bad connection management for high flagRelayRequestCount values
477-
proxy.CloseBody(logger, httpResp.Body)
478-
}
479-
480-
return nil
481-
}
439+
responseReadDuration := time.Since(beforeResponseReadTime)
440+
logger.Info().Msgf("⏱️ Response building duration: %s", responseReadDuration)
482441

483-
// Handles the Pocket Network stream response from a Relay Miner.
484-
//
485-
// This functions uses an scanner that chunks the incomming response using the
486-
// defined split function.
487-
// Then it checks if the chunk is correctly signed, and tries to unmarshal it
488-
// if the stream is of type SSE.
489-
func processStreamRequest(ctx context.Context,
490-
httpResp *http.Response,
491-
supplierSignerAddress string,
492-
accountClient sdk.AccountClient,
493-
logger polylog.Logger) error {
494-
logger.Info().Msgf("🌊 Handling streaming response with status:")
495-
496-
// Check if this is SSE (used below, if this is SSE we will unmarshal)
497-
isSSE := strings.Contains(strings.ToLower(httpResp.Header.Get("Content-Type")), "text/event-stream")
498-
if isSSE {
499-
logger.Info().Msgf("🔍 Detected SSE stream, we will try to unmarshal.")
500-
}
442+
beforeResponseVerificationTime := time.Now()
501443

502-
// Start handling the body chunks
503-
scanner := bufio.NewScanner(httpResp.Body)
504-
// Assign the custom stream splitter
505-
scanner.Split(proxy.ScanEvents)
506-
// Scan
507-
for scanner.Scan() {
508-
// Get chunck
509-
line := scanner.Bytes()
510-
if len(line) == 0 {
511-
continue
512-
}
513-
logger.Info().Msgf("📦 Read chunk of length %d", len(line))
514-
// Check and retrieve backend chunk
515-
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, line, accountClient, logger)
444+
// Validate the relay response
445+
relayResp, err := sdk.ValidateRelayResponse(
446+
ctx,
447+
sdk.SupplierAddress(supplierSignerAddress),
448+
respBz,
449+
&accountClient,
450+
)
516451
if err != nil {
517-
return err
452+
logger.Error().Err(err).Msgf("❌ Error validating response %d", i)
453+
continue
518454
}
519455

520-
// get string body
521-
stringBody := string(backendHttpResponse.BodyBz)
456+
responseVerificationDuration := time.Since(beforeResponseVerificationTime)
457+
logger.Info().Msgf("⏱️ Response verification duration: %s", responseVerificationDuration)
458+
459+
beforeBackendResponseExtractionTime := time.Now()
522460

523-
if !isSSE {
524-
// Just print content and continue
525-
logger.Info().Msgf("Chunk String Content: %s", stringBody)
461+
// Deserialize the relay response
462+
backendHttpResponse, err := sdktypes.DeserializeHTTPResponse(relayResp.Payload)
463+
if err != nil {
464+
logger.Error().Err(err).Msgf("❌ Error deserializing response payload %d", i)
526465
continue
527466
}
528467

529-
responseReadDuration := time.Since(beforeResponseReadTime)
530-
logger.Info().Msgf("⏱️ Response building duration: %s", responseReadDuration)
468+
backendResponseExtractionDuration := time.Since(beforeBackendResponseExtractionTime)
469+
logger.Info().Msgf("⏱️ Backend response extraction duration: %s", backendResponseExtractionDuration)
531470

532-
beforeResponseVerificationTime := time.Now()
471+
totalRequestDuration := time.Since(beforeRequestPreparationTime)
472+
logger.Info().Msgf("⏱️ Total request duration: %s", totalRequestDuration)
533473

534-
// This is SSE, unmarshal
535-
trimmedPrefix := strings.TrimPrefix(stringBody, "data: ")
536-
stringJson := strings.TrimSuffix(trimmedPrefix, "\n")
537-
if len(stringJson) == 0 {
538-
// this was probably a delimiter
474+
// Unmarshal the HTTP response body into jsonMap
475+
var jsonMap map[string]interface{}
476+
if err := json.Unmarshal(backendHttpResponse.BodyBz, &jsonMap); err != nil {
477+
logger.Error().Err(err).Msgf("❌ Error unmarshaling response into a JSON map %d", i)
539478
continue
540-
} else if stringJson == "[DONE]" {
541-
// SSE end
542-
logger.Info().Msgf("✅ SSE Done")
543-
} else {
544-
// Umarshal
545-
err = unmarshalAndPrintResponse([]byte(stringJson), logger)
546-
if err != nil {
547-
logger.Info().Msgf("Received: %s | Stripped: %s", stringBody, stringJson)
548-
return err
549-
}
550479
}
551-
}
552-
return nil
553-
}
554480

555-
func processNormalRequest(ctx context.Context,
556-
httpResp *http.Response,
557-
supplierSignerAddress string,
558-
accountClient sdk.AccountClient,
559-
logger polylog.Logger) error {
560-
// Read the response
561-
respBz, err := io.ReadAll(httpResp.Body)
562-
if err != nil {
563-
logger.Error().Err(err).Msg("❌ Error reading response")
564-
return err
565-
}
566-
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
481+
// Log response details
482+
if flagRelayRequestCount > 1 {
483+
logger.Info().Msgf("✅ Request %d: Status code %d, Response size %d bytes", i, backendHttpResponse.StatusCode, len(respBz))
484+
} else {
485+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
486+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
487+
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
488+
}
567489

568-
// Check signature and get backend response
569-
backendHttpResponse, err := checkAndGetBackendResponse(ctx, supplierSignerAddress, respBz, accountClient, logger)
570-
if err != nil {
571-
return err
490+
// If "jsonrpc" key exists, try to further deserialize "result".
491+
// Only do this once for the first request.
492+
if flagRelayRequestCount == 1 || i == 1 {
493+
if _, ok := jsonMap["jsonrpc"]; ok {
494+
resultRaw, exists := jsonMap["result"]
495+
if exists {
496+
switch v := resultRaw.(type) {
497+
case map[string]interface{}:
498+
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
499+
case []interface{}:
500+
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
501+
case string:
502+
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
503+
case float64, bool, nil:
504+
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
505+
default:
506+
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
507+
}
508+
}
509+
if flagRelayRequestCount > 1 {
510+
logger.Debug().Msg("⚠️ Will be skipping JSON-RPC deserialization for subsequent requests")
511+
}
512+
}
513+
}
572514
}
573-
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
574515

575-
// Log response details
576516
if flagRelayRequestCount > 1 {
577-
logger.Info().Msgf("✅ Status code %d, Response size %d bytes", backendHttpResponse.StatusCode, len(respBz))
578-
} else {
579-
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
580-
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
581-
}
582-
583-
err = unmarshalAndPrintResponse(backendHttpResponse.BodyBz, logger)
584-
if err != nil {
585-
return err
586-
}
587-
588-
return nil
589-
}
590-
591-
func checkAndGetBackendResponse(ctx context.Context,
592-
supplierSignerAddress string,
593-
respBz []byte,
594-
accountClient sdk.AccountClient,
595-
logger polylog.Logger) (backendHttpResponse *sdktypes.POKTHTTPResponse, err error) {
596-
// Validate the relay response
597-
relayResp, err := sdk.ValidateRelayResponse(
598-
ctx,
599-
sdk.SupplierAddress(supplierSignerAddress),
600-
respBz,
601-
&accountClient,
602-
)
603-
if err != nil {
604-
logger.Error().Err(err).Msg("❌ Error validating response")
605-
return
606-
}
607-
// Deserialize the relay response
608-
backendHttpResponse, err = sdktypes.DeserializeHTTPResponse(relayResp.Payload)
609-
if err != nil {
610-
logger.Error().Err(err).Msg("❌ Error deserializing response payload")
611-
return
612-
}
613-
614-
return
615-
}
616-
617-
func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
618-
var jsonMap map[string]interface{}
619-
// Unmarshal the HTTP response body into jsonMap
620-
if err := json.Unmarshal(BodyBz, &jsonMap); err != nil {
621-
logger.Error().Err(err).Msg("❌ Error deserializing backend response payload")
622-
return err
623-
}
624-
625-
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
626-
627-
// If "jsonrpc" key exists, try to further deserialize "result"
628-
if _, ok := jsonMap["jsonrpc"]; ok {
629-
resultRaw, exists := jsonMap["result"]
630-
if exists {
631-
switch v := resultRaw.(type) {
632-
case map[string]interface{}:
633-
logger.Info().Msgf("✅ Further deserialized 'result' (object): %+v", v)
634-
case []interface{}:
635-
logger.Info().Msgf("✅ Further deserialized 'result' (array): %+v", v)
636-
case string:
637-
logger.Info().Msgf("✅ Further deserialized 'result' (string): %s", v)
638-
case float64, bool, nil:
639-
logger.Info().Msgf("✅ Further deserialized 'result' (primitive): %+v", v)
640-
default:
641-
logger.Warn().Msgf("⚠️ 'result' is of an unhandled type: %T, value: %+v", v, v)
642-
}
643-
}
517+
logger.Info().Msgf("✅ Successfully sent %d relay requests", flagRelayRequestCount)
644518
}
645519

646520
return nil

0 commit comments

Comments
 (0)