Skip to content

Commit bb552fc

Browse files
committed
Little extra docs and tests for disabling one or the other Rabbit MQ connections
1 parent a4af296 commit bb552fc

File tree

3 files changed

+170
-2
lines changed

3 files changed

+170
-2
lines changed

docs/guide/messaging/transports/rabbitmq/index.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,74 @@ return await Host.CreateDefaultBuilder(args)
5454
See the [Rabbit MQ .NET Client documentation](https://www.rabbitmq.com/dotnet-api-guide.html#connecting) for more information about configuring the `ConnectionFactory` to connect to Rabbit MQ.
5555

5656

57+
## Managing Rabbit MQ Connections
58+
59+
In its default setup, the Rabbit MQ transport in Wolverine will open two connections, one for listening and another for sending
60+
messages. All Rabbit MQ endpoints will share these two connections. If you need to conserve Rabbit MQ connections
61+
and have a process that is only sending or only receiving messages through Rabbit MQ, you can opt to turn off one or the
62+
other connections that might not be used at runtime.
63+
64+
To only listen to Rabbit MQ messages, but never send them:
65+
66+
<!-- snippet: sample_only_use_listener_connection_with_rabbitmq -->
67+
<a id='snippet-sample_only_use_listener_connection_with_rabbitmq'></a>
68+
```cs
69+
using var host = await Host.CreateDefaultBuilder()
70+
.UseWolverine(opts =>
71+
{
72+
// *A* way to configure Rabbit MQ using their Uri schema
73+
// documented here: https://www.rabbitmq.com/uri-spec.html
74+
opts.UseRabbitMq(new Uri("amqp://localhost"))
75+
76+
// Turn on listener connection only in case if you only need to listen for messages
77+
// The sender connection won't be activated in this case
78+
.UseListenerConnectionOnly();
79+
80+
// Set up a listener for a queue, but also
81+
// fine-tune the queue characteristics if Wolverine
82+
// will be governing the queue setup
83+
opts.ListenToRabbitQueue("incoming2", q =>
84+
{
85+
q.PurgeOnStartup = true;
86+
q.TimeToLive(5.Minutes());
87+
});
88+
}).StartAsync();
89+
```
90+
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L84-L107' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_only_use_listener_connection_with_rabbitmq' title='Start of snippet'>anchor</a></sup>
91+
<!-- endSnippet -->
92+
93+
To only send Rabbit MQ messages, but never receive them:
94+
95+
<!-- snippet: sample_only_use_sending_connection_with_rabbitmq -->
96+
<a id='snippet-sample_only_use_sending_connection_with_rabbitmq'></a>
97+
```cs
98+
using var host = await Host.CreateDefaultBuilder()
99+
.UseWolverine(opts =>
100+
{
101+
// *A* way to configure Rabbit MQ using their Uri schema
102+
// documented here: https://www.rabbitmq.com/uri-spec.html
103+
opts.UseRabbitMq(new Uri("amqp://localhost"))
104+
105+
// Turn on sender connection only in case if you only need to send messages
106+
// The listener connection won't be created in this case
107+
.UseSenderConnectionOnly();
108+
109+
// Set up a listener for a queue, but also
110+
// fine-tune the queue characteristics if Wolverine
111+
// will be governing the queue setup
112+
opts.ListenToRabbitQueue("incoming2", q =>
113+
{
114+
q.PurgeOnStartup = true;
115+
q.TimeToLive(5.Minutes());
116+
});
117+
}).StartAsync();
118+
```
119+
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L112-L135' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_only_use_sending_connection_with_rabbitmq' title='Start of snippet'>anchor</a></sup>
120+
<!-- endSnippet -->
121+
122+
123+
124+
57125
## Disable Rabbit MQ Reply Queues
58126

59127
By default, Wolverine creates an in memory queue in the Rabbit MQ broker for each individual node that is used by Wolverine

src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public static async Task disable_system_queue()
8181

8282
public static async Task use_listener_connection_only()
8383
{
84-
#region sample_disable_rabbit_mq_system_queue
84+
#region sample_only_use_listener_connection_with_rabbitmq
8585

8686
using var host = await Host.CreateDefaultBuilder()
8787
.UseWolverine(opts =>
@@ -109,7 +109,7 @@ public static async Task use_listener_connection_only()
109109

110110
public static async Task use_sender_connection_only()
111111
{
112-
#region sample_disable_rabbit_mq_system_queue
112+
#region sample_only_use_sending_connection_with_rabbitmq
113113

114114
using var host = await Host.CreateDefaultBuilder()
115115
.UseWolverine(opts =>

src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public void rabbitmq_transport_is_exposed_as_a_resource()
4343
var queueName = RabbitTesting.NextQueueName();
4444
using var publisher = WolverineHost.For(opts =>
4545
{
46+
4647
opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();
4748

4849
opts.PublishAllMessages()
@@ -194,6 +195,105 @@ public async Task send_message_to_and_receive_through_rabbitmq_with_inline_recei
194195
cancellation.Token.ThrowIfCancellationRequested();
195196

196197

198+
}
199+
200+
[Fact]
201+
public async Task send_message_to_and_receive_through_rabbitmq_with_inline_receivers_and_only_listener_connection()
202+
{
203+
var queueName = RabbitTesting.NextQueueName();
204+
using var publisher = WolverineHost.For(opts =>
205+
{
206+
opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();
207+
208+
opts.PublishAllMessages()
209+
.ToRabbitQueue(queueName)
210+
.SendInline();
211+
212+
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
213+
214+
215+
});
216+
217+
218+
using var receiver = WolverineHost.For(opts =>
219+
{
220+
opts.UseRabbitMq().AutoProvision().UseListenerConnectionOnly();
221+
222+
opts.ListenToRabbitQueue(queueName).ProcessInline().Named(queueName);
223+
opts.Services.AddSingleton<ColorHistory>();
224+
225+
226+
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
227+
});
228+
229+
await receiver.ResetResourceState();
230+
231+
for (int i = 0; i < 10000; i++)
232+
{
233+
await publisher.SendAsync(new ColorChosen { Name = "blue" });
234+
}
235+
236+
var cancellation = new CancellationTokenSource(30.Seconds());
237+
var queue = receiver.Get<IWolverineRuntime>().Endpoints.EndpointByName(queueName).ShouldBeOfType<RabbitMqQueue>();
238+
239+
while (!cancellation.IsCancellationRequested && queue.QueuedCount() > 0)
240+
{
241+
await Task.Delay(250.Milliseconds(), cancellation.Token);
242+
}
243+
244+
cancellation.Token.ThrowIfCancellationRequested();
245+
246+
247+
}
248+
249+
250+
[Fact]
251+
public async Task send_message_to_and_receive_through_rabbitmq_with_inline_receivers_and_only_subscriber_connection()
252+
{
253+
var queueName = RabbitTesting.NextQueueName();
254+
using var publisher = WolverineHost.For(opts =>
255+
{
256+
opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup().UseSenderConnectionOnly();
257+
258+
opts.PublishAllMessages()
259+
.ToRabbitQueue(queueName)
260+
.SendInline();
261+
262+
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
263+
264+
265+
});
266+
267+
268+
using var receiver = WolverineHost.For(opts =>
269+
{
270+
opts.UseRabbitMq().AutoProvision();
271+
272+
opts.ListenToRabbitQueue(queueName).ProcessInline().Named(queueName);
273+
opts.Services.AddSingleton<ColorHistory>();
274+
275+
276+
opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
277+
});
278+
279+
await receiver.ResetResourceState();
280+
281+
for (int i = 0; i < 10000; i++)
282+
{
283+
await publisher.SendAsync(new ColorChosen { Name = "blue" });
284+
}
285+
286+
var cancellation = new CancellationTokenSource(30.Seconds());
287+
var queue = receiver.Get<IWolverineRuntime>().Endpoints.EndpointByName(queueName).ShouldBeOfType<RabbitMqQueue>();
288+
289+
while (!cancellation.IsCancellationRequested && queue.QueuedCount() > 0)
290+
{
291+
await Task.Delay(250.Milliseconds(), cancellation.Token);
292+
}
293+
294+
cancellation.Token.ThrowIfCancellationRequested();
295+
296+
197297
}
198298

199299

0 commit comments

Comments
 (0)