Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai

### Quick start

Start a client in one line:
Start a client:

```rust, ignore
use rmcp::{ServiceExt, transport::TokioChildProcess};
use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = ().serve(
TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))?
).await?;
let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| {
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
}))?).await?;
Ok(())
}
```
Expand Down Expand Up @@ -92,7 +92,7 @@ let server = service.serve(transport).await?;
Once the server is initialized, you can send requests or notifications:

```rust, ignore
// request
// request
let roots = server.list_roots().await?;

// or send notification
Expand Down
8 changes: 7 additions & 1 deletion crates/rmcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ url = { version = "2.4", optional = true }
# For tower compatibility
tower-service = { version = "0.3", optional = true }

# for child process transport
process-wrap = { version = "8.2", features = ["tokio1"], optional = true}

# for ws transport
# tokio-tungstenite ={ version = "0.26", optional = true }
Expand Down Expand Up @@ -89,7 +91,11 @@ transport-streamable-http-client = [

transport-async-rw = ["tokio/io-util", "tokio-util/codec"]
transport-io = ["transport-async-rw", "tokio/io-std"]
transport-child-process = ["transport-async-rw", "tokio/process"]
transport-child-process = [
"transport-async-rw",
"tokio/process",
"dep:process-wrap",
]
transport-sse-server = [
"transport-async-rw",
"axum",
Expand Down
10 changes: 4 additions & 6 deletions crates/rmcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@
//!
//! ```rust
//! use anyhow::Result;
//! use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess};
//! use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
//! use tokio::process::Command;
//!
//! async fn client() -> Result<()> {
//! let service = ()
//! .serve(TokioChildProcess::new(
//! Command::new("uvx").arg("mcp-server-git"),
//! )?)
//! .await?;
//! let service = ().serve(TokioChildProcess::new(Command::new("uvx").configure(|cmd| {
//! cmd.arg("mcp-server-git");
//! }))?).await?;
//!
//! // Initialize
//! let server_info = service.peer_info();
Expand Down
2 changes: 1 addition & 1 deletion crates/rmcp/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub use worker::WorkerTransport;
pub mod child_process;
#[cfg(feature = "transport-child-process")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-child-process")))]
pub use child_process::TokioChildProcess;
pub use child_process::{ConfigureCommandExt, TokioChildProcess};

#[cfg(feature = "transport-io")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-io")))]
Expand Down
62 changes: 45 additions & 17 deletions crates/rmcp/src/transport/child_process.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
use tokio::{
io::AsyncRead,
process::{ChildStdin, ChildStdout},
Expand All @@ -7,29 +8,41 @@ use super::{IntoTransport, Transport};
use crate::service::ServiceRole;

pub(crate) fn child_process(
mut child: tokio::process::Child,
) -> std::io::Result<(tokio::process::Child, (ChildStdout, ChildStdin))> {
if child.stdin.is_none() {
return Err(std::io::Error::other("std in was taken"));
}
if child.stdout.is_none() {
return Err(std::io::Error::other("std out was taken"));
}
let child_stdin = child.stdin.take().expect("already checked");
let child_stdout = child.stdout.take().expect("already checked");
mut child: Box<dyn TokioChildWrapper>,
) -> std::io::Result<(Box<dyn TokioChildWrapper>, (ChildStdout, ChildStdin))> {
let child_stdin = match child.inner_mut().stdin().take() {
Some(stdin) => stdin,
None => return Err(std::io::Error::other("std in was taken")),
};
let child_stdout = match child.inner_mut().stdout().take() {
Some(stdout) => stdout,
None => return Err(std::io::Error::other("std out was taken")),
};
Ok((child, (child_stdout, child_stdin)))
}

pub struct TokioChildProcess {
child: tokio::process::Child,
child: ChildWithCleanup,
child_stdin: ChildStdin,
child_stdout: ChildStdout,
}

pub struct ChildWithCleanup {
inner: Box<dyn TokioChildWrapper>,
}

impl Drop for ChildWithCleanup {
fn drop(&mut self) {
if let Err(e) = self.inner.start_kill() {
tracing::warn!("Failed to kill child process: {e}");
}
}
}

// we hold the child process with stdout, for it's easier to implement AsyncRead
pin_project_lite::pin_project! {
pub struct TokioChildProcessOut {
child: tokio::process::Child,
child: ChildWithCleanup,
#[pin]
child_stdout: ChildStdout,
}
Expand All @@ -46,14 +59,18 @@ impl AsyncRead for TokioChildProcessOut {
}

impl TokioChildProcess {
pub fn new(child: &mut tokio::process::Command) -> std::io::Result<Self> {
child
.kill_on_drop(true)
pub fn new(mut command: tokio::process::Command) -> std::io::Result<Self> {
command
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped());
let (child, (child_stdout, child_stdin)) = child_process(child.spawn()?)?;
let mut command_wrap = TokioCommandWrap::from(command);
#[cfg(unix)]
command_wrap.wrap(process_wrap::tokio::ProcessGroup::leader());
#[cfg(windows)]
command_wrap.wrap(process_wrap::tokio::JobObject);
let (child, (child_stdout, child_stdin)) = child_process(command_wrap.spawn()?)?;
Ok(Self {
child,
child: ChildWithCleanup { inner: child },
child_stdin,
child_stdout,
})
Expand Down Expand Up @@ -82,3 +99,14 @@ impl<R: ServiceRole> IntoTransport<R, std::io::Error, ()> for TokioChildProcess
)
}
}

pub trait ConfigureCommandExt {
fn configure(self, f: impl FnOnce(&mut Self)) -> Self;
}

impl ConfigureCommandExt for tokio::process::Command {
fn configure(mut self, f: impl FnOnce(&mut Self)) -> Self {
f(&mut self);
self
}
}
9 changes: 5 additions & 4 deletions crates/rmcp/tests/test_with_js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rmcp::{
ServiceExt,
service::QuitReason,
transport::{
SseServer, StreamableHttpClientTransport, TokioChildProcess,
ConfigureCommandExt, SseServer, StreamableHttpClientTransport, TokioChildProcess,
streamable_http_server::axum::StreamableHttpServer,
},
};
Expand Down Expand Up @@ -59,9 +59,10 @@ async fn test_with_js_server() -> anyhow::Result<()> {
.spawn()?
.wait()
.await?;
let transport = TokioChildProcess::new(
tokio::process::Command::new("node").arg("tests/test_with_js/server.js"),
)?;
let transport =
TokioChildProcess::new(tokio::process::Command::new("node").configure(|cmd| {
cmd.arg("tests/test_with_js/server.js");
}))?;

let client = ().serve(transport).await?;
let resources = client.list_all_resources().await?;
Expand Down
10 changes: 4 additions & 6 deletions crates/rmcp/tests/test_with_python.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rmcp::{
ServiceExt,
transport::{SseServer, TokioChildProcess},
transport::{ConfigureCommandExt, SseServer, TokioChildProcess},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod common;
Expand Down Expand Up @@ -54,11 +54,9 @@ async fn test_with_python_server() -> anyhow::Result<()> {
.spawn()?
.wait()
.await?;
let transport = TokioChildProcess::new(
tokio::process::Command::new("uv")
.arg("run")
.arg("tests/test_with_python/server.py"),
)?;
let transport = TokioChildProcess::new(tokio::process::Command::new("uv").configure(|cmd| {
cmd.arg("run").arg("tests/test_with_python/server.py");
}))?;

let client = ().serve(transport).await?;
let resources = client.list_all_resources().await?;
Expand Down
10 changes: 5 additions & 5 deletions docs/readme/README.zh-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai
### 快速上手
一行代码启动客户端:
```rust
use rmcp::{ServiceExt, transport::TokioChildProcess};
use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
use tokio::process::Command;

let client = ().serve(
TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))?
).await?;
let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| {
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
}))?).await?;
```

#### 1. 构建传输层
Expand Down Expand Up @@ -63,7 +63,7 @@ let server = service.serve(transport).await?;
一旦服务初始化完成,你可以发送请求或通知:

```rust, ignore
// 请求
// 请求
let roots = server.list_roots().await?;

// 或发送通知
Expand Down
14 changes: 10 additions & 4 deletions examples/clients/src/collection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::collections::HashMap;

use anyhow::Result;
use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess};
use rmcp::{
model::CallToolRequestParam,
service::ServiceExt,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -20,9 +24,11 @@ async fn main() -> Result<()> {
for idx in 0..10 {
let service = ()
.into_dyn()
.serve(TokioChildProcess::new(
Command::new("uvx").arg("mcp-server-git"),
)?)
.serve(TokioChildProcess::new(Command::new("uvx").configure(
|cmd| {
cmd.arg("mcp-client-git");
},
))?)
.await?;
client_list.insert(idx, service);
}
Expand Down
12 changes: 6 additions & 6 deletions examples/clients/src/everything_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rmcp::{
ServiceExt,
model::{CallToolRequestParam, GetPromptRequestParam, ReadResourceRequestParam},
object,
transport::TokioChildProcess,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -21,11 +21,11 @@ async fn main() -> Result<()> {

// Start server
let service = ()
.serve(TokioChildProcess::new(
Command::new("npx")
.arg("-y")
.arg("@modelcontextprotocol/server-everything"),
)?)
.serve(TokioChildProcess::new(Command::new("npx").configure(
|cmd| {
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
},
))?)
.await?;

// Initialize
Expand Down
21 changes: 11 additions & 10 deletions examples/clients/src/std_io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use anyhow::Result;
use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess};
use rmcp::{
model::CallToolRequestParam,
service::ServiceExt,
transport::{ConfigureCommandExt, TokioChildProcess},
};
use tokio::process::Command;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -14,17 +18,14 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer())
.init();
let service = ()
.serve(TokioChildProcess::new(
Command::new("uvx").arg("mcp-server-git"),
)?)
.serve(TokioChildProcess::new(Command::new("uvx").configure(
|cmd| {
cmd.arg("mcp-server-git");
},
))?)
.await?;

// or
// serve_client(
// (),
// TokioChildProcess::new(Command::new("uvx").arg("mcp-server-git"))?,
// )
// .await?;
// or serve_client((), TokioChildProcess::new(cmd)?).await?;

// Initialize
let server_info = service.peer_info();
Expand Down
9 changes: 4 additions & 5 deletions examples/rig-integration/src/config/mcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, process::Stdio};

use rmcp::{RoleClient, ServiceExt, service::RunningService};
use rmcp::{RoleClient, ServiceExt, service::RunningService, transport::ConfigureCommandExt};
use serde::{Deserialize, Serialize};

use crate::mcp_adaptor::McpManager;
Expand Down Expand Up @@ -70,10 +70,9 @@ impl McpServerTransportConfig {
envs,
} => {
let transport = rmcp::transport::TokioChildProcess::new(
tokio::process::Command::new(command)
.args(args)
.envs(envs)
.stderr(Stdio::null()),
tokio::process::Command::new(command).configure(|cmd| {
cmd.args(args).envs(envs).stderr(Stdio::null());
}),
)?;
().serve(transport).await?
}
Expand Down
13 changes: 7 additions & 6 deletions examples/simple-chat-client/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, path::Path, process::Stdio};

use anyhow::Result;
use rmcp::{RoleClient, ServiceExt, service::RunningService};
use rmcp::{RoleClient, ServiceExt, service::RunningService, transport::ConfigureCommandExt};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -55,11 +55,12 @@ impl McpServerTransportConfig {
envs,
} => {
let transport = rmcp::transport::child_process::TokioChildProcess::new(
tokio::process::Command::new(command)
.args(args)
.envs(envs)
.stderr(Stdio::inherit())
.stdout(Stdio::inherit()),
tokio::process::Command::new(command).configure(|cmd| {
cmd.args(args)
.envs(envs)
.stderr(Stdio::inherit())
.stdout(Stdio::inherit());
}),
)?;
().serve(transport).await?
}
Expand Down
Loading