Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reactive stream consumer : reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null #23

Open
patpatpat123 opened this issue Jan 29, 2023 · 3 comments

Comments

@patpatpat123
Copy link

patpatpat123 commented Jan 29, 2023

Hello team,

Just wanted to start an issue reporting an exception observed for the sample reactive stream consumer.

Just to avoid confusion, there is a kafka producer/consumer, there is a stream producer/consumer, there is a reactive stream producer/consumer. The issue is observed with the later.

I just took the code as it is, and just modified the grade file to a pom.
Please correct me if I am wrong, but I do not believe to have forgotten something while migrating to the pom.

package com.example.micrometer;

import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.warn("Remember about calling <.subscribe()> at the end of your Consumer<Flux> bean!");
        log.warn("Remember about finishing the span manually before calling subscribe!");
    }

    @Bean
    Consumer<Flux<Message<String>>> channel(Tracer tracer, ObservationRegistry observationRegistry) {
        return flux -> flux.doOnNext(msg -> log.info("<ACCEPTANCE_TEST> <TRACE:{}> Hello from consumer",
                tracer.currentSpan().context().traceId())).subscribe();
    }

}

I changed the topic

spring:
  application:
    name: stream-reactive-consumer
  cloud:
    stream:
      bindings:
        channel-in-0.destination: myinput

# For tests
logging.pattern.level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

When running the reactive streaming consumer app, the app starts and run fine.

However, upon consuming a message, this error happens:

2023-01-29T13:03:13.315+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Found no committed offset for partition myinput-0
2023-01-29T13:03:13.322+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Resetting offset for partition myinput-0 to position FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 1 rack: null)], epoch=0}}.
2023-01-29T13:03:13.328+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4: partitions assigned: [myinput-0]
2023-01-29T13:05:18.316+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'stream-reactive-consumer.channel-in-0' has 0 subscriber(s).
2023-01-29T13:05:18.317+08:00 ERROR [stream-reactive-consumer,,] 33308 --- [container-0-C-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
Caused by: java.lang.NullPointerException: Cannot invoke "io.micrometer.tracing.Span.context()" because the return value of "io.micrometer.tracing.Tracer.currentSpan()" is null
	at com.example.micrometer.StreamReactiveConsumerApplication.lambda$channel$0(StreamReactiveConsumerApplication.java:35) ~[classes/:na]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.SinkManyUnicastNoBackpressure.tryEmitNext(SinkManyUnicastNoBackpressure.java:120) ~[reactor-core-3.5.1.jar:3.5.1]
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.5.1.jar:3.5.1]
	at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:141) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.3.jar:6.0.3]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.3.jar:6.0.3]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.3.jar:6.0.3]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.3.jar:6.0.3]
	at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.10.2.jar:1.10.2]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.1.jar:6.0.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-2.0.0.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-2.0.0.jar:na]
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425) ~[spring-integration-kafka-6.0.1.jar:6.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2859) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2839) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2762) ~[spring-kafka-3.0.1.jar:3.0.1]
	at io.micrometer.observation.Observation.observe(Observation.java:559) ~[micrometer-observation-1.10.2.jar:1.10.2]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2760) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2612) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2498) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2144) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1499) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1463) ~[spring-kafka-3.0.1.jar:3.0.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1338) ~[spring-kafka-3.0.1.jar:3.0.1]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-01-29T13:12:10.480+08:00  INFO [stream-reactive-consumer,,] 33308 --- [container-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-2, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.
2023-01-29T13:12:11.485+08:00  INFO [stream-reactive-consumer,,] 33308 --- [pool-4-thread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4-3, groupId=anonymous.3f50a5b5-9e4f-475f-b375-28a78f0310e4] Node -1 disconnected.

This error is observed on the consumer, for message sent using the reactive steam producer, or the normal kafka producer.
May I ask what did I miss please?

Thank you

@cmergenthaler
Copy link

Any updates on that? I have the exact same issue

@cmergenthaler
Copy link

FYI: I had to enable tracing for spring-kafka in order to fix the error

@birbirsbirbirs
Copy link

same error. I am trying to use in ChannelInterceptor.
import io.micrometer.tracing.Tracer; String traceId = tracer.currentSpan().context().traceId(); String spanId = tracer.currentSpan().context().spanId();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants