Merged
84 changes: 54 additions & 30 deletions src/main.rs
@@ -4,18 +4,19 @@ mod cli;
mod convert;
mod disk;
mod hex_utils;
mod sweep;

use crate::bitcoind_client::BitcoindClient;
use crate::disk::FilesystemLogger;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode;
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::Secp256k1;
use bitcoin::BlockHash;
use bitcoin_bech32::WitnessProgram;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{EntropySource, InMemorySigner, KeysManager};
use lightning::chain::keysinterface::{
EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor,
};
use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
use lightning::chain::{Filter, Watch};
use lightning::events::{Event, PaymentFailureReason, PaymentPurpose};
@@ -30,6 +31,7 @@ use lightning::routing::gossip;
use lightning::routing::gossip::{NodeId, P2PGossipSync};
use lightning::routing::router::DefaultRouter;
use lightning::util::config::UserConfig;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::ReadableArgs;
use lightning_background_processor::{process_events_async, GossipSync};
use lightning_block_sync::init;
@@ -52,6 +54,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

pub(crate) const PENDING_SPENDABLE_OUTPUT_DIR: &'static str = "pending_spendable_outputs";

pub(crate) enum HTLCStatus {
Pending,
Succeeded,
@@ -107,7 +111,7 @@ async fn handle_ldk_events(
channel_manager: &Arc<ChannelManager>, bitcoind_client: &BitcoindClient,
network_graph: &NetworkGraph, keys_manager: &KeysManager,
inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage,
network: Network, event: Event,
persister: &Arc<FilesystemPersister>, network: Network, event: Event,
) {
match event {
Event::FundingGenerationReady {
@@ -331,20 +335,23 @@ async fn handle_ldk_events(
});
}
Event::SpendableOutputs { outputs } => {
let destination_address = bitcoind_client.get_new_address().await;
let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
let tx_feerate =
bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
let spending_tx = keys_manager
.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
&Secp256k1::new(),
)
.unwrap();
bitcoind_client.broadcast_transaction(&spending_tx);
// SpendableOutputDescriptors, of which outputs is a vec, are critical to keep track
// of! While a `StaticOutput` descriptor is just an output to a static, well-known key,
// other descriptors are not currently ever regenerated for you by LDK. Once we return
// from this method, the descriptor will be gone, and you may lose track of some funds.
//
// Here we simply persist them to disk, with a background task running which will try
// to spend them regularly (possibly duplicatively/RBF'ing them). These can just be
// treated as normal funds where possible - they are only spendable by us and there is
// no rush to claim them.
for output in outputs {
let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
// Note that if the type here changes our read code needs to change as well.
let output: SpendableOutputDescriptor = output;
persister
.persist(&format!("{}/{}", PENDING_SPENDABLE_OUTPUT_DIR, key), &output)
.unwrap();
}
}
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
println!(
@@ -693,6 +700,7 @@ async fn start_ldk() {
let keys_manager_event_listener = Arc::clone(&keys_manager);
let inbound_payments_event_listener = Arc::clone(&inbound_payments);
let outbound_payments_event_listener = Arc::clone(&outbound_payments);
let persister_event_listener = Arc::clone(&persister);
let network = args.network;
let event_handler = move |event: Event| {
let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener);
@@ -701,6 +709,7 @@
let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener);
let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener);
let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener);
let persister_event_listener = Arc::clone(&persister_event_listener);
async move {
handle_ldk_events(
&channel_manager_event_listener,
@@ -709,6 +718,7 @@
&keys_manager_event_listener,
&inbound_payments_event_listener,
&outbound_payments_event_listener,
&persister_event_listener,
network,
event,
)
@@ -722,7 +732,7 @@
// Step 20: Background Processing
let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(());
let background_processor = tokio::spawn(process_events_async(
persister,
Arc::clone(&persister),
event_handler,
chain_monitor.clone(),
channel_manager.clone(),
@@ -781,24 +791,38 @@ async fn start_ldk() {
});

// Regularly broadcast our node_announcement. This is only required (or possible) if we have
// some public channels, and is only useful if we have public listen address(es) to announce.
// In a production environment, this should occur only after the announcement of new channels
// to avoid churn in the global network graph.
// some public channels.
let peer_man = Arc::clone(&peer_manager);
let chan_man = Arc::clone(&channel_manager);
let network = args.network;
if !args.ldk_announced_listen_addr.is_empty() {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
tokio::spawn(async move {
// First wait a minute until we have some peers and maybe have opened a channel.
tokio::time::sleep(Duration::from_secs(60)).await;
// Then, update our announcement once an hour to keep it fresh but avoid unnecessary churn
// in the global gossip network.
let mut interval = tokio::time::interval(Duration::from_secs(3600));
loop {
interval.tick().await;
// Don't bother trying to announce if we don't have any public channels, though our
// peers should drop such an announcement anyway. Note that the announcement may not
// propagate until we have a channel with 6+ confirmations.
if chan_man.list_channels().iter().any(|chan| chan.is_public) {
Review thread:

Contributor: Should also check it has 6 confs, no?

Contributor (author): Eh? I mean yes, the announcement won't propagate until then, but I'm lazy. I commented it instead.

tnull (May 12, 2023): But it should be at least list_usable_channels, as otherwise we may end up broadcasting when we have no connected peers? And, if we miss it the first time around, we'd wait for an hour to try again? So maybe only try and tick if we have usable channels or !peer_man.get_node_ids().is_empty(), and sleep a bit otherwise?
(see lightningdevkit/ldk-node@7225d42#diff-b1a35a68f14e696205874893c07fd24fdb88882b47c23cc0e0c80a30c7d53759R786-R809)

Contributor: I don't see an issue with that – this is more about making sure the broadcast is valid. Also, list_usable_channels isn't what we want here, since we could be connected to non-channel peers and they should still receive our updates regardless of whether we're connected to our channel counterparties.

tnull (May 12, 2023): Right, I think we want to check whether we have any online connection; it doesn't have to be the counterparty of the public channel. The issue is that we may "waste" the broadcast tick when we're not connected and then only try again after an hour.

Contributor (author): There's no issue with "wasting" the broadcast tick, as we don't try again until the next tick anyway. We could add a bunch of complexity, retry after 10 seconds instead of an hour, and check whether we have peers, but I'm really not convinced it's worth it. This is only for public routing nodes anyway, which need to be online reliably with reasonable uptime, and broadcasts are only valid after an hour (6 blocks), so it's not like we're in a rush to get an announcement out super quick. We can just try again in an hour.
peer_man.broadcast_node_announcement(
[0; 3],
args.ldk_announced_node_name,
args.ldk_announced_listen_addr.clone(),
);
}
});
}
}
});
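
For reference, a minimal sketch of the variant discussed in the thread above: tick every minute so a missed opportunity doesn't cost a full hour, but only broadcast once we have both a public channel and at least one connected peer, and at most once an hour. This is illustrative only and not part of this PR; it reuses peer_man, chan_man, and args from the surrounding setup and assumes PeerManager::get_peer_node_ids is available (the method tnull refers to as get_node_ids).

use std::time::{Duration, Instant};

tokio::spawn(async move {
	let mut last_broadcast: Option<Instant> = None;
	// Re-check every minute so a missed tick doesn't delay us a full hour...
	let mut interval = tokio::time::interval(Duration::from_secs(60));
	loop {
		interval.tick().await;
		// ...but never rebroadcast more often than once an hour.
		if last_broadcast.map_or(false, |t| t.elapsed() < Duration::from_secs(3600)) {
			continue;
		}
		// Only announce once we have a public channel backing the announcement
		// and at least one connected peer to hand it to.
		if chan_man.list_channels().iter().any(|chan| chan.is_public)
			&& !peer_man.get_peer_node_ids().is_empty()
		{
			peer_man.broadcast_node_announcement(
				[0; 3],
				args.ldk_announced_node_name,
				args.ldk_announced_listen_addr.clone(),
			);
			last_broadcast = Some(Instant::now());
		}
	}
});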

tokio::spawn(sweep::periodic_sweep(
ldk_data_dir.clone(),
Arc::clone(&keys_manager),
Arc::clone(&logger),
Arc::clone(&persister),
Arc::clone(&bitcoind_client),
));

// Start the CLI.
cli::poll_for_user_input(
@@ -809,7 +833,7 @@
Arc::clone(&onion_messenger),
inbound_payments,
outbound_payments,
ldk_data_dir.clone(),
ldk_data_dir,
network,
Arc::clone(&logger),
)
127 changes: 127 additions & 0 deletions src/sweep.rs
@@ -0,0 +1,127 @@
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, io};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{EntropySource, KeysManager, SpendableOutputDescriptor};
use lightning::util::logger::Logger;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::{Readable, WithoutLength};

use bitcoin::secp256k1::Secp256k1;

use crate::hex_utils;
use crate::BitcoindClient;
use crate::FilesystemLogger;
use crate::FilesystemPersister;

/// If we have any pending claimable outputs, we should slowly sweep them to our Bitcoin Core
/// wallet. We technically don't need to do this - they're ours to spend when we want and can just
/// use them to build new transactions instead, but we cannot feed them directly into Bitcoin
/// Core's wallet so we have to sweep.
///
/// Note that this is unnecessary for [`SpendableOutputDescriptor::StaticOutput`]s, which *do* have
/// an associated secret key we could simply import into Bitcoin Core's wallet, but for consistency
/// we don't do that here either.
pub(crate) async fn periodic_sweep(
ldk_data_dir: String, keys_manager: Arc<KeysManager>, logger: Arc<FilesystemLogger>,
persister: Arc<FilesystemPersister>, bitcoind_client: Arc<BitcoindClient>,
) {
// Regularly claim outputs which are exclusively spendable by us and send them to Bitcoin Core.
// Note that if you more tightly integrate your wallet with LDK you may not need to do this -
// these outputs can just be treated as normal outputs during coin selection.
let pending_spendables_dir =
format!("{}/{}", ldk_data_dir, crate::PENDING_SPENDABLE_OUTPUT_DIR);
let processing_spendables_dir = format!("{}/processing_spendable_outputs", ldk_data_dir);
let spendables_dir = format!("{}/spendable_outputs", ldk_data_dir);

// We batch together claims of all spendable outputs generated each day, however only after
// batching any claims of spendable outputs which were generated prior to restart. On a mobile
// device we likely won't ever be online for more than a minute, so we have to ensure we sweep
// any pending claims on startup, but for an always-online node you may wish to sweep even less
// frequently than this (or move the interval await to the top of the loop)!
//
// There is no particular rush here, we just have to ensure funds are available by the time we
// need to send funds.
let mut interval = tokio::time::interval(Duration::from_secs(60 * 60 * 24));

loop {
interval.tick().await; // Note that the first tick completes immediately
if let Ok(dir_iter) = fs::read_dir(&pending_spendables_dir) {
// Move any spendable descriptors from the pending folder so that we don't have any
// races with new files being added.
for file_res in dir_iter {
let file = file_res.unwrap();
// Only move a file if it's a 32-byte-hex'd filename, otherwise it might be a
// temporary file.
if file.file_name().len() == 64 {
fs::create_dir_all(&processing_spendables_dir).unwrap();
let mut holding_path = PathBuf::new();
holding_path.push(&processing_spendables_dir);
holding_path.push(&file.file_name());
fs::rename(file.path(), holding_path).unwrap();
}
}
// Now concatenate all the pending files we moved into one file in the
// `spendable_outputs` directory and drop the processing directory.
let mut outputs = Vec::new();
if let Ok(processing_iter) = fs::read_dir(&processing_spendables_dir) {
for file_res in processing_iter {
outputs.append(&mut fs::read(file_res.unwrap().path()).unwrap());
}
}
if !outputs.is_empty() {
let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
persister
.persist(&format!("spendable_outputs/{}", key), &WithoutLength(&outputs))
.unwrap();
fs::remove_dir_all(&processing_spendables_dir).unwrap();
}
}
// Iterate over all the sets of spendable outputs in `spendables_dir` and try to claim
// them.
// Note that here we try to claim each set of spendable outputs over and over again
// forever, even long after it's been claimed. While this isn't an issue per se, in practice
// you may wish to track when the claiming transaction has confirmed and remove the
// spendable outputs set. You may also wish to merge groups of unspent spendable outputs to
// combine batches.
if let Ok(dir_iter) = fs::read_dir(&spendables_dir) {
for file_res in dir_iter {
let mut outputs: Vec<SpendableOutputDescriptor> = Vec::new();
let mut file = fs::File::open(file_res.unwrap().path()).unwrap();
loop {
// Check if there are any bytes left to read, and if so read a descriptor.
match file.read_exact(&mut [0; 1]) {
Ok(_) => {
file.seek(SeekFrom::Current(-1)).unwrap();
}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => Err(e).unwrap(),
}
outputs.push(Readable::read(&mut file).unwrap());
}
let destination_address = bitcoind_client.get_new_address().await;
let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
let tx_feerate =
bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Background);
if let Ok(spending_tx) = keys_manager.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
&Secp256k1::new(),
) {
// Note that, most likely, we've already swept this set of outputs
// and they're already confirmed on-chain, so this broadcast will fail.
bitcoind_client.broadcast_transaction(&spending_tx);
} else {
lightning::log_error!(
logger,
"Failed to sweep spendable outputs! This may indicate the outputs are dust. Will try again in a day.");
}
}
}
}
}
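
As the comment above notes, in practice you may wish to stop rebroadcasting once the claiming transaction confirms. A minimal sketch of that cleanup follows, assuming the confirmation count comes from a Bitcoin Core RPC lookup (e.g. gettransaction); that helper, and this function, are hypothetical and not part of this PR.

use std::fs;
use std::path::Path;

// Sketch only: delete a swept-outputs file once its claiming transaction is
// deeply confirmed, so the sweeper stops re-claiming it on every pass. The
// confirmation count is assumed to come from an RPC lookup elsewhere.
fn maybe_prune_swept_set(confirmations: Option<u32>, swept_set_path: &Path) {
	if confirmations.map_or(false, |confs| confs >= 6) {
		let _ = fs::remove_file(swept_set_path);
	}
}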