From 05c18786cc7b1ad9bffd574b0fcc03c18e115529 Mon Sep 17 00:00:00 2001 From: Brian H Date: Fri, 30 May 2025 07:43:16 +0000 Subject: [PATCH] sessionId= should not be required --- README.md | 22 ++++ justfile | 6 + .../http_sse_server/http_sse_server.rs | 119 ++++++++++++++---- 3 files changed, 120 insertions(+), 27 deletions(-) create mode 100644 justfile diff --git a/README.md b/README.md index 2c90eea..38f4864 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,28 @@ Example: This server implements the Model Context Protocol (MCP) which allows it to be easily integrated with LLM clients that support the protocol. For more information about MCP, visit [the MCP repository](https://github.com/modelcontextprotocol/mcp). +### VScode MCP, RooCode example + +```json +// Roo Code, use bunx or npx, sessionId= +{ + "mcpServers":{ + "rust-crate-docs": { + "command": "bunx", + "args": [ + "-y", + "mcp-remote@latest", + "http://127.0.0.1:3000/sse?sessionId=", + "--allow-http", + "--transport sse-only", + "--debug" + ] + } + } +} +``` + + ## License MIT License diff --git a/justfile b/justfile new file mode 100644 index 0000000..0ca0d5c --- /dev/null +++ b/justfile @@ -0,0 +1,6 @@ +run: + cargo run --bin cratedocs http --address 0.0.0.0:3000 --debug + +debug-mcp-remote: + # use bunx or npx to see how the mcp-remote proxy connects + bunx mcp-remote@latest "http://127.0.0.1:3000/sse" --allow-http --transport sse-only --debug diff --git a/src/transport/http_sse_server/http_sse_server.rs b/src/transport/http_sse_server/http_sse_server.rs index 8d2d884..4811811 100644 --- a/src/transport/http_sse_server/http_sse_server.rs +++ b/src/transport/http_sse_server/http_sse_server.rs @@ -52,47 +52,112 @@ fn session_id() -> SessionId { #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct PostEventQuery { - pub session_id: String, + #[serde(default)] // Use None if session_id is not present in query + pub session_id: Option, } async fn post_event_handler( State(app): State, - Query(PostEventQuery { session_id }): Query, + Query(query_params): Query, body: Body, ) -> Result { + tracing::debug!(?query_params, "Received POST request"); const BODY_BYTES_LIMIT: usize = 1 << 22; - let write_stream = { - let rg = app.txs.read().await; - rg.get(session_id.as_str()) - .ok_or(StatusCode::NOT_FOUND)? - .clone() - }; - let mut write_stream = write_stream.lock().await; - let mut body = body.into_data_stream(); - if let (_, Some(size)) = body.size_hint() { - if size > BODY_BYTES_LIMIT { + const BUFFER_SIZE: usize = 1 << 12; // For new sessions + + let (session_id_arc, c2s_writer_for_body): (SessionId, C2SWriter) = + match query_params.session_id { + Some(id_str) => { + tracing::debug!(session_id = %id_str, "sessionId provided in query"); + // Convert String to Arc for map lookup + let session_arc: SessionId = Arc::from(id_str.as_str()); + let rg = app.txs.read().await; + match rg.get(&session_arc) { + Some(writer) => { + tracing::debug!(session_id = %session_arc, "Found existing session writer"); + (session_arc, writer.clone()) + } + None => { + tracing::warn!(session_id = %session_arc, "sessionId provided but not found in active sessions"); + return Err(StatusCode::NOT_FOUND); + } + } + } + None => { + tracing::info!("sessionId not provided, creating new session for POST request"); + let new_session_id_arc = session_id(); // fn session_id() -> Arc + tracing::info!(new_session_id = %new_session_id_arc, "Generated new session ID"); + + let (c2s_read, c2s_write_half) = tokio::io::simplex(BUFFER_SIZE); + // s2c_read/write are also needed for the ByteTransport and Server::run + // _s2c_read is not directly used by this POST handler but needed for the spawned server task. + let (_s2c_read, s2c_write_half) = tokio::io::simplex(BUFFER_SIZE); + + let new_c2s_writer_for_map = Arc::new(Mutex::new(c2s_write_half)); + app.txs + .write() + .await + .insert(new_session_id_arc.clone(), new_c2s_writer_for_map.clone()); + tracing::info!(session_id = %new_session_id_arc, "Inserted new session writer into app.txs"); + + // Spawn the server task for the new session + let app_clone = app.clone(); + let task_session_id = new_session_id_arc.clone(); + tokio::spawn(async move { + let router = RouterService(DocRouter::new()); + let server = Server::new(router); + let bytes_transport = ByteTransport::new(c2s_read, s2c_write_half); + tracing::info!(session_id = %task_session_id, "Spawning server task for new POST session"); + let _result = server + .run(bytes_transport) + .await + .inspect_err(|e| { + tracing::error!(?e, session_id = %task_session_id, "Server run error for new POST session") + }); + app_clone.txs.write().await.remove(&task_session_id); + tracing::info!(session_id = %task_session_id, "Cleaned up new POST session from app.txs after server task completion"); + }); + (new_session_id_arc, new_c2s_writer_for_map) + } + }; + + // Process the request body using c2s_writer_for_body + let mut write_stream_locked = c2s_writer_for_body.lock().await; + let mut body_data_stream = body.into_data_stream(); + + if let (_, Some(size_hint)) = body_data_stream.size_hint() { + if size_hint > BODY_BYTES_LIMIT { + tracing::warn!(%session_id_arc, body_size_hint = size_hint, limit = BODY_BYTES_LIMIT, "Payload too large based on hint"); return Err(StatusCode::PAYLOAD_TOO_LARGE); } } - // calculate the body size - let mut size = 0; - while let Some(chunk) = body.next().await { - let Ok(chunk) = chunk else { - return Err(StatusCode::BAD_REQUEST); + + let mut actual_size = 0; + while let Some(chunk_result) = body_data_stream.next().await { + let chunk = match chunk_result { + Ok(c) => c, + Err(e) => { + tracing::error!(%session_id_arc, ?e, "Error reading chunk from body stream"); + return Err(StatusCode::BAD_REQUEST); + } }; - size += chunk.len(); - if size > BODY_BYTES_LIMIT { + actual_size += chunk.len(); + if actual_size > BODY_BYTES_LIMIT { + tracing::warn!(%session_id_arc, actual_body_size = actual_size, limit = BODY_BYTES_LIMIT, "Payload too large during streaming"); return Err(StatusCode::PAYLOAD_TOO_LARGE); } - write_stream - .write_all(&chunk) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + if let Err(e) = write_stream_locked.write_all(&chunk).await { + tracing::error!(%session_id_arc, ?e, "Error writing chunk to session stream"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } } - write_stream - .write_u8(b'\n') - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + if let Err(e) = write_stream_locked.write_u8(b'\n').await { + tracing::error!(%session_id_arc, ?e, "Error writing newline to session stream"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + + tracing::info!(%session_id_arc, "Successfully processed POST request body"); Ok(StatusCode::ACCEPTED) }