reactive stream consumer : reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
#23
Open
patpatpat123 opened this issue
Jan 29, 2023
· 3 comments
I wanted to open an issue reporting an exception observed with the sample reactive stream consumer.
To avoid confusion: there is a Kafka producer/consumer, a stream producer/consumer, and a reactive stream producer/consumer. The issue is observed with the last of these.
I took the code as it is and only converted the Gradle build file to a pom.
Please correct me if I am wrong, but I do not believe I forgot anything while migrating to the pom.
package com.example.micrometer;

import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.warn("Remember about calling <.subscribe()> at the end of your Consumer<Flux> bean!");
        log.warn("Remember about finishing the span manually before calling subscribe!");
    }

    @Bean
    Consumer<Flux<Message<String>>> channel(Tracer tracer, ObservationRegistry observationRegistry) {
        return flux -> flux.doOnNext(msg -> log.info("<ACCEPTANCE_TEST> <TRACE:{}> Hello from consumer",
                tracer.currentSpan().context().traceId())).subscribe();
    }
}
When running the reactive stream consumer app, the app starts and runs fine.
However, upon consuming a message, this error happens:
2023-01-29T13:03:13.315+08:00 INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Found no committed offset for partition myinput-0
2023-01-29T13:03:13.322+08:00 INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Resetting offset for partition myinput-0 to position FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}.
2023-01-29T13:03:13.328+08:00 INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2 : anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4: partitions assigned: [myinput-0]
2023-01-29T13:05:18.316+08:00 INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'stream-reactive-consumer.channel-in-0' has 0 subscriber(s).
2023-01-29T13:05:18.317+08:00 ERROR [stream-reactive-consumer,,] 33308 --- [container-0-C-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
Caused by: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
at com.example.micrometer.StreamReactiveConsumerApplication.lambda$channel$0(StreamReactiveConsumerApplication.java:35) ~[classes/:na]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.SinkManyUnicastNoBackpressure.tryEmitNext(SinkManyUnicastNoBackpressure.java:120) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:141) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.3.jar:6.0.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.3.jar:6.0.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.3.jar:6.0.3]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.3.jar:6.0.3]
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.10.2.jar:1.10.2]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-2.0.0.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-2.0.0.jar:na]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2859) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2839) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2762) ~[spring-kafka-3.0.1.jar:3.0.1]
at io.micrometer.observation.Observation.observe(Observation.java:559) ~[micrometer-observation-1.10.2.jar:1.10.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2760) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2612) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2498) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2144) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1499) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1463) ~[spring-kafka-3.0.1.jar:3.0.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-3.0.1.jar:3.0.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2023-01-29T13:12:10.480+08:00 INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.
2023-01-29T13:12:11.485+08:00 INFO [stream-reactive-consumer,,] 33308 --- [pool-4-thread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-3, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.
This error is observed on the consumer for messages sent using either the reactive stream producer or the normal Kafka producer.
May I ask what I missed, please?
Thank you
Same error here. I am trying to use the tracer in a ChannelInterceptor:
import io.micrometer.tracing.Tracer;
String traceId = tracer.currentSpan().context().traceId();
String spanId = tracer.currentSpan().context().spanId();
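For reference, the NullPointerException in both reports comes from dereferencing the result of tracer.currentSpan() when no span is in scope on the calling thread. The following is only a minimal sketch of a null-safe lookup, not a fix for the missing span itself. The nested Tracer, Span and TraceContext interfaces are hypothetical stand-ins that mirror only the methods used in this thread, so the example runs without Micrometer on the classpath; the class name NullSafeTraceId, the helper currentTraceId and the "no-trace" placeholder are likewise assumptions made for illustration.

```java
import java.util.Optional;

public class NullSafeTraceId {

    // Hypothetical stand-ins for the Micrometer Tracing types used in this thread.
    // Only the methods called in the issue are mirrored here.
    interface TraceContext { String traceId(); }
    interface Span { TraceContext context(); }
    interface Tracer { Span currentSpan(); } // may return null when no span is in scope

    // Returns the current trace id, or a placeholder when there is no current span.
    static String currentTraceId(Tracer tracer) {
        return Optional.ofNullable(tracer.currentSpan())
                .map(Span::context)
                .map(TraceContext::traceId)
                .orElse("no-trace"); // assumed placeholder value
    }

    public static void main(String[] args) {
        // A tracer with no span in scope, as in the reported stack trace.
        Tracer noSpan = () -> null;

        // A tracer that does have a current span.
        TraceContext ctx = () -> "abc123";
        Span span = () -> ctx;
        Tracer withSpan = () -> span;

        System.out.println(currentTraceId(noSpan));   // prints "no-trace"
        System.out.println(currentTraceId(withSpan)); // prints "abc123"
    }
}
```

With the real Tracer, the same Optional chain could replace tracer.currentSpan().context().traceId() in the log statement. This would only stop the exception from being thrown; the log line would still carry no trace id until a span is actually present when the lambda runs.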
I changed the topic