-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. #15961
base: trunk
Are you sure you want to change the base?
Conversation
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
@@ -2974,7 +2974,7 @@ public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) { | |||
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId); | |||
consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer)); | |||
|
|||
assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO)); | |||
assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ofMillis(100))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guessing you had to change the poll timeout here to make the test pass? Definitely a sign that we have some issues still. We should be able to keep the test as it was, passing with poll(0), as it does for the legacy consumer. I would suggest to address my concern above, and then review this to know if there is something else making the test fail with poll(0)
Hey @appchemist , thanks a lot for taking this one! I left some comments, mainly concerns about the approach, this is definitely a tricky bit. Thanks! |
Hi @lianetm |
hey @appchemist - let us know if this is ready for review. thanks! |
@philipnee I got it. Thank you! |
@lianetm & @philipnee If you have a moment, Please take a look |
Hey @appchemist, thanks for the updates! sorry I'm caught up in other tasks but we discussed offline and @philipnee will follow-up here to help with reviews on this one. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @appchemist!
I'm a bit concerned about the current approach because it touches a lot of low-level sections of the client network layer.
So my first, general question is, 'Is there a simpler way to handle this?'
Ultimately, we want the consumer to invoke Metadata.maybeThrowAnyException()
, right?
NetworkClientDelegate
is to the new consumer as ConsumerNetworkClient
is to the old consumer. So could we update NetworkClientDelegate.poll()
to include a call to maybeThrowAnyException()
similar to how ConsumerNetworkClient.poll()
invokes maybeThrowAnyException()
?
Thanks!
kindly ping @philipnee |
Hey @appchemist , thanks for the updates! High level comment with the goal of trying to simplify if possible. I don't quite get the need for a new way of updating metadata (
With this, we know that metadata updates will be applied in the same way for both consumer impl (ConsumerNetworkClient and NetworkClientDelegate), in the end, that update is all about keeping an in-memory view of the metadata received in a MetadataResponse. Then the only difference is how the error is propagated (directly thrown in the legacy consumer vs propagated via events in the new one). Does this make sense? Also I suggest we add a test in the Thanks! |
@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |||
null, | |||
new DefaultHostResolver(), | |||
throttleTimeSensor, | |||
clientTelemetrySender); | |||
clientTelemetrySender, | |||
Function.identity()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we are going pretty deep to access the metadata error, i wonder if we could simplify the pattern.
we can probably directly access the metadata error in the background thread and send it to the client, in this case we don't need to create a separated wrapper around the updater.
wdyt if we just create a metadata error manager in the consumer network thread level and poll the metadata error directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting creating a RequestManager for Metadata Errors?
In my understanding, the role of a RequestManager is to manage outgoing requests.
However, looking at the MembershipManagerImpl, It doesn't seem like its role is necessarily limited to requests.
By adopting this approach, we could propagate Metadata Error while maintaining the structure of the new Consumer.
So I think it's good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee I've applied it
To summarize, the locations where Metadata Errors can be handled, it seems like three main options exist:
When handled by
|
…when invalid topic in metadata
…when invalid topic in metadata - add MetadataUpdaterProxy - propagate metadata errors from background thread to foreground thread
…when invalid topic in metadata - cleaning code - add unittest
…when invalid topic in metadata - add assert statement
…when invalid topic in metadata - fix test
…when invalid topic in metadata - revert changes
…when invalid topic in metadata - add MetadataErrorManager
…when invalid topic in metadata - add ConsumerNetworkThreadTest cases
Committer Checklist (excluded from commit message)