[SPARK-16289][SQL] Implement posexplode table generating function #13971
Conversation
Test build #61461 has finished for PR 13971 at commit
cc @rxin and @cloud-fan.
```scala
 */
// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = "_FUNC_(a) - Separates the elements of array a into multiple rows, or the elements of map a into multiple rows and columns.")
```
an example would be useful.
Do we have unit tests for the explode expression (not end-to-end tests)? If not, do you mind looking into it?
Hi, @rxin. Thank you for the review. I updated the following. For the
Can we create a suite for unit testing generators?
If you want that for In general,
Yea, let's start with that, and we can add more in the future. I'd also add it for the other ones you are implementing, e.g. `inline`, in those PRs.
Sure. Thank you for the fast feedback! :)
Now,
```scala
private final val int_array = Seq(1, 2, 3)
private final val str_array = Seq("a", "b", "c")

test("explode") {
```
can you add a test case for empty input?
Yep. Done.
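The empty-input behavior being tested can be sketched with a tiny Python model of explode semantics (a hypothetical helper, not the Spark code): an empty array must produce zero output rows rather than a row of nulls.

```python
def explode(array):
    # Toy model of explode semantics: one output row (a tuple) per element.
    return [(element,) for element in array]

assert explode([1, 2, 3]) == [(1,), (2,), (3,)]
assert explode([]) == []  # empty input yields zero rows, not a null row
```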
Now, the following are updated.
Test build #61492 has finished for PR 13971 at commit
Could you do me a favor? There is a tiny fix. Could you take a look at #13730?
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

class GeneratorSuite extends SparkFunSuite with ExpressionEvalHelper {
```
this one maybe `GeneratorExpressionSuite`
This looks pretty good. Let's fix the remaining minor issues and merge it.
I tried to add it to Python/R.
LGTM pending Jenkins.
Oops. I added R, too. The exact same semantics of the current
Test build #61498 has finished for PR 13971 at commit
@rxin Thank you for intensively reviewing this PR.
Test build #61500 has finished for PR 13971 at commit

Test build #61501 has finished for PR 13971 at commit

Test build #61504 has finished for PR 13971 at commit
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext

class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
```
why do we put an expression-level unit test in the sql core module instead of catalyst?
oh sorry, I just realized it's not an expression-level unit test, but an end-to-end test
LGTM except the unit test. @rxin, do we need an expression-level unit test for it?
He added it, didn't he?
Test build #61509 has finished for PR 13971 at commit
```scala
class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
  private def checkTuple(actual: ExplodeBase, expected: Seq[InternalRow]): Unit = {
    assert(actual.eval(null).toSeq === expected)
```
We have `checkEvaluation` for this purpose; how about using that?
Oh, thank you for the review, @cloud-fan, too.
Do we have an example of `checkEvaluation` checking a generator, i.e., multiple `InternalRow`s?
I thought `checkEvaluation` was just for a single row, e.g., values, arrays, maps.
And how do we check the zero-row case? See line 39:
https://github.com/apache/spark/pull/13971/files#diff-6715134a4e95980149a7600ecb71674cR41
`checkEvaluation` takes `Any` as the expected result, so I don't think `checkEvaluation` is only used for a single row.
Have you tried passing a `Seq[Row]` to `checkEvaluation`? If it doesn't work, is it possible to improve `checkEvaluation` so that it works for this case? Thanks.
Sure, @cloud-fan. In fact, I tried everything you told me in many ways because I trust you. :)
As evidence, let me show the results of the simplest case.

```scala
checkEvaluation(Explode(CreateArray(Seq.empty)), Seq.empty[Row])
checkEvaluation(Explode(CreateArray(Seq.empty)), Seq.empty[InternalRow])
checkEvaluation(Explode(CreateArray(Seq.empty)), Seq.empty)
```

All of the above return the following.

```
Incorrect evaluation (codegen off): explode(array()), actual: InternalRow;(), expected: []
```

Here is the body of `checkEvaluation`. The following comments are the limitations I found.

```scala
// 1. This converts `Seq[Any]` into `GenericArrayData` in general.
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
// 2. Here, `val actual = plan(inputRow).get(0, expression.dataType)` is called to try casting to `expression.dataType`.
checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
  // 3. Here, `val unsafeRow = plan(inputRow)` runs with a single-row assumption.
  checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
}
// 4. Here, `checkResult` fails at `result == expected`.
checkEvaluationWithOptimization(expression, catalystValue, inputRow)
```

In short, every step of `checkEvaluation` seems to depend heavily on the single-row assumption. If we want to change this, we should do it in a separate issue since it's not trivial.
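The single-row assumption can be illustrated with a toy analogue in plain Python (names and structure are hypothetical, not Spark's actual code): a checker that converts the expected value as if it were a single catalyst value works for scalar expressions, but compares mismatched shapes once an expression evaluates to a sequence of rows.

```python
def convert_expected(expected):
    # Mimics a converter that treats `expected` as ONE catalyst value.
    return ("converted", expected)

def check_single_value(actual, expected):
    # Scalar-style check: compares one actual value to one converted value.
    return actual == convert_expected(expected)

# For a scalar expression this shape works:
assert check_single_value(("converted", 3), 3)

# A generator evaluates to a SEQUENCE of rows, not one value, so wrapping
# the expected rows as a single converted value never matches:
generator_output = [(1,), (2,), (3,)]
assert not check_single_value(generator_output, [(1,), (2,), (3,)])
```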
If I didn't misunderstand, it's definitely a valuable issue to investigate more. If we can upgrade `checkEvaluation` later, we can unify the test cases of this PR with `checkEvaluation`.
Let's not change it for now. We also don't want test code to become so complicated that it is no longer obvious what's going on.
Yep. Thank you. I'll investigate it later.
Test build #61512 has finished for PR 13971 at commit
If I understand correctly, the remaining issue is
Merging in master. Thanks!
Thank you for the review and for merging, @rxin and @cloud-fan.
```python
@since(2.1)
def posexplode(col):
```
cc @rxin, is `posexplode` a special Hive fallback function that we need to register? Other ones don't get registered in `functions`.
For this one, I thought the reason is that `explode` is already registered; `posexplode` is its counterpart.
Yea, this one is probably fine.
I wouldn't register the other ones.
Thank you for reconfirming!
This PR implements the `posexplode` table generating function. Currently, the master branch raises the following exception for a `map` argument, which differs from Hive.
**Before**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... posexplode() takes an array as a parameter; line 1 pos 7
```
**After**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
+---+---+-----+
|pos|key|value|
+---+---+-----+
| 0| a| 1|
| 1| b| 2|
+---+---+-----+
```
For an `array` argument, `after` is the same as `before`.
```scala
scala> sql("select posexplode(array(1, 2, 3))").show
+---+---+
|pos|col|
+---+---+
| 0| 1|
| 1| 2|
| 2| 3|
+---+---+
```
Pass the Jenkins tests with newly added test cases.
Author: Dongjoon Hyun <[email protected]>
Closes #13971 from dongjoon-hyun/SPARK-16289.
(cherry picked from commit 46395db)
Signed-off-by: Reynold Xin <[email protected]>