11 changes: 11 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -1760,6 +1760,17 @@ def toPandas(self):

        for f, t in dtype.items():
            pdf[f] = pdf[f].astype(t, copy=False)

        if self.sql_ctx.getConf("spark.sql.execution.pandas.timeZoneAware", "false").lower() \
                == "true":

Contributor:
I'd like to treat it as a bug and always respect the session local timezone.

Member:
We still need a conf, even if it is a bug. This is just to avoid breaking any existing app. We can remove the conf in Spark 3.x.
            from dateutil import tz
            tzlocal = tz.tzlocal()
            timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
            for field in self.schema:
                if type(field.dataType) == TimestampType:
                    pdf[field.name] = pdf[field.name].apply(
                        lambda ts: ts.tz_localize(tzlocal).tz_convert(timezone))

        return pdf

    def _collectAsArrow(self):
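For context on the conversion above: pandas' tz_localize interprets a naive timestamp as wall-clock time in a given zone, and tz_convert then re-expresses it in another zone. A minimal standalone sketch of the pattern the patch applies per TimestampType column (illustrative values only, assuming pandas and dateutil are installed):

import pandas as pd
from dateutil import tz

# A naive timestamp, as toPandas() returns without the new conf.
naive = pd.Timestamp('1970-01-01 00:00:00')

# Interpret it as local wall-clock time, then express it in the
# session time zone, mirroring what the patched toPandas() does.
aware = naive.tz_localize(tz.tzlocal()).tz_convert('America/Los_Angeles')
print(aware.tzinfo)  # zone info for America/Los_Angeles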
31 changes: 31 additions & 0 deletions python/pyspark/sql/tests.py
@@ -2507,6 +2507,37 @@ def test_to_pandas(self):
        self.assertEquals(types[2], np.bool)
        self.assertEquals(types[3], np.float32)

    @unittest.skipIf(not _have_pandas, "Pandas not installed")
    def test_to_pandas_timezone_aware(self):
        import pandas as pd
        from dateutil import tz
        tzlocal = tz.tzlocal()
        ts = datetime.datetime(1970, 1, 1)
        pdf = pd.DataFrame.from_records([[ts]], columns=['ts'])

        self.spark.conf.set('spark.sql.session.timeZone', 'America/Los_Angeles')

        schema = StructType().add("ts", TimestampType())
        df = self.spark.createDataFrame([(ts,)], schema)

        pdf_naive = df.toPandas()
        self.assertEqual(pdf_naive['ts'][0].tzinfo, None)
        self.assertTrue(pdf_naive.equals(pdf))
Member:
This is not really a test that df.toPandas() is time zone naive. If that were true, then you should be able to do

import os
import time

df = self.spark.createDataFrame([(ts,)], schema)
os.environ["TZ"] = "America/New_York"
time.tzset()
pdf_naive = df.toPandas()
self.assertTrue(pdf_naive.equals(pdf))

but this will fail, because toPandas() does a conversion to local time, which is what the original data happens to be in.


        self.spark.conf.set('spark.sql.execution.pandas.timeZoneAware', 'true')

        pdf_pst = df.toPandas()
        self.assertEqual(pdf_pst['ts'][0].tzinfo.zone, 'America/Los_Angeles')
        self.assertFalse(pdf_pst.equals(pdf))

        pdf_pst_naive = pdf_pst.copy()
        pdf_pst_naive['ts'] = pdf_pst_naive['ts'].apply(
            lambda ts: ts.tz_convert(tzlocal).tz_localize(None))
        self.assertTrue(pdf_pst_naive.equals(pdf))

        self.spark.conf.unset('spark.sql.execution.pandas.timeZoneAware')
        self.spark.conf.unset('spark.sql.session.timeZone')
Member:
(Not a big deal, but we could use try/finally here, so that if this test fails, the leftover conf settings don't affect other tests in the future.)
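A minimal sketch of that suggestion (a hypothetical restructuring of the test body above, not part of this diff):

        self.spark.conf.set('spark.sql.session.timeZone', 'America/Los_Angeles')
        try:
            self.spark.conf.set('spark.sql.execution.pandas.timeZoneAware', 'true')
            pdf_pst = df.toPandas()
            self.assertEqual(pdf_pst['ts'][0].tzinfo.zone, 'America/Los_Angeles')
        finally:
            # Always restore the session confs, even if an assertion above
            # fails, so later tests are not affected.
            self.spark.conf.unset('spark.sql.execution.pandas.timeZoneAware')
            self.spark.conf.unset('spark.sql.session.timeZone')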


    def test_create_dataframe_from_array_of_long(self):
        import array
        data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
@@ -912,6 +912,14 @@ object SQLConf {
      .intConf
      .createWithDefault(10000)

  val PANDAS_TIMEZONE_AWARE =
Contributor:
There are other parts of PySpark that don't use the session local timezone either, for instance df.collect() and (maybe) Python UDF execution.

I am worried about having these be inconsistent (some use the local timezone, some don't) and complex (one configuration for each piece of functionality?).

It will be harder to fix, but how about we use one configuration to control the behavior of df.toPandas(), df.collect(), and Python UDFs with regard to the session local timezone?

Member:
Yes, I agree with this. There is also inconsistent behavior when bringing data into Spark, because TimestampType.toInternal does its conversion with local time and not with the session local timezone.
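For reference, a sketch of what TimestampType.toInternal (in pyspark/sql/types.py) did around this time; quoted from memory, so treat the details as approximate:

import calendar
import time

def toInternal(self, dt):
    # Naive datetimes go through time.mktime, i.e. the *local* time zone;
    # only tz-aware datetimes are handled in UTC. This is the inconsistency
    # noted above: the session local timezone is never consulted.
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 1000000 + dt.microsecond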

    buildConf("spark.sql.execution.pandas.timeZoneAware")
      .internal()
      .doc("When true, produce a Pandas DataFrame with a timezone-aware timestamp type when " +
        "converting with pyspark.sql.DataFrame.toPandas. The session local timezone is used " +
        "as the timezone.")
      .booleanConf
      .createWithDefault(false)
Member:
We can change the default to true, since we agree that this is a bug.
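For illustration, end-to-end usage from PySpark under this conf might look like the following (a sketch assuming an existing SparkSession named spark and a DataFrame df with a TimestampType column):

# Session time zone used for the conversion.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

# Opt in to timezone-aware pandas output (defaults to false in this patch;
# the review above suggests flipping the default to true).
spark.conf.set("spark.sql.execution.pandas.timeZoneAware", "true")

pdf = df.toPandas()  # TimestampType columns come back tz-aware in the session zone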


  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
  }