diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index df393906557f..f0c86d2cdcc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.hadoop.io.Text
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object CreateJacksonParser extends Serializable {
@@ -60,4 +61,13 @@ private[sql] object CreateJacksonParser extends Serializable {
       jsonFactory.createParser(is)
     }
   }
+
+  def internalRow(
+      jsonFactory: JsonFactory,
+      row: InternalRow,
+      charset: Option[String] = None
+  ): JsonParser = {
+    val is = new ByteArrayInputStream(row.getBinary(0))
+    inputStream(jsonFactory, is, charset)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index c261778421c1..b4deab202026 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -91,6 +91,31 @@ private[sql] class JSONOptions(
    */
   val charset: Option[String] = parameters.get("charset")
 
+  /**
+   * A sequence of bytes between two consecutive json records. The format of the option is:
+   *   selector (1 char) + delimiter body (any length)
+   * The following selectors are supported:
+   *   - 'x' + a sequence of bytes in hexadecimal format. For example: "x0a 0d".
+   *     Hex pairs can be separated by any characters other than 0-9, A-F, a-f.
+   *   - '\' - reserved for sequences of control chars like "\r\n"
+   *     and unicode escapes like "\u000D\u000A"
+   *   - 'r' - specifies a regular expression
+   *   - 'none' - json records are not divided by any delimiter
+   *
+   * Note: the option defines a delimiter for the json reader only; the json writer
+   * uses '\n' as the delimiter of output records (converted to a sequence of
+   * bytes according to the charset).
+   */
+  val recordDelimiter: Option[Array[Byte]] = parameters.get("recordDelimiter").collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("none") =>
+      throw new NotImplementedError(s"the $reserved selector is not supported yet")
+    case delim => delim.getBytes(charset.getOrElse(
+      throw new IllegalArgumentException("Please set the charset option for the delimiter")))
+  }
+
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
     factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
@@ -102,4 +127,10 @@ private[sql] class JSONOptions(
       allowBackslashEscapingAnyCharacter)
     factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
+
+  def getTextOptions: Map[String, String] = {
+    recordDelimiter.map { bytes =>
+      "recordDelimiter" -> bytes.map("%02x".format(_)).mkString
+    }.toMap
+  }
 }
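To make the option format described in the `JSONOptions` scaladoc above concrete, here is a minimal usage sketch (not part of the patch). The file path and schema are made up; only the `charset` and `recordDelimiter` option names come from this change:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// "x0d 00 0a 00" uses the 'x' selector: CRLF ("\r\n") encoded in UTF-16LE,
// written as hex pairs separated by spaces.
val df = spark.read
  .schema(new StructType().add("firstName", StringType).add("lastName", StringType))
  .option("charset", "UTF-16LE")
  .option("recordDelimiter", "x0d 00 0a 00")
  .json("/path/to/people-utf16le.json")
```

A delimiter given without the 'x' selector falls through to the last `case` and is encoded with the `charset` option, so `charset` has to be set explicitly in that case.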
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 83cf26c63a17..0e5a7462770b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -32,7 +32,10 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
  * in that file.
  */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    conf: Configuration,
+    recordDelimiter: Option[Array[Byte]] = None
+  ) extends Iterator[Text] with Closeable {
   private val iterator = {
     val fileSplit = new FileSplit(
       new Path(new URI(file.filePath)),
@@ -42,7 +45,10 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = recordDelimiter match {
+      case Some(delim) => new LineRecordReader(delim)
+      case _ => new LineRecordReader()
+    }
     reader.initialize(fileSplit, hadoopAttemptContext)
     new RecordReaderIterator(reader)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 913b15c09b09..33c4d47402dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,6 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -92,25 +93,33 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
     inferFromDataset(json, parsedOptions)
   }
 
   def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
     val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
-    val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
-    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+    val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+
+    JsonInferSchema.infer[InternalRow](
+      rdd,
+      parsedOptions,
+      CreateJacksonParser.internalRow(_, _, parsedOptions.charset)
+    )
   }
 
   private def createBaseDataset(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): Dataset[String] = {
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions
+  ): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
-        className = classOf[TextFileFormat].getName
+        className = classOf[TextFileFormat].getName,
+        options = parsedOptions.getTextOptions
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
@@ -120,7 +129,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       file: PartitionedFile,
       parser: JacksonParser,
       schema: StructType): Iterator[InternalRow] = {
-    val linesReader = new HadoopFileLinesReader(file, conf)
+    val linesReader = new HadoopFileLinesReader(file, conf, parser.options.recordDelimiter)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
 
     val charset = parser.options.charset
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index c661e9bd3b94..c7af107e296d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -113,18 +113,24 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+    readToUnsafeMem(
+      broadcastedHadoopConf,
+      requiredSchema,
+      textOptions.wholeText,
+      textOptions.recordDelimiter
+    )
   }
 
   private def readToUnsafeMem(
       conf: Broadcast[SerializableConfiguration],
       requiredSchema: StructType,
-      wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {
+      wholeTextMode: Boolean,
+      recordDelimiter: Option[Array[Byte]]): (PartitionedFile) => Iterator[UnsafeRow] = {
     (file: PartitionedFile) => {
       val confValue = conf.value.value
       val reader = if (!wholeTextMode) {
-        new HadoopFileLinesReader(file, confValue)
+        new HadoopFileLinesReader(file, confValue, recordDelimiter)
       } else {
         new HadoopFileWholeTextReader(file, confValue)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 2a661561ab51..eea8e04ad171 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -39,9 +39,13 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val recordDelimiter: Option[Array[Byte]] = parameters.get(RECORDDELIMITER).map { hex =>
+    hex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+  }
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val RECORDDELIMITER = "recordDelimiter"
 }
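For context on how the two halves above fit together: `JSONOptions.getTextOptions` hands the delimiter to the text datasource as a plain hex string, and `TextOptions` decodes it back into bytes. A standalone sketch of that round trip (the delimiter value is made up; the two expressions mirror the ones in the patch):

```scala
// Encode as JSONOptions.getTextOptions does: every byte becomes two lowercase hex digits.
val delimiterBytes: Array[Byte] = Array(0x0d, 0x00, 0x0a, 0x00).map(_.toByte)
val asHex: String = delimiterBytes.map("%02x".format(_)).mkString  // "0d000a00"

// Decode as TextOptions does: split into pairs and parse each pair as a byte.
val decoded: Array[Byte] = asHex.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
assert(decoded.sameElements(delimiterBytes))
```

Note that `TextOptions` expects the hex string without separators, which holds when the value comes from `getTextOptions`; a user setting the text option directly would need the bare hex form.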
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0b18a6948035..8a818a388a62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
-import java.io.{File, StringWriter}
+import java.io.{File, FileOutputStream, StringWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.util.Locale
@@ -2072,9 +2072,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val fileName = "json-tests/utf16WithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
-      // The mode filters null rows produced because new line delimiter
-      // for UTF-8 is used by default.
-      .option("mode", "DROPMALFORMED")
+      .option("recordDelimiter", "x0d 00 0a 00")
       .json(testFile(fileName))
 
     checkAnswer(jsonDF, Seq(
@@ -2214,27 +2212,88 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(causedBy.getMessage == charset)
   }
 
-  test("read written json in UTF-16") {
-    val charset = "UTF-16"
-    case class Rec(f1: String, f2: Int)
-    withTempPath { path =>
-      val ds = spark.createDataset(Seq(
-        ("a", 1), ("b", 2), ("c", 3))
-      ).repartition(2)
-      ds.write
-        .option("charset", charset)
-        .format("json").mode("overwrite")
-        .save(path.getCanonicalPath)
-      val savedDf = spark
-        .read
-        .schema(ds.schema)
-        .option("charset", charset)
-        // Wrong (nulls) rows are produced because new line delimiter
-        // for UTF-8 is used by default.
-        .option("mode", "DROPMALFORMED")
-        .json(path.getCanonicalPath)
+  def checkReadWrittenJson(charset: String, delimiter: String, runId: Int): Unit = {
+    test(s"checks Spark is able to read json written by Spark itself #${runId}") {
+      withTempPath { path =>
+        val ds = spark.createDataset(Seq(
+          ("a", 1), ("b", 2), ("c", 3))
+        ).repartition(1)
+        ds.write
+          .option("charset", charset)
+          .format("json").mode("overwrite")
+          .save(path.getCanonicalPath)
+        val savedDf = spark
+          .read
+          .schema(ds.schema)
+          .option("charset", charset)
+          .option("recordDelimiter", delimiter)
+          .json(path.getCanonicalPath)
+
+        checkAnswer(savedDf.toDF(), ds.toDF())
+      }
+    }
+  }
 
-      checkAnswer(savedDf.toDF(), ds.toDF())
+  List(
+    ("\n", "UTF-8"),
+    ("x00 0a", "UTF-16BE"),
+    ("\n", "UTF-16LE"),
+    ("\u000a", "UTF-32BE"),
+    ("x0a 00 00 00", "UTF-32LE")
+  ).zipWithIndex.foreach { case ((d, c), i) => checkReadWrittenJson(c, d, i) }
+
+  def checkReadJson(
+      charset: String,
+      delimiter: String,
+      inferSchema: Boolean,
+      runId: Int
+  ): Unit = {
+    test(s"checks reading json in ${charset} #${runId}") {
+      val delimInBytes = {
+        if (delimiter.startsWith("x")) {
+          delimiter.replaceAll("[^0-9A-Fa-f]", "")
+            .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+        } else {
+          delimiter.getBytes(charset)
+        }
+      }
+      case class Rec(f1: String, f2: Int) {
+        def json = s"""{"f1":"${f1}", "f2":$f2}"""
+        def bytes = json.getBytes(charset)
+        def row = Row(f1, f2)
+      }
+      val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
+      withTempPath { path =>
+        val records = List(Rec("a", 1), Rec("b", 2))
+        val data = records.map(_.bytes).reduce((a1, a2) => a1 ++ delimInBytes ++ a2)
+        val os = new FileOutputStream(path)
+        os.write(data)
+        os.close()
+        val reader = if (inferSchema) {
+          spark.read
+        } else {
+          spark.read.schema(schema)
+        }
+        val savedDf = reader
+          .option("charset", charset)
+          .option("recordDelimiter", delimiter)
+          .json(path.getCanonicalPath)
+        checkAnswer(savedDf, records.map(_.row))
+      }
     }
   }
+
+  List(
+    ("sep", "UTF-8", false),
+    ("x00 0a 00 0d", "UTF-16BE", false),
+    ("x00 0a 00 0d", "UTF-16BE", true),
+    ("\r\n", "UTF-16LE", false),
+    ("\r\n", "UTF-16LE", true),
+    ("\u000d\u000a", "UTF-32BE", false),
+    ("\u000a\u000d", "UTF-32BE", true),
+    ("===", "UTF-32LE", false),
+    ("$^+", "UTF-32LE", true),
+    ("xEA.F3.EA.F3", "CP1251", false),
+    ("xEA.F3.EA.F3", "CP1251", true)
+  ).zipWithIndex.foreach { case ((d, c, s), i) => checkReadJson(c, d, s, i) }
 }
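Outside the test suite, the same end-to-end scenario can be reproduced by hand. A sketch assuming this patch is applied and that `spark` is an existing SparkSession; it mirrors the ("x00 0a 00 0d", "UTF-16BE") case from the parameterized tests above, with a made-up temp file:

```scala
import java.io.FileOutputStream
import java.nio.file.Files

val charset = "UTF-16BE"
val delimiterBytes = Array(0x00, 0x0a, 0x00, 0x0d).map(_.toByte)  // same bytes as "x00 0a 00 0d"
val records = Seq("""{"f1":"a", "f2":1}""", """{"f1":"b", "f2":2}""")

// Write the records separated by the custom delimiter, encoded in UTF-16BE.
val file = Files.createTempFile("json-delim", ".json").toFile
val os = new FileOutputStream(file)
os.write(records.map(_.getBytes(charset)).reduce((a, b) => a ++ delimiterBytes ++ b))
os.close()

// Read them back; the schema is inferred here, as in the inferSchema = true cases.
val df = spark.read
  .option("charset", charset)
  .option("recordDelimiter", "x00 0a 00 0d")
  .json(file.getCanonicalPath)
df.show()
```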