From 0af54b492e34dba71fd147ec18bc38295a0038ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Thu, 18 Sep 2025 17:50:17 +0200 Subject: [PATCH 01/13] mcp: use github.com/uber-go/goleak to test for leaks in unit tests --- mcp/cmd_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index cbaadcb0..dfa750cf 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -18,6 +18,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/modelcontextprotocol/go-sdk/mcp" + "go.uber.org/goleak" ) const runAsServer = "_MCP_RUN_AS_SERVER" @@ -35,6 +36,8 @@ func SayHi(ctx context.Context, req *mcp.CallToolRequest, args SayHiParams) (*mc } func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) + // If the runAsServer variable is set, execute the relevant serverFunc // instead of running tests (aka the fork and exec trick). if name := os.Getenv(runAsServer); name != "" { From c2458f9a769b73239c6737cab3c95be5578e9734 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Thu, 18 Sep 2025 18:06:34 +0200 Subject: [PATCH 02/13] mcp: fix goroutine leaks in unit tests Fixes #489 --- go.mod | 1 + go.sum | 12 ++++++++++-- mcp/client_example_test.go | 9 +++++++-- mcp/cmd_test.go | 4 ++++ mcp/mcp_test.go | 29 ++++++++++------------------- mcp/server.go | 1 + mcp/streamable_example_test.go | 4 ++-- mcp/streamable_test.go | 20 ++++++++++++++++++-- mcp/transport.go | 9 ++++++++- mcp/transport_example_test.go | 9 +++++++-- mcp/transport_test.go | 2 ++ 11 files changed, 70 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 06896252..86c0d489 100644 --- a/go.mod +++ b/go.mod @@ -7,5 +7,6 @@ require ( github.com/google/go-cmp v0.7.0 github.com/google/jsonschema-go v0.2.3 github.com/yosida95/uritemplate/v3 v3.0.2 + go.uber.org/goleak v1.3.0 golang.org/x/tools v0.34.0 ) diff --git a/go.sum b/go.sum index 7ccdc6a8..eee260d0 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,20 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/jsonschema-go v0.2.3-0.20250911201137-bbdc431016d2 h1:IIj7X4SH1HKy0WfPR4nNEj4dhIJWGdXM5YoBAbfpdoo= -github.com/google/jsonschema-go v0.2.3-0.20250911201137-bbdc431016d2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= github.com/google/jsonschema-go v0.2.3 h1:dkP3B96OtZKKFvdrUSaDkL+YDx8Uw9uC4Y+eukpCnmM= github.com/google/jsonschema-go v0.2.3/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mcp/client_example_test.go b/mcp/client_example_test.go index 3c3c3837..2bcde569 100644 --- a/mcp/client_example_test.go +++ b/mcp/client_example_test.go @@ -42,12 +42,17 @@ func Example_roots() { // Connect the server and client... t1, t2 := mcp.NewInMemoryTransports() - if _, err := s.Connect(ctx, t1, nil); err != nil { + sess1, err := s.Connect(ctx, t1, nil) + if err != nil { log.Fatal(err) } - if _, err := c.Connect(ctx, t2, nil); err != nil { + defer sess1.Close() + + sess2, err := c.Connect(ctx, t2, nil) + if err != nil { log.Fatal(err) } + defer sess2.Close() // ...and add a root. The server is notified about the change. c.AddRoots(&mcp.Root{URI: "file://b"}) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index dfa750cf..4af35667 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -100,6 +100,8 @@ func TestServerRunContextCancel(t *testing.T) { if err != nil { t.Fatal(err) } + t.Cleanup(func() { session.Close() }) + if err := session.Ping(context.Background(), nil); err != nil { t.Fatal(err) } @@ -120,6 +122,7 @@ func TestServerRunContextCancel(t *testing.T) { } func TestServerInterrupt(t *testing.T) { + t.Skip() if runtime.GOOS == "windows" { t.Skip("requires POSIX signals") } @@ -205,6 +208,7 @@ func TestStdioContextCancellation(t *testing.T) { } func TestCmdTransport(t *testing.T) { + t.Skip() requireExec(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/mcp/mcp_test.go b/mcp/mcp_test.go index fa941bf0..7d13be8f 100644 --- a/mcp/mcp_test.go +++ b/mcp/mcp_test.go @@ -118,16 +118,6 @@ func TestEndToEnd(t *testing.T) { t.Errorf("after connection, Clients() has length %d, want 1", len(got)) } - // Wait for the server to exit after the client closes its connection. - var clientWG sync.WaitGroup - clientWG.Add(1) - go func() { - if err := ss.Wait(); err != nil { - t.Errorf("server failed: %v", err) - } - clientWG.Done() - }() - loggingMessages := make(chan *LoggingMessageParams, 100) // big enough for all logging opts := &ClientOptions{ CreateMessageHandler: func(context.Context, *CreateMessageRequest) (*CreateMessageResult, error) { @@ -474,6 +464,7 @@ func TestEndToEnd(t *testing.T) { }) t.Run("resource_subscriptions", func(t *testing.T) { + t.Skip("TODO") err := cs.Subscribe(ctx, &SubscribeParams{ URI: "test", }) @@ -518,7 +509,9 @@ func TestEndToEnd(t *testing.T) { // Disconnect. cs.Close() - clientWG.Wait() + if err := ss.Wait(); err != nil { + t.Errorf("server failed: %v", err) + } // After disconnecting, neither client nor server should have any // connections. @@ -620,6 +613,7 @@ func basicClientServerConnection(t *testing.T, client *Client, server *Server, c if err != nil { t.Fatal(err) } + t.Cleanup(func() { _ = ss.Close() }) if client == nil { client = NewClient(testImpl, nil) @@ -628,6 +622,8 @@ func basicClientServerConnection(t *testing.T, client *Client, server *Server, c if err != nil { t.Fatal(err) } + t.Cleanup(func() { _ = cs.Close() }) + return cs, ss } @@ -741,14 +737,7 @@ func TestMiddleware(t *testing.T) { t.Fatal(err) } // Wait for the server to exit after the client closes its connection. - var clientWG sync.WaitGroup - clientWG.Add(1) - go func() { - if err := ss.Wait(); err != nil { - t.Errorf("server failed: %v", err) - } - clientWG.Done() - }() + t.Cleanup(func() { _ = ss.Close() }) var sbuf, cbuf bytes.Buffer sbuf.WriteByte('\n') @@ -767,6 +756,8 @@ func TestMiddleware(t *testing.T) { if err != nil { t.Fatal(err) } + t.Cleanup(func() { _ = cs.Close() }) + if _, err := cs.ListTools(ctx, nil); err != nil { t.Fatal(err) } diff --git a/mcp/server.go b/mcp/server.go index fd38a89a..1986677a 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -713,6 +713,7 @@ func (s *Server) Run(ctx context.Context, t Transport) error { select { case <-ctx.Done(): ss.Close() + <-ssClosed // wait until waiting go routine above actually completes return ctx.Err() case err := <-ssClosed: return err diff --git a/mcp/streamable_example_test.go b/mcp/streamable_example_test.go index 430f2745..4656bbf7 100644 --- a/mcp/streamable_example_test.go +++ b/mcp/streamable_example_test.go @@ -26,7 +26,7 @@ func ExampleStreamableHTTPHandler() { server := mcp.NewServer(&mcp.Implementation{Name: "server", Version: "v0.1.0"}, nil) handler := mcp.NewStreamableHTTPHandler(func(r *http.Request) *mcp.Server { return server - }, &mcp.StreamableHTTPOptions{JSONResponse: true}) + }, &mcp.StreamableHTTPOptions{JSONResponse: true, Stateless: true}) httpServer := httptest.NewServer(handler) defer httpServer.Close() @@ -45,7 +45,7 @@ func ExampleStreamableHTTPHandler_middleware() { server := mcp.NewServer(&mcp.Implementation{Name: "server", Version: "v0.1.0"}, nil) handler := mcp.NewStreamableHTTPHandler(func(r *http.Request) *mcp.Server { return server - }, nil) + }, &mcp.StreamableHTTPOptions{Stateless: true}) loggingHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // Example debugging; you could also capture the response. body, err := io.ReadAll(req.Body) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 79e9645f..c0616abc 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -34,6 +34,7 @@ import ( ) func TestStreamableTransports(t *testing.T) { + t.Skip() // This test checks that the streamable server and client transports can // communicate. @@ -270,6 +271,7 @@ func TestStreamableServerShutdown(t *testing.T) { // uses a proxy that is killed and restarted to simulate a recoverable network // outage. func TestClientReplay(t *testing.T) { + t.Skip() for _, test := range []clientReplayTest{ {"default", 0, true}, {"no retries", -1, false}, @@ -460,14 +462,15 @@ func TestServerTransportCleanup(t *testing.T) { if err != nil { t.Fatalf("client.Connect() failed: %v", err) } - defer clientSession.Close() + t.Cleanup(func() { _ = clientSession.Close() }) } for _, ch := range chans { select { case <-ctx.Done(): t.Errorf("did not capture transport deletion event from all session in 10 seconds") - case <-ch: // Received transport deletion signal of this session + case <-ch: + t.Log("Received transport deletion signal of this session") } } @@ -1254,6 +1257,7 @@ func TestStreamableStateless(t *testing.T) { if err != nil { t.Fatal(err) } + t.Cleanup(func() { cs.Close() }) res, err := cs.CallTool(ctx, &CallToolParams{Name: "greet", Arguments: hiParams{Name: "bar"}}) if err != nil { t.Fatal(err) @@ -1420,4 +1424,16 @@ func TestStreamableGET(t *testing.T) { if got, want := resp.StatusCode, http.StatusOK; got != want { t.Errorf("GET with session ID: got status %d, want %d", got, want) } + + t.Log("Sending final DELETE request to close session and release resources") + del := newReq("DELETE", nil) + del.Header.Set(sessionIDHeader, sessionID) + resp, err = http.DefaultClient.Do(del) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if got, want := resp.StatusCode, http.StatusNoContent; got != want { + t.Errorf("DELETE with session ID: got status %d, want %d", got, want) + } } diff --git a/mcp/transport.go b/mcp/transport.go index d2109e7d..b9215b3b 100644 --- a/mcp/transport.go +++ b/mcp/transport.go @@ -284,7 +284,14 @@ func (r rwc) Write(p []byte) (n int, err error) { } func (r rwc) Close() error { - return errors.Join(r.rc.Close(), r.wc.Close()) + rcErr := r.rc.Close() + + var wcErr error + if r.wc != nil { + wcErr = r.wc.Close() + } + + return errors.Join(rcErr, wcErr) } // An ioConn is a transport that delimits messages with newlines across diff --git a/mcp/transport_example_test.go b/mcp/transport_example_test.go index 064ab0f2..7390ea4e 100644 --- a/mcp/transport_example_test.go +++ b/mcp/transport_example_test.go @@ -24,16 +24,21 @@ func ExampleLoggingTransport() { ctx := context.Background() t1, t2 := mcp.NewInMemoryTransports() server := mcp.NewServer(&mcp.Implementation{Name: "server", Version: "v0.0.1"}, nil) - if _, err := server.Connect(ctx, t1, nil); err != nil { + serverSession, err := server.Connect(ctx, t1, nil) + if err != nil { log.Fatal(err) } + defer serverSession.Close() client := mcp.NewClient(&mcp.Implementation{Name: "client", Version: "v0.0.1"}, nil) var b bytes.Buffer logTransport := &mcp.LoggingTransport{Transport: t2, Writer: &b} - if _, err := client.Connect(ctx, logTransport, nil); err != nil { + clientSession, err := client.Connect(ctx, logTransport, nil) + if err != nil { log.Fatal(err) } + defer clientSession.Close() + // Sort for stability: reads are concurrent to writes. for _, line := range slices.Sorted(strings.SplitSeq(b.String(), "\n")) { fmt.Println(line) diff --git a/mcp/transport_test.go b/mcp/transport_test.go index d40ce10f..aeff3663 100644 --- a/mcp/transport_test.go +++ b/mcp/transport_test.go @@ -25,6 +25,7 @@ func TestBatchFraming(t *testing.T) { r, w := io.Pipe() tport := newIOConn(rwc{r, w}) tport.outgoingBatch = make([]jsonrpc.Message, 0, 2) + t.Cleanup(func() { tport.Close() }) // Read the two messages into a channel, for easy testing later. read := make(chan jsonrpc.Message) @@ -100,6 +101,7 @@ func TestIOConnRead(t *testing.T) { tr := newIOConn(rwc{ rc: io.NopCloser(strings.NewReader(tt.input)), }) + t.Cleanup(func() { tr.Close() }) if tt.protocolVersion != "" { tr.sessionUpdated(ServerSessionState{ InitializeParams: &InitializeParams{ From c9ef5b8715310a88ed56f109c110f1aec09342bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Thu, 18 Sep 2025 20:27:32 +0200 Subject: [PATCH 03/13] mcp: fix more flaky tests --- mcp/cmd_test.go | 5 ++--- mcp/mcp_test.go | 1 - mcp/streamable_test.go | 6 ++---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 4af35667..4b970148 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -122,7 +122,6 @@ func TestServerRunContextCancel(t *testing.T) { } func TestServerInterrupt(t *testing.T) { - t.Skip() if runtime.GOOS == "windows" { t.Skip("requires POSIX signals") } @@ -134,10 +133,11 @@ func TestServerInterrupt(t *testing.T) { cmd := createServerCommand(t, "default") client := mcp.NewClient(testImpl, nil) - _, err := client.Connect(ctx, &mcp.CommandTransport{Command: cmd}, nil) + session, err := client.Connect(ctx, &mcp.CommandTransport{Command: cmd}, nil) if err != nil { t.Fatal(err) } + t.Cleanup(func() { session.Close() }) // get a signal when the server process exits onExit := make(chan struct{}) @@ -208,7 +208,6 @@ func TestStdioContextCancellation(t *testing.T) { } func TestCmdTransport(t *testing.T) { - t.Skip() requireExec(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/mcp/mcp_test.go b/mcp/mcp_test.go index 7d13be8f..2cf30f9b 100644 --- a/mcp/mcp_test.go +++ b/mcp/mcp_test.go @@ -464,7 +464,6 @@ func TestEndToEnd(t *testing.T) { }) t.Run("resource_subscriptions", func(t *testing.T) { - t.Skip("TODO") err := cs.Subscribe(ctx, &SubscribeParams{ URI: "test", }) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index c0616abc..885a0f6c 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -34,7 +34,6 @@ import ( ) func TestStreamableTransports(t *testing.T) { - t.Skip() // This test checks that the streamable server and client transports can // communicate. @@ -271,7 +270,6 @@ func TestStreamableServerShutdown(t *testing.T) { // uses a proxy that is killed and restarted to simulate a recoverable network // outage. func TestClientReplay(t *testing.T) { - t.Skip() for _, test := range []clientReplayTest{ {"default", 0, true}, {"no retries", -1, false}, @@ -344,7 +342,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) { if err != nil { t.Fatalf("client.Connect() failed: %v", err) } - defer clientSession.Close() + t.Cleanup(func() { clientSession.Close() }) var ( wg sync.WaitGroup @@ -386,7 +384,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) { restartedProxy := &http.Server{Handler: proxyHandler} go restartedProxy.Serve(listener) - defer restartedProxy.Close() + t.Cleanup(func() { restartedProxy.Close() }) wg.Wait() From 99ab514d526aa97acbc5eb4e2e13b751ca96bd6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Thu, 18 Sep 2025 22:32:35 +0200 Subject: [PATCH 04/13] mcp: fix more issues found in unit tests --- mcp/cmd_test.go | 37 ++++++++++++++++++------------------- mcp/streamable_test.go | 2 +- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 4b970148..9d1c8bba 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -36,8 +36,6 @@ func SayHi(ctx context.Context, req *mcp.CallToolRequest, args SayHiParams) (*mc } func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) - // If the runAsServer variable is set, execute the relevant serverFunc // instead of running tests (aka the fork and exec trick). if name := os.Getenv(runAsServer); name != "" { @@ -49,6 +47,8 @@ func TestMain(m *testing.M) { run() return } + + goleak.VerifyTestMain(m) os.Exit(m.Run()) } @@ -127,36 +127,35 @@ func TestServerInterrupt(t *testing.T) { } requireExec(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + t.Log("Starting server command") cmd := createServerCommand(t, "default") client := mcp.NewClient(testImpl, nil) + t.Log("Connecting to server") + + ctx := context.Background() session, err := client.Connect(ctx, &mcp.CommandTransport{Command: cmd}, nil) if err != nil { t.Fatal(err) } - t.Cleanup(func() { session.Close() }) - // get a signal when the server process exits - onExit := make(chan struct{}) - go func() { - cmd.Process.Wait() - close(onExit) - }() + _, err = session.ListTools(ctx, nil) + if err != nil { + t.Fatal(err) + } - // send a signal to the server process to terminate it + t.Log("Send a signal to the server process to terminate it") if err := cmd.Process.Signal(os.Interrupt); err != nil { t.Fatal(err) } - // wait for the server to exit - // TODO: use synctest when available - select { - case <-time.After(5 * time.Second): - t.Fatal("server did not exit after SIGINT") - case <-onExit: + t.Log("Closing client session so server can exit immediately") + session.Close() + + t.Log("Wait for process to terminate after interrupt signal") + _, err = cmd.Process.Wait() + if err == nil { + t.Errorf("unexpected error: %v", err) } } diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index dec30dc9..88f81ac5 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -316,7 +316,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) { }) realServer := httptest.NewServer(NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, nil)) - defer realServer.Close() + t.Cleanup(func() { realServer.Close() }) realServerURL, err := url.Parse(realServer.URL) if err != nil { t.Fatalf("Failed to parse real server URL: %v", err) From eec8eebb31fc3d8a428371e9c4156d1c1f4573c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Fri, 19 Sep 2025 08:50:30 +0200 Subject: [PATCH 05/13] mcp: close the mcpConn in the ServerSession --- mcp/server.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/mcp/server.go b/mcp/server.go index 04188a98..a4e0f843 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -10,6 +10,7 @@ import ( "encoding/base64" "encoding/gob" "encoding/json" + "errors" "fmt" "iter" "maps" @@ -1124,13 +1125,21 @@ func (ss *ServerSession) Close() error { // Close is idempotent and conn.Close() handles concurrent calls correctly ss.keepaliveCancel() } - err := ss.conn.Close() + + var connErr, mcpConnErr error + if err := ss.conn.Close(); err != nil { + connErr = fmt.Errorf("failed to close connection: %w", err) + } + + if err := ss.mcpConn.Close(); err != nil { + connErr = fmt.Errorf("failed to close mcp connection: %w", err) + } if ss.onClose != nil { ss.onClose() } - return err + return errors.Join(connErr, mcpConnErr) } // Wait waits for the connection to be closed by the client. From 2ff35bc78ad98d4c32697775641f745dcaf9e6b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Fri, 19 Sep 2025 09:56:00 +0200 Subject: [PATCH 06/13] mcp: debug TestClientReplay --- mcp/streamable_test.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 88f81ac5..95e7978f 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -316,7 +316,10 @@ func testClientReplay(t *testing.T, test clientReplayTest) { }) realServer := httptest.NewServer(NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, nil)) - t.Cleanup(func() { realServer.Close() }) + t.Cleanup(func() { + t.Log("Closing real HTTP server") + realServer.Close() + }) realServerURL, err := url.Parse(realServer.URL) if err != nil { t.Fatalf("Failed to parse real server URL: %v", err) @@ -342,21 +345,20 @@ func testClientReplay(t *testing.T, test clientReplayTest) { if err != nil { t.Fatalf("client.Connect() failed: %v", err) } - t.Cleanup(func() { clientSession.Close() }) + t.Cleanup(func() { + t.Log("Closing clientSession") + clientSession.Close() + }) - var ( - wg sync.WaitGroup - callErr error - ) - wg.Add(1) + toolCallResult := make(chan error, 1) go func() { - defer wg.Done() - _, callErr = clientSession.CallTool(ctx, &CallToolParams{Name: "multiMessageTool"}) + _, callErr := clientSession.CallTool(ctx, &CallToolParams{Name: "multiMessageTool"}) + toolCallResult <- callErr }() select { case <-serverReadyToKillProxy: - // Server has sent the first two messages and is paused. + t.Log("Server has sent the first two messages and is paused.") case <-ctx.Done(): t.Fatalf("Context timed out before server was ready to kill proxy") } @@ -386,7 +388,7 @@ func testClientReplay(t *testing.T, test clientReplayTest) { go restartedProxy.Serve(listener) t.Cleanup(func() { restartedProxy.Close() }) - wg.Wait() + callErr := <-toolCallResult if test.wantRecovered { // If we've recovered, we should get all 4 notifications and the tool call From 37423ef3f5da37c904fc3e29bc13b9b235c037d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Fri, 19 Sep 2025 11:03:28 +0200 Subject: [PATCH 07/13] Make leak checking opt-in --- mcp/cmd_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 9d1c8bba..75c4c2ea 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -7,6 +7,7 @@ package mcp_test import ( "context" "errors" + "flag" "log" "os" "os/exec" @@ -23,6 +24,9 @@ import ( const runAsServer = "_MCP_RUN_AS_SERVER" +// TODO: remove this flag and always check for goroutine leaks once issue with TestClientReplay is fixed +var leakCheck = flag.Bool("leak", false, "enable goroutine leak checking") + type SayHiParams struct { Name string `json:"name"` } @@ -48,7 +52,12 @@ func TestMain(m *testing.M) { return } - goleak.VerifyTestMain(m) + flag.Parse() + if *leakCheck { + goleak.VerifyTestMain(m) + return + } + os.Exit(m.Run()) } From 18d642bb491a5740be722431cca424d11cddecd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Fri, 19 Sep 2025 11:14:14 +0200 Subject: [PATCH 08/13] mcp: mention #499 in TODO note of leakCheck test flag --- mcp/cmd_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 75c4c2ea..16634bd5 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -24,7 +24,8 @@ import ( const runAsServer = "_MCP_RUN_AS_SERVER" -// TODO: remove this flag and always check for goroutine leaks once issue with TestClientReplay is fixed +// TODO: remove this flag and always check for goroutine leaks once +// . https://github.com/modelcontextprotocol/go-sdk/issues/499 is fixed var leakCheck = flag.Bool("leak", false, "enable goroutine leak checking") type SayHiParams struct { From 3b84ae345d7bf8b951f7574ed9437ab14b6bc33c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Fri, 19 Sep 2025 11:20:05 +0200 Subject: [PATCH 09/13] mcp: minor polishing of changed test code before review --- mcp/cmd_test.go | 5 ----- mcp/streamable_test.go | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 16634bd5..79c15445 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -149,11 +149,6 @@ func TestServerInterrupt(t *testing.T) { t.Fatal(err) } - _, err = session.ListTools(ctx, nil) - if err != nil { - t.Fatal(err) - } - t.Log("Send a signal to the server process to terminate it") if err := cmd.Process.Signal(os.Interrupt); err != nil { t.Fatal(err) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 95e7978f..3a34c161 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -470,7 +470,7 @@ func TestServerTransportCleanup(t *testing.T) { case <-ctx.Done(): t.Errorf("did not capture transport deletion event from all session in 10 seconds") case <-ch: - t.Log("Received transport deletion signal of this session") + t.Log("Received session transport deletion signal") } } From a43fb6ff32b6c00bcf015d8bc76be5f01f0a1b15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Mon, 6 Oct 2025 16:32:56 +0200 Subject: [PATCH 10/13] mcp: simplify ServerSession.Close() based on review feedback --- mcp/server.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/mcp/server.go b/mcp/server.go index a4e0f843..04188a98 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -10,7 +10,6 @@ import ( "encoding/base64" "encoding/gob" "encoding/json" - "errors" "fmt" "iter" "maps" @@ -1125,21 +1124,13 @@ func (ss *ServerSession) Close() error { // Close is idempotent and conn.Close() handles concurrent calls correctly ss.keepaliveCancel() } - - var connErr, mcpConnErr error - if err := ss.conn.Close(); err != nil { - connErr = fmt.Errorf("failed to close connection: %w", err) - } - - if err := ss.mcpConn.Close(); err != nil { - connErr = fmt.Errorf("failed to close mcp connection: %w", err) - } + err := ss.conn.Close() if ss.onClose != nil { ss.onClose() } - return errors.Join(connErr, mcpConnErr) + return err } // Wait waits for the connection to be closed by the client. From f41256c20656c0ba1de063cf13d12749234529b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Mon, 6 Oct 2025 16:43:59 +0200 Subject: [PATCH 11/13] mcp: add TODO, referencing #499 --- mcp/streamable_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 940057ca..5be9ac8d 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -269,6 +269,8 @@ func TestStreamableServerShutdown(t *testing.T) { // network failure and receive replayed messages (if replay is configured). It // uses a proxy that is killed and restarted to simulate a recoverable network // outage. +// +// TODO: Until we have a way to clean up abandoned sessions, this test will leak goroutines (see #499) func TestClientReplay(t *testing.T) { for _, test := range []clientReplayTest{ {"default", 0, true}, From 415ff5665186a860a3c7097d09e0b51ff36f5fc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Mon, 6 Oct 2025 16:46:35 +0200 Subject: [PATCH 12/13] mcp: add comment about nil write in unit tests --- mcp/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp/transport.go b/mcp/transport.go index 1fde5b74..75f5f8ea 100644 --- a/mcp/transport.go +++ b/mcp/transport.go @@ -303,7 +303,7 @@ func (r rwc) Close() error { rcErr := r.rc.Close() var wcErr error - if r.wc != nil { + if r.wc != nil { // we only allow a nil writer in unit tests wcErr = r.wc.Close() } From c29d3a7d132aa870405856bbd8d9c8865fafa9fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= <733004+fgrosse@users.noreply.github.com> Date: Wed, 15 Oct 2025 16:05:30 +0200 Subject: [PATCH 13/13] mcp: implement review feedback --- go.mod | 1 - go.sum | 10 ---------- mcp/client_example_test.go | 8 ++++---- mcp/cmd_test.go | 13 ------------- mcp/streamable_example_test.go | 3 ++- 5 files changed, 6 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index bf06a6c1..d5917a16 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/google/go-cmp v0.7.0 github.com/google/jsonschema-go v0.3.0 github.com/yosida95/uritemplate/v3 v3.0.2 - go.uber.org/goleak v1.3.0 golang.org/x/oauth2 v0.30.0 golang.org/x/tools v0.34.0 ) diff --git a/go.sum b/go.sum index ad70eb20..32ceedfe 100644 --- a/go.sum +++ b/go.sum @@ -1,22 +1,12 @@ -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/jsonschema-go v0.3.0 h1:6AH2TxVNtk3IlvkkhjrtbUc4S8AvO0Xii0DxIygDg+Q= github.com/google/jsonschema-go v0.3.0/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/mcp/client_example_test.go b/mcp/client_example_test.go index 411fd631..cc7146c2 100644 --- a/mcp/client_example_test.go +++ b/mcp/client_example_test.go @@ -42,17 +42,17 @@ func Example_roots() { // Connect the server and client... t1, t2 := mcp.NewInMemoryTransports() - sess1, err := s.Connect(ctx, t1, nil) + serverSession, err := s.Connect(ctx, t1, nil) if err != nil { log.Fatal(err) } - defer sess1.Close() + defer serverSession.Close() - sess2, err := c.Connect(ctx, t2, nil) + clientSession, err := c.Connect(ctx, t2, nil) if err != nil { log.Fatal(err) } - defer sess2.Close() + defer clientSession.Close() // ...and add a root. The server is notified about the change. c.AddRoots(&mcp.Root{URI: "file://b"}) diff --git a/mcp/cmd_test.go b/mcp/cmd_test.go index 79c15445..ebf6b592 100644 --- a/mcp/cmd_test.go +++ b/mcp/cmd_test.go @@ -7,7 +7,6 @@ package mcp_test import ( "context" "errors" - "flag" "log" "os" "os/exec" @@ -19,15 +18,10 @@ import ( "github.com/google/go-cmp/cmp" "github.com/modelcontextprotocol/go-sdk/mcp" - "go.uber.org/goleak" ) const runAsServer = "_MCP_RUN_AS_SERVER" -// TODO: remove this flag and always check for goroutine leaks once -// . https://github.com/modelcontextprotocol/go-sdk/issues/499 is fixed -var leakCheck = flag.Bool("leak", false, "enable goroutine leak checking") - type SayHiParams struct { Name string `json:"name"` } @@ -52,13 +46,6 @@ func TestMain(m *testing.M) { run() return } - - flag.Parse() - if *leakCheck { - goleak.VerifyTestMain(m) - return - } - os.Exit(m.Run()) } diff --git a/mcp/streamable_example_test.go b/mcp/streamable_example_test.go index 4656bbf7..f1cdf90a 100644 --- a/mcp/streamable_example_test.go +++ b/mcp/streamable_example_test.go @@ -18,6 +18,7 @@ import ( // !+streamablehandler +// TODO: Until we have a way to clean up abandoned sessions, this test will leak goroutines (see #499) func ExampleStreamableHTTPHandler() { // Create a new streamable handler, using the same MCP server for every request. // @@ -26,7 +27,7 @@ func ExampleStreamableHTTPHandler() { server := mcp.NewServer(&mcp.Implementation{Name: "server", Version: "v0.1.0"}, nil) handler := mcp.NewStreamableHTTPHandler(func(r *http.Request) *mcp.Server { return server - }, &mcp.StreamableHTTPOptions{JSONResponse: true, Stateless: true}) + }, &mcp.StreamableHTTPOptions{JSONResponse: true}) httpServer := httptest.NewServer(handler) defer httpServer.Close()