Skip to content

Commit 9ca874e

Browse files
commoddity, adshmh, Olshansk
authored
[WebSockets] Clarify WebSockets implementation package responsibilities and add Observations to Metrics and Data (#386)
## 🌿 Summary

Clarify WebSockets implementation package responsibilities and add Observations to Metrics and Data

### 🌱 Primary Changes:

- Add WebSocket connection and message observations to protocol observations for metrics and data reporting
- Create dedicated `gateway` package websocketRequestContext to handle WebSocket-specific request processing separate from HTTP
- Implement WebSocket message processing in `gateway` package to orchestrate `protocol` and `QoS`-level message processing
- Add WebSocket-specific metrics counters and error tracking in Shannon metrics

### 🍃 Secondary changes:

- Update documentation with WebSocket connection examples using wscat
- Add WebSocket-specific error types to gateway and protocol error classifications
- Refactor bridge implementation to be protocol-agnostic with message processor interface
- Rename HTTP-specific request context files for clarity

## 💡 New TODOs

New TODOs introduced in this PR:

- TODO_TECHDEBT(@adshmh,@commoddity): Use ParseHTTPRequest as the single entry point to QoS, including for a WebSocket request.
  - gateway/websocket_request_context.go
- TODO_TECHDEBT(@commoddity): process message using QoS context and update the message observations. messageObservations.Qos = wrc.qosCtx.ProcessProtocolEndpointWebsocketMessage(msgData)
  - gateway/websocket_request_context.go
- TODO_NEXT(@commoddity): Introduce correct error classification for WebSocket errors.
  - Error creating a WebSocket connection.
  - Error signing the relay request.
  - Error validating the relay response.
  - protocol/shannon/sanctions.go
- TODO_IMPROVE(@commoddity,@adshmh): Cleanly separate fallback endpoint handling from the protocol package.
  - protocol/shannon/websocket_context.go

## 🛠️ Type of change

Select one or more from the following:

- [x] New feature, functionality or library

## 🤯 Sanity Checklist

- [x] I have updated the GitHub Issue 'assignees', 'reviewers', 'labels', 'project', 'iteration' and 'milestone'
- [x] For code, I have run 'make test_all'
- [x] I added TODOs where applicable

---------

Co-authored-by: Arash Deshmeh <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>
1 parent c2d1456 commit 9ca874e

29 files changed

+3140
-884
lines changed

data/legacy_gateway.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ func setLegacyFieldsFromGatewayObservations(
6363
legacyRecord.RequestTimestamp = formatTimestampPbForBigQueryJSON(observations.ReceivedTime)
6464

6565
// Request processing time, in seconds.
66-
legacyRecord.RequestRoundTripTime = float64(observations.CompletedTime.AsTime().Sub(observations.ReceivedTime.AsTime()).Milliseconds())
66+
// - For HTTP requests, this is the round-trip time.
67+
// - For WebSocket requests, this is the total elapsed time the WebSocket connection was open.
68+
legacyRecord.RequestRoundTripTime = float64(observations.CompletedTime.AsTime().Sub(observations.ReceivedTime.AsTime()).Milliseconds()) / 1000
6769

6870
return legacyRecord
6971
}

data/legacy_protocol_shannon.go

Lines changed: 173 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,36 @@ func setLegacyFieldsFromShannonProtocolObservations(
5050
return legacyRecord
5151
}
5252

53-
endpointObservations := observations.GetEndpointObservations()
53+
// Handle different observation types based on the oneof field
54+
switch obsData := observations.GetObservationData().(type) {
55+
56+
// HTTP observations
57+
case *protocolobservation.ShannonRequestObservations_HttpObservations:
58+
return setLegacyFieldsFromHTTPObservations(logger, legacyRecord, obsData.HttpObservations)
59+
60+
// WebSocket connection observations
61+
case *protocolobservation.ShannonRequestObservations_WebsocketConnectionObservation:
62+
return setLegacyFieldsFromWebsocketConnectionObservation(logger, legacyRecord, obsData.WebsocketConnectionObservation)
63+
64+
// WebSocket message observations
65+
case *protocolobservation.ShannonRequestObservations_WebsocketMessageObservation:
66+
return setLegacyFieldsFromWebsocketMessageObservation(logger, legacyRecord, obsData.WebsocketMessageObservation)
67+
68+
// Unknown observation type
69+
default:
70+
logger.Warn().Msg("Unknown observation type received for legacy record processing")
71+
return legacyRecord
72+
}
73+
}
74+
75+
// setLegacyFieldsFromHTTPObservations populates legacy record with HTTP endpoint observation data.
76+
// This handles the original HTTP relay processing logic.
77+
func setLegacyFieldsFromHTTPObservations(
78+
logger polylog.Logger,
79+
legacyRecord *legacyRecord,
80+
httpObservations *protocolobservation.ShannonHTTPEndpointObservations,
81+
) *legacyRecord {
82+
endpointObservations := httpObservations.GetEndpointObservations()
5483
// No endpoint observations: this should not happen as the request has no error set.
5584
// Log a warning entry.
5685
if len(endpointObservations) == 0 {
@@ -81,19 +110,160 @@ func setLegacyFieldsFromShannonProtocolObservations(
81110
// Will be "fallback" in the case of a request sent to a fallback endpoint.
82111
legacyRecord.NodeAddress = endpointObservation.GetSupplier()
83112

84-
// Extract the endpoint's domain from its URL.
113+
// Extract and set the endpoint's domain from its URL.
114+
// The domain is left unset if parsing the URL fails.
85115
endpointDomain, err := shannonmetrics.ExtractDomainOrHost(endpointObservation.GetEndpointUrl())
86116
if err != nil {
87117
logger.With("endpoint_url", endpointObservation.EndpointUrl).Warn().Err(err).Msg("Could not extract domain from Shannon endpoint URL")
88118
return legacyRecord
89119
}
120+
legacyRecord.NodeDomain = endpointDomain
90121

91-
// Set the endpoint domain field: empty value if parsing the URL above failed.
122+
return legacyRecord
123+
}
124+
125+
// setLegacyFieldsFromWebsocketConnectionObservation populates legacy record with WebSocket connection observation data.
126+
// This handles WebSocket connection lifecycle (not individual messages).
127+
func setLegacyFieldsFromWebsocketConnectionObservation(
128+
logger polylog.Logger,
129+
legacyRecord *legacyRecord,
130+
wsConnectionObs *protocolobservation.ShannonWebsocketConnectionObservation,
131+
) *legacyRecord {
132+
// Update error fields if a connection error has occurred.
133+
legacyRecord = setLegacyErrFieldsFromWebsocketConnectionError(legacyRecord, wsConnectionObs)
134+
135+
// Set application address
136+
legacyRecord.ProtocolAppPublicKey = wsConnectionObs.GetEndpointAppAddress()
137+
138+
// WebSocket connections don't have separate query/response timestamps at the protocol level.
139+
// Connection timing is tracked at the gateway level instead.
140+
// Set both timestamps to empty strings to indicate they don't apply.
141+
legacyRecord.NodeQueryTimestamp = ""
142+
legacyRecord.NodeReceiveTimestamp = ""
143+
legacyRecord.endpointTripTime = 0
144+
145+
// Set endpoint address to the supplier address.
146+
legacyRecord.NodeAddress = wsConnectionObs.GetSupplier()
147+
148+
// Extract and set the endpoint's domain from its URL.
149+
// The domain is left unset if parsing the URL fails.
150+
endpointDomain, err := shannonmetrics.ExtractDomainOrHost(wsConnectionObs.GetEndpointUrl())
151+
if err != nil {
152+
logger.With("endpoint_url", wsConnectionObs.EndpointUrl).Warn().Err(err).Msg("Could not extract domain from WebSocket endpoint URL")
153+
return legacyRecord
154+
}
92155
legacyRecord.NodeDomain = endpointDomain
93156

94157
return legacyRecord
95158
}
96159

160+
// setLegacyFieldsFromWebsocketMessageObservation populates legacy record with WebSocket message observation data.
161+
// This handles individual WebSocket messages sent over an established connection.
162+
func setLegacyFieldsFromWebsocketMessageObservation(
163+
logger polylog.Logger,
164+
legacyRecord *legacyRecord,
165+
wsMessageObs *protocolobservation.ShannonWebsocketMessageObservation,
166+
) *legacyRecord {
167+
// Update error fields if a message error has occurred.
168+
legacyRecord = setLegacyErrFieldsFromWebsocketMessageError(legacyRecord, wsMessageObs)
169+
170+
// Set application address
171+
legacyRecord.ProtocolAppPublicKey = wsMessageObs.GetEndpointAppAddress()
172+
173+
// WebSocket messages lack separate request/response cycles - timestamps don't apply
174+
// Set both timestamps to empty strings as requested by @fredteumer
175+
legacyRecord.NodeQueryTimestamp = ""
176+
// TODO_REVISIT: Can individual websocket message have a receive timestamp?
177+
legacyRecord.NodeReceiveTimestamp = ""
178+
179+
// WebSocket messages have no request/response latency - set to 0 as it doesn't apply
180+
legacyRecord.endpointTripTime = 0
181+
182+
// Set endpoint address to the supplier address.
183+
legacyRecord.NodeAddress = wsMessageObs.GetSupplier()
184+
185+
// Extract and set the endpoint's domain from its URL.
186+
// The domain is left unset if parsing the URL fails.
187+
endpointDomain, err := shannonmetrics.ExtractDomainOrHost(wsMessageObs.GetEndpointUrl())
188+
if err != nil {
189+
logger.With("endpoint_url", wsMessageObs.EndpointUrl).Warn().Err(err).Msg("Could not extract domain from WebSocket message endpoint URL")
190+
return legacyRecord
191+
}
192+
legacyRecord.NodeDomain = endpointDomain
193+
194+
// WebSocket messages lack HTTP-style methods and JSON-RPC extraction is QoS-level - using identifier for analytics
195+
// TODO_TECHDEBT(@adshmh,@commoddity): When QoS observations for WebSocket messages are added,
196+
// use the method from the QoS observations and move this to a new method in the `legacy_qos.go` file.
197+
legacyRecord.ChainMethod = "websocket_message"
198+
199+
// Using MessagePayloadSize as closest equivalent to HTTP request size for bandwidth analytics
200+
legacyRecord.RequestDataSize = float64(wsMessageObs.GetMessagePayloadSize())
201+
202+
return legacyRecord
203+
}
204+
205+
// setLegacyErrFieldsFromWebsocketConnectionError populates error fields in legacy record from WebSocket connection error data.
206+
func setLegacyErrFieldsFromWebsocketConnectionError(
207+
legacyRecord *legacyRecord,
208+
wsConnectionObs *protocolobservation.ShannonWebsocketConnectionObservation,
209+
) *legacyRecord {
210+
endpointErr := wsConnectionObs.ErrorType
211+
// No endpoint error has occurred: no error processing required.
212+
if endpointErr == nil {
213+
return legacyRecord
214+
}
215+
216+
// Update ErrorType using the observed endpoint error.
217+
legacyRecord.ErrorType = endpointErr.String()
218+
219+
// Build the endpoint error details, including any sanctions.
220+
var errMsg string
221+
if errDetails := wsConnectionObs.GetErrorDetails(); errDetails != "" {
222+
errMsg = fmt.Sprintf("error details: %s", errDetails)
223+
}
224+
225+
// Add the sanction details to the error message.
226+
if endpointSanction := wsConnectionObs.RecommendedSanction; endpointSanction != nil {
227+
errMsg = fmt.Sprintf("%s, sanction: %s", errMsg, endpointSanction.String())
228+
}
229+
230+
// Set the error message field.
231+
legacyRecord.ErrorMessage = errMsg
232+
233+
return legacyRecord
234+
}
235+
236+
// setLegacyErrFieldsFromWebsocketMessageError populates error fields in legacy record from WebSocket message error data.
237+
func setLegacyErrFieldsFromWebsocketMessageError(
238+
legacyRecord *legacyRecord,
239+
wsMessageObs *protocolobservation.ShannonWebsocketMessageObservation,
240+
) *legacyRecord {
241+
endpointErr := wsMessageObs.ErrorType
242+
// No endpoint error has occurred: no error processing required.
243+
if endpointErr == nil {
244+
return legacyRecord
245+
}
246+
247+
// Update ErrorType using the observed endpoint error.
248+
legacyRecord.ErrorType = endpointErr.String()
249+
250+
// Build the endpoint error details, including any sanctions.
251+
var errMsg string
252+
if errDetails := wsMessageObs.GetErrorDetails(); errDetails != "" {
253+
errMsg = fmt.Sprintf("error details: %s", errDetails)
254+
}
255+
256+
// Add the sanction details to the error message.
257+
if endpointSanction := wsMessageObs.RecommendedSanction; endpointSanction != nil {
258+
errMsg = fmt.Sprintf("%s, sanction: %s", errMsg, endpointSanction.String())
259+
}
260+
261+
// Set the error message field.
262+
legacyRecord.ErrorMessage = errMsg
263+
264+
return legacyRecord
265+
}
266+
97267
// setLegacyErrFieldsFromShannonEndpointError populates error fields in legacy record from endpoint error data.
98268
// It handles:
99269
// - Error type mapping

docusaurus/docs/develop/path/2_cheatsheet_shannon.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Skip to [2.1 Generate Shannon Config](#21-generate-shannon-config).
4747
- [3.2 Check configured services](#32-check-configured-services)
4848
- [4. Test Relays](#4-test-relays)
4949
- [Test Relay with `curl`](#test-relay-with-curl)
50+
- [Test WebSockets with `wscat`](#test-websockets-with-wscat)
5051
- [Load Testing with `relay-util`](#load-testing-with-relay-util)
5152
- [5. Stop PATH](#5-stop-path)
5253

@@ -268,6 +269,47 @@ Expected response:
268269
{ "id": 1, "jsonrpc": "2.0", "result": "0x2f01a" }
269270
```
270271
272+
### Test WebSockets with `wscat`
273+
274+
:::tip
275+
276+
For `wscat` installation instructions, see [here](https://github.com/ArtiomL/wscat?tab=readme-ov-file#installation).
277+
278+
:::
279+
280+
```bash
281+
wscat -c ws://localhost:3070/v1 \
282+
-H "Authorization: test_api_key" \
283+
-H "Target-Service-Id: xrplevm"
284+
```
285+
286+
Expected terminal prompt:
287+
288+
```bash
289+
Connected (press CTRL+C to quit)
290+
>
291+
```
292+
293+
Sample WebSocket request/response:
294+
295+
```bash
296+
> {"jsonrpc": "2.0", "id": 1, "method": "eth_blockNumber" }
297+
< {"id":1,"jsonrpc":"2.0","result":"0x17cbc3"}
298+
299+
> {"jsonrpc": "2.0", "id": 1, "method": "eth_blockNumber" }
300+
< {"id":1,"jsonrpc":"2.0","result":"0x17cbc4"}
301+
```
302+
303+
:::info
304+
305+
This is a simple terminal-based WebSocket example and does not contain reconnection logic.
306+
307+
Connections will drop on session rollover, which is expected behavior.
308+
309+
In production environments, you should implement reconnection logic and handle errors gracefully.
310+
311+
:::
312+
271313
### Load Testing with `relay-util`
272314
273315
Send 100 requests with performance metrics:

e2e/config/e2e_load_test.config.tmpl.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ e2e_load_test_config:
4646
# Default configuration applied to all test cases (unless overridden)
4747
default_service_config:
4848
global_rps: 100 # Requests per second (shared across all methods)
49-
requests_per_method: 300 # Number of requests per method
49+
requests_per_method: 30 # Number of requests per method
5050
success_rate: 0.95 # Minimum required success rate (95%)
5151
max_p50_latency_ms: 10000ms # Max allowed P50 latency (ms)
5252
max_p95_latency_ms: 20000ms # Max allowed P95 latency (ms)

gateway/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,12 @@ var (
1414

1515
// Error building protocol contexts from HTTP request.
1616
errBuildProtocolContextsFromHTTPRequest = errors.New("error building protocol contexts from HTTP request")
17+
18+
// WebSocket request was rejected by QoS instance.
19+
// e.g. WebSocket subscription request validation failed.
20+
errWebsocketRequestRejectedByQoS = errors.New("WebSocket request rejected by QoS instance")
21+
22+
// WebSocket connection establishment failed.
23+
// e.g. Failed to upgrade HTTP connection to WebSocket or connect to endpoint.
24+
errWebsocketConnectionFailed = errors.New("WebSocket connection establishment failed")
1725
)

0 commit comments

Comments
 (0)