Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);

impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
// Needed to differentiate test expectations.
#[cfg(test)]
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}

fn filtered_block_connected(&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
Expand Down
50 changes: 46 additions & 4 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub trait BlockSource : Sync + Send {

/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData>;

/// Returns the hash of the best block and, optionally, its height.
///
Expand Down Expand Up @@ -152,6 +152,18 @@ pub struct BlockHeaderData {
pub chainwork: Uint256,
}

/// A block including either all its transactions or only the block header.
///
/// [`BlockSource`] may be implemented to either always return full blocks or, in the case of
/// compact block filters (BIP 157/158), return header-only blocks when no pertinent transactions
/// match. See [`chain::Filter`] for details on how to notify a source of such transactions.
pub enum BlockData {
/// A block containing all its transactions.
FullBlock(Block),
/// A block header for when the block does not contain any pertinent transactions.
HeaderOnly(BlockHeader),
}

/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified
/// Payment Verification (SPV).
///
Expand Down Expand Up @@ -396,13 +408,22 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis
chain_poller: &mut P,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
for header in connected_blocks.drain(..).rev() {
let block = chain_poller
let height = header.height;
let block_data = chain_poller
.fetch_block(&header).await
.or_else(|e| Err((e, Some(new_tip))))?;
debug_assert_eq!(block.block_hash, header.block_hash);
debug_assert_eq!(block_data.block_hash, header.block_hash);

match block_data.deref() {
BlockData::FullBlock(block) => {
self.chain_listener.block_connected(&block, height);
},
BlockData::HeaderOnly(header) => {
self.chain_listener.filtered_block_connected(&header, &[], height);
},
}

self.header_cache.block_connected(header.block_hash, header);
self.chain_listener.block_connected(&block, header.height);
new_tip = header;
}

Expand Down Expand Up @@ -707,4 +728,25 @@ mod chain_notifier_tests {
Ok(_) => panic!("Expected error"),
}
}

#[tokio::test]
async fn sync_from_chain_with_filtered_blocks() {
let mut chain = Blockchain::default().with_height(3).filtered_blocks();

let new_tip = chain.tip();
let old_tip = chain.at_height(1);
let chain_listener = &MockChainListener::new()
.expect_filtered_block_connected(*chain.at_height(2))
.expect_filtered_block_connected(*new_tip);
let mut notifier = ChainNotifier {
header_cache: &mut chain.header_cache(0..=1),
chain_listener,
};
let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await {
Err((e, _)) => panic!("Unexpected error: {:?}", e),
Ok(_) => {},
}
}

}
32 changes: 19 additions & 13 deletions lightning-block-sync/src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! Adapters that make one or more [`BlockSource`]s simpler to poll for new chain tip transitions.

use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult};
use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult};

use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;

Expand Down Expand Up @@ -71,24 +70,31 @@ impl Validate for BlockHeaderData {
}
}

impl Validate for Block {
impl Validate for BlockData {
type T = ValidatedBlock;

fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
let pow_valid_block_hash = self.header
.validate_pow(&self.header.target())
let header = match &self {
BlockData::FullBlock(block) => &block.header,
BlockData::HeaderOnly(header) => header,
};

let pow_valid_block_hash = header
.validate_pow(&header.target())
.or_else(|e| Err(BlockSourceError::persistent(e)))?;

if pow_valid_block_hash != block_hash {
return Err(BlockSourceError::persistent("invalid block hash"));
}

if !self.check_merkle_root() {
return Err(BlockSourceError::persistent("invalid merkle root"));
}
if let BlockData::FullBlock(block) = &self {
if !block.check_merkle_root() {
return Err(BlockSourceError::persistent("invalid merkle root"));
}

if !self.check_witness_commitment() {
return Err(BlockSourceError::persistent("invalid witness commitment"));
if !block.check_witness_commitment() {
return Err(BlockSourceError::persistent("invalid witness commitment"));
}
}

Ok(ValidatedBlock { block_hash, inner: self })
Expand Down Expand Up @@ -145,11 +151,11 @@ impl ValidatedBlockHeader {
/// A block with validated data against its transaction list and corresponding block hash.
pub struct ValidatedBlock {
pub(crate) block_hash: BlockHash,
inner: Block,
inner: BlockData,
}

impl std::ops::Deref for ValidatedBlock {
type Target = Block;
type Target = BlockData;

fn deref(&self) -> &Self::Target {
&self.inner
Expand All @@ -161,7 +167,7 @@ mod sealed {
pub trait Validate {}

impl Validate for crate::BlockHeaderData {}
impl Validate for bitcoin::blockdata::block::Block {}
impl Validate for crate::BlockData {}
}

/// The canonical `Poll` implementation used for a single `BlockSource`.
Expand Down
7 changes: 3 additions & 4 deletions lightning-block-sync/src/rest.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Simple REST client implementation which implements [`BlockSource`] against a Bitcoin Core REST
//! endpoint.

use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse};

use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

Expand Down Expand Up @@ -45,10 +44,10 @@ impl BlockSource for RestClient {
})
}

fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
Ok(BlockData::FullBlock(self.request_resource::<BinaryResponse, _>(&resource_path).await?))
})
}

Expand Down
7 changes: 3 additions & 4 deletions lightning-block-sync/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC
//! endpoint.

use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};

use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;

Expand Down Expand Up @@ -91,11 +90,11 @@ impl BlockSource for RpcClient {
})
}

fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
})
}

Expand Down
43 changes: 38 additions & 5 deletions lightning-block-sync/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache};
use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache};
use crate::poll::{Validate, ValidatedBlockHeader};

use bitcoin::blockdata::block::{Block, BlockHeader};
Expand All @@ -20,6 +20,7 @@ pub struct Blockchain {
without_blocks: Option<std::ops::RangeFrom<usize>>,
without_headers: bool,
malformed_headers: bool,
filtered_blocks: bool,
}

impl Blockchain {
Expand Down Expand Up @@ -77,6 +78,10 @@ impl Blockchain {
Self { malformed_headers: true, ..self }
}

pub fn filtered_blocks(self) -> Self {
Self { filtered_blocks: true, ..self }
}

pub fn fork_at_height(&self, height: usize) -> Self {
assert!(height + 1 < self.blocks.len());
let mut blocks = self.blocks.clone();
Expand Down Expand Up @@ -146,7 +151,7 @@ impl BlockSource for Blockchain {
})
}

fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
Box::pin(async move {
for (height, block) in self.blocks.iter().enumerate() {
if block.header.block_hash() == *header_hash {
Expand All @@ -156,7 +161,11 @@ impl BlockSource for Blockchain {
}
}

return Ok(block.clone());
if self.filtered_blocks {
return Ok(BlockData::HeaderOnly(block.header.clone()));
} else {
return Ok(BlockData::FullBlock(block.clone()));
}
}
}
Err(BlockSourceError::transient("block not found"))
Expand Down Expand Up @@ -185,13 +194,15 @@ impl chain::Listen for NullChainListener {

pub struct MockChainListener {
expected_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_filtered_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_blocks_disconnected: RefCell<VecDeque<BlockHeaderData>>,
}

impl MockChainListener {
pub fn new() -> Self {
Self {
expected_blocks_connected: RefCell::new(VecDeque::new()),
expected_filtered_blocks_connected: RefCell::new(VecDeque::new()),
expected_blocks_disconnected: RefCell::new(VecDeque::new()),
}
}
Expand All @@ -201,17 +212,34 @@ impl MockChainListener {
self
}

pub fn expect_filtered_block_connected(self, block: BlockHeaderData) -> Self {
self.expected_filtered_blocks_connected.borrow_mut().push_back(block);
self
}

pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_disconnected.borrow_mut().push_back(block);
self
}
}

impl chain::Listen for MockChainListener {
fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) {
fn block_connected(&self, block: &Block, height: u32) {
match self.expected_blocks_connected.borrow_mut().pop_front() {
None => {
panic!("Unexpected block connected: {:?}", header.block_hash());
panic!("Unexpected block connected: {:?}", block.block_hash());
},
Some(expected_block) => {
assert_eq!(block.block_hash(), expected_block.header.block_hash());
assert_eq!(height, expected_block.height);
},
}
}

fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) {
match self.expected_filtered_blocks_connected.borrow_mut().pop_front() {
None => {
panic!("Unexpected filtered block connected: {:?}", header.block_hash());
},
Some(expected_block) => {
assert_eq!(header.block_hash(), expected_block.header.block_hash());
Expand Down Expand Up @@ -244,6 +272,11 @@ impl Drop for MockChainListener {
panic!("Expected blocks connected: {:?}", expected_blocks_connected);
}

let expected_filtered_blocks_connected = self.expected_filtered_blocks_connected.borrow();
if !expected_filtered_blocks_connected.is_empty() {
panic!("Expected filtered_blocks connected: {:?}", expected_filtered_blocks_connected);
}

let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow();
if !expected_blocks_disconnected.is_empty() {
panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected);
Expand Down