From 5289de1ad9028cdec86e40677bab2f5002be9964 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 16 Jan 2025 08:28:05 -0600 Subject: [PATCH] New IConfigureLocalQueue mechanism for fine tuning local queues even when using sticky handler assignments. Closes GH-1213 --- docs/guide/durability/postgresql.md | 2 +- docs/guide/durability/sqlserver.md | 2 +- docs/guide/handlers/sticky.md | 14 +- .../messaging/transports/external-tables.md | 2 +- docs/guide/messaging/transports/local.md | 56 ++++++++ .../Acceptance/configuring_local_queues.cs | 134 ++++++++++++++++++ .../Configuration/IConfigureLocalQueue.cs | 8 +- .../Runtime/Handlers/HandlerGraph.cs | 12 ++ .../Transports/Local/LocalTransport.cs | 50 +++++++ 9 files changed, 271 insertions(+), 9 deletions(-) create mode 100644 src/Testing/CoreTests/Acceptance/configuring_local_queues.cs diff --git a/docs/guide/durability/postgresql.md b/docs/guide/durability/postgresql.md index ce2d5507b..1278e4815 100644 --- a/docs/guide/durability/postgresql.md +++ b/docs/guide/durability/postgresql.md @@ -117,7 +117,7 @@ that they are utilizing the transactional inbox and outbox. The PostgreSQL queue ```cs opts.ListenToPostgresqlQueue("sender").BufferedInMemory(); ``` -snippet source | anchor +snippet source | anchor Using this option just means that the PostgreSQL queues can be used for both sending or receiving with no integration diff --git a/docs/guide/durability/sqlserver.md b/docs/guide/durability/sqlserver.md index 5f0809852..07da10972 100644 --- a/docs/guide/durability/sqlserver.md +++ b/docs/guide/durability/sqlserver.md @@ -91,7 +91,7 @@ that they are utilizing the transactional inbox and outbox. The Sql Server queue ```cs opts.ListenToSqlServerQueue("sender").BufferedInMemory(); ``` -snippet source | anchor +snippet source | anchor Using this option just means that the Sql Server queues can be used for both sending or receiving with no integration diff --git a/docs/guide/handlers/sticky.md b/docs/guide/handlers/sticky.md index ce025f86e..41e22dc67 100644 --- a/docs/guide/handlers/sticky.md +++ b/docs/guide/handlers/sticky.md @@ -25,7 +25,7 @@ message as an input. ```cs public class StickyMessage; ``` -snippet source | anchor +snippet source | anchor And we're going to handle that `StickyMessage` message separately with two different handler types: @@ -51,7 +51,7 @@ public static class GreenStickyHandler } } ``` -snippet source | anchor +snippet source | anchor ::: tip @@ -79,7 +79,7 @@ using var host = await Host.CreateDefaultBuilder() opts.ListenAtPort(4000).Named("blue"); }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor With all of that being said, the end result of the two `StickyMessage` handlers that are marked with `[StickyHandler]` @@ -119,7 +119,13 @@ using var host = await Host.CreateDefaultBuilder() }).StartAsync(); ``` -snippet source | anchor +snippet source | anchor +## Configuring Local Queues + +There is a world of reasons why you might want to fine tune the behavior of local queues (sequential ordering? parallelism? circuit breakers?), but the +"sticky" handler usage did make it a little harder to configure the exact right local queue for a sticky handler. To alleviate that, see the +[IConfigureLocalQueue](/guide/messaging/transports/local.html#using-iconfigurelocalqueue-to-configure-local-queues) usage. + diff --git a/docs/guide/messaging/transports/external-tables.md b/docs/guide/messaging/transports/external-tables.md index bf3d4bea4..204e8f85d 100644 --- a/docs/guide/messaging/transports/external-tables.md +++ b/docs/guide/messaging/transports/external-tables.md @@ -74,7 +74,7 @@ builder.UseWolverine(opts => .Sequential(); }); ``` -snippet source | anchor +snippet source | anchor So a couple things to know: diff --git a/docs/guide/messaging/transports/local.md b/docs/guide/messaging/transports/local.md index 3375da73a..b769a8ee3 100644 --- a/docs/guide/messaging/transports/local.md +++ b/docs/guide/messaging/transports/local.md @@ -206,6 +206,62 @@ using var host = await Host.CreateDefaultBuilder() snippet source | anchor +## Using IConfigureLocalQueue to Configure Local Queues + +::: info +This feature was added in reaction to the newer "sticky" handler to local queue usage, but it's perfectly usable for +message types that are happily handled without any "sticky" handler configuration. +::: + +The advent of ["sticky handlers"](/guide/handlers/sticky) or the [separated handler mode](/guide/handlers/#multiple-handlers-for-the-same-message-type) for better Wolverine usage in modular monoliths admittedly +made it a little harder to fine tune the local queue behavior for different message types or message handlers without understanding +the Wolverine naming conventions. To get back to leaning more on the type system, Wolverine introduced the static `IConfigureLocalQueue` +interface that can be implemented on any handler type to configure the local queue where that handler would run: + + + +```cs +/// +/// Helps mark a handler to configure the local queue that its messages +/// would be routed to. It's probably only useful to use this with "sticky" handlers +/// that run on an isolated local queue +/// +public interface IConfigureLocalQueue +{ + static abstract void Configure(LocalQueueConfiguration configuration); +} +``` +snippet source | anchor + + +::: tip +Static interfaces can only be used on non-static types, so even if all your message handler *methods* are static, the +handler type itself cannot be static. Just a .NET quirk. +::: + +To use this, just implement that interface on any message handler type: + + + +```cs +public class MultipleMessage1Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + // This method is configuring the local queue that executes this + // handler to be strictly ordered + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } +} +``` +snippet source | anchor + + ## Durable Local Messages The local worker queues can optionally be designated as "durable," meaning that local messages would be persisted until they can be successfully processed to provide a guarantee that the message will be successfully processed in the case of the running application faulting or having been shut down prematurely (assuming that other nodes are running or it's restarted later of course). diff --git a/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs b/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs new file mode 100644 index 000000000..a440f0f36 --- /dev/null +++ b/src/Testing/CoreTests/Acceptance/configuring_local_queues.cs @@ -0,0 +1,134 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Attributes; +using Wolverine.Configuration; +using Wolverine.Tracking; +using Wolverine.Transports.Local; +using Xunit; + +namespace CoreTests.Acceptance; + +public class configuring_local_queues : IntegrationContext +{ + public configuring_local_queues(DefaultApp @default) : base(@default) + { + } + + [Fact] + public void apply_to_normal_non_sticky_default_routed_handler() + { + var runtime = Host.GetRuntime(); + runtime.Endpoints.EndpointByName("Frank") + .ShouldBeOfType().Uri.ShouldBe(new Uri("local://coretests.acceptance.simplemessage/")); + } + + [Fact] + public void apply_to_sticky_handlers() + { + var runtime = Host.GetRuntime(); + runtime.Endpoints.EndpointByName("blue") + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1); + + runtime.Endpoints.EndpointByName("green") + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000); + } + + [Fact] + public async Task use_with_separated_mode() + { + using var host = await new HostBuilder().UseWolverine(opts => + { + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + }).StartAsync(); + + var runtime = host.GetRuntime(); + runtime.Endpoints.EndpointByName(typeof(MultipleMessage1Handler).FullNameInCode().ToLowerInvariant()) + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1); + + runtime.Endpoints.EndpointByName(typeof(MultipleMessage2Handler).FullNameInCode().ToLowerInvariant()) + .ShouldBeOfType().ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(1000); + } +} + +public record SimpleMessage; + +public class SimpleMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + // Just got to do something to prove out the configuration + configuration.Named("Frank"); + } + + public static void Handle(SimpleMessage message) + { + + } +} + +public record StuckMessage; + +[StickyHandler("blue")] +public class BlueStuckMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } + + public static void Handle(StuckMessage message) + { + + } +} + +[StickyHandler("green")] +public class GreenStuckMessageHandler : IConfigureLocalQueue +{ + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.MaximumParallelMessages(1000); + } + + public static void Handle(StuckMessage message) + { + + } +} + +public record MultipleMessage; + +#region sample_using_IConfigureLocalQueue + +public class MultipleMessage1Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + // This method is configuring the local queue that executes this + // handler to be strictly ordered + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.Sequential(); + } +} + +#endregion + +public class MultipleMessage2Handler : IConfigureLocalQueue +{ + public static void Handle(MultipleMessage message) + { + + } + + public static void Configure(LocalQueueConfiguration configuration) + { + configuration.MaximumParallelMessages(1000); + } +} + + diff --git a/src/Wolverine/Configuration/IConfigureLocalQueue.cs b/src/Wolverine/Configuration/IConfigureLocalQueue.cs index 160c1c091..7ace15664 100644 --- a/src/Wolverine/Configuration/IConfigureLocalQueue.cs +++ b/src/Wolverine/Configuration/IConfigureLocalQueue.cs @@ -2,6 +2,8 @@ namespace Wolverine.Configuration; +#region sample_IConfigureLocalQueue + /// /// Helps mark a handler to configure the local queue that its messages /// would be routed to. It's probably only useful to use this with "sticky" handlers @@ -9,5 +11,7 @@ namespace Wolverine.Configuration; /// public interface IConfigureLocalQueue { - static abstract void Configure(LocalQueueConfiguration configuration); -} \ No newline at end of file + static abstract void Configure(LocalQueueConfiguration configuration); +} + +#endregion \ No newline at end of file diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 5780560ad..01cf7f195 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -17,6 +17,7 @@ using Wolverine.Runtime.Scheduled; using Wolverine.Runtime.Serialization; using Wolverine.Transports; +using Wolverine.Transports.Local; using Wolverine.Util; namespace Wolverine.Runtime.Handlers; @@ -309,6 +310,17 @@ IEnumerable explodeChains(HandlerChain chain) foreach (var configuration in _configurations) configuration(); registerMessageTypes(); + + tryApplyLocalQueueConfiguration(options); + } + + private void tryApplyLocalQueueConfiguration(WolverineOptions options) + { + var local = options.Transports.GetOrCreate(); + foreach (var chain in Chains) + { + local.ApplyConfiguration(chain); + } } private void registerMessageTypes() diff --git a/src/Wolverine/Transports/Local/LocalTransport.cs b/src/Wolverine/Transports/Local/LocalTransport.cs index 817807851..58ed600de 100644 --- a/src/Wolverine/Transports/Local/LocalTransport.cs +++ b/src/Wolverine/Transports/Local/LocalTransport.cs @@ -4,6 +4,7 @@ using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Runtime.Agents; +using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Routing; using Wolverine.Util; @@ -234,4 +235,53 @@ internal LocalQueueConfiguration ConfigureQueueFor(Type messageType) return configuration; } + + internal void ApplyConfiguration(HandlerChain chain) + { + // Gotta go recursive + foreach (var handlerChain in chain.ByEndpoint) + { + ApplyConfiguration(handlerChain); + } + + var configured = chain.Handlers.Select(x => x.HandlerType) + .Where(x => x.CanBeCastTo(typeof(IConfigureLocalQueue))).ToArray(); + + if (!configured.Any()) return; + + // Is it sticky? + if (chain.Endpoints.OfType().Any()) + { + foreach (var handlerType in configured) + { + var applier = typeof(Applier<>).CloseAndBuildAs(handlerType); + foreach (var localQueue in chain.Endpoints.OfType()) + { + applier.Apply(new LocalQueueConfiguration(localQueue)); + } + } + } + else + { + var configuration = ConfigureQueueFor(chain.MessageType); + foreach (var handlerType in configured) + { + typeof(Applier<>).CloseAndBuildAs(handlerType).Apply(configuration); + } + } + } + + private interface IApplier + { + void Apply(LocalQueueConfiguration configuration); + } + + private class Applier : IApplier where T : IConfigureLocalQueue + { + public void Apply(LocalQueueConfiguration configuration) + { + T.Configure(configuration); + } + } + } \ No newline at end of file