Skip to content

Commit e2031e0

Browse files
committed
Rebase onto main
1 parent a494176 commit e2031e0

File tree

1 file changed

+69
-34
lines changed

1 file changed

+69
-34
lines changed

pkg/relayer/cmd/cmd_relay.go

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,12 @@ func runRelay(cmd *cobra.Command, args []string) error {
162162
logger.Error().Err(err).Msg("❌ Error connecting to gRPC")
163163
return err
164164
}
165-
defer grpcConn.Close()
165+
defer func(grpcConn *grpc.ClientConn) {
166+
err := grpcConn.Close()
167+
if err != nil {
168+
logger.Error().Err(err).Msg("❌ Error closing gRPC connection")
169+
}
170+
}(grpcConn)
166171
logger.Info().Msgf("✅ gRPC connection initialized: %v", grpcConn)
167172

168173
// Create a connection to the POKT full node
@@ -341,43 +346,63 @@ func runRelay(cmd *cobra.Command, args []string) error {
341346
}
342347
logger.Info().Msgf("✅ Endpoint URL parsed: %v", reqUrl)
343348

344-
// Create the HTTP request with the relay request body
345-
httpReq := &http.Request{
346-
Method: http.MethodPost,
347-
URL: reqUrl,
348-
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
349-
}
349+
// Send multiple requests sequentially as specified by the count flag
350+
for i := 1; i <= flagRelayRequestCount; i++ {
351+
// Create the HTTP request with the relay request body
352+
httpReq := &http.Request{
353+
Method: http.MethodPost,
354+
URL: reqUrl,
355+
Body: io.NopCloser(bytes.NewReader(relayReqBz)),
356+
}
350357

351-
// Send the request HTTP request containing the signed relay request
352-
httpResp, err := http.DefaultClient.Do(httpReq)
353-
if err != nil {
354-
logger.Error().Err(err).Msg("❌ Error sending relay request")
355-
return err
356-
}
357-
defer httpResp.Body.Close()
358+
// Send the request HTTP request containing the signed relay request
359+
httpResp, err := http.DefaultClient.Do(httpReq)
360+
if err != nil {
361+
logger.Error().Err(err).Msg("❌ Error sending relay request")
362+
proxy.CloseRequestBody(logger, httpResp.Body)
363+
continue
364+
}
358365

359-
// Ensure the supplier operator signature is present
360-
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
361-
if supplierSignerAddress == "" {
362-
logger.Error().Msg("❌ Supplier operator signature is missing")
363-
return errors.New("Relay response missing supplier operator signature")
364-
}
365-
// Ensure the supplier operator address matches the expected address
366-
if flagRelaySupplier == "" {
367-
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
368-
} else if supplierSignerAddress != flagRelaySupplier {
369-
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
370-
return errors.New("Relay response supplier operator signature does not match")
371-
}
366+
bodyCloseErr := httpResp.Body.Close()
367+
if bodyCloseErr != nil {
368+
logger.Error().Err(bodyCloseErr).Msg("❌ Error closing response body")
369+
proxy.CloseRequestBody(logger, httpResp.Body)
370+
continue
371+
}
372372

373-
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
373+
// Ensure the supplier operator signature is present
374+
supplierSignerAddress := signedRelayReq.Meta.SupplierOperatorAddress
375+
if supplierSignerAddress == "" {
376+
logger.Error().Msg("❌ Supplier operator signature is missing")
377+
proxy.CloseRequestBody(logger, httpResp.Body)
378+
continue
379+
}
380+
// Ensure the supplier operator address matches the expected address
381+
if flagRelaySupplier == "" {
382+
logger.Warn().Msg("⚠️ Supplier operator address not specified, skipping signature check")
383+
} else if supplierSignerAddress != flagRelaySupplier {
384+
logger.Error().Msgf("❌ Supplier operator address %s does not match the expected address %s", supplierSignerAddress, flagRelaySupplier)
385+
proxy.CloseRequestBody(logger, httpResp.Body)
386+
continue
387+
}
374388

375-
// Handle response according to type
376-
if proxy.IsStreamingResponse(httpResp) {
377-
return processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
378-
} else {
379-
// Normal, non-streaming request
380-
return processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
389+
logger.Info().Msgf("🔍 Backend response header, Content-Type: %s", httpResp.Header.Get("Content-Type"))
390+
391+
// Handle response according to type
392+
if proxy.IsStreamingResponse(httpResp) {
393+
streamErr := processStreamRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
394+
proxy.CloseRequestBody(logger, httpResp.Body)
395+
if streamErr != nil {
396+
logger.Error().Err(streamErr).Msg("❌ Stream errored")
397+
}
398+
} else {
399+
// Normal, non-streaming request
400+
reqErr := processNormalRequest(ctx, httpResp, supplierSignerAddress, accountClient, logger)
401+
proxy.CloseRequestBody(logger, httpResp.Body)
402+
if reqErr != nil {
403+
logger.Error().Err(reqErr).Msg("❌ Request errored")
404+
}
405+
}
381406
}
382407

383408
return nil
@@ -470,6 +495,14 @@ func processNormalRequest(ctx context.Context,
470495
}
471496
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
472497

498+
// Log response details
499+
if flagRelayRequestCount > 1 {
500+
logger.Info().Msgf("✅ Status code %d, Response size %d bytes", backendHttpResponse.StatusCode, len(respBz))
501+
} else {
502+
logger.Info().Msgf("✅ Backend response status code: %v", backendHttpResponse.StatusCode)
503+
logger.Info().Msgf("✅ Response read %d bytes", len(respBz))
504+
}
505+
473506
err = unmarshalAndPrintResponse(backendHttpResponse.BodyBz, logger)
474507
if err != nil {
475508
return err
@@ -511,6 +544,7 @@ func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
511544
logger.Error().Err(err).Msg("❌ Error deserializing backend response payload")
512545
return err
513546
}
547+
514548
logger.Info().Msgf("✅ Deserialized response body as JSON map: %+v", jsonMap)
515549

516550
// If "jsonrpc" key exists, try to further deserialize "result"
@@ -534,6 +568,7 @@ func unmarshalAndPrintResponse(BodyBz []byte, logger polylog.Logger) error {
534568

535569
return nil
536570
}
571+
537572
// If a supplier is specified but not in the session, try to fetch it directly.
538573
// TODO_UPNEXT(@olshansk): Add support for sending a relay to a supplier that is not in the session.
539574
// This will require starting a relayminer in debug mode to avoid validating the session header.

0 commit comments

Comments
 (0)