diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a5cf71426607..9094dae5918d 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1339,6 +1339,7 @@ dependencies = [ name = "datafusion-common-runtime" version = "45.0.0" dependencies = [ + "futures", "log", "tokio", ] @@ -3702,6 +3703,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3955,6 +3965,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "thrift" version = "0.17.0" @@ -4165,6 +4185,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -4278,6 +4324,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "version_check" version = "0.9.5" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index d90ec3333cb9..0890de3a577a 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -73,6 +73,8 @@ tempfile = { workspace = true } test-utils = { path = "../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tonic = "0.12.1" +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3" } url = { workspace = true } uuid = "1.7" diff --git a/datafusion-examples/examples/tracing.rs b/datafusion-examples/examples/tracing.rs new file mode 100644 index 000000000000..334ee0f4e568 --- /dev/null +++ b/datafusion-examples/examples/tracing.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates the tracing injection feature for the DataFusion runtime. +//! Tasks spawned on new threads behave differently depending on whether a tracer is injected. +//! The log output clearly distinguishes the two cases. +//! +//! # Expected Log Output +//! +//! 
When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads +//! will _not_ include the `run_instrumented_query` span: +//! +//! ```text +//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER ***** +//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms +//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span ***** +//! ``` +//! +//! When a tracer **is** injected, tasks spawned on non-main threads _do_ inherit the span: +//! +//! ```text +//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer... +//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER ***** +//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution +//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col" +//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms +//!
10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream +//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned +//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span ***** +//! ``` + +use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer}; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::prelude::*; +use datafusion::test_util::parquet_test_data; +use futures::future::BoxFuture; +use futures::FutureExt; +use std::any::Any; +use std::sync::Arc; +use tracing::{info, instrument, Instrument, Level, Span}; + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize tracing subscriber with thread info. + tracing_subscriber::fmt() + .with_thread_ids(true) + .with_thread_names(true) + .with_max_level(Level::DEBUG) + .init(); + + // Run query WITHOUT tracer injection. + info!("***** RUNNING WITHOUT INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****"); + + // Inject custom tracer so tasks run in the current span. + info!("Injecting custom tracer..."); + set_join_set_tracer(&SpanTracer).expect("Failed to set tracer"); + + // Run query WITH tracer injection.
+ info!("***** RUNNING WITH INJECTED TRACER *****"); + run_instrumented_query().await?; + info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****"); + + Ok(()) +} + +/// A simple tracer that ensures any spawned task or blocking closure +/// inherits the current span via `in_current_span`. +struct SpanTracer; + +/// Implement the `JoinSetTracer` trait so we can inject instrumentation +/// for both async futures and blocking closures. +impl JoinSetTracer for SpanTracer { + /// Instruments a boxed future to run in the current span. The future's + /// return type is erased to `Box`, which we simply + /// run inside the `Span::current()` context. + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box> { + fut.in_current_span().boxed() + } + + /// Instruments a boxed blocking closure by running it inside the + /// `Span::current()` context. + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send> { + let span = Span::current(); + Box::new(move || span.in_scope(f)) + } +} + +#[instrument(level = "info")] +async fn run_instrumented_query() -> Result<()> { + info!("Starting query execution"); + + let ctx = SessionContext::new(); + let test_data = parquet_test_data(); + let file_format = ParquetFormat::default().with_enable_pruning(true); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension("alltypes_tiny_pages_plain.parquet"); + + let table_path = format!("file://{test_data}/"); + info!("Registering table 'alltypes' from {}", table_path); + ctx.register_listing_table("alltypes", &table_path, listing_options, None, None) + .await + .expect("Failed to register table"); + + let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"; + info!(sql, "Executing SQL query"); + let result = ctx.sql(sql).await?.collect().await?; + info!("Query complete: {} batches returned", result.len()); + Ok(()) +} diff --git 
a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index a21c72cd9f83..57ef24866bca 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -36,6 +36,7 @@ name = "datafusion_common_runtime" path = "src/lib.rs" [dependencies] +futures = { workspace = true } log = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/common-runtime/src/common.rs b/datafusion/common-runtime/src/common.rs index 30f7526bc0b2..361f6af95cf1 100644 --- a/datafusion/common-runtime/src/common.rs +++ b/datafusion/common-runtime/src/common.rs @@ -17,7 +17,8 @@ use std::future::Future; -use tokio::task::{JoinError, JoinSet}; +use crate::JoinSet; +use tokio::task::JoinError; /// Helper that provides a simple API to spawn a single task and join it. /// Provides guarantees of aborting on `Drop` to keep it cancel-safe. diff --git a/datafusion/common-runtime/src/join_set.rs b/datafusion/common-runtime/src/join_set.rs new file mode 100644 index 000000000000..1857a4111dbc --- /dev/null +++ b/datafusion/common-runtime/src/join_set.rs @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::trace_utils::{trace_block, trace_future}; +use std::future::Future; +use std::task::{Context, Poll}; +use tokio::runtime::Handle; +use tokio::task::{AbortHandle, Id, JoinError, LocalSet}; + +/// A wrapper around Tokio's JoinSet that forwards all API calls while optionally +/// instrumenting spawned tasks and blocking closures with custom tracing behavior. +/// If no tracer is injected via `crate::set_join_set_tracer`, tasks and closures are executed +/// without any instrumentation. +#[derive(Debug)] +pub struct JoinSet { + inner: tokio::task::JoinSet, +} + +impl Default for JoinSet { + fn default() -> Self { + Self::new() + } +} + +impl JoinSet { + /// [JoinSet::new](tokio::task::JoinSet::new) - Create a new JoinSet. + pub fn new() -> Self { + Self { + inner: tokio::task::JoinSet::new(), + } + } + + /// [JoinSet::len](tokio::task::JoinSet::len) - Return the number of tasks. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// [JoinSet::is_empty](tokio::task::JoinSet::is_empty) - Check if the JoinSet is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +impl JoinSet { + /// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task. + pub fn spawn(&mut self, task: F) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.inner.spawn(trace_future(task)) + } + + /// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime. + pub fn spawn_on(&mut self, task: F, handle: &Handle) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.inner.spawn_on(trace_future(task), handle) + } + + /// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task. + pub fn spawn_local(&mut self, task: F) -> AbortHandle + where + F: Future, + F: 'static, + { + self.inner.spawn_local(task) + } + + /// [JoinSet::spawn_local_on](tokio::task::JoinSet::spawn_local_on) - Spawn a local task on a provided LocalSet.
+ pub fn spawn_local_on(&mut self, task: F, local_set: &LocalSet) -> AbortHandle + where + F: Future, + F: 'static, + { + self.inner.spawn_local_on(task, local_set) + } + + /// [JoinSet::spawn_blocking](tokio::task::JoinSet::spawn_blocking) - Spawn a blocking task. + pub fn spawn_blocking(&mut self, f: F) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + self.inner.spawn_blocking(trace_block(f)) + } + + /// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime. + pub fn spawn_blocking_on(&mut self, f: F, handle: &Handle) -> AbortHandle + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send, + { + self.inner.spawn_blocking_on(trace_block(f), handle) + } + + /// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task. + pub async fn join_next(&mut self) -> Option> { + self.inner.join_next().await + } + + /// [JoinSet::try_join_next](tokio::task::JoinSet::try_join_next) - Try to join the next completed task. + pub fn try_join_next(&mut self) -> Option> { + self.inner.try_join_next() + } + + /// [JoinSet::abort_all](tokio::task::JoinSet::abort_all) - Abort all tasks. + pub fn abort_all(&mut self) { + self.inner.abort_all() + } + + /// [JoinSet::detach_all](tokio::task::JoinSet::detach_all) - Detach all tasks. + pub fn detach_all(&mut self) { + self.inner.detach_all() + } + + /// [JoinSet::poll_join_next](tokio::task::JoinSet::poll_join_next) - Poll for the next completed task. + pub fn poll_join_next( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.inner.poll_join_next(cx) + } + + /// [JoinSet::join_next_with_id](tokio::task::JoinSet::join_next_with_id) - Await the next completed task with its ID. + pub async fn join_next_with_id(&mut self) -> Option> { + self.inner.join_next_with_id().await + } + + /// [JoinSet::try_join_next_with_id](tokio::task::JoinSet::try_join_next_with_id) - Try to join the next completed task with its ID. 
+ pub fn try_join_next_with_id(&mut self) -> Option> { + self.inner.try_join_next_with_id() + } + + /// [JoinSet::poll_join_next_with_id](tokio::task::JoinSet::poll_join_next_with_id) - Poll for the next completed task with its ID. + pub fn poll_join_next_with_id( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.inner.poll_join_next_with_id(cx) + } + + /// [JoinSet::shutdown](tokio::task::JoinSet::shutdown) - Abort all tasks and wait for shutdown. + pub async fn shutdown(&mut self) { + self.inner.shutdown().await + } + + /// [JoinSet::join_all](tokio::task::JoinSet::join_all) - Await all tasks. + pub async fn join_all(self) -> Vec { + self.inner.join_all().await + } +} diff --git a/datafusion/common-runtime/src/lib.rs b/datafusion/common-runtime/src/lib.rs index 51cb988ea06a..a5d74bddcada 100644 --- a/datafusion/common-runtime/src/lib.rs +++ b/datafusion/common-runtime/src/lib.rs @@ -19,5 +19,9 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod common; +mod join_set; +mod trace_utils; pub use common::SpawnedTask; +pub use join_set::JoinSet; +pub use trace_utils::{set_join_set_tracer, JoinSetTracer}; diff --git a/datafusion/common-runtime/src/trace_utils.rs b/datafusion/common-runtime/src/trace_utils.rs new file mode 100644 index 000000000000..c3a39c355fc8 --- /dev/null +++ b/datafusion/common-runtime/src/trace_utils.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::future::BoxFuture; +use futures::FutureExt; +use std::any::Any; +use std::error::Error; +use std::fmt::{Display, Formatter, Result as FmtResult}; +use std::future::Future; +use tokio::sync::OnceCell; + +/// A trait for injecting instrumentation into either asynchronous futures or +/// blocking closures at runtime. +pub trait JoinSetTracer: Send + Sync + 'static { + /// Function pointer type for tracing a future. + /// + /// This function takes a boxed future (with its output type erased) + /// and returns a boxed future (with its output still erased). The + /// tracer must apply instrumentation without altering the output. + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box>; + + /// Function pointer type for tracing a blocking closure. + /// + /// This function takes a boxed closure (with its return type erased) + /// and returns a boxed closure (with its return type still erased). The + /// tracer must apply instrumentation without changing the return value. + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send>; +} + +/// A no-op tracer that does not modify or instrument any futures or closures. +/// This is used as a fallback if no custom tracer is set. +struct NoopTracer; + +impl JoinSetTracer for NoopTracer { + fn trace_future( + &self, + fut: BoxFuture<'static, Box>, + ) -> BoxFuture<'static, Box> { + fut + } + + fn trace_block( + &self, + f: Box Box + Send>, + ) -> Box Box + Send> { + f + } +} + +/// A custom error type for tracer injection failures. 
+#[derive(Debug)] +pub enum JoinSetTracerError { + /// The global tracer has already been set. + AlreadySet, +} + +impl Display for JoinSetTracerError { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + match self { + JoinSetTracerError::AlreadySet => { + write!(f, "The global JoinSetTracer is already set") + } + } + } +} + +impl Error for JoinSetTracerError {} + +/// Global storage for an injected tracer. If no tracer is injected, a no-op +/// tracer is used instead. This ensures that calls to [`trace_future`] or +/// [`trace_block`] never panic due to missing instrumentation. +static GLOBAL_TRACER: OnceCell<&'static dyn JoinSetTracer> = OnceCell::const_new(); + +/// A no-op tracer singleton that is returned by [`get_tracer`] if no custom +/// tracer has been registered. +static NOOP_TRACER: NoopTracer = NoopTracer; + +/// Return the currently registered tracer, or the no-op tracer if none was +/// registered. +#[inline] +fn get_tracer() -> &'static dyn JoinSetTracer { + GLOBAL_TRACER.get().copied().unwrap_or(&NOOP_TRACER) +} + +/// Set the custom tracer for both futures and blocking closures. +/// +/// This should be called once at startup. If called more than once, an +/// `Err(JoinSetTracerError)` is returned. If not called at all, a no-op tracer that does nothing +/// is used. +pub fn set_join_set_tracer( + tracer: &'static dyn JoinSetTracer, +) -> Result<(), JoinSetTracerError> { + GLOBAL_TRACER + .set(tracer) + .map_err(|_set_err| JoinSetTracerError::AlreadySet) +} + +/// Optionally instruments a future with custom tracing. +/// +/// If a tracer has been injected via [`set_join_set_tracer`], the future's output is +/// boxed (erasing its type), passed to the tracer, and then downcast back +/// to the expected type. If no tracer is set, the no-op tracer leaves the future uninstrumented. +/// +/// # Type Parameters +/// * `T` - The concrete output type of the future. +/// * `F` - The future type. +/// +/// # Parameters +/// * `future` - The future to potentially instrument.
+pub fn trace_future(future: F) -> BoxFuture<'static, T> +where + F: Future + Send + 'static, + T: Send + 'static, +{ + // Erase the future’s output type first: + let erased_future = async move { + let result = future.await; + Box::new(result) as Box + } + .boxed(); + + // Forward through the global tracer: + get_tracer() + .trace_future(erased_future) + // Downcast from `Box` back to `T`: + .map(|any_box| { + *any_box + .downcast::() + .expect("Tracer must preserve the future’s output type!") + }) + .boxed() +} + +/// Optionally instruments a blocking closure with custom tracing. +/// +/// If a tracer has been injected via [`set_join_set_tracer`], the closure is wrapped so that +/// its return value is boxed (erasing its type), passed to the tracer, and then the +/// result is downcast back to the original type. If no tracer is set, the closure is +/// returned unmodified (except for being boxed). +/// +/// # Type Parameters +/// * `T` - The concrete return type of the closure. +/// * `F` - The closure type. +/// +/// # Parameters +/// * `f` - The blocking closure to potentially instrument.
+pub fn trace_block(f: F) -> Box T + Send> +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + // Erase the closure’s return type first: + let erased_closure = Box::new(|| { + let result = f(); + Box::new(result) as Box + }); + + // Forward through the global tracer: + let traced_closure = get_tracer().trace_block(erased_closure); + + // Downcast from `Box` back to `T`: + Box::new(move || { + let any_box = traced_closure(); + *any_box + .downcast::() + .expect("Tracer must preserve the closure’s return type!") + }) +} diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 1d9827ae0ab5..d6ca40b4ef15 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -47,7 +47,7 @@ use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION, }; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr::PhysicalExpr; @@ -60,7 +60,6 @@ use futures::stream::BoxStream; use futures::StreamExt; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Initial writing buffer size. Note this is just a size hint for efficiency. It /// will grow beyond the set value if needed. 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 4c7169764a76..9ad0620f57ae 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -56,7 +56,7 @@ use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, DEFAULT_PARQUET_EXTENSION, }; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::dml::InsertOp; @@ -87,7 +87,6 @@ use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::task::JoinSet; /// Initial writing buffer size. Note this is just a size hint for efficiency. It /// will grow beyond the set value if needed. 
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 7a271def7dc6..c8e167ee44e9 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -28,7 +28,7 @@ use crate::error::Result; use arrow_array::RecordBatch; use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError}; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_execution::TaskContext; use bytes::Bytes; @@ -36,7 +36,6 @@ use futures::join; use object_store::ObjectStore; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; -use tokio::task::JoinSet; type WriterType = Box; type SerializerType = Arc; diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 31239ed332ae..da6801398ff2 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_catalog::Session; use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_expr::dml::InsertOp; use datafusion_expr::SortExpr; @@ -48,7 +49,6 @@ use futures::StreamExt; use log::debug; use parking_lot::Mutex; use tokio::sync::RwLock; -use tokio::task::JoinSet; /// Type alias for partition data pub type PartitionData = Arc>>; diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 55971f6f627c..26f883be8df5 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -41,6 +41,7 @@ use arrow::csv; use arrow::datatypes::SchemaRef; 
use datafusion_common::config::ConfigOptions; use datafusion_common::Constraints; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; @@ -52,7 +53,6 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Execution plan for scanning a CSV file. /// diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 7ac062e549c4..8280b4c0014c 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -40,6 +40,7 @@ use crate::physical_plan::{ use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_common::Constraints; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -48,7 +49,6 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; /// Execution plan for scanning NdJson data source #[derive(Debug, Clone)] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs index 00926dc2330b..8fb46092e519 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs @@ -17,6 +17,7 @@ use crate::datasource::listing::ListingTableUrl; use datafusion_common::DataFusionError; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_plan::{ExecutionPlan, 
ExecutionPlanProperties}; use futures::StreamExt; @@ -25,7 +26,6 @@ use object_store::path::Path; use parquet::arrow::AsyncArrowWriter; use parquet::file::properties::WriterProperties; use std::sync::Arc; -use tokio::task::JoinSet; /// Executes a query and writes the results to a partitioned Parquet file. pub async fn plan_to_parquet( diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index bcd88bae739a..c54fedbaadac 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -37,6 +37,7 @@ use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::{collect, displayable, ExecutionPlan}; use datafusion::prelude::{DataFrame, SessionConfig, SessionContext}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common_runtime::JoinSet; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::PhysicalSortExpr; @@ -51,7 +52,6 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; use rand::rngs::StdRng; use rand::{thread_rng, Rng, SeedableRng}; use std::str; -use tokio::task::JoinSet; // ======================================================================== // The new aggregation fuzz tests based on [`AggregationFuzzer`] diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index d021e73f35b2..6c983f4c9db6 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use arrow::util::pretty::pretty_format_batches; use arrow_array::RecordBatch; use datafusion_common::{DataFusionError, Result}; +use datafusion_common_runtime::JoinSet; use rand::{thread_rng, Rng}; -use tokio::task::JoinSet; use 
crate::fuzz_cases::aggregation_fuzzer::{ check_equality_of_batches, diff --git a/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs b/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs index 64b858cebc84..ac2c74ee0af0 100644 --- a/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs @@ -24,8 +24,8 @@ use arrow_array::{Array, OffsetSizeTrait}; use arrow_array::cast::AsArray; use datafusion::datasource::MemTable; +use datafusion_common_runtime::JoinSet; use std::collections::HashSet; -use tokio::task::JoinSet; use datafusion::prelude::{SessionConfig, SessionContext}; use test_utils::StringBatchGenerator; diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a54b46111f53..d68dafb9b13a 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -47,12 +47,12 @@ use arrow::record_batch::RecordBatch; use arrow_array::Array; use datafusion_common::config::ConfigOptions; use datafusion_common::{exec_err, Constraints, Result}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::{StreamExt, TryStreamExt}; -use tokio::task::JoinSet; /// Represent nodes in the DataFusion Physical Plan. /// @@ -265,7 +265,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// for structures to help ensure all background tasks are cancelled. 
/// /// [`spawn`]: tokio::task::spawn - /// [`JoinSet`]: tokio::task::JoinSet + /// [`JoinSet`]: datafusion_common_runtime::JoinSet /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder /// diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 8d180c212eba..709bb92f0caf 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1067,10 +1067,9 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_string_array; use datafusion_common::{arrow_datafusion_err, assert_batches_sorted_eq, exec_err}; + use datafusion_common_runtime::JoinSet; use datafusion_execution::runtime_env::RuntimeEnvBuilder; - use tokio::task::JoinSet; - #[tokio::test] async fn one_to_many_round_robin() -> Result<()> { // define input partitions diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 331cded165a8..223080945cd0 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -28,6 +28,7 @@ use crate::displayable; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::{internal_err, Result}; +use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use futures::stream::BoxStream; @@ -35,7 +36,6 @@ use futures::{Future, Stream, StreamExt}; use log::debug; use pin_project_lite::pin_project; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::task::JoinSet; /// Creates a stream from a collection of producing tasks, routing panics to the stream. ///