[SPARK-21610][SQL] Corrupt records are not handled properly when creating a dataframe from a file #18865
Conversation
cc @gatorsmile @cloud-fan Can you help trigger Jenkins for this? Thanks.

ok to test
```scala
(file: PartitionedFile) => {
  val parser = new JacksonParser(actualSchema, parsedOptions)
  // SPARK-21610: when the `requiredSchema` only contains `_corrupt_record`,
```
Does this bug apply to CSV too?
I think so. But he is a beginner I'm mentoring to contribute to Spark, so we will keep this change focused on JSON. We may deal with CSV later. Thanks.
Hm, I understand it fixes the issue described in the JIRA, but won't this introduce casting attempts for all columns when the requested schema is empty? I think this is rather a band-aid fix. I mean, if the `actualSchema` selects only a few columns out of many, it would introduce a similar problem again.
I think there are two issues:
- When the required schema is empty. This is why the Jenkins test fails. We're working on fixing it.
- When `actualSchema` selects only a few columns. We noticed that: the column `_corrupt_record` is different when the selected columns are different. But currently we treat it as designed behavior. Not sure if this is an issue we need to fix.
> The column `_corrupt_record` is different when the selected columns are different

If `_corrupt_record` is designed to have different values for different selected columns, it may make sense to set `_corrupt_record` to null if no columns are selected.
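To make the "different values for different selected columns" behavior concrete, here is a hypothetical sketch (the field names, schema, and path are invented for illustration; `spark` is an active session):

```scala
import org.apache.spark.sql.types._

// Suppose one input line is {"a": 1, "b": "oops"} while the schema declares
// `b` as an integer, so the line is corrupt only when `b` is actually parsed.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("_corrupt_record", StringType)

val df = spark.read.schema(schema).json("/tmp/example.json") // hypothetical path

// Column pruning parses only `a`: the bad line is NOT flagged as corrupt.
df.select("a", "_corrupt_record").show()

// This projection parses `b` as well: the bad line IS flagged as corrupt.
df.select("a", "b", "_corrupt_record").show()
```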
Yeah, I agree. With the current behavior, it is unavoidable to have some strange queries with `_corrupt_record`.
I'd suggest, as in #18865 (comment), that we document that `_corrupt_record` in the CSV and JSON data sources is a derived column and can be incorrect if not used with other columns.
We need to let users know that `_corrupt_record` is a column derived from the other columns and cannot be selected alone in a query.
@viirya I created issue https://issues.apache.org/jira/browse/SPARK-21610 and need to select the field `_corrupt_record` alone. This is possible with Spark 2.2 (if a dataframe is created from an RDD), and it would be great to keep this behaviour in future versions of Spark.
My use case is the following: a Spark job reads JSON with an input schema and will:
- save records that match the input schema in Parquet format
- save "corrupt records" (invalid JSON, or records that do not match the input schema) to text files in a separate folder

Basically, I want:
- a folder with clean data in Parquet format
- another folder with "corrupt records". I can then analyze the corrupt records and, for instance, tell partners that they are sending invalid data. This enables a clean data pipeline that separates valid records from corrupt records.
To get valid and corrupt records, I write:

```scala
val validRecords = df.filter(col("_corrupt_record").isNull)
  .drop("_corrupt_record")
val corruptRecords = df.filter(col("_corrupt_record").isNotNull)
  .select("_corrupt_record")
```
Your usage scenario makes sense to me. The contents of `_corrupt_record` depend on the fields the parser parsed. The workaround is that you can save your output to the cache or a physical table:

```scala
val df = dfFromFile.cache()
df.filter($"_corrupt_record".isNull).drop("_corrupt_record").show()
df.filter($"_corrupt_record".isNotNull).select("_corrupt_record").show()
```

My current suggestion is to capture the empty actual schema and issue an error with a reasonable workaround message. Users can at least know what happened and how to fix the issue.
@dm-tran I think the current change can support your use cases. When only `_corrupt_record` is selected, it has the same effect as selecting all columns, i.e. a record is recognized as corrupt if any column is in an invalid format.
However, it seems to counter the designed behavior that lets `_corrupt_record` depend on the selected columns, as we discussed in previous comments.
I think @gatorsmile's suggestion should be good.
@gatorsmile @viirya I also think that @gatorsmile's suggestion looks good. Thanks for your replies!
cc @HyukjinKwon

Test build #80350 has finished for PR 18865 at commit

retest this please.

@HyukjinKwon @cloud-fan Could you help trigger Jenkins? Thanks.

Oh, sorry. Looks like Jenkins is running the tests now.

Test build #80375 has finished for PR 18865 at commit
Could we issue a better error message in such a scenario?
I think it makes sense to issue an error with a good, helpful message when users only select `_corrupt_record`.
Gentle ping @viirya, @gatorsmile: made a minor change to throw a reasonable workaround message.
```scala
// SPARK-21610: when the `requiredSchema` only contains `_corrupt_record`,
// the derived `actualSchema` is empty and the `_corrupt_record` are all null for all rows.
// When users requires only `_corrupt_record`, we assume that the corrupt records are required
// for all json fields, i.g., all items in dataSchema.
```
This comment is wrong now. We can get rid of it, as the following exception is self-explanatory.
```scala
test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
  "from a file") {
  val tempDir = Utils.createTempDir()
```
We can use `withTempPath`.
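A minimal sketch of how the test could be restructured with `withTempPath` (assuming the suite mixes in Spark's `SQLTestUtils`, which provides it and cleans the directory up afterwards; the test body is elided):

```scala
test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
  "from a file") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    // write sample JSON lines to `path`, read them back with an explicit
    // schema, and assert on the `_corrupt_record` behavior
  }
}
```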
Test build #81334 has finished for PR 18865 at commit

Thanks to @viirya's suggestion, the redundant comment is removed and `withTempPath` is used.

Test build #81339 has finished for PR 18865 at commit

retest this please.

Test build #81340 has finished for PR 18865 at commit

Test build #81451 has finished for PR 18865 at commit
```markdown
## Upgrading From Spark SQL 2.2 to 2.3

- The queries which select only `spark.sql.columnNameOfCorruptRecord` column are disallowed now. Notice that the queries which have only the column after column pruning (e.g. filtering on the column followed by a counting operation) are also disallowed. If you want to select only the corrupt records, you should cache or save the Dataset and DataFrame before running such queries.
```
nit: cache or save the underlying Dataset and DataFrame ...
```scala
if (requiredSchema.length == 1 &&
  requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
  throw new AnalysisException(
    s"'${parsedOptions.columnNameOfCorruptRecord}' cannot be selected alone without other " +
```
This line and the following line are concatenated and may be too long. Add a `\n` at the end of this line?
```scala
throw new AnalysisException(
  s"'${parsedOptions.columnNameOfCorruptRecord}' cannot be selected alone without other " +
  "data columns, because its content is completely derived from the data columns parsed.\n" +
  "If you want to select corrupt records only, cache or save the Dataset " +
```
Add a `\n` here too?
```scala
requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
  throw new AnalysisException(
    s"'${parsedOptions.columnNameOfCorruptRecord}' cannot be selected alone without other " +
    "data columns, because its content is completely derived from the data columns parsed.\n" +
```
Add one more sentence, something like: "Even if your query does not appear to select only this column, it is also disallowed when, after column pruning, it does not involve parsing any data fields (e.g., filtering on the column followed by a counting operation), because such queries can produce incorrect results."
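Putting the review suggestions together, the guard could end up reading roughly like this (a sketch only, not the merged code; the exact message wording is illustrative):

```scala
if (requiredSchema.length == 1 &&
    requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
  throw new AnalysisException(
    "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
    "referenced columns only include the internal corrupt record column (named\n" +
    s"${parsedOptions.columnNameOfCorruptRecord} by default). Instead, you can cache or\n" +
    "save the parsed results and then send the same query.")
}
```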
Left a few more comments on the message. Otherwise LGTM.

@viirya, thank you so much for taking a look and for your time.

Seems fine to me too.

Thank you for the review, @HyukjinKwon. cc @gatorsmile

Test build #81561 has finished for PR 18865 at commit
docs/sql-programming-guide.md (outdated)

```markdown
## Upgrading From Spark SQL 2.2 to 2.3

- The queries which select only `spark.sql.columnNameOfCorruptRecord` column are disallowed now. Notice that the queries which have only the column after column pruning (e.g. filtering on the column followed by a counting operation) are also disallowed. If you want to select only the corrupt records, you should cache or save the underlying Dataset and DataFrame before running such queries.
```
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
```scala
  }
}

test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
```
Could you include both negative cases I posted above? Also, please include the workaround in the test case; that ensures future code changes will not break it.
Sure, I'll update the PR.
@gatorsmile Those negative cases and the workaround are already added in …
```scala
assert(msg.contains(expectedErrorMsg))
// negative cases
msg = intercept[AnalysisException] {
  spark.read.schema(schema).json(path).select("_corrupt_record").show()
```
You already have the one using `collect()`. No need to do it here.
```scala
  spark.read.schema(schema).json(path).select("_corrupt_record").collect()
}.getMessage
assert(msg.contains(expectedErrorMsg))
// negative cases
```
move this to line 2049. Thanks!
```scala
// workaround
val df = spark.read.schema(schema).json(path).cache()
assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
assert(df.filter($"_corrupt_record".isNull).count() == 2)
```
Please also add another one:

```scala
checkAnswer(
  spark.read.schema(schema).json(path).select("_corrupt_record"),
  Row(....
```
| "If you want to select corrupt records only, cache or save the Dataset\n" + | ||
| "before executing queries, as this parses all fields under the hood. For example: \n" + | ||
| "df.cache()\n" + | ||
| s"""df.select("${parsedOptions.columnNameOfCorruptRecord}")""" |
How about also improving this based on the one we changed in `sql-programming-guide.md`? Thanks!
Test build #81591 has finished for PR 18865 at commit

Test build #81603 has finished for PR 18865 at commit

cc @gatorsmile Please take another look when you have time. I've already updated. Thanks!

Thanks! Merged to master. @jmchung Could you submit a follow-up PR for CSV? Thanks!

@gatorsmile Sure, I'll make a follow-up PR for CSV.
What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR captures the above situation and raises an exception with a reasonable workaround message so that users can know what happened and how to fix the query.

How was this patch tested?

Added test case.