Skip to content

Commit

Permalink
Support message grouping (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret authored Jul 25, 2024
1 parent d5b47a2 commit 105631d
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/ArtemisNetCoreClient/Framing/Incoming/SessionReceiveMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var expiration);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out var timestamp);
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var priority);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType);
readBytes += DecodeProperties(buffer[readBytes..], out var properties, out var routingType, out var groupId);
readBytes += ArtemisBinaryConverter.ReadInt64(buffer[readBytes..], out ConsumerId);
readBytes += ArtemisBinaryConverter.ReadInt32(buffer[readBytes..], out DeliveryCount);

Expand All @@ -40,7 +40,8 @@ public SessionReceiveMessage(ReadOnlySpan<byte> buffer)
Priority = priority,
Properties = properties,
MessageDelivery = new MessageDelivery(ConsumerId, messageId),
RoutingType = routingType
RoutingType = routingType,
GroupId = groupId,
};

Debug.Assert(readBytes == buffer.Length, $"Expected to read {buffer.Length} bytes but got {readBytes}");
Expand All @@ -61,9 +62,13 @@ private static int DecodeMessageBody(ReadOnlySpan<byte> buffer, out ReadOnlyMemo
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDictionary<string, object?> properties, out RoutingType? routingType)
private static int DecodeProperties(ReadOnlySpan<byte> buffer,
out IReadOnlyDictionary<string, object?> properties,
out RoutingType? routingType,
out string? groupId)
{
routingType = null;
groupId = null;
properties = ReadOnlyDictionary<string, object?>.Empty;

var readBytes = ArtemisBinaryConverter.ReadByte(buffer, out var isNotNull);
Expand All @@ -84,6 +89,14 @@ private static int DecodeProperties(ReadOnlySpan<byte> buffer, out IReadOnlyDict
routingType = (RoutingType) routingTypeByte;
}
}
else if (key == MessageHeaders.GroupId)
{
readBytes += ArtemisBinaryConverter.ReadByte(buffer[readBytes..], out var type);
if (type == DataConstants.String)
{
readBytes += ArtemisBinaryConverter.ReadSimpleString(buffer[readBytes..], out groupId);
}
}
else
{
readBytes += ArtemisBinaryConverter.ReadNullableObject(buffer[readBytes..], out var obj);
Expand Down
12 changes: 12 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public int GetRequiredBufferSize()
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.RoutingType);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) RoutingType);
}

if (Message.GroupId != null)
{
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.GroupId);
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(Message.GroupId);
}
}

byteCount += sizeof(bool); // RequiresResponse
Expand Down Expand Up @@ -137,6 +143,12 @@ private int EncodeProperties(Span<byte> buffer)
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.RoutingType);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) RoutingType.Value);
}

if (Message.GroupId != null)
{
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.GroupId);
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), Message.GroupId);
}
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions src/ArtemisNetCoreClient/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ public class Message
/// The message body (payload)
/// </summary>
public ReadOnlyMemory<byte> Body { get; set; }

/// <summary>
/// The Group ID used when sending the message. This is used for message grouping feature. Messages with the same message group
/// are always consumed by the same consumer, even if multiple consumers are listening on the same queue.
/// </summary>
public string? GroupId { get; set; }
}
1 change: 1 addition & 0 deletions src/ArtemisNetCoreClient/MessageHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ namespace ActiveMQ.Artemis.Core.Client;
internal static class MessageHeaders
{
public const string RoutingType = "_AMQ_ROUTING_TYPE";
public const string GroupId = "_AMQ_GROUP_ID";
}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public class ReceivedMessage
/// The routing type used when sending the message.
/// </summary>
public required RoutingType? RoutingType { get; init; }

/// <summary>
/// The Group ID used when sending the message.
/// </summary>
public required string? GroupId { get; init; }
}
68 changes: 68 additions & 0 deletions test/ArtemisNetCoreClient.Tests/MessageGroupingSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Core.Client.Tests;

public class MessageGroupingSpec(ITestOutputHelper testOutputHelper)
{
[Fact]
public async Task Should_deliver_messages_with_the_same_GroupId_to_the_same_consumer()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync();

var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName);

await using var producer = await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName
});

await using var consumer1 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});
await using var consumer2 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});
await using var consumer3 = await session.CreateConsumerAsync(new ConsumerConfiguration
{
QueueName = queueName
});

await SendMessagesToGroup(producer, "group1", 5);
await SendMessagesToGroup(producer, "group2", 5);
await SendMessagesToGroup(producer, "group3", 5);

await AssertReceivedAllMessagesWithTheSameGroupId(consumer1, 5);
await AssertReceivedAllMessagesWithTheSameGroupId(consumer2, 5);
await AssertReceivedAllMessagesWithTheSameGroupId(consumer3, 5);
}

private static async Task SendMessagesToGroup(IProducer producer, string groupId, int count)
{
for (int i = 1; i <= count; i++)
{
await producer.SendMessageAsync(new Message
{
GroupId = groupId,
});
}
}

private async Task AssertReceivedAllMessagesWithTheSameGroupId(IConsumer consumer, int count)
{
var messages = new List<ReceivedMessage>();
for (int i = 1; i <= count; i++)
{
var message = await consumer.ReceiveMessageAsync();
messages.Add(message);
}

Assert.Single(messages.GroupBy(x => x.GroupId));
}
}

0 comments on commit 105631d

Please sign in to comment.