Merged
44 changes: 32 additions & 12 deletions lightning/src/ln/peer_handler.rs
@@ -339,6 +339,7 @@ struct Peer {
msgs_sent_since_pong: usize,
awaiting_pong_timer_tick_intervals: i8,
received_message_since_timer_tick: bool,
sent_gossip_timestamp_filter: bool,
}

impl Peer {
@@ -348,7 +349,11 @@ impl Peer {
/// announcements/updates for the given channel_id then we will send it when we get to that
/// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
/// sent the old versions, we should send the update, and so return true here.
fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
!self.sent_gossip_timestamp_filter {
return false;
}
match self.sync_status {
InitSyncTracker::NoSyncRequested => true,
InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
@@ -358,6 +363,10 @@ impl Peer {

/// Similar to the above, but for node announcements indexed by node_id.
fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
!self.sent_gossip_timestamp_filter {
return false;
}
match self.sync_status {
InitSyncTracker::NoSyncRequested => true,
InitSyncTracker::ChannelsSyncing(_) => false,
@@ -619,6 +628,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
@@ -665,6 +675,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
@@ -1058,7 +1069,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P

log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);

if msg.features.initial_routing_sync() {
// For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
}
if !msg.features.supports_static_remote_key() {
@@ -1205,7 +1217,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::GossipTimestampFilter(_msg) => {
// TODO: handle message
// When supporting gossip messages, start initial gossip sync only after we receive
// a GossipTimestampFilter
if peer.their_features.as_ref().unwrap().supports_gossip_queries() &&
!peer.sent_gossip_timestamp_filter {
peer.sent_gossip_timestamp_filter = true;
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
}
},

// Unknown messages:
@@ -1799,6 +1817,8 @@ mod tests {
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
peer_b.process_events();
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
peer_a.process_events();
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
(fd_a.clone(), fd_b.clone())
}

@@ -1862,21 +1882,21 @@ mod tests {
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);

// Make each peer to read the messages that the other peer just wrote to them. Note that
// due to the max-messagse-before-ping limits this may take a few iterations to complete.
// due to the max-message-before-ping limits this may take a few iterations to complete.
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
peers[0].process_events();
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
assert!(!b_read_data.is_empty());

peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
peers[1].process_events();

let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
assert!(!a_read_data.is_empty());

peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
peers[0].process_events();

peers[1].process_events();
assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
assert!(!b_read_data.is_empty());
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();

peers[0].process_events();
assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages");
}

// Check that each peer has received the expected number of channel updates and channel
40 changes: 38 additions & 2 deletions lightning/src/util/test_utils.rs
@@ -49,6 +49,9 @@ use core::{cmp, mem};
use bitcoin::bech32::u5;
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};

#[cfg(feature = "std")]
use std::time::{SystemTime, UNIX_EPOCH};

pub struct TestVecWriter(pub Vec<u8>);
impl Writer for TestVecWriter {
fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> {
@@ -341,6 +344,7 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
pub struct TestRoutingMessageHandler {
pub chan_upds_recvd: AtomicUsize,
pub chan_anns_recvd: AtomicUsize,
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
pub request_full_sync: AtomicBool,
Collaborator: Seems like we should just fix this to honor request_full_sync and send a gossip filter on connected?

Contributor Author: Well, the filter is currently already sent in peer_connected(), and since request_full_sync was not used at all, I figured it would make sense to simply remove it rather than add much more logic to the so-far pretty slim TestRoutingMessageHandler. Or are you suggesting to introduce request_full_sync to other places, e.g., NetGraphMsgHandler, so it could be used by should_request_full_sync()?

Collaborator: Wait, who is sending the filter in peer_connected when we're using a TestRoutingMessageHandler? Its peer_connected method is empty.

Contributor Author: Ah, now I get what you're saying. You're right, it's probably easiest here to let the TestRoutingMessageHandler implementation of peer_connected mirror the functionality of NetGraphMsgHandler.

Contributor Author (@tnull, Apr 28, 2022): I went that route (replicating the peer_connected functionality in TestRoutingMessageHandler) and it generally seems to work nicely. However, now the second peer receives only 86/100 channel updates and 43/50 channel announcements, i.e., the test now fails at the last two assert_eqs. I currently struggle to see why this would be the case, especially since the logs show that the gossip is indeed just postponed until the peers have received gossip_timestamp_filter messages. I'll come around to continue staring at the code next week.

Contributor Author: Yes, and before I adapt the test I at least would like to understand why suddenly not the same number of channel updates are delivered, because it's not intuitive to me from the changes I made...

Collaborator: I believe the channel updates will be delivered eventually, but the test currently handles a specific number of round-trips between the peers and then makes sure the messages were all delivered. We now need one more round-trip, I believe (but it's not quite in the same set of steps as the existing loop, so we can't just add one).

Contributor Author: In 7a99de2 I now moved some things in the loop around. This makes the test succeed, hopefully without reducing the coverage too much (i.e., it now checks for B's empty outbound_data once at the end, not in every iteration).

Collaborator: Yea, I mean I think that's fine. That said, I'd kinda prefer to update the pre-loop code to add a few message delivery calls that match the new message flow, but if it's too annoying to figure that out exactly I'd be okay landing this as-is. It's kinda nice to use this test to ensure that we don't write more gossip messages to the buffer after we reach our limit, even if it makes the test kinda brittle.

Contributor Author: Ah, you're right. The gossip filter messages basically added half a round trip. a31362a should be a cleaner fix now.

}

@@ -349,6 +353,7 @@ impl TestRoutingMessageHandler {
TestRoutingMessageHandler {
chan_upds_recvd: AtomicUsize::new(0),
chan_anns_recvd: AtomicUsize::new(0),
pending_events: Mutex::new(vec![]),
request_full_sync: AtomicBool::new(false),
}
}
@@ -384,7 +389,35 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
Vec::new()
}

fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {
if !init_msg.features.supports_gossip_queries() {
return ();
}

let should_request_full_sync = self.request_full_sync.load(Ordering::Acquire);

#[allow(unused_mut, unused_assignments)]
let mut gossip_start_time = 0;
#[cfg(feature = "std")]
{
gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs();
if should_request_full_sync {
Contributor (@arik-so, May 3, 2022): Would it be more legible to write

let delta = if should_request_full_sync {
    60 * 60 * 24 * 7 * 2 // two weeks ago
} else {
    60 * 60 // an hour ago
};
gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() - delta;

?

Collaborator: I believe it was copied from the non-test code, so presumably fine enough.

gossip_start_time -= 60 * 60 * 24 * 7 * 2; // 2 weeks ago
} else {
gossip_start_time -= 60 * 60; // an hour ago
}
}

let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::MessageSendEvent::SendGossipTimestampFilter {
node_id: their_node_id.clone(),
msg: msgs::GossipTimestampFilter {
chain_hash: genesis_block(Network::Testnet).header.block_hash(),
first_timestamp: gossip_start_time as u32,
timestamp_range: u32::max_value(),
},
});
}

fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
Ok(())
@@ -405,7 +438,10 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {

impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
vec![]
let mut ret = Vec::new();
let mut pending_events = self.pending_events.lock().unwrap();
core::mem::swap(&mut ret, &mut pending_events);
Contributor: Purely out of curiosity, what's the benefit of mem::swap over drain?

Collaborator: mem::swap exploits the Rust fact that an object's in-memory representation can always be copied for a move. Thus, this will just copy the (pointer, length, capacity) tuple from one to the other, while drain(..) will (or could, I'm not sure how LLVM will optimize it in practice) actually allocate a new vec, move all the old vec elements, and then return that.

ret
}
}
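
To illustrate the point made in the thread above, here is a minimal, standalone sketch of the swap-out pattern, assuming a hypothetical EventQueue type with String events instead of the library's events::MessageSendEvent. It is not the PR's code, just a demonstration of mem::swap versus drain on a Mutex-guarded Vec.

use std::mem;
use std::sync::Mutex;

// Hypothetical stand-in for a pending-events queue.
struct EventQueue {
    pending: Mutex<Vec<String>>,
}

impl EventQueue {
    // Swap-based take: exchanges the (pointer, length, capacity) triples of
    // the two Vecs, leaving an empty Vec behind the Mutex. No per-element
    // moves and no new allocation for the returned Vec.
    fn get_and_clear(&self) -> Vec<String> {
        let mut ret = Vec::new();
        let mut pending = self.pending.lock().unwrap();
        mem::swap(&mut ret, &mut *pending);
        ret
    }

    // Drain-based alternative from the question above: collecting the drained
    // iterator builds a fresh Vec and moves each element into it (unless the
    // optimizer is clever), while the original keeps its now-unused capacity.
    fn get_and_clear_with_drain(&self) -> Vec<String> {
        let mut pending = self.pending.lock().unwrap();
        pending.drain(..).collect()
    }
}

fn main() {
    let queue = EventQueue { pending: Mutex::new(vec!["a".into(), "b".into()]) };
    assert_eq!(queue.get_and_clear(), vec!["a".to_string(), "b".to_string()]);
    assert!(queue.get_and_clear().is_empty());
    assert!(queue.get_and_clear_with_drain().is_empty());
}

Both variants leave the queue empty; the swap version hands back the original allocation in O(1), which matches the reasoning given in the reply above.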
