Skip to content

Commit d191e0d

Browse files
authored
[Batch Requests] Introduce handling for batch requests for EVM services (#388)
## 🌿 Summary Introduce support for handling batch requests in EVM services, including parallel processing and proper response correlation. ### 🌱 Primary Changes: - Implement batch request handling in EVM services with proper JSON-RPC 2.0 spec compliance - Add parallel processing for batch requests to improve performance - Update data pipeline to handle multiple records for batch requests ### 🍃 Secondary changes: - Refactor protocol interfaces to support batch payloads - Add new metrics field to track batch requests - Improve error handling and logging for batch scenarios ## 💡 New TODOs New TODOs introduced in this PR: - TODO_TECHDEBT(@commoddity, @adshmh): add special handling for Cosmos SDK observations. - data/legacy_qos.go ## 🛠️ Type of change Select one or more from the following: - [x] New feature, functionality or library - [ ] Bug fix - [ ] Code health or cleanup - [ ] Documentation - [ ] Other (specify) ## 🤯 Sanity Checklist - [ ] I have updated the GitHub Issue 'assignees', 'reviewers', 'labels', 'project', 'iteration' and 'milestone' - [ ] For docs, I have run 'make docusaurus_start' - [x] For code, I have run 'make test_all' - [ ] For configurations, I have update the documentation - [ ] I added TODOs where applicable
1 parent 42e8c96 commit d191e0d

35 files changed

+2529
-1045
lines changed

data/legacy.go

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,49 +39,56 @@ type legacyRecord struct {
3939
endpointTripTime float64 // endpoint response timestamp - endpoint query time, in seconds.
4040
}
4141

42-
// converts a data record to legacy format, for compatibility with the existing data pipeline.
43-
func buildLegacyDataRecord(
42+
// converts data records to legacy format, for compatibility with the existing data pipeline.
43+
// Returns multiple legacy records for scenarios like EVM batch requests where each method
44+
// needs its own record.
45+
func buildLegacyDataRecords(
4446
logger polylog.Logger,
4547
observations *observation.RequestResponseObservations,
46-
) *legacyRecord {
47-
// initialize the legacy-compatible data record.
48-
legacyRecord := &legacyRecord{}
48+
) []*legacyRecord {
49+
// initialize the base legacy-compatible data record.
50+
baseLegacyRecord := &legacyRecord{}
4951

5052
// Update the legacy data record from Gateway observations.
5153
if gatewayObservations := observations.GetGateway(); gatewayObservations != nil {
52-
legacyRecord = setLegacyFieldsFromGatewayObservations(logger, legacyRecord, gatewayObservations)
54+
baseLegacyRecord = setLegacyFieldsFromGatewayObservations(logger, baseLegacyRecord, gatewayObservations)
5355
}
5456

5557
// TODO_MVP(@adshmh): Set legacy fields from Shannon observations.
5658
//
5759
// Extract protocol observations
5860
protocolObservations := observations.GetProtocol()
5961

60-
// Update the data record from Shannonprotocol data
62+
// Update the data record from Shannon protocol data
6163
if shannonObservations := protocolObservations.GetShannon(); shannonObservations != nil {
62-
legacyRecord = setLegacyFieldsFromShannonProtocolObservations(logger, legacyRecord, shannonObservations)
64+
baseLegacyRecord = setLegacyFieldsFromShannonProtocolObservations(logger, baseLegacyRecord, shannonObservations)
6365
}
6466

65-
// Update the legacy data record from QoS observations.
67+
// Update the legacy data records from QoS observations.
68+
// This may return multiple records for EVM batch requests.
69+
var legacyRecords []*legacyRecord
6670
if qosObservations := observations.GetQos(); qosObservations != nil {
67-
legacyRecord = setLegacyFieldsFromQoSObservations(logger, legacyRecord, qosObservations)
71+
legacyRecords = setLegacyFieldsFromQoSObservations(logger, baseLegacyRecord, qosObservations)
72+
} else {
73+
legacyRecords = []*legacyRecord{baseLegacyRecord}
6874
}
6975

70-
// Set constant/calculated/inferred fields' values.
71-
//
72-
if legacyRecord.ErrorType != "" {
73-
// Redundant value, set to comply with the legacy data pipeline.
74-
legacyRecord.IsError = true
75-
}
76+
// Set constant/calculated/inferred fields' values for all records.
77+
for _, legacyRecord := range legacyRecords {
78+
if legacyRecord.ErrorType != "" {
79+
// Redundant value, set to comply with the legacy data pipeline.
80+
legacyRecord.IsError = true
81+
}
7682

77-
// Hardcoded to "PATH" to inform the legacy data pipeline.
78-
legacyRecord.ErrorSource = "PATH"
83+
// Hardcoded to "PATH" to inform the legacy data pipeline.
84+
legacyRecord.ErrorSource = "PATH"
7985

80-
// Time spent waiting for the endpoint's response, in seconds.
81-
legacyRecord.NodeTripTime = legacyRecord.endpointTripTime
86+
// Time spent waiting for the endpoint's response, in seconds.
87+
legacyRecord.NodeTripTime = legacyRecord.endpointTripTime
8288

83-
// Total request processing time - time spent waiting for the endpoint, measured in seconds.
84-
legacyRecord.PortalTripTime = legacyRecord.RequestRoundTripTime - legacyRecord.endpointTripTime
89+
// Total request processing time - time spent waiting for the endpoint, measured in seconds.
90+
legacyRecord.PortalTripTime = legacyRecord.RequestRoundTripTime - legacyRecord.endpointTripTime
91+
}
8592

86-
return legacyRecord
93+
return legacyRecords
8794
}

data/legacy_qos.go

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,73 +12,110 @@ import (
1212
qosobservation "github.com/buildwithgrove/path/observation/qos"
1313
)
1414

15-
// setLegacyFieldsFromQoSObservations populates legacy record with QoS observation data.
15+
// setLegacyFieldsFromQoSObservations populates legacy records with QoS observation data.
1616
// Currently supports:
17-
// - EVM observations
18-
// Future support planned for Solana and CometBFT
17+
// - EVM observations (returns multiple records based on RequestObservations)
18+
// - Solana observations (returns single record)
19+
// Future support planned for Cosmos SDK
1920
//
2021
// Parameters:
2122
// - logger: logging interface
22-
// - legacyRecord: the record to populate
23+
// - baseLegacyRecord: the base record to populate
2324
// - observations: QoS observations data
24-
// Returns: the populated legacy record
25+
//
26+
// Returns: slice of populated legacy records
2527
func setLegacyFieldsFromQoSObservations(
2628
logger polylog.Logger,
27-
legacyRecord *legacyRecord,
29+
baseLegacyRecord *legacyRecord,
2830
observations *qosobservation.Observations,
29-
) *legacyRecord {
31+
) []*legacyRecord {
32+
// EVM observations may contains multiple records in the case of batch requests.
3033
if evmObservations := observations.GetEvm(); evmObservations != nil {
31-
return setLegacyFieldsFromQoSEVMObservations(logger, legacyRecord, evmObservations)
34+
return setLegacyFieldsFromQoSEVMObservations(logger, baseLegacyRecord, evmObservations)
3235
}
3336

3437
// Use Solana observations to update the legacy record's fields.
3538
if solanaObservations := observations.GetSolana(); solanaObservations != nil {
36-
return setLegacyFieldsFromQoSSolanaObservations(logger, legacyRecord, solanaObservations)
39+
populatedRecord := setLegacyFieldsFromQoSSolanaObservations(logger, baseLegacyRecord, solanaObservations)
40+
// Solana does not support batch requests so expect a single record.
41+
return []*legacyRecord{populatedRecord}
3742
}
3843

39-
return legacyRecord
44+
// For all other services, expect a single record.
45+
// TODO_TECHDEBT(@commoddity, @adshmh): add special handling for Cosmos SDK observations.
46+
return []*legacyRecord{baseLegacyRecord}
4047
}
4148

4249
// qosEVMErrorTypeStr defines the prefix for EVM QoS error types in legacy records
4350
const qosEVMErrorTypeStr = "QOS_EVM"
4451

45-
// setLegacyFieldsFromQoSEVMObservations populates legacy record with EVM-specific QoS data.
52+
// setLegacyFieldsFromQoSEVMObservations populates legacy records with EVM-specific QoS data.
4653
// It captures:
4754
// - Request payload size
4855
// - JSONRPC method information
4956
// - Error details (when applicable)
57+
// Creates one legacy record per RequestObservation
5058
//
5159
// Parameters:
5260
// - logger: logging interface
53-
// - legacyRecord: the record to populate
61+
// - baseLegacyRecord: the base record to copy for each method
5462
// - observations: EVM-specific QoS observations
55-
// Returns: the populated legacy record
63+
//
64+
// Returns: slice of populated legacy records
65+
// EVM batch requests are supported as of PR #388.
5666
func setLegacyFieldsFromQoSEVMObservations(
57-
logger polylog.Logger,
58-
legacyRecord *legacyRecord,
67+
_ polylog.Logger,
68+
baseLegacyRecord *legacyRecord,
5969
observations *qosobservation.EVMRequestObservations,
60-
) *legacyRecord {
61-
// In bytes: the length of the request: float64 type is for compatibility with the legacy data pipeline.
62-
legacyRecord.RequestDataSize = float64(observations.RequestPayloadLength)
70+
) []*legacyRecord {
71+
// Set common fields from observations
72+
baseLegacyRecord.RequestDataSize = float64(observations.RequestPayloadLength)
6373

6474
evmInterpreter := &qosobservation.EVMObservationInterpreter{
6575
Observations: observations,
6676
}
6777

68-
// Extract the JSONRPC request's method.
69-
jsonrpcRequestMethod, _ := evmInterpreter.GetRequestMethod()
70-
legacyRecord.ChainMethod = jsonrpcRequestMethod
78+
// Extract all JSONRPC request methods
79+
jsonrpcRequestMethods, ok := evmInterpreter.GetRequestMethods()
80+
if !ok || len(jsonrpcRequestMethods) == 0 {
81+
// If no methods found, return single record with base data
82+
populateEVMErrorFields(baseLegacyRecord, evmInterpreter)
83+
return []*legacyRecord{baseLegacyRecord}
84+
}
85+
86+
// Create a separate legacy record for each method
87+
// - In the case of EVM batch requests, this will create multiple records.
88+
// - Non-EVM batch requests will create a single record.
89+
var legacyRecords []*legacyRecord
90+
for _, method := range jsonrpcRequestMethods {
91+
// Create a copy of the base record
92+
recordCopy := *baseLegacyRecord
93+
legacyRecord := &recordCopy
94+
95+
// Set the method for this record
96+
legacyRecord.ChainMethod = method
97+
98+
// Populate error fields if needed
99+
populateEVMErrorFields(legacyRecord, evmInterpreter)
100+
101+
legacyRecords = append(legacyRecords, legacyRecord)
102+
}
71103

104+
return legacyRecords
105+
}
106+
107+
// populateEVMErrorFields sets error-related fields in the legacy record based on QoS observations
108+
func populateEVMErrorFields(legacyRecord *legacyRecord, evmInterpreter *qosobservation.EVMObservationInterpreter) {
72109
// ErrorType is already set at gateway or protocol level.
73110
// Skip updating the error fields to preserve the original error.
74111
if legacyRecord.ErrorType != "" {
75-
return legacyRecord
112+
return
76113
}
77114

78115
_, requestErr, err := evmInterpreter.GetRequestStatus()
79116
// Could not extract request error details, skip the rest of the updates.
80117
if err != nil || requestErr == nil {
81-
return legacyRecord
118+
return
82119
}
83120

84121
legacyRecord.ErrorMessage = requestErr.String()
@@ -91,8 +128,6 @@ func setLegacyFieldsFromQoSEVMObservations(
91128
default:
92129
legacyRecord.ErrorType = fmt.Sprintf("%s_UNKNOWN_ERROR", qosEVMErrorTypeStr)
93130
}
94-
95-
return legacyRecord
96131
}
97132

98133
// setLegacyFieldsFromQoSSolanaObservations populates legacy record with Solana-specific QoS data.

data/reporter_http.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// defaultDataReporterPostTimeoutMillisec defines the default timeout for HTTP POST operations in milliseconds (10 seconds)
1717
const defaultDataReporterPostTimeoutMillisec = 20_000
1818

19-
// DataReporterHTTP exports observations to an external components over HTTP (e.g. Flentd HTTP Plugin, a Messaging system, or a database)
19+
// DataReporterHTTP exports observations to an external components over HTTP (e.g. Fluentd HTTP Plugin, a Messaging system, or a database)
2020
var _ gateway.RequestResponseReporter = &DataReporterHTTP{}
2121

2222
// DataReporterHTTP sends the observation for each handled request to an HTTP endpoint.
@@ -37,25 +37,34 @@ type DataReporterHTTP struct {
3737
}
3838

3939
// Publish the supplied observations:
40-
// - Build the expected data record.
41-
// - Send to the configured URL.
40+
// - Build the expected data records.
41+
// - Send each record to the configured URL.
4242
func (drh *DataReporterHTTP) Publish(observations *observation.RequestResponseObservations) {
4343
logger := drh.hydrateLogger(observations)
4444

4545
// TODO_MVP(@adshmh): Replace this with the new DataRecord struct once the data pipeline is updated.
46-
// convert to legacy-formatted data record
47-
legacyDataRecord := buildLegacyDataRecord(logger, observations)
48-
49-
// Marshal the data record.
50-
serializedRecord, err := json.Marshal(legacyDataRecord)
51-
if err != nil {
52-
logger.Warn().Err(err).Msg("Failed to serialize the data record. Skip reporting.")
53-
return
54-
}
55-
56-
// Send the marshaled data record to the data processor.
57-
if err := drh.sendRecordOverHTTP(serializedRecord); err != nil {
58-
logger.Warn().Err(err).Msg("Failed to send the data record over HTTP. Skip reporting.")
46+
// convert to legacy-formatted data records (may be multiple for EVM batch requests)
47+
legacyDataRecords := buildLegacyDataRecords(logger, observations)
48+
49+
// Process each legacy data record as a single relay for data pipeline and metering purposes.
50+
//
51+
// If the observations are for an EVM batch request, legacyDataRecords will contain multiple records.
52+
// As of PR #388 all other QoS observations are expected to be single records.
53+
// Reference: https://github.com/buildwithgrove/path/pull/388
54+
for i, legacyDataRecord := range legacyDataRecords {
55+
recordLogger := logger.With("record_index", i, "total_records", len(legacyDataRecords))
56+
57+
// Marshal the data record.
58+
serializedRecord, err := json.Marshal(legacyDataRecord)
59+
if err != nil {
60+
recordLogger.Warn().Err(err).Msg("Failed to serialize the data record. Skip reporting this record.")
61+
continue
62+
}
63+
64+
// Send the marshaled data record to the data processor.
65+
if err := drh.sendRecordOverHTTP(serializedRecord); err != nil {
66+
recordLogger.Warn().Err(err).Msg("Failed to send the data record over HTTP. Skip reporting this record.")
67+
}
5968
}
6069
}
6170

docusaurus/package.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
"write-heading-ids": "docusaurus write-heading-ids"
1515
},
1616
"dependencies": {
17-
"@docusaurus/core": "^3.8.0",
18-
"@docusaurus/preset-classic": "^3.8.0",
19-
"@docusaurus/theme-mermaid": "^3.8.0",
17+
"@docusaurus/core": "^3.8.1",
18+
"@docusaurus/preset-classic": "^3.8.1",
19+
"@docusaurus/theme-mermaid": "^3.8.1",
2020
"@easyops-cn/docusaurus-search-local": "^0.46.1",
2121
"@mdx-js/react": "^3.0.0",
2222
"clsx": "^2.0.0",
@@ -31,8 +31,8 @@
3131
"remark-mermaid-plugin": "^1.0.2"
3232
},
3333
"devDependencies": {
34-
"@docusaurus/module-type-aliases": "^3.8.0",
35-
"@docusaurus/types": "^3.7.0"
34+
"@docusaurus/module-type-aliases": "^3.8.1",
35+
"@docusaurus/types": "^3.8.1"
3636
},
3737
"browserslist": {
3838
"production": [

0 commit comments

Comments
 (0)