From 21e0c3ecde3c10af58a41b9cf67aaa61557f032a Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 24 May 2018 12:03:02 -0700 Subject: [PATCH 01/15] added test for diff column order --- python/pyspark/sql/tests.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2d7a4f62d4ee8..a6575b9935d5d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5034,6 +5034,29 @@ def foo3(key, pdf): expected4 = udf3.func((), pdf) self.assertPandasEqual(expected4, result4) + def test_column_order(self): + import pandas as pd + from pyspark.sql.functions import pandas_udf, col, PandasUDFType + df = self.data + + def change_col_order(pdf): + # Constructing a DataFrame from a dict should result in the same order, + # but use from_items to ensure the pdf column order is different than schema + return pd.DataFrame.from_items([ + ('id', pdf.id), + ('u', pdf.v * 2), + ('v', pdf.v)]) + + ordered_udf = pandas_udf( + change_col_order, + 'id long, v int, u int', + PandasUDFType.GROUPED_MAP + ) + + result = df.groupby('id').apply(ordered_udf).toPandas() + expected = df.toPandas().groupby('id').apply(change_col_order).reset_index(drop=True) + self.assertPandasEqual(expected, result) + @unittest.skipIf( not _have_pandas or not _have_pyarrow, From bbe3587acf421195e69107862dc38913be46c339 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 24 May 2018 12:18:47 -0700 Subject: [PATCH 02/15] needed to adjust expected values to compare results --- python/pyspark/sql/tests.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a6575b9935d5d..6310978af070f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5039,6 +5039,7 @@ def test_column_order(self): from pyspark.sql.functions import pandas_udf, col, PandasUDFType df = self.data + # Function returns a pdf with required column names, but order could be arbitrary using dict def change_col_order(pdf): # Constructing a DataFrame from a dict should result in the same order, # but use from_items to ensure the pdf column order is different than schema @@ -5053,8 +5054,11 @@ def change_col_order(pdf): PandasUDFType.GROUPED_MAP ) - result = df.groupby('id').apply(ordered_udf).toPandas() - expected = df.toPandas().groupby('id').apply(change_col_order).reset_index(drop=True) + # The UDF result should assign columns by name from the pdf + result = df.groupby('id').apply(ordered_udf).sort('id', 'v')\ + .select('id', 'u', 'v').toPandas() + pd_result = df.toPandas().groupby('id').apply(change_col_order) + expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) From a653e9b5606c235746eeca7ed451fdbf1a90503d Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 24 May 2018 12:21:36 -0700 Subject: [PATCH 03/15] for grouped map results, get columns based on name instead of position --- python/pyspark/worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 38fe2ef06eac5..b278dd973115a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -110,9 +110,7 @@ def wrapped(key_series, value_series): "Number of columns of the returned pandas.DataFrame " "doesn't match specified schema. 
" "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) - arrow_return_types = (to_arrow_type(field.dataType) for field in return_type) - return [(result[result.columns[i]], arrow_type) - for i, arrow_type in enumerate(arrow_return_types)] + return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] return wrapped From 88e2aa30a4eb527645cfa1d5ed646ecc64ced303 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 May 2018 10:56:01 -0700 Subject: [PATCH 04/15] added test for positional columns --- python/pyspark/sql/tests.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6310978af070f..4d1967a34abba 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5036,7 +5036,7 @@ def foo3(key, pdf): def test_column_order(self): import pandas as pd - from pyspark.sql.functions import pandas_udf, col, PandasUDFType + from pyspark.sql.functions import pandas_udf, PandasUDFType df = self.data # Function returns a pdf with required column names, but order could be arbitrary using dict @@ -5054,10 +5054,30 @@ def change_col_order(pdf): PandasUDFType.GROUPED_MAP ) + def positional_col_order(pdf): + # Create a DataFrame with positional columns + return pd.DataFrame(zip(pdf.id, pdf.v * 3, pdf.v)) + + positional_udf = pandas_udf( + positional_col_order, + 'id long, v int, u int', + PandasUDFType.GROUPED_MAP + ) + + grouped_df = df.groupby('id') + grouped_pdf = df.toPandas().groupby('id') + # The UDF result should assign columns by name from the pdf - result = df.groupby('id').apply(ordered_udf).sort('id', 'v')\ + result = grouped_df.apply(ordered_udf).sort('id', 'v')\ + .select('id', 'u', 'v').toPandas() + pd_result = grouped_pdf.apply(change_col_order) + expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) + self.assertPandasEqual(expected, result) + + # The UDF result uses positional columns from the pdf + result = grouped_df.apply(positional_udf).sort('id', 'v') \ .select('id', 'u', 'v').toPandas() - pd_result = df.toPandas().groupby('id').apply(change_col_order) + pd_result = grouped_pdf.apply(positional_udf) expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) From 7cc0c49dd7c06d1793c5c2936a5c4247ec18ce75 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 May 2018 12:09:19 -0700 Subject: [PATCH 05/15] adjust positional test --- python/pyspark/sql/tests.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4d1967a34abba..311d45b06aac7 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5055,12 +5055,12 @@ def change_col_order(pdf): ) def positional_col_order(pdf): - # Create a DataFrame with positional columns - return pd.DataFrame(zip(pdf.id, pdf.v * 3, pdf.v)) + # Create a DataFrame with positional columns, fix types to long + return pd.DataFrame(zip(pdf.id, pdf.v * 3, pdf.v), dtype='int64') positional_udf = pandas_udf( positional_col_order, - 'id long, v int, u int', + 'id long, u long, v long', PandasUDFType.GROUPED_MAP ) @@ -5077,7 +5077,9 @@ def positional_col_order(pdf): # The UDF result uses positional columns from the pdf result = grouped_df.apply(positional_udf).sort('id', 'v') \ .select('id', 'u', 'v').toPandas() - pd_result = grouped_pdf.apply(positional_udf) + pd_result = grouped_pdf.apply(positional_col_order) + 
pd_result.rename(columns={old: new for old, new in + zip(pd_result.columns, ['id', 'u', 'v'])}, inplace=True) expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) From d4b5da17452d6278435b011ef3ef5e83f360aaec Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 May 2018 12:12:14 -0700 Subject: [PATCH 06/15] add fallback to positional assignment --- python/pyspark/worker.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index b278dd973115a..3182c116b37fd 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -38,6 +38,9 @@ from pyspark.util import _get_argspec, fail_on_stopiteration from pyspark import shuffle +if sys.version >= '3': + basestring = str + pickleSer = PickleSerializer() utf8_deserializer = UTF8Deserializer() @@ -110,7 +113,16 @@ def wrapped(key_series, value_series): "Number of columns of the returned pandas.DataFrame " "doesn't match specified schema. " "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) - return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] + try: + # Assign result columns by schema name + return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] + except KeyError: + if all(not isinstance(name, basestring) for name in result.columns): + # Assign result columns by position if they are not named with strings + return [(result[result.columns[i]], to_arrow_type(field.dataType)) + for i, field in enumerate(return_type)] + else: + raise return wrapped From 63c39639599e24b116f1eb4283f6b74f2d0deea6 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 May 2018 12:29:36 -0700 Subject: [PATCH 07/15] added test for col name typo --- python/pyspark/sql/tests.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 311d45b06aac7..17772de00263d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5083,6 +5083,14 @@ def positional_col_order(pdf): expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) + @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP) + def column_name_typo(pdf): + return pd.DataFrame({'iid': pdf.id, 'v': pdf.v}) + + with QuietTest(self.sc): + with self.assertRaisesRegexp(Exception, "KeyError: 'id'"): + grouped_df.apply(column_name_typo).collect() + @unittest.skipIf( not _have_pandas or not _have_pyarrow, From 9bbf0143b7c581deb281cf7e6cb8780c90aee1a0 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 May 2018 13:28:51 -0700 Subject: [PATCH 08/15] fix test for py3 --- python/pyspark/sql/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 17772de00263d..616903eb5f0cd 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5056,7 +5056,7 @@ def change_col_order(pdf): def positional_col_order(pdf): # Create a DataFrame with positional columns, fix types to long - return pd.DataFrame(zip(pdf.id, pdf.v * 3, pdf.v), dtype='int64') + return pd.DataFrame(list(zip(pdf.id, pdf.v * 3, pdf.v)), dtype='int64') positional_udf = pandas_udf( positional_col_order, From 74c5d8e267c37cb073b58cae01e30963481ffd0b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 May 2018 13:55:16 -0700 Subject: [PATCH 09/15] added tests for integer index, incorrect field 
type with range index --- python/pyspark/sql/tests.py | 58 ++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 616903eb5f0cd..65d57c6b9e526 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -5035,9 +5035,18 @@ def foo3(key, pdf): self.assertPandasEqual(expected4, result4) def test_column_order(self): + from collections import OrderedDict import pandas as pd from pyspark.sql.functions import pandas_udf, PandasUDFType + + # Helper function to set column names from a list + def rename_pdf(pdf, names): + pdf.rename(columns={old: new for old, new in + zip(pd_result.columns, names)}, inplace=True) + df = self.data + grouped_df = df.groupby('id') + grouped_pdf = df.toPandas().groupby('id') # Function returns a pdf with required column names, but order could be arbitrary using dict def change_col_order(pdf): @@ -5054,32 +5063,47 @@ def change_col_order(pdf): PandasUDFType.GROUPED_MAP ) - def positional_col_order(pdf): + # The UDF result should assign columns by name from the pdf + result = grouped_df.apply(ordered_udf).sort('id', 'v')\ + .select('id', 'u', 'v').toPandas() + pd_result = grouped_pdf.apply(change_col_order) + expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) + self.assertPandasEqual(expected, result) + + # Function returns a pdf with positional columns, indexed by range + def range_col_order(pdf): # Create a DataFrame with positional columns, fix types to long return pd.DataFrame(list(zip(pdf.id, pdf.v * 3, pdf.v)), dtype='int64') - positional_udf = pandas_udf( - positional_col_order, + range_udf = pandas_udf( + range_col_order, 'id long, u long, v long', PandasUDFType.GROUPED_MAP ) - grouped_df = df.groupby('id') - grouped_pdf = df.toPandas().groupby('id') - - # The UDF result should assign columns by name from the pdf - result = grouped_df.apply(ordered_udf).sort('id', 'v')\ + # The UDF result uses positional columns from the pdf + result = grouped_df.apply(range_udf).sort('id', 'v') \ .select('id', 'u', 'v').toPandas() - pd_result = grouped_pdf.apply(change_col_order) + pd_result = grouped_pdf.apply(range_col_order) + rename_pdf(pd_result, ['id', 'u', 'v']) expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) - # The UDF result uses positional columns from the pdf - result = grouped_df.apply(positional_udf).sort('id', 'v') \ + # Function returns a pdf with columns indexed with integers + def int_index(pdf): + return pd.DataFrame(OrderedDict([(0, pdf.id), (1, pdf.v * 4), (2, pdf.v)])) + + int_index_udf = pandas_udf( + int_index, + 'id long, u int, v int', + PandasUDFType.GROUPED_MAP + ) + + # The UDF result should assign columns by position of integer index + result = grouped_df.apply(int_index_udf).sort('id', 'v') \ .select('id', 'u', 'v').toPandas() - pd_result = grouped_pdf.apply(positional_col_order) - pd_result.rename(columns={old: new for old, new in - zip(pd_result.columns, ['id', 'u', 'v'])}, inplace=True) + pd_result = grouped_pdf.apply(int_index) + rename_pdf(pd_result, ['id', 'u', 'v']) expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True) self.assertPandasEqual(expected, result) @@ -5087,9 +5111,15 @@ def positional_col_order(pdf): def column_name_typo(pdf): return pd.DataFrame({'iid': pdf.id, 'v': pdf.v}) + @pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP) + def invalid_positional_types(pdf): + return pd.DataFrame([(u'a', 1.2)]) + 
with QuietTest(self.sc): with self.assertRaisesRegexp(Exception, "KeyError: 'id'"): grouped_df.apply(column_name_typo).collect() + with self.assertRaisesRegexp(Exception, "No cast implemented"): + grouped_df.apply(invalid_positional_types).collect() @unittest.skipIf( From b2d0966e6f00fd42a200f7d2ddde82c76daaef15 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 May 2018 14:13:03 -0700 Subject: [PATCH 10/15] documented behavior of column labels --- docs/sql-programming-guide.md | 12 ++++-------- python/pyspark/sql/functions.py | 7 ++++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4d8a738507bd1..d2db067989434 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1752,14 +1752,10 @@ To use `groupBy().apply()`, the user needs to define the following: * A Python function that defines the computation for each group. * A `StructType` object or a string that defines the schema of the output `DataFrame`. -The output schema will be applied to the columns of the returned `pandas.DataFrame` in order by position, -not by name. This means that the columns in the `pandas.DataFrame` must be indexed so that their -position matches the corresponding field in the schema. - -Note that when creating a new `pandas.DataFrame` using a dictionary, the actual position of the column -can differ from the order that it was placed in the dictionary. It is recommended in this case to -explicitly define the column order using the `columns` keyword, e.g. -`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`, or alternatively use an `OrderedDict`. +The column labels of the returned `pandas.DataFrame` must either match the field names in the +defined output schema if specified as strings, or match the field data types by position if not +strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame) +on how to label columns when constructing a `pandas.DataFrame`. Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptons, especially if the group sizes are skewed. The configuration for diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index e6346691fb1d4..080145877f91a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2550,9 +2550,10 @@ def pandas_udf(f=None, returnType=None, functionType=None): A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame` The returnType should be a :class:`StructType` describing the schema of the returned - `pandas.DataFrame`. - The length of the returned `pandas.DataFrame` can be arbitrary and the columns must be - indexed so that their position matches the corresponding field in the schema. + `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match + the field names in the defined returnType schema if specified as strings, or match the + field data types by position if not strings, e.g. integer indices. + The length of the returned `pandas.DataFrame` can be arbitrary. Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`. 
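[Illustration, not part of the patch] A minimal sketch of the behavior documented in the patch above, assuming an input DataFrame `df` with columns `id` (long) and `v` (double); the UDF names and extra column `u` are made up for the example. String column labels on the returned pandas.DataFrame are matched to the schema fields by name, while non-string labels (e.g. the default RangeIndex) fall back to positional matching:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # String labels: columns are bound to schema fields by name, so the
    # ordering produced by the dict constructor does not matter.
    @pandas_udf('id long, v double, u double', PandasUDFType.GROUPED_MAP)
    def by_name(pdf):
        return pd.DataFrame({'u': pdf.v * 2, 'id': pdf.id, 'v': pdf.v})

    # Integer labels (default RangeIndex): columns are bound by position,
    # here 0 -> id, 1 -> v, 2 -> u.
    @pandas_udf('id long, v double, u double', PandasUDFType.GROUPED_MAP)
    def by_position(pdf):
        return pd.DataFrame(list(zip(pdf.id, pdf.v, pdf.v * 2)))

    # Both produce the same result:
    # df.groupby('id').apply(by_name)
    # df.groupby('id').apply(by_position)
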
From 5a7edb2bc30dc7fe93d19504e78fbf83c1f525d9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 18 Jun 2018 10:50:53 -0700 Subject: [PATCH 11/15] passing conf map to runner, tests pass --- python/pyspark/worker.py | 9 ++++++-- .../python/AggregateInPandasExec.scala | 21 +++++++++++++++---- .../python/ArrowEvalPythonExec.scala | 21 +++++++++++++++---- .../execution/python/ArrowPythonRunner.scala | 12 +++++------ .../python/FlatMapGroupsInPandasExec.scala | 20 ++++++++++++++---- .../execution/python/WindowInPandasExec.scala | 20 ++++++++++++++---- 6 files changed, 79 insertions(+), 24 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 3182c116b37fd..03bb026666b56 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -224,8 +224,13 @@ def read_udfs(pickleSer, infile, eval_type): PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF): - timezone = utf8_deserializer.loads(infile) - ser = ArrowStreamPandasSerializer(timezone) + runner_conf = {} + num_conf = read_int(infile) + for i in range(num_conf): + k = utf8_deserializer.loads(infile) + v = utf8_deserializer.loads(infile) + runner_conf[k] = v + ser = ArrowStreamPandasSerializer(runner_conf.get("spark.sql.session.timeZone", None)) else: ser = BatchedSerializer(PickleSerializer(), 100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 8e01e8e56a5bd..e32a6831d45d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -134,11 +135,23 @@ case class AggregateInPandasExec( rows } + val timeZoneConf = if (pandasRespectSessionTimeZone) { + Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) + } else { + Nil + } + val runnerConfEntries = Seq() ++ timeZoneConf + val runnerConf = Map(runnerConfEntries: _*) + val columnarBatchIter = new ArrowPythonRunner( - pyFuncs, bufferSize, reuseWorker, - PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema, - sessionLocalTimeZone, pandasRespectSessionTimeZone) - .compute(projectedRowIter, context.partitionId(), context) + pyFuncs, + bufferSize, + reuseWorker, + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, + argOffsets, + aggInputSchema, + sessionLocalTimeZone, + runnerConf).compute(projectedRowIter, context.partitionId(), context) val joinedAttributes = groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index c4de214679ae4..7bfe5755096c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -24,6 +24,7 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType /** @@ -79,11 +80,23 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // DO NOT use iter.grouped(). See BatchIterator. val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter) + val timeZoneConf = if (pandasRespectSessionTimeZone) { + Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) + } else { + Nil + } + val runnerConfEntries = Seq() ++ timeZoneConf + val runnerConf = Map(runnerConfEntries: _*) + val columnarBatchIter = new ArrowPythonRunner( - funcs, bufferSize, reuseWorker, - PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema, - sessionLocalTimeZone, pandasRespectSessionTimeZone) - .compute(batchIter, context.partitionId(), context) + funcs, + bufferSize, + reuseWorker, + PythonEvalType.SQL_SCALAR_PANDAS_UDF, + argOffsets, + schema, + sessionLocalTimeZone, + runnerConf).compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 01e19bddbfb66..2646c78743e10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -45,7 +45,7 @@ class ArrowPythonRunner( argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, - respectTimeZone: Boolean) + conf: Map[String, String]) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( funcs, bufferSize, reuseWorker, evalType, argOffsets) { @@ -59,17 +59,17 @@ class ArrowPythonRunner( protected override def writeCommand(dataOut: DataOutputStream): Unit = { PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) - if (respectTimeZone) { - PythonRDD.writeUTF(timeZoneId, dataOut) - } else { - dataOut.writeInt(SpecialLengths.NULL) + dataOut.writeInt(conf.size) + for ((k, v) <- conf) { + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) } } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { - val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"stdout writer for $pythonExec", 0, Long.MaxValue) + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val root = VectorSchemaRoot.create(arrowSchema, allocator) Utils.tryWithSafeFinally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 513e174c7733e..a7729a804c713 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, 
Distribution, Partitioning} import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType /** @@ -137,12 +138,23 @@ case class FlatMapGroupsInPandasExec( } val context = TaskContext.get() + val timeZoneConf = if (pandasRespectSessionTimeZone) { + Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) + } else { + Nil + } + val runnerConfEntries = Seq() ++ timeZoneConf + val runnerConf = Map(runnerConfEntries: _*) val columnarBatchIter = new ArrowPythonRunner( - chainedFunc, bufferSize, reuseWorker, - PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, dedupSchema, - sessionLocalTimeZone, pandasRespectSessionTimeZone) - .compute(grouped, context.partitionId(), context) + chainedFunc, + bufferSize, + reuseWorker, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + argOffsets, + dedupSchema, + sessionLocalTimeZone, + runnerConf).compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index c76832a1a3829..672cf7a7b0e97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -153,12 +154,23 @@ case class WindowInPandasExec( } } + val timeZoneConf = if (pandasRespectSessionTimeZone) { + Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) + } else { + Nil + } + val runnerConfEntries = Seq() ++ timeZoneConf + val runnerConf = Map(runnerConfEntries: _*) + val windowFunctionResult = new ArrowPythonRunner( - pyFuncs, bufferSize, reuseWorker, + pyFuncs, + bufferSize, + reuseWorker, PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF, - argOffsets, windowInputSchema, - sessionLocalTimeZone, pandasRespectSessionTimeZone) - .compute(pythonInput, context.partitionId(), context) + argOffsets, + windowInputSchema, + sessionLocalTimeZone, + runnerConf).compute(pythonInput, context.partitionId(), context) val joined = new JoinedRow val resultProj = createResultProjection(expressions) From 59972d6a9ab3d8a95f5b5eed5c30d73421dbe140 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 18 Jun 2018 15:05:41 -0700 Subject: [PATCH 12/15] added conf for assignment by position, passing general map of conf to python runner --- python/pyspark/sql/tests.py | 17 ++++- python/pyspark/worker.py | 71 +++++++++++-------- .../apache/spark/sql/internal/SQLConf.scala | 13 ++++ .../sql/execution/arrow/ArrowUtils.scala | 16 +++++ .../python/AggregateInPandasExec.scala | 12 +--- .../python/ArrowEvalPythonExec.scala | 12 +--- .../execution/python/ArrowPythonRunner.scala | 2 +- .../python/FlatMapGroupsInPandasExec.scala | 11 +-- .../execution/python/WindowInPandasExec.scala | 12 +--- 9 files changed, 96 insertions(+), 70 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 
65d57c6b9e526..0dbc865a445d3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -4449,7 +4449,6 @@ def test_vectorized_udf_chained(self): def test_vectorized_udf_wrong_return_type(self): from pyspark.sql.functions import pandas_udf, col - df = self.spark.range(10) with QuietTest(self.sc): with self.assertRaisesRegexp( NotImplementedError, @@ -5121,6 +5120,22 @@ def invalid_positional_types(pdf): with self.assertRaisesRegexp(Exception, "No cast implemented"): grouped_df.apply(invalid_positional_types).collect() + def test_positional_assignment_conf(self): + import pandas as pd + from pyspark.sql.functions import pandas_udf, PandasUDFType + + with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}): + + @pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP) + def foo(_): + return pd.DataFrame([('hi', 1)], columns=['x', 'y']) + + df = self.data + result = df.groupBy('id').apply(foo).select('a', 'b').collect() + for r in result: + self.assertEqual(r.a, 'hi') + self.assertEqual(r.b, 1) + @unittest.skipIf( not _have_pandas or not _have_pyarrow, diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 03bb026666b56..8f6b6ee69490d 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -95,7 +95,10 @@ def verify_result_length(*a): return lambda *a: (verify_result_length(*a), arrow_return_type) -def wrap_grouped_map_pandas_udf(f, return_type, argspec): +def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): + assign_cols_by_pos = runner_conf.get( + "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) + def wrapped(key_series, value_series): import pandas as pd @@ -113,16 +116,20 @@ def wrapped(key_series, value_series): "Number of columns of the returned pandas.DataFrame " "doesn't match specified schema. 
" "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) - try: - # Assign result columns by schema name - return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] - except KeyError: - if all(not isinstance(name, basestring) for name in result.columns): - # Assign result columns by position if they are not named with strings - return [(result[result.columns[i]], to_arrow_type(field.dataType)) - for i, field in enumerate(return_type)] - else: - raise + + if not assign_cols_by_pos: + try: + # Assign result columns by schema name + return [(result[field.name], to_arrow_type(field.dataType)) + for field in return_type] + except KeyError: + # Raise error if columns are labeled with strings, else allow positional assignment + if any(isinstance(name, basestring) for name in result.columns): + raise + + # Assign result columns by position + return [(result[result.columns[i]], to_arrow_type(field.dataType)) + for i, field in enumerate(return_type)] return wrapped @@ -153,7 +160,7 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) -def read_single_udf(pickleSer, infile, eval_type): +def read_single_udf(pickleSer, infile, eval_type, runner_conf): num_arg = read_int(infile) arg_offsets = [read_int(infile) for i in range(num_arg)] row_func = None @@ -173,7 +180,7 @@ def read_single_udf(pickleSer, infile, eval_type): return arg_offsets, wrap_scalar_pandas_udf(func, return_type) elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: argspec = _get_argspec(row_func) # signature was lost when wrapping it - return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec) + return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf) elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF: return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type) elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF: @@ -185,6 +192,26 @@ def read_single_udf(pickleSer, infile, eval_type): def read_udfs(pickleSer, infile, eval_type): + runner_conf = {} + + if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, + PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF): + + # Load conf used for pandas_udf evaluation + num_conf = read_int(infile) + for i in range(num_conf): + k = utf8_deserializer.loads(infile) + v = utf8_deserializer.loads(infile) + runner_conf[k] = v + + # NOTE: if timezone is set here, that implies respectSessionTimeZone is True + timezone = runner_conf.get("spark.sql.session.timeZone", None) + ser = ArrowStreamPandasSerializer(timezone) + else: + ser = BatchedSerializer(PickleSerializer(), 100) + num_udfs = read_int(infile) udfs = {} call_udf = [] @@ -199,7 +226,7 @@ def read_udfs(pickleSer, infile, eval_type): # See FlatMapGroupsInPandasExec for how arg_offsets are used to # distinguish between grouping attributes and data attributes - arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type) + arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf) udfs['f'] = udf split_offset = arg_offsets[0] + 1 arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]] @@ -211,7 +238,7 @@ def read_udfs(pickleSer, infile, eval_type): # In the special case of a single UDF this will return a single result rather # than a tuple of results; this is the format that the JVM side expects. 
for i in range(num_udfs): - arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type) + arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf) udfs['f%d' % i] = udf args = ["a[%d]" % o for o in arg_offsets] call_udf.append("f%d(%s)" % (i, ", ".join(args))) @@ -220,20 +247,6 @@ def read_udfs(pickleSer, infile, eval_type): mapper = eval(mapper_str, udfs) func = lambda _, it: map(mapper, it) - if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, - PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, - PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF): - runner_conf = {} - num_conf = read_int(infile) - for i in range(num_conf): - k = utf8_deserializer.loads(infile) - v = utf8_deserializer.loads(infile) - runner_conf[k] = v - ser = ArrowStreamPandasSerializer(runner_conf.get("spark.sql.session.timeZone", None)) - else: - ser = BatchedSerializer(PickleSerializer(), 100) - # profiling is not supported for UDF return func, None, ser, ser diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8d2320d8a6ed7..61b11ab12b28a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1161,6 +1161,16 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION = + buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition") + .internal() + .doc("When true, a grouped map Pandas UDF will assign columns from the returned " + + "Pandas DataFrame based on position, regardless of column label type. When false, " + + "columns will be looked up by name if labeled with a string and fallback to use" + + "position if not.") + .booleanConf + .createWithDefault(false) + val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter") .internal() .doc("When true, the apply function of the rule verifies whether the right node of the" + @@ -1647,6 +1657,9 @@ class SQLConf extends Serializable with Logging { def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE) + def pandasGroupedMapAssignColumnssByPosition: Boolean = + getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION) + def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER) def decimalOperationsAllowPrecisionLoss: Boolean = getConf(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala index 6ad11bda84bf6..93c8127681b3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala @@ -23,6 +23,7 @@ import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ object ArrowUtils { @@ -120,4 +121,19 @@ object ArrowUtils { StructField(field.getName, dt, field.isNullable) }) } + + /** Return Map with conf settings to be used in ArrowPythonRunner */ + def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = { + val timeZoneConf = if 
(conf.pandasRespectSessionTimeZone) { + Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone) + } else { + Nil + } + val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) { + Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> "true") + } else { + Nil + } + Map(timeZoneConf ++ pandasColsByPosition: _*) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index e32a6831d45d9..4d703d0005c9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -82,7 +82,7 @@ case class AggregateInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) val (pyFuncs, inputs) = udfExpressions.map(collectFunctions).unzip @@ -135,14 +135,6 @@ case class AggregateInPandasExec( rows } - val timeZoneConf = if (pandasRespectSessionTimeZone) { - Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) - } else { - Nil - } - val runnerConfEntries = Seq() ++ timeZoneConf - val runnerConf = Map(runnerConfEntries: _*) - val columnarBatchIter = new ArrowPythonRunner( pyFuncs, bufferSize, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index 7bfe5755096c1..bb63145397caa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.sql.types.StructType /** @@ -64,7 +64,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi private val batchSize = conf.arrowMaxRecordsPerBatch private val sessionLocalTimeZone = conf.sessionLocalTimeZone - private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + private val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) protected override def evaluate( funcs: Seq[ChainedPythonFunctions], @@ -80,14 +80,6 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // DO NOT use iter.grouped(). See BatchIterator. 
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter) - val timeZoneConf = if (pandasRespectSessionTimeZone) { - Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) - } else { - Nil - } - val runnerConfEntries = Seq() ++ timeZoneConf - val runnerConf = Map(runnerConfEntries: _*) - val columnarBatchIter = new ArrowPythonRunner( funcs, bufferSize, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 2646c78743e10..0bd0fd74a9b3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -58,12 +58,12 @@ class ArrowPythonRunner( new WriterThread(env, worker, inputIterator, partitionIndex, context) { protected override def writeCommand(dataOut: DataOutputStream): Unit = { - PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) dataOut.writeInt(conf.size) for ((k, v) <- conf) { PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v, dataOut) } + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index a7729a804c713..249e2593a16db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.sql.types.StructType /** @@ -78,7 +78,7 @@ case class FlatMapGroupsInPandasExec( val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) // Deduplicate the grouping attributes. 
// If a grouping attribute also appears in data attributes, then we don't need to send the @@ -138,13 +138,6 @@ case class FlatMapGroupsInPandasExec( } val context = TaskContext.get() - val timeZoneConf = if (pandasRespectSessionTimeZone) { - Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) - } else { - Nil - } - val runnerConfEntries = Seq() ++ timeZoneConf - val runnerConf = Map(runnerConfEntries: _*) val columnarBatchIter = new ArrowPythonRunner( chainedFunc, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 672cf7a7b0e97..14f823b5264b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.util.Utils @@ -98,7 +98,7 @@ case class WindowInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) // Extract window expressions and window functions val expressions = windowExpression.flatMap(_.collect { case e: WindowExpression => e }) @@ -154,14 +154,6 @@ case class WindowInPandasExec( } } - val timeZoneConf = if (pandasRespectSessionTimeZone) { - Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionLocalTimeZone) - } else { - Nil - } - val runnerConfEntries = Seq() ++ timeZoneConf - val runnerConf = Map(runnerConfEntries: _*) - val windowFunctionResult = new ArrowPythonRunner( pyFuncs, bufferSize, From 27b4cad4ad1ff6aea77951878132db5fff80a907 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 18 Jun 2018 15:24:36 -0700 Subject: [PATCH 13/15] fix some typos --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../apache/spark/sql/execution/python/ArrowPythonRunner.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 61b11ab12b28a..d5fb524a1396f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1166,7 +1166,7 @@ object SQLConf { .internal() .doc("When true, a grouped map Pandas UDF will assign columns from the returned " + "Pandas DataFrame based on position, regardless of column label type. 
When false, " + - "columns will be looked up by name if labeled with a string and fallback to use" + + "columns will be looked up by name if labeled with a string and fallback to use " + "position if not.") .booleanConf .createWithDefault(false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 0bd0fd74a9b3e..90e2ce486832a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -67,9 +67,9 @@ class ArrowPythonRunner( } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { + val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator( s"stdout writer for $pythonExec", 0, Long.MaxValue) - val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val root = VectorSchemaRoot.create(arrowSchema, allocator) Utils.tryWithSafeFinally { From c593650aa527241da1ddd7e433c626db0fa26bb5 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 22 Jun 2018 10:20:47 -0700 Subject: [PATCH 14/15] address nits --- .../spark/sql/execution/python/AggregateInPandasExec.scala | 4 ++-- .../spark/sql/execution/python/ArrowEvalPythonExec.scala | 4 ++-- .../apache/spark/sql/execution/python/ArrowPythonRunner.scala | 3 +++ .../sql/execution/python/FlatMapGroupsInPandasExec.scala | 4 ++-- .../spark/sql/execution/python/WindowInPandasExec.scala | 4 ++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 4d703d0005c9f..d00f6f042d6e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -82,7 +82,7 @@ case class AggregateInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) + val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) val (pyFuncs, inputs) = udfExpressions.map(collectFunctions).unzip @@ -143,7 +143,7 @@ case class AggregateInPandasExec( argOffsets, aggInputSchema, sessionLocalTimeZone, - runnerConf).compute(projectedRowIter, context.partitionId(), context) + pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context) val joinedAttributes = groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index bb63145397caa..0bc21c0986e69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -64,7 +64,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi private val batchSize = conf.arrowMaxRecordsPerBatch private val sessionLocalTimeZone = conf.sessionLocalTimeZone - private 
val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) + private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) protected override def evaluate( funcs: Seq[ChainedPythonFunctions], @@ -88,7 +88,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi argOffsets, schema, sessionLocalTimeZone, - runnerConf).compute(batchIter, context.partitionId(), context) + pythonRunnerConf).compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 90e2ce486832a..ca665652f204d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -58,11 +58,14 @@ class ArrowPythonRunner( new WriterThread(env, worker, inputIterator, partitionIndex, context) { protected override def writeCommand(dataOut: DataOutputStream): Unit = { + + // Write config for the worker as a number of key -> value pairs of strings dataOut.writeInt(conf.size) for ((k, v) <- conf) { PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v, dataOut) } + PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 249e2593a16db..f5a563baf52df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -78,7 +78,7 @@ case class FlatMapGroupsInPandasExec( val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) + val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) // Deduplicate the grouping attributes. 
// If a grouping attribute also appears in data attributes, then we don't need to send the @@ -147,7 +147,7 @@ case class FlatMapGroupsInPandasExec( argOffsets, dedupSchema, sessionLocalTimeZone, - runnerConf).compute(grouped, context.partitionId(), context) + pythonRunnerConf).compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 14f823b5264b2..628029b13a6c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -98,7 +98,7 @@ case class WindowInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) val sessionLocalTimeZone = conf.sessionLocalTimeZone - val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf) + val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf) // Extract window expressions and window functions val expressions = windowExpression.flatMap(_.collect { case e: WindowExpression => e }) @@ -162,7 +162,7 @@ case class WindowInPandasExec( argOffsets, windowInputSchema, sessionLocalTimeZone, - runnerConf).compute(pythonInput, context.partitionId(), context) + pythonRunnerConf).compute(pythonInput, context.partitionId(), context) val joined = new JoinedRow val resultProj = createResultProjection(expressions) From 2d2ced626f3ed9abf8100e5a83b007b8c8cfad99 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 22 Jun 2018 11:21:19 -0700 Subject: [PATCH 15/15] changed from catching KeyError to explicit check for string labels --- python/pyspark/worker.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8f6b6ee69490d..eaaae2b14e107 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -117,19 +117,12 @@ def wrapped(key_series, value_series): "doesn't match specified schema. " "Expected: {} Actual: {}".format(len(return_type), len(result.columns))) - if not assign_cols_by_pos: - try: - # Assign result columns by schema name - return [(result[field.name], to_arrow_type(field.dataType)) - for field in return_type] - except KeyError: - # Raise error if columns are labeled with strings, else allow positional assignment - if any(isinstance(name, basestring) for name in result.columns): - raise - - # Assign result columns by position - return [(result[result.columns[i]], to_arrow_type(field.dataType)) - for i, field in enumerate(return_type)] + # Assign result columns by schema name if user labeled with strings, else use position + if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns): + return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type] + else: + return [(result[result.columns[i]], to_arrow_type(field.dataType)) + for i, field in enumerate(return_type)] return wrapped
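[Illustration, not part of the patch] A hypothetical end-to-end use of the conf introduced in this series; `spark` is an active SparkSession and `df` a DataFrame with an `id` column, both assumed for the example. With spark.sql.execution.pandas.groupedMap.assignColumnsByPosition enabled, string labels on the returned pandas.DataFrame are ignored and columns are bound to the declared schema purely by position:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark.conf.set(
        "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", "true")

    @pandas_udf('a string, b double', PandasUDFType.GROUPED_MAP)
    def renamed(pdf):
        # Labels 'x' and 'y' are ignored; position 0 -> 'a', position 1 -> 'b'.
        return pd.DataFrame([('hi', 1.0)], columns=['x', 'y'])

    df.groupby('id').apply(renamed).show()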