[SPARK-19163][PYTHON][SQL] Delay _judf initialization to the __call__ #16536
Conversation
Test build #71163 has finished for PR 16536 at commit
Test build #71254 has finished for PR 16536 at commit
Test build #71351 has finished for PR 16536 at commit
Test build #71350 has finished for PR 16536 at commit
Test build #71679 has finished for PR 16536 at commit
Test build #71688 has finished for PR 16536 at commit
+1 Looks good to me.
holdenk left a comment:
Thanks for working on this! I think this is going to be useful for Python UDF libraries. I've got a few questions - let me know what your thoughts are :)
python/pyspark/sql/functions.py (outdated)
Maybe add a comment explaining the purpose of this, just for future readers of the code.
python/pyspark/sql/tests.py (outdated)
This seems like a good test, but maybe a bit too focused on testing the implementation specifics?
Maybe it would make more sense to also have a test which verifies that creating a UDF doesn't create a SparkSession, since that is the intended purpose (we don't really care about delaying the initialization of _judf that much per se, but we do care about verifying that we don't eagerly create the SparkSession on import). What do you think?
I thought about it, but I have the impression, maybe incorrect, that we avoid creating new contexts to keep total execution time manageable. If you think this justifies a separate TestCase, I am more than fine with that (SPARK-19224 [PYSPARK] Python tests organization, right?).
If not, we could mock this and put an assert on the number of calls.
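A mock-based check along those lines might look like the sketch below. This is a hypothetical illustration, not the PR's actual test: it assumes `_create_judf` is the method that builds the Java-side UDF, and the class and test names are made up.

```python
import unittest
from unittest import mock  # on Python 2, the external `mock` package instead

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType


class UDFLazyInitTest(unittest.TestCase):
    def test_create_judf_not_called_on_construction(self):
        # Patch the (assumed) factory method so no Spark machinery is touched.
        with mock.patch.object(UserDefinedFunction, "_create_judf") as create_judf:
            UserDefinedFunction(lambda x: x, StringType())
            # Merely constructing the UDF should not build the Java-side UDF.
            create_judf.assert_not_called()
```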
I think a separate test case would be able to be pretty lightweight, since it doesn't need to create a SparkContext or anything that traditionally takes longer to set up. What do you think?
@holdenk Separate case it is. As long as the implementation is correct, the overhead is negligible.
Let's keep these tests, to make sure that _judf is initialized when necessary.
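For illustration, such an implementation-level check might look like the following sketch. It assumes the lazily created value is cached in a `_judf_placeholder` attribute (an assumed name) and that the test runs in a suite with a SparkSession already available.

```python
def test_judf_is_initialized_lazily(self):
    from pyspark.sql.functions import UserDefinedFunction

    f = UserDefinedFunction(lambda x: x, StringType())
    self.assertIsNone(f._judf_placeholder)    # nothing built yet
    f._judf                                   # first access builds the Java UDF
    self.assertIsNotNone(f._judf_placeholder)
```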
python/pyspark/sql/tests.py (outdated)
There is an assertIsInstance function that could simplify this.
python/pyspark/sql/functions.py (outdated)
So there isn't any lock around this - I suspect we aren't too likely to have concurrent calls to this - but just to be safe we should maybe think through what would happen if that does occur (and then leave a comment about it)?
Could you elaborate a bit? I am not sure I understand the issue.
Assignment is atomic (so we don't have to worry about corruption); for any practical purpose the operation is idempotent (we can return expressions using different Java objects, but as far as I am concerned this is just an implementation detail); access to Py4J is thread safe; and as far as I remember, function registries are synchronized. Is there any issue I missed here?
Thanks for looking into this.
I think @holdenk's concern is that this would allow concurrent calls to _create_udf. That would create two UserDefinedPythonFunction objects, but I don't see anything concerning about that on the Scala side.
@rdblue I get this part, and this is a possible scenario. The question is whether it justifies a preventive lock. As far as I am aware, there should be no correctness issues here. SparkSession already locks during initialization, so we are safe there.
Yeah, I don't think it should require a lock. I think concurrent calls are very unlikely and safe.
I stress tested this a bit and haven't found any abnormalities, but I found a small problem with __call__ along the way. Fixed now.
Ok, I'd maybe just leave a comment saying that we've left out the lock since double creation is both unlikely and OK.
@holdenk Done.
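For context, the lock-free lazy initialization under discussion amounts to a pattern like the sketch below. This is a simplified rendering, not the exact merged code; `_judf_placeholder` is the assumed cache attribute.

```python
class UserDefinedFunction(object):
    """Simplified sketch of a lazily initialized Python UDF wrapper."""

    def __init__(self, func, returnType, name=None):
        self.func = func
        self.returnType = returnType
        self._name = name or func.__name__
        self._judf_placeholder = None

    @property
    def _judf(self):
        # Deliberately no lock: concurrent first accesses may build the
        # Java-side UDF twice, but creation is idempotent and attribute
        # assignment is atomic, so the worst case is a little wasted work.
        if self._judf_placeholder is None:
            self._judf_placeholder = self._create_judf()
        return self._judf_placeholder
```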
Test build #72021 has finished for PR 16536 at commit
Test build #72034 has finished for PR 16536 at commit
Test build #72039 has finished for PR 16536 at commit
python/pyspark/sql/tests.py (outdated)
@holdenk I believe that for a full test the UDF would have to create a SparkContext. But a mock is cheap.
You can create a test case without the Spark base class and verify that creating a UDF doesn't create a SparkContext. This does not require making a SparkContext.
@holdenk Do you mean something like checking SparkContext._active_spark_context is None? Then we need to make sure it is torn down if it was initialized after all, right? Isn't mocking cleaner?
```python
import unittest

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType


class UDFInitializationTestCase(unittest.TestCase):
    def tearDown(self):
        # Tear down any session / context created during the test after all.
        if SparkSession._instantiatedSession is not None:
            SparkSession._instantiatedSession.stop()
        if SparkContext._active_spark_context is not None:
            SparkContext._active_spark_context.stop()

    def test_udf_context_access(self):
        from pyspark.sql.functions import UserDefinedFunction

        f = UserDefinedFunction(lambda x: x, StringType())
        self.assertIsNone(SparkContext._active_spark_context)
        self.assertIsNone(SparkSession._instantiatedSession)
```
python/pyspark/sql/tests.py (outdated)
And add a separate test case checking SparkContext and SparkSession state.
Test build #72170 has finished for PR 16536 at commit
Test build #72172 has finished for PR 16536 at commit
Test build #72173 has finished for PR 16536 at commit
The changes look good to me, I'll take a quick pass at the formatting to make sure - but otherwise I'll try and merge this tomorrow :)
holdenk left a comment:
Ok, it looks good; just one minor comment about having getOrCreate, which acquires a lock, in the hot path for __call__. Thanks for adding the tests :)
python/pyspark/sql/functions.py (outdated)
So by switching this to getOrCreate we put a lock acquisition in the path of __call__, which is maybe not ideal. We could maybe fix this by getting _judf first (e.g. judf = self._judf). (It should be a mostly uncontended lock, so it shouldn't be that bad, but if we ended up having a multi-threaded PySpark DataFrame UDF application this could degrade things a little.)
@holdenk Sounds reasonable.
Though I am not sure it really matters here. If _instantiatedContext is not None we'll do the same thing; otherwise we fall back to initialization in _judf.
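The suggested shape of __call__ would be roughly the sketch below, a simplified rendering rather than the merged code; the helpers `_to_seq` and `_to_java_column` are the private converters from pyspark.sql.column.

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq


class UserDefinedFunction(object):
    # ... __init__ and the lazy _judf property as sketched earlier ...

    def __call__(self, *cols):
        # Read the lazy property once, so any lock taken during the first
        # initialization is acquired a single time, up front.
        judf = self._judf
        sc = SparkContext._active_spark_context
        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
```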
@holdenk I have one more suggestion. Shouldn't we replace

```python
def _create_judf(self):
    from pyspark.sql import SparkSession

    sc = SparkContext.getOrCreate()
    spark = SparkSession.builder.getOrCreate()
```

with

```python
def _create_judf(self):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
```

I left it as is but I think it could be cleaner.
Test build #72218 has finished for PR 16536 at commit
Test build #72219 has finished for PR 16536 at commit
@zero323 that sounds like a good improvement.
@holdenk Done :)
Great, I'll wait for jenkins then :)
Test build #72222 has finished for PR 16536 at commit
Going to go ahead and merge. Still need to sort out the JIRA permissions, so it will take a bit for me to get that updated for you.
Thanks a bunch @holdenk
## What changes were proposed in this pull request?

Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization and allows users to define and import UDFs without creating a context / session as a side effect. [SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163)

## How was this patch tested?

Unit tests.

Author: zero323 <[email protected]>

Closes apache#16536 from zero323/SPARK-19163.
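As a quick illustration of the intended behavior (a hypothetical snippet, not part of the PR itself):

```python
from pyspark import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Defining a UDF is now side-effect free: no context or session is
# started until the UDF is actually used in a query.
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())

assert SparkContext._active_spark_context is None
```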