From d4615527d5781a760b58e286434011588ee8d62c Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Mon, 30 Nov 2015 12:23:34 -0600 Subject: [PATCH 1/4] Prevent failure on corrupt JSON records Return failed record when a record cannot be parsed. Allows parsing of files containing corrupt records of any form. --- .../sql/execution/datasources/json/JacksonParser.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index bfa140504105..37362e49441d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -266,11 +266,7 @@ object JacksonParser { } else { array.toArray[InternalRow](schema) } - case _ => - sys.error( - s"Failed to parse record $record. Please make sure that each line of " + - "the file (or each string in the RDD) is a valid JSON object or " + - "an array of JSON objects.") + case _ => failedRecord(record) } } } catch { From 02a742babcd8473772be9740357d6ce996902e8d Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Tue, 1 Dec 2015 14:10:33 -0600 Subject: [PATCH 2/4] Add regression test for corrupt record JSON parsing --- .../datasources/json/JsonSuite.scala | 29 +++++++++++++++++++ .../datasources/json/TestJsonData.scala | 6 ++++ 2 files changed, 35 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ba7718c86463..8fd91e0737e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1427,4 +1427,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } } + + test("SPARK-12057 additional corrupt records do not throw exceptions") { + // Test if we can query corrupt records. + withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { + withTempTable("jsonTable") { + val jsonDF = sqlContext.read.json(additionalCorruptRecords) + jsonDF.registerTempTable("jsonTable") + val schema = StructType( + StructField("_unparsed", StringType, true) :: + StructField("dummy", StringType, true) :: Nil) + + assert(schema === jsonDF.schema) + + // In HiveContext, backticks should be used to access columns starting with a underscore. + checkAnswer( + sql( + """ + |SELECT dummy, _unparsed + |FROM jsonTable + """.stripMargin), + Row("test", null) :: + Row(null, """42""") :: + Row(null, """ ","ian":"test"}""") :: Nil + ) + + } + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 713d1da1cb51..f6af6a9113b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -188,6 +188,12 @@ private[json] trait TestJsonData { """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: """]""" :: Nil) + def additionalCorruptRecords: RDD[String] = + sqlContext.sparkContext.parallelize( + """{"dummy":"test"}""" :: + """42""" :: + """ ","ian":"test"}""" :: Nil) + def emptyRecords: RDD[String] = sqlContext.sparkContext.parallelize( """{""" :: From fb4fa7b9f85e8b546d86b4329bfbce7a767036cc Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Tue, 1 Dec 2015 14:53:33 -0600 Subject: [PATCH 3/4] Correct schema --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8fd91e0737e6..2fc1daf96bcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1432,13 +1432,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") { - val jsonDF = sqlContext.read.json(additionalCorruptRecords) - jsonDF.registerTempTable("jsonTable") val schema = StructType( StructField("_unparsed", StringType, true) :: - StructField("dummy", StringType, true) :: Nil) - - assert(schema === jsonDF.schema) + StructField("dummy", StringType, true) :: Nil) + val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords) + jsonDF.registerTempTable("jsonTable") // In HiveContext, backticks should be used to access columns starting with a underscore. checkAnswer( From 8fd677f01dd88c9eb637bf8091adc7a00af568ce Mon Sep 17 00:00:00 2001 From: Ian Macalinao Date: Tue, 1 Dec 2015 16:01:17 -0600 Subject: [PATCH 4/4] dummy