Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type ServerOptions struct {
SubscribeHandler func(context.Context, *SubscribeParams) error
// Function called when a client session unsubscribes from a resource.
UnsubscribeHandler func(context.Context, *UnsubscribeParams) error
// If true, the Mcp-Session-Id header will not be sent or validated.
// This enables stateless operation where each request is independent.
Stateless bool
}

// NewServer creates a new MCP server. The resulting server has no features:
Expand Down Expand Up @@ -720,14 +723,23 @@ func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any,
ss.mu.Lock()
initialized := ss.initialized
ss.mu.Unlock()

var isStateless bool
if transport, ok := ss.mcpConn.(*StreamableServerTransport); ok {
isStateless = transport.id == ""
}

// From the spec:
// "The client SHOULD NOT send requests other than pings before the server
// has responded to the initialize request."
switch req.Method {
case "initialize", "ping":
default:
if !initialized {
return nil, fmt.Errorf("method %q is invalid during session initialization", req.Method)
// However, in stateless mode, each request is independent and doesn't require initialization.
if !isStateless {
switch req.Method {
case "initialize", "ping":
default:
if !initialized {
return nil, fmt.Errorf("method %q is invalid during session initialization", req.Method)
}
}
}
// For the streamable transport, we need the request ID to correlate
Expand Down
84 changes: 54 additions & 30 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,31 +98,50 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
return
}

server := h.getServer(req)
if server == nil {
// The getServer argument to NewStreamableHTTPHandler returned nil.
http.Error(w, "no server available", http.StatusBadRequest)
return
}

stateless := server.opts.Stateless

var session *StreamableServerTransport
if id := req.Header.Get(sessionIDHeader); id != "" {
h.sessionsMu.Lock()
session, _ = h.sessions[id]
h.sessionsMu.Unlock()
if session == nil {
http.Error(w, "session not found", http.StatusNotFound)
return

// Only validate and lookup sessions if not in stateless mode
if !stateless {
if id := req.Header.Get(sessionIDHeader); id != "" {
h.sessionsMu.Lock()
session, _ = h.sessions[id]
h.sessionsMu.Unlock()
if session == nil {
http.Error(w, "session not found", http.StatusNotFound)
return
}
}
}

// TODO(rfindley): simplify the locking so that each request has only one
// critical section.
if req.Method == http.MethodDelete {
if session == nil {
// => Mcp-Session-Id was not set; else we'd have returned NotFound above.
http.Error(w, "DELETE requires an Mcp-Session-Id header", http.StatusBadRequest)
// TODO(rfindley): simplify the locking so that each request has only one
// critical section.
if req.Method == http.MethodDelete {
if session == nil {
// => Mcp-Session-Id was not set; else we'd have returned NotFound above.
http.Error(w, "DELETE requires an Mcp-Session-Id header", http.StatusBadRequest)
return
}
h.sessionsMu.Lock()
delete(h.sessions, session.id)
h.sessionsMu.Unlock()
session.Close()
w.WriteHeader(http.StatusNoContent)
return
}
} else {
// In stateless mode, DELETE method doesn't make sense since there are no persistent sessions
if req.Method == http.MethodDelete {
http.Error(w, "DELETE not supported in stateless mode", http.StatusMethodNotAllowed)
return
}
h.sessionsMu.Lock()
delete(h.sessions, session.id)
h.sessionsMu.Unlock()
session.Close()
w.WriteHeader(http.StatusNoContent)
return
}

switch req.Method {
Expand All @@ -134,23 +153,26 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
}

if session == nil {
s := NewStreamableServerTransport(randText(), nil)
server := h.getServer(req)
if server == nil {
// The getServer argument to NewStreamableHTTPHandler returned nil.
http.Error(w, "no server available", http.StatusBadRequest)
return
var sessionID = ""
if !stateless {
sessionID = randText()
}

s := NewStreamableServerTransport(sessionID, nil)
// Pass req.Context() here, to allow middleware to add context values.
// The context is detached in the jsonrpc2 library when handling the
// long-running stream.
if _, err := server.Connect(req.Context(), s); err != nil {
http.Error(w, "failed connection", http.StatusInternalServerError)
return
}
h.sessionsMu.Lock()
h.sessions[s.id] = s
h.sessionsMu.Unlock()

// Only store the session if not in stateless mode
if !stateless {
h.sessionsMu.Lock()
h.sessions[s.id] = s
h.sessionsMu.Unlock()
}
session = s
}

Expand Down Expand Up @@ -437,7 +459,9 @@ func (t *StreamableServerTransport) streamResponse(stream *stream, w http.Respon
return true
}

w.Header().Set(sessionIDHeader, t.id)
if t.id != "" {
w.Header().Set(sessionIDHeader, t.id)
}
w.Header().Set("Content-Type", "text/event-stream") // Accept checked in [StreamableHTTPHandler]
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
Expand Down
115 changes: 115 additions & 0 deletions mcp/streamable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,118 @@ func TestEventID(t *testing.T) {
})
}
}

func TestStreamableStateless(t *testing.T) {
// Test stateless mode behavior
ctx := context.Background()

// Create a server with stateless mode enabled
statelessServer := NewServer(testImpl, &ServerOptions{Stateless: true})
AddTool(statelessServer, &Tool{Name: "greet", Description: "say hi"}, sayHi)

// Create a regular server for comparison
regularServer := NewServer(testImpl, nil)
AddTool(regularServer, &Tool{Name: "greet", Description: "say hi"}, sayHi)

// Test stateless server
t.Run("stateless_server", func(t *testing.T) {
handler := NewStreamableHTTPHandler(func(*http.Request) *Server { return statelessServer }, nil)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()

// Verify we can call tools/list directly without initialization in stateless mode
req, err := http.NewRequestWithContext(ctx, http.MethodPost, httpServer.URL,
strings.NewReader(`{"jsonrpc":"2.0","method":"tools/list","id":1,"params":{}}`))
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

// Verify that no session ID header is returned in stateless mode
sessionID := resp.Header.Get(sessionIDHeader)
if sessionID != "" {
t.Errorf("Expected no session ID header in stateless mode, got: %s", sessionID)
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
t.Errorf("Expected successful response in stateless mode, got status: %d", resp.StatusCode)
}

// Verify we can make another request without session ID
req2, err := http.NewRequestWithContext(ctx, http.MethodPost, httpServer.URL,
strings.NewReader(`{"jsonrpc":"2.0","method":"tools/call","id":2,"params":{"name":"greet","arguments":{"name":"World"}}}`))
if err != nil {
t.Fatal(err)
}
req2.Header.Set("Content-Type", "application/json")
req2.Header.Set("Accept", "application/json, text/event-stream")

resp2, err := http.DefaultClient.Do(req2)
if err != nil {
t.Fatal(err)
}
defer resp2.Body.Close()

if resp2.StatusCode != http.StatusOK && resp2.StatusCode != http.StatusAccepted {
t.Errorf("Expected successful response for tool call in stateless mode, got status: %d", resp2.StatusCode)
}
})

// Test regular server
t.Run("regular_server", func(t *testing.T) {
handler := NewStreamableHTTPHandler(func(*http.Request) *Server { return regularServer }, nil)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()

// Create a request to the regular server
req, err := http.NewRequestWithContext(ctx, http.MethodPost, httpServer.URL,
strings.NewReader(`{"jsonrpc":"2.0","method":"initialize","id":1,"params":{"protocolVersion":"2025-06-18","clientInfo":{"name":"test","version":"1.0"}}}`))
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

// Verify that session ID header is returned in regular mode
sessionID := resp.Header.Get(sessionIDHeader)
if sessionID == "" {
t.Error("Expected session ID header in regular mode, got empty string")
}
})

// Test DELETE method rejection in stateless mode
t.Run("delete_rejected_in_stateless", func(t *testing.T) {
handler := NewStreamableHTTPHandler(func(*http.Request) *Server { return statelessServer }, nil)
httpServer := httptest.NewServer(handler)
defer httpServer.Close()

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, httpServer.URL, nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "application/json, text/event-stream")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusMethodNotAllowed {
t.Errorf("Expected 405 Method Not Allowed for DELETE in stateless mode, got: %d", resp.StatusCode)
}
})
}
Loading