Skip to content

core: Added changes to make ServerImpl.internalClose() thread-safe #11924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected interface Sink {
private final StatsTraceContext statsTraceCtx;
private boolean outboundClosed;
private boolean headersSent;
private boolean closeCalled;

protected AbstractServerStream(
WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
Expand Down Expand Up @@ -120,6 +121,7 @@ public final void deliverFrame(

@Override
public final void close(Status status, Metadata trailers) {
Preconditions.checkState(!closeCalled, "call already closed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is precondition the right thing to do or will logging an error message do? With an example callstack for close

close:123, AbstractServerStream (io.grpc.internal)            
closeInternal:227, ServerCallImpl (io.grpc.internal)
close:213, ServerCallImpl (io.grpc.internal)
onCompleted:395, ServerCalls$ServerCallStreamObserverImpl (io.grpc.stub)
sayHello:104, HelloWorldServer$GreeterImpl (io.grpc.examples.helloworld)
invoke:285, GreeterGrpc$MethodHandlers (io.grpc.examples.helloworld)
onHalfClose:182, ServerCalls$UnaryServerCallHandler$UnaryServerCallListener (io.grpc.stub)
halfClosed:356, ServerCallImpl$ServerStreamListenerImpl (io.grpc.internal)
runInContext:861, ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed (io.grpc.internal)  
run:37, ContextRunnable (io.grpc.internal)
run:133, SerializingExecutor (io.grpc.internal)                             
runWorker:1144, ThreadPoolExecutor (java.util.concurrent)
run:642, ThreadPoolExecutor$Worker (java.util.concurrent)
runWith:1596, Thread (java.lang)
run:1583, Thread (java.lang)

Failed precondition will cause the exception to be caught at runInContext:861, ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed and again call internalClose and rethrow the exception which then will be caught and logged in run:133, SerializingExecutor.
If the application continues to attempt to write messages these steps will keep repeated for each attempt and the application is not going to come to know of it anyway. It may be better to just write an error log instead. @ejona86 to confirm.

Preconditions.checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers");
if (!outboundClosed) {
Copy link
Contributor

@kannanjgithub kannanjgithub Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If internalClose was called before, outboundClosed would have been set to true and any further calls to close() from anywhere else already skips over if outboundClosed is true. Your changes here don't seem to be achieving anything extra.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make the check apply for usages of stream in ServerCallImpl (application thread usages of the stream) if the stream had already been closed. Refer comment. Also we should not need to add another field to track the stream closed status since there is already an outboundClosed field and this should be able to be used for such checks.

Expand All @@ -130,6 +132,7 @@ public final void close(Status status, Metadata trailers) {
// closedStatus is only set from here, and is read from a place that has happen-after
// guarantees with respect to here.
transportState().setClosedStatus(status);
closeCalled = true;
abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
}
}
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server
// Only accessed from callExecutor.
private ServerStreamListener listener;

public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor,
ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
Expand Down Expand Up @@ -808,10 +808,13 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
void internalClose(Throwable t) {
String description = "Application error processing RPC";
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
}
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata);
}

@Override
Expand Down
20 changes: 19 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import io.grpc.StringMarshaller;
import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SingleMessageProducer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.util.MutableHandlerRegistry;
import io.perfmark.PerfMark;
Expand Down Expand Up @@ -1535,6 +1534,25 @@ public void channelz_transport_membershp() throws Exception {
assertTrue(after.end);
}

@Test
public void testInternalClose_withNullMetadata() {
JumpToApplicationThreadServerStreamListener listener
= new JumpToApplicationThreadServerStreamListener(
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Throwable throwableMock = mock(Throwable.class);
// Stub Status.trailersFromThrowable to return null, simulating the case where metadata is null
when(Status.trailersFromThrowable(throwableMock)).thenReturn(null);
listener.internalClose(throwableMock);
// Capture the arguments passed to stream.close()
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
}

private void createAndStartServer() throws IOException {
createServer();
server.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception {
verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1);
}

@SuppressWarnings("MissingFail")
@Test
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);
Expand All @@ -1597,10 +1598,14 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));

// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
try {
// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("call already closed"));
}

// Make sure new streams still work properly
doPingPong(serverListener);
Expand Down