From a790bb30e575cf6d4ffaeda307f0405f1bfecf03 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 17 Nov 2018 22:44:47 +0100 Subject: [PATCH 01/15] Added a test for default line separator --- .../execution/datasources/csv/CSVSuite.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 2efe1dda475c5..6055686de992e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.File -import java.nio.charset.{Charset, UnsupportedCharsetException} +import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException} import java.nio.file.Files import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat @@ -26,18 +26,16 @@ import java.util.Locale import scala.collection.JavaConverters._ import scala.util.Properties - import org.apache.commons.lang3.time.FastDateFormat import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.log4j.{AppenderSkeleton, LogManager} import org.apache.log4j.spi.LoggingEvent - import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData { @@ -1859,4 +1857,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te checkAnswer(df, Row(null, csv)) } } + + test("""Support line separator - default value \r, \r\n and \n""") { + val data = "\"a\",1\r\"c\",2\r\n\"d\",3\n" + + withTempPath { path => + Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8)) + val df = spark.read.option("inferSchema", true).csv(path.getAbsolutePath) + val expectedSchema = + StructType(StructField("_c0", StringType) :: StructField("_c1", IntegerType) :: Nil) + checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) + assert(df.schema === expectedSchema) + } + } } From 7a47990af7a9e8782fbde2955c0cf6e4848a3806 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 17 Nov 2018 22:56:34 +0100 Subject: [PATCH 02/15] Test for custom lineSep --- .../execution/datasources/csv/CSVSuite.scala | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 6055686de992e..5c6b1f1abd3c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.log4j.{AppenderSkeleton, LogManager} import org.apache.log4j.spi.LoggingEvent -import org.apache.spark.SparkException +import 
org.apache.spark.{SparkException, TestUtils} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf @@ -1870,4 +1870,51 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(df.schema === expectedSchema) } } + + def testLineSeparator(lineSep: String): Unit = { + test(s"Support line separator - lineSep: '$lineSep'") { + // Read + val data = + s""" + | "a", + | 1$lineSep + | "c", 2$lineSep"d",3 + """.stripMargin + val dataWithTrailingLineSep = s"$data$lineSep" + + Seq(data, dataWithTrailingLineSep).foreach { lines => + withTempPath { path => + Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) + val schema = StructType(StructField("f", StringType) + :: StructField("f0", LongType) :: Nil) + val df = spark.read.schema(schema).option("lineSep", lineSep).json(path.getAbsolutePath) + checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) + } + } + +// // Write +// withTempPath { path => +// Seq("a", "b", "c").toDF("value").coalesce(1) +// .write.option("lineSep", lineSep).json(path.getAbsolutePath) +// val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head +// val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) +// assert( +// readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""") +// } +// +// // Roundtrip +// withTempPath { path => +// val df = Seq("a", "b", "c").toDF() +// df.write.option("lineSep", lineSep).json(path.getAbsolutePath) +// val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) +// checkAnswer(df, readBack) +// } + } + } + + // scalastyle:off nonascii + Seq("|"/*, "^", "::", "!!!@3", 0x1E.toChar.toString, "아"*/).foreach { lineSep => + testLineSeparator(lineSep) + } + // scalastyle:on nonascii } From be2870f1006c3f2e783cec0c40bd6e1c7e4c5652 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 10:59:07 +0100 Subject: [PATCH 03/15] Test on read --- .../spark/sql/catalyst/csv/CSVOptions.scala | 19 +++++++- .../sql/catalyst/csv/UnivocityParser.scala | 5 +- .../datasources/csv/CSVDataSource.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala | 46 ++++++++----------- 4 files changed, 41 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 6bb50b42a369c..907ee9f000819 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -192,6 +192,19 @@ class CSVOptions( */ val emptyValueInWrite = emptyValue.getOrElse("\"\"") + /** + * A string between two consecutive CSV records.
+ */ + val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => + require(sep.nonEmpty, "'lineSep' cannot be an empty string.") + sep + } + + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => + lineSep.getBytes("UTF-8") + } + val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n") + def asWriterSettings: CsvWriterSettings = { val writerSettings = new CsvWriterSettings() val format = writerSettings.getFormat @@ -216,6 +229,7 @@ class CSVOptions( format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) + lineSeparatorInRead.foreach(sep => format.setLineSeparator(sep.map(_.toChar))) charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead) @@ -227,7 +241,10 @@ class CSVOptions( settings.setEmptyValue(emptyValueInRead) settings.setMaxCharsPerColumn(maxCharsPerColumn) settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER) - settings.setLineSeparatorDetectionEnabled(multiLine == true) + settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine) + lineSeparatorInRead.foreach { _ => + settings.setNormalizeLineEndingsWithinQuotes(!multiLine) + } settings } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 46ed58ed92830..6deb8931ecc7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -206,7 +206,10 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). 
*/ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): InternalRow = { + val parsed = tokenizer.parseLine(input) + convert(parsed) + } private val getToken = if (options.columnPruning) { (tokens: Array[String], index: Int) => tokens(index) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 4808e8ef042d1..1d30997254813 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -95,7 +95,7 @@ object TextInputCSVDataSource extends CSVDataSource { headerChecker: CSVHeaderChecker, requiredSchema: StructType): Iterator[InternalRow] = { val lines = { - val linesReader = new HadoopFileLinesReader(file, conf) + val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close())) linesReader.map { line => new String(line.getBytes, 0, line.getLength, parser.options.charset) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 5c6b1f1abd3c1..7700989934831 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -26,16 +26,18 @@ import java.util.Locale import scala.collection.JavaConverters._ import scala.util.Properties + import org.apache.commons.lang3.time.FastDateFormat import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.log4j.{AppenderSkeleton, LogManager} import org.apache.log4j.spi.LoggingEvent -import org.apache.spark.{SparkException, TestUtils} + +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData { @@ -1875,11 +1877,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test(s"Support line separator - lineSep: '$lineSep'") { // Read val data = - s""" - | "a", - | 1$lineSep - | "c", 2$lineSep"d",3 - """.stripMargin + s""""a",1$lineSep + |"c",2$lineSep" + |d",3""".stripMargin val dataWithTrailingLineSep = s"$data$lineSep" Seq(data, dataWithTrailingLineSep).foreach { lines => @@ -1887,33 +1887,23 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) val schema = StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - val df = spark.read.schema(schema).option("lineSep", lineSep).json(path.getAbsolutePath) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) + + val expected = Seq(("a", 1), ("\n\"c\"", 2), ("\nd", 3)).toDF() + Seq(false, true).foreach { multiLine => + val df = spark.read + .schema(schema) + .option("lineSep", 
lineSep) + .option("multiLine", multiLine) + .csv(path.getAbsolutePath) + checkAnswer(df, expected) + } } } - -// // Write -// withTempPath { path => -// Seq("a", "b", "c").toDF("value").coalesce(1) -// .write.option("lineSep", lineSep).json(path.getAbsolutePath) -// val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head -// val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) -// assert( -// readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""") -// } -// -// // Roundtrip -// withTempPath { path => -// val df = Seq("a", "b", "c").toDF() -// df.write.option("lineSep", lineSep).json(path.getAbsolutePath) -// val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) -// checkAnswer(df, readBack) -// } } } // scalastyle:off nonascii - Seq("|"/*, "^", "::", "!!!@3", 0x1E.toChar.toString, "아"*/).foreach { lineSep => + Seq("!").foreach { lineSep => testLineSeparator(lineSep) } // scalastyle:on nonascii From a058a6f2d6771173837ba4b6e829b2067993adb7 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 11:33:12 +0100 Subject: [PATCH 04/15] Support lineSep in write --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 5 ++++- .../sql/execution/datasources/csv/CSVSuite.scala | 12 +++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 907ee9f000819..8c7deb764fab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -203,7 +203,7 @@ class CSVOptions( val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => lineSep.getBytes("UTF-8") } - val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n") + val lineSeparatorInWrite: Option[String] = lineSeparator def asWriterSettings: CsvWriterSettings = { val writerSettings = new CsvWriterSettings() @@ -213,6 +213,8 @@ class CSVOptions( format.setQuoteEscape(escape) charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) + lineSeparatorInWrite.foreach(format.setLineSeparator) + writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite) writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite) writerSettings.setNullValue(nullValue) @@ -232,6 +234,7 @@ class CSVOptions( lineSeparatorInRead.foreach(sep => format.setLineSeparator(sep.map(_.toChar))) charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) + settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead) settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead) settings.setReadInputOnSeparateThread(false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 7700989934831..673d74b0d413b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.log4j.{AppenderSkeleton, LogManager} import org.apache.log4j.spi.LoggingEvent -import 
org.apache.spark.SparkException +import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf @@ -1899,6 +1899,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } } + + // Write + withTempPath { path => + Seq("a", "b", "c").toDF("value").coalesce(1) + .write.option("lineSep", lineSep).csv(path.getAbsolutePath) + val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head + val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) + assert( + readBack === s"a${lineSep}b${lineSep}c${lineSep}") + } } } From 7e3c0264ae93e270ed8b63c53897a2b775fa65ff Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 11:36:17 +0100 Subject: [PATCH 05/15] Check roundtrip --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 673d74b0d413b..c7342cafdd7a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1909,6 +1909,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert( readBack === s"a${lineSep}b${lineSep}c${lineSep}") } + + // Roundtrip + withTempPath { path => + val df = Seq("a", "b", "c").toDF() + df.write.option("lineSep", lineSep).csv(path.getAbsolutePath) + val readBack = spark.read.option("lineSep", lineSep).csv(path.getAbsolutePath) + checkAnswer(df, readBack) + } } } From 486b090139ce6d7a93a24edae000fb546b4931db Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 11:42:08 +0100 Subject: [PATCH 06/15] Test another char --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c7342cafdd7a3..143a3921b4f80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1921,7 +1921,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } // scalastyle:off nonascii - Seq("!").foreach { lineSep => + Seq("!", 0x1E.toChar.toString).foreach { lineSep => testLineSeparator(lineSep) } // scalastyle:on nonascii From a0fedbbb06f33716fc632d3b4dd2a687b2587966 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 12:03:20 +0100 Subject: [PATCH 07/15] Don't keep quotes --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 143a3921b4f80..180de78a6a915 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala 
@@ -1878,7 +1878,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // Read val data = s""""a",1$lineSep - |"c",2$lineSep" + |c,2$lineSep" |d",3""".stripMargin val dataWithTrailingLineSep = s"$data$lineSep" @@ -1888,7 +1888,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val schema = StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - val expected = Seq(("a", 1), ("\n\"c\"", 2), ("\nd", 3)).toDF() + val expected = Seq(("a", 1), ("\nc", 2), ("\nd", 3)).toDF() Seq(false, true).foreach { multiLine => val df = spark.read .schema(schema) From 5f013f505e7a57e4f72f6f1185f1dcdedc0960b5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 12:13:38 +0100 Subject: [PATCH 08/15] Support 2 chars as lineSep --- .../scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 5 ++++- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 8c7deb764fab2..52dac25291c77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -231,7 +231,10 @@ class CSVOptions( format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) - lineSeparatorInRead.foreach(sep => format.setLineSeparator(sep.map(_.toChar))) + lineSeparatorInRead.foreach {sep => + format.setLineSeparator(sep.map(_.toChar)) + format.setNormalizedNewline(sep.head.toChar) + } charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 180de78a6a915..4eb14842e3cc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1921,7 +1921,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } // scalastyle:off nonascii - Seq("!", 0x1E.toChar.toString).foreach { lineSep => + Seq("|", "^", "::", 0x1E.toChar.toString).foreach { lineSep => testLineSeparator(lineSep) } // scalastyle:on nonascii From 65786dfabbb5c901e3f8d32f737a6b24a2f58b6b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 12:14:22 +0100 Subject: [PATCH 09/15] Revert unrelated changes --- .../org/apache/spark/sql/catalyst/csv/UnivocityParser.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 6deb8931ecc7b..46ed58ed92830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -206,10 +206,7 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). 
*/ - def parse(input: String): InternalRow = { - val parsed = tokenizer.parseLine(input) - convert(parsed) - } + def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) private val getToken = if (options.columnPruning) { (tokens: Array[String], index: Int) => tokens(index) From 49b91ea06b757a2feed283de1634c36a59ace8f0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 12:26:19 +0100 Subject: [PATCH 10/15] Test restrictions for lineSep --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 1 + .../sql/execution/datasources/csv/CSVSuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 52dac25291c77..32359a950673a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -197,6 +197,7 @@ class CSVOptions( */ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => require(sep.nonEmpty, "'lineSep' cannot be an empty string.") + require(sep.length <= 2, "'lineSep' can contain 1 or 2 characters.") sep } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 4eb14842e3cc8..eac3f3c1f1e47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1925,4 +1925,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te testLineSeparator(lineSep) } // scalastyle:on nonascii + + test("lineSep restrictions") { + val errMsg1 = intercept[IllegalArgumentException] { + spark.read.option("lineSep", "").csv(testFile(carsFile)).collect + }.getMessage + assert(errMsg1.contains("'lineSep' cannot be an empty string")) + + val errMsg2 = intercept[IllegalArgumentException] { + spark.read.option("lineSep", "123").csv(testFile(carsFile)).collect + }.getMessage + assert(errMsg2.contains("'lineSep' can contain 1 or 2 characters")) + } } From 12022ad1a0194a4bab9007d66145071562e066a4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 18 Nov 2018 12:39:12 +0100 Subject: [PATCH 11/15] Updating comments and docs --- python/pyspark/sql/readwriter.py | 12 ++++++++---- python/pyspark/sql/streaming.py | 6 ++++-- .../scala/org/apache/spark/sql/DataFrameReader.scala | 2 ++ .../scala/org/apache/spark/sql/DataFrameWriter.scala | 2 ++ .../spark/sql/streaming/DataStreamReader.scala | 2 ++ 5 files changed, 18 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 726de4a965418..7903ff79530ab 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -353,7 +353,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, - samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None): + samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None): r"""Loads a CSV file and returns the result as a :class:`DataFrame`. 
This function will go through the input once to determine the input schema if @@ -453,6 +453,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set, it uses the default value, ``en-US``. For instance, ``locale`` is used while parsing dates and timestamps. + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2. >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes @@ -472,7 +474,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio, - enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale) + enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -868,7 +870,7 @@ def text(self, path, compression=None, lineSep=None): def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, - charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None): + charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None): r"""Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -922,6 +924,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No the default UTF-8 charset will be used. :param emptyValue: sets the string representation of an empty value. If None is set, it uses the default value, ``""``. + :param lineSep: defines the line separator that should be used for writing. If None is + set, it uses the default value, ``\\n``. Maximum length is 2. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ @@ -932,7 +936,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, - encoding=encoding, emptyValue=emptyValue) + encoding=encoding, emptyValue=emptyValue, lineSep=lineSep) self._jwrite.csv(path) @since(1.5) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 58ca7b83e5b2b..c4c5229d2ca9b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -576,7 +576,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, - enforceSchema=None, emptyValue=None, locale=None): + enforceSchema=None, emptyValue=None, locale=None, lineSep=None): r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`. 
This function will go through the input once to determine the input schema if @@ -675,6 +675,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param locale: sets a locale as language tag in IETF BCP 47 format. If None is set, it uses the default value, ``en-US``. For instance, ``locale`` is used while parsing dates and timestamps. + :param lineSep: defines the line separator that should be used for parsing. If None is + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2. >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming @@ -692,7 +694,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine, charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema, - emptyValue=emptyValue, locale=locale) + emptyValue=emptyValue, locale=locale, lineSep=lineSep) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index df18623e42a02..90a6dcb4bb746 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -608,6 +608,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
<li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li> * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format. * For instance, this is used while parsing dates and timestamps.</li> + * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator + * that should be used for parsing. Maximum length is 2.</li> * </ul> * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1b4998f94b25d..20662f164893a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -658,6 +658,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * whitespaces from values being written should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating defines whether or not * trailing whitespaces from values being written should be skipped.</li> + * <li>`lineSep` (default `\n`): defines the line separator that should be used for writing. + * Maximum length is 2.</li> * </ul> * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index bf6021e692382..e23375579610b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -377,6 +377,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
<li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li> * <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format. * For instance, this is used while parsing dates and timestamps.</li> + * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator + * that should be used for parsing. Maximum length is 2.
  • * * * @since 2.0.0 From 0869b81f79bd2bb20ec5d314dbc729980d6f0320 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 21 Nov 2018 12:08:36 +0100 Subject: [PATCH 12/15] Tests for lineSep in different encodings --- .../execution/datasources/csv/CSVSuite.scala | 60 ++++++++++++++----- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a8b5092ebfb6f..3734e2e652f76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1894,8 +1894,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } } - def testLineSeparator(lineSep: String): Unit = { - test(s"Support line separator - lineSep: '$lineSep'") { + def testLineSeparator(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = { + test(s"Support line separator in ${encoding} #${id}") { // Read val data = s""""a",1$lineSep @@ -1905,17 +1905,23 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te Seq(data, dataWithTrailingLineSep).foreach { lines => withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val schema = StructType(StructField("f", StringType) - :: StructField("f0", LongType) :: Nil) + Files.write(path.toPath, lines.getBytes(encoding)) + val schema = StructType(StructField("_c0", StringType) + :: StructField("_c1", LongType) :: Nil) - val expected = Seq(("a", 1), ("\nc", 2), ("\nd", 3)).toDF() + val expected = Seq(("a", 1), ("\nc", 2), ("\nd", 3)) + .toDF("_c0", "_c1") Seq(false, true).foreach { multiLine => - val df = spark.read - .schema(schema) + val reader = spark + .read .option("lineSep", lineSep) .option("multiLine", multiLine) - .csv(path.getAbsolutePath) + .option("encoding", encoding) + val df = if (inferSchema) { + reader.option("inferSchema", true).csv(path.getAbsolutePath) + } else { + reader.schema(schema).csv(path.getAbsolutePath) + } checkAnswer(df, expected) } } @@ -1924,9 +1930,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // Write withTempPath { path => Seq("a", "b", "c").toDF("value").coalesce(1) - .write.option("lineSep", lineSep).csv(path.getAbsolutePath) + .write + .option("lineSep", lineSep) + .option("encoding", encoding) + .csv(path.getAbsolutePath) val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head - val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) + val readBack = new String(Files.readAllBytes(partFile.toPath), encoding) assert( readBack === s"a${lineSep}b${lineSep}c${lineSep}") } @@ -1934,16 +1943,37 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // Roundtrip withTempPath { path => val df = Seq("a", "b", "c").toDF() - df.write.option("lineSep", lineSep).csv(path.getAbsolutePath) - val readBack = spark.read.option("lineSep", lineSep).csv(path.getAbsolutePath) + df.write + .option("lineSep", lineSep) + .option("encoding", encoding) + .csv(path.getAbsolutePath) + val readBack = spark + .read + .option("lineSep", lineSep) + .option("encoding", encoding) + .csv(path.getAbsolutePath) checkAnswer(df, readBack) } } } // scalastyle:off nonascii - Seq("|", "^", "::", 0x1E.toChar.toString).foreach { lineSep => - 
testLineSeparator(lineSep) + List( + (0, "|", "UTF-8", false), + (1, "^", "UTF-16BE", true), + (2, "::", "ISO-8859-1", true), + (3, "!!", "UTF-32LE", false), + (4, 0x1E.toChar.toString, "UTF-8", true), + (5, "아", "UTF-32BE", false), + (6, "ку", "CP1251", true), + (8, "\r\n", "UTF-16LE", true), + (9, "\r\n", "utf-16be", false), + (10, "\u000d\u000a", "UTF-32BE", false), + (11, "\u000a\u000d", "UTF-8", true), + (12, "==", "US-ASCII", false), + (13, "$^", "utf-32le", true) + ).foreach { case (testNum, sep, encoding, inferSchema) => + testLineSeparator(sep, encoding, inferSchema, testNum) } // scalastyle:on nonascii From 1f5399f32a45fc7892cf5ce009b1a75221e844dd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 21 Nov 2018 12:09:16 +0100 Subject: [PATCH 13/15] Support encoding for lineSep --- .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 32359a950673a..bc5fee40fda35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -202,7 +202,7 @@ class CSVOptions( } val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => - lineSep.getBytes("UTF-8") + lineSep.getBytes(charset) } val lineSeparatorInWrite: Option[String] = lineSeparator @@ -232,9 +232,9 @@ class CSVOptions( format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) - lineSeparatorInRead.foreach {sep => - format.setLineSeparator(sep.map(_.toChar)) - format.setNormalizedNewline(sep.head.toChar) + lineSeparator.foreach {sep => + format.setLineSeparator(sep) + format.setNormalizedNewline(0x00.toChar) } charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) From 918d163541cb54e37b7ddc4fc337a299343fc31d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 22 Nov 2018 11:41:33 +0100 Subject: [PATCH 14/15] Restrict lineSep by 1 character only --- .../spark/sql/catalyst/csv/CSVOptions.scala | 7 ++----- .../execution/datasources/csv/CSVSuite.scala | 18 ++++++++---------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index bc5fee40fda35..94bdb72d675d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -197,7 +197,7 @@ class CSVOptions( */ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => require(sep.nonEmpty, "'lineSep' cannot be an empty string.") - require(sep.length <= 2, "'lineSep' can contain 1 or 2 characters.") + require(sep.length == 1, "'lineSep' can contain only 1 character.") sep } @@ -232,10 +232,7 @@ class CSVOptions( format.setDelimiter(delimiter) format.setQuote(quote) format.setQuoteEscape(escape) - lineSeparator.foreach {sep => - format.setLineSeparator(sep) - format.setNormalizedNewline(0x00.toChar) - } + lineSeparator.foreach(format.setLineSeparator) charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping) format.setComment(comment) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 3734e2e652f76..c275d63d32cc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1961,17 +1961,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te List( (0, "|", "UTF-8", false), (1, "^", "UTF-16BE", true), - (2, "::", "ISO-8859-1", true), - (3, "!!", "UTF-32LE", false), + (2, ":", "ISO-8859-1", true), + (3, "!", "UTF-32LE", false), (4, 0x1E.toChar.toString, "UTF-8", true), (5, "아", "UTF-32BE", false), - (6, "ку", "CP1251", true), - (8, "\r\n", "UTF-16LE", true), - (9, "\r\n", "utf-16be", false), - (10, "\u000d\u000a", "UTF-32BE", false), - (11, "\u000a\u000d", "UTF-8", true), - (12, "==", "US-ASCII", false), - (13, "$^", "utf-32le", true) + (6, "у", "CP1251", true), + (8, "\r", "UTF-16LE", true), + (9, "\u000d", "UTF-32BE", false), + (10, "=", "US-ASCII", false), + (11, "$", "utf-32le", true) ).foreach { case (testNum, sep, encoding, inferSchema) => testLineSeparator(sep, encoding, inferSchema, testNum) } @@ -1986,6 +1984,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val errMsg2 = intercept[IllegalArgumentException] { spark.read.option("lineSep", "123").csv(testFile(carsFile)).collect }.getMessage - assert(errMsg2.contains("'lineSep' can contain 1 or 2 characters")) + assert(errMsg2.contains("'lineSep' can contain only 1 character")) } } From a4c4b6710cb67bddd9badbb53aa07b0d93242bc5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 23 Nov 2018 08:44:25 +0100 Subject: [PATCH 15/15] Fix comments --- python/pyspark/sql/readwriter.py | 5 +++-- python/pyspark/sql/streaming.py | 3 ++- .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 7903ff79530ab..1d2dd4d808930 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -454,7 +454,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non it uses the default value, ``en-US``. For instance, ``locale`` is used while parsing dates and timestamps. :param lineSep: defines the line separator that should be used for parsing. If None is - set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2. + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. + Maximum length is 1 character. >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes @@ -925,7 +926,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No :param emptyValue: sets the string representation of an empty value. If None is set, it uses the default value, ``""``. :param lineSep: defines the line separator that should be used for writing. If None is - set, it uses the default value, ``\\n``. Maximum length is 2. + set, it uses the default value, ``\\n``. Maximum length is 1 character. 
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index c4c5229d2ca9b..d92b0d5677e25 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -676,7 +676,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non it uses the default value, ``en-US``. For instance, ``locale`` is used while parsing dates and timestamps. :param lineSep: defines the line separator that should be used for parsing. If None is - set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2. + set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. + Maximum length is 1 character. >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ea85650f1bd48..da88598eed061 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -610,7 +610,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
<li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format. * For instance, this is used while parsing dates and timestamps.</li> * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator - * that should be used for parsing. Maximum length is 2.</li> + * that should be used for parsing. Maximum length is 1 character.</li> * </ul> * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d9f2cf72fb241..5a807d3d4b93e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -659,7 +659,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
<li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating defines whether or not * trailing whitespaces from values being written should be skipped.</li> * <li>`lineSep` (default `\n`): defines the line separator that should be used for writing. - * Maximum length is 2.</li> + * Maximum length is 1 character.</li> * </ul> * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 57b86dbd8d24c..c8e3e1c191044 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -378,7 +378,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
<li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format. * For instance, this is used while parsing dates and timestamps.</li> * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator - * that should be used for parsing. Maximum length is 2.</li> + * that should be used for parsing. Maximum length is 1 character.</li> * </ul> * * @since 2.0.0
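
For reference, a minimal usage sketch of the option this series adds, written against the DataFrame API. The object name and output path below are illustrative, not part of the patches, and a local `SparkSession` is assumed. It follows the documented semantics: on read an unset `lineSep` covers `\r`, `\r\n` and `\n`, on write it defaults to `\n`, and after PATCH 14/15 the value must be exactly one character.

import org.apache.spark.sql.SparkSession

object CsvLineSepDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("CsvLineSepDemo")
      .getOrCreate()
    import spark.implicits._

    val out = "/tmp/csv-linesep-demo" // illustrative scratch path

    // Write three records separated by '|' instead of the default '\n'.
    // A multi-character value such as "::" is rejected with
    // "'lineSep' can contain only 1 character" after PATCH 14/15.
    Seq("a", "b", "c").toDF("value").coalesce(1)
      .write.mode("overwrite").option("lineSep", "|").csv(out)

    // Read the records back with the same separator; leaving lineSep
    // unset would instead split on \r, \r\n and \n.
    val readBack = spark.read.option("lineSep", "|").csv(out)
    readBack.show() // three rows: a, b, c

    spark.stop()
  }
}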