Skip to content

Commit fb302fa

Browse files
committed
Split WriteData command into its own channel v2
1 parent 98fdb93 commit fb302fa

File tree

1 file changed

+77
-94
lines changed

1 file changed

+77
-94
lines changed

lightning-net/src/lib.rs

Lines changed: 77 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
6161
use std::sync::Arc;
6262
use std::thread::{self, JoinHandle};
6363

64-
use crossbeam_channel::{Receiver, Sender, TryRecvError, TrySendError};
64+
use crossbeam_channel::{select, Receiver, Sender, TryRecvError, TrySendError};
6565

6666
use bitcoin::secp256k1::key::PublicKey;
6767
use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, RoutingMessageHandler};
@@ -137,7 +137,8 @@ where
137137
UMH: CustomMessageHandler + 'static + Send + Sync,
138138
{
139139
// Init channels
140-
let (reader_cmd_tx, reader_cmd_rx, writer_cmd_tx, writer_cmd_rx) = init_channels();
140+
let (reader_cmd_tx, reader_cmd_rx, writer_cmd_tx, writer_cmd_rx, write_data_tx, write_data_rx) =
141+
init_channels();
141142

142143
// Generate a new ID that represents this connection
143144
let conn_id = next_connection_id();
@@ -151,8 +152,13 @@ where
151152
let tcp_disconnector = TcpDisconnectooor(disconnector_stream);
152153

153154
// Init SyncSocketDescriptor
154-
let mut descriptor =
155-
SyncSocketDescriptor::new(conn_id, tcp_disconnector, reader_cmd_tx, writer_cmd_tx);
155+
let mut descriptor = SyncSocketDescriptor::new(
156+
conn_id,
157+
tcp_disconnector,
158+
reader_cmd_tx,
159+
writer_cmd_tx,
160+
write_data_tx,
161+
);
156162

157163
// Init Reader and Writer
158164
let mut reader: Reader<CMH, RMH, L, UMH> = Reader::new(
@@ -166,6 +172,7 @@ where
166172
peer_manager.clone(),
167173
descriptor.clone(),
168174
writer_cmd_rx,
175+
write_data_rx,
169176
);
170177

171178
// Notify the PeerManager of the new connection depending on its ConnectionType.
@@ -236,29 +243,17 @@ enum ReaderCommand {
236243

237244
/// Commands that can be sent to the Writer.
238245
enum WriterCommand {
239-
WriteData(Vec<u8>),
240246
Shutdown,
241247
}
242248

243-
/// Initializes the crossbeam channels for sending `ReaderCommand`s and
244-
/// `WriterCommand`s.
245-
///
246-
/// The `ReaderCommand` channel is unbounded, and can be used to tell the
247-
/// `Reader` to resume reads, pause reads, or shut down.
248-
///
249-
/// The `WriterCommand` channel has a capacity of 2, and can be used to tell the
250-
/// `Writer` to write a Vec<u8> of data, or shut down.
249+
/// Initializes the crossbeam channels required for a connection.
251250
///
252-
/// - The WriterCommand channel is size 2 so as to have 1 dedicated space for
253-
/// each type of WriterCommand: WriteData and Shutdown. A Shutdown command can
254-
/// be pushed into the channel at any time, but to ensure that there is always
255-
/// space for it, send_data() will only ever push a WriteData command into the
256-
/// channel after it first detects that the channel is completely empty.
257-
/// - Allocating only one slot in the channel for WriteData commands allows
258-
/// send_data() to quickly detect that writes are still processing. This space
259-
/// can be thought of as a second buffer, where the first buffer is the Writer
260-
/// internal buffer (`self.buf`) and the third buffer is the &[u8] passed into
261-
/// send_data().
251+
/// - The `reader_cmd` channel is unbounded, and can be used to tell the
252+
/// `Reader` to resume reads, pause reads, or shut down.
253+
/// - The `writer_cmd` channel is unbounded, and can be used to tell the
254+
/// `Writer` to shut down.
255+
/// - The `write_data` channel has a capacity of 1, and can be used to request a
256+
/// write of a Vec<u8> of data.
262257
///
263258
/// Finally:
264259
///
@@ -271,38 +266,36 @@ fn init_channels() -> (
271266
Receiver<ReaderCommand>,
272267
Sender<WriterCommand>,
273268
Receiver<WriterCommand>,
269+
Sender<Vec<u8>>,
270+
Receiver<Vec<u8>>,
274271
) {
275272
let (reader_cmd_tx, reader_cmd_rx) = crossbeam_channel::unbounded();
273+
let (writer_cmd_tx, writer_cmd_rx) = crossbeam_channel::unbounded();
274+
let (write_data_tx, write_data_rx) = crossbeam_channel::bounded(1);
276275

277-
let (writer_cmd_tx, writer_cmd_rx) = crossbeam_channel::bounded(2);
278-
279-
(reader_cmd_tx, reader_cmd_rx, writer_cmd_tx, writer_cmd_rx)
276+
(
277+
reader_cmd_tx,
278+
reader_cmd_rx,
279+
writer_cmd_tx,
280+
writer_cmd_rx,
281+
write_data_tx,
282+
write_data_rx,
283+
)
280284
}
281285

282286
/// A concrete implementation of the SocketDescriptor.
283287
///
284288
/// A SyncSocketDescriptor is essentially a `clone()`able handle to an
285289
/// underlying connection as well as an identifier for that connection.
286290
///
287-
/// It consists of an ID representing the unique connection to the peer,
288-
/// crossbeam channel `Sender`s for the `Reader` and `Writer`, and a
289-
/// `TcpDisconnectooor` that can be used to shut down the underlying `TcpStream`
290-
/// in the event that both the `Reader` and the `Writer` are blocked on
291-
/// `recv()`ing from their crossbeam channels.
292-
///
293-
/// A `SyncSocketDescriptor` allows a `PeerManager` to manage a connection via
294-
/// its calls to `send_data()` and `disconnect_socket()`. Furthermore,
295-
/// the `Reader` and `Writer` each hold a copy to pass along with their
296-
/// calls into the `PeerManager`, so that the `PeerManager` can identify which
297-
/// connection is currently being processed during calls into the `PeerManager`.
298-
///
299291
/// This type is public only because handle_connection() requires it to be.
300292
#[derive(Clone)]
301293
pub struct SyncSocketDescriptor {
302294
id: u64,
303295
tcp_disconnector: TcpDisconnectooor,
304296
reader_cmd_tx: Sender<ReaderCommand>,
305297
writer_cmd_tx: Sender<WriterCommand>,
298+
write_data_tx: Sender<Vec<u8>>,
306299
}
307300
impl PartialEq for SyncSocketDescriptor {
308301
fn eq(&self, other: &Self) -> bool {
@@ -321,19 +314,21 @@ impl SyncSocketDescriptor {
321314
tcp_disconnector: TcpDisconnectooor,
322315
reader_cmd_tx: Sender<ReaderCommand>,
323316
writer_cmd_tx: Sender<WriterCommand>,
317+
write_data_tx: Sender<Vec<u8>>,
324318
) -> Self {
325319
Self {
326320
id: connection_id,
327321
tcp_disconnector,
328322
reader_cmd_tx,
329323
writer_cmd_tx,
324+
write_data_tx,
330325
}
331326
}
332327
}
333328
impl SocketDescriptor for SyncSocketDescriptor {
334329
/// Attempts to queue up some data from the given slice for the `Writer` to
335330
/// send. Returns the number of bytes that were successfully pushed to the
336-
/// `WriterCommand` channel.
331+
/// `write_data` channel.
337332
///
338333
/// This implementation never calls back into the PeerManager directly,
339334
/// thereby preventing reentrancy / deadlock issues. Instead, any commands
@@ -346,29 +341,22 @@ impl SocketDescriptor for SyncSocketDescriptor {
346341
/// amount of time that the PeerManager's internal locks are held.
347342
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
348343
if resume_read {
349-
// It doesn't matter whether the channel send is Ok or Err
350344
let _ = self.reader_cmd_tx.try_send(ReaderCommand::ResumeRead);
351345
}
352346

353347
if data.is_empty() {
354348
return 0;
355349
}
356350

357-
// To ensure that there is always space for a Shutdown command, only
358-
// push data into the writer_cmd channel if it is currently empty.
359-
if self.writer_cmd_tx.is_empty() {
351+
if self.write_data_tx.is_empty() {
360352
// The data must be copied into the channel since a &[u8] reference
361353
// cannot be sent across threads. This incurs a small amount of overhead.
362-
let cmd = WriterCommand::WriteData(data.to_vec());
363-
match self.writer_cmd_tx.try_send(cmd) {
364-
Ok(()) => {
365-
// Data was successfully sent to the Writer.
366-
data.len()
367-
}
354+
match self.write_data_tx.try_send(data.to_vec()) {
355+
Ok(()) => data.len(),
368356
Err(e) => match e {
369357
TrySendError::Full(_) => {
370-
// This could only happen if both channel slots were
371-
// consumed in between the if check above and now - a
358+
// This could only happen if another Sender pushed into
359+
// the channel in between the if check above and now - a
372360
// TOCTTOU error. This really shouldn't happen, but
373361
// let's just proceed normally: pause reads and return 0
374362
let _ = self.reader_cmd_tx.try_send(ReaderCommand::PauseRead);
@@ -382,8 +370,7 @@ impl SocketDescriptor for SyncSocketDescriptor {
382370
},
383371
}
384372
} else {
385-
// There wasn't enough space in the channel to hold the data AND
386-
// leave an empty space for a potential Shutdown command. Pause.
373+
// Writes are processing; pause reads.
387374
let _ = self.reader_cmd_tx.try_send(ReaderCommand::PauseRead);
388375
0
389376
}
@@ -406,7 +393,7 @@ impl SocketDescriptor for SyncSocketDescriptor {
406393
/// the trigger:
407394
/// - (1) and (2): If the Reader received the trigger, it will shut down
408395
/// BOTH halves of the shared TcpStream AND send a Shutdown command to the
409-
/// Reader.
396+
/// Writer.
410397
///
411398
/// - The explicit Shutdown command from the Reader is necessary because
412399
/// if the Reader is blocked on `writer_cmd_rx.recv()` due to its
@@ -639,6 +626,7 @@ where
639626
peer_manager: Arc<PeerManager<SyncSocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>,
640627
descriptor: SyncSocketDescriptor,
641628
writer_cmd_rx: Receiver<WriterCommand>,
629+
write_data_rx: Receiver<Vec<u8>>,
642630
/// An internal buffer which stores the data that the Writer is
643631
/// currently attempting to write.
644632
///
@@ -672,39 +660,44 @@ where
672660
peer_manager: Arc<PeerManager<SyncSocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>,
673661
descriptor: SyncSocketDescriptor,
674662
writer_cmd_rx: Receiver<WriterCommand>,
663+
write_data_rx: Receiver<Vec<u8>>,
675664
) -> Self {
676665
Self {
677666
inner: writer,
678667
peer_manager,
679668
descriptor,
680669
writer_cmd_rx,
670+
write_data_rx,
681671
buf: None,
682672
start: 0,
683673
}
684674
}
685675

686-
/// Process `WriteData` requests, or wait for the next `WriterCommand` if
687-
/// the internal buffer is empty. This implementation avoids busy loops and
688-
/// lets the thread go to sleep whenever writes or channel commands are
689-
/// pending.
676+
/// Process `write_data` requests, or block on the `writer_cmd` and
677+
/// `write_data` channels if the internal buffer is empty. This
678+
/// implementation avoids busy loops and lets the thread go to sleep
679+
/// whenever writes or channel messages are pending.
690680
///
691681
/// - If `self.buf == None`, block on `self.reader_cmd_rx.recv()` and handle
692682
/// any commands accordingly.
693683
/// - If `self.buf == Some(Vec<u8>)`, block on `self.inner.write()` and
694684
/// handle the response accordingly.
695-
/// - The Writer does NOT check for pending `WriterCommands` in between each
696-
/// event. This is because we do NOT want to take a potential WriteData
697-
/// request out of the channel in the case that the Writer is currently
698-
/// failing to write the data in self.buf(). This way, a failing write
699-
/// will cause the next send_data() call to fill up the space dedicated
700-
/// for WriteData commands, which in turn allows a later call to
701-
/// send_data() to detect that writes are still pending.
685+
/// - In between each event, do a non-blocking check for Shutdown commands.
702686
#[allow(clippy::single_match)]
703687
#[allow(clippy::comparison_chain)]
704688
fn run(&mut self) {
705689
use std::io::ErrorKind::*;
706690

707691
loop {
692+
// Do a non-blocking check to see if we've been told to shut down
693+
match self.writer_cmd_rx.try_recv() {
694+
Ok(WriterCommand::Shutdown) => break,
695+
Err(e) => match e {
696+
TryRecvError::Empty => {}
697+
TryRecvError::Disconnected => break,
698+
},
699+
}
700+
708701
match &self.buf {
709702
Some(buf) => {
710703
// We have data in our internal buffer; attempt to write it
@@ -755,39 +748,29 @@ where
755748
},
756749
}
757750
}
758-
None => {
759-
// We don't have data in our internal buffer; block on the
760-
// command channel
761-
match self.writer_cmd_rx.recv() {
762-
Ok(cmd) => match cmd {
763-
WriterCommand::WriteData(data) => {
764-
if !data.is_empty() {
765-
// Data fetched, add it to the buffer
766-
self.buf = Some(data);
767-
self.start = 0;
768-
}
751+
None => select! {
752+
recv(self.writer_cmd_rx) -> _ => break,
753+
recv(self.write_data_rx) -> res => match res {
754+
Ok(data) => {
755+
if !data.is_empty() {
756+
self.buf = Some(data);
757+
self.start = 0;
758+
}
769759

770-
// There is space for the next send_data()
771-
// request; notify the PeerManager
772-
if self
773-
.peer_manager
774-
.write_buffer_space_avail(&mut self.descriptor)
775-
.is_err()
776-
{
777-
// PeerManager wants us to disconnect
778-
break;
779-
}
760+
// There is space for the next send_data()
761+
// request; notify the PeerManager
762+
if self
763+
.peer_manager
764+
.write_buffer_space_avail(&mut self.descriptor)
765+
.is_err()
766+
{
767+
// PeerManager wants us to disconnect
768+
break;
780769
}
781-
WriterCommand::Shutdown => break,
782-
},
783-
Err(_) => {
784-
// Channel is empty and disconnected
785-
// => no more messages can be sent
786-
// => break the loop and shut down
787-
break;
788770
}
771+
Err(_) => break,
789772
}
790-
}
773+
},
791774
}
792775
}
793776

0 commit comments

Comments
 (0)