11import { Transport } from "../shared/transport.js" ;
22import { JSONRPCMessage , JSONRPCMessageSchema } from "../types.js" ;
33import { auth , AuthResult , OAuthClientProvider , UnauthorizedError } from "./auth.js" ;
4- import { type ErrorEvent } from "eventsource" ;
54import { EventSourceMessage , EventSourceParserStream } from 'eventsource-parser/stream' ;
65export class StreamableHTTPError extends Error {
76 constructor (
87 public readonly code : number | undefined ,
98 message : string | undefined ,
10- public readonly event : ErrorEvent ,
119 ) {
1210 super ( `Streamable HTTP error: ${ message } ` ) ;
1311 }
@@ -83,7 +81,7 @@ export class StreamableHTTPClientTransport implements Transport {
8381 throw new UnauthorizedError ( ) ;
8482 }
8583
86- return await this . _startOrAuth ( ) ;
84+ return await this . _startOrAuthStandaloneSSE ( ) ;
8785 }
8886
8987 private async _commonHeaders ( ) : Promise < HeadersInit > {
@@ -102,7 +100,7 @@ export class StreamableHTTPClientTransport implements Transport {
102100 return headers ;
103101 }
104102
105- private async _startOrAuth ( ) : Promise < void > {
103+ private async _startOrAuthStandaloneSSE ( ) : Promise < void > {
106104 try {
107105 // Try to open an initial SSE stream with GET to listen for server messages
108106 // This is optional according to the spec - server may not support it
@@ -121,32 +119,76 @@ export class StreamableHTTPClientTransport implements Transport {
121119 signal : this . _abortController ?. signal ,
122120 } ) ;
123121
124- if ( response . status === 405 || response . status === 404 ) {
125- // Server doesn't support GET for SSE, which is allowed by the spec
126- // We'll rely on SSE responses to POST requests for communication
127- return ;
128- }
129-
130122 if ( ! response . ok ) {
131123 if ( response . status === 401 && this . _authProvider ) {
132124 // Need to authenticate
133125 return await this . _authThenStart ( ) ;
134126 }
135127
136- const error = new Error ( `Failed to open SSE stream: ${ response . status } ${ response . statusText } ` ) ;
128+ const error = new StreamableHTTPError (
129+ response . status ,
130+ `Failed to open SSE stream: ${ response . statusText } ` ,
131+ ) ;
137132 this . onerror ?.( error ) ;
138133 throw error ;
139134 }
140135
141136 // Successful connection, handle the SSE stream as a standalone listener
142- const streamId = `initial -${ Date . now ( ) } ` ;
137+ const streamId = `standalone-sse -${ Date . now ( ) } ` ;
143138 this . _handleSseStream ( response . body , streamId ) ;
144139 } catch ( error ) {
145140 this . onerror ?.( error as Error ) ;
146141 throw error ;
147142 }
148143 }
149144
145+ private _handleSseStream ( stream : ReadableStream < Uint8Array > | null , streamId : string ) : void {
146+ if ( ! stream ) {
147+ return ;
148+ }
149+
150+ // Create a pipeline: binary stream -> text decoder -> SSE parser
151+ const eventStream = stream
152+ . pipeThrough ( new TextDecoderStream ( ) )
153+ . pipeThrough ( new EventSourceParserStream ( ) ) ;
154+
155+ const reader = eventStream . getReader ( ) ;
156+ this . _activeStreams . set ( streamId , reader ) ;
157+
158+ const processStream = async ( ) => {
159+ try {
160+ while ( true ) {
161+ const { done, value : event } = await reader . read ( ) ;
162+ if ( done ) {
163+ this . _activeStreams . delete ( streamId ) ;
164+ break ;
165+ }
166+
167+ // Update last event ID if provided
168+ if ( event . id ) {
169+ this . _lastEventId = event . id ;
170+ }
171+
172+ // Handle message events (default event type is undefined per docs)
173+ // or explicit 'message' event type
174+ if ( ! event . event || event . event === 'message' ) {
175+ try {
176+ const message = JSONRPCMessageSchema . parse ( JSON . parse ( event . data ) ) ;
177+ this . onmessage ?.( message ) ;
178+ } catch ( error ) {
179+ this . onerror ?.( error as Error ) ;
180+ }
181+ }
182+ }
183+ } catch ( error ) {
184+ this . _activeStreams . delete ( streamId ) ;
185+ this . onerror ?.( error as Error ) ;
186+ }
187+ } ;
188+
189+ processStream ( ) ;
190+ }
191+
150192 async start ( ) {
151193 if ( this . _activeStreams . size > 0 ) {
152194 throw new Error (
@@ -155,7 +197,6 @@ export class StreamableHTTPClientTransport implements Transport {
155197 }
156198
157199 this . _abortController = new AbortController ( ) ;
158- return await this . _startOrAuth ( ) ;
159200 }
160201
161202 /**
@@ -271,50 +312,17 @@ export class StreamableHTTPClientTransport implements Transport {
271312 }
272313 }
273314
274- private _handleSseStream ( stream : ReadableStream < Uint8Array > | null , streamId : string ) : void {
275- if ( ! stream ) {
276- return ;
315+ /**
316+ * Opens SSE stream to receive messages from the server.
317+ *
318+ * This allows the server to push messages to the client without requiring the client
319+ * to first send a request via HTTP POST. Some servers may not support this feature.
320+ * If authentication is required but fails, this method will throw an UnauthorizedError.
321+ */
322+ async openSseStream ( ) : Promise < void > {
323+ if ( ! this . _abortController ) {
324+ this . _abortController = new AbortController ( ) ;
277325 }
278-
279- // Create a pipeline: binary stream -> text decoder -> SSE parser
280- const eventStream = stream
281- . pipeThrough ( new TextDecoderStream ( ) )
282- . pipeThrough ( new EventSourceParserStream ( ) ) ;
283-
284- const reader = eventStream . getReader ( ) ;
285- this . _activeStreams . set ( streamId , reader ) ;
286-
287- const processStream = async ( ) => {
288- try {
289- while ( true ) {
290- const { done, value : event } = await reader . read ( ) ;
291- if ( done ) {
292- this . _activeStreams . delete ( streamId ) ;
293- break ;
294- }
295-
296- // Update last event ID if provided
297- if ( event . id ) {
298- this . _lastEventId = event . id ;
299- }
300-
301- // Handle message events (default event type is undefined per docs)
302- // or explicit 'message' event type
303- if ( ! event . event || event . event === 'message' ) {
304- try {
305- const message = JSONRPCMessageSchema . parse ( JSON . parse ( event . data ) ) ;
306- this . onmessage ?.( message ) ;
307- } catch ( error ) {
308- this . onerror ?.( error as Error ) ;
309- }
310- }
311- }
312- } catch ( error ) {
313- this . _activeStreams . delete ( streamId ) ;
314- this . onerror ?.( error as Error ) ;
315- }
316- } ;
317-
318- processStream ( ) ;
326+ await this . _startOrAuthStandaloneSSE ( ) ;
319327 }
320328}
0 commit comments