Skip to content

Commit 195059c

Browse files
authored
Remove ChannelOptions internal class (#1712)
* Remove `ChannelOptions` internal class Update `CreateChannelOptions` so that it provides the same internal behavior that `ChannelOptions` did. * * Make `CreateChannelOptions` an immutable class. * * Fix test failures due to not passing `_consumerDispatchConcurrency` correctly to `CreateChannelOptions`
1 parent c642f7b commit 195059c

26 files changed

+172
-184
lines changed

projects/Applications/GH-1647/Program.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,7 @@
4040
Password = "guest"
4141
};
4242

43-
var channelOptions = new CreateChannelOptions
44-
{
45-
PublisherConfirmationsEnabled = true,
46-
PublisherConfirmationTrackingEnabled = true
47-
};
43+
var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
4844

4945
var props = new BasicProperties();
5046
byte[] msg = Encoding.UTF8.GetBytes("test");

projects/Applications/MassPublish/Program.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,9 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
137137

138138
publishTasks.Add(Task.Run(async () =>
139139
{
140-
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
140+
var createChannelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true,
141+
publisherConfirmationTrackingEnabled: true);
142+
using IChannel publishChannel = await publishConnection.CreateChannelAsync(createChannelOptions);
141143
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
142144

143145
for (int i = 0; i < ItemsPerBatch; i++)

projects/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@
4343
const int MESSAGE_COUNT = 50_000;
4444
bool debug = false;
4545

46-
var channelOpts = new CreateChannelOptions
47-
{
48-
PublisherConfirmationsEnabled = true,
49-
PublisherConfirmationTrackingEnabled = true,
50-
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
51-
};
46+
var channelOpts = new CreateChannelOptions(
47+
publisherConfirmationsEnabled: true,
48+
publisherConfirmationTrackingEnabled: true,
49+
outstandingPublisherConfirmationsRateLimiter: new ThrottlingRateLimiter(MAX_OUTSTANDING_CONFIRMS)
50+
);
5251

5352
var props = new BasicProperties
5453
{
@@ -177,7 +176,7 @@ async Task HandlePublishConfirmsAsynchronously()
177176

178177
await using IConnection connection = await CreateConnectionAsync();
179178

180-
channelOpts.PublisherConfirmationTrackingEnabled = false;
179+
channelOpts = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: false);
181180
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);
182181

183182
// declare a server-named queue

projects/RabbitMQ.Client/ConnectionFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
185185
/// </summary>
186186
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(5);
187187

188-
private TimeSpan _handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
189-
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
188+
private TimeSpan _handshakeContinuationTimeout = Constants.DefaultHandshakeContinuationTimeout;
189+
private TimeSpan _continuationTimeout = Constants.DefaultContinuationTimeout;
190190

191191
// just here to hold the value that was set through the setter
192192
private string? _clientProvidedName;

projects/RabbitMQ.Client/Constants.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
33+
3234
namespace RabbitMQ.Client
3335
{
3436
public static class Constants
@@ -97,5 +99,15 @@ public static class Constants
9799
/// <c>basic.return</c> is sent via the broker.
98100
/// </summary>
99101
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
102+
103+
/// <summary>
104+
/// The default timeout for initial AMQP handshake
105+
/// </summary>
106+
public static readonly TimeSpan DefaultHandshakeContinuationTimeout = TimeSpan.FromSeconds(10);
107+
108+
/// <summary>
109+
/// The default timeout for RPC methods
110+
/// </summary>
111+
public static readonly TimeSpan DefaultContinuationTimeout = TimeSpan.FromSeconds(20);
100112
}
101113
}

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Threading.RateLimiting;
3334

3435
namespace RabbitMQ.Client
@@ -38,6 +39,9 @@ namespace RabbitMQ.Client
3839
/// </summary>
3940
public sealed class CreateChannelOptions
4041
{
42+
private ushort? _connectionConfigConsumerDispatchConcurrency;
43+
private TimeSpan _connectionConfigContinuationTimeout;
44+
4145
/// <summary>
4246
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
4347
///
@@ -49,7 +53,7 @@ public sealed class CreateChannelOptions
4953
/// <see cref="IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken)"/> to allow correlation
5054
/// of the response with the correct message.
5155
/// </summary>
52-
public bool PublisherConfirmationsEnabled { get; set; } = false;
56+
public readonly bool PublisherConfirmationsEnabled = false;
5357

5458
/// <summary>
5559
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
@@ -59,7 +63,7 @@ public sealed class CreateChannelOptions
5963
/// If the broker then sends a <c>basic.return</c> response for the message, this library can
6064
/// then correctly handle the message.
6165
/// </summary>
62-
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
66+
public readonly bool PublisherConfirmationTrackingEnabled = false;
6367

6468
/// <summary>
6569
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
@@ -68,7 +72,7 @@ public sealed class CreateChannelOptions
6872
/// Defaults to a <see cref="ThrottlingRateLimiter"/> with a limit of 128 and a throttling percentage of 50% with a delay during throttling.
6973
/// </summary>
7074
/// <remarks>Setting the rate limiter to <c>null</c> disables the rate limiting entirely.</remarks>
71-
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
75+
public readonly RateLimiter? OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(128);
7276

7377
/// <summary>
7478
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
@@ -80,11 +84,62 @@ public sealed class CreateChannelOptions
8084
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
8185
/// In addition to that consumers need to be thread/concurrency safe.
8286
/// </summary>
83-
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
87+
public readonly ushort? ConsumerDispatchConcurrency = null;
8488

85-
/// <summary>
86-
/// The default channel options.
87-
/// </summary>
88-
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
89+
public CreateChannelOptions(bool publisherConfirmationsEnabled,
90+
bool publisherConfirmationTrackingEnabled,
91+
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
92+
ushort? consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
93+
{
94+
PublisherConfirmationsEnabled = publisherConfirmationsEnabled;
95+
PublisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
96+
OutstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
97+
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
98+
}
99+
100+
internal ushort InternalConsumerDispatchConcurrency
101+
{
102+
get
103+
{
104+
if (ConsumerDispatchConcurrency is not null)
105+
{
106+
return ConsumerDispatchConcurrency.Value;
107+
}
108+
109+
if (_connectionConfigConsumerDispatchConcurrency is not null)
110+
{
111+
return _connectionConfigConsumerDispatchConcurrency.Value;
112+
}
113+
114+
return Constants.DefaultConsumerDispatchConcurrency;
115+
}
116+
}
117+
118+
internal TimeSpan ContinuationTimeout => _connectionConfigContinuationTimeout;
119+
120+
internal CreateChannelOptions(ConnectionConfig connectionConfig)
121+
{
122+
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
123+
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
124+
}
125+
126+
private CreateChannelOptions WithConnectionConfig(ConnectionConfig connectionConfig)
127+
{
128+
_connectionConfigConsumerDispatchConcurrency = connectionConfig.ConsumerDispatchConcurrency;
129+
_connectionConfigContinuationTimeout = connectionConfig.ContinuationTimeout;
130+
return this;
131+
}
132+
133+
internal static CreateChannelOptions CreateOrUpdate(CreateChannelOptions? createChannelOptions, ConnectionConfig config)
134+
{
135+
if (createChannelOptions is null)
136+
{
137+
return new CreateChannelOptions(config);
138+
}
139+
else
140+
{
141+
return createChannelOptions.WithConnectionConfig(config);
142+
}
143+
}
89144
}
90145
}

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
4242
{
4343
internal sealed class AutorecoveringChannel : IChannel, IRecoverable
4444
{
45-
private readonly ChannelOptions _channelOptions;
45+
private readonly CreateChannelOptions _createChannelOptions;
4646
private readonly List<string> _recordedConsumerTags = new List<string>();
4747

4848
private AutorecoveringConnection _connection;
@@ -73,11 +73,11 @@ public TimeSpan ContinuationTimeout
7373

7474
public AutorecoveringChannel(AutorecoveringConnection conn,
7575
RecoveryAwareChannel innerChannel,
76-
ChannelOptions channelOptions)
76+
CreateChannelOptions createChannelOptions)
7777
{
7878
_connection = conn;
7979
_innerChannel = innerChannel;
80-
_channelOptions = channelOptions;
80+
_createChannelOptions = createChannelOptions;
8181
}
8282

8383
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -162,7 +162,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
162162

163163
_connection = conn;
164164

165-
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_channelOptions, cancellationToken)
165+
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(_createChannelOptions, cancellationToken)
166166
.ConfigureAwait(false);
167167

168168
newChannel.TakeOver(_innerChannel);

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
185185
public IProtocol Protocol => Endpoint.Protocol;
186186

187187
public ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
188-
ChannelOptions channelOptions,
188+
CreateChannelOptions createChannelOptions,
189189
CancellationToken cancellationToken = default)
190190
{
191191
ISession session = InnerConnection.CreateSession();
192-
return RecoveryAwareChannel.CreateAndOpenAsync(session, channelOptions, cancellationToken);
192+
return RecoveryAwareChannel.CreateAndOpenAsync(session, createChannelOptions, cancellationToken);
193193
}
194194

195195
public override string ToString()
@@ -251,21 +251,16 @@ await CloseInnerConnectionAsync()
251251
}
252252
}
253253

254-
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
254+
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? createChannelOptions = default,
255255
CancellationToken cancellationToken = default)
256256
{
257257
EnsureIsOpen();
258258

259-
options ??= CreateChannelOptions.Default;
260-
261-
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
262-
263-
var channelOptions = ChannelOptions.From(options, _config);
264-
265-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(channelOptions, cancellationToken)
259+
createChannelOptions = CreateChannelOptions.CreateOrUpdate(createChannelOptions, _config);
260+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(createChannelOptions, cancellationToken)
266261
.ConfigureAwait(false);
267262

268-
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, channelOptions);
263+
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, createChannelOptions);
269264

270265
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
271266
.ConfigureAwait(false);

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ internal partial class Channel : IChannel, IRecoverable
6161

6262
internal readonly IConsumerDispatcher ConsumerDispatcher;
6363

64-
public Channel(ISession session, ChannelOptions channelOptions)
64+
public Channel(ISession session, CreateChannelOptions createChannelOptions)
6565
{
66-
ContinuationTimeout = channelOptions.ContinuationTimeout;
67-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, channelOptions.ConsumerDispatchConcurrency);
66+
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
67+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, createChannelOptions.InternalConsumerDispatchConcurrency);
6868
Func<Exception, string, CancellationToken, Task> onExceptionAsync = (exception, context, cancellationToken) =>
6969
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
7070
_basicAcksAsyncWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
@@ -359,12 +359,12 @@ protected bool Enqueue(IRpcContinuation k)
359359
}
360360
}
361361

362-
internal async Task<IChannel> OpenAsync(ChannelOptions channelOptions,
362+
internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
363363
CancellationToken cancellationToken)
364364
{
365-
ConfigurePublisherConfirmations(channelOptions.PublisherConfirmationsEnabled,
366-
channelOptions.PublisherConfirmationTrackingEnabled,
367-
channelOptions.OutstandingPublisherConfirmationsRateLimiter);
365+
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
366+
createChannelOptions.PublisherConfirmationTrackingEnabled,
367+
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);
368368

369369
bool enqueued = false;
370370
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -1493,13 +1493,11 @@ await ModelSendAsync(in method, k.CancellationToken)
14931493
}
14941494
}
14951495

1496-
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions,
1497-
ConnectionConfig connectionConfig, ISession session,
1496+
internal static Task<IChannel> CreateAndOpenAsync(CreateChannelOptions createChannelOptions, ISession session,
14981497
CancellationToken cancellationToken)
14991498
{
1500-
ChannelOptions channelOptions = ChannelOptions.From(createChannelOptions, connectionConfig);
1501-
var channel = new Channel(session, channelOptions);
1502-
return channel.OpenAsync(channelOptions, cancellationToken);
1499+
var channel = new Channel(session, createChannelOptions);
1500+
return channel.OpenAsync(createChannelOptions, cancellationToken);
15031501
}
15041502

15051503
/// <summary>

0 commit comments

Comments
 (0)