diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 6ec634db780b2..caba243e02b36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -85,35 +85,59 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - /** - * A sequence of bytes between two consecutive json records. - */ - val lineSeparator: Option[String] = parameters.get("lineSep") - /** * Standard charset name. For example UTF-8, UTF-16LE and UTF-32BE. * If the encoding is not specified (None), it will be detected automatically. */ val encoding: Option[String] = parameters.get("encoding") .orElse(parameters.get("charset")).map { enc => - val blacklist = List("UTF16", "UTF32") - val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) - require(multiLine || !isBlacklisted, - s"""The ${enc} encoding must not be included in the blacklist: - | ${blacklist.mkString(", ")}""".stripMargin) - - val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) - require(forcingLineSep, - s"""The lineSep option must be specified for the $enc encoding. - |Example: .option("lineSep", "|^|") - |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin) - enc + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, + s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + enc } - val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => - lineSep.getBytes(encoding.getOrElse("UTF-8")) + /** + * A sequence of bytes between two consecutive json objects. 
+ * Format of the option is: + * selector (1 char) + separator spec (any length) | sequence of chars + * + * Currently the following selectors are supported: + * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". + * Hex pairs can be separated by any chars different from 0-9,A-F,a-f + * - '\' - reserved for a sequence of control chars like "\r\n" + * and unicode escape like "\u000D\u000A" + * - 'r' and '/' - reserved for future use + */ + val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect { + case hexs if hexs.startsWith("x") => + hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray + .map(Integer.parseInt(_, 16).toByte) + case reserved if reserved.startsWith("r") || reserved.startsWith("/") => + throw new NotImplementedError(s"The $reserved selector is not supported yet") + case "" => throw new IllegalArgumentException("lineSep cannot be an empty string") + case lineSep => lineSep.getBytes(encoding.getOrElse("UTF-8")) + }.orElse { + val forcingLineSep = multiLine || encoding.isEmpty || encoding == Some("UTF-8") + require(forcingLineSep, + s"""The lineSep option must be specified for the ${encoding.get} encoding. + |Example: .option("lineSep", "|^|") + |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin) + None } - val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n") + + /** + * A sequence of bytes between two consecutive json objects used by JSON Reader to + * split input stream/text. + */ + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator + /** + * JSON Writer puts the string between json objects in output stream/text. + */ + val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator /** Sets config options on a Jackson [[JsonFactory]]. 
*/ def setJacksonOptions(factory: JsonFactory): Unit = { @@ -128,7 +152,7 @@ private[sql] class JSONOptions( } def getTextOptions: Map[String, String] = { - Map[String, String]() ++ - encoding.map("encoding" -> _) ++ lineSeparator.map("lineSep" -> _) + Map[String, String]() ++ encoding.map("encoding" -> _) ++ + lineSeparator.map("lineSep" -> _.map("x%02x".format(_)).mkString) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9c413de752a8c..11073bb3343c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.json import java.io.Writer -import java.nio.charset.StandardCharsets +import java.nio.charset.Charset import com.fasterxml.jackson.core._ @@ -75,7 +75,12 @@ private[sql] class JacksonGenerator( private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) - private val lineSeparator: String = options.lineSeparatorInWrite + private val lineSeparator: String = { + new String( + options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte)), + Charset.forName(options.encoding.getOrElse("UTF-8")) + ) + } private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => @@ -255,7 +260,6 @@ private[sql] class JacksonGenerator( } def writeLineEnding(): Unit = { - // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. 
gen.writeRaw(lineSeparator) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 9647f09867643..9e67abac7ec20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -86,7 +86,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context) + new TextOutputWriter(path, dataSchema, textOptions, context) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -149,10 +149,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { class TextOutputWriter( path: String, dataSchema: StructType, - lineSeparator: Array[Byte], + options: TextOptions, context: TaskAttemptContext) extends OutputWriter { + private val lineSeparator = options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte)) + private val writer = CodecStreams.createOutputStream(context, new Path(path)) override def write(row: InternalRow): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala index 53b95f01c4613..83852123708e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala @@ -43,16 +43,37 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti val encoding: Option[String] = parameters.get(ENCODING) - val lineSeparator: Option[Array[Byte]] = 
parameters.get(LINE_SEPARATOR).map { lineSep => - require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") - - lineSep.getBytes(encoding.getOrElse("UTF-8")) + /** + * A sequence of bytes between two consecutive lines in a text. + * Format of the option is: + * selector (1 char) + separator spec (any length) | sequence of chars + * + * Currently the following selectors are supported: + * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d". + * Hex pairs can be separated by any chars different from 0-9,A-F,a-f + * - '\' - reserved for a sequence of control chars like "\r\n" + * and unicode escape like "\u000D\u000A" + * - 'r' and '/' - reserved for future use + */ + val lineSeparator: Option[Array[Byte]] = parameters.get(LINE_SEPARATOR).collect { + case hexs if hexs.startsWith("x") => + hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray + .map(Integer.parseInt(_, 16).toByte) + case reserved if reserved.startsWith("r") || reserved.startsWith("/") => + throw new NotImplementedError(s"The $reserved selector is not supported yet") + case "" => throw new IllegalArgumentException("lineSep cannot be an empty string") + case lineSep => lineSep.getBytes(encoding.getOrElse("UTF-8")) } - // Note that the option 'lineSep' uses a different default value in read and write. + /** + * A sequence of bytes between two consecutive lines used by Text Reader to + * split input stream/text. + */ val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator - val lineSeparatorInWrite: Array[Byte] = - lineSeparatorInRead.getOrElse("\n".getBytes("UTF-8")) + /** + * Text Writer puts the string between lines in output stream/text. 
+ */ + val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator } private[datasources] object TextOptions { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 2b49279059aed..eb3803f17bfe9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2349,7 +2349,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ("\u000d\u000a", "encoding", "UTF-32BE", false), ("\u000a\u000d", "encoding", "UTF-8", true), ("===", "encoding", "US-ASCII", false), - ("$^+", "encoding", "utf-32le", true) + ("$^+", "encoding", "utf-32le", true), + ("x00 0a 00 0d", "encoding", "UTF-16BE", false), + ("x0a.00.00.00 0d.00.00.00", "encoding", "UTF-32LE", true) ).zipWithIndex.foreach{case ((d, o, c, s), i) => checkReadJson(d, o, c, s, i)} // scalastyle:on nonascii