Skip to content

Commit

Permalink
Disables failure acks to external message table inputs. Closes GH-1218
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Jan 15, 2025
1 parent 235593c commit 8cbfdac
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/Persistence/PostgresqlTests/Transport/compliance_tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ await SenderIs(opts =>

opts.ListenToPostgresqlQueue("sender");
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});

await ReceiverIs(opts =>
{
opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, "durable", transportSchema:"durable");

opts.ListenToPostgresqlQueue("receiver").UseDurableInbox();

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});
}

Expand Down Expand Up @@ -60,6 +66,9 @@ await SenderIs(opts =>
opts.ListenToPostgresqlQueue("sender").BufferedInMemory();

#endregion

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();

});

Expand All @@ -69,6 +78,9 @@ await ReceiverIs(opts =>
.AutoProvision().AutoPurgeOnStartup().DisableInboxAndOutboxOnAll();

opts.ListenToPostgresqlQueue("receiver").BufferedInMemory();

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx.Core;
using Microsoft.AspNetCore.Mvc.ApiExplorer;
using Microsoft.Extensions.Hosting;
using Shouldly;
Expand Down Expand Up @@ -58,6 +59,7 @@ public async Task can_send_messages_end_to_end()
var session = await UI
.TrackActivity()
.AlsoTrack(Api)
.Timeout(30.Seconds())
.PublishMessageAndWaitAsync(new ApiRequest("Rey"));

// It's handled in UI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Weasel.Postgresql.Tables;
using Wolverine;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.ErrorHandling;
using Wolverine.Marten;
using Wolverine.Persistence.Durability;
using Wolverine.Postgresql;
Expand Down Expand Up @@ -272,4 +273,11 @@ public static void Handle(Message3 message)
Debug.WriteLine("Got a Message3");
}

}

public record BlowsUpMessage;

public static class BlowsUpMessageHandler
{
public static void Handle(BlowsUpMessage message) => throw new Exception("You stink!");
}
12 changes: 12 additions & 0 deletions src/Persistence/SqlServerTests/Transport/compliance_tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ await SenderIs(opts =>
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();

opts.Durability.Mode = DurabilityMode.Solo;

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});

await ReceiverIs(opts =>
Expand All @@ -34,6 +37,9 @@ await ReceiverIs(opts =>

opts.ListenToSqlServerQueue("receiver").UseDurableInbox();
opts.Durability.Mode = DurabilityMode.Solo;

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});
}

Expand Down Expand Up @@ -65,6 +71,9 @@ await SenderIs(opts =>
#endregion

opts.Durability.Mode = DurabilityMode.Solo;

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();

});

Expand All @@ -76,6 +85,9 @@ await ReceiverIs(opts =>
opts.ListenToSqlServerQueue("receiver").BufferedInMemory();

opts.Durability.Mode = DurabilityMode.Solo;

opts.Durability.ScheduledJobPollingTime = 250.Milliseconds();
opts.Durability.ScheduledJobFirstExecution = 0.Seconds();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ public async Task ReadResultsAsync(DbDataReader reader, IList<Exception> excepti

public IEnumerable<IAgentCommand> PostProcessingCommands()
{
if (_settings.Cancellation.IsCancellationRequested) yield break;

foreach (var incoming in _incoming)
{
var listener = _endpoints.FindListenerCircuit(incoming.Destination);
if (listener == null) continue; // This *might* happen during shutdown

if (listener.Status == ListeningStatus.Accepting)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Configuration;
using Wolverine.Transports;

namespace Wolverine.RDBMS.Transport;
Expand All @@ -22,6 +23,11 @@ protected override IEnumerable<ExternalMessageTable> endpoints()
return Tables;
}

public override Endpoint? ReplyEndpoint()
{
return null;
}

protected override ExternalMessageTable findEndpointByUri(Uri uri)
{
if (uri.Scheme != Protocol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,9 @@ public async Task can_request_reply()
{
var request = new Request { Name = "Nick Bolton" };

var (session, response) = await theSender.TrackActivity(Fixture.DefaultTimeout)
var (session, response) = await theSender.TrackActivity()
.AlsoTrack(theReceiver)
.Timeout(30.Seconds())
.InvokeAndWaitAsync<Response>(request);

response.Name.ShouldBe(request.Name);
Expand Down
4 changes: 3 additions & 1 deletion src/Wolverine/ErrorHandling/MoveToErrorQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle,
IWolverineRuntime runtime,
DateTimeOffset now, Activity? activity)
{
if (lifecycle.Envelope.Destination.Scheme != TransportConstants.Local)
// TODO -- at some point, we need a more systematic way of doing this
var scheme = lifecycle.Envelope.Destination.Scheme;
if (scheme != TransportConstants.Local && scheme != "external-table")
{
await lifecycle.SendFailureAcknowledgementAsync(
$"Moved message {lifecycle.Envelope!.Id} to the Error Queue.\n{Exception}");
Expand Down
4 changes: 4 additions & 0 deletions src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ public async ValueTask SendFailureAcknowledgementAsync(string failureDescription
{
await envelope.StoreAndForwardAsync();
}
catch (NotSupportedException)
{
// I don't like this, but if this happens, then it should never have been routed to the failure ack
}
catch (Exception e)
{
// Should never happen, but still.
Expand Down

0 comments on commit 8cbfdac

Please sign in to comment.