KAFKA-10551: Add topic id support to produce request and response #15968
base: trunk
Conversation
Force-pushed from 1805c4f to a8f0c91 (compare)
A few of the failed tests are related to this change, and I am working on fixing them.
Force-pushed from 63a6032 to 8c3602b (compare)
I believe the remaining failed tests are unrelated.
My understanding is that all requests going forward will use the topic ID and not the name, similar to the fetch request. I believe that is what this PR does, but the comment suggests otherwise.
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
TopicPartition tp = new TopicPartition(r.name(), p.index());
// Version 12 drops the topic name and adds support for topic IDs. However, metadata can be used to map a topic ID back to its topic name.
String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();
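For readers skimming the diff: a minimal, self-contained sketch of the fallback above, assuming the metadata cache is exposed as a plain `Map<Uuid, String>` (the class, method, and parameter names here are illustrative, not the PR's actual code):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

final class TopicNameResolution {
    // v12+ produce responses carry a topic ID instead of a topic name, so the
    // client must map the ID back to a name using its metadata cache.
    static TopicPartition resolve(String name, Uuid topicId, int partitionIndex,
                                  Map<Uuid, String> topicIdToName) {
        String topicName = (name == null || name.isEmpty())
                ? topicIdToName.get(topicId)   // may be null if metadata has refreshed
                : name;                        // pre-v12: the name is present
        return new TopicPartition(topicName, partitionIndex);
    }
}
```

The null case in that lookup is exactly the gap raised in the review comment below.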
What do we do if the metadata has refreshed and the topic is no longer in the metadata?
For fetch it is a bit different since we have the session logic, and can handle missing topics.
I would recommend writing through a few cases where the server and client have/don't have the topic ID to reason about the upgrade case/downgrade case/deletions/reassignments.
- If the topic has been recreated and the topic ID is out of date, the client will get `UNKNOWN_TOPIC_ID`, and the topic ID will be updated on the retry.
- If the topic has been reassigned to another broker, the client will get `NOT_LEADER_OR_FOLLOWER` and can then retry against the right broker.
- I am not sure what upgrade/downgrade case you refer to here. Do you mean the client and broker IBP combination? If so, some of these are covered in `ProduceRequestTest` and `RequestResponseTest`.

I added two test cases to cover the first two, and the producer seems to self-recover on retry (sketched below).
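A hedged sketch of that self-recovery path; `requestMetadataUpdate()` and `reenqueueBatch()` are placeholder names, not the producer's real internals:

```java
import org.apache.kafka.common.protocol.Errors;

final class RetrySketch {
    // Both error cases above are retriable after a metadata refresh: a stale
    // topic ID (recreated topic) or a moved leader (reassignment) is corrected
    // by the next metadata response, and the retried batch then succeeds.
    void handlePartitionError(Errors error) {
        if (error == Errors.UNKNOWN_TOPIC_ID || error == Errors.NOT_LEADER_OR_FOLLOWER) {
            requestMetadataUpdate();
            reenqueueBatch();
        }
    }

    void requestMetadataUpdate() { /* placeholder */ }
    void reenqueueBatch() { /* placeholder */ }
}
```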
Yes. For the fetch request, for example, there is code to make sure that all topics have IDs before we can send the fetch request. This is a bit less of an issue now, but if we have a cluster running on an MV < 2.8, topics will not have IDs. So when we decide which version of produce to send, we need to be aware of this.
Not only that, but even if the broker supports topic IDs on all topics, we may also have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce, since it is a client-only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure API compatibility.
We just want to confirm these upgrade paths are compatible, since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade.
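A sketch of the kind of version gating described above, in the spirit of what the fetch path does; `Uuid.ZERO_UUID` is the sentinel for a missing topic ID, and the class and helper names are hypothetical:

```java
import java.util.Collection;

import org.apache.kafka.common.Uuid;

final class ProduceVersionGate {
    // If any topic in the batch has no ID (e.g. the cluster is on MV < 2.8),
    // the client must fall back to an older, name-based produce version.
    static boolean canUseTopicIds(Collection<Uuid> topicIds) {
        return topicIds.stream()
                .noneMatch(id -> id == null || Uuid.ZERO_UUID.equals(id));
    }
}
```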
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (outdated review thread, resolved)
@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def appendToLocalLog(internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
Is there a reason to pass this data structure here if we are not using the ID to check the append at the log level?
Two reasons here:
- I didn't want to keep converting between `TopicIdPartition` and `TopicPartition` (see the snippet below).
- KAFKA-16212 will eventually use `TopicIdPartition` in `getPartitionOrException`.
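For context, `TopicIdPartition` already carries both views, which is what makes passing it through cheaper than converting back and forth (a small illustrative snippet, not PR code):

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

final class TopicIdPartitionDemo {
    public static void main(String[] args) {
        // One object holds the topic ID and the name/partition pair together.
        TopicIdPartition tidp =
                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0));
        TopicPartition tp = tidp.topicPartition(); // name-based view, when needed
        Uuid id = tidp.topicId();                  // ID view, for future lookups
        System.out.println(tp + " / " + id);
    }
}
```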
Ok -- once we start using these across the log layer it makes sense.
I would recommend taking a look at where we are passing the topic ID through and the checks we do. If we think it is useful to ensure we are writing to the right topic, we should do it, but if it is just adding complexity, we may want to consider changing it.
I meant that in the JSON files both will be marked ignorable.
Force-pushed from 8daeb62 to 1abc2ac (compare)
Force-pushed from 1abc2ac to 35dba4b (compare)
- `ProduceRequest`/`ProduceResponse`: topic name and topic ID will become `ignorable`, following the footsteps of `FetchRequest`/`FetchResponse`.
- `HostedPartition` uses `TopicPartition` and doesn't check the topic ID. This is an [OPEN QUESTION]: should we address it in this PR, or wait for KAFKA-16212, which will update `ReplicaManager::getPartition` to use `TopicIdPartition` once we update the cache? The other option is to compare the provided `topicId` with the `Partition`'s topic ID and return `UNKNOWN_TOPIC_ID` or `UNKNOWN_TOPIC_OR_PARTITION` if we can't find a partition with a matching topic ID (see the sketch below).