Use RetryPolicy to keep SignalR connection alive and replace List<RealtimeData> with ImmutableList<RealtimeData> #24
Conversation
The approach of using stale data as a trigger to reconnect to the SignalR bus makes sense, but I still need to review the specifics.
```diff
@@ -41,12 +43,36 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa
     /// <returns>A collection of arriving trains.</returns>
     public Task<IEnumerable<RealtimeData>> GetRealtimeData(Station station)
     {
-        return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow));
+        var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ));
+        var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow);
```
Interesting. I'm worried that this condition will always be met when the headways are really large... it may be fine. Need to double-check.
```diff
+        Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated);
```

```diff
+        Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection...");
+        Task.Run(this.CreateHubConnections).Wait();
```
If you make this method async you could await `CreateHubConnections` and just return `freshData` (rather than needing to wrap it in a task).
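A minimal sketch of that shape, assuming `CreateHubConnections` returns a `Task`; the stale-detection condition is inferred from the diff above, not the PR's final code:

```csharp
public async Task<IEnumerable<RealtimeData>> GetRealtimeData(Station station)
{
    var allData = this.GetRealtimeData(station, RouteDirection.ToNY)
        .Union(this.GetRealtimeData(station, RouteDirection.ToNJ));
    var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow);

    if (allData.Any(dataPoint => dataPoint.DataExpiration <= DateTime.UtcNow))
    {
        // Awaiting directly replaces Task.Run(this.CreateHubConnections).Wait().
        await this.CreateHubConnections();
    }

    return freshData;
}
```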
I think once I deal with the concurrency issue of `CreateHubConnections`, I'll avoid calling `.Wait()` so this function can return immediately.

Currently there are 2 potential issues.

The first: the check `if (!this.hubConnections.ContainsKey((station, direction)))` is not reliable; without a lock, another thread can substitute the connection with a new one and the condition will still return true. (A sketch addressing this follows at the end of this comment.)
The second is in `CloseExistingHubConnections`:

```csharp
private async Task CloseExistingHubConnections()
{
    // Materialize the connections so we can clear the dictionary before disconnecting.
    // Otherwise, we will reconnect before reinitializing the connection (potentially
    // causing a loop if the token changes).
    var connections = this.hubConnections.Values.ToArray();
    this.hubConnections.Clear();
    await Task.WhenAll(connections.Select(async (client) => await client.DisposeAsync()));
}
```
Another thread might be able to run between `.ToArray()` and `.Clear()`, such that it will try to dispose the connections again.
The goal is to make sure that once a `HubConnection` instance is retrieved from the dictionary, it remains exclusive to that thread until it is put back (or not); maybe `TryRemove` will do.
Once the refactor is done, we should be able to reconnect a single `HubConnection` for more fine-grained control when stale data is detected for a `(Station, RouteDirection)` pair. Sketches of both ideas follow.
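For the first issue, a common pattern is to let `GetOrAdd` do the check-and-insert atomically and wrap the value in `Lazy<T>` so the connection factory runs at most once per key. This is a sketch, not code from this PR, and `BuildConnection` is a hypothetical factory:

```csharp
// GetOrAdd may invoke the value factory on several threads at once, but creating
// a throwaway Lazy<T> wrapper is cheap; only the winning Lazy ever runs its body.
private readonly ConcurrentDictionary<(Station, RouteDirection), Lazy<HubConnection>> hubConnections = new();

private HubConnection GetOrCreateConnection(Station station, RouteDirection direction)
{
    return this.hubConnections
        .GetOrAdd(
            (station, direction),
            key => new Lazy<HubConnection>(() => this.BuildConnection(key.Item1, key.Item2)))
        .Value;
}
```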
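For the second issue and the fine-grained reconnect, a sketch of the `TryRemove` idea, assuming the dictionary maps `(Station, RouteDirection)` to `HubConnection` directly (`RecreateHubConnection` is a hypothetical per-pair helper):

```csharp
// Whichever thread wins TryRemove owns the connection exclusively, so it cannot
// be disposed twice or race CloseExistingHubConnections for the same key.
private async Task ReconnectHubConnection(Station station, RouteDirection direction)
{
    if (this.hubConnections.TryRemove((station, direction), out var connection))
    {
        await connection.DisposeAsync();
        await this.RecreateHubConnection(station, direction);
    }
    // If TryRemove returns false, another thread is already reconnecting this pair.
}
```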
I noticed that some connections start getting 401s in the automatic retry (an endless loop) while others are fine; I wonder if we should force a re-download of the SQL DB in this case.
Currently there is 1 enhancement regarding reconnection:
This change can potentially fix the high error rate of the realtime endpoint.

Closes: #23
Other than the added logs that will help with future debugging, there are 2 problems this PR should solve:

1. Reconnection now uses `RetryPolicy`, which should avoid the need to listen to the `Closed` event (a sketch of this wiring appears at the end of this description).
2. Using `ImmutableList` with the `ConcurrentDictionary` should help with the potential concurrency issues, as `addValueFactory` may be called multiple times, see link (a sketch of this also appears at the end).

There are 2 problems this PR would not solve for now:

1. The `GetRealtimeData` method backed by the `ConcurrentDictionary` is sometimes not very performant. It may be due to the gRPC server itself, but I need more time to investigate; the gRPC connection simply gets stuck and can take long enough for the gateway to time out the connection.
2. The `ConcurrentDictionary` used to store `HubConnection` suffers from a similar issue to the one above, as `addValueFactory` may be called multiple times. However, since it is not used as often as the `ConcurrentDictionary` that stores `List<RealtimeData>`, I have never observed a problem with it in production.

There is 1 problem I have no clue how to solve:

1. ... the `upcomingTrains` array.

Recent monitoring from my own instance:
Seems to be relatively stable after the changes.
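For reference, a minimal sketch of how a custom retry policy plugs into the SignalR client through `WithAutomaticReconnect`; the delays and hub URL here are placeholders, not this PR's actual values:

```csharp
using System;
using Microsoft.AspNetCore.SignalR.Client;

// With an IRetryPolicy supplied here, the client reconnects on its own,
// so no manual Closed-event handler is needed.
var connection = new HubConnectionBuilder()
    .WithUrl("https://example.com/hub") // placeholder URL
    .WithAutomaticReconnect(new RetryPolicy())
    .Build();
await connection.StartAsync();

// Illustrative policy: retry forever with exponential backoff capped at 60s.
// Returning null from NextRetryDelay would stop reconnecting entirely.
internal sealed class RetryPolicy : IRetryPolicy
{
    public TimeSpan? NextRetryDelay(RetryContext retryContext)
    {
        var seconds = Math.Min(Math.Pow(2, retryContext.PreviousRetryCount), 60);
        return TimeSpan.FromSeconds(seconds);
    }
}
```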
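And a sketch of why an immutable value type helps when the dictionary's update delegates can run more than once; `Station`, `RouteDirection`, and `RealtimeData` are this repo's types, and `station`, `direction`, and `newDataPoint` are stand-ins:

```csharp
using System.Collections.Concurrent;
using System.Collections.Immutable;

var store = new ConcurrentDictionary<(Station, RouteDirection), ImmutableList<RealtimeData>>();

// AddOrUpdate may call either delegate multiple times under contention, but every
// call builds a fresh immutable list instead of mutating a shared List<T>, so
// concurrent readers never observe a half-updated collection.
store.AddOrUpdate(
    (station, direction),
    _ => ImmutableList.Create(newDataPoint),
    (_, existing) => existing.Add(newDataPoint));
```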