Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Jan 31, 2025
1 parent 0b3a9a5 commit 6ff8f71
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,9 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu
private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
tryTerminate(
() -> {
ConnectionErrorException exception = new ConnectionErrorException(String.format(
"No keep-alive acks for %d ms",
keepAlive.getTimeout()
.toMillis()));

ConnectionErrorException exception =
new ConnectionErrorException(
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()));

getDuplexConnection().dispose();
return exception;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package io.rsocket.integration;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketClient;
Expand All @@ -14,6 +10,9 @@
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,152 +26,163 @@

public class KeepaliveTest {

private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class);
private static final int PORT = 23200;

@Test
void keepAliveTest() {
createServer().block();
RSocketClient rsocketClient = createClient();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(i ->
rsocketClient.requestResponse(Mono.just(DefaultPayload.create("")))
.doOnNext(__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete()
))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

@Test
void keepAliveTestLazy() {
createServer().block();
Mono<RSocket> rsocketMono = createClientLazy();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(i ->
rsocketMono.flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create(""))
.doOnNext(__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete()
)
))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

private static Mono<CloseableChannel> createServer() {
LOG.info("Starting server at port {}", PORT);

TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT);

return RSocketServer.create((setupPayload, rSocket) -> {
rSocket.onClose()
.doFirst(() -> LOG.info("Connected on server side."))
.doOnTerminate(() -> LOG.info("Connection closed on server side."))
.subscribe();

return Mono.just(new MyServerRsocket());
})
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(tcpServer))
.doOnNext(closeableChannel -> LOG.info("RSocket server started."));
}

private static RSocketClient createClient() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

Mono<RSocket> rsocketMono = RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));

RSocketClient client = RSocketClient.from(rsocketMono);

client
.source()
.doOnNext(r -> LOG.info("Got RSocket"))
.flatMap(RSocket::onClose)
.doOnError(err -> LOG.error("Error during onClose.", err))
.retryWhen(reconnectSpec.apply("client-close"))
.doFirst(() -> LOG.info("Connected on client side."))
.doOnTerminate(() -> LOG.info("Connection closed on client side."))
.repeat()
.subscribe();

return client;
}


private static Mono<RSocket> createClientLazy() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec = reason -> Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

return RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));

// RSocketClient client = RSocketClient.from(rsocketMono);

// client
// .source()
// .doOnNext(r -> LOG.info("Got RSocket"))
// .flatMap(RSocket::onClose)
// .doOnError(err -> LOG.error("Error during onClose.", err))
// .retryWhen(reconnectSpec.apply("client-close"))
// .doFirst(() -> LOG.info("Connected on client side."))
// .doOnTerminate(() -> LOG.info("Connection closed on client side."))
// .repeat()
// .subscribe();

// return client;
}

public static class MyServerRsocket implements RSocket {

@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just("Pong").map(DefaultPayload::create);
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(KeepaliveTest.class);
private static final int PORT = 23200;

@Test
void keepAliveTest() {
createServer().block();
RSocketClient rsocketClient = createClient();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(
i ->
rsocketClient
.requestResponse(Mono.just(DefaultPayload.create("")))
.doOnNext(
__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete()))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

@Test
void keepAliveTestLazy() {
createServer().block();
Mono<RSocket> rsocketMono = createClientLazy();

int expectedCount = 4;
AtomicBoolean sleepOnce = new AtomicBoolean(true);
StepVerifier.create(
Flux.range(0, expectedCount)
.delayElements(Duration.ofMillis(2000))
.concatMap(
i ->
rsocketMono.flatMap(
rsocket ->
rsocket
.requestResponse(DefaultPayload.create(""))
.doOnNext(
__ -> {
if (sleepOnce.getAndSet(false)) {
try {
LOG.info("Sleeping...");
Thread.sleep(1_000);
LOG.info("Waking up.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
})
.log("id " + i)
.onErrorComplete())))
.expectSubscription()
.expectNextCount(expectedCount)
.verifyComplete();
}

private static Mono<CloseableChannel> createServer() {
LOG.info("Starting server at port {}", PORT);

TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT);

return RSocketServer.create(
(setupPayload, rSocket) -> {
rSocket
.onClose()
.doFirst(() -> LOG.info("Connected on server side."))
.doOnTerminate(() -> LOG.info("Connection closed on server side."))
.subscribe();

return Mono.just(new MyServerRsocket());
})
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(tcpServer))
.doOnNext(closeableChannel -> LOG.info("RSocket server started."));
}

private static RSocketClient createClient() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec =
reason ->
Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

Mono<RSocket> rsocketMono =
RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));

RSocketClient client = RSocketClient.from(rsocketMono);

client
.source()
.doOnNext(r -> LOG.info("Got RSocket"))
.flatMap(RSocket::onClose)
.doOnError(err -> LOG.error("Error during onClose.", err))
.retryWhen(reconnectSpec.apply("client-close"))
.doFirst(() -> LOG.info("Connected on client side."))
.doOnTerminate(() -> LOG.info("Connection closed on client side."))
.repeat()
.subscribe();

return client;
}

private static Mono<RSocket> createClientLazy() {
LOG.info("Connecting....");

Function<String, RetryBackoffSpec> reconnectSpec =
reason ->
Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(10L))
.doBeforeRetry(retrySignal -> LOG.info("Reconnecting. Reason: {}", reason));

return RSocketConnector.create()
.fragment(16384)
.reconnect(reconnectSpec.apply("connector-close"))
.keepAlive(Duration.ofMillis(100L), Duration.ofMillis(900L))
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)));

// RSocketClient client = RSocketClient.from(rsocketMono);

// client
// .source()
// .doOnNext(r -> LOG.info("Got RSocket"))
// .flatMap(RSocket::onClose)
// .doOnError(err -> LOG.error("Error during onClose.", err))
// .retryWhen(reconnectSpec.apply("client-close"))
// .doFirst(() -> LOG.info("Connected on client side."))
// .doOnTerminate(() -> LOG.info("Connection closed on client side."))
// .repeat()
// .subscribe();

// return client;
}

public static class MyServerRsocket implements RSocket {

@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.just("Pong").map(DefaultPayload::create);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<logger name="io.rsocket.FrameLogger" level="INFO"/>
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
<logger name="io.rsocket.transport.netty" level="INFO"/>
<logger name="io.rsocket.FrameLogger" level="INFO"/>
<logger name="io.rsocket.core.RSocketRequester" level="DEBUG"/>
<logger name="io.rsocket.core.RSocketResponder" level="DEBUG"/>
<logger name="io.rsocket.test.TransportTest" level="DEBUG"/>
Expand Down

0 comments on commit 6ff8f71

Please sign in to comment.