Commit 865b2fd

ueshin authored and cloud-fan committed
[SPARK-18937][SQL] Timezone support in CSV/JSON parsing
## What changes were proposed in this pull request?

This is a follow-up PR of #16308.

This PR enables timezone support in CSV/JSON parsing.

It introduces a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone).

The datasources use the `timeZone` option to format timestamp values on write and to parse them on read.
Note that while reading, if the `timestampFormat` includes timezone info, the `timeZone` option is not used, because the timezone in the values themselves should be respected.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the timezone option is wrong, because the timestamp values have timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if `timestampFormat` doesn't contain timezone info, we can properly read the values by setting the correct `timeZone` option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` extend `TimeZoneAwareExpression` so that they can evaluate values with the timezone option.

## How was this patch tested?

Existing tests, plus some added tests.

Author: Takuya UESHIN <[email protected]>

Closes #16750 from ueshin/issues/SPARK-18937.
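
The behavior described in the commit message can be illustrated without Spark. The following is a minimal sketch using only Python's standard `datetime` module. The helper names `format_ts` and `parse_ts` are hypothetical and not part of Spark, and fixed UTC offsets stand in for the `GMT`, `PST`, and `JST` zone IDs (an assumption that ignores daylight saving). It shows that one instant renders differently per zone, and that a string without an offset resolves to a different instant depending on the zone assumed while parsing.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets standing in for the zone IDs used in the commit message
# (an assumption for illustration; real zone IDs may observe DST).
GMT = timezone.utc
PST = timezone(timedelta(hours=-8))
JST = timezone(timedelta(hours=9))

EPOCH_MILLIS = 1451606400000  # the instant used in the commit message


def format_ts(epoch_millis, tz, with_offset=True):
    """Render an instant as wall-clock time in `tz` (hypothetical helper)."""
    local = datetime.fromtimestamp(epoch_millis / 1000, tz)
    text = local.strftime("%Y-%m-%dT%H:%M:%S")
    return text + local.strftime("%z") if with_offset else text


def parse_ts(text, default_tz):
    """Parse to epoch millis; `default_tz` is used only if `text` has no offset."""
    try:
        parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        # No offset in the value: fall back to the supplied zone.
        parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=default_tz)
    return int(parsed.timestamp() * 1000)


print(format_ts(EPOCH_MILLIS, PST))          # 2015-12-31T16:00:00-0800
print(parse_ts("2016-01-01T09:00:00", GMT))  # 1451638800000 (wrong zone assumed)
print(parse_ts("2016-01-01T09:00:00", JST))  # 1451606400000
```

A value that carries its own offset parses to the same instant whatever `default_tz` is passed, which corresponds to the "timezone option is wrong but the file still reads correctly" case above.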
1 parent 6a9a85b commit 865b2fd

20 files changed: +351 -123 lines changed

python/pyspark/sql/readwriter.py

Lines changed: 27 additions & 16 deletions
@@ -158,7 +158,8 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """
         Loads a JSON file (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -225,7 +228,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -298,7 +301,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                                             uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
         self._jwrite.saveAsTable(name)

     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+             timeZone=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+            timeZone=timeZone)
         self._jwrite.json(path)

     @since(1.4)
@@ -664,7 +673,7 @@ def text(self, path, compression=None):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, timeZone=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
         self._jwrite.csv(path)

     @since(1.5)
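
Each method in `readwriter.py` passes `timeZone` on to `_set_opts` together with the other keyword arguments. As a rough illustration of why `timeZone=None` ends up meaning "session local timezone", the sketch below uses a hypothetical stand-in class (`FakeWriter`, not part of PySpark) and assumes that options left as `None` are simply not forwarded, so the JVM side applies its own default.

```python
class FakeWriter:
    """Hypothetical stand-in for a reader/writer; it only records options."""

    def __init__(self):
        self.options = {}

    def option(self, key, value):
        self.options[key] = str(value)
        return self

    def _set_opts(self, **options):
        # Assumption: only explicitly set (non-None) options are forwarded.
        for key, value in options.items():
            if value is not None:
                self.option(key, value)

    def json(self, path, timestampFormat=None, timeZone=None):
        # `path` is unused here; a real writer would hand it to the JVM.
        self._set_opts(timestampFormat=timestampFormat, timeZone=timeZone)
        return self.options


default_opts = FakeWriter().json("/tmp/out")
explicit_opts = FakeWriter().json("/tmp/out", timeZone="JST")
print(default_opts)   # {}
print(explicit_opts)  # {'timeZone': 'JST'}
```

Under that assumption, omitting `timeZone` leaves the option unset rather than sending an explicit value, which is consistent with the docstring's "If None is set, it uses the default value".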

python/pyspark/sql/streaming.py

Lines changed: 12 additions & 8 deletions
@@ -429,7 +429,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
-             timestampFormat=None):
+             timestampFormat=None, timeZone=None):
         """
         Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
         <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
@@ -476,11 +476,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat)
+            timestampFormat=timestampFormat, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
@@ -552,7 +554,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
         """Loads a CSV file stream and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -597,18 +599,20 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to date type. If None is set, it uses the
-                           default value value, ``yyyy-MM-dd``.
+                           default value, ``yyyy-MM-dd``.
         :param timestampFormat: sets the string that indicates a timestamp format. Custom date
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
-                                default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+                                default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param maxColumns: defines a hard limit of how many columns a record can have. If None is
                            set, it uses the default value, ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                   value being read. If None is set, it uses the default value,
                                   ``-1`` meaning unlimited length.
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
+        :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+                         If None is set, it uses the default value, session local timezone.

                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 24 additions & 6 deletions
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] with the specified schema.
  */
-case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+    schema: StructType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true

+  def this(schema: StructType, options: Map[String, String], child: Expression) =
+    this(schema, options, child, None)
+
   @transient
   lazy val parser =
     new JacksonParser(
       schema,
       "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
-      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))

   override def dataType: DataType = schema

+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(json: Any): Any = {
     try parser.parse(json.toString).headOption.orNull catch {
       case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
 /**
  * Converts a [[StructType]] to a json output string.
  */
-case class StructToJson(options: Map[String, String], child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true

+  def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
   @transient
   lazy val writer = new CharArrayWriter()

@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
     new JacksonGenerator(
       child.dataType.asInstanceOf[StructType],
       writer,
-      new JSONOptions(options))
+      new JSONOptions(options, timeZoneId.get))

   override def dataType: DataType = StringType

@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
     }
   }

+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
   override def nullSafeEval(row: Any): Any = {
     gen.write(row.asInstanceOf[InternalRow])
     gen.flush()
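
The changes to `JsonToStruct` and `StructToJson` share one shape: the expression carries an optional `timeZoneId` that starts out unset, `withTimeZone` returns a copy with it filled in, and building the parser or generator calls `timeZoneId.get`, which only succeeds once the zone has been set. The sketch below mirrors that shape in Python, to stay consistent with the other examples. `StructToJsonSketch` and its method names are hypothetical, and the idea that some later step calls `with_time_zone` with the session timezone is an assumption that this diff does not show.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class StructToJsonSketch:
    """Hypothetical analogue of a timezone-aware expression."""

    options: tuple = ()
    time_zone_id: Optional[str] = None  # unset until with_time_zone is called

    def with_time_zone(self, time_zone_id):
        # Like Scala's copy(timeZoneId = Option(timeZoneId)): returns a new object.
        return replace(self, time_zone_id=time_zone_id)

    def resolved_time_zone(self):
        # Like timeZoneId.get: fails loudly if the zone was never set.
        if self.time_zone_id is None:
            raise ValueError("time zone has not been set")
        return self.time_zone_id


unresolved = StructToJsonSketch()
resolved = unresolved.with_time_zone("GMT")
print(resolved.resolved_time_zone())  # GMT
```

Keeping the object immutable and returning a copy means the unresolved expression is never mutated, so it can be reused safely with a different zone.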
