Skip to content

Commit ad29290

Browse files
Hisoka-X authored and MaxGekk committed
[SPARK-44079][SQL][3.4] Fix ArrayIndexOutOfBoundsException when parse array as struct using PERMISSIVE mode with corrupt record
### What changes were proposed in this pull request?
Cherry-pick #41662, fixing the parse-array-as-struct bug on branch 3.4.

### Why are the changes needed?
Fix the bug that occurs when parsing an array as a struct using PERMISSIVE mode with a corrupt record.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a new test.

Closes #41784 from Hisoka-X/SPARK-44079_3.4_cherry_pick.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
1 parent 0304382 commit ad29290

File tree

5 files changed

+51
-11
lines changed

5 files changed

+51
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ class UnivocityParser(
318318
if (tokens == null) {
319319
throw BadRecordException(
320320
() => getCurrentInput,
321-
() => None,
321+
() => Array.empty,
322322
QueryExecutionErrors.malformedCSVRecordError(""))
323323
}
324324

@@ -362,7 +362,7 @@ class UnivocityParser(
362362
} else {
363363
if (badRecordException.isDefined) {
364364
throw BadRecordException(
365-
() => currentInput, () => requiredRow.headOption, badRecordException.get)
365+
() => currentInput, () => Array(requiredRow.get), badRecordException.get)
366366
} else {
367367
requiredRow
368368
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class JacksonParser(
135135
// List([str_a_2,null], [null,str_b_3])
136136
//
137137
case START_ARRAY if allowArrayAsStructs =>
138-
val array = convertArray(parser, elementConverter, isRoot = true)
138+
val array = convertArray(parser, elementConverter, isRoot = true, arrayAsStructs = true)
139139
// Here, as we support reading top level JSON arrays and take every element
140140
// in such an array as a row, this case is possible.
141141
if (array.numElements() == 0) {
@@ -517,7 +517,8 @@ class JacksonParser(
517517
private def convertArray(
518518
parser: JsonParser,
519519
fieldConverter: ValueConverter,
520-
isRoot: Boolean = false): ArrayData = {
520+
isRoot: Boolean = false,
521+
arrayAsStructs: Boolean = false): ArrayData = {
521522
val values = ArrayBuffer.empty[Any]
522523
var badRecordException: Option[Throwable] = None
523524

@@ -537,6 +538,9 @@ class JacksonParser(
537538

538539
if (badRecordException.isEmpty) {
539540
arrayData
541+
} else if (arrayAsStructs) {
542+
throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
543+
badRecordException.get)
540544
} else {
541545
throw PartialResultException(InternalRow(arrayData), badRecordException.get)
542546
}
@@ -570,19 +574,25 @@ class JacksonParser(
570574
// JSON parser currently doesn't support partial results for corrupted records.
571575
// For such records, all fields other than the field configured by
572576
// `columnNameOfCorruptRecord` are set to `null`.
573-
throw BadRecordException(() => recordLiteral(record), () => None, e)
577+
throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
574578
case e: CharConversionException if options.encoding.isEmpty =>
575579
val msg =
576580
"""JSON parser cannot handle a character in its input.
577581
|Specifying encoding as an input option explicitly might help to resolve the issue.
578582
|""".stripMargin + e.getMessage
579583
val wrappedCharException = new CharConversionException(msg)
580584
wrappedCharException.initCause(e)
581-
throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
585+
throw BadRecordException(() => recordLiteral(record), () => Array.empty,
586+
wrappedCharException)
582587
case PartialResultException(row, cause) =>
583588
throw BadRecordException(
584589
record = () => recordLiteral(record),
585-
partialResult = () => Some(row),
590+
partialResults = () => Array(row),
591+
cause)
592+
case PartialResultArrayException(rows, cause) =>
593+
throw BadRecordException(
594+
record = () => recordLiteral(record),
595+
partialResults = () => rows,
586596
cause)
587597
}
588598
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,24 @@ case class PartialResultException(
3030
cause: Throwable)
3131
extends Exception(cause)
3232

33+
/**
34+
 * Exception thrown when the underlying parser returns a partial list of results from parsing.
35+
* @param partialResults the partial result list of parsing bad records.
36+
* @param cause the actual exception about why the parser cannot return full result.
37+
*/
38+
case class PartialResultArrayException(
39+
partialResults: Array[InternalRow],
40+
cause: Throwable)
41+
extends Exception(cause)
42+
3343
/**
3444
* Exception thrown when the underlying parser meet a bad record and can't parse it.
3545
* @param record a function to return the record that cause the parser to fail
36-
* @param partialResult a function that returns an optional row, which is the partial result of
46+
 * @param partialResults a function that returns a row array, which is the partial results of
3747
* parsing this bad record.
3848
* @param cause the actual exception about why the record is bad and can't be parsed.
3949
*/
4050
case class BadRecordException(
4151
@transient record: () => UTF8String,
42-
@transient partialResult: () => Option[InternalRow],
52+
@transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
4353
cause: Throwable) extends Exception(cause)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,17 @@ class FailureSafeParser[IN](
6161
} catch {
6262
case e: BadRecordException => mode match {
6363
case PermissiveMode =>
64-
Iterator(toResultRow(e.partialResult(), e.record))
64+
val partialResults = e.partialResults()
65+
if (partialResults.nonEmpty) {
66+
partialResults.iterator.map(row => toResultRow(Some(row), e.record))
67+
} else {
68+
Iterator(toResultRow(None, e.record))
69+
}
6570
case DropMalformedMode =>
6671
Iterator.empty
6772
case FailFastMode =>
6873
throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
69-
toResultRow(e.partialResult(), e.record).toString, e)
74+
toResultRow(e.partialResults().headOption, e.record).toString, e)
7075
}
7176
}
7277
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3225,6 +3225,21 @@ abstract class JsonSuite
32253225
Row(null) :: Nil)
32263226
}
32273227

3228+
test("SPARK-44079: fix incorrect result when parse array as struct " +
3229+
"using PERMISSIVE mode with corrupt record") {
3230+
val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
3231+
val schema = new StructType(Array(StructField("a", IntegerType),
3232+
StructField("b", StringType), StructField("_corrupt_record", StringType)))
3233+
3234+
val result = spark.read
3235+
.option("mode", "PERMISSIVE")
3236+
.option("multiline", "true")
3237+
.schema(schema)
3238+
.json(Seq(data).toDS())
3239+
3240+
checkAnswer(result, Seq(Row(null, "correct", data), Row(null, "correct", data)))
3241+
}
3242+
32283243
test("SPARK-36536: use casting when datetime pattern is not set") {
32293244
withSQLConf(
32303245
SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",

0 commit comments

Comments
 (0)