
Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

Currently we convert a Spark DataFrame to a Pandas DataFrame via pd.DataFrame.from_records. It infers the data types from the data and doesn't respect the Spark DataFrame schema. This PR fixes it.
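
For illustration, a minimal sketch of the inference problem (assumes a running SparkSession named spark; the column name and values are made up):

import pandas as pd
from pyspark.sql.types import StructType, IntegerType

schema = StructType().add("a", IntegerType())  # 32-bit in the Spark schema
rows = spark.createDataFrame([(1,), (2,)], schema).collect()

# from_records infers the dtype from the data, ignoring the Spark schema
pd.DataFrame.from_records(rows, columns=["a"]).dtypes  # a    int64, not int32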

How was this patch tested?

a new regression test

@cloud-fan
Contributor Author

cc @ueshin @BryanCutler

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78392 has finished for PR 18378 at commit 8a033fb.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Jun 21, 2017

LGTM, pending Jenkins.

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78395 has finished for PR 18378 at commit e352817.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78398 has finished for PR 18378 at commit afa74ab.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

This is probably the easiest way to assign the types, but the data is still loaded and inferred first; astype will then cast it, and I'm not sure if that makes another pass over the data or does it lazily. A more ideal way would be to not use from_records, but then I think the data would need to be broken up into columns.

Contributor Author

Converting rows to columnar format is also costly, I think, so if users want performance here they should enable the Arrow optimization :)
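
(For reference, a sketch of opting in; spark.sql.execution.arrow.enabled is the config name the Arrow work shipped with in Spark 2.3+, and df stands for any Spark DataFrame:)

# Transfers data to pandas in columnar Arrow batches instead of row records.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()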

Member

I think it might cause problems to have all non-primitive types as object. Things like timestamps will be inferred from a datetime object, for example:

In [10]: pdf = pd.DataFrame.from_records([(1.0, 1, "a", datetime.datetime.now())])

In [11]: pdf.dtypes
Out[11]: 
0           float64
1             int64
2            object
3    datetime64[ns]
dtype: object

In [12]: pdf.astype({0: "float32", 1: "int32", 2: "object", 3: "object"}).dtypes
Out[12]: 
0    float32
1      int32
2     object
3     object
dtype: object

Member

@BryanCutler BryanCutler left a comment

This seems ok for primitive types, but I think it causes problems for other types. I'm also not sure if there will be more performance issues from using from_records together with astype; from_records is already too slow ;)

@ueshin
Member

ueshin commented Jun 21, 2017

How about applying astype only for primitive types?
I guess the problem here is the up-conversion from Byte/Short/IntegerType to int64 and from FloatType to float64.
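
(A sketch of what that could look like; the helper name is hypothetical, but the NumPy targets match the up-conversions described above:)

import numpy as np
from pyspark.sql.types import ByteType, ShortType, IntegerType, FloatType

def _primitive_pandas_type(dt):  # hypothetical helper name
    # Correct only the primitive types pandas is known to up-convert;
    # None means "keep whatever pandas inferred".
    if type(dt) == ByteType:
        return np.int8
    elif type(dt) == ShortType:
        return np.int16
    elif type(dt) == IntegerType:
        return np.int32
    elif type(dt) == FloatType:
        return np.float32
    else:
        return None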

@BryanCutler
Member

How about applying astype only for primitive types?

Yeah, that might work: since astype takes a dict, you probably don't need to specify all the columns. It does seem to make a deep copy of the data being cast, though, so it still might have an impact on performance.
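
(For example, a dict covering only some columns casts just those; a sketch with made-up values. Note the dict form of astype needs pandas >= 0.19, which comes up later in this thread:)

import pandas as pd

pdf = pd.DataFrame.from_records([(1, 1.0, "a"), (2, 2.0, "b")])
# Only columns 0 and 1 are cast; column 2 keeps its inferred object dtype.
pdf = pdf.astype({0: "int32", 1: "float32"})
pdf.dtypes  # 0: int32, 1: float32, 2: object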

Member

Could we check and skip if pandas can't be imported? (NumPy is a Pandas dependency, so checking Pandas alone should be fine.)

_have_pandas = False  # set the default before the try, in case the import fails
try:
    import pandas
    _have_pandas = True
except:
    # No Pandas, but that's okay, we'll skip those tests
    pass
...

    @unittest.skipIf(not _have_pandas, "Pandas not installed")
    def test_to_pandas(self):
        ...

At least I see the doctest is being skipped: >>> df.toPandas() # doctest: +SKIP.

Member

Yeah, the Jenkins worker might not have Pandas installed and it's not a hard dependency for pyspark. To be sure the test gets run, it could be added to dev/run-pip-tests similar to #15821 for now.

@cloud-fan cloud-fan force-pushed the to_pandas branch 2 times, most recently from 31eec1f to 36f9cb6 on June 22, 2017 05:00
@SparkQA

SparkQA commented Jun 22, 2017

Test build #78426 has finished for PR 18378 at commit 36f9cb6.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78427 has finished for PR 18378 at commit 36dc5e7.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

It sounds ok to me, except for the missing _have_pandas = False above the try:.

if (pandas_type):
    dtype[field.name] = pandas_type

return pd.DataFrame.from_records(self.collect(), columns=self.columns).astype(dtype)
Member

@viirya viirya Jun 22, 2017

The copy param of astype is True by default. It seems to me we don't need to copy the data? Not copying the data should benefit performance.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78429 has finished for PR 18378 at commit 1e98c49.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Hm.. actually, this failure looks legitimate. I can reproduce it locally too.

@cloud-fan
Contributor Author

cloud-fan commented Jun 22, 2017

@HyukjinKwon can you give me a hand with this? I can't reproduce it locally... thanks!

@HyukjinKwon
Member

My pleasure. I will give it a shot.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78432 has finished for PR 18378 at commit dfaa392.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

It looks like astype with a dict was added in 0.19.0 - pandas-dev/pandas@63a1e5c#diff-fb14ed747473b618d0c021fdef7ee85b. Mine was lower than that, and I assume the Jenkins one is the same case too.

@HyukjinKwon
Member

(I will try to find a workaround ...)

@HyukjinKwon
Member

I sent a PR to your branch - cloud-fan#7 @cloud-fan. I will double check as well.

HyukjinKwon and others added 2 commits June 22, 2017 15:58
Work around astype with columns in Pandas < 0.19.0
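
(The idea of the workaround, quoted in full further down: cast column by column, since Series.astype is much older than the dict form of DataFrame.astype. Here dtype stands for the field-name-to-NumPy-type dict built from the schema:)

for name, t in dtype.items():
    pdf[name] = pdf[name].astype(t)
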
@SparkQA

SparkQA commented Jun 22, 2017

Test build #78443 has finished for PR 18378 at commit 357a798.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def _to_corrected_pandas_type(dt):
    """
    When converting Spark SQL records to Pandas DataFrame, the inferred data type may be wrong.
    This method gets the correted data type for Pandas if that type may be inferred uncorrectly.
Member

nit: typo correted.

@viirya
Member

viirya commented Jun 22, 2017

LGTM

@HyukjinKwon
Member

LGTM except for the nit ^.

@cloud-fan
Contributor Author

The last commit just fixes a typo in a comment, and the Python style check passed locally. I'm going to merge this PR to unblock #15821.

@cloud-fan
Contributor Author

merged, thanks for your review!

@asfgit asfgit closed this in 67c7502 Jun 22, 2017
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

for f, t in dtype.items():
    pdf[f] = pdf[f].astype(t, copy=False)
Member

@HyukjinKwon HyukjinKwon Jun 22, 2017

Just in case someone blames this in the future, as a little side note: it looks like copy was introduced in 0.11.0 here. So, Pandas 0.10.0 does not work with it (see here).

from pyspark.sql.types import *

schema = StructType().add("a", IntegerType()).add("b", StringType())\
                     .add("c", BooleanType()).add("d", FloatType())
data = [
    (1, "foo", True, 3.0,), (2, "foo", True, 5.0),
    (3, "bar", False, -1.0), (4, "bar", False, 6.0),
]
spark.createDataFrame(data, schema).toPandas().dtypes

Pandas 0.10.0:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/sql/dataframe.py", line 1734, in toPandas
    pdf[f] = pdf[f].astype(t, copy=False)
TypeError: astype() got an unexpected keyword argument 'copy'

However, I guess it is really fine because:

  • 0.10.0 was released in 2012, when Spark was 0.6.x and Java was 6 & 7.

    I guess this is really fine. It was 5 years ago.

  • In 0.10.0, it does work without copy, but the types are not properly set as proposed here:

    spark.createDataFrame(data, schema).toPandas().dtypes
    a      int64  # <- this should be 'int32'
    b     object
    c       bool
    d    float64  # <- this should be 'float32'
    

I am writing this comment only because, to my knowledge, we didn't specify a Pandas version requirement:

'sql': ['pandas']

Contributor Author

Thanks for the investigation! Maybe we should specify a version requirement for Pandas.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78448 has finished for PR 18378 at commit d8ba545.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member

Looks good, I'll update #15821 with this

robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 29, 2017

Author: hyukjinkwon <[email protected]>
Author: Wenchen Fan <[email protected]>
Author: Wenchen Fan <[email protected]>

Closes apache#18378 from cloud-fan/to_pandas.
    return np.int32
elif type(dt) == FloatType:
    return np.float32
else:
@edlee123 edlee123 Apr 14, 2018

Had a question: in Spark 2.2.1, if I do a .toPandas on a Spark DataFrame with an integer-typed column, the dtype in pandas is int64, whereas in Spark 2.3.0 the ints are converted to int32. I ran the below in Spark 2.2.1 and 2.3.0:

import pyspark.sql.functions as sf

df = spark.sparkContext.parallelize([(i, ) for i in [1, 2, 3]]).toDF(["a"]).select(sf.col('a').cast('int')).toPandas()
df.dtypes

Is this intended? We ran into this because we have unit tests in a project that passed in Spark 2.2.1 but fail in Spark 2.3.0 when we looked into upgrading.

Member

Yup, it was unfortunate, but it was a bug that we should fix. Does that cause an actual break or just a unit test failure?

@edlee123

As far as I can tell, so far it's just some of our unit tests where we assert some expected pandas dataframes. I think float may also be affected... Should I create a ticket in JIRA?

Member

I think the current change is actually more correct. Such changes usually have to be avoided, but there are strong reasons for this one, and I would classify the previous behavior as a bug. I would discourage creating a JIRA unless it breaks a scenario in a way that makes a strong case.

@edlee123

edlee123 commented Apr 15, 2018

Ok, I see. Part of the rationale is performance (from the astype discussion above) and consistency with Arrow: https://arrow.apache.org/docs/python/pandas.html

I guess, without knowing much about the Arrow work, I was expecting it to be consistent with how pandas converts Python types, e.g. in Spark 2.2.

What happens with Double and DateType?

@cloud-fan
Contributor Author

It's pretty natural to convert integer type to int32. Although Spark tries its best to avoid behavior changes, it's allowed to fix some wrong behaviors in new releases, and I believe it's well documented in the Spark 2.3 release notes.

@edlee123
Copy link

I see the rationale now, thank you everyone

@BryanCutler
Member

@edlee123 a Spark DoubleType will produce a float64 dtype in Pandas and FloatType will be float32. DateType will be Python datetime.date objects. Also keep in mind that if you have integer data with null values, then Pandas will treat it as floats and represent the null values as NaNs. In this case, Spark will not change the dtype.
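
(For illustration, a sketch of those mappings; assumes a running SparkSession named spark, with expected dtypes following the behavior described above:)

import datetime
from pyspark.sql.types import StructType, DoubleType, FloatType, DateType, IntegerType

schema = StructType().add("d", DoubleType()).add("f", FloatType())\
                     .add("dt", DateType()).add("i", IntegerType())
data = [(1.0, 2.0, datetime.date(2018, 1, 1), 1),
        (3.0, 4.0, datetime.date(2018, 1, 2), None)]  # null in the integer column

spark.createDataFrame(data, schema).toPandas().dtypes
# d     float64   <- DoubleType
# f     float32   <- FloatType
# dt     object   <- DateType (datetime.date objects)
# i     float64   <- nullable integer data becomes float64 with NaN for null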
