Skip to content

Commit 13e43e3

Browse files
Implement retry interval support for SSE polling (SEP-1699)
Server now sends the retry field in SSE priming events when retry_interval is configured. Client respects this field and waits the specified interval before reconnecting. Changes: - Server: Add retry field to priming event when retry_interval is set - Server: Extract _send_priming_event() helper method - Client: Track retry interval from SSE events - Client: Wait for retry interval before reconnecting
1 parent 21db2a6 commit 13e43e3

File tree

2 files changed

+37
-14
lines changed

2 files changed

+37
-14
lines changed

src/mcp/client/streamable_http.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ async def _handle_sse_response(
330330
) -> None:
331331
"""Handle SSE response from the server."""
332332
last_event_id: str | None = None
333+
retry_interval_ms: int | None = None
333334

334335
try:
335336
event_source = EventSource(response)
@@ -338,6 +339,10 @@ async def _handle_sse_response(
338339
if sse.id:
339340
last_event_id = sse.id
340341

342+
# Track retry interval from server
343+
if sse.retry is not None:
344+
retry_interval_ms = sse.retry
345+
341346
is_complete = await self._handle_sse_event(
342347
sse,
343348
ctx.read_stream_writer,
@@ -352,16 +357,21 @@ async def _handle_sse_response(
352357
except Exception as e:
353358
logger.debug(f"SSE stream ended: {e}")
354359

355-
# Stream ended without response - reconnect if we have priming event
360+
# Stream ended without response - reconnect if we received an event with ID
356361
if last_event_id is not None:
357-
await self._handle_reconnection(ctx, last_event_id)
362+
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
358363

359364
async def _handle_reconnection(
360365
self,
361366
ctx: RequestContext,
362367
last_event_id: str,
368+
retry_interval_ms: int | None = None,
363369
) -> None:
364370
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
371+
# Wait for retry interval if specified by server
372+
if retry_interval_ms is not None:
373+
await anyio.sleep(retry_interval_ms / 1000.0)
374+
365375
headers = self._prepare_request_headers(ctx.headers)
366376
headers[LAST_EVENT_ID] = last_event_id
367377

@@ -383,10 +393,13 @@ async def _handle_reconnection(
383393

384394
# Track for potential further reconnection
385395
reconnect_last_event_id: str | None = last_event_id
396+
reconnect_retry_ms = retry_interval_ms
386397

387398
async for sse in event_source.aiter_sse():
388399
if sse.id:
389400
reconnect_last_event_id = sse.id
401+
if sse.retry is not None:
402+
reconnect_retry_ms = sse.retry
390403

391404
is_complete = await self._handle_sse_event(
392405
sse,
@@ -400,11 +413,11 @@ async def _handle_reconnection(
400413

401414
# Stream ended again without response - reconnect again
402415
if reconnect_last_event_id is not None:
403-
await self._handle_reconnection(ctx, reconnect_last_event_id)
416+
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms)
404417
except Exception as e:
405418
logger.debug(f"Reconnection failed: {e}")
406419
# Try to reconnect again if we still have an event ID
407-
await self._handle_reconnection(ctx, last_event_id)
420+
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)
408421

409422
async def _handle_unexpected_content_type(
410423
self,

src/mcp/server/streamable_http.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from contextlib import asynccontextmanager
1616
from dataclasses import dataclass
1717
from http import HTTPStatus
18+
from typing import Any
1819

1920
import anyio
2021
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -224,6 +225,23 @@ async def close_stream_callback() -> None:
224225
)
225226
return SessionMessage(message, metadata=metadata)
226227

228+
async def _send_priming_event(
229+
self,
230+
request_id: RequestId,
231+
sse_stream_writer: MemoryObjectSendStream[dict[str, Any]],
232+
) -> None:
233+
"""Send priming event for SSE resumability if event_store is configured."""
234+
if not self._event_store:
235+
return
236+
priming_event_id = await self._event_store.store_event(
237+
str(request_id), # Convert RequestId to StreamId (str)
238+
None, # Priming event has no payload
239+
)
240+
priming_event: dict[str, str | int] = {"id": priming_event_id, "data": ""}
241+
if self._retry_interval is not None:
242+
priming_event["retry"] = self._retry_interval
243+
await sse_stream_writer.send(priming_event)
244+
227245
def _create_error_response(
228246
self,
229247
error_message: str,
@@ -512,16 +530,8 @@ async def sse_writer():
512530
# Get the request ID from the incoming request message
513531
try:
514532
async with sse_stream_writer, request_stream_reader:
515-
# Send priming event if event_store is configured
516-
# This sends an event with ID but empty data, enabling
517-
# the client to reconnect with Last-Event-ID if needed
518-
if self._event_store:
519-
priming_event_id = await self._event_store.store_event(
520-
request_id,
521-
None, # Priming event has no payload
522-
)
523-
priming_event = {"id": priming_event_id, "data": ""}
524-
await sse_stream_writer.send(priming_event)
533+
# Send priming event for SSE resumability
534+
await self._send_priming_event(request_id, sse_stream_writer)
525535

526536
# Process messages from the request-specific stream
527537
async for event_message in request_stream_reader:

0 commit comments

Comments
 (0)