-
Notifications
You must be signed in to change notification settings - Fork 546
Adding StreamableHttpServerTransportProvider class along with unit & integration tests #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Adding StreamableHttpServerTransportProvider class along with unit & integration tests #290
Conversation
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey 👋 Thanks for a comprehensive PR! I did my first round focusing on the main themes. Happy to offer guidance to cover the essential aspects (simple/stateful servers, multiple streams per session, lifecycle) if you'd like to push this forward.
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
Thank you very much for all of the input @chemicL! I will begin making changes accordingly this afternoon. |
Today I'm targeting origin header validation and moving the dedicated GET stream to the StreamableHttpSession class, then adding an integ test for GET on /mcp to start the listening stream. |
Hi @ZachGerman I tried using your StreamableHttpServerTransportProvider.java file + java MCP SDK 0.10.0. My backend server is Jetty 12. The request from MCP Interceptor hangs in the below call. return streamSession.handle(message).then(Mono.just(responseType)).onErrorReturn(ResponseType.IMMEDIATE); This is the actuall line which gets blocked Can you please check this? |
@ZachGerman what do you mean by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few more comments.
if (sessionId == null) { | ||
response.setContentType(APPLICATION_JSON); | ||
response.setStatus(HttpServletResponse.SC_BAD_REQUEST); | ||
response.getWriter().write(createErrorJson("Session ID missing in request header")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
modelcontextprotocol/modelcontextprotocol#282 please check this issue. I'm leaning towards separating the JSON-RPC lifecycle from the lower level transport lifecycle. In that world, GET and POST and session concepts are not at the JSON-RPC layer, so the lifecycle does not apply. In fact, the previous SSE transport did begin with a HTTP GET request before the initialization request could have been sent. For the stateless servers it should definitely be improved in the spec to mention that initialization is not required before other requests.
[EDIT]
For the record, the above was wishful thinking. I agree we are only able to generate the session ID upon POST of the initialize request. The spec is in fact melding the json-rpc layer with http, so the session ID gets generated only when processing a initialization request:
A server using the Streamable HTTP transport MAY assign a session ID at initialization time, by including it in an
Mcp-Session-Id
header on the HTTP response containing theInitializeResult
// Subscribe to the SSE stream and write events to the response | ||
sseStream.getEventFlux().doOnNext(event -> { | ||
try { | ||
if (event.id() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, but do we emit events with no ID ? Reading the code I don't see a situation like this. My understanding of the mentioned spec is that if the ID is missing then the client would not use this id for Last-Event-ID tracking, but we are on the server side and we always generate an ID.
...ain/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java
Outdated
Show resolved
Hide resolved
@sivankri: Changing the related logic after my meeting with Dariusz this morning, so (hopefully) your issue goes away with the new response type mapping. |
merge commit 6c4830b is bad |
Oops! Thanks for the heads up! Fixed! |
6c4830b
to
7817da6
Compare
@sivankri the new logic added |
...ain/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java
Outdated
Show resolved
Hide resolved
mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncStreamableHttpServer.java
Outdated
Show resolved
Hide resolved
mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java
Outdated
Show resolved
Hide resolved
mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncStreamableHttpServer.java
Outdated
Show resolved
Hide resolved
mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java
Outdated
Show resolved
Hide resolved
@ZachGerman I tried running my mcp remote server against the mcp sdk generated from your branch. When i used the MCP inspector tool by setting query parameter transport=streamable-http, i could not get the connection established. Can you please help me with this? |
@sivankri I'm unfamiliar with the MCP inspector tool, but I imagine it's related to the fact that this JDK has |
mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java
Outdated
Show resolved
Hide resolved
… with Last-Event-Id header
77333d1
to
244208d
Compare
244208d
to
25ff3dc
Compare
Once I realized that I needed to identify the transports in the exchange's session from the tool handlers, I realized I need to either pass the entire JSONRPCRequest to the handler (instead of the request.params()), or add a third request.id() parameter to tool handlers. Either way, it brought more attention to the fact that we were using Tomorrow I will be making the JSONRPCRequest Edit: Ended up going with a per-request exchange instance instead of passing transport ID to tool call handlers. This allows tool implementers to not worry about transport routing with their exchange method calls. |
…age to their related transport streams
… Added extensive integration testing.
Removed streamTools in favor of upgrading transport instances to SSE based on McpAsyncServerExchange method usage. |
/** Sessions map, keyed by Session ID */ | ||
private static final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be static in order to allow having multiple independent MCP Server instances on the same Java Server. For example if we want to support having an MCP on paths /mcp/v1/
and /mcp/v2/
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for iterating over the Streamable HTTP server-side. I left a few comments. For now, please don't push more updates. Based on our exploratory work I am working on a parallel branch to have another transport type. I will focus on the WebFlux server and will let you know once I have all the architectural changes in so you can incorporate them into your PR and we can then make further progress.
if (response.statusCode().is2xxSuccessful() | ||
&& message instanceof JSONRPCNotification notification) { | ||
if (notification.method().equals("notifications/initialized")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not required to wait until the notification is sent out. If you see something in the spec, please let me know, but what I can see is this:
A server using the Streamable HTTP transport MAY assign a session ID at initialization time, by including it in an
Mcp-Session-Id
header on the HTTP response containing theInitializeResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After we split the server session class, should we switch them all to mark the session as initialized once they send the InitializeResult
response? (They currently only mark them as initialized upon receiving the initialized
notification) Without that change or making the client wait to get the 200 from its initialized
notification, we will have clients trying to establish streams prior to initialization being complete.
*/ | ||
@JsonSerialize(using = McpId.Serializer.class) | ||
@JsonDeserialize(using = McpId.Deserializer.class) | ||
public static final class McpId { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand this change is not strictly required for the Streamable HTTP Server implementation to work, so I'd prefer this to not be a part of this already big PR.
* This is for tool writers to use if they want to send their tool response over an | ||
* SSE stream without using any other McpAsyncServerExchange methods | ||
*/ | ||
public void upgradeTransport() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this is not a correct way to go about it. I'd suggest that if in the future someone asks for dynamic upgrade we should do it differently by providing a hook to inspect the request first, instruct the consuming session and transport that it will not only be a response but a bit more, and only then feed an exchange that has the SSE stream producer to feed with user interactions, but not in a stateful way like this.
For now I suggest we keep it simple as other SDKs:
- notifications and responses -> handle as 201 ACCEPTED
- requests
- sessionless -> application/json response expected
- with sessions -> always upgrade to SSE
@@ -92,14 +121,17 @@ public McpSchema.Implementation getClientInfo() { | |||
* Specification</a> | |||
*/ | |||
public Mono<McpSchema.CreateMessageResult> createMessage(McpSchema.CreateMessageRequest createMessageRequest) { | |||
|
|||
establishSseStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Methods returning Flux/Mono need to be lazy, even if they have side-effects. Another issue with this is explained in the comment above.
* @param <T> the type of the response that is expected as a result of handling the | ||
* request. | ||
*/ | ||
public interface StreamingRequestHandler<T> extends RequestHandler<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unused and should be removed I suppose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Forgot to remove this when I removed streamTools.
* with a connected client. | ||
*/ | ||
@FunctionalInterface | ||
public interface StreamableHttpSessionFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding the boolean flag causes adding another factory and infects the transports with empty implementations. Unfortunately, we will need a parallel type hierarchy.
@@ -117,10 +118,6 @@ public static Builder builder(WebClient.Builder webClientBuilder) { | |||
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) { | |||
return Mono.deferContextual(ctx -> { | |||
this.handler.set(handler); | |||
if (openConnectionOnStartup) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this can be removed but we also have to remove it from the constructor.
|
||
@Override | ||
public void setSessionFactory(McpServerSession.Factory sessionFactory) { | ||
// Required but not used for this implementation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't implement it this way. This transport needs to be of a different kind.
|
||
// Only set content type for requests | ||
if (message instanceof McpSchema.JSONRPCRequest) { | ||
response.setContentType(APPLICATION_JSON); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here you set it to application/json but later you let the user change this to text/event-stream. Unfortunately with other transports it might not be as straightforward and the builders are more type-safe and will forbid rolling back in such a way. That's why I am proposing to keep it simple for now as in another comment and always upgrade to SSE for requests.
Not planned:
Motivation and Context
Trying to reach spec parity with TS and Python for Java. Will continue working on other aspects of this.
How Has This Been Tested?
Unit tests and integ tests using the SHTTP web client in mcp-spring.
Breaking Changes
N/A
Types of changes
Checklist