diff --git a/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java b/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java index 420da66ba..b5070f1ee 100644 --- a/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java +++ b/rsocket-core/src/jcstress/java/io/rsocket/core/TestRequesterResponderSupport.java @@ -5,6 +5,7 @@ import io.rsocket.DuplexConnection; import io.rsocket.RSocket; import io.rsocket.frame.decoder.PayloadDecoder; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; public class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket { @@ -27,7 +28,8 @@ public TestRequesterResponderSupport( PayloadDecoder.ZERO_COPY, connection, streamIdSupplier, - __ -> null); + __ -> null, + Sinks.empty()); this.requesterLeaseTracker = requesterLeaseTracker; } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index b05241365..c0a5646d3 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -87,6 +87,8 @@ default double availability() { @Override default void dispose() {} + default void disposeGracefully() {} + @Override default boolean isDisposed() { return false; diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index de494c4e3..f8b10fc21 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -655,10 +655,16 @@ public Mono connect(Supplier transportSupplier) { requesterLeaseTracker = null; } + final Sinks.Empty requesterOnGracefulShutdownSink = + Sinks.unsafe().empty(); + final Sinks.Empty responderOnGracefulShutdownSink = + Sinks.unsafe().empty(); final Sinks.Empty requesterOnAllClosedSink = Sinks.unsafe().empty(); final Sinks.Empty responderOnAllClosedSink = Sinks.unsafe().empty(); + final Sinks.Empty requesterGracefulShutdownStartedSink = + Sinks.unsafe().empty(); RSocket rSocketRequester = new RSocketRequester( @@ -673,7 +679,12 @@ public Mono connect(Supplier transportSupplier) { keepAliveHandler, interceptors::initRequesterRequestInterceptor, requesterLeaseTracker, + requesterGracefulShutdownStartedSink, + requesterOnGracefulShutdownSink, requesterOnAllClosedSink, + Mono.whenDelayError( + responderOnGracefulShutdownSink.asMono(), + requesterOnGracefulShutdownSink.asMono()), Mono.whenDelayError( responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono())); @@ -725,7 +736,9 @@ public Mono connect(Supplier transportSupplier) { leases.sender) : interceptors ::initResponderRequestInterceptor, - responderOnAllClosedSink); + responderOnGracefulShutdownSink, + responderOnAllClosedSink, + requesterGracefulShutdownStartedSink.asMono()); return wrappedRSocketRequester; }) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 9e8d349bf..49b612cd0 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -23,6 +23,7 @@ import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.exceptions.ConnectionCloseException; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.exceptions.Exceptions; import io.rsocket.frame.ErrorFrameCodec; @@ -67,6 +68,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { @Nullable private final RequesterLeaseTracker requesterLeaseTracker; + private final Sinks.Empty onGracefulShutdownStartedSink; private final Sinks.Empty onThisSideClosedSink; private final Mono onAllClosed; private final KeepAliveFramesAcceptor keepAliveFramesAcceptor; @@ -83,7 +85,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { @Nullable KeepAliveHandler keepAliveHandler, Function requestInterceptorFunction, @Nullable RequesterLeaseTracker requesterLeaseTracker, + Sinks.Empty onGracefulShutdownStartedSink, + Sinks.Empty onGracefulShutdownSink, Sinks.Empty onThisSideClosedSink, + Mono onGracefulShutdownDone, Mono onAllClosed) { super( mtu, @@ -92,14 +97,17 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { payloadDecoder, connection, streamIdSupplier, - requestInterceptorFunction); + requestInterceptorFunction, + onGracefulShutdownSink); this.requesterLeaseTracker = requesterLeaseTracker; + this.onGracefulShutdownStartedSink = onGracefulShutdownStartedSink; this.onThisSideClosedSink = onThisSideClosedSink; this.onAllClosed = onAllClosed; // DO NOT Change the order here. The Send processor must be subscribed to before receiving connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown); + onGracefulShutdownDone.subscribe(null, null, connection::dispose); connection.receive().subscribe(this::handleIncomingFrames, e -> {}); @@ -200,6 +208,17 @@ public void dispose() { getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed")); } + @Override + public void disposeGracefully() { + getDuplexConnection() + .sendFrame( + 0, + ErrorFrameCodec.encode( + getAllocator(), 0, new ConnectionCloseException("Graceful Shutdown"))); + this.onGracefulShutdownStartedSink.tryEmitEmpty(); + super.terminate(); + } + @Override public boolean isDisposed() { return terminationError != null; @@ -351,6 +370,12 @@ private void tryTerminate(Supplier errorSupplier) { } if (terminationError == null) { Throwable e = errorSupplier.get(); + + if (e instanceof ConnectionCloseException) { + this.onGracefulShutdownStartedSink.tryEmitEmpty(); + super.terminate(); + return; + } if (TERMINATION_ERROR.compareAndSet(this, null, e)) { terminate(e); } else { @@ -417,7 +442,7 @@ private void terminate(Throwable e) { requesterLeaseTracker.dispose(e); } - final Collection activeStreamsCopy; + final Collection activeStreamsCopy; // in case of graceful shut down is empty synchronized (this) { final IntObjectMap activeStreams = this.activeStreams; activeStreamsCopy = new ArrayList<>(activeStreams.values()); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 50c5ba54c..05f33043f 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -73,7 +73,9 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { int maxFrameLength, int maxInboundPayloadSize, Function requestInterceptorFunction, - Sinks.Empty onThisSideClosedSink) { + Sinks.Empty onGracefulShutdownSink, + Sinks.Empty onThisSideClosedSink, + Mono onRequesterGracefulShutdownStarted) { super( mtu, maxFrameLength, @@ -81,7 +83,8 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { payloadDecoder, connection, null, - requestInterceptorFunction); + requestInterceptorFunction, + onGracefulShutdownSink); this.requestHandler = requestHandler; @@ -92,12 +95,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { .onClose() .subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose); + onRequesterGracefulShutdownStarted.subscribe(null, null, this::onGracefulShutdownStarted); + connection.receive().subscribe(this::handleFrame, e -> {}); } + private void onGracefulShutdownStarted() { + super.terminate(); + requestHandler.disposeGracefully(); + } + private void tryTerminateOnConnectionError(Throwable e) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Try terminate connection on responder side"); } tryTerminate(() -> e); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 0c68db6df..d5fd5813c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -438,8 +438,11 @@ private Mono acceptSetup( requesterLeaseTracker = null; } + final Sinks.Empty requesterOnGracefulShutdownSink = Sinks.unsafe().empty(); + final Sinks.Empty responderOnGracefulShutdownSink = Sinks.unsafe().empty(); final Sinks.Empty requesterOnAllClosedSink = Sinks.unsafe().empty(); final Sinks.Empty responderOnAllClosedSink = Sinks.unsafe().empty(); + final Sinks.Empty requesterGracefulShutdownStartedSink = Sinks.unsafe().empty(); RSocket rSocketRequester = new RSocketRequester( @@ -454,7 +457,12 @@ private Mono acceptSetup( keepAliveHandler, interceptors::initRequesterRequestInterceptor, requesterLeaseTracker, + requesterGracefulShutdownStartedSink, + requesterOnGracefulShutdownSink, requesterOnAllClosedSink, + Mono.whenDelayError( + responderOnGracefulShutdownSink.asMono(), + requesterOnGracefulShutdownSink.asMono()), Mono.whenDelayError( responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono())); @@ -489,7 +497,9 @@ private Mono acceptSetup( interceptors.initResponderRequestInterceptor( rSocket, (RequestInterceptor) leases.sender) : interceptors::initResponderRequestInterceptor, - responderOnAllClosedSink); + responderOnGracefulShutdownSink, + responderOnAllClosedSink, + requesterGracefulShutdownStartedSink.asMono()); }) .doFinally(signalType -> setupPayload.release()) .then(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java index bea7dc1aa..daee8dcb6 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java @@ -5,10 +5,12 @@ import io.netty.util.collection.IntObjectMap; import io.rsocket.DuplexConnection; import io.rsocket.RSocket; +import io.rsocket.exceptions.CanceledException; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RequestInterceptor; import java.util.Objects; import java.util.function.Function; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; class RequesterResponderSupport { @@ -19,11 +21,15 @@ class RequesterResponderSupport { private final PayloadDecoder payloadDecoder; private final ByteBufAllocator allocator; private final DuplexConnection connection; + private final Sinks.Empty onGracefulShutdownSink; @Nullable private final RequestInterceptor requestInterceptor; @Nullable final StreamIdSupplier streamIdSupplier; final IntObjectMap activeStreams; + boolean terminating; + boolean terminated; + public RequesterResponderSupport( int mtu, int maxFrameLength, @@ -31,7 +37,8 @@ public RequesterResponderSupport( PayloadDecoder payloadDecoder, DuplexConnection connection, @Nullable StreamIdSupplier streamIdSupplier, - Function requestInterceptorFunction) { + Function requestInterceptorFunction, + Sinks.Empty onGracefulShutdownSink) { this.activeStreams = new IntObjectHashMap<>(); this.mtu = mtu; @@ -41,6 +48,7 @@ public RequesterResponderSupport( this.allocator = connection.alloc(); this.streamIdSupplier = streamIdSupplier; this.connection = connection; + this.onGracefulShutdownSink = onGracefulShutdownSink; this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this); } @@ -88,6 +96,9 @@ public int getNextStreamId() { final StreamIdSupplier streamIdSupplier = this.streamIdSupplier; if (streamIdSupplier != null) { synchronized (this) { + if (this.terminating) { + throw new CanceledException("Disposed"); + } return streamIdSupplier.nextStreamId(this.activeStreams); } } else { @@ -107,6 +118,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) { if (streamIdSupplier != null) { final IntObjectMap activeStreams = this.activeStreams; synchronized (this) { + if (this.terminating) { + throw new CanceledException("Disposed"); + } + final int streamId = streamIdSupplier.nextStreamId(activeStreams); activeStreams.put(streamId, frameHandler); @@ -119,6 +134,11 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) { } public synchronized boolean add(int streamId, FrameHandler frameHandler) { + if (this.terminating) { + throw new CanceledException( + "This RSocket is either disposed or disposing, and no longer accepting new requests"); + } + final IntObjectMap activeStreams = this.activeStreams; // copy of Map.putIfAbsent(key, value) without `streamId` boxing final FrameHandler previousHandler = activeStreams.get(streamId); @@ -148,14 +168,45 @@ public synchronized FrameHandler get(int streamId) { * @return {@code true} if there is {@link FrameHandler} for the given {@code streamId} and the * instance equals to the passed one */ - public synchronized boolean remove(int streamId, FrameHandler frameHandler) { - final IntObjectMap activeStreams = this.activeStreams; - // copy of Map.remove(key, value) without `streamId` boxing - final FrameHandler curValue = activeStreams.get(streamId); - if (!Objects.equals(curValue, frameHandler)) { - return false; + public boolean remove(int streamId, FrameHandler frameHandler) { + final boolean terminated; + synchronized (this) { + final IntObjectMap activeStreams = this.activeStreams; + // copy of Map.remove(key, value) without `streamId` boxing + final FrameHandler curValue = activeStreams.get(streamId); + if (!Objects.equals(curValue, frameHandler)) { + return false; + } + activeStreams.remove(streamId); + if (this.terminating && activeStreams.size() == 0) { + terminated = true; + this.terminated = true; + } else { + terminated = false; + } + } + + if (terminated) { + onGracefulShutdownSink.tryEmitEmpty(); } - activeStreams.remove(streamId); return true; } + + public void terminate() { + final boolean terminated; + synchronized (this) { + this.terminating = true; + + if (activeStreams.size() == 0) { + terminated = true; + this.terminated = true; + } else { + terminated = false; + } + } + + if (terminated) { + onGracefulShutdownSink.tryEmitEmpty(); + } + } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java index 84576e6ce..795555e68 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java @@ -721,6 +721,8 @@ public static class ClientSocketRule extends AbstractSocketRule { protected Runnable delayer; protected Sinks.One producer; + protected Sinks.Empty onGracefulShutdownStartedSink; + protected Sinks.Empty thisGracefulShutdownSink; protected Sinks.Empty thisClosedSink; @Override @@ -740,6 +742,8 @@ protected void doInit() { @Override protected RSocket newRSocket() { + this.onGracefulShutdownStartedSink = Sinks.empty(); + this.thisGracefulShutdownSink = Sinks.empty(); this.thisClosedSink = Sinks.empty(); return new RSocketRequester( connection, @@ -753,7 +757,10 @@ protected RSocket newRSocket() { null, __ -> null, null, + onGracefulShutdownStartedSink, + thisGracefulShutdownSink, thisClosedSink, + thisGracefulShutdownSink.asMono(), thisClosedSink.asMono()); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java index 5be59235c..b1b01ec58 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java @@ -84,7 +84,10 @@ static RSocketState requester(int tickPeriod, int timeout) { new DefaultKeepAliveHandler(), r -> null, null, + Sinks.empty(), + Sinks.empty(), empty, + Sinks.empty().asMono(), empty.asMono()); return new RSocketState(rSocket, allocator, connection, empty); } @@ -117,7 +120,10 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { Mockito.mock(ResumeStateHolder.class)), __ -> null, null, + Sinks.empty(), + Sinks.empty(), onClose, + Sinks.empty().asMono(), onClose.asMono()); return new ResumableRSocketState(rSocket, connection, resumableConnection, onClose, allocator); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java index a461833d3..824169b23 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java @@ -92,6 +92,9 @@ class RSocketLeaseTest { private Sinks.Many leaseSender = Sinks.many().multicast().onBackpressureBuffer(); private RequesterLeaseTracker requesterLeaseTracker; + protected Sinks.Empty onGracefulShutdownStartedSink; + protected Sinks.Empty otherGracefulShutdownSink; + protected Sinks.Empty thisGracefulShutdownSink; protected Sinks.Empty thisClosedSink; protected Sinks.Empty otherClosedSink; @@ -103,6 +106,9 @@ void setUp() { connection = new TestDuplexConnection(byteBufAllocator); requesterLeaseTracker = new RequesterLeaseTracker(TAG, 0); responderLeaseTracker = new ResponderLeaseTracker(TAG, connection, () -> leaseSender.asFlux()); + this.onGracefulShutdownStartedSink = Sinks.empty(); + this.otherGracefulShutdownSink = Sinks.empty(); + this.thisGracefulShutdownSink = Sinks.empty(); this.thisClosedSink = Sinks.empty(); this.otherClosedSink = Sinks.empty(); @@ -121,7 +127,10 @@ void setUp() { null, __ -> null, requesterLeaseTracker, + onGracefulShutdownStartedSink, + thisGracefulShutdownSink, thisClosedSink, + otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()), otherClosedSink.asMono().and(thisClosedSink.asMono())); mockRSocketHandler = mock(RSocket.class); @@ -190,7 +199,9 @@ protected void hookOnError(Throwable throwable) { FRAME_LENGTH_MASK, Integer.MAX_VALUE, __ -> null, - otherClosedSink); + otherGracefulShutdownSink, + otherClosedSink, + onGracefulShutdownStartedSink.asMono()); } @AfterEach diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 01eb998c7..2bafe1be3 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -63,6 +63,9 @@ class RSocketRequesterSubscribersTest { private LeaksTrackingByteBufAllocator allocator; private RSocket rSocketRequester; private TestDuplexConnection connection; + protected Sinks.Empty onGracefulShutdownStartedSink; + protected Sinks.Empty otherGracefulShutdownSink; + protected Sinks.Empty thisGracefulShutdownSink; protected Sinks.Empty thisClosedSink; protected Sinks.Empty otherClosedSink; @@ -75,6 +78,9 @@ void tearDownAndCheckNoLeaks() { void setUp() { allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); connection = new TestDuplexConnection(allocator); + this.onGracefulShutdownStartedSink = Sinks.empty(); + this.otherGracefulShutdownSink = Sinks.empty(); + this.thisGracefulShutdownSink = Sinks.empty(); this.thisClosedSink = Sinks.empty(); this.otherClosedSink = Sinks.empty(); rSocketRequester = @@ -90,7 +96,10 @@ void setUp() { null, __ -> null, null, + onGracefulShutdownStartedSink, + thisGracefulShutdownSink, thisClosedSink, + otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()), otherClosedSink.asMono().and(thisClosedSink.asMono())); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index a1199f698..27e132665 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -82,6 +82,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Stream; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -96,6 +97,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.Disposable; import reactor.core.Scannable; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @@ -1469,13 +1471,70 @@ public void testWorkaround959(String type) { } } + @Test + public void testDisposeGracefully() { + System.out.println( + FrameHeaderCodec.frameType( + Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump("000000012400")))); + final RSocketRequester rSocketRequester = rule.socket; + final AssertSubscriber onGracefulShutdownSubscriber = + rule.thisGracefulShutdownSink.asMono().subscribeWith(AssertSubscriber.create()); + final AssertSubscriber onCloseSubscriber = + rSocketRequester.onClose().subscribeWith(new AssertSubscriber<>()); + + final Disposable stream = rSocketRequester.requestStream(EmptyPayload.INSTANCE).subscribe(); + + FrameAssert.assertThat(rule.connection.awaitFrame()) + .typeOf(REQUEST_STREAM) + .hasClientSideStreamId() + .hasNoLeaks(); + + Assertions.assertThat(rSocketRequester.isDisposed()).isFalse(); + + rSocketRequester.disposeGracefully(); + Assertions.assertThat(rSocketRequester.isDisposed()).isFalse(); + onGracefulShutdownSubscriber.assertNotTerminated(); + + FrameAssert.assertThat(rule.connection.awaitFrame()) + .typeOf(FrameType.ERROR) + .hasStreamIdZero() + .hasData("Graceful Shutdown") + .hasNoLeaks(); + + stream.dispose(); + Assertions.assertThat(rSocketRequester.isDisposed()).isFalse(); + Assertions.assertThat(rule.connection.isDisposed()).isFalse(); + onGracefulShutdownSubscriber.assertTerminated(); + + FrameAssert.assertThat(rule.connection.awaitFrame()) + .typeOf(CANCEL) + .hasClientSideStreamId() + .hasNoLeaks(); + + rule.otherGracefulShutdownSink.tryEmitEmpty(); + Assertions.assertThat(rSocketRequester.isDisposed()).isTrue(); + Assertions.assertThat(rule.connection.isDisposed()).isTrue(); + onCloseSubscriber.assertNotTerminated(); + + rule.otherClosedSink.tryEmitEmpty(); + onCloseSubscriber.assertTerminated(); + + rule.assertHasNoLeaks(); + } + public static class ClientSocketRule extends AbstractSocketRule { + protected Sinks.Empty onGracefulShutdownStartedSink; + protected Sinks.Empty otherGracefulShutdownSink; + protected Sinks.Empty thisGracefulShutdownSink; protected Sinks.Empty thisClosedSink; protected Sinks.Empty otherClosedSink; @Override protected RSocketRequester newRSocket() { + this.onGracefulShutdownStartedSink = Sinks.empty(); + this.otherGracefulShutdownSink = Sinks.empty(); + this.thisGracefulShutdownSink = Sinks.empty(); this.thisClosedSink = Sinks.empty(); this.otherClosedSink = Sinks.empty(); return new RSocketRequester( @@ -1490,7 +1549,10 @@ protected RSocketRequester newRSocket() { null, (__) -> null, null, + onGracefulShutdownStartedSink, + thisGracefulShutdownSink, thisClosedSink, + otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()), otherClosedSink.asMono().and(thisClosedSink.asMono())); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 4f689e396..e48e5a2f0 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -67,12 +67,14 @@ import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.time.Duration; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -1179,12 +1181,79 @@ public Flux requestChannel(Publisher payloads) { rule.assertHasNoLeaks(); } + @Test + void testGracefulShutdown() { + final AssertSubscriber onCloseSubscriber = AssertSubscriber.create(); + final AssertSubscriber onGracefulShutdownSubscriber = AssertSubscriber.create(); + final Sinks.Empty onDisposeGracefullySink = Sinks.unsafe().empty(); + + boolean[] disposed = new boolean[] {false}; + boolean[] disposedGracefully = new boolean[] {false}; + + rule.setAcceptingSocket( + new RSocket() { + + @Override + public Flux requestStream(Payload payload) { + return Flux.interval(Duration.ofMillis(100)) + .takeUntilOther(onDisposeGracefullySink.asMono()) + .map(tick -> ByteBufPayload.create(String.valueOf(tick))); + } + + @Override + public void dispose() { + disposed[0] = true; + } + + @Override + public void disposeGracefully() { + disposedGracefully[0] = true; + } + }); + + rule.connection.addToReceivedBuffer( + RequestStreamFrameCodec.encode( + rule.allocator, 1, false, Long.MAX_VALUE, null, Unpooled.EMPTY_BUFFER)); + + rule.onCloseSink.asMono().subscribe(onCloseSubscriber); + rule.onGracefulShutdownSink.asMono().subscribe(onGracefulShutdownSubscriber); + + rule.onGracefulShutdownStartedSink.tryEmitEmpty(); + Assertions.assertThat(disposed[0]).isFalse(); + Assertions.assertThat(disposedGracefully[0]).isTrue(); + Assertions.assertThat(rule.connection.isDisposed()).isFalse(); + onCloseSubscriber.assertNotTerminated(); + onGracefulShutdownSubscriber.assertNotTerminated(); + + onDisposeGracefullySink.tryEmitEmpty(); + Assertions.assertThat(disposed[0]).isFalse(); + Assertions.assertThat(disposedGracefully[0]).isTrue(); + Assertions.assertThat(rule.connection.isDisposed()).isFalse(); + onCloseSubscriber.assertNotTerminated(); + onGracefulShutdownSubscriber.assertTerminated(); + + ByteBuf possibleCompleteFrame = rule.connection.pollFrame(); + + if (possibleCompleteFrame != null) { + FrameAssert.assertThat(possibleCompleteFrame).typeOf(COMPLETE).hasNoLeaks(); + } + + rule.connection.dispose(); + Assertions.assertThat(disposed[0]).isTrue(); + Assertions.assertThat(disposedGracefully[0]).isTrue(); + Assertions.assertThat(rule.connection.isDisposed()).isTrue(); + onCloseSubscriber.assertTerminated(); + onGracefulShutdownSubscriber.assertTerminated(); + } + public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket; private volatile int prefetch; private RequestInterceptor requestInterceptor; + protected Sinks.Empty onGracefulShutdownSink; protected Sinks.Empty onCloseSink; + protected Sinks.Empty onGracefulShutdownStartedSink; @Override protected void doInit() { @@ -1221,7 +1290,9 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { @Override protected RSocketResponder newRSocket() { + onGracefulShutdownSink = Sinks.empty(); onCloseSink = Sinks.empty(); + onGracefulShutdownStartedSink = Sinks.empty(); return new RSocketResponder( connection, acceptingSocket, @@ -1231,7 +1302,9 @@ protected RSocketResponder newRSocket() { maxFrameLength, maxInboundPayloadSize, __ -> requestInterceptor, - onCloseSink); + onGracefulShutdownSink, + onCloseSink, + onGracefulShutdownStartedSink.asMono()); } private void sendRequest(int streamId, FrameType frameType) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index e01e6ebdc..43fb89586 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -515,6 +515,9 @@ public static class SocketRule { private RSocket requestAcceptor; private LeaksTrackingByteBufAllocator allocator; + protected Sinks.Empty onGracefulShutdownStartedSink; + protected Sinks.Empty otherGracefulShutdownSink; + protected Sinks.Empty thisGracefulShutdownSink; protected Sinks.Empty thisClosedSink; protected Sinks.Empty otherClosedSink; @@ -527,6 +530,9 @@ public void init() { serverProcessor = Sinks.many().multicast().directBestEffort(); clientProcessor = Sinks.many().multicast().directBestEffort(); + this.onGracefulShutdownStartedSink = Sinks.empty(); + this.otherGracefulShutdownSink = Sinks.empty(); + this.thisGracefulShutdownSink = Sinks.empty(); this.thisClosedSink = Sinks.empty(); this.otherClosedSink = Sinks.empty(); @@ -578,7 +584,9 @@ public Flux requestChannel(Publisher payloads) { FRAME_LENGTH_MASK, Integer.MAX_VALUE, __ -> null, - otherClosedSink); + otherGracefulShutdownSink, + otherClosedSink, + onGracefulShutdownStartedSink.asMono()); crs = new RSocketRequester( @@ -593,7 +601,10 @@ public Flux requestChannel(Publisher payloads) { null, __ -> null, null, + onGracefulShutdownStartedSink, + thisGracefulShutdownSink, thisClosedSink, + otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()), otherClosedSink.asMono().and(thisClosedSink.asMono())); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java index 87c3a865f..cbfbccd77 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java @@ -71,6 +71,8 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection conn = new TestDuplexConnection(allocator); + Sinks.Empty onGracefulShutdownStartedSink = Sinks.empty(); + Sinks.Empty onGracefulShutdownSink = Sinks.empty(); Sinks.Empty onThisSideClosedSink = Sinks.empty(); RSocketRequester rSocket = @@ -86,7 +88,10 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { null, __ -> null, null, + onGracefulShutdownStartedSink, + onGracefulShutdownSink, onThisSideClosedSink, + onGracefulShutdownSink.asMono(), onThisSideClosedSink.asMono()); String errorMsg = "error"; @@ -114,6 +119,8 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection conn = new TestDuplexConnection(allocator); + Sinks.Empty onGracefulShutdownStartedSink = Sinks.empty(); + Sinks.Empty onGracefulShutdownSink = Sinks.empty(); Sinks.Empty onThisSideClosedSink = Sinks.empty(); RSocketRequester rSocket = new RSocketRequester( @@ -128,7 +135,10 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() { null, __ -> null, null, + onGracefulShutdownStartedSink, + onGracefulShutdownSink, onThisSideClosedSink, + onGracefulShutdownSink.asMono(), onThisSideClosedSink.asMono()); conn.addToReceivedBuffer( diff --git a/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java b/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java index e282d72d5..8fb8cd37e 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java +++ b/rsocket-core/src/test/java/io/rsocket/core/TestRequesterResponderSupport.java @@ -34,6 +34,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.assertj.core.api.Assertions; import reactor.core.Exceptions; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; final class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket { @@ -58,7 +59,8 @@ final class TestRequesterResponderSupport extends RequesterResponderSupport impl PayloadDecoder.ZERO_COPY, connection, streamIdSupplier, - (__) -> requestInterceptor); + (__) -> requestInterceptor, + Sinks.empty()); this.error = error; } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java new file mode 100644 index 000000000..28eb65adf --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java @@ -0,0 +1,93 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.examples.transport.tcp.gracefulshutdown; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public final class ClientStreamingToServer { + + private static final Logger logger = LoggerFactory.getLogger(ClientStreamingToServer.class); + + public static void main(String[] args) throws InterruptedException { + RSocketServer.create( + (setup, sendingSocket) -> { + sendingSocket.disposeGracefully(); + + return Mono.just( + new RSocket() { + @Override + public Flux requestStream(Payload payload) { + return Flux.interval(Duration.ofMillis(100)) + .map(aLong -> DefaultPayload.create("Interval: " + aLong)); + } + + @Override + public void disposeGracefully() {} + }); + }) + .bindNow(TcpServerTransport.create("localhost", 7000)); + + RSocket socket = + RSocketConnector.create() + .acceptor( + SocketAcceptor.with( + new RSocket() { + @Override + public void disposeGracefully() {} + })) + .setupPayload(DefaultPayload.create("test", "test")) + .connect(TcpClientTransport.create("localhost", 7000)) + .block(); + + AtomicInteger counter = new AtomicInteger(); + + final Payload payload = DefaultPayload.create("Hello"); + socket + .requestStream(payload) + .map(Payload::getDataUtf8) + .doOnNext( + msg -> { + logger.debug(msg); + counter.incrementAndGet(); + }) + .subscribe(); + + logger.debug("dispose gracefully"); + socket.disposeGracefully(); + // + // Mono.delay(Duration.ofSeconds(10)) + // .doFinally((__) -> socket.dispose()) + // .subscribe(); + + socket.onClose().block(); + + logger.debug("Observe " + counter.get() + " of 100 events"); + } +} diff --git a/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java new file mode 100644 index 000000000..9d8ca96e0 --- /dev/null +++ b/rsocket-examples/src/test/java/io/rsocket/integration/GracefulShutdownIntegrationTest.java @@ -0,0 +1,177 @@ +/* + * Copyright 2015-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.ByteBufPayload; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.test.StepVerifier; + +public class GracefulShutdownIntegrationTest { + private RSocket handler; + private Sinks.Empty clientGracefulShutdownSink; + private Sinks.Empty serverGracefulShutdownSink; + private Disposable stream; + + private CloseableChannel server; + + @BeforeEach + public void startup() { + server = + RSocketServer.create( + (setup, sendingSocket) -> { + stream = + sendingSocket + .requestStream(ByteBufPayload.create("REQUEST", "META")) + .subscribe(); + return Mono.just(handler); + }) + .bind(TcpServerTransport.create("localhost", 0)) + .block(); + } + + @AfterEach + public void cleanup() { + server.dispose(); + } + + @Test + @Timeout(15_000L) + public void testCompleteWithoutNext() { + clientGracefulShutdownSink = Sinks.unsafe().empty(); + serverGracefulShutdownSink = Sinks.unsafe().empty(); + final AtomicBoolean gracefullyDisposedServer = new AtomicBoolean(); + final AtomicBoolean gracefullyDisposedClient = new AtomicBoolean(); + final AtomicBoolean disposedServer = new AtomicBoolean(); + final AtomicBoolean disposedClient = new AtomicBoolean(); + final Sinks.Empty requestHandled = Sinks.unsafe().empty(); + handler = + new RSocket() { + @Override + public Flux requestStream(Payload payload) { + payload.release(); + + requestHandled.tryEmitEmpty(); + return Flux.never().takeUntilOther(serverGracefulShutdownSink.asMono()); + } + + @Override + public void dispose() { + disposedServer.set(true); + } + + @Override + public void disposeGracefully() { + gracefullyDisposedServer.set(true); + } + }; + RSocket client = + RSocketConnector.create() + .acceptor( + SocketAcceptor.with( + new RSocket() { + @Override + public Flux requestStream(Payload payload) { + payload.release(); + return Flux.never() + .takeUntilOther(clientGracefulShutdownSink.asMono()); + } + + @Override + public void dispose() { + disposedClient.set(true); + } + + @Override + public void disposeGracefully() { + gracefullyDisposedClient.set(true); + } + })) + .connect(TcpClientTransport.create(server.address())) + .block(); + + AtomicReference terminalSignal = new AtomicReference<>(); + + StepVerifier clientRequestVerifier = + client + .requestStream(ByteBufPayload.create("REQUEST", "META")) + .onErrorResume( + t -> Mono.empty()) // FIXME: onComplete frame may not be delivered on time + .doFinally(terminalSignal::set) + .as(StepVerifier::create) + .expectSubscription() + .expectComplete() + .verifyLater(); + + requestHandled.asMono().block(Duration.ofSeconds(5)); + + assertThat(gracefullyDisposedServer).isFalse(); + assertThat(gracefullyDisposedClient).isFalse(); + + client.disposeGracefully(); + + assertThat(client.isDisposed()).isFalse(); + assertThat(gracefullyDisposedServer) + .as("gracefullyDisposedServer after disposeGracefully") + .isTrue(); + assertThat(gracefullyDisposedClient) + .as("gracefullyDisposedClient after disposeGracefully") + .isTrue(); + + assertThat(disposedServer).as("disposedServer after disposeGracefully").isFalse(); + assertThat(disposedClient).as("disposedClient after disposeGracefully").isFalse(); + assertThat(terminalSignal).as("terminalSignal after disposeGracefully").hasValue(null); + assertThat(stream.isDisposed()).isFalse(); + + clientGracefulShutdownSink.tryEmitEmpty(); + + assertThat(client.isDisposed()).isFalse(); + assertThat(terminalSignal) + .as("terminalSignal after clientGracefulShutdownSink.tryEmitEmpty") + .hasValue(null); + Awaitility.waitAtMost(Duration.ofSeconds(5)).until(() -> stream.isDisposed()); + + serverGracefulShutdownSink.tryEmitEmpty(); + + clientRequestVerifier.verify(Duration.ofSeconds(5)); + assertThat(client.isDisposed()).isTrue(); + assertThat(stream.isDisposed()).isTrue(); + Awaitility.waitAtMost(Duration.ofSeconds(5)).untilTrue(disposedServer); + Awaitility.waitAtMost(Duration.ofSeconds(5)).untilTrue(disposedClient); + } +}