From 7f15b0767979963d1f67d478d137d5516ad11c74 Mon Sep 17 00:00:00 2001 From: Dervanil Antonio da Silva Junior Date: Wed, 8 Jun 2022 09:14:42 -0300 Subject: [PATCH 1/4] =?UTF-8?q?-=20Corre=C3=A7=C3=B5es=20LiquidBackgroundS?= =?UTF-8?q?ervice=20adi=C3=A7=C3=A3o=20de=20async/await=20no=20metodo=20Pr?= =?UTF-8?q?ocessMessageAsync=20-=20Corre=C3=A7=C3=B5es=20Liquid=20Comsumer?= =?UTF-8?q?,=20remo=C3=A7=C3=A3o=20sem=20tratamento=20direto,=20que=20fazi?= =?UTF-8?q?a=20worker=20reiniciar,=20adi=C3=A7=C3=A3o=20de=20log=20para=20?= =?UTF-8?q?estes=20casos.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs | 9 ++++++--- src/Liquid.Messaging/LiquidBackgroundService.cs | 7 +++++-- .../RabbitMqConsumerTest.cs | 9 ++++++--- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs b/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs index e0b4d44d..9c6491d7 100644 --- a/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs +++ b/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs @@ -3,6 +3,7 @@ using Liquid.Messaging.Extensions; using Liquid.Messaging.Interfaces; using Liquid.Messaging.RabbitMq.Settings; +using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; @@ -24,7 +25,7 @@ public class RabbitMqConsumer : ILiquidConsumer private IModel _channelModel; private readonly IRabbitMqFactory _factory; private readonly RabbitMqConsumerSettings _settings; - + private ILogger> _logger; /// public event Func, CancellationToken, Task> ProcessMessageAsync; @@ -37,11 +38,13 @@ public class RabbitMqConsumer : ILiquidConsumer /// /// RabbitMq client factory. /// Configuration properties set. - public RabbitMqConsumer(IRabbitMqFactory factory, RabbitMqConsumerSettings settings) + /// Logger service instance. + public RabbitMqConsumer(IRabbitMqFactory factory, RabbitMqConsumerSettings settings, ILogger> logger) { _factory = factory ?? throw new ArgumentNullException(nameof(factory)); _settings = settings ?? throw new ArgumentNullException(nameof(settings)); + _logger = logger; _autoAck = _settings.AdvancedSettings?.AutoAck ?? true; } @@ -83,7 +86,7 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella if (!_autoAck) _channelModel.BasicNack(deliverEvent.DeliveryTag, false, true); - throw new MessagingConsumerException(ex); + _logger.LogError(ex, ex.Message); } } diff --git a/src/Liquid.Messaging/LiquidBackgroundService.cs b/src/Liquid.Messaging/LiquidBackgroundService.cs index 68a5c59f..09bd099b 100644 --- a/src/Liquid.Messaging/LiquidBackgroundService.cs +++ b/src/Liquid.Messaging/LiquidBackgroundService.cs @@ -1,6 +1,7 @@ using Liquid.Messaging.Interfaces; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using System; using System.Threading; using System.Threading.Tasks; @@ -54,14 +55,16 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// /// /// - public Task ProcessMessageAsync(ProcessMessageEventArgs args, CancellationToken cancellationToken) + public async Task ProcessMessageAsync(ProcessMessageEventArgs args, CancellationToken cancellationToken) { + using (IServiceScope scope = _serviceProvider.CreateScope()) { var worker = scope.ServiceProvider.GetRequiredService>(); - return worker.ProcessMessageAsync(args, cancellationToken); + await worker.ProcessMessageAsync(args, cancellationToken); } + } } } diff --git a/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs b/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs index 46586300..60a0edf9 100644 --- a/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs +++ b/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs @@ -2,6 +2,7 @@ using Liquid.Messaging.Exceptions; using Liquid.Messaging.RabbitMq.Settings; using Liquid.Messaging.RabbitMq.Tests.Mock; +using Microsoft.Extensions.Logging; using NSubstitute; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -15,8 +16,10 @@ namespace Liquid.Messaging.RabbitMq.Tests public class RabbitMqConsumerTest : RabbitMqConsumer { public static readonly IRabbitMqFactory _factory = Substitute.For(); + public static readonly ILogger> _logger = Substitute.For>>(); + public RabbitMqConsumerTest() - : base(_factory, new RabbitMqConsumerSettings()) + : base(_factory, new RabbitMqConsumerSettings(), _logger) { } @@ -62,7 +65,7 @@ public async Task MessageHandler_WhenProcessExecutedSucessfully() } [Fact] - public async Task MessageHandler_WhenProcessExecutionFail_ThrowException() + public void MessageHandler_WhenProcessExecutionFail_LogException() { var message = new BasicDeliverEventArgs(); @@ -77,7 +80,7 @@ public async Task MessageHandler_WhenProcessExecutionFail_ThrowException() var task = MessageHandler(message, new CancellationToken()); - await Assert.ThrowsAsync(() => task); + _logger.Received(1); } From d4307c2890eece7b273cba30af6a58c5e8166778 Mon Sep 17 00:00:00 2001 From: Dervanil Antonio da Silva Junior Date: Fri, 10 Jun 2022 13:14:43 -0300 Subject: [PATCH 2/4] =?UTF-8?q?-=20Remo=C3=A7=C3=A3o=20do=20log=20-=20Muda?= =?UTF-8?q?n=C3=A7a=20de=20vers=C3=A3o=20do=20componente?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Liquid.Messaging.RabbitMq.csproj | 2 +- .../RabbitMqConsumer.cs | 13 ++++------- .../RabbitMqConsumerTest.cs | 23 +------------------ 3 files changed, 7 insertions(+), 31 deletions(-) diff --git a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj index d65165bd..0440ce37 100644 --- a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj +++ b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj @@ -9,7 +9,7 @@ Avanade 2019 https://github.com/Avanade/Liquid.Messaging logo.png - 2.0.0 + 2.0.1-preview-20220906-01 true The Liquid.Messaging.RabbitMq provides producer and consumer patterns to allow the send and consumption of Messaging inside your microservice. diff --git a/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs b/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs index 9c6491d7..91d237b7 100644 --- a/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs +++ b/src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs @@ -25,7 +25,6 @@ public class RabbitMqConsumer : ILiquidConsumer private IModel _channelModel; private readonly IRabbitMqFactory _factory; private readonly RabbitMqConsumerSettings _settings; - private ILogger> _logger; /// public event Func, CancellationToken, Task> ProcessMessageAsync; @@ -38,13 +37,11 @@ public class RabbitMqConsumer : ILiquidConsumer /// /// RabbitMq client factory. /// Configuration properties set. - /// Logger service instance. - public RabbitMqConsumer(IRabbitMqFactory factory, RabbitMqConsumerSettings settings, ILogger> logger) + public RabbitMqConsumer(IRabbitMqFactory factory, RabbitMqConsumerSettings settings) { _factory = factory ?? throw new ArgumentNullException(nameof(factory)); _settings = settings ?? throw new ArgumentNullException(nameof(settings)); - _logger = logger; _autoAck = _settings.AdvancedSettings?.AutoAck ?? true; } @@ -53,7 +50,7 @@ public void RegisterMessageHandler() { if (ProcessMessageAsync is null) { - throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class."); + throw new NotImplementedException($"The {nameof(ProcessMessageAsync)} action must be added to class."); } _channelModel = _factory.GetReceiver(_settings); @@ -81,12 +78,12 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella _channelModel.BasicAck(deliverEvent.DeliveryTag, false); } } - catch (Exception ex) + catch (Exception) { if (!_autoAck) + { _channelModel.BasicNack(deliverEvent.DeliveryTag, false, true); - - _logger.LogError(ex, ex.Message); + } } } diff --git a/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs b/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs index 60a0edf9..69214eba 100644 --- a/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs +++ b/test/Liquid.Messaging.RabbitMq.Tests/RabbitMqConsumerTest.cs @@ -16,10 +16,9 @@ namespace Liquid.Messaging.RabbitMq.Tests public class RabbitMqConsumerTest : RabbitMqConsumer { public static readonly IRabbitMqFactory _factory = Substitute.For(); - public static readonly ILogger> _logger = Substitute.For>>(); public RabbitMqConsumerTest() - : base(_factory, new RabbitMqConsumerSettings(), _logger) + : base(_factory, new RabbitMqConsumerSettings()) { } @@ -64,26 +63,6 @@ public async Task MessageHandler_WhenProcessExecutedSucessfully() await MessageHandler(message, new CancellationToken()); } - [Fact] - public void MessageHandler_WhenProcessExecutionFail_LogException() - { - var message = new BasicDeliverEventArgs(); - - var entity = new MessageMock() { TestMessageId = 2 }; - - message.Body = entity.ToJsonBytes(); - - var messageReceiver = Substitute.For(); - _factory.GetReceiver(Arg.Any()).Returns(messageReceiver); - - ProcessMessageAsync += ProcessMessageAsyncMock; - - var task = MessageHandler(message, new CancellationToken()); - - _logger.Received(1); - - } - #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously private async Task ProcessMessageAsyncMock(ProcessMessageEventArgs args, CancellationToken cancellationToken) #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously From f1d4b85791c456358c4c4f7d3d939a52fb3d8c03 Mon Sep 17 00:00:00 2001 From: Dervanil Antonio da Silva Junior Date: Wed, 28 Jun 2023 18:06:20 -0300 Subject: [PATCH 3/4] =?UTF-8?q?Corre=C3=A7=C3=A3o=20do=20gerenciamento=20d?= =?UTF-8?q?e=20conex=C3=B5es=20com=20o=20RabbitMq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IServiceCollectionExtension.cs | 3 +- .../Liquid.Messaging.RabbitMq.csproj | 2 +- .../RabbitMqFactory.cs | 82 ++++++++++++++----- .../Settings/RabbitMqSettings.cs | 8 ++ .../LiquidBackgroundService.cs | 9 +- 5 files changed, 72 insertions(+), 32 deletions(-) diff --git a/src/Liquid.Messaging.RabbitMq/Extensions/DependencyInjection/IServiceCollectionExtension.cs b/src/Liquid.Messaging.RabbitMq/Extensions/DependencyInjection/IServiceCollectionExtension.cs index c5e8d9fd..bb7c17a3 100644 --- a/src/Liquid.Messaging.RabbitMq/Extensions/DependencyInjection/IServiceCollectionExtension.cs +++ b/src/Liquid.Messaging.RabbitMq/Extensions/DependencyInjection/IServiceCollectionExtension.cs @@ -26,6 +26,7 @@ public static class IServiceCollectionExtension public static IServiceCollection AddLiquidRabbitMqProducer(this IServiceCollection services, string sectionName, bool activateTelemetry = true) { services.TryAddTransient(); + if (activateTelemetry) { services.AddScoped((provider) => @@ -94,7 +95,7 @@ public static IServiceCollection AddLiquidRabbitMqConsumer(thi private static IServiceCollection AddConsumer(this IServiceCollection services, string sectionName, bool activateTelemetry = true) { - services.AddTransient(); + services.AddSingleton(); if (activateTelemetry) { diff --git a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj index 0440ce37..e0aa8928 100644 --- a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj +++ b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs b/src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs index 07135a20..b0b0a0c2 100644 --- a/src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs +++ b/src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs @@ -8,14 +8,18 @@ namespace Liquid.Messaging.RabbitMq { /// [ExcludeFromCodeCoverage] - public class RabbitMqFactory : IRabbitMqFactory + public class RabbitMqFactory : IRabbitMqFactory, IDisposable { + + private IConnection _connection; + private IModel _model; + private bool _disposed; + /// /// Initialize a new instace of /// public RabbitMqFactory() { - } /// @@ -23,28 +27,31 @@ public IModel GetReceiver(RabbitMqConsumerSettings settings) { try { - var connectionFactory = new ConnectionFactory + if (_connection == null && _model == null) { - Uri = new Uri(settings.QueueSettings.ConnectionString), - RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60), - AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true - }; + var connectionFactory = new ConnectionFactory + { + Uri = new Uri(settings.QueueSettings.ConnectionString), + RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60), + AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true + }; - var connection = connectionFactory.CreateConnection(); - var channelModel = connection.CreateModel(); + _connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName); + _model = _connection.CreateModel(); + } if (settings.QueueSettings.Prefetch.HasValue && settings.QueueSettings.PrefetchCount.HasValue && settings.QueueSettings.Global.HasValue) { - channelModel.BasicQos(settings.QueueSettings.Prefetch.Value, + _model.BasicQos(settings.QueueSettings.Prefetch.Value, settings.QueueSettings.PrefetchCount.Value, settings.QueueSettings.Global.Value); } - channelModel.QueueBind(settings.Queue, settings.Exchange, string.Empty); + _model.QueueBind(settings.Queue, settings.Exchange, string.Empty); - return channelModel; + return _model; } catch (Exception ex) { @@ -58,22 +65,53 @@ public IModel GetSender(RabbitMqProducerSettings settings) { try { - var connectionFactory = new ConnectionFactory + if (_connection == null && _model == null) { - Uri = new Uri(settings.QueueSettings.ConnectionString), - RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60), - AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true - }; - - var connection = connectionFactory.CreateConnection(); - var channelModel = connection.CreateModel(); - - return channelModel; + var connectionFactory = new ConnectionFactory + { + Uri = new Uri(settings.QueueSettings.ConnectionString), + RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60), + AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true + }; + + _connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName); + _model = _connection.CreateModel(); + } + + return _model; } catch (Exception ex) { throw new MessagingMissingConfigurationException(ex, "for exange '" + settings?.Exchange + "'"); } } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + /// Indicates if method should perform dispose. + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _connection?.Dispose(); + _model?.Dispose(); + } + + _disposed = true; + } + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + } } diff --git a/src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs b/src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs index f74e848b..ce9554d5 100644 --- a/src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs +++ b/src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs @@ -17,6 +17,14 @@ public class RabbitMqSettings /// public string ConnectionString { get; set; } + /// + /// Gets or sets the name of the connection. + /// + /// + /// The connection string. + /// + public string ConnectionName { get; set; } + /// /// Gets or sets the request heart beat in seconds. /// diff --git a/src/Liquid.Messaging/LiquidBackgroundService.cs b/src/Liquid.Messaging/LiquidBackgroundService.cs index 09bd099b..bea7b0a8 100644 --- a/src/Liquid.Messaging/LiquidBackgroundService.cs +++ b/src/Liquid.Messaging/LiquidBackgroundService.cs @@ -39,13 +39,9 @@ public LiquidBackgroundService(IServiceProvider serviceProvider, ILiquidConsumer protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _consumer.ProcessMessageAsync += ProcessMessageAsync; - _consumer.RegisterMessageHandler(); - while (!stoppingToken.IsCancellationRequested) - { - await Task.Delay(1000, stoppingToken); - } + await Task.CompletedTask; } /// @@ -57,14 +53,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// public async Task ProcessMessageAsync(ProcessMessageEventArgs args, CancellationToken cancellationToken) { - using (IServiceScope scope = _serviceProvider.CreateScope()) { var worker = scope.ServiceProvider.GetRequiredService>(); - await worker.ProcessMessageAsync(args, cancellationToken); } - } } } From 10b3dc94c09a55adf0fce76f04149f0041493e4a Mon Sep 17 00:00:00 2001 From: Dervanil Antonio da Silva Junior Date: Thu, 29 Jun 2023 09:07:16 -0300 Subject: [PATCH 4/4] =?UTF-8?q?Mudan=C3=A7a=20de=20vers=C3=A3o=20de=20paco?= =?UTF-8?q?tes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Liquid.Messaging.RabbitMq.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj index 57746787..0d85bce7 100644 --- a/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj +++ b/src/Liquid.Messaging.RabbitMq/Liquid.Messaging.RabbitMq.csproj @@ -20,8 +20,8 @@ - - + +