Skip to content

Commit

Permalink
Correção do gerenciamento de conexões com o RabbitMq
Browse files Browse the repository at this point in the history
  • Loading branch information
dervanil-junior committed Jun 28, 2023
1 parent 927418b commit f1d4b85
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static class IServiceCollectionExtension
public static IServiceCollection AddLiquidRabbitMqProducer<TEntity>(this IServiceCollection services, string sectionName, bool activateTelemetry = true)
{
services.TryAddTransient<IRabbitMqFactory, RabbitMqFactory>();

if (activateTelemetry)
{
services.AddScoped((provider) =>
Expand Down Expand Up @@ -94,7 +95,7 @@ public static IServiceCollection AddLiquidRabbitMqConsumer<TWorker, TEntity>(thi

private static IServiceCollection AddConsumer<TEntity>(this IServiceCollection services, string sectionName, bool activateTelemetry = true)
{
services.AddTransient<IRabbitMqFactory, RabbitMqFactory>();
services.AddSingleton<IRabbitMqFactory, RabbitMqFactory>();

if (activateTelemetry)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0-preview.2.21154.6" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.5.0" />
</ItemGroup>

<ItemGroup>
Expand Down
82 changes: 60 additions & 22 deletions src/Liquid.Messaging.RabbitMq/RabbitMqFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,50 @@ namespace Liquid.Messaging.RabbitMq
{
///<inheritdoc/>
[ExcludeFromCodeCoverage]
public class RabbitMqFactory : IRabbitMqFactory
public class RabbitMqFactory : IRabbitMqFactory, IDisposable
{

private IConnection _connection;
private IModel _model;
private bool _disposed;

/// <summary>
/// Initialize a new instace of <see cref="RabbitMqFactory"/>
/// </summary>
public RabbitMqFactory()
{

}

///<inheritdoc/>
public IModel GetReceiver(RabbitMqConsumerSettings settings)
{
try
{
var connectionFactory = new ConnectionFactory
if (_connection == null && _model == null)
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

var connection = connectionFactory.CreateConnection();
var channelModel = connection.CreateModel();
_connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName);
_model = _connection.CreateModel();
}

if (settings.QueueSettings.Prefetch.HasValue &&
settings.QueueSettings.PrefetchCount.HasValue &&
settings.QueueSettings.Global.HasValue)
{
channelModel.BasicQos(settings.QueueSettings.Prefetch.Value,
_model.BasicQos(settings.QueueSettings.Prefetch.Value,
settings.QueueSettings.PrefetchCount.Value,
settings.QueueSettings.Global.Value);
}

channelModel.QueueBind(settings.Queue, settings.Exchange, string.Empty);
_model.QueueBind(settings.Queue, settings.Exchange, string.Empty);

return channelModel;
return _model;
}
catch (Exception ex)
{
Expand All @@ -58,22 +65,53 @@ public IModel GetSender(RabbitMqProducerSettings settings)
{
try
{
var connectionFactory = new ConnectionFactory
if (_connection == null && _model == null)
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

var connection = connectionFactory.CreateConnection();
var channelModel = connection.CreateModel();

return channelModel;
var connectionFactory = new ConnectionFactory
{
Uri = new Uri(settings.QueueSettings.ConnectionString),
RequestedHeartbeat = TimeSpan.FromSeconds(settings.QueueSettings?.RequestHeartBeatInSeconds ?? 60),
AutomaticRecoveryEnabled = settings.QueueSettings?.AutoRecovery ?? true
};

_connection = connectionFactory.CreateConnection(settings.QueueSettings.ConnectionName);
_model = _connection.CreateModel();
}

return _model;
}
catch (Exception ex)
{
throw new MessagingMissingConfigurationException(ex, "for exange '" + settings?.Exchange + "'");
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <param name="disposing">Indicates if method should perform dispose.</param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_connection?.Dispose();
_model?.Dispose();
}

_disposed = true;
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

}
}
8 changes: 8 additions & 0 deletions src/Liquid.Messaging.RabbitMq/Settings/RabbitMqSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ public class RabbitMqSettings
/// </value>
public string ConnectionString { get; set; }

/// <summary>
/// Gets or sets the name of the connection.
/// </summary>
/// <value>
/// The connection string.
/// </value>
public string ConnectionName { get; set; }

/// <summary>
/// Gets or sets the request heart beat in seconds.
/// </summary>
Expand Down
9 changes: 1 addition & 8 deletions src/Liquid.Messaging/LiquidBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,9 @@ public LiquidBackgroundService(IServiceProvider serviceProvider, ILiquidConsumer
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.ProcessMessageAsync += ProcessMessageAsync;

_consumer.RegisterMessageHandler();

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken);
}
await Task.CompletedTask;
}

/// <summary>
Expand All @@ -57,14 +53,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="cancellationToken"></param>
public async Task ProcessMessageAsync(ProcessMessageEventArgs<TEntity> args, CancellationToken cancellationToken)
{

using (IServiceScope scope = _serviceProvider.CreateScope())
{
var worker = scope.ServiceProvider.GetRequiredService<ILiquidWorker<TEntity>>();

await worker.ProcessMessageAsync(args, cancellationToken);
}

}
}
}

0 comments on commit f1d4b85

Please sign in to comment.