Skip to content

Commit 424b007

Browse files
Davies Liurxin
authored and committed
[SPARK-6411] [SQL] [PySpark] support date/datetime with timezone in Python
Spark SQL does not support timezones, and Pyrolite does not handle timezones well either. This patch converts datetime into a POSIX timestamp (avoiding any timezone confusion), which is what SQL uses. If a datetime object has no timezone, it is treated as local time. The timezone in an RDD will be lost after one round trip, so all datetimes coming back from SQL will be in local time. Because of Pyrolite, datetimes from SQL only have 1-millisecond precision. This PR also drops the timezone from dates, converting them to the number of days since the epoch (as used in SQL). Author: Davies Liu <[email protected]> Closes apache#6250 from davies/tzone and squashes the following commits: 44d8497 [Davies Liu] add timezone support for DateType 99d9d9c [Davies Liu] use int for timestamp 10aa7ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into tzone 6a29aa4 [Davies Liu] support datetime with timezone
1 parent 6b68366 commit 424b007

File tree

3 files changed

+51
-11
lines changed

3 files changed

+51
-11
lines changed

python/pyspark/sql/tests.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import tempfile
2727
import pickle
2828
import functools
29+
import time
2930
import datetime
3031

3132
import py4j
@@ -47,6 +48,20 @@
4748
from pyspark.sql.window import Window
4849

4950

51+
class UTC(datetime.tzinfo):
    """Concrete tzinfo for the UTC timezone: zero offset and no DST."""

    # Shared zero offset returned by both utcoffset() and dst().
    ZERO = datetime.timedelta(0)

    def utcoffset(self, dt):
        """UTC is always a zero offset from UTC."""
        return self.ZERO

    def dst(self, dt):
        """UTC never observes daylight saving time."""
        return self.ZERO

    def tzname(self, dt):
        """Canonical name of this timezone."""
        return "UTC"
63+
64+
5065
class ExamplePointUDT(UserDefinedType):
5166
"""
5267
User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ def test_filter_with_datetime(self):
588603
self.assertEqual(0, df.filter(df.date > date).count())
589604
self.assertEqual(0, df.filter(df.time > time).count())
590605

606+
def test_time_with_timezone(self):
    """Round-trip a date, a naive datetime, and a tz-aware datetime
    through a DataFrame and check they survive (within Pyrolite's
    1-millisecond precision limit)."""
    day = datetime.date.today()
    now = datetime.datetime.now()
    # POSIX timestamp of `now`, including the sub-second part.
    ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
    # class in __main__ is not serializable
    from pyspark.sql.tests import UTC
    utc = UTC()
    utcnow = datetime.datetime.fromtimestamp(ts, utc)
    df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
    day1, now1, utcnow1 = df.first()
    # Pyrolite serializes java.sql.Date as a datetime; will be fixed in a
    # newer version, so compare only the date part.
    self.assertEqual(day1.date(), day)
    # Pyrolite does not support microseconds, so the error should be
    # less than 1 millisecond.  NOTE: the previous code used
    # timedelta(0.001), whose first positional argument is *days*
    # (i.e. an 86.4-second tolerance) -- use an explicit keyword.
    self.assertTrue(now - now1 < datetime.timedelta(milliseconds=1))
    self.assertTrue(now - utcnow1 < datetime.timedelta(milliseconds=1))
622+
591623
def test_dropna(self):
592624
schema = StructType([
593625
StructField("name", StringType(), True),

python/pyspark/sql/types.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
655655
_need_python_to_sql_conversion(dataType.valueType)
656656
elif isinstance(dataType, UserDefinedType):
657657
return True
658-
elif isinstance(dataType, TimestampType):
658+
elif isinstance(dataType, (DateType, TimestampType)):
659659
return True
660660
else:
661661
return False
662662

663663

664+
EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
665+
666+
664667
def _python_to_sql_converter(dataType):
665668
"""
666669
Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def converter(obj):
698701
return tuple(c(d.get(n)) for n, c in zip(names, converters))
699702
else:
700703
return tuple(c(v) for c, v in zip(converters, obj))
701-
else:
704+
elif obj is not None:
702705
raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
703706
return converter
704707
elif isinstance(dataType, ArrayType):
705708
element_converter = _python_to_sql_converter(dataType.elementType)
706-
return lambda a: [element_converter(v) for v in a]
709+
return lambda a: a and [element_converter(v) for v in a]
707710
elif isinstance(dataType, MapType):
708711
key_converter = _python_to_sql_converter(dataType.keyType)
709712
value_converter = _python_to_sql_converter(dataType.valueType)
710-
return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
713+
return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
714+
711715
elif isinstance(dataType, UserDefinedType):
712-
return lambda obj: dataType.serialize(obj)
716+
return lambda obj: obj and dataType.serialize(obj)
717+
718+
elif isinstance(dataType, DateType):
719+
return lambda d: d and d.toordinal() - EPOCH_ORDINAL
720+
713721
elif isinstance(dataType, TimestampType):
714722

715723
def to_posix_timstamp(dt):
    """Convert a datetime to an integer count of 1e-7-second (100 ns)
    units since the POSIX epoch.

    A naive datetime is interpreted as local time; a tz-aware one is
    normalized through its UTC time tuple.  A falsy value (e.g. None)
    maps to None.
    """
    if not dt:
        return None
    if dt.tzinfo is None:
        # Naive: interpret in the local timezone.
        seconds = time.mktime(dt.timetuple())
    else:
        # Aware: normalize via UTC.
        seconds = calendar.timegm(dt.utctimetuple())
    # timetuple()/utctimetuple() drop microseconds, so add them back.
    return int(seconds * 1e7 + dt.microsecond * 10)
720728
return to_posix_timstamp
729+
721730
else:
722731
raise ValueError("Unexpected type %r" % dataType)
723732

sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
2828
import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
2929
import org.apache.spark.broadcast.Broadcast
3030
import org.apache.spark.rdd.RDD
31-
import org.apache.spark.sql.catalyst.expressions.Row
32-
import org.apache.spark.sql.catalyst.expressions._
31+
import org.apache.spark.sql.catalyst.expressions.{Row, _}
3332
import org.apache.spark.sql.catalyst.plans.logical
3433
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3534
import org.apache.spark.sql.catalyst.rules.Rule

0 commit comments

Comments
 (0)