-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 #15945
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for finding this and raising this! I've a question around onMetadataUpdate
implementation in BrokerRegistrationTracker
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
Outdated
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
Show resolved
Hide resolved
metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
Show resolved
Hide resolved
if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) && | ||
registration.directories().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)
evaluates to false
, then isn't registration.directories().isEmpty()
always true
?
The comment above mentions the directory list changing as a way to get here, but the only ways the directory list can change are:
- On a broker registration that includes dirs
- When a broker that has previously registered with more than one dir turns one of them offline
Both of these require that the broker has registered at least once with directories, which can only happen after MetadataVersion.IBP_3_7_IV2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check the broker configuration? If the broker isn't configured with multiple log directories there's no need to re-register. Perhaps we shouldn't even setup this metadata listener if there's only one configured log dir.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) evaluates to false, then isn't registration.directories().isEmpty() always true?
No. We can get to this point in the code just because of a metadata version change. For example, from 3.7-IV1 to 3.7-IV2. In that case registration.directories()
would not be empty and we would not want to do anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check the broker configuration? If the broker isn't configured with multiple log directories there's no need to re-register. Perhaps we shouldn't even setup this metadata listener if there's only one configured log dir.
broker IDs do have benefits for non-JBOD brokers. One benefit is knowing when a directory has been re-initialized. In any case, the impact of resending the registration in this specific case is pretty small.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we add directories to registration records pre 3.7-IV2, so if we're bumping the MV from 3.7-IV1, how does registration.directories()
get populated before a broker re-registration?
kafka/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
Lines 422 to 424 in 8d11d95
if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { | |
record.setLogDirs(request.logDirs()); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't express myself well. I agree that registration.directories()
will indeed be empty if we're coming from 3.7-IV1
or earlier. But, we still need to check the current MV in this "if" statement since we do not want to resend the registration unless the MV is 3.7-IV2
or higher AND the registration does not contain directories.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, the point is to trigger the re-registration only once, and empty directories signals that hasn't happened yet.
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
Show resolved
Hide resolved
There are some conflicts that need addressing, and the JDK 21 pipeline didn't run. |
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced. This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targetted than the original code. Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.
rebased |
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration, so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.
This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targetted than the original code.
Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state, rather than clearing those. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.