Skip to content

Commit cfa7a51

Browse files
authored
mcp: add StreamableHTTPOptions.EventStore (#593)
Allow users to enable streamable replay by setting a non-nil event store. Updates #587
1 parent 8aee8d3 commit cfa7a51

File tree

2 files changed

+15
-21
lines changed

2 files changed

+15
-21
lines changed

mcp/streamable.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,11 @@ type StreamableHTTPOptions struct {
7272
// If nil, do not log.
7373
Logger *slog.Logger
7474

75-
// TODO(rfindley): file a proposal to export this option, or something equivalent.
76-
configureTransport func(req *http.Request, transport *StreamableServerTransport)
75+
// EventStore enables stream resumption.
76+
//
77+
// If set, EventStore will be used to persist stream events and replay them
78+
// upon stream resumption.
79+
EventStore EventStore
7780
}
7881

7982
// NewStreamableHTTPHandler returns a new [StreamableHTTPHandler].
@@ -237,12 +240,10 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
237240
transport = &StreamableServerTransport{
238241
SessionID: sessionID,
239242
Stateless: h.opts.Stateless,
243+
EventStore: h.opts.EventStore,
240244
jsonResponse: h.opts.JSONResponse,
241245
logger: h.opts.Logger,
242246
}
243-
if h.opts.configureTransport != nil {
244-
h.opts.configureTransport(req, transport)
245-
}
246247

247248
// To support stateless mode, we initialize the session with a default
248249
// state, so that it doesn't reject subsequent requests.

mcp/streamable_test.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,13 @@ func TestStreamableTransports(t *testing.T) {
125125

126126
// Start an httptest.Server with the StreamableHTTPHandler, wrapped in a
127127
// cookie-checking middleware.
128-
handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, &StreamableHTTPOptions{
128+
opts := &StreamableHTTPOptions{
129129
JSONResponse: test.useJSON,
130-
configureTransport: func(_ *http.Request, transport *StreamableServerTransport) {
131-
if test.replay {
132-
transport.EventStore = NewMemoryEventStore(nil)
133-
}
134-
},
135-
})
130+
}
131+
if test.replay {
132+
opts.EventStore = NewMemoryEventStore(nil)
133+
}
134+
handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, opts)
136135

137136
var (
138137
headerMu sync.Mutex
@@ -420,9 +419,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) {
420419
})
421420

422421
realServer := httptest.NewServer(mustNotPanic(t, NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, &StreamableHTTPOptions{
423-
configureTransport: func(_ *http.Request, t *StreamableServerTransport) {
424-
t.EventStore = NewMemoryEventStore(nil) // necessary for replay
425-
},
422+
EventStore: NewMemoryEventStore(nil), // necessary for replay
426423
})))
427424
t.Cleanup(func() {
428425
t.Log("Closing real HTTP server")
@@ -601,9 +598,7 @@ func TestServerInitiatedSSE(t *testing.T) {
601598
// However, it shouldn't be necessary to use replay here, as we should be
602599
// guaranteed that the standalone SSE stream is started by the time the
603600
// client is connected.
604-
configureTransport: func(_ *http.Request, transport *StreamableServerTransport) {
605-
transport.EventStore = NewMemoryEventStore(nil)
606-
},
601+
EventStore: NewMemoryEventStore(nil),
607602
}
608603
httpServer := httptest.NewServer(mustNotPanic(t, NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, opts)))
609604
defer httpServer.Close()
@@ -976,9 +971,7 @@ func TestStreamableServerTransport(t *testing.T) {
976971

977972
opts := &StreamableHTTPOptions{}
978973
if test.replay {
979-
opts.configureTransport = func(_ *http.Request, t *StreamableServerTransport) {
980-
t.EventStore = NewMemoryEventStore(nil)
981-
}
974+
opts.EventStore = NewMemoryEventStore(nil)
982975
}
983976
// Start the streamable handler.
984977
handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, opts)

0 commit comments

Comments
 (0)