diff --git a/lib/mcp/server/transports/streamable_http_transport.rb b/lib/mcp/server/transports/streamable_http_transport.rb index 0801ee8..ff9977d 100644 --- a/lib/mcp/server/transports/streamable_http_transport.rb +++ b/lib/mcp/server/transports/streamable_http_transport.rb @@ -19,6 +19,7 @@ def initialize(server, stateless: false) REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze + STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze def handle_request(request) case request.env["REQUEST_METHOD"] @@ -58,7 +59,7 @@ def send_notification(method, params = nil, session_id: nil) begin send_to_stream(session[:stream], notification) true - rescue IOError, Errno::EPIPE => e + rescue *STREAM_WRITE_ERRORS => e MCP.configuration.exception_reporter.call( e, { session_id: session_id, error: "Failed to send notification" }, @@ -77,7 +78,7 @@ def send_notification(method, params = nil, session_id: nil) begin send_to_stream(session[:stream], notification) sent_count += 1 - rescue IOError, Errno::EPIPE => e + rescue *STREAM_WRITE_ERRORS => e MCP.configuration.exception_reporter.call( e, { session_id: sid, error: "Failed to send notification" }, @@ -289,7 +290,7 @@ def send_response_to_stream(stream, response, session_id) message = JSON.parse(response) send_to_stream(stream, message) handle_accepted - rescue IOError, Errno::EPIPE => e + rescue *STREAM_WRITE_ERRORS => e MCP.configuration.exception_reporter.call( e, { session_id: session_id, error: "Stream closed during response" }, @@ -366,7 +367,7 @@ def send_keepalive_ping(session_id) send_ping_to_stream(@sessions[session_id][:stream]) end end - rescue IOError, Errno::EPIPE => e + rescue *STREAM_WRITE_ERRORS => e MCP.configuration.exception_reporter.call( e, { session_id: session_id, error: "Stream closed" }, diff --git a/test/mcp/server/transports/streamable_http_transport_test.rb b/test/mcp/server/transports/streamable_http_transport_test.rb index 0bbfdc1..96b5f43 100644 --- a/test/mcp/server/transports/streamable_http_transport_test.rb +++ b/test/mcp/server/transports/streamable_http_transport_test.rb @@ -210,6 +210,53 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase end end + test "handles POST request when Errno::ECONNRESET raised" do + # Create and initialize a session. + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Use a mock stream that raises Errno::ECONNRESET on write. + mock_stream = Object.new + mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET } + mock_stream.define_singleton_method(:close) {} + + # Connect with SSE using the mock stream. + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(mock_stream) if response[2].is_a?(Proc) + + # Give the stream time to set up. + sleep(0.1) + + request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { jsonrpc: "2.0", method: "ping", id: "789" }.to_json, + ) + + # This should handle Errno::ECONNRESET and return the original response. + response = @transport.handle_request(request) + assert_equal 200, response[0] + assert_equal({ "Content-Type" => "application/json" }, response[1]) + + # Verify session was cleaned up. + assert_not @transport.instance_variable_get(:@sessions).key?(session_id) + end + test "handles GET request with missing session ID" do request = create_rack_request( "GET", @@ -558,6 +605,156 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase assert_not @transport.instance_variable_get(:@sessions).key?(session_id) end + test "send_notification handles Errno::ECONNRESET gracefully" do + # Create and initialize a session. + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Use a mock stream that raises Errno::ECONNRESET on write. + mock_stream = Object.new + mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET } + mock_stream.define_singleton_method(:close) {} + + # Connect with SSE using the mock stream. + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(mock_stream) if response[2].is_a?(Proc) + + # Give the stream time to set up. + sleep(0.1) + + # Try to send notification - should handle ECONNRESET without raising. + result = @transport.send_notification("test", { message: "test" }, session_id: session_id) + + # Should return false and clean up the session. + refute result + + # Verify session was cleaned up. + assert_not @transport.instance_variable_get(:@sessions).key?(session_id) + end + + test "send_notification broadcast continues when one session raises Errno::ECONNRESET" do + # Create two sessions. + init_request1 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "1" }.to_json, + ) + init_response1 = @transport.handle_request(init_request1) + session_id1 = init_response1[1]["Mcp-Session-Id"] + + init_request2 = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "2" }.to_json, + ) + init_response2 = @transport.handle_request(init_request2) + session_id2 = init_response2[1]["Mcp-Session-Id"] + + # Session 1: mock stream that raises ECONNRESET. + broken_stream = Object.new + broken_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET } + broken_stream.define_singleton_method(:close) {} + + get_request1 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id1 }, + ) + response1 = @transport.handle_request(get_request1) + response1[2].call(broken_stream) if response1[2].is_a?(Proc) + + # Session 2: healthy stream. + healthy_stream = StringIO.new + get_request2 = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id2 }, + ) + response2 = @transport.handle_request(get_request2) + response2[2].call(healthy_stream) if response2[2].is_a?(Proc) + + # Give the streams time to set up. + sleep(0.1) + + # Broadcast notification - should not abort despite ECONNRESET from session 1. + sent_count = @transport.send_notification("test", { message: "hello" }, **{}) + + # Session 2 should have received the notification. + assert_equal 1, sent_count + + healthy_stream.rewind + output = healthy_stream.read + assert_includes output, '"method":"test"' + + # Session 1 should have been cleaned up. + assert_not @transport.instance_variable_get(:@sessions).key?(session_id1) + + # Session 2 should still exist. + assert @transport.instance_variable_get(:@sessions).key?(session_id2) + end + + test "send_keepalive_ping handles Errno::ECONNRESET gracefully" do + # Create and initialize a session. + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "123" }.to_json, + ) + init_response = @transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Use a mock stream that raises Errno::ECONNRESET on write. + mock_stream = Object.new + mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET } + mock_stream.define_singleton_method(:close) {} + + # Connect with SSE using the mock stream. + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + response = @transport.handle_request(get_request) + response[2].call(mock_stream) if response[2].is_a?(Proc) + + # Give the stream time to set up. + sleep(0.1) + + # send_keepalive_ping is private; re-raises to exit the keepalive loop. + # Errno::ECONNRESET should be caught by the rescue clause (which reports + # the exception) before being re-raised. Verify that exception_reporter + # is called — this fails if ECONNRESET is not in the rescue list. + reported_errors = [] + original_reporter = MCP.configuration.exception_reporter + MCP.configuration.exception_reporter = ->(error, context) { reported_errors << [error, context] } + + begin + assert_raises(Errno::ECONNRESET) do + @transport.send(:send_keepalive_ping, session_id) + end + + assert_equal(1, reported_errors.size) + assert_instance_of(Errno::ECONNRESET, reported_errors.first[0]) + assert_equal("Stream closed", reported_errors.first[1][:error]) + ensure + MCP.configuration.exception_reporter = original_reporter + end + end + test "responds with 405 for unsupported methods" do request = create_rack_request( "PUT",