[SPARK-23271][SQL] Parquet output contains only _SUCCESS file after writing an empty dataframe #20525
Conversation
Update the title to
It's not legal to write an empty struct in parquet; this is explained by Herman in SPARK-20593. Previously we didn't set up a write task for this case, whereas now with this fix we do.
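As a concrete illustration of that constraint (a sketch only; the output path is made up), a dataframe whose schema has zero columns trips parquet's check as soon as a write task is actually set up, which is exactly the executor-side stack trace shown later in this thread:
// Illustrative: a zero-column schema cannot be written as parquet.
spark.emptyDataFrame.write.parquet("/tmp/zero-column-example")
// org.apache.parquet.schema.InvalidSchemaException:
//   Cannot write a schema with an empty group: message spark_schema { }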
Nit: extra space before df1
@gatorsmile Thank you. Fixed.
Test build #87148 has finished for PR 20525 at commit
Looks like a shuffle will happen here if the number of partitions is zero. If so, maybe another solution is possible?
Yea, the shuffle can be avoided. We can just launch a write task for an empty RDD, instead of calling rdd.repartition(1).
@cloud-fan @pashazm I was thinking, writing empty datasets would not be a regular event, right? Should we even be optimizing this path? Secondly, is shuffling an empty dataset that expensive?
@cloud-fan, actually I had tried to launch a write task for an empty RDD, but was hitting a NullPointerException from the scheduler. Looks like things are set up to only work off of partitions of an RDD. Could we try to create this empty metadata file from the driver in this case? If we go that route, then we may have to refactor the write task code. Seems like a lot for this little corner case, what do you think?
You could try coalesce(1); that should not shuffle.
@hvanhovell Thanks. I have a question. Can we go from zero partitions to one partition with coalesce()? In the code we seem to be doing a min(prevPartition, requestedPartition) to set the target number of partitions (code).
@hvanhovell Just tried. We stay at numPartitions = 0 after coalesce(). So it does not fix the problem.
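A quick illustration of why coalesce(1) does not help here (a sketch, assuming a local SparkSession named spark): coalesce only lowers the partition count toward the requested value, so an RDD that already has zero partitions stays at zero.
val empty = spark.sparkContext.emptyRDD[Int]
empty.getNumPartitions                // 0
empty.coalesce(1).getNumPartitions    // still 0, per the min(...) noted above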
One simple way to fix it: create an empty 1-partition RDD and use it here.
Yea, you can have
sparkSession.sparkContext.parallelize(Array.empty[InternalRow])
@cloud-fan @jiangxb1987 Thanks a LOT. This works perfectly.
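Putting the suggestions from this thread together, a self-contained sketch of the idea (names are illustrative and String stands in for InternalRow; this is not the exact FileFormatWriter patch):
import org.apache.spark.sql.SparkSession

object NonEmptyPartitionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SPARK-23271 sketch").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.emptyRDD[String] // stands in for the query's output RDD
    // If there are no partitions, substitute an empty single-partition RDD so that
    // exactly one (no-op) write task still runs and emits the metadata-only file.
    val rddWithNonEmptyPartitions =
      if (rdd.partitions.isEmpty) sc.parallelize(Array.empty[String], 1) else rdd
    println(rddWithNonEmptyPartitions.getNumPartitions) // 1
    spark.stop()
  }
}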
Test build #87149 has finished for PR 20525 at commit
If this is specific to parquet, can we have this in ParquetFileFormatSuite instead?
@dongjoon-hyun Thank you. Let me check if we have a similar issue for ORC. If not, I will move it to ParquetFileFormatSuite.
Thank you @dilipbiswal.
I checked with ORC, too. Your patch works for ORC as well; I mean it keeps the schema although it creates a file.
In this suite, can you extend the test case for ORC too?
Thank you very much @dongjoon-hyun. You are super quick :-). Yes, I will add the test case for ORC.
Ur, FileBasedDataSourceSuite may be more suitable. It has a similar test case. You can add your test case there in a similar manner.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala#L59-L73
@dongjoon-hyun Sure. Will take a look. Thanks!!
Test build #87182 has finished for PR 20525 at commit
sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
An easier way to create an empty dataframe: spark.emptyDataFrame.select(lit(1).as("i"))
unnecessary change.
will remove
please remove it
how about val rddWithNonEmptyPartitions ...
Sure.
Test build #87193 has finished for PR 20525 at commit
Test build #87195 has finished for PR 20525 at commit
Test build #87200 has finished for PR 20525 at commit
retest this please
BTW, this is a behavior change. We need to document it in the migration guide.
Test build #87204 has finished for PR 20525 at commit
retest this please
Test build #87211 has finished for PR 20525 at commit
@gatorsmile Thanks. I will create a doc PR and address it.
I think it's better to have the doc change in the same PR, so it's clearer which patch caused the behavior change.
@cloud-fan Actually I had already created the doc PR in the morning using the same JIRA number. Wenchen, if we want to have both changes in the same commit, will we be able to do it when we merge the patch? If not, please let me know and I will close that PR and move the change over to this branch.
No, we can't merge 2 PRs together. Please pick one of your PRs and put all the changes there, thanks!
@cloud-fan @gatorsmile Done.
docs/sql-programming-guide.md
Since Spark 2.3, writing an empty dataframe to a directory launches at least one write task, even physically the dataframe has no partition. This introduces a small behavior change that for self-described file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
even -> even if ?
self-described -> self-describing ?
@cloud-fan Nicely written. Thanks. Let me know if you are ok with the above two changes?
yea the above 2 changes are good!
"launches at least one write task"
Actually isn't it exactly one write task? I am okay with what you have. Just wanted to check to make sure.
How does it fail? If it's a runtime error we should fail earlier during analysis. This is worth a new JIRA.
@cloud-fan I forgot :-) I will double check and get back.
@cloud-fan It fails in the executor like this -
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message spark_schema {
}
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:225)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:376)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:387)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:278)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:281)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:206)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.
Let me study the code to see how we can fail earlier.
Let's open a JIRA. We can fix it in another PR.
@cloud-fan OK Wenchen. Created SPARK-23372 - FYI
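Purely as an illustration of the "fail earlier" idea tracked by SPARK-23372 (not the actual change made for that JIRA), a pre-write validation could look roughly like this:
import org.apache.spark.sql.types.StructType

// Hypothetical helper: reject an empty schema before any write task is launched,
// instead of failing at runtime inside the executor as shown in the stack trace above.
def assertNonEmptyWriteSchema(schema: StructType, format: String): Unit = {
  require(schema.nonEmpty,
    s"Cannot write a dataframe with an empty schema using the $format data source.")
}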
nit: rddWithNonEmptyPartitions.partitions.indices
Made the change. Learnt a new trick today :-)
LGTM
LGTM
Test build #88072 has finished for PR 20525 at commit
Test build #88078 has finished for PR 20525 at commit
Test build #88099 has finished for PR 20525 at commit
thanks, merging to master!
@cloud-fan @jiangxb1987 Thank you very much!!
late LGTM too. |
What changes were proposed in this pull request?
Below are the two cases.
Case 1
When we write an empty data frame that still has at least one partition as parquet, we create a parquet file containing just the schema of the data frame.
Case 2
For the 2nd case, since the number of partitions is 0, we don't call the write task (the task has the logic to create the empty, metadata-only parquet file), so the output directory contains only the _SUCCESS file.
The fix is to create a dummy single-partition RDD and set up the write task based on it to ensure the metadata-only file is written.
How was this patch tested?
A new test is added to DataFrameReaderWriterSuite.
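For reference, a small self-contained sketch of the behavior this patch establishes (not the actual test code; the path handling and names are illustrative, and it assumes Spark 2.3+ on the classpath):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object EmptyDataFrameWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SPARK-23271 example").getOrCreate()
    val out = java.nio.file.Files.createTempDirectory("spark-23271").resolve("data").toString

    // An empty dataframe with a schema (i: int) but zero rows and zero partitions.
    val df = spark.emptyDataFrame.select(lit(1).as("i"))
    df.write.parquet(out)

    // With this fix, the directory contains a metadata-only parquet file in
    // addition to _SUCCESS, so schema inference works when reading it back.
    val readBack = spark.read.parquet(out)
    readBack.printSchema() // i: integer
    assert(readBack.count() == 0)
    spark.stop()
  }
}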