diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 803f561ece67b..df56bb762863a 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -304,16 +304,18 @@ def parquet(self, *paths):
@ignore_unicode_prefix
@since(1.6)
- def text(self, paths, wholetext=False):
+ def text(self, paths, wholetext=False, lineSep=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
are any.
- Each line in the text file is a new row in the resulting DataFrame.
+ By default, each line in the text file is a new row in the resulting DataFrame.
:param paths: string, or list of strings, for input path(s).
:param wholetext: if true, read each file from input path(s) as a single row.
+ :param lineSep: defines the line separator that should be used for parsing. If None is
+ set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
@@ -322,7 +324,7 @@ def text(self, paths, wholetext=False):
>>> df.collect()
[Row(value=u'hello\\nthis')]
"""
- self._set_opts(wholetext=wholetext)
+ self._set_opts(wholetext=wholetext, lineSep=lineSep)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -804,18 +806,20 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None):
self._jwrite.parquet(path)
@since(1.6)
- def text(self, path, compression=None):
+ def text(self, path, compression=None, lineSep=None):
"""Saves the content of the DataFrame in a text file at the specified path.
:param path: the path in any Hadoop supported file system
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
+ :param lineSep: defines the line separator that should be used for writing. If None is
+ set, it uses the default value, ``\\n``.
The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.
"""
- self._set_opts(compression=compression)
+ self._set_opts(compression=compression, lineSep=lineSep)
self._jwrite.text(path)
@since(2.0)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e8966c20a8f42..07f9ac1b5aa9e 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -531,17 +531,20 @@ def parquet(self, path):
@ignore_unicode_prefix
@since(2.0)
- def text(self, path):
+ def text(self, path, wholetext=False, lineSep=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
are any.
- Each line in the text file is a new row in the resulting DataFrame.
+ By default, each line in the text file is a new row in the resulting DataFrame.
.. note:: Evolving.
:param paths: string, or list of strings, for input path(s).
+ :param wholetext: if true, read each file from input path(s) as a single row.
+ :param lineSep: defines the line separator that should be used for parsing. If None is
+ set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
>>> text_sdf.isStreaming
@@ -549,6 +552,7 @@ def text(self, path):
>>> "value" in str(text_sdf.schema)
True
"""
+ self._set_opts(wholetext=wholetext, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.text(path))
else:
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 480815d27333f..6a3c580e5ba07 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -648,7 +648,29 @@ def test_non_existed_udaf(self):
self.assertRaisesRegexp(AnalysisException, "Can not load class non_existed_udaf",
lambda: spark.udf.registerJavaUDAF("udaf1", "non_existed_udaf"))
- def test_multiLine_json(self):
+ def test_linesep_text(self):
+ df = self.spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",")
+ expected = [Row(value=u'Joe'), Row(value=u'20'), Row(value=u'"Hi'),
+ Row(value=u'\nI am Jeo"\nTom'), Row(value=u'30'),
+ Row(value=u'"My name is Tom"\nHyukjin'), Row(value=u'25'),
+ Row(value=u'"I am Hyukjin\n\nI love Spark!"\n')]
+ self.assertEqual(df.collect(), expected)
+
+ tpath = tempfile.mkdtemp()
+ shutil.rmtree(tpath)
+ try:
+ df.write.text(tpath, lineSep="!")
+ expected = [Row(value=u'Joe!20!"Hi!'), Row(value=u'I am Jeo"'),
+ Row(value=u'Tom!30!"My name is Tom"'),
+ Row(value=u'Hyukjin!25!"I am Hyukjin'),
+ Row(value=u''), Row(value=u'I love Spark!"'),
+ Row(value=u'!')]
+ readback = self.spark.read.text(tpath)
+ self.assertEqual(readback.collect(), expected)
+ finally:
+ shutil.rmtree(tpath)
+
+ def test_multiline_json(self):
people1 = self.spark.read.json("python/test_support/sql/people.json")
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
multiLine=True)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0139913aaa4e2..1a5e47508c070 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -647,14 +647,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
- * You can set the following text-specific option(s) for reading text files:
- *
- * - `wholetext` ( default `false`): If true, read a file as a single row and not split by "\n".
- *
- *
- * By default, each line in the text files is a new row in the resulting DataFrame.
- *
- * Usage example:
+ * By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
@@ -663,6 +656,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* spark.read().text("/path/to/spark/README.md")
* }}}
*
+ * You can set the following text-specific option(s) for reading text files:
+ *
+ * - `wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ *
+ * - `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
+ *
+ *
* @param paths input paths
* @since 1.6.0
*/
@@ -686,11 +687,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
- * You can set the following textFile-specific option(s) for reading text files:
- *
- * - `wholetext` ( default `false`): If true, read a file as a single row and not split by "\n".
- *
- *
* By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
@@ -700,6 +696,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* spark.read().textFile("/path/to/spark/README.md")
* }}}
*
+ * You can set the following textFile-specific option(s) for reading text files:
+ *
+ * - `wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ *
+ * - `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
+ *
+ *
* @param paths input path
* @since 2.0.0
*/
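
For reference, a minimal, self-contained sketch of the batch read path with the new option. The path and separator are placeholders, not part of this patch; the option name matches what `TextOptions` registers below.

```scala
import org.apache.spark.sql.SparkSession

object LineSepReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lineSep read sketch")
      .master("local[*]")
      .getOrCreate()

    // Records separated by "!" instead of newlines; without the option,
    // the file would be split on \r, \r\n and \n as before.
    val df = spark.read
      .option("lineSep", "!")
      .text("/tmp/records.txt")

    df.show(truncate = false)
    spark.stop()
  }
}
```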
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a9100cc7f1..bb93889dc55e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -587,6 +587,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* `compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`).
+ * `lineSep` (default `\n`): defines the line separator that should
+ * be used for writing.
*
*
* @since 1.6.0
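
And a corresponding write-side sketch, spark-shell style with a hypothetical output directory. The separator is also appended after the last row, matching the behaviour exercised in `TextSuite` further down.

```scala
// Assumes a running spark-shell / SparkSession named `spark`.
import spark.implicits._

Seq("a", "b", "c").toDF("value")
  .coalesce(1)
  .write
  .mode("overwrite")
  .option("lineSep", "|")   // the part file contains "a|b|c|"
  .text("/tmp/lineSep-write-sketch")
```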
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 83cf26c63a175..00a78f7343c59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -30,9 +30,22 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
/**
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
* in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator The line separator used to split the file into lines. If the value is
+ *                      `None`, it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ *
+ * @note The behavior when `lineSeparator` is `None` (covering `\r`, `\r\n` and `\n`) is defined
+ * by [[LineRecordReader]], not within Spark.
*/
class HadoopFileLinesReader(
- file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+ file: PartitionedFile,
+ lineSeparator: Option[Array[Byte]],
+ conf: Configuration) extends Iterator[Text] with Closeable {
+
+ def this(file: PartitionedFile, conf: Configuration) = this(file, None, conf)
+
private val iterator = {
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
@@ -42,7 +55,13 @@ class HadoopFileLinesReader(
Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
- val reader = new LineRecordReader()
+
+ val reader = lineSeparator match {
+ case Some(sep) => new LineRecordReader(sep)
+ // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`.
+ case _ => new LineRecordReader()
+ }
+
reader.initialize(fileSplit, hadoopAttemptContext)
new RecordReaderIterator(reader)
}
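
A rough sketch of how the two constructors differ, using the internal `PartitionedFile` API. The file path, split length, and `InternalRow.empty` partition values are assumptions made purely for illustration.

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}

object LinesReaderSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // A single split covering the first 1024 bytes of a local file (placeholder values).
    val file = PartitionedFile(InternalRow.empty, "file:///tmp/records.txt", 0, 1024)

    // Original behaviour: LineRecordReader splits on \r, \r\n and \n.
    val defaultReader = new HadoopFileLinesReader(file, conf)
    defaultReader.close()

    // New behaviour: split on the custom separator, passed as UTF-8 bytes.
    val sep = Some("|".getBytes(StandardCharsets.UTF_8))
    val customReader = new HadoopFileLinesReader(file, sep, conf)
    customReader.foreach(line => println(line.toString))
    customReader.close()
  }
}
```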
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index c661e9bd3b94c..9647f09867643 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,11 +17,8 @@
package org.apache.spark.sql.execution.datasources.text
-import java.io.Closeable
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.TaskContext
@@ -89,7 +86,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new TextOutputWriter(path, dataSchema, context)
+ new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
}
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -113,18 +110,18 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions.wholeText)
+ readToUnsafeMem(broadcastedHadoopConf, requiredSchema, textOptions)
}
private def readToUnsafeMem(
conf: Broadcast[SerializableConfiguration],
requiredSchema: StructType,
- wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] = {
+ textOptions: TextOptions): (PartitionedFile) => Iterator[UnsafeRow] = {
(file: PartitionedFile) => {
val confValue = conf.value.value
- val reader = if (!wholeTextMode) {
- new HadoopFileLinesReader(file, confValue)
+ val reader = if (!textOptions.wholeText) {
+ new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
@@ -152,6 +149,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
class TextOutputWriter(
path: String,
dataSchema: StructType,
+ lineSeparator: Array[Byte],
context: TaskAttemptContext)
extends OutputWriter {
@@ -162,7 +160,7 @@ class TextOutputWriter(
val utf8string = row.getUTF8String(0)
utf8string.writeTo(writer)
}
- writer.write('\n')
+ writer.write(lineSeparator)
}
override def close(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 2a661561ab51e..18698df9fd8e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.text
+import java.nio.charset.StandardCharsets
+
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
/**
@@ -39,9 +41,19 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
+ private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
+ require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
+ sep
+ }
+ // Note that the option 'lineSep' uses a different default value in read and write.
+ val lineSeparatorInRead: Option[Array[Byte]] =
+ lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+ val lineSeparatorInWrite: Array[Byte] =
+ lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}
private[text] object TextOptions {
val COMPRESSION = "compression"
val WHOLETEXT = "wholetext"
+ val LINE_SEPARATOR = "lineSep"
}
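
A small illustration of the resolution above. Since `TextOptions` is `private[text]`, this snippet assumes it is compiled inside the `org.apache.spark.sql.execution.datasources.text` package (for example as a test); it is a sketch, not part of the patch.

```scala
package org.apache.spark.sql.execution.datasources.text

import java.nio.charset.StandardCharsets

object TextOptionsSketch {
  def main(args: Array[String]): Unit = {
    // With an explicit separator, both the read and write paths use it.
    val custom = new TextOptions(Map("lineSep" -> "|"))
    assert(custom.lineSeparatorInRead.exists(_.sameElements("|".getBytes(StandardCharsets.UTF_8))))
    assert(new String(custom.lineSeparatorInWrite, StandardCharsets.UTF_8) == "|")

    // Without it, reads defer to LineRecordReader (\r, \r\n and \n) and writes default to "\n".
    val default = new TextOptions(Map.empty[String, String])
    assert(default.lineSeparatorInRead.isEmpty)
    assert(new String(default.lineSeparatorInWrite, StandardCharsets.UTF_8) == "\n")
  }
}
```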
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index c393dcdfdd7e5..9b17406a816b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -387,7 +387,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* Loads text files and returns a `DataFrame` whose schema starts with a string column named
* "value", and followed by partitioned columns if there are any.
*
- * Each line in the text files is a new row in the resulting DataFrame. For example:
+ * By default, each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.readStream.text("/path/to/directory/")
@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.
+ * - `wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ *
+ * - `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
*
*
* @since 2.0.0
@@ -413,7 +417,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* If the directory structure of the text files contains partitioning information, those are
* ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
*
- * Each line in the text file is a new element in the resulting Dataset. For example:
+ * By default, each line in the text file is a new element in the resulting Dataset. For example:
* {{{
* // Scala:
* spark.readStream.textFile("/path/to/spark/README.md")
@@ -426,6 +430,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*
* - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.
+ * - `wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
+ *
+ * - `lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+ * that should be used for parsing.
*
*
* @param path input path
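
A streaming counterpart, again as a hedged sketch: the input directory and console sink are placeholders, and only the `lineSep` option comes from this patch.

```scala
import org.apache.spark.sql.SparkSession

object StreamingLineSepSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming lineSep sketch")
      .master("local[*]")
      .getOrCreate()

    // Files dropped into the directory are split on "|" rather than newlines.
    val lines = spark.readStream
      .option("lineSep", "|")
      .text("/tmp/streaming-input")

    val query = lines.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```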
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 33287044f279e..e8a5299d6ba9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -18,10 +18,13 @@
package org.apache.spark.sql.execution.datasources.text
import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.TestUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -172,6 +175,43 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
}
+ def testLineSeparator(lineSep: String): Unit = {
+ test(s"SPARK-23577: Support line separator - lineSep: '$lineSep'") {
+ // Read
+ val values = Seq("a", "b", "\nc")
+ val data = values.mkString(lineSep)
+ val dataWithTrailingLineSep = s"$data$lineSep"
+ Seq(data, dataWithTrailingLineSep).foreach { lines =>
+ withTempPath { path =>
+ Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+ val df = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+ checkAnswer(df, Seq("a", "b", "\nc").toDF())
+ }
+ }
+
+ // Write
+ withTempPath { path =>
+ values.toDF().coalesce(1)
+ .write.option("lineSep", lineSep).text(path.getAbsolutePath)
+ val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+ val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+ assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}")
+ }
+
+ // Roundtrip
+ withTempPath { path =>
+ val df = values.toDF()
+ df.write.option("lineSep", lineSep).text(path.getAbsolutePath)
+ val readBack = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+ checkAnswer(df, readBack)
+ }
+ }
+ }
+
+ Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString).foreach { lineSep =>
+ testLineSeparator(lineSep)
+ }
+
private def testFile: String = {
Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
}