@@ -17,10 +17,11 @@ use iroh::{
17
17
use n0_future:: { MaybeFuture , boxed:: BoxFuture } ;
18
18
use snafu:: Snafu ;
19
19
use tokio:: {
20
- sync:: mpsc,
20
+ sync:: { mpsc, oneshot } ,
21
21
task:: { JoinError , JoinSet } ,
22
22
} ;
23
23
use tokio_util:: time:: FutureExt ;
24
+ use tracing:: { debug, error, trace} ;
24
25
25
26
/// Configuration options for the connection pool
26
27
#[ derive( Debug , Clone , Copy ) ]
@@ -47,24 +48,36 @@ struct Context {
47
48
alpn : Vec < u8 > ,
48
49
}
49
50
50
- type BoxedHandler =
51
- Box < dyn FnOnce ( & PoolConnectResult ) -> BoxFuture < ExecuteResult > + Send + ' static > ;
51
+ type BoxedHandler = Box < dyn FnOnce ( PoolConnectResult ) -> BoxFuture < ExecuteResult > + Send + ' static > ;
52
52
53
53
/// Error when a connection can not be acquired
54
54
///
55
55
/// This includes the normal iroh connection errors as well as pool specific
56
56
/// errors such as timeouts and connection limits.
57
+ #[ derive( Debug , Clone ) ]
57
58
pub enum PoolConnectError {
58
59
/// Timeout during connect
59
60
Timeout ,
60
61
/// Too many connections
61
62
TooManyConnections ,
62
63
/// Error during connect
63
- ConnectError ( ConnectError ) ,
64
+ ConnectError ( Arc < ConnectError > ) ,
64
65
/// Error during last execute
65
- ExecuteError ( ExecuteError ) ,
66
+ ExecuteError ( Arc < ExecuteError > ) ,
66
67
/// Handler actor panicked
67
- JoinError ( JoinError ) ,
68
+ JoinError ( Arc < JoinError > ) ,
69
+ }
70
+
71
+ impl std:: fmt:: Display for PoolConnectError {
72
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
73
+ match self {
74
+ PoolConnectError :: Timeout => write ! ( f, "Connection timed out" ) ,
75
+ PoolConnectError :: TooManyConnections => write ! ( f, "Too many connections" ) ,
76
+ PoolConnectError :: ConnectError ( e) => write ! ( f, "Connection error: {}" , e) ,
77
+ PoolConnectError :: ExecuteError ( e) => write ! ( f, "Execution error: {}" , e) ,
78
+ PoolConnectError :: JoinError ( e) => write ! ( f, "Join error: {}" , e) ,
79
+ }
80
+ }
68
81
}
69
82
70
83
pub type PoolConnectResult = std:: result:: Result < Connection , PoolConnectError > ;
@@ -88,11 +101,14 @@ async fn run_connection_actor(
88
101
. await
89
102
{
90
103
Ok ( Ok ( conn) ) => Ok ( conn) ,
91
- Ok ( Err ( e) ) => Err ( PoolConnectError :: ConnectError ( e ) ) ,
104
+ Ok ( Err ( e) ) => Err ( PoolConnectError :: ConnectError ( Arc :: new ( e ) ) ) ,
92
105
Err ( _) => Err ( PoolConnectError :: Timeout ) ,
93
106
} ;
94
- if state. is_err ( ) && context. owner . close ( node_id) . await . is_err ( ) {
95
- return ;
107
+ if let Err ( e) = & state {
108
+ debug ! ( %node_id, "Failed to connect {e:?}, requesting shutdown" ) ;
109
+ if context. owner . close ( node_id) . await . is_err ( ) {
110
+ return ;
111
+ }
96
112
}
97
113
98
114
let mut tasks = JoinSet :: new ( ) ;
@@ -107,9 +123,10 @@ async fn run_connection_actor(
107
123
handler = rx. recv( ) => {
108
124
match handler {
109
125
Some ( handler) => {
126
+ trace!( %node_id, "Received new task" ) ;
110
127
// clear the idle timer
111
128
idle_timer. as_mut( ) . set_none( ) ;
112
- tasks. spawn( handler( & state) ) ;
129
+ tasks. spawn( handler( state. clone ( ) ) ) ;
113
130
}
114
131
None => {
115
132
// Channel closed - finish remaining tasks and exit
@@ -122,22 +139,22 @@ async fn run_connection_actor(
122
139
Some ( task_result) = tasks. join_next( ) , if !tasks. is_empty( ) => {
123
140
match task_result {
124
141
Ok ( Ok ( ( ) ) ) => {
125
- tracing :: debug!( "Task completed for node {}" , node_id ) ;
142
+ debug!( %node_id , "Task completed" ) ;
126
143
}
127
144
Ok ( Err ( e) ) => {
128
- tracing :: error!( "Task failed for node {} : {}" , node_id , e) ;
145
+ error!( %node_id , "Task failed: {}" , e) ;
129
146
if let Ok ( conn) = state {
130
147
conn. close( 1u32 . into( ) , b"error" ) ;
131
148
}
132
- state = Err ( PoolConnectError :: ExecuteError ( e ) ) ;
149
+ state = Err ( PoolConnectError :: ExecuteError ( Arc :: new ( e ) ) ) ;
133
150
let _ = context. owner. close( node_id) . await ;
134
151
}
135
152
Err ( e) => {
136
- tracing :: error!( "Task panicked for node {} : {}" , node_id , e) ;
153
+ error!( %node_id , "Task panicked: {}" , e) ;
137
154
if let Ok ( conn) = state {
138
155
conn. close( 1u32 . into( ) , b"panic" ) ;
139
156
}
140
- state = Err ( PoolConnectError :: JoinError ( e ) ) ;
157
+ state = Err ( PoolConnectError :: JoinError ( Arc :: new ( e ) ) ) ;
141
158
let _ = context. owner. close( node_id) . await ;
142
159
}
143
160
}
@@ -155,8 +172,7 @@ async fn run_connection_actor(
155
172
156
173
// Idle timeout - request shutdown
157
174
_ = & mut idle_timer => {
158
- tracing:: debug!( "Connection to {} idle, requesting shutdown" , node_id) ;
159
-
175
+ debug!( %node_id, "Connection idle, requesting shutdown" ) ;
160
176
context. owner. close( node_id) . await . ok( ) ;
161
177
// Don't break here - wait for main actor to close our channel
162
178
}
@@ -166,15 +182,15 @@ async fn run_connection_actor(
166
182
// Wait for remaining tasks to complete
167
183
while let Some ( task_result) = tasks. join_next ( ) . await {
168
184
if let Err ( e) = task_result {
169
- tracing :: error!( "Task failed during shutdown for node {} : {}" , node_id , e) ;
185
+ error ! ( %node_id , "Task failed during shutdown: {}" , e) ;
170
186
}
171
187
}
172
188
173
189
if let Ok ( connection) = & state {
174
190
connection. close ( 0u32 . into ( ) , b"idle" ) ;
175
191
}
176
192
177
- tracing :: debug!( "Connection actor for {} shutting down" , node_id ) ;
193
+ debug ! ( %node_id , "Connection actor shutting down" ) ;
178
194
}
179
195
180
196
struct Actor {
@@ -224,7 +240,7 @@ impl Actor {
224
240
225
241
// No connection actor or it died - spawn a new one
226
242
if self . connections . len ( ) >= self . context . options . max_connections {
227
- handler ( & Err ( PoolConnectError :: TooManyConnections ) )
243
+ handler ( Err ( PoolConnectError :: TooManyConnections ) )
228
244
. await
229
245
. ok ( ) ;
230
246
continue ;
@@ -273,7 +289,14 @@ pub struct ExecuteError;
273
289
274
290
type ExecuteResult = std:: result:: Result < ( ) , ExecuteError > ;
275
291
292
+ impl From < PoolConnectError > for ExecuteError {
293
+ fn from ( _: PoolConnectError ) -> Self {
294
+ ExecuteError
295
+ }
296
+ }
297
+
276
298
/// A connection pool
299
+ #[ derive( Debug , Clone ) ]
277
300
pub struct ConnectionPool {
278
301
tx : mpsc:: Sender < ActorMessage > ,
279
302
}
@@ -301,11 +324,11 @@ impl ConnectionPool {
301
324
f : F ,
302
325
) -> std:: result:: Result < ( ) , ConnectionPoolError >
303
326
where
304
- F : FnOnce ( & PoolConnectResult ) -> Fut + Send + ' static ,
305
- Fut : std :: future :: Future < Output = ExecuteResult > + Send + ' static ,
327
+ F : FnOnce ( PoolConnectResult ) -> Fut + Send + ' static ,
328
+ Fut : Future < Output = ExecuteResult > + Send + ' static ,
306
329
{
307
330
let handler =
308
- Box :: new ( move |conn : & PoolConnectResult | Box :: pin ( f ( conn) ) as BoxFuture < ExecuteResult > ) ;
331
+ Box :: new ( move |conn : PoolConnectResult | Box :: pin ( f ( conn) ) as BoxFuture < ExecuteResult > ) ;
309
332
310
333
self . tx
311
334
. send ( ActorMessage :: Handle { id, handler } )
@@ -315,6 +338,37 @@ impl ConnectionPool {
315
338
Ok ( ( ) )
316
339
}
317
340
341
+ pub async fn with_connection < F , Fut , I , E > (
342
+ & self ,
343
+ id : NodeId ,
344
+ f : F ,
345
+ ) -> Result < Result < Result < I , E > , PoolConnectError > , ConnectionPoolError >
346
+ where
347
+ F : FnOnce ( Connection ) -> Fut + Send + ' static ,
348
+ Fut : Future < Output = Result < I , E > > + Send + ' static ,
349
+ I : Send + ' static ,
350
+ E : Send + ' static ,
351
+ {
352
+ let ( tx, rx) = oneshot:: channel ( ) ;
353
+ self . connect ( id, |conn| async move {
354
+ let ( res, ret) = match conn {
355
+ Ok ( connection) => {
356
+ let res = f ( connection) . await ;
357
+ let ret = match & res {
358
+ Ok ( _) => Ok ( ( ) ) ,
359
+ Err ( _) => Err ( ExecuteError ) ,
360
+ } ;
361
+ ( Ok ( res) , ret)
362
+ }
363
+ Err ( e) => ( Err ( e) , Err ( ExecuteError ) ) ,
364
+ } ;
365
+ tx. send ( res) . ok ( ) ;
366
+ ret
367
+ } )
368
+ . await ?;
369
+ rx. await . map_err ( |_| ConnectionPoolError :: Shutdown )
370
+ }
371
+
318
372
/// Close an existing connection, if it exists
319
373
///
320
374
/// This will finish pending tasks and close the connection. New tasks will
0 commit comments