Commit 6b3d022

HyukjinKwon authored and Felix Cheung committed
[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak
## What changes were proposed in this pull request? `mcfork` in R looks opening a pipe ahead but the existing logic does not properly close it when it is executed hot. This leads to the failure of more forking due to the limit for number of files open. This hot execution looks particularly for `gapply`/`gapplyCollect`. For unknown reason, this happens more easily in CentOS and could be reproduced in Mac too. All the details are described in https://issues.apache.org/jira/browse/SPARK-21093 This PR proposes simply to terminate R's worker processes in the parent of R's daemon to prevent a leak. ## How was this patch tested? I ran the codes below on both CentOS and Mac with that configuration disabled/enabled. ```r df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d")) collect(gapply(df, "a", function(key, x) { x }, schema(df))) collect(gapply(df, "a", function(key, x) { x }, schema(df))) ... # 30 times ``` Also, now it passes R tests on CentOS as below: ``` SparkSQL functions: Spark package found in SPARK_HOME: .../spark .............................................................................................................................................................. .............................................................................................................................................................. .............................................................................................................................................................. .............................................................................................................................................................. .............................................................................................................................................................. .................................................................................................................................... 
``` Author: hyukjinkwon <[email protected]> Closes #18320 from HyukjinKwon/SPARK-21093.
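The leak described above can be observed directly: on Linux, `/proc/<pid>/fd` holds one entry per open file descriptor, so counting its entries while repeatedly running `gapply` shows the daemon's descriptor count climbing. A minimal sketch, assuming a Linux `/proc` filesystem; `countOpenFds` is a hypothetical helper, not part of SparkR:

```r
# Hypothetical helper, Linux-only: counts the open file descriptors of a
# process by listing /proc/<pid>/fd. Shown only to make the leak observable;
# not part of SparkR.
countOpenFds <- function(pid = Sys.getpid()) {
  length(list.files(file.path("/proc", pid, "fd")))
}

before <- countOpenFds()
con <- file(tempfile(), open = "w")  # opening a connection consumes one descriptor
after <- countOpenFds()
close(con)                           # releasing it gives the descriptor back
```

Pointing the helper at the daemon's PID before and after a batch of `gapply` calls would show a monotonically growing count on an affected build, and a stable count with this fix.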
1 parent 884347e commit 6b3d022

File tree

1 file changed (+55, -4 lines)

R/pkg/inst/worker/daemon.R

Lines changed: 55 additions & 4 deletions
```diff
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, and that causes an
+  # unexpected termination of this daemon due to, for example, running out of file
+  # descriptors (see SPARK-21093). Therefore, the current implementation tries to retrieve
+  # children that have exited (but have not been terminated) and then sends a kill signal
+  # to terminate them properly in the parent.
+  #
+  # There are two paths through which the parent attempts to send a signal to terminate
+  # the children:
+  #
+  #   1. Every second, if no socket connection is available and there are child workers
+  #      running.
+  #   2. Right after a socket connection becomes available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the
+  # following new socket connection.
+
+  # Only the process IDs of children that sent data to the parent are returned below. The
+  # children send a custom exit code to the parent after exiting, and the parent tries
+  # to terminate them only if they sent the exit code.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an
+        # exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
     # There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +91,16 @@ while (TRUE) {
   }
   p <- parallel:::mcfork()
   if (inherits(p, "masterProcess")) {
+    # We reach here because this is a child process.
     close(inputCon)
     Sys.setenv(SPARKR_WORKER_PORT = port)
     try(source(script))
-    # Set SIGUSR1 so that child can exit
-    tools::pskill(Sys.getpid(), tools::SIGUSR1)
-    parallel:::mcexit(0L)
+    # Note that this mcexit does not fully terminate this child. So, this writes back
+    # a custom exit code so that the parent can read it and terminate this child.
+    parallel:::mcexit(0L, send = exitCode)
+  } else {
+    # Forking succeeded, and we need to check every second whether the children
+    # finished their jobs.
+    selectTimeout <- 1
   }
 }
```
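The parent-side handshake introduced by this diff can be exercised in isolation. The sketch below is Unix-only (`parallel`'s fork API is unavailable on Windows), and the 10-second timeout is an arbitrary choice for the demo: one forked child reports the custom exit code via `mcexit(send = ...)`, and the parent reaps it with `selectChildren()`/`readChild()`/`pskill()`, mirroring the daemon's cleanup loop.

```r
# Unix-only sketch of the exit-code handshake used by the daemon.
exitCode <- 1
reaped <- FALSE

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child: mcexit(send = ...) serializes the exit code back to the parent
  # over the fork pipe before exiting.
  parallel:::mcexit(0L, send = exitCode)
}

# Parent: selectChildren() returns the PIDs of children with data pending.
children <- parallel:::selectChildren(timeout = 10)
if (is.integer(children)) {
  for (child in children) {
    data <- parallel:::readChild(child)  # raw bytes if the child sent data
    if (is.raw(data) && unserialize(data) == exitCode) {
      # Terminate the child from the parent, as the daemon now does.
      tools::pskill(child, tools::SIGUSR1)
      reaped <- TRUE
    }
  }
}
```

The key design point is that the kill signal is sent from the parent rather than from the child itself: per the commit message, a child signaling itself left resources unreleased, while reaping from the parent releases the per-fork pipe descriptors.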
