Skip to content
Merged
44 changes: 22 additions & 22 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::hash_types::{BlockHash, WPubkeyHash};

use lightning::chain;
use lightning::chain::{BestBlock, chainmonitor, channelmonitor, Confirm, Watch};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
use lightning::chain::{BestBlock, ChannelMonitorUpdateErr, chainmonitor, channelmonitor, Confirm, Watch};
use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent};
use lightning::chain::transaction::OutPoint;
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{KeysInterface, InMemorySigner};
Expand Down Expand Up @@ -99,8 +99,8 @@ impl Writer for VecWriter {
struct TestChainMonitor {
pub logger: Arc<dyn Logger>,
pub keys: Arc<KeyProvider>,
pub persister: Arc<TestPersister>,
pub chain_monitor: Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
// logic will automatically force-close our channels for us (as we don't have an up-to-date
// monitor implying we are not able to punish misbehaving counterparties). Because this test
Expand All @@ -112,28 +112,27 @@ struct TestChainMonitor {
impl TestChainMonitor {
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
Self {
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest, persister)),
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest, Arc::clone(&persister))),
logger,
keys,
update_ret: Mutex::new(Ok(())),
persister,
latest_monitors: Mutex::new(HashMap::new()),
should_update_manager: atomic::AtomicBool::new(false),
}
}
}
impl chain::Watch<EnforcingSigner> for TestChainMonitor {
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
panic!("Already had monitor pre-watch_channel");
}
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
assert!(self.chain_monitor.watch_channel(funding_txo, monitor).is_ok());
self.update_ret.lock().unwrap().clone()
self.chain_monitor.watch_channel(funding_txo, monitor)
}

fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
fn update_channel(&self, funding_txo: OutPoint, update: channelmonitor::ChannelMonitorUpdate) -> Result<(), chain::ChannelMonitorUpdateErr> {
let mut map_lock = self.latest_monitors.lock().unwrap();
let mut map_entry = match map_lock.entry(funding_txo) {
hash_map::Entry::Occupied(entry) => entry,
Expand All @@ -146,8 +145,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
deserialized_monitor.write(&mut ser).unwrap();
map_entry.insert((update.update_id, ser.0));
self.should_update_manager.store(true, atomic::Ordering::Relaxed);
assert!(self.chain_monitor.update_channel(funding_txo, update).is_ok());
self.update_ret.lock().unwrap().clone()
self.chain_monitor.update_channel(funding_txo, update)
}

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
Expand Down Expand Up @@ -346,7 +344,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
($node_id: expr, $fee_estimator: expr) => { {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(HashMap::new()) });
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(), Arc::new(TestPersister{}), Arc::clone(&keys_manager)));
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(Ok(())) }), Arc::clone(&keys_manager)));

let mut config = UserConfig::default();
config.channel_options.forwarding_fee_proportional_millionths = 0;
Expand All @@ -365,7 +364,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
($ser: expr, $node_id: expr, $old_monitors: expr, $keys_manager: expr, $fee_estimator: expr) => { {
let keys_manager = Arc::clone(& $keys_manager);
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(), Arc::new(TestPersister{}), Arc::clone(& $keys_manager)));
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(Ok(())) }), Arc::clone(& $keys_manager)));

let mut config = UserConfig::default();
config.channel_options.forwarding_fee_proportional_millionths = 0;
Expand Down Expand Up @@ -846,12 +846,12 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
// bit-twiddling mutations to have similar effects. This is probably overkill, but no
// harm in doing so.

0x00 => *monitor_a.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x01 => *monitor_b.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x02 => *monitor_c.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x04 => *monitor_a.update_ret.lock().unwrap() = Ok(()),
0x05 => *monitor_b.update_ret.lock().unwrap() = Ok(()),
0x06 => *monitor_c.update_ret.lock().unwrap() = Ok(()),
0x00 => *monitor_a.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x01 => *monitor_b.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x02 => *monitor_c.persister.update_ret.lock().unwrap() = Err(ChannelMonitorUpdateErr::TemporaryFailure),
0x04 => *monitor_a.persister.update_ret.lock().unwrap() = Ok(()),
0x05 => *monitor_b.persister.update_ret.lock().unwrap() = Ok(()),
0x06 => *monitor_c.persister.update_ret.lock().unwrap() = Ok(()),

0x08 => {
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
Expand Down Expand Up @@ -1072,9 +1072,9 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
// after we resolve all pending events.
// First make sure there are no pending monitor updates, resetting the error state
// and calling channel_monitor_updated for each monitor.
*monitor_a.update_ret.lock().unwrap() = Ok(());
*monitor_b.update_ret.lock().unwrap() = Ok(());
*monitor_c.update_ret.lock().unwrap() = Ok(());
*monitor_a.persister.update_ret.lock().unwrap() = Ok(());
*monitor_b.persister.update_ret.lock().unwrap() = Ok(());
*monitor_c.persister.update_ret.lock().unwrap() = Ok(());

if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
Expand Down
3 changes: 2 additions & 1 deletion fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
};

let broadcast = Arc::new(TestBroadcaster{ txn_broadcasted: Mutex::new(Vec::new()) });
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(), Arc::new(TestPersister{})));
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(Ok(())) })));

let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
let mut config = UserConfig::default();
Expand Down
19 changes: 12 additions & 7 deletions fuzz/src/utils/test_persister.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use lightning::chain::channelmonitor;
use lightning::chain;
use lightning::chain::{chainmonitor, channelmonitor};
use lightning::chain::transaction::OutPoint;
use lightning::util::enforcing_trait_impls::EnforcingSigner;

pub struct TestPersister {}
impl channelmonitor::Persist<EnforcingSigner> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
Ok(())
use std::sync::Mutex;

pub struct TestPersister {
pub update_ret: Mutex<Result<(), chain::ChannelMonitorUpdateErr>>,
}
impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
self.update_ret.lock().unwrap().clone()
}

fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
Ok(())
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
self.update_ret.lock().unwrap().clone()
}
}
5 changes: 2 additions & 3 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::ChainMonitor;
use lightning::chain::channelmonitor;
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
Expand Down Expand Up @@ -194,7 +193,7 @@ impl BackgroundProcessor {
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + channelmonitor::Persist<Signer>,
P::Target: 'static + Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
Expand Down
4 changes: 2 additions & 2 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ BlockSourceResult<ValidatedBlockHeader> {
///
/// use lightning::chain;
/// use lightning::chain::Watch;
/// use lightning::chain::chainmonitor;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor;
/// use lightning::chain::channelmonitor::ChannelMonitor;
/// use lightning::chain::chaininterface::BroadcasterInterface;
/// use lightning::chain::chaininterface::FeeEstimator;
Expand All @@ -65,7 +65,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// F: FeeEstimator,
/// L: Logger,
/// C: chain::Filter,
/// P: channelmonitor::Persist<S>,
/// P: chainmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
Expand Down
2 changes: 1 addition & 1 deletion lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
//! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type DataPersister = dyn lightning::chain::chainmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
Expand Down
17 changes: 9 additions & 8 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
use crate::util::DiskWriteable;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
use lightning::chain::channelmonitor;
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use lightning::chain::chainmonitor;
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::chain::transaction::OutPoint;
use lightning::ln::channelmanager::ChannelManager;
Expand Down Expand Up @@ -158,17 +158,17 @@ impl FilesystemPersister {
}
}

impl<ChannelSigner: Sign> channelmonitor::Persist<ChannelSigner> for FilesystemPersister {
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}

fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| ChannelMonitorUpdateErr::PermanentFailure)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}
}

Expand All @@ -180,7 +180,8 @@ mod tests {
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::hashes::hex::FromHex;
use bitcoin::Txid;
use lightning::chain::channelmonitor::{Persist, ChannelMonitorUpdateErr};
use lightning::chain::ChannelMonitorUpdateErr;
use lightning::chain::chainmonitor::Persist;
use lightning::chain::transaction::OutPoint;
use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors};
use lightning::ln::features::InitFeatures;
Expand Down
Loading