Skip to content

Conversation

@4t145
Copy link
Collaborator

@4t145 4t145 commented May 9, 2025

related issue: #170, #55

1. Refactor transport trait

Now we have a real transport trait:

pub trait Transport<R>: Send
where
    R: ServiceRole,
{
    type Error;
    fn send(
        &mut self,
        item: TxJsonRpcMessage<R>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static;
    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<R>>> + Send + '_;
    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

and this new transport trait is compatible with old Sink + Stream constrainment, with a thread shared sink.

pub struct SinkStreamTransport<Si, St> {
    stream: St,
    sink: Arc<Mutex<Si>>,
}

impl<Si, St> SinkStreamTransport<Si, St> {
    pub fn new(sink: Si, stream: St) -> Self {
        Self {
            stream,
            sink: Arc::new(Mutex::new(sink)),
        }
    }
}

impl<Role: ServiceRole, Si, St> Transport<Role> for SinkStreamTransport<Si, St>
where
    St: Send + Stream<Item = RxJsonRpcMessage<Role>> + Unpin,
    Si: Send + Sink<TxJsonRpcMessage<Role>> + Unpin + 'static,
{
    type Error = Si::Error;

    fn send(
        &mut self,
        item: TxJsonRpcMessage<Role>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
        use futures::SinkExt;
        let lock = self.sink.clone();
        async move {
            let mut write = lock.lock().await;
            write.send(item).await
        }
    }

    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<Role>>> + '_ {
        use futures::StreamExt;
        self.stream.next()
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }
}

Based on those changes, we can make serve_inner loop not blocked by sending request.

  1. New worker transport type:
    For the case those we need to create a new worker task to run the transport, there's a new trait Worker, you can implement Worker and hand it over to WorkerTransport
pub trait Worker: Sized + Send + 'static {
    type Error: std::error::Error + Send + Sync + 'static;
    type Role: ServiceRole;
    fn err_closed() -> Self::Error;
    fn err_join(e: tokio::task::JoinError) -> Self::Error;
    fn run(
        self,
        context: WorkerContext<Self>,
    ) -> impl Future<Output = Result<(), WorkerQuitReason>> + Send;
    fn config(&self) -> WorkerConfig {
        WorkerConfig::default()
    }
}

And with this new trait, I refactor many existed implementation based on it.

  1. Streamable http client
    Implemented streamable http client transport based on new worker.

  2. Auth client
    Trying to make auth client reusable between different http client.

  3. Add cfg-features for document

Motivation and Context

#170

How Has This Been Tested?

Breaking Changes

  1. Rename some feature:
    sse -> sse_client (it's more concret)

  2. Refactor the sse client transport, so do the construct methods.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

@4t145
Copy link
Collaborator Author

4t145 commented May 9, 2025

@jokemanfire I'am planing to extract the common part of reqwest client, so there may have the necessary to rewrite some part of current sse client, and then, both of streamable client and sse client can share the logic of Oauth2.

@jokemanfire
Copy link
Collaborator

Thanks, That's well.

@4t145
Copy link
Collaborator Author

4t145 commented May 12, 2025

I am going to refactor sse client in those aspects:

Cleint trait:

  1. move error from generic parameter to associated type (usually the error type should be a associated type)
  2. use impl Future as return type
pub trait NeoSseClient: Clone + Send + Sync + 'static  {
    type Error: std::error::Error + Send + Sync + 'static;
    fn post_message(
        &self,
        uri: Arc<str>,
        message: ClientJsonRpcMessage,
    ) -> impl Future<Output = Result<(), SseTransportError<Self::Error>>>
    + Send
    + '_;
    fn get_stream(
        &self,
        uri: Arc<str>,
        last_event_id: Option<String>,
    ) -> impl Future<
        Output = Result<
            BoxedSseResponse,
            SseTransportError<Self::Error>,
        >,
    > + Send
    + '_;
}

Move the connection logic to a worker

This can make code more maintainable and easier to understand.

Just like what I've done with this streamable http client in this PR.

Implement client trait directly for raw reqwest client

  1. The reason we have a wrapper at the first is we don't have a trait. And now we have a trait so we can remove the wrapper. (I guess)
  2. User can share the same configured client between diffent http based client transport types.

Make OAuth2 a common wrapper for different client.

@jokemanfire
Copy link
Collaborator

No problem.

@4t145
Copy link
Collaborator Author

4t145 commented May 14, 2025

As it‘s messioned in #170, There are a lot of refactor. But the changes in examples is minimal. So it should not be impact to users.

@4t145 4t145 requested a review from Copilot May 14, 2025 09:29
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a new streamable HTTP client feature while phasing out legacy SSE and IO transport implementations and unifying the transport interface. Key changes include:

  • Removal of the legacy SSE transport (sse.rs) and IO transport code.
  • Addition of a new sink_stream transport module and updates to common reqwest-based streamable HTTP client and SSE client modules.
  • Modifications in service and transport modules to use a unified Transport trait.

Reviewed Changes

Copilot reviewed 31 out of 31 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
crates/rmcp/src/transport/sse.rs Removed outdated SSE transport implementation.
crates/rmcp/src/transport/sink_stream.rs Introduced new sink_stream transport implementation.
crates/rmcp/src/transport/io.rs Removed legacy IO transport code.
crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs Added streamable HTTP client support using reqwest with SSE and JSON responses.
crates/rmcp/Cargo.toml Updated dependency features and configuration for streamable HTTP client and related modules.
(Other transport and service modules) Updated to adopt the new unified Transport trait and auth wrappers.
Comments suppressed due to low confidence (1)

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs:92

  • Setting the ACCEPT header twice may result in the first value being overwritten. Consider combining the MIME types into a single header using a comma-separated string.
let mut request = request.header(ACCEPT, EVENT_STREAM_MIME_TYPE).header(ACCEPT, JSON_MIME_TYPE);

@4t145 4t145 changed the title feat: Streamable http client refactor: Transport trait and worker transport, and streamable http client with those new features. May 14, 2025
@4t145
Copy link
Collaborator Author

4t145 commented May 14, 2025

It'a big refactor, please check it again @jokemanfire. And I just updated the introduce of this PR.

@4t145 4t145 requested a review from Copilot May 14, 2025 11:06
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the core transport abstraction to a new Transport trait, adds worker-based transports, and implements streamable HTTP and SSE clients (with optional OAuth2 via AuthClient).

  • Define and implement the new Transport<R> trait in place of raw Sink/Stream.
  • Add SinkStreamTransport, AsyncRwTransport, and WorkerTransport adapters.
  • Introduce streamable HTTP/SSE client modules and an AuthClient wrapper.

Reviewed Changes

Copilot reviewed 31 out of 31 changed files in this pull request and generated no comments.

Show a summary per file
File Description
crates/rmcp/src/transport/sink_stream.rs New SinkStreamTransport adapter implementing Transport
crates/rmcp/src/transport/async_rw.rs New AsyncRwTransport for AsyncRead/AsyncWrite under Transport
crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs New StreamableHttpClient impl for reqwest::Client
crates/rmcp/src/transport/common/reqwest/sse_client.rs New SSE client impl for reqwest::Client
crates/rmcp/src/transport/auth.rs Added AuthClient wrapper for OAuth2
crates/rmcp/src/transport.rs Updated Transport/IntoTransport traits and module exports
crates/rmcp/src/service/server.rs Refactored server to use Transport trait
crates/rmcp/src/service/client.rs Refactored client to use Transport trait
Comments suppressed due to low confidence (3)

crates/rmcp/src/transport/sink_stream.rs:1

  • The file defines methods returning impl Future<...> but does not import std::future::Future. Add use std::future::Future; to bring Future into scope.
use std::sync::Arc;

crates/rmcp/src/transport/auth.rs:16

  • The RwLock import is unused in this file. Consider removing it to avoid warnings and improve clarity.
use tokio::sync::{Mutex, RwLock};

crates/rmcp/src/transport/sink_stream.rs:66

  • [nitpick] The TransportAdapterAsyncCombinedRW enum is declared but never used. Remove or repurpose it to avoid dead code.
pub enum TransportAdapterAsyncCombinedRW {}

@jokemanfire
Copy link
Collaborator

I will check ,after this PR merger, I plan to include the example in the integration testing and further improve the documentation. I think we can release the first release version, but we need to agree that minor modifications should not include interface destructive changes, and only consider introducing it in the larger version. We also need to consider forward compatibility issues. What do you think?

@4t145
Copy link
Collaborator Author

4t145 commented May 15, 2025

@jokemanfire
I guess there could be possible breaking changes in transport part like web server support. And we still have a broken macro support, which could also be unstable for a while.

@4t145 4t145 force-pushed the streamable-http-client branch from 880ff03 to 89fa930 Compare May 15, 2025 09:40
@jokemanfire
Copy link
Collaborator

I will watch it after work tomorrow.

@4t145 4t145 marked this pull request as ready for review May 15, 2025 11:10
@4t145
Copy link
Collaborator Author

4t145 commented May 15, 2025

I think it's ready for review now. But still need to add an example to show how to use streamable http client.

@4t145 4t145 mentioned this pull request May 16, 2025
Copy link
Collaborator

@jokemanfire jokemanfire left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to keep watching tomorrow.

let next_sse = match event {
Event::Sse(Some(Ok(next_sse))) => next_sse,
Event::Sse(Some(Err(e))) => {
tracing::warn!("sse stream error: {e}");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it deserve a error level log.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as it's still trying to reconnect, it haven't reached an unrecoverable error, that's what I think.

let next_sse = match event {
Some(Ok(next_sse)) => next_sse,
Some(Err(e)) => {
tracing::warn!("sse stream error: {e}");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error level log

Self {
uri: "localhost".into(),
retry_config: SseRetryConfig::default(),
channel_buffer_capacity: 16,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it too small?

@jokemanfire
Copy link
Collaborator

jokemanfire commented May 17, 2025

Actually, for me, things like sse_client, sse_client streamable_tttp_client streamable_tttp_derver require a higher level of work async_rw, sink_stream, and so on are more low level. As they are called, I tend to separate these two parts in the code organization of transport for a fresher code structure. Thank you for completing such a large workload.

@4t145 4t145 force-pushed the streamable-http-client branch from 0576a27 to cc141b3 Compare May 17, 2025 10:32
@4t145
Copy link
Collaborator Author

4t145 commented May 17, 2025

@jokemanfire Now it's ready for review again

  1. I move the sse stream loop to common::client_side_sse
  2. Still remain the older interface for sse client, instead of use WorkerTransport
  3. For worker channel buffer size, I think 16 is enough for most case. Sending to transport and handling by handler both won't block the service loop both now, so there are little chance to really pending on the await point.

@4t145 4t145 requested a review from jokemanfire May 17, 2025 13:43
@4t145 4t145 merged commit 9a771fb into modelcontextprotocol:main May 17, 2025
9 checks passed
@gau-nernst gau-nernst mentioned this pull request May 19, 2025
3 tasks
@github-actions github-actions bot mentioned this pull request Jul 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants