Closed
26 changes: 26 additions & 0 deletions python/pyspark/tests.py
@@ -1,3 +1,4 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -1859,6 +1860,31 @@ def test_with_different_versions_of_python(self):
        finally:
            self.sc.pythonVer = version

    def test_exception_blocking(self):
        """
        SPARK-21045
        Test whether program is blocked when occur exception in worker sending
        exception to PythonRDD

        """
        import threading

        def run():
            try:

                def f():
                    raise Exception("中")

                self.sc.parallelize([1]).map(lambda x: f()).count()
            except Exception:
                pass
Member

I would check this with assertRaises and the error message too.
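
A minimal sketch of the suggested check, using plain `unittest` on Python 3 with no Spark involved. `failing_job` and `ErrorMessageTest` are hypothetical names introduced for illustration; in the real test the call under `assertRaises` would be the Spark job, and the exception type and message that Spark surfaces on the driver would still need to be confirmed.

```python
import unittest


def failing_job():
    # Hypothetical stand-in for the Spark job in the test above.
    raise Exception("中")


class ErrorMessageTest(unittest.TestCase):
    def test_error_message(self):
        # Fail the test unless the call raises, then inspect the message.
        with self.assertRaises(Exception) as ctx:
            failing_job()
        self.assertIn("中", str(ctx.exception))
```

Unlike a bare `except Exception: pass`, this form fails when no exception is raised at all.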


        t = threading.Thread(target=run)
Member

Why should we run this in a thread?

Author

@dataknocker dataknocker Jun 19, 2017

@HyukjinKwon This test mainly checks whether the job is blocked, so I run it in a thread and join with a timeout. If the job blocks, as it did before the fix, the test waits 10 seconds and exits instead of blocking the other tests.
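
The pattern described here can be sketched without Spark as follows. `finishes_within`, `quick_job`, and `stuck_job` are hypothetical names used only for illustration, and the sleep merely stands in for a job that blocks.

```python
import threading
import time


def finishes_within(target, timeout):
    """Run target in a daemon thread; report whether it ended within timeout seconds."""
    worker = threading.Thread(target=target)
    worker.daemon = True  # a stuck worker must not keep the interpreter alive
    worker.start()
    worker.join(timeout)  # returns after timeout even if the worker is still running
    return not worker.is_alive()


def quick_job():
    # Raises and swallows an exception, like run() in the test.
    try:
        raise Exception("中")
    except Exception:
        pass


def stuck_job():
    # Hypothetical stand-in for a blocked job.
    time.sleep(2)
```

The sketch calls `is_alive()`; the `isAlive()` spelling used in the diff was removed in Python 3.9.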

        t.daemon = True
        t.start()
        t.join(10)
        self.assertFalse(t.isAlive(), 'Spark executor is blocked.')


class SparkSubmitTests(unittest.TestCase):

8 changes: 7 additions & 1 deletion python/pyspark/worker.py
@@ -36,6 +36,9 @@
pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()

if sys.version >= '3':
    unicode = str


def report_times(outfile, boot, init, finish):
    write_int(SpecialLengths.TIMING_DATA, outfile)
@@ -177,8 +180,11 @@ def process():
        process()
    except Exception:
        try:
            exc_info = traceback.format_exc()
            if isinstance(exc_info, unicode):
                exc_info = exc_info.encode('utf-8')
Member

I guess this PR and #17267 need to follow each other so that both are fixed correctly.

Member

Yes, we should take a closer look. BTW, note that the two are a bit different: this one needs to return bytes in Python 3 / string (bytes) in Python 2, whereas #17267 needs to produce string (unicode) in Python 3 / string (bytes) in Python 2.
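
The bytes requirement for this PR can be illustrated with a small standalone sketch. `text_type` and `traceback_as_bytes` are hypothetical names, not part of `worker.py`. The guard matters on Python 2, where `traceback.format_exc()` already returns a byte string and calling `.encode("utf-8")` on it first decodes implicitly as ASCII, which raises `UnicodeDecodeError` for non-ASCII text; that appears to be the failure this change targets.

```python
import sys
import traceback

if sys.version_info[0] >= 3:
    text_type = str
else:
    text_type = unicode  # noqa: F821 (name exists on Python 2 only)


def traceback_as_bytes():
    """Return the current traceback as bytes on both Python 2 and Python 3."""
    exc_info = traceback.format_exc()
    # Encode only text; a Python 2 byte string is passed through unchanged.
    if isinstance(exc_info, text_type):
        exc_info = exc_info.encode("utf-8")
    return exc_info


try:
    raise Exception(u"中")
except Exception:
    payload = traceback_as_bytes()
```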

Author

@HyukjinKwon @ueshin What do I need to do next?

Member

Let's wait for the resolution of #17267, if you don't mind. I think we should be careful with this change.

Member

cc @zero323 and @davies here too (for the approach here). This instance is a bit different.

IMHO, we have a strong assumption that the string is in UTF-8, and this PR now allows writing out the bytes as they are. This is a hole that I can't come up with a clean solution for, because, to my knowledge, it means strings in any other encoding can be written as well. Also, the JVM side assumes that this is in UTF-8.

However, to my knowledge, Java mangles the string if it is not in UTF-8 rather than throwing an exception. I guess this is still better than hanging there.

Do you have a better idea for dealing with this, or is there anything I missed here?
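
As a rough Python analogue of the "mangle rather than throw" behaviour described above (this is not the JVM code path, and the variable names are hypothetical), decoding non-UTF-8 bytes strictly raises, while decoding with `errors="replace"` substitutes U+FFFD for the malformed input.

```python
# One Latin-1 encoded character: the single byte 0xE9 is not valid UTF-8.
latin1_bytes = u"é".encode("latin-1")

# Strict decoding refuses the malformed input.
try:
    latin1_bytes.decode("utf-8")
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True

# Lenient decoding keeps going and mangles the text instead.
lenient = latin1_bytes.decode("utf-8", errors="replace")
```

With lenient decoding the reader receives a damaged message, but the exchange still completes.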

Author

@HyukjinKwon So this PR can only stay pending for now?

            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
            write_with_length(exc_info, outfile)
        except IOError:
            # JVM close the socket
            pass