From 0aa53e3b455b172078b12964282f401b18ccf542 Mon Sep 17 00:00:00 2001 From: Rob Findley Date: Mon, 20 Oct 2025 16:28:52 +0000 Subject: [PATCH] mcp: add StreamableHTTPOptions.EventStore Allow users to enable streamable replay by setting a non-nil event store. Updates #587 --- mcp/streamable.go | 11 ++++++----- mcp/streamable_test.go | 25 +++++++++---------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/mcp/streamable.go b/mcp/streamable.go index 7204e2b1..de88b03b 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -72,8 +72,11 @@ type StreamableHTTPOptions struct { // If nil, do not log. Logger *slog.Logger - // TODO(rfindley): file a proposal to export this option, or something equivalent. - configureTransport func(req *http.Request, transport *StreamableServerTransport) + // EventStore enables stream resumption. + // + // If set, EventStore will be used to persist stream events and replay them + // upon stream resumption. + EventStore EventStore } // NewStreamableHTTPHandler returns a new [StreamableHTTPHandler]. @@ -237,12 +240,10 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque transport = &StreamableServerTransport{ SessionID: sessionID, Stateless: h.opts.Stateless, + EventStore: h.opts.EventStore, jsonResponse: h.opts.JSONResponse, logger: h.opts.Logger, } - if h.opts.configureTransport != nil { - h.opts.configureTransport(req, transport) - } // To support stateless mode, we initialize the session with a default // state, so that it doesn't reject subsequent requests. diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index a0893689..b2c21aaa 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -95,14 +95,13 @@ func TestStreamableTransports(t *testing.T) { // Start an httptest.Server with the StreamableHTTPHandler, wrapped in a // cookie-checking middleware. - handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, &StreamableHTTPOptions{ + opts := &StreamableHTTPOptions{ JSONResponse: test.useJSON, - configureTransport: func(_ *http.Request, transport *StreamableServerTransport) { - if test.replay { - transport.EventStore = NewMemoryEventStore(nil) - } - }, - }) + } + if test.replay { + opts.EventStore = NewMemoryEventStore(nil) + } + handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, opts) var ( headerMu sync.Mutex @@ -386,9 +385,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) { }) realServer := httptest.NewServer(mustNotPanic(t, NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, &StreamableHTTPOptions{ - configureTransport: func(_ *http.Request, t *StreamableServerTransport) { - t.EventStore = NewMemoryEventStore(nil) // necessary for replay - }, + EventStore: NewMemoryEventStore(nil), // necessary for replay }))) t.Cleanup(func() { t.Log("Closing real HTTP server") @@ -567,9 +564,7 @@ func TestServerInitiatedSSE(t *testing.T) { // However, it shouldn't be necessary to use replay here, as we should be // guaranteed that the standalone SSE stream is started by the time the // client is connected. - configureTransport: func(_ *http.Request, transport *StreamableServerTransport) { - transport.EventStore = NewMemoryEventStore(nil) - }, + EventStore: NewMemoryEventStore(nil), } httpServer := httptest.NewServer(mustNotPanic(t, NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, opts))) defer httpServer.Close() @@ -942,9 +937,7 @@ func TestStreamableServerTransport(t *testing.T) { opts := &StreamableHTTPOptions{} if test.replay { - opts.configureTransport = func(_ *http.Request, t *StreamableServerTransport) { - t.EventStore = NewMemoryEventStore(nil) - } + opts.EventStore = NewMemoryEventStore(nil) } // Start the streamable handler. handler := NewStreamableHTTPHandler(func(req *http.Request) *Server { return server }, opts)