From 963094710509965c87022377bc7d8f19e10b8103 Mon Sep 17 00:00:00 2001 From: chernistry Date: Mon, 6 Apr 2026 17:43:00 +0300 Subject: [PATCH 1/2] fix: prevent infinite retry loop in StreamableHTTP client reconnection The `_handle_reconnection` method reset its attempt counter to 0 when a stream ended normally (without exception), causing an infinite retry loop when the server repeatedly dropped connections after partially delivering events. This is because the recursive call on the "stream ended without response" path passed `attempt=0` instead of incrementing. Change the normal-close reconnection path to pass `attempt + 1`, matching the exception path, so total reconnection attempts are tracked across the entire chain regardless of how each individual stream ended. Fixes #2393 Co-Authored-By: Claude Opus 4.6 (1M context) --- src/mcp/client/streamable_http.py | 4 +- tests/shared/test_streamable_http.py | 81 +++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c633..7d4dde1d0 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -421,9 +421,9 @@ async def _handle_reconnection( await event_source.response.aclose() return - # Stream ended again without response - reconnect again (reset attempt counter) + # Stream ended again without response - reconnect again logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) + await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, attempt + 1) except Exception as e: # pragma: no cover logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..7601be62c 100644 --- a/tests/shared/test_streamable_http.py +++ 
b/tests/shared/test_streamable_http.py @@ -14,7 +14,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import Any -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from urllib.parse import urlparse import anyio @@ -29,7 +29,14 @@ from mcp import MCPError, types from mcp.client.session import ClientSession -from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client +from mcp.client.streamable_http import ( + MAX_RECONNECTION_ATTEMPTS, + StreamableHTTPTransport, + streamable_http_client, +) +from mcp.client.streamable_http import ( + RequestContext as ClientRequestContext, +) from mcp.server import Server, ServerRequestContext from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -2318,3 +2325,73 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_handle_reconnection_does_not_retry_infinitely(): + """Reconnection must count TOTAL attempts, not reset on each successful connect. + + Regression test for #2393: when a stream connects successfully but drops + before delivering a response, the attempt counter was reset to 0 on the + recursive call, allowing an infinite retry loop. + + This test simulates a stream that connects, yields one non-completing SSE + event, then ends — repeatedly. With MAX_RECONNECTION_ATTEMPTS the loop + must terminate. 
+ """ + transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") + transport.session_id = "test-session" + + # Track how many times aconnect_sse is called + connect_count = 0 + + @asynccontextmanager + async def fake_aconnect_sse(*args: Any, **kwargs: Any) -> AsyncIterator[Any]: + """Simulate a stream that connects OK, yields one event, then ends.""" + nonlocal connect_count + connect_count += 1 + + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + + # Yield a single non-completing notification SSE event, then end the stream + # (simulating a server that drops the connection after partial delivery) + async def aiter_sse() -> AsyncIterator[ServerSentEvent]: + yield ServerSentEvent( + event="message", + data='{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"tok","progress":1,"total":10}}', + id=f"evt-{connect_count}", + retry=None, + ) + + event_source = MagicMock() + event_source.response = mock_response + event_source.aiter_sse = aiter_sse + yield event_source + + # Build a minimal RequestContext for _handle_reconnection + write_stream, read_stream = create_context_streams[SessionMessage | Exception](32) + + async with write_stream, read_stream: + request_message = JSONRPCRequest(jsonrpc="2.0", id="req-1", method="tools/call", params={}) + session_message = SessionMessage(request_message) + ctx = ClientRequestContext( + client=MagicMock(), + session_id="test-session", + session_message=session_message, + metadata=None, + read_stream_writer=write_stream, + ) + + with patch("mcp.client.streamable_http.aconnect_sse", fake_aconnect_sse): + # Use a short sleep override so the test doesn't wait on reconnection delays + with patch("mcp.client.streamable_http.DEFAULT_RECONNECTION_DELAY_MS", 0): + await transport._handle_reconnection(ctx, last_event_id="evt-static", retry_interval_ms=0) + + # The method should have connected at most MAX_RECONNECTION_ATTEMPTS times + # (one for the initial call at attempt=0, then 
up to MAX-1 more) + assert connect_count <= MAX_RECONNECTION_ATTEMPTS, ( + f"Expected at most {MAX_RECONNECTION_ATTEMPTS} reconnection attempts, " + f"but aconnect_sse was called {connect_count} times — " + f"the attempt counter is not being incremented across reconnections" + ) From 8761f6022d18eb314610229ce312f08fc70f64be Mon Sep 17 00:00:00 2001 From: chernistry Date: Mon, 6 Apr 2026 20:36:24 +0300 Subject: [PATCH 2/2] fix: reset retry counter only on forward progress, not unconditionally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix (attempt + 1 unconditionally) broke legitimate multi-reconnection scenarios where the server checkpoints progress between disconnections. Now we check whether new SSE events were delivered (last_event_id changed) — if so, reset the counter; if not, increment it toward MAX_RECONNECTION_ATTEMPTS. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/mcp/client/streamable_http.py | 8 +++++++- tests/shared/test_streamable_http.py | 15 +++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 7d4dde1d0..2aa8ee977 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -423,7 +423,13 @@ async def _handle_reconnection( # Stream ended again without response - reconnect again logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, attempt + 1) + # Reset attempt counter only if the stream delivered new events + # (i.e. made forward progress). If no new events arrived, the + # server is connecting then dropping immediately — count that + # towards the retry budget to avoid infinite loops (#2393). 
+ made_progress = reconnect_last_event_id != last_event_id + next_attempt = 0 if made_progress else attempt + 1 + await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt) except Exception as e: # pragma: no cover logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 7601be62c..f6e3afe9a 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2329,15 +2329,16 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( @pytest.mark.anyio async def test_handle_reconnection_does_not_retry_infinitely(): - """Reconnection must count TOTAL attempts, not reset on each successful connect. + """Reconnection must give up when no forward progress is made. Regression test for #2393: when a stream connects successfully but drops before delivering a response, the attempt counter was reset to 0 on the recursive call, allowing an infinite retry loop. This test simulates a stream that connects, yields one non-completing SSE - event, then ends — repeatedly. With MAX_RECONNECTION_ATTEMPTS the loop - must terminate. + event with the SAME event ID each time (no new data), then ends — + repeatedly. Without forward progress the loop must terminate within + MAX_RECONNECTION_ATTEMPTS. """ transport = StreamableHTTPTransport(url="http://localhost:8000/mcp") transport.session_id = "test-session" @@ -2354,13 +2355,15 @@ async def fake_aconnect_sse(*args: Any, **kwargs: Any) -> AsyncIterator[Any]: mock_response = MagicMock() mock_response.raise_for_status = MagicMock() - # Yield a single non-completing notification SSE event, then end the stream - # (simulating a server that drops the connection after partial delivery) + # Yield a single non-completing notification SSE event with the SAME + # event ID every time, then end the stream. 
Because the ID never + # changes, the transport sees no forward progress and should count + # each reconnection towards the retry budget. async def aiter_sse() -> AsyncIterator[ServerSentEvent]: yield ServerSentEvent( event="message", data='{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"tok","progress":1,"total":10}}', - id=f"evt-{connect_count}", + id="evt-static", retry=None, )