Skip to content

Commit

Permalink
Merge pull request #171 from Dervanil/fix/messaging_rabbitmq
Browse files Browse the repository at this point in the history
Fix/messaging rabbitmq
  • Loading branch information
lucianareginalino committed Aug 28, 2023
2 parents 6d6f0ab + 10b3dc9 commit fc52fc1
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static class IServiceCollectionExtension
public static IServiceCollection AddLiquidRabbitMqProducer<TEntity>(this IServiceCollection services, string sectionName, bool activateTelemetry = true)
{
services.TryAddTransient<IRabbitMqFactory, RabbitMqFactory>();

if (activateTelemetry)
{
services.AddScoped((provider) =>
Expand Down Expand Up @@ -94,7 +95,7 @@ public static IServiceCollection AddLiquidRabbitMqConsumer<TWorker, TEntity>(thi

private static IServiceCollection AddConsumer<TEntity>(this IServiceCollection services, string sectionName, bool activateTelemetry = true)
{
services.AddTransient<IRabbitMqFactory, RabbitMqFactory>();
services.AddSingleton<IRabbitMqFactory, RabbitMqFactory>();

if (activateTelemetry)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.5.0" />
</ItemGroup>

<ItemGroup>
Expand Down
10 changes: 5 additions & 5 deletions src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Liquid.Messaging.Extensions;
using Liquid.Messaging.Interfaces;
using Liquid.Messaging.RabbitMq.Settings;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
Expand All @@ -26,7 +27,6 @@ public class RabbitMqConsumer<TEntity> : ILiquidConsumer<TEntity>
private readonly IRabbitMqFactory _factory;
private readonly RabbitMqConsumerSettings _settings;


///<inheritdoc/>
public event Func<ProcessMessageEventArgs<TEntity>, CancellationToken, Task> ProcessMessageAsync;

Expand All @@ -51,7 +51,7 @@ public void RegisterMessageHandler()
{
if (ProcessMessageAsync is null)
{
throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class.");
throw new NotImplementedException($"The {nameof(ProcessMessageAsync)} action must be added to class.");
}

_channelModel = _factory.GetReceiver(_settings);
Expand Down Expand Up @@ -79,12 +79,12 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella
_channelModel.BasicAck(deliverEvent.DeliveryTag, false);
}
}
catch (Exception ex)
catch (Exception)
{
if (!_autoAck)
{
_channelModel.BasicNack(deliverEvent.DeliveryTag, false, true);

throw new MessagingConsumerException(ex);
}
}
}

Expand Down
82 changes: 60 additions & 22 deletions src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,50 @@ namespace Liquid.Messaging.RabbitMq
{
///<inheritdoc/>
[ExcludeFromCodeCoverage]
public class RabbitMqFactory : IRabbitMqFactory
public class RabbitMqFactory : IRabbitMqFactory, IDisposable
{

private IConnection _connection;
private IModel _model;
private bool _disposed;

/// <summary>
/// Initialize a new instace of <see cref="RabbitMqFactory"/>
/// </summary>
public RabbitMqFactory()
{

}

///<inheritdoc/>
public IModel GetReceiver(RabbitMqConsumerSettings settings)
{
try
{
var connectionFactory = new ConnectionFactory
if (_connection == null && _model == null)
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

var connection = connectionFactory.CreateConnection();
var channelModel = connection.CreateModel();
_connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName);
_model = _connection.CreateModel();
}

if (settings.QueueSettings.Prefetch.HasValue &&
settings.QueueSettings.PrefetchCount.HasValue &&
settings.QueueSettings.Global.HasValue)
{
channelModel.BasicQos(settings.QueueSettings.Prefetch.Value,
_model.BasicQos(settings.QueueSettings.Prefetch.Value,
settings.QueueSettings.PrefetchCount.Value,
settings.QueueSettings.Global.Value);
}

channelModel.QueueBind(settings.Queue, settings.Exchange, string.Empty);
_model.QueueBind(settings.Queue, settings.Exchange, string.Empty);

return channelModel;
return _model;
}
catch (Exception ex)
{
Expand All @@ -58,22 +65,53 @@ public IModel GetSender(RabbitMqProducerSettings settings)
{
try
{
var connectionFactory = new ConnectionFactory
if (_connection == null && _model == null)
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

var connection = connectionFactory.CreateConnection();
var channelModel = connection.CreateModel();

return channelModel;
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

_connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName);
_model = _connection.CreateModel();
}

return _model;
}
catch (Exception ex)
{
throw new MessagingMissingConfigurationException(ex, "for exange '" + settings?.Exchange + "'");
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <param name="disposing">Indicates if method should perform dispose.</param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_connection?.Dispose();
_model?.Dispose();
}

_disposed = true;
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

}
}
8 changes: 8 additions & 0 deletions src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ public class RabbitMqSettings
/// </value>
public string ConnectionString { get; set; }

/// <summary>
/// Gets or sets the name of the connection.
/// </summary>
/// <value>
/// The connection string.
/// </value>
public string ConnectionName { get; set; }

/// <summary>
/// Gets or sets the request heart beat in seconds.
/// </summary>
Expand Down
8 changes: 2 additions & 6 deletions src/Liquid.Messaging/LiquidBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Liquid.Messaging.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -38,13 +39,9 @@ public LiquidBackgroundService(IServiceProvider serviceProvider, ILiquidConsumer
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.ProcessMessageAsync += ProcessMessageAsync;

_consumer.RegisterMessageHandler();

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
await Task.CompletedTask;
}

/// <summary>
Expand All @@ -59,7 +56,6 @@ public async Task ProcessMessageAsync(ProcessMessageEventArgs<TEntity> args, Can
using (IServiceScope scope = _serviceProvider.CreateScope())
{
var worker = scope.ServiceProvider.GetRequiredService<ILiquidWorker<TEntity>>();

await worker.ProcessMessageAsync(args, cancellationToken);
}
}
Expand Down
22 changes: 2 additions & 20 deletions test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Liquid.Messaging.Exceptions;
using Liquid.Messaging.RabbitMq.Settings;
using Liquid.Messaging.RabbitMq.Tests.Mock;
using Microsoft.Extensions.Logging;
using NSubstitute;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
Expand All @@ -15,6 +16,7 @@ namespace Liquid.Messaging.RabbitMq.Tests
public class RabbitMqConsumerTest : RabbitMqConsumer<MessageMock>
{
public static readonly IRabbitMqFactory _factory = Substitute.For<IRabbitMqFactory>();

public RabbitMqConsumerTest()
: base(_factory, new RabbitMqConsumerSettings())
{
Expand Down Expand Up @@ -61,26 +63,6 @@ public async Task MessageHandler_WhenProcessExecutedSucessfully()
await MessageHandler(message, new CancellationToken());
}

[Fact]
public async Task MessageHandler_WhenProcessExecutionFail_ThrowException()
{
var message = new BasicDeliverEventArgs();

var entity = new MessageMock() { TestMessageId = 2 };

message.Body = entity.ToJsonBytes();

var messageReceiver = Substitute.For<IModel>();
_factory.GetReceiver(Arg.Any<RabbitMqConsumerSettings>()).Returns(messageReceiver);

ProcessMessageAsync += ProcessMessageAsyncMock;

var task = MessageHandler(message, new CancellationToken());

await Assert.ThrowsAsync<MessagingConsumerException>(() => task);

}

#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
private async Task ProcessMessageAsyncMock(ProcessMessageEventArgs<MessageMock> args, CancellationToken cancellationToken)
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
Expand Down

0 comments on commit fc52fc1

Please sign in to comment.