From 60d5828df1b81b17eedf0bf5d307e4cef2f4453b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 29 Apr 2018 21:33:45 +0200
Subject: [PATCH 1/4] Flexible format of the lineSep option

---
 .../spark/sql/catalyst/json/JSONOptions.scala | 34 ++++++++------
 .../datasources/text/TextOptions.scala        | 27 +++++++++--
 .../resources/test-data/utf32WithBOM.json     | Bin 0 -> 332 bytes
 .../datasources/json/JsonSuite.scala          | 42 +++++++++++++++---
 4 files changed, 81 insertions(+), 22 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/utf32WithBOM.json

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5f130af606e19..277fd94bedcd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -101,18 +101,6 @@ private[sql] class JSONOptions(
    */
   val encoding: Option[String] = parameters.get("encoding")
     .orElse(parameters.get("charset")).map { enc =>
-      // The following encodings are not supported in per-line mode (multiline is false)
-      // because they cause some problems in reading files with BOM which is supposed to
-      // present in the files with such encodings. After splitting input files by lines,
-      // only the first lines will have the BOM which leads to impossibility for reading
-      // the rest lines. Besides of that, the lineSep option must have the BOM in such
-      // encodings which can never present between lines.
-      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
-      val isBlacklisted = blacklist.contains(Charset.forName(enc))
-      require(multiLine || !isBlacklisted,
-        s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
-           | ${blacklist.mkString(", ")}""".stripMargin)
-
       val isLineSepRequired = !(multiLine == false &&
         Charset.forName(enc) != StandardCharsets.UTF_8 && lineSeparator.isEmpty)
       require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
@@ -120,8 +108,26 @@ private[sql] class JSONOptions(
     enc
   }
 
-  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
-    lineSep.getBytes(encoding.getOrElse("UTF-8"))
+  /**
+   * A sequence of bytes between two consecutive JSON records on read.
+   * The format of the `lineSep` option is:
+   *   selector (1 char) + separator spec (any length) | sequence of chars
+   *
+   * Currently the following selectors are supported:
+   *  - 'x' + a sequence of bytes in hexadecimal format. For example: "x0a 0d".
+   *    Hex pairs can be separated by any characters other than 0-9, A-F and a-f.
+   *  - '\' - reserved for a sequence of control chars like "\r\n"
+   *    and unicode escapes like "\u000D\u000A".
+   *  - 'r' and '/' - reserved for future use.
+   */
+  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
+      throw new NotImplementedError(s"The $reserved selector is not supported yet")
+    case lineSep =>
+      lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
   }
 
   val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
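A note on the hunk above: the 'x' selector keeps only the hex digits and pairs them into bytes, so the separator characters between pairs are free-form. A minimal sketch of that decoding, runnable in a Scala REPL and mirroring the patch's own expression:

  // Mirrors the 'x' branch of lineSeparatorInRead above.
  val lineSep = "x0a 0d"
  val bytes = lineSep
    .replaceAll("[^0-9A-Fa-f]", "")      // 'x' and separators drop out: "0a0d"
    .sliding(2, 2).toArray               // Array("0a", "0d")
    .map(Integer.parseInt(_, 16).toByte) // Array(0x0a, 0x0d), i.e. LF, CR
  assert(bytes.sameElements(Array[Byte](0x0a, 0x0d)))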
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index e4e201995faa2..1ca7961dad130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -43,15 +43,36 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
 
   val encoding: Option[String] = parameters.get(ENCODING)
 
+  /**
+   * A string between two consecutive text lines.
+   * Note: the option 'lineSep' uses a different default value in read and write.
+   */
   val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { lineSep =>
     require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
     lineSep
   }
 
-  // Note that the option 'lineSep' uses a different default value in read and write.
-  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
-    lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
+  /**
+   * A sequence of bytes between two consecutive text lines on read.
+   * The format of the `lineSep` option is:
+   *   selector (1 char) + separator spec (any length) | sequence of chars
+   *
+   * Currently the following selectors are supported:
+   *  - 'x' + a sequence of bytes in hexadecimal format. For example: "x0a 0d".
+   *    Hex pairs can be separated by any characters other than 0-9, A-F and a-f.
+   *  - '\' - reserved for a sequence of control chars like "\r\n"
+   *    and unicode escapes like "\u000D\u000A".
+   *  - 'r' and '/' - reserved for future use.
+   */
+  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
+      throw new NotImplementedError(s"The $reserved selector is not supported yet")
+    case lineSep =>
+      lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
   }
 
   val lineSeparatorInWrite: Array[Byte] =
     lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
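As a usage sketch for the text source (the SparkSession `spark` is assumed and the path is made up; the option names come from this patch):

  // Read CRLF-terminated lines, spelling the separator as hex bytes.
  val lines = spark.read
    .option("lineSep", "x0d 0a") // equivalent to "\r\n"
    .text("/tmp/crlf.txt")       // hypothetical input path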
diff --git a/sql/core/src/test/resources/test-data/utf32WithBOM.json b/sql/core/src/test/resources/test-data/utf32WithBOM.json
new file mode 100644
index 0000000000000000000000000000000000000000..b1a0596c184b88907fdad1414dc0c8dab98dbcd3
GIT binary patch
literal 332
zcmZQz`1hZIfuS0Rm4G-6h%2#AY;xCDs(fH)C|bAdP&h(YSCptLiP&H!SNdXPSl
z9+12a5Gz30IY1hupBVF;plV@mNP(JB3#7S#m|S

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
       val records = List(("a", 1), ("b", 2))
       val data = records
         .map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding))
-        .reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2)
+        .reduce((a1, a2) => a1 ++ lineSepInBytes ++ a2)
       val os = new FileOutputStream(path)
       os.write(data)
       os.close()
@@ -2353,9 +2361,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       (10, "\u000d\u000a", "UTF-32BE", false),
       (11, "\u000a\u000d", "UTF-8", true),
       (12, "===", "US-ASCII", false),
-      (13, "$^+", "utf-32le", true)
-    ).foreach {
-      case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum)
+      (13, "$^+", "utf-32le", true),
+      (14, "x00 0a 00 0d", "UTF-16BE", false),
+      (15, "x0a.00.00.00 0d.00.00.00", "UTF-32LE", true)
+    ).foreach { case (testNum, sep, encoding, inferSchema) =>
+      checkReadJson(sep, encoding, inferSchema, testNum)
     }
 
     // scalastyle:on nonascii
@@ -2408,4 +2418,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()),
       Row(badJson))
   }
+
+  test("Read json in UTF-16 with BOM") {
+    val fileName = "test-data/utf16WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiLine", false)
+      .option("lineSep", "0d.00 0a.00")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
+
+  test("Read json in UTF-32 with BOM") {
+    val fileName = "test-data/utf32WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiLine", false)
+      .option("lineSep", "x00 00 00 0a")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
 }
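The separator "x00 00 00 0a" in the UTF-32 test above decodes to the four bytes of a UTF-32BE newline, which would be awkward to spell as a plain string option because of the embedded NUL characters. A REPL sketch of the decoding (standard JVM charsets assumed):

  val sep = "x00 00 00 0a".replaceAll("[^0-9A-Fa-f]", "")
    .sliding(2, 2).toArray
    .map(Integer.parseInt(_, 16).toByte)
  assert(sep.sameElements("\n".getBytes("UTF-32BE"))) // 00 00 00 0a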
From 5f6f8b2fffb5f6b1e1921fcccae90b202d8f0ff9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 29 Apr 2018 22:06:40 +0200
Subject: [PATCH 2/4] Add ticket number to test titles

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f171938cd3a50..16a5a1b6787d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2419,7 +2419,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(badJson))
   }
 
-  test("Read json in UTF-16 with BOM") {
+  test("SPARK-24118: Read json in UTF-16 with BOM") {
     val fileName = "test-data/utf16WithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
@@ -2430,7 +2430,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
   }
 
-  test("Read json in UTF-32 with BOM") {
+  test("SPARK-24118: Read json in UTF-32 with BOM") {
     val fileName = "test-data/utf32WithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
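For context on the guard that the next patch restores: the JVM's "UTF-16" charset writes a BOM when encoding and expects one when decoding, so after splitting a file into lines only the first line carries the BOM, while the endianness-specific charsets do not use one. A REPL sketch of that behavior (standard JVM charsets assumed; the UTF-16 case only):

  val withBom = "{}".getBytes("UTF-16")   // fe ff 00 7b 00 7d, BOM precedes the payload
  val noBom = "{}".getBytes("UTF-16BE")   // 00 7b 00 7d, endianness fixed, no BOM
  assert(withBom.length == noBom.length + 2)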
From 4b3592f1c51185080474ba5512ea2c2c1472f902 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 29 Apr 2018 22:07:35 +0200
Subject: [PATCH 3/4] Revert restrictions for UTF-16 and UTF-32

---
 .../apache/spark/sql/catalyst/json/JSONOptions.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 277fd94bedcd0..21effd451cc57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -101,6 +101,18 @@ private[sql] class JSONOptions(
    */
   val encoding: Option[String] = parameters.get("encoding")
     .orElse(parameters.get("charset")).map { enc =>
+      // The following encodings are not supported in per-line mode (multiLine is false)
+      // because they cause problems in reading files with a BOM, which is supposed to be
+      // present in files with such encodings. After splitting input files by lines, only
+      // the first line will have the BOM, which makes it impossible to read the remaining
+      // lines. Besides that, the lineSep option encoded in such charsets would itself
+      // contain a BOM, which never occurs between lines.
+      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
+      val isBlacklisted = blacklist.contains(Charset.forName(enc))
+      require(multiLine || !isBlacklisted,
+        s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
+           | ${blacklist.mkString(", ")}""".stripMargin)
+
       val isLineSepRequired = !(multiLine == false &&
         Charset.forName(enc) != StandardCharsets.UTF_8 && lineSeparator.isEmpty)
       require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")

From bcc17e0aea4855ce6fed008b040ccd3a7c130ffc Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 1 May 2018 13:28:05 +0200
Subject: [PATCH 4/4] Fix a test: use multi-line files as input

---
 .../resources/test-data/utf16LEWithBOM.json    | Bin 0 -> 200 bytes
 .../test/resources/test-data/utf16WithBOM.json | Bin 200 -> 170 bytes
 .../execution/datasources/json/JsonSuite.scala | 4 ++--
 3 files changed, 2 insertions(+), 2 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/utf16LEWithBOM.json

diff --git a/sql/core/src/test/resources/test-data/utf16LEWithBOM.json b/sql/core/src/test/resources/test-data/utf16LEWithBOM.json
new file mode 100644
index 0000000000000000000000000000000000000000..cf4d29328b860ffe8288edea437222c6d432a100
GIT binary patch
literal 200
zcmezWFPedufr~)_3ac5E7}6Lr8HyN+8A=%Z7!nzB8B&2_RzU2`kO36W1j;Be=m6C#
tG2{T{G1WN%ML{N{09DiiRT68y3qw9bDMLB|(}RGj@}XvfOpXPc4*LCm}iTodzy0Xm)q)Bpeg

diff --git a/sql/core/src/test/resources/test-data/utf16WithBOM.json b/sql/core/src/test/resources/test-data/utf16WithBOM.json
GIT binary patch
delta 50
mcmZ3*c!H7t|G#JkUIs1(1qhrNE;~`qj8_L$ESN18EC2u(^a$br

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 16a5a1b6787d9..2870d6377acdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2173,7 +2173,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-23723: json in UTF-16 with BOM") {
-    val fileName = "test-data/utf16WithBOM.json"
+    val fileName = "test-data/utf16LEWithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
       .option("multiline", "true")
@@ -2424,7 +2424,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
       .option("multiLine", false)
-      .option("lineSep", "0d.00 0a.00")
+      .option("lineSep", "x0d.00 0a.00")
      .json(testFile(fileName))
 
     checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
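The corrected separator "x0d.00 0a.00" in the last hunk decodes to CRLF in UTF-16LE; the missing 'x' selector was the bug this patch fixes. A REPL sketch of the decoding (standard JVM charsets assumed):

  val sep = "x0d.00 0a.00".replaceAll("[^0-9A-Fa-f]", "") // dots and spaces drop out
    .sliding(2, 2).toArray
    .map(Integer.parseInt(_, 16).toByte)
  assert(sep.sameElements("\r\n".getBytes("UTF-16LE"))) // 0d 00 0a 00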