12 changes: 4 additions & 8 deletions docs/sql-programming-guide.md
@@ -1752,14 +1752,10 @@ To use `groupBy().apply()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

The output schema will be applied to the columns of the returned `pandas.DataFrame` in order by position,
not by name. This means that the columns in the `pandas.DataFrame` must be indexed so that their
position matches the corresponding field in the schema.

Note that when creating a new `pandas.DataFrame` using a dictionary, the actual position of the column
can differ from the order that it was placed in the dictionary. It is recommended in this case to
explicitly define the column order using the `columns` keyword, e.g.
`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`, or alternatively use an `OrderedDict`.
The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
on how to label columns when constructing a `pandas.DataFrame`.

Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
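As context for the revised documentation above, here is a minimal sketch of the name-based assignment it describes. It assumes an active `SparkSession` named `spark`; the data, schema, and function name are illustrative and not taken from the guide.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Illustrative data; assumes an active SparkSession `spark`.
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf("id long, mean double", PandasUDFType.GROUPED_MAP)
def group_mean(pdf):
    # The string column labels match the schema field names, so columns are
    # assigned by name even though the dict order differs from the schema.
    return pd.DataFrame({"mean": [pdf.v.mean()], "id": [pdf.id.iloc[0]]})

df.groupby("id").apply(group_mean).show()
```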
7 changes: 4 additions & 3 deletions python/pyspark/sql/functions.py
@@ -2550,9 +2550,10 @@ def pandas_udf(f=None, returnType=None, functionType=None):

A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.
The length of the returned `pandas.DataFrame` can be arbitrary and the columns must be
indexed so that their position matches the corresponding field in the schema.
`pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
the field names in the defined returnType schema if specified as strings, or match the
field data types by position if not strings, e.g. integer indices.
The length of the returned `pandas.DataFrame` can be arbitrary.

Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.

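The positional case mentioned in the docstring (column labels that are not strings, e.g. a default integer index) might look like the following sketch; the session, data, and names are again assumed for illustration.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Illustrative data; assumes an active SparkSession `spark`.
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf("id long, v double, v_doubled double", PandasUDFType.GROUPED_MAP)
def double_v(pdf):
    # The returned DataFrame has a default integer column index (0, 1, 2),
    # so its columns are matched to the schema fields by position.
    return pd.DataFrame(list(zip(pdf.id, pdf.v, pdf.v * 2)))

df.groupby("id").apply(double_v).show()
```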
104 changes: 103 additions & 1 deletion python/pyspark/sql/tests.py
@@ -4449,7 +4449,6 @@ def test_vectorized_udf_chained(self):

def test_vectorized_udf_wrong_return_type(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -5034,6 +5033,109 @@ def foo3(key, pdf):
expected4 = udf3.func((), pdf)
self.assertPandasEqual(expected4, result4)

def test_column_order(self):
from collections import OrderedDict
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Helper function to set column names from a list
def rename_pdf(pdf, names):
pdf.rename(columns={old: new for old, new in
zip(pdf.columns, names)}, inplace=True)

df = self.data
grouped_df = df.groupby('id')
grouped_pdf = df.toPandas().groupby('id')

# Function returns a pdf with required column names, but order could be arbitrary using dict
def change_col_order(pdf):
# Constructing a DataFrame from a dict should result in the same order,
# but use from_items to ensure the pdf column order is different than schema
return pd.DataFrame.from_items([
('id', pdf.id),
('u', pdf.v * 2),
('v', pdf.v)])

ordered_udf = pandas_udf(
change_col_order,
'id long, v int, u int',
PandasUDFType.GROUPED_MAP
)

# The UDF result should assign columns by name from the pdf
result = grouped_df.apply(ordered_udf).sort('id', 'v')\
.select('id', 'u', 'v').toPandas()
pd_result = grouped_pdf.apply(change_col_order)
expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
self.assertPandasEqual(expected, result)

# Function returns a pdf with positional columns, indexed by range
def range_col_order(pdf):
# Create a DataFrame with positional columns, fix types to long
return pd.DataFrame(list(zip(pdf.id, pdf.v * 3, pdf.v)), dtype='int64')

range_udf = pandas_udf(
range_col_order,
'id long, u long, v long',
PandasUDFType.GROUPED_MAP
)

# The UDF result uses positional columns from the pdf
result = grouped_df.apply(range_udf).sort('id', 'v') \
.select('id', 'u', 'v').toPandas()
pd_result = grouped_pdf.apply(range_col_order)
rename_pdf(pd_result, ['id', 'u', 'v'])
expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
self.assertPandasEqual(expected, result)

# Function returns a pdf with columns indexed with integers
def int_index(pdf):
return pd.DataFrame(OrderedDict([(0, pdf.id), (1, pdf.v * 4), (2, pdf.v)]))

int_index_udf = pandas_udf(
int_index,
'id long, u int, v int',
PandasUDFType.GROUPED_MAP
)

# The UDF result should assign columns by position of integer index
result = grouped_df.apply(int_index_udf).sort('id', 'v') \
.select('id', 'u', 'v').toPandas()
pd_result = grouped_pdf.apply(int_index)
rename_pdf(pd_result, ['id', 'u', 'v'])
expected = pd_result.sort_values(['id', 'v']).reset_index(drop=True)
self.assertPandasEqual(expected, result)

@pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
def column_name_typo(pdf):
return pd.DataFrame({'iid': pdf.id, 'v': pdf.v})

@pandas_udf('id long, v int', PandasUDFType.GROUPED_MAP)
def invalid_positional_types(pdf):
return pd.DataFrame([(u'a', 1.2)])

with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
grouped_df.apply(column_name_typo).collect()
with self.assertRaisesRegexp(Exception, "No cast implemented"):
grouped_df.apply(invalid_positional_types).collect()

def test_positional_assignment_conf(self):
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

with self.sql_conf({"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition": True}):

@pandas_udf("a string, b float", PandasUDFType.GROUPED_MAP)
def foo(_):
return pd.DataFrame([('hi', 1)], columns=['x', 'y'])

df = self.data
result = df.groupBy('id').apply(foo).select('a', 'b').collect()
for r in result:
self.assertEqual(r.a, 'hi')
self.assertEqual(r.b, 1)


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
55 changes: 38 additions & 17 deletions python/pyspark/worker.py
@@ -38,6 +38,9 @@
from pyspark.util import _get_argspec, fail_on_stopiteration
from pyspark import shuffle

if sys.version >= '3':
basestring = str

pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()

@@ -92,7 +95,10 @@ def verify_result_length(*a):
return lambda *a: (verify_result_length(*a), arrow_return_type)


def wrap_grouped_map_pandas_udf(f, return_type, argspec):
def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
assign_cols_by_pos = runner_conf.get(
"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False)

def wrapped(key_series, value_series):
import pandas as pd

@@ -110,9 +116,13 @@ def wrapped(key_series, value_series):
"Number of columns of the returned pandas.DataFrame "
"doesn't match specified schema. "
"Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
arrow_return_types = (to_arrow_type(field.dataType) for field in return_type)
return [(result[result.columns[i]], arrow_type)
for i, arrow_type in enumerate(arrow_return_types)]

# Assign result columns by schema name if user labeled with strings, else use position
if not assign_cols_by_pos and any(isinstance(name, basestring) for name in result.columns):
return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
else:
return [(result[result.columns[i]], to_arrow_type(field.dataType))
for i, field in enumerate(return_type)]

return wrapped

@@ -143,7 +153,7 @@ def wrapped(*series):
return lambda *a: (wrapped(*a), arrow_return_type)


def read_single_udf(pickleSer, infile, eval_type):
def read_single_udf(pickleSer, infile, eval_type, runner_conf):
num_arg = read_int(infile)
arg_offsets = [read_int(infile) for i in range(num_arg)]
row_func = None
@@ -163,7 +173,7 @@ def read_single_udf(pickleSer, infile, eval_type):
return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
argspec = _get_argspec(row_func) # signature was lost when wrapping it
return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf)
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
@@ -175,6 +185,26 @@ def read_single_udf(pickleSer, infile, eval_type):


def read_udfs(pickleSer, infile, eval_type):
runner_conf = {}

if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):

# Load conf used for pandas_udf evaluation
num_conf = read_int(infile)
for i in range(num_conf):
k = utf8_deserializer.loads(infile)
v = utf8_deserializer.loads(infile)
runner_conf[k] = v

# NOTE: if timezone is set here, that implies respectSessionTimeZone is True
timezone = runner_conf.get("spark.sql.session.timeZone", None)
ser = ArrowStreamPandasSerializer(timezone)
else:
ser = BatchedSerializer(PickleSerializer(), 100)

num_udfs = read_int(infile)
udfs = {}
call_udf = []
@@ -189,7 +219,7 @@ def read_udfs(pickleSer, infile, eval_type):

# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
udfs['f'] = udf
split_offset = arg_offsets[0] + 1
arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
@@ -201,7 +231,7 @@ def read_udfs(pickleSer, infile, eval_type):
# In the special case of a single UDF this will return a single result rather
# than a tuple of results; this is the format that the JVM side expects.
for i in range(num_udfs):
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
udfs['f%d' % i] = udf
args = ["a[%d]" % o for o in arg_offsets]
call_udf.append("f%d(%s)" % (i, ", ".join(args)))
@@ -210,15 +240,6 @@ def read_udfs(pickleSer, infile, eval_type):
mapper = eval(mapper_str, udfs)
func = lambda _, it: map(mapper, it)

if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
timezone = utf8_deserializer.loads(infile)
ser = ArrowStreamPandasSerializer(timezone)
else:
ser = BatchedSerializer(PickleSerializer(), 100)

# profiling is not supported for UDF
return func, None, ser, ser

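Stripped of the worker plumbing (Arrow types, runner conf parsing, Python 2 `basestring` handling), the column-selection rule added to `wrap_grouped_map_pandas_udf` can be restated as a small standalone sketch; the helper name and signature are hypothetical, not part of the worker API.

```python
import pandas as pd

def pick_result_columns(result, field_names, assign_by_position=False):
    # Use the schema field names when the result carries string column labels
    # and positional assignment is not forced; otherwise fall back to matching
    # columns to schema fields by position. (Python 3 only: str, not basestring.)
    has_string_labels = any(isinstance(label, str) for label in result.columns)
    if not assign_by_position and has_string_labels:
        return [result[name] for name in field_names]
    return [result[result.columns[i]] for i in range(len(field_names))]

pdf = pd.DataFrame({"u": [2, 4], "id": [1, 2], "v": [1, 2]})
print([s.name for s in pick_result_columns(pdf, ["id", "u", "v"])])
# ['id', 'u', 'v'] -- looked up by name despite the different column order
```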
@@ -1161,6 +1161,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
.internal()
.doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
"Pandas DataFrame based on position, regardless of column label type. When false, " +
"columns will be looked up by name if labeled with a string and fallback to use " +
"position if not.")
.booleanConf
.createWithDefault(false)

val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
.internal()
.doc("When true, the apply function of the rule verifies whether the right node of the" +
@@ -1647,6 +1657,9 @@ class SQLConf extends Serializable with Logging {

def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)

def pandasGroupedMapAssignColumnsByPosition: Boolean =
getConf(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION)

def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)

def decimalOperationsAllowPrecisionLoss: Boolean = getConf(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS)
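From PySpark, the new flag can be toggled like other runtime SQL confs (the test above does this through `self.sql_conf`); a sketch, assuming an active `SparkSession` named `spark`:

```python
# Force positional assignment for grouped map pandas UDFs; string column
# labels on the returned pandas.DataFrame are then ignored.
spark.conf.set(
    "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", "true")
```

Since the conf is declared `.internal()`, it reads as an escape hatch for the previous positional behavior rather than a user-facing setting.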
@@ -23,6 +23,7 @@ import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

object ArrowUtils {
@@ -120,4 +121,19 @@ object ArrowUtils {
StructField(field.getName, dt, field.isNullable)
})
}

/** Return Map with conf settings to be used in ArrowPythonRunner */
def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
[Review comment] icexelloss (Contributor), Jun 19, 2018:
Maybe move this function out of ArrowUtils? Doesn't seem to be Arrow specific.
Edit: Actually, nvm

val timeZoneConf = if (conf.pandasRespectSessionTimeZone) {
Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
} else {
Nil
}
val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnsByPosition) {
[Review comment] Contributor:
Can we do:
val pandasColByPosition = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> conf.pandasGroupedMapAssignColumnsByPosition)

[Review comment] Member Author:
I think it's better to just omit the config for the default case, that way it's easier to process in the worker.

[Review comment] icexelloss (Contributor), Jun 22, 2018:
I am sorry, can you explain why it's easier to process in the worker?
I think we just need to remove the default value here?
https://github.com/apache/spark/pull/21427/files#diff-d33eea00c68dfd120f4ceae6381f34cdR100
Also, one thing that is not great about omitting the conf for the default case is that you need to put the default value in two places (both Python and Java).

Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key -> "true")
} else {
Nil
}
Map(timeZoneConf ++ pandasColsByPosition: _*)
}
}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.util.Utils

@@ -81,7 +82,7 @@ case class AggregateInPandasExec(
val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)

val (pyFuncs, inputs) = udfExpressions.map(collectFunctions).unzip

@@ -135,10 +136,14 @@ case class AggregateInPandasExec(
}

val columnarBatchIter = new ArrowPythonRunner(
pyFuncs, bufferSize, reuseWorker,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(projectedRowIter, context.partitionId(), context)
pyFuncs,
bufferSize,
reuseWorker,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
argOffsets,
aggInputSchema,
sessionLocalTimeZone,
pythonRunnerConf).compute(projectedRowIter, context.partitionId(), context)

val joinedAttributes =
groupingExpressions.map(_.toAttribute) ++ udfExpressions.map(_.resultAttribute)
@@ -24,6 +24,7 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.types.StructType

/**
@@ -63,7 +64,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

private val batchSize = conf.arrowMaxRecordsPerBatch
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)

protected override def evaluate(
funcs: Seq[ChainedPythonFunctions],
@@ -80,10 +81,14 @@
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)

val columnarBatchIter = new ArrowPythonRunner(
funcs, bufferSize, reuseWorker,
PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(batchIter, context.partitionId(), context)
funcs,
bufferSize,
reuseWorker,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
argOffsets,
schema,
sessionLocalTimeZone,
pythonRunnerConf).compute(batchIter, context.partitionId(), context)

new Iterator[InternalRow] {
