From 8dda6c6b03de0715067c182c7d2b7ca23476ad79 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 2 Jul 2020 19:02:01 -0700 Subject: [PATCH 1/3] Improved error message of GroupedMapInPandasTests.test_grouped_over_window_with_key --- .../sql/tests/test_pandas_grouped_map.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index 76119432662b..ceddde8983d3 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -545,13 +545,13 @@ def f(pdf): def test_grouped_over_window_with_key(self): - data = [(0, 1, "2018-03-10T00:00:00+00:00", False), - (1, 2, "2018-03-11T00:00:00+00:00", False), - (2, 2, "2018-03-12T00:00:00+00:00", False), - (3, 3, "2018-03-15T00:00:00+00:00", False), - (4, 3, "2018-03-16T00:00:00+00:00", False), - (5, 3, "2018-03-17T00:00:00+00:00", False), - (6, 3, "2018-03-21T00:00:00+00:00", False)] + data = [(0, 1, "2018-03-10T00:00:00+00:00"), + (1, 2, "2018-03-11T00:00:00+00:00"), + (2, 2, "2018-03-12T00:00:00+00:00"), + (3, 3, "2018-03-15T00:00:00+00:00"), + (4, 3, "2018-03-16T00:00:00+00:00"), + (5, 3, "2018-03-17T00:00:00+00:00"), + (6, 3, "2018-03-21T00:00:00+00:00")] expected_window = [ {'start': datetime.datetime(2018, 3, 10, 0, 0), @@ -570,17 +570,21 @@ def test_grouped_over_window_with_key(self): 5: (3, expected_window[1]), 6: (3, expected_window[2])} - df = self.spark.createDataFrame(data, ['id', 'group', 'ts', 'result']) - df = df.select(col('id'), col('group'), col('ts').cast('timestamp'), col('result')) + df = self.spark.createDataFrame(data, ['id', 'group', 'ts']) + df = df.select(col('id'), col('group'), col('ts').cast('timestamp')) @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP) def f(key, pdf): group = key[0] window_range = key[1] - # Result will be True if group and window range equal to expected - is_expected = pdf.id.apply(lambda id: (expected[id][0] == group and - expected[id][1] == window_range)) - return pdf.assign(result=is_expected) + + # Make sure the key with group and window values are correct + for _, i in pdf.id.iteritems(): + assert expected[i][0] == group, "{} != {}".format(expected[i][0], group) + assert expected[i][1] == window_range, \ + "{}, != {}".format(expected[i][1], window_range) + + return pdf result = df.groupby('group', window('ts', '5 days')).apply(f).select('result').collect() From 43a8cf6b6fb2e8f0fdf5445c5425fe3a8ae527f8 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 2 Jul 2020 19:10:31 -0700 Subject: [PATCH 2/3] Fix typo --- python/pyspark/sql/tests/test_pandas_grouped_map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index ceddde8983d3..d590c6d94b2e 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -582,7 +582,7 @@ def f(key, pdf): for _, i in pdf.id.iteritems(): assert expected[i][0] == group, "{} != {}".format(expected[i][0], group) assert expected[i][1] == window_range, \ - "{}, != {}".format(expected[i][1], window_range) + "{} != {}".format(expected[i][1], window_range) return pdf From 70da8b5299096942af5926b4222990e135ceffaa Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 2 Jul 2020 19:44:42 -0700 Subject: [PATCH 3/3] Needed to properly check result with group num --- .../sql/tests/test_pandas_grouped_map.py | 57 +++++++++++-------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_grouped_map.py b/python/pyspark/sql/tests/test_pandas_grouped_map.py index d590c6d94b2e..cc6167e61928 100644 --- a/python/pyspark/sql/tests/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_grouped_map.py @@ -545,13 +545,13 @@ def f(pdf): def test_grouped_over_window_with_key(self): - data = [(0, 1, "2018-03-10T00:00:00+00:00"), - (1, 2, "2018-03-11T00:00:00+00:00"), - (2, 2, "2018-03-12T00:00:00+00:00"), - (3, 3, "2018-03-15T00:00:00+00:00"), - (4, 3, "2018-03-16T00:00:00+00:00"), - (5, 3, "2018-03-17T00:00:00+00:00"), - (6, 3, "2018-03-21T00:00:00+00:00")] + data = [(0, 1, "2018-03-10T00:00:00+00:00", [0]), + (1, 2, "2018-03-11T00:00:00+00:00", [0]), + (2, 2, "2018-03-12T00:00:00+00:00", [0]), + (3, 3, "2018-03-15T00:00:00+00:00", [0]), + (4, 3, "2018-03-16T00:00:00+00:00", [0]), + (5, 3, "2018-03-17T00:00:00+00:00", [0]), + (6, 3, "2018-03-21T00:00:00+00:00", [0])] expected_window = [ {'start': datetime.datetime(2018, 3, 10, 0, 0), @@ -562,34 +562,43 @@ def test_grouped_over_window_with_key(self): 'end': datetime.datetime(2018, 3, 25, 0, 0)}, ] - expected = {0: (1, expected_window[0]), - 1: (2, expected_window[0]), - 2: (2, expected_window[0]), - 3: (3, expected_window[1]), - 4: (3, expected_window[1]), - 5: (3, expected_window[1]), - 6: (3, expected_window[2])} + expected_key = {0: (1, expected_window[0]), + 1: (2, expected_window[0]), + 2: (2, expected_window[0]), + 3: (3, expected_window[1]), + 4: (3, expected_window[1]), + 5: (3, expected_window[1]), + 6: (3, expected_window[2])} + + # id -> array of group with len of num records in window + expected = {0: [1], + 1: [2, 2], + 2: [2, 2], + 3: [3, 3, 3], + 4: [3, 3, 3], + 5: [3, 3, 3], + 6: [3]} - df = self.spark.createDataFrame(data, ['id', 'group', 'ts']) - df = df.select(col('id'), col('group'), col('ts').cast('timestamp')) + df = self.spark.createDataFrame(data, ['id', 'group', 'ts', 'result']) + df = df.select(col('id'), col('group'), col('ts').cast('timestamp'), col('result')) - @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP) def f(key, pdf): group = key[0] window_range = key[1] # Make sure the key with group and window values are correct for _, i in pdf.id.iteritems(): - assert expected[i][0] == group, "{} != {}".format(expected[i][0], group) - assert expected[i][1] == window_range, \ - "{} != {}".format(expected[i][1], window_range) + assert expected_key[i][0] == group, "{} != {}".format(expected_key[i][0], group) + assert expected_key[i][1] == window_range, \ + "{} != {}".format(expected_key[i][1], window_range) - return pdf + return pdf.assign(result=[[group] * len(pdf)] * len(pdf)) - result = df.groupby('group', window('ts', '5 days')).apply(f).select('result').collect() + result = df.groupby('group', window('ts', '5 days')).applyInPandas(f, df.schema)\ + .select('id', 'result').collect() - # Check that all group and window_range values from udf matched expected - self.assertTrue(all([r[0] for r in result])) + for r in result: + self.assertListEqual(expected[r[0]], r[1]) def test_case_insensitive_grouping_column(self): # SPARK-31915: case-insensitive grouping column should work.