Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RetryPolicy to keep SignalR connection alive and replace List<RealtimeData> with ImmutableList<RealtimeData> #24

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

persuasive0pest
Copy link

This change can potentially fix high error rate of realtime endpoint.

closes: #23

Other than the added logs that will help with future debugging, there are 2 problems this PR would solve:

  • The reconnection is now handled by RetryPolicy, which should avoid the need to listen to the Closed event.
  • Using ImmutableList with ConcurrentDictionary should help with the potential concurrency issues, as addValueFactory may be called multiple times, see link.

There are 2 problems this PR would not solve for now:

  • It seems reading from ConcurrentDictionary using GetRealtimeData method is sometimes not very performant, it maybe due to GRPC server itself but I need more time to investigate, the GRPC connection simply stuck and can take long enough for the gateway to timeout the connection.
  • It seems using ConcurrentDictionary to store HubConnection suffer from similar issue as the problem mentioned above, as addValueFactory may be called multiple times however since it is not used as often as the ConcurrentDictionary used to store List<RealtimeData> I never observed any problem in production.

There are 1 problem I have no clue how to solve:

  • It seems PATH sometimes simply don't feed messages for trains to a certain direction for a certain station, see a screenshot from official app for evidence. This result in trains missing from upcomingTrains array.

Screenshot_20240220_081414

Recent monitoring from my own instance:
image
Seems to be relatively stable after the changes.

@mrazza mrazza self-requested a review February 21, 2024 03:15
Copy link
Owner

@mrazza mrazza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approach makes sense of looking for old data as a trigger to reconnect to the signalr bus but need to review the specifics still.

server/PathServices/RetryPolicy.cs Outdated Show resolved Hide resolved
@@ -41,12 +43,36 @@ public SignalRRealtimeDataRepository(IPathDataRepository pathDataRepository, IPa
/// <returns>A collection of arriving trains.</returns>
public Task<IEnumerable<RealtimeData>> GetRealtimeData(Station station)
{
return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow));
var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ));
var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Worried that this condition will always be met when the headways are really large... it may be fine. Need to double check.

Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated);

Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection...");
Task.Run(this.CreateHubConnections).Wait();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make this method async you could await CreateHubConnections and just return freshData (rather than needing to wrap it in a task).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once I deal with the concurrency issue of CreateHubConnections I'll avoid calling .Wait() so this function can return immediately.
Currently there are 2 potential issues:

                    if (!this.hubConnections.ContainsKey((station, direction)))

This check is not reliable as without lock another thread can substitute the connection with a new one and the condition will still return true.

        private async Task CloseExistingHubConnections()
        {
            // Materialize the connections so we can clear the dictionary before disconnecting.
            // Otherwise, we will reconnect before reinitializing the connection (potentially
            // causing a loop if the token changes).
            var connections = this.hubConnections.Values.ToArray();
            this.hubConnections.Clear();

            await Task.WhenAll(connections.Select(async (client) => await client.DisposeAsync()));
        }

Another thread might be able to run between .ToArray() and .Clear() such that it will try to dispose the connections again.

The goal is to make sure once an HubConnection instance is retrieved from dictionary it remain exclusive to the thread until being put back(or not), maybe TryRemove will do.

Once refactor is done we should be able to reconnect a single HubConnection for more fine-grained control, if staled data is detected for the (Station, RouteDirection) pair.

server/PathServices/SignalRRealtimeDataRepository.cs Outdated Show resolved Hide resolved
@persuasive0pest
Copy link
Author

I noticed that some connection start getting 401 in automatic retry(endless loop) while others are fine, I wonder if we should force re-download SQL DB in this case.

@persuasive0pest
Copy link
Author

Currently there are 1 enhancement regarding reconnection:

  • When staled data is detected, reconnect SignalR connections. This should be further improved so we can reconnect the specific connection responsible for the staled data. This can be done without affecting any working connections with minimized side effect.
    It is clear that there 2 additional enhancements needed:
  • When SignalR connection receiving no message for a given amount of time, force reconnect the specific <station, direction> connection. This can be done without affecting any working connections with minimized side effect.
  • When any SignalR connection retries reached a certain amount, download SQL DB again and reconnect all connections. This should be carefully calculated so it will not be happening too often as it can affect working connections.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

API error rate is high for realtime endpoint
2 participants