diff --git a/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs b/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs index c5f6fe3f8..dccd4bae5 100644 --- a/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs +++ b/src/Persistence/PostgresqlTests/Transport/compliance_tests.cs @@ -23,6 +23,9 @@ await SenderIs(opts => opts.ListenToPostgresqlQueue("sender"); opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); await ReceiverIs(opts => @@ -30,6 +33,9 @@ await ReceiverIs(opts => opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, "durable", transportSchema:"durable"); opts.ListenToPostgresqlQueue("receiver").UseDurableInbox(); + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); } @@ -60,6 +66,9 @@ await SenderIs(opts => opts.ListenToPostgresqlQueue("sender").BufferedInMemory(); #endregion + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); @@ -69,6 +78,9 @@ await ReceiverIs(opts => .AutoProvision().AutoPurgeOnStartup().DisableInboxAndOutboxOnAll(); opts.ListenToPostgresqlQueue("receiver").BufferedInMemory(); + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); } diff --git a/src/Persistence/PostgresqlTests/Transport/end_to_end_from_scratch.cs b/src/Persistence/PostgresqlTests/Transport/end_to_end_from_scratch.cs index 37332cc9e..9b01f9578 100644 --- a/src/Persistence/PostgresqlTests/Transport/end_to_end_from_scratch.cs +++ b/src/Persistence/PostgresqlTests/Transport/end_to_end_from_scratch.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using IntegrationTests; +using JasperFx.Core; using Microsoft.AspNetCore.Mvc.ApiExplorer; using Microsoft.Extensions.Hosting; using Shouldly; @@ -58,6 +59,7 @@ public async Task can_send_messages_end_to_end() var session = await UI .TrackActivity() .AlsoTrack(Api) + .Timeout(30.Seconds()) .PublishMessageAndWaitAsync(new ApiRequest("Rey")); // It's handled in UI diff --git a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs index 0f2378cee..32f644233 100644 --- a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs +++ b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs @@ -13,6 +13,7 @@ using Weasel.Postgresql.Tables; using Wolverine; using Wolverine.ComplianceTests.Compliance; +using Wolverine.ErrorHandling; using Wolverine.Marten; using Wolverine.Persistence.Durability; using Wolverine.Postgresql; @@ -272,4 +273,11 @@ public static void Handle(Message3 message) Debug.WriteLine("Got a Message3"); } +} + +public record BlowsUpMessage; + +public static class BlowsUpMessageHandler +{ + public static void Handle(BlowsUpMessage message) => throw new Exception("You stink!"); } \ No newline at end of file diff --git a/src/Persistence/SqlServerTests/Transport/compliance_tests.cs b/src/Persistence/SqlServerTests/Transport/compliance_tests.cs index 897cbc850..615066102 100644 --- a/src/Persistence/SqlServerTests/Transport/compliance_tests.cs +++ b/src/Persistence/SqlServerTests/Transport/compliance_tests.cs @@ -26,6 +26,9 @@ await SenderIs(opts => opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); opts.Durability.Mode = DurabilityMode.Solo; + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); await ReceiverIs(opts => @@ -34,6 +37,9 @@ await ReceiverIs(opts => opts.ListenToSqlServerQueue("receiver").UseDurableInbox(); opts.Durability.Mode = DurabilityMode.Solo; + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); } @@ -65,6 +71,9 @@ await SenderIs(opts => #endregion opts.Durability.Mode = DurabilityMode.Solo; + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); @@ -76,6 +85,9 @@ await ReceiverIs(opts => opts.ListenToSqlServerQueue("receiver").BufferedInMemory(); opts.Durability.Mode = DurabilityMode.Solo; + + opts.Durability.ScheduledJobPollingTime = 250.Milliseconds(); + opts.Durability.ScheduledJobFirstExecution = 0.Seconds(); }); } diff --git a/src/Persistence/Wolverine.RDBMS/Durability/CheckRecoverableIncomingMessagesOperation.cs b/src/Persistence/Wolverine.RDBMS/Durability/CheckRecoverableIncomingMessagesOperation.cs index 421d40967..36c8f27be 100644 --- a/src/Persistence/Wolverine.RDBMS/Durability/CheckRecoverableIncomingMessagesOperation.cs +++ b/src/Persistence/Wolverine.RDBMS/Durability/CheckRecoverableIncomingMessagesOperation.cs @@ -50,9 +50,12 @@ public async Task ReadResultsAsync(DbDataReader reader, IList excepti public IEnumerable PostProcessingCommands() { + if (_settings.Cancellation.IsCancellationRequested) yield break; + foreach (var incoming in _incoming) { var listener = _endpoints.FindListenerCircuit(incoming.Destination); + if (listener == null) continue; // This *might* happen during shutdown if (listener.Status == ListeningStatus.Accepting) { diff --git a/src/Persistence/Wolverine.RDBMS/Transport/ExternalDbTransport.cs b/src/Persistence/Wolverine.RDBMS/Transport/ExternalDbTransport.cs index 0086269d6..5d9f16a1e 100644 --- a/src/Persistence/Wolverine.RDBMS/Transport/ExternalDbTransport.cs +++ b/src/Persistence/Wolverine.RDBMS/Transport/ExternalDbTransport.cs @@ -1,6 +1,7 @@ using JasperFx.Core; using Microsoft.Extensions.Logging; using Weasel.Core; +using Wolverine.Configuration; using Wolverine.Transports; namespace Wolverine.RDBMS.Transport; @@ -22,6 +23,11 @@ protected override IEnumerable endpoints() return Tables; } + public override Endpoint? ReplyEndpoint() + { + return null; + } + protected override ExternalMessageTable findEndpointByUri(Uri uri) { if (uri.Scheme != Protocol) diff --git a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs index 960959916..8ce5e806d 100644 --- a/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/Compliance/TransportCompliance.cs @@ -335,8 +335,9 @@ public async Task can_request_reply() { var request = new Request { Name = "Nick Bolton" }; - var (session, response) = await theSender.TrackActivity(Fixture.DefaultTimeout) + var (session, response) = await theSender.TrackActivity() .AlsoTrack(theReceiver) + .Timeout(30.Seconds()) .InvokeAndWaitAsync(request); response.Name.ShouldBe(request.Name); diff --git a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs index 932c3e4ec..b015089ea 100644 --- a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs +++ b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs @@ -29,7 +29,9 @@ public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, Activity? activity) { - if (lifecycle.Envelope.Destination.Scheme != TransportConstants.Local) + // TODO -- at some point, we need a more systematic way of doing this + var scheme = lifecycle.Envelope.Destination.Scheme; + if (scheme != TransportConstants.Local && scheme != "external-table") { await lifecycle.SendFailureAcknowledgementAsync( $"Moved message {lifecycle.Envelope!.Id} to the Error Queue.\n{Exception}"); diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index ad00fe642..6057a834c 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -238,6 +238,10 @@ public async ValueTask SendFailureAcknowledgementAsync(string failureDescription { await envelope.StoreAndForwardAsync(); } + catch (NotSupportedException) + { + // I don't like this, but if this happens, then it should never have been routed to the failure ack + } catch (Exception e) { // Should never happen, but still.