@@ -1010,6 +1010,148 @@ describe('StreamableHTTPClientTransport', () => {
10101010 } ) ;
10111011 } ) ;
10121012
1013+ describe ( 'SSE retry field handling' , ( ) => {
1014+ beforeEach ( ( ) => {
1015+ vi . useFakeTimers ( ) ;
1016+ ( global . fetch as Mock ) . mockReset ( ) ;
1017+ } ) ;
1018+ afterEach ( ( ) => vi . useRealTimers ( ) ) ;
1019+
1020+ it ( 'should use server-provided retry value for reconnection delay' , async ( ) => {
1021+ transport = new StreamableHTTPClientTransport ( new URL ( 'http://localhost:1234/mcp' ) , {
1022+ reconnectionOptions : {
1023+ initialReconnectionDelay : 100 ,
1024+ maxReconnectionDelay : 5000 ,
1025+ reconnectionDelayGrowFactor : 2 ,
1026+ maxRetries : 3
1027+ }
1028+ } ) ;
1029+
1030+ // Create a stream that sends a retry field
1031+ const encoder = new TextEncoder ( ) ;
1032+ const stream = new ReadableStream ( {
1033+ start ( controller ) {
1034+ // Send SSE event with retry field
1035+ const event =
1036+ 'retry: 3000\nevent: message\nid: evt-1\ndata: {"jsonrpc": "2.0", "method": "notification", "params": {}}\n\n' ;
1037+ controller . enqueue ( encoder . encode ( event ) ) ;
1038+ // Close stream to trigger reconnection
1039+ controller . close ( ) ;
1040+ }
1041+ } ) ;
1042+
1043+ const fetchMock = global . fetch as Mock ;
1044+ fetchMock . mockResolvedValueOnce ( {
1045+ ok : true ,
1046+ status : 200 ,
1047+ headers : new Headers ( { 'content-type' : 'text/event-stream' } ) ,
1048+ body : stream
1049+ } ) ;
1050+
1051+ // Second request for reconnection
1052+ fetchMock . mockResolvedValueOnce ( {
1053+ ok : true ,
1054+ status : 200 ,
1055+ headers : new Headers ( { 'content-type' : 'text/event-stream' } ) ,
1056+ body : new ReadableStream ( )
1057+ } ) ;
1058+
1059+ await transport . start ( ) ;
1060+ await transport [ '_startOrAuthSse' ] ( { } ) ;
1061+
1062+ // Wait for stream to close and reconnection to be scheduled
1063+ await vi . advanceTimersByTimeAsync ( 100 ) ;
1064+
1065+ // Verify the server retry value was captured
1066+ const transportInternal = transport as unknown as { _serverRetryMs ?: number } ;
1067+ expect ( transportInternal . _serverRetryMs ) . toBe ( 3000 ) ;
1068+
1069+ // Verify the delay calculation uses server retry value
1070+ const getDelay = transport [ '_getNextReconnectionDelay' ] . bind ( transport ) ;
1071+ expect ( getDelay ( 0 ) ) . toBe ( 3000 ) ; // Should use server value, not 100ms initial
1072+ expect ( getDelay ( 5 ) ) . toBe ( 3000 ) ; // Should still use server value for any attempt
1073+ } ) ;
1074+
1075+ it ( 'should fall back to exponential backoff when no server retry value' , ( ) => {
1076+ transport = new StreamableHTTPClientTransport ( new URL ( 'http://localhost:1234/mcp' ) , {
1077+ reconnectionOptions : {
1078+ initialReconnectionDelay : 100 ,
1079+ maxReconnectionDelay : 5000 ,
1080+ reconnectionDelayGrowFactor : 2 ,
1081+ maxRetries : 3
1082+ }
1083+ } ) ;
1084+
1085+ // Without any SSE stream, _serverRetryMs should be undefined
1086+ const transportInternal = transport as unknown as { _serverRetryMs ?: number } ;
1087+ expect ( transportInternal . _serverRetryMs ) . toBeUndefined ( ) ;
1088+
1089+ // Should use exponential backoff
1090+ const getDelay = transport [ '_getNextReconnectionDelay' ] . bind ( transport ) ;
1091+ expect ( getDelay ( 0 ) ) . toBe ( 100 ) ; // 100 * 2^0
1092+ expect ( getDelay ( 1 ) ) . toBe ( 200 ) ; // 100 * 2^1
1093+ expect ( getDelay ( 2 ) ) . toBe ( 400 ) ; // 100 * 2^2
1094+ expect ( getDelay ( 10 ) ) . toBe ( 5000 ) ; // capped at max
1095+ } ) ;
1096+
1097+ it ( 'should reconnect on graceful stream close' , async ( ) => {
1098+ transport = new StreamableHTTPClientTransport ( new URL ( 'http://localhost:1234/mcp' ) , {
1099+ reconnectionOptions : {
1100+ initialReconnectionDelay : 10 ,
1101+ maxReconnectionDelay : 1000 ,
1102+ reconnectionDelayGrowFactor : 1 ,
1103+ maxRetries : 1
1104+ }
1105+ } ) ;
1106+
1107+ // Create a stream that closes gracefully after sending an event with ID
1108+ const encoder = new TextEncoder ( ) ;
1109+ const stream = new ReadableStream ( {
1110+ start ( controller ) {
1111+ // Send priming event with ID and retry field
1112+ const event = 'id: evt-1\nretry: 100\ndata: \n\n' ;
1113+ controller . enqueue ( encoder . encode ( event ) ) ;
1114+ // Graceful close
1115+ controller . close ( ) ;
1116+ }
1117+ } ) ;
1118+
1119+ const fetchMock = global . fetch as Mock ;
1120+ fetchMock . mockResolvedValueOnce ( {
1121+ ok : true ,
1122+ status : 200 ,
1123+ headers : new Headers ( { 'content-type' : 'text/event-stream' } ) ,
1124+ body : stream
1125+ } ) ;
1126+
1127+ // Second request for reconnection
1128+ fetchMock . mockResolvedValueOnce ( {
1129+ ok : true ,
1130+ status : 200 ,
1131+ headers : new Headers ( { 'content-type' : 'text/event-stream' } ) ,
1132+ body : new ReadableStream ( )
1133+ } ) ;
1134+
1135+ await transport . start ( ) ;
1136+ await transport [ '_startOrAuthSse' ] ( { } ) ;
1137+
1138+ // Wait for stream to process and close
1139+ await vi . advanceTimersByTimeAsync ( 50 ) ;
1140+
1141+ // Wait for reconnection delay (100ms from retry field)
1142+ await vi . advanceTimersByTimeAsync ( 150 ) ;
1143+
1144+ // Should have attempted reconnection
1145+ expect ( fetchMock ) . toHaveBeenCalledTimes ( 2 ) ;
1146+ expect ( fetchMock . mock . calls [ 0 ] [ 1 ] ?. method ) . toBe ( 'GET' ) ;
1147+ expect ( fetchMock . mock . calls [ 1 ] [ 1 ] ?. method ) . toBe ( 'GET' ) ;
1148+
1149+ // Second call should include Last-Event-ID
1150+ const secondCallHeaders = fetchMock . mock . calls [ 1 ] [ 1 ] ?. headers ;
1151+ expect ( secondCallHeaders ?. get ( 'last-event-id' ) ) . toBe ( 'evt-1' ) ;
1152+ } ) ;
1153+ } ) ;
1154+
10131155 describe ( 'prevent infinite recursion when server returns 401 after successful auth' , ( ) => {
10141156 it ( 'should throw error when server returns 401 after successful auth' , async ( ) => {
10151157 const message : JSONRPCMessage = {
0 commit comments