Skip to content

Commit

Permalink
adds graceful shutdown support
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
OlegDokuka authored and OlegDokuka committed Apr 29, 2023
1 parent 6d07389 commit 3bf4de8
Show file tree
Hide file tree
Showing 18 changed files with 593 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.frame.decoder.PayloadDecoder;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

public class TestRequesterResponderSupport extends RequesterResponderSupport implements RSocket {
Expand All @@ -27,7 +28,8 @@ public TestRequesterResponderSupport(
PayloadDecoder.ZERO_COPY,
connection,
streamIdSupplier,
__ -> null);
__ -> null,
Sinks.empty());
this.requesterLeaseTracker = requesterLeaseTracker;
}

Expand Down
2 changes: 2 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/RSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ default double availability() {
@Override
default void dispose() {}

default void disposeGracefully() {}

@Override
default boolean isDisposed() {
return false;
Expand Down
15 changes: 14 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,16 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnGracefulShutdownSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnGracefulShutdownSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> requesterOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink =
Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
Expand All @@ -673,7 +679,12 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker,
requesterGracefulShutdownStartedSink,
requesterOnGracefulShutdownSink,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnGracefulShutdownSink.asMono(),
requesterOnGracefulShutdownSink.asMono()),
Mono.whenDelayError(
responderOnAllClosedSink.asMono(),
requesterOnAllClosedSink.asMono()));
Expand Down Expand Up @@ -725,7 +736,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
leases.sender)
: interceptors
::initResponderRequestInterceptor,
responderOnAllClosedSink);
responderOnGracefulShutdownSink,
responderOnAllClosedSink,
requesterGracefulShutdownStartedSink.asMono());

return wrappedRSocketRequester;
})
Expand Down
29 changes: 27 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.ConnectionCloseException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.ErrorFrameCodec;
Expand Down Expand Up @@ -67,6 +68,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {

@Nullable private final RequesterLeaseTracker requesterLeaseTracker;

private final Sinks.Empty<Void> onGracefulShutdownStartedSink;
private final Sinks.Empty<Void> onThisSideClosedSink;
private final Mono<Void> onAllClosed;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
Expand All @@ -83,7 +85,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
@Nullable KeepAliveHandler keepAliveHandler,
Function<RSocket, RequestInterceptor> requestInterceptorFunction,
@Nullable RequesterLeaseTracker requesterLeaseTracker,
Sinks.Empty<Void> onGracefulShutdownStartedSink,
Sinks.Empty<Void> onGracefulShutdownSink,
Sinks.Empty<Void> onThisSideClosedSink,
Mono<Void> onGracefulShutdownDone,
Mono<Void> onAllClosed) {
super(
mtu,
Expand All @@ -92,14 +97,17 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
payloadDecoder,
connection,
streamIdSupplier,
requestInterceptorFunction);
requestInterceptorFunction,
onGracefulShutdownSink);

this.requesterLeaseTracker = requesterLeaseTracker;
this.onGracefulShutdownStartedSink = onGracefulShutdownStartedSink;
this.onThisSideClosedSink = onThisSideClosedSink;
this.onAllClosed = onAllClosed;

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);
onGracefulShutdownDone.subscribe(null, null, connection::dispose);

connection.receive().subscribe(this::handleIncomingFrames, e -> {});

Expand Down Expand Up @@ -200,6 +208,17 @@ public void dispose() {
getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
}

@Override
public void disposeGracefully() {
getDuplexConnection()
.sendFrame(
0,
ErrorFrameCodec.encode(
getAllocator(), 0, new ConnectionCloseException("Graceful Shutdown")));
this.onGracefulShutdownStartedSink.tryEmitEmpty();
super.terminate();
}

@Override
public boolean isDisposed() {
return terminationError != null;
Expand Down Expand Up @@ -351,6 +370,12 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
}
if (terminationError == null) {
Throwable e = errorSupplier.get();

if (e instanceof ConnectionCloseException) {
this.onGracefulShutdownStartedSink.tryEmitEmpty();
super.terminate();
return;
}
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
terminate(e);
} else {
Expand Down Expand Up @@ -417,7 +442,7 @@ private void terminate(Throwable e) {
requesterLeaseTracker.dispose(e);
}

final Collection<FrameHandler> activeStreamsCopy;
final Collection<FrameHandler> activeStreamsCopy; // in case of graceful shut down is empty
synchronized (this) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
activeStreamsCopy = new ArrayList<>(activeStreams.values());
Expand Down
15 changes: 12 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
int maxFrameLength,
int maxInboundPayloadSize,
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
Sinks.Empty<Void> onThisSideClosedSink) {
Sinks.Empty<Void> onGracefulShutdownSink,
Sinks.Empty<Void> onThisSideClosedSink,
Mono<Void> onRequesterGracefulShutdownStarted) {
super(
mtu,
maxFrameLength,
maxInboundPayloadSize,
payloadDecoder,
connection,
null,
requestInterceptorFunction);
requestInterceptorFunction,
onGracefulShutdownSink);

this.requestHandler = requestHandler;

Expand All @@ -92,12 +95,18 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
.onClose()
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);

onRequesterGracefulShutdownStarted.subscribe(null, null, this::onGracefulShutdownStarted);

connection.receive().subscribe(this::handleFrame, e -> {});
}

private void onGracefulShutdownStarted() {
super.terminate();
requestHandler.disposeGracefully();
}

private void tryTerminateOnConnectionError(Throwable e) {
if (LOGGER.isDebugEnabled()) {

LOGGER.debug("Try terminate connection on responder side");
}
tryTerminate(() -> e);
Expand Down
12 changes: 11 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,11 @@ private Mono<Void> acceptSetup(
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnGracefulShutdownSink = Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnGracefulShutdownSink = Sinks.unsafe().empty();
final Sinks.Empty<Void> requesterOnAllClosedSink = Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink = Sinks.unsafe().empty();
final Sinks.Empty<Void> requesterGracefulShutdownStartedSink = Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
Expand All @@ -454,7 +457,12 @@ private Mono<Void> acceptSetup(
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker,
requesterGracefulShutdownStartedSink,
requesterOnGracefulShutdownSink,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnGracefulShutdownSink.asMono(),
requesterOnGracefulShutdownSink.asMono()),
Mono.whenDelayError(
responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono()));

Expand Down Expand Up @@ -489,7 +497,9 @@ private Mono<Void> acceptSetup(
interceptors.initResponderRequestInterceptor(
rSocket, (RequestInterceptor) leases.sender)
: interceptors::initResponderRequestInterceptor,
responderOnAllClosedSink);
responderOnGracefulShutdownSink,
responderOnAllClosedSink,
requesterGracefulShutdownStartedSink.asMono());
})
.doFinally(signalType -> setupPayload.release())
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.exceptions.CanceledException;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.util.Objects;
import java.util.function.Function;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

class RequesterResponderSupport {
Expand All @@ -19,19 +21,24 @@ class RequesterResponderSupport {
private final PayloadDecoder payloadDecoder;
private final ByteBufAllocator allocator;
private final DuplexConnection connection;
private final Sinks.Empty<Void> onGracefulShutdownSink;
@Nullable private final RequestInterceptor requestInterceptor;

@Nullable final StreamIdSupplier streamIdSupplier;
final IntObjectMap<FrameHandler> activeStreams;

boolean terminating;
boolean terminated;

public RequesterResponderSupport(
int mtu,
int maxFrameLength,
int maxInboundPayloadSize,
PayloadDecoder payloadDecoder,
DuplexConnection connection,
@Nullable StreamIdSupplier streamIdSupplier,
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction,
Sinks.Empty<Void> onGracefulShutdownSink) {

this.activeStreams = new IntObjectHashMap<>();
this.mtu = mtu;
Expand All @@ -41,6 +48,7 @@ public RequesterResponderSupport(
this.allocator = connection.alloc();
this.streamIdSupplier = streamIdSupplier;
this.connection = connection;
this.onGracefulShutdownSink = onGracefulShutdownSink;
this.requestInterceptor = requestInterceptorFunction.apply((RSocket) this);
}

Expand Down Expand Up @@ -88,6 +96,9 @@ public int getNextStreamId() {
final StreamIdSupplier streamIdSupplier = this.streamIdSupplier;
if (streamIdSupplier != null) {
synchronized (this) {
if (this.terminating) {
throw new CanceledException("Disposed");
}
return streamIdSupplier.nextStreamId(this.activeStreams);
}
} else {
Expand All @@ -107,6 +118,10 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
if (streamIdSupplier != null) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
synchronized (this) {
if (this.terminating) {
throw new CanceledException("Disposed");
}

final int streamId = streamIdSupplier.nextStreamId(activeStreams);

activeStreams.put(streamId, frameHandler);
Expand All @@ -119,6 +134,11 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
}

public synchronized boolean add(int streamId, FrameHandler frameHandler) {
if (this.terminating) {
throw new CanceledException(
"This RSocket is either disposed or disposing, and no longer accepting new requests");
}

final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
// copy of Map.putIfAbsent(key, value) without `streamId` boxing
final FrameHandler previousHandler = activeStreams.get(streamId);
Expand Down Expand Up @@ -148,14 +168,45 @@ public synchronized FrameHandler get(int streamId) {
* @return {@code true} if there is {@link FrameHandler} for the given {@code streamId} and the
* instance equals to the passed one
*/
public synchronized boolean remove(int streamId, FrameHandler frameHandler) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
// copy of Map.remove(key, value) without `streamId` boxing
final FrameHandler curValue = activeStreams.get(streamId);
if (!Objects.equals(curValue, frameHandler)) {
return false;
public boolean remove(int streamId, FrameHandler frameHandler) {
final boolean terminated;
synchronized (this) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
// copy of Map.remove(key, value) without `streamId` boxing
final FrameHandler curValue = activeStreams.get(streamId);
if (!Objects.equals(curValue, frameHandler)) {
return false;
}
activeStreams.remove(streamId);
if (this.terminating && activeStreams.size() == 0) {
terminated = true;
this.terminated = true;
} else {
terminated = false;
}
}

if (terminated) {
onGracefulShutdownSink.tryEmitEmpty();
}
activeStreams.remove(streamId);
return true;
}

public void terminate() {
final boolean terminated;
synchronized (this) {
this.terminating = true;

if (activeStreams.size() == 0) {
terminated = true;
this.terminated = true;
} else {
terminated = false;
}
}

if (terminated) {
onGracefulShutdownSink.tryEmitEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
protected Runnable delayer;
protected Sinks.One<RSocket> producer;

protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
protected Sinks.Empty<Void> thisGracefulShutdownSink;
protected Sinks.Empty<Void> thisClosedSink;

@Override
Expand All @@ -740,6 +742,8 @@ protected void doInit() {

@Override
protected RSocket newRSocket() {
this.onGracefulShutdownStartedSink = Sinks.empty();
this.thisGracefulShutdownSink = Sinks.empty();
this.thisClosedSink = Sinks.empty();
return new RSocketRequester(
connection,
Expand All @@ -753,7 +757,10 @@ protected RSocket newRSocket() {
null,
__ -> null,
null,
onGracefulShutdownStartedSink,
thisGracefulShutdownSink,
thisClosedSink,
thisGracefulShutdownSink.asMono(),
thisClosedSink.asMono());
}
}
Expand Down
Loading

0 comments on commit 3bf4de8

Please sign in to comment.