diff --git a/docs/guide/messaging/transports/mqtt.md b/docs/guide/messaging/transports/mqtt.md index 866818db1..5fc5a82f2 100644 --- a/docs/guide/messaging/transports/mqtt.md +++ b/docs/guide/messaging/transports/mqtt.md @@ -147,6 +147,16 @@ public class FirstMessage <sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L150-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup> <!-- endSnippet --> +## Listening by Topic Filter + +Wolverine supports topic filters for listening. The syntax is still just the same `ListenToMqttTopic(filter)` as shown +in this snippet from the Wolverine.MQTT test suite: + +snippet: sample_listen_to_mqtt_topic_filter + +In the case of receiving any message that matches the topic filter *according to the [MQTT topic filter rules](https://cedalo.com/blog/mqtt-topics-and-mqtt-wildcards-explained/)*, that message +will be handled by the listening endpoint defined for that filter. + ## Integrating with Non-Wolverine It's quite likely that in using Wolverine with an MQTT broker that you will be communicating with non-Wolverine systems diff --git a/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs new file mode 100644 index 000000000..514dec922 --- /dev/null +++ b/src/Transports/MQTT/Wolverine.MQTT.Tests/listen_with_topic_wildcards.cs @@ -0,0 +1,72 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Shouldly; +using TestingSupport; +using Wolverine.Tracking; +using Xunit.Abstractions; + +namespace Wolverine.MQTT.Tests; + +public class listen_with_topic_wildcards : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost _sender; + private IHost _receiver; + + public listen_with_topic_wildcards(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + var port = PortFinder.GetAvailablePort(); + + + Broker = new LocalMqttBroker(port) + { + Logger = new XUnitLogger( _output, "MQTT") + }; + + await Broker.StartAsync(); + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseMqttWithLocalBroker(port); + opts.Policies.DisableConventionalLocalRouting(); + }).StartAsync(); + + #region sample_listen_to_mqtt_topic_filter + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseMqttWithLocalBroker(port); + opts.ListenToMqttTopic("incoming/#").RetainMessages(); + }).StartAsync(); + + #endregion + } + + [Fact] + public async Task broadcast() + { + var session = await _sender.TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .ExecuteAndWaitAsync(m => m.BroadcastToTopicAsync("incoming/one", new ColorMessage("blue"))); + + var received = session.Received.SingleMessage<ColorMessage>(); + received.Color.ShouldBe("blue"); + } + + public LocalMqttBroker Broker { get; set; } + + public async Task DisposeAsync() + { + await Broker.StopAsync(); + await _sender.StopAsync(); + await _receiver.StopAsync(); + } +} \ No newline at end of file diff --git a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs index bd04ff852..506977e0c 100644 --- a/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs +++ b/src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs @@ -14,7 +14,8 @@ namespace Wolverine.MQTT.Internals; public class MqttTransport : TransportBase<MqttTopic>, IAsyncDisposable { public LightweightCache<string, MqttTopic> Topics { get; } = new(); - private ImHashMap<string, MqttListener> _listeners = ImHashMap<string, MqttListener>.Empty; + private List<MqttListener> _listeners = new(); + private ImHashMap<string, MqttListener> _topicListeners = ImHashMap<string, MqttListener>.Empty; private bool _subscribed; private ILogger<MqttTransport> _logger; @@ -82,7 +83,7 @@ private void startSubscribing() private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg) { var topicName = arg.ApplicationMessage.Topic; - if (_listeners.TryFind(topicName, out var listener)) + if (tryFindListener(topicName, out var listener)) { return listener.ReceiveAsync(arg); } @@ -93,6 +94,22 @@ private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg) } } + internal bool tryFindListener(string topicName, out MqttListener listener) + { + if (_topicListeners.TryFind(topicName, out listener)) + { + return listener is not null; + } + + listener = _listeners.FirstOrDefault(x => x.TopicName == topicName) ?? _listeners.FirstOrDefault(x => + MqttTopicFilterComparer.Compare(topicName, x.TopicName) == MqttTopicFilterCompareResult.IsMatch); + + _topicListeners = _topicListeners.AddOrUpdate(topicName, listener); + + + return listener is not null; + } + internal IManagedMqttClient Client { get; private set; } public ManagedMqttClientOptions Options { get; set; } = new ManagedMqttClientOptions @@ -110,7 +127,7 @@ public async ValueTask DisposeAsync() internal async ValueTask SubscribeToTopicAsync(string topicName, MqttListener listener, MqttTopic mqttTopic) { - _listeners = _listeners.AddOrUpdate(topicName, listener); + _listeners.Add(listener); await Client.SubscribeAsync(topicName, mqttTopic.QualityOfServiceLevel);