diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index edc7ca6f5146..abf68da3a3aa 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1760,6 +1760,17 @@ def toPandas(self):
 
             for f, t in dtype.items():
                 pdf[f] = pdf[f].astype(t, copy=False)
+
+            if self.sql_ctx.getConf("spark.sql.execution.pandas.timeZoneAware", "false").lower() \
+                    == "true":
+                from dateutil import tz
+                tzlocal = tz.tzlocal()
+                timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
+                for field in self.schema:
+                    if type(field.dataType) == TimestampType:
+                        pdf[field.name] = pdf[field.name].apply(
+                            lambda ts: ts.tz_localize(tzlocal).tz_convert(timezone))
+
             return pdf
 
     def _collectAsArrow(self):
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cf2c473a1645..cee0dabc0a4b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2507,6 +2507,37 @@ def test_to_pandas(self):
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas_timezone_aware(self):
+        import pandas as pd
+        from dateutil import tz
+        tzlocal = tz.tzlocal()
+        ts = datetime.datetime(1970, 1, 1)
+        pdf = pd.DataFrame.from_records([[ts]], columns=['ts'])
+
+        self.spark.conf.set('spark.sql.session.timeZone', 'America/Los_Angeles')
+
+        schema = StructType().add("ts", TimestampType())
+        df = self.spark.createDataFrame([(ts,)], schema)
+
+        pdf_naive = df.toPandas()
+        self.assertEqual(pdf_naive['ts'][0].tzinfo, None)
+        self.assertTrue(pdf_naive.equals(pdf))
+
+        self.spark.conf.set('spark.sql.execution.pandas.timeZoneAware', 'true')
+
+        pdf_pst = df.toPandas()
+        self.assertEqual(pdf_pst['ts'][0].tzinfo.zone, 'America/Los_Angeles')
+        self.assertFalse(pdf_pst.equals(pdf))
+
+        pdf_pst_naive = pdf_pst.copy()
+        pdf_pst_naive['ts'] = pdf_pst_naive['ts'].apply(
+            lambda ts: ts.tz_convert(tzlocal).tz_localize(None))
+        self.assertTrue(pdf_pst_naive.equals(pdf))
+
+        self.spark.conf.unset('spark.sql.execution.pandas.timeZoneAware')
+        self.spark.conf.unset('spark.sql.session.timeZone')
+
     def test_create_dataframe_from_array_of_long(self):
         import array
         data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 733d80e9d46c..7f48f4193239 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -912,6 +912,14 @@ object SQLConf {
     .intConf
     .createWithDefault(10000)
 
+  val PANDAS_TIMEZONE_AWARE =
+    buildConf("spark.sql.execution.pandas.timeZoneAware")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timezone-aware timestamp type when converting " +
+        "by pyspark.sql.DataFrame.toPandas. The session local timezone is used for the timezone.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
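
For context, a minimal usage sketch of the behavior this patch introduces (not part of the diff; assumes an active SparkSession bound to `spark`, Pandas and dateutil installed, and only the conf names added above):

    import datetime

    from pyspark.sql.types import StructType, TimestampType

    # The session timezone is what timestamps are converted to when the
    # new conf is enabled.
    spark.conf.set('spark.sql.session.timeZone', 'America/Los_Angeles')

    schema = StructType().add('ts', TimestampType())
    df = spark.createDataFrame([(datetime.datetime(1970, 1, 1),)], schema)

    # Default behavior: toPandas() returns timezone-naive timestamps.
    print(df.toPandas()['ts'][0].tzinfo)  # None

    # With the conf added by this patch: timestamps are localized from the
    # driver's local timezone and converted to the session timezone, so the
    # resulting Pandas column is timezone-aware.
    spark.conf.set('spark.sql.execution.pandas.timeZoneAware', 'true')
    print(df.toPandas()['ts'][0].tzinfo)  # America/Los_Angeles

Note the round trip exercised in the test above: tz_convert back to the local timezone followed by tz_localize(None) recovers the naive DataFrame, which is what guarantees the conversion only changes the representation, not the instant.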