Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 54 additions & 30 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
use crate::lsps2::msgs::LSPS2Message;
use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
use crate::prelude::{HashMap, ToString, Vec};
use crate::prelude::{HashMap, HashSet, ToString, Vec};
use crate::sync::{Arc, Mutex, RwLock};

use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ErrorAction, ErrorMessage, LightningError};
use lightning::ln::msgs::{ErrorAction, LightningError};
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::wire::CustomMessageReader;
use lightning::ln::ChannelId;
use lightning::sign::EntropySource;
use lightning::util::logger::Level;
use lightning::util::ser::Readable;
Expand Down Expand Up @@ -94,6 +93,8 @@ where
pending_messages: Arc<MessageQueue>,
pending_events: Arc<EventQueue>,
request_id_to_method_map: Mutex<HashMap<RequestId, LSPSMethod>>,
// We ignore peers if they send us bogus data.
ignored_peers: RwLock<HashSet<PublicKey>>,
lsps0_client_handler: LSPS0ClientHandler<ES>,
lsps0_service_handler: Option<LSPS0ServiceHandler>,
#[cfg(lsps1)]
Expand Down Expand Up @@ -126,6 +127,7 @@ where
where {
let pending_messages = Arc::new(MessageQueue::new());
let pending_events = Arc::new(EventQueue::new());
let ignored_peers = RwLock::new(HashSet::new());

let lsps0_client_handler = LSPS0ClientHandler::new(
entropy_source.clone(),
Expand Down Expand Up @@ -192,6 +194,7 @@ where {
pending_messages,
pending_events,
request_id_to_method_map: Mutex::new(HashMap::new()),
ignored_peers,
lsps0_client_handler,
lsps0_service_handler,
#[cfg(lsps1)]
Expand Down Expand Up @@ -480,41 +483,62 @@ where
fn handle_custom_message(
&self, msg: Self::CustomMessage, sender_node_id: &PublicKey,
) -> Result<(), lightning::ln::msgs::LightningError> {
{
if self.ignored_peers.read().unwrap().contains(&sender_node_id) {
let err = format!("Ignoring message from peer {}.", sender_node_id);
return Err(LightningError {
err,
action: ErrorAction::IgnoreAndLog(Level::Trace),
});
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this respond to peer with an error or just log it on our end? We've run into issues where we just silently fail/ignore requests (when user hits our rate limits) and the wallet user experience is odd because it just hangs. This could introduce that experience?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but in this case it's really intended as it's a DoS protection if the user sends us bogus data. The alternative is disconnecting them which I'd even prefer, but we don't know if the user has any channels with us that still need to be kept operational. While this just adds the user to the ignorelist until restart, we def. want to remove them after a while in the future, especially once we start persisting things, as tracked here: https://github.com/lightningdevkit/lightning-liquidity/issues/117


let message = {
let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map).map_err(
|_| {
let error = ResponseError {
code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
data: None,
};

self.pending_messages.enqueue(sender_node_id, LSPSMessage::Invalid(error));
let err = format!("Failed to deserialize invalid LSPS message.");
let err_msg =
Some(ErrorMessage { channel_id: ChannelId([0; 32]), data: err.clone() });
LightningError { err, action: ErrorAction::DisconnectPeer { msg: err_msg } }
},
)?
{
let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
}
.map_err(|_| {
let error = ResponseError {
code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
data: None,
};

self.pending_messages.enqueue(sender_node_id, LSPSMessage::Invalid(error));
self.ignored_peers.write().unwrap().insert(*sender_node_id);
let err = format!(
"Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
sender_node_id
);
LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) }
})?
};

self.handle_lsps_message(message, sender_node_id)
}

fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
self.pending_messages
.get_and_clear_pending_msgs()
let pending_messages = self.pending_messages.get_and_clear_pending_msgs();

let mut request_ids_and_methods = pending_messages
.iter()
.map(|(public_key, lsps_message)| {
if let Some((request_id, method)) = lsps_message.get_request_id_and_method() {
request_id_to_method_map.insert(request_id, method);
}
(
*public_key,
RawLSPSMessage { payload: serde_json::to_string(&lsps_message).unwrap() },
)
.filter_map(|(_, msg)| msg.get_request_id_and_method())
.peekable();

if request_ids_and_methods.peek().is_some() {
let mut request_id_to_method_map_lock = self.request_id_to_method_map.lock().unwrap();
for (request_id, method) in request_ids_and_methods {
request_id_to_method_map_lock.insert(request_id, method);
}
}

pending_messages
.into_iter()
.filter_map(|(public_key, msg)| {
serde_json::to_string(&msg)
.ok()
.and_then(|payload| Some((public_key, RawLSPSMessage { payload })))
})
.collect()
}
Expand Down