diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 47eeb70e00427..ad4d3c472a2cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -108,8 +108,26 @@ private[sql] class JSONOptions( val encoding: Option[String] = parameters.get("encoding") .orElse(parameters.get("charset")).map(checkedEncoding) - val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => - lineSep.getBytes(encoding.getOrElse("UTF-8")) + /** + * A sequence of bytes between two consecutive json records in read. + * Format of the `lineSep` option is: + * selector (1 char) + separator spec (any length) | sequence of chars + * + * Currently the following selectors are supported: + * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". 
+ * Hex pairs can be separated by any characters other than 0-9, A-F, a-f + * - '\' - reserved for a sequence of control chars like "\r\n" + * and unicode escape like "\u000D\u000A" + * - 'r' and '/' - reserved for future use + */ + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect { + case hexs if hexs.startsWith("x") => + hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray + .map(Integer.parseInt(_, 16).toByte) + case reserved if reserved.startsWith("r") || reserved.startsWith("/") => + throw new NotImplementedError(s"The $reserved selector is not supported yet") + case lineSep => + lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8)) } val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index e4e201995faa2..1ca7961dad130 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -43,15 +43,36 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti val encoding: Option[String] = parameters.get(ENCODING) + /** + * A string between two consecutive text lines. + * Note: the option 'lineSep' uses a different default value in read and write. + */ val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { lineSep => require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") lineSep } - // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => - lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8)) + /** + * A sequence of bytes between two consecutive text lines in read. 
+ * Format of the `lineSep` option is: + * selector (1 char) + separator spec (any length) | sequence of chars + * + * Currently the following selectors are supported: + * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". + * Hex pairs can be separated by any characters other than 0-9, A-F, a-f + * - '\' - reserved for a sequence of control chars like "\r\n" + * and unicode escape like "\u000D\u000A" + * - 'r' and '/' - reserved for future use + */ + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect { + case hexs if hexs.startsWith("x") => + hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray + .map(Integer.parseInt(_, 16).toByte) + case reserved if reserved.startsWith("r") || reserved.startsWith("/") => + throw new NotImplementedError(s"The $reserved selector is not supported yet") + case lineSep => + lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8)) } val lineSeparatorInWrite: Array[Byte] = lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8)) diff --git a/sql/core/src/test/resources/test-data/utf16LEWithBOM.json b/sql/core/src/test/resources/test-data/utf16LEWithBOM.json new file mode 100644 index 0000000000000..cf4d29328b860 Binary files /dev/null and b/sql/core/src/test/resources/test-data/utf16LEWithBOM.json differ diff --git a/sql/core/src/test/resources/test-data/utf16WithBOM.json b/sql/core/src/test/resources/test-data/utf16WithBOM.json index cf4d29328b860..65e7e2f729481 100644 Binary files a/sql/core/src/test/resources/test-data/utf16WithBOM.json and b/sql/core/src/test/resources/test-data/utf16WithBOM.json differ diff --git a/sql/core/src/test/resources/test-data/utf32WithBOM.json b/sql/core/src/test/resources/test-data/utf32WithBOM.json new file mode 100644 index 0000000000000..b1a0596c184b8 Binary files /dev/null and b/sql/core/src/test/resources/test-data/utf32WithBOM.json differ diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index eab15b35c97d3..d8b4fa3147f08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2173,7 +2173,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-23723: json in UTF-16 with BOM") { - val fileName = "test-data/utf16WithBOM.json" + val fileName = "test-data/utf16LEWithBOM.json" val schema = new StructType().add("firstName", StringType).add("lastName", StringType) val jsonDF = spark.read.schema(schema) .option("multiline", "true") @@ -2339,12 +2339,20 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = { test(s"SPARK-23724: checks reading json in ${encoding} #${id}") { + def lineSepInBytes: Array[Byte] = { + if (lineSep.startsWith("x")) { + lineSep.replaceAll("[^0-9A-Fa-f]", "") + .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) + } else { + lineSep.getBytes(encoding) + } + } val schema = new StructType().add("f1", StringType).add("f2", IntegerType) withTempPath { path => val records = List(("a", 1), ("b", 2)) val data = records .map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding)) - .reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2) + .reduce((a1, a2) => a1 ++ lineSepInBytes ++ a2) val os = new FileOutputStream(path) os.write(data) os.close() @@ -2377,9 +2385,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { (10, "\u000d\u000a", "UTF-32BE", false), (11, "\u000a\u000d", "UTF-8", true), (12, "===", "US-ASCII", false), - (13, "$^+", "utf-32le", true) - ).foreach { - case (testNum, sep, encoding, inferSchema) => 
checkReadJson(sep, encoding, inferSchema, testNum) + (13, "$^+", "utf-32le", true), + (14, "x00 0a 00 0d", "UTF-16BE", false), + (15, "x0a.00.00.00 0d.00.00.00", "UTF-32LE", true) + ).foreach { case (testNum, sep, encoding, inferSchema) => + checkReadJson(sep, encoding, inferSchema, testNum) } // scalastyle:on nonascii @@ -2494,4 +2504,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exception.getMessage.contains("encoding must not be included in the blacklist")) } } + + test("SPARK-24118: Read json in UTF-16 with BOM") { + val fileName = "test-data/utf16WithBOM.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val jsonDF = spark.read.schema(schema) + .option("multiLine", false) + .option("lineSep", "x0d.00 0a.00") + .json(testFile(fileName)) + + checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-24118: Read json in UTF-32 with BOM") { + val fileName = "test-data/utf32WithBOM.json" + val schema = new StructType().add("firstName", StringType).add("lastName", StringType) + val jsonDF = spark.read.schema(schema) + .option("multiLine", false) + .option("lineSep", "x00 00 00 0a") + .json(testFile(fileName)) + + checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } }