Describe the bug
The Repartition optimisation pass fails to account for order-sensitive operations, so it can repartition the output of a sort. As there isn't (yet) an order-preserving merge operator (#362), this results in an incorrect final physical plan.
To Reproduce
For example,
LogicalPlanBuilder::scan_csv(&path, options, None)?
    .sort(vec![col("c4").sort(true, true)])?
    .filter(col("c2").eq(lit(1)))?
    .limit(10)?
    .build()?;
Results in the following physical plan:
GlobalLimitExec {
    input: MergeExec {
        input: CoalesceBatchesExec {
            input: FilterExec {
                predicate: BinaryExpr {
                    left: Column {
                        name: "c2",
                    },
                    op: Eq,
                    right: TryCastExpr {
                        expr: Literal {
                            value: Int32(1),
                        },
                        cast_type: Int64,
                    },
                },
                input: RepartitionExec {
                    input: SortExec {
                        input: CsvExec {
                            source: PartitionedFiles {
                                path: "/home/raphael/repos/external/arrow-datafusion/testing/data/csv/aggregate_test_100.csv",
                                filenames: [
                                    "/home/raphael/repos/external/arrow-datafusion/testing/data/csv/aggregate_test_100.csv",
                                ],
                            },
                            ...
                        },
                        expr: [
                            PhysicalSortExpr {
                                expr: Column {
                                    name: "c4",
                                },
                                options: SortOptions {
                                    descending: false,
                                    nulls_first: true,
                                },
                            },
                        ],
                        ...
                    },
                    ...
                },
            },
            target_batch_size: 4096,
        },
    },
    limit: 10,
}
This plan is incorrect: the RepartitionExec sits directly above the SortExec, and the MergeExec does not preserve ordering, so the limit returns an arbitrary set of matching rows rather than the first 10 in c4 order.
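To make the failure mode concrete, here is a minimal sketch in plain Rust that does not use DataFusion at all. It assumes a round-robin repartition over fixed-size batches and a merge that simply drains partitions one after another; both are simplifications chosen for illustration, and the function names are hypothetical.

```rust
/// Split `rows` into batches of `batch_size` and deal them round-robin
/// across `partitions` outputs, loosely mimicking a round-robin repartition.
fn round_robin(rows: &[i32], batch_size: usize, partitions: usize) -> Vec<Vec<i32>> {
    let mut out = vec![Vec::new(); partitions];
    for (i, batch) in rows.chunks(batch_size).enumerate() {
        out[i % partitions].extend_from_slice(batch);
    }
    out
}

/// Concatenate partitions one after another. This is only one arrival order
/// that a merge ignoring sort keys could produce (an assumption of this sketch).
fn unordered_merge(parts: Vec<Vec<i32>>) -> Vec<i32> {
    parts.into_iter().flatten().collect()
}

/// Sort, repartition (batches of 2 across 3 partitions), merge, then keep
/// the first `limit` rows, in the same operator order as the plan above.
fn sorted_then_repartitioned_limit(mut rows: Vec<i32>, limit: usize) -> Vec<i32> {
    rows.sort();
    let parts = round_robin(&rows, 2, 3);
    unordered_merge(parts).into_iter().take(limit).collect()
}
```

Calling `sorted_then_repartitioned_limit((0..12).collect(), 4)` under these assumptions yields `[0, 1, 6, 7]` instead of the four smallest values, because the sorted batches were interleaved across partitions before the limit was applied.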
Expected behavior
I would expect the Repartition pass not to introduce repartitioning on paths that contain order-sensitive operators. Once there is an order-preserving merge, this restriction could potentially be relaxed, provided the inserted merge is order-preserving, but until then this optimisation is incorrect.
I will shortly be creating a ticket with some proposals on how we could make DataFusion sort-order aware.
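One conservative way to express that rule is sketched below on a toy plan type. The `Plan` enum, `output_is_sorted`, and `repartition` are all hypothetical names invented for this illustration; they are not DataFusion types or the actual optimizer API, and a real pass would have to handle many more operators.

```rust
/// Toy physical plan; these variants are hypothetical, not DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Sort(Box<Plan>),
    Filter(Box<Plan>),
    Limit(usize, Box<Plan>),
    Repartition(Box<Plan>),
}

/// True if the node's output carries a sort order that a repartition
/// placed directly above it would destroy.
fn output_is_sorted(plan: &Plan) -> bool {
    match plan {
        Plan::Scan | Plan::Repartition(_) => false,
        Plan::Sort(_) => true,
        // Filter and Limit pass their input order through unchanged.
        Plan::Filter(input) | Plan::Limit(_, input) => output_is_sorted(input),
    }
}

/// Insert a `Repartition` below each `Filter`, but only when the filter's
/// input is unsorted, so no sort order is lost.
fn repartition(plan: Plan) -> Plan {
    match plan {
        Plan::Filter(input) => {
            let input = repartition(*input);
            if output_is_sorted(&input) {
                Plan::Filter(Box::new(input))
            } else {
                Plan::Filter(Box::new(Plan::Repartition(Box::new(input))))
            }
        }
        Plan::Sort(input) => Plan::Sort(Box::new(repartition(*input))),
        Plan::Limit(n, input) => Plan::Limit(n, Box::new(repartition(*input))),
        // Scan and existing Repartition nodes are left untouched.
        other => other,
    }
}
```

With this rule, the toy equivalent of the plan in this report (limit over filter over sort over scan) is left unchanged, while a filter directly over an unsorted scan still gets a repartition inserted beneath it.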
Additional context
This issue came out of the investigation in #378 (comment).