Skip to content

Commit d9a884f

Browse files
committed
refactor(net): replace rx_deferred_frame
Before `readv` change, `rx_deferred_frame` was used to signal if there was present packet in the internal rx buffer, so we would try to send it first before attempting to read new one. With `readv` we don't use internal buffer for RX packets anymore, so the logic of `deferred` packet changed. Now any packet is `deferred` when it is written to the guest memory. After the write we try to notify guest about it, but only if rate limiter allows it. To track this we now use `deferred_rx_bytes` which holds the number of bytes we yet to notify guest about. We can conveniently use `Option<NonZeroUsize>` for this where `Some(...)` will mean there are some `deferred` bytes we yet to notify guest about and `None` will mean we are done and can read the next packet. Signed-off-by: Egor Lazarchuk <[email protected]>
1 parent 1486edb commit d9a884f

File tree

1 file changed

+41
-135
lines changed

1 file changed

+41
-135
lines changed

src/vmm/src/devices/virtio/net/device.rs

Lines changed: 41 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
use std::mem;
99
use std::net::Ipv4Addr;
10+
use std::num::NonZeroUsize;
1011
use std::sync::{Arc, Mutex};
1112

1213
use libc::EAGAIN;
@@ -107,9 +108,9 @@ pub struct Net {
107108
pub(crate) rx_rate_limiter: RateLimiter,
108109
pub(crate) tx_rate_limiter: RateLimiter,
109110

110-
pub(crate) rx_deferred_frame: bool,
111-
112-
rx_bytes_read: usize,
111+
/// Used to store last RX packet size and
112+
/// rate limit RX queue.
113+
deferred_rx_bytes: Option<NonZeroUsize>,
113114
rx_frame_buf: [u8; MAX_BUFFER_SIZE],
114115

115116
tx_frame_headers: [u8; frame_hdr_len()],
@@ -176,8 +177,7 @@ impl Net {
176177
queue_evts,
177178
rx_rate_limiter,
178179
tx_rate_limiter,
179-
rx_deferred_frame: false,
180-
rx_bytes_read: 0,
180+
deferred_rx_bytes: None,
181181
rx_frame_buf: [0u8; MAX_BUFFER_SIZE],
182182
tx_frame_headers: [0u8; frame_hdr_len()],
183183
irq_trigger: IrqTrigger::new().map_err(NetError::EventFd)?,
@@ -299,16 +299,22 @@ impl Net {
299299
// Attempts to copy a single frame into the guest if there is enough
300300
// rate limiting budget.
301301
// Returns true on successful frame delivery.
302-
fn rate_limited_rx_single_frame(&mut self) -> bool {
303-
let rx_queue = &mut self.queues[RX_INDEX];
304-
if !Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, self.rx_bytes_read as u64) {
305-
self.metrics.rx_rate_limiter_throttled.inc();
306-
return false;
302+
fn send_deferred_rx_bytes(&mut self) -> bool {
303+
match self.deferred_rx_bytes {
304+
Some(bytes) => {
305+
if Self::rate_limiter_consume_op(&mut self.rx_rate_limiter, bytes.get() as u64) {
306+
// The packet is good to go, reset `deferred_rx_bytes`.
307+
self.deferred_rx_bytes = None;
308+
// Attempt frame delivery.
309+
self.rx_buffer.notify_queue(&mut self.queues[RX_INDEX]);
310+
true
311+
} else {
312+
self.metrics.rx_rate_limiter_throttled.inc();
313+
false
314+
}
315+
}
316+
None => true,
307317
}
308-
309-
// Attempt frame delivery.
310-
self.rx_buffer.notify_queue(rx_queue);
311-
true
312318
}
313319

314320
/// Parse available RX `DescriptorChains` from the queue and
@@ -457,6 +463,10 @@ impl Net {
457463

458464
/// Read as many frames as possible.
459465
fn process_rx(&mut self) -> Result<(), DeviceError> {
466+
if !self.send_deferred_rx_bytes() {
467+
return Ok(());
468+
}
469+
460470
if self.rx_buffer.is_empty() {
461471
self.parse_rx_descriptors();
462472
}
@@ -468,11 +478,10 @@ impl Net {
468478
break;
469479
}
470480
Ok(count) => {
471-
self.rx_bytes_read = count;
481+
self.deferred_rx_bytes = NonZeroUsize::new(count);
472482
self.metrics.rx_count.inc();
473483
self.metrics.rx_packets_count.inc();
474-
if !self.rate_limited_rx_single_frame() {
475-
self.rx_deferred_frame = true;
484+
if !self.send_deferred_rx_bytes() {
476485
break;
477486
}
478487
}
@@ -498,26 +507,6 @@ impl Net {
498507
self.try_signal_queue(NetQueue::Rx)
499508
}
500509

501-
// Process the deferred frame first, then continue reading from tap.
502-
fn handle_deferred_frame(&mut self) -> Result<(), DeviceError> {
503-
if self.rate_limited_rx_single_frame() {
504-
self.rx_deferred_frame = false;
505-
// process_rx() was interrupted possibly before consuming all
506-
// packets in the tap; try continuing now.
507-
return self.process_rx();
508-
}
509-
510-
self.try_signal_queue(NetQueue::Rx)
511-
}
512-
513-
fn resume_rx(&mut self) -> Result<(), DeviceError> {
514-
if self.rx_deferred_frame {
515-
self.handle_deferred_frame()
516-
} else {
517-
self.process_rx()
518-
}
519-
}
520-
521510
fn process_tx(&mut self) -> Result<(), DeviceError> {
522511
// This is safe since we checked in the event handler that the device is activated.
523512
let mem = self.device_state.mem().unwrap();
@@ -576,7 +565,7 @@ impl Net {
576565
&self.metrics,
577566
)
578567
.unwrap_or(false);
579-
if frame_consumed_by_mmds && !self.rx_deferred_frame {
568+
if frame_consumed_by_mmds {
580569
// MMDS consumed this frame/request, let's also try to process the response.
581570
process_rx_for_mmds = true;
582571
}
@@ -687,7 +676,7 @@ impl Net {
687676
self.metrics.rx_rate_limiter_throttled.inc();
688677
} else {
689678
// If the limiter is not blocked, resume the receiving of bytes.
690-
self.resume_rx()
679+
self.process_rx()
691680
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
692681
}
693682
}
@@ -696,31 +685,14 @@ impl Net {
696685
// This is safe since we checked in the event handler that the device is activated.
697686
self.metrics.rx_tap_event_count.inc();
698687

699-
// While there are no available RX queue buffers and there's a deferred_frame
700-
// don't process any more incoming. Otherwise start processing a frame. In the
701-
// process the deferred_frame flag will be set in order to avoid freezing the
702-
// RX queue.
703-
if self.queues[RX_INDEX].is_empty() && self.rx_deferred_frame {
704-
self.metrics.no_rx_avail_buffer.inc();
705-
return;
706-
}
707-
708688
// While limiter is blocked, don't process any more incoming.
709689
if self.rx_rate_limiter.is_blocked() {
710690
self.metrics.rx_rate_limiter_throttled.inc();
711691
return;
712692
}
713693

714-
if self.rx_deferred_frame
715-
// Process a deferred frame first if available. Don't read from tap again
716-
// until we manage to receive this deferred frame.
717-
{
718-
self.handle_deferred_frame()
719-
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
720-
} else {
721-
self.process_rx()
722-
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
723-
}
694+
self.process_rx()
695+
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
724696
}
725697

726698
/// Process a single TX queue event.
@@ -750,7 +722,7 @@ impl Net {
750722
match self.rx_rate_limiter.event_handler() {
751723
Ok(_) => {
752724
// There might be enough budget now to receive the frame.
753-
self.resume_rx()
725+
self.process_rx()
754726
.unwrap_or_else(|err| report_net_event_fail(&self.metrics, err));
755727
}
756728
Err(err) => {
@@ -779,7 +751,7 @@ impl Net {
779751

780752
/// Process device virtio queue(s).
781753
pub fn process_virtio_queues(&mut self) {
782-
let _ = self.resume_rx();
754+
let _ = self.process_rx();
783755
let _ = self.process_tx();
784756
}
785757
}
@@ -1197,7 +1169,7 @@ pub mod tests {
11971169
th.rxq.check_used_elem(0, 0, 0);
11981170
th.rxq.check_used_elem(1, 3, 0);
11991171
// Check that the frame wasn't deferred.
1200-
assert!(!th.net().rx_deferred_frame);
1172+
assert!(th.net().deferred_rx_bytes.is_none());
12011173
// Check that the frame has been written successfully to the valid Rx descriptor chain.
12021174
th.rxq
12031175
.check_used_elem(2, 4, frame.len().try_into().unwrap());
@@ -1244,7 +1216,7 @@ pub mod tests {
12441216
);
12451217

12461218
// Check that the frame wasn't deferred.
1247-
assert!(!th.net().rx_deferred_frame);
1219+
assert!(th.net().deferred_rx_bytes.is_none());
12481220
// Check that the used queue has advanced.
12491221
assert_eq!(th.rxq.used.idx.get(), 1);
12501222
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1297,7 +1269,7 @@ pub mod tests {
12971269
);
12981270

12991271
// Check that the frames weren't deferred.
1300-
assert!(!th.net().rx_deferred_frame);
1272+
assert!(th.net().deferred_rx_bytes.is_none());
13011273
// Check that the used queue has advanced.
13021274
assert_eq!(th.rxq.used.idx.get(), 2);
13031275
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1361,7 +1333,7 @@ pub mod tests {
13611333
);
13621334

13631335
// Check that the frame wasn't deferred.
1364-
assert!(!th.net().rx_deferred_frame);
1336+
assert!(th.net().deferred_rx_bytes.is_none());
13651337
// Check that the used queue has advanced.
13661338
assert_eq!(th.rxq.used.idx.get(), 2);
13671339
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
@@ -1787,20 +1759,7 @@ pub mod tests {
17871759
// SAFETY: its a valid fd
17881760
unsafe { libc::close(th.net.lock().unwrap().tap.as_raw_fd()) };
17891761

1790-
// The RX queue is empty and rx_deffered_frame is set.
1791-
th.net().rx_deferred_frame = true;
1792-
check_metric_after_block!(
1793-
th.net().metrics.no_rx_avail_buffer,
1794-
1,
1795-
th.simulate_event(NetEvent::Tap)
1796-
);
1797-
1798-
// We need to set this here to false, otherwise the device will try to
1799-
// handle a deferred frame, it will fail and will never try to read from
1800-
// the tap.
1801-
th.net().rx_deferred_frame = false;
1802-
1803-
// Fake an avail buffer; this time, tap reading should error out.
1762+
// Fake an avail buffer; tap reading should error out.
18041763
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
18051764
check_metric_after_block!(
18061765
th.net().metrics.tap_read_fails,
@@ -1809,59 +1768,6 @@ pub mod tests {
18091768
);
18101769
}
18111770

1812-
#[test]
1813-
fn test_deferred_frame() {
1814-
let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
1815-
let mut th = TestHelper::get_default(&mem);
1816-
th.activate_net();
1817-
1818-
let rx_packets_count = th.net().metrics.rx_packets_count.count();
1819-
let _ = inject_tap_tx_frame(&th.net(), 1000);
1820-
// Trigger a Tap event that. This should fail since there
1821-
// are not any available descriptors in the queue
1822-
check_metric_after_block!(
1823-
th.net().metrics.no_rx_avail_buffer,
1824-
1,
1825-
th.simulate_event(NetEvent::Tap)
1826-
);
1827-
// The frame we read from the tap should be deferred now and
1828-
// no frames should have been transmitted
1829-
assert!(th.net().rx_deferred_frame);
1830-
assert_eq!(th.net().metrics.rx_packets_count.count(), rx_packets_count);
1831-
1832-
// Let's add a second frame, which should really have the same
1833-
// fate.
1834-
let _ = inject_tap_tx_frame(&th.net(), 1000);
1835-
1836-
// Adding a descriptor in the queue. This should handle the first deferred
1837-
// frame. However, this should try to handle the second tap as well and fail
1838-
// since there's only one Descriptor Chain in the queue.
1839-
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
1840-
check_metric_after_block!(
1841-
th.net().metrics.no_rx_avail_buffer,
1842-
1,
1843-
th.simulate_event(NetEvent::Tap)
1844-
);
1845-
// We should still have a deferred frame
1846-
assert!(th.net().rx_deferred_frame);
1847-
// However, we should have delivered the first frame
1848-
assert_eq!(
1849-
th.net().metrics.rx_packets_count.count(),
1850-
rx_packets_count + 1
1851-
);
1852-
1853-
// Let's add one more descriptor and try to handle the last frame as well.
1854-
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
1855-
check_metric_after_block!(
1856-
th.net().metrics.rx_packets_count,
1857-
1,
1858-
th.simulate_event(NetEvent::RxQueue)
1859-
);
1860-
1861-
// We should be done with any deferred frame
1862-
assert!(!th.net().rx_deferred_frame);
1863-
}
1864-
18651771
#[test]
18661772
fn test_rx_rate_limiter_handling() {
18671773
let mem = single_region_mem(2 * MAX_BUFFER_SIZE);
@@ -1974,7 +1880,7 @@ pub mod tests {
19741880
let mut rl = RateLimiter::new(1000, 0, 500, 0, 0, 0).unwrap();
19751881

19761882
// set up RX
1977-
assert!(!th.net().rx_deferred_frame);
1883+
assert!(th.net().deferred_rx_bytes.is_none());
19781884
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
19791885

19801886
let frame = inject_tap_tx_frame(&th.net(), 1000);
@@ -1994,7 +1900,7 @@ pub mod tests {
19941900
// assert that limiter is blocked
19951901
assert!(th.net().rx_rate_limiter.is_blocked());
19961902
assert_eq!(th.net().metrics.rx_rate_limiter_throttled.count(), 1);
1997-
assert!(th.net().rx_deferred_frame);
1903+
assert!(th.net().deferred_rx_bytes.is_some());
19981904
// assert that no operation actually completed (limiter blocked it)
19991905
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
20001906
// make sure the data is still queued for processing
@@ -2092,7 +1998,7 @@ pub mod tests {
20921998
let mut rl = RateLimiter::new(0, 0, 0, 1, 0, 500).unwrap();
20931999

20942000
// set up RX
2095-
assert!(!th.net().rx_deferred_frame);
2001+
assert!(th.net().deferred_rx_bytes.is_none());
20962002
th.add_desc_chain(NetQueue::Rx, 0, &[(0, 4096, VIRTQ_DESC_F_WRITE)]);
20972003
let frame = inject_tap_tx_frame(&th.net(), 1234);
20982004

@@ -2114,7 +2020,7 @@ pub mod tests {
21142020
// assert that limiter is blocked
21152021
assert!(th.net().rx_rate_limiter.is_blocked());
21162022
assert!(th.net().metrics.rx_rate_limiter_throttled.count() >= 1);
2117-
assert!(th.net().rx_deferred_frame);
2023+
assert!(th.net().deferred_rx_bytes.is_some());
21182024
// assert that no operation actually completed (limiter blocked it)
21192025
assert!(&th.net().irq_trigger.has_pending_irq(IrqType::Vring));
21202026
// make sure the data is still queued for processing

0 commit comments

Comments
 (0)