
Commit 3258f27

HyukjinKwon authored and rxin committed
[SPARK-16216][SQL][BRANCH-2.0] Backport Read/write dateFormat/timestampFormat options for CSV and JSON
## What changes were proposed in this pull request?

This PR backports #14279 to 2.0.

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <[email protected]>

Closes #14799 from HyukjinKwon/SPARK-16216-json-csv-backport.
1 parent 9f363a6 commit 3258f27


18 files changed, +478 -110 lines changed


python/pyspark/sql/readwriter.py

Lines changed: 44 additions & 12 deletions

@@ -156,7 +156,7 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
         (one object per record) and returns the result as a :class:`DataFrame`.
@@ -198,6 +198,14 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                                         ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                         it uses the value specified in
                                         ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -213,7 +221,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -285,8 +294,8 @@ def text(self, paths):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
             path = [path]
@@ -571,7 +584,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ def json(self, path, mode=None, compression=None):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
-        self._set_opts(compression=compression)
+        self._set_opts(
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.json(path)
 
     @since(1.4)
@@ -634,7 +656,8 @@ def text(self, path, compression=None):
 
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
-            header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+            timestampFormat=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
                           the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                           the default value, empty string.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
-                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+                       nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+                       dateFormat=dateFormat, timestampFormat=timestampFormat)
         self._jwrite.csv(path)
 
     @since(1.5)

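The readwriter.py changes above add `dateFormat` and `timestampFormat` keywords to the batch JSON/CSV reader and writer. As a quick illustration (not part of the commit), here is a minimal PySpark sketch of the reader side; the `SparkSession` named `spark`, the `events.json` file, and its column names are assumptions for the example.

```python
# Minimal sketch of the new batch-reader options (hypothetical input file).
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               DateType, TimestampType)

spark = SparkSession.builder.appName("dateformat-demo").getOrCreate()

schema = StructType([
    StructField("name", StringType()),
    StructField("date", DateType()),
    StructField("ts", TimestampType()),
])

# Patterns follow java.text.SimpleDateFormat; leaving them unset falls back to
# the documented defaults, yyyy-MM-dd and yyyy-MM-dd'T'HH:mm:ss.SSSZZ.
df = spark.read.json("events.json", schema=schema,
                     dateFormat="dd/MM/yyyy",
                     timestampFormat="dd/MM/yyyy HH:mm:ss")
df.printSchema()
```

The CSV reader takes the same two keywords, so `spark.read.csv(path, schema=schema, dateFormat=..., timestampFormat=...)` behaves analogously.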
python/pyspark/sql/streaming.py

Lines changed: 22 additions & 8 deletions

@@ -338,7 +338,8 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+             timestampFormat=None):
         """
         Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
 
@@ -381,6 +382,14 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                                         ``spark.sql.columnNameOfCorruptRecord``. If None is set,
                                         it uses the value specified in
                                         ``spark.sql.columnNameOfCorruptRecord``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+            timestampFormat=timestampFormat)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -450,8 +460,8 @@ def text(self, path):
     def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
+            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
         """Loads a CSV file stream and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                             is set, it uses the default value, ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+                           applies to date type. If None is set, it uses the
+                           default value, ``yyyy-MM-dd``.
+        :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+                                formats follow the formats at ``java.text.SimpleDateFormat``.
+                                This applies to timestamp type. If None is set, it uses the
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
             ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+            dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+            maxCharsPerColumn=maxCharsPerColumn,
             maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
         if isinstance(path, basestring):
            return self._df(self._jreader.csv(path))

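The streaming reader accepts the options in the same way. A hedged sketch follows, assuming an input directory `/tmp/csv-input` and illustrative column names; file-based streaming sources need an explicit schema here.

```python
# Sketch of a streaming CSV source using the backported options
# (path and columns are assumptions for illustration).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("streaming-dateformat-demo").getOrCreate()

schema = StructType([
    StructField("event", StringType()),
    StructField("logged_at", TimestampType()),
])

stream_df = spark.readStream.csv("/tmp/csv-input",
                                 schema=schema,
                                 header=True,
                                 dateFormat="yyyy/MM/dd",
                                 timestampFormat="yyyy/MM/dd HH:mm:ss")

# The result is an unbounded DataFrame; start a query to consume it, e.g.
# stream_df.writeStream.format("console").start()
```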
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 13 additions & 4 deletions

@@ -280,6 +280,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * <li>`columnNameOfCorruptRecord` (default is the value specified in
   * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
   * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * </ul>
   * @since 2.0.0
   */
@@ -376,10 +382,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * value.</li>
   * <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
   * value.</li>
-  * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
-  * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
-  * and timestamp type. By default, it is `null` which means trying to parse times and date by
-  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+  * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()` or ISO 8601 format.</li>
   * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed

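The Scala `DataFrameReader` documents the same `dateFormat` and `timestampFormat` option keys, set through `option()`. A short sketch, kept in PySpark for consistency with the rest of this page and reusing the `spark` session from the earlier examples (`people.csv` is a hypothetical input), shows the generic form that maps directly onto the Scala API.

```python
# Generic option() form; the keys mirror the DataFrameReader.scala docs above.
from pyspark.sql.types import StructType, StructField, StringType, DateType

schema = StructType([
    StructField("name", StringType()),
    StructField("birthday", DateType()),
])

df = (spark.read
      .format("csv")
      .schema(schema)
      .option("header", "true")
      .option("dateFormat", "MM/dd/yyyy")
      .load("people.csv"))
```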
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 12 additions & 0 deletions

@@ -452,6 +452,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 1.4.0
   */
@@ -544,6 +550,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
+  * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+  * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+  * date type.</li>
+  * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+  * indicates a timestamp format. Custom date formats follow the formats at
+  * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   *
   * @since 2.0.0
   */

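On the writer side, the new options control how date and timestamp columns are rendered as strings in the output. A small round-trip sketch under the same assumptions (existing `spark` session, a temporary output path):

```python
# Write timestamps with a custom pattern, then read them back with the same
# pattern (output path and sample row are illustrative).
import datetime
from pyspark.sql import Row

df = spark.createDataFrame([Row(name="a", ts=datetime.datetime(2016, 8, 26, 12, 0, 0))])

out = "/tmp/ts-json-demo"
df.write.mode("overwrite").json(out, timestampFormat="yyyy/MM/dd HH:mm:ss")

# Use the matching pattern when reading back so the strings parse as timestamps.
back = spark.read.json(out, schema=df.schema, timestampFormat="yyyy/MM/dd HH:mm:ss")
back.show()
```

The CSV writer takes the same two keywords, e.g. `df.write.csv(path, dateFormat=..., timestampFormat=...)`.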