Skip to content

Commit

Permalink
MQTT wildcard listening. Closes GH-654
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Dec 15, 2023
1 parent 1ce184e commit 0f47842
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 3 deletions.
10 changes: 10 additions & 0 deletions docs/guide/messaging/transports/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public class FirstMessage
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/send_by_topics.cs#L150-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_attribute-1' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Listening by Topic Filter

Wolverine supports topic filters for listening. The syntax is still just the same `ListenToMqttTopic(filter)` as shown
in this snippet from the Wolverine.MQTT test suite:

snippet: sample_listen_to_mqtt_topic_filter

In the case of receiving any message that matches the topic filter *according to the [MQTT topic filter rules](https://cedalo.com/blog/mqtt-topics-and-mqtt-wildcards-explained/)*, that message
will be handled by the listening endpoint defined for that filter.

## Integrating with Non-Wolverine

It's quite likely that in using Wolverine with an MQTT broker that you will be communicating with non-Wolverine systems
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Shouldly;
using TestingSupport;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace Wolverine.MQTT.Tests;

public class listen_with_topic_wildcards : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private IHost _sender;
private IHost _receiver;

public listen_with_topic_wildcards(ITestOutputHelper output)
{
_output = output;
}

public async Task InitializeAsync()
{
var port = PortFinder.GetAvailablePort();


Broker = new LocalMqttBroker(port)
{
Logger = new XUnitLogger( _output, "MQTT")
};

await Broker.StartAsync();

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseMqttWithLocalBroker(port);
opts.Policies.DisableConventionalLocalRouting();
}).StartAsync();

#region sample_listen_to_mqtt_topic_filter

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseMqttWithLocalBroker(port);
opts.ListenToMqttTopic("incoming/#").RetainMessages();
}).StartAsync();

#endregion
}

[Fact]
public async Task broadcast()
{
var session = await _sender.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(30.Seconds())
.ExecuteAndWaitAsync(m => m.BroadcastToTopicAsync("incoming/one", new ColorMessage("blue")));

var received = session.Received.SingleMessage<ColorMessage>();
received.Color.ShouldBe("blue");
}

public LocalMqttBroker Broker { get; set; }

public async Task DisposeAsync()
{
await Broker.StopAsync();
await _sender.StopAsync();
await _receiver.StopAsync();
}
}
23 changes: 20 additions & 3 deletions src/Transports/MQTT/Wolverine.MQTT/Internals/MqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace Wolverine.MQTT.Internals;
public class MqttTransport : TransportBase<MqttTopic>, IAsyncDisposable
{
public LightweightCache<string, MqttTopic> Topics { get; } = new();
private ImHashMap<string, MqttListener> _listeners = ImHashMap<string, MqttListener>.Empty;
private List<MqttListener> _listeners = new();
private ImHashMap<string, MqttListener> _topicListeners = ImHashMap<string, MqttListener>.Empty;
private bool _subscribed;
private ILogger<MqttTransport> _logger;

Expand Down Expand Up @@ -82,7 +83,7 @@ private void startSubscribing()
private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg)
{
var topicName = arg.ApplicationMessage.Topic;
if (_listeners.TryFind(topicName, out var listener))
if (tryFindListener(topicName, out var listener))
{
return listener.ReceiveAsync(arg);
}
Expand All @@ -93,6 +94,22 @@ private Task receiveAsync(MqttApplicationMessageReceivedEventArgs arg)
}
}

internal bool tryFindListener(string topicName, out MqttListener listener)
{
if (_topicListeners.TryFind(topicName, out listener))
{
return listener is not null;
}

listener = _listeners.FirstOrDefault(x => x.TopicName == topicName) ?? _listeners.FirstOrDefault(x =>
MqttTopicFilterComparer.Compare(topicName, x.TopicName) == MqttTopicFilterCompareResult.IsMatch);

_topicListeners = _topicListeners.AddOrUpdate(topicName, listener);


return listener is not null;
}

internal IManagedMqttClient Client { get; private set; }

public ManagedMqttClientOptions Options { get; set; } = new ManagedMqttClientOptions
Expand All @@ -110,7 +127,7 @@ public async ValueTask DisposeAsync()

internal async ValueTask SubscribeToTopicAsync(string topicName, MqttListener listener, MqttTopic mqttTopic)
{
_listeners = _listeners.AddOrUpdate(topicName, listener);
_listeners.Add(listener);

await Client.SubscribeAsync(topicName, mqttTopic.QualityOfServiceLevel);

Expand Down

0 comments on commit 0f47842

Please sign in to comment.