From d53af7213d3fb73eab42c624fb1806d5db5a0727 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 29 Jan 2025 14:41:52 -0600 Subject: [PATCH] DLQ service improvements for CritterWatch support. Closes GH-764. Closes GH-1243. Closes GH-1244. Closes GH-1245. Closes GH-1248 --- .../deadletter_admin_compliance.cs | 36 ++++ .../MessageDatabase.DeadLetterAdminService.cs | 32 ++- .../Persistence/SqlServerMessageStore.cs | 15 +- .../DeadLetterAdminCompliance.cs | 195 +++++++++--------- 4 files changed, 173 insertions(+), 105 deletions(-) create mode 100644 src/Persistence/SqlServerTests/Persistence/deadletter_admin_compliance.cs diff --git a/src/Persistence/SqlServerTests/Persistence/deadletter_admin_compliance.cs b/src/Persistence/SqlServerTests/Persistence/deadletter_admin_compliance.cs new file mode 100644 index 000000000..d25d2b28e --- /dev/null +++ b/src/Persistence/SqlServerTests/Persistence/deadletter_admin_compliance.cs @@ -0,0 +1,36 @@ +using IntegrationTests; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.Persistence.Durability; +using Wolverine.RDBMS; +using Wolverine.SqlServer; +using Xunit.Abstractions; + +namespace SqlServerTests.Persistence; + +public class deadletter_admin_compliance : DeadLetterAdminCompliance +{ + public deadletter_admin_compliance(ITestOutputHelper output) : base(output) + { + } + + public override async Task BuildCleanHost() + { + var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "dlq5"); + + // This setting changes the internal message storage identity + opts.Durability.DeadLetterQueueExpirationEnabled = true; + }) + .StartAsync(); + + var persistence = (IMessageDatabase)host.Services.GetRequiredService(); + await persistence.Admin.ClearAllAsync(); + + return host; + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs index 0245a4a07..b9477cb97 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.DeadLetterAdminService.cs @@ -65,13 +65,19 @@ public Task> SummarizeByDatabaseAsync(string { return SummarizeAllAsync(serviceName, range, token); } + + protected virtual string toTopClause(DeadLetterEnvelopeQuery query) + { + return ""; + } public async Task QueryAsync(DeadLetterEnvelopeQuery query, CancellationToken token) { var builder = ToCommandBuilder(); + + var topSelect = toTopClause(query); - // TODO -- not sure the total works for both databases - builder.Append($"select {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); + builder.Append($"select{topSelect} {DatabaseConstants.DeadLetterFields}, count(*) OVER() as total_rows from {SchemaName}.{DatabaseConstants.DeadLetterTable} where 1 = 1"); writeDeadLetterWhereClause(query, builder); @@ -163,11 +169,18 @@ public Task DiscardAsync(DeadLetterEnvelopeQuery query, CancellationToken token) public Task ReplayAsync(DeadLetterEnvelopeQuery query, CancellationToken token) { var builder = ToCommandBuilder(); - - builder.Append($"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = true where 1 = 1"); + + builder.Append( + $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append(" where 1 = 1"); writeDeadLetterWhereClause(query, builder); builder.Append(';'); - + builder.Append( + $"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable} where {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append(';'); + return executeCommandBatch(builder, token); } @@ -193,9 +206,16 @@ public Task ReplayAsync(MessageBatchRequest request, CancellationToken token) foreach (var id in request.Ids) { - builder.Append($"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = true where {DatabaseConstants.Id} = "); + builder.Append( + $"update {SchemaName}.{DatabaseConstants.DeadLetterTable} set {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append($" where {DatabaseConstants.Id} = "); builder.AppendParameter(id); builder.Append(';'); + builder.Append( + $"delete from {SchemaName}.{DatabaseConstants.DeadLetterTable} where {DatabaseConstants.Replayable} = "); + builder.AppendParameter(true); + builder.Append(';'); } new MoveReplayableErrorMessagesToIncomingOperation(this).ConfigureCommand(builder); diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index e3363797c..5a568a2a7 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -8,6 +8,7 @@ using Weasel.SqlServer; using Weasel.SqlServer.Tables; using Wolverine.Logging; +using Wolverine.Persistence.Durability; using Wolverine.RDBMS; using Wolverine.RDBMS.Sagas; using Wolverine.RDBMS.Transport; @@ -64,6 +65,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex) protected override void writePagingAfter(DbCommandBuilder builder, int offset, int limit) { + if (offset == 0) return; + if (offset > 0) { builder.Append(" OFFSET "); @@ -73,12 +76,22 @@ protected override void writePagingAfter(DbCommandBuilder builder, int offset, i if (limit > 0) { - builder.Append("FETCH NEXT "); + builder.Append(" FETCH NEXT "); builder.AppendParameter(limit); builder.Append(" ROWS ONLY"); } } + protected override string toTopClause(DeadLetterEnvelopeQuery query) + { + if (query.PageSize > 0 && query.PageNumber <= 1) + { + return $" top {query.PageSize}"; + } + + return string.Empty; + } + public override async Task FetchCountsAsync() { var counts = new PersistedCounts(); diff --git a/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs b/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs index dce976568..b379f3e86 100644 --- a/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/DeadLetterAdminCompliance.cs @@ -1,14 +1,10 @@ -using System.Diagnostics; -using Castle.Components.DictionaryAdapter; using JasperFx.Core.Reflection; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using NSubstitute.Core; using Oakton.Resources; using Shouldly; using Wolverine.Persistence; using Wolverine.Persistence.Durability; -using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Util; using Xunit; @@ -19,31 +15,30 @@ namespace Wolverine.ComplianceTests; public abstract class DeadLetterAdminCompliance : IAsyncLifetime { protected const string ServiceName = "Service1"; - private readonly ITestOutputHelper _output; - - protected DeadLetterAdminCompliance(ITestOutputHelper output) - { - _output = output; - } private static readonly string[] _colors = ["Red", "Blue", "Orange", "Yellow", "Purple", "Green", "Black", "White", "Gray", "Pink"]; - - public IHost theHost { get; private set; } - protected IMessageStore thePersistence; - protected IDeadLetterAdminService theDeadLetters; - protected EnvelopeGenerator theGenerator; + + private readonly ITestOutputHelper _output; + private DeadLetterEnvelopeResults allEnvelopes; + private DateTimeOffset EightHoursAgo; private DateTimeOffset FiveHoursAgo; private DateTimeOffset FourHoursAgo; - private DateTimeOffset SixHoursAgo; private DateTimeOffset SevenHoursAgo; - private DateTimeOffset EightHoursAgo; + private DateTimeOffset SixHoursAgo; + protected IDeadLetterAdminService theDeadLetters; + protected EnvelopeGenerator theGenerator; + protected IMessageStore thePersistence; private IReadOnlyList theSummaries; - private DeadLetterEnvelopeResults allEnvelopes; - public abstract Task BuildCleanHost(); - + protected DeadLetterAdminCompliance(ITestOutputHelper output) + { + _output = output; + } + + public IHost theHost { get; private set; } + public async Task InitializeAsync() { theHost = await BuildCleanHost(); @@ -55,7 +50,7 @@ public async Task InitializeAsync() theGenerator = new EnvelopeGenerator(); theGenerator.MessageSource = BuildRandomMessage; - + FourHoursAgo = DateTimeOffset.UtcNow.AddHours(-4); FiveHoursAgo = DateTimeOffset.UtcNow.AddHours(-5); SixHoursAgo = DateTimeOffset.UtcNow.AddHours(-6); @@ -69,6 +64,8 @@ public async Task DisposeAsync() theHost.Dispose(); } + public abstract Task BuildCleanHost(); + protected Task load(int count, DateTimeOffset startingTime) { theGenerator.StartingTime = startingTime; @@ -79,10 +76,7 @@ protected async Task theStoredDeadLettersAre(params EnvelopeGenerator[] generato { await thePersistence.Admin.RebuildAsync(); - foreach (var generator in generators) - { - await generator.WriteDeadLetters(thePersistence); - } + foreach (var generator in generators) await generator.WriteDeadLetters(thePersistence); } public static object BuildRandomMessage() @@ -109,7 +103,7 @@ private void withTargetMessage1() return new TargetMessage1(Guid.NewGuid(), number, color); }; } - + private void withTargetMessage2() { theGenerator.MessageSource = () => @@ -119,7 +113,7 @@ private void withTargetMessage2() return new TargetMessage2(Guid.NewGuid(), number, color); }; } - + private void withTargetMessage3() { theGenerator.MessageSource = () => @@ -129,7 +123,7 @@ private void withTargetMessage3() return new TargetMessage3(Guid.NewGuid(), number, color); }; } - + private async Task fetchSummary(TimeRange range) { theSummaries = await theDeadLetters.SummarizeAllAsync(ServiceName, range, CancellationToken.None); @@ -137,20 +131,16 @@ private async Task fetchSummary(TimeRange range) if (theSummaries.Any()) { _output.WriteLine("Summaries were:"); - foreach (var summary in theSummaries) - { - _output.WriteLine(summary.ToString()); - } + foreach (var summary in theSummaries) _output.WriteLine(summary.ToString()); } else { _output.WriteLine("No summaries were found!"); } - - } - private DeadLetterQueueCount summaryCount(int expected, Uri? receivedAt = null, string? databaseIdentifier = null) + private DeadLetterQueueCount summaryCount(int expected, Uri? receivedAt = null, + string? databaseIdentifier = null) { var uri = receivedAt ?? theGenerator.ReceivedAt; var messageType = typeof(TMessage).ToMessageTypeName(); @@ -160,10 +150,12 @@ private DeadLetterQueueCount summaryCount(int expected, Ur return new DeadLetterQueueCount(ServiceName, uri, messageType, exceptionType, databaseIdentifier, expected); } - + protected void noCountsFor() { - theSummaries.Any(x => x.MessageType == typeof(TMessage).ToMessageTypeName() && x.ExceptionType == typeof(TException).FullNameInCode()) + theSummaries.Any(x => + x.MessageType == typeof(TMessage).ToMessageTypeName() && + x.ExceptionType == typeof(TException).FullNameInCode()) .ShouldBeFalse(); } @@ -175,7 +167,7 @@ public async Task get_all_summaries_with_no_options() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -184,7 +176,7 @@ public async Task get_all_summaries_with_no_options() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, EightHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -196,13 +188,16 @@ public async Task get_all_summaries_with_no_options() await load(3, SevenHoursAgo); await fetchSummary(TimeRange.AllTime()); - - theSummaries.ShouldContain(summaryCount(20, TransportConstants.DurableLocalUri)); - theSummaries.ShouldContain(summaryCount(5, TransportConstants.DurableLocalUri)); - theSummaries.ShouldContain(summaryCount(12, TransportConstants.DurableLocalUri)); + + theSummaries.ShouldContain( + summaryCount(20, TransportConstants.DurableLocalUri)); + theSummaries.ShouldContain( + summaryCount(5, TransportConstants.DurableLocalUri)); + theSummaries.ShouldContain( + summaryCount(12, TransportConstants.DurableLocalUri)); theSummaries.ShouldContain(summaryCount(14, new Uri("local://one"))); } - + [Fact] public async Task get_summaries_after_a_time() { @@ -211,7 +206,7 @@ public async Task get_summaries_after_a_time() await load(5, FiveHoursAgo); await load(7, FourHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -220,7 +215,7 @@ public async Task get_summaries_after_a_time() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, EightHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -232,12 +227,15 @@ public async Task get_summaries_after_a_time() await load(3, SevenHoursAgo); await fetchSummary(new TimeRange(FiveHoursAgo, null)); - - theSummaries.ShouldContain(summaryCount(12, TransportConstants.DurableLocalUri)); - theSummaries.ShouldContain(summaryCount(3, TransportConstants.DurableLocalUri)); - theSummaries.ShouldContain(summaryCount(5, TransportConstants.DurableLocalUri)); + + theSummaries.ShouldContain( + summaryCount(12, TransportConstants.DurableLocalUri)); + theSummaries.ShouldContain( + summaryCount(3, TransportConstants.DurableLocalUri)); + theSummaries.ShouldContain( + summaryCount(5, TransportConstants.DurableLocalUri)); } - + [Fact] public async Task get_summaries_before_a_time() { @@ -246,7 +244,7 @@ public async Task get_summaries_before_a_time() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -255,7 +253,7 @@ public async Task get_summaries_before_a_time() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -267,15 +265,15 @@ public async Task get_summaries_before_a_time() await load(3, SevenHoursAgo); await fetchSummary(new TimeRange(null, SixHoursAgo.AddMinutes(-1))); - + noCountsFor(); - - theSummaries.ShouldContain(summaryCount(8, TransportConstants.DurableLocalUri)); + + theSummaries.ShouldContain( + summaryCount(8, TransportConstants.DurableLocalUri)); theSummaries.ShouldContain(summaryCount(14, new Uri("local://one"))); } - [Fact] public async Task get_summaries_within_a_time_range() { @@ -285,16 +283,17 @@ public async Task get_summaries_within_a_time_range() await load(7, SixHoursAgo); await load(8, SevenHoursAgo); await load(11, EightHoursAgo); - + await fetchSummary(new TimeRange(SevenHoursAgo, SixHoursAgo.AddMinutes(-1))); - + theSummaries.ShouldContain(summaryCount(8)); } - + protected async Task loadAllEnvelopes() { allEnvelopes = - await theDeadLetters.QueryAsync(new DeadLetterEnvelopeQuery(TimeRange.AllTime()){PageSize = 1000}, CancellationToken.None); + await theDeadLetters.QueryAsync(new DeadLetterEnvelopeQuery(TimeRange.AllTime()) { PageSize = 0 }, + CancellationToken.None); } protected async Task queryMatches(DeadLetterEnvelopeQuery query, Func filter) @@ -302,9 +301,9 @@ protected async Task queryMatches(DeadLetterEnvelopeQuery query, Func x.Id).ToList(); - + //actual.TotalCount.ShouldBe(expected.Count); - + actual.Envelopes.Select(x => x.Id).OrderBy(x => x).ToArray() .ShouldBe(expected.Select(x => x.Id).OrderBy(x => x).ToArray()); } @@ -317,7 +316,7 @@ public async Task query_for_envelopes_big_options() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -327,7 +326,7 @@ public async Task query_for_envelopes_big_options() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -337,12 +336,12 @@ public async Task query_for_envelopes_big_options() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -350,8 +349,10 @@ public async Task query_for_envelopes_big_options() await loadAllEnvelopes(); await queryMatches(new DeadLetterEnvelopeQuery(new TimeRange(SixHoursAgo, null)), e => e.SentAt >= SixHoursAgo); - await queryMatches(new DeadLetterEnvelopeQuery(new TimeRange(null, SevenHoursAgo)), e => e.SentAt <= SevenHoursAgo); - await queryMatches(new DeadLetterEnvelopeQuery(new TimeRange(SixHoursAgo, SevenHoursAgo)), e => e.SentAt >= SixHoursAgo && e.SentAt <= SevenHoursAgo); + await queryMatches(new DeadLetterEnvelopeQuery(new TimeRange(null, SevenHoursAgo)), + e => e.SentAt <= SevenHoursAgo); + await queryMatches(new DeadLetterEnvelopeQuery(new TimeRange(SixHoursAgo, SevenHoursAgo)), + e => e.SentAt >= SixHoursAgo && e.SentAt <= SevenHoursAgo); await queryMatches( new DeadLetterEnvelopeQuery(TimeRange.AllTime()) @@ -376,7 +377,7 @@ public async Task paging_and_totals() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -386,7 +387,7 @@ public async Task paging_and_totals() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -396,12 +397,12 @@ public async Task paging_and_totals() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -410,11 +411,11 @@ public async Task paging_and_totals() var firstPage = await theDeadLetters.QueryAsync( new DeadLetterEnvelopeQuery(TimeRange.AllTime()) { PageSize = 10, PageNumber = 1 }, CancellationToken.None); - + firstPage.TotalCount.ShouldBe(allEnvelopes.TotalCount); firstPage.PageNumber.ShouldBe(1); firstPage.Envelopes.Count.ShouldBe(10); - + var secondPage = await theDeadLetters.QueryAsync( new DeadLetterEnvelopeQuery(TimeRange.AllTime()) { PageSize = 10, PageNumber = 2 }, CancellationToken.None); @@ -431,7 +432,7 @@ public async Task discard_by_query() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -441,7 +442,7 @@ public async Task discard_by_query() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -451,12 +452,12 @@ public async Task discard_by_query() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -469,9 +470,8 @@ await theDeadLetters.DiscardAsync( var results = await theDeadLetters.QueryAsync(query, CancellationToken.None); results.TotalCount.ShouldBe(0); results.Envelopes.Count.ShouldBe(0); - } - + [Fact] public async Task replay_by_query() { @@ -480,7 +480,7 @@ public async Task replay_by_query() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -490,7 +490,7 @@ public async Task replay_by_query() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -500,12 +500,12 @@ public async Task replay_by_query() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -518,7 +518,6 @@ await theDeadLetters.ReplayAsync( var results = await theDeadLetters.QueryAsync(query, CancellationToken.None); results.TotalCount.ShouldBe(0); results.Envelopes.Count.ShouldBe(0); - } [Fact] @@ -529,7 +528,7 @@ public async Task discard_by_message_batch() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -539,7 +538,7 @@ public async Task discard_by_message_batch() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -549,12 +548,12 @@ public async Task discard_by_message_batch() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -563,12 +562,12 @@ public async Task discard_by_message_batch() var ids = allEnvelopes.Envelopes.Take(10).Select(x => x.Id).ToArray(); await theDeadLetters.DiscardAsync(new MessageBatchRequest(ids), CancellationToken.None); - + // Reload await loadAllEnvelopes(); allEnvelopes.Envelopes.Where(x => ids.Contains(x.Id)).Any().ShouldBeFalse(); } - + [Fact] public async Task replay_by_message_batch() { @@ -577,7 +576,7 @@ public async Task replay_by_message_batch() await load(5, FiveHoursAgo); await load(7, SixHoursAgo); await load(8, SevenHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(3, FiveHoursAgo); await load(2, SixHoursAgo); @@ -587,7 +586,7 @@ public async Task replay_by_message_batch() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(10, FiveHoursAgo); await load(8, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(5, FiveHoursAgo); await load(7, SixHoursAgo); @@ -597,12 +596,12 @@ public async Task replay_by_message_batch() theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(11, EightHoursAgo); await load(3, SevenHoursAgo); - + withTargetMessage3(); theGenerator.ExceptionSource = msg => new BadImageFormatException(msg); await load(56, FiveHoursAgo); await load(45, FourHoursAgo); - + theGenerator.ExceptionSource = msg => new DivideByZeroException(msg); await load(10, FiveHoursAgo); await load(13, FourHoursAgo); @@ -616,10 +615,10 @@ public async Task replay_by_message_batch() await loadAllEnvelopes(); allEnvelopes.Envelopes.Where(x => ids.Contains(x.Id)).Any().ShouldBeFalse(); } - } public record TargetMessage1(Guid Id, int Number, string Color); + public record TargetMessage2(Guid Id, int Number, string Color); -public record TargetMessage3(Guid Id, int Number, string Color); +public record TargetMessage3(Guid Id, int Number, string Color); \ No newline at end of file