Skip to content

Feat/monitor event stream #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
shellexpand = "3.1"
serde_yaml = { version = "0.9" }


# `msozin/flashblocks-v1.4.1` branch based on `flashblocks-rebase`
rollup-boost = { git = "http://github.com/flashbots/rollup-boost", rev = "8506dfb7d84c65746f7c88d250983658438f59e8" }

Expand Down
8 changes: 8 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub struct OpRbuilderArgs {
#[arg(long = "builder.log-pool-transactions", default_value = "false")]
pub log_pool_transactions: bool,

/// Signals whether to enable the txpool monitor
#[arg(long = "builder.enable-txpool-monitor", default_value = "false")]
pub enable_txpool_monitor: bool,

/// The buffer size for the txpool events
#[arg(long = "builder.txpool-monitor-buffer-size", default_value = "1000")]
pub txpool_monitor_buffer_size: usize,

/// How much time extra to wait for the block building job to complete and not get garbage collected
#[arg(long = "builder.extra-block-deadline-secs", default_value = "20")]
pub extra_block_deadline_secs: u64,
Expand Down
32 changes: 21 additions & 11 deletions crates/op-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use reth_optimism_node::{
node::{OpAddOnsBuilder, OpPoolBuilder},
OpNode,
};
use reth_transaction_pool::TransactionPool;

/// CLI argument parsing.
pub mod args;
Expand All @@ -22,7 +21,7 @@ use metrics::{
VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP,
VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
};
use monitor_tx_pool::monitor_tx_pool;
use monitor_tx_pool::{TransactionPoolMonitor, TxpoolExtApiServer};
use revert_protection::{EthApiOverrideServer, RevertProtectionExt};
use tx::FBPooledTransaction;

Expand Down Expand Up @@ -99,33 +98,44 @@ where
.build(),
)
.extend_rpc_modules(move |ctx| {
let pool = ctx.pool().clone();

if builder_args.enable_revert_protection {
tracing::info!("Revert protection enabled");

let pool = ctx.pool().clone();
let provider = ctx.provider().clone();
let revert_protection_ext = RevertProtectionExt::new(pool, provider);
let revert_protection_ext = RevertProtectionExt::new(pool.clone(), provider);

ctx.modules
.merge_configured(revert_protection_ext.into_rpc())?;
}

Ok(())
})
.on_node_started(move |ctx| {
VERSION.register_version_metrics();
if builder_args.log_pool_transactions {
if builder_args.log_pool_transactions || builder_args.enable_txpool_monitor {
tracing::info!("Logging pool transactions");
ctx.task_executor.spawn_critical(

let tx_monitor = TransactionPoolMonitor::new(
pool,
builder_args.log_pool_transactions,
builder_args.enable_txpool_monitor,
builder_args.txpool_monitor_buffer_size,
);
ctx.modules.merge_configured(tx_monitor.rpc().into_rpc())?;

ctx.node().task_executor.spawn_critical(
"txlogging",
Box::pin(async move {
monitor_tx_pool(ctx.pool.all_transactions_event_listener()).await;
tx_monitor.run().await;
}),
);
}

Ok(())
})
.on_node_started(move |_ctx| {
VERSION.register_version_metrics();

Ok(())
})
.launch()
.await?;

Expand Down
240 changes: 185 additions & 55 deletions crates/op-rbuilder/src/monitor_tx_pool.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,198 @@
use crate::tx::FBPooledTransaction;
use alloy_primitives::TxHash;
use futures_util::StreamExt;
use reth_transaction_pool::{AllTransactionsEvents, FullTransactionEvent};
use jsonrpsee::{
core::{async_trait, SubscriptionResult},
proc_macros::rpc,
PendingSubscriptionSink, SubscriptionMessage,
};
use reth_transaction_pool::{FullTransactionEvent, TransactionEvent, TransactionPool};
use serde::Serialize;
use tokio::sync::broadcast;
use tracing::info;

pub async fn monitor_tx_pool(mut new_transactions: AllTransactionsEvents<FBPooledTransaction>) {
while let Some(event) = new_transactions.next().await {
transaction_event_log(event);
/// RPC extension exposing transaction-pool events as a `txpool_subscribeEvents`
/// subscription (server side only; the trait impl is generated by jsonrpsee's
/// `#[rpc]` proc macro).
#[rpc(server, namespace = "txpool")]
pub trait TxpoolExtApi {
    /// Creates a subscription that returns the txpool events.
    // NOTE(review): `item = usize` does not match the payload actually sent
    // by the server (`TransactionEventData`). With a server-only `#[rpc]`
    // this attribute is metadata/documentation, but it should be corrected
    // to avoid misleading API consumers — confirm.
    #[subscription(name = "subscribeEvents", item = usize)]
    fn subscribe_events(&self) -> SubscriptionResult;
}

/// Consumes the transaction pool's event stream, optionally logging each
/// event and/or broadcasting it to RPC subscription clients.
pub struct TransactionPoolMonitor<Pool> {
    // Pool whose event listener is drained in `run`.
    pool: Pool,
    // When true, each event is logged via `tracing::info!` (target "monitoring").
    log_events: bool,
    // When true, each event is forwarded on `event_sender` for RPC subscribers.
    txpool_monitor: bool,
    // Fan-out channel shared with `TransactionPoolMonitorRpc` handlers.
    event_sender: broadcast::Sender<TransactionEventData>,
    // Keep a receiver to prevent channel from closing
    _event_receiver: broadcast::Receiver<TransactionEventData>,
}

impl<Pool> TransactionPoolMonitor<Pool> {
pub fn new(pool: Pool, log_events: bool, txpool_monitor: bool, buffer_size: usize) -> Self {
let (event_sender, _event_receiver) = broadcast::channel(buffer_size);

if log_events {
info!("Logging pool transactions");
}
if txpool_monitor {
info!("Monitoring txpool enabled");
}

Self {
pool,
log_events,
txpool_monitor,
event_sender,
_event_receiver,
}
}
}

fn transaction_event_log(event: FullTransactionEvent<FBPooledTransaction>) {
match event {
FullTransactionEvent::Pending(hash) => {
info!(
target = "monitoring",
tx_hash = hash.to_string(),
kind = "pending",
"Transaction event received"
)
impl<Pool> TransactionPoolMonitor<Pool>
where
Pool: TransactionPool<Transaction = FBPooledTransaction> + Clone + 'static,
{
pub fn rpc(&self) -> TransactionPoolMonitorRpc {
TransactionPoolMonitorRpc {
event_sender: self.event_sender.clone(),
}
FullTransactionEvent::Queued(hash) => {
info!(
target = "monitoring",
tx_hash = hash.to_string(),
kind = "queued",
"Transaction event received"
)
}

pub async fn run(self) {
let mut new_transactions = self.pool.all_transactions_event_listener();

while let Some(event) = new_transactions.next().await {
// Push the event to the buffer
let event_data = TransactionEventData::from(event);
if self.log_events {
info!(
target = "monitoring",
tx_hash = event_data.hash.to_string(),
kind = event_data.kind(),
"Transaction event received"
)
}

if self.txpool_monitor {
println!("Sending event: {:?}", event_data);
let _ = self.event_sender.send(event_data);
}
}
FullTransactionEvent::Mined {
tx_hash,
block_hash,
} => info!(
target = "monitoring",
tx_hash = tx_hash.to_string(),
kind = "mined",
block_hash = block_hash.to_string(),
"Transaction event received"
),
FullTransactionEvent::Replaced {
transaction,
replaced_by,
} => info!(
target = "monitoring",
tx_hash = transaction.hash().to_string(),
kind = "replaced",
replaced_by = replaced_by.to_string(),
"Transaction event received"
),
FullTransactionEvent::Discarded(hash) => {
info!(
target = "monitoring",
tx_hash = hash.to_string(),
kind = "discarded",
"Transaction event received"
)
}
}

/// jsonrpsee server handler for the `txpool` subscription namespace.
/// Each subscription gets its own receiver cloned off this sender.
pub struct TransactionPoolMonitorRpc {
    // Handle to the monitor's broadcast channel; `subscribe()` is called
    // per incoming subscription.
    event_sender: broadcast::Sender<TransactionEventData>,
}

#[async_trait]
impl TxpoolExtApiServer for TransactionPoolMonitorRpc {
fn subscribe_events(
&self,
pending_subscription_sink: PendingSubscriptionSink,
) -> SubscriptionResult {
println!("Subscribing to txpool events");
let mut event_receiver = self.event_sender.subscribe();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the buffer has elements, does this subscribe tries to read those elements first? or if will start at the beginning of the stream?


tokio::spawn(async move {
let sink = match pending_subscription_sink.accept().await {
Ok(sink) => sink,
Err(e) => {
tracing::warn!("failed to accept subscription: {e}");
return;
}
};

println!("Subscribed to txpool events");

loop {
match event_receiver.recv().await {
Ok(event) => {
println!("Received event: {:?}", event);

let msg = SubscriptionMessage::from(
serde_json::value::to_raw_value(&event)
.expect("Failed to serialize event"),
);

if sink.send(msg).await.is_err() {
tracing::debug!("Subscription closed");
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
tracing::warn!("Subscription lagged, some events were dropped");
continue;
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!("Event channel closed");
break;
}
}
}
});

Ok(())
}
}

/// JSON-serializable snapshot of a pool event, broadcast to RPC subscribers.
#[derive(Clone, Debug, Serialize)]
struct TransactionEventData {
    // Hash of the affected transaction. For `Propagated` events, which carry
    // no single transaction, this is `TxHash::default()` (the zero hash).
    hash: TxHash,
    // The event payload, reusing reth's `TransactionEvent` representation.
    transaction_event: TransactionEvent,
}

impl TransactionEventData {
pub fn kind(&self) -> &str {
match self.transaction_event {
TransactionEvent::Pending => "pending",
TransactionEvent::Queued => "queued",
TransactionEvent::Mined(_) => "mined",
TransactionEvent::Replaced(_) => "replaced",
TransactionEvent::Discarded => "discarded",
TransactionEvent::Invalid => "invalid",
TransactionEvent::Propagated(_) => "propagated",
}
FullTransactionEvent::Invalid(hash) => {
info!(
target = "monitoring",
tx_hash = hash.to_string(),
kind = "invalid",
"Transaction event received"
)
}
}

impl From<FullTransactionEvent<FBPooledTransaction>> for TransactionEventData {
    /// Flattens reth's `FullTransactionEvent` into the serializable
    /// `(hash, event)` pair sent to subscribers.
    fn from(event: FullTransactionEvent<FBPooledTransaction>) -> Self {
        match event {
            FullTransactionEvent::Pending(hash) => Self {
                hash,
                transaction_event: TransactionEvent::Pending,
            },
            FullTransactionEvent::Queued(hash) => Self {
                hash,
                transaction_event: TransactionEvent::Queued,
            },
            FullTransactionEvent::Mined {
                tx_hash,
                block_hash,
            } => Self {
                hash: tx_hash,
                transaction_event: TransactionEvent::Mined(block_hash),
            },
            FullTransactionEvent::Replaced {
                transaction,
                replaced_by,
            } => Self {
                // Report the hash of the transaction that was replaced; the
                // replacing hash travels in the event payload.
                hash: *transaction.hash(),
                transaction_event: TransactionEvent::Replaced(replaced_by),
            },
            FullTransactionEvent::Discarded(hash) => Self {
                hash,
                transaction_event: TransactionEvent::Discarded,
            },
            FullTransactionEvent::Invalid(hash) => Self {
                hash,
                transaction_event: TransactionEvent::Invalid,
            },
            FullTransactionEvent::Propagated(kind) => Self {
                // Propagation events are not tied to a single transaction;
                // the zero hash is used as a sentinel. NOTE(review): consider
                // making `hash` an `Option<TxHash>` so consumers don't have
                // to special-case the zero value — confirm with API users.
                hash: TxHash::default(),
                transaction_event: TransactionEvent::Propagated(kind),
            },
        }
    }
}
4 changes: 4 additions & 0 deletions crates/op-rbuilder/src/tests/framework/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ impl TestHarnessBuilder {
let builder_data_dir: PathBuf = std::env::temp_dir().join(Uuid::new_v4().to_string());
let builder_auth_rpc_port = get_available_port();
let builder_http_port = get_available_port();
let builder_ws_port = get_available_port();
let mut op_rbuilder_config = OpRbuilderConfig::new()
.chain_config_path(genesis_path.clone())
.data_dir(builder_data_dir)
.auth_rpc_port(builder_auth_rpc_port)
.network_port(get_available_port())
.http_port(builder_http_port)
.ws_port(builder_ws_port)
.with_builder_private_key(BUILDER_PRIVATE_KEY)
.with_revert_protection(self.use_revert_protection)
.with_namespaces(self.namespaces)
Expand Down Expand Up @@ -131,6 +133,7 @@ impl TestHarnessBuilder {
framework: framework,
builder_auth_rpc_port,
builder_http_port,
builder_ws_port,
validator_auth_rpc_port,
builder_log_path,
})
Expand All @@ -141,6 +144,7 @@ pub struct TestHarness {
framework: IntegrationFramework,
builder_auth_rpc_port: u16,
builder_http_port: u16,
pub builder_ws_port: u16,
validator_auth_rpc_port: u16,
builder_log_path: PathBuf,
}
Expand Down
Loading
Loading