diff --git a/src/Liquid.Messaging.Kafka/KafkaConsumer.cs b/src/Liquid.Messaging.Kafka/KafkaConsumer.cs index 52e5e1c..cbd6ddb 100644 --- a/src/Liquid.Messaging.Kafka/KafkaConsumer.cs +++ b/src/Liquid.Messaging.Kafka/KafkaConsumer.cs @@ -46,6 +46,8 @@ public Task RegisterMessageHandler(CancellationToken cancellationToken = default throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class."); } + ProcessErrorAsync += ProcessError; + _client = _factory.GetConsumer(_settings); @@ -72,7 +74,7 @@ protected async Task MessageHandler(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { - var deliverEvent = _client.Consume(); + var deliverEvent = _client.Consume(cancellationToken); _ = Task.Run(async () => { @@ -86,9 +88,7 @@ protected async Task MessageHandler(CancellationToken cancellationToken) } } catch (Exception ex) - { - _client.Close(); - + { var errorArgs = new ConsumerErrorEventArgs { Exception = ex, @@ -118,5 +118,11 @@ private ConsumerMessageEventArgs GetEventArgs(ConsumeResult { Data = data, Headers = headers }; } + + private Task ProcessError(ConsumerErrorEventArgs args) + { + _client.Close(); + throw args.Exception; + } } } \ No newline at end of file diff --git a/src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj b/src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj index 7febf85..9c505e9 100644 --- a/src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj +++ b/src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj @@ -10,7 +10,7 @@ Avanade 2019 https://github.com/Avanade/Liquid-Application-Framework logo.png - 8.0.0-beta-04 + 8.0.0-beta-05 true The Liquid.Messaging.Kafka provides producer and consumer patterns to allow the send and consumption of Messaging inside your microservice. @@ -28,10 +28,7 @@ + - - - -