From 2d2fed923e9567b6bc22cf4355ee3b57f8699cd4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 6 Dec 2018 22:26:42 +0100 Subject: [PATCH 01/15] Test for bad records --- .../sql/execution/datasources/json/JsonSuite.scala | 11 +++++++++++ .../sql/execution/datasources/json/TestJsonData.scala | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index dff37ca2d40f0..7f5048ed2db84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2563,4 +2563,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(!files.exists(_.getName.endsWith("json"))) } } + + test("return partial result for bad records") { + val schema = new StructType() + .add("a", DoubleType) + .add("b", StringType) + .add("c", TimestampType) + .add("_corrupt_record", StringType) + val jsonDF = spark.read.schema(schema).json(badRecords) + + jsonDF.show(false) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 6e9559edf8ec2..384b8c5c89dc7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -229,6 +229,12 @@ private[json] trait TestJsonData { """{"date": "27/10/2014 18:30"}""" :: """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING) + def badRecords: Dataset[String] = + spark.createDataset(spark.sparkContext.parallelize( + """{"a":0.1,"b":"abc","c":"2018-12-06T22:"""" :: + """{"a":"-","b":"abc","c":"2018-12-06T22:18:00.123"""" :: + """{"a":0.2,"b":{},"c":"2018-12-06T22:19:00.123"""" :: Nil))(Encoders.STRING) + lazy val singleRow: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING) From de4885899b2a5e866b6c4a365e91fc901cdc0f86 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 00:11:29 +0100 Subject: [PATCH 02/15] Added a test --- .../sql/execution/datasources/json/JsonSuite.scala | 13 ++++++++----- .../execution/datasources/json/TestJsonData.scala | 5 ++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 7f5048ed2db84..8577eb2b8bc12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkAnswer( sql("select nullstr, headers.Host from jsonTable"), - Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null)) + Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null)) ) } @@ -2567,11 +2567,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("return partial result for bad records") { val schema = new StructType() .add("a", DoubleType) - .add("b", StringType) - .add("c", TimestampType) + .add("b", 
ArrayType(IntegerType)) + .add("c", StringType) .add("_corrupt_record", StringType) - val jsonDF = spark.read.schema(schema).json(badRecords) + val df = spark.read.schema(schema).json(badRecords) - jsonDF.show(false) + checkAnswer( + df, + Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") :: + Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 384b8c5c89dc7..5155b2a3b78fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -231,9 +231,8 @@ private[json] trait TestJsonData { def badRecords: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize( - """{"a":0.1,"b":"abc","c":"2018-12-06T22:"""" :: - """{"a":"-","b":"abc","c":"2018-12-06T22:18:00.123"""" :: - """{"a":0.2,"b":{},"c":"2018-12-06T22:19:00.123"""" :: Nil))(Encoders.STRING) + """{"a":"-","b":[0, 1, 2],"c":"abc"}""" :: + """{"a":0.1,"b":{},"c":"def"}""" :: Nil))(Encoders.STRING) lazy val singleRow: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING) From 101df4e7fcee60c042c21951551bd3376ae325cb Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 00:16:09 +0100 Subject: [PATCH 03/15] Return partial results --- .../sql/catalyst/json/JacksonParser.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 2357595906b11..e9a042a600a49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -22,6 +22,7 @@ import java.nio.charset.MalformedInputException import scala.collection.mutable.ArrayBuffer import scala.util.Try +import scala.util.control.NonFatal import com.fasterxml.jackson.core._ @@ -29,7 +30,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -347,17 +347,28 @@ class JacksonParser( schema: StructType, fieldConverters: Array[ValueConverter]): InternalRow = { val row = new GenericInternalRow(schema.length) + var badRecordException: Option[Throwable] = None + while (nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => - row.update(index, fieldConverters(index).apply(parser)) - + try { + row.update(index, fieldConverters(index).apply(parser)) + } catch { + case NonFatal(e) => + badRecordException = badRecordException.orElse(Some(e)) + parser.skipChildren() + } case None => parser.skipChildren() } } - row + if (badRecordException.isEmpty) { + row + } else { + throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get) + } } /** @@ -428,6 +439,8 @@ class JacksonParser( val wrappedCharException = new CharConversionException(msg) 
wrappedCharException.initCause(e) throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException) + case e: BadRecordException => + throw e.copy(record = () => recordLiteral(record)) } } } From 542ae74a9240b2905f4ca1f9de62fd53f1562d64 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 10:02:37 +0100 Subject: [PATCH 04/15] Updating the migration guide --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index ed2ff139bcc33..48c813820769e 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. - - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, CSV/JSON datasource converts a malformed CSV/JSON string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV/JSON column values were parsed and converted to desired types successfully. ## Upgrading From Spark SQL 2.3 to 2.4 From 439f57ac59a7c1f481317126934a55d20b38f579 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 10:04:58 +0100 Subject: [PATCH 05/15] Revert "Updating the migration guide" This reverts commit 542ae74a9240b2905f4ca1f9de62fd53f1562d64. --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 48c813820769e..ed2ff139bcc33 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. - - In Spark version 2.4 and earlier, CSV/JSON datasource converts a malformed CSV/JSON string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV/JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. 
## Upgrading From Spark SQL 2.3 to 2.4 From 77d767016607056809d5f8c4e9653809838b91dd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 10:10:31 +0100 Subject: [PATCH 06/15] Updating the migration guide - separate notes --- docs/sql-migration-guide-upgrade.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index ed2ff139bcc33..39d6b2ca2a2f2 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -37,6 +37,8 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, JSON datasource and JSON functions (`from_json`, `get_json_object` and etc.) convert a malformed JSON string to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + ## Upgrading From Spark SQL 2.3 to 2.4 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below. From fa431ee293d5ae3a7c3b8b441c4a0b6778ac84e1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 10:12:16 +0100 Subject: [PATCH 07/15] Updating the migration guide - minor --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 39d6b2ca2a2f2..3579552dd7f84 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -37,7 +37,7 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. - - In Spark version 2.4 and earlier, JSON datasource and JSON functions (`from_json`, `get_json_object` and etc.) convert a malformed JSON string to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a malformed JSON string to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. 
## Upgrading From Spark SQL 2.3 to 2.4 From e13673de2537fb42bdc2bb8df9849560ae399c89 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 7 Dec 2018 11:00:07 +0100 Subject: [PATCH 08/15] Updating the migration guide - minor 2 --- docs/sql-migration-guide-upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 3579552dd7f84..04952f08de565 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -37,7 +37,7 @@ displayTitle: Spark SQL Upgrading Guide - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. - - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a malformed JSON string to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. ## Upgrading From Spark SQL 2.3 to 2.4 From 2dc77adde2ee1f14aaad0800fe225078048392f5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Dec 2018 15:19:40 +0100 Subject: [PATCH 09/15] Test simplification --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8577eb2b8bc12..6b05c9d324bc5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2565,11 +2565,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("return partial result for bad records") { - val schema = new StructType() - .add("a", DoubleType) - .add("b", ArrayType(IntegerType)) - .add("c", StringType) - .add("_corrupt_record", StringType) + val schema = "a double, b array, c string, _corrupt_record string" val df = spark.read.schema(schema).json(badRecords) checkAnswer( From 43112255aa5230dc0d3f198838ff83806906dbbd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Dec 2018 15:27:48 +0100 Subject: [PATCH 10/15] other fields -> malformed fields --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../scala/org/apache/spark/sql/streaming/DataStreamReader.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1d2dd4d808930..b7e166c8b7fc0 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, set, it uses the default value, 
``PERMISSIVE``. * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ - into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \ fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d92b0d5677e25..d4fb45d7cd134 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, set, it uses the default value, ``PERMISSIVE``. * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ - into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \ fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 661fe98d8c901..d5ea49934b4fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. *
    *
  • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To * keep corrupt records, an user can set a string type field named * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the * field, it drops corrupt records during parsing. When inferring a schema, it implicitly diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index c8e3e1c191044..c881ac2a19d30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -273,7 +273,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. *
      *
    • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To * keep corrupt records, an user can set a string type field named * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the * field, it drops corrupt records during parsing. When inferring a schema, it implicitly From 62a6795713c64b48a6e56449e161da8dbf8ec0aa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Dec 2018 15:30:17 +0100 Subject: [PATCH 11/15] CSV: other fields -> malformed fields --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../scala/org/apache/spark/sql/streaming/DataStreamReader.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index b7e166c8b7fc0..7b10512a43294 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non set, it uses the default value, ``PERMISSIVE``. * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ - into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \ fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d4fb45d7cd134..fc23b9d99c34a 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non set, it uses the default value, ``PERMISSIVE``. * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ - into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \ fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index d5ea49934b4fb..b64a8f007cb21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -598,7 +598,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. It supports the following case-insensitive modes. *
        *
      • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To keep * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` * in an user-defined schema. If a schema does not have the field, it drops corrupt records * during parsing. A record with less/more tokens than schema is not a corrupted record to diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index c881ac2a19d30..012d77cde5d55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -360,7 +360,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. It supports the following case-insensitive modes. *
          *
        • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To keep * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` * in an user-defined schema. If a schema does not have the field, it drops corrupt records * during parsing. A record with less/more tokens than schema is not a corrupted record to From b19b3e1ce25836a419b056a63bc320f1b82dc1b1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Dec 2018 15:45:41 +0100 Subject: [PATCH 12/15] Making scalastyle checker happy --- .../org/apache/spark/sql/DataFrameReader.scala | 14 +++++++------- .../spark/sql/streaming/DataStreamReader.scala | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b64a8f007cb21..9751528654ffb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -598,13 +598,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. It supports the following case-insensitive modes. *
            *
          • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To keep - * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` - * in an user-defined schema. If a schema does not have the field, it drops corrupt records - * during parsing. A record with less/more tokens than schema is not a corrupted record to - * CSV. When it meets a record having fewer tokens than the length of the schema, sets - * `null` to extra fields. When the record has more tokens than the length of the schema, - * it drops extra tokens.
          • + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. + * To keep corrupt records, an user can set a string type field named + * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have + * the field, it drops corrupt records during parsing. A record with less/more tokens + * than schema is not a corrupted record to CSV. When it meets a record having fewer + * tokens than the length of the schema, sets `null` to extra fields. When the record + * has more tokens than the length of the schema, it drops extra tokens. *
          • `DROPMALFORMED` : ignores the whole corrupted records.
          • `FAILFAST` : throws an exception when it meets corrupted records.
          diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 012d77cde5d55..914fa90ae7e14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -360,13 +360,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. It supports the following case-insensitive modes. *
            *
          • `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a - * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To keep - * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` - * in an user-defined schema. If a schema does not have the field, it drops corrupt records - * during parsing. A record with less/more tokens than schema is not a corrupted record to - * CSV. When it meets a record having fewer tokens than the length of the schema, sets - * `null` to extra fields. When the record has more tokens than the length of the schema, - * it drops extra tokens.
          • + * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. + * To keep corrupt records, an user can set a string type field named + * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have + * the field, it drops corrupt records during parsing. A record with less/more tokens + * than schema is not a corrupted record to CSV. When it meets a record having fewer + * tokens than the length of the schema, sets `null` to extra fields. When the record + * has more tokens than the length of the schema, it drops extra tokens. *
          • `DROPMALFORMED` : ignores the whole corrupted records.
          • `FAILFAST` : throws an exception when it meets corrupted records.
          From fc088df2e98133e11d8fe60f899d3212e4cbee35 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 8 Dec 2018 20:42:22 +0100 Subject: [PATCH 13/15] Added the --- docs/sql-migration-guide-upgrade.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index 04952f08de565..3c346be4c45d0 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -35,9 +35,9 @@ displayTitle: Spark SQL Upgrading Guide - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. - - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully. - - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. + - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully. 
## Upgrading From Spark SQL 2.3 to 2.4 From 07e04981d616db0c8dc0713527ebb9f5963fde53 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Dec 2018 11:05:05 +0100 Subject: [PATCH 14/15] Moving badRecords to JsonSuite --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 3 +++ .../spark/sql/execution/datasources/json/TestJsonData.scala | 5 ----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6b05c9d324bc5..3330de3584ebb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2566,6 +2566,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("return partial result for bad records") { val schema = "a double, b array, c string, _corrupt_record string" + val badRecords = Seq( + """{"a":"-","b":[0, 1, 2],"c":"abc"}""", + """{"a":0.1,"b":{},"c":"def"}""").toDS() val df = spark.read.schema(schema).json(badRecords) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 5155b2a3b78fa..6e9559edf8ec2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -229,11 +229,6 @@ private[json] trait TestJsonData { """{"date": "27/10/2014 18:30"}""" :: """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING) - def badRecords: Dataset[String] = - spark.createDataset(spark.sparkContext.parallelize( - """{"a":"-","b":[0, 1, 2],"c":"abc"}""" :: - """{"a":0.1,"b":{},"c":"def"}""" :: Nil))(Encoders.STRING) - lazy val singleRow: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING) From 9ca9248ed3f9314747c1415bd19760c53019bf36 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 9 Dec 2018 16:20:09 +0100 Subject: [PATCH 15/15] Adding PartialResultException --- .../apache/spark/sql/catalyst/json/JacksonParser.scala | 9 ++++++--- .../spark/sql/catalyst/util/BadRecordException.scala | 10 ++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index e9a042a600a49..7e3bd4df51bb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -367,7 +367,7 @@ class JacksonParser( if (badRecordException.isEmpty) { row } else { - throw BadRecordException(() => UTF8String.EMPTY_UTF8, () => Some(row), badRecordException.get) + throw PartialResultException(row, badRecordException.get) } } @@ -439,8 +439,11 @@ class JacksonParser( val wrappedCharException = new CharConversionException(msg) wrappedCharException.initCause(e) throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException) - case e: BadRecordException => - throw e.copy(record = () => recordLiteral(record)) + case PartialResultException(row, cause) => + throw BadRecordException( + record = () => 
recordLiteral(record), + partialResult = () => Some(row), + cause) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala index 985f0dc1cd60e..d719a33929fcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala @@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.unsafe.types.UTF8String +/** + * Exception thrown when the underlying parser returns a partial result of parsing. + * @param partialResult the partial result of parsing a bad record. + * @param cause the actual exception about why the parser cannot return full result. + */ +case class PartialResultException( + partialResult: InternalRow, + cause: Throwable) + extends Exception(cause) + /** * Exception thrown when the underlying parser meet a bad record and can't parse it. * @param record a function to return the record that cause the parser to fail
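
As a usage note on the behavior this patch series introduces, below is a minimal, self-contained sketch that mirrors the "return partial result for bad records" test added to `JsonSuite`. It is illustrative only: the local `SparkSession` setup and the `PartialResultExample` object name are assumptions, not part of the patch, and the partial-result output (non-null `b`/`c` for the first record, non-null `a`/`c` for the second, with the raw record in `_corrupt_record`) requires a build that includes these commits; Spark 2.4 and earlier would return all-`null` data fields for both records.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver program; only the schema, the bad records, and the
// spark.read.schema(...).json(...) call are taken from the patch's own test.
object PartialResultExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-partial-results")
      .getOrCreate()
    import spark.implicits._

    // Same bad records as the JsonSuite test: "a" is not a double in the first
    // record, and "b" is not an array in the second.
    val badRecords = Seq(
      """{"a":"-","b":[0, 1, 2],"c":"abc"}""",
      """{"a":0.1,"b":{},"c":"def"}""").toDS()

    val schema = "a double, b array<int>, c string, _corrupt_record string"
    val df = spark.read.schema(schema).json(badRecords)

    // In the default PERMISSIVE mode, fields that parsed successfully are kept,
    // only the malformed fields become null, and the original record is stored
    // in the _corrupt_record column.
    df.show(false)

    spark.stop()
  }
}
```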