-
-
Notifications
You must be signed in to change notification settings - Fork 806
Expand file tree
/
Copy pathPostgresOutboxIntegrationTests.cs
More file actions
438 lines (368 loc) · 16.7 KB
/
Copy pathPostgresOutboxIntegrationTests.cs
File metadata and controls
438 lines (368 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
using System.Collections.Concurrent;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Mocha.EntityFrameworkCore.Postgres.Tests.Helpers;
using Mocha.Outbox;
using Mocha.Transport.InMemory;
namespace Mocha.EntityFrameworkCore.Postgres.Tests;
public sealed class PostgresOutboxIntegrationTests(PostgresFixture fixture) : IClassFixture<PostgresFixture>
{
private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(30);
[Fact]
public async Task Outbox_Should_DeliverMessage_When_EventPublished()
{
// Arrange
var recorder = new MessageRecorder();
await using var env = await CreateBusWithOutboxAsync(recorder);
using var scope = env.Provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
// Act
await bus.PublishAsync(new TestEvent { Payload = "hello" }, TestContext.Current.CancellationToken);
// Assert
Assert.True(await recorder.WaitAsync(s_timeout), "Handler should have received the message");
var received = Assert.Single(recorder.Messages.OfType<TestEvent>());
Assert.Equal("hello", received.Payload);
}
[Fact]
public async Task Outbox_Should_DeliverAllMessages_When_MultipleEventsPublished()
{
// Arrange
const int count = 5;
var recorder = new MessageRecorder();
await using var env = await CreateBusWithOutboxAsync(recorder);
using var scope = env.Provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
// Act
for (var i = 0; i < count; i++)
{
await bus.PublishAsync(new TestEvent { Payload = $"msg-{i}" }, TestContext.Current.CancellationToken);
}
// Assert
Assert.True(await recorder.WaitAsync(s_timeout, count), $"Handler should have received all {count} messages");
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).OrderBy(p => p).ToList();
Assert.Equal(count, payloads.Count);
for (var i = 0; i < count; i++)
{
Assert.Contains($"msg-{i}", payloads);
}
}
[Fact]
public async Task Outbox_Should_DeliverMessages_When_PublishedUnderLoad()
{
// Arrange
const int count = 50;
var recorder = new MessageRecorder();
await using var env = await CreateBusWithOutboxAsync(recorder);
// Act - publish concurrently from separate scopes
var tasks = Enumerable
.Range(0, count)
.Select(async i =>
{
using var scope = env.Provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(new TestEvent { Payload = $"load-{i}" }, default);
});
await Task.WhenAll(tasks);
// Assert
Assert.True(
await recorder.WaitAsync(s_timeout, count),
$"Handler should have received all {count} messages under load");
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).ToHashSet();
Assert.Equal(count, payloads.Count);
for (var i = 0; i < count; i++)
{
Assert.Contains($"load-{i}", payloads);
}
}
[Fact]
public async Task Outbox_Should_ProcessPendingMessages_When_WorkerStartsAfterPersist()
{
// Arrange - persist messages before the worker starts
const int count = 3;
var connectionString = await fixture.CreateDatabaseAsync();
var recorder = new MessageRecorder();
// Phase 1: build bus but don't start the hosted services (worker)
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
services.AddDbContext<TestDbContext>(o => o.UseTestNpgsql(connectionString));
services.AddSingleton<IOutboxSignal, ResilientOutboxSignal>();
var builder = services.AddMessageBus();
builder.AddEntityFramework<TestDbContext>(ef => ef.UsePostgresOutbox());
builder.AddEventHandler<TestEventHandler>();
builder.AddInMemory();
var provider = services.BuildServiceProvider();
var runtime = (MessagingRuntime)provider.GetRequiredService<IMessagingRuntime>();
await runtime.StartAsync(TestContext.Current.CancellationToken);
// Ensure schema exists
using (var scope = provider.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<TestDbContext>();
await db.Database.EnsureCreatedAsync(TestContext.Current.CancellationToken);
}
// Persist messages via IMessageBus (outbox captures them)
for (var i = 0; i < count; i++)
{
using var scope = provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(new TestEvent { Payload = $"pending-{i}" }, TestContext.Current.CancellationToken);
}
// Verify that the messages are persisted
using (var scope = provider.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<TestDbContext>();
var messages = await db.Set<OutboxMessage>()!.ToListAsync(TestContext.Current.CancellationToken);
Assert.Equal(count, messages.Count);
}
// Phase 2: start the outbox worker (hosted services)
var hostedServices = provider.GetServices<IHostedService>().ToList();
foreach (var svc in hostedServices)
{
await svc.StartAsync(TestContext.Current.CancellationToken);
}
try
{
// Assert - all pre-existing messages are processed
Assert.True(
await recorder.WaitAsync(s_timeout, count),
"Worker should process messages that were persisted before it started");
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).ToHashSet();
Assert.Equal(count, payloads.Count);
}
finally
{
foreach (var svc in hostedServices)
{
await svc.StopAsync(TestContext.Current.CancellationToken);
}
// Allow in-flight processor transactions to drain (see TestEnvironment comment)
await Task.Delay(250, TestContext.Current.CancellationToken);
await provider.DisposeAsync();
}
}
[Fact]
public async Task Outbox_Should_ResumeProcessing_When_WorkerRestartedAfterInterruption()
{
// Arrange
var connectionString = await fixture.CreateDatabaseAsync();
var recorder = new MessageRecorder();
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
services.AddDbContext<TestDbContext>(o => o.UseTestNpgsql(connectionString));
services.AddSingleton<IOutboxSignal, ResilientOutboxSignal>();
var builder = services.AddMessageBus();
builder.AddEntityFramework<TestDbContext>(ef => ef.UsePostgresOutbox());
builder.AddEventHandler<TestEventHandler>();
builder.AddInMemory();
var provider = services.BuildServiceProvider();
var runtime = (MessagingRuntime)provider.GetRequiredService<IMessagingRuntime>();
await runtime.StartAsync(TestContext.Current.CancellationToken);
using (var scope = provider.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<TestDbContext>();
await db.Database.EnsureCreatedAsync(TestContext.Current.CancellationToken);
}
var hostedServices = provider.GetServices<IHostedService>().ToList();
foreach (var svc in hostedServices)
{
await svc.StartAsync(TestContext.Current.CancellationToken);
}
try
{
// Phase 1: publish and let worker process
using (var scope = provider.CreateScope())
{
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(
new TestEvent { Payload = "before-stop" },
TestContext.Current.CancellationToken);
}
Assert.True(await recorder.WaitAsync(s_timeout), "First message should be delivered before worker stops");
// Phase 2: stop worker
foreach (var svc in hostedServices)
{
await svc.StopAsync(TestContext.Current.CancellationToken);
}
// Allow the background loop to fully drain before publishing.
// ContinuousTask.DisposeAsync cancels but does not await the task.
await Task.Delay(500, TestContext.Current.CancellationToken);
// Phase 3: publish more messages while worker is stopped
using (var scope = provider.CreateScope())
{
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(
new TestEvent { Payload = "during-stop" },
TestContext.Current.CancellationToken);
}
// Phase 4: restart worker
foreach (var svc in hostedServices)
{
await svc.StartAsync(TestContext.Current.CancellationToken);
}
// Assert - message published during downtime is delivered.
// Wait until "during-stop" appears in the recorder's messages.
using var waitCts = new CancellationTokenSource(s_timeout);
while (!recorder.Messages.OfType<TestEvent>().Any(e => e.Payload == "during-stop"))
{
await Task.Delay(50, waitCts.Token);
}
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).ToHashSet();
Assert.Contains("before-stop", payloads);
Assert.Contains("during-stop", payloads);
}
finally
{
foreach (var svc in hostedServices)
{
await svc.StopAsync(TestContext.Current.CancellationToken);
}
// Allow in-flight processor transactions to drain (see TestEnvironment comment)
await Task.Delay(250, TestContext.Current.CancellationToken);
await provider.DisposeAsync();
}
}
[Fact]
public async Task Outbox_Should_ProcessNewMessages_When_PublishedWhileWorkerRunning()
{
// Arrange
var recorder = new MessageRecorder();
await using var env = await CreateBusWithOutboxAsync(recorder);
// Act - publish messages at intervals while worker is running
for (var i = 0; i < 5; i++)
{
using var scope = env.Provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(new TestEvent { Payload = $"live-{i}" }, TestContext.Current.CancellationToken);
// Wait for this message to be delivered before publishing the next
Assert.True(
await recorder.WaitAsync(s_timeout),
$"Message live-{i} should be delivered while worker is running");
}
// Assert
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).ToHashSet();
Assert.Equal(5, payloads.Count);
}
[Fact]
public async Task Outbox_Should_HandleConcurrentPublishers_When_MultipleScopes()
{
// Arrange
const int scopeCount = 10;
const int messagesPerScope = 5;
const int totalMessages = scopeCount * messagesPerScope;
var recorder = new MessageRecorder();
await using var env = await CreateBusWithOutboxAsync(recorder);
// Act - multiple scopes publishing simultaneously
var tasks = Enumerable
.Range(0, scopeCount)
.Select(async scopeIndex =>
{
for (var i = 0; i < messagesPerScope; i++)
{
using var scope = env.Provider.CreateScope();
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(
new TestEvent { Payload = $"scope-{scopeIndex}-msg-{i}" },
default);
}
});
await Task.WhenAll(tasks);
// Assert
Assert.True(
await recorder.WaitAsync(s_timeout, totalMessages),
$"All {totalMessages} messages from {scopeCount} scopes should be delivered");
var payloads = recorder.Messages.OfType<TestEvent>().Select(e => e.Payload).ToHashSet();
Assert.Equal(totalMessages, payloads.Count);
}
private async Task<TestEnvironment> CreateBusWithOutboxAsync(MessageRecorder recorder)
{
var connectionString = await fixture.CreateDatabaseAsync();
var services = new ServiceCollection();
services.AddSingleton(recorder);
services.AddLogging();
services.AddDbContext<TestDbContext>(o => o.UseTestNpgsql(connectionString));
// Register the resilient signal BEFORE UsePostgresOutbox() so that
// TryAddSingleton<IOutboxSignal> in AddOutboxCore() is a no-op.
// This prevents ObjectDisposedException during teardown when the
// outbox processor's own transaction commits fire the interceptor.
services.AddSingleton<IOutboxSignal, ResilientOutboxSignal>();
var builder = services.AddMessageBus();
builder.AddEntityFramework<TestDbContext>(ef => ef.UsePostgresOutbox());
builder.AddEventHandler<TestEventHandler>();
builder.AddInMemory();
var provider = services.BuildServiceProvider();
var runtime = (MessagingRuntime)provider.GetRequiredService<IMessagingRuntime>();
await runtime.StartAsync(default);
// Ensure schema exists
using (var scope = provider.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<TestDbContext>();
await db.Database.EnsureCreatedAsync(default);
}
// Start hosted services (outbox worker)
var hostedServices = provider.GetServices<IHostedService>().ToList();
foreach (var svc in hostedServices)
{
await svc.StartAsync(default);
}
return new TestEnvironment(provider, hostedServices);
}
// ══════════════════════════════════════════════════════════════════════
// Test types
// ══════════════════════════════════════════════════════════════════════
public sealed class TestEvent
{
public required string Payload { get; init; }
}
public sealed class TestEventHandler(MessageRecorder recorder) : IEventHandler<TestEvent>
{
public ValueTask HandleAsync(TestEvent message, CancellationToken cancellationToken)
{
recorder.Record(message);
return default;
}
}
public sealed class MessageRecorder
{
private readonly SemaphoreSlim _semaphore = new(0);
public ConcurrentBag<object> Messages { get; } = [];
public void Record(object message)
{
Messages.Add(message);
_semaphore.Release();
}
public async Task<bool> WaitAsync(TimeSpan timeout, int expectedCount = 1)
{
for (var i = 0; i < expectedCount; i++)
{
if (!await _semaphore.WaitAsync(timeout))
{
return false;
}
}
return true;
}
}
/// <summary>
/// Ensures hosted services are stopped before the provider is disposed,
/// preventing ObjectDisposedException from background tasks.
/// </summary>
private sealed class TestEnvironment(ServiceProvider provider, List<IHostedService> hostedServices)
: IAsyncDisposable
{
public ServiceProvider Provider => provider;
public async ValueTask DisposeAsync()
{
foreach (var svc in hostedServices)
{
await svc.StopAsync(default);
}
// ContinuousTask.DisposeAsync cancels but doesn't await the background
// loop, so in-flight processor transactions may still be committing.
// Allow them to drain before disposing the provider's singletons.
await Task.Delay(250);
await provider.DisposeAsync();
}
}
}