Skip to content

Commit cec77b6

Browse files
committed
Support request body
1 parent 69f3a76 commit cec77b6

File tree

4 files changed

+112
-25
lines changed

4 files changed

+112
-25
lines changed

.gitignore

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
HELP.md
2+
target/
3+
!.mvn/wrapper/maven-wrapper.jar
4+
!**/src/main/**/target/
5+
!**/src/test/**/target/
6+
7+
### STS ###
8+
.apt_generated
9+
.classpath
10+
.factorypath
11+
.project
12+
.settings
13+
.springBeans
14+
.sts4-cache
15+
16+
### IntelliJ IDEA ###
17+
.idea
18+
*.iws
19+
*.iml
20+
*.ipr
21+
22+
### NetBeans ###
23+
/nbproject/private/
24+
/nbbuild/
25+
/dist/
26+
/nbdist/
27+
/.nb-gradle/
28+
build/
29+
!**/src/main/**/build/
30+
!**/src/test/**/build/
31+
32+
### VS Code ###
33+
.vscode/

tsunagu-client/src/main/java/am/ik/tsunagu/TsunaguConnector.java

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package am.ik.tsunagu;
22

3+
import java.io.IOException;
34
import java.io.UncheckedIOException;
45
import java.net.URI;
56
import java.time.Duration;
67
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.function.Function;
79

810
import com.fasterxml.jackson.core.JsonProcessingException;
911
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -13,6 +15,7 @@
1315
import io.rsocket.Payload;
1416
import io.rsocket.RSocket;
1517
import io.rsocket.util.DefaultPayload;
18+
import org.reactivestreams.Publisher;
1619
import org.slf4j.Logger;
1720
import org.slf4j.LoggerFactory;
1821
import reactor.core.publisher.Flux;
@@ -24,6 +27,7 @@
2427
import org.springframework.messaging.rsocket.RSocketRequester;
2528
import org.springframework.messaging.rsocket.RSocketRequester.Builder;
2629
import org.springframework.stereotype.Component;
30+
import org.springframework.web.reactive.function.client.ClientResponse;
2731
import org.springframework.web.reactive.function.client.WebClient;
2832
import org.springframework.web.util.UriComponentsBuilder;
2933

@@ -63,25 +67,52 @@ public Flux<Payload> requestStream(Payload payload) {
6367
return this.webClient.method(httpRequestMetadata.getMethod())
6468
.uri(uri)
6569
.headers(httpHeaders -> httpHeaders.addAll(httpRequestMetadata.getHeaders()))
66-
.exchangeToFlux(response -> {
67-
try {
68-
final HttpResponseMetadata httpResponseMetadata = new HttpResponseMetadata(response.statusCode(), response.headers().asHttpHeaders());
69-
final byte[] httpResponseMetadataBytes = this.objectMapper.writeValueAsBytes(httpResponseMetadata);
70-
final AtomicBoolean headerSent = new AtomicBoolean(false);
71-
return response.bodyToFlux(ByteBuf.class)
72-
.map(body -> DefaultPayload.create(body, headerSent.compareAndSet(false, true) ? Unpooled.copiedBuffer(httpResponseMetadataBytes) : Unpooled.EMPTY_BUFFER))
73-
.switchIfEmpty(Flux.just(DefaultPayload.create(new byte[] {}, httpResponseMetadataBytes)));
74-
}
75-
catch (JsonProcessingException e) {
76-
throw new UncheckedIOException(e);
77-
}
78-
});
70+
.exchangeToFlux(this.handleResponse());
7971
}
80-
catch (Throwable e) {
72+
catch (IOException e) {
8173
return Flux.error(e);
8274
}
8375
}
8476

77+
@Override
78+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
79+
return Flux.from(payloads)
80+
.switchOnFirst((signal, flux) -> {
81+
try {
82+
final byte[] httpRequestMetadataBytes = ByteBufUtil.getBytes(signal.get().metadata());
83+
final HttpRequestMetadata httpRequestMetadata = this.objectMapper.readValue(httpRequestMetadataBytes, HttpRequestMetadata.class);
84+
final URI uri = UriComponentsBuilder.fromUri(httpRequestMetadata.getUri())
85+
.uri(this.props.getUpstream())
86+
.build()
87+
.toUri();
88+
return this.webClient.method(httpRequestMetadata.getMethod())
89+
.uri(uri)
90+
.body(flux.map(Payload::data), ByteBuf.class)
91+
.headers(httpHeaders -> httpHeaders.addAll(httpRequestMetadata.getHeaders()))
92+
.exchangeToFlux(this.handleResponse());
93+
}
94+
catch (IOException e) {
95+
return Flux.error(e);
96+
}
97+
});
98+
}
99+
100+
Function<ClientResponse, Flux<Payload>> handleResponse() {
101+
return response -> {
102+
try {
103+
final HttpResponseMetadata httpResponseMetadata = new HttpResponseMetadata(response.statusCode(), response.headers().asHttpHeaders());
104+
final byte[] httpResponseMetadataBytes = this.objectMapper.writeValueAsBytes(httpResponseMetadata);
105+
final AtomicBoolean headerSent = new AtomicBoolean(false);
106+
return response.bodyToFlux(ByteBuf.class)
107+
.map(body -> DefaultPayload.create(body, headerSent.compareAndSet(false, true) ? Unpooled.copiedBuffer(httpResponseMetadataBytes) : Unpooled.EMPTY_BUFFER))
108+
.switchIfEmpty(Mono.fromCallable(() -> DefaultPayload.create(new byte[] {}, httpResponseMetadataBytes)));
109+
}
110+
catch (JsonProcessingException e) {
111+
throw new UncheckedIOException(e);
112+
}
113+
};
114+
}
115+
85116
@Override
86117
public void run(String... args) throws Exception {
87118
this.connect();

tsunagu-server/src/main/java/am/ik/tsunagu/HttpRequestMetadata.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public HttpHeaders getHeaders() {
3535
return headers;
3636
}
3737

38+
public boolean hasBody() {
39+
return this.method == HttpMethod.POST || this.method == HttpMethod.PUT || this.method == HttpMethod.PATCH;
40+
}
41+
3842
@Override
3943
public String toString() {
4044
return "{method=" + method +

tsunagu-server/src/main/java/am/ik/tsunagu/TsunaguController.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,25 @@
55
import java.util.Collections;
66
import java.util.Set;
77
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.function.BiFunction;
89

910
import com.fasterxml.jackson.databind.ObjectMapper;
1011
import io.netty.buffer.ByteBufUtil;
12+
import io.netty.buffer.PooledByteBufAllocator;
13+
import io.netty.buffer.Unpooled;
1114
import io.rsocket.Payload;
15+
import io.rsocket.core.RSocketClient;
1216
import io.rsocket.util.DefaultPayload;
17+
import org.reactivestreams.Publisher;
1318
import org.slf4j.Logger;
1419
import org.slf4j.LoggerFactory;
1520
import reactor.core.publisher.Flux;
1621
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.Signal;
1723

1824
import org.springframework.core.io.buffer.DataBuffer;
19-
import org.springframework.core.io.buffer.DataBufferFactory;
2025
import org.springframework.core.io.buffer.DataBufferUtils;
21-
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
26+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
2227
import org.springframework.http.HttpStatus;
2328
import org.springframework.http.ResponseEntity;
2429
import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -37,22 +42,36 @@ public class TsunaguController {
3742

3843
private final ObjectMapper objectMapper;
3944

40-
private final DataBufferFactory dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
45+
private final NettyDataBufferFactory dataBufferFactory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
4146

4247
public TsunaguController(ObjectMapper objectMapper) {
4348
this.objectMapper = objectMapper;
4449
}
4550

4651
@RequestMapping(path = "**")
47-
public Mono<ResponseEntity<?>> proxy(ServerHttpRequest request) {
48-
final RSocketRequester requester = this.getRequester();
52+
public Mono<ResponseEntity<?>> proxy(ServerHttpRequest request) throws Exception {
53+
final RSocketClient rsocketClient = this.getRequester().rsocketClient();
4954
final HttpRequestMetadata httpRequestMetadata = new HttpRequestMetadata(request.getMethod(), request.getURI(), request.getHeaders());
50-
final Mono<Payload> requestPayload = Mono.fromCallable(() -> this.objectMapper.writeValueAsBytes(httpRequestMetadata))
51-
.map(metadata -> DefaultPayload.create(new byte[] {}, metadata));
52-
final Flux<Payload> responseStream = requester.rsocketClient().requestStream(requestPayload);
53-
return responseStream.<ResponseEntity<?>>switchOnFirst((signal, flux) -> {
55+
final byte[] metadata = this.objectMapper.writeValueAsBytes(httpRequestMetadata);
56+
final Flux<Payload> responseStream;
57+
if (httpRequestMetadata.hasBody()) {
58+
final Flux<Payload> requestPayload = request.getBody()
59+
.map(NettyDataBufferFactory::toByteBuf)
60+
.map(data -> DefaultPayload.create(data, Unpooled.copiedBuffer(metadata)))
61+
.switchIfEmpty(Mono.fromCallable(() -> DefaultPayload.create(new byte[] {}, metadata)));
62+
responseStream = rsocketClient.requestChannel(requestPayload);
63+
}
64+
else {
65+
final Mono<Payload> requestPayload = Mono.just(DefaultPayload.create(new byte[] {}, metadata));
66+
responseStream = rsocketClient.requestStream(requestPayload);
67+
}
68+
return responseStream.switchOnFirst(this.handleResponse(httpRequestMetadata)).single();
69+
}
70+
71+
BiFunction<Signal<? extends Payload>, Flux<Payload>, Publisher<? extends ResponseEntity<?>>> handleResponse(HttpRequestMetadata httpRequestMetadata) {
72+
return (signal, flux) -> {
5473
final byte[] httpResponseMetadataBytes = ByteBufUtil.getBytes(signal.get().metadata());
55-
final Mono<DataBuffer> bodyMono = DataBufferUtils.join(flux.map(payload -> dataBufferFactory.wrap(payload.getData())));
74+
final Mono<DataBuffer> bodyMono = DataBufferUtils.join(flux.map(payload -> dataBufferFactory.wrap(payload.data())));
5675
try {
5776
final HttpResponseMetadata httpResponseMetadata = this.objectMapper.readValue(httpResponseMetadataBytes, HttpResponseMetadata.class);
5877
log.info("\nrequest:\t{}\nresponse:\t{}", httpRequestMetadata, httpResponseMetadata);
@@ -63,7 +82,7 @@ public Mono<ResponseEntity<?>> proxy(ServerHttpRequest request) {
6382
catch (IOException e) {
6483
throw new UncheckedIOException(e);
6584
}
66-
}).single();
85+
};
6786
}
6887

6988
private RSocketRequester getRequester() {

0 commit comments

Comments
 (0)