Skip to content

Commit

Permalink
Merge pull request #188 from Avanade/feature/updateFrameworkVersion
Browse files Browse the repository at this point in the history
 feat: update Liquid.Messaging packages target framework
  • Loading branch information
lucianareginalino authored Nov 24, 2023
2 parents 992869b + 9df243d commit b8e0cbe
Show file tree
Hide file tree
Showing 42 changed files with 346 additions and 225 deletions.
6 changes: 6 additions & 0 deletions Liquid.Messaging.lutconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<LUTConfig Version="1.0">
<Repository />
<ParallelBuilds>true</ParallelBuilds>
<ParallelTestRuns>true</ParallelTestRuns>
<TestCaseTimeout>180000</TestCaseTimeout>
</LUTConfig>
13 changes: 7 additions & 6 deletions src/Liquid.Messaging.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Confluent.Kafka;
using Liquid.Core.Extensions;
using Liquid.Core.Utils;
using Liquid.Messaging.Exceptions;
using Liquid.Messaging.Extensions;
Expand All @@ -21,10 +22,10 @@ public class KafkaConsumer<TEntity> : ILiquidConsumer<TEntity>


///<inheritdoc/>
public event Func<ProcessMessageEventArgs<TEntity>, CancellationToken, Task> ProcessMessageAsync;
public event Func<ConsumerMessageEventArgs<TEntity>, CancellationToken, Task> ConsumeMessageAsync;

///<inheritdoc/>
public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync;
public event Func<ConsumerErrorEventArgs, Task> ProcessErrorAsync;

/// <summary>
/// Initialize a new instance of <see cref="KafkaConsumer{TEntity}"/>
Expand All @@ -41,7 +42,7 @@ public KafkaConsumer(IKafkaFactory kafkaFactory, KafkaSettings kafkaSettings)
///<inheritdoc/>
public void RegisterMessageHandler()
{
if (ProcessMessageAsync is null)
if (ConsumeMessageAsync is null)
{
throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class.");
}
Expand All @@ -63,7 +64,7 @@ protected async Task MessageHandler(ConsumeResult<Ignore, string> deliverEvent,
{
try
{
await ProcessMessageAsync(GetEventArgs(deliverEvent), cancellationToken);
await ConsumeMessageAsync(GetEventArgs(deliverEvent), cancellationToken);

if (!_settings.EnableAutoCommit)
{
Expand All @@ -76,15 +77,15 @@ protected async Task MessageHandler(ConsumeResult<Ignore, string> deliverEvent,
}
}

private ProcessMessageEventArgs<TEntity> GetEventArgs(ConsumeResult<Ignore, string> deliverEvent)
private ConsumerMessageEventArgs<TEntity> GetEventArgs(ConsumeResult<Ignore, string> deliverEvent)
{
var headers = deliverEvent.Message.Headers.GetCustomHeaders();

var data = headers.Count > 0 && headers["ContentType"]?.ToString() == CommonExtensions.GZipContentType
? Encoding.UTF8.GetBytes(deliverEvent.Message.Value).GzipDecompress().ParseJson<TEntity>()
: deliverEvent.Message.Value.ParseJson<TEntity>();

return new ProcessMessageEventArgs<TEntity> { Data = data, Headers = headers };
return new ConsumerMessageEventArgs<TEntity> { Data = data, Headers = headers };
}
}
}
3 changes: 2 additions & 1 deletion src/Liquid.Messaging.Kafka/KafkaProducer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Confluent.Kafka;
using Liquid.Core.Utils;
using Liquid.Core.Extensions;
using Liquid.Messaging.Exceptions;
using Liquid.Messaging.Interfaces;
using Liquid.Messaging.Kafka.Extensions;
Expand Down Expand Up @@ -68,7 +69,7 @@ public async Task SendMessageAsync(TEntity messageBody, IDictionary<string, obje

private Message<Null, string> GetMessage(TEntity messageBody, IDictionary<string, object> customProperties)
{
var message = !_settings.CompressMessage ? messageBody.ToJson() : Encoding.UTF8.GetString(messageBody.ToJson().GzipCompress());
var message = !_settings.CompressMessage ? messageBody.ToJsonString() : Encoding.UTF8.GetString(messageBody.ToJsonString().GzipCompress());

var request = new Message<Null, string>
{
Expand Down
6 changes: 5 additions & 1 deletion src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<PackageId>Liquid.Messaging.Kafka</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Authors>Avanade Brazil</Authors>
Expand Down Expand Up @@ -31,4 +31,8 @@
<PackageReference Include="Liquid.Messaging" Version="2.0.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Liquid.Messaging\Liquid.Messaging.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<PackageId>Liquid.Messaging.RabbitMq</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Authors>Avanade Brazil</Authors>
Expand Down
14 changes: 7 additions & 7 deletions src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public class RabbitMqConsumer<TEntity> : ILiquidConsumer<TEntity>
private readonly RabbitMqConsumerSettings _settings;

///<inheritdoc/>
public event Func<ProcessMessageEventArgs<TEntity>, CancellationToken, Task> ProcessMessageAsync;
public event Func<ConsumerMessageEventArgs<TEntity>, CancellationToken, Task> ConsumeMessageAsync;

///<inheritdoc/>
public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync;
public event Func<ConsumerErrorEventArgs, Task> ProcessErrorAsync;

Check warning on line 34 in src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

The event 'RabbitMqConsumer<TEntity>.ProcessErrorAsync' is never used

Check warning on line 34 in src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

The event 'RabbitMqConsumer<TEntity>.ProcessErrorAsync' is never used

Check warning on line 34 in src/Liquid.Messaging.RabbitMq/RabbitMqConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

The event 'RabbitMqConsumer<TEntity>.ProcessErrorAsync' is never used

/// <summary>
/// Initilize an instance of <see cref="RabbitMqConsumer{TEntity}"/>
Expand All @@ -49,9 +49,9 @@ public RabbitMqConsumer(IRabbitMqFactory factory, RabbitMqConsumerSettings setti
///<inheritdoc/>
public void RegisterMessageHandler()
{
if (ProcessMessageAsync is null)
if (ConsumeMessageAsync is null)
{
throw new NotImplementedException($"The {nameof(ProcessMessageAsync)} action must be added to class.");
throw new NotImplementedException($"The {nameof(ConsumeMessageAsync)} action must be added to class.");
}

_channelModel = _factory.GetReceiver(_settings);
Expand All @@ -72,7 +72,7 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella
{
try
{
await ProcessMessageAsync(GetEventArgs(deliverEvent), cancellationToken);
await ConsumeMessageAsync(GetEventArgs(deliverEvent), cancellationToken);

if (!_autoAck)
{
Expand All @@ -88,15 +88,15 @@ protected async Task MessageHandler(BasicDeliverEventArgs deliverEvent, Cancella
}
}

private ProcessMessageEventArgs<TEntity> GetEventArgs(BasicDeliverEventArgs deliverEvent)
private ConsumerMessageEventArgs<TEntity> GetEventArgs(BasicDeliverEventArgs deliverEvent)
{
var data = deliverEvent.BasicProperties?.ContentType == CommonExtensions.GZipContentType
? deliverEvent.Body.ToArray().GzipDecompress().ParseJson<TEntity>()
: deliverEvent.Body.ToArray().ParseJson<TEntity>();

var headers = deliverEvent.BasicProperties?.Headers;

return new ProcessMessageEventArgs<TEntity> { Data = data, Headers = headers };
return new ConsumerMessageEventArgs<TEntity> { Data = data, Headers = headers };
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Liquid.Core.Extensions.DependencyInjection;
using Liquid.Messaging.Extensions.DependencyInjection;
using Liquid.Messaging.Interfaces;
using Liquid.Messaging.ServiceBus.Settings;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using Microsoft.Extensions.Configuration;

namespace Liquid.Messaging.ServiceBus.Extensions.DependencyInjection
{
Expand All @@ -21,16 +23,24 @@ public static class IServiceCollectionExtensions
/// <typeparam name="TEntity">Type of entity that will be consumed by this service instance.</typeparam>
/// <param name="services">Extended service collection instance.</param>
/// <param name="sectionName">Configuration section name.</param>
/// <param name="entityPath">Entity path to configure this producer.</param>
/// <param name="activateTelemetry">Indicates if telemetry interceptor must be registered.</param>
public static IServiceCollection AddLiquidServiceBusProducer<TEntity>(this IServiceCollection services, string sectionName, bool activateTelemetry = true)
public static IServiceCollection AddLiquidServiceBusProducer<TEntity>(this IServiceCollection services,
string sectionName, string entityPath, bool activateTelemetry = true)
{
services.AddOptions<ServiceBusSettings>()
.Configure<IConfiguration>((settings, configuration) =>
{
configuration.GetSection(sectionName).Bind(settings);
});

services.TryAddTransient<IServiceBusFactory, ServiceBusFactory>();

if (activateTelemetry)
{
services.AddScoped((provider) =>
{
return ActivatorUtilities.CreateInstance<ServiceBusProducer<TEntity>>(provider, sectionName);
return ActivatorUtilities.CreateInstance<ServiceBusProducer<TEntity>>(provider, entityPath);
});

services.AddScopedLiquidTelemetry<ILiquidProducer<TEntity>, ServiceBusProducer<TEntity>>();
Expand All @@ -39,7 +49,7 @@ public static IServiceCollection AddLiquidServiceBusProducer<TEntity>(this IServ
{
services.AddScoped<ILiquidProducer<TEntity>>((provider) =>
{
return ActivatorUtilities.CreateInstance<ServiceBusProducer<TEntity>>(provider, sectionName);
return ActivatorUtilities.CreateInstance<ServiceBusProducer<TEntity>>(provider, entityPath);
});
}

Expand All @@ -56,17 +66,24 @@ public static IServiceCollection AddLiquidServiceBusProducer<TEntity>(this IServ
/// <typeparam name="TWorker">Type of implementation from <see cref="ILiquidWorker{TEntity}"/></typeparam>
/// <param name="services">Extended service collection instance.</param>
/// <param name="sectionName">Configuration section name.</param>
/// <param name="entityPath">Entity name to configure for this worker.</param>
/// <param name="activateTelemetry">Indicates if telemetry interceptor must be registered.</param>
/// <param name="assemblies">Array of assemblies that contains domain handlers implementation.</param>
public static IServiceCollection AddLiquidServiceBusConsumer<TWorker, TEntity>(this IServiceCollection services
, string sectionName
, string entityPath
, bool activateTelemetry = true
, params Assembly[] assemblies)
where TWorker : class, ILiquidWorker<TEntity>
{
services.AddOptions<ServiceBusSettings>()
.Configure<IConfiguration>((settings, configuration) =>
{
configuration.GetSection(sectionName).Bind(settings);
});
services.AddLiquidMessageConsumer<TWorker, TEntity>(assemblies);

services.AddConsumer<TEntity>(sectionName, activateTelemetry);
services.AddConsumer<TEntity>(entityPath, activateTelemetry);

return services;
}
Expand Down
15 changes: 10 additions & 5 deletions src/Liquid.Messaging.ServiceBus/IServiceBusFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Azure.ServiceBus.Core;
using Azure.Messaging.ServiceBus;

namespace Liquid.Messaging.ServiceBus
{
Expand All @@ -8,13 +8,18 @@ namespace Liquid.Messaging.ServiceBus
public interface IServiceBusFactory
{
/// <summary>
/// Initialize and return a new instance of <see cref="MessageSender"/>.
/// Initialize and return a new instance of <see cref="ServiceBusSender"/>.
/// </summary>
IMessageSender GetSender(string sectionName);
ServiceBusSender GetSender(string entityPath);

/// <summary>
/// Initialize and return a new instance of <see cref="MessageReceiver"/>
/// Initialize and return a new instance of <see cref="ServiceBusProcessor"/>
/// </summary>
IMessageReceiver GetReceiver(string sectionName);
ServiceBusProcessor GetProcessor(string entityPath);

/// <summary>
/// Initialize and return a new instance of <see cref="ServiceBusReceiver"/>
/// </summary>
ServiceBusReceiver GetReceiver(string entityPath);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<PackageId>Liquid.Messaging.ServiceBus</PackageId>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Authors>Avanade Brazil</Authors>
Expand All @@ -22,11 +22,8 @@

<ItemGroup>
<PackageReference Include="AutoFixture.Xunit2" Version="4.17.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.1.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Liquid.Messaging\Liquid.Messaging.csproj" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.0" />
<PackageReference Include="Liquid.Messaging" Version="6.0.0-preview-20231130-01" />
</ItemGroup>

<ItemGroup>
Expand All @@ -36,4 +33,8 @@
</None>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Liquid.Messaging\Liquid.Messaging.csproj" />
</ItemGroup>

</Project>
41 changes: 22 additions & 19 deletions src/Liquid.Messaging.ServiceBus/ServiceBusConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using Liquid.Messaging.Exceptions;
using Azure.Messaging.ServiceBus;
using Liquid.Messaging.Exceptions;
using Liquid.Messaging.Interfaces;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
Expand All @@ -13,17 +13,17 @@ namespace Liquid.Messaging.ServiceBus
///<inheritdoc/>
public class ServiceBusConsumer<TEntity> : ILiquidConsumer<TEntity>
{
private IMessageReceiver _messageReceiver;
private ServiceBusProcessor _messageProcessor;

private readonly IServiceBusFactory _factory;

private readonly string _settingsName;

///<inheritdoc/>
public event Func<ProcessMessageEventArgs<TEntity>, CancellationToken, Task> ProcessMessageAsync;
public event Func<ConsumerMessageEventArgs<TEntity>, CancellationToken, Task> ConsumeMessageAsync;

///<inheritdoc/>
public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync;
public event Func<ConsumerErrorEventArgs, Task> ProcessErrorAsync;

/// <summary>
/// Initilize an instance of <see cref="ServiceBusConsumer{TEntity}"/>
Expand All @@ -39,50 +39,53 @@ public ServiceBusConsumer(IServiceBusFactory factory, string settingsName)
///<inheritdoc/>
public void RegisterMessageHandler()
{
if (ProcessMessageAsync is null)
if (ConsumeMessageAsync is null)
{
throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class.");
}

_messageReceiver = _factory.GetReceiver(_settingsName);
_messageProcessor = _factory.GetProcessor(_settingsName);

ProcessErrorAsync += ProcessError;

_messageReceiver.RegisterMessageHandler(MessageHandler, new MessageHandlerOptions(ErrorHandler));
_messageProcessor.ProcessMessageAsync += MessageHandler;
_messageProcessor.ProcessErrorAsync += ErrorHandler;

_messageProcessor.StartProcessingAsync().GetAwaiter();

}

/// <summary>
/// Process incoming messages.
/// </summary>
/// <param name="message">Message to be processed.</param>
/// <param name="cancellationToken"> Propagates notification that operations should be canceled.</param>
protected async Task MessageHandler(Message message, CancellationToken cancellationToken)
protected async Task MessageHandler(ProcessMessageEventArgs message)
{
await ProcessMessageAsync(GetEventArgs(message), cancellationToken);
await ConsumeMessageAsync(GetEventArgs(message.Message), new CancellationToken());
}

/// <summary>
/// Process exception from message handler.
/// </summary>
/// <param name="args"></param>
protected async Task ErrorHandler(ExceptionReceivedEventArgs args)
protected async Task ErrorHandler(ProcessErrorEventArgs args)
{
await ProcessErrorAsync(new ProcessErrorEventArgs()
await ProcessErrorAsync(new ConsumerErrorEventArgs()
{
Exception = args.Exception
Exception = args.Exception
});
}

private ProcessMessageEventArgs<TEntity> GetEventArgs(Message message)
private ConsumerMessageEventArgs<TEntity> GetEventArgs(ServiceBusReceivedMessage message)
{
var data = JsonSerializer.Deserialize<TEntity>(Encoding.UTF8.GetString(message.Body));

var headers = message.UserProperties;
var headers = (IDictionary<string,object>)message.ApplicationProperties;

return new ProcessMessageEventArgs<TEntity> { Data = data, Headers = headers };
return new ConsumerMessageEventArgs<TEntity> { Data = data, Headers = headers };
}

private Task ProcessError(ProcessErrorEventArgs args)
protected Task ProcessError(ConsumerErrorEventArgs args)

Check warning on line 88 in src/Liquid.Messaging.ServiceBus/ServiceBusConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

Missing XML comment for publicly visible type or member 'ServiceBusConsumer<TEntity>.ProcessError(ConsumerErrorEventArgs)'

Check warning on line 88 in src/Liquid.Messaging.ServiceBus/ServiceBusConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

Missing XML comment for publicly visible type or member 'ServiceBusConsumer<TEntity>.ProcessError(ConsumerErrorEventArgs)'

Check warning on line 88 in src/Liquid.Messaging.ServiceBus/ServiceBusConsumer.cs

View workflow job for this annotation

GitHub Actions / call-reusable-build-workflow / build

Missing XML comment for publicly visible type or member 'ServiceBusConsumer<TEntity>.ProcessError(ConsumerErrorEventArgs)'
{
throw new MessagingConsumerException(args.Exception);
}
Expand Down
Loading

0 comments on commit b8e0cbe

Please sign in to comment.