Skip to content

Commit 88c3813

Browse files
harshmotw-dbhvanhovell
authored andcommitted
[SPARK-50503][SQL] Prohibit partitioning by Variant data
### What changes were proposed in this pull request? Prior to this PR, repartition by Variant producing expressions wasn't blocked during analysis. It should be blocked because Variant equality is not defined. It is similar to [this PR](#48909) which blocked Variant from Set operations. ### Why are the changes needed? Variant equality is not defined yet and therefore shouldn't be allowed in repartitioning. ### Does this PR introduce _any_ user-facing change? Yes, prior to this PR, Variants repartition did not throw a well defined error. ### How was this patch tested? Unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49080 from harshmotw-db/harsh-motwani_data/variant_repartition. Authored-by: Harsh Motwani <[email protected]> Signed-off-by: Herman van Hovell <[email protected]>
1 parent 88102d3 commit 88c3813

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5309,6 +5309,11 @@
53095309
"Parameter markers are not allowed in <statement>."
53105310
]
53115311
},
5312+
"PARTITION_BY_VARIANT" : {
5313+
"message" : [
5314+
"Cannot use VARIANT producing expressions to partition a DataFrame, but the type of expression <expr> is <dataType>."
5315+
]
5316+
},
53125317
"PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED" : {
53135318
"message" : [
53145319
"Invalid partitioning: <cols> is missing or is in a map or array."

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
9696
case _ => None
9797
}
9898

99+
protected def variantExprInPartitionExpression(plan: LogicalPlan): Option[Expression] =
100+
plan match {
101+
case r: RepartitionByExpression =>
102+
r.partitionExpressions.find(e => hasVariantType(e.dataType))
103+
case _ => None
104+
}
105+
99106
private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
100107
limitExpr match {
101108
case e if !e.foldable => limitExpr.failAnalysis(
@@ -853,6 +860,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
853860
"colName" -> toSQLId(variantCol.name),
854861
"dataType" -> toSQLType(variantCol.dataType)))
855862

863+
case o if variantExprInPartitionExpression(o).isDefined =>
864+
val variantExpr = variantExprInPartitionExpression(o).get
865+
o.failAnalysis(
866+
errorClass = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
867+
messageParameters = Map(
868+
"expr" -> variantExpr.sql,
869+
"dataType" -> toSQLType(variantExpr.dataType)))
870+
856871
case o if o.expressions.exists(!_.deterministic) &&
857872
!operatorAllowsNonDeterministicExpressions(o) &&
858873
!o.isInstanceOf[Project] &&

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,69 @@ class DataFrameSuite extends QueryTest
308308
testData.select("key").collect().toSeq)
309309
}
310310

311+
test("SPARK-50503 - cannot partition by variant columns") {
312+
val df = sql("select parse_json(case when id = 0 then 'null' else '1' end)" +
313+
" as v, id % 5 as id, named_struct('v', parse_json(id::string)) s from range(0, 100, 1, 5)")
314+
// variant column
315+
checkError(
316+
exception = intercept[AnalysisException](df.repartition(5, col("v"))),
317+
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
318+
parameters = Map(
319+
"expr" -> "v",
320+
"dataType" -> "\"VARIANT\"")
321+
)
322+
// nested variant column
323+
checkError(
324+
exception = intercept[AnalysisException](df.repartition(5, col("s"))),
325+
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
326+
parameters = Map(
327+
"expr" -> "s",
328+
"dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\"")
329+
)
330+
// variant producing expression
331+
checkError(
332+
exception =
333+
intercept[AnalysisException](df.repartition(5, parse_json(col("id").cast("string")))),
334+
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
335+
parameters = Map(
336+
"expr" -> "parse_json(CAST(id AS STRING))",
337+
"dataType" -> "\"VARIANT\"")
338+
)
339+
// Partitioning by non-variant column works
340+
try {
341+
df.repartition(5, col("id")).collect()
342+
} catch {
343+
case e: Exception =>
344+
fail(s"Expected no exception to be thrown but an exception was thrown: ${e.getMessage}")
345+
}
346+
// SQL
347+
withTempView("tv") {
348+
df.createOrReplaceTempView("tv")
349+
checkError(
350+
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY v")),
351+
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
352+
parameters = Map(
353+
"expr" -> "tv.v",
354+
"dataType" -> "\"VARIANT\""),
355+
context = ExpectedContext(
356+
fragment = "DISTRIBUTE BY v",
357+
start = 17,
358+
stop = 31)
359+
)
360+
checkError(
361+
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY s")),
362+
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
363+
parameters = Map(
364+
"expr" -> "tv.s",
365+
"dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
366+
context = ExpectedContext(
367+
fragment = "DISTRIBUTE BY s",
368+
start = 17,
369+
stop = 31)
370+
)
371+
}
372+
}
373+
311374
test("repartition with SortOrder") {
312375
// passing SortOrder expressions to .repartition() should result in an informative error
313376

0 commit comments

Comments
 (0)