Skip to content

Commit 6bbe31c

Browse files
committed
updated grouped map to new args format
1 parent 7321141 commit 6bbe31c

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

python/pyspark/worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ def map_batch(batch):
372372
arg_offsets, udf = read_single_udf(
373373
pickleSer, infile, eval_type, runner_conf, udf_index=0)
374374
udfs['f'] = udf
375-
split_offset = arg_offsets[0] + 1
376-
arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
377-
arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
375+
parsed_offsets = parse_grouped_arg_offsets(arg_offsets)
376+
arg0 = ["a[%d]" % o for o in parsed_offsets[0][0]]
377+
arg1 = ["a[%d]" % o for o in parsed_offsets[0][1]]
378378
mapper_str = "lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1))
379379
elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
380380
# We assume there is only one UDF here because cogrouped map doesn't

0 commit comments

Comments
 (0)