@@ -842,8 +842,8 @@ describe("StreamableHTTPServerTransport", () => {
842842 } ,
843843 body : JSON . stringify ( initMessage ) ,
844844 } ) ;
845-
846- await transport . handleRequest ( initReq , mockResponse ) ;
845+ const initResponse = createMockResponse ( ) ;
846+ await transport . handleRequest ( initReq , initResponse ) ;
847847 mockResponse . writeHead . mockClear ( ) ;
848848 } ) ;
849849
@@ -934,6 +934,136 @@ describe("StreamableHTTPServerTransport", () => {
934934 // Now stream should be closed
935935 expect ( mockResponse . end ) . toHaveBeenCalled ( ) ;
936936 } ) ;
937+
938+ it ( "should keep stream open when multiple requests share the same connection" , async ( ) => {
939+ // Create a fresh response for this test
940+ const sharedResponse = createMockResponse ( ) ;
941+
942+ // Send two requests in a batch that will share the same connection
943+ const batchRequests : JSONRPCMessage [ ] = [
944+ { jsonrpc : "2.0" , method : "method1" , params : { } , id : "req1" } ,
945+ { jsonrpc : "2.0" , method : "method2" , params : { } , id : "req2" }
946+ ] ;
947+
948+ const req = createMockRequest ( {
949+ method : "POST" ,
950+ headers : {
951+ "content-type" : "application/json" ,
952+ "accept" : "application/json, text/event-stream" ,
953+ "mcp-session-id" : transport . sessionId
954+ } ,
955+ body : JSON . stringify ( batchRequests )
956+ } ) ;
957+
958+ await transport . handleRequest ( req , sharedResponse ) ;
959+
960+ // Respond to first request
961+ const response1 : JSONRPCMessage = {
962+ jsonrpc : "2.0" ,
963+ result : { value : "result1" } ,
964+ id : "req1"
965+ } ;
966+
967+ await transport . send ( response1 ) ;
968+
969+ // Connection should remain open because req2 is still pending
970+ expect ( sharedResponse . write ) . toHaveBeenCalledWith (
971+ expect . stringContaining ( `event: message\ndata: ${ JSON . stringify ( response1 ) } \n\n` )
972+ ) ;
973+ expect ( sharedResponse . end ) . not . toHaveBeenCalled ( ) ;
974+
975+ // Respond to second request
976+ const response2 : JSONRPCMessage = {
977+ jsonrpc : "2.0" ,
978+ result : { value : "result2" } ,
979+ id : "req2"
980+ } ;
981+
982+ await transport . send ( response2 ) ;
983+
984+ // Now connection should close as all requests are complete
985+ expect ( sharedResponse . write ) . toHaveBeenCalledWith (
986+ expect . stringContaining ( `event: message\ndata: ${ JSON . stringify ( response2 ) } \n\n` )
987+ ) ;
988+ expect ( sharedResponse . end ) . toHaveBeenCalled ( ) ;
989+ } ) ;
990+
991+ it ( "should clean up connection tracking when a response is sent" , async ( ) => {
992+ const req = createMockRequest ( {
993+ method : "POST" ,
994+ headers : {
995+ "content-type" : "application/json" ,
996+ "accept" : "application/json, text/event-stream" ,
997+ "mcp-session-id" : transport . sessionId
998+ } ,
999+ body : JSON . stringify ( {
1000+ jsonrpc : "2.0" ,
1001+ method : "test" ,
1002+ params : { } ,
1003+ id : "cleanup-test"
1004+ } )
1005+ } ) ;
1006+
1007+ const response = createMockResponse ( ) ;
1008+ await transport . handleRequest ( req , response ) ;
1009+
1010+ // Verify that the request is tracked in the SSE map
1011+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 2 ) ;
1012+ expect ( transport [ "_sseResponseMapping" ] . has ( "cleanup-test" ) ) . toBe ( true ) ;
1013+
1014+ // Send a response
1015+ await transport . send ( {
1016+ jsonrpc : "2.0" ,
1017+ result : { } ,
1018+ id : "cleanup-test"
1019+ } ) ;
1020+
1021+ // Verify that the mapping was cleaned up
1022+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 1 ) ;
1023+ expect ( transport [ "_sseResponseMapping" ] . has ( "cleanup-test" ) ) . toBe ( false ) ;
1024+ } ) ;
1025+
1026+ it ( "should clean up connection tracking when client disconnects" , async ( ) => {
1027+ // Setup two requests that share a connection
1028+ const req = createMockRequest ( {
1029+ method : "POST" ,
1030+ headers : {
1031+ "content-type" : "application/json" ,
1032+ "accept" : "application/json, text/event-stream" ,
1033+ "mcp-session-id" : transport . sessionId
1034+ } ,
1035+ body : JSON . stringify ( [
1036+ { jsonrpc : "2.0" , method : "longRunning1" , params : { } , id : "req1" } ,
1037+ { jsonrpc : "2.0" , method : "longRunning2" , params : { } , id : "req2" }
1038+ ] )
1039+ } ) ;
1040+
1041+ const response = createMockResponse ( ) ;
1042+
1043+ // We need to manually store the callback to trigger it later
1044+ let closeCallback : ( ( ) => void ) | undefined ;
1045+ response . on . mockImplementation ( ( event , callback : ( ) => void ) => {
1046+ if ( typeof event === "string" && event === "close" ) {
1047+ closeCallback = callback ;
1048+ }
1049+ return response ;
1050+ } ) ;
1051+
1052+ await transport . handleRequest ( req , response ) ;
1053+
1054+ // Both requests should be mapped to the same response
1055+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 3 ) ;
1056+ expect ( transport [ "_sseResponseMapping" ] . get ( "req1" ) ) . toBe ( response ) ;
1057+ expect ( transport [ "_sseResponseMapping" ] . get ( "req2" ) ) . toBe ( response ) ;
1058+
1059+ // Simulate client disconnect by triggering the stored callback
1060+ if ( closeCallback ) closeCallback ( ) ;
1061+
1062+ // All entries using this response should be removed
1063+ expect ( transport [ "_sseResponseMapping" ] . size ) . toBe ( 1 ) ;
1064+ expect ( transport [ "_sseResponseMapping" ] . has ( "req1" ) ) . toBe ( false ) ;
1065+ expect ( transport [ "_sseResponseMapping" ] . has ( "req2" ) ) . toBe ( false ) ;
1066+ } ) ;
9371067 } ) ;
9381068
9391069 describe ( "Message Targeting" , ( ) => {
0 commit comments