diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index a535330f4b1..8567d526a08 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -74,6 +74,7 @@ protected interface Sink { private final StatsTraceContext statsTraceCtx; private boolean outboundClosed; private boolean headersSent; + private boolean closeCalled; protected AbstractServerStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) { @@ -120,6 +121,7 @@ public final void deliverFrame( @Override public final void close(Status status, Metadata trailers) { + Preconditions.checkState(!closeCalled, "call already closed"); Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(trailers, "trailers"); if (!outboundClosed) { @@ -130,6 +132,7 @@ public final void close(Status status, Metadata trailers) { // closedStatus is only set from here, and is read from a place that has happen-after // guarantees with respect to here. transportState().setClosedStatus(status); + closeCalled = true; abstractServerStreamSink().writeTrailers(trailers, headersSent, status); } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..f01fbee69a5 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server // Only accessed from callExecutor. private ServerStreamListener listener; - public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor, + ServerStream stream, Context.CancellableContext context, Tag tag) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; @@ -808,10 +808,13 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose(Throwable t) { - // TODO(ejona86): this is not thread-safe :) + void internalClose(Throwable t) { String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); + Metadata metadata = Status.trailersFromThrowable(t); + if (metadata == null) { + metadata = new Metadata(); + } + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3125edca1e6..a138f024c30 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -78,7 +78,6 @@ import io.grpc.StringMarshaller; import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; -import io.grpc.internal.SingleMessageProducer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.util.MutableHandlerRegistry; import io.perfmark.PerfMark; @@ -1535,6 +1534,25 @@ public void channelz_transport_membershp() throws Exception { assertTrue(after.end); } + @Test + public void testInternalClose_withNullMetadata() { + JumpToApplicationThreadServerStreamListener listener + = new JumpToApplicationThreadServerStreamListener( + executor.getScheduledExecutorService(), + executor.getScheduledExecutorService(), + stream, + Context.ROOT.withCancellation(), + PerfMark.createTag()); + Throwable throwableMock = mock(Throwable.class); + // Stub Status.trailersFromThrowable to return null, simulating the case where metadata is null + when(Status.trailersFromThrowable(throwableMock)).thenReturn(null); + listener.internalClose(throwableMock); + // Capture the arguments passed to stream.close() + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); + } + private void createAndStartServer() throws IOException { createServer(); server.start(); diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index aea7ff49032..abb8b124a04 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -1573,6 +1573,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception { verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1); } + @SuppressWarnings("MissingFail") @Test public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener); @@ -1597,10 +1598,14 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata(), true); - server.stream.writeMessage(methodDescriptor.streamResponse("response")); - server.stream.close(Status.INTERNAL, new Metadata()); + try { + // Ensure that for a closed ServerStream, interactions are noops + server.stream.writeHeaders(new Metadata(), true); + server.stream.writeMessage(methodDescriptor.streamResponse("response")); + server.stream.close(Status.INTERNAL, new Metadata()); + } catch (Exception exception) { + assertTrue(exception.getMessage().contains("call already closed")); + } // Make sure new streams still work properly doPingPong(serverListener);