Skip to content

Commit f63af5f

Browse files
Olshanskadshmh
andauthored
[Session Rollovers] Simplify & Consolidate Timeouts (#389)
Testing script: ```bash #!/bin/bash # Script to monitor XRPL EVM testnet block number every 0.2 seconds # Usage: ./block_monitor.sh [--local] # --local: Use localhost endpoint instead of remote # Parse command line arguments USE_LOCAL=false for arg in "$@"; do case $arg in --local) USE_LOCAL=true shift ;; *) echo "Unknown option: $arg" echo "Usage: $0 [--local]" exit 1 ;; esac done # Print configuration mode at the beginning if [ "$USE_LOCAL" = true ]; then echo "🔧 MODE: LOCAL" else echo "🌐 MODE: REMOTE" fi echo "" # Set endpoint based on flag if [ "$USE_LOCAL" = true ]; then ENDPOINT="http://localhost:3070/v1" HEADERS='-H "Target-Service-Id: xrplevm" -H "Authorization: test_api_key"' echo "Starting XRPL EVM testnet block number monitor (LOCAL)..." else ENDPOINT="https://xrpl-evm-testnet.rpc.grove.city/v1/6c5de5ff" HEADERS='-H "Content-Type: application/json"' echo "Starting XRPL EVM testnet block number monitor (REMOTE)..." fi echo "Press Ctrl+C to stop" echo "" # Initialize variables for delta calculation last_request_time=0 while true; do # Get current timestamp for display and delta calculation current_time=$(date +%s.%N) timestamp=$(date '+%Y-%m-%d %H:%M:%S') # Calculate delta from last request if [ "$last_request_time" != "0" ]; then delta=$(echo "$current_time - $last_request_time" | bc -l) delta_formatted=$(printf "%.2f" "$delta") # Color code based on delta (assuming 1s target interval) # Green: <= 1.2s (quick), Yellow: 1.2-2.0s (moderate), Red: > 2.0s (slow) if (( $(echo "$delta <= 1" | bc -l) )); then delta_color="\033[32m" # Green delta_status="⚡" elif (( $(echo "$delta <= 3" | bc -l) )); then delta_color="\033[33m" # Yellow delta_status="⏱️" else delta_color="\033[31m" # Red delta_status="🐌" fi reset_color="\033[0m" delta_display=" ${delta_color}[Δ${delta_formatted}s ${delta_status}]${reset_color}" else delta_display="" fi # Make the curl request and capture both response and error if [ "$USE_LOCAL" = true ]; then response=$(curl -s "$ENDPOINT" \ -H 
"Target-Service-Id: xrplevm" \ -H "Authorization: test_api_key" \ -d '{"jsonrpc": "2.0", "id": 1, "method": "eth_blockNumber" }' 2>&1) else response=$(curl -s "$ENDPOINT" \ -X POST \ -H "Content-Type: application/json" \ --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' 2>&1) fi # Check if curl command was successful curl_exit_code=$? if [ $curl_exit_code -ne 0 ]; then echo -e "❌ [$timestamp] ERROR: curl failed with exit code $curl_exit_code: $response$delta_display" else # Check if response contains an error if echo "$response" | grep -q '"error"'; then echo -e "❌ [$timestamp] API ERROR: $response$delta_display" else # Extract block number from response (remove 0x prefix and convert from hex) block_hex=$(echo "$response" | grep -o '"result":"0x[^"]*"' | cut -d'"' -f4) if [ -z "$block_hex" ]; then echo -e "❌ [$timestamp] PARSE ERROR: Could not extract block number from response: $response$delta_display" else # Convert hex to decimal, handle potential conversion errors if block_decimal=$((16#${block_hex#0x})) 2>/dev/null; then echo -e "✅ [$timestamp] Block: $block_decimal (hex: $block_hex)$delta_display" else echo -e "❌ [$timestamp] CONVERSION ERROR: Could not convert hex $block_hex to decimal$delta_display" fi fi fi fi # Update last request time for next iteration last_request_time="$current_time" # Wait 0.5 seconds sleep 0.5 done ``` --------- Co-authored-by: Arash Deshmeh <[email protected]>
1 parent c617913 commit f63af5f

File tree

22 files changed

+198
-166
lines changed

22 files changed

+198
-166
lines changed

config/config_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ func Test_LoadGatewayConfigFromYAML(t *testing.T) {
4545
},
4646
},
4747
Router: RouterConfig{
48-
Port: defaultPort,
49-
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
50-
ReadTimeout: defaultHTTPServerReadTimeout,
51-
WriteTimeout: defaultHTTPServerWriteTimeout,
52-
IdleTimeout: defaultHTTPServerIdleTimeout,
48+
Port: defaultPort,
49+
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
50+
ReadTimeout: defaultHTTPServerReadTimeout,
51+
WriteTimeout: defaultHTTPServerWriteTimeout,
52+
IdleTimeout: defaultHTTPServerIdleTimeout,
53+
SystemOverheadAllowanceDuration: defaultSystemOverheadAllowanceDuration,
5354
},
5455
Logger: LoggerConfig{
5556
Level: defaultLogLevel,
@@ -131,11 +132,12 @@ logger_config:
131132
},
132133
},
133134
Router: RouterConfig{
134-
Port: defaultPort,
135-
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
136-
ReadTimeout: defaultHTTPServerReadTimeout,
137-
WriteTimeout: defaultHTTPServerWriteTimeout,
138-
IdleTimeout: defaultHTTPServerIdleTimeout,
135+
Port: defaultPort,
136+
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
137+
ReadTimeout: defaultHTTPServerReadTimeout,
138+
WriteTimeout: defaultHTTPServerWriteTimeout,
139+
IdleTimeout: defaultHTTPServerIdleTimeout,
140+
SystemOverheadAllowanceDuration: defaultSystemOverheadAllowanceDuration,
139141
},
140142
Logger: LoggerConfig{
141143
Level: "debug",

config/router.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"fmt"
45
"time"
56
)
67

@@ -14,25 +15,30 @@ const (
1415
// defaultMaxRequestHeaderBytes is the default maximum size of the HTTP request header.
1516
defaultMaxRequestHeaderBytes = 2 * 1e6 // 2 MB
1617

17-
// HTTP server's default timeout values.
18-
defaultHTTPServerReadTimeout = 10 * time.Second
19-
defaultHTTPServerIdleTimeout = 120 * time.Second
18+
// Reserve time for system overhead, i.e. time spent on non-business logic operations.
19+
// Examples:
20+
// - Read HTTP Request body
21+
// - Write HTTP Response
22+
defaultSystemOverheadAllowanceDuration = 10 * time.Second
2023

21-
// HTTP request handler's WriteTimeout.
2224
// https://pkg.go.dev/net/http#Server
23-
defaultHTTPServerWriteTimeout = 20 * time.Second
25+
// HTTP server's default timeout values.
26+
defaultHTTPServerReadTimeout = 60 * time.Second
27+
defaultHTTPServerWriteTimeout = 120 * time.Second
28+
defaultHTTPServerIdleTimeout = 180 * time.Second
2429
)
2530

2631
/* --------------------------------- Router Config Struct -------------------------------- */
2732

2833
// RouterConfig contains server configuration settings.
2934
// See default values above.
3035
type RouterConfig struct {
31-
Port int `yaml:"port"`
32-
MaxRequestHeaderBytes int `yaml:"max_request_header_bytes"`
33-
ReadTimeout time.Duration `yaml:"read_timeout"`
34-
WriteTimeout time.Duration `yaml:"write_timeout"`
35-
IdleTimeout time.Duration `yaml:"idle_timeout"`
36+
Port int `yaml:"port"`
37+
MaxRequestHeaderBytes int `yaml:"max_request_header_bytes"`
38+
ReadTimeout time.Duration `yaml:"read_timeout"`
39+
WriteTimeout time.Duration `yaml:"write_timeout"`
40+
IdleTimeout time.Duration `yaml:"idle_timeout"`
41+
SystemOverheadAllowanceDuration time.Duration `yaml:"system_overhead_allowance_duration"`
3642
}
3743

3844
/* --------------------------------- Router Config Private Helpers -------------------------------- */
@@ -54,4 +60,10 @@ func (c *RouterConfig) hydrateRouterDefaults() {
5460
if c.IdleTimeout == 0 {
5561
c.IdleTimeout = defaultHTTPServerIdleTimeout
5662
}
63+
if c.SystemOverheadAllowanceDuration == 0 {
64+
c.SystemOverheadAllowanceDuration = defaultSystemOverheadAllowanceDuration
65+
}
66+
if c.SystemOverheadAllowanceDuration >= c.ReadTimeout || c.SystemOverheadAllowanceDuration >= c.WriteTimeout {
67+
panic(fmt.Sprintf("system overhead allowance duration %v must be less than read timeout %v and write timeout %v", c.SystemOverheadAllowanceDuration, c.ReadTimeout, c.WriteTimeout))
68+
}
5769
}

config/router_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,12 @@ func TestRouterConfig_hydrateRouterDefaults(t *testing.T) {
5959
name: "should set all defaults",
6060
cfg: RouterConfig{},
6161
want: RouterConfig{
62-
Port: defaultPort,
63-
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
64-
ReadTimeout: defaultHTTPServerReadTimeout,
65-
WriteTimeout: defaultHTTPServerWriteTimeout,
66-
IdleTimeout: defaultHTTPServerIdleTimeout,
62+
Port: defaultPort,
63+
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
64+
ReadTimeout: defaultHTTPServerReadTimeout,
65+
WriteTimeout: defaultHTTPServerWriteTimeout,
66+
IdleTimeout: defaultHTTPServerIdleTimeout,
67+
SystemOverheadAllowanceDuration: defaultSystemOverheadAllowanceDuration,
6768
},
6869
},
6970
{
@@ -72,11 +73,12 @@ func TestRouterConfig_hydrateRouterDefaults(t *testing.T) {
7273
Port: 8080,
7374
},
7475
want: RouterConfig{
75-
Port: 8080,
76-
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
77-
ReadTimeout: defaultHTTPServerReadTimeout,
78-
WriteTimeout: defaultHTTPServerWriteTimeout,
79-
IdleTimeout: defaultHTTPServerIdleTimeout,
76+
Port: 8080,
77+
MaxRequestHeaderBytes: defaultMaxRequestHeaderBytes,
78+
ReadTimeout: defaultHTTPServerReadTimeout,
79+
WriteTimeout: defaultHTTPServerWriteTimeout,
80+
IdleTimeout: defaultHTTPServerIdleTimeout,
81+
SystemOverheadAllowanceDuration: defaultSystemOverheadAllowanceDuration,
8082
},
8183
},
8284
}

data/reporter_http.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"github.com/buildwithgrove/path/observation"
1414
)
1515

16-
// defaultPostTimeoutMS defines the default timeout for HTTP POST operations in milliseconds (10 seconds)
17-
const defaultPostTimeoutMS = 10_000
16+
// defaultDataReporterPostTimeoutMillisec defines the default timeout for HTTP POST operations in milliseconds (20 seconds)
17+
const defaultDataReporterPostTimeoutMillisec = 20_000
1818

1919
// DataReporterHTTP exports observations to an external component over HTTP (e.g. Fluentd HTTP Plugin, a messaging system, or a database)
2020
var _ gateway.RequestResponseReporter = &DataReporterHTTP{}
@@ -63,7 +63,7 @@ func (drh *DataReporterHTTP) sendRecordOverHTTP(serializedDataRecord []byte) err
6363
// Determine the timeout to use
6464
timeoutMS := drh.PostTimeoutMS
6565
if timeoutMS <= 0 {
66-
timeoutMS = defaultPostTimeoutMS // Default timeout
66+
timeoutMS = defaultDataReporterPostTimeoutMillisec // Default timeout
6767
}
6868

6969
// Create an HTTP client with the configured timeout

gateway/gateway.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type Gateway struct {
6060
// Reference: https://en.wikipedia.org/wiki/Template_method_pattern
6161
//
6262
// TODO_FUTURE: Refactor when adding other protocols (e.g. gRPC):
63-
// - Extract generic processing into common method
63+
// - Extract generic processing into common method
6464
// - Keep HTTP-specific details separate
6565
func (g Gateway) HandleServiceRequest(
6666
ctx context.Context,

gateway/request_context.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@ const (
3232
// - If all endpoints are sanctioned, send parallel requests by default
3333
// - Make this configurable at the gateway level yaml config
3434
// - Enable parallel requests for gateways that maintain their own backend nodes as a special config
35-
maxParallelRequests = 1
36-
parallelRequestTimeout = 30 * time.Second
35+
maxParallelRequests = 1
36+
37+
// RelayRequestTimeout is the timeout for relay requests
38+
// TODO_TECHDEBT: Look into whether we can remove this variable altogether and consolidate
39+
// it with HTTP level timeouts.
40+
RelayRequestTimeout = 60 * time.Second
3741
)
3842

3943
// requestContext is responsible for performing the steps necessary to complete a service request.

gateway/request_context_handle_request.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"sync"
78
"time"
89

910
"github.com/pokt-network/poktroll/pkg/polylog"
@@ -89,7 +90,8 @@ func (rc *requestContext) handleParallelRelayRequests() error {
8990
With("service_id", rc.serviceID)
9091
logger.Debug().Msg("Starting parallel relay race")
9192

92-
ctx, cancel := context.WithTimeout(rc.context, parallelRequestTimeout)
93+
// TODO_TECHDEBT: Make sure timed out parallel requests are also sanctioned.
94+
ctx, cancel := context.WithTimeout(rc.context, RelayRequestTimeout)
9395
defer cancel()
9496

9597
resultChan := rc.launchParallelRequests(ctx, logger)
@@ -112,33 +114,46 @@ func (rc *requestContext) updateParallelRequestMetrics(metrics *parallelRequestM
112114
func (rc *requestContext) launchParallelRequests(ctx context.Context, logger polylog.Logger) <-chan parallelRelayResult {
113115
resultChan := make(chan parallelRelayResult, len(rc.protocolContexts))
114116

117+
// Ensures thread-safety of QoS context operations.
118+
qosContextMutex := sync.Mutex{}
119+
115120
for protocolCtxIdx, protocolCtx := range rc.protocolContexts {
116-
go rc.executeRelayRequest(ctx, logger, protocolCtx, protocolCtxIdx, resultChan)
121+
go rc.executeOneOfParallelRequests(ctx, logger, protocolCtx, protocolCtxIdx, resultChan, &qosContextMutex)
117122
}
118123

119124
return resultChan
120125
}
121126

122-
// executeRelayRequest handles a single relay request in a goroutine
123-
func (rc *requestContext) executeRelayRequest(
127+
// executeOneOfParallelRequests handles a single relay request in a goroutine
128+
func (rc *requestContext) executeOneOfParallelRequests(
124129
ctx context.Context,
125130
logger polylog.Logger,
126131
protocolCtx ProtocolRequestContext,
127132
index int,
128133
resultChan chan<- parallelRelayResult,
134+
qosContextMutex *sync.Mutex,
129135
) {
130136
startTime := time.Now()
131-
response, err := protocolCtx.HandleServiceRequest(rc.qosCtx.GetServicePayload())
137+
endpointResponse, err := protocolCtx.HandleServiceRequest(rc.qosCtx.GetServicePayload())
132138
duration := time.Since(startTime)
133139

134140
result := parallelRelayResult{
135-
response: response,
141+
response: endpointResponse,
136142
err: err,
137143
index: index,
138144
duration: duration,
139145
startTime: startTime,
140146
}
141147

148+
if err != nil {
149+
// TODO_TECHDEBT(@adshmh): refactor the parallel requests feature:
150+
// 1. Ensure parallel requests are handled correctly by the QoS layer: e.g. cannot use the most recent response as best anymore.
151+
// 2. Simplify the parallel requests feature: it may be best to fully encapsulate it in the protocol/shannon package.
152+
qosContextMutex.Lock()
153+
rc.qosCtx.UpdateWithResponse(endpointResponse.EndpointAddr, endpointResponse.Bytes)
154+
qosContextMutex.Unlock()
155+
}
156+
142157
select {
143158
case resultChan <- result:
144159
// Result sent successfully

observation/protocol/shannon.pb.go

Lines changed: 20 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/path/protocol/shannon.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,21 @@ enum ShannonEndpointErrorType {
100100
// New error types for better classification (added without renumbering existing ones)
101101
// Error was recognized but specific classification failed.
102102
// Only general category: HTTP, Raw Payload, General, is known.
103+
104+
// ** SHANNON_ENDPOINT_ERROR_HTTP_UNKNOWN **
105+
// PATH -> HTTP Request -> Endpoint
106+
// HTTP Response is NOT received
107+
// We cannot move forward with PATH business logic
108+
// There is nothing to go to.
103109
SHANNON_ENDPOINT_ERROR_HTTP_UNKNOWN = 34;
110+
104111
SHANNON_ENDPOINT_ERROR_RAW_PAYLOAD_UNKNOWN = 35;
105112
SHANNON_ENDPOINT_ERROR_UNKNOWN = 36;
113+
114+
// The relay request sent to the endpoint via HTTP returned a non 2XX HTTP status code.
115+
SHANNON_ENDPOINT_ERROR_HTTP_NON_2XX_STATUS = 37;
116+
// The relay request sent to the endpoint via HTTP failed with `context deadline exceeded` error
117+
SHANNON_ENDPOINT_ERROR_HTTP_CONTEXT_DEADLINE_EXCEEDED = 38;
106118
}
107119

108120
// ShannonSanctionType specifies the duration type for endpoint sanctions

protocol/payload.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
// TODO_DOCUMENT(@adshmh): Add more examples (e.g. for RESTful services)
99
// TODO_IMPROVE(@adshmh): Use an interface here that returns the serialized form of the request.
1010
type Payload struct {
11-
Data string
12-
Method string
13-
Path string
14-
Headers map[string]string
15-
TimeoutMillisec int
11+
Data string
12+
Method string
13+
Path string
14+
Headers map[string]string
1615
// RPCType indicates the type of RPC protocol for routing to appropriate backend ports:
1716
// - RPCType_REST: Cosmos SDK REST API (typically port 1317)
1817
// - RPCType_COMET_BFT: CometBFT RPC (typically port 26657)
@@ -27,11 +26,10 @@ func EmptyErrorPayload() Payload {
2726
return Payload{
2827
// This is an intentional placeholder to distinguish errors in retrieving payloads
2928
// from explicit empty payloads set by PATH.
30-
Data: "PATH_EmptyErrorPayload",
31-
Method: "",
32-
Path: "",
33-
Headers: map[string]string{},
34-
TimeoutMillisec: 0,
35-
RPCType: sharedtypes.RPCType_UNKNOWN_RPC,
29+
Data: "PATH_EmptyErrorPayload",
30+
Method: "",
31+
Path: "",
32+
Headers: map[string]string{},
33+
RPCType: sharedtypes.RPCType_UNKNOWN_RPC,
3634
}
3735
}

0 commit comments

Comments
 (0)