[SPARK-25243][SQL] Use FailureSafeParser in from_json #22237
Conversation
Test build #95266 has finished for PR 22237 at commit
jenkins, retest this, please
Test build #95267 has finished for PR 22237 at commit
I think we should keep using the previous default mode, FailFastMode? Now the default mode becomes PermissiveMode.
The previous setting of FailFastMode didn't affect the behavior because the mode option wasn't handled at all.
It is not handled by JacksonParser, and the behavior here is somewhat similar to PermissiveMode, as @HyukjinKwon pointed out at https://github.com/apache/spark/pull/22237/files#r212850156, but not exactly the same.
It seems the PermissiveMode in FailureSafeParser now gives different results for corrupted records. I noticed that some existing tests may have to change because of that.
How does it work for the DROPMALFORMED mode? This doesn't actually drop the record like the JSON datasource does.
The DROPMALFORMED mode returns null for malformed JSON lines, so users can filter them out later (see the sketch below). @HyukjinKwon Do you know how to drop rows in UnaryExpressions?
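A rough sketch of what "filter them out later" could look like (the DataFrame `df` and the column name `json` are hypothetical):

```scala
// Sketch only: parse a JSON string column and drop rows that could not be parsed,
// assuming malformed input comes back as a null struct.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)

val parsed = df.select(from_json(col("json"), schema).as("parsed"))
val cleaned = parsed.filter(col("parsed").isNotNull)
```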
Nope, the only possibility I raised was to make it a generator expression. That's why I haven't proposed a parse mode so far.
JsonToStructs has resembled PERMISSIVE mode from the start, although their behaviours are slightly different. It will still differ from both the PERMISSIVE and FAILFAST modes, so these are actual behaviour changes if we just use PERMISSIVE mode here by default (as @viirya pointed out).
The behavior of JsonToStructs is actually pretty close to PERMISSIVE. I only have to make a few small changes in the tests that check processing of malformed inputs.
Test build #95269 has finished for PR 22237 at commit
How about this?
```scala
@transient lazy val parser = {
  val parsedOptions = new JSONOptions(options, timeZoneId.get)
  val rawParser = new JacksonParser(nullableSchema, parsedOptions)
  val createParser = CreateJacksonParser.utf8String _
  new FailureSafeParser[UTF8String](
    input => rawParser.parse(input, createParser, identity[UTF8String]),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord,
    parsedOptions.multiLine)
}
```
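For context (as I understand FailureSafeParser from the JSON/CSV datasources): in PERMISSIVE mode it turns a record it couldn't parse into a row whose unparsed fields are null, copying the raw text into columnNameOfCorruptRecord when the schema has such a field; in FAILFAST mode it fails the query instead; and in DROPMALFORMED mode it skips the record.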
Can we move actualSchema and resultRow inside the if (corruptFieldIndex.isDefined) { block?
I think one thing we could do for now is to support only the FAILFAST and PERMISSIVE modes and throw an exception otherwise, match the current behaviour to PERMISSIVE mode, and explain that in the migration guide.
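From the user side that would look roughly like this (a sketch; the data, column names and exact errors are just for illustration, and it assumes a SparkSession `spark` in scope):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a": 1}""", """{"a" 2}""").toDF("json")  // second record is malformed

// PERMISSIVE (the default): malformed records come back with null fields instead of failing.
df.select(from_json(col("json"), schema, Map("mode" -> "PERMISSIVE"))).show()

// FAILFAST: the query fails once a malformed record is hit.
df.select(from_json(col("json"), schema, Map("mode" -> "FAILFAST"))).show()

// DROPMALFORMED: not supported by from_json under this approach; expected to raise an error.
```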
@HyukjinKwon Should I target Spark 3.0 or 2.4?
If we can finish it before the code freeze, it will be 2.4; otherwise it is 3.0.
Test build #95378 has finished for PR 22237 at commit
Test build #95436 has finished for PR 22237 at commit
I think we should move this verification into the constructor.
Also, can we use AnalysisException instead of require?
I didn't put the require into the constructor body directly because of timeZoneId. If I move the check up, I need to move val parsedOptions = new JSONOptions(options, timeZoneId.get) too (lazy or not). The check would force evaluation of timeZoneId.get, which would raise an exception. I will check this today or tomorrow.
ok, thanks!
schema -> dataType?
Test build #95464 has finished for PR 22237 at commit
Can you fix the code to throw an analysis exception in the analysis phase instead of the execution phase (when .collect() is called)?
I replaced it with AnalysisException, but I think that was the wrong decision. Throwing an AnalysisException at run time looks ugly:
Caused by: org.apache.spark.sql.AnalysisException: from_json() doesn't support the DROPMALFORMED mode. Acceptable modes are PERMISSIVE and FAILFAST.;
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser$lzycompute(jsonExpressions.scala:568)
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser(jsonExpressions.scala:564)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I am going to replace it with something else or revert back to IllegalArgumentException.
docs/sql-programming-guide.md
nit: from_json -> `from_json`.
I agree with the current approach but wanna make sure whether we want this in 2.4.0 or 3.0.0, since there's no way to keep the previous behaviour and the code freeze is super close. I actually prefer to go ahead in 3.0.0. @gatorsmile and @cloud-fan, WDYT? I think this will likely break existing user apps.
Test build #95532 has finished for PR 22237 at commit
Test build #95541 has finished for PR 22237 at commit
Test build #95760 has finished for PR 22237 at commit
@HyukjinKwon I re-targeted the changes for Spark 3.0. Please take a look one more time.
retest this please
Will take a look soon.
@HyukjinKwon Thank you. Waiting for your feedback.
Test build #95867 has finished for PR 22237 at commit
Test build #95896 has finished for PR 22237 at commit
@HyukjinKwon Please take a look at it again.
…has more than 1 element for struct schema
d91f34f to b2988c7
https://github.com/apache/spark/pull/22237/files#r223707899 makes sense to me. Addressed. LGTM from my side as well.
LGTM, pending jenkins.
Test build #97958 has finished for PR 22237 at commit
retest this please
Test build #97966 has finished for PR 22237 at commit
thanks, merging to master!
Thanks all!!
@HyukjinKwon Thank you for the follow-up work on the PR. @cloud-fan @viirya @maropu Thanks for your reviews.
```r
schema2 <- structType(structField("date", "date"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
expect_equal(s[[1]][[1]]$date, NA)
```
What is the reason we made this change?
Do you mean this particular line or in general?
This line was changed because in the PERMISSIVE mode we usually return a Row with null fields for what we weren't able to parse, instead of just null for the whole row.
In general, the change is to support the PERMISSIVE and FAILFAST modes as in the JSON datasource. Before the changes, from_json didn't support any modes, nor the columnNameOfCorruptRecord option in particular.
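Roughly, for the test above the difference looks like this (a Scala sketch; the input value is made up and it assumes a SparkSession `spark` in scope):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DateType, StructType}
import spark.implicits._

val schema = new StructType().add("date", DateType)
val df = Seq("""{"date": "not-a-date"}""").toDF("col")

// Before the change: the whole parsed struct came back as null for the malformed value.
// After the change (PERMISSIVE by default): a Row is returned with the unparsable
// field set to null, e.g. [null] rather than null.
df.select(from_json(col("col"), schema).as("parsed")).show()
```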
Closes apache#22237 from MaxGekk/from_json-failuresafe.
Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: hyukjinkwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
In the PR, I propose to switch `from_json` to `FailureSafeParser`, to make the function compatible with `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.
How was this patch tested?
It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite`, as well as by new tests for `from_json` which check different modes.