
Commit 43d9c7e

[SPARK-30640][PYTHON][SQL] Prevent unnecessary copies of data during Arrow to Pandas conversion
### What changes were proposed in this pull request?

Prevent unnecessary copies of data during conversion from Arrow to Pandas.

### Why are the changes needed?

During conversion of pyarrow data to Pandas, columns are checked for timestamp types and then modified to correct for local timezone. If the data contains no timestamp types, then unnecessary copies of the data can be made. This is most prevalent when checking columns of a pandas DataFrame, where each series is assigned back to the DataFrame regardless of whether it had timestamps. See https://www.mail-archive.com/devarrow.apache.org/msg17008.html and ARROW-7596 for discussion.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests

Closes #27358 from BryanCutler/pyspark-pandas-timestamp-copy-fix-SPARK-30640.

Authored-by: Bryan Cutler <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
1 parent: d0800fc · commit: 43d9c7e
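The gist of the fix, as a minimal standalone sketch (not taken from the patch; it assumes only pyarrow and pandas are installed, and the timezone string is an arbitrary example): convert the Arrow table to pandas once, then rewrite only the timestamp columns, so non-timestamp columns are never reassigned and therefore never copied.

import pyarrow as pa

table = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),
    "ts": pa.array([0, 1_000_000, 2_000_000], type=pa.timestamp("us", tz="UTC")),
})

pdf = table.to_pandas(date_as_object=True)

for field in table.schema:
    if pa.types.is_timestamp(field.type):
        # Simplified stand-in for Spark's _check_series_localize_timestamps:
        # convert the tz-aware column to the target timezone and drop the tz info.
        pdf[field.name] = (pdf[field.name]
                           .dt.tz_convert("America/Los_Angeles")
                           .dt.tz_localize(None))
# Columns like "id" are left untouched, so pandas never copies their data.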

File tree: 3 files changed (+11 −20 lines)


python/pyspark/sql/pandas/conversion.py

Lines changed: 6 additions & 2 deletions
@@ -100,7 +100,7 @@ def toPandas(self):
         # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled.
         if use_arrow:
             try:
-                from pyspark.sql.pandas.types import _check_dataframe_localize_timestamps
+                from pyspark.sql.pandas.types import _check_series_localize_timestamps
                 import pyarrow
                 batches = self._collect_as_arrow()
                 if len(batches) > 0:
@@ -109,7 +109,11 @@ def toPandas(self):
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pdf = table.to_pandas(date_as_object=True)
-                    return _check_dataframe_localize_timestamps(pdf, timezone)
+                    for field in self.schema:
+                        if isinstance(field.dataType, TimestampType):
+                            pdf[field.name] = \
+                                _check_series_localize_timestamps(pdf[field.name], timezone)
+                    return pdf
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
             except Exception as e:
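For context, a hypothetical end-to-end check of the conversion.py change (the session setup, app name, and sample data are illustrative, not from the PR): with no TimestampType columns in the schema, the new loop in toPandas() finds nothing to localize and the pandas result is returned without any per-column reassignment.

from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.master("local[1]").appName("arrow-copy-check").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])
pdf = df.toPandas()

# Only these columns would be reassigned in toPandas() (none here):
print([f.name for f in df.schema if isinstance(f.dataType, TimestampType)])
spark.stop()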

python/pyspark/sql/pandas/serializers.py

Lines changed: 5 additions & 2 deletions
@@ -120,14 +120,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name):

     def arrow_to_pandas(self, arrow_column):
         from pyspark.sql.pandas.types import _check_series_localize_timestamps
+        import pyarrow

         # If the given column is a date type column, creates a series of datetime.date directly
         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
         # datetime64[ns] type handling.
         s = arrow_column.to_pandas(date_as_object=True)

-        s = _check_series_localize_timestamps(s, self._timezone)
-        return s
+        if pyarrow.types.is_timestamp(arrow_column.type):
+            return _check_series_localize_timestamps(s, self._timezone)
+        else:
+            return s

     def _create_batch(self, series):
         """

python/pyspark/sql/pandas/types.py

Lines changed: 0 additions & 16 deletions
@@ -165,22 +165,6 @@ def _check_series_localize_timestamps(s, timezone):
     return s


-def _check_dataframe_localize_timestamps(pdf, timezone):
-    """
-    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
-
-    :param pdf: pandas.DataFrame
-    :param timezone: the timezone to convert. if None then use local timezone
-    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
-    """
-    from pyspark.sql.pandas.utils import require_minimum_pandas_version
-    require_minimum_pandas_version()
-
-    for column, series in pdf.iteritems():
-        pdf[column] = _check_series_localize_timestamps(series, timezone)
-    return pdf
-
-
 def _check_series_convert_timestamps_internal(s, timezone):
     """
     Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
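For reference, a rough equivalent of the deleted helper (an approximation based on its docstring; localize_all_columns and the explicit dtype check are stand-ins, not Spark code): it reassigned every column of the DataFrame, timestamps or not, which is the per-column copy this commit eliminates.

import pandas as pd

def localize_all_columns(pdf: pd.DataFrame, tz: str) -> pd.DataFrame:
    for column, series in pdf.items():
        if pd.api.types.is_datetime64tz_dtype(series.dtype):
            series = series.dt.tz_convert(tz).dt.tz_localize(None)
        # Assigning back even unchanged series is what could trigger copies.
        pdf[column] = series
    return pdf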
