Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Nov 22, 2019

What changes were proposed in this pull request?

This patch checks the existence of output file for each task while committing the task, so that it doesn't throw FileNotFoundException while creating SinkFileStatus. The check is newly required for DSv2 implementation of FileStreamSink, as it is changed to create the output file lazily (as an improvement).

JSON writer for example:

override def write(row: InternalRow): Unit = {
val gen = jacksonGenerator.getOrElse {
val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
// create the Generator without separator inserted between 2 records
val newGen = new JacksonGenerator(dataSchema, os, options)
jacksonGenerator = Some(newGen)
newGen
}
gen.write(row)
gen.writeLineEnding()
}

Why are the changes needed?

Without this patch, FileStreamSink throws FileNotFoundException when writing empty partition.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UT.

@HeartSaVioR
Copy link
Contributor Author

cc. @gengliangwang as he led DSv2 implementation for FileDataSource

@SparkQA
Copy link

SparkQA commented Nov 22, 2019

Test build #114293 has finished for PR 26639 at commit fd276b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Merged to master.
Thank you so much, @HeartSaVioR !

@dongjoon-hyun
Copy link
Member

cc @zsxwing and @gatorsmile

@HeartSaVioR
Copy link
Contributor Author

Thanks for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-29999 branch November 25, 2019 00:14
@gatorsmile
Copy link
Member

This only affect DSV2? How about DSV1?

@gengliangwang
Copy link
Member

I don't think it is DSV2 only.
It should be caused by this PR: #23052
Checking file existence in each task commit is kind of expensive. We can have a better fix.

@dongjoon-hyun
Copy link
Member

Please create a follow-up for DSv1 with the similar test coverage~

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Nov 25, 2019

@gengliangwang Thanks for the pointer on root issue. Missed it. Based on the root issue, this issue wouldn't only reside on DSv2.

In fact, the fix takes the easiest and non-intrusive approach since the issue is only found from streaming sink. There's another possible fix if we are not happy with that; let commitTask (optionally) receives actual list of files being written instead of dealing with its own tracking list. That would change the interface of FileCommitProtocol. (FileFormatDataWriter.commit() can pass the list as it tracks with statsTrackers, but SparkHadoopWriter doesn't do it, so the list would need to be optional, and we still need to check the existence if there's no hint.)

Would it make sense? I feel the change is bigger than just a follow-up issue, as we would change the interface.

@gatorsmile That's not only DSv2 issue but also DSv1 issue; I guess we'd be better to move the new UT to FileStreamSinkSuite so that the UT covers both. WDYT? If it works for you, I'll craft a followup PR. Thanks!

@HeartSaVioR
Copy link
Contributor Author

@dongjoon-hyun Sorry I saw your comment just after commenting mine. I'll craft a follow-up PR to move the UT to base class so that it could test both DSv1 and DSv2. Thanks!

@zsxwing
Copy link
Member

zsxwing commented Nov 26, 2019

I think the empty partition optimization seems overkill if every task needs to check file existence for each file. We should fix the root issue and revert this file sink fix. The unit test is great and it's worth to keep it to avoid regressions like this.

@dongjoon-hyun
Copy link
Member

I'm also fine with the reverting #23052 if we keep these unit tests like @zsxwing 's advice.

The unit test is great and it's worth to keep it to avoid regressions like this.

@HeartSaVioR
Copy link
Contributor Author

I'm OK with reverting both #23052 and this, and let me come up with another PR (maybe minor PR or reopening the JIRA issue and change the description) which only contains the UT.

I guess it's still viable to try to fix the root issue, like the proposal I proposed in #26639 (comment) as @zsxwing 's comment applies only when #23052 requires this patch. But for completeness, even we would want to keep #23052, the ideal step would be 1) reverting both -> 2) adding UT -> 3) submitting revised version of #23052 which contains a good approach for streaming sink as well.

@gengliangwang
Copy link
Member

@HeartSaVioR Yes please revert both and keep the UT in this PR(maybe more UTs).

@HeartSaVioR
Copy link
Contributor Author

Could I get some guidance here for revert? Because I have seen both approaches in community for reverting: 1) PR (I can also handle) 2) direct (I can't handle, only possible from committers) and given we would like to revert two issues which I couldn't find some reference. If we prefer reverting via PR, would it be OK to have a PR which contain two revert commits? (title would be a bit longer to contain both)

@gengliangwang
Copy link
Member

Yes, I think a PR containing two revert commits is OK

gengliangwang pushed a commit that referenced this pull request Nov 27, 2019
### What changes were proposed in this pull request?

This reverts commit 31c4fab (#23052) to make sure the partition calling `ManifestFileCommitProtocol.newTaskTempFile` creates actual file.

This also reverts part of commit 0d3d46d (#26639) since the commit fixes the issue raised from 31c4fab and we're reverting back. The reason of partial revert is that we found the UT be worth to keep as it is, preventing regression - given the UT can detect the issue on empty partition -> no actual file. This makes one more change to UT; moved intentionally to test both DSv1 and DSv2.

### Why are the changes needed?

After the changes in SPARK-26081 (commit 31c4fab / #23052), CSV/JSON/TEXT don't create actual file if the partition is empty. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation. Then `fs.getFileStatus` throws `FileNotFoundException` since the file is not created.

SPARK-29999 (commit 0d3d46d / #26639) fixes the problem. But it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Jenkins build will follow.

Closes #26671 from HeartSaVioR/revert-SPARK-26081-SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Dec 6, 2019
### What changes were proposed in this pull request?

This reverts commit 31c4fab (apache#23052) to make sure the partition calling `ManifestFileCommitProtocol.newTaskTempFile` creates actual file.

This also reverts part of commit 0d3d46d (apache#26639) since the commit fixes the issue raised from 31c4fab and we're reverting back. The reason of partial revert is that we found the UT be worth to keep as it is, preventing regression - given the UT can detect the issue on empty partition -> no actual file. This makes one more change to UT; moved intentionally to test both DSv1 and DSv2.

### Why are the changes needed?

After the changes in SPARK-26081 (commit 31c4fab / apache#23052), CSV/JSON/TEXT don't create actual file if the partition is empty. This optimization causes a problem in `ManifestFileCommitProtocol`: the API `newTaskTempFile` is called without actual file creation. Then `fs.getFileStatus` throws `FileNotFoundException` since the file is not created.

SPARK-29999 (commit 0d3d46d / apache#26639) fixes the problem. But it is too costly to check file existence on each task commit. We should simply restore the behavior before SPARK-26081.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Jenkins build will follow.

Closes apache#26671 from HeartSaVioR/revert-SPARK-26081-SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants