77//! | transport | client | server |
88//! |:-: |:-: |:-: |
99//! | std IO | [`child_process::TokioChildProcess`] | [`io::stdio`] |
10- //! | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::session::create_session`] |
10+ //! | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::StreamableHttpService`] |
1111//! | sse | [`sse_client::SseClientTransport`] | [`sse_server::SseServer`] |
1212//!
1313//!## Helper Transport Types
6464//! }
6565//! ```
6666
67+ use std:: sync:: Arc ;
68+
6769use crate :: service:: { RxJsonRpcMessage , ServiceRole , TxJsonRpcMessage } ;
6870
6971pub mod sink_stream;
@@ -122,7 +124,7 @@ pub use auth::{AuthError, AuthorizationManager, AuthorizationSession, Authorized
122124pub mod streamable_http_server;
123125#[ cfg( feature = "transport-streamable-http-server" ) ]
124126#[ cfg_attr( docsrs, doc( cfg( feature = "transport-streamable-http-server" ) ) ) ]
125- pub use streamable_http_server:: axum :: StreamableHttpServer ;
127+ pub use streamable_http_server:: tower :: { StreamableHttpServerConfig , StreamableHttpService } ;
126128
127129#[ cfg( feature = "transport-streamable-http-client" ) ]
128130#[ cfg_attr( docsrs, doc( cfg( feature = "transport-streamable-http-client" ) ) ) ]
@@ -138,7 +140,7 @@ pub trait Transport<R>: Send
138140where
139141 R : ServiceRole ,
140142{
141- type Error ;
143+ type Error : std :: error :: Error + Send + Sync + ' static ;
142144 /// Send a message to the transport
143145 ///
144146 /// Notice that the future returned by this function should be `Send` and `'static`.
@@ -169,9 +171,73 @@ impl<R, T, E> IntoTransport<R, E, TransportAdapterIdentity> for T
169171where
170172 T : Transport < R , Error = E > + Send + ' static ,
171173 R : ServiceRole ,
172- E : std:: error:: Error + Send + ' static ,
174+ E : std:: error:: Error + Send + Sync + ' static ,
173175{
174176 fn into_transport ( self ) -> impl Transport < R , Error = E > + ' static {
175177 self
176178 }
177179}
180+
181+ /// A transport that can send a single message and then close itself
182+ pub struct OneshotTransport < R >
183+ where
184+ R : ServiceRole ,
185+ {
186+ message : Option < RxJsonRpcMessage < R > > ,
187+ sender : tokio:: sync:: mpsc:: Sender < TxJsonRpcMessage < R > > ,
188+ finished_signal : Arc < tokio:: sync:: Notify > ,
189+ }
190+
191+ impl < R > OneshotTransport < R >
192+ where
193+ R : ServiceRole ,
194+ {
195+ pub fn new (
196+ message : RxJsonRpcMessage < R > ,
197+ ) -> ( Self , tokio:: sync:: mpsc:: Receiver < TxJsonRpcMessage < R > > ) {
198+ let ( sender, receiver) = tokio:: sync:: mpsc:: channel ( 16 ) ;
199+ (
200+ Self {
201+ message : Some ( message) ,
202+ sender,
203+ finished_signal : Arc :: new ( tokio:: sync:: Notify :: new ( ) ) ,
204+ } ,
205+ receiver,
206+ )
207+ }
208+ }
209+
210+ impl < R > Transport < R > for OneshotTransport < R >
211+ where
212+ R : ServiceRole ,
213+ {
214+ type Error = tokio:: sync:: mpsc:: error:: SendError < TxJsonRpcMessage < R > > ;
215+
216+ fn send (
217+ & mut self ,
218+ item : TxJsonRpcMessage < R > ,
219+ ) -> impl Future < Output = Result < ( ) , Self :: Error > > + Send + ' static {
220+ let sender = self . sender . clone ( ) ;
221+ let terminate = matches ! ( item, TxJsonRpcMessage :: <R >:: Response ( _) ) ;
222+ let signal = self . finished_signal . clone ( ) ;
223+ async move {
224+ sender. send ( item) . await ?;
225+ if terminate {
226+ signal. notify_waiters ( ) ;
227+ }
228+ Ok ( ( ) )
229+ }
230+ }
231+
232+ async fn receive ( & mut self ) -> Option < RxJsonRpcMessage < R > > {
233+ if self . message . is_none ( ) {
234+ self . finished_signal . notified ( ) . await ;
235+ }
236+ self . message . take ( )
237+ }
238+
239+ fn close ( & mut self ) -> impl Future < Output = Result < ( ) , Self :: Error > > + Send {
240+ self . message . take ( ) ;
241+ std:: future:: ready ( Ok ( ( ) ) )
242+ }
243+ }
0 commit comments