11import { Transport } from "../shared/transport.js" ;
2- import { isJSONRPCNotification , JSONRPCMessage , JSONRPCMessageSchema } from "../types.js" ;
2+ import { isJSONRPCNotification , isJSONRPCRequest , isJSONRPCResponse , JSONRPCMessage , JSONRPCMessageSchema } from "../types.js" ;
33import { auth , AuthResult , OAuthClientProvider , UnauthorizedError } from "./auth.js" ;
44import { EventSourceParserStream } from "eventsource-parser/stream" ;
55
@@ -23,11 +23,26 @@ export class StreamableHTTPError extends Error {
2323/**
2424 * Options for starting or authenticating an SSE connection
2525 */
26- export interface StartSSEOptions {
26+ interface StartSSEOptions {
2727 /**
28- * The ID of the last received event, used for resuming a disconnected stream
28+ * The resumption token used to continue long-running requests that were interrupted.
29+ *
30+ * This allows clients to reconnect and continue from where they left off.
31+ */
32+ resumptionToken ?: string ;
33+
34+ /**
35+ * A callback that is invoked when the resumption token changes.
36+ *
37+ * This allows clients to persist the latest token for potential reconnection.
2938 */
30- lastEventId ?: string ;
39+ onresumptiontoken ?: ( token : string ) => void ;
40+
41+ /**
42+ * Override Message ID to associate with the replay message
43+ * so that response can be associate with the new resumed request.
44+ */
45+ replayMessageId ?: string | number ;
3146}
3247
3348/**
@@ -88,6 +103,12 @@ export type StreamableHTTPClientTransportOptions = {
88103 * Options to configure the reconnection behavior.
89104 */
90105 reconnectionOptions ?: StreamableHTTPReconnectionOptions ;
106+
107+ /**
108+ * Session ID for the connection. This is used to identify the session on the server.
109+ * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID.
110+ */
111+ sessionId ?: string ;
91112} ;
92113
93114/**
@@ -114,6 +135,7 @@ export class StreamableHTTPClientTransport implements Transport {
114135 this . _url = url ;
115136 this . _requestInit = opts ?. requestInit ;
116137 this . _authProvider = opts ?. authProvider ;
138+ this . _sessionId = opts ?. sessionId ;
117139 this . _reconnectionOptions = opts ?. reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS ;
118140 }
119141
@@ -134,7 +156,7 @@ export class StreamableHTTPClientTransport implements Transport {
134156 throw new UnauthorizedError ( ) ;
135157 }
136158
137- return await this . _startOrAuthStandaloneSSE ( { lastEventId : undefined } ) ;
159+ return await this . _startOrAuthSse ( { resumptionToken : undefined } ) ;
138160 }
139161
140162 private async _commonHeaders ( ) : Promise < Headers > {
@@ -156,17 +178,17 @@ export class StreamableHTTPClientTransport implements Transport {
156178 }
157179
158180
159- private async _startOrAuthStandaloneSSE ( options : StartSSEOptions ) : Promise < void > {
160- const { lastEventId } = options ;
181+ private async _startOrAuthSse ( options : StartSSEOptions ) : Promise < void > {
182+ const { resumptionToken } = options ;
161183 try {
162184 // Try to open an initial SSE stream with GET to listen for server messages
163185 // This is optional according to the spec - server may not support it
164186 const headers = await this . _commonHeaders ( ) ;
165187 headers . set ( "Accept" , "text/event-stream" ) ;
166188
167189 // Include Last-Event-ID header for resumable streams if provided
168- if ( lastEventId ) {
169- headers . set ( "last-event-id" , lastEventId ) ;
190+ if ( resumptionToken ) {
191+ headers . set ( "last-event-id" , resumptionToken ) ;
170192 }
171193
172194 const response = await fetch ( this . _url , {
@@ -193,7 +215,7 @@ export class StreamableHTTPClientTransport implements Transport {
193215 ) ;
194216 }
195217
196- this . _handleSseStream ( response . body ) ;
218+ this . _handleSseStream ( response . body , options ) ;
197219 } catch ( error ) {
198220 this . onerror ?.( error as Error ) ;
199221 throw error ;
@@ -224,7 +246,7 @@ export class StreamableHTTPClientTransport implements Transport {
224246 * @param lastEventId The ID of the last received event for resumability
225247 * @param attemptCount Current reconnection attempt count for this specific stream
226248 */
227- private _scheduleReconnection ( lastEventId : string , attemptCount = 0 ) : void {
249+ private _scheduleReconnection ( options : StartSSEOptions , attemptCount = 0 ) : void {
228250 // Use provided options or default options
229251 const maxRetries = this . _reconnectionOptions . maxRetries ;
230252
@@ -240,18 +262,19 @@ export class StreamableHTTPClientTransport implements Transport {
240262 // Schedule the reconnection
241263 setTimeout ( ( ) => {
242264 // Use the last event ID to resume where we left off
243- this . _startOrAuthStandaloneSSE ( { lastEventId } ) . catch ( error => {
265+ this . _startOrAuthSse ( options ) . catch ( error => {
244266 this . onerror ?.( new Error ( `Failed to reconnect SSE stream: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
245267 // Schedule another attempt if this one failed, incrementing the attempt counter
246- this . _scheduleReconnection ( lastEventId , attemptCount + 1 ) ;
268+ this . _scheduleReconnection ( options , attemptCount + 1 ) ;
247269 } ) ;
248270 } , delay ) ;
249271 }
250272
251- private _handleSseStream ( stream : ReadableStream < Uint8Array > | null ) : void {
273+ private _handleSseStream ( stream : ReadableStream < Uint8Array > | null , options : StartSSEOptions ) : void {
252274 if ( ! stream ) {
253275 return ;
254276 }
277+ const { onresumptiontoken, replayMessageId } = options ;
255278
256279 let lastEventId : string | undefined ;
257280 const processStream = async ( ) => {
@@ -274,11 +297,15 @@ export class StreamableHTTPClientTransport implements Transport {
274297 // Update last event ID if provided
275298 if ( event . id ) {
276299 lastEventId = event . id ;
300+ onresumptiontoken ?.( event . id ) ;
277301 }
278302
279303 if ( ! event . event || event . event === "message" ) {
280304 try {
281305 const message = JSONRPCMessageSchema . parse ( JSON . parse ( event . data ) ) ;
306+ if ( replayMessageId !== undefined && isJSONRPCResponse ( message ) ) {
307+ message . id = replayMessageId ;
308+ }
282309 this . onmessage ?.( message ) ;
283310 } catch ( error ) {
284311 this . onerror ?.( error as Error ) ;
@@ -294,7 +321,11 @@ export class StreamableHTTPClientTransport implements Transport {
294321 // Use the exponential backoff reconnection strategy
295322 if ( lastEventId !== undefined ) {
296323 try {
297- this . _scheduleReconnection ( lastEventId , 0 ) ;
324+ this . _scheduleReconnection ( {
325+ resumptionToken : lastEventId ,
326+ onresumptiontoken,
327+ replayMessageId
328+ } , 0 ) ;
298329 }
299330 catch ( error ) {
300331 this . onerror ?.( new Error ( `Failed to reconnect: ${ error instanceof Error ? error . message : String ( error ) } ` ) ) ;
@@ -338,8 +369,16 @@ export class StreamableHTTPClientTransport implements Transport {
338369 this . onclose ?.( ) ;
339370 }
340371
341- async send ( message : JSONRPCMessage | JSONRPCMessage [ ] ) : Promise < void > {
372+ async send ( message : JSONRPCMessage | JSONRPCMessage [ ] , options ?: { resumptionToken ?: string , onresumptiontoken ?: ( token : string ) => void } ) : Promise < void > {
342373 try {
374+ const { resumptionToken, onresumptiontoken } = options || { } ;
375+
376+ if ( resumptionToken ) {
377+ // If we have at last event ID, we need to reconnect the SSE stream
378+ this . _startOrAuthSse ( { resumptionToken, replayMessageId : isJSONRPCRequest ( message ) ? message . id : undefined } ) . catch ( err => this . onerror ?.( err ) ) ;
379+ return ;
380+ }
381+
343382 const headers = await this . _commonHeaders ( ) ;
344383 headers . set ( "content-type" , "application/json" ) ;
345384 headers . set ( "accept" , "application/json, text/event-stream" ) ;
@@ -383,7 +422,7 @@ export class StreamableHTTPClientTransport implements Transport {
383422 // if it's supported by the server
384423 if ( isJSONRPCNotification ( message ) && message . method === "notifications/initialized" ) {
385424 // Start without a lastEventId since this is a fresh connection
386- this . _startOrAuthStandaloneSSE ( { lastEventId : undefined } ) . catch ( err => this . onerror ?.( err ) ) ;
425+ this . _startOrAuthSse ( { resumptionToken : undefined } ) . catch ( err => this . onerror ?.( err ) ) ;
387426 }
388427 return ;
389428 }
@@ -398,7 +437,10 @@ export class StreamableHTTPClientTransport implements Transport {
398437
399438 if ( hasRequests ) {
400439 if ( contentType ?. includes ( "text/event-stream" ) ) {
401- this . _handleSseStream ( response . body ) ;
440+ // Handle SSE stream responses for requests
441+ // We use the same handler as standalone streams, which now supports
442+ // reconnection with the last event ID
443+ this . _handleSseStream ( response . body , { onresumptiontoken } ) ;
402444 } else if ( contentType ?. includes ( "application/json" ) ) {
403445 // For non-streaming servers, we might get direct JSON responses
404446 const data = await response . json ( ) ;
@@ -421,4 +463,8 @@ export class StreamableHTTPClientTransport implements Transport {
421463 throw error ;
422464 }
423465 }
466+
467+ get sessionId ( ) : string | undefined {
468+ return this . _sessionId ;
469+ }
424470}
0 commit comments