1
1
#nullable enable
2
2
using System ;
3
+ using System . Collections . Concurrent ;
3
4
using System . Diagnostics . CodeAnalysis ;
4
5
using System . Runtime . ExceptionServices ;
5
6
using System . Threading . Tasks ;
7
+ using Microsoft . Extensions . DependencyInjection ;
6
8
using Microsoft . Extensions . Logging ;
7
9
using Orleans . Runtime ;
8
10
using Orleans . Serialization . Activators ;
@@ -19,11 +21,8 @@ namespace Orleans.Core
19
21
/// <seealso cref="IStorage{TState}" />
20
22
public class StateStorageBridge < TState > : IStorage < TState > , IGrainMigrationParticipant
21
23
{
22
- private readonly string _name ;
23
24
private readonly IGrainContext _grainContext ;
24
- private readonly IGrainStorage _store ;
25
- private readonly ILogger _logger ;
26
- private readonly IActivator < TState > _activator ;
25
+ private readonly StateStorageBridgeShared < TState > _shared ;
27
26
private GrainState < TState > ? _grainState ;
28
27
29
28
/// <inheritdoc/>
@@ -32,7 +31,12 @@ public TState State
32
31
get
33
32
{
34
33
GrainRuntime . CheckRuntimeContext ( RuntimeContext . Current ) ;
35
- return GrainState . State ;
34
+ if ( _grainState is { } grainState )
35
+ {
36
+ return grainState . State ;
37
+ }
38
+
39
+ return default ! ;
36
40
}
37
41
38
42
set
@@ -42,28 +46,32 @@ public TState State
42
46
}
43
47
}
44
48
45
- private GrainState < TState > GrainState => _grainState ??= new GrainState < TState > ( _activator . Create ( ) ) ;
46
- internal bool IsStateInitialized => _grainState != null ;
49
+ private GrainState < TState > GrainState => _grainState ??= new GrainState < TState > ( _shared . Activator . Create ( ) ) ;
50
+ internal bool IsStateInitialized { get ; private set ; }
47
51
48
52
/// <inheritdoc/>
49
- public string ? Etag { get => GrainState . ETag ; set => GrainState . ETag = value ; }
53
+ public string ? Etag { get => _grainState ? . ETag ; set => GrainState . ETag = value ; }
50
54
51
55
/// <inheritdoc/>
52
- public bool RecordExists => GrainState . RecordExists ;
56
+ public bool RecordExists => IsStateInitialized switch
57
+ {
58
+ true => GrainState . RecordExists ,
59
+ _ => throw new InvalidOperationException ( "State has not yet been loaded" )
60
+ } ;
61
+
62
+ [ Obsolete ( "Use StateStorageBridge(string, IGrainContext, IGrainStorage) instead." ) ]
63
+ public StateStorageBridge ( string name , IGrainContext grainContext , IGrainStorage store , ILoggerFactory loggerFactory , IActivatorProvider activatorProvider ) : this ( name , grainContext , store )
64
+ { }
53
65
54
- public StateStorageBridge ( string name , IGrainContext grainContext , IGrainStorage store , ILoggerFactory loggerFactory , IActivatorProvider activatorProvider )
66
+ public StateStorageBridge ( string name , IGrainContext grainContext , IGrainStorage store )
55
67
{
56
68
ArgumentNullException . ThrowIfNull ( name ) ;
57
69
ArgumentNullException . ThrowIfNull ( grainContext ) ;
58
70
ArgumentNullException . ThrowIfNull ( store ) ;
59
- ArgumentNullException . ThrowIfNull ( loggerFactory ) ;
60
- ArgumentNullException . ThrowIfNull ( activatorProvider ) ;
61
71
62
- _logger = loggerFactory . CreateLogger ( store . GetType ( ) ) ;
63
- _name = name ;
64
72
_grainContext = grainContext ;
65
- _store = store ;
66
- _activator = activatorProvider . GetActivator < TState > ( ) ;
73
+ var sharedInstances = ActivatorUtilities . GetServiceOrCreateInstance < StateStorageBridgeSharedMap > ( grainContext . ActivationServices ) ;
74
+ _shared = sharedInstances . Get < TState > ( name , store ) ;
67
75
}
68
76
69
77
/// <inheritdoc />
@@ -74,7 +82,8 @@ public async Task ReadStateAsync()
74
82
GrainRuntime . CheckRuntimeContext ( RuntimeContext . Current ) ;
75
83
76
84
var sw = ValueStopwatch . StartNew ( ) ;
77
- await _store . ReadStateAsync ( _name , _grainContext . GrainId , GrainState ) ;
85
+ await _shared . Store . ReadStateAsync ( _shared . Name , _grainContext . GrainId , GrainState ) ;
86
+ IsStateInitialized = true ;
78
87
StorageInstruments . OnStorageRead ( sw . Elapsed ) ;
79
88
}
80
89
catch ( Exception exc )
@@ -92,7 +101,7 @@ public async Task WriteStateAsync()
92
101
GrainRuntime . CheckRuntimeContext ( RuntimeContext . Current ) ;
93
102
94
103
var sw = ValueStopwatch . StartNew ( ) ;
95
- await _store . WriteStateAsync ( _name , _grainContext . GrainId , GrainState ) ;
104
+ await _shared . Store . WriteStateAsync ( _shared . Name , _grainContext . GrainId , GrainState ) ;
96
105
StorageInstruments . OnStorageWrite ( sw . Elapsed ) ;
97
106
}
98
107
catch ( Exception exc )
@@ -110,12 +119,13 @@ public async Task ClearStateAsync()
110
119
GrainRuntime . CheckRuntimeContext ( RuntimeContext . Current ) ;
111
120
112
121
var sw = ValueStopwatch . StartNew ( ) ;
122
+
113
123
// Clear (most likely Delete) state from external storage
114
- await _store . ClearStateAsync ( _name , _grainContext . GrainId , GrainState ) ;
124
+ await _shared . Store . ClearStateAsync ( _shared . Name , _grainContext . GrainId , GrainState ) ;
115
125
sw . Stop ( ) ;
116
126
117
127
// Reset the in-memory copy of the state
118
- GrainState . State = _activator . Create ( ) ;
128
+ GrainState . State = _shared . Activator . Create ( ) ;
119
129
120
130
// Update counters
121
131
StorageInstruments . OnStorageDelete ( sw . Elapsed ) ;
@@ -131,11 +141,11 @@ public void OnDehydrate(IDehydrationContext dehydrationContext)
131
141
{
132
142
try
133
143
{
134
- dehydrationContext . TryAddValue ( $ "state. { _name } " , _grainState ) ;
144
+ dehydrationContext . TryAddValue ( _shared . MigrationContextKey , _grainState ) ;
135
145
}
136
146
catch ( Exception exception )
137
147
{
138
- _logger . LogError ( exception , "Failed to dehydrate state named {StateName} for grain {GrainId}" , _name , _grainContext . GrainId ) ;
148
+ _shared . Logger . LogError ( exception , "Failed to dehydrate state named {StateName} for grain {GrainId}" , _shared . Name , _grainContext . GrainId ) ;
139
149
140
150
// We must throw here since we do not know that the dehydration context is in a clean state after this.
141
151
throw ;
@@ -146,34 +156,66 @@ public void OnRehydrate(IRehydrationContext rehydrationContext)
146
156
{
147
157
try
148
158
{
149
- rehydrationContext . TryGetValue ( $ "state.{ _name } ", out _grainState ) ;
159
+ if ( rehydrationContext . TryGetValue < GrainState < TState > > ( _shared . MigrationContextKey , out var grainState ) )
160
+ {
161
+ _grainState = grainState ;
162
+ IsStateInitialized = true ;
163
+ }
150
164
}
151
165
catch ( Exception exception )
152
166
{
153
167
// It is ok to swallow this exception, since state rehydration is best-effort.
154
- _logger . LogError ( exception , "Failed to rehydrate state named {StateName} for grain {GrainId}" , _name , _grainContext . GrainId ) ;
168
+ _shared . Logger . LogError ( exception , "Failed to rehydrate state named {StateName} for grain {GrainId}" , _shared . Name , _grainContext . GrainId ) ;
155
169
}
156
170
}
157
171
158
172
[ DoesNotReturn ]
159
173
private void OnError ( Exception exception , ErrorCode id , string operation )
160
174
{
161
175
string ? errorCode = null ;
162
- ( _store as IRestExceptionDecoder ) ? . DecodeException ( exception , out _ , out errorCode , true ) ;
176
+ ( _shared . Store as IRestExceptionDecoder ) ? . DecodeException ( exception , out _ , out errorCode , true ) ;
163
177
var errorString = errorCode is { Length : > 0 } ? $ " Error: { errorCode } " : null ;
164
178
165
179
var grainId = _grainContext . GrainId ;
166
- var providerName = _store . GetType ( ) . Name ;
167
- _logger . LogError ( ( int ) id , exception , "Error from storage provider {ProviderName}.{StateName} during {Operation} for grain {GrainId}{ErrorCode}" , providerName , _name , operation , grainId , errorString ) ;
180
+ var providerName = _shared . Store . GetType ( ) . Name ;
181
+ _shared . Logger . LogError ( ( int ) id , exception , "Error from storage provider {ProviderName}.{StateName} during {Operation} for grain {GrainId}{ErrorCode}" , providerName , _shared . Name , operation , grainId , errorString ) ;
168
182
169
183
// If error is not specialization of OrleansException, wrap it
170
184
if ( exception is not OrleansException )
171
185
{
172
- var errMsg = $ "Error from storage provider { providerName } .{ _name } during { operation } for grain { grainId } { errorString } { Environment . NewLine } { LogFormatter . PrintException ( exception ) } ";
186
+ var errMsg = $ "Error from storage provider { providerName } .{ _shared . Name } during { operation } for grain { grainId } { errorString } { Environment . NewLine } { LogFormatter . PrintException ( exception ) } ";
173
187
throw new OrleansException ( errMsg , exception ) ;
174
188
}
175
189
176
190
ExceptionDispatchInfo . Throw ( exception ) ;
177
191
}
178
192
}
193
+
194
+ internal sealed class StateStorageBridgeSharedMap ( ILoggerFactory loggerFactory , IActivatorProvider activatorProvider )
195
+ {
196
+ private readonly ConcurrentDictionary < ( string Name , IGrainStorage Store , Type StateType ) , object > _instances = new ( ) ;
197
+ private readonly ILoggerFactory _loggerFactory = loggerFactory ;
198
+ private readonly IActivatorProvider _activatorProvider = activatorProvider ;
199
+
200
+ public StateStorageBridgeShared < TState > Get < TState > ( string name , IGrainStorage store )
201
+ => ( StateStorageBridgeShared < TState > ) _instances . GetOrAdd (
202
+ ( name , store , typeof ( TState ) ) ,
203
+ static ( key , self ) => new StateStorageBridgeShared < TState > (
204
+ key . Name ,
205
+ key . Store ,
206
+ self . _loggerFactory . CreateLogger ( key . Store . GetType ( ) ) ,
207
+ self . _activatorProvider . GetActivator < TState > ( ) ) ,
208
+ this ) ;
209
+ }
210
+
211
+ internal sealed class StateStorageBridgeShared < TState > ( string name , IGrainStorage store , ILogger logger , IActivator < TState > activator )
212
+ {
213
+ private string ? _migrationContextKey ;
214
+
215
+ public readonly string Name = name ;
216
+ public readonly IGrainStorage Store = store ;
217
+ public readonly ILogger Logger = logger ;
218
+ public readonly IActivator < TState > Activator = activator ;
219
+ public string MigrationContextKey => _migrationContextKey ??= $ "state.{ Name } ";
220
+ }
179
221
}
0 commit comments