Skip to content

Commit c9e29dd

Browse files
codyspeckjeremydmiller
authored andcommitted
Allow configuration of batched sender parallelism
1 parent 0aba6c5 commit c9e29dd

File tree

3 files changed

+16
-1
lines changed

3 files changed

+16
-1
lines changed

src/Wolverine/Configuration/Endpoint.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ protected Endpoint(Uri uri, EndpointRole role)
116116
/// </summary>
117117
public int MessageBatchSize { get; set; } = 100;
118118

119+
/// <summary>
120+
/// For endpoints that send messages in batches, this governs the maximum number
121+
/// of concurrent outgoing batches
122+
/// </summary>
123+
public int MessageBatchMaxDegreeOfParallelism { get; set; } = 1;
119124

120125
/// <summary>
121126
/// Mark whether or not the receiver for this listener should use

src/Wolverine/Configuration/SubscriberConfiguration.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ public T MessageBatchSize(int batchSize)
6868
add(e => e.MessageBatchSize = batchSize);
6969
return this.As<T>();
7070
}
71+
72+
/// <summary>
73+
/// For endpoints that send messages in batches, this governs the maximum number
74+
/// of concurrent outgoing batches
75+
/// </summary>
76+
public T MessageBatchMaxDegreeOfParallelism(int batchMaxDegreeOfParallelism)
77+
{
78+
add(e => e.MessageBatchMaxDegreeOfParallelism = batchMaxDegreeOfParallelism);
79+
return this.As<T>();
80+
}
7181

7282
public T DefaultSerializer(IMessageSerializer serializer)
7383
{

src/Wolverine/Transports/Sending/BatchedSender.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public BatchedSender(Endpoint destination, ISenderProtocol protocol, Cancellatio
2929

3030
_sender = new ActionBlock<OutgoingMessageBatch>(SendBatchAsync, new ExecutionDataflowBlockOptions
3131
{
32-
MaxDegreeOfParallelism = 1,
32+
MaxDegreeOfParallelism = destination.MessageBatchMaxDegreeOfParallelism,
3333
CancellationToken = _cancellation,
3434
BoundedCapacity = DataflowBlockOptions.Unbounded
3535
});

0 commit comments

Comments
 (0)