Skip to content

Commit 1dfe97e

Browse files
Implement SSE priming events for resumability (SEP-1699)
Server now sends a priming event (SSE event with ID but empty data) at the start of POST SSE streams when an EventStore is configured. This enables clients to reconnect with Last-Event-ID even if the server closes the connection before sending any actual data. Changes: - EventStore.store_event now accepts JSONRPCMessage | None (None for priming) - Server sends priming event before processing messages in sse_writer - Client calls resumption callback for empty-data events that have an ID
1 parent 3441e82 commit 1dfe97e

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

src/mcp/client/streamable_http.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,11 @@ async def _handle_sse_event(
160160
) -> bool:
161161
"""Handle an SSE event, returning True if the response is complete."""
162162
if sse.event == "message":
163-
# Skip empty data (keep-alive pings)
163+
# Handle priming events (empty data with ID) for resumability
164164
if not sse.data:
165+
# Call resumption callback for priming events that have an ID
166+
if sse.id and resumption_callback:
167+
await resumption_callback(sse.id)
165168
return False
166169
try:
167170
message = JSONRPCMessage.model_validate_json(sse.data)

src/mcp/server/streamable_http.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,13 @@ class EventStore(ABC):
8787
"""
8888

8989
@abstractmethod
90-
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
90+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
9191
"""
9292
Stores an event for later retrieval.
9393
9494
Args:
9595
stream_id: ID of the stream the event belongs to
96-
message: The JSON-RPC message to store
96+
message: The JSON-RPC message to store, or None for priming events
9797
9898
Returns:
9999
The generated event ID for the stored event
@@ -489,6 +489,17 @@ async def sse_writer():
489489
# Get the request ID from the incoming request message
490490
try:
491491
async with sse_stream_writer, request_stream_reader:
492+
# Send priming event if event_store is configured
493+
# This sends an event with ID but empty data, enabling
494+
# the client to reconnect with Last-Event-ID if needed
495+
if self._event_store:
496+
priming_event_id = await self._event_store.store_event(
497+
request_id,
498+
None, # Priming event has no payload
499+
)
500+
priming_event = {"id": priming_event_id, "data": ""}
501+
await sse_stream_writer.send(priming_event)
502+
492503
# Process messages from the request-specific stream
493504
async for event_message in request_stream_reader:
494505
# Build the event data

tests/shared/test_streamable_http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ class SimpleEventStore(EventStore):
7777
"""Simple in-memory event store for testing."""
7878

7979
def __init__(self):
80-
self._events: list[tuple[StreamId, EventId, types.JSONRPCMessage]] = []
80+
self._events: list[tuple[StreamId, EventId, types.JSONRPCMessage | None]] = []
8181
self._event_id_counter = 0
8282

83-
async def store_event(self, stream_id: StreamId, message: types.JSONRPCMessage) -> EventId: # pragma: no cover
83+
async def store_event(self, stream_id: StreamId, message: types.JSONRPCMessage | None) -> EventId: # pragma: no cover
8484
"""Store an event and return its ID."""
8585
self._event_id_counter += 1
8686
event_id = str(self._event_id_counter)

0 commit comments

Comments
 (0)