Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,19 @@ pub struct EthereumConfig {
/// at each specified delay. For example: [5, 10, 20].
#[serde(default = "default_block_delays")]
pub block_delays: Vec<u64>,

#[serde(default = "default_retry_previous_blocks")]
pub retry_previous_blocks: u64,
}

fn default_block_delays() -> Vec<u64> {
vec![5]
}

fn default_retry_previous_blocks() -> u64 {
100
}

fn default_priority_fee_multiplier_pct() -> u64 {
100
}
Expand Down
10 changes: 9 additions & 1 deletion apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,15 @@ pub async fn run_keeper_threads(

let (tx, rx) = mpsc::channel::<BlockRange>(1000);
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
spawn(watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span());
spawn(
watch_blocks_wrapper(
chain_state.clone(),
latest_safe_block,
tx,
chain_eth_config.retry_previous_blocks,
)
.in_current_span(),
);

// Spawn a thread for block processing with configured delays
spawn(
Expand Down
7 changes: 4 additions & 3 deletions apps/fortuna/src/keeper/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(5);
const BLOCK_BATCH_SIZE: u64 = 100;
/// How much to wait before polling the next latest block
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Retry last N blocks
const RETRY_PREVIOUS_BLOCKS: u64 = 100;

#[derive(Debug, Clone)]
pub struct BlockRange {
Expand Down Expand Up @@ -196,13 +194,15 @@ pub async fn watch_blocks_wrapper(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
tx: mpsc::Sender<BlockRange>,
retry_previous_blocks: u64,
) {
let mut last_safe_block_processed = latest_safe_block;
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
&mut last_safe_block_processed,
tx.clone(),
retry_previous_blocks,
)
.in_current_span()
.await
Expand All @@ -221,6 +221,7 @@ pub async fn watch_blocks(
chain_state: BlockchainState,
last_safe_block_processed: &mut BlockNumber,
tx: mpsc::Sender<BlockRange>,
retry_previous_blocks: u64,
) -> Result<()> {
tracing::info!("Watching blocks to handle new events");

Expand All @@ -229,7 +230,7 @@ pub async fn watch_blocks(

let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > *last_safe_block_processed {
let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS);
let mut from = latest_safe_block.saturating_sub(retry_previous_blocks);

// In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
// TODO: add a metric for this in separate PR. We need alerts
Expand Down
Loading