Skip to content

Commit

Permalink
Merge pull request #263 from Avanade/feat/newcartridges
Browse files Browse the repository at this point in the history
wip(KafkaConsumer.cs): includes error handler.
  • Loading branch information
lucianareginalino authored Jul 29, 2024
2 parents 42ebdd0 + f1156fd commit ec097fa
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
14 changes: 10 additions & 4 deletions src/Liquid.Messaging.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public Task RegisterMessageHandler(CancellationToken cancellationToken = default
throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class.");
}

ProcessErrorAsync += ProcessError;

_client = _factory.GetConsumer(_settings);


Expand All @@ -72,7 +74,7 @@ protected async Task MessageHandler(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var deliverEvent = _client.Consume();
var deliverEvent = _client.Consume(cancellationToken);

_ = Task.Run(async () =>
{
Expand All @@ -86,9 +88,7 @@ protected async Task MessageHandler(CancellationToken cancellationToken)
}
}
catch (Exception ex)
{
_client.Close();
{
var errorArgs = new ConsumerErrorEventArgs
{
Exception = ex,
Expand Down Expand Up @@ -118,5 +118,11 @@ private ConsumerMessageEventArgs<TEntity> GetEventArgs(ConsumeResult<Ignore, str

return new ConsumerMessageEventArgs<TEntity> { Data = data, Headers = headers };
}

private Task ProcessError(ConsumerErrorEventArgs args)
{
_client.Close();
throw args.Exception;
}
}
}
7 changes: 2 additions & 5 deletions src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<Copyright>Avanade 2019</Copyright>
<PackageProjectUrl>https://github.com/Avanade/Liquid-Application-Framework</PackageProjectUrl>
<PackageIcon>logo.png</PackageIcon>
<Version>8.0.0-beta-04</Version>
<Version>8.0.0-beta-05</Version>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Description>
The Liquid.Messaging.Kafka provides producer and consumer patterns to allow the send and consumption of Messaging inside your microservice.
Expand All @@ -28,10 +28,7 @@

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="Liquid.Core" Version="8.0.0-beta-09" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Liquid.Core\Liquid.Core.csproj" />
</ItemGroup>
</Project>

0 comments on commit ec097fa

Please sign in to comment.