
Commit 4fa1a43

HyukjinKwon authored and cloud-fan committed
[SPARK-19641][SQL] JSON schema inference in DROPMALFORMED mode produces incorrect schema for non-array/object JSONs
## What changes were proposed in this pull request?

Currently, when we infer types for valid JSON strings that are neither objects nor arrays, we produce empty schemas regardless of the parse mode, as below:

```scala
scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```

```scala
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
```

This PR proposes to handle parse modes in type inference. After this PR:

```scala
scala> spark.read.option("mode", "DROPMALFORMED").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
 |-- a: long (nullable = true)
```

```scala
scala> spark.read.option("mode", "FAILFAST").json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
java.lang.RuntimeException: Failed to infer a common schema. Struct types are expected but string was found.
```

This PR is based on NathanHowell@e233fd0; NathanHowell and I discussed the approach in https://issues.apache.org/jira/browse/SPARK-19641.

## How was this patch tested?

Unit tests in `JsonSuite` for both `DROPMALFORMED` and `FAILFAST` modes.

Author: hyukjinkwon <[email protected]>

Closes #17492 from HyukjinKwon/SPARK-19641.
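For comparison, the default PERMISSIVE mode should keep such records by adding the designated corrupt-record column during inference (per the `withCorruptField` change below). A hedged illustration of the expected behavior, not output taken from the PR:

```scala
// Illustrative only: in PERMISSIVE mode (the default), a non-object/array root
// value should surface as the corrupt-record column instead of being dropped.
scala> spark.read.json(Seq("""{"a": 1}""", """"a"""").toDS).printSchema()
root
 |-- _corrupt_record: string (nullable = true)
 |-- a: long (nullable = true)
```

The column name comes from `columnNameOfCorruptRecord` (default `_corrupt_record`); the field comparator sorts by name, which is why it lands first here.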
1 parent: 4d28e84 · commit: 4fa1a43

2 files changed: +78 −33


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala

Lines changed: 47 additions & 30 deletions
```diff
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
-import org.apache.spark.sql.catalyst.util.PermissiveMode
+import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -41,7 +41,7 @@ private[sql] object JsonInferSchema {
       json: RDD[T],
       configOptions: JSONOptions,
       createParser: (JsonFactory, T) => JsonParser): StructType = {
-    val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
+    val parseMode = configOptions.parseMode
     val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
 
     // perform schema inference on each row and merge afterwards
@@ -55,20 +55,24 @@
           Some(inferField(parser, configOptions))
         }
       } catch {
-        case _: JsonParseException if shouldHandleCorruptRecord =>
-          Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
-        case _: JsonParseException =>
-          None
+        case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
+          case PermissiveMode =>
+            Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
+          case DropMalformedMode =>
+            None
+          case FailFastMode =>
+            throw e
+        }
       }
     }
-    }.fold(StructType(Seq()))(
-      compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord))
+    }.fold(StructType(Nil))(
+      compatibleRootType(columnNameOfCorruptRecord, parseMode))
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
       case _ =>
         // canonicalizeType erases all empty structs, including the only one we want to keep
-        StructType(Seq())
+        StructType(Nil)
     }
   }
```
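The new catch block is the heart of the change: inference failures are now dispatched on the parse mode instead of a single boolean. A minimal standalone sketch of that dispatch (simplified signature and names assumed; not the PR's exact API):

```scala
import org.apache.spark.sql.types._

// Sketch: what each parse mode contributes to the schema fold when a record
// cannot be parsed, mirroring the catch block in the hunk above.
def onMalformed(mode: String, corruptColumn: String, e: Exception): Option[StructType] =
  mode match {
    case "PERMISSIVE"    => Some(StructType(Seq(StructField(corruptColumn, StringType)))) // placeholder schema
    case "DROPMALFORMED" => None                                                          // contributes nothing
    case "FAILFAST"      => throw e                                                       // abort inference
  }
```

Returning `None` is what lets the surrounding `fold(StructType(Nil))(...)` skip the record entirely in DROPMALFORMED mode.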

```diff
@@ -202,41 +206,54 @@
 
   private def withCorruptField(
       struct: StructType,
-      columnNameOfCorruptRecords: String): StructType = {
-    if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
-      // If this given struct does not have a column used for corrupt records,
-      // add this field.
-      val newFields: Array[StructField] =
-        StructField(columnNameOfCorruptRecords, StringType, nullable = true) +: struct.fields
-      // Note: other code relies on this sorting for correctness, so don't remove it!
-      java.util.Arrays.sort(newFields, structFieldComparator)
-      StructType(newFields)
-    } else {
-      // Otherwise, just return this struct.
+      other: DataType,
+      columnNameOfCorruptRecords: String,
+      parseMode: ParseMode) = parseMode match {
+    case PermissiveMode =>
+      // If we see any other data type at the root level, we get records that cannot be
+      // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+      if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
+        // If this given struct does not have a column used for corrupt records,
+        // add this field.
+        val newFields: Array[StructField] =
+          StructField(columnNameOfCorruptRecords, StringType, nullable = true) +: struct.fields
+        // Note: other code relies on this sorting for correctness, so don't remove it!
+        java.util.Arrays.sort(newFields, structFieldComparator)
+        StructType(newFields)
+      } else {
+        // Otherwise, just return this struct.
+        struct
+      }
+
+    case DropMalformedMode =>
+      // If corrupt record handling is disabled we retain the valid schema and discard the other.
       struct
-    }
+
+    case FailFastMode =>
+      // If `other` is not struct type, consider it as malformed one and throws an exception.
+      throw new RuntimeException("Failed to infer a common schema. Struct types are expected" +
+        s" but ${other.catalogString} was found.")
   }
 
   /**
    * Remove top-level ArrayType wrappers and merge the remaining schemas
    */
   private def compatibleRootType(
       columnNameOfCorruptRecords: String,
-      shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = {
+      parseMode: ParseMode): (DataType, DataType) => DataType = {
     // Since we support array of json objects at the top level,
     // we need to check the element type and find the root level data type.
     case (ArrayType(ty1, _), ty2) =>
-      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
+      compatibleRootType(columnNameOfCorruptRecords, parseMode)(ty1, ty2)
     case (ty1, ArrayType(ty2, _)) =>
-      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
-    // If we see any other data type at the root level, we get records that cannot be
-    // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+      compatibleRootType(columnNameOfCorruptRecords, parseMode)(ty1, ty2)
+    // Discard null/empty documents
     case (struct: StructType, NullType) => struct
     case (NullType, struct: StructType) => struct
-    case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
-      withCorruptField(struct, columnNameOfCorruptRecords)
-    case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
-      withCorruptField(struct, columnNameOfCorruptRecords)
+    case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, o, columnNameOfCorruptRecords, parseMode)
+    case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, o, columnNameOfCorruptRecords, parseMode)
     // If we get anything else, we call compatibleType.
     // Usually, when we reach here, ty1 and ty2 are two StructTypes.
     case (ty1, ty2) => compatibleType(ty1, ty2)
```
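To make the merge rules concrete, here is a hypothetical walk-through (values assumed) of combining an inferred struct with a non-struct root type under each mode:

```scala
import org.apache.spark.sql.types._

val struct = new StructType().add("a", LongType) // inferred from """{"a": 1}"""
val other: DataType = StringType                 // inferred from """"a""""

// PERMISSIVE    => withCorruptField adds the corrupt-record column:
//                  struct<_corrupt_record:string,a:bigint>
// DROPMALFORMED => the valid struct is kept and `other` is discarded:
//                  struct<a:bigint>
// FAILFAST      => RuntimeException("Failed to infer a common schema.
//                  Struct types are expected but string was found.")
```

The error message interpolates `other.catalogString`, which is why `StringType` is reported simply as `string`.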

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 31 additions & 3 deletions
```diff
@@ -1041,7 +1041,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       spark.read
         .option("mode", "FAILFAST")
         .json(corruptRecords)
-        .collect()
     }
     assert(exceptionOne.getMessage.contains("JsonParseException"))
 
@@ -1082,6 +1081,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(jsonDFTwo.schema === schemaTwo)
   }
 
+  test("SPARK-19641: Additional corrupt records: DROPMALFORMED mode") {
+    val schema = new StructType().add("dummy", StringType)
+    // `DROPMALFORMED` mode should skip corrupt records
+    val jsonDF = spark.read
+      .option("mode", "DROPMALFORMED")
+      .json(additionalCorruptRecords)
+    checkAnswer(
+      jsonDF,
+      Row("test"))
+    assert(jsonDF.schema === schema)
+  }
+
   test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
     val schema = StructType(
       StructField("a", StringType, true) ::
@@ -1882,6 +1893,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-19641: Handle multi-line corrupt documents (DROPMALFORMED)") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val corruptRecordCount = additionalCorruptRecords.count().toInt
+      assert(corruptRecordCount === 5)
+
+      additionalCorruptRecords
+        .toDF("value")
+        // this is the minimum partition count that avoids hash collisions
+        .repartition(corruptRecordCount * 4, F.hash($"value"))
+        .write
+        .text(path)
+
+      val jsonDF = spark.read.option("wholeFile", true).option("mode", "DROPMALFORMED").json(path)
+      checkAnswer(jsonDF, Seq(Row("test")))
+    }
+  }
+
   test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
@@ -1903,9 +1932,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .option("wholeFile", true)
         .option("mode", "FAILFAST")
         .json(path)
-        .collect()
       }
-      assert(exceptionOne.getMessage.contains("Failed to parse a value"))
+      assert(exceptionOne.getMessage.contains("Failed to infer a common schema"))
 
       val exceptionTwo = intercept[SparkException] {
         spark.read
```
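One note on the multi-line tests above: with `wholeFile` enabled, each input file is parsed as a single JSON document, so the test hashes on `$"value"` to spread the five records across separate files (the in-test comment flags the partition count needed to avoid hash collisions). A hedged sketch of the same read pattern, with a hypothetical path:

```scala
// Assumed illustration: read a directory of single-document JSON files,
// silently dropping any document that fails to parse.
val jsonDF = spark.read
  .option("wholeFile", true)        // parse each file as one JSON document
  .option("mode", "DROPMALFORMED")  // drop unparseable documents
  .json("/tmp/json-docs")           // hypothetical input path
```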
