Skip to content
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5355,6 +5355,11 @@
"Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
]
},
"SET_OPERATION_ON_VARIANT_TYPE" : {
"message" : [
"Cannot have VARIANT type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
]
},
"SET_PROPERTIES_AND_DBPROPERTIES" : {
"message" : [
"set PROPERTIES and DBPROPERTIES at the same time."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
dt.existsRecursively(_.isInstanceOf[MapType])
}

/**
 * Tells whether `dt` is, or contains at any nesting level, a [[VariantType]].
 *
 * @param dt the data type to inspect
 * @return true if the recursive search over `dt` encounters a variant type
 */
protected def hasVariantType(dt: DataType): Boolean =
  dt.existsRecursively {
    case _: VariantType => true
    case _ => false
  }

protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
case _: Intersect | _: Except | _: Distinct =>
plan.output.find(a => hasMapType(a.dataType))
Expand All @@ -84,6 +88,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
case _ => None
}

/**
 * Looks for a variant-typed attribute that takes part in a set-like operation.
 *
 * For [[Intersect]], [[Except]] and [[Distinct]] the operator's output attributes are
 * examined; for [[Deduplicate]] only its keys are. Any other plan yields `None`.
 *
 * @param plan the logical operator to inspect
 * @return the first attribute whose data type contains a variant, if there is one
 */
protected def variantColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = {
  // Pick the attributes that matter for this operator, then run a single shared search.
  val candidates: Seq[Attribute] = plan match {
    case dedup: Deduplicate => dedup.keys
    case _: Intersect | _: Except | _: Distinct => plan.output
    case _ => Nil
  }
  candidates.find(attr => hasVariantType(attr.dataType))
}

private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => limitExpr.failAnalysis(
Expand Down Expand Up @@ -820,6 +832,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
"colName" -> toSQLId(mapCol.name),
"dataType" -> toSQLType(mapCol.dataType)))

// Reject set-like operators (see variantColumnInSetOperation) that involve a column whose
// type contains VARIANT, reporting the offending column's name and SQL type.
// TODO: Remove this type check once we support Variant ordering
case o if variantColumnInSetOperation(o).isDefined =>
// `.get` cannot fail here: the guard above already established that the Option is defined.
val variantCol = variantColumnInSetOperation(o).get
o.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE",
messageParameters = Map(
"colName" -> toSQLId(variantCol.name),
"dataType" -> toSQLType(variantCol.dataType)))

case o if o.expressions.exists(!_.deterministic) &&
!operatorAllowsNonDeterministicExpressions(o) &&
!o.isInstanceOf[Project] &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,84 @@ class DataFrameSetOperationsSuite extends QueryTest
dates.intersect(widenTypedRows).collect()
}

test("SPARK-50373 - cannot run set operations with variant type") {
  // `v` is a variant column produced by parse_json; `id` is an ordinary numeric column.
  val df = sql("select parse_json(case when id = 0 then 'null' else '1' end)" +
    " as v, id % 5 as id from range(0, 100, 1, 5)")

  val variantCondition = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_VARIANT_TYPE"
  val topLevelVariantParams = Map(
    "colName" -> "`v`",
    "dataType" -> "\"VARIANT\"")

  // Dataset API: every set-like operation must be rejected and must name column `v`.
  // The operations are wrapped in thunks so each one is evaluated inside `intercept`.
  Seq(
    () => df.intersect(df),
    () => df.except(df),
    () => df.distinct(),
    () => df.dropDuplicates()
  ).foreach { setOperation =>
    checkError(
      exception = intercept[AnalysisException](setOperation()),
      condition = variantCondition,
      parameters = topLevelVariantParams)
  }

  withTempView("tv") {
    df.createOrReplaceTempView("tv")

    // SQL API: (query, expected error condition, reported column name, reported data type).
    // The last row expects the MAP error condition rather than the VARIANT one.
    val sqlCases = Seq(
      ("SELECT DISTINCT v FROM tv",
        variantCondition,
        "`v`",
        "\"VARIANT\""),
      ("SELECT DISTINCT STRUCT(v) FROM tv",
        variantCondition,
        "`struct(v)`",
        "\"STRUCT<v: VARIANT NOT NULL>\""),
      ("SELECT DISTINCT ARRAY(v) FROM tv",
        variantCondition,
        "`array(v)`",
        "\"ARRAY<VARIANT>\""),
      ("SELECT DISTINCT MAP('m', v) FROM tv",
        "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
        "`map(m, v)`",
        "\"MAP<STRING, VARIANT>\""))

    sqlCases.foreach { case (query, expectedCondition, colName, dataType) =>
      checkError(
        exception = intercept[AnalysisException](sql(query)),
        condition = expectedCondition,
        parameters = Map(
          "colName" -> colName,
          "dataType" -> dataType),
        // The error context spans the whole statement, so `stop` is the last character index.
        context = ExpectedContext(
          fragment = query,
          start = 0,
          stop = query.length - 1))
    }
  }
}

test("SPARK-19893: cannot run set operations with map type") {
val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
checkError(
Expand Down