Skip to content

Commit

Permalink
Use insertion sorting for more stability
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Sep 20, 2024
1 parent cdf3490 commit 527e431
Showing 1 changed file with 16 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -88,12 +87,6 @@ record ConsumerEntry(Consumer consumer, MutableInt consumerSelectionCounter)
public int compareTo(ConsumerEntry o) {
return CONSUMER_ENTRY_COMPARATOR.compare(this, o);
}

// comparison without the usage count so that the consumer doesn't get changed too eagerly
// when entries are removed
public int baseCompareTo(ConsumerEntry o) {
return BASE_CONSUMER_ENTRY_COMPARATOR.compare(this, o);
}
}

private final List<ConsumerEntry> consumers;
Expand All @@ -104,39 +97,33 @@ public HashRingEntry() {
}

public void addConsumer(Consumer consumer, MutableInt selectedCounter) {
consumers.add(new ConsumerEntry(consumer, selectedCounter));
selectConsumer(null);
ConsumerEntry consumerEntry = new ConsumerEntry(consumer, selectedCounter);
insertConsumer(consumerEntry);
selectConsumer();
}

private void insertConsumer(ConsumerEntry consumerEntry) {
for (int i = 0; i < consumers.size(); i++) {
if (consumers.get(i).compareTo(consumerEntry) > 0) {
consumers.add(i, consumerEntry);
return;
}
}
consumers.add(consumerEntry);
}

public boolean removeConsumer(Consumer consumer) {
boolean removed = consumers.removeIf(consumerEntry -> consumerEntry.consumer().equals(consumer));
selectConsumer(consumer);
selectConsumer();
return removed;
}

public Consumer getSelectedConsumer() {
return selectedConsumerEntry != null ? selectedConsumerEntry.consumer() : null;
}

private void selectConsumer(Consumer removedConsumer) {
if (consumers.size() > 1) {
boolean addOperation = removedConsumer == null;
if (addOperation) {
Collections.sort(consumers);
}
ConsumerEntry newSelectedConsumer = consumers.get(0);
// change the selected consumer only if the newer has higher priority,
// or the same priority and an earlier name in sorting order
if (selectedConsumerEntry == null || addOperation
|| selectedConsumerEntry.consumer.equals(removedConsumer)
|| selectedConsumerEntry.baseCompareTo(newSelectedConsumer) > 0) {
changeSelectedConsumerEntry(newSelectedConsumer);
}
} else if (consumers.size() == 1) {
changeSelectedConsumerEntry(consumers.get(0));
} else {
changeSelectedConsumerEntry(null);
}
private void selectConsumer() {
changeSelectedConsumerEntry(consumers.isEmpty() ? null : consumers.get(0));
}

private void changeSelectedConsumerEntry(ConsumerEntry newSelectedConsumer) {
Expand Down

0 comments on commit 527e431

Please sign in to comment.