@@ -30,55 +30,8 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
3030inputCon <- socketConnection(
3131 port = port , open = " rb" , blocking = TRUE , timeout = connectionTimeout )
3232
33- # Waits indefinitely for a socket connection by default.
34- selectTimeout <- NULL
35-
36- # Exit code that children send to the parent to indicate they exited.
37- exitCode <- 1
38-
3933while (TRUE ) {
40- ready <- socketSelect(list (inputCon ), timeout = selectTimeout )
41-
42- # Note that the children should be terminated in the parent. If each child terminates
43- # itself, it appears that the resource is not released properly, which causes an unexpected
44- # termination of this daemon due to, for example, running out of file descriptors
45- # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
46- # that are exited (but not terminated) and then sends a kill signal to terminate them properly
47- # in the parent.
48- #
49- # There are two paths that it attempts to send a signal to terminate the children in the parent.
50- #
51- # 1. Every second if any socket connection is not available and if there are child workers
52- # running.
53- # 2. Right after a socket connection is available.
54- #
55- # In other words, the parent attempts to send the signal to the children every second if
56- # any worker is running or right before launching other worker children from the following
57- # new socket connection.
58-
59- # Only the process IDs of children that sent data to the parent are returned below. The
60- # children send a custom exit code to the parent after exiting, and the parent tries
61- # to terminate them only if they sent the exit code.
62- children <- parallel ::: selectChildren(timeout = 0 )
63-
64- if (is.integer(children )) {
65- lapply(children , function (child ) {
66- # This data should be raw bytes if any data was sent from this child.
67- # Otherwise, this returns the PID.
68- data <- parallel ::: readChild(child )
69- if (is.raw(data )) {
70- # This checks if the data from this child is the exit code that indicates an exited child.
71- if (unserialize(data ) == exitCode ) {
72- # If so, we terminate this child.
73- tools :: pskill(child , tools :: SIGUSR1 )
74- }
75- }
76- })
77- } else if (is.null(children )) {
78- # If it is NULL, there are no children. Waits indefinitely for a socket connection.
79- selectTimeout <- NULL
80- }
81-
34+ ready <- socketSelect(list (inputCon ))
8235 if (ready ) {
8336 port <- SparkR ::: readInt(inputCon )
8437 # There is a small chance that it could be interrupted by signal, retry one time
@@ -91,16 +44,12 @@ while (TRUE) {
9144 }
9245 p <- parallel ::: mcfork()
9346 if (inherits(p , " masterProcess" )) {
94- # Reach here because this is a child process.
9547 close(inputCon )
9648 Sys.setenv(SPARKR_WORKER_PORT = port )
9749 try(source(script ))
98- # Note that this mcexit does not fully terminate this child. So, this writes back
99- # a custom exit code so that the parent can read and terminate this child.
100- parallel ::: mcexit(0L , send = exitCode )
101- } else {
102- # Forking succeeded and we need to check if they finished their jobs every second.
103- selectTimeout <- 1
50+ # Send SIGUSR1 to this (child) process so that the child can exit
51+ tools :: pskill(Sys.getpid(), tools :: SIGUSR1 )
52+ parallel ::: mcexit(0L )
10453 }
10554 }
10655}
0 commit comments