31 changes: 30 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -1721,7 +1721,18 @@ def toPandas(self):
         1 5 Bob
         """
         import pandas as pd
-        return pd.DataFrame.from_records(self.collect(), columns=self.columns)
+
+        dtype = {}
+        for field in self.schema:
+            pandas_type = _to_corrected_pandas_type(field.dataType)
+            if pandas_type is not None:
+                dtype[field.name] = pandas_type
+
+        pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
+
+        for f, t in dtype.items():
+            pdf[f] = pdf[f].astype(t, copy=False)
@HyukjinKwon (Member) commented on Jun 22, 2017:
Just in case someone blames this line in the future, a little side note: it looks like copy was introduced in 0.11.0 here, so Pandas 0.10.0 does not work with it (see here).

from pyspark.sql.types import *

schema = StructType().add("a", IntegerType()).add("b", StringType())\
                     .add("c", BooleanType()).add("d", FloatType())
data = [
    (1, "foo", True, 3.0,), (2, "foo", True, 5.0),
    (3, "bar", False, -1.0), (4, "bar", False, 6.0),
]
spark.createDataFrame(data, schema).toPandas().dtypes

Pandas 0.10.0:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/dataframe.py", line 1734, in toPandas
    pdf[f] = pdf[f].astype(t, copy=False)
TypeError: astype() got an unexpected keyword argument 'copy'

However, I guess it is really fine because:

  • 0.10.0 was released in 2012, when Spark was 0.6.x and Java was 6 & 7.

    I guess this is really fine. It was 5 years ago.

  • In 0.10.0, it does work without copy, but the types are not properly set as proposed here:

    spark.createDataFrame(data, schema).toPandas().dtypes
    a      int64  # <- this should be 'int32'
    b     object
    c       bool
    d    float64  # <- this should be 'float32'
    

I am writing this comment only because, to my knowledge, we didn't specify a Pandas version requirement:

'sql': ['pandas']
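A minimal sketch of what pinning such a requirement could look like in a setup.py extras_require entry; the package name and the floor version below are assumptions chosen only for illustration, not the project's actual choices:

from setuptools import setup

setup(
    name="example-project",   # hypothetical package, for illustration only
    version="0.0.0",
    extras_require={
        # Require a Pandas new enough to have astype(..., copy=False),
        # which appeared in 0.11.0; the exact floor here is an assumed example.
        "sql": ["pandas>=0.13.0"],
    },
)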

Contributor (Author) replied:
Thanks for the investigation! Maybe we should specify the version requirement for Pandas.
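For what it's worth, a version-tolerant cast could fall back when the copy keyword is missing; this is a sketch only, with a hypothetical helper name, not what the patch does:

def _astype_compat(series, dtype):
    # Prefer the zero-copy cast available in Pandas >= 0.11.0.
    try:
        return series.astype(dtype, copy=False)
    except TypeError:
        # Pandas < 0.11.0: astype() has no 'copy' keyword and always copies.
        return series.astype(dtype)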

+        return pdf
 
     ##########################################################################################
     # Pandas compatibility
@@ -1750,6 +1761,24 @@ def _to_scala_map(sc, jm):
     return sc._jvm.PythonUtils.toScalaMap(jm)
 
 
+def _to_corrected_pandas_type(dt):
+    """
+    When converting Spark SQL records to Pandas DataFrame, the inferred data type may be wrong.
+    This method gets the corrected data type for Pandas if that type may be inferred incorrectly.
+    """
+    import numpy as np
+    if type(dt) == ByteType:
+        return np.int8
+    elif type(dt) == ShortType:
+        return np.int16
+    elif type(dt) == IntegerType:
+        return np.int32
+    elif type(dt) == FloatType:
+        return np.float32
+    else:
@edlee123 commented on Apr 14, 2018:

Had a question: in Spark 2.2.1, if I do a .toPandas() on a Spark DataFrame with an integer column, the dtype in Pandas is int64, whereas in Spark 2.3.0 the ints are converted to int32. I ran the below in Spark 2.2.1 and 2.3.0:

import pyspark.sql.functions as sf  # alias implied by the snippet below

df = spark.sparkContext.parallelize([(i,) for i in [1, 2, 3]]).toDF(["a"]).select(sf.col('a').cast('int')).toPandas()
df.dtypes

Is this intended? We ran into this because we have unit tests in a project that passed in Spark 2.2.1 but fail in Spark 2.3.0 when we looked into upgrading.

Member replied:
Yup, it was unfortunate, but it was a bug that we should fix. Does that cause an actual break, or just a unit test failure?

@edlee123 replied:

As far as I can tell, so far it is just some of our unit tests where we assert against expected Pandas DataFrames. I think float may also be affected... Should I create a ticket in JIRA?

Member replied:
I think the current change is actually more correct. Such changes usually have to be avoided, but there are strong reasons for this one, and I would classify this case as a bug. I would discourage creating a JIRA unless it breaks a scenario in a way that makes strong sense.
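For downstream suites hit by this, a sketch of an assertion that tolerates the int64-to-int32 change by comparing values rather than dtypes; the frames below are placeholder data, and assert_frame_equal lives in pandas.testing on reasonably recent Pandas:

import pandas as pd
from pandas.testing import assert_frame_equal

expected = pd.DataFrame({"a": [1, 2, 3]})                          # int64 by default
result = pd.DataFrame({"a": pd.Series([1, 2, 3], dtype="int32")})  # as from Spark 2.3.0
assert_frame_equal(expected, result, check_dtype=False)            # values match; dtypes ignored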

+        return None
+
+
 class DataFrameNaFunctions(object):
     """Functionality for working with missing data in :class:`DataFrame`.
 
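An aside on _to_corrected_pandas_type above: LongType and DoubleType need no entry because Pandas already infers the matching 8-byte NumPy dtypes for Python ints and floats. A quick sketch of that assumption, on a typical 64-bit platform:

import numpy as np
import pandas as pd

# Python ints and floats come out as int64/float64, which already match
# Spark's LongType and DoubleType, so no correction is needed for them.
pdf = pd.DataFrame.from_records([(1, 1.0), (2, 2.0)], columns=["long_col", "double_col"])
assert pdf["long_col"].dtype == np.int64
assert pdf["double_col"].dtype == np.float64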
24 changes: 24 additions & 0 deletions python/pyspark/sql/tests.py
@@ -46,6 +46,14 @@
 else:
     import unittest
 
+_have_pandas = False
+try:
+    import pandas
+    _have_pandas = True
+except:
+    # No Pandas, but that's okay, we'll skip those tests
+    pass
+
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
@@ -2274,6 +2282,22 @@ def count_bucketed_cols(names, table="pyspark_bucket"):
                 .mode("overwrite").saveAsTable("pyspark_bucket"))
         self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas(self):
+        import numpy as np
+        schema = StructType().add("a", IntegerType()).add("b", StringType())\
+                             .add("c", BooleanType()).add("d", FloatType())
+        data = [
+            (1, "foo", True, 3.0), (2, "foo", True, 5.0),
+            (3, "bar", False, -1.0), (4, "bar", False, 6.0),
+        ]
+        df = self.spark.createDataFrame(data, schema)
+        types = df.toPandas().dtypes
Member commented:
Could we check and skip if Pandas cannot be imported? (NumPy is a Pandas dependency, so checking Pandas alone should be fine.)

try:
    import pandas
    _have_pandas = True
except:
    # No Pandas, but that's okay, we'll skip those tests
    pass
...

    @unittest.skipIf(not _have_pandas, "Pandas not installed")
    def test_to_pandas(self):
        ...

At least I see the doctest is being skipped: >>> df.toPandas()  # doctest: +SKIP.

Member replied:
Yeah, the Jenkins worker might not have Pandas installed, and it's not a hard dependency for pyspark. To be sure the test gets run, it could be added to dev/run-pip-tests, similar to #15821, for now.

+        self.assertEquals(types[0], np.int32)
+        self.assertEquals(types[1], np.object)
+        self.assertEquals(types[2], np.bool)
+        self.assertEquals(types[3], np.float32)
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):
