@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

private[sql] object CreateJacksonParser extends Serializable {
@@ -60,4 +61,13 @@ private[sql] object CreateJacksonParser extends Serializable {
jsonFactory.createParser(is)
}
}

def internalRow(
jsonFactory: JsonFactory,
row: InternalRow,
charset: Option[String] = None
): JsonParser = {
val is = new ByteArrayInputStream(row.getBinary(0))
Collaborator:

it's weird to assume the data is in the first column of the input row. How about

def bytes(jsonFactory: JsonFactory, bytes: Array[Byte], charset: Option[String])
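For illustration, a minimal sketch of the suggested helper, reusing the existing inputStream factory from this object; the default value for charset is an assumption, not part of the suggestion:

// Hypothetical sketch: parse JSON from a raw byte array instead of an InternalRow,
// delegating charset handling to the existing inputStream factory.
def bytes(
    jsonFactory: JsonFactory,
    bytes: Array[Byte],
    charset: Option[String] = None): JsonParser = {
  val is = new ByteArrayInputStream(bytes)
  inputStream(jsonFactory, is, charset)
}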

inputStream(jsonFactory, is, charset)
}
}
@@ -91,6 +91,31 @@ private[sql] class JSONOptions(
*/
val charset: Option[String] = parameters.get("charset")

/**
* A sequence of bytes between two consecutive JSON records. The format of the option is:
* selector (1 char) + delimiter body (any length)
* The following selectors are supported:
* - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
* Hex pairs can be separated by any characters other than 0-9, A-F, a-f
* - '\' - reserved for sequences of control chars like "\r\n"
* and unicode escapes like "\u000D\u000A"
* - 'r' - specifies a regular expression
* - 'none' - JSON records are not divided by any delimiter
*
* Note: the option defines a delimiter for the JSON reader only; the JSON writer
* uses '\n' as the delimiter of output records (it is converted to a sequence of
* bytes according to the charset)
*/
val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect {
case hexs if hexs.startsWith("x") =>
hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
.map(Integer.parseInt(_, 16).toByte)
case reserved if reserved.startsWith("r") || reserved.startsWith("none") =>
throw new NotImplementedError(s"the $reserved selector is not supported yet")
case delim => delim.getBytes(charset.getOrElse(
throw new IllegalArgumentException("Please set the charset option for the delimiter")))
}
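For illustration, a hedged usage sketch of the recordDelimiter format documented above; the path and encoding here are placeholder examples, not values from this PR:

// Hypothetical usage: UTF-16LE encoded JSON whose records are separated by a
// UTF-16LE newline (bytes 0x0a 0x00), written with the 'x' selector.
val jsonDF = spark.read
  .option("charset", "UTF-16LE")
  .option("recordDelimiter", "x0a 00")
  .json("/path/to/utf16le.json")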

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
@@ -102,4 +127,10 @@
allowBackslashEscapingAnyCharacter)
factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
}

def getTextOptions: Map[String, String] = {
recordDelimiter.map{ bytes =>
"recordDelimiter" -> bytes.map("%02x".format(_)).mkString
}.toMap
}
}
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
* in that file.
*/
class HadoopFileLinesReader(
file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
file: PartitionedFile,
conf: Configuration,
recordDelimiter: Option[Array[Byte]] = None
) extends Iterator[Text] with Closeable {
private val iterator = {
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
@@ -42,7 +45,10 @@
Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
val reader = new LineRecordReader()
val reader = recordDelimiter match {
case Some(delim) => new LineRecordReader(delim)
case _ => new LineRecordReader()
@HyukjinKwon (Mar 19, 2018):

The problem here is that we can't cover the case where only charset is set. LineRecordReader covers \r, \r\n and \n in UTF-8 by default; if charset alone is given, it should cover those delimiters likewise.

BTW, I am leaving all these comments because I walked through an almost identical approach a while ago in my local branch and decided not to propose that change.
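
For illustration only, a sketch of what covering those defaults for an arbitrary charset would mean; the charset value here is an assumed example, not code from the PR:

// LineRecordReader's default mode matches "\r", "\n" and "\r\n" as UTF-8 bytes.
// With only the charset option given, the equivalent delimiters would be:
val charset = "UTF-16LE"  // assumed user-supplied value
val defaultDelimiters: Seq[Array[Byte]] =
  Seq("\r\n", "\r", "\n").map(_.getBytes(charset))
// A custom LineRecordReader delimiter is a single byte sequence, so all three
// variants cannot be passed at once; that is the gap being discussed here.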

One more note: this was where my attempt ended. I gave up, concluding that we would need a complete rewrite of LineRecordReader and that it wasn't worth it.

Collaborator:

How about we force users to provide a custom recordDelimiter if charset is specified?
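
For illustration, a minimal sketch of what that check could look like; placing it in JSONOptions and the exact message are assumptions, not part of the PR:

// Hypothetical validation: reject a charset given without an explicit record delimiter.
require(
  charset.isEmpty || recordDelimiter.isDefined,
  "The recordDelimiter option must be set when the charset option is specified")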

I believe arguably the more common case is the one where charset is set alone. It looks a bit odd that the charset option depends on recordDelimiter.
If I understood correctly, charset can't be used alone in that way, but I believe that's what most users would try, because the same charset option exists in CSV.

Collaborator:

How does CSV solve this problem?

CSV has the same issue; that option is incomplete. So I believe we currently support only ASCII-compatible encodings in CSV.

For example, see:

curl -O https://raw.githubusercontent.com/HyukjinKwon/spark/264a1dc603164bd264e0c084608f31ffb8ad5f69/sql/core/src/test/resources/cars_utf-16.csv
scala> spark.read.option("encoding", "utf-16").option("header", true).csv("cars_utf-16.csv").show()
+----+-----+-----+--------------------+------+
|year| make|model|             comment|blank�|
+----+-----+-----+--------------------+------+
|2012|Tesla|    S|          No comment|     �|
|   �| null| null|                null|  null|
|1997| Ford| E350|Go get one now th...|     �|
|2015|Chevy|Volt�|                null|  null|
+----+-----+-----+--------------------+------+

So, I was thinking we should rather deprecate this option in CSV.

I had an offline discussion with @cloud-fan about #1 (comment). I am fine with that. At least there seems to be no hole in it.

}
reader.initialize(fileSplit, hadoopAttemptContext)
new RecordReaderIterator(reader)
}
@@ -33,6 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -92,25 +93,33 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
inferFromDataset(json, parsedOptions)
}

def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd

JsonInferSchema.infer[InternalRow](
rdd,
parsedOptions,
CreateJacksonParser.internalRow(_, _, parsedOptions.charset)
)
}

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): Dataset[String] = {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions
): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
className = classOf[TextFileFormat].getName,
options = parsedOptions.getTextOptions
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -120,7 +129,7 @@ object TextInputJsonDataSource extends JsonDataSource {
file: PartitionedFile,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file, conf)
val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordDelimiter)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val charset = parser.options.charset

@@ -113,18 +113,24 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
readToUnsafeMem(
broadcastedHadoopConf,
requiredSchema,
textOptions.wholeText,
textOptions.recordDelimiter
)
}

private def readToUnsafeMem(
conf: Broadcast[SerializableConfiguration],
requiredSchema: StructType,
wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {
wholeTextMode: Boolean,
recordDelimiter: Option[Array[Byte]]): (PartitionedFile) => Iterator[UnsafeRow] = {

(file: PartitionedFile) => {
val confValue = conf.value.value
val reader = if (!wholeTextMode) {
new HadoopFileLinesReader(file, confValue)
new HadoopFileLinesReader(file, confValue, recordDelimiter)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
@@ -39,9 +39,13 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

val recordDelimiter: Option[Array[Byte]] = parameters.get(RECORDDELIMITER).map { hex =>
hex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
}
}

private[text] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
val RECORDDELIMITER = "recordDelimiter"
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.json

import java.io.{File, StringWriter}
import java.io.{File, FileOutputStream, StringWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.Locale
@@ -2072,9 +2072,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val fileName = "json-tests/utf16WithBOM.json"
val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
val jsonDF = spark.read.schema(schema)
// The mode filters null rows produced because new line delimiter
// for UTF-8 is used by default.
.option("mode", "DROPMALFORMED")
.option("recordDelimiter", "x0d 00 0a 00")
.json(testFile(fileName))

checkAnswer(jsonDF, Seq(
@@ -2214,27 +2212,88 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(causedBy.getMessage == charset)
}

test("read written json in UTF-16") {
val charset = "UTF-16"
case class Rec(f1: String, f2: Int)
withTempPath { path =>
val ds = spark.createDataset(Seq(
("a", 1), ("b", 2), ("c", 3))
).repartition(2)
ds.write
.option("charset", charset)
.format("json").mode("overwrite")
.save(path.getCanonicalPath)
val savedDf = spark
.read
.schema(ds.schema)
.option("charset", charset)
// Wrong (nulls) rows are produced because new line delimiter
// for UTF-8 is used by default.
.option("mode", "DROPMALFORMED")
.json(path.getCanonicalPath)
def checkReadWrittenJson(charset: String, delimiter: String, runId: Int): Unit = {
test(s"checks Spark is able to read json written by Spark itself #${runId}") {
withTempPath { path =>
val ds = spark.createDataset(Seq(
("a", 1), ("b", 2), ("c", 3))
).repartition(1)
ds.write
.option("charset", charset)
.format("json").mode("overwrite")
.save(path.getCanonicalPath)
val savedDf = spark
.read
.schema(ds.schema)
.option("charset", charset)
.option("recordDelimiter", delimiter)
.json(path.getCanonicalPath)

checkAnswer(savedDf.toDF(), ds.toDF())
}
}
}

checkAnswer(savedDf.toDF(), ds.toDF())
List(
("\n", "UTF-8"),
("x00 0a", "UTF-16BE"),
("\n", "UTF-16LE"),
("\u000a", "UTF-32BE"),
("x0a 00 00 00", "UTF-32LE")
).zipWithIndex.foreach{case ((d, c), i) => checkReadWrittenJson(c, d, i)}

def checkReadJson(
charset: String,
delimiter: String,
inferSchema: Boolean,
runId: Int
): Unit = {
test(s"checks reading json in ${charset} #${runId}") {
val delimInBytes = {
if (delimiter.startsWith("x")) {
delimiter.replaceAll("[^0-9A-Fa-f]", "")
.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
} else {
delimiter.getBytes(charset)
}
}
case class Rec(f1: String, f2: Int) {
def json = s"""{"f1":"${f1}", "f2":$f2}"""
def bytes = json.getBytes(charset)
def row = Row(f1, f2)
}
val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
withTempPath { path =>
val records = List(Rec("a", 1), Rec("b", 2))
val data = records.map(_.bytes).reduce((a1, a2) => a1 ++ delimInBytes ++ a2)
val os = new FileOutputStream(path)
os.write(data)
os.close()
val reader = if (inferSchema) {
spark.read
} else {
spark.read.schema(schema)
}
val savedDf = reader
.option("charset", charset)
.option("recordDelimiter", delimiter)
.json(path.getCanonicalPath)
checkAnswer(savedDf, records.map(_.row))
}
}
}

List(
("sep", "UTF-8", false),
("x00 0a 00 0d", "UTF-16BE", false),
("x00 0a 00 0d", "UTF-16BE", true),
("\r\n", "UTF-16LE", false),
("\r\n", "UTF-16LE", true),
("\u000d\u000a", "UTF-32BE", false),
("\u000a\u000d", "UTF-32BE", true),
("===", "UTF-32LE", false),
("$^+", "UTF-32LE", true),
("xEA.F3.EA.F3", "CP1251", false),
("xEA.F3.EA.F3", "CP1251", true)
).zipWithIndex.foreach{case ((d, c, s), i) => checkReadJson(c, d, s, i)}
}