Skip to content

Commit b233d09

Browse files
committed
Backport "Use type-widened encoder for DataFrame rather than existing encoder to allow type-widening from set operations"
1 parent b959dab commit b233d09

File tree: 2 files changed (+29 additions, −3 deletions)

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 13 additions & 3 deletions
Diff (unified format):
@@ -1456,7 +1456,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def union(other: Dataset[T]): Dataset[T] = withSetOperator {
     // This breaks caching, but it's usually ok because it addresses a very specific use case:
     // using union to union many files or partitions.
     CombineUnions(Union(logicalPlan, other.logicalPlan))
@@ -1472,7 +1472,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
     Intersect(logicalPlan, other.logicalPlan)
   }
 
@@ -1486,7 +1486,7 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
+  def except(other: Dataset[T]): Dataset[T] = withSetOperator {
     Except(logicalPlan, other.logicalPlan)
   }
 
@@ -2607,4 +2607,14 @@ class Dataset[T] private[sql](
   @inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
     Dataset(sparkSession, logicalPlan)
   }
+
+  /** A convenient function to wrap a set based logical plan and produce a Dataset. */
+  @inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
+      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
+      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
+    } else {
+      Dataset(sparkSession, logicalPlan)
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 16 additions & 0 deletions
Diff (unified format):
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.io.File
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
 import java.util.UUID
 
 import scala.language.postfixOps
@@ -1585,4 +1586,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-17123: Performing set operations that combine non-scala native types") {
+    val dates = Seq(
+      (BigDecimal.valueOf(1), new Timestamp(2)),
+      (BigDecimal.valueOf(4), new Timestamp(5))
+    ).toDF("timestamp", "decimal")
+
+    val widenTypedRows = Seq(
+      (10.5D, "string")
+    ).toDF("timestamp", "decimal")
+
+    dates.union(widenTypedRows).collect()
+    dates.except(widenTypedRows).collect()
+    dates.intersect(widenTypedRows).collect()
+  }
 }

0 commit comments

Comments
 (0)