Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: explicitly set request hash key for the ring hash LB policy #11881

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 86 additions & 29 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
Expand All @@ -34,6 +35,7 @@
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.util.MultiChildLoadBalancer;
Expand All @@ -47,6 +49,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -70,6 +73,7 @@
private final XdsLogger logger;
private final SynchronizationContext syncContext;
private List<RingEntry> ring;
private String requestHashHeader = "";

RingHashLoadBalancer(Helper helper) {
super(helper);
Expand Down Expand Up @@ -99,6 +103,7 @@
if (config == null) {
throw new IllegalArgumentException("Missing RingHash configuration");
}
requestHashHeader = config.requestHashHeader;
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
long totalWeight = 0L;
for (EquivalentAddressGroup eag : addrList) {
Expand Down Expand Up @@ -213,7 +218,8 @@
overallState = TRANSIENT_FAILURE;
}

RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates());
RingHashPicker picker =
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeader);
getHelper().updateBalancingState(overallState, picker);
this.currentConnectivityState = overallState;
}
Expand Down Expand Up @@ -334,23 +340,30 @@
}

private static final class RingHashPicker extends SubchannelPicker {
private final Random random = new Random();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to share this object across pickers and even inject it when creating RingHashLB for tests. That way we don't have to deal with flakes.

private final SynchronizationContext syncContext;
private final List<RingEntry> ring;
// Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
// freeze picker's view of subchannel's connectivity state.
// TODO(chengyuanzhang): can be more performance-friendly with
// IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
private final Map<Endpoint, SubchannelView> pickableSubchannels; // read-only
private final String requestHashHeader;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to move the construction of Metadata.Key<String> earlier and store the object here. It'd be null when the hash header is empty string. We should also store the key in RingHashLoadBalancer.requestHashHeader. It's up to you whether we store it in RingHashConfig.

private boolean hasEndpointInConnectingState = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this final as well.


private RingHashPicker(
SynchronizationContext syncContext, List<RingEntry> ring,
Collection<ChildLbState> children) {
Collection<ChildLbState> children, String requestHashHeader) {
this.syncContext = syncContext;
this.ring = ring;
this.requestHashHeader = requestHashHeader;
pickableSubchannels = new HashMap<>(children.size());
for (ChildLbState childLbState : children) {
pickableSubchannels.put((Endpoint)childLbState.getKey(),
new SubchannelView(childLbState, childLbState.getCurrentState()));
if (childLbState.getCurrentState() == CONNECTING) {
hasEndpointInConnectingState = true;
}
}
}

Expand Down Expand Up @@ -381,38 +394,78 @@

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
if (requestHash == null) {
return PickResult.withError(RPC_HASH_NOT_FOUND);
// Determine request hash.
boolean usingRandomHash = false;
Long requestHash;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not box and think about null in these new cases that don't need it. We should make this long and deal with the null in the only case that needs it. We should also change getTargetIndex() be be passed a long (since it assumes it is non-null already).

if (requestHashHeader.isEmpty()) {
// Set by the xDS config selector.
requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
if (requestHash == null) {
return PickResult.withError(RPC_HASH_NOT_FOUND);

Check warning on line 404 in xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java#L404

Added line #L404 was not covered by tests
}
} else {
Iterable<String> headerValues =
args.getHeaders()
.getAll(Metadata.Key.of(requestHashHeader, Metadata.ASCII_STRING_MARSHALLER));
if (headerValues != null) {
requestHash = hashFunc.hashAsciiString(Joiner.on(",").join(headerValues));
} else {
requestHash = random.nextLong();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not thread-safe. See io.grpc.xds.ThreadSafeRandom.

usingRandomHash = true;
}
}

int targetIndex = getTargetIndex(requestHash);

// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
// IDLE, we initiate a connection.
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;

if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
if (!usingRandomHash) {
// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
// IDLE, we initiate a connection.
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;

if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
}

// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
// are failed unless there is a READY connection.
if (subchannelView.connectivityState == CONNECTING) {
return PickResult.withNoResult();
}

if (subchannelView.connectivityState == IDLE) {
syncContext.execute(() -> {
childLbState.getLb().requestConnection();
});

return PickResult.withNoResult(); // Indicates that this should be retried after backoff
}
}

// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
// are failed unless there is a READY connection.
if (subchannelView.connectivityState == CONNECTING) {
return PickResult.withNoResult();
} else {
// Using a random hash. Find and use the first READY ring entry, triggering at most one
// entry to attempt connection.
boolean requestedConnection = hasEndpointInConnectingState;
for (int i = 0; i < ring.size(); i++) {
int index = (targetIndex + i) % ring.size();
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
ChildLbState childLbState = subchannelView.childLbState;
if (subchannelView.connectivityState == READY) {
return childLbState.getCurrentPicker().pickSubchannel(args);
}
if (!requestedConnection && subchannelView.connectivityState == IDLE) {
syncContext.execute(
() -> {
childLbState.getLb().requestConnection();
});
requestedConnection = true;
}
}

if (subchannelView.connectivityState == IDLE) {
syncContext.execute(() -> {
childLbState.getLb().requestConnection();
});

return PickResult.withNoResult(); // Indicates that this should be retried after backoff
if (requestedConnection) {
return PickResult.withNoResult();
}
}

Expand Down Expand Up @@ -460,20 +513,24 @@
static final class RingHashConfig {
final long minRingSize;
final long maxRingSize;
final String requestHashHeader;

RingHashConfig(long minRingSize, long maxRingSize) {
RingHashConfig(long minRingSize, long maxRingSize, String requestHashHeader) {
checkArgument(minRingSize > 0, "minRingSize <= 0");
checkArgument(maxRingSize > 0, "maxRingSize <= 0");
checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
checkNotNull(requestHashHeader);
this.minRingSize = minRingSize;
this.maxRingSize = maxRingSize;
this.requestHashHeader = requestHashHeader;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("minRingSize", minRingSize)
.add("maxRingSize", maxRingSize)
.add("requestHashHeader", requestHashHeader)
.toString();
}
}
Expand Down
11 changes: 10 additions & 1 deletion xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonUtil;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.RingHashOptions;
Expand Down Expand Up @@ -81,13 +82,20 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(
Map<String, ?> rawLoadBalancingPolicyConfig) {
Long minRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "minRingSize");
Long maxRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "maxRingSize");
String requestHashHeader = "";
if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY", false)) {
requestHashHeader = JsonUtil.getString(rawLoadBalancingPolicyConfig, "requestHashHeader");
}
long maxRingSizeCap = RingHashOptions.getRingSizeCap();
if (minRingSize == null) {
minRingSize = DEFAULT_MIN_RING_SIZE;
}
if (maxRingSize == null) {
maxRingSize = DEFAULT_MAX_RING_SIZE;
}
if (requestHashHeader == null) {
requestHashHeader = "";
}
if (minRingSize > maxRingSizeCap) {
minRingSize = maxRingSizeCap;
}
Expand All @@ -98,6 +106,7 @@ private ConfigOrError parseLoadBalancingPolicyConfigInternal(
return ConfigOrError.fromError(Status.UNAVAILABLE.withDescription(
"Invalid 'mingRingSize'/'maxRingSize'"));
}
return ConfigOrError.fromConfig(new RingHashConfig(minRingSize, maxRingSize));
return ConfigOrError.fromConfig(
new RingHashConfig(minRingSize, maxRingSize, requestHashHeader));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void uncaughtException(Thread t, Throwable e) {
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("round_robin"), null)));
private final Object ringHash = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L));
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L, ""));
private final Object leastRequest = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
@RunWith(JUnit4.class)
public class RingHashLoadBalancerProviderTest {
private static final String AUTHORITY = "foo.googleapis.com";
private static final String GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY =
"GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY";

private final SynchronizationContext syncContext = new SynchronizationContext(
new UncaughtExceptionHandler() {
Expand Down Expand Up @@ -81,6 +83,7 @@ public void parseLoadBalancingConfig_valid() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(10L);
assertThat(config.maxRingSize).isEqualTo(100L);
assertThat(config.requestHashHeader).isEmpty();
}

@Test
Expand All @@ -92,6 +95,7 @@ public void parseLoadBalancingConfig_missingRingSize_useDefaults() throws IOExce
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(RingHashLoadBalancerProvider.DEFAULT_MIN_RING_SIZE);
assertThat(config.maxRingSize).isEqualTo(RingHashLoadBalancerProvider.DEFAULT_MAX_RING_SIZE);
assertThat(config.requestHashHeader).isEmpty();
}

@Test
Expand Down Expand Up @@ -127,6 +131,7 @@ public void parseLoadBalancingConfig_ringTooLargeUsesCap() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(10);
assertThat(config.maxRingSize).isEqualTo(RingHashOptions.DEFAULT_RING_SIZE_CAP);
assertThat(config.requestHashHeader).isEmpty();
}

@Test
Expand All @@ -142,6 +147,7 @@ public void parseLoadBalancingConfig_ringCapCanBeRaised() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(RingHashOptions.MAX_RING_SIZE_CAP);
assertThat(config.maxRingSize).isEqualTo(RingHashOptions.MAX_RING_SIZE_CAP);
assertThat(config.requestHashHeader).isEmpty();
// Reset to avoid affecting subsequent test cases
RingHashOptions.setRingSizeCap(RingHashOptions.DEFAULT_RING_SIZE_CAP);
}
Expand All @@ -159,6 +165,7 @@ public void parseLoadBalancingConfig_ringCapIsClampedTo8M() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(RingHashOptions.MAX_RING_SIZE_CAP);
assertThat(config.maxRingSize).isEqualTo(RingHashOptions.MAX_RING_SIZE_CAP);
assertThat(config.requestHashHeader).isEmpty();
// Reset to avoid affecting subsequent test cases
RingHashOptions.setRingSizeCap(RingHashOptions.DEFAULT_RING_SIZE_CAP);
}
Expand All @@ -176,6 +183,7 @@ public void parseLoadBalancingConfig_ringCapCanBeLowered() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(1);
assertThat(config.maxRingSize).isEqualTo(1);
assertThat(config.requestHashHeader).isEmpty();
// Reset to avoid affecting subsequent test cases
RingHashOptions.setRingSizeCap(RingHashOptions.DEFAULT_RING_SIZE_CAP);
}
Expand All @@ -193,6 +201,7 @@ public void parseLoadBalancingConfig_ringCapLowerLimitIs1() throws IOException {
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(1);
assertThat(config.maxRingSize).isEqualTo(1);
assertThat(config.requestHashHeader).isEmpty();
// Reset to avoid affecting subsequent test cases
RingHashOptions.setRingSizeCap(RingHashOptions.DEFAULT_RING_SIZE_CAP);
}
Expand All @@ -219,6 +228,59 @@ public void parseLoadBalancingConfig_minRingSizeGreaterThanMaxRingSize() throws
.isEqualTo("Invalid 'mingRingSize'/'maxRingSize'");
}

@Test
public void parseLoadBalancingConfig_requestHashHeaderIgnoredWhenEnvVarNotSet()
throws IOException {
String lbConfig =
"{\"minRingSize\" : 10, \"maxRingSize\" : 100, \"requestHashHeader\" : \"dummy-hash\"}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(10L);
assertThat(config.maxRingSize).isEqualTo(100L);
assertThat(config.requestHashHeader).isEmpty();
}

@Test
public void parseLoadBalancingConfig_requestHashHeaderSetWhenEnvVarSet() throws IOException {
System.setProperty(GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY, "true");
try {
String lbConfig =
"{\"minRingSize\" : 10, \"maxRingSize\" : 100, \"requestHashHeader\" : \"dummy-hash\"}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(10L);
assertThat(config.maxRingSize).isEqualTo(100L);
assertThat(config.requestHashHeader).isEqualTo("dummy-hash");
assertThat(config.toString()).contains("minRingSize=10");
assertThat(config.toString()).contains("maxRingSize=100");
assertThat(config.toString()).contains("requestHashHeader=dummy-hash");
} finally {
System.clearProperty(GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY);
}
}

@Test
public void parseLoadBalancingConfig_requestHashHeaderUnsetWhenEnvVarSet_useDefaults()
throws IOException {
System.setProperty(GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY, "true");
try {
String lbConfig = "{\"minRingSize\" : 10, \"maxRingSize\" : 100}";
ConfigOrError configOrError =
provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
assertThat(configOrError.getConfig()).isNotNull();
RingHashConfig config = (RingHashConfig) configOrError.getConfig();
assertThat(config.minRingSize).isEqualTo(10L);
assertThat(config.maxRingSize).isEqualTo(100L);
assertThat(config.requestHashHeader).isEmpty();
} finally {
System.clearProperty(GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY);
}
}

@SuppressWarnings("unchecked")
private static Map<String, ?> parseJsonObject(String json) throws IOException {
return (Map<String, ?>) JsonParser.parse(json);
Expand Down
Loading