
Conversation

MaxGekk (Member) commented Jan 16, 2020

What changes were proposed in this pull request?

In this PR, I propose to fix the bug reported in SPARK-30530: the CSV datasource returns invalid records when parsedSchema is shorter than the number of tokens returned by the UniVocity parser. In that case, UnivocityParser.convert() always throws BadRecordException, regardless of the result of applying filters.

For the described case, I propose to save the exception in badRecordException and continue value conversion according to parsedSchema. If a bad record doesn't pass the filters, convert() returns an empty Seq; otherwise it throws badRecordException.
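
For illustration, here is a minimal, standalone sketch of that control flow. It is not the actual Spark code: convertSketch and passesFilters are made-up names, and the real convert() works on the parser's internal row and converter machinery.

    import scala.util.control.NonFatal

    // Hypothetical sketch: convert tokens into a row, remembering the first
    // conversion failure instead of throwing it immediately.
    def convertSketch(
        tokens: Array[String],
        valueConverters: Array[String => Any],
        passesFilters: Array[Any] => Boolean): Seq[Array[Any]] = {
      val row = new Array[Any](valueConverters.length)
      var badRecordException: Option[Throwable] = None
      var i = 0
      while (i < valueConverters.length) {
        try {
          // tokens(i) itself may fail when there are fewer tokens than columns
          row(i) = valueConverters(i).apply(tokens(i))
        } catch {
          case NonFatal(e) =>
            badRecordException = badRecordException.orElse(Some(e))
            row(i) = null
        }
        i += 1
      }
      if (!passesFilters(row)) {
        Seq.empty // the record is filtered out anyway, bad or not
      } else {
        // the record would be visible to the query, so surface the saved error
        badRecordException.foreach(e => throw e)
        Seq(row)
      }
    }

The key point is the same as in the PR description: a conversion failure no longer aborts the row immediately, so a bad record that the pushed-down filters reject is simply dropped, while one that would be returned still fails as before.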

Why are the changes needed?

It fixes the bug reported in the JIRA ticket.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new test from the JIRA ticket.

SparkQA commented Jan 16, 2020

Test build #116867 has finished for PR 27239 at commit fd48094.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    while (i < requiredSchema.length) {
      try {
        if (!skipRow) {
          row(i) = valueConverters(i).apply(getToken(tokens, i))
Contributor

If the first column is corrupted and the predicate is "first_col is null", what will happen?

MaxGekk (Member, Author)

There are 3 cases:

  1. The Univocity parser is not able to parse its input, for example, because it encountered a wrong Unicode symbol. In that case, it returns null in tokens, and BadRecordException will be raised here:
    throw BadRecordException(
    () => getCurrentInput,
    () => None,
    new RuntimeException("Malformed CSV record"))
  2. The Univocity parser returns null in the first token. In this case, we will try to convert null to the desired type according to requiredSchema. Most likely, the conversion raises an exception, which will be converted to BadRecordException here:
    badRecordException = badRecordException.orElse(Some(e))
    2.1 If the conversion doesn't fail, the "is null" filter will be applied to the value, and the row can be passed to the upper layer.
  3. The Univocity parser returns a valid string at index 0 in tokens, but the conversion fails at
    row(i) = valueConverters(i).apply(getToken(tokens, i))
    with some exception. This is similar to case 2: the exception will be handled and transformed into BadRecordException.

The new implementation with filter pushdown does not change the behavior in those cases.
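
For context, here is a hypothetical way to hit the scenario from the question (spark is assumed to be an existing SparkSession; the file path, contents, and column names are made up for illustration and this is not the test from the JIRA ticket):

    // /tmp/example.csv might contain, for example:
    //   not-a-number,2    <- first column cannot be converted to INT (case 3 above)
    //   ,5                <- first column is empty/null (case 2 above)
    val df = spark.read
      .schema("first_col INT, second_col INT")
      .csv("/tmp/example.csv")
    df.where("first_col is null").show()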

cloud-fan (Contributor)

thanks, merging to master!

cloud-fan closed this in d4c6ec6 on Jan 19, 2020
    // However, we still have chance to parse some of the tokens, by adding extra null tokens in
    // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
    -  val checkedTokens = if (parsedSchema.length > tokens.length) {
    +  checkedTokens = if (parsedSchema.length > tokens.length) {
Member

Do we need this checkedTokens now?

MaxGekk (Member, Author)

It seems not. The if can be replaced by:

    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
      // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
      Some(new RuntimeException("Malformed CSV record"))
    } else None

Let me do that in a follow-up PR.

Member

Oops, I already did that in #27287. Let me address this comment there.

    var skipRow = false
    while (i < requiredSchema.length) {
      try {
        if (!skipRow) {
Member

nit:

if (skipRow) {
  row.setNullAt(i)
} else {
  row(i) = valueConverters(i).apply(getToken(tokens, i))
  if (csvFilters.skipRow(row, i)) {
    skipRow = true
  }
}

          row.setNullAt(i)
        }
      } catch {
        case NonFatal(e) =>
Member

Previously we relied on nulls already existing in the array. Now we rely on java.lang.ArrayIndexOutOfBoundsException. I don't particularly like this approach, but I'm good with it as it does simplify the code.
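
For illustration, a tiny standalone snippet (not Spark code) of the behavior being referred to: reading past the end of the tokens array throws java.lang.ArrayIndexOutOfBoundsException, which NonFatal matches, so the catch block above ends up recording it in badRecordException.

    import scala.util.control.NonFatal

    val tokens = Array("a", "b")
    try {
      // fewer tokens than required columns: this access throws before println runs
      println(tokens(5))
    } catch {
      case NonFatal(e) =>
        println(e.getClass.getName) // java.lang.ArrayIndexOutOfBoundsException
    }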

MaxGekk deleted the spark-30530-csv-filter-is-null branch on June 5, 2020.