From 03fe63a3e220c576b30f3bce4366b8f2bba0a352 Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Mon, 30 Nov 2015 12:23:34 -0600 Subject: [PATCH 1/4] Prevent failure on corrupt JSON records Return failed record when a record cannot be parsed. Allows parsing of files containing corrupt records of any form. --- .../sql/execution/datasources/json/JacksonParser.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index bfa140504105..37362e49441d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -266,11 +266,7 @@ object JacksonParser { } else { array.toArray[InternalRow](schema) } - case _ => - sys.error( - s"Failed to parse record $record. Please make sure that each line of " + - "the file (or each string in the RDD) is a valid JSON object or " + - "an array of JSON objects.") + case _ => failedRecord(record) } } } catch { From 3dd4b7bf9a52a52bf58b8263247ef582597c9a62 Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Tue, 1 Dec 2015 14:10:33 -0600 Subject: [PATCH 2/4] Add regression test for corrupt record JSON parsing --- .../datasources/json/JsonSuite.scala | 29 +++++++++++++++++++ .../datasources/json/TestJsonData.scala | 6 ++++ 2 files changed, 35 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ba7718c86463..8fd91e0737e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1427,4 +1427,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } } + + test("SPARK-12057 additional corrupt records do not throw exceptions") { + // Test if we can query corrupt records. + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + withTempTable("jsonTable") { + val jsonDF = sqlContext.read.json(additionalCorruptRecords) + jsonDF.registerTempTable("jsonTable") + val schema = StructType( + StructField("_unparsed", StringType, true) :: + StructField("dummy", StringType, true) :: Nil) + + assert(schema === jsonDF.schema) + + // In HiveContext, backticks should be used to access columns starting with a underscore. 
+ checkAnswer( + sql( + """ + |SELECT dummy, _unparsed + |FROM jsonTable + """.stripMargin), + Row("test", null) :: + Row(null, """42""") :: + Row(null, """ ","ian":"test"}""") :: Nil + ) + + } + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 713d1da1cb51..f6af6a9113b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -188,6 +188,12 @@ private[json] trait TestJsonData { """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: """]""" :: Nil) + def additionalCorruptRecords: RDD[String] = + sqlContext.sparkContext.parallelize( + """{"dummy":"test"}""" :: + """42""" :: + """ ","ian":"test"}""" :: Nil) + def emptyRecords: RDD[String] = sqlContext.sparkContext.parallelize( """{""" :: From d774bfe3f41f05add76c5368d2903d670ead52ac Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Tue, 1 Dec 2015 14:53:33 -0600 Subject: [PATCH 3/4] Correct schema --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8fd91e0737e6..2fc1daf96bcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1432,13 +1432,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") { - val jsonDF = sqlContext.read.json(additionalCorruptRecords) - jsonDF.registerTempTable("jsonTable") val schema = StructType( StructField("_unparsed", StringType, true) :: - StructField("dummy", StringType, true) :: Nil) - - assert(schema === jsonDF.schema) + StructField("dummy", StringType, true) :: Nil) + val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords) + jsonDF.registerTempTable("jsonTable") // In HiveContext, backticks should be used to access columns starting with a underscore. checkAnswer( From ad714338cfbf1d55dd454a7e4ee91a70d7bd373c Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 13 Dec 2015 20:57:20 -0800 Subject: [PATCH 4/4] Handle more cases. 
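
When records whose root-level value is not a JSON object (or an array of objects) appear
alongside normal object records, schema inference now folds them into the configured
corrupt-record column instead of failing, and parse failures inside JacksonParser are
raised as SparkSQLJsonProcessingException and turned into failed records rather than
aborting the query. A rough usage sketch, not part of this patch: it assumes a
SQLContext named `sqlContext` and reuses the corrupt-record column name configured in
the tests below.

    // Route malformed lines into a column named "_unparsed"
    // (the tests below configure the same name).
    sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_unparsed")

    val records = sqlContext.sparkContext.parallelize(
      """{"dummy":"test"}""" :: """42""" :: """[1,2,3]""" :: Nil)

    // Inference no longer fails on the non-object lines: the inferred schema is
    // (_unparsed string, dummy string), and the raw text of each bad line is kept
    // verbatim in the _unparsed column.
    val df = sqlContext.read.json(records)
    df.select("dummy", "_unparsed").show()
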
--- .../datasources/json/InferSchema.scala | 37 ++++++++++++++++-- .../datasources/json/JacksonParser.scala | 17 +++++++-- .../datasources/json/JsonSuite.scala | 38 ++++++++++++------- .../datasources/json/TestJsonData.scala | 3 +- 4 files changed, 72 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 922fd5b21167..59ba4ae2cba0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -61,7 +61,10 @@ private[json] object InferSchema { StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))) } } - }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) + }.treeAggregate[DataType]( + StructType(Seq()))( + compatibleRootType(columnNameOfCorruptRecords), + compatibleRootType(columnNameOfCorruptRecords)) canonicalizeType(rootType) match { case Some(st: StructType) => st @@ -170,12 +173,38 @@ private[json] object InferSchema { case other => Some(other) } + private def withCorruptField( + struct: StructType, + columnNameOfCorruptRecords: String): StructType = { + if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) { + // If this given struct does not have a column used for corrupt records, + // add this field. + struct.add(columnNameOfCorruptRecords, StringType, nullable = true) + } else { + // Otherwise, just return this struct. + struct + } + } + /** * Remove top-level ArrayType wrappers and merge the remaining schemas */ - private def compatibleRootType: (DataType, DataType) => DataType = { - case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2) - case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2) + private def compatibleRootType( + columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = { + // Since we support array of json objects at the top level, + // we need to check the element type and find the root level data type. + case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) + case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) + // If we see any other data type at the root level, we get records that cannot be + // parsed. So, we use the struct as the data type and add the corrupt field to the schema. + case (struct: StructType, NullType) => struct + case (NullType, struct: StructType) => struct + case (struct: StructType, o) if !o.isInstanceOf[StructType] => + withCorruptField(struct, columnNameOfCorruptRecords) + case (o, struct: StructType) if !o.isInstanceOf[StructType] => + withCorruptField(struct, columnNameOfCorruptRecords) + // If we get anything else, we call compatibleType. + // Usually, when we reach here, ty1 and ty2 are two StructTypes. 
case (ty1, ty2) => compatibleType(ty1, ty2) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 37362e49441d..a5d7ca78a7b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils +private[json] class SparkSQLJsonProcessingException(msg: String) extends Exception(msg) + object JacksonParser { def parse( @@ -110,7 +112,7 @@ object JacksonParser { lowerCaseValue.equals("-inf")) { value.toFloat } else { - sys.error(s"Cannot parse $value as FloatType.") + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.") } case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) => @@ -127,7 +129,7 @@ object JacksonParser { lowerCaseValue.equals("-inf")) { value.toDouble } else { - sys.error(s"Cannot parse $value as DoubleType.") + throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.") } case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) => @@ -174,7 +176,11 @@ object JacksonParser { convertField(factory, parser, udt.sqlType) case (token, dataType) => - sys.error(s"Failed to parse a value for data type $dataType (current token: $token).") + // We cannot parse this token based on the given data type. So, we throw a + // SparkSQLJsonProcessingException and this exception will be caught by + // parseJson method. + throw new SparkSQLJsonProcessingException( + s"Failed to parse a value for data type $dataType (current token: $token).") } } @@ -266,12 +272,15 @@ object JacksonParser { } else { array.toArray[InternalRow](schema) } - case _ => failedRecord(record) + case _ => + failedRecord(record) } } } catch { case _: JsonProcessingException => failedRecord(record) + case _: SparkSQLJsonProcessingException => + failedRecord(record) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 2fc1daf96bcd..baa258ad2615 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1435,21 +1435,31 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val schema = StructType( StructField("_unparsed", StringType, true) :: StructField("dummy", StringType, true) :: Nil) - val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords) - jsonDF.registerTempTable("jsonTable") - - // In HiveContext, backticks should be used to access columns starting with a underscore. - checkAnswer( - sql( - """ - |SELECT dummy, _unparsed - |FROM jsonTable - """.stripMargin), - Row("test", null) :: - Row(null, """42""") :: - Row(null, """ ","ian":"test"}""") :: Nil - ) + { + // We need to make sure we can infer the schema. + val jsonDF = sqlContext.read.json(additionalCorruptRecords) + assert(jsonDF.schema === schema) + } + + { + val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords) + jsonDF.registerTempTable("jsonTable") + + // In HiveContext, backticks should be used to access columns starting with a underscore. 
+ checkAnswer( + sql( + """ + |SELECT dummy, _unparsed + |FROM jsonTable + """.stripMargin), + Row("test", null) :: + Row(null, """[1,2,3]""") :: + Row(null, """":"test", "a":1}""") :: + Row(null, """42""") :: + Row(null, """ ","ian":"test"}""") :: Nil + ) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index f6af6a9113b8..cb61f7eeca0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -191,6 +191,8 @@ private[json] trait TestJsonData { def additionalCorruptRecords: RDD[String] = sqlContext.sparkContext.parallelize( """{"dummy":"test"}""" :: + """[1,2,3]""" :: + """":"test", "a":1}""" :: """42""" :: """ ","ian":"test"}""" :: Nil) @@ -203,7 +205,6 @@ private[json] trait TestJsonData { """{"b": [{"c": {}}]}""" :: """]""" :: Nil) - lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())