From 58e779d2c87cedd2612402a6c299e7ff996ba5bb Mon Sep 17 00:00:00 2001
From: --global
Date: Mon, 10 Apr 2023 20:54:23 -0700
Subject: [PATCH 1/8] add test

---
 .../org/apache/spark/sql/DatasetSuite.scala | 48 +++++++++++++++++--
 1 file changed, 45 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4aca7c8a5a666..c96b88d1b56bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,20 +19,18 @@ package org.apache.spark.sql
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
-
 import scala.util.Random
-
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
-
 import org.apache.spark.{SparkConf, SparkException, TaskContext}
 import org.apache.spark.TestUtils.withListener
 import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution}
@@ -2429,6 +2427,50 @@ class DatasetSuite extends QueryTest
       assert(parquetFiles.size === 10)
     }
   }
+
+  test("SPARK-37829: DataFrame outer join") {
+    // Same as "SPARK-15441: Dataset outer join" but using DataFrames instead of Datasets
+    val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF().as("left")
+    val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF().as("right")
+    val joined = left.joinWith(right, $"left.b" === $"right.b", "left")
+
+    val leftFieldSchema = StructType(
+      Seq(
+        StructField("a", StringType),
+        StructField("b", IntegerType, nullable = false)
+      )
+    )
+    val rightFieldSchema = StructType(
+      Seq(
+        StructField("a", StringType),
+        StructField("b", IntegerType, nullable = false)
+      )
+    )
+    val expectedSchema = StructType(
+      Seq(
+        StructField(
+          "_1",
+          leftFieldSchema,
+          nullable = false
+        ),
+        // This is a left join, so the right output is nullable:
+        StructField(
+          "_2",
+          rightFieldSchema
+        )
+      )
+    )
+    assert(joined.schema === expectedSchema)
+
+    val result = joined.collect().toSet
+    val expected = Set(
+      new GenericRowWithSchema(Array("a", 1), leftFieldSchema) ->
+        null,
+      new GenericRowWithSchema(Array("b", 2), leftFieldSchema) ->
+        new GenericRowWithSchema(Array("x", 2), rightFieldSchema)
+    )
+    assert(result == expected)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest

From 2f43ef41b6c6f3446db8fefb6cbe0176ed5b1eda Mon Sep 17 00:00:00 2001
From: --global
Date: Tue, 11 Apr 2023 09:46:50 -0700
Subject: [PATCH 2/8] local var

---
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index faa165c298d09..94b393a2853a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -114,9 +114,12 @@ object ExpressionEncoder {
       If(IsNull(input), Literal.create(null, result.dataType), result)
     }
 
+    val serializer = nullSafe(newSerializerInput, newSerializer)
+    val deserializer = nullSafe(newDeserializerInput, newDeserializer)
+
     new ExpressionEncoder[Any](
-      nullSafe(newSerializerInput, newSerializer),
-      nullSafe(newDeserializerInput, newDeserializer),
+      serializer,
+      deserializer,
       ClassTag(cls))
   }
 

From 1809a4d63eb46a8670b21cae85789051d57590fe Mon Sep 17 00:00:00 2001
From: --global
Date: Tue, 11 Apr 2023 17:19:02 -0700
Subject: [PATCH 3/8] add back null check in children deserializer

---
 .../sql/catalyst/encoders/ExpressionEncoder.scala | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 94b393a2853a4..b051bd418be21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -104,9 +104,15 @@ object ExpressionEncoder {
         s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
 
       val input = GetStructField(newDeserializerInput, index)
-      enc.objDeserializer.transformUp {
+      val newDeserializer = enc.objDeserializer.transformUp {
         case GetColumnByOrdinal(0, _) => input
       }
+
+      if (enc.objSerializer.nullable) {
+        If (IsNull(input), Literal.create(null, newDeserializer.dataType), newDeserializer)
+      } else {
+        newDeserializer
+      }
     }
     val newDeserializer = NewInstance(cls, deserializers, ObjectType(cls), propagateNull = false)
 
@@ -114,12 +120,9 @@ object ExpressionEncoder {
       If(IsNull(input), Literal.create(null, result.dataType), result)
     }
 
-    val serializer = nullSafe(newSerializerInput, newSerializer)
-    val deserializer = nullSafe(newDeserializerInput, newDeserializer)
-
     new ExpressionEncoder[Any](
-      serializer,
-      deserializer,
+      nullSafe(newSerializerInput, newSerializer),
+      newDeserializer,
       ClassTag(cls))
   }
 

From 289a5463f91729361c9fc70aa51cd6098c30b337 Mon Sep 17 00:00:00 2001
From: --global
Date: Wed, 12 Apr 2023 11:42:48 -0700
Subject: [PATCH 4/8] fix scala style check

---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c96b88d1b56bb..75cee40781972 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
+
 import scala.util.Random
+
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
+
 import org.apache.spark.{SparkConf, SparkException, TaskContext}
 import org.apache.spark.TestUtils.withListener
 import org.apache.spark.internal.config.MAX_RESULT_SIZE

From 413b632095d9de290da4a484b459b65d34e3ff09 Mon Sep 17 00:00:00 2001
From: --global
Date: Thu, 13 Apr 2023 17:10:38 -0700
Subject: [PATCH 5/8] remove extra space

---
 .../apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index b051bd418be21..3e2d9db1f4ae0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -109,7 +109,7 @@ object ExpressionEncoder {
       }
 
       if (enc.objSerializer.nullable) {
-        If (IsNull(input), Literal.create(null, newDeserializer.dataType), newDeserializer)
+        If(IsNull(input), Literal.create(null, newDeserializer.dataType), newDeserializer)
       } else {
         newDeserializer
       }

From 6912c3b8dace291b517a0f1acde7860c01bff89e Mon Sep 17 00:00:00 2001
From: --global
Date: Thu, 13 Apr 2023 20:01:54 -0700
Subject: [PATCH 6/8] use existing nullSafe

---
 .../sql/catalyst/encoders/ExpressionEncoder.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 3e2d9db1f4ae0..0923aec868de7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -97,8 +97,12 @@ object ExpressionEncoder {
     }
     val newSerializer = CreateStruct(serializers)
 
+    def nullSafe(input: Expression, result: Expression): Expression = {
+      If(IsNull(input), Literal.create(null, result.dataType), result)
+    }
+
     val newDeserializerInput = GetColumnByOrdinal(0, newSerializer.dataType)
-    val deserializers = encoders.zipWithIndex.map { case (enc, index) =>
+    val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) =>
       val getColExprs = enc.objDeserializer.collect { case c: GetColumnByOrdinal => c }.distinct
       assert(getColExprs.size == 1, "object deserializer should have only one " +
         s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
@@ -109,16 +113,13 @@ object ExpressionEncoder {
       }
 
       if (enc.objSerializer.nullable) {
-        If(IsNull(input), Literal.create(null, newDeserializer.dataType), newDeserializer)
+        nullSafe(input, newDeserializer)
       } else {
         newDeserializer
       }
     }
-    val newDeserializer = NewInstance(cls, deserializers, ObjectType(cls), propagateNull = false)
-
-    def nullSafe(input: Expression, result: Expression): Expression = {
-      If(IsNull(input), Literal.create(null, result.dataType), result)
-    }
+    val newDeserializer =
+      NewInstance(cls, childrenDeserializers, ObjectType(cls), propagateNull = false)
 
     new ExpressionEncoder[Any](
       nullSafe(newSerializerInput, newSerializer),

From 2e59114c46ca55aacd449df325715cc887219701 Mon Sep 17 00:00:00 2001
From: --global
Date: Fri, 14 Apr 2023 11:01:59 -0700
Subject: [PATCH 7/8] better naming

---
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 0923aec868de7..e1df5fd3356e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -108,14 +108,14 @@ object ExpressionEncoder {
         s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
 
       val input = GetStructField(newDeserializerInput, index)
-      val newDeserializer = enc.objDeserializer.transformUp {
+      val childDeserializer = enc.objDeserializer.transformUp {
         case GetColumnByOrdinal(0, _) => input
       }
 
       if (enc.objSerializer.nullable) {
-        nullSafe(input, newDeserializer)
+        nullSafe(input, childDeserializer)
       } else {
-        newDeserializer
+        childDeserializer
       }
     }
     val newDeserializer =

From 2372d49b025d4cfc27c62453b51a6b1ef9cfbb87 Mon Sep 17 00:00:00 2001
From: --global
Date: Tue, 18 Apr 2023 15:45:31 -0700
Subject: [PATCH 8/8] keep deserializer outermost null check

---
 .../apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index e1df5fd3356e4..8f7583c48fcac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -123,7 +123,7 @@ object ExpressionEncoder {
 
     new ExpressionEncoder[Any](
       nullSafe(newSerializerInput, newSerializer),
-      newDeserializer,
+      nullSafe(newDeserializerInput, newDeserializer),
       ClassTag(cls))
   }
 
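
For context, the following is a minimal standalone sketch (not part of the patches above) of the user-visible behavior the new DatasetSuite test locks in: with the encoder change applied, a left outer joinWith on DataFrames is expected to surface the unmatched right side as null rather than as a Row whose fields are all null. The object name, local-mode SparkSession setup, and column names are illustrative assumptions, not code from this series.

import org.apache.spark.sql.SparkSession

// Hypothetical example; names and session setup are assumptions for illustration.
object JoinWithNullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("joinWith-null-sketch")
      .getOrCreate()
    import spark.implicits._

    val left = Seq(("a", 1), ("b", 2)).toDF("a", "b").as("left")
    val right = Seq(("x", 2), ("y", 3)).toDF("a", "b").as("right")

    // joinWith keeps each side as a whole value in a tuple column (_1, _2).
    val joined = left.joinWith(right, $"left.b" === $"right.b", "left")

    joined.collect().foreach { case (l, r) =>
      // With the change in this series, the unmatched left row ("a", 1) is expected
      // to pair with null here, not with a row whose fields are all null.
      println(s"$l -> $r")
    }

    spark.stop()
  }
}

The test added in PATCH 1/8 asserts exactly this shape: a (GenericRowWithSchema, null) pair for the unmatched row and a fully populated pair for the matched one.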