Skip to content

Commit

Permalink
Use volatile
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaspeaks committed Feb 12, 2025
1 parent 00a85b8 commit e12f8ea
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -173,11 +172,11 @@ public final ClientStream newStream(
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(
PickSubchannelArgs args, ClientStreamTracer[] tracers, PickResult pickResult) {
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus.set(pickResult.getStatus());
pendingStream.lastPickStatus = pickResult.getStatus();
}
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
Expand Down Expand Up @@ -298,10 +297,8 @@ final void reprocess(@Nullable SubchannelPicker picker) {
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
synchronized (lock) {
if (callOptions.isWaitForReady() && pickResult.hasResult()) {
stream.lastPickStatus.set(pickResult.getStatus());
}
if (callOptions.isWaitForReady() && pickResult.hasResult()) {
stream.lastPickStatus = pickResult.getStatus();
}
// User code provided authority takes precedence over the LB provided one.
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
Expand Down Expand Up @@ -363,7 +360,7 @@ private class PendingStream extends DelayedStream {
private final PickSubchannelArgs args;
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private final AtomicReference<Status> lastPickStatus = new AtomicReference<>(null);
private volatile Status lastPickStatus;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
this.args = args;
Expand Down Expand Up @@ -420,9 +417,8 @@ protected void onEarlyCancellation(Status reason) {
public void appendTimeoutInsight(InsightBuilder insight) {
if (args.getCallOptions().isWaitForReady()) {
insight.append("wait_for_ready");
Status status = lastPickStatus.get();
if (status != null && !status.isOk()) {
insight.appendKeyValue("Last Pick Failure", status);
if (lastPickStatus != null && !lastPickStatus.isOk()) {
insight.appendKeyValue("Last Pick Failure", lastPickStatus);
}
}
super.appendTimeoutInsight(insight);
Expand Down

0 comments on commit e12f8ea

Please sign in to comment.