From 40ff04e09018845799a18c387fb0ff6a75e8e557 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Mon, 10 Feb 2025 09:46:03 +0100 Subject: [PATCH 1/3] feat: instrument spawned tasks with current tracing span when `tracing` feature is enabled --- README.md | 1 + datafusion-cli/Cargo.lock | 63 ++++++ datafusion-examples/Cargo.toml | 4 +- datafusion-examples/examples/tracing.rs | 127 +++++++++++ datafusion/common-runtime/Cargo.toml | 5 + datafusion/common-runtime/src/common.rs | 3 +- datafusion/common-runtime/src/join_set.rs | 207 ++++++++++++++++++ datafusion/common-runtime/src/lib.rs | 2 + datafusion/core/Cargo.toml | 1 + .../core/src/datasource/file_format/arrow.rs | 3 +- .../src/datasource/file_format/parquet.rs | 3 +- .../file_format/write/orchestration.rs | 3 +- datafusion/core/src/datasource/memory.rs | 2 +- .../core/src/datasource/physical_plan/csv.rs | 2 +- .../core/src/datasource/physical_plan/json.rs | 2 +- .../physical_plan/parquet/writer.rs | 2 +- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +- .../fuzz_cases/aggregation_fuzzer/fuzzer.rs | 2 +- .../fuzz_cases/distinct_count_string_fuzz.rs | 2 +- .../physical-plan/src/execution_plan.rs | 4 +- .../physical-plan/src/repartition/mod.rs | 3 +- datafusion/physical-plan/src/stream.rs | 2 +- 22 files changed, 425 insertions(+), 20 deletions(-) create mode 100644 datafusion-examples/examples/tracing.rs create mode 100644 datafusion/common-runtime/src/join_set.rs diff --git a/README.md b/README.md index 0377306abb68..a10be473d992 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ Optional features: - `backtrace`: include backtrace information in error messages - `pyarrow`: conversions between PyArrow and DataFusion types - `serde`: enable arrow-schema's `serde` feature +- `tracing`: propagates the current span across thread boundaries [apache avro]: https://avro.apache.org/ [apache parquet]: https://parquet.apache.org/ diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a5cf71426607..7f2110ea912b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1341,6 +1341,8 @@ version = "45.0.0" dependencies = [ "log", "tokio", + "tracing", + "tracing-futures", ] [[package]] @@ -3702,6 +3704,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3955,6 +3966,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "thrift" version = "0.17.0" @@ -4165,6 +4186,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -4278,6 +4335,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "version_check" version = "0.9.5" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index d90ec3333cb9..ef73671aea6a 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -61,7 +61,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples -datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } @@ -73,6 +73,8 @@ tempfile = { workspace = true } test-utils = { path = "../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tonic = "0.12.1" +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3" } url = { workspace = true } uuid = "1.7" diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/tracing.rs new file mode 100644 index 000000000000..855bbe2f15a1 --- /dev/null +++ b/datafusion-examples/examples/tracing.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates the trace feature in DataFusion’s runtime. +//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those +//! created during repartitioning or when reading Parquet files) are instrumented +//! with the current tracing span, allowing to propagate any existing tracing context. +//! +//! In this example we create a session configured to use multiple partitions, +//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file), +//! and run a query that should trigger parallel execution on multiple threads. +//! We wrap the entire query execution within a custom span and log messages. +//! By inspecting the tracing output, we should see that the tasks spawned +//! internally inherit the span context. + +use arrow::util::pretty::pretty_format_batches; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::prelude::*; +use datafusion::test_util::parquet_test_data; +use std::sync::Arc; +use tracing::{info, instrument, Level}; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize a tracing subscriber that prints to stdout. + tracing_subscriber::fmt() + .with_thread_ids(true) + .with_thread_names(true) + .with_max_level(Level::DEBUG) + .init(); + + log::info!("Starting example, this log is not captured by tracing"); + + // execute the query within a tracing span + let result = run_instrumented_query().await; + + info!( + "Finished example. Check the logs above for tracing span details showing \ +that tasks were spawned within the 'run_instrumented_query' span on different threads." + ); + + result +} + +#[instrument(level = "info")] +async fn run_instrumented_query() -> Result<()> { + info!("Starting query execution within the custom tracing span"); + + // The default session will set the number of partitions to `std::thread::available_parallelism()`. + let ctx = SessionContext::new(); + + // Get the path to the test parquet data. + let test_data = parquet_test_data(); + // Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file. + let file_format = ParquetFormat::default().with_enable_pruning(true); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension("alltypes_tiny_pages_plain.parquet"); + + info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}"); + + // Register a listing table using an absolute URL. + let table_path = format!("file://{test_data}/"); + ctx.register_listing_table( + "alltypes", + &table_path, + listing_options.clone(), + None, + None, + ) + .await + .expect("register_listing_table failed"); + + info!("Registered Parquet table 'alltypes' from {table_path}"); + + // Run a query that will trigger parallel execution on multiple threads. + let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col + FROM ( + SELECT bool_col, date_string_col, string_col FROM alltypes + UNION ALL + SELECT bool_col, date_string_col, string_col FROM alltypes + ) AS t + GROUP BY bool_col, date_string_col, string_col + ORDER BY 1,2,3,4 DESC + LIMIT 5;"; + info!(%sql, "Executing SQL query"); + let df = ctx.sql(sql).await?; + + let results: Vec = df.collect().await?; + info!("Query execution complete"); + + // Print out the results and tracing output. + datafusion::common::assert_batches_eq!( + [ + "+----------+----------+-----------------+------------+", + "| count(*) | bool_col | date_string_col | string_col |", + "+----------+----------+-----------------+------------+", + "| 2 | false | 01/01/09 | 9 |", + "| 2 | false | 01/01/09 | 7 |", + "| 2 | false | 01/01/09 | 5 |", + "| 2 | false | 01/01/09 | 3 |", + "| 2 | false | 01/01/09 | 1 |", + "+----------+----------+-----------------+------------+", + ], + &results + ); + + info!("Query results:\n{}", pretty_format_batches(&results)?); + + Ok(()) +} diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index a21c72cd9f83..80fcedeb6598 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -31,6 +31,9 @@ rust-version = { workspace = true } [lints] workspace = true +[features] +tracing = ["dep:tracing", "dep:tracing-futures"] + [lib] name = "datafusion_common_runtime" path = "src/lib.rs" @@ -38,6 +41,8 @@ path = "src/lib.rs" [dependencies] log = { workspace = true } tokio = { workspace = true } +tracing = { version = "0.1", optional = true } +tracing-futures = { version = "0.2", optional = true } [dev-dependencies] tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } diff --git a/datafusion/common-runtime/src/common.rs b/datafusion/common-runtime/src/common.rs index 30f7526bc0b2..361f6af95cf1 100644 --- a/datafusion/common-runtime/src/common.rs +++ b/datafusion/common-runtime/src/common.rs @@ -17,7 +17,8 @@ use std::future::Future; -use tokio::task::{JoinError, JoinSet}; +use crate::JoinSet; +use tokio::task::JoinError; /// Helper that provides a simple API to spawn a single task and join it. /// Provides guarantees of aborting on `Drop` to keep it cancel-safe. diff --git a/datafusion/common-runtime/src/join_set.rs b/datafusion/common-runtime/src/join_set.rs new file mode 100644 index 000000000000..1852302d5a63 --- /dev/null +++ b/datafusion/common-runtime/src/join_set.rs @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::future::Future; +use std::task::{Context, Poll}; +use tokio::runtime::Handle; +use tokio::task::JoinSet as TokioJoinSet; +use tokio::task::{AbortHandle, Id, JoinError, LocalSet}; + +#[cfg(feature = "tracing")] +mod trace_utils { + use std::future::Future; + use tracing_futures::Instrument; + + /// Instruments the provided future with the current tracing span. + pub fn trace_future(future: F) -> impl Future + Send + where + F: Future + Send + 'static, + T: Send, + { + future.in_current_span() + } + + /// Wraps the provided blocking function to execute within the current tracing span. + pub fn trace_block(f: F) -> impl FnOnce() -> T + Send + 'static + where + F: FnOnce() -> T + Send + 'static, + T: Send, + { + let span = tracing::Span::current(); + move || span.in_scope(f) + } +} + +/// A wrapper around Tokio's [`JoinSet`](tokio::task::JoinSet) that transparently forwards all public API calls +/// while optionally instrumenting spawned tasks and blocking closures with the current tracing span +/// when the `tracing` feature is enabled. +#[derive(Debug)] +pub struct JoinSet { + inner: TokioJoinSet, +} + +impl Default for JoinSet { + fn default() -> Self { + Self::new() + } +} + +impl JoinSet { + /// [JoinSet::new](tokio::task::JoinSet::new) - Create a new JoinSet. + pub fn new() -> Self { + Self { + inner: TokioJoinSet::new(), + } + } + + /// [JoinSet::len](tokio::task::JoinSet::len) - Return the number of tasks. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// [JoinSet::is_empty](tokio::task::JoinSet::is_empty) - Check if the JoinSet is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} +impl JoinSet { + /// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task. + pub fn spawn(&mut self, task: F) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + #[cfg(feature = "tracing")] + let task = trace_utils::trace_future(task); + + self.inner.spawn(task) + } + + /// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime. + pub fn spawn_on(&mut self, task: F, handle: &Handle) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + #[cfg(feature = "tracing")] + let task = trace_utils::trace_future(task); + + self.inner.spawn_on(task, handle) + } + + /// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task. + pub fn spawn_local(&mut self, task: F) -> AbortHandle + where + F: Future, + F: 'static, + { + self.inner.spawn_local(task) + } + + /// [JoinSet::spawn_local_on](tokio::task::JoinSet::spawn_local_on) - Spawn a local task on a provided LocalSet. + pub fn spawn_local_on(&mut self, task: F, local_set: &LocalSet) -> AbortHandle + where + F: Future, + F: 'static, + { + self.inner.spawn_local_on(task, local_set) + } + + /// [JoinSet::spawn_blocking](tokio::task::JoinSet::spawn_blocking) - Spawn a blocking task. + pub fn spawn_blocking(&mut self, f: F) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + #[cfg(feature = "tracing")] + let f = trace_utils::trace_block(f); + + self.inner.spawn_blocking(f) + } + + /// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime. + pub fn spawn_blocking_on(&mut self, f: F, handle: &Handle) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + #[cfg(feature = "tracing")] + let f = trace_utils::trace_block(f); + + self.inner.spawn_blocking_on(f, handle) + } + + /// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task. + pub async fn join_next(&mut self) -> Option> { + self.inner.join_next().await + } + + /// [JoinSet::try_join_next](tokio::task::JoinSet::try_join_next) - Try to join the next completed task. + pub fn try_join_next(&mut self) -> Option> { + self.inner.try_join_next() + } + + /// [JoinSet::abort_all](tokio::task::JoinSet::abort_all) - Abort all tasks. + pub fn abort_all(&mut self) { + self.inner.abort_all() + } + + /// [JoinSet::detach_all](tokio::task::JoinSet::detach_all) - Detach all tasks. + pub fn detach_all(&mut self) { + self.inner.detach_all() + } + + /// [JoinSet::poll_join_next](tokio::task::JoinSet::poll_join_next) - Poll for the next completed task. + pub fn poll_join_next( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.inner.poll_join_next(cx) + } + + /// [JoinSet::join_next_with_id](tokio::task::JoinSet::join_next_with_id) - Await the next completed task with its ID. + pub async fn join_next_with_id(&mut self) -> Option> { + self.inner.join_next_with_id().await + } + + /// [JoinSet::try_join_next_with_id](tokio::task::JoinSet::try_join_next_with_id) - Try to join the next completed task with its ID. + pub fn try_join_next_with_id(&mut self) -> Option> { + self.inner.try_join_next_with_id() + } + + /// [JoinSet::poll_join_next_with_id](tokio::task::JoinSet::poll_join_next_with_id) - Poll for the next completed task with its ID. + pub fn poll_join_next_with_id( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.inner.poll_join_next_with_id(cx) + } + + /// [JoinSet::shutdown](tokio::task::JoinSet::shutdown) - Abort all tasks and wait for shutdown. + pub async fn shutdown(&mut self) { + self.inner.shutdown().await + } + + /// [JoinSet::join_all](tokio::task::JoinSet::join_all) - Await all tasks. + pub async fn join_all(self) -> Vec { + self.inner.join_all().await + } +} diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index 51cb988ea06a..e07f8f481e19 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -19,5 +19,7 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod common; +mod join_set; pub use common::SpawnedTask; +pub use join_set::JoinSet; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c9b059ad0f40..9141572b805a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -76,6 +76,7 @@ recursive_protection = [ ] serde = ["arrow-schema/serde"] string_expressions = ["datafusion-functions/string_expressions"] +tracing = ["datafusion-common-runtime/tracing"] unicode_expressions = [ "datafusion-sql/unicode_expressions", "datafusion-functions/unicode_expressions", diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 1d9827ae0ab5..d6ca40b4ef15 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -47,7 +47,7 @@ use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION, }; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr::PhysicalExpr; @@ -60,7 +60,6 @@ use futures::stream::BoxStream; use futures::StreamExt; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Initial writing buffer size. Note this is just a size hint for efficiency. It /// will grow beyond the set value if needed. diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 4c7169764a76..9ad0620f57ae 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -56,7 +56,7 @@ use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, DEFAULT_PARQUET_EXTENSION, }; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::dml::InsertOp; @@ -87,7 +87,6 @@ use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::task::JoinSet; /// Initial writing buffer size. Note this is just a size hint for efficiency. It /// will grow beyond the set value if needed. diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 7a271def7dc6..c8e167ee44e9 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -28,7 +28,7 @@ use crate::error::Result; use arrow_array::RecordBatch; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError}; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::TaskContext; use bytes::Bytes; @@ -36,7 +36,6 @@ use futures::join; use object_store::ObjectStore; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; -use tokio::task::JoinSet; type WriterType = Box; type SerializerType = Arc; diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 31239ed332ae..da6801398ff2 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_catalog::Session; use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_expr::dml::InsertOp; use datafusion_expr::SortExpr; @@ -48,7 +49,6 @@ use futures::StreamExt; use log::debug; use parking_lot::Mutex; use tokio::sync::RwLock; -use tokio::task::JoinSet; /// Type alias for partition data pub type PartitionData = Arc>>; diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 55971f6f627c..26f883be8df5 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -41,6 +41,7 @@ use arrow::csv; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::Constraints; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; @@ -52,7 +53,6 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Execution plan for scanning a CSV file. /// diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 7ac062e549c4..8280b4c0014c 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -40,6 +40,7 @@ use crate::physical_plan::{ use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_common::Constraints; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -48,7 +49,6 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Execution plan for scanning NdJson data source #[derive(Debug, Clone)] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs index 00926dc2330b..8fb46092e519 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs @@ -17,6 +17,7 @@ use crate::datasource::listing::ListingTableUrl; use datafusion_common::DataFusionError; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use futures::StreamExt; @@ -25,7 +26,6 @@ use object_store::path::Path; use parquet::arrow::AsyncArrowWriter; use parquet::file::properties::WriterProperties; use std::sync::Arc; -use tokio::task::JoinSet; /// Executes a query and writes the results to a partitioned Parquet file. pub async fn plan_to_parquet( diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index bcd88bae739a..c54fedbaadac 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -37,6 +37,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, displayable, ExecutionPlan}; use datafusion::prelude::{DataFrame, SessionConfig, SessionContext}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common_runtime::JoinSet; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::PhysicalSortExpr; @@ -51,7 +52,6 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; use rand::rngs::StdRng; use rand::{thread_rng, Rng, SeedableRng}; use std::str; -use tokio::task::JoinSet; // ======================================================================== // The new aggregation fuzz tests based on [`AggregationFuzzer`] diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index d021e73f35b2..6c983f4c9db6 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use arrow::util::pretty::pretty_format_batches; use arrow_array::RecordBatch; use datafusion_common::{DataFusionError, Result}; +use datafusion_common_runtime::JoinSet; use rand::{thread_rng, Rng}; -use tokio::task::JoinSet; use crate::fuzz_cases::aggregation_fuzzer::{ check_equality_of_batches, diff --git a/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs b/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs index 64b858cebc84..ac2c74ee0af0 100644 --- a/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs @@ -24,8 +24,8 @@ use arrow_array::{Array, OffsetSizeTrait}; use arrow_array::cast::AsArray; use datafusion::datasource::MemTable; +use datafusion_common_runtime::JoinSet; use std::collections::HashSet; -use tokio::task::JoinSet; use datafusion::prelude::{SessionConfig, SessionContext}; use test_utils::StringBatchGenerator; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a54b46111f53..d68dafb9b13a 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -47,12 +47,12 @@ use arrow::record_batch::RecordBatch; use arrow_array::Array; use datafusion_common::config::ConfigOptions; use datafusion_common::{exec_err, Constraints, Result}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::{StreamExt, TryStreamExt}; -use tokio::task::JoinSet; /// Represent nodes in the DataFusion Physical Plan. /// @@ -265,7 +265,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// for structures to help ensure all background tasks are cancelled. /// /// [`spawn`]: tokio::task::spawn - /// [`JoinSet`]: tokio::task::JoinSet + /// [`JoinSet`]: datafusion_common_runtime::JoinSet /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder /// diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 8d180c212eba..709bb92f0caf 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1067,10 +1067,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_string_array; use datafusion_common::{arrow_datafusion_err, assert_batches_sorted_eq, exec_err}; + use datafusion_common_runtime::JoinSet; use datafusion_execution::runtime_env::RuntimeEnvBuilder; - use tokio::task::JoinSet; - #[tokio::test] async fn one_to_many_round_robin() -> Result<()> { // define input partitions diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 331cded165a8..223080945cd0 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -28,6 +28,7 @@ use crate::displayable; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{internal_err, Result}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use futures::stream::BoxStream; @@ -35,7 +36,6 @@ use futures::{Future, Stream, StreamExt}; use log::debug; use pin_project_lite::pin_project; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::task::JoinSet; /// Creates a stream from a collection of producing tasks, routing panics to the stream. /// From 0090658142c6c791ad342d06c04e21060fe8a1ab Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Tue, 4 Mar 2025 10:45:40 +0100 Subject: [PATCH 2/3] feat: allowing injecting custom join_set tracer to avoid dependency on `tracing` crate --- README.md | 1 - datafusion-cli/Cargo.lock | 13 +- datafusion-examples/Cargo.toml | 2 +- datafusion-examples/examples/tracing.rs | 166 ++++++++++--------- datafusion/common-runtime/Cargo.toml | 6 +- datafusion/common-runtime/src/join_set.rs | 184 ++++++++++++++++++---- datafusion/common-runtime/src/lib.rs | 2 + datafusion/core/Cargo.toml | 1 - 8 files changed, 243 insertions(+), 132 deletions(-) diff --git a/README.md b/README.md index a10be473d992..0377306abb68 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,6 @@ Optional features: - `backtrace`: include backtrace information in error messages - `pyarrow`: conversions between PyArrow and DataFusion types - `serde`: enable arrow-schema's `serde` feature -- `tracing`: propagates the current span across thread boundaries [apache avro]: https://avro.apache.org/ [apache parquet]: https://parquet.apache.org/ diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 7f2110ea912b..9094dae5918d 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1339,10 +1339,9 @@ dependencies = [ name = "datafusion-common-runtime" version = "45.0.0" dependencies = [ + "futures", "log", "tokio", - "tracing", - "tracing-futures", ] [[package]] @@ -4189,16 +4188,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "tracing-log" version = "0.2.0" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index ef73671aea6a..ced94f8d29a7 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -61,7 +61,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples -datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] } +datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/tracing.rs index 855bbe2f15a1..1d6d302f55dd 100644 --- a/datafusion-examples/examples/tracing.rs +++ b/datafusion-examples/examples/tracing.rs @@ -15,113 +15,125 @@ // specific language governing permissions and limitations // under the License. -//! This example demonstrates the trace feature in DataFusion’s runtime. -//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those -//! created during repartitioning or when reading Parquet files) are instrumented -//! with the current tracing span, allowing to propagate any existing tracing context. +//! This example demonstrates the tracing injection feature for the DataFusion runtime. +//! Tasks spawned on new threads behave differently depending on whether a tracer is injected. +//! The log output clearly distinguishes the two cases. //! -//! In this example we create a session configured to use multiple partitions, -//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file), -//! and run a query that should trigger parallel execution on multiple threads. -//! We wrap the entire query execution within a custom span and log messages. -//! By inspecting the tracing output, we should see that the tasks spawned -//! internally inherit the span context. - -use arrow::util::pretty::pretty_format_batches; -use datafusion::arrow::record_batch::RecordBatch; +//! # Expected Log Output +//! +//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads +//! will _not_ include the `run_instrumented_query` span: +//! +//! ```text +//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER ***** +//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms +//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span ***** +//! ``` +//! +//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span: +//! +//! ```text +//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer... +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER ***** +//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms +//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span ***** +//! ``` + +use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::prelude::*; use datafusion::test_util::parquet_test_data; +use futures::future::BoxFuture; +use futures::FutureExt; +use std::any::Any; use std::sync::Arc; -use tracing::{info, instrument, Level}; +use tracing::{info, instrument, Instrument, Level, Span}; #[tokio::main] async fn main() -> Result<()> { - // Initialize a tracing subscriber that prints to stdout. + // Initialize tracing subscriber with thread info. tracing_subscriber::fmt() .with_thread_ids(true) .with_thread_names(true) .with_max_level(Level::DEBUG) .init(); - log::info!("Starting example, this log is not captured by tracing"); + // Run query WITHOUT tracer injection. + info!("***** RUNNING WITHOUT INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****"); + + // Inject custom tracer so tasks run in the current span. + info!("Injecting custom tracer..."); + set_join_set_tracer(&SpanTracer).expect("Failed to set tracer"); - // execute the query within a tracing span - let result = run_instrumented_query().await; + // Run query WITH tracer injection. + info!("***** RUNNING WITH INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****"); - info!( - "Finished example. Check the logs above for tracing span details showing \ -that tasks were spawned within the 'run_instrumented_query' span on different threads." - ); + Ok(()) +} - result +/// A simple tracer that ensures any spawned task or blocking closure +/// inherits the current span via `in_current_span`. +struct SpanTracer; + +/// Implement the `JoinSetTracer` trait so we can inject instrumentation +/// for both async futures and blocking closures. +impl JoinSetTracer for SpanTracer { + /// Instruments a boxed future to run in the current span. The future's + /// return type is erased to `Box`, which we simply + /// run inside the `Span::current()` context. + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box> { + fut.in_current_span().boxed() + } + + /// Instruments a boxed blocking closure by running it inside the + /// `Span::current()` context. + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send> { + let span = Span::current(); + Box::new(move || span.in_scope(|| f())) + } } #[instrument(level = "info")] async fn run_instrumented_query() -> Result<()> { - info!("Starting query execution within the custom tracing span"); + info!("Starting query execution"); - // The default session will set the number of partitions to `std::thread::available_parallelism()`. let ctx = SessionContext::new(); - - // Get the path to the test parquet data. let test_data = parquet_test_data(); - // Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file. let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension("alltypes_tiny_pages_plain.parquet"); - info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}"); - - // Register a listing table using an absolute URL. let table_path = format!("file://{test_data}/"); - ctx.register_listing_table( - "alltypes", - &table_path, - listing_options.clone(), - None, - None, - ) - .await - .expect("register_listing_table failed"); - - info!("Registered Parquet table 'alltypes' from {table_path}"); - - // Run a query that will trigger parallel execution on multiple threads. - let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col - FROM ( - SELECT bool_col, date_string_col, string_col FROM alltypes - UNION ALL - SELECT bool_col, date_string_col, string_col FROM alltypes - ) AS t - GROUP BY bool_col, date_string_col, string_col - ORDER BY 1,2,3,4 DESC - LIMIT 5;"; - info!(%sql, "Executing SQL query"); - let df = ctx.sql(sql).await?; - - let results: Vec = df.collect().await?; - info!("Query execution complete"); - - // Print out the results and tracing output. - datafusion::common::assert_batches_eq!( - [ - "+----------+----------+-----------------+------------+", - "| count(*) | bool_col | date_string_col | string_col |", - "+----------+----------+-----------------+------------+", - "| 2 | false | 01/01/09 | 9 |", - "| 2 | false | 01/01/09 | 7 |", - "| 2 | false | 01/01/09 | 5 |", - "| 2 | false | 01/01/09 | 3 |", - "| 2 | false | 01/01/09 | 1 |", - "+----------+----------+-----------------+------------+", - ], - &results - ); - - info!("Query results:\n{}", pretty_format_batches(&results)?); - + info!("Registering table 'alltypes' from {}", table_path); + ctx.register_listing_table("alltypes", &table_path, listing_options, None, None) + .await + .expect("Failed to register table"); + + let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"; + info!(sql, "Executing SQL query"); + let result = ctx.sql(sql).await?.collect().await?; + info!("Query complete: {} batches returned", result.len()); Ok(()) } diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index 80fcedeb6598..57ef24866bca 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -31,18 +31,14 @@ rust-version = { workspace = true } [lints] workspace = true -[features] -tracing = ["dep:tracing", "dep:tracing-futures"] - [lib] name = "datafusion_common_runtime" path = "src/lib.rs" [dependencies] +futures = { workspace = true } log = { workspace = true } tokio = { workspace = true } -tracing = { version = "0.1", optional = true } -tracing-futures = { version = "0.2", optional = true } [dev-dependencies] tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } diff --git a/datafusion/common-runtime/src/join_set.rs b/datafusion/common-runtime/src/join_set.rs index 1852302d5a63..d2eeb05a6e73 100644 --- a/datafusion/common-runtime/src/join_set.rs +++ b/datafusion/common-runtime/src/join_set.rs @@ -15,43 +15,167 @@ // specific language governing permissions and limitations // under the License. +use futures::FutureExt; +use std::any::Any; use std::future::Future; use std::task::{Context, Poll}; use tokio::runtime::Handle; -use tokio::task::JoinSet as TokioJoinSet; use tokio::task::{AbortHandle, Id, JoinError, LocalSet}; -#[cfg(feature = "tracing")] -mod trace_utils { - use std::future::Future; - use tracing_futures::Instrument; +pub mod trace_utils { + use super::*; + use futures::future::BoxFuture; + use tokio::sync::OnceCell; - /// Instruments the provided future with the current tracing span. - pub fn trace_future(future: F) -> impl Future + Send + /// A trait for injecting instrumentation into either asynchronous futures or + /// blocking closures at runtime. + pub trait JoinSetTracer: Send + Sync + 'static { + /// Function pointer type for tracing a future. + /// + /// This function takes a boxed future (with its output type erased) + /// and returns a boxed future (with its output still erased). The + /// tracer must apply instrumentation without altering the output. + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box>; + + /// Function pointer type for tracing a blocking closure. + /// + /// This function takes a boxed closure (with its return type erased) + /// and returns a boxed closure (with its return type still erased). The + /// tracer must apply instrumentation without changing the return value. + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send>; + } + + /// A no-op tracer that does not modify or instrument any futures or closures. + /// This is used as a fallback if no custom tracer is set. + struct NoopTracer; + + impl JoinSetTracer for NoopTracer { + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box> { + fut + } + + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send> { + f + } + } + + /// Global storage for an injected tracer. If no tracer is injected, a no-op + /// tracer is used instead. This ensures that calls to [`trace_future`] or + /// [`trace_block`] never panic due to missing instrumentation. + static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new(); + + /// A no-op tracer singleton that is returned by [`get_tracer`] if no custom + /// tracer has been registered. + static NOOP_TRACER: NoopTracer = NoopTracer; + + /// Return the currently registered tracer, or the no-op tracer if none was + /// registered. + #[inline] + fn get_tracer() -> &'static dyn JoinSetTracer { + GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER) + } + + /// Set the custom tracer for both futures and blocking closures. + /// + /// This should be called once at startup. If called more than once, an + /// `Err(())` is returned. If not called at all, a no-op tracer that does nothing + /// is used. + pub fn set_join_set_tracer(tracer: &'static dyn JoinSetTracer) -> Result<(), ()> { + GLOBAL_TRACER.set(tracer).map_err(|_| ()) + } + + /// Optionally instruments a future with custom tracing. + /// + /// If a tracer has been injected via `set_tracer`, the future's output is + /// boxed (erasing its type), passed to the tracer, and then downcast back + /// to the expected type. If no tracer is set, the original future is returned. + /// + /// # Type Parameters + /// * `T` - The concrete output type of the future. + /// * `F` - The future type. + /// + /// # Parameters + /// * `future` - The future to potentially instrument. + pub fn trace_future(future: F) -> BoxFuture<'static, T> where F: Future + Send + 'static, - T: Send, + T: Send + 'static, { - future.in_current_span() + // Erase the future’s output type first: + let erased_future = async move { + let result = future.await; + Box::new(result) as Box + } + .boxed(); + + // Forward through the global tracer: + get_tracer() + .trace_future(erased_future) + // Downcast from `Box` back to `T`: + .map(|any_box| { + *any_box + .downcast::() + .expect("Tracer must preserve the future’s output type!") + }) + .boxed() } - /// Wraps the provided blocking function to execute within the current tracing span. - pub fn trace_block(f: F) -> impl FnOnce() -> T + Send + 'static + /// Optionally instruments a blocking closure with custom tracing. + /// + /// If a tracer has been injected via `set_tracer`, the closure is wrapped so that + /// its return value is boxed (erasing its type), passed to the tracer, and then the + /// result is downcast back to the original type. If no tracer is set, the closure is + /// returned unmodified (except for being boxed). + /// + /// # Type Parameters + /// * `T` - The concrete return type of the closure. + /// * `F` - The closure type. + /// + /// # Parameters + /// * `f` - The blocking closure to potentially instrument. + pub fn trace_block(f: F) -> Box T + Send> where F: FnOnce() -> T + Send + 'static, - T: Send, + T: Send + 'static, { - let span = tracing::Span::current(); - move || span.in_scope(f) + // Erase the closure’s return type first: + let erased_closure = Box::new(|| { + let result = f(); + Box::new(result) as Box + }); + + // Forward through the global tracer: + let traced_closure = get_tracer().trace_block(erased_closure); + + // Downcast from `Box` back to `T`: + Box::new(move || { + let any_box = traced_closure(); + *any_box + .downcast::() + .expect("Tracer must preserve the closure’s return type!") + }) } } -/// A wrapper around Tokio's [`JoinSet`](tokio::task::JoinSet) that transparently forwards all public API calls -/// while optionally instrumenting spawned tasks and blocking closures with the current tracing span -/// when the `tracing` feature is enabled. +/// A wrapper around Tokio's JoinSet that forwards all API calls while optionally +/// instrumenting spawned tasks and blocking closures with custom tracing behavior. +/// If no tracer is injected via `trace_utils::set_tracer`, tasks and closures are executed +/// without any instrumentation. #[derive(Debug)] pub struct JoinSet { - inner: TokioJoinSet, + inner: tokio::task::JoinSet, } impl Default for JoinSet { @@ -64,7 +188,7 @@ impl JoinSet { /// [JoinSet::new](tokio::task::JoinSet::new) - Create a new JoinSet. pub fn new() -> Self { Self { - inner: TokioJoinSet::new(), + inner: tokio::task::JoinSet::new(), } } @@ -78,6 +202,7 @@ impl JoinSet { self.inner.is_empty() } } + impl JoinSet { /// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task. pub fn spawn(&mut self, task: F) -> AbortHandle @@ -86,10 +211,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let task = trace_utils::trace_future(task); - - self.inner.spawn(task) + self.inner.spawn(trace_utils::trace_future(task)) } /// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime. @@ -99,10 +221,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let task = trace_utils::trace_future(task); - - self.inner.spawn_on(task, handle) + self.inner.spawn_on(trace_utils::trace_future(task), handle) } /// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task. @@ -130,10 +249,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let f = trace_utils::trace_block(f); - - self.inner.spawn_blocking(f) + self.inner.spawn_blocking(trace_utils::trace_block(f)) } /// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime. @@ -143,10 +259,8 @@ impl JoinSet { F: Send + 'static, T: Send, { - #[cfg(feature = "tracing")] - let f = trace_utils::trace_block(f); - - self.inner.spawn_blocking_on(f, handle) + self.inner + .spawn_blocking_on(trace_utils::trace_block(f), handle) } /// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task. diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index e07f8f481e19..c3a1c57015c3 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -22,4 +22,6 @@ pub mod common; mod join_set; pub use common::SpawnedTask; +pub use join_set::trace_utils::set_join_set_tracer; +pub use join_set::trace_utils::JoinSetTracer; pub use join_set::JoinSet; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 9141572b805a..c9b059ad0f40 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -76,7 +76,6 @@ recursive_protection = [ ] serde = ["arrow-schema/serde"] string_expressions = ["datafusion-functions/string_expressions"] -tracing = ["datafusion-common-runtime/tracing"] unicode_expressions = [ "datafusion-sql/unicode_expressions", "datafusion-functions/unicode_expressions", From 464d0d1db4a57e0aba9268c025045f66ba6ffa6e Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Tue, 4 Mar 2025 13:50:14 +0100 Subject: [PATCH 3/3] feat: move the `join_set` tracing utils to a dedicated file --- datafusion-examples/Cargo.toml | 2 +- datafusion-examples/examples/tracing.rs | 2 +- datafusion/common-runtime/src/join_set.rs | 159 +--------------- datafusion/common-runtime/src/lib.rs | 4 +- datafusion/common-runtime/src/trace_utils.rs | 188 +++++++++++++++++++ 5 files changed, 197 insertions(+), 158 deletions(-) create mode 100644 datafusion/common-runtime/src/trace_utils.rs diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index ced94f8d29a7..0890de3a577a 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -74,7 +74,7 @@ test-utils = { path = "../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tonic = "0.12.1" tracing = { version = "0.1" } -tracing-subscriber = { version = "0.3" } +tracing-subscriber = { version = "0.3" } url = { workspace = true } uuid = "1.7" diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/tracing.rs index 1d6d302f55dd..334ee0f4e568 100644 --- a/datafusion-examples/examples/tracing.rs +++ b/datafusion-examples/examples/tracing.rs @@ -111,7 +111,7 @@ impl JoinSetTracer for SpanTracer { f: Box Box + Send>, ) -> Box Box + Send> { let span = Span::current(); - Box::new(move || span.in_scope(|| f())) + Box::new(move || span.in_scope(f)) } } diff --git a/datafusion/common-runtime/src/join_set.rs b/datafusion/common-runtime/src/join_set.rs index d2eeb05a6e73..1857a4111dbc 100644 --- a/datafusion/common-runtime/src/join_set.rs +++ b/datafusion/common-runtime/src/join_set.rs @@ -15,160 +15,12 @@ // specific language governing permissions and limitations // under the License. -use futures::FutureExt; -use std::any::Any; +use crate::trace_utils::{trace_block, trace_future}; use std::future::Future; use std::task::{Context, Poll}; use tokio::runtime::Handle; use tokio::task::{AbortHandle, Id, JoinError, LocalSet}; -pub mod trace_utils { - use super::*; - use futures::future::BoxFuture; - use tokio::sync::OnceCell; - - /// A trait for injecting instrumentation into either asynchronous futures or - /// blocking closures at runtime. - pub trait JoinSetTracer: Send + Sync + 'static { - /// Function pointer type for tracing a future. - /// - /// This function takes a boxed future (with its output type erased) - /// and returns a boxed future (with its output still erased). The - /// tracer must apply instrumentation without altering the output. - fn trace_future( - &self, - fut: BoxFuture<'static, Box>, - ) -> BoxFuture<'static, Box>; - - /// Function pointer type for tracing a blocking closure. - /// - /// This function takes a boxed closure (with its return type erased) - /// and returns a boxed closure (with its return type still erased). The - /// tracer must apply instrumentation without changing the return value. - fn trace_block( - &self, - f: Box Box + Send>, - ) -> Box Box + Send>; - } - - /// A no-op tracer that does not modify or instrument any futures or closures. - /// This is used as a fallback if no custom tracer is set. - struct NoopTracer; - - impl JoinSetTracer for NoopTracer { - fn trace_future( - &self, - fut: BoxFuture<'static, Box>, - ) -> BoxFuture<'static, Box> { - fut - } - - fn trace_block( - &self, - f: Box Box + Send>, - ) -> Box Box + Send> { - f - } - } - - /// Global storage for an injected tracer. If no tracer is injected, a no-op - /// tracer is used instead. This ensures that calls to [`trace_future`] or - /// [`trace_block`] never panic due to missing instrumentation. - static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new(); - - /// A no-op tracer singleton that is returned by [`get_tracer`] if no custom - /// tracer has been registered. - static NOOP_TRACER: NoopTracer = NoopTracer; - - /// Return the currently registered tracer, or the no-op tracer if none was - /// registered. - #[inline] - fn get_tracer() -> &'static dyn JoinSetTracer { - GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER) - } - - /// Set the custom tracer for both futures and blocking closures. - /// - /// This should be called once at startup. If called more than once, an - /// `Err(())` is returned. If not called at all, a no-op tracer that does nothing - /// is used. - pub fn set_join_set_tracer(tracer: &'static dyn JoinSetTracer) -> Result<(), ()> { - GLOBAL_TRACER.set(tracer).map_err(|_| ()) - } - - /// Optionally instruments a future with custom tracing. - /// - /// If a tracer has been injected via `set_tracer`, the future's output is - /// boxed (erasing its type), passed to the tracer, and then downcast back - /// to the expected type. If no tracer is set, the original future is returned. - /// - /// # Type Parameters - /// * `T` - The concrete output type of the future. - /// * `F` - The future type. - /// - /// # Parameters - /// * `future` - The future to potentially instrument. - pub fn trace_future(future: F) -> BoxFuture<'static, T> - where - F: Future + Send + 'static, - T: Send + 'static, - { - // Erase the future’s output type first: - let erased_future = async move { - let result = future.await; - Box::new(result) as Box - } - .boxed(); - - // Forward through the global tracer: - get_tracer() - .trace_future(erased_future) - // Downcast from `Box` back to `T`: - .map(|any_box| { - *any_box - .downcast::() - .expect("Tracer must preserve the future’s output type!") - }) - .boxed() - } - - /// Optionally instruments a blocking closure with custom tracing. - /// - /// If a tracer has been injected via `set_tracer`, the closure is wrapped so that - /// its return value is boxed (erasing its type), passed to the tracer, and then the - /// result is downcast back to the original type. If no tracer is set, the closure is - /// returned unmodified (except for being boxed). - /// - /// # Type Parameters - /// * `T` - The concrete return type of the closure. - /// * `F` - The closure type. - /// - /// # Parameters - /// * `f` - The blocking closure to potentially instrument. - pub fn trace_block(f: F) -> Box T + Send> - where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, - { - // Erase the closure’s return type first: - let erased_closure = Box::new(|| { - let result = f(); - Box::new(result) as Box - }); - - // Forward through the global tracer: - let traced_closure = get_tracer().trace_block(erased_closure); - - // Downcast from `Box` back to `T`: - Box::new(move || { - let any_box = traced_closure(); - *any_box - .downcast::() - .expect("Tracer must preserve the closure’s return type!") - }) - } -} - /// A wrapper around Tokio's JoinSet that forwards all API calls while optionally /// instrumenting spawned tasks and blocking closures with custom tracing behavior. /// If no tracer is injected via `trace_utils::set_tracer`, tasks and closures are executed @@ -211,7 +63,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - self.inner.spawn(trace_utils::trace_future(task)) + self.inner.spawn(trace_future(task)) } /// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime. @@ -221,7 +73,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - self.inner.spawn_on(trace_utils::trace_future(task), handle) + self.inner.spawn_on(trace_future(task), handle) } /// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task. @@ -249,7 +101,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - self.inner.spawn_blocking(trace_utils::trace_block(f)) + self.inner.spawn_blocking(trace_block(f)) } /// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime. @@ -259,8 +111,7 @@ impl JoinSet { F: Send + 'static, T: Send, { - self.inner - .spawn_blocking_on(trace_utils::trace_block(f), handle) + self.inner.spawn_blocking_on(trace_block(f), handle) } /// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task. diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index c3a1c57015c3..a5d74bddcada 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -20,8 +20,8 @@ pub mod common; mod join_set; +mod trace_utils; pub use common::SpawnedTask; -pub use join_set::trace_utils::set_join_set_tracer; -pub use join_set::trace_utils::JoinSetTracer; pub use join_set::JoinSet; +pub use trace_utils::{set_join_set_tracer, JoinSetTracer}; diff --git a/datafusion/common-runtime/src/trace_utils.rs b/datafusion/common-runtime/src/trace_utils.rs new file mode 100644 index 000000000000..c3a39c355fc8 --- /dev/null +++ b/datafusion/common-runtime/src/trace_utils.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::future::BoxFuture; +use futures::FutureExt; +use std::any::Any; +use std::error::Error; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::future::Future; +use tokio::sync::OnceCell; + +/// A trait for injecting instrumentation into either asynchronous futures or +/// blocking closures at runtime. +pub trait JoinSetTracer: Send + Sync + 'static { + /// Function pointer type for tracing a future. + /// + /// This function takes a boxed future (with its output type erased) + /// and returns a boxed future (with its output still erased). The + /// tracer must apply instrumentation without altering the output. + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box>; + + /// Function pointer type for tracing a blocking closure. + /// + /// This function takes a boxed closure (with its return type erased) + /// and returns a boxed closure (with its return type still erased). The + /// tracer must apply instrumentation without changing the return value. + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send>; +} + +/// A no-op tracer that does not modify or instrument any futures or closures. +/// This is used as a fallback if no custom tracer is set. +struct NoopTracer; + +impl JoinSetTracer for NoopTracer { + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box> { + fut + } + + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send> { + f + } +} + +/// A custom error type for tracer injection failures. +#[derive(Debug)] +pub enum JoinSetTracerError { + /// The global tracer has already been set. + AlreadySet, +} + +impl Display for JoinSetTracerError { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + match self { + JoinSetTracerError::AlreadySet => { + write!(f, "The global JoinSetTracer is already set") + } + } + } +} + +impl Error for JoinSetTracerError {} + +/// Global storage for an injected tracer. If no tracer is injected, a no-op +/// tracer is used instead. This ensures that calls to [`trace_future`] or +/// [`trace_block`] never panic due to missing instrumentation. +static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new(); + +/// A no-op tracer singleton that is returned by [`get_tracer`] if no custom +/// tracer has been registered. +static NOOP_TRACER: NoopTracer = NoopTracer; + +/// Return the currently registered tracer, or the no-op tracer if none was +/// registered. +#[inline] +fn get_tracer() -> &'static dyn JoinSetTracer { + GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER) +} + +/// Set the custom tracer for both futures and blocking closures. +/// +/// This should be called once at startup. If called more than once, an +/// `Err(JoinSetTracerError)` is returned. If not called at all, a no-op tracer that does nothing +/// is used. +pub fn set_join_set_tracer( + tracer: &'static dyn JoinSetTracer, +) -> Result<(), JoinSetTracerError> { + GLOBAL_TRACER + .set(tracer) + .map_err(|_set_err| JoinSetTracerError::AlreadySet) +} + +/// Optionally instruments a future with custom tracing. +/// +/// If a tracer has been injected via `set_tracer`, the future's output is +/// boxed (erasing its type), passed to the tracer, and then downcast back +/// to the expected type. If no tracer is set, the original future is returned. +/// +/// # Type Parameters +/// * `T` - The concrete output type of the future. +/// * `F` - The future type. +/// +/// # Parameters +/// * `future` - The future to potentially instrument. +pub fn trace_future(future: F) -> BoxFuture<'static, T> +where + F: Future + Send + 'static, + T: Send + 'static, +{ + // Erase the future’s output type first: + let erased_future = async move { + let result = future.await; + Box::new(result) as Box + } + .boxed(); + + // Forward through the global tracer: + get_tracer() + .trace_future(erased_future) + // Downcast from `Box` back to `T`: + .map(|any_box| { + *any_box + .downcast::() + .expect("Tracer must preserve the future’s output type!") + }) + .boxed() +} + +/// Optionally instruments a blocking closure with custom tracing. +/// +/// If a tracer has been injected via `set_tracer`, the closure is wrapped so that +/// its return value is boxed (erasing its type), passed to the tracer, and then the +/// result is downcast back to the original type. If no tracer is set, the closure is +/// returned unmodified (except for being boxed). +/// +/// # Type Parameters +/// * `T` - The concrete return type of the closure. +/// * `F` - The closure type. +/// +/// # Parameters +/// * `f` - The blocking closure to potentially instrument. +pub fn trace_block(f: F) -> Box T + Send> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + // Erase the closure’s return type first: + let erased_closure = Box::new(|| { + let result = f(); + Box::new(result) as Box + }); + + // Forward through the global tracer: + let traced_closure = get_tracer().trace_block(erased_closure); + + // Downcast from `Box` back to `T`: + Box::new(move || { + let any_box = traced_closure(); + *any_box + .downcast::() + .expect("Tracer must preserve the closure’s return type!") + }) +}