From b2e92b4706c5ed3b141805933f29beb87e1b7371 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 11 Feb 2018 21:06:53 +0100 Subject: [PATCH 01/37] Test for reading json in UTF-16 with BOM --- .../test/resources/json-tests/utf16WithBOM.json | Bin 0 -> 170 bytes .../execution/datasources/json/JsonSuite.scala | 11 +++++++++++ .../execution/datasources/json/TestJsonData.scala | 4 ++++ 3 files changed, 15 insertions(+) create mode 100644 sql/core/src/test/resources/json-tests/utf16WithBOM.json diff --git a/sql/core/src/test/resources/json-tests/utf16WithBOM.json b/sql/core/src/test/resources/json-tests/utf16WithBOM.json new file mode 100644 index 0000000000000000000000000000000000000000..65e7e2f72948103b198017d2553f501efe850da0 GIT binary patch literal 170 zcmezWubM%LA&nuEp@^ZFp@hMYA(0`MAr&ZQ1;ow_89?z&po|iO4ub-a%mK<{s&fL0 if=oyOs;Fh)W#D4KXQ~TBK0_%(Isr3-fU5G5OauU86dZN{ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8c8d41ebf115a..49541b4951c59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2063,4 +2063,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } } + + test("json in UTF-16 with BOM") { + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val jsonDF = spark.read.schema(schema) + .option("mode", "DROPMALFORMED") + .json(testFile("json-tests/utf16WithBOM.json")) + + checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") + )) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 13084ba4a7f04..0db33cca3c63b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -22,6 +22,10 @@ import org.apache.spark.sql.{Dataset, Encoders, SparkSession} private[json] trait TestJsonData { protected def spark: SparkSession + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + def primitiveFieldAndType: Dataset[String] = spark.createDataset(spark.sparkContext.parallelize( """{"string":"this is a simple string.", From cb2f27ba73cb5838e2910c31ca204100bb4eebca Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 11 Feb 2018 21:48:35 +0100 Subject: [PATCH 02/37] Use user's charset or autodetect it if the charset is not specified --- .../spark/sql/catalyst/json/CreateJacksonParser.scala | 9 +++++++-- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 3 +++ .../sql/execution/datasources/json/JsonDataSource.scala | 4 +++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index 025a388aacaa5..532f49fb96566 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -39,8 +39,13 @@ private[sql] object 
CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { - jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(charset: Option[String])(jsonFactory: JsonFactory, record: Text): JsonParser = { + charset.map {cs => + val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) + jsonFactory.createParser(new InputStreamReader(bain, cs)) + }.getOrElse { + jsonFactory.createParser(record.getBytes, 0, record.getLength) + } } def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 652412b34478a..99873cb1d8e36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -85,6 +85,9 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) + /** Standard charset name. For example UTF-8, UTF-16 and UTF-32 */ + val charset = parameters.get("charset") + /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 77e7edc8e7a20..049f953b5f2ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -122,8 +122,10 @@ object TextInputJsonDataSource extends JsonDataSource { schema: StructType): Iterator[InternalRow] = { val linesReader = new HadoopFileLinesReader(file, conf) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close())) + val charset = parser.options.charset + val safeParser = new FailureSafeParser[Text]( - input => parser.parse(input, CreateJacksonParser.text, textToUTF8String), + input => parser.parse(input, CreateJacksonParser.text(charset), textToUTF8String), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) From 0d45fd382bb90ebd7161d57a3da23820b4497f67 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 13 Feb 2018 10:48:08 +0100 Subject: [PATCH 03/37] Added a type and a comment for charset --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 99873cb1d8e36..073f4c6e2fcbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -85,8 +85,9 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - /** Standard charset name. For example UTF-8, UTF-16 and UTF-32 */ - val charset = parameters.get("charset") + /** Standard charset name. For example UTF-8, UTF-16 and UTF-32. 
+ * If charset is not specified (None), it will be detected automatically. */ + val charset: Option[String] = parameters.get("charset") /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { From 1fb9b321a4fac0f41cfb9dd5f85b61feb6796227 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 13 Feb 2018 11:00:27 +0100 Subject: [PATCH 04/37] Replacing the monadic chaining by matching because it is more readable --- .../spark/sql/catalyst/json/CreateJacksonParser.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index 532f49fb96566..acb1df005c023 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -40,11 +40,12 @@ private[sql] object CreateJacksonParser extends Serializable { } def text(charset: Option[String])(jsonFactory: JsonFactory, record: Text): JsonParser = { - charset.map {cs => - val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) - jsonFactory.createParser(new InputStreamReader(bain, cs)) - }.getOrElse { - jsonFactory.createParser(record.getBytes, 0, record.getLength) + charset match { + case Some(cs) => + val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) + jsonFactory.createParser(new InputStreamReader(bain, cs)) + case _ => + jsonFactory.createParser(record.getBytes, 0, record.getLength) } } From c3b04ee68338ad4f93a5361a41db28b37f020907 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 13 Feb 2018 11:44:19 +0100 Subject: [PATCH 05/37] Keeping the old method for backward compatibility --- .../spark/sql/catalyst/json/CreateJacksonParser.scala | 6 +++++- .../sql/execution/datasources/json/JsonDataSource.scala | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index acb1df005c023..1a529bd8611b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -39,7 +39,7 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(charset: Option[String])(jsonFactory: JsonFactory, record: Text): JsonParser = { + def textInCharset(charset: Option[String])(jsonFactory: JsonFactory, record: Text): JsonParser = { charset match { case Some(cs) => val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) @@ -49,6 +49,10 @@ private[sql] object CreateJacksonParser extends Serializable { } } + def text(jsonFactory: JsonFactory, record: Text): JsonParser = { + textInCharset(None)(jsonFactory, record) + } + def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { jsonFactory.createParser(record) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 049f953b5f2ed..aa2c594bdfd0e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -125,7 +125,7 @@ object TextInputJsonDataSource extends JsonDataSource {
     val charset = parser.options.charset
 
     val safeParser = new FailureSafeParser[Text](
-      input => parser.parse(input, CreateJacksonParser.text(charset), textToUTF8String),
+      input => parser.parse(input, CreateJacksonParser.textInCharset(charset), textToUTF8String),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)

From 93d38794dd261ee1bbe2497470ee43de1186ef3c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 13 Feb 2018 11:54:52 +0100
Subject: [PATCH 06/37] testFile is moved into the test to make it more local because it is used only in the test

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala    | 6 +++++-
 .../spark/sql/execution/datasources/json/TestJsonData.scala | 4 ----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 49541b4951c59..0b5dc7addf6d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2065,10 +2065,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("json in UTF-16 with BOM") {
+    val testFile = {
+      val fileName = "json-tests/utf16WithBOM.json"
+      Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+    }
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
       .option("mode", "DROPMALFORMED")
-      .json(testFile("json-tests/utf16WithBOM.json"))
+      .json(testFile)
 
     checkAnswer(jsonDF, Seq(
       Row("Chris", "Baird"), Row("Doug", "Rood")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 0db33cca3c63b..13084ba4a7f04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -22,10 +22,6 @@ import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 private[json] trait TestJsonData {
   protected def spark: SparkSession
 
-  def testFile(fileName: String): String = {
-    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
-  }
-
   def primitiveFieldAndType: Dataset[String] =
     spark.createDataset(spark.sparkContext.parallelize(
       """{"string":"this is a simple string.",

From 15798a1ce61df29e9a32f960e755495e3d63f4e3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 13 Feb 2018 12:15:25 +0100
Subject: [PATCH 07/37] Adding the charset as the third parameter to the text method

---
 .../spark/sql/catalyst/json/CreateJacksonParser.scala   | 8 ++------
 .../sql/execution/datasources/json/JsonDataSource.scala | 2 +-
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index 1a529bd8611b1..bb9592f14272a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -39,7 +39,7 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def textInCharset(charset: Option[String])(jsonFactory: JsonFactory, record: Text): JsonParser = { + def text(jsonFactory: JsonFactory, record: Text, charset: Option[String] = None): JsonParser = { charset match { case Some(cs) => val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) @@ -48,11 +48,7 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(record.getBytes, 0, record.getLength) } } - - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { - textInCharset(None)(jsonFactory, record) - } - + def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { jsonFactory.createParser(record) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index aa2c594bdfd0e..657916fc185c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -125,7 +125,7 @@ object TextInputJsonDataSource extends JsonDataSource { val charset = parser.options.charset val safeParser = new FailureSafeParser[Text]( - input => parser.parse(input, CreateJacksonParser.textInCharset(charset), textToUTF8String), + input => parser.parse[Text](input, CreateJacksonParser.text(_, _, charset), textToUTF8String), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) From cc05ce9af7c9f1d14bd10c1f46a60ce043c13fe1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 13 Feb 2018 12:29:57 +0100 Subject: [PATCH 08/37] Removing whitespaces at the end of the line --- .../apache/spark/sql/catalyst/json/CreateJacksonParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index bb9592f14272a..cf2e3f85dc4fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -48,7 +48,7 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(record.getBytes, 0, record.getLength) } } - + def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { jsonFactory.createParser(record) } From 74f2026e62389902ab7a4c418aa96a492fa14f6f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 13 Feb 2018 13:29:28 +0100 Subject: [PATCH 09/37] Fix the comment in javadoc style --- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 073f4c6e2fcbc..c261778421c12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -85,8 +85,10 @@ private[sql] class JSONOptions(
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
-  /** Standard charset name. For example UTF-8, UTF-16 and UTF-32.
-   * If charset is not specified (None), it will be detected automatically. */
+  /**
+   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
+   * If charset is not specified (None), it will be detected automatically.
+   */
   val charset: Option[String] = parameters.get("charset")
 
   /** Sets config options on a Jackson [[JsonFactory]]. */

From 4856b8e0b287b3ba3331865298f0603dde18459c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 13 Feb 2018 13:32:48 +0100
Subject: [PATCH 10/37] Simplifying the UTF-16 test

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0b5dc7addf6d3..c84ebcd3c9af2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2065,10 +2065,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("json in UTF-16 with BOM") {
-    val testFile = {
-      val fileName = "json-tests/utf16WithBOM.json"
-      Thread.currentThread().getContextClassLoader.getResource(fileName).toString
-    }
+    val fileName = "json-tests/utf16WithBOM.json"
+    val testFile = Thread.currentThread().getContextClassLoader.getResource(fileName).toString
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
       .option("mode", "DROPMALFORMED")

From 084f41fb6edd7c86aeb8643973119cb4b38a34fa Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Feb 2018 18:33:25 +0100
Subject: [PATCH 11/37] Add a hint to the exception on how to set the charset explicitly

---
 .../apache/spark/sql/catalyst/json/JacksonParser.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7f6956994f31f..4f14e17ae6181 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.json
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayOutputStream, CharConversionException}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
@@ -361,6 +361,14 @@ class JacksonParser(
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
         throw BadRecordException(() => recordLiteral(record), () => None, e)
+      case e: CharConversionException if options.charset.isEmpty =>
+        val msg = e.getMessage +
+          """
+            |Charset was detected automatically. You might want to set it explicitly via the charset option:
+            |  .option("charset", "UTF-8")
+            |Example of supported charsets: UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE and etc.
+          """.stripMargin
+        throw new CharConversionException(msg)
     }
   }
 }

From 31cd793a86e6a0e48e0150ffb8c36da2872c65ca Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Feb 2018 19:00:55 +0100
Subject: [PATCH 12/37] Fix for scala style checks

---
 .../apache/spark/sql/catalyst/json/JacksonParser.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 4f14e17ae6181..5ca823115e126 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -364,10 +364,10 @@ class JacksonParser(
       case e: CharConversionException if options.charset.isEmpty =>
         val msg = e.getMessage +
           """
-            |Charset was detected automatically. You might want to set it explicitly via the charset option:
-            |  .option("charset", "UTF-8")
-            |Example of supported charsets: UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE and etc.
-          """.stripMargin
+            |Charset was detected automatically. You might want to set it explicitly
+            |via the charset option like: .option("charset", "UTF-8")
+            |Example of supported charsets: """.stripMargin +
+          "UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE"
         throw new CharConversionException(msg)
     }
   }

From 6eacd186a954a3f724ee607826b17f432ead77e1 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Feb 2018 19:44:04 +0100
Subject: [PATCH 13/37] Run tests again

---
 .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5ca823115e126..b46b6f27c27fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -366,8 +366,8 @@ class JacksonParser(
           """
             |Charset was detected automatically. You might want to set it explicitly
             |via the charset option like: .option("charset", "UTF-8")
-            |Example of supported charsets: """.stripMargin +
-          "UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE"
+            |Example of supported charsets:""".stripMargin +
+          " UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE"
         throw new CharConversionException(msg)
     }
   }

From 3b4a509d0260cfab720a5471ccd937de55c56093 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Feb 2018 20:06:06 +0100
Subject: [PATCH 14/37] Improving the exception message

---
 .../spark/sql/catalyst/json/JacksonParser.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index b46b6f27c27fa..5fe975000c90f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -362,12 +362,12 @@ class JacksonParser(
       // `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e) case e: CharConversionException if options.charset.isEmpty => - val msg = e.getMessage + - """ - |Charset was detected automatically. You might want to set it explicitly - |via the charset option like: .option("charset", "UTF-8") - |Example of supported charsets:""".stripMargin + - " UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE" + val msg = + """Failed to parse a character. Charset was detected automatically. + |You might want to set it explicitly via the charset option like: + | .option("charset", "UTF-8") + |Example of supported charsets: """.stripMargin + + "UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE" throw new CharConversionException(msg) } } From cd1124ef7e6329f4dcd6926064271cd24b5a150d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 15 Feb 2018 20:41:35 +0100 Subject: [PATCH 15/37] Appended the original message to the exception --- .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 5fe975000c90f..8ff165a1032dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -366,8 +366,9 @@ class JacksonParser( """Failed to parse a character. Charset was detected automatically. |You might want to set it explicitly via the charset option like: | .option("charset", "UTF-8") - |Example of supported charsets: """.stripMargin + - "UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE" + |Example of supported charsets: + | UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE + |""".stripMargin + e.getMessage throw new CharConversionException(msg) } } From ebf53904151582eef6d95780ca30b773404ae141 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 17 Feb 2018 21:36:28 +0100 Subject: [PATCH 16/37] Multi-line reading of json file in utf-32 --- .../resources/json-tests/utf32beWithBOM.json | Bin 0 -> 388 bytes .../datasources/json/JsonSuite.scala | 19 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/resources/json-tests/utf32beWithBOM.json diff --git a/sql/core/src/test/resources/json-tests/utf32beWithBOM.json b/sql/core/src/test/resources/json-tests/utf32beWithBOM.json new file mode 100644 index 0000000000000000000000000000000000000000..82e3ba8abcaa8b1e436db3d7b216abe7d9ecc26f GIT binary patch literal 388 zcmbu(u?mAg5QO2aeG0Mm2~ryC6t*FSpi#jfBwC5DZt@STa99N`KK2gw=EiLOn%Nx@ zVmC7rJkjBe4@TTj;)NR8yZF2^TWCMbQ- Date: Sat, 17 Feb 2018 22:43:23 +0100 Subject: [PATCH 17/37] Autodetect charset of jsons in the multiline mode --- .../catalyst/json/CreateJacksonParser.scala | 12 +++++-- .../datasources/json/JsonDataSource.scala | 34 ++++++++++-------- .../resources/json-tests/utf32beWithBOM.json | Bin 388 -> 172 bytes .../datasources/json/JsonSuite.scala | 4 +-- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala index cf2e3f85dc4fe..16a9513d62399 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala @@ -49,7 +49,15 @@ private[sql] object CreateJacksonParser extends Serializable { } } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { - jsonFactory.createParser(record) + def inputStream(jsonFactory: JsonFactory, + is: InputStream, + charset: Option[String] + ): JsonParser = { + charset match { + case Some(cs) => + jsonFactory.createParser(new InputStreamReader(is, cs)) + case _ => + jsonFactory.createParser(is) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 657916fc185c0..6d8a258406617 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -148,6 +148,15 @@ object MultiLineJsonDataSource extends JsonDataSource { parsedOptions: JSONOptions): StructType = { val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths) val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions) + def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + val path = new Path(record.getPath()) + CreateJacksonParser.inputStream( + jsonFactory, + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + parsedOptions.charset + ) + } + JsonInferSchema.infer(sampled, parsedOptions, createParser) } @@ -170,33 +179,30 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { - val path = new Path(record.getPath()) - CreateJacksonParser.inputStream( - jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) - } - override def readFile( conf: Configuration, file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { + def inputStream = { + CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))) + } def partitionedFileString(ignored: Any): UTF8String = { - Utils.tryWithResource { - CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))) - } { inputStream => - UTF8String.fromBytes(ByteStreams.toByteArray(inputStream)) + Utils.tryWithResource(inputStream) { is => + UTF8String.fromBytes(ByteStreams.toByteArray(is)) } } + val charset = parser.options.charset val safeParser = new FailureSafeParser[InputStream]( - input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString), + input => parser.parse[InputStream](input, + CreateJacksonParser.inputStream(_, _, charset), + partitionedFileString + ), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) - safeParser.parse( - CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))) + safeParser.parse(inputStream) } } diff --git a/sql/core/src/test/resources/json-tests/utf32beWithBOM.json b/sql/core/src/test/resources/json-tests/utf32beWithBOM.json index 82e3ba8abcaa8b1e436db3d7b216abe7d9ecc26f..6c7733c577872845ed506ecb97ec70d68300198d 100644 GIT binary patch delta 17 YcmZo+Uc<=3!0_+?L=pFi=?RQn05X{cg#Z8m delta 117 zcmZ3(*upHr!0_)s0|P@e5OV>s0tioJapy#EJd$~JP-U$pU62KU?0g_D1>*FH0rs*% TPyvu&3J@y+aV-#ojEn^Uc4Q55 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index f4e9f0a71addf..76f8ce1fc36b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2087,8 +2087,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("multiline", "true") .json(testFile(fileName)) - checkAnswer(jsonDF, Seq( - Row("Chris", "Baird"), Row("Doug", "Rood") - )) + checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) } } From ef5e6c6ec607239864375053a6e921acd3deae96 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 17 Feb 2018 22:54:21 +0100 Subject: [PATCH 18/37] Test for reading a json in UTF-16LE in the multiline mode by using user's charset --- .../src/test/resources/json-tests/utf16LE.json | Bin 0 -> 98 bytes .../sql/execution/datasources/json/JsonSuite.scala | 11 +++++++++++ 2 files changed, 11 insertions(+) create mode 100644 sql/core/src/test/resources/json-tests/utf16LE.json diff --git a/sql/core/src/test/resources/json-tests/utf16LE.json b/sql/core/src/test/resources/json-tests/utf16LE.json new file mode 100644 index 0000000000000000000000000000000000000000..ce4117fd299dfcbc7089e7c0530098bfcaf5a27e GIT binary patch literal 98 zcmbi20w;GhFpeJpqLd9J2PYe#WR62N(?$cl`!==KvkHk Roq(bsb5ek+xfp7J7y!-s4k`cu literal 0 HcmV?d00001 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 76f8ce1fc36b6..a1ea9033d7a3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2089,4 +2089,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) } + + test("Use user's charset in reading of multi-line json in UTF-16LE") { + val fileName = "json-tests/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("charset", "UTF-16LE") + .json(testFile(fileName)) + + checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } } From f9b6ad141c7a1b9668fe0a2e4bdf6bdbdc54b98e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Feb 2018 04:08:37 +0100 Subject: [PATCH 19/37] Fix test: rename the test file - utf32be -> utf32BE --- .../{utf32beWithBOM.json => utf32BEWithBOM.json} | Bin 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/core/src/test/resources/json-tests/{utf32beWithBOM.json => utf32BEWithBOM.json} (100%) diff --git a/sql/core/src/test/resources/json-tests/utf32beWithBOM.json b/sql/core/src/test/resources/json-tests/utf32BEWithBOM.json similarity index 100% rename from sql/core/src/test/resources/json-tests/utf32beWithBOM.json rename to sql/core/src/test/resources/json-tests/utf32BEWithBOM.json From 3b7714c8bbe31475b4797e4303ded6c59634921a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Feb 2018 22:03:47 +0100 Subject: [PATCH 20/37] Fix code style --- .../spark/sql/catalyst/json/CreateJacksonParser.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index 16a9513d62399..969f2d84c94e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -49,10 +49,10 @@ private[sql] object CreateJacksonParser extends Serializable {
     }
   }
 
-  def inputStream(jsonFactory: JsonFactory,
-    is: InputStream,
-    charset: Option[String]
-  ): JsonParser = {
+  def inputStream(
+      jsonFactory: JsonFactory,
+      is: InputStream,
+      charset: Option[String]): JsonParser = {
     charset match {
       case Some(cs) =>
         jsonFactory.createParser(new InputStreamReader(is, cs))

From edb9167903c9e7667f6a536f139561ed3aadb6e6 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 18 Feb 2018 22:09:28 +0100
Subject: [PATCH 21/37] Prepending the create verb to the method name for readability

---
 .../sql/execution/datasources/json/JsonDataSource.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 6d8a258406617..ec39141557d95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -184,11 +184,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
     file: PartitionedFile,
     parser: JacksonParser,
     schema: StructType): Iterator[InternalRow] = {
-    def inputStream = {
+    def createInputStream() = {
       CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
     }
     def partitionedFileString(ignored: Any): UTF8String = {
-      Utils.tryWithResource(inputStream) { is =>
+      Utils.tryWithResource(createInputStream()) { is =>
         UTF8String.fromBytes(ByteStreams.toByteArray(is))
       }
     }
@@ -203,6 +203,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
       schema,
       parser.options.columnNameOfCorruptRecord)
 
-    safeParser.parse(inputStream)
+    safeParser.parse(createInputStream())
   }
 }

From 5ba2881c252f40f7c736232cd01c1421ba4b811c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 18 Feb 2018 22:20:04 +0100
Subject: [PATCH 22/37] Making createParser a separate private method

---
 .../catalyst/json/CreateJacksonParser.scala   |  2 +-
 .../datasources/json/JsonDataSource.scala     | 26 ++++++++++++-------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index 969f2d84c94e4..df393906557f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -52,7 +52,7 @@ private[sql] object CreateJacksonParser extends Serializable {
   def inputStream(
       jsonFactory: JsonFactory,
       is: InputStream,
-      charset: Option[String]): JsonParser = {
+      charset: Option[String] = None): JsonParser = {
     charset match {
       case Some(cs) =>
         jsonFactory.createParser(new InputStreamReader(is, cs))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index ec39141557d95..912ac64c55dc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -148,16 +148,12 @@ object MultiLineJsonDataSource extends JsonDataSource { parsedOptions: JSONOptions): StructType = { val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths) val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions) - def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { - val path = new Path(record.getPath()) - CreateJacksonParser.inputStream( - jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), - parsedOptions.charset - ) - } - JsonInferSchema.infer(sampled, parsedOptions, createParser) + JsonInferSchema.infer[PortableDataStream]( + sampled, + parsedOptions, + createParser(_, _, parsedOptions.charset) + ) } private def createBaseRdd( @@ -179,6 +175,18 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { + val path = new Path(record.getPath()) + CreateJacksonParser.inputStream( + jsonFactory, + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset + ) + } + override def readFile( conf: Configuration, file: PartitionedFile, From 1509e103f8b86393b5442d516ee283a16b7fa7e7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Feb 2018 22:27:01 +0100 Subject: [PATCH 23/37] Fix code style --- .../spark/sql/execution/datasources/json/JsonDataSource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 912ac64c55dc1..913b15c09b09b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -203,7 +203,8 @@ object MultiLineJsonDataSource extends JsonDataSource { val charset = parser.options.charset val safeParser = new FailureSafeParser[InputStream]( - input => parser.parse[InputStream](input, + input => parser.parse[InputStream]( + input, CreateJacksonParser.inputStream(_, _, charset), partitionedFileString ), From e3184b35e504ce46b82ee18babd3395b7d1fc34d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Feb 2018 22:17:43 +0100 Subject: [PATCH 24/37] Checks the charset option is supported --- python/pyspark/sql/tests.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 480815d27333f..95ca9cd665291 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -654,6 +654,12 @@ def test_multiLine_json(self): multiLine=True) self.assertEqual(people1.collect(), people_array.collect()) + def test_charset_json(self): + people1 = self.spark.read.option("charset", "UTF-8").json("python/test_support/sql/people.json") + people_array = self.spark.read.json("python/test_support/sql/people_array.json", + multiLine=True, charset="UTF-8") + self.assertEqual(people1.collect(), people_array.collect()) + def 
test_multiline_csv(self): ages_newlines = self.spark.read.csv( "python/test_support/sql/ages_newlines.csv", multiLine=True) From 87d259c7d190716a89016c85b7450d471b3481bf Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Feb 2018 22:19:02 +0100 Subject: [PATCH 25/37] Support charset as a parameter of the json method --- python/pyspark/sql/readwriter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index facc16bc53108..2add53d482ce5 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, - multiLine=None, allowUnquotedControlChars=None): + multiLine=None, allowUnquotedControlChars=None, charset=None): """ Loads JSON files and returns the results as a :class:`DataFrame`. @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. + :param charset: standard charset name, for example UTF-8, UTF-16 and UTF-32 If None is + set, the charset of input json will be detected automatically. >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes From 76c1d08af25f8f4717314d6ba1409476d63b2ffd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 19 Feb 2018 23:16:31 +0100 Subject: [PATCH 26/37] Test for charset different from utf-8 --- python/pyspark/sql/tests.py | 9 +++++---- python/test_support/sql/people_array_utf16le.json | Bin 0 -> 182 bytes 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 python/test_support/sql/people_array_utf16le.json diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 95ca9cd665291..fc019f2d1ebeb 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -655,10 +655,11 @@ def test_multiLine_json(self): self.assertEqual(people1.collect(), people_array.collect()) def test_charset_json(self): - people1 = self.spark.read.option("charset", "UTF-8").json("python/test_support/sql/people.json") - people_array = self.spark.read.json("python/test_support/sql/people_array.json", - multiLine=True, charset="UTF-8") - self.assertEqual(people1.collect(), people_array.collect()) + people_array = self.spark.read\ + .json("python/test_support/sql/people_array_utf16le.json", + multiLine=True, charset="UTF-16LE") + expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')] + self.assertEqual(people_array.collect(), expected) def test_multiline_csv(self): ages_newlines = self.spark.read.csv( diff --git a/python/test_support/sql/people_array_utf16le.json b/python/test_support/sql/people_array_utf16le.json new file mode 100644 index 0000000000000000000000000000000000000000..9c657fa30ac9c651076ff8aa3676baa400b121fb GIT binary patch literal 182 zcma!M;9^h!!fGfDVk Date: Tue, 20 Feb 2018 13:39:34 +0100 Subject: [PATCH 27/37] Description of the charset option of the json method --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0139913aaa4e2..cd271bbdf6183 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
    * per file</li>
+   * <li>`charset` (by default it is not set): allows to forcibly set one of standard basic
+   * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the charset
+   * is not specified (by default), the charset is detected automatically.</li>
    * </ul>
    *
    * @since 2.0.0

From f2f8ae72e024f39efaed8f93da11a7ebb0ef6870 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 21 Feb 2018 13:48:41 +0100
Subject: [PATCH 28/37] Minor changes in comments: added a period at the end of a sentence

---
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2add53d482ce5..5a45708f16fd7 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -237,7 +237,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                           characters (ASCII characters with value less than 32,
                                           including tab and line feed characters) or not.
-        :param charset: standard charset name, for example UTF-8, UTF-16 and UTF-32 If None is
+        :param charset: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
                         set, the charset of input json will be detected automatically.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')

From b451a03f900aa76365e44fe10419cd8345feae09 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 21 Feb 2018 20:47:58 +0100
Subject: [PATCH 29/37] Added a test for wrong charset name

---
 .../sql/execution/datasources/json/JsonSuite.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a1ea9033d7a3e..0e6624f39ebb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2100,4 +2100,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
   }
+
+  test("Unsupported charset name") {
+    val invalidCharset = "UTF-128"
+    val exception = intercept[SparkException] {
+      spark.read
+        .option("charset", invalidCharset)
+        .json(testFile("json-tests/utf16LE.json"))
+        .count()
+    }
+    val causedBy = exception.getCause
+
+    assert(causedBy.isInstanceOf[java.io.UnsupportedEncodingException])
+    assert(causedBy.getMessage.contains(invalidCharset))
+  }
 }

From c13c15946b077800d6d68fb77f0f4692cc9f3a17 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 21 Feb 2018 21:05:03 +0100
Subject: [PATCH 30/37] Test that the charset option is case-insensitive

---
 .../sql/execution/datasources/json/JsonSuite.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0e6624f39ebb2..f674ab352e98a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2114,4 +2114,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     assert(causedBy.isInstanceOf[java.io.UnsupportedEncodingException])
     assert(causedBy.getMessage.contains(invalidCharset))
   }
+
+  test("checking that the charset option is case agnostic") {
+    val fileName = "json-tests/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true") + .option("charset", "uTf-16lE") + .json(testFile(fileName)) + + checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } } From 1cb3ac055bbdc6da720564a105b71f2eea3f5b55 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 21 Feb 2018 21:07:19 +0100 Subject: [PATCH 31/37] Test: user specified wrong (but supported) charset --- .../execution/datasources/json/JsonSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index f674ab352e98a..17ba95a094580 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2125,4 +2125,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) } + + + test("specified charset is not matched to actual charset") { + val fileName = "json-tests/utf16LE.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val exception = intercept[SparkException] { + spark.read.schema(schema) + .option("mode", "FAILFAST") + .option("multiline", "true") + .option("charset", "UTF-16BE") + .json(testFile(fileName)) + .count() + } + val errMsg = exception.getMessage + + assert(errMsg.contains("Malformed records are detected in record parsing")) + } } From 108e8e783d1575ea3dc406d372215002a871b02a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 25 Feb 2018 21:09:16 +0100 Subject: [PATCH 32/37] Set charset as an option --- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 5a45708f16fd7..64f5507730a87 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -256,7 +256,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, timestampFormat=timestampFormat, multiLine=multiLine, - allowUnquotedControlChars=allowUnquotedControlChars) + allowUnquotedControlChars=allowUnquotedControlChars, charset=charset) if isinstance(path, basestring): path = [path] if type(path) == list: From 0d20cc699552e185b472202268407bd77aab3169 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Feb 2018 13:52:55 +0100 Subject: [PATCH 33/37] Test: saving to json in UTF-32BE --- .../execution/datasources/json/JsonSuite.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 17ba95a094580..57e75c675acf3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2142,4 +2142,22 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(errMsg.contains("Malformed records are detected in record parsing")) } + + test("save json in UTF-32BE") { + val charset = "UTF-32BE" + withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 
42))) + df.write + .option("charset", charset) + .format("json").mode("overwrite") + .save(path.getCanonicalPath) + val jsonFiles = new File(path.getCanonicalPath).listFiles() + .filter(_.isFile).filter(_.getName.endsWith("json")) + val written = jsonFiles.map { file => + scala.io.Source.fromFile(file, charset).mkString + }.mkString.trim.replaceAll(" ", "") + + assert(written == """{"_1":"Dog","_2":42}""") + } + } } From 54baf9fb0fe34fe3c5f2457a80149c7f0b3d2211 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Feb 2018 14:16:58 +0100 Subject: [PATCH 34/37] Taking user's charset for saved json --- .../execution/datasources/json/JsonFileFormat.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 0862c746fffad..8d422dd95bfff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.json +import java.nio.charset.{Charset, StandardCharsets} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -151,7 +153,16 @@ private[json] class JsonOutputWriter( context: TaskAttemptContext) extends OutputWriter with Logging { - private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) + private val charset = options.charset match { + case Some(charsetName) => Charset.forName(charsetName) + case _ => StandardCharsets.UTF_8 + } + + private val writer = CodecStreams.createOutputStreamWriter( + context, + new Path(path), + charset + ) // create the Generator without separator inserted between 2 records private[this] val gen = new JacksonGenerator(dataSchema, writer, options) From 1d50d945587832d1b29cfbd8b92d989728df2ef8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Feb 2018 14:39:51 +0100 Subject: [PATCH 35/37] Test: output charset is UTF-8 by default --- .../datasources/json/JsonSuite.scala | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 57e75c675acf3..0f8261d0cd8d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2143,6 +2143,22 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(errMsg.contains("Malformed records are detected in record parsing")) } + def readJsonFiles(path: String, charset: String): String = { + val jsonFiles = new File(path) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) + val content = jsonFiles.map { file => + scala.io.Source.fromFile(file, charset).mkString + } + val result = content + .mkString + .trim + .replaceAll(" ", "") + + result + } + test("save json in UTF-32BE") { val charset = "UTF-32BE" withTempPath { path => @@ -2151,13 +2167,38 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("charset", charset) .format("json").mode("overwrite") 
.save(path.getCanonicalPath) - val jsonFiles = new File(path.getCanonicalPath).listFiles() - .filter(_.isFile).filter(_.getName.endsWith("json")) - val written = jsonFiles.map { file => - scala.io.Source.fromFile(file, charset).mkString - }.mkString.trim.replaceAll(" ", "") + val written = readJsonFiles(path.getCanonicalPath, charset) + + assert(written == """{"_1":"Dog","_2":42}""") + } + } + + test("save json in default charset - UTF-8") { + withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write + .format("json").mode("overwrite") + .save(path.getCanonicalPath) + val written = readJsonFiles(path.getCanonicalPath, "UTF-8") assert(written == """{"_1":"Dog","_2":42}""") } } + + test("wrong output charset") { + val charset = "UTF-128" + val exception = intercept[SparkException] { + withTempPath { path => + val df = spark.createDataset(Seq((0))) + df.write + .option("charset", charset) + .format("json").mode("overwrite") + .save(path.getCanonicalPath) + } + } + val causedBy = exception.getCause.getCause.getCause + + assert(causedBy.isInstanceOf[java.nio.charset.UnsupportedCharsetException]) + assert(causedBy.getMessage == charset) + } } From bb537981d7fb2fb6fe359ebe70ffc0bc4f483a03 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 4 Mar 2018 15:05:35 +0100 Subject: [PATCH 36/37] Changing the readJsonFiles method for readability --- .../datasources/json/JsonSuite.scala | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0f8261d0cd8d8..8ad8914e31c3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2143,20 +2143,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(errMsg.contains("Malformed records are detected in record parsing")) } - def readJsonFiles(path: String, charset: String): String = { - val jsonFiles = new File(path) + def checkCharset( + expectedCharset: String, + pathToJsonFiles: String, + expectedContent: String + ): Unit = { + val jsonFiles = new File(pathToJsonFiles) .listFiles() .filter(_.isFile) .filter(_.getName.endsWith("json")) - val content = jsonFiles.map { file => - scala.io.Source.fromFile(file, charset).mkString + val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString } - val result = content + val cleanedContent = jsonContent .mkString .trim .replaceAll(" ", "") - result + assert(cleanedContent == expectedContent) } test("save json in UTF-32BE") { @@ -2167,9 +2171,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("charset", charset) .format("json").mode("overwrite") .save(path.getCanonicalPath) - val written = readJsonFiles(path.getCanonicalPath, charset) - assert(written == """{"_1":"Dog","_2":42}""") + checkCharset( + expectedCharset = charset, + pathToJsonFiles = path.getCanonicalPath, + expectedContent = """{"_1":"Dog","_2":42}""" + ) } } @@ -2179,9 +2186,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { df.write .format("json").mode("overwrite") .save(path.getCanonicalPath) - val written = readJsonFiles(path.getCanonicalPath, "UTF-8") - assert(written == """{"_1":"Dog","_2":42}""") + checkCharset( + expectedCharset = 
"UTF-8", + pathToJsonFiles = path.getCanonicalPath, + expectedContent = """{"_1":"Dog","_2":42}""" + ) } } From 961b48225b450f9053681405f594911896e3a7ff Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 4 Mar 2018 16:08:45 +0100 Subject: [PATCH 37/37] The test checks that json written by Spark can be read back --- .../datasources/json/JsonSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8ad8914e31c3c..0b18a6948035d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2072,6 +2072,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val fileName = "json-tests/utf16WithBOM.json" val schema = new StructType().add("firstName", StringType).add("lastName", StringType) val jsonDF = spark.read.schema(schema) + // The mode filters null rows produced because new line delimiter + // for UTF-8 is used by default. .option("mode", "DROPMALFORMED") .json(testFile(fileName)) @@ -2211,4 +2213,28 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(causedBy.isInstanceOf[java.nio.charset.UnsupportedCharsetException]) assert(causedBy.getMessage == charset) } + + test("read written json in UTF-16") { + val charset = "UTF-16" + case class Rec(f1: String, f2: Int) + withTempPath { path => + val ds = spark.createDataset(Seq( + ("a", 1), ("b", 2), ("c", 3)) + ).repartition(2) + ds.write + .option("charset", charset) + .format("json").mode("overwrite") + .save(path.getCanonicalPath) + val savedDf = spark + .read + .schema(ds.schema) + .option("charset", charset) + // Wrong (nulls) rows are produced because new line delimiter + // for UTF-8 is used by default. + .option("mode", "DROPMALFORMED") + .json(path.getCanonicalPath) + + checkAnswer(savedDf.toDF(), ds.toDF()) + } + } }