Skip to content

Commit 2858e34

Browse files
committed
Do not repartition sorted inputs SortPreservingMerge
1 parent 09c67d5 commit 2858e34

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

datafusion/src/physical_optimizer/repartition.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,12 @@ mod tests {
109109

110110
use super::*;
111111
use crate::datasource::PartitionedFile;
112-
use crate::physical_plan::expressions::col;
112+
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
113113
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
114114
use crate::physical_plan::filter::FilterExec;
115115
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
116116
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
117+
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
117118
use crate::physical_plan::union::UnionExec;
118119
use crate::physical_plan::{displayable, Statistics};
119120
use crate::test::object_store::TestObjectStore;
@@ -137,6 +138,17 @@ mod tests {
137138
))
138139
}
139140

141+
fn sort_preserving_merge_exec(
142+
input: Arc<dyn ExecutionPlan>,
143+
) -> Arc<dyn ExecutionPlan> {
144+
let expr = vec![PhysicalSortExpr {
145+
expr: col("c1", &schema()).unwrap(),
146+
options: arrow::compute::SortOptions::default(),
147+
}];
148+
149+
Arc::new(SortPreservingMergeExec::new(expr, input))
150+
}
151+
140152
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
141153
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
142154
}
@@ -276,4 +288,25 @@ mod tests {
276288
assert_eq!(&trim_plan_display(&plan), &expected);
277289
Ok(())
278290
}
291+
292+
#[test]
293+
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
294+
let optimizer = Repartition {};
295+
296+
let optimized = optimizer.optimize(
297+
sort_preserving_merge_exec(parquet_exec()),
298+
&ExecutionConfig::new().with_target_partitions(5),
299+
)?;
300+
301+
let plan = displayable(optimized.as_ref()).indent().to_string();
302+
303+
let expected = &[
304+
"SortPreservingMergeExec: [c1@0 ASC]",
305+
// Expect no repartition of SortPreservingMergeExec
306+
"ParquetExec: limit=None, partitions=[x]",
307+
];
308+
309+
assert_eq!(&trim_plan_display(&plan), &expected);
310+
Ok(())
311+
}
279312
}

datafusion/src/physical_plan/sorts/sort_preserving_merge.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
128128
Distribution::UnspecifiedDistribution
129129
}
130130

131+
fn should_repartition_children(&self) -> bool {
132+
// if the children are repartitioned they may no longer remain
133+
// sorted
134+
false
135+
}
136+
131137
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
132138
vec![self.input.clone()]
133139
}

0 commit comments

Comments
 (0)