From 6a29aa4d8689bf5f5e1a4f3861d04c012ded8cd3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 18 May 2015 17:35:27 -0700 Subject: [PATCH 1/3] support datetime with timezone --- python/pyspark/sql/_types.py | 13 +++++++++ python/pyspark/sql/tests.py | 29 +++++++++++++++++++ .../spark/sql/execution/pythonUdfs.scala | 5 ++-- 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/_types.py b/python/pyspark/sql/_types.py index 9e7e9f04bc35..d1019a5e9c7a 100644 --- a/python/pyspark/sql/_types.py +++ b/python/pyspark/sql/_types.py @@ -19,6 +19,7 @@ import decimal import time import datetime +import calendar import keyword import warnings import json @@ -638,6 +639,8 @@ def _need_python_to_sql_conversion(dataType): elif isinstance(dataType, MapType): return _need_python_to_sql_conversion(dataType.keyType) or \ _need_python_to_sql_conversion(dataType.valueType) + elif isinstance(dataType, TimestampType): + return True elif isinstance(dataType, UserDefinedType): return True else: @@ -691,6 +694,16 @@ def converter(obj): key_converter = _python_to_sql_converter(dataType.keyType) value_converter = _python_to_sql_converter(dataType.valueType) return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) + + elif isinstance(dataType, TimestampType): + + def to_posix_timstamp(dt): + if dt.tzinfo is None: + return time.mktime(dt.timetuple()) + dt.microsecond / 1e6 + else: + return calendar.timegm(dt.utctimetuple()) + dt.microsecond / 1e6 + return to_posix_timstamp + elif isinstance(dataType, UserDefinedType): return lambda obj: dataType.serialize(obj) else: diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d37c5dbed7f6..6b0dc2590016 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -26,6 +26,7 @@ import tempfile import pickle import functools +import time import datetime import py4j @@ -46,6 +47,20 @@ from pyspark.sql.functions import UserDefinedFunction +class UTC(datetime.tzinfo): + """UTC""" + ZERO = datetime.timedelta(0) + + def utcoffset(self, dt): + return self.ZERO + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return self.ZERO + + class ExamplePointUDT(UserDefinedType): """ User-defined type (UDT) for ExamplePoint. @@ -571,6 +586,20 @@ def test_filter_with_datetime(self): self.assertEqual(0, df.filter(df.date > date).count()) self.assertEqual(0, df.filter(df.time > time).count()) + def test_time_with_timezone(self): + now = datetime.datetime.now() + ts = time.mktime(now.timetuple()) + now.microsecond / 1e6 + # class in __main__ is not serializable + from pyspark.sql.tests import UTC + utc = UTC() + utcnow = datetime.datetime.fromtimestamp(ts, utc) + df = self.sqlCtx.createDataFrame([(now, utcnow)]) + now1, utcnow1 = df.first() + # Spark SQL does not support microsecond, the error should be + # less than 1 millisecond + self.assertTrue(now1 - now < datetime.timedelta(0.001)) + self.assertTrue(utcnow1 - now < datetime.timedelta(0.001)) + def test_dropna(self): schema = StructType([ StructField("name", StringType(), True), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 11b2897f7678..ee52869eccb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -28,8 +28,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Row, _} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule @@ -184,6 +183,8 @@ object EvaluatePython { case (c: java.util.Calendar, TimestampType) => new java.sql.Timestamp(c.getTime().getTime()) + case (c: Double, TimestampType) => + new java.sql.Timestamp((c * 1000).toLong) case (_, udt: UserDefinedType[_]) => fromJava(obj, udt.sqlType) From 99d9d9c58124c1c9c9c2e9c65cc3d43b6eb9ed32 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 10 Jun 2015 22:09:45 -0700 Subject: [PATCH 2/3] use int for timestamp --- python/pyspark/sql/tests.py | 6 +++--- python/pyspark/sql/types.py | 20 +++++--------------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f031e165e168..8d06a2e0a8de 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -612,10 +612,10 @@ def test_time_with_timezone(self): utcnow = datetime.datetime.fromtimestamp(ts, utc) df = self.sqlCtx.createDataFrame([(now, utcnow)]) now1, utcnow1 = df.first() - # Spark SQL does not support microsecond, the error should be + # Pyrolite does not support microsecond, the error should be # less than 1 millisecond - self.assertTrue(now1 - now < datetime.timedelta(0.001)) - self.assertTrue(utcnow1 - now < datetime.timedelta(0.001)) + self.assertTrue(now - now1 < datetime.timedelta(0.001)) + self.assertTrue(now - utcnow1 < datetime.timedelta(0.001)) def test_dropna(self): schema = StructType([ diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 7b1bbca35870..c532640bb6e3 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -653,8 +653,6 @@ def _need_python_to_sql_conversion(dataType): elif isinstance(dataType, MapType): return _need_python_to_sql_conversion(dataType.keyType) or \ _need_python_to_sql_conversion(dataType.valueType) - elif isinstance(dataType, TimestampType): - return True elif isinstance(dataType, UserDefinedType): return True elif isinstance(dataType, TimestampType): @@ -711,25 +709,17 @@ def converter(obj): value_converter = _python_to_sql_converter(dataType.valueType) return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) - elif isinstance(dataType, TimestampType): - - def to_posix_timstamp(dt): - if dt.tzinfo is None: - return time.mktime(dt.timetuple()) + dt.microsecond / 1e6 - else: - return calendar.timegm(dt.utctimetuple()) + dt.microsecond / 1e6 - return to_posix_timstamp - elif isinstance(dataType, UserDefinedType): return lambda obj: dataType.serialize(obj) + elif isinstance(dataType, TimestampType): def to_posix_timstamp(dt): - if dt.tzinfo is None: - return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10) - else: - return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10) + seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) + return int(seconds * 1e7 + dt.microsecond * 10) return to_posix_timstamp + else: raise ValueError("Unexpected type %r" % dataType) From 44d84977571cba20531830b19ecb4186c93caa8f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 10 Jun 2015 23:33:26 -0700 Subject: [PATCH 3/3] add timezone support for DateType --- python/pyspark/sql/tests.py | 7 +++++-- python/pyspark/sql/types.py | 23 +++++++++++++++-------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 8d06a2e0a8de..b5fbb7d09882 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -604,14 +604,17 @@ def test_filter_with_datetime(self): self.assertEqual(0, df.filter(df.time > time).count()) def test_time_with_timezone(self): + day = datetime.date.today() now = datetime.datetime.now() ts = time.mktime(now.timetuple()) + now.microsecond / 1e6 # class in __main__ is not serializable from pyspark.sql.tests import UTC utc = UTC() utcnow = datetime.datetime.fromtimestamp(ts, utc) - df = self.sqlCtx.createDataFrame([(now, utcnow)]) - now1, utcnow1 = df.first() + df = self.sqlCtx.createDataFrame([(day, now, utcnow)]) + day1, now1, utcnow1 = df.first() + # Pyrolite serialize java.sql.Date as datetime, will be fixed in new version + self.assertEqual(day1.date(), day) # Pyrolite does not support microsecond, the error should be # less than 1 millisecond self.assertTrue(now - now1 < datetime.timedelta(0.001)) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index c532640bb6e3..23d9adb0daea 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType): _need_python_to_sql_conversion(dataType.valueType) elif isinstance(dataType, UserDefinedType): return True - elif isinstance(dataType, TimestampType): + elif isinstance(dataType, (DateType, TimestampType)): return True else: return False +EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal() + + def _python_to_sql_converter(dataType): """ Returns a converter that converts a Python object into a SQL datum for the given type. @@ -698,26 +701,30 @@ def converter(obj): return tuple(c(d.get(n)) for n, c in zip(names, converters)) else: return tuple(c(v) for c, v in zip(converters, obj)) - else: + elif obj is not None: raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType)) return converter elif isinstance(dataType, ArrayType): element_converter = _python_to_sql_converter(dataType.elementType) - return lambda a: [element_converter(v) for v in a] + return lambda a: a and [element_converter(v) for v in a] elif isinstance(dataType, MapType): key_converter = _python_to_sql_converter(dataType.keyType) value_converter = _python_to_sql_converter(dataType.valueType) - return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) + return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()]) elif isinstance(dataType, UserDefinedType): - return lambda obj: dataType.serialize(obj) + return lambda obj: obj and dataType.serialize(obj) + + elif isinstance(dataType, DateType): + return lambda d: d and d.toordinal() - EPOCH_ORDINAL elif isinstance(dataType, TimestampType): def to_posix_timstamp(dt): - seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) - return int(seconds * 1e7 + dt.microsecond * 10) + if dt: + seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) + return int(seconds * 1e7 + dt.microsecond * 10) return to_posix_timstamp else: