From ee9ccf6d8edd00b0dd2d1a301fe607b83c741bfe Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Thu, 31 Jul 2025 15:17:05 +0200 Subject: [PATCH 1/2] #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size --- datafusion/physical-plan/src/coop.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index be0afa07eac2..89d5ba6f4da9 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -254,7 +254,7 @@ impl ExecutionPlan for CooperativeExec { } fn maintains_input_order(&self) -> Vec { - self.input.maintains_input_order() + vec![true; self.children().len()] } fn children(&self) -> Vec<&Arc> { From 74b153df4024337c48324eb041588d3eff8df20d Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Fri, 1 Aug 2025 10:52:40 +0200 Subject: [PATCH 2/2] #16994 Extend default ExecutionPlan invariant checks Add checks that verify the length of the vectors returned by methods that need to return a value per child. --- .../physical-plan/src/execution_plan.rs | 40 +++++++++++++++++-- datafusion/physical-plan/src/union.rs | 7 +++- datafusion/physical-plan/src/work_table.rs | 8 ---- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6d51bf195dc6..3ed85b9267e3 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -48,7 +48,7 @@ use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::{exec_err, Constraints, Result}; +use datafusion_common::{exec_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; @@ -118,10 +118,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. /// - /// A default set of invariants is provided in the default implementation. + /// A default set of invariants is provided in the [check_default_invariants] function. + /// The default implementation of `check_invariants` calls this function. /// Extension nodes can provide their own invariants. - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { - Ok(()) + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check) } /// Specifies the data distribution requirements for all the @@ -1045,6 +1046,37 @@ impl PlanProperties { } } +macro_rules! check_len { + ($target:expr, $func_name:ident, $expected_len:expr) => { + let actual_len = $target.$func_name().len(); + if actual_len != $expected_len { + return internal_err!( + "{}::{} returned Vec with incorrect size: {} != {}", + $target.name(), + stringify!($func_name), + actual_len, + $expected_len + ); + } + }; +} + +/// Checks a set of invariants that apply to all ExecutionPlan implementations. +/// Returns an error if the given node does not conform. +pub fn check_default_invariants( + plan: &P, + _check: InvariantLevel, +) -> Result<(), DataFusionError> { + let children_len = plan.children().len(); + + check_len!(plan, maintains_input_order, children_len); + check_len!(plan, required_input_ordering, children_len); + check_len!(plan, required_input_distribution, children_len); + check_len!(plan, benefits_from_input_partitioning, children_len); + + Ok(()) +} + /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful /// especially for the distributed engine to judge whether need to deal with shuffling. /// Currently, there are 3 kinds of execution plan which needs data exchange diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 73d7933e7c05..aca03c57b1b4 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -33,7 +33,8 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution_plan::{ - boundedness_from_children, emission_type_from_children, InvariantLevel, + boundedness_from_children, check_default_invariants, emission_type_from_children, + InvariantLevel, }; use crate::metrics::BaselineMetrics; use crate::projection::{make_with_child, ProjectionExec}; @@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec { &self.cache } - fn check_invariants(&self, _check: InvariantLevel) -> Result<()> { + fn check_invariants(&self, check: InvariantLevel) -> Result<()> { + check_default_invariants(self, check)?; + (self.inputs().len() >= 2) .then_some(()) .ok_or(DataFusionError::Internal( diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 076e30ab902d..40a22f94b81f 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec { &self.cache } - fn maintains_input_order(&self) -> Vec { - vec![false] - } - - fn benefits_from_input_partitioning(&self) -> Vec { - vec![false] - } - fn children(&self) -> Vec<&Arc> { vec![] }