[SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas #19459
Conversation
Test build #82559 has finished for PR 19459 at commit
Benchmarks running in local mode: 16 GB memory, i7-4800MQ CPU @ 2.70GHz × 8 cores.

Code:

import pandas as pd
import numpy as np

spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pdf = pd.DataFrame(np.random.rand(100000, 10), columns=list("abcdefghij"))
%timeit spark.createDataFrame(pdf)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
%timeit spark.createDataFrame(pdf)

Without Arrow:
With Arrow:
Speedup of ~235x

Also tested creating up to 2 million rows with Arrow and results scale linearly.
Test build #82560 has finished for PR 19459 at commit
python/pyspark/sql/tests.py
Outdated
pdf = self.createPandasDataFrameFromeData()
self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
df_no_arrow = self.spark.createDataFrame(pdf)
self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
I'd set this back to true in a finally block, just in case the test fails in df_no_arrow = self.spark.createDataFrame(pdf) and spark.sql.execution.arrow.enable remains false, affecting other test cases in the future (if I didn't miss something).
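For illustration, a minimal sketch of the suggested try/finally pattern, reusing the names from the diff above (pdf, self.spark):

self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
try:
    df_no_arrow = self.spark.createDataFrame(pdf)
finally:
    # Restore the conf even if createDataFrame raises, so later tests still run with Arrow enabled
    self.spark.conf.set("spark.sql.execution.arrow.enable", "true")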
done. I guess this would make the failure easier to see?
Hmmm, I thought the tearDownClass was there but it's actually in #18664. Maybe I should put it in here since that needs some more discussion.
  }
}

def toDataFrame(
I had to make this public to be callable with py4j. Alternatively, something could be added to o.a.s.sql.api.python.PythonSQLUtils?
Yup, I think we should put it there, o.a.s.sql.api.python.PythonSQLUtils.
I left the conversion logic in ArrowConverters because I think there is a good chance it will change, so I just added a wrapper in PythonSQLUtils. Let me know if that's OK.
Test build #82573 has finished for PR 19459 at commit
python/pyspark/sql/session.py
Outdated
if schema is None:
    schema = [str(x) for x in data.columns]
data = [r.tolist() for r in data.to_records(index=False)]
if self.conf.get("spark.sql.execution.arrow.enable", "false").lower() == "true" \
Oh thanks, I didn't see that go in. I'll update.
Test build #82601 has finished for PR 19459 at commit
python/pyspark/sql/tests.py
Outdated
    ("\n\nWithout:\n%s\n%s" % (df_without, df_without.dtypes)))
self.assertTrue(df_without.equals(df_with_arrow), msg=msg)

def createPandasDataFrameFromeData(self):
nit: typo createPandasDataFrameFromeData -> createPandasDataFrameFromData
python/pyspark/sql/session.py
Outdated
os.unlink(tempFile.name)

# Create the Spark DataFrame, there will be at least 1 batch
schema = from_arrow_schema(batches[0].schema)
What if a user specifies the schema?
Good point. We can pass the schema, if provided, into to_pandas for pyarrow to use when creating the RecordBatch.
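For illustration only, a standalone pyarrow sketch of what passing a user-supplied schema could look like; the column names and types here are made up, and RecordBatch.from_pandas is assumed to accept a schema argument as in recent pyarrow versions:

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": [1, 2], "b": [0.1, 0.2]})
# Build the Arrow schema from the user-provided types instead of inferring it from the data
arrow_schema = pa.schema([pa.field("a", pa.int64()), pa.field("b", pa.float64())])
batch = pa.RecordBatch.from_pandas(pdf, schema=arrow_schema, preserve_index=False)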
python/pyspark/sql/session.py
Outdated
data = [r.tolist() for r in data.to_records(index=False)]
if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
        and len(data) > 0:
    from pyspark.serializers import ArrowSerializer
Maybe we should split this block out into a method like _createFromPandasDataFrame, the same as the other create methods?
That's probably a good idea since it's a big block of code. The other create methods return a (rdd, schema) pair, then do further processing to create a DataFrame. Here we would have to just return a DataFrame since we don't want to do the further processing.
Oh wait, without Arrow it creates a (rdd, schema) pair like the others, so handling both the Arrow and non-Arrow cases in _createFromPandasDataFrame doesn't fit because they return different things. How about just putting the new Arrow code in a method like _createFromArrowPandas?
HyukjinKwon left a comment
Looks pretty good to me except for those comments.
python/pyspark/sql/session.py
Outdated
# Slice the DataFrame into batches
split = -(-len(data) // self.sparkContext.defaultParallelism)  # round int up
slices = (data[i:i + split] for i in xrange(0, len(data), split))
How about split -> size (or length) and i -> offset (or start)?
Sure, how about size, start, and step? That looks like the terminology used in parallelize.
Yea, sounds fine.
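For reference, a tiny standalone example of the round-up slicing above, with a literal list and parallelism value standing in for data and self.sparkContext.defaultParallelism:

data = list(range(10))
parallelism = 4                       # stand-in for self.sparkContext.defaultParallelism
step = -(-len(data) // parallelism)   # ceiling division: 10 / 4 rounded up to 3
slices = [data[start:start + step] for start in range(0, len(data), step)]
# slices == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]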
python/pyspark/sql/session.py
Outdated
# write batches to temp file, read by JVM (borrowed from context.parallelize)
import os
from tempfile import NamedTemporaryFile
I'd put those imports above, next to import pyarrow as pa, or at the top of this file ...
python/pyspark/sql/session.py
Outdated
jdf = self._jvm.PythonSQLUtils.arrowPayloadToDataFrame(
    jrdd, schema.json(), self._wrapped._jsqlContext)
df = DataFrame(jdf, self._wrapped)
df._schema = schema
I'd leave some comments here about why _schema should be manually set rather than using its own schema. Actually, mind if I ask why we should set this manually?
If the schema is not set here, then it will lazily create it through a py4j exchange with the java DataFrame. Since we already have it here, we can just set it and save some time. I don't like manually setting it like this though, it should be an optional arg in the DataFrame constructor. I'll make that change, but if you prefer not to do that I can revert.
Ahh, okay, that's fine to me.
Thanks for the reviews @ueshin and @HyukjinKwon! I added
python/pyspark/sql/session.py
Outdated
def _createFromPandasWithArrow(self, df, schema):
    """
    Create a DataFrame from a given pandas.DataFrame by slicing the into partitions, converting
typo: slicing the ...
Thanks! fixed
python/pyspark/sql/session.py
Outdated
    for df_slice in df_slices]

# write batches to temp file, read by JVM (borrowed from context.parallelize)
tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
This looks like a near-duplicate of the main logic in context.parallelize. Maybe we can extract a common function from it.
Yeah, it is - I didn't want to mess around with the parallelize() logic, so I left it alone. If we were to make a common function it would look like this:

def _dump_to_tempfile(data, serializer, parallelism):
    tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
    try:
        serializer.dump_stream(data, tempFile)
        tempFile.close()
        readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
        return readRDDFromFile(self._jsc, tempFile.name, parallelism)
    finally:
        # readRDDFromFile eagerly reads the file so we can delete right after.
        os.unlink(tempFile.name)

and some changes to parallelize to call it:

# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
    c = list(c)  # Make it a list so we can compute its length
batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
jrdd = _dump_to_tempfile(c, serializer, numSlices)

Let me know if you all think we should change this.
I'd prefer less duplication. Let's see if others support it.
Sounds good to me.
python/pyspark/sql/session.py
Outdated
    data = [schema.toInternal(row) for row in data]
    return self._sc.parallelize(data), schema

def _createFromPandasWithArrow(self, df, schema):
nit: df -> pdf.
Test build #82764 has finished for PR 19459 at commit
LGTM with a few minor comments.
python/pyspark/sql/dataframe.py
Outdated
self._sc = sql_ctx and sql_ctx._sc
self.is_cached = False
self._schema = None  # initialized lazily
self._schema = schema  # initialized lazily if None
@BryanCutler, what do you think about taking this back out? Maybe I am too worried, but I think we should avoid having this assigned except for a few special cases ...
Sure, I can undo it. I don't really like manually assigning the schema after the constructor, but it's only done in these 2 special cases.
def arrowPayloadToDataFrame(
     payloadRDD: JavaRDD[Array[Byte]],
     schemaString: String,
     sqlContext: SQLContext): DataFrame = {
I can't believe I found this: it looks like 5 spaces instead of 4.
oh man, good catch! I don't know how that happened :\
I think it is a bug; we should fix it first. BTW, I'm fine with upgrading Arrow, just make sure we get everything we need at the Arrow version we want to upgrade to, then remove all the hacks on the Spark side. We should throw an exception if users have an old Arrow version installed.
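A hypothetical sketch of the kind of version guard being discussed; the minimum version string, function name, and error message are illustrative, not the actual Spark check:

from distutils.version import LooseVersion

def require_minimum_pyarrow_version(minimum="0.8.0"):  # illustrative minimum version
    import pyarrow as pa
    if LooseVersion(pa.__version__) < LooseVersion(minimum):
        raise ImportError("pyarrow >= %s must be installed for Arrow optimizations; found %s"
                          % (minimum, pa.__version__))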
Test build #83233 has finished for PR 19459 at commit
I made SPARK-22417 for fixing reading from timestamps without Arrow.
Test build #83569 has started for PR 19459 at commit
Jenkins, retest this please.
Test build #83579 has finished for PR 19459 at commit
python/pyspark/serializers.py
Outdated
def _create_batch(series):
def _create_batch(series, copy=False):
Do we need copy here?
I might be missing something, but it looks like every occurrence of copy=copy in this method is already preceded by a copy from s.fillna(0), so we don't need to use copy=True.
Right, I forgot that fillna returns a copy. Do you think it would be worth it to first check for any nulls and only call fillna if needed? The mask of nulls is already created, so we would just need to add a function like this in _create_batch and call it before series.astype():

def fill_series(s, mask):
    return s.fillna(0) if mask.any() else s

What do you think @ueshin ?
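As a side note, a quick standalone pandas check of the copy behavior in question (plain pandas, independent of the PR): fillna(0) always returns a new Series, so guarding on the null mask skips the copy when there is nothing to fill.

import pandas as pd

def fill_series(s, mask):
    return s.fillna(0) if mask.any() else s

s_with_nulls = pd.Series([1.0, None, 3.0])
s_clean = pd.Series([1.0, 2.0, 3.0])

assert fill_series(s_with_nulls, s_with_nulls.isnull()) is not s_with_nulls  # nulls filled into a new Series
assert fill_series(s_clean, s_clean.isnull()) is s_clean                     # no nulls, no copy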
Hmm, I guess it depends.
With that method, it can reduce the number of copies if s doesn't include null values, but it might also increase the number if s includes null values and copy=True.
Yeah, we don't want to end up double copying if copy=True. Let me try something, and if it ends up making things too complicated then we can remove the copy flag altogether and just rely on fillna(0) to always make a copy - not ideal, but simpler.
@ueshin this ended up having no effect, so I took it out. For the case of Timestamps, the timezone conversions make a copy regardless. For the case of ints being promoted to floats, that means they have null values and need fillna(0), which makes a copy anyway. So it seems this only makes copies when necessary.
python/pyspark/serializers.py
Outdated
Create an Arrow record batch from the given pandas.Series or list of Series, with optional type.
:param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
:param copy: Option to make a copy of the series before performing any type casts
We can remove this description.
Test build #83635 has finished for PR 19459 at commit
Test build #83647 has finished for PR 19459 at commit
@ueshin @HyukjinKwon does this look ready to merge? cc @cloud-fan
Looks pretty solid. Will take another look today (KST) and merge this one in a few days if there are no more comments and/or other committers are too busy to take a look and merge.
HyukjinKwon left a comment
LGTM otherwise
python/pyspark/serializers.py
Outdated
# NOTE: this is not necessary with Arrow >= 0.7
def cast_series(s, t):
    if type(t) == pa.TimestampType:
    if t is not None and type(t) == pa.TimestampType:
Hm, mind if I ask why t is not None was added? I thought None is NoneType and won't be pa.TimestampType anyway.
This doesn't seem to be needed anymore. It came from an error when comparing pyarrow type instances to None.
>>> import pyarrow as pa
>>> type(None) == pa.TimestampType
False
>>> None == pa.date32()
Segmentation fault
So this check is still needed right below when we check for date32(). I can't remember if this was fixed in current versions of pyarrow, but I'll add a note here.
verified this is not an issue with pyarrow >= 0.7.1
python/pyspark/sql/session.py
Outdated
    for pdf_slice in pdf_slices]

# Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
if schema is None or isinstance(schema, list):
isinstance(schema, list) -> isinstance(schema, (list, tuple)) maybe?
Maybe, a test like spark.createDataFrame([[1]], ("v",)) would be great.
Looks like this check should include tuples as well for converting from unicode?
https://github.com/apache/spark/pull/19459/files#diff-3b5463566251d5b09fd328738a9e9bc5L579
I'll change that since it's related.
python/pyspark/sql/session.py
Outdated
# Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
if schema is None or isinstance(schema, list):
    schema_from_arrow = from_arrow_schema(batches[0].schema)
    names = pdf.columns if schema is None else schema
Could we maybe just resemble this part (spark/python/pyspark/sql/session.py, lines 403 to 411 in 1d34104):

if schema is None or isinstance(schema, (list, tuple)):
    struct = self._inferSchemaFromList(data)
    converter = _create_converter(struct)
    data = map(converter, data)
    if isinstance(schema, (list, tuple)):
        for i, name in enumerate(schema):
            struct.fields[i].name = name
            struct.names[i] = name
    schema = struct
just to be more readable in a way?
if schema is None or isinstance(schema, (list, tuple)):
    struct = from_arrow_schema(batches[0].schema)
    if isinstance(schema, (list, tuple)):
        for i, name in enumerate(schema):
            struct.fields[i].name = name
            struct.names[i] = name
    schema = struct
Sure. We still need the line

schema = [str(x) for x in pdf.columns] if schema is None else schema

for the case when schema is None, because the pdf column names are lost when creating the Arrow RecordBatch from pandas.Series with _create_batch.
Otherwise, I think this makes it easier to follow too.
python/pyspark/sql/tests.py
Outdated
def test_createDataFrame_with_incorrect_schema(self):
    pdf = self.create_pandas_data_frame()
    wrong_schema = StructType([field for field in reversed(self.schema)])
Not a big deal at all: StructType([field for field in reversed(self.schema)]) -> StructType(list(reversed(st)))
def test_createDataFrame_with_names(self):
    pdf = self.create_pandas_data_frame()
    df = self.spark.createDataFrame(pdf, schema=list('abcdefg'))
    self.assertEquals(df.schema.fieldNames(), list('abcdefg'))
As said above, let's add a test with a tuple too.
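A possible sketch of that tuple test, reusing the helper and assertion style from the test quoted above (the test name is made up):

def test_createDataFrame_with_names_tuple(self):
    pdf = self.create_pandas_data_frame()
    df = self.spark.createDataFrame(pdf, schema=tuple('abcdefg'))
    self.assertEquals(df.schema.fieldNames(), list('abcdefg'))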
Test build #83703 has finished for PR 19459 at commit
retest this please
Test build #83761 has finished for PR 19459 at commit
# Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
if isinstance(schema, (list, tuple)):
    struct = from_arrow_schema(batches[0].schema)
@BryanCutler, I think here we'd meet the same issue, SPARK-15244 in this code path. Mind opening a followup with a simple test if it is true?
Sure, will do
Merged to master.
Thanks @HyukjinKwon @ueshin and @viirya!
What changes were proposed in this pull request?
This change uses Arrow to optimize the creation of a Spark DataFrame from a Pandas DataFrame. The input df is sliced according to the default parallelism. The optimization is enabled with the existing conf "spark.sql.execution.arrow.enabled" and is disabled by default.
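For context, a minimal usage sketch of the feature as described, assuming an active SparkSession named spark and pyarrow installed:

import numpy as np
import pandas as pd

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = pd.DataFrame(np.random.rand(1000, 3), columns=["a", "b", "c"])
df = spark.createDataFrame(pdf)  # converted via Arrow record batches when the conf is enabled
df.show(3)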
How was this patch tested?
Added a new unit test that creates a DataFrame with and without the optimization enabled, then compares the results.