Skip to content

Commit 2f2f893

Browse files
Author: Davies Liu (committed)
Commit message: fix take of PythonRDD in JVM
1 parent: c684e5f · commit: 2f2f893

File tree

3 files changed: +15 additions, −4 deletions

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ private[spark] class PythonRDD(
7777

7878
context.addTaskCompletionListener { context =>
7979
writerThread.shutdownOnTaskCompletion()
80-
writerThread.join()
8180
if (!reuse_worker || !released) {
8281
try {
8382
worker.close()
@@ -249,13 +248,17 @@ private[spark] class PythonRDD(
249248
} catch {
250249
case e: Exception if context.isCompleted || context.isInterrupted =>
251250
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
252-
Utils.tryLog(worker.shutdownOutput())
251+
if (!worker.isClosed) {
252+
Utils.tryLog(worker.shutdownOutput())
253+
}
253254

254255
case e: Exception =>
255256
// We must avoid throwing exceptions here, because the thread uncaught exception handler
256257
// will kill the whole executor (see org.apache.spark.executor.Executor).
257258
_exception = e
258-
Utils.tryLog(worker.shutdownOutput())
259+
if (!worker.isClosed) {
260+
Utils.tryLog(worker.shutdownOutput())
261+
}
259262
} finally {
260263
// Release memory used by this thread for shuffles
261264
env.shuffleMemoryManager.releaseMemoryForThisThread()

python/pyspark/daemon.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ def worker(sock):
6161
except SystemExit as exc:
6262
exit_code = compute_real_exit_code(exc.code)
6363
finally:
64-
outfile.flush()
64+
try:
65+
outfile.flush()
66+
except Exception:
67+
pass
6568
return exit_code
6669

6770

python/pyspark/tests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,11 @@ def test_multiple_python_java_RDD_conversions(self):
739739
converted_rdd = RDD(data_python_rdd, self.sc)
740740
self.assertEqual(2, converted_rdd.count())
741741

742+
# Regression test for SPARK-6294
743+
def test_take_on_jrdd(self):
744+
rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
745+
rdd._jrdd.first()
746+
742747

743748
class ProfilerTests(PySparkTestCase):
744749

0 commit comments

Comments (0)