From f2df88e045d0b1cc13b1343b2d682315d568ce33 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Tue, 24 Jun 2025 12:12:27 +0800 Subject: [PATCH] fix: cancellable initialization process --- crates/rmcp/src/service/client.rs | 21 +++++++++++++++++++++ crates/rmcp/src/service/server.rs | 21 +++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/crates/rmcp/src/service/client.rs b/crates/rmcp/src/service/client.rs index 53a3e203..3de4609c 100644 --- a/crates/rmcp/src/service/client.rs +++ b/crates/rmcp/src/service/client.rs @@ -39,6 +39,9 @@ pub enum ClientInitializeError { error: E, context: Cow<'static, str>, }, + + #[error("Cancelled")] + Cancelled, } /// Helper function to get the next message from the stream @@ -121,6 +124,24 @@ pub async fn serve_client_with_ct( transport: T, ct: CancellationToken, ) -> Result, ClientInitializeError> +where + S: Service, + T: IntoTransport, + E: std::error::Error + Send + Sync + 'static, +{ + tokio::select! { + result = serve_client_with_ct_inner(service, transport, ct.clone()) => { result } + _ = ct.cancelled() => { + Err(ClientInitializeError::Cancelled) + } + } +} + +async fn serve_client_with_ct_inner( + service: S, + transport: T, + ct: CancellationToken, +) -> Result, ClientInitializeError> where S: Service, T: IntoTransport, diff --git a/crates/rmcp/src/service/server.rs b/crates/rmcp/src/service/server.rs index 6f585a61..b43bdb94 100644 --- a/crates/rmcp/src/service/server.rs +++ b/crates/rmcp/src/service/server.rs @@ -58,6 +58,9 @@ pub enum ServerInitializeError { error: E, context: Cow<'static, str>, }, + + #[error("Cancelled")] + Cancelled, } pub type ClientSink = Peer; @@ -140,6 +143,24 @@ pub async fn serve_server_with_ct( transport: T, ct: CancellationToken, ) -> Result, ServerInitializeError> +where + S: Service, + T: IntoTransport, + E: std::error::Error + Send + Sync + 'static, +{ + tokio::select! { + result = serve_server_with_ct_inner(service, transport, ct.clone()) => { result } + _ = ct.cancelled() => { + Err(ServerInitializeError::Cancelled) + } + } +} + +async fn serve_server_with_ct_inner( + service: S, + transport: T, + ct: CancellationToken, +) -> Result, ServerInitializeError> where S: Service, T: IntoTransport,