Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,26 @@ private[sql] class JSONOptions(
val encoding: Option[String] = parameters.get("encoding")
.orElse(parameters.get("charset")).map(checkedEncoding)

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.getOrElse("UTF-8"))
/**
* A sequence of bytes between two consecutive json records in read.
* Format of the `lineSep` option is:
* selector (1 char) + separator spec (any length) | sequence of chars
*
* Currently the following selectors are supported:
* - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think this one (sequence of bytes) should be supported via option APIs, not only for lineSep specifically. For example, I proposed an array type support in option - #16611. Maybe, we could try a similar approach.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your approach looks interesting but your PRs #20125 and #16611 stuck. Are there any specific reasons for that?

Maybe, we could try a similar approach.

We could but it requires extension of public API more widely than it is needed by this changes. Do you think it makes sense? Actually I would consider lineSep as a string in JSON format like:

.option("lineSep", "[0x00, 0x0A, 0x00, 0x0D]")

or

.option("lineSep", """{"sep": "\r\n", "encoding": "UTF-16LE"}""")

but for regular cases of simple lineSep, this approach looks pretty redundant:

.option("lineSep", ",") vs .option("lineSep", """{"sep": ","}""")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why they are stuck. I was just thinking of doing sequence of bytes separately since I feel in the same way about the need and there's a PR that potentially covers this case.
I am less sure about other formats yet - don't feel strongly but I don't have better alternatives yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for clarification, you will accept only the solution with an array for lineSep, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I thought and I hope the sequence of bytes alone is taken out here if you won't mind. In this case, I don't want to block this PR too likewise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I misunderstood this PR because you proposed a regex support too before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reserved the r prefix for that. It is hard to support it right now because it requires some support from Hadoop's LineReader.

* Hex pairs can be separated by any chars different from 0-9,A-F,a-f
* - '\' - reserved for a sequence of control chars like "\r\n"
* and unicode escape like "\u000D\u000A"
* - 'r' and '/' - reserved for future use
*/
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect {
case hexs if hexs.startsWith("x") =>
hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
.map(Integer.parseInt(_, 16).toByte)
case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
throw new NotImplementedError(s"The $reserved selector has not supported yet")
case lineSep =>
lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
}
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,36 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti

val encoding: Option[String] = parameters.get(ENCODING)

/**
* A string between two consecutive text lines.
* Note: the option 'lineSep' uses a different default value in read and write.
*/
val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { lineSep =>
require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")

lineSep
}

// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
/**
* A sequence of bytes between two consecutive text lines in read.
* Format of the `lineSep` option is:
* selector (1 char) + separator spec (any length) | sequence of chars
*
* Currently the following selectors are supported:
* - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
* Hex pairs can be separated by any chars different from 0-9,A-F,a-f
* - '\' - reserved for a sequence of control chars like "\r\n"
* and unicode escape like "\u000D\u000A"
* - 'r' and '/' - reserved for future use
*/
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.collect {
case hexs if hexs.startsWith("x") =>
hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
.map(Integer.parseInt(_, 16).toByte)
case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
throw new NotImplementedError(s"The $reserved selector has not supported yet")
case lineSep =>
lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
}
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
Expand Down
Binary file not shown.
Binary file modified sql/core/src/test/resources/test-data/utf16WithBOM.json
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2173,7 +2173,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

test("SPARK-23723: json in UTF-16 with BOM") {
val fileName = "test-data/utf16WithBOM.json"
val fileName = "test-data/utf16LEWithBOM.json"
val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
val jsonDF = spark.read.schema(schema)
.option("multiline", "true")
Expand Down Expand Up @@ -2339,12 +2339,20 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = {
test(s"SPARK-23724: checks reading json in ${encoding} #${id}") {
def lineSepInBytes: Array[Byte] = {
if (lineSep.startsWith("x")) {
lineSep.replaceAll("[^0-9A-Fa-f]", "")
.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
} else {
lineSep.getBytes(encoding)
}
}
val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
withTempPath { path =>
val records = List(("a", 1), ("b", 2))
val data = records
.map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding))
.reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2)
.reduce((a1, a2) => a1 ++ lineSepInBytes ++ a2)
val os = new FileOutputStream(path)
os.write(data)
os.close()
Expand Down Expand Up @@ -2377,9 +2385,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
(10, "\u000d\u000a", "UTF-32BE", false),
(11, "\u000a\u000d", "UTF-8", true),
(12, "===", "US-ASCII", false),
(13, "$^+", "utf-32le", true)
).foreach {
case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum)
(13, "$^+", "utf-32le", true),
(14, "x00 0a 00 0d", "UTF-16BE", false),
(15, "x0a.00.00.00 0d.00.00.00", "UTF-32LE", true)
).foreach { case (testNum, sep, encoding, inferSchema) =>
checkReadJson(sep, encoding, inferSchema, testNum)
}
// scalastyle:on nonascii

Expand Down Expand Up @@ -2494,4 +2504,26 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(exception.getMessage.contains("encoding must not be included in the blacklist"))
}
}

test("SPARK-24118: Read json in UTF-16 with BOM") {
val fileName = "test-data/utf16WithBOM.json"
val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
val jsonDF = spark.read.schema(schema)
.option("multiLine", false)
.option("lineSep", "x0d.00 0a.00")
.json(testFile(fileName))

checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
}

test("SPARK-24118: Read json in UTF-32 with BOM") {
val fileName = "test-data/utf32WithBOM.json"
val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
val jsonDF = spark.read.schema(schema)
.option("multiLine", false)
.option("lineSep", "x00 00 00 0a")
.json(testFile(fileName))

checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
}
}