Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def initialize(server, stateless: false)

REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze
REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze
STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze

def handle_request(request)
case request.env["REQUEST_METHOD"]
Expand Down Expand Up @@ -58,7 +59,7 @@ def send_notification(method, params = nil, session_id: nil)
begin
send_to_stream(session[:stream], notification)
true
rescue IOError, Errno::EPIPE => e
rescue *STREAM_WRITE_ERRORS => e
MCP.configuration.exception_reporter.call(
e,
{ session_id: session_id, error: "Failed to send notification" },
Expand All @@ -77,7 +78,7 @@ def send_notification(method, params = nil, session_id: nil)
begin
send_to_stream(session[:stream], notification)
sent_count += 1
rescue IOError, Errno::EPIPE => e
rescue *STREAM_WRITE_ERRORS => e
MCP.configuration.exception_reporter.call(
e,
{ session_id: sid, error: "Failed to send notification" },
Expand Down Expand Up @@ -289,7 +290,7 @@ def send_response_to_stream(stream, response, session_id)
message = JSON.parse(response)
send_to_stream(stream, message)
handle_accepted
rescue IOError, Errno::EPIPE => e
rescue *STREAM_WRITE_ERRORS => e
MCP.configuration.exception_reporter.call(
e,
{ session_id: session_id, error: "Stream closed during response" },
Expand Down Expand Up @@ -366,7 +367,7 @@ def send_keepalive_ping(session_id)
send_ping_to_stream(@sessions[session_id][:stream])
end
end
rescue IOError, Errno::EPIPE => e
rescue *STREAM_WRITE_ERRORS => e
MCP.configuration.exception_reporter.call(
e,
{ session_id: session_id, error: "Stream closed" },
Expand Down
197 changes: 197 additions & 0 deletions test/mcp/server/transports/streamable_http_transport_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,53 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
end
end

test "handles POST request when Errno::ECONNRESET raised" do
# Create and initialize a session.
init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
)
init_response = @transport.handle_request(init_request)
session_id = init_response[1]["Mcp-Session-Id"]

# Use a mock stream that raises Errno::ECONNRESET on write.
mock_stream = Object.new
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
mock_stream.define_singleton_method(:close) {}

# Connect with SSE using the mock stream.
get_request = create_rack_request(
"GET",
"/",
{ "HTTP_MCP_SESSION_ID" => session_id },
)
response = @transport.handle_request(get_request)
response[2].call(mock_stream) if response[2].is_a?(Proc)

# Give the stream time to set up.
sleep(0.1)

request = create_rack_request(
"POST",
"/",
{
"CONTENT_TYPE" => "application/json",
"HTTP_MCP_SESSION_ID" => session_id,
},
{ jsonrpc: "2.0", method: "ping", id: "789" }.to_json,
)

# This should handle Errno::ECONNRESET and return the original response.
response = @transport.handle_request(request)
assert_equal 200, response[0]
assert_equal({ "Content-Type" => "application/json" }, response[1])

# Verify session was cleaned up.
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
end

test "handles GET request with missing session ID" do
request = create_rack_request(
"GET",
Expand Down Expand Up @@ -558,6 +605,156 @@ class StreamableHTTPTransportTest < ActiveSupport::TestCase
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
end

test "send_notification handles Errno::ECONNRESET gracefully" do
# Create and initialize a session.
init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
)
init_response = @transport.handle_request(init_request)
session_id = init_response[1]["Mcp-Session-Id"]

# Use a mock stream that raises Errno::ECONNRESET on write.
mock_stream = Object.new
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
mock_stream.define_singleton_method(:close) {}

# Connect with SSE using the mock stream.
get_request = create_rack_request(
"GET",
"/",
{ "HTTP_MCP_SESSION_ID" => session_id },
)
response = @transport.handle_request(get_request)
response[2].call(mock_stream) if response[2].is_a?(Proc)

# Give the stream time to set up.
sleep(0.1)

# Try to send notification - should handle ECONNRESET without raising.
result = @transport.send_notification("test", { message: "test" }, session_id: session_id)

# Should return false and clean up the session.
refute result

# Verify session was cleaned up.
assert_not @transport.instance_variable_get(:@sessions).key?(session_id)
end

test "send_notification broadcast continues when one session raises Errno::ECONNRESET" do
# Create two sessions.
init_request1 = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "1" }.to_json,
)
init_response1 = @transport.handle_request(init_request1)
session_id1 = init_response1[1]["Mcp-Session-Id"]

init_request2 = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "2" }.to_json,
)
init_response2 = @transport.handle_request(init_request2)
session_id2 = init_response2[1]["Mcp-Session-Id"]

# Session 1: mock stream that raises ECONNRESET.
broken_stream = Object.new
broken_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
broken_stream.define_singleton_method(:close) {}

get_request1 = create_rack_request(
"GET",
"/",
{ "HTTP_MCP_SESSION_ID" => session_id1 },
)
response1 = @transport.handle_request(get_request1)
response1[2].call(broken_stream) if response1[2].is_a?(Proc)

# Session 2: healthy stream.
healthy_stream = StringIO.new
get_request2 = create_rack_request(
"GET",
"/",
{ "HTTP_MCP_SESSION_ID" => session_id2 },
)
response2 = @transport.handle_request(get_request2)
response2[2].call(healthy_stream) if response2[2].is_a?(Proc)

# Give the streams time to set up.
sleep(0.1)

# Broadcast notification - should not abort despite ECONNRESET from session 1.
sent_count = @transport.send_notification("test", { message: "hello" }, **{})

# Session 2 should have received the notification.
assert_equal 1, sent_count

healthy_stream.rewind
output = healthy_stream.read
assert_includes output, '"method":"test"'

# Session 1 should have been cleaned up.
assert_not @transport.instance_variable_get(:@sessions).key?(session_id1)

# Session 2 should still exist.
assert @transport.instance_variable_get(:@sessions).key?(session_id2)
end

test "send_keepalive_ping handles Errno::ECONNRESET gracefully" do
# Create and initialize a session.
init_request = create_rack_request(
"POST",
"/",
{ "CONTENT_TYPE" => "application/json" },
{ jsonrpc: "2.0", method: "initialize", id: "123" }.to_json,
)
init_response = @transport.handle_request(init_request)
session_id = init_response[1]["Mcp-Session-Id"]

# Use a mock stream that raises Errno::ECONNRESET on write.
mock_stream = Object.new
mock_stream.define_singleton_method(:write) { |_data| raise Errno::ECONNRESET }
mock_stream.define_singleton_method(:close) {}

# Connect with SSE using the mock stream.
get_request = create_rack_request(
"GET",
"/",
{ "HTTP_MCP_SESSION_ID" => session_id },
)
response = @transport.handle_request(get_request)
response[2].call(mock_stream) if response[2].is_a?(Proc)

# Give the stream time to set up.
sleep(0.1)

# send_keepalive_ping is private; re-raises to exit the keepalive loop.
# Errno::ECONNRESET should be caught by the rescue clause (which reports
# the exception) before being re-raised. Verify that exception_reporter
# is called — this fails if ECONNRESET is not in the rescue list.
reported_errors = []
original_reporter = MCP.configuration.exception_reporter
MCP.configuration.exception_reporter = ->(error, context) { reported_errors << [error, context] }

begin
assert_raises(Errno::ECONNRESET) do
@transport.send(:send_keepalive_ping, session_id)
end

assert_equal(1, reported_errors.size)
assert_instance_of(Errno::ECONNRESET, reported_errors.first[0])
assert_equal("Stream closed", reported_errors.first[1][:error])
ensure
MCP.configuration.exception_reporter = original_reporter
end
end

test "responds with 405 for unsupported methods" do
request = create_rack_request(
"PUT",
Expand Down