From dc4f83eaa2c4ee794937a7d83fe3cac6c312d548 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sun, 8 Jun 2025 16:15:03 +0900 Subject: [PATCH 01/15] Add docs and message field for ProgressNotification --- .../modelcontextprotocol/spec/McpSchema.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 59713094..56c723ef 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -23,6 +23,7 @@ import io.modelcontextprotocol.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.util.annotation.Nullable; /** * Based on the JSON-RPC 2.0 @@ -872,10 +873,15 @@ private static JsonSchema parseSchema(String schema) { @JsonIgnoreProperties(ignoreUnknown = true) public record CallToolRequest(// @formatter:off @JsonProperty("name") String name, - @JsonProperty("arguments") Map arguments) implements Request { + @JsonProperty("arguments") Map arguments, + @Nullable @JsonProperty("_meta") Map _meta) implements Request { public CallToolRequest(String name, String jsonArguments) { - this(name, parseJsonArguments(jsonArguments)); + this(name, parseJsonArguments(jsonArguments), null); + } + + public CallToolRequest(String name, Map arguments) { + this(name, arguments, null); } private static Map parseJsonArguments(String jsonArguments) { @@ -1309,11 +1315,23 @@ public record PaginatedResult(@JsonProperty("nextCursor") String nextCursor) { // --------------------------- // Progress and Logging // --------------------------- + + /** + * The Model Context Protocol (MCP) supports optional progress tracking for long-running + * operations through notification messages. Either side can send progress notifications + * to provide updates about operation status. + * + * @param progressToken The original progress token + * @param progress The current progress value so far + * @param total An optional “total” value + * @param message An optional “message” value + */ @JsonIgnoreProperties(ignoreUnknown = true) public record ProgressNotification(// @formatter:off @JsonProperty("progressToken") String progressToken, - @JsonProperty("progress") double progress, - @JsonProperty("total") Double total) { + @JsonProperty("progress") Double progress, + @JsonProperty("total") Double total, + @JsonProperty("message") String message) { }// @formatter:on /** From a675e8c7fa71c462aaf1a108c63566395dcedd74 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sun, 8 Jun 2025 16:20:34 +0900 Subject: [PATCH 02/15] add notifications/progress constant --- mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 56c723ef..cb538edd 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -58,6 +58,8 @@ private McpSchema() { public static final String METHOD_PING = "ping"; + public static final String METHOD_NOTIFICATION_PROGRESS = "notifications/progress"; + // Tool Methods public static final String METHOD_TOOLS_LIST = "tools/list"; From 705c70f7346b5718ef0d16e8678160decc903947 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sun, 8 Jun 2025 16:31:21 +0900 Subject: [PATCH 03/15] Make server can handle _meta field and send notification to client --- .../io/modelcontextprotocol/server/McpAsyncServer.java | 2 +- .../server/McpAsyncServerExchange.java | 10 ++++++++++ .../java/io/modelcontextprotocol/server/McpServer.java | 8 ++++---- .../modelcontextprotocol/server/McpServerFeatures.java | 8 ++++---- .../java/io/modelcontextprotocol/spec/McpSchema.java | 8 +++++--- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 02ad955b..dfb77afd 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -360,7 +360,7 @@ private McpServerSession.RequestHandler toolsCallRequestHandler( return Mono.error(new McpError("Tool not found: " + callToolRequest.name())); } - return toolSpecification.map(tool -> tool.call().apply(exchange, callToolRequest.arguments())) + return toolSpecification.map(tool -> tool.call().apply(exchange, callToolRequest)) .orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name()))); }; } diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java index 2fd95a10..aea869bc 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java @@ -139,6 +139,16 @@ public Mono listRoots(String cursor) { LIST_ROOTS_RESULT_TYPE_REF); } + public Mono notification(String method, Object params) { + if (method == null || method.isEmpty()) { + return Mono.error(new McpError("Method must not be null or empty")); + } + if (params == null) { + return Mono.error(new McpError("Params must not be null")); + } + return this.session.sendNotification(method, params); + } + /** * Send a logging message notification to the client. Messages below the current * minimum logging level will be filtered out. diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java index d6ec2cc3..fc38d291 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java @@ -309,7 +309,7 @@ public AsyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabi * Example usage:
{@code
 		 * .tool(
 		 *     new Tool("calculator", "Performs calculations", schema),
-		 *     (exchange, args) -> Mono.fromSupplier(() -> calculate(args))
+		 *     (exchange, request) -> Mono.fromSupplier(() -> calculate(request))
 		 *         .map(result -> new CallToolResult("Result: " + result))
 		 * )
 		 * }
@@ -323,7 +323,7 @@ public AsyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabi * @throws IllegalArgumentException if tool or handler is null */ public AsyncSpecification tool(McpSchema.Tool tool, - BiFunction, Mono> handler) { + BiFunction> handler) { Assert.notNull(tool, "Tool must not be null"); Assert.notNull(handler, "Handler must not be null"); @@ -801,7 +801,7 @@ public SyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabil * Example usage:
{@code
 		 * .tool(
 		 *     new Tool("calculator", "Performs calculations", schema),
-		 *     (exchange, args) -> new CallToolResult("Result: " + calculate(args))
+		 *     (exchange, request) -> new CallToolResult("Result: " + calculate(request))
 		 * )
 		 * }
* @param tool The tool definition including name, description, and schema. Must @@ -814,7 +814,7 @@ public SyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabil * @throws IllegalArgumentException if tool or handler is null */ public SyncSpecification tool(McpSchema.Tool tool, - BiFunction, McpSchema.CallToolResult> handler) { + BiFunction handler) { Assert.notNull(tool, "Tool must not be null"); Assert.notNull(handler, "Handler must not be null"); diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java index 8311f5d4..95e55bae 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java @@ -222,8 +222,8 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se * .required("expression") * .property("expression", JsonSchemaType.STRING) * ), - * (exchange, args) -> { - * String expr = (String) args.get("expression"); + * (exchange, request) -> { + * String expr = (String) request.arguments().get("expression"); * return Mono.fromSupplier(() -> evaluate(expr)) * .map(result -> new CallToolResult("Result: " + result)); * } @@ -237,7 +237,7 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se * connected client. The second arguments is a map of tool arguments. */ public record AsyncToolSpecification(McpSchema.Tool tool, - BiFunction, Mono> call) { + BiFunction> call) { static AsyncToolSpecification fromSync(SyncToolSpecification tool) { // FIXME: This is temporary, proper validation should be implemented @@ -413,7 +413,7 @@ static AsyncCompletionSpecification fromSync(SyncCompletionSpecification complet * client. The second arguments is a map of arguments passed to the tool. */ public record SyncToolSpecification(McpSchema.Tool tool, - BiFunction, McpSchema.CallToolResult> call) { + BiFunction call) { } /** diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index cb538edd..e1a17924 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -870,6 +870,8 @@ private static JsonSchema parseSchema(String schema) { * tools/list. * @param arguments Arguments to pass to the tool. These must conform to the tool's * input schema. + * @param _meta Optional metadata about the request. This can include additional + * information like `progressToken` */ @JsonInclude(JsonInclude.Include.NON_ABSENT) @JsonIgnoreProperties(ignoreUnknown = true) @@ -1319,9 +1321,9 @@ public record PaginatedResult(@JsonProperty("nextCursor") String nextCursor) { // --------------------------- /** - * The Model Context Protocol (MCP) supports optional progress tracking for long-running - * operations through notification messages. Either side can send progress notifications - * to provide updates about operation status. + * The Model Context Protocol (MCP) supports optional progress tracking for + * long-running operations through notification messages. Either side can send + * progress notifications to provide updates about operation status. * * @param progressToken The original progress token * @param progress The current progress value so far From aa95c0dbabd2cd2df446d7883de63c8fc263b7b1 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sun, 8 Jun 2025 16:36:10 +0900 Subject: [PATCH 04/15] Add JSON deserialization test --- .../spec/McpSchemaTests.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java index df8176a4..bb09cf66 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java @@ -186,6 +186,24 @@ void testJSONRPCRequest() throws Exception { {"jsonrpc":"2.0","method":"method_name","id":1,"params":{"key":"value"}}""")); } + @Test + void testJSONRPCRequestWithMeta() throws Exception { + Map params = new HashMap<>(); + params.put("key", "value"); + params.put("_meta", Map.of("progressToken", "abc123")); + + McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method_name", 1, + params); + + String value = mapper.writeValueAsString(request); + assertThatJson(value).when(Option.IGNORING_ARRAY_ORDER) + .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) + .isObject() + .isEqualTo( + json(""" + {"jsonrpc":"2.0","method":"method_name","id":1,"params":{"key":"value"},"_meta":{"progressToken":"abc123"}}""")); + } + @Test void testJSONRPCNotification() throws Exception { Map params = new HashMap<>(); From a88c2414c15c8ce3fd3a28813e18fe9c15a95882 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sun, 8 Jun 2025 17:18:44 +0900 Subject: [PATCH 05/15] Implement for client BREAKING : Client Specification record is braking --- .../client/McpAsyncClient.java | 24 +++++++++++ .../client/McpClient.java | 41 +++++++++++++++++-- .../client/McpClientFeatures.java | 18 +++++++- 3 files changed, 79 insertions(+), 4 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index a7dac4c0..dd6217dc 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -267,6 +267,16 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, asyncLoggingNotificationHandler(loggingConsumersFinal)); + // Utility Progress Notification + List>> progressConsumersFinal = new ArrayList<>(); + progressConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); + if (!Utils.isEmpty(features.progressConsumers())) { + progressConsumersFinal.addAll(features.progressConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, + asyncProgressNotificationHandler(progressConsumersFinal)); + this.transport.setExceptionHandler(this::handleException); this.sessionSupplier = () -> new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers); @@ -985,6 +995,20 @@ private NotificationHandler asyncLoggingNotificationHandler( }; } + private NotificationHandler asyncProgressNotificationHandler( + List>> progressConsumers) { + + return params -> { + McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, + new TypeReference() { + }); + + return Flux.fromIterable(progressConsumers) + .flatMap(consumer -> consumer.apply(progressNotification)) + .then(); + }; + } + /** * Sets the minimum logging level for messages received from the server. The client * will only receive log messages at or above the specified severity level. diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index d8925b00..ce603a0f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -177,6 +177,8 @@ class SyncSpec { private final List> loggingConsumers = new ArrayList<>(); + private final List> progressConsumers = new ArrayList<>(); + private Function samplingHandler; private Function elicitationHandler; @@ -377,6 +379,36 @@ public SyncSpec loggingConsumers(List progressConsumer) { + Assert.notNull(progressConsumer, "Progress consumer must not be null"); + this.progressConsumers.add(progressConsumer); + return this; + } + + /** + * Adds a multiple consumers to be notified of progress notifications from the + * server. This allows the client to track long-running operations and provide + * feedback to users. + * @param progressConsumers A list of consumers that receives progress + * notifications. Must not be null. + * @return This builder instance for method chaining + * @throws IllegalArgumentException if progressConsumer is null + */ + public SyncSpec progressConsumers(List> progressConsumers) { + Assert.notNull(progressConsumers, "Progress consumers must not be null"); + this.progressConsumers.addAll(progressConsumers); + return this; + } + /** * Create an instance of {@link McpSyncClient} with the provided configurations or * sensible defaults. @@ -385,7 +417,8 @@ public SyncSpec loggingConsumers(List>> loggingConsumers = new ArrayList<>(); + private final List>> progressConsumers = new ArrayList<>(); + private Function> samplingHandler; private Function> elicitationHandler; @@ -663,8 +698,8 @@ public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, - this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, - this.elicitationHandler)); + this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, + this.samplingHandler, this.elicitationHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index bd1a0985..ed3f48b9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -59,6 +59,7 @@ class McpClientFeatures { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -68,6 +69,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -79,6 +81,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progressconsumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -89,6 +92,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -106,6 +110,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } @@ -149,6 +154,12 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } + List>> progressConsumers = new ArrayList<>(); + for (Consumer consumer : syncSpec.progressConsumers()) { + progressConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p)) + .subscribeOn(Schedulers.boundedElastic())); + } + Function> samplingHandler = r -> Mono .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); @@ -159,7 +170,7 @@ public static Async fromSync(Sync syncSpec) { return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, - loggingConsumers, samplingHandler, elicitationHandler); + loggingConsumers, progressConsumers, samplingHandler, elicitationHandler); } } @@ -174,6 +185,7 @@ public static Async fromSync(Sync syncSpec) { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -183,6 +195,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -196,6 +209,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -205,6 +219,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -222,6 +237,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } From bdfb4b56928d650a651efba33cbf4f03f7f31215 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Mon, 16 Jun 2025 11:52:01 +0900 Subject: [PATCH 06/15] add test --- ...rverTransportProviderIntegrationTests.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java index dc9d1cfa..a93e017f 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java @@ -958,4 +958,75 @@ void testLoggingNotification() { mcpServer.close(); } + // --------------------------------------- + // Progress Tests + // --------------------------------------- + @Test + void testProgressNotification() { + // Create a list to store received logging notifications + List receivedNotifications = new ArrayList<>(); + + // Create server with a tool that sends logging notifications + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("progress-test", "Test progress notifications", emptyJsonSchema), + (exchange, request) -> { + + var progressToken = (String) request._meta().get("progressToken"); + + exchange + .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, + new McpSchema.ProgressNotification(progressToken, 0.1, 1.0, "Test progress 1/10")) + .block(); + + exchange + .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, + new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Test progress 5/10")) + .block(); + + exchange + .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, + new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Test progress 10/10")) + .block(); + + return Mono.just(new CallToolResult("Progress test completed", false)); + }); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().logging().tools(true).build()) + .tools(tool) + .build(); + try ( + // Create client with progress notification handler + var mcpClient = clientBuilder.progressConsumer(receivedNotifications::add).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Call the tool that sends progress notifications + CallToolResult result = mcpClient.callTool( + new McpSchema.CallToolRequest("progress-test", Map.of(), Map.of("progressToken", "test-token"))); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Progress test completed"); + + // Wait for notifications to be processed + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + + System.out.println("Received notifications: " + receivedNotifications); + + // Should have received 3 notifications + assertThat(receivedNotifications).hasSize(3); + + // Check the progress notifications + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progressToken)) + .containsExactlyInAnyOrder("test-token", "test-token", "test-token"); + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progress)) + .containsExactlyInAnyOrder(0.1, 0.5, 1.0); + }); + } + mcpServer.close(); + } + } From 3b1b00a3b7cf272de3969207d915181c12ed75cc Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 09:52:56 +0900 Subject: [PATCH 07/15] fix: use static ProgressNotification type reference --- .../java/io/modelcontextprotocol/client/McpAsyncClient.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index fec113d2..ed98962f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -100,6 +100,9 @@ public class McpAsyncClient { public static final TypeReference LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeReference<>() { }; + public static final TypeReference PROGRESS_NOTIFICATION_TYPE_REF = new TypeReference<>() { + }; + /** * Client capabilities. */ @@ -843,8 +846,7 @@ private NotificationHandler asyncProgressNotificationHandler( return params -> { McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, - new TypeReference() { - }); + PROGRESS_NOTIFICATION_TYPE_REF); return Flux.fromIterable(progressConsumers) .flatMap(consumer -> consumer.apply(progressNotification)) From 17186c071d6748d232893110246e73760365e102 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 09:57:53 +0900 Subject: [PATCH 08/15] fix: correct parameter name from '_meta' to 'meta' in method documentation --- mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java index 17fec6dc..2a84802b 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java @@ -1056,7 +1056,7 @@ private static JsonSchema parseSchema(String schema) { * tools/list. * @param arguments Arguments to pass to the tool. These must conform to the tool's * input schema. - * @param _meta Optional metadata about the request. This can include additional + * @param meta Optional metadata about the request. This can include additional * information like `progressToken` */ @JsonInclude(JsonInclude.Include.NON_ABSENT) @@ -1623,7 +1623,7 @@ public record ProgressNotification(// @formatter:off @JsonProperty("progressToken") String progressToken, @JsonProperty("progress") Double progress, @JsonProperty("total") Double total, - @JsonProperty("message") String message) { + @JsonProperty("message") String message) { }// @formatter:on /** From efb4f2e2eb228f96e83155a22aba6ee4fc9fc387 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:02:09 +0900 Subject: [PATCH 09/15] fix: replace general notification method to specific method: progressNotification to send client progress updates --- .../server/McpAsyncServerExchange.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java index 22204c66..c6bb2358 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java @@ -157,16 +157,6 @@ public Mono listRoots(String cursor) { LIST_ROOTS_RESULT_TYPE_REF); } - public Mono notification(String method, Object params) { - if (method == null || method.isEmpty()) { - return Mono.error(new McpError("Method must not be null or empty")); - } - if (params == null) { - return Mono.error(new McpError("Params must not be null")); - } - return this.session.sendNotification(method, params); - } - /** * Send a logging message notification to the client. Messages below the current * minimum logging level will be filtered out. @@ -187,6 +177,20 @@ public Mono loggingNotification(LoggingMessageNotification loggingMessageN }); } + /** + * Sends a notification to the client that the current progress status has changed for + * long-running operations. + * @param progressNotification The progress notification to send + * @return A Mono that completes when the notification has been sent + */ + public Mono progressNotification(McpSchema.ProgressNotification progressNotification) { + if (progressNotification == null) { + return Mono.error(new McpError("Progress notification must not be null")); + } + + return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_PROGRESS, progressNotification); + } + /** * Sends a ping request to the client. * @return A Mono that completes with clients's ping response From d7e073270e0c118b08931d81b4745d20be54692d Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:04:11 +0900 Subject: [PATCH 10/15] fix: restore unintended changes from merge commit --- .../java/io/modelcontextprotocol/server/McpServer.java | 8 ++++---- .../io/modelcontextprotocol/server/McpServerFeatures.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java index addf6ec1..63b58922 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java @@ -313,7 +313,7 @@ public AsyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabi * Example usage:
{@code
 		 * .tool(
 		 *     new Tool("calculator", "Performs calculations", schema),
-		 *     (exchange, request) -> Mono.fromSupplier(() -> calculate(request))
+		 *     (exchange, args) -> Mono.fromSupplier(() -> calculate(args))
 		 *         .map(result -> new CallToolResult("Result: " + result))
 		 * )
 		 * }
@@ -330,7 +330,7 @@ public AsyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabi */ @Deprecated public AsyncSpecification tool(McpSchema.Tool tool, - BiFunction> handler) { + BiFunction, Mono> handler) { Assert.notNull(tool, "Tool must not be null"); Assert.notNull(handler, "Handler must not be null"); assertNoDuplicateTool(tool.name()); @@ -849,7 +849,7 @@ public SyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabil * Example usage:
{@code
 		 * .tool(
 		 *     new Tool("calculator", "Performs calculations", schema),
-		 *     (exchange, request) -> new CallToolResult("Result: " + calculate(request))
+		 *     (exchange, args) -> new CallToolResult("Result: " + calculate(args))
 		 * )
 		 * }
* @param tool The tool definition including name, description, and schema. Must @@ -865,7 +865,7 @@ public SyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabil */ @Deprecated public SyncSpecification tool(McpSchema.Tool tool, - BiFunction handler) { + BiFunction, McpSchema.CallToolResult> handler) { Assert.notNull(tool, "Tool must not be null"); Assert.notNull(handler, "Handler must not be null"); assertNoDuplicateTool(tool.name()); diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java index f553a420..3ce599c8 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java @@ -227,7 +227,7 @@ public record AsyncToolSpecification(McpSchema.Tool tool, **/ @Deprecated public AsyncToolSpecification(McpSchema.Tool tool, - BiFunction> call) { + BiFunction, Mono> call) { this(tool, call, (exchange, toolReq) -> call.apply(exchange, toolReq.arguments())); } @@ -483,7 +483,7 @@ public record SyncToolSpecification(McpSchema.Tool tool, @Deprecated public SyncToolSpecification(McpSchema.Tool tool, - BiFunction call) { + BiFunction, McpSchema.CallToolResult> call) { this(tool, call, (exchange, toolReq) -> call.apply(exchange, toolReq.arguments())); } From e23b8974ff5b45229d01cb4a7ff78f2058fcb8b0 Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:05:37 +0900 Subject: [PATCH 11/15] fix: restore client changes --- .../client/McpAsyncClient.java | 26 ------------ .../client/McpClient.java | 41 ++----------------- .../client/McpClientFeatures.java | 18 +------- 3 files changed, 4 insertions(+), 81 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index ed98962f..cf8142c6 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -100,9 +100,6 @@ public class McpAsyncClient { public static final TypeReference LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeReference<>() { }; - public static final TypeReference PROGRESS_NOTIFICATION_TYPE_REF = new TypeReference<>() { - }; - /** * Client capabilities. */ @@ -256,16 +253,6 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, asyncLoggingNotificationHandler(loggingConsumersFinal)); - // Utility Progress Notification - List>> progressConsumersFinal = new ArrayList<>(); - progressConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); - if (!Utils.isEmpty(features.progressConsumers())) { - progressConsumersFinal.addAll(features.progressConsumers()); - } - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, - asyncProgressNotificationHandler(progressConsumersFinal)); - this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, List.of(McpSchema.LATEST_PROTOCOL_VERSION), initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers, @@ -841,19 +828,6 @@ private NotificationHandler asyncLoggingNotificationHandler( }; } - private NotificationHandler asyncProgressNotificationHandler( - List>> progressConsumers) { - - return params -> { - McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, - PROGRESS_NOTIFICATION_TYPE_REF); - - return Flux.fromIterable(progressConsumers) - .flatMap(consumer -> consumer.apply(progressNotification)) - .then(); - }; - } - /** * Sets the minimum logging level for messages received from the server. The client * will only receive log messages at or above the specified severity level. diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index ce603a0f..d8925b00 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -177,8 +177,6 @@ class SyncSpec { private final List> loggingConsumers = new ArrayList<>(); - private final List> progressConsumers = new ArrayList<>(); - private Function samplingHandler; private Function elicitationHandler; @@ -379,36 +377,6 @@ public SyncSpec loggingConsumers(List progressConsumer) { - Assert.notNull(progressConsumer, "Progress consumer must not be null"); - this.progressConsumers.add(progressConsumer); - return this; - } - - /** - * Adds a multiple consumers to be notified of progress notifications from the - * server. This allows the client to track long-running operations and provide - * feedback to users. - * @param progressConsumers A list of consumers that receives progress - * notifications. Must not be null. - * @return This builder instance for method chaining - * @throws IllegalArgumentException if progressConsumer is null - */ - public SyncSpec progressConsumers(List> progressConsumers) { - Assert.notNull(progressConsumers, "Progress consumers must not be null"); - this.progressConsumers.addAll(progressConsumers); - return this; - } - /** * Create an instance of {@link McpSyncClient} with the provided configurations or * sensible defaults. @@ -417,8 +385,7 @@ public SyncSpec progressConsumers(List> public McpSyncClient build() { McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, - this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler, - this.elicitationHandler); + this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, this.elicitationHandler); McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures); @@ -468,8 +435,6 @@ class AsyncSpec { private final List>> loggingConsumers = new ArrayList<>(); - private final List>> progressConsumers = new ArrayList<>(); - private Function> samplingHandler; private Function> elicitationHandler; @@ -698,8 +663,8 @@ public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, - this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, - this.samplingHandler, this.elicitationHandler)); + this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, + this.elicitationHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index ed3f48b9..bd1a0985 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -59,7 +59,6 @@ class McpClientFeatures { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. - * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -69,7 +68,6 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, - List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -81,7 +79,6 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. - * @param progressConsumers the progressconsumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -92,7 +89,6 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, - List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -110,7 +106,6 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); - this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } @@ -154,12 +149,6 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } - List>> progressConsumers = new ArrayList<>(); - for (Consumer consumer : syncSpec.progressConsumers()) { - progressConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p)) - .subscribeOn(Schedulers.boundedElastic())); - } - Function> samplingHandler = r -> Mono .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); @@ -170,7 +159,7 @@ public static Async fromSync(Sync syncSpec) { return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, - loggingConsumers, progressConsumers, samplingHandler, elicitationHandler); + loggingConsumers, samplingHandler, elicitationHandler); } } @@ -185,7 +174,6 @@ public static Async fromSync(Sync syncSpec) { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. - * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -195,7 +183,6 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, - List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -209,7 +196,6 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. - * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -219,7 +205,6 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, - List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -237,7 +222,6 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); - this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } From 283f8f503e3a0de8537eb53a9afc21b3f263486d Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:09:16 +0900 Subject: [PATCH 12/15] fix: restore integration test because of the client code is restored for PR seperation --- ...rverTransportProviderIntegrationTests.java | 71 ------------------- 1 file changed, 71 deletions(-) diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java index f5c749ee..ac10df4f 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java @@ -1070,75 +1070,4 @@ void testPingSuccess() { mcpServer.close(); } - // --------------------------------------- - // Progress Tests - // --------------------------------------- - @Test - void testProgressNotification() { - // Create a list to store received logging notifications - List receivedNotifications = new ArrayList<>(); - - // Create server with a tool that sends logging notifications - McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( - new McpSchema.Tool("progress-test", "Test progress notifications", emptyJsonSchema), - (exchange, request) -> { - - var progressToken = (String) request._meta().get("progressToken"); - - exchange - .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, - new McpSchema.ProgressNotification(progressToken, 0.1, 1.0, "Test progress 1/10")) - .block(); - - exchange - .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, - new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Test progress 5/10")) - .block(); - - exchange - .notification(McpSchema.METHOD_NOTIFICATION_PROGRESS, - new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Test progress 10/10")) - .block(); - - return Mono.just(new CallToolResult("Progress test completed", false)); - }); - - var mcpServer = McpServer.async(mcpServerTransportProvider) - .serverInfo("test-server", "1.0.0") - .capabilities(ServerCapabilities.builder().logging().tools(true).build()) - .tools(tool) - .build(); - try ( - // Create client with progress notification handler - var mcpClient = clientBuilder.progressConsumer(receivedNotifications::add).build()) { - - // Initialize client - InitializeResult initResult = mcpClient.initialize(); - assertThat(initResult).isNotNull(); - - // Call the tool that sends progress notifications - CallToolResult result = mcpClient.callTool( - new McpSchema.CallToolRequest("progress-test", Map.of(), Map.of("progressToken", "test-token"))); - assertThat(result).isNotNull(); - assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); - assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Progress test completed"); - - // Wait for notifications to be processed - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - - System.out.println("Received notifications: " + receivedNotifications); - - // Should have received 3 notifications - assertThat(receivedNotifications).hasSize(3); - - // Check the progress notifications - assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progressToken)) - .containsExactlyInAnyOrder("test-token", "test-token", "test-token"); - assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progress)) - .containsExactlyInAnyOrder(0.1, 0.5, 1.0); - }); - } - mcpServer.close(); - } - } From 8b43c34d6c0fc34243f93f353f7905554b68509a Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:10:46 +0900 Subject: [PATCH 13/15] fix: restore McpSchemaTests because of duplicating --- .../spec/McpSchemaTests.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java index c85379bb..1d0ba302 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/spec/McpSchemaTests.java @@ -251,24 +251,6 @@ void testJSONRPCRequest() throws Exception { {"jsonrpc":"2.0","method":"method_name","id":1,"params":{"key":"value"}}""")); } - @Test - void testJSONRPCRequestWithMeta() throws Exception { - Map params = new HashMap<>(); - params.put("key", "value"); - params.put("_meta", Map.of("progressToken", "abc123")); - - McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method_name", 1, - params); - - String value = mapper.writeValueAsString(request); - assertThatJson(value).when(Option.IGNORING_ARRAY_ORDER) - .when(Option.IGNORING_EXTRA_ARRAY_ITEMS) - .isObject() - .isEqualTo( - json(""" - {"jsonrpc":"2.0","method":"method_name","id":1,"params":{"key":"value"},"_meta":{"progressToken":"abc123"}}""")); - } - @Test void testJSONRPCNotification() throws Exception { Map params = new HashMap<>(); From a1266cd245da1960210d76224998b8119b99708f Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:17:20 +0900 Subject: [PATCH 14/15] feat: Client implementation --- .../client/McpAsyncClient.java | 26 ++++++++++++ .../client/McpClient.java | 41 +++++++++++++++++-- .../client/McpClientFeatures.java | 18 +++++++- 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index cf8142c6..ed98962f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -100,6 +100,9 @@ public class McpAsyncClient { public static final TypeReference LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeReference<>() { }; + public static final TypeReference PROGRESS_NOTIFICATION_TYPE_REF = new TypeReference<>() { + }; + /** * Client capabilities. */ @@ -253,6 +256,16 @@ public class McpAsyncClient { notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, asyncLoggingNotificationHandler(loggingConsumersFinal)); + // Utility Progress Notification + List>> progressConsumersFinal = new ArrayList<>(); + progressConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); + if (!Utils.isEmpty(features.progressConsumers())) { + progressConsumersFinal.addAll(features.progressConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, + asyncProgressNotificationHandler(progressConsumersFinal)); + this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, List.of(McpSchema.LATEST_PROTOCOL_VERSION), initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers, @@ -828,6 +841,19 @@ private NotificationHandler asyncLoggingNotificationHandler( }; } + private NotificationHandler asyncProgressNotificationHandler( + List>> progressConsumers) { + + return params -> { + McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, + PROGRESS_NOTIFICATION_TYPE_REF); + + return Flux.fromIterable(progressConsumers) + .flatMap(consumer -> consumer.apply(progressNotification)) + .then(); + }; + } + /** * Sets the minimum logging level for messages received from the server. The client * will only receive log messages at or above the specified severity level. diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java index d8925b00..ce603a0f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -177,6 +177,8 @@ class SyncSpec { private final List> loggingConsumers = new ArrayList<>(); + private final List> progressConsumers = new ArrayList<>(); + private Function samplingHandler; private Function elicitationHandler; @@ -377,6 +379,36 @@ public SyncSpec loggingConsumers(List progressConsumer) { + Assert.notNull(progressConsumer, "Progress consumer must not be null"); + this.progressConsumers.add(progressConsumer); + return this; + } + + /** + * Adds a multiple consumers to be notified of progress notifications from the + * server. This allows the client to track long-running operations and provide + * feedback to users. + * @param progressConsumers A list of consumers that receives progress + * notifications. Must not be null. + * @return This builder instance for method chaining + * @throws IllegalArgumentException if progressConsumer is null + */ + public SyncSpec progressConsumers(List> progressConsumers) { + Assert.notNull(progressConsumers, "Progress consumers must not be null"); + this.progressConsumers.addAll(progressConsumers); + return this; + } + /** * Create an instance of {@link McpSyncClient} with the provided configurations or * sensible defaults. @@ -385,7 +417,8 @@ public SyncSpec loggingConsumers(List>> loggingConsumers = new ArrayList<>(); + private final List>> progressConsumers = new ArrayList<>(); + private Function> samplingHandler; private Function> elicitationHandler; @@ -663,8 +698,8 @@ public McpAsyncClient build() { return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, - this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler, - this.elicitationHandler)); + this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, + this.samplingHandler, this.elicitationHandler)); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java index bd1a0985..ed3f48b9 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java @@ -59,6 +59,7 @@ class McpClientFeatures { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -68,6 +69,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -79,6 +81,7 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progressconsumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -89,6 +92,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c List, Mono>> resourcesUpdateConsumers, List, Mono>> promptsChangeConsumers, List>> loggingConsumers, + List>> progressConsumers, Function> samplingHandler, Function> elicitationHandler) { @@ -106,6 +110,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } @@ -149,6 +154,12 @@ public static Async fromSync(Sync syncSpec) { .subscribeOn(Schedulers.boundedElastic())); } + List>> progressConsumers = new ArrayList<>(); + for (Consumer consumer : syncSpec.progressConsumers()) { + progressConsumers.add(p -> Mono.fromRunnable(() -> consumer.accept(p)) + .subscribeOn(Schedulers.boundedElastic())); + } + Function> samplingHandler = r -> Mono .fromCallable(() -> syncSpec.samplingHandler().apply(r)) .subscribeOn(Schedulers.boundedElastic()); @@ -159,7 +170,7 @@ public static Async fromSync(Sync syncSpec) { return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(), toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers, - loggingConsumers, samplingHandler, elicitationHandler); + loggingConsumers, progressConsumers, samplingHandler, elicitationHandler); } } @@ -174,6 +185,7 @@ public static Async fromSync(Sync syncSpec) { * @param resourcesChangeConsumers the resources change consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -183,6 +195,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -196,6 +209,7 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili * @param resourcesUpdateConsumers the resource update consumers. * @param promptsChangeConsumers the prompts change consumers. * @param loggingConsumers the logging consumers. + * @param progressConsumers the progress consumers. * @param samplingHandler the sampling handler. * @param elicitationHandler the elicitation handler. */ @@ -205,6 +219,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl List>> resourcesUpdateConsumers, List>> promptsChangeConsumers, List> loggingConsumers, + List> progressConsumers, Function samplingHandler, Function elicitationHandler) { @@ -222,6 +237,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of(); this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of(); this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of(); + this.progressConsumers = progressConsumers != null ? progressConsumers : List.of(); this.samplingHandler = samplingHandler; this.elicitationHandler = elicitationHandler; } From 446c6337275f0e11ff563e5cbb4b640691f70caa Mon Sep 17 00:00:00 2001 From: JiHwan Oh Date: Sat, 12 Jul 2025 10:17:48 +0900 Subject: [PATCH 15/15] feat: Add tests --- .../server/McpServerFeatures.java | 2 +- ...rverTransportProviderIntegrationTests.java | 71 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java index 3ce599c8..12edfb34 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java @@ -209,7 +209,7 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se * represents a specific capability. * * @param tool The tool definition including name, description, and parameter schema - * @param call Deprecated. Uset he {@link AsyncToolSpecification#callHandler} instead. + * @param call Deprecated. Use the {@link AsyncToolSpecification#callHandler} instead. * @param callHandler The function that implements the tool's logic, receiving a * {@link McpAsyncServerExchange} and a * {@link io.modelcontextprotocol.spec.McpSchema.CallToolRequest} and returning diff --git a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java index ac10df4f..514903e3 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java @@ -1070,4 +1070,75 @@ void testPingSuccess() { mcpServer.close(); } + // --------------------------------------- + // Progress Tests + // --------------------------------------- + @Test + void testProgressNotification() { + // Create a list to store received logging notifications + List receivedNotifications = new ArrayList<>(); + + // Create server with a tool that sends logging notifications + + McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification( + new McpSchema.Tool("progress-test", "Test progress notifications", emptyJsonSchema), null, + (exchange, request) -> { + var progressToken = (String) request.meta().get("progressToken"); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.1, 1.0, "Test progress 1/10")) + .block(); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 0.5, 1.0, "Test progress 5/10")) + .block(); + + exchange + .progressNotification( + new McpSchema.ProgressNotification(progressToken, 1.0, 1.0, "Test progress 10/10")) + .block(); + + return Mono.just(new CallToolResult("Progress test completed", false)); + }); + + var mcpServer = McpServer.async(mcpServerTransportProvider) + .serverInfo("test-server", "1.0.0") + .capabilities(ServerCapabilities.builder().logging().tools(true).build()) + .tools(tool) + .build(); + try ( + // Create client with progress notification handler + var mcpClient = clientBuilder.progressConsumer(receivedNotifications::add).build()) { + + // Initialize client + InitializeResult initResult = mcpClient.initialize(); + assertThat(initResult).isNotNull(); + + // Call the tool that sends progress notifications + CallToolResult result = mcpClient.callTool( + new McpSchema.CallToolRequest("progress-test", Map.of(), Map.of("progressToken", "test-token"))); + assertThat(result).isNotNull(); + assertThat(result.content().get(0)).isInstanceOf(McpSchema.TextContent.class); + assertThat(((McpSchema.TextContent) result.content().get(0)).text()).isEqualTo("Progress test completed"); + + // Wait for notifications to be processed + await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { + + System.out.println("Received notifications: " + receivedNotifications); + + // Should have received 3 notifications + assertThat(receivedNotifications).hasSize(3); + + // Check the progress notifications + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progressToken)) + .containsExactlyInAnyOrder("test-token", "test-token", "test-token"); + assertThat(receivedNotifications.stream().map(McpSchema.ProgressNotification::progress)) + .containsExactlyInAnyOrder(0.1, 0.5, 1.0); + }); + } + mcpServer.close(); + } + }