Skip to content
Merged
80 changes: 80 additions & 0 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,4 +592,84 @@ describe("StreamableHTTPClientTransport", () => {
await expect(transport.send(message)).rejects.toThrow(UnauthorizedError);
expect(mockAuthProvider.redirectToAuthorization.mock.calls).toHaveLength(1);
});


describe('Reconnection Logic', () => {
// Use fake timers to control setTimeout and make the test instant.
beforeEach(() => jest.useFakeTimers());
afterEach(() => jest.useRealTimers());

it('should reconnect on stream failure even without a lastEventId', async () => {
// ARRANGE

// 1. Configure a transport that will retry quickly and at least once.
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
reconnectionOptions: {
initialReconnectionDelay: 10, // Reconnect almost instantly for the test
maxReconnectionDelay: 100,
reconnectionDelayGrowFactor: 1,
maxRetries: 1, // We only need to see one successful retry attempt
}
});

const errorSpy = jest.fn();
transport.onerror = errorSpy;

// 2. Mock the initial GET request. It will connect, but the stream will die immediately.
// This simulates the GCloud proxy killing the connection.
const failingStream = new ReadableStream({
start(controller) {
// Simulate an abrupt network error.
controller.error(new Error("Network connection terminated"));
}
});

const fetchMock = global.fetch as jest.Mock;
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: failingStream,
});

// 3. Mock the SECOND GET request (the reconnection attempt). This one can succeed.
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: new ReadableStream(), // A stable, empty stream
});

// ACT

// 4. Start the transport and initiate the SSE connection.
await transport.start();
// We call the internal method directly to trigger the GET request.
// This is cleaner than sending a full 'initialize' message for this test.
await transport["_startOrAuthSse"]({});

// 5. Advance timers to trigger the setTimeout in _scheduleReconnection.
await jest.advanceTimersByTimeAsync(20); // More than the 10ms delay

// ASSERT

// 6. Verify the initial disconnect error was caught.
expect(errorSpy).toHaveBeenCalledTimes(1);
expect(errorSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: expect.stringContaining('SSE stream disconnected: Error: Network connection terminated'),
})
);

// 7. THIS IS THE KEY ASSERTION: Verify that a second fetch call was made.
// This proves the reconnection logic was triggered.
expect(fetchMock).toHaveBeenCalledTimes(2);

// 8. Verify the second call was a GET request without a last-event-id header.
const secondCall = fetchMock.mock.calls[1];
const secondRequest = secondCall[1];
expect(secondRequest.method).toBe('GET');
expect(secondRequest.headers.has('last-event-id')).toBe(false);
});
});
});
20 changes: 9 additions & 11 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,16 @@ const response = await (this._fetch ?? fetch)(this._url, {
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
if (this._abortController && !this._abortController.signal.aborted) {
// Use the exponential backoff reconnection strategy
if (lastEventId !== undefined) {
try {
this._scheduleReconnection({
resumptionToken: lastEventId,
onresumptiontoken,
replayMessageId
}, 0);
}
catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
try {
this._scheduleReconnection({
resumptionToken: lastEventId,
onresumptiontoken,
replayMessageId
}, 0);
}
catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));

}
}
}
}
Expand Down