From 3b30f5072bb877612efe484f1f5f195688481392 Mon Sep 17 00:00:00 2001 From: goecho Date: Wed, 28 May 2025 14:12:30 +0800 Subject: [PATCH 1/2] Fix: Handle SSE ping messages in WebFluxSseClientTransport - Add check for SSE ping messages from fastMCP server - Log debug message when ping received instead of throwing exception - Complete the stream on ping events - Matches behavior of Python SDK which treats pings as keep-alive --- .../transport/WebFluxSseClientTransport.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 37abe295b..5b4306751 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -3,10 +3,6 @@ */ package io.modelcontextprotocol.client.transport; -import java.io.IOException; -import java.util.function.BiConsumer; -import java.util.function.Function; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.spec.McpClientTransport; @@ -16,6 +12,10 @@ import io.modelcontextprotocol.util.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,10 +25,9 @@ import reactor.util.retry.Retry; import reactor.util.retry.Retry.RetrySignal; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.MediaType; -import org.springframework.http.codec.ServerSentEvent; -import org.springframework.web.reactive.function.client.WebClient; +import java.io.IOException; +import java.util.function.BiConsumer; +import java.util.function.Function; /** * Server-Sent Events (SSE) implementation of the @@ -166,6 +165,10 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe this.sseEndpoint = sseEndpoint; } + private boolean isPingEvent(ServerSentEvent event) { + return event.comment() != null && event.comment().startsWith("ping"); + } + /** * Establishes a connection to the MCP server using Server-Sent Events (SSE). This * method initiates the SSE connection and sets up the message processing pipeline. @@ -212,6 +215,10 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) { s.error(ioException); } } + else if (isPingEvent(event)) { + logger.debug("Received SSE ping message"); + s.complete(); + } else { s.error(new McpError("Received unrecognized SSE event type: " + event.event())); } From c768c74c517433e4d6ec3530f0b94a1a851d2a38 Mon Sep 17 00:00:00 2001 From: goecho Date: Fri, 25 Jul 2025 11:25:38 +0800 Subject: [PATCH 2/2] Refactor: Improve SSE ping comment handling and generalize to isCommentLine() - Strengthen the logic for detecting SSE ping comments by checking that id, event, and data fields are empty. - Generalize comment detection into isCommentLine(ServerSentEvent) for clarity and reusability. - Improves consistency with the SSE specification and matches behavior of the Python SDK. - Replaces previous hardcoded check with a more robust and maintainable solution. --- .../transport/WebFluxSseClientTransport.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 5b4306751..26fbfdf9b 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.common.util.StringUtils; import io.modelcontextprotocol.spec.McpClientTransport; import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; @@ -165,8 +166,16 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe this.sseEndpoint = sseEndpoint; } - private boolean isPingEvent(ServerSentEvent event) { - return event.comment() != null && event.comment().startsWith("ping"); + private boolean isCommentLine(ServerSentEvent event) { + String id = event.id(); + String type = event.event(); + String data = event.data(); + String comment = event.comment(); + + return StringUtils.isEmpty(id) && + StringUtils.isEmpty(type) && + StringUtils.isEmpty(data) && + comment != null; } /** @@ -215,8 +224,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) { s.error(ioException); } } - else if (isPingEvent(event)) { - logger.debug("Received SSE ping message"); + else if (isCommentLine(event)) { + logger.debug("Received SSE comment: {}", event.comment()); s.complete(); } else {