diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index f7680ccdc843..8548cd222bf1 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -100,7 +100,7 @@ def toPandas(self): # of PyArrow is found, if 'spark.sql.execution.arrow.pyspark.enabled' is enabled. if use_arrow: try: - from pyspark.sql.pandas.types import _check_dataframe_localize_timestamps + from pyspark.sql.pandas.types import _check_series_localize_timestamps import pyarrow batches = self._collect_as_arrow() if len(batches) > 0: @@ -109,7 +109,11 @@ def toPandas(self): # values, but we should use datetime.date to match the behavior with when # Arrow optimization is disabled. pdf = table.to_pandas(date_as_object=True) - return _check_dataframe_localize_timestamps(pdf, timezone) + for field in self.schema: + if isinstance(field.dataType, TimestampType): + pdf[field.name] = \ + _check_series_localize_timestamps(pdf[field.name], timezone) + return pdf else: return pd.DataFrame.from_records([], columns=self.columns) except Exception as e: diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 4bb5b8fb1729..6f46e92f5a2a 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -120,14 +120,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name): def arrow_to_pandas(self, arrow_column): from pyspark.sql.pandas.types import _check_series_localize_timestamps + import pyarrow # If the given column is a date type column, creates a series of datetime.date directly # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by # datetime64[ns] type handling. s = arrow_column.to_pandas(date_as_object=True) - s = _check_series_localize_timestamps(s, self._timezone) - return s + if pyarrow.types.is_timestamp(arrow_column.type): + return _check_series_localize_timestamps(s, self._timezone) + else: + return s def _create_batch(self, series): """ diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 81618bd41f58..d1edf3f9c47c 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -165,22 +165,6 @@ def _check_series_localize_timestamps(s, timezone): return s -def _check_dataframe_localize_timestamps(pdf, timezone): - """ - Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone - - :param pdf: pandas.DataFrame - :param timezone: the timezone to convert. if None then use local timezone - :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive - """ - from pyspark.sql.pandas.utils import require_minimum_pandas_version - require_minimum_pandas_version() - - for column, series in pdf.iteritems(): - pdf[column] = _check_series_localize_timestamps(series, timezone) - return pdf - - def _check_series_convert_timestamps_internal(s, timezone): """ Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for