Conversation

zsxwing (Member) commented Oct 10, 2016

What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is spark.files.ignoreCorruptFiles. For Spark SQL, it's spark.sql.files.ignoreCorruptFiles.
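As a usage illustration, both flags are ordinary Spark configuration entries. A minimal sketch (the app name and input path below are hypothetical, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Enable both flags so corrupt files are skipped instead of failing the job.
val spark = SparkSession.builder()
  .appName("ignore-corrupt-files-demo")                   // hypothetical app name
  .config("spark.files.ignoreCorruptFiles", "true")       // Spark core (HadoopRDD / NewHadoopRDD)
  .config("spark.sql.files.ignoreCorruptFiles", "true")   // Spark SQL file-based data sources
  .getOrCreate()

// Reads keep going past corrupt files; contents read before the corruption
// point are still returned.
val lines = spark.sparkContext.textFile("/data/possibly-corrupt-logs")  // hypothetical path
println(lines.count())
```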

How was this patch tested?

The added unit tests

zsxwing (Member, Author) commented Oct 10, 2016

/cc @marmbrus

SparkQA commented Oct 11, 2016

Test build #66691 has finished for PR 15422 at commit f810937.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

zsxwing (Member, Author) commented Oct 11, 2016

retest this please

SparkQA commented Oct 11, 2016

Test build #66698 has finished for PR 15422 at commit f810937.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mridulm (Contributor) commented Oct 11, 2016

Why would a corrupt record cause an EOFException to be thrown?
EDIT: In the specific test, the entire file is corrupt. In general, for gzip files, we can actually process a partial file whose suffix is unrecoverable.
In that case, it is not possible to proceed further anyway - so why is handling EOF here incorrect?

srowen (Member) commented Oct 11, 2016

If this happens, it isn't clear that anything that was read is valid. It doesn't seem like something to ignore. Log it, at least. I know people differ on this, but I think continuing with a partial and possibly corrupt read, even with a warning, seems more likely to cause tears.

mridulm (Contributor) commented Oct 11, 2016

@srowen The tuples already returned would have been valid; it is the subsequent block decompression which has failed. For example, in a 1 GB file, a few missing (or corrupt) bytes at the end will cause the last block to be decompressed incorrectly - but all tuples already returned before that point would be fine and valid.

Only the 'current' key/value resulted in the exception.
Note that the value returned from the method is not used when finished = true (what is returned is ignored).
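A hedged sketch of the read pattern being described here (simplified; not the actual HadoopRDD source, though the shape follows its NextIterator-based record iterator):

```scala
import java.io.EOFException
import org.apache.hadoop.mapred.RecordReader

// Minimal sketch: an iterator-style reader that pulls (key, value) records and
// treats an EOFException on the current record as "end of input".
class RecordIter[K, V](reader: RecordReader[K, V], key: K, value: V) {
  var finished = false

  def getNext(): (K, V) = {
    try {
      finished = !reader.next(key, value)   // a false return means no more records
    } catch {
      case _: EOFException =>
        // e.g. a truncated gzip block: the current record cannot be read, but all
        // tuples returned so far remain valid. Once finished == true, the caller
        // ignores the value returned below.
        finished = true
    }
    (key, value)
  }
}
```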

marmbrus (Contributor) commented:
I agree that the data that was already read is probably good. I also think that this is a pretty big behavior change, and there are legitimate cases (e.g. tons of data where it is fine to miss some) where you'd only want a warning. Can we add a flag for failing on unexpected EOF? (Probably set to true in master and false in branch-1.6.)

zsxwing (Member, Author) commented Oct 11, 2016

@mridulm This fix just makes HadoopRDD consistent with NewHadoopRDD and with the current behavior of Spark SQL in 2.0.

For 1.6, that's another story, since Spark SQL uses HadoopRDD directly. However, note that this code runs on the executor side, which means a logged warning usually will not be noticed by the user.

mridulm (Contributor) commented Oct 11, 2016

@zsxwing You are right, NewHadoopRDD is not handling this case.
It would probably be good to add exception handling there as well, for when nextKeyValue throws an exception?

Context: for large jobs/data, it is not unexpected to see some data corruption at times. We don't want to throw out the entire job due to a few bad records.
For example, in MR you even have the ability to set the percentage of bad records you want to tolerate (we don't have that in Spark).

mridulm (Contributor) commented Oct 11, 2016

@marmbrus +1 on logging; that is something which was probably missed here.

srowen (Member) commented Oct 11, 2016

@mridulm for the scenario you're imagining, maybe the data is OK, sure. That doesn't mean it's true in all cases. Yeah, this is really to work around bad input, which you can to some degree handle at the user level. Other parts of Spark don't work this way. I'm neutral on whether this is a good idea at all, but would prefer consistency more than anything.

zsxwing (Member, Author) commented Oct 11, 2016

> For example, in MR you have the ability to even set the percentage of bad records you want to tolerate (we dont have that in spark).

I may be wrong, but in MR I think "bad records" just means the map or reduce function throws an exception. It's not related to any IOException (including EOFException). (https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java#L1490)

mridulm (Contributor) commented Oct 11, 2016

@srowen Since this is happening 'below' the user code (in the Hadoop RDD), is there a way for the user to handle or work around this?
I agree that for a lot of use cases where it is critical to work off the entire data, we should abort rather than process incomplete (and corrupt) data; I am not sure we want to add the ability to customize this, though.

EOFException being thrown instead of IOException is an implementation detail of the codec, if I am not wrong (I don't think the contract specifies this) - @zsxwing can confirm though, I am not very familiar with that.

Setting finished = true should probably be done not just for EOFException, but also for IOException.
I am fine with that as well, though this is a behavior change.

zsxwing changed the title from "[SPARK-17850][Core]HadoopRDD should not catch EOFException" to "[SPARK-17850][Core]Add a flag to ignore corrupt files" on Oct 12, 2016
SparkQA commented Oct 12, 2016

Test build #66776 has finished for PR 15422 at commit ef88a64.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

finished = true
} else {
throw e
}

Review comment (Contributor):

nit: case e: IOException if ignoreCorruptFiles =>
would have been more concise.
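For illustration, the two shapes side by side (a self-contained sketch; ignoreCorruptFiles, finished, and readNextRecord() are stand-ins, not the patch's actual members):

```scala
import java.io.IOException

object CatchStyles {
  var finished = false
  val ignoreCorruptFiles = true
  def readNextRecord(): Unit = throw new IOException("simulated corrupt block")

  def main(args: Array[String]): Unit = {
    // Shape used in the patch: catch unconditionally, then branch on the flag.
    try readNextRecord() catch {
      case e: IOException =>
        if (ignoreCorruptFiles) finished = true else throw e
    }

    // Suggested shape: a pattern guard keeps the case clause concise; when the
    // guard is false, the exception simply propagates to the caller.
    try readNextRecord() catch {
      case e: IOException if ignoreCorruptFiles =>
        finished = true
    }

    println(s"finished = $finished")
  }
}
```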

"encountering corrupt files and contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)


Review comment (Contributor):

Curious why we are duplicating the parameter in the sql namespace. Won't spark.files.ignoreCorruptFiles do?


Review comment (Member, Author):

A SQL conf can appear in the output of the following command:

sql("set -v").filter('key contains "files").show(truncate = false)


Review comment (Contributor):

Interesting, thanks for clarifying!

} else {
throw e
}
}

Review comment (Contributor):

Thanks for changing this too!

.doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
"encountering corrupt files and contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)

Review comment (Contributor):

So either way we will have a behavioral change - in either NewHadoopRDD or HadoopRDD.
IMO that is fine, given that we are standardizing the behavior and this was a corner case anyway.

Setting the default to false makes sense.
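For context, the excerpt above is the tail of a config declaration. A hedged sketch of the full shape using Spark's internal ConfigBuilder API (this lives inside Spark's own source tree - ConfigBuilder is private[spark] - so names and placement here are illustrative, not quoted from the patch):

```scala
// Sketch only: Spark core declares its config entries in the
// org.apache.spark.internal.config package object, roughly like this.
import org.apache.spark.internal.config.ConfigBuilder

private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
  .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
    "encountering corrupt files and contents that have been read will still be returned.")
  .booleanConf
  .createWithDefault(false)
```

Keeping createWithDefault(false) preserves the existing fail-fast behavior unless a user opts in, which is the point being agreed on above.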

mridulm (Contributor) commented Oct 12, 2016

Merged - had an issue with pip (new laptop, sigh), so the JIRA and PR did not get closed.

asfgit closed this in 47776e7 on Oct 12, 2016
zsxwing (Member, Author) commented Oct 12, 2016

I will work on a patch for 1.6.

SparkQA commented Oct 12, 2016

Test build #66839 has finished for PR 15422 at commit ceecc19.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

zsxwing deleted the SPARK-17850 branch on October 12, 2016 at 22:26
zsxwing (Member, Author) commented Oct 12, 2016

PR for 1.6: #15454

rxin (Contributor) commented Dec 7, 2016

@zsxwing shouldn't we at least log the exception?

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`. For Spark SQL, it's `spark.sql.files.ignoreCorruptFiles`.

## How was this patch tested?

The added unit tests

Author: Shixiong Zhu <[email protected]>

Closes apache#15422 from zsxwing/SPARK-17850.
