[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources #23052
Conversation
|
Test build #98887 has finished for PR 23052 at commit
|
|
@MaxGekk, actually this is a rather important behaviour change. It basically means we're unable to read the empty files back. Similar changes were proposed in Parquet a few years ago (by me) and reverted. We had better investigate and match the behaviours across datasources first. IIRC, ORC does not create empty files (unless that has changed since I last checked), but Parquet does. |
|
|
private val gen = new UnivocityGenerator(dataSchema, writer, params)
override def write(row: InternalRow): Unit = {
  val gen = univocityGenerator.getOrElse {
Also, one thing we should not forget is that CSV could have headers even if there are no records.
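For context, a minimal usage sketch of the header case being discussed, assuming a local `SparkSession` and an illustrative output path (this is not code from this PR):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[2]").appName("csv-header-sketch").getOrCreate()

// A made-up two-column schema; the dataframe itself has zero rows.
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

// With header=true a user may still expect a header-only file, but because the
// header is emitted together with the first data row, an empty partition that
// produces no file produces no header either.
emptyDf.write.option("header", "true").csv("/tmp/csv-header-sketch")
```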
I do think it is fine to write only headers if a user wants to have them. Filtering the header out at this level could be slightly difficult.
What was the main reason to revert it? If possible, could you give me a link to your PR? |
|
Which should be ... this #12855 |
|
One attempt to add some tests for reading/writing empty dataframes was here: #13253, FYI. |
|
Another related attempt: #13252 |
|
I think now is a good time to match the behaviours. |
|
Test build #99107 has finished for PR 23052 at commit
|
|
I have read the tickets you pointed out but haven't found what could potentially block the changes. One of the corner cases is saving an empty dataframe. In this case, no files would be written, but this is OK for text-based datasources because, in any case, we cannot fully restore the schema from empty files (unlike Parquet files, where we can). So, a schema must be provided by the user. |
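To make the last point concrete, a small sketch of reading such a location back with a user-provided schema (the path, the schema, and the existing `spark` session are assumptions for illustration):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// With no files written for an empty dataframe, schema inference has nothing to
// work with, so the location can only be read back with an explicit schema.
val userSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

val readBack = spark.read
  .schema(userSchema)   // user-provided schema instead of inference
  .json("/tmp/empty-json-output")

assert(readBack.isEmpty) // round-trips back to an empty dataframe
```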
|
@MaxGekk I didn't mean to block this PR. Since we're going ahead for 3.0, it should be good to match and fix the behaviours across data sources. For instance, CSV should still be able to read the header. Shall we clarify each data source's behaviour? |
|
Also, Parquet does not always write empty files. It does not write them when dataframes are created from an emptyRDD (the case pointed out in the PR link I gave). We should match this behaviour as well. |
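For reference, a sketch of the two "empty dataframe" cases the comment distinguishes (paths are illustrative, and the behavioural difference is as described above, not something this sketch asserts):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val schema = StructType(Seq(StructField("id", LongType)))

// Case 1: an empty dataframe built directly on an emptyRDD.
val fromEmptyRdd = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
fromEmptyRdd.write.mode("overwrite").parquet("/tmp/parquet-from-empty-rdd")

// Case 2: an empty dataframe obtained by filtering every row out.
val filteredToEmpty = spark.range(10).where("id < 0")
filteredToEmpty.write.mode("overwrite").parquet("/tmp/parquet-filtered-to-empty")
```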
|
cc @cloud-fan as well |
|
First of all, sometimes we do need to write "empty" files, so that we can infer the schema of a Parquet directory. An empty Parquet file is not really empty, as it has a header/footer. #20525 guarantees we always write out at least one empty file. One important thing is, when we write out an empty dataframe to files and read it back, it should still be an empty dataframe. I'd suggest we skip reading empty files in text-based data sources, and later on send a follow-up PR to not write empty text files, as a perf improvement. |
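The "skip reading empty files" idea, sketched as a standalone helper (this is not the actual Spark code path; `skipEmptyFiles` is a hypothetical name):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Drop zero-length files up front so text-based sources never open files
// that cannot contain any records anyway.
def skipEmptyFiles(dir: Path, conf: Configuration): Seq[FileStatus] = {
  val fs: FileSystem = dir.getFileSystem(conf)
  fs.listStatus(dir).toSeq.filter(_.getLen > 0)
}
```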
|
There are two more things to deal with. The #23052 (comment) comment will still be valid - at least it should be double checked, because dataframes originating from an emptyRDD do not write anything at all. The other thing, among the text-based datasources, is CSV, because it can write out headers. The thing is, the header is currently written only when the first row is written. I think the header should be written even for an empty dataframe because it's a kind of schema. This is what PR #13252 targeted before; I closed it because there was no interest, but we should fix it. (Note that we can only read the header back with header=true and inferSchema=false.) |
|
Test build #99221 has finished for PR 23052 at commit
|
params: CSVOptions) extends OutputWriter with Logging {

  private val charset = Charset.forName(params.charset)
  private var univocityGenerator: Option[UnivocityGenerator] = None
Do we have a race condition below then where multiple generators can be created?
We have not observed any race conditions so far. Instances of UnivocityGenerator are created per task, as are OutputStreamWriters. They share instances of the schema and CSVOptions, but we do not modify them while writing. Inside each UnivocityGenerator, we create an instance of CsvWriter, but I am almost absolutely sure they do not share anything internally.
I don't mean that it would cause an error, but that it could create many generators and writers that aren't closed. It may not be obvious that it's happening. Unless we know writes will only happen in one thread, what about breaking out and synchronizing the get/create part of this method?
... but that it could create many generators and writers that aren't closed.
Writers/generators are created inside of tasks:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
Lines 228 to 256 in ab1650d
val dataWriter =
  if (sparkPartitionId != 0 && !iterator.hasNext) {
    // In case of empty job, leave first partition to save meta for file format like parquet.
    new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
  } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
    new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
  } else {
    new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
  }
try {
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    // Execute the task to write rows out and commit the task.
    while (iterator.hasNext) {
      dataWriter.write(iterator.next())
    }
    dataWriter.commit()
  })(catchBlock = {
    // If there is an error, abort the task
    dataWriter.abort()
    logError(s"Job $jobId aborted.")
  })
} catch {
  case e: FetchFailedException =>
    throw e
  case t: Throwable =>
    throw new SparkException("Task failed while writing rows.", t)
}
dataWriter.commit() and dataWriter.abort() close the writers/generators. So, the number of unclosed generators is less than or equal to the size of the task thread pool on executors at any moment.
Unless we know writes will only happen in one thread ...
According to comments below, this is our assumption:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
Lines 33 to 37 in e816776
/**
 * Abstract class for writing out data in a single Spark task.
 * Exceptions thrown by the implementation of this trait will automatically trigger task aborts.
 */
abstract class FileFormatDataWriter(
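To illustrate the pattern under discussion in isolation, here is a simplified, self-contained sketch of lazily creating the underlying writer on the first record (the class and method names are invented for illustration, not the PR's actual classes):

```scala
import java.io.{BufferedWriter, FileWriter, Writer}

// The writer is created only when the first record arrives, so a task that never
// writes a record never creates a file. This is safe without synchronization only
// under the FileFormatDataWriter contract that a task writes from a single thread.
class LazyLineWriter(path: String) {
  private var underlying: Option[Writer] = None

  private def getOrCreate(): Writer = underlying.getOrElse {
    val w = new BufferedWriter(new FileWriter(path))
    underlying = Some(w)
    w
  }

  def write(line: String): Unit = {
    val w = getOrCreate()
    w.write(line)
    w.write("\n")
  }

  // Closing is a no-op when nothing was written, hence no empty file on disk.
  def close(): Unit = underlying.foreach(_.close())
}
```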
|
retest this please |
|
Test build #99354 has finished for PR 23052 at commit
|
|
retest this please |
|
Test build #99361 has finished for PR 23052 at commit
|
|
seems like a real failure |
I am looking at it. It seems the test is not deterministic. |
|
Actually it needs changes similar to those in #23130 |
|
Test build #99372 has finished for PR 23052 at commit
|
|
jenkins, retest this, please |
|
Test build #99380 has finished for PR 23052 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
|
Test build #99407 has finished for PR 23052 at commit
|
|
Merged to master |
…atasources

## What changes were proposed in this pull request?

In the PR, I propose to postpone creation of `OutputStream`/`Univocity`/`JacksonGenerator` till the first row should be written. This prevents creation of empty files for empty partitions. So, there is no need to open and read such files back while loading data from the location.

## How was this patch tested?

Added tests for the Text, JSON and CSV datasources where an empty dataset is written but should not produce any files.

Closes apache#23052 from MaxGekk/text-empty-files.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request?

This reverts commit 31c4fab (#23052) to make sure the partition calling `ManifestFileCommitProtocol.newTaskTempFile` creates an actual file. This also reverts part of commit 0d3d46d (#26639), since that commit fixes the issue raised by 31c4fab and we're reverting it. The reason for the partial revert is that we found the UT worth keeping as it is, preventing regression, given that the UT can detect the issue of an empty partition producing no actual file. This makes one more change to the UT: it is intentionally moved to test both DSv1 and DSv2.

### Why are the changes needed?

After the changes in SPARK-26081 (commit 31c4fab / #23052), CSV/JSON/TEXT don't create an actual file if the partition is empty. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation. Then `fs.getFileStatus` throws `FileNotFoundException` since the file is not created. SPARK-29999 (commit 0d3d46d / #26639) fixes the problem, but it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Jenkins build will follow.

Closes #26671 from HeartSaVioR/revert-SPARK-26081-SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
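A minimal sketch of the failure mode described above: the commit protocol looks up a temp-file path that the task never actually created (the path is illustrative, and this is not the `ManifestFileCommitProtocol` code itself):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// The empty partition reported this temp path but never wrote the file,
// so the commit-time status lookup fails.
val tempFile = new Path("/tmp/_temporary/part-00003-attempt-0.txt")
val fs = tempFile.getFileSystem(new Configuration())

try {
  fs.getFileStatus(tempFile)
} catch {
  case e: FileNotFoundException =>
    println(s"task commit would fail: ${e.getMessage}")
}
```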
What changes were proposed in this pull request?

In the PR, I propose to postpone creation of OutputStream/Univocity/JacksonGenerator till the first row should be written. This prevents creation of empty files for empty partitions. So, there is no need to open and read such files back while loading data from the location.

How was this patch tested?

Added tests for the Text, JSON and CSV datasources where an empty dataset is written but should not produce any files.
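Roughly the shape of such a test, assuming an existing local `SparkSession` named `spark` (an illustrative sketch, not the test code added by the PR):

```scala
import java.io.File
import java.nio.file.Files

import spark.implicits._

// Write an empty Dataset[String] as text and check that no part files appear.
val dir: File = Files.createTempDirectory("empty-text-").toFile
spark.emptyDataset[String].write.mode("overwrite").text(dir.getAbsolutePath)

val partFiles = Option(dir.listFiles()).getOrElse(Array.empty[File])
  .map(_.getName)
  .filter(_.startsWith("part-"))

assert(partFiles.isEmpty, "no part files expected when writing an empty dataset")
```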