
Commit ac6fe67

Davies Liu authored and JoshRosen committed
[SPARK-5363] [PySpark] check ending mark in non-block way
There is a chance of deadlock in which the Python process waits for the ending mark from the JVM, but the mark has been consumed by a corrupted stream. This PR checks the ending mark from Python in a non-blocking way, so the JVM task will not be blocked by the Python process. There is a small chance that the ending mark was sent by the Python process but is not yet available; in that case the worker will simply not be re-used.

cc JoshRosen pwendell

Author: Davies Liu <[email protected]>

Closes #4601 from davies/freeze and squashes the following commits:

e15a8c3 [Davies Liu] update logging
890329c [Davies Liu] Merge branch 'freeze' of github.com:davies/spark into freeze
2bd2228 [Davies Liu] add more logging
656d544 [Davies Liu] Update PythonRDD.scala
05e1085 [Davies Liu] check ending mark in non-block way
1 parent 0e180bf commit ac6fe67

File tree

2 files changed: +18 −4 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 17 additions & 4 deletions

@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
         stream.readFully(update)
         accumulator += Collections.singletonList(update)
       }
+
       // Check whether the worker is ready to be re-used.
-      if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-        if (reuse_worker) {
-          env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-          released = true
+      if (reuse_worker) {
+        // It has a high possibility that the ending mark is already available,
+        // And current task should not be blocked by checking it
+
+        if (stream.available() >= 4) {
+          val ending = stream.readInt()
+          if (ending == SpecialLengths.END_OF_STREAM) {
+            env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+            released = true
+            logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
+          } else {
+            logInfo(s"Communication with worker did not end cleanly (ending with $ending), " +
+              s"close it: $worker")
+          }
+        } else {
+          logInfo(s"The ending mark from worker is not available, close it: $worker")
         }
       }
       null
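The Scala change above polls `stream.available()` and only calls the blocking `readInt()` when at least 4 bytes (one Int) are already buffered. A minimal Python sketch of the same non-blocking pattern is below; the `-4` value for `END_OF_STREAM` matches Spark's `SpecialLengths`, but the function and its names are illustrative, not Spark's code:

```python
import io
import struct

END_OF_STREAM = -4  # value of SpecialLengths.END_OF_STREAM in Spark


def try_read_ending_mark(buf: io.BytesIO) -> bool:
    """Only read the 4-byte ending mark if it is already buffered,
    so the caller is never blocked waiting on a slow worker."""
    # Analogous to Java's stream.available(): bytes readable right now.
    remaining = len(buf.getvalue()) - buf.tell()
    if remaining < 4:
        return False  # mark not here yet: give up and close the worker
    (ending,) = struct.unpack(">i", buf.read(4))
    return ending == END_OF_STREAM


# Simulated worker that already sent the mark: safe to re-use.
print(try_read_ending_mark(io.BytesIO(struct.pack(">i", -4))))  # True
# Simulated worker whose mark has not arrived: do not re-use.
print(try_read_ending_mark(io.BytesIO()))  # False
```

The trade-off is exactly the one the commit message notes: if the mark was sent but has not arrived yet, the check returns False and a reusable worker is closed, which costs a new worker but can never deadlock.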

python/pyspark/worker.py

Lines changed: 1 addition & 0 deletions

@@ -121,6 +121,7 @@ def process():
         write_int(len(_accumulatorRegistry), outfile)
         for (aid, accum) in _accumulatorRegistry.items():
             pickleSer._write_with_length((aid, accum._value), outfile)
+        outfile.flush()

     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
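The one-line `outfile.flush()` matters because the worker writes through a buffered stream: until the buffer is flushed, the 4-byte ending mark never reaches the JVM side, so the non-blocking `available()` check would see nothing and needlessly close a healthy worker. A small self-contained demonstration of that buffering effect, using a socket pair to stand in for the worker/JVM connection (illustrative only, not Spark code):

```python
import socket
import struct

# A connected socket pair stands in for the worker <-> JVM connection.
jvm_side, worker_side = socket.socketpair()
out = worker_side.makefile("wb")  # buffered writer, like `outfile` in worker.py

out.write(struct.pack(">i", -4))  # write the 4-byte ending mark...
jvm_side.setblocking(False)
try:
    seen = jvm_side.recv(4)
except BlockingIOError:
    seen = b""  # nothing arrived: the mark is stuck in the worker's buffer
print(seen == b"")  # True: before flush, the JVM side sees nothing

out.flush()  # ...flush actually pushes it onto the wire
jvm_side.setblocking(True)
print(jvm_side.recv(4) == struct.pack(">i", -4))  # True: mark now visible

out.close()
worker_side.close()
jvm_side.close()
```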
