Skip to content

Adding StreamableHttpServerTransportProvider class along with unit & integration tests #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

ZachGerman
Copy link

@ZachGerman ZachGerman commented Jun 2, 2025

Not planned:

  • Backward-compatible endpoint combo

Motivation and Context

Trying to reach spec parity with TS and Python for Java. Will continue working on other aspects of this.

How Has This Been Tested?

Unit tests and integ tests using the SHTTP web client in mcp-spring.

Breaking Changes

N/A

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey 👋 Thanks for a comprehensive PR! I did my first round focusing on the main themes. Happy to offer guidance to cover the essential aspects (simple/stateful servers, multiple streams per session, lifecycle) if you'd like to push this forward.

@ZachGerman
Copy link
Author

Thank you very much for all of the input @chemicL! I will begin making changes accordingly this afternoon.

@ZachGerman
Copy link
Author

Today I'm targeting origin header validation and moving the dedicated GET stream to the StreamableHttpSession class, then adding an integ test for GET on /mcp to start the listening stream.
After that, I believe we should have all core functionality except sessionless and proper SSE response upgrade logic.

@sivankri
Copy link

Hi @ZachGerman I tried using your StreamableHttpServerTransportProvider.java file + java MCP SDK 0.10.0. My backend server is Jetty 12. The request from MCP Interceptor hangs in the below call.

return streamSession.handle(message).then(Mono.just(responseType)).onErrorReturn(ResponseType.IMMEDIATE);

This is the actuall line which gets blocked
--McpServerSession.java --> return var10000.flatMap(var10001::sendMessage);

Can you please check this?

@tzolov
Copy link
Contributor

tzolov commented Jun 26, 2025

Today I'm targeting origin header validation and moving the dedicated GET stream to the StreamableHttpSession class, then adding an integ test for GET on /mcp to start the listening stream. After that, I believe we should have all core functionality except sessionless and proper SSE response upgrade logic.

@ZachGerman what do you mean by origin header validation? I hope it is not overlapping with the #284

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few more comments.

if (sessionId == null) {
response.setContentType(APPLICATION_JSON);
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
response.getWriter().write(createErrorJson("Session ID missing in request header"));
Copy link
Member

@chemicL chemicL Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modelcontextprotocol/modelcontextprotocol#282 please check this issue. I'm leaning towards separating the JSON-RPC lifecycle from the lower level transport lifecycle. In that world, GET and POST and session concepts are not at the JSON-RPC layer, so the lifecycle does not apply. In fact, the previous SSE transport did begin with a HTTP GET request before the initialization request could have been sent. For the stateless servers it should definitely be improved in the spec to mention that initialization is not required before other requests.

[EDIT]

For the record, the above was wishful thinking. I agree we are only able to generate the session ID upon POST of the initialize request. The spec is in fact melding the json-rpc layer with http, so the session ID gets generated only when processing a initialization request:

A server using the Streamable HTTP transport MAY assign a session ID at initialization time, by including it in an Mcp-Session-Id header on the HTTP response containing the InitializeResult

// Subscribe to the SSE stream and write events to the response
sseStream.getEventFlux().doOnNext(event -> {
try {
if (event.id() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but do we emit events with no ID ? Reading the code I don't see a situation like this. My understanding of the mentioned spec is that if the ID is missing then the client would not use this id for Last-Event-ID tracking, but we are on the server side and we always generate an ID.

@ZachGerman
Copy link
Author

ZachGerman commented Jun 26, 2025

@sivankri: Changing the related logic after my meeting with Dariusz this morning, so (hopefully) your issue goes away with the new response type mapping.
@tzolov: It's related, but I don't think it's overlapping as I'm just adding the ability to set a list of allowed origins to the server transport provider and enforcing it if it's set.

@viyaviya
Copy link

merge commit 6c4830b is bad

@ZachGerman
Copy link
Author

ZachGerman commented Jun 27, 2025

merge commit 6c4830b is bad

Oops! Thanks for the heads up! Fixed!

@ZachGerman ZachGerman force-pushed the StreamableHttpServerTransportProvider branch from 6c4830b to 7817da6 Compare June 27, 2025 16:45
@ZachGerman
Copy link
Author

ZachGerman commented Jun 27, 2025

@sivankri the new logic added streamTools to McpServerFeatures, used in the Async constructor, which facilitates a list of tool specifications that return Flux instead of Mono via the new AsyncStreamingToolSpecification record type and uses the return type of the call to differentiate between direct-HTTP and SSE-stream responses.

@sivankri
Copy link

sivankri commented Jul 1, 2025

@ZachGerman I tried running my mcp remote server against the mcp sdk generated from your branch. When i used the MCP inspector tool by setting query parameter transport=streamable-http, i could not get the connection established. Can you please help me with this?

@ZachGerman ZachGerman changed the title Adding StreamableHttpServerTransportProvider class and unit tests Adding StreamableHttpServerTransportProvider class along with unit & integration tests Jul 1, 2025
@ZachGerman
Copy link
Author

ZachGerman commented Jul 1, 2025

@sivankri I'm unfamiliar with the MCP inspector tool, but I imagine it's related to the fact that this JDK has public static final String LATEST_PROTOCOL_VERSION = "2024-11-05"; in the McpSchema spec class & that is currently used for all server instantiation. You can update it locally and the server from my branch, as well as @chemicL's webClient, should work with the update, but it will break a lot of tests. I plan on tackling protocol version negotiation soon if nobody else does.

@ZachGerman ZachGerman force-pushed the StreamableHttpServerTransportProvider branch from 77333d1 to 244208d Compare July 8, 2025 06:18
@ZachGerman ZachGerman force-pushed the StreamableHttpServerTransportProvider branch from 244208d to 25ff3dc Compare July 8, 2025 06:40
@ZachGerman
Copy link
Author

ZachGerman commented Jul 8, 2025

Once I realized that I needed to identify the transports in the exchange's session from the tool handlers, I realized I need to either pass the entire JSONRPCRequest to the handler (instead of the request.params()), or add a third request.id() parameter to tool handlers.

Either way, it brought more attention to the fact that we were using Object for JSONRPCRequest/Response id, so I added a little sub-class in the McpSchema to enforce the "(String | Number) && Non-null" requirements of that field.

Tomorrow I will be making the JSONRPCRequest id available to tool handlers and it can be passed to the McpAsyncServerExchange class methods in order to route notifications and requests to the related SSE stream.


Edit: Ended up going with a per-request exchange instance instead of passing transport ID to tool call handlers. This allows tool implementers to not worry about transport routing with their exchange method calls.

@ZachGerman
Copy link
Author

Removed streamTools in favor of upgrading transport instances to SSE based on McpAsyncServerExchange method usage.
Also added a more extensive suite of integration tests.
Will speak to @chemicL about adding a config option at the server level to declare default transport type, as I believe he wants that to be SSE for session-bound requests.
Also planning to make event histories some kind of append-only ordered iterable struct, as using Map.keySet().stream().sorted() when replaying is a bit gross in hind-sight.

Comment on lines +123 to +124
/** Sessions map, keyed by Session ID */
private static final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be static in order to allow having multiple independent MCP Server instances on the same Java Server. For example if we want to support having an MCP on paths /mcp/v1/ and /mcp/v2/.

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for iterating over the Streamable HTTP server-side. I left a few comments. For now, please don't push more updates. Based on our exploratory work I am working on a parallel branch to have another transport type. I will focus on the WebFlux server and will let you know once I have all the architectural changes in so you can incorporate them into your PR and we can then make further progress.

Comment on lines +251 to +253
if (response.statusCode().is2xxSuccessful()
&& message instanceof JSONRPCNotification notification) {
if (notification.method().equals("notifications/initialized")) {
Copy link
Member

@chemicL chemicL Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not required to wait until the notification is sent out. If you see something in the spec, please let me know, but what I can see is this:

A server using the Streamable HTTP transport MAY assign a session ID at initialization time, by including it in an Mcp-Session-Id header on the HTTP response containing the InitializeResult

Copy link
Author

@ZachGerman ZachGerman Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we split the server session class, should we switch them all to mark the session as initialized once they send the InitializeResult response? (They currently only mark them as initialized upon receiving the initialized notification) Without that change or making the client wait to get the 200 from its initialized notification, we will have clients trying to establish streams prior to initialization being complete.

*/
@JsonSerialize(using = McpId.Serializer.class)
@JsonDeserialize(using = McpId.Deserializer.class)
public static final class McpId {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand this change is not strictly required for the Streamable HTTP Server implementation to work, so I'd prefer this to not be a part of this already big PR.

* This is for tool writers to use if they want to send their tool response over an
* SSE stream without using any other McpAsyncServerExchange methods
*/
public void upgradeTransport() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this is not a correct way to go about it. I'd suggest that if in the future someone asks for dynamic upgrade we should do it differently by providing a hook to inspect the request first, instruct the consuming session and transport that it will not only be a response but a bit more, and only then feed an exchange that has the SSE stream producer to feed with user interactions, but not in a stateful way like this.

For now I suggest we keep it simple as other SDKs:

  • notifications and responses -> handle as 201 ACCEPTED
  • requests
    • sessionless -> application/json response expected
    • with sessions -> always upgrade to SSE

@@ -92,14 +121,17 @@ public McpSchema.Implementation getClientInfo() {
* Specification</a>
*/
public Mono<McpSchema.CreateMessageResult> createMessage(McpSchema.CreateMessageRequest createMessageRequest) {

establishSseStream();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Methods returning Flux/Mono need to be lazy, even if they have side-effects. Another issue with this is explained in the comment above.

* @param <T> the type of the response that is expected as a result of handling the
* request.
*/
public interface StreamingRequestHandler<T> extends RequestHandler<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unused and should be removed I suppose?

Copy link
Author

@ZachGerman ZachGerman Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Forgot to remove this when I removed streamTools.

* with a connected client.
*/
@FunctionalInterface
public interface StreamableHttpSessionFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the boolean flag causes adding another factory and infects the transports with empty implementations. Unfortunately, we will need a parallel type hierarchy.

@@ -117,10 +118,6 @@ public static Builder builder(WebClient.Builder webClientBuilder) {
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
return Mono.deferContextual(ctx -> {
this.handler.set(handler);
if (openConnectionOnStartup) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this can be removed but we also have to remove it from the constructor.


@Override
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
// Required but not used for this implementation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't implement it this way. This transport needs to be of a different kind.


// Only set content type for requests
if (message instanceof McpSchema.JSONRPCRequest) {
response.setContentType(APPLICATION_JSON);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here you set it to application/json but later you let the user change this to text/event-stream. Unfortunately with other transports it might not be as straightforward and the builders are more type-safe and will forbid rolling back in such a way. That's why I am proposing to keep it simple for now as in another comment and always upgrade to SSE for requests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants