Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 78 additions & 19 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use rollup_node_providers::L1MessageProvider;
use rollup_node_sequencer::{Sequencer, SequencerEvent};
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
use scroll_alloy_provider::ScrollEngineApi;
Expand Down Expand Up @@ -356,13 +356,23 @@ impl<
let _ = tx.send(self.network.handle().clone());
}
ChainOrchestratorCommand::UpdateFcsHead((head, sender)) => {
// Collect transactions of reverted blocks from l2 client.
let reverted_transactions = self
.collect_reverted_txs_in_range(
head.number.saturating_add(1),
self.engine.fcs().head_block_info().number,
)
.await?;
self.engine.update_fcs(Some(head), None, None).await?;
self.database
.tx_mut(move |tx| async move {
tx.purge_l1_message_to_l2_block_mappings(Some(head.number + 1)).await?;
tx.set_l2_head_block_number(head.number).await
})
.await?;

// Add all reverted transactions to the transaction pool.
self.reinsert_txs_into_pool(reverted_transactions).await;
self.notify(ChainOrchestratorEvent::FcsHeadUpdated(head));
let _ = sender.send(());
}
Expand Down Expand Up @@ -560,6 +570,43 @@ impl<
Ok(Some(ChainOrchestratorEvent::NewL1Block(block_number)))
}

/// Collects reverted L2 transactions in [from, to], excluding L1 messages.
async fn collect_reverted_txs_in_range(
&self,
from: u64,
to: u64,
) -> Result<Vec<ScrollTxEnvelope>, ChainOrchestratorError> {
let mut reverted_transactions: Vec<ScrollTxEnvelope> = Vec::new();
for number in from..=to {
let block = self
.l2_client
.get_block_by_number(number.into())
.full()
.await?
.ok_or_else(|| ChainOrchestratorError::L2BlockNotFoundInL2Client(number))?;

let block = block.into_consensus().map_transactions(|tx| tx.inner.into_inner());
reverted_transactions.extend(
block.into_body().transactions.into_iter().filter(|tx| !tx.is_l1_message()),
);
}
Ok(reverted_transactions)
}

/// Reinserts given L2 transactions into the transaction pool.
async fn reinsert_txs_into_pool(&self, txs: Vec<ScrollTxEnvelope>) {
for tx in txs {
let encoded_tx = tx.encoded_2718();
if let Err(err) = self.l2_client.send_raw_transaction(&encoded_tx).await {
tracing::warn!(
target: "scroll::chain_orchestrator",
?err,
"failed to reinsert reverted transaction into pool"
);
}
}
}

/// Handles a reorganization event by deleting all indexed data which is greater than the
/// provided block number.
async fn handle_l1_reorg(
Expand All @@ -570,27 +617,36 @@ impl<
let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
self.database.unwind(genesis_hash, block_number).await?;

let l2_head_block_info = if let Some(block_number) = l2_head_block_number {
// Fetch the block hash of the new L2 head block.
let block_hash = self
.l2_client
.get_block_by_number(block_number.into())
.full()
.await?
.expect("L2 head block must exist")
.header
.hash_slow();
let (l2_head_block_info, reverted_transactions) =
if let Some(block_number) = l2_head_block_number {
// Fetch the block hash of the new L2 head block.
let block_hash = self
.l2_client
.get_block_by_number(block_number.into())
.full()
.await?
.expect("L2 head block must exist")
.header
.hash_slow();

// Cancel the inflight payload building job if the head has changed.
if let Some(s) = self.sequencer.as_mut() {
s.cancel_payload_building_job();
};

// Collect transactions of reverted blocks from l2 client.
let reverted_transactions = self
.collect_reverted_txs_in_range(
block_number.saturating_add(1),
self.engine.fcs().head_block_info().number,
)
.await?;

// Cancel the inflight payload building job if the head has changed.
if let Some(s) = self.sequencer.as_mut() {
s.cancel_payload_building_job();
(Some(BlockInfo { number: block_number, hash: block_hash }), reverted_transactions)
} else {
(None, Vec::new())
};

Some(BlockInfo { number: block_number, hash: block_hash })
} else {
None
};

// If the L1 reorg is before the origin of the inflight payload building job, cancel it.
if Some(l1_block_number) <
self.sequencer
Expand All @@ -608,6 +664,9 @@ impl<
self.engine.update_fcs(l2_head_block_info, l2_safe_block_info, None).await?;
}

// Add all reverted transactions to the transaction pool.
self.reinsert_txs_into_pool(reverted_transactions).await;

let event = ChainOrchestratorEvent::L1Reorg {
l1_block_number,
queue_index,
Expand Down
152 changes: 151 additions & 1 deletion crates/node/tests/e2e.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! End-to-end tests for the rollup node.

use alloy_eips::BlockNumberOrTag;
use alloy_eips::{eip2718::Encodable2718, BlockNumberOrTag};
use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256};
use alloy_rpc_types_eth::Block;
use alloy_signer::Signer;
Expand Down Expand Up @@ -1451,6 +1451,152 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> {
Ok(())
}

/// Test that when L2 block reorg happens due to an L1 reorg, the transactions that were reverted
/// are requeued.
#[tokio::test]
async fn requeues_transactions_after_l1_reorg() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let chain_spec = (*SCROLL_DEV).clone();
let mut config = default_sequencer_test_scroll_rollup_node_config();
config.sequencer_args.auto_start = false;
config.sequencer_args.block_time = 0;

let (mut nodes, _tasks, wallet) =
setup_engine(config, 1, chain_spec.clone(), false, false).await?;
let node = nodes.pop().expect("node exists");

let rnm_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let mut events = rnm_handle.get_event_listener().await?;
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();

l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?;
let _ = events.next().await;
let _ = events.next().await;

// Let the sequencer build 10 blocks.
for i in 1..=10 {
rnm_handle.build_block();
let b = wait_for_block_sequenced_5s(&mut events, i).await?;
tracing::info!(target: "scroll::test", block_number = ?b.header.number, block_hash = ?b.header.hash_slow(), "Sequenced block");
}

// Send a L1 message and wait for it to be indexed.
let l1_message_notification = L1Notification::L1Message {
message: TxL1Message {
queue_index: 0,
gas_limit: 21000,
to: Default::default(),
value: Default::default(),
sender: Default::default(),
input: Default::default(),
},
block_number: 2,
block_timestamp: 0,
};

// Build a L2 block with L1 message, so we can revert it later.
l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?;
l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(2))).await?;
wait_for_event_5s(&mut events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?;
wait_for_event_5s(&mut events, ChainOrchestratorEvent::NewL1Block(2)).await?;
rnm_handle.build_block();
wait_for_block_sequenced_5s(&mut events, 11).await?;

// Inject a user transaction and force the sequencer to include it in the next block
let wallet = Arc::new(Mutex::new(wallet));
let tx = generate_tx(wallet.clone()).await;
let injected_tx_bytes: Vec<u8> = tx.clone().into();
node.rpc.inject_tx(tx).await?;

rnm_handle.build_block();
let block_with_tx = wait_for_block_sequenced_5s(&mut events, 12).await?;
assert!(
block_contains_raw_tx(&block_with_tx, &injected_tx_bytes),
"block 11 should contain the injected transaction before the reorg"
);

// Trigger an L1 reorg that reverts the block containing the transaction
l1_watcher_tx.send(Arc::new(L1Notification::Reorg(1))).await?;
wait_for_event_predicate_5s(&mut events, |event| {
matches!(event, ChainOrchestratorEvent::L1Reorg { l1_block_number: 1, .. })
})
.await?;

// Build the next block – the reverted transaction should have been requeued
rnm_handle.build_block();
let reseq_block = wait_for_block_sequenced_5s(&mut events, 11).await?;
assert!(
block_contains_raw_tx(&reseq_block, &injected_tx_bytes),
"re-sequenced block should contain the reverted transaction"
);

Ok(())
}

/// Test that when the FCS head is reset to an earlier block via `UpdateFcsHead`,
/// the transactions from reverted blocks are requeued into the tx pool and can
/// be included again.
#[tokio::test]
async fn requeues_transactions_after_update_fcs_head() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let chain_spec = (*SCROLL_DEV).clone();
let mut config = default_sequencer_test_scroll_rollup_node_config();
config.sequencer_args.auto_start = false;
config.sequencer_args.block_time = 0;

let (mut nodes, _tasks, wallet) =
setup_engine(config, 1, chain_spec.clone(), false, false).await?;
let node = nodes.pop().expect("node exists");

let handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
let mut events = handle.get_event_listener().await?;

// Set L1 synced to allow sequencing.
let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();
l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?;
let _ = events.next().await;
let _ = events.next().await;

// Build a few blocks and remember block #4 as the future reset target.
let mut target_head: Option<BlockInfo> = None;
for i in 1..=4 {
handle.build_block();
let b = wait_for_block_sequenced_5s(&mut events, i).await?;
if i == 4 {
target_head = Some(BlockInfo { number: b.header.number, hash: b.header.hash_slow() });
}
}

// Inject a user transaction and include it in block 5.
let wallet = Arc::new(Mutex::new(wallet));
let tx = generate_tx(wallet.clone()).await;
let injected_tx_bytes: Vec<u8> = tx.clone().into();
node.rpc.inject_tx(tx).await?;

handle.build_block();
let block_with_tx = wait_for_block_sequenced_5s(&mut events, 5).await?;
assert!(
block_contains_raw_tx(&block_with_tx, &injected_tx_bytes),
"block 5 should contain the injected transaction before the FCS reset",
);

// Reset FCS head back to block 4; this should collect block 5's txs and requeue them.
let head = target_head.expect("target head exists");
handle.update_fcs_head(head).await.expect("update_fcs_head should succeed");

// Build the next block – the reverted transaction should have been requeued and included.
handle.build_block();
let reseq_block = wait_for_block_sequenced_5s(&mut events, 5).await?;
assert!(
block_contains_raw_tx(&reseq_block, &injected_tx_bytes),
"re-sequenced block should contain the reverted transaction after FCS reset",
);

Ok(())
}

/// Tests that a sequencer and follower node can produce blocks using a custom local genesis
/// configuration and properly propagate them between nodes.
#[tokio::test]
Expand Down Expand Up @@ -2176,3 +2322,7 @@ async fn assert_latest_block_on_rpc_by_hash(
)
.await;
}

fn block_contains_raw_tx(block: &ScrollBlock, raw_tx: &[u8]) -> bool {
block.body.transactions.iter().any(|tx| tx.encoded_2718().as_slice() == raw_tx)
}
Loading