Skip to content

Commit 3240b61

Browse files
authored
Feat: Solana QoS supports batch JSONRPC v0 (#445)
## Summary Solana QoS supports batch JSONRPC v0 ## Issue Support batch JSONRPC requests on Solana - Issue or PR: #{ISSUE_OR_PR_NUMBER} ## Type of change Select one or more from the following: - [x] New feature, functionality or library - [ ] Bug fix - [ ] Code health or cleanup - [ ] Documentation - [ ] Other (specify) ## QoS Checklist ### E2E Validation & Tests - [ ] `make path_up` - [ ] `make test_e2e_evm_shannon` ### Observability - [ ] 1. `make path_up` - [ ] 2. Run the following E2E test: `make test_request__shannon_relay_util_100` - [ ] 3. View results in LocalNet's [PATH Relay Grafana Dashboard](http://localhost:3003/d/relays/path-service-requests) ## Sanity Checklist - [x] I have updated the GitHub Issue `assignees`, `reviewers`, `labels`, `project`, `iteration` and `milestone` - [ ] For docs, I have run `make docusaurus_start` - [ ] For code, I have run `make test_all` - [ ] For configurations, I have updated the documentation - [ ] I added `TODO`s where applicable
1 parent e8118ec commit 3240b61

File tree

4 files changed

+278
-48
lines changed

4 files changed

+278
-48
lines changed

protocol/shannon/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ func (rc *requestContext) sendSingleRelay(payload protocol.Payload) (protocol.Re
147147
return relayResponse, err
148148
}
149149

150+
// TODO_TECHDEBT(@adshmh): Set and enforce a cap on the number of concurrent parallel requests for a single method call.
151+
//
150152
// TODO_TECHDEBT(@adshmh): Single and Multiple payloads should be handled as similarly as possible:
151153
// - This includes using similar execution paths.
152154
//

qos/jsonrpc/request_batch.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package jsonrpc
2+
3+
import (
4+
"encoding/json"
5+
)
6+
7+
type BatchRequest struct {
8+
Requests []Request
9+
}
10+
11+
// TODO_UPNEXT(@adshmh): Validate ID values, e.g. for duplicate values, when unmarshaling.
12+
//
13+
// GetRequestPayloads returns the slice of serialized forms of JSONRPC requests.
14+
func (br *BatchRequest) GetRequestsPayloads() [][]byte {
15+
requestPayloads := make([][]byte, len(br.Requests))
16+
for i, req := range br.Requests {
17+
// TODO_TECHDEBT(@adshmh): Log an entry if there is an error marshaling.
18+
// A marshaling error here should never happen here.
19+
payload, _ := json.Marshal(req)
20+
requestPayloads[i] = payload
21+
}
22+
23+
return requestPayloads
24+
}
25+
26+
// Custom unmarshaller to support requests of the format `[{"jsonrpc":"2.0","id":1},{"jsonrpc":"2.0","id":2}]`
27+
func (br *BatchRequest) UnmarshalJSON(data []byte) error {
28+
var requests []Request
29+
if err := json.Unmarshal(data, &requests); err != nil {
30+
return err
31+
}
32+
33+
br.Requests = requests
34+
return nil
35+
}
36+
37+
// TODO_UPNEXT(@adshmh): Validate responses in the batch
38+
//
39+
// BuildResponseBytes constructs a Batch JSONRPC response from the slice of response payloads.
40+
func (br *BatchRequest) BuildResponseBytes(jsonrpcResponses []Response) []byte {
41+
// TODO_TECHDEBT(@adshmh): Refactor so marshaling a Response never fails.
42+
responseBz, _ := json.Marshal(jsonrpcResponses)
43+
44+
return responseBz
45+
}

qos/solana/context_batch.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package solana
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"net/http"
7+
8+
"github.com/pokt-network/poktroll/pkg/polylog"
9+
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
10+
11+
"github.com/buildwithgrove/path/gateway"
12+
pathhttp "github.com/buildwithgrove/path/network/http"
13+
qosobservations "github.com/buildwithgrove/path/observation/qos"
14+
"github.com/buildwithgrove/path/protocol"
15+
"github.com/buildwithgrove/path/qos"
16+
"github.com/buildwithgrove/path/qos/jsonrpc"
17+
)
18+
19+
// batchJSONRPCRequestContext provides the support required by the gateway
20+
// package for handling service requests.
21+
var _ gateway.RequestQoSContext = &batchJSONRPCRequestContext{}
22+
23+
type endpointJSONRPCResponse struct {
24+
protocol.EndpointAddr
25+
jsonrpc.Response
26+
}
27+
28+
// batchJSONRPCRequestContext provides the functionality required
29+
// to support QoS for a Solana blockchain service.
30+
type batchJSONRPCRequestContext struct {
31+
logger polylog.Logger
32+
33+
// chainID is the chain identifier for the Solana QoS implementation.
34+
chainID string
35+
36+
// service_id is the identifier for the Solana QoS implementation.
37+
// It is the "alias" or human readable interpretation of the chain_id.
38+
// Used in generating observations.
39+
serviceID protocol.ServiceID
40+
41+
// The length of the request payload in bytes.
42+
requestPayloadLength uint
43+
44+
endpointStore *EndpointStore
45+
46+
JSONRPCBatchRequest jsonrpc.BatchRequest
47+
48+
// The origin of the request handled by the context.
49+
// Either:
50+
// - User: user requests
51+
// - QoS: requests built by the QoS service to get additional data points on endpoints.
52+
requestOrigin qosobservations.RequestOrigin
53+
54+
// endpointResponses is the set of responses received from one or
55+
// more endpoints as part of handling this service request.
56+
endpointJSONRPCResponses []endpointJSONRPCResponse
57+
}
58+
59+
// TODO_NEXT(@commoddity): handle batch requests for Solana
60+
// TODO_MVP(@adshmh): Ensure the JSONRPC request struct
61+
// can handle all valid service requests.
62+
func (brc batchJSONRPCRequestContext) GetServicePayloads() []protocol.Payload {
63+
protocolPayloads := make([]protocol.Payload, len(brc.JSONRPCBatchRequest.Requests))
64+
65+
for i, jsonrpcRequestPayload := range brc.JSONRPCBatchRequest.GetRequestsPayloads() {
66+
// TODO_TECHDEBT(@adshmh): Set method-specific timeouts on protocol payload entry.
67+
protocolPayloads[i] = protocol.Payload{
68+
Data: string(jsonrpcRequestPayload),
69+
Method: http.MethodPost, // Method is alway POST for Solana.
70+
Path: "", // Path field is not used for Solana.
71+
RPCType: sharedtypes.RPCType_JSON_RPC,
72+
}
73+
}
74+
75+
return protocolPayloads
76+
}
77+
78+
// TODO_TECHDEBT(@adshmh): Refactor once the QoS context interface is updated to receive an array of responses.
79+
// UpdateWithResponse is NOT safe for concurrent use
80+
func (brc *batchJSONRPCRequestContext) UpdateWithResponse(endpointAddr protocol.EndpointAddr, responseBz []byte) {
81+
// TODO_TECHDEBT(@adshmh): Refactor this once the QoS context interface is updated to accept all endpoint responses at once.
82+
// This would make it possible to map each JSONRPC request of a batch to its corresponding endpoint response.
83+
// This is required to enable request method-specific esponse validation: e.g. format of result field in response to a `getHealth` request.
84+
//
85+
// Parse and track the endpoint payload as a JSONRPC response.
86+
var jsonrpcResponse jsonrpc.Response
87+
if err := json.Unmarshal(responseBz, &jsonrpcResponse); err != nil {
88+
// TODO_UPNEXT(@adshmh): Include a preview of malformed payload in the response.
89+
//
90+
// Parsing failed, store a generic error JSONRPC response
91+
jsonrpcResponse = jsonrpc.GetErrorResponse(jsonrpc.ID{}, errCodeUnmarshaling, errMsgUnmarshaling, nil)
92+
}
93+
94+
// Store the response: will be processed later by the JSONRPC batch request struct.
95+
brc.endpointJSONRPCResponses = append(brc.endpointJSONRPCResponses, endpointJSONRPCResponse{
96+
EndpointAddr: endpointAddr,
97+
Response: jsonrpcResponse,
98+
})
99+
}
100+
101+
// TODO_MVP(@adshmh): add `Content-Type: application/json` header.
102+
// GetHTTPResponse builds the HTTP response that should be returned for
103+
// a Solana blockchain service request.
104+
func (brc batchJSONRPCRequestContext) GetHTTPResponse() pathhttp.HTTPResponse {
105+
// TODO_UPNEXT(@adshmh): Return an error response matching the batch of JSONRPC requests.
106+
//
107+
// No responses received: this is an internal error:
108+
// e.g. protocol-level errors like endpoint timing out.
109+
if len(brc.endpointJSONRPCResponses) == 0 {
110+
// Build the JSONRPC response indicating a protocol-level error.
111+
jsonrpcErrorResponse := jsonrpc.NewErrResponseInternalErr(jsonrpc.ID{}, errors.New("protocol-level error: no endpoint responses received"))
112+
return qos.BuildHTTPResponseFromJSONRPCResponse(brc.logger, jsonrpcErrorResponse)
113+
}
114+
115+
// assemble the array of JSONRPC responses
116+
jsonrpcResponses := make([]jsonrpc.Response, len(brc.endpointJSONRPCResponses))
117+
for i, jsonrpcResponse := range brc.endpointJSONRPCResponses {
118+
jsonrpcResponses[i] = jsonrpcResponse.Response
119+
}
120+
121+
// Use the Batch JSONRPC request to assemble the JSONRPC batch response.
122+
batchResponseBz := brc.JSONRPCBatchRequest.BuildResponseBytes(jsonrpcResponses)
123+
124+
// TODO_UPNEXT(@adshmh): Adjust HTTP status code according to responses in the batch.
125+
return jsonrpc.HTTPResponse{
126+
ResponsePayload: batchResponseBz,
127+
// According to the JSON-RPC 2.0 specification, even if individual responses
128+
// in a batch contain errors, the entire batch should still return HTTP 200 OK.
129+
HTTPStatusCode: http.StatusOK,
130+
}
131+
}
132+
133+
// GetObservations returns all the observations contained in the request context.
134+
// Implements the gateway.RequestQoSContext interface.
135+
func (rc batchJSONRPCRequestContext) GetObservations() qosobservations.Observations {
136+
// Set the observation fields common for all requests: successful or failed.
137+
observations := &qosobservations.SolanaRequestObservations{
138+
ChainId: rc.chainID,
139+
ServiceId: string(rc.serviceID),
140+
RequestPayloadLength: uint32(rc.requestPayloadLength),
141+
RequestOrigin: rc.requestOrigin,
142+
// TODO_UPNEXT(@adshmh): Add a Batch JSONRPC request observation.
143+
}
144+
145+
// No endpoint responses received.
146+
// Set request error.
147+
if len(rc.endpointJSONRPCResponses) == 0 {
148+
observations.RequestError = qos.GetRequestErrorForProtocolError()
149+
150+
return qosobservations.Observations{
151+
ServiceObservations: &qosobservations.Observations_Solana{
152+
Solana: observations,
153+
},
154+
}
155+
}
156+
157+
// TODO_UPNEXT(@adshmh): Report batch JSONRPC requests endpoint observations via metrics.
158+
//
159+
return qosobservations.Observations{
160+
ServiceObservations: &qosobservations.Observations_Solana{
161+
Solana: observations,
162+
},
163+
}
164+
}
165+
166+
// GetEndpointSelector is required to satisfy the gateway package's RequestQoSContext interface.
167+
// The request context is queried for the correct endpoint selector.
168+
// This allows different endpoint selectors based on the request's context.
169+
// e.g. the request context for a particular request method can potentially rank endpoints based on their latency when responding to requests with matching method.
170+
func (rc *batchJSONRPCRequestContext) GetEndpointSelector() protocol.EndpointSelector {
171+
return rc
172+
}
173+
174+
// TODO_TECHDEBT(@adshmh): Enhance endpoint selection to consider endpoint quality specific to batch requests.
175+
//
176+
// Select chooses an endpoint from the list of supplied endpoints.
177+
// It uses the perceived state of the Solana chain using other endpoints' responses.
178+
// It is required to satisfy the protocol package's EndpointSelector interface.
179+
func (rc *batchJSONRPCRequestContext) Select(allEndpoints protocol.EndpointAddrList) (protocol.EndpointAddr, error) {
180+
return rc.endpointStore.Select(allEndpoints)
181+
}
182+
183+
// SelectMultiple chooses multiple endpoints from the list of supplied endpoints.
184+
// It uses the perceived state of the Solana chain using other endpoints' responses.
185+
// It is required to satisfy the protocol package's EndpointSelector interface.
186+
func (rc *batchJSONRPCRequestContext) SelectMultiple(allEndpoints protocol.EndpointAddrList, numEndpoints uint) (protocol.EndpointAddrList, error) {
187+
return rc.endpointStore.SelectMultiple(allEndpoints, numEndpoints)
188+
}

qos/solana/request_validator.go

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type requestValidator struct {
3333
endpointStore *EndpointStore
3434
}
3535

36+
// TODO_TECHDEBT(@adshmh): Add a JSON-RPC request validator to reject invalid/unsupported method calls early.
37+
//
3638
// validateHTTPRequest:
3739
// - Validates an HTTP request for a Solana JSONRPC payload
3840
// - Extracts and validates the JSONRPC request from the HTTP body
@@ -51,24 +53,50 @@ func (rv *requestValidator) validateHTTPRequest(req *http.Request) (gateway.Requ
5153
return rv.createHTTPBodyReadFailureContext(err), false
5254
}
5355

56+
// TODO_TECHDEBT(@adshmh): Distinguish malformed single and batch requests.
57+
// This is needed to provide a JSONRPC-compliant error response to user if e.g. a batch request is malformed.
58+
//
5459
// Parse and validate the JSONRPC request
55-
jsonrpcReq, err := parseJSONRPCFromRequestBody(logger, body)
56-
if err != nil {
57-
return rv.createRequestUnmarshalingFailureContext(jsonrpcReq.ID, err), false
60+
// 1. Attempt to parse as a batch of requests
61+
// Ref: https://www.jsonrpc.org/specification#batch
62+
//
63+
var jsonrpcBatchRequest jsonrpc.BatchRequest
64+
if err := json.Unmarshal(body, &jsonrpcBatchRequest); err == nil {
65+
return &batchJSONRPCRequestContext{
66+
logger: rv.logger,
67+
chainID: rv.chainID,
68+
serviceID: rv.serviceID,
69+
requestPayloadLength: uint(len(body)),
70+
JSONRPCBatchRequest: jsonrpcBatchRequest,
71+
// Set the origin of the request as USER (i.e. organic relay)
72+
// The request is from a user.
73+
requestOrigin: qosobservations.RequestOrigin_REQUEST_ORIGIN_ORGANIC,
74+
endpointStore: rv.endpointStore,
75+
}, true
76+
}
77+
78+
// 2. Attempt to parse as a single JSONRPC request
79+
var jsonrpcRequest jsonrpc.Request
80+
if err := json.Unmarshal(body, &jsonrpcRequest); err == nil {
81+
// single JSONRPC request is valid, return a fully initialized requestContext
82+
return &requestContext{
83+
logger: rv.logger,
84+
chainID: rv.chainID,
85+
serviceID: rv.serviceID,
86+
requestPayloadLength: uint(len(body)),
87+
JSONRPCReq: jsonrpcRequest,
88+
// Set the origin of the request as USER (i.e. organic relay)
89+
// The request is from a user.
90+
requestOrigin: qosobservations.RequestOrigin_REQUEST_ORIGIN_ORGANIC,
91+
endpointStore: rv.endpointStore,
92+
}, true
5893
}
5994

60-
// Request is valid, return a fully initialized requestContext
61-
return &requestContext{
62-
logger: rv.logger,
63-
chainID: rv.chainID,
64-
serviceID: rv.serviceID,
65-
requestPayloadLength: uint(len(body)),
66-
JSONRPCReq: jsonrpcReq,
67-
// Set the origin of the request as USER (i.e. organic relay)
68-
// The request is from a user.
69-
requestOrigin: qosobservations.RequestOrigin_REQUEST_ORIGIN_ORGANIC,
70-
endpointStore: rv.endpointStore,
71-
}, true
95+
// TODO_UPNEXT(@adshmh): Adjust the error response based on request type: single JSONRPC vs. batch JSONRPC.
96+
// Only log a preview of the request body (first 1000 bytes or less) to avoid excessive logging
97+
requestPreview := string(body[:min(maxErrMessageLen, len(body))])
98+
logger.Error().Err(err).Msgf("❌ Solana endpoint will fail QoS check because JSONRPC request could not be parsed. Request preview: %s", requestPreview)
99+
return rv.createRequestUnmarshalingFailureContext(jsonrpc.ID{}, err), false
72100
}
73101

74102
// createHTTPBodyReadFailureContext:
@@ -169,36 +197,3 @@ func (rv *requestValidator) createHTTPBodyReadFailureObservation(
169197
},
170198
}
171199
}
172-
173-
// TODO_TECHDEBT(@adshmh): Support Batch JSONRPC requests per spec:
174-
// https://www.jsonrpc.org/specification#batch
175-
//
176-
// TODO_MVP(@adshmh): Add a JSON-RPC request validator to reject invalid/unsupported method calls early.
177-
//
178-
// parseJSONRPCFromRequestBody:
179-
// - Attempts to unmarshal HTTP request body into a JSONRPC request structure
180-
// - On failure:
181-
// - Logs a preview of the request body (truncated for security/performance)
182-
// - Logs the specific error
183-
//
184-
// Parameters:
185-
// - logger: Logger for structured logging
186-
// - requestBody: Raw HTTP request body bytes
187-
//
188-
// Returns:
189-
// - jsonrpc.Request: Parsed request (empty on error)
190-
// - error: Any error encountered during parsing
191-
func parseJSONRPCFromRequestBody(
192-
logger polylog.Logger,
193-
requestBody []byte,
194-
) (jsonrpc.Request, error) {
195-
var jsonrpcRequest jsonrpc.Request
196-
err := json.Unmarshal(requestBody, &jsonrpcRequest)
197-
if err != nil {
198-
// Only log a preview of the request body (first 1000 bytes or less) to avoid excessive logging
199-
requestPreview := string(requestBody[:min(maxErrMessageLen, len(requestBody))])
200-
logger.Error().Err(err).Msgf("❌ Solana endpoint will fail QoS check because JSONRPC request could not be parsed. Request preview: %s", requestPreview)
201-
}
202-
203-
return jsonrpcRequest, err
204-
}

0 commit comments

Comments
 (0)