Skip to content

DRAFT - HttpRequest.Builder customizer #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Customize {@link HttpRequest.Builder} before sending out SSE or Streamable HTTP
* transport.
* <p>
* When used in a non-blocking context, implementations MUST be non-blocking.
*/
public interface AsyncHttpRequestCustomizer {

Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
@Nullable String body);

AsyncHttpRequestCustomizer NOOP = new Noop();

/**
* Wrap a sync implementation in an async wrapper.
* <p>
* Do NOT use in a non-blocking context.
*/
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
return (builder, method, uri, body) -> Mono.defer(() -> {
customizer.customize(builder, method, uri, body);
return Mono.just(builder);
});
}

class Noop implements AsyncHttpRequestCustomizer {

@Override
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
String body) {
return Mono.just(builder);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class HttpClientSseClientTransport implements McpClientTransport {
*/
protected final Sinks.One<String> messageEndpointSink = Sinks.one();

// TODO
private final AsyncHttpRequestCustomizer httpRequestCustomizer;

/**
* Creates a new transport instance with default HTTP client and object mapper.
* @param baseUri the base URI of the MCP server
Expand Down Expand Up @@ -172,18 +175,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
* @param objectMapper the object mapper for JSON serialization/deserialization
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
@Deprecated(forRemoval = true)
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper) {
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
}

/**
* Creates a new transport instance with custom HTTP client builder, object mapper,
* and headers.
* @param httpClient the HTTP client to use
* @param requestBuilder the HTTP request builder to use
* @param baseUri the base URI of the MCP server
* @param sseEndpoint the SSE endpoint path
* @param objectMapper the object mapper for JSON serialization/deserialization
* @param httpRequestCustomizer customizer for the requestBuilder before sending
* requests
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
*/
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
Assert.notNull(httpClient, "httpClient must not be null");
Assert.notNull(requestBuilder, "requestBuilder must not be null");
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
this.baseUri = URI.create(baseUri);
this.sseEndpoint = sseEndpoint;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.httpRequestCustomizer = httpRequestCustomizer;
}

/**
Expand Down Expand Up @@ -213,6 +236,8 @@ public static class Builder {
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.header("Content-Type", "application/json");

private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;

/**
* Creates a new builder instance.
*/
Expand Down Expand Up @@ -310,96 +335,111 @@ public Builder objectMapper(ObjectMapper objectMapper) {
return this;
}

/**
* In reactive, DONT USE THIS. Use AsyncHttpRequestCustomizer.
*/
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
return this;
}

public Builder httpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
return this;
}

/**
* Builds a new {@link HttpClientSseClientTransport} instance.
* @return a new transport instance
*/
public HttpClientSseClientTransport build() {
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
objectMapper);
objectMapper, httpRequestCustomizer);
}

}

@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);

return Mono.create(sink -> {

HttpRequest request = requestBuilder.copy()
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
return Mono
.just(requestBuilder.copy()
.uri(uri)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.GET()
.build();

Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.exceptionallyCompose(e -> {
sseSink.error(e);
return CompletableFuture.failedFuture(e);
}))
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
.flatMap(responseEvent -> {
if (isClosing) {
return Mono.empty();
}

int statusCode = responseEvent.responseInfo().statusCode();
.GET())
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null)))
.map(HttpRequest.Builder::build)
.flatMap(request -> Mono.create(sink -> {
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
.exceptionallyCompose(e -> {
sseSink.error(e);
return CompletableFuture.failedFuture(e);
}))
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
.flatMap(responseEvent -> {
if (isClosing) {
return Mono.empty();
}

if (statusCode >= 200 && statusCode < 300) {
try {
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
String messageEndpointUri = responseEvent.sseEvent().data();
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
int statusCode = responseEvent.responseInfo().statusCode();

if (statusCode >= 200 && statusCode < 300) {
try {
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
String messageEndpointUri = responseEvent.sseEvent().data();
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
sink.success();
return Flux.empty(); // No further processing
// needed
}
else {
sink.error(new McpError("Failed to handle SSE endpoint event"));
}
}
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseEvent.sseEvent().data());
sink.success();
return Flux.empty(); // No further processing needed
return Flux.just(message);
}
else {
sink.error(new McpError("Failed to handle SSE endpoint event"));
logger.error("Received unrecognized SSE event type: {}",
responseEvent.sseEvent().event());
sink.error(new McpError("Received unrecognized SSE event type: "
+ responseEvent.sseEvent().event()));
}
}
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseEvent.sseEvent().data());
sink.success();
return Flux.just(message);
}
else {
logger.error("Received unrecognized SSE event type: {}",
responseEvent.sseEvent().event());
sink.error(new McpError(
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
catch (IOException e) {
logger.error("Error processing SSE event", e);
sink.error(new McpError("Error processing SSE event"));
}
}
catch (IOException e) {
logger.error("Error processing SSE event", e);
sink.error(new McpError("Error processing SSE event"));
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Failed to send message: " + responseEvent));

})
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
.onErrorComplete(t -> {
if (!isClosing) {
logger.warn("SSE stream observed an error", t);
sink.error(t);
}
}
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Failed to send message: " + responseEvent));

})
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
.onErrorComplete(t -> {
if (!isClosing) {
logger.warn("SSE stream observed an error", t);
sink.error(t);
}
return true;
})
.doFinally(s -> {
Disposable ref = this.sseSubscription.getAndSet(null);
if (ref != null && !ref.isDisposed()) {
ref.dispose();
}
})
.contextWrite(sink.contextView())
.subscribe();
return true;
})
.doFinally(s -> {
Disposable ref = this.sseSubscription.getAndSet(null);
if (ref != null && !ref.isDisposed()) {
ref.dispose();
}
})
.contextWrite(sink.contextView())
.subscribe();

this.sseSubscription.set(connection);
});
this.sseSubscription.set(connection);
}));
}

/**
Expand Down Expand Up @@ -455,13 +495,11 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {

private Mono<HttpResponse<Void>> sendHttpPost(final String endpoint, final String body) {
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
final HttpRequest request = this.requestBuilder.copy()
.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();

// TODO: why discard the body?
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
return Mono.just(this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body)))
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body)))
.map(HttpRequest.Builder::build)
// TODO: why discard the body?
.flatMap(request -> Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())));
}

/**
Expand Down
Loading
Loading