Skip to content

Commit f6ec4c3

Browse files
authored
#16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (#16995) (#17068)
* #16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size * #16994 Extend default ExecutionPlan invariant checks Add checks that verify the length of the vectors returned by methods that need to return a value per child. (cherry picked from commit 2968331)
1 parent 9cfb9cd commit f6ec4c3

File tree

4 files changed

+42
-15
lines changed

4 files changed

+42
-15
lines changed

datafusion/physical-plan/src/coop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ impl ExecutionPlan for CooperativeExec {
254254
}
255255

256256
fn maintains_input_order(&self) -> Vec<bool> {
257-
self.input.maintains_input_order()
257+
vec![true; self.children().len()]
258258
}
259259

260260
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::stream::RecordBatchStreamAdapter;
4848
use arrow::array::{Array, RecordBatch};
4949
use arrow::datatypes::SchemaRef;
5050
use datafusion_common::config::ConfigOptions;
51-
use datafusion_common::{exec_err, Constraints, Result};
51+
use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
5252
use datafusion_common_runtime::JoinSet;
5353
use datafusion_execution::TaskContext;
5454
use datafusion_physical_expr::EquivalenceProperties;
@@ -118,10 +118,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
118118
/// Returns an error if this individual node does not conform to its invariants.
119119
/// These invariants are typically only checked in debug mode.
120120
///
121-
/// A default set of invariants is provided in the default implementation.
121+
/// A default set of invariants is provided in the [check_default_invariants] function.
122+
/// The default implementation of `check_invariants` calls this function.
122123
/// Extension nodes can provide their own invariants.
123-
fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
124-
Ok(())
124+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
125+
check_default_invariants(self, check)
125126
}
126127

127128
/// Specifies the data distribution requirements for all the
@@ -1045,6 +1046,37 @@ impl PlanProperties {
10451046
}
10461047
}
10471048

1049+
macro_rules! check_len {
1050+
($target:expr, $func_name:ident, $expected_len:expr) => {
1051+
let actual_len = $target.$func_name().len();
1052+
if actual_len != $expected_len {
1053+
return internal_err!(
1054+
"{}::{} returned Vec with incorrect size: {} != {}",
1055+
$target.name(),
1056+
stringify!($func_name),
1057+
actual_len,
1058+
$expected_len
1059+
);
1060+
}
1061+
};
1062+
}
1063+
1064+
/// Checks a set of invariants that apply to all ExecutionPlan implementations.
1065+
/// Returns an error if the given node does not conform.
1066+
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
1067+
plan: &P,
1068+
_check: InvariantLevel,
1069+
) -> Result<(), DataFusionError> {
1070+
let children_len = plan.children().len();
1071+
1072+
check_len!(plan, maintains_input_order, children_len);
1073+
check_len!(plan, required_input_ordering, children_len);
1074+
check_len!(plan, required_input_distribution, children_len);
1075+
check_len!(plan, benefits_from_input_partitioning, children_len);
1076+
1077+
Ok(())
1078+
}
1079+
10481080
/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
10491081
/// especially for the distributed engine to judge whether need to deal with shuffling.
10501082
/// Currently, there are 3 kinds of execution plan which needs data exchange

datafusion/physical-plan/src/union.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use super::{
3333
SendableRecordBatchStream, Statistics,
3434
};
3535
use crate::execution_plan::{
36-
boundedness_from_children, emission_type_from_children, InvariantLevel,
36+
boundedness_from_children, check_default_invariants, emission_type_from_children,
37+
InvariantLevel,
3738
};
3839
use crate::metrics::BaselineMetrics;
3940
use crate::projection::{make_with_child, ProjectionExec};
@@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec {
176177
&self.cache
177178
}
178179

179-
fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
180+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
181+
check_default_invariants(self, check)?;
182+
180183
(self.inputs().len() >= 2)
181184
.then_some(())
182185
.ok_or(DataFusionError::Internal(

datafusion/physical-plan/src/work_table.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec {
174174
&self.cache
175175
}
176176

177-
fn maintains_input_order(&self) -> Vec<bool> {
178-
vec![false]
179-
}
180-
181-
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
182-
vec![false]
183-
}
184-
185177
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
186178
vec![]
187179
}

0 commit comments

Comments
 (0)