5
5
using System . Threading ;
6
6
using System . Threading . Tasks ;
7
7
using Microsoft . Extensions . Logging ;
8
+ using Microsoft . Extensions . Options ;
9
+ using Orleans . Configuration ;
8
10
9
11
#nullable enable
10
12
namespace Orleans . Runtime . MembershipService . SiloMetadata ;
11
13
12
14
internal class SiloMetadataCache (
13
15
ISiloMetadataClient siloMetadataClient ,
14
16
MembershipTableManager membershipTableManager ,
17
+ IOptions < ClusterMembershipOptions > clusterMembershipOptions ,
15
18
ILogger < SiloMetadataCache > logger )
16
19
: ISiloMetadataCache , ILifecycleParticipant < ISiloLifecycle > , IDisposable
17
20
{
18
21
private readonly ConcurrentDictionary < SiloAddress , SiloMetadata > _metadata = new ( ) ;
22
+ private readonly Dictionary < SiloAddress , DateTime > _negativeCache = new ( ) ;
19
23
private readonly CancellationTokenSource _cts = new ( ) ;
24
+ private TimeSpan negativeCachePeriod ;
20
25
21
26
void ILifecycleParticipant < ISiloLifecycle > . Participate ( ISiloLifecycle lifecycle )
22
27
{
23
28
Task ? task = null ;
24
29
Task OnStart(CancellationToken _)
{
    // How long a failed metadata fetch is remembered before the silo is retried:
    // the probe timeout multiplied by the missed-probe limit, plus two table
    // refresh timeouts. The intent is to leave enough time for an unresponsive
    // silo to be voted Dead and for that membership change to propagate.
    var options = clusterMembershipOptions.Value;
    var probeWindow = options.ProbeTimeout * options.NumMissedProbesLimit;
    var refreshWindow = 2 * options.TableRefreshTimeout;
    negativeCachePeriod = probeWindow + refreshWindow;

    // Start the membership update loop on the thread pool; it observes _cts for cancellation.
    task = Task.Run(() => this.ProcessMembershipUpdates(_cts.Token));
    return Task.CompletedTask;
}
@@ -51,26 +59,39 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
51
59
await foreach ( var update in membershipTableManager . MembershipTableUpdates . WithCancellation ( ct ) )
52
60
{
53
61
// Add entries for members that aren't already in the cache.
// Only Active/Joining members are considered, and any member for which
// HasMissedIAmAlives reports true at this instant is skipped.
var now = DateTime.UtcNow;
var recentlyActiveSilos = update.Entries
    .Where(e => e.Value.Status is SiloStatus.Active or SiloStatus.Joining)
    .Where(e => !e.Value.HasMissedIAmAlives(clusterMembershipOptions.Value, now));
foreach (var membershipEntry in recentlyActiveSilos)
{
    if (_metadata.ContainsKey(membershipEntry.Key))
    {
        continue;
    }

    // A fetch for this silo failed recently; do not retry until the negative entry expires.
    if (_negativeCache.TryGetValue(membershipEntry.Key, out var expiration) && expiration > now)
    {
        continue;
    }

    try
    {
        var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key).WaitAsync(ct);
        _metadata.TryAdd(membershipEntry.Key, metadata);
        _negativeCache.Remove(membershipEntry.Key);
    }
    catch (Exception exception)
    {
        // Assign through the indexer instead of TryAdd: TryAdd is a no-op when an
        // (expired) entry already exists, which would leave the stale expiration in
        // place and cause the failing silo to be retried on every membership update.
        // The expiration is stamped at failure time because 'now' was captured
        // before the awaited call and may be noticeably out of date.
        _negativeCache[membershipEntry.Key] = DateTime.UtcNow + negativeCachePeriod;
        logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
    }
}
69
87
70
88
// Forget everything cached (metadata and negative entries) for members
// that the membership table now reports as Dead.
var deadSilos = update.Entries.Where(e => e.Value.Status is SiloStatus.Dead);
foreach (var deadEntry in deadSilos)
{
    var deadSilo = deadEntry.Key;
    _metadata.TryRemove(deadSilo, out _);
    _negativeCache.Remove(deadSilo);
}
75
96
76
97
// Remove entries for members that are no longer in the table
@@ -79,6 +100,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
79
100
if ( ! update . Entries . ContainsKey ( silo ) )
80
101
{
81
102
_metadata . TryRemove ( silo , out _ ) ;
103
+ _negativeCache . Remove ( silo , out _ ) ;
82
104
}
83
105
}
84
106
}
@@ -102,4 +124,5 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
102
124
/// <summary>
/// Caches <paramref name="metadata"/> for <paramref name="siloAddress"/>.
/// An entry that is already cached for that silo is left unchanged.
/// </summary>
/// <param name="siloAddress">The silo the metadata belongs to.</param>
/// <param name="metadata">The metadata to cache.</param>
public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata)
{
    _metadata.TryAdd(siloAddress, metadata);
}
103
125
104
126
/// <summary>
/// Signals cancellation on the token source whose token is handed to the
/// membership update loop.
/// </summary>
public void Dispose()
{
    _cts.Cancel();
}
105
- }
127
+ }
128
+
0 commit comments