
Commit 28dcbb5

davies authored and JoshRosen committed
[SPARK-2898] [PySpark] fix bugs in deamon.py
1. Do not use a signal handler for SIGCHLD; it is easy to cause deadlock.
2. Handle EINTR during accept().
3. Pass the errno into the JVM.
4. Handle EAGAIN during fork().

Now it can pass the 50k-task test in 180 seconds.

Author: Davies Liu <[email protected]>

Closes apache#1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] cleanup dead children every seconds
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in deamon.py
1 parent 1d03a26 commit 28dcbb5
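
The central change is fix 1: instead of reaping dead workers from a SIGCHLD handler, the daemon now reaps them from its main loop with a non-blocking waitpid(). The snippet below is a minimal, self-contained sketch of that pattern, not code from this commit; the reap_children name and the toy fork loop are illustrative only (Unix-only, like daemon.py itself).

import os
import time


def reap_children():
    """Reap any finished children without blocking (the pattern daemon.py adopts)."""
    try:
        while True:
            pid, _ = os.waitpid(0, os.WNOHANG)  # 0 = any child in our process group
            if not pid:
                break  # children remain, but none have exited yet
    except OSError:
        pass  # ECHILD: no children left to wait for


if __name__ == "__main__":
    for _ in range(3):
        if os.fork() == 0:  # child: exit immediately, becoming a zombie until reaped
            os._exit(0)
    time.sleep(0.1)
    reap_children()  # collected here, in the main loop, not in a signal handler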

2 files changed: 48 additions & 32 deletions

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket
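
For context on fix 3 (pass the errno into the JVM): the daemon answers each worker connection with a single 4-byte integer, which PythonWorkerFactory reads via DataInputStream.readInt(). The sketch below shows that framing from the Python side; it assumes the big-endian packing used by pyspark.serializers.write_int and is illustrative, not code from this diff.

import struct


def write_int(value, stream):
    # One signed 32-bit integer in network (big-endian) byte order, which is
    # what DataInputStream.readInt() on the JVM side reads back.
    stream.write(struct.pack("!i", value))

On the Scala side, a negative value is still treated as a launch failure; the only change in this hunk is that the received value is now included in the IllegalStateException message, so the code chosen by the daemon reaches the JVM's error output.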

python/pyspark/daemon.py

Lines changed: 47 additions & 31 deletions
@@ -22,7 +22,8 @@
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def waitSocketClose(sock):
     os._exit(compute_real_exit_code(exit_code))


+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def handle_sigterm(*args):
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP

-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
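
This hunk carries fix 1 (and part of fix 2): the SIGCHLD handler is removed, select() gets a 1-second timeout so cleanup_dead_children() runs regularly even when nothing is ready, and an EINTR from select() simply restarts the loop. Below is a minimal, self-contained sketch of that loop shape, written in the Python 2 style daemon.py uses; the bounded for-loop and the placeholder body are mine, not the daemon's.

import errno
import select
import socket

listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.bind(('127.0.0.1', 0))
listen_sock.listen(socket.SOMAXCONN)

for _ in range(3):  # daemon.py loops forever; a few iterations suffice here
    try:
        # The 1-second timeout guarantees the loop body runs at least once per
        # second, which is what makes main-loop zombie reaping viable.
        ready_fds = select.select([listen_sock], [], [], 1)[0]
    except select.error as ex:
        if ex[0] == errno.EINTR:  # interrupted by a signal: just retry
            continue
        raise

    # daemon.py calls cleanup_dead_children() here, in the main loop,
    # because reaping inside a signal handler can deadlock.

    if listen_sock in ready_fds:
        pass  # daemon.py would accept() and fork a worker here
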
@@ -137,29 +141,41 @@ def handle_sigchld(*args):
                     pass  # process already died

             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                     else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                         sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                     sock.close()
+
     finally:
         shutdown(1)
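
Fixes 2 and 4 live in this hunk: accept() is retried when a signal interrupts it, and a failed fork() gets one more attempt after a short sleep when the failure looks transient (EAGAIN/EINTR); any other fork error is reported back over the worker socket and the daemon keeps running. The helpers below are a hedged, self-contained sketch of the two retry paths only; the function names are mine, and a second fork failure propagates instead of being written to the socket as daemon.py does.

import errno
import os
import socket
import time


def accept_retrying_eintr(listen_sock):
    """Retry accept() when it is interrupted by a signal (EINTR)."""
    while True:
        try:
            return listen_sock.accept()
        except socket.error as e:
            if e.errno == errno.EINTR:
                continue  # interrupted: try again
            raise


def fork_with_one_retry():
    """Retry fork() once after a short pause on transient errors (EAGAIN/EINTR)."""
    try:
        return os.fork()
    except OSError as e:
        if e.errno in (errno.EAGAIN, errno.EINTR):
            time.sleep(1)
            return os.fork()  # a second failure propagates to the caller
        raise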
