Skip to content

Commit 2409c43

Browse files
adshmhcommoddity
andauthored
[Protocol][Shannon] Shannon: add a parallel fallback request after 1 second (#396)
## Summary Shannon: add a parallel fallback request after 1 second ### Primary Changes: - Shannon: add a parallel fallback request after 1 second ## Issue Success rate drop due to endpoint slowness. - Issue or PR: #{ISSUE_OR_PR_NUMBER} ## Type of change Select one or more from the following: - [x] New feature, functionality or library - [ ] Bug fix - [ ] Code health or cleanup - [ ] Documentation - [ ] Other (specify) ## QoS Checklist ### E2E Validation & Tests - [ ] `make path_up` - [ ] `make test_e2e_evm_shannon` ### Observability - [ ] 1. `make path_up` - [ ] 2. Run the following E2E test: `make test_request__shannon_relay_util_100` - [ ] 3. View results in LocalNet's [PATH Relay Grafana Dashboard](http://localhost:3003/d/relays/path-service-requests) ## Sanity Checklist - [ ] I have updated the GitHub Issue `assignees`, `reviewers`, `labels`, `project`, `iteration` and `milestone` - [ ] For docs, I have run `make docusaurus_start` - [ ] For code, I have run `make test_all` - [ ] For configurations, I have updated the documentation - [ ] I added `TODO`s where applicable --------- Co-authored-by: commoddity <[email protected]>
1 parent 8fd27b2 commit 2409c43

File tree

2 files changed

+112
-13
lines changed

2 files changed

+112
-13
lines changed

protocol/shannon/context.go

Lines changed: 109 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"maps"
8+
"math/rand"
89
"net/http"
910
"strconv"
1011
"time"
@@ -23,6 +24,11 @@ import (
2324
"github.com/buildwithgrove/path/websockets"
2425
)
2526

27+
// TODO_TECHDEBT(@adshmh): Make this threshold configurable.
28+
//
29+
// Maximum time to wait before using a fallback endpoint.
30+
const maxWaitBeforeFallbackMillisecond = 1000
31+
2632
// Maximum endpoint payload length for error logging (100 chars)
2733
const maxEndpointPayloadLenForLogging = 100
2834

@@ -73,6 +79,8 @@ type requestContext struct {
7379

7480
// HTTP client used for sending relay requests to endpoints while also capturing various debug metrics
7581
httpClient *httpClientWithDebugMetrics
82+
83+
fallbackEndpoints map[protocol.EndpointAddr]endpoint
7684
}
7785

7886
// HandleServiceRequest:
@@ -89,23 +97,20 @@ func (rc *requestContext) HandleServiceRequest(payload protocol.Payload) (protoc
8997
// Record endpoint query time.
9098
endpointQueryTime := time.Now()
9199

92-
// Initialize relay response and error.
93100
var (
94101
relayResponse protocol.Response
95102
err error
96103
)
97104
if rc.selectedEndpoint.IsFallback() {
98105
// If the selected endpoint is a fallback endpoint, send the relay request to the fallback endpoint.
99106
// This will bypass protocol-level request processing and validation, meaning the request is not sent to a RelayMiner.
100-
relayResponse, err = rc.sendFallbackRelay(rc.logger, payload)
107+
relayResponse, err = rc.sendFallbackRelay(rc.logger, rc.selectedEndpoint, payload)
101108
} else {
109+
// TODO_TECHDEBT(@adshmh): Separate error handling for fallback and Shannon endpoints.
102110
// If the selected endpoint is not a fallback endpoint, send the relay request to the selected protocolendpoint.
103-
relayResponse, err = rc.sendProtocolRelay(payload)
111+
relayResponse, err = rc.sendRelayWithFallback(payload)
104112
}
105113

106-
// Ensure the endpoint address is set on the response in all cases, error or success.
107-
relayResponse.EndpointAddr = rc.selectedEndpoint.Addr()
108-
109114
// Handle endpoint error and capture RelayMinerError data if available.
110115
if err != nil {
111116
// Pass the response (which may contain RelayMinerError data) to error handler.
@@ -201,6 +206,87 @@ func buildHeaders(payload protocol.Payload) map[string]string {
201206
return headers
202207
}
203208

209+
// sendRelayWithFallback:
210+
// - Attempts Shannon endpoint with timeout
211+
// - Falls back to random fallback endpoint on failure/timeout
212+
// - Shields user from endpoint errors
213+
func (rc *requestContext) sendRelayWithFallback(payload protocol.Payload) (protocol.Response, error) {
214+
// TODO_TECHDEBT(@adshmh): Replace this with intelligent fallback.
215+
216+
// Setup Shannon endpoint request:
217+
// - Create channel for async response
218+
// - Initialize response variables
219+
shannonEndpointResponseReceivedChan := make(chan error, 1)
220+
var (
221+
shannonEndpointResponse protocol.Response
222+
shannonEndpointErr error
223+
)
224+
225+
// Send Shannon relay in parallel:
226+
// - Execute request asynchronously
227+
// - Signal completion via channel
228+
go func() {
229+
shannonEndpointResponse, shannonEndpointErr = rc.sendProtocolRelay(payload)
230+
shannonEndpointResponseReceivedChan <- shannonEndpointErr
231+
}()
232+
233+
logger := rc.logger.With("timeout_ms", maxWaitBeforeFallbackMillisecond)
234+
235+
// Wait for Shannon response or timeout:
236+
// - Return Shannon response if successful
237+
// - Fall back on error or timeout
238+
select {
239+
case err := <-shannonEndpointResponseReceivedChan:
240+
if err == nil {
241+
return shannonEndpointResponse, nil
242+
}
243+
244+
logger.Info().Err(err).Msg("Error getting a valid response from the selected Shannon endpoint. Using a fallback endpoint.")
245+
246+
// Shannon endpoint failed, use fallback
247+
return rc.sendRelayToARandomFallbackEndpoint(payload)
248+
249+
// Shannon endpoint timeout, use fallback
250+
case <-time.After(time.Duration(maxWaitBeforeFallbackMillisecond) * time.Millisecond):
251+
logger.Info().Msg("Timed out waiting for Shannon endpoint to respond. Using a fallback endpoint.")
252+
253+
// Use a random fallback endpoint
254+
return rc.sendRelayToARandomFallbackEndpoint(payload)
255+
}
256+
}
257+
258+
// sendRelayToARandomFallbackEndpoint:
259+
// - Selects random fallback endpoint
260+
// - Routes payload via selected endpoint
261+
// - Returns error if no endpoints available
262+
func (rc *requestContext) sendRelayToARandomFallbackEndpoint(payload protocol.Payload) (protocol.Response, error) {
263+
if len(rc.fallbackEndpoints) == 0 {
264+
rc.logger.Warn().Msg("SHOULD HAPPEN RARELY: no fallback endpoints available for the service")
265+
return protocol.Response{}, fmt.Errorf("no fallback endpoints available")
266+
}
267+
268+
logger := rc.logger.With("method", "sendRelayToARandomFallbackEndpoint")
269+
270+
// Select random fallback endpoint:
271+
// - Convert map to slice for random selection
272+
// - Pick random index
273+
allFallbackEndpoints := make([]endpoint, 0, len(rc.fallbackEndpoints))
274+
for _, endpoint := range rc.fallbackEndpoints {
275+
allFallbackEndpoints = append(allFallbackEndpoints, endpoint)
276+
}
277+
fallbackEndpoint := allFallbackEndpoints[rand.Intn(len(allFallbackEndpoints))]
278+
279+
// Send relay and handle response:
280+
// - Use selected fallback endpoint
281+
// - Log unexpected errors
282+
relayResponse, err := rc.sendFallbackRelay(logger, fallbackEndpoint, payload)
283+
if err != nil {
284+
logger.Warn().Err(err).Msg("SHOULD NEVER HAPPEN: fallback endpoint returned an error.")
285+
}
286+
287+
return relayResponse, err
288+
}
289+
204290
// sendProtocolRelay:
205291
// - Sends the supplied payload as a relay request to the endpoint selected via SelectEndpoint.
206292
// - Enhanced error handling for more fine-grained endpoint error type classification.
@@ -232,21 +318,28 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol.
232318
// Send the HTTP request to the protocol endpoint.
233319
httpRelayResponseBz, _, err := rc.sendHTTPRequest(hydratedLogger, payload, rc.selectedEndpoint.PublicURL(), relayRequestBz)
234320
if err != nil {
235-
return protocol.Response{}, err
321+
return protocol.Response{
322+
EndpointAddr: rc.selectedEndpoint.Addr(),
323+
}, err
236324
}
237325

238326
// Validate and process the response
239327
response, err := rc.validateAndProcessResponse(hydratedLogger, httpRelayResponseBz)
240328
if err != nil {
241-
return protocol.Response{}, err
329+
return protocol.Response{
330+
EndpointAddr: rc.selectedEndpoint.Addr(),
331+
}, err
242332
}
243333

244334
// Deserialize the response
245335
deserializedResponse, err := rc.deserializeRelayResponse(response)
246336
if err != nil {
247-
return protocol.Response{}, err
337+
return protocol.Response{
338+
EndpointAddr: rc.selectedEndpoint.Addr(),
339+
}, err
248340
}
249341

342+
deserializedResponse.EndpointAddr = rc.selectedEndpoint.Addr()
250343
return deserializedResponse, nil
251344
}
252345

@@ -393,12 +486,13 @@ func buildUnsignedRelayRequest(
393486
// - Returns the response received from the fallback endpoint.
394487
func (rc *requestContext) sendFallbackRelay(
395488
hydratedLogger polylog.Logger,
489+
selectedEndpoint endpoint,
396490
payload protocol.Payload,
397491
) (protocol.Response, error) {
398492
// Get the fallback URL for the selected endpoint.
399493
// If the RPC type is unknown or not configured for the
400494
// service, `endpointFallbackURL` will be the default URL.
401-
endpointFallbackURL := rc.selectedEndpoint.FallbackURL(payload.RPCType)
495+
endpointFallbackURL := selectedEndpoint.FallbackURL(payload.RPCType)
402496

403497
// Prepare the fallback URL with optional path
404498
fallbackURL := prepareURLFromPayload(endpointFallbackURL, payload)
@@ -411,14 +505,16 @@ func (rc *requestContext) sendFallbackRelay(
411505
[]byte(payload.Data),
412506
)
413507
if err != nil {
414-
return protocol.Response{}, err
508+
return protocol.Response{
509+
EndpointAddr: selectedEndpoint.Addr(),
510+
}, err
415511
}
416512

417513
// Build and return the fallback response
418514
return protocol.Response{
419515
Bytes: httpResponseBz,
420516
HTTPStatusCode: httpStatusCode,
421-
EndpointAddr: rc.selectedEndpoint.Addr(),
517+
EndpointAddr: selectedEndpoint.Addr(),
422518
}, nil
423519
}
424520

@@ -556,7 +652,7 @@ func (rc *requestContext) sendHTTPRequest(
556652

557653
// TODO_INVESTIGATE: Evaluate the impact of `rc.context` vs `context.TODO`
558654
// with respect to handling timeouts.
559-
ctxWithTimeout, cancelFn := context.WithTimeout(rc.context, timeout)
655+
ctxWithTimeout, cancelFn := context.WithTimeout(context.TODO(), timeout)
560656
defer cancelFn()
561657

562658
// Build headers including RPCType header

protocol/shannon/protocol.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func (p *Protocol) BuildRequestContextForEndpoint(
245245
return nil, buildProtocolContextSetupErrorObservation(serviceID, err), err
246246
}
247247

248+
fallbackEndpoints, _ := p.getServiceFallback(serviceID)
248249
// Return new request context for the pre-selected endpoint
249250
return &requestContext{
250251
logger: p.logger,
@@ -254,6 +255,8 @@ func (p *Protocol) BuildRequestContextForEndpoint(
254255
serviceID: serviceID,
255256
relayRequestSigner: permittedSigner,
256257
httpClient: p.httpClient,
258+
// Pass the list of fallback endpoints for the service
259+
fallbackEndpoints: fallbackEndpoints,
257260
}, protocolobservations.Observations{}, nil
258261
}
259262

0 commit comments

Comments
 (0)