Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ Example:

This server implements the Model Context Protocol (MCP) which allows it to be easily integrated with LLM clients that support the protocol. For more information about MCP, visit [the MCP repository](https://github.com/modelcontextprotocol/mcp).

### VScode MCP, RooCode example

```json
// Roo Code, use bunx or npx, sessionId=
{
"mcpServers":{
"rust-crate-docs": {
"command": "bunx",
"args": [
"-y",
"mcp-remote@latest",
"http://127.0.0.1:3000/sse?sessionId=",
"--allow-http",
"--transport sse-only",
"--debug"
]
}
}
}
```


## License

MIT License
6 changes: 6 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
run:
cargo run --bin cratedocs http --address 0.0.0.0:3000 --debug

debug-mcp-remote:
# use bunx or npx to see how the mcp-remote proxy connects
bunx mcp-remote@latest "http://127.0.0.1:3000/sse" --allow-http --transport sse-only --debug
119 changes: 92 additions & 27 deletions src/transport/http_sse_server/http_sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,47 +52,112 @@ fn session_id() -> SessionId {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostEventQuery {
pub session_id: String,
#[serde(default)] // Use None if session_id is not present in query
pub session_id: Option<String>,
}

async fn post_event_handler(
State(app): State<App>,
Query(PostEventQuery { session_id }): Query<PostEventQuery>,
Query(query_params): Query<PostEventQuery>,
body: Body,
) -> Result<StatusCode, StatusCode> {
tracing::debug!(?query_params, "Received POST request");
const BODY_BYTES_LIMIT: usize = 1 << 22;
let write_stream = {
let rg = app.txs.read().await;
rg.get(session_id.as_str())
.ok_or(StatusCode::NOT_FOUND)?
.clone()
};
let mut write_stream = write_stream.lock().await;
let mut body = body.into_data_stream();
if let (_, Some(size)) = body.size_hint() {
if size > BODY_BYTES_LIMIT {
const BUFFER_SIZE: usize = 1 << 12; // For new sessions

let (session_id_arc, c2s_writer_for_body): (SessionId, C2SWriter) =
match query_params.session_id {
Some(id_str) => {
tracing::debug!(session_id = %id_str, "sessionId provided in query");
// Convert String to Arc<str> for map lookup
let session_arc: SessionId = Arc::from(id_str.as_str());
let rg = app.txs.read().await;
match rg.get(&session_arc) {
Some(writer) => {
tracing::debug!(session_id = %session_arc, "Found existing session writer");
(session_arc, writer.clone())
}
None => {
tracing::warn!(session_id = %session_arc, "sessionId provided but not found in active sessions");
return Err(StatusCode::NOT_FOUND);
}
}
}
None => {
tracing::info!("sessionId not provided, creating new session for POST request");
let new_session_id_arc = session_id(); // fn session_id() -> Arc<str>
tracing::info!(new_session_id = %new_session_id_arc, "Generated new session ID");

let (c2s_read, c2s_write_half) = tokio::io::simplex(BUFFER_SIZE);
// s2c_read/write are also needed for the ByteTransport and Server::run
// _s2c_read is not directly used by this POST handler but needed for the spawned server task.
let (_s2c_read, s2c_write_half) = tokio::io::simplex(BUFFER_SIZE);

let new_c2s_writer_for_map = Arc::new(Mutex::new(c2s_write_half));
app.txs
.write()
.await
.insert(new_session_id_arc.clone(), new_c2s_writer_for_map.clone());
tracing::info!(session_id = %new_session_id_arc, "Inserted new session writer into app.txs");

// Spawn the server task for the new session
let app_clone = app.clone();
let task_session_id = new_session_id_arc.clone();
tokio::spawn(async move {
let router = RouterService(DocRouter::new());
let server = Server::new(router);
let bytes_transport = ByteTransport::new(c2s_read, s2c_write_half);
tracing::info!(session_id = %task_session_id, "Spawning server task for new POST session");
let _result = server
.run(bytes_transport)
.await
.inspect_err(|e| {
tracing::error!(?e, session_id = %task_session_id, "Server run error for new POST session")
});
app_clone.txs.write().await.remove(&task_session_id);
tracing::info!(session_id = %task_session_id, "Cleaned up new POST session from app.txs after server task completion");
});
(new_session_id_arc, new_c2s_writer_for_map)
}
};

// Process the request body using c2s_writer_for_body
let mut write_stream_locked = c2s_writer_for_body.lock().await;
let mut body_data_stream = body.into_data_stream();

if let (_, Some(size_hint)) = body_data_stream.size_hint() {
if size_hint > BODY_BYTES_LIMIT {
tracing::warn!(%session_id_arc, body_size_hint = size_hint, limit = BODY_BYTES_LIMIT, "Payload too large based on hint");
return Err(StatusCode::PAYLOAD_TOO_LARGE);
}
}
// calculate the body size
let mut size = 0;
while let Some(chunk) = body.next().await {
let Ok(chunk) = chunk else {
return Err(StatusCode::BAD_REQUEST);

let mut actual_size = 0;
while let Some(chunk_result) = body_data_stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
tracing::error!(%session_id_arc, ?e, "Error reading chunk from body stream");
return Err(StatusCode::BAD_REQUEST);
}
};
size += chunk.len();
if size > BODY_BYTES_LIMIT {
actual_size += chunk.len();
if actual_size > BODY_BYTES_LIMIT {
tracing::warn!(%session_id_arc, actual_body_size = actual_size, limit = BODY_BYTES_LIMIT, "Payload too large during streaming");
return Err(StatusCode::PAYLOAD_TOO_LARGE);
}
write_stream
.write_all(&chunk)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if let Err(e) = write_stream_locked.write_all(&chunk).await {
tracing::error!(%session_id_arc, ?e, "Error writing chunk to session stream");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
write_stream
.write_u8(b'\n')
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

if let Err(e) = write_stream_locked.write_u8(b'\n').await {
tracing::error!(%session_id_arc, ?e, "Error writing newline to session stream");
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}

tracing::info!(%session_id_arc, "Successfully processed POST request body");
Ok(StatusCode::ACCEPTED)
}

Expand Down