Skip to content

Commit bfa56c8

Browse files
committed
Revert "Revert "[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak""
This reverts commit 9f86d2f.
1 parent 9f86d2f commit bfa56c8

File tree

1 file changed

+55
-4
lines changed

1 file changed

+55
-4
lines changed

R/pkg/inst/worker/daemon.R

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
3030
inputCon <- socketConnection(
3131
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
3232

33+
# Waits indefinitely for a socket connecion by default.
34+
selectTimeout <- NULL
35+
36+
# Exit code that children send to the parent to indicate they exited.
37+
exitCode <- 1
38+
3339
while (TRUE) {
34-
ready <- socketSelect(list(inputCon))
40+
ready <- socketSelect(list(inputCon), timeout = selectTimeout)
41+
42+
# Note that the children should be terminated in the parent. If each child terminates
43+
# itself, it appears that the resource is not released properly, that causes an unexpected
44+
# termination of this daemon due to, for example, running out of file descriptors
45+
# (see SPARK-21093). Therefore, the current implementation tries to retrieve children
46+
# that are exited (but not terminated) and then sends a kill signal to terminate them properly
47+
# in the parent.
48+
#
49+
# There are two paths that it attempts to send a signal to terminate the children in the parent.
50+
#
51+
# 1. Every second if any socket connection is not available and if there are child workers
52+
# running.
53+
# 2. Right after a socket connection is available.
54+
#
55+
# In other words, the parent attempts to send the signal to the children every second if
56+
# any worker is running or right before launching other worker children from the following
57+
# new socket connection.
58+
59+
# Only the process IDs of children sent data to the parent are returned below. The children
60+
# send a custom exit code to the parent after being exited and the parent tries
61+
# to terminate them only if they sent the exit code.
62+
children <- parallel:::selectChildren(timeout = 0)
63+
64+
if (is.integer(children)) {
65+
lapply(children, function(child) {
66+
# This data should be raw bytes if any data was sent from this child.
67+
# Otherwise, this returns the PID.
68+
data <- parallel:::readChild(child)
69+
if (is.raw(data)) {
70+
# This checks if the data from this child is the exit code that indicates an exited child.
71+
if (unserialize(data) == exitCode) {
72+
# If so, we terminate this child.
73+
tools::pskill(child, tools::SIGUSR1)
74+
}
75+
}
76+
})
77+
} else if (is.null(children)) {
78+
# If it is NULL, there are no children. Waits indefinitely for a socket connecion.
79+
selectTimeout <- NULL
80+
}
81+
3582
if (ready) {
3683
port <- SparkR:::readInt(inputCon)
3784
# There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +91,16 @@ while (TRUE) {
4491
}
4592
p <- parallel:::mcfork()
4693
if (inherits(p, "masterProcess")) {
94+
# Reach here because this is a child process.
4795
close(inputCon)
4896
Sys.setenv(SPARKR_WORKER_PORT = port)
4997
try(source(script))
50-
# Set SIGUSR1 so that child can exit
51-
tools::pskill(Sys.getpid(), tools::SIGUSR1)
52-
parallel:::mcexit(0L)
98+
# Note that this mcexit does not fully terminate this child. So, this writes back
99+
# a custom exit code so that the parent can read and terminate this child.
100+
parallel:::mcexit(0L, send = exitCode)
101+
} else {
102+
# Forking succeeded and we need to check if they finished their jobs every second.
103+
selectTimeout <- 1
53104
}
54105
}
55106
}

0 commit comments

Comments
 (0)