Skip to content

Commit 08e0d03

Browse files
HyukjinKwonFelix Cheung
authored andcommitted
[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak
## What changes were proposed in this pull request? This is a retry for #18320. This PR was reverted due to unexpected test failures with -10 error code. I was unable to reproduce in MacOS, CentOS and Ubuntu but only in Jenkins. So, the tests proceeded to verify this and revert the past try here - #18456 This new approach was tested in #18463. **Test results**: - With the part of suspicious change in the past try (466325d) Tests ran 4 times and 2 times passed and 2 time failed. - Without the part of suspicious change in the past try (466325d) Tests ran 5 times and they all passed. - With this new approach (0a7589c) Tests ran 5 times and they all passed. It looks the cause is as below (see 466325d): ```diff + exitCode <- 1 ... + data <- parallel:::readChild(child) + if (is.raw(data)) { + if (unserialize(data) == exitCode) { ... + } + } ... - parallel:::mcexit(0L) + parallel:::mcexit(0L, send = exitCode) ``` Two possibilities I think - `parallel:::mcexit(.. , send = exitCode)` https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html > It sends send to the master (unless NULL) and then shuts down the child process. However, it looks possible that the parent attemps to terminate the child right after getting our custom exit code. So, the child gets terminated between "send" and "shuts down", failing to exit properly. - A bug between `parallel:::mcexit(..., send = ...)` and `parallel:::readChild`. **Proposal**: To resolve this, I simply decided to avoid both possibilities with this new approach here (9ff89a7). To support this idea, I explained with some quotation of the documentation as below: https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html > `readChild` and `readChildren` return a raw vector with a "pid" attribute if data were available, an integer vector of length one with the process ID if a child terminated or `NULL` if the child no longer exists (no children at all for `readChildren`). `readChild` returns "an integer vector of length one with the process ID if a child terminated" so we can check if it is `integer` and the same selected "process ID". I believe this makes sure that the children are exited. In case that children happen to send any data manually to parent (which is why we introduced the suspicious part of the change (466325d)), this should be raw bytes and will be discarded (and then will try to read the next and check if it is `integer` in the next loop). ## How was this patch tested? Manual tests and Jenkins tests. Author: hyukjinkwon <[email protected]> Closes #18465 from HyukjinKwon/SPARK-21093-retry-1.
1 parent c3712b7 commit 08e0d03

File tree

1 file changed

+48
-3
lines changed

1 file changed

+48
-3
lines changed

R/pkg/inst/worker/daemon.R

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,50 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
3030
inputCon <- socketConnection(
3131
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
3232

33+
# Waits indefinitely for a socket connecion by default.
34+
selectTimeout <- NULL
35+
3336
while (TRUE) {
34-
ready <- socketSelect(list(inputCon))
37+
ready <- socketSelect(list(inputCon), timeout = selectTimeout)
38+
39+
# Note that the children should be terminated in the parent. If each child terminates
40+
# itself, it appears that the resource is not released properly, that causes an unexpected
41+
# termination of this daemon due to, for example, running out of file descriptors
42+
# (see SPARK-21093). Therefore, the current implementation tries to retrieve children
43+
# that are exited (but not terminated) and then sends a kill signal to terminate them properly
44+
# in the parent.
45+
#
46+
# There are two paths that it attempts to send a signal to terminate the children in the parent.
47+
#
48+
# 1. Every second if any socket connection is not available and if there are child workers
49+
# running.
50+
# 2. Right after a socket connection is available.
51+
#
52+
# In other words, the parent attempts to send the signal to the children every second if
53+
# any worker is running or right before launching other worker children from the following
54+
# new socket connection.
55+
56+
# The process IDs of exited children are returned below.
57+
children <- parallel:::selectChildren(timeout = 0)
58+
59+
if (is.integer(children)) {
60+
lapply(children, function(child) {
61+
# This should be the PIDs of exited children. Otherwise, this returns raw bytes if any data
62+
# was sent from this child. In this case, we discard it.
63+
pid <- parallel:::readChild(child)
64+
if (is.integer(pid)) {
65+
# This checks if the data from this child is the same pid of this selected child.
66+
if (child == pid) {
67+
# If so, we terminate this child.
68+
tools::pskill(child, tools::SIGUSR1)
69+
}
70+
}
71+
})
72+
} else if (is.null(children)) {
73+
# If it is NULL, there are no children. Waits indefinitely for a socket connecion.
74+
selectTimeout <- NULL
75+
}
76+
3577
if (ready) {
3678
port <- SparkR:::readInt(inputCon)
3779
# There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +86,15 @@ while (TRUE) {
4486
}
4587
p <- parallel:::mcfork()
4688
if (inherits(p, "masterProcess")) {
89+
# Reach here because this is a child process.
4790
close(inputCon)
4891
Sys.setenv(SPARKR_WORKER_PORT = port)
4992
try(source(script))
50-
# Set SIGUSR1 so that child can exit
51-
tools::pskill(Sys.getpid(), tools::SIGUSR1)
93+
# Note that this mcexit does not fully terminate this child.
5294
parallel:::mcexit(0L)
95+
} else {
96+
# Forking succeeded and we need to check if they finished their jobs every second.
97+
selectTimeout <- 1
5398
}
5499
}
55100
}

0 commit comments

Comments
 (0)