1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
@@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, CSV rows are considered malformed if at least one column value in the row is malformed. The CSV parser dropped such rows in the DROPMALFORMED mode and reported an error in the FAILFAST mode. Since Spark 2.4, a CSV row is considered malformed only when it contains malformed column values requested from the CSV datasource; other values can be ignored. As an example, a CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selecting the id column yields a row with the single column value 1234, but in Spark 2.3 and earlier the result is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, file listing for statistics computation is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `false`.
- Since Spark 2.4, metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during statistics computation.
- Since Spark 2.4, empty strings are saved as quoted empty strings `""`. In version 2.3 and earlier, empty strings were equal to `null` values and were not written out as any characters in saved CSV files. For example, the row `"a", null, "", 1` was written as `a,,,1`. Since Spark 2.4, the same row is saved as `a,,"",1`. To restore the previous behavior, set the CSV option `emptyValue` to the empty (not quoted) string.
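A minimal sketch of the change (Scala API; the output paths are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", null: String, "", "1")).toDF("c1", "c2", "c3", "c4")

// Spark 2.4 default: the empty string in c3 is written quoted -> a,,"",1
df.write.csv("/tmp/csv-quoted-empty")

// Restore the 2.3 behavior: empty strings are written as no characters -> a,,,1
df.write.option("emptyValue", "").csv("/tmp/csv-bare-empty")
```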

## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above

12 changes: 8 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -349,7 +349,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None):
samplingRatio=None, enforceSchema=None, emptyValue=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -444,6 +444,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
different, ``\0`` otherwise.
:param samplingRatio: defines fraction of rows used for schema inferring.
If None is set, it uses the default value, ``1.0``.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, empty string.
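A hedged sketch of the read-side semantics (Scala API shown, since the Python reader takes the same option; the ``EMPTY`` token and inline data are invented for illustration). A quoted empty field ``""`` surfaces as the configured token, while a field with no characters still matches ``nullValue`` and becomes null:

```scala
import spark.implicits._  // assumes an active SparkSession named `spark`

// Row 1 has a quoted empty field, row 2 a field with no characters at all.
val lines = Seq("1,\"\",x", "2,,y").toDS()

val df = spark.read
  .schema("id INT, a STRING, b STRING")
  .option("emptyValue", "EMPTY")  // invented token
  .csv(lines)

// Expected contents: (1, "EMPTY", "x") and (2, null, "y")
```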

>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -463,7 +465,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema)
enforceSchema=enforceSchema, emptyValue=emptyValue)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -859,7 +861,7 @@ def text(self, path, compression=None, lineSep=None):
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None):
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -911,6 +913,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
different, ``\0`` otherwise.
:param encoding: sets the encoding (charset) of saved csv files. If None is set,
the default UTF-8 charset will be used.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, ``""``.
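To make the null/empty distinction concrete on the write side, a hedged Scala sketch (the tokens and path are invented; the Python writer accepts the same options):

```scala
import spark.implicits._  // assumes an active SparkSession named `spark`

val df = Seq((1, "x"), (2, ""), (3, null: String)).toDF("id", "name")

df.write
  .option("emptyValue", "EMPTY")  // empty strings are written as EMPTY
  .option("nullValue", "NULL")    // nulls are written as NULL
  .csv("/tmp/csv-tokens")         // hypothetical path

// Written rows: 1,x   2,EMPTY   3,NULL
```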

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -921,7 +925,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
encoding=encoding)
encoding=encoding, emptyValue=emptyValue)
self._jwrite.csv(path)

@since(1.5)
7 changes: 5 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -564,7 +564,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None):
enforceSchema=None, emptyValue=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -658,6 +658,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
the quote character. If None is set, the default value is
escape character when escape and quote characters are
different, ``\0`` otherwise.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, empty string.
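A hedged streaming sketch (Scala API; the schema, token, and input directory are assumptions — file-based streaming sources require an explicit schema):

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

val csvStream = spark.readStream
  .schema(schema)               // required for file-based streaming sources
  .option("emptyValue", "N/A")  // quoted empty fields surface as N/A
  .csv("/tmp/incoming")         // hypothetical input directory
```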

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -674,7 +676,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema)
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -571,6 +571,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -635,6 +635,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* enclosed in quotes. Default is to only escape values containing a quote character.</li>
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
* <li>`emptyValue` (default `""`): sets the string representation of an empty value.</li>
* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
* files. If it is not set, the UTF-8 charset will be used.</li>
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -162,6 +162,21 @@ class CSVOptions(
*/
val enforceSchema = getBool("enforceSchema", default = true)


/**
 * String representation of an empty value in read and in write.
 */
val emptyValue = parameters.get("emptyValue")

/**
 * The string returned by the CSV reader when it reads no characters for a value,
 * i.e. for an empty quoted string `""`. Defaults to the empty string.
 */
val emptyValueInRead = emptyValue.getOrElse("")

/**
 * The string written out in place of an empty string value. Defaults to `""`
 * (a quoted empty string).
 */
val emptyValueInWrite = emptyValue.getOrElse("\"\"")
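Both values feed directly into the underlying univocity settings in `asWriterSettings` and `asParserSettings` below. A standalone sketch of the parser-side effect, assuming the univocity dependency is on the classpath:

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
settings.setEmptyValue("EMPTY")  // what emptyValueInRead is wired to

val parser = new CsvParser(settings)
val row = parser.parseLine("1,\"\",x")
// row(1) == "EMPTY": the quoted empty field is replaced by the configured value
```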

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -173,7 +188,7 @@ class CSVOptions(
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
writerSettings.setEmptyValue("\"\"")
writerSettings.setEmptyValue(emptyValueInWrite)
writerSettings.setSkipEmptyLines(true)
writerSettings.setQuoteAllFields(quoteAll)
writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -194,7 +209,7 @@ class CSVOptions(
settings.setInputBufferSize(inputBufferSize)
settings.setMaxColumns(maxColumns)
settings.setNullValue(nullValue)
settings.setEmptyValue("")
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -327,6 +327,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
* value.</li>
4 changes: 4 additions & 0 deletions sql/core/src/test/resources/test-data/cars-empty-value.csv
@@ -0,0 +1,4 @@
year,make,model,comment,blank
"2012","Tesla","S","",""
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt,,""
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -50,6 +50,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
private val carsAltFile = "test-data/cars-alternative.csv"
private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv"
private val carsNullFile = "test-data/cars-null.csv"
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
private val carsBlankColName = "test-data/cars-blank-column-name.csv"
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
@@ -668,6 +669,70 @@
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}

test("empty fields with user defined empty values") {

// year,make,model,comment,blank
val dataSchema = StructType(List(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = false),
StructField("model", StringType, nullable = false),
StructField("comment", StringType, nullable = true),
StructField("blank", StringType, nullable = true)))
val cars = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.option("emptyValue", "empty")
.load(testFile(carsEmptyValueFile))

verifyCars(cars, withHeader = true, checkValues = false)
val results = cars.collect()
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
assert(results(1).toSeq ===
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
}

test("save csv with empty fields with user defined empty values") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath

// year,make,model,comment,blank
val dataSchema = StructType(List(
StructField("year", IntegerType, nullable = true),
StructField("make", StringType, nullable = false),
StructField("model", StringType, nullable = false),
StructField("comment", StringType, nullable = true),
StructField("blank", StringType, nullable = true)))
val cars = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.option("nullValue", "NULL")
.load(testFile(carsEmptyValueFile))

cars.coalesce(1).write
.format("csv")
.option("header", "true")
.option("emptyValue", "empty")
.option("nullValue", null)
.save(csvDir)

val carsCopy = spark.read
.format("csv")
.schema(dataSchema)
.option("header", "true")
.load(csvDir)

verifyCars(carsCopy, withHeader = true, checkValues = false)
val results = carsCopy.collect()
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
assert(results(1).toSeq ===
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
}
}

test("save csv with compression codec option") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath
@@ -1375,6 +1440,52 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}
}

test("SPARK-25241: An empty string should not be coerced to null when emptyValue is passed.") {
val litNull: String = null
val df = Seq(
(1, "John Doe"),
(2, ""),
(3, "-"),
(4, litNull)
).toDF("id", "name")

// Checks the new behavior: an empty string is not coerced to null when `emptyValue` is
// set to anything but an empty string literal.
withTempPath { path =>
df.write
.option("emptyValue", "-")
.csv(path.getAbsolutePath)
val computed = spark.read
.option("emptyValue", "-")
.schema(df.schema)
.csv(path.getAbsolutePath)
val expected = Seq(
(1, "John Doe"),
(2, "-"),
(3, "-"),
(4, "-")
).toDF("id", "name")

checkAnswer(computed, expected)
}
// Keeps the old behavior, where an empty string is coerced to null when `emptyValue` is not passed.
withTempPath { path =>
df.write
.csv(path.getAbsolutePath)
val computed = spark.read
.schema(df.schema)
.csv(path.getAbsolutePath)
val expected = Seq(
(1, "John Doe"),
(2, litNull),
(3, "-"),
(4, litNull)
).toDF("id", "name")

checkAnswer(computed, expected)
}
}

test("SPARK-24329: skip lines with comments, and one or multiple whitespaces") {
val schema = new StructType().add("colA", StringType)
val ds = spark