[SPARK-18877][SQL] CSVInferSchema.inferField on DecimalType should find a common type with typeSoFar
#16320
Conversation
Test build #70280 has finished for PR 16320 at commit
Hi, @rxin, @falaki, and @HyukjinKwon.
Aha, so, typeSoFar should keep the precision and scale while being (partially) aggregated within each partition.
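To illustrate the point, here is a minimal plain-Scala sketch (no Spark dependency; `Dec` and `widestCommonDecimal` are hypothetical stand-ins for `DecimalType` and the decimal handling in `findTightestCommonType`):
```scala
import java.math.BigDecimal

// Toy model of a decimal type: (precision, scale).
case class Dec(precision: Int, scale: Int)

// Infer a decimal type for a single CSV field.
def inferDec(field: String): Dec = {
  val d = new BigDecimal(field)
  Dec(d.precision, d.scale)
}

// Widen so that both sides' integral digits and scale fit.
def widestCommonDecimal(a: Dec, b: Dec): Dec = {
  val scale = math.max(a.scale, b.scale)
  val intDigits = math.max(a.precision - a.scale, b.precision - b.scale)
  Dec(intDigits + scale, scale)
}

val fields = Seq("9.03E+12", "1.19E+11")
// Last-type-wins (the buggy behavior): Dec(3,-9), too narrow for 9.03E+12.
println(fields.map(inferDec).last)
// Keeping precision and scale while aggregating: Dec(4,-9), fits both rows.
println(fields.map(inferDec).reduce(widestCommonDecimal))
```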
Thank you for the review, @HyukjinKwon.
Looks like the fallback policy here is to use StringType; should we follow?
Thank you for the review, @cloud-fan. I used NullType since `mergeRowTypes` does:
```scala
def mergeRowTypes(first: Array[DataType], second: Array[DataType]): Array[DataType] = {
  first.zipAll(second, NullType, NullType).map { case (a, b) =>
    findTightestCommonType(a, b).getOrElse(NullType)
  }
}
```
Yes; otherwise, it might end up with incorrect data types. For example:
```scala
val path = "/tmp/test1"
Seq(s"${Long.MaxValue}1", "2015-12-01 00:00:00", "1").toDF().coalesce(1).write.text(path)
spark.read.option("inferSchema", true).csv(path).printSchema()
```
```
root
 |-- _c0: integer (nullable = true)
```
You're correct. I'll change it to StringType.
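To make the trade-off concrete, here is a self-contained toy model (not Spark's actual code; `DType` and `commonType` are hypothetical stand-ins) showing why the NullType fallback produced `_c0: integer` above, while a StringType fallback degrades safely:
```scala
sealed trait DType
case object IntT extends DType
case object TimestampT extends DType
case object StringT extends DType
case object NullT extends DType

// Toy stand-in for findTightestCommonType: None when the types clash.
def commonType(a: DType, b: DType): Option[DType] = (a, b) match {
  case _ if a == b                 => Some(a)
  case (NullT, t)                  => Some(t)
  case (t, NullT)                  => Some(t)
  case (StringT, _) | (_, StringT) => Some(StringT)
  case _                           => None
}

// Column types inferred row by row: an integer, a timestamp, another integer.
val rowTypes: Seq[DType] = Seq(IntT, TimestampT, IntT)

// NullType fallback: IntT/TimestampT clash -> NullT, then NullT merged with
// IntT -> IntT, which is exactly how the schema above becomes `_c0: integer`.
println(rowTypes.reduce((a, b) => commonType(a, b).getOrElse(NullT)))

// StringType fallback: the clash sticks as StringT, a safe supertype.
println(rowTypes.reduce((a, b) => commonType(a, b).getOrElse(StringT)))
```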
@dongjoon-hyun would we need an end-to-end test too?
Actually, I first made an end-to-end test based on the example of the use case in the JIRA. I removed it from here because the current test case is a minimal version of it.
Hi, @gatorsmile.
Test build #70576 has finished for PR 16320 at commit
Hi, @gatorsmile.
Could you add another test case here using an input constant with more than 38 digits of precision?
Oh, I missed your comment here. Let me try!
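As a quick sanity check of that boundary (plain Scala; `MaxPrecision` here mirrors Spark's `DecimalType.MAX_PRECISION`, which is 38):
```scala
import java.math.BigDecimal

val MaxPrecision = 38                 // mirrors Spark's DecimalType.MAX_PRECISION
val tooWide = "1" + "0" * 38          // an integer constant with 39 digits
val d = new BigDecimal(tooWide)
println(d.precision)                  // 39
println(d.precision <= MaxPrecision)  // false: cannot stay a DecimalType,
                                      // so inference has to fall back
```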
Thank you for the review, @gatorsmile.
Test build #70685 has finished for PR 16320 at commit
Could you review this?
LGTM. cc @cloud-fan
Thank you for the review, @gatorsmile.
Happy New Year!
Thank you again, @cloud-fan and @HyukjinKwon. I updated the fallback data type.
Could you please add the test case?
I assumed this one. Right?
```scala
val path = "/tmp/test1"
Seq(s"${Long.MaxValue}1", "2015-12-01 00:00:00", "1").toDF().coalesce(1).write.text(path)
spark.read.option("inferSchema", true).csv(path).printSchema()
```
Yep. I added the test case as a minimized version, too. @gatorsmile
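(For context, a quick plain-Scala check of the overflowing value in that input; no Spark needed:)
```scala
import java.math.BigDecimal

// Long.MaxValue is 9223372036854775807 (19 digits); appending "1" yields a
// 20-digit integer that overflows LongType but fits a decimal of precision 20.
val v = new BigDecimal(Long.MaxValue.toString + "1")
println(v.precision)  // 20
println(v.scale)      // 0
```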
Test build #70798 has started for PR 16320 at commit
Test build #70795 has finished for PR 16320 at commit
The test case coverage in the suite
Retest this please.
I see, @gatorsmile. I will try to make a PR to improve the coverage. For this issue, the failure on the last commit (adding the test case) was an R failure, so it's irrelevant. I reran it.
Test build #70809 has finished for PR 16320 at commit
Thanks, merging to master!
Oh, I forgot to backport it to 2.1/2.0. @gatorsmile, can you do it? My connection is bad now.
Thank you, @cloud-fan!
@dongjoon-hyun Could you submit a backport PR to 2.1? I am unable to merge this PR to 2.1. Thanks!
Sure. I'll create a backport PR for 2.1.
[SPARK-18877][SQL] CSVInferSchema.inferField on DecimalType should find a common type with `typeSoFar`
## What changes were proposed in this pull request?
CSV type inference causes an `IllegalArgumentException` on decimal numbers with heterogeneous precisions and scales because the current logic uses the last decimal type seen in a **partition**. Specifically, `inferRowType`, the **seqOp** of **aggregate**, returns the last decimal type. This PR fixes it to use `findTightestCommonType`.
**decimal.csv**
```
9.03E+12
1.19E+11
```
**BEFORE**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
|-- _c0: decimal(3,-9) (nullable = true)
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
16/12/16 14:32:49 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Decimal precision 4 exceeds max precision 3
```
**AFTER**
```scala
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").printSchema
root
|-- _c0: decimal(4,-9) (nullable = true)
scala> spark.read.format("csv").option("inferSchema", true).load("decimal.csv").show
+---------+
| _c0|
+---------+
|9.030E+12|
| 1.19E+11|
+---------+
```
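The numbers in the BEFORE/AFTER output can be verified with plain Scala (no Spark required):
```scala
import java.math.BigDecimal

val a = new BigDecimal("9.03E+12")  // unscaled 903, scale -10, precision 3
val b = new BigDecimal("1.19E+11")  // unscaled 119, scale -9,  precision 3
println((a.precision, a.scale))     // (3,-10)
println((b.precision, b.scale))     // (3,-9)

// Last-type-wins types the column as decimal(3,-9). Representing 9.03E+12
// at scale -9 needs unscaled value 9030, i.e. precision 4, hence
// "Decimal precision 4 exceeds max precision 3". The common type,
// decimal(4,-9), holds both rows.
println(a.setScale(-9).unscaledValue)  // 9030
```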
## How was this patch tested?
Pass the newly added test case.
Author: Dongjoon Hyun <[email protected]>
Closes apache#16320 from dongjoon-hyun/SPARK-18877.