Skip to content

Commit 9063835

Browse files
zero323holdenk
authored andcommitted
[SPARK-19163][PYTHON][SQL] Delay _judf initialization to the __call__
## What changes were proposed in this pull request? Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization. This allows users to define and import UDF without creating a context / session as a side effect. [SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163) ## How was this patch tested? Unit tests. Author: zero323 <[email protected]> Closes #16536 from zero323/SPARK-19163.
1 parent 081b7ad commit 9063835

File tree

2 files changed

+68
-11
lines changed

2 files changed

+68
-11
lines changed

python/pyspark/sql/functions.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1826,25 +1826,38 @@ class UserDefinedFunction(object):
18261826
def __init__(self, func, returnType, name=None):
18271827
self.func = func
18281828
self.returnType = returnType
1829-
self._judf = self._create_judf(name)
1830-
1831-
def _create_judf(self, name):
1829+
# Stores UserDefinedPythonFunctions jobj, once initialized
1830+
self._judf_placeholder = None
1831+
self._name = name or (
1832+
func.__name__ if hasattr(func, '__name__')
1833+
else func.__class__.__name__)
1834+
1835+
@property
1836+
def _judf(self):
1837+
# It is possible that concurrent access, to newly created UDF,
1838+
# will initialize multiple UserDefinedPythonFunctions.
1839+
# This is unlikely, doesn't affect correctness,
1840+
# and should have a minimal performance impact.
1841+
if self._judf_placeholder is None:
1842+
self._judf_placeholder = self._create_judf()
1843+
return self._judf_placeholder
1844+
1845+
def _create_judf(self):
18321846
from pyspark.sql import SparkSession
1833-
sc = SparkContext.getOrCreate()
1834-
wrapped_func = _wrap_function(sc, self.func, self.returnType)
1847+
18351848
spark = SparkSession.builder.getOrCreate()
1849+
sc = spark.sparkContext
1850+
1851+
wrapped_func = _wrap_function(sc, self.func, self.returnType)
18361852
jdt = spark._jsparkSession.parseDataType(self.returnType.json())
1837-
if name is None:
1838-
f = self.func
1839-
name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__
18401853
judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
1841-
name, wrapped_func, jdt)
1854+
self._name, wrapped_func, jdt)
18421855
return judf
18431856

18441857
def __call__(self, *cols):
1858+
judf = self._judf
18451859
sc = SparkContext._active_spark_context
1846-
jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
1847-
return Column(jc)
1860+
return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
18481861

18491862

18501863
@since(1.3)

python/pyspark/sql/tests.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,27 @@ def filename(path):
468468
row2 = df2.select(sameText(df2['file'])).first()
469469
self.assertTrue(row2[0].find("people.json") != -1)
470470

471+
def test_udf_defers_judf_initalization(self):
472+
# This is separate of UDFInitializationTests
473+
# to avoid context initialization
474+
# when udf is called
475+
476+
from pyspark.sql.functions import UserDefinedFunction
477+
478+
f = UserDefinedFunction(lambda x: x, StringType())
479+
480+
self.assertIsNone(
481+
f._judf_placeholder,
482+
"judf should not be initialized before the first call."
483+
)
484+
485+
self.assertIsInstance(f("foo"), Column, "UDF call should return a Column.")
486+
487+
self.assertIsNotNone(
488+
f._judf_placeholder,
489+
"judf should be initialized after UDF has been called."
490+
)
491+
471492
def test_basic_functions(self):
472493
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
473494
df = self.spark.read.json(rdd)
@@ -1947,6 +1968,29 @@ def test_sparksession_with_stopped_sparkcontext(self):
19471968
df.collect()
19481969

19491970

1971+
class UDFInitializationTests(unittest.TestCase):
1972+
def tearDown(self):
1973+
if SparkSession._instantiatedSession is not None:
1974+
SparkSession._instantiatedSession.stop()
1975+
1976+
if SparkContext._active_spark_context is not None:
1977+
SparkContext._active_spark_contex.stop()
1978+
1979+
def test_udf_init_shouldnt_initalize_context(self):
1980+
from pyspark.sql.functions import UserDefinedFunction
1981+
1982+
UserDefinedFunction(lambda x: x, StringType())
1983+
1984+
self.assertIsNone(
1985+
SparkContext._active_spark_context,
1986+
"SparkContext shouldn't be initialized when UserDefinedFunction is created."
1987+
)
1988+
self.assertIsNone(
1989+
SparkSession._instantiatedSession,
1990+
"SparkSession shouldn't be initialized when UserDefinedFunction is created."
1991+
)
1992+
1993+
19501994
class HiveContextSQLTests(ReusedPySparkTestCase):
19511995

19521996
@classmethod

0 commit comments

Comments
 (0)