From 70430e63830c909088e36e6044bcbc3b4753a214 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Mon, 31 Mar 2025 11:54:36 -0700 Subject: [PATCH] chore: remove fortuna-specific keeper code --- apps/argus/Cargo.lock | 2 +- apps/argus/config.sample.yaml | 25 -- apps/argus/src/api.rs | 13 +- apps/argus/src/chain/reader.rs | 4 +- apps/argus/src/command/register_provider.rs | 42 +-- apps/argus/src/command/run.rs | 10 +- apps/argus/src/command/setup_provider.rs | 57 +-- apps/argus/src/config.rs | 38 +- apps/argus/src/keeper.rs | 77 +--- apps/argus/src/keeper/block.rs | 384 -------------------- apps/argus/src/keeper/process_event.rs | 131 ------- 11 files changed, 20 insertions(+), 763 deletions(-) delete mode 100644 apps/argus/src/keeper/block.rs delete mode 100644 apps/argus/src/keeper/process_event.rs diff --git a/apps/argus/Cargo.lock b/apps/argus/Cargo.lock index 7569c01028..fd5b9a62c6 100644 --- a/apps/argus/Cargo.lock +++ b/apps/argus/Cargo.lock @@ -1594,7 +1594,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.4.8" +version = "7.4.10" dependencies = [ "anyhow", "axum", diff --git a/apps/argus/config.sample.yaml b/apps/argus/config.sample.yaml index d2fa7fe205..7af9a7e914 100644 --- a/apps/argus/config.sample.yaml +++ b/apps/argus/config.sample.yaml @@ -3,9 +3,6 @@ chains: geth_rpc_addr: https://replicator.pegasus.lightlink.io/rpc/v1 contract_addr: 0x8250f4aF4B972684F7b336503E2D6dFeDeB1487a - # Keeper configuration for the chain - reveal_delay_blocks: 0 - gas_limit: 500000 # Multiplier for the priority fee estimate, as a percentage (i.e., 100 = no change). # Defaults to 100 if the field is omitted. @@ -38,23 +35,7 @@ chains: min_profit_pct: 0 target_profit_pct: 20 max_profit_pct: 100 - - # A list of block delays for processing blocks multiple times. Each number represents - # how many blocks to wait before processing. For example, [5, 10, 20] means process - # blocks after 5 blocks, then again after 10 blocks, and finally after 20 blocks. - block_delays: [5, 10, 20] - - # Historical commitments -- delete this block for local development purposes - commitments: - # prettier-ignore - - seed: [219,125,217,197,234,88,208,120,21,181,172,143,239,102,41,233,167,212,237,106,37,255,184,165,238,121,230,155,116,158,173,48] - chain_length: 10000 - original_commitment_sequence_number: 104 provider: - uri: http://localhost:8080/ - chain_length: 100000 - chain_sample_interval: 10 - # An ethereum wallet address and private key. Generate with `cast wallet new` address: 0xADDRESS private_key: @@ -63,12 +44,6 @@ provider: # For production, you can store the private key in a file. # file: provider-key.txt # A 32 byte random value in hexadecimal - # Generate with `openssl rand -hex 32` - secret: - # For local development, you can hardcode the value here - value: abcd - # For production, you can store the private key in a file. - # file: secret.txt # Set this to the address of your keeper wallet if you would like the keeper wallet to # be able to withdraw fees from the contract. diff --git a/apps/argus/src/api.rs b/apps/argus/src/api.rs index a6663aefc8..6481a73a20 100644 --- a/apps/argus/src/api.rs +++ b/apps/argus/src/api.rs @@ -1,7 +1,5 @@ use { - crate::{ - chain::reader::{BlockNumber, BlockStatus}, - }, + crate::chain::reader::BlockStatus, anyhow::Result, axum::{ body::Body, @@ -47,9 +45,7 @@ pub struct ApiState { } impl ApiState { - pub async fn new( - metrics_registry: Arc>, - ) -> ApiState { + pub async fn new(metrics_registry: Arc>) -> ApiState { let metrics = ApiMetrics { http_requests: Family::default(), }; @@ -68,16 +64,13 @@ impl ApiState { } } -/// The state of the randomness service for a single blockchain. +/// The state of the service for a single blockchain. #[derive(Clone)] pub struct BlockchainState { /// The chain id for this blockchain, useful for logging pub id: ChainId, /// The address of the provider that this server is operating for. pub provider_address: Address, - /// The server will wait for this many block confirmations of a request before revealing - /// the random number. - pub reveal_delay_blocks: BlockNumber, /// The BlockStatus of the block that is considered to be confirmed on the blockchain. /// For eg., Finalized, Safe pub confirmed_block_status: BlockStatus, diff --git a/apps/argus/src/chain/reader.rs b/apps/argus/src/chain/reader.rs index 853ea7b2ba..9d52a47e18 100644 --- a/apps/argus/src/chain/reader.rs +++ b/apps/argus/src/chain/reader.rs @@ -1,6 +1,4 @@ -use { - ethers::types::{Address, BlockNumber as EthersBlockNumber}, -}; +use ethers::types::{Address, BlockNumber as EthersBlockNumber}; pub type BlockNumber = u64; diff --git a/apps/argus/src/command/register_provider.rs b/apps/argus/src/command/register_provider.rs index 6e72a963ff..1236ff5f4b 100644 --- a/apps/argus/src/command/register_provider.rs +++ b/apps/argus/src/command/register_provider.rs @@ -1,14 +1,10 @@ use { crate::{ - api::{get_register_uri, ChainId}, + api::ChainId, chain::ethereum::SignablePythContract, config::{Config, EthereumConfig, ProviderConfig, RegisterProviderOptions}, }, anyhow::{anyhow, Result}, - ethers::{ - abi::Bytes, - types::U256, - }, std::sync::Arc, }; @@ -31,7 +27,7 @@ pub async fn register_provider(opts: &RegisterProviderOptions) -> Result<()> { pub async fn register_provider_from_config( provider_config: &ProviderConfig, - chain_id: &ChainId, + _chain_id: &ChainId, chain_config: &EthereumConfig, ) -> Result<()> { let private_key_string = provider_config.private_key.load()?.ok_or(anyhow!( @@ -39,40 +35,10 @@ pub async fn register_provider_from_config( ))?; // Initialize a Provider to interface with the EVM contract. - let contract = + let _contract = Arc::new(SignablePythContract::from_config(chain_config, &private_key_string).await?); - // Create a new random hash chain. - let random = rand::random::<[u8; 32]>(); - - // FIXME: delete this - let commitment_length = 1000; - // Arguments to the contract to register our new provider. - let fee_in_wei = chain_config.fee; - let commitment = [0; 32]; - // Store the random seed and chain length in the metadata field so that we can regenerate the hash - // chain at-will. (This is secure because you can't generate the chain unless you also have the secret) - let commitment_metadata = CommitmentMetadata { - seed: random, - chain_length: commitment_length, - }; - let uri = get_register_uri(&provider_config.uri, chain_id)?; - let call = contract.register( - fee_in_wei, - commitment, - bincode::serialize(&commitment_metadata)?.into(), - commitment_length, - // Use Bytes to serialize the uri. Most users will be using JS/TS to deserialize this uri. - // Bincode is a different encoding mechanisms, and I didn't find any JS/TS library to parse bincode. - Bytes::from(uri.as_str()).into(), - ); - let mut gas_estimate = call.estimate_gas().await?; - let gas_multiplier = U256::from(2); //TODO: smarter gas estimation - gas_estimate *= gas_multiplier; - let call_with_gas = call.gas(gas_estimate); - if let Some(r) = call_with_gas.send().await?.await? { - tracing::info!("Registered provider: {:?}", r); - } + // TODO: implement registration for Pulse Ok(()) } diff --git a/apps/argus/src/command/run.rs b/apps/argus/src/command/run.rs index 942eecee95..73f320f7dd 100644 --- a/apps/argus/src/command/run.rs +++ b/apps/argus/src/command/run.rs @@ -4,13 +4,13 @@ use { config::{Config, EthereumConfig, RunOptions}, keeper::{self, keeper_metrics::KeeperMetrics}, }, - fortuna::eth_utils::traced_client::{RpcMetrics, TracedClient}, anyhow::{anyhow, Error, Result}, axum::Router, ethers::{ middleware::Middleware, types::{Address, BlockNumber}, }, + fortuna::eth_utils::traced_client::{RpcMetrics, TracedClient}, futures::future::join_all, prometheus_client::{ encoding::EncodeLabelSet, @@ -110,12 +110,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let mut tasks = Vec::new(); for (chain_id, chain_config) in config.chains.clone() { tasks.push(spawn(async move { - let state = setup_chain_state( - &config.provider.address, - &chain_id, - &chain_config, - ) - .await; + let state = setup_chain_state(&config.provider.address, &chain_id, &chain_config).await; (chain_id, state) })); @@ -183,7 +178,6 @@ async fn setup_chain_state( let state = BlockchainState { id: chain_id.clone(), provider_address: *provider, - reveal_delay_blocks: chain_config.reveal_delay_blocks, confirmed_block_status: chain_config.confirmed_block_status, }; Ok(state) diff --git a/apps/argus/src/command/setup_provider.rs b/apps/argus/src/command/setup_provider.rs index 7cebbd9db9..6976c4f3a1 100644 --- a/apps/argus/src/command/setup_provider.rs +++ b/apps/argus/src/command/setup_provider.rs @@ -1,15 +1,14 @@ use { crate::{ - api::{get_register_uri, ChainId}, + api::ChainId, chain::ethereum::{ProviderInfo, SignablePythContract}, command::register_provider::register_provider_from_config, config::{Config, EthereumConfig, SetupProviderOptions}, }, anyhow::{anyhow, Result}, ethers::{ - abi::Bytes as AbiBytes, signers::{LocalWallet, Signer}, - types::{Address, Bytes}, + types::Address, }, futures::future::join_all, std::sync::Arc, @@ -88,11 +87,6 @@ async fn setup_chain_provider( .in_current_span() .await?; - let uri = get_register_uri(&provider_config.uri, chain_id)?; - sync_uri(&contract, &provider_info, uri) - .in_current_span() - .await?; - sync_fee_manager( &contract, &provider_info, @@ -101,34 +95,6 @@ async fn setup_chain_provider( .in_current_span() .await?; - sync_max_num_hashes( - &contract, - &provider_info, - chain_config.max_num_hashes.unwrap_or(0), - ) - .in_current_span() - .await?; - - Ok(()) -} - -async fn sync_uri( - contract: &Arc, - provider_info: &ProviderInfo, - uri: String, -) -> Result<()> { - let uri_as_bytes: Bytes = AbiBytes::from(uri.as_str()).into(); - if provider_info.uri != uri_as_bytes { - tracing::info!("Updating provider uri to {}", uri); - if let Some(receipt) = contract - .set_provider_uri(uri_as_bytes) - .send() - .await? - .await? - { - tracing::info!("Updated provider uri: {:?}", receipt); - } - } Ok(()) } @@ -164,22 +130,3 @@ async fn sync_fee_manager( } Ok(()) } - -async fn sync_max_num_hashes( - contract: &Arc, - provider_info: &ProviderInfo, - max_num_hashes: u32, -) -> Result<()> { - if provider_info.max_num_hashes != max_num_hashes { - tracing::info!("Updating provider max num hashes to {:?}", max_num_hashes); - if let Some(receipt) = contract - .set_max_num_hashes(max_num_hashes) - .send() - .await? - .await? - { - tracing::info!("Updated provider max num hashes to : {:?}", receipt); - } - } - Ok(()) -} diff --git a/apps/argus/src/config.rs b/apps/argus/src/config.rs index cd1769bccb..04f9b21cd4 100644 --- a/apps/argus/src/config.rs +++ b/apps/argus/src/config.rs @@ -1,12 +1,9 @@ use { - crate::{ - api::ChainId, - chain::reader::{BlockNumber, BlockStatus}, - }, - fortuna::eth_utils::utils::EscalationPolicy, + crate::{api::ChainId, chain::reader::BlockStatus}, anyhow::{anyhow, Result}, clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser}, ethers::types::Address, + fortuna::eth_utils::utils::EscalationPolicy, std::{collections::HashMap, fs}, }; pub use { @@ -66,7 +63,7 @@ pub enum Options { pub struct ConfigOptions { /// Path to a configuration file containing the list of supported blockchains #[arg(long = "config")] - #[arg(env = "FORTUNA_CONFIG")] + #[arg(env = "ARGUS_CONFIG")] #[arg(default_value = "config.yaml")] pub config: String, } @@ -117,12 +114,6 @@ pub struct EthereumConfig { /// Address of a Pyth Randomness contract to interact with. pub contract_addr: Address, - /// reveal_delay_blocks - The difference between the block number with the - /// confirmed_block_status(see below) and the block number of a request to - /// Entropy should be greater than `reveal_delay_blocks` for Fortuna to reveal - /// its commitment. - pub reveal_delay_blocks: BlockNumber, - /// The BlockStatus of the block that is considered confirmed. /// For example, Finalized, Safe, Latest #[serde(default)] @@ -136,7 +127,6 @@ pub struct EthereumConfig { pub gas_limit: u64, /// The percentage multiplier to apply to priority fee estimates (100 = no change, e.g. 150 = 150% of base fee) - #[serde(default = "default_priority_fee_multiplier_pct")] pub priority_fee_multiplier_pct: u64, /// The escalation policy governs how the gas limit and fee are increased during backoff retries. @@ -171,24 +161,6 @@ pub struct EthereumConfig { /// How much the provider charges for a request on this chain. #[serde(default)] pub fee: u128, - - /// Maximum number of hashes to record in a request. - /// This should be set according to the maximum gas limit the provider supports for callbacks. - pub max_num_hashes: Option, - - /// A list of delays (in blocks) that indicates how many blocks should be delayed - /// before we process a block. For retry logic, we can process blocks multiple times - /// at each specified delay. For example: [5, 10, 20]. - #[serde(default = "default_block_delays")] - pub block_delays: Vec, -} - -fn default_block_delays() -> Vec { - vec![5] -} - -fn default_priority_fee_multiplier_pct() -> u64 { - 100 } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -272,10 +244,6 @@ impl EscalationPolicyConfig { /// Configuration values that are common to a single provider (and shared across chains). #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct ProviderConfig { - /// The URI where clients can retrieve random values from this provider, - /// i.e., wherever fortuna for this provider will be hosted. - pub uri: String, - /// The public key of the provider whose requests the server will respond to. pub address: Address, diff --git a/apps/argus/src/keeper.rs b/apps/argus/src/keeper.rs index 58ecfcca9e..5b625006dd 100644 --- a/apps/argus/src/keeper.rs +++ b/apps/argus/src/keeper.rs @@ -3,36 +3,27 @@ use { api::{BlockchainState, ChainId}, chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract}, config::EthereumConfig, - keeper::block::{ - get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper, - BlockRange, - }, keeper::fee::adjust_fee_wrapper, keeper::fee::withdraw_fees_wrapper, keeper::track::track_accrued_pyth_fees, keeper::track::track_balance, keeper::track::track_provider, }, - fortuna::eth_utils::traced_client::RpcMetrics, ethers::{signers::Signer, types::U256}, + fortuna::eth_utils::traced_client::RpcMetrics, keeper_metrics::{AccountLabel, KeeperMetrics}, - std::{collections::HashSet, sync::Arc}, + std::sync::Arc, tokio::{ spawn, - sync::{mpsc, RwLock}, time::{self, Duration}, }, tracing::{self, Instrument}, }; -pub(crate) mod block; pub(crate) mod fee; pub(crate) mod keeper_metrics; -pub(crate) mod process_event; pub(crate) mod track; -/// How many blocks to look back for events that might be missed when starting the keeper -const BACKLOG_RANGE: u64 = 1000; /// Track metrics in this interval const TRACK_INTERVAL: Duration = Duration::from_secs(10); /// Check whether we need to conduct a withdrawal at this interval. @@ -40,16 +31,6 @@ const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300); /// Check whether we need to adjust the fee at this interval. const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30); -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum RequestState { - /// Fulfilled means that the request was either revealed or we are sure we - /// will not be able to reveal it. - Fulfilled, - /// We have already processed the request but couldn't fulfill it and we are - /// unsure if we can fulfill it or not. - Processed, -} - /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and /// handle any events for the new blocks. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] @@ -74,58 +55,6 @@ pub async fn run_keeper_threads( ); let keeper_address = contract.wallet().address(); - let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::::new())); - - let latest_safe_block = get_latest_safe_block(contract.clone(), &chain_state).in_current_span().await; - tracing::info!("latest safe block: {}", &latest_safe_block); - - // Spawn a thread to handle the events from last BACKLOG_RANGE blocks. - let gas_limit: U256 = chain_eth_config.gas_limit.into(); - spawn( - process_backlog( - BlockRange { - from: latest_safe_block.saturating_sub(BACKLOG_RANGE), - to: latest_safe_block, - }, - contract.clone(), - gas_limit, - chain_eth_config.escalation_policy.to_policy(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - chain_eth_config.block_delays.clone(), - ) - .in_current_span(), - ); - - let (tx, rx) = mpsc::channel::(1000); - // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. - spawn( - watch_blocks_wrapper( - contract.clone(), - chain_state.clone(), - latest_safe_block, - tx, - chain_eth_config.geth_rpc_wss.clone(), - ) - .in_current_span(), - ); - - // Spawn a thread for block processing with configured delays - spawn( - process_new_blocks( - chain_state.clone(), - rx, - Arc::clone(&contract), - gas_limit, - chain_eth_config.escalation_policy.to_policy(), - metrics.clone(), - fulfilled_requests_cache.clone(), - chain_eth_config.block_delays.clone(), - ) - .in_current_span(), - ); - // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance. spawn( withdraw_fees_wrapper( @@ -152,6 +81,8 @@ pub async fn run_keeper_threads( // near the maximum gas limit. // In the unlikely event that the keeper fees aren't sufficient, the solution to this is to configure the target // fee percentage to be higher on that specific chain. + + // TODO: remove this, the gas limit is set by the consumer now. chain_eth_config.gas_limit, // NOTE: unwrap() here so we panic early if someone configures these values below -100. u64::try_from(100 + chain_eth_config.min_profit_pct) diff --git a/apps/argus/src/keeper/block.rs b/apps/argus/src/keeper/block.rs deleted file mode 100644 index 4941220797..0000000000 --- a/apps/argus/src/keeper/block.rs +++ /dev/null @@ -1,384 +0,0 @@ -use { - crate::{ - api::{self, BlockchainState}, - chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber}, - keeper::keeper_metrics::KeeperMetrics, - keeper::process_event::process_event_with_backoff, - }, - fortuna::eth_utils::utils::EscalationPolicy, - anyhow::{anyhow, Result}, - ethers::{ - providers::{Middleware, Provider, Ws}, - types::U256, - }, - futures::StreamExt, - std::{collections::HashSet, sync::Arc}, - tokio::{ - spawn, - sync::{mpsc, RwLock}, - time::{self, Duration}, - }, - tracing::{self, Instrument}, -}; - -/// How much to wait before retrying in case of an RPC error -const RETRY_INTERVAL: Duration = Duration::from_secs(5); -/// How many blocks to fetch events for in a single rpc call -const BLOCK_BATCH_SIZE: u64 = 100; -/// How much to wait before polling the next latest block -const POLL_INTERVAL: Duration = Duration::from_secs(2); -/// Retry last N blocks -const RETRY_PREVIOUS_BLOCKS: u64 = 100; - -#[derive(Debug, Clone)] -pub struct BlockRange { - pub from: BlockNumber, - pub to: BlockNumber, -} - -/// Get the latest safe block number for the chain. Retry internally if there is an error. -pub async fn get_latest_safe_block(contract: Arc, chain_state: &BlockchainState) -> BlockNumber { - loop { - match contract - .get_block_number(chain_state.confirmed_block_status) - .await - { - Ok(latest_confirmed_block) => { - tracing::info!( - "Fetched latest safe block {}", - latest_confirmed_block - chain_state.reveal_delay_blocks - ); - return latest_confirmed_block - chain_state.reveal_delay_blocks; - } - Err(e) => { - tracing::error!("Error while getting block number. error: {:?}", e); - time::sleep(RETRY_INTERVAL).await; - } - } - } -} - -/// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch. -#[tracing::instrument(skip_all, fields( - range_from_block = block_range.from, range_to_block = block_range.to -))] -pub async fn process_block_range( - block_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicy, - chain_state: api::BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, -) { - let BlockRange { - from: first_block, - to: last_block, - } = block_range; - let mut current_block = first_block; - while current_block <= last_block { - let mut to_block = current_block + BLOCK_BATCH_SIZE; - if to_block > last_block { - to_block = last_block; - } - - // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future. - process_single_block_batch( - BlockRange { - from: current_block, - to: to_block, - }, - contract.clone(), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - - current_block = to_block + 1; - } -} - -/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch -/// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled. -/// It won't reprocess it. If the request was already processed, it will reprocess it. -/// If the process fails, it will retry indefinitely. -#[tracing::instrument(name = "batch", skip_all, fields( - batch_from_block = block_range.from, batch_to_block = block_range.to -))] -pub async fn process_single_block_batch( - block_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicy, - chain_state: api::BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, -) { - loop { - let events_res = contract - .get_request_with_callback_events(block_range.from, block_range.to) - .await; - - match events_res { - Ok(events) => { - tracing::info!(num_of_events = &events.len(), "Processing",); - for event in &events { - // the write lock guarantees we spawn only one task per sequence number - let newly_inserted = fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number); - if newly_inserted { - spawn( - process_event_with_backoff( - event.clone(), - chain_state.clone(), - contract.clone(), - gas_limit, - escalation_policy.clone(), - metrics.clone(), - ) - .in_current_span(), - ); - } - } - tracing::info!(num_of_events = &events.len(), "Processed",); - break; - } - Err(e) => { - tracing::error!( - "Error while getting events. Waiting for {} seconds before retry. error: {:?}", - RETRY_INTERVAL.as_secs(), - e - ); - time::sleep(RETRY_INTERVAL).await; - } - } - } -} - -/// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay. -/// It retries indefinitely. -#[tracing::instrument(name = "watch_blocks", skip_all, fields( - initial_safe_block = latest_safe_block -))] -pub async fn watch_blocks_wrapper( - contract: Arc, - chain_state: BlockchainState, - latest_safe_block: BlockNumber, - tx: mpsc::Sender, - geth_rpc_wss: Option, -) { - let mut last_safe_block_processed = latest_safe_block; - loop { - if let Err(e) = watch_blocks( - contract.clone(), - chain_state.clone(), - &mut last_safe_block_processed, - tx.clone(), - geth_rpc_wss.clone(), - ) - .in_current_span() - .await - { - tracing::error!("watching blocks. error: {:?}", e); - time::sleep(RETRY_INTERVAL).await; - } - } -} - -/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel. -/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending -/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even -/// know about it. -pub async fn watch_blocks( - contract: Arc, - chain_state: BlockchainState, - last_safe_block_processed: &mut BlockNumber, - tx: mpsc::Sender, - geth_rpc_wss: Option, -) -> Result<()> { - tracing::info!("Watching blocks to handle new events"); - - let provider_option = match geth_rpc_wss { - Some(wss) => Some(match Provider::::connect(wss.clone()).await { - Ok(provider) => provider, - Err(e) => { - tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e); - return Err(e.into()); - } - }), - None => { - tracing::info!("No wss provided"); - None - } - }; - - let mut stream_option = match provider_option { - Some(ref provider) => Some(match provider.subscribe_blocks().await { - Ok(client) => client, - Err(e) => { - tracing::error!("Error while subscribing to blocks. error {:?}", e); - return Err(e.into()); - } - }), - None => None, - }; - - loop { - match stream_option { - Some(ref mut stream) => { - if stream.next().await.is_none() { - tracing::error!("Error blocks subscription stream ended"); - return Err(anyhow!("Error blocks subscription stream ended")); - } - } - None => { - time::sleep(POLL_INTERVAL).await; - } - } - - let latest_safe_block = get_latest_safe_block(contract.clone(), &chain_state).in_current_span().await; - if latest_safe_block > *last_safe_block_processed { - let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS); - - // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10) - // TODO: add a metric for this in separate PR. We need alerts - // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and - // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc - // to be in consistency after this much time. - if from > *last_safe_block_processed { - from = *last_safe_block_processed; - } - match tx - .send(BlockRange { - from, - to: latest_safe_block, - }) - .await - { - Ok(_) => { - tracing::info!( - from_block = from, - to_block = &latest_safe_block, - "Block range sent to handle events", - ); - *last_safe_block_processed = latest_safe_block; - } - Err(e) => { - tracing::error!( - from_block = from, - to_block = &latest_safe_block, - "Error while sending block range to handle events. These will be handled in next call. error: {:?}", - e - ); - } - }; - } - } -} - -/// It waits on rx channel to receive block ranges and then calls process_block_range to process them -/// for each configured block delay. -#[tracing::instrument(skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn process_new_blocks( - chain_state: BlockchainState, - mut rx: mpsc::Receiver, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicy, - metrics: Arc, - fulfilled_requests_cache: Arc>>, - block_delays: Vec, -) { - tracing::info!("Waiting for new block ranges to process"); - loop { - if let Some(block_range) = rx.recv().await { - // Process blocks immediately first - process_block_range( - block_range.clone(), - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - - // Then process with each configured delay - for delay in &block_delays { - let adjusted_range = BlockRange { - from: block_range.from.saturating_sub(*delay), - to: block_range.to.saturating_sub(*delay), - }; - process_block_range( - adjusted_range, - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - } - } - } -} - -/// Processes the backlog_range for a chain. -/// It processes the backlog range for each configured block delay. -#[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip_all)] -pub async fn process_backlog( - backlog_range: BlockRange, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicy, - chain_state: BlockchainState, - metrics: Arc, - fulfilled_requests_cache: Arc>>, - block_delays: Vec, -) { - tracing::info!("Processing backlog"); - // Process blocks immediately first - process_block_range( - backlog_range.clone(), - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - - // Then process with each configured delay - for delay in &block_delays { - let adjusted_range = BlockRange { - from: backlog_range.from.saturating_sub(*delay), - to: backlog_range.to.saturating_sub(*delay), - }; - process_block_range( - adjusted_range, - Arc::clone(&contract), - gas_limit, - escalation_policy.clone(), - chain_state.clone(), - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await; - } - tracing::info!("Backlog processed"); -} diff --git a/apps/argus/src/keeper/process_event.rs b/apps/argus/src/keeper/process_event.rs deleted file mode 100644 index b654194e5a..0000000000 --- a/apps/argus/src/keeper/process_event.rs +++ /dev/null @@ -1,131 +0,0 @@ -use { - super::keeper_metrics::{AccountLabel, KeeperMetrics}, - crate::{ - api::BlockchainState, - chain::{ethereum::InstrumentedSignablePythContract, reader::RequestedWithCallbackEvent}, - }, - fortuna::eth_utils::utils::{submit_tx_with_backoff, EscalationPolicy}, - anyhow::Result, - ethers::types::U256, - std::sync::Arc, - tracing, -}; - -/// Process an event with backoff. It will retry the reveal on failure for 5 minutes. -#[tracing::instrument(name = "process_event_with_backoff", skip_all, fields( - sequence_number = event.sequence_number -))] -pub async fn process_event_with_backoff( - event: RequestedWithCallbackEvent, - chain_state: BlockchainState, - contract: Arc, - gas_limit: U256, - escalation_policy: EscalationPolicy, - metrics: Arc, -) -> Result<()> { - // ignore requests that are not for the configured provider - if chain_state.provider_address != event.provider_address { - return Ok(()); - } - - let account_label = AccountLabel { - chain_id: chain_state.id.clone(), - address: chain_state.provider_address.to_string(), - }; - - metrics.requests.get_or_create(&account_label).inc(); - tracing::info!("Started processing event"); - - let provider_revelation = [0; 32]; - - let contract_call = contract.reveal_with_callback( - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ); - - let success = submit_tx_with_backoff( - contract.client(), - contract_call, - gas_limit, - escalation_policy, - ) - .await; - - metrics - .requests_processed - .get_or_create(&account_label) - .inc(); - - match success { - Ok(res) => { - tracing::info!("Processed event successfully in {:?}", res.duration); - - metrics - .requests_processed_success - .get_or_create(&account_label) - .inc(); - - metrics - .request_duration_ms - .get_or_create(&account_label) - .observe(res.duration.as_millis() as f64); - - // Track retry count, gas multiplier, and fee multiplier for successful transactions - metrics - .retry_count - .get_or_create(&account_label) - .observe(res.num_retries as f64); - - metrics - .final_gas_multiplier - .get_or_create(&account_label) - .observe(res.gas_multiplier as f64); - - metrics - .final_fee_multiplier - .get_or_create(&account_label) - .observe(res.fee_multiplier as f64); - - let receipt = res.receipt; - - if let Some(gas_used) = receipt.gas_used { - let gas_used_float = gas_used.as_u128() as f64 / 1e18; - metrics - .total_gas_spent - .get_or_create(&account_label) - .inc_by(gas_used_float); - - if let Some(gas_price) = receipt.effective_gas_price { - let gas_fee = (gas_used * gas_price).as_u128() as f64 / 1e18; - metrics - .total_gas_fee_spent - .get_or_create(&account_label) - .inc_by(gas_fee); - } - } - metrics.reveals.get_or_create(&account_label).inc(); - } - Err(e) => { - // In case the callback did not succeed, we double-check that the request is still on-chain. - // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but - // the RPC gave us an error anyway. - let req = contract - .get_request_wrapper(event.provider_address, event.sequence_number) - .await; - - tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req); - - // We only count failures for cases where we are completely certain that the callback failed. - if req.is_ok_and(|x| x.is_some()) { - metrics - .requests_processed_failure - .get_or_create(&account_label) - .inc(); - } - } - } - - Ok(()) -}