Skip to content

Commit b5a1e3b

Browse files
Kehrlanntzolov
authored andcommitted
Add option for immediate execution in McpSyncServer
- The McpSyncServer wraps an async server. By default, reactive operations are scheduled on a bounded-elastic scheduler, to offload blocking work and prevent accidental blocking of non-blocking operations. - With the default behavior, there will be thead ops, even in a blocking context, which means thread-locals from the request thread will be lost. This is inconenvient for frameworks that store state in thread-locals. - This commit adds the ability to avoid offloading, when the user is sure they are executing code in a blocking environment. Work happens in the calling thread, and thread-locals are available throughout the execution.
1 parent c711f83 commit b5a1e3b

File tree

6 files changed

+172
-34
lines changed

6 files changed

+172
-34
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.server;
@@ -695,6 +695,8 @@ class SyncSpecification {
695695

696696
private Duration requestTimeout = Duration.ofSeconds(10); // Default timeout
697697

698+
private boolean immediateExecution = false;
699+
698700
private SyncSpecification(McpServerTransportProvider transportProvider) {
699701
Assert.notNull(transportProvider, "Transport provider must not be null");
700702
this.transportProvider = transportProvider;
@@ -1116,6 +1118,22 @@ public SyncSpecification objectMapper(ObjectMapper objectMapper) {
11161118
return this;
11171119
}
11181120

1121+
/**
1122+
* Enable on "immediate execution" of the operations on the underlying
1123+
* {@link McpAsyncServer}. Defaults to false, which does blocking code offloading
1124+
* to prevent accidental blocking of the non-blocking transport.
1125+
* <p>
1126+
* Do NOT set to true if the underlying transport is a non-blocking
1127+
* implementation.
1128+
* @param immediateExecution When true, do not offload work asynchronously.
1129+
* @return This builder instance for method chaining.
1130+
*
1131+
*/
1132+
public SyncSpecification immediateExecution(boolean immediateExecution) {
1133+
this.immediateExecution = immediateExecution;
1134+
return this;
1135+
}
1136+
11191137
/**
11201138
* Builds a synchronous MCP server that provides blocking operations.
11211139
* @return A new instance of {@link McpSyncServer} configured with this builder's
@@ -1125,12 +1143,13 @@ public McpSyncServer build() {
11251143
McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
11261144
this.tools, this.resources, this.resourceTemplates, this.prompts, this.completions,
11271145
this.rootsChangeHandlers, this.instructions);
1128-
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
1146+
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures,
1147+
this.immediateExecution);
11291148
var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
11301149
var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures, this.requestTimeout,
11311150
this.uriTemplateManagerFactory);
11321151

1133-
return new McpSyncServer(asyncServer);
1152+
return new McpSyncServer(asyncServer, this.immediateExecution);
11341153
}
11351154

11361155
}

mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024-2024 the original author or authors.
2+
* Copyright 2024-2025 the original author or authors.
33
*/
44

55
package io.modelcontextprotocol.server;
@@ -95,28 +95,30 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
9595
* blocking code offloading to prevent accidental blocking of the non-blocking
9696
* transport.
9797
* @param syncSpec a potentially blocking, synchronous specification.
98+
* @param immediateExecution when true, do not offload. Do NOT set to true when
99+
* using a non-blocking transport.
98100
* @return a specification which is protected from blocking calls specified by the
99101
* user.
100102
*/
101-
static Async fromSync(Sync syncSpec) {
103+
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
102104
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
103105
for (var tool : syncSpec.tools()) {
104-
tools.add(AsyncToolSpecification.fromSync(tool));
106+
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
105107
}
106108

107109
Map<String, AsyncResourceSpecification> resources = new HashMap<>();
108110
syncSpec.resources().forEach((key, resource) -> {
109-
resources.put(key, AsyncResourceSpecification.fromSync(resource));
111+
resources.put(key, AsyncResourceSpecification.fromSync(resource, immediateExecution));
110112
});
111113

112114
Map<String, AsyncPromptSpecification> prompts = new HashMap<>();
113115
syncSpec.prompts().forEach((key, prompt) -> {
114-
prompts.put(key, AsyncPromptSpecification.fromSync(prompt));
116+
prompts.put(key, AsyncPromptSpecification.fromSync(prompt, immediateExecution));
115117
});
116118

117119
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new HashMap<>();
118120
syncSpec.completions().forEach((key, completion) -> {
119-
completions.put(key, AsyncCompletionSpecification.fromSync(completion));
121+
completions.put(key, AsyncCompletionSpecification.fromSync(completion, immediateExecution));
120122
});
121123

122124
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootChangeConsumers = new ArrayList<>();
@@ -239,15 +241,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
239241
public record AsyncToolSpecification(McpSchema.Tool tool,
240242
BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<McpSchema.CallToolResult>> call) {
241243

242-
static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
244+
static AsyncToolSpecification fromSync(SyncToolSpecification tool, boolean immediate) {
243245
// FIXME: This is temporary, proper validation should be implemented
244246
if (tool == null) {
245247
return null;
246248
}
247-
return new AsyncToolSpecification(tool.tool(),
248-
(exchange, map) -> Mono
249-
.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map))
250-
.subscribeOn(Schedulers.boundedElastic()));
249+
return new AsyncToolSpecification(tool.tool(), (exchange, map) -> {
250+
var toolResult = Mono.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map));
251+
return immediate ? toolResult : toolResult.subscribeOn(Schedulers.boundedElastic());
252+
});
251253
}
252254
}
253255

@@ -281,15 +283,16 @@ static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
281283
public record AsyncResourceSpecification(McpSchema.Resource resource,
282284
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {
283285

284-
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
286+
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
285287
// FIXME: This is temporary, proper validation should be implemented
286288
if (resource == null) {
287289
return null;
288290
}
289-
return new AsyncResourceSpecification(resource.resource(),
290-
(exchange, req) -> Mono
291-
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req))
292-
.subscribeOn(Schedulers.boundedElastic()));
291+
return new AsyncResourceSpecification(resource.resource(), (exchange, req) -> {
292+
var resourceResult = Mono
293+
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req));
294+
return immediateExecution ? resourceResult : resourceResult.subscribeOn(Schedulers.boundedElastic());
295+
});
293296
}
294297
}
295298

@@ -327,15 +330,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
327330
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
328331
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {
329332

330-
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt) {
333+
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
331334
// FIXME: This is temporary, proper validation should be implemented
332335
if (prompt == null) {
333336
return null;
334337
}
335-
return new AsyncPromptSpecification(prompt.prompt(),
336-
(exchange, req) -> Mono
337-
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req))
338-
.subscribeOn(Schedulers.boundedElastic()));
338+
return new AsyncPromptSpecification(prompt.prompt(), (exchange, req) -> {
339+
var promptResult = Mono
340+
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req));
341+
return immediateExecution ? promptResult : promptResult.subscribeOn(Schedulers.boundedElastic());
342+
});
339343
}
340344
}
341345

@@ -366,14 +370,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
366370
* @return an asynchronous wrapper of the provided sync specification, or
367371
* {@code null} if input is null
368372
*/
369-
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion) {
373+
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
374+
boolean immediateExecution) {
370375
if (completion == null) {
371376
return null;
372377
}
373-
return new AsyncCompletionSpecification(completion.referenceKey(),
374-
(exchange, request) -> Mono.fromCallable(
375-
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request))
376-
.subscribeOn(Schedulers.boundedElastic()));
378+
return new AsyncCompletionSpecification(completion.referenceKey(), (exchange, request) -> {
379+
var completionResult = Mono.fromCallable(
380+
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request));
381+
return immediateExecution ? completionResult
382+
: completionResult.subscribeOn(Schedulers.boundedElastic());
383+
});
377384
}
378385
}
379386

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,37 @@ public class McpSyncServer {
5454
*/
5555
private final McpAsyncServer asyncServer;
5656

57+
private final boolean immediateExecution;
58+
5759
/**
5860
* Creates a new synchronous server that wraps the provided async server.
5961
* @param asyncServer The async server to wrap
6062
*/
6163
public McpSyncServer(McpAsyncServer asyncServer) {
64+
this(asyncServer, false);
65+
}
66+
67+
/**
68+
* Creates a new synchronous server that wraps the provided async server.
69+
* @param asyncServer The async server to wrap
70+
* @param immediateExecution Tools, prompts, and resources handlers execute work
71+
* without blocking code offloading. Do NOT set to true if the {@code asyncServer}'s
72+
* transport is non-blocking.
73+
*/
74+
public McpSyncServer(McpAsyncServer asyncServer, boolean immediateExecution) {
6275
Assert.notNull(asyncServer, "Async server must not be null");
6376
this.asyncServer = asyncServer;
77+
this.immediateExecution = immediateExecution;
6478
}
6579

6680
/**
6781
* Add a new tool handler.
6882
* @param toolHandler The tool handler to add
6983
*/
7084
public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {
71-
this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block();
85+
this.asyncServer
86+
.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler, this.immediateExecution))
87+
.block();
7288
}
7389

7490
/**
@@ -84,7 +100,10 @@ public void removeTool(String toolName) {
84100
* @param resourceHandler The resource handler to add
85101
*/
86102
public void addResource(McpServerFeatures.SyncResourceSpecification resourceHandler) {
87-
this.asyncServer.addResource(McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler)).block();
103+
this.asyncServer
104+
.addResource(
105+
McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler, this.immediateExecution))
106+
.block();
88107
}
89108

90109
/**
@@ -100,7 +119,10 @@ public void removeResource(String resourceUri) {
100119
* @param promptSpecification The prompt specification to add
101120
*/
102121
public void addPrompt(McpServerFeatures.SyncPromptSpecification promptSpecification) {
103-
this.asyncServer.addPrompt(McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification)).block();
122+
this.asyncServer
123+
.addPrompt(
124+
McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification, this.immediateExecution))
125+
.block();
104126
}
105127

106128
/**

mcp/src/test/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProviderIntegrationTests.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
2+
* Copyright 2024 - 2025 the original author or authors.
33
*/
44
package io.modelcontextprotocol.server.transport;
55

@@ -37,7 +37,6 @@
3737
import org.apache.catalina.startup.Tomcat;
3838
import org.junit.jupiter.api.AfterEach;
3939
import org.junit.jupiter.api.BeforeEach;
40-
import org.junit.jupiter.api.Disabled;
4140
import org.junit.jupiter.api.Test;
4241
import reactor.core.publisher.Mono;
4342
import reactor.test.StepVerifier;
@@ -46,6 +45,7 @@
4645

4746
import static org.assertj.core.api.Assertions.assertThat;
4847
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
48+
import static org.assertj.core.api.InstanceOfAssertFactories.type;
4949
import static org.awaitility.Awaitility.await;
5050
import static org.mockito.Mockito.mock;
5151

@@ -728,6 +728,9 @@ void testToolCallSuccess() {
728728
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
729729
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
730730
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
731+
assertThat(McpTestServletFilter.getThreadLocalValue())
732+
.as("blocking code exectuion should be offloaded")
733+
.isNull();
731734
// perform a blocking call to a remote service
732735
String response = RestClient.create()
733736
.get()
@@ -758,6 +761,37 @@ void testToolCallSuccess() {
758761
mcpServer.close();
759762
}
760763

764+
@Test
765+
void testToolCallImmediateExecution() {
766+
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
767+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
768+
var threadLocalValue = McpTestServletFilter.getThreadLocalValue();
769+
return CallToolResult.builder()
770+
.addTextContent(threadLocalValue != null ? threadLocalValue : "<unset>")
771+
.build();
772+
});
773+
774+
var mcpServer = McpServer.sync(mcpServerTransportProvider)
775+
.capabilities(ServerCapabilities.builder().tools(true).build())
776+
.tools(tool1)
777+
.immediateExecution(true)
778+
.build();
779+
780+
try (var mcpClient = clientBuilder.build()) {
781+
mcpClient.initialize();
782+
783+
CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
784+
785+
assertThat(response).isNotNull();
786+
assertThat(response.content()).first()
787+
.asInstanceOf(type(McpSchema.TextContent.class))
788+
.extracting(McpSchema.TextContent::text)
789+
.isEqualTo(McpTestServletFilter.THREAD_LOCAL_VALUE);
790+
}
791+
792+
mcpServer.close();
793+
}
794+
761795
@Test
762796
void testToolListChangeHandlingSuccess() {
763797

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2025 - 2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server.transport;
6+
7+
import java.io.IOException;
8+
9+
import jakarta.servlet.Filter;
10+
import jakarta.servlet.FilterChain;
11+
import jakarta.servlet.ServletException;
12+
import jakarta.servlet.ServletRequest;
13+
import jakarta.servlet.ServletResponse;
14+
15+
/**
16+
* Simple {@link Filter} which sets a value in a thread local. Used to verify whether MCP
17+
* executions happen on the thread processing the request or are offloaded.
18+
*
19+
* @author Daniel Garnier-Moiroux
20+
*/
21+
public class McpTestServletFilter implements Filter {
22+
23+
public static final String THREAD_LOCAL_VALUE = McpTestServletFilter.class.getName();
24+
25+
private static final ThreadLocal<String> holder = new ThreadLocal<>();
26+
27+
@Override
28+
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
29+
throws IOException, ServletException {
30+
holder.set(THREAD_LOCAL_VALUE);
31+
try {
32+
filterChain.doFilter(servletRequest, servletResponse);
33+
}
34+
finally {
35+
holder.remove();
36+
}
37+
}
38+
39+
public static String getThreadLocalValue() {
40+
return holder.get();
41+
}
42+
43+
}

mcp/src/test/java/io/modelcontextprotocol/server/transport/TomcatTestUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
import jakarta.servlet.Servlet;
1111
import org.apache.catalina.Context;
1212
import org.apache.catalina.startup.Tomcat;
13+
import org.apache.tomcat.util.descriptor.web.FilterDef;
14+
import org.apache.tomcat.util.descriptor.web.FilterMap;
1315

1416
/**
1517
* @author Christian Tzolov
18+
* @author Daniel Garnier-Moiroux
1619
*/
1720
public class TomcatTestUtil {
1821

@@ -39,6 +42,16 @@ public static Tomcat createTomcatServer(String contextPath, int port, Servlet se
3942
context.addChild(wrapper);
4043
context.addServletMappingDecoded("/*", "mcpServlet");
4144

45+
var filterDef = new FilterDef();
46+
filterDef.setFilterClass(McpTestServletFilter.class.getName());
47+
filterDef.setFilterName(McpTestServletFilter.class.getSimpleName());
48+
context.addFilterDef(filterDef);
49+
50+
var filterMap = new FilterMap();
51+
filterMap.setFilterName(McpTestServletFilter.class.getSimpleName());
52+
filterMap.addURLPattern("/*");
53+
context.addFilterMap(filterMap);
54+
4255
var connector = tomcat.getConnector();
4356
connector.setAsyncTimeout(3000);
4457

0 commit comments

Comments
 (0)