Skip to content

Commit 37f84d1

Browse files
Merge branch 'grpc:master' into Issue_fixed_12142
2 parents e5272b3 + 6dfa03c commit 37f84d1

File tree

9 files changed

+218
-17
lines changed

9 files changed

+218
-17
lines changed

binder/src/main/java/io/grpc/binder/internal/Outbound.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static com.google.common.base.Preconditions.checkState;
2121
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
22-
import static java.lang.Math.max;
2322

2423
import android.os.Parcel;
2524
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -397,8 +396,7 @@ protected int writeSuffix(Parcel parcel) throws IOException {
397396
@GuardedBy("this")
398397
void setDeadline(Deadline deadline) {
399398
headers.discardAll(TIMEOUT_KEY);
400-
long effectiveTimeoutNanos = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
401-
headers.put(TIMEOUT_KEY, effectiveTimeoutNanos);
399+
headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
402400
}
403401
}
404402

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright 2025 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.census;
18+
19+
import com.google.common.base.Stopwatch;
20+
import com.google.common.base.Supplier;
21+
import io.grpc.ClientInterceptor;
22+
import io.grpc.ExperimentalApi;
23+
import io.grpc.ManagedChannelBuilder;
24+
import io.grpc.ServerBuilder;
25+
import io.grpc.ServerStreamTracer;
26+
import io.opencensus.trace.Tracing;
27+
28+
/**
29+
* The entrypoint for OpenCensus instrumentation functionality in gRPC.
30+
*
31+
* <p>GrpcCensus uses {@link io.opencensus.api.OpenCensus} APIs for instrumentation.
32+
*
33+
*/
34+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/12178")
35+
public final class GrpcCensus {
36+
37+
private final boolean statsEnabled;
38+
private final boolean tracingEnabled;
39+
40+
private GrpcCensus(Builder builder) {
41+
this.statsEnabled = builder.statsEnabled;
42+
this.tracingEnabled = builder.tracingEnabled;
43+
}
44+
45+
/**
46+
* Creates a new builder for {@link GrpcCensus}.
47+
*/
48+
public static Builder newBuilder() {
49+
return new Builder();
50+
}
51+
52+
private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
53+
@Override
54+
public Stopwatch get() {
55+
return Stopwatch.createUnstarted();
56+
}
57+
};
58+
59+
/**
60+
* Configures a {@link ServerBuilder} to enable census stats and tracing.
61+
*
62+
* @param serverBuilder The server builder to configure.
63+
* @return The configured server builder.
64+
*/
65+
public <T extends ServerBuilder<T>> T configureServerBuilder(T serverBuilder) {
66+
if (statsEnabled) {
67+
serverBuilder.addStreamTracerFactory(newServerStatsStreamTracerFactory());
68+
}
69+
if (tracingEnabled) {
70+
serverBuilder.addStreamTracerFactory(newServerTracingStreamTracerFactory());
71+
}
72+
return serverBuilder;
73+
}
74+
75+
/**
76+
* Configures a {@link ManagedChannelBuilder} to enable census stats and tracing.
77+
*
78+
* @param channelBuilder The channel builder to configure.
79+
* @return The configured channel builder.
80+
*/
81+
public <T extends ManagedChannelBuilder<T>> T configureChannelBuilder(T channelBuilder) {
82+
if (statsEnabled) {
83+
channelBuilder.intercept(newClientStatsInterceptor());
84+
}
85+
if (tracingEnabled) {
86+
channelBuilder.intercept(newClientTracingInterceptor());
87+
}
88+
return channelBuilder;
89+
}
90+
91+
/**
92+
* Returns a {@link ClientInterceptor} with default stats implementation.
93+
*/
94+
private static ClientInterceptor newClientStatsInterceptor() {
95+
CensusStatsModule censusStats =
96+
new CensusStatsModule(
97+
STOPWATCH_SUPPLIER,
98+
true,
99+
true,
100+
true,
101+
false,
102+
true);
103+
return censusStats.getClientInterceptor();
104+
}
105+
106+
/**
107+
* Returns a {@link ClientInterceptor} with default tracing implementation.
108+
*/
109+
private static ClientInterceptor newClientTracingInterceptor() {
110+
CensusTracingModule censusTracing =
111+
new CensusTracingModule(
112+
Tracing.getTracer(),
113+
Tracing.getPropagationComponent().getBinaryFormat());
114+
return censusTracing.getClientInterceptor();
115+
}
116+
117+
/**
118+
* Returns a {@link ServerStreamTracer.Factory} with default stats implementation.
119+
*/
120+
private static ServerStreamTracer.Factory newServerStatsStreamTracerFactory() {
121+
CensusStatsModule censusStats =
122+
new CensusStatsModule(
123+
STOPWATCH_SUPPLIER,
124+
true,
125+
true,
126+
true,
127+
false,
128+
true);
129+
return censusStats.getServerTracerFactory();
130+
}
131+
132+
/**
133+
* Returns a {@link ServerStreamTracer.Factory} with default tracing implementation.
134+
*/
135+
private static ServerStreamTracer.Factory newServerTracingStreamTracerFactory() {
136+
CensusTracingModule censusTracing =
137+
new CensusTracingModule(
138+
Tracing.getTracer(),
139+
Tracing.getPropagationComponent().getBinaryFormat());
140+
return censusTracing.getServerTracerFactory();
141+
}
142+
143+
/**
144+
* Builder for {@link GrpcCensus}.
145+
*/
146+
public static final class Builder {
147+
private boolean statsEnabled = true;
148+
private boolean tracingEnabled = true;
149+
150+
private Builder() {
151+
}
152+
153+
/**
154+
* Disables stats collection.
155+
*/
156+
public Builder disableStats() {
157+
this.statsEnabled = false;
158+
return this;
159+
}
160+
161+
/**
162+
* Disables tracing.
163+
*/
164+
public Builder disableTracing() {
165+
this.tracingEnabled = false;
166+
return this;
167+
}
168+
169+
/**
170+
* Builds a new {@link GrpcCensus}.
171+
*/
172+
public GrpcCensus build() {
173+
return new GrpcCensus(this);
174+
}
175+
}
176+
}

core/src/main/java/io/grpc/internal/AbstractClientStream.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
2222
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
2323
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24-
import static java.lang.Math.max;
2524

2625
import com.google.common.annotations.VisibleForTesting;
2726
import com.google.common.base.Preconditions;
@@ -124,8 +123,7 @@ protected AbstractClientStream(
124123
@Override
125124
public void setDeadline(Deadline deadline) {
126125
headers.discardAll(TIMEOUT_KEY);
127-
long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
128-
headers.put(TIMEOUT_KEY, effectiveTimeout);
126+
headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
129127
}
130128

131129
@Override

core/src/main/java/io/grpc/internal/GrpcUtil.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -651,12 +651,14 @@ public Stopwatch get() {
651651
static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> {
652652

653653
@Override
654-
public String toAsciiString(Long timeoutNanos) {
654+
public String toAsciiString(Long timeoutNanosObject) {
655655
long cutoff = 100000000;
656+
// Timeout checking is inherently racy. RPCs with timeouts in the past ideally don't even get
657+
// here, but if the timeout is expired assume that happened recently and adjust it to the
658+
// smallest allowed timeout
659+
long timeoutNanos = Math.max(1, timeoutNanosObject);
656660
TimeUnit unit = TimeUnit.NANOSECONDS;
657-
if (timeoutNanos < 0) {
658-
throw new IllegalArgumentException("Timeout too small");
659-
} else if (timeoutNanos < cutoff) {
661+
if (timeoutNanos < cutoff) {
660662
return timeoutNanos + "n";
661663
} else if (timeoutNanos < cutoff * 1000L) {
662664
return unit.toMicros(timeoutNanos) + "u";

core/src/main/java/io/grpc/internal/ServerImplBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public static ServerBuilder<?> forPort(int port) {
9999
ServerCallExecutorSupplier executorSupplier;
100100

101101
/**
102-
* An interface to provide to provide transport specific information for the server. This method
102+
* An interface to provide transport specific information for the server. This method
103103
* is meant for Transport implementors and should not be used by normal users.
104104
*/
105105
public interface ClientTransportServersBuilder {

core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,24 @@ allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTr
465465
.isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600));
466466
}
467467

468+
@Test
469+
public void setDeadline_thePastBecomesPositive() {
470+
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
471+
ClientStream stream = new BaseAbstractClientStream(
472+
allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx,
473+
transportTracer);
474+
475+
stream.setDeadline(Deadline.after(-1, TimeUnit.NANOSECONDS));
476+
stream.start(mockListener);
477+
478+
ArgumentCaptor<Metadata> headersCaptor = ArgumentCaptor.forClass(Metadata.class);
479+
verify(sink).writeHeaders(headersCaptor.capture(), ArgumentMatchers.<byte[]>any());
480+
481+
Metadata headers = headersCaptor.getValue();
482+
assertThat(headers.get(Metadata.Key.of("grpc-timeout", Metadata.ASCII_STRING_MARSHALLER)))
483+
.isEqualTo("1n");
484+
}
485+
468486
@Test
469487
public void appendTimeoutInsight() {
470488
InsightBuilder insight = new InsightBuilder();

core/src/test/java/io/grpc/internal/GrpcUtilTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public void timeoutTest() {
9898
GrpcUtil.TimeoutMarshaller marshaller =
9999
new GrpcUtil.TimeoutMarshaller();
100100
// nanos
101-
assertEquals("0n", marshaller.toAsciiString(0L));
102-
assertEquals(0L, (long) marshaller.parseAsciiString("0n"));
101+
assertEquals("1n", marshaller.toAsciiString(1L));
102+
assertEquals(1L, (long) marshaller.parseAsciiString("1n"));
103103

104104
assertEquals("99999999n", marshaller.toAsciiString(99999999L));
105105
assertEquals(99999999L, (long) marshaller.parseAsciiString("99999999n"));

core/src/test/java/io/grpc/internal/ServerImplTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import io.grpc.Channel;
5454
import io.grpc.Compressor;
5555
import io.grpc.Context;
56+
import io.grpc.Deadline;
5657
import io.grpc.Grpc;
5758
import io.grpc.HandlerRegistry;
5859
import io.grpc.IntegerMarshaller;
@@ -1146,11 +1147,21 @@ public ServerCall.Listener<String> startCall(
11461147
@Test
11471148
public void testContextExpiredBeforeStreamCreate_StreamCancelNotCalledBeforeSetListener()
11481149
throws Exception {
1150+
builder.ticker = new Deadline.Ticker() {
1151+
private long time;
1152+
1153+
@Override
1154+
public long nanoTime() {
1155+
time += 1000;
1156+
return time;
1157+
}
1158+
};
1159+
11491160
AtomicBoolean contextCancelled = new AtomicBoolean(false);
11501161
AtomicReference<Context> context = new AtomicReference<>();
11511162
AtomicReference<ServerCall<String, Integer>> callReference = new AtomicReference<>();
11521163

1153-
testStreamClose_setup(callReference, context, contextCancelled, 0L);
1164+
testStreamClose_setup(callReference, context, contextCancelled, 1L);
11541165

11551166
// This assert that stream.setListener(jumpListener) is called before stream.cancel(), which
11561167
// prevents extremely short deadlines causing NPEs.

inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21-
import static java.lang.Math.max;
2221

2322
import com.google.common.base.MoreObjects;
2423
import com.google.common.io.ByteStreams;
@@ -939,8 +938,7 @@ public void setMaxOutboundMessageSize(int maxSize) {}
939938
@Override
940939
public void setDeadline(Deadline deadline) {
941940
headers.discardAll(TIMEOUT_KEY);
942-
long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
943-
headers.put(TIMEOUT_KEY, effectiveTimeout);
941+
headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
944942
}
945943

946944
@Override

0 commit comments

Comments
 (0)