From 46c6ad78bab8549ada2b3f79acebcfe7a15524c0 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 3 Jan 2018 16:57:05 -0500 Subject: [PATCH 1/4] Add test for using non deterministic udf in aggregate; Fix docstring of pandas_udf w.r.t determinism --- python/pyspark/sql/functions.py | 12 +++++++- python/pyspark/sql/tests.py | 51 +++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index a4ed562ad48b4..733e32bd825b0 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2214,7 +2214,17 @@ def pandas_udf(f=None, returnType=None, functionType=None): .. seealso:: :meth:`pyspark.sql.GroupedData.apply` - .. note:: The user-defined function must be deterministic. + .. note:: The user-defined functions are considered deterministic by default. Due to + optimization, duplicate invocations may be eliminated or the function may even be invoked + more times than it is present in the query. If your function is not deterministic, call + `asNondeterministic` on the user defined function. E.g.: + + >>> @pandas_udf('double', PandasUDFType.SCALAR) # doctest: +SKIP + ... def random(v): + ... import numpy as np + ... import pandas as pd + ... return pd.Series(np.random.randn(len(v))) + >>> random = random.asNondeterministic() # doctest: +SKIP .. note:: The user-defined functions do not support conditional expressions or short curcuiting in boolean expressions and it ends up with being executed all internally. 
If the functions diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6dc767f9ec46e..3f80542457883 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -413,6 +413,18 @@ def test_nondeterministic_udf2(self): pydoc.render_doc(random_udf) pydoc.render_doc(random_udf1) + def test_nondeterministic_udf_in_aggregate(self): + from pyspark.sql.functions import udf, sum + import random + udf_random_col = udf(lambda: int(100 * random.random()), 'int').asNondeterministic() + df = self.spark.range(10) + + with QuietTest(self.sc): + with self.assertRaisesRegexp(AnalysisException, "nondeterministic"): + df.groupby('id').agg(sum(udf_random_col())).collect() + with self.assertRaisesRegexp(AnalysisException, "nondeterministic"): + df.agg(sum(udf_random_col())).collect() + def test_chained_udf(self): self.spark.catalog.registerFunction("double", lambda x: x + x, IntegerType()) [row] = self.spark.sql("SELECT double(1)").collect() @@ -3567,6 +3579,18 @@ def tearDownClass(cls): time.tzset() ReusedSQLTestCase.tearDownClass() + @property + def random_udf(self): + from pyspark.sql.functions import pandas_udf + + @pandas_udf('double') + def random_udf(v): + import pandas as pd + import numpy as np + return pd.Series(np.random.random(len(v))) + random_udf = random_udf.asNondeterministic() + return random_udf + def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col df = self.spark.range(10).select( @@ -3950,6 +3974,33 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): finally: self.spark.conf.set("spark.sql.session.timeZone", orig_tz) + def test_nondeterministic_udf(self): + # Non-deterministic UDFs should be allowed in select and withColumn + from pyspark.sql.functions import pandas_udf, col + + random_udf = self.random_udf + df = self.spark.range(10) + + result1 = df.select(random_udf(col('id')).alias('rand')).collect() + result2 = df.withColumn('rand', random_udf(col('id'))).collect() + + 
for row in result1: + self.assertTrue(0.0 <= row.rand < 1.0) + for row in result2: + self.assertTrue(0.0 <= row.rand < 1.0) + + def test_nondeterministic_udf_in_aggregate(self): + from pyspark.sql.functions import pandas_udf, sum + + df = self.spark.range(10) + random_udf = self.random_udf + + with QuietTest(self.sc): + with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): + df.groupby(df.id).agg(sum(random_udf(df.id))).collect() + with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'): + df.agg(sum(random_udf(df.id))).collect() + @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") class GroupbyApplyTests(ReusedSQLTestCase): From 0d8d943a0a68ec28ef883768c031d7998bcbeebc Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 5 Jan 2018 17:26:50 -0500 Subject: [PATCH 2/4] Fix test_nondeterministic_udf --- python/pyspark/sql/tests.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3f80542457883..80e62d72d372c 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -386,6 +386,8 @@ def test_udf3(self): self.assertEqual(row[0], 5) def test_nondeterministic_udf(self): + # Test that the result of nondeterministic UDFs are evaluated only once in + # chained UDF evaluations from pyspark.sql.functions import udf import random udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() @@ -3975,19 +3977,21 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): self.spark.conf.set("spark.sql.session.timeZone", orig_tz) def test_nondeterministic_udf(self): - # Non-deterministic UDFs should be allowed in select and withColumn - from pyspark.sql.functions import pandas_udf, col + # Test that the result of nondeterministic UDFs are evaluated only once in + # chained UDF evaluations + from pandas.testing import assert_series_equal + from 
pyspark.sql.functions import udf, pandas_udf, col + @pandas_udf('double') + def plus_ten(v): + return v + 10 random_udf = self.random_udf - df = self.spark.range(10) - result1 = df.select(random_udf(col('id')).alias('rand')).collect() - result2 = df.withColumn('rand', random_udf(col('id'))).collect() + df = self.spark.range(10).withColumn('rand', random_udf(col('id'))) + result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() - for row in result1: - self.assertTrue(0.0 <= row.rand < 1.0) - for row in result2: - self.assertTrue(0.0 <= row.rand < 1.0) + self.assertEqual(random_udf.deterministic, False) + assert_series_equal(result1['plus_ten(rand)'], result1['rand'] + 10, check_names=False) def test_nondeterministic_udf_in_aggregate(self): from pyspark.sql.functions import pandas_udf, sum From b249bac939cbc2b5f1254cabf99827d7347b435a Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 5 Jan 2018 17:28:12 -0500 Subject: [PATCH 3/4] Small comment fix --- python/pyspark/sql/tests.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 80e62d72d372c..4089fb5c92d91 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -386,8 +386,7 @@ def test_udf3(self): self.assertEqual(row[0], 5) def test_nondeterministic_udf(self): - # Test that the result of nondeterministic UDFs are evaluated only once in - # chained UDF evaluations + # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations from pyspark.sql.functions import udf import random udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() @@ -3977,8 +3976,7 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): self.spark.conf.set("spark.sql.session.timeZone", orig_tz) def test_nondeterministic_udf(self): - # Test that the result of nondeterministic UDFs are evaluated only once in - # chained UDF evaluations + # Test that 
nondeterministic UDFs are evaluated only once in chained UDF evaluations from pandas.testing import assert_series_equal from pyspark.sql.functions import udf, pandas_udf, col From 2de3a37ac9638796bcbec86bdc1fad8c804eccd2 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Fri, 5 Jan 2018 18:11:03 -0500 Subject: [PATCH 4/4] Remove pandas.testing --- python/pyspark/sql/tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4089fb5c92d91..689736d8e6456 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3977,7 +3977,6 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): def test_nondeterministic_udf(self): # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations - from pandas.testing import assert_series_equal from pyspark.sql.functions import udf, pandas_udf, col @pandas_udf('double') @@ -3989,7 +3988,7 @@ def plus_ten(v): result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() self.assertEqual(random_udf.deterministic, False) - assert_series_equal(result1['plus_ten(rand)'], result1['rand'] + 10, check_names=False) + self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10)) def test_nondeterministic_udf_in_aggregate(self): from pyspark.sql.functions import pandas_udf, sum