Skip to content

Commit e5606c6

Browse files
committed
Refactor message broadcasting out into a utility method
This will allow us to broadcast messages received in the next commit.
1 parent 62d4007 commit e5606c6

File tree

1 file changed

+68
-46
lines changed

1 file changed

+68
-46
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 68 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,64 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10101010
Ok(())
10111011
}
10121012

1013+
fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message, except_node: Option<&PublicKey>) {
1014+
match msg {
1015+
wire::Message::ChannelAnnouncement(ref msg) => {
1016+
let encoded_msg = encode_msg!(msg);
1017+
1018+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1019+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1020+
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1021+
continue
1022+
}
1023+
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
1024+
peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) {
1025+
continue;
1026+
}
1027+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1028+
continue;
1029+
}
1030+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1031+
peers.peers_needing_send.insert((*descriptor).clone());
1032+
}
1033+
},
1034+
wire::Message::NodeAnnouncement(ref msg) => {
1035+
let encoded_msg = encode_msg!(msg);
1036+
1037+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1038+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1039+
!peer.should_forward_node_announcement(msg.contents.node_id) {
1040+
continue
1041+
}
1042+
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
1043+
continue;
1044+
}
1045+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1046+
continue;
1047+
}
1048+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1049+
peers.peers_needing_send.insert((*descriptor).clone());
1050+
}
1051+
},
1052+
wire::Message::ChannelUpdate(ref msg) => {
1053+
let encoded_msg = encode_msg!(msg);
1054+
1055+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1056+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1057+
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1058+
continue
1059+
}
1060+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1061+
continue;
1062+
}
1063+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1064+
peers.peers_needing_send.insert((*descriptor).clone());
1065+
}
1066+
},
1067+
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
1068+
}
1069+
}
1070+
10131071
/// Checks for any events generated by our handlers and processes them. Includes sending most
10141072
/// response messages as well as messages generated by calls to handler functions directly (eg
10151073
/// functions like ChannelManager::process_pending_htlc_forward or send_payment).
@@ -1157,59 +1215,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11571215
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
11581216
self.do_attempt_write_data(&mut descriptor, peer);
11591217
},
1160-
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1218+
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
11611219
log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
1162-
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
1163-
let encoded_msg = encode_msg!(msg);
1164-
let encoded_update_msg = encode_msg!(update_msg);
1165-
1166-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1167-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1168-
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1169-
continue
1170-
}
1171-
match peer.their_node_id {
1172-
None => continue,
1173-
Some(their_node_id) => {
1174-
if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
1175-
continue
1176-
}
1177-
}
1178-
}
1179-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1180-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
1181-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1182-
}
1220+
if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
1221+
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
1222+
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
11831223
}
11841224
},
1185-
MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
1225+
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
11861226
log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
1187-
if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
1188-
let encoded_msg = encode_msg!(msg);
1189-
1190-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1191-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1192-
!peer.should_forward_node_announcement(msg.contents.node_id) {
1193-
continue
1194-
}
1195-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1196-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1197-
}
1227+
if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
1228+
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
11981229
}
11991230
},
1200-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
1231+
MessageSendEvent::BroadcastChannelUpdate { msg } => {
12011232
log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
1202-
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
1203-
let encoded_msg = encode_msg!(msg);
1204-
1205-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1206-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1207-
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1208-
continue
1209-
}
1210-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1211-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1212-
}
1233+
if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
1234+
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
12131235
}
12141236
},
12151237
MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {

0 commit comments

Comments
 (0)