[SPARK-37829][SQL] DataFrame.joinWith should return null rows for missing values #35139
Conversation
Can one of the admins verify this patch?

cc @cloud-fan @viirya FYI
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
Let's be more clear about "What changes were proposed in this pull request?". I thought this is a test-only PR but it actually fixed a bug. Please describe the test in the "How was this patch tested?" section.
Do you mean the `objDeserializer` can be something that doesn't propagate nulls, so we need to manually check for null input and create a null literal? If so, please add some code comments to explain it.
It seems so. I'm new to this part of the code so you'll certainly have a better idea of what's going on here.
Could it happen because joinWith creates a tuple from what were top-level Rows?
Can you run the test locally and add a print here to see what the `objDeserializer` that causes the bug looks like?
Oh sure! It's a `CreateExternalRow` `Expression`. If I `println(enc.objDeserializer)`, it prints:
createexternalrow(getcolumnbyordinal(0, StructField(a,StringType,true), StructField(b,IntegerType,false))._0.toString, getcolumnbyordinal(0, StructField(a,StringType,true), StructField(b,IntegerType,false))._1, StructField(a,StringType,true), StructField(b,IntegerType,false))
Unless there's a broader change to make, we could reduce the blast radius by:
- limiting the change to `CreateExternalRow` (i.e. check `enc.objDeserializer.isInstanceOf[CreateExternalRow]`)?
- having a dedicated tuple `ExpressionEncoder` for `Dataset.joinWith` (i.e. update `ExpressionEncoder.tuple` to add a `nullSafe: Boolean = false` flag, set it to true for `Dataset.joinWith` and manually propagate nulls if true)?
The difference comes from the RowEncoder deserializer:
- When using a `Dataset[T]`, the `ExpressionEncoder` is used and calls `ScalaReflection.deserializerForType` to get a deserializer for class `T`, which automatically wraps the expression in a null-safe expression.
- When using a `DataFrame`, the `RowEncoder` is used and returns a `CreateExternalRow` (not wrapped in a null-safe expression).
I'm not sure there's an easy way to solve this, as the RowEncoder should guarantee (afaik) that top-level Rows aren't null.
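For illustration only, a small snippet comparing the two deserializers. This is a sketch assuming Spark 3.x internals (`RowEncoder.apply` and `ExpressionEncoder.objDeserializer`); the schema and case class are made up for the example.

```scala
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

case class ClassData(a: String, b: Int)

val schema = StructType(Seq(
  StructField("a", StringType),
  StructField("b", IntegerType, nullable = false)))

// DataFrame path: the top-level deserializer is a bare CreateExternalRow,
// with no null check around it.
println(RowEncoder(schema).objDeserializer)

// Dataset[T] path: the deserializer produced via ScalaReflection is wrapped
// so that a null input row deserializes to null instead of being dereferenced.
println(ExpressionEncoder[ClassData]().objDeserializer)
```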
Actually I think everything is already summarized in your initial PR that patched the tuple encoder to wrap deserializers in a null-safe way: #13425
> as the RowEncoder should guarantee (afaik) that top-level Rows aren't null.

This is not true for outer join. Shall we also add a null check to wrap `CreateExternalRow`?
Breaking the assumption that top-level rows can't be null would represent a huge amount of work afaiu. I've tried simply wrapping CreateExternalRow with a null check and a number of tests started failing as they were assuming top-level rows couldn't be null.
Instead, updating joinWith seems more practical as we'd just want to handle what looks like a corner-case?
OK let's update joinWith then.
Pushed a new commit. There are multiple ways to implement this. Please let me know what you think.
shouldn't we check input.nullable?
input is a GetStructField(GetColumnByOrdinal) and calling nullable on it will throw org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object.
I've used enc.objSerializer.nullable because that's what was used before the regression.
Wrap tuple fields deserializers in null checks when calling on DataFrames as top-level rows are not nullable and won't propagate null values.
       */
      private[sql] def nullSafe(exprEnc: ExpressionEncoder[Row]): ExpressionEncoder[Row] = {
        val newDeserializerInput = GetColumnByOrdinal(0, exprEnc.objSerializer.dataType)
        val newDeserializer: Expression = if (exprEnc.objSerializer.nullable) {
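For readers without the full diff, here is a minimal sketch of what such a null-safe wrapping can look like with Catalyst expressions. This is a hypothetical reconstruction based on the discussion above (the `If`/`IsNull`/`Literal` combination and the `wrapNullSafe` name are assumptions), not the PR's exact code.

```scala
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.expressions.{Expression, If, IsNull, Literal}

// If the input column is null (e.g. the missing side of an outer join),
// deserialize to a null literal instead of evaluating the original deserializer.
def wrapNullSafe(deserializer: Expression, input: GetColumnByOrdinal): Expression =
  If(IsNull(input), Literal.create(null, deserializer.dataType), deserializer)
```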
Sorry the code here is a bit confusing. We check exprEnc.objSerializer.nullable and then we construct IsNull(newDeserializerInput)? What's their connection?
      // As we might be running on DataFrames, we need a custom encoder that will properly
      // handle null top-level Rows.
      def nullSafe[V](exprEnc: ExpressionEncoder[V]): ExpressionEncoder[V] = {
        if (exprEnc.clsTag.runtimeClass != classOf[Row]) {
This looks a bit ugly.
> I've tried simply wrapping CreateExternalRow with a null check and a number of tests started failing as they were assuming top-level rows couldn't be null.
Are they UT or end-to-end tests? If they are UT, we can simply update the tests because we have changed the assumption.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
We add a unit test demonstrating a regression on `DataFrame.joinWith` and fix the regression by updating `ExpressionEncoder`. The fix is equivalent to reverting this commit.

Why are the changes needed?
Doing an outer join using `joinWith` on DataFrames used to return missing values as `null` in Spark 2.4.8, but returns them as Rows with `null` values in Spark 3.0.0+.
The regression was introduced in this commit.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a unit test. This unit test succeeds with Spark 2.4.8 but fails with Spark 3.0.0+.
The new unit test does a left outer join on two DataFrames using the `joinWith` method. The join is performed on the `b` field of `ClassData` (`Int`s). The row `ClassData("a", 1)` on the left side of the join has no matching row on the right side of the join, as there is no row with value `1` for field `b`. The missing value (of `Row` type) is represented as a `GenericRowWithSchema(Array(null, null), rightFieldSchema)` instead of a `null` value, making the test fail.
This new test is identical to this one and only differs in that it uses DataFrames instead of Datasets.
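As a rough illustration of the scenario (not the PR's actual test code; the variable names, data, and use of `toDF` are assumptions for the example):

```scala
import org.apache.spark.sql.SparkSession

case class ClassData(a: String, b: Int)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF()
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF()

// Left outer join on field `b`: ClassData("a", 1) has no match on the right side.
val joined = left.joinWith(right, left("b") === right("b"), "left_outer")

// Spark 2.4.8: the unmatched right side is null, e.g. ([a,1], null)
// Spark 3.0.0+: the unmatched right side is a Row of nulls, e.g. ([a,1], [null,null])
joined.collect().foreach(println)
```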
I've run unit tests for the `sql-core` and `sql-catalyst` submodules locally with `./build/mvn clean package -pl sql/core,sql/catalyst`