KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. #15961

Open
wants to merge 8 commits into
base: trunk

appchemist
Contributor

@appchemist appchemist commented May 15, 2024

  • Add ErrorPropagateMetadataUpdater
    • A thin proxy that propagates errors through BackgroundEventHandler

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -2974,7 +2974,7 @@ public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));

- assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
+ assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ofMillis(100)));
Contributor

I'm guessing you had to change the poll timeout here to make the test pass? That's definitely a sign that we still have some issues. We should be able to keep the test as it was, passing with poll(0), as it does for the legacy consumer. I would suggest addressing my concern above, and then reviewing this to see whether something else makes the test fail with poll(0).

@lianetm
Contributor

lianetm commented May 15, 2024

Hey @appchemist , thanks a lot for taking this one! I left some comments, mainly concerns about the approach, this is definitely a tricky bit. Thanks!

@appchemist
Contributor Author

Hi @lianetm
Thanks for the review!

@appchemist appchemist requested a review from lianetm May 16, 2024 00:21
@appchemist appchemist changed the title KAFKA-16764: New consumer should throw InvalidTopicException on poll … [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata May 16, 2024
@philipnee
Collaborator

hey @appchemist - let us know if this is ready for review. thanks!

@appchemist
Contributor Author

@philipnee I got it. Thank you!

@appchemist appchemist changed the title [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. May 17, 2024
@appchemist
Contributor Author

@lianetm & @philipnee If you have a moment, please take a look.

@appchemist appchemist changed the title [Draft] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata May 22, 2024
@lianetm
Contributor

lianetm commented May 22, 2024

Hey @appchemist, thanks for the updates! sorry I'm caught up in other tasks but we discussed offline and @philipnee will follow-up here to help with reviews on this one. Thanks!

Contributor

@kirktrue kirktrue left a comment

Thanks for the PR, @appchemist!

I'm a bit concerned about the current approach because it touches a lot of low-level sections of the client network layer.

So my first, general question is, 'Is there a simpler way to handle this?'

Ultimately, we want the consumer to invoke Metadata.maybeThrowAnyException(), right?

NetworkClientDelegate is to the new consumer as ConsumerNetworkClient is to the old consumer. So could we update NetworkClientDelegate.poll() to include a call to maybeThrowAnyException() similar to how ConsumerNetworkClient.poll() invokes maybeThrowAnyException()?

Thanks!
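
For context, here is a minimal sketch of the deferred-error pattern this comment refers to: a metadata update only records the error, and a later poll rethrows it. `SketchMetadata` and `SketchNetworkClient` are hypothetical stand-ins, not the real `Metadata` and `ConsumerNetworkClient` classes, and the throw-once-then-clear behavior is an assumption made for the sketch.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the client metadata: it only remembers the last error.
class SketchMetadata {
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Called when a metadata response reports a problem; the error is stored, not thrown.
    void recordError(RuntimeException error) {
        pendingError.set(error);
    }

    // Throws the stored error once, then clears it (assumed semantics for this sketch).
    void maybeThrowAnyException() {
        RuntimeException error = pendingError.getAndSet(null);
        if (error != null) {
            throw error;
        }
    }
}

// Hypothetical stand-in for a poll loop that surfaces metadata errors directly to the caller.
class SketchNetworkClient {
    private final SketchMetadata metadata;

    SketchNetworkClient(SketchMetadata metadata) {
        this.metadata = metadata;
    }

    void poll() {
        // Network I/O would happen here; afterwards any stored metadata error is rethrown.
        metadata.maybeThrowAnyException();
    }
}
```

In this shape the caller of `poll()` sees the exception on the same thread, which is why the legacy consumer needs no extra propagation step.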

@appchemist
Contributor Author

@lianetm I got it, Thank you!
@kirktrue Right! I think what you suggested is simpler and more intuitive.

@appchemist
Contributor Author

kindly ping @philipnee

@lianetm
Contributor

lianetm commented May 28, 2024

Hey @appchemist , thanks for the updates! High level comment with the goal of trying to simplify if possible. I don't quite get the need for a new way of updating metadata (ErrorPropagateMetadataUpdater) when it seems to me that the way metadata is updated is not different for the new consumer vs the legacy one. What's different here (and what we need to sort out) is how the errors are propagated. Also, I noticed that the new component propagates the error on every update action, which is not the same timing we have in the old consumer, where metadata updates only save the errors to be thrown later. So what about keeping the metadata update logic as it was, and only changing the propagation logic?

  1. call a new maybeThrowMetadataErrors() in the NetworkClientDelegate poll()
  2. define maybeThrowMetadataErrors simply as :
    private void maybeThrowMetadataErrors() {
        try {
            metadata.maybeThrowAnyException();
        } catch (Exception e) {
            backgroundEventHandler.add(new ErrorEvent(e));
        }
    }

With this, we know that metadata updates will be applied in the same way for both consumer implementations (ConsumerNetworkClient and NetworkClientDelegate); in the end, that update is all about keeping an in-memory view of the metadata received in a MetadataResponse. Then the only difference is how the error is propagated (thrown directly in the legacy consumer vs propagated via events in the new one). Does this make sense?

Also I suggest we add a test in the NetworkClientDelegate, mocking metadata errors, and seeing how polling the network client should generate the ErrorEvent.
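
A rough sketch of what such a test could check, written against simplified stand-ins so it runs on its own. `FakeMetadata`, `FakeErrorEvent`, `FakeDelegate`, and `FakeDelegateCheck` are hypothetical names; a real test would presumably use the project's mocks for `Metadata` and `BackgroundEventHandler` instead.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event carrying an error from the background thread to the application thread.
class FakeErrorEvent {
    final RuntimeException error;

    FakeErrorEvent(RuntimeException error) {
        this.error = error;
    }
}

// Hypothetical metadata stand-in holding at most one pending error.
class FakeMetadata {
    private RuntimeException pending;

    synchronized void fail(RuntimeException error) {
        pending = error;
    }

    // Assumed semantics for this sketch: throw the pending error once, then clear it.
    synchronized void maybeThrowAnyException() {
        RuntimeException error = pending;
        pending = null;
        if (error != null) {
            throw error;
        }
    }
}

// Hypothetical delegate: poll() never throws metadata errors, it enqueues them as events.
class FakeDelegate {
    private final FakeMetadata metadata;
    private final BlockingQueue<FakeErrorEvent> backgroundEvents;

    FakeDelegate(FakeMetadata metadata, BlockingQueue<FakeErrorEvent> backgroundEvents) {
        this.metadata = metadata;
        this.backgroundEvents = backgroundEvents;
    }

    void poll() {
        try {
            metadata.maybeThrowAnyException();
        } catch (RuntimeException e) {
            backgroundEvents.add(new FakeErrorEvent(e));
        }
    }
}

class FakeDelegateCheck {
    // Returns true when a pending metadata error yields exactly one event carrying that error.
    static boolean pollProducesSingleErrorEvent() {
        FakeMetadata metadata = new FakeMetadata();
        BlockingQueue<FakeErrorEvent> events = new LinkedBlockingQueue<>();
        FakeDelegate delegate = new FakeDelegate(metadata, events);

        RuntimeException expected = new IllegalArgumentException("invalid topic");
        metadata.fail(expected);
        delegate.poll(); // the error should be converted into an event here
        delegate.poll(); // the error was cleared, so no second event is expected

        return events.size() == 1 && events.peek().error == expected;
    }
}
```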

Thanks!

@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
null,
new DefaultHostResolver(),
throttleTimeSensor,
- clientTelemetrySender);
+ clientTelemetrySender,
+ Function.identity());
Collaborator

i think we are going pretty deep to access the metadata error, i wonder if we could simplify the pattern.

we can probably directly access the metadata error in the background thread and send it to the client, in this case we don't need to create a separate wrapper around the updater.

wdyt if we just create a metadata error manager in the consumer network thread level and poll the metadata error directly?

Contributor Author

Are you suggesting creating a RequestManager for Metadata Errors?

In my understanding, the role of a RequestManager is to manage outgoing requests.
However, looking at MembershipManagerImpl, it doesn't seem like its role is necessarily limited to requests.
By adopting this approach, we could propagate metadata errors while maintaining the structure of the new consumer.
So I think it's a good idea.

Contributor Author

@philipnee I've applied it

@appchemist
Contributor Author

appchemist commented May 29, 2024

To summarize, there seem to be three main options for where metadata errors can be handled:

  • ConsumerNetworkThread level
  • NetworkClientDelegate level
  • KafkaClient level
    • I think we all agree that this location is too deep and low-level for handling metadata errors.

When handled by NetworkClientDelegate

For the legacy consumer, ConsumerNetworkClient.poll propagates the error.
For the new consumer, NetworkClientDelegate appears to provide high-level access to KafkaClient, like ConsumerNetworkClient.
From this perspective, handling metadata errors in NetworkClientDelegate.poll is analogous to the approach in the legacy consumer.
Since both consumers would then handle the error at the same level, this is the location people would expect.
I think that's what the people who suggested handling it here were focusing on.
What do you think? @philipnee

When handled by ConsumerNetworkThread

NetworkClientDelegate.poll is called inside ConsumerNetworkThread.runOnce, so handling the error there hides the intention of error propagation.
It also adds the responsibility of propagating metadata errors to the existing responsibility of interfacing with the NetworkClient.
ConsumerNetworkThread.runOnce, on the other hand, explicitly defines and runs the background thread's tasks.
So there is an opinion that we should create a metadata error manager at the ConsumerNetworkThread level.
What do you think? @lianetm & @kirktrue

My Opinion

My suggestion would be to add a RequestManager for metadata errors at the ConsumerNetworkThread level.
This keeps NetworkClientDelegate focused solely on interfacing with the network client, while errors are handled within ConsumerNetworkThread.

Please let me know if I've misunderstood anything
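
To make the ConsumerNetworkThread-level option concrete, here is a rough sketch under stated assumptions. None of these types are the real Kafka classes: `SketchManager`, `PendingErrorSource`, `SketchMetadataErrorManager`, and `SketchNetworkThread` are hypothetical names, and a plain list stands in for the background event queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical minimal manager contract: each manager gets a turn on every loop iteration.
interface SketchManager {
    void poll(long currentTimeMs);
}

// Hypothetical metadata view exposing a pending error, if any, and clearing it when read.
class PendingErrorSource {
    private RuntimeException pending;

    synchronized void set(RuntimeException error) {
        pending = error;
    }

    synchronized Optional<RuntimeException> take() {
        Optional<RuntimeException> result = Optional.ofNullable(pending);
        pending = null;
        return result;
    }
}

// Sketch of a manager whose only job is to forward metadata errors to the application thread.
class SketchMetadataErrorManager implements SketchManager {
    private final PendingErrorSource source;
    private final List<RuntimeException> forwarded; // stands in for the background event queue

    SketchMetadataErrorManager(PendingErrorSource source, List<RuntimeException> forwarded) {
        this.source = source;
        this.forwarded = forwarded;
    }

    @Override
    public void poll(long currentTimeMs) {
        source.take().ifPresent(forwarded::add);
    }
}

// Sketch of the thread loop body: error propagation is an explicit step among the managers.
class SketchNetworkThread {
    private final List<SketchManager> managers = new ArrayList<>();

    void register(SketchManager manager) {
        managers.add(manager);
    }

    void runOnce(long currentTimeMs) {
        for (SketchManager manager : managers) {
            manager.poll(currentTimeMs);
        }
    }
}
```

With this layout the network-facing component stays unaware of error propagation, and the forwarding step is visible wherever the thread's managers are registered.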

…when invalid topic in metadata

- add MetadataUpdaterProxy
- propagate metadata errors from background thread to foreground thread
…when invalid topic in metadata

- cleaning code
- add unittest
…when invalid topic in metadata

- add assert statement
…when invalid topic in metadata

- revert changes
…when invalid topic in metadata

- add MetadataErrorManager
…when invalid topic in metadata

- add ConsumerNetworkThreadTest cases
@appchemist appchemist changed the title KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. May 31, 2024
4 participants