
KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState #15972

Merged: 10 commits into apache:trunk on May 22, 2024

Conversation

apourchet (Contributor):

This rack information is required to compute rack-aware assignments, which many of the current assignors do.

The internal ClientMetadata class was also edited to pass around this rack information.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
}

public static DefaultTaskInfo of(final TaskId taskId,
Contributor:

since this is an internal API, you can just have a normal public constructor. The static constructor thing is only for public classes where we want to make a "nice looking" fluent API
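To illustrate the suggestion, here is a minimal sketch (hypothetical names, simplified fields) contrasting the plain constructor that suffices for an internal class with the static-factory style reserved for public fluent APIs:

```java
import java.util.Set;

final class TaskInfoSketch {
    final String taskId;
    final Set<String> stateStoreNames;

    // Internal API: a normal public constructor is enough.
    TaskInfoSketch(final String taskId, final Set<String> stateStoreNames) {
        this.taskId = taskId;
        this.stateStoreNames = stateStoreNames;
    }

    // Static factory style, mainly worthwhile for public, fluent APIs.
    static TaskInfoSketch of(final String taskId, final Set<String> stateStoreNames) {
        return new TaskInfoSketch(taskId, stateStoreNames);
    }
}
```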

final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask,
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) {

final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId);
Contributor:

if this is the only place where we use the inputPartitionsForTask/changelogPartitionsForTask map, let's just pass in the inputPartitions & changelogPartitions sets directly

Comment on lines 66 to 71
inputPartitions.forEach(partition -> {
racksForPartition.computeIfAbsent(partition, k -> new HashSet<>());
final String consumer = previousOwnerForPartition.apply(partition);
final Optional<String> rack = rackForConsumer.get(consumer);
rack.ifPresent(s -> racksForPartition.get(partition).add(s));
});
Contributor:

Ah, this is not computing the right rack id -- this would be the rack id of the KafkaStreams node that had this partition assigned during the last rebalance. What we want is the rack.id of the broker node(s) that host this partition. This is going to be a bit complex so let's chat online (ditto for the changelogPartitions rack info as well)
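To make the distinction concrete, here is a rough sketch of the intended computation: the rack ids come from the broker nodes that replicate each partition, not from the client that previously owned it. `Node` and the string partition keys below are simplified stand-ins for Kafka's `Node`/`TopicPartition` types:

```java
import java.util.*;

final class BrokerRackSketch {
    record Node(int id, String rack) { }

    // Map each partition to the racks of the broker nodes replicating it.
    static Map<String, Set<String>> racksForPartition(final Map<String, List<Node>> replicasByPartition) {
        final Map<String, Set<String>> racks = new HashMap<>();
        replicasByPartition.forEach((partition, replicas) -> {
            final Set<String> partitionRacks = new HashSet<>();
            for (final Node node : replicas) {
                if (node.rack() != null) {
                    partitionRacks.add(node.rack());
                }
            }
            racks.put(partition, partitionRacks);
        });
        return racks;
    }
}
```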

final Set<String> stateStoreNames = new HashSet<>();
return new DefaultTaskInfo(
taskId,
isStateful, // All standby tasks are stateful.
Contributor:

Ah, another thing to note here is that this class corresponds to a "logical task", not a "physical" one. I just made up those terms but hopefully this will make sense: a "physical" task can be active or standby and represents an actual task that was assigned to a client and will be running on that client, whereas a "logical" task is just the metadata corresponding to that task id. A "task id" logically represents a combination of subtopology (grouping of processors) and partition number.

So a logical task doesn't have a concept of active vs standby because it's just metadata; this class is basically telling the assignor which tasks exist in this application. The assignor then has to create a set of physical tasks to actually be assigned: basically one active task and however many standby tasks for each "logical task".

Hope that didn't make things more confusing...anyways this comment isn't incorrect, but it doesn't exactly apply in this context. The "isStateful" is just metadata related to whether it has state stores in this subtopology (I'll tell you how to get this info later, or you can even compute it based on stateStoresNames#isEmpty)
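The `stateStoreNames#isEmpty` suggestion boils down to a one-liner; a sketch with a hypothetical helper name:

```java
import java.util.Set;

final class StatefulnessSketch {
    // A "logical" task is stateful iff its subtopology has at least one state store.
    static boolean isStateful(final Set<String> stateStoreNames) {
        return !stateStoreNames.isEmpty();
    }
}
```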

* @return the set of changelog topics, which includes both source changelog topics and non
* source changelog topics.
*/
public Set<InternalTopicConfig> stateChangelogTopics() {
Contributor:

nit: I'd just call this changelogTopics which I think helps make it obvious that this is the super-set of the #sourceChangelogTopics and #nonSourceChangelogTopics APIs

(you can rename the field itself as well but don't have to, that's up to you)

Comment on lines 496 to 497
final Set<String> changelogTopics = entry.getValue().stateChangelogTopics()
.stream().map(t -> t.name).collect(Collectors.toSet());
Contributor:

if all we need is the topic names from the #stateChangelogTopics API then let's just have it return that directly. You should be able to just return the #keySet of that stateChangelogTopics map to get a Set with the topic names right away

});
final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
changelogPartitionsForTask.forEach((taskId, partitions) -> {
logicalTaskIds.add(taskId);
Contributor:

I suppose this doesn't hurt anything since logicalTasks is a Set, but the taskIds returned by the partition grouper should be the same for the source and changelog topics. So you can remove this line

(alternatively you can create the logicalTaskIds map up front by copying the keyset of one of the partitionsForTask maps but that's just an implementation detail, up to you. However I would probably consider adding a check to make sure these two maps return the same set of tasks. Doesn't need to scan the entire thing, maybe just a simple

if (sourcePartitionsForTask.size() != changelogPartitionsForTask.size()) {
    log.error("Partition grouper returned {} tasks for source topics but {} tasks for changelog topics",
              sourcePartitionsForTask.size(), changelogPartitionsForTask.size());
    throw new TaskAssignmentException("Partition grouper returned a different number of tasks for source and changelog topics");
}

Contributor:

Note that we'll also want to deduplicate the source-changelog partitions for the rack id computation. We should include them in the source topics/remove them from the changelog topics passed into the #getRacksForTopicPartitions call. Of course we still need the changelogTopicPartitions as well, so we'll want a third set of nonSourceChangelogTopicPartitions that's specifically for the rack id computation.

Contributor:

To be more precise, I'm imagining something like this:

final Set<TopicPartition> sourceTopicPartitions = new HashSet<>();
final Set<TopicPartition> changelogTopicPartitions = new HashSet<>();
final Set<TopicPartition> nonSourceChangelogTopicPartitions = new HashSet<>();

for (final Map.Entry<TaskId, Set<TopicPartition>> entry : sourcePartitionsForTask.entrySet()) {
    final TaskId taskId = entry.getKey();
    final Set<TopicPartition> taskSourcePartitions = entry.getValue();
    final Set<TopicPartition> taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
    final Set<TopicPartition> taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
    taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);

    logicalTaskIds.add(taskId);
    sourceTopicPartitions.addAll(taskSourcePartitions);
    changelogTopicPartitions.addAll(taskChangelogPartitions);
    nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
}

Then we pass the nonSourceChangelogPartitions into the #getRacksForTopicPartition instead of the changelogPartitions set.

Contributor:

Sorry for the wall of text 😅 It might not seem like a huge deal but if it's an app with only source-changelog partitions, then doing this will save the assignor from having to make any DescribeTopics request since there are no non-source changelogs.

And yes, apps with only source changelogs do exist, they're pretty common for certain kinds of table-based processing (and especially apps that make heavy use of IQ). And saving a remote fetch is actually a pretty big deal, doing them in the middle of an assignment makes the rebalance vulnerable to timing out, especially when brokers are under heavy load or the app is experiencing rebalancing issues to begin with

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RackUtils {
Contributor:

nit: make this final and add private constructor so it's clear this is just a static utils class
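The shape being asked for, sketched with a hypothetical helper method:

```java
final class RackUtilsSketch {
    // Private constructor: this is a static utility class, never instantiated.
    private RackUtilsSketch() {
        throw new UnsupportedOperationException("utility class");
    }

    // Example static helper: fall back to a placeholder when a rack is unknown.
    static String rackOrUnknown(final String rack) {
        return rack == null ? "unknown" : rack;
    }
}
```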

cluster, topicsWithUpToDateMetadata);

final Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo =
describeTopics(internalTopicManager, topicsToDescribe);
Contributor:

It's not a huge deal but if we have time left at the end it might make sense to condense this into a single call where we describe all the topics in one go rather than making a separate request for the source topics and changelogs

But it really isn't a big deal because in general, after the first rebalance, all the source topics should have been created and we really will do only one call since only the changelogs will be unknown

on that note, can you check to make sure this skips the actual DescribeTopics request if the set of topics to describe is empty? Like does it end up making a call with the admin client? If it does, then we should guard this with an if (!topicsToDescribe.isEmpty()) check (or we can just add the check anyway to be safe)
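A sketch of the suggested guard, with `describeTopics` as a hypothetical stand-in for the admin-client round trip:

```java
import java.util.*;
import java.util.function.Function;

final class DescribeGuardSketch {
    // describeTopics stands in for the remote DescribeTopics call.
    static Map<String, List<String>> describeIfNeeded(
            final Set<String> topicsToDescribe,
            final Function<Set<String>, Map<String, List<String>>> describeTopics) {
        if (topicsToDescribe.isEmpty()) {
            return Collections.emptyMap();  // skip the remote call entirely
        }
        return describeTopics.apply(topicsToDescribe);
    }
}
```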

Contributor (Author):

I agree, I wrote it this way to mimic the exact pattern of use that the RackAwareAssigner uses. Once this is all wired and tested though we can make optimizations and changes like this one (and the lazy rack info one).

Comment on lines +83 to +84
LOG.error("TopicPartition {} doesn't exist in cluster", topicPartition);
continue;
Contributor:

It's not an error for a topic to not be included in the Cluster, even source topics might not exist here if they had to be created by the assignor during the rebalance, since the Cluster metadata represents the state of the cluster when this rebalance/assignment first began.

Since the point of this method seems to be to collect topics with missing metadata that we'll need to look up via a DescribeTopics request, the ones for which cluster.partition(topicPartition) returns null are exactly the ones that should be returned by this method.

In fact I'd go ahead and remove everything past this line as well, this method should focus only on differentiating topics with missing metadata from ones we already have the info for. If the Cluster has metadata for this partition but the replicas set is missing/empty, then there's something wrong with this partition, and calling DescribeTopics probably won't help

Let's rename this to #topicsWithMissingMetadata while we're at it
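The intended filter can be sketched as follows; `partitionInfoLookup` is a hypothetical stand-in for `cluster::partition`, where a `null` result means "no metadata yet", not an error:

```java
import java.util.*;
import java.util.function.Function;

final class MissingMetadataSketch {
    static Set<String> topicsWithMissingMetadata(final Set<String> topics,
                                                 final Function<String, Object> partitionInfoLookup) {
        final Set<String> missing = new HashSet<>();
        for (final String topic : topics) {
            if (partitionInfoLookup.apply(topic) == null) {
                missing.add(topic);  // needs a DescribeTopics lookup, not an error
            }
        }
        return missing;
    }
}
```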

@apourchet (Contributor, Author) commented May 20, 2024:

This isn't how the current rack aware code works:

                final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
                if (partitionInfo == null) {
                    log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
                    return false;
                }
                final Node[] replica = partitionInfo.replicas();
                if (replica == null || replica.length == 0) {
                    topicsToDescribe.add(topicPartition.topic());
                    continue;
                }
                for (final Node node : replica) {
                    if (node.hasRack()) {
                        racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
                    } else {
                        log.warn("Node {} for topic partition {} doesn't have rack", node, topicPartition);
                        return false;
                    }
                }
            }

Above is the logic of populateTopicsToDescribe, which evidently uses the (replica == null || replica.length == 0) condition to decide to fetch further topic information, yet treats a null result from cluster.partition(topicPartition) as an error, which causes the RackAwareAssignor to be turned off entirely.

final List<Node> replicas = partitionInfo.replicas();
if (replicas == null || replicas.isEmpty()) {
LOG.error("No replicas found for topic partition {}: {}", topic, partition);
return;
Contributor:

nit: can you factor the lambda out into a separate method? I was really confused by this empty return for a while until I realized it wasn't returning from the getRacksForTopicPartition method, just the lambda inside this loop
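A sketch of that refactor (hypothetical names): once the loop body lives in a named private method, the early `return` obviously skips one partition rather than appearing to exit the enclosing method:

```java
import java.util.*;

final class LambdaRefactorSketch {
    static Map<String, Set<String>> racksFor(final Map<String, List<String>> replicaRacksByPartition) {
        final Map<String, Set<String>> result = new HashMap<>();
        replicaRacksByPartition.forEach((partition, racks) -> addRacks(result, partition, racks));
        return result;
    }

    private static void addRacks(final Map<String, Set<String>> result,
                                 final String partition,
                                 final List<String> replicaRacks) {
        if (replicaRacks == null || replicaRacks.isEmpty()) {
            return;  // clearly returns from this helper only, skipping the partition
        }
        result.put(partition, new HashSet<>(replicaRacks));
    }
}
```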

Comment on lines +519 to +521
final Map<TopicPartition, Set<String>> racksForSourcePartitions = RackUtils.getRacksForTopicPartition(
cluster, internalTopicManager, sourceTopicPartitions, false);
final Map<TopicPartition, Set<String>> racksForChangelogPartitions = RackUtils.getRacksForTopicPartition(
Contributor:

Since the rack info is nontrivial to compute and always makes a remote call (which can take a long time and even time out or otherwise fail) and not every assignor/app will actually use it I'm thinking maybe we should try to initialize it lazily, only if/when the user actually requests the rack info

I'm totally happy to push that into a followup PR to keep the scope well-defined for now, so don't worry about it. We'd still need everything you implemented here and would just be moving it around and/or subbing in function pointers instead of passing around data structures directly, so it shouldn't have any impact on how this PR goes. Just wanted to make a note so I don't forget
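One way the lazy idea could look, as a rough sketch with a memoizing supplier (all names hypothetical): the expensive lookup runs only on first access, and never if no assignor asks for rack info:

```java
import java.util.*;
import java.util.function.Supplier;

final class LazyRackInfoSketch {
    private final Supplier<Map<String, Set<String>>> loader;
    private Map<String, Set<String>> cached;  // computed at most once

    LazyRackInfoSketch(final Supplier<Map<String, Set<String>>> loader) {
        this.loader = loader;
    }

    Map<String, Set<String>> racksForPartition() {
        if (cached == null) {
            cached = loader.get();  // the expensive DescribeTopics work happens here, lazily
        }
        return cached;
    }
}
```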

* source changelog topics.
*/
public Set<String> changelogTopics() {
return Collections.unmodifiableSet(new HashSet<>(stateChangelogTopics.keySet()));
Contributor:

I think you can skip the new HashSet step, that's pretty much redundant with the unmodifiableSet and since we don't plan on modifying the returned set, it's better to just wrap the keySet directly to save a bunch of unnecessary copying
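A minimal sketch of the copy-free version (simplified, hypothetical class):

```java
import java.util.*;

final class ChangelogTopicsSketch {
    private final Map<String, Integer> stateChangelogTopics = new HashMap<>();

    ChangelogTopicsSketch(final Set<String> names) {
        names.forEach(name -> stateChangelogTopics.put(name, 0));
    }

    Set<String> changelogTopics() {
        // No defensive HashSet copy: the unmodifiable view already blocks mutation.
        return Collections.unmodifiableSet(stateChangelogTopics.keySet());
    }
}
```

One caveat on the design choice: the direct wrap is a live view of the map's keys, whereas the copy was a snapshot; that only matters if the underlying map can change after the getter is called.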

@ableegoldman (Contributor) left a comment:

I'm still a bit wary of the original rack id computation logic but we can revisit that. This LGTM

@ableegoldman ableegoldman merged commit ef2c5e4 into apache:trunk May 22, 2024
1 check failed
@dajac (Contributor) commented May 23, 2024:

Hey @ableegoldman @apourchet, I see new failures in trunk that seem to be related to this PR. The last build of this PR had 100+ failures: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15972/8/tests. Could you please take a look?

@ableegoldman (Contributor):

Yep, just noticed this. Sorry about that. We're taking a look
