-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: explicitly set request hash key for the ring hash LB policy #11881
base: master
Are you sure you want to change the base?
Changes from 5 commits
a0bccc2
29fc18c
a208780
bd04b72
fa52eab
c8b5fa0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
import io.grpc.EquivalentAddressGroup; | ||
import io.grpc.InternalLogId; | ||
import io.grpc.LoadBalancer; | ||
import io.grpc.Metadata; | ||
import io.grpc.Status; | ||
import io.grpc.SynchronizationContext; | ||
import io.grpc.util.MultiChildLoadBalancer; | ||
|
@@ -47,6 +48,7 @@ | |
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Random; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.Nullable; | ||
|
@@ -70,6 +72,7 @@ | |
private final XdsLogger logger; | ||
private final SynchronizationContext syncContext; | ||
private List<RingEntry> ring; | ||
private String requestHashHeader = ""; | ||
|
||
RingHashLoadBalancer(Helper helper) { | ||
super(helper); | ||
|
@@ -99,6 +102,7 @@ | |
if (config == null) { | ||
throw new IllegalArgumentException("Missing RingHash configuration"); | ||
} | ||
requestHashHeader = config.requestHashHeader; | ||
Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>(); | ||
long totalWeight = 0L; | ||
for (EquivalentAddressGroup eag : addrList) { | ||
|
@@ -213,7 +217,8 @@ | |
overallState = TRANSIENT_FAILURE; | ||
} | ||
|
||
RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates()); | ||
RingHashPicker picker = | ||
new RingHashPicker(syncContext, ring, getChildLbStates(), requestHashHeader); | ||
getHelper().updateBalancingState(overallState, picker); | ||
this.currentConnectivityState = overallState; | ||
} | ||
|
@@ -334,23 +339,30 @@ | |
} | ||
|
||
private static final class RingHashPicker extends SubchannelPicker { | ||
private final Random random = new Random(); | ||
private final SynchronizationContext syncContext; | ||
private final List<RingEntry> ring; | ||
// Avoid synchronization between pickSubchannel and subchannel's connectivity state change, | ||
// freeze picker's view of subchannel's connectivity state. | ||
// TODO(chengyuanzhang): can be more performance-friendly with | ||
// IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel. | ||
private final Map<Endpoint, SubchannelView> pickableSubchannels; // read-only | ||
private final String requestHashHeader; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we want to move the construction of |
||
private boolean hasEndpointInConnectingState = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Make this |
||
|
||
private RingHashPicker( | ||
SynchronizationContext syncContext, List<RingEntry> ring, | ||
Collection<ChildLbState> children) { | ||
Collection<ChildLbState> children, String requestHashHeader) { | ||
this.syncContext = syncContext; | ||
this.ring = ring; | ||
this.requestHashHeader = requestHashHeader; | ||
pickableSubchannels = new HashMap<>(children.size()); | ||
for (ChildLbState childLbState : children) { | ||
pickableSubchannels.put((Endpoint)childLbState.getKey(), | ||
new SubchannelView(childLbState, childLbState.getCurrentState())); | ||
if (childLbState.getCurrentState() == CONNECTING) { | ||
hasEndpointInConnectingState = true; | ||
} | ||
} | ||
} | ||
|
||
|
@@ -381,38 +393,78 @@ | |
|
||
@Override | ||
public PickResult pickSubchannel(PickSubchannelArgs args) { | ||
Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY); | ||
if (requestHash == null) { | ||
return PickResult.withError(RPC_HASH_NOT_FOUND); | ||
// Determine request hash. | ||
boolean usingRandomHash = false; | ||
Long requestHash; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's not box and think about null in these new cases that don't need it. We should make this |
||
if (requestHashHeader.isEmpty()) { | ||
// Set by the xDS config selector. | ||
requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY); | ||
if (requestHash == null) { | ||
return PickResult.withError(RPC_HASH_NOT_FOUND); | ||
} | ||
} else { | ||
String headerValue = | ||
args.getHeaders() | ||
.get(Metadata.Key.of(requestHashHeader, Metadata.ASCII_STRING_MARSHALLER)); | ||
ejona86 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (headerValue != null) { | ||
requestHash = hashFunc.hashAsciiString(headerValue); | ||
} else { | ||
requestHash = random.nextLong(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not thread-safe. See |
||
usingRandomHash = true; | ||
} | ||
} | ||
|
||
int targetIndex = getTargetIndex(requestHash); | ||
|
||
// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore | ||
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If | ||
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in | ||
// IDLE, we initiate a connection. | ||
for (int i = 0; i < ring.size(); i++) { | ||
int index = (targetIndex + i) % ring.size(); | ||
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey); | ||
ChildLbState childLbState = subchannelView.childLbState; | ||
|
||
if (subchannelView.connectivityState == READY) { | ||
return childLbState.getCurrentPicker().pickSubchannel(args); | ||
if (!usingRandomHash) { | ||
// Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore | ||
// all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If | ||
// CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in | ||
// IDLE, we initiate a connection. | ||
for (int i = 0; i < ring.size(); i++) { | ||
int index = (targetIndex + i) % ring.size(); | ||
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey); | ||
ChildLbState childLbState = subchannelView.childLbState; | ||
|
||
if (subchannelView.connectivityState == READY) { | ||
return childLbState.getCurrentPicker().pickSubchannel(args); | ||
} | ||
|
||
// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs | ||
// are failed unless there is a READY connection. | ||
if (subchannelView.connectivityState == CONNECTING) { | ||
return PickResult.withNoResult(); | ||
} | ||
|
||
if (subchannelView.connectivityState == IDLE) { | ||
syncContext.execute(() -> { | ||
childLbState.getLb().requestConnection(); | ||
}); | ||
|
||
return PickResult.withNoResult(); // Indicates that this should be retried after backoff | ||
} | ||
} | ||
|
||
// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs | ||
// are failed unless there is a READY connection. | ||
if (subchannelView.connectivityState == CONNECTING) { | ||
return PickResult.withNoResult(); | ||
} else { | ||
// Using a random hash. Find and use the first READY ring entry, triggering at most one | ||
// entry to attempt connection. | ||
boolean requestedConnection = hasEndpointInConnectingState; | ||
for (int i = 0; i < ring.size(); i++) { | ||
int index = (targetIndex + i) % ring.size(); | ||
SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey); | ||
ChildLbState childLbState = subchannelView.childLbState; | ||
if (subchannelView.connectivityState == READY) { | ||
return childLbState.getCurrentPicker().pickSubchannel(args); | ||
} | ||
if (!requestedConnection && subchannelView.connectivityState == IDLE) { | ||
syncContext.execute( | ||
() -> { | ||
childLbState.getLb().requestConnection(); | ||
}); | ||
requestedConnection = true; | ||
} | ||
} | ||
|
||
if (subchannelView.connectivityState == IDLE) { | ||
syncContext.execute(() -> { | ||
childLbState.getLb().requestConnection(); | ||
}); | ||
|
||
return PickResult.withNoResult(); // Indicates that this should be retried after backoff | ||
if (requestedConnection) { | ||
return PickResult.withNoResult(); | ||
} | ||
} | ||
|
||
|
@@ -460,20 +512,24 @@ | |
static final class RingHashConfig { | ||
final long minRingSize; | ||
final long maxRingSize; | ||
final String requestHashHeader; | ||
|
||
RingHashConfig(long minRingSize, long maxRingSize) { | ||
RingHashConfig(long minRingSize, long maxRingSize, String requestHashHeader) { | ||
checkArgument(minRingSize > 0, "minRingSize <= 0"); | ||
checkArgument(maxRingSize > 0, "maxRingSize <= 0"); | ||
checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize"); | ||
checkNotNull(requestHashHeader); | ||
this.minRingSize = minRingSize; | ||
this.maxRingSize = maxRingSize; | ||
this.requestHashHeader = requestHashHeader; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return MoreObjects.toStringHelper(this) | ||
.add("minRingSize", minRingSize) | ||
.add("maxRingSize", maxRingSize) | ||
.add("requestHashHeader", requestHashHeader) | ||
.toString(); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to share this object across pickers and even inject it when creating RingHashLB for tests. That way we don't have to deal with flakes.