Skip to content

Commit 8288e16

Browse files
jerryshao authored and cloud-fan committed
[SPARK-15620][SQL] Fix transformed dataset attributes resolve failure
## What changes were proposed in this pull request?

A join on a transformed dataset has attribute conflicts, which cause query execution to fail. For example:

```
val dataset = Seq(1, 2, 3).toDS
val mappedDs = dataset.map(_ + 1)
mappedDs.as("t1").joinWith(mappedDs.as("t2"), $"t1.value" === $"t2.value").show()
```

will throw an exception:

```
org.apache.spark.sql.AnalysisException: cannot resolve '`t1.value`' given input columns: [value];
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:59)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
```

## How was this patch tested?

Unit test.

Author: jerryshao <[email protected]>

Closes #13399 from jerryshao/SPARK-15620.
1 parent 6dddb70 commit 8288e16

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,10 @@ class Analyzer(
524524
val newVersion = oldVersion.newInstance()
525525
(oldVersion, newVersion)
526526

527+
case oldVersion: SerializeFromObject
528+
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
529+
(oldVersion, oldVersion.copy(serializer = oldVersion.serializer.map(_.newInstance())))
530+
527531
// Handle projects that create conflicting aliases.
528532
case oldVersion @ Project(projectList, _)
529533
if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
769769
}
770770
}
771771

772+
test("mapped dataset should resolve duplicated attributes for self join") {
773+
val ds = Seq(1, 2, 3).toDS().map(_ + 1)
774+
val ds1 = ds.as("d1")
775+
val ds2 = ds.as("d2")
776+
777+
checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (2, 2), (3, 3), (4, 4))
778+
checkDataset(ds1.intersect(ds2), 2, 3, 4)
779+
checkDataset(ds1.except(ds1))
780+
}
781+
772782
test("SPARK-15441: Dataset outer join") {
773783
val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDS().as("left")
774784
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDS().as("right")

0 commit comments

Comments
 (0)