KAFKA-10551: Add topic id support to produce request and response #15968

Open
wants to merge 9 commits into
base: trunk
from

Conversation

OmniaGM
Contributor

@OmniaGM OmniaGM commented May 15, 2024

  • Add topic ID support to ProduceRequest/ProduceResponse. Topic name and topic ID will both become ignorable, following in the footsteps of FetchRequest/FetchResponse.
  • ReplicaManager still looks up the HostedPartition using TopicPartition and doesn't check the topic ID. [OPEN QUESTION]: should we address this in this PR, or wait for KAFKA-16212, which will update ReplicaManager::getPartition to use TopicIdPartition once we update the cache? The other option is to compare the provided topicId with the Partition's topic ID and return UNKNOWN_TOPIC_ID or UNKNOWN_TOPIC_OR_PARTITION if we can't find a partition with a matching topic ID.
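
A minimal sketch of the second option above (comparing the provided topic ID with the hosted partition's topic ID). Everything here is a hypothetical stand-in: `TopicIdCheck`, its `Result` enum, and `java.util.UUID` in place of Kafka's `Uuid`. Which error is returned in which case is an assumption for illustration, not something this PR has settled.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration only; this is not ReplicaManager code.
final class TopicIdCheck {
    // Names mirror Kafka error codes, but this enum is local to the sketch.
    enum Result { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION }

    // Assumption: an all-zero ID means "the request carried no topic ID"
    // (for example, an older request version that identifies topics by name).
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // hostedTopicIds maps topic name -> topic ID of the partition hosted on this broker.
    static Result check(Map<String, UUID> hostedTopicIds, String topicName, UUID requestTopicId) {
        UUID hosted = hostedTopicIds.get(topicName);
        if (hosted == null) {
            return Result.UNKNOWN_TOPIC_OR_PARTITION; // nothing hosted under that name
        }
        if (ZERO_ID.equals(requestTopicId)) {
            return Result.NONE; // no ID supplied, so there is nothing to compare
        }
        // A mismatch suggests the topic was deleted and recreated under the same name.
        return hosted.equals(requestTopicId) ? Result.NONE : Result.UNKNOWN_TOPIC_ID;
    }
}
```

With this shape, a stale ID from a recreated topic is rejected before the append, while name-only requests keep working.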

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@OmniaGM OmniaGM marked this pull request as draft May 15, 2024 15:53
@OmniaGM OmniaGM force-pushed the KAFKA-10551-produceRequest branch from 1805c4f to a8f0c91 Compare May 15, 2024 22:05
@OmniaGM OmniaGM marked this pull request as ready for review May 15, 2024 22:58
@OmniaGM OmniaGM changed the title Kafka-10551: Add topic id support to produce request and response KAFKA-10551: Add topic id support to produce request and response May 15, 2024
@OmniaGM
Contributor Author

OmniaGM commented May 16, 2024

A few of the failed tests are related to this change, and I am working on fixing them.

@OmniaGM OmniaGM force-pushed the KAFKA-10551-produceRequest branch from 63a6032 to 8c3602b Compare May 16, 2024 15:06
@OmniaGM
Contributor Author

OmniaGM commented May 20, 2024

> A few of the failed tests are related to this change, and I am working on fixing them.

I believe the tests that are failing now are unrelated.

@jolshan
Contributor

jolshan commented May 20, 2024

> Topic name and Topic Id will become optional following the footstep of FetchRequest/FetchResponse

My understanding is that all requests going forward will use ID and not name similar to fetch request. I believe that is what is in the PR, but the comment suggests otherwise.

@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
  // This will be set by completeBatch.
  Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
  produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
- TopicPartition tp = new TopicPartition(r.name(), p.index());
+ // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
+ String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();
Contributor

What do we do if the metadata has been refreshed and the topic is no longer in it?
For fetch it is a bit different, since we have the session logic and can handle missing topics.

I would recommend writing out a few cases where the server and client have or don't have the topic ID, to reason about the upgrade case, the downgrade case, deletions, and reassignments.
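
To make the concern concrete: in the diff above, the metadata lookup yields `null` when the ID is no longer in the client's metadata. A minimal sketch that surfaces the missing case explicitly, assuming a hypothetical helper `TopicNameResolver` and `java.util.UUID` standing in for Kafka's `Uuid`; what the caller should do with an empty result is exactly the open question here.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper; not part of the Kafka client.
final class TopicNameResolver {
    // Prefer the name carried in the response (older versions); otherwise
    // fall back to the client's ID -> name metadata view.
    static Optional<String> resolve(String nameFromResponse, UUID topicId, Map<UUID, String> topicNamesById) {
        if (nameFromResponse != null && !nameFromResponse.isEmpty()) {
            return Optional.of(nameFromResponse);
        }
        // Empty when the metadata was refreshed and the ID is gone,
        // e.g. after the topic was deleted or recreated.
        return Optional.ofNullable(topicNamesById.get(topicId));
    }
}
```

Returning an `Optional` forces the response handler to decide what happens to the in-flight batch instead of building a `TopicPartition` with a null topic name.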

Contributor Author

@OmniaGM OmniaGM May 28, 2024

  • If the topic has been recreated and the topic ID is out of date, the client will get UNKNOWN_TOPIC_ID, and the topic ID will be updated on retry.
  • If the topic has been reassigned to another broker, the client will get NOT_LEADER_OR_FOLLOWER and can then retry with the right broker.
  • I am not sure which upgrade/downgrade case you are referring to here. Do you mean the client and broker IBP combination? If so, some of these are covered in ProduceRequestTest and RequestResponseTest.

I added two test cases to cover the first two, and the producer seems to self-recover on retry.
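
The recovery behaviour described in the first two cases can be sketched roughly as follows. `ProduceRetryPolicy` and its enums are hypothetical names for illustration; the real producer's retry handling is more involved, and the retry budget shown here is an assumption.

```java
// Hypothetical illustration of the two recovery cases; not the producer's actual code.
final class ProduceRetryPolicy {
    enum Error { NONE, UNKNOWN_TOPIC_ID, NOT_LEADER_OR_FOLLOWER, OTHER }
    enum Action { COMPLETE, REFRESH_METADATA_AND_RETRY, FAIL }

    static Action actionFor(Error error, int attemptsLeft) {
        switch (error) {
            case NONE:
                return Action.COMPLETE;
            case UNKNOWN_TOPIC_ID:        // topic recreated: refreshed metadata carries the new ID
            case NOT_LEADER_OR_FOLLOWER:  // partition moved: refreshed metadata carries the new leader
                return attemptsLeft > 0 ? Action.REFRESH_METADATA_AND_RETRY : Action.FAIL;
            default:
                return Action.FAIL;       // assumption: everything else is treated as fatal in this sketch
        }
    }
}
```

In both recoverable cases the fix is the same: refresh metadata, then resend with the updated topic ID or to the updated leader.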

Contributor

@jolshan jolshan May 31, 2024

Yes. For the fetch request, for example, there is code to make sure that all topics have IDs before we can send the request. This is a bit less of an issue now, but if we have a cluster that is running on an MV < 2.8, topics will not have IDs. So when we decide which version of produce we want to send, we want to be aware of this.

Not only that, but even if the broker supports topic IDs on all topics, we may also have a case where we need to do a rolling upgrade to get the code that supports handling the latest API version. This may be less complicated for Produce, since it is a client-only API and doesn't rely on MV/IBP, so the apiVersions exchange between the client and the broker may be enough to ensure API compatibility.

We just want to confirm these upgrade paths are compatible since produce is the hot path and we don't want any (or at least not extended) downtime in the middle of an upgrade.
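
One way to picture the version decision described above, as a rough sketch only. `ProduceVersionChooser` is a hypothetical name, `java.util.UUID` stands in for Kafka's `Uuid`, and the threshold of 12 is taken from the code comment in this PR's diff; the fallback rule itself is an assumption modelled on the fetch behaviour mentioned here.

```java
import java.util.Collection;
import java.util.UUID;

// Hypothetical illustration; not the client's actual version negotiation.
final class ProduceVersionChooser {
    // First produce version that identifies topics by ID, per the comment in this PR.
    static final short FIRST_TOPIC_ID_VERSION = 12;
    // Assumption: an all-zero ID means the topic has no ID (e.g. cluster on an old MV).
    static final UUID ZERO_ID = new UUID(0L, 0L);

    static short choose(short brokerMaxVersion, short clientMaxVersion, Collection<UUID> topicIds) {
        // Start from the highest version both sides support (the apiVersions exchange).
        short negotiated = (short) Math.min(brokerMaxVersion, clientMaxVersion);
        if (negotiated < FIRST_TOPIC_ID_VERSION) {
            return negotiated; // name-based versions need no topic IDs
        }
        boolean allHaveIds = topicIds.stream().noneMatch(id -> id == null || ZERO_ID.equals(id));
        // Fall back to the last name-based version if any topic lacks an ID.
        return allHaveIds ? negotiated : (short) (FIRST_TOPIC_ID_VERSION - 1);
    }
}
```

During a rolling upgrade, brokers that have not yet been upgraded advertise a lower maximum version, so the client keeps sending name-based requests to them.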

@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
  */
  private def appendToLocalLog(internalTopicsAllowed: Boolean,
  origin: AppendOrigin,
- entriesPerPartition: Map[TopicPartition, MemoryRecords],
+ entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
Contributor

Is there a reason to pass this data structure here if we are not using the ID to check the append at the log level?

Contributor Author

Two reasons:

  1. I didn't want to keep converting between TopicIdPartition and TopicPartition.
  2. KAFKA-16212 will eventually pass TopicIdPartition to getPartitionOrException.

Contributor

Ok -- once we start using these across the log layer it makes sense.

@jolshan
Contributor

jolshan commented May 20, 2024

I would recommend taking a look at where we are passing the topic ID through and the checks we do. If we think it is useful to ensure we are writing to the right topic, we should do it, but if it is just adding complexity, we may want to consider changing.

@OmniaGM
Contributor Author

OmniaGM commented May 22, 2024

> > Topic name and Topic Id will become optional following the footstep of FetchRequest/FetchResponse
>
> My understanding is that all requests going forward will use ID and not name similar to fetch request. I believe that is what is in the PR, but the comment suggests otherwise.

I meant that in the JSON files both fields will be marked ignorable.

@OmniaGM OmniaGM force-pushed the KAFKA-10551-produceRequest branch from 8daeb62 to 1abc2ac Compare May 28, 2024 15:43
@OmniaGM OmniaGM force-pushed the KAFKA-10551-produceRequest branch from 1abc2ac to 35dba4b Compare May 28, 2024 16:02