diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e5288636c596e..ce428b228338d 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
- multiLine=None, allowUnquotedControlChars=None):
+ multiLine=None, allowUnquotedControlChars=None, lineSep=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
+ :param lineSep: defines the line separator that should be used for parsing. If None is
+ set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -254,7 +256,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
- allowUnquotedControlChars=allowUnquotedControlChars)
+ allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -746,7 +748,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+ lineSep=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
@@ -770,12 +773,15 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+ :param lineSep: defines the line separator that should be used for writing. If None is
+ set, it uses the default value, ``\\n``.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
- compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+ lineSep=lineSep)
self._jwrite.json(path)
@since(1.4)
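The parameter added above is forwarded straight to the JSON datasource option of the same name. As a hedged illustration of its effect (Scala shown for consistency with the rest of the patch, assuming a SparkSession named `spark`; it mirrors the Python test added further down): splitting a newline-delimited file on "," produces fragments that end up in the corrupt-record column.

    // people.json is newline-delimited; with lineSep "," each comma-separated
    // fragment is parsed as one record, so most rows fall into _corrupt_record.
    val df = spark.read.option("lineSep", ",").json("python/test_support/sql/people.json")
    df.show(truncate = false)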
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 07f9ac1b5aa9e..490df4accf879 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -407,7 +407,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
- multiLine=None, allowUnquotedControlChars=None):
+ multiLine=None, allowUnquotedControlChars=None, lineSep=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.
@@ -470,6 +470,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
+ :param lineSep: defines the line separator that should be used for parsing. If None is
+ set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -484,7 +486,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
- allowUnquotedControlChars=allowUnquotedControlChars)
+ allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 967cc83166f3f..505fc056369f5 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -676,6 +676,23 @@ def test_multiline_json(self):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())
+ def test_linesep_json(self):
+ df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
+ expected = [Row(_corrupt_record=None, name=u'Michael'),
+ Row(_corrupt_record=u' "age":30}\n{"name":"Justin"', name=None),
+ Row(_corrupt_record=u' "age":19}\n', name=None)]
+ self.assertEqual(df.collect(), expected)
+
+ tpath = tempfile.mkdtemp()
+ shutil.rmtree(tpath)
+ try:
+ df = self.spark.read.json("python/test_support/sql/people.json")
+ df.write.json(tpath, lineSep="!!")
+ readback = self.spark.read.json(tpath, lineSep="!!")
+ self.assertEqual(readback.collect(), df.collect())
+ finally:
+ shutil.rmtree(tpath)
+
def test_multiline_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", multiLine=True)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 652412b34478a..5c9adc3332bc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.json
+import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -85,6 +86,16 @@ private[sql] class JSONOptions(
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
+ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
+ require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
+ sep
+ }
+ // Note that the option 'lineSep' uses a different default value in read and write.
+ val lineSeparatorInRead: Option[Array[Byte]] =
+ lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+ // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+ val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
+
/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
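A minimal sketch of the asymmetry these fields encode; JSONOptions is private[sql], so this assumes code living inside that package, and the timezone argument is just a placeholder.

    import org.apache.spark.sql.catalyst.json.JSONOptions

    val custom = new JSONOptions(Map("lineSep" -> "!!"), "UTC")
    custom.lineSeparatorInWrite           // "!!"
    custom.lineSeparatorInRead.isDefined  // true: handed to the Hadoop line reader as UTF-8 bytes

    val default = new JSONOptions(Map.empty[String, String], "UTC")
    default.lineSeparatorInRead           // None: the reader falls back to \r, \r\n and \n
    default.lineSeparatorInWrite          // "\n"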
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index eb06e4f304f0a..9c413de752a8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.json
import java.io.Writer
+import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.core._
@@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(
private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+ private val lineSeparator: String = options.lineSeparatorInWrite
+
private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
@@ -251,5 +254,8 @@ private[sql] class JacksonGenerator(
mapType = dataType.asInstanceOf[MapType]))
}
- def writeLineEnding(): Unit = gen.writeRaw('\n')
+ def writeLineEnding(): Unit = {
+ // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+ gen.writeRaw(lineSeparator)
+ }
}
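Seen through the public writer API (output path is a placeholder and spark.implicits._ is assumed to be imported), the generator now emits the chosen separator verbatim after every record, matching the assertion in the Scala test below.

    // Produces a single part file whose content is:
    //   {"value":"a"}!!{"value":"b"}!!{"value":"c"}!!
    Seq("a", "b", "c").toDF("value").coalesce(1)
      .write.option("lineSep", "!!").json("/tmp/out-bangbang")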
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1a5e47508c070..ae3ba1690f696 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -366,6 +366,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.
* `multiLine` (default `false`): parse one record, which may span multiple lines,
* per file
+ * `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
*
*
* @since 2.0.0
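A short usage sketch of the documented behaviour (file paths are assumptions):

    // Default: \r, \r\n and \n all terminate a record within the same file.
    val mixed = spark.read.json("/tmp/mixed-newlines.json")

    // Explicit separator: only the given string delimits records,
    // e.g. the ASCII record-separator character also exercised in the tests.
    val custom = spark.read.option("lineSep", "\u001e").json("/tmp/rs-delimited.json")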
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bb93889dc55e9..bbc063148a72c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -518,6 +518,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* `timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.
+ * `lineSep` (default `\n`): defines the line separator that should
+ * be used for writing.
*
*
* @since 1.4.0
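On the write side a non-default separator is not covered by the read default, so round-tripping has to repeat it; a hedged sketch with a placeholder path:

    df.write.option("lineSep", "!!").json("/tmp/out-json")
    // Reading back without lineSep would not split on "!!"; pass the same separator again.
    val roundTrip = spark.read.option("lineSep", "!!").json("/tmp/out-json")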
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 77e7edc8e7a20..5769c09c9a1d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -92,7 +92,8 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
- val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+ val json: Dataset[String] = createBaseDataset(
+ sparkSession, inputPaths, parsedOptions.lineSeparator)
inferFromDataset(json, parsedOptions)
}
@@ -104,13 +105,19 @@ object TextInputJsonDataSource extends JsonDataSource {
private def createBaseDataset(
sparkSession: SparkSession,
- inputPaths: Seq[FileStatus]): Dataset[String] = {
+ inputPaths: Seq[FileStatus],
+ lineSeparator: Option[String]): Dataset[String] = {
+ val textOptions = lineSeparator.map { lineSep =>
+ Map(TextOptions.LINE_SEPARATOR -> lineSep)
+ }.getOrElse(Map.empty[String, String])
+
val paths = inputPaths.map(_.getPath.toString)
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
- className = classOf[TextFileFormat].getName
+ className = classOf[TextFileFormat].getName,
+ options = textOptions
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -120,7 +127,7 @@ object TextInputJsonDataSource extends JsonDataSource {
file: PartitionedFile,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
- val linesReader = new HadoopFileLinesReader(file, conf)
+ val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val safeParser = new FailureSafeParser[Text](
input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
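Conceptually, schema inference now resolves the underlying text relation with the same separator, so it behaves roughly like the user-level sequence below (path is a placeholder):

    // The text source splits the file on "!!", yielding one JSON record per row ...
    val records = spark.read.option("lineSep", "!!").text("/tmp/records.json")
    // ... and the JSON reader infers its schema from records split the same way.
    val inferred = spark.read.option("lineSep", "!!").json("/tmp/records.json")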
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 18698df9fd8e5..5c1a35434f7b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -52,7 +52,7 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}
-private[text] object TextOptions {
+private[datasources] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
val LINE_SEPARATOR = "lineSep"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9b17406a816b5..ae93965bc50ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -268,6 +268,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* `java.text.SimpleDateFormat`. This applies to timestamp type.
* `multiLine` (default `false`): parse one record, which may span multiple lines,
* per file
+ * `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
*
*
* @since 2.0.0
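The streaming reader takes the same option; a minimal sketch, assuming a user-provided schema (required for file streams) and a placeholder input directory:

    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val schema = StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
    val stream = spark.readStream
      .schema(schema)
      .option("lineSep", "!!")
      .json("/tmp/streaming-input")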
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 8c8d41ebf115a..10bac0554484a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.nio.charset.StandardCharsets
+import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.util.Locale
@@ -27,7 +28,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
@@ -2063,4 +2064,67 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
}
+
+ def testLineSeparator(lineSep: String): Unit = {
+ test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
+ // Read
+ val data =
+ s"""
+ | {"f":
+ |"a", "f0": 1}$lineSep{"f":
+ |
+ |"c", "f0": 2}$lineSep{"f": "d", "f0": 3}
+ """.stripMargin
+ val dataWithTrailingLineSep = s"$data$lineSep"
+
+ Seq(data, dataWithTrailingLineSep).foreach { lines =>
+ withTempPath { path =>
+ Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+ val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
+ val expectedSchema =
+ StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
+ checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
+ assert(df.schema === expectedSchema)
+ }
+ }
+
+ // Write
+ withTempPath { path =>
+ Seq("a", "b", "c").toDF("value").coalesce(1)
+ .write.option("lineSep", lineSep).json(path.getAbsolutePath)
+ val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+ val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+ assert(
+ readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""")
+ }
+
+ // Roundtrip
+ withTempPath { path =>
+ val df = Seq("a", "b", "c").toDF()
+ df.write.option("lineSep", lineSep).json(path.getAbsolutePath)
+ val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
+ checkAnswer(df, readBack)
+ }
+ }
+ }
+
+ // scalastyle:off nonascii
+ Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep =>
+ testLineSeparator(lineSep)
+ }
+ // scalastyle:on nonascii
+
+ test("""SPARK-21289: Support line separator - default value \r, \r\n and \n""") {
+ val data =
+ "{\"f\": \"a\", \"f0\": 1}\r{\"f\": \"c\", \"f0\": 2}\r\n{\"f\": \"d\", \"f0\": 3}\n"
+
+ withTempPath { path =>
+ Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+ val df = spark.read.json(path.getAbsolutePath)
+ val expectedSchema =
+ StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
+ checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
+ assert(df.schema === expectedSchema)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index e8a5299d6ba9d..0e7f3afa9c3ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -208,9 +208,11 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}
- Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString).foreach { lineSep =>
+ // scalastyle:off nonascii
+ Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep =>
testLineSeparator(lineSep)
}
+ // scalastyle:on nonascii
private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString