Skip to content

xds: explicitly set request hash key for the ring hash LB policy #11881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 101 additions & 30 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
Expand All @@ -34,9 +36,11 @@
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.SocketAddress;
Expand Down Expand Up @@ -69,13 +73,21 @@
new LazyLoadBalancer.Factory(pickFirstLbProvider);
private final XdsLogger logger;
private final SynchronizationContext syncContext;
private final ThreadSafeRandom random;
private List<RingEntry> ring;
@Nullable private Metadata.Key<String> requestHashHeaderKey;

RingHashLoadBalancer(Helper helper) {
this(helper, ThreadSafeRandomImpl.instance);
}

@VisibleForTesting
RingHashLoadBalancer(Helper helper, ThreadSafeRandom random) {
super(helper);
syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
this.random = checkNotNull(random, "random");
}

@Override
Expand All @@ -92,6 +104,10 @@
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");
}
requestHashHeaderKey =
config.requestHashHeader.isEmpty()
? null
: Metadata.Key.of(config.requestHashHeader, Metadata.ASCII_STRING_MARSHALLER);
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Expand Down Expand Up @@ -197,7 +213,8 @@
overallState = TRANSIENT_FAILURE;
}

RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates());
RingHashPicker picker =
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeaderKey, random);
getHelper().updateBalancingState(overallState, picker);
this.currentConnectivityState = overallState;
}
Expand Down Expand Up @@ -325,21 +342,32 @@
// TODO(chengyuanzhang): can be more performance-friendly with
// IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
private final Map<Endpoint, SubchannelView> pickableSubchannels; // read-only
@Nullable private final Metadata.Key<String> requestHashHeaderKey;
private final ThreadSafeRandom random;
private final boolean hasEndpointInConnectingState;

private RingHashPicker(
SynchronizationContext syncContext, List<RingEntry> ring,
Collection<ChildLbState> children) {
Collection<ChildLbState> children, Metadata.Key<String> requestHashHeaderKey,
ThreadSafeRandom random) {
this.syncContext = syncContext;
this.ring = ring;
this.requestHashHeaderKey = requestHashHeaderKey;
this.random = random;
pickableSubchannels = new HashMap<>(children.size());
boolean hasConnectingState = false;
for (ChildLbState childLbState : children) {
pickableSubchannels.put((Endpoint)childLbState.getKey(),
new SubchannelView(childLbState, childLbState.getCurrentState()));
if (childLbState.getCurrentState() == CONNECTING) {
hasConnectingState = true;
}
}
this.hasEndpointInConnectingState = hasConnectingState;
}

// Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
private int getTargetIndex(Long requestHash) {
private int getTargetIndex(long requestHash) {
if (ring.size() <= 1) {
return 0;
}
Expand All @@ -365,38 +393,77 @@

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
if (requestHash == null) {
return PickResult.withError(RPC_HASH_NOT_FOUND);
// Determine request hash.
boolean usingRandomHash = false;
long requestHash;
if (requestHashHeaderKey == null) {
// Set by the xDS config selector.
Long rpcHashFromCallOptions = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
if (rpcHashFromCallOptions == null) {
return PickResult.withError(RPC_HASH_NOT_FOUND);

Check warning on line 403 in xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java#L403

Added line #L403 was not covered by tests
}
requestHash = rpcHashFromCallOptions;
} else {
Iterable<String> headerValues = args.getHeaders().getAll(requestHashHeaderKey);
if (headerValues != null) {
requestHash = hashFunc.hashAsciiString(Joiner.on(",").join(headerValues));
} else {
requestHash = random.nextLong();
usingRandomHash = true;
}
}

int targetIndex = getTargetIndex(requestHash);

// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
// IDLE, we initiate a connection.
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;

if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
if (!usingRandomHash) {
// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
// IDLE, we initiate a connection.
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;

if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
}

// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
// are failed unless there is a READY connection.
if (subchannelView.connectivityState == CONNECTING) {
return PickResult.withNoResult();
}

if (subchannelView.connectivityState == IDLE) {
syncContext.execute(() -> {
childLbState.getLb().requestConnection();
});

return PickResult.withNoResult(); // Indicates that this should be retried after backoff
}
}

// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
// are failed unless there is a READY connection.
if (subchannelView.connectivityState == CONNECTING) {
return PickResult.withNoResult();
} else {
// Using a random hash. Find and use the first READY ring entry, triggering at most one
// entry to attempt connection.
boolean requestedConnection = hasEndpointInConnectingState;
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;
if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
}
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
syncContext.execute(
() -> {
childLbState.getLb().requestConnection();
});
requestedConnection = true;
}
}

if (subchannelView.connectivityState == IDLE) {
syncContext.execute(() -> {
childLbState.getLb().requestConnection();
});

return PickResult.withNoResult(); // Indicates that this should be retried after backoff
if (requestedConnection) {
return PickResult.withNoResult();
}
}

Expand Down Expand Up @@ -444,20 +511,24 @@
static final class RingHashConfig {
final long minRingSize;
final long maxRingSize;
final String requestHashHeader;

RingHashConfig(long minRingSize, long maxRingSize) {
RingHashConfig(long minRingSize, long maxRingSize, String requestHashHeader) {
checkArgument(minRingSize > 0, "minRingSize <= 0");
checkArgument(maxRingSize > 0, "maxRingSize <= 0");
checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
checkNotNull(requestHashHeader);
this.minRingSize = minRingSize;
this.maxRingSize = maxRingSize;
this.requestHashHeader = requestHashHeader;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("minRingSize", minRingSize)
.add("maxRingSize", maxRingSize)
.add("requestHashHeader", requestHashHeader)
.toString();
}
}
Expand Down
11 changes: 10 additions & 1 deletion xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonUtil;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.RingHashOptions;
Expand Down Expand Up @@ -81,13 +82,20 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(
Map<String, ?> rawLoadBalancingPolicyConfig) {
Long minRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "minRingSize");
Long maxRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "maxRingSize");
String requestHashHeader = "";
if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY", false)) {
requestHashHeader = JsonUtil.getString(rawLoadBalancingPolicyConfig, "requestHashHeader");
}
long maxRingSizeCap = RingHashOptions.getRingSizeCap();
if (minRingSize == null) {
minRingSize = DEFAULT_MIN_RING_SIZE;
}
if (maxRingSize == null) {
maxRingSize = DEFAULT_MAX_RING_SIZE;
}
if (requestHashHeader == null) {
requestHashHeader = "";
}
if (minRingSize > maxRingSizeCap) {
minRingSize = maxRingSizeCap;
}
Expand All @@ -98,6 +106,7 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
"Invalid 'mingRingSize'/'maxRingSize'"));
}
return ConfigOrError.fromConfig(new RingHashConfig(minRingSize, maxRingSize));
return ConfigOrError.fromConfig(
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void uncaughtException(Thread t, Throwable e) {
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("round_robin"), null)));
private final Object ringHash = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L));
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L, ""));
private final Object leastRequest = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
Expand Down
Loading
Loading