[SPARK-19918][SQL] Use TextFileFormat in implementation of TextInputJsonDataSource #17255
Conversation
Force-pushed from 5e90d04 to e2d34b8
(let me wait for the tests before cc'ing someone)
Test build #74375 has finished for PR 17255 at commit
cc @cloud-fan, @JoshRosen and @NathanHowell, could you take a look and see if it makes sense when you have some time?
Test build #74402 has finished for PR 17255 at commit
Is the Encoders import still necessary?
Would there be any additional benefit of replacing more (or all?) of the uses of …
Let me take a look at that. I think we can replace …
Test build #74415 has finished for PR 17255 at commit
df.queryExecution.toRdd
can we make it return Dataset like CSV?
Let me try.
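(For context, a minimal sketch of the two shapes being discussed; `readText` and the exact signatures here are illustrative, not the actual PR code.)

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch only: returning a typed Dataset[String], as the CSV source does,
// keeps the data in the Dataset API so later operations (e.g. sampling)
// can benefit from the SQL engine.
def readText(spark: SparkSession, paths: Seq[String]): Dataset[String] =
  spark.read.textFile(paths: _*)

// The alternative drops to the low-level representation right away:
//   val rdd = df.queryExecution.toRdd   // RDD[InternalRow]
```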
Test build #74426 has started for PR 17255 at commit
This might be too much. I am willing to revert it if anyone feels it is a bit odd.
I made this only to match CSVUtils, which contains variants of logically the same preprocessing performed on different data types (e.g. Iterator, RDD, Dataset).
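(A rough sketch of what such variants could look like; the object name and exact signatures below are hypothetical, mirroring the CSVUtils style rather than the actual PR code.)

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

// Hypothetical helper: the same sampling logic exposed over different container types.
object JsonSamplingSketch {
  // Dataset variant, used by the line-based (text) path.
  def sample(json: Dataset[String], ratio: Double): Dataset[String] =
    if (ratio == 1.0) json
    else json.sample(withReplacement = false, fraction = ratio, seed = 1L)

  // RDD variant, used by the whole-file path where the input is not a Dataset.
  def sample[T](json: RDD[T], ratio: Double): RDD[T] =
    if (ratio == 1.0) json
    else json.sample(withReplacement = false, fraction = ratio, seed = 1L)
}
```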
Test build #74427 has started for PR 17255 at commit
The changes in this file basically resemble CSVDataSource. (Note that this will be almost identical once #17256 is merged.)
Test build #74429 has started for PR 17255 at commit
It seems all the builds failed unexpectedly. Let me restart.
Force-pushed from 3e6138a to 76dceb2
Test build #74434 has finished for PR 17255 at commit
```scala
  json
} else {
  json.sample(withReplacement = false, configOptions.samplingRatio, 1)
}
```
why move the sample logic out?
Because JsonInferSchema.infer takes an RDD[T] as the actual source from which JSON strings are parsed. In the whole-file case it is RDD[PortableDataStream], whereas in the normal case it is RDD[UTF8String].
The thing is, there seems to be an advantage in doing the sample operation on Dataset[String] rather than on the RDD. So, the sample had to be applied to the Dataset[String] before converting it into RDD[UTF8String].
In a simple view:
- TextInputJsonDataSource:

  ```scala
  val json: Dataset[String] = ...
  val sampled: Dataset[String] = JsonUtils.sample(...)
  val rdd: RDD[UTF8String] = ...
  JsonInferSchema.infer(rdd)
  ```

- WholeFileJsonDataSource:

  ```scala
  val json: RDD[PortableDataStream] = ...
  val sampled: RDD[PortableDataStream] = JsonUtils.sample(...)
  JsonInferSchema.infer(sampled)
  ```

I could not find a good way to generalize JsonInferSchema.infer to take both a Dataset and an RDD as the source, so I kept the logic here with small and clean changes.
If the question is about why it uses Dataset.sample instead of RDD.sample, that was suggested in #17255 (comment).
To my knowledge, both use the same sampler, BernoulliCellSampler, since replacement is disabled, but the Dataset one benefits from code generation. So, I thought there might be a slight benefit.
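(To make the ordering concrete, here is a rough sketch of the Dataset-side flow, assuming JsonInferSchema.infer consumes an RDD[UTF8String]; the helper name and exact calls are simplified, not the verbatim PR code.)

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.unsafe.types.UTF8String

// Sketch: sample while still a Dataset[String] (so codegen applies),
// then drop to the internal-row RDD that schema inference consumes.
def toInferenceInput(json: Dataset[String], samplingRatio: Double): RDD[UTF8String] = {
  val sampled =
    if (samplingRatio == 1.0) json
    else json.sample(withReplacement = false, fraction = samplingRatio, seed = 1L)
  // Each internal row carries a single string column holding one JSON line.
  sampled.queryExecution.toRdd.map(_.getUTF8String(0))
}
```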
Strictly speaking, maybe this is not directly related to the JIRA. I am willing to revert this change, or please let me know if you have a better idea.
Yah, perhaps the RDD->Dataset changes should be done under a separate issue. I think it can be done across the board (removing most/all RDD references) but I'm not sure what other implications it would have.
(My worry, though, is that it might be grunt work to check whether each one really is better as a Dataset.) @cloud-fan, if you are not sure about this, let me revert. Please let me know.
I think it's fine
thanks, merging to master!
…aframe read / write API

### What changes were proposed in this pull request?

This PR is a retry of #47328 which replaces RDD to Dataset to write SparkR metadata plus this PR removes `repartition(1)`. We actually don't need this when the input is single row as it creates only single partition:

https://github.com/apache/spark/blob/e5e751b98f9ef5b8640079c07a9a342ef471d75d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala#L49-L57

### Why are the changes needed?

In order to leverage Catalyst optimizer and SQL engine. For example, now we leverage UTF-8 encoding instead of plain JDK ser/de for strings. We have made similar changes in the past, e.g., #29063, #15813, #17255 and SPARK-19918.

Also, we remove `repartition(1)` to avoid unnecessary shuffle.

With `repartition(1)`:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=6]
   +- LocalTableScan [_1#0]
```

Without `repartition(1)`:

```
== Physical Plan ==
LocalTableScan [_1#2]
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI in this PR should verify the change

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47341 from HyukjinKwon/SPARK-48883-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
This PR proposes to use the text datasource when inferring the JSON schema.

This basically proposes a similar approach to the one in #15813. If we use Dataset for the initial loading when inferring the schema, there are advantages. Please refer to SPARK-18362.

It seems the JSON one was supposed to be fixed together but was taken out, according to #15813.

Also, this seems to affect some functionality because it does not use FileScanRDD. This problem is described in SPARK-19885 (although that was the CSV case).

How was this patch tested?
Existing tests should cover this, plus a manual test with spark.read.json(path) while checking the UI.
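As a rough illustration of the idea at the public-API level (not the internal code paths this PR touches), reading the input through the text source gives a Dataset[String] that the JSON reader can then infer a schema from; the path below is a placeholder.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TextBasedJsonInferenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("text-based-json-inference-sketch")
      .master("local[*]")
      .getOrCreate()

    // Load the raw JSON lines through the text source (one record per line).
    val lines: Dataset[String] = spark.read.textFile("/tmp/example.json")

    // Schema inference then runs over those lines.
    val df = spark.read.json(lines)
    df.printSchema()

    spark.stop()
  }
}
```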