kfake: harden fetch and group assignment encoding #1187
base: master
Conversation
* Ensure fetch responses always include a non-nil `RecordBatches` so empty/error partitions encode as empty-but-present record sets, matching Kafka clients such as librdkafka.
* Fix a race condition in `client_conn` response writing: the old code reused a single `buf` slice across the response loop and passed `buf[start:]` directly to goroutines for writing. This meant concurrent goroutines could read corrupted data if the main loop modified `buf` for the next response before the previous write completed. The fix copies `buf` into a goroutine-local `payload` slice before launching each write goroutine, ensuring each concurrent writer has an independent snapshot of the response data.
* Simplify `client_conn` request parsing and response framing for flexible vs. non-flexible responses, and improve debug logging with the request key/version.
* Guarantee group member assignments are always encoded as syntactically valid `ConsumerMemberAssignment` blobs (even when logically empty) via `assignmentOrEmpty`/`ensureMemberAssignment`, so followers and stable members can always decode assignments.
* Add `groups_assignment_test` to exercise `completeLeaderSync` and `handleSync` for leader/follower members, and `seed_topics_test` to verify that topics created via `SeedTopics` work with `ProduceSync`/`Ping`.
* Scope all changes to `pkg/kfake` and tests; no exported API or kgo client semantics are modified.
Can you explain a bit more about this one? I think the only way clients can experience problems here is if a leader is sending empty assignments for everybody else, and then everybody else is having deserialization problems. I think that technically the patch is invalid if I use any protocol type besides That said, I think this points to a client bug (although I guess the Kafka broker is more tolerant already in this case). How did you notice / where did you run into this? |
```go
start := 0
l := len(buf) - 4
if !resp.kresp.IsFlexible() || resp.kresp.Key() == 18 {
	l--
}
```
I think the reason the code was ugly in the prior version was so that I could do only two appends, which at most would be two reallocs.
Your version is way better, though. The choice in my version makes no sense anyway, because the buf slice is reused for future writes, and this is a fake implementation that doesn't need to care about a single alloc anyway.
```diff
+	payload := append([]byte(nil), buf...)
 	go func() {
-		_, err := cc.conn.Write(buf[start:])
+		_, err := cc.conn.Write(payload)
 		writeCh <- err
```
This change is actually unnecessary. The select case below waits for either the write to be done, OR for the cluster to be shut down. If the cluster is shut down, this write loop exits. In either case, there is no race on the buf data: it's used for writing, only here. Only if writing completes (causing the select case to exit/continue) do we loop back to the top of the write loop and reuse the buffer.
```go
// TestSeedTopicsProduceSync_NoHang verifies that producing to a topic created
// via SeedTopics succeeds without hanging in the client metadata path.
func TestSeedTopicsProduceSync_NoHang(t *testing.T) {
```
What's the purpose of this test?
```go
// than null. A nil slice would be serialized as a length of -1 (or 0
// in the compact-nullable form), which some clients (for example,
// librdkafka) interpret as an invalid MessageSet size.
sp.RecordBatches = make([]byte, 0)
```
I'm okay to add this here, but I'm fairly sure that if librdkafka is failing on this, it's technically a librdkafka bug:
I do remember being surprised when implementing the serializing years ago that, technically, records could be nullable.
```diff
 	resp.ProtocolType = kmsg.StringPtr(g.protocolType)
 	resp.Protocol = kmsg.StringPtr(g.protocol)
-	resp.MemberAssignment = m.assignment
+	resp.MemberAssignment = ensureMemberAssignment(m)
```
Technically, with the changes in completeLeaderSync, this line here is unnecessary.