Skip to content

Commit d7483b5

Browse files
huaxingaodongjoon-hyun
authored andcommitted
[SPARK-40429][SQL][3.3] Only set KeyGroupedPartitioning when the referenced column is in the output
### What changes were proposed in this pull request? back porting [PR](#37886) to 3.3. Only set `KeyGroupedPartitioning` when the referenced column is in the output ### Why are the changes needed? bug fixing ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test Closes #37901 from huaxingao/3.3. Authored-by: huaxingao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d8e157d commit d7483b5

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,18 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
3939
}
4040

4141
val catalystPartitioning = scan.outputPartitioning() match {
42-
case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
43-
V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
42+
case kgp: KeyGroupedPartitioning =>
43+
val partitioning = sequenceToOption(kgp.keys().map(
44+
V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
45+
if (partitioning.isEmpty) {
46+
None
47+
} else {
48+
if (partitioning.get.forall(p => p.references.subsetOf(d.outputSet))) {
49+
partitioning
50+
} else {
51+
None
52+
}
53+
}
4454
case _: UnknownPartitioning => None
4555
case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
4656
"type: " + p.getClass.getSimpleName)

sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
216216
.withColumn("right_all", struct($"right.*"))
217217
checkAnswer(dfQuery, Row(1, "a", "b", Row(1, "a"), Row(1, "b")))
218218
}
219+
220+
test("SPARK-40429: Only set KeyGroupedPartitioning when the referenced column is in the output") {
221+
withTable(tbl) {
222+
sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
223+
sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
224+
checkAnswer(
225+
spark.table(tbl).select("index", "_partition"),
226+
Seq(Row(0, "3"), Row(0, "2"), Row(0, "1"))
227+
)
228+
229+
checkAnswer(
230+
spark.table(tbl).select("id", "index", "_partition"),
231+
Seq(Row(3, 0, "3"), Row(2, 0, "2"), Row(1, 0, "1"))
232+
)
233+
}
234+
}
219235
}

0 commit comments

Comments
 (0)