Skip to content

Commit

Permalink
ensures connection is closed on keepalive timeout (#1118)
Browse files Browse the repository at this point in the history
* ensures connection is close on keepalive timeout

Signed-off-by: Oleh Dokuka <[email protected]>

* fix format

Signed-off-by: Oleh Dokuka <[email protected]>

* improve KeepaliveTest

Signed-off-by: Oleh Dokuka <[email protected]>

* fix format and failing test

Signed-off-by: Oleh Dokuka <[email protected]>

* adds reference to the original GH issue

Signed-off-by: Oleh Dokuka <[email protected]>

* fixes google format

Signed-off-by: Oleh Dokuka <[email protected]>

---------

Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka authored Jan 31, 2025
1 parent 9bc30c4 commit ccd67ba
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
() ->
new ConnectionErrorException(
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
getDuplexConnection().dispose();
}

private void tryShutdown(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package io.rsocket.integration;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
import reactor.test.StepVerifier;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

/**
* Test case that reproduces the following <a
* href="https://github.com/rsocket/rsocket-java/issues/1099">GitHub Issue</a>
*/
public class KeepaliveTest {

private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class);
private static final int PORT = 23200;

private CloseableChannel server;

@BeforeEach
void setUp() {
server = createServer().block();
}

@AfterEach
void tearDown() {
server.dispose();
server.onClose().block();
}

@Test
void keepAliveTest() {
RSocketClient rsocketClient = createClient();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(
i ->
rsocketClient
.requestResponse(Mono.just(DefaultPayload.create("")))
.doOnNext(
__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete()))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

@Test
void keepAliveTestLazy() {
Mono<RSocket> rsocketMono = createClientLazy();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(
i ->
rsocketMono.flatMap(
rsocket ->
rsocket
.requestResponse(DefaultPayload.create(""))
.doOnNext(
__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete())))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

private static Mono<CloseableChannel> createServer() {
LOG.info("Starting server at port {}", PORT);

TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT);

return RSocketServer.create(
(setupPayload, rSocket) -> {
rSocket
.onClose()
.doFirst(() -> LOG.info("Connected on server side."))
.doOnTerminate(() -> LOG.info("Connection closed on server side."))
.subscribe();

return Mono.just(new MyServerRsocket());
})
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(tcpServer))
.doOnNext(closeableChannel -> LOG.info("RSocket server started."));
}

private static RSocketClient createClient() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec =
reason ->
Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

Mono<RSocket> rsocketMono =
RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));

RSocketClient client = RSocketClient.from(rsocketMono);

client
.source()
.doOnNext(r -> LOG.info("Got RSocket"))
.flatMap(RSocket::onClose)
.doOnError(err -> LOG.error("Error during onClose.", err))
.retryWhen(reconnectSpec.apply("client-close"))
.doFirst(() -> LOG.info("Connected on client side."))
.doOnTerminate(() -> LOG.info("Connection closed on client side."))
.repeat()
.subscribe();

return client;
}

private static Mono<RSocket> createClientLazy() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec =
reason ->
Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

return RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));
}

public static class MyServerRsocket implements RSocket {

@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just("Pong").map(DefaultPayload::create);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<logger name="io.rsocket.FrameLogger" level="INFO"/>
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
<logger name="io.rsocket.transport.netty" level="INFO"/>
<logger name="io.rsocket.FrameLogger" level="INFO"/>
<logger name="io.rsocket.core.RSocketRequester" level="DEBUG"/>
<logger name="io.rsocket.core.RSocketResponder" level="DEBUG"/>
<logger name="io.rsocket.test.TransportTest" level="DEBUG"/>
Expand Down

0 comments on commit ccd67ba

Please sign in to comment.