@@ -51,6 +51,12 @@ pub(super) struct Prioritize {
5151
5252 /// What `DATA` frame is currently being sent in the codec.
5353 in_flight_data_frame : InFlightData ,
54+
55+ /// The max send buffer size allowed.
56+ max_send_buffer_size : usize ,
57+
58+ /// The current send buffer size.
59+ current_send_buffer_size : usize ,
5460}
5561
5662#[ derive( Debug , Eq , PartialEq ) ]
@@ -93,9 +99,17 @@ impl Prioritize {
9399 flow,
94100 last_opened_id : StreamId :: ZERO ,
95101 in_flight_data_frame : InFlightData :: Nothing ,
102+ max_send_buffer_size : usize:: MAX ,
103+ current_send_buffer_size : 0 ,
96104 }
97105 }
98106
107+ pub fn set_max_send_buffer_size ( & mut self , max : usize , store : & mut Store , counts : & mut Counts ) {
108+ self . max_send_buffer_size = max;
109+
110+ self . assign_connection_capacity ( 0 , store, counts) ;
111+ }
112+
99113 /// Queue a frame to be sent to the remote
100114 pub fn queue_frame < B > (
101115 & mut self ,
@@ -175,6 +189,8 @@ impl Prioritize {
175189 self . try_assign_capacity ( stream) ;
176190 }
177191
192+ self . current_send_buffer_size += sz as usize ;
193+
178194 if frame. is_end_stream ( ) {
179195 stream. state . send_close ( ) ;
180196 self . reserve_capacity ( 0 , stream, counts) ;
@@ -350,7 +366,7 @@ impl Prioritize {
350366 self . flow . assign_capacity ( inc) ;
351367
352368 // Assign newly acquired capacity to streams pending capacity.
353- while self . flow . available ( ) > 0 {
369+ while self . available ( ) > 0 {
354370 let stream = match self . pending_capacity . pop ( store) {
355371 Some ( stream) => stream,
356372 None => return ,
@@ -373,6 +389,17 @@ impl Prioritize {
373389 }
374390 }
375391
392+ fn available ( & self ) -> WindowSize {
393+ cmp:: min (
394+ self . flow . available ( ) . as_size ( ) as usize ,
395+ cmp:: min (
396+ self . max_send_buffer_size
397+ . saturating_sub ( self . current_send_buffer_size ) ,
398+ WindowSize :: MAX as usize ,
399+ ) ,
400+ ) as WindowSize
401+ }
402+
376403 /// Request capacity to send data
377404 fn try_assign_capacity ( & mut self , stream : & mut store:: Ptr ) {
378405 let total_requested = stream. requested_send_capacity ;
@@ -395,7 +422,8 @@ impl Prioritize {
395422 additional,
396423 buffered = stream. buffered_send_data,
397424 window = stream. send_flow. window_size( ) ,
398- conn = %self . flow. available( )
425+ conn_window = %self . flow. available( ) ,
426+ conn = self . available( ) ,
399427 ) ;
400428
401429 if additional == 0 {
@@ -413,7 +441,7 @@ impl Prioritize {
413441 ) ;
414442
415443 // The amount of currently available capacity on the connection
416- let conn_available = self . flow . available ( ) . as_size ( ) ;
444+ let conn_available = self . available ( ) ;
417445
418446 // First check if capacity is immediately available
419447 if conn_available > 0 {
@@ -509,6 +537,10 @@ impl Prioritize {
509537
510538 // Because, always try to reclaim...
511539 self . reclaim_frame ( buffer, store, dst) ;
540+
541+ // Maybe schedule streams if the send buffer is not full
542+ // anymore.
543+ self . assign_connection_capacity ( 0 , store, counts) ;
512544 }
513545 None => {
514546 // Try to flush the codec.
@@ -630,6 +662,8 @@ impl Prioritize {
630662 tracing:: trace!( ?frame, "dropping" ) ;
631663 }
632664
665+ self . current_send_buffer_size -= stream. buffered_send_data ;
666+
633667 stream. buffered_send_data = 0 ;
634668 stream. requested_send_capacity = 0 ;
635669 if let InFlightData :: DataFrame ( key) = self . in_flight_data_frame {
@@ -736,6 +770,8 @@ impl Prioritize {
736770 tracing:: trace_span!( "updating stream flow" ) . in_scope ( || {
737771 stream. send_flow . send_data ( len) ;
738772
773+ self . current_send_buffer_size -= len as usize ;
774+
739775 // Decrement the stream's buffered data counter
740776 debug_assert ! ( stream. buffered_send_data >= len as usize ) ;
741777 stream. buffered_send_data -= len as usize ;
0 commit comments