Description
Is your feature request related to a problem or challenge?
There's one assumption that seems to hold for most PhysicalPlan implementations, but not for RepartitionExec: calling RepartitionExec.execute() does not immediately call RepartitionExec.input.execute(). Instead, the child is executed lazily, when the returned arrow stream is first polled.
Why this matters is best explained by the use case I'm currently dealing with:
I have a custom leaf PhysicalPlan implementation that performs an API call (let's call it MyApiExec), and I would like to call the API eagerly and pre-fetch some data before the overall arrow stream is first polled. To do that, I'd like to tokio::spawn a task when DataFusion calls MyApiExec::execute, so that it runs in the background pre-fetching data from the API even while the stream is not being polled.
My expectation would be that .execute() is propagated immediately across the execution graph:
However, I found that the current implementation of the RepartitionExec node shipped in #10014 delays the .execute() calls to its children until the first message in the arrow stream is polled. As a result, the pre-fetching logic in MyApiExec runs lazily on the first message poll rather than eagerly before any poll:
Describe the solution you'd like
Instead of lazily calling input.execute() inside RepartitionExec here:
datafusion/datafusion/physical-plan/src/repartition/mod.rs
Lines 598 to 616 in 185a02d

```rust
let stream = futures::stream::once(async move {
    let num_input_partitions = input.output_partitioning().partition_count();
    let input_captured = Arc::clone(&input);
    let metrics_captured = metrics.clone();
    let name_captured = name.clone();
    let context_captured = Arc::clone(&context);
    let state = lazy_state
        .get_or_init(|| async move {
            Mutex::new(RepartitionExecState::new(
                input_captured,
                partitioning,
                metrics_captured,
                preserve_order,
                name_captured,
                context_captured,
            ))
        })
```

Call it immediately without any async gap so that further .execute() calls to the children happen before any message is polled.
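
To make the difference concrete, here is a minimal toy model of the two behaviours. It is only a sketch under stated assumptions, not DataFusion code: `ToyPlan`, `ToyStream`, `LazyParent` and `EagerParent` are hypothetical names, a plain `Iterator` stands in for the arrow stream, and `std::thread::spawn` stands in for `tokio::spawn`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Toy stand-in for an arrow stream (hypothetical): a boxed iterator.
type ToyStream = Box<dyn Iterator<Item = u32> + Send>;

/// Toy stand-in for a plan node (hypothetical, not the DataFusion trait).
trait ToyPlan: Send + Sync {
    fn execute(&self) -> ToyStream;
}

/// Leaf node that starts pre-fetching as soon as `execute` is called.
struct MyApiExec {
    /// Flipped to `true` when `execute` runs, so callers can observe timing.
    started: Arc<AtomicBool>,
}

impl ToyPlan for MyApiExec {
    fn execute(&self) -> ToyStream {
        self.started.store(true, Ordering::SeqCst);
        let (tx, rx) = mpsc::channel();
        // Background worker: stands in for a `tokio::spawn`ed pre-fetch task.
        thread::spawn(move || {
            for i in 0..3 {
                if tx.send(i).is_err() {
                    break; // the consumer dropped the stream
                }
            }
        });
        Box::new(rx.into_iter())
    }
}

/// Parent that defers `input.execute()` until the first poll,
/// mirroring the behaviour described for RepartitionExec.
struct LazyParent {
    input: Arc<dyn ToyPlan>,
}

impl ToyPlan for LazyParent {
    fn execute(&self) -> ToyStream {
        let input = Arc::clone(&self.input);
        let mut child: Option<ToyStream> = None;
        Box::new(std::iter::from_fn(move || {
            // The child is only executed here, on the first `next()` call.
            child.get_or_insert_with(|| input.execute()).next()
        }))
    }
}

/// Parent that calls `input.execute()` immediately, as requested above.
struct EagerParent {
    input: Arc<dyn ToyPlan>,
}

impl ToyPlan for EagerParent {
    fn execute(&self) -> ToyStream {
        // No gap: the child starts before this call returns.
        self.input.execute()
    }
}
```

In this model, calling `execute()` on a `LazyParent` leaves the leaf's `started` flag unset until the first `next()` on the returned stream, whereas with `EagerParent` the flag is set (and the background worker is already running) by the time `execute()` returns.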
Describe alternatives you've considered
Spawning the pre-fetching task in MyApiExec when the node is created at planning time, but that feels counterintuitive and error-prone.
Additional context
No response

