
Conversation

@cjohnson-confluent

* Ensure fetch responses always include non-nil RecordBatches so empty/error partitions encode as empty-but-present record sets, matching Kafka clients such as librdkafka.

* Fix race condition in client_conn response writing: the old code reused a single `buf` slice across the response loop and passed `buf[start:]` directly to goroutines for writing. This meant concurrent goroutines could read corrupted data if the main loop modified `buf` for the next response before the previous write completed. The fix copies `buf` into a goroutine-local `payload` slice before launching each write goroutine, ensuring each concurrent writer has an independent snapshot of the response data.

* Simplify client_conn request parsing and response framing for flexible vs non-flexible responses, and improve debug logging with request key/version.

* Guarantee group member assignments are always encoded as syntactically valid ConsumerMemberAssignment blobs (even when logically empty) via assignmentOrEmpty/ensureMemberAssignment, so followers and stable members can always decode assignments.

* Add groups_assignment_test to exercise completeLeaderSync and handleSync for leader and follower members, and seed_topics_test to verify that ProduceSync and Ping work against SeedTopics-created topics (a minimal sketch follows this list).

* Scope all changes to pkg/kfake and tests; no exported API or kgo client semantics are modified.
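
A minimal sketch of such a seed-topics test, assuming kfake's NewCluster/SeedTopics/ListenAddrs and kgo's Ping/ProduceSync APIs; the PR's actual tests may differ:

```go
package kfake_test

import (
	"context"
	"testing"

	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
)

// TestSeedTopicsSketch is an illustrative stand-in for the PR's test:
// seed a topic at cluster creation, then confirm the client can both
// ping the fake cluster and produce to the seeded topic.
func TestSeedTopicsSketch(t *testing.T) {
	c, err := kfake.NewCluster(kfake.SeedTopics(1, "seeded"))
	if err != nil {
		t.Fatal(err)
	}
	defer c.Close()

	cl, err := kgo.NewClient(kgo.SeedBrokers(c.ListenAddrs()...))
	if err != nil {
		t.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	if err := cl.Ping(ctx); err != nil {
		t.Fatalf("ping: %v", err)
	}
	rec := kgo.StringRecord("value")
	rec.Topic = "seeded"
	if err := cl.ProduceSync(ctx, rec).FirstErr(); err != nil {
		t.Fatalf("produce: %v", err)
	}
}
```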
twmb (Owner) commented Nov 21, 2025

> Guarantee group member assignments are always encoded as syntactically valid ConsumerMemberAssignment blobs (even when logically empty) via assignmentOrEmpty/ensureMemberAssignment, so followers and stable members can always decode assignments.

Can you explain a bit more about this one? I think the only way clients can experience problems here is if a leader is sending empty assignments for everybody else, and then everybody else is having deserialization problems.

I think that technically the patch is invalid if I use any protocol type besides consumer, and technically I can hijack the consumer protocol type and do whatever I want with it anyway with a custom implemented group balancer. I know the kafka broker itself has some special code for the consumer protocol type (search the Kafka repo under group-coordinator for any mention of .PROTOCOL_TYPE -- not mentioned in any KIP which is kewl). If we want to mirror some specialization into kfake to work around clients that are sending empty assignments, I think we should only do this if the protocol type == "consumer".
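
A sketch of that gating (hypothetical: this is not the PR's code, and the body of assignmentOrEmpty is assumed from its name):

```go
import "github.com/twmb/franz-go/pkg/kmsg"

// assignmentOrEmpty substitutes an empty-but-decodable
// ConsumerMemberAssignment only when the group speaks the well-known
// "consumer" protocol type; other protocol types pass through untouched.
func assignmentOrEmpty(protocolType string, assignment []byte) []byte {
	if len(assignment) > 0 || protocolType != "consumer" {
		return assignment
	}
	empty := kmsg.ConsumerMemberAssignment{Version: 0} // no topics, no user data
	return empty.AppendTo(nil)
}
```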

That said, I think this points to a client bug (although I guess the Kafka broker is more tolerant already in this case). How did you notice / where did you run into this?

```go
start := 0
l := len(buf) - 4
if !resp.kresp.IsFlexible() || resp.kresp.Key() == 18 {
	l--
```
twmb (Owner):
I think the reason I had the code ugly in the prior version was so that I could only do two appends, which at most would be two reallocs.

Your version is way better though. The choice for my version makes no sense anyway because the buf slice is reused for future writes, and this is a fake implementation that doesn't need to care about a single alloc anyway.
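
For context, a minimal sketch of the framing being discussed, assuming the key 18 special case exists because ApiVersions responses always use the non-flexible response header (so that older clients can parse them); names and structure here are illustrative, not kfake's:

```go
import "encoding/binary"

// frameResponse length-prefixes an encoded response body. Flexible
// response headers carry one extra byte (an empty tag buffer) after the
// correlation ID, except ApiVersions (key 18), whose response header is
// always the non-flexible v0 form.
func frameResponse(corr int32, flexibleHeader bool, body []byte) []byte {
	out := make([]byte, 4, 4+5+len(body)) // 4-byte length placeholder
	out = binary.BigEndian.AppendUint32(out, uint32(corr))
	if flexibleHeader {
		out = append(out, 0) // zero tagged fields
	}
	out = append(out, body...)
	binary.BigEndian.PutUint32(out[:4], uint32(len(out)-4))
	return out
}
```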

Comment on lines +164 to 167

```diff
+	payload := append([]byte(nil), buf...)
 	go func() {
-		_, err := cc.conn.Write(buf[start:])
+		_, err := cc.conn.Write(payload)
 		writeCh <- err
```
twmb (Owner):

This change is actually unnecessary. The select case below waits either for the write to be done, or for the cluster to be shut down. If the cluster is shut down, this write loop exits. In either case, there is no race on the buf data: it is used for writing only here. Only if writing completes (causing the select case to exit/continue) do we loop back to the top of the write loop and reuse the buffer.
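
A self-contained sketch of the loop shape being described (conn, responses, and die are stand-ins for kfake's client_conn fields; illustrative only):

```go
import "net"

// writeLoop demonstrates why reusing buf is safe: buf is only
// overwritten after the select observes the write completing, and on
// shutdown the loop returns without ever touching buf again.
func writeLoop(conn net.Conn, responses <-chan []byte, die <-chan struct{}) {
	writeCh := make(chan error, 1)
	var buf []byte
	for body := range responses {
		buf = append(buf[:0], body...) // buf reused across iterations
		go func() {
			_, err := conn.Write(buf)
			writeCh <- err
		}()
		select {
		case err := <-writeCh:
			if err != nil {
				return
			}
			// write done; safe to overwrite buf on the next iteration
		case <-die:
			return // loop exits; buf is never modified again, so no race
		}
	}
}
```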


```go
// TestSeedTopicsProduceSync_NoHang verifies that producing to a topic created
// via SeedTopics succeeds without hanging in the client metadata path.
func TestSeedTopicsProduceSync_NoHang(t *testing.T) {
```
twmb (Owner):
What's the purpose of this test?

```go
	// than null. A nil slice would be serialized as a length of -1 (or 0
	// in the compact-nullable form), which some clients (for example,
	// librdkafka) interpret as an invalid MessageSet size.
	sp.RecordBatches = make([]byte, 0)
```
twmb (Owner):

I'm okay to add this here, but I'm fairly sure that if librdkafka is failing on this, it's technically a librdkafka bug:

https://github.com/apache/kafka/blob/d04f171d3123a158a32ea65eb59600bc2f0fd628/generator/src/main/java/org/apache/kafka/message/FieldType.java#L292-L295

I do remember being surprised when implementing the serialization years ago that, technically, records could be nullable.
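
To make the nil-versus-empty distinction concrete, this is the standard (non-compact) nullable BYTES wire encoding; the helper name is illustrative:

```go
import "encoding/binary"

// encodeNullableBytes shows why the fetch fix matters: a nil slice
// encodes as length -1 (null), while an empty non-nil slice encodes as
// length 0, an empty-but-present record set that librdkafka accepts.
func encodeNullableBytes(b []byte) []byte {
	if b == nil {
		return binary.BigEndian.AppendUint32(nil, 0xffffffff) // int32 -1: null
	}
	out := binary.BigEndian.AppendUint32(nil, uint32(len(b)))
	return append(out, b...)
}
```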

```diff
 	resp.ProtocolType = kmsg.StringPtr(g.protocolType)
 	resp.Protocol = kmsg.StringPtr(g.protocol)
-	resp.MemberAssignment = m.assignment
+	resp.MemberAssignment = ensureMemberAssignment(m)
```
twmb (Owner):

Technically, with the changes in completeLeaderSync, this line here is unnecessary.
