Skip to content

Commit 998bf8c

Browse files
committed
Introduce stateless mode to allow for HA
- Create failing tests for `stateless` mode - Implement `stateless` mode by turning of SSE in Streamable HTTP, making the interaction a standard HTTP Req/Resp
1 parent ea21604 commit 998bf8c

File tree

2 files changed

+169
-13
lines changed

2 files changed

+169
-13
lines changed

lib/mcp/server/transports/streamable_http_transport.rb

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ module MCP
88
class Server
99
module Transports
1010
class StreamableHTTPTransport < Transport
11-
def initialize(server)
12-
super
11+
def initialize(server, stateless: false)
12+
super(server)
1313
# { session_id => { stream: stream_object }
1414
@sessions = {}
1515
@mutex = Mutex.new
16+
17+
@stateless = stateless
1618
end
1719

1820
def handle_request(request)
@@ -24,7 +26,7 @@ def handle_request(request)
2426
when "DELETE"
2527
handle_delete(request)
2628
else
27-
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
29+
method_not_allowed_response
2830
end
2931
end
3032

@@ -35,6 +37,9 @@ def close
3537
end
3638

3739
def send_notification(method, params = nil, session_id: nil)
40+
# Stateless mode doesn't support notifications
41+
raise "Stateless mode does not support notifications" if @stateless
42+
3843
notification = {
3944
jsonrpc: "2.0",
4045
method:,
@@ -119,6 +124,10 @@ def handle_post(request)
119124
end
120125

121126
def handle_get(request)
127+
if @stateless
128+
return method_not_allowed_response
129+
end
130+
122131
session_id = extract_session_id(request)
123132

124133
return missing_session_id_response unless session_id
@@ -128,6 +137,13 @@ def handle_get(request)
128137
end
129138

130139
def handle_delete(request)
140+
success_response = [200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]
141+
142+
if @stateless
143+
# Stateless mode doesn't support sessions, so we can just return a success response
144+
return success_response
145+
end
146+
131147
session_id = request.env["HTTP_MCP_SESSION_ID"]
132148

133149
return [
@@ -137,7 +153,7 @@ def handle_delete(request)
137153
] unless session_id
138154

139155
cleanup_session(session_id)
140-
[200, { "Content-Type" => "application/json" }, [{ success: true }.to_json]]
156+
success_response
141157
end
142158

143159
def cleanup_session(session_id)
@@ -177,21 +193,26 @@ def response?(body)
177193
end
178194

179195
def handle_initialization(body_string, body)
180-
session_id = SecureRandom.uuid
196+
session_id = nil
181197

182-
@mutex.synchronize do
183-
@sessions[session_id] = {
184-
stream: nil,
185-
}
198+
unless @stateless
199+
session_id = SecureRandom.uuid
200+
201+
@mutex.synchronize do
202+
@sessions[session_id] = {
203+
stream: nil,
204+
}
205+
end
186206
end
187207

188208
response = @server.handle_json(body_string)
189209

190210
headers = {
191211
"Content-Type" => "application/json",
192-
"Mcp-Session-Id" => session_id,
193212
}
194213

214+
headers["Mcp-Session-Id"] = session_id if session_id
215+
195216
[200, headers, [response]]
196217
end
197218

@@ -200,21 +221,32 @@ def handle_accepted
200221
end
201222

202223
def handle_regular_request(body_string, session_id)
203-
# If session ID is provided, but not in the sessions hash, return an error
204-
if session_id && !@sessions.key?(session_id)
205-
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
224+
unless @stateless
225+
# If session ID is provided, but not in the sessions hash, return an error
226+
if session_id && !@sessions.key?(session_id)
227+
return [400, { "Content-Type" => "application/json" }, [{ error: "Invalid session ID" }.to_json]]
228+
end
206229
end
207230

208231
response = @server.handle_json(body_string)
232+
233+
# Stream can be nil since stateless mode doesn't retain streams
209234
stream = get_session_stream(session_id) if session_id
210235

211236
if stream
212237
send_response_to_stream(stream, response, session_id)
238+
elsif response.nil? && notification_request?(body_string)
239+
[202, { "Content-Type" => "application/json" }, [response]]
213240
else
214241
[200, { "Content-Type" => "application/json" }, [response]]
215242
end
216243
end
217244

245+
def notification_request?(body_string)
246+
body = parse_request_body(body_string)
247+
body.is_a?(Hash) && body["method"].start_with?("notifications/")
248+
end
249+
218250
def get_session_stream(session_id)
219251
@mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) }
220252
end
@@ -236,6 +268,10 @@ def session_exists?(session_id)
236268
@mutex.synchronize { @sessions.key?(session_id) }
237269
end
238270

271+
def method_not_allowed_response
272+
[405, { "Content-Type" => "application/json" }, [{ error: "Method not allowed" }.to_json]]
273+
end
274+
239275
def missing_session_id_response
240276
[400, { "Content-Type" => "application/json" }, [{ error: "Missing session ID" }.to_json]]
241277
end

test/mcp/server/transports/streamable_http_transport_test.rb

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,126 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
573573
assert_equal "Method not allowed", body["error"]
574574
end
575575

576+
test "stateless mode allows requests without session IDs, responding with no session ID" do
577+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
578+
579+
init_request = create_rack_request(
580+
"POST",
581+
"/",
582+
{ "CONTENT_TYPE" => "application/json" },
583+
{ jsonrpc: "2.0", method: "initialize", id: "init" }.to_json,
584+
)
585+
init_response = stateless_transport.handle_request(init_request)
586+
assert_nil init_response[1]["Mcp-Session-Id"]
587+
end
588+
589+
test "stateless mode responds without any session ID when session ID is present" do
590+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
591+
592+
request = create_rack_request(
593+
"POST",
594+
"/",
595+
{
596+
"CONTENT_TYPE" => "application/json",
597+
"HTTP_MCP_SESSION_ID" => "unseen_session_id",
598+
},
599+
{ jsonrpc: "2.0", method: "ping", id: "123" }.to_json,
600+
)
601+
602+
response = stateless_transport.handle_request(request)
603+
assert_equal 200, response[0]
604+
assert_equal(
605+
{
606+
"Content-Type" => "application/json",
607+
},
608+
response[1],
609+
)
610+
611+
body = JSON.parse(response[2][0])
612+
assert_equal "2.0", body["jsonrpc"]
613+
assert_equal "123", body["id"]
614+
end
615+
616+
test "stateless mode responds with 405 when SSE is requested" do
617+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
618+
619+
get_request = create_rack_request(
620+
"GET",
621+
"/",
622+
{
623+
"CONTENT_TYPE" => "application/json,text/event-stream",
624+
},
625+
)
626+
response = stateless_transport.handle_request(get_request)
627+
assert_equal 405, response[0]
628+
assert_equal({ "Content-Type" => "application/json" }, response[1])
629+
630+
body = JSON.parse(response[2][0])
631+
assert_equal "Method not allowed", body["error"]
632+
end
633+
634+
test "stateless mode silently responds with success to session DELETE when session ID is not present" do
635+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
636+
637+
delete_request = create_rack_request(
638+
"DELETE",
639+
"/",
640+
{},
641+
)
642+
response = stateless_transport.handle_request(delete_request)
643+
assert_equal 200, response[0]
644+
assert_equal({ "Content-Type" => "application/json" }, response[1])
645+
646+
body = JSON.parse(response[2][0])
647+
assert body["success"]
648+
end
649+
650+
test "stateless mode silently responds with success to session DELETE when session ID is provided" do
651+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
652+
653+
delete_request = create_rack_request(
654+
"DELETE",
655+
"/",
656+
{ "HTTP_MCP_SESSION_ID" => "session_id" },
657+
)
658+
response = stateless_transport.handle_request(delete_request)
659+
assert_equal 200, response[0]
660+
assert_equal({ "Content-Type" => "application/json" }, response[1])
661+
662+
body = JSON.parse(response[2][0])
663+
assert body["success"]
664+
end
665+
666+
test "stateless mode does not support server-sent events" do
667+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
668+
669+
assert_raises(RuntimeError, "Stateless mode does not support notifications") do
670+
stateless_transport.send_notification(
671+
"test_notification",
672+
{ message: "Hello" },
673+
session_id: "some_session_id",
674+
)
675+
end
676+
end
677+
678+
test "stateless mode responds with 202 when client sends a notification/initialized request" do
679+
stateless_transport = StreamableHTTPTransport.new(@server, stateless: true)
680+
681+
request = create_rack_request(
682+
"POST",
683+
"/",
684+
{ "CONTENT_TYPE" => "application/json" },
685+
{ jsonrpc: "2.0", method: "notifications/initialized" }.to_json,
686+
)
687+
688+
response = stateless_transport.handle_request(request)
689+
assert_equal 202, response[0]
690+
assert_equal({ "Content-Type" => "application/json" }, response[1])
691+
692+
body = response[2][0]
693+
assert body.blank?
694+
end
695+
576696
test "handle post request with a standard error" do
577697
request = create_rack_request(
578698
"POST",

0 commit comments

Comments
 (0)